libp2p_upnp/
behaviour.rs

1// Copyright 2023 Protocol Labs.
2//
3// Permission is hereby granted, free of charge, to any person obtaining a
4// copy of this software and associated documentation files (the "Software"),
5// to deal in the Software without restriction, including without limitation
6// the rights to use, copy, modify, merge, publish, distribute, sublicense,
7// and/or sell copies of the Software, and to permit persons to whom the
8// Software is furnished to do so, subject to the following conditions:
9//
10// The above copyright notice and this permission notice shall be included in
11// all copies or substantial portions of the Software.
12//
13// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
14// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
18// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
19// DEALINGS IN THE SOFTWARE.
20
21#![cfg_attr(docsrs, feature(doc_cfg, doc_auto_cfg))]
22
23use std::{
24    borrow::Borrow,
25    collections::{HashMap, VecDeque},
26    error::Error,
27    hash::{Hash, Hasher},
28    net::{self, IpAddr, SocketAddr, SocketAddrV4},
29    ops::{Deref, DerefMut},
30    pin::Pin,
31    task::{Context, Poll},
32    time::Duration,
33};
34
35use futures::{channel::oneshot, Future, StreamExt};
36use futures_timer::Delay;
37use igd_next::PortMappingProtocol;
38use libp2p_core::{
39    multiaddr,
40    transport::{ListenerId, PortUse},
41    Endpoint, Multiaddr,
42};
43use libp2p_swarm::{
44    derive_prelude::PeerId, dummy, ConnectionDenied, ConnectionId, ExpiredListenAddr, FromSwarm,
45    NetworkBehaviour, NewListenAddr, ToSwarm,
46};
47
48use crate::tokio::{is_addr_global, Gateway};
49
50/// The duration in seconds of a port mapping on the gateway.
51const MAPPING_DURATION: u32 = 3600;
52
53/// Renew the Mapping every half of `MAPPING_DURATION` to avoid the port being unmapped.
54const MAPPING_TIMEOUT: u64 = MAPPING_DURATION as u64 / 2;
55
56/// A [`Gateway`] Request.
57#[derive(Debug)]
58pub(crate) enum GatewayRequest {
59    AddMapping { mapping: Mapping, duration: u32 },
60    RemoveMapping(Mapping),
61}
62
63/// A [`Gateway`] event.
64#[derive(Debug)]
65pub(crate) enum GatewayEvent {
66    /// Port was successfully mapped.
67    Mapped(Mapping),
68    /// There was a failure mapping port.
69    MapFailure(Mapping, Box<dyn Error + Send + Sync + 'static>),
70    /// Port was successfully removed.
71    Removed(Mapping),
72    /// There was a failure removing the mapped port.
73    RemovalFailure(Mapping, Box<dyn Error + Send + Sync + 'static>),
74}
75
76/// Mapping of a Protocol and Port on the gateway.
77#[derive(Debug, Clone)]
78pub(crate) struct Mapping {
79    pub(crate) listener_id: ListenerId,
80    pub(crate) protocol: PortMappingProtocol,
81    pub(crate) multiaddr: Multiaddr,
82    pub(crate) internal_addr: SocketAddr,
83}
84
85impl Mapping {
86    /// Given the input gateway address, calculate the
87    /// open external `Multiaddr`.
88    fn external_addr(&self, gateway_addr: IpAddr) -> Multiaddr {
89        let addr = match gateway_addr {
90            net::IpAddr::V4(ip) => multiaddr::Protocol::Ip4(ip),
91            net::IpAddr::V6(ip) => multiaddr::Protocol::Ip6(ip),
92        };
93        self.multiaddr
94            .replace(0, |_| Some(addr))
95            .expect("multiaddr should be valid")
96    }
97}
98
99impl Hash for Mapping {
100    fn hash<H: Hasher>(&self, state: &mut H) {
101        self.listener_id.hash(state);
102    }
103}
104
105impl PartialEq for Mapping {
106    fn eq(&self, other: &Self) -> bool {
107        self.listener_id == other.listener_id
108    }
109}
110
111impl Eq for Mapping {}
112
113impl Borrow<ListenerId> for Mapping {
114    fn borrow(&self) -> &ListenerId {
115        &self.listener_id
116    }
117}
118
119/// Current state of a [`Mapping`].
120#[derive(Debug)]
121enum MappingState {
122    /// Port mapping is inactive, will be requested or re-requested on the next iteration.
123    Inactive,
124    /// Port mapping/removal has been requested on the gateway.
125    Pending,
126    /// Port mapping is active with the inner timeout.
127    Active(Delay),
128    /// Port mapping failed, we will try again.
129    Failed,
130}
131
132/// Current state of the UPnP [`Gateway`].
133enum GatewayState {
134    Searching(oneshot::Receiver<Result<Gateway, Box<dyn std::error::Error + Send + Sync>>>),
135    Available(Gateway),
136    GatewayNotFound,
137    NonRoutableGateway(IpAddr),
138}
139
140/// The event produced by `Behaviour`.
141#[derive(Debug)]
142pub enum Event {
143    /// The multiaddress is reachable externally.
144    NewExternalAddr(Multiaddr),
145    /// The renewal of the multiaddress on the gateway failed.
146    ExpiredExternalAddr(Multiaddr),
147    /// The IGD gateway was not found.
148    GatewayNotFound,
149    /// The Gateway is not exposed directly to the public network.
150    NonRoutableGateway,
151}
152
153/// A list of port mappings and its state.
154#[derive(Debug, Default)]
155struct MappingList(HashMap<Mapping, MappingState>);
156
157impl Deref for MappingList {
158    type Target = HashMap<Mapping, MappingState>;
159
160    fn deref(&self) -> &Self::Target {
161        &self.0
162    }
163}
164
165impl DerefMut for MappingList {
166    fn deref_mut(&mut self) -> &mut Self::Target {
167        &mut self.0
168    }
169}
170
171impl MappingList {
172    /// Queue for renewal the current mapped ports on the `Gateway` that are expiring,
173    /// and try to activate the inactive.
174    fn renew(&mut self, gateway: &mut Gateway, cx: &mut Context<'_>) {
175        for (mapping, state) in self.iter_mut() {
176            match state {
177                MappingState::Inactive | MappingState::Failed => {
178                    let duration = MAPPING_DURATION;
179                    if let Err(err) = gateway.sender.try_send(GatewayRequest::AddMapping {
180                        mapping: mapping.clone(),
181                        duration,
182                    }) {
183                        tracing::debug!(
184                            multiaddress=%mapping.multiaddr,
185                            "could not request port mapping for multiaddress on the gateway: {}",
186                            err
187                        );
188                    }
189                    *state = MappingState::Pending;
190                }
191                MappingState::Active(timeout) => {
192                    if Pin::new(timeout).poll(cx).is_ready() {
193                        let duration = MAPPING_DURATION;
194                        if let Err(err) = gateway.sender.try_send(GatewayRequest::AddMapping {
195                            mapping: mapping.clone(),
196                            duration,
197                        }) {
198                            tracing::debug!(
199                                multiaddress=%mapping.multiaddr,
200                                "could not request port mapping for multiaddress on the gateway: {}",
201                                err
202                            );
203                        }
204                    }
205                }
206                MappingState::Pending => {}
207            }
208        }
209    }
210}
211
212/// A [`NetworkBehaviour`] for UPnP port mapping. Automatically tries to map the external port
213/// to an internal address on the gateway on a [`FromSwarm::NewListenAddr`].
214pub struct Behaviour {
215    /// UPnP interface state.
216    state: GatewayState,
217
218    /// List of port mappings.
219    mappings: MappingList,
220
221    /// Pending behaviour events to be emitted.
222    pending_events: VecDeque<Event>,
223}
224
225impl Default for Behaviour {
226    fn default() -> Self {
227        Self {
228            state: GatewayState::Searching(crate::tokio::search_gateway()),
229            mappings: Default::default(),
230            pending_events: VecDeque::new(),
231        }
232    }
233}
234
235impl NetworkBehaviour for Behaviour {
236    type ConnectionHandler = dummy::ConnectionHandler;
237
238    type ToSwarm = Event;
239
240    fn handle_established_inbound_connection(
241        &mut self,
242        _connection_id: ConnectionId,
243        _peer: PeerId,
244        _local_addr: &Multiaddr,
245        _remote_addr: &Multiaddr,
246    ) -> Result<libp2p_swarm::THandler<Self>, ConnectionDenied> {
247        Ok(dummy::ConnectionHandler)
248    }
249
250    fn handle_established_outbound_connection(
251        &mut self,
252        _connection_id: ConnectionId,
253        _peer: PeerId,
254        _addr: &Multiaddr,
255        _role_override: Endpoint,
256        _port_use: PortUse,
257    ) -> Result<libp2p_swarm::THandler<Self>, libp2p_swarm::ConnectionDenied> {
258        Ok(dummy::ConnectionHandler)
259    }
260
261    fn on_swarm_event(&mut self, event: FromSwarm) {
262        match event {
263            FromSwarm::NewListenAddr(NewListenAddr {
264                listener_id,
265                addr: multiaddr,
266            }) => {
267                let Ok((addr, protocol)) = multiaddr_to_socketaddr_protocol(multiaddr.clone())
268                else {
269                    tracing::debug!("multiaddress not supported for UPnP {multiaddr}");
270                    return;
271                };
272
273                if let Some((mapping, _state)) = self
274                    .mappings
275                    .iter()
276                    .find(|(mapping, _state)| mapping.internal_addr.port() == addr.port())
277                {
278                    tracing::debug!(
279                        multiaddress=%multiaddr,
280                        mapped_multiaddress=%mapping.multiaddr,
281                        "port from multiaddress is already being mapped"
282                    );
283                    return;
284                }
285
286                match &mut self.state {
287                    GatewayState::Searching(_) => {
288                        // As the gateway is not yet available we add the mapping with
289                        // `MappingState::Inactive` so that when and if it
290                        // becomes available we map it.
291                        self.mappings.insert(
292                            Mapping {
293                                listener_id,
294                                protocol,
295                                internal_addr: addr,
296                                multiaddr: multiaddr.clone(),
297                            },
298                            MappingState::Inactive,
299                        );
300                    }
301                    GatewayState::Available(ref mut gateway) => {
302                        let mapping = Mapping {
303                            listener_id,
304                            protocol,
305                            internal_addr: addr,
306                            multiaddr: multiaddr.clone(),
307                        };
308
309                        let duration = MAPPING_DURATION;
310                        if let Err(err) = gateway.sender.try_send(GatewayRequest::AddMapping {
311                            mapping: mapping.clone(),
312                            duration,
313                        }) {
314                            tracing::debug!(
315                                multiaddress=%mapping.multiaddr,
316                                "could not request port mapping for multiaddress on the gateway: {}",
317                                err
318                            );
319                        }
320
321                        self.mappings.insert(mapping, MappingState::Pending);
322                    }
323                    GatewayState::GatewayNotFound => {
324                        tracing::debug!(
325                            multiaddres=%multiaddr,
326                            "network gateway not found, UPnP port mapping of multiaddres discarded"
327                        );
328                    }
329                    GatewayState::NonRoutableGateway(addr) => {
330                        tracing::debug!(
331                            multiaddress=%multiaddr,
332                            network_gateway_ip=%addr,
333                            "the network gateway is not exposed to the public network. /
334                             UPnP port mapping of multiaddress discarded"
335                        );
336                    }
337                };
338            }
339            FromSwarm::ExpiredListenAddr(ExpiredListenAddr {
340                listener_id,
341                addr: _addr,
342            }) => {
343                if let GatewayState::Available(ref mut gateway) = &mut self.state {
344                    if let Some((mapping, _state)) = self.mappings.remove_entry(&listener_id) {
345                        if let Err(err) = gateway
346                            .sender
347                            .try_send(GatewayRequest::RemoveMapping(mapping.clone()))
348                        {
349                            tracing::debug!(
350                                multiaddress=%mapping.multiaddr,
351                                "could not request port removal for multiaddress on the gateway: {}",
352                                err
353                            );
354                        }
355                        self.mappings.insert(mapping, MappingState::Pending);
356                    }
357                }
358            }
359            _ => {}
360        }
361    }
362
363    fn on_connection_handler_event(
364        &mut self,
365        _peer_id: PeerId,
366        _connection_id: ConnectionId,
367        event: libp2p_swarm::THandlerOutEvent<Self>,
368    ) {
369        libp2p_core::util::unreachable(event)
370    }
371
372    #[tracing::instrument(level = "trace", name = "NetworkBehaviour::poll", skip(self, cx))]
373    fn poll(
374        &mut self,
375        cx: &mut Context<'_>,
376    ) -> Poll<ToSwarm<Self::ToSwarm, libp2p_swarm::THandlerInEvent<Self>>> {
377        // If there are pending addresses to be emitted we emit them.
378        if let Some(event) = self.pending_events.pop_front() {
379            return Poll::Ready(ToSwarm::GenerateEvent(event));
380        }
381
382        // Loop through the gateway state so that if it changes from `Searching` to `Available`
383        // we poll the pending mapping requests.
384        loop {
385            match self.state {
386                GatewayState::Searching(ref mut fut) => match Pin::new(fut).poll(cx) {
387                    Poll::Ready(result) => {
388                        match result.expect("sender shouldn't have been dropped") {
389                            Ok(gateway) => {
390                                if !is_addr_global(gateway.external_addr) {
391                                    self.state =
392                                        GatewayState::NonRoutableGateway(gateway.external_addr);
393                                    tracing::debug!(
394                                        gateway_address=%gateway.external_addr,
395                                        "the gateway is not routable"
396                                    );
397                                    return Poll::Ready(ToSwarm::GenerateEvent(
398                                        Event::NonRoutableGateway,
399                                    ));
400                                }
401                                self.state = GatewayState::Available(gateway);
402                            }
403                            Err(err) => {
404                                tracing::debug!("could not find gateway: {err}");
405                                self.state = GatewayState::GatewayNotFound;
406                                return Poll::Ready(ToSwarm::GenerateEvent(Event::GatewayNotFound));
407                            }
408                        }
409                    }
410                    Poll::Pending => return Poll::Pending,
411                },
412                GatewayState::Available(ref mut gateway) => {
413                    // Poll pending mapping requests.
414                    if let Poll::Ready(Some(result)) = gateway.receiver.poll_next_unpin(cx) {
415                        match result {
416                            GatewayEvent::Mapped(mapping) => {
417                                let new_state = MappingState::Active(Delay::new(
418                                    Duration::from_secs(MAPPING_TIMEOUT),
419                                ));
420
421                                match self
422                                    .mappings
423                                    .insert(mapping.clone(), new_state)
424                                    .expect("mapping should exist")
425                                {
426                                    MappingState::Pending => {
427                                        let external_multiaddr =
428                                            mapping.external_addr(gateway.external_addr);
429                                        self.pending_events.push_back(Event::NewExternalAddr(
430                                            external_multiaddr.clone(),
431                                        ));
432                                        tracing::debug!(
433                                            address=%mapping.internal_addr,
434                                            protocol=%mapping.protocol,
435                                            "successfully mapped UPnP for protocol"
436                                        );
437                                        return Poll::Ready(ToSwarm::ExternalAddrConfirmed(
438                                            external_multiaddr,
439                                        ));
440                                    }
441                                    MappingState::Active(_) => {
442                                        tracing::debug!(
443                                            address=%mapping.internal_addr,
444                                            protocol=%mapping.protocol,
445                                            "successfully renewed UPnP mapping for protocol"
446                                        );
447                                    }
448                                    _ => unreachable!(),
449                                }
450                            }
451                            GatewayEvent::MapFailure(mapping, err) => {
452                                match self
453                                    .mappings
454                                    .insert(mapping.clone(), MappingState::Failed)
455                                    .expect("mapping should exist")
456                                {
457                                    MappingState::Active(_) => {
458                                        tracing::debug!(
459                                            address=%mapping.internal_addr,
460                                            protocol=%mapping.protocol,
461                                            "failed to remap UPnP mapped for protocol: {err}"
462                                        );
463                                        let external_multiaddr =
464                                            mapping.external_addr(gateway.external_addr);
465                                        self.pending_events.push_back(Event::ExpiredExternalAddr(
466                                            external_multiaddr.clone(),
467                                        ));
468                                        return Poll::Ready(ToSwarm::ExternalAddrExpired(
469                                            external_multiaddr,
470                                        ));
471                                    }
472                                    MappingState::Pending => {
473                                        tracing::debug!(
474                                            address=%mapping.internal_addr,
475                                            protocol=%mapping.protocol,
476                                            "failed to map UPnP mapped for protocol: {err}"
477                                        );
478                                    }
479                                    _ => {
480                                        unreachable!()
481                                    }
482                                }
483                            }
484                            GatewayEvent::Removed(mapping) => {
485                                tracing::debug!(
486                                    address=%mapping.internal_addr,
487                                    protocol=%mapping.protocol,
488                                    "successfully removed UPnP mapping for protocol"
489                                );
490                                self.mappings
491                                    .remove(&mapping)
492                                    .expect("mapping should exist");
493                            }
494                            GatewayEvent::RemovalFailure(mapping, err) => {
495                                tracing::debug!(
496                                    address=%mapping.internal_addr,
497                                    protocol=%mapping.protocol,
498                                    "could not remove UPnP mapping for protocol: {err}"
499                                );
500                                if let Err(err) = gateway
501                                    .sender
502                                    .try_send(GatewayRequest::RemoveMapping(mapping.clone()))
503                                {
504                                    tracing::debug!(
505                                        multiaddress=%mapping.multiaddr,
506                                        "could not request port removal for multiaddress on the gateway: {}",
507                                        err
508                                    );
509                                }
510                            }
511                        }
512                    }
513
514                    // Renew expired and request inactive mappings.
515                    self.mappings.renew(gateway, cx);
516                    return Poll::Pending;
517                }
518                _ => return Poll::Pending,
519            }
520        }
521    }
522}
523
524/// Extracts a [`SocketAddrV4`] and [`PortMappingProtocol`] from a given [`Multiaddr`].
525///
526/// Fails if the given [`Multiaddr`] does not begin with an IP
527/// protocol encapsulating a TCP or UDP port.
528fn multiaddr_to_socketaddr_protocol(
529    addr: Multiaddr,
530) -> Result<(SocketAddr, PortMappingProtocol), ()> {
531    let mut iter = addr.into_iter();
532    match iter.next() {
533        // Idg only supports Ipv4.
534        Some(multiaddr::Protocol::Ip4(ipv4)) if ipv4.is_private() => match iter.next() {
535            Some(multiaddr::Protocol::Tcp(port)) => {
536                return Ok((
537                    SocketAddr::V4(SocketAddrV4::new(ipv4, port)),
538                    PortMappingProtocol::TCP,
539                ));
540            }
541            Some(multiaddr::Protocol::Udp(port)) => {
542                return Ok((
543                    SocketAddr::V4(SocketAddrV4::new(ipv4, port)),
544                    PortMappingProtocol::UDP,
545                ));
546            }
547            _ => {}
548        },
549        _ => {}
550    }
551    Err(())
552}