webtransport_tests/
lib.rs1#![allow(unexpected_cfgs)]
2use std::{future::poll_fn, pin::Pin};
3
4use futures::{channel::oneshot, AsyncReadExt, AsyncWriteExt};
5use getrandom::getrandom;
6use libp2p_core::{
7 transport::{DialOpts, PortUse},
8 Endpoint, StreamMuxer, Transport as _,
9};
10use libp2p_identity::{Keypair, PeerId};
11use libp2p_noise as noise;
12use libp2p_webtransport_websys::{Config, Connection, Error, Stream, Transport};
13use multiaddr::{Multiaddr, Protocol};
14use multihash::Multihash;
15use wasm_bindgen::JsCast;
16use wasm_bindgen_futures::{spawn_local, JsFuture};
17use wasm_bindgen_test::{wasm_bindgen_test, wasm_bindgen_test_configure};
18use web_sys::{window, Response};
19
20wasm_bindgen_test_configure!(run_in_browser);
21
22#[wasm_bindgen_test]
23pub async fn single_conn_single_stream() {
24 let mut conn = new_connection_to_echo_server().await;
25 let mut stream = create_stream(&mut conn).await;
26
27 send_recv(&mut stream).await;
28}
29
30#[wasm_bindgen_test]
31pub async fn single_conn_single_stream_incoming() {
32 let mut conn = new_connection_to_echo_server().await;
33 let mut stream = incoming_stream(&mut conn).await;
34
35 send_recv(&mut stream).await;
36}
37
38#[wasm_bindgen_test]
39pub async fn single_conn_multiple_streams() {
40 let mut conn = new_connection_to_echo_server().await;
41 let mut tasks = Vec::new();
42 let mut streams = Vec::new();
43
44 for i in 0..30 {
45 let stream = if i % 2 == 0 {
46 create_stream(&mut conn).await
47 } else {
48 incoming_stream(&mut conn).await
49 };
50
51 streams.push(stream);
52 }
53
54 for stream in streams {
55 tasks.push(send_recv_task(stream));
56 }
57
58 futures::future::try_join_all(tasks).await.unwrap();
59}
60
61#[wasm_bindgen_test]
62pub async fn multiple_conn_multiple_streams() {
63 let mut tasks = Vec::new();
64 let mut conns = Vec::new();
65
66 for _ in 0..10 {
67 let mut conn = new_connection_to_echo_server().await;
68 let mut streams = Vec::new();
69
70 for i in 0..10 {
71 let stream = if i % 2 == 0 {
72 create_stream(&mut conn).await
73 } else {
74 incoming_stream(&mut conn).await
75 };
76
77 streams.push(stream);
78 }
79
80 conns.push(conn);
83
84 for stream in streams {
85 tasks.push(send_recv_task(stream));
86 }
87 }
88
89 futures::future::try_join_all(tasks).await.unwrap();
90}
91
92#[wasm_bindgen_test]
93pub async fn multiple_conn_multiple_streams_sequential() {
94 for _ in 0..10 {
95 let mut conn = new_connection_to_echo_server().await;
96
97 for i in 0..10 {
98 let mut stream = if i % 2 == 0 {
99 create_stream(&mut conn).await
100 } else {
101 incoming_stream(&mut conn).await
102 };
103
104 send_recv(&mut stream).await;
105 }
106 }
107}
108
109#[wasm_bindgen_test]
110pub async fn read_leftovers() {
111 let mut conn = new_connection_to_echo_server().await;
112 let mut stream = create_stream(&mut conn).await;
113
114 send_recv(&mut stream).await;
116
117 stream.write_all(b"hello").await.unwrap();
118
119 let mut buf = [0u8; 3];
120
121 let len = stream.read(&mut buf[..]).await.unwrap();
123 assert_eq!(len, 3);
124 assert_eq!(&buf[..len], b"hel");
125
126 let len = stream.read(&mut buf[..]).await.unwrap();
128 assert_eq!(len, 2);
129 assert_eq!(&buf[..len], b"lo");
130}
131
132#[wasm_bindgen_test]
133pub async fn allow_read_after_closing_writer() {
134 let mut conn = new_connection_to_echo_server().await;
135 let mut stream = create_stream(&mut conn).await;
136
137 send_recv(&mut stream).await;
139
140 let mut send_buf = [0u8; 1024];
142 getrandom(&mut send_buf).unwrap();
143 stream.write_all(&send_buf).await.unwrap();
144
145 stream.close().await.unwrap();
147
148 stream.write_all(b"1").await.unwrap_err();
150
151 let mut recv_buf = [0u8; 1024];
153 stream.read_exact(&mut recv_buf).await.unwrap();
154
155 assert_eq!(send_buf, recv_buf);
156}
157
158#[wasm_bindgen_test]
159pub async fn poll_outbound_error_after_connection_close() {
160 let mut conn = new_connection_to_echo_server().await;
161
162 let mut stream = create_stream(&mut conn).await;
164 send_recv(&mut stream).await;
165 drop(stream);
166
167 poll_fn(|cx| Pin::new(&mut conn).poll_close(cx))
168 .await
169 .unwrap();
170
171 poll_fn(|cx| Pin::new(&mut conn).poll_outbound(cx))
172 .await
173 .expect_err("poll_outbound error after conn closed");
174}
175
176#[wasm_bindgen_test]
177pub async fn poll_inbound_error_after_connection_close() {
178 let mut conn = new_connection_to_echo_server().await;
179
180 let mut stream = incoming_stream(&mut conn).await;
182 send_recv(&mut stream).await;
183 drop(stream);
184
185 poll_fn(|cx| Pin::new(&mut conn).poll_close(cx))
186 .await
187 .unwrap();
188
189 poll_fn(|cx| Pin::new(&mut conn).poll_inbound(cx))
190 .await
191 .expect_err("poll_inbound error after conn closed");
192}
193
194#[wasm_bindgen_test]
195pub async fn read_error_after_connection_drop() {
196 let mut conn = new_connection_to_echo_server().await;
197 let mut stream = create_stream(&mut conn).await;
198
199 send_recv(&mut stream).await;
200 drop(conn);
201
202 let mut buf = [0u8; 16];
203 stream
204 .read(&mut buf)
205 .await
206 .expect_err("read error after conn drop");
207}
208
209#[wasm_bindgen_test]
210pub async fn read_error_after_connection_close() {
211 let mut conn = new_connection_to_echo_server().await;
212 let mut stream = create_stream(&mut conn).await;
213
214 send_recv(&mut stream).await;
215
216 poll_fn(|cx| Pin::new(&mut conn).poll_close(cx))
217 .await
218 .unwrap();
219
220 let mut buf = [0u8; 16];
221 stream
222 .read(&mut buf)
223 .await
224 .expect_err("read error after conn drop");
225}
226
227#[wasm_bindgen_test]
228pub async fn write_error_after_connection_drop() {
229 let mut conn = new_connection_to_echo_server().await;
230 let mut stream = create_stream(&mut conn).await;
231
232 send_recv(&mut stream).await;
233 drop(conn);
234
235 let buf = [0u8; 16];
236 stream
237 .write(&buf)
238 .await
239 .expect_err("write error after conn drop");
240}
241
242#[wasm_bindgen_test]
243pub async fn write_error_after_connection_close() {
244 let mut conn = new_connection_to_echo_server().await;
245 let mut stream = create_stream(&mut conn).await;
246
247 send_recv(&mut stream).await;
248
249 poll_fn(|cx| Pin::new(&mut conn).poll_close(cx))
250 .await
251 .unwrap();
252
253 let buf = [0u8; 16];
254 stream
255 .write(&buf)
256 .await
257 .expect_err("write error after conn drop");
258}
259
260#[wasm_bindgen_test]
261pub async fn connect_without_peer_id() {
262 let mut addr = fetch_server_addr().await;
263 let keypair = Keypair::generate_ed25519();
264
265 addr.pop();
267
268 let mut transport = Transport::new(Config::new(&keypair));
269 transport
270 .dial(
271 addr,
272 DialOpts {
273 role: Endpoint::Dialer,
274 port_use: PortUse::Reuse,
275 },
276 )
277 .unwrap()
278 .await
279 .unwrap();
280}
281
282#[wasm_bindgen_test]
283pub async fn error_on_unknown_peer_id() {
284 let mut addr = fetch_server_addr().await;
285 let keypair = Keypair::generate_ed25519();
286
287 addr.pop();
289
290 addr.push(Protocol::P2p(PeerId::random()));
292
293 let mut transport = Transport::new(Config::new(&keypair));
294 let e = transport
295 .dial(
296 addr.clone(),
297 DialOpts {
298 role: Endpoint::Dialer,
299 port_use: PortUse::Reuse,
300 },
301 )
302 .unwrap()
303 .await
304 .unwrap_err();
305 assert!(matches!(e, Error::UnknownRemotePeerId));
306}
307
308#[wasm_bindgen_test]
309pub async fn error_on_unknown_certhash() {
310 let mut addr = fetch_server_addr().await;
311 let keypair = Keypair::generate_ed25519();
312
313 let peer_id = addr.pop().unwrap();
315
316 addr.push(Protocol::Certhash(Multihash::wrap(1, b"1").unwrap()));
318
319 addr.push(peer_id);
321
322 let mut transport = Transport::new(Config::new(&keypair));
323 let e = transport
324 .dial(
325 addr.clone(),
326 DialOpts {
327 role: Endpoint::Dialer,
328 port_use: PortUse::Reuse,
329 },
330 )
331 .unwrap()
332 .await
333 .unwrap_err();
334 assert!(matches!(
335 e,
336 Error::Noise(noise::Error::UnknownWebTransportCerthashes(..))
337 ));
338}
339
340async fn new_connection_to_echo_server() -> Connection {
341 let addr = fetch_server_addr().await;
342 let keypair = Keypair::generate_ed25519();
343
344 let mut transport = Transport::new(Config::new(&keypair));
345
346 let (_peer_id, conn) = transport
347 .dial(
348 addr,
349 DialOpts {
350 role: Endpoint::Dialer,
351 port_use: PortUse::Reuse,
352 },
353 )
354 .unwrap()
355 .await
356 .unwrap();
357
358 conn
359}
360
361async fn fetch_server_addr() -> Multiaddr {
366 let url = "http://127.0.0.1:4455/";
367 let window = window().expect("failed to get browser window");
368
369 let value = JsFuture::from(window.fetch_with_str(url))
370 .await
371 .expect("fetch failed");
372 let resp = value.dyn_into::<Response>().expect("cast failed");
373
374 let text = resp.text().expect("text failed");
375 let text = JsFuture::from(text).await.expect("text promise failed");
376
377 text.as_string()
378 .filter(|s| !s.is_empty())
379 .expect("response not a text")
380 .parse()
381 .unwrap()
382}
383
384#[allow(unknown_lints, clippy::needless_pass_by_ref_mut)] async fn create_stream(conn: &mut Connection) -> Stream {
386 poll_fn(|cx| Pin::new(&mut *conn).poll_outbound(cx))
387 .await
388 .unwrap()
389}
390
391#[allow(unknown_lints, clippy::needless_pass_by_ref_mut)] async fn incoming_stream(conn: &mut Connection) -> Stream {
393 let mut stream = poll_fn(|cx| Pin::new(&mut *conn).poll_inbound(cx))
394 .await
395 .unwrap();
396
397 let mut buf = [0u8; 1];
399 stream.read_exact(&mut buf).await.unwrap();
400
401 stream
402}
403
404fn send_recv_task(mut steam: Stream) -> oneshot::Receiver<()> {
405 let (tx, rx) = oneshot::channel();
406
407 spawn_local(async move {
408 send_recv(&mut steam).await;
409 tx.send(()).unwrap();
410 });
411
412 rx
413}
414
415async fn send_recv(stream: &mut Stream) {
416 let mut send_buf = [0u8; 1024];
417 let mut recv_buf = [0u8; 1024];
418
419 for _ in 0..30 {
420 getrandom(&mut send_buf).unwrap();
421
422 stream.write_all(&send_buf).await.unwrap();
423 stream.read_exact(&mut recv_buf).await.unwrap();
424
425 assert_eq!(send_buf, recv_buf);
426 }
427}