libp2p_webrtc/tokio/
transport.rs

1// Copyright 2022 Parity Technologies (UK) Ltd.
2//
3// Permission is hereby granted, free of charge, to any person obtaining a
4// copy of this software and associated documentation files (the "Software"),
5// to deal in the Software without restriction, including without limitation
6// the rights to use, copy, modify, merge, publish, distribute, sublicense,
7// and/or sell copies of the Software, and to permit persons to whom the
8// Software is furnished to do so, subject to the following conditions:
9//
10// The above copyright notice and this permission notice shall be included in
11// all copies or substantial portions of the Software.
12//
13// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
14// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
18// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
19// DEALINGS IN THE SOFTWARE.
20
21use std::{
22    io,
23    net::{IpAddr, SocketAddr},
24    pin::Pin,
25    task::{Context, Poll, Waker},
26};
27
28use futures::{future::BoxFuture, prelude::*, stream::SelectAll};
29use if_watch::{tokio::IfWatcher, IfEvent};
30use libp2p_core::{
31    multiaddr::{Multiaddr, Protocol},
32    transport::{DialOpts, ListenerId, TransportError, TransportEvent},
33};
34use libp2p_identity as identity;
35use libp2p_identity::PeerId;
36use webrtc::peer_connection::configuration::RTCConfiguration;
37
38use crate::tokio::{
39    certificate::Certificate,
40    connection::Connection,
41    error::Error,
42    fingerprint::Fingerprint,
43    udp_mux::{UDPMuxEvent, UDPMuxNewAddr},
44    upgrade,
45};
46
47/// A WebRTC transport with direct p2p communication (without a STUN server).
48pub struct Transport {
49    /// The config which holds this peer's keys and certificate.
50    config: Config,
51    /// All the active listeners.
52    listeners: SelectAll<ListenStream>,
53}
54
55impl Transport {
56    /// Creates a new WebRTC transport.
57    ///
58    /// # Example
59    ///
60    /// ```
61    /// use libp2p_identity as identity;
62    /// use libp2p_webrtc::tokio::{Certificate, Transport};
63    /// use rand::thread_rng;
64    ///
65    /// let id_keys = identity::Keypair::generate_ed25519();
66    /// let transport = Transport::new(id_keys, Certificate::generate(&mut thread_rng()).unwrap());
67    /// ```
68    pub fn new(id_keys: identity::Keypair, certificate: Certificate) -> Self {
69        Self {
70            config: Config::new(id_keys, certificate),
71            listeners: SelectAll::new(),
72        }
73    }
74}
75
76impl libp2p_core::Transport for Transport {
77    type Output = (PeerId, Connection);
78    type Error = Error;
79    type ListenerUpgrade = BoxFuture<'static, Result<Self::Output, Self::Error>>;
80    type Dial = BoxFuture<'static, Result<Self::Output, Self::Error>>;
81
82    fn listen_on(
83        &mut self,
84        id: ListenerId,
85        addr: Multiaddr,
86    ) -> Result<(), TransportError<Self::Error>> {
87        let socket_addr =
88            parse_webrtc_listen_addr(&addr).ok_or(TransportError::MultiaddrNotSupported(addr))?;
89        let udp_mux = UDPMuxNewAddr::listen_on(socket_addr)
90            .map_err(|io| TransportError::Other(Error::Io(io)))?;
91
92        self.listeners.push(
93            ListenStream::new(id, self.config.clone(), udp_mux)
94                .map_err(|e| TransportError::Other(Error::Io(e)))?,
95        );
96
97        Ok(())
98    }
99
100    fn remove_listener(&mut self, id: ListenerId) -> bool {
101        if let Some(listener) = self.listeners.iter_mut().find(|l| l.listener_id == id) {
102            listener.close(Ok(()));
103            true
104        } else {
105            false
106        }
107    }
108
109    /// Poll all listeners.
110    fn poll(
111        mut self: Pin<&mut Self>,
112        cx: &mut Context<'_>,
113    ) -> Poll<TransportEvent<Self::ListenerUpgrade, Self::Error>> {
114        match self.listeners.poll_next_unpin(cx) {
115            Poll::Ready(Some(ev)) => Poll::Ready(ev),
116            _ => Poll::Pending,
117        }
118    }
119
120    fn dial(
121        &mut self,
122        addr: Multiaddr,
123        dial_opts: DialOpts,
124    ) -> Result<Self::Dial, TransportError<Self::Error>> {
125        if dial_opts.role.is_listener() {
126            // TODO: As the listener of a WebRTC hole punch, we need to send a random UDP packet to
127            // the `addr`. See DCUtR specification below.
128            //
129            // https://github.com/libp2p/specs/blob/master/relay/DCUtR.md#the-protocol
130            tracing::warn!("WebRTC hole punch is not yet supported");
131        }
132
133        let (sock_addr, server_fingerprint) = libp2p_webrtc_utils::parse_webrtc_dial_addr(&addr)
134            .ok_or_else(|| TransportError::MultiaddrNotSupported(addr.clone()))?;
135        if sock_addr.port() == 0 || sock_addr.ip().is_unspecified() {
136            return Err(TransportError::MultiaddrNotSupported(addr));
137        }
138
139        let config = self.config.clone();
140        let client_fingerprint = self.config.fingerprint;
141        let udp_mux = self
142            .listeners
143            .iter()
144            .next()
145            .ok_or(TransportError::Other(Error::NoListeners))?
146            .udp_mux
147            .udp_mux_handle();
148
149        Ok(async move {
150            let (peer_id, connection) = upgrade::outbound(
151                sock_addr,
152                config.inner,
153                udp_mux,
154                client_fingerprint.into_inner(),
155                server_fingerprint,
156                config.id_keys,
157            )
158            .await?;
159
160            Ok((peer_id, connection))
161        }
162        .boxed())
163    }
164}
165
166/// A stream of incoming connections on one or more interfaces.
167struct ListenStream {
168    /// The ID of this listener.
169    listener_id: ListenerId,
170
171    /// The socket address that the listening socket is bound to,
172    /// which may be a "wildcard address" like `INADDR_ANY` or `IN6ADDR_ANY`
173    /// when listening on all interfaces for IPv4 respectively IPv6 connections.
174    listen_addr: SocketAddr,
175
176    /// The config which holds this peer's certificate(s).
177    config: Config,
178
179    /// The UDP muxer that manages all ICE connections.
180    udp_mux: UDPMuxNewAddr,
181
182    /// Set to `Some` if this listener should close.
183    ///
184    /// Optionally contains a [`TransportEvent::ListenerClosed`] that should be
185    /// reported before the listener's stream is terminated.
186    report_closed: Option<Option<<Self as Stream>::Item>>,
187
188    /// Watcher for network interface changes.
189    /// Reports [`IfEvent`]s for new / deleted ip-addresses when interfaces
190    /// become or stop being available.
191    ///
192    /// `None` if the socket is only listening on a single interface.
193    if_watcher: Option<IfWatcher>,
194
195    /// Pending event to reported.
196    pending_event: Option<<Self as Stream>::Item>,
197
198    /// The stream must be awaken after it has been closed to deliver the last event.
199    close_listener_waker: Option<Waker>,
200}
201
202impl ListenStream {
203    /// Constructs a `WebRTCListenStream` for incoming connections.
204    fn new(listener_id: ListenerId, config: Config, udp_mux: UDPMuxNewAddr) -> io::Result<Self> {
205        let listen_addr = udp_mux.listen_addr();
206
207        let if_watcher;
208        let pending_event;
209        if listen_addr.ip().is_unspecified() {
210            if_watcher = Some(IfWatcher::new()?);
211            pending_event = None;
212        } else {
213            if_watcher = None;
214            let ma = socketaddr_to_multiaddr(&listen_addr, Some(config.fingerprint));
215            pending_event = Some(TransportEvent::NewAddress {
216                listener_id,
217                listen_addr: ma,
218            })
219        }
220
221        Ok(ListenStream {
222            listener_id,
223            listen_addr,
224            config,
225            udp_mux,
226            report_closed: None,
227            if_watcher,
228            pending_event,
229            close_listener_waker: None,
230        })
231    }
232
233    /// Report the listener as closed in a [`TransportEvent::ListenerClosed`] and
234    /// terminate the stream.
235    fn close(&mut self, reason: Result<(), Error>) {
236        match self.report_closed {
237            Some(_) => tracing::debug!("Listener was already closed"),
238            None => {
239                // Report the listener event as closed.
240                let _ = self
241                    .report_closed
242                    .insert(Some(TransportEvent::ListenerClosed {
243                        listener_id: self.listener_id,
244                        reason,
245                    }));
246
247                // Wake the stream to deliver the last event.
248                if let Some(waker) = self.close_listener_waker.take() {
249                    waker.wake();
250                }
251            }
252        }
253    }
254
255    fn poll_if_watcher(&mut self, cx: &mut Context<'_>) -> Poll<<Self as Stream>::Item> {
256        let Some(if_watcher) = self.if_watcher.as_mut() else {
257            return Poll::Pending;
258        };
259
260        while let Poll::Ready(event) = if_watcher.poll_if_event(cx) {
261            match event {
262                Ok(IfEvent::Up(inet)) => {
263                    let ip = inet.addr();
264                    if self.listen_addr.is_ipv4() == ip.is_ipv4()
265                        || self.listen_addr.is_ipv6() == ip.is_ipv6()
266                    {
267                        return Poll::Ready(TransportEvent::NewAddress {
268                            listener_id: self.listener_id,
269                            listen_addr: self.listen_multiaddress(ip),
270                        });
271                    }
272                }
273                Ok(IfEvent::Down(inet)) => {
274                    let ip = inet.addr();
275                    if self.listen_addr.is_ipv4() == ip.is_ipv4()
276                        || self.listen_addr.is_ipv6() == ip.is_ipv6()
277                    {
278                        return Poll::Ready(TransportEvent::AddressExpired {
279                            listener_id: self.listener_id,
280                            listen_addr: self.listen_multiaddress(ip),
281                        });
282                    }
283                }
284                Err(err) => {
285                    return Poll::Ready(TransportEvent::ListenerError {
286                        listener_id: self.listener_id,
287                        error: Error::Io(err),
288                    });
289                }
290            }
291        }
292
293        Poll::Pending
294    }
295
296    /// Constructs a [`Multiaddr`] for the given IP address that represents our listen address.
297    fn listen_multiaddress(&self, ip: IpAddr) -> Multiaddr {
298        let socket_addr = SocketAddr::new(ip, self.listen_addr.port());
299
300        socketaddr_to_multiaddr(&socket_addr, Some(self.config.fingerprint))
301    }
302}
303
304impl Stream for ListenStream {
305    type Item = TransportEvent<<Transport as libp2p_core::Transport>::ListenerUpgrade, Error>;
306
307    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
308        loop {
309            if let Some(event) = self.pending_event.take() {
310                return Poll::Ready(Some(event));
311            }
312
313            if let Some(closed) = self.report_closed.as_mut() {
314                // Listener was closed.
315                // Report the transport event if there is one. On the next iteration, return
316                // `Poll::Ready(None)` to terminate the stream.
317                return Poll::Ready(closed.take());
318            }
319
320            if let Poll::Ready(event) = self.poll_if_watcher(cx) {
321                return Poll::Ready(Some(event));
322            }
323
324            // Poll UDP muxer for new addresses or incoming data for streams.
325            match self.udp_mux.poll(cx) {
326                Poll::Ready(UDPMuxEvent::NewAddr(new_addr)) => {
327                    let local_addr =
328                        socketaddr_to_multiaddr(&self.listen_addr, Some(self.config.fingerprint));
329                    let send_back_addr = socketaddr_to_multiaddr(&new_addr.addr, None);
330
331                    let upgrade = upgrade::inbound(
332                        new_addr.addr,
333                        self.config.inner.clone(),
334                        self.udp_mux.udp_mux_handle(),
335                        self.config.fingerprint.into_inner(),
336                        new_addr.ufrag,
337                        self.config.id_keys.clone(),
338                    )
339                    .boxed();
340
341                    return Poll::Ready(Some(TransportEvent::Incoming {
342                        upgrade,
343                        local_addr,
344                        send_back_addr,
345                        listener_id: self.listener_id,
346                    }));
347                }
348                Poll::Ready(UDPMuxEvent::Error(e)) => {
349                    self.close(Err(Error::UDPMux(e)));
350                    continue;
351                }
352                Poll::Pending => {}
353            }
354
355            self.close_listener_waker = Some(cx.waker().clone());
356
357            return Poll::Pending;
358        }
359    }
360}
361
362/// A config which holds peer's keys and a x509Cert used to authenticate WebRTC communications.
363#[derive(Clone)]
364struct Config {
365    inner: RTCConfiguration,
366    fingerprint: Fingerprint,
367    id_keys: identity::Keypair,
368}
369
370impl Config {
371    /// Returns a new [`Config`] with the given keys and certificate.
372    fn new(id_keys: identity::Keypair, certificate: Certificate) -> Self {
373        let fingerprint = certificate.fingerprint();
374
375        Self {
376            id_keys,
377            inner: RTCConfiguration {
378                certificates: vec![certificate.to_rtc_certificate()],
379                ..RTCConfiguration::default()
380            },
381            fingerprint,
382        }
383    }
384}
385
386/// Turns an IP address and port into the corresponding WebRTC multiaddr.
387fn socketaddr_to_multiaddr(socket_addr: &SocketAddr, certhash: Option<Fingerprint>) -> Multiaddr {
388    let addr = Multiaddr::empty()
389        .with(socket_addr.ip().into())
390        .with(Protocol::Udp(socket_addr.port()))
391        .with(Protocol::WebRTCDirect);
392
393    if let Some(fp) = certhash {
394        return addr.with(Protocol::Certhash(fp.to_multihash()));
395    }
396
397    addr
398}
399
400/// Parse the given [`Multiaddr`] into a [`SocketAddr`] for listening.
401fn parse_webrtc_listen_addr(addr: &Multiaddr) -> Option<SocketAddr> {
402    let mut iter = addr.iter();
403
404    let ip = match iter.next()? {
405        Protocol::Ip4(ip) => IpAddr::from(ip),
406        Protocol::Ip6(ip) => IpAddr::from(ip),
407        _ => return None,
408    };
409
410    let Protocol::Udp(port) = iter.next()? else {
411        return None;
412    };
413    let Protocol::WebRTCDirect = iter.next()? else {
414        return None;
415    };
416
417    if iter.next().is_some() {
418        return None;
419    }
420
421    Some(SocketAddr::new(ip, port))
422}
423
424// Tests //////////////////////////////////////////////////////////////////////////////////////////
425
426#[cfg(test)]
427mod tests {
428    use std::net::Ipv6Addr;
429
430    use futures::future::poll_fn;
431    use libp2p_core::Transport as _;
432    use rand::thread_rng;
433
434    use super::*;
435
436    #[test]
437    fn missing_webrtc_protocol() {
438        let addr = "/ip4/127.0.0.1/udp/1234".parse().unwrap();
439
440        let maybe_parsed = parse_webrtc_listen_addr(&addr);
441
442        assert!(maybe_parsed.is_none());
443    }
444
445    #[test]
446    fn tcp_is_invalid_protocol() {
447        let addr = "/ip4/127.0.0.1/tcp/12345/webrtc-direct/certhash/uEiDikp5KVUgkLta1EjUN-IKbHk-dUBg8VzKgf5nXxLK46w"
448            .parse()
449            .unwrap();
450
451        let maybe_parsed = parse_webrtc_listen_addr(&addr);
452
453        assert!(maybe_parsed.is_none());
454    }
455
456    #[test]
457    fn cannot_follow_other_protocols_after_certhash() {
458        let addr = "/ip4/127.0.0.1/udp/12345/webrtc-direct/certhash/uEiDikp5KVUgkLta1EjUN-IKbHk-dUBg8VzKgf5nXxLK46w/tcp/12345"
459            .parse()
460            .unwrap();
461
462        let maybe_parsed = parse_webrtc_listen_addr(&addr);
463
464        assert!(maybe_parsed.is_none());
465    }
466
467    #[test]
468    fn can_parse_valid_addr_without_certhash() {
469        let addr = "/ip6/::1/udp/12345/webrtc-direct".parse().unwrap();
470
471        let maybe_parsed = parse_webrtc_listen_addr(&addr);
472
473        assert_eq!(
474            maybe_parsed,
475            Some(SocketAddr::new(IpAddr::V6(Ipv6Addr::LOCALHOST), 12345))
476        );
477    }
478
479    #[test]
480    fn fails_to_parse_if_certhash_present_but_wrong_hash_function() {
481        // We only support SHA2-256 for now but this certhash has been encoded with SHA3-256.
482        let addr =
483            "/ip6/::1/udp/12345/webrtc-direct/certhash/uFiCH_tkkzpAwkoIDbE4I7QtQksFMYs5nQ4MyYrkgCJYi4A"
484                .parse()
485                .unwrap();
486
487        let maybe_addr = parse_webrtc_listen_addr(&addr);
488
489        assert!(maybe_addr.is_none())
490    }
491
492    #[tokio::test]
493    async fn close_listener() {
494        let id_keys = identity::Keypair::generate_ed25519();
495        let mut transport =
496            Transport::new(id_keys, Certificate::generate(&mut thread_rng()).unwrap());
497
498        assert!(poll_fn(|cx| Pin::new(&mut transport).as_mut().poll(cx))
499            .now_or_never()
500            .is_none());
501
502        // Run test twice to check that there is no unexpected behaviour if `QuicTransport.listener`
503        // is temporarily empty.
504        for _ in 0..2 {
505            let listener = ListenerId::next();
506            transport
507                .listen_on(
508                    listener,
509                    "/ip4/0.0.0.0/udp/0/webrtc-direct".parse().unwrap(),
510                )
511                .unwrap();
512            match poll_fn(|cx| Pin::new(&mut transport).as_mut().poll(cx)).await {
513                TransportEvent::NewAddress {
514                    listener_id,
515                    listen_addr,
516                } => {
517                    assert_eq!(listener_id, listener);
518                    assert!(
519                        matches!(listen_addr.iter().next(), Some(Protocol::Ip4(a)) if !a.is_unspecified())
520                    );
521                    assert!(
522                        matches!(listen_addr.iter().nth(1), Some(Protocol::Udp(port)) if port != 0)
523                    );
524                    assert!(matches!(
525                        listen_addr.iter().nth(2),
526                        Some(Protocol::WebRTCDirect)
527                    ));
528                }
529                e => panic!("Unexpected event: {e:?}"),
530            }
531            assert!(
532                transport.remove_listener(listener),
533                "Expect listener to exist."
534            );
535            match poll_fn(|cx| Pin::new(&mut transport).as_mut().poll(cx)).await {
536                TransportEvent::ListenerClosed {
537                    listener_id,
538                    reason: Ok(()),
539                } => {
540                    assert_eq!(listener_id, listener);
541                }
542                e => panic!("Unexpected event: {e:?}"),
543            }
544            // Poll once again so that the listener has the chance to return `Poll::Ready(None)` and
545            // be removed from the list of listeners.
546            assert!(poll_fn(|cx| Pin::new(&mut transport).as_mut().poll(cx))
547                .now_or_never()
548                .is_none());
549            assert!(transport.listeners.is_empty());
550        }
551    }
552}