libp2p_webrtc_websys/
connection.rs

1//! A libp2p connection backed by an [RtcPeerConnection](https://developer.mozilla.org/en-US/docs/Web/API/RTCPeerConnection).
2
3use std::{
4    pin::Pin,
5    task::{ready, Context, Poll, Waker},
6};
7
8use futures::{channel::mpsc, stream::FuturesUnordered, StreamExt};
9use js_sys::{Object, Reflect};
10use libp2p_core::muxing::{StreamMuxer, StreamMuxerEvent};
11use libp2p_webrtc_utils::Fingerprint;
12use send_wrapper::SendWrapper;
13use wasm_bindgen::prelude::*;
14use wasm_bindgen_futures::JsFuture;
15use web_sys::{
16    RtcConfiguration, RtcDataChannel, RtcDataChannelEvent, RtcDataChannelInit, RtcDataChannelType,
17    RtcSessionDescriptionInit,
18};
19
20use super::{Error, Stream};
21use crate::stream::DropListener;
22
23/// A WebRTC Connection.
24///
25/// All connections need to be [`Send`] which is why some fields are wrapped in [`SendWrapper`].
26/// This is safe because WASM is single-threaded.
27pub struct Connection {
28    /// The [RtcPeerConnection] that is used for the WebRTC Connection
29    inner: SendWrapper<RtcPeerConnection>,
30
31    /// Whether the connection is closed
32    closed: bool,
33    /// An [`mpsc::channel`] for all inbound data channels.
34    ///
35    /// Because the browser's WebRTC API is event-based, we need to use a channel to obtain all
36    /// inbound data channels.
37    inbound_data_channels: SendWrapper<mpsc::Receiver<RtcDataChannel>>,
38    /// A list of futures, which, once completed, signal that a [`Stream`] has been dropped.
39    drop_listeners: FuturesUnordered<DropListener>,
40    no_drop_listeners_waker: Option<Waker>,
41
42    _ondatachannel_closure: SendWrapper<Closure<dyn FnMut(RtcDataChannelEvent)>>,
43}
44
45impl Connection {
46    /// Create a new inner WebRTC Connection
47    pub(crate) fn new(peer_connection: RtcPeerConnection) -> Self {
48        // An ondatachannel Future enables us to poll for incoming data channel events in
49        // poll_incoming
50        let (mut tx_ondatachannel, rx_ondatachannel) = mpsc::channel(4); // we may get more than one data channel opened on a single peer connection
51
52        let ondatachannel_closure = Closure::new(move |ev: RtcDataChannelEvent| {
53            tracing::trace!("New data channel");
54
55            if let Err(e) = tx_ondatachannel.try_send(ev.channel()) {
56                if e.is_full() {
57                    tracing::warn!("Remote is opening too many data channels, we can't keep up!");
58                    return;
59                }
60
61                if e.is_disconnected() {
62                    tracing::warn!("Receiver is gone, are we shutting down?");
63                }
64            }
65        });
66        peer_connection
67            .inner
68            .set_ondatachannel(Some(ondatachannel_closure.as_ref().unchecked_ref()));
69
70        Self {
71            inner: SendWrapper::new(peer_connection),
72            closed: false,
73            drop_listeners: FuturesUnordered::default(),
74            no_drop_listeners_waker: None,
75            inbound_data_channels: SendWrapper::new(rx_ondatachannel),
76            _ondatachannel_closure: SendWrapper::new(ondatachannel_closure),
77        }
78    }
79
80    fn new_stream_from_data_channel(&mut self, data_channel: RtcDataChannel) -> Stream {
81        let (stream, drop_listener) = Stream::new(data_channel);
82
83        self.drop_listeners.push(drop_listener);
84        if let Some(waker) = self.no_drop_listeners_waker.take() {
85            waker.wake()
86        }
87        stream
88    }
89
90    /// Closes the Peer Connection.
91    ///
92    /// This closes the data channels also and they will return an error
93    /// if they are used.
94    fn close_connection(&mut self) {
95        if !self.closed {
96            tracing::trace!("connection::close_connection");
97            self.inner.inner.close();
98            self.closed = true;
99        }
100    }
101}
102
103impl Drop for Connection {
104    fn drop(&mut self) {
105        self.close_connection();
106    }
107}
108
109/// WebRTC native multiplexing
110/// Allows users to open substreams
111impl StreamMuxer for Connection {
112    type Substream = Stream;
113    type Error = Error;
114
115    fn poll_inbound(
116        mut self: Pin<&mut Self>,
117        cx: &mut Context<'_>,
118    ) -> Poll<Result<Self::Substream, Self::Error>> {
119        match ready!(self.inbound_data_channels.poll_next_unpin(cx)) {
120            Some(data_channel) => {
121                let stream = self.new_stream_from_data_channel(data_channel);
122
123                Poll::Ready(Ok(stream))
124            }
125            None => {
126                // This only happens if the [`RtcPeerConnection::ondatachannel`] closure gets freed
127                // which means we are most likely shutting down the connection.
128                tracing::debug!("`Sender` for inbound data channels has been dropped");
129                Poll::Ready(Err(Error::Connection("connection closed".to_owned())))
130            }
131        }
132    }
133
134    fn poll_outbound(
135        mut self: Pin<&mut Self>,
136        _: &mut Context<'_>,
137    ) -> Poll<Result<Self::Substream, Self::Error>> {
138        tracing::trace!("Creating outbound data channel");
139
140        let data_channel = self.inner.new_regular_data_channel();
141        let stream = self.new_stream_from_data_channel(data_channel);
142
143        Poll::Ready(Ok(stream))
144    }
145
146    /// Closes the Peer Connection.
147    fn poll_close(
148        mut self: Pin<&mut Self>,
149        _cx: &mut Context<'_>,
150    ) -> Poll<Result<(), Self::Error>> {
151        tracing::trace!("connection::poll_close");
152
153        self.close_connection();
154        Poll::Ready(Ok(()))
155    }
156
157    fn poll(
158        mut self: Pin<&mut Self>,
159        cx: &mut Context<'_>,
160    ) -> Poll<Result<StreamMuxerEvent, Self::Error>> {
161        loop {
162            match ready!(self.drop_listeners.poll_next_unpin(cx)) {
163                Some(Ok(())) => {}
164                Some(Err(e)) => {
165                    tracing::debug!("a DropListener failed: {e}")
166                }
167                None => {
168                    self.no_drop_listeners_waker = Some(cx.waker().clone());
169                    return Poll::Pending;
170                }
171            }
172        }
173    }
174}
175
176pub(crate) struct RtcPeerConnection {
177    inner: web_sys::RtcPeerConnection,
178}
179
180impl RtcPeerConnection {
181    pub(crate) async fn new(algorithm: String) -> Result<Self, Error> {
182        let algo: Object = Object::new();
183        Reflect::set(&algo, &"name".into(), &"ECDSA".into()).unwrap();
184        Reflect::set(&algo, &"namedCurve".into(), &"P-256".into()).unwrap();
185        Reflect::set(&algo, &"hash".into(), &algorithm.into()).unwrap();
186
187        let certificate_promise =
188            web_sys::RtcPeerConnection::generate_certificate_with_object(&algo)
189                .expect("certificate to be valid");
190
191        let certificate = JsFuture::from(certificate_promise).await?;
192
193        let config = RtcConfiguration::default();
194        // wrap certificate in a js Array first before adding it to the config object
195        let certificate_arr = js_sys::Array::new();
196        certificate_arr.push(&certificate);
197        config.set_certificates(&certificate_arr);
198
199        let inner = web_sys::RtcPeerConnection::new_with_configuration(&config)?;
200
201        Ok(Self { inner })
202    }
203
204    /// Creates the stream for the initial noise handshake.
205    ///
206    /// The underlying data channel MUST have `negotiated` set to `true` and carry the ID 0.
207    pub(crate) fn new_handshake_stream(&self) -> (Stream, DropListener) {
208        Stream::new(self.new_data_channel(true))
209    }
210
211    /// Creates a regular data channel for when the connection is already established.
212    pub(crate) fn new_regular_data_channel(&self) -> RtcDataChannel {
213        self.new_data_channel(false)
214    }
215
216    fn new_data_channel(&self, negotiated: bool) -> RtcDataChannel {
217        const LABEL: &str = "";
218
219        let dc = match negotiated {
220            true => {
221                let options = RtcDataChannelInit::new();
222                options.set_negotiated(true);
223                options.set_id(0); // id is only ever set to zero when negotiated is true
224
225                self.inner
226                    .create_data_channel_with_data_channel_dict(LABEL, &options)
227            }
228            false => self.inner.create_data_channel(LABEL),
229        };
230        dc.set_binary_type(RtcDataChannelType::Arraybuffer); // Hardcoded here, it's the only type we use
231
232        dc
233    }
234
235    pub(crate) async fn create_offer(&self) -> Result<String, Error> {
236        let offer = JsFuture::from(self.inner.create_offer()).await?;
237
238        let offer = Reflect::get(&offer, &JsValue::from_str("sdp"))
239            .expect("sdp should be valid")
240            .as_string()
241            .expect("sdp string should be valid string");
242
243        Ok(offer)
244    }
245
246    pub(crate) async fn set_local_description(
247        &self,
248        sdp: RtcSessionDescriptionInit,
249    ) -> Result<(), Error> {
250        let promise = self.inner.set_local_description(&sdp);
251        JsFuture::from(promise).await?;
252
253        Ok(())
254    }
255
256    pub(crate) fn local_fingerprint(&self) -> Result<Fingerprint, Error> {
257        let sdp = &self
258            .inner
259            .local_description()
260            .ok_or_else(|| Error::Js("No local description".to_string()))?
261            .sdp();
262
263        let fingerprint =
264            parse_fingerprint(sdp).ok_or_else(|| Error::Js("No fingerprint in SDP".to_string()))?;
265
266        Ok(fingerprint)
267    }
268
269    pub(crate) async fn set_remote_description(
270        &self,
271        sdp: RtcSessionDescriptionInit,
272    ) -> Result<(), Error> {
273        let promise = self.inner.set_remote_description(&sdp);
274        JsFuture::from(promise).await?;
275
276        Ok(())
277    }
278}
279
280/// Parse Fingerprint from a SDP.
281fn parse_fingerprint(sdp: &str) -> Option<Fingerprint> {
282    // split the sdp by new lines / carriage returns
283    let lines = sdp.split("\r\n");
284
285    // iterate through the lines to find the one starting with a=fingerprint:
286    // get the value after the first space
287    // return the value as a Fingerprint
288    for line in lines {
289        if line.starts_with("a=fingerprint:") {
290            let fingerprint = line.split(' ').nth(1).unwrap();
291            let bytes = hex::decode(fingerprint.replace(':', "")).unwrap();
292            let arr: [u8; 32] = bytes.as_slice().try_into().unwrap();
293            return Some(Fingerprint::raw(arr));
294        }
295    }
296    None
297}
298
#[cfg(test)]
mod sdp_tests {
    use super::*;

