libp2p_swarm_test/
lib.rs

1// Copyright 2023 Protocol Labs.
2//
3// Permission is hereby granted, free of charge, to any person obtaining a
4// copy of this software and associated documentation files (the "Software"),
5// to deal in the Software without restriction, including without limitation
6// the rights to use, copy, modify, merge, publish, distribute, sublicense,
7// and/or sell copies of the Software, and to permit persons to whom the
8// Software is furnished to do so, subject to the following conditions:
9//
10// The above copyright notice and this permission notice shall be included in
11// all copies or substantial portions of the Software.
12//
13// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
14// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
18// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
19// DEALINGS IN THE SOFTWARE.
20
21use std::{fmt::Debug, future::IntoFuture, time::Duration};
22
23use async_trait::async_trait;
24use futures::{
25    future::{BoxFuture, Either},
26    FutureExt, StreamExt,
27};
28use libp2p_core::{multiaddr::Protocol, Multiaddr};
29use libp2p_identity::PeerId;
30use libp2p_swarm::{
31    dial_opts::{DialOpts, PeerCondition},
32    NetworkBehaviour, Swarm, SwarmEvent,
33};
34
35/// An extension trait for [`Swarm`] that makes it
36/// easier to set up a network of [`Swarm`]s for tests.
37#[async_trait]
38pub trait SwarmExt {
39    type NB: NetworkBehaviour;
40
41    /// Create a new [`Swarm`] with an ephemeral identity and the `tokio` runtime.
42    ///
43    /// The swarm will use a [`libp2p_core::transport::MemoryTransport`] together with a
44    /// [`libp2p_plaintext::Config`] authentication layer and [`libp2p_yamux::Config`] as the
45    /// multiplexer. However, these details should not be relied
46    /// upon by the test and may change at any time.
47    #[cfg(feature = "tokio")]
48    fn new_ephemeral_tokio(behaviour_fn: impl FnOnce(libp2p_identity::Keypair) -> Self::NB) -> Self
49    where
50        Self: Sized;
51
52    /// Establishes a connection to the given [`Swarm`], polling both of them until the connection
53    /// is established.
54    ///
55    /// This will take addresses from the `other` [`Swarm`] via [`Swarm::external_addresses`].
56    /// By default, this iterator will not yield any addresses.
57    /// To add listen addresses as external addresses, use
58    /// [`ListenFuture::with_memory_addr_external`] or [`ListenFuture::with_tcp_addr_external`].
59    async fn connect<T>(&mut self, other: &mut Swarm<T>)
60    where
61        T: NetworkBehaviour + Send,
62        <T as NetworkBehaviour>::ToSwarm: Debug;
63
64    /// Dial the provided address and wait until a connection has been established.
65    ///
66    /// In a normal test scenario, you should prefer [`SwarmExt::connect`] but that is not always
67    /// possible. This function only abstracts away the "dial and wait for
68    /// `ConnectionEstablished` event" part.
69    ///
70    /// Because we don't have access to the other [`Swarm`],
71    /// we can't guarantee that it makes progress.
72    async fn dial_and_wait(&mut self, addr: Multiaddr) -> PeerId;
73
74    /// Wait for specified condition to return `Some`.
75    async fn wait<E, P>(&mut self, predicate: P) -> E
76    where
77        P: Fn(SwarmEvent<<Self::NB as NetworkBehaviour>::ToSwarm>) -> Option<E>,
78        P: Send;
79
80    /// Listens for incoming connections, polling the [`Swarm`] until the
81    /// transport is ready to accept connections.
82    ///
83    /// The first address is for the memory transport, the second one for the TCP transport.
84    fn listen(&mut self) -> ListenFuture<&mut Self>;
85
86    /// Returns the next [`SwarmEvent`] or times out after 10 seconds.
87    ///
88    /// If the 10s timeout does not fit your usecase, please fall back to `StreamExt::next`.
89    async fn next_swarm_event(&mut self) -> SwarmEvent<<Self::NB as NetworkBehaviour>::ToSwarm>;
90
91    /// Returns the next behaviour event or times out after 10 seconds.
92    ///
93    /// If the 10s timeout does not fit your usecase, please fall back to `StreamExt::next`.
94    async fn next_behaviour_event(&mut self) -> <Self::NB as NetworkBehaviour>::ToSwarm;
95
96    async fn loop_on_next(self);
97}
98
99/// Drives two [`Swarm`]s until a certain number of events are emitted.
100///
101/// # Usage
102///
103/// ## Number of events
104///
105/// The number of events is configured via const generics based on the array size of the return
106/// type. This allows the compiler to infer how many events you are expecting based on how you use
107/// this function. For example, if you expect the first [`Swarm`] to emit 2 events, you should
108/// assign the first variable of the returned tuple value to an array of size 2. This works
109/// especially well if you directly pattern-match on the return value.
110///
111/// ## Type of event
112///
113/// This function utilizes the [`TryIntoOutput`] trait.
114/// Similar as to the number of expected events, the type of event is inferred based on your usage.
115/// If you match against a [`SwarmEvent`], the first [`SwarmEvent`] will be returned.
116/// If you match against your [`NetworkBehaviour::ToSwarm`] type, [`SwarmEvent`]s which are not
117/// [`SwarmEvent::Behaviour`] will be skipped until the [`Swarm`] returns a behaviour event.
118///
119/// You can implement the [`TryIntoOutput`] for any other type to further customize this behaviour.
120///
121/// # Difference to [`futures::future::join`]
122///
123/// This function is similar to joining two futures with two crucial differences:
124/// 1. As described above, it allows you to obtain more than a single event.
125/// 2. More importantly, it will continue to poll the [`Swarm`]s **even if they already has emitted
126///    all expected events**.
127///
128/// Especially (2) is crucial for our usage of this function.
129/// If a [`Swarm`] is not polled, nothing within it makes progress.
130/// This can "starve" the other swarm which for example may wait for another message to be sent on a
131/// connection.
132///
133/// Using [`drive`] instead of [`futures::future::join`] ensures that a [`Swarm`] continues to be
134/// polled, even after it emitted its events.
135pub async fn drive<
136    TBehaviour1,
137    const NUM_EVENTS_SWARM_1: usize,
138    Out1,
139    TBehaviour2,
140    const NUM_EVENTS_SWARM_2: usize,
141    Out2,
142>(
143    swarm1: &mut Swarm<TBehaviour2>,
144    swarm2: &mut Swarm<TBehaviour1>,
145) -> ([Out1; NUM_EVENTS_SWARM_1], [Out2; NUM_EVENTS_SWARM_2])
146where
147    TBehaviour2: NetworkBehaviour + Send,
148    TBehaviour2::ToSwarm: Debug,
149    TBehaviour1: NetworkBehaviour + Send,
150    TBehaviour1::ToSwarm: Debug,
151    SwarmEvent<TBehaviour2::ToSwarm>: TryIntoOutput<Out1>,
152    SwarmEvent<TBehaviour1::ToSwarm>: TryIntoOutput<Out2>,
153    Out1: Debug,
154    Out2: Debug,
155{
156    let mut res1 = Vec::<Out1>::with_capacity(NUM_EVENTS_SWARM_1);
157    let mut res2 = Vec::<Out2>::with_capacity(NUM_EVENTS_SWARM_2);
158
159    while res1.len() < NUM_EVENTS_SWARM_1 || res2.len() < NUM_EVENTS_SWARM_2 {
160        match futures::future::select(swarm1.next_swarm_event(), swarm2.next_swarm_event()).await {
161            Either::Left((o1, _)) => {
162                if let Ok(o1) = o1.try_into_output() {
163                    res1.push(o1);
164                }
165            }
166            Either::Right((o2, _)) => {
167                if let Ok(o2) = o2.try_into_output() {
168                    res2.push(o2);
169                }
170            }
171        }
172    }
173
174    (
175        res1.try_into().unwrap_or_else(|res1: Vec<_>| {
176            panic!(
177                "expected {NUM_EVENTS_SWARM_1} items from first swarm but got {}",
178                res1.len()
179            )
180        }),
181        res2.try_into().unwrap_or_else(|res2: Vec<_>| {
182            panic!(
183                "expected {NUM_EVENTS_SWARM_2} items from second swarm but got {}",
184                res2.len()
185            )
186        }),
187    )
188}
189
/// Fallible, by-value conversion of an event into the output type `O`.
///
/// Unlike [`TryInto`], a failed conversion hands the untouched value back as the error so
/// that nothing is lost when the event does not match.
pub trait TryIntoOutput<O>
where
    Self: Sized,
{
    /// Converts `self` into `O`, or returns `self` unchanged in `Err` if it does not fit.
    fn try_into_output(self) -> Result<O, Self>;
}
193
194impl<O> TryIntoOutput<O> for SwarmEvent<O> {
195    fn try_into_output(self) -> Result<O, Self> {
196        self.try_into_behaviour_event()
197    }
198}
199impl<TBehaviourOutEvent> TryIntoOutput<SwarmEvent<TBehaviourOutEvent>>
200    for SwarmEvent<TBehaviourOutEvent>
201{
202    fn try_into_output(self) -> Result<SwarmEvent<TBehaviourOutEvent>, Self> {
203        Ok(self)
204    }
205}
206
207#[async_trait]
208impl<B> SwarmExt for Swarm<B>
209where
210    B: NetworkBehaviour + Send,
211    <B as NetworkBehaviour>::ToSwarm: Debug,
212{
213    type NB = B;
214
215    #[cfg(feature = "tokio")]
216    fn new_ephemeral_tokio(behaviour_fn: impl FnOnce(libp2p_identity::Keypair) -> Self::NB) -> Self
217    where
218        Self: Sized,
219    {
220        use libp2p_core::{transport::MemoryTransport, upgrade::Version, Transport as _};
221        use libp2p_identity::Keypair;
222
223        let identity = Keypair::generate_ed25519();
224        let peer_id = PeerId::from(identity.public());
225
226        let transport = MemoryTransport::default()
227            .or_transport(libp2p_tcp::tokio::Transport::default())
228            .upgrade(Version::V1)
229            .authenticate(libp2p_plaintext::Config::new(&identity))
230            .multiplex(libp2p_yamux::Config::default())
231            .timeout(Duration::from_secs(20))
232            .boxed();
233
234        Swarm::new(
235            transport,
236            behaviour_fn(identity),
237            peer_id,
238            libp2p_swarm::Config::with_tokio_executor(),
239        )
240    }
241
242    async fn connect<T>(&mut self, other: &mut Swarm<T>)
243    where
244        T: NetworkBehaviour + Send,
245        <T as NetworkBehaviour>::ToSwarm: Debug,
246    {
247        let external_addresses = other.external_addresses().cloned().collect();
248
249        let dial_opts = DialOpts::peer_id(*other.local_peer_id())
250            .addresses(external_addresses)
251            .condition(PeerCondition::Always)
252            .build();
253
254        self.dial(dial_opts).unwrap();
255
256        let mut dialer_done = false;
257        let mut listener_done = false;
258
259        loop {
260            match futures::future::select(self.next_swarm_event(), other.next_swarm_event()).await {
261                Either::Left((SwarmEvent::ConnectionEstablished { .. }, _)) => {
262                    dialer_done = true;
263                }
264                Either::Right((SwarmEvent::ConnectionEstablished { .. }, _)) => {
265                    listener_done = true;
266                }
267                Either::Left((other, _)) => {
268                    tracing::debug!(
269                        dialer=?other,
270                        "Ignoring event from dialer"
271                    );
272                }
273                Either::Right((other, _)) => {
274                    tracing::debug!(
275                        listener=?other,
276                        "Ignoring event from listener"
277                    );
278                }
279            }
280
281            if dialer_done && listener_done {
282                return;
283            }
284        }
285    }
286
287    async fn dial_and_wait(&mut self, addr: Multiaddr) -> PeerId {
288        self.dial(addr.clone()).unwrap();
289
290        self.wait(|e| match e {
291            SwarmEvent::ConnectionEstablished {
292                endpoint, peer_id, ..
293            } => (endpoint.get_remote_address() == &addr).then_some(peer_id),
294            other => {
295                tracing::debug!(
296                    dialer=?other,
297                    "Ignoring event from dialer"
298                );
299                None
300            }
301        })
302        .await
303    }
304
305    async fn wait<E, P>(&mut self, predicate: P) -> E
306    where
307        P: Fn(SwarmEvent<<B as NetworkBehaviour>::ToSwarm>) -> Option<E>,
308        P: Send,
309    {
310        loop {
311            let event = self.next_swarm_event().await;
312            if let Some(e) = predicate(event) {
313                break e;
314            }
315        }
316    }
317
318    fn listen(&mut self) -> ListenFuture<&mut Self> {
319        ListenFuture {
320            add_memory_external: false,
321            add_tcp_external: false,
322            swarm: self,
323        }
324    }
325
326    async fn next_swarm_event(&mut self) -> SwarmEvent<<Self::NB as NetworkBehaviour>::ToSwarm> {
327        match futures::future::select(
328            futures_timer::Delay::new(Duration::from_secs(10)),
329            self.select_next_some(),
330        )
331        .await
332        {
333            Either::Left(((), _)) => panic!("Swarm did not emit an event within 10s"),
334            Either::Right((event, _)) => {
335                tracing::trace!(?event);
336
337                event
338            }
339        }
340    }
341
342    async fn next_behaviour_event(&mut self) -> <Self::NB as NetworkBehaviour>::ToSwarm {
343        loop {
344            if let Ok(event) = self.next_swarm_event().await.try_into_behaviour_event() {
345                return event;
346            }
347        }
348    }
349
350    async fn loop_on_next(mut self) {
351        while let Some(event) = self.next().await {
352            tracing::trace!(?event);
353        }
354    }
355}
356
/// Builder returned by [`SwarmExt::listen`].
///
/// It only records which of the listen addresses should additionally be registered as
/// external addresses. Nothing happens until the value is `.await`ed, which is why the type
/// is marked `#[must_use]`: a dropped `ListenFuture` never starts listening.
#[derive(Debug)]
#[must_use = "a `ListenFuture` does nothing unless it is `.await`ed"]
pub struct ListenFuture<S> {
    /// Register the memory listen address as an external address once listening.
    add_memory_external: bool,
    /// Register the TCP listen address as an external address once listening.
    add_tcp_external: bool,
    /// The swarm (or a handle to it) that will be asked to listen.
    swarm: S,
}

