1#![cfg_attr(docsrs, feature(doc_cfg, doc_auto_cfg))]
22
23use std::{
24 borrow::Borrow,
25 collections::{HashMap, VecDeque},
26 error::Error,
27 hash::{Hash, Hasher},
28 net::{self, IpAddr, SocketAddr, SocketAddrV4},
29 ops::{Deref, DerefMut},
30 pin::Pin,
31 task::{Context, Poll},
32 time::Duration,
33};
34
35use futures::{channel::oneshot, Future, StreamExt};
36use futures_timer::Delay;
37use igd_next::PortMappingProtocol;
38use libp2p_core::{
39 multiaddr,
40 transport::{ListenerId, PortUse},
41 Endpoint, Multiaddr,
42};
43use libp2p_swarm::{
44 derive_prelude::PeerId, dummy, ConnectionDenied, ConnectionId, ExpiredListenAddr, FromSwarm,
45 NetworkBehaviour, NewListenAddr, ToSwarm,
46};
47
48use crate::tokio::{is_addr_global, Gateway};
49
50const MAPPING_DURATION: u32 = 3600;
52
53const MAPPING_TIMEOUT: u64 = MAPPING_DURATION as u64 / 2;
55
56#[derive(Debug)]
58pub(crate) enum GatewayRequest {
59 AddMapping { mapping: Mapping, duration: u32 },
60 RemoveMapping(Mapping),
61}
62
63#[derive(Debug)]
65pub(crate) enum GatewayEvent {
66 Mapped(Mapping),
68 MapFailure(Mapping, Box<dyn Error + Send + Sync + 'static>),
70 Removed(Mapping),
72 RemovalFailure(Mapping, Box<dyn Error + Send + Sync + 'static>),
74}
75
76#[derive(Debug, Clone)]
78pub(crate) struct Mapping {
79 pub(crate) listener_id: ListenerId,
80 pub(crate) protocol: PortMappingProtocol,
81 pub(crate) multiaddr: Multiaddr,
82 pub(crate) internal_addr: SocketAddr,
83}
84
85impl Mapping {
86 fn external_addr(&self, gateway_addr: IpAddr) -> Multiaddr {
89 let addr = match gateway_addr {
90 net::IpAddr::V4(ip) => multiaddr::Protocol::Ip4(ip),
91 net::IpAddr::V6(ip) => multiaddr::Protocol::Ip6(ip),
92 };
93 self.multiaddr
94 .replace(0, |_| Some(addr))
95 .expect("multiaddr should be valid")
96 }
97}
98
99impl Hash for Mapping {
100 fn hash<H: Hasher>(&self, state: &mut H) {
101 self.listener_id.hash(state);
102 }
103}
104
105impl PartialEq for Mapping {
106 fn eq(&self, other: &Self) -> bool {
107 self.listener_id == other.listener_id
108 }
109}
110
111impl Eq for Mapping {}
112
113impl Borrow<ListenerId> for Mapping {
114 fn borrow(&self) -> &ListenerId {
115 &self.listener_id
116 }
117}
118
119#[derive(Debug)]
121enum MappingState {
122 Inactive,
124 Pending,
126 Active(Delay),
128 Failed,
130}
131
132enum GatewayState {
134 Searching(oneshot::Receiver<Result<Gateway, Box<dyn std::error::Error + Send + Sync>>>),
135 Available(Gateway),
136 GatewayNotFound,
137 NonRoutableGateway(IpAddr),
138}
139
140#[derive(Debug)]
142pub enum Event {
143 NewExternalAddr(Multiaddr),
145 ExpiredExternalAddr(Multiaddr),
147 GatewayNotFound,
149 NonRoutableGateway,
151}
152
153#[derive(Debug, Default)]
155struct MappingList(HashMap<Mapping, MappingState>);
156
157impl Deref for MappingList {
158 type Target = HashMap<Mapping, MappingState>;
159
160 fn deref(&self) -> &Self::Target {
161 &self.0
162 }
163}
164
165impl DerefMut for MappingList {
166 fn deref_mut(&mut self) -> &mut Self::Target {
167 &mut self.0
168 }
169}
170
171impl MappingList {
172 fn renew(&mut self, gateway: &mut Gateway, cx: &mut Context<'_>) {
175 for (mapping, state) in self.iter_mut() {
176 match state {
177 MappingState::Inactive | MappingState::Failed => {
178 let duration = MAPPING_DURATION;
179 if let Err(err) = gateway.sender.try_send(GatewayRequest::AddMapping {
180 mapping: mapping.clone(),
181 duration,
182 }) {
183 tracing::debug!(
184 multiaddress=%mapping.multiaddr,
185 "could not request port mapping for multiaddress on the gateway: {}",
186 err
187 );
188 }
189 *state = MappingState::Pending;
190 }
191 MappingState::Active(timeout) => {
192 if Pin::new(timeout).poll(cx).is_ready() {
193 let duration = MAPPING_DURATION;
194 if let Err(err) = gateway.sender.try_send(GatewayRequest::AddMapping {
195 mapping: mapping.clone(),
196 duration,
197 }) {
198 tracing::debug!(
199 multiaddress=%mapping.multiaddr,
200 "could not request port mapping for multiaddress on the gateway: {}",
201 err
202 );
203 }
204 }
205 }
206 MappingState::Pending => {}
207 }
208 }
209 }
210}
211
212pub struct Behaviour {
215 state: GatewayState,
217
218 mappings: MappingList,
220
221 pending_events: VecDeque<Event>,
223}
224
225impl Default for Behaviour {
226 fn default() -> Self {
227 Self {
228 state: GatewayState::Searching(crate::tokio::search_gateway()),
229 mappings: Default::default(),
230 pending_events: VecDeque::new(),
231 }
232 }
233}
234
235impl NetworkBehaviour for Behaviour {
236 type ConnectionHandler = dummy::ConnectionHandler;
237
238 type ToSwarm = Event;
239
240 fn handle_established_inbound_connection(
241 &mut self,
242 _connection_id: ConnectionId,
243 _peer: PeerId,
244 _local_addr: &Multiaddr,
245 _remote_addr: &Multiaddr,
246 ) -> Result<libp2p_swarm::THandler<Self>, ConnectionDenied> {
247 Ok(dummy::ConnectionHandler)
248 }
249
250 fn handle_established_outbound_connection(
251 &mut self,
252 _connection_id: ConnectionId,
253 _peer: PeerId,
254 _addr: &Multiaddr,
255 _role_override: Endpoint,
256 _port_use: PortUse,
257 ) -> Result<libp2p_swarm::THandler<Self>, libp2p_swarm::ConnectionDenied> {
258 Ok(dummy::ConnectionHandler)
259 }
260
261 fn on_swarm_event(&mut self, event: FromSwarm) {
262 match event {
263 FromSwarm::NewListenAddr(NewListenAddr {
264 listener_id,
265 addr: multiaddr,
266 }) => {
267 let Ok((addr, protocol)) = multiaddr_to_socketaddr_protocol(multiaddr.clone())
268 else {
269 tracing::debug!("multiaddress not supported for UPnP {multiaddr}");
270 return;
271 };
272
273 if let Some((mapping, _state)) = self
274 .mappings
275 .iter()
276 .find(|(mapping, _state)| mapping.internal_addr.port() == addr.port())
277 {
278 tracing::debug!(
279 multiaddress=%multiaddr,
280 mapped_multiaddress=%mapping.multiaddr,
281 "port from multiaddress is already being mapped"
282 );
283 return;
284 }
285
286 match &mut self.state {
287 GatewayState::Searching(_) => {
288 self.mappings.insert(
292 Mapping {
293 listener_id,
294 protocol,
295 internal_addr: addr,
296 multiaddr: multiaddr.clone(),
297 },
298 MappingState::Inactive,
299 );
300 }
301 GatewayState::Available(ref mut gateway) => {
302 let mapping = Mapping {
303 listener_id,
304 protocol,
305 internal_addr: addr,
306 multiaddr: multiaddr.clone(),
307 };
308
309 let duration = MAPPING_DURATION;
310 if let Err(err) = gateway.sender.try_send(GatewayRequest::AddMapping {
311 mapping: mapping.clone(),
312 duration,
313 }) {
314 tracing::debug!(
315 multiaddress=%mapping.multiaddr,
316 "could not request port mapping for multiaddress on the gateway: {}",
317 err
318 );
319 }
320
321 self.mappings.insert(mapping, MappingState::Pending);
322 }
323 GatewayState::GatewayNotFound => {
324 tracing::debug!(
325 multiaddres=%multiaddr,
326 "network gateway not found, UPnP port mapping of multiaddres discarded"
327 );
328 }
329 GatewayState::NonRoutableGateway(addr) => {
330 tracing::debug!(
331 multiaddress=%multiaddr,
332 network_gateway_ip=%addr,
333 "the network gateway is not exposed to the public network. /
334 UPnP port mapping of multiaddress discarded"
335 );
336 }
337 };
338 }
339 FromSwarm::ExpiredListenAddr(ExpiredListenAddr {
340 listener_id,
341 addr: _addr,
342 }) => {
343 if let GatewayState::Available(ref mut gateway) = &mut self.state {
344 if let Some((mapping, _state)) = self.mappings.remove_entry(&listener_id) {
345 if let Err(err) = gateway
346 .sender
347 .try_send(GatewayRequest::RemoveMapping(mapping.clone()))
348 {
349 tracing::debug!(
350 multiaddress=%mapping.multiaddr,
351 "could not request port removal for multiaddress on the gateway: {}",
352 err
353 );
354 }
355 self.mappings.insert(mapping, MappingState::Pending);
356 }
357 }
358 }
359 _ => {}
360 }
361 }
362
363 fn on_connection_handler_event(
364 &mut self,
365 _peer_id: PeerId,
366 _connection_id: ConnectionId,
367 event: libp2p_swarm::THandlerOutEvent<Self>,
368 ) {
369 libp2p_core::util::unreachable(event)
370 }
371
372 #[tracing::instrument(level = "trace", name = "NetworkBehaviour::poll", skip(self, cx))]
373 fn poll(
374 &mut self,
375 cx: &mut Context<'_>,
376 ) -> Poll<ToSwarm<Self::ToSwarm, libp2p_swarm::THandlerInEvent<Self>>> {
377 if let Some(event) = self.pending_events.pop_front() {
379 return Poll::Ready(ToSwarm::GenerateEvent(event));
380 }
381
382 loop {
385 match self.state {
386 GatewayState::Searching(ref mut fut) => match Pin::new(fut).poll(cx) {
387 Poll::Ready(result) => {
388 match result.expect("sender shouldn't have been dropped") {
389 Ok(gateway) => {
390 if !is_addr_global(gateway.external_addr) {
391 self.state =
392 GatewayState::NonRoutableGateway(gateway.external_addr);
393 tracing::debug!(
394 gateway_address=%gateway.external_addr,
395 "the gateway is not routable"
396 );
397 return Poll::Ready(ToSwarm::GenerateEvent(
398 Event::NonRoutableGateway,
399 ));
400 }
401 self.state = GatewayState::Available(gateway);
402 }
403 Err(err) => {
404 tracing::debug!("could not find gateway: {err}");
405 self.state = GatewayState::GatewayNotFound;
406 return Poll::Ready(ToSwarm::GenerateEvent(Event::GatewayNotFound));
407 }
408 }
409 }
410 Poll::Pending => return Poll::Pending,
411 },
412 GatewayState::Available(ref mut gateway) => {
413 if let Poll::Ready(Some(result)) = gateway.receiver.poll_next_unpin(cx) {
415 match result {
416 GatewayEvent::Mapped(mapping) => {
417 let new_state = MappingState::Active(Delay::new(
418 Duration::from_secs(MAPPING_TIMEOUT),
419 ));
420
421 match self
422 .mappings
423 .insert(mapping.clone(), new_state)
424 .expect("mapping should exist")
425 {
426 MappingState::Pending => {
427 let external_multiaddr =
428 mapping.external_addr(gateway.external_addr);
429 self.pending_events.push_back(Event::NewExternalAddr(
430 external_multiaddr.clone(),
431 ));
432 tracing::debug!(
433 address=%mapping.internal_addr,
434 protocol=%mapping.protocol,
435 "successfully mapped UPnP for protocol"
436 );
437 return Poll::Ready(ToSwarm::ExternalAddrConfirmed(
438 external_multiaddr,
439 ));
440 }
441 MappingState::Active(_) => {
442 tracing::debug!(
443 address=%mapping.internal_addr,
444 protocol=%mapping.protocol,
445 "successfully renewed UPnP mapping for protocol"
446 );
447 }
448 _ => unreachable!(),
449 }
450 }
451 GatewayEvent::MapFailure(mapping, err) => {
452 match self
453 .mappings
454 .insert(mapping.clone(), MappingState::Failed)
455 .expect("mapping should exist")
456 {
457 MappingState::Active(_) => {
458 tracing::debug!(
459 address=%mapping.internal_addr,
460 protocol=%mapping.protocol,
461 "failed to remap UPnP mapped for protocol: {err}"
462 );
463 let external_multiaddr =
464 mapping.external_addr(gateway.external_addr);
465 self.pending_events.push_back(Event::ExpiredExternalAddr(
466 external_multiaddr.clone(),
467 ));
468 return Poll::Ready(ToSwarm::ExternalAddrExpired(
469 external_multiaddr,
470 ));
471 }
472 MappingState::Pending => {
473 tracing::debug!(
474 address=%mapping.internal_addr,
475 protocol=%mapping.protocol,
476 "failed to map UPnP mapped for protocol: {err}"
477 );
478 }
479 _ => {
480 unreachable!()
481 }
482 }
483 }
484 GatewayEvent::Removed(mapping) => {
485 tracing::debug!(
486 address=%mapping.internal_addr,
487 protocol=%mapping.protocol,
488 "successfully removed UPnP mapping for protocol"
489 );
490 self.mappings
491 .remove(&mapping)
492 .expect("mapping should exist");
493 }
494 GatewayEvent::RemovalFailure(mapping, err) => {
495 tracing::debug!(
496 address=%mapping.internal_addr,
497 protocol=%mapping.protocol,
498 "could not remove UPnP mapping for protocol: {err}"
499 );
500 if let Err(err) = gateway
501 .sender
502 .try_send(GatewayRequest::RemoveMapping(mapping.clone()))
503 {
504 tracing::debug!(
505 multiaddress=%mapping.multiaddr,
506 "could not request port removal for multiaddress on the gateway: {}",
507 err
508 );
509 }
510 }
511 }
512 }
513
514 self.mappings.renew(gateway, cx);
516 return Poll::Pending;
517 }
518 _ => return Poll::Pending,
519 }
520 }
521 }
522}
523
524fn multiaddr_to_socketaddr_protocol(
529 addr: Multiaddr,
530) -> Result<(SocketAddr, PortMappingProtocol), ()> {
531 let mut iter = addr.into_iter();
532 match iter.next() {
533 Some(multiaddr::Protocol::Ip4(ipv4)) if ipv4.is_private() => match iter.next() {
535 Some(multiaddr::Protocol::Tcp(port)) => {
536 return Ok((
537 SocketAddr::V4(SocketAddrV4::new(ipv4, port)),
538 PortMappingProtocol::TCP,
539 ));
540 }
541 Some(multiaddr::Protocol::Udp(port)) => {
542 return Ok((
543 SocketAddr::V4(SocketAddrV4::new(ipv4, port)),
544 PortMappingProtocol::UDP,
545 ));
546 }
547 _ => {}
548 },
549 _ => {}
550 }
551 Err(())
552}