libp2p_upnp/
behaviour.rs

1// Copyright 2023 Protocol Labs.
2//
3// Permission is hereby granted, free of charge, to any person obtaining a
4// copy of this software and associated documentation files (the "Software"),
5// to deal in the Software without restriction, including without limitation
6// the rights to use, copy, modify, merge, publish, distribute, sublicense,
7// and/or sell copies of the Software, and to permit persons to whom the
8// Software is furnished to do so, subject to the following conditions:
9//
10// The above copyright notice and this permission notice shall be included in
11// all copies or substantial portions of the Software.
12//
13// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
14// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
18// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
19// DEALINGS IN THE SOFTWARE.
20
21#![cfg_attr(docsrs, feature(doc_cfg, doc_auto_cfg))]
22
23use std::{
24    borrow::Borrow,
25    collections::{HashMap, VecDeque},
26    error::Error,
27    hash::{Hash, Hasher},
28    net::{self, IpAddr, SocketAddr, SocketAddrV4},
29    ops::{Deref, DerefMut},
30    pin::Pin,
31    task::{Context, Poll},
32    time::Duration,
33};
34
35use futures::{channel::oneshot, Future, StreamExt};
36use futures_timer::Delay;
37use igd_next::PortMappingProtocol;
38use libp2p_core::{
39    multiaddr,
40    transport::{ListenerId, PortUse},
41    Endpoint, Multiaddr,
42};
43use libp2p_swarm::{
44    derive_prelude::PeerId, dummy, ConnectionDenied, ConnectionId, ExpiredListenAddr, FromSwarm,
45    NetworkBehaviour, NewListenAddr, ToSwarm,
46};
47
48use crate::tokio::{is_addr_global, Gateway};
49
/// The duration in seconds of a port mapping on the gateway.
///
/// Sent as the `duration` of every [`GatewayRequest::AddMapping`] issued by this module.
const MAPPING_DURATION: u32 = 3600;

/// Renew the mapping every half of `MAPPING_DURATION` (in seconds) to avoid the port being
/// unmapped.
const MAPPING_TIMEOUT: u64 = MAPPING_DURATION as u64 / 2;

/// Maximum number of retry attempts for failed mappings.
///
/// Once the failure count of a mapping reaches this value no further retry is scheduled.
const MAX_RETRY_ATTEMPTS: u32 = 5;

/// Base delay in seconds for exponential backoff (will be multiplied by 2^retry_count).
const BASE_RETRY_DELAY_SECS: u64 = 30;

