libp2p_request_response/
lib.rs

1// Copyright 2020 Parity Technologies (UK) Ltd.
2//
3// Permission is hereby granted, free of charge, to any person obtaining a
4// copy of this software and associated documentation files (the "Software"),
5// to deal in the Software without restriction, including without limitation
6// the rights to use, copy, modify, merge, publish, distribute, sublicense,
7// and/or sell copies of the Software, and to permit persons to whom the
8// Software is furnished to do so, subject to the following conditions:
9//
10// The above copyright notice and this permission notice shall be included in
11// all copies or substantial portions of the Software.
12//
13// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
14// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
18// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
19// DEALINGS IN THE SOFTWARE.
20
21//! Generic request/response protocols.
22//!
23//! ## General Usage
24//!
25//! The [`Behaviour`] struct is a [`NetworkBehaviour`] that implements a generic
26//! request/response protocol or protocol family, whereby each request is
27//! sent over a new substream on a connection. `Behaviour` is generic
28//! over the actual messages being sent, which are defined in terms of a
29//! [`Codec`]. Creating a request/response protocol thus amounts
30//! to providing an implementation of this trait which can then be
31//! given to [`Behaviour::with_codec`]. Further configuration options are
32//! available via the [`Config`].
33//!
34//! Requests are sent using [`Behaviour::send_request`] and the
35//! responses received as [`Message::Response`] via
36//! [`Event::Message`].
37//!
38//! Responses are sent using [`Behaviour::send_response`] upon
39//! receiving a [`Message::Request`] via
40//! [`Event::Message`].
41//!
42//! ## Predefined codecs
43//!
44//! In case your message types implement [`serde::Serialize`] and [`serde::Deserialize`],
45//! you can use two predefined behaviours:
46//!
47//! - [`cbor::Behaviour`] for CBOR-encoded messages
48//! - [`json::Behaviour`] for JSON-encoded messages
49//!
50//! ## Protocol Families
51//!
//! A single [`Behaviour`] instance can be used with an entire
//! protocol family whose protocols share the same request and response types.
//! For that purpose, [`Codec::Protocol`] is typically
//! instantiated with a sum type.
56//!
57//! ## Limited Protocol Support
58//!
59//! It is possible to only support inbound or outbound requests for
60//! a particular protocol. This is achieved by instantiating `Behaviour`
61//! with protocols using [`ProtocolSupport::Inbound`] or
62//! [`ProtocolSupport::Outbound`]. Any subset of protocols of a protocol
//! family can be configured in this way. Such protocols will not be
//! advertised during inbound or outbound protocol negotiation,
//! respectively, on the substreams.
66
67#![cfg_attr(docsrs, feature(doc_cfg, doc_auto_cfg))]
68
69#[cfg(feature = "cbor")]
70pub mod cbor;
71mod codec;
72mod handler;
73#[cfg(feature = "json")]
74pub mod json;
75
76use std::{
77    collections::{HashMap, HashSet, VecDeque},
78    fmt, io,
79    sync::{atomic::AtomicU64, Arc},
80    task::{Context, Poll},
81    time::Duration,
82};
83
84pub use codec::Codec;
85use futures::channel::oneshot;
86use handler::Handler;
87pub use handler::ProtocolSupport;
88use libp2p_core::{transport::PortUse, ConnectedPoint, Endpoint, Multiaddr};
89use libp2p_identity::PeerId;
90use libp2p_swarm::{
91    behaviour::{AddressChange, ConnectionClosed, DialFailure, FromSwarm},
92    dial_opts::DialOpts,
93    ConnectionDenied, ConnectionHandler, ConnectionId, DialError, NetworkBehaviour, NotifyHandler,
94    PeerAddresses, THandler, THandlerInEvent, THandlerOutEvent, ToSwarm,
95};
96use smallvec::SmallVec;
97
98use crate::handler::OutboundMessage;
99
/// An inbound request or response.
///
/// `TChannelResponse` is the type accepted by the [`ResponseChannel`] that
/// accompanies a request; it defaults to `TResponse`.
#[derive(Debug)]
pub enum Message<TRequest, TResponse, TChannelResponse = TResponse> {
    /// A request message.
    Request {
        /// The ID of this request.
        request_id: InboundRequestId,
        /// The request message.
        request: TRequest,
        /// The channel waiting for the response.
        ///
        /// If this channel is dropped instead of being used to send a response
        /// via [`Behaviour::send_response`], an [`Event::InboundFailure`]
        /// with [`InboundFailure::ResponseOmission`] is emitted.
        channel: ResponseChannel<TChannelResponse>,
    },
    /// A response message.
    Response {
        /// The ID of the request that produced this response.
        ///
        /// See [`Behaviour::send_request`].
        request_id: OutboundRequestId,
        /// The response message.
        response: TResponse,
    },
}
126
/// The events emitted by a request-response [`Behaviour`].
///
/// Every variant names the remote `peer` and the `connection_id` the event
/// relates to. The type parameters are forwarded unchanged to [`Message`].
#[derive(Debug)]
pub enum Event<TRequest, TResponse, TChannelResponse = TResponse> {
    /// An incoming message (request or response).
    Message {
        /// The peer who sent the message.
        peer: PeerId,
        /// The connection used.
        connection_id: ConnectionId,
        /// The incoming message.
        message: Message<TRequest, TResponse, TChannelResponse>,
    },
    /// An outbound request failed.
    OutboundFailure {
        /// The peer to whom the request was sent.
        peer: PeerId,
        /// The connection used.
        connection_id: ConnectionId,
        /// The (local) ID of the failed request.
        request_id: OutboundRequestId,
        /// The error that occurred.
        error: OutboundFailure,
    },
    /// An inbound request failed.
    InboundFailure {
        /// The peer from whom the request was received.
        peer: PeerId,
        /// The connection used.
        connection_id: ConnectionId,
        /// The ID of the failed inbound request.
        request_id: InboundRequestId,
        /// The error that occurred.
        error: InboundFailure,
    },
    /// A response to an inbound request has been sent.
    ///
    /// When this event is received, the response has been flushed on
    /// the underlying transport connection.
    ResponseSent {
        /// The peer to whom the response was sent.
        peer: PeerId,
        /// The connection used.
        connection_id: ConnectionId,
        /// The ID of the inbound request whose response was sent.
        request_id: InboundRequestId,
    },
}
174
/// Possible failures occurring in the context of sending
/// an outbound request and receiving the response.
///
/// Reported to the user as the `error` of an [`Event::OutboundFailure`].
#[derive(Debug)]
pub enum OutboundFailure {
    /// The request could not be sent because a dialing attempt failed.
    DialFailure,
    /// The request timed out before a response was received.
    ///
    /// It is not known whether the request may have been
    /// received (and processed) by the remote peer.
    Timeout,
    /// The connection closed before a response was received.
    ///
    /// It is not known whether the request may have been
    /// received (and processed) by the remote peer.
    ConnectionClosed,
    /// The remote supports none of the requested protocols.
    UnsupportedProtocols,
    /// An IO failure happened on an outbound stream.
    Io(io::Error),
}
196
197impl fmt::Display for OutboundFailure {
198    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
199        match self {
200            OutboundFailure::DialFailure => write!(f, "Failed to dial the requested peer"),
201            OutboundFailure::Timeout => write!(f, "Timeout while waiting for a response"),
202            OutboundFailure::ConnectionClosed => {
203                write!(f, "Connection was closed before a response was received")
204            }
205            OutboundFailure::UnsupportedProtocols => {
206                write!(f, "The remote supports none of the requested protocols")
207            }
208            OutboundFailure::Io(e) => write!(f, "IO error on outbound stream: {e}"),
209        }
210    }
211}
212
// Empty impl: all `Error` methods keep their defaults. The wrapped IO error
// of the `Io` variant is rendered by the `Display` impl.
impl std::error::Error for OutboundFailure {}
214
/// Possible failures occurring in the context of receiving an
/// inbound request and sending a response.
///
/// Reported to the user as the `error` of an [`Event::InboundFailure`].
#[derive(Debug)]
pub enum InboundFailure {
    /// The inbound request timed out, either while reading the
    /// incoming request or before a response is sent, e.g. if
    /// [`Behaviour::send_response`] is not called in a
    /// timely manner.
    Timeout,
    /// The connection closed before a response could be sent.
    ConnectionClosed,
    /// The local peer supports none of the protocols requested
    /// by the remote.
    UnsupportedProtocols,
    /// The local peer failed to respond to an inbound request
    /// due to the [`ResponseChannel`] being dropped instead of
    /// being passed to [`Behaviour::send_response`].
    ResponseOmission,
    /// An IO failure happened on an inbound stream.
    Io(io::Error),
}
236
237impl fmt::Display for InboundFailure {
238    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
239        match self {
240            InboundFailure::Timeout => {
241                write!(f, "Timeout while receiving request or sending response")
242            }
243            InboundFailure::ConnectionClosed => {
244                write!(f, "Connection was closed before a response could be sent")
245            }
246            InboundFailure::UnsupportedProtocols => write!(
247                f,
248                "The local peer supports none of the protocols requested by the remote"
249            ),
250            InboundFailure::ResponseOmission => write!(
251                f,
252                "The response channel was dropped without sending a response to the remote"
253            ),
254            InboundFailure::Io(e) => write!(f, "IO error on inbound stream: {e}"),
255        }
256    }
257}
258
// Empty impl: all `Error` methods keep their defaults. The wrapped IO error
// of the `Io` variant is rendered by the `Display` impl.
impl std::error::Error for InboundFailure {}
260
/// A channel for sending a response to an inbound request.
///
/// See [`Behaviour::send_response`].
#[derive(Debug)]
pub struct ResponseChannel<TResponse> {
    /// Sending half of the oneshot channel; [`Behaviour::send_response`]
    /// consumes it to hand over the response.
    sender: oneshot::Sender<TResponse>,
}
268
269impl<TResponse> ResponseChannel<TResponse> {
270    /// Checks whether the response channel is still open, i.e.
271    /// the `Behaviour` is still waiting for a
272    /// a response to be sent via [`Behaviour::send_response`]
273    /// and this response channel.
274    ///
275    /// If the response channel is no longer open then the inbound
276    /// request timed out waiting for the response.
277    pub fn is_open(&self) -> bool {
278        !self.sender.is_canceled()
279    }
280}
281
/// The ID of an inbound request.
///
/// A newtype around a `u64`.
///
/// Note: [`InboundRequestId`]'s uniqueness is only guaranteed between
/// inbound requests of the same originating [`Behaviour`].
#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash, PartialOrd, Ord)]
pub struct InboundRequestId(u64);
288
289impl fmt::Display for InboundRequestId {
290    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
291        write!(f, "{}", self.0)
292    }
293}
294
/// The ID of an outbound request.
///
/// A newtype around a `u64`.
///
/// Note: [`OutboundRequestId`]'s uniqueness is only guaranteed between
/// outbound requests of the same originating [`Behaviour`].
#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash, PartialOrd, Ord)]
pub struct OutboundRequestId(u64);
301
302impl fmt::Display for OutboundRequestId {
303    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
304        write!(f, "{}", self.0)
305    }
306}
307
/// The configuration for a `Behaviour` protocol.
///
/// Obtain the defaults via [`Config::default`] and adjust them with the
/// `with_*` builder methods.
#[derive(Debug, Clone)]
pub struct Config {
    /// Timeout for inbound and outbound requests (10 seconds by default).
    request_timeout: Duration,
    /// Upper bound for the number of concurrent inbound + outbound streams
    /// (100 by default).
    max_concurrent_streams: usize,
}
314
315impl Default for Config {
316    fn default() -> Self {
317        Self {
318            request_timeout: Duration::from_secs(10),
319            max_concurrent_streams: 100,
320        }
321    }
322}
323
324impl Config {
325    /// Sets the timeout for inbound and outbound requests.
326    #[deprecated(note = "Use `Config::with_request_timeout` for one-liner constructions.")]
327    pub fn set_request_timeout(&mut self, v: Duration) -> &mut Self {
328        self.request_timeout = v;
329        self
330    }
331
332    /// Sets the timeout for inbound and outbound requests.
333    pub fn with_request_timeout(mut self, v: Duration) -> Self {
334        self.request_timeout = v;
335        self
336    }
337
338    /// Sets the upper bound for the number of concurrent inbound + outbound streams.
339    pub fn with_max_concurrent_streams(mut self, num_streams: usize) -> Self {
340        self.max_concurrent_streams = num_streams;
341        self
342    }
343}
344
/// A request/response protocol for some message codec.
///
/// Construct it with [`Behaviour::new`] or [`Behaviour::with_codec`].
pub struct Behaviour<TCodec>
where
    TCodec: Codec + Clone + Send + 'static,
{
    /// The supported inbound protocols.
    inbound_protocols: SmallVec<[TCodec::Protocol; 2]>,
    /// The supported outbound protocols.
    outbound_protocols: SmallVec<[TCodec::Protocol; 2]>,
    /// The next (local) request ID.
    next_outbound_request_id: OutboundRequestId,
    /// The next (inbound) request ID.
    ///
    /// A clone of this counter is handed to every connection handler on creation.
    next_inbound_request_id: Arc<AtomicU64>,
    /// The protocol configuration.
    config: Config,
    /// The protocol codec for reading and writing requests and responses.
    codec: TCodec,
    /// Pending events to return from `poll`.
    pending_events:
        VecDeque<ToSwarm<Event<TCodec::Request, TCodec::Response>, OutboundMessage<TCodec>>>,
    /// The currently connected peers, their pending outbound and inbound responses and their
    /// known, reachable addresses, if any.
    connected: HashMap<PeerId, SmallVec<[Connection; 2]>>,
    /// Externally managed addresses via `add_address` and `remove_address`.
    addresses: PeerAddresses,
    /// Requests that have not yet been sent and are waiting for a connection
    /// to be established.
    pending_outbound_requests: HashMap<PeerId, SmallVec<[OutboundMessage<TCodec>; 10]>>,
}
374
375impl<TCodec> Behaviour<TCodec>
376where
377    TCodec: Codec + Default + Clone + Send + 'static,
378{
379    /// Creates a new `Behaviour` for the given protocols and configuration, using [`Default`] to
380    /// construct the codec.
381    pub fn new<I>(protocols: I, cfg: Config) -> Self
382    where
383        I: IntoIterator<Item = (TCodec::Protocol, ProtocolSupport)>,
384    {
385        Self::with_codec(TCodec::default(), protocols, cfg)
386    }
387}
388
389impl<TCodec> Behaviour<TCodec>
390where
391    TCodec: Codec + Clone + Send + 'static,
392{
393    /// Creates a new `Behaviour` for the given
394    /// protocols, codec and configuration.
395    pub fn with_codec<I>(codec: TCodec, protocols: I, cfg: Config) -> Self
396    where
397        I: IntoIterator<Item = (TCodec::Protocol, ProtocolSupport)>,
398    {
399        let mut inbound_protocols = SmallVec::new();
400        let mut outbound_protocols = SmallVec::new();
401        for (p, s) in protocols {
402            if s.inbound() {
403                inbound_protocols.push(p.clone());
404            }
405            if s.outbound() {
406                outbound_protocols.push(p.clone());
407            }
408        }
409        Behaviour {
410            inbound_protocols,
411            outbound_protocols,
412            next_outbound_request_id: OutboundRequestId(1),
413            next_inbound_request_id: Arc::new(AtomicU64::new(1)),
414            config: cfg,
415            codec,
416            pending_events: VecDeque::new(),
417            connected: HashMap::new(),
418            pending_outbound_requests: HashMap::new(),
419            addresses: PeerAddresses::default(),
420        }
421    }
422
423    /// Initiates sending a request.
424    ///
425    /// If the targeted peer is currently not connected, a dialing
426    /// attempt is initiated and the request is sent as soon as a
427    /// connection is established.
428    ///
429    /// > **Note**: In order for such a dialing attempt to succeed,
430    /// > the `RequestResponse` protocol must either be embedded
431    /// > in another `NetworkBehaviour` that provides peer and
432    /// > address discovery, or known addresses of peers must be
433    /// > managed via [`libp2p_swarm::Swarm::add_peer_address`].
434    /// > Addresses are automatically removed when dial attempts
435    /// > to them fail.
436    /// > Alternatively, [`Behaviour::send_request_with_addresses`]
437    /// > can be used.
438    pub fn send_request(&mut self, peer: &PeerId, request: TCodec::Request) -> OutboundRequestId {
439        self.send_request_with_addresses(peer, request, Vec::new())
440    }
441
442    /// Like [`Behaviour::send_request`], but additionally using the provided addresses
443    /// if a connection needs to be established.
444    pub fn send_request_with_addresses(
445        &mut self,
446        peer: &PeerId,
447        request: TCodec::Request,
448        addresses: Vec<Multiaddr>,
449    ) -> OutboundRequestId {
450        let request_id = self.next_outbound_request_id();
451        let request = OutboundMessage {
452            request_id,
453            request,
454            protocols: self.outbound_protocols.clone(),
455        };
456
457        if let Some(request) = self.try_send_request(peer, request) {
458            self.pending_events.push_back(ToSwarm::Dial {
459                opts: DialOpts::peer_id(*peer)
460                    .addresses(addresses)
461                    .extend_addresses_through_behaviour()
462                    .build(),
463            });
464            self.pending_outbound_requests
465                .entry(*peer)
466                .or_default()
467                .push(request);
468        }
469
470        request_id
471    }
472
473    /// Initiates sending a response to an inbound request.
474    ///
475    /// If the [`ResponseChannel`] is already closed due to a timeout or the
476    /// connection being closed, the response is returned as an `Err` for
477    /// further handling. Once the response has been successfully sent on the
478    /// corresponding connection, [`Event::ResponseSent`] is
479    /// emitted. In all other cases [`Event::InboundFailure`]
480    /// will be or has been emitted.
481    ///
482    /// The provided `ResponseChannel` is obtained from an inbound
483    /// [`Message::Request`].
484    pub fn send_response(
485        &mut self,
486        ch: ResponseChannel<TCodec::Response>,
487        rs: TCodec::Response,
488    ) -> Result<(), TCodec::Response> {
489        ch.sender.send(rs)
490    }
491
492    /// Adds a known address for a peer that can be used for
493    /// dialing attempts by the `Swarm`, i.e. is returned
494    /// by [`NetworkBehaviour::handle_pending_outbound_connection`].
495    ///
496    /// Addresses added in this way are only removed by `remove_address`.
497    ///
498    /// Returns true if the address was added, false otherwise (i.e. if the
499    /// address is already in the list).
500    #[deprecated(note = "Use `Swarm::add_peer_address` instead.")]
501    pub fn add_address(&mut self, peer: &PeerId, address: Multiaddr) -> bool {
502        self.addresses.add(*peer, address)
503    }
504
505    /// Removes an address of a peer previously added via [`Behaviour::add_address`].
506    #[deprecated(note = "Will be removed with the next breaking release and won't be replaced.")]
507    pub fn remove_address(&mut self, peer: &PeerId, address: &Multiaddr) {
508        self.addresses.remove(peer, address);
509    }
510
511    /// Checks whether a peer is currently connected.
512    pub fn is_connected(&self, peer: &PeerId) -> bool {
513        if let Some(connections) = self.connected.get(peer) {
514            !connections.is_empty()
515        } else {
516            false
517        }
518    }
519
520    /// Checks whether an outbound request to the peer with the provided
521    /// [`PeerId`] initiated by [`Behaviour::send_request`] is still
522    /// pending, i.e. waiting for a response.
523    pub fn is_pending_outbound(&self, peer: &PeerId, request_id: &OutboundRequestId) -> bool {
524        // Check if request is already sent on established connection.
525        let est_conn = self
526            .connected
527            .get(peer)
528            .map(|cs| {
529                cs.iter()
530                    .any(|c| c.pending_outbound_responses.contains(request_id))
531            })
532            .unwrap_or(false);
533        // Check if request is still pending to be sent.
534        let pen_conn = self
535            .pending_outbound_requests
536            .get(peer)
537            .map(|rps| rps.iter().any(|rp| rp.request_id == *request_id))
538            .unwrap_or(false);
539
540        est_conn || pen_conn
541    }
542
543    /// Checks whether an inbound request from the peer with the provided
544    /// [`PeerId`] is still pending, i.e. waiting for a response by the local
545    /// node through [`Behaviour::send_response`].
546    pub fn is_pending_inbound(&self, peer: &PeerId, request_id: &InboundRequestId) -> bool {
547        self.connected
548            .get(peer)
549            .map(|cs| {
550                cs.iter()
551                    .any(|c| c.pending_inbound_responses.contains(request_id))
552            })
553            .unwrap_or(false)
554    }
555
556    /// Returns the next outbound request ID.
557    fn next_outbound_request_id(&mut self) -> OutboundRequestId {
558        let request_id = self.next_outbound_request_id;
559        self.next_outbound_request_id.0 += 1;
560        request_id
561    }
562
563    /// Tries to send a request by queueing an appropriate event to be
564    /// emitted to the `Swarm`. If the peer is not currently connected,
565    /// the given request is return unchanged.
566    fn try_send_request(
567        &mut self,
568        peer: &PeerId,
569        request: OutboundMessage<TCodec>,
570    ) -> Option<OutboundMessage<TCodec>> {
571        if let Some(connections) = self.connected.get_mut(peer) {
572            if connections.is_empty() {
573                return Some(request);
574            }
575            let ix = (request.request_id.0 as usize) % connections.len();
576            let conn = &mut connections[ix];
577            conn.pending_outbound_responses.insert(request.request_id);
578            self.pending_events.push_back(ToSwarm::NotifyHandler {
579                peer_id: *peer,
580                handler: NotifyHandler::One(conn.id),
581                event: request,
582            });
583            None
584        } else {
585            Some(request)
586        }
587    }
588
589    /// Remove pending outbound response for the given peer and connection.
590    ///
591    /// Returns `true` if the provided connection to the given peer is still
592    /// alive and the [`OutboundRequestId`] was previously present and is now removed.
593    /// Returns `false` otherwise.
594    fn remove_pending_outbound_response(
595        &mut self,
596        peer: &PeerId,
597        connection_id: ConnectionId,
598        request: OutboundRequestId,
599    ) -> bool {
600        self.get_connection_mut(peer, connection_id)
601            .map(|c| c.pending_outbound_responses.remove(&request))
602            .unwrap_or(false)
603    }
604
605    /// Remove pending inbound response for the given peer and connection.
606    ///
607    /// Returns `true` if the provided connection to the given peer is still
608    /// alive and the [`InboundRequestId`] was previously present and is now removed.
609    /// Returns `false` otherwise.
610    fn remove_pending_inbound_response(
611        &mut self,
612        peer: &PeerId,
613        connection_id: ConnectionId,
614        request: InboundRequestId,
615    ) -> bool {
616        self.get_connection_mut(peer, connection_id)
617            .map(|c| c.pending_inbound_responses.remove(&request))
618            .unwrap_or(false)
619    }
620
621    /// Returns a mutable reference to the connection in `self.connected`
622    /// corresponding to the given [`PeerId`] and [`ConnectionId`].
623    fn get_connection_mut(
624        &mut self,
625        peer: &PeerId,
626        connection_id: ConnectionId,
627    ) -> Option<&mut Connection> {
628        self.connected
629            .get_mut(peer)
630            .and_then(|connections| connections.iter_mut().find(|c| c.id == connection_id))
631    }
632
633    fn on_address_change(
634        &mut self,
635        AddressChange {
636            peer_id,
637            connection_id,
638            new,
639            ..
640        }: AddressChange,
641    ) {
642        let new_address = match new {
643            ConnectedPoint::Dialer { address, .. } => Some(address.clone()),
644            ConnectedPoint::Listener { .. } => None,
645        };
646        let connections = self
647            .connected
648            .get_mut(&peer_id)
649            .expect("Address change can only happen on an established connection.");
650
651        let connection = connections
652            .iter_mut()
653            .find(|c| c.id == connection_id)
654            .expect("Address change can only happen on an established connection.");
655        connection.remote_address = new_address;
656    }
657
658    fn on_connection_closed(
659        &mut self,
660        ConnectionClosed {
661            peer_id,
662            connection_id,
663            remaining_established,
664            ..
665        }: ConnectionClosed,
666    ) {
667        let connections = self
668            .connected
669            .get_mut(&peer_id)
670            .expect("Expected some established connection to peer before closing.");
671
672        let connection = connections
673            .iter()
674            .position(|c| c.id == connection_id)
675            .map(|p: usize| connections.remove(p))
676            .expect("Expected connection to be established before closing.");
677
678        debug_assert_eq!(connections.is_empty(), remaining_established == 0);
679        if connections.is_empty() {
680            self.connected.remove(&peer_id);
681        }
682
683        for request_id in connection.pending_inbound_responses {
684            self.pending_events
685                .push_back(ToSwarm::GenerateEvent(Event::InboundFailure {
686                    peer: peer_id,
687                    connection_id,
688                    request_id,
689                    error: InboundFailure::ConnectionClosed,
690                }));
691        }
692
693        for request_id in connection.pending_outbound_responses {
694            self.pending_events
695                .push_back(ToSwarm::GenerateEvent(Event::OutboundFailure {
696                    peer: peer_id,
697                    connection_id,
698                    request_id,
699                    error: OutboundFailure::ConnectionClosed,
700                }));
701        }
702    }
703
704    fn on_dial_failure(
705        &mut self,
706        DialFailure {
707            peer_id,
708            connection_id,
709            error,
710        }: DialFailure,
711    ) {
712        if let DialError::DialPeerConditionFalse(_) = error {
713            // Dial-condition fails because there is already another ongoing dial.
714            return;
715        }
716        if let Some(peer) = peer_id {
717            // If there are pending outgoing requests when a dial failure occurs,
718            // it is implied that we are not connected to the peer, since pending
719            // outgoing requests are drained when a connection is established and
720            // only created when a peer is not connected when a request is made.
721            // Thus these requests must be considered failed, even if there is
722            // another, concurrent dialing attempt ongoing.
723            if let Some(pending) = self.pending_outbound_requests.remove(&peer) {
724                for request in pending {
725                    self.pending_events
726                        .push_back(ToSwarm::GenerateEvent(Event::OutboundFailure {
727                            peer,
728                            connection_id,
729                            request_id: request.request_id,
730                            error: OutboundFailure::DialFailure,
731                        }));
732                }
733            }
734        }
735    }
736
737    /// Preloads a new [`Handler`] with requests that are
738    /// waiting to be sent to the newly connected peer.
739    fn preload_new_handler(
740        &mut self,
741        handler: &mut Handler<TCodec>,
742        peer: PeerId,
743        connection_id: ConnectionId,
744        remote_address: Option<Multiaddr>,
745    ) {
746        let mut connection = Connection::new(connection_id, remote_address);
747
748        if let Some(pending_requests) = self.pending_outbound_requests.remove(&peer) {
749            for request in pending_requests {
750                connection
751                    .pending_outbound_responses
752                    .insert(request.request_id);
753                handler.on_behaviour_event(request);
754            }
755        }
756
757        self.connected.entry(peer).or_default().push(connection);
758    }
759}
760
761impl<TCodec> NetworkBehaviour for Behaviour<TCodec>
762where
763    TCodec: Codec + Send + Clone + 'static,
764{
765    type ConnectionHandler = Handler<TCodec>;
766    type ToSwarm = Event<TCodec::Request, TCodec::Response>;
767
768    fn handle_established_inbound_connection(
769        &mut self,
770        connection_id: ConnectionId,
771        peer: PeerId,
772        _: &Multiaddr,
773        _: &Multiaddr,
774    ) -> Result<THandler<Self>, ConnectionDenied> {
775        let mut handler = Handler::new(
776            self.inbound_protocols.clone(),
777            self.codec.clone(),
778            self.config.request_timeout,
779            self.next_inbound_request_id.clone(),
780            self.config.max_concurrent_streams,
781        );
782
783        self.preload_new_handler(&mut handler, peer, connection_id, None);
784
785        Ok(handler)
786    }
787
788    fn handle_pending_outbound_connection(
789        &mut self,
790        _connection_id: ConnectionId,
791        maybe_peer: Option<PeerId>,
792        _addresses: &[Multiaddr],
793        _effective_role: Endpoint,
794    ) -> Result<Vec<Multiaddr>, ConnectionDenied> {
795        let Some(peer) = maybe_peer else {
796            return Ok(vec![]);
797        };
798
799        let mut addresses = Vec::new();
800        if let Some(connections) = self.connected.get(&peer) {
801            addresses.extend(connections.iter().filter_map(|c| c.remote_address.clone()))
802        }
803
804        let cached_addrs = self.addresses.get(&peer);
805        addresses.extend(cached_addrs);
806
807        Ok(addresses)
808    }
809
810    fn handle_established_outbound_connection(
811        &mut self,
812        connection_id: ConnectionId,
813        peer: PeerId,
814        remote_address: &Multiaddr,
815        _: Endpoint,
816        _: PortUse,
817    ) -> Result<THandler<Self>, ConnectionDenied> {
818        let mut handler = Handler::new(
819            self.inbound_protocols.clone(),
820            self.codec.clone(),
821            self.config.request_timeout,
822            self.next_inbound_request_id.clone(),
823            self.config.max_concurrent_streams,
824        );
825
826        self.preload_new_handler(
827            &mut handler,
828            peer,
829            connection_id,
830            Some(remote_address.clone()),
831        );
832
833        Ok(handler)
834    }
835
836    fn on_swarm_event(&mut self, event: FromSwarm) {
837        self.addresses.on_swarm_event(&event);
838        match event {
839            FromSwarm::ConnectionEstablished(_) => {}
840            FromSwarm::ConnectionClosed(connection_closed) => {
841                self.on_connection_closed(connection_closed)
842            }
843            FromSwarm::AddressChange(address_change) => self.on_address_change(address_change),
844            FromSwarm::DialFailure(dial_failure) => self.on_dial_failure(dial_failure),
845            _ => {}
846        }
847    }
848
849    fn on_connection_handler_event(
850        &mut self,
851        peer: PeerId,
852        connection_id: ConnectionId,
853        event: THandlerOutEvent<Self>,
854    ) {
855        match event {
856            handler::Event::Response {
857                request_id,
858                response,
859            } => {
860                let removed =
861                    self.remove_pending_outbound_response(&peer, connection_id, request_id);
862                debug_assert!(
863                    removed,
864                    "Expect request_id to be pending before receiving response.",
865                );
866
867                let message = Message::Response {
868                    request_id,
869                    response,
870                };
871                self.pending_events
872                    .push_back(ToSwarm::GenerateEvent(Event::Message {
873                        peer,
874                        connection_id,
875                        message,
876                    }));
877            }
878            handler::Event::Request {
879                request_id,
880                request,
881                sender,
882            } => match self.get_connection_mut(&peer, connection_id) {
883                Some(connection) => {
884                    let inserted = connection.pending_inbound_responses.insert(request_id);
885                    debug_assert!(inserted, "Expect id of new request to be unknown.");
886
887                    let channel = ResponseChannel { sender };
888                    let message = Message::Request {
889                        request_id,
890                        request,
891                        channel,
892                    };
893                    self.pending_events
894                        .push_back(ToSwarm::GenerateEvent(Event::Message {
895                            peer,
896                            connection_id,
897                            message,
898                        }));
899                }
900                None => {
901                    tracing::debug!("Connection ({connection_id}) closed after `Event::Request` ({request_id}) has been emitted.");
902                }
903            },
904            handler::Event::ResponseSent(request_id) => {
905                let removed =
906                    self.remove_pending_inbound_response(&peer, connection_id, request_id);
907                debug_assert!(
908                    removed,
909                    "Expect request_id to be pending before response is sent."
910                );
911
912                self.pending_events
913                    .push_back(ToSwarm::GenerateEvent(Event::ResponseSent {
914                        peer,
915                        connection_id,
916                        request_id,
917                    }));
918            }
919            handler::Event::ResponseOmission(request_id) => {
920                let removed =
921                    self.remove_pending_inbound_response(&peer, connection_id, request_id);
922                debug_assert!(
923                    removed,
924                    "Expect request_id to be pending before response is omitted.",
925                );
926
927                self.pending_events
928                    .push_back(ToSwarm::GenerateEvent(Event::InboundFailure {
929                        peer,
930                        connection_id,
931                        request_id,
932                        error: InboundFailure::ResponseOmission,
933                    }));
934            }
935            handler::Event::OutboundTimeout(request_id) => {
936                let removed =
937                    self.remove_pending_outbound_response(&peer, connection_id, request_id);
938                debug_assert!(
939                    removed,
940                    "Expect request_id to be pending before request times out."
941                );
942
943                self.pending_events
944                    .push_back(ToSwarm::GenerateEvent(Event::OutboundFailure {
945                        peer,
946                        connection_id,
947                        request_id,
948                        error: OutboundFailure::Timeout,
949                    }));
950            }
951            handler::Event::OutboundUnsupportedProtocols(request_id) => {
952                let removed =
953                    self.remove_pending_outbound_response(&peer, connection_id, request_id);
954                debug_assert!(
955                    removed,
956                    "Expect request_id to be pending before failing to connect.",
957                );
958
959                self.pending_events
960                    .push_back(ToSwarm::GenerateEvent(Event::OutboundFailure {
961                        peer,
962                        connection_id,
963                        request_id,
964                        error: OutboundFailure::UnsupportedProtocols,
965                    }));
966            }
967            handler::Event::OutboundStreamFailed { request_id, error } => {
968                let removed =
969                    self.remove_pending_outbound_response(&peer, connection_id, request_id);
970                debug_assert!(removed, "Expect request_id to be pending upon failure");
971
972                self.pending_events
973                    .push_back(ToSwarm::GenerateEvent(Event::OutboundFailure {
974                        peer,
975                        connection_id,
976                        request_id,
977                        error: OutboundFailure::Io(error),
978                    }))
979            }
980            handler::Event::InboundTimeout(request_id) => {
981                let removed =
982                    self.remove_pending_inbound_response(&peer, connection_id, request_id);
983
984                if removed {
985                    self.pending_events
986                        .push_back(ToSwarm::GenerateEvent(Event::InboundFailure {
987                            peer,
988                            connection_id,
989                            request_id,
990                            error: InboundFailure::Timeout,
991                        }));
992                } else {
993                    // This happens when timeout is emitted before `read_request` finishes.
994                    tracing::debug!(
995                        "Inbound request timeout for an unknown request_id ({request_id})"
996                    );
997                }
998            }
999            handler::Event::InboundStreamFailed { request_id, error } => {
1000                let removed =
1001                    self.remove_pending_inbound_response(&peer, connection_id, request_id);
1002
1003                if removed {
1004                    self.pending_events
1005                        .push_back(ToSwarm::GenerateEvent(Event::InboundFailure {
1006                            peer,
1007                            connection_id,
1008                            request_id,
1009                            error: InboundFailure::Io(error),
1010                        }));
1011                } else {
1012                    // This happens when `read_request` fails.
1013                    tracing::debug!("Inbound failure is reported for an unknown request_id ({request_id}): {error}");
1014                }
1015            }
1016        }
1017    }
1018
1019    #[tracing::instrument(level = "trace", name = "NetworkBehaviour::poll", skip(self))]
1020    fn poll(&mut self, _: &mut Context<'_>) -> Poll<ToSwarm<Self::ToSwarm, THandlerInEvent<Self>>> {
1021        if let Some(ev) = self.pending_events.pop_front() {
1022            return Poll::Ready(ev);
1023        } else if self.pending_events.capacity() > EMPTY_QUEUE_SHRINK_THRESHOLD {
1024            self.pending_events.shrink_to_fit();
1025        }
1026
1027        Poll::Pending
1028    }
1029}
1030
/// Capacity above which an empty event queue gives its memory back.
///
/// When `poll` finds the pending-event queue empty and the queue's capacity
/// is greater than this value, the queue is shrunk with `shrink_to_fit`.
const EMPTY_QUEUE_SHRINK_THRESHOLD: usize = 100;
1036
1037/// Internal information tracked for an established connection.
1038struct Connection {
1039    id: ConnectionId,
1040    remote_address: Option<Multiaddr>,
1041    /// Pending outbound responses where corresponding inbound requests have
1042    /// been received on this connection and emitted via `poll` but have not yet
1043    /// been answered.
1044    pending_outbound_responses: HashSet<OutboundRequestId>,
1045    /// Pending inbound responses for previously sent requests on this
1046    /// connection.
1047    pending_inbound_responses: HashSet<InboundRequestId>,
1048}
1049
1050impl Connection {
1051    fn new(id: ConnectionId, remote_address: Option<Multiaddr>) -> Self {
1052        Self {
1053            id,
1054            remote_address,
1055            pending_outbound_responses: Default::default(),
1056            pending_inbound_responses: Default::default(),
1057        }
1058    }
1059}