1#![cfg_attr(docsrs, feature(doc_cfg, doc_auto_cfg))]
22
23use std::{
24 borrow::Borrow,
25 collections::{HashMap, VecDeque},
26 error::Error,
27 hash::{Hash, Hasher},
28 net::{self, IpAddr, SocketAddr, SocketAddrV4},
29 ops::{Deref, DerefMut},
30 pin::Pin,
31 task::{Context, Poll},
32 time::Duration,
33};
34
35use futures::{channel::oneshot, Future, StreamExt};
36use futures_timer::Delay;
37use igd_next::PortMappingProtocol;
38use libp2p_core::{
39 multiaddr,
40 transport::{ListenerId, PortUse},
41 Endpoint, Multiaddr,
42};
43use libp2p_swarm::{
44 derive_prelude::PeerId, dummy, ConnectionDenied, ConnectionId, ExpiredListenAddr, FromSwarm,
45 NetworkBehaviour, NewListenAddr, ToSwarm,
46};
47
48use crate::tokio::{is_addr_global, Gateway};
49
/// Lifetime requested for every port mapping; sent as the `duration` of
/// [`GatewayRequest::AddMapping`] (presumably seconds, matching how
/// `MAPPING_TIMEOUT` is fed to `Duration::from_secs` — confirm against the
/// gateway implementation).
const MAPPING_DURATION: u32 = 3600;

