1#![cfg_attr(docsrs, feature(doc_cfg, doc_auto_cfg))]
68
69#[cfg(feature = "cbor")]
70pub mod cbor;
71mod codec;
72mod handler;
73#[cfg(feature = "json")]
74pub mod json;
75
76use std::{
77 collections::{HashMap, HashSet, VecDeque},
78 fmt, io,
79 sync::{atomic::AtomicU64, Arc},
80 task::{Context, Poll},
81 time::Duration,
82};
83
84pub use codec::Codec;
85use futures::channel::oneshot;
86use handler::Handler;
87pub use handler::ProtocolSupport;
88use libp2p_core::{transport::PortUse, ConnectedPoint, Endpoint, Multiaddr};
89use libp2p_identity::PeerId;
90use libp2p_swarm::{
91 behaviour::{AddressChange, ConnectionClosed, DialFailure, FromSwarm},
92 dial_opts::DialOpts,
93 ConnectionDenied, ConnectionHandler, ConnectionId, DialError, NetworkBehaviour, NotifyHandler,
94 PeerAddresses, THandler, THandlerInEvent, THandlerOutEvent, ToSwarm,
95};
96use smallvec::SmallVec;
97
98use crate::handler::OutboundMessage;
99
100#[derive(Debug)]
102pub enum Message<TRequest, TResponse, TChannelResponse = TResponse> {
103 Request {
105 request_id: InboundRequestId,
107 request: TRequest,
109 channel: ResponseChannel<TChannelResponse>,
115 },
116 Response {
118 request_id: OutboundRequestId,
122 response: TResponse,
124 },
125}
126
127#[derive(Debug)]
129pub enum Event<TRequest, TResponse, TChannelResponse = TResponse> {
130 Message {
132 peer: PeerId,
134 connection_id: ConnectionId,
136 message: Message<TRequest, TResponse, TChannelResponse>,
138 },
139 OutboundFailure {
141 peer: PeerId,
143 connection_id: ConnectionId,
145 request_id: OutboundRequestId,
147 error: OutboundFailure,
149 },
150 InboundFailure {
152 peer: PeerId,
154 connection_id: ConnectionId,
156 request_id: InboundRequestId,
158 error: InboundFailure,
160 },
161 ResponseSent {
166 peer: PeerId,
168 connection_id: ConnectionId,
170 request_id: InboundRequestId,
172 },
173}
174
/// Reasons why an outbound request can fail.
#[derive(Debug)]
pub enum OutboundFailure {
    /// The requested peer could not be dialed.
    DialFailure,
    /// No response arrived before the timeout elapsed.
    Timeout,
    /// The connection was closed before a response was received.
    ConnectionClosed,
    /// The remote supports none of the requested protocols.
    UnsupportedProtocols,
    /// An IO error occurred on the outbound stream.
    Io(io::Error),
}

impl fmt::Display for OutboundFailure {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Only the IO variant needs formatting; every other variant maps to a
        // fixed description.
        let description = match self {
            Self::Io(e) => return write!(f, "IO error on outbound stream: {e}"),
            Self::DialFailure => "Failed to dial the requested peer",
            Self::Timeout => "Timeout while waiting for a response",
            Self::ConnectionClosed => "Connection was closed before a response was received",
            Self::UnsupportedProtocols => "The remote supports none of the requested protocols",
        };
        f.write_str(description)
    }
}

impl std::error::Error for OutboundFailure {}
214
/// Reasons why handling an inbound request can fail.
#[derive(Debug)]
pub enum InboundFailure {
    /// Receiving the request or sending the response timed out.
    Timeout,
    /// The connection was closed before a response could be sent.
    ConnectionClosed,
    /// The local peer supports none of the protocols requested by the remote.
    UnsupportedProtocols,
    /// The response channel was dropped without sending a response.
    ResponseOmission,
    /// An IO error occurred on the inbound stream.
    Io(io::Error),
}

impl fmt::Display for InboundFailure {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Only the IO variant needs formatting; every other variant maps to a
        // fixed description.
        let description = match self {
            Self::Io(e) => return write!(f, "IO error on inbound stream: {e}"),
            Self::Timeout => "Timeout while receiving request or sending response",
            Self::ConnectionClosed => "Connection was closed before a response could be sent",
            Self::UnsupportedProtocols => {
                "The local peer supports none of the protocols requested by the remote"
            }
            Self::ResponseOmission => {
                "The response channel was dropped without sending a response to the remote"
            }
        };
        f.write_str(description)
    }
}

