1mod as_client;
22mod as_server;
23
24use std::{
25 collections::{HashMap, HashSet, VecDeque},
26 iter,
27 task::{Context, Poll},
28 time::Duration,
29};
30
31use as_client::AsClient;
32pub use as_client::{OutboundProbeError, OutboundProbeEvent};
33use as_server::AsServer;
34pub use as_server::{InboundProbeError, InboundProbeEvent};
35use futures_timer::Delay;
36use libp2p_core::{multiaddr::Protocol, transport::PortUse, ConnectedPoint, Endpoint, Multiaddr};
37use libp2p_identity::PeerId;
38use libp2p_request_response::{
39 self as request_response, InboundRequestId, OutboundRequestId, ProtocolSupport, ResponseChannel,
40};
41use libp2p_swarm::{
42 behaviour::{AddressChange, ConnectionClosed, ConnectionEstablished, DialFailure, FromSwarm},
43 ConnectionDenied, ConnectionId, ListenAddresses, NetworkBehaviour, THandler, THandlerInEvent,
44 THandlerOutEvent, ToSwarm,
45};
46use web_time::Instant;
47
48use crate::{
49 protocol::{AutoNatCodec, DialRequest, DialResponse, ResponseError},
50 DEFAULT_PROTOCOL_NAME,
51};
52
53#[derive(Debug, Clone, PartialEq, Eq)]
55pub struct Config {
56 pub timeout: Duration,
58
59 pub boot_delay: Duration,
62 pub refresh_interval: Duration,
64 pub retry_interval: Duration,
67 pub throttle_server_period: Duration,
69 pub use_connected: bool,
71 pub confidence_max: usize,
74
75 pub max_peer_addresses: usize,
78 pub throttle_clients_global_max: usize,
80 pub throttle_clients_peer_max: usize,
82 pub throttle_clients_period: Duration,
84 pub only_global_ips: bool,
89}
90
91impl Default for Config {
92 fn default() -> Self {
93 Config {
94 timeout: Duration::from_secs(30),
95 boot_delay: Duration::from_secs(15),
96 retry_interval: Duration::from_secs(90),
97 refresh_interval: Duration::from_secs(15 * 60),
98 throttle_server_period: Duration::from_secs(90),
99 use_connected: true,
100 confidence_max: 3,
101 max_peer_addresses: 16,
102 throttle_clients_global_max: 30,
103 throttle_clients_peer_max: 3,
104 throttle_clients_period: Duration::from_secs(1),
105 only_global_ips: true,
106 }
107 }
108}
109
110#[derive(Debug, Clone, PartialEq, Eq)]
112pub enum NatStatus {
113 Public(Multiaddr),
114 Private,
115 Unknown,
116}
117
118impl NatStatus {
119 pub fn is_public(&self) -> bool {
120 matches!(self, NatStatus::Public(..))
121 }
122}
123
124#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
126pub struct ProbeId(usize);
127
128impl ProbeId {
129 fn next(&mut self) -> ProbeId {
130 let current = *self;
131 self.0 += 1;
132 current
133 }
134}
135
136#[derive(Debug)]
138pub enum Event {
139 InboundProbe(InboundProbeEvent),
141 OutboundProbe(OutboundProbeEvent),
143 StatusChanged {
145 old: NatStatus,
147 new: NatStatus,
149 },
150}
151
152pub struct Behaviour {
167 local_peer_id: PeerId,
169
170 inner: request_response::Behaviour<AutoNatCodec>,
172
173 config: Config,
174
175 servers: HashSet<PeerId>,
177
178 nat_status: NatStatus,
180
181 confidence: usize,
183
184 schedule_probe: Delay,
186
187 ongoing_inbound: HashMap<
189 PeerId,
190 (
191 ProbeId,
192 InboundRequestId,
193 Vec<Multiaddr>,
194 ResponseChannel<DialResponse>,
195 ),
196 >,
197
198 ongoing_outbound: HashMap<OutboundRequestId, ProbeId>,
200
201 connected: HashMap<PeerId, HashMap<ConnectionId, Option<Multiaddr>>>,
205
206 throttled_servers: Vec<(PeerId, Instant)>,
209
210 throttled_clients: Vec<(PeerId, Instant)>,
212
213 last_probe: Option<Instant>,
214
215 pending_actions: VecDeque<ToSwarm<<Self as NetworkBehaviour>::ToSwarm, THandlerInEvent<Self>>>,
216
217 probe_id: ProbeId,
218
219 listen_addresses: ListenAddresses,
220 other_candidates: HashSet<Multiaddr>,
221}
222
223impl Behaviour {
224 pub fn new(local_peer_id: PeerId, config: Config) -> Self {
225 let protocols = iter::once((DEFAULT_PROTOCOL_NAME, ProtocolSupport::Full));
226 let inner = request_response::Behaviour::with_codec(
227 AutoNatCodec,
228 protocols,
229 request_response::Config::default().with_request_timeout(config.timeout),
230 );
231 Self {
232 local_peer_id,
233 inner,
234 schedule_probe: Delay::new(config.boot_delay),
235 config,
236 servers: HashSet::new(),
237 ongoing_inbound: HashMap::default(),
238 ongoing_outbound: HashMap::default(),
239 connected: HashMap::default(),
240 nat_status: NatStatus::Unknown,
241 confidence: 0,
242 throttled_servers: Vec::new(),
243 throttled_clients: Vec::new(),
244 last_probe: None,
245 pending_actions: VecDeque::new(),
246 probe_id: ProbeId(0),
247 listen_addresses: Default::default(),
248 other_candidates: Default::default(),
249 }
250 }
251
252 pub fn public_address(&self) -> Option<&Multiaddr> {
255 match &self.nat_status {
256 NatStatus::Public(address) => Some(address),
257 _ => None,
258 }
259 }
260
261 pub fn nat_status(&self) -> NatStatus {
263 self.nat_status.clone()
264 }
265
266 pub fn confidence(&self) -> usize {
268 self.confidence
269 }
270
271 pub fn add_server(&mut self, peer: PeerId, address: Option<Multiaddr>) {
275 self.servers.insert(peer);
276 if let Some(addr) = address {
277 #[allow(deprecated)]
278 self.inner.add_address(&peer, addr);
279 }
280 }
281
282 pub fn remove_server(&mut self, peer: &PeerId) {
285 self.servers.retain(|p| p != peer);
286 }
287
288 pub fn probe_address(&mut self, candidate: Multiaddr) {
290 self.other_candidates.insert(candidate);
291 self.as_client().on_new_address();
292 }
293
294 fn as_client(&mut self) -> AsClient {
295 AsClient {
296 inner: &mut self.inner,
297 local_peer_id: self.local_peer_id,
298 config: &self.config,
299 connected: &self.connected,
300 probe_id: &mut self.probe_id,
301 servers: &self.servers,
302 throttled_servers: &mut self.throttled_servers,
303 nat_status: &mut self.nat_status,
304 confidence: &mut self.confidence,
305 ongoing_outbound: &mut self.ongoing_outbound,
306 last_probe: &mut self.last_probe,
307 schedule_probe: &mut self.schedule_probe,
308 listen_addresses: &self.listen_addresses,
309 other_candidates: &self.other_candidates,
310 }
311 }
312
313 fn as_server(&mut self) -> AsServer {
314 AsServer {
315 inner: &mut self.inner,
316 config: &self.config,
317 connected: &self.connected,
318 probe_id: &mut self.probe_id,
319 throttled_clients: &mut self.throttled_clients,
320 ongoing_inbound: &mut self.ongoing_inbound,
321 }
322 }
323
324 fn on_connection_established(
325 &mut self,
326 ConnectionEstablished {
327 peer_id: peer,
328 connection_id: conn,
329 endpoint,
330 ..
331 }: ConnectionEstablished,
332 ) {
333 let connections = self.connected.entry(peer).or_default();
334 let addr = endpoint.get_remote_address();
335 let observed_addr =
336 if !endpoint.is_relayed() && (!self.config.only_global_ips || addr.is_global_ip()) {
337 Some(addr.clone())
338 } else {
339 None
340 };
341 connections.insert(conn, observed_addr);
342
343 match endpoint {
344 ConnectedPoint::Dialer {
345 address,
346 role_override: Endpoint::Dialer,
347 port_use: _,
348 } => {
349 if let Some(event) = self.as_server().on_outbound_connection(&peer, address) {
350 self.pending_actions
351 .push_back(ToSwarm::GenerateEvent(Event::InboundProbe(event)));
352 }
353 }
354 ConnectedPoint::Dialer {
355 address: _,
356 role_override: Endpoint::Listener,
357 port_use: _,
358 } => {
359 }
363 ConnectedPoint::Listener { .. } => self.as_client().on_inbound_connection(),
364 }
365 }
366
367 fn on_connection_closed(
368 &mut self,
369 ConnectionClosed {
370 peer_id,
371 connection_id,
372 remaining_established,
373 ..
374 }: ConnectionClosed,
375 ) {
376 if remaining_established == 0 {
377 self.connected.remove(&peer_id);
378 } else {
379 let connections = self
380 .connected
381 .get_mut(&peer_id)
382 .expect("Peer is connected.");
383 connections.remove(&connection_id);
384 }
385 }
386
387 fn on_dial_failure(&mut self, DialFailure { peer_id, error, .. }: DialFailure) {
388 if let Some(event) = self.as_server().on_outbound_dial_error(peer_id, error) {
389 self.pending_actions
390 .push_back(ToSwarm::GenerateEvent(Event::InboundProbe(event)));
391 }
392 }
393
394 fn on_address_change(
395 &mut self,
396 AddressChange {
397 peer_id: peer,
398 connection_id: conn,
399 old,
400 new,
401 }: AddressChange,
402 ) {
403 if old.is_relayed() && new.is_relayed() {
404 return;
405 }
406 let connections = self.connected.get_mut(&peer).expect("Peer is connected.");
407 let addr = new.get_remote_address();
408 let observed_addr =
409 if !new.is_relayed() && (!self.config.only_global_ips || addr.is_global_ip()) {
410 Some(addr.clone())
411 } else {
412 None
413 };
414 connections.insert(conn, observed_addr);
415 }
416}
417
418impl NetworkBehaviour for Behaviour {
419 type ConnectionHandler =
420 <request_response::Behaviour<AutoNatCodec> as NetworkBehaviour>::ConnectionHandler;
421 type ToSwarm = Event;
422
423 #[tracing::instrument(level = "trace", name = "NetworkBehaviour::poll", skip(self, cx))]
424 fn poll(
425 &mut self,
426 cx: &mut Context<'_>,
427 ) -> Poll<ToSwarm<Self::ToSwarm, THandlerInEvent<Self>>> {
428 loop {
429 if let Some(event) = self.pending_actions.pop_front() {
430 return Poll::Ready(event);
431 }
432
433 match self.inner.poll(cx) {
434 Poll::Ready(ToSwarm::GenerateEvent(event)) => {
435 let actions = match event {
436 request_response::Event::Message {
437 message: request_response::Message::Response { .. },
438 ..
439 }
440 | request_response::Event::OutboundFailure { .. } => {
441 self.as_client().handle_event(event)
442 }
443 request_response::Event::Message {
444 message: request_response::Message::Request { .. },
445 ..
446 }
447 | request_response::Event::InboundFailure { .. } => {
448 self.as_server().handle_event(event)
449 }
450 request_response::Event::ResponseSent { .. } => VecDeque::new(),
451 };
452
453 self.pending_actions.extend(actions);
454 continue;
455 }
456 Poll::Ready(action) => {
457 self.pending_actions
458 .push_back(action.map_out(|_| unreachable!()));
459 continue;
460 }
461 Poll::Pending => {}
462 }
463
464 match self.as_client().poll_auto_probe(cx) {
465 Poll::Ready(event) => {
466 self.pending_actions
467 .push_back(ToSwarm::GenerateEvent(Event::OutboundProbe(event)));
468 continue;
469 }
470 Poll::Pending => {}
471 }
472
473 return Poll::Pending;
474 }
475 }
476
477 fn handle_pending_inbound_connection(
478 &mut self,
479 connection_id: ConnectionId,
480 local_addr: &Multiaddr,
481 remote_addr: &Multiaddr,
482 ) -> Result<(), ConnectionDenied> {
483 self.inner
484 .handle_pending_inbound_connection(connection_id, local_addr, remote_addr)
485 }
486
487 fn handle_established_inbound_connection(
488 &mut self,
489 connection_id: ConnectionId,
490 peer: PeerId,
491 local_addr: &Multiaddr,
492 remote_addr: &Multiaddr,
493 ) -> Result<THandler<Self>, ConnectionDenied> {
494 self.inner.handle_established_inbound_connection(
495 connection_id,
496 peer,
497 local_addr,
498 remote_addr,
499 )
500 }
501
502 fn handle_pending_outbound_connection(
503 &mut self,
504 connection_id: ConnectionId,
505 maybe_peer: Option<PeerId>,
506 addresses: &[Multiaddr],
507 effective_role: Endpoint,
508 ) -> Result<Vec<Multiaddr>, ConnectionDenied> {
509 self.inner.handle_pending_outbound_connection(
510 connection_id,
511 maybe_peer,
512 addresses,
513 effective_role,
514 )
515 }
516
517 fn handle_established_outbound_connection(
518 &mut self,
519 connection_id: ConnectionId,
520 peer: PeerId,
521 addr: &Multiaddr,
522 role_override: Endpoint,
523 port_use: PortUse,
524 ) -> Result<THandler<Self>, ConnectionDenied> {
525 self.inner.handle_established_outbound_connection(
526 connection_id,
527 peer,
528 addr,
529 role_override,
530 port_use,
531 )
532 }
533
534 fn on_swarm_event(&mut self, event: FromSwarm) {
535 self.listen_addresses.on_swarm_event(&event);
536 self.inner.on_swarm_event(event);
537
538 match event {
539 FromSwarm::ConnectionEstablished(e) => self.on_connection_established(e),
540 FromSwarm::ConnectionClosed(e) => self.on_connection_closed(e),
541 FromSwarm::DialFailure(e) => self.on_dial_failure(e),
542 FromSwarm::AddressChange(e) => self.on_address_change(e),
543 FromSwarm::NewListenAddr(_) => {
544 self.as_client().on_new_address();
545 }
546 FromSwarm::ExpiredListenAddr(e) => {
547 self.as_client().on_expired_address(e.addr);
548 }
549 FromSwarm::ExternalAddrExpired(e) => {
550 self.as_client().on_expired_address(e.addr);
551 }
552 FromSwarm::NewExternalAddrCandidate(e) => {
553 self.probe_address(e.addr.to_owned());
554 }
555 _ => {}
556 }
557 }
558
559 fn on_connection_handler_event(
560 &mut self,
561 peer_id: PeerId,
562 connection_id: ConnectionId,
563 event: THandlerOutEvent<Self>,
564 ) {
565 self.inner
566 .on_connection_handler_event(peer_id, connection_id, event)
567 }
568}
569
570type Action = ToSwarm<<Behaviour as NetworkBehaviour>::ToSwarm, THandlerInEvent<Behaviour>>;
571
572trait HandleInnerEvent {
575 fn handle_event(
576 &mut self,
577 event: request_response::Event<DialRequest, DialResponse>,
578 ) -> VecDeque<Action>;
579}
580
581trait GlobalIp {
582 fn is_global_ip(&self) -> bool;
583}
584
585impl GlobalIp for Multiaddr {
586 fn is_global_ip(&self) -> bool {
587 match self.iter().next() {
588 Some(Protocol::Ip4(a)) => a.is_global_ip(),
589 Some(Protocol::Ip6(a)) => a.is_global_ip(),
590 _ => false,
591 }
592 }
593}
594
595impl GlobalIp for std::net::Ipv4Addr {
596 fn is_global_ip(&self) -> bool {
600 if u32::from_be_bytes(self.octets()) == 0xc0000009
603 || u32::from_be_bytes(self.octets()) == 0xc000000a
604 {
605 return true;
606 }
607
608 fn is_shared(addr: &std::net::Ipv4Addr) -> bool {
610 addr.octets()[0] == 100 && (addr.octets()[1] & 0b1100_0000 == 0b0100_0000)
611 }
612
613 fn is_reserved(addr: &std::net::Ipv4Addr) -> bool {
620 addr.octets()[0] & 240 == 240 && !addr.is_broadcast()
621 }
622
623 fn is_benchmarking(addr: &std::net::Ipv4Addr) -> bool {
625 addr.octets()[0] == 198 && (addr.octets()[1] & 0xfe) == 18
626 }
627
628 !self.is_private()
629 && !self.is_loopback()
630 && !self.is_link_local()
631 && !self.is_broadcast()
632 && !self.is_documentation()
633 && !is_shared(self)
634 && !(self.octets()[0] == 192 && self.octets()[1] == 0 && self.octets()[2] == 0)
636 && !is_reserved(self)
637 && !is_benchmarking(self)
638 && self.octets()[0] != 0
640 }
641}
642
643impl GlobalIp for std::net::Ipv6Addr {
644 fn is_global_ip(&self) -> bool {
651 fn is_unicast(addr: &std::net::Ipv6Addr) -> bool {
653 !addr.is_multicast()
654 }
655 fn is_unicast_link_local(addr: &std::net::Ipv6Addr) -> bool {
657 (addr.segments()[0] & 0xffc0) == 0xfe80
658 }
659 fn is_unique_local(addr: &std::net::Ipv6Addr) -> bool {
661 (addr.segments()[0] & 0xfe00) == 0xfc00
662 }
663 fn is_documentation(addr: &std::net::Ipv6Addr) -> bool {
665 (addr.segments()[0] == 0x2001) && (addr.segments()[1] == 0xdb8)
666 }
667
668 fn is_unicast_global(addr: &std::net::Ipv6Addr) -> bool {
670 is_unicast(addr)
671 && !addr.is_loopback()
672 && !is_unicast_link_local(addr)
673 && !is_unique_local(addr)
674 && !addr.is_unspecified()
675 && !is_documentation(addr)
676 }
677
678 fn is_multicast_scope_global(addr: &std::net::Ipv6Addr) -> Option<bool> {
683 match addr.segments()[0] & 0x000f {
684 14 => Some(true), 1..=5 | 8 => Some(false), _ => None, }
688 }
689
690 match is_multicast_scope_global(self) {
691 Some(true) => true,
692 None => is_unicast_global(self),
693 _ => false,
694 }
695 }
696}