libp2p_relay/priv_client/
transport.rs1use std::{
23    collections::VecDeque,
24    pin::Pin,
25    task::{Context, Poll, Waker},
26};
27
28use futures::{
29    channel::{mpsc, oneshot},
30    future::{ready, BoxFuture, FutureExt, Ready},
31    sink::SinkExt,
32    stream::{SelectAll, Stream, StreamExt},
33};
34use libp2p_core::{
35    multiaddr::{Multiaddr, Protocol},
36    transport::{DialOpts, ListenerId, TransportError, TransportEvent},
37};
38use libp2p_identity::PeerId;
39use thiserror::Error;
40
41use crate::{
42    multiaddr_ext::MultiaddrExt,
43    priv_client::Connection,
44    protocol::{
45        outbound_hop,
46        outbound_hop::{ConnectError, ReserveError},
47    },
48    RequestId,
49};
50
51pub struct Transport {
108    to_behaviour: mpsc::Sender<TransportToBehaviourMsg>,
109    pending_to_behaviour: VecDeque<TransportToBehaviourMsg>,
110    listeners: SelectAll<Listener>,
111}
112
113impl Transport {
114    pub(crate) fn new() -> (Self, mpsc::Receiver<TransportToBehaviourMsg>) {
115        let (to_behaviour, from_transport) = mpsc::channel(1000);
116        let transport = Transport {
117            to_behaviour,
118            pending_to_behaviour: VecDeque::new(),
119            listeners: SelectAll::new(),
120        };
121        (transport, from_transport)
122    }
123}
124
125impl libp2p_core::Transport for Transport {
126    type Output = Connection;
127    type Error = Error;
128    type ListenerUpgrade = Ready<Result<Self::Output, Self::Error>>;
129    type Dial = BoxFuture<'static, Result<Connection, Error>>;
130
131    fn listen_on(
132        &mut self,
133        listener_id: ListenerId,
134        addr: Multiaddr,
135    ) -> Result<(), TransportError<Self::Error>> {
136        let (relay_peer_id, relay_addr) = match parse_relayed_multiaddr(addr)? {
137            RelayedMultiaddr {
138                relay_peer_id: None,
139                relay_addr: _,
140                ..
141            } => return Err(Error::MissingDstPeerId.into()),
142            RelayedMultiaddr {
143                relay_peer_id: _,
144                relay_addr: None,
145                ..
146            } => return Err(Error::MissingRelayAddr.into()),
147            RelayedMultiaddr {
148                relay_peer_id: Some(peer_id),
149                relay_addr: Some(addr),
150                ..
151            } => (peer_id, addr),
152        };
153
154        let (to_listener, from_behaviour) = mpsc::channel(0);
155        self.pending_to_behaviour
156            .push_back(TransportToBehaviourMsg::ListenReq {
157                relay_peer_id,
158                relay_addr,
159                to_listener,
160            });
161
162        let listener = Listener {
163            listener_id,
164            queued_events: Default::default(),
165            from_behaviour,
166            is_closed: false,
167            waker: None,
168        };
169        self.listeners.push(listener);
170        Ok(())
171    }
172
173    fn remove_listener(&mut self, id: ListenerId) -> bool {
174        if let Some(listener) = self.listeners.iter_mut().find(|l| l.listener_id == id) {
175            listener.close(Ok(()));
176            true
177        } else {
178            false
179        }
180    }
181
182    fn dial(
183        &mut self,
184        addr: Multiaddr,
185        dial_opts: DialOpts,
186    ) -> Result<Self::Dial, TransportError<Self::Error>> {
187        if dial_opts.role.is_listener() {
188            return Err(TransportError::MultiaddrNotSupported(addr));
193        }
194
195        let RelayedMultiaddr {
196            relay_peer_id,
197            relay_addr,
198            dst_peer_id,
199            dst_addr,
200        } = parse_relayed_multiaddr(addr)?;
201
202        let relay_peer_id = relay_peer_id.ok_or(Error::MissingRelayPeerId)?;
204        let relay_addr = relay_addr.ok_or(Error::MissingRelayAddr)?;
205        let dst_peer_id = dst_peer_id.ok_or(Error::MissingDstPeerId)?;
206
207        let mut to_behaviour = self.to_behaviour.clone();
208        Ok(async move {
209            let (tx, rx) = oneshot::channel();
210            to_behaviour
211                .send(TransportToBehaviourMsg::DialReq {
212                    request_id: RequestId::new(),
213                    relay_addr,
214                    relay_peer_id,
215                    dst_addr,
216                    dst_peer_id,
217                    send_back: tx,
218                })
219                .await?;
220            let stream = rx.await??;
221
222            Ok(stream)
223        }
224        .boxed())
225    }
226
227    fn poll(
228        mut self: Pin<&mut Self>,
229        cx: &mut Context<'_>,
230    ) -> Poll<TransportEvent<Self::ListenerUpgrade, Self::Error>>
231    where
232        Self: Sized,
233    {
234        loop {
235            if !self.pending_to_behaviour.is_empty() {
236                match self.to_behaviour.poll_ready(cx) {
237                    Poll::Ready(Ok(())) => {
238                        let msg = self
239                            .pending_to_behaviour
240                            .pop_front()
241                            .expect("Called !is_empty().");
242                        let _ = self.to_behaviour.start_send(msg);
243                        continue;
244                    }
245                    Poll::Ready(Err(_)) => unreachable!("Receiver is never dropped."),
246                    Poll::Pending => {}
247                }
248            }
249            match self.listeners.poll_next_unpin(cx) {
250                Poll::Ready(Some(event)) => return Poll::Ready(event),
251                _ => return Poll::Pending,
252            }
253        }
254    }
255}
256
257#[derive(Default)]
258struct RelayedMultiaddr {
259    relay_peer_id: Option<PeerId>,
260    relay_addr: Option<Multiaddr>,
261    dst_peer_id: Option<PeerId>,
262    dst_addr: Option<Multiaddr>,
263}
264
265fn parse_relayed_multiaddr(addr: Multiaddr) -> Result<RelayedMultiaddr, TransportError<Error>> {
267    if !addr.is_relayed() {
268        return Err(TransportError::MultiaddrNotSupported(addr));
269    }
270
271    let mut relayed_multiaddr = RelayedMultiaddr::default();
272
273    let mut before_circuit = true;
274    for protocol in addr.into_iter() {
275        match protocol {
276            Protocol::P2pCircuit => {
277                if before_circuit {
278                    before_circuit = false;
279                } else {
280                    return Err(Error::MultipleCircuitRelayProtocolsUnsupported.into());
281                }
282            }
283            Protocol::P2p(peer_id) => {
284                if before_circuit {
285                    if relayed_multiaddr.relay_peer_id.is_some() {
286                        return Err(Error::MalformedMultiaddr.into());
287                    }
288                    relayed_multiaddr.relay_peer_id = Some(peer_id)
289                } else {
290                    if relayed_multiaddr.dst_peer_id.is_some() {
291                        return Err(Error::MalformedMultiaddr.into());
292                    }
293                    relayed_multiaddr.dst_peer_id = Some(peer_id)
294                }
295            }
296            p => {
297                if before_circuit {
298                    relayed_multiaddr
299                        .relay_addr
300                        .get_or_insert_with(Multiaddr::empty)
301                        .push(p);
302                } else {
303                    relayed_multiaddr
304                        .dst_addr
305                        .get_or_insert_with(Multiaddr::empty)
306                        .push(p);
307                }
308            }
309        }
310    }
311
312    Ok(relayed_multiaddr)
313}
314
315pub(crate) struct Listener {
316    listener_id: ListenerId,
317    queued_events: VecDeque<<Self as Stream>::Item>,
319    from_behaviour: mpsc::Receiver<ToListenerMsg>,
321    is_closed: bool,
325    waker: Option<Waker>,
326}
327
328impl Listener {
329    fn close(&mut self, reason: Result<(), Error>) {
335        self.queued_events
336            .push_back(TransportEvent::ListenerClosed {
337                listener_id: self.listener_id,
338                reason,
339            });
340        self.is_closed = true;
341
342        if let Some(waker) = self.waker.take() {
343            waker.wake();
344        }
345    }
346}
347
348impl Stream for Listener {
349    type Item = TransportEvent<<Transport as libp2p_core::Transport>::ListenerUpgrade, Error>;
350
351    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
352        loop {
353            if let Some(event) = self.queued_events.pop_front() {
354                self.waker = None;
355                return Poll::Ready(Some(event));
356            }
357
358            if self.is_closed {
359                self.waker = None;
362                return Poll::Ready(None);
363            }
364
365            let msg = match self.from_behaviour.poll_next_unpin(cx) {
366                Poll::Ready(Some(msg)) => msg,
367                Poll::Ready(None) => {
368                    self.close(Ok(()));
370                    continue;
371                }
372                Poll::Pending => {
373                    self.waker = Some(cx.waker().clone());
374                    return Poll::Pending;
375                }
376            };
377
378            match msg {
379                ToListenerMsg::Reservation(Ok(Reservation { addrs })) => {
380                    debug_assert!(
381                        self.queued_events.is_empty(),
382                        "Assert empty due to previous `pop_front` attempt."
383                    );
384                    self.queued_events = addrs
386                        .into_iter()
387                        .map(|listen_addr| TransportEvent::NewAddress {
388                            listener_id: self.listener_id,
389                            listen_addr,
390                        })
391                        .collect();
392                }
393                ToListenerMsg::IncomingRelayedConnection {
394                    stream,
395                    src_peer_id,
396                    relay_addr,
397                    relay_peer_id: _,
398                } => {
399                    let listener_id = self.listener_id;
400
401                    self.queued_events.push_back(TransportEvent::Incoming {
402                        upgrade: ready(Ok(stream)),
403                        listener_id,
404                        local_addr: relay_addr.with(Protocol::P2pCircuit),
405                        send_back_addr: Protocol::P2p(src_peer_id).into(),
406                    })
407                }
408                ToListenerMsg::Reservation(Err(e)) => self.close(Err(Error::Reservation(e))),
409            };
410        }
411    }
412}
413
414#[derive(Debug, Error)]
416pub enum Error {
417    #[error("Missing relay peer id.")]
418    MissingRelayPeerId,
419    #[error("Missing relay address.")]
420    MissingRelayAddr,
421    #[error("Missing destination peer id.")]
422    MissingDstPeerId,
423    #[error("Invalid peer id hash.")]
424    InvalidHash,
425    #[error("Failed to send message to relay behaviour: {0:?}")]
426    SendingMessageToBehaviour(#[from] mpsc::SendError),
427    #[error("Response from behaviour was canceled")]
428    ResponseFromBehaviourCanceled(#[from] oneshot::Canceled),
429    #[error(
430        "Address contains multiple circuit relay protocols (`p2p-circuit`) which is not supported."
431    )]
432    MultipleCircuitRelayProtocolsUnsupported,
433    #[error("One of the provided multiaddresses is malformed.")]
434    MalformedMultiaddr,
435    #[error("Failed to get Reservation.")]
436    Reservation(#[from] ReserveError),
437    #[error("Failed to connect to destination.")]
438    Connect(#[from] ConnectError),
439}
440
441impl From<Error> for TransportError<Error> {
442    fn from(error: Error) -> Self {
443        TransportError::Other(error)
444    }
445}
446
447pub(crate) enum TransportToBehaviourMsg {
450    #[allow(dead_code)]
452    DialReq {
453        request_id: RequestId,
454        relay_addr: Multiaddr,
455        relay_peer_id: PeerId,
456        dst_addr: Option<Multiaddr>,
457        dst_peer_id: PeerId,
458        send_back: oneshot::Sender<Result<Connection, outbound_hop::ConnectError>>,
459    },
460    ListenReq {
462        relay_peer_id: PeerId,
463        relay_addr: Multiaddr,
464        to_listener: mpsc::Sender<ToListenerMsg>,
465    },
466}
467
468#[allow(clippy::large_enum_variant)]
469pub enum ToListenerMsg {
470    Reservation(Result<Reservation, ReserveError>),
471    IncomingRelayedConnection {
472        stream: Connection,
473        src_peer_id: PeerId,
474        relay_peer_id: PeerId,
475        relay_addr: Multiaddr,
476    },
477}
478
479pub struct Reservation {
480    pub(crate) addrs: Vec<Multiaddr>,
481}