libp2p_relay/priv_client/
transport.rs

1// Copyright 2020 Parity Technologies (UK) Ltd.
2// Copyright 2021 Protocol Labs.
3//
4// Permission is hereby granted, free of charge, to any person obtaining a
5// copy of this software and associated documentation files (the "Software"),
6// to deal in the Software without restriction, including without limitation
7// the rights to use, copy, modify, merge, publish, distribute, sublicense,
8// and/or sell copies of the Software, and to permit persons to whom the
9// Software is furnished to do so, subject to the following conditions:
10//
11// The above copyright notice and this permission notice shall be included in
12// all copies or substantial portions of the Software.
13//
14// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
15// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
16// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
17// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
18// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
19// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
20// DEALINGS IN THE SOFTWARE.
21
22use std::{
23    collections::VecDeque,
24    pin::Pin,
25    task::{Context, Poll, Waker},
26};
27
28use futures::{
29    channel::{mpsc, oneshot},
30    future::{ready, BoxFuture, FutureExt, Ready},
31    sink::SinkExt,
32    stream::{SelectAll, Stream, StreamExt},
33};
34use libp2p_core::{
35    multiaddr::{Multiaddr, Protocol},
36    transport::{DialOpts, ListenerId, TransportError, TransportEvent},
37};
38use libp2p_identity::PeerId;
39use thiserror::Error;
40
41use crate::{
42    multiaddr_ext::MultiaddrExt,
43    priv_client::Connection,
44    protocol::{
45        outbound_hop,
46        outbound_hop::{ConnectError, ReserveError},
47    },
48    RequestId,
49};
50
51/// A [`Transport`] enabling client relay capabilities.
52///
53/// Note: The transport only handles listening and dialing on relayed [`Multiaddr`], and depends on
54/// an other transport to do the actual transmission of data. They should be combined through the
55/// [`OrTransport`](libp2p_core::transport::choice::OrTransport).
56///
57/// Allows the local node to:
58///
59/// 1. Establish relayed connections by dialing `/p2p-circuit` addresses.
60///
61///    ```
62///    # use libp2p_core::{Multiaddr, multiaddr::{Protocol}, Transport,
63///    # transport::{DialOpts, PortUse}, connection::Endpoint};
64///    # use libp2p_core::transport::memory::MemoryTransport;
65///    # use libp2p_core::transport::choice::OrTransport;
66///    # use libp2p_relay as relay;
67///    # use libp2p_identity::PeerId;
68///    let actual_transport = MemoryTransport::default();
69///    let (relay_transport, behaviour) = relay::client::new(
70///        PeerId::random()
71///    );
72///    let mut transport = OrTransport::new(relay_transport, actual_transport);
73///    # let relay_id = PeerId::random();
74///    # let destination_id = PeerId::random();
75///    let dst_addr_via_relay = Multiaddr::empty()
76///        .with(Protocol::Memory(40)) // Relay address.
77///        .with(Protocol::P2p(relay_id.into())) // Relay peer id.
78///        .with(Protocol::P2pCircuit) // Signal to connect via relay and not directly.
79///        .with(Protocol::P2p(destination_id.into())); // Destination peer id.
80///    transport.dial(dst_addr_via_relay, DialOpts {
81///         port_use: PortUse::Reuse,
82///         role: Endpoint::Dialer,
83///    }).unwrap();
84///    ```
85///
86/// 3. Listen for incoming relayed connections via specific relay.
87///
88///    ```
89///    # use libp2p_core::{Multiaddr, multiaddr::{Protocol}, transport::ListenerId, Transport};
90///    # use libp2p_core::transport::memory::MemoryTransport;
91///    # use libp2p_core::transport::choice::OrTransport;
92///    # use libp2p_relay as relay;
93///    # use libp2p_identity::PeerId;
94///    # let relay_id = PeerId::random();
95///    # let local_peer_id = PeerId::random();
96///    let actual_transport = MemoryTransport::default();
97///    let (relay_transport, behaviour) = relay::client::new(
98///       local_peer_id
99///    );
100///    let mut transport = OrTransport::new(relay_transport, actual_transport);
101///    let relay_addr = Multiaddr::empty()
102///        .with(Protocol::Memory(40)) // Relay address.
103///        .with(Protocol::P2p(relay_id.into())) // Relay peer id.
104///        .with(Protocol::P2pCircuit); // Signal to listen via remote relay node.
105///    transport.listen_on(ListenerId::next(), relay_addr).unwrap();
106///    ```
107pub struct Transport {
108    to_behaviour: mpsc::Sender<TransportToBehaviourMsg>,
109    pending_to_behaviour: VecDeque<TransportToBehaviourMsg>,
110    listeners: SelectAll<Listener>,
111}
112
113impl Transport {
114    pub(crate) fn new() -> (Self, mpsc::Receiver<TransportToBehaviourMsg>) {
115        let (to_behaviour, from_transport) = mpsc::channel(1000);
116        let transport = Transport {
117            to_behaviour,
118            pending_to_behaviour: VecDeque::new(),
119            listeners: SelectAll::new(),
120        };
121        (transport, from_transport)
122    }
123}
124
125impl libp2p_core::Transport for Transport {
126    type Output = Connection;
127    type Error = Error;
128    type ListenerUpgrade = Ready<Result<Self::Output, Self::Error>>;
129    type Dial = BoxFuture<'static, Result<Connection, Error>>;
130
131    fn listen_on(
132        &mut self,
133        listener_id: ListenerId,
134        addr: Multiaddr,
135    ) -> Result<(), TransportError<Self::Error>> {
136        let (relay_peer_id, relay_addr) = match parse_relayed_multiaddr(addr)? {
137            RelayedMultiaddr {
138                relay_peer_id: None,
139                relay_addr: _,
140                ..
141            } => return Err(Error::MissingDstPeerId.into()),
142            RelayedMultiaddr {
143                relay_peer_id: _,
144                relay_addr: None,
145                ..
146            } => return Err(Error::MissingRelayAddr.into()),
147            RelayedMultiaddr {
148                relay_peer_id: Some(peer_id),
149                relay_addr: Some(addr),
150                ..
151            } => (peer_id, addr),
152        };
153
154        let (to_listener, from_behaviour) = mpsc::channel(0);
155        self.pending_to_behaviour
156            .push_back(TransportToBehaviourMsg::ListenReq {
157                relay_peer_id,
158                relay_addr,
159                to_listener,
160            });
161
162        let listener = Listener {
163            listener_id,
164            queued_events: Default::default(),
165            from_behaviour,
166            is_closed: false,
167            waker: None,
168        };
169        self.listeners.push(listener);
170        Ok(())
171    }
172
173    fn remove_listener(&mut self, id: ListenerId) -> bool {
174        if let Some(listener) = self.listeners.iter_mut().find(|l| l.listener_id == id) {
175            listener.close(Ok(()));
176            true
177        } else {
178            false
179        }
180    }
181
182    fn dial(
183        &mut self,
184        addr: Multiaddr,
185        dial_opts: DialOpts,
186    ) -> Result<Self::Dial, TransportError<Self::Error>> {
187        if dial_opts.role.is_listener() {
188            // [`Endpoint::Listener`] is used for NAT and firewall
189            // traversal. One would coordinate such traversal via a previously
190            // established relayed connection, but never using a relayed connection
191            // itself.
192            return Err(TransportError::MultiaddrNotSupported(addr));
193        }
194
195        let RelayedMultiaddr {
196            relay_peer_id,
197            relay_addr,
198            dst_peer_id,
199            dst_addr,
200        } = parse_relayed_multiaddr(addr)?;
201
202        // TODO: In the future we might want to support dialing a relay by its address only.
203        let relay_peer_id = relay_peer_id.ok_or(Error::MissingRelayPeerId)?;
204        let relay_addr = relay_addr.ok_or(Error::MissingRelayAddr)?;
205        let dst_peer_id = dst_peer_id.ok_or(Error::MissingDstPeerId)?;
206
207        let mut to_behaviour = self.to_behaviour.clone();
208        Ok(async move {
209            let (tx, rx) = oneshot::channel();
210            to_behaviour
211                .send(TransportToBehaviourMsg::DialReq {
212                    request_id: RequestId::new(),
213                    relay_addr,
214                    relay_peer_id,
215                    dst_addr,
216                    dst_peer_id,
217                    send_back: tx,
218                })
219                .await?;
220            let stream = rx.await??;
221
222            Ok(stream)
223        }
224        .boxed())
225    }
226
227    fn poll(
228        mut self: Pin<&mut Self>,
229        cx: &mut Context<'_>,
230    ) -> Poll<TransportEvent<Self::ListenerUpgrade, Self::Error>>
231    where
232        Self: Sized,
233    {
234        loop {
235            if !self.pending_to_behaviour.is_empty() {
236                match self.to_behaviour.poll_ready(cx) {
237                    Poll::Ready(Ok(())) => {
238                        let msg = self
239                            .pending_to_behaviour
240                            .pop_front()
241                            .expect("Called !is_empty().");
242                        let _ = self.to_behaviour.start_send(msg);
243                        continue;
244                    }
245                    Poll::Ready(Err(_)) => unreachable!("Receiver is never dropped."),
246                    Poll::Pending => {}
247                }
248            }
249            match self.listeners.poll_next_unpin(cx) {
250                Poll::Ready(Some(event)) => return Poll::Ready(event),
251                _ => return Poll::Pending,
252            }
253        }
254    }
255}
256
257#[derive(Default)]
258struct RelayedMultiaddr {
259    relay_peer_id: Option<PeerId>,
260    relay_addr: Option<Multiaddr>,
261    dst_peer_id: Option<PeerId>,
262    dst_addr: Option<Multiaddr>,
263}
264
265/// Parse a [`Multiaddr`] containing a [`Protocol::P2pCircuit`].
266fn parse_relayed_multiaddr(addr: Multiaddr) -> Result<RelayedMultiaddr, TransportError<Error>> {
267    if !addr.is_relayed() {
268        return Err(TransportError::MultiaddrNotSupported(addr));
269    }
270
271    let mut relayed_multiaddr = RelayedMultiaddr::default();
272
273    let mut before_circuit = true;
274    for protocol in addr.into_iter() {
275        match protocol {
276            Protocol::P2pCircuit => {
277                if before_circuit {
278                    before_circuit = false;
279                } else {
280                    return Err(Error::MultipleCircuitRelayProtocolsUnsupported.into());
281                }
282            }
283            Protocol::P2p(peer_id) => {
284                if before_circuit {
285                    if relayed_multiaddr.relay_peer_id.is_some() {
286                        return Err(Error::MalformedMultiaddr.into());
287                    }
288                    relayed_multiaddr.relay_peer_id = Some(peer_id)
289                } else {
290                    if relayed_multiaddr.dst_peer_id.is_some() {
291                        return Err(Error::MalformedMultiaddr.into());
292                    }
293                    relayed_multiaddr.dst_peer_id = Some(peer_id)
294                }
295            }
296            p => {
297                if before_circuit {
298                    relayed_multiaddr
299                        .relay_addr
300                        .get_or_insert(Multiaddr::empty())
301                        .push(p);
302                } else {
303                    relayed_multiaddr
304                        .dst_addr
305                        .get_or_insert(Multiaddr::empty())
306                        .push(p);
307                }
308            }
309        }
310    }
311
312    Ok(relayed_multiaddr)
313}
314
315pub(crate) struct Listener {
316    listener_id: ListenerId,
317    /// Queue of events to report when polled.
318    queued_events: VecDeque<<Self as Stream>::Item>,
319    /// Channel for messages from the behaviour [`Handler`][super::handler::Handler].
320    from_behaviour: mpsc::Receiver<ToListenerMsg>,
321    /// The listener can be closed either manually with
322    /// [`Transport::remove_listener`](libp2p_core::Transport) or if the sender side of the
323    /// `from_behaviour` channel is dropped.
324    is_closed: bool,
325    waker: Option<Waker>,
326}
327
328impl Listener {
329    /// Close the listener.
330    ///
331    /// This will create a [`TransportEvent::ListenerClosed`] event
332    /// and terminate the stream once all remaining events in queue have
333    /// been reported.
334    fn close(&mut self, reason: Result<(), Error>) {
335        self.queued_events
336            .push_back(TransportEvent::ListenerClosed {
337                listener_id: self.listener_id,
338                reason,
339            });
340        self.is_closed = true;
341
342        if let Some(waker) = self.waker.take() {
343            waker.wake();
344        }
345    }
346}
347
348impl Stream for Listener {
349    type Item = TransportEvent<<Transport as libp2p_core::Transport>::ListenerUpgrade, Error>;
350
351    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
352        loop {
353            if let Some(event) = self.queued_events.pop_front() {
354                self.waker = None;
355                return Poll::Ready(Some(event));
356            }
357
358            if self.is_closed {
359                // Terminate the stream if the listener closed and
360                // all remaining events have been reported.
361                self.waker = None;
362                return Poll::Ready(None);
363            }
364
365            let msg = match self.from_behaviour.poll_next_unpin(cx) {
366                Poll::Ready(Some(msg)) => msg,
367                Poll::Ready(None) => {
368                    // Sender of `from_behaviour` has been dropped, signaling listener to close.
369                    self.close(Ok(()));
370                    continue;
371                }
372                Poll::Pending => {
373                    self.waker = Some(cx.waker().clone());
374                    return Poll::Pending;
375                }
376            };
377
378            match msg {
379                ToListenerMsg::Reservation(Ok(Reservation { addrs })) => {
380                    debug_assert!(
381                        self.queued_events.is_empty(),
382                        "Assert empty due to previous `pop_front` attempt."
383                    );
384                    // Returned as [`ListenerEvent::NewAddress`] in next iteration of loop.
385                    self.queued_events = addrs
386                        .into_iter()
387                        .map(|listen_addr| TransportEvent::NewAddress {
388                            listener_id: self.listener_id,
389                            listen_addr,
390                        })
391                        .collect();
392                }
393                ToListenerMsg::IncomingRelayedConnection {
394                    stream,
395                    src_peer_id,
396                    relay_addr,
397                    relay_peer_id: _,
398                } => {
399                    let listener_id = self.listener_id;
400
401                    self.queued_events.push_back(TransportEvent::Incoming {
402                        upgrade: ready(Ok(stream)),
403                        listener_id,
404                        local_addr: relay_addr.with(Protocol::P2pCircuit),
405                        send_back_addr: Protocol::P2p(src_peer_id).into(),
406                    })
407                }
408                ToListenerMsg::Reservation(Err(e)) => self.close(Err(Error::Reservation(e))),
409            };
410        }
411    }
412}
413
414/// Error that occurred during relay connection setup.
415#[derive(Debug, Error)]
416pub enum Error {
417    #[error("Missing relay peer id.")]
418    MissingRelayPeerId,
419    #[error("Missing relay address.")]
420    MissingRelayAddr,
421    #[error("Missing destination peer id.")]
422    MissingDstPeerId,
423    #[error("Invalid peer id hash.")]
424    InvalidHash,
425    #[error("Failed to send message to relay behaviour: {0:?}")]
426    SendingMessageToBehaviour(#[from] mpsc::SendError),
427    #[error("Response from behaviour was canceled")]
428    ResponseFromBehaviourCanceled(#[from] oneshot::Canceled),
429    #[error(
430        "Address contains multiple circuit relay protocols (`p2p-circuit`) which is not supported."
431    )]
432    MultipleCircuitRelayProtocolsUnsupported,
433    #[error("One of the provided multiaddresses is malformed.")]
434    MalformedMultiaddr,
435    #[error("Failed to get Reservation.")]
436    Reservation(#[from] ReserveError),
437    #[error("Failed to connect to destination.")]
438    Connect(#[from] ConnectError),
439}
440
441impl From<Error> for TransportError<Error> {
442    fn from(error: Error) -> Self {
443        TransportError::Other(error)
444    }
445}
446
447/// Message from the [`Transport`] to the [`Behaviour`](crate::Behaviour)
448/// [`NetworkBehaviour`](libp2p_swarm::NetworkBehaviour).
449pub(crate) enum TransportToBehaviourMsg {
450    /// Dial destination node via relay node.
451    #[allow(dead_code)]
452    DialReq {
453        request_id: RequestId,
454        relay_addr: Multiaddr,
455        relay_peer_id: PeerId,
456        dst_addr: Option<Multiaddr>,
457        dst_peer_id: PeerId,
458        send_back: oneshot::Sender<Result<Connection, outbound_hop::ConnectError>>,
459    },
460    /// Listen for incoming relayed connections via relay node.
461    ListenReq {
462        relay_peer_id: PeerId,
463        relay_addr: Multiaddr,
464        to_listener: mpsc::Sender<ToListenerMsg>,
465    },
466}
467
468#[allow(clippy::large_enum_variant)]
469pub enum ToListenerMsg {
470    Reservation(Result<Reservation, ReserveError>),
471    IncomingRelayedConnection {
472        stream: Connection,
473        src_peer_id: PeerId,
474        relay_peer_id: PeerId,
475        relay_addr: Multiaddr,
476    },
477}
478
479pub struct Reservation {
480    pub(crate) addrs: Vec<Multiaddr>,
481}