libp2p_relay/priv_client/
transport.rs1use std::{
23 collections::VecDeque,
24 pin::Pin,
25 task::{Context, Poll, Waker},
26};
27
28use futures::{
29 channel::{mpsc, oneshot},
30 future::{ready, BoxFuture, FutureExt, Ready},
31 sink::SinkExt,
32 stream::{SelectAll, Stream, StreamExt},
33};
34use libp2p_core::{
35 multiaddr::{Multiaddr, Protocol},
36 transport::{DialOpts, ListenerId, TransportError, TransportEvent},
37};
38use libp2p_identity::PeerId;
39use thiserror::Error;
40
41use crate::{
42 multiaddr_ext::MultiaddrExt,
43 priv_client::Connection,
44 protocol::{
45 outbound_hop,
46 outbound_hop::{ConnectError, ReserveError},
47 },
48 RequestId,
49};
50
51pub struct Transport {
108 to_behaviour: mpsc::Sender<TransportToBehaviourMsg>,
109 pending_to_behaviour: VecDeque<TransportToBehaviourMsg>,
110 listeners: SelectAll<Listener>,
111}
112
113impl Transport {
114 pub(crate) fn new() -> (Self, mpsc::Receiver<TransportToBehaviourMsg>) {
115 let (to_behaviour, from_transport) = mpsc::channel(1000);
116 let transport = Transport {
117 to_behaviour,
118 pending_to_behaviour: VecDeque::new(),
119 listeners: SelectAll::new(),
120 };
121 (transport, from_transport)
122 }
123}
124
125impl libp2p_core::Transport for Transport {
126 type Output = Connection;
127 type Error = Error;
128 type ListenerUpgrade = Ready<Result<Self::Output, Self::Error>>;
129 type Dial = BoxFuture<'static, Result<Connection, Error>>;
130
131 fn listen_on(
132 &mut self,
133 listener_id: ListenerId,
134 addr: Multiaddr,
135 ) -> Result<(), TransportError<Self::Error>> {
136 let (relay_peer_id, relay_addr) = match parse_relayed_multiaddr(addr)? {
137 RelayedMultiaddr {
138 relay_peer_id: None,
139 relay_addr: _,
140 ..
141 } => return Err(Error::MissingDstPeerId.into()),
142 RelayedMultiaddr {
143 relay_peer_id: _,
144 relay_addr: None,
145 ..
146 } => return Err(Error::MissingRelayAddr.into()),
147 RelayedMultiaddr {
148 relay_peer_id: Some(peer_id),
149 relay_addr: Some(addr),
150 ..
151 } => (peer_id, addr),
152 };
153
154 let (to_listener, from_behaviour) = mpsc::channel(0);
155 self.pending_to_behaviour
156 .push_back(TransportToBehaviourMsg::ListenReq {
157 relay_peer_id,
158 relay_addr,
159 to_listener,
160 });
161
162 let listener = Listener {
163 listener_id,
164 queued_events: Default::default(),
165 from_behaviour,
166 is_closed: false,
167 waker: None,
168 };
169 self.listeners.push(listener);
170 Ok(())
171 }
172
173 fn remove_listener(&mut self, id: ListenerId) -> bool {
174 if let Some(listener) = self.listeners.iter_mut().find(|l| l.listener_id == id) {
175 listener.close(Ok(()));
176 true
177 } else {
178 false
179 }
180 }
181
182 fn dial(
183 &mut self,
184 addr: Multiaddr,
185 dial_opts: DialOpts,
186 ) -> Result<Self::Dial, TransportError<Self::Error>> {
187 if dial_opts.role.is_listener() {
188 return Err(TransportError::MultiaddrNotSupported(addr));
193 }
194
195 let RelayedMultiaddr {
196 relay_peer_id,
197 relay_addr,
198 dst_peer_id,
199 dst_addr,
200 } = parse_relayed_multiaddr(addr)?;
201
202 let relay_peer_id = relay_peer_id.ok_or(Error::MissingRelayPeerId)?;
204 let relay_addr = relay_addr.ok_or(Error::MissingRelayAddr)?;
205 let dst_peer_id = dst_peer_id.ok_or(Error::MissingDstPeerId)?;
206
207 let mut to_behaviour = self.to_behaviour.clone();
208 Ok(async move {
209 let (tx, rx) = oneshot::channel();
210 to_behaviour
211 .send(TransportToBehaviourMsg::DialReq {
212 request_id: RequestId::new(),
213 relay_addr,
214 relay_peer_id,
215 dst_addr,
216 dst_peer_id,
217 send_back: tx,
218 })
219 .await?;
220 let stream = rx.await??;
221
222 Ok(stream)
223 }
224 .boxed())
225 }
226
227 fn poll(
228 mut self: Pin<&mut Self>,
229 cx: &mut Context<'_>,
230 ) -> Poll<TransportEvent<Self::ListenerUpgrade, Self::Error>>
231 where
232 Self: Sized,
233 {
234 loop {
235 if !self.pending_to_behaviour.is_empty() {
236 match self.to_behaviour.poll_ready(cx) {
237 Poll::Ready(Ok(())) => {
238 let msg = self
239 .pending_to_behaviour
240 .pop_front()
241 .expect("Called !is_empty().");
242 let _ = self.to_behaviour.start_send(msg);
243 continue;
244 }
245 Poll::Ready(Err(_)) => unreachable!("Receiver is never dropped."),
246 Poll::Pending => {}
247 }
248 }
249 match self.listeners.poll_next_unpin(cx) {
250 Poll::Ready(Some(event)) => return Poll::Ready(event),
251 _ => return Poll::Pending,
252 }
253 }
254 }
255}
256
257#[derive(Default)]
258struct RelayedMultiaddr {
259 relay_peer_id: Option<PeerId>,
260 relay_addr: Option<Multiaddr>,
261 dst_peer_id: Option<PeerId>,
262 dst_addr: Option<Multiaddr>,
263}
264
265fn parse_relayed_multiaddr(addr: Multiaddr) -> Result<RelayedMultiaddr, TransportError<Error>> {
267 if !addr.is_relayed() {
268 return Err(TransportError::MultiaddrNotSupported(addr));
269 }
270
271 let mut relayed_multiaddr = RelayedMultiaddr::default();
272
273 let mut before_circuit = true;
274 for protocol in addr.into_iter() {
275 match protocol {
276 Protocol::P2pCircuit => {
277 if before_circuit {
278 before_circuit = false;
279 } else {
280 return Err(Error::MultipleCircuitRelayProtocolsUnsupported.into());
281 }
282 }
283 Protocol::P2p(peer_id) => {
284 if before_circuit {
285 if relayed_multiaddr.relay_peer_id.is_some() {
286 return Err(Error::MalformedMultiaddr.into());
287 }
288 relayed_multiaddr.relay_peer_id = Some(peer_id)
289 } else {
290 if relayed_multiaddr.dst_peer_id.is_some() {
291 return Err(Error::MalformedMultiaddr.into());
292 }
293 relayed_multiaddr.dst_peer_id = Some(peer_id)
294 }
295 }
296 p => {
297 if before_circuit {
298 relayed_multiaddr
299 .relay_addr
300 .get_or_insert(Multiaddr::empty())
301 .push(p);
302 } else {
303 relayed_multiaddr
304 .dst_addr
305 .get_or_insert(Multiaddr::empty())
306 .push(p);
307 }
308 }
309 }
310 }
311
312 Ok(relayed_multiaddr)
313}
314
315pub(crate) struct Listener {
316 listener_id: ListenerId,
317 queued_events: VecDeque<<Self as Stream>::Item>,
319 from_behaviour: mpsc::Receiver<ToListenerMsg>,
321 is_closed: bool,
325 waker: Option<Waker>,
326}
327
328impl Listener {
329 fn close(&mut self, reason: Result<(), Error>) {
335 self.queued_events
336 .push_back(TransportEvent::ListenerClosed {
337 listener_id: self.listener_id,
338 reason,
339 });
340 self.is_closed = true;
341
342 if let Some(waker) = self.waker.take() {
343 waker.wake();
344 }
345 }
346}
347
348impl Stream for Listener {
349 type Item = TransportEvent<<Transport as libp2p_core::Transport>::ListenerUpgrade, Error>;
350
351 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
352 loop {
353 if let Some(event) = self.queued_events.pop_front() {
354 self.waker = None;
355 return Poll::Ready(Some(event));
356 }
357
358 if self.is_closed {
359 self.waker = None;
362 return Poll::Ready(None);
363 }
364
365 let msg = match self.from_behaviour.poll_next_unpin(cx) {
366 Poll::Ready(Some(msg)) => msg,
367 Poll::Ready(None) => {
368 self.close(Ok(()));
370 continue;
371 }
372 Poll::Pending => {
373 self.waker = Some(cx.waker().clone());
374 return Poll::Pending;
375 }
376 };
377
378 match msg {
379 ToListenerMsg::Reservation(Ok(Reservation { addrs })) => {
380 debug_assert!(
381 self.queued_events.is_empty(),
382 "Assert empty due to previous `pop_front` attempt."
383 );
384 self.queued_events = addrs
386 .into_iter()
387 .map(|listen_addr| TransportEvent::NewAddress {
388 listener_id: self.listener_id,
389 listen_addr,
390 })
391 .collect();
392 }
393 ToListenerMsg::IncomingRelayedConnection {
394 stream,
395 src_peer_id,
396 relay_addr,
397 relay_peer_id: _,
398 } => {
399 let listener_id = self.listener_id;
400
401 self.queued_events.push_back(TransportEvent::Incoming {
402 upgrade: ready(Ok(stream)),
403 listener_id,
404 local_addr: relay_addr.with(Protocol::P2pCircuit),
405 send_back_addr: Protocol::P2p(src_peer_id).into(),
406 })
407 }
408 ToListenerMsg::Reservation(Err(e)) => self.close(Err(Error::Reservation(e))),
409 };
410 }
411 }
412}
413
414#[derive(Debug, Error)]
416pub enum Error {
417 #[error("Missing relay peer id.")]
418 MissingRelayPeerId,
419 #[error("Missing relay address.")]
420 MissingRelayAddr,
421 #[error("Missing destination peer id.")]
422 MissingDstPeerId,
423 #[error("Invalid peer id hash.")]
424 InvalidHash,
425 #[error("Failed to send message to relay behaviour: {0:?}")]
426 SendingMessageToBehaviour(#[from] mpsc::SendError),
427 #[error("Response from behaviour was canceled")]
428 ResponseFromBehaviourCanceled(#[from] oneshot::Canceled),
429 #[error(
430 "Address contains multiple circuit relay protocols (`p2p-circuit`) which is not supported."
431 )]
432 MultipleCircuitRelayProtocolsUnsupported,
433 #[error("One of the provided multiaddresses is malformed.")]
434 MalformedMultiaddr,
435 #[error("Failed to get Reservation.")]
436 Reservation(#[from] ReserveError),
437 #[error("Failed to connect to destination.")]
438 Connect(#[from] ConnectError),
439}
440
441impl From<Error> for TransportError<Error> {
442 fn from(error: Error) -> Self {
443 TransportError::Other(error)
444 }
445}
446
447pub(crate) enum TransportToBehaviourMsg {
450 #[allow(dead_code)]
452 DialReq {
453 request_id: RequestId,
454 relay_addr: Multiaddr,
455 relay_peer_id: PeerId,
456 dst_addr: Option<Multiaddr>,
457 dst_peer_id: PeerId,
458 send_back: oneshot::Sender<Result<Connection, outbound_hop::ConnectError>>,
459 },
460 ListenReq {
462 relay_peer_id: PeerId,
463 relay_addr: Multiaddr,
464 to_listener: mpsc::Sender<ToListenerMsg>,
465 },
466}
467
468#[allow(clippy::large_enum_variant)]
469pub enum ToListenerMsg {
470 Reservation(Result<Reservation, ReserveError>),
471 IncomingRelayedConnection {
472 stream: Connection,
473 src_peer_id: PeerId,
474 relay_peer_id: PeerId,
475 relay_addr: Multiaddr,
476 },
477}
478
479pub struct Reservation {
480 pub(crate) addrs: Vec<Multiaddr>,
481}