1use std::{
22 io,
23 net::{IpAddr, SocketAddr},
24 pin::Pin,
25 task::{Context, Poll, Waker},
26};
27
28use futures::{future::BoxFuture, prelude::*, stream::SelectAll};
29use if_watch::{tokio::IfWatcher, IfEvent};
30use libp2p_core::{
31 multiaddr::{Multiaddr, Protocol},
32 transport::{DialOpts, ListenerId, TransportError, TransportEvent},
33};
34use libp2p_identity as identity;
35use libp2p_identity::PeerId;
36use webrtc::peer_connection::configuration::RTCConfiguration;
37
38use crate::tokio::{
39 certificate::Certificate,
40 connection::Connection,
41 error::Error,
42 fingerprint::Fingerprint,
43 udp_mux::{UDPMuxEvent, UDPMuxNewAddr},
44 upgrade,
45};
46
47pub struct Transport {
49 config: Config,
51 listeners: SelectAll<ListenStream>,
53}
54
55impl Transport {
56 pub fn new(id_keys: identity::Keypair, certificate: Certificate) -> Self {
69 Self {
70 config: Config::new(id_keys, certificate),
71 listeners: SelectAll::new(),
72 }
73 }
74}
75
76impl libp2p_core::Transport for Transport {
77 type Output = (PeerId, Connection);
78 type Error = Error;
79 type ListenerUpgrade = BoxFuture<'static, Result<Self::Output, Self::Error>>;
80 type Dial = BoxFuture<'static, Result<Self::Output, Self::Error>>;
81
82 fn listen_on(
83 &mut self,
84 id: ListenerId,
85 addr: Multiaddr,
86 ) -> Result<(), TransportError<Self::Error>> {
87 let socket_addr =
88 parse_webrtc_listen_addr(&addr).ok_or(TransportError::MultiaddrNotSupported(addr))?;
89 let udp_mux = UDPMuxNewAddr::listen_on(socket_addr)
90 .map_err(|io| TransportError::Other(Error::Io(io)))?;
91
92 self.listeners.push(
93 ListenStream::new(id, self.config.clone(), udp_mux)
94 .map_err(|e| TransportError::Other(Error::Io(e)))?,
95 );
96
97 Ok(())
98 }
99
100 fn remove_listener(&mut self, id: ListenerId) -> bool {
101 if let Some(listener) = self.listeners.iter_mut().find(|l| l.listener_id == id) {
102 listener.close(Ok(()));
103 true
104 } else {
105 false
106 }
107 }
108
109 fn poll(
111 mut self: Pin<&mut Self>,
112 cx: &mut Context<'_>,
113 ) -> Poll<TransportEvent<Self::ListenerUpgrade, Self::Error>> {
114 match self.listeners.poll_next_unpin(cx) {
115 Poll::Ready(Some(ev)) => Poll::Ready(ev),
116 _ => Poll::Pending,
117 }
118 }
119
120 fn dial(
121 &mut self,
122 addr: Multiaddr,
123 dial_opts: DialOpts,
124 ) -> Result<Self::Dial, TransportError<Self::Error>> {
125 if dial_opts.role.is_listener() {
126 tracing::warn!("WebRTC hole punch is not yet supported");
131 }
132
133 let (sock_addr, server_fingerprint) = libp2p_webrtc_utils::parse_webrtc_dial_addr(&addr)
134 .ok_or_else(|| TransportError::MultiaddrNotSupported(addr.clone()))?;
135 if sock_addr.port() == 0 || sock_addr.ip().is_unspecified() {
136 return Err(TransportError::MultiaddrNotSupported(addr));
137 }
138
139 let config = self.config.clone();
140 let client_fingerprint = self.config.fingerprint;
141 let udp_mux = self
142 .listeners
143 .iter()
144 .next()
145 .ok_or(TransportError::Other(Error::NoListeners))?
146 .udp_mux
147 .udp_mux_handle();
148
149 Ok(async move {
150 let (peer_id, connection) = upgrade::outbound(
151 sock_addr,
152 config.inner,
153 udp_mux,
154 client_fingerprint.into_inner(),
155 server_fingerprint,
156 config.id_keys,
157 )
158 .await?;
159
160 Ok((peer_id, connection))
161 }
162 .boxed())
163 }
164}
165
166struct ListenStream {
168 listener_id: ListenerId,
170
171 listen_addr: SocketAddr,
175
176 config: Config,
178
179 udp_mux: UDPMuxNewAddr,
181
182 report_closed: Option<Option<<Self as Stream>::Item>>,
187
188 if_watcher: Option<IfWatcher>,
194
195 pending_event: Option<<Self as Stream>::Item>,
197
198 close_listener_waker: Option<Waker>,
200}
201
202impl ListenStream {
203 fn new(listener_id: ListenerId, config: Config, udp_mux: UDPMuxNewAddr) -> io::Result<Self> {
205 let listen_addr = udp_mux.listen_addr();
206
207 let if_watcher;
208 let pending_event;
209 if listen_addr.ip().is_unspecified() {
210 if_watcher = Some(IfWatcher::new()?);
211 pending_event = None;
212 } else {
213 if_watcher = None;
214 let ma = socketaddr_to_multiaddr(&listen_addr, Some(config.fingerprint));
215 pending_event = Some(TransportEvent::NewAddress {
216 listener_id,
217 listen_addr: ma,
218 })
219 }
220
221 Ok(ListenStream {
222 listener_id,
223 listen_addr,
224 config,
225 udp_mux,
226 report_closed: None,
227 if_watcher,
228 pending_event,
229 close_listener_waker: None,
230 })
231 }
232
233 fn close(&mut self, reason: Result<(), Error>) {
236 match self.report_closed {
237 Some(_) => tracing::debug!("Listener was already closed"),
238 None => {
239 let _ = self
241 .report_closed
242 .insert(Some(TransportEvent::ListenerClosed {
243 listener_id: self.listener_id,
244 reason,
245 }));
246
247 if let Some(waker) = self.close_listener_waker.take() {
249 waker.wake();
250 }
251 }
252 }
253 }
254
255 fn poll_if_watcher(&mut self, cx: &mut Context<'_>) -> Poll<<Self as Stream>::Item> {
256 let Some(if_watcher) = self.if_watcher.as_mut() else {
257 return Poll::Pending;
258 };
259
260 while let Poll::Ready(event) = if_watcher.poll_if_event(cx) {
261 match event {
262 Ok(IfEvent::Up(inet)) => {
263 let ip = inet.addr();
264 if self.listen_addr.is_ipv4() == ip.is_ipv4()
265 || self.listen_addr.is_ipv6() == ip.is_ipv6()
266 {
267 return Poll::Ready(TransportEvent::NewAddress {
268 listener_id: self.listener_id,
269 listen_addr: self.listen_multiaddress(ip),
270 });
271 }
272 }
273 Ok(IfEvent::Down(inet)) => {
274 let ip = inet.addr();
275 if self.listen_addr.is_ipv4() == ip.is_ipv4()
276 || self.listen_addr.is_ipv6() == ip.is_ipv6()
277 {
278 return Poll::Ready(TransportEvent::AddressExpired {
279 listener_id: self.listener_id,
280 listen_addr: self.listen_multiaddress(ip),
281 });
282 }
283 }
284 Err(err) => {
285 return Poll::Ready(TransportEvent::ListenerError {
286 listener_id: self.listener_id,
287 error: Error::Io(err),
288 });
289 }
290 }
291 }
292
293 Poll::Pending
294 }
295
296 fn listen_multiaddress(&self, ip: IpAddr) -> Multiaddr {
298 let socket_addr = SocketAddr::new(ip, self.listen_addr.port());
299
300 socketaddr_to_multiaddr(&socket_addr, Some(self.config.fingerprint))
301 }
302}
303
304impl Stream for ListenStream {
305 type Item = TransportEvent<<Transport as libp2p_core::Transport>::ListenerUpgrade, Error>;
306
307 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
308 loop {
309 if let Some(event) = self.pending_event.take() {
310 return Poll::Ready(Some(event));
311 }
312
313 if let Some(closed) = self.report_closed.as_mut() {
314 return Poll::Ready(closed.take());
318 }
319
320 if let Poll::Ready(event) = self.poll_if_watcher(cx) {
321 return Poll::Ready(Some(event));
322 }
323
324 match self.udp_mux.poll(cx) {
326 Poll::Ready(UDPMuxEvent::NewAddr(new_addr)) => {
327 let local_addr =
328 socketaddr_to_multiaddr(&self.listen_addr, Some(self.config.fingerprint));
329 let send_back_addr = socketaddr_to_multiaddr(&new_addr.addr, None);
330
331 let upgrade = upgrade::inbound(
332 new_addr.addr,
333 self.config.inner.clone(),
334 self.udp_mux.udp_mux_handle(),
335 self.config.fingerprint.into_inner(),
336 new_addr.ufrag,
337 self.config.id_keys.clone(),
338 )
339 .boxed();
340
341 return Poll::Ready(Some(TransportEvent::Incoming {
342 upgrade,
343 local_addr,
344 send_back_addr,
345 listener_id: self.listener_id,
346 }));
347 }
348 Poll::Ready(UDPMuxEvent::Error(e)) => {
349 self.close(Err(Error::UDPMux(e)));
350 continue;
351 }
352 Poll::Pending => {}
353 }
354
355 self.close_listener_waker = Some(cx.waker().clone());
356
357 return Poll::Pending;
358 }
359 }
360}
361
362#[derive(Clone)]
364struct Config {
365 inner: RTCConfiguration,
366 fingerprint: Fingerprint,
367 id_keys: identity::Keypair,
368}
369
370impl Config {
371 fn new(id_keys: identity::Keypair, certificate: Certificate) -> Self {
373 let fingerprint = certificate.fingerprint();
374
375 Self {
376 id_keys,
377 inner: RTCConfiguration {
378 certificates: vec![certificate.to_rtc_certificate()],
379 ..RTCConfiguration::default()
380 },
381 fingerprint,
382 }
383 }
384}
385
386fn socketaddr_to_multiaddr(socket_addr: &SocketAddr, certhash: Option<Fingerprint>) -> Multiaddr {
388 let addr = Multiaddr::empty()
389 .with(socket_addr.ip().into())
390 .with(Protocol::Udp(socket_addr.port()))
391 .with(Protocol::WebRTCDirect);
392
393 if let Some(fp) = certhash {
394 return addr.with(Protocol::Certhash(fp.to_multihash()));
395 }
396
397 addr
398}
399
400fn parse_webrtc_listen_addr(addr: &Multiaddr) -> Option<SocketAddr> {
402 let mut iter = addr.iter();
403
404 let ip = match iter.next()? {
405 Protocol::Ip4(ip) => IpAddr::from(ip),
406 Protocol::Ip6(ip) => IpAddr::from(ip),
407 _ => return None,
408 };
409
410 let Protocol::Udp(port) = iter.next()? else {
411 return None;
412 };
413 let Protocol::WebRTCDirect = iter.next()? else {
414 return None;
415 };
416
417 if iter.next().is_some() {
418 return None;
419 }
420
421 Some(SocketAddr::new(ip, port))
422}
423
#[cfg(test)]
mod tests {
    use std::net::Ipv6Addr;

