1use std::{
22 collections::{
23 hash_map::{DefaultHasher, Entry},
24 HashMap, HashSet,
25 },
26 fmt,
27 hash::{Hash, Hasher},
28 io,
29 net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr, UdpSocket},
30 pin::Pin,
31 task::{Context, Poll, Waker},
32 time::Duration,
33};
34
35use futures::{
36 channel::oneshot,
37 future::{BoxFuture, Either},
38 prelude::*,
39 ready,
40 stream::{SelectAll, StreamExt},
41};
42use if_watch::IfEvent;
43use libp2p_core::{
44 multiaddr::{Multiaddr, Protocol},
45 transport::{DialOpts, ListenerId, PortUse, TransportError, TransportEvent},
46 Endpoint, Transport,
47};
48use libp2p_identity::PeerId;
49use socket2::{Domain, Socket, Type};
50
51use crate::{
52 config::{Config, QuinnConfig},
53 hole_punching::hole_puncher,
54 provider::Provider,
55 ConnectError, Connecting, Connection, Error,
56};
57
/// QUIC transport state, generic over the async runtime [`Provider`].
///
/// Owns the listener streams, the listener-less dialing endpoints and the
/// bookkeeping for in-flight hole-punch attempts.
#[derive(Debug)]
pub struct GenTransport<P: Provider> {
    /// Endpoint, server and client configuration handed to `quinn`.
    quinn_config: QuinnConfig,
    /// Timeout passed to every [`Connecting`] (and to the hole puncher).
    handshake_timeout: Duration,
    /// Whether `/quic` (draft-29) multiaddrs are accepted in addition to `/quic-v1`.
    support_draft_29: bool,
    /// Streams of the active listeners, polled together.
    listeners: SelectAll<Listener<P>>,
    /// Listener-less endpoints used for dialing, at most one per socket family.
    dialer: HashMap<SocketFamily, quinn::Endpoint>,
    /// Waker stored by `poll`; woken when a listener or dialing endpoint is added.
    waker: Option<Waker>,
    /// Pending hole-punch attempts keyed by the remote socket address; the sender
    /// receives the matching incoming connection.
    hole_punch_attempts: HashMap<SocketAddr, oneshot::Sender<Connecting>>,
}
86
87#[expect(deprecated)]
88impl<P: Provider> GenTransport<P> {
89 pub fn new(config: Config) -> Self {
91 let handshake_timeout = config.handshake_timeout;
92 let support_draft_29 = config.support_draft_29;
93 let quinn_config = config.into();
94 Self {
95 listeners: SelectAll::new(),
96 quinn_config,
97 handshake_timeout,
98 dialer: HashMap::new(),
99 waker: None,
100 support_draft_29,
101 hole_punch_attempts: Default::default(),
102 }
103 }
104
105 fn new_endpoint(
107 endpoint_config: quinn::EndpointConfig,
108 server_config: Option<quinn::ServerConfig>,
109 socket: UdpSocket,
110 ) -> Result<quinn::Endpoint, Error> {
111 use crate::provider::Runtime;
112 match P::runtime() {
113 #[cfg(feature = "tokio")]
114 Runtime::Tokio => {
115 let runtime = std::sync::Arc::new(quinn::TokioRuntime);
116 let endpoint =
117 quinn::Endpoint::new(endpoint_config, server_config, socket, runtime)?;
118 Ok(endpoint)
119 }
120 #[cfg(feature = "async-std")]
121 Runtime::AsyncStd => {
122 let runtime = std::sync::Arc::new(quinn::AsyncStdRuntime);
123 let endpoint =
124 quinn::Endpoint::new(endpoint_config, server_config, socket, runtime)?;
125 Ok(endpoint)
126 }
127 Runtime::Dummy => {
128 let _ = endpoint_config;
129 let _ = server_config;
130 let _ = socket;
131 let err = std::io::Error::new(std::io::ErrorKind::Other, "no async runtime found");
132 Err(Error::Io(err))
133 }
134 }
135 }
136
137 fn remote_multiaddr_to_socketaddr(
139 &self,
140 addr: Multiaddr,
141 check_unspecified_addr: bool,
142 ) -> Result<
143 (SocketAddr, ProtocolVersion, Option<PeerId>),
144 TransportError<<Self as Transport>::Error>,
145 > {
146 let (socket_addr, version, peer_id) = multiaddr_to_socketaddr(&addr, self.support_draft_29)
147 .ok_or_else(|| TransportError::MultiaddrNotSupported(addr.clone()))?;
148 if check_unspecified_addr && (socket_addr.port() == 0 || socket_addr.ip().is_unspecified())
149 {
150 return Err(TransportError::MultiaddrNotSupported(addr));
151 }
152 Ok((socket_addr, version, peer_id))
153 }
154
155 fn eligible_listener(&mut self, socket_addr: &SocketAddr) -> Option<&mut Listener<P>> {
157 let mut listeners: Vec<_> = self
158 .listeners
159 .iter_mut()
160 .filter(|l| {
161 if l.is_closed {
162 return false;
163 }
164 SocketFamily::is_same(&l.socket_addr().ip(), &socket_addr.ip())
165 })
166 .filter(|l| {
167 if socket_addr.ip().is_loopback() {
168 l.listening_addresses
169 .iter()
170 .any(|ip_addr| ip_addr.is_loopback())
171 } else {
172 true
173 }
174 })
175 .collect();
176 match listeners.len() {
177 0 => None,
178 1 => listeners.pop(),
179 _ => {
180 let mut hasher = DefaultHasher::new();
183 socket_addr.hash(&mut hasher);
184 let index = hasher.finish() as usize % listeners.len();
185 Some(listeners.swap_remove(index))
186 }
187 }
188 }
189
190 fn create_socket(&self, socket_addr: SocketAddr) -> io::Result<UdpSocket> {
191 let socket = Socket::new(
192 Domain::for_address(socket_addr),
193 Type::DGRAM,
194 Some(socket2::Protocol::UDP),
195 )?;
196 if socket_addr.is_ipv6() {
197 socket.set_only_v6(true)?;
198 }
199
200 socket.bind(&socket_addr.into())?;
201
202 Ok(socket.into())
203 }
204
205 fn bound_socket(&mut self, socket_addr: SocketAddr) -> Result<quinn::Endpoint, Error> {
206 let socket_family = socket_addr.ip().into();
207 if let Some(waker) = self.waker.take() {
208 waker.wake();
209 }
210 let listen_socket_addr = match socket_family {
211 SocketFamily::Ipv4 => SocketAddr::new(Ipv4Addr::UNSPECIFIED.into(), 0),
212 SocketFamily::Ipv6 => SocketAddr::new(Ipv6Addr::UNSPECIFIED.into(), 0),
213 };
214 let socket = UdpSocket::bind(listen_socket_addr)?;
215 let endpoint_config = self.quinn_config.endpoint_config.clone();
216 let endpoint = Self::new_endpoint(endpoint_config, None, socket)?;
217 Ok(endpoint)
218 }
219}
220
221impl<P: Provider> Transport for GenTransport<P> {
222 type Output = (PeerId, Connection);
223 type Error = Error;
224 type ListenerUpgrade = Connecting;
225 type Dial = BoxFuture<'static, Result<Self::Output, Self::Error>>;
226
227 fn listen_on(
228 &mut self,
229 listener_id: ListenerId,
230 addr: Multiaddr,
231 ) -> Result<(), TransportError<Self::Error>> {
232 let (socket_addr, version, _peer_id) = self.remote_multiaddr_to_socketaddr(addr, false)?;
233 let endpoint_config = self.quinn_config.endpoint_config.clone();
234 let server_config = self.quinn_config.server_config.clone();
235 let socket = self.create_socket(socket_addr).map_err(Self::Error::from)?;
236
237 let socket_c = socket.try_clone().map_err(Self::Error::from)?;
238 let endpoint = Self::new_endpoint(endpoint_config, Some(server_config), socket)?;
239 let listener = Listener::new(
240 listener_id,
241 socket_c,
242 endpoint,
243 self.handshake_timeout,
244 version,
245 )?;
246 self.listeners.push(listener);
247
248 if let Some(waker) = self.waker.take() {
249 waker.wake();
250 }
251
252 self.dialer.remove(&socket_addr.ip().into());
256
257 Ok(())
258 }
259
260 fn remove_listener(&mut self, id: ListenerId) -> bool {
261 if let Some(listener) = self.listeners.iter_mut().find(|l| l.listener_id == id) {
262 listener.close(Ok(()));
265 true
266 } else {
267 false
268 }
269 }
270
271 fn dial(
272 &mut self,
273 addr: Multiaddr,
274 dial_opts: DialOpts,
275 ) -> Result<Self::Dial, TransportError<Self::Error>> {
276 let (socket_addr, version, peer_id) =
277 self.remote_multiaddr_to_socketaddr(addr.clone(), true)?;
278
279 match (dial_opts.role, dial_opts.port_use) {
280 (Endpoint::Dialer, _) | (Endpoint::Listener, PortUse::Reuse) => {
281 let endpoint = if let Some(listener) = dial_opts
282 .port_use
283 .eq(&PortUse::Reuse)
284 .then(|| self.eligible_listener(&socket_addr))
285 .flatten()
286 {
287 listener.endpoint.clone()
288 } else {
289 let socket_family = socket_addr.ip().into();
290 let dialer = if dial_opts.port_use == PortUse::Reuse {
291 if let Some(occupied) = self.dialer.get(&socket_family) {
292 occupied.clone()
293 } else {
294 let endpoint = self.bound_socket(socket_addr)?;
295 self.dialer.insert(socket_family, endpoint.clone());
296 endpoint
297 }
298 } else {
299 self.bound_socket(socket_addr)?
300 };
301 dialer
302 };
303 let handshake_timeout = self.handshake_timeout;
304 let mut client_config = self.quinn_config.client_config.clone();
305 if version == ProtocolVersion::Draft29 {
306 client_config.version(0xff00_001d);
307 }
308 Ok(Box::pin(async move {
309 let connecting = endpoint
313 .connect_with(client_config, socket_addr, "l")
314 .map_err(ConnectError)?;
315 Connecting::new(connecting, handshake_timeout).await
316 }))
317 }
318 (Endpoint::Listener, _) => {
319 let peer_id = peer_id.ok_or(TransportError::MultiaddrNotSupported(addr.clone()))?;
320
321 let socket = self
322 .eligible_listener(&socket_addr)
323 .ok_or(TransportError::Other(
324 Error::NoActiveListenerForDialAsListener,
325 ))?
326 .try_clone_socket()
327 .map_err(Self::Error::from)?;
328
329 tracing::debug!("Preparing for hole-punch from {addr}");
330
331 let hole_puncher = hole_puncher::<P>(socket, socket_addr, self.handshake_timeout);
332
333 let (sender, receiver) = oneshot::channel();
334
335 match self.hole_punch_attempts.entry(socket_addr) {
336 Entry::Occupied(mut sender_entry) => {
337 if !sender_entry.get().is_canceled() {
340 return Err(TransportError::Other(Error::HolePunchInProgress(
341 socket_addr,
342 )));
343 }
344 sender_entry.insert(sender);
345 }
346 Entry::Vacant(entry) => {
347 entry.insert(sender);
348 }
349 };
350
351 Ok(Box::pin(async move {
352 futures::pin_mut!(hole_puncher);
353 match futures::future::select(receiver, hole_puncher).await {
354 Either::Left((message, _)) => {
355 let (inbound_peer_id, connection) = message
356 .expect(
357 "hole punch connection sender is never dropped before receiver",
358 )
359 .await?;
360 if inbound_peer_id != peer_id {
361 tracing::warn!(
362 peer=%peer_id,
363 inbound_peer=%inbound_peer_id,
364 socket_address=%socket_addr,
365 "expected inbound connection from socket_address to resolve to peer but got inbound peer"
366 );
367 }
368 Ok((inbound_peer_id, connection))
369 }
370 Either::Right((hole_punch_err, _)) => Err(hole_punch_err),
371 }
372 }))
373 }
374 }
375 }
376
377 fn poll(
378 mut self: Pin<&mut Self>,
379 cx: &mut Context<'_>,
380 ) -> Poll<TransportEvent<Self::ListenerUpgrade, Self::Error>> {
381 while let Poll::Ready(Some(ev)) = self.listeners.poll_next_unpin(cx) {
382 match ev {
383 TransportEvent::Incoming {
384 listener_id,
385 mut upgrade,
386 local_addr,
387 send_back_addr,
388 } => {
389 let socket_addr =
390 multiaddr_to_socketaddr(&send_back_addr, self.support_draft_29)
391 .unwrap()
392 .0;
393
394 if let Some(sender) = self.hole_punch_attempts.remove(&socket_addr) {
395 match sender.send(upgrade) {
396 Ok(()) => continue,
397 Err(timed_out_holepunch) => {
398 upgrade = timed_out_holepunch;
399 }
400 }
401 }
402
403 return Poll::Ready(TransportEvent::Incoming {
404 listener_id,
405 upgrade,
406 local_addr,
407 send_back_addr,
408 });
409 }
410 _ => return Poll::Ready(ev),
411 }
412 }
413
414 self.waker = Some(cx.waker().clone());
415 Poll::Pending
416 }
417}
418
419impl From<Error> for TransportError<Error> {
420 fn from(err: Error) -> Self {
421 TransportError::Other(err)
422 }
423}
424
/// A single listening QUIC endpoint, polled as a stream of transport events.
struct Listener<P: Provider> {
    /// Id under which this listener was registered.
    listener_id: ListenerId,

