libp2p_quic/
transport.rs

1// Copyright 2017-2020 Parity Technologies (UK) Ltd.
2//
3// Permission is hereby granted, free of charge, to any person obtaining a
4// copy of this software and associated documentation files (the "Software"),
5// to deal in the Software without restriction, including without limitation
6// the rights to use, copy, modify, merge, publish, distribute, sublicense,
7// and/or sell copies of the Software, and to permit persons to whom the
8// Software is furnished to do so, subject to the following conditions:
9//
10// The above copyright notice and this permission notice shall be included in
11// all copies or substantial portions of the Software.
12//
13// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
14// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
18// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
19// DEALINGS IN THE SOFTWARE.
20
21use std::{
22    collections::{
23        hash_map::{DefaultHasher, Entry},
24        HashMap, HashSet,
25    },
26    fmt,
27    hash::{Hash, Hasher},
28    io,
29    net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr, UdpSocket},
30    pin::Pin,
31    task::{Context, Poll, Waker},
32    time::Duration,
33};
34
35use futures::{
36    channel::oneshot,
37    future::{BoxFuture, Either},
38    prelude::*,
39    ready,
40    stream::{SelectAll, StreamExt},
41};
42use if_watch::IfEvent;
43use libp2p_core::{
44    multiaddr::{Multiaddr, Protocol},
45    transport::{DialOpts, ListenerId, PortUse, TransportError, TransportEvent},
46    Endpoint, Transport,
47};
48use libp2p_identity::PeerId;
49use socket2::{Domain, Socket, Type};
50
51use crate::{
52    config::{Config, QuinnConfig},
53    hole_punching::hole_puncher,
54    provider::Provider,
55    ConnectError, Connecting, Connection, Error,
56};
57
/// Implementation of the [`Transport`] trait for QUIC.
///
/// By default only QUIC Version 1 (RFC 9000) is supported. In the [`Multiaddr`] this maps to
/// [`libp2p_core::multiaddr::Protocol::QuicV1`].
/// The [`libp2p_core::multiaddr::Protocol::Quic`] codepoint is interpreted as QUIC version
/// draft-29 and only supported if [`Config::support_draft_29`] is set to `true`.
/// Note that in that case servers support both versions on all QUIC listening addresses.
///
/// Version draft-29 should only be used to connect to nodes from other libp2p implementations
/// that do not support `QuicV1` yet. Support for it will be removed long-term.
/// See <https://github.com/multiformats/multiaddr/issues/145>.
#[derive(Debug)]
pub struct GenTransport<P: Provider> {
    /// Config for the inner [`quinn`] structs.
    quinn_config: QuinnConfig,
    /// Timeout for the [`Connecting`] future.
    handshake_timeout: Duration,
    /// Whether draft-29 is supported for dialing and listening.
    support_draft_29: bool,
    /// Streams of active [`Listener`]s.
    listeners: SelectAll<Listener<P>>,
    /// Dialer for each socket family if no matching listener exists.
    dialer: HashMap<SocketFamily, quinn::Endpoint>,
    /// Waker to poll the transport again when a new dialer or listener is added.
    waker: Option<Waker>,
    /// Hole-punching attempts in progress, keyed by the remote socket address.
    ///
    /// The sender hands an inbound [`Connecting`] from that address over to the
    /// future returned by the corresponding `dial` call.
    hole_punch_attempts: HashMap<SocketAddr, oneshot::Sender<Connecting>>,
}
86
87#[expect(deprecated)]
88impl<P: Provider> GenTransport<P> {
89    /// Create a new [`GenTransport`] with the given [`Config`].
90    pub fn new(config: Config) -> Self {
91        let handshake_timeout = config.handshake_timeout;
92        let support_draft_29 = config.support_draft_29;
93        let quinn_config = config.into();
94        Self {
95            listeners: SelectAll::new(),
96            quinn_config,
97            handshake_timeout,
98            dialer: HashMap::new(),
99            waker: None,
100            support_draft_29,
101            hole_punch_attempts: Default::default(),
102        }
103    }
104
105    /// Create a new [`quinn::Endpoint`] with the given configs.
106    fn new_endpoint(
107        endpoint_config: quinn::EndpointConfig,
108        server_config: Option<quinn::ServerConfig>,
109        socket: UdpSocket,
110    ) -> Result<quinn::Endpoint, Error> {
111        use crate::provider::Runtime;
112        match P::runtime() {
113            #[cfg(feature = "tokio")]
114            Runtime::Tokio => {
115                let runtime = std::sync::Arc::new(quinn::TokioRuntime);
116                let endpoint =
117                    quinn::Endpoint::new(endpoint_config, server_config, socket, runtime)?;
118                Ok(endpoint)
119            }
120            #[cfg(feature = "async-std")]
121            Runtime::AsyncStd => {
122                let runtime = std::sync::Arc::new(quinn::AsyncStdRuntime);
123                let endpoint =
124                    quinn::Endpoint::new(endpoint_config, server_config, socket, runtime)?;
125                Ok(endpoint)
126            }
127            Runtime::Dummy => {
128                let _ = endpoint_config;
129                let _ = server_config;
130                let _ = socket;
131                let err = std::io::Error::new(std::io::ErrorKind::Other, "no async runtime found");
132                Err(Error::Io(err))
133            }
134        }
135    }
136
137    /// Extract the addr, quic version and peer id from the given [`Multiaddr`].
138    fn remote_multiaddr_to_socketaddr(
139        &self,
140        addr: Multiaddr,
141        check_unspecified_addr: bool,
142    ) -> Result<
143        (SocketAddr, ProtocolVersion, Option<PeerId>),
144        TransportError<<Self as Transport>::Error>,
145    > {
146        let (socket_addr, version, peer_id) = multiaddr_to_socketaddr(&addr, self.support_draft_29)
147            .ok_or_else(|| TransportError::MultiaddrNotSupported(addr.clone()))?;
148        if check_unspecified_addr && (socket_addr.port() == 0 || socket_addr.ip().is_unspecified())
149        {
150            return Err(TransportError::MultiaddrNotSupported(addr));
151        }
152        Ok((socket_addr, version, peer_id))
153    }
154
155    /// Pick any listener to use for dialing.
156    fn eligible_listener(&mut self, socket_addr: &SocketAddr) -> Option<&mut Listener<P>> {
157        let mut listeners: Vec<_> = self
158            .listeners
159            .iter_mut()
160            .filter(|l| {
161                if l.is_closed {
162                    return false;
163                }
164                SocketFamily::is_same(&l.socket_addr().ip(), &socket_addr.ip())
165            })
166            .filter(|l| {
167                if socket_addr.ip().is_loopback() {
168                    l.listening_addresses
169                        .iter()
170                        .any(|ip_addr| ip_addr.is_loopback())
171                } else {
172                    true
173                }
174            })
175            .collect();
176        match listeners.len() {
177            0 => None,
178            1 => listeners.pop(),
179            _ => {
180                // Pick any listener to use for dialing.
181                // We hash the socket address to achieve determinism.
182                let mut hasher = DefaultHasher::new();
183                socket_addr.hash(&mut hasher);
184                let index = hasher.finish() as usize % listeners.len();
185                Some(listeners.swap_remove(index))
186            }
187        }
188    }
189
190    fn create_socket(&self, socket_addr: SocketAddr) -> io::Result<UdpSocket> {
191        let socket = Socket::new(
192            Domain::for_address(socket_addr),
193            Type::DGRAM,
194            Some(socket2::Protocol::UDP),
195        )?;
196        if socket_addr.is_ipv6() {
197            socket.set_only_v6(true)?;
198        }
199
200        socket.bind(&socket_addr.into())?;
201
202        Ok(socket.into())
203    }
204
205    fn bound_socket(&mut self, socket_addr: SocketAddr) -> Result<quinn::Endpoint, Error> {
206        let socket_family = socket_addr.ip().into();
207        if let Some(waker) = self.waker.take() {
208            waker.wake();
209        }
210        let listen_socket_addr = match socket_family {
211            SocketFamily::Ipv4 => SocketAddr::new(Ipv4Addr::UNSPECIFIED.into(), 0),
212            SocketFamily::Ipv6 => SocketAddr::new(Ipv6Addr::UNSPECIFIED.into(), 0),
213        };
214        let socket = UdpSocket::bind(listen_socket_addr)?;
215        let endpoint_config = self.quinn_config.endpoint_config.clone();
216        let endpoint = Self::new_endpoint(endpoint_config, None, socket)?;
217        Ok(endpoint)
218    }
219}
220
221impl<P: Provider> Transport for GenTransport<P> {
    /// An established connection together with the [`PeerId`] of the remote.
    type Output = (PeerId, Connection);
    /// Error type shared by dialing, listening and connection upgrades.
    type Error = Error;
    /// Future for an inbound connection that is still being established.
    type ListenerUpgrade = Connecting;
    /// Boxed future returned by `dial`.
    type Dial = BoxFuture<'static, Result<Self::Output, Self::Error>>;
226
227    fn listen_on(
228        &mut self,
229        listener_id: ListenerId,
230        addr: Multiaddr,
231    ) -> Result<(), TransportError<Self::Error>> {
232        let (socket_addr, version, _peer_id) = self.remote_multiaddr_to_socketaddr(addr, false)?;
233        let endpoint_config = self.quinn_config.endpoint_config.clone();
234        let server_config = self.quinn_config.server_config.clone();
235        let socket = self.create_socket(socket_addr).map_err(Self::Error::from)?;
236
237        let socket_c = socket.try_clone().map_err(Self::Error::from)?;
238        let endpoint = Self::new_endpoint(endpoint_config, Some(server_config), socket)?;
239        let listener = Listener::new(
240            listener_id,
241            socket_c,
242            endpoint,
243            self.handshake_timeout,
244            version,
245        )?;
246        self.listeners.push(listener);
247
248        if let Some(waker) = self.waker.take() {
249            waker.wake();
250        }
251
252        // Remove dialer endpoint so that the endpoint is dropped once the last
253        // connection that uses it is closed.
254        // New outbound connections will use the bidirectional (listener) endpoint.
255        self.dialer.remove(&socket_addr.ip().into());
256
257        Ok(())
258    }
259
260    fn remove_listener(&mut self, id: ListenerId) -> bool {
261        if let Some(listener) = self.listeners.iter_mut().find(|l| l.listener_id == id) {
262            // Close the listener, which will eventually finish its stream.
263            // `SelectAll` removes streams once they are finished.
264            listener.close(Ok(()));
265            true
266        } else {
267            false
268        }
269    }
270
271    fn dial(
272        &mut self,
273        addr: Multiaddr,
274        dial_opts: DialOpts,
275    ) -> Result<Self::Dial, TransportError<Self::Error>> {
276        let (socket_addr, version, peer_id) =
277            self.remote_multiaddr_to_socketaddr(addr.clone(), true)?;
278
279        match (dial_opts.role, dial_opts.port_use) {
280            (Endpoint::Dialer, _) | (Endpoint::Listener, PortUse::Reuse) => {
281                let endpoint = if let Some(listener) = dial_opts
282                    .port_use
283                    .eq(&PortUse::Reuse)
284                    .then(|| self.eligible_listener(&socket_addr))
285                    .flatten()
286                {
287                    listener.endpoint.clone()
288                } else {
289                    let socket_family = socket_addr.ip().into();
290                    let dialer = if dial_opts.port_use == PortUse::Reuse {
291                        if let Some(occupied) = self.dialer.get(&socket_family) {
292                            occupied.clone()
293                        } else {
294                            let endpoint = self.bound_socket(socket_addr)?;
295                            self.dialer.insert(socket_family, endpoint.clone());
296                            endpoint
297                        }
298                    } else {
299                        self.bound_socket(socket_addr)?
300                    };
301                    dialer
302                };
303                let handshake_timeout = self.handshake_timeout;
304                let mut client_config = self.quinn_config.client_config.clone();
305                if version == ProtocolVersion::Draft29 {
306                    client_config.version(0xff00_001d);
307                }
308                Ok(Box::pin(async move {
309                    // This `"l"` seems necessary because an empty string is an invalid domain
310                    // name. While we don't use domain names, the underlying rustls library
311                    // is based upon the assumption that we do.
312                    let connecting = endpoint
313                        .connect_with(client_config, socket_addr, "l")
314                        .map_err(ConnectError)?;
315                    Connecting::new(connecting, handshake_timeout).await
316                }))
317            }
318            (Endpoint::Listener, _) => {
319                let peer_id = peer_id.ok_or(TransportError::MultiaddrNotSupported(addr.clone()))?;
320
321                let socket = self
322                    .eligible_listener(&socket_addr)
323                    .ok_or(TransportError::Other(
324                        Error::NoActiveListenerForDialAsListener,
325                    ))?
326                    .try_clone_socket()
327                    .map_err(Self::Error::from)?;
328
329                tracing::debug!("Preparing for hole-punch from {addr}");
330
331                let hole_puncher = hole_puncher::<P>(socket, socket_addr, self.handshake_timeout);
332
333                let (sender, receiver) = oneshot::channel();
334
335                match self.hole_punch_attempts.entry(socket_addr) {
336                    Entry::Occupied(mut sender_entry) => {
337                        // Stale senders, i.e. from failed hole punches are not removed.
338                        // Thus, we can just overwrite a stale sender.
339                        if !sender_entry.get().is_canceled() {
340                            return Err(TransportError::Other(Error::HolePunchInProgress(
341                                socket_addr,
342                            )));
343                        }
344                        sender_entry.insert(sender);
345                    }
346                    Entry::Vacant(entry) => {
347                        entry.insert(sender);
348                    }
349                };
350
351                Ok(Box::pin(async move {
352                    futures::pin_mut!(hole_puncher);
353                    match futures::future::select(receiver, hole_puncher).await {
354                        Either::Left((message, _)) => {
355                            let (inbound_peer_id, connection) = message
356                                .expect(
357                                    "hole punch connection sender is never dropped before receiver",
358                                )
359                                .await?;
360                            if inbound_peer_id != peer_id {
361                                tracing::warn!(
362                                    peer=%peer_id,
363                                    inbound_peer=%inbound_peer_id,
364                                    socket_address=%socket_addr,
365                                    "expected inbound connection from socket_address to resolve to peer but got inbound peer"
366                                );
367                            }
368                            Ok((inbound_peer_id, connection))
369                        }
370                        Either::Right((hole_punch_err, _)) => Err(hole_punch_err),
371                    }
372                }))
373            }
374        }
375    }
376
377    fn poll(
378        mut self: Pin<&mut Self>,
379        cx: &mut Context<'_>,
380    ) -> Poll<TransportEvent<Self::ListenerUpgrade, Self::Error>> {
381        while let Poll::Ready(Some(ev)) = self.listeners.poll_next_unpin(cx) {
382            match ev {
383                TransportEvent::Incoming {
384                    listener_id,
385                    mut upgrade,
386                    local_addr,
387                    send_back_addr,
388                } => {
389                    let socket_addr =
390                        multiaddr_to_socketaddr(&send_back_addr, self.support_draft_29)
391                            .unwrap()
392                            .0;
393
394                    if let Some(sender) = self.hole_punch_attempts.remove(&socket_addr) {
395                        match sender.send(upgrade) {
396                            Ok(()) => continue,
397                            Err(timed_out_holepunch) => {
398                                upgrade = timed_out_holepunch;
399                            }
400                        }
401                    }
402
403                    return Poll::Ready(TransportEvent::Incoming {
404                        listener_id,
405                        upgrade,
406                        local_addr,
407                        send_back_addr,
408                    });
409                }
410                _ => return Poll::Ready(ev),
411            }
412        }
413
414        self.waker = Some(cx.waker().clone());
415        Poll::Pending
416    }
417}
418
419impl From<Error> for TransportError<Error> {
420    fn from(err: Error) -> Self {
421        TransportError::Other(err)
422    }
423}
424
/// Listener for incoming connections.
struct Listener<P: Provider> {
    /// Id of the listener.
    listener_id: ListenerId,

