libp2p_core/
transport.rs

1// Copyright 2017-2018 Parity Technologies (UK) Ltd.
2//
3// Permission is hereby granted, free of charge, to any person obtaining a
4// copy of this software and associated documentation files (the "Software"),
5// to deal in the Software without restriction, including without limitation
6// the rights to use, copy, modify, merge, publish, distribute, sublicense,
7// and/or sell copies of the Software, and to permit persons to whom the
8// Software is furnished to do so, subject to the following conditions:
9//
10// The above copyright notice and this permission notice shall be included in
11// all copies or substantial portions of the Software.
12//
13// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
14// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
18// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
19// DEALINGS IN THE SOFTWARE.
20
21//! Connection-oriented communication channels.
22//!
23//! The main entity of this module is the [`Transport`] trait, which provides an
24//! interface for establishing connections with other nodes, thereby negotiating
25//! any desired protocols. The rest of the module defines combinators for
26//! modifying a transport through composition with other transports or protocol upgrades.
27
28use std::{
29    error::Error,
30    fmt,
31    pin::Pin,
32    sync::atomic::{AtomicUsize, Ordering},
33    task::{Context, Poll},
34};
35
36use futures::prelude::*;
37use multiaddr::Multiaddr;
38
39pub mod and_then;
40pub mod choice;
41pub mod dummy;
42pub mod global_only;
43pub mod map;
44pub mod map_err;
45pub mod memory;
46pub mod timeout;
47pub mod upgrade;
48
49mod boxed;
50mod optional;
51
52pub use self::{
53    boxed::Boxed, choice::OrTransport, memory::MemoryTransport, optional::OptionalTransport,
54    upgrade::Upgrade,
55};
56use crate::{ConnectedPoint, Endpoint};
57
58static NEXT_LISTENER_ID: AtomicUsize = AtomicUsize::new(1);
59
60/// The port use policy for a new connection.
61#[derive(Debug, Copy, Clone, Default, PartialEq, Eq, Hash)]
62pub enum PortUse {
63    /// Always allocate a new port for the dial.
64    New,
65    /// Best effor reusing of an existing port.
66    ///
67    /// If there is no listener present that can be used to dial, a new port is allocated.
68    #[default]
69    Reuse,
70}
71
72/// Options to customize the behaviour during dialing.
73#[derive(Debug, Copy, Clone)]
74pub struct DialOpts {
75    /// The endpoint establishing a new connection.
76    ///
77    /// When attempting a hole-punch, both parties simultaneously "dial" each other but one party
78    /// has to be the "listener" on the final connection. This option specifies the role of
79    /// this node in the final connection.
80    pub role: Endpoint,
81    /// The port use policy for a new connection.
82    pub port_use: PortUse,
83}
84
85/// A transport provides connection-oriented communication between two peers
86/// through ordered streams of data (i.e. connections).
87///
88/// Connections are established either by [listening](Transport::listen_on)
89/// or [dialing](Transport::dial) on a [`Transport`]. A peer that
90/// obtains a connection by listening is often referred to as the *listener* and the
91/// peer that initiated the connection through dialing as the *dialer*, in
92/// contrast to the traditional roles of *server* and *client*.
93///
94/// Most transports also provide a form of reliable delivery on the established
95/// connections but the precise semantics of these guarantees depend on the
96/// specific transport.
97///
98/// This trait is implemented for concrete connection-oriented transport protocols
99/// like TCP or Unix Domain Sockets, but also on wrappers that add additional
100/// functionality to the dialing or listening process (e.g. name resolution via
101/// the DNS).
102///
103/// Additional protocols can be layered on top of the connections established
104/// by a [`Transport`] through an upgrade mechanism that is initiated via
105/// [`upgrade`](Transport::upgrade).
106///
107/// Note for implementers: Futures returned by [`Transport::dial`] should only
108/// do work once polled for the first time. E.g. in the case of TCP, connecting
109/// to the remote should not happen immediately on [`Transport::dial`] but only
110/// once the returned [`Future`] is polled. The caller of [`Transport::dial`]
111/// may call the method multiple times with a set of addresses, racing a subset
112/// of the returned dials to success concurrently.
113pub trait Transport {
114    /// The result of a connection setup process, including protocol upgrades.
115    ///
116    /// Typically the output contains at least a handle to a data stream (i.e. a
117    /// connection or a substream multiplexer on top of a connection) that
118    /// provides APIs for sending and receiving data through the connection.
119    type Output;
120
121    /// An error that occurred during connection setup.
122    type Error: Error;
123
124    /// A pending [`Output`](Transport::Output) for an inbound connection,
125    /// obtained from the [`Transport`] stream.
126    ///
127    /// After a connection has been accepted by the transport, it may need to go through
128    /// asynchronous post-processing (i.e. protocol upgrade negotiations). Such
129    /// post-processing should not block the `Listener` from producing the next
130    /// connection, hence further connection setup proceeds asynchronously.
131    /// Once a `ListenerUpgrade` future resolves it yields the [`Output`](Transport::Output)
132    /// of the connection setup process.
133    type ListenerUpgrade: Future<Output = Result<Self::Output, Self::Error>>;
134
135    /// A pending [`Output`](Transport::Output) for an outbound connection,
136    /// obtained from [dialing](Transport::dial).
137    type Dial: Future<Output = Result<Self::Output, Self::Error>>;
138
139    /// Listens on the given [`Multiaddr`] for inbound connections with a provided [`ListenerId`].
140    fn listen_on(
141        &mut self,
142        id: ListenerId,
143        addr: Multiaddr,
144    ) -> Result<(), TransportError<Self::Error>>;
145
146    /// Remove a listener.
147    ///
148    /// Return `true` if there was a listener with this Id, `false`
149    /// otherwise.
150    fn remove_listener(&mut self, id: ListenerId) -> bool;
151
152    /// Dials the given [`Multiaddr`], returning a future for a pending outbound connection.
153    ///
154    /// If [`TransportError::MultiaddrNotSupported`] is returned, it may be desirable to
155    /// try an alternative [`Transport`], if available.
156    fn dial(
157        &mut self,
158        addr: Multiaddr,
159        opts: DialOpts,
160    ) -> Result<Self::Dial, TransportError<Self::Error>>;
161
162    /// Poll for [`TransportEvent`]s.
163    ///
164    /// A [`TransportEvent::Incoming`] should be produced whenever a connection is received at the
165    /// lowest level of the transport stack. The item must be a
166    /// [`ListenerUpgrade`](Transport::ListenerUpgrade) future that resolves to an
167    /// [`Output`](Transport::Output) value once all protocol upgrades have been applied.
168    ///
169    /// Transports are expected to produce [`TransportEvent::Incoming`] events only for
170    /// listen addresses which have previously been announced via
171    /// a [`TransportEvent::NewAddress`] event and which have not been invalidated by
172    /// an [`TransportEvent::AddressExpired`] event yet.
173    fn poll(
174        self: Pin<&mut Self>,
175        cx: &mut Context<'_>,
176    ) -> Poll<TransportEvent<Self::ListenerUpgrade, Self::Error>>;
177
178    /// Boxes the transport, including custom transport errors.
179    fn boxed(self) -> boxed::Boxed<Self::Output>
180    where
181        Self: Sized + Send + Unpin + 'static,
182        Self::Dial: Send + 'static,
183        Self::ListenerUpgrade: Send + 'static,
184        Self::Error: Send + Sync,
185    {
186        boxed::boxed(self)
187    }
188
189    /// Applies a function on the connections created by the transport.
190    fn map<F, O>(self, f: F) -> map::Map<Self, F>
191    where
192        Self: Sized,
193        F: FnOnce(Self::Output, ConnectedPoint) -> O,
194    {
195        map::Map::new(self, f)
196    }
197
198    /// Applies a function on the errors generated by the futures of the transport.
199    fn map_err<F, E>(self, f: F) -> map_err::MapErr<Self, F>
200    where
201        Self: Sized,
202        F: FnOnce(Self::Error) -> E,
203    {
204        map_err::MapErr::new(self, f)
205    }
206
207    /// Adds a fallback transport that is used when encountering errors
208    /// while establishing inbound or outbound connections.
209    ///
210    /// The returned transport will act like `self`, except that if `listen_on` or `dial`
211    /// return an error then `other` will be tried.
212    fn or_transport<U>(self, other: U) -> OrTransport<Self, U>
213    where
214        Self: Sized,
215        U: Transport,
216        <U as Transport>::Error: 'static,
217    {
218        OrTransport::new(self, other)
219    }
220
221    /// Applies a function producing an asynchronous result to every connection
222    /// created by this transport.
223    ///
224    /// This function can be used for ad-hoc protocol upgrades or
225    /// for processing or adapting the output for following configurations.
226    ///
227    /// For the high-level transport upgrade procedure, see [`Transport::upgrade`].
228    fn and_then<C, F, O>(self, f: C) -> and_then::AndThen<Self, C>
229    where
230        Self: Sized,
231        C: FnOnce(Self::Output, ConnectedPoint) -> F,
232        F: TryFuture<Ok = O>,
233        <F as TryFuture>::Error: Error + 'static,
234    {
235        and_then::AndThen::new(self, f)
236    }
237
238    /// Begins a series of protocol upgrades via an [`upgrade::Builder`].
239    fn upgrade(self, version: upgrade::Version) -> upgrade::Builder<Self>
240    where
241        Self: Sized,
242        Self::Error: 'static,
243    {
244        upgrade::Builder::new(self, version)
245    }
246}
247
248/// The ID of a single listener.
249#[derive(Copy, Clone, Debug, PartialEq, Eq, Hash)]
250pub struct ListenerId(usize);
251
252impl ListenerId {
253    /// Creates a new `ListenerId`.
254    pub fn next() -> Self {
255        ListenerId(NEXT_LISTENER_ID.fetch_add(1, Ordering::SeqCst))
256    }
257}
258
259impl fmt::Display for ListenerId {
260    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
261        write!(f, "{}", self.0)
262    }
263}
264
265/// Event produced by [`Transport`]s.
266pub enum TransportEvent<TUpgr, TErr> {
267    /// A new address is being listened on.
268    NewAddress {
269        /// The listener that is listening on the new address.
270        listener_id: ListenerId,
271        /// The new address that is being listened on.
272        listen_addr: Multiaddr,
273    },
274    /// An address is no longer being listened on.
275    AddressExpired {
276        /// The listener that is no longer listening on the address.
277        listener_id: ListenerId,
278        /// The new address that is being listened on.
279        listen_addr: Multiaddr,
280    },
281    /// A connection is incoming on one of the listeners.
282    Incoming {
283        /// The listener that produced the upgrade.
284        listener_id: ListenerId,
285        /// The produced upgrade.
286        upgrade: TUpgr,
287        /// Local connection address.
288        local_addr: Multiaddr,
289        /// Address used to send back data to the incoming client.
290        send_back_addr: Multiaddr,
291    },
292    /// A listener closed.
293    ListenerClosed {
294        /// The ID of the listener that closed.
295        listener_id: ListenerId,
296        /// Reason for the closure. Contains `Ok(())` if the stream produced `None`, or `Err`
297        /// if the stream produced an error.
298        reason: Result<(), TErr>,
299    },
300    /// A listener errored.
301    ///
302    /// The listener will continue to be polled for new events and the event
303    /// is for informational purposes only.
304    ListenerError {
305        /// The ID of the listener that errored.
306        listener_id: ListenerId,
307        /// The error value.
308        error: TErr,
309    },
310}
311
312impl<TUpgr, TErr> TransportEvent<TUpgr, TErr> {
313    /// In case this [`TransportEvent`] is an upgrade, apply the given function
314    /// to the upgrade and produce another transport event based the function's result.
315    pub fn map_upgrade<U>(self, map: impl FnOnce(TUpgr) -> U) -> TransportEvent<U, TErr> {
316        match self {
317            TransportEvent::Incoming {
318                listener_id,
319                upgrade,
320                local_addr,
321                send_back_addr,
322            } => TransportEvent::Incoming {
323                listener_id,
324                upgrade: map(upgrade),
325                local_addr,
326                send_back_addr,
327            },
328            TransportEvent::NewAddress {
329                listen_addr,
330                listener_id,
331            } => TransportEvent::NewAddress {
332                listen_addr,
333                listener_id,
334            },
335            TransportEvent::AddressExpired {
336                listen_addr,
337                listener_id,
338            } => TransportEvent::AddressExpired {
339                listen_addr,
340                listener_id,
341            },
342            TransportEvent::ListenerError { listener_id, error } => {
343                TransportEvent::ListenerError { listener_id, error }
344            }
345            TransportEvent::ListenerClosed {
346                listener_id,
347                reason,
348            } => TransportEvent::ListenerClosed {
349                listener_id,
350                reason,
351            },
352        }
353    }
354
355    /// In case this [`TransportEvent`] is an [`ListenerError`](TransportEvent::ListenerError),
356    /// or [`ListenerClosed`](TransportEvent::ListenerClosed) apply the given function to the
357    /// error and produce another transport event based on the function's result.
358    pub fn map_err<E>(self, map_err: impl FnOnce(TErr) -> E) -> TransportEvent<TUpgr, E> {
359        match self {
360            TransportEvent::Incoming {
361                listener_id,
362                upgrade,
363                local_addr,
364                send_back_addr,
365            } => TransportEvent::Incoming {
366                listener_id,
367                upgrade,
368                local_addr,
369                send_back_addr,
370            },
371            TransportEvent::NewAddress {
372                listen_addr,
373                listener_id,
374            } => TransportEvent::NewAddress {
375                listen_addr,
376                listener_id,
377            },
378            TransportEvent::AddressExpired {
379                listen_addr,
380                listener_id,
381            } => TransportEvent::AddressExpired {
382                listen_addr,
383                listener_id,
384            },
385            TransportEvent::ListenerError { listener_id, error } => TransportEvent::ListenerError {
386                listener_id,
387                error: map_err(error),
388            },
389            TransportEvent::ListenerClosed {
390                listener_id,
391                reason,
392            } => TransportEvent::ListenerClosed {
393                listener_id,
394                reason: reason.map_err(map_err),
395            },
396        }
397    }
398
399    /// Returns `true` if this is an [`Incoming`](TransportEvent::Incoming) transport event.
400    pub fn is_upgrade(&self) -> bool {
401        matches!(self, TransportEvent::Incoming { .. })
402    }
403
404    /// Try to turn this transport event into the upgrade parts of the
405    /// incoming connection.
406    ///
407    /// Returns `None` if the event is not actually an incoming connection,
408    /// otherwise the upgrade and the remote address.
409    pub fn into_incoming(self) -> Option<(TUpgr, Multiaddr)> {
410        let TransportEvent::Incoming {
411            upgrade,
412            send_back_addr,
413            ..
414        } = self
415        else {
416            return None;
417        };
418
419        Some((upgrade, send_back_addr))
420    }
421
422    /// Returns `true` if this is a [`TransportEvent::NewAddress`].
423    pub fn is_new_address(&self) -> bool {
424        matches!(self, TransportEvent::NewAddress { .. })
425    }
426
427    /// Try to turn this transport event into the new `Multiaddr`.
428    ///
429    /// Returns `None` if the event is not actually a [`TransportEvent::NewAddress`],
430    /// otherwise the address.
431    pub fn into_new_address(self) -> Option<Multiaddr> {
432        if let TransportEvent::NewAddress { listen_addr, .. } = self {
433            Some(listen_addr)
434        } else {
435            None
436        }
437    }
438
439    /// Returns `true` if this is an [`TransportEvent::AddressExpired`].
440    pub fn is_address_expired(&self) -> bool {
441        matches!(self, TransportEvent::AddressExpired { .. })
442    }
443
444    /// Try to turn this transport event into the expire `Multiaddr`.
445    ///
446    /// Returns `None` if the event is not actually a [`TransportEvent::AddressExpired`],
447    /// otherwise the address.
448    pub fn into_address_expired(self) -> Option<Multiaddr> {
449        if let TransportEvent::AddressExpired { listen_addr, .. } = self {
450            Some(listen_addr)
451        } else {
452            None
453        }
454    }
455
456    /// Returns `true` if this is an [`TransportEvent::ListenerError`] transport event.
457    pub fn is_listener_error(&self) -> bool {
458        matches!(self, TransportEvent::ListenerError { .. })
459    }
460
461    /// Try to turn this transport event into the listener error.
462    ///
463    /// Returns `None` if the event is not actually a [`TransportEvent::ListenerError`]`,
464    /// otherwise the error.
465    pub fn into_listener_error(self) -> Option<TErr> {
466        if let TransportEvent::ListenerError { error, .. } = self {
467            Some(error)
468        } else {
469            None
470        }
471    }
472}
473
474impl<TUpgr, TErr: fmt::Debug> fmt::Debug for TransportEvent<TUpgr, TErr> {
475    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
476        match self {
477            TransportEvent::NewAddress {
478                listener_id,
479                listen_addr,
480            } => f
481                .debug_struct("TransportEvent::NewAddress")
482                .field("listener_id", listener_id)
483                .field("listen_addr", listen_addr)
484                .finish(),
485            TransportEvent::AddressExpired {
486                listener_id,
487                listen_addr,
488            } => f
489                .debug_struct("TransportEvent::AddressExpired")
490                .field("listener_id", listener_id)
491                .field("listen_addr", listen_addr)
492                .finish(),
493            TransportEvent::Incoming {
494                listener_id,
495                local_addr,
496                ..
497            } => f
498                .debug_struct("TransportEvent::Incoming")
499                .field("listener_id", listener_id)
500                .field("local_addr", local_addr)
501                .finish(),
502            TransportEvent::ListenerClosed {
503                listener_id,
504                reason,
505            } => f
506                .debug_struct("TransportEvent::Closed")
507                .field("listener_id", listener_id)
508                .field("reason", reason)
509                .finish(),
510            TransportEvent::ListenerError { listener_id, error } => f
511                .debug_struct("TransportEvent::ListenerError")
512                .field("listener_id", listener_id)
513                .field("error", error)
514                .finish(),
515        }
516    }
517}
518
519/// An error during [dialing][Transport::dial] or [listening][Transport::listen_on]
520/// on a [`Transport`].
521#[derive(Debug, Clone)]
522pub enum TransportError<TErr> {
523    /// The [`Multiaddr`] passed as parameter is not supported.
524    ///
525    /// Contains back the same address.
526    MultiaddrNotSupported(Multiaddr),
527
528    /// Any other error that a [`Transport`] may produce.
529    Other(TErr),
530}
531
532impl<TErr> TransportError<TErr> {
533    /// Applies a function to the error in [`TransportError::Other`].
534    pub fn map<TNewErr>(self, map: impl FnOnce(TErr) -> TNewErr) -> TransportError<TNewErr> {
535        match self {
536            TransportError::MultiaddrNotSupported(addr) => {
537                TransportError::MultiaddrNotSupported(addr)
538            }
539            TransportError::Other(err) => TransportError::Other(map(err)),
540        }
541    }
542}
543
544impl<TErr> fmt::Display for TransportError<TErr>
545where
546    TErr: fmt::Display,
547{
548    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
549        match self {
550            TransportError::MultiaddrNotSupported(addr) => {
551                write!(f, "Multiaddr is not supported: {addr}")
552            }
553            TransportError::Other(_) => Ok(()),
554        }
555    }
556}
557
558impl<TErr> Error for TransportError<TErr>
559where
560    TErr: Error + 'static,
561{
562    fn source(&self) -> Option<&(dyn Error + 'static)> {
563        match self {
564            TransportError::MultiaddrNotSupported(_) => None,
565            TransportError::Other(err) => Some(err),
566        }
567    }
568}