1use std::{
2 task::{Context, Poll, Waker},
3 time::Duration,
4};
5
6use futures::FutureExt;
7use futures_timer::Delay;
8
/// Default throttle duration: 500 milliseconds.
///
/// NOTE(review): presumably the default for the `automatic_throttle` argument of
/// `Status::new`; the place where it is consumed is outside this file view.
pub(crate) const DEFAULT_AUTOMATIC_THROTTLE: Duration = Duration::from_millis(500);
11
12#[derive(Debug)]
13pub(crate) struct Status {
14 interval_and_delay: Option<(Duration, Delay)>,
18
19 automatic_throttle: Option<Duration>,
22 throttle_timer: Option<ThrottleTimer>,
27
28 current_bootstrap_requests: usize,
31 waker: Option<Waker>,
33}
34
35impl Status {
36 pub(crate) fn new(
37 periodic_interval: Option<Duration>,
38 automatic_throttle: Option<Duration>,
39 ) -> Self {
40 Self {
41 interval_and_delay: periodic_interval.map(|interval| (interval, Delay::new(interval))),
42 waker: None,
43 automatic_throttle,
44 throttle_timer: None,
45 current_bootstrap_requests: 0,
46 }
47 }
48
49 pub(crate) fn trigger(&mut self) {
51 if let Some(throttle_duration) = self.automatic_throttle {
56 self.throttle_timer = Some(throttle_duration.into());
57 } else {
58 }
60
61 if let Some(waker) = self.waker.take() {
63 waker.wake()
64 }
65 }
66
67 pub(crate) fn reset_timers(&mut self) {
68 self.throttle_timer = None;
70
71 if let Some((interval, delay)) = self.interval_and_delay.as_mut() {
72 delay.reset(*interval);
73 }
74 }
75
76 pub(crate) fn on_started(&mut self) {
77 self.current_bootstrap_requests += 1;
81
82 self.reset_timers();
84 }
85
86 pub(crate) fn on_finish(&mut self) {
87 if let Some(value) = self.current_bootstrap_requests.checked_sub(1) {
88 self.current_bootstrap_requests = value;
89 } else {
90 debug_assert!(
91 false,
92 "Could not decrement current_bootstrap_requests because it's already 0"
93 );
94 }
95
96 if let Some(waker) = self.waker.take() {
98 waker.wake();
99 }
100 }
101
102 pub(crate) fn poll_next_bootstrap(&mut self, cx: &mut Context<'_>) -> Poll<()> {
103 if self.current_bootstrap_requests > 0 {
104 self.waker = Some(cx.waker().clone());
106 return Poll::Pending;
107 }
108
109 if let Some(throttle_delay) = &mut self.throttle_timer {
110 if throttle_delay.poll_unpin(cx).is_ready() {
116 return Poll::Ready(());
119 }
120
121 } else {
124 }
127
128 if let Some((_, delay)) = self.interval_and_delay.as_mut() {
130 if let Poll::Ready(()) = delay.poll_unpin(cx) {
131 return Poll::Ready(());
134 }
135 } else {
136 }
138
139 self.waker = Some(cx.waker().clone());
142 Poll::Pending
143 }
144
145 #[cfg(test)]
146 async fn next(&mut self) {
147 std::future::poll_fn(|cx| self.poll_next_bootstrap(cx)).await
148 }
149}
150
151#[derive(Debug)]
156enum ThrottleTimer {
157 Immediate,
158 Delay(Delay),
159}
160
161impl From<Duration> for ThrottleTimer {
162 fn from(value: Duration) -> Self {
163 if value.is_zero() {
164 Self::Immediate
165 } else {
166 Self::Delay(Delay::new(value))
167 }
168 }
169}
170
171impl futures::Future for ThrottleTimer {
172 type Output = ();
173
174 fn poll(self: std::pin::Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
175 match self.get_mut() {
176 Self::Immediate => Poll::Ready(()),
177 Self::Delay(delay) => delay.poll_unpin(cx),
178 }
179 }
180}
181
#[cfg(test)]
mod tests {
    use web_time::Instant;

    use super::*;

    const MS_5: Duration = Duration::from_millis(5);
    const MS_100: Duration = Duration::from_millis(100);

    /// Simulates a bootstrap request that starts and completes right away.
    fn do_bootstrap(status: &mut Status) {
        status.on_started();
        status.on_finish();
    }

    /// Waits until `status` reports that a bootstrap is due, then simulates running it.
    async fn await_and_do_bootstrap(status: &mut Status) {
        status.next().await;
        do_bootstrap(status);
    }

    /// With a zero throttle, `trigger` makes the next bootstrap due on the very next poll.
    #[tokio::test]
    async fn immediate_automatic_bootstrap_is_triggered_immediately() {
        let mut tracker = Status::new(Some(Duration::from_secs(1)), Some(Duration::ZERO));

        // Wait for the first periodic bootstrap; running it re-arms the 1s periodic delay.
        await_and_do_bootstrap(&mut tracker).await;

        let due_before_trigger = tracker.next().now_or_never();
        assert!(
            due_before_trigger.is_none(),
            "bootstrap to not be triggered immediately because periodic bootstrap is in ~1s"
        );

        // A zero throttle arms an `Immediate` timer.
        tracker.trigger();

        let due_after_trigger = tracker.next().now_or_never();
        assert!(
            due_after_trigger.is_some(),
            "bootstrap to be triggered immediately because we connected to a new peer"
        );

        let outcome = tokio::time::timeout(Duration::from_millis(500), tracker.next()).await;
        assert!(
            outcome.is_ok(),
            "bootstrap to be triggered in less then the configured delay because we connected to a new peer"
        );
    }

