libp2p_webtransport_websys/
connection.rs1use std::{
2 collections::HashSet,
3 future::poll_fn,
4 pin::Pin,
5 task::{ready, Context, Poll},
6};
7
8use futures::FutureExt;
9use libp2p_core::{
10 muxing::{StreamMuxer, StreamMuxerEvent},
11 upgrade::OutboundConnectionUpgrade,
12 UpgradeInfo,
13};
14use libp2p_identity::{Keypair, PeerId};
15use multihash::Multihash;
16use send_wrapper::SendWrapper;
17use wasm_bindgen_futures::JsFuture;
18use web_sys::ReadableStreamDefaultReader;
19
20use crate::{
21 bindings::{WebTransport, WebTransportBidirectionalStream},
22 endpoint::Endpoint,
23 fused_js_promise::FusedJsPromise,
24 utils::{detach_promise, parse_reader_response, to_js_type},
25 Error, Stream,
26};
27
28#[derive(Debug)]
30pub struct Connection {
31 inner: SendWrapper<ConnectionInner>,
34}
35
36#[derive(Debug)]
37struct ConnectionInner {
38 session: WebTransport,
39 create_stream_promise: FusedJsPromise,
40 incoming_stream_promise: FusedJsPromise,
41 incoming_streams_reader: ReadableStreamDefaultReader,
42 closed: bool,
43}
44
45impl Connection {
46 pub(crate) fn new(endpoint: &Endpoint) -> Result<Self, Error> {
47 let url = endpoint.url();
48
49 let session = if endpoint.certhashes.is_empty() {
50 WebTransport::new(&url).map_err(Error::from_js_value)?
52 } else {
53 let opts = endpoint.webtransport_opts();
55 WebTransport::new_with_options(&url, &opts).map_err(Error::from_js_value)?
56 };
57 detach_promise(session.closed());
63
64 let incoming_streams = session.incoming_bidirectional_streams();
65 let incoming_streams_reader =
66 to_js_type::<ReadableStreamDefaultReader>(incoming_streams.get_reader())?;
67
68 Ok(Connection {
69 inner: SendWrapper::new(ConnectionInner {
70 session,
71 create_stream_promise: FusedJsPromise::new(),
72 incoming_stream_promise: FusedJsPromise::new(),
73 incoming_streams_reader,
74 closed: false,
75 }),
76 })
77 }
78
79 pub(crate) async fn authenticate(
80 &mut self,
81 keypair: &Keypair,
82 remote_peer: Option<PeerId>,
83 certhashes: HashSet<Multihash<64>>,
84 ) -> Result<PeerId, Error> {
85 let fut = SendWrapper::new(self.inner.authenticate(keypair, remote_peer, certhashes));
86 fut.await
87 }
88}
89
90impl ConnectionInner {
91 async fn authenticate(
99 &mut self,
100 keypair: &Keypair,
101 remote_peer: Option<PeerId>,
102 certhashes: HashSet<Multihash<64>>,
103 ) -> Result<PeerId, Error> {
104 JsFuture::from(self.session.ready())
105 .await
106 .map_err(Error::from_js_value)?;
107
108 let stream = poll_fn(|cx| self.poll_create_bidirectional_stream(cx)).await?;
109 let mut noise = libp2p_noise::Config::new(keypair)?;
110
111 if !certhashes.is_empty() {
112 noise = noise.with_webtransport_certhashes(certhashes);
113 }
114
115 let info = noise.protocol_info().next().unwrap_or_default();
118 let (peer_id, _io) = noise.upgrade_outbound(stream, info).await?;
119
120 if let Some(expected_peer_id) = remote_peer {
122 if peer_id != expected_peer_id {
123 return Err(Error::UnknownRemotePeerId);
124 }
125 }
126
127 Ok(peer_id)
128 }
129
130 fn poll_create_bidirectional_stream(
132 &mut self,
133 cx: &mut Context,
134 ) -> Poll<Result<Stream, Error>> {
135 let val = ready!(self
137 .create_stream_promise
138 .maybe_init(|| self.session.create_bidirectional_stream())
139 .poll_unpin(cx))
140 .map_err(Error::from_js_value)?;
141
142 let bidi_stream = to_js_type::<WebTransportBidirectionalStream>(val)?;
143 let stream = Stream::new(bidi_stream)?;
144
145 Poll::Ready(Ok(stream))
146 }
147
148 fn poll_incoming_bidirectional_streams(
150 &mut self,
151 cx: &mut Context,
152 ) -> Poll<Result<Stream, Error>> {
153 let val = ready!(self
155 .incoming_stream_promise
156 .maybe_init(|| self.incoming_streams_reader.read())
157 .poll_unpin(cx))
158 .map_err(Error::from_js_value)?;
159
160 let val = parse_reader_response(&val)
161 .map_err(Error::from_js_value)?
162 .ok_or_else(|| Error::JsError("incoming_bidirectional_streams closed".to_string()))?;
163
164 let bidi_stream = to_js_type::<WebTransportBidirectionalStream>(val)?;
165 let stream = Stream::new(bidi_stream)?;
166
167 Poll::Ready(Ok(stream))
168 }
169
170 fn close_session(&mut self) {
175 if !self.closed {
176 detach_promise(self.incoming_streams_reader.cancel());
177 self.session.close();
178 self.closed = true;
179 }
180 }
181}
182
183impl Drop for ConnectionInner {
184 fn drop(&mut self) {
185 self.close_session();
186 }
187}
188
189impl StreamMuxer for Connection {
191 type Substream = Stream;
192 type Error = Error;
193
194 fn poll_inbound(
195 mut self: Pin<&mut Self>,
196 cx: &mut Context<'_>,
197 ) -> Poll<Result<Self::Substream, Self::Error>> {
198 self.inner.poll_incoming_bidirectional_streams(cx)
199 }
200
201 fn poll_outbound(
202 mut self: Pin<&mut Self>,
203 cx: &mut Context<'_>,
204 ) -> Poll<Result<Self::Substream, Self::Error>> {
205 self.inner.poll_create_bidirectional_stream(cx)
206 }
207
208 fn poll_close(
209 mut self: Pin<&mut Self>,
210 _cx: &mut Context<'_>,
211 ) -> Poll<Result<(), Self::Error>> {
212 self.inner.close_session();
213 Poll::Ready(Ok(()))
214 }
215
216 fn poll(
217 self: Pin<&mut Self>,
218 _cx: &mut Context<'_>,
219 ) -> Poll<Result<StreamMuxerEvent, Self::Error>> {
220 Poll::Pending
221 }
222}