libp2p_swarm/connection/
pool.rs

// Copyright 2021 Protocol Labs.
// Copyright 2018 Parity Technologies (UK) Ltd.
//
// Permission is hereby granted, free of charge, to any person obtaining a
// copy of this software and associated documentation files (the "Software"),
// to deal in the Software without restriction, including without limitation
// the rights to use, copy, modify, merge, publish, distribute, sublicense,
// and/or sell copies of the Software, and to permit persons to whom the
// Software is furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.
21use std::{
22    collections::HashMap,
23    convert::Infallible,
24    fmt,
25    num::{NonZeroU8, NonZeroUsize},
26    pin::Pin,
27    task::{Context, Poll, Waker},
28};
29
30use concurrent_dial::ConcurrentDial;
31use fnv::FnvHashMap;
32use futures::{
33    channel::{mpsc, oneshot},
34    future::{poll_fn, BoxFuture, Either},
35    prelude::*,
36    ready,
37    stream::{FuturesUnordered, SelectAll},
38};
39use libp2p_core::{
40    connection::Endpoint,
41    muxing::{StreamMuxerBox, StreamMuxerExt},
42    transport::PortUse,
43};
44use tracing::Instrument;
45use web_time::{Duration, Instant};
46
47use crate::{
48    connection::{
49        Connected, Connection, ConnectionError, ConnectionId, IncomingInfo,
50        PendingInboundConnectionError, PendingOutboundConnectionError, PendingPoint,
51    },
52    transport::TransportError,
53    ConnectedPoint, ConnectionHandler, Executor, Multiaddr, PeerId,
54};
55
56mod concurrent_dial;
57mod task;
58
59enum ExecSwitch {
60    Executor(Box<dyn Executor + Send>),
61    LocalSpawn(FuturesUnordered<Pin<Box<dyn Future<Output = ()> + Send>>>),
62}
63
64impl ExecSwitch {
65    fn advance_local(&mut self, cx: &mut Context) {
66        match self {
67            ExecSwitch::Executor(_) => {}
68            ExecSwitch::LocalSpawn(local) => {
69                while let Poll::Ready(Some(())) = local.poll_next_unpin(cx) {}
70            }
71        }
72    }
73
74    #[track_caller]
75    fn spawn(&mut self, task: impl Future<Output = ()> + Send + 'static) {
76        let task = task.boxed();
77
78        match self {
79            Self::Executor(executor) => executor.exec(task),
80            Self::LocalSpawn(local) => local.push(task),
81        }
82    }
83}
84
85/// A connection `Pool` manages a set of connections for each peer.
86pub(crate) struct Pool<THandler>
87where
88    THandler: ConnectionHandler,
89{
90    local_id: PeerId,
91
92    /// The connection counter(s).
93    counters: ConnectionCounters,
94
95    /// The managed connections of each peer that are currently considered established.
96    established: FnvHashMap<
97        PeerId,
98        FnvHashMap<ConnectionId, EstablishedConnection<THandler::FromBehaviour>>,
99    >,
100
101    /// The pending connections that are currently being negotiated.
102    pending: HashMap<ConnectionId, PendingConnection>,
103
104    /// Size of the task command buffer (per task).
105    task_command_buffer_size: usize,
106
107    /// Number of addresses concurrently dialed for a single outbound connection attempt.
108    dial_concurrency_factor: NonZeroU8,
109
110    /// The configured override for substream protocol upgrades, if any.
111    substream_upgrade_protocol_override: Option<libp2p_core::upgrade::Version>,
112
113    /// The maximum number of inbound streams concurrently negotiating on a connection.
114    ///
115    /// See [`Connection::max_negotiating_inbound_streams`].
116    max_negotiating_inbound_streams: usize,
117
118    /// How many [`task::EstablishedConnectionEvent`]s can be buffered before the connection is
119    /// back-pressured.
120    per_connection_event_buffer_size: usize,
121
122    /// The executor to use for running connection tasks. Can either be a global executor
123    /// or a local queue.
124    executor: ExecSwitch,
125
126    /// Sender distributed to pending tasks for reporting events back
127    /// to the pool.
128    pending_connection_events_tx: mpsc::Sender<task::PendingConnectionEvent>,
129
130    /// Receiver for events reported from pending tasks.
131    pending_connection_events_rx: mpsc::Receiver<task::PendingConnectionEvent>,
132
133    /// Waker in case we haven't established any connections yet.
134    no_established_connections_waker: Option<Waker>,
135
136    /// Receivers for events reported from established connections.
137    established_connection_events:
138        SelectAll<mpsc::Receiver<task::EstablishedConnectionEvent<THandler::ToBehaviour>>>,
139
140    /// Receivers for [`NewConnection`] objects that are dropped.
141    new_connection_dropped_listeners: FuturesUnordered<oneshot::Receiver<StreamMuxerBox>>,
142
143    /// How long a connection should be kept alive once it starts idling.
144    idle_connection_timeout: Duration,
145}
146
147#[derive(Debug)]
148pub(crate) struct EstablishedConnection<TInEvent> {
149    endpoint: ConnectedPoint,
150    /// Channel endpoint to send commands to the task.
151    sender: mpsc::Sender<task::Command<TInEvent>>,
152}
153
154impl<TInEvent> EstablishedConnection<TInEvent> {
155    /// (Asynchronously) sends an event to the connection handler.
156    ///
157    /// If the handler is not ready to receive the event, either because
158    /// it is busy or the connection is about to close, the given event
159    /// is returned with an `Err`.
160    ///
161    /// If execution of this method is preceded by successful execution of
162    /// `poll_ready_notify_handler` without another intervening execution
163    /// of `notify_handler`, it only fails if the connection is now about
164    /// to close.
165    pub(crate) fn notify_handler(&mut self, event: TInEvent) -> Result<(), TInEvent> {
166        let cmd = task::Command::NotifyHandler(event);
167        self.sender.try_send(cmd).map_err(|e| match e.into_inner() {
168            task::Command::NotifyHandler(event) => event,
169            _ => unreachable!("Expect failed send to return initial event."),
170        })
171    }
172
173    /// Checks if `notify_handler` is ready to accept an event.
174    ///
175    /// Returns `Ok(())` if the handler is ready to receive an event via `notify_handler`.
176    ///
177    /// Returns `Err(())` if the background task associated with the connection
178    /// is terminating and the connection is about to close.
179    pub(crate) fn poll_ready_notify_handler(
180        &mut self,
181        cx: &mut Context<'_>,
182    ) -> Poll<Result<(), ()>> {
183        self.sender.poll_ready(cx).map_err(|_| ())
184    }
185
186    /// Initiates a graceful close of the connection.
187    ///
188    /// Has no effect if the connection is already closing.
189    pub(crate) fn start_close(&mut self) {
190        // Clone the sender so that we are guaranteed to have
191        // capacity for the close command (every sender gets a slot).
192        match self.sender.clone().try_send(task::Command::Close) {
193            Ok(()) => {}
194            Err(e) => assert!(e.is_disconnected(), "No capacity for close command."),
195        };
196    }
197}
198
199struct PendingConnection {
200    /// [`PeerId`] of the remote peer.
201    peer_id: Option<PeerId>,
202    endpoint: PendingPoint,
203    /// When dropped, notifies the task which then knows to terminate.
204    abort_notifier: Option<oneshot::Sender<Infallible>>,
205    /// The moment we became aware of this possible connection, useful for timing metrics.
206    accepted_at: Instant,
207}
208
209impl PendingConnection {
210    fn is_for_same_remote_as(&self, other: PeerId) -> bool {
211        self.peer_id == Some(other)
212    }
213
214    /// Aborts the connection attempt, closing the connection.
215    fn abort(&mut self) {
216        if let Some(notifier) = self.abort_notifier.take() {
217            drop(notifier);
218        }
219    }
220}
221
222impl<THandler: ConnectionHandler> fmt::Debug for Pool<THandler> {
223    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
224        f.debug_struct("Pool")
225            .field("counters", &self.counters)
226            .finish()
227    }
228}
229
230/// Event that can happen on the `Pool`.
231#[derive(Debug)]
232pub(crate) enum PoolEvent<ToBehaviour> {
233    /// A new connection has been established.
234    ConnectionEstablished {
235        id: ConnectionId,
236        peer_id: PeerId,
237        endpoint: ConnectedPoint,
238        connection: NewConnection,
239        /// [`Some`] when the new connection is an outgoing connection.
240        /// Addresses are dialed in parallel. Contains the addresses and errors
241        /// of dial attempts that failed before the one successful dial.
242        concurrent_dial_errors: Option<Vec<(Multiaddr, TransportError<std::io::Error>)>>,
243        /// How long it took to establish this connection.
244        established_in: std::time::Duration,
245    },
246
247    /// An established connection was closed.
248    ///
249    /// A connection may close if
250    ///
251    ///   * it encounters an error, which includes the connection being closed by the remote. In
252    ///     this case `error` is `Some`.
253    ///   * it was actively closed by [`EstablishedConnection::start_close`], i.e. a successful,
254    ///     orderly close.
255    ///   * it was actively closed by [`Pool::disconnect`], i.e. dropped without an orderly close.
256    ConnectionClosed {
257        id: ConnectionId,
258        /// Information about the connection that errored.
259        connected: Connected,
260        /// The error that occurred, if any. If `None`, the connection
261        /// was closed by the local peer.
262        error: Option<ConnectionError>,
263        /// The remaining established connections to the same peer.
264        remaining_established_connection_ids: Vec<ConnectionId>,
265    },
266
267    /// An outbound connection attempt failed.
268    PendingOutboundConnectionError {
269        /// The ID of the failed connection.
270        id: ConnectionId,
271        /// The error that occurred.
272        error: PendingOutboundConnectionError,
273        /// The (expected) peer of the failed connection.
274        peer: Option<PeerId>,
275    },
276
277    /// An inbound connection attempt failed.
278    PendingInboundConnectionError {
279        /// The ID of the failed connection.
280        id: ConnectionId,
281        /// Address used to send back data to the remote.
282        send_back_addr: Multiaddr,
283        /// Local connection address.
284        local_addr: Multiaddr,
285        /// The error that occurred.
286        error: PendingInboundConnectionError,
287    },
288
289    /// A node has produced an event.
290    ConnectionEvent {
291        id: ConnectionId,
292        peer_id: PeerId,
293        /// The produced event.
294        event: ToBehaviour,
295    },
296
297    /// The connection to a node has changed its address.
298    AddressChange {
299        id: ConnectionId,
300        peer_id: PeerId,
301        /// The new endpoint.
302        new_endpoint: ConnectedPoint,
303        /// The old endpoint.
304        old_endpoint: ConnectedPoint,
305    },
306}
307
308impl<THandler> Pool<THandler>
309where
310    THandler: ConnectionHandler,
311{
312    /// Creates a new empty `Pool`.
313    pub(crate) fn new(local_id: PeerId, config: PoolConfig) -> Self {
314        let (pending_connection_events_tx, pending_connection_events_rx) = mpsc::channel(0);
315        let executor = match config.executor {
316            Some(exec) => ExecSwitch::Executor(exec),
317            None => ExecSwitch::LocalSpawn(Default::default()),
318        };
319        Pool {
320            local_id,
321            counters: ConnectionCounters::new(),
322            established: Default::default(),
323            pending: Default::default(),
324            task_command_buffer_size: config.task_command_buffer_size,
325            dial_concurrency_factor: config.dial_concurrency_factor,
326            substream_upgrade_protocol_override: config.substream_upgrade_protocol_override,
327            max_negotiating_inbound_streams: config.max_negotiating_inbound_streams,
328            per_connection_event_buffer_size: config.per_connection_event_buffer_size,
329            idle_connection_timeout: config.idle_connection_timeout,
330            executor,
331            pending_connection_events_tx,
332            pending_connection_events_rx,
333            no_established_connections_waker: None,
334            established_connection_events: Default::default(),
335            new_connection_dropped_listeners: Default::default(),
336        }
337    }
338
339    /// Gets the dedicated connection counters.
340    pub(crate) fn counters(&self) -> &ConnectionCounters {
341        &self.counters
342    }
343
344    /// Gets an established connection from the pool by ID.
345    pub(crate) fn get_established(
346        &mut self,
347        id: ConnectionId,
348    ) -> Option<&mut EstablishedConnection<THandler::FromBehaviour>> {
349        self.established
350            .values_mut()
351            .find_map(|connections| connections.get_mut(&id))
352    }
353
354    /// Returns true if we are connected to the given peer.
355    ///
356    /// This will return true only after a `NodeReached` event has been produced by `poll()`.
357    pub(crate) fn is_connected(&self, id: PeerId) -> bool {
358        self.established.contains_key(&id)
359    }
360
361    /// Returns the number of connected peers, i.e. those with at least one
362    /// established connection in the pool.
363    pub(crate) fn num_peers(&self) -> usize {
364        self.established.len()
365    }
366
367    /// (Forcefully) close all connections to the given peer.
368    ///
369    /// All connections to the peer, whether pending or established are
370    /// closed asap and no more events from these connections are emitted
371    /// by the pool effective immediately.
372    pub(crate) fn disconnect(&mut self, peer: PeerId) {
373        if let Some(conns) = self.established.get_mut(&peer) {
374            for (_, conn) in conns.iter_mut() {
375                conn.start_close();
376            }
377        }
378
379        for connection in self
380            .pending
381            .iter_mut()
382            .filter_map(|(_, info)| info.is_for_same_remote_as(peer).then_some(info))
383        {
384            connection.abort()
385        }
386    }
387
388    /// Returns an iterator over all established connections of `peer`.
389    pub(crate) fn iter_established_connections_of_peer(
390        &mut self,
391        peer: &PeerId,
392    ) -> impl Iterator<Item = ConnectionId> + '_ {
393        match self.established.get(peer) {
394            Some(conns) => either::Either::Left(conns.iter().map(|(id, _)| *id)),
395            None => either::Either::Right(std::iter::empty()),
396        }
397    }
398
399    /// Checks whether we are currently dialing the given peer.
400    pub(crate) fn is_dialing(&self, peer: PeerId) -> bool {
401        self.pending.iter().any(|(_, info)| {
402            matches!(info.endpoint, PendingPoint::Dialer { .. }) && info.is_for_same_remote_as(peer)
403        })
404    }
405
406    /// Returns an iterator over all connected peers, i.e. those that have
407    /// at least one established connection in the pool.
408    pub(crate) fn iter_connected(&self) -> impl Iterator<Item = &PeerId> {
409        self.established.keys()
410    }
411
412    /// Adds a pending outgoing connection to the pool in the form of a `Future`
413    /// that establishes and negotiates the connection.
414    pub(crate) fn add_outgoing(
415        &mut self,
416        dials: Vec<
417            BoxFuture<
418                'static,
419                (
420                    Multiaddr,
421                    Result<(PeerId, StreamMuxerBox), TransportError<std::io::Error>>,
422                ),
423            >,
424        >,
425        peer: Option<PeerId>,
426        role_override: Endpoint,
427        port_use: PortUse,
428        dial_concurrency_factor_override: Option<NonZeroU8>,
429        connection_id: ConnectionId,
430    ) {
431        let concurrency_factor =
432            dial_concurrency_factor_override.unwrap_or(self.dial_concurrency_factor);
433        let span = tracing::debug_span!(parent: tracing::Span::none(), "new_outgoing_connection", %concurrency_factor, num_dials=%dials.len(), id = %connection_id);
434        span.follows_from(tracing::Span::current());
435
436        let (abort_notifier, abort_receiver) = oneshot::channel();
437
438        self.executor.spawn(
439            task::new_for_pending_outgoing_connection(
440                connection_id,
441                ConcurrentDial::new(dials, concurrency_factor),
442                abort_receiver,
443                self.pending_connection_events_tx.clone(),
444            )
445            .instrument(span),
446        );
447
448        let endpoint = PendingPoint::Dialer {
449            role_override,
450            port_use,
451        };
452
453        self.counters.inc_pending(&endpoint);
454        self.pending.insert(
455            connection_id,
456            PendingConnection {
457                peer_id: peer,
458                endpoint,
459                abort_notifier: Some(abort_notifier),
460                accepted_at: Instant::now(),
461            },
462        );
463    }
464
465    /// Adds a pending incoming connection to the pool in the form of a
466    /// `Future` that establishes and negotiates the connection.
467    pub(crate) fn add_incoming<TFut>(
468        &mut self,
469        future: TFut,
470        info: IncomingInfo<'_>,
471        connection_id: ConnectionId,
472    ) where
473        TFut: Future<Output = Result<(PeerId, StreamMuxerBox), std::io::Error>> + Send + 'static,
474    {
475        let endpoint = info.create_connected_point();
476
477        let (abort_notifier, abort_receiver) = oneshot::channel();
478
479        let span = tracing::debug_span!(parent: tracing::Span::none(), "new_incoming_connection", remote_addr = %info.send_back_addr, id = %connection_id);
480        span.follows_from(tracing::Span::current());
481
482        self.executor.spawn(
483            task::new_for_pending_incoming_connection(
484                connection_id,
485                future,
486                abort_receiver,
487                self.pending_connection_events_tx.clone(),
488            )
489            .instrument(span),
490        );
491
492        self.counters.inc_pending_incoming();
493        self.pending.insert(
494            connection_id,
495            PendingConnection {
496                peer_id: None,
497                endpoint: endpoint.into(),
498                abort_notifier: Some(abort_notifier),
499                accepted_at: Instant::now(),
500            },
501        );
502    }
503
504    pub(crate) fn spawn_connection(
505        &mut self,
506        id: ConnectionId,
507        obtained_peer_id: PeerId,
508        endpoint: &ConnectedPoint,
509        connection: NewConnection,
510        handler: THandler,
511    ) {
512        let connection = connection.extract();
513        let conns = self.established.entry(obtained_peer_id).or_default();
514        self.counters.inc_established(endpoint);
515
516        let (command_sender, command_receiver) = mpsc::channel(self.task_command_buffer_size);
517        let (event_sender, event_receiver) = mpsc::channel(self.per_connection_event_buffer_size);
518
519        conns.insert(
520            id,
521            EstablishedConnection {
522                endpoint: endpoint.clone(),
523                sender: command_sender,
524            },
525        );
526        self.established_connection_events.push(event_receiver);
527        if let Some(waker) = self.no_established_connections_waker.take() {
528            waker.wake();
529        }
530
531        let connection = Connection::new(
532            connection,
533            handler,
534            self.substream_upgrade_protocol_override,
535            self.max_negotiating_inbound_streams,
536            self.idle_connection_timeout,
537        );
538
539        let span = tracing::debug_span!(parent: tracing::Span::none(), "new_established_connection", remote_addr = %endpoint.get_remote_address(), %id, peer = %obtained_peer_id);
540        span.follows_from(tracing::Span::current());
541
542        self.executor.spawn(
543            task::new_for_established_connection(
544                id,
545                obtained_peer_id,
546                connection,
547                command_receiver,
548                event_sender,
549            )
550            .instrument(span),
551        )
552    }
553
554    /// Polls the connection pool for events.
555    #[tracing::instrument(level = "debug", name = "Pool::poll", skip(self, cx))]
556    pub(crate) fn poll(&mut self, cx: &mut Context<'_>) -> Poll<PoolEvent<THandler::ToBehaviour>>
557    where
558        THandler: ConnectionHandler + 'static,
559        <THandler as ConnectionHandler>::OutboundOpenInfo: Send,
560    {
561        // Poll for events of established connections.
562        //
563        // Note that established connections are polled before pending connections, thus
564        // prioritizing established connections over pending connections.
565        match self.established_connection_events.poll_next_unpin(cx) {
566            Poll::Pending => {}
567            Poll::Ready(None) => {
568                self.no_established_connections_waker = Some(cx.waker().clone());
569            }
570
571            Poll::Ready(Some(task::EstablishedConnectionEvent::Notify { id, peer_id, event })) => {
572                return Poll::Ready(PoolEvent::ConnectionEvent { peer_id, id, event });
573            }
574            Poll::Ready(Some(task::EstablishedConnectionEvent::AddressChange {
575                id,
576                peer_id,
577                new_address,
578            })) => {
579                let connection = self
580                    .established
581                    .get_mut(&peer_id)
582                    .expect("Receive `AddressChange` event for established peer.")
583                    .get_mut(&id)
584                    .expect("Receive `AddressChange` event from established connection");
585                let mut new_endpoint = connection.endpoint.clone();
586                new_endpoint.set_remote_address(new_address);
587                let old_endpoint =
588                    std::mem::replace(&mut connection.endpoint, new_endpoint.clone());
589
590                return Poll::Ready(PoolEvent::AddressChange {
591                    peer_id,
592                    id,
593                    new_endpoint,
594                    old_endpoint,
595                });
596            }
597            Poll::Ready(Some(task::EstablishedConnectionEvent::Closed { id, peer_id, error })) => {
598                let connections = self
599                    .established
600                    .get_mut(&peer_id)
601                    .expect("`Closed` event for established connection");
602                let EstablishedConnection { endpoint, .. } =
603                    connections.remove(&id).expect("Connection to be present");
604                self.counters.dec_established(&endpoint);
605                let remaining_established_connection_ids: Vec<ConnectionId> =
606                    connections.keys().cloned().collect();
607                if remaining_established_connection_ids.is_empty() {
608                    self.established.remove(&peer_id);
609                }
610                return Poll::Ready(PoolEvent::ConnectionClosed {
611                    id,
612                    connected: Connected { endpoint, peer_id },
613                    error,
614                    remaining_established_connection_ids,
615                });
616            }
617        }
618
619        // Poll for events of pending connections.
620        loop {
621            if let Poll::Ready(Some(result)) =
622                self.new_connection_dropped_listeners.poll_next_unpin(cx)
623            {
624                if let Ok(dropped_connection) = result {
625                    self.executor.spawn(async move {
626                        let _ = dropped_connection.close().await;
627                    });
628                }
629                continue;
630            }
631
632            let event = match self.pending_connection_events_rx.poll_next_unpin(cx) {
633                Poll::Ready(Some(event)) => event,
634                Poll::Pending => break,
635                Poll::Ready(None) => unreachable!("Pool holds both sender and receiver."),
636            };
637
638            match event {
639                task::PendingConnectionEvent::ConnectionEstablished {
640                    id,
641                    output: (obtained_peer_id, mut muxer),
642                    outgoing,
643                } => {
644                    let PendingConnection {
645                        peer_id: expected_peer_id,
646                        endpoint,
647                        abort_notifier: _,
648                        accepted_at,
649                    } = self
650                        .pending
651                        .remove(&id)
652                        .expect("Entry in `self.pending` for previously pending connection.");
653
654                    self.counters.dec_pending(&endpoint);
655
656                    let (endpoint, concurrent_dial_errors) = match (endpoint, outgoing) {
657                        (
658                            PendingPoint::Dialer {
659                                role_override,
660                                port_use,
661                            },
662                            Some((address, errors)),
663                        ) => (
664                            ConnectedPoint::Dialer {
665                                address,
666                                role_override,
667                                port_use,
668                            },
669                            Some(errors),
670                        ),
671                        (
672                            PendingPoint::Listener {
673                                local_addr,
674                                send_back_addr,
675                            },
676                            None,
677                        ) => (
678                            ConnectedPoint::Listener {
679                                local_addr,
680                                send_back_addr,
681                            },
682                            None,
683                        ),
684                        (PendingPoint::Dialer { .. }, None) => unreachable!(
685                            "Established incoming connection via pending outgoing connection."
686                        ),
687                        (PendingPoint::Listener { .. }, Some(_)) => unreachable!(
688                            "Established outgoing connection via pending incoming connection."
689                        ),
690                    };
691
692                    let check_peer_id = || {
693                        if let Some(peer) = expected_peer_id {
694                            if peer != obtained_peer_id {
695                                return match &endpoint {
696                                    ConnectedPoint::Dialer { address, .. } => {
697                                        Err(PoolEvent::PendingOutboundConnectionError {
698                                            id,
699                                            error: PendingOutboundConnectionError::WrongPeerId {
700                                                obtained: obtained_peer_id,
701                                                address: address.clone(),
702                                            },
703                                            peer: Some(peer),
704                                        })
705                                    }
706                                    ConnectedPoint::Listener {.. } => unreachable!("There shouldn't be an expected PeerId on inbound connections."),
707                                };
708                            }
709                        }
710
711                        if self.local_id == obtained_peer_id {
712                            return match &endpoint {
713                                ConnectedPoint::Dialer { address, .. } => {
714                                    Err(PoolEvent::PendingOutboundConnectionError {
715                                        id,
716                                        error: PendingOutboundConnectionError::LocalPeerId {
717                                            address: address.clone(),
718                                        },
719                                        peer: Some(obtained_peer_id),
720                                    })
721                                }
722                                ConnectedPoint::Listener {
723                                    send_back_addr,
724                                    local_addr,
725                                } => Err(PoolEvent::PendingInboundConnectionError {
726                                    id,
727                                    send_back_addr: send_back_addr.clone(),
728                                    local_addr: local_addr.clone(),
729                                    error: PendingInboundConnectionError::LocalPeerId {
730                                        address: send_back_addr.clone(),
731                                    },
732                                }),
733                            };
734                        }
735
736                        Ok(())
737                    };
738
739                    if let Err(error) = check_peer_id() {
740                        self.executor.spawn(poll_fn(move |cx| {
741                            if let Err(e) = ready!(muxer.poll_close_unpin(cx)) {
742                                tracing::debug!(
743                                    peer=%obtained_peer_id,
744                                    connection=%id,
745                                    "Failed to close connection to peer: {:?}",
746                                    e
747                                );
748                            }
749                            Poll::Ready(())
750                        }));
751
752                        return Poll::Ready(error);
753                    }
754
755                    let established_in = accepted_at.elapsed();
756
757                    let (connection, drop_listener) = NewConnection::new(muxer);
758                    self.new_connection_dropped_listeners.push(drop_listener);
759
760                    return Poll::Ready(PoolEvent::ConnectionEstablished {
761                        peer_id: obtained_peer_id,
762                        endpoint,
763                        id,
764                        connection,
765                        concurrent_dial_errors,
766                        established_in,
767                    });
768                }
769                task::PendingConnectionEvent::PendingFailed { id, error } => {
770                    if let Some(PendingConnection {
771                        peer_id,
772                        endpoint,
773                        abort_notifier: _,
774                        accepted_at: _, // Ignoring the time it took for the connection to fail.
775                    }) = self.pending.remove(&id)
776                    {
777                        self.counters.dec_pending(&endpoint);
778
779                        match (endpoint, error) {
780                            (PendingPoint::Dialer { .. }, Either::Left(error)) => {
781                                return Poll::Ready(PoolEvent::PendingOutboundConnectionError {
782                                    id,
783                                    error,
784                                    peer: peer_id,
785                                });
786                            }
787                            (
788                                PendingPoint::Listener {
789                                    send_back_addr,
790                                    local_addr,
791                                },
792                                Either::Right(error),
793                            ) => {
794                                return Poll::Ready(PoolEvent::PendingInboundConnectionError {
795                                    id,
796                                    error,
797                                    send_back_addr,
798                                    local_addr,
799                                });
800                            }
801                            (PendingPoint::Dialer { .. }, Either::Right(_)) => {
802                                unreachable!("Inbound error for outbound connection.")
803                            }
804                            (PendingPoint::Listener { .. }, Either::Left(_)) => {
805                                unreachable!("Outbound error for inbound connection.")
806                            }
807                        }
808                    }
809                }
810            }
811        }
812
813        self.executor.advance_local(cx);
814
815        Poll::Pending
816    }
817}
818
819/// Opaque type for a new connection.
820///
821/// This connection has just been established but isn't part of the [`Pool`] yet.
822/// It either needs to be spawned via [`Pool::spawn_connection`] or dropped if undesired.
823///
824/// On drop, this type send the connection back to the [`Pool`] where it will be gracefully closed.
825#[derive(Debug)]
826pub(crate) struct NewConnection {
827    connection: Option<StreamMuxerBox>,
828    drop_sender: Option<oneshot::Sender<StreamMuxerBox>>,
829}
830
831impl NewConnection {
832    fn new(conn: StreamMuxerBox) -> (Self, oneshot::Receiver<StreamMuxerBox>) {
833        let (sender, receiver) = oneshot::channel();
834
835        (
836            Self {
837                connection: Some(conn),
838                drop_sender: Some(sender),
839            },
840            receiver,
841        )
842    }
843
844    fn extract(mut self) -> StreamMuxerBox {
845        self.connection.take().unwrap()
846    }
847}
848
849impl Drop for NewConnection {
850    fn drop(&mut self) {
851        if let Some(connection) = self.connection.take() {
852            let _ = self
853                .drop_sender
854                .take()
855                .expect("`drop_sender` to always be `Some`")
856                .send(connection);
857        }
858    }
859}
860
861/// Network connection information.
862#[derive(Debug, Clone)]
863pub struct ConnectionCounters {
864    /// The current number of incoming connections.
865    pending_incoming: u32,
866    /// The current number of outgoing connections.
867    pending_outgoing: u32,
868    /// The current number of established inbound connections.
869    established_incoming: u32,
870    /// The current number of established outbound connections.
871    established_outgoing: u32,
872}
873
874impl ConnectionCounters {
875    fn new() -> Self {
876        Self {
877            pending_incoming: 0,
878            pending_outgoing: 0,
879            established_incoming: 0,
880            established_outgoing: 0,
881        }
882    }
883
884    /// The total number of connections, both pending and established.
885    pub fn num_connections(&self) -> u32 {
886        self.num_pending() + self.num_established()
887    }
888
889    /// The total number of pending connections, both incoming and outgoing.
890    pub fn num_pending(&self) -> u32 {
891        self.pending_incoming + self.pending_outgoing
892    }
893
894    /// The number of incoming connections being established.
895    pub fn num_pending_incoming(&self) -> u32 {
896        self.pending_incoming
897    }
898
899    /// The number of outgoing connections being established.
900    pub fn num_pending_outgoing(&self) -> u32 {
901        self.pending_outgoing
902    }
903
904    /// The number of established incoming connections.
905    pub fn num_established_incoming(&self) -> u32 {
906        self.established_incoming
907    }
908
909    /// The number of established outgoing connections.
910    pub fn num_established_outgoing(&self) -> u32 {
911        self.established_outgoing
912    }
913
914    /// The total number of established connections.
915    pub fn num_established(&self) -> u32 {
916        self.established_outgoing + self.established_incoming
917    }
918
919    fn inc_pending(&mut self, endpoint: &PendingPoint) {
920        match endpoint {
921            PendingPoint::Dialer { .. } => {
922                self.pending_outgoing += 1;
923            }
924            PendingPoint::Listener { .. } => {
925                self.pending_incoming += 1;
926            }
927        }
928    }
929
930    fn inc_pending_incoming(&mut self) {
931        self.pending_incoming += 1;
932    }
933
934    fn dec_pending(&mut self, endpoint: &PendingPoint) {
935        match endpoint {
936            PendingPoint::Dialer { .. } => {
937                self.pending_outgoing -= 1;
938            }
939            PendingPoint::Listener { .. } => {
940                self.pending_incoming -= 1;
941            }
942        }
943    }
944
945    fn inc_established(&mut self, endpoint: &ConnectedPoint) {
946        match endpoint {
947            ConnectedPoint::Dialer { .. } => {
948                self.established_outgoing += 1;
949            }
950            ConnectedPoint::Listener { .. } => {
951                self.established_incoming += 1;
952            }
953        }
954    }
955
956    fn dec_established(&mut self, endpoint: &ConnectedPoint) {
957        match endpoint {
958            ConnectedPoint::Dialer { .. } => {
959                self.established_outgoing -= 1;
960            }
961            ConnectedPoint::Listener { .. } => {
962                self.established_incoming -= 1;
963            }
964        }
965    }
966}
967
968/// Configuration options when creating a [`Pool`].
969///
970/// The default configuration specifies no dedicated task executor, a
971/// task event buffer size of 32, and a task command buffer size of 7.
972pub(crate) struct PoolConfig {
973    /// Executor to use to spawn tasks.
974    pub(crate) executor: Option<Box<dyn Executor + Send>>,
975    /// Size of the task command buffer (per task).
976    pub(crate) task_command_buffer_size: usize,
977    /// Size of the pending connection task event buffer and the established connection task event
978    /// buffer.
979    pub(crate) per_connection_event_buffer_size: usize,
980    /// Number of addresses concurrently dialed for a single outbound connection attempt.
981    pub(crate) dial_concurrency_factor: NonZeroU8,
982    /// How long a connection should be kept alive once it is idling.
983    pub(crate) idle_connection_timeout: Duration,
984    /// The configured override for substream protocol upgrades, if any.
985    substream_upgrade_protocol_override: Option<libp2p_core::upgrade::Version>,
986
987    /// The maximum number of inbound streams concurrently negotiating on a connection.
988    ///
989    /// See [`Connection::max_negotiating_inbound_streams`].
990    max_negotiating_inbound_streams: usize,
991}
992
993impl PoolConfig {
994    pub(crate) fn new(executor: Option<Box<dyn Executor + Send>>) -> Self {
995        Self {
996            executor,
997            task_command_buffer_size: 32,
998            per_connection_event_buffer_size: 7,
999            dial_concurrency_factor: NonZeroU8::new(8).expect("8 > 0"),
1000            idle_connection_timeout: Duration::from_secs(10),
1001            substream_upgrade_protocol_override: None,
1002            max_negotiating_inbound_streams: 128,
1003        }
1004    }
1005
1006    /// Sets the maximum number of events sent to a connection's background task
1007    /// that may be buffered, if the task cannot keep up with their consumption and
1008    /// delivery to the connection handler.
1009    ///
1010    /// When the buffer for a particular connection is full, `notify_handler` will no
1011    /// longer be able to deliver events to the associated [`Connection`],
1012    /// thus exerting back-pressure on the connection and peer API.
1013    pub(crate) fn with_notify_handler_buffer_size(mut self, n: NonZeroUsize) -> Self {
1014        self.task_command_buffer_size = n.get() - 1;
1015        self
1016    }
1017
1018    /// Sets the maximum number of buffered connection events (beyond a guaranteed
1019    /// buffer of 1 event per connection).
1020    ///
1021    /// When the buffer is full, the background tasks of all connections will stall.
1022    /// In this way, the consumers of network events exert back-pressure on
1023    /// the network connection I/O.
1024    pub(crate) fn with_per_connection_event_buffer_size(mut self, n: usize) -> Self {
1025        self.per_connection_event_buffer_size = n;
1026        self
1027    }
1028
1029    /// Number of addresses concurrently dialed for a single outbound connection attempt.
1030    pub(crate) fn with_dial_concurrency_factor(mut self, factor: NonZeroU8) -> Self {
1031        self.dial_concurrency_factor = factor;
1032        self
1033    }
1034
1035    /// Configures an override for the substream upgrade protocol to use.
1036    pub(crate) fn with_substream_upgrade_protocol_override(
1037        mut self,
1038        v: libp2p_core::upgrade::Version,
1039    ) -> Self {
1040        self.substream_upgrade_protocol_override = Some(v);
1041        self
1042    }
1043
1044    /// The maximum number of inbound streams concurrently negotiating on a connection.
1045    ///
1046    /// See [`Connection::max_negotiating_inbound_streams`].
1047    pub(crate) fn with_max_negotiating_inbound_streams(mut self, v: usize) -> Self {
1048        self.max_negotiating_inbound_streams = v;
1049        self
1050    }
1051}