    /// QUIC version used when building multiaddrs for this listener.
    version: ProtocolVersion,

    /// Endpoint that accepts connections; also cloned for dialing.
    endpoint: quinn::Endpoint,

    /// Handle to the listening UDP socket; used to read the local address and
    /// cloned for hole punching.
    socket: UdpSocket,

    /// Future resolving to the next incoming connection of `endpoint`.
    accept: BoxFuture<'static, Option<quinn::Incoming>>,
    /// Timeout passed to every accepted [`Connecting`].
    handshake_timeout: Duration,

    /// Watcher for network interface changes.
    ///
    /// `None` if the socket is bound to a specific (not unspecified) address.
    if_watcher: Option<P::IfWatcher>,

    /// Whether `close` has been called on this listener.
    is_closed: bool,

    /// Event to report first on the next poll.
    pending_event: Option<<Self as Stream>::Item>,

    /// Waker registered by the last pending poll; woken by `close`.
    close_listener_waker: Option<Waker>,

    /// IP addresses this listener currently listens on.
    listening_addresses: HashSet<IpAddr>,
}
460
461impl<P: Provider> Listener<P> {
462 fn new(
463 listener_id: ListenerId,
464 socket: UdpSocket,
465 endpoint: quinn::Endpoint,
466 handshake_timeout: Duration,
467 version: ProtocolVersion,
468 ) -> Result<Self, Error> {
469 let if_watcher;
470 let pending_event;
471 let mut listening_addresses = HashSet::new();
472 let local_addr = socket.local_addr()?;
473 if local_addr.ip().is_unspecified() {
474 if_watcher = Some(P::new_if_watcher()?);
475 pending_event = None;
476 } else {
477 if_watcher = None;
478 listening_addresses.insert(local_addr.ip());
479 let ma = socketaddr_to_multiaddr(&local_addr, version);
480 pending_event = Some(TransportEvent::NewAddress {
481 listener_id,
482 listen_addr: ma,
483 })
484 }
485
486 let endpoint_c = endpoint.clone();
487 let accept = async move { endpoint_c.accept().await }.boxed();
488
489 Ok(Listener {
490 endpoint,
491 socket,
492 accept,
493 listener_id,
494 version,
495 handshake_timeout,
496 if_watcher,
497 is_closed: false,
498 pending_event,
499 close_listener_waker: None,
500 listening_addresses,
501 })
502 }
503
504 fn close(&mut self, reason: Result<(), Error>) {
507 if self.is_closed {
508 return;
509 }
510 self.endpoint.close(From::from(0u32), &[]);
511 self.pending_event = Some(TransportEvent::ListenerClosed {
512 listener_id: self.listener_id,
513 reason,
514 });
515 self.is_closed = true;
516
517 if let Some(waker) = self.close_listener_waker.take() {
519 waker.wake();
520 }
521 }
522
523 fn try_clone_socket(&self) -> std::io::Result<UdpSocket> {
525 self.socket.try_clone()
526 }
527
528 fn socket_addr(&self) -> SocketAddr {
529 self.socket
530 .local_addr()
531 .expect("Cannot fail because the socket is bound")
532 }
533
534 fn poll_if_addr(&mut self, cx: &mut Context<'_>) -> Poll<<Self as Stream>::Item> {
536 let endpoint_addr = self.socket_addr();
537 let Some(if_watcher) = self.if_watcher.as_mut() else {
538 return Poll::Pending;
539 };
540 loop {
541 match ready!(P::poll_if_event(if_watcher, cx)) {
542 Ok(IfEvent::Up(inet)) => {
543 if let Some(listen_addr) =
544 ip_to_listenaddr(&endpoint_addr, inet.addr(), self.version)
545 {
546 tracing::debug!(
547 address=%listen_addr,
548 "New listen address"
549 );
550 self.listening_addresses.insert(inet.addr());
551 return Poll::Ready(TransportEvent::NewAddress {
552 listener_id: self.listener_id,
553 listen_addr,
554 });
555 }
556 }
557 Ok(IfEvent::Down(inet)) => {
558 if let Some(listen_addr) =
559 ip_to_listenaddr(&endpoint_addr, inet.addr(), self.version)
560 {
561 tracing::debug!(
562 address=%listen_addr,
563 "Expired listen address"
564 );
565 self.listening_addresses.remove(&inet.addr());
566 return Poll::Ready(TransportEvent::AddressExpired {
567 listener_id: self.listener_id,
568 listen_addr,
569 });
570 }
571 }
572 Err(err) => {
573 return Poll::Ready(TransportEvent::ListenerError {
574 listener_id: self.listener_id,
575 error: err.into(),
576 })
577 }
578 }
579 }
580 }
581}
582
583impl<P: Provider> Stream for Listener<P> {
584 type Item = TransportEvent<<GenTransport<P> as Transport>::ListenerUpgrade, Error>;
585 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
586 loop {
587 if let Some(event) = self.pending_event.take() {
588 return Poll::Ready(Some(event));
589 }
590 if self.is_closed {
591 return Poll::Ready(None);
592 }
593 if let Poll::Ready(event) = self.poll_if_addr(cx) {
594 return Poll::Ready(Some(event));
595 }
596
597 match self.accept.poll_unpin(cx) {
598 Poll::Ready(Some(incoming)) => {
599 let endpoint = self.endpoint.clone();
600 self.accept = async move { endpoint.accept().await }.boxed();
601
602 let connecting = match incoming.accept() {
603 Ok(connecting) => connecting,
604 Err(error) => {
605 return Poll::Ready(Some(TransportEvent::ListenerError {
606 listener_id: self.listener_id,
607 error: Error::Connection(crate::ConnectionError(error)),
608 }))
609 }
610 };
611
612 let local_addr = socketaddr_to_multiaddr(&self.socket_addr(), self.version);
613 let remote_addr = connecting.remote_address();
614 let send_back_addr = socketaddr_to_multiaddr(&remote_addr, self.version);
615
616 let event = TransportEvent::Incoming {
617 upgrade: Connecting::new(connecting, self.handshake_timeout),
618 local_addr,
619 send_back_addr,
620 listener_id: self.listener_id,
621 };
622 return Poll::Ready(Some(event));
623 }
624 Poll::Ready(None) => {
625 self.close(Ok(()));
626 continue;
627 }
628 Poll::Pending => {}
629 };
630
631 self.close_listener_waker = Some(cx.waker().clone());
632
633 return Poll::Pending;
634 }
635 }
636}
637
638impl<P: Provider> fmt::Debug for Listener<P> {
639 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
640 f.debug_struct("Listener")
641 .field("listener_id", &self.listener_id)
642 .field("handshake_timeout", &self.handshake_timeout)
643 .field("is_closed", &self.is_closed)
644 .field("pending_event", &self.pending_event)
645 .finish()
646 }
647}
648
/// QUIC protocol version of a listener or dial target.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) enum ProtocolVersion {
    /// Addressed with the `/quic-v1` multiaddr component.
    V1,
    /// Draft-29, addressed with the `/quic` multiaddr component.
    Draft29,
}
654
/// IP family of a socket; used as the key for the per-family dialing endpoints.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub(crate) enum SocketFamily {
    /// IPv4 sockets and addresses.
    Ipv4,
    /// IPv6 sockets and addresses.
    Ipv6,
}

