1#![cfg_attr(docsrs, feature(doc_cfg, doc_auto_cfg))]
55
56mod connection;
57mod executor;
58mod stream;
59mod stream_protocol;
60#[cfg(test)]
61mod test;
62mod upgrade;
63
64pub mod behaviour;
65pub mod dial_opts;
66pub mod dummy;
67pub mod handler;
68mod listen_opts;
69mod translation;
70
// Bundle of re-exports collected under a single module path.
//
// NOTE(review): presumably this exists so that code generated by the
// `NetworkBehaviour` derive macro (re-exported behind the `macros` feature)
// can name everything it needs through one path — confirm against the derive
// crate. The module is hidden from rustdoc.
#[doc(hidden)]
pub mod derive_prelude {
    // Third-party / sibling-crate items.
    pub use either::Either;
    pub use futures::prelude as futures;
    pub use libp2p_core::{
        transport::{ListenerId, PortUse},
        ConnectedPoint, Endpoint, Multiaddr,
    };
    pub use libp2p_identity::PeerId;

    // Items defined by this crate.
    pub use crate::{
        behaviour::{
            AddressChange, ConnectionClosed, ConnectionEstablished, DialFailure, ExpiredListenAddr,
            ExternalAddrConfirmed, ExternalAddrExpired, FromSwarm, ListenFailure, ListenerClosed,
            ListenerError, NewExternalAddrCandidate, NewExternalAddrOfPeer, NewListenAddr,
            NewListener,
        },
        connection::ConnectionId,
        ConnectionDenied, ConnectionHandler, ConnectionHandlerSelect, DialError, NetworkBehaviour,
        THandler, THandlerInEvent, THandlerOutEvent, ToSwarm,
    };
}
94
95use std::{
96 collections::{HashMap, HashSet, VecDeque},
97 error, fmt, io,
98 num::{NonZeroU32, NonZeroU8, NonZeroUsize},
99 pin::Pin,
100 task::{Context, Poll},
101 time::Duration,
102};
103
104pub use behaviour::{
105 AddressChange, CloseConnection, ConnectionClosed, DialFailure, ExpiredListenAddr,
106 ExternalAddrExpired, ExternalAddresses, FromSwarm, ListenAddresses, ListenFailure,
107 ListenerClosed, ListenerError, NetworkBehaviour, NewExternalAddrCandidate,
108 NewExternalAddrOfPeer, NewListenAddr, NotifyHandler, PeerAddresses, ToSwarm,
109};
110pub use connection::{pool::ConnectionCounters, ConnectionError, ConnectionId, SupportedProtocols};
111use connection::{
112 pool::{EstablishedConnection, Pool, PoolConfig, PoolEvent},
113 IncomingInfo, PendingInboundConnectionError, PendingOutboundConnectionError,
114};
115use dial_opts::{DialOpts, PeerCondition};
116pub use executor::Executor;
117use futures::{prelude::*, stream::FusedStream};
118pub use handler::{
119 ConnectionHandler, ConnectionHandlerEvent, ConnectionHandlerSelect, OneShotHandler,
120 OneShotHandlerConfig, StreamUpgradeError, SubstreamProtocol,
121};
122use libp2p_core::{
123 connection::ConnectedPoint,
124 muxing::StreamMuxerBox,
125 transport::{self, ListenerId, TransportError, TransportEvent},
126 Multiaddr, Transport,
127};
128use libp2p_identity::PeerId;
129#[cfg(feature = "macros")]
130pub use libp2p_swarm_derive::NetworkBehaviour;
131pub use listen_opts::ListenOpts;
132use smallvec::SmallVec;
133pub use stream::Stream;
134pub use stream_protocol::{InvalidProtocol, StreamProtocol};
135use tracing::Instrument;
136#[doc(hidden)]
137pub use translation::_address_translation;
138
139use crate::{behaviour::ExternalAddrConfirmed, handler::UpgradeInfoSend};
140
/// Crate-private shorthand for the `ToSwarm` associated type of a
/// [`NetworkBehaviour`], i.e. the payload carried by [`SwarmEvent::Behaviour`].
type TBehaviourOutEvent<TBehaviour> = <TBehaviour as NetworkBehaviour>::ToSwarm;

/// Shorthand for the [`ConnectionHandler`] type associated with a
/// [`NetworkBehaviour`].
pub type THandler<TBehaviour> = <TBehaviour as NetworkBehaviour>::ConnectionHandler;

/// Shorthand for the `FromBehaviour` associated type of the behaviour's
/// [`ConnectionHandler`] (see [`THandler`]).
pub type THandlerInEvent<TBehaviour> = <THandler<TBehaviour> as ConnectionHandler>::FromBehaviour;

