libp2p_core/transport.rs
1// Copyright 2017-2018 Parity Technologies (UK) Ltd.
2//
3// Permission is hereby granted, free of charge, to any person obtaining a
4// copy of this software and associated documentation files (the "Software"),
5// to deal in the Software without restriction, including without limitation
6// the rights to use, copy, modify, merge, publish, distribute, sublicense,
7// and/or sell copies of the Software, and to permit persons to whom the
8// Software is furnished to do so, subject to the following conditions:
9//
10// The above copyright notice and this permission notice shall be included in
11// all copies or substantial portions of the Software.
12//
13// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
14// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
18// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
19// DEALINGS IN THE SOFTWARE.
20
21//! Connection-oriented communication channels.
22//!
23//! The main entity of this module is the [`Transport`] trait, which provides an
24//! interface for establishing connections with other nodes, thereby negotiating
25//! any desired protocols. The rest of the module defines combinators for
26//! modifying a transport through composition with other transports or protocol upgrades.
27
28use std::{
29 error::Error,
30 fmt,
31 pin::Pin,
32 sync::atomic::{AtomicUsize, Ordering},
33 task::{Context, Poll},
34};
35
36use futures::prelude::*;
37use multiaddr::Multiaddr;
38
39pub mod and_then;
40pub mod choice;
41pub mod dummy;
42pub mod global_only;
43pub mod map;
44pub mod map_err;
45pub mod memory;
46pub mod timeout;
47pub mod upgrade;
48
49mod boxed;
50mod optional;
51
52pub use self::{
53 boxed::Boxed, choice::OrTransport, memory::MemoryTransport, optional::OptionalTransport,
54 upgrade::Upgrade,
55};
56use crate::{ConnectedPoint, Endpoint};
57
/// Process-wide counter from which [`ListenerId::next`] draws its values; starts at `1`.
static NEXT_LISTENER_ID: AtomicUsize = AtomicUsize::new(1);
59
/// Policy that decides which local port a new outgoing connection uses.
#[derive(Clone, Copy, Debug, Default, Eq, Hash, PartialEq)]
pub enum PortUse {
    /// Always allocate a new port for the dial.
    New,
    /// Best-effort reuse of an existing port.
    ///
    /// If no listener is present that can be used to dial, a new port is
    /// allocated instead. This is the default policy.
    #[default]
    Reuse,
}
71
72/// Options to customize the behaviour during dialing.
73#[derive(Debug, Copy, Clone)]
74pub struct DialOpts {
75 /// The endpoint establishing a new connection.
76 ///
77 /// When attempting a hole-punch, both parties simultaneously "dial" each other but one party
78 /// has to be the "listener" on the final connection. This option specifies the role of
79 /// this node in the final connection.
80 pub role: Endpoint,
81 /// The port use policy for a new connection.
82 pub port_use: PortUse,
83}
84
85/// A transport provides connection-oriented communication between two peers
86/// through ordered streams of data (i.e. connections).
87///
88/// Connections are established either by [listening](Transport::listen_on)
89/// or [dialing](Transport::dial) on a [`Transport`]. A peer that
90/// obtains a connection by listening is often referred to as the *listener* and the
91/// peer that initiated the connection through dialing as the *dialer*, in
92/// contrast to the traditional roles of *server* and *client*.
93///
94/// Most transports also provide a form of reliable delivery on the established
95/// connections but the precise semantics of these guarantees depend on the
96/// specific transport.
97///
98/// This trait is implemented for concrete connection-oriented transport protocols
99/// like TCP or Unix Domain Sockets, but also on wrappers that add additional
100/// functionality to the dialing or listening process (e.g. name resolution via
101/// the DNS).
102///
103/// Additional protocols can be layered on top of the connections established
104/// by a [`Transport`] through an upgrade mechanism that is initiated via
105/// [`upgrade`](Transport::upgrade).
106///
107/// Note for implementers: Futures returned by [`Transport::dial`] should only
108/// do work once polled for the first time. E.g. in the case of TCP, connecting
109/// to the remote should not happen immediately on [`Transport::dial`] but only
110/// once the returned [`Future`] is polled. The caller of [`Transport::dial`]
111/// may call the method multiple times with a set of addresses, racing a subset
112/// of the returned dials to success concurrently.
113pub trait Transport {
114 /// The result of a connection setup process, including protocol upgrades.
115 ///
116 /// Typically the output contains at least a handle to a data stream (i.e. a
117 /// connection or a substream multiplexer on top of a connection) that
118 /// provides APIs for sending and receiving data through the connection.
119 type Output;
120
121 /// An error that occurred during connection setup.
122 type Error: Error;
123
124 /// A pending [`Output`](Transport::Output) for an inbound connection,
125 /// obtained from the [`Transport`] stream.
126 ///
127 /// After a connection has been accepted by the transport, it may need to go through
128 /// asynchronous post-processing (i.e. protocol upgrade negotiations). Such
129 /// post-processing should not block the `Listener` from producing the next
130 /// connection, hence further connection setup proceeds asynchronously.
131 /// Once a `ListenerUpgrade` future resolves it yields the [`Output`](Transport::Output)
132 /// of the connection setup process.
133 type ListenerUpgrade: Future<Output = Result<Self::Output, Self::Error>>;
134
135 /// A pending [`Output`](Transport::Output) for an outbound connection,
136 /// obtained from [dialing](Transport::dial).
137 type Dial: Future<Output = Result<Self::Output, Self::Error>>;
138
139 /// Listens on the given [`Multiaddr`] for inbound connections with a provided [`ListenerId`].
140 fn listen_on(
141 &mut self,
142 id: ListenerId,
143 addr: Multiaddr,
144 ) -> Result<(), TransportError<Self::Error>>;
145
146 /// Remove a listener.
147 ///
148 /// Return `true` if there was a listener with this Id, `false`
149 /// otherwise.
150 fn remove_listener(&mut self, id: ListenerId) -> bool;
151
152 /// Dials the given [`Multiaddr`], returning a future for a pending outbound connection.
153 ///
154 /// If [`TransportError::MultiaddrNotSupported`] is returned, it may be desirable to
155 /// try an alternative [`Transport`], if available.
156 fn dial(
157 &mut self,
158 addr: Multiaddr,
159 opts: DialOpts,
160 ) -> Result<Self::Dial, TransportError<Self::Error>>;
161
162 /// Poll for [`TransportEvent`]s.
163 ///
164 /// A [`TransportEvent::Incoming`] should be produced whenever a connection is received at the
165 /// lowest level of the transport stack. The item must be a
166 /// [`ListenerUpgrade`](Transport::ListenerUpgrade) future that resolves to an
167 /// [`Output`](Transport::Output) value once all protocol upgrades have been applied.
168 ///
169 /// Transports are expected to produce [`TransportEvent::Incoming`] events only for
170 /// listen addresses which have previously been announced via
171 /// a [`TransportEvent::NewAddress`] event and which have not been invalidated by
172 /// an [`TransportEvent::AddressExpired`] event yet.
173 fn poll(
174 self: Pin<&mut Self>,
175 cx: &mut Context<'_>,
176 ) -> Poll<TransportEvent<Self::ListenerUpgrade, Self::Error>>;
177
178 /// Boxes the transport, including custom transport errors.
179 fn boxed(self) -> boxed::Boxed<Self::Output>
180 where
181 Self: Sized + Send + Unpin + 'static,
182 Self::Dial: Send + 'static,
183 Self::ListenerUpgrade: Send + 'static,
184 Self::Error: Send + Sync,
185 {
186 boxed::boxed(self)
187 }
188
189 /// Applies a function on the connections created by the transport.
190 fn map<F, O>(self, f: F) -> map::Map<Self, F>
191 where
192 Self: Sized,
193 F: FnOnce(Self::Output, ConnectedPoint) -> O,
194 {
195 map::Map::new(self, f)
196 }
197
198 /// Applies a function on the errors generated by the futures of the transport.
199 fn map_err<F, E>(self, f: F) -> map_err::MapErr<Self, F>
200 where
201 Self: Sized,
202 F: FnOnce(Self::Error) -> E,
203 {
204 map_err::MapErr::new(self, f)
205 }
206
207 /// Adds a fallback transport that is used when encountering errors
208 /// while establishing inbound or outbound connections.
209 ///
210 /// The returned transport will act like `self`, except that if `listen_on` or `dial`
211 /// return an error then `other` will be tried.
212 fn or_transport<U>(self, other: U) -> OrTransport<Self, U>
213 where
214 Self: Sized,
215 U: Transport,
216 <U as Transport>::Error: 'static,
217 {
218 OrTransport::new(self, other)
219 }
220
221 /// Applies a function producing an asynchronous result to every connection
222 /// created by this transport.
223 ///
224 /// This function can be used for ad-hoc protocol upgrades or
225 /// for processing or adapting the output for following configurations.
226 ///
227 /// For the high-level transport upgrade procedure, see [`Transport::upgrade`].
228 fn and_then<C, F, O>(self, f: C) -> and_then::AndThen<Self, C>
229 where
230 Self: Sized,
231 C: FnOnce(Self::Output, ConnectedPoint) -> F,
232 F: TryFuture<Ok = O>,
233 <F as TryFuture>::Error: Error + 'static,
234 {
235 and_then::AndThen::new(self, f)
236 }
237
238 /// Begins a series of protocol upgrades via an [`upgrade::Builder`].
239 fn upgrade(self, version: upgrade::Version) -> upgrade::Builder<Self>
240 where
241 Self: Sized,
242 Self::Error: 'static,
243 {
244 upgrade::Builder::new(self, version)
245 }
246}
247
/// Identifier of a single listener.
#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)]
pub struct ListenerId(usize);
251
252impl ListenerId {
253 /// Creates a new `ListenerId`.
254 pub fn next() -> Self {
255 ListenerId(NEXT_LISTENER_ID.fetch_add(1, Ordering::SeqCst))
256 }
257}
258
259impl fmt::Display for ListenerId {
260 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
261 write!(f, "{}", self.0)
262 }
263}
264
265/// Event produced by [`Transport`]s.
266pub enum TransportEvent<TUpgr, TErr> {
267 /// A new address is being listened on.
268 NewAddress {
269 /// The listener that is listening on the new address.
270 listener_id: ListenerId,
271 /// The new address that is being listened on.
272 listen_addr: Multiaddr,
273 },
274 /// An address is no longer being listened on.
275 AddressExpired {
276 /// The listener that is no longer listening on the address.
277 listener_id: ListenerId,
278 /// The new address that is being listened on.
279 listen_addr: Multiaddr,
280 },
281 /// A connection is incoming on one of the listeners.
282 Incoming {
283 /// The listener that produced the upgrade.
284 listener_id: ListenerId,
285 /// The produced upgrade.
286 upgrade: TUpgr,
287 /// Local connection address.
288 local_addr: Multiaddr,
289 /// Address used to send back data to the incoming client.
290 send_back_addr: Multiaddr,
291 },
292 /// A listener closed.
293 ListenerClosed {
294 /// The ID of the listener that closed.
295 listener_id: ListenerId,
296 /// Reason for the closure. Contains `Ok(())` if the stream produced `None`, or `Err`
297 /// if the stream produced an error.
298 reason: Result<(), TErr>,
299 },
300 /// A listener errored.
301 ///
302 /// The listener will continue to be polled for new events and the event
303 /// is for informational purposes only.
304 ListenerError {
305 /// The ID of the listener that errored.
306 listener_id: ListenerId,
307 /// The error value.
308 error: TErr,
309 },
310}
311
312impl<TUpgr, TErr> TransportEvent<TUpgr, TErr> {
313 /// In case this [`TransportEvent`] is an upgrade, apply the given function
314 /// to the upgrade and produce another transport event based the function's result.
315 pub fn map_upgrade<U>(self, map: impl FnOnce(TUpgr) -> U) -> TransportEvent<U, TErr> {
316 match self {
317 TransportEvent::Incoming {
318 listener_id,
319 upgrade,
320 local_addr,
321 send_back_addr,
322 } => TransportEvent::Incoming {
323 listener_id,
324 upgrade: map(upgrade),
325 local_addr,
326 send_back_addr,
327 },
328 TransportEvent::NewAddress {
329 listen_addr,
330 listener_id,
331 } => TransportEvent::NewAddress {
332 listen_addr,
333 listener_id,
334 },
335 TransportEvent::AddressExpired {
336 listen_addr,
337 listener_id,
338 } => TransportEvent::AddressExpired {
339 listen_addr,
340 listener_id,
341 },
342 TransportEvent::ListenerError { listener_id, error } => {
343 TransportEvent::ListenerError { listener_id, error }
344 }
345 TransportEvent::ListenerClosed {
346 listener_id,
347 reason,
348 } => TransportEvent::ListenerClosed {
349 listener_id,
350 reason,
351 },
352 }
353 }
354
355 /// In case this [`TransportEvent`] is an [`ListenerError`](TransportEvent::ListenerError),
356 /// or [`ListenerClosed`](TransportEvent::ListenerClosed) apply the given function to the
357 /// error and produce another transport event based on the function's result.
358 pub fn map_err<E>(self, map_err: impl FnOnce(TErr) -> E) -> TransportEvent<TUpgr, E> {
359 match self {
360 TransportEvent::Incoming {
361 listener_id,
362 upgrade,
363 local_addr,
364 send_back_addr,
365 } => TransportEvent::Incoming {
366 listener_id,
367 upgrade,
368 local_addr,
369 send_back_addr,
370 },
371 TransportEvent::NewAddress {
372 listen_addr,
373 listener_id,
374 } => TransportEvent::NewAddress {
375 listen_addr,
376 listener_id,
377 },
378 TransportEvent::AddressExpired {
379 listen_addr,
380 listener_id,
381 } => TransportEvent::AddressExpired {
382 listen_addr,
383 listener_id,
384 },
385 TransportEvent::ListenerError { listener_id, error } => TransportEvent::ListenerError {
386 listener_id,
387 error: map_err(error),
388 },
389 TransportEvent::ListenerClosed {
390 listener_id,
391 reason,
392 } => TransportEvent::ListenerClosed {
393 listener_id,
394 reason: reason.map_err(map_err),
395 },
396 }
397 }
398
399 /// Returns `true` if this is an [`Incoming`](TransportEvent::Incoming) transport event.
400 pub fn is_upgrade(&self) -> bool {
401 matches!(self, TransportEvent::Incoming { .. })
402 }
403
404 /// Try to turn this transport event into the upgrade parts of the
405 /// incoming connection.
406 ///
407 /// Returns `None` if the event is not actually an incoming connection,
408 /// otherwise the upgrade and the remote address.
409 pub fn into_incoming(self) -> Option<(TUpgr, Multiaddr)> {
410 let TransportEvent::Incoming {
411 upgrade,
412 send_back_addr,
413 ..
414 } = self
415 else {
416 return None;
417 };
418
419 Some((upgrade, send_back_addr))
420 }
421
422 /// Returns `true` if this is a [`TransportEvent::NewAddress`].
423 pub fn is_new_address(&self) -> bool {
424 matches!(self, TransportEvent::NewAddress { .. })
425 }
426
427 /// Try to turn this transport event into the new `Multiaddr`.
428 ///
429 /// Returns `None` if the event is not actually a [`TransportEvent::NewAddress`],
430 /// otherwise the address.
431 pub fn into_new_address(self) -> Option<Multiaddr> {
432 if let TransportEvent::NewAddress { listen_addr, .. } = self {
433 Some(listen_addr)
434 } else {
435 None
436 }
437 }
438
439 /// Returns `true` if this is an [`TransportEvent::AddressExpired`].
440 pub fn is_address_expired(&self) -> bool {
441 matches!(self, TransportEvent::AddressExpired { .. })
442 }
443
444 /// Try to turn this transport event into the expire `Multiaddr`.
445 ///
446 /// Returns `None` if the event is not actually a [`TransportEvent::AddressExpired`],
447 /// otherwise the address.
448 pub fn into_address_expired(self) -> Option<Multiaddr> {
449 if let TransportEvent::AddressExpired { listen_addr, .. } = self {
450 Some(listen_addr)
451 } else {
452 None
453 }
454 }
455
456 /// Returns `true` if this is an [`TransportEvent::ListenerError`] transport event.
457 pub fn is_listener_error(&self) -> bool {
458 matches!(self, TransportEvent::ListenerError { .. })
459 }
460
461 /// Try to turn this transport event into the listener error.
462 ///
463 /// Returns `None` if the event is not actually a [`TransportEvent::ListenerError`]`,
464 /// otherwise the error.
465 pub fn into_listener_error(self) -> Option<TErr> {
466 if let TransportEvent::ListenerError { error, .. } = self {
467 Some(error)
468 } else {
469 None
470 }
471 }
472}
473
474impl<TUpgr, TErr: fmt::Debug> fmt::Debug for TransportEvent<TUpgr, TErr> {
475 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
476 match self {
477 TransportEvent::NewAddress {
478 listener_id,
479 listen_addr,
480 } => f
481 .debug_struct("TransportEvent::NewAddress")
482 .field("listener_id", listener_id)
483 .field("listen_addr", listen_addr)
484 .finish(),
485 TransportEvent::AddressExpired {
486 listener_id,
487 listen_addr,
488 } => f
489 .debug_struct("TransportEvent::AddressExpired")
490 .field("listener_id", listener_id)
491 .field("listen_addr", listen_addr)
492 .finish(),
493 TransportEvent::Incoming {
494 listener_id,
495 local_addr,
496 ..
497 } => f
498 .debug_struct("TransportEvent::Incoming")
499 .field("listener_id", listener_id)
500 .field("local_addr", local_addr)
501 .finish(),
502 TransportEvent::ListenerClosed {
503 listener_id,
504 reason,
505 } => f
506 .debug_struct("TransportEvent::Closed")
507 .field("listener_id", listener_id)
508 .field("reason", reason)
509 .finish(),
510 TransportEvent::ListenerError { listener_id, error } => f
511 .debug_struct("TransportEvent::ListenerError")
512 .field("listener_id", listener_id)
513 .field("error", error)
514 .finish(),
515 }
516 }
517}
518
519/// An error during [dialing][Transport::dial] or [listening][Transport::listen_on]
520/// on a [`Transport`].
521#[derive(Debug, Clone)]
522pub enum TransportError<TErr> {
523 /// The [`Multiaddr`] passed as parameter is not supported.
524 ///
525 /// Contains back the same address.
526 MultiaddrNotSupported(Multiaddr),
527
528 /// Any other error that a [`Transport`] may produce.
529 Other(TErr),
530}
531
532impl<TErr> TransportError<TErr> {
533 /// Applies a function to the error in [`TransportError::Other`].
534 pub fn map<TNewErr>(self, map: impl FnOnce(TErr) -> TNewErr) -> TransportError<TNewErr> {
535 match self {
536 TransportError::MultiaddrNotSupported(addr) => {
537 TransportError::MultiaddrNotSupported(addr)
538 }
539 TransportError::Other(err) => TransportError::Other(map(err)),
540 }
541 }
542}
543
544impl<TErr> fmt::Display for TransportError<TErr>
545where
546 TErr: fmt::Display,
547{
548 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
549 match self {
550 TransportError::MultiaddrNotSupported(addr) => {
551 write!(f, "Multiaddr is not supported: {addr}")
552 }
553 TransportError::Other(_) => Ok(()),
554 }
555 }
556}
557
558impl<TErr> Error for TransportError<TErr>
559where
560 TErr: Error + 'static,
561{
562 fn source(&self) -> Option<&(dyn Error + 'static)> {
563 match self {
564 TransportError::MultiaddrNotSupported(_) => None,
565 TransportError::Other(err) => Some(err),
566 }
567 }
568}