1use std::{str::FromStr, time::Duration};
2
3use anyhow::{bail, Context, Result};
4use futures::{FutureExt, StreamExt};
5use libp2p::{
6 identify,
7 identity::Keypair,
8 ping,
9 swarm::{NetworkBehaviour, SwarmEvent},
10 Multiaddr,
11};
12#[cfg(target_arch = "wasm32")]
13use wasm_bindgen::prelude::*;
14
15mod arch;
16
17use arch::{build_swarm, init_logger, Instant, RedisClient};
18
19pub async fn run_test(
20 transport: &str,
21 ip: &str,
22 is_dialer: bool,
23 test_timeout_seconds: u64,
24 redis_addr: &str,
25 sec_protocol: Option<String>,
26 muxer: Option<String>,
27) -> Result<Report> {
28 init_logger();
29
30 let test_timeout = Duration::from_secs(test_timeout_seconds);
31 let transport = transport.parse().context("Couldn't parse transport")?;
32 let sec_protocol = sec_protocol
33 .map(|sec_protocol| {
34 sec_protocol
35 .parse()
36 .context("Couldn't parse security protocol")
37 })
38 .transpose()?;
39 let muxer = muxer
40 .map(|sec_protocol| {
41 sec_protocol
42 .parse()
43 .context("Couldn't parse muxer protocol")
44 })
45 .transpose()?;
46
47 let redis_client = RedisClient::new(redis_addr).context("Could not connect to redis")?;
48
49 let (mut swarm, local_addr) =
51 build_swarm(ip, transport, sec_protocol, muxer, build_behaviour).await?;
52
53 tracing::info!(local_peer=%swarm.local_peer_id(), "Running ping test");
54
55 #[cfg(not(target_arch = "wasm32"))]
57 let maybe_id = if transport == Transport::WebRtcDirect {
58 Some(swarm.listen_on(local_addr.parse()?)?)
59 } else {
60 None
61 };
62 #[cfg(target_arch = "wasm32")]
63 let maybe_id = None;
64
65 match is_dialer {
69 true => {
70 let result: Vec<String> = redis_client
71 .blpop("listenerAddr", test_timeout.as_secs())
72 .await?;
73 let other = result
74 .get(1)
75 .context("Failed to wait for listener to be ready")?;
76
77 let handshake_start = Instant::now();
78
79 swarm.dial(other.parse::<Multiaddr>()?)?;
80 tracing::info!(listener=%other, "Test instance, dialing multiaddress");
81
82 let rtt = loop {
83 if let Some(SwarmEvent::Behaviour(BehaviourEvent::Ping(ping::Event {
84 result: Ok(rtt),
85 ..
86 }))) = swarm.next().await
87 {
88 tracing::info!(?rtt, "Ping successful");
89 break rtt.as_micros() as f32 / 1000.;
90 }
91 };
92
93 let handshake_plus_ping = handshake_start.elapsed().as_micros() as f32 / 1000.;
94 Ok(Report {
95 handshake_plus_one_rtt_millis: handshake_plus_ping,
96 ping_rtt_millis: rtt,
97 })
98 }
99 false => {
100 let id = match maybe_id {
103 None => swarm.listen_on(local_addr.parse()?)?,
104 Some(id) => id,
105 };
106
107 tracing::info!(
108 address=%local_addr,
109 "Test instance, listening for incoming connections on address"
110 );
111
112 loop {
113 if let Some(SwarmEvent::NewListenAddr {
114 listener_id,
115 address,
116 }) = swarm.next().await
117 {
118 if address.to_string().contains("127.0.0.1") {
119 continue;
120 }
121 if listener_id == id {
122 let ma = format!("{address}/p2p/{}", swarm.local_peer_id());
123 redis_client.rpush("listenerAddr", ma.clone()).await?;
124 break;
125 }
126 }
127 }
128
129 futures::future::select(
131 async move {
132 loop {
133 let event = swarm.next().await.unwrap();
134
135 tracing::debug!("{event:?}");
136 }
137 }
138 .boxed(),
139 arch::sleep(test_timeout),
140 )
141 .await;
142
143 bail!("Test should have been killed by the test runner!");
145 }
146 }
147}
148
149#[cfg(target_arch = "wasm32")]
150#[wasm_bindgen]
151pub async fn run_test_wasm(
152 transport: &str,
153 ip: &str,
154 is_dialer: bool,
155 test_timeout_secs: u64,
156 base_url: &str,
157 sec_protocol: Option<String>,
158 muxer: Option<String>,
159) -> Result<(), JsValue> {
160 let result = run_test(
161 transport,
162 ip,
163 is_dialer,
164 test_timeout_secs,
165 base_url,
166 sec_protocol,
167 muxer,
168 )
169 .await;
170 tracing::info!(?result, "Sending test result");
171 reqwest::Client::new()
172 .post(&format!("http://{}/results", base_url))
173 .json(&result.map_err(|e| e.to_string()))
174 .send()
175 .await?
176 .error_for_status()
177 .map_err(|e| format!("Sending test result failed: {e}"))?;
178
179 Ok(())
180}
181
182#[derive(serde::Deserialize, serde::Serialize)]
185pub struct BlpopRequest {
186 pub key: String,
187 pub timeout: u64,
188}
189
190#[derive(Copy, Clone, Debug, serde::Serialize, serde::Deserialize)]
192pub struct Report {
193 #[serde(rename = "handshakePlusOneRTTMillis")]
194 handshake_plus_one_rtt_millis: f32,
195 #[serde(rename = "pingRTTMilllis")]
196 ping_rtt_millis: f32,
197}
198
199#[derive(Clone, Copy, Debug, PartialEq)]
201pub enum Transport {
202 Tcp,
203 QuicV1,
204 WebRtcDirect,
205 Ws,
206 Webtransport,
207}
208
209impl FromStr for Transport {
210 type Err = anyhow::Error;
211
212 fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
213 Ok(match s {
214 "tcp" => Self::Tcp,
215 "quic-v1" => Self::QuicV1,
216 "webrtc-direct" => Self::WebRtcDirect,
217 "ws" => Self::Ws,
218 "webtransport" => Self::Webtransport,
219 other => bail!("unknown transport {other}"),
220 })
221 }
222}
223
224#[derive(Clone, Debug)]
226pub enum Muxer {
227 Mplex,
228 Yamux,
229}
230
231impl FromStr for Muxer {
232 type Err = anyhow::Error;
233
234 fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
235 Ok(match s {
236 "mplex" => Self::Mplex,
237 "yamux" => Self::Yamux,
238 other => bail!("unknown muxer {other}"),
239 })
240 }
241}
242
243#[derive(Clone, Debug)]
245pub enum SecProtocol {
246 Noise,
247 Tls,
248}
249
250impl FromStr for SecProtocol {
251 type Err = anyhow::Error;
252
253 fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
254 Ok(match s {
255 "noise" => Self::Noise,
256 "tls" => Self::Tls,
257 other => bail!("unknown security protocol {other}"),
258 })
259 }
260}
261
262#[derive(NetworkBehaviour)]
263pub(crate) struct Behaviour {
264 ping: ping::Behaviour,
265 identify: identify::Behaviour,
266}
267
268pub(crate) fn build_behaviour(key: &Keypair) -> Behaviour {
269 Behaviour {
270 ping: ping::Behaviour::new(ping::Config::new().with_interval(Duration::from_secs(1))),
271 identify: identify::Behaviour::new(identify::Config::new(
273 "/interop-tests".to_owned(),
274 key.public(),
275 )),
276 }
277}