webtransport_tests/
lib.rs

1#![allow(unexpected_cfgs)]
2use std::{future::poll_fn, pin::Pin};
3
4use futures::{channel::oneshot, AsyncReadExt, AsyncWriteExt};
5use getrandom::getrandom;
6use libp2p_core::{
7    transport::{DialOpts, PortUse},
8    Endpoint, StreamMuxer, Transport as _,
9};
10use libp2p_identity::{Keypair, PeerId};
11use libp2p_noise as noise;
12use libp2p_webtransport_websys::{Config, Connection, Error, Stream, Transport};
13use multiaddr::{Multiaddr, Protocol};
14use multihash::Multihash;
15use wasm_bindgen::JsCast;
16use wasm_bindgen_futures::{spawn_local, JsFuture};
17use wasm_bindgen_test::{wasm_bindgen_test, wasm_bindgen_test_configure};
18use web_sys::{window, Response};
19
20wasm_bindgen_test_configure!(run_in_browser);
21
22#[wasm_bindgen_test]
23pub async fn single_conn_single_stream() {
24    let mut conn = new_connection_to_echo_server().await;
25    let mut stream = create_stream(&mut conn).await;
26
27    send_recv(&mut stream).await;
28}
29
30#[wasm_bindgen_test]
31pub async fn single_conn_single_stream_incoming() {
32    let mut conn = new_connection_to_echo_server().await;
33    let mut stream = incoming_stream(&mut conn).await;
34
35    send_recv(&mut stream).await;
36}
37
38#[wasm_bindgen_test]
39pub async fn single_conn_multiple_streams() {
40    let mut conn = new_connection_to_echo_server().await;
41    let mut tasks = Vec::new();
42    let mut streams = Vec::new();
43
44    for i in 0..30 {
45        let stream = if i % 2 == 0 {
46            create_stream(&mut conn).await
47        } else {
48            incoming_stream(&mut conn).await
49        };
50
51        streams.push(stream);
52    }
53
54    for stream in streams {
55        tasks.push(send_recv_task(stream));
56    }
57
58    futures::future::try_join_all(tasks).await.unwrap();
59}
60
61#[wasm_bindgen_test]
62pub async fn multiple_conn_multiple_streams() {
63    let mut tasks = Vec::new();
64    let mut conns = Vec::new();
65
66    for _ in 0..10 {
67        let mut conn = new_connection_to_echo_server().await;
68        let mut streams = Vec::new();
69
70        for i in 0..10 {
71            let stream = if i % 2 == 0 {
72                create_stream(&mut conn).await
73            } else {
74                incoming_stream(&mut conn).await
75            };
76
77            streams.push(stream);
78        }
79
80        // If `conn` gets drop then its streams will close.
81        // Keep it alive by moving it to the outer scope.
82        conns.push(conn);
83
84        for stream in streams {
85            tasks.push(send_recv_task(stream));
86        }
87    }
88
89    futures::future::try_join_all(tasks).await.unwrap();
90}
91
92#[wasm_bindgen_test]
93pub async fn multiple_conn_multiple_streams_sequential() {
94    for _ in 0..10 {
95        let mut conn = new_connection_to_echo_server().await;
96
97        for i in 0..10 {
98            let mut stream = if i % 2 == 0 {
99                create_stream(&mut conn).await
100            } else {
101                incoming_stream(&mut conn).await
102            };
103
104            send_recv(&mut stream).await;
105        }
106    }
107}
108
109#[wasm_bindgen_test]
110pub async fn read_leftovers() {
111    let mut conn = new_connection_to_echo_server().await;
112    let mut stream = create_stream(&mut conn).await;
113
114    // Test that stream works
115    send_recv(&mut stream).await;
116
117    stream.write_all(b"hello").await.unwrap();
118
119    let mut buf = [0u8; 3];
120
121    // Read first half
122    let len = stream.read(&mut buf[..]).await.unwrap();
123    assert_eq!(len, 3);
124    assert_eq!(&buf[..len], b"hel");
125
126    // Read second half
127    let len = stream.read(&mut buf[..]).await.unwrap();
128    assert_eq!(len, 2);
129    assert_eq!(&buf[..len], b"lo");
130}
131
132#[wasm_bindgen_test]
133pub async fn allow_read_after_closing_writer() {
134    let mut conn = new_connection_to_echo_server().await;
135    let mut stream = create_stream(&mut conn).await;
136
137    // Test that stream works
138    send_recv(&mut stream).await;
139
140    // Write random data
141    let mut send_buf = [0u8; 1024];
142    getrandom(&mut send_buf).unwrap();
143    stream.write_all(&send_buf).await.unwrap();
144
145    // Close writer by calling AsyncWrite::poll_close
146    stream.close().await.unwrap();
147
148    // Make sure writer is closed
149    stream.write_all(b"1").await.unwrap_err();
150
151    // We should be able to read
152    let mut recv_buf = [0u8; 1024];
153    stream.read_exact(&mut recv_buf).await.unwrap();
154
155    assert_eq!(send_buf, recv_buf);
156}
157
158#[wasm_bindgen_test]
159pub async fn poll_outbound_error_after_connection_close() {
160    let mut conn = new_connection_to_echo_server().await;
161
162    // Make sure that poll_outbound works well before closing the connection
163    let mut stream = create_stream(&mut conn).await;
164    send_recv(&mut stream).await;
165    drop(stream);
166
167    poll_fn(|cx| Pin::new(&mut conn).poll_close(cx))
168        .await
169        .unwrap();
170
171    poll_fn(|cx| Pin::new(&mut conn).poll_outbound(cx))
172        .await
173        .expect_err("poll_outbound error after conn closed");
174}
175
176#[wasm_bindgen_test]
177pub async fn poll_inbound_error_after_connection_close() {
178    let mut conn = new_connection_to_echo_server().await;
179
180    // Make sure that poll_inbound works well before closing the connection
181    let mut stream = incoming_stream(&mut conn).await;
182    send_recv(&mut stream).await;
183    drop(stream);
184
185    poll_fn(|cx| Pin::new(&mut conn).poll_close(cx))
186        .await
187        .unwrap();
188
189    poll_fn(|cx| Pin::new(&mut conn).poll_inbound(cx))
190        .await
191        .expect_err("poll_inbound error after conn closed");
192}
193
194#[wasm_bindgen_test]
195pub async fn read_error_after_connection_drop() {
196    let mut conn = new_connection_to_echo_server().await;
197    let mut stream = create_stream(&mut conn).await;
198
199    send_recv(&mut stream).await;
200    drop(conn);
201
202    let mut buf = [0u8; 16];
203    stream
204        .read(&mut buf)
205        .await
206        .expect_err("read error after conn drop");
207}
208
209#[wasm_bindgen_test]
210pub async fn read_error_after_connection_close() {
211    let mut conn = new_connection_to_echo_server().await;
212    let mut stream = create_stream(&mut conn).await;
213
214    send_recv(&mut stream).await;
215
216    poll_fn(|cx| Pin::new(&mut conn).poll_close(cx))
217        .await
218        .unwrap();
219
220    let mut buf = [0u8; 16];
221    stream
222        .read(&mut buf)
223        .await
224        .expect_err("read error after conn drop");
225}
226
227#[wasm_bindgen_test]
228pub async fn write_error_after_connection_drop() {
229    let mut conn = new_connection_to_echo_server().await;
230    let mut stream = create_stream(&mut conn).await;
231
232    send_recv(&mut stream).await;
233    drop(conn);
234
235    let buf = [0u8; 16];
236    stream
237        .write(&buf)
238        .await
239        .expect_err("write error after conn drop");
240}
241
242#[wasm_bindgen_test]
243pub async fn write_error_after_connection_close() {
244    let mut conn = new_connection_to_echo_server().await;
245    let mut stream = create_stream(&mut conn).await;
246
247    send_recv(&mut stream).await;
248
249    poll_fn(|cx| Pin::new(&mut conn).poll_close(cx))
250        .await
251        .unwrap();
252
253    let buf = [0u8; 16];
254    stream
255        .write(&buf)
256        .await
257        .expect_err("write error after conn drop");
258}
259
260#[wasm_bindgen_test]
261pub async fn connect_without_peer_id() {
262    let mut addr = fetch_server_addr().await;
263    let keypair = Keypair::generate_ed25519();
264
265    // Remove peer id
266    addr.pop();
267
268    let mut transport = Transport::new(Config::new(&keypair));
269    transport
270        .dial(
271            addr,
272            DialOpts {
273                role: Endpoint::Dialer,
274                port_use: PortUse::Reuse,
275            },
276        )
277        .unwrap()
278        .await
279        .unwrap();
280}
281
282#[wasm_bindgen_test]
283pub async fn error_on_unknown_peer_id() {
284    let mut addr = fetch_server_addr().await;
285    let keypair = Keypair::generate_ed25519();
286
287    // Remove peer id
288    addr.pop();
289
290    // Add an unknown one
291    addr.push(Protocol::P2p(PeerId::random()));
292
293    let mut transport = Transport::new(Config::new(&keypair));
294    let e = transport
295        .dial(
296            addr.clone(),
297            DialOpts {
298                role: Endpoint::Dialer,
299                port_use: PortUse::Reuse,
300            },
301        )
302        .unwrap()
303        .await
304        .unwrap_err();
305    assert!(matches!(e, Error::UnknownRemotePeerId));
306}
307
308#[wasm_bindgen_test]
309pub async fn error_on_unknown_certhash() {
310    let mut addr = fetch_server_addr().await;
311    let keypair = Keypair::generate_ed25519();
312
313    // Remove peer id
314    let peer_id = addr.pop().unwrap();
315
316    // Add unknown certhash
317    addr.push(Protocol::Certhash(Multihash::wrap(1, b"1").unwrap()));
318
319    // Add peer id back
320    addr.push(peer_id);
321
322    let mut transport = Transport::new(Config::new(&keypair));
323    let e = transport
324        .dial(
325            addr.clone(),
326            DialOpts {
327                role: Endpoint::Dialer,
328                port_use: PortUse::Reuse,
329            },
330        )
331        .unwrap()
332        .await
333        .unwrap_err();
334    assert!(matches!(
335        e,
336        Error::Noise(noise::Error::UnknownWebTransportCerthashes(..))
337    ));
338}
339
340async fn new_connection_to_echo_server() -> Connection {
341    let addr = fetch_server_addr().await;
342    let keypair = Keypair::generate_ed25519();
343
344    let mut transport = Transport::new(Config::new(&keypair));
345
346    let (_peer_id, conn) = transport
347        .dial(
348            addr,
349            DialOpts {
350                role: Endpoint::Dialer,
351                port_use: PortUse::Reuse,
352            },
353        )
354        .unwrap()
355        .await
356        .unwrap();
357
358    conn
359}
360
361/// Helper that returns the multiaddress of echo-server
362///
363/// It fetches the multiaddress via HTTP request to
364/// 127.0.0.1:4455.
365async fn fetch_server_addr() -> Multiaddr {
366    let url = "http://127.0.0.1:4455/";
367    let window = window().expect("failed to get browser window");
368
369    let value = JsFuture::from(window.fetch_with_str(url))
370        .await
371        .expect("fetch failed");
372    let resp = value.dyn_into::<Response>().expect("cast failed");
373
374    let text = resp.text().expect("text failed");
375    let text = JsFuture::from(text).await.expect("text promise failed");
376
377    text.as_string()
378        .filter(|s| !s.is_empty())
379        .expect("response not a text")
380        .parse()
381        .unwrap()
382}
383
384#[allow(unknown_lints, clippy::needless_pass_by_ref_mut)] // False positive.
385async fn create_stream(conn: &mut Connection) -> Stream {
386    poll_fn(|cx| Pin::new(&mut *conn).poll_outbound(cx))
387        .await
388        .unwrap()
389}
390
391#[allow(unknown_lints, clippy::needless_pass_by_ref_mut)] // False positive.
392async fn incoming_stream(conn: &mut Connection) -> Stream {
393    let mut stream = poll_fn(|cx| Pin::new(&mut *conn).poll_inbound(cx))
394        .await
395        .unwrap();
396
397    // For the stream to be initiated `echo-server` sends a single byte
398    let mut buf = [0u8; 1];
399    stream.read_exact(&mut buf).await.unwrap();
400
401    stream
402}
403
404fn send_recv_task(mut steam: Stream) -> oneshot::Receiver<()> {
405    let (tx, rx) = oneshot::channel();
406
407    spawn_local(async move {
408        send_recv(&mut steam).await;
409        tx.send(()).unwrap();
410    });
411
412    rx
413}
414
415async fn send_recv(stream: &mut Stream) {
416    let mut send_buf = [0u8; 1024];
417    let mut recv_buf = [0u8; 1024];
418
419    for _ in 0..30 {
420        getrandom(&mut send_buf).unwrap();
421
422        stream.write_all(&send_buf).await.unwrap();
423        stream.read_exact(&mut recv_buf).await.unwrap();
424
425        assert_eq!(send_buf, recv_buf);
426    }
427}