1#![allow(non_upper_case_globals)]
2
3use std::{future::IntoFuture, process::Stdio, time::Duration};
4
5use anyhow::{bail, Context, Result};
6use axum::{
7 extract::State,
8 http::{header, StatusCode, Uri},
9 response::{Html, IntoResponse, Response},
10 routing::{get, post},
11 Json, Router,
12};
13use interop_tests::{BlpopRequest, Report};
14use redis::{AsyncCommands, Client};
15use thirtyfour::prelude::*;
16use tokio::{
17 io::{AsyncBufReadExt, BufReader},
18 net::TcpListener,
19 process::Child,
20 sync::mpsc,
21};
22use tower_http::{cors::CorsLayer, trace::TraceLayer};
23use tracing_subscriber::{fmt, prelude::*, EnvFilter};
24
25mod config;
26
/// Socket address the local test server listens on. The same value is used as
/// the URL the browser is sent to and is handed to the wasm test entrypoint.
const BIND_ADDR: &str = "127.0.0.1:8080";
28
29#[derive(rust_embed::RustEmbed)]
33#[folder = "pkg"]
34struct WasmPackage;
35
36#[derive(Clone)]
37struct TestState {
38 redis_client: Client,
39 config: config::Config,
40 results_tx: mpsc::Sender<Result<Report, String>>,
41}
42
43#[tokio::main]
44async fn main() -> Result<()> {
45 tracing_subscriber::registry()
47 .with(fmt::layer())
48 .with(EnvFilter::from_default_env())
49 .init();
50
51 let config = config::Config::from_env()?;
53 let test_timeout = Duration::from_secs(config.test_timeout);
54
55 let redis_client =
57 Client::open(config.redis_addr.as_str()).context("Could not connect to redis")?;
58 let (results_tx, mut results_rx) = mpsc::channel(1);
59
60 let state = TestState {
61 redis_client,
62 config,
63 results_tx,
64 };
65
66 let app = Router::new()
68 .route("/blpop", post(redis_blpop))
70 .route("/results", post(post_results))
72 .route("/", get(serve_index_html))
74 .fallback(serve_wasm_pkg)
76 .layer(CorsLayer::very_permissive())
78 .layer(TraceLayer::new_for_http())
79 .with_state(state);
80
81 tokio::spawn(axum::serve(TcpListener::bind(BIND_ADDR).await?, app).into_future());
83
84 let (mut chrome, driver) = open_in_browser().await?;
86
87 let test_result = match tokio::time::timeout(test_timeout, results_rx.recv()).await {
89 Ok(received) => received.unwrap_or(Err("Results channel closed".to_owned())),
90 Err(_) => Err("Test timed out".to_owned()),
91 };
92
93 driver.quit().await?;
95 chrome.kill().await?;
96
97 match test_result {
98 Ok(report) => println!("{}", serde_json::to_string(&report)?),
99 Err(error) => bail!("Tests failed: {error}"),
100 }
101
102 Ok(())
103}
104
105async fn open_in_browser() -> Result<(Child, WebDriver)> {
106 let chromedriver = if cfg!(windows) {
110 "chromedriver.cmd"
111 } else {
112 "chromedriver"
113 };
114 let mut chrome = tokio::process::Command::new(chromedriver)
115 .arg("--port=45782")
116 .stdout(Stdio::piped())
117 .spawn()?;
118 let driver_out = chrome
120 .stdout
121 .take()
122 .context("No stdout found for webdriver")?;
123 let mut reader = BufReader::new(driver_out).lines();
125 while let Some(line) = reader.next_line().await? {
126 if line.contains("ChromeDriver was started successfully.") {
127 break;
128 }
129 }
130
131 let mut caps = DesiredCapabilities::chrome();
133 caps.set_headless()?;
134 caps.set_disable_dev_shm_usage()?;
135 caps.set_no_sandbox()?;
136 let driver = WebDriver::new("http://localhost:45782", caps).await?;
137 driver.goto(format!("http://{BIND_ADDR}")).await?;
139
140 Ok((chrome, driver))
141}
142
143async fn redis_blpop(
146 state: State<TestState>,
147 request: Json<BlpopRequest>,
148) -> Result<Json<Vec<String>>, StatusCode> {
149 let client = state.0.redis_client;
150 let mut conn = client.get_async_connection().await.map_err(|e| {
151 tracing::warn!("Failed to connect to redis: {e}");
152 StatusCode::INTERNAL_SERVER_ERROR
153 })?;
154 let res = conn
155 .blpop(&request.key, request.timeout as f64)
156 .await
157 .map_err(|e| {
158 tracing::warn!(
159 key=%request.key,
160 timeout=%request.timeout,
161 "Failed to get list elem key within timeout: {e}"
162 );
163 StatusCode::INTERNAL_SERVER_ERROR
164 })?;
165
166 Ok(Json(res))
167}
168
169async fn post_results(
171 state: State<TestState>,
172 request: Json<Result<Report, String>>,
173) -> Result<(), StatusCode> {
174 state.0.results_tx.send(request.0).await.map_err(|_| {
175 tracing::error!("Failed to send results");
176 StatusCode::INTERNAL_SERVER_ERROR
177 })
178}
179
180async fn serve_index_html(state: State<TestState>) -> Result<impl IntoResponse, StatusCode> {
182 let config::Config {
183 transport,
184 ip,
185 is_dialer,
186 test_timeout,
187 sec_protocol,
188 muxer,
189 ..
190 } = state.0.config;
191
192 let sec_protocol = sec_protocol
193 .map(|p| format!(r#""{p}""#))
194 .unwrap_or("null".to_owned());
195 let muxer = muxer
196 .map(|p| format!(r#""{p}""#))
197 .unwrap_or("null".to_owned());
198
199 Ok(Html(format!(
200 r#"
201 <!DOCTYPE html>
202 <html>
203 <head>
204 <meta charset="UTF-8" />
205 <title>libp2p ping test</title>
206 <script type="module"">
207 // import a wasm initialization fn and our test entrypoint
208 import init, {{ run_test_wasm }} from "/interop_tests.js";
209
210 // initialize wasm
211 await init()
212 // run our entrypoint with params from the env
213 await run_test_wasm(
214 "{transport}",
215 "{ip}",
216 {is_dialer},
217 "{test_timeout}",
218 "{BIND_ADDR}",
219 {sec_protocol},
220 {muxer}
221 )
222 </script>
223 </head>
224
225 <body></body>
226 </html>
227 "#
228 )))
229}
230
231async fn serve_wasm_pkg(uri: Uri) -> Result<Response, StatusCode> {
232 let path = uri.path().trim_start_matches('/').to_string();
233 if let Some(content) = WasmPackage::get(&path) {
234 let mime = mime_guess::from_path(&path).first_or_octet_stream();
235 Ok(Response::builder()
236 .header(header::CONTENT_TYPE, mime.as_ref())
237 .body(content.data.into())
238 .unwrap())
239 } else {
240 Err(StatusCode::NOT_FOUND)
241 }
242}