1#![cfg_attr(docsrs, feature(doc_cfg, doc_auto_cfg))]
55
56mod connection;
57mod executor;
58mod stream;
59mod stream_protocol;
60#[cfg(test)]
61mod test;
62mod upgrade;
63
64pub mod behaviour;
65pub mod dial_opts;
66pub mod dummy;
67pub mod handler;
68mod listen_opts;
69mod translation;
70
/// Single module that re-exports items from `either`, `futures`, `libp2p_core`,
/// `libp2p_identity` and this crate under one path.
///
/// Hidden from the rendered documentation.
// NOTE(review): judging by the name, this presumably exists so that code generated by the
// `NetworkBehaviour` derive macro can reach everything it needs through one stable path —
// confirm against the derive crate before removing or renaming any re-export.
#[doc(hidden)]
pub mod derive_prelude {
    pub use either::Either;
    // Re-exported under the name `futures`, i.e. `derive_prelude::futures::*` is the
    // `futures` prelude, not the whole crate.
    pub use futures::prelude as futures;
    pub use libp2p_core::{
        transport::{ListenerId, PortUse},
        ConnectedPoint, Endpoint, Multiaddr,
    };
    pub use libp2p_identity::PeerId;

    pub use crate::{
        behaviour::{
            AddressChange, ConnectionClosed, ConnectionEstablished, DialFailure, ExpiredListenAddr,
            ExternalAddrConfirmed, ExternalAddrExpired, FromSwarm, ListenFailure, ListenerClosed,
            ListenerError, NewExternalAddrCandidate, NewExternalAddrOfPeer, NewListenAddr,
            NewListener,
        },
        connection::ConnectionId,
        ConnectionDenied, ConnectionHandler, ConnectionHandlerSelect, DialError, NetworkBehaviour,
        THandler, THandlerInEvent, THandlerOutEvent, ToSwarm,
    };
}
94
95use std::{
96 collections::{HashMap, HashSet, VecDeque},
97 error, fmt, io,
98 num::{NonZeroU32, NonZeroU8, NonZeroUsize},
99 pin::Pin,
100 task::{Context, Poll},
101 time::Duration,
102};
103
104pub use behaviour::{
105 AddressChange, CloseConnection, ConnectionClosed, DialFailure, ExpiredListenAddr,
106 ExternalAddrExpired, ExternalAddresses, FromSwarm, ListenAddresses, ListenFailure,
107 ListenerClosed, ListenerError, NetworkBehaviour, NewExternalAddrCandidate,
108 NewExternalAddrOfPeer, NewListenAddr, NotifyHandler, PeerAddresses, ToSwarm,
109};
110pub use connection::{pool::ConnectionCounters, ConnectionError, ConnectionId, SupportedProtocols};
111use connection::{
112 pool::{EstablishedConnection, Pool, PoolConfig, PoolEvent},
113 IncomingInfo, PendingInboundConnectionError, PendingOutboundConnectionError,
114};
115use dial_opts::{DialOpts, PeerCondition};
116pub use executor::Executor;
117use futures::{prelude::*, stream::FusedStream};
118pub use handler::{
119 ConnectionHandler, ConnectionHandlerEvent, ConnectionHandlerSelect, OneShotHandler,
120 OneShotHandlerConfig, StreamUpgradeError, SubstreamProtocol,
121};
122use libp2p_core::{
123 connection::ConnectedPoint,
124 muxing::StreamMuxerBox,
125 transport::{self, ListenerId, TransportError, TransportEvent},
126 Multiaddr, Transport,
127};
128use libp2p_identity::PeerId;
129#[cfg(feature = "macros")]
130pub use libp2p_swarm_derive::NetworkBehaviour;
131pub use listen_opts::ListenOpts;
132use smallvec::SmallVec;
133pub use stream::Stream;
134pub use stream_protocol::{InvalidProtocol, StreamProtocol};
135use tracing::Instrument;
136#[doc(hidden)]
137pub use translation::_address_translation;
138
139use crate::{behaviour::ExternalAddrConfirmed, handler::UpgradeInfoSend};
140
/// Event type a [`NetworkBehaviour`] hands to the swarm: its `ToSwarm` associated type.
type TBehaviourOutEvent<TBehaviour> = <TBehaviour as NetworkBehaviour>::ToSwarm;

/// The [`ConnectionHandler`] type of a [`NetworkBehaviour`]: its `ConnectionHandler`
/// associated type.
pub type THandler<TBehaviour> = <TBehaviour as NetworkBehaviour>::ConnectionHandler;

/// Event type that travels from a [`NetworkBehaviour`] to its [`ConnectionHandler`]:
/// the handler's `FromBehaviour` associated type.
pub type THandlerInEvent<TBehaviour> = <THandler<TBehaviour> as ConnectionHandler>::FromBehaviour;

