libp2p_kad/query/peers/
closest.rs

1// Copyright 2019 Parity Technologies (UK) Ltd.
2//
3// Permission is hereby granted, free of charge, to any person obtaining a
4// copy of this software and associated documentation files (the "Software"),
5// to deal in the Software without restriction, including without limitation
6// the rights to use, copy, modify, merge, publish, distribute, sublicense,
7// and/or sell copies of the Software, and to permit persons to whom the
8// Software is furnished to do so, subject to the following conditions:
9//
10// The above copyright notice and this permission notice shall be included in
11// all copies or substantial portions of the Software.
12//
13// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
14// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
18// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
19// DEALINGS IN THE SOFTWARE.
20
21use std::{
22    collections::btree_map::{BTreeMap, Entry},
23    num::NonZeroUsize,
24    time::Duration,
25};
26
27use web_time::Instant;
28
29use super::*;
30use crate::{
31    kbucket::{Distance, Key, KeyBytes},
32    ALPHA_VALUE, K_VALUE,
33};
34
35pub(crate) mod disjoint;
36/// A peer iterator for a dynamically changing list of peers, sorted by increasing
37/// distance to a chosen target.
38#[derive(Debug, Clone)]
39pub struct ClosestPeersIter {
40    config: ClosestPeersIterConfig,
41
42    /// The target whose distance to any peer determines the position of
43    /// the peer in the iterator.
44    target: KeyBytes,
45
46    /// The internal iterator state.
47    state: State,
48
49    /// The closest peers to the target, ordered by increasing distance.
50    closest_peers: BTreeMap<Distance, Peer>,
51
52    /// The number of peers for which the iterator is currently waiting for results.
53    num_waiting: usize,
54}
55
56/// Configuration for a `ClosestPeersIter`.
57#[derive(Debug, Clone)]
58pub struct ClosestPeersIterConfig {
59    /// Allowed level of parallelism.
60    ///
61    /// The `α` parameter in the Kademlia paper. The maximum number of peers that
62    /// the iterator is allowed to wait for in parallel while iterating towards the closest
63    /// nodes to a target. Defaults to `ALPHA_VALUE`.
64    pub parallelism: NonZeroUsize,
65
66    /// Number of results (closest peers) to search for.
67    ///
68    /// The number of closest peers for which the iterator must obtain successful results
69    /// in order to finish successfully. Defaults to `K_VALUE`.
70    pub num_results: NonZeroUsize,
71
72    /// The timeout for a single peer.
73    ///
74    /// If a successful result is not reported for a peer within this timeout
75    /// window, the iterator considers the peer unresponsive and will not wait for
76    /// the peer when evaluating the termination conditions, until and unless a
77    /// result is delivered. Defaults to `10` seconds.
78    pub peer_timeout: Duration,
79}
80
81impl Default for ClosestPeersIterConfig {
82    fn default() -> Self {
83        ClosestPeersIterConfig {
84            parallelism: ALPHA_VALUE,
85            num_results: K_VALUE,
86            peer_timeout: Duration::from_secs(10),
87        }
88    }
89}
90
91impl ClosestPeersIter {
92    /// Creates a new iterator with a default configuration.
93    pub fn new<I>(target: KeyBytes, known_closest_peers: I) -> Self
94    where
95        I: IntoIterator<Item = Key<PeerId>>,
96    {
97        Self::with_config(
98            ClosestPeersIterConfig::default(),
99            target,
100            known_closest_peers,
101        )
102    }
103
104    /// Creates a new iterator with the given configuration.
105    pub fn with_config<I, T>(
106        config: ClosestPeersIterConfig,
107        target: T,
108        known_closest_peers: I,
109    ) -> Self
110    where
111        I: IntoIterator<Item = Key<PeerId>>,
112        T: Into<KeyBytes>,
113    {
114        let target = target.into();
115
116        // Initialise the closest peers to start the iterator with.
117        let closest_peers = BTreeMap::from_iter(
118            known_closest_peers
119                .into_iter()
120                .map(|key| {
121                    let distance = key.distance(&target);
122                    let state = PeerState::NotContacted;
123                    (distance, Peer { key, state })
124                })
125                .take(K_VALUE.into()),
126        );
127
128        // The iterator initially makes progress by iterating towards the target.
129        let state = State::Iterating { no_progress: 0 };
130
131        ClosestPeersIter {
132            config,
133            target,
134            state,
135            closest_peers,
136            num_waiting: 0,
137        }
138    }
139
140    /// Callback for delivering the result of a successful request to a peer.
141    ///
142    /// Delivering results of requests back to the iterator allows the iterator to make
143    /// progress. The iterator is said to make progress either when the given
144    /// `closer_peers` contain a peer closer to the target than any peer seen so far,
145    /// or when the iterator did not yet accumulate `num_results` closest peers and
146    /// `closer_peers` contains a new peer, regardless of its distance to the target.
147    ///
148    /// If the iterator is currently waiting for a result from `peer`,
149    /// the iterator state is updated and `true` is returned. In that
150    /// case, after calling this function, `next` should eventually be
151    /// called again to obtain the new state of the iterator.
152    ///
153    /// If the iterator is finished, it is not currently waiting for a
154    /// result from `peer`, or a result for `peer` has already been reported,
155    /// calling this function has no effect and `false` is returned.
156    pub fn on_success<I>(&mut self, peer: &PeerId, closer_peers: I) -> bool
157    where
158        I: IntoIterator<Item = PeerId>,
159    {
160        if let State::Finished = self.state {
161            return false;
162        }
163
164        let key = Key::from(*peer);
165        let distance = key.distance(&self.target);
166
167        // Mark the peer as succeeded.
168        match self.closest_peers.entry(distance) {
169            Entry::Vacant(..) => return false,
170            Entry::Occupied(mut e) => match e.get().state {
171                PeerState::Waiting(..) => {
172                    debug_assert!(self.num_waiting > 0);
173                    self.num_waiting -= 1;
174                    e.get_mut().state = PeerState::Succeeded;
175                }
176                PeerState::Unresponsive => {
177                    e.get_mut().state = PeerState::Succeeded;
178                }
179                PeerState::NotContacted | PeerState::Failed | PeerState::Succeeded => return false,
180            },
181        }
182
183        let mut cur_range = distance;
184        let num_results = self.config.num_results.get();
185        // furthest_peer is the furthest peer in range among the closest_peers
186        let furthest_peer = self
187            .closest_peers
188            .iter()
189            .enumerate()
190            .nth(num_results - 1)
191            .map(|(_, peer)| peer)
192            .or_else(|| self.closest_peers.iter().last());
193        if let Some((dist, _)) = furthest_peer {
194            cur_range = *dist;
195        }
196
197        // Incorporate the reported closer peers into the iterator.
198        //
199        // The iterator makes progress if:
200        //     1, the iterator did not yet accumulate enough closest peers.
201        //   OR
202        //     2, any of the new peers is closer to the target than any peer seen so far
203        //        (i.e. is the first entry after being incorporated)
204        let mut progress = self.closest_peers.len() < self.config.num_results.get();
205        for peer in closer_peers {
206            let key = peer.into();
207            let distance = self.target.distance(&key);
208            let peer = Peer {
209                key,
210                state: PeerState::NotContacted,
211            };
212
213            let is_first_insert = match self.closest_peers.entry(distance) {
214                Entry::Occupied(_) => false,
215                Entry::Vacant(entry) => {
216                    entry.insert(peer);
217                    true
218                }
219            };
220
221            progress = (is_first_insert && distance < cur_range) || progress;
222        }
223
224        // Update the iterator state.
225        self.state = match self.state {
226            State::Iterating { no_progress } => {
227                let no_progress = if progress { 0 } else { no_progress + 1 };
228                if no_progress >= self.config.parallelism.get() {
229                    State::Stalled
230                } else {
231                    State::Iterating { no_progress }
232                }
233            }
234            State::Stalled => {
235                if progress {
236                    State::Iterating { no_progress: 0 }
237                } else {
238                    State::Stalled
239                }
240            }
241            State::Finished => State::Finished,
242        };
243
244        true
245    }
246
247    /// Callback for informing the iterator about a failed request to a peer.
248    ///
249    /// If the iterator is currently waiting for a result from `peer`,
250    /// the iterator state is updated and `true` is returned. In that
251    /// case, after calling this function, `next` should eventually be
252    /// called again to obtain the new state of the iterator.
253    ///
254    /// If the iterator is finished, it is not currently waiting for a
255    /// result from `peer`, or a result for `peer` has already been reported,
256    /// calling this function has no effect and `false` is returned.
257    pub fn on_failure(&mut self, peer: &PeerId) -> bool {
258        if let State::Finished = self.state {
259            return false;
260        }
261
262        let key = Key::from(*peer);
263        let distance = key.distance(&self.target);
264
265        match self.closest_peers.entry(distance) {
266            Entry::Vacant(_) => return false,
267            Entry::Occupied(mut e) => match e.get().state {
268                PeerState::Waiting(_) => {
269                    debug_assert!(self.num_waiting > 0);
270                    self.num_waiting -= 1;
271                    e.get_mut().state = PeerState::Failed
272                }
273                PeerState::Unresponsive => e.get_mut().state = PeerState::Failed,
274                PeerState::NotContacted | PeerState::Failed | PeerState::Succeeded => return false,
275            },
276        }
277
278        true
279    }
280
281    /// Returns the list of peers for which the iterator is currently waiting
282    /// for results.
283    pub fn waiting(&self) -> impl Iterator<Item = &PeerId> {
284        self.closest_peers
285            .values()
286            .filter_map(|peer| match peer.state {
287                PeerState::Waiting(..) => Some(peer.key.preimage()),
288                _ => None,
289            })
290    }
291
292    /// Returns the number of peers for which the iterator is currently
293    /// waiting for results.
294    pub fn num_waiting(&self) -> usize {
295        self.num_waiting
296    }
297
298    /// Returns true if the iterator is waiting for a response from the given peer.
299    pub fn is_waiting(&self, peer: &PeerId) -> bool {
300        self.waiting().any(|p| peer == p)
301    }
302
303    /// Advances the state of the iterator, potentially getting a new peer to contact.
304    pub fn next(&mut self, now: Instant) -> PeersIterState<'_> {
305        if let State::Finished = self.state {
306            return PeersIterState::Finished;
307        }
308
309        // Count the number of peers that returned a result. If there is a
310        // request in progress to one of the `num_results` closest peers, the
311        // counter is set to `None` as the iterator can only finish once
312        // `num_results` closest peers have responded (or there are no more
313        // peers to contact, see `num_waiting`).
314        let mut result_counter = Some(0);
315
316        // Check if the iterator is at capacity w.r.t. the allowed parallelism.
317        let at_capacity = self.at_capacity();
318
319        for peer in self.closest_peers.values_mut() {
320            match peer.state {
321                PeerState::Waiting(timeout) => {
322                    if now >= timeout {
323                        // Unresponsive peers no longer count towards the limit for the
324                        // bounded parallelism, though they might still be ongoing and
325                        // their results can still be delivered to the iterator.
326                        debug_assert!(self.num_waiting > 0);
327                        self.num_waiting -= 1;
328                        peer.state = PeerState::Unresponsive
329                    } else if at_capacity {
330                        // The iterator is still waiting for a result from a peer and is
331                        // at capacity w.r.t. the maximum number of peers being waited on.
332                        return PeersIterState::WaitingAtCapacity;
333                    } else {
334                        // The iterator is still waiting for a result from a peer and the
335                        // `result_counter` did not yet reach `num_results`. Therefore
336                        // the iterator is not yet done, regardless of already successful
337                        // queries to peers farther from the target.
338                        result_counter = None;
339                    }
340                }
341
342                PeerState::Succeeded => {
343                    if let Some(ref mut cnt) = result_counter {
344                        *cnt += 1;
345                        // If `num_results` successful results have been delivered for the
346                        // closest peers, the iterator is done.
347                        if *cnt >= self.config.num_results.get() {
348                            self.state = State::Finished;
349                            return PeersIterState::Finished;
350                        }
351                    }
352                }
353
354                PeerState::NotContacted => {
355                    if !at_capacity {
356                        let timeout = now + self.config.peer_timeout;
357                        peer.state = PeerState::Waiting(timeout);
358                        self.num_waiting += 1;
359                        return PeersIterState::Waiting(Some(Cow::Borrowed(peer.key.preimage())));
360                    } else {
361                        return PeersIterState::WaitingAtCapacity;
362                    }
363                }
364
365                PeerState::Unresponsive | PeerState::Failed => {
366                    // Skip over unresponsive or failed peers.
367                }
368            }
369        }
370
371        if self.num_waiting > 0 {
372            // The iterator is still waiting for results and not at capacity w.r.t.
373            // the allowed parallelism, but there are no new peers to contact
374            // at the moment.
375            PeersIterState::Waiting(None)
376        } else {
377            // The iterator is finished because all available peers have been contacted
378            // and the iterator is not waiting for any more results.
379            self.state = State::Finished;
380            PeersIterState::Finished
381        }
382    }
383
384    /// Immediately transitions the iterator to [`PeersIterState::Finished`].
385    pub fn finish(&mut self) {
386        self.state = State::Finished
387    }
388
389    /// Checks whether the iterator has finished.
390    pub fn is_finished(&self) -> bool {
391        self.state == State::Finished
392    }
393
394    /// Consumes the iterator, returning the closest peers.
395    pub fn into_result(self) -> impl Iterator<Item = PeerId> {
396        self.closest_peers
397            .into_iter()
398            .filter_map(|(_, peer)| {
399                if let PeerState::Succeeded = peer.state {
400                    Some(peer.key.into_preimage())
401                } else {
402                    None
403                }
404            })
405            .take(self.config.num_results.get())
406    }
407
408    /// Checks if the iterator is at capacity w.r.t. the permitted parallelism.
409    ///
410    /// While the iterator is stalled, up to `num_results` parallel requests
411    /// are allowed. This is a slightly more permissive variant of the
412    /// requirement that the initiator "resends the FIND_NODE to all of the
413    /// k closest nodes it has not already queried".
414    fn at_capacity(&self) -> bool {
415        match self.state {
416            State::Stalled => {
417                self.num_waiting
418                    >= usize::max(self.config.num_results.get(), self.config.parallelism.get())
419            }
420            State::Iterating { .. } => self.num_waiting >= self.config.parallelism.get(),
421            State::Finished => true,
422        }
423    }
424}
425
426////////////////////////////////////////////////////////////////////////////////
427// Private state
428
429/// Internal state of the iterator.
430#[derive(Debug, PartialEq, Eq, Copy, Clone)]
431enum State {
432    /// The iterator is making progress by iterating towards `num_results` closest
433    /// peers to the target with a maximum of `parallelism` peers for which the
434    /// iterator is waiting for results at a time.
435    ///
436    /// > **Note**: When the iterator switches back to `Iterating` after being
437    /// > `Stalled`, it may temporarily be waiting for more than `parallelism`
438    /// > results from peers, with new peers only being considered once
439    /// > the number pending results drops below `parallelism`.
440    Iterating {
441        /// The number of consecutive results that did not yield a peer closer
442        /// to the target. When this number reaches `parallelism` and no new
443        /// peer was discovered or at least `num_results` peers are known to
444        /// the iterator, it is considered `Stalled`.
445        no_progress: usize,
446    },
447
448    /// A iterator is stalled when it did not make progress after `parallelism`
449    /// consecutive successful results (see `on_success`).
450    ///
451    /// While the iterator is stalled, the maximum allowed parallelism for pending
452    /// results is increased to `num_results` in an attempt to finish the iterator.
453    /// If the iterator can make progress again upon receiving the remaining
454    /// results, it switches back to `Iterating`. Otherwise it will be finished.
455    Stalled,
456
457    /// The iterator is finished.
458    ///
459    /// A iterator finishes either when it has collected `num_results` results
460    /// from the closest peers (not counting those that failed or are unresponsive)
461    /// or because the iterator ran out of peers that have not yet delivered
462    /// results (or failed).
463    Finished,
464}
465
466/// Representation of a peer in the context of a iterator.
467#[derive(Debug, Clone)]
468struct Peer {
469    key: Key<PeerId>,
470    state: PeerState,
471}
472
473/// The state of a single `Peer`.
474#[derive(Debug, Copy, Clone)]
475enum PeerState {
476    /// The peer has not yet been contacted.
477    ///
478    /// This is the starting state for every peer.
479    NotContacted,
480
481    /// The iterator is waiting for a result from the peer.
482    Waiting(Instant),
483
484    /// A result was not delivered for the peer within the configured timeout.
485    ///
486    /// The peer is not taken into account for the termination conditions
487    /// of the iterator until and unless it responds.
488    Unresponsive,
489
490    /// Obtaining a result from the peer has failed.
491    ///
492    /// This is a final state, reached as a result of a call to `on_failure`.
493    Failed,
494
495    /// A successful result from the peer has been delivered.
496    ///
497    /// This is a final state, reached as a result of a call to `on_success`.
498    Succeeded,
499}
500
501#[cfg(test)]
502mod tests {
503    use std::iter;
504
505    use libp2p_core::multihash::Multihash;
506    use quickcheck::*;
507    use rand::{rngs::StdRng, Rng, SeedableRng};
508
509    use super::*;
510    use crate::SHA_256_MH;
511
512    fn random_peers<R: Rng>(n: usize, g: &mut R) -> Vec<PeerId> {
513        (0..n)
514            .map(|_| {
515                PeerId::from_multihash(Multihash::wrap(SHA_256_MH, &g.gen::<[u8; 32]>()).unwrap())
516                    .unwrap()
517            })
518            .collect()
519    }
520
521    fn sorted<T: AsRef<KeyBytes>>(target: &T, peers: &[Key<PeerId>]) -> bool {
522        peers
523            .windows(2)
524            .all(|w| w[0].distance(&target) < w[1].distance(&target))
525    }
526
527    #[derive(Clone, Debug)]
528    struct ArbitraryPeerId(PeerId);
529
530    impl Arbitrary for ArbitraryPeerId {
531        fn arbitrary(g: &mut Gen) -> ArbitraryPeerId {
532            let hash: [u8; 32] = core::array::from_fn(|_| u8::arbitrary(g));
533            let peer_id =
534                PeerId::from_multihash(Multihash::wrap(SHA_256_MH, &hash).unwrap()).unwrap();
535            ArbitraryPeerId(peer_id)
536        }
537    }
538
539    impl Arbitrary for ClosestPeersIter {
540        fn arbitrary(g: &mut Gen) -> ClosestPeersIter {
541            let known_closest_peers = (0..g.gen_range(1..60u8))
542                .map(|_| Key::from(ArbitraryPeerId::arbitrary(g).0))
543                .collect::<Vec<_>>();
544            let target = Key::from(ArbitraryPeerId::arbitrary(g).0);
545            let config = ClosestPeersIterConfig {
546                parallelism: NonZeroUsize::new(g.gen_range(1..10)).unwrap(),
547                num_results: NonZeroUsize::new(g.gen_range(1..25)).unwrap(),
548                peer_timeout: Duration::from_secs(g.gen_range(10..30)),
549            };
550            ClosestPeersIter::with_config(config, target, known_closest_peers)
551        }
552    }
553
554    #[derive(Clone, Debug)]
555    struct Seed([u8; 32]);
556
557    impl Arbitrary for Seed {
558        fn arbitrary(g: &mut Gen) -> Seed {
559            let seed = core::array::from_fn(|_| u8::arbitrary(g));
560            Seed(seed)
561        }
562    }
563
564    #[test]
565    fn new_iter() {
566        fn prop(iter: ClosestPeersIter) {
567            let target = iter.target;
568
569            let (keys, states): (Vec<_>, Vec<_>) = iter
570                .closest_peers
571                .values()
572                .map(|e| (e.key, &e.state))
573                .unzip();
574
575            let none_contacted = states.iter().all(|s| matches!(s, PeerState::NotContacted));
576
577            assert!(none_contacted, "Unexpected peer state in new iterator.");
578            assert!(
579                sorted(&target, &keys),
580                "Closest peers in new iterator not sorted by distance to target."
581            );
582            assert_eq!(
583                iter.num_waiting(),
584                0,
585                "Unexpected peers in progress in new iterator."
586            );
587            assert_eq!(
588                iter.into_result().count(),
589                0,
590                "Unexpected closest peers in new iterator"
591            );
592        }
593
594        QuickCheck::new().tests(10).quickcheck(prop as fn(_) -> _)
595    }
596
597    #[test]
598    fn termination_and_parallelism() {
599        fn prop(mut iter: ClosestPeersIter, seed: Seed) {
600            let now = Instant::now();
601            let mut rng = StdRng::from_seed(seed.0);
602
603            let mut expected = iter
604                .closest_peers
605                .values()
606                .map(|e| e.key)
607                .collect::<Vec<_>>();
608            let num_known = expected.len();
609            let max_parallelism = usize::min(iter.config.parallelism.get(), num_known);
610
611            let target = iter.target;
612            let mut remaining;
613            let mut num_failures = 0;
614
615            'finished: loop {
616                if expected.is_empty() {
617                    break;
618                }
619                // Split off the next up to `parallelism` expected peers.
620                else if expected.len() < max_parallelism {
621                    remaining = Vec::new();
622                } else {
623                    remaining = expected.split_off(max_parallelism);
624                }
625
626                // Advance for maximum parallelism.
627                for k in expected.iter() {
628                    match iter.next(now) {
629                        PeersIterState::Finished => break 'finished,
630                        PeersIterState::Waiting(Some(p)) => assert_eq!(&*p, k.preimage()),
631                        PeersIterState::Waiting(None) => panic!("Expected another peer."),
632                        PeersIterState::WaitingAtCapacity => {
633                            panic!("Unexpectedly reached capacity.")
634                        }
635                    }
636                }
637                let num_waiting = iter.num_waiting();
638                assert_eq!(num_waiting, expected.len());
639
640                // Check the bounded parallelism.
641                if iter.at_capacity() {
642                    assert_eq!(iter.next(now), PeersIterState::WaitingAtCapacity)
643                }
644
645                // Report results back to the iterator with a random number of "closer"
646                // peers or an error, thus finishing the "in-flight requests".
647                for (i, k) in expected.iter().enumerate() {
648                    if rng.gen_bool(0.75) {
649                        let num_closer = rng.gen_range(0..iter.config.num_results.get() + 1);
650                        let closer_peers = random_peers(num_closer, &mut rng);
651                        remaining.extend(closer_peers.iter().cloned().map(Key::from));
652                        iter.on_success(k.preimage(), closer_peers);
653                    } else {
654                        num_failures += 1;
655                        iter.on_failure(k.preimage());
656                    }
657                    assert_eq!(iter.num_waiting(), num_waiting - (i + 1));
658                }
659
660                // Re-sort the remaining expected peers for the next "round".
661                remaining.sort_by_key(|k| target.distance(&k));
662
663                expected = remaining
664            }
665
666            // The iterator must be finished.
667            assert_eq!(iter.next(now), PeersIterState::Finished);
668            assert_eq!(iter.state, State::Finished);
669
670            // Determine if all peers have been contacted by the iterator. This _must_ be
671            // the case if the iterator finished with fewer than the requested number
672            // of results.
673            let all_contacted = iter
674                .closest_peers
675                .values()
676                .all(|e| !matches!(e.state, PeerState::NotContacted | PeerState::Waiting { .. }));
677
678            let target = iter.target;
679            let num_results = iter.config.num_results;
680            let result = iter.into_result();
681            let closest = result.map(Key::from).collect::<Vec<_>>();
682
683            assert!(sorted(&target, &closest));
684
685            if closest.len() < num_results.get() {
686                // The iterator returned fewer results than requested. Therefore
687                // either the initial number of known peers must have been
688                // less than the desired number of results, or there must
689                // have been failures.
690                assert!(num_known < num_results.get() || num_failures > 0);
691                // All peers must have been contacted.
692                assert!(all_contacted, "Not all peers have been contacted.");
693            } else {
694                assert_eq!(num_results.get(), closest.len(), "Too  many results.");
695            }
696        }
697
698        QuickCheck::new()
699            .tests(10)
700            .quickcheck(prop as fn(_, _) -> _)
701    }
702
703    #[test]
704    fn no_duplicates() {
705        fn prop(mut iter: ClosestPeersIter, closer: ArbitraryPeerId) -> bool {
706            let now = Instant::now();
707
708            let closer = vec![closer.0];
709
710            // A first peer reports a "closer" peer.
711            let peer1 = match iter.next(now) {
712                PeersIterState::Waiting(Some(p)) => p.into_owned(),
713                _ => panic!("No peer."),
714            };
715            iter.on_success(&peer1, closer.clone());
716            // Duplicate result from te same peer.
717            iter.on_success(&peer1, closer.clone());
718
719            // If there is a second peer, let it also report the same "closer" peer.
720            match iter.next(now) {
721                PeersIterState::Waiting(Some(p)) => {
722                    let peer2 = p.into_owned();
723                    assert!(iter.on_success(&peer2, closer.clone()))
724                }
725                PeersIterState::Finished => {}
726                _ => panic!("Unexpectedly iter state."),
727            };
728
729            // The "closer" peer must only be in the iterator once.
730            let n = iter
731                .closest_peers
732                .values()
733                .filter(|e| e.key.preimage() == &closer[0])
734                .count();
735            assert_eq!(n, 1);
736
737            true
738        }
739
740        QuickCheck::new()
741            .tests(10)
742            .quickcheck(prop as fn(_, _) -> _)
743    }
744
745    #[test]
746    fn timeout() {
747        fn prop(mut iter: ClosestPeersIter) -> bool {
748            let mut now = Instant::now();
749            let peer = iter
750                .closest_peers
751                .values()
752                .next()
753                .unwrap()
754                .key
755                .into_preimage();
756
757            // Poll the iterator for the first peer to be in progress.
758            match iter.next(now) {
759                PeersIterState::Waiting(Some(id)) => assert_eq!(&*id, &peer),
760                _ => panic!(),
761            }
762
763            // Artificially advance the clock.
764            now += iter.config.peer_timeout;
765
766            // Advancing the iterator again should mark the first peer as unresponsive.
767            let _ = iter.next(now);
768            match &iter.closest_peers.values().next().unwrap() {
769                Peer {
770                    key,
771                    state: PeerState::Unresponsive,
772                } => {
773                    assert_eq!(key.preimage(), &peer);
774                }
775                Peer { state, .. } => panic!("Unexpected peer state: {state:?}"),
776            }
777
778            let finished = iter.is_finished();
779            iter.on_success(&peer, iter::empty());
780            let closest = iter.into_result().collect::<Vec<_>>();
781
782            if finished {
783                // Delivering results when the iterator already finished must have
784                // no effect.
785                assert_eq!(Vec::<PeerId>::new(), closest)
786            } else {
787                // Unresponsive peers can still deliver results while the iterator
788                // is not finished.
789                assert_eq!(vec![peer], closest)
790            }
791            true
792        }
793
794        QuickCheck::new().tests(10).quickcheck(prop as fn(_) -> _)
795    }
796
797    #[test]
798    fn without_success_try_up_to_k_peers() {
799        fn prop(mut iter: ClosestPeersIter) {
800            let now = Instant::now();
801
802            for _ in 0..(usize::min(iter.closest_peers.len(), K_VALUE.get())) {
803                match iter.next(now) {
804                    PeersIterState::Waiting(Some(p)) => {
805                        let peer = p.clone().into_owned();
806                        iter.on_failure(&peer);
807                    }
808                    _ => panic!("Expected iterator to yield another peer to query."),
809                }
810            }
811
812            assert_eq!(PeersIterState::Finished, iter.next(now));
813        }
814
815        QuickCheck::new().tests(10).quickcheck(prop as fn(_))
816    }
817
818    #[test]
819    fn stalled_at_capacity() {
820        fn prop(mut iter: ClosestPeersIter) {
821            iter.state = State::Stalled;
822
823            for i in 0..usize::max(iter.config.parallelism.get(), iter.config.num_results.get()) {
824                iter.num_waiting = i;
825                assert!(
826                    !iter.at_capacity(),
827                    "Iterator should not be at capacity if less than \
828                     `max(parallelism, num_results)` requests are waiting.",
829                )
830            }
831
832            iter.num_waiting =
833                usize::max(iter.config.parallelism.get(), iter.config.num_results.get());
834            assert!(
835                iter.at_capacity(),
836                "Iterator should be at capacity if `max(parallelism, num_results)` requests are \
837                 waiting.",
838            )
839        }
840
841        QuickCheck::new().tests(10).quickcheck(prop as fn(_))
842    }
843}