libp2p_relay/
behaviour.rs

1// Copyright 2021 Protocol Labs.
2//
3// Permission is hereby granted, free of charge, to any person obtaining a
4// copy of this software and associated documentation files (the "Software"),
5// to deal in the Software without restriction, including without limitation
6// the rights to use, copy, modify, merge, publish, distribute, sublicense,
7// and/or sell copies of the Software, and to permit persons to whom the
8// Software is furnished to do so, subject to the following conditions:
9//
10// The above copyright notice and this permission notice shall be included in
11// all copies or substantial portions of the Software.
12//
13// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
14// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
18// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
19// DEALINGS IN THE SOFTWARE.
20
21//! [`NetworkBehaviour`] to act as a circuit relay v2 **relay**.
22
23pub(crate) mod handler;
24pub(crate) mod rate_limiter;
25use std::{
26    collections::{hash_map, HashMap, HashSet, VecDeque},
27    num::NonZeroU32,
28    ops::Add,
29    task::{Context, Poll},
30    time::Duration,
31};
32
33use either::Either;
34use libp2p_core::{multiaddr::Protocol, transport::PortUse, ConnectedPoint, Endpoint, Multiaddr};
35use libp2p_identity::PeerId;
36use libp2p_swarm::{
37    behaviour::{ConnectionClosed, FromSwarm},
38    dummy, ConnectionDenied, ConnectionId, ExternalAddresses, NetworkBehaviour, NotifyHandler,
39    THandler, THandlerInEvent, THandlerOutEvent, ToSwarm,
40};
41use web_time::Instant;
42
43use crate::{
44    behaviour::handler::Handler,
45    multiaddr_ext::MultiaddrExt,
46    proto,
47    protocol::{inbound_hop, outbound_stop},
48};
49
50/// Configuration for the relay [`Behaviour`].
51///
52/// # Panics
53///
54/// [`Config::max_circuit_duration`] may not exceed [`u32::MAX`].
55pub struct Config {
56    pub max_reservations: usize,
57    pub max_reservations_per_peer: usize,
58    pub reservation_duration: Duration,
59    pub reservation_rate_limiters: Vec<Box<dyn rate_limiter::RateLimiter>>,
60
61    pub max_circuits: usize,
62    pub max_circuits_per_peer: usize,
63    pub max_circuit_duration: Duration,
64    pub max_circuit_bytes: u64,
65    pub circuit_src_rate_limiters: Vec<Box<dyn rate_limiter::RateLimiter>>,
66}
67
68impl Config {
69    pub fn reservation_rate_per_peer(mut self, limit: NonZeroU32, interval: Duration) -> Self {
70        self.reservation_rate_limiters
71            .push(rate_limiter::new_per_peer(
72                rate_limiter::GenericRateLimiterConfig { limit, interval },
73            ));
74        self
75    }
76
77    pub fn circuit_src_per_peer(mut self, limit: NonZeroU32, interval: Duration) -> Self {
78        self.circuit_src_rate_limiters
79            .push(rate_limiter::new_per_peer(
80                rate_limiter::GenericRateLimiterConfig { limit, interval },
81            ));
82        self
83    }
84
85    pub fn reservation_rate_per_ip(mut self, limit: NonZeroU32, interval: Duration) -> Self {
86        self.reservation_rate_limiters
87            .push(rate_limiter::new_per_ip(
88                rate_limiter::GenericRateLimiterConfig { limit, interval },
89            ));
90        self
91    }
92
93    pub fn circuit_src_per_ip(mut self, limit: NonZeroU32, interval: Duration) -> Self {
94        self.circuit_src_rate_limiters
95            .push(rate_limiter::new_per_ip(
96                rate_limiter::GenericRateLimiterConfig { limit, interval },
97            ));
98        self
99    }
100}
101
102impl std::fmt::Debug for Config {
103    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
104        f.debug_struct("Config")
105            .field("max_reservations", &self.max_reservations)
106            .field("max_reservations_per_peer", &self.max_reservations_per_peer)
107            .field("reservation_duration", &self.reservation_duration)
108            .field(
109                "reservation_rate_limiters",
110                &format!("[{} rate limiters]", self.reservation_rate_limiters.len()),
111            )
112            .field("max_circuits", &self.max_circuits)
113            .field("max_circuits_per_peer", &self.max_circuits_per_peer)
114            .field("max_circuit_duration", &self.max_circuit_duration)
115            .field("max_circuit_bytes", &self.max_circuit_bytes)
116            .field(
117                "circuit_src_rate_limiters",
118                &format!("[{} rate limiters]", self.circuit_src_rate_limiters.len()),
119            )
120            .finish()
121    }
122}
123
124impl Default for Config {
125    fn default() -> Self {
126        let reservation_rate_limiters = vec![
127            // For each peer ID one reservation every 2 minutes with up
128            // to 30 reservations per hour.
129            rate_limiter::new_per_peer(rate_limiter::GenericRateLimiterConfig {
130                limit: NonZeroU32::new(30).expect("30 > 0"),
131                interval: Duration::from_secs(60 * 2),
132            }),
133            // For each IP address one reservation every minute with up
134            // to 60 reservations per hour.
135            rate_limiter::new_per_ip(rate_limiter::GenericRateLimiterConfig {
136                limit: NonZeroU32::new(60).expect("60 > 0"),
137                interval: Duration::from_secs(60),
138            }),
139        ];
140
141        let circuit_src_rate_limiters = vec![
142            // For each source peer ID one circuit every 2 minute with up to 30 circuits per hour.
143            rate_limiter::new_per_peer(rate_limiter::GenericRateLimiterConfig {
144                limit: NonZeroU32::new(30).expect("30 > 0"),
145                interval: Duration::from_secs(60 * 2),
146            }),
147            // For each source IP address one circuit every minute with up to 60 circuits per hour.
148            rate_limiter::new_per_ip(rate_limiter::GenericRateLimiterConfig {
149                limit: NonZeroU32::new(60).expect("60 > 0"),
150                interval: Duration::from_secs(60),
151            }),
152        ];
153
154        Config {
155            max_reservations: 128,
156            max_reservations_per_peer: 4,
157            reservation_duration: Duration::from_secs(60 * 60),
158            reservation_rate_limiters,
159
160            max_circuits: 16,
161            max_circuits_per_peer: 4,
162            max_circuit_duration: Duration::from_secs(2 * 60),
163            max_circuit_bytes: 1 << 17, // 128 kibibyte
164            circuit_src_rate_limiters,
165        }
166    }
167}
168
169/// The events produced by the relay `Behaviour`.
170#[derive(Debug)]
171pub enum Event {
172    /// An inbound reservation request has been accepted.
173    ReservationReqAccepted {
174        src_peer_id: PeerId,
175        /// Indicates whether the request replaces an existing reservation.
176        renewed: bool,
177    },
178    /// Accepting an inbound reservation request failed.
179    #[deprecated(
180        note = "Will be removed in favor of logging them internally, see <https://github.com/libp2p/rust-libp2p/issues/4757> for details."
181    )]
182    ReservationReqAcceptFailed {
183        src_peer_id: PeerId,
184        error: inbound_hop::Error,
185    },
186    /// An inbound reservation request has been denied.
187    ReservationReqDenied {
188        src_peer_id: PeerId,
189        status: StatusCode,
190    },
191    /// Denying an inbound reservation request has failed.
192    #[deprecated(
193        note = "Will be removed in favor of logging them internally, see <https://github.com/libp2p/rust-libp2p/issues/4757> for details."
194    )]
195    ReservationReqDenyFailed {
196        src_peer_id: PeerId,
197        error: inbound_hop::Error,
198    },
199    /// A reservation has been closed.
200    ReservationClosed { src_peer_id: PeerId },
201    /// An inbound reservation has timed out.
202    ReservationTimedOut { src_peer_id: PeerId },
203    /// An inbound circuit request has been denied.
204    CircuitReqDenied {
205        src_peer_id: PeerId,
206        dst_peer_id: PeerId,
207        status: StatusCode,
208    },
209    /// Denying an inbound circuit request failed.
210    #[deprecated(
211        note = "Will be removed in favor of logging them internally, see <https://github.com/libp2p/rust-libp2p/issues/4757> for details."
212    )]
213    CircuitReqDenyFailed {
214        src_peer_id: PeerId,
215        dst_peer_id: PeerId,
216        error: inbound_hop::Error,
217    },
218    /// An inbound circuit request has been accepted.
219    CircuitReqAccepted {
220        src_peer_id: PeerId,
221        dst_peer_id: PeerId,
222    },
223    /// An outbound connect for an inbound circuit request failed.
224    #[deprecated(
225        note = "Will be removed in favor of logging them internally, see <https://github.com/libp2p/rust-libp2p/issues/4757> for details."
226    )]
227    CircuitReqOutboundConnectFailed {
228        src_peer_id: PeerId,
229        dst_peer_id: PeerId,
230        error: outbound_stop::Error,
231    },
232    /// Accepting an inbound circuit request failed.
233    #[deprecated(
234        note = "Will be removed in favor of logging them internally, see <https://github.com/libp2p/rust-libp2p/issues/4757> for details."
235    )]
236    CircuitReqAcceptFailed {
237        src_peer_id: PeerId,
238        dst_peer_id: PeerId,
239        error: inbound_hop::Error,
240    },
241    /// An inbound circuit has closed.
242    CircuitClosed {
243        src_peer_id: PeerId,
244        dst_peer_id: PeerId,
245        error: Option<std::io::Error>,
246    },
247}
248
249/// [`NetworkBehaviour`] implementation of the relay server
250/// functionality of the circuit relay v2 protocol.
251pub struct Behaviour {
252    config: Config,
253
254    local_peer_id: PeerId,
255
256    reservations: HashMap<PeerId, HashSet<ConnectionId>>,
257    circuits: CircuitsTracker,
258
259    /// Queue of actions to return when polled.
260    queued_actions: VecDeque<ToSwarm<Event, THandlerInEvent<Self>>>,
261
262    external_addresses: ExternalAddresses,
263}
264
265impl Behaviour {
266    pub fn new(local_peer_id: PeerId, config: Config) -> Self {
267        Self {
268            config,
269            local_peer_id,
270            reservations: Default::default(),
271            circuits: Default::default(),
272            queued_actions: Default::default(),
273            external_addresses: Default::default(),
274        }
275    }
276
277    fn on_connection_closed(
278        &mut self,
279        ConnectionClosed {
280            peer_id,
281            connection_id,
282            ..
283        }: ConnectionClosed,
284    ) {
285        if let hash_map::Entry::Occupied(mut peer) = self.reservations.entry(peer_id) {
286            if peer.get_mut().remove(&connection_id) {
287                self.queued_actions
288                    .push_back(ToSwarm::GenerateEvent(Event::ReservationClosed {
289                        src_peer_id: peer_id,
290                    }));
291            }
292            if peer.get().is_empty() {
293                peer.remove();
294            }
295        }
296
297        for circuit in self
298            .circuits
299            .remove_by_connection(peer_id, connection_id)
300            .iter()
301            // Only emit [`CircuitClosed`] for accepted requests.
302            .filter(|c| matches!(c.status, CircuitStatus::Accepted))
303        {
304            self.queued_actions
305                .push_back(ToSwarm::GenerateEvent(Event::CircuitClosed {
306                    src_peer_id: circuit.src_peer_id,
307                    dst_peer_id: circuit.dst_peer_id,
308                    error: Some(std::io::ErrorKind::ConnectionAborted.into()),
309                }));
310        }
311    }
312}
313
314impl NetworkBehaviour for Behaviour {
315    type ConnectionHandler = Either<Handler, dummy::ConnectionHandler>;
316    type ToSwarm = Event;
317
318    fn handle_established_inbound_connection(
319        &mut self,
320        _: ConnectionId,
321        _: PeerId,
322        local_addr: &Multiaddr,
323        remote_addr: &Multiaddr,
324    ) -> Result<THandler<Self>, ConnectionDenied> {
325        if local_addr.is_relayed() {
326            // Deny all substreams on relayed connection.
327            return Ok(Either::Right(dummy::ConnectionHandler));
328        }
329
330        Ok(Either::Left(Handler::new(
331            handler::Config {
332                reservation_duration: self.config.reservation_duration,
333                max_circuit_duration: self.config.max_circuit_duration,
334                max_circuit_bytes: self.config.max_circuit_bytes,
335            },
336            ConnectedPoint::Listener {
337                local_addr: local_addr.clone(),
338                send_back_addr: remote_addr.clone(),
339            },
340        )))
341    }
342
343    fn handle_established_outbound_connection(
344        &mut self,
345        _: ConnectionId,
346        _: PeerId,
347        addr: &Multiaddr,
348        role_override: Endpoint,
349        port_use: PortUse,
350    ) -> Result<THandler<Self>, ConnectionDenied> {
351        if addr.is_relayed() {
352            // Deny all substreams on relayed connection.
353            return Ok(Either::Right(dummy::ConnectionHandler));
354        }
355
356        Ok(Either::Left(Handler::new(
357            handler::Config {
358                reservation_duration: self.config.reservation_duration,
359                max_circuit_duration: self.config.max_circuit_duration,
360                max_circuit_bytes: self.config.max_circuit_bytes,
361            },
362            ConnectedPoint::Dialer {
363                address: addr.clone(),
364                role_override,
365                port_use,
366            },
367        )))
368    }
369
370    fn on_swarm_event(&mut self, event: FromSwarm) {
371        self.external_addresses.on_swarm_event(&event);
372
373        if let FromSwarm::ConnectionClosed(connection_closed) = event {
374            self.on_connection_closed(connection_closed)
375        }
376    }
377
378    fn on_connection_handler_event(
379        &mut self,
380        event_source: PeerId,
381        connection: ConnectionId,
382        event: THandlerOutEvent<Self>,
383    ) {
384        let event = match event {
385            Either::Left(e) => e,
386            Either::Right(v) => libp2p_core::util::unreachable(v),
387        };
388
389        match event {
390            handler::Event::ReservationReqReceived {
391                inbound_reservation_req,
392                endpoint,
393                renewed,
394            } => {
395                let now = Instant::now();
396
397                assert!(
398                    !endpoint.is_relayed(),
399                    "`dummy::ConnectionHandler` handles relayed connections. It \
400                     denies all inbound substreams."
401                );
402
403                let action = if
404                // Deny if it is a new reservation and exceeds
405                // `max_reservations_per_peer`.
406                (!renewed
407                    && self
408                        .reservations
409                        .get(&event_source)
410                        .map(|cs| cs.len())
411                        .unwrap_or(0)
412                        > self.config.max_reservations_per_peer)
413                    // Deny if it exceeds `max_reservations`.
414                    || self
415                        .reservations
416                        .values()
417                        .map(|cs| cs.len())
418                        .sum::<usize>()
419                        >= self.config.max_reservations
420                    // Deny if it exceeds the allowed rate of reservations.
421                    || !self
422                        .config
423                        .reservation_rate_limiters
424                        .iter_mut()
425                        .all(|limiter| {
426                            limiter.try_next(event_source, endpoint.get_remote_address(), now)
427                        }) {
428                    ToSwarm::NotifyHandler {
429                        handler: NotifyHandler::One(connection),
430                        peer_id: event_source,
431                        event: Either::Left(handler::In::DenyReservationReq {
432                            inbound_reservation_req,
433                            status: proto::Status::RESOURCE_LIMIT_EXCEEDED,
434                        }),
435                    }
436                } else {
437                    // Accept reservation.
438                    self.reservations
439                        .entry(event_source)
440                        .or_default()
441                        .insert(connection);
442
443                    ToSwarm::NotifyHandler {
444                        handler: NotifyHandler::One(connection),
445                        peer_id: event_source,
446                        event: Either::Left(handler::In::AcceptReservationReq {
447                            inbound_reservation_req,
448                            addrs: self
449                                .external_addresses
450                                .iter()
451                                .cloned()
452                                // Add local peer ID in case it isn't present yet.
453                                .filter_map(|a| match a.iter().last()? {
454                                    Protocol::P2p(_) => Some(a),
455                                    _ => Some(a.with(Protocol::P2p(self.local_peer_id))),
456                                })
457                                .collect(),
458                        }),
459                    }
460                };
461
462                self.queued_actions.push_back(action);
463            }
464            handler::Event::ReservationReqAccepted { renewed } => {
465                // Ensure local eventual consistent reservation state matches handler (source of
466                // truth).
467                self.reservations
468                    .entry(event_source)
469                    .or_default()
470                    .insert(connection);
471
472                self.queued_actions.push_back(ToSwarm::GenerateEvent(
473                    Event::ReservationReqAccepted {
474                        src_peer_id: event_source,
475                        renewed,
476                    },
477                ));
478            }
479            handler::Event::ReservationReqAcceptFailed { error } => {
480                #[allow(deprecated)]
481                self.queued_actions.push_back(ToSwarm::GenerateEvent(
482                    Event::ReservationReqAcceptFailed {
483                        src_peer_id: event_source,
484                        error,
485                    },
486                ));
487            }
488            handler::Event::ReservationReqDenied { status } => {
489                self.queued_actions.push_back(ToSwarm::GenerateEvent(
490                    Event::ReservationReqDenied {
491                        src_peer_id: event_source,
492                        status: status.into(),
493                    },
494                ));
495            }
496            handler::Event::ReservationReqDenyFailed { error } => {
497                #[allow(deprecated)]
498                self.queued_actions.push_back(ToSwarm::GenerateEvent(
499                    Event::ReservationReqDenyFailed {
500                        src_peer_id: event_source,
501                        error,
502                    },
503                ));
504            }
505            handler::Event::ReservationTimedOut {} => {
506                match self.reservations.entry(event_source) {
507                    hash_map::Entry::Occupied(mut peer) => {
508                        peer.get_mut().remove(&connection);
509                        if peer.get().is_empty() {
510                            peer.remove();
511                        }
512                    }
513                    hash_map::Entry::Vacant(_) => {
514                        unreachable!(
515                            "Expect to track timed out reservation with peer {:?} on connection {:?}",
516                            event_source,
517                            connection,
518                        );
519                    }
520                }
521
522                self.queued_actions
523                    .push_back(ToSwarm::GenerateEvent(Event::ReservationTimedOut {
524                        src_peer_id: event_source,
525                    }));
526            }
527            handler::Event::CircuitReqReceived {
528                inbound_circuit_req,
529                endpoint,
530            } => {
531                let now = Instant::now();
532
533                assert!(
534                    !endpoint.is_relayed(),
535                    "`dummy::ConnectionHandler` handles relayed connections. It \
536                     denies all inbound substreams."
537                );
538
539                let action = if self.circuits.num_circuits_of_peer(event_source)
540                    > self.config.max_circuits_per_peer
541                    || self.circuits.len() >= self.config.max_circuits
542                    || !self
543                        .config
544                        .circuit_src_rate_limiters
545                        .iter_mut()
546                        .all(|limiter| {
547                            limiter.try_next(event_source, endpoint.get_remote_address(), now)
548                        }) {
549                    // Deny circuit exceeding limits.
550                    ToSwarm::NotifyHandler {
551                        handler: NotifyHandler::One(connection),
552                        peer_id: event_source,
553                        event: Either::Left(handler::In::DenyCircuitReq {
554                            circuit_id: None,
555                            inbound_circuit_req,
556                            status: proto::Status::RESOURCE_LIMIT_EXCEEDED,
557                        }),
558                    }
559                } else if let Some(dst_conn) = self
560                    .reservations
561                    .get(&inbound_circuit_req.dst())
562                    .and_then(|cs| cs.iter().next())
563                {
564                    // Accept circuit request if reservation present.
565                    let circuit_id = self.circuits.insert(Circuit {
566                        status: CircuitStatus::Accepting,
567                        src_peer_id: event_source,
568                        src_connection_id: connection,
569                        dst_peer_id: inbound_circuit_req.dst(),
570                        dst_connection_id: *dst_conn,
571                    });
572
573                    ToSwarm::NotifyHandler {
574                        handler: NotifyHandler::One(*dst_conn),
575                        peer_id: event_source,
576                        event: Either::Left(handler::In::NegotiateOutboundConnect {
577                            circuit_id,
578                            inbound_circuit_req,
579                            src_peer_id: event_source,
580                            src_connection_id: connection,
581                        }),
582                    }
583                } else {
584                    // Deny circuit request if no reservation present.
585                    ToSwarm::NotifyHandler {
586                        handler: NotifyHandler::One(connection),
587                        peer_id: event_source,
588                        event: Either::Left(handler::In::DenyCircuitReq {
589                            circuit_id: None,
590                            inbound_circuit_req,
591                            status: proto::Status::NO_RESERVATION,
592                        }),
593                    }
594                };
595                self.queued_actions.push_back(action);
596            }
597            handler::Event::CircuitReqDenied {
598                circuit_id,
599                dst_peer_id,
600                status,
601            } => {
602                if let Some(circuit_id) = circuit_id {
603                    self.circuits.remove(circuit_id);
604                }
605
606                self.queued_actions
607                    .push_back(ToSwarm::GenerateEvent(Event::CircuitReqDenied {
608                        src_peer_id: event_source,
609                        dst_peer_id,
610                        status: status.into(),
611                    }));
612            }
613            handler::Event::CircuitReqDenyFailed {
614                circuit_id,
615                dst_peer_id,
616                error,
617            } => {
618                if let Some(circuit_id) = circuit_id {
619                    self.circuits.remove(circuit_id);
620                }
621
622                #[allow(deprecated)]
623                self.queued_actions.push_back(ToSwarm::GenerateEvent(
624                    Event::CircuitReqDenyFailed {
625                        src_peer_id: event_source,
626                        dst_peer_id,
627                        error,
628                    },
629                ));
630            }
631            handler::Event::OutboundConnectNegotiated {
632                circuit_id,
633                src_peer_id,
634                src_connection_id,
635                inbound_circuit_req,
636                dst_stream,
637                dst_pending_data,
638            } => {
639                self.queued_actions.push_back(ToSwarm::NotifyHandler {
640                    handler: NotifyHandler::One(src_connection_id),
641                    peer_id: src_peer_id,
642                    event: Either::Left(handler::In::AcceptAndDriveCircuit {
643                        circuit_id,
644                        dst_peer_id: event_source,
645                        inbound_circuit_req,
646                        dst_stream,
647                        dst_pending_data,
648                    }),
649                });
650            }
651            handler::Event::OutboundConnectNegotiationFailed {
652                circuit_id,
653                src_peer_id,
654                src_connection_id,
655                inbound_circuit_req,
656                status,
657                error,
658            } => {
659                self.queued_actions.push_back(ToSwarm::NotifyHandler {
660                    handler: NotifyHandler::One(src_connection_id),
661                    peer_id: src_peer_id,
662                    event: Either::Left(handler::In::DenyCircuitReq {
663                        circuit_id: Some(circuit_id),
664                        inbound_circuit_req,
665                        status,
666                    }),
667                });
668                #[allow(deprecated)]
669                self.queued_actions.push_back(ToSwarm::GenerateEvent(
670                    Event::CircuitReqOutboundConnectFailed {
671                        src_peer_id,
672                        dst_peer_id: event_source,
673                        error,
674                    },
675                ));
676            }
677            handler::Event::CircuitReqAccepted {
678                dst_peer_id,
679                circuit_id,
680            } => {
681                self.circuits.accepted(circuit_id);
682                self.queued_actions
683                    .push_back(ToSwarm::GenerateEvent(Event::CircuitReqAccepted {
684                        src_peer_id: event_source,
685                        dst_peer_id,
686                    }));
687            }
688            handler::Event::CircuitReqAcceptFailed {
689                dst_peer_id,
690                circuit_id,
691                error,
692            } => {
693                self.circuits.remove(circuit_id);
694                #[allow(deprecated)]
695                self.queued_actions.push_back(ToSwarm::GenerateEvent(
696                    Event::CircuitReqAcceptFailed {
697                        src_peer_id: event_source,
698                        dst_peer_id,
699                        error,
700                    },
701                ));
702            }
703            handler::Event::CircuitClosed {
704                dst_peer_id,
705                circuit_id,
706                error,
707            } => {
708                self.circuits.remove(circuit_id);
709
710                self.queued_actions
711                    .push_back(ToSwarm::GenerateEvent(Event::CircuitClosed {
712                        src_peer_id: event_source,
713                        dst_peer_id,
714                        error,
715                    }));
716            }
717        }
718    }
719
720    #[tracing::instrument(level = "trace", name = "NetworkBehaviour::poll", skip(self))]
721    fn poll(&mut self, _: &mut Context<'_>) -> Poll<ToSwarm<Self::ToSwarm, THandlerInEvent<Self>>> {
722        if let Some(to_swarm) = self.queued_actions.pop_front() {
723            return Poll::Ready(to_swarm);
724        }
725
726        Poll::Pending
727    }
728}
729
730#[derive(Default)]
731struct CircuitsTracker {
732    next_id: CircuitId,
733    circuits: HashMap<CircuitId, Circuit>,
734}
735
736impl CircuitsTracker {
737    fn len(&self) -> usize {
738        self.circuits.len()
739    }
740
741    fn insert(&mut self, circuit: Circuit) -> CircuitId {
742        let id = self.next_id;
743        self.next_id = self.next_id + 1;
744
745        self.circuits.insert(id, circuit);
746
747        id
748    }
749
750    fn accepted(&mut self, circuit_id: CircuitId) {
751        if let Some(c) = self.circuits.get_mut(&circuit_id) {
752            c.status = CircuitStatus::Accepted;
753        };
754    }
755
756    fn remove(&mut self, circuit_id: CircuitId) -> Option<Circuit> {
757        self.circuits.remove(&circuit_id)
758    }
759
760    fn remove_by_connection(
761        &mut self,
762        peer_id: PeerId,
763        connection_id: ConnectionId,
764    ) -> Vec<Circuit> {
765        let mut removed = vec![];
766
767        self.circuits.retain(|_circuit_id, circuit| {
768            let is_src =
769                circuit.src_peer_id == peer_id && circuit.src_connection_id == connection_id;
770            let is_dst =
771                circuit.dst_peer_id == peer_id && circuit.dst_connection_id == connection_id;
772
773            if is_src || is_dst {
774                removed.push(circuit.clone());
775                // Remove circuit from HashMap.
776                false
777            } else {
778                // Retain circuit in HashMap.
779                true
780            }
781        });
782
783        removed
784    }
785
786    fn num_circuits_of_peer(&self, peer: PeerId) -> usize {
787        self.circuits
788            .iter()
789            .filter(|(_, c)| c.src_peer_id == peer || c.dst_peer_id == peer)
790            .count()
791    }
792}
793
794#[derive(Clone)]
795struct Circuit {
796    src_peer_id: PeerId,
797    src_connection_id: ConnectionId,
798    dst_peer_id: PeerId,
799    dst_connection_id: ConnectionId,
800    status: CircuitStatus,
801}
802
803#[derive(Clone)]
804enum CircuitStatus {
805    Accepting,
806    Accepted,
807}
808
809#[derive(Default, Clone, Copy, Debug, Hash, Eq, PartialEq)]
810pub struct CircuitId(u64);
811
812impl Add<u64> for CircuitId {
813    type Output = CircuitId;
814
815    fn add(self, rhs: u64) -> Self {
816        CircuitId(self.0 + rhs)
817    }
818}
819
820/// Status code for a relay reservation request that was denied.
821#[derive(Debug)]
822pub enum StatusCode {
823    OK,
824    ReservationRefused,
825    ResourceLimitExceeded,
826    PermissionDenied,
827    ConnectionFailed,
828    NoReservation,
829    MalformedMessage,
830    UnexpectedMessage,
831}
832
833impl From<proto::Status> for StatusCode {
834    fn from(other: proto::Status) -> Self {
835        match other {
836            proto::Status::OK => Self::OK,
837            proto::Status::RESERVATION_REFUSED => Self::ReservationRefused,
838            proto::Status::RESOURCE_LIMIT_EXCEEDED => Self::ResourceLimitExceeded,
839            proto::Status::PERMISSION_DENIED => Self::PermissionDenied,
840            proto::Status::CONNECTION_FAILED => Self::ConnectionFailed,
841            proto::Status::NO_RESERVATION => Self::NoReservation,
842            proto::Status::MALFORMED_MESSAGE => Self::MalformedMessage,
843            proto::Status::UNEXPECTED_MESSAGE => Self::UnexpectedMessage,
844        }
845    }
846}