libp2p_swarm/
lib.rs

1// Copyright 2019 Parity Technologies (UK) Ltd.
2//
3// Permission is hereby granted, free of charge, to any person obtaining a
4// copy of this software and associated documentation files (the "Software"),
5// to deal in the Software without restriction, including without limitation
6// the rights to use, copy, modify, merge, publish, distribute, sublicense,
7// and/or sell copies of the Software, and to permit persons to whom the
8// Software is furnished to do so, subject to the following conditions:
9//
10// The above copyright notice and this permission notice shall be included in
11// all copies or substantial portions of the Software.
12//
13// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
14// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
18// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
19// DEALINGS IN THE SOFTWARE.
20
21//! High-level network manager.
22//!
23//! A [`Swarm`] contains the state of the network as a whole. The entire
24//! behaviour of a libp2p network can be controlled through the `Swarm`.
25//! The `Swarm` struct contains all active and pending connections to
26//! remotes and manages the state of all the substreams that have been
27//! opened, and all the upgrades that were built upon these substreams.
28//!
29//! # Initializing a Swarm
30//!
31//! Creating a `Swarm` requires three things:
32//!
33//!  1. A network identity of the local node in form of a [`PeerId`].
34//!  2. An implementation of the [`Transport`] trait. This is the type that will be used in order to
35//!     reach nodes on the network based on their address. See the `transport` module for more
36//!     information.
37//!  3. An implementation of the [`NetworkBehaviour`] trait. This is a state machine that defines
38//!     how the swarm should behave once it is connected to a node.
39//!
40//! # Network Behaviour
41//!
42//! The [`NetworkBehaviour`] trait is implemented on types that indicate to
43//! the swarm how it should behave. This includes which protocols are supported
44//! and which nodes to try to connect to. It is the `NetworkBehaviour` that
45//! controls what happens on the network. Multiple types that implement
46//! `NetworkBehaviour` can be composed into a single behaviour.
47//!
//! # Connection Handler
49//!
50//! The [`ConnectionHandler`] trait defines how each active connection to a
51//! remote should behave: how to handle incoming substreams, which protocols
52//! are supported, when to open a new outbound substream, etc.
53
54#![cfg_attr(docsrs, feature(doc_cfg, doc_auto_cfg))]
55
56mod connection;
57mod executor;
58mod stream;
59mod stream_protocol;
60#[cfg(test)]
61mod test;
62mod upgrade;
63
64pub mod behaviour;
65pub mod dial_opts;
66pub mod dummy;
67pub mod handler;
68mod listen_opts;
69mod translation;
70
/// Bundles all symbols required for the [`libp2p_swarm_derive::NetworkBehaviour`] macro.
///
/// The module defines nothing of its own: it only re-exports items from `either`, `futures`,
/// `libp2p_core`, `libp2p_identity` and this crate under a single path. It is hidden from the
/// rendered documentation via `#[doc(hidden)]`.
#[doc(hidden)]
pub mod derive_prelude {
    pub use either::Either;
    // Note: this re-exports the `futures::prelude` module under the name `futures`.
    pub use futures::prelude as futures;
    pub use libp2p_core::{
        transport::{ListenerId, PortUse},
        ConnectedPoint, Endpoint, Multiaddr,
    };
    pub use libp2p_identity::PeerId;

    // Items of this crate: the `FromSwarm` event payloads plus the traits, aliases and error
    // types listed below.
    pub use crate::{
        behaviour::{
            AddressChange, ConnectionClosed, ConnectionEstablished, DialFailure, ExpiredListenAddr,
            ExternalAddrConfirmed, ExternalAddrExpired, FromSwarm, ListenFailure, ListenerClosed,
            ListenerError, NewExternalAddrCandidate, NewExternalAddrOfPeer, NewListenAddr,
            NewListener,
        },
        connection::ConnectionId,
        ConnectionDenied, ConnectionHandler, ConnectionHandlerSelect, DialError, NetworkBehaviour,
        THandler, THandlerInEvent, THandlerOutEvent, ToSwarm,
    };
}
94
95use std::{
96    collections::{HashMap, HashSet, VecDeque},
97    error, fmt, io,
98    num::{NonZeroU32, NonZeroU8, NonZeroUsize},
99    pin::Pin,
100    task::{Context, Poll},
101    time::Duration,
102};
103
104pub use behaviour::{
105    AddressChange, CloseConnection, ConnectionClosed, DialFailure, ExpiredListenAddr,
106    ExternalAddrExpired, ExternalAddresses, FromSwarm, ListenAddresses, ListenFailure,
107    ListenerClosed, ListenerError, NetworkBehaviour, NewExternalAddrCandidate,
108    NewExternalAddrOfPeer, NewListenAddr, NotifyHandler, PeerAddresses, ToSwarm,
109};
110pub use connection::{pool::ConnectionCounters, ConnectionError, ConnectionId, SupportedProtocols};
111use connection::{
112    pool::{EstablishedConnection, Pool, PoolConfig, PoolEvent},
113    IncomingInfo, PendingInboundConnectionError, PendingOutboundConnectionError,
114};
115use dial_opts::{DialOpts, PeerCondition};
116pub use executor::Executor;
117use futures::{prelude::*, stream::FusedStream};
118pub use handler::{
119    ConnectionHandler, ConnectionHandlerEvent, ConnectionHandlerSelect, OneShotHandler,
120    OneShotHandlerConfig, StreamUpgradeError, SubstreamProtocol,
121};
122use libp2p_core::{
123    connection::ConnectedPoint,
124    muxing::StreamMuxerBox,
125    transport::{self, ListenerId, TransportError, TransportEvent},
126    Multiaddr, Transport,
127};
128use libp2p_identity::PeerId;
129#[cfg(feature = "macros")]
130pub use libp2p_swarm_derive::NetworkBehaviour;
131pub use listen_opts::ListenOpts;
132use smallvec::SmallVec;
133pub use stream::Stream;
134pub use stream_protocol::{InvalidProtocol, StreamProtocol};
135use tracing::Instrument;
136#[doc(hidden)]
137pub use translation::_address_translation;
138
139use crate::{behaviour::ExternalAddrConfirmed, handler::UpgradeInfoSend};
140
/// Event generated by the [`NetworkBehaviour`] that the swarm will report back.
///
/// Shorthand for the behaviour's [`NetworkBehaviour::ToSwarm`] associated type. This alias is
/// not `pub`.
type TBehaviourOutEvent<TBehaviour> = <TBehaviour as NetworkBehaviour>::ToSwarm;

/// [`ConnectionHandler`] of the [`NetworkBehaviour`] for all the protocols the [`NetworkBehaviour`]
/// supports.
///
/// Shorthand for the behaviour's [`NetworkBehaviour::ConnectionHandler`] associated type.
pub type THandler<TBehaviour> = <TBehaviour as NetworkBehaviour>::ConnectionHandler;

/// Custom event that can be received by the [`ConnectionHandler`] of the
/// [`NetworkBehaviour`].
///
/// Shorthand for [`ConnectionHandler::FromBehaviour`] of the behaviour's handler.
pub type THandlerInEvent<TBehaviour> = <THandler<TBehaviour> as ConnectionHandler>::FromBehaviour;

