interop_tests/
lib.rs

1use std::{str::FromStr, time::Duration};
2
3use anyhow::{bail, Context, Result};
4use futures::{FutureExt, StreamExt};
5use libp2p::{
6    identify,
7    identity::Keypair,
8    ping,
9    swarm::{NetworkBehaviour, SwarmEvent},
10    Multiaddr,
11};
12#[cfg(target_arch = "wasm32")]
13use wasm_bindgen::prelude::*;
14
15mod arch;
16
17use arch::{build_swarm, init_logger, Instant, RedisClient};
18
19pub async fn run_test(
20    transport: &str,
21    ip: &str,
22    is_dialer: bool,
23    test_timeout_seconds: u64,
24    redis_addr: &str,
25    sec_protocol: Option<String>,
26    muxer: Option<String>,
27) -> Result<Report> {
28    init_logger();
29
30    let test_timeout = Duration::from_secs(test_timeout_seconds);
31    let transport = transport.parse().context("Couldn't parse transport")?;
32    let sec_protocol = sec_protocol
33        .map(|sec_protocol| {
34            sec_protocol
35                .parse()
36                .context("Couldn't parse security protocol")
37        })
38        .transpose()?;
39    let muxer = muxer
40        .map(|sec_protocol| {
41            sec_protocol
42                .parse()
43                .context("Couldn't parse muxer protocol")
44        })
45        .transpose()?;
46
47    let redis_client = RedisClient::new(redis_addr).context("Could not connect to redis")?;
48
49    // Build the transport from the passed ENV var.
50    let (mut swarm, local_addr) =
51        build_swarm(ip, transport, sec_protocol, muxer, build_behaviour).await?;
52
53    tracing::info!(local_peer=%swarm.local_peer_id(), "Running ping test");
54
55    // See https://github.com/libp2p/rust-libp2p/issues/4071.
56    #[cfg(not(target_arch = "wasm32"))]
57    let maybe_id = if transport == Transport::WebRtcDirect {
58        Some(swarm.listen_on(local_addr.parse()?)?)
59    } else {
60        None
61    };
62    #[cfg(target_arch = "wasm32")]
63    let maybe_id = None;
64
65    // Run a ping interop test. Based on `is_dialer`, either dial the address
66    // retrieved via `listenAddr` key over the redis connection. Or wait to be pinged and have
67    // `dialerDone` key ready on the redis connection.
68    match is_dialer {
69        true => {
70            let result: Vec<String> = redis_client
71                .blpop("listenerAddr", test_timeout.as_secs())
72                .await?;
73            let other = result
74                .get(1)
75                .context("Failed to wait for listener to be ready")?;
76
77            let handshake_start = Instant::now();
78
79            swarm.dial(other.parse::<Multiaddr>()?)?;
80            tracing::info!(listener=%other, "Test instance, dialing multiaddress");
81
82            let rtt = loop {
83                if let Some(SwarmEvent::Behaviour(BehaviourEvent::Ping(ping::Event {
84                    result: Ok(rtt),
85                    ..
86                }))) = swarm.next().await
87                {
88                    tracing::info!(?rtt, "Ping successful");
89                    break rtt.as_micros() as f32 / 1000.;
90                }
91            };
92
93            let handshake_plus_ping = handshake_start.elapsed().as_micros() as f32 / 1000.;
94            Ok(Report {
95                handshake_plus_one_rtt_millis: handshake_plus_ping,
96                ping_rtt_millis: rtt,
97            })
98        }
99        false => {
100            // Listen if we haven't done so already.
101            // This is a hack until https://github.com/libp2p/rust-libp2p/issues/4071 is fixed at which point we can do this unconditionally here.
102            let id = match maybe_id {
103                None => swarm.listen_on(local_addr.parse()?)?,
104                Some(id) => id,
105            };
106
107            tracing::info!(
108                address=%local_addr,
109                "Test instance, listening for incoming connections on address"
110            );
111
112            loop {
113                if let Some(SwarmEvent::NewListenAddr {
114                    listener_id,
115                    address,
116                }) = swarm.next().await
117                {
118                    if address.to_string().contains("127.0.0.1") {
119                        continue;
120                    }
121                    if listener_id == id {
122                        let ma = format!("{address}/p2p/{}", swarm.local_peer_id());
123                        redis_client.rpush("listenerAddr", ma.clone()).await?;
124                        break;
125                    }
126                }
127            }
128
129            // Drive Swarm while we await for `dialerDone` to be ready.
130            futures::future::select(
131                async move {
132                    loop {
133                        let event = swarm.next().await.unwrap();
134
135                        tracing::debug!("{event:?}");
136                    }
137                }
138                .boxed(),
139                arch::sleep(test_timeout),
140            )
141            .await;
142
143            // The loop never ends so if we get here, we hit the timeout.
144            bail!("Test should have been killed by the test runner!");
145        }
146    }
147}
148
149#[cfg(target_arch = "wasm32")]
150#[wasm_bindgen]
151pub async fn run_test_wasm(
152    transport: &str,
153    ip: &str,
154    is_dialer: bool,
155    test_timeout_secs: u64,
156    base_url: &str,
157    sec_protocol: Option<String>,
158    muxer: Option<String>,
159) -> Result<(), JsValue> {
160    let result = run_test(
161        transport,
162        ip,
163        is_dialer,
164        test_timeout_secs,
165        base_url,
166        sec_protocol,
167        muxer,
168    )
169    .await;
170    tracing::info!(?result, "Sending test result");
171    reqwest::Client::new()
172        .post(&format!("http://{}/results", base_url))
173        .json(&result.map_err(|e| e.to_string()))
174        .send()
175        .await?
176        .error_for_status()
177        .map_err(|e| format!("Sending test result failed: {e}"))?;
178
179    Ok(())
180}
181
182/// A request to redis proxy that will pop the value from the list
183/// and will wait for it being inserted until a timeout is reached.
184#[derive(serde::Deserialize, serde::Serialize)]
185pub struct BlpopRequest {
186    pub key: String,
187    pub timeout: u64,
188}
189
190/// A report generated by the test
191#[derive(Copy, Clone, Debug, serde::Serialize, serde::Deserialize)]
192pub struct Report {
193    #[serde(rename = "handshakePlusOneRTTMillis")]
194    handshake_plus_one_rtt_millis: f32,
195    #[serde(rename = "pingRTTMilllis")]
196    ping_rtt_millis: f32,
197}
198
199/// Supported transports by rust-libp2p.
200#[derive(Clone, Copy, Debug, PartialEq)]
201pub enum Transport {
202    Tcp,
203    QuicV1,
204    WebRtcDirect,
205    Ws,
206    Webtransport,
207}
208
209impl FromStr for Transport {
210    type Err = anyhow::Error;
211
212    fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
213        Ok(match s {
214            "tcp" => Self::Tcp,
215            "quic-v1" => Self::QuicV1,
216            "webrtc-direct" => Self::WebRtcDirect,
217            "ws" => Self::Ws,
218            "webtransport" => Self::Webtransport,
219            other => bail!("unknown transport {other}"),
220        })
221    }
222}
223
224/// Supported stream multiplexers by rust-libp2p.
225#[derive(Clone, Debug)]
226pub enum Muxer {
227    Mplex,
228    Yamux,
229}
230
231impl FromStr for Muxer {
232    type Err = anyhow::Error;
233
234    fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
235        Ok(match s {
236            "mplex" => Self::Mplex,
237            "yamux" => Self::Yamux,
238            other => bail!("unknown muxer {other}"),
239        })
240    }
241}
242
243/// Supported security protocols by rust-libp2p.
244#[derive(Clone, Debug)]
245pub enum SecProtocol {
246    Noise,
247    Tls,
248}
249
250impl FromStr for SecProtocol {
251    type Err = anyhow::Error;
252
253    fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
254        Ok(match s {
255            "noise" => Self::Noise,
256            "tls" => Self::Tls,
257            other => bail!("unknown security protocol {other}"),
258        })
259    }
260}
261
262#[derive(NetworkBehaviour)]
263pub(crate) struct Behaviour {
264    ping: ping::Behaviour,
265    identify: identify::Behaviour,
266}
267
268pub(crate) fn build_behaviour(key: &Keypair) -> Behaviour {
269    Behaviour {
270        ping: ping::Behaviour::new(ping::Config::new().with_interval(Duration::from_secs(1))),
271        // Need to include identify until https://github.com/status-im/nim-libp2p/issues/924 is resolved.
272        identify: identify::Behaviour::new(identify::Config::new(
273            "/interop-tests".to_owned(),
274            key.public(),
275        )),
276    }
277}