    /// The fingerprint of a complete SDP is found and round-trips to its SDP representation.
    #[test]
    fn test_fingerprint() {
        // The value carried by the `a=fingerprint:` line of the SDP below.
        let expected = "A8:17:77:1E:02:7E:D1:2B:53:92:70:A6:8E:F9:02:CC:21:72:3A:92:5D:F4:97:5F:27:C4:5E:75:D4:F4:31:89";
        let sdp = "v=0\r\no=- 0 0 IN IP6 ::1\r\ns=-\r\nc=IN IP6 ::1\r\nt=0 0\r\na=ice-lite\r\nm=application 61885 UDP/DTLS/SCTP webrtc-datachannel\r\na=mid:0\r\na=setup:passive\r\na=ice-ufrag:libp2p+webrtc+v1/YwapWySn6fE6L9i47PhlB6X4gzNXcgFs\r\na=ice-pwd:libp2p+webrtc+v1/YwapWySn6fE6L9i47PhlB6X4gzNXcgFs\r\na=fingerprint:sha-256 A8:17:77:1E:02:7E:D1:2B:53:92:70:A6:8E:F9:02:CC:21:72:3A:92:5D:F4:97:5F:27:C4:5E:75:D4:F4:31:89\r\na=sctp-port:5000\r\na=max-message-size:16384\r\na=candidate:1467250027 1 UDP 1467250027 ::1 61885 typ host\r\n";

        let parsed = parse_fingerprint(sdp).unwrap();

        assert_eq!(parsed.algorithm(), "sha-256");
        assert_eq!(parsed.to_sdp_format(), expected);
    }
}