/// Shorthand for the `ToBehaviour` associated type of the behaviour's
/// [`ConnectionHandler`] (see [`THandler`]).
pub type THandlerOutEvent<TBehaviour> = <THandler<TBehaviour> as ConnectionHandler>::ToBehaviour;
154
/// Event produced by polling a [`Swarm`].
///
/// The enum is `#[non_exhaustive]`, so downstream `match`es need a wildcard arm.
#[derive(Debug)]
#[non_exhaustive]
pub enum SwarmEvent<TBehaviourOutEvent> {
    /// Event generated by the [`NetworkBehaviour`] and passed through unchanged.
    Behaviour(TBehaviourOutEvent),
    /// A connection to the given peer has been established and a handler for
    /// it was accepted from the behaviour.
    ConnectionEstablished {
        /// Identity of the remote peer.
        peer_id: PeerId,
        /// Identifier of the connection.
        connection_id: ConnectionId,
        /// Endpoint of the connection that has been opened.
        endpoint: ConnectedPoint,
        /// Number of established connections to this peer, including the one
        /// that has just been opened.
        num_established: NonZeroU32,
        /// Per-address errors forwarded from the connection pool.
        /// NOTE(review): presumably the failed concurrent dial attempts of an
        /// outgoing connection, and `None` otherwise — confirm against the pool.
        concurrent_dial_errors: Option<Vec<(Multiaddr, TransportError<io::Error>)>>,
        /// Duration forwarded from the connection pool.
        /// NOTE(review): presumably how long establishing the connection took — confirm.
        established_in: std::time::Duration,
    },
    /// A connection with the given peer has been closed.
    ConnectionClosed {
        /// Identity of the remote peer.
        peer_id: PeerId,
        /// Identifier of the connection.
        connection_id: ConnectionId,
        /// Endpoint of the connection that has been closed.
        endpoint: ConnectedPoint,
        /// Number of established connections to this peer that remain.
        num_established: u32,
        /// Error that accompanied the close, if the pool reported one.
        cause: Option<ConnectionError>,
    },
    /// A new incoming connection was accepted by the behaviour's pending-inbound
    /// check and handed to the connection pool.
    IncomingConnection {
        /// Identifier of the connection.
        connection_id: ConnectionId,
        /// Local address as reported by the transport for this connection.
        local_addr: Multiaddr,
        /// Send-back address as reported by the transport for this connection.
        send_back_addr: Multiaddr,
    },
    /// An incoming connection failed or was denied by the behaviour.
    IncomingConnectionError {
        /// Identifier of the connection.
        connection_id: ConnectionId,
        /// Local address as reported by the transport for this connection.
        local_addr: Multiaddr,
        /// Send-back address as reported by the transport for this connection.
        send_back_addr: Multiaddr,
        /// The error that happened.
        error: ListenError,
        /// Identity of the remote peer. Only set when the connection was denied
        /// after it had already been established; `None` otherwise.
        peer_id: Option<PeerId>,
    },
    /// An outgoing connection attempt failed or was denied by the behaviour.
    OutgoingConnectionError {
        /// Identifier of the connection.
        connection_id: ConnectionId,
        /// Identity of the peer that was dialed, if it was known.
        peer_id: Option<PeerId>,
        /// The error that happened.
        error: DialError,
    },
    /// A listener reported a new address.
    NewListenAddr {
        /// The listener that reported the address.
        listener_id: ListenerId,
        /// The new address.
        address: Multiaddr,
    },
    /// A listener reported that one of its addresses expired.
    ExpiredListenAddr {
        /// The listener that reported the expiry.
        listener_id: ListenerId,
        /// The expired address.
        address: Multiaddr,
    },
    /// A listener was closed.
    ListenerClosed {
        /// The listener that closed.
        listener_id: ListenerId,
        /// The addresses that were still recorded for the listener when it closed.
        addresses: Vec<Multiaddr>,
        /// Reason reported by the transport: `Ok(())` or the error it closed with.
        reason: Result<(), io::Error>,
    },
    /// A listener reported an error.
    ListenerError {
        /// The listener that errored.
        listener_id: ListenerId,
        /// The reported error.
        error: io::Error,
    },
    /// A dial requested by the behaviour (`ToSwarm::Dial`) was started successfully.
    Dialing {
        /// Identity of the peer being dialed, if known.
        peer_id: Option<PeerId>,

        /// Identifier of the connection.
        connection_id: ConnectionId,
    },
    /// The behaviour reported a new candidate for an external address of the local node.
    NewExternalAddrCandidate { address: Multiaddr },
    /// The behaviour reported an external address of the local node as confirmed.
    ExternalAddrConfirmed { address: Multiaddr },
    /// The behaviour reported an external address of the local node as expired.
    ExternalAddrExpired { address: Multiaddr },
    /// The behaviour reported a new external address of another peer.
    NewExternalAddrOfPeer { peer_id: PeerId, address: Multiaddr },
}
292
293impl<TBehaviourOutEvent> SwarmEvent<TBehaviourOutEvent> {
294 #[allow(clippy::result_large_err)]
297 pub fn try_into_behaviour_event(self) -> Result<TBehaviourOutEvent, Self> {
298 match self {
299 SwarmEvent::Behaviour(inner) => Ok(inner),
300 other => Err(other),
301 }
302 }
303}
304
/// Ties a transport, a connection pool and a [`NetworkBehaviour`] together and
/// drives them, producing [`SwarmEvent`]s when polled.
pub struct Swarm<TBehaviour>
where
    TBehaviour: NetworkBehaviour,
{
    /// Boxed transport used to dial and to listen.
    transport: transport::Boxed<(PeerId, StreamMuxerBox)>,

    /// Pool of pending and established connections.
    pool: Pool<THandler<TBehaviour>>,

    /// Peer ID of the local node.
    local_peer_id: PeerId,

    /// The user-supplied behaviour that reacts to swarm and handler events.
    behaviour: TBehaviour,

    /// Protocol names (as raw bytes) of the listen protocol of the most recently
    /// established connection's handler; overwritten on every new connection.
    supported_protocols: SmallVec<[Vec<u8>; 16]>,

    /// External addresses of the local node that have been confirmed.
    confirmed_external_addr: HashSet<Multiaddr>,

    /// Addresses currently recorded for each active listener.
    listened_addrs: HashMap<ListenerId, SmallVec<[Multiaddr; 1]>>,

    /// Handler notification requested by the behaviour that could not be
    /// delivered yet. While this is `Some`, the behaviour is not polled.
    pending_handler_event: Option<(PeerId, PendingNotifyHandler, THandlerInEvent<TBehaviour>)>,

    /// Events queued for the user; drained first on every poll.
    pending_swarm_events: VecDeque<SwarmEvent<TBehaviour::ToSwarm>>,
}
341
// `Swarm` is declared `Unpin` unconditionally; `poll_next_event` relies on this
// to obtain `&mut Swarm` from a `Pin<&mut Swarm>`.
impl<TBehaviour> Unpin for Swarm<TBehaviour> where TBehaviour: NetworkBehaviour {}
343
344impl<TBehaviour> Swarm<TBehaviour>
345where
346 TBehaviour: NetworkBehaviour,
347{
348 pub fn new(
351 transport: transport::Boxed<(PeerId, StreamMuxerBox)>,
352 behaviour: TBehaviour,
353 local_peer_id: PeerId,
354 config: Config,
355 ) -> Self {
356 tracing::info!(%local_peer_id);
357
358 Swarm {
359 local_peer_id,
360 transport,
361 pool: Pool::new(local_peer_id, config.pool_config),
362 behaviour,
363 supported_protocols: Default::default(),
364 confirmed_external_addr: Default::default(),
365 listened_addrs: HashMap::new(),
366 pending_handler_event: None,
367 pending_swarm_events: VecDeque::default(),
368 }
369 }
370
371 pub fn network_info(&self) -> NetworkInfo {
373 let num_peers = self.pool.num_peers();
374 let connection_counters = self.pool.counters().clone();
375 NetworkInfo {
376 num_peers,
377 connection_counters,
378 }
379 }
380
381 pub fn listen_on(&mut self, addr: Multiaddr) -> Result<ListenerId, TransportError<io::Error>> {
387 let opts = ListenOpts::new(addr);
388 let id = opts.listener_id();
389 self.add_listener(opts)?;
390 Ok(id)
391 }
392
393 pub fn remove_listener(&mut self, listener_id: ListenerId) -> bool {
398 self.transport.remove_listener(listener_id)
399 }
400
401 pub fn dial(&mut self, opts: impl Into<DialOpts>) -> Result<(), DialError> {
429 let dial_opts = opts.into();
430
431 let peer_id = dial_opts.get_peer_id();
432 let condition = dial_opts.peer_condition();
433 let connection_id = dial_opts.connection_id();
434
435 let should_dial = match (condition, peer_id) {
436 (_, None) => true,
437 (PeerCondition::Always, _) => true,
438 (PeerCondition::Disconnected, Some(peer_id)) => !self.pool.is_connected(peer_id),
439 (PeerCondition::NotDialing, Some(peer_id)) => !self.pool.is_dialing(peer_id),
440 (PeerCondition::DisconnectedAndNotDialing, Some(peer_id)) => {
441 !self.pool.is_dialing(peer_id) && !self.pool.is_connected(peer_id)
442 }
443 };
444
445 if !should_dial {
446 let e = DialError::DialPeerConditionFalse(condition);
447
448 self.behaviour
449 .on_swarm_event(FromSwarm::DialFailure(DialFailure {
450 peer_id,
451 error: &e,
452 connection_id,
453 }));
454
455 return Err(e);
456 }
457
458 let addresses = {
459 let mut addresses_from_opts = dial_opts.get_addresses();
460
461 match self.behaviour.handle_pending_outbound_connection(
462 connection_id,
463 peer_id,
464 addresses_from_opts.as_slice(),
465 dial_opts.role_override(),
466 ) {
467 Ok(addresses) => {
468 if dial_opts.extend_addresses_through_behaviour() {
469 addresses_from_opts.extend(addresses)
470 } else {
471 let num_addresses = addresses.len();
472
473 if num_addresses > 0 {
474 tracing::debug!(
475 connection=%connection_id,
476 discarded_addresses_count=%num_addresses,
477 "discarding addresses from `NetworkBehaviour` because `DialOpts::extend_addresses_through_behaviour is `false` for connection"
478 )
479 }
480 }
481 }
482 Err(cause) => {
483 let error = DialError::Denied { cause };
484
485 self.behaviour
486 .on_swarm_event(FromSwarm::DialFailure(DialFailure {
487 peer_id,
488 error: &error,
489 connection_id,
490 }));
491
492 return Err(error);
493 }
494 }
495
496 let mut unique_addresses = HashSet::new();
497 addresses_from_opts.retain(|addr| {
498 !self.listened_addrs.values().flatten().any(|a| a == addr)
499 && unique_addresses.insert(addr.clone())
500 });
501
502 if addresses_from_opts.is_empty() {
503 let error = DialError::NoAddresses;
504 self.behaviour
505 .on_swarm_event(FromSwarm::DialFailure(DialFailure {
506 peer_id,
507 error: &error,
508 connection_id,
509 }));
510 return Err(error);
511 };
512
513 addresses_from_opts
514 };
515
516 let dials = addresses
517 .into_iter()
518 .map(|a| match peer_id.map_or(Ok(a.clone()), |p| a.with_p2p(p)) {
519 Ok(address) => {
520 let dial = self.transport.dial(
521 address.clone(),
522 transport::DialOpts {
523 role: dial_opts.role_override(),
524 port_use: dial_opts.port_use(),
525 },
526 );
527 let span = tracing::debug_span!(parent: tracing::Span::none(), "Transport::dial", %address);
528 span.follows_from(tracing::Span::current());
529 match dial {
530 Ok(fut) => fut
531 .map(|r| (address, r.map_err(TransportError::Other)))
532 .instrument(span)
533 .boxed(),
534 Err(err) => futures::future::ready((address, Err(err))).boxed(),
535 }
536 }
537 Err(address) => futures::future::ready((
538 address.clone(),
539 Err(TransportError::MultiaddrNotSupported(address)),
540 ))
541 .boxed(),
542 })
543 .collect();
544
545 self.pool.add_outgoing(
546 dials,
547 peer_id,
548 dial_opts.role_override(),
549 dial_opts.port_use(),
550 dial_opts.dial_concurrency_override(),
551 connection_id,
552 );
553
554 Ok(())
555 }
556
557 pub fn listeners(&self) -> impl Iterator<Item = &Multiaddr> {
559 self.listened_addrs.values().flatten()
560 }
561
562 pub fn local_peer_id(&self) -> &PeerId {
564 &self.local_peer_id
565 }
566
567 pub fn external_addresses(&self) -> impl Iterator<Item = &Multiaddr> {
569 self.confirmed_external_addr.iter()
570 }
571
572 fn add_listener(&mut self, opts: ListenOpts) -> Result<(), TransportError<io::Error>> {
573 let addr = opts.address();
574 let listener_id = opts.listener_id();
575
576 if let Err(e) = self.transport.listen_on(listener_id, addr.clone()) {
577 self.behaviour
578 .on_swarm_event(FromSwarm::ListenerError(behaviour::ListenerError {
579 listener_id,
580 err: &e,
581 }));
582
583 return Err(e);
584 }
585
586 self.behaviour
587 .on_swarm_event(FromSwarm::NewListener(behaviour::NewListener {
588 listener_id,
589 }));
590
591 Ok(())
592 }
593
594 pub fn add_external_address(&mut self, a: Multiaddr) {
600 self.behaviour
601 .on_swarm_event(FromSwarm::ExternalAddrConfirmed(ExternalAddrConfirmed {
602 addr: &a,
603 }));
604 self.confirmed_external_addr.insert(a);
605 }
606
607 pub fn remove_external_address(&mut self, addr: &Multiaddr) {
612 self.behaviour
613 .on_swarm_event(FromSwarm::ExternalAddrExpired(ExternalAddrExpired { addr }));
614 self.confirmed_external_addr.remove(addr);
615 }
616
617 pub fn add_peer_address(&mut self, peer_id: PeerId, addr: Multiaddr) {
622 self.behaviour
623 .on_swarm_event(FromSwarm::NewExternalAddrOfPeer(NewExternalAddrOfPeer {
624 peer_id,
625 addr: &addr,
626 }))
627 }
628
629 #[allow(clippy::result_unit_err)]
637 pub fn disconnect_peer_id(&mut self, peer_id: PeerId) -> Result<(), ()> {
638 let was_connected = self.pool.is_connected(peer_id);
639 self.pool.disconnect(peer_id);
640
641 if was_connected {
642 Ok(())
643 } else {
644 Err(())
645 }
646 }
647
648 pub fn close_connection(&mut self, connection_id: ConnectionId) -> bool {
659 if let Some(established) = self.pool.get_established(connection_id) {
660 established.start_close();
661 return true;
662 }
663
664 false
665 }
666
667 pub fn is_connected(&self, peer_id: &PeerId) -> bool {
669 self.pool.is_connected(*peer_id)
670 }
671
672 pub fn connected_peers(&self) -> impl Iterator<Item = &PeerId> {
674 self.pool.iter_connected()
675 }
676
677 pub fn behaviour(&self) -> &TBehaviour {
679 &self.behaviour
680 }
681
682 pub fn behaviour_mut(&mut self) -> &mut TBehaviour {
684 &mut self.behaviour
685 }
686
/// Processes one event from the connection pool.
///
/// For every kind of event the behaviour is informed first; where a matching
/// [`SwarmEvent`] exists it is then queued in `pending_swarm_events`.
fn handle_pool_event(&mut self, event: PoolEvent<THandlerOutEvent<TBehaviour>>) {
    match event {
        PoolEvent::ConnectionEstablished {
            peer_id,
            id,
            endpoint,
            connection,
            concurrent_dial_errors,
            established_in,
        } => {
            // Ask the behaviour for a handler. The behaviour may still deny the
            // connection at this point; in that case the failure is reported
            // and the connection is never spawned.
            let handler = match endpoint.clone() {
                ConnectedPoint::Dialer {
                    address,
                    role_override,
                    port_use,
                } => {
                    match self.behaviour.handle_established_outbound_connection(
                        id,
                        peer_id,
                        &address,
                        role_override,
                        port_use,
                    ) {
                        Ok(handler) => handler,
                        Err(cause) => {
                            let dial_error = DialError::Denied { cause };
                            self.behaviour.on_swarm_event(FromSwarm::DialFailure(
                                DialFailure {
                                    connection_id: id,
                                    error: &dial_error,
                                    peer_id: Some(peer_id),
                                },
                            ));

                            self.pending_swarm_events.push_back(
                                SwarmEvent::OutgoingConnectionError {
                                    peer_id: Some(peer_id),
                                    connection_id: id,
                                    error: dial_error,
                                },
                            );
                            return;
                        }
                    }
                }
                ConnectedPoint::Listener {
                    local_addr,
                    send_back_addr,
                } => {
                    match self.behaviour.handle_established_inbound_connection(
                        id,
                        peer_id,
                        &local_addr,
                        &send_back_addr,
                    ) {
                        Ok(handler) => handler,
                        Err(cause) => {
                            let listen_error = ListenError::Denied { cause };
                            self.behaviour.on_swarm_event(FromSwarm::ListenFailure(
                                ListenFailure {
                                    local_addr: &local_addr,
                                    send_back_addr: &send_back_addr,
                                    error: &listen_error,
                                    connection_id: id,
                                    peer_id: Some(peer_id),
                                },
                            ));

                            self.pending_swarm_events.push_back(
                                SwarmEvent::IncomingConnectionError {
                                    connection_id: id,
                                    send_back_addr,
                                    local_addr,
                                    error: listen_error,
                                    peer_id: Some(peer_id),
                                },
                            );
                            return;
                        }
                    }
                }
            };

            // Snapshot the handler's listen protocol names before the handler
            // is moved into the pool.
            let supported_protocols = handler
                .listen_protocol()
                .upgrade()
                .protocol_info()
                .map(|p| p.as_ref().as_bytes().to_vec())
                .collect();
            // Collected before spawning, so the new connection is not part of
            // this list; hence the `+ 1` below.
            let other_established_connection_ids = self
                .pool
                .iter_established_connections_of_peer(&peer_id)
                .collect::<Vec<_>>();
            let num_established = NonZeroU32::new(
                u32::try_from(other_established_connection_ids.len() + 1).unwrap(),
            )
            .expect("n + 1 is always non-zero; qed");

            self.pool
                .spawn_connection(id, peer_id, &endpoint, connection, handler);

            // NOTE(review): the `total_peers` field carries the number of
            // established connections to this peer, not a peer count.
            tracing::debug!(
                peer=%peer_id,
                ?endpoint,
                total_peers=%num_established,
                "Connection established"
            );
            // The behaviour only receives the addresses of the failed
            // concurrent dials, not the errors themselves.
            let failed_addresses = concurrent_dial_errors
                .as_ref()
                .map(|es| {
                    es.iter()
                        .map(|(a, _)| a)
                        .cloned()
                        .collect::<Vec<Multiaddr>>()
                })
                .unwrap_or_default();
            self.behaviour
                .on_swarm_event(FromSwarm::ConnectionEstablished(
                    behaviour::ConnectionEstablished {
                        peer_id,
                        connection_id: id,
                        endpoint: &endpoint,
                        failed_addresses: &failed_addresses,
                        other_established: other_established_connection_ids.len(),
                    },
                ));
            self.supported_protocols = supported_protocols;
            self.pending_swarm_events
                .push_back(SwarmEvent::ConnectionEstablished {
                    peer_id,
                    connection_id: id,
                    num_established,
                    endpoint,
                    concurrent_dial_errors,
                    established_in,
                });
        }
        PoolEvent::PendingOutboundConnectionError {
            id: connection_id,
            error,
            peer,
        } => {
            // Convert the pool's error into the public `DialError`.
            let error = error.into();

            self.behaviour
                .on_swarm_event(FromSwarm::DialFailure(DialFailure {
                    peer_id: peer,
                    error: &error,
                    connection_id,
                }));

            if let Some(peer) = peer {
                tracing::debug!(%peer, "Connection attempt to peer failed with {:?}.", error,);
            } else {
                tracing::debug!("Connection attempt to unknown peer failed with {:?}", error);
            }

            self.pending_swarm_events
                .push_back(SwarmEvent::OutgoingConnectionError {
                    peer_id: peer,
                    connection_id,
                    error,
                });
        }
        PoolEvent::PendingInboundConnectionError {
            id,
            send_back_addr,
            local_addr,
            error,
        } => {
            // Convert the pool's error into the public `ListenError`.
            let error = error.into();

            tracing::debug!("Incoming connection failed: {:?}", error);
            // The connection never got established, so no peer ID is reported.
            self.behaviour
                .on_swarm_event(FromSwarm::ListenFailure(ListenFailure {
                    local_addr: &local_addr,
                    send_back_addr: &send_back_addr,
                    error: &error,
                    connection_id: id,
                    peer_id: None,
                }));
            self.pending_swarm_events
                .push_back(SwarmEvent::IncomingConnectionError {
                    connection_id: id,
                    local_addr,
                    send_back_addr,
                    error,
                    peer_id: None,
                });
        }
        PoolEvent::ConnectionClosed {
            id,
            connected,
            error,
            remaining_established_connection_ids,
            ..
        } => {
            // NOTE(review): as above, `total_peers` carries the number of
            // remaining connections to this peer.
            if let Some(error) = error.as_ref() {
                tracing::debug!(
                    total_peers=%remaining_established_connection_ids.len(),
                    "Connection closed with error {:?}: {:?}",
                    error,
                    connected,
                );
            } else {
                tracing::debug!(
                    total_peers=%remaining_established_connection_ids.len(),
                    "Connection closed: {:?}",
                    connected
                );
            }
            let peer_id = connected.peer_id;
            let endpoint = connected.endpoint;
            // Panics if the number of remaining connections does not fit in `u32`.
            let num_established =
                u32::try_from(remaining_established_connection_ids.len()).unwrap();

            self.behaviour
                .on_swarm_event(FromSwarm::ConnectionClosed(ConnectionClosed {
                    peer_id,
                    connection_id: id,
                    endpoint: &endpoint,
                    cause: error.as_ref(),
                    remaining_established: num_established as usize,
                }));
            self.pending_swarm_events
                .push_back(SwarmEvent::ConnectionClosed {
                    peer_id,
                    connection_id: id,
                    endpoint,
                    cause: error,
                    num_established,
                });
        }
        // Handler events go to the behaviour only; no `SwarmEvent` is queued.
        PoolEvent::ConnectionEvent { peer_id, id, event } => {
            self.behaviour
                .on_connection_handler_event(peer_id, id, event);
        }
        // Address changes go to the behaviour only; no `SwarmEvent` is queued.
        PoolEvent::AddressChange {
            peer_id,
            id,
            new_endpoint,
            old_endpoint,
        } => {
            self.behaviour
                .on_swarm_event(FromSwarm::AddressChange(AddressChange {
                    peer_id,
                    connection_id: id,
                    old: &old_endpoint,
                    new: &new_endpoint,
                }));
        }
    }
}
940
/// Processes one event from the transport.
///
/// Keeps `listened_addrs` in sync with the transport's listeners, informs the
/// behaviour and queues the corresponding [`SwarmEvent`].
fn handle_transport_event(
    &mut self,
    event: TransportEvent<
        <transport::Boxed<(PeerId, StreamMuxerBox)> as Transport>::ListenerUpgrade,
        io::Error,
    >,
) {
    match event {
        TransportEvent::Incoming {
            listener_id: _,
            upgrade,
            local_addr,
            send_back_addr,
        } => {
            // Incoming connections get a fresh connection ID here.
            let connection_id = ConnectionId::next();

            // Give the behaviour a chance to deny the connection before it is
            // handed to the pool.
            match self.behaviour.handle_pending_inbound_connection(
                connection_id,
                &local_addr,
                &send_back_addr,
            ) {
                Ok(()) => {}
                Err(cause) => {
                    let listen_error = ListenError::Denied { cause };

                    self.behaviour
                        .on_swarm_event(FromSwarm::ListenFailure(ListenFailure {
                            local_addr: &local_addr,
                            send_back_addr: &send_back_addr,
                            error: &listen_error,
                            connection_id,
                            peer_id: None,
                        }));

                    self.pending_swarm_events
                        .push_back(SwarmEvent::IncomingConnectionError {
                            connection_id,
                            local_addr,
                            send_back_addr,
                            error: listen_error,
                            peer_id: None,
                        });
                    return;
                }
            }

            self.pool.add_incoming(
                upgrade,
                IncomingInfo {
                    local_addr: &local_addr,
                    send_back_addr: &send_back_addr,
                },
                connection_id,
            );

            self.pending_swarm_events
                .push_back(SwarmEvent::IncomingConnection {
                    connection_id,
                    local_addr,
                    send_back_addr,
                })
        }
        TransportEvent::NewAddress {
            listener_id,
            listen_addr,
        } => {
            tracing::debug!(
                listener=?listener_id,
                address=%listen_addr,
                "New listener address"
            );
            // Record the address once per listener; the behaviour and the user
            // are notified even if the address was already recorded.
            let addrs = self.listened_addrs.entry(listener_id).or_default();
            if !addrs.contains(&listen_addr) {
                addrs.push(listen_addr.clone())
            }
            self.behaviour
                .on_swarm_event(FromSwarm::NewListenAddr(NewListenAddr {
                    listener_id,
                    addr: &listen_addr,
                }));
            self.pending_swarm_events
                .push_back(SwarmEvent::NewListenAddr {
                    listener_id,
                    address: listen_addr,
                })
        }
        TransportEvent::AddressExpired {
            listener_id,
            listen_addr,
        } => {
            tracing::debug!(
                listener=?listener_id,
                address=%listen_addr,
                "Expired listener address"
            );
            // Unknown listeners are ignored for bookkeeping, but the expiry is
            // still reported below.
            if let Some(addrs) = self.listened_addrs.get_mut(&listener_id) {
                addrs.retain(|a| a != &listen_addr);
            }
            self.behaviour
                .on_swarm_event(FromSwarm::ExpiredListenAddr(ExpiredListenAddr {
                    listener_id,
                    addr: &listen_addr,
                }));
            self.pending_swarm_events
                .push_back(SwarmEvent::ExpiredListenAddr {
                    listener_id,
                    address: listen_addr,
                })
        }
        TransportEvent::ListenerClosed {
            listener_id,
            reason,
        } => {
            tracing::debug!(
                listener=?listener_id,
                ?reason,
                "Listener closed"
            );
            // Every address still recorded for the listener is reported to the
            // behaviour as expired before the close itself is reported.
            let addrs = self.listened_addrs.remove(&listener_id).unwrap_or_default();
            for addr in addrs.iter() {
                self.behaviour.on_swarm_event(FromSwarm::ExpiredListenAddr(
                    ExpiredListenAddr { listener_id, addr },
                ));
            }
            self.behaviour
                .on_swarm_event(FromSwarm::ListenerClosed(ListenerClosed {
                    listener_id,
                    reason: reason.as_ref().copied(),
                }));
            self.pending_swarm_events
                .push_back(SwarmEvent::ListenerClosed {
                    listener_id,
                    addresses: addrs.to_vec(),
                    reason,
                })
        }
        TransportEvent::ListenerError { listener_id, error } => {
            self.behaviour
                .on_swarm_event(FromSwarm::ListenerError(ListenerError {
                    listener_id,
                    err: &error,
                }));
            self.pending_swarm_events
                .push_back(SwarmEvent::ListenerError { listener_id, error })
        }
    }
}
1088
/// Carries out one command emitted by the behaviour.
///
/// # Panics
///
/// Panics on `ToSwarm::NotifyHandler` if a handler event is already pending.
fn handle_behaviour_event(
    &mut self,
    event: ToSwarm<TBehaviour::ToSwarm, THandlerInEvent<TBehaviour>>,
) {
    match event {
        // Pass the behaviour's own event through to the user.
        ToSwarm::GenerateEvent(event) => {
            self.pending_swarm_events
                .push_back(SwarmEvent::Behaviour(event));
        }
        ToSwarm::Dial { opts } => {
            // Read these before `opts` is consumed by `dial`.
            let peer_id = opts.get_peer_id();
            let connection_id = opts.connection_id();
            // A failed dial is already reported to the behaviour by `dial`
            // itself; only a successful start is announced to the user.
            if let Ok(()) = self.dial(opts) {
                self.pending_swarm_events.push_back(SwarmEvent::Dialing {
                    peer_id,
                    connection_id,
                });
            }
        }
        ToSwarm::ListenOn { opts } => {
            // `add_listener` reports a failure to the behaviour itself, so the
            // result is deliberately ignored here.
            let _ = self.add_listener(opts);
        }
        ToSwarm::RemoveListener { id } => {
            self.remove_listener(id);
        }
        ToSwarm::NotifyHandler {
            peer_id,
            handler,
            event,
        } => {
            // The behaviour is not polled while a handler event is pending
            // (see `poll_next_event`), so the slot must be free here.
            assert!(self.pending_handler_event.is_none());
            let handler = match handler {
                NotifyHandler::One(connection) => PendingNotifyHandler::One(connection),
                NotifyHandler::Any => {
                    // Candidates are the connections established right now.
                    let ids = self
                        .pool
                        .iter_established_connections_of_peer(&peer_id)
                        .collect();
                    PendingNotifyHandler::Any(ids)
                }
            };

            self.pending_handler_event = Some((peer_id, handler, event));
        }
        ToSwarm::NewExternalAddrCandidate(addr) => {
            // Already confirmed addresses are not reported as candidates again.
            if !self.confirmed_external_addr.contains(&addr) {
                self.behaviour
                    .on_swarm_event(FromSwarm::NewExternalAddrCandidate(
                        NewExternalAddrCandidate { addr: &addr },
                    ));
                self.pending_swarm_events
                    .push_back(SwarmEvent::NewExternalAddrCandidate { address: addr });
            }
        }
        ToSwarm::ExternalAddrConfirmed(addr) => {
            self.add_external_address(addr.clone());
            self.pending_swarm_events
                .push_back(SwarmEvent::ExternalAddrConfirmed { address: addr });
        }
        ToSwarm::ExternalAddrExpired(addr) => {
            self.remove_external_address(&addr);
            self.pending_swarm_events
                .push_back(SwarmEvent::ExternalAddrExpired { address: addr });
        }
        ToSwarm::CloseConnection {
            peer_id,
            connection,
        } => match connection {
            // Close a single connection, if it is (still) established.
            CloseConnection::One(connection_id) => {
                if let Some(conn) = self.pool.get_established(connection_id) {
                    conn.start_close();
                }
            }
            // Disconnect from the peer altogether.
            CloseConnection::All => {
                self.pool.disconnect(peer_id);
            }
        },
        ToSwarm::NewExternalAddrOfPeer { peer_id, address } => {
            self.behaviour
                .on_swarm_event(FromSwarm::NewExternalAddrOfPeer(NewExternalAddrOfPeer {
                    peer_id,
                    addr: &address,
                }));
            self.pending_swarm_events
                .push_back(SwarmEvent::NewExternalAddrOfPeer { peer_id, address });
        }
    }
}
1178
/// Drives the swarm until it has an event for the user or nothing can make
/// progress.
///
/// Each loop iteration works through these sources in order and restarts from
/// the top as soon as one of them made progress:
/// 1. queued swarm events,
/// 2. a pending handler notification, or — only if none is pending — the behaviour,
/// 3. the connection pool,
/// 4. the transport.
///
/// Returns `Poll::Pending` when none of them is ready.
#[tracing::instrument(level = "debug", name = "Swarm::poll", skip(self, cx))]
fn poll_next_event(
    mut self: Pin<&mut Self>,
    cx: &mut Context<'_>,
) -> Poll<SwarmEvent<TBehaviour::ToSwarm>> {
    // `Swarm` is `Unpin`, so a plain mutable reference can be taken.
    let this = &mut *self;

    loop {
        // Deliver already queued events before producing new ones.
        if let Some(swarm_event) = this.pending_swarm_events.pop_front() {
            return Poll::Ready(swarm_event);
        }

        match this.pending_handler_event.take() {
            // A handler notification is outstanding: try to deliver it. The
            // behaviour is not polled in this case.
            Some((peer_id, handler, event)) => match handler {
                PendingNotifyHandler::One(conn_id) => {
                    match this.pool.get_established(conn_id) {
                        Some(conn) => match notify_one(conn, event, cx) {
                            // Delivered or discarded; start over.
                            None => continue,
                            // Not ready yet: keep the event for the next poll
                            // and fall through to the pool and the transport.
                            Some(event) => {
                                this.pending_handler_event = Some((peer_id, handler, event));
                            }
                        },
                        // The connection is no longer established; the event
                        // is dropped.
                        None => continue,
                    }
                }
                PendingNotifyHandler::Any(ids) => {
                    match notify_any::<_, TBehaviour>(ids, &mut this.pool, event, cx) {
                        // Delivered or discarded; start over.
                        None => continue,
                        // Keep the event together with the connections that
                        // were not ready, then fall through.
                        Some((event, ids)) => {
                            let handler = PendingNotifyHandler::Any(ids);
                            this.pending_handler_event = Some((peer_id, handler, event));
                        }
                    }
                }
            },
            // No handler notification outstanding: poll the behaviour.
            None => match this.behaviour.poll(cx) {
                Poll::Pending => {}
                Poll::Ready(behaviour_event) => {
                    this.handle_behaviour_event(behaviour_event);

                    continue;
                }
            },
        }

        // Poll the connection pool.
        match this.pool.poll(cx) {
            Poll::Pending => {}
            Poll::Ready(pool_event) => {
                this.handle_pool_event(pool_event);
                continue;
            }
        }

        // Poll the transport.
        match Pin::new(&mut this.transport).poll(cx) {
            Poll::Pending => {}
            Poll::Ready(transport_event) => {
                this.handle_transport_event(transport_event);
                continue;
            }
        }

        return Poll::Pending;
    }
}
1263}
1264
/// Target of a handler notification that is waiting to be delivered.
///
/// Internal counterpart of `NotifyHandler`: the `Any` case is resolved to a
/// concrete list of connection IDs when the notification is requested.
enum PendingNotifyHandler {
    /// Deliver to exactly this connection.
    One(ConnectionId),
    /// Deliver to any one of these connections.
    Any(SmallVec<[ConnectionId; 10]>),
}
1275
1276fn notify_one<THandlerInEvent>(
1285 conn: &mut EstablishedConnection<THandlerInEvent>,
1286 event: THandlerInEvent,
1287 cx: &mut Context<'_>,
1288) -> Option<THandlerInEvent> {
1289 match conn.poll_ready_notify_handler(cx) {
1290 Poll::Pending => Some(event),
1291 Poll::Ready(Err(())) => None, Poll::Ready(Ok(())) => {
1293 let _ = conn.notify_handler(event);
1295 None
1296 }
1297 }
1298}
1299
1300fn notify_any<THandler, TBehaviour>(
1311 ids: SmallVec<[ConnectionId; 10]>,
1312 pool: &mut Pool<THandler>,
1313 event: THandlerInEvent<TBehaviour>,
1314 cx: &mut Context<'_>,
1315) -> Option<(THandlerInEvent<TBehaviour>, SmallVec<[ConnectionId; 10]>)>
1316where
1317 TBehaviour: NetworkBehaviour,
1318 THandler: ConnectionHandler<
1319 FromBehaviour = THandlerInEvent<TBehaviour>,
1320 ToBehaviour = THandlerOutEvent<TBehaviour>,
1321 >,
1322{
1323 let mut pending = SmallVec::new();
1324 let mut event = Some(event); for id in ids.into_iter() {
1326 if let Some(conn) = pool.get_established(id) {
1327 match conn.poll_ready_notify_handler(cx) {
1328 Poll::Pending => pending.push(id),
1329 Poll::Ready(Err(())) => {} Poll::Ready(Ok(())) => {
1331 let e = event.take().expect("by (1),(2)");
1332 if let Err(e) = conn.notify_handler(e) {
1333 event = Some(e) } else {
1335 break;
1336 }
1337 }
1338 }
1339 }
1340 }
1341
1342 event.and_then(|e| {
1343 if !pending.is_empty() {
1344 Some((e, pending))
1345 } else {
1346 None
1347 }
1348 })
1349}
1350
1351impl<TBehaviour> futures::Stream for Swarm<TBehaviour>
1359where
1360 TBehaviour: NetworkBehaviour,
1361{
1362 type Item = SwarmEvent<TBehaviourOutEvent<TBehaviour>>;
1363
1364 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
1365 self.as_mut().poll_next_event(cx).map(Some)
1366 }
1367}
1368
/// The swarm's stream never terminates (its `poll_next` never yields `None`),
/// so it always reports itself as not terminated.
impl<TBehaviour> FusedStream for Swarm<TBehaviour>
where
    TBehaviour: NetworkBehaviour,
{
    fn is_terminated(&self) -> bool {
        false
    }
}
1378
/// Configuration for a [`Swarm`]; passed to [`Swarm::new`].
pub struct Config {
    /// Settings forwarded to the connection pool when the swarm is created.
    pool_config: PoolConfig,
}
1382
1383impl Config {
1384 pub fn with_executor(executor: impl Executor + Send + 'static) -> Self {
1387 Self {
1388 pool_config: PoolConfig::new(Some(Box::new(executor))),
1389 }
1390 }
1391
1392 #[doc(hidden)]
1393 pub fn without_executor() -> Self {
1395 Self {
1396 pool_config: PoolConfig::new(None),
1397 }
1398 }
1399
1400 #[cfg(feature = "wasm-bindgen")]
1410 pub fn with_wasm_executor() -> Self {
1411 Self::with_executor(crate::executor::WasmBindgenExecutor)
1412 }
1413
1414 #[cfg(all(
1416 feature = "tokio",
1417 not(any(target_os = "emscripten", target_os = "wasi", target_os = "unknown"))
1418 ))]
1419 pub fn with_tokio_executor() -> Self {
1420 Self::with_executor(crate::executor::TokioExecutor)
1421 }
1422
1423 pub fn with_notify_handler_buffer_size(mut self, n: NonZeroUsize) -> Self {
1433 self.pool_config = self.pool_config.with_notify_handler_buffer_size(n);
1434 self
1435 }
1436
1437 pub fn with_per_connection_event_buffer_size(mut self, n: usize) -> Self {
1449 self.pool_config = self.pool_config.with_per_connection_event_buffer_size(n);
1450 self
1451 }
1452
1453 pub fn with_dial_concurrency_factor(mut self, factor: NonZeroU8) -> Self {
1455 self.pool_config = self.pool_config.with_dial_concurrency_factor(factor);
1456 self
1457 }
1458
1459 pub fn with_substream_upgrade_protocol_override(
1470 mut self,
1471 v: libp2p_core::upgrade::Version,
1472 ) -> Self {
1473 self.pool_config = self.pool_config.with_substream_upgrade_protocol_override(v);
1474 self
1475 }
1476
1477 pub fn with_max_negotiating_inbound_streams(mut self, v: usize) -> Self {
1487 self.pool_config = self.pool_config.with_max_negotiating_inbound_streams(v);
1488 self
1489 }
1490
1491 pub fn with_idle_connection_timeout(mut self, timeout: Duration) -> Self {
1509 self.pool_config.idle_connection_timeout = timeout;
1510 self
1511 }
1512}
1513
/// Reasons an outgoing connection attempt can fail.
#[derive(Debug)]
pub enum DialError {
    /// The dial reached the local node's own peer ID at `address`.
    LocalPeerId { address: Multiaddr },
    /// No address was left to dial after filtering the candidates.
    NoAddresses,
    /// The [`PeerCondition`](dial_opts::PeerCondition) given in the dial options
    /// was not met.
    DialPeerConditionFalse(dial_opts::PeerCondition),
    /// The pending connection attempt was aborted.
    Aborted,
    /// The peer found at `address` has a different identity (`obtained`) than expected.
    WrongPeerId {
        obtained: PeerId,
        address: Multiaddr,
    },
    /// The behaviour denied the connection; `cause` carries its reason.
    Denied { cause: ConnectionDenied },
    /// Transport-level failures, one entry per attempted address.
    Transport(Vec<(Multiaddr, TransportError<io::Error>)>),
}
1539
1540impl From<PendingOutboundConnectionError> for DialError {
1541 fn from(error: PendingOutboundConnectionError) -> Self {
1542 match error {
1543 PendingOutboundConnectionError::Aborted => DialError::Aborted,
1544 PendingOutboundConnectionError::WrongPeerId { obtained, address } => {
1545 DialError::WrongPeerId { obtained, address }
1546 }
1547 PendingOutboundConnectionError::LocalPeerId { address } => {
1548 DialError::LocalPeerId { address }
1549 }
1550 PendingOutboundConnectionError::Transport(e) => DialError::Transport(e),
1551 }
1552 }
1553}
1554
1555impl fmt::Display for DialError {
1556 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1557 match self {
1558 DialError::NoAddresses => write!(f, "Dial error: no addresses for peer."),
1559 DialError::LocalPeerId { address } => write!(
1560 f,
1561 "Dial error: tried to dial local peer id at {address:?}."
1562 ),
1563 DialError::DialPeerConditionFalse(PeerCondition::Disconnected) => write!(f, "Dial error: dial condition was configured to only happen when disconnected (`PeerCondition::Disconnected`), but node is already connected, thus cancelling new dial."),
1564 DialError::DialPeerConditionFalse(PeerCondition::NotDialing) => write!(f, "Dial error: dial condition was configured to only happen if there is currently no ongoing dialing attempt (`PeerCondition::NotDialing`), but a dial is in progress, thus cancelling new dial."),
1565 DialError::DialPeerConditionFalse(PeerCondition::DisconnectedAndNotDialing) => write!(f, "Dial error: dial condition was configured to only happen when both disconnected (`PeerCondition::Disconnected`) and there is currently no ongoing dialing attempt (`PeerCondition::NotDialing`), but node is already connected or dial is in progress, thus cancelling new dial."),
1566 DialError::DialPeerConditionFalse(PeerCondition::Always) => unreachable!("Dial peer condition is by definition true."),
1567 DialError::Aborted => write!(
1568 f,
1569 "Dial error: Pending connection attempt has been aborted."
1570 ),
1571 DialError::WrongPeerId { obtained, address } => write!(
1572 f,
1573 "Dial error: Unexpected peer ID {obtained} at {address:?}."
1574 ),
1575 DialError::Transport(errors) => {
1576 write!(f, "Failed to negotiate transport protocol(s): [")?;
1577
1578 for (addr, error) in errors {
1579 write!(f, "({addr}")?;
1580 print_error_chain(f, error)?;
1581 write!(f, ")")?;
1582 }
1583 write!(f, "]")?;
1584
1585 Ok(())
1586 }
1587 DialError::Denied { .. } => {
1588 write!(f, "Dial error")
1589 }
1590 }
1591 }
1592}
1593
/// Writes `e` followed by every error in its `source()` chain to `f`.
///
/// Each error is rendered via `Display` and prefixed with `": "`, so an error whose
/// `Display` output is empty still contributes a bare `": "` to the output. Stops at the
/// first write failure and returns it.
fn print_error_chain(f: &mut fmt::Formatter<'_>, e: &dyn error::Error) -> fmt::Result {
    write!(f, ": {e}")?;

    // Walk the chain iteratively, outermost cause first.
    let mut next = e.source();
    while let Some(cause) = next {
        write!(f, ": {cause}")?;
        next = cause.source();
    }

    Ok(())
}
1603
1604impl error::Error for DialError {
1605 fn source(&self) -> Option<&(dyn error::Error + 'static)> {
1606 match self {
1607 DialError::LocalPeerId { .. } => None,
1608 DialError::NoAddresses => None,
1609 DialError::DialPeerConditionFalse(_) => None,
1610 DialError::Aborted => None,
1611 DialError::WrongPeerId { .. } => None,
1612 DialError::Transport(_) => None,
1613 DialError::Denied { cause } => Some(cause),
1614 }
1615 }
1616}
1617
/// The possible failures of an inbound connection attempt.
#[derive(Debug)]
pub enum ListenError {
    /// The pending connection attempt has been aborted.
    Aborted,
    /// An unexpected peer id was obtained on the connection.
    WrongPeerId {
        /// The peer id that was actually obtained.
        obtained: PeerId,
        /// The endpoint at which `obtained` was encountered.
        endpoint: ConnectedPoint,
    },
    /// The local peer id was encountered at `address`.
    LocalPeerId {
        address: Multiaddr,
    },
    /// The connection was denied; `cause` carries the reason and is exposed as the error
    /// source.
    Denied {
        cause: ConnectionDenied,
    },
    /// Negotiating the transport protocol(s) failed; the inner error is exposed as the error
    /// source.
    Transport(TransportError<io::Error>),
}
1638
1639impl From<PendingInboundConnectionError> for ListenError {
1640 fn from(error: PendingInboundConnectionError) -> Self {
1641 match error {
1642 PendingInboundConnectionError::Transport(inner) => ListenError::Transport(inner),
1643 PendingInboundConnectionError::Aborted => ListenError::Aborted,
1644 PendingInboundConnectionError::LocalPeerId { address } => {
1645 ListenError::LocalPeerId { address }
1646 }
1647 }
1648 }
1649}
1650
1651impl fmt::Display for ListenError {
1652 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1653 match self {
1654 ListenError::Aborted => write!(
1655 f,
1656 "Listen error: Pending connection attempt has been aborted."
1657 ),
1658 ListenError::WrongPeerId { obtained, endpoint } => write!(
1659 f,
1660 "Listen error: Unexpected peer ID {obtained} at {endpoint:?}."
1661 ),
1662 ListenError::Transport(_) => {
1663 write!(f, "Listen error: Failed to negotiate transport protocol(s)")
1664 }
1665 ListenError::Denied { cause } => {
1666 write!(f, "Listen error: Denied: {cause}")
1667 }
1668 ListenError::LocalPeerId { address } => {
1669 write!(f, "Listen error: Local peer ID at {address:?}.")
1670 }
1671 }
1672 }
1673}
1674
1675impl error::Error for ListenError {
1676 fn source(&self) -> Option<&(dyn error::Error + 'static)> {
1677 match self {
1678 ListenError::WrongPeerId { .. } => None,
1679 ListenError::Transport(err) => Some(err),
1680 ListenError::Aborted => None,
1681 ListenError::Denied { cause } => Some(cause),
1682 ListenError::LocalPeerId { .. } => None,
1683 }
1684 }
1685}
1686
/// A type-erased reason for why a connection was denied.
///
/// The concrete reason is stored as a boxed error. It is exposed via
/// [`error::Error::source`] and can be recovered with [`ConnectionDenied::downcast`] or
/// inspected with [`ConnectionDenied::downcast_ref`].
#[derive(Debug)]
pub struct ConnectionDenied {
    inner: Box<dyn error::Error + Send + Sync + 'static>,
}