    /// Version of the supported quic protocol.
    version: ProtocolVersion,

    /// QUIC endpoint that accepts the incoming connections.
    endpoint: quinn::Endpoint,

    /// An underlying copy of the socket to be able to hole punch with
    socket: UdpSocket,

    /// A future to poll new incoming connections.
    accept: BoxFuture<'static, Option<quinn::Incoming>>,
    /// Timeout for connection establishment on inbound connections.
    handshake_timeout: Duration,

    /// Watcher for network interface changes.
    ///
    /// None if we are only listening on a single interface.
    if_watcher: Option<P::IfWatcher>,

    /// Whether the listener was closed and the stream should terminate.
    is_closed: bool,

    /// Pending event to be reported.
    pending_event: Option<<Self as Stream>::Item>,

    /// The stream must be woken after it has been closed to deliver the last event.
    close_listener_waker: Option<Waker>,

    /// IP addresses this listener is currently listening on.
    ///
    /// Holds the bound IP when the socket is bound to a specific address;
    /// otherwise it is updated from interface up/down events.
    listening_addresses: HashSet<IpAddr>,
}
460
461impl<P: Provider> Listener<P> {
462    fn new(
463        listener_id: ListenerId,
464        socket: UdpSocket,
465        endpoint: quinn::Endpoint,
466        handshake_timeout: Duration,
467        version: ProtocolVersion,
468    ) -> Result<Self, Error> {
469        let if_watcher;
470        let pending_event;
471        let mut listening_addresses = HashSet::new();
472        let local_addr = socket.local_addr()?;
473        if local_addr.ip().is_unspecified() {
474            if_watcher = Some(P::new_if_watcher()?);
475            pending_event = None;
476        } else {
477            if_watcher = None;
478            listening_addresses.insert(local_addr.ip());
479            let ma = socketaddr_to_multiaddr(&local_addr, version);
480            pending_event = Some(TransportEvent::NewAddress {
481                listener_id,
482                listen_addr: ma,
483            })
484        }
485
486        let endpoint_c = endpoint.clone();
487        let accept = async move { endpoint_c.accept().await }.boxed();
488
489        Ok(Listener {
490            endpoint,
491            socket,
492            accept,
493            listener_id,
494            version,
495            handshake_timeout,
496            if_watcher,
497            is_closed: false,
498            pending_event,
499            close_listener_waker: None,
500            listening_addresses,
501        })
502    }
503
504    /// Report the listener as closed in a [`TransportEvent::ListenerClosed`] and
505    /// terminate the stream.
506    fn close(&mut self, reason: Result<(), Error>) {
507        if self.is_closed {
508            return;
509        }
510        self.endpoint.close(From::from(0u32), &[]);
511        self.pending_event = Some(TransportEvent::ListenerClosed {
512            listener_id: self.listener_id,
513            reason,
514        });
515        self.is_closed = true;
516
517        // Wake the stream to deliver the last event.
518        if let Some(waker) = self.close_listener_waker.take() {
519            waker.wake();
520        }
521    }
522
523    /// Clone underlying socket (for hole punching).
524    fn try_clone_socket(&self) -> std::io::Result<UdpSocket> {
525        self.socket.try_clone()
526    }
527
528    fn socket_addr(&self) -> SocketAddr {
529        self.socket
530            .local_addr()
531            .expect("Cannot fail because the socket is bound")
532    }
533
534    /// Poll for a next If Event.
535    fn poll_if_addr(&mut self, cx: &mut Context<'_>) -> Poll<<Self as Stream>::Item> {
536        let endpoint_addr = self.socket_addr();
537        let Some(if_watcher) = self.if_watcher.as_mut() else {
538            return Poll::Pending;
539        };
540        loop {
541            match ready!(P::poll_if_event(if_watcher, cx)) {
542                Ok(IfEvent::Up(inet)) => {
543                    if let Some(listen_addr) =
544                        ip_to_listenaddr(&endpoint_addr, inet.addr(), self.version)
545                    {
546                        tracing::debug!(
547                            address=%listen_addr,
548                            "New listen address"
549                        );
550                        self.listening_addresses.insert(inet.addr());
551                        return Poll::Ready(TransportEvent::NewAddress {
552                            listener_id: self.listener_id,
553                            listen_addr,
554                        });
555                    }
556                }
557                Ok(IfEvent::Down(inet)) => {
558                    if let Some(listen_addr) =
559                        ip_to_listenaddr(&endpoint_addr, inet.addr(), self.version)
560                    {
561                        tracing::debug!(
562                            address=%listen_addr,
563                            "Expired listen address"
564                        );
565                        self.listening_addresses.remove(&inet.addr());
566                        return Poll::Ready(TransportEvent::AddressExpired {
567                            listener_id: self.listener_id,
568                            listen_addr,
569                        });
570                    }
571                }
572                Err(err) => {
573                    return Poll::Ready(TransportEvent::ListenerError {
574                        listener_id: self.listener_id,
575                        error: err.into(),
576                    })
577                }
578            }
579        }
580    }
581}
582
583impl<P: Provider> Stream for Listener<P> {
584    type Item = TransportEvent<<GenTransport<P> as Transport>::ListenerUpgrade, Error>;
585    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
586        loop {
587            if let Some(event) = self.pending_event.take() {
588                return Poll::Ready(Some(event));
589            }
590            if self.is_closed {
591                return Poll::Ready(None);
592            }
593            if let Poll::Ready(event) = self.poll_if_addr(cx) {
594                return Poll::Ready(Some(event));
595            }
596
597            match self.accept.poll_unpin(cx) {
598                Poll::Ready(Some(incoming)) => {
599                    let endpoint = self.endpoint.clone();
600                    self.accept = async move { endpoint.accept().await }.boxed();
601
602                    let connecting = match incoming.accept() {
603                        Ok(connecting) => connecting,
604                        Err(error) => {
605                            return Poll::Ready(Some(TransportEvent::ListenerError {
606                                listener_id: self.listener_id,
607                                error: Error::Connection(crate::ConnectionError(error)),
608                            }))
609                        }
610                    };
611
612                    let local_addr = socketaddr_to_multiaddr(&self.socket_addr(), self.version);
613                    let remote_addr = connecting.remote_address();
614                    let send_back_addr = socketaddr_to_multiaddr(&remote_addr, self.version);
615
616                    let event = TransportEvent::Incoming {
617                        upgrade: Connecting::new(connecting, self.handshake_timeout),
618                        local_addr,
619                        send_back_addr,
620                        listener_id: self.listener_id,
621                    };
622                    return Poll::Ready(Some(event));
623                }
624                Poll::Ready(None) => {
625                    self.close(Ok(()));
626                    continue;
627                }
628                Poll::Pending => {}
629            };
630
631            self.close_listener_waker = Some(cx.waker().clone());
632
633            return Poll::Pending;
634        }
635    }
636}
637
638impl<P: Provider> fmt::Debug for Listener<P> {
639    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
640        f.debug_struct("Listener")
641            .field("listener_id", &self.listener_id)
642            .field("handshake_timeout", &self.handshake_timeout)
643            .field("is_closed", &self.is_closed)
644            .field("pending_event", &self.pending_event)
645            .finish()
646    }
647}
648
/// QUIC protocol version used by a listener or an outbound connection.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) enum ProtocolVersion {
    /// QUIC version 1, i.e. RFC 9000. Maps to `Protocol::QuicV1` in a multiaddr.
    V1,
    /// QUIC draft-29. Maps to `Protocol::Quic` in a multiaddr.
    Draft29,
}
654
/// IP address family of a socket; used as the key for the per-family dialer endpoints.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub(crate) enum SocketFamily {
    /// IPv4 addresses.
    Ipv4,
    /// IPv6 addresses.
    Ipv6,
}
660
661impl SocketFamily {
662    fn is_same(a: &IpAddr, b: &IpAddr) -> bool {
663        matches!(
664            (a, b),
665            (IpAddr::V4(_), IpAddr::V4(_)) | (IpAddr::V6(_), IpAddr::V6(_))
666        )
667    }
668}
669
670impl From<IpAddr> for SocketFamily {
671    fn from(ip: IpAddr) -> Self {
672        match ip {
673            IpAddr::V4(_) => SocketFamily::Ipv4,
674            IpAddr::V6(_) => SocketFamily::Ipv6,
675        }
676    }
677}
678
679/// Turn an [`IpAddr`] reported by the interface watcher into a
680/// listen-address for the endpoint.
681///
682/// For this, the `ip` is combined with the port that the endpoint
683/// is actually bound.
684///
685/// Returns `None` if the `ip` is not the same socket family as the
686/// address that the endpoint is bound to.
687fn ip_to_listenaddr(
688    endpoint_addr: &SocketAddr,
689    ip: IpAddr,
690    version: ProtocolVersion,
691) -> Option<Multiaddr> {
692    // True if either both addresses are Ipv4 or both Ipv6.
693    if !SocketFamily::is_same(&endpoint_addr.ip(), &ip) {
694        return None;
695    }
696    let socket_addr = SocketAddr::new(ip, endpoint_addr.port());
697    Some(socketaddr_to_multiaddr(&socket_addr, version))
698}
699
700/// Tries to turn a QUIC multiaddress into a UDP [`SocketAddr`]. Returns None if the format
701/// of the multiaddr is wrong.
702fn multiaddr_to_socketaddr(
703    addr: &Multiaddr,
704    support_draft_29: bool,
705) -> Option<(SocketAddr, ProtocolVersion, Option<PeerId>)> {
706    let mut iter = addr.iter();
707    let proto1 = iter.next()?;
708    let proto2 = iter.next()?;
709    let proto3 = iter.next()?;
710
711    let mut peer_id = None;
712    for proto in iter {
713        match proto {
714            Protocol::P2p(id) => {
715                peer_id = Some(id);
716            }
717            _ => return None,
718        }
719    }
720    let version = match proto3 {
721        Protocol::QuicV1 => ProtocolVersion::V1,
722        Protocol::Quic if support_draft_29 => ProtocolVersion::Draft29,
723        _ => return None,
724    };
725
726    match (proto1, proto2) {
727        (Protocol::Ip4(ip), Protocol::Udp(port)) => {
728            Some((SocketAddr::new(ip.into(), port), version, peer_id))
729        }
730        (Protocol::Ip6(ip), Protocol::Udp(port)) => {
731            Some((SocketAddr::new(ip.into(), port), version, peer_id))
732        }
733        _ => None,
734    }
735}
736
737/// Turns an IP address and port into the corresponding QUIC multiaddr.
738fn socketaddr_to_multiaddr(socket_addr: &SocketAddr, version: ProtocolVersion) -> Multiaddr {
739    let quic_proto = match version {
740        ProtocolVersion::V1 => Protocol::QuicV1,
741        ProtocolVersion::Draft29 => Protocol::Quic,
742    };
743    Multiaddr::empty()
744        .with(socket_addr.ip().into())
745        .with(Protocol::Udp(socket_addr.port()))
746        .with(quic_proto)
747}
748
749#[cfg(test)]
750#[cfg(any(feature = "async-std", feature = "tokio"))]
751mod tests {
752    use futures::future::poll_fn;
753
754    use super::*;
755
756    #[test]
757    fn multiaddr_to_udp_conversion() {
758        assert!(multiaddr_to_socketaddr(
759            &"/ip4/127.0.0.1/udp/1234".parse::<Multiaddr>().unwrap(),
760            true
761        )
762        .is_none());
763
764        assert_eq!(
765            multiaddr_to_socketaddr(
766                &"/ip4/127.0.0.1/udp/12345/quic-v1"
767                    .parse::<Multiaddr>()
768                    .unwrap(),
769                false
770            ),
771            Some((
772                SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 12345,),
773                ProtocolVersion::V1,
774                None
775            ))
776        );
777        assert_eq!(
778            multiaddr_to_socketaddr(
779                &"/ip4/255.255.255.255/udp/8080/quic-v1"
780                    .parse::<Multiaddr>()
781                    .unwrap(),
782                false
783            ),
784            Some((
785                SocketAddr::new(IpAddr::V4(Ipv4Addr::new(255, 255, 255, 255)), 8080,),
786                ProtocolVersion::V1,
787                None
788            ))
789        );
790        assert_eq!(
791            multiaddr_to_socketaddr(
792                &"/ip4/127.0.0.1/udp/55148/quic-v1/p2p/12D3KooW9xk7Zp1gejwfwNpfm6L9zH5NL4Bx5rm94LRYJJHJuARZ"
793                    .parse::<Multiaddr>()
794                    .unwrap(), false
795            ),
796            Some((SocketAddr::new(
797                IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)),
798                55148,
799            ), ProtocolVersion::V1, Some("12D3KooW9xk7Zp1gejwfwNpfm6L9zH5NL4Bx5rm94LRYJJHJuARZ".parse().unwrap())))
800        );
801        assert_eq!(
802            multiaddr_to_socketaddr(
803                &"/ip6/::1/udp/12345/quic-v1".parse::<Multiaddr>().unwrap(),
804                false
805            ),
806            Some((
807                SocketAddr::new(IpAddr::V6(Ipv6Addr::new(0, 0, 0, 0, 0, 0, 0, 1)), 12345,),
808                ProtocolVersion::V1,
809                None
810            ))
811        );
812        assert_eq!(
813            multiaddr_to_socketaddr(
814                &"/ip6/ffff:ffff:ffff:ffff:ffff:ffff:ffff:ffff/udp/8080/quic-v1"
815                    .parse::<Multiaddr>()
816                    .unwrap(),
817                false
818            ),
819            Some((
820                SocketAddr::new(
821                    IpAddr::V6(Ipv6Addr::new(
822                        65535, 65535, 65535, 65535, 65535, 65535, 65535, 65535,
823                    )),
824                    8080,
825                ),
826                ProtocolVersion::V1,
827                None
828            ))
829        );
830
831        assert!(multiaddr_to_socketaddr(
832            &"/ip4/127.0.0.1/udp/1234/quic".parse::<Multiaddr>().unwrap(),
833            false
834        )
835        .is_none());
836        assert_eq!(
837            multiaddr_to_socketaddr(
838                &"/ip4/127.0.0.1/udp/1234/quic".parse::<Multiaddr>().unwrap(),
839                true
840            ),
841            Some((
842                SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 1234,),
843                ProtocolVersion::Draft29,
844                None
845            ))
846        );
847    }
848
849    #[cfg(feature = "tokio")]
850    #[tokio::test]
851    async fn test_close_listener() {
852        let keypair = libp2p_identity::Keypair::generate_ed25519();
853        let config = Config::new(&keypair);
854        let mut transport = crate::tokio::Transport::new(config);
855        assert!(poll_fn(|cx| Pin::new(&mut transport).as_mut().poll(cx))
856            .now_or_never()
857            .is_none());
858
859        // Run test twice to check that there is no unexpected behaviour if `Transport.listener`
860        // is temporarily empty.
861        for _ in 0..2 {
862            let id = ListenerId::next();
863            transport
864                .listen_on(id, "/ip4/0.0.0.0/udp/0/quic-v1".parse().unwrap())
865                .unwrap();
866
867            match poll_fn(|cx| Pin::new(&mut transport).as_mut().poll(cx)).await {
868                TransportEvent::NewAddress {
869                    listener_id,
870                    listen_addr,
871                } => {
872                    assert_eq!(listener_id, id);
873                    assert!(
874                        matches!(listen_addr.iter().next(), Some(Protocol::Ip4(a)) if !a.is_unspecified())
875                    );
876                    assert!(
877                        matches!(listen_addr.iter().nth(1), Some(Protocol::Udp(port)) if port != 0)
878                    );
879                    assert!(matches!(listen_addr.iter().nth(2), Some(Protocol::QuicV1)));
880                }
881                e => panic!("Unexpected event: {e:?}"),
882            }
883            assert!(transport.remove_listener(id), "Expect listener to exist.");
884            match poll_fn(|cx| Pin::new(&mut transport).as_mut().poll(cx)).await {
885                TransportEvent::ListenerClosed {
886                    listener_id,
887                    reason: Ok(()),
888                } => {
889                    assert_eq!(listener_id, id);
890                }
891                e => panic!("Unexpected event: {e:?}"),
892            }
893            // Poll once again so that the listener has the chance to return `Poll::Ready(None)` and
894            // be removed from the list of listeners.
895            assert!(poll_fn(|cx| Pin::new(&mut transport).as_mut().poll(cx))
896                .now_or_never()
897                .is_none());
898            assert!(transport.listeners.is_empty());
899        }
900    }
901
902    #[cfg(feature = "tokio")]
903    #[tokio::test]
904    async fn test_dialer_drop() {
905        let keypair = libp2p_identity::Keypair::generate_ed25519();
906        let config = Config::new(&keypair);
907        let mut transport = crate::tokio::Transport::new(config);
908
909        let _dial = transport
910            .dial(
911                "/ip4/123.45.67.8/udp/1234/quic-v1".parse().unwrap(),
912                DialOpts {
913                    role: Endpoint::Dialer,
914                    port_use: PortUse::Reuse,
915                },
916            )
917            .unwrap();
918
919        assert!(transport.dialer.contains_key(&SocketFamily::Ipv4));
920        assert!(!transport.dialer.contains_key(&SocketFamily::Ipv6));
921
922        // Start listening so that the dialer and driver are dropped.
923        transport
924            .listen_on(
925                ListenerId::next(),
926                "/ip4/0.0.0.0/udp/0/quic-v1".parse().unwrap(),
927            )
928            .unwrap();
929        assert!(!transport.dialer.contains_key(&SocketFamily::Ipv4));
930    }
931
932    #[cfg(feature = "tokio")]
933    #[tokio::test]
934    async fn test_listens_ipv4_ipv6_separately() {
935        let keypair = libp2p_identity::Keypair::generate_ed25519();
936        let config = Config::new(&keypair);
937        let mut transport = crate::tokio::Transport::new(config);
938        let port = {
939            let socket = UdpSocket::bind("127.0.0.1:0").unwrap();
940            socket.local_addr().unwrap().port()
941        };
942
943        transport
944            .listen_on(
945                ListenerId::next(),
946                format!("/ip4/0.0.0.0/udp/{port}/quic-v1").parse().unwrap(),
947            )
948            .unwrap();
949        transport
950            .listen_on(
951                ListenerId::next(),
952                format!("/ip6/::/udp/{port}/quic-v1").parse().unwrap(),
953            )
954            .unwrap();
955    }
956}