/// Event type that travels from a [`ConnectionHandler`] back to its [`NetworkBehaviour`]:
/// the handler's `ToBehaviour` associated type.
pub type THandlerOutEvent<TBehaviour> = <THandler<TBehaviour> as ConnectionHandler>::ToBehaviour;
154
/// Event produced by polling a [`Swarm`].
///
/// The enum is `#[non_exhaustive]`, so matches outside this crate need a wildcard arm.
#[derive(Debug)]
#[non_exhaustive]
pub enum SwarmEvent<TBehaviourOutEvent> {
    /// Event generated by the [`NetworkBehaviour`] (forwarded from `ToSwarm::GenerateEvent`).
    Behaviour(TBehaviourOutEvent),
    /// A connection to the given peer has been established and handed to the pool.
    ConnectionEstablished {
        /// Identity of the peer that we have connected to.
        peer_id: PeerId,
        /// Identifier of the connection.
        connection_id: ConnectionId,
        /// Endpoint of the connection that has been opened.
        endpoint: ConnectedPoint,
        /// Number of established connections to this peer, including the one that has just
        /// been opened.
        num_established: NonZeroU32,
        /// Errors of concurrent dial attempts, passed through unchanged from the connection
        /// pool.
        // NOTE(review): presumably `Some` only for outgoing connections — confirm in the pool.
        concurrent_dial_errors: Option<Vec<(Multiaddr, TransportError<io::Error>)>>,
        /// Duration reported by the connection pool for establishing this connection.
        established_in: std::time::Duration,
    },
    /// A connection with the given peer has been closed.
    ConnectionClosed {
        /// Identity of the peer that we were connected to.
        peer_id: PeerId,
        /// Identifier of the connection.
        connection_id: ConnectionId,
        /// Endpoint of the connection that has been closed.
        endpoint: ConnectedPoint,
        /// Number of connections to this peer that remain established.
        num_established: u32,
        /// Error that accompanied the close, if any; `None` when the pool reported no error.
        cause: Option<ConnectionError>,
    },
    /// A new inbound connection arrived on a listener, was accepted by the behaviour and has
    /// been handed to the pool. It is not established yet: the outcome follows as either
    /// [`ConnectionEstablished`](SwarmEvent::ConnectionEstablished) or
    /// [`IncomingConnectionError`](SwarmEvent::IncomingConnectionError).
    IncomingConnection {
        /// Identifier of the connection.
        connection_id: ConnectionId,
        /// Local address reported by the transport for this connection.
        local_addr: Multiaddr,
        /// Send-back address reported by the transport for this connection.
        send_back_addr: Multiaddr,
    },
    /// An inbound connection failed: either the pool reported an error while it was pending,
    /// or the behaviour denied it (before or after it was established).
    IncomingConnectionError {
        /// Identifier of the connection.
        connection_id: ConnectionId,
        /// Local address reported by the transport for this connection.
        local_addr: Multiaddr,
        /// Send-back address reported by the transport for this connection.
        send_back_addr: Multiaddr,
        /// The error that happened.
        error: ListenError,
    },
    /// An outbound connection attempt failed: either the pool reported an error while it was
    /// pending, or the behaviour denied the established connection.
    OutgoingConnectionError {
        /// Identifier of the connection.
        connection_id: ConnectionId,
        /// Peer the attempt was directed at, when one was known.
        peer_id: Option<PeerId>,
        /// The error that happened.
        error: DialError,
    },
    /// A listener reported a new address it is listening on.
    NewListenAddr {
        /// The listener that is listening on the new address.
        listener_id: ListenerId,
        /// The new address that is being listened on.
        address: Multiaddr,
    },
    /// A listener reported that one of its addresses expired.
    ExpiredListenAddr {
        /// The listener that is no longer listening on the address.
        listener_id: ListenerId,
        /// The expired address.
        address: Multiaddr,
    },
    /// A listener closed.
    ListenerClosed {
        /// The listener that closed.
        listener_id: ListenerId,
        /// The addresses that were recorded for the listener at the time it closed.
        addresses: Vec<Multiaddr>,
        /// Reason for the closure as reported by the transport: `Ok(())` or the error.
        reason: Result<(), io::Error>,
    },
    /// A listener reported an error. Unlike
    /// [`ListenerClosed`](SwarmEvent::ListenerClosed), this swarm does not drop the
    /// listener's recorded addresses in response.
    ListenerError {
        /// The listener that errored.
        listener_id: ListenerId,
        /// The listener error.
        error: io::Error,
    },
    /// A dial requested by the behaviour (`ToSwarm::Dial`) was started successfully.
    ///
    /// The outcome follows as [`ConnectionEstablished`](SwarmEvent::ConnectionEstablished) or
    /// [`OutgoingConnectionError`](SwarmEvent::OutgoingConnectionError).
    Dialing {
        /// Identity of the peer that is being dialed, when known.
        peer_id: Option<PeerId>,

        /// Identifier of the connection.
        connection_id: ConnectionId,
    },
    /// The behaviour reported a candidate external address that is not yet confirmed.
    NewExternalAddrCandidate { address: Multiaddr },
    /// The behaviour confirmed an external address of the local node.
    ExternalAddrConfirmed { address: Multiaddr },
    /// The behaviour reported that an external address of the local node expired.
    ExternalAddrExpired { address: Multiaddr },
    /// The behaviour reported an address of another peer.
    NewExternalAddrOfPeer { peer_id: PeerId, address: Multiaddr },
}
290
291impl<TBehaviourOutEvent> SwarmEvent<TBehaviourOutEvent> {
292 #[allow(clippy::result_large_err)]
295 pub fn try_into_behaviour_event(self) -> Result<TBehaviourOutEvent, Self> {
296 match self {
297 SwarmEvent::Behaviour(inner) => Ok(inner),
298 other => Err(other),
299 }
300 }
301}
302
/// Ties together a transport, a connection pool and a [`NetworkBehaviour`], and exposes the
/// combined state machine as a stream of [`SwarmEvent`]s.
pub struct Swarm<TBehaviour>
where
    TBehaviour: NetworkBehaviour,
{
    /// Boxed transport used for listening and dialing; polled for listener events.
    transport: transport::Boxed<(PeerId, StreamMuxerBox)>,

    /// The pool that tracks pending and established connections.
    pool: Pool<THandler<TBehaviour>>,

    /// The local peer ID.
    local_peer_id: PeerId,

    /// The behaviour driving this swarm. It is informed about swarm events and polled for
    /// commands.
    behaviour: TBehaviour,

    /// Protocol names (as raw bytes) listed by the handler of the most recently established
    /// connection; overwritten each time a connection is established.
    supported_protocols: SmallVec<[Vec<u8>; 16]>,

    /// External addresses of the local node that have been confirmed.
    confirmed_external_addr: HashSet<Multiaddr>,

    /// Addresses currently reported by each listener, keyed by listener id.
    listened_addrs: HashMap<ListenerId, SmallVec<[Multiaddr; 1]>>,

    /// Handler event emitted by the behaviour that could not be delivered yet, together with
    /// the target peer and the connection(s) eligible to receive it. While this is `Some`,
    /// the behaviour is not polled.
    pending_handler_event: Option<(PeerId, PendingNotifyHandler, THandlerInEvent<TBehaviour>)>,

    /// Events queued for the caller; drained first on every poll.
    pending_swarm_events: VecDeque<SwarmEvent<TBehaviour::ToSwarm>>,
}
339
// `Swarm` is declared `Unpin` for every behaviour type. `poll_next_event` depends on this:
// it reborrows `Pin<&mut Self>` as a plain `&mut Self`.
impl<TBehaviour> Unpin for Swarm<TBehaviour> where TBehaviour: NetworkBehaviour {}
341
342impl<TBehaviour> Swarm<TBehaviour>
343where
344 TBehaviour: NetworkBehaviour,
345{
346 pub fn new(
349 transport: transport::Boxed<(PeerId, StreamMuxerBox)>,
350 behaviour: TBehaviour,
351 local_peer_id: PeerId,
352 config: Config,
353 ) -> Self {
354 tracing::info!(%local_peer_id);
355
356 Swarm {
357 local_peer_id,
358 transport,
359 pool: Pool::new(local_peer_id, config.pool_config),
360 behaviour,
361 supported_protocols: Default::default(),
362 confirmed_external_addr: Default::default(),
363 listened_addrs: HashMap::new(),
364 pending_handler_event: None,
365 pending_swarm_events: VecDeque::default(),
366 }
367 }
368
369 pub fn network_info(&self) -> NetworkInfo {
371 let num_peers = self.pool.num_peers();
372 let connection_counters = self.pool.counters().clone();
373 NetworkInfo {
374 num_peers,
375 connection_counters,
376 }
377 }
378
379 pub fn listen_on(&mut self, addr: Multiaddr) -> Result<ListenerId, TransportError<io::Error>> {
385 let opts = ListenOpts::new(addr);
386 let id = opts.listener_id();
387 self.add_listener(opts)?;
388 Ok(id)
389 }
390
    /// Asks the transport to remove the listener with the given id and returns the
    /// transport's answer.
    // NOTE(review): presumably `true` means a listener with this id existed — confirm against
    // the `Transport::remove_listener` contract.
    pub fn remove_listener(&mut self, listener_id: ListenerId) -> bool {
        self.transport.remove_listener(listener_id)
    }
398
    /// Starts dialing the peer and/or addresses described by `opts`.
    ///
    /// The call proceeds in stages. Whenever a stage fails, the behaviour is informed through
    /// [`FromSwarm::DialFailure`] and the same error is returned to the caller:
    ///
    /// 1. The [`PeerCondition`] of `opts` is checked against the pool. Without a peer id the
    ///    condition cannot be evaluated and the dial always goes ahead.
    /// 2. The behaviour is consulted via `handle_pending_outbound_connection`. It can deny the
    ///    dial, and the addresses it returns are appended to those from `opts` when
    ///    `extend_addresses_through_behaviour` is set (otherwise they are discarded with a
    ///    debug log).
    /// 3. Addresses this swarm is itself listening on, as well as duplicates, are dropped.
    /// 4. One transport dial is prepared per remaining address and the whole set is handed to
    ///    the pool.
    ///
    /// `Ok(())` only means that the attempt has been handed to the pool; whether a connection
    /// results is reported later through the pool's events.
    ///
    /// # Errors
    ///
    /// * [`DialError::DialPeerConditionFalse`] when the peer condition is not met.
    /// * [`DialError::Denied`] when the behaviour rejects the pending connection.
    /// * [`DialError::NoAddresses`] when no address is left after filtering.
    pub fn dial(&mut self, opts: impl Into<DialOpts>) -> Result<(), DialError> {
        let dial_opts = opts.into();

        let peer_id = dial_opts.get_peer_id();
        let condition = dial_opts.peer_condition();
        let connection_id = dial_opts.connection_id();

        // Conditions refer to the state of a specific peer, so they only apply when the peer
        // id is known.
        let should_dial = match (condition, peer_id) {
            (_, None) => true,
            (PeerCondition::Always, _) => true,
            (PeerCondition::Disconnected, Some(peer_id)) => !self.pool.is_connected(peer_id),
            (PeerCondition::NotDialing, Some(peer_id)) => !self.pool.is_dialing(peer_id),
            (PeerCondition::DisconnectedAndNotDialing, Some(peer_id)) => {
                !self.pool.is_dialing(peer_id) && !self.pool.is_connected(peer_id)
            }
        };

        if !should_dial {
            let e = DialError::DialPeerConditionFalse(condition);

            self.behaviour
                .on_swarm_event(FromSwarm::DialFailure(DialFailure {
                    peer_id,
                    error: &e,
                    connection_id,
                }));

            return Err(e);
        }

        let addresses = {
            let mut addresses_from_opts = dial_opts.get_addresses();

            // Give the behaviour the chance to veto the dial or to contribute addresses.
            match self.behaviour.handle_pending_outbound_connection(
                connection_id,
                peer_id,
                addresses_from_opts.as_slice(),
                dial_opts.role_override(),
            ) {
                Ok(addresses) => {
                    if dial_opts.extend_addresses_through_behaviour() {
                        addresses_from_opts.extend(addresses)
                    } else {
                        let num_addresses = addresses.len();

                        if num_addresses > 0 {
                            tracing::debug!(
                                connection=%connection_id,
                                discarded_addresses_count=%num_addresses,
                                "discarding addresses from `NetworkBehaviour` because `DialOpts::extend_addresses_through_behaviour is `false` for connection"
                            )
                        }
                    }
                }
                Err(cause) => {
                    let error = DialError::Denied { cause };

                    self.behaviour
                        .on_swarm_event(FromSwarm::DialFailure(DialFailure {
                            peer_id,
                            error: &error,
                            connection_id,
                        }));

                    return Err(error);
                }
            }

            // Drop our own listen addresses and keep only the first occurrence of each
            // remaining address (`insert` returns `false` for repeats).
            let mut unique_addresses = HashSet::new();
            addresses_from_opts.retain(|addr| {
                !self.listened_addrs.values().flatten().any(|a| a == addr)
                    && unique_addresses.insert(addr.clone())
            });

            if addresses_from_opts.is_empty() {
                let error = DialError::NoAddresses;
                self.behaviour
                    .on_swarm_event(FromSwarm::DialFailure(DialFailure {
                        peer_id,
                        error: &error,
                        connection_id,
                    }));
                return Err(error);
            };

            addresses_from_opts
        };

        // Build one future per address. Each future yields the address together with the
        // result of its dial, so failures stay attributable to a specific address.
        let dials = addresses
            .into_iter()
            // When a peer id is known it is appended to the address; `with_p2p` hands the
            // address back as `Err` when that is not possible, which is reported as an
            // unsupported address for this attempt.
            .map(|a| match peer_id.map_or(Ok(a.clone()), |p| a.with_p2p(p)) {
                Ok(address) => {
                    let dial = self.transport.dial(
                        address.clone(),
                        transport::DialOpts {
                            role: dial_opts.role_override(),
                            port_use: dial_opts.port_use(),
                        },
                    );
                    // The span has no parent; it is only linked to the current span via
                    // `follows_from`.
                    let span = tracing::debug_span!(parent: tracing::Span::none(), "Transport::dial", %address);
                    span.follows_from(tracing::Span::current());
                    match dial {
                        Ok(fut) => fut
                            .map(|r| (address, r.map_err(TransportError::Other)))
                            .instrument(span)
                            .boxed(),
                        // The transport refused to even start dialing: surface the error
                        // through an already-completed future.
                        Err(err) => futures::future::ready((address, Err(err))).boxed(),
                    }
                }
                Err(address) => futures::future::ready((
                    address.clone(),
                    Err(TransportError::MultiaddrNotSupported(address)),
                ))
                .boxed(),
            })
            .collect();

        self.pool.add_outgoing(
            dials,
            peer_id,
            dial_opts.role_override(),
            dial_opts.port_use(),
            dial_opts.dial_concurrency_override(),
            connection_id,
        );

        Ok(())
    }
554
    /// Returns an iterator over all addresses currently recorded for any listener.
    pub fn listeners(&self) -> impl Iterator<Item = &Multiaddr> {
        self.listened_addrs.values().flatten()
    }
559
    /// Returns the peer id this swarm was created with.
    pub fn local_peer_id(&self) -> &PeerId {
        &self.local_peer_id
    }
564
    /// Returns an iterator over the confirmed external addresses of the local node, in the
    /// unspecified iteration order of the underlying `HashSet`.
    pub fn external_addresses(&self) -> impl Iterator<Item = &Multiaddr> {
        self.confirmed_external_addr.iter()
    }
569
570 fn add_listener(&mut self, opts: ListenOpts) -> Result<(), TransportError<io::Error>> {
571 let addr = opts.address();
572 let listener_id = opts.listener_id();
573
574 if let Err(e) = self.transport.listen_on(listener_id, addr.clone()) {
575 self.behaviour
576 .on_swarm_event(FromSwarm::ListenerError(behaviour::ListenerError {
577 listener_id,
578 err: &e,
579 }));
580
581 return Err(e);
582 }
583
584 self.behaviour
585 .on_swarm_event(FromSwarm::NewListener(behaviour::NewListener {
586 listener_id,
587 }));
588
589 Ok(())
590 }
591
592 pub fn add_external_address(&mut self, a: Multiaddr) {
598 self.behaviour
599 .on_swarm_event(FromSwarm::ExternalAddrConfirmed(ExternalAddrConfirmed {
600 addr: &a,
601 }));
602 self.confirmed_external_addr.insert(a);
603 }
604
605 pub fn remove_external_address(&mut self, addr: &Multiaddr) {
610 self.behaviour
611 .on_swarm_event(FromSwarm::ExternalAddrExpired(ExternalAddrExpired { addr }));
612 self.confirmed_external_addr.remove(addr);
613 }
614
615 pub fn add_peer_address(&mut self, peer_id: PeerId, addr: Multiaddr) {
620 self.behaviour
621 .on_swarm_event(FromSwarm::NewExternalAddrOfPeer(NewExternalAddrOfPeer {
622 peer_id,
623 addr: &addr,
624 }))
625 }
626
627 #[allow(clippy::result_unit_err)]
635 pub fn disconnect_peer_id(&mut self, peer_id: PeerId) -> Result<(), ()> {
636 let was_connected = self.pool.is_connected(peer_id);
637 self.pool.disconnect(peer_id);
638
639 if was_connected {
640 Ok(())
641 } else {
642 Err(())
643 }
644 }
645
646 pub fn close_connection(&mut self, connection_id: ConnectionId) -> bool {
657 if let Some(established) = self.pool.get_established(connection_id) {
658 established.start_close();
659 return true;
660 }
661
662 false
663 }
664
    /// Reports whether the pool considers the given peer connected.
    pub fn is_connected(&self, peer_id: &PeerId) -> bool {
        self.pool.is_connected(*peer_id)
    }
669
    /// Returns an iterator over the peers the pool reports as connected.
    pub fn connected_peers(&self) -> impl Iterator<Item = &PeerId> {
        self.pool.iter_connected()
    }
674
    /// Returns a shared reference to the behaviour owned by this swarm.
    pub fn behaviour(&self) -> &TBehaviour {
        &self.behaviour
    }
679
    /// Returns a mutable reference to the behaviour owned by this swarm.
    pub fn behaviour_mut(&mut self) -> &mut TBehaviour {
        &mut self.behaviour
    }
684
    /// Processes one event from the connection pool.
    ///
    /// For every event the behaviour is informed first; where a [`SwarmEvent`] corresponds to
    /// the pool event, it is queued in `pending_swarm_events` afterwards. Connection-handler
    /// events and address changes are forwarded to the behaviour only.
    fn handle_pool_event(&mut self, event: PoolEvent<THandlerOutEvent<TBehaviour>>) {
        match event {
            PoolEvent::ConnectionEstablished {
                peer_id,
                id,
                endpoint,
                connection,
                concurrent_dial_errors,
                established_in,
            } => {
                // Ask the behaviour for a handler. This is also its last chance to deny the
                // connection; a denial is reported as a dial/listen failure and the
                // connection is never handed to the pool.
                let handler = match endpoint.clone() {
                    ConnectedPoint::Dialer {
                        address,
                        role_override,
                        port_use,
                    } => {
                        match self.behaviour.handle_established_outbound_connection(
                            id,
                            peer_id,
                            &address,
                            role_override,
                            port_use,
                        ) {
                            Ok(handler) => handler,
                            Err(cause) => {
                                let dial_error = DialError::Denied { cause };
                                self.behaviour.on_swarm_event(FromSwarm::DialFailure(
                                    DialFailure {
                                        connection_id: id,
                                        error: &dial_error,
                                        peer_id: Some(peer_id),
                                    },
                                ));

                                self.pending_swarm_events.push_back(
                                    SwarmEvent::OutgoingConnectionError {
                                        peer_id: Some(peer_id),
                                        connection_id: id,
                                        error: dial_error,
                                    },
                                );
                                return;
                            }
                        }
                    }
                    ConnectedPoint::Listener {
                        local_addr,
                        send_back_addr,
                    } => {
                        match self.behaviour.handle_established_inbound_connection(
                            id,
                            peer_id,
                            &local_addr,
                            &send_back_addr,
                        ) {
                            Ok(handler) => handler,
                            Err(cause) => {
                                let listen_error = ListenError::Denied { cause };
                                self.behaviour.on_swarm_event(FromSwarm::ListenFailure(
                                    ListenFailure {
                                        local_addr: &local_addr,
                                        send_back_addr: &send_back_addr,
                                        error: &listen_error,
                                        connection_id: id,
                                        peer_id: Some(peer_id),
                                    },
                                ));

                                self.pending_swarm_events.push_back(
                                    SwarmEvent::IncomingConnectionError {
                                        connection_id: id,
                                        send_back_addr,
                                        local_addr,
                                        error: listen_error,
                                    },
                                );
                                return;
                            }
                        }
                    }
                };

                // Collect the handler's protocol names now; `handler` is moved into the pool
                // below.
                let supported_protocols = handler
                    .listen_protocol()
                    .upgrade()
                    .protocol_info()
                    .map(|p| p.as_ref().as_bytes().to_vec())
                    .collect();
                // Snapshot of the peer's connections taken *before* the new one is spawned,
                // hence "other".
                let other_established_connection_ids = self
                    .pool
                    .iter_established_connections_of_peer(&peer_id)
                    .collect::<Vec<_>>();
                let num_established = NonZeroU32::new(
                    u32::try_from(other_established_connection_ids.len() + 1).unwrap(),
                )
                .expect("n + 1 is always non-zero; qed");

                self.pool
                    .spawn_connection(id, peer_id, &endpoint, connection, handler);

                // NOTE(review): the `total_peers` log field carries the number of connections
                // to this one peer, not a peer count.
                tracing::debug!(
                    peer=%peer_id,
                    ?endpoint,
                    total_peers=%num_established,
                    "Connection established"
                );
                // The behaviour only gets the addresses of failed concurrent dials, not the
                // errors themselves.
                let failed_addresses = concurrent_dial_errors
                    .as_ref()
                    .map(|es| {
                        es.iter()
                            .map(|(a, _)| a)
                            .cloned()
                            .collect::<Vec<Multiaddr>>()
                    })
                    .unwrap_or_default();
                self.behaviour
                    .on_swarm_event(FromSwarm::ConnectionEstablished(
                        behaviour::ConnectionEstablished {
                            peer_id,
                            connection_id: id,
                            endpoint: &endpoint,
                            failed_addresses: &failed_addresses,
                            other_established: other_established_connection_ids.len(),
                        },
                    ));
                self.supported_protocols = supported_protocols;
                self.pending_swarm_events
                    .push_back(SwarmEvent::ConnectionEstablished {
                        peer_id,
                        connection_id: id,
                        num_established,
                        endpoint,
                        concurrent_dial_errors,
                        established_in,
                    });
            }
            PoolEvent::PendingOutboundConnectionError {
                id: connection_id,
                error,
                peer,
            } => {
                // Converts the pool's error into a `DialError`.
                let error = error.into();

                self.behaviour
                    .on_swarm_event(FromSwarm::DialFailure(DialFailure {
                        peer_id: peer,
                        error: &error,
                        connection_id,
                    }));

                if let Some(peer) = peer {
                    tracing::debug!(%peer, "Connection attempt to peer failed with {:?}.", error,);
                } else {
                    tracing::debug!("Connection attempt to unknown peer failed with {:?}", error);
                }

                self.pending_swarm_events
                    .push_back(SwarmEvent::OutgoingConnectionError {
                        peer_id: peer,
                        connection_id,
                        error,
                    });
            }
            PoolEvent::PendingInboundConnectionError {
                id,
                send_back_addr,
                local_addr,
                error,
            } => {
                // Converts the pool's error into a `ListenError`.
                let error = error.into();

                tracing::debug!("Incoming connection failed: {:?}", error);
                // The remote peer is reported as unknown for a failed pending inbound
                // connection.
                self.behaviour
                    .on_swarm_event(FromSwarm::ListenFailure(ListenFailure {
                        local_addr: &local_addr,
                        send_back_addr: &send_back_addr,
                        error: &error,
                        connection_id: id,
                        peer_id: None,
                    }));
                self.pending_swarm_events
                    .push_back(SwarmEvent::IncomingConnectionError {
                        connection_id: id,
                        local_addr,
                        send_back_addr,
                        error,
                    });
            }
            PoolEvent::ConnectionClosed {
                id,
                connected,
                error,
                remaining_established_connection_ids,
                ..
            } => {
                // NOTE(review): as above, `total_peers` is really the number of connections
                // that remain to this peer.
                if let Some(error) = error.as_ref() {
                    tracing::debug!(
                        total_peers=%remaining_established_connection_ids.len(),
                        "Connection closed with error {:?}: {:?}",
                        error,
                        connected,
                    );
                } else {
                    tracing::debug!(
                        total_peers=%remaining_established_connection_ids.len(),
                        "Connection closed: {:?}",
                        connected
                    );
                }
                let peer_id = connected.peer_id;
                let endpoint = connected.endpoint;
                // Panics if the count does not fit into a `u32`.
                let num_established =
                    u32::try_from(remaining_established_connection_ids.len()).unwrap();

                self.behaviour
                    .on_swarm_event(FromSwarm::ConnectionClosed(ConnectionClosed {
                        peer_id,
                        connection_id: id,
                        endpoint: &endpoint,
                        cause: error.as_ref(),
                        remaining_established: num_established as usize,
                    }));
                self.pending_swarm_events
                    .push_back(SwarmEvent::ConnectionClosed {
                        peer_id,
                        connection_id: id,
                        endpoint,
                        cause: error,
                        num_established,
                    });
            }
            // Handler events go straight to the behaviour; no `SwarmEvent` is produced.
            PoolEvent::ConnectionEvent { peer_id, id, event } => {
                self.behaviour
                    .on_connection_handler_event(peer_id, id, event);
            }
            // Address changes are likewise reported to the behaviour only.
            PoolEvent::AddressChange {
                peer_id,
                id,
                new_endpoint,
                old_endpoint,
            } => {
                self.behaviour
                    .on_swarm_event(FromSwarm::AddressChange(AddressChange {
                        peer_id,
                        connection_id: id,
                        old: &old_endpoint,
                        new: &new_endpoint,
                    }));
            }
        }
    }
936
    /// Processes one event from the transport's listeners.
    ///
    /// Keeps `listened_addrs` in sync with the addresses reported by the transport, informs
    /// the behaviour, and queues the matching [`SwarmEvent`].
    fn handle_transport_event(
        &mut self,
        event: TransportEvent<
            <transport::Boxed<(PeerId, StreamMuxerBox)> as Transport>::ListenerUpgrade,
            io::Error,
        >,
    ) {
        match event {
            TransportEvent::Incoming {
                listener_id: _,
                upgrade,
                local_addr,
                send_back_addr,
            } => {
                // Inbound connections get their id here; outbound ones carry the id from
                // their `DialOpts`.
                let connection_id = ConnectionId::next();

                // The behaviour may reject the connection before any upgrade work is done.
                match self.behaviour.handle_pending_inbound_connection(
                    connection_id,
                    &local_addr,
                    &send_back_addr,
                ) {
                    Ok(()) => {}
                    Err(cause) => {
                        let listen_error = ListenError::Denied { cause };

                        self.behaviour
                            .on_swarm_event(FromSwarm::ListenFailure(ListenFailure {
                                local_addr: &local_addr,
                                send_back_addr: &send_back_addr,
                                error: &listen_error,
                                connection_id,
                                peer_id: None,
                            }));

                        self.pending_swarm_events
                            .push_back(SwarmEvent::IncomingConnectionError {
                                connection_id,
                                local_addr,
                                send_back_addr,
                                error: listen_error,
                            });
                        return;
                    }
                }

                self.pool.add_incoming(
                    upgrade,
                    IncomingInfo {
                        local_addr: &local_addr,
                        send_back_addr: &send_back_addr,
                    },
                    connection_id,
                );

                self.pending_swarm_events
                    .push_back(SwarmEvent::IncomingConnection {
                        connection_id,
                        local_addr,
                        send_back_addr,
                    })
            }
            TransportEvent::NewAddress {
                listener_id,
                listen_addr,
            } => {
                tracing::debug!(
                    listener=?listener_id,
                    address=%listen_addr,
                    "New listener address"
                );
                // Record the address once per listener; the behaviour and the caller are
                // notified even if it was already recorded.
                let addrs = self.listened_addrs.entry(listener_id).or_default();
                if !addrs.contains(&listen_addr) {
                    addrs.push(listen_addr.clone())
                }
                self.behaviour
                    .on_swarm_event(FromSwarm::NewListenAddr(NewListenAddr {
                        listener_id,
                        addr: &listen_addr,
                    }));
                self.pending_swarm_events
                    .push_back(SwarmEvent::NewListenAddr {
                        listener_id,
                        address: listen_addr,
                    })
            }
            TransportEvent::AddressExpired {
                listener_id,
                listen_addr,
            } => {
                tracing::debug!(
                    listener=?listener_id,
                    address=%listen_addr,
                    "Expired listener address"
                );
                if let Some(addrs) = self.listened_addrs.get_mut(&listener_id) {
                    addrs.retain(|a| a != &listen_addr);
                }
                self.behaviour
                    .on_swarm_event(FromSwarm::ExpiredListenAddr(ExpiredListenAddr {
                        listener_id,
                        addr: &listen_addr,
                    }));
                self.pending_swarm_events
                    .push_back(SwarmEvent::ExpiredListenAddr {
                        listener_id,
                        address: listen_addr,
                    })
            }
            TransportEvent::ListenerClosed {
                listener_id,
                reason,
            } => {
                tracing::debug!(
                    listener=?listener_id,
                    ?reason,
                    "Listener closed"
                );
                // Forget the listener and expire each of its addresses towards the
                // behaviour. The caller only gets the single `ListenerClosed` event below,
                // which carries the addresses.
                let addrs = self.listened_addrs.remove(&listener_id).unwrap_or_default();
                for addr in addrs.iter() {
                    self.behaviour.on_swarm_event(FromSwarm::ExpiredListenAddr(
                        ExpiredListenAddr { listener_id, addr },
                    ));
                }
                // The behaviour borrows the reason (`Result<(), &io::Error>`); the owned
                // value goes to the caller.
                self.behaviour
                    .on_swarm_event(FromSwarm::ListenerClosed(ListenerClosed {
                        listener_id,
                        reason: reason.as_ref().copied(),
                    }));
                self.pending_swarm_events
                    .push_back(SwarmEvent::ListenerClosed {
                        listener_id,
                        addresses: addrs.to_vec(),
                        reason,
                    })
            }
            // The listener's recorded addresses are left untouched on an error.
            TransportEvent::ListenerError { listener_id, error } => {
                self.behaviour
                    .on_swarm_event(FromSwarm::ListenerError(ListenerError {
                        listener_id,
                        err: &error,
                    }));
                self.pending_swarm_events
                    .push_back(SwarmEvent::ListenerError { listener_id, error })
            }
        }
    }
1083
    /// Executes one command emitted by the behaviour.
    ///
    /// # Panics
    ///
    /// Panics on `ToSwarm::NotifyHandler` if a previous handler event is still pending. The
    /// poll loop only polls the behaviour while no handler event is pending.
    fn handle_behaviour_event(
        &mut self,
        event: ToSwarm<TBehaviour::ToSwarm, THandlerInEvent<TBehaviour>>,
    ) {
        match event {
            ToSwarm::GenerateEvent(event) => {
                self.pending_swarm_events
                    .push_back(SwarmEvent::Behaviour(event));
            }
            ToSwarm::Dial { opts } => {
                // Read what the `Dialing` event needs before `opts` is consumed.
                let peer_id = opts.get_peer_id();
                let connection_id = opts.connection_id();
                // A failed dial is not reported here: `dial` has already informed the
                // behaviour through `FromSwarm::DialFailure`.
                if let Ok(()) = self.dial(opts) {
                    self.pending_swarm_events.push_back(SwarmEvent::Dialing {
                        peer_id,
                        connection_id,
                    });
                }
            }
            ToSwarm::ListenOn { opts } => {
                // The error is dropped on purpose: `add_listener` has already reported it
                // to the behaviour.
                let _ = self.add_listener(opts);
            }
            ToSwarm::RemoveListener { id } => {
                self.remove_listener(id);
            }
            ToSwarm::NotifyHandler {
                peer_id,
                handler,
                event,
            } => {
                assert!(self.pending_handler_event.is_none());
                let handler = match handler {
                    NotifyHandler::One(connection) => PendingNotifyHandler::One(connection),
                    NotifyHandler::Any => {
                        // Capture the peer's connections as of now; connections established
                        // later are not candidates for this event.
                        let ids = self
                            .pool
                            .iter_established_connections_of_peer(&peer_id)
                            .collect();
                        PendingNotifyHandler::Any(ids)
                    }
                };

                // Delivery is attempted by the poll loop.
                self.pending_handler_event = Some((peer_id, handler, event));
            }
            ToSwarm::NewExternalAddrCandidate(addr) => {
                // Addresses that are already confirmed are not candidates any more.
                if !self.confirmed_external_addr.contains(&addr) {
                    self.behaviour
                        .on_swarm_event(FromSwarm::NewExternalAddrCandidate(
                            NewExternalAddrCandidate { addr: &addr },
                        ));
                    self.pending_swarm_events
                        .push_back(SwarmEvent::NewExternalAddrCandidate { address: addr });
                }
            }
            ToSwarm::ExternalAddrConfirmed(addr) => {
                self.add_external_address(addr.clone());
                self.pending_swarm_events
                    .push_back(SwarmEvent::ExternalAddrConfirmed { address: addr });
            }
            ToSwarm::ExternalAddrExpired(addr) => {
                self.remove_external_address(&addr);
                self.pending_swarm_events
                    .push_back(SwarmEvent::ExternalAddrExpired { address: addr });
            }
            ToSwarm::CloseConnection {
                peer_id,
                connection,
            } => match connection {
                // An unknown connection id is silently ignored.
                CloseConnection::One(connection_id) => {
                    if let Some(conn) = self.pool.get_established(connection_id) {
                        conn.start_close();
                    }
                }
                CloseConnection::All => {
                    self.pool.disconnect(peer_id);
                }
            },
            ToSwarm::NewExternalAddrOfPeer { peer_id, address } => {
                // Echoed back to the behaviour as a swarm event, then reported to the caller.
                self.behaviour
                    .on_swarm_event(FromSwarm::NewExternalAddrOfPeer(NewExternalAddrOfPeer {
                        peer_id,
                        addr: &address,
                    }));
                self.pending_swarm_events
                    .push_back(SwarmEvent::NewExternalAddrOfPeer { peer_id, address });
            }
        }
    }
1173
    /// Drives the swarm until it has an event for the caller or nothing can make progress.
    ///
    /// Each loop iteration works through the following sources in order and restarts from
    /// the top as soon as one of them made progress:
    ///
    /// 1. Already queued [`SwarmEvent`]s, returned one per call.
    /// 2. A pending handler event, if any, which is (re-)offered to its connection(s).
    ///    Otherwise the behaviour is polled for its next command.
    /// 3. The connection pool.
    /// 4. The transport.
    ///
    /// Returns `Poll::Pending` only after every source polled in that iteration reported
    /// `Pending`. The behaviour is not polled while a handler event is still undelivered.
    #[tracing::instrument(level = "debug", name = "Swarm::poll", skip(self, cx))]
    fn poll_next_event(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> Poll<SwarmEvent<TBehaviour::ToSwarm>> {
        // Reborrow as `&mut Self` once so that individual fields can be borrowed
        // independently below. Possible because `Swarm` is `Unpin`.
        let this = &mut *self;

        loop {
            if let Some(swarm_event) = this.pending_swarm_events.pop_front() {
                return Poll::Ready(swarm_event);
            }

            match this.pending_handler_event.take() {
                // An event from an earlier iteration is still waiting for its handler(s).
                Some((peer_id, handler, event)) => match handler {
                    PendingNotifyHandler::One(conn_id) => {
                        match this.pool.get_established(conn_id) {
                            Some(conn) => match notify_one(conn, event, cx) {
                                // Delivered, or the handler could not take it: done with
                                // this event either way.
                                None => continue,
                                // Handler not ready: park the event again and fall through
                                // to the pool and the transport.
                                Some(event) => {
                                    this.pending_handler_event = Some((peer_id, handler, event));
                                }
                            },
                            // The connection is no longer established; the event is dropped.
                            None => continue,
                        }
                    }
                    PendingNotifyHandler::Any(ids) => {
                        match notify_any::<_, TBehaviour>(ids, &mut this.pool, event, cx) {
                            None => continue,
                            // Nobody took it yet: keep only the connections that are still
                            // pending as candidates.
                            Some((event, ids)) => {
                                let handler = PendingNotifyHandler::Any(ids);
                                this.pending_handler_event = Some((peer_id, handler, event));
                            }
                        }
                    }
                },
                // Nothing pending, so the behaviour may produce its next command.
                None => match this.behaviour.poll(cx) {
                    Poll::Pending => {}
                    Poll::Ready(behaviour_event) => {
                        this.handle_behaviour_event(behaviour_event);

                        continue;
                    }
                },
            }

            match this.pool.poll(cx) {
                Poll::Pending => {}
                Poll::Ready(pool_event) => {
                    this.handle_pool_event(pool_event);
                    continue;
                }
            }

            match Pin::new(&mut this.transport).poll(cx) {
                Poll::Pending => {}
                Poll::Ready(transport_event) => {
                    this.handle_transport_event(transport_event);
                    continue;
                }
            }

            return Poll::Pending;
        }
    }
1258}
1259
/// Target connection(s) of a handler event that is waiting to be delivered.
///
/// For `Any`, the candidate connection ids are captured when the behaviour emits the event,
/// so connections established afterwards never receive it.
enum PendingNotifyHandler {
    /// Deliver to exactly this connection.
    One(ConnectionId),
    /// Deliver to any one of these connections.
    Any(SmallVec<[ConnectionId; 10]>),
}
1270
1271fn notify_one<THandlerInEvent>(
1280 conn: &mut EstablishedConnection<THandlerInEvent>,
1281 event: THandlerInEvent,
1282 cx: &mut Context<'_>,
1283) -> Option<THandlerInEvent> {
1284 match conn.poll_ready_notify_handler(cx) {
1285 Poll::Pending => Some(event),
1286 Poll::Ready(Err(())) => None, Poll::Ready(Ok(())) => {
1288 let _ = conn.notify_handler(event);
1290 None
1291 }
1292 }
1293}
1294
/// Tries to hand `event` to the handler of any one of the connections in `ids`.
///
/// The connections are tried in order and the first one that accepts the event wins.
///
/// Returns `None` when the event was delivered, or when no candidate is left that could
/// still accept it (in which case the event is dropped). Otherwise returns the event
/// together with the ids of the connections that were not ready yet, so the caller can
/// retry with exactly those.
fn notify_any<THandler, TBehaviour>(
    ids: SmallVec<[ConnectionId; 10]>,
    pool: &mut Pool<THandler>,
    event: THandlerInEvent<TBehaviour>,
    cx: &mut Context<'_>,
) -> Option<(THandlerInEvent<TBehaviour>, SmallVec<[ConnectionId; 10]>)>
where
    TBehaviour: NetworkBehaviour,
    THandler: ConnectionHandler<
        FromBehaviour = THandlerInEvent<TBehaviour>,
        ToBehaviour = THandlerOutEvent<TBehaviour>,
    >,
{
    let mut pending = SmallVec::new();
    let mut event = Some(event); // (1): the event is present when the loop starts.
    for id in ids.into_iter() {
        // Ids of connections that are no longer established are skipped.
        if let Some(conn) = pool.get_established(id) {
            match conn.poll_ready_notify_handler(cx) {
                Poll::Pending => pending.push(id),
                Poll::Ready(Err(())) => {} // This connection cannot take events; try the next.
                Poll::Ready(Ok(())) => {
                    // `event` is `Some` here: it starts out that way (1), it is put back
                    // after a failed hand-over (2), and a successful one leaves the loop.
                    let e = event.take().expect("by (1),(2)");
                    if let Err(e) = conn.notify_handler(e) {
                        event = Some(e) // (2): hand-over failed, keep the event for the next id.
                    } else {
                        break;
                    }
                }
            }
        }
    }

    // A leftover event is only worth retrying if some connection was merely not ready.
    event.and_then(|e| {
        if !pending.is_empty() {
            Some((e, pending))
        } else {
            None
        }
    })
}
1345
1346impl<TBehaviour> futures::Stream for Swarm<TBehaviour>
1354where
1355 TBehaviour: NetworkBehaviour,
1356{
1357 type Item = SwarmEvent<TBehaviourOutEvent<TBehaviour>>;
1358
1359 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
1360 self.as_mut().poll_next_event(cx).map(Some)
1361 }
1362}
1363
/// The swarm's stream never reports itself as terminated.
impl<TBehaviour> FusedStream for Swarm<TBehaviour>
where
    TBehaviour: NetworkBehaviour,
{
    fn is_terminated(&self) -> bool {
        // Constant: `poll_next` never yields `None`.
        false
    }
}
1373
/// Configuration for a [`Swarm`], consumed by [`Swarm::new`].
pub struct Config {
    /// Settings handed to the connection pool when the swarm is created.
    pool_config: PoolConfig,
}
1377
1378impl Config {
1379 pub fn with_executor(executor: impl Executor + Send + 'static) -> Self {
1382 Self {
1383 pool_config: PoolConfig::new(Some(Box::new(executor))),
1384 }
1385 }
1386
1387 #[doc(hidden)]
1388 pub fn without_executor() -> Self {
1390 Self {
1391 pool_config: PoolConfig::new(None),
1392 }
1393 }
1394
1395 #[cfg(feature = "wasm-bindgen")]
1405 pub fn with_wasm_executor() -> Self {
1406 Self::with_executor(crate::executor::WasmBindgenExecutor)
1407 }
1408
1409 #[cfg(all(
1411 feature = "tokio",
1412 not(any(target_os = "emscripten", target_os = "wasi", target_os = "unknown"))
1413 ))]
1414 pub fn with_tokio_executor() -> Self {
1415 Self::with_executor(crate::executor::TokioExecutor)
1416 }
1417
1418 #[cfg(all(
1420 feature = "async-std",
1421 not(any(target_os = "emscripten", target_os = "wasi", target_os = "unknown"))
1422 ))]
1423 pub fn with_async_std_executor() -> Self {
1424 Self::with_executor(crate::executor::AsyncStdExecutor)
1425 }
1426
1427 pub fn with_notify_handler_buffer_size(mut self, n: NonZeroUsize) -> Self {
1437 self.pool_config = self.pool_config.with_notify_handler_buffer_size(n);
1438 self
1439 }
1440
1441 pub fn with_per_connection_event_buffer_size(mut self, n: usize) -> Self {
1453 self.pool_config = self.pool_config.with_per_connection_event_buffer_size(n);
1454 self
1455 }
1456
1457 pub fn with_dial_concurrency_factor(mut self, factor: NonZeroU8) -> Self {
1459 self.pool_config = self.pool_config.with_dial_concurrency_factor(factor);
1460 self
1461 }
1462
1463 pub fn with_substream_upgrade_protocol_override(
1474 mut self,
1475 v: libp2p_core::upgrade::Version,
1476 ) -> Self {
1477 self.pool_config = self.pool_config.with_substream_upgrade_protocol_override(v);
1478 self
1479 }
1480
1481 pub fn with_max_negotiating_inbound_streams(mut self, v: usize) -> Self {
1491 self.pool_config = self.pool_config.with_max_negotiating_inbound_streams(v);
1492 self
1493 }
1494
1495 pub fn with_idle_connection_timeout(mut self, timeout: Duration) -> Self {
1513 self.pool_config.idle_connection_timeout = timeout;
1514 self
1515 }
1516}
1517
/// Possible errors of an outbound connection attempt.
#[derive(Debug)]
pub enum DialError {
    /// The dial targeted the local peer id at the given address.
    LocalPeerId { address: Multiaddr },
    /// No address was left to dial after combining the addresses from the `DialOpts` with
    /// those contributed by the behaviour and filtering them.
    NoAddresses,
    /// The [`PeerCondition`](dial_opts::PeerCondition) of the dial was not met, so the dial
    /// was not started.
    DialPeerConditionFalse(dial_opts::PeerCondition),
    /// The pending connection attempt has been aborted.
    Aborted,
    /// The peer id obtained at `address` is not the one that was expected.
    WrongPeerId {
        /// The peer id that was actually found.
        obtained: PeerId,
        /// The address at which it was found.
        address: Multiaddr,
    },
    /// The behaviour rejected the connection, either while it was pending or once it was
    /// established.
    Denied { cause: ConnectionDenied },
    /// The transport failed; one entry per address that was attempted.
    Transport(Vec<(Multiaddr, TransportError<io::Error>)>),
}
1543
1544impl From<PendingOutboundConnectionError> for DialError {
1545 fn from(error: PendingOutboundConnectionError) -> Self {
1546 match error {
1547 PendingOutboundConnectionError::Aborted => DialError::Aborted,
1548 PendingOutboundConnectionError::WrongPeerId { obtained, address } => {
1549 DialError::WrongPeerId { obtained, address }
1550 }
1551 PendingOutboundConnectionError::LocalPeerId { address } => {
1552 DialError::LocalPeerId { address }
1553 }
1554 PendingOutboundConnectionError::Transport(e) => DialError::Transport(e),
1555 }
1556 }
1557}
1558
1559impl fmt::Display for DialError {
1560 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1561 match self {
1562 DialError::NoAddresses => write!(f, "Dial error: no addresses for peer."),
1563 DialError::LocalPeerId { address } => write!(
1564 f,
1565 "Dial error: tried to dial local peer id at {address:?}."
1566 ),
1567 DialError::DialPeerConditionFalse(PeerCondition::Disconnected) => write!(f, "Dial error: dial condition was configured to only happen when disconnected (`PeerCondition::Disconnected`), but node is already connected, thus cancelling new dial."),
1568 DialError::DialPeerConditionFalse(PeerCondition::NotDialing) => write!(f, "Dial error: dial condition was configured to only happen if there is currently no ongoing dialing attempt (`PeerCondition::NotDialing`), but a dial is in progress, thus cancelling new dial."),
1569 DialError::DialPeerConditionFalse(PeerCondition::DisconnectedAndNotDialing) => write!(f, "Dial error: dial condition was configured to only happen when both disconnected (`PeerCondition::Disconnected`) and there is currently no ongoing dialing attempt (`PeerCondition::NotDialing`), but node is already connected or dial is in progress, thus cancelling new dial."),
1570 DialError::DialPeerConditionFalse(PeerCondition::Always) => unreachable!("Dial peer condition is by definition true."),
1571 DialError::Aborted => write!(
1572 f,
1573 "Dial error: Pending connection attempt has been aborted."
1574 ),
1575 DialError::WrongPeerId { obtained, address } => write!(
1576 f,
1577 "Dial error: Unexpected peer ID {obtained} at {address:?}."
1578 ),
1579 DialError::Transport(errors) => {
1580 write!(f, "Failed to negotiate transport protocol(s): [")?;
1581
1582 for (addr, error) in errors {
1583 write!(f, "({addr}")?;
1584 print_error_chain(f, error)?;
1585 write!(f, ")")?;
1586 }
1587 write!(f, "]")?;
1588
1589 Ok(())
1590 }
1591 DialError::Denied { .. } => {
1592 write!(f, "Dial error")
1593 }
1594 }
1595 }
1596}
1597
/// Writes `e` followed by every error reachable through `source()`, each one prefixed
/// with `": "`.
///
/// Stops and returns the formatter error as soon as a write fails.
fn print_error_chain(f: &mut fmt::Formatter<'_>, e: &dyn error::Error) -> fmt::Result {
    write!(f, ": {e}")?;

    // Walk the remaining chain iteratively instead of recursing.
    let mut next = e.source();
    while let Some(cause) = next {
        write!(f, ": {cause}")?;
        next = cause.source();
    }

    Ok(())
}
1607
1608impl error::Error for DialError {
1609 fn source(&self) -> Option<&(dyn error::Error + 'static)> {
1610 match self {
1611 DialError::LocalPeerId { .. } => None,
1612 DialError::NoAddresses => None,
1613 DialError::DialPeerConditionFalse(_) => None,
1614 DialError::Aborted => None,
1615 DialError::WrongPeerId { .. } => None,
1616 DialError::Transport(_) => None,
1617 DialError::Denied { cause } => Some(cause),
1618 }
1619 }
1620}
1621
1622#[derive(Debug)]
1624pub enum ListenError {
1625 Aborted,
1627 WrongPeerId {
1629 obtained: PeerId,
1630 endpoint: ConnectedPoint,
1631 },
1632 LocalPeerId {
1634 address: Multiaddr,
1635 },
1636 Denied {
1637 cause: ConnectionDenied,
1638 },
1639 Transport(TransportError<io::Error>),
1641}
1642
1643impl From<PendingInboundConnectionError> for ListenError {
1644 fn from(error: PendingInboundConnectionError) -> Self {
1645 match error {
1646 PendingInboundConnectionError::Transport(inner) => ListenError::Transport(inner),
1647 PendingInboundConnectionError::Aborted => ListenError::Aborted,
1648 PendingInboundConnectionError::LocalPeerId { address } => {
1649 ListenError::LocalPeerId { address }
1650 }
1651 }
1652 }
1653}
1654
1655impl fmt::Display for ListenError {
1656 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1657 match self {
1658 ListenError::Aborted => write!(
1659 f,
1660 "Listen error: Pending connection attempt has been aborted."
1661 ),
1662 ListenError::WrongPeerId { obtained, endpoint } => write!(
1663 f,
1664 "Listen error: Unexpected peer ID {obtained} at {endpoint:?}."
1665 ),
1666 ListenError::Transport(_) => {
1667 write!(f, "Listen error: Failed to negotiate transport protocol(s)")
1668 }
1669 ListenError::Denied { cause } => {
1670 write!(f, "Listen error: Denied: {cause}")
1671 }
1672 ListenError::LocalPeerId { address } => {
1673 write!(f, "Listen error: Local peer ID at {address:?}.")
1674 }
1675 }
1676 }
1677}
1678
1679impl error::Error for ListenError {
1680 fn source(&self) -> Option<&(dyn error::Error + 'static)> {
1681 match self {
1682 ListenError::WrongPeerId { .. } => None,
1683 ListenError::Transport(err) => Some(err),
1684 ListenError::Aborted => None,
1685 ListenError::Denied { cause } => Some(cause),
1686 ListenError::LocalPeerId { .. } => None,
1687 }
1688 }
1689}
1690
/// A connection was denied.
///
/// Wraps an arbitrary boxed error describing the reason. The reason can be recovered
/// with [`ConnectionDenied::downcast`] / [`ConnectionDenied::downcast_ref`] or through
/// `Error::source`.
#[derive(Debug)]
pub struct ConnectionDenied {
    inner: Box<dyn error::Error + Send + Sync + 'static>,
}