/// Custom event that can be produced by the [`ConnectionHandler`] of the [`NetworkBehaviour`].
///
/// Shorthand for [`ConnectionHandler::ToBehaviour`] of the behaviour's handler.
pub type THandlerOutEvent<TBehaviour> = <THandler<TBehaviour> as ConnectionHandler>::ToBehaviour;
154
/// Event generated by the `Swarm`.
///
/// The type parameter is the event type carried by the [`SwarmEvent::Behaviour`] variant.
///
/// The enum is `#[non_exhaustive]`: code outside this crate that matches on it needs a
/// wildcard arm.
#[derive(Debug)]
#[non_exhaustive]
pub enum SwarmEvent<TBehaviourOutEvent> {
    /// Event generated by the `NetworkBehaviour`.
    Behaviour(TBehaviourOutEvent),
    /// A connection to the given peer has been opened.
    ConnectionEstablished {
        /// Identity of the peer that we have connected to.
        peer_id: PeerId,
        /// Identifier of the connection.
        connection_id: ConnectionId,
        /// Endpoint of the connection that has been opened.
        endpoint: ConnectedPoint,
        /// Number of established connections to this peer, including the one that has just been
        /// opened.
        num_established: NonZeroU32,
        /// [`Some`] when the new connection is an outgoing connection.
        /// Addresses are dialed concurrently. Contains the addresses and errors
        /// of dial attempts that failed before the one successful dial.
        concurrent_dial_errors: Option<Vec<(Multiaddr, TransportError<io::Error>)>>,
        /// How long it took to establish this connection.
        established_in: std::time::Duration,
    },
    /// A connection with the given peer has been closed,
    /// possibly as a result of an error.
    ConnectionClosed {
        /// Identity of the peer that we have connected to.
        peer_id: PeerId,
        /// Identifier of the connection.
        connection_id: ConnectionId,
        /// Endpoint of the connection that has been closed.
        endpoint: ConnectedPoint,
        /// Number of other remaining connections to this same peer.
        num_established: u32,
        /// Reason for the disconnection, if it was not a successful
        /// active close.
        cause: Option<ConnectionError>,
    },
    /// A new connection arrived on a listener and is in the process of protocol negotiation.
    ///
    /// A corresponding [`ConnectionEstablished`](SwarmEvent::ConnectionEstablished) or
    /// [`IncomingConnectionError`](SwarmEvent::IncomingConnectionError) event will later be
    /// generated for this connection.
    IncomingConnection {
        /// Identifier of the connection.
        connection_id: ConnectionId,
        /// Local connection address.
        /// This address has been earlier reported with a
        /// [`NewListenAddr`](SwarmEvent::NewListenAddr) event.
        local_addr: Multiaddr,
        /// Address used to send back data to the remote.
        send_back_addr: Multiaddr,
    },
    /// An error happened on an inbound connection during its initial handshake.
    ///
    /// This can include, for example, an error during the handshake of the encryption layer, or
    /// the connection unexpectedly closed.
    IncomingConnectionError {
        /// Identifier of the connection.
        connection_id: ConnectionId,
        /// Local connection address.
        /// This address has been earlier reported with a
        /// [`NewListenAddr`](SwarmEvent::NewListenAddr) event.
        local_addr: Multiaddr,
        /// Address used to send back data to the remote.
        send_back_addr: Multiaddr,
        /// The error that happened.
        error: ListenError,
    },
    /// An error happened on an outbound connection.
    OutgoingConnectionError {
        /// Identifier of the connection.
        connection_id: ConnectionId,
        /// If known, [`PeerId`] of the peer we tried to reach.
        peer_id: Option<PeerId>,
        /// Error that has been encountered.
        error: DialError,
    },
    /// One of our listeners has reported a new local listening address.
    NewListenAddr {
        /// The listener that is listening on the new address.
        listener_id: ListenerId,
        /// The new address that is being listened on.
        address: Multiaddr,
    },
    /// One of our listeners has reported the expiration of a listening address.
    ExpiredListenAddr {
        /// The listener that is no longer listening on the address.
        listener_id: ListenerId,
        /// The expired address.
        address: Multiaddr,
    },
    /// One of the listeners gracefully closed.
    ListenerClosed {
        /// The listener that closed.
        listener_id: ListenerId,
        /// The addresses that the listener was listening on. These addresses are now considered
        /// expired, similar to if a [`ExpiredListenAddr`](SwarmEvent::ExpiredListenAddr) event
        /// has been generated for each of them.
        addresses: Vec<Multiaddr>,
        /// Reason for the closure. Contains `Ok(())` if the stream produced `None`, or `Err`
        /// if the stream produced an error.
        reason: Result<(), io::Error>,
    },
    /// One of the listeners reported a non-fatal error.
    ListenerError {
        /// The listener that errored.
        listener_id: ListenerId,
        /// The listener error.
        error: io::Error,
    },
    /// A new dialing attempt has been initiated by the [`NetworkBehaviour`]
    /// implementation.
    ///
    /// A [`ConnectionEstablished`](SwarmEvent::ConnectionEstablished) event is
    /// reported if the dialing attempt succeeds, otherwise a
    /// [`OutgoingConnectionError`](SwarmEvent::OutgoingConnectionError) event
    /// is reported.
    Dialing {
        /// Identity of the peer that we are connecting to.
        peer_id: Option<PeerId>,

        /// Identifier of the connection.
        connection_id: ConnectionId,
    },
    /// We have discovered a new candidate for an external address for us.
    ///
    /// `address` is the candidate address.
    NewExternalAddrCandidate { address: Multiaddr },
    /// An external address of the local node was confirmed.
    ///
    /// `address` is the address that was confirmed.
    ExternalAddrConfirmed { address: Multiaddr },
    /// An external address of the local node expired, i.e. is no-longer confirmed.
    ///
    /// `address` is the address that expired.
    ExternalAddrExpired { address: Multiaddr },
    /// We have discovered a new address of a peer.
    ///
    /// `peer_id` identifies the peer and `address` is the newly discovered address.
    NewExternalAddrOfPeer { peer_id: PeerId, address: Multiaddr },
}
290
291impl<TBehaviourOutEvent> SwarmEvent<TBehaviourOutEvent> {
292    /// Extract the `TBehaviourOutEvent` from this [`SwarmEvent`] in case it is the `Behaviour`
293    /// variant, otherwise fail.
294    #[allow(clippy::result_large_err)]
295    pub fn try_into_behaviour_event(self) -> Result<TBehaviourOutEvent, Self> {
296        match self {
297            SwarmEvent::Behaviour(inner) => Ok(inner),
298            other => Err(other),
299        }
300    }
301}
302
/// Contains the state of the network, plus the way it should behave.
///
/// Note: Needs to be polled via `<Swarm as Stream>` in order to make
/// progress.
pub struct Swarm<TBehaviour>
where
    TBehaviour: NetworkBehaviour,
{
    /// [`Transport`] for dialing remote peers and listening for incoming connection.
    transport: transport::Boxed<(PeerId, StreamMuxerBox)>,

    /// The nodes currently active.
    pool: Pool<THandler<TBehaviour>>,

    /// The local peer ID.
    local_peer_id: PeerId,

    /// Handles which nodes to connect to and how to handle the events sent back by the protocol
    /// handlers.
    behaviour: TBehaviour,

    /// List of protocols that the behaviour says it supports.
    supported_protocols: SmallVec<[Vec<u8>; 16]>,

    /// External addresses of the local node that are considered confirmed.
    ///
    /// Entries are inserted by `add_external_address`, dropped by `remove_external_address`
    /// and exposed through `external_addresses`.
    confirmed_external_addr: HashSet<Multiaddr>,

    /// Multiaddresses that our listeners are listening on,
    /// grouped by the [`ListenerId`] of the listener that reported them.
    listened_addrs: HashMap<ListenerId, SmallVec<[Multiaddr; 1]>>,

    /// Pending event to be delivered to connection handlers
    /// (or dropped if the peer disconnected) before the `behaviour`
    /// can be polled again.
    pending_handler_event: Option<(PeerId, PendingNotifyHandler, THandlerInEvent<TBehaviour>)>,

    /// Queue of [`SwarmEvent`]s that have been produced but not yet handed out.
    ///
    /// NOTE(review): presumably drained in FIFO order while the swarm is polled; confirm
    /// against the polling code.
    pending_swarm_events: VecDeque<SwarmEvent<TBehaviour::ToSwarm>>,
}
339
// `Unpin` is implemented unconditionally: no `TBehaviour: Unpin` bound is required.
impl<TBehaviour> Unpin for Swarm<TBehaviour> where TBehaviour: NetworkBehaviour {}
341
342impl<TBehaviour> Swarm<TBehaviour>
343where
344    TBehaviour: NetworkBehaviour,
345{
346    /// Creates a new [`Swarm`] from the given [`Transport`], [`NetworkBehaviour`], [`PeerId`] and
347    /// [`Config`].
348    pub fn new(
349        transport: transport::Boxed<(PeerId, StreamMuxerBox)>,
350        behaviour: TBehaviour,
351        local_peer_id: PeerId,
352        config: Config,
353    ) -> Self {
354        tracing::info!(%local_peer_id);
355
356        Swarm {
357            local_peer_id,
358            transport,
359            pool: Pool::new(local_peer_id, config.pool_config),
360            behaviour,
361            supported_protocols: Default::default(),
362            confirmed_external_addr: Default::default(),
363            listened_addrs: HashMap::new(),
364            pending_handler_event: None,
365            pending_swarm_events: VecDeque::default(),
366        }
367    }
368
369    /// Returns information about the connections underlying the [`Swarm`].
370    pub fn network_info(&self) -> NetworkInfo {
371        let num_peers = self.pool.num_peers();
372        let connection_counters = self.pool.counters().clone();
373        NetworkInfo {
374            num_peers,
375            connection_counters,
376        }
377    }
378
379    /// Starts listening on the given address.
380    /// Returns an error if the address is not supported.
381    ///
382    /// Listeners report their new listening addresses as [`SwarmEvent::NewListenAddr`].
383    /// Depending on the underlying transport, one listener may have multiple listening addresses.
384    pub fn listen_on(&mut self, addr: Multiaddr) -> Result<ListenerId, TransportError<io::Error>> {
385        let opts = ListenOpts::new(addr);
386        let id = opts.listener_id();
387        self.add_listener(opts)?;
388        Ok(id)
389    }
390
    /// Removes the listener with the given ID from the underlying transport.
    ///
    /// Returns `true` if there was a listener with this ID, `false`
    /// otherwise. The return value is forwarded unchanged from the transport.
    pub fn remove_listener(&mut self, listener_id: ListenerId) -> bool {
        self.transport.remove_listener(listener_id)
    }
398
399    /// Dial a known or unknown peer.
400    ///
401    /// See also [`DialOpts`].
402    ///
403    /// ```
404    /// # use libp2p_swarm::Swarm;
405    /// # use libp2p_swarm::dial_opts::{DialOpts, PeerCondition};
406    /// # use libp2p_core::{Multiaddr, Transport};
407    /// # use libp2p_core::transport::dummy::DummyTransport;
408    /// # use libp2p_swarm::dummy;
409    /// # use libp2p_identity::PeerId;
410    /// #
411    /// # #[tokio::main]
412    /// # async fn main() {
413    /// let mut swarm = build_swarm();
414    ///
415    /// // Dial a known peer.
416    /// swarm.dial(PeerId::random());
417    ///
418    /// // Dial an unknown peer.
419    /// swarm.dial("/ip6/::1/tcp/12345".parse::<Multiaddr>().unwrap());
420    /// # }
421    ///
422    /// # fn build_swarm() -> Swarm<dummy::Behaviour> {
423    /// #     Swarm::new(DummyTransport::new().boxed(), dummy::Behaviour, PeerId::random(), libp2p_swarm::Config::with_tokio_executor())
424    /// # }
425    /// ```
426    pub fn dial(&mut self, opts: impl Into<DialOpts>) -> Result<(), DialError> {
427        let dial_opts = opts.into();
428
429        let peer_id = dial_opts.get_peer_id();
430        let condition = dial_opts.peer_condition();
431        let connection_id = dial_opts.connection_id();
432
433        let should_dial = match (condition, peer_id) {
434            (_, None) => true,
435            (PeerCondition::Always, _) => true,
436            (PeerCondition::Disconnected, Some(peer_id)) => !self.pool.is_connected(peer_id),
437            (PeerCondition::NotDialing, Some(peer_id)) => !self.pool.is_dialing(peer_id),
438            (PeerCondition::DisconnectedAndNotDialing, Some(peer_id)) => {
439                !self.pool.is_dialing(peer_id) && !self.pool.is_connected(peer_id)
440            }
441        };
442
443        if !should_dial {
444            let e = DialError::DialPeerConditionFalse(condition);
445
446            self.behaviour
447                .on_swarm_event(FromSwarm::DialFailure(DialFailure {
448                    peer_id,
449                    error: &e,
450                    connection_id,
451                }));
452
453            return Err(e);
454        }
455
456        let addresses = {
457            let mut addresses_from_opts = dial_opts.get_addresses();
458
459            match self.behaviour.handle_pending_outbound_connection(
460                connection_id,
461                peer_id,
462                addresses_from_opts.as_slice(),
463                dial_opts.role_override(),
464            ) {
465                Ok(addresses) => {
466                    if dial_opts.extend_addresses_through_behaviour() {
467                        addresses_from_opts.extend(addresses)
468                    } else {
469                        let num_addresses = addresses.len();
470
471                        if num_addresses > 0 {
472                            tracing::debug!(
473                                connection=%connection_id,
474                                discarded_addresses_count=%num_addresses,
475                                "discarding addresses from `NetworkBehaviour` because `DialOpts::extend_addresses_through_behaviour is `false` for connection"
476                            )
477                        }
478                    }
479                }
480                Err(cause) => {
481                    let error = DialError::Denied { cause };
482
483                    self.behaviour
484                        .on_swarm_event(FromSwarm::DialFailure(DialFailure {
485                            peer_id,
486                            error: &error,
487                            connection_id,
488                        }));
489
490                    return Err(error);
491                }
492            }
493
494            let mut unique_addresses = HashSet::new();
495            addresses_from_opts.retain(|addr| {
496                !self.listened_addrs.values().flatten().any(|a| a == addr)
497                    && unique_addresses.insert(addr.clone())
498            });
499
500            if addresses_from_opts.is_empty() {
501                let error = DialError::NoAddresses;
502                self.behaviour
503                    .on_swarm_event(FromSwarm::DialFailure(DialFailure {
504                        peer_id,
505                        error: &error,
506                        connection_id,
507                    }));
508                return Err(error);
509            };
510
511            addresses_from_opts
512        };
513
514        let dials = addresses
515            .into_iter()
516            .map(|a| match peer_id.map_or(Ok(a.clone()), |p| a.with_p2p(p)) {
517                Ok(address) => {
518                    let dial = self.transport.dial(
519                        address.clone(),
520                        transport::DialOpts {
521                            role: dial_opts.role_override(),
522                            port_use: dial_opts.port_use(),
523                        },
524                    );
525                    let span = tracing::debug_span!(parent: tracing::Span::none(), "Transport::dial", %address);
526                    span.follows_from(tracing::Span::current());
527                    match dial {
528                        Ok(fut) => fut
529                            .map(|r| (address, r.map_err(TransportError::Other)))
530                            .instrument(span)
531                            .boxed(),
532                        Err(err) => futures::future::ready((address, Err(err))).boxed(),
533                    }
534                }
535                Err(address) => futures::future::ready((
536                    address.clone(),
537                    Err(TransportError::MultiaddrNotSupported(address)),
538                ))
539                .boxed(),
540            })
541            .collect();
542
543        self.pool.add_outgoing(
544            dials,
545            peer_id,
546            dial_opts.role_override(),
547            dial_opts.port_use(),
548            dial_opts.dial_concurrency_override(),
549            connection_id,
550        );
551
552        Ok(())
553    }
554
555    /// Returns an iterator that produces the list of addresses we're listening on.
556    pub fn listeners(&self) -> impl Iterator<Item = &Multiaddr> {
557        self.listened_addrs.values().flatten()
558    }
559
    /// Returns the [`PeerId`] of the local node, i.e. the one handed to [`Swarm::new`].
    pub fn local_peer_id(&self) -> &PeerId {
        &self.local_peer_id
    }
564
    /// Lists all **confirmed** external addresses of the local node.
    ///
    /// The addresses come from a `HashSet`, so the iteration order is unspecified.
    pub fn external_addresses(&self) -> impl Iterator<Item = &Multiaddr> {
        self.confirmed_external_addr.iter()
    }
569
570    fn add_listener(&mut self, opts: ListenOpts) -> Result<(), TransportError<io::Error>> {
571        let addr = opts.address();
572        let listener_id = opts.listener_id();
573
574        if let Err(e) = self.transport.listen_on(listener_id, addr.clone()) {
575            self.behaviour
576                .on_swarm_event(FromSwarm::ListenerError(behaviour::ListenerError {
577                    listener_id,
578                    err: &e,
579                }));
580
581            return Err(e);
582        }
583
584        self.behaviour
585            .on_swarm_event(FromSwarm::NewListener(behaviour::NewListener {
586                listener_id,
587            }));
588
589        Ok(())
590    }
591
592    /// Add a **confirmed** external address for the local node.
593    ///
594    /// This function should only be called with addresses that are guaranteed to be reachable.
595    /// The address is broadcast to all [`NetworkBehaviour`]s via
596    /// [`FromSwarm::ExternalAddrConfirmed`].
597    pub fn add_external_address(&mut self, a: Multiaddr) {
598        self.behaviour
599            .on_swarm_event(FromSwarm::ExternalAddrConfirmed(ExternalAddrConfirmed {
600                addr: &a,
601            }));
602        self.confirmed_external_addr.insert(a);
603    }
604
605    /// Remove an external address for the local node.
606    ///
607    /// The address is broadcast to all [`NetworkBehaviour`]s via
608    /// [`FromSwarm::ExternalAddrExpired`].
609    pub fn remove_external_address(&mut self, addr: &Multiaddr) {
610        self.behaviour
611            .on_swarm_event(FromSwarm::ExternalAddrExpired(ExternalAddrExpired { addr }));
612        self.confirmed_external_addr.remove(addr);
613    }
614
615    /// Add a new external address of a remote peer.
616    ///
617    /// The address is broadcast to all [`NetworkBehaviour`]s via
618    /// [`FromSwarm::NewExternalAddrOfPeer`].
619    pub fn add_peer_address(&mut self, peer_id: PeerId, addr: Multiaddr) {
620        self.behaviour
621            .on_swarm_event(FromSwarm::NewExternalAddrOfPeer(NewExternalAddrOfPeer {
622                peer_id,
623                addr: &addr,
624            }))
625    }
626
627    /// Disconnects a peer by its peer ID, closing all connections to said peer.
628    ///
629    /// Returns `Ok(())` if there was one or more established connections to the peer.
630    ///
631    /// Closing a connection via [`Swarm::disconnect_peer_id`] will poll
632    /// [`ConnectionHandler::poll_close`] to completion. Use this function if you want to close
633    /// a connection _despite_ it still being in use by one or more handlers.
634    #[allow(clippy::result_unit_err)]
635    pub fn disconnect_peer_id(&mut self, peer_id: PeerId) -> Result<(), ()> {
636        let was_connected = self.pool.is_connected(peer_id);
637        self.pool.disconnect(peer_id);
638
639        if was_connected {
640            Ok(())
641        } else {
642            Err(())
643        }
644    }
645
646    /// Attempt to gracefully close a connection.
647    ///
648    /// Closing a connection is asynchronous but this function will return immediately.
649    /// A [`SwarmEvent::ConnectionClosed`] event will be emitted
650    /// once the connection is actually closed.
651    ///
652    /// # Returns
653    ///
654    /// - `true` if the connection was established and is now being closed.
655    /// - `false` if the connection was not found or is no longer established.
656    pub fn close_connection(&mut self, connection_id: ConnectionId) -> bool {
657        if let Some(established) = self.pool.get_established(connection_id) {
658            established.start_close();
659            return true;
660        }
661
662        false
663    }
664
    /// Checks whether there is an established connection to a peer.
    ///
    /// The answer comes straight from the connection pool.
    pub fn is_connected(&self, peer_id: &PeerId) -> bool {
        self.pool.is_connected(*peer_id)
    }
669
    /// Returns the currently connected peers.
    ///
    /// The iterator is the one the connection pool provides for its connected peers.
    pub fn connected_peers(&self) -> impl Iterator<Item = &PeerId> {
        self.pool.iter_connected()
    }
674
    /// Returns a reference to the provided [`NetworkBehaviour`],
    /// i.e. the behaviour this [`Swarm`] was constructed with.
    pub fn behaviour(&self) -> &TBehaviour {
        &self.behaviour
    }
679
    /// Returns a mutable reference to the provided [`NetworkBehaviour`],
    /// i.e. the behaviour this [`Swarm`] was constructed with.
    pub fn behaviour_mut(&mut self) -> &mut TBehaviour {
        &mut self.behaviour
    }
684
685    fn handle_pool_event(&mut self, event: PoolEvent<THandlerOutEvent<TBehaviour>>) {
686        match event {
687            PoolEvent::ConnectionEstablished {
688                peer_id,
689                id,
690                endpoint,
691                connection,
692                concurrent_dial_errors,
693                established_in,
694            } => {
695                let handler = match endpoint.clone() {
696                    ConnectedPoint::Dialer {
697                        address,
698                        role_override,
699                        port_use,
700                    } => {
701                        match self.behaviour.handle_established_outbound_connection(
702                            id,
703                            peer_id,
704                            &address,
705                            role_override,
706                            port_use,
707                        ) {
708                            Ok(handler) => handler,
709                            Err(cause) => {
710                                let dial_error = DialError::Denied { cause };
711                                self.behaviour.on_swarm_event(FromSwarm::DialFailure(
712                                    DialFailure {
713                                        connection_id: id,
714                                        error: &dial_error,
715                                        peer_id: Some(peer_id),
716                                    },
717                                ));
718
719                                self.pending_swarm_events.push_back(
720                                    SwarmEvent::OutgoingConnectionError {
721                                        peer_id: Some(peer_id),
722                                        connection_id: id,
723                                        error: dial_error,
724                                    },
725                                );
726                                return;
727                            }
728                        }
729                    }
730                    ConnectedPoint::Listener {
731                        local_addr,
732                        send_back_addr,
733                    } => {
734                        match self.behaviour.handle_established_inbound_connection(
735                            id,
736                            peer_id,
737                            &local_addr,
738                            &send_back_addr,
739                        ) {
740                            Ok(handler) => handler,
741                            Err(cause) => {
742                                let listen_error = ListenError::Denied { cause };
743                                self.behaviour.on_swarm_event(FromSwarm::ListenFailure(
744                                    ListenFailure {
745                                        local_addr: &local_addr,
746                                        send_back_addr: &send_back_addr,
747                                        error: &listen_error,
748                                        connection_id: id,
749                                        peer_id: Some(peer_id),
750                                    },
751                                ));
752
753                                self.pending_swarm_events.push_back(
754                                    SwarmEvent::IncomingConnectionError {
755                                        connection_id: id,
756                                        send_back_addr,
757                                        local_addr,
758                                        error: listen_error,
759                                    },
760                                );
761                                return;
762                            }
763                        }
764                    }
765                };
766
767                let supported_protocols = handler
768                    .listen_protocol()
769                    .upgrade()
770                    .protocol_info()
771                    .map(|p| p.as_ref().as_bytes().to_vec())
772                    .collect();
773                let other_established_connection_ids = self
774                    .pool
775                    .iter_established_connections_of_peer(&peer_id)
776                    .collect::<Vec<_>>();
777                let num_established = NonZeroU32::new(
778                    u32::try_from(other_established_connection_ids.len() + 1).unwrap(),
779                )
780                .expect("n + 1 is always non-zero; qed");
781
782                self.pool
783                    .spawn_connection(id, peer_id, &endpoint, connection, handler);
784
785                tracing::debug!(
786                    peer=%peer_id,
787                    ?endpoint,
788                    total_peers=%num_established,
789                    "Connection established"
790                );
791                let failed_addresses = concurrent_dial_errors
792                    .as_ref()
793                    .map(|es| {
794                        es.iter()
795                            .map(|(a, _)| a)
796                            .cloned()
797                            .collect::<Vec<Multiaddr>>()
798                    })
799                    .unwrap_or_default();
800                self.behaviour
801                    .on_swarm_event(FromSwarm::ConnectionEstablished(
802                        behaviour::ConnectionEstablished {
803                            peer_id,
804                            connection_id: id,
805                            endpoint: &endpoint,
806                            failed_addresses: &failed_addresses,
807                            other_established: other_established_connection_ids.len(),
808                        },
809                    ));
810                self.supported_protocols = supported_protocols;
811                self.pending_swarm_events
812                    .push_back(SwarmEvent::ConnectionEstablished {
813                        peer_id,
814                        connection_id: id,
815                        num_established,
816                        endpoint,
817                        concurrent_dial_errors,
818                        established_in,
819                    });
820            }
821            PoolEvent::PendingOutboundConnectionError {
822                id: connection_id,
823                error,
824                peer,
825            } => {
826                let error = error.into();
827
828                self.behaviour
829                    .on_swarm_event(FromSwarm::DialFailure(DialFailure {
830                        peer_id: peer,
831                        error: &error,
832                        connection_id,
833                    }));
834
835                if let Some(peer) = peer {
836                    tracing::debug!(%peer, "Connection attempt to peer failed with {:?}.", error,);
837                } else {
838                    tracing::debug!("Connection attempt to unknown peer failed with {:?}", error);
839                }
840
841                self.pending_swarm_events
842                    .push_back(SwarmEvent::OutgoingConnectionError {
843                        peer_id: peer,
844                        connection_id,
845                        error,
846                    });
847            }
848            PoolEvent::PendingInboundConnectionError {
849                id,
850                send_back_addr,
851                local_addr,
852                error,
853            } => {
854                let error = error.into();
855
856                tracing::debug!("Incoming connection failed: {:?}", error);
857                self.behaviour
858                    .on_swarm_event(FromSwarm::ListenFailure(ListenFailure {
859                        local_addr: &local_addr,
860                        send_back_addr: &send_back_addr,
861                        error: &error,
862                        connection_id: id,
863                        peer_id: None,
864                    }));
865                self.pending_swarm_events
866                    .push_back(SwarmEvent::IncomingConnectionError {
867                        connection_id: id,
868                        local_addr,
869                        send_back_addr,
870                        error,
871                    });
872            }
873            PoolEvent::ConnectionClosed {
874                id,
875                connected,
876                error,
877                remaining_established_connection_ids,
878                ..
879            } => {
880                if let Some(error) = error.as_ref() {
881                    tracing::debug!(
882                        total_peers=%remaining_established_connection_ids.len(),
883                        "Connection closed with error {:?}: {:?}",
884                        error,
885                        connected,
886                    );
887                } else {
888                    tracing::debug!(
889                        total_peers=%remaining_established_connection_ids.len(),
890                        "Connection closed: {:?}",
891                        connected
892                    );
893                }
894                let peer_id = connected.peer_id;
895                let endpoint = connected.endpoint;
896                let num_established =
897                    u32::try_from(remaining_established_connection_ids.len()).unwrap();
898
899                self.behaviour
900                    .on_swarm_event(FromSwarm::ConnectionClosed(ConnectionClosed {
901                        peer_id,
902                        connection_id: id,
903                        endpoint: &endpoint,
904                        cause: error.as_ref(),
905                        remaining_established: num_established as usize,
906                    }));
907                self.pending_swarm_events
908                    .push_back(SwarmEvent::ConnectionClosed {
909                        peer_id,
910                        connection_id: id,
911                        endpoint,
912                        cause: error,
913                        num_established,
914                    });
915            }
916            PoolEvent::ConnectionEvent { peer_id, id, event } => {
917                self.behaviour
918                    .on_connection_handler_event(peer_id, id, event);
919            }
920            PoolEvent::AddressChange {
921                peer_id,
922                id,
923                new_endpoint,
924                old_endpoint,
925            } => {
926                self.behaviour
927                    .on_swarm_event(FromSwarm::AddressChange(AddressChange {
928                        peer_id,
929                        connection_id: id,
930                        old: &old_endpoint,
931                        new: &new_endpoint,
932                    }));
933            }
934        }
935    }
936
937    fn handle_transport_event(
938        &mut self,
939        event: TransportEvent<
940            <transport::Boxed<(PeerId, StreamMuxerBox)> as Transport>::ListenerUpgrade,
941            io::Error,
942        >,
943    ) {
944        match event {
945            TransportEvent::Incoming {
946                listener_id: _,
947                upgrade,
948                local_addr,
949                send_back_addr,
950            } => {
951                let connection_id = ConnectionId::next();
952
953                match self.behaviour.handle_pending_inbound_connection(
954                    connection_id,
955                    &local_addr,
956                    &send_back_addr,
957                ) {
958                    Ok(()) => {}
959                    Err(cause) => {
960                        let listen_error = ListenError::Denied { cause };
961
962                        self.behaviour
963                            .on_swarm_event(FromSwarm::ListenFailure(ListenFailure {
964                                local_addr: &local_addr,
965                                send_back_addr: &send_back_addr,
966                                error: &listen_error,
967                                connection_id,
968                                peer_id: None,
969                            }));
970
971                        self.pending_swarm_events
972                            .push_back(SwarmEvent::IncomingConnectionError {
973                                connection_id,
974                                local_addr,
975                                send_back_addr,
976                                error: listen_error,
977                            });
978                        return;
979                    }
980                }
981
982                self.pool.add_incoming(
983                    upgrade,
984                    IncomingInfo {
985                        local_addr: &local_addr,
986                        send_back_addr: &send_back_addr,
987                    },
988                    connection_id,
989                );
990
991                self.pending_swarm_events
992                    .push_back(SwarmEvent::IncomingConnection {
993                        connection_id,
994                        local_addr,
995                        send_back_addr,
996                    })
997            }
998            TransportEvent::NewAddress {
999                listener_id,
1000                listen_addr,
1001            } => {
1002                tracing::debug!(
1003                    listener=?listener_id,
1004                    address=%listen_addr,
1005                    "New listener address"
1006                );
1007                let addrs = self.listened_addrs.entry(listener_id).or_default();
1008                if !addrs.contains(&listen_addr) {
1009                    addrs.push(listen_addr.clone())
1010                }
1011                self.behaviour
1012                    .on_swarm_event(FromSwarm::NewListenAddr(NewListenAddr {
1013                        listener_id,
1014                        addr: &listen_addr,
1015                    }));
1016                self.pending_swarm_events
1017                    .push_back(SwarmEvent::NewListenAddr {
1018                        listener_id,
1019                        address: listen_addr,
1020                    })
1021            }
1022            TransportEvent::AddressExpired {
1023                listener_id,
1024                listen_addr,
1025            } => {
1026                tracing::debug!(
1027                    listener=?listener_id,
1028                    address=%listen_addr,
1029                    "Expired listener address"
1030                );
1031                if let Some(addrs) = self.listened_addrs.get_mut(&listener_id) {
1032                    addrs.retain(|a| a != &listen_addr);
1033                }
1034                self.behaviour
1035                    .on_swarm_event(FromSwarm::ExpiredListenAddr(ExpiredListenAddr {
1036                        listener_id,
1037                        addr: &listen_addr,
1038                    }));
1039                self.pending_swarm_events
1040                    .push_back(SwarmEvent::ExpiredListenAddr {
1041                        listener_id,
1042                        address: listen_addr,
1043                    })
1044            }
1045            TransportEvent::ListenerClosed {
1046                listener_id,
1047                reason,
1048            } => {
1049                tracing::debug!(
1050                    listener=?listener_id,
1051                    ?reason,
1052                    "Listener closed"
1053                );
1054                let addrs = self.listened_addrs.remove(&listener_id).unwrap_or_default();
1055                for addr in addrs.iter() {
1056                    self.behaviour.on_swarm_event(FromSwarm::ExpiredListenAddr(
1057                        ExpiredListenAddr { listener_id, addr },
1058                    ));
1059                }
1060                self.behaviour
1061                    .on_swarm_event(FromSwarm::ListenerClosed(ListenerClosed {
1062                        listener_id,
1063                        reason: reason.as_ref().copied(),
1064                    }));
1065                self.pending_swarm_events
1066                    .push_back(SwarmEvent::ListenerClosed {
1067                        listener_id,
1068                        addresses: addrs.to_vec(),
1069                        reason,
1070                    })
1071            }
1072            TransportEvent::ListenerError { listener_id, error } => {
1073                self.behaviour
1074                    .on_swarm_event(FromSwarm::ListenerError(ListenerError {
1075                        listener_id,
1076                        err: &error,
1077                    }));
1078                self.pending_swarm_events
1079                    .push_back(SwarmEvent::ListenerError { listener_id, error })
1080            }
1081        }
1082    }
1083
1084    fn handle_behaviour_event(
1085        &mut self,
1086        event: ToSwarm<TBehaviour::ToSwarm, THandlerInEvent<TBehaviour>>,
1087    ) {
1088        match event {
1089            ToSwarm::GenerateEvent(event) => {
1090                self.pending_swarm_events
1091                    .push_back(SwarmEvent::Behaviour(event));
1092            }
1093            ToSwarm::Dial { opts } => {
1094                let peer_id = opts.get_peer_id();
1095                let connection_id = opts.connection_id();
1096                if let Ok(()) = self.dial(opts) {
1097                    self.pending_swarm_events.push_back(SwarmEvent::Dialing {
1098                        peer_id,
1099                        connection_id,
1100                    });
1101                }
1102            }
1103            ToSwarm::ListenOn { opts } => {
1104                // Error is dispatched internally, safe to ignore.
1105                let _ = self.add_listener(opts);
1106            }
1107            ToSwarm::RemoveListener { id } => {
1108                self.remove_listener(id);
1109            }
1110            ToSwarm::NotifyHandler {
1111                peer_id,
1112                handler,
1113                event,
1114            } => {
1115                assert!(self.pending_handler_event.is_none());
1116                let handler = match handler {
1117                    NotifyHandler::One(connection) => PendingNotifyHandler::One(connection),
1118                    NotifyHandler::Any => {
1119                        let ids = self
1120                            .pool
1121                            .iter_established_connections_of_peer(&peer_id)
1122                            .collect();
1123                        PendingNotifyHandler::Any(ids)
1124                    }
1125                };
1126
1127                self.pending_handler_event = Some((peer_id, handler, event));
1128            }
1129            ToSwarm::NewExternalAddrCandidate(addr) => {
1130                if !self.confirmed_external_addr.contains(&addr) {
1131                    self.behaviour
1132                        .on_swarm_event(FromSwarm::NewExternalAddrCandidate(
1133                            NewExternalAddrCandidate { addr: &addr },
1134                        ));
1135                    self.pending_swarm_events
1136                        .push_back(SwarmEvent::NewExternalAddrCandidate { address: addr });
1137                }
1138            }
1139            ToSwarm::ExternalAddrConfirmed(addr) => {
1140                self.add_external_address(addr.clone());
1141                self.pending_swarm_events
1142                    .push_back(SwarmEvent::ExternalAddrConfirmed { address: addr });
1143            }
1144            ToSwarm::ExternalAddrExpired(addr) => {
1145                self.remove_external_address(&addr);
1146                self.pending_swarm_events
1147                    .push_back(SwarmEvent::ExternalAddrExpired { address: addr });
1148            }
1149            ToSwarm::CloseConnection {
1150                peer_id,
1151                connection,
1152            } => match connection {
1153                CloseConnection::One(connection_id) => {
1154                    if let Some(conn) = self.pool.get_established(connection_id) {
1155                        conn.start_close();
1156                    }
1157                }
1158                CloseConnection::All => {
1159                    self.pool.disconnect(peer_id);
1160                }
1161            },
1162            ToSwarm::NewExternalAddrOfPeer { peer_id, address } => {
1163                self.behaviour
1164                    .on_swarm_event(FromSwarm::NewExternalAddrOfPeer(NewExternalAddrOfPeer {
1165                        peer_id,
1166                        addr: &address,
1167                    }));
1168                self.pending_swarm_events
1169                    .push_back(SwarmEvent::NewExternalAddrOfPeer { peer_id, address });
1170            }
1171        }
1172    }
1173
1174    /// Internal function used by everything event-related.
1175    ///
1176    /// Polls the `Swarm` for the next event.
1177    #[tracing::instrument(level = "debug", name = "Swarm::poll", skip(self, cx))]
1178    fn poll_next_event(
1179        mut self: Pin<&mut Self>,
1180        cx: &mut Context<'_>,
1181    ) -> Poll<SwarmEvent<TBehaviour::ToSwarm>> {
1182        // We use a `this` variable because the compiler can't mutably borrow multiple times
1183        // across a `Deref`.
1184        let this = &mut *self;
1185
1186        // This loop polls the components below in a prioritized order.
1187        //
1188        // 1. [`NetworkBehaviour`]
1189        // 2. Connection [`Pool`]
1190        // 3. [`ListenersStream`]
1191        //
1192        // (1) is polled before (2) to prioritize local work over work coming from a remote.
1193        //
1194        // (2) is polled before (3) to prioritize existing connections
1195        // over upgrading new incoming connections.
1196        loop {
1197            if let Some(swarm_event) = this.pending_swarm_events.pop_front() {
1198                return Poll::Ready(swarm_event);
1199            }
1200
1201            match this.pending_handler_event.take() {
1202                // Try to deliver the pending event emitted by the [`NetworkBehaviour`] in the
1203                // previous iteration to the connection handler(s).
1204                Some((peer_id, handler, event)) => match handler {
1205                    PendingNotifyHandler::One(conn_id) => {
1206                        match this.pool.get_established(conn_id) {
1207                            Some(conn) => match notify_one(conn, event, cx) {
1208                                None => continue,
1209                                Some(event) => {
1210                                    this.pending_handler_event = Some((peer_id, handler, event));
1211                                }
1212                            },
1213                            None => continue,
1214                        }
1215                    }
1216                    PendingNotifyHandler::Any(ids) => {
1217                        match notify_any::<_, TBehaviour>(ids, &mut this.pool, event, cx) {
1218                            None => continue,
1219                            Some((event, ids)) => {
1220                                let handler = PendingNotifyHandler::Any(ids);
1221                                this.pending_handler_event = Some((peer_id, handler, event));
1222                            }
1223                        }
1224                    }
1225                },
1226                // No pending event. Allow the [`NetworkBehaviour`] to make progress.
1227                None => match this.behaviour.poll(cx) {
1228                    Poll::Pending => {}
1229                    Poll::Ready(behaviour_event) => {
1230                        this.handle_behaviour_event(behaviour_event);
1231
1232                        continue;
1233                    }
1234                },
1235            }
1236
1237            // Poll the known peers.
1238            match this.pool.poll(cx) {
1239                Poll::Pending => {}
1240                Poll::Ready(pool_event) => {
1241                    this.handle_pool_event(pool_event);
1242                    continue;
1243                }
1244            }
1245
1246            // Poll the listener(s) for new connections.
1247            match Pin::new(&mut this.transport).poll(cx) {
1248                Poll::Pending => {}
1249                Poll::Ready(transport_event) => {
1250                    this.handle_transport_event(transport_event);
1251                    continue;
1252                }
1253            }
1254
1255            return Poll::Pending;
1256        }
1257    }
1258}
1259
/// Connection to notify of a pending event.
///
/// The connection IDs out of which to notify one of an event are captured at
/// the time the behaviour emits the event, in order not to forward the event to
/// a new connection which the behaviour may not have been aware of at the time
/// it issued the request for sending it.
enum PendingNotifyHandler {
    /// Deliver the event to exactly this connection.
    One(ConnectionId),
    /// Deliver the event to any one of these connections.
    Any(SmallVec<[ConnectionId; 10]>),
}
1270
1271/// Notify a single connection of an event.
1272///
1273/// Returns `Some` with the given event if the connection is not currently
1274/// ready to receive another event, in which case the current task is
1275/// scheduled to be woken up.
1276///
1277/// Returns `None` if the connection is closing or the event has been
1278/// successfully sent, in either case the event is consumed.
1279fn notify_one<THandlerInEvent>(
1280    conn: &mut EstablishedConnection<THandlerInEvent>,
1281    event: THandlerInEvent,
1282    cx: &mut Context<'_>,
1283) -> Option<THandlerInEvent> {
1284    match conn.poll_ready_notify_handler(cx) {
1285        Poll::Pending => Some(event),
1286        Poll::Ready(Err(())) => None, // connection is closing
1287        Poll::Ready(Ok(())) => {
1288            // Can now only fail if connection is closing.
1289            let _ = conn.notify_handler(event);
1290            None
1291        }
1292    }
1293}
1294
1295/// Notify any one of a given list of connections of a peer of an event.
1296///
1297/// Returns `Some` with the given event and a new list of connections if
1298/// none of the given connections was able to receive the event but at
1299/// least one of them is not closing, in which case the current task
1300/// is scheduled to be woken up. The returned connections are those which
1301/// may still become ready to receive another event.
1302///
1303/// Returns `None` if either all connections are closing or the event
1304/// was successfully sent to a handler, in either case the event is consumed.
1305fn notify_any<THandler, TBehaviour>(
1306    ids: SmallVec<[ConnectionId; 10]>,
1307    pool: &mut Pool<THandler>,
1308    event: THandlerInEvent<TBehaviour>,
1309    cx: &mut Context<'_>,
1310) -> Option<(THandlerInEvent<TBehaviour>, SmallVec<[ConnectionId; 10]>)>
1311where
1312    TBehaviour: NetworkBehaviour,
1313    THandler: ConnectionHandler<
1314        FromBehaviour = THandlerInEvent<TBehaviour>,
1315        ToBehaviour = THandlerOutEvent<TBehaviour>,
1316    >,
1317{
1318    let mut pending = SmallVec::new();
1319    let mut event = Some(event); // (1)
1320    for id in ids.into_iter() {
1321        if let Some(conn) = pool.get_established(id) {
1322            match conn.poll_ready_notify_handler(cx) {
1323                Poll::Pending => pending.push(id),
1324                Poll::Ready(Err(())) => {} // connection is closing
1325                Poll::Ready(Ok(())) => {
1326                    let e = event.take().expect("by (1),(2)");
1327                    if let Err(e) = conn.notify_handler(e) {
1328                        event = Some(e) // (2)
1329                    } else {
1330                        break;
1331                    }
1332                }
1333            }
1334        }
1335    }
1336
1337    event.and_then(|e| {
1338        if !pending.is_empty() {
1339            Some((e, pending))
1340        } else {
1341            None
1342        }
1343    })
1344}
1345
1346/// Stream of events returned by [`Swarm`].
1347///
1348/// Includes events from the [`NetworkBehaviour`] as well as events about
1349/// connection and listener status. See [`SwarmEvent`] for details.
1350///
1351/// Note: This stream is infinite and it is guaranteed that
1352/// [`futures::Stream::poll_next`] will never return `Poll::Ready(None)`.
1353impl<TBehaviour> futures::Stream for Swarm<TBehaviour>
1354where
1355    TBehaviour: NetworkBehaviour,
1356{
1357    type Item = SwarmEvent<TBehaviourOutEvent<TBehaviour>>;
1358
1359    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
1360        self.as_mut().poll_next_event(cx).map(Some)
1361    }
1362}
1363
1364/// The stream of swarm events never terminates, so we can implement fused for it.
1365impl<TBehaviour> FusedStream for Swarm<TBehaviour>
1366where
1367    TBehaviour: NetworkBehaviour,
1368{
1369    fn is_terminated(&self) -> bool {
1370        false
1371    }
1372}
1373
/// Configuration used to construct a [`Swarm`] (see [`Swarm::new`]).
///
/// Created through one of the `with_*_executor` constructors (or
/// `without_executor`) and refined with the remaining builder-style `with_*`
/// methods.
pub struct Config {
    /// Settings for the connection pool; all builder methods operate on this value.
    pool_config: PoolConfig,
}
1377
1378impl Config {
1379    /// Creates a new [`Config`] from the given executor. The [`Swarm`] is obtained via
1380    /// [`Swarm::new`].
1381    pub fn with_executor(executor: impl Executor + Send + 'static) -> Self {
1382        Self {
1383            pool_config: PoolConfig::new(Some(Box::new(executor))),
1384        }
1385    }
1386
1387    #[doc(hidden)]
1388    /// Used on connection benchmarks.
1389    pub fn without_executor() -> Self {
1390        Self {
1391            pool_config: PoolConfig::new(None),
1392        }
1393    }
1394
1395    /// Sets executor to the `wasm` executor.
1396    /// Background tasks will be executed by the browser on the next micro-tick.
1397    ///
1398    /// Spawning a task is similar too:
1399    /// ```typescript
1400    /// function spawn(task: () => Promise<void>) {
1401    ///     task()
1402    /// }
1403    /// ```
1404    #[cfg(feature = "wasm-bindgen")]
1405    pub fn with_wasm_executor() -> Self {
1406        Self::with_executor(crate::executor::WasmBindgenExecutor)
1407    }
1408
1409    /// Builds a new [`Config`] from the given `tokio` executor.
1410    #[cfg(all(
1411        feature = "tokio",
1412        not(any(target_os = "emscripten", target_os = "wasi", target_os = "unknown"))
1413    ))]
1414    pub fn with_tokio_executor() -> Self {
1415        Self::with_executor(crate::executor::TokioExecutor)
1416    }
1417
1418    /// Builds a new [`Config`] from the given `async-std` executor.
1419    #[cfg(all(
1420        feature = "async-std",
1421        not(any(target_os = "emscripten", target_os = "wasi", target_os = "unknown"))
1422    ))]
1423    pub fn with_async_std_executor() -> Self {
1424        Self::with_executor(crate::executor::AsyncStdExecutor)
1425    }
1426
1427    /// Configures the number of events from the [`NetworkBehaviour`] in
1428    /// destination to the [`ConnectionHandler`] that can be buffered before
1429    /// the [`Swarm`] has to wait. An individual buffer with this number of
1430    /// events exists for each individual connection.
1431    ///
1432    /// The ideal value depends on the executor used, the CPU speed, and the
1433    /// volume of events. If this value is too low, then the [`Swarm`] will
1434    /// be sleeping more often than necessary. Increasing this value increases
1435    /// the overall memory usage.
1436    pub fn with_notify_handler_buffer_size(mut self, n: NonZeroUsize) -> Self {
1437        self.pool_config = self.pool_config.with_notify_handler_buffer_size(n);
1438        self
1439    }
1440
1441    /// Configures the size of the buffer for events sent by a [`ConnectionHandler`] to the
1442    /// [`NetworkBehaviour`].
1443    ///
1444    /// Each connection has its own buffer.
1445    ///
1446    /// The ideal value depends on the executor used, the CPU speed and the volume of events.
1447    /// If this value is too low, then the [`ConnectionHandler`]s will be sleeping more often
1448    /// than necessary. Increasing this value increases the overall memory
1449    /// usage, and more importantly the latency between the moment when an
1450    /// event is emitted and the moment when it is received by the
1451    /// [`NetworkBehaviour`].
1452    pub fn with_per_connection_event_buffer_size(mut self, n: usize) -> Self {
1453        self.pool_config = self.pool_config.with_per_connection_event_buffer_size(n);
1454        self
1455    }
1456
1457    /// Number of addresses concurrently dialed for a single outbound connection attempt.
1458    pub fn with_dial_concurrency_factor(mut self, factor: NonZeroU8) -> Self {
1459        self.pool_config = self.pool_config.with_dial_concurrency_factor(factor);
1460        self
1461    }
1462
1463    /// Configures an override for the substream upgrade protocol to use.
1464    ///
1465    /// The subtream upgrade protocol is the multistream-select protocol
1466    /// used for protocol negotiation on substreams. Since a listener
1467    /// supports all existing versions, the choice of upgrade protocol
1468    /// only effects the "dialer", i.e. the peer opening a substream.
1469    ///
1470    /// > **Note**: If configured, specific upgrade protocols for
1471    /// > individual [`SubstreamProtocol`]s emitted by the `NetworkBehaviour`
1472    /// > are ignored.
1473    pub fn with_substream_upgrade_protocol_override(
1474        mut self,
1475        v: libp2p_core::upgrade::Version,
1476    ) -> Self {
1477        self.pool_config = self.pool_config.with_substream_upgrade_protocol_override(v);
1478        self
1479    }
1480
1481    /// The maximum number of inbound streams concurrently negotiating on a
1482    /// connection. New inbound streams exceeding the limit are dropped and thus
1483    /// reset.
1484    ///
1485    /// Note: This only enforces a limit on the number of concurrently
1486    /// negotiating inbound streams. The total number of inbound streams on a
1487    /// connection is the sum of negotiating and negotiated streams. A limit on
1488    /// the total number of streams can be enforced at the
1489    /// [`StreamMuxerBox`] level.
1490    pub fn with_max_negotiating_inbound_streams(mut self, v: usize) -> Self {
1491        self.pool_config = self.pool_config.with_max_negotiating_inbound_streams(v);
1492        self
1493    }
1494
1495    /// How long to keep a connection alive once it is idling.
1496    ///
1497    /// Defaults to 10s.
1498    ///
1499    /// Typically, you shouldn't _need_ to modify this default as connections will be kept alive
1500    /// whilst they are "in use" (see below). Depending on the application's usecase, it may be
1501    /// desirable to keep connections alive despite them not being in use.
1502    ///
1503    /// A connection is considered idle if:
1504    /// - There are no active inbound streams.
1505    /// - There are no active outbounds streams.
1506    /// - There are no pending outbound streams (i.e. all streams requested via
1507    ///   [`ConnectionHandlerEvent::OutboundSubstreamRequest`] have completed).
1508    /// - Every [`ConnectionHandler`] returns `false` from
1509    ///   [`ConnectionHandler::connection_keep_alive`].
1510    ///
1511    /// Once all these conditions are true, the idle connection timeout starts ticking.
1512    pub fn with_idle_connection_timeout(mut self, timeout: Duration) -> Self {
1513        self.pool_config.idle_connection_timeout = timeout;
1514        self
1515    }
1516}
1517
/// Possible errors when trying to establish or upgrade an outbound connection.
///
/// Several variants are produced by converting a `PendingOutboundConnectionError`
/// (see the `From` impl); the others are constructed elsewhere in the crate.
#[derive(Debug)]
pub enum DialError {
    /// The peer identity obtained on the connection matches the local peer.
    LocalPeerId { address: Multiaddr },
    /// No addresses have been provided by [`NetworkBehaviour::handle_pending_outbound_connection`]
    /// and [`DialOpts`].
    NoAddresses,
    /// The provided [`dial_opts::PeerCondition`] evaluated to false and thus
    /// the dial was aborted.
    DialPeerConditionFalse(dial_opts::PeerCondition),
    /// Pending connection attempt has been aborted.
    Aborted,
    /// The peer identity obtained on the connection did not match the one that was expected.
    WrongPeerId {
        obtained: PeerId,
        address: Multiaddr,
    },
    /// One of the [`NetworkBehaviour`]s rejected the outbound connection
    /// via [`NetworkBehaviour::handle_pending_outbound_connection`] or
    /// [`NetworkBehaviour::handle_established_outbound_connection`].
    Denied { cause: ConnectionDenied },
    /// An error occurred while negotiating the transport protocol(s) on a connection.
    ///
    /// Holds one entry per address, pairing the address with the error reported for it.
    Transport(Vec<(Multiaddr, TransportError<io::Error>)>),
}
1543
1544impl From<PendingOutboundConnectionError> for DialError {
1545    fn from(error: PendingOutboundConnectionError) -> Self {
1546        match error {
1547            PendingOutboundConnectionError::Aborted => DialError::Aborted,
1548            PendingOutboundConnectionError::WrongPeerId { obtained, address } => {
1549                DialError::WrongPeerId { obtained, address }
1550            }
1551            PendingOutboundConnectionError::LocalPeerId { address } => {
1552                DialError::LocalPeerId { address }
1553            }
1554            PendingOutboundConnectionError::Transport(e) => DialError::Transport(e),
1555        }
1556    }
1557}
1558
1559impl fmt::Display for DialError {
1560    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1561        match self {
1562            DialError::NoAddresses => write!(f, "Dial error: no addresses for peer."),
1563            DialError::LocalPeerId { address } => write!(
1564                f,
1565                "Dial error: tried to dial local peer id at {address:?}."
1566            ),
1567            DialError::DialPeerConditionFalse(PeerCondition::Disconnected) => write!(f, "Dial error: dial condition was configured to only happen when disconnected (`PeerCondition::Disconnected`), but node is already connected, thus cancelling new dial."),
1568            DialError::DialPeerConditionFalse(PeerCondition::NotDialing) => write!(f, "Dial error: dial condition was configured to only happen if there is currently no ongoing dialing attempt (`PeerCondition::NotDialing`), but a dial is in progress, thus cancelling new dial."),
1569            DialError::DialPeerConditionFalse(PeerCondition::DisconnectedAndNotDialing) => write!(f, "Dial error: dial condition was configured to only happen when both disconnected (`PeerCondition::Disconnected`) and there is currently no ongoing dialing attempt (`PeerCondition::NotDialing`), but node is already connected or dial is in progress, thus cancelling new dial."),
1570            DialError::DialPeerConditionFalse(PeerCondition::Always) => unreachable!("Dial peer condition is by definition true."),
1571            DialError::Aborted => write!(
1572                f,
1573                "Dial error: Pending connection attempt has been aborted."
1574            ),
1575            DialError::WrongPeerId { obtained, address } => write!(
1576                f,
1577                "Dial error: Unexpected peer ID {obtained} at {address:?}."
1578            ),
1579            DialError::Transport(errors) => {
1580                write!(f, "Failed to negotiate transport protocol(s): [")?;
1581
1582                for (addr, error) in errors {
1583                    write!(f, "({addr}")?;
1584                    print_error_chain(f, error)?;
1585                    write!(f, ")")?;
1586                }
1587                write!(f, "]")?;
1588
1589                Ok(())
1590            }
1591            DialError::Denied { .. } => {
1592                write!(f, "Dial error")
1593            }
1594        }
1595    }
1596}
1597
/// Writes `e` followed by every error in its [`source`](error::Error::source) chain to `f`,
/// prefixing each entry with `": "`.
fn print_error_chain(f: &mut fmt::Formatter<'_>, e: &dyn error::Error) -> fmt::Result {
    // Walk the chain from the outermost error down to the root cause.
    let mut current = Some(e);
    while let Some(err) = current {
        write!(f, ": {err}")?;
        current = err.source();
    }

    Ok(())
}
1607
1608impl error::Error for DialError {
1609    fn source(&self) -> Option<&(dyn error::Error + 'static)> {
1610        match self {
1611            DialError::LocalPeerId { .. } => None,
1612            DialError::NoAddresses => None,
1613            DialError::DialPeerConditionFalse(_) => None,
1614            DialError::Aborted => None,
1615            DialError::WrongPeerId { .. } => None,
1616            DialError::Transport(_) => None,
1617            DialError::Denied { cause } => Some(cause),
1618        }
1619    }
1620}
1621
/// Possible errors when upgrading an inbound connection.
#[derive(Debug)]
pub enum ListenError {
    /// Pending connection attempt has been aborted.
    Aborted,
    /// The peer identity obtained on the connection did not match the one that was expected.
    WrongPeerId {
        obtained: PeerId,
        endpoint: ConnectedPoint,
    },
    /// The connection was dropped because it resolved to our own [`PeerId`].
    LocalPeerId {
        address: Multiaddr,
    },
    /// The connection was denied; `cause` carries the reason (see [`ConnectionDenied`]).
    Denied {
        cause: ConnectionDenied,
    },
    /// An error occurred while negotiating the transport protocol(s) on a connection.
    Transport(TransportError<io::Error>),
}
1642
1643impl From<PendingInboundConnectionError> for ListenError {
1644    fn from(error: PendingInboundConnectionError) -> Self {
1645        match error {
1646            PendingInboundConnectionError::Transport(inner) => ListenError::Transport(inner),
1647            PendingInboundConnectionError::Aborted => ListenError::Aborted,
1648            PendingInboundConnectionError::LocalPeerId { address } => {
1649                ListenError::LocalPeerId { address }
1650            }
1651        }
1652    }
1653}
1654
1655impl fmt::Display for ListenError {
1656    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1657        match self {
1658            ListenError::Aborted => write!(
1659                f,
1660                "Listen error: Pending connection attempt has been aborted."
1661            ),
1662            ListenError::WrongPeerId { obtained, endpoint } => write!(
1663                f,
1664                "Listen error: Unexpected peer ID {obtained} at {endpoint:?}."
1665            ),
1666            ListenError::Transport(_) => {
1667                write!(f, "Listen error: Failed to negotiate transport protocol(s)")
1668            }
1669            ListenError::Denied { cause } => {
1670                write!(f, "Listen error: Denied: {cause}")
1671            }
1672            ListenError::LocalPeerId { address } => {
1673                write!(f, "Listen error: Local peer ID at {address:?}.")
1674            }
1675        }
1676    }
1677}
1678
1679impl error::Error for ListenError {
1680    fn source(&self) -> Option<&(dyn error::Error + 'static)> {
1681        match self {
1682            ListenError::WrongPeerId { .. } => None,
1683            ListenError::Transport(err) => Some(err),
1684            ListenError::Aborted => None,
1685            ListenError::Denied { cause } => Some(cause),
1686            ListenError::LocalPeerId { .. } => None,
1687        }
1688    }
1689}
1690
/// A connection was denied.
///
/// To figure out which [`NetworkBehaviour`] denied the connection, use
/// [`ConnectionDenied::downcast`].
#[derive(Debug)]
pub struct ConnectionDenied {
    /// The type-erased reason for the denial.
    inner: Box<dyn error::Error + Send + Sync + 'static>,
}