/// Delay, in seconds, after which an active mapping is requested again.
///
/// Half of `MAPPING_DURATION`, so the renewal is attempted before the
/// requested lifetime has elapsed.
const MAPPING_TIMEOUT: u64 = (MAPPING_DURATION / 2) as u64;
55
56#[derive(Debug)]
58pub(crate) enum GatewayRequest {
59 AddMapping { mapping: Mapping, duration: u32 },
60 RemoveMapping(Mapping),
61}
62
63#[derive(Debug)]
65pub(crate) enum GatewayEvent {
66 Mapped(Mapping),
68 MapFailure(Mapping, Box<dyn Error + Send + Sync + 'static>),
70 Removed(Mapping),
72 RemovalFailure(Mapping, Box<dyn Error + Send + Sync + 'static>),
74}
75
76#[derive(Debug, Clone)]
78pub(crate) struct Mapping {
79 pub(crate) listener_id: ListenerId,
80 pub(crate) protocol: PortMappingProtocol,
81 pub(crate) multiaddr: Multiaddr,
82 pub(crate) internal_addr: SocketAddr,
83}
84
85impl Mapping {
86 fn external_addr(&self, gateway_addr: IpAddr) -> Multiaddr {
89 let addr = match gateway_addr {
90 net::IpAddr::V4(ip) => multiaddr::Protocol::Ip4(ip),
91 net::IpAddr::V6(ip) => multiaddr::Protocol::Ip6(ip),
92 };
93 self.multiaddr
94 .replace(0, |_| Some(addr))
95 .expect("multiaddr should be valid")
96 }
97}
98
99impl Hash for Mapping {
100 fn hash<H: Hasher>(&self, state: &mut H) {
101 self.listener_id.hash(state);
102 }
103}
104
105impl PartialEq for Mapping {
106 fn eq(&self, other: &Self) -> bool {
107 self.listener_id == other.listener_id
108 }
109}
110
111impl Eq for Mapping {}
112
113impl Borrow<ListenerId> for Mapping {
114 fn borrow(&self) -> &ListenerId {
115 &self.listener_id
116 }
117}
118
119#[derive(Debug)]
121enum MappingState {
122 Inactive,
124 Pending,
126 Active(Delay),
128 Failed,
130}
131
132enum GatewayState {
134 Searching(oneshot::Receiver<Result<Gateway, Box<dyn std::error::Error + Send + Sync>>>),
135 Available(Gateway),
136 GatewayNotFound,
137 NonRoutableGateway(IpAddr),
138}
139
140#[derive(Debug)]
142pub enum Event {
143 NewExternalAddr(Multiaddr),
145 ExpiredExternalAddr(Multiaddr),
147 GatewayNotFound,
149 NonRoutableGateway,
151}
152
153#[derive(Debug, Default)]
155struct MappingList(HashMap<Mapping, MappingState>);
156
157impl Deref for MappingList {
158 type Target = HashMap<Mapping, MappingState>;
159
160 fn deref(&self) -> &Self::Target {
161 &self.0
162 }
163}
164
165impl DerefMut for MappingList {
166 fn deref_mut(&mut self) -> &mut Self::Target {
167 &mut self.0
168 }
169}
170
171impl MappingList {
172 fn renew(&mut self, gateway: &mut Gateway, cx: &mut Context<'_>) {
175 for (mapping, state) in self.iter_mut() {
176 match state {
177 MappingState::Inactive | MappingState::Failed => {
178 let duration = MAPPING_DURATION;
179 if let Err(err) = gateway.sender.try_send(GatewayRequest::AddMapping {
180 mapping: mapping.clone(),
181 duration,
182 }) {
183 tracing::debug!(
184 multiaddress=%mapping.multiaddr,
185 "could not request port mapping for multiaddress on the gateway: {}",
186 err
187 );
188 }
189 *state = MappingState::Pending;
190 }
191 MappingState::Active(timeout) => {
192 if Pin::new(timeout).poll(cx).is_ready() {
193 let duration = MAPPING_DURATION;
194 if let Err(err) = gateway.sender.try_send(GatewayRequest::AddMapping {
195 mapping: mapping.clone(),
196 duration,
197 }) {
198 tracing::debug!(
199 multiaddress=%mapping.multiaddr,
200 "could not request port mapping for multiaddress on the gateway: {}",
201 err
202 );
203 }
204 }
205 }
206 MappingState::Pending => {}
207 }
208 }
209 }
210}
211
212pub struct Behaviour {
215 state: GatewayState,
217
218 mappings: MappingList,
220
221 pending_events: VecDeque<Event>,
223}
224
225impl Default for Behaviour {
226 fn default() -> Self {
227 Self {
228 state: GatewayState::Searching(crate::tokio::search_gateway()),
229 mappings: Default::default(),
230 pending_events: VecDeque::new(),
231 }
232 }
233}
234
235impl NetworkBehaviour for Behaviour {
236 type ConnectionHandler = dummy::ConnectionHandler;
237
238 type ToSwarm = Event;
239
240 fn handle_established_inbound_connection(
241 &mut self,
242 _connection_id: ConnectionId,
243 _peer: PeerId,
244 _local_addr: &Multiaddr,
245 _remote_addr: &Multiaddr,
246 ) -> Result<libp2p_swarm::THandler<Self>, ConnectionDenied> {
247 Ok(dummy::ConnectionHandler)
248 }
249
250 fn handle_established_outbound_connection(
251 &mut self,
252 _connection_id: ConnectionId,
253 _peer: PeerId,
254 _addr: &Multiaddr,
255 _role_override: Endpoint,
256 _port_use: PortUse,
257 ) -> Result<libp2p_swarm::THandler<Self>, libp2p_swarm::ConnectionDenied> {
258 Ok(dummy::ConnectionHandler)
259 }
260
261 fn on_swarm_event(&mut self, event: FromSwarm) {
262 match event {
263 FromSwarm::NewListenAddr(NewListenAddr {
264 listener_id,
265 addr: multiaddr,
266 }) => {
267 let Ok((addr, protocol)) = multiaddr_to_socketaddr_protocol(multiaddr.clone())
268 else {
269 tracing::debug!("multiaddress not supported for UPnP {multiaddr}");
270 return;
271 };
272
273 if let Some((mapping, _state)) = self
274 .mappings
275 .iter()
276 .find(|(mapping, _state)| mapping.internal_addr.port() == addr.port())
277 {
278 tracing::debug!(
279 multiaddress=%multiaddr,
280 mapped_multiaddress=%mapping.multiaddr,
281 "port from multiaddress is already being mapped"
282 );
283 return;
284 }
285
286 match &mut self.state {
287 GatewayState::Searching(_) => {
288 self.mappings.insert(
292 Mapping {
293 listener_id,
294 protocol,
295 internal_addr: addr,
296 multiaddr: multiaddr.clone(),
297 },
298 MappingState::Inactive,
299 );
300 }
301 GatewayState::Available(ref mut gateway) => {
302 let mapping = Mapping {
303 listener_id,
304 protocol,
305 internal_addr: addr,
306 multiaddr: multiaddr.clone(),
307 };
308
309 let duration = MAPPING_DURATION;
310 if let Err(err) = gateway.sender.try_send(GatewayRequest::AddMapping {
311 mapping: mapping.clone(),
312 duration,
313 }) {
314 tracing::debug!(
315 multiaddress=%mapping.multiaddr,
316 "could not request port mapping for multiaddress on the gateway: {}",
317 err
318 );
319 }
320
321 self.mappings.insert(mapping, MappingState::Pending);
322 }
323 GatewayState::GatewayNotFound => {
324 tracing::debug!(
325 multiaddres=%multiaddr,
326 "network gateway not found, UPnP port mapping of multiaddres discarded"
327 );
328 }
329 GatewayState::NonRoutableGateway(addr) => {
330 tracing::debug!(
331 multiaddress=%multiaddr,
332 network_gateway_ip=%addr,
333 "the network gateway is not exposed to the public network. /
334 UPnP port mapping of multiaddress discarded"
335 );
336 }
337 };
338 }
339 FromSwarm::ExpiredListenAddr(ExpiredListenAddr {
340 listener_id,
341 addr: _addr,
342 }) => {
343 if let GatewayState::Available(ref mut gateway) = &mut self.state {
344 if let Some((mapping, _state)) = self.mappings.remove_entry(&listener_id) {
345 if let Err(err) = gateway
346 .sender
347 .try_send(GatewayRequest::RemoveMapping(mapping.clone()))
348 {
349 tracing::debug!(
350 multiaddress=%mapping.multiaddr,
351 "could not request port removal for multiaddress on the gateway: {}",
352 err
353 );
354 }
355 self.mappings.insert(mapping, MappingState::Pending);
356 }
357 }
358 }
359 _ => {}
360 }
361 }
362
363 fn on_connection_handler_event(
364 &mut self,
365 _peer_id: PeerId,
366 _connection_id: ConnectionId,
367 event: libp2p_swarm::THandlerOutEvent<Self>,
368 ) {
369 libp2p_core::util::unreachable(event)
370 }
371
372 #[tracing::instrument(level = "trace", name = "NetworkBehaviour::poll", skip(self, cx))]
373 fn poll(
374 &mut self,
375 cx: &mut Context<'_>,
376 ) -> Poll<ToSwarm<Self::ToSwarm, libp2p_swarm::THandlerInEvent<Self>>> {
377 if let Some(event) = self.pending_events.pop_front() {
379 return Poll::Ready(ToSwarm::GenerateEvent(event));
380 }
381
382 loop {
385 match self.state {
386 GatewayState::Searching(ref mut fut) => match Pin::new(fut).poll(cx) {
387 Poll::Ready(Ok(result)) => match result {
388 Ok(gateway) => {
389 if !is_addr_global(gateway.external_addr) {
390 self.state =
391 GatewayState::NonRoutableGateway(gateway.external_addr);
392 tracing::debug!(
393 gateway_address=%gateway.external_addr,
394 "the gateway is not routable"
395 );
396 return Poll::Ready(ToSwarm::GenerateEvent(
397 Event::NonRoutableGateway,
398 ));
399 }
400 self.state = GatewayState::Available(gateway);
401 }
402 Err(err) => {
403 tracing::debug!("could not find gateway: {err}");
404 self.state = GatewayState::GatewayNotFound;
405 return Poll::Ready(ToSwarm::GenerateEvent(Event::GatewayNotFound));
406 }
407 },
408 Poll::Ready(Err(err)) => {
409 tracing::debug!("sender has been dropped: {err}");
412 self.state = GatewayState::GatewayNotFound;
413 return Poll::Ready(ToSwarm::GenerateEvent(Event::GatewayNotFound));
414 }
415 Poll::Pending => return Poll::Pending,
416 },
417 GatewayState::Available(ref mut gateway) => {
418 if let Poll::Ready(Some(result)) = gateway.receiver.poll_next_unpin(cx) {
420 match result {
421 GatewayEvent::Mapped(mapping) => {
422 let new_state = MappingState::Active(Delay::new(
423 Duration::from_secs(MAPPING_TIMEOUT),
424 ));
425
426 match self
427 .mappings
428 .insert(mapping.clone(), new_state)
429 .expect("mapping should exist")
430 {
431 MappingState::Pending => {
432 let external_multiaddr =
433 mapping.external_addr(gateway.external_addr);
434 self.pending_events.push_back(Event::NewExternalAddr(
435 external_multiaddr.clone(),
436 ));
437 tracing::debug!(
438 address=%mapping.internal_addr,
439 protocol=%mapping.protocol,
440 "successfully mapped UPnP for protocol"
441 );
442 return Poll::Ready(ToSwarm::ExternalAddrConfirmed(
443 external_multiaddr,
444 ));
445 }
446 MappingState::Active(_) => {
447 tracing::debug!(
448 address=%mapping.internal_addr,
449 protocol=%mapping.protocol,
450 "successfully renewed UPnP mapping for protocol"
451 );
452 }
453 _ => unreachable!(),
454 }
455 }
456 GatewayEvent::MapFailure(mapping, err) => {
457 match self
458 .mappings
459 .insert(mapping.clone(), MappingState::Failed)
460 .expect("mapping should exist")
461 {
462 MappingState::Active(_) => {
463 tracing::debug!(
464 address=%mapping.internal_addr,
465 protocol=%mapping.protocol,
466 "failed to remap UPnP mapped for protocol: {err}"
467 );
468 let external_multiaddr =
469 mapping.external_addr(gateway.external_addr);
470 self.pending_events.push_back(Event::ExpiredExternalAddr(
471 external_multiaddr.clone(),
472 ));
473 return Poll::Ready(ToSwarm::ExternalAddrExpired(
474 external_multiaddr,
475 ));
476 }
477 MappingState::Pending => {
478 tracing::debug!(
479 address=%mapping.internal_addr,
480 protocol=%mapping.protocol,
481 "failed to map UPnP mapped for protocol: {err}"
482 );
483 }
484 _ => {
485 unreachable!()
486 }
487 }
488 }
489 GatewayEvent::Removed(mapping) => {
490 tracing::debug!(
491 address=%mapping.internal_addr,
492 protocol=%mapping.protocol,
493 "successfully removed UPnP mapping for protocol"
494 );
495 self.mappings
496 .remove(&mapping)
497 .expect("mapping should exist");
498 }
499 GatewayEvent::RemovalFailure(mapping, err) => {
500 tracing::debug!(
501 address=%mapping.internal_addr,
502 protocol=%mapping.protocol,
503 "could not remove UPnP mapping for protocol: {err}"
504 );
505 if let Err(err) = gateway
506 .sender
507 .try_send(GatewayRequest::RemoveMapping(mapping.clone()))
508 {
509 tracing::debug!(
510 multiaddress=%mapping.multiaddr,
511 "could not request port removal for multiaddress on the gateway: {}",
512 err
513 );
514 }
515 }
516 }
517 }
518
519 self.mappings.renew(gateway, cx);
521 return Poll::Pending;
522 }
523 _ => return Poll::Pending,
524 }
525 }
526 }
527}
528
529fn multiaddr_to_socketaddr_protocol(
534 addr: Multiaddr,
535) -> Result<(SocketAddr, PortMappingProtocol), ()> {
536 let mut iter = addr.into_iter();
537 match iter.next() {
538 Some(multiaddr::Protocol::Ip4(ipv4)) if ipv4.is_private() => match iter.next() {
540 Some(multiaddr::Protocol::Tcp(port)) => {
541 return Ok((
542 SocketAddr::V4(SocketAddrV4::new(ipv4, port)),
543 PortMappingProtocol::TCP,
544 ));
545 }
546 Some(multiaddr::Protocol::Udp(port)) => {
547 return Ok((
548 SocketAddr::V4(SocketAddrV4::new(ipv4, port)),
549 PortMappingProtocol::UDP,
550 ));
551 }
552 _ => {}
553 },
554 _ => {}
555 }
556 Err(())
557}