1use std::{io, time::Duration};
22
23use asynchronous_codec::{Framed, FramedParts};
24use bytes::Bytes;
25use futures::prelude::*;
26use futures_timer::Delay;
27use libp2p_core::Multiaddr;
28use libp2p_identity::PeerId;
29use libp2p_swarm::Stream;
30use thiserror::Error;
31use web_time::SystemTime;
32
33use crate::{
34 proto,
35 protocol::{Limit, MAX_MESSAGE_SIZE},
36 HOP_PROTOCOL_NAME,
37};
38
39#[derive(Debug, Error)]
40pub enum ConnectError {
41 #[error("Remote reported resource limit exceeded.")]
42 ResourceLimitExceeded,
43 #[error("Relay failed to connect to destination.")]
44 ConnectionFailed,
45 #[error("Relay has no reservation for destination.")]
46 NoReservation,
47 #[error("Remote denied permission.")]
48 PermissionDenied,
49 #[error("Remote does not support the `{HOP_PROTOCOL_NAME}` protocol")]
50 Unsupported,
51 #[error("IO error")]
52 Io(#[from] io::Error),
53 #[error("Protocol error")]
54 Protocol(#[from] ProtocolViolation),
55}
56
57#[derive(Debug, Error)]
58pub enum ReserveError {
59 #[error("Reservation refused.")]
60 Refused,
61 #[error("Remote reported resource limit exceeded.")]
62 ResourceLimitExceeded,
63 #[error("Remote does not support the `{HOP_PROTOCOL_NAME}` protocol")]
64 Unsupported,
65 #[error("IO error")]
66 Io(#[from] io::Error),
67 #[error("Protocol error")]
68 Protocol(#[from] ProtocolViolation),
69}
70
71#[derive(Debug, Error)]
72pub enum ProtocolViolation {
73 #[error(transparent)]
74 Codec(#[from] quick_protobuf_codec::Error),
75 #[error("Expected 'status' field to be set.")]
76 MissingStatusField,
77 #[error("Expected 'reservation' field to be set.")]
78 MissingReservationField,
79 #[error("Expected at least one address in reservation.")]
80 NoAddressesInReservation,
81 #[error("Invalid expiration timestamp in reservation.")]
82 InvalidReservationExpiration,
83 #[error("Invalid addresses in reservation.")]
84 InvalidReservationAddrs,
85 #[error("Unexpected message type 'connect'")]
86 UnexpectedTypeConnect,
87 #[error("Unexpected message type 'reserve'")]
88 UnexpectedTypeReserve,
89 #[error("Unexpected message status '{0:?}'")]
90 UnexpectedStatus(proto::Status),
91}
92
93impl From<quick_protobuf_codec::Error> for ConnectError {
94 fn from(e: quick_protobuf_codec::Error) -> Self {
95 ConnectError::Protocol(ProtocolViolation::Codec(e))
96 }
97}
98
99impl From<quick_protobuf_codec::Error> for ReserveError {
100 fn from(e: quick_protobuf_codec::Error) -> Self {
101 ReserveError::Protocol(ProtocolViolation::Codec(e))
102 }
103}
104
105pub(crate) struct Reservation {
106 pub(crate) renewal_timeout: Delay,
107 pub(crate) addrs: Vec<Multiaddr>,
108 pub(crate) limit: Option<Limit>,
109}
110
111pub(crate) struct Circuit {
112 pub(crate) stream: Stream,
113 pub(crate) read_buffer: Bytes,
114 pub(crate) limit: Option<Limit>,
115}
116
117pub(crate) async fn make_reservation(stream: Stream) -> Result<Reservation, ReserveError> {
118 let msg = proto::HopMessage {
119 type_pb: proto::HopMessageType::RESERVE,
120 peer: None,
121 reservation: None,
122 limit: None,
123 status: None,
124 };
125 let mut substream = Framed::new(stream, quick_protobuf_codec::Codec::new(MAX_MESSAGE_SIZE));
126
127 substream.send(msg).await?;
128
129 substream.close().await?;
130
131 let proto::HopMessage {
132 type_pb,
133 peer: _,
134 reservation,
135 limit,
136 status,
137 } = substream
138 .next()
139 .await
140 .ok_or(ReserveError::Io(io::ErrorKind::UnexpectedEof.into()))??;
141
142 match type_pb {
143 proto::HopMessageType::CONNECT => {
144 return Err(ReserveError::Protocol(
145 ProtocolViolation::UnexpectedTypeConnect,
146 ));
147 }
148 proto::HopMessageType::RESERVE => {
149 return Err(ReserveError::Protocol(
150 ProtocolViolation::UnexpectedTypeReserve,
151 ));
152 }
153 proto::HopMessageType::STATUS => {}
154 }
155
156 let limit = limit.map(Into::into);
157
158 match status.ok_or(ProtocolViolation::MissingStatusField)? {
159 proto::Status::OK => {}
160 proto::Status::RESERVATION_REFUSED => {
161 return Err(ReserveError::Refused);
162 }
163 proto::Status::RESOURCE_LIMIT_EXCEEDED => {
164 return Err(ReserveError::ResourceLimitExceeded);
165 }
166 s => {
167 return Err(ReserveError::Protocol(ProtocolViolation::UnexpectedStatus(
168 s,
169 )))
170 }
171 }
172
173 let reservation = reservation.ok_or(ReserveError::Protocol(
174 ProtocolViolation::MissingReservationField,
175 ))?;
176
177 if reservation.addrs.is_empty() {
178 return Err(ReserveError::Protocol(
179 ProtocolViolation::NoAddressesInReservation,
180 ));
181 }
182
183 let addrs = reservation
184 .addrs
185 .into_iter()
186 .map(|b| Multiaddr::try_from(b.to_vec()))
187 .collect::<Result<Vec<Multiaddr>, _>>()
188 .map_err(|_| ReserveError::Protocol(ProtocolViolation::InvalidReservationAddrs))?;
189
190 let renewal_timeout = reservation
191 .expire
192 .checked_sub(
193 SystemTime::now()
194 .duration_since(SystemTime::UNIX_EPOCH)
195 .unwrap()
196 .as_secs(),
197 )
198 .and_then(|duration| duration.checked_sub(duration / 4))
200 .map(Duration::from_secs)
201 .map(Delay::new)
202 .ok_or(ReserveError::Protocol(
203 ProtocolViolation::InvalidReservationExpiration,
204 ))?;
205
206 Ok(Reservation {
207 renewal_timeout,
208 addrs,
209 limit,
210 })
211}
212
213pub(crate) async fn open_circuit(
214 protocol: Stream,
215 dst_peer_id: PeerId,
216) -> Result<Circuit, ConnectError> {
217 let msg = proto::HopMessage {
218 type_pb: proto::HopMessageType::CONNECT,
219 peer: Some(proto::Peer {
220 id: dst_peer_id.to_bytes(),
221 addrs: vec![],
222 }),
223 reservation: None,
224 limit: None,
225 status: None,
226 };
227
228 let mut substream = Framed::new(protocol, quick_protobuf_codec::Codec::new(MAX_MESSAGE_SIZE));
229
230 substream.send(msg).await?;
231
232 let proto::HopMessage {
233 type_pb,
234 peer: _,
235 reservation: _,
236 limit,
237 status,
238 } = substream
239 .next()
240 .await
241 .ok_or(ConnectError::Io(io::ErrorKind::UnexpectedEof.into()))??;
242
243 match type_pb {
244 proto::HopMessageType::CONNECT => {
245 return Err(ConnectError::Protocol(
246 ProtocolViolation::UnexpectedTypeConnect,
247 ));
248 }
249 proto::HopMessageType::RESERVE => {
250 return Err(ConnectError::Protocol(
251 ProtocolViolation::UnexpectedTypeReserve,
252 ));
253 }
254 proto::HopMessageType::STATUS => {}
255 }
256
257 match status {
258 Some(proto::Status::OK) => {}
259 Some(proto::Status::RESOURCE_LIMIT_EXCEEDED) => {
260 return Err(ConnectError::ResourceLimitExceeded);
261 }
262 Some(proto::Status::CONNECTION_FAILED) => {
263 return Err(ConnectError::ConnectionFailed);
264 }
265 Some(proto::Status::NO_RESERVATION) => {
266 return Err(ConnectError::NoReservation);
267 }
268 Some(proto::Status::PERMISSION_DENIED) => {
269 return Err(ConnectError::PermissionDenied);
270 }
271 Some(s) => {
272 return Err(ConnectError::Protocol(ProtocolViolation::UnexpectedStatus(
273 s,
274 )));
275 }
276 None => {
277 return Err(ConnectError::Protocol(
278 ProtocolViolation::MissingStatusField,
279 ));
280 }
281 }
282
283 let limit = limit.map(Into::into);
284
285 let FramedParts {
286 io,
287 read_buffer,
288 write_buffer,
289 ..
290 } = substream.into_parts();
291 assert!(
292 write_buffer.is_empty(),
293 "Expect a flushed Framed to have empty write buffer."
294 );
295
296 let circuit = Circuit {
297 stream: io,
298 read_buffer: read_buffer.freeze(),
299 limit,
300 };
301
302 Ok(circuit)
303}