libp2p_kad/
bootstrap.rs

1use std::{
2    task::{Context, Poll, Waker},
3    time::Duration,
4};
5
6use futures::FutureExt;
7use futures_timer::Delay;
8
/// Default automatic throttle duration: 500 milliseconds.
///
/// The value was settled on in
/// <https://github.com/libp2p/rust-libp2p/pull/4838#discussion_r1490184754>.
pub(crate) const DEFAULT_AUTOMATIC_THROTTLE: Duration = Duration::from_millis(500);
11
12#[derive(Debug)]
13pub(crate) struct Status {
14    /// If the user did not disable periodic bootstrap (by providing `None` for
15    /// `periodic_interval`) this is the periodic interval and the delay of the current period.
16    /// When `Delay` finishes, a bootstrap will be triggered and the `Delay` will be reset.
17    interval_and_delay: Option<(Duration, Delay)>,
18
19    /// Configured duration to wait before triggering a bootstrap when a new peer
20    /// is inserted in the routing table. `None` if automatic bootstrap is disabled.
21    automatic_throttle: Option<Duration>,
22    /// Timer that will be set (if automatic bootstrap is not disabled) when a new peer is inserted
23    /// in the routing table. When it finishes, it will trigger a bootstrap and will be set to
24    /// `None` again. If an other new peer is inserted in the routing table before this timer
25    /// finishes, the timer is reset.
26    throttle_timer: Option<ThrottleTimer>,
27
28    /// Number of bootstrap requests currently in progress. We ensure neither periodic bootstrap
29    /// or automatic bootstrap trigger new requests when there is still some running.
30    current_bootstrap_requests: usize,
31    /// Waker to wake up the `poll` method if progress is ready to be made.
32    waker: Option<Waker>,
33}
34
35impl Status {
36    pub(crate) fn new(
37        periodic_interval: Option<Duration>,
38        automatic_throttle: Option<Duration>,
39    ) -> Self {
40        Self {
41            interval_and_delay: periodic_interval.map(|interval| (interval, Delay::new(interval))),
42            waker: None,
43            automatic_throttle,
44            throttle_timer: None,
45            current_bootstrap_requests: 0,
46        }
47    }
48
49    /// Trigger a bootstrap now or after the configured `automatic_throttle` if configured.
50    pub(crate) fn trigger(&mut self) {
51        // Registering `self.throttle_timer` means scheduling a bootstrap.
52        // A bootstrap will be triggered when `self.throttle_timer` finishes.
53        // A `throttle_timer` is useful to not trigger a batch of bootstraps when a
54        // batch of peers is inserted into the routing table.
55        if let Some(throttle_duration) = self.automatic_throttle {
56            self.throttle_timer = Some(throttle_duration.into());
57        } else {
58            // The user disabled bootstrapping on new peer in the routing table.
59        }
60
61        // Waking up the waker that could have been registered.
62        if let Some(waker) = self.waker.take() {
63            waker.wake()
64        }
65    }
66
67    pub(crate) fn reset_timers(&mut self) {
68        // Canceling the `throttle_timer` if any and resetting the `delay` if any.
69        self.throttle_timer = None;
70
71        if let Some((interval, delay)) = self.interval_and_delay.as_mut() {
72            delay.reset(*interval);
73        }
74    }
75
76    pub(crate) fn on_started(&mut self) {
77        // No periodic or automatic bootstrap will be triggered as long as
78        // `self.current_bootstrap_requests > 0` but the user could still manually
79        // trigger a bootstrap.
80        self.current_bootstrap_requests += 1;
81
82        // Resetting the Status timers since a bootstrap request is being triggered right now.
83        self.reset_timers();
84    }
85
86    pub(crate) fn on_finish(&mut self) {
87        if let Some(value) = self.current_bootstrap_requests.checked_sub(1) {
88            self.current_bootstrap_requests = value;
89        } else {
90            debug_assert!(
91                false,
92                "Could not decrement current_bootstrap_requests because it's already 0"
93            );
94        }
95
96        // Waking up the waker that could have been registered.
97        if let Some(waker) = self.waker.take() {
98            waker.wake();
99        }
100    }
101
102    pub(crate) fn poll_next_bootstrap(&mut self, cx: &mut Context<'_>) -> Poll<()> {
103        if self.current_bootstrap_requests > 0 {
104            // Some bootstrap request(s) is(are) currently running.
105            self.waker = Some(cx.waker().clone());
106            return Poll::Pending;
107        }
108
109        if let Some(throttle_delay) = &mut self.throttle_timer {
110            // A `throttle_timer` has been registered. It means one or more peers have been
111            // inserted into the routing table and that a bootstrap request should be triggered.
112            // However, to not risk cascading bootstrap requests, we wait a little time to ensure
113            // the user will not add more peers in the routing table in the next "throttle_timer"
114            // remaining.
115            if throttle_delay.poll_unpin(cx).is_ready() {
116                // The `throttle_timer` is finished, triggering bootstrap right now.
117                // The call to `on_started` will reset `throttle_delay`.
118                return Poll::Ready(());
119            }
120
121            // The `throttle_timer` is not finished but the periodic interval for triggering
122            // bootstrap might be reached.
123        } else {
124            // No new peer has recently been inserted into the routing table or automatic bootstrap
125            // is disabled.
126        }
127
128        // Checking if the user has enabled the periodic bootstrap feature.
129        if let Some((_, delay)) = self.interval_and_delay.as_mut() {
130            if let Poll::Ready(()) = delay.poll_unpin(cx) {
131                // It is time to run the periodic bootstrap.
132                // The call to `on_started` will reset `delay`.
133                return Poll::Ready(());
134            }
135        } else {
136            // The user disabled periodic bootstrap.
137        }
138
139        // Registering the `waker` so that we can wake up when calling
140        // `on_new_peer_in_routing_table`.
141        self.waker = Some(cx.waker().clone());
142        Poll::Pending
143    }
144
145    #[cfg(test)]
146    async fn next(&mut self) {
147        std::future::poll_fn(|cx| self.poll_next_bootstrap(cx)).await
148    }
149}
150
151/// Simple enum to indicate when the throttle timer resolves.
152/// A dedicated `Immediate` variant is necessary because creating
153/// `Delay::new(Duration::ZERO)` does not always actually resolve
154/// immediately.
155#[derive(Debug)]
156enum ThrottleTimer {
157    Immediate,
158    Delay(Delay),
159}
160
161impl From<Duration> for ThrottleTimer {
162    fn from(value: Duration) -> Self {
163        if value.is_zero() {
164            Self::Immediate
165        } else {
166            Self::Delay(Delay::new(value))
167        }
168    }
169}
170
171impl futures::Future for ThrottleTimer {
172    type Output = ();
173
174    fn poll(self: std::pin::Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
175        match self.get_mut() {
176            Self::Immediate => Poll::Ready(()),
177            Self::Delay(delay) => delay.poll_unpin(cx),
178        }
179    }
180}
181
#[cfg(test)]
mod tests {
    use web_time::Instant;