impl ConnectionDenied {
    /// Wraps `cause`, which may be anything convertible into a boxed
    /// `Error + Send + Sync + 'static`.
    pub fn new(cause: impl Into<Box<dyn error::Error + Send + Sync + 'static>>) -> Self {
        let inner = cause.into();

        Self { inner }
    }

    /// Attempts to recover the concrete reason as an `E`.
    ///
    /// Returns the reason by value if it is of type `E`; otherwise hands back the untouched
    /// [`ConnectionDenied`] so another type can be tried.
    pub fn downcast<E>(self) -> Result<E, Self>
    where
        E: error::Error + Send + Sync + 'static,
    {
        match self.inner.downcast::<E>() {
            Ok(reason) => Ok(*reason),
            Err(inner) => Err(Self { inner }),
        }
    }

    /// Borrows the concrete reason as an `E`, or returns `None` if it is of another type.
    pub fn downcast_ref<E>(&self) -> Option<&E>
    where
        E: error::Error + Send + Sync + 'static,
    {
        let reason: &(dyn error::Error + Send + Sync + 'static) = &*self.inner;

        reason.downcast_ref::<E>()
    }
}

impl fmt::Display for ConnectionDenied {
    /// Prints a fixed message; the wrapped reason is only reachable through `source()`.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str("connection denied")
    }
}