impl<S> ListenFuture<S> {
    /// Adds the memory address we are starting to listen on as an external address using
    /// [`Swarm::add_external_address`].
    ///
    /// This is typically "safe" for tests because within a process, memory addresses are "globally"
    /// reachable. However, some tests depend on which addresses are external and need this to
    /// be configurable so it is not a good default.
    pub fn with_memory_addr_external(mut self) -> Self {
        self.add_memory_external = true;

        self
    }

    /// Adds the TCP address we are starting to listen on as an external address using
    /// [`Swarm::add_external_address`].
    ///
    /// This is typically "safe" for tests because on the same machine, 127.0.0.1 is reachable for
    /// other [`Swarm`]s. However, some tests depend on which addresses are external and need
    /// this to be configurable so it is not a good default.
    pub fn with_tcp_addr_external(mut self) -> Self {
        self.add_tcp_external = true;

        self
    }
}
388
389impl<'s, B> IntoFuture for ListenFuture<&'s mut Swarm<B>>
390where
391    B: NetworkBehaviour + Send,
392    <B as NetworkBehaviour>::ToSwarm: Debug,
393{
394    type Output = (Multiaddr, Multiaddr);
395    type IntoFuture = BoxFuture<'s, Self::Output>;
396
397    fn into_future(self) -> Self::IntoFuture {
398        async move {
399            let swarm = self.swarm;
400
401            let memory_addr_listener_id = swarm.listen_on(Protocol::Memory(0).into()).unwrap();
402
403            // block until we are actually listening
404            let memory_multiaddr = swarm
405                .wait(|e| match e {
406                    SwarmEvent::NewListenAddr {
407                        address,
408                        listener_id,
409                    } => (listener_id == memory_addr_listener_id).then_some(address),
410                    other => {
411                        panic!("Unexpected event while waiting for `NewListenAddr`: {other:?}")
412                    }
413                })
414                .await;
415
416            let tcp_addr_listener_id = swarm
417                .listen_on("/ip4/127.0.0.1/tcp/0".parse().unwrap())
418                .unwrap();
419
420            let tcp_multiaddr = swarm
421                .wait(|e| match e {
422                    SwarmEvent::NewListenAddr {
423                        address,
424                        listener_id,
425                    } => (listener_id == tcp_addr_listener_id).then_some(address),
426                    other => {
427                        panic!("Unexpected event while waiting for `NewListenAddr`: {other:?}")
428                    }
429                })
430                .await;
431
432            if self.add_memory_external {
433                swarm.add_external_address(memory_multiaddr.clone());
434            }
435            if self.add_tcp_external {
436                swarm.add_external_address(tcp_multiaddr.clone());
437            }
438
439            (memory_multiaddr, tcp_multiaddr)
440        }
441        .boxed()
442    }
443}