    use futures::future::poll_fn;
    use libp2p_core::Transport as _;
    use rand::thread_rng;

    use super::*;

    /// Event type produced by polling the transport.
    type Event = TransportEvent<<Transport as libp2p_core::Transport>::ListenerUpgrade, Error>;

    /// Parses `raw` as a multiaddress (panicking if it is malformed) and feeds it to
    /// `parse_webrtc_listen_addr`.
    fn parse_listen(raw: &str) -> Option<SocketAddr> {
        let addr: Multiaddr = raw.parse().unwrap();

        parse_webrtc_listen_addr(&addr)
    }

    /// Waits for the next event of `transport`.
    async fn next_event(transport: &mut Transport) -> Event {
        poll_fn(|cx| Pin::new(&mut *transport).poll(cx)).await
    }

    /// Returns `true` if a single poll of `transport` yields no event.
    fn is_idle(transport: &mut Transport) -> bool {
        poll_fn(|cx| Pin::new(&mut *transport).poll(cx))
            .now_or_never()
            .is_none()
    }

    #[test]
    fn missing_webrtc_protocol() {
        assert!(parse_listen("/ip4/127.0.0.1/udp/1234").is_none());
    }

    #[test]
    fn tcp_is_invalid_protocol() {
        let maybe_parsed = parse_listen(
            "/ip4/127.0.0.1/tcp/12345/webrtc-direct/certhash/uEiDikp5KVUgkLta1EjUN-IKbHk-dUBg8VzKgf5nXxLK46w",
        );

        assert!(maybe_parsed.is_none());
    }

