wasm_ping/
wasm_ping.rs

1#![allow(non_upper_case_globals)]
2
3use std::{future::IntoFuture, process::Stdio, time::Duration};
4
5use anyhow::{bail, Context, Result};
6use axum::{
7    extract::State,
8    http::{header, StatusCode, Uri},
9    response::{Html, IntoResponse, Response},
10    routing::{get, post},
11    Json, Router,
12};
13use interop_tests::{BlpopRequest, Report};
14use redis::{AsyncCommands, Client};
15use thirtyfour::prelude::*;
16use tokio::{
17    io::{AsyncBufReadExt, BufReader},
18    net::TcpListener,
19    process::Child,
20    sync::mpsc,
21};
22use tower_http::{cors::CorsLayer, trace::TraceLayer};
23use tracing_subscriber::{fmt, prelude::*, EnvFilter};
24
25mod config;
26
27const BIND_ADDR: &str = "127.0.0.1:8080";
28
29/// Embedded Wasm package
30///
31/// Make sure to build the wasm with `wasm-pack build --target web`
32#[derive(rust_embed::RustEmbed)]
33#[folder = "pkg"]
34struct WasmPackage;
35
36#[derive(Clone)]
37struct TestState {
38    redis_client: Client,
39    config: config::Config,
40    results_tx: mpsc::Sender<Result<Report, String>>,
41}
42
43#[tokio::main]
44async fn main() -> Result<()> {
45    // start logging
46    tracing_subscriber::registry()
47        .with(fmt::layer())
48        .with(EnvFilter::from_default_env())
49        .init();
50
51    // read env variables
52    let config = config::Config::from_env()?;
53    let test_timeout = Duration::from_secs(config.test_timeout);
54
55    // create a redis client
56    let redis_client =
57        Client::open(config.redis_addr.as_str()).context("Could not connect to redis")?;
58    let (results_tx, mut results_rx) = mpsc::channel(1);
59
60    let state = TestState {
61        redis_client,
62        config,
63        results_tx,
64    };
65
66    // create a wasm-app service
67    let app = Router::new()
68        // Redis proxy
69        .route("/blpop", post(redis_blpop))
70        // Report tests status
71        .route("/results", post(post_results))
72        // Wasm ping test trigger
73        .route("/", get(serve_index_html))
74        // Wasm app static files
75        .fallback(serve_wasm_pkg)
76        // Middleware
77        .layer(CorsLayer::very_permissive())
78        .layer(TraceLayer::new_for_http())
79        .with_state(state);
80
81    // Run the service in background
82    tokio::spawn(axum::serve(TcpListener::bind(BIND_ADDR).await?, app).into_future());
83
84    // Start executing the test in a browser
85    let (mut chrome, driver) = open_in_browser().await?;
86
87    // Wait for the outcome to be reported
88    let test_result = match tokio::time::timeout(test_timeout, results_rx.recv()).await {
89        Ok(received) => received.unwrap_or(Err("Results channel closed".to_owned())),
90        Err(_) => Err("Test timed out".to_owned()),
91    };
92
93    // Close the browser after we got the results
94    driver.quit().await?;
95    chrome.kill().await?;
96
97    match test_result {
98        Ok(report) => println!("{}", serde_json::to_string(&report)?),
99        Err(error) => bail!("Tests failed: {error}"),
100    }
101
102    Ok(())
103}
104
105async fn open_in_browser() -> Result<(Child, WebDriver)> {
106    // start a webdriver process
107    // currently only the chromedriver is supported as firefox doesn't
108    // have support yet for the certhashes
109    let chromedriver = if cfg!(windows) {
110        "chromedriver.cmd"
111    } else {
112        "chromedriver"
113    };
114    let mut chrome = tokio::process::Command::new(chromedriver)
115        .arg("--port=45782")
116        .stdout(Stdio::piped())
117        .spawn()?;
118    // read driver's stdout
119    let driver_out = chrome
120        .stdout
121        .take()
122        .context("No stdout found for webdriver")?;
123    // wait for the 'ready' message
124    let mut reader = BufReader::new(driver_out).lines();
125    while let Some(line) = reader.next_line().await? {
126        if line.contains("ChromeDriver was started successfully.") {
127            break;
128        }
129    }
130
131    // run a webdriver client
132    let mut caps = DesiredCapabilities::chrome();
133    caps.set_headless()?;
134    caps.set_disable_dev_shm_usage()?;
135    caps.set_no_sandbox()?;
136    let driver = WebDriver::new("http://localhost:45782", caps).await?;
137    // go to the wasm test service
138    driver.goto(format!("http://{BIND_ADDR}")).await?;
139
140    Ok((chrome, driver))
141}
142
143/// Redis proxy handler.
144/// `blpop` is currently the only redis client method used in a ping dialer.
145async fn redis_blpop(
146    state: State<TestState>,
147    request: Json<BlpopRequest>,
148) -> Result<Json<Vec<String>>, StatusCode> {
149    let client = state.0.redis_client;
150    let mut conn = client.get_async_connection().await.map_err(|e| {
151        tracing::warn!("Failed to connect to redis: {e}");
152        StatusCode::INTERNAL_SERVER_ERROR
153    })?;
154    let res = conn
155        .blpop(&request.key, request.timeout as f64)
156        .await
157        .map_err(|e| {
158            tracing::warn!(
159                key=%request.key,
160                timeout=%request.timeout,
161                "Failed to get list elem key within timeout: {e}"
162            );
163            StatusCode::INTERNAL_SERVER_ERROR
164        })?;
165
166    Ok(Json(res))
167}
168
169/// Receive test results
170async fn post_results(
171    state: State<TestState>,
172    request: Json<Result<Report, String>>,
173) -> Result<(), StatusCode> {
174    state.0.results_tx.send(request.0).await.map_err(|_| {
175        tracing::error!("Failed to send results");
176        StatusCode::INTERNAL_SERVER_ERROR
177    })
178}
179
180/// Serve the main page which loads our javascript
181async fn serve_index_html(state: State<TestState>) -> Result<impl IntoResponse, StatusCode> {
182    let config::Config {
183        transport,
184        ip,
185        is_dialer,
186        test_timeout,
187        sec_protocol,
188        muxer,
189        ..
190    } = state.0.config;
191
192    let sec_protocol = sec_protocol
193        .map(|p| format!(r#""{p}""#))
194        .unwrap_or("null".to_owned());
195    let muxer = muxer
196        .map(|p| format!(r#""{p}""#))
197        .unwrap_or("null".to_owned());
198
199    Ok(Html(format!(
200        r#"
201        <!DOCTYPE html>
202        <html>
203        <head>
204            <meta charset="UTF-8" />
205            <title>libp2p ping test</title>
206            <script type="module"">
207                // import a wasm initialization fn and our test entrypoint
208                import init, {{ run_test_wasm }} from "/interop_tests.js";
209
210                // initialize wasm
211                await init()
212                // run our entrypoint with params from the env
213                await run_test_wasm(
214                    "{transport}",
215                    "{ip}",
216                    {is_dialer},
217                    "{test_timeout}",
218                    "{BIND_ADDR}",
219                    {sec_protocol},
220                    {muxer}
221                )
222            </script>
223        </head>
224
225        <body></body>
226        </html>
227    "#
228    )))
229}
230
231async fn serve_wasm_pkg(uri: Uri) -> Result<Response, StatusCode> {
232    let path = uri.path().trim_start_matches('/').to_string();
233    if let Some(content) = WasmPackage::get(&path) {
234        let mime = mime_guess::from_path(&path).first_or_octet_stream();
235        Ok(Response::builder()
236            .header(header::CONTENT_TYPE, mime.as_ref())
237            .body(content.data.into())
238            .unwrap())
239    } else {
240        Err(StatusCode::NOT_FOUND)
241    }
242}