impl SocketFamily {
    /// Returns `true` if `a` and `b` are both IPv4 or both IPv6.
    fn is_same(a: &IpAddr, b: &IpAddr) -> bool {
        // `IpAddr` has exactly two variants, so equal "is IPv4" answers mean
        // the families match.
        a.is_ipv4() == b.is_ipv4()
    }
}

impl From<IpAddr> for SocketFamily {
    /// Maps an address to the family it belongs to.
    fn from(ip: IpAddr) -> Self {
        if ip.is_ipv4() {
            Self::Ipv4
        } else {
            Self::Ipv6
        }
    }
}
678
679fn ip_to_listenaddr(
688 endpoint_addr: &SocketAddr,
689 ip: IpAddr,
690 version: ProtocolVersion,
691) -> Option<Multiaddr> {
692 if !SocketFamily::is_same(&endpoint_addr.ip(), &ip) {
694 return None;
695 }
696 let socket_addr = SocketAddr::new(ip, endpoint_addr.port());
697 Some(socketaddr_to_multiaddr(&socket_addr, version))
698}
699
700fn multiaddr_to_socketaddr(
703 addr: &Multiaddr,
704 support_draft_29: bool,
705) -> Option<(SocketAddr, ProtocolVersion, Option<PeerId>)> {
706 let mut iter = addr.iter();
707 let proto1 = iter.next()?;
708 let proto2 = iter.next()?;
709 let proto3 = iter.next()?;
710
711 let mut peer_id = None;
712 for proto in iter {
713 match proto {
714 Protocol::P2p(id) => {
715 peer_id = Some(id);
716 }
717 _ => return None,
718 }
719 }
720 let version = match proto3 {
721 Protocol::QuicV1 => ProtocolVersion::V1,
722 Protocol::Quic if support_draft_29 => ProtocolVersion::Draft29,
723 _ => return None,
724 };
725
726 match (proto1, proto2) {
727 (Protocol::Ip4(ip), Protocol::Udp(port)) => {
728 Some((SocketAddr::new(ip.into(), port), version, peer_id))
729 }
730 (Protocol::Ip6(ip), Protocol::Udp(port)) => {
731 Some((SocketAddr::new(ip.into(), port), version, peer_id))
732 }
733 _ => None,
734 }
735}
736
737fn socketaddr_to_multiaddr(socket_addr: &SocketAddr, version: ProtocolVersion) -> Multiaddr {
739 let quic_proto = match version {
740 ProtocolVersion::V1 => Protocol::QuicV1,
741 ProtocolVersion::Draft29 => Protocol::Quic,
742 };
743 Multiaddr::empty()
744 .with(socket_addr.ip().into())
745 .with(Protocol::Udp(socket_addr.port()))
746 .with(quic_proto)
747}
748
#[cfg(test)]
#[cfg(any(feature = "async-std", feature = "tokio"))]
mod tests {
    use futures::future::poll_fn;

