1pub(crate) mod handler;
24pub(crate) mod rate_limiter;
25use std::{
26 collections::{hash_map, HashMap, HashSet, VecDeque},
27 num::NonZeroU32,
28 ops::Add,
29 task::{Context, Poll},
30 time::Duration,
31};
32
33use either::Either;
34use libp2p_core::{multiaddr::Protocol, transport::PortUse, ConnectedPoint, Endpoint, Multiaddr};
35use libp2p_identity::PeerId;
36use libp2p_swarm::{
37 behaviour::{ConnectionClosed, FromSwarm},
38 dummy, ConnectionDenied, ConnectionId, ExternalAddresses, NetworkBehaviour, NotifyHandler,
39 THandler, THandlerInEvent, THandlerOutEvent, ToSwarm,
40};
41use web_time::Instant;
42
43use crate::{
44 behaviour::handler::Handler,
45 multiaddr_ext::MultiaddrExt,
46 proto,
47 protocol::{inbound_hop, outbound_stop},
48};
49
50pub struct Config {
56 pub max_reservations: usize,
57 pub max_reservations_per_peer: usize,
58 pub reservation_duration: Duration,
59 pub reservation_rate_limiters: Vec<Box<dyn rate_limiter::RateLimiter>>,
60
61 pub max_circuits: usize,
62 pub max_circuits_per_peer: usize,
63 pub max_circuit_duration: Duration,
64 pub max_circuit_bytes: u64,
65 pub circuit_src_rate_limiters: Vec<Box<dyn rate_limiter::RateLimiter>>,
66}
67
68impl Config {
69 pub fn reservation_rate_per_peer(mut self, limit: NonZeroU32, interval: Duration) -> Self {
70 self.reservation_rate_limiters
71 .push(rate_limiter::new_per_peer(
72 rate_limiter::GenericRateLimiterConfig { limit, interval },
73 ));
74 self
75 }
76
77 pub fn circuit_src_per_peer(mut self, limit: NonZeroU32, interval: Duration) -> Self {
78 self.circuit_src_rate_limiters
79 .push(rate_limiter::new_per_peer(
80 rate_limiter::GenericRateLimiterConfig { limit, interval },
81 ));
82 self
83 }
84
85 pub fn reservation_rate_per_ip(mut self, limit: NonZeroU32, interval: Duration) -> Self {
86 self.reservation_rate_limiters
87 .push(rate_limiter::new_per_ip(
88 rate_limiter::GenericRateLimiterConfig { limit, interval },
89 ));
90 self
91 }
92
93 pub fn circuit_src_per_ip(mut self, limit: NonZeroU32, interval: Duration) -> Self {
94 self.circuit_src_rate_limiters
95 .push(rate_limiter::new_per_ip(
96 rate_limiter::GenericRateLimiterConfig { limit, interval },
97 ));
98 self
99 }
100}
101
102impl std::fmt::Debug for Config {
103 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
104 f.debug_struct("Config")
105 .field("max_reservations", &self.max_reservations)
106 .field("max_reservations_per_peer", &self.max_reservations_per_peer)
107 .field("reservation_duration", &self.reservation_duration)
108 .field(
109 "reservation_rate_limiters",
110 &format!("[{} rate limiters]", self.reservation_rate_limiters.len()),
111 )
112 .field("max_circuits", &self.max_circuits)
113 .field("max_circuits_per_peer", &self.max_circuits_per_peer)
114 .field("max_circuit_duration", &self.max_circuit_duration)
115 .field("max_circuit_bytes", &self.max_circuit_bytes)
116 .field(
117 "circuit_src_rate_limiters",
118 &format!("[{} rate limiters]", self.circuit_src_rate_limiters.len()),
119 )
120 .finish()
121 }
122}
123
124impl Default for Config {
125 fn default() -> Self {
126 let reservation_rate_limiters = vec![
127 rate_limiter::new_per_peer(rate_limiter::GenericRateLimiterConfig {
130 limit: NonZeroU32::new(30).expect("30 > 0"),
131 interval: Duration::from_secs(60 * 2),
132 }),
133 rate_limiter::new_per_ip(rate_limiter::GenericRateLimiterConfig {
136 limit: NonZeroU32::new(60).expect("60 > 0"),
137 interval: Duration::from_secs(60),
138 }),
139 ];
140
141 let circuit_src_rate_limiters = vec![
142 rate_limiter::new_per_peer(rate_limiter::GenericRateLimiterConfig {
144 limit: NonZeroU32::new(30).expect("30 > 0"),
145 interval: Duration::from_secs(60 * 2),
146 }),
147 rate_limiter::new_per_ip(rate_limiter::GenericRateLimiterConfig {
149 limit: NonZeroU32::new(60).expect("60 > 0"),
150 interval: Duration::from_secs(60),
151 }),
152 ];
153
154 Config {
155 max_reservations: 128,
156 max_reservations_per_peer: 4,
157 reservation_duration: Duration::from_secs(60 * 60),
158 reservation_rate_limiters,
159
160 max_circuits: 16,
161 max_circuits_per_peer: 4,
162 max_circuit_duration: Duration::from_secs(2 * 60),
163 max_circuit_bytes: 1 << 17, circuit_src_rate_limiters,
165 }
166 }
167}
168
169#[derive(Debug)]
171pub enum Event {
172 ReservationReqAccepted {
174 src_peer_id: PeerId,
175 renewed: bool,
177 },
178 #[deprecated(
180 note = "Will be removed in favor of logging them internally, see <https://github.com/libp2p/rust-libp2p/issues/4757> for details."
181 )]
182 ReservationReqAcceptFailed {
183 src_peer_id: PeerId,
184 error: inbound_hop::Error,
185 },
186 ReservationReqDenied { src_peer_id: PeerId },
188 #[deprecated(
190 note = "Will be removed in favor of logging them internally, see <https://github.com/libp2p/rust-libp2p/issues/4757> for details."
191 )]
192 ReservationReqDenyFailed {
193 src_peer_id: PeerId,
194 error: inbound_hop::Error,
195 },
196 ReservationClosed { src_peer_id: PeerId },
198 ReservationTimedOut { src_peer_id: PeerId },
200 CircuitReqDenied {
202 src_peer_id: PeerId,
203 dst_peer_id: PeerId,
204 },
205 #[deprecated(
207 note = "Will be removed in favor of logging them internally, see <https://github.com/libp2p/rust-libp2p/issues/4757> for details."
208 )]
209 CircuitReqDenyFailed {
210 src_peer_id: PeerId,
211 dst_peer_id: PeerId,
212 error: inbound_hop::Error,
213 },
214 CircuitReqAccepted {
216 src_peer_id: PeerId,
217 dst_peer_id: PeerId,
218 },
219 #[deprecated(
221 note = "Will be removed in favor of logging them internally, see <https://github.com/libp2p/rust-libp2p/issues/4757> for details."
222 )]
223 CircuitReqOutboundConnectFailed {
224 src_peer_id: PeerId,
225 dst_peer_id: PeerId,
226 error: outbound_stop::Error,
227 },
228 #[deprecated(
230 note = "Will be removed in favor of logging them internally, see <https://github.com/libp2p/rust-libp2p/issues/4757> for details."
231 )]
232 CircuitReqAcceptFailed {
233 src_peer_id: PeerId,
234 dst_peer_id: PeerId,
235 error: inbound_hop::Error,
236 },
237 CircuitClosed {
239 src_peer_id: PeerId,
240 dst_peer_id: PeerId,
241 error: Option<std::io::Error>,
242 },
243}
244
245pub struct Behaviour {
248 config: Config,
249
250 local_peer_id: PeerId,
251
252 reservations: HashMap<PeerId, HashSet<ConnectionId>>,
253 circuits: CircuitsTracker,
254
255 queued_actions: VecDeque<ToSwarm<Event, THandlerInEvent<Self>>>,
257
258 external_addresses: ExternalAddresses,
259}
260
261impl Behaviour {
262 pub fn new(local_peer_id: PeerId, config: Config) -> Self {
263 Self {
264 config,
265 local_peer_id,
266 reservations: Default::default(),
267 circuits: Default::default(),
268 queued_actions: Default::default(),
269 external_addresses: Default::default(),
270 }
271 }
272
273 fn on_connection_closed(
274 &mut self,
275 ConnectionClosed {
276 peer_id,
277 connection_id,
278 ..
279 }: ConnectionClosed,
280 ) {
281 if let hash_map::Entry::Occupied(mut peer) = self.reservations.entry(peer_id) {
282 if peer.get_mut().remove(&connection_id) {
283 self.queued_actions
284 .push_back(ToSwarm::GenerateEvent(Event::ReservationClosed {
285 src_peer_id: peer_id,
286 }));
287 }
288 if peer.get().is_empty() {
289 peer.remove();
290 }
291 }
292
293 for circuit in self
294 .circuits
295 .remove_by_connection(peer_id, connection_id)
296 .iter()
297 .filter(|c| matches!(c.status, CircuitStatus::Accepted))
299 {
300 self.queued_actions
301 .push_back(ToSwarm::GenerateEvent(Event::CircuitClosed {
302 src_peer_id: circuit.src_peer_id,
303 dst_peer_id: circuit.dst_peer_id,
304 error: Some(std::io::ErrorKind::ConnectionAborted.into()),
305 }));
306 }
307 }
308}
309
310impl NetworkBehaviour for Behaviour {
311 type ConnectionHandler = Either<Handler, dummy::ConnectionHandler>;
312 type ToSwarm = Event;
313
314 fn handle_established_inbound_connection(
315 &mut self,
316 _: ConnectionId,
317 _: PeerId,
318 local_addr: &Multiaddr,
319 remote_addr: &Multiaddr,
320 ) -> Result<THandler<Self>, ConnectionDenied> {
321 if local_addr.is_relayed() {
322 return Ok(Either::Right(dummy::ConnectionHandler));
324 }
325
326 Ok(Either::Left(Handler::new(
327 handler::Config {
328 reservation_duration: self.config.reservation_duration,
329 max_circuit_duration: self.config.max_circuit_duration,
330 max_circuit_bytes: self.config.max_circuit_bytes,
331 },
332 ConnectedPoint::Listener {
333 local_addr: local_addr.clone(),
334 send_back_addr: remote_addr.clone(),
335 },
336 )))
337 }
338
339 fn handle_established_outbound_connection(
340 &mut self,
341 _: ConnectionId,
342 _: PeerId,
343 addr: &Multiaddr,
344 role_override: Endpoint,
345 port_use: PortUse,
346 ) -> Result<THandler<Self>, ConnectionDenied> {
347 if addr.is_relayed() {
348 return Ok(Either::Right(dummy::ConnectionHandler));
350 }
351
352 Ok(Either::Left(Handler::new(
353 handler::Config {
354 reservation_duration: self.config.reservation_duration,
355 max_circuit_duration: self.config.max_circuit_duration,
356 max_circuit_bytes: self.config.max_circuit_bytes,
357 },
358 ConnectedPoint::Dialer {
359 address: addr.clone(),
360 role_override,
361 port_use,
362 },
363 )))
364 }
365
366 fn on_swarm_event(&mut self, event: FromSwarm) {
367 self.external_addresses.on_swarm_event(&event);
368
369 if let FromSwarm::ConnectionClosed(connection_closed) = event {
370 self.on_connection_closed(connection_closed)
371 }
372 }
373
374 fn on_connection_handler_event(
375 &mut self,
376 event_source: PeerId,
377 connection: ConnectionId,
378 event: THandlerOutEvent<Self>,
379 ) {
380 let event = match event {
381 Either::Left(e) => e,
382 Either::Right(v) => libp2p_core::util::unreachable(v),
383 };
384
385 match event {
386 handler::Event::ReservationReqReceived {
387 inbound_reservation_req,
388 endpoint,
389 renewed,
390 } => {
391 let now = Instant::now();
392
393 assert!(
394 !endpoint.is_relayed(),
395 "`dummy::ConnectionHandler` handles relayed connections. It \
396 denies all inbound substreams."
397 );
398
399 let action = if
400 (!renewed
403 && self
404 .reservations
405 .get(&event_source)
406 .map(|cs| cs.len())
407 .unwrap_or(0)
408 > self.config.max_reservations_per_peer)
409 || self
411 .reservations
412 .values()
413 .map(|cs| cs.len())
414 .sum::<usize>()
415 >= self.config.max_reservations
416 || !self
418 .config
419 .reservation_rate_limiters
420 .iter_mut()
421 .all(|limiter| {
422 limiter.try_next(event_source, endpoint.get_remote_address(), now)
423 }) {
424 ToSwarm::NotifyHandler {
425 handler: NotifyHandler::One(connection),
426 peer_id: event_source,
427 event: Either::Left(handler::In::DenyReservationReq {
428 inbound_reservation_req,
429 status: proto::Status::RESOURCE_LIMIT_EXCEEDED,
430 }),
431 }
432 } else {
433 self.reservations
435 .entry(event_source)
436 .or_default()
437 .insert(connection);
438
439 ToSwarm::NotifyHandler {
440 handler: NotifyHandler::One(connection),
441 peer_id: event_source,
442 event: Either::Left(handler::In::AcceptReservationReq {
443 inbound_reservation_req,
444 addrs: self
445 .external_addresses
446 .iter()
447 .cloned()
448 .filter_map(|a| match a.iter().last()? {
450 Protocol::P2p(_) => Some(a),
451 _ => Some(a.with(Protocol::P2p(self.local_peer_id))),
452 })
453 .collect(),
454 }),
455 }
456 };
457
458 self.queued_actions.push_back(action);
459 }
460 handler::Event::ReservationReqAccepted { renewed } => {
461 self.reservations
464 .entry(event_source)
465 .or_default()
466 .insert(connection);
467
468 self.queued_actions.push_back(ToSwarm::GenerateEvent(
469 Event::ReservationReqAccepted {
470 src_peer_id: event_source,
471 renewed,
472 },
473 ));
474 }
475 handler::Event::ReservationReqAcceptFailed { error } => {
476 #[allow(deprecated)]
477 self.queued_actions.push_back(ToSwarm::GenerateEvent(
478 Event::ReservationReqAcceptFailed {
479 src_peer_id: event_source,
480 error,
481 },
482 ));
483 }
484 handler::Event::ReservationReqDenied {} => {
485 self.queued_actions.push_back(ToSwarm::GenerateEvent(
486 Event::ReservationReqDenied {
487 src_peer_id: event_source,
488 },
489 ));
490 }
491 handler::Event::ReservationReqDenyFailed { error } => {
492 #[allow(deprecated)]
493 self.queued_actions.push_back(ToSwarm::GenerateEvent(
494 Event::ReservationReqDenyFailed {
495 src_peer_id: event_source,
496 error,
497 },
498 ));
499 }
500 handler::Event::ReservationTimedOut {} => {
501 match self.reservations.entry(event_source) {
502 hash_map::Entry::Occupied(mut peer) => {
503 peer.get_mut().remove(&connection);
504 if peer.get().is_empty() {
505 peer.remove();
506 }
507 }
508 hash_map::Entry::Vacant(_) => {
509 unreachable!(
510 "Expect to track timed out reservation with peer {:?} on connection {:?}",
511 event_source,
512 connection,
513 );
514 }
515 }
516
517 self.queued_actions
518 .push_back(ToSwarm::GenerateEvent(Event::ReservationTimedOut {
519 src_peer_id: event_source,
520 }));
521 }
522 handler::Event::CircuitReqReceived {
523 inbound_circuit_req,
524 endpoint,
525 } => {
526 let now = Instant::now();
527
528 assert!(
529 !endpoint.is_relayed(),
530 "`dummy::ConnectionHandler` handles relayed connections. It \
531 denies all inbound substreams."
532 );
533
534 let action = if self.circuits.num_circuits_of_peer(event_source)
535 > self.config.max_circuits_per_peer
536 || self.circuits.len() >= self.config.max_circuits
537 || !self
538 .config
539 .circuit_src_rate_limiters
540 .iter_mut()
541 .all(|limiter| {
542 limiter.try_next(event_source, endpoint.get_remote_address(), now)
543 }) {
544 ToSwarm::NotifyHandler {
546 handler: NotifyHandler::One(connection),
547 peer_id: event_source,
548 event: Either::Left(handler::In::DenyCircuitReq {
549 circuit_id: None,
550 inbound_circuit_req,
551 status: proto::Status::RESOURCE_LIMIT_EXCEEDED,
552 }),
553 }
554 } else if let Some(dst_conn) = self
555 .reservations
556 .get(&inbound_circuit_req.dst())
557 .and_then(|cs| cs.iter().next())
558 {
559 let circuit_id = self.circuits.insert(Circuit {
561 status: CircuitStatus::Accepting,
562 src_peer_id: event_source,
563 src_connection_id: connection,
564 dst_peer_id: inbound_circuit_req.dst(),
565 dst_connection_id: *dst_conn,
566 });
567
568 ToSwarm::NotifyHandler {
569 handler: NotifyHandler::One(*dst_conn),
570 peer_id: event_source,
571 event: Either::Left(handler::In::NegotiateOutboundConnect {
572 circuit_id,
573 inbound_circuit_req,
574 src_peer_id: event_source,
575 src_connection_id: connection,
576 }),
577 }
578 } else {
579 ToSwarm::NotifyHandler {
581 handler: NotifyHandler::One(connection),
582 peer_id: event_source,
583 event: Either::Left(handler::In::DenyCircuitReq {
584 circuit_id: None,
585 inbound_circuit_req,
586 status: proto::Status::NO_RESERVATION,
587 }),
588 }
589 };
590 self.queued_actions.push_back(action);
591 }
592 handler::Event::CircuitReqDenied {
593 circuit_id,
594 dst_peer_id,
595 } => {
596 if let Some(circuit_id) = circuit_id {
597 self.circuits.remove(circuit_id);
598 }
599
600 self.queued_actions
601 .push_back(ToSwarm::GenerateEvent(Event::CircuitReqDenied {
602 src_peer_id: event_source,
603 dst_peer_id,
604 }));
605 }
606 handler::Event::CircuitReqDenyFailed {
607 circuit_id,
608 dst_peer_id,
609 error,
610 } => {
611 if let Some(circuit_id) = circuit_id {
612 self.circuits.remove(circuit_id);
613 }
614
615 #[allow(deprecated)]
616 self.queued_actions.push_back(ToSwarm::GenerateEvent(
617 Event::CircuitReqDenyFailed {
618 src_peer_id: event_source,
619 dst_peer_id,
620 error,
621 },
622 ));
623 }
624 handler::Event::OutboundConnectNegotiated {
625 circuit_id,
626 src_peer_id,
627 src_connection_id,
628 inbound_circuit_req,
629 dst_stream,
630 dst_pending_data,
631 } => {
632 self.queued_actions.push_back(ToSwarm::NotifyHandler {
633 handler: NotifyHandler::One(src_connection_id),
634 peer_id: src_peer_id,
635 event: Either::Left(handler::In::AcceptAndDriveCircuit {
636 circuit_id,
637 dst_peer_id: event_source,
638 inbound_circuit_req,
639 dst_stream,
640 dst_pending_data,
641 }),
642 });
643 }
644 handler::Event::OutboundConnectNegotiationFailed {
645 circuit_id,
646 src_peer_id,
647 src_connection_id,
648 inbound_circuit_req,
649 status,
650 error,
651 } => {
652 self.queued_actions.push_back(ToSwarm::NotifyHandler {
653 handler: NotifyHandler::One(src_connection_id),
654 peer_id: src_peer_id,
655 event: Either::Left(handler::In::DenyCircuitReq {
656 circuit_id: Some(circuit_id),
657 inbound_circuit_req,
658 status,
659 }),
660 });
661 #[allow(deprecated)]
662 self.queued_actions.push_back(ToSwarm::GenerateEvent(
663 Event::CircuitReqOutboundConnectFailed {
664 src_peer_id,
665 dst_peer_id: event_source,
666 error,
667 },
668 ));
669 }
670 handler::Event::CircuitReqAccepted {
671 dst_peer_id,
672 circuit_id,
673 } => {
674 self.circuits.accepted(circuit_id);
675 self.queued_actions
676 .push_back(ToSwarm::GenerateEvent(Event::CircuitReqAccepted {
677 src_peer_id: event_source,
678 dst_peer_id,
679 }));
680 }
681 handler::Event::CircuitReqAcceptFailed {
682 dst_peer_id,
683 circuit_id,
684 error,
685 } => {
686 self.circuits.remove(circuit_id);
687 #[allow(deprecated)]
688 self.queued_actions.push_back(ToSwarm::GenerateEvent(
689 Event::CircuitReqAcceptFailed {
690 src_peer_id: event_source,
691 dst_peer_id,
692 error,
693 },
694 ));
695 }
696 handler::Event::CircuitClosed {
697 dst_peer_id,
698 circuit_id,
699 error,
700 } => {
701 self.circuits.remove(circuit_id);
702
703 self.queued_actions
704 .push_back(ToSwarm::GenerateEvent(Event::CircuitClosed {
705 src_peer_id: event_source,
706 dst_peer_id,
707 error,
708 }));
709 }
710 }
711 }
712
713 #[tracing::instrument(level = "trace", name = "NetworkBehaviour::poll", skip(self))]
714 fn poll(&mut self, _: &mut Context<'_>) -> Poll<ToSwarm<Self::ToSwarm, THandlerInEvent<Self>>> {
715 if let Some(to_swarm) = self.queued_actions.pop_front() {
716 return Poll::Ready(to_swarm);
717 }
718
719 Poll::Pending
720 }
721}
722
723#[derive(Default)]
724struct CircuitsTracker {
725 next_id: CircuitId,
726 circuits: HashMap<CircuitId, Circuit>,
727}
728
729impl CircuitsTracker {
730 fn len(&self) -> usize {
731 self.circuits.len()
732 }
733
734 fn insert(&mut self, circuit: Circuit) -> CircuitId {
735 let id = self.next_id;
736 self.next_id = self.next_id + 1;
737
738 self.circuits.insert(id, circuit);
739
740 id
741 }
742
743 fn accepted(&mut self, circuit_id: CircuitId) {
744 if let Some(c) = self.circuits.get_mut(&circuit_id) {
745 c.status = CircuitStatus::Accepted;
746 };
747 }
748
749 fn remove(&mut self, circuit_id: CircuitId) -> Option<Circuit> {
750 self.circuits.remove(&circuit_id)
751 }
752
753 fn remove_by_connection(
754 &mut self,
755 peer_id: PeerId,
756 connection_id: ConnectionId,
757 ) -> Vec<Circuit> {
758 let mut removed = vec![];
759
760 self.circuits.retain(|_circuit_id, circuit| {
761 let is_src =
762 circuit.src_peer_id == peer_id && circuit.src_connection_id == connection_id;
763 let is_dst =
764 circuit.dst_peer_id == peer_id && circuit.dst_connection_id == connection_id;
765
766 if is_src || is_dst {
767 removed.push(circuit.clone());
768 false
770 } else {
771 true
773 }
774 });
775
776 removed
777 }
778
779 fn num_circuits_of_peer(&self, peer: PeerId) -> usize {
780 self.circuits
781 .iter()
782 .filter(|(_, c)| c.src_peer_id == peer || c.dst_peer_id == peer)
783 .count()
784 }
785}
786
787#[derive(Clone)]
788struct Circuit {
789 src_peer_id: PeerId,
790 src_connection_id: ConnectionId,
791 dst_peer_id: PeerId,
792 dst_connection_id: ConnectionId,
793 status: CircuitStatus,
794}
795
/// Lifecycle state of a tracked [`Circuit`].
#[derive(Clone)]
enum CircuitStatus {
    /// The circuit request was admitted, but acceptance has not completed yet.
    Accepting,
    /// The circuit request has been accepted.
    Accepted,
}
801
/// Identifier of a relayed circuit. The default value is `CircuitId(0)`.
#[derive(Default, Clone, Copy, Debug, Hash, Eq, PartialEq)]
pub struct CircuitId(u64);

/// Advances an ID by a plain integer offset, e.g. to derive the next ID.
impl Add<u64> for CircuitId {
    type Output = CircuitId;

    fn add(self, rhs: u64) -> Self {
        CircuitId(self.0 + rhs)
    }
}