impl error::Error for ConnectionDenied {
    /// Always returns the wrapped reason.
    fn source(&self) -> Option<&(dyn error::Error + 'static)> {
        let reason: &(dyn error::Error + 'static) = &*self.inner;

        Some(reason)
    }
}
1736
/// A snapshot of swarm-level statistics: a peer count and the connection counters.
// NOTE(review): where and when the snapshot is taken is not visible in this part of the
// file — confirm at the construction site.
#[derive(Clone, Debug)]
pub struct NetworkInfo {
    // Peer count captured when this value was constructed.
    // NOTE(review): presumably the number of connected peers — confirm at the construction
    // site.
    num_peers: usize,
    // Connection counters captured when this value was constructed.
    connection_counters: ConnectionCounters,
}

impl NetworkInfo {
    /// Returns the stored peer count.
    pub fn num_peers(&self) -> usize {
        self.num_peers
    }

    /// Returns a reference to the stored [`ConnectionCounters`].
    pub fn connection_counters(&self) -> &ConnectionCounters {
        &self.connection_counters
    }
}
1758
1759#[cfg(test)]
1760mod tests {
1761 use libp2p_core::{
1762 multiaddr,
1763 multiaddr::multiaddr,
1764 transport,
1765 transport::{memory::MemoryTransportError, TransportEvent},
1766 upgrade,
1767 };
1768 use libp2p_identity as identity;
1769 use libp2p_plaintext as plaintext;
1770 use libp2p_yamux as yamux;
1771 use quickcheck::*;
1772
1773 use super::*;
1774 use crate::test::{CallTraceBehaviour, MockBehaviour};
1775
    /// Phase of the connect/disconnect state machines driven by the disconnect tests below.
    enum State {
        /// Waiting until both swarms report the expected number of connections.
        Connecting,
        /// Waiting for the requested connection closures to be observed.
        Disconnecting,
    }
1782
1783 fn new_test_swarm(
1784 config: Config,
1785 ) -> Swarm<CallTraceBehaviour<MockBehaviour<dummy::ConnectionHandler, ()>>> {
1786 let id_keys = identity::Keypair::generate_ed25519();
1787 let local_public_key = id_keys.public();
1788 let transport = transport::MemoryTransport::default()
1789 .upgrade(upgrade::Version::V1)
1790 .authenticate(plaintext::Config::new(&id_keys))
1791 .multiplex(yamux::Config::default())
1792 .boxed();
1793 let behaviour = CallTraceBehaviour::new(MockBehaviour::new(dummy::ConnectionHandler));
1794
1795 Swarm::new(transport, behaviour, local_public_key.into(), config)
1796 }
1797
1798 fn swarms_connected<TBehaviour>(
1799 swarm1: &Swarm<CallTraceBehaviour<TBehaviour>>,
1800 swarm2: &Swarm<CallTraceBehaviour<TBehaviour>>,
1801 num_connections: usize,
1802 ) -> bool
1803 where
1804 TBehaviour: NetworkBehaviour,
1805 THandlerOutEvent<TBehaviour>: Clone,
1806 {
1807 swarm1
1808 .behaviour()
1809 .num_connections_to_peer(*swarm2.local_peer_id())
1810 == num_connections
1811 && swarm2
1812 .behaviour()
1813 .num_connections_to_peer(*swarm1.local_peer_id())
1814 == num_connections
1815 && swarm1.is_connected(swarm2.local_peer_id())
1816 && swarm2.is_connected(swarm1.local_peer_id())
1817 }
1818
1819 fn swarms_disconnected<TBehaviour>(
1820 swarm1: &Swarm<CallTraceBehaviour<TBehaviour>>,
1821 swarm2: &Swarm<CallTraceBehaviour<TBehaviour>>,
1822 ) -> bool
1823 where
1824 TBehaviour: NetworkBehaviour,
1825 THandlerOutEvent<TBehaviour>: Clone,
1826 {
1827 swarm1
1828 .behaviour()
1829 .num_connections_to_peer(*swarm2.local_peer_id())
1830 == 0
1831 && swarm2
1832 .behaviour()
1833 .num_connections_to_peer(*swarm1.local_peer_id())
1834 == 0
1835 && !swarm1.is_connected(swarm2.local_peer_id())
1836 && !swarm2.is_connected(swarm1.local_peer_id())
1837 }
1838
    /// Establishes multiple connections between two swarms, then has one swarm disconnect
    /// the other via `Swarm::disconnect_peer_id`, and finally reconnects.
    ///
    /// Sequence:
    /// 1. `swarm1` dials `swarm2` `num_connections` times.
    /// 2. Once both sides report `num_connections` connections, `swarm2` disconnects
    ///    `swarm1`.
    /// 3. Once both sides report zero connections, `swarm2` dials `swarm1`
    ///    `num_connections` times.
    /// 4. The test finishes when both sides are fully connected again.
    #[tokio::test]
    async fn test_swarm_disconnect() {
        let mut swarm1 = new_test_swarm(Config::with_tokio_executor());
        let mut swarm2 = new_test_swarm(Config::with_tokio_executor());

        // Random memory-transport ports for the two listeners.
        let addr1: Multiaddr = multiaddr::Protocol::Memory(rand::random::<u64>()).into();
        let addr2: Multiaddr = multiaddr::Protocol::Memory(rand::random::<u64>()).into();

        swarm1.listen_on(addr1.clone()).unwrap();
        swarm2.listen_on(addr2.clone()).unwrap();

        let swarm1_id = *swarm1.local_peer_id();

        // Becomes `true` after the first disconnect has completed and the re-dial was issued.
        let mut reconnected = false;
        let num_connections = 10;

        for _ in 0..num_connections {
            swarm1.dial(addr2.clone()).unwrap();
        }
        let mut state = State::Connecting;

        // Drive both swarms by hand; the state machine inspects them after every poll.
        future::poll_fn(move |cx| loop {
            let poll1 = Swarm::poll_next_event(Pin::new(&mut swarm1), cx);
            let poll2 = Swarm::poll_next_event(Pin::new(&mut swarm2), cx);
            match state {
                State::Connecting => {
                    if swarms_connected(&swarm1, &swarm2, num_connections) {
                        if reconnected {
                            // Second time fully connected: the round trip is complete.
                            return Poll::Ready(());
                        }
                        swarm2
                            .disconnect_peer_id(swarm1_id)
                            .expect("Error disconnecting");
                        state = State::Disconnecting;
                    }
                }
                State::Disconnecting => {
                    if swarms_disconnected(&swarm1, &swarm2) {
                        if reconnected {
                            return Poll::Ready(());
                        }
                        reconnected = true;
                        for _ in 0..num_connections {
                            swarm2.dial(addr1.clone()).unwrap();
                        }
                        state = State::Connecting;
                    }
                }
            }

            // Only yield once neither swarm made progress in this iteration.
            if poll1.is_pending() && poll2.is_pending() {
                return Poll::Pending;
            }
        })
        .await
    }
1901
    /// Establishes multiple connections between two swarms, then has the behaviour of one
    /// swarm close all of them by emitting `ToSwarm::CloseConnection` with
    /// `CloseConnection::All`, and finally reconnects.
    ///
    /// Sequence:
    /// 1. `swarm1` dials `swarm2` `num_connections` times.
    /// 2. Once both sides report `num_connections` connections, the close-all action is
    ///    queued on `swarm2`'s mock behaviour.
    /// 3. Once both sides report zero connections, `swarm2` dials `swarm1`
    ///    `num_connections` times.
    /// 4. The test finishes when both sides are fully connected again.
    #[tokio::test]
    async fn test_behaviour_disconnect_all() {
        let mut swarm1 = new_test_swarm(Config::with_tokio_executor());
        let mut swarm2 = new_test_swarm(Config::with_tokio_executor());

        // Random memory-transport ports for the two listeners.
        let addr1: Multiaddr = multiaddr::Protocol::Memory(rand::random::<u64>()).into();
        let addr2: Multiaddr = multiaddr::Protocol::Memory(rand::random::<u64>()).into();

        swarm1.listen_on(addr1.clone()).unwrap();
        swarm2.listen_on(addr2.clone()).unwrap();

        let swarm1_id = *swarm1.local_peer_id();

        // Becomes `true` after the first disconnect has completed and the re-dial was issued.
        let mut reconnected = false;
        let num_connections = 10;

        for _ in 0..num_connections {
            swarm1.dial(addr2.clone()).unwrap();
        }
        let mut state = State::Connecting;

        // Drive both swarms by hand; the state machine inspects them after every poll.
        future::poll_fn(move |cx| loop {
            let poll1 = Swarm::poll_next_event(Pin::new(&mut swarm1), cx);
            let poll2 = Swarm::poll_next_event(Pin::new(&mut swarm2), cx);
            match state {
                State::Connecting => {
                    if swarms_connected(&swarm1, &swarm2, num_connections) {
                        if reconnected {
                            // Second time fully connected: the round trip is complete.
                            return Poll::Ready(());
                        }
                        // Queue the action the mock behaviour hands to the swarm next.
                        swarm2
                            .behaviour
                            .inner()
                            .next_action
                            .replace(ToSwarm::CloseConnection {
                                peer_id: swarm1_id,
                                connection: CloseConnection::All,
                            });
                        state = State::Disconnecting;
                        // Poll again right away, even if both swarms were pending, so the
                        // queued action is picked up.
                        continue;
                    }
                }
                State::Disconnecting => {
                    if swarms_disconnected(&swarm1, &swarm2) {
                        reconnected = true;
                        for _ in 0..num_connections {
                            swarm2.dial(addr1.clone()).unwrap();
                        }
                        state = State::Connecting;
                        // Poll again right away so the new dials are processed.
                        continue;
                    }
                }
            }

            // Only yield once neither swarm made progress in this iteration.
            if poll1.is_pending() && poll2.is_pending() {
                return Poll::Pending;
            }
        })
        .await
    }
1969
    /// Establishes multiple connections between two swarms, then has the behaviour of one
    /// swarm close exactly one of them by emitting `ToSwarm::CloseConnection` with
    /// `CloseConnection::One`.
    ///
    /// The test asserts that the remaining connections stay established, that each side
    /// records exactly one closed connection, and that the connection recorded as closed on
    /// `swarm2` is the one that was requested to be closed.
    #[tokio::test]
    async fn test_behaviour_disconnect_one() {
        let mut swarm1 = new_test_swarm(Config::with_tokio_executor());
        let mut swarm2 = new_test_swarm(Config::with_tokio_executor());

        // Random memory-transport ports for the two listeners.
        let addr1: Multiaddr = multiaddr::Protocol::Memory(rand::random::<u64>()).into();
        let addr2: Multiaddr = multiaddr::Protocol::Memory(rand::random::<u64>()).into();

        swarm1.listen_on(addr1).unwrap();
        swarm2.listen_on(addr2.clone()).unwrap();

        let swarm1_id = *swarm1.local_peer_id();

        let num_connections = 10;

        for _ in 0..num_connections {
            swarm1.dial(addr2.clone()).unwrap();
        }
        let mut state = State::Connecting;
        // Id of the connection that was asked to be closed; set when the action is queued.
        let mut disconnected_conn_id = None;

        // Drive both swarms by hand; the state machine inspects them after every poll.
        future::poll_fn(move |cx| loop {
            let poll1 = Swarm::poll_next_event(Pin::new(&mut swarm1), cx);
            let poll2 = Swarm::poll_next_event(Pin::new(&mut swarm2), cx);
            match state {
                State::Connecting => {
                    if swarms_connected(&swarm1, &swarm2, num_connections) {
                        disconnected_conn_id = {
                            // Pick the connection in the middle of the recorded list.
                            let conn_id =
                                swarm2.behaviour.on_connection_established[num_connections / 2].1;
                            swarm2.behaviour.inner().next_action.replace(
                                ToSwarm::CloseConnection {
                                    peer_id: swarm1_id,
                                    connection: CloseConnection::One(conn_id),
                                },
                            );
                            Some(conn_id)
                        };
                        state = State::Disconnecting;
                    }
                }
                State::Disconnecting => {
                    // Invariants that must hold on every iteration while waiting.
                    for s in &[&swarm1, &swarm2] {
                        assert!(s
                            .behaviour
                            .on_connection_closed
                            .iter()
                            .all(|(.., remaining_conns)| *remaining_conns > 0));
                        assert_eq!(s.behaviour.on_connection_established.len(), num_connections);
                        s.behaviour.assert_connected(num_connections, 1);
                    }
                    // Done once both sides have recorded exactly one closed connection.
                    if [&swarm1, &swarm2]
                        .iter()
                        .all(|s| s.behaviour.on_connection_closed.len() == 1)
                    {
                        let conn_id = swarm2.behaviour.on_connection_closed[0].1;
                        assert_eq!(Some(conn_id), disconnected_conn_id);
                        return Poll::Ready(());
                    }
                }
            }

            // Only yield once neither swarm made progress in this iteration.
            if poll1.is_pending() && poll2.is_pending() {
                return Poll::Pending;
            }
        })
        .await
    }
2045
    /// Property test: when a peer is dialed with more addresses than the configured dial
    /// concurrency factor, every listener still sees an incoming connection before the swarm
    /// reports the dial as failed.
    #[test]
    fn concurrent_dialing() {
        /// Newtype so that an `Arbitrary` impl with a bounded range can be provided.
        #[derive(Clone, Debug)]
        struct DialConcurrencyFactor(NonZeroU8);

        impl Arbitrary for DialConcurrencyFactor {
            fn arbitrary(g: &mut Gen) -> Self {
                // Values in `1..11`, hence never zero and the `unwrap` cannot fail.
                Self(NonZeroU8::new(g.gen_range(1..11)).unwrap())
            }
        }

        fn prop(concurrency_factor: DialConcurrencyFactor) {
            tokio::runtime::Runtime::new().unwrap().block_on(async {
                let mut swarm = new_test_swarm(
                    Config::with_tokio_executor()
                        .with_dial_concurrency_factor(concurrency_factor.0),
                );

                // More listeners than the concurrency factor, so not all addresses can be
                // dialed in the first batch.
                let num_listen_addrs = concurrency_factor.0.get() + 2;
                let mut listen_addresses = Vec::new();
                let mut transports = Vec::new();
                for _ in 0..num_listen_addrs {
                    let mut transport = transport::MemoryTransport::default().boxed();
                    transport
                        .listen_on(ListenerId::next(), "/memory/0".parse().unwrap())
                        .unwrap();

                    match transport.select_next_some().await {
                        TransportEvent::NewAddress { listen_addr, .. } => {
                            listen_addresses.push(listen_addr);
                        }
                        _ => panic!("Expected `NewListenAddr` event."),
                    }

                    transports.push(transport);
                }

                // Dial a random peer id at all listener addresses. The listeners are bare
                // transports and never upgrade the connections.
                swarm
                    .dial(
                        DialOpts::peer_id(PeerId::random())
                            .addresses(listen_addresses)
                            .build(),
                    )
                    .unwrap();
                // Each listener must observe an incoming connection; the swarm must stay
                // silent until all of them did.
                for mut transport in transports.into_iter() {
                    match futures::future::select(transport.select_next_some(), swarm.next()).await
                    {
                        future::Either::Left((TransportEvent::Incoming { .. }, _)) => {}
                        future::Either::Left(_) => {
                            panic!("Unexpected transport event.")
                        }
                        future::Either::Right((e, _)) => {
                            panic!("Expect swarm to not emit any event {e:?}")
                        }
                    }
                }

                // The transports were consumed (and dropped) by the loop above; the dial is
                // now expected to surface as an outgoing connection error.
                match swarm.next().await.unwrap() {
                    SwarmEvent::OutgoingConnectionError { .. } => {}
                    e => panic!("Unexpected swarm event {e:?}"),
                }
            })
        }

        QuickCheck::new().tests(10).quickcheck(prop as fn(_) -> _);
    }
2117
    /// Dials a listening swarm through an address that names a different (random) peer id
    /// and checks the resulting error.
    ///
    /// The `OutgoingConnectionError` must be reported for the peer id that was dialed, and
    /// the `DialError::WrongPeerId` must carry the listener's actual peer id together with
    /// the dialed address.
    #[tokio::test]
    async fn invalid_peer_id() {
        let mut swarm1 = new_test_swarm(Config::with_tokio_executor());
        let mut swarm2 = new_test_swarm(Config::with_tokio_executor());

        swarm1.listen_on("/memory/0".parse().unwrap()).unwrap();

        // Wait for the concrete listen address assigned to `swarm1`.
        let address = future::poll_fn(|cx| match swarm1.poll_next_unpin(cx) {
            Poll::Ready(Some(SwarmEvent::NewListenAddr { address, .. })) => Poll::Ready(address),
            Poll::Pending => Poll::Pending,
            _ => panic!("Was expecting the listen address to be reported"),
        })
        .await;

        // Address of `swarm1`, but tagged with a peer id that is not `swarm1`'s.
        let other_id = PeerId::random();
        let other_addr = address.with(multiaddr::Protocol::P2p(other_id));

        swarm2.dial(other_addr.clone()).unwrap();

        let (peer_id, error) = future::poll_fn(|cx| {
            // Keep the listener side progressing; its events are not of interest here.
            if let Poll::Ready(Some(SwarmEvent::IncomingConnection { .. })) =
                swarm1.poll_next_unpin(cx)
            {}

            match swarm2.poll_next_unpin(cx) {
                Poll::Ready(Some(SwarmEvent::OutgoingConnectionError {
                    peer_id, error, ..
                })) => Poll::Ready((peer_id, error)),
                Poll::Ready(x) => panic!("unexpected {x:?}"),
                Poll::Pending => Poll::Pending,
            }
        })
        .await;
        assert_eq!(peer_id.unwrap(), other_id);
        match error {
            DialError::WrongPeerId { obtained, address } => {
                assert_eq!(obtained, *swarm1.local_peer_id());
                assert_eq!(address, other_addr);
            }
            x => panic!("wrong error {x:?}"),
        }
    }
2163
    /// Dials the swarm's own listen address and checks the reported events.
    ///
    /// Expected events: any number of `IncomingConnection` notifications for the listen
    /// address, exactly one `OutgoingConnectionError` with `DialError::LocalPeerId` for the
    /// local peer id, and exactly one `IncomingConnectionError` for the listen address. The
    /// two errors are accepted in either order; any other event fails the test.
    #[tokio::test]
    async fn dial_self() {
        let mut swarm = new_test_swarm(Config::with_tokio_executor());
        swarm.listen_on("/memory/0".parse().unwrap()).unwrap();

        // Wait for the concrete listen address assigned to the swarm.
        let local_address = future::poll_fn(|cx| match swarm.poll_next_unpin(cx) {
            Poll::Ready(Some(SwarmEvent::NewListenAddr { address, .. })) => Poll::Ready(address),
            Poll::Pending => Poll::Pending,
            _ => panic!("Was expecting the listen address to be reported"),
        })
        .await;

        // NOTE(review): presumably required so the dial to our own listen address is actually
        // executed instead of being filtered out up front — confirm against `Swarm::dial`.
        swarm.listened_addrs.clear();
        swarm.dial(local_address.clone()).unwrap();

        let mut got_dial_err = false;
        let mut got_inc_err = false;
        future::poll_fn(|cx| -> Poll<Result<(), io::Error>> {
            loop {
                match swarm.poll_next_unpin(cx) {
                    Poll::Ready(Some(SwarmEvent::OutgoingConnectionError {
                        peer_id,
                        error: DialError::LocalPeerId { .. },
                        ..
                    })) => {
                        assert_eq!(&peer_id.unwrap(), swarm.local_peer_id());
                        assert!(!got_dial_err);
                        got_dial_err = true;
                        if got_inc_err {
                            return Poll::Ready(Ok(()));
                        }
                    }
                    Poll::Ready(Some(SwarmEvent::IncomingConnectionError {
                        local_addr, ..
                    })) => {
                        assert!(!got_inc_err);
                        assert_eq!(local_addr, local_address);
                        got_inc_err = true;
                        if got_dial_err {
                            return Poll::Ready(Ok(()));
                        }
                    }
                    Poll::Ready(Some(SwarmEvent::IncomingConnection { local_addr, .. })) => {
                        assert_eq!(local_addr, local_address);
                    }
                    Poll::Ready(ev) => {
                        panic!("Unexpected event: {ev:?}")
                    }
                    Poll::Pending => break Poll::Pending,
                }
            }
        })
        .await
        .unwrap();
    }
2233
    /// Checks that a freshly created swarm does not report itself as connected to its own
    /// peer id.
    // NOTE(review): despite the name, no dial is performed here — only `is_connected` is
    // checked. Confirm whether that is intentional.
    #[tokio::test]
    async fn dial_self_by_id() {
        let swarm = new_test_swarm(Config::with_tokio_executor());
        let peer_id = *swarm.local_peer_id();
        assert!(!swarm.is_connected(&peer_id));
    }
2242
    /// Dials a peer at several addresses that the test swarm's memory transport cannot dial
    /// and checks that the resulting `DialError::Transport` lists one failure per address.
    #[tokio::test]
    async fn multiple_addresses_err() {
        let target = PeerId::random();

        let mut swarm = new_test_swarm(Config::with_tokio_executor());

        // TCP and UDP addresses; the swarm under test only has a memory transport.
        let addresses = HashSet::from([
            multiaddr![Ip4([0, 0, 0, 0]), Tcp(rand::random::<u16>())],
            multiaddr![Ip4([0, 0, 0, 0]), Tcp(rand::random::<u16>())],
            multiaddr![Ip4([0, 0, 0, 0]), Tcp(rand::random::<u16>())],
            multiaddr![Udp(rand::random::<u16>())],
            multiaddr![Udp(rand::random::<u16>())],
            multiaddr![Udp(rand::random::<u16>())],
            multiaddr![Udp(rand::random::<u16>())],
            multiaddr![Udp(rand::random::<u16>())],
        ]);

        swarm
            .dial(
                DialOpts::peer_id(target)
                    .addresses(addresses.iter().cloned().collect())
                    .build(),
            )
            .unwrap();

        match swarm.next().await.unwrap() {
            SwarmEvent::OutgoingConnectionError {
                peer_id,
                error: DialError::Transport(errors),
                ..
            } => {
                assert_eq!(target, peer_id.unwrap());

                let failed_addresses = errors.into_iter().map(|(addr, _)| addr).collect::<Vec<_>>();
                // The reported addresses carry the target's `/p2p/<peer id>` suffix.
                let expected_addresses = addresses
                    .into_iter()
                    .map(|addr| addr.with(multiaddr::Protocol::P2p(target)))
                    .collect::<Vec<_>>();

                // Order-sensitive comparison: relies on the set yielding the same order on
                // both iterations above and on errors being reported in dial order.
                assert_eq!(expected_addresses, failed_addresses);
            }
            e => panic!("Unexpected event: {e:?}"),
        }
    }
2290
    /// Checks that aborting a still-pending outbound connection is reported to the dialer as
    /// `SwarmEvent::OutgoingConnectionError` with `DialError::Aborted`.
    #[tokio::test]
    async fn aborting_pending_connection_surfaces_error() {
        // Best-effort tracing setup; `try_init` fails if a subscriber is already installed,
        // which is fine.
        let _ = tracing_subscriber::fmt()
            .with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
            .try_init();

        let mut dialer = new_test_swarm(Config::with_tokio_executor());
        let mut listener = new_test_swarm(Config::with_tokio_executor());

        let listener_peer_id = *listener.local_peer_id();
        listener.listen_on(multiaddr![Memory(0u64)]).unwrap();
        let listener_address = match listener.next().await.unwrap() {
            SwarmEvent::NewListenAddr { address, .. } => address,
            e => panic!("Unexpected network event: {e:?}"),
        };

        dialer
            .dial(
                DialOpts::peer_id(listener_peer_id)
                    .addresses(vec![listener_address])
                    .build(),
            )
            .unwrap();

        // The dialer has not been polled since `dial`, so the connection is still pending.
        // The call is expected to return an error because the peer is not connected.
        // NOTE(review): presumably this call is also what aborts the pending dial — confirm
        // against `Swarm::disconnect_peer_id`.
        dialer
            .disconnect_peer_id(listener_peer_id)
            .expect_err("Expect peer to not yet be connected.");

        match dialer.next().await.unwrap() {
            SwarmEvent::OutgoingConnectionError {
                error: DialError::Aborted,
                ..
            } => {}
            e => panic!("Unexpected swarm event {e:?}."),
        }
    }
2327
    /// Checks that the `Display` output of `DialError::Transport` includes the chain of
    /// error sources for each address.
    #[test]
    fn dial_error_prints_sources() {
        let error = DialError::Transport(vec![(
            "/ip4/127.0.0.1/tcp/80".parse().unwrap(),
            TransportError::Other(io::Error::other(MemoryTransportError::Unreachable)),
        )]);

        let string = format!("{error}");

        // Every error in the chain is printed as `": " + Display`, so the doubled `: :` in the
        // expected string comes from an error in the chain whose `Display` output is empty.
        assert_eq!("Failed to negotiate transport protocol(s): [(/ip4/127.0.0.1/tcp/80: : No listener on the given port.)]", string)
    }
2342}