    /// A 5ms throttle delays the triggered bootstrap, but it still fires well before the 1s
    /// periodic interval.
    #[tokio::test]
    async fn delayed_automatic_bootstrap_is_triggered_before_periodic_bootstrap() {
        let mut tracker = Status::new(Some(Duration::from_secs(1)), Some(MS_5));

        // Wait for the first periodic bootstrap; running it re-arms the 1s periodic delay.
        await_and_do_bootstrap(&mut tracker).await;

        let due_before_trigger = tracker.next().now_or_never();
        assert!(
            due_before_trigger.is_none(),
            "bootstrap to not be triggered immediately because periodic bootstrap is in ~1s"
        );

        // Arms the 5ms throttle timer.
        tracker.trigger();

        let due_after_trigger = tracker.next().now_or_never();
        assert!(
            due_after_trigger.is_none(),
            "bootstrap to not be triggered immediately because throttle is 5ms"
        );

        let outcome = tokio::time::timeout(MS_5 * 2, tracker.next()).await;
        assert!(
            outcome.is_ok(),
            "bootstrap to be triggered in less then the configured periodic delay because we connected to a new peer"
        );
    }

    /// Without a periodic interval, a trigger after a completed bootstrap is due immediately
    /// when the throttle is zero.
    #[test]
    fn given_no_periodic_bootstrap_and_immediate_automatic_bootstrap_try_on_next_connection() {
        let mut tracker = Status::new(None, Some(Duration::ZERO));

        // A bootstrap that was started and finished outside of the timers.
        do_bootstrap(&mut tracker);

        tracker.trigger();

        let due_after_trigger = tracker.next().now_or_never();
        assert!(
            due_after_trigger.is_some(),
            "bootstrap to be triggered immediately because we connected to a new peer"
        );
    }

    /// After a triggered bootstrap has run, the next one has to wait for the periodic interval.
    #[tokio::test]
    async fn given_periodic_bootstrap_when_routing_table_updated_then_wont_bootstrap_until_next_interval(
    ) {
        let mut tracker = Status::new(Some(MS_100), Some(MS_5));

        tracker.trigger();

        // First bootstrap: driven by the 5ms throttle timer.
        let start = Instant::now();
        await_and_do_bootstrap(&mut tracker).await;
        let elapsed = start.elapsed();

        assert!(elapsed < MS_5 * 2);

        // Second bootstrap: the throttle was cleared by `on_started`, so only the periodic
        // delay can fire.
        let start = Instant::now();
        await_and_do_bootstrap(&mut tracker).await;
        let elapsed = start.elapsed();

        assert!(elapsed > MS_100);
    }

    /// Without a periodic interval, a trigger alone is enough to make a bootstrap due.
    #[tokio::test]
    async fn given_no_periodic_bootstrap_and_automatic_bootstrap_when_new_entry_then_will_bootstrap(
    ) {
        let mut tracker = Status::new(None, Some(Duration::ZERO));

        tracker.trigger();

        tracker.next().await;
    }

    /// With only a periodic interval configured, bootstraps become due once per interval.
    #[tokio::test]
    async fn given_periodic_bootstrap_and_no_automatic_bootstrap_triggers_periodically() {
        let mut tracker = Status::new(Some(MS_100), None);

        let start = Instant::now();
        for i in 1..6 {
            await_and_do_bootstrap(&mut tracker).await;

            let elapsed = start.elapsed();

            // 10ms of slack is allowed on the expected cumulative time.
            assert!(elapsed > (i * MS_100 - Duration::from_millis(10)));
        }
    }

    /// Each `trigger` call restarts the throttle, so a steady stream of triggers postpones the
    /// bootstrap until one full throttle period after the last trigger.
    #[tokio::test]
    async fn given_no_periodic_bootstrap_and_automatic_bootstrap_reset_throttle_when_multiple_peers(
    ) {
        let mut tracker = Status::new(None, Some(MS_100));

        tracker.trigger();
        for _ in 0..10 {
            // Re-trigger at half the throttle period so the timer never gets to elapse.
            Delay::new(MS_100 / 2).await;
            tracker.trigger();
        }

        let due_right_after_last_trigger = tracker.next().now_or_never();
        assert!(
            due_right_after_last_trigger.is_none(),
            "bootstrap to not be triggered immediately because throttle has been reset"
        );

        Delay::new(MS_100 - MS_5).await;

        let outcome = tokio::time::timeout(MS_5 * 2, tracker.next()).await;
        assert!(
            outcome.is_ok(),
            "bootstrap to be triggered in the configured throttle delay because we connected to a new peer"
        );
    }

    /// While any started bootstrap has not finished, the periodic timer never makes a new
    /// bootstrap due; finishing the last one makes it due at once.
    #[tokio::test]
    async fn given_periodic_bootstrap_and_no_automatic_bootstrap_manually_triggering_prevent_periodic(
    ) {
        let mut tracker = Status::new(Some(MS_100), None);

        // Two bootstraps start, only one of them finishes.
        tracker.on_started();
        tracker.on_started();
        tracker.on_finish();

        let outcome = tokio::time::timeout(10 * MS_100, tracker.next()).await;
        assert!(
            outcome.is_err(),
            "periodic bootstrap to never be triggered because one is still being run"
        );

        // The remaining bootstrap finishes.
        tracker.on_finish();

        let due_after_finish = tracker.next().now_or_never();
        assert!(
            due_after_finish.is_some(),
            "bootstrap to be triggered immediately because no more bootstrap requests are running"
        );
    }
}