    use super::*;

    const MS_5: Duration = Duration::from_millis(5);
    const MS_100: Duration = Duration::from_millis(100);

    /// Simulates a bootstrap that starts and completes instantly.
    fn do_bootstrap(status: &mut Status) {
        status.on_started();
        status.on_finish();
    }

    /// Waits until a bootstrap is due, then simulates running it to completion.
    async fn await_and_do_bootstrap(status: &mut Status) {
        status.next().await;
        do_bootstrap(status);
    }

    #[tokio::test]
    async fn immediate_automatic_bootstrap_is_triggered_immediately() {
        let mut status = Status::new(Some(Duration::from_secs(1)), Some(Duration::ZERO));

        // Consume the first periodic bootstrap.
        await_and_do_bootstrap(&mut status).await;

        let before_trigger = status.next().now_or_never();
        assert!(
            before_trigger.is_none(),
            "bootstrap to not be triggered immediately because periodic bootstrap is in ~1s"
        );

        // A new peer got connected.
        status.trigger();

        let after_trigger = status.next().now_or_never();
        assert!(
            after_trigger.is_some(),
            "bootstrap to be triggered immediately because we connected to a new peer"
        );

        let within_limit = tokio::time::timeout(Duration::from_millis(500), status.next()).await;
        assert!(
            within_limit.is_ok(),
            "bootstrap to be triggered in less then the configured delay because we connected to a new peer"
        );
    }

