1use std::{
22 collections::HashMap,
23 convert::Infallible,
24 fmt,
25 num::{NonZeroU8, NonZeroUsize},
26 pin::Pin,
27 task::{Context, Poll, Waker},
28};
29
30use concurrent_dial::ConcurrentDial;
31use fnv::FnvHashMap;
32use futures::{
33 channel::{mpsc, oneshot},
34 future::{poll_fn, BoxFuture, Either},
35 prelude::*,
36 ready,
37 stream::{FuturesUnordered, SelectAll},
38};
39use libp2p_core::{
40 connection::Endpoint,
41 muxing::{StreamMuxerBox, StreamMuxerExt},
42 transport::PortUse,
43};
44use tracing::Instrument;
45use web_time::{Duration, Instant};
46
47use crate::{
48 connection::{
49 Connected, Connection, ConnectionError, ConnectionId, IncomingInfo,
50 PendingInboundConnectionError, PendingOutboundConnectionError, PendingPoint,
51 },
52 transport::TransportError,
53 ConnectedPoint, ConnectionHandler, Executor, Multiaddr, PeerId,
54};
55
56mod concurrent_dial;
57mod task;
58
59enum ExecSwitch {
60 Executor(Box<dyn Executor + Send>),
61 LocalSpawn(FuturesUnordered<Pin<Box<dyn Future<Output = ()> + Send>>>),
62}
63
64impl ExecSwitch {
65 fn advance_local(&mut self, cx: &mut Context) {
66 match self {
67 ExecSwitch::Executor(_) => {}
68 ExecSwitch::LocalSpawn(local) => {
69 while let Poll::Ready(Some(())) = local.poll_next_unpin(cx) {}
70 }
71 }
72 }
73
74 #[track_caller]
75 fn spawn(&mut self, task: impl Future<Output = ()> + Send + 'static) {
76 let task = task.boxed();
77
78 match self {
79 Self::Executor(executor) => executor.exec(task),
80 Self::LocalSpawn(local) => local.push(task),
81 }
82 }
83}
84
85pub(crate) struct Pool<THandler>
87where
88 THandler: ConnectionHandler,
89{
90 local_id: PeerId,
91
92 counters: ConnectionCounters,
94
95 established: FnvHashMap<
97 PeerId,
98 FnvHashMap<ConnectionId, EstablishedConnection<THandler::FromBehaviour>>,
99 >,
100
101 pending: HashMap<ConnectionId, PendingConnection>,
103
104 task_command_buffer_size: usize,
106
107 dial_concurrency_factor: NonZeroU8,
109
110 substream_upgrade_protocol_override: Option<libp2p_core::upgrade::Version>,
112
113 max_negotiating_inbound_streams: usize,
117
118 per_connection_event_buffer_size: usize,
121
122 executor: ExecSwitch,
125
126 pending_connection_events_tx: mpsc::Sender<task::PendingConnectionEvent>,
129
130 pending_connection_events_rx: mpsc::Receiver<task::PendingConnectionEvent>,
132
133 no_established_connections_waker: Option<Waker>,
135
136 established_connection_events:
138 SelectAll<mpsc::Receiver<task::EstablishedConnectionEvent<THandler::ToBehaviour>>>,
139
140 new_connection_dropped_listeners: FuturesUnordered<oneshot::Receiver<StreamMuxerBox>>,
142
143 idle_connection_timeout: Duration,
145}
146
147#[derive(Debug)]
148pub(crate) struct EstablishedConnection<TInEvent> {
149 endpoint: ConnectedPoint,
150 sender: mpsc::Sender<task::Command<TInEvent>>,
152}
153
154impl<TInEvent> EstablishedConnection<TInEvent> {
155 pub(crate) fn notify_handler(&mut self, event: TInEvent) -> Result<(), TInEvent> {
166 let cmd = task::Command::NotifyHandler(event);
167 self.sender.try_send(cmd).map_err(|e| match e.into_inner() {
168 task::Command::NotifyHandler(event) => event,
169 _ => unreachable!("Expect failed send to return initial event."),
170 })
171 }
172
173 pub(crate) fn poll_ready_notify_handler(
180 &mut self,
181 cx: &mut Context<'_>,
182 ) -> Poll<Result<(), ()>> {
183 self.sender.poll_ready(cx).map_err(|_| ())
184 }
185
186 pub(crate) fn start_close(&mut self) {
190 match self.sender.clone().try_send(task::Command::Close) {
193 Ok(()) => {}
194 Err(e) => assert!(e.is_disconnected(), "No capacity for close command."),
195 };
196 }
197}
198
199struct PendingConnection {
200 peer_id: Option<PeerId>,
202 endpoint: PendingPoint,
203 abort_notifier: Option<oneshot::Sender<Infallible>>,
205 accepted_at: Instant,
207}
208
209impl PendingConnection {
210 fn is_for_same_remote_as(&self, other: PeerId) -> bool {
211 self.peer_id == Some(other)
212 }
213
214 fn abort(&mut self) {
216 if let Some(notifier) = self.abort_notifier.take() {
217 drop(notifier);
218 }
219 }
220}
221
222impl<THandler: ConnectionHandler> fmt::Debug for Pool<THandler> {
223 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
224 f.debug_struct("Pool")
225 .field("counters", &self.counters)
226 .finish()
227 }
228}
229
230#[derive(Debug)]
232pub(crate) enum PoolEvent<ToBehaviour> {
233 ConnectionEstablished {
235 id: ConnectionId,
236 peer_id: PeerId,
237 endpoint: ConnectedPoint,
238 connection: NewConnection,
239 concurrent_dial_errors: Option<Vec<(Multiaddr, TransportError<std::io::Error>)>>,
243 established_in: std::time::Duration,
245 },
246
247 ConnectionClosed {
257 id: ConnectionId,
258 connected: Connected,
260 error: Option<ConnectionError>,
263 remaining_established_connection_ids: Vec<ConnectionId>,
265 },
266
267 PendingOutboundConnectionError {
269 id: ConnectionId,
271 error: PendingOutboundConnectionError,
273 peer: Option<PeerId>,
275 },
276
277 PendingInboundConnectionError {
279 id: ConnectionId,
281 send_back_addr: Multiaddr,
283 local_addr: Multiaddr,
285 error: PendingInboundConnectionError,
287 },
288
289 ConnectionEvent {
291 id: ConnectionId,
292 peer_id: PeerId,
293 event: ToBehaviour,
295 },
296
297 AddressChange {
299 id: ConnectionId,
300 peer_id: PeerId,
301 new_endpoint: ConnectedPoint,
303 old_endpoint: ConnectedPoint,
305 },
306}
307
308impl<THandler> Pool<THandler>
309where
310 THandler: ConnectionHandler,
311{
312 pub(crate) fn new(local_id: PeerId, config: PoolConfig) -> Self {
314 let (pending_connection_events_tx, pending_connection_events_rx) = mpsc::channel(0);
315 let executor = match config.executor {
316 Some(exec) => ExecSwitch::Executor(exec),
317 None => ExecSwitch::LocalSpawn(Default::default()),
318 };
319 Pool {
320 local_id,
321 counters: ConnectionCounters::new(),
322 established: Default::default(),
323 pending: Default::default(),
324 task_command_buffer_size: config.task_command_buffer_size,
325 dial_concurrency_factor: config.dial_concurrency_factor,
326 substream_upgrade_protocol_override: config.substream_upgrade_protocol_override,
327 max_negotiating_inbound_streams: config.max_negotiating_inbound_streams,
328 per_connection_event_buffer_size: config.per_connection_event_buffer_size,
329 idle_connection_timeout: config.idle_connection_timeout,
330 executor,
331 pending_connection_events_tx,
332 pending_connection_events_rx,
333 no_established_connections_waker: None,
334 established_connection_events: Default::default(),
335 new_connection_dropped_listeners: Default::default(),
336 }
337 }
338
339 pub(crate) fn counters(&self) -> &ConnectionCounters {
341 &self.counters
342 }
343
344 pub(crate) fn get_established(
346 &mut self,
347 id: ConnectionId,
348 ) -> Option<&mut EstablishedConnection<THandler::FromBehaviour>> {
349 self.established
350 .values_mut()
351 .find_map(|connections| connections.get_mut(&id))
352 }
353
354 pub(crate) fn is_connected(&self, id: PeerId) -> bool {
358 self.established.contains_key(&id)
359 }
360
361 pub(crate) fn num_peers(&self) -> usize {
364 self.established.len()
365 }
366
367 pub(crate) fn disconnect(&mut self, peer: PeerId) {
373 if let Some(conns) = self.established.get_mut(&peer) {
374 for (_, conn) in conns.iter_mut() {
375 conn.start_close();
376 }
377 }
378
379 for connection in self
380 .pending
381 .iter_mut()
382 .filter_map(|(_, info)| info.is_for_same_remote_as(peer).then_some(info))
383 {
384 connection.abort()
385 }
386 }
387
388 pub(crate) fn iter_established_connections_of_peer(
390 &mut self,
391 peer: &PeerId,
392 ) -> impl Iterator<Item = ConnectionId> + '_ {
393 match self.established.get(peer) {
394 Some(conns) => either::Either::Left(conns.iter().map(|(id, _)| *id)),
395 None => either::Either::Right(std::iter::empty()),
396 }
397 }
398
399 pub(crate) fn is_dialing(&self, peer: PeerId) -> bool {
401 self.pending.iter().any(|(_, info)| {
402 matches!(info.endpoint, PendingPoint::Dialer { .. }) && info.is_for_same_remote_as(peer)
403 })
404 }
405
406 pub(crate) fn iter_connected(&self) -> impl Iterator<Item = &PeerId> {
409 self.established.keys()
410 }
411
412 pub(crate) fn add_outgoing(
415 &mut self,
416 dials: Vec<
417 BoxFuture<
418 'static,
419 (
420 Multiaddr,
421 Result<(PeerId, StreamMuxerBox), TransportError<std::io::Error>>,
422 ),
423 >,
424 >,
425 peer: Option<PeerId>,
426 role_override: Endpoint,
427 port_use: PortUse,
428 dial_concurrency_factor_override: Option<NonZeroU8>,
429 connection_id: ConnectionId,
430 ) {
431 let concurrency_factor =
432 dial_concurrency_factor_override.unwrap_or(self.dial_concurrency_factor);
433 let span = tracing::debug_span!(parent: tracing::Span::none(), "new_outgoing_connection", %concurrency_factor, num_dials=%dials.len(), id = %connection_id);
434 span.follows_from(tracing::Span::current());
435
436 let (abort_notifier, abort_receiver) = oneshot::channel();
437
438 self.executor.spawn(
439 task::new_for_pending_outgoing_connection(
440 connection_id,
441 ConcurrentDial::new(dials, concurrency_factor),
442 abort_receiver,
443 self.pending_connection_events_tx.clone(),
444 )
445 .instrument(span),
446 );
447
448 let endpoint = PendingPoint::Dialer {
449 role_override,
450 port_use,
451 };
452
453 self.counters.inc_pending(&endpoint);
454 self.pending.insert(
455 connection_id,
456 PendingConnection {
457 peer_id: peer,
458 endpoint,
459 abort_notifier: Some(abort_notifier),
460 accepted_at: Instant::now(),
461 },
462 );
463 }
464
465 pub(crate) fn add_incoming<TFut>(
468 &mut self,
469 future: TFut,
470 info: IncomingInfo<'_>,
471 connection_id: ConnectionId,
472 ) where
473 TFut: Future<Output = Result<(PeerId, StreamMuxerBox), std::io::Error>> + Send + 'static,
474 {
475 let endpoint = info.create_connected_point();
476
477 let (abort_notifier, abort_receiver) = oneshot::channel();
478
479 let span = tracing::debug_span!(parent: tracing::Span::none(), "new_incoming_connection", remote_addr = %info.send_back_addr, id = %connection_id);
480 span.follows_from(tracing::Span::current());
481
482 self.executor.spawn(
483 task::new_for_pending_incoming_connection(
484 connection_id,
485 future,
486 abort_receiver,
487 self.pending_connection_events_tx.clone(),
488 )
489 .instrument(span),
490 );
491
492 self.counters.inc_pending_incoming();
493 self.pending.insert(
494 connection_id,
495 PendingConnection {
496 peer_id: None,
497 endpoint: endpoint.into(),
498 abort_notifier: Some(abort_notifier),
499 accepted_at: Instant::now(),
500 },
501 );
502 }
503
504 pub(crate) fn spawn_connection(
505 &mut self,
506 id: ConnectionId,
507 obtained_peer_id: PeerId,
508 endpoint: &ConnectedPoint,
509 connection: NewConnection,
510 handler: THandler,
511 ) {
512 let connection = connection.extract();
513 let conns = self.established.entry(obtained_peer_id).or_default();
514 self.counters.inc_established(endpoint);
515
516 let (command_sender, command_receiver) = mpsc::channel(self.task_command_buffer_size);
517 let (event_sender, event_receiver) = mpsc::channel(self.per_connection_event_buffer_size);
518
519 conns.insert(
520 id,
521 EstablishedConnection {
522 endpoint: endpoint.clone(),
523 sender: command_sender,
524 },
525 );
526 self.established_connection_events.push(event_receiver);
527 if let Some(waker) = self.no_established_connections_waker.take() {
528 waker.wake();
529 }
530
531 let connection = Connection::new(
532 connection,
533 handler,
534 self.substream_upgrade_protocol_override,
535 self.max_negotiating_inbound_streams,
536 self.idle_connection_timeout,
537 );
538
539 let span = tracing::debug_span!(parent: tracing::Span::none(), "new_established_connection", remote_addr = %endpoint.get_remote_address(), %id, peer = %obtained_peer_id);
540 span.follows_from(tracing::Span::current());
541
542 self.executor.spawn(
543 task::new_for_established_connection(
544 id,
545 obtained_peer_id,
546 connection,
547 command_receiver,
548 event_sender,
549 )
550 .instrument(span),
551 )
552 }
553
554 #[tracing::instrument(level = "debug", name = "Pool::poll", skip(self, cx))]
556 pub(crate) fn poll(&mut self, cx: &mut Context<'_>) -> Poll<PoolEvent<THandler::ToBehaviour>>
557 where
558 THandler: ConnectionHandler + 'static,
559 <THandler as ConnectionHandler>::OutboundOpenInfo: Send,
560 {
561 match self.established_connection_events.poll_next_unpin(cx) {
566 Poll::Pending => {}
567 Poll::Ready(None) => {
568 self.no_established_connections_waker = Some(cx.waker().clone());
569 }
570
571 Poll::Ready(Some(task::EstablishedConnectionEvent::Notify { id, peer_id, event })) => {
572 return Poll::Ready(PoolEvent::ConnectionEvent { peer_id, id, event });
573 }
574 Poll::Ready(Some(task::EstablishedConnectionEvent::AddressChange {
575 id,
576 peer_id,
577 new_address,
578 })) => {
579 let connection = self
580 .established
581 .get_mut(&peer_id)
582 .expect("Receive `AddressChange` event for established peer.")
583 .get_mut(&id)
584 .expect("Receive `AddressChange` event from established connection");
585 let mut new_endpoint = connection.endpoint.clone();
586 new_endpoint.set_remote_address(new_address);
587 let old_endpoint =
588 std::mem::replace(&mut connection.endpoint, new_endpoint.clone());
589
590 return Poll::Ready(PoolEvent::AddressChange {
591 peer_id,
592 id,
593 new_endpoint,
594 old_endpoint,
595 });
596 }
597 Poll::Ready(Some(task::EstablishedConnectionEvent::Closed { id, peer_id, error })) => {
598 let connections = self
599 .established
600 .get_mut(&peer_id)
601 .expect("`Closed` event for established connection");
602 let EstablishedConnection { endpoint, .. } =
603 connections.remove(&id).expect("Connection to be present");
604 self.counters.dec_established(&endpoint);
605 let remaining_established_connection_ids: Vec<ConnectionId> =
606 connections.keys().cloned().collect();
607 if remaining_established_connection_ids.is_empty() {
608 self.established.remove(&peer_id);
609 }
610 return Poll::Ready(PoolEvent::ConnectionClosed {
611 id,
612 connected: Connected { endpoint, peer_id },
613 error,
614 remaining_established_connection_ids,
615 });
616 }
617 }
618
619 loop {
621 if let Poll::Ready(Some(result)) =
622 self.new_connection_dropped_listeners.poll_next_unpin(cx)
623 {
624 if let Ok(dropped_connection) = result {
625 self.executor.spawn(async move {
626 let _ = dropped_connection.close().await;
627 });
628 }
629 continue;
630 }
631
632 let event = match self.pending_connection_events_rx.poll_next_unpin(cx) {
633 Poll::Ready(Some(event)) => event,
634 Poll::Pending => break,
635 Poll::Ready(None) => unreachable!("Pool holds both sender and receiver."),
636 };
637
638 match event {
639 task::PendingConnectionEvent::ConnectionEstablished {
640 id,
641 output: (obtained_peer_id, mut muxer),
642 outgoing,
643 } => {
644 let PendingConnection {
645 peer_id: expected_peer_id,
646 endpoint,
647 abort_notifier: _,
648 accepted_at,
649 } = self
650 .pending
651 .remove(&id)
652 .expect("Entry in `self.pending` for previously pending connection.");
653
654 self.counters.dec_pending(&endpoint);
655
656 let (endpoint, concurrent_dial_errors) = match (endpoint, outgoing) {
657 (
658 PendingPoint::Dialer {
659 role_override,
660 port_use,
661 },
662 Some((address, errors)),
663 ) => (
664 ConnectedPoint::Dialer {
665 address,
666 role_override,
667 port_use,
668 },
669 Some(errors),
670 ),
671 (
672 PendingPoint::Listener {
673 local_addr,
674 send_back_addr,
675 },
676 None,
677 ) => (
678 ConnectedPoint::Listener {
679 local_addr,
680 send_back_addr,
681 },
682 None,
683 ),
684 (PendingPoint::Dialer { .. }, None) => unreachable!(
685 "Established incoming connection via pending outgoing connection."
686 ),
687 (PendingPoint::Listener { .. }, Some(_)) => unreachable!(
688 "Established outgoing connection via pending incoming connection."
689 ),
690 };
691
692 let check_peer_id = || {
693 if let Some(peer) = expected_peer_id {
694 if peer != obtained_peer_id {
695 return match &endpoint {
696 ConnectedPoint::Dialer { address, .. } => {
697 Err(PoolEvent::PendingOutboundConnectionError {
698 id,
699 error: PendingOutboundConnectionError::WrongPeerId {
700 obtained: obtained_peer_id,
701 address: address.clone(),
702 },
703 peer: Some(peer),
704 })
705 }
706 ConnectedPoint::Listener {.. } => unreachable!("There shouldn't be an expected PeerId on inbound connections."),
707 };
708 }
709 }
710
711 if self.local_id == obtained_peer_id {
712 return match &endpoint {
713 ConnectedPoint::Dialer { address, .. } => {
714 Err(PoolEvent::PendingOutboundConnectionError {
715 id,
716 error: PendingOutboundConnectionError::LocalPeerId {
717 address: address.clone(),
718 },
719 peer: Some(obtained_peer_id),
720 })
721 }
722 ConnectedPoint::Listener {
723 send_back_addr,
724 local_addr,
725 } => Err(PoolEvent::PendingInboundConnectionError {
726 id,
727 send_back_addr: send_back_addr.clone(),
728 local_addr: local_addr.clone(),
729 error: PendingInboundConnectionError::LocalPeerId {
730 address: send_back_addr.clone(),
731 },
732 }),
733 };
734 }
735
736 Ok(())
737 };
738
739 if let Err(error) = check_peer_id() {
740 self.executor.spawn(poll_fn(move |cx| {
741 if let Err(e) = ready!(muxer.poll_close_unpin(cx)) {
742 tracing::debug!(
743 peer=%obtained_peer_id,
744 connection=%id,
745 "Failed to close connection to peer: {:?}",
746 e
747 );
748 }
749 Poll::Ready(())
750 }));
751
752 return Poll::Ready(error);
753 }
754
755 let established_in = accepted_at.elapsed();
756
757 let (connection, drop_listener) = NewConnection::new(muxer);
758 self.new_connection_dropped_listeners.push(drop_listener);
759
760 return Poll::Ready(PoolEvent::ConnectionEstablished {
761 peer_id: obtained_peer_id,
762 endpoint,
763 id,
764 connection,
765 concurrent_dial_errors,
766 established_in,
767 });
768 }
769 task::PendingConnectionEvent::PendingFailed { id, error } => {
770 if let Some(PendingConnection {
771 peer_id,
772 endpoint,
773 abort_notifier: _,
774 accepted_at: _, }) = self.pending.remove(&id)
776 {
777 self.counters.dec_pending(&endpoint);
778
779 match (endpoint, error) {
780 (PendingPoint::Dialer { .. }, Either::Left(error)) => {
781 return Poll::Ready(PoolEvent::PendingOutboundConnectionError {
782 id,
783 error,
784 peer: peer_id,
785 });
786 }
787 (
788 PendingPoint::Listener {
789 send_back_addr,
790 local_addr,
791 },
792 Either::Right(error),
793 ) => {
794 return Poll::Ready(PoolEvent::PendingInboundConnectionError {
795 id,
796 error,
797 send_back_addr,
798 local_addr,
799 });
800 }
801 (PendingPoint::Dialer { .. }, Either::Right(_)) => {
802 unreachable!("Inbound error for outbound connection.")
803 }
804 (PendingPoint::Listener { .. }, Either::Left(_)) => {
805 unreachable!("Outbound error for inbound connection.")
806 }
807 }
808 }
809 }
810 }
811 }
812
813 self.executor.advance_local(cx);
814
815 Poll::Pending
816 }
817}
818
819#[derive(Debug)]
826pub(crate) struct NewConnection {
827 connection: Option<StreamMuxerBox>,
828 drop_sender: Option<oneshot::Sender<StreamMuxerBox>>,
829}
830
831impl NewConnection {
832 fn new(conn: StreamMuxerBox) -> (Self, oneshot::Receiver<StreamMuxerBox>) {
833 let (sender, receiver) = oneshot::channel();
834
835 (
836 Self {
837 connection: Some(conn),
838 drop_sender: Some(sender),
839 },
840 receiver,
841 )
842 }
843
844 fn extract(mut self) -> StreamMuxerBox {
845 self.connection.take().unwrap()
846 }
847}
848
849impl Drop for NewConnection {
850 fn drop(&mut self) {
851 if let Some(connection) = self.connection.take() {
852 let _ = self
853 .drop_sender
854 .take()
855 .expect("`drop_sender` to always be `Some`")
856 .send(connection);
857 }
858 }
859}
860
861#[derive(Debug, Clone)]
863pub struct ConnectionCounters {
864 pending_incoming: u32,
866 pending_outgoing: u32,
868 established_incoming: u32,
870 established_outgoing: u32,
872}
873
874impl ConnectionCounters {
875 fn new() -> Self {
876 Self {
877 pending_incoming: 0,
878 pending_outgoing: 0,
879 established_incoming: 0,
880 established_outgoing: 0,
881 }
882 }
883
884 pub fn num_connections(&self) -> u32 {
886 self.num_pending() + self.num_established()
887 }
888
889 pub fn num_pending(&self) -> u32 {
891 self.pending_incoming + self.pending_outgoing
892 }
893
894 pub fn num_pending_incoming(&self) -> u32 {
896 self.pending_incoming
897 }
898
899 pub fn num_pending_outgoing(&self) -> u32 {
901 self.pending_outgoing
902 }
903
904 pub fn num_established_incoming(&self) -> u32 {
906 self.established_incoming
907 }
908
909 pub fn num_established_outgoing(&self) -> u32 {
911 self.established_outgoing
912 }
913
914 pub fn num_established(&self) -> u32 {
916 self.established_outgoing + self.established_incoming
917 }
918
919 fn inc_pending(&mut self, endpoint: &PendingPoint) {
920 match endpoint {
921 PendingPoint::Dialer { .. } => {
922 self.pending_outgoing += 1;
923 }
924 PendingPoint::Listener { .. } => {
925 self.pending_incoming += 1;
926 }
927 }
928 }
929
930 fn inc_pending_incoming(&mut self) {
931 self.pending_incoming += 1;
932 }
933
934 fn dec_pending(&mut self, endpoint: &PendingPoint) {
935 match endpoint {
936 PendingPoint::Dialer { .. } => {
937 self.pending_outgoing -= 1;
938 }
939 PendingPoint::Listener { .. } => {
940 self.pending_incoming -= 1;
941 }
942 }
943 }
944
945 fn inc_established(&mut self, endpoint: &ConnectedPoint) {
946 match endpoint {
947 ConnectedPoint::Dialer { .. } => {
948 self.established_outgoing += 1;
949 }
950 ConnectedPoint::Listener { .. } => {
951 self.established_incoming += 1;
952 }
953 }
954 }
955
956 fn dec_established(&mut self, endpoint: &ConnectedPoint) {
957 match endpoint {
958 ConnectedPoint::Dialer { .. } => {
959 self.established_outgoing -= 1;
960 }
961 ConnectedPoint::Listener { .. } => {
962 self.established_incoming -= 1;
963 }
964 }
965 }
966}
967
968pub(crate) struct PoolConfig {
973 pub(crate) executor: Option<Box<dyn Executor + Send>>,
975 pub(crate) task_command_buffer_size: usize,
977 pub(crate) per_connection_event_buffer_size: usize,
980 pub(crate) dial_concurrency_factor: NonZeroU8,
982 pub(crate) idle_connection_timeout: Duration,
984 substream_upgrade_protocol_override: Option<libp2p_core::upgrade::Version>,
986
987 max_negotiating_inbound_streams: usize,
991}
992
993impl PoolConfig {
994 pub(crate) fn new(executor: Option<Box<dyn Executor + Send>>) -> Self {
995 Self {
996 executor,
997 task_command_buffer_size: 32,
998 per_connection_event_buffer_size: 7,
999 dial_concurrency_factor: NonZeroU8::new(8).expect("8 > 0"),
1000 idle_connection_timeout: Duration::from_secs(10),
1001 substream_upgrade_protocol_override: None,
1002 max_negotiating_inbound_streams: 128,
1003 }
1004 }
1005
1006 pub(crate) fn with_notify_handler_buffer_size(mut self, n: NonZeroUsize) -> Self {
1014 self.task_command_buffer_size = n.get() - 1;
1015 self
1016 }
1017
1018 pub(crate) fn with_per_connection_event_buffer_size(mut self, n: usize) -> Self {
1025 self.per_connection_event_buffer_size = n;
1026 self
1027 }
1028
1029 pub(crate) fn with_dial_concurrency_factor(mut self, factor: NonZeroU8) -> Self {
1031 self.dial_concurrency_factor = factor;
1032 self
1033 }
1034
1035 pub(crate) fn with_substream_upgrade_protocol_override(
1037 mut self,
1038 v: libp2p_core::upgrade::Version,
1039 ) -> Self {
1040 self.substream_upgrade_protocol_override = Some(v);
1041 self
1042 }
1043
1044 pub(crate) fn with_max_negotiating_inbound_streams(mut self, v: usize) -> Self {
1048 self.max_negotiating_inbound_streams = v;
1049 self
1050 }
1051}