libp2p_relay/protocol/
outbound_hop.rs

1// Copyright 2021 Protocol Labs.
2//
3// Permission is hereby granted, free of charge, to any person obtaining a
4// copy of this software and associated documentation files (the "Software"),
5// to deal in the Software without restriction, including without limitation
6// the rights to use, copy, modify, merge, publish, distribute, sublicense,
7// and/or sell copies of the Software, and to permit persons to whom the
8// Software is furnished to do so, subject to the following conditions:
9//
10// The above copyright notice and this permission notice shall be included in
11// all copies or substantial portions of the Software.
12//
13// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
14// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
18// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
19// DEALINGS IN THE SOFTWARE.
20
21use std::{io, time::Duration};
22
23use asynchronous_codec::{Framed, FramedParts};
24use bytes::Bytes;
25use futures::prelude::*;
26use futures_timer::Delay;
27use libp2p_core::Multiaddr;
28use libp2p_identity::PeerId;
29use libp2p_swarm::Stream;
30use thiserror::Error;
31use web_time::SystemTime;
32
33use crate::{
34    proto,
35    protocol::{Limit, MAX_MESSAGE_SIZE},
36    HOP_PROTOCOL_NAME,
37};
38
39#[derive(Debug, Error)]
40pub enum ConnectError {
41    #[error("Remote reported resource limit exceeded.")]
42    ResourceLimitExceeded,
43    #[error("Relay failed to connect to destination.")]
44    ConnectionFailed,
45    #[error("Relay has no reservation for destination.")]
46    NoReservation,
47    #[error("Remote denied permission.")]
48    PermissionDenied,
49    #[error("Remote does not support the `{HOP_PROTOCOL_NAME}` protocol")]
50    Unsupported,
51    #[error("IO error")]
52    Io(#[from] io::Error),
53    #[error("Protocol error")]
54    Protocol(#[from] ProtocolViolation),
55}
56
57#[derive(Debug, Error)]
58pub enum ReserveError {
59    #[error("Reservation refused.")]
60    Refused,
61    #[error("Remote reported resource limit exceeded.")]
62    ResourceLimitExceeded,
63    #[error("Remote does not support the `{HOP_PROTOCOL_NAME}` protocol")]
64    Unsupported,
65    #[error("IO error")]
66    Io(#[from] io::Error),
67    #[error("Protocol error")]
68    Protocol(#[from] ProtocolViolation),
69}
70
71#[derive(Debug, Error)]
72pub enum ProtocolViolation {
73    #[error(transparent)]
74    Codec(#[from] quick_protobuf_codec::Error),
75    #[error("Expected 'status' field to be set.")]
76    MissingStatusField,
77    #[error("Expected 'reservation' field to be set.")]
78    MissingReservationField,
79    #[error("Expected at least one address in reservation.")]
80    NoAddressesInReservation,
81    #[error("Invalid expiration timestamp in reservation.")]
82    InvalidReservationExpiration,
83    #[error("Invalid addresses in reservation.")]
84    InvalidReservationAddrs,
85    #[error("Unexpected message type 'connect'")]
86    UnexpectedTypeConnect,
87    #[error("Unexpected message type 'reserve'")]
88    UnexpectedTypeReserve,
89    #[error("Unexpected message status '{0:?}'")]
90    UnexpectedStatus(proto::Status),
91}
92
93impl From<quick_protobuf_codec::Error> for ConnectError {
94    fn from(e: quick_protobuf_codec::Error) -> Self {
95        ConnectError::Protocol(ProtocolViolation::Codec(e))
96    }
97}
98
99impl From<quick_protobuf_codec::Error> for ReserveError {
100    fn from(e: quick_protobuf_codec::Error) -> Self {
101        ReserveError::Protocol(ProtocolViolation::Codec(e))
102    }
103}
104
105pub(crate) struct Reservation {
106    pub(crate) renewal_timeout: Delay,
107    pub(crate) addrs: Vec<Multiaddr>,
108    pub(crate) limit: Option<Limit>,
109}
110
111pub(crate) struct Circuit {
112    pub(crate) stream: Stream,
113    pub(crate) read_buffer: Bytes,
114    pub(crate) limit: Option<Limit>,
115}
116
117pub(crate) async fn make_reservation(stream: Stream) -> Result<Reservation, ReserveError> {
118    let msg = proto::HopMessage {
119        type_pb: proto::HopMessageType::RESERVE,
120        peer: None,
121        reservation: None,
122        limit: None,
123        status: None,
124    };
125    let mut substream = Framed::new(stream, quick_protobuf_codec::Codec::new(MAX_MESSAGE_SIZE));
126
127    substream.send(msg).await?;
128
129    substream.close().await?;
130
131    let proto::HopMessage {
132        type_pb,
133        peer: _,
134        reservation,
135        limit,
136        status,
137    } = substream
138        .next()
139        .await
140        .ok_or(ReserveError::Io(io::ErrorKind::UnexpectedEof.into()))??;
141
142    match type_pb {
143        proto::HopMessageType::CONNECT => {
144            return Err(ReserveError::Protocol(
145                ProtocolViolation::UnexpectedTypeConnect,
146            ));
147        }
148        proto::HopMessageType::RESERVE => {
149            return Err(ReserveError::Protocol(
150                ProtocolViolation::UnexpectedTypeReserve,
151            ));
152        }
153        proto::HopMessageType::STATUS => {}
154    }
155
156    let limit = limit.map(Into::into);
157
158    match status.ok_or(ProtocolViolation::MissingStatusField)? {
159        proto::Status::OK => {}
160        proto::Status::RESERVATION_REFUSED => {
161            return Err(ReserveError::Refused);
162        }
163        proto::Status::RESOURCE_LIMIT_EXCEEDED => {
164            return Err(ReserveError::ResourceLimitExceeded);
165        }
166        s => {
167            return Err(ReserveError::Protocol(ProtocolViolation::UnexpectedStatus(
168                s,
169            )))
170        }
171    }
172
173    let reservation = reservation.ok_or(ReserveError::Protocol(
174        ProtocolViolation::MissingReservationField,
175    ))?;
176
177    if reservation.addrs.is_empty() {
178        return Err(ReserveError::Protocol(
179            ProtocolViolation::NoAddressesInReservation,
180        ));
181    }
182
183    let addrs = reservation
184        .addrs
185        .into_iter()
186        .map(|b| Multiaddr::try_from(b.to_vec()))
187        .collect::<Result<Vec<Multiaddr>, _>>()
188        .map_err(|_| ReserveError::Protocol(ProtocolViolation::InvalidReservationAddrs))?;
189
190    let renewal_timeout = reservation
191        .expire
192        .checked_sub(
193            SystemTime::now()
194                .duration_since(SystemTime::UNIX_EPOCH)
195                .unwrap()
196                .as_secs(),
197        )
198        // Renew the reservation after 3/4 of the reservation expiration timestamp.
199        .and_then(|duration| duration.checked_sub(duration / 4))
200        .map(Duration::from_secs)
201        .map(Delay::new)
202        .ok_or(ReserveError::Protocol(
203            ProtocolViolation::InvalidReservationExpiration,
204        ))?;
205
206    Ok(Reservation {
207        renewal_timeout,
208        addrs,
209        limit,
210    })
211}
212
213pub(crate) async fn open_circuit(
214    protocol: Stream,
215    dst_peer_id: PeerId,
216) -> Result<Circuit, ConnectError> {
217    let msg = proto::HopMessage {
218        type_pb: proto::HopMessageType::CONNECT,
219        peer: Some(proto::Peer {
220            id: dst_peer_id.to_bytes(),
221            addrs: vec![],
222        }),
223        reservation: None,
224        limit: None,
225        status: None,
226    };
227
228    let mut substream = Framed::new(protocol, quick_protobuf_codec::Codec::new(MAX_MESSAGE_SIZE));
229
230    substream.send(msg).await?;
231
232    let proto::HopMessage {
233        type_pb,
234        peer: _,
235        reservation: _,
236        limit,
237        status,
238    } = substream
239        .next()
240        .await
241        .ok_or(ConnectError::Io(io::ErrorKind::UnexpectedEof.into()))??;
242
243    match type_pb {
244        proto::HopMessageType::CONNECT => {
245            return Err(ConnectError::Protocol(
246                ProtocolViolation::UnexpectedTypeConnect,
247            ));
248        }
249        proto::HopMessageType::RESERVE => {
250            return Err(ConnectError::Protocol(
251                ProtocolViolation::UnexpectedTypeReserve,
252            ));
253        }
254        proto::HopMessageType::STATUS => {}
255    }
256
257    match status {
258        Some(proto::Status::OK) => {}
259        Some(proto::Status::RESOURCE_LIMIT_EXCEEDED) => {
260            return Err(ConnectError::ResourceLimitExceeded);
261        }
262        Some(proto::Status::CONNECTION_FAILED) => {
263            return Err(ConnectError::ConnectionFailed);
264        }
265        Some(proto::Status::NO_RESERVATION) => {
266            return Err(ConnectError::NoReservation);
267        }
268        Some(proto::Status::PERMISSION_DENIED) => {
269            return Err(ConnectError::PermissionDenied);
270        }
271        Some(s) => {
272            return Err(ConnectError::Protocol(ProtocolViolation::UnexpectedStatus(
273                s,
274            )));
275        }
276        None => {
277            return Err(ConnectError::Protocol(
278                ProtocolViolation::MissingStatusField,
279            ));
280        }
281    }
282
283    let limit = limit.map(Into::into);
284
285    let FramedParts {
286        io,
287        read_buffer,
288        write_buffer,
289        ..
290    } = substream.into_parts();
291    assert!(
292        write_buffer.is_empty(),
293        "Expect a flushed Framed to have empty write buffer."
294    );
295
296    let circuit = Circuit {
297        stream: io,
298        read_buffer: read_buffer.freeze(),
299        limit,
300    };
301
302    Ok(circuit)
303}