libp2p_swarm/
lib.rs

1// Copyright 2019 Parity Technologies (UK) Ltd.
2//
3// Permission is hereby granted, free of charge, to any person obtaining a
4// copy of this software and associated documentation files (the "Software"),
5// to deal in the Software without restriction, including without limitation
6// the rights to use, copy, modify, merge, publish, distribute, sublicense,
7// and/or sell copies of the Software, and to permit persons to whom the
8// Software is furnished to do so, subject to the following conditions:
9//
10// The above copyright notice and this permission notice shall be included in
11// all copies or substantial portions of the Software.
12//
13// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
14// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
18// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
19// DEALINGS IN THE SOFTWARE.
20
21//! High-level network manager.
22//!
23//! A [`Swarm`] contains the state of the network as a whole. The entire
24//! behaviour of a libp2p network can be controlled through the `Swarm`.
25//! The `Swarm` struct contains all active and pending connections to
26//! remotes and manages the state of all the substreams that have been
27//! opened, and all the upgrades that were built upon these substreams.
28//!
29//! # Initializing a Swarm
30//!
31//! Creating a `Swarm` requires three things:
32//!
33//!  1. A network identity of the local node in form of a [`PeerId`].
34//!  2. An implementation of the [`Transport`] trait. This is the type that will be used in order to
35//!     reach nodes on the network based on their address. See the `transport` module for more
36//!     information.
37//!  3. An implementation of the [`NetworkBehaviour`] trait. This is a state machine that defines
38//!     how the swarm should behave once it is connected to a node.
39//!
40//! # Network Behaviour
41//!
42//! The [`NetworkBehaviour`] trait is implemented on types that indicate to
43//! the swarm how it should behave. This includes which protocols are supported
44//! and which nodes to try to connect to. It is the `NetworkBehaviour` that
45//! controls what happens on the network. Multiple types that implement
46//! `NetworkBehaviour` can be composed into a single behaviour.
47//!
48//! # Protocols Handler
49//!
50//! The [`ConnectionHandler`] trait defines how each active connection to a
51//! remote should behave: how to handle incoming substreams, which protocols
52//! are supported, when to open a new outbound substream, etc.
53
54#![cfg_attr(docsrs, feature(doc_cfg, doc_auto_cfg))]
55
56mod connection;
57mod executor;
58mod stream;
59mod stream_protocol;
60#[cfg(test)]
61mod test;
62mod upgrade;
63
64pub mod behaviour;
65pub mod dial_opts;
66pub mod dummy;
67pub mod handler;
68mod listen_opts;
69mod translation;
70
71/// Bundles all symbols required for the [`libp2p_swarm_derive::NetworkBehaviour`] macro.
72#[doc(hidden)]
73pub mod derive_prelude {
74    pub use either::Either;
75    pub use futures::prelude as futures;
76    pub use libp2p_core::{
77        transport::{ListenerId, PortUse},
78        ConnectedPoint, Endpoint, Multiaddr,
79    };
80    pub use libp2p_identity::PeerId;
81
82    pub use crate::{
83        behaviour::{
84            AddressChange, ConnectionClosed, ConnectionEstablished, DialFailure, ExpiredListenAddr,
85            ExternalAddrConfirmed, ExternalAddrExpired, FromSwarm, ListenFailure, ListenerClosed,
86            ListenerError, NewExternalAddrCandidate, NewExternalAddrOfPeer, NewListenAddr,
87            NewListener,
88        },
89        connection::ConnectionId,
90        ConnectionDenied, ConnectionHandler, ConnectionHandlerSelect, DialError, NetworkBehaviour,
91        THandler, THandlerInEvent, THandlerOutEvent, ToSwarm,
92    };
93}
94
95use std::{
96    collections::{HashMap, HashSet, VecDeque},
97    error, fmt, io,
98    num::{NonZeroU32, NonZeroU8, NonZeroUsize},
99    pin::Pin,
100    task::{Context, Poll},
101    time::Duration,
102};
103
104pub use behaviour::{
105    AddressChange, CloseConnection, ConnectionClosed, DialFailure, ExpiredListenAddr,
106    ExternalAddrExpired, ExternalAddresses, FromSwarm, ListenAddresses, ListenFailure,
107    ListenerClosed, ListenerError, NetworkBehaviour, NewExternalAddrCandidate,
108    NewExternalAddrOfPeer, NewListenAddr, NotifyHandler, PeerAddresses, ToSwarm,
109};
110pub use connection::{pool::ConnectionCounters, ConnectionError, ConnectionId, SupportedProtocols};
111use connection::{
112    pool::{EstablishedConnection, Pool, PoolConfig, PoolEvent},
113    IncomingInfo, PendingInboundConnectionError, PendingOutboundConnectionError,
114};
115use dial_opts::{DialOpts, PeerCondition};
116pub use executor::Executor;
117use futures::{prelude::*, stream::FusedStream};
118pub use handler::{
119    ConnectionHandler, ConnectionHandlerEvent, ConnectionHandlerSelect, OneShotHandler,
120    OneShotHandlerConfig, StreamUpgradeError, SubstreamProtocol,
121};
122use libp2p_core::{
123    connection::ConnectedPoint,
124    muxing::StreamMuxerBox,
125    transport::{self, ListenerId, TransportError, TransportEvent},
126    Multiaddr, Transport,
127};
128use libp2p_identity::PeerId;
129#[cfg(feature = "macros")]
130pub use libp2p_swarm_derive::NetworkBehaviour;
131pub use listen_opts::ListenOpts;
132use smallvec::SmallVec;
133pub use stream::Stream;
134pub use stream_protocol::{InvalidProtocol, StreamProtocol};
135use tracing::Instrument;
136#[doc(hidden)]
137pub use translation::_address_translation;
138
139use crate::{behaviour::ExternalAddrConfirmed, handler::UpgradeInfoSend};
140
141/// Event generated by the [`NetworkBehaviour`] that the swarm will report back.
142type TBehaviourOutEvent<TBehaviour> = <TBehaviour as NetworkBehaviour>::ToSwarm;
143
144/// [`ConnectionHandler`] of the [`NetworkBehaviour`] for all the protocols the [`NetworkBehaviour`]
145/// supports.
146pub type THandler<TBehaviour> = <TBehaviour as NetworkBehaviour>::ConnectionHandler;
147
148/// Custom event that can be received by the [`ConnectionHandler`] of the
149/// [`NetworkBehaviour`].
150pub type THandlerInEvent<TBehaviour> = <THandler<TBehaviour> as ConnectionHandler>::FromBehaviour;
151
152/// Custom event that can be produced by the [`ConnectionHandler`] of the [`NetworkBehaviour`].
153pub type THandlerOutEvent<TBehaviour> = <THandler<TBehaviour> as ConnectionHandler>::ToBehaviour;
154
155/// Event generated by the `Swarm`.
156#[derive(Debug)]
157#[non_exhaustive]
158pub enum SwarmEvent<TBehaviourOutEvent> {
159    /// Event generated by the `NetworkBehaviour`.
160    Behaviour(TBehaviourOutEvent),
161    /// A connection to the given peer has been opened.
162    ConnectionEstablished {
163        /// Identity of the peer that we have connected to.
164        peer_id: PeerId,
165        /// Identifier of the connection.
166        connection_id: ConnectionId,
167        /// Endpoint of the connection that has been opened.
168        endpoint: ConnectedPoint,
169        /// Number of established connections to this peer, including the one that has just been
170        /// opened.
171        num_established: NonZeroU32,
172        /// [`Some`] when the new connection is an outgoing connection.
173        /// Addresses are dialed concurrently. Contains the addresses and errors
174        /// of dial attempts that failed before the one successful dial.
175        concurrent_dial_errors: Option<Vec<(Multiaddr, TransportError<io::Error>)>>,
176        /// How long it took to establish this connection
177        established_in: std::time::Duration,
178    },
179    /// A connection with the given peer has been closed,
180    /// possibly as a result of an error.
181    ConnectionClosed {
182        /// Identity of the peer that we have connected to.
183        peer_id: PeerId,
184        /// Identifier of the connection.
185        connection_id: ConnectionId,
186        /// Endpoint of the connection that has been closed.
187        endpoint: ConnectedPoint,
188        /// Number of other remaining connections to this same peer.
189        num_established: u32,
190        /// Reason for the disconnection, if it was not a successful
191        /// active close.
192        cause: Option<ConnectionError>,
193    },
194    /// A new connection arrived on a listener and is in the process of protocol negotiation.
195    ///
196    /// A corresponding [`ConnectionEstablished`](SwarmEvent::ConnectionEstablished) or
197    /// [`IncomingConnectionError`](SwarmEvent::IncomingConnectionError) event will later be
198    /// generated for this connection.
199    IncomingConnection {
200        /// Identifier of the connection.
201        connection_id: ConnectionId,
202        /// Local connection address.
203        /// This address has been earlier reported with a
204        /// [`NewListenAddr`](SwarmEvent::NewListenAddr) event.
205        local_addr: Multiaddr,
206        /// Address used to send back data to the remote.
207        send_back_addr: Multiaddr,
208    },
209    /// An error happened on an inbound connection during its initial handshake.
210    ///
211    /// This can include, for example, an error during the handshake of the encryption layer, or
212    /// the connection unexpectedly closed.
213    IncomingConnectionError {
214        /// Identifier of the connection.
215        connection_id: ConnectionId,
216        /// Local connection address.
217        /// This address has been earlier reported with a
218        /// [`NewListenAddr`](SwarmEvent::NewListenAddr) event.
219        local_addr: Multiaddr,
220        /// Address used to send back data to the remote.
221        send_back_addr: Multiaddr,
222        /// The error that happened.
223        error: ListenError,
224        /// If known, [`PeerId`] of the peer that tried to connect to us.
225        peer_id: Option<PeerId>,
226    },
227    /// An error happened on an outbound connection.
228    OutgoingConnectionError {
229        /// Identifier of the connection.
230        connection_id: ConnectionId,
231        /// If known, [`PeerId`] of the peer we tried to reach.
232        peer_id: Option<PeerId>,
233        /// Error that has been encountered.
234        error: DialError,
235    },
236    /// One of our listeners has reported a new local listening address.
237    NewListenAddr {
238        /// The listener that is listening on the new address.
239        listener_id: ListenerId,
240        /// The new address that is being listened on.
241        address: Multiaddr,
242    },
243    /// One of our listeners has reported the expiration of a listening address.
244    ExpiredListenAddr {
245        /// The listener that is no longer listening on the address.
246        listener_id: ListenerId,
247        /// The expired address.
248        address: Multiaddr,
249    },
250    /// One of the listeners gracefully closed.
251    ListenerClosed {
252        /// The listener that closed.
253        listener_id: ListenerId,
254        /// The addresses that the listener was listening on. These addresses are now considered
255        /// expired, similar to if a [`ExpiredListenAddr`](SwarmEvent::ExpiredListenAddr) event
256        /// has been generated for each of them.
257        addresses: Vec<Multiaddr>,
258        /// Reason for the closure. Contains `Ok(())` if the stream produced `None`, or `Err`
259        /// if the stream produced an error.
260        reason: Result<(), io::Error>,
261    },
262    /// One of the listeners reported a non-fatal error.
263    ListenerError {
264        /// The listener that errored.
265        listener_id: ListenerId,
266        /// The listener error.
267        error: io::Error,
268    },
269    /// A new dialing attempt has been initiated by the [`NetworkBehaviour`]
270    /// implementation.
271    ///
272    /// A [`ConnectionEstablished`](SwarmEvent::ConnectionEstablished) event is
273    /// reported if the dialing attempt succeeds, otherwise a
274    /// [`OutgoingConnectionError`](SwarmEvent::OutgoingConnectionError) event
275    /// is reported.
276    Dialing {
277        /// Identity of the peer that we are connecting to.
278        peer_id: Option<PeerId>,
279
280        /// Identifier of the connection.
281        connection_id: ConnectionId,
282    },
283    /// We have discovered a new candidate for an external address for us.
284    NewExternalAddrCandidate { address: Multiaddr },
285    /// An external address of the local node was confirmed.
286    ExternalAddrConfirmed { address: Multiaddr },
287    /// An external address of the local node expired, i.e. is no-longer confirmed.
288    ExternalAddrExpired { address: Multiaddr },
289    /// We have discovered a new address of a peer.
290    NewExternalAddrOfPeer { peer_id: PeerId, address: Multiaddr },
291}
292
293impl<TBehaviourOutEvent> SwarmEvent<TBehaviourOutEvent> {
294    /// Extract the `TBehaviourOutEvent` from this [`SwarmEvent`] in case it is the `Behaviour`
295    /// variant, otherwise fail.
296    #[allow(clippy::result_large_err)]
297    pub fn try_into_behaviour_event(self) -> Result<TBehaviourOutEvent, Self> {
298        match self {
299            SwarmEvent::Behaviour(inner) => Ok(inner),
300            other => Err(other),
301        }
302    }
303}
304
305/// Contains the state of the network, plus the way it should behave.
306///
307/// Note: Needs to be polled via `<Swarm as Stream>` in order to make
308/// progress.
309pub struct Swarm<TBehaviour>
310where
311    TBehaviour: NetworkBehaviour,
312{
313    /// [`Transport`] for dialing remote peers and listening for incoming connection.
314    transport: transport::Boxed<(PeerId, StreamMuxerBox)>,
315
316    /// The nodes currently active.
317    pool: Pool<THandler<TBehaviour>>,
318
319    /// The local peer ID.
320    local_peer_id: PeerId,
321
322    /// Handles which nodes to connect to and how to handle the events sent back by the protocol
323    /// handlers.
324    behaviour: TBehaviour,
325
326    /// List of protocols that the behaviour says it supports.
327    supported_protocols: SmallVec<[Vec<u8>; 16]>,
328
329    confirmed_external_addr: HashSet<Multiaddr>,
330
331    /// Multiaddresses that our listeners are listening on,
332    listened_addrs: HashMap<ListenerId, SmallVec<[Multiaddr; 1]>>,
333
334    /// Pending event to be delivered to connection handlers
335    /// (or dropped if the peer disconnected) before the `behaviour`
336    /// can be polled again.
337    pending_handler_event: Option<(PeerId, PendingNotifyHandler, THandlerInEvent<TBehaviour>)>,
338
339    pending_swarm_events: VecDeque<SwarmEvent<TBehaviour::ToSwarm>>,
340}
341
342impl<TBehaviour> Unpin for Swarm<TBehaviour> where TBehaviour: NetworkBehaviour {}
343
344impl<TBehaviour> Swarm<TBehaviour>
345where
346    TBehaviour: NetworkBehaviour,
347{
348    /// Creates a new [`Swarm`] from the given [`Transport`], [`NetworkBehaviour`], [`PeerId`] and
349    /// [`Config`].
350    pub fn new(
351        transport: transport::Boxed<(PeerId, StreamMuxerBox)>,
352        behaviour: TBehaviour,
353        local_peer_id: PeerId,
354        config: Config,
355    ) -> Self {
356        tracing::info!(%local_peer_id);
357
358        Swarm {
359            local_peer_id,
360            transport,
361            pool: Pool::new(local_peer_id, config.pool_config),
362            behaviour,
363            supported_protocols: Default::default(),
364            confirmed_external_addr: Default::default(),
365            listened_addrs: HashMap::new(),
366            pending_handler_event: None,
367            pending_swarm_events: VecDeque::default(),
368        }
369    }
370
371    /// Returns information about the connections underlying the [`Swarm`].
372    pub fn network_info(&self) -> NetworkInfo {
373        let num_peers = self.pool.num_peers();
374        let connection_counters = self.pool.counters().clone();
375        NetworkInfo {
376            num_peers,
377            connection_counters,
378        }
379    }
380
381    /// Starts listening on the given address.
382    /// Returns an error if the address is not supported.
383    ///
384    /// Listeners report their new listening addresses as [`SwarmEvent::NewListenAddr`].
385    /// Depending on the underlying transport, one listener may have multiple listening addresses.
386    pub fn listen_on(&mut self, addr: Multiaddr) -> Result<ListenerId, TransportError<io::Error>> {
387        let opts = ListenOpts::new(addr);
388        let id = opts.listener_id();
389        self.add_listener(opts)?;
390        Ok(id)
391    }
392
393    /// Remove some listener.
394    ///
395    /// Returns `true` if there was a listener with this ID, `false`
396    /// otherwise.
397    pub fn remove_listener(&mut self, listener_id: ListenerId) -> bool {
398        self.transport.remove_listener(listener_id)
399    }
400
401    /// Dial a known or unknown peer.
402    ///
403    /// See also [`DialOpts`].
404    ///
405    /// ```
406    /// # use libp2p_swarm::Swarm;
407    /// # use libp2p_swarm::dial_opts::{DialOpts, PeerCondition};
408    /// # use libp2p_core::{Multiaddr, Transport};
409    /// # use libp2p_core::transport::dummy::DummyTransport;
410    /// # use libp2p_swarm::dummy;
411    /// # use libp2p_identity::PeerId;
412    /// #
413    /// # #[tokio::main]
414    /// # async fn main() {
415    /// let mut swarm = build_swarm();
416    ///
417    /// // Dial a known peer.
418    /// swarm.dial(PeerId::random());
419    ///
420    /// // Dial an unknown peer.
421    /// swarm.dial("/ip6/::1/tcp/12345".parse::<Multiaddr>().unwrap());
422    /// # }
423    ///
424    /// # fn build_swarm() -> Swarm<dummy::Behaviour> {
425    /// #     Swarm::new(DummyTransport::new().boxed(), dummy::Behaviour, PeerId::random(), libp2p_swarm::Config::with_tokio_executor())
426    /// # }
427    /// ```
428    pub fn dial(&mut self, opts: impl Into<DialOpts>) -> Result<(), DialError> {
429        let dial_opts = opts.into();
430
431        let peer_id = dial_opts.get_peer_id();
432        let condition = dial_opts.peer_condition();
433        let connection_id = dial_opts.connection_id();
434
435        let should_dial = match (condition, peer_id) {
436            (_, None) => true,
437            (PeerCondition::Always, _) => true,
438            (PeerCondition::Disconnected, Some(peer_id)) => !self.pool.is_connected(peer_id),
439            (PeerCondition::NotDialing, Some(peer_id)) => !self.pool.is_dialing(peer_id),
440            (PeerCondition::DisconnectedAndNotDialing, Some(peer_id)) => {
441                !self.pool.is_dialing(peer_id) && !self.pool.is_connected(peer_id)
442            }
443        };
444
445        if !should_dial {
446            let e = DialError::DialPeerConditionFalse(condition);
447
448            self.behaviour
449                .on_swarm_event(FromSwarm::DialFailure(DialFailure {
450                    peer_id,
451                    error: &e,
452                    connection_id,
453                }));
454
455            return Err(e);
456        }
457
458        let addresses = {
459            let mut addresses_from_opts = dial_opts.get_addresses();
460
461            match self.behaviour.handle_pending_outbound_connection(
462                connection_id,
463                peer_id,
464                addresses_from_opts.as_slice(),
465                dial_opts.role_override(),
466            ) {
467                Ok(addresses) => {
468                    if dial_opts.extend_addresses_through_behaviour() {
469                        addresses_from_opts.extend(addresses)
470                    } else {
471                        let num_addresses = addresses.len();
472
473                        if num_addresses > 0 {
474                            tracing::debug!(
475                                connection=%connection_id,
476                                discarded_addresses_count=%num_addresses,
477                                "discarding addresses from `NetworkBehaviour` because `DialOpts::extend_addresses_through_behaviour is `false` for connection"
478                            )
479                        }
480                    }
481                }
482                Err(cause) => {
483                    let error = DialError::Denied { cause };
484
485                    self.behaviour
486                        .on_swarm_event(FromSwarm::DialFailure(DialFailure {
487                            peer_id,
488                            error: &error,
489                            connection_id,
490                        }));
491
492                    return Err(error);
493                }
494            }
495
496            let mut unique_addresses = HashSet::new();
497            addresses_from_opts.retain(|addr| {
498                !self.listened_addrs.values().flatten().any(|a| a == addr)
499                    && unique_addresses.insert(addr.clone())
500            });
501
502            if addresses_from_opts.is_empty() {
503                let error = DialError::NoAddresses;
504                self.behaviour
505                    .on_swarm_event(FromSwarm::DialFailure(DialFailure {
506                        peer_id,
507                        error: &error,
508                        connection_id,
509                    }));
510                return Err(error);
511            };
512
513            addresses_from_opts
514        };
515
516        let dials = addresses
517            .into_iter()
518            .map(|a| match peer_id.map_or(Ok(a.clone()), |p| a.with_p2p(p)) {
519                Ok(address) => {
520                    let dial = self.transport.dial(
521                        address.clone(),
522                        transport::DialOpts {
523                            role: dial_opts.role_override(),
524                            port_use: dial_opts.port_use(),
525                        },
526                    );
527                    let span = tracing::debug_span!(parent: tracing::Span::none(), "Transport::dial", %address);
528                    span.follows_from(tracing::Span::current());
529                    match dial {
530                        Ok(fut) => fut
531                            .map(|r| (address, r.map_err(TransportError::Other)))
532                            .instrument(span)
533                            .boxed(),
534                        Err(err) => futures::future::ready((address, Err(err))).boxed(),
535                    }
536                }
537                Err(address) => futures::future::ready((
538                    address.clone(),
539                    Err(TransportError::MultiaddrNotSupported(address)),
540                ))
541                .boxed(),
542            })
543            .collect();
544
545        self.pool.add_outgoing(
546            dials,
547            peer_id,
548            dial_opts.role_override(),
549            dial_opts.port_use(),
550            dial_opts.dial_concurrency_override(),
551            connection_id,
552        );
553
554        Ok(())
555    }
556
557    /// Returns an iterator that produces the list of addresses we're listening on.
558    pub fn listeners(&self) -> impl Iterator<Item = &Multiaddr> {
559        self.listened_addrs.values().flatten()
560    }
561
562    /// Returns the peer ID of the swarm passed as parameter.
563    pub fn local_peer_id(&self) -> &PeerId {
564        &self.local_peer_id
565    }
566
567    /// List all **confirmed** external address for the local node.
568    pub fn external_addresses(&self) -> impl Iterator<Item = &Multiaddr> {
569        self.confirmed_external_addr.iter()
570    }
571
572    fn add_listener(&mut self, opts: ListenOpts) -> Result<(), TransportError<io::Error>> {
573        let addr = opts.address();
574        let listener_id = opts.listener_id();
575
576        if let Err(e) = self.transport.listen_on(listener_id, addr.clone()) {
577            self.behaviour
578                .on_swarm_event(FromSwarm::ListenerError(behaviour::ListenerError {
579                    listener_id,
580                    err: &e,
581                }));
582
583            return Err(e);
584        }
585
586        self.behaviour
587            .on_swarm_event(FromSwarm::NewListener(behaviour::NewListener {
588                listener_id,
589            }));
590
591        Ok(())
592    }
593
594    /// Add a **confirmed** external address for the local node.
595    ///
596    /// This function should only be called with addresses that are guaranteed to be reachable.
597    /// The address is broadcast to all [`NetworkBehaviour`]s via
598    /// [`FromSwarm::ExternalAddrConfirmed`].
599    pub fn add_external_address(&mut self, a: Multiaddr) {
600        self.behaviour
601            .on_swarm_event(FromSwarm::ExternalAddrConfirmed(ExternalAddrConfirmed {
602                addr: &a,
603            }));
604        self.confirmed_external_addr.insert(a);
605    }
606
607    /// Remove an external address for the local node.
608    ///
609    /// The address is broadcast to all [`NetworkBehaviour`]s via
610    /// [`FromSwarm::ExternalAddrExpired`].
611    pub fn remove_external_address(&mut self, addr: &Multiaddr) {
612        self.behaviour
613            .on_swarm_event(FromSwarm::ExternalAddrExpired(ExternalAddrExpired { addr }));
614        self.confirmed_external_addr.remove(addr);
615    }
616
617    /// Add a new external address of a remote peer.
618    ///
619    /// The address is broadcast to all [`NetworkBehaviour`]s via
620    /// [`FromSwarm::NewExternalAddrOfPeer`].
621    pub fn add_peer_address(&mut self, peer_id: PeerId, addr: Multiaddr) {
622        self.behaviour
623            .on_swarm_event(FromSwarm::NewExternalAddrOfPeer(NewExternalAddrOfPeer {
624                peer_id,
625                addr: &addr,
626            }))
627    }
628
629    /// Disconnects a peer by its peer ID, closing all connections to said peer.
630    ///
631    /// Returns `Ok(())` if there was one or more established connections to the peer.
632    ///
633    /// Closing a connection via [`Swarm::disconnect_peer_id`] will poll
634    /// [`ConnectionHandler::poll_close`] to completion. Use this function if you want to close
635    /// a connection _despite_ it still being in use by one or more handlers.
636    #[allow(clippy::result_unit_err)]
637    pub fn disconnect_peer_id(&mut self, peer_id: PeerId) -> Result<(), ()> {
638        let was_connected = self.pool.is_connected(peer_id);
639        self.pool.disconnect(peer_id);
640
641        if was_connected {
642            Ok(())
643        } else {
644            Err(())
645        }
646    }
647
648    /// Attempt to gracefully close a connection.
649    ///
650    /// Closing a connection is asynchronous but this function will return immediately.
651    /// A [`SwarmEvent::ConnectionClosed`] event will be emitted
652    /// once the connection is actually closed.
653    ///
654    /// # Returns
655    ///
656    /// - `true` if the connection was established and is now being closed.
657    /// - `false` if the connection was not found or is no longer established.
658    pub fn close_connection(&mut self, connection_id: ConnectionId) -> bool {
659        if let Some(established) = self.pool.get_established(connection_id) {
660            established.start_close();
661            return true;
662        }
663
664        false
665    }
666
667    /// Checks whether there is an established connection to a peer.
668    pub fn is_connected(&self, peer_id: &PeerId) -> bool {
669        self.pool.is_connected(*peer_id)
670    }
671
672    /// Returns the currently connected peers.
673    pub fn connected_peers(&self) -> impl Iterator<Item = &PeerId> {
674        self.pool.iter_connected()
675    }
676
677    /// Returns a reference to the provided [`NetworkBehaviour`].
678    pub fn behaviour(&self) -> &TBehaviour {
679        &self.behaviour
680    }
681
682    /// Returns a mutable reference to the provided [`NetworkBehaviour`].
683    pub fn behaviour_mut(&mut self) -> &mut TBehaviour {
684        &mut self.behaviour
685    }
686
687    fn handle_pool_event(&mut self, event: PoolEvent<THandlerOutEvent<TBehaviour>>) {
688        match event {
689            PoolEvent::ConnectionEstablished {
690                peer_id,
691                id,
692                endpoint,
693                connection,
694                concurrent_dial_errors,
695                established_in,
696            } => {
697                let handler = match endpoint.clone() {
698                    ConnectedPoint::Dialer {
699                        address,
700                        role_override,
701                        port_use,
702                    } => {
703                        match self.behaviour.handle_established_outbound_connection(
704                            id,
705                            peer_id,
706                            &address,
707                            role_override,
708                            port_use,
709                        ) {
710                            Ok(handler) => handler,
711                            Err(cause) => {
712                                let dial_error = DialError::Denied { cause };
713                                self.behaviour.on_swarm_event(FromSwarm::DialFailure(
714                                    DialFailure {
715                                        connection_id: id,
716                                        error: &dial_error,
717                                        peer_id: Some(peer_id),
718                                    },
719                                ));
720
721                                self.pending_swarm_events.push_back(
722                                    SwarmEvent::OutgoingConnectionError {
723                                        peer_id: Some(peer_id),
724                                        connection_id: id,
725                                        error: dial_error,
726                                    },
727                                );
728                                return;
729                            }
730                        }
731                    }
732                    ConnectedPoint::Listener {
733                        local_addr,
734                        send_back_addr,
735                    } => {
736                        match self.behaviour.handle_established_inbound_connection(
737                            id,
738                            peer_id,
739                            &local_addr,
740                            &send_back_addr,
741                        ) {
742                            Ok(handler) => handler,
743                            Err(cause) => {
744                                let listen_error = ListenError::Denied { cause };
745                                self.behaviour.on_swarm_event(FromSwarm::ListenFailure(
746                                    ListenFailure {
747                                        local_addr: &local_addr,
748                                        send_back_addr: &send_back_addr,
749                                        error: &listen_error,
750                                        connection_id: id,
751                                        peer_id: Some(peer_id),
752                                    },
753                                ));
754
755                                self.pending_swarm_events.push_back(
756                                    SwarmEvent::IncomingConnectionError {
757                                        connection_id: id,
758                                        send_back_addr,
759                                        local_addr,
760                                        error: listen_error,
761                                        peer_id: Some(peer_id),
762                                    },
763                                );
764                                return;
765                            }
766                        }
767                    }
768                };
769
770                let supported_protocols = handler
771                    .listen_protocol()
772                    .upgrade()
773                    .protocol_info()
774                    .map(|p| p.as_ref().as_bytes().to_vec())
775                    .collect();
776                let other_established_connection_ids = self
777                    .pool
778                    .iter_established_connections_of_peer(&peer_id)
779                    .collect::<Vec<_>>();
780                let num_established = NonZeroU32::new(
781                    u32::try_from(other_established_connection_ids.len() + 1).unwrap(),
782                )
783                .expect("n + 1 is always non-zero; qed");
784
785                self.pool
786                    .spawn_connection(id, peer_id, &endpoint, connection, handler);
787
788                tracing::debug!(
789                    peer=%peer_id,
790                    ?endpoint,
791                    total_peers=%num_established,
792                    "Connection established"
793                );
794                let failed_addresses = concurrent_dial_errors
795                    .as_ref()
796                    .map(|es| {
797                        es.iter()
798                            .map(|(a, _)| a)
799                            .cloned()
800                            .collect::<Vec<Multiaddr>>()
801                    })
802                    .unwrap_or_default();
803                self.behaviour
804                    .on_swarm_event(FromSwarm::ConnectionEstablished(
805                        behaviour::ConnectionEstablished {
806                            peer_id,
807                            connection_id: id,
808                            endpoint: &endpoint,
809                            failed_addresses: &failed_addresses,
810                            other_established: other_established_connection_ids.len(),
811                        },
812                    ));
813                self.supported_protocols = supported_protocols;
814                self.pending_swarm_events
815                    .push_back(SwarmEvent::ConnectionEstablished {
816                        peer_id,
817                        connection_id: id,
818                        num_established,
819                        endpoint,
820                        concurrent_dial_errors,
821                        established_in,
822                    });
823            }
824            PoolEvent::PendingOutboundConnectionError {
825                id: connection_id,
826                error,
827                peer,
828            } => {
829                let error = error.into();
830
831                self.behaviour
832                    .on_swarm_event(FromSwarm::DialFailure(DialFailure {
833                        peer_id: peer,
834                        error: &error,
835                        connection_id,
836                    }));
837
838                if let Some(peer) = peer {
839                    tracing::debug!(%peer, "Connection attempt to peer failed with {:?}.", error,);
840                } else {
841                    tracing::debug!("Connection attempt to unknown peer failed with {:?}", error);
842                }
843
844                self.pending_swarm_events
845                    .push_back(SwarmEvent::OutgoingConnectionError {
846                        peer_id: peer,
847                        connection_id,
848                        error,
849                    });
850            }
851            PoolEvent::PendingInboundConnectionError {
852                id,
853                send_back_addr,
854                local_addr,
855                error,
856            } => {
857                let error = error.into();
858
859                tracing::debug!("Incoming connection failed: {:?}", error);
860                self.behaviour
861                    .on_swarm_event(FromSwarm::ListenFailure(ListenFailure {
862                        local_addr: &local_addr,
863                        send_back_addr: &send_back_addr,
864                        error: &error,
865                        connection_id: id,
866                        peer_id: None,
867                    }));
868                self.pending_swarm_events
869                    .push_back(SwarmEvent::IncomingConnectionError {
870                        connection_id: id,
871                        local_addr,
872                        send_back_addr,
873                        error,
874                        peer_id: None,
875                    });
876            }
877            PoolEvent::ConnectionClosed {
878                id,
879                connected,
880                error,
881                remaining_established_connection_ids,
882                ..
883            } => {
884                if let Some(error) = error.as_ref() {
885                    tracing::debug!(
886                        total_peers=%remaining_established_connection_ids.len(),
887                        "Connection closed with error {:?}: {:?}",
888                        error,
889                        connected,
890                    );
891                } else {
892                    tracing::debug!(
893                        total_peers=%remaining_established_connection_ids.len(),
894                        "Connection closed: {:?}",
895                        connected
896                    );
897                }
898                let peer_id = connected.peer_id;
899                let endpoint = connected.endpoint;
900                let num_established =
901                    u32::try_from(remaining_established_connection_ids.len()).unwrap();
902
903                self.behaviour
904                    .on_swarm_event(FromSwarm::ConnectionClosed(ConnectionClosed {
905                        peer_id,
906                        connection_id: id,
907                        endpoint: &endpoint,
908                        cause: error.as_ref(),
909                        remaining_established: num_established as usize,
910                    }));
911                self.pending_swarm_events
912                    .push_back(SwarmEvent::ConnectionClosed {
913                        peer_id,
914                        connection_id: id,
915                        endpoint,
916                        cause: error,
917                        num_established,
918                    });
919            }
920            PoolEvent::ConnectionEvent { peer_id, id, event } => {
921                self.behaviour
922                    .on_connection_handler_event(peer_id, id, event);
923            }
924            PoolEvent::AddressChange {
925                peer_id,
926                id,
927                new_endpoint,
928                old_endpoint,
929            } => {
930                self.behaviour
931                    .on_swarm_event(FromSwarm::AddressChange(AddressChange {
932                        peer_id,
933                        connection_id: id,
934                        old: &old_endpoint,
935                        new: &new_endpoint,
936                    }));
937            }
938        }
939    }
940
941    fn handle_transport_event(
942        &mut self,
943        event: TransportEvent<
944            <transport::Boxed<(PeerId, StreamMuxerBox)> as Transport>::ListenerUpgrade,
945            io::Error,
946        >,
947    ) {
948        match event {
949            TransportEvent::Incoming {
950                listener_id: _,
951                upgrade,
952                local_addr,
953                send_back_addr,
954            } => {
955                let connection_id = ConnectionId::next();
956
957                match self.behaviour.handle_pending_inbound_connection(
958                    connection_id,
959                    &local_addr,
960                    &send_back_addr,
961                ) {
962                    Ok(()) => {}
963                    Err(cause) => {
964                        let listen_error = ListenError::Denied { cause };
965
966                        self.behaviour
967                            .on_swarm_event(FromSwarm::ListenFailure(ListenFailure {
968                                local_addr: &local_addr,
969                                send_back_addr: &send_back_addr,
970                                error: &listen_error,
971                                connection_id,
972                                peer_id: None,
973                            }));
974
975                        self.pending_swarm_events
976                            .push_back(SwarmEvent::IncomingConnectionError {
977                                connection_id,
978                                local_addr,
979                                send_back_addr,
980                                error: listen_error,
981                                peer_id: None,
982                            });
983                        return;
984                    }
985                }
986
987                self.pool.add_incoming(
988                    upgrade,
989                    IncomingInfo {
990                        local_addr: &local_addr,
991                        send_back_addr: &send_back_addr,
992                    },
993                    connection_id,
994                );
995
996                self.pending_swarm_events
997                    .push_back(SwarmEvent::IncomingConnection {
998                        connection_id,
999                        local_addr,
1000                        send_back_addr,
1001                    })
1002            }
1003            TransportEvent::NewAddress {
1004                listener_id,
1005                listen_addr,
1006            } => {
1007                tracing::debug!(
1008                    listener=?listener_id,
1009                    address=%listen_addr,
1010                    "New listener address"
1011                );
1012                let addrs = self.listened_addrs.entry(listener_id).or_default();
1013                if !addrs.contains(&listen_addr) {
1014                    addrs.push(listen_addr.clone())
1015                }
1016                self.behaviour
1017                    .on_swarm_event(FromSwarm::NewListenAddr(NewListenAddr {
1018                        listener_id,
1019                        addr: &listen_addr,
1020                    }));
1021                self.pending_swarm_events
1022                    .push_back(SwarmEvent::NewListenAddr {
1023                        listener_id,
1024                        address: listen_addr,
1025                    })
1026            }
1027            TransportEvent::AddressExpired {
1028                listener_id,
1029                listen_addr,
1030            } => {
1031                tracing::debug!(
1032                    listener=?listener_id,
1033                    address=%listen_addr,
1034                    "Expired listener address"
1035                );
1036                if let Some(addrs) = self.listened_addrs.get_mut(&listener_id) {
1037                    addrs.retain(|a| a != &listen_addr);
1038                }
1039                self.behaviour
1040                    .on_swarm_event(FromSwarm::ExpiredListenAddr(ExpiredListenAddr {
1041                        listener_id,
1042                        addr: &listen_addr,
1043                    }));
1044                self.pending_swarm_events
1045                    .push_back(SwarmEvent::ExpiredListenAddr {
1046                        listener_id,
1047                        address: listen_addr,
1048                    })
1049            }
1050            TransportEvent::ListenerClosed {
1051                listener_id,
1052                reason,
1053            } => {
1054                tracing::debug!(
1055                    listener=?listener_id,
1056                    ?reason,
1057                    "Listener closed"
1058                );
1059                let addrs = self.listened_addrs.remove(&listener_id).unwrap_or_default();
1060                for addr in addrs.iter() {
1061                    self.behaviour.on_swarm_event(FromSwarm::ExpiredListenAddr(
1062                        ExpiredListenAddr { listener_id, addr },
1063                    ));
1064                }
1065                self.behaviour
1066                    .on_swarm_event(FromSwarm::ListenerClosed(ListenerClosed {
1067                        listener_id,
1068                        reason: reason.as_ref().copied(),
1069                    }));
1070                self.pending_swarm_events
1071                    .push_back(SwarmEvent::ListenerClosed {
1072                        listener_id,
1073                        addresses: addrs.to_vec(),
1074                        reason,
1075                    })
1076            }
1077            TransportEvent::ListenerError { listener_id, error } => {
1078                self.behaviour
1079                    .on_swarm_event(FromSwarm::ListenerError(ListenerError {
1080                        listener_id,
1081                        err: &error,
1082                    }));
1083                self.pending_swarm_events
1084                    .push_back(SwarmEvent::ListenerError { listener_id, error })
1085            }
1086        }
1087    }
1088
1089    fn handle_behaviour_event(
1090        &mut self,
1091        event: ToSwarm<TBehaviour::ToSwarm, THandlerInEvent<TBehaviour>>,
1092    ) {
1093        match event {
1094            ToSwarm::GenerateEvent(event) => {
1095                self.pending_swarm_events
1096                    .push_back(SwarmEvent::Behaviour(event));
1097            }
1098            ToSwarm::Dial { opts } => {
1099                let peer_id = opts.get_peer_id();
1100                let connection_id = opts.connection_id();
1101                if let Ok(()) = self.dial(opts) {
1102                    self.pending_swarm_events.push_back(SwarmEvent::Dialing {
1103                        peer_id,
1104                        connection_id,
1105                    });
1106                }
1107            }
1108            ToSwarm::ListenOn { opts } => {
1109                // Error is dispatched internally, safe to ignore.
1110                let _ = self.add_listener(opts);
1111            }
1112            ToSwarm::RemoveListener { id } => {
1113                self.remove_listener(id);
1114            }
1115            ToSwarm::NotifyHandler {
1116                peer_id,
1117                handler,
1118                event,
1119            } => {
1120                assert!(self.pending_handler_event.is_none());
1121                let handler = match handler {
1122                    NotifyHandler::One(connection) => PendingNotifyHandler::One(connection),
1123                    NotifyHandler::Any => {
1124                        let ids = self
1125                            .pool
1126                            .iter_established_connections_of_peer(&peer_id)
1127                            .collect();
1128                        PendingNotifyHandler::Any(ids)
1129                    }
1130                };
1131
1132                self.pending_handler_event = Some((peer_id, handler, event));
1133            }
1134            ToSwarm::NewExternalAddrCandidate(addr) => {
1135                if !self.confirmed_external_addr.contains(&addr) {
1136                    self.behaviour
1137                        .on_swarm_event(FromSwarm::NewExternalAddrCandidate(
1138                            NewExternalAddrCandidate { addr: &addr },
1139                        ));
1140                    self.pending_swarm_events
1141                        .push_back(SwarmEvent::NewExternalAddrCandidate { address: addr });
1142                }
1143            }
1144            ToSwarm::ExternalAddrConfirmed(addr) => {
1145                self.add_external_address(addr.clone());
1146                self.pending_swarm_events
1147                    .push_back(SwarmEvent::ExternalAddrConfirmed { address: addr });
1148            }
1149            ToSwarm::ExternalAddrExpired(addr) => {
1150                self.remove_external_address(&addr);
1151                self.pending_swarm_events
1152                    .push_back(SwarmEvent::ExternalAddrExpired { address: addr });
1153            }
1154            ToSwarm::CloseConnection {
1155                peer_id,
1156                connection,
1157            } => match connection {
1158                CloseConnection::One(connection_id) => {
1159                    if let Some(conn) = self.pool.get_established(connection_id) {
1160                        conn.start_close();
1161                    }
1162                }
1163                CloseConnection::All => {
1164                    self.pool.disconnect(peer_id);
1165                }
1166            },
1167            ToSwarm::NewExternalAddrOfPeer { peer_id, address } => {
1168                self.behaviour
1169                    .on_swarm_event(FromSwarm::NewExternalAddrOfPeer(NewExternalAddrOfPeer {
1170                        peer_id,
1171                        addr: &address,
1172                    }));
1173                self.pending_swarm_events
1174                    .push_back(SwarmEvent::NewExternalAddrOfPeer { peer_id, address });
1175            }
1176        }
1177    }
1178
1179    /// Internal function used by everything event-related.
1180    ///
1181    /// Polls the `Swarm` for the next event.
1182    #[tracing::instrument(level = "debug", name = "Swarm::poll", skip(self, cx))]
1183    fn poll_next_event(
1184        mut self: Pin<&mut Self>,
1185        cx: &mut Context<'_>,
1186    ) -> Poll<SwarmEvent<TBehaviour::ToSwarm>> {
1187        // We use a `this` variable because the compiler can't mutably borrow multiple times
1188        // across a `Deref`.
1189        let this = &mut *self;
1190
1191        // This loop polls the components below in a prioritized order.
1192        //
1193        // 1. [`NetworkBehaviour`]
1194        // 2. Connection [`Pool`]
1195        // 3. [`ListenersStream`]
1196        //
1197        // (1) is polled before (2) to prioritize local work over work coming from a remote.
1198        //
1199        // (2) is polled before (3) to prioritize existing connections
1200        // over upgrading new incoming connections.
1201        loop {
1202            if let Some(swarm_event) = this.pending_swarm_events.pop_front() {
1203                return Poll::Ready(swarm_event);
1204            }
1205
1206            match this.pending_handler_event.take() {
1207                // Try to deliver the pending event emitted by the [`NetworkBehaviour`] in the
1208                // previous iteration to the connection handler(s).
1209                Some((peer_id, handler, event)) => match handler {
1210                    PendingNotifyHandler::One(conn_id) => {
1211                        match this.pool.get_established(conn_id) {
1212                            Some(conn) => match notify_one(conn, event, cx) {
1213                                None => continue,
1214                                Some(event) => {
1215                                    this.pending_handler_event = Some((peer_id, handler, event));
1216                                }
1217                            },
1218                            None => continue,
1219                        }
1220                    }
1221                    PendingNotifyHandler::Any(ids) => {
1222                        match notify_any::<_, TBehaviour>(ids, &mut this.pool, event, cx) {
1223                            None => continue,
1224                            Some((event, ids)) => {
1225                                let handler = PendingNotifyHandler::Any(ids);
1226                                this.pending_handler_event = Some((peer_id, handler, event));
1227                            }
1228                        }
1229                    }
1230                },
1231                // No pending event. Allow the [`NetworkBehaviour`] to make progress.
1232                None => match this.behaviour.poll(cx) {
1233                    Poll::Pending => {}
1234                    Poll::Ready(behaviour_event) => {
1235                        this.handle_behaviour_event(behaviour_event);
1236
1237                        continue;
1238                    }
1239                },
1240            }
1241
1242            // Poll the known peers.
1243            match this.pool.poll(cx) {
1244                Poll::Pending => {}
1245                Poll::Ready(pool_event) => {
1246                    this.handle_pool_event(pool_event);
1247                    continue;
1248                }
1249            }
1250
1251            // Poll the listener(s) for new connections.
1252            match Pin::new(&mut this.transport).poll(cx) {
1253                Poll::Pending => {}
1254                Poll::Ready(transport_event) => {
1255                    this.handle_transport_event(transport_event);
1256                    continue;
1257                }
1258            }
1259
1260            return Poll::Pending;
1261        }
1262    }
1263}
1264
1265/// Connection to notify of a pending event.
1266///
1267/// The connection IDs out of which to notify one of an event are captured at
1268/// the time the behaviour emits the event, in order not to forward the event to
1269/// a new connection which the behaviour may not have been aware of at the time
1270/// it issued the request for sending it.
1271enum PendingNotifyHandler {
1272    One(ConnectionId),
1273    Any(SmallVec<[ConnectionId; 10]>),
1274}
1275
1276/// Notify a single connection of an event.
1277///
1278/// Returns `Some` with the given event if the connection is not currently
1279/// ready to receive another event, in which case the current task is
1280/// scheduled to be woken up.
1281///
1282/// Returns `None` if the connection is closing or the event has been
1283/// successfully sent, in either case the event is consumed.
1284fn notify_one<THandlerInEvent>(
1285    conn: &mut EstablishedConnection<THandlerInEvent>,
1286    event: THandlerInEvent,
1287    cx: &mut Context<'_>,
1288) -> Option<THandlerInEvent> {
1289    match conn.poll_ready_notify_handler(cx) {
1290        Poll::Pending => Some(event),
1291        Poll::Ready(Err(())) => None, // connection is closing
1292        Poll::Ready(Ok(())) => {
1293            // Can now only fail if connection is closing.
1294            let _ = conn.notify_handler(event);
1295            None
1296        }
1297    }
1298}
1299
1300/// Notify any one of a given list of connections of a peer of an event.
1301///
1302/// Returns `Some` with the given event and a new list of connections if
1303/// none of the given connections was able to receive the event but at
1304/// least one of them is not closing, in which case the current task
1305/// is scheduled to be woken up. The returned connections are those which
1306/// may still become ready to receive another event.
1307///
1308/// Returns `None` if either all connections are closing or the event
1309/// was successfully sent to a handler, in either case the event is consumed.
1310fn notify_any<THandler, TBehaviour>(
1311    ids: SmallVec<[ConnectionId; 10]>,
1312    pool: &mut Pool<THandler>,
1313    event: THandlerInEvent<TBehaviour>,
1314    cx: &mut Context<'_>,
1315) -> Option<(THandlerInEvent<TBehaviour>, SmallVec<[ConnectionId; 10]>)>
1316where
1317    TBehaviour: NetworkBehaviour,
1318    THandler: ConnectionHandler<
1319        FromBehaviour = THandlerInEvent<TBehaviour>,
1320        ToBehaviour = THandlerOutEvent<TBehaviour>,
1321    >,
1322{
1323    let mut pending = SmallVec::new();
1324    let mut event = Some(event); // (1)
1325    for id in ids.into_iter() {
1326        if let Some(conn) = pool.get_established(id) {
1327            match conn.poll_ready_notify_handler(cx) {
1328                Poll::Pending => pending.push(id),
1329                Poll::Ready(Err(())) => {} // connection is closing
1330                Poll::Ready(Ok(())) => {
1331                    let e = event.take().expect("by (1),(2)");
1332                    if let Err(e) = conn.notify_handler(e) {
1333                        event = Some(e) // (2)
1334                    } else {
1335                        break;
1336                    }
1337                }
1338            }
1339        }
1340    }
1341
1342    event.and_then(|e| {
1343        if !pending.is_empty() {
1344            Some((e, pending))
1345        } else {
1346            None
1347        }
1348    })
1349}
1350
1351/// Stream of events returned by [`Swarm`].
1352///
1353/// Includes events from the [`NetworkBehaviour`] as well as events about
1354/// connection and listener status. See [`SwarmEvent`] for details.
1355///
1356/// Note: This stream is infinite and it is guaranteed that
1357/// [`futures::Stream::poll_next`] will never return `Poll::Ready(None)`.
1358impl<TBehaviour> futures::Stream for Swarm<TBehaviour>
1359where
1360    TBehaviour: NetworkBehaviour,
1361{
1362    type Item = SwarmEvent<TBehaviourOutEvent<TBehaviour>>;
1363
1364    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
1365        self.as_mut().poll_next_event(cx).map(Some)
1366    }
1367}
1368
1369/// The stream of swarm events never terminates, so we can implement fused for it.
1370impl<TBehaviour> FusedStream for Swarm<TBehaviour>
1371where
1372    TBehaviour: NetworkBehaviour,
1373{
1374    fn is_terminated(&self) -> bool {
1375        false
1376    }
1377}
1378
1379pub struct Config {
1380    pool_config: PoolConfig,
1381}
1382
1383impl Config {
1384    /// Creates a new [`Config`] from the given executor. The [`Swarm`] is obtained via
1385    /// [`Swarm::new`].
1386    pub fn with_executor(executor: impl Executor + Send + 'static) -> Self {
1387        Self {
1388            pool_config: PoolConfig::new(Some(Box::new(executor))),
1389        }
1390    }
1391
1392    #[doc(hidden)]
1393    /// Used on connection benchmarks.
1394    pub fn without_executor() -> Self {
1395        Self {
1396            pool_config: PoolConfig::new(None),
1397        }
1398    }
1399
1400    /// Sets executor to the `wasm` executor.
1401    /// Background tasks will be executed by the browser on the next micro-tick.
1402    ///
1403    /// Spawning a task is similar too:
1404    /// ```typescript
1405    /// function spawn(task: () => Promise<void>) {
1406    ///     task()
1407    /// }
1408    /// ```
1409    #[cfg(feature = "wasm-bindgen")]
1410    pub fn with_wasm_executor() -> Self {
1411        Self::with_executor(crate::executor::WasmBindgenExecutor)
1412    }
1413
1414    /// Builds a new [`Config`] from the given `tokio` executor.
1415    #[cfg(all(
1416        feature = "tokio",
1417        not(any(target_os = "emscripten", target_os = "wasi", target_os = "unknown"))
1418    ))]
1419    pub fn with_tokio_executor() -> Self {
1420        Self::with_executor(crate::executor::TokioExecutor)
1421    }
1422
1423    /// Configures the number of events from the [`NetworkBehaviour`] in
1424    /// destination to the [`ConnectionHandler`] that can be buffered before
1425    /// the [`Swarm`] has to wait. An individual buffer with this number of
1426    /// events exists for each individual connection.
1427    ///
1428    /// The ideal value depends on the executor used, the CPU speed, and the
1429    /// volume of events. If this value is too low, then the [`Swarm`] will
1430    /// be sleeping more often than necessary. Increasing this value increases
1431    /// the overall memory usage.
1432    pub fn with_notify_handler_buffer_size(mut self, n: NonZeroUsize) -> Self {
1433        self.pool_config = self.pool_config.with_notify_handler_buffer_size(n);
1434        self
1435    }
1436
1437    /// Configures the size of the buffer for events sent by a [`ConnectionHandler`] to the
1438    /// [`NetworkBehaviour`].
1439    ///
1440    /// Each connection has its own buffer.
1441    ///
1442    /// The ideal value depends on the executor used, the CPU speed and the volume of events.
1443    /// If this value is too low, then the [`ConnectionHandler`]s will be sleeping more often
1444    /// than necessary. Increasing this value increases the overall memory
1445    /// usage, and more importantly the latency between the moment when an
1446    /// event is emitted and the moment when it is received by the
1447    /// [`NetworkBehaviour`].
1448    pub fn with_per_connection_event_buffer_size(mut self, n: usize) -> Self {
1449        self.pool_config = self.pool_config.with_per_connection_event_buffer_size(n);
1450        self
1451    }
1452
1453    /// Number of addresses concurrently dialed for a single outbound connection attempt.
1454    pub fn with_dial_concurrency_factor(mut self, factor: NonZeroU8) -> Self {
1455        self.pool_config = self.pool_config.with_dial_concurrency_factor(factor);
1456        self
1457    }
1458
1459    /// Configures an override for the substream upgrade protocol to use.
1460    ///
1461    /// The subtream upgrade protocol is the multistream-select protocol
1462    /// used for protocol negotiation on substreams. Since a listener
1463    /// supports all existing versions, the choice of upgrade protocol
1464    /// only effects the "dialer", i.e. the peer opening a substream.
1465    ///
1466    /// > **Note**: If configured, specific upgrade protocols for
1467    /// > individual [`SubstreamProtocol`]s emitted by the `NetworkBehaviour`
1468    /// > are ignored.
1469    pub fn with_substream_upgrade_protocol_override(
1470        mut self,
1471        v: libp2p_core::upgrade::Version,
1472    ) -> Self {
1473        self.pool_config = self.pool_config.with_substream_upgrade_protocol_override(v);
1474        self
1475    }
1476
1477    /// The maximum number of inbound streams concurrently negotiating on a
1478    /// connection. New inbound streams exceeding the limit are dropped and thus
1479    /// reset.
1480    ///
1481    /// Note: This only enforces a limit on the number of concurrently
1482    /// negotiating inbound streams. The total number of inbound streams on a
1483    /// connection is the sum of negotiating and negotiated streams. A limit on
1484    /// the total number of streams can be enforced at the
1485    /// [`StreamMuxerBox`] level.
1486    pub fn with_max_negotiating_inbound_streams(mut self, v: usize) -> Self {
1487        self.pool_config = self.pool_config.with_max_negotiating_inbound_streams(v);
1488        self
1489    }
1490
1491    /// How long to keep a connection alive once it is idling.
1492    ///
1493    /// Defaults to 10s.
1494    ///
1495    /// Typically, you shouldn't _need_ to modify this default as connections will be kept alive
1496    /// whilst they are "in use" (see below). Depending on the application's usecase, it may be
1497    /// desirable to keep connections alive despite them not being in use.
1498    ///
1499    /// A connection is considered idle if:
1500    /// - There are no active inbound streams.
1501    /// - There are no active outbounds streams.
1502    /// - There are no pending outbound streams (i.e. all streams requested via
1503    ///   [`ConnectionHandlerEvent::OutboundSubstreamRequest`] have completed).
1504    /// - Every [`ConnectionHandler`] returns `false` from
1505    ///   [`ConnectionHandler::connection_keep_alive`].
1506    ///
1507    /// Once all these conditions are true, the idle connection timeout starts ticking.
1508    pub fn with_idle_connection_timeout(mut self, timeout: Duration) -> Self {
1509        self.pool_config.idle_connection_timeout = timeout;
1510        self
1511    }
1512}
1513
1514/// Possible errors when trying to establish or upgrade an outbound connection.
1515#[derive(Debug)]
1516pub enum DialError {
1517    /// The peer identity obtained on the connection matches the local peer.
1518    LocalPeerId { address: Multiaddr },
1519    /// No addresses have been provided by [`NetworkBehaviour::handle_pending_outbound_connection`]
1520    /// and [`DialOpts`].
1521    NoAddresses,
1522    /// The provided [`dial_opts::PeerCondition`] evaluated to false and thus
1523    /// the dial was aborted.
1524    DialPeerConditionFalse(dial_opts::PeerCondition),
1525    /// Pending connection attempt has been aborted.
1526    Aborted,
1527    /// The peer identity obtained on the connection did not match the one that was expected.
1528    WrongPeerId {
1529        obtained: PeerId,
1530        address: Multiaddr,
1531    },
1532    /// One of the [`NetworkBehaviour`]s rejected the outbound connection
1533    /// via [`NetworkBehaviour::handle_pending_outbound_connection`] or
1534    /// [`NetworkBehaviour::handle_established_outbound_connection`].
1535    Denied { cause: ConnectionDenied },
1536    /// An error occurred while negotiating the transport protocol(s) on a connection.
1537    Transport(Vec<(Multiaddr, TransportError<io::Error>)>),
1538}
1539
1540impl From<PendingOutboundConnectionError> for DialError {
1541    fn from(error: PendingOutboundConnectionError) -> Self {
1542        match error {
1543            PendingOutboundConnectionError::Aborted => DialError::Aborted,
1544            PendingOutboundConnectionError::WrongPeerId { obtained, address } => {
1545                DialError::WrongPeerId { obtained, address }
1546            }
1547            PendingOutboundConnectionError::LocalPeerId { address } => {
1548                DialError::LocalPeerId { address }
1549            }
1550            PendingOutboundConnectionError::Transport(e) => DialError::Transport(e),
1551        }
1552    }
1553}
1554
1555impl fmt::Display for DialError {
1556    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1557        match self {
1558            DialError::NoAddresses => write!(f, "Dial error: no addresses for peer."),
1559            DialError::LocalPeerId { address } => write!(
1560                f,
1561                "Dial error: tried to dial local peer id at {address:?}."
1562            ),
1563            DialError::DialPeerConditionFalse(PeerCondition::Disconnected) => write!(f, "Dial error: dial condition was configured to only happen when disconnected (`PeerCondition::Disconnected`), but node is already connected, thus cancelling new dial."),
1564            DialError::DialPeerConditionFalse(PeerCondition::NotDialing) => write!(f, "Dial error: dial condition was configured to only happen if there is currently no ongoing dialing attempt (`PeerCondition::NotDialing`), but a dial is in progress, thus cancelling new dial."),
1565            DialError::DialPeerConditionFalse(PeerCondition::DisconnectedAndNotDialing) => write!(f, "Dial error: dial condition was configured to only happen when both disconnected (`PeerCondition::Disconnected`) and there is currently no ongoing dialing attempt (`PeerCondition::NotDialing`), but node is already connected or dial is in progress, thus cancelling new dial."),
1566            DialError::DialPeerConditionFalse(PeerCondition::Always) => unreachable!("Dial peer condition is by definition true."),
1567            DialError::Aborted => write!(
1568                f,
1569                "Dial error: Pending connection attempt has been aborted."
1570            ),
1571            DialError::WrongPeerId { obtained, address } => write!(
1572                f,
1573                "Dial error: Unexpected peer ID {obtained} at {address:?}."
1574            ),
1575            DialError::Transport(errors) => {
1576                write!(f, "Failed to negotiate transport protocol(s): [")?;
1577
1578                for (addr, error) in errors {
1579                    write!(f, "({addr}")?;
1580                    print_error_chain(f, error)?;
1581                    write!(f, ")")?;
1582                }
1583                write!(f, "]")?;
1584
1585                Ok(())
1586            }
1587            DialError::Denied { .. } => {
1588                write!(f, "Dial error")
1589            }
1590        }
1591    }
1592}
1593
1594fn print_error_chain(f: &mut fmt::Formatter<'_>, e: &dyn error::Error) -> fmt::Result {
1595    write!(f, ": {e}")?;
1596
1597    if let Some(source) = e.source() {
1598        print_error_chain(f, source)?;
1599    }
1600
1601    Ok(())
1602}
1603
1604impl error::Error for DialError {
1605    fn source(&self) -> Option<&(dyn error::Error + 'static)> {
1606        match self {
1607            DialError::LocalPeerId { .. } => None,
1608            DialError::NoAddresses => None,
1609            DialError::DialPeerConditionFalse(_) => None,
1610            DialError::Aborted => None,
1611            DialError::WrongPeerId { .. } => None,
1612            DialError::Transport(_) => None,
1613            DialError::Denied { cause } => Some(cause),
1614        }
1615    }
1616}
1617
1618/// Possible errors when upgrading an inbound connection.
1619#[derive(Debug)]
1620pub enum ListenError {
1621    /// Pending connection attempt has been aborted.
1622    Aborted,
1623    /// The peer identity obtained on the connection did not match the one that was expected.
1624    WrongPeerId {
1625        obtained: PeerId,
1626        endpoint: ConnectedPoint,
1627    },
1628    /// The connection was dropped because it resolved to our own [`PeerId`].
1629    LocalPeerId {
1630        address: Multiaddr,
1631    },
1632    Denied {
1633        cause: ConnectionDenied,
1634    },
1635    /// An error occurred while negotiating the transport protocol(s) on a connection.
1636    Transport(TransportError<io::Error>),
1637}
1638
1639impl From<PendingInboundConnectionError> for ListenError {
1640    fn from(error: PendingInboundConnectionError) -> Self {
1641        match error {
1642            PendingInboundConnectionError::Transport(inner) => ListenError::Transport(inner),
1643            PendingInboundConnectionError::Aborted => ListenError::Aborted,
1644            PendingInboundConnectionError::LocalPeerId { address } => {
1645                ListenError::LocalPeerId { address }
1646            }
1647        }
1648    }
1649}
1650
1651impl fmt::Display for ListenError {
1652    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1653        match self {
1654            ListenError::Aborted => write!(
1655                f,
1656                "Listen error: Pending connection attempt has been aborted."
1657            ),
1658            ListenError::WrongPeerId { obtained, endpoint } => write!(
1659                f,
1660                "Listen error: Unexpected peer ID {obtained} at {endpoint:?}."
1661            ),
1662            ListenError::Transport(_) => {
1663                write!(f, "Listen error: Failed to negotiate transport protocol(s)")
1664            }
1665            ListenError::Denied { cause } => {
1666                write!(f, "Listen error: Denied: {cause}")
1667            }
1668            ListenError::LocalPeerId { address } => {
1669                write!(f, "Listen error: Local peer ID at {address:?}.")
1670            }
1671        }
1672    }
1673}
1674
1675impl error::Error for ListenError {
1676    fn source(&self) -> Option<&(dyn error::Error + 'static)> {
1677        match self {
1678            ListenError::WrongPeerId { .. } => None,
1679            ListenError::Transport(err) => Some(err),
1680            ListenError::Aborted => None,
1681            ListenError::Denied { cause } => Some(cause),
1682            ListenError::LocalPeerId { .. } => None,
1683        }
1684    }
1685}
1686
1687/// A connection was denied.
1688///
1689/// To figure out which [`NetworkBehaviour`] denied the connection, use
1690/// [`ConnectionDenied::downcast`].
1691#[derive(Debug)]
1692pub struct ConnectionDenied {
1693    inner: Box<dyn error::Error + Send + Sync + 'static>,
1694}
1695
1696impl ConnectionDenied {
1697    pub fn new(cause: impl Into<Box<dyn error::Error + Send + Sync + 'static>>) -> Self {
1698        Self {
1699            inner: cause.into(),
1700        }
1701    }
1702
1703    /// Attempt to downcast to a particular reason for why the connection was denied.
1704    pub fn downcast<E>(self) -> Result<E, Self>
1705    where
1706        E: error::Error + Send + Sync + 'static,
1707    {
1708        let inner = self
1709            .inner
1710            .downcast::<E>()
1711            .map_err(|inner| ConnectionDenied { inner })?;
1712
1713        Ok(*inner)
1714    }
1715
1716    /// Attempt to downcast to a particular reason for why the connection was denied.
1717    pub fn downcast_ref<E>(&self) -> Option<&E>
1718    where
1719        E: error::Error + Send + Sync + 'static,
1720    {
1721        self.inner.downcast_ref::<E>()
1722    }
1723}
1724
1725impl fmt::Display for ConnectionDenied {
1726    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1727        write!(f, "connection denied")
1728    }
1729}
1730
1731impl error::Error for ConnectionDenied {
1732    fn source(&self) -> Option<&(dyn error::Error + 'static)> {
1733        Some(self.inner.as_ref())
1734    }
1735}
1736
1737/// Information about the connections obtained by [`Swarm::network_info()`].
1738#[derive(Clone, Debug)]
1739pub struct NetworkInfo {
1740    /// The total number of connected peers.
1741    num_peers: usize,
1742    /// Counters of ongoing network connections.
1743    connection_counters: ConnectionCounters,
1744}
1745
1746impl NetworkInfo {
1747    /// The number of connected peers, i.e. peers with whom at least
1748    /// one established connection exists.
1749    pub fn num_peers(&self) -> usize {
1750        self.num_peers
1751    }
1752
1753    /// Gets counters for ongoing network connections.
1754    pub fn connection_counters(&self) -> &ConnectionCounters {
1755        &self.connection_counters
1756    }
1757}
1758
1759#[cfg(test)]
1760mod tests {
1761    use libp2p_core::{
1762        multiaddr,
1763        multiaddr::multiaddr,
1764        transport,
1765        transport::{memory::MemoryTransportError, TransportEvent},
1766        upgrade,
1767    };
1768    use libp2p_identity as identity;
1769    use libp2p_plaintext as plaintext;
1770    use libp2p_yamux as yamux;
1771    use quickcheck::*;
1772
1773    use super::*;
1774    use crate::test::{CallTraceBehaviour, MockBehaviour};
1775
1776    // Test execution state.
1777    // Connection => Disconnecting => Connecting.
1778    enum State {
1779        Connecting,
1780        Disconnecting,
1781    }
1782
1783    fn new_test_swarm(
1784        config: Config,
1785    ) -> Swarm<CallTraceBehaviour<MockBehaviour<dummy::ConnectionHandler, ()>>> {
1786        let id_keys = identity::Keypair::generate_ed25519();
1787        let local_public_key = id_keys.public();
1788        let transport = transport::MemoryTransport::default()
1789            .upgrade(upgrade::Version::V1)
1790            .authenticate(plaintext::Config::new(&id_keys))
1791            .multiplex(yamux::Config::default())
1792            .boxed();
1793        let behaviour = CallTraceBehaviour::new(MockBehaviour::new(dummy::ConnectionHandler));
1794
1795        Swarm::new(transport, behaviour, local_public_key.into(), config)
1796    }
1797
1798    fn swarms_connected<TBehaviour>(
1799        swarm1: &Swarm<CallTraceBehaviour<TBehaviour>>,
1800        swarm2: &Swarm<CallTraceBehaviour<TBehaviour>>,
1801        num_connections: usize,
1802    ) -> bool
1803    where
1804        TBehaviour: NetworkBehaviour,
1805        THandlerOutEvent<TBehaviour>: Clone,
1806    {
1807        swarm1
1808            .behaviour()
1809            .num_connections_to_peer(*swarm2.local_peer_id())
1810            == num_connections
1811            && swarm2
1812                .behaviour()
1813                .num_connections_to_peer(*swarm1.local_peer_id())
1814                == num_connections
1815            && swarm1.is_connected(swarm2.local_peer_id())
1816            && swarm2.is_connected(swarm1.local_peer_id())
1817    }
1818
1819    fn swarms_disconnected<TBehaviour>(
1820        swarm1: &Swarm<CallTraceBehaviour<TBehaviour>>,
1821        swarm2: &Swarm<CallTraceBehaviour<TBehaviour>>,
1822    ) -> bool
1823    where
1824        TBehaviour: NetworkBehaviour,
1825        THandlerOutEvent<TBehaviour>: Clone,
1826    {
1827        swarm1
1828            .behaviour()
1829            .num_connections_to_peer(*swarm2.local_peer_id())
1830            == 0
1831            && swarm2
1832                .behaviour()
1833                .num_connections_to_peer(*swarm1.local_peer_id())
1834                == 0
1835            && !swarm1.is_connected(swarm2.local_peer_id())
1836            && !swarm2.is_connected(swarm1.local_peer_id())
1837    }
1838
1839    /// Establishes multiple connections between two peers,
1840    /// after which one peer disconnects the other using [`Swarm::disconnect_peer_id`].
1841    ///
1842    /// The test expects both behaviours to be notified via calls to
1843    /// [`NetworkBehaviour::on_swarm_event`] with pairs of [`FromSwarm::ConnectionEstablished`]
1844    /// / [`FromSwarm::ConnectionClosed`]
1845    #[tokio::test]
1846    async fn test_swarm_disconnect() {
1847        let mut swarm1 = new_test_swarm(Config::with_tokio_executor());
1848        let mut swarm2 = new_test_swarm(Config::with_tokio_executor());
1849
1850        let addr1: Multiaddr = multiaddr::Protocol::Memory(rand::random::<u64>()).into();
1851        let addr2: Multiaddr = multiaddr::Protocol::Memory(rand::random::<u64>()).into();
1852
1853        swarm1.listen_on(addr1.clone()).unwrap();
1854        swarm2.listen_on(addr2.clone()).unwrap();
1855
1856        let swarm1_id = *swarm1.local_peer_id();
1857
1858        let mut reconnected = false;
1859        let num_connections = 10;
1860
1861        for _ in 0..num_connections {
1862            swarm1.dial(addr2.clone()).unwrap();
1863        }
1864        let mut state = State::Connecting;
1865
1866        future::poll_fn(move |cx| loop {
1867            let poll1 = Swarm::poll_next_event(Pin::new(&mut swarm1), cx);
1868            let poll2 = Swarm::poll_next_event(Pin::new(&mut swarm2), cx);
1869            match state {
1870                State::Connecting => {
1871                    if swarms_connected(&swarm1, &swarm2, num_connections) {
1872                        if reconnected {
1873                            return Poll::Ready(());
1874                        }
1875                        swarm2
1876                            .disconnect_peer_id(swarm1_id)
1877                            .expect("Error disconnecting");
1878                        state = State::Disconnecting;
1879                    }
1880                }
1881                State::Disconnecting => {
1882                    if swarms_disconnected(&swarm1, &swarm2) {
1883                        if reconnected {
1884                            return Poll::Ready(());
1885                        }
1886                        reconnected = true;
1887                        for _ in 0..num_connections {
1888                            swarm2.dial(addr1.clone()).unwrap();
1889                        }
1890                        state = State::Connecting;
1891                    }
1892                }
1893            }
1894
1895            if poll1.is_pending() && poll2.is_pending() {
1896                return Poll::Pending;
1897            }
1898        })
1899        .await
1900    }
1901
1902    /// Establishes multiple connections between two peers,
1903    /// after which one peer disconnects the other
1904    /// using [`ToSwarm::CloseConnection`] returned by a [`NetworkBehaviour`].
1905    ///
1906    /// The test expects both behaviours to be notified via calls to
1907    /// [`NetworkBehaviour::on_swarm_event`] with pairs of [`FromSwarm::ConnectionEstablished`]
1908    /// / [`FromSwarm::ConnectionClosed`]
1909    #[tokio::test]
1910    async fn test_behaviour_disconnect_all() {
1911        let mut swarm1 = new_test_swarm(Config::with_tokio_executor());
1912        let mut swarm2 = new_test_swarm(Config::with_tokio_executor());
1913
1914        let addr1: Multiaddr = multiaddr::Protocol::Memory(rand::random::<u64>()).into();
1915        let addr2: Multiaddr = multiaddr::Protocol::Memory(rand::random::<u64>()).into();
1916
1917        swarm1.listen_on(addr1.clone()).unwrap();
1918        swarm2.listen_on(addr2.clone()).unwrap();
1919
1920        let swarm1_id = *swarm1.local_peer_id();
1921
1922        let mut reconnected = false;
1923        let num_connections = 10;
1924
1925        for _ in 0..num_connections {
1926            swarm1.dial(addr2.clone()).unwrap();
1927        }
1928        let mut state = State::Connecting;
1929
1930        future::poll_fn(move |cx| loop {
1931            let poll1 = Swarm::poll_next_event(Pin::new(&mut swarm1), cx);
1932            let poll2 = Swarm::poll_next_event(Pin::new(&mut swarm2), cx);
1933            match state {
1934                State::Connecting => {
1935                    if swarms_connected(&swarm1, &swarm2, num_connections) {
1936                        if reconnected {
1937                            return Poll::Ready(());
1938                        }
1939                        swarm2
1940                            .behaviour
1941                            .inner()
1942                            .next_action
1943                            .replace(ToSwarm::CloseConnection {
1944                                peer_id: swarm1_id,
1945                                connection: CloseConnection::All,
1946                            });
1947                        state = State::Disconnecting;
1948                        continue;
1949                    }
1950                }
1951                State::Disconnecting => {
1952                    if swarms_disconnected(&swarm1, &swarm2) {
1953                        reconnected = true;
1954                        for _ in 0..num_connections {
1955                            swarm2.dial(addr1.clone()).unwrap();
1956                        }
1957                        state = State::Connecting;
1958                        continue;
1959                    }
1960                }
1961            }
1962
1963            if poll1.is_pending() && poll2.is_pending() {
1964                return Poll::Pending;
1965            }
1966        })
1967        .await
1968    }
1969
1970    /// Establishes multiple connections between two peers,
1971    /// after which one peer closes a single connection
1972    /// using [`ToSwarm::CloseConnection`] returned by a [`NetworkBehaviour`].
1973    ///
1974    /// The test expects both behaviours to be notified via calls to
1975    /// [`NetworkBehaviour::on_swarm_event`] with pairs of [`FromSwarm::ConnectionEstablished`]
1976    /// / [`FromSwarm::ConnectionClosed`]
1977    #[tokio::test]
1978    async fn test_behaviour_disconnect_one() {
1979        let mut swarm1 = new_test_swarm(Config::with_tokio_executor());
1980        let mut swarm2 = new_test_swarm(Config::with_tokio_executor());
1981
1982        let addr1: Multiaddr = multiaddr::Protocol::Memory(rand::random::<u64>()).into();
1983        let addr2: Multiaddr = multiaddr::Protocol::Memory(rand::random::<u64>()).into();
1984
1985        swarm1.listen_on(addr1).unwrap();
1986        swarm2.listen_on(addr2.clone()).unwrap();
1987
1988        let swarm1_id = *swarm1.local_peer_id();
1989
1990        let num_connections = 10;
1991
1992        for _ in 0..num_connections {
1993            swarm1.dial(addr2.clone()).unwrap();
1994        }
1995        let mut state = State::Connecting;
1996        let mut disconnected_conn_id = None;
1997
1998        future::poll_fn(move |cx| loop {
1999            let poll1 = Swarm::poll_next_event(Pin::new(&mut swarm1), cx);
2000            let poll2 = Swarm::poll_next_event(Pin::new(&mut swarm2), cx);
2001            match state {
2002                State::Connecting => {
2003                    if swarms_connected(&swarm1, &swarm2, num_connections) {
2004                        disconnected_conn_id = {
2005                            let conn_id =
2006                                swarm2.behaviour.on_connection_established[num_connections / 2].1;
2007                            swarm2.behaviour.inner().next_action.replace(
2008                                ToSwarm::CloseConnection {
2009                                    peer_id: swarm1_id,
2010                                    connection: CloseConnection::One(conn_id),
2011                                },
2012                            );
2013                            Some(conn_id)
2014                        };
2015                        state = State::Disconnecting;
2016                    }
2017                }
2018                State::Disconnecting => {
2019                    for s in &[&swarm1, &swarm2] {
2020                        assert!(s
2021                            .behaviour
2022                            .on_connection_closed
2023                            .iter()
2024                            .all(|(.., remaining_conns)| *remaining_conns > 0));
2025                        assert_eq!(s.behaviour.on_connection_established.len(), num_connections);
2026                        s.behaviour.assert_connected(num_connections, 1);
2027                    }
2028                    if [&swarm1, &swarm2]
2029                        .iter()
2030                        .all(|s| s.behaviour.on_connection_closed.len() == 1)
2031                    {
2032                        let conn_id = swarm2.behaviour.on_connection_closed[0].1;
2033                        assert_eq!(Some(conn_id), disconnected_conn_id);
2034                        return Poll::Ready(());
2035                    }
2036                }
2037            }
2038
2039            if poll1.is_pending() && poll2.is_pending() {
2040                return Poll::Pending;
2041            }
2042        })
2043        .await
2044    }
2045
2046    #[test]
2047    fn concurrent_dialing() {
2048        #[derive(Clone, Debug)]
2049        struct DialConcurrencyFactor(NonZeroU8);
2050
2051        impl Arbitrary for DialConcurrencyFactor {
2052            fn arbitrary(g: &mut Gen) -> Self {
2053                Self(NonZeroU8::new(g.gen_range(1..11)).unwrap())
2054            }
2055        }
2056
2057        fn prop(concurrency_factor: DialConcurrencyFactor) {
2058            tokio::runtime::Runtime::new().unwrap().block_on(async {
2059                let mut swarm = new_test_swarm(
2060                    Config::with_tokio_executor()
2061                        .with_dial_concurrency_factor(concurrency_factor.0),
2062                );
2063
2064                // Listen on `concurrency_factor + 1` addresses.
2065                //
2066                // `+ 2` to ensure a subset of addresses is dialed by network_2.
2067                let num_listen_addrs = concurrency_factor.0.get() + 2;
2068                let mut listen_addresses = Vec::new();
2069                let mut transports = Vec::new();
2070                for _ in 0..num_listen_addrs {
2071                    let mut transport = transport::MemoryTransport::default().boxed();
2072                    transport
2073                        .listen_on(ListenerId::next(), "/memory/0".parse().unwrap())
2074                        .unwrap();
2075
2076                    match transport.select_next_some().await {
2077                        TransportEvent::NewAddress { listen_addr, .. } => {
2078                            listen_addresses.push(listen_addr);
2079                        }
2080                        _ => panic!("Expected `NewListenAddr` event."),
2081                    }
2082
2083                    transports.push(transport);
2084                }
2085
2086                // Have swarm dial each listener and wait for each listener to receive the incoming
2087                // connections.
2088                swarm
2089                    .dial(
2090                        DialOpts::peer_id(PeerId::random())
2091                            .addresses(listen_addresses)
2092                            .build(),
2093                    )
2094                    .unwrap();
2095                for mut transport in transports.into_iter() {
2096                    match futures::future::select(transport.select_next_some(), swarm.next()).await
2097                    {
2098                        future::Either::Left((TransportEvent::Incoming { .. }, _)) => {}
2099                        future::Either::Left(_) => {
2100                            panic!("Unexpected transport event.")
2101                        }
2102                        future::Either::Right((e, _)) => {
2103                            panic!("Expect swarm to not emit any event {e:?}")
2104                        }
2105                    }
2106                }
2107
2108                match swarm.next().await.unwrap() {
2109                    SwarmEvent::OutgoingConnectionError { .. } => {}
2110                    e => panic!("Unexpected swarm event {e:?}"),
2111                }
2112            })
2113        }
2114
2115        QuickCheck::new().tests(10).quickcheck(prop as fn(_) -> _);
2116    }
2117
2118    #[tokio::test]
2119    async fn invalid_peer_id() {
2120        // Checks whether dialing an address containing the wrong peer id raises an error
2121        // for the expected peer id instead of the obtained peer id.
2122
2123        let mut swarm1 = new_test_swarm(Config::with_tokio_executor());
2124        let mut swarm2 = new_test_swarm(Config::with_tokio_executor());
2125
2126        swarm1.listen_on("/memory/0".parse().unwrap()).unwrap();
2127
2128        let address = future::poll_fn(|cx| match swarm1.poll_next_unpin(cx) {
2129            Poll::Ready(Some(SwarmEvent::NewListenAddr { address, .. })) => Poll::Ready(address),
2130            Poll::Pending => Poll::Pending,
2131            _ => panic!("Was expecting the listen address to be reported"),
2132        })
2133        .await;
2134
2135        let other_id = PeerId::random();
2136        let other_addr = address.with(multiaddr::Protocol::P2p(other_id));
2137
2138        swarm2.dial(other_addr.clone()).unwrap();
2139
2140        let (peer_id, error) = future::poll_fn(|cx| {
2141            if let Poll::Ready(Some(SwarmEvent::IncomingConnection { .. })) =
2142                swarm1.poll_next_unpin(cx)
2143            {}
2144
2145            match swarm2.poll_next_unpin(cx) {
2146                Poll::Ready(Some(SwarmEvent::OutgoingConnectionError {
2147                    peer_id, error, ..
2148                })) => Poll::Ready((peer_id, error)),
2149                Poll::Ready(x) => panic!("unexpected {x:?}"),
2150                Poll::Pending => Poll::Pending,
2151            }
2152        })
2153        .await;
2154        assert_eq!(peer_id.unwrap(), other_id);
2155        match error {
2156            DialError::WrongPeerId { obtained, address } => {
2157                assert_eq!(obtained, *swarm1.local_peer_id());
2158                assert_eq!(address, other_addr);
2159            }
2160            x => panic!("wrong error {x:?}"),
2161        }
2162    }
2163
2164    #[tokio::test]
2165    async fn dial_self() {
2166        // Check whether dialing ourselves correctly fails.
2167        //
2168        // Dialing the same address we're listening should result in three events:
2169        //
2170        // - The incoming connection notification (before we know the incoming peer ID).
2171        // - The connection error for the dialing endpoint (once we've determined that it's our own
2172        //   ID).
2173        // - The connection error for the listening endpoint (once we've determined that it's our
2174        //   own ID).
2175        //
2176        // The last two can happen in any order.
2177
2178        let mut swarm = new_test_swarm(Config::with_tokio_executor());
2179        swarm.listen_on("/memory/0".parse().unwrap()).unwrap();
2180
2181        let local_address = future::poll_fn(|cx| match swarm.poll_next_unpin(cx) {
2182            Poll::Ready(Some(SwarmEvent::NewListenAddr { address, .. })) => Poll::Ready(address),
2183            Poll::Pending => Poll::Pending,
2184            _ => panic!("Was expecting the listen address to be reported"),
2185        })
2186        .await;
2187
2188        // This is a hack to actually execute the dial
2189        // to ourselves which would otherwise be filtered.
2190        swarm.listened_addrs.clear();
2191        swarm.dial(local_address.clone()).unwrap();
2192
2193        let mut got_dial_err = false;
2194        let mut got_inc_err = false;
2195        future::poll_fn(|cx| -> Poll<Result<(), io::Error>> {
2196            loop {
2197                match swarm.poll_next_unpin(cx) {
2198                    Poll::Ready(Some(SwarmEvent::OutgoingConnectionError {
2199                        peer_id,
2200                        error: DialError::LocalPeerId { .. },
2201                        ..
2202                    })) => {
2203                        assert_eq!(&peer_id.unwrap(), swarm.local_peer_id());
2204                        assert!(!got_dial_err);
2205                        got_dial_err = true;
2206                        if got_inc_err {
2207                            return Poll::Ready(Ok(()));
2208                        }
2209                    }
2210                    Poll::Ready(Some(SwarmEvent::IncomingConnectionError {
2211                        local_addr, ..
2212                    })) => {
2213                        assert!(!got_inc_err);
2214                        assert_eq!(local_addr, local_address);
2215                        got_inc_err = true;
2216                        if got_dial_err {
2217                            return Poll::Ready(Ok(()));
2218                        }
2219                    }
2220                    Poll::Ready(Some(SwarmEvent::IncomingConnection { local_addr, .. })) => {
2221                        assert_eq!(local_addr, local_address);
2222                    }
2223                    Poll::Ready(ev) => {
2224                        panic!("Unexpected event: {ev:?}")
2225                    }
2226                    Poll::Pending => break Poll::Pending,
2227                }
2228            }
2229        })
2230        .await
2231        .unwrap();
2232    }
2233
2234    #[tokio::test]
2235    async fn dial_self_by_id() {
2236        // Trying to dial self by passing the same `PeerId` shouldn't even be possible in the first
2237        // place.
2238        let swarm = new_test_swarm(Config::with_tokio_executor());
2239        let peer_id = *swarm.local_peer_id();
2240        assert!(!swarm.is_connected(&peer_id));
2241    }
2242
2243    #[tokio::test]
2244    async fn multiple_addresses_err() {
2245        // Tries dialing multiple addresses, and makes sure there's one dialing error per address.
2246
2247        let target = PeerId::random();
2248
2249        let mut swarm = new_test_swarm(Config::with_tokio_executor());
2250
2251        let addresses = HashSet::from([
2252            multiaddr![Ip4([0, 0, 0, 0]), Tcp(rand::random::<u16>())],
2253            multiaddr![Ip4([0, 0, 0, 0]), Tcp(rand::random::<u16>())],
2254            multiaddr![Ip4([0, 0, 0, 0]), Tcp(rand::random::<u16>())],
2255            multiaddr![Udp(rand::random::<u16>())],
2256            multiaddr![Udp(rand::random::<u16>())],
2257            multiaddr![Udp(rand::random::<u16>())],
2258            multiaddr![Udp(rand::random::<u16>())],
2259            multiaddr![Udp(rand::random::<u16>())],
2260        ]);
2261
2262        swarm
2263            .dial(
2264                DialOpts::peer_id(target)
2265                    .addresses(addresses.iter().cloned().collect())
2266                    .build(),
2267            )
2268            .unwrap();
2269
2270        match swarm.next().await.unwrap() {
2271            SwarmEvent::OutgoingConnectionError {
2272                peer_id,
2273                // multiaddr,
2274                error: DialError::Transport(errors),
2275                ..
2276            } => {
2277                assert_eq!(target, peer_id.unwrap());
2278
2279                let failed_addresses = errors.into_iter().map(|(addr, _)| addr).collect::<Vec<_>>();
2280                let expected_addresses = addresses
2281                    .into_iter()
2282                    .map(|addr| addr.with(multiaddr::Protocol::P2p(target)))
2283                    .collect::<Vec<_>>();
2284
2285                assert_eq!(expected_addresses, failed_addresses);
2286            }
2287            e => panic!("Unexpected event: {e:?}"),
2288        }
2289    }
2290
2291    #[tokio::test]
2292    async fn aborting_pending_connection_surfaces_error() {
2293        let _ = tracing_subscriber::fmt()
2294            .with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
2295            .try_init();
2296
2297        let mut dialer = new_test_swarm(Config::with_tokio_executor());
2298        let mut listener = new_test_swarm(Config::with_tokio_executor());
2299
2300        let listener_peer_id = *listener.local_peer_id();
2301        listener.listen_on(multiaddr![Memory(0u64)]).unwrap();
2302        let listener_address = match listener.next().await.unwrap() {
2303            SwarmEvent::NewListenAddr { address, .. } => address,
2304            e => panic!("Unexpected network event: {e:?}"),
2305        };
2306
2307        dialer
2308            .dial(
2309                DialOpts::peer_id(listener_peer_id)
2310                    .addresses(vec![listener_address])
2311                    .build(),
2312            )
2313            .unwrap();
2314
2315        dialer
2316            .disconnect_peer_id(listener_peer_id)
2317            .expect_err("Expect peer to not yet be connected.");
2318
2319        match dialer.next().await.unwrap() {
2320            SwarmEvent::OutgoingConnectionError {
2321                error: DialError::Aborted,
2322                ..
2323            } => {}
2324            e => panic!("Unexpected swarm event {e:?}."),
2325        }
2326    }
2327
2328    #[test]
2329    fn dial_error_prints_sources() {
2330        // This constitutes a fairly typical error for chained transports.
2331        let error = DialError::Transport(vec![(
2332            "/ip4/127.0.0.1/tcp/80".parse().unwrap(),
2333            TransportError::Other(io::Error::other(MemoryTransportError::Unreachable)),
2334        )]);
2335
2336        let string = format!("{error}");
2337
2338        // Unfortunately, we have some "empty" errors
2339        // that lead to multiple colons without text but that is the best we can do.
2340        assert_eq!("Failed to negotiate transport protocol(s): [(/ip4/127.0.0.1/tcp/80: : No listener on the given port.)]", string)
2341    }
2342}