    #[tokio::test]
    async fn delayed_automatic_bootstrap_is_triggered_before_periodic_bootstrap() {
        let mut status = Status::new(Some(Duration::from_secs(1)), Some(MS_5));

        // Consume the first periodic bootstrap.
        await_and_do_bootstrap(&mut status).await;

        let before_trigger = status.next().now_or_never();
        assert!(
            before_trigger.is_none(),
            "bootstrap to not be triggered immediately because periodic bootstrap is in ~1s"
        );

        // A new peer got connected.
        status.trigger();

        let right_after_trigger = status.next().now_or_never();
        assert!(
            right_after_trigger.is_none(),
            "bootstrap to not be triggered immediately because throttle is 5ms"
        );

        let within_limit = tokio::time::timeout(MS_5 * 2, status.next()).await;
        assert!(
            within_limit.is_ok(),
            "bootstrap to be triggered in less then the configured periodic delay because we connected to a new peer"
        );
    }

    #[test]
    fn given_no_periodic_bootstrap_and_immediate_automatic_bootstrap_try_on_next_connection() {
        let mut status = Status::new(None, Some(Duration::ZERO));

        // The user ran a bootstrap manually.
        do_bootstrap(&mut status);

        // A new peer got connected.
        status.trigger();

        let after_trigger = status.next().now_or_never();
        assert!(
            after_trigger.is_some(),
            "bootstrap to be triggered immediately because we connected to a new peer"
        );
    }

    #[tokio::test]
    async fn given_periodic_bootstrap_when_routing_table_updated_then_wont_bootstrap_until_next_interval() {
        let mut status = Status::new(Some(MS_100), Some(MS_5));

        status.trigger();

        // First bootstrap: driven by the 5ms throttle.
        let throttled_start = Instant::now();
        await_and_do_bootstrap(&mut status).await;
        let throttled_wait = throttled_start.elapsed();

        assert!(throttled_wait < MS_5 * 2);

        // Second bootstrap: only the periodic interval is left to fire.
        let periodic_start = Instant::now();
        await_and_do_bootstrap(&mut status).await;
        let periodic_wait = periodic_start.elapsed();

        assert!(periodic_wait > MS_100);
    }

    #[tokio::test]
    async fn given_no_periodic_bootstrap_and_automatic_bootstrap_when_new_entry_then_will_bootstrap() {
        let mut status = Status::new(None, Some(Duration::ZERO));

        status.trigger();

        status.next().await;
    }

    #[tokio::test]
    async fn given_periodic_bootstrap_and_no_automatic_bootstrap_triggers_periodically() {
        let mut status = Status::new(Some(MS_100), None);

        let start = Instant::now();
        for round in 1..6 {
            await_and_do_bootstrap(&mut status).await;

            let elapsed = start.elapsed();

            // Allow 10ms of slack to avoid flakes.
            let lower_bound = round * MS_100 - Duration::from_millis(10);
            assert!(elapsed > lower_bound);
        }
    }

    #[tokio::test]
    async fn given_no_periodic_bootstrap_and_automatic_bootstrap_reset_throttle_when_multiple_peers() {
        let mut status = Status::new(None, Some(MS_100));

        status.trigger();
        for _ in 0..10 {
            Delay::new(MS_100 / 2).await;
            // Each call is expected to restart the throttle timer.
            status.trigger();
        }

        let right_after_burst = status.next().now_or_never();
        assert!(
            right_after_burst.is_none(),
            "bootstrap to not be triggered immediately because throttle has been reset"
        );

        Delay::new(MS_100 - MS_5).await;

        let within_limit = tokio::time::timeout(MS_5 * 2, status.next()).await;
        assert!(
            within_limit.is_ok(),
            "bootstrap to be triggered in the configured throttle delay because we connected to a new peer"
        );
    }

    #[tokio::test]
    async fn given_periodic_bootstrap_and_no_automatic_bootstrap_manually_triggering_prevent_periodic() {
        let mut status = Status::new(Some(MS_100), None);

        // Two manual bootstraps start ...
        status.on_started();
        status.on_started();
        // ... and only one of them finishes.
        status.on_finish();

        let while_running = tokio::time::timeout(10 * MS_100, status.next()).await;
        assert!(
            while_running.is_err(),
            "periodic bootstrap to never be triggered because one is still being run"
        );

        // The remaining manual bootstrap finishes as well.
        status.on_finish();

        let once_idle = status.next().now_or_never();
        assert!(
            once_idle.is_some(),
            "bootstrap to be triggered immediately because no more bootstrap requests are running"
        );
    }
}