    #[test]
    fn cannot_follow_other_protocols_after_certhash() {
        let maybe_parsed = parse_listen(
            "/ip4/127.0.0.1/udp/12345/webrtc-direct/certhash/uEiDikp5KVUgkLta1EjUN-IKbHk-dUBg8VzKgf5nXxLK46w/tcp/12345",
        );

        assert!(maybe_parsed.is_none());
    }

    #[test]
    fn can_parse_valid_addr_without_certhash() {
        let expected = SocketAddr::new(IpAddr::V6(Ipv6Addr::LOCALHOST), 12345);

        assert_eq!(
            parse_listen("/ip6/::1/udp/12345/webrtc-direct"),
            Some(expected)
        );
    }

    #[test]
    fn fails_to_parse_if_certhash_present_but_wrong_hash_function() {
        let maybe_addr = parse_listen(
            "/ip6/::1/udp/12345/webrtc-direct/certhash/uFiCH_tkkzpAwkoIDbE4I7QtQksFMYs5nQ4MyYrkgCJYi4A",
        );

        assert!(maybe_addr.is_none())
    }

    #[tokio::test]
    async fn close_listener() {
        let id_keys = identity::Keypair::generate_ed25519();
        let certificate = Certificate::generate(&mut thread_rng()).unwrap();
        let mut transport = Transport::new(id_keys, certificate);

        // Without any listener the transport has nothing to report.
        assert!(is_idle(&mut transport));

        // Run the cycle twice so the second round starts from a transport whose listener set
        // has just been emptied again.
        for _ in 0..2 {
            let listener = ListenerId::next();
            transport
                .listen_on(
                    listener,
                    "/ip4/0.0.0.0/udp/0/webrtc-direct".parse().unwrap(),
                )
                .unwrap();

            match next_event(&mut transport).await {
                TransportEvent::NewAddress {
                    listener_id,
                    listen_addr,
                } => {
                    assert_eq!(listener_id, listener);

                    let mut parts = listen_addr.iter();
                    assert!(matches!(parts.next(), Some(Protocol::Ip4(a)) if !a.is_unspecified()));
                    assert!(matches!(parts.next(), Some(Protocol::Udp(port)) if port != 0));
                    assert!(matches!(parts.next(), Some(Protocol::WebRTCDirect)));
                }
                e => panic!("Unexpected event: {e:?}"),
            }

            assert!(
                transport.remove_listener(listener),
                "Expect listener to exist."
            );

            match next_event(&mut transport).await {
                TransportEvent::ListenerClosed {
                    listener_id,
                    reason: Ok(()),
                } => {
                    assert_eq!(listener_id, listener);
                }
                e => panic!("Unexpected event: {e:?}"),
            }

            // One more poll lets the closed listener end its stream, after which the
            // listener set is expected to be empty.
            assert!(is_idle(&mut transport));
            assert!(transport.listeners.is_empty());
        }
    }
}