1#[cfg(not(target_arch = "wasm32"))]
3pub(crate) use native::{build_swarm, init_logger, sleep, Instant, RedisClient};
4#[cfg(target_arch = "wasm32")]
6pub(crate) use wasm::{build_swarm, init_logger, sleep, Instant, RedisClient};
7
8#[cfg(not(target_arch = "wasm32"))]
9pub(crate) mod native {
10    use std::time::Duration;
11
12    use anyhow::{bail, Context, Result};
13    use futures::{future::BoxFuture, FutureExt};
14    use libp2p::{
15        identity::Keypair,
16        noise,
17        swarm::{NetworkBehaviour, Swarm},
18        tcp, tls, yamux,
19    };
20    use libp2p_mplex as mplex;
21    use libp2p_webrtc as webrtc;
22    use redis::AsyncCommands;
23    use tracing_subscriber::EnvFilter;
24
25    use crate::{Muxer, SecProtocol, Transport};
26
    /// Point-in-time type for non-wasm builds: an alias for [`std::time::Instant`].
    pub(crate) type Instant = std::time::Instant;
28
29    pub(crate) fn init_logger() {
30        let _ = tracing_subscriber::fmt()
31            .with_env_filter(EnvFilter::from_default_env())
32            .try_init();
33    }
34
35    pub(crate) fn sleep(duration: Duration) -> BoxFuture<'static, ()> {
36        tokio::time::sleep(duration).boxed()
37    }
38
39    pub(crate) async fn build_swarm<B: NetworkBehaviour>(
40        ip: &str,
41        transport: Transport,
42        sec_protocol: Option<SecProtocol>,
43        muxer: Option<Muxer>,
44        behaviour_constructor: impl FnOnce(&Keypair) -> B,
45    ) -> Result<(Swarm<B>, String)> {
46        let (swarm, addr) = match (transport, sec_protocol, muxer) {
47            (Transport::QuicV1, None, None) => (
48                libp2p::SwarmBuilder::with_new_identity()
49                    .with_tokio()
50                    .with_quic()
51                    .with_behaviour(behaviour_constructor)?
52                    .build(),
53                format!("/ip4/{ip}/udp/0/quic-v1"),
54            ),
55            (Transport::Tcp, Some(SecProtocol::Tls), Some(Muxer::Mplex)) => (
56                libp2p::SwarmBuilder::with_new_identity()
57                    .with_tokio()
58                    .with_tcp(
59                        tcp::Config::default(),
60                        tls::Config::new,
61                        mplex::Config::default,
62                    )?
63                    .with_behaviour(behaviour_constructor)?
64                    .build(),
65                format!("/ip4/{ip}/tcp/0"),
66            ),
67            (Transport::Tcp, Some(SecProtocol::Tls), Some(Muxer::Yamux)) => (
68                libp2p::SwarmBuilder::with_new_identity()
69                    .with_tokio()
70                    .with_tcp(
71                        tcp::Config::default(),
72                        tls::Config::new,
73                        yamux::Config::default,
74                    )?
75                    .with_behaviour(behaviour_constructor)?
76                    .build(),
77                format!("/ip4/{ip}/tcp/0"),
78            ),
79            (Transport::Tcp, Some(SecProtocol::Noise), Some(Muxer::Mplex)) => (
80                libp2p::SwarmBuilder::with_new_identity()
81                    .with_tokio()
82                    .with_tcp(
83                        tcp::Config::default(),
84                        noise::Config::new,
85                        mplex::Config::default,
86                    )?
87                    .with_behaviour(behaviour_constructor)?
88                    .build(),
89                format!("/ip4/{ip}/tcp/0"),
90            ),
91            (Transport::Tcp, Some(SecProtocol::Noise), Some(Muxer::Yamux)) => (
92                libp2p::SwarmBuilder::with_new_identity()
93                    .with_tokio()
94                    .with_tcp(
95                        tcp::Config::default(),
96                        noise::Config::new,
97                        yamux::Config::default,
98                    )?
99                    .with_behaviour(behaviour_constructor)?
100                    .build(),
101                format!("/ip4/{ip}/tcp/0"),
102            ),
103            (Transport::Ws, Some(SecProtocol::Tls), Some(Muxer::Mplex)) => (
104                libp2p::SwarmBuilder::with_new_identity()
105                    .with_tokio()
106                    .with_websocket(tls::Config::new, mplex::Config::default)
107                    .await?
108                    .with_behaviour(behaviour_constructor)?
109                    .build(),
110                format!("/ip4/{ip}/tcp/0/ws"),
111            ),
112            (Transport::Ws, Some(SecProtocol::Tls), Some(Muxer::Yamux)) => (
113                libp2p::SwarmBuilder::with_new_identity()
114                    .with_tokio()
115                    .with_websocket(tls::Config::new, yamux::Config::default)
116                    .await?
117                    .with_behaviour(behaviour_constructor)?
118                    .build(),
119                format!("/ip4/{ip}/tcp/0/ws"),
120            ),
121            (Transport::Ws, Some(SecProtocol::Noise), Some(Muxer::Mplex)) => (
122                libp2p::SwarmBuilder::with_new_identity()
123                    .with_tokio()
124                    .with_websocket(noise::Config::new, mplex::Config::default)
125                    .await?
126                    .with_behaviour(behaviour_constructor)?
127                    .build(),
128                format!("/ip4/{ip}/tcp/0/ws"),
129            ),
130            (Transport::Ws, Some(SecProtocol::Noise), Some(Muxer::Yamux)) => (
131                libp2p::SwarmBuilder::with_new_identity()
132                    .with_tokio()
133                    .with_websocket(noise::Config::new, yamux::Config::default)
134                    .await?
135                    .with_behaviour(behaviour_constructor)?
136                    .build(),
137                format!("/ip4/{ip}/tcp/0/ws"),
138            ),
139            (Transport::WebRtcDirect, None, None) => (
140                libp2p::SwarmBuilder::with_new_identity()
141                    .with_tokio()
142                    .with_other_transport(|key| {
143                        Ok(webrtc::tokio::Transport::new(
144                            key.clone(),
145                            webrtc::tokio::Certificate::generate(&mut rand::thread_rng())?,
146                        ))
147                    })?
148                    .with_behaviour(behaviour_constructor)?
149                    .build(),
150                format!("/ip4/{ip}/udp/0/webrtc-direct"),
151            ),
152            (t, s, m) => bail!("Unsupported combination: {t:?} {s:?} {m:?}"),
153        };
154        Ok((swarm, addr))
155    }
156
157    pub(crate) struct RedisClient(redis::Client);
158
159    impl RedisClient {
160        pub(crate) fn new(redis_addr: &str) -> Result<Self> {
161            Ok(Self(
162                redis::Client::open(redis_addr).context("Could not connect to redis")?,
163            ))
164        }
165
166        pub(crate) async fn blpop(&self, key: &str, timeout: u64) -> Result<Vec<String>> {
167            let mut conn = self.0.get_async_connection().await?;
168            Ok(conn.blpop(key, timeout as f64).await?)
169        }
170
171        pub(crate) async fn rpush(&self, key: &str, value: String) -> Result<()> {
172            let mut conn = self.0.get_async_connection().await?;
173            conn.rpush(key, value).await.map_err(Into::into)
174        }
175    }
176}
177
178#[cfg(target_arch = "wasm32")]
179pub(crate) mod wasm {
180    use std::time::Duration;
181
182    use anyhow::{bail, Context, Result};
183    use futures::future::{BoxFuture, FutureExt};
184    use libp2p::{
185        core::upgrade::Version,
186        identity::Keypair,
187        noise,
188        swarm::{NetworkBehaviour, Swarm},
189        websocket_websys, webtransport_websys, yamux, Transport as _,
190    };
191    use libp2p_mplex as mplex;
192    use libp2p_webrtc_websys as webrtc_websys;
193
194    use crate::{BlpopRequest, Muxer, SecProtocol, Transport};
195
    /// Point-in-time type for wasm32 builds: an alias for `web_time::Instant`.
    pub(crate) type Instant = web_time::Instant;
197
198    pub(crate) fn init_logger() {
199        console_error_panic_hook::set_once();
200        wasm_logger::init(wasm_logger::Config::default());
201    }
202
203    pub(crate) fn sleep(duration: Duration) -> BoxFuture<'static, ()> {
204        futures_timer::Delay::new(duration).boxed()
205    }
206
207    pub(crate) async fn build_swarm<B: NetworkBehaviour>(
208        ip: &str,
209        transport: Transport,
210        sec_protocol: Option<SecProtocol>,
211        muxer: Option<Muxer>,
212        behaviour_constructor: impl FnOnce(&Keypair) -> B,
213    ) -> Result<(Swarm<B>, String)> {
214        Ok(match (transport, sec_protocol, muxer) {
215            (Transport::Webtransport, None, None) => (
216                libp2p::SwarmBuilder::with_new_identity()
217                    .with_wasm_bindgen()
218                    .with_other_transport(|local_key| {
219                        webtransport_websys::Transport::new(webtransport_websys::Config::new(
220                            &local_key,
221                        ))
222                    })?
223                    .with_behaviour(behaviour_constructor)?
224                    .with_swarm_config(|c| c.with_idle_connection_timeout(Duration::from_secs(5)))
225                    .build(),
226                format!("/ip4/{ip}/udp/0/quic/webtransport"),
227            ),
228            (Transport::Ws, Some(SecProtocol::Noise), Some(Muxer::Mplex)) => (
229                libp2p::SwarmBuilder::with_new_identity()
230                    .with_wasm_bindgen()
231                    .with_other_transport(|local_key| {
232                        Ok(websocket_websys::Transport::default()
233                            .upgrade(Version::V1Lazy)
234                            .authenticate(
235                                noise::Config::new(&local_key)
236                                    .context("failed to initialise noise")?,
237                            )
238                            .multiplex(mplex::Config::new()))
239                    })?
240                    .with_behaviour(behaviour_constructor)?
241                    .with_swarm_config(|c| c.with_idle_connection_timeout(Duration::from_secs(5)))
242                    .build(),
243                format!("/ip4/{ip}/tcp/0/tls/ws"),
244            ),
245            (Transport::Ws, Some(SecProtocol::Noise), Some(Muxer::Yamux)) => (
246                libp2p::SwarmBuilder::with_new_identity()
247                    .with_wasm_bindgen()
248                    .with_other_transport(|local_key| {
249                        Ok(websocket_websys::Transport::default()
250                            .upgrade(Version::V1Lazy)
251                            .authenticate(
252                                noise::Config::new(&local_key)
253                                    .context("failed to initialise noise")?,
254                            )
255                            .multiplex(yamux::Config::default()))
256                    })?
257                    .with_behaviour(behaviour_constructor)?
258                    .with_swarm_config(|c| c.with_idle_connection_timeout(Duration::from_secs(5)))
259                    .build(),
260                format!("/ip4/{ip}/tcp/0/tls/ws"),
261            ),
262            (Transport::WebRtcDirect, None, None) => (
263                libp2p::SwarmBuilder::with_new_identity()
264                    .with_wasm_bindgen()
265                    .with_other_transport(|local_key| {
266                        webrtc_websys::Transport::new(webrtc_websys::Config::new(&local_key))
267                    })?
268                    .with_behaviour(behaviour_constructor)?
269                    .with_swarm_config(|c| c.with_idle_connection_timeout(Duration::from_secs(5)))
270                    .build(),
271                format!("/ip4/{ip}/udp/0/webrtc-direct"),
272            ),
273            (t, s, m) => bail!("Unsupported combination: {t:?} {s:?} {m:?}"),
274        })
275    }
276
277    pub(crate) struct RedisClient(String);
278
279    impl RedisClient {
280        pub(crate) fn new(base_url: &str) -> Result<Self> {
281            Ok(Self(base_url.to_owned()))
282        }
283
284        pub(crate) async fn blpop(&self, key: &str, timeout: u64) -> Result<Vec<String>> {
285            let res = reqwest::Client::new()
286                .post(&format!("http://{}/blpop", self.0))
287                .json(&BlpopRequest {
288                    key: key.to_owned(),
289                    timeout,
290                })
291                .send()
292                .await?
293                .json()
294                .await?;
295            Ok(res)
296        }
297
298        pub(crate) async fn rpush(&self, _: &str, _: String) -> Result<()> {
299            bail!("unimplemented")
300        }
301    }
302}