/// Maximum delay in seconds between retry attempts; the backoff is capped at this value.
const MAX_RETRY_DELAY_SECS: u64 = 1800;
64
/// A request sent to the [`Gateway`] through its `sender` channel.
#[derive(Debug)]
pub(crate) enum GatewayRequest {
    /// Ask the gateway to create (or renew) `mapping` for `duration`.
    /// Every request built in this file passes `MAPPING_DURATION` (seconds).
    AddMapping { mapping: Mapping, duration: u32 },
    /// Ask the gateway to remove the given mapping.
    RemoveMapping(Mapping),
}
71
/// An event received from the [`Gateway`] through its `receiver` channel, reporting the
/// outcome of a [`GatewayRequest`].
#[derive(Debug)]
pub(crate) enum GatewayEvent {
    /// Port was successfully mapped.
    Mapped(Mapping),
    /// There was a failure mapping port; carries the mapping and the error.
    MapFailure(Mapping, Box<dyn Error + Send + Sync + 'static>),
    /// Port was successfully removed.
    Removed(Mapping),
    /// There was a failure removing the mapped port; carries the mapping and the error.
    RemovalFailure(Mapping, Box<dyn Error + Send + Sync + 'static>),
}
84
/// Mapping of a Protocol and Port on the gateway.
///
/// Equality, hashing and `Borrow<ListenerId>` are all based on `listener_id` alone, so a
/// collection keyed by `Mapping` holds at most one entry per listener.
#[derive(Debug, Clone)]
pub(crate) struct Mapping {
    /// Listener that reported the address; the identity of the mapping.
    pub(crate) listener_id: ListenerId,
    /// Transport protocol (TCP or UDP) of the mapped port.
    pub(crate) protocol: PortMappingProtocol,
    /// The local listen address as reported by the swarm.
    pub(crate) multiaddr: Multiaddr,
    /// Socket address (IP and port) extracted from `multiaddr`.
    pub(crate) internal_addr: SocketAddr,
}
93
94impl Mapping {
95    /// Given the input gateway address, calculate the
96    /// open external `Multiaddr`.
97    fn external_addr(&self, gateway_addr: IpAddr) -> Multiaddr {
98        let addr = match gateway_addr {
99            net::IpAddr::V4(ip) => multiaddr::Protocol::Ip4(ip),
100            net::IpAddr::V6(ip) => multiaddr::Protocol::Ip6(ip),
101        };
102        self.multiaddr
103            .replace(0, |_| Some(addr))
104            .expect("multiaddr should be valid")
105    }
106}
107
108impl Hash for Mapping {
109    fn hash<H: Hasher>(&self, state: &mut H) {
110        self.listener_id.hash(state);
111    }
112}
113
114impl PartialEq for Mapping {
115    fn eq(&self, other: &Self) -> bool {
116        self.listener_id == other.listener_id
117    }
118}
119
// Marker impl: `Mapping` equality compares `listener_id` only (see `PartialEq`).
impl Eq for Mapping {}
121
122impl Borrow<ListenerId> for Mapping {
123    fn borrow(&self) -> &ListenerId {
124        &self.listener_id
125    }
126}
127
/// Current state of a [`Mapping`].
#[derive(Debug)]
enum MappingState {
    /// Port mapping is inactive, will be requested or re-requested on the next iteration.
    Inactive,
    /// Port mapping/removal has been requested on the gateway.
    Pending {
        /// Number of times we've tried to map this port.
        retry_count: u32,
    },
    /// Port mapping is active with the inner timeout.
    /// When the timeout elapses a renewal is requested.
    Active(Delay),
    /// Port mapping failed with retry information.
    Failed {
        /// Number of times we've tried to map this port.
        retry_count: u32,
        /// When we should try again (None means no more retries).
        next_retry: Option<Delay>,
    },
}
148
/// Current state of the UPnP [`Gateway`].
enum GatewayState {
    /// A gateway search is in progress; the receiver yields its result.
    Searching(oneshot::Receiver<Result<Gateway, Box<dyn std::error::Error + Send + Sync>>>),
    /// A gateway was found and can be used to request port mappings.
    Available(Gateway),
    /// The search failed, or its sender was dropped before reporting a result.
    GatewayNotFound,
    /// A gateway was found but its external address (stored here) is not global.
    NonRoutableGateway(IpAddr),
}
156
/// The event produced by `Behaviour`.
#[derive(Debug)]
pub enum Event {
    /// The multiaddress is reachable externally.
    NewExternalAddr {
        /// The local listen address that was mapped.
        local_addr: Multiaddr,
        /// The external address that is reachable.
        external_addr: Multiaddr,
    },
    /// The renewal of the multiaddress on the gateway failed.
    ExpiredExternalAddr {
        /// The local listen address that failed to renew.
        local_addr: Multiaddr,
        /// The external address that is no longer reachable.
        external_addr: Multiaddr,
    },
    /// The IGD gateway was not found.
    GatewayNotFound,
    /// The gateway is not exposed directly to the public network
    /// (its external address is not a global address).
    NonRoutableGateway,
}
179
/// A list of port mappings and their states.
///
/// Thin wrapper around a `HashMap` (reachable through `Deref`/`DerefMut`) that adds the
/// `renew` operation.
#[derive(Debug, Default)]
struct MappingList(HashMap<Mapping, MappingState>);
183
184impl Deref for MappingList {
185    type Target = HashMap<Mapping, MappingState>;
186
187    fn deref(&self) -> &Self::Target {
188        &self.0
189    }
190}
191
192impl DerefMut for MappingList {
193    fn deref_mut(&mut self) -> &mut Self::Target {
194        &mut self.0
195    }
196}
197
198impl MappingList {
199    /// Queue for renewal the current mapped ports on the `Gateway` that are expiring,
200    /// and try to activate the inactive.
201    fn renew(&mut self, gateway: &mut Gateway, cx: &mut Context<'_>) {
202        for (mapping, state) in self.iter_mut() {
203            match state {
204                MappingState::Inactive => {
205                    let duration = MAPPING_DURATION;
206                    if let Err(err) = gateway.sender.try_send(GatewayRequest::AddMapping {
207                        mapping: mapping.clone(),
208                        duration,
209                    }) {
210                        tracing::debug!(
211                            multiaddress=%mapping.multiaddr,
212                            "could not request port mapping for multiaddress on the gateway: {}",
213                            err
214                        );
215                    } else {
216                        *state = MappingState::Pending { retry_count: 0 };
217                    }
218                }
219                MappingState::Failed {
220                    retry_count,
221                    next_retry,
222                } => {
223                    if let Some(delay) = next_retry {
224                        if Pin::new(delay).poll(cx).is_ready() {
225                            let duration = MAPPING_DURATION;
226                            if let Err(err) = gateway.sender.try_send(GatewayRequest::AddMapping {
227                                mapping: mapping.clone(),
228                                duration,
229                            }) {
230                                tracing::debug!(
231                                    multiaddress=%mapping.multiaddr,
232                                    retry_count=%retry_count,
233                                    "could not retry port mapping for multiaddress on the gateway: {}",
234                                    err
235                                );
236                            } else {
237                                *state = MappingState::Pending {
238                                    retry_count: *retry_count,
239                                };
240                            }
241                        }
242                    }
243                }
244                MappingState::Active(timeout) => {
245                    if Pin::new(timeout).poll(cx).is_ready() {
246                        let duration = MAPPING_DURATION;
247                        if let Err(err) = gateway.sender.try_send(GatewayRequest::AddMapping {
248                            mapping: mapping.clone(),
249                            duration,
250                        }) {
251                            tracing::debug!(
252                                multiaddress=%mapping.multiaddr,
253                                "could not request port mapping for multiaddress on the gateway: {}",
254                                err
255                            );
256                        }
257                    }
258                }
259                MappingState::Pending { .. } => {}
260            }
261        }
262    }
263}
264
/// A [`NetworkBehaviour`] for UPnP port mapping. Automatically tries to map the external port
/// to an internal address on the gateway on a [`FromSwarm::NewListenAddr`].
pub struct Behaviour {
    /// UPnP interface state.
    state: GatewayState,

    /// List of port mappings, keyed by the listener they belong to.
    mappings: MappingList,

    /// Pending behaviour events to be emitted, in FIFO order, by `poll`.
    pending_events: VecDeque<Event>,
}
277
278impl Default for Behaviour {
279    fn default() -> Self {
280        Self {
281            state: GatewayState::Searching(crate::tokio::search_gateway()),
282            mappings: Default::default(),
283            pending_events: VecDeque::new(),
284        }
285    }
286}
287
288impl NetworkBehaviour for Behaviour {
289    type ConnectionHandler = dummy::ConnectionHandler;
290
291    type ToSwarm = Event;
292
293    fn handle_established_inbound_connection(
294        &mut self,
295        _connection_id: ConnectionId,
296        _peer: PeerId,
297        _local_addr: &Multiaddr,
298        _remote_addr: &Multiaddr,
299    ) -> Result<libp2p_swarm::THandler<Self>, ConnectionDenied> {
300        Ok(dummy::ConnectionHandler)
301    }
302
303    fn handle_established_outbound_connection(
304        &mut self,
305        _connection_id: ConnectionId,
306        _peer: PeerId,
307        _addr: &Multiaddr,
308        _role_override: Endpoint,
309        _port_use: PortUse,
310    ) -> Result<libp2p_swarm::THandler<Self>, libp2p_swarm::ConnectionDenied> {
311        Ok(dummy::ConnectionHandler)
312    }
313
314    fn on_swarm_event(&mut self, event: FromSwarm) {
315        match event {
316            FromSwarm::NewListenAddr(NewListenAddr {
317                listener_id,
318                addr: multiaddr,
319            }) => {
320                let Ok((addr, protocol)) = multiaddr_to_socketaddr_protocol(multiaddr.clone())
321                else {
322                    tracing::debug!("multiaddress not supported for UPnP {multiaddr}");
323                    return;
324                };
325
326                if let Some((mapping, _state)) = self.mappings.iter().find(|(mapping, state)| {
327                    matches!(state, MappingState::Active(_))
328                        && mapping.internal_addr.port() == addr.port()
329                }) {
330                    tracing::debug!(
331                        multiaddress=%multiaddr,
332                        mapped_multiaddress=%mapping.multiaddr,
333                        "port from multiaddress is already mapped on the gateway"
334                    );
335                    return;
336                }
337
338                match &mut self.state {
339                    GatewayState::Searching(_) => {
340                        // As the gateway is not yet available we add the mapping with
341                        // `MappingState::Inactive` so that when and if it
342                        // becomes available we map it.
343                        self.mappings.insert(
344                            Mapping {
345                                listener_id,
346                                protocol,
347                                internal_addr: addr,
348                                multiaddr: multiaddr.clone(),
349                            },
350                            MappingState::Inactive,
351                        );
352                    }
353                    GatewayState::Available(ref mut gateway) => {
354                        let mapping = Mapping {
355                            listener_id,
356                            protocol,
357                            internal_addr: addr,
358                            multiaddr: multiaddr.clone(),
359                        };
360
361                        let duration = MAPPING_DURATION;
362                        if let Err(err) = gateway.sender.try_send(GatewayRequest::AddMapping {
363                            mapping: mapping.clone(),
364                            duration,
365                        }) {
366                            tracing::debug!(
367                                multiaddress=%mapping.multiaddr,
368                                "could not request port mapping for multiaddress on the gateway: {}",
369                                err
370                            );
371                        }
372
373                        self.mappings
374                            .insert(mapping, MappingState::Pending { retry_count: 0 });
375                    }
376                    GatewayState::GatewayNotFound => {
377                        tracing::debug!(
378                            multiaddres=%multiaddr,
379                            "network gateway not found, UPnP port mapping of multiaddres discarded"
380                        );
381                    }
382                    GatewayState::NonRoutableGateway(addr) => {
383                        tracing::debug!(
384                            multiaddress=%multiaddr,
385                            network_gateway_ip=%addr,
386                            "the network gateway is not exposed to the public network. /
387                             UPnP port mapping of multiaddress discarded"
388                        );
389                    }
390                };
391            }
392            FromSwarm::ExpiredListenAddr(ExpiredListenAddr {
393                listener_id,
394                addr: _addr,
395            }) => {
396                if let GatewayState::Available(ref mut gateway) = &mut self.state {
397                    if let Some((mapping, _state)) = self.mappings.remove_entry(&listener_id) {
398                        if let Err(err) = gateway
399                            .sender
400                            .try_send(GatewayRequest::RemoveMapping(mapping.clone()))
401                        {
402                            tracing::debug!(
403                                multiaddress=%mapping.multiaddr,
404                                "could not request port removal for multiaddress on the gateway: {}",
405                                err
406                            );
407                        }
408                        self.mappings
409                            .insert(mapping, MappingState::Pending { retry_count: 0 });
410                    }
411                }
412            }
413            _ => {}
414        }
415    }
416
417    fn on_connection_handler_event(
418        &mut self,
419        _peer_id: PeerId,
420        _connection_id: ConnectionId,
421        event: libp2p_swarm::THandlerOutEvent<Self>,
422    ) {
423        libp2p_core::util::unreachable(event)
424    }
425
426    #[tracing::instrument(level = "trace", name = "NetworkBehaviour::poll", skip(self, cx))]
427    fn poll(
428        &mut self,
429        cx: &mut Context<'_>,
430    ) -> Poll<ToSwarm<Self::ToSwarm, libp2p_swarm::THandlerInEvent<Self>>> {
431        // If there are pending addresses to be emitted we emit them.
432        if let Some(event) = self.pending_events.pop_front() {
433            return Poll::Ready(ToSwarm::GenerateEvent(event));
434        }
435
436        // Loop through the gateway state so that if it changes from `Searching` to `Available`
437        // we poll the pending mapping requests.
438        loop {
439            match self.state {
440                GatewayState::Searching(ref mut fut) => match Pin::new(fut).poll(cx) {
441                    Poll::Ready(Ok(result)) => match result {
442                        Ok(gateway) => {
443                            if !is_addr_global(gateway.external_addr) {
444                                self.state =
445                                    GatewayState::NonRoutableGateway(gateway.external_addr);
446                                tracing::debug!(
447                                    gateway_address=%gateway.external_addr,
448                                    "the gateway is not routable"
449                                );
450                                return Poll::Ready(ToSwarm::GenerateEvent(
451                                    Event::NonRoutableGateway,
452                                ));
453                            }
454                            self.state = GatewayState::Available(gateway);
455                        }
456                        Err(err) => {
457                            tracing::debug!("could not find gateway: {err}");
458                            self.state = GatewayState::GatewayNotFound;
459                            return Poll::Ready(ToSwarm::GenerateEvent(Event::GatewayNotFound));
460                        }
461                    },
462                    Poll::Ready(Err(err)) => {
463                        // The sender channel has been dropped. This typically indicates a shutdown
464                        // process is underway.
465                        tracing::debug!("sender has been dropped: {err}");
466                        self.state = GatewayState::GatewayNotFound;
467                        return Poll::Ready(ToSwarm::GenerateEvent(Event::GatewayNotFound));
468                    }
469                    Poll::Pending => return Poll::Pending,
470                },
471                GatewayState::Available(ref mut gateway) => {
472                    // Poll pending mapping requests.
473                    if let Poll::Ready(Some(result)) = gateway.receiver.poll_next_unpin(cx) {
474                        match result {
475                            GatewayEvent::Mapped(mapping) => {
476                                let new_state = MappingState::Active(Delay::new(
477                                    Duration::from_secs(MAPPING_TIMEOUT),
478                                ));
479
480                                match self
481                                    .mappings
482                                    .insert(mapping.clone(), new_state)
483                                    .expect("mapping should exist")
484                                {
485                                    MappingState::Pending { .. } => {
486                                        let external_multiaddr =
487                                            mapping.external_addr(gateway.external_addr);
488                                        self.pending_events.push_back(Event::NewExternalAddr {
489                                            local_addr: mapping.multiaddr,
490                                            external_addr: external_multiaddr.clone(),
491                                        });
492                                        tracing::debug!(
493                                            address=%mapping.internal_addr,
494                                            protocol=%mapping.protocol,
495                                            "successfully mapped UPnP for protocol"
496                                        );
497                                        return Poll::Ready(ToSwarm::ExternalAddrConfirmed(
498                                            external_multiaddr,
499                                        ));
500                                    }
501                                    MappingState::Active(_) => {
502                                        tracing::debug!(
503                                            address=%mapping.internal_addr,
504                                            protocol=%mapping.protocol,
505                                            "successfully renewed UPnP mapping for protocol"
506                                        );
507                                    }
508                                    _ => unreachable!(),
509                                }
510                            }
511                            GatewayEvent::MapFailure(mapping, err) => {
512                                let prev_state =
513                                    self.mappings.get(&mapping).expect("mapping should exist");
514
515                                let (retry_count, was_active) = match prev_state {
516                                    MappingState::Active(_) => (0, true),
517                                    MappingState::Pending { retry_count } => (*retry_count, false),
518                                    MappingState::Failed { retry_count, .. } => {
519                                        (*retry_count, false)
520                                    }
521                                    _ => unreachable!(),
522                                };
523
524                                let new_retry_count = retry_count + 1;
525                                let next_retry = if new_retry_count < MAX_RETRY_ATTEMPTS {
526                                    let delay_secs = std::cmp::min(
527                                        BASE_RETRY_DELAY_SECS
528                                            .saturating_mul(2_u64.pow(retry_count)),
529                                        MAX_RETRY_DELAY_SECS,
530                                    );
531                                    Some(Delay::new(Duration::from_secs(delay_secs)))
532                                } else {
533                                    tracing::warn!(
534                                        address=%mapping.internal_addr,
535                                        protocol=%mapping.protocol,
536                                        "giving up on UPnP mapping after {new_retry_count} attempts"
537                                    );
538                                    None
539                                };
540
541                                self.mappings.insert(
542                                    mapping.clone(),
543                                    MappingState::Failed {
544                                        retry_count: new_retry_count,
545                                        next_retry,
546                                    },
547                                );
548
549                                if was_active {
550                                    tracing::debug!(
551                                        address=%mapping.internal_addr,
552                                        protocol=%mapping.protocol,
553                                        "failed to remap UPnP mapped for protocol: {err}"
554                                    );
555                                    let external_multiaddr =
556                                        mapping.external_addr(gateway.external_addr);
557                                    self.pending_events.push_back(Event::ExpiredExternalAddr {
558                                        local_addr: mapping.multiaddr,
559                                        external_addr: external_multiaddr.clone(),
560                                    });
561                                    return Poll::Ready(ToSwarm::ExternalAddrExpired(
562                                        external_multiaddr,
563                                    ));
564                                } else {
565                                    tracing::debug!(
566                                        address=%mapping.internal_addr,
567                                        protocol=%mapping.protocol,
568                                        retry_count=%new_retry_count,
569                                        "failed to map UPnP mapped for protocol: {err}"
570                                    );
571                                }
572                            }
573                            GatewayEvent::Removed(mapping) => {
574                                tracing::debug!(
575                                    address=%mapping.internal_addr,
576                                    protocol=%mapping.protocol,
577                                    "successfully removed UPnP mapping for protocol"
578                                );
579                                self.mappings
580                                    .remove(&mapping)
581                                    .expect("mapping should exist");
582                            }
583                            GatewayEvent::RemovalFailure(mapping, err) => {
584                                tracing::debug!(
585                                    address=%mapping.internal_addr,
586                                    protocol=%mapping.protocol,
587                                    "could not remove UPnP mapping for protocol: {err}"
588                                );
589                                if let Err(err) = gateway
590                                    .sender
591                                    .try_send(GatewayRequest::RemoveMapping(mapping.clone()))
592                                {
593                                    tracing::debug!(
594                                        multiaddress=%mapping.multiaddr,
595                                        "could not request port removal for multiaddress on the gateway: {}",
596                                        err
597                                    );
598                                }
599                            }
600                        }
601                    }
602
603                    // Renew expired and request inactive mappings.
604                    self.mappings.renew(gateway, cx);
605                    return Poll::Pending;
606                }
607                _ => return Poll::Pending,
608            }
609        }
610    }
611}
612
613/// Extracts a [`SocketAddrV4`] and [`PortMappingProtocol`] from a given [`Multiaddr`].
614///
615/// Fails if the given [`Multiaddr`] does not begin with an IP
616/// protocol encapsulating a TCP or UDP port.
617fn multiaddr_to_socketaddr_protocol(
618    addr: Multiaddr,
619) -> Result<(SocketAddr, PortMappingProtocol), ()> {
620    let mut iter = addr.into_iter();
621    match iter.next() {
622        // Idg only supports Ipv4.
623        Some(multiaddr::Protocol::Ip4(ipv4)) if ipv4.is_private() => match iter.next() {
624            Some(multiaddr::Protocol::Tcp(port)) => {
625                return Ok((
626                    SocketAddr::V4(SocketAddrV4::new(ipv4, port)),
627                    PortMappingProtocol::TCP,
628                ));
629            }
630            Some(multiaddr::Protocol::Udp(port)) => {
631                return Ok((
632                    SocketAddr::V4(SocketAddrV4::new(ipv4, port)),
633                    PortMappingProtocol::UDP,
634                ));
635            }
636            _ => {}
637        },
638        _ => {}
639    }
640    Err(())
641}