    use super::*;

    /// Result type of `multiaddr_to_socketaddr`.
    type Parsed = Option<(SocketAddr, ProtocolVersion, Option<PeerId>)>;

    /// Parses `text` as a multiaddr and runs it through `multiaddr_to_socketaddr`.
    fn convert(text: &str, support_draft_29: bool) -> Parsed {
        let addr = text.parse::<Multiaddr>().unwrap();
        multiaddr_to_socketaddr(&addr, support_draft_29)
    }

    /// Waits for the next event of `transport`.
    #[cfg(feature = "tokio")]
    async fn next_event(
        transport: &mut crate::tokio::Transport,
    ) -> TransportEvent<Connecting, Error> {
        poll_fn(|cx| Pin::new(&mut *transport).poll(cx)).await
    }

    #[test]
    fn multiaddr_to_udp_conversion() {
        let localhost_v4 = IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1));

        // No QUIC component at all.
        assert!(convert("/ip4/127.0.0.1/udp/1234", true).is_none());

        assert_eq!(
            convert("/ip4/127.0.0.1/udp/12345/quic-v1", false),
            Some((
                SocketAddr::new(localhost_v4, 12345),
                ProtocolVersion::V1,
                None
            ))
        );
        assert_eq!(
            convert("/ip4/255.255.255.255/udp/8080/quic-v1", false),
            Some((
                SocketAddr::new(IpAddr::V4(Ipv4Addr::new(255, 255, 255, 255)), 8080),
                ProtocolVersion::V1,
                None
            ))
        );

        // A trailing `/p2p` component is returned as the peer id.
        let peer: PeerId = "12D3KooW9xk7Zp1gejwfwNpfm6L9zH5NL4Bx5rm94LRYJJHJuARZ"
            .parse()
            .unwrap();
        assert_eq!(
            convert(
                "/ip4/127.0.0.1/udp/55148/quic-v1/p2p/12D3KooW9xk7Zp1gejwfwNpfm6L9zH5NL4Bx5rm94LRYJJHJuARZ",
                false
            ),
            Some((
                SocketAddr::new(localhost_v4, 55148),
                ProtocolVersion::V1,
                Some(peer)
            ))
        );

        assert_eq!(
            convert("/ip6/::1/udp/12345/quic-v1", false),
            Some((
                SocketAddr::new(IpAddr::V6(Ipv6Addr::new(0, 0, 0, 0, 0, 0, 0, 1)), 12345),
                ProtocolVersion::V1,
                None
            ))
        );
        assert_eq!(
            convert(
                "/ip6/ffff:ffff:ffff:ffff:ffff:ffff:ffff:ffff/udp/8080/quic-v1",
                false
            ),
            Some((
                SocketAddr::new(
                    IpAddr::V6(Ipv6Addr::new(
                        65535, 65535, 65535, 65535, 65535, 65535, 65535, 65535,
                    )),
                    8080,
                ),
                ProtocolVersion::V1,
                None
            ))
        );

        // `/quic` is only understood when draft-29 support is enabled.
        assert!(convert("/ip4/127.0.0.1/udp/1234/quic", false).is_none());
        assert_eq!(
            convert("/ip4/127.0.0.1/udp/1234/quic", true),
            Some((
                SocketAddr::new(localhost_v4, 1234),
                ProtocolVersion::Draft29,
                None
            ))
        );
    }

    #[cfg(feature = "tokio")]
    #[tokio::test]
    async fn test_close_listener() {
        let keypair = libp2p_identity::Keypair::generate_ed25519();
        let mut transport = crate::tokio::Transport::new(Config::new(&keypair));

        // A fresh transport has nothing to report.
        assert!(next_event(&mut transport).now_or_never().is_none());

        // Two rounds: adding and removing a listener must also work after the
        // transport has already gone through one full cycle.
        for _ in 0..2 {
            let id = ListenerId::next();
            transport
                .listen_on(id, "/ip4/0.0.0.0/udp/0/quic-v1".parse().unwrap())
                .unwrap();

            match next_event(&mut transport).await {
                TransportEvent::NewAddress {
                    listener_id,
                    listen_addr,
                } => {
                    assert_eq!(listener_id, id);
                    let mut protocols = listen_addr.iter();
                    assert!(
                        matches!(protocols.next(), Some(Protocol::Ip4(a)) if !a.is_unspecified())
                    );
                    assert!(matches!(protocols.next(), Some(Protocol::Udp(port)) if port != 0));
                    assert!(matches!(protocols.next(), Some(Protocol::QuicV1)));
                }
                e => panic!("Unexpected event: {e:?}"),
            }

            assert!(transport.remove_listener(id), "Expect listener to exist.");

            match next_event(&mut transport).await {
                TransportEvent::ListenerClosed {
                    listener_id,
                    reason: Ok(()),
                } => {
                    assert_eq!(listener_id, id);
                }
                e => panic!("Unexpected event: {e:?}"),
            }

            // One more poll lets the closed listener end its stream, after which
            // it is dropped from the listener set.
            assert!(next_event(&mut transport).now_or_never().is_none());
            assert!(transport.listeners.is_empty());
        }
    }

    #[cfg(feature = "tokio")]
    #[tokio::test]
    async fn test_dialer_drop() {
        let keypair = libp2p_identity::Keypair::generate_ed25519();
        let mut transport = crate::tokio::Transport::new(Config::new(&keypair));

        let dial_opts = DialOpts {
            role: Endpoint::Dialer,
            port_use: PortUse::Reuse,
        };
        let _dial = transport
            .dial(
                "/ip4/123.45.67.8/udp/1234/quic-v1".parse().unwrap(),
                dial_opts,
            )
            .unwrap();

        // Dialing without a listener caches an endpoint for that family only.
        assert!(transport.dialer.contains_key(&SocketFamily::Ipv4));
        assert!(!transport.dialer.contains_key(&SocketFamily::Ipv6));

        // Starting a listener of the same family drops the cached endpoint.
        transport
            .listen_on(
                ListenerId::next(),
                "/ip4/0.0.0.0/udp/0/quic-v1".parse().unwrap(),
            )
            .unwrap();
        assert!(!transport.dialer.contains_key(&SocketFamily::Ipv4));
    }

    #[cfg(feature = "tokio")]
    #[tokio::test]
    async fn test_listens_ipv4_ipv6_separately() {
        let keypair = libp2p_identity::Keypair::generate_ed25519();
        let mut transport = crate::tokio::Transport::new(Config::new(&keypair));

        // Let the OS pick a port, then release it by dropping the probe socket.
        let port = {
            let probe = UdpSocket::bind("127.0.0.1:0").unwrap();
            probe.local_addr().unwrap().port()
        };

        // Both wildcard addresses must be bindable on the same port.
        let listen_addrs = [
            format!("/ip4/0.0.0.0/udp/{port}/quic-v1"),
            format!("/ip6/::/udp/{port}/quic-v1"),
        ];
        for listen_addr in listen_addrs {
            transport
                .listen_on(ListenerId::next(), listen_addr.parse().unwrap())
                .unwrap();
        }
    }
}