libp2p_upnp/
behaviour.rs

1// Copyright 2023 Protocol Labs.
2//
3// Permission is hereby granted, free of charge, to any person obtaining a
4// copy of this software and associated documentation files (the "Software"),
5// to deal in the Software without restriction, including without limitation
6// the rights to use, copy, modify, merge, publish, distribute, sublicense,
7// and/or sell copies of the Software, and to permit persons to whom the
8// Software is furnished to do so, subject to the following conditions:
9//
10// The above copyright notice and this permission notice shall be included in
11// all copies or substantial portions of the Software.
12//
13// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
14// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
18// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
19// DEALINGS IN THE SOFTWARE.
20
21#![cfg_attr(docsrs, feature(doc_cfg, doc_auto_cfg))]
22
23use std::{
24    borrow::Borrow,
25    collections::{HashMap, VecDeque},
26    error::Error,
27    hash::{Hash, Hasher},
28    net::{self, IpAddr, SocketAddr, SocketAddrV4},
29    ops::{Deref, DerefMut},
30    pin::Pin,
31    task::{Context, Poll},
32    time::Duration,
33};
34
35use futures::{channel::oneshot, Future, StreamExt};
36use futures_timer::Delay;
37use igd_next::PortMappingProtocol;
38use libp2p_core::{
39    multiaddr,
40    transport::{ListenerId, PortUse},
41    Endpoint, Multiaddr,
42};
43use libp2p_swarm::{
44    derive_prelude::PeerId, dummy, ConnectionDenied, ConnectionId, ExpiredListenAddr, FromSwarm,
45    NetworkBehaviour, NewListenAddr, ToSwarm,
46};
47
48use crate::tokio::{is_addr_global, Gateway};
49
50/// The duration in seconds of a port mapping on the gateway.
51const MAPPING_DURATION: u32 = 3600;
52
53/// Renew the Mapping every half of `MAPPING_DURATION` to avoid the port being unmapped.
54const MAPPING_TIMEOUT: u64 = MAPPING_DURATION as u64 / 2;
55
56/// A [`Gateway`] Request.
57#[derive(Debug)]
58pub(crate) enum GatewayRequest {
59    AddMapping { mapping: Mapping, duration: u32 },
60    RemoveMapping(Mapping),
61}
62
63/// A [`Gateway`] event.
64#[derive(Debug)]
65pub(crate) enum GatewayEvent {
66    /// Port was successfully mapped.
67    Mapped(Mapping),
68    /// There was a failure mapping port.
69    MapFailure(Mapping, Box<dyn Error + Send + Sync + 'static>),
70    /// Port was successfully removed.
71    Removed(Mapping),
72    /// There was a failure removing the mapped port.
73    RemovalFailure(Mapping, Box<dyn Error + Send + Sync + 'static>),
74}
75
76/// Mapping of a Protocol and Port on the gateway.
77#[derive(Debug, Clone)]
78pub(crate) struct Mapping {
79    pub(crate) listener_id: ListenerId,
80    pub(crate) protocol: PortMappingProtocol,
81    pub(crate) multiaddr: Multiaddr,
82    pub(crate) internal_addr: SocketAddr,
83}
84
85impl Mapping {
86    /// Given the input gateway address, calculate the
87    /// open external `Multiaddr`.
88    fn external_addr(&self, gateway_addr: IpAddr) -> Multiaddr {
89        let addr = match gateway_addr {
90            net::IpAddr::V4(ip) => multiaddr::Protocol::Ip4(ip),
91            net::IpAddr::V6(ip) => multiaddr::Protocol::Ip6(ip),
92        };
93        self.multiaddr
94            .replace(0, |_| Some(addr))
95            .expect("multiaddr should be valid")
96    }
97}
98
99impl Hash for Mapping {
100    fn hash<H: Hasher>(&self, state: &mut H) {
101        self.listener_id.hash(state);
102    }
103}
104
105impl PartialEq for Mapping {
106    fn eq(&self, other: &Self) -> bool {
107        self.listener_id == other.listener_id
108    }
109}
110
111impl Eq for Mapping {}
112
113impl Borrow<ListenerId> for Mapping {
114    fn borrow(&self) -> &ListenerId {
115        &self.listener_id
116    }
117}
118
119/// Current state of a [`Mapping`].
120#[derive(Debug)]
121enum MappingState {
122    /// Port mapping is inactive, will be requested or re-requested on the next iteration.
123    Inactive,
124    /// Port mapping/removal has been requested on the gateway.
125    Pending,
126    /// Port mapping is active with the inner timeout.
127    Active(Delay),
128    /// Port mapping failed, we will try again.
129    Failed,
130}
131
132/// Current state of the UPnP [`Gateway`].
133enum GatewayState {
134    Searching(oneshot::Receiver<Result<Gateway, Box<dyn std::error::Error + Send + Sync>>>),
135    Available(Gateway),
136    GatewayNotFound,
137    NonRoutableGateway(IpAddr),
138}
139
140/// The event produced by `Behaviour`.
141#[derive(Debug)]
142pub enum Event {
143    /// The multiaddress is reachable externally.
144    NewExternalAddr(Multiaddr),
145    /// The renewal of the multiaddress on the gateway failed.
146    ExpiredExternalAddr(Multiaddr),
147    /// The IGD gateway was not found.
148    GatewayNotFound,
149    /// The Gateway is not exposed directly to the public network.
150    NonRoutableGateway,
151}
152
153/// A list of port mappings and its state.
154#[derive(Debug, Default)]
155struct MappingList(HashMap<Mapping, MappingState>);
156
157impl Deref for MappingList {
158    type Target = HashMap<Mapping, MappingState>;
159
160    fn deref(&self) -> &Self::Target {
161        &self.0
162    }
163}
164
165impl DerefMut for MappingList {
166    fn deref_mut(&mut self) -> &mut Self::Target {
167        &mut self.0
168    }
169}
170
171impl MappingList {
172    /// Queue for renewal the current mapped ports on the `Gateway` that are expiring,
173    /// and try to activate the inactive.
174    fn renew(&mut self, gateway: &mut Gateway, cx: &mut Context<'_>) {
175        for (mapping, state) in self.iter_mut() {
176            match state {
177                MappingState::Inactive | MappingState::Failed => {
178                    let duration = MAPPING_DURATION;
179                    if let Err(err) = gateway.sender.try_send(GatewayRequest::AddMapping {
180                        mapping: mapping.clone(),
181                        duration,
182                    }) {
183                        tracing::debug!(
184                            multiaddress=%mapping.multiaddr,
185                            "could not request port mapping for multiaddress on the gateway: {}",
186                            err
187                        );
188                    }
189                    *state = MappingState::Pending;
190                }
191                MappingState::Active(timeout) => {
192                    if Pin::new(timeout).poll(cx).is_ready() {
193                        let duration = MAPPING_DURATION;
194                        if let Err(err) = gateway.sender.try_send(GatewayRequest::AddMapping {
195                            mapping: mapping.clone(),
196                            duration,
197                        }) {
198                            tracing::debug!(
199                                multiaddress=%mapping.multiaddr,
200                                "could not request port mapping for multiaddress on the gateway: {}",
201                                err
202                            );
203                        }
204                    }
205                }
206                MappingState::Pending => {}
207            }
208        }
209    }
210}
211
212/// A [`NetworkBehaviour`] for UPnP port mapping. Automatically tries to map the external port
213/// to an internal address on the gateway on a [`FromSwarm::NewListenAddr`].
214pub struct Behaviour {
215    /// UPnP interface state.
216    state: GatewayState,
217
218    /// List of port mappings.
219    mappings: MappingList,
220
221    /// Pending behaviour events to be emitted.
222    pending_events: VecDeque<Event>,
223}
224
225impl Default for Behaviour {
226    fn default() -> Self {
227        Self {
228            state: GatewayState::Searching(crate::tokio::search_gateway()),
229            mappings: Default::default(),
230            pending_events: VecDeque::new(),
231        }
232    }
233}
234
235impl NetworkBehaviour for Behaviour {
236    type ConnectionHandler = dummy::ConnectionHandler;
237
238    type ToSwarm = Event;
239
240    fn handle_established_inbound_connection(
241        &mut self,
242        _connection_id: ConnectionId,
243        _peer: PeerId,
244        _local_addr: &Multiaddr,
245        _remote_addr: &Multiaddr,
246    ) -> Result<libp2p_swarm::THandler<Self>, ConnectionDenied> {
247        Ok(dummy::ConnectionHandler)
248    }
249
250    fn handle_established_outbound_connection(
251        &mut self,
252        _connection_id: ConnectionId,
253        _peer: PeerId,
254        _addr: &Multiaddr,
255        _role_override: Endpoint,
256        _port_use: PortUse,
257    ) -> Result<libp2p_swarm::THandler<Self>, libp2p_swarm::ConnectionDenied> {
258        Ok(dummy::ConnectionHandler)
259    }
260
261    fn on_swarm_event(&mut self, event: FromSwarm) {
262        match event {
263            FromSwarm::NewListenAddr(NewListenAddr {
264                listener_id,
265                addr: multiaddr,
266            }) => {
267                let Ok((addr, protocol)) = multiaddr_to_socketaddr_protocol(multiaddr.clone())
268                else {
269                    tracing::debug!("multiaddress not supported for UPnP {multiaddr}");
270                    return;
271                };
272
273                if let Some((mapping, _state)) = self
274                    .mappings
275                    .iter()
276                    .find(|(mapping, _state)| mapping.internal_addr.port() == addr.port())
277                {
278                    tracing::debug!(
279                        multiaddress=%multiaddr,
280                        mapped_multiaddress=%mapping.multiaddr,
281                        "port from multiaddress is already being mapped"
282                    );
283                    return;
284                }
285
286                match &mut self.state {
287                    GatewayState::Searching(_) => {
288                        // As the gateway is not yet available we add the mapping with
289                        // `MappingState::Inactive` so that when and if it
290                        // becomes available we map it.
291                        self.mappings.insert(
292                            Mapping {
293                                listener_id,
294                                protocol,
295                                internal_addr: addr,
296                                multiaddr: multiaddr.clone(),
297                            },
298                            MappingState::Inactive,
299                        );
300                    }
301                    GatewayState::Available(ref mut gateway) => {
302                        let mapping = Mapping {
303                            listener_id,
304                            protocol,
305                            internal_addr: addr,
306                            multiaddr: multiaddr.clone(),
307                        };
308
309                        let duration = MAPPING_DURATION;
310                        if let Err(err) = gateway.sender.try_send(GatewayRequest::AddMapping {
311                            mapping: mapping.clone(),
312                            duration,
313                        }) {
314                            tracing::debug!(
315                                multiaddress=%mapping.multiaddr,
316                                "could not request port mapping for multiaddress on the gateway: {}",
317                                err
318                            );
319                        }
320
321                        self.mappings.insert(mapping, MappingState::Pending);
322                    }
323                    GatewayState::GatewayNotFound => {
324                        tracing::debug!(
325                            multiaddres=%multiaddr,
326                            "network gateway not found, UPnP port mapping of multiaddres discarded"
327                        );
328                    }
329                    GatewayState::NonRoutableGateway(addr) => {
330                        tracing::debug!(
331                            multiaddress=%multiaddr,
332                            network_gateway_ip=%addr,
333                            "the network gateway is not exposed to the public network. /
334                             UPnP port mapping of multiaddress discarded"
335                        );
336                    }
337                };
338            }
339            FromSwarm::ExpiredListenAddr(ExpiredListenAddr {
340                listener_id,
341                addr: _addr,
342            }) => {
343                if let GatewayState::Available(ref mut gateway) = &mut self.state {
344                    if let Some((mapping, _state)) = self.mappings.remove_entry(&listener_id) {
345                        if let Err(err) = gateway
346                            .sender
347                            .try_send(GatewayRequest::RemoveMapping(mapping.clone()))
348                        {
349                            tracing::debug!(
350                                multiaddress=%mapping.multiaddr,
351                                "could not request port removal for multiaddress on the gateway: {}",
352                                err
353                            );
354                        }
355                        self.mappings.insert(mapping, MappingState::Pending);
356                    }
357                }
358            }
359            _ => {}
360        }
361    }
362
363    fn on_connection_handler_event(
364        &mut self,
365        _peer_id: PeerId,
366        _connection_id: ConnectionId,
367        event: libp2p_swarm::THandlerOutEvent<Self>,
368    ) {
369        libp2p_core::util::unreachable(event)
370    }
371
372    #[tracing::instrument(level = "trace", name = "NetworkBehaviour::poll", skip(self, cx))]
373    fn poll(
374        &mut self,
375        cx: &mut Context<'_>,
376    ) -> Poll<ToSwarm<Self::ToSwarm, libp2p_swarm::THandlerInEvent<Self>>> {
377        // If there are pending addresses to be emitted we emit them.
378        if let Some(event) = self.pending_events.pop_front() {
379            return Poll::Ready(ToSwarm::GenerateEvent(event));
380        }
381
382        // Loop through the gateway state so that if it changes from `Searching` to `Available`
383        // we poll the pending mapping requests.
384        loop {
385            match self.state {
386                GatewayState::Searching(ref mut fut) => match Pin::new(fut).poll(cx) {
387                    Poll::Ready(Ok(result)) => match result {
388                        Ok(gateway) => {
389                            if !is_addr_global(gateway.external_addr) {
390                                self.state =
391                                    GatewayState::NonRoutableGateway(gateway.external_addr);
392                                tracing::debug!(
393                                    gateway_address=%gateway.external_addr,
394                                    "the gateway is not routable"
395                                );
396                                return Poll::Ready(ToSwarm::GenerateEvent(
397                                    Event::NonRoutableGateway,
398                                ));
399                            }
400                            self.state = GatewayState::Available(gateway);
401                        }
402                        Err(err) => {
403                            tracing::debug!("could not find gateway: {err}");
404                            self.state = GatewayState::GatewayNotFound;
405                            return Poll::Ready(ToSwarm::GenerateEvent(Event::GatewayNotFound));
406                        }
407                    },
408                    Poll::Ready(Err(err)) => {
409                        // The sender channel has been dropped. This typically indicates a shutdown
410                        // process is underway.
411                        tracing::debug!("sender has been dropped: {err}");
412                        self.state = GatewayState::GatewayNotFound;
413                        return Poll::Ready(ToSwarm::GenerateEvent(Event::GatewayNotFound));
414                    }
415                    Poll::Pending => return Poll::Pending,
416                },
417                GatewayState::Available(ref mut gateway) => {
418                    // Poll pending mapping requests.
419                    if let Poll::Ready(Some(result)) = gateway.receiver.poll_next_unpin(cx) {
420                        match result {
421                            GatewayEvent::Mapped(mapping) => {
422                                let new_state = MappingState::Active(Delay::new(
423                                    Duration::from_secs(MAPPING_TIMEOUT),
424                                ));
425
426                                match self
427                                    .mappings
428                                    .insert(mapping.clone(), new_state)
429                                    .expect("mapping should exist")
430                                {
431                                    MappingState::Pending => {
432                                        let external_multiaddr =
433                                            mapping.external_addr(gateway.external_addr);
434                                        self.pending_events.push_back(Event::NewExternalAddr(
435                                            external_multiaddr.clone(),
436                                        ));
437                                        tracing::debug!(
438                                            address=%mapping.internal_addr,
439                                            protocol=%mapping.protocol,
440                                            "successfully mapped UPnP for protocol"
441                                        );
442                                        return Poll::Ready(ToSwarm::ExternalAddrConfirmed(
443                                            external_multiaddr,
444                                        ));
445                                    }
446                                    MappingState::Active(_) => {
447                                        tracing::debug!(
448                                            address=%mapping.internal_addr,
449                                            protocol=%mapping.protocol,
450                                            "successfully renewed UPnP mapping for protocol"
451                                        );
452                                    }
453                                    _ => unreachable!(),
454                                }
455                            }
456                            GatewayEvent::MapFailure(mapping, err) => {
457                                match self
458                                    .mappings
459                                    .insert(mapping.clone(), MappingState::Failed)
460                                    .expect("mapping should exist")
461                                {
462                                    MappingState::Active(_) => {
463                                        tracing::debug!(
464                                            address=%mapping.internal_addr,
465                                            protocol=%mapping.protocol,
466                                            "failed to remap UPnP mapped for protocol: {err}"
467                                        );
468                                        let external_multiaddr =
469                                            mapping.external_addr(gateway.external_addr);
470                                        self.pending_events.push_back(Event::ExpiredExternalAddr(
471                                            external_multiaddr.clone(),
472                                        ));
473                                        return Poll::Ready(ToSwarm::ExternalAddrExpired(
474                                            external_multiaddr,
475                                        ));
476                                    }
477                                    MappingState::Pending => {
478                                        tracing::debug!(
479                                            address=%mapping.internal_addr,
480                                            protocol=%mapping.protocol,
481                                            "failed to map UPnP mapped for protocol: {err}"
482                                        );
483                                    }
484                                    _ => {
485                                        unreachable!()
486                                    }
487                                }
488                            }
489                            GatewayEvent::Removed(mapping) => {
490                                tracing::debug!(
491                                    address=%mapping.internal_addr,
492                                    protocol=%mapping.protocol,
493                                    "successfully removed UPnP mapping for protocol"
494                                );
495                                self.mappings
496                                    .remove(&mapping)
497                                    .expect("mapping should exist");
498                            }
499                            GatewayEvent::RemovalFailure(mapping, err) => {
500                                tracing::debug!(
501                                    address=%mapping.internal_addr,
502                                    protocol=%mapping.protocol,
503                                    "could not remove UPnP mapping for protocol: {err}"
504                                );
505                                if let Err(err) = gateway
506                                    .sender
507                                    .try_send(GatewayRequest::RemoveMapping(mapping.clone()))
508                                {
509                                    tracing::debug!(
510                                        multiaddress=%mapping.multiaddr,
511                                        "could not request port removal for multiaddress on the gateway: {}",
512                                        err
513                                    );
514                                }
515                            }
516                        }
517                    }
518
519                    // Renew expired and request inactive mappings.
520                    self.mappings.renew(gateway, cx);
521                    return Poll::Pending;
522                }
523                _ => return Poll::Pending,
524            }
525        }
526    }
527}
528
529/// Extracts a [`SocketAddrV4`] and [`PortMappingProtocol`] from a given [`Multiaddr`].
530///
531/// Fails if the given [`Multiaddr`] does not begin with an IP
532/// protocol encapsulating a TCP or UDP port.
533fn multiaddr_to_socketaddr_protocol(
534    addr: Multiaddr,
535) -> Result<(SocketAddr, PortMappingProtocol), ()> {
536    let mut iter = addr.into_iter();
537    match iter.next() {
538        // Idg only supports Ipv4.
539        Some(multiaddr::Protocol::Ip4(ipv4)) if ipv4.is_private() => match iter.next() {
540            Some(multiaddr::Protocol::Tcp(port)) => {
541                return Ok((
542                    SocketAddr::V4(SocketAddrV4::new(ipv4, port)),
543                    PortMappingProtocol::TCP,
544                ));
545            }
546            Some(multiaddr::Protocol::Udp(port)) => {
547                return Ok((
548                    SocketAddr::V4(SocketAddrV4::new(ipv4, port)),
549                    PortMappingProtocol::UDP,
550                ));
551            }
552            _ => {}
553        },
554        _ => {}
555    }
556    Err(())
557}