libp2p_kad/query/peers/closest.rs
1// Copyright 2019 Parity Technologies (UK) Ltd.
2//
3// Permission is hereby granted, free of charge, to any person obtaining a
4// copy of this software and associated documentation files (the "Software"),
5// to deal in the Software without restriction, including without limitation
6// the rights to use, copy, modify, merge, publish, distribute, sublicense,
7// and/or sell copies of the Software, and to permit persons to whom the
8// Software is furnished to do so, subject to the following conditions:
9//
10// The above copyright notice and this permission notice shall be included in
11// all copies or substantial portions of the Software.
12//
13// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
14// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
18// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
19// DEALINGS IN THE SOFTWARE.
20
21use std::{
22 collections::btree_map::{BTreeMap, Entry},
23 num::NonZeroUsize,
24 time::Duration,
25};
26
27use web_time::Instant;
28
29use super::*;
30use crate::{
31 kbucket::{Distance, Key, KeyBytes},
32 ALPHA_VALUE, K_VALUE,
33};
34
35pub(crate) mod disjoint;
36/// A peer iterator for a dynamically changing list of peers, sorted by increasing
37/// distance to a chosen target.
38#[derive(Debug, Clone)]
39pub struct ClosestPeersIter {
40 config: ClosestPeersIterConfig,
41
42 /// The target whose distance to any peer determines the position of
43 /// the peer in the iterator.
44 target: KeyBytes,
45
46 /// The internal iterator state.
47 state: State,
48
49 /// The closest peers to the target, ordered by increasing distance.
50 closest_peers: BTreeMap<Distance, Peer>,
51
52 /// The number of peers for which the iterator is currently waiting for results.
53 num_waiting: usize,
54}
55
56/// Configuration for a `ClosestPeersIter`.
57#[derive(Debug, Clone)]
58pub struct ClosestPeersIterConfig {
59 /// Allowed level of parallelism.
60 ///
61 /// The `α` parameter in the Kademlia paper. The maximum number of peers that
62 /// the iterator is allowed to wait for in parallel while iterating towards the closest
63 /// nodes to a target. Defaults to `ALPHA_VALUE`.
64 pub parallelism: NonZeroUsize,
65
66 /// Number of results (closest peers) to search for.
67 ///
68 /// The number of closest peers for which the iterator must obtain successful results
69 /// in order to finish successfully. Defaults to `K_VALUE`.
70 pub num_results: NonZeroUsize,
71
72 /// The timeout for a single peer.
73 ///
74 /// If a successful result is not reported for a peer within this timeout
75 /// window, the iterator considers the peer unresponsive and will not wait for
76 /// the peer when evaluating the termination conditions, until and unless a
77 /// result is delivered. Defaults to `10` seconds.
78 pub peer_timeout: Duration,
79}
80
81impl Default for ClosestPeersIterConfig {
82 fn default() -> Self {
83 ClosestPeersIterConfig {
84 parallelism: ALPHA_VALUE,
85 num_results: K_VALUE,
86 peer_timeout: Duration::from_secs(10),
87 }
88 }
89}
90
91impl ClosestPeersIter {
92 /// Creates a new iterator with a default configuration.
93 pub fn new<I>(target: KeyBytes, known_closest_peers: I) -> Self
94 where
95 I: IntoIterator<Item = Key<PeerId>>,
96 {
97 Self::with_config(
98 ClosestPeersIterConfig::default(),
99 target,
100 known_closest_peers,
101 )
102 }
103
104 /// Creates a new iterator with the given configuration.
105 pub fn with_config<I, T>(
106 config: ClosestPeersIterConfig,
107 target: T,
108 known_closest_peers: I,
109 ) -> Self
110 where
111 I: IntoIterator<Item = Key<PeerId>>,
112 T: Into<KeyBytes>,
113 {
114 let target = target.into();
115
116 // Initialise the closest peers to start the iterator with.
117 let closest_peers = BTreeMap::from_iter(
118 known_closest_peers
119 .into_iter()
120 .map(|key| {
121 let distance = key.distance(&target);
122 let state = PeerState::NotContacted;
123 (distance, Peer { key, state })
124 })
125 .take(K_VALUE.into()),
126 );
127
128 // The iterator initially makes progress by iterating towards the target.
129 let state = State::Iterating { no_progress: 0 };
130
131 ClosestPeersIter {
132 config,
133 target,
134 state,
135 closest_peers,
136 num_waiting: 0,
137 }
138 }
139
140 /// Callback for delivering the result of a successful request to a peer.
141 ///
142 /// Delivering results of requests back to the iterator allows the iterator to make
143 /// progress. The iterator is said to make progress either when the given
144 /// `closer_peers` contain a peer closer to the target than any peer seen so far,
145 /// or when the iterator did not yet accumulate `num_results` closest peers and
146 /// `closer_peers` contains a new peer, regardless of its distance to the target.
147 ///
148 /// If the iterator is currently waiting for a result from `peer`,
149 /// the iterator state is updated and `true` is returned. In that
150 /// case, after calling this function, `next` should eventually be
151 /// called again to obtain the new state of the iterator.
152 ///
153 /// If the iterator is finished, it is not currently waiting for a
154 /// result from `peer`, or a result for `peer` has already been reported,
155 /// calling this function has no effect and `false` is returned.
156 pub fn on_success<I>(&mut self, peer: &PeerId, closer_peers: I) -> bool
157 where
158 I: IntoIterator<Item = PeerId>,
159 {
160 if let State::Finished = self.state {
161 return false;
162 }
163
164 let key = Key::from(*peer);
165 let distance = key.distance(&self.target);
166
167 // Mark the peer as succeeded.
168 match self.closest_peers.entry(distance) {
169 Entry::Vacant(..) => return false,
170 Entry::Occupied(mut e) => match e.get().state {
171 PeerState::Waiting(..) => {
172 debug_assert!(self.num_waiting > 0);
173 self.num_waiting -= 1;
174 e.get_mut().state = PeerState::Succeeded;
175 }
176 PeerState::Unresponsive => {
177 e.get_mut().state = PeerState::Succeeded;
178 }
179 PeerState::NotContacted | PeerState::Failed | PeerState::Succeeded => return false,
180 },
181 }
182
183 let mut cur_range = distance;
184 let num_results = self.config.num_results.get();
185 // furthest_peer is the furthest peer in range among the closest_peers
186 let furthest_peer = self
187 .closest_peers
188 .iter()
189 .enumerate()
190 .nth(num_results - 1)
191 .map(|(_, peer)| peer)
192 .or_else(|| self.closest_peers.iter().last());
193 if let Some((dist, _)) = furthest_peer {
194 cur_range = *dist;
195 }
196
197 // Incorporate the reported closer peers into the iterator.
198 //
199 // The iterator makes progress if:
200 // 1, the iterator did not yet accumulate enough closest peers.
201 // OR
202 // 2, any of the new peers is closer to the target than any peer seen so far
203 // (i.e. is the first entry after being incorporated)
204 let mut progress = self.closest_peers.len() < self.config.num_results.get();
205 for peer in closer_peers {
206 let key = peer.into();
207 let distance = self.target.distance(&key);
208 let peer = Peer {
209 key,
210 state: PeerState::NotContacted,
211 };
212
213 let is_first_insert = match self.closest_peers.entry(distance) {
214 Entry::Occupied(_) => false,
215 Entry::Vacant(entry) => {
216 entry.insert(peer);
217 true
218 }
219 };
220
221 progress = (is_first_insert && distance < cur_range) || progress;
222 }
223
224 // Update the iterator state.
225 self.state = match self.state {
226 State::Iterating { no_progress } => {
227 let no_progress = if progress { 0 } else { no_progress + 1 };
228 if no_progress >= self.config.parallelism.get() {
229 State::Stalled
230 } else {
231 State::Iterating { no_progress }
232 }
233 }
234 State::Stalled => {
235 if progress {
236 State::Iterating { no_progress: 0 }
237 } else {
238 State::Stalled
239 }
240 }
241 State::Finished => State::Finished,
242 };
243
244 true
245 }
246
247 /// Callback for informing the iterator about a failed request to a peer.
248 ///
249 /// If the iterator is currently waiting for a result from `peer`,
250 /// the iterator state is updated and `true` is returned. In that
251 /// case, after calling this function, `next` should eventually be
252 /// called again to obtain the new state of the iterator.
253 ///
254 /// If the iterator is finished, it is not currently waiting for a
255 /// result from `peer`, or a result for `peer` has already been reported,
256 /// calling this function has no effect and `false` is returned.
257 pub fn on_failure(&mut self, peer: &PeerId) -> bool {
258 if let State::Finished = self.state {
259 return false;
260 }
261
262 let key = Key::from(*peer);
263 let distance = key.distance(&self.target);
264
265 match self.closest_peers.entry(distance) {
266 Entry::Vacant(_) => return false,
267 Entry::Occupied(mut e) => match e.get().state {
268 PeerState::Waiting(_) => {
269 debug_assert!(self.num_waiting > 0);
270 self.num_waiting -= 1;
271 e.get_mut().state = PeerState::Failed
272 }
273 PeerState::Unresponsive => e.get_mut().state = PeerState::Failed,
274 PeerState::NotContacted | PeerState::Failed | PeerState::Succeeded => return false,
275 },
276 }
277
278 true
279 }
280
281 /// Returns the list of peers for which the iterator is currently waiting
282 /// for results.
283 pub fn waiting(&self) -> impl Iterator<Item = &PeerId> {
284 self.closest_peers
285 .values()
286 .filter_map(|peer| match peer.state {
287 PeerState::Waiting(..) => Some(peer.key.preimage()),
288 _ => None,
289 })
290 }
291
292 /// Returns the number of peers for which the iterator is currently
293 /// waiting for results.
294 pub fn num_waiting(&self) -> usize {
295 self.num_waiting
296 }
297
298 /// Returns true if the iterator is waiting for a response from the given peer.
299 pub fn is_waiting(&self, peer: &PeerId) -> bool {
300 self.waiting().any(|p| peer == p)
301 }
302
303 /// Advances the state of the iterator, potentially getting a new peer to contact.
304 pub fn next(&mut self, now: Instant) -> PeersIterState<'_> {
305 if let State::Finished = self.state {
306 return PeersIterState::Finished;
307 }
308
309 // Count the number of peers that returned a result. If there is a
310 // request in progress to one of the `num_results` closest peers, the
311 // counter is set to `None` as the iterator can only finish once
312 // `num_results` closest peers have responded (or there are no more
313 // peers to contact, see `num_waiting`).
314 let mut result_counter = Some(0);
315
316 // Check if the iterator is at capacity w.r.t. the allowed parallelism.
317 let at_capacity = self.at_capacity();
318
319 for peer in self.closest_peers.values_mut() {
320 match peer.state {
321 PeerState::Waiting(timeout) => {
322 if now >= timeout {
323 // Unresponsive peers no longer count towards the limit for the
324 // bounded parallelism, though they might still be ongoing and
325 // their results can still be delivered to the iterator.
326 debug_assert!(self.num_waiting > 0);
327 self.num_waiting -= 1;
328 peer.state = PeerState::Unresponsive
329 } else if at_capacity {
330 // The iterator is still waiting for a result from a peer and is
331 // at capacity w.r.t. the maximum number of peers being waited on.
332 return PeersIterState::WaitingAtCapacity;
333 } else {
334 // The iterator is still waiting for a result from a peer and the
335 // `result_counter` did not yet reach `num_results`. Therefore
336 // the iterator is not yet done, regardless of already successful
337 // queries to peers farther from the target.
338 result_counter = None;
339 }
340 }
341
342 PeerState::Succeeded => {
343 if let Some(ref mut cnt) = result_counter {
344 *cnt += 1;
345 // If `num_results` successful results have been delivered for the
346 // closest peers, the iterator is done.
347 if *cnt >= self.config.num_results.get() {
348 self.state = State::Finished;
349 return PeersIterState::Finished;
350 }
351 }
352 }
353
354 PeerState::NotContacted => {
355 if !at_capacity {
356 let timeout = now + self.config.peer_timeout;
357 peer.state = PeerState::Waiting(timeout);
358 self.num_waiting += 1;
359 return PeersIterState::Waiting(Some(Cow::Borrowed(peer.key.preimage())));
360 } else {
361 return PeersIterState::WaitingAtCapacity;
362 }
363 }
364
365 PeerState::Unresponsive | PeerState::Failed => {
366 // Skip over unresponsive or failed peers.
367 }
368 }
369 }
370
371 if self.num_waiting > 0 {
372 // The iterator is still waiting for results and not at capacity w.r.t.
373 // the allowed parallelism, but there are no new peers to contact
374 // at the moment.
375 PeersIterState::Waiting(None)
376 } else {
377 // The iterator is finished because all available peers have been contacted
378 // and the iterator is not waiting for any more results.
379 self.state = State::Finished;
380 PeersIterState::Finished
381 }
382 }
383
384 /// Immediately transitions the iterator to [`PeersIterState::Finished`].
385 pub fn finish(&mut self) {
386 self.state = State::Finished
387 }
388
389 /// Checks whether the iterator has finished.
390 pub fn is_finished(&self) -> bool {
391 self.state == State::Finished
392 }
393
394 /// Consumes the iterator, returning the closest peers.
395 pub fn into_result(self) -> impl Iterator<Item = PeerId> {
396 self.closest_peers
397 .into_iter()
398 .filter_map(|(_, peer)| {
399 if let PeerState::Succeeded = peer.state {
400 Some(peer.key.into_preimage())
401 } else {
402 None
403 }
404 })
405 .take(self.config.num_results.get())
406 }
407
408 /// Checks if the iterator is at capacity w.r.t. the permitted parallelism.
409 ///
410 /// While the iterator is stalled, up to `num_results` parallel requests
411 /// are allowed. This is a slightly more permissive variant of the
412 /// requirement that the initiator "resends the FIND_NODE to all of the
413 /// k closest nodes it has not already queried".
414 fn at_capacity(&self) -> bool {
415 match self.state {
416 State::Stalled => {
417 self.num_waiting
418 >= usize::max(self.config.num_results.get(), self.config.parallelism.get())
419 }
420 State::Iterating { .. } => self.num_waiting >= self.config.parallelism.get(),
421 State::Finished => true,
422 }
423 }
424}
425
426////////////////////////////////////////////////////////////////////////////////
427// Private state
428
429/// Internal state of the iterator.
430#[derive(Debug, PartialEq, Eq, Copy, Clone)]
431enum State {
432 /// The iterator is making progress by iterating towards `num_results` closest
433 /// peers to the target with a maximum of `parallelism` peers for which the
434 /// iterator is waiting for results at a time.
435 ///
436 /// > **Note**: When the iterator switches back to `Iterating` after being
437 /// > `Stalled`, it may temporarily be waiting for more than `parallelism`
438 /// > results from peers, with new peers only being considered once
439 /// > the number pending results drops below `parallelism`.
440 Iterating {
441 /// The number of consecutive results that did not yield a peer closer
442 /// to the target. When this number reaches `parallelism` and no new
443 /// peer was discovered or at least `num_results` peers are known to
444 /// the iterator, it is considered `Stalled`.
445 no_progress: usize,
446 },
447
448 /// A iterator is stalled when it did not make progress after `parallelism`
449 /// consecutive successful results (see `on_success`).
450 ///
451 /// While the iterator is stalled, the maximum allowed parallelism for pending
452 /// results is increased to `num_results` in an attempt to finish the iterator.
453 /// If the iterator can make progress again upon receiving the remaining
454 /// results, it switches back to `Iterating`. Otherwise it will be finished.
455 Stalled,
456
457 /// The iterator is finished.
458 ///
459 /// A iterator finishes either when it has collected `num_results` results
460 /// from the closest peers (not counting those that failed or are unresponsive)
461 /// or because the iterator ran out of peers that have not yet delivered
462 /// results (or failed).
463 Finished,
464}
465
466/// Representation of a peer in the context of a iterator.
467#[derive(Debug, Clone)]
468struct Peer {
469 key: Key<PeerId>,
470 state: PeerState,
471}
472
473/// The state of a single `Peer`.
474#[derive(Debug, Copy, Clone)]
475enum PeerState {
476 /// The peer has not yet been contacted.
477 ///
478 /// This is the starting state for every peer.
479 NotContacted,
480
481 /// The iterator is waiting for a result from the peer.
482 Waiting(Instant),
483
484 /// A result was not delivered for the peer within the configured timeout.
485 ///
486 /// The peer is not taken into account for the termination conditions
487 /// of the iterator until and unless it responds.
488 Unresponsive,
489
490 /// Obtaining a result from the peer has failed.
491 ///
492 /// This is a final state, reached as a result of a call to `on_failure`.
493 Failed,
494
495 /// A successful result from the peer has been delivered.
496 ///
497 /// This is a final state, reached as a result of a call to `on_success`.
498 Succeeded,
499}
500
501#[cfg(test)]
502mod tests {
503 use std::iter;
504
505 use libp2p_core::multihash::Multihash;
506 use quickcheck::*;
507 use rand::{rngs::StdRng, Rng, SeedableRng};
508
509 use super::*;
510 use crate::SHA_256_MH;
511
512 fn random_peers<R: Rng>(n: usize, g: &mut R) -> Vec<PeerId> {
513 (0..n)
514 .map(|_| {
515 PeerId::from_multihash(Multihash::wrap(SHA_256_MH, &g.gen::<[u8; 32]>()).unwrap())
516 .unwrap()
517 })
518 .collect()
519 }
520
521 fn sorted<T: AsRef<KeyBytes>>(target: &T, peers: &[Key<PeerId>]) -> bool {
522 peers
523 .windows(2)
524 .all(|w| w[0].distance(&target) < w[1].distance(&target))
525 }
526
527 #[derive(Clone, Debug)]
528 struct ArbitraryPeerId(PeerId);
529
530 impl Arbitrary for ArbitraryPeerId {
531 fn arbitrary(g: &mut Gen) -> ArbitraryPeerId {
532 let hash: [u8; 32] = core::array::from_fn(|_| u8::arbitrary(g));
533 let peer_id =
534 PeerId::from_multihash(Multihash::wrap(SHA_256_MH, &hash).unwrap()).unwrap();
535 ArbitraryPeerId(peer_id)
536 }
537 }
538
539 impl Arbitrary for ClosestPeersIter {
540 fn arbitrary(g: &mut Gen) -> ClosestPeersIter {
541 let known_closest_peers = (0..g.gen_range(1..60u8))
542 .map(|_| Key::from(ArbitraryPeerId::arbitrary(g).0))
543 .collect::<Vec<_>>();
544 let target = Key::from(ArbitraryPeerId::arbitrary(g).0);
545 let config = ClosestPeersIterConfig {
546 parallelism: NonZeroUsize::new(g.gen_range(1..10)).unwrap(),
547 num_results: NonZeroUsize::new(g.gen_range(1..25)).unwrap(),
548 peer_timeout: Duration::from_secs(g.gen_range(10..30)),
549 };
550 ClosestPeersIter::with_config(config, target, known_closest_peers)
551 }
552 }
553
554 #[derive(Clone, Debug)]
555 struct Seed([u8; 32]);
556
557 impl Arbitrary for Seed {
558 fn arbitrary(g: &mut Gen) -> Seed {
559 let seed = core::array::from_fn(|_| u8::arbitrary(g));
560 Seed(seed)
561 }
562 }
563
564 #[test]
565 fn new_iter() {
566 fn prop(iter: ClosestPeersIter) {
567 let target = iter.target;
568
569 let (keys, states): (Vec<_>, Vec<_>) = iter
570 .closest_peers
571 .values()
572 .map(|e| (e.key, &e.state))
573 .unzip();
574
575 let none_contacted = states.iter().all(|s| matches!(s, PeerState::NotContacted));
576
577 assert!(none_contacted, "Unexpected peer state in new iterator.");
578 assert!(
579 sorted(&target, &keys),
580 "Closest peers in new iterator not sorted by distance to target."
581 );
582 assert_eq!(
583 iter.num_waiting(),
584 0,
585 "Unexpected peers in progress in new iterator."
586 );
587 assert_eq!(
588 iter.into_result().count(),
589 0,
590 "Unexpected closest peers in new iterator"
591 );
592 }
593
594 QuickCheck::new().tests(10).quickcheck(prop as fn(_) -> _)
595 }
596
597 #[test]
598 fn termination_and_parallelism() {
599 fn prop(mut iter: ClosestPeersIter, seed: Seed) {
600 let now = Instant::now();
601 let mut rng = StdRng::from_seed(seed.0);
602
603 let mut expected = iter
604 .closest_peers
605 .values()
606 .map(|e| e.key)
607 .collect::<Vec<_>>();
608 let num_known = expected.len();
609 let max_parallelism = usize::min(iter.config.parallelism.get(), num_known);
610
611 let target = iter.target;
612 let mut remaining;
613 let mut num_failures = 0;
614
615 'finished: loop {
616 if expected.is_empty() {
617 break;
618 }
619 // Split off the next up to `parallelism` expected peers.
620 else if expected.len() < max_parallelism {
621 remaining = Vec::new();
622 } else {
623 remaining = expected.split_off(max_parallelism);
624 }
625
626 // Advance for maximum parallelism.
627 for k in expected.iter() {
628 match iter.next(now) {
629 PeersIterState::Finished => break 'finished,
630 PeersIterState::Waiting(Some(p)) => assert_eq!(&*p, k.preimage()),
631 PeersIterState::Waiting(None) => panic!("Expected another peer."),
632 PeersIterState::WaitingAtCapacity => {
633 panic!("Unexpectedly reached capacity.")
634 }
635 }
636 }
637 let num_waiting = iter.num_waiting();
638 assert_eq!(num_waiting, expected.len());
639
640 // Check the bounded parallelism.
641 if iter.at_capacity() {
642 assert_eq!(iter.next(now), PeersIterState::WaitingAtCapacity)
643 }
644
645 // Report results back to the iterator with a random number of "closer"
646 // peers or an error, thus finishing the "in-flight requests".
647 for (i, k) in expected.iter().enumerate() {
648 if rng.gen_bool(0.75) {
649 let num_closer = rng.gen_range(0..iter.config.num_results.get() + 1);
650 let closer_peers = random_peers(num_closer, &mut rng);
651 remaining.extend(closer_peers.iter().cloned().map(Key::from));
652 iter.on_success(k.preimage(), closer_peers);
653 } else {
654 num_failures += 1;
655 iter.on_failure(k.preimage());
656 }
657 assert_eq!(iter.num_waiting(), num_waiting - (i + 1));
658 }
659
660 // Re-sort the remaining expected peers for the next "round".
661 remaining.sort_by_key(|k| target.distance(&k));
662
663 expected = remaining
664 }
665
666 // The iterator must be finished.
667 assert_eq!(iter.next(now), PeersIterState::Finished);
668 assert_eq!(iter.state, State::Finished);
669
670 // Determine if all peers have been contacted by the iterator. This _must_ be
671 // the case if the iterator finished with fewer than the requested number
672 // of results.
673 let all_contacted = iter
674 .closest_peers
675 .values()
676 .all(|e| !matches!(e.state, PeerState::NotContacted | PeerState::Waiting { .. }));
677
678 let target = iter.target;
679 let num_results = iter.config.num_results;
680 let result = iter.into_result();
681 let closest = result.map(Key::from).collect::<Vec<_>>();
682
683 assert!(sorted(&target, &closest));
684
685 if closest.len() < num_results.get() {
686 // The iterator returned fewer results than requested. Therefore
687 // either the initial number of known peers must have been
688 // less than the desired number of results, or there must
689 // have been failures.
690 assert!(num_known < num_results.get() || num_failures > 0);
691 // All peers must have been contacted.
692 assert!(all_contacted, "Not all peers have been contacted.");
693 } else {
694 assert_eq!(num_results.get(), closest.len(), "Too many results.");
695 }
696 }
697
698 QuickCheck::new()
699 .tests(10)
700 .quickcheck(prop as fn(_, _) -> _)
701 }
702
703 #[test]
704 fn no_duplicates() {
705 fn prop(mut iter: ClosestPeersIter, closer: ArbitraryPeerId) -> bool {
706 let now = Instant::now();
707
708 let closer = vec![closer.0];
709
710 // A first peer reports a "closer" peer.
711 let peer1 = match iter.next(now) {
712 PeersIterState::Waiting(Some(p)) => p.into_owned(),
713 _ => panic!("No peer."),
714 };
715 iter.on_success(&peer1, closer.clone());
716 // Duplicate result from te same peer.
717 iter.on_success(&peer1, closer.clone());
718
719 // If there is a second peer, let it also report the same "closer" peer.
720 match iter.next(now) {
721 PeersIterState::Waiting(Some(p)) => {
722 let peer2 = p.into_owned();
723 assert!(iter.on_success(&peer2, closer.clone()))
724 }
725 PeersIterState::Finished => {}
726 _ => panic!("Unexpectedly iter state."),
727 };
728
729 // The "closer" peer must only be in the iterator once.
730 let n = iter
731 .closest_peers
732 .values()
733 .filter(|e| e.key.preimage() == &closer[0])
734 .count();
735 assert_eq!(n, 1);
736
737 true
738 }
739
740 QuickCheck::new()
741 .tests(10)
742 .quickcheck(prop as fn(_, _) -> _)
743 }
744
745 #[test]
746 fn timeout() {
747 fn prop(mut iter: ClosestPeersIter) -> bool {
748 let mut now = Instant::now();
749 let peer = iter
750 .closest_peers
751 .values()
752 .next()
753 .unwrap()
754 .key
755 .into_preimage();
756
757 // Poll the iterator for the first peer to be in progress.
758 match iter.next(now) {
759 PeersIterState::Waiting(Some(id)) => assert_eq!(&*id, &peer),
760 _ => panic!(),
761 }
762
763 // Artificially advance the clock.
764 now += iter.config.peer_timeout;
765
766 // Advancing the iterator again should mark the first peer as unresponsive.
767 let _ = iter.next(now);
768 match &iter.closest_peers.values().next().unwrap() {
769 Peer {
770 key,
771 state: PeerState::Unresponsive,
772 } => {
773 assert_eq!(key.preimage(), &peer);
774 }
775 Peer { state, .. } => panic!("Unexpected peer state: {state:?}"),
776 }
777
778 let finished = iter.is_finished();
779 iter.on_success(&peer, iter::empty());
780 let closest = iter.into_result().collect::<Vec<_>>();
781
782 if finished {
783 // Delivering results when the iterator already finished must have
784 // no effect.
785 assert_eq!(Vec::<PeerId>::new(), closest)
786 } else {
787 // Unresponsive peers can still deliver results while the iterator
788 // is not finished.
789 assert_eq!(vec![peer], closest)
790 }
791 true
792 }
793
794 QuickCheck::new().tests(10).quickcheck(prop as fn(_) -> _)
795 }
796
797 #[test]
798 fn without_success_try_up_to_k_peers() {
799 fn prop(mut iter: ClosestPeersIter) {
800 let now = Instant::now();
801
802 for _ in 0..(usize::min(iter.closest_peers.len(), K_VALUE.get())) {
803 match iter.next(now) {
804 PeersIterState::Waiting(Some(p)) => {
805 let peer = p.clone().into_owned();
806 iter.on_failure(&peer);
807 }
808 _ => panic!("Expected iterator to yield another peer to query."),
809 }
810 }
811
812 assert_eq!(PeersIterState::Finished, iter.next(now));
813 }
814
815 QuickCheck::new().tests(10).quickcheck(prop as fn(_))
816 }
817
818 #[test]
819 fn stalled_at_capacity() {
820 fn prop(mut iter: ClosestPeersIter) {
821 iter.state = State::Stalled;
822
823 for i in 0..usize::max(iter.config.parallelism.get(), iter.config.num_results.get()) {
824 iter.num_waiting = i;
825 assert!(
826 !iter.at_capacity(),
827 "Iterator should not be at capacity if less than \
828 `max(parallelism, num_results)` requests are waiting.",
829 )
830 }
831
832 iter.num_waiting =
833 usize::max(iter.config.parallelism.get(), iter.config.num_results.get());
834 assert!(
835 iter.at_capacity(),
836 "Iterator should be at capacity if `max(parallelism, num_results)` requests are \
837 waiting.",
838 )
839 }
840
841 QuickCheck::new().tests(10).quickcheck(prop as fn(_))
842 }
843}