1#[cfg(not(target_arch = "wasm32"))]
3pub(crate) use native::{build_swarm, init_logger, sleep, Instant, RedisClient};
4#[cfg(target_arch = "wasm32")]
6pub(crate) use wasm::{build_swarm, init_logger, sleep, Instant, RedisClient};
7
8#[cfg(not(target_arch = "wasm32"))]
9pub(crate) mod native {
10 use std::time::Duration;
11
12 use anyhow::{bail, Context, Result};
13 use futures::{future::BoxFuture, FutureExt};
14 use libp2p::{
15 identity::Keypair,
16 noise,
17 swarm::{NetworkBehaviour, Swarm},
18 tcp, tls, yamux,
19 };
20 use libp2p_mplex as mplex;
21 use libp2p_webrtc as webrtc;
22 use redis::AsyncCommands;
23 use tracing_subscriber::EnvFilter;
24
25 use crate::{Muxer, SecProtocol, Transport};
26
27 pub(crate) type Instant = std::time::Instant;
28
29 pub(crate) fn init_logger() {
30 let _ = tracing_subscriber::fmt()
31 .with_env_filter(EnvFilter::from_default_env())
32 .try_init();
33 }
34
35 pub(crate) fn sleep(duration: Duration) -> BoxFuture<'static, ()> {
36 tokio::time::sleep(duration).boxed()
37 }
38
39 pub(crate) async fn build_swarm<B: NetworkBehaviour>(
40 ip: &str,
41 transport: Transport,
42 sec_protocol: Option<SecProtocol>,
43 muxer: Option<Muxer>,
44 behaviour_constructor: impl FnOnce(&Keypair) -> B,
45 ) -> Result<(Swarm<B>, String)> {
46 let (swarm, addr) = match (transport, sec_protocol, muxer) {
47 (Transport::QuicV1, None, None) => (
48 libp2p::SwarmBuilder::with_new_identity()
49 .with_tokio()
50 .with_quic()
51 .with_behaviour(behaviour_constructor)?
52 .build(),
53 format!("/ip4/{ip}/udp/0/quic-v1"),
54 ),
55 (Transport::Tcp, Some(SecProtocol::Tls), Some(Muxer::Mplex)) => (
56 libp2p::SwarmBuilder::with_new_identity()
57 .with_tokio()
58 .with_tcp(
59 tcp::Config::default(),
60 tls::Config::new,
61 mplex::Config::default,
62 )?
63 .with_behaviour(behaviour_constructor)?
64 .build(),
65 format!("/ip4/{ip}/tcp/0"),
66 ),
67 (Transport::Tcp, Some(SecProtocol::Tls), Some(Muxer::Yamux)) => (
68 libp2p::SwarmBuilder::with_new_identity()
69 .with_tokio()
70 .with_tcp(
71 tcp::Config::default(),
72 tls::Config::new,
73 yamux::Config::default,
74 )?
75 .with_behaviour(behaviour_constructor)?
76 .build(),
77 format!("/ip4/{ip}/tcp/0"),
78 ),
79 (Transport::Tcp, Some(SecProtocol::Noise), Some(Muxer::Mplex)) => (
80 libp2p::SwarmBuilder::with_new_identity()
81 .with_tokio()
82 .with_tcp(
83 tcp::Config::default(),
84 noise::Config::new,
85 mplex::Config::default,
86 )?
87 .with_behaviour(behaviour_constructor)?
88 .build(),
89 format!("/ip4/{ip}/tcp/0"),
90 ),
91 (Transport::Tcp, Some(SecProtocol::Noise), Some(Muxer::Yamux)) => (
92 libp2p::SwarmBuilder::with_new_identity()
93 .with_tokio()
94 .with_tcp(
95 tcp::Config::default(),
96 noise::Config::new,
97 yamux::Config::default,
98 )?
99 .with_behaviour(behaviour_constructor)?
100 .build(),
101 format!("/ip4/{ip}/tcp/0"),
102 ),
103 (Transport::Ws, Some(SecProtocol::Tls), Some(Muxer::Mplex)) => (
104 libp2p::SwarmBuilder::with_new_identity()
105 .with_tokio()
106 .with_websocket(tls::Config::new, mplex::Config::default)
107 .await?
108 .with_behaviour(behaviour_constructor)?
109 .build(),
110 format!("/ip4/{ip}/tcp/0/ws"),
111 ),
112 (Transport::Ws, Some(SecProtocol::Tls), Some(Muxer::Yamux)) => (
113 libp2p::SwarmBuilder::with_new_identity()
114 .with_tokio()
115 .with_websocket(tls::Config::new, yamux::Config::default)
116 .await?
117 .with_behaviour(behaviour_constructor)?
118 .build(),
119 format!("/ip4/{ip}/tcp/0/ws"),
120 ),
121 (Transport::Ws, Some(SecProtocol::Noise), Some(Muxer::Mplex)) => (
122 libp2p::SwarmBuilder::with_new_identity()
123 .with_tokio()
124 .with_websocket(noise::Config::new, mplex::Config::default)
125 .await?
126 .with_behaviour(behaviour_constructor)?
127 .build(),
128 format!("/ip4/{ip}/tcp/0/ws"),
129 ),
130 (Transport::Ws, Some(SecProtocol::Noise), Some(Muxer::Yamux)) => (
131 libp2p::SwarmBuilder::with_new_identity()
132 .with_tokio()
133 .with_websocket(noise::Config::new, yamux::Config::default)
134 .await?
135 .with_behaviour(behaviour_constructor)?
136 .build(),
137 format!("/ip4/{ip}/tcp/0/ws"),
138 ),
139 (Transport::WebRtcDirect, None, None) => (
140 libp2p::SwarmBuilder::with_new_identity()
141 .with_tokio()
142 .with_other_transport(|key| {
143 Ok(webrtc::tokio::Transport::new(
144 key.clone(),
145 webrtc::tokio::Certificate::generate(&mut rand::thread_rng())?,
146 ))
147 })?
148 .with_behaviour(behaviour_constructor)?
149 .build(),
150 format!("/ip4/{ip}/udp/0/webrtc-direct"),
151 ),
152 (t, s, m) => bail!("Unsupported combination: {t:?} {s:?} {m:?}"),
153 };
154 Ok((swarm, addr))
155 }
156
157 pub(crate) struct RedisClient(redis::Client);
158
159 impl RedisClient {
160 pub(crate) fn new(redis_addr: &str) -> Result<Self> {
161 Ok(Self(
162 redis::Client::open(redis_addr).context("Could not connect to redis")?,
163 ))
164 }
165
166 pub(crate) async fn blpop(&self, key: &str, timeout: u64) -> Result<Vec<String>> {
167 let mut conn = self.0.get_async_connection().await?;
168 Ok(conn.blpop(key, timeout as f64).await?)
169 }
170
171 pub(crate) async fn rpush(&self, key: &str, value: String) -> Result<()> {
172 let mut conn = self.0.get_async_connection().await?;
173 conn.rpush(key, value).await.map_err(Into::into)
174 }
175 }
176}
177
178#[cfg(target_arch = "wasm32")]
179pub(crate) mod wasm {
180 use std::time::Duration;
181
182 use anyhow::{bail, Context, Result};
183 use futures::future::{BoxFuture, FutureExt};
184 use libp2p::{
185 core::upgrade::Version,
186 identity::Keypair,
187 noise,
188 swarm::{NetworkBehaviour, Swarm},
189 websocket_websys, webtransport_websys, yamux, Transport as _,
190 };
191 use libp2p_mplex as mplex;
192 use libp2p_webrtc_websys as webrtc_websys;
193
194 use crate::{BlpopRequest, Muxer, SecProtocol, Transport};
195
196 pub(crate) type Instant = web_time::Instant;
197
198 pub(crate) fn init_logger() {
199 console_error_panic_hook::set_once();
200 wasm_logger::init(wasm_logger::Config::default());
201 }
202
203 pub(crate) fn sleep(duration: Duration) -> BoxFuture<'static, ()> {
204 futures_timer::Delay::new(duration).boxed()
205 }
206
207 pub(crate) async fn build_swarm<B: NetworkBehaviour>(
208 ip: &str,
209 transport: Transport,
210 sec_protocol: Option<SecProtocol>,
211 muxer: Option<Muxer>,
212 behaviour_constructor: impl FnOnce(&Keypair) -> B,
213 ) -> Result<(Swarm<B>, String)> {
214 Ok(match (transport, sec_protocol, muxer) {
215 (Transport::Webtransport, None, None) => (
216 libp2p::SwarmBuilder::with_new_identity()
217 .with_wasm_bindgen()
218 .with_other_transport(|local_key| {
219 webtransport_websys::Transport::new(webtransport_websys::Config::new(
220 &local_key,
221 ))
222 })?
223 .with_behaviour(behaviour_constructor)?
224 .with_swarm_config(|c| c.with_idle_connection_timeout(Duration::from_secs(5)))
225 .build(),
226 format!("/ip4/{ip}/udp/0/quic/webtransport"),
227 ),
228 (Transport::Ws, Some(SecProtocol::Noise), Some(Muxer::Mplex)) => (
229 libp2p::SwarmBuilder::with_new_identity()
230 .with_wasm_bindgen()
231 .with_other_transport(|local_key| {
232 Ok(websocket_websys::Transport::default()
233 .upgrade(Version::V1Lazy)
234 .authenticate(
235 noise::Config::new(&local_key)
236 .context("failed to initialise noise")?,
237 )
238 .multiplex(mplex::Config::new()))
239 })?
240 .with_behaviour(behaviour_constructor)?
241 .with_swarm_config(|c| c.with_idle_connection_timeout(Duration::from_secs(5)))
242 .build(),
243 format!("/ip4/{ip}/tcp/0/tls/ws"),
244 ),
245 (Transport::Ws, Some(SecProtocol::Noise), Some(Muxer::Yamux)) => (
246 libp2p::SwarmBuilder::with_new_identity()
247 .with_wasm_bindgen()
248 .with_other_transport(|local_key| {
249 Ok(websocket_websys::Transport::default()
250 .upgrade(Version::V1Lazy)
251 .authenticate(
252 noise::Config::new(&local_key)
253 .context("failed to initialise noise")?,
254 )
255 .multiplex(yamux::Config::default()))
256 })?
257 .with_behaviour(behaviour_constructor)?
258 .with_swarm_config(|c| c.with_idle_connection_timeout(Duration::from_secs(5)))
259 .build(),
260 format!("/ip4/{ip}/tcp/0/tls/ws"),
261 ),
262 (Transport::WebRtcDirect, None, None) => (
263 libp2p::SwarmBuilder::with_new_identity()
264 .with_wasm_bindgen()
265 .with_other_transport(|local_key| {
266 webrtc_websys::Transport::new(webrtc_websys::Config::new(&local_key))
267 })?
268 .with_behaviour(behaviour_constructor)?
269 .with_swarm_config(|c| c.with_idle_connection_timeout(Duration::from_secs(5)))
270 .build(),
271 format!("/ip4/{ip}/udp/0/webrtc-direct"),
272 ),
273 (t, s, m) => bail!("Unsupported combination: {t:?} {s:?} {m:?}"),
274 })
275 }
276
277 pub(crate) struct RedisClient(String);
278
279 impl RedisClient {
280 pub(crate) fn new(base_url: &str) -> Result<Self> {
281 Ok(Self(base_url.to_owned()))
282 }
283
284 pub(crate) async fn blpop(&self, key: &str, timeout: u64) -> Result<Vec<String>> {
285 let res = reqwest::Client::new()
286 .post(&format!("http://{}/blpop", self.0))
287 .json(&BlpopRequest {
288 key: key.to_owned(),
289 timeout,
290 })
291 .send()
292 .await?
293 .json()
294 .await?;
295 Ok(res)
296 }
297
298 pub(crate) async fn rpush(&self, _: &str, _: String) -> Result<()> {
299 bail!("unimplemented")
300 }
301 }
302}