impl std::error::Error for InboundFailure {}
260
261#[derive(Debug)]
265pub struct ResponseChannel<TResponse> {
266 sender: oneshot::Sender<TResponse>,
267}
268
269impl<TResponse> ResponseChannel<TResponse> {
270 pub fn is_open(&self) -> bool {
278 !self.sender.is_canceled()
279 }
280}
281
/// Identifier of an inbound request, wrapping a plain `u64`.
#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash, PartialOrd, Ord)]
pub struct InboundRequestId(u64);

impl fmt::Display for InboundRequestId {
    /// Writes the wrapped number in decimal.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        let Self(raw) = self;
        write!(f, "{raw}")
    }
}
294
/// Identifier of an outbound request, wrapping a plain `u64`.
#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash, PartialOrd, Ord)]
pub struct OutboundRequestId(u64);

impl fmt::Display for OutboundRequestId {
    /// Writes the wrapped number in decimal.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        let Self(raw) = self;
        write!(f, "{raw}")
    }
}
307
/// Configuration options for a request-response [`Behaviour`].
#[derive(Debug, Clone)]
pub struct Config {
    /// Timeout handed to every connection handler for its requests.
    request_timeout: Duration,
    /// Stream limit handed to every connection handler.
    max_concurrent_streams: usize,
}

impl Default for Config {
    /// Defaults to a 10 second request timeout and 100 concurrent streams.
    fn default() -> Self {
        let request_timeout = Duration::from_secs(10);
        let max_concurrent_streams = 100;
        Self {
            request_timeout,
            max_concurrent_streams,
        }
    }
}

impl Config {
    /// Sets the request timeout in place and returns `self` for chaining.
    #[deprecated(note = "Use `Config::with_request_timeout` for one-liner constructions.")]
    pub fn set_request_timeout(&mut self, v: Duration) -> &mut Self {
        self.request_timeout = v;
        self
    }

    /// Returns the configuration with the request timeout replaced by `v`.
    pub fn with_request_timeout(self, v: Duration) -> Self {
        Self {
            request_timeout: v,
            ..self
        }
    }

