1use std::{fmt::Debug, future::IntoFuture, time::Duration};
22
23use async_trait::async_trait;
24use futures::{
25 future::{BoxFuture, Either},
26 FutureExt, StreamExt,
27};
28use libp2p_core::{multiaddr::Protocol, Multiaddr};
29use libp2p_identity::PeerId;
30use libp2p_swarm::{
31 dial_opts::{DialOpts, PeerCondition},
32 NetworkBehaviour, Swarm, SwarmEvent,
33};
34
35#[async_trait]
38pub trait SwarmExt {
39 type NB: NetworkBehaviour;
40
41 #[cfg(feature = "tokio")]
48 fn new_ephemeral_tokio(behaviour_fn: impl FnOnce(libp2p_identity::Keypair) -> Self::NB) -> Self
49 where
50 Self: Sized;
51
52 async fn connect<T>(&mut self, other: &mut Swarm<T>)
60 where
61 T: NetworkBehaviour + Send,
62 <T as NetworkBehaviour>::ToSwarm: Debug;
63
64 async fn dial_and_wait(&mut self, addr: Multiaddr) -> PeerId;
73
74 async fn wait<E, P>(&mut self, predicate: P) -> E
76 where
77 P: Fn(SwarmEvent<<Self::NB as NetworkBehaviour>::ToSwarm>) -> Option<E>,
78 P: Send;
79
80 fn listen(&mut self) -> ListenFuture<&mut Self>;
85
86 async fn next_swarm_event(&mut self) -> SwarmEvent<<Self::NB as NetworkBehaviour>::ToSwarm>;
90
91 async fn next_behaviour_event(&mut self) -> <Self::NB as NetworkBehaviour>::ToSwarm;
95
96 async fn loop_on_next(self);
97}
98
99pub async fn drive<
136 TBehaviour1,
137 const NUM_EVENTS_SWARM_1: usize,
138 Out1,
139 TBehaviour2,
140 const NUM_EVENTS_SWARM_2: usize,
141 Out2,
142>(
143 swarm1: &mut Swarm<TBehaviour2>,
144 swarm2: &mut Swarm<TBehaviour1>,
145) -> ([Out1; NUM_EVENTS_SWARM_1], [Out2; NUM_EVENTS_SWARM_2])
146where
147 TBehaviour2: NetworkBehaviour + Send,
148 TBehaviour2::ToSwarm: Debug,
149 TBehaviour1: NetworkBehaviour + Send,
150 TBehaviour1::ToSwarm: Debug,
151 SwarmEvent<TBehaviour2::ToSwarm>: TryIntoOutput<Out1>,
152 SwarmEvent<TBehaviour1::ToSwarm>: TryIntoOutput<Out2>,
153 Out1: Debug,
154 Out2: Debug,
155{
156 let mut res1 = Vec::<Out1>::with_capacity(NUM_EVENTS_SWARM_1);
157 let mut res2 = Vec::<Out2>::with_capacity(NUM_EVENTS_SWARM_2);
158
159 while res1.len() < NUM_EVENTS_SWARM_1 || res2.len() < NUM_EVENTS_SWARM_2 {
160 match futures::future::select(swarm1.next_swarm_event(), swarm2.next_swarm_event()).await {
161 Either::Left((o1, _)) => {
162 if let Ok(o1) = o1.try_into_output() {
163 res1.push(o1);
164 }
165 }
166 Either::Right((o2, _)) => {
167 if let Ok(o2) = o2.try_into_output() {
168 res2.push(o2);
169 }
170 }
171 }
172 }
173
174 (
175 res1.try_into().unwrap_or_else(|res1: Vec<_>| {
176 panic!(
177 "expected {NUM_EVENTS_SWARM_1} items from first swarm but got {}",
178 res1.len()
179 )
180 }),
181 res2.try_into().unwrap_or_else(|res2: Vec<_>| {
182 panic!(
183 "expected {NUM_EVENTS_SWARM_2} items from second swarm but got {}",
184 res2.len()
185 )
186 }),
187 )
188}
189
190pub trait TryIntoOutput<O>: Sized {
191 fn try_into_output(self) -> Result<O, Self>;
192}
193
194impl<O> TryIntoOutput<O> for SwarmEvent<O> {
195 fn try_into_output(self) -> Result<O, Self> {
196 self.try_into_behaviour_event()
197 }
198}
199impl<TBehaviourOutEvent> TryIntoOutput<SwarmEvent<TBehaviourOutEvent>>
200 for SwarmEvent<TBehaviourOutEvent>
201{
202 fn try_into_output(self) -> Result<SwarmEvent<TBehaviourOutEvent>, Self> {
203 Ok(self)
204 }
205}
206
207#[async_trait]
208impl<B> SwarmExt for Swarm<B>
209where
210 B: NetworkBehaviour + Send,
211 <B as NetworkBehaviour>::ToSwarm: Debug,
212{
213 type NB = B;
214
215 #[cfg(feature = "tokio")]
216 fn new_ephemeral_tokio(behaviour_fn: impl FnOnce(libp2p_identity::Keypair) -> Self::NB) -> Self
217 where
218 Self: Sized,
219 {
220 use libp2p_core::{transport::MemoryTransport, upgrade::Version, Transport as _};
221 use libp2p_identity::Keypair;
222
223 let identity = Keypair::generate_ed25519();
224 let peer_id = PeerId::from(identity.public());
225
226 let transport = MemoryTransport::default()
227 .or_transport(libp2p_tcp::tokio::Transport::default())
228 .upgrade(Version::V1)
229 .authenticate(libp2p_plaintext::Config::new(&identity))
230 .multiplex(libp2p_yamux::Config::default())
231 .timeout(Duration::from_secs(20))
232 .boxed();
233
234 Swarm::new(
235 transport,
236 behaviour_fn(identity),
237 peer_id,
238 libp2p_swarm::Config::with_tokio_executor(),
239 )
240 }
241
242 async fn connect<T>(&mut self, other: &mut Swarm<T>)
243 where
244 T: NetworkBehaviour + Send,
245 <T as NetworkBehaviour>::ToSwarm: Debug,
246 {
247 let external_addresses = other.external_addresses().cloned().collect();
248
249 let dial_opts = DialOpts::peer_id(*other.local_peer_id())
250 .addresses(external_addresses)
251 .condition(PeerCondition::Always)
252 .build();
253
254 self.dial(dial_opts).unwrap();
255
256 let mut dialer_done = false;
257 let mut listener_done = false;
258
259 loop {
260 match futures::future::select(self.next_swarm_event(), other.next_swarm_event()).await {
261 Either::Left((SwarmEvent::ConnectionEstablished { .. }, _)) => {
262 dialer_done = true;
263 }
264 Either::Right((SwarmEvent::ConnectionEstablished { .. }, _)) => {
265 listener_done = true;
266 }
267 Either::Left((other, _)) => {
268 tracing::debug!(
269 dialer=?other,
270 "Ignoring event from dialer"
271 );
272 }
273 Either::Right((other, _)) => {
274 tracing::debug!(
275 listener=?other,
276 "Ignoring event from listener"
277 );
278 }
279 }
280
281 if dialer_done && listener_done {
282 return;
283 }
284 }
285 }
286
287 async fn dial_and_wait(&mut self, addr: Multiaddr) -> PeerId {
288 self.dial(addr.clone()).unwrap();
289
290 self.wait(|e| match e {
291 SwarmEvent::ConnectionEstablished {
292 endpoint, peer_id, ..
293 } => (endpoint.get_remote_address() == &addr).then_some(peer_id),
294 other => {
295 tracing::debug!(
296 dialer=?other,
297 "Ignoring event from dialer"
298 );
299 None
300 }
301 })
302 .await
303 }
304
305 async fn wait<E, P>(&mut self, predicate: P) -> E
306 where
307 P: Fn(SwarmEvent<<B as NetworkBehaviour>::ToSwarm>) -> Option<E>,
308 P: Send,
309 {
310 loop {
311 let event = self.next_swarm_event().await;
312 if let Some(e) = predicate(event) {
313 break e;
314 }
315 }
316 }
317
318 fn listen(&mut self) -> ListenFuture<&mut Self> {
319 ListenFuture {
320 add_memory_external: false,
321 add_tcp_external: false,
322 swarm: self,
323 }
324 }
325
326 async fn next_swarm_event(&mut self) -> SwarmEvent<<Self::NB as NetworkBehaviour>::ToSwarm> {
327 match futures::future::select(
328 futures_timer::Delay::new(Duration::from_secs(10)),
329 self.select_next_some(),
330 )
331 .await
332 {
333 Either::Left(((), _)) => panic!("Swarm did not emit an event within 10s"),
334 Either::Right((event, _)) => {
335 tracing::trace!(?event);
336
337 event
338 }
339 }
340 }
341
342 async fn next_behaviour_event(&mut self) -> <Self::NB as NetworkBehaviour>::ToSwarm {
343 loop {
344 if let Ok(event) = self.next_swarm_event().await.try_into_behaviour_event() {
345 return event;
346 }
347 }
348 }
349
350 async fn loop_on_next(mut self) {
351 while let Some(event) = self.next().await {
352 tracing::trace!(?event);
353 }
354 }
355}
356
357pub struct ListenFuture<S> {
358 add_memory_external: bool,
359 add_tcp_external: bool,
360 swarm: S,
361}
362
363impl<S> ListenFuture<S> {
364 pub fn with_memory_addr_external(mut self) -> Self {
371 self.add_memory_external = true;
372
373 self
374 }
375
376 pub fn with_tcp_addr_external(mut self) -> Self {
383 self.add_tcp_external = true;
384
385 self
386 }
387}
388
389impl<'s, B> IntoFuture for ListenFuture<&'s mut Swarm<B>>
390where
391 B: NetworkBehaviour + Send,
392 <B as NetworkBehaviour>::ToSwarm: Debug,
393{
394 type Output = (Multiaddr, Multiaddr);
395 type IntoFuture = BoxFuture<'s, Self::Output>;
396
397 fn into_future(self) -> Self::IntoFuture {
398 async move {
399 let swarm = self.swarm;
400
401 let memory_addr_listener_id = swarm.listen_on(Protocol::Memory(0).into()).unwrap();
402
403 let memory_multiaddr = swarm
405 .wait(|e| match e {
406 SwarmEvent::NewListenAddr {
407 address,
408 listener_id,
409 } => (listener_id == memory_addr_listener_id).then_some(address),
410 other => {
411 panic!("Unexpected event while waiting for `NewListenAddr`: {other:?}")
412 }
413 })
414 .await;
415
416 let tcp_addr_listener_id = swarm
417 .listen_on("/ip4/127.0.0.1/tcp/0".parse().unwrap())
418 .unwrap();
419
420 let tcp_multiaddr = swarm
421 .wait(|e| match e {
422 SwarmEvent::NewListenAddr {
423 address,
424 listener_id,
425 } => (listener_id == tcp_addr_listener_id).then_some(address),
426 other => {
427 panic!("Unexpected event while waiting for `NewListenAddr`: {other:?}")
428 }
429 })
430 .await;
431
432 if self.add_memory_external {
433 swarm.add_external_address(memory_multiaddr.clone());
434 }
435 if self.add_tcp_external {
436 swarm.add_external_address(tcp_multiaddr.clone());
437 }
438
439 (memory_multiaddr, tcp_multiaddr)
440 }
441 .boxed()
442 }
443}