libp2p_relay/
behaviour.rs

1// Copyright 2021 Protocol Labs.
2//
3// Permission is hereby granted, free of charge, to any person obtaining a
4// copy of this software and associated documentation files (the "Software"),
5// to deal in the Software without restriction, including without limitation
6// the rights to use, copy, modify, merge, publish, distribute, sublicense,
7// and/or sell copies of the Software, and to permit persons to whom the
8// Software is furnished to do so, subject to the following conditions:
9//
10// The above copyright notice and this permission notice shall be included in
11// all copies or substantial portions of the Software.
12//
13// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
14// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
18// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
19// DEALINGS IN THE SOFTWARE.
20
21//! [`NetworkBehaviour`] to act as a circuit relay v2 **relay**.
22
23pub(crate) mod handler;
24pub(crate) mod rate_limiter;
25use std::{
26    collections::{hash_map, HashMap, HashSet, VecDeque},
27    num::NonZeroU32,
28    ops::Add,
29    task::{Context, Poll},
30    time::Duration,
31};
32
33use either::Either;
34use libp2p_core::{multiaddr::Protocol, transport::PortUse, ConnectedPoint, Endpoint, Multiaddr};
35use libp2p_identity::PeerId;
36use libp2p_swarm::{
37    behaviour::{ConnectionClosed, FromSwarm},
38    dummy, ConnectionDenied, ConnectionId, ExternalAddresses, NetworkBehaviour, NotifyHandler,
39    THandler, THandlerInEvent, THandlerOutEvent, ToSwarm,
40};
41use web_time::Instant;
42
43use crate::{
44    behaviour::handler::Handler,
45    multiaddr_ext::MultiaddrExt,
46    proto,
47    protocol::{inbound_hop, outbound_stop},
48};
49
50/// Configuration for the relay [`Behaviour`].
51///
52/// # Panics
53///
54/// [`Config::max_circuit_duration`] may not exceed [`u32::MAX`].
55pub struct Config {
56    pub max_reservations: usize,
57    pub max_reservations_per_peer: usize,
58    pub reservation_duration: Duration,
59    pub reservation_rate_limiters: Vec<Box<dyn rate_limiter::RateLimiter>>,
60
61    pub max_circuits: usize,
62    pub max_circuits_per_peer: usize,
63    pub max_circuit_duration: Duration,
64    pub max_circuit_bytes: u64,
65    pub circuit_src_rate_limiters: Vec<Box<dyn rate_limiter::RateLimiter>>,
66}
67
68impl Config {
69    pub fn reservation_rate_per_peer(mut self, limit: NonZeroU32, interval: Duration) -> Self {
70        self.reservation_rate_limiters
71            .push(rate_limiter::new_per_peer(
72                rate_limiter::GenericRateLimiterConfig { limit, interval },
73            ));
74        self
75    }
76
77    pub fn circuit_src_per_peer(mut self, limit: NonZeroU32, interval: Duration) -> Self {
78        self.circuit_src_rate_limiters
79            .push(rate_limiter::new_per_peer(
80                rate_limiter::GenericRateLimiterConfig { limit, interval },
81            ));
82        self
83    }
84
85    pub fn reservation_rate_per_ip(mut self, limit: NonZeroU32, interval: Duration) -> Self {
86        self.reservation_rate_limiters
87            .push(rate_limiter::new_per_ip(
88                rate_limiter::GenericRateLimiterConfig { limit, interval },
89            ));
90        self
91    }
92
93    pub fn circuit_src_per_ip(mut self, limit: NonZeroU32, interval: Duration) -> Self {
94        self.circuit_src_rate_limiters
95            .push(rate_limiter::new_per_ip(
96                rate_limiter::GenericRateLimiterConfig { limit, interval },
97            ));
98        self
99    }
100}
101
102impl std::fmt::Debug for Config {
103    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
104        f.debug_struct("Config")
105            .field("max_reservations", &self.max_reservations)
106            .field("max_reservations_per_peer", &self.max_reservations_per_peer)
107            .field("reservation_duration", &self.reservation_duration)
108            .field(
109                "reservation_rate_limiters",
110                &format!("[{} rate limiters]", self.reservation_rate_limiters.len()),
111            )
112            .field("max_circuits", &self.max_circuits)
113            .field("max_circuits_per_peer", &self.max_circuits_per_peer)
114            .field("max_circuit_duration", &self.max_circuit_duration)
115            .field("max_circuit_bytes", &self.max_circuit_bytes)
116            .field(
117                "circuit_src_rate_limiters",
118                &format!("[{} rate limiters]", self.circuit_src_rate_limiters.len()),
119            )
120            .finish()
121    }
122}
123
124impl Default for Config {
125    fn default() -> Self {
126        let reservation_rate_limiters = vec![
127            // For each peer ID one reservation every 2 minutes with up
128            // to 30 reservations per hour.
129            rate_limiter::new_per_peer(rate_limiter::GenericRateLimiterConfig {
130                limit: NonZeroU32::new(30).expect("30 > 0"),
131                interval: Duration::from_secs(60 * 2),
132            }),
133            // For each IP address one reservation every minute with up
134            // to 60 reservations per hour.
135            rate_limiter::new_per_ip(rate_limiter::GenericRateLimiterConfig {
136                limit: NonZeroU32::new(60).expect("60 > 0"),
137                interval: Duration::from_secs(60),
138            }),
139        ];
140
141        let circuit_src_rate_limiters = vec![
142            // For each source peer ID one circuit every 2 minute with up to 30 circuits per hour.
143            rate_limiter::new_per_peer(rate_limiter::GenericRateLimiterConfig {
144                limit: NonZeroU32::new(30).expect("30 > 0"),
145                interval: Duration::from_secs(60 * 2),
146            }),
147            // For each source IP address one circuit every minute with up to 60 circuits per hour.
148            rate_limiter::new_per_ip(rate_limiter::GenericRateLimiterConfig {
149                limit: NonZeroU32::new(60).expect("60 > 0"),
150                interval: Duration::from_secs(60),
151            }),
152        ];
153
154        Config {
155            max_reservations: 128,
156            max_reservations_per_peer: 4,
157            reservation_duration: Duration::from_secs(60 * 60),
158            reservation_rate_limiters,
159
160            max_circuits: 16,
161            max_circuits_per_peer: 4,
162            max_circuit_duration: Duration::from_secs(2 * 60),
163            max_circuit_bytes: 1 << 17, // 128 kibibyte
164            circuit_src_rate_limiters,
165        }
166    }
167}
168
169/// The events produced by the relay `Behaviour`.
170#[derive(Debug)]
171pub enum Event {
172    /// An inbound reservation request has been accepted.
173    ReservationReqAccepted {
174        src_peer_id: PeerId,
175        /// Indicates whether the request replaces an existing reservation.
176        renewed: bool,
177    },
178    /// Accepting an inbound reservation request failed.
179    #[deprecated(
180        note = "Will be removed in favor of logging them internally, see <https://github.com/libp2p/rust-libp2p/issues/4757> for details."
181    )]
182    ReservationReqAcceptFailed {
183        src_peer_id: PeerId,
184        error: inbound_hop::Error,
185    },
186    /// An inbound reservation request has been denied.
187    ReservationReqDenied { src_peer_id: PeerId },
188    /// Denying an inbound reservation request has failed.
189    #[deprecated(
190        note = "Will be removed in favor of logging them internally, see <https://github.com/libp2p/rust-libp2p/issues/4757> for details."
191    )]
192    ReservationReqDenyFailed {
193        src_peer_id: PeerId,
194        error: inbound_hop::Error,
195    },
196    /// A reservation has been closed.
197    ReservationClosed { src_peer_id: PeerId },
198    /// An inbound reservation has timed out.
199    ReservationTimedOut { src_peer_id: PeerId },
200    /// An inbound circuit request has been denied.
201    CircuitReqDenied {
202        src_peer_id: PeerId,
203        dst_peer_id: PeerId,
204    },
205    /// Denying an inbound circuit request failed.
206    #[deprecated(
207        note = "Will be removed in favor of logging them internally, see <https://github.com/libp2p/rust-libp2p/issues/4757> for details."
208    )]
209    CircuitReqDenyFailed {
210        src_peer_id: PeerId,
211        dst_peer_id: PeerId,
212        error: inbound_hop::Error,
213    },
214    /// An inbound circuit request has been accepted.
215    CircuitReqAccepted {
216        src_peer_id: PeerId,
217        dst_peer_id: PeerId,
218    },
219    /// An outbound connect for an inbound circuit request failed.
220    #[deprecated(
221        note = "Will be removed in favor of logging them internally, see <https://github.com/libp2p/rust-libp2p/issues/4757> for details."
222    )]
223    CircuitReqOutboundConnectFailed {
224        src_peer_id: PeerId,
225        dst_peer_id: PeerId,
226        error: outbound_stop::Error,
227    },
228    /// Accepting an inbound circuit request failed.
229    #[deprecated(
230        note = "Will be removed in favor of logging them internally, see <https://github.com/libp2p/rust-libp2p/issues/4757> for details."
231    )]
232    CircuitReqAcceptFailed {
233        src_peer_id: PeerId,
234        dst_peer_id: PeerId,
235        error: inbound_hop::Error,
236    },
237    /// An inbound circuit has closed.
238    CircuitClosed {
239        src_peer_id: PeerId,
240        dst_peer_id: PeerId,
241        error: Option<std::io::Error>,
242    },
243}
244
245/// [`NetworkBehaviour`] implementation of the relay server
246/// functionality of the circuit relay v2 protocol.
247pub struct Behaviour {
248    config: Config,
249
250    local_peer_id: PeerId,
251
252    reservations: HashMap<PeerId, HashSet<ConnectionId>>,
253    circuits: CircuitsTracker,
254
255    /// Queue of actions to return when polled.
256    queued_actions: VecDeque<ToSwarm<Event, THandlerInEvent<Self>>>,
257
258    external_addresses: ExternalAddresses,
259}
260
261impl Behaviour {
262    pub fn new(local_peer_id: PeerId, config: Config) -> Self {
263        Self {
264            config,
265            local_peer_id,
266            reservations: Default::default(),
267            circuits: Default::default(),
268            queued_actions: Default::default(),
269            external_addresses: Default::default(),
270        }
271    }
272
273    fn on_connection_closed(
274        &mut self,
275        ConnectionClosed {
276            peer_id,
277            connection_id,
278            ..
279        }: ConnectionClosed,
280    ) {
281        if let hash_map::Entry::Occupied(mut peer) = self.reservations.entry(peer_id) {
282            if peer.get_mut().remove(&connection_id) {
283                self.queued_actions
284                    .push_back(ToSwarm::GenerateEvent(Event::ReservationClosed {
285                        src_peer_id: peer_id,
286                    }));
287            }
288            if peer.get().is_empty() {
289                peer.remove();
290            }
291        }
292
293        for circuit in self
294            .circuits
295            .remove_by_connection(peer_id, connection_id)
296            .iter()
297            // Only emit [`CircuitClosed`] for accepted requests.
298            .filter(|c| matches!(c.status, CircuitStatus::Accepted))
299        {
300            self.queued_actions
301                .push_back(ToSwarm::GenerateEvent(Event::CircuitClosed {
302                    src_peer_id: circuit.src_peer_id,
303                    dst_peer_id: circuit.dst_peer_id,
304                    error: Some(std::io::ErrorKind::ConnectionAborted.into()),
305                }));
306        }
307    }
308}
309
310impl NetworkBehaviour for Behaviour {
311    type ConnectionHandler = Either<Handler, dummy::ConnectionHandler>;
312    type ToSwarm = Event;
313
314    fn handle_established_inbound_connection(
315        &mut self,
316        _: ConnectionId,
317        _: PeerId,
318        local_addr: &Multiaddr,
319        remote_addr: &Multiaddr,
320    ) -> Result<THandler<Self>, ConnectionDenied> {
321        if local_addr.is_relayed() {
322            // Deny all substreams on relayed connection.
323            return Ok(Either::Right(dummy::ConnectionHandler));
324        }
325
326        Ok(Either::Left(Handler::new(
327            handler::Config {
328                reservation_duration: self.config.reservation_duration,
329                max_circuit_duration: self.config.max_circuit_duration,
330                max_circuit_bytes: self.config.max_circuit_bytes,
331            },
332            ConnectedPoint::Listener {
333                local_addr: local_addr.clone(),
334                send_back_addr: remote_addr.clone(),
335            },
336        )))
337    }
338
339    fn handle_established_outbound_connection(
340        &mut self,
341        _: ConnectionId,
342        _: PeerId,
343        addr: &Multiaddr,
344        role_override: Endpoint,
345        port_use: PortUse,
346    ) -> Result<THandler<Self>, ConnectionDenied> {
347        if addr.is_relayed() {
348            // Deny all substreams on relayed connection.
349            return Ok(Either::Right(dummy::ConnectionHandler));
350        }
351
352        Ok(Either::Left(Handler::new(
353            handler::Config {
354                reservation_duration: self.config.reservation_duration,
355                max_circuit_duration: self.config.max_circuit_duration,
356                max_circuit_bytes: self.config.max_circuit_bytes,
357            },
358            ConnectedPoint::Dialer {
359                address: addr.clone(),
360                role_override,
361                port_use,
362            },
363        )))
364    }
365
366    fn on_swarm_event(&mut self, event: FromSwarm) {
367        self.external_addresses.on_swarm_event(&event);
368
369        if let FromSwarm::ConnectionClosed(connection_closed) = event {
370            self.on_connection_closed(connection_closed)
371        }
372    }
373
374    fn on_connection_handler_event(
375        &mut self,
376        event_source: PeerId,
377        connection: ConnectionId,
378        event: THandlerOutEvent<Self>,
379    ) {
380        let event = match event {
381            Either::Left(e) => e,
382            Either::Right(v) => libp2p_core::util::unreachable(v),
383        };
384
385        match event {
386            handler::Event::ReservationReqReceived {
387                inbound_reservation_req,
388                endpoint,
389                renewed,
390            } => {
391                let now = Instant::now();
392
393                assert!(
394                    !endpoint.is_relayed(),
395                    "`dummy::ConnectionHandler` handles relayed connections. It \
396                     denies all inbound substreams."
397                );
398
399                let action = if
400                // Deny if it is a new reservation and exceeds
401                // `max_reservations_per_peer`.
402                (!renewed
403                    && self
404                        .reservations
405                        .get(&event_source)
406                        .map(|cs| cs.len())
407                        .unwrap_or(0)
408                        > self.config.max_reservations_per_peer)
409                    // Deny if it exceeds `max_reservations`.
410                    || self
411                        .reservations
412                        .values()
413                        .map(|cs| cs.len())
414                        .sum::<usize>()
415                        >= self.config.max_reservations
416                    // Deny if it exceeds the allowed rate of reservations.
417                    || !self
418                        .config
419                        .reservation_rate_limiters
420                        .iter_mut()
421                        .all(|limiter| {
422                            limiter.try_next(event_source, endpoint.get_remote_address(), now)
423                        }) {
424                    ToSwarm::NotifyHandler {
425                        handler: NotifyHandler::One(connection),
426                        peer_id: event_source,
427                        event: Either::Left(handler::In::DenyReservationReq {
428                            inbound_reservation_req,
429                            status: proto::Status::RESOURCE_LIMIT_EXCEEDED,
430                        }),
431                    }
432                } else {
433                    // Accept reservation.
434                    self.reservations
435                        .entry(event_source)
436                        .or_default()
437                        .insert(connection);
438
439                    ToSwarm::NotifyHandler {
440                        handler: NotifyHandler::One(connection),
441                        peer_id: event_source,
442                        event: Either::Left(handler::In::AcceptReservationReq {
443                            inbound_reservation_req,
444                            addrs: self
445                                .external_addresses
446                                .iter()
447                                .cloned()
448                                // Add local peer ID in case it isn't present yet.
449                                .filter_map(|a| match a.iter().last()? {
450                                    Protocol::P2p(_) => Some(a),
451                                    _ => Some(a.with(Protocol::P2p(self.local_peer_id))),
452                                })
453                                .collect(),
454                        }),
455                    }
456                };
457
458                self.queued_actions.push_back(action);
459            }
460            handler::Event::ReservationReqAccepted { renewed } => {
461                // Ensure local eventual consistent reservation state matches handler (source of
462                // truth).
463                self.reservations
464                    .entry(event_source)
465                    .or_default()
466                    .insert(connection);
467
468                self.queued_actions.push_back(ToSwarm::GenerateEvent(
469                    Event::ReservationReqAccepted {
470                        src_peer_id: event_source,
471                        renewed,
472                    },
473                ));
474            }
475            handler::Event::ReservationReqAcceptFailed { error } => {
476                #[allow(deprecated)]
477                self.queued_actions.push_back(ToSwarm::GenerateEvent(
478                    Event::ReservationReqAcceptFailed {
479                        src_peer_id: event_source,
480                        error,
481                    },
482                ));
483            }
484            handler::Event::ReservationReqDenied {} => {
485                self.queued_actions.push_back(ToSwarm::GenerateEvent(
486                    Event::ReservationReqDenied {
487                        src_peer_id: event_source,
488                    },
489                ));
490            }
491            handler::Event::ReservationReqDenyFailed { error } => {
492                #[allow(deprecated)]
493                self.queued_actions.push_back(ToSwarm::GenerateEvent(
494                    Event::ReservationReqDenyFailed {
495                        src_peer_id: event_source,
496                        error,
497                    },
498                ));
499            }
500            handler::Event::ReservationTimedOut {} => {
501                match self.reservations.entry(event_source) {
502                    hash_map::Entry::Occupied(mut peer) => {
503                        peer.get_mut().remove(&connection);
504                        if peer.get().is_empty() {
505                            peer.remove();
506                        }
507                    }
508                    hash_map::Entry::Vacant(_) => {
509                        unreachable!(
510                            "Expect to track timed out reservation with peer {:?} on connection {:?}",
511                            event_source,
512                            connection,
513                        );
514                    }
515                }
516
517                self.queued_actions
518                    .push_back(ToSwarm::GenerateEvent(Event::ReservationTimedOut {
519                        src_peer_id: event_source,
520                    }));
521            }
522            handler::Event::CircuitReqReceived {
523                inbound_circuit_req,
524                endpoint,
525            } => {
526                let now = Instant::now();
527
528                assert!(
529                    !endpoint.is_relayed(),
530                    "`dummy::ConnectionHandler` handles relayed connections. It \
531                     denies all inbound substreams."
532                );
533
534                let action = if self.circuits.num_circuits_of_peer(event_source)
535                    > self.config.max_circuits_per_peer
536                    || self.circuits.len() >= self.config.max_circuits
537                    || !self
538                        .config
539                        .circuit_src_rate_limiters
540                        .iter_mut()
541                        .all(|limiter| {
542                            limiter.try_next(event_source, endpoint.get_remote_address(), now)
543                        }) {
544                    // Deny circuit exceeding limits.
545                    ToSwarm::NotifyHandler {
546                        handler: NotifyHandler::One(connection),
547                        peer_id: event_source,
548                        event: Either::Left(handler::In::DenyCircuitReq {
549                            circuit_id: None,
550                            inbound_circuit_req,
551                            status: proto::Status::RESOURCE_LIMIT_EXCEEDED,
552                        }),
553                    }
554                } else if let Some(dst_conn) = self
555                    .reservations
556                    .get(&inbound_circuit_req.dst())
557                    .and_then(|cs| cs.iter().next())
558                {
559                    // Accept circuit request if reservation present.
560                    let circuit_id = self.circuits.insert(Circuit {
561                        status: CircuitStatus::Accepting,
562                        src_peer_id: event_source,
563                        src_connection_id: connection,
564                        dst_peer_id: inbound_circuit_req.dst(),
565                        dst_connection_id: *dst_conn,
566                    });
567
568                    ToSwarm::NotifyHandler {
569                        handler: NotifyHandler::One(*dst_conn),
570                        peer_id: event_source,
571                        event: Either::Left(handler::In::NegotiateOutboundConnect {
572                            circuit_id,
573                            inbound_circuit_req,
574                            src_peer_id: event_source,
575                            src_connection_id: connection,
576                        }),
577                    }
578                } else {
579                    // Deny circuit request if no reservation present.
580                    ToSwarm::NotifyHandler {
581                        handler: NotifyHandler::One(connection),
582                        peer_id: event_source,
583                        event: Either::Left(handler::In::DenyCircuitReq {
584                            circuit_id: None,
585                            inbound_circuit_req,
586                            status: proto::Status::NO_RESERVATION,
587                        }),
588                    }
589                };
590                self.queued_actions.push_back(action);
591            }
592            handler::Event::CircuitReqDenied {
593                circuit_id,
594                dst_peer_id,
595            } => {
596                if let Some(circuit_id) = circuit_id {
597                    self.circuits.remove(circuit_id);
598                }
599
600                self.queued_actions
601                    .push_back(ToSwarm::GenerateEvent(Event::CircuitReqDenied {
602                        src_peer_id: event_source,
603                        dst_peer_id,
604                    }));
605            }
606            handler::Event::CircuitReqDenyFailed {
607                circuit_id,
608                dst_peer_id,
609                error,
610            } => {
611                if let Some(circuit_id) = circuit_id {
612                    self.circuits.remove(circuit_id);
613                }
614
615                #[allow(deprecated)]
616                self.queued_actions.push_back(ToSwarm::GenerateEvent(
617                    Event::CircuitReqDenyFailed {
618                        src_peer_id: event_source,
619                        dst_peer_id,
620                        error,
621                    },
622                ));
623            }
624            handler::Event::OutboundConnectNegotiated {
625                circuit_id,
626                src_peer_id,
627                src_connection_id,
628                inbound_circuit_req,
629                dst_stream,
630                dst_pending_data,
631            } => {
632                self.queued_actions.push_back(ToSwarm::NotifyHandler {
633                    handler: NotifyHandler::One(src_connection_id),
634                    peer_id: src_peer_id,
635                    event: Either::Left(handler::In::AcceptAndDriveCircuit {
636                        circuit_id,
637                        dst_peer_id: event_source,
638                        inbound_circuit_req,
639                        dst_stream,
640                        dst_pending_data,
641                    }),
642                });
643            }
644            handler::Event::OutboundConnectNegotiationFailed {
645                circuit_id,
646                src_peer_id,
647                src_connection_id,
648                inbound_circuit_req,
649                status,
650                error,
651            } => {
652                self.queued_actions.push_back(ToSwarm::NotifyHandler {
653                    handler: NotifyHandler::One(src_connection_id),
654                    peer_id: src_peer_id,
655                    event: Either::Left(handler::In::DenyCircuitReq {
656                        circuit_id: Some(circuit_id),
657                        inbound_circuit_req,
658                        status,
659                    }),
660                });
661                #[allow(deprecated)]
662                self.queued_actions.push_back(ToSwarm::GenerateEvent(
663                    Event::CircuitReqOutboundConnectFailed {
664                        src_peer_id,
665                        dst_peer_id: event_source,
666                        error,
667                    },
668                ));
669            }
670            handler::Event::CircuitReqAccepted {
671                dst_peer_id,
672                circuit_id,
673            } => {
674                self.circuits.accepted(circuit_id);
675                self.queued_actions
676                    .push_back(ToSwarm::GenerateEvent(Event::CircuitReqAccepted {
677                        src_peer_id: event_source,
678                        dst_peer_id,
679                    }));
680            }
681            handler::Event::CircuitReqAcceptFailed {
682                dst_peer_id,
683                circuit_id,
684                error,
685            } => {
686                self.circuits.remove(circuit_id);
687                #[allow(deprecated)]
688                self.queued_actions.push_back(ToSwarm::GenerateEvent(
689                    Event::CircuitReqAcceptFailed {
690                        src_peer_id: event_source,
691                        dst_peer_id,
692                        error,
693                    },
694                ));
695            }
696            handler::Event::CircuitClosed {
697                dst_peer_id,
698                circuit_id,
699                error,
700            } => {
701                self.circuits.remove(circuit_id);
702
703                self.queued_actions
704                    .push_back(ToSwarm::GenerateEvent(Event::CircuitClosed {
705                        src_peer_id: event_source,
706                        dst_peer_id,
707                        error,
708                    }));
709            }
710        }
711    }
712
713    #[tracing::instrument(level = "trace", name = "NetworkBehaviour::poll", skip(self))]
714    fn poll(&mut self, _: &mut Context<'_>) -> Poll<ToSwarm<Self::ToSwarm, THandlerInEvent<Self>>> {
715        if let Some(to_swarm) = self.queued_actions.pop_front() {
716            return Poll::Ready(to_swarm);
717        }
718
719        Poll::Pending
720    }
721}
722
723#[derive(Default)]
724struct CircuitsTracker {
725    next_id: CircuitId,
726    circuits: HashMap<CircuitId, Circuit>,
727}
728
729impl CircuitsTracker {
730    fn len(&self) -> usize {
731        self.circuits.len()
732    }
733
734    fn insert(&mut self, circuit: Circuit) -> CircuitId {
735        let id = self.next_id;
736        self.next_id = self.next_id + 1;
737
738        self.circuits.insert(id, circuit);
739
740        id
741    }
742
743    fn accepted(&mut self, circuit_id: CircuitId) {
744        if let Some(c) = self.circuits.get_mut(&circuit_id) {
745            c.status = CircuitStatus::Accepted;
746        };
747    }
748
749    fn remove(&mut self, circuit_id: CircuitId) -> Option<Circuit> {
750        self.circuits.remove(&circuit_id)
751    }
752
753    fn remove_by_connection(
754        &mut self,
755        peer_id: PeerId,
756        connection_id: ConnectionId,
757    ) -> Vec<Circuit> {
758        let mut removed = vec![];
759
760        self.circuits.retain(|_circuit_id, circuit| {
761            let is_src =
762                circuit.src_peer_id == peer_id && circuit.src_connection_id == connection_id;
763            let is_dst =
764                circuit.dst_peer_id == peer_id && circuit.dst_connection_id == connection_id;
765
766            if is_src || is_dst {
767                removed.push(circuit.clone());
768                // Remove circuit from HashMap.
769                false
770            } else {
771                // Retain circuit in HashMap.
772                true
773            }
774        });
775
776        removed
777    }
778
779    fn num_circuits_of_peer(&self, peer: PeerId) -> usize {
780        self.circuits
781            .iter()
782            .filter(|(_, c)| c.src_peer_id == peer || c.dst_peer_id == peer)
783            .count()
784    }
785}
786
787#[derive(Clone)]
788struct Circuit {
789    src_peer_id: PeerId,
790    src_connection_id: ConnectionId,
791    dst_peer_id: PeerId,
792    dst_connection_id: ConnectionId,
793    status: CircuitStatus,
794}
795
796#[derive(Clone)]
797enum CircuitStatus {
798    Accepting,
799    Accepted,
800}
801
802#[derive(Default, Clone, Copy, Debug, Hash, Eq, PartialEq)]
803pub struct CircuitId(u64);
804
805impl Add<u64> for CircuitId {
806    type Output = CircuitId;
807
808    fn add(self, rhs: u64) -> Self {
809        CircuitId(self.0 + rhs)
810    }
811}