    /// Returns the configuration with the concurrent stream limit replaced
    /// by `num_streams`.
    pub fn with_max_concurrent_streams(self, num_streams: usize) -> Self {
        Self {
            max_concurrent_streams: num_streams,
            ..self
        }
    }
}
344
345pub struct Behaviour<TCodec>
347where
348 TCodec: Codec + Clone + Send + 'static,
349{
350 inbound_protocols: SmallVec<[TCodec::Protocol; 2]>,
352 outbound_protocols: SmallVec<[TCodec::Protocol; 2]>,
354 next_outbound_request_id: OutboundRequestId,
356 next_inbound_request_id: Arc<AtomicU64>,
358 config: Config,
360 codec: TCodec,
362 pending_events:
364 VecDeque<ToSwarm<Event<TCodec::Request, TCodec::Response>, OutboundMessage<TCodec>>>,
365 connected: HashMap<PeerId, SmallVec<[Connection; 2]>>,
368 addresses: PeerAddresses,
370 pending_outbound_requests: HashMap<PeerId, SmallVec<[OutboundMessage<TCodec>; 10]>>,
373}
374
375impl<TCodec> Behaviour<TCodec>
376where
377 TCodec: Codec + Default + Clone + Send + 'static,
378{
379 pub fn new<I>(protocols: I, cfg: Config) -> Self
382 where
383 I: IntoIterator<Item = (TCodec::Protocol, ProtocolSupport)>,
384 {
385 Self::with_codec(TCodec::default(), protocols, cfg)
386 }
387}
388
389impl<TCodec> Behaviour<TCodec>
390where
391 TCodec: Codec + Clone + Send + 'static,
392{
393 pub fn with_codec<I>(codec: TCodec, protocols: I, cfg: Config) -> Self
396 where
397 I: IntoIterator<Item = (TCodec::Protocol, ProtocolSupport)>,
398 {
399 let mut inbound_protocols = SmallVec::new();
400 let mut outbound_protocols = SmallVec::new();
401 for (p, s) in protocols {
402 if s.inbound() {
403 inbound_protocols.push(p.clone());
404 }
405 if s.outbound() {
406 outbound_protocols.push(p.clone());
407 }
408 }
409 Behaviour {
410 inbound_protocols,
411 outbound_protocols,
412 next_outbound_request_id: OutboundRequestId(1),
413 next_inbound_request_id: Arc::new(AtomicU64::new(1)),
414 config: cfg,
415 codec,
416 pending_events: VecDeque::new(),
417 connected: HashMap::new(),
418 pending_outbound_requests: HashMap::new(),
419 addresses: PeerAddresses::default(),
420 }
421 }
422
423 pub fn send_request(&mut self, peer: &PeerId, request: TCodec::Request) -> OutboundRequestId {
439 self.send_request_with_addresses(peer, request, Vec::new())
440 }
441
442 pub fn send_request_with_addresses(
445 &mut self,
446 peer: &PeerId,
447 request: TCodec::Request,
448 addresses: Vec<Multiaddr>,
449 ) -> OutboundRequestId {
450 let request_id = self.next_outbound_request_id();
451 let request = OutboundMessage {
452 request_id,
453 request,
454 protocols: self.outbound_protocols.clone(),
455 };
456
457 if let Some(request) = self.try_send_request(peer, request) {
458 self.pending_events.push_back(ToSwarm::Dial {
459 opts: DialOpts::peer_id(*peer)
460 .addresses(addresses)
461 .extend_addresses_through_behaviour()
462 .build(),
463 });
464 self.pending_outbound_requests
465 .entry(*peer)
466 .or_default()
467 .push(request);
468 }
469
470 request_id
471 }
472
473 pub fn send_response(
485 &mut self,
486 ch: ResponseChannel<TCodec::Response>,
487 rs: TCodec::Response,
488 ) -> Result<(), TCodec::Response> {
489 ch.sender.send(rs)
490 }
491
492 #[deprecated(note = "Use `Swarm::add_peer_address` instead.")]
501 pub fn add_address(&mut self, peer: &PeerId, address: Multiaddr) -> bool {
502 self.addresses.add(*peer, address)
503 }
504
505 #[deprecated(note = "Will be removed with the next breaking release and won't be replaced.")]
507 pub fn remove_address(&mut self, peer: &PeerId, address: &Multiaddr) {
508 self.addresses.remove(peer, address);
509 }
510
511 pub fn is_connected(&self, peer: &PeerId) -> bool {
513 if let Some(connections) = self.connected.get(peer) {
514 !connections.is_empty()
515 } else {
516 false
517 }
518 }
519
520 pub fn is_pending_outbound(&self, peer: &PeerId, request_id: &OutboundRequestId) -> bool {
524 let est_conn = self
526 .connected
527 .get(peer)
528 .map(|cs| {
529 cs.iter()
530 .any(|c| c.pending_outbound_responses.contains(request_id))
531 })
532 .unwrap_or(false);
533 let pen_conn = self
535 .pending_outbound_requests
536 .get(peer)
537 .map(|rps| rps.iter().any(|rp| rp.request_id == *request_id))
538 .unwrap_or(false);
539
540 est_conn || pen_conn
541 }
542
543 pub fn is_pending_inbound(&self, peer: &PeerId, request_id: &InboundRequestId) -> bool {
547 self.connected
548 .get(peer)
549 .map(|cs| {
550 cs.iter()
551 .any(|c| c.pending_inbound_responses.contains(request_id))
552 })
553 .unwrap_or(false)
554 }
555
556 fn next_outbound_request_id(&mut self) -> OutboundRequestId {
558 let request_id = self.next_outbound_request_id;
559 self.next_outbound_request_id.0 += 1;
560 request_id
561 }
562
563 fn try_send_request(
567 &mut self,
568 peer: &PeerId,
569 request: OutboundMessage<TCodec>,
570 ) -> Option<OutboundMessage<TCodec>> {
571 if let Some(connections) = self.connected.get_mut(peer) {
572 if connections.is_empty() {
573 return Some(request);
574 }
575 let ix = (request.request_id.0 as usize) % connections.len();
576 let conn = &mut connections[ix];
577 conn.pending_outbound_responses.insert(request.request_id);
578 self.pending_events.push_back(ToSwarm::NotifyHandler {
579 peer_id: *peer,
580 handler: NotifyHandler::One(conn.id),
581 event: request,
582 });
583 None
584 } else {
585 Some(request)
586 }
587 }
588
589 fn remove_pending_outbound_response(
595 &mut self,
596 peer: &PeerId,
597 connection_id: ConnectionId,
598 request: OutboundRequestId,
599 ) -> bool {
600 self.get_connection_mut(peer, connection_id)
601 .map(|c| c.pending_outbound_responses.remove(&request))
602 .unwrap_or(false)
603 }
604
605 fn remove_pending_inbound_response(
611 &mut self,
612 peer: &PeerId,
613 connection_id: ConnectionId,
614 request: InboundRequestId,
615 ) -> bool {
616 self.get_connection_mut(peer, connection_id)
617 .map(|c| c.pending_inbound_responses.remove(&request))
618 .unwrap_or(false)
619 }
620
621 fn get_connection_mut(
624 &mut self,
625 peer: &PeerId,
626 connection_id: ConnectionId,
627 ) -> Option<&mut Connection> {
628 self.connected
629 .get_mut(peer)
630 .and_then(|connections| connections.iter_mut().find(|c| c.id == connection_id))
631 }
632
633 fn on_address_change(
634 &mut self,
635 AddressChange {
636 peer_id,
637 connection_id,
638 new,
639 ..
640 }: AddressChange,
641 ) {
642 let new_address = match new {
643 ConnectedPoint::Dialer { address, .. } => Some(address.clone()),
644 ConnectedPoint::Listener { .. } => None,
645 };
646 let connections = self
647 .connected
648 .get_mut(&peer_id)
649 .expect("Address change can only happen on an established connection.");
650
651 let connection = connections
652 .iter_mut()
653 .find(|c| c.id == connection_id)
654 .expect("Address change can only happen on an established connection.");
655 connection.remote_address = new_address;
656 }
657
658 fn on_connection_closed(
659 &mut self,
660 ConnectionClosed {
661 peer_id,
662 connection_id,
663 remaining_established,
664 ..
665 }: ConnectionClosed,
666 ) {
667 let connections = self
668 .connected
669 .get_mut(&peer_id)
670 .expect("Expected some established connection to peer before closing.");
671
672 let connection = connections
673 .iter()
674 .position(|c| c.id == connection_id)
675 .map(|p: usize| connections.remove(p))
676 .expect("Expected connection to be established before closing.");
677
678 debug_assert_eq!(connections.is_empty(), remaining_established == 0);
679 if connections.is_empty() {
680 self.connected.remove(&peer_id);
681 }
682
683 for request_id in connection.pending_inbound_responses {
684 self.pending_events
685 .push_back(ToSwarm::GenerateEvent(Event::InboundFailure {
686 peer: peer_id,
687 connection_id,
688 request_id,
689 error: InboundFailure::ConnectionClosed,
690 }));
691 }
692
693 for request_id in connection.pending_outbound_responses {
694 self.pending_events
695 .push_back(ToSwarm::GenerateEvent(Event::OutboundFailure {
696 peer: peer_id,
697 connection_id,
698 request_id,
699 error: OutboundFailure::ConnectionClosed,
700 }));
701 }
702 }
703
704 fn on_dial_failure(
705 &mut self,
706 DialFailure {
707 peer_id,
708 connection_id,
709 error,
710 }: DialFailure,
711 ) {
712 if let DialError::DialPeerConditionFalse(_) = error {
713 return;
715 }
716 if let Some(peer) = peer_id {
717 if let Some(pending) = self.pending_outbound_requests.remove(&peer) {
724 for request in pending {
725 self.pending_events
726 .push_back(ToSwarm::GenerateEvent(Event::OutboundFailure {
727 peer,
728 connection_id,
729 request_id: request.request_id,
730 error: OutboundFailure::DialFailure,
731 }));
732 }
733 }
734 }
735 }
736
737 fn preload_new_handler(
740 &mut self,
741 handler: &mut Handler<TCodec>,
742 peer: PeerId,
743 connection_id: ConnectionId,
744 remote_address: Option<Multiaddr>,
745 ) {
746 let mut connection = Connection::new(connection_id, remote_address);
747
748 if let Some(pending_requests) = self.pending_outbound_requests.remove(&peer) {
749 for request in pending_requests {
750 connection
751 .pending_outbound_responses
752 .insert(request.request_id);
753 handler.on_behaviour_event(request);
754 }
755 }
756
757 self.connected.entry(peer).or_default().push(connection);
758 }
759}
760
761impl<TCodec> NetworkBehaviour for Behaviour<TCodec>
762where
763 TCodec: Codec + Send + Clone + 'static,
764{
765 type ConnectionHandler = Handler<TCodec>;
766 type ToSwarm = Event<TCodec::Request, TCodec::Response>;
767
768 fn handle_established_inbound_connection(
769 &mut self,
770 connection_id: ConnectionId,
771 peer: PeerId,
772 _: &Multiaddr,
773 _: &Multiaddr,
774 ) -> Result<THandler<Self>, ConnectionDenied> {
775 let mut handler = Handler::new(
776 self.inbound_protocols.clone(),
777 self.codec.clone(),
778 self.config.request_timeout,
779 self.next_inbound_request_id.clone(),
780 self.config.max_concurrent_streams,
781 );
782
783 self.preload_new_handler(&mut handler, peer, connection_id, None);
784
785 Ok(handler)
786 }
787
788 fn handle_pending_outbound_connection(
789 &mut self,
790 _connection_id: ConnectionId,
791 maybe_peer: Option<PeerId>,
792 _addresses: &[Multiaddr],
793 _effective_role: Endpoint,
794 ) -> Result<Vec<Multiaddr>, ConnectionDenied> {
795 let Some(peer) = maybe_peer else {
796 return Ok(vec![]);
797 };
798
799 let mut addresses = Vec::new();
800 if let Some(connections) = self.connected.get(&peer) {
801 addresses.extend(connections.iter().filter_map(|c| c.remote_address.clone()))
802 }
803
804 let cached_addrs = self.addresses.get(&peer);
805 addresses.extend(cached_addrs);
806
807 Ok(addresses)
808 }
809
810 fn handle_established_outbound_connection(
811 &mut self,
812 connection_id: ConnectionId,
813 peer: PeerId,
814 remote_address: &Multiaddr,
815 _: Endpoint,
816 _: PortUse,
817 ) -> Result<THandler<Self>, ConnectionDenied> {
818 let mut handler = Handler::new(
819 self.inbound_protocols.clone(),
820 self.codec.clone(),
821 self.config.request_timeout,
822 self.next_inbound_request_id.clone(),
823 self.config.max_concurrent_streams,
824 );
825
826 self.preload_new_handler(
827 &mut handler,
828 peer,
829 connection_id,
830 Some(remote_address.clone()),
831 );
832
833 Ok(handler)
834 }
835
836 fn on_swarm_event(&mut self, event: FromSwarm) {
837 self.addresses.on_swarm_event(&event);
838 match event {
839 FromSwarm::ConnectionEstablished(_) => {}
840 FromSwarm::ConnectionClosed(connection_closed) => {
841 self.on_connection_closed(connection_closed)
842 }
843 FromSwarm::AddressChange(address_change) => self.on_address_change(address_change),
844 FromSwarm::DialFailure(dial_failure) => self.on_dial_failure(dial_failure),
845 _ => {}
846 }
847 }
848
849 fn on_connection_handler_event(
850 &mut self,
851 peer: PeerId,
852 connection_id: ConnectionId,
853 event: THandlerOutEvent<Self>,
854 ) {
855 match event {
856 handler::Event::Response {
857 request_id,
858 response,
859 } => {
860 let removed =
861 self.remove_pending_outbound_response(&peer, connection_id, request_id);
862 debug_assert!(
863 removed,
864 "Expect request_id to be pending before receiving response.",
865 );
866
867 let message = Message::Response {
868 request_id,
869 response,
870 };
871 self.pending_events
872 .push_back(ToSwarm::GenerateEvent(Event::Message {
873 peer,
874 connection_id,
875 message,
876 }));
877 }
878 handler::Event::Request {
879 request_id,
880 request,
881 sender,
882 } => match self.get_connection_mut(&peer, connection_id) {
883 Some(connection) => {
884 let inserted = connection.pending_inbound_responses.insert(request_id);
885 debug_assert!(inserted, "Expect id of new request to be unknown.");
886
887 let channel = ResponseChannel { sender };
888 let message = Message::Request {
889 request_id,
890 request,
891 channel,
892 };
893 self.pending_events
894 .push_back(ToSwarm::GenerateEvent(Event::Message {
895 peer,
896 connection_id,
897 message,
898 }));
899 }
900 None => {
901 tracing::debug!("Connection ({connection_id}) closed after `Event::Request` ({request_id}) has been emitted.");
902 }
903 },
904 handler::Event::ResponseSent(request_id) => {
905 let removed =
906 self.remove_pending_inbound_response(&peer, connection_id, request_id);
907 debug_assert!(
908 removed,
909 "Expect request_id to be pending before response is sent."
910 );
911
912 self.pending_events
913 .push_back(ToSwarm::GenerateEvent(Event::ResponseSent {
914 peer,
915 connection_id,
916 request_id,
917 }));
918 }
919 handler::Event::ResponseOmission(request_id) => {
920 let removed =
921 self.remove_pending_inbound_response(&peer, connection_id, request_id);
922 debug_assert!(
923 removed,
924 "Expect request_id to be pending before response is omitted.",
925 );
926
927 self.pending_events
928 .push_back(ToSwarm::GenerateEvent(Event::InboundFailure {
929 peer,
930 connection_id,
931 request_id,
932 error: InboundFailure::ResponseOmission,
933 }));
934 }
935 handler::Event::OutboundTimeout(request_id) => {
936 let removed =
937 self.remove_pending_outbound_response(&peer, connection_id, request_id);
938 debug_assert!(
939 removed,
940 "Expect request_id to be pending before request times out."
941 );
942
943 self.pending_events
944 .push_back(ToSwarm::GenerateEvent(Event::OutboundFailure {
945 peer,
946 connection_id,
947 request_id,
948 error: OutboundFailure::Timeout,
949 }));
950 }
951 handler::Event::OutboundUnsupportedProtocols(request_id) => {
952 let removed =
953 self.remove_pending_outbound_response(&peer, connection_id, request_id);
954 debug_assert!(
955 removed,
956 "Expect request_id to be pending before failing to connect.",
957 );
958
959 self.pending_events
960 .push_back(ToSwarm::GenerateEvent(Event::OutboundFailure {
961 peer,
962 connection_id,
963 request_id,
964 error: OutboundFailure::UnsupportedProtocols,
965 }));
966 }
967 handler::Event::OutboundStreamFailed { request_id, error } => {
968 let removed =
969 self.remove_pending_outbound_response(&peer, connection_id, request_id);
970 debug_assert!(removed, "Expect request_id to be pending upon failure");
971
972 self.pending_events
973 .push_back(ToSwarm::GenerateEvent(Event::OutboundFailure {
974 peer,
975 connection_id,
976 request_id,
977 error: OutboundFailure::Io(error),
978 }))
979 }
980 handler::Event::InboundTimeout(request_id) => {
981 let removed =
982 self.remove_pending_inbound_response(&peer, connection_id, request_id);
983
984 if removed {
985 self.pending_events
986 .push_back(ToSwarm::GenerateEvent(Event::InboundFailure {
987 peer,
988 connection_id,
989 request_id,
990 error: InboundFailure::Timeout,
991 }));
992 } else {
993 tracing::debug!(
995 "Inbound request timeout for an unknown request_id ({request_id})"
996 );
997 }
998 }
999 handler::Event::InboundStreamFailed { request_id, error } => {
1000 let removed =
1001 self.remove_pending_inbound_response(&peer, connection_id, request_id);
1002
1003 if removed {
1004 self.pending_events
1005 .push_back(ToSwarm::GenerateEvent(Event::InboundFailure {
1006 peer,
1007 connection_id,
1008 request_id,
1009 error: InboundFailure::Io(error),
1010 }));
1011 } else {
1012 tracing::debug!("Inbound failure is reported for an unknown request_id ({request_id}): {error}");
1014 }
1015 }
1016 }
1017 }
1018
1019 #[tracing::instrument(level = "trace", name = "NetworkBehaviour::poll", skip(self))]
1020 fn poll(&mut self, _: &mut Context<'_>) -> Poll<ToSwarm<Self::ToSwarm, THandlerInEvent<Self>>> {
1021 if let Some(ev) = self.pending_events.pop_front() {
1022 return Poll::Ready(ev);
1023 } else if self.pending_events.capacity() > EMPTY_QUEUE_SHRINK_THRESHOLD {
1024 self.pending_events.shrink_to_fit();
1025 }
1026
1027 Poll::Pending
1028 }
1029}
1030
/// Capacity above which an empty event queue has its spare memory released
/// when the behaviour is polled.
const EMPTY_QUEUE_SHRINK_THRESHOLD: usize = 100;
1036
1037struct Connection {
1039 id: ConnectionId,
1040 remote_address: Option<Multiaddr>,
1041 pending_outbound_responses: HashSet<OutboundRequestId>,
1045 pending_inbound_responses: HashSet<InboundRequestId>,
1048}
1049
1050impl Connection {
1051 fn new(id: ConnectionId, remote_address: Option<Multiaddr>) -> Self {
1052 Self {
1053 id,
1054 remote_address,
1055 pending_outbound_responses: Default::default(),
1056 pending_inbound_responses: Default::default(),
1057 }
1058 }
1059}