impl ConnectionDenied {
    /// Wraps `cause` as the reason why a connection was denied.
    pub fn new(cause: impl Into<Box<dyn error::Error + Send + Sync + 'static>>) -> Self {
        let inner = cause.into();

        Self { inner }
    }

    /// Attempt to recover the concrete reason of type `E` for why the connection was denied.
    ///
    /// Returns the unchanged [`ConnectionDenied`] as the error if the reason is not an `E`.
    pub fn downcast<E>(self) -> Result<E, Self>
    where
        E: error::Error + Send + Sync + 'static,
    {
        match self.inner.downcast::<E>() {
            Ok(reason) => Ok(*reason),
            Err(inner) => Err(Self { inner }),
        }
    }

    /// Attempt to borrow the concrete reason of type `E` for why the connection was denied.
    ///
    /// Returns `None` if the reason is not an `E`.
    pub fn downcast_ref<E>(&self) -> Option<&E>
    where
        E: error::Error + Send + Sync + 'static,
    {
        let reason = &self.inner;
        reason.downcast_ref::<E>()
    }
}

impl fmt::Display for ConnectionDenied {
    /// Writes a fixed message; the actual reason is exposed via [`error::Error::source`].
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str("connection denied")
    }
}

impl error::Error for ConnectionDenied {
    /// Always returns the wrapped reason.
    fn source(&self) -> Option<&(dyn error::Error + 'static)> {
        let cause: &(dyn error::Error + 'static) = &*self.inner;
        Some(cause)
    }
}
1740
1741/// Information about the connections obtained by [`Swarm::network_info()`].
1742#[derive(Clone, Debug)]
1743pub struct NetworkInfo {
1744    /// The total number of connected peers.
1745    num_peers: usize,
1746    /// Counters of ongoing network connections.
1747    connection_counters: ConnectionCounters,
1748}
1749
1750impl NetworkInfo {
1751    /// The number of connected peers, i.e. peers with whom at least
1752    /// one established connection exists.
1753    pub fn num_peers(&self) -> usize {
1754        self.num_peers
1755    }
1756
1757    /// Gets counters for ongoing network connections.
1758    pub fn connection_counters(&self) -> &ConnectionCounters {
1759        &self.connection_counters
1760    }
1761}
1762
1763#[cfg(test)]
1764mod tests {
1765    use libp2p_core::{
1766        multiaddr,
1767        multiaddr::multiaddr,
1768        transport,
1769        transport::{memory::MemoryTransportError, TransportEvent},
1770        upgrade,
1771    };
1772    use libp2p_identity as identity;
1773    use libp2p_plaintext as plaintext;
1774    use libp2p_yamux as yamux;
1775    use quickcheck::*;
1776
1777    use super::*;
1778    use crate::test::{CallTraceBehaviour, MockBehaviour};
1779
    // Test execution state.
    // Connecting => Disconnecting => Connecting.
    enum State {
        // Waiting for the expected connections between the two swarms to be established.
        Connecting,
        // Waiting for connections between the two swarms to be closed.
        Disconnecting,
    }
1786
1787    fn new_test_swarm(
1788        config: Config,
1789    ) -> Swarm<CallTraceBehaviour<MockBehaviour<dummy::ConnectionHandler, ()>>> {
1790        let id_keys = identity::Keypair::generate_ed25519();
1791        let local_public_key = id_keys.public();
1792        let transport = transport::MemoryTransport::default()
1793            .upgrade(upgrade::Version::V1)
1794            .authenticate(plaintext::Config::new(&id_keys))
1795            .multiplex(yamux::Config::default())
1796            .boxed();
1797        let behaviour = CallTraceBehaviour::new(MockBehaviour::new(dummy::ConnectionHandler));
1798
1799        Swarm::new(transport, behaviour, local_public_key.into(), config)
1800    }
1801
1802    fn swarms_connected<TBehaviour>(
1803        swarm1: &Swarm<CallTraceBehaviour<TBehaviour>>,
1804        swarm2: &Swarm<CallTraceBehaviour<TBehaviour>>,
1805        num_connections: usize,
1806    ) -> bool
1807    where
1808        TBehaviour: NetworkBehaviour,
1809        THandlerOutEvent<TBehaviour>: Clone,
1810    {
1811        swarm1
1812            .behaviour()
1813            .num_connections_to_peer(*swarm2.local_peer_id())
1814            == num_connections
1815            && swarm2
1816                .behaviour()
1817                .num_connections_to_peer(*swarm1.local_peer_id())
1818                == num_connections
1819            && swarm1.is_connected(swarm2.local_peer_id())
1820            && swarm2.is_connected(swarm1.local_peer_id())
1821    }
1822
1823    fn swarms_disconnected<TBehaviour>(
1824        swarm1: &Swarm<CallTraceBehaviour<TBehaviour>>,
1825        swarm2: &Swarm<CallTraceBehaviour<TBehaviour>>,
1826    ) -> bool
1827    where
1828        TBehaviour: NetworkBehaviour,
1829        THandlerOutEvent<TBehaviour>: Clone,
1830    {
1831        swarm1
1832            .behaviour()
1833            .num_connections_to_peer(*swarm2.local_peer_id())
1834            == 0
1835            && swarm2
1836                .behaviour()
1837                .num_connections_to_peer(*swarm1.local_peer_id())
1838                == 0
1839            && !swarm1.is_connected(swarm2.local_peer_id())
1840            && !swarm2.is_connected(swarm1.local_peer_id())
1841    }
1842
1843    /// Establishes multiple connections between two peers,
1844    /// after which one peer disconnects the other using [`Swarm::disconnect_peer_id`].
1845    ///
1846    /// The test expects both behaviours to be notified via calls to
1847    /// [`NetworkBehaviour::on_swarm_event`] with pairs of [`FromSwarm::ConnectionEstablished`]
1848    /// / [`FromSwarm::ConnectionClosed`]
1849    #[tokio::test]
1850    async fn test_swarm_disconnect() {
1851        let mut swarm1 = new_test_swarm(Config::with_tokio_executor());
1852        let mut swarm2 = new_test_swarm(Config::with_tokio_executor());
1853
1854        let addr1: Multiaddr = multiaddr::Protocol::Memory(rand::random::<u64>()).into();
1855        let addr2: Multiaddr = multiaddr::Protocol::Memory(rand::random::<u64>()).into();
1856
1857        swarm1.listen_on(addr1.clone()).unwrap();
1858        swarm2.listen_on(addr2.clone()).unwrap();
1859
1860        let swarm1_id = *swarm1.local_peer_id();
1861
1862        let mut reconnected = false;
1863        let num_connections = 10;
1864
1865        for _ in 0..num_connections {
1866            swarm1.dial(addr2.clone()).unwrap();
1867        }
1868        let mut state = State::Connecting;
1869
1870        future::poll_fn(move |cx| loop {
1871            let poll1 = Swarm::poll_next_event(Pin::new(&mut swarm1), cx);
1872            let poll2 = Swarm::poll_next_event(Pin::new(&mut swarm2), cx);
1873            match state {
1874                State::Connecting => {
1875                    if swarms_connected(&swarm1, &swarm2, num_connections) {
1876                        if reconnected {
1877                            return Poll::Ready(());
1878                        }
1879                        swarm2
1880                            .disconnect_peer_id(swarm1_id)
1881                            .expect("Error disconnecting");
1882                        state = State::Disconnecting;
1883                    }
1884                }
1885                State::Disconnecting => {
1886                    if swarms_disconnected(&swarm1, &swarm2) {
1887                        if reconnected {
1888                            return Poll::Ready(());
1889                        }
1890                        reconnected = true;
1891                        for _ in 0..num_connections {
1892                            swarm2.dial(addr1.clone()).unwrap();
1893                        }
1894                        state = State::Connecting;
1895                    }
1896                }
1897            }
1898
1899            if poll1.is_pending() && poll2.is_pending() {
1900                return Poll::Pending;
1901            }
1902        })
1903        .await
1904    }
1905
1906    /// Establishes multiple connections between two peers,
1907    /// after which one peer disconnects the other
1908    /// using [`ToSwarm::CloseConnection`] returned by a [`NetworkBehaviour`].
1909    ///
1910    /// The test expects both behaviours to be notified via calls to
1911    /// [`NetworkBehaviour::on_swarm_event`] with pairs of [`FromSwarm::ConnectionEstablished`]
1912    /// / [`FromSwarm::ConnectionClosed`]
1913    #[tokio::test]
1914    async fn test_behaviour_disconnect_all() {
1915        let mut swarm1 = new_test_swarm(Config::with_tokio_executor());
1916        let mut swarm2 = new_test_swarm(Config::with_tokio_executor());
1917
1918        let addr1: Multiaddr = multiaddr::Protocol::Memory(rand::random::<u64>()).into();
1919        let addr2: Multiaddr = multiaddr::Protocol::Memory(rand::random::<u64>()).into();
1920
1921        swarm1.listen_on(addr1.clone()).unwrap();
1922        swarm2.listen_on(addr2.clone()).unwrap();
1923
1924        let swarm1_id = *swarm1.local_peer_id();
1925
1926        let mut reconnected = false;
1927        let num_connections = 10;
1928
1929        for _ in 0..num_connections {
1930            swarm1.dial(addr2.clone()).unwrap();
1931        }
1932        let mut state = State::Connecting;
1933
1934        future::poll_fn(move |cx| loop {
1935            let poll1 = Swarm::poll_next_event(Pin::new(&mut swarm1), cx);
1936            let poll2 = Swarm::poll_next_event(Pin::new(&mut swarm2), cx);
1937            match state {
1938                State::Connecting => {
1939                    if swarms_connected(&swarm1, &swarm2, num_connections) {
1940                        if reconnected {
1941                            return Poll::Ready(());
1942                        }
1943                        swarm2
1944                            .behaviour
1945                            .inner()
1946                            .next_action
1947                            .replace(ToSwarm::CloseConnection {
1948                                peer_id: swarm1_id,
1949                                connection: CloseConnection::All,
1950                            });
1951                        state = State::Disconnecting;
1952                        continue;
1953                    }
1954                }
1955                State::Disconnecting => {
1956                    if swarms_disconnected(&swarm1, &swarm2) {
1957                        reconnected = true;
1958                        for _ in 0..num_connections {
1959                            swarm2.dial(addr1.clone()).unwrap();
1960                        }
1961                        state = State::Connecting;
1962                        continue;
1963                    }
1964                }
1965            }
1966
1967            if poll1.is_pending() && poll2.is_pending() {
1968                return Poll::Pending;
1969            }
1970        })
1971        .await
1972    }
1973
1974    /// Establishes multiple connections between two peers,
1975    /// after which one peer closes a single connection
1976    /// using [`ToSwarm::CloseConnection`] returned by a [`NetworkBehaviour`].
1977    ///
1978    /// The test expects both behaviours to be notified via calls to
1979    /// [`NetworkBehaviour::on_swarm_event`] with pairs of [`FromSwarm::ConnectionEstablished`]
1980    /// / [`FromSwarm::ConnectionClosed`]
1981    #[tokio::test]
1982    async fn test_behaviour_disconnect_one() {
1983        let mut swarm1 = new_test_swarm(Config::with_tokio_executor());
1984        let mut swarm2 = new_test_swarm(Config::with_tokio_executor());
1985
1986        let addr1: Multiaddr = multiaddr::Protocol::Memory(rand::random::<u64>()).into();
1987        let addr2: Multiaddr = multiaddr::Protocol::Memory(rand::random::<u64>()).into();
1988
1989        swarm1.listen_on(addr1).unwrap();
1990        swarm2.listen_on(addr2.clone()).unwrap();
1991
1992        let swarm1_id = *swarm1.local_peer_id();
1993
1994        let num_connections = 10;
1995
1996        for _ in 0..num_connections {
1997            swarm1.dial(addr2.clone()).unwrap();
1998        }
1999        let mut state = State::Connecting;
2000        let mut disconnected_conn_id = None;
2001
2002        future::poll_fn(move |cx| loop {
2003            let poll1 = Swarm::poll_next_event(Pin::new(&mut swarm1), cx);
2004            let poll2 = Swarm::poll_next_event(Pin::new(&mut swarm2), cx);
2005            match state {
2006                State::Connecting => {
2007                    if swarms_connected(&swarm1, &swarm2, num_connections) {
2008                        disconnected_conn_id = {
2009                            let conn_id =
2010                                swarm2.behaviour.on_connection_established[num_connections / 2].1;
2011                            swarm2.behaviour.inner().next_action.replace(
2012                                ToSwarm::CloseConnection {
2013                                    peer_id: swarm1_id,
2014                                    connection: CloseConnection::One(conn_id),
2015                                },
2016                            );
2017                            Some(conn_id)
2018                        };
2019                        state = State::Disconnecting;
2020                    }
2021                }
2022                State::Disconnecting => {
2023                    for s in &[&swarm1, &swarm2] {
2024                        assert!(s
2025                            .behaviour
2026                            .on_connection_closed
2027                            .iter()
2028                            .all(|(.., remaining_conns)| *remaining_conns > 0));
2029                        assert_eq!(s.behaviour.on_connection_established.len(), num_connections);
2030                        s.behaviour.assert_connected(num_connections, 1);
2031                    }
2032                    if [&swarm1, &swarm2]
2033                        .iter()
2034                        .all(|s| s.behaviour.on_connection_closed.len() == 1)
2035                    {
2036                        let conn_id = swarm2.behaviour.on_connection_closed[0].1;
2037                        assert_eq!(Some(conn_id), disconnected_conn_id);
2038                        return Poll::Ready(());
2039                    }
2040                }
2041            }
2042
2043            if poll1.is_pending() && poll2.is_pending() {
2044                return Poll::Pending;
2045            }
2046        })
2047        .await
2048    }
2049
2050    #[test]
2051    fn concurrent_dialing() {
2052        #[derive(Clone, Debug)]
2053        struct DialConcurrencyFactor(NonZeroU8);
2054
2055        impl Arbitrary for DialConcurrencyFactor {
2056            fn arbitrary(g: &mut Gen) -> Self {
2057                Self(NonZeroU8::new(g.gen_range(1..11)).unwrap())
2058            }
2059        }
2060
2061        fn prop(concurrency_factor: DialConcurrencyFactor) {
2062            tokio::runtime::Runtime::new().unwrap().block_on(async {
2063                let mut swarm = new_test_swarm(
2064                    Config::with_tokio_executor()
2065                        .with_dial_concurrency_factor(concurrency_factor.0),
2066                );
2067
2068                // Listen on `concurrency_factor + 1` addresses.
2069                //
2070                // `+ 2` to ensure a subset of addresses is dialed by network_2.
2071                let num_listen_addrs = concurrency_factor.0.get() + 2;
2072                let mut listen_addresses = Vec::new();
2073                let mut transports = Vec::new();
2074                for _ in 0..num_listen_addrs {
2075                    let mut transport = transport::MemoryTransport::default().boxed();
2076                    transport
2077                        .listen_on(ListenerId::next(), "/memory/0".parse().unwrap())
2078                        .unwrap();
2079
2080                    match transport.select_next_some().await {
2081                        TransportEvent::NewAddress { listen_addr, .. } => {
2082                            listen_addresses.push(listen_addr);
2083                        }
2084                        _ => panic!("Expected `NewListenAddr` event."),
2085                    }
2086
2087                    transports.push(transport);
2088                }
2089
2090                // Have swarm dial each listener and wait for each listener to receive the incoming
2091                // connections.
2092                swarm
2093                    .dial(
2094                        DialOpts::peer_id(PeerId::random())
2095                            .addresses(listen_addresses)
2096                            .build(),
2097                    )
2098                    .unwrap();
2099                for mut transport in transports.into_iter() {
2100                    match futures::future::select(transport.select_next_some(), swarm.next()).await
2101                    {
2102                        future::Either::Left((TransportEvent::Incoming { .. }, _)) => {}
2103                        future::Either::Left(_) => {
2104                            panic!("Unexpected transport event.")
2105                        }
2106                        future::Either::Right((e, _)) => {
2107                            panic!("Expect swarm to not emit any event {e:?}")
2108                        }
2109                    }
2110                }
2111
2112                match swarm.next().await.unwrap() {
2113                    SwarmEvent::OutgoingConnectionError { .. } => {}
2114                    e => panic!("Unexpected swarm event {e:?}"),
2115                }
2116            })
2117        }
2118
2119        QuickCheck::new().tests(10).quickcheck(prop as fn(_) -> _);
2120    }
2121
2122    #[tokio::test]
2123    async fn invalid_peer_id() {
2124        // Checks whether dialing an address containing the wrong peer id raises an error
2125        // for the expected peer id instead of the obtained peer id.
2126
2127        let mut swarm1 = new_test_swarm(Config::with_tokio_executor());
2128        let mut swarm2 = new_test_swarm(Config::with_tokio_executor());
2129
2130        swarm1.listen_on("/memory/0".parse().unwrap()).unwrap();
2131
2132        let address = future::poll_fn(|cx| match swarm1.poll_next_unpin(cx) {
2133            Poll::Ready(Some(SwarmEvent::NewListenAddr { address, .. })) => Poll::Ready(address),
2134            Poll::Pending => Poll::Pending,
2135            _ => panic!("Was expecting the listen address to be reported"),
2136        })
2137        .await;
2138
2139        let other_id = PeerId::random();
2140        let other_addr = address.with(multiaddr::Protocol::P2p(other_id));
2141
2142        swarm2.dial(other_addr.clone()).unwrap();
2143
2144        let (peer_id, error) = future::poll_fn(|cx| {
2145            if let Poll::Ready(Some(SwarmEvent::IncomingConnection { .. })) =
2146                swarm1.poll_next_unpin(cx)
2147            {}
2148
2149            match swarm2.poll_next_unpin(cx) {
2150                Poll::Ready(Some(SwarmEvent::OutgoingConnectionError {
2151                    peer_id, error, ..
2152                })) => Poll::Ready((peer_id, error)),
2153                Poll::Ready(x) => panic!("unexpected {x:?}"),
2154                Poll::Pending => Poll::Pending,
2155            }
2156        })
2157        .await;
2158        assert_eq!(peer_id.unwrap(), other_id);
2159        match error {
2160            DialError::WrongPeerId { obtained, address } => {
2161                assert_eq!(obtained, *swarm1.local_peer_id());
2162                assert_eq!(address, other_addr);
2163            }
2164            x => panic!("wrong error {x:?}"),
2165        }
2166    }
2167
2168    #[tokio::test]
2169    async fn dial_self() {
2170        // Check whether dialing ourselves correctly fails.
2171        //
2172        // Dialing the same address we're listening should result in three events:
2173        //
2174        // - The incoming connection notification (before we know the incoming peer ID).
2175        // - The connection error for the dialing endpoint (once we've determined that it's our own
2176        //   ID).
2177        // - The connection error for the listening endpoint (once we've determined that it's our
2178        //   own ID).
2179        //
2180        // The last two can happen in any order.
2181
2182        let mut swarm = new_test_swarm(Config::with_tokio_executor());
2183        swarm.listen_on("/memory/0".parse().unwrap()).unwrap();
2184
2185        let local_address = future::poll_fn(|cx| match swarm.poll_next_unpin(cx) {
2186            Poll::Ready(Some(SwarmEvent::NewListenAddr { address, .. })) => Poll::Ready(address),
2187            Poll::Pending => Poll::Pending,
2188            _ => panic!("Was expecting the listen address to be reported"),
2189        })
2190        .await;
2191
2192        // This is a hack to actually execute the dial
2193        // to ourselves which would otherwise be filtered.
2194        swarm.listened_addrs.clear();
2195        swarm.dial(local_address.clone()).unwrap();
2196
2197        let mut got_dial_err = false;
2198        let mut got_inc_err = false;
2199        future::poll_fn(|cx| -> Poll<Result<(), io::Error>> {
2200            loop {
2201                match swarm.poll_next_unpin(cx) {
2202                    Poll::Ready(Some(SwarmEvent::OutgoingConnectionError {
2203                        peer_id,
2204                        error: DialError::LocalPeerId { .. },
2205                        ..
2206                    })) => {
2207                        assert_eq!(&peer_id.unwrap(), swarm.local_peer_id());
2208                        assert!(!got_dial_err);
2209                        got_dial_err = true;
2210                        if got_inc_err {
2211                            return Poll::Ready(Ok(()));
2212                        }
2213                    }
2214                    Poll::Ready(Some(SwarmEvent::IncomingConnectionError {
2215                        local_addr, ..
2216                    })) => {
2217                        assert!(!got_inc_err);
2218                        assert_eq!(local_addr, local_address);
2219                        got_inc_err = true;
2220                        if got_dial_err {
2221                            return Poll::Ready(Ok(()));
2222                        }
2223                    }
2224                    Poll::Ready(Some(SwarmEvent::IncomingConnection { local_addr, .. })) => {
2225                        assert_eq!(local_addr, local_address);
2226                    }
2227                    Poll::Ready(ev) => {
2228                        panic!("Unexpected event: {ev:?}")
2229                    }
2230                    Poll::Pending => break Poll::Pending,
2231                }
2232            }
2233        })
2234        .await
2235        .unwrap();
2236    }
2237
2238    #[tokio::test]
2239    async fn dial_self_by_id() {
2240        // Trying to dial self by passing the same `PeerId` shouldn't even be possible in the first
2241        // place.
2242        let swarm = new_test_swarm(Config::with_tokio_executor());
2243        let peer_id = *swarm.local_peer_id();
2244        assert!(!swarm.is_connected(&peer_id));
2245    }
2246
2247    #[tokio::test]
2248    async fn multiple_addresses_err() {
2249        // Tries dialing multiple addresses, and makes sure there's one dialing error per address.
2250
2251        let target = PeerId::random();
2252
2253        let mut swarm = new_test_swarm(Config::with_tokio_executor());
2254
2255        let addresses = HashSet::from([
2256            multiaddr![Ip4([0, 0, 0, 0]), Tcp(rand::random::<u16>())],
2257            multiaddr![Ip4([0, 0, 0, 0]), Tcp(rand::random::<u16>())],
2258            multiaddr![Ip4([0, 0, 0, 0]), Tcp(rand::random::<u16>())],
2259            multiaddr![Udp(rand::random::<u16>())],
2260            multiaddr![Udp(rand::random::<u16>())],
2261            multiaddr![Udp(rand::random::<u16>())],
2262            multiaddr![Udp(rand::random::<u16>())],
2263            multiaddr![Udp(rand::random::<u16>())],
2264        ]);
2265
2266        swarm
2267            .dial(
2268                DialOpts::peer_id(target)
2269                    .addresses(addresses.iter().cloned().collect())
2270                    .build(),
2271            )
2272            .unwrap();
2273
2274        match swarm.next().await.unwrap() {
2275            SwarmEvent::OutgoingConnectionError {
2276                peer_id,
2277                // multiaddr,
2278                error: DialError::Transport(errors),
2279                ..
2280            } => {
2281                assert_eq!(target, peer_id.unwrap());
2282
2283                let failed_addresses = errors.into_iter().map(|(addr, _)| addr).collect::<Vec<_>>();
2284                let expected_addresses = addresses
2285                    .into_iter()
2286                    .map(|addr| addr.with(multiaddr::Protocol::P2p(target)))
2287                    .collect::<Vec<_>>();
2288
2289                assert_eq!(expected_addresses, failed_addresses);
2290            }
2291            e => panic!("Unexpected event: {e:?}"),
2292        }
2293    }
2294
2295    #[tokio::test]
2296    async fn aborting_pending_connection_surfaces_error() {
2297        let _ = tracing_subscriber::fmt()
2298            .with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
2299            .try_init();
2300
2301        let mut dialer = new_test_swarm(Config::with_tokio_executor());
2302        let mut listener = new_test_swarm(Config::with_tokio_executor());
2303
2304        let listener_peer_id = *listener.local_peer_id();
2305        listener.listen_on(multiaddr![Memory(0u64)]).unwrap();
2306        let listener_address = match listener.next().await.unwrap() {
2307            SwarmEvent::NewListenAddr { address, .. } => address,
2308            e => panic!("Unexpected network event: {e:?}"),
2309        };
2310
2311        dialer
2312            .dial(
2313                DialOpts::peer_id(listener_peer_id)
2314                    .addresses(vec![listener_address])
2315                    .build(),
2316            )
2317            .unwrap();
2318
2319        dialer
2320            .disconnect_peer_id(listener_peer_id)
2321            .expect_err("Expect peer to not yet be connected.");
2322
2323        match dialer.next().await.unwrap() {
2324            SwarmEvent::OutgoingConnectionError {
2325                error: DialError::Aborted,
2326                ..
2327            } => {}
2328            e => panic!("Unexpected swarm event {e:?}."),
2329        }
2330    }
2331
2332    #[test]
2333    fn dial_error_prints_sources() {
2334        // This constitutes a fairly typical error for chained transports.
2335        let error = DialError::Transport(vec![(
2336            "/ip4/127.0.0.1/tcp/80".parse().unwrap(),
2337            TransportError::Other(io::Error::new(
2338                io::ErrorKind::Other,
2339                MemoryTransportError::Unreachable,
2340            )),
2341        )]);
2342
2343        let string = format!("{error}");
2344
2345        // Unfortunately, we have some "empty" errors
2346        // that lead to multiple colons without text but that is the best we can do.
2347        assert_eq!("Failed to negotiate transport protocol(s): [(/ip4/127.0.0.1/tcp/80: : No listener on the given port.)]", string)
2348    }
2349}