libp2p_webtransport_websys/
connection.rs

1use std::{
2    collections::HashSet,
3    future::poll_fn,
4    pin::Pin,
5    task::{ready, Context, Poll},
6};
7
8use futures::FutureExt;
9use libp2p_core::{
10    muxing::{StreamMuxer, StreamMuxerEvent},
11    upgrade::OutboundConnectionUpgrade,
12    UpgradeInfo,
13};
14use libp2p_identity::{Keypair, PeerId};
15use multihash::Multihash;
16use send_wrapper::SendWrapper;
17use wasm_bindgen_futures::JsFuture;
18use web_sys::ReadableStreamDefaultReader;
19
20use crate::{
21    bindings::{WebTransport, WebTransportBidirectionalStream},
22    endpoint::Endpoint,
23    fused_js_promise::FusedJsPromise,
24    utils::{detach_promise, parse_reader_response, to_js_type},
25    Error, Stream,
26};
27
28/// An opened WebTransport connection.
29#[derive(Debug)]
30pub struct Connection {
31    // Swarm needs all types to be Send. WASM is single-threaded
32    // and it is safe to use SendWrapper.
33    inner: SendWrapper<ConnectionInner>,
34}
35
36#[derive(Debug)]
37struct ConnectionInner {
38    session: WebTransport,
39    create_stream_promise: FusedJsPromise,
40    incoming_stream_promise: FusedJsPromise,
41    incoming_streams_reader: ReadableStreamDefaultReader,
42    closed: bool,
43}
44
45impl Connection {
46    pub(crate) fn new(endpoint: &Endpoint) -> Result<Self, Error> {
47        let url = endpoint.url();
48
49        let session = if endpoint.certhashes.is_empty() {
50            // Endpoint has CA-signed TLS certificate.
51            WebTransport::new(&url).map_err(Error::from_js_value)?
52        } else {
53            // Endpoint has self-signed TLS certificates.
54            let opts = endpoint.webtransport_opts();
55            WebTransport::new_with_options(&url, &opts).map_err(Error::from_js_value)?
56        };
57        // Create a promise that will resolve once session is closed.
58        // It will catch the errors that can eventually happen when
59        // `.close()` is called. Without it, there is no way of catching
60        // those from the `.close()` itself, resulting in `Uncaught in promise...`
61        // logs popping up.
62        detach_promise(session.closed());
63
64        let incoming_streams = session.incoming_bidirectional_streams();
65        let incoming_streams_reader =
66            to_js_type::<ReadableStreamDefaultReader>(incoming_streams.get_reader())?;
67
68        Ok(Connection {
69            inner: SendWrapper::new(ConnectionInner {
70                session,
71                create_stream_promise: FusedJsPromise::new(),
72                incoming_stream_promise: FusedJsPromise::new(),
73                incoming_streams_reader,
74                closed: false,
75            }),
76        })
77    }
78
79    pub(crate) async fn authenticate(
80        &mut self,
81        keypair: &Keypair,
82        remote_peer: Option<PeerId>,
83        certhashes: HashSet<Multihash<64>>,
84    ) -> Result<PeerId, Error> {
85        let fut = SendWrapper::new(self.inner.authenticate(keypair, remote_peer, certhashes));
86        fut.await
87    }
88}
89
90impl ConnectionInner {
91    /// Authenticates with the server
92    ///
93    /// This methods runs the security handshake as descripted
94    /// in the [spec][1]. It validates the certhashes and peer ID
95    /// of the server.
96    ///
97    /// [1]: https://github.com/libp2p/specs/tree/master/webtransport#security-handshake
98    async fn authenticate(
99        &mut self,
100        keypair: &Keypair,
101        remote_peer: Option<PeerId>,
102        certhashes: HashSet<Multihash<64>>,
103    ) -> Result<PeerId, Error> {
104        JsFuture::from(self.session.ready())
105            .await
106            .map_err(Error::from_js_value)?;
107
108        let stream = poll_fn(|cx| self.poll_create_bidirectional_stream(cx)).await?;
109        let mut noise = libp2p_noise::Config::new(keypair)?;
110
111        if !certhashes.is_empty() {
112            noise = noise.with_webtransport_certhashes(certhashes);
113        }
114
115        // We do not use `upgrade::apply_outbound` function because it uses
116        // `multistream_select` protocol, which is not used by WebTransport spec.
117        let info = noise.protocol_info().next().unwrap_or_default();
118        let (peer_id, _io) = noise.upgrade_outbound(stream, info).await?;
119
120        // TODO: This should be part libp2p-noise
121        if let Some(expected_peer_id) = remote_peer {
122            if peer_id != expected_peer_id {
123                return Err(Error::UnknownRemotePeerId);
124            }
125        }
126
127        Ok(peer_id)
128    }
129
130    /// Initiates and polls a promise from `create_bidirectional_stream`.
131    fn poll_create_bidirectional_stream(
132        &mut self,
133        cx: &mut Context,
134    ) -> Poll<Result<Stream, Error>> {
135        // Create bidirectional stream
136        let val = ready!(self
137            .create_stream_promise
138            .maybe_init(|| self.session.create_bidirectional_stream())
139            .poll_unpin(cx))
140        .map_err(Error::from_js_value)?;
141
142        let bidi_stream = to_js_type::<WebTransportBidirectionalStream>(val)?;
143        let stream = Stream::new(bidi_stream)?;
144
145        Poll::Ready(Ok(stream))
146    }
147
148    /// Polls for incoming stream from `incoming_bidirectional_streams` reader.
149    fn poll_incoming_bidirectional_streams(
150        &mut self,
151        cx: &mut Context,
152    ) -> Poll<Result<Stream, Error>> {
153        // Read the next incoming stream from the JS channel
154        let val = ready!(self
155            .incoming_stream_promise
156            .maybe_init(|| self.incoming_streams_reader.read())
157            .poll_unpin(cx))
158        .map_err(Error::from_js_value)?;
159
160        let val = parse_reader_response(&val)
161            .map_err(Error::from_js_value)?
162            .ok_or_else(|| Error::JsError("incoming_bidirectional_streams closed".to_string()))?;
163
164        let bidi_stream = to_js_type::<WebTransportBidirectionalStream>(val)?;
165        let stream = Stream::new(bidi_stream)?;
166
167        Poll::Ready(Ok(stream))
168    }
169
170    /// Closes the session.
171    ///
172    /// This closes the streams also and they will return an error
173    /// when they will be used.
174    fn close_session(&mut self) {
175        if !self.closed {
176            detach_promise(self.incoming_streams_reader.cancel());
177            self.session.close();
178            self.closed = true;
179        }
180    }
181}
182
183impl Drop for ConnectionInner {
184    fn drop(&mut self) {
185        self.close_session();
186    }
187}
188
189/// WebTransport native multiplexing
190impl StreamMuxer for Connection {
191    type Substream = Stream;
192    type Error = Error;
193
194    fn poll_inbound(
195        mut self: Pin<&mut Self>,
196        cx: &mut Context<'_>,
197    ) -> Poll<Result<Self::Substream, Self::Error>> {
198        self.inner.poll_incoming_bidirectional_streams(cx)
199    }
200
201    fn poll_outbound(
202        mut self: Pin<&mut Self>,
203        cx: &mut Context<'_>,
204    ) -> Poll<Result<Self::Substream, Self::Error>> {
205        self.inner.poll_create_bidirectional_stream(cx)
206    }
207
208    fn poll_close(
209        mut self: Pin<&mut Self>,
210        _cx: &mut Context<'_>,
211    ) -> Poll<Result<(), Self::Error>> {
212        self.inner.close_session();
213        Poll::Ready(Ok(()))
214    }
215
216    fn poll(
217        self: Pin<&mut Self>,
218        _cx: &mut Context<'_>,
219    ) -> Poll<Result<StreamMuxerEvent, Self::Error>> {
220        Poll::Pending
221    }
222}