impl ConnectionDenied {
    /// Creates a denial from anything convertible into a boxed error.
    pub fn new(cause: impl Into<Box<dyn error::Error + Send + Sync + 'static>>) -> Self {
        let inner = cause.into();

        Self { inner }
    }

    /// Attempts to recover the concrete cause of type `E`.
    ///
    /// Returns `Err(self)` with the cause left intact when it is not an `E`.
    pub fn downcast<E>(self) -> Result<E, Self>
    where
        E: error::Error + Send + Sync + 'static,
    {
        match self.inner.downcast::<E>() {
            Ok(concrete) => Ok(*concrete),
            Err(inner) => Err(Self { inner }),
        }
    }

    /// Borrows the cause as an `E`, or returns `None` when it has a different type.
    pub fn downcast_ref<E>(&self) -> Option<&E>
    where
        E: error::Error + Send + Sync + 'static,
    {
        self.inner.downcast_ref::<E>()
    }
}

impl fmt::Display for ConnectionDenied {
    /// Writes a fixed message; the cause is not included here.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str("connection denied")
    }
}

impl error::Error for ConnectionDenied {
    /// Always returns the wrapped cause.
    fn source(&self) -> Option<&(dyn error::Error + 'static)> {
        let cause: &(dyn error::Error + 'static) = self.inner.as_ref();

        Some(cause)
    }
}
1740
1741#[derive(Clone, Debug)]
1743pub struct NetworkInfo {
1744 num_peers: usize,
1746 connection_counters: ConnectionCounters,
1748}
1749
1750impl NetworkInfo {
1751 pub fn num_peers(&self) -> usize {
1754 self.num_peers
1755 }
1756
1757 pub fn connection_counters(&self) -> &ConnectionCounters {
1759 &self.connection_counters
1760 }
1761}
1762
1763#[cfg(test)]
1764mod tests {
1765 use libp2p_core::{
1766 multiaddr,
1767 multiaddr::multiaddr,
1768 transport,
1769 transport::{memory::MemoryTransportError, TransportEvent},
1770 upgrade,
1771 };
1772 use libp2p_identity as identity;
1773 use libp2p_plaintext as plaintext;
1774 use libp2p_yamux as yamux;
1775 use quickcheck::*;
1776
1777 use super::*;
1778 use crate::test::{CallTraceBehaviour, MockBehaviour};
1779
1780 enum State {
1783 Connecting,
1784 Disconnecting,
1785 }
1786
1787 fn new_test_swarm(
1788 config: Config,
1789 ) -> Swarm<CallTraceBehaviour<MockBehaviour<dummy::ConnectionHandler, ()>>> {
1790 let id_keys = identity::Keypair::generate_ed25519();
1791 let local_public_key = id_keys.public();
1792 let transport = transport::MemoryTransport::default()
1793 .upgrade(upgrade::Version::V1)
1794 .authenticate(plaintext::Config::new(&id_keys))
1795 .multiplex(yamux::Config::default())
1796 .boxed();
1797 let behaviour = CallTraceBehaviour::new(MockBehaviour::new(dummy::ConnectionHandler));
1798
1799 Swarm::new(transport, behaviour, local_public_key.into(), config)
1800 }
1801
1802 fn swarms_connected<TBehaviour>(
1803 swarm1: &Swarm<CallTraceBehaviour<TBehaviour>>,
1804 swarm2: &Swarm<CallTraceBehaviour<TBehaviour>>,
1805 num_connections: usize,
1806 ) -> bool
1807 where
1808 TBehaviour: NetworkBehaviour,
1809 THandlerOutEvent<TBehaviour>: Clone,
1810 {
1811 swarm1
1812 .behaviour()
1813 .num_connections_to_peer(*swarm2.local_peer_id())
1814 == num_connections
1815 && swarm2
1816 .behaviour()
1817 .num_connections_to_peer(*swarm1.local_peer_id())
1818 == num_connections
1819 && swarm1.is_connected(swarm2.local_peer_id())
1820 && swarm2.is_connected(swarm1.local_peer_id())
1821 }
1822
1823 fn swarms_disconnected<TBehaviour>(
1824 swarm1: &Swarm<CallTraceBehaviour<TBehaviour>>,
1825 swarm2: &Swarm<CallTraceBehaviour<TBehaviour>>,
1826 ) -> bool
1827 where
1828 TBehaviour: NetworkBehaviour,
1829 THandlerOutEvent<TBehaviour>: Clone,
1830 {
1831 swarm1
1832 .behaviour()
1833 .num_connections_to_peer(*swarm2.local_peer_id())
1834 == 0
1835 && swarm2
1836 .behaviour()
1837 .num_connections_to_peer(*swarm1.local_peer_id())
1838 == 0
1839 && !swarm1.is_connected(swarm2.local_peer_id())
1840 && !swarm2.is_connected(swarm1.local_peer_id())
1841 }
1842
1843 #[tokio::test]
1850 async fn test_swarm_disconnect() {
1851 let mut swarm1 = new_test_swarm(Config::with_tokio_executor());
1852 let mut swarm2 = new_test_swarm(Config::with_tokio_executor());
1853
1854 let addr1: Multiaddr = multiaddr::Protocol::Memory(rand::random::<u64>()).into();
1855 let addr2: Multiaddr = multiaddr::Protocol::Memory(rand::random::<u64>()).into();
1856
1857 swarm1.listen_on(addr1.clone()).unwrap();
1858 swarm2.listen_on(addr2.clone()).unwrap();
1859
1860 let swarm1_id = *swarm1.local_peer_id();
1861
1862 let mut reconnected = false;
1863 let num_connections = 10;
1864
1865 for _ in 0..num_connections {
1866 swarm1.dial(addr2.clone()).unwrap();
1867 }
1868 let mut state = State::Connecting;
1869
1870 future::poll_fn(move |cx| loop {
1871 let poll1 = Swarm::poll_next_event(Pin::new(&mut swarm1), cx);
1872 let poll2 = Swarm::poll_next_event(Pin::new(&mut swarm2), cx);
1873 match state {
1874 State::Connecting => {
1875 if swarms_connected(&swarm1, &swarm2, num_connections) {
1876 if reconnected {
1877 return Poll::Ready(());
1878 }
1879 swarm2
1880 .disconnect_peer_id(swarm1_id)
1881 .expect("Error disconnecting");
1882 state = State::Disconnecting;
1883 }
1884 }
1885 State::Disconnecting => {
1886 if swarms_disconnected(&swarm1, &swarm2) {
1887 if reconnected {
1888 return Poll::Ready(());
1889 }
1890 reconnected = true;
1891 for _ in 0..num_connections {
1892 swarm2.dial(addr1.clone()).unwrap();
1893 }
1894 state = State::Connecting;
1895 }
1896 }
1897 }
1898
1899 if poll1.is_pending() && poll2.is_pending() {
1900 return Poll::Pending;
1901 }
1902 })
1903 .await
1904 }
1905
1906 #[tokio::test]
1914 async fn test_behaviour_disconnect_all() {
1915 let mut swarm1 = new_test_swarm(Config::with_tokio_executor());
1916 let mut swarm2 = new_test_swarm(Config::with_tokio_executor());
1917
1918 let addr1: Multiaddr = multiaddr::Protocol::Memory(rand::random::<u64>()).into();
1919 let addr2: Multiaddr = multiaddr::Protocol::Memory(rand::random::<u64>()).into();
1920
1921 swarm1.listen_on(addr1.clone()).unwrap();
1922 swarm2.listen_on(addr2.clone()).unwrap();
1923
1924 let swarm1_id = *swarm1.local_peer_id();
1925
1926 let mut reconnected = false;
1927 let num_connections = 10;
1928
1929 for _ in 0..num_connections {
1930 swarm1.dial(addr2.clone()).unwrap();
1931 }
1932 let mut state = State::Connecting;
1933
1934 future::poll_fn(move |cx| loop {
1935 let poll1 = Swarm::poll_next_event(Pin::new(&mut swarm1), cx);
1936 let poll2 = Swarm::poll_next_event(Pin::new(&mut swarm2), cx);
1937 match state {
1938 State::Connecting => {
1939 if swarms_connected(&swarm1, &swarm2, num_connections) {
1940 if reconnected {
1941 return Poll::Ready(());
1942 }
1943 swarm2
1944 .behaviour
1945 .inner()
1946 .next_action
1947 .replace(ToSwarm::CloseConnection {
1948 peer_id: swarm1_id,
1949 connection: CloseConnection::All,
1950 });
1951 state = State::Disconnecting;
1952 continue;
1953 }
1954 }
1955 State::Disconnecting => {
1956 if swarms_disconnected(&swarm1, &swarm2) {
1957 reconnected = true;
1958 for _ in 0..num_connections {
1959 swarm2.dial(addr1.clone()).unwrap();
1960 }
1961 state = State::Connecting;
1962 continue;
1963 }
1964 }
1965 }
1966
1967 if poll1.is_pending() && poll2.is_pending() {
1968 return Poll::Pending;
1969 }
1970 })
1971 .await
1972 }
1973
1974 #[tokio::test]
1982 async fn test_behaviour_disconnect_one() {
1983 let mut swarm1 = new_test_swarm(Config::with_tokio_executor());
1984 let mut swarm2 = new_test_swarm(Config::with_tokio_executor());
1985
1986 let addr1: Multiaddr = multiaddr::Protocol::Memory(rand::random::<u64>()).into();
1987 let addr2: Multiaddr = multiaddr::Protocol::Memory(rand::random::<u64>()).into();
1988
1989 swarm1.listen_on(addr1).unwrap();
1990 swarm2.listen_on(addr2.clone()).unwrap();
1991
1992 let swarm1_id = *swarm1.local_peer_id();
1993
1994 let num_connections = 10;
1995
1996 for _ in 0..num_connections {
1997 swarm1.dial(addr2.clone()).unwrap();
1998 }
1999 let mut state = State::Connecting;
2000 let mut disconnected_conn_id = None;
2001
2002 future::poll_fn(move |cx| loop {
2003 let poll1 = Swarm::poll_next_event(Pin::new(&mut swarm1), cx);
2004 let poll2 = Swarm::poll_next_event(Pin::new(&mut swarm2), cx);
2005 match state {
2006 State::Connecting => {
2007 if swarms_connected(&swarm1, &swarm2, num_connections) {
2008 disconnected_conn_id = {
2009 let conn_id =
2010 swarm2.behaviour.on_connection_established[num_connections / 2].1;
2011 swarm2.behaviour.inner().next_action.replace(
2012 ToSwarm::CloseConnection {
2013 peer_id: swarm1_id,
2014 connection: CloseConnection::One(conn_id),
2015 },
2016 );
2017 Some(conn_id)
2018 };
2019 state = State::Disconnecting;
2020 }
2021 }
2022 State::Disconnecting => {
2023 for s in &[&swarm1, &swarm2] {
2024 assert!(s
2025 .behaviour
2026 .on_connection_closed
2027 .iter()
2028 .all(|(.., remaining_conns)| *remaining_conns > 0));
2029 assert_eq!(s.behaviour.on_connection_established.len(), num_connections);
2030 s.behaviour.assert_connected(num_connections, 1);
2031 }
2032 if [&swarm1, &swarm2]
2033 .iter()
2034 .all(|s| s.behaviour.on_connection_closed.len() == 1)
2035 {
2036 let conn_id = swarm2.behaviour.on_connection_closed[0].1;
2037 assert_eq!(Some(conn_id), disconnected_conn_id);
2038 return Poll::Ready(());
2039 }
2040 }
2041 }
2042
2043 if poll1.is_pending() && poll2.is_pending() {
2044 return Poll::Pending;
2045 }
2046 })
2047 .await
2048 }
2049
2050 #[test]
2051 fn concurrent_dialing() {
2052 #[derive(Clone, Debug)]
2053 struct DialConcurrencyFactor(NonZeroU8);
2054
2055 impl Arbitrary for DialConcurrencyFactor {
2056 fn arbitrary(g: &mut Gen) -> Self {
2057 Self(NonZeroU8::new(g.gen_range(1..11)).unwrap())
2058 }
2059 }
2060
2061 fn prop(concurrency_factor: DialConcurrencyFactor) {
2062 tokio::runtime::Runtime::new().unwrap().block_on(async {
2063 let mut swarm = new_test_swarm(
2064 Config::with_tokio_executor()
2065 .with_dial_concurrency_factor(concurrency_factor.0),
2066 );
2067
2068 let num_listen_addrs = concurrency_factor.0.get() + 2;
2072 let mut listen_addresses = Vec::new();
2073 let mut transports = Vec::new();
2074 for _ in 0..num_listen_addrs {
2075 let mut transport = transport::MemoryTransport::default().boxed();
2076 transport
2077 .listen_on(ListenerId::next(), "/memory/0".parse().unwrap())
2078 .unwrap();
2079
2080 match transport.select_next_some().await {
2081 TransportEvent::NewAddress { listen_addr, .. } => {
2082 listen_addresses.push(listen_addr);
2083 }
2084 _ => panic!("Expected `NewListenAddr` event."),
2085 }
2086
2087 transports.push(transport);
2088 }
2089
2090 swarm
2093 .dial(
2094 DialOpts::peer_id(PeerId::random())
2095 .addresses(listen_addresses)
2096 .build(),
2097 )
2098 .unwrap();
2099 for mut transport in transports.into_iter() {
2100 match futures::future::select(transport.select_next_some(), swarm.next()).await
2101 {
2102 future::Either::Left((TransportEvent::Incoming { .. }, _)) => {}
2103 future::Either::Left(_) => {
2104 panic!("Unexpected transport event.")
2105 }
2106 future::Either::Right((e, _)) => {
2107 panic!("Expect swarm to not emit any event {e:?}")
2108 }
2109 }
2110 }
2111
2112 match swarm.next().await.unwrap() {
2113 SwarmEvent::OutgoingConnectionError { .. } => {}
2114 e => panic!("Unexpected swarm event {e:?}"),
2115 }
2116 })
2117 }
2118
2119 QuickCheck::new().tests(10).quickcheck(prop as fn(_) -> _);
2120 }
2121
2122 #[tokio::test]
2123 async fn invalid_peer_id() {
2124 let mut swarm1 = new_test_swarm(Config::with_tokio_executor());
2128 let mut swarm2 = new_test_swarm(Config::with_tokio_executor());
2129
2130 swarm1.listen_on("/memory/0".parse().unwrap()).unwrap();
2131
2132 let address = future::poll_fn(|cx| match swarm1.poll_next_unpin(cx) {
2133 Poll::Ready(Some(SwarmEvent::NewListenAddr { address, .. })) => Poll::Ready(address),
2134 Poll::Pending => Poll::Pending,
2135 _ => panic!("Was expecting the listen address to be reported"),
2136 })
2137 .await;
2138
2139 let other_id = PeerId::random();
2140 let other_addr = address.with(multiaddr::Protocol::P2p(other_id));
2141
2142 swarm2.dial(other_addr.clone()).unwrap();
2143
2144 let (peer_id, error) = future::poll_fn(|cx| {
2145 if let Poll::Ready(Some(SwarmEvent::IncomingConnection { .. })) =
2146 swarm1.poll_next_unpin(cx)
2147 {}
2148
2149 match swarm2.poll_next_unpin(cx) {
2150 Poll::Ready(Some(SwarmEvent::OutgoingConnectionError {
2151 peer_id, error, ..
2152 })) => Poll::Ready((peer_id, error)),
2153 Poll::Ready(x) => panic!("unexpected {x:?}"),
2154 Poll::Pending => Poll::Pending,
2155 }
2156 })
2157 .await;
2158 assert_eq!(peer_id.unwrap(), other_id);
2159 match error {
2160 DialError::WrongPeerId { obtained, address } => {
2161 assert_eq!(obtained, *swarm1.local_peer_id());
2162 assert_eq!(address, other_addr);
2163 }
2164 x => panic!("wrong error {x:?}"),
2165 }
2166 }
2167
2168 #[tokio::test]
2169 async fn dial_self() {
2170 let mut swarm = new_test_swarm(Config::with_tokio_executor());
2183 swarm.listen_on("/memory/0".parse().unwrap()).unwrap();
2184
2185 let local_address = future::poll_fn(|cx| match swarm.poll_next_unpin(cx) {
2186 Poll::Ready(Some(SwarmEvent::NewListenAddr { address, .. })) => Poll::Ready(address),
2187 Poll::Pending => Poll::Pending,
2188 _ => panic!("Was expecting the listen address to be reported"),
2189 })
2190 .await;
2191
2192 swarm.listened_addrs.clear();
2195 swarm.dial(local_address.clone()).unwrap();
2196
2197 let mut got_dial_err = false;
2198 let mut got_inc_err = false;
2199 future::poll_fn(|cx| -> Poll<Result<(), io::Error>> {
2200 loop {
2201 match swarm.poll_next_unpin(cx) {
2202 Poll::Ready(Some(SwarmEvent::OutgoingConnectionError {
2203 peer_id,
2204 error: DialError::LocalPeerId { .. },
2205 ..
2206 })) => {
2207 assert_eq!(&peer_id.unwrap(), swarm.local_peer_id());
2208 assert!(!got_dial_err);
2209 got_dial_err = true;
2210 if got_inc_err {
2211 return Poll::Ready(Ok(()));
2212 }
2213 }
2214 Poll::Ready(Some(SwarmEvent::IncomingConnectionError {
2215 local_addr, ..
2216 })) => {
2217 assert!(!got_inc_err);
2218 assert_eq!(local_addr, local_address);
2219 got_inc_err = true;
2220 if got_dial_err {
2221 return Poll::Ready(Ok(()));
2222 }
2223 }
2224 Poll::Ready(Some(SwarmEvent::IncomingConnection { local_addr, .. })) => {
2225 assert_eq!(local_addr, local_address);
2226 }
2227 Poll::Ready(ev) => {
2228 panic!("Unexpected event: {ev:?}")
2229 }
2230 Poll::Pending => break Poll::Pending,
2231 }
2232 }
2233 })
2234 .await
2235 .unwrap();
2236 }
2237
2238 #[tokio::test]
2239 async fn dial_self_by_id() {
2240 let swarm = new_test_swarm(Config::with_tokio_executor());
2243 let peer_id = *swarm.local_peer_id();
2244 assert!(!swarm.is_connected(&peer_id));
2245 }
2246
2247 #[tokio::test]
2248 async fn multiple_addresses_err() {
2249 let target = PeerId::random();
2252
2253 let mut swarm = new_test_swarm(Config::with_tokio_executor());
2254
2255 let addresses = HashSet::from([
2256 multiaddr![Ip4([0, 0, 0, 0]), Tcp(rand::random::<u16>())],
2257 multiaddr![Ip4([0, 0, 0, 0]), Tcp(rand::random::<u16>())],
2258 multiaddr![Ip4([0, 0, 0, 0]), Tcp(rand::random::<u16>())],
2259 multiaddr![Udp(rand::random::<u16>())],
2260 multiaddr![Udp(rand::random::<u16>())],
2261 multiaddr![Udp(rand::random::<u16>())],
2262 multiaddr![Udp(rand::random::<u16>())],
2263 multiaddr![Udp(rand::random::<u16>())],
2264 ]);
2265
2266 swarm
2267 .dial(
2268 DialOpts::peer_id(target)
2269 .addresses(addresses.iter().cloned().collect())
2270 .build(),
2271 )
2272 .unwrap();
2273
2274 match swarm.next().await.unwrap() {
2275 SwarmEvent::OutgoingConnectionError {
2276 peer_id,
2277 error: DialError::Transport(errors),
2279 ..
2280 } => {
2281 assert_eq!(target, peer_id.unwrap());
2282
2283 let failed_addresses = errors.into_iter().map(|(addr, _)| addr).collect::<Vec<_>>();
2284 let expected_addresses = addresses
2285 .into_iter()
2286 .map(|addr| addr.with(multiaddr::Protocol::P2p(target)))
2287 .collect::<Vec<_>>();
2288
2289 assert_eq!(expected_addresses, failed_addresses);
2290 }
2291 e => panic!("Unexpected event: {e:?}"),
2292 }
2293 }
2294
2295 #[tokio::test]
2296 async fn aborting_pending_connection_surfaces_error() {
2297 let _ = tracing_subscriber::fmt()
2298 .with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
2299 .try_init();
2300
2301 let mut dialer = new_test_swarm(Config::with_tokio_executor());
2302 let mut listener = new_test_swarm(Config::with_tokio_executor());
2303
2304 let listener_peer_id = *listener.local_peer_id();
2305 listener.listen_on(multiaddr![Memory(0u64)]).unwrap();
2306 let listener_address = match listener.next().await.unwrap() {
2307 SwarmEvent::NewListenAddr { address, .. } => address,
2308 e => panic!("Unexpected network event: {e:?}"),
2309 };
2310
2311 dialer
2312 .dial(
2313 DialOpts::peer_id(listener_peer_id)
2314 .addresses(vec![listener_address])
2315 .build(),
2316 )
2317 .unwrap();
2318
2319 dialer
2320 .disconnect_peer_id(listener_peer_id)
2321 .expect_err("Expect peer to not yet be connected.");
2322
2323 match dialer.next().await.unwrap() {
2324 SwarmEvent::OutgoingConnectionError {
2325 error: DialError::Aborted,
2326 ..
2327 } => {}
2328 e => panic!("Unexpected swarm event {e:?}."),
2329 }
2330 }
2331
2332 #[test]
2333 fn dial_error_prints_sources() {
2334 let error = DialError::Transport(vec![(
2336 "/ip4/127.0.0.1/tcp/80".parse().unwrap(),
2337 TransportError::Other(io::Error::new(
2338 io::ErrorKind::Other,
2339 MemoryTransportError::Unreachable,
2340 )),
2341 )]);
2342
2343 let string = format!("{error}");
2344
2345 assert_eq!("Failed to negotiate transport protocol(s): [(/ip4/127.0.0.1/tcp/80: : No listener on the given port.)]", string)
2348 }
2349}