1#![cfg_attr(docsrs, feature(doc_cfg, doc_auto_cfg))]
22
23use std::{
24 borrow::Borrow,
25 collections::{HashMap, VecDeque},
26 error::Error,
27 hash::{Hash, Hasher},
28 net::{self, IpAddr, SocketAddr, SocketAddrV4},
29 ops::{Deref, DerefMut},
30 pin::Pin,
31 task::{Context, Poll},
32 time::Duration,
33};
34
35use futures::{channel::oneshot, Future, StreamExt};
36use futures_timer::Delay;
37use igd_next::PortMappingProtocol;
38use libp2p_core::{
39 multiaddr,
40 transport::{ListenerId, PortUse},
41 Endpoint, Multiaddr,
42};
43use libp2p_swarm::{
44 derive_prelude::PeerId, dummy, ConnectionDenied, ConnectionId, ExpiredListenAddr, FromSwarm,
45 NetworkBehaviour, NewListenAddr, ToSwarm,
46};
47
48use crate::tokio::{is_addr_global, Gateway};
49
/// Lease duration requested from the gateway for every port mapping.
// NOTE(review): presumably in seconds (the derived timeout below is used with
// `Duration::from_secs`) — confirm against the gateway request handler.
const MAPPING_DURATION: u32 = 3600;

/// Seconds after which an active mapping is renewed: half of the requested lease.
const MAPPING_TIMEOUT: u64 = MAPPING_DURATION as u64 / 2;

/// Number of failed attempts after which no further retry is scheduled for a mapping.
const MAX_RETRY_ATTEMPTS: u32 = 5;

/// Delay in seconds before the first retry; doubled for every subsequent failure.
const BASE_RETRY_DELAY_SECS: u64 = 30;

