libp2p_request_response/
lib.rs

1// Copyright 2020 Parity Technologies (UK) Ltd.
2//
3// Permission is hereby granted, free of charge, to any person obtaining a
4// copy of this software and associated documentation files (the "Software"),
5// to deal in the Software without restriction, including without limitation
6// the rights to use, copy, modify, merge, publish, distribute, sublicense,
7// and/or sell copies of the Software, and to permit persons to whom the
8// Software is furnished to do so, subject to the following conditions:
9//
10// The above copyright notice and this permission notice shall be included in
11// all copies or substantial portions of the Software.
12//
13// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
14// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
18// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
19// DEALINGS IN THE SOFTWARE.
20
21//! Generic request/response protocols.
22//!
23//! ## General Usage
24//!
25//! The [`Behaviour`] struct is a [`NetworkBehaviour`] that implements a generic
26//! request/response protocol or protocol family, whereby each request is
27//! sent over a new substream on a connection. `Behaviour` is generic
28//! over the actual messages being sent, which are defined in terms of a
29//! [`Codec`]. Creating a request/response protocol thus amounts
30//! to providing an implementation of this trait which can then be
31//! given to [`Behaviour::with_codec`]. Further configuration options are
32//! available via the [`Config`].
33//!
34//! Requests are sent using [`Behaviour::send_request`] and the
35//! responses received as [`Message::Response`] via
36//! [`Event::Message`].
37//!
38//! Responses are sent using [`Behaviour::send_response`] upon
39//! receiving a [`Message::Request`] via
40//! [`Event::Message`].
41//!
42//! ## Predefined codecs
43//!
44//! In case your message types implement [`serde::Serialize`] and [`serde::Deserialize`],
45//! you can use two predefined behaviours:
46//!
47//! - [`cbor::Behaviour`] for CBOR-encoded messages
48//! - [`json::Behaviour`] for JSON-encoded messages
49//!
50//! ## Protocol Families
51//!
52//! A single [`Behaviour`] instance can be used with an entire
53//! protocol family that share the same request and response types.
54//! For that purpose, [`Codec::Protocol`] is typically
55//! instantiated with a sum type.
56//!
57//! ## Limited Protocol Support
58//!
59//! It is possible to only support inbound or outbound requests for
60//! a particular protocol. This is achieved by instantiating `Behaviour`
61//! with protocols using [`ProtocolSupport::Inbound`] or
62//! [`ProtocolSupport::Outbound`]. Any subset of protocols of a protocol
63//! family can be configured in this way. Such protocols will not be
64//! advertised during inbound respectively outbound protocol negotiation
65//! on the substreams.
66
67#![cfg_attr(docsrs, feature(doc_cfg, doc_auto_cfg))]
68
69#[cfg(feature = "cbor")]
70pub mod cbor;
71mod codec;
72mod handler;
73#[cfg(feature = "json")]
74pub mod json;
75
76use std::{
77    collections::{HashMap, HashSet, VecDeque},
78    fmt, io,
79    sync::{atomic::AtomicU64, Arc},
80    task::{Context, Poll},
81    time::Duration,
82};
83
84pub use codec::Codec;
85use futures::channel::oneshot;
86use handler::Handler;
87pub use handler::ProtocolSupport;
88use libp2p_core::{transport::PortUse, ConnectedPoint, Endpoint, Multiaddr};
89use libp2p_identity::PeerId;
90use libp2p_swarm::{
91    behaviour::{AddressChange, ConnectionClosed, DialFailure, FromSwarm},
92    dial_opts::DialOpts,
93    ConnectionDenied, ConnectionHandler, ConnectionId, NetworkBehaviour, NotifyHandler,
94    PeerAddresses, THandler, THandlerInEvent, THandlerOutEvent, ToSwarm,
95};
96use smallvec::SmallVec;
97
98use crate::handler::OutboundMessage;
99
100/// An inbound request or response.
101#[derive(Debug)]
102pub enum Message<TRequest, TResponse, TChannelResponse = TResponse> {
103    /// A request message.
104    Request {
105        /// The ID of this request.
106        request_id: InboundRequestId,
107        /// The request message.
108        request: TRequest,
109        /// The channel waiting for the response.
110        ///
111        /// If this channel is dropped instead of being used to send a response
112        /// via [`Behaviour::send_response`], a [`Event::InboundFailure`]
113        /// with [`InboundFailure::ResponseOmission`] is emitted.
114        channel: ResponseChannel<TChannelResponse>,
115    },
116    /// A response message.
117    Response {
118        /// The ID of the request that produced this response.
119        ///
120        /// See [`Behaviour::send_request`].
121        request_id: OutboundRequestId,
122        /// The response message.
123        response: TResponse,
124    },
125}
126
127/// The events emitted by a request-response [`Behaviour`].
128#[derive(Debug)]
129pub enum Event<TRequest, TResponse, TChannelResponse = TResponse> {
130    /// An incoming message (request or response).
131    Message {
132        /// The peer who sent the message.
133        peer: PeerId,
134        /// The connection used.
135        connection_id: ConnectionId,
136        /// The incoming message.
137        message: Message<TRequest, TResponse, TChannelResponse>,
138    },
139    /// An outbound request failed.
140    OutboundFailure {
141        /// The peer to whom the request was sent.
142        peer: PeerId,
143        /// The connection used.
144        connection_id: ConnectionId,
145        /// The (local) ID of the failed request.
146        request_id: OutboundRequestId,
147        /// The error that occurred.
148        error: OutboundFailure,
149    },
150    /// An inbound request failed.
151    InboundFailure {
152        /// The peer from whom the request was received.
153        peer: PeerId,
154        /// The connection used.
155        connection_id: ConnectionId,
156        /// The ID of the failed inbound request.
157        request_id: InboundRequestId,
158        /// The error that occurred.
159        error: InboundFailure,
160    },
161    /// A response to an inbound request has been sent.
162    ///
163    /// When this event is received, the response has been flushed on
164    /// the underlying transport connection.
165    ResponseSent {
166        /// The peer to whom the response was sent.
167        peer: PeerId,
168        /// The connection used.
169        connection_id: ConnectionId,
170        /// The ID of the inbound request whose response was sent.
171        request_id: InboundRequestId,
172    },
173}
174
175/// Possible failures occurring in the context of sending
176/// an outbound request and receiving the response.
177#[derive(Debug)]
178pub enum OutboundFailure {
179    /// The request could not be sent because a dialing attempt failed.
180    DialFailure,
181    /// The request timed out before a response was received.
182    ///
183    /// It is not known whether the request may have been
184    /// received (and processed) by the remote peer.
185    Timeout,
186    /// The connection closed before a response was received.
187    ///
188    /// It is not known whether the request may have been
189    /// received (and processed) by the remote peer.
190    ConnectionClosed,
191    /// The remote supports none of the requested protocols.
192    UnsupportedProtocols,
193    /// An IO failure happened on an outbound stream.
194    Io(io::Error),
195}
196
197impl fmt::Display for OutboundFailure {
198    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
199        match self {
200            OutboundFailure::DialFailure => write!(f, "Failed to dial the requested peer"),
201            OutboundFailure::Timeout => write!(f, "Timeout while waiting for a response"),
202            OutboundFailure::ConnectionClosed => {
203                write!(f, "Connection was closed before a response was received")
204            }
205            OutboundFailure::UnsupportedProtocols => {
206                write!(f, "The remote supports none of the requested protocols")
207            }
208            OutboundFailure::Io(e) => write!(f, "IO error on outbound stream: {e}"),
209        }
210    }
211}
212
213impl std::error::Error for OutboundFailure {}
214
215/// Possible failures occurring in the context of receiving an
216/// inbound request and sending a response.
217#[derive(Debug)]
218pub enum InboundFailure {
219    /// The inbound request timed out, either while reading the
220    /// incoming request or before a response is sent, e.g. if
221    /// [`Behaviour::send_response`] is not called in a
222    /// timely manner.
223    Timeout,
224    /// The connection closed before a response could be send.
225    ConnectionClosed,
226    /// The local peer supports none of the protocols requested
227    /// by the remote.
228    UnsupportedProtocols,
229    /// The local peer failed to respond to an inbound request
230    /// due to the [`ResponseChannel`] being dropped instead of
231    /// being passed to [`Behaviour::send_response`].
232    ResponseOmission,
233    /// An IO failure happened on an inbound stream.
234    Io(io::Error),
235}
236
237impl fmt::Display for InboundFailure {
238    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
239        match self {
240            InboundFailure::Timeout => {
241                write!(f, "Timeout while receiving request or sending response")
242            }
243            InboundFailure::ConnectionClosed => {
244                write!(f, "Connection was closed before a response could be sent")
245            }
246            InboundFailure::UnsupportedProtocols => write!(
247                f,
248                "The local peer supports none of the protocols requested by the remote"
249            ),
250            InboundFailure::ResponseOmission => write!(
251                f,
252                "The response channel was dropped without sending a response to the remote"
253            ),
254            InboundFailure::Io(e) => write!(f, "IO error on inbound stream: {e}"),
255        }
256    }
257}
258
259impl std::error::Error for InboundFailure {}
260
261/// A channel for sending a response to an inbound request.
262///
263/// See [`Behaviour::send_response`].
264#[derive(Debug)]
265pub struct ResponseChannel<TResponse> {
266    sender: oneshot::Sender<TResponse>,
267}
268
269impl<TResponse> ResponseChannel<TResponse> {
270    /// Checks whether the response channel is still open, i.e.
271    /// the `Behaviour` is still waiting for a
272    /// a response to be sent via [`Behaviour::send_response`]
273    /// and this response channel.
274    ///
275    /// If the response channel is no longer open then the inbound
276    /// request timed out waiting for the response.
277    pub fn is_open(&self) -> bool {
278        !self.sender.is_canceled()
279    }
280}
281
282/// The ID of an inbound request.
283///
284/// Note: [`InboundRequestId`]'s uniqueness is only guaranteed between
285/// inbound requests of the same originating [`Behaviour`].
286#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash, PartialOrd, Ord)]
287pub struct InboundRequestId(u64);
288
289impl fmt::Display for InboundRequestId {
290    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
291        write!(f, "{}", self.0)
292    }
293}
294
295/// The ID of an outbound request.
296///
297/// Note: [`OutboundRequestId`]'s uniqueness is only guaranteed between
298/// outbound requests of the same originating [`Behaviour`].
299#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash, PartialOrd, Ord)]
300pub struct OutboundRequestId(u64);
301
302impl fmt::Display for OutboundRequestId {
303    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
304        write!(f, "{}", self.0)
305    }
306}
307
308/// The configuration for a `Behaviour` protocol.
309#[derive(Debug, Clone)]
310pub struct Config {
311    request_timeout: Duration,
312    max_concurrent_streams: usize,
313}
314
315impl Default for Config {
316    fn default() -> Self {
317        Self {
318            request_timeout: Duration::from_secs(10),
319            max_concurrent_streams: 100,
320        }
321    }
322}
323
324impl Config {
325    /// Sets the timeout for inbound and outbound requests.
326    #[deprecated(note = "Use `Config::with_request_timeout` for one-liner constructions.")]
327    pub fn set_request_timeout(&mut self, v: Duration) -> &mut Self {
328        self.request_timeout = v;
329        self
330    }
331
332    /// Sets the timeout for inbound and outbound requests.
333    pub fn with_request_timeout(mut self, v: Duration) -> Self {
334        self.request_timeout = v;
335        self
336    }
337
338    /// Sets the upper bound for the number of concurrent inbound + outbound streams.
339    pub fn with_max_concurrent_streams(mut self, num_streams: usize) -> Self {
340        self.max_concurrent_streams = num_streams;
341        self
342    }
343}
344
345/// A request/response protocol for some message codec.
346pub struct Behaviour<TCodec>
347where
348    TCodec: Codec + Clone + Send + 'static,
349{
350    /// The supported inbound protocols.
351    inbound_protocols: SmallVec<[TCodec::Protocol; 2]>,
352    /// The supported outbound protocols.
353    outbound_protocols: SmallVec<[TCodec::Protocol; 2]>,
354    /// The next (local) request ID.
355    next_outbound_request_id: OutboundRequestId,
356    /// The next (inbound) request ID.
357    next_inbound_request_id: Arc<AtomicU64>,
358    /// The protocol configuration.
359    config: Config,
360    /// The protocol codec for reading and writing requests and responses.
361    codec: TCodec,
362    /// Pending events to return from `poll`.
363    pending_events:
364        VecDeque<ToSwarm<Event<TCodec::Request, TCodec::Response>, OutboundMessage<TCodec>>>,
365    /// The currently connected peers, their pending outbound and inbound responses and their
366    /// known, reachable addresses, if any.
367    connected: HashMap<PeerId, SmallVec<[Connection; 2]>>,
368    /// Externally managed addresses via `add_address` and `remove_address`.
369    addresses: PeerAddresses,
370    /// Requests that have not yet been sent and are waiting for a connection
371    /// to be established.
372    pending_outbound_requests: HashMap<PeerId, SmallVec<[OutboundMessage<TCodec>; 10]>>,
373}
374
375impl<TCodec> Behaviour<TCodec>
376where
377    TCodec: Codec + Default + Clone + Send + 'static,
378{
379    /// Creates a new `Behaviour` for the given protocols and configuration, using [`Default`] to
380    /// construct the codec.
381    pub fn new<I>(protocols: I, cfg: Config) -> Self
382    where
383        I: IntoIterator<Item = (TCodec::Protocol, ProtocolSupport)>,
384    {
385        Self::with_codec(TCodec::default(), protocols, cfg)
386    }
387}
388
389impl<TCodec> Behaviour<TCodec>
390where
391    TCodec: Codec + Clone + Send + 'static,
392{
393    /// Creates a new `Behaviour` for the given
394    /// protocols, codec and configuration.
395    pub fn with_codec<I>(codec: TCodec, protocols: I, cfg: Config) -> Self
396    where
397        I: IntoIterator<Item = (TCodec::Protocol, ProtocolSupport)>,
398    {
399        let mut inbound_protocols = SmallVec::new();
400        let mut outbound_protocols = SmallVec::new();
401        for (p, s) in protocols {
402            if s.inbound() {
403                inbound_protocols.push(p.clone());
404            }
405            if s.outbound() {
406                outbound_protocols.push(p.clone());
407            }
408        }
409        Behaviour {
410            inbound_protocols,
411            outbound_protocols,
412            next_outbound_request_id: OutboundRequestId(1),
413            next_inbound_request_id: Arc::new(AtomicU64::new(1)),
414            config: cfg,
415            codec,
416            pending_events: VecDeque::new(),
417            connected: HashMap::new(),
418            pending_outbound_requests: HashMap::new(),
419            addresses: PeerAddresses::default(),
420        }
421    }
422
423    /// Initiates sending a request.
424    ///
425    /// If the targeted peer is currently not connected, a dialing
426    /// attempt is initiated and the request is sent as soon as a
427    /// connection is established.
428    ///
429    /// > **Note**: In order for such a dialing attempt to succeed,
430    /// > the `RequestResponse` protocol must either be embedded
431    /// > in another `NetworkBehaviour` that provides peer and
432    /// > address discovery, or known addresses of peers must be
433    /// > managed via [`libp2p_swarm::Swarm::add_peer_address`].
434    /// > Addresses are automatically removed when dial attempts
435    /// > to them fail.
436    pub fn send_request(&mut self, peer: &PeerId, request: TCodec::Request) -> OutboundRequestId {
437        let request_id = self.next_outbound_request_id();
438        let request = OutboundMessage {
439            request_id,
440            request,
441            protocols: self.outbound_protocols.clone(),
442        };
443
444        if let Some(request) = self.try_send_request(peer, request) {
445            self.pending_events.push_back(ToSwarm::Dial {
446                opts: DialOpts::peer_id(*peer).build(),
447            });
448            self.pending_outbound_requests
449                .entry(*peer)
450                .or_default()
451                .push(request);
452        }
453
454        request_id
455    }
456
457    /// Initiates sending a response to an inbound request.
458    ///
459    /// If the [`ResponseChannel`] is already closed due to a timeout or the
460    /// connection being closed, the response is returned as an `Err` for
461    /// further handling. Once the response has been successfully sent on the
462    /// corresponding connection, [`Event::ResponseSent`] is
463    /// emitted. In all other cases [`Event::InboundFailure`]
464    /// will be or has been emitted.
465    ///
466    /// The provided `ResponseChannel` is obtained from an inbound
467    /// [`Message::Request`].
468    pub fn send_response(
469        &mut self,
470        ch: ResponseChannel<TCodec::Response>,
471        rs: TCodec::Response,
472    ) -> Result<(), TCodec::Response> {
473        ch.sender.send(rs)
474    }
475
476    /// Adds a known address for a peer that can be used for
477    /// dialing attempts by the `Swarm`, i.e. is returned
478    /// by [`NetworkBehaviour::handle_pending_outbound_connection`].
479    ///
480    /// Addresses added in this way are only removed by `remove_address`.
481    ///
482    /// Returns true if the address was added, false otherwise (i.e. if the
483    /// address is already in the list).
484    #[deprecated(note = "Use `Swarm::add_peer_address` instead.")]
485    pub fn add_address(&mut self, peer: &PeerId, address: Multiaddr) -> bool {
486        self.addresses.add(*peer, address)
487    }
488
489    /// Removes an address of a peer previously added via [`Behaviour::add_address`].
490    #[deprecated(note = "Will be removed with the next breaking release and won't be replaced.")]
491    pub fn remove_address(&mut self, peer: &PeerId, address: &Multiaddr) {
492        self.addresses.remove(peer, address);
493    }
494
495    /// Checks whether a peer is currently connected.
496    pub fn is_connected(&self, peer: &PeerId) -> bool {
497        if let Some(connections) = self.connected.get(peer) {
498            !connections.is_empty()
499        } else {
500            false
501        }
502    }
503
504    /// Checks whether an outbound request to the peer with the provided
505    /// [`PeerId`] initiated by [`Behaviour::send_request`] is still
506    /// pending, i.e. waiting for a response.
507    pub fn is_pending_outbound(&self, peer: &PeerId, request_id: &OutboundRequestId) -> bool {
508        // Check if request is already sent on established connection.
509        let est_conn = self
510            .connected
511            .get(peer)
512            .map(|cs| {
513                cs.iter()
514                    .any(|c| c.pending_outbound_responses.contains(request_id))
515            })
516            .unwrap_or(false);
517        // Check if request is still pending to be sent.
518        let pen_conn = self
519            .pending_outbound_requests
520            .get(peer)
521            .map(|rps| rps.iter().any(|rp| rp.request_id == *request_id))
522            .unwrap_or(false);
523
524        est_conn || pen_conn
525    }
526
527    /// Checks whether an inbound request from the peer with the provided
528    /// [`PeerId`] is still pending, i.e. waiting for a response by the local
529    /// node through [`Behaviour::send_response`].
530    pub fn is_pending_inbound(&self, peer: &PeerId, request_id: &InboundRequestId) -> bool {
531        self.connected
532            .get(peer)
533            .map(|cs| {
534                cs.iter()
535                    .any(|c| c.pending_inbound_responses.contains(request_id))
536            })
537            .unwrap_or(false)
538    }
539
540    /// Returns the next outbound request ID.
541    fn next_outbound_request_id(&mut self) -> OutboundRequestId {
542        let request_id = self.next_outbound_request_id;
543        self.next_outbound_request_id.0 += 1;
544        request_id
545    }
546
547    /// Tries to send a request by queueing an appropriate event to be
548    /// emitted to the `Swarm`. If the peer is not currently connected,
549    /// the given request is return unchanged.
550    fn try_send_request(
551        &mut self,
552        peer: &PeerId,
553        request: OutboundMessage<TCodec>,
554    ) -> Option<OutboundMessage<TCodec>> {
555        if let Some(connections) = self.connected.get_mut(peer) {
556            if connections.is_empty() {
557                return Some(request);
558            }
559            let ix = (request.request_id.0 as usize) % connections.len();
560            let conn = &mut connections[ix];
561            conn.pending_outbound_responses.insert(request.request_id);
562            self.pending_events.push_back(ToSwarm::NotifyHandler {
563                peer_id: *peer,
564                handler: NotifyHandler::One(conn.id),
565                event: request,
566            });
567            None
568        } else {
569            Some(request)
570        }
571    }
572
573    /// Remove pending outbound response for the given peer and connection.
574    ///
575    /// Returns `true` if the provided connection to the given peer is still
576    /// alive and the [`OutboundRequestId`] was previously present and is now removed.
577    /// Returns `false` otherwise.
578    fn remove_pending_outbound_response(
579        &mut self,
580        peer: &PeerId,
581        connection_id: ConnectionId,
582        request: OutboundRequestId,
583    ) -> bool {
584        self.get_connection_mut(peer, connection_id)
585            .map(|c| c.pending_outbound_responses.remove(&request))
586            .unwrap_or(false)
587    }
588
589    /// Remove pending inbound response for the given peer and connection.
590    ///
591    /// Returns `true` if the provided connection to the given peer is still
592    /// alive and the [`InboundRequestId`] was previously present and is now removed.
593    /// Returns `false` otherwise.
594    fn remove_pending_inbound_response(
595        &mut self,
596        peer: &PeerId,
597        connection_id: ConnectionId,
598        request: InboundRequestId,
599    ) -> bool {
600        self.get_connection_mut(peer, connection_id)
601            .map(|c| c.pending_inbound_responses.remove(&request))
602            .unwrap_or(false)
603    }
604
605    /// Returns a mutable reference to the connection in `self.connected`
606    /// corresponding to the given [`PeerId`] and [`ConnectionId`].
607    fn get_connection_mut(
608        &mut self,
609        peer: &PeerId,
610        connection_id: ConnectionId,
611    ) -> Option<&mut Connection> {
612        self.connected
613            .get_mut(peer)
614            .and_then(|connections| connections.iter_mut().find(|c| c.id == connection_id))
615    }
616
617    fn on_address_change(
618        &mut self,
619        AddressChange {
620            peer_id,
621            connection_id,
622            new,
623            ..
624        }: AddressChange,
625    ) {
626        let new_address = match new {
627            ConnectedPoint::Dialer { address, .. } => Some(address.clone()),
628            ConnectedPoint::Listener { .. } => None,
629        };
630        let connections = self
631            .connected
632            .get_mut(&peer_id)
633            .expect("Address change can only happen on an established connection.");
634
635        let connection = connections
636            .iter_mut()
637            .find(|c| c.id == connection_id)
638            .expect("Address change can only happen on an established connection.");
639        connection.remote_address = new_address;
640    }
641
642    fn on_connection_closed(
643        &mut self,
644        ConnectionClosed {
645            peer_id,
646            connection_id,
647            remaining_established,
648            ..
649        }: ConnectionClosed,
650    ) {
651        let connections = self
652            .connected
653            .get_mut(&peer_id)
654            .expect("Expected some established connection to peer before closing.");
655
656        let connection = connections
657            .iter()
658            .position(|c| c.id == connection_id)
659            .map(|p: usize| connections.remove(p))
660            .expect("Expected connection to be established before closing.");
661
662        debug_assert_eq!(connections.is_empty(), remaining_established == 0);
663        if connections.is_empty() {
664            self.connected.remove(&peer_id);
665        }
666
667        for request_id in connection.pending_inbound_responses {
668            self.pending_events
669                .push_back(ToSwarm::GenerateEvent(Event::InboundFailure {
670                    peer: peer_id,
671                    connection_id,
672                    request_id,
673                    error: InboundFailure::ConnectionClosed,
674                }));
675        }
676
677        for request_id in connection.pending_outbound_responses {
678            self.pending_events
679                .push_back(ToSwarm::GenerateEvent(Event::OutboundFailure {
680                    peer: peer_id,
681                    connection_id,
682                    request_id,
683                    error: OutboundFailure::ConnectionClosed,
684                }));
685        }
686    }
687
688    fn on_dial_failure(
689        &mut self,
690        DialFailure {
691            peer_id,
692            connection_id,
693            ..
694        }: DialFailure,
695    ) {
696        if let Some(peer) = peer_id {
697            // If there are pending outgoing requests when a dial failure occurs,
698            // it is implied that we are not connected to the peer, since pending
699            // outgoing requests are drained when a connection is established and
700            // only created when a peer is not connected when a request is made.
701            // Thus these requests must be considered failed, even if there is
702            // another, concurrent dialing attempt ongoing.
703            if let Some(pending) = self.pending_outbound_requests.remove(&peer) {
704                for request in pending {
705                    self.pending_events
706                        .push_back(ToSwarm::GenerateEvent(Event::OutboundFailure {
707                            peer,
708                            connection_id,
709                            request_id: request.request_id,
710                            error: OutboundFailure::DialFailure,
711                        }));
712                }
713            }
714        }
715    }
716
717    /// Preloads a new [`Handler`] with requests that are
718    /// waiting to be sent to the newly connected peer.
719    fn preload_new_handler(
720        &mut self,
721        handler: &mut Handler<TCodec>,
722        peer: PeerId,
723        connection_id: ConnectionId,
724        remote_address: Option<Multiaddr>,
725    ) {
726        let mut connection = Connection::new(connection_id, remote_address);
727
728        if let Some(pending_requests) = self.pending_outbound_requests.remove(&peer) {
729            for request in pending_requests {
730                connection
731                    .pending_outbound_responses
732                    .insert(request.request_id);
733                handler.on_behaviour_event(request);
734            }
735        }
736
737        self.connected.entry(peer).or_default().push(connection);
738    }
739}
740
741impl<TCodec> NetworkBehaviour for Behaviour<TCodec>
742where
743    TCodec: Codec + Send + Clone + 'static,
744{
745    type ConnectionHandler = Handler<TCodec>;
746    type ToSwarm = Event<TCodec::Request, TCodec::Response>;
747
748    fn handle_established_inbound_connection(
749        &mut self,
750        connection_id: ConnectionId,
751        peer: PeerId,
752        _: &Multiaddr,
753        _: &Multiaddr,
754    ) -> Result<THandler<Self>, ConnectionDenied> {
755        let mut handler = Handler::new(
756            self.inbound_protocols.clone(),
757            self.codec.clone(),
758            self.config.request_timeout,
759            self.next_inbound_request_id.clone(),
760            self.config.max_concurrent_streams,
761        );
762
763        self.preload_new_handler(&mut handler, peer, connection_id, None);
764
765        Ok(handler)
766    }
767
768    fn handle_pending_outbound_connection(
769        &mut self,
770        _connection_id: ConnectionId,
771        maybe_peer: Option<PeerId>,
772        _addresses: &[Multiaddr],
773        _effective_role: Endpoint,
774    ) -> Result<Vec<Multiaddr>, ConnectionDenied> {
775        let peer = match maybe_peer {
776            None => return Ok(vec![]),
777            Some(peer) => peer,
778        };
779
780        let mut addresses = Vec::new();
781        if let Some(connections) = self.connected.get(&peer) {
782            addresses.extend(connections.iter().filter_map(|c| c.remote_address.clone()))
783        }
784
785        let cached_addrs = self.addresses.get(&peer);
786        addresses.extend(cached_addrs);
787
788        Ok(addresses)
789    }
790
791    fn handle_established_outbound_connection(
792        &mut self,
793        connection_id: ConnectionId,
794        peer: PeerId,
795        remote_address: &Multiaddr,
796        _: Endpoint,
797        _: PortUse,
798    ) -> Result<THandler<Self>, ConnectionDenied> {
799        let mut handler = Handler::new(
800            self.inbound_protocols.clone(),
801            self.codec.clone(),
802            self.config.request_timeout,
803            self.next_inbound_request_id.clone(),
804            self.config.max_concurrent_streams,
805        );
806
807        self.preload_new_handler(
808            &mut handler,
809            peer,
810            connection_id,
811            Some(remote_address.clone()),
812        );
813
814        Ok(handler)
815    }
816
817    fn on_swarm_event(&mut self, event: FromSwarm) {
818        self.addresses.on_swarm_event(&event);
819        match event {
820            FromSwarm::ConnectionEstablished(_) => {}
821            FromSwarm::ConnectionClosed(connection_closed) => {
822                self.on_connection_closed(connection_closed)
823            }
824            FromSwarm::AddressChange(address_change) => self.on_address_change(address_change),
825            FromSwarm::DialFailure(dial_failure) => self.on_dial_failure(dial_failure),
826            _ => {}
827        }
828    }
829
830    fn on_connection_handler_event(
831        &mut self,
832        peer: PeerId,
833        connection_id: ConnectionId,
834        event: THandlerOutEvent<Self>,
835    ) {
836        match event {
837            handler::Event::Response {
838                request_id,
839                response,
840            } => {
841                let removed =
842                    self.remove_pending_outbound_response(&peer, connection_id, request_id);
843                debug_assert!(
844                    removed,
845                    "Expect request_id to be pending before receiving response.",
846                );
847
848                let message = Message::Response {
849                    request_id,
850                    response,
851                };
852                self.pending_events
853                    .push_back(ToSwarm::GenerateEvent(Event::Message {
854                        peer,
855                        connection_id,
856                        message,
857                    }));
858            }
859            handler::Event::Request {
860                request_id,
861                request,
862                sender,
863            } => match self.get_connection_mut(&peer, connection_id) {
864                Some(connection) => {
865                    let inserted = connection.pending_inbound_responses.insert(request_id);
866                    debug_assert!(inserted, "Expect id of new request to be unknown.");
867
868                    let channel = ResponseChannel { sender };
869                    let message = Message::Request {
870                        request_id,
871                        request,
872                        channel,
873                    };
874                    self.pending_events
875                        .push_back(ToSwarm::GenerateEvent(Event::Message {
876                            peer,
877                            connection_id,
878                            message,
879                        }));
880                }
881                None => {
882                    tracing::debug!("Connection ({connection_id}) closed after `Event::Request` ({request_id}) has been emitted.");
883                }
884            },
885            handler::Event::ResponseSent(request_id) => {
886                let removed =
887                    self.remove_pending_inbound_response(&peer, connection_id, request_id);
888                debug_assert!(
889                    removed,
890                    "Expect request_id to be pending before response is sent."
891                );
892
893                self.pending_events
894                    .push_back(ToSwarm::GenerateEvent(Event::ResponseSent {
895                        peer,
896                        connection_id,
897                        request_id,
898                    }));
899            }
900            handler::Event::ResponseOmission(request_id) => {
901                let removed =
902                    self.remove_pending_inbound_response(&peer, connection_id, request_id);
903                debug_assert!(
904                    removed,
905                    "Expect request_id to be pending before response is omitted.",
906                );
907
908                self.pending_events
909                    .push_back(ToSwarm::GenerateEvent(Event::InboundFailure {
910                        peer,
911                        connection_id,
912                        request_id,
913                        error: InboundFailure::ResponseOmission,
914                    }));
915            }
916            handler::Event::OutboundTimeout(request_id) => {
917                let removed =
918                    self.remove_pending_outbound_response(&peer, connection_id, request_id);
919                debug_assert!(
920                    removed,
921                    "Expect request_id to be pending before request times out."
922                );
923
924                self.pending_events
925                    .push_back(ToSwarm::GenerateEvent(Event::OutboundFailure {
926                        peer,
927                        connection_id,
928                        request_id,
929                        error: OutboundFailure::Timeout,
930                    }));
931            }
932            handler::Event::OutboundUnsupportedProtocols(request_id) => {
933                let removed =
934                    self.remove_pending_outbound_response(&peer, connection_id, request_id);
935                debug_assert!(
936                    removed,
937                    "Expect request_id to be pending before failing to connect.",
938                );
939
940                self.pending_events
941                    .push_back(ToSwarm::GenerateEvent(Event::OutboundFailure {
942                        peer,
943                        connection_id,
944                        request_id,
945                        error: OutboundFailure::UnsupportedProtocols,
946                    }));
947            }
948            handler::Event::OutboundStreamFailed { request_id, error } => {
949                let removed =
950                    self.remove_pending_outbound_response(&peer, connection_id, request_id);
951                debug_assert!(removed, "Expect request_id to be pending upon failure");
952
953                self.pending_events
954                    .push_back(ToSwarm::GenerateEvent(Event::OutboundFailure {
955                        peer,
956                        connection_id,
957                        request_id,
958                        error: OutboundFailure::Io(error),
959                    }))
960            }
961            handler::Event::InboundTimeout(request_id) => {
962                let removed =
963                    self.remove_pending_inbound_response(&peer, connection_id, request_id);
964
965                if removed {
966                    self.pending_events
967                        .push_back(ToSwarm::GenerateEvent(Event::InboundFailure {
968                            peer,
969                            connection_id,
970                            request_id,
971                            error: InboundFailure::Timeout,
972                        }));
973                } else {
974                    // This happens when timeout is emitted before `read_request` finishes.
975                    tracing::debug!(
976                        "Inbound request timeout for an unknown request_id ({request_id})"
977                    );
978                }
979            }
980            handler::Event::InboundStreamFailed { request_id, error } => {
981                let removed =
982                    self.remove_pending_inbound_response(&peer, connection_id, request_id);
983
984                if removed {
985                    self.pending_events
986                        .push_back(ToSwarm::GenerateEvent(Event::InboundFailure {
987                            peer,
988                            connection_id,
989                            request_id,
990                            error: InboundFailure::Io(error),
991                        }));
992                } else {
993                    // This happens when `read_request` fails.
994                    tracing::debug!("Inbound failure is reported for an unknown request_id ({request_id}): {error}");
995                }
996            }
997        }
998    }
999
1000    #[tracing::instrument(level = "trace", name = "NetworkBehaviour::poll", skip(self))]
1001    fn poll(&mut self, _: &mut Context<'_>) -> Poll<ToSwarm<Self::ToSwarm, THandlerInEvent<Self>>> {
1002        if let Some(ev) = self.pending_events.pop_front() {
1003            return Poll::Ready(ev);
1004        } else if self.pending_events.capacity() > EMPTY_QUEUE_SHRINK_THRESHOLD {
1005            self.pending_events.shrink_to_fit();
1006        }
1007
1008        Poll::Pending
1009    }
1010}
1011
1012/// Internal threshold for when to shrink the capacity
1013/// of empty queues. If the capacity of an empty queue
1014/// exceeds this threshold, the associated memory is
1015/// released.
1016const EMPTY_QUEUE_SHRINK_THRESHOLD: usize = 100;
1017
1018/// Internal information tracked for an established connection.
1019struct Connection {
1020    id: ConnectionId,
1021    remote_address: Option<Multiaddr>,
1022    /// Pending outbound responses where corresponding inbound requests have
1023    /// been received on this connection and emitted via `poll` but have not yet
1024    /// been answered.
1025    pending_outbound_responses: HashSet<OutboundRequestId>,
1026    /// Pending inbound responses for previously sent requests on this
1027    /// connection.
1028    pending_inbound_responses: HashSet<InboundRequestId>,
1029}
1030
1031impl Connection {
1032    fn new(id: ConnectionId, remote_address: Option<Multiaddr>) -> Self {
1033        Self {
1034            id,
1035            remote_address,
1036            pending_outbound_responses: Default::default(),
1037            pending_inbound_responses: Default::default(),
1038        }
1039    }
1040}