/// Upper bound in seconds for the exponentially growing retry delay.
const MAX_RETRY_DELAY_SECS: u64 = 1800;
64
/// Request sent from the behaviour to the gateway through `Gateway::sender`.
#[derive(Debug)]
pub(crate) enum GatewayRequest {
    /// Ask the gateway to add (or renew) `mapping` with the given lease `duration`.
    AddMapping { mapping: Mapping, duration: u32 },
    /// Ask the gateway to remove a mapping.
    RemoveMapping(Mapping),
}
71
/// Outcome of a [`GatewayRequest`], received by the behaviour through `Gateway::receiver`.
#[derive(Debug)]
pub(crate) enum GatewayEvent {
    /// The mapping was added or renewed on the gateway.
    Mapped(Mapping),
    /// Adding or renewing the mapping failed with the attached error.
    MapFailure(Mapping, Box<dyn Error + Send + Sync + 'static>),
    /// The mapping was removed from the gateway.
    Removed(Mapping),
    /// Removing the mapping failed with the attached error.
    RemovalFailure(Mapping, Box<dyn Error + Send + Sync + 'static>),
}
84
/// A port mapping requested for one listen address.
///
/// Hashing, equality and map lookups only consider `listener_id`
/// (see the `Hash`, `PartialEq` and `Borrow<ListenerId>` impls).
#[derive(Debug, Clone)]
pub(crate) struct Mapping {
    /// Listener the mapped address belongs to; acts as the map key.
    pub(crate) listener_id: ListenerId,
    /// Transport protocol (TCP or UDP) of the mapped port.
    pub(crate) protocol: PortMappingProtocol,
    /// The local listen address as reported by the swarm.
    pub(crate) multiaddr: Multiaddr,
    /// Socket address extracted from `multiaddr`.
    pub(crate) internal_addr: SocketAddr,
}
93
94impl Mapping {
95 fn external_addr(&self, gateway_addr: IpAddr) -> Multiaddr {
98 let addr = match gateway_addr {
99 net::IpAddr::V4(ip) => multiaddr::Protocol::Ip4(ip),
100 net::IpAddr::V6(ip) => multiaddr::Protocol::Ip6(ip),
101 };
102 self.multiaddr
103 .replace(0, |_| Some(addr))
104 .expect("multiaddr should be valid")
105 }
106}
107
108impl Hash for Mapping {
109 fn hash<H: Hasher>(&self, state: &mut H) {
110 self.listener_id.hash(state);
111 }
112}
113
114impl PartialEq for Mapping {
115 fn eq(&self, other: &Self) -> bool {
116 self.listener_id == other.listener_id
117 }
118}
119
// Marker impl: equality is delegated to `listener_id` by the `PartialEq` impl.
impl Eq for Mapping {}
121
122impl Borrow<ListenerId> for Mapping {
123 fn borrow(&self) -> &ListenerId {
124 &self.listener_id
125 }
126}
127
/// Lifecycle state of a single [`Mapping`].
#[derive(Debug)]
enum MappingState {
    /// No request has been sent to the gateway yet.
    Inactive,
    /// A request was sent to the gateway and its answer is outstanding.
    Pending {
        /// Number of failed attempts that preceded this request.
        retry_count: u32,
    },
    /// The mapping is established; the timer triggers its renewal.
    Active(Delay),
    /// The last mapping request failed.
    Failed {
        /// Number of failed attempts so far.
        retry_count: u32,
        /// Timer for the next retry, or `None` once no retry is scheduled anymore.
        next_retry: Option<Delay>,
    },
}
148
/// State of the gateway discovery.
enum GatewayState {
    /// The gateway search is in progress; the receiver resolves with its result.
    Searching(oneshot::Receiver<Result<Gateway, Box<dyn std::error::Error + Send + Sync>>>),
    /// A gateway was found and passed the `is_addr_global` check on its external address.
    Available(Gateway),
    /// The search failed or its sender was dropped.
    GatewayNotFound,
    /// A gateway was found but its external address (attached) is not global.
    NonRoutableGateway(IpAddr),
}
156
/// Events produced by the UPnP [`Behaviour`].
#[derive(Debug)]
pub enum Event {
    /// A listen address was newly mapped on the gateway.
    NewExternalAddr {
        /// The local listen address that was mapped.
        local_addr: Multiaddr,
        /// The same address with its first component replaced by the gateway's IP.
        external_addr: Multiaddr,
    },
    /// Renewing a previously active mapping failed.
    ExpiredExternalAddr {
        /// The local listen address whose mapping expired.
        local_addr: Multiaddr,
        /// The external address that is no longer mapped.
        external_addr: Multiaddr,
    },
    /// No gateway could be found.
    GatewayNotFound,
    /// The gateway's external address is not globally routable.
    NonRoutableGateway,
}
179
/// All known mappings together with their current state.
///
/// Thin wrapper around the map; it dereferences to the inner `HashMap`.
#[derive(Debug, Default)]
struct MappingList(HashMap<Mapping, MappingState>);
183
184impl Deref for MappingList {
185 type Target = HashMap<Mapping, MappingState>;
186
187 fn deref(&self) -> &Self::Target {
188 &self.0
189 }
190}
191
192impl DerefMut for MappingList {
193 fn deref_mut(&mut self) -> &mut Self::Target {
194 &mut self.0
195 }
196}
197
198impl MappingList {
199 fn renew(&mut self, gateway: &mut Gateway, cx: &mut Context<'_>) {
202 for (mapping, state) in self.iter_mut() {
203 match state {
204 MappingState::Inactive => {
205 let duration = MAPPING_DURATION;
206 if let Err(err) = gateway.sender.try_send(GatewayRequest::AddMapping {
207 mapping: mapping.clone(),
208 duration,
209 }) {
210 tracing::debug!(
211 multiaddress=%mapping.multiaddr,
212 "could not request port mapping for multiaddress on the gateway: {}",
213 err
214 );
215 } else {
216 *state = MappingState::Pending { retry_count: 0 };
217 }
218 }
219 MappingState::Failed {
220 retry_count,
221 next_retry,
222 } => {
223 if let Some(delay) = next_retry {
224 if Pin::new(delay).poll(cx).is_ready() {
225 let duration = MAPPING_DURATION;
226 if let Err(err) = gateway.sender.try_send(GatewayRequest::AddMapping {
227 mapping: mapping.clone(),
228 duration,
229 }) {
230 tracing::debug!(
231 multiaddress=%mapping.multiaddr,
232 retry_count=%retry_count,
233 "could not retry port mapping for multiaddress on the gateway: {}",
234 err
235 );
236 } else {
237 *state = MappingState::Pending {
238 retry_count: *retry_count,
239 };
240 }
241 }
242 }
243 }
244 MappingState::Active(timeout) => {
245 if Pin::new(timeout).poll(cx).is_ready() {
246 let duration = MAPPING_DURATION;
247 if let Err(err) = gateway.sender.try_send(GatewayRequest::AddMapping {
248 mapping: mapping.clone(),
249 duration,
250 }) {
251 tracing::debug!(
252 multiaddress=%mapping.multiaddr,
253 "could not request port mapping for multiaddress on the gateway: {}",
254 err
255 );
256 }
257 }
258 }
259 MappingState::Pending { .. } => {}
260 }
261 }
262 }
263}
264
/// A [`NetworkBehaviour`] that maps the node's listen addresses on a UPnP gateway.
pub struct Behaviour {
    /// Current state of the gateway discovery.
    state: GatewayState,

    /// Mappings known to the behaviour and their states.
    mappings: MappingList,

    /// Events queued for delivery on subsequent `poll` calls.
    pending_events: VecDeque<Event>,
}
277
278impl Default for Behaviour {
279 fn default() -> Self {
280 Self {
281 state: GatewayState::Searching(crate::tokio::search_gateway()),
282 mappings: Default::default(),
283 pending_events: VecDeque::new(),
284 }
285 }
286}
287
288impl NetworkBehaviour for Behaviour {
289 type ConnectionHandler = dummy::ConnectionHandler;
290
291 type ToSwarm = Event;
292
293 fn handle_established_inbound_connection(
294 &mut self,
295 _connection_id: ConnectionId,
296 _peer: PeerId,
297 _local_addr: &Multiaddr,
298 _remote_addr: &Multiaddr,
299 ) -> Result<libp2p_swarm::THandler<Self>, ConnectionDenied> {
300 Ok(dummy::ConnectionHandler)
301 }
302
303 fn handle_established_outbound_connection(
304 &mut self,
305 _connection_id: ConnectionId,
306 _peer: PeerId,
307 _addr: &Multiaddr,
308 _role_override: Endpoint,
309 _port_use: PortUse,
310 ) -> Result<libp2p_swarm::THandler<Self>, libp2p_swarm::ConnectionDenied> {
311 Ok(dummy::ConnectionHandler)
312 }
313
314 fn on_swarm_event(&mut self, event: FromSwarm) {
315 match event {
316 FromSwarm::NewListenAddr(NewListenAddr {
317 listener_id,
318 addr: multiaddr,
319 }) => {
320 let Ok((addr, protocol)) = multiaddr_to_socketaddr_protocol(multiaddr.clone())
321 else {
322 tracing::debug!("multiaddress not supported for UPnP {multiaddr}");
323 return;
324 };
325
326 if let Some((mapping, _state)) = self.mappings.iter().find(|(mapping, state)| {
327 matches!(state, MappingState::Active(_))
328 && mapping.internal_addr.port() == addr.port()
329 }) {
330 tracing::debug!(
331 multiaddress=%multiaddr,
332 mapped_multiaddress=%mapping.multiaddr,
333 "port from multiaddress is already mapped on the gateway"
334 );
335 return;
336 }
337
338 match &mut self.state {
339 GatewayState::Searching(_) => {
340 self.mappings.insert(
344 Mapping {
345 listener_id,
346 protocol,
347 internal_addr: addr,
348 multiaddr: multiaddr.clone(),
349 },
350 MappingState::Inactive,
351 );
352 }
353 GatewayState::Available(ref mut gateway) => {
354 let mapping = Mapping {
355 listener_id,
356 protocol,
357 internal_addr: addr,
358 multiaddr: multiaddr.clone(),
359 };
360
361 let duration = MAPPING_DURATION;
362 if let Err(err) = gateway.sender.try_send(GatewayRequest::AddMapping {
363 mapping: mapping.clone(),
364 duration,
365 }) {
366 tracing::debug!(
367 multiaddress=%mapping.multiaddr,
368 "could not request port mapping for multiaddress on the gateway: {}",
369 err
370 );
371 }
372
373 self.mappings
374 .insert(mapping, MappingState::Pending { retry_count: 0 });
375 }
376 GatewayState::GatewayNotFound => {
377 tracing::debug!(
378 multiaddres=%multiaddr,
379 "network gateway not found, UPnP port mapping of multiaddres discarded"
380 );
381 }
382 GatewayState::NonRoutableGateway(addr) => {
383 tracing::debug!(
384 multiaddress=%multiaddr,
385 network_gateway_ip=%addr,
386 "the network gateway is not exposed to the public network. /
387 UPnP port mapping of multiaddress discarded"
388 );
389 }
390 };
391 }
392 FromSwarm::ExpiredListenAddr(ExpiredListenAddr {
393 listener_id,
394 addr: _addr,
395 }) => {
396 if let GatewayState::Available(ref mut gateway) = &mut self.state {
397 if let Some((mapping, _state)) = self.mappings.remove_entry(&listener_id) {
398 if let Err(err) = gateway
399 .sender
400 .try_send(GatewayRequest::RemoveMapping(mapping.clone()))
401 {
402 tracing::debug!(
403 multiaddress=%mapping.multiaddr,
404 "could not request port removal for multiaddress on the gateway: {}",
405 err
406 );
407 }
408 self.mappings
409 .insert(mapping, MappingState::Pending { retry_count: 0 });
410 }
411 }
412 }
413 _ => {}
414 }
415 }
416
    /// Forwards the handler event to `libp2p_core::util::unreachable`.
    // NOTE(review): presumably the dummy handler's event type is uninhabited,
    // so this can never be called — confirm against `dummy::ConnectionHandler`.
    fn on_connection_handler_event(
        &mut self,
        _peer_id: PeerId,
        _connection_id: ConnectionId,
        event: libp2p_swarm::THandlerOutEvent<Self>,
    ) {
        libp2p_core::util::unreachable(event)
    }
425
426 #[tracing::instrument(level = "trace", name = "NetworkBehaviour::poll", skip(self, cx))]
427 fn poll(
428 &mut self,
429 cx: &mut Context<'_>,
430 ) -> Poll<ToSwarm<Self::ToSwarm, libp2p_swarm::THandlerInEvent<Self>>> {
431 if let Some(event) = self.pending_events.pop_front() {
433 return Poll::Ready(ToSwarm::GenerateEvent(event));
434 }
435
436 loop {
439 match self.state {
440 GatewayState::Searching(ref mut fut) => match Pin::new(fut).poll(cx) {
441 Poll::Ready(Ok(result)) => match result {
442 Ok(gateway) => {
443 if !is_addr_global(gateway.external_addr) {
444 self.state =
445 GatewayState::NonRoutableGateway(gateway.external_addr);
446 tracing::debug!(
447 gateway_address=%gateway.external_addr,
448 "the gateway is not routable"
449 );
450 return Poll::Ready(ToSwarm::GenerateEvent(
451 Event::NonRoutableGateway,
452 ));
453 }
454 self.state = GatewayState::Available(gateway);
455 }
456 Err(err) => {
457 tracing::debug!("could not find gateway: {err}");
458 self.state = GatewayState::GatewayNotFound;
459 return Poll::Ready(ToSwarm::GenerateEvent(Event::GatewayNotFound));
460 }
461 },
462 Poll::Ready(Err(err)) => {
463 tracing::debug!("sender has been dropped: {err}");
466 self.state = GatewayState::GatewayNotFound;
467 return Poll::Ready(ToSwarm::GenerateEvent(Event::GatewayNotFound));
468 }
469 Poll::Pending => return Poll::Pending,
470 },
471 GatewayState::Available(ref mut gateway) => {
472 if let Poll::Ready(Some(result)) = gateway.receiver.poll_next_unpin(cx) {
474 match result {
475 GatewayEvent::Mapped(mapping) => {
476 let new_state = MappingState::Active(Delay::new(
477 Duration::from_secs(MAPPING_TIMEOUT),
478 ));
479
480 match self
481 .mappings
482 .insert(mapping.clone(), new_state)
483 .expect("mapping should exist")
484 {
485 MappingState::Pending { .. } => {
486 let external_multiaddr =
487 mapping.external_addr(gateway.external_addr);
488 self.pending_events.push_back(Event::NewExternalAddr {
489 local_addr: mapping.multiaddr,
490 external_addr: external_multiaddr.clone(),
491 });
492 tracing::debug!(
493 address=%mapping.internal_addr,
494 protocol=%mapping.protocol,
495 "successfully mapped UPnP for protocol"
496 );
497 return Poll::Ready(ToSwarm::ExternalAddrConfirmed(
498 external_multiaddr,
499 ));
500 }
501 MappingState::Active(_) => {
502 tracing::debug!(
503 address=%mapping.internal_addr,
504 protocol=%mapping.protocol,
505 "successfully renewed UPnP mapping for protocol"
506 );
507 }
508 _ => unreachable!(),
509 }
510 }
511 GatewayEvent::MapFailure(mapping, err) => {
512 let prev_state =
513 self.mappings.get(&mapping).expect("mapping should exist");
514
515 let (retry_count, was_active) = match prev_state {
516 MappingState::Active(_) => (0, true),
517 MappingState::Pending { retry_count } => (*retry_count, false),
518 MappingState::Failed { retry_count, .. } => {
519 (*retry_count, false)
520 }
521 _ => unreachable!(),
522 };
523
524 let new_retry_count = retry_count + 1;
525 let next_retry = if new_retry_count < MAX_RETRY_ATTEMPTS {
526 let delay_secs = std::cmp::min(
527 BASE_RETRY_DELAY_SECS
528 .saturating_mul(2_u64.pow(retry_count)),
529 MAX_RETRY_DELAY_SECS,
530 );
531 Some(Delay::new(Duration::from_secs(delay_secs)))
532 } else {
533 tracing::warn!(
534 address=%mapping.internal_addr,
535 protocol=%mapping.protocol,
536 "giving up on UPnP mapping after {new_retry_count} attempts"
537 );
538 None
539 };
540
541 self.mappings.insert(
542 mapping.clone(),
543 MappingState::Failed {
544 retry_count: new_retry_count,
545 next_retry,
546 },
547 );
548
549 if was_active {
550 tracing::debug!(
551 address=%mapping.internal_addr,
552 protocol=%mapping.protocol,
553 "failed to remap UPnP mapped for protocol: {err}"
554 );
555 let external_multiaddr =
556 mapping.external_addr(gateway.external_addr);
557 self.pending_events.push_back(Event::ExpiredExternalAddr {
558 local_addr: mapping.multiaddr,
559 external_addr: external_multiaddr.clone(),
560 });
561 return Poll::Ready(ToSwarm::ExternalAddrExpired(
562 external_multiaddr,
563 ));
564 } else {
565 tracing::debug!(
566 address=%mapping.internal_addr,
567 protocol=%mapping.protocol,
568 retry_count=%new_retry_count,
569 "failed to map UPnP mapped for protocol: {err}"
570 );
571 }
572 }
573 GatewayEvent::Removed(mapping) => {
574 tracing::debug!(
575 address=%mapping.internal_addr,
576 protocol=%mapping.protocol,
577 "successfully removed UPnP mapping for protocol"
578 );
579 self.mappings
580 .remove(&mapping)
581 .expect("mapping should exist");
582 }
583 GatewayEvent::RemovalFailure(mapping, err) => {
584 tracing::debug!(
585 address=%mapping.internal_addr,
586 protocol=%mapping.protocol,
587 "could not remove UPnP mapping for protocol: {err}"
588 );
589 if let Err(err) = gateway
590 .sender
591 .try_send(GatewayRequest::RemoveMapping(mapping.clone()))
592 {
593 tracing::debug!(
594 multiaddress=%mapping.multiaddr,
595 "could not request port removal for multiaddress on the gateway: {}",
596 err
597 );
598 }
599 }
600 }
601 }
602
603 self.mappings.renew(gateway, cx);
605 return Poll::Pending;
606 }
607 _ => return Poll::Pending,
608 }
609 }
610 }
611}
612
613fn multiaddr_to_socketaddr_protocol(
618 addr: Multiaddr,
619) -> Result<(SocketAddr, PortMappingProtocol), ()> {
620 let mut iter = addr.into_iter();
621 match iter.next() {
622 Some(multiaddr::Protocol::Ip4(ipv4)) if ipv4.is_private() => match iter.next() {
624 Some(multiaddr::Protocol::Tcp(port)) => {
625 return Ok((
626 SocketAddr::V4(SocketAddrV4::new(ipv4, port)),
627 PortMappingProtocol::TCP,
628 ));
629 }
630 Some(multiaddr::Protocol::Udp(port)) => {
631 return Ok((
632 SocketAddr::V4(SocketAddrV4::new(ipv4, port)),
633 PortMappingProtocol::UDP,
634 ));
635 }
636 _ => {}
637 },
638 _ => {}
639 }
640 Err(())
641}