libp2p_kad/query/peers/closest/
disjoint.rs

1// Copyright 2020 Parity Technologies (UK) Ltd.
2//
3// Permission is hereby granted, free of charge, to any person obtaining a
4// copy of this software and associated documentation files (the "Software"),
5// to deal in the Software without restriction, including without limitation
6// the rights to use, copy, modify, merge, publish, distribute, sublicense,
7// and/or sell copies of the Software, and to permit persons to whom the
8// Software is furnished to do so, subject to the following conditions:
9//
10// The above copyright notice and this permission notice shall be included in
11// all copies or substantial portions of the Software.
12//
13// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
14// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
18// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
19// DEALINGS IN THE SOFTWARE.
20
21use std::{
22    collections::HashMap,
23    iter::{Cycle, Map, Peekable},
24    ops::{Index, IndexMut, Range},
25};
26
27use super::*;
28
29/// Wraps around a set of [`ClosestPeersIter`], enforcing a disjoint discovery
30/// path per configured parallelism according to the S/Kademlia paper.
31pub(crate) struct ClosestDisjointPeersIter {
32    target: KeyBytes,
33
34    /// The set of wrapped [`ClosestPeersIter`].
35    iters: Vec<ClosestPeersIter>,
36    /// Order in which to query the iterators ensuring fairness across
37    /// [`ClosestPeersIter::next`] calls.
38    iter_order: Cycle<Map<Range<usize>, fn(usize) -> IteratorIndex>>,
39
40    /// Mapping of contacted peers by their [`PeerId`] to [`PeerState`]
41    /// containing the corresponding iterator indices as well as the response
42    /// state.
43    ///
44    /// Used to track which iterator contacted which peer. See [`PeerState`]
45    /// for details.
46    contacted_peers: HashMap<PeerId, PeerState>,
47}
48
49impl ClosestDisjointPeersIter {
50    /// Creates a new iterator with a default configuration.
51    #[cfg(test)]
52    pub(crate) fn new<I>(target: KeyBytes, known_closest_peers: I) -> Self
53    where
54        I: IntoIterator<Item = Key<PeerId>>,
55    {
56        Self::with_config(
57            ClosestPeersIterConfig::default(),
58            target,
59            known_closest_peers,
60        )
61    }
62
63    /// Creates a new iterator with the given configuration.
64    pub(crate) fn with_config<I, T>(
65        config: ClosestPeersIterConfig,
66        target: T,
67        known_closest_peers: I,
68    ) -> Self
69    where
70        I: IntoIterator<Item = Key<PeerId>>,
71        T: Into<KeyBytes> + Clone,
72    {
73        let peers = known_closest_peers
74            .into_iter()
75            .take(K_VALUE.get())
76            .collect::<Vec<_>>();
77        let iters = (0..config.parallelism.get())
78            // NOTE: All [`ClosestPeersIter`] share the same set of peers at
79            // initialization. The [`ClosestDisjointPeersIter.contacted_peers`]
80            // mapping ensures that a successful response from a peer is only
81            // ever passed to a single [`ClosestPeersIter`]. See
82            // [`ClosestDisjointPeersIter::on_success`] for details.
83            .map(|_| ClosestPeersIter::with_config(config.clone(), target.clone(), peers.clone()))
84            .collect::<Vec<_>>();
85
86        let iters_len = iters.len();
87
88        ClosestDisjointPeersIter {
89            target: target.into(),
90            iters,
91            iter_order: (0..iters_len)
92                .map(IteratorIndex as fn(usize) -> IteratorIndex)
93                .cycle(),
94            contacted_peers: HashMap::new(),
95        }
96    }
97
98    /// Callback for informing the iterator about a failed request to a peer.
99    ///
100    /// If the iterator is currently waiting for a result from `peer`,
101    /// the iterator state is updated and `true` is returned. In that
102    /// case, after calling this function, `next` should eventually be
103    /// called again to obtain the new state of the iterator.
104    ///
105    /// If the iterator is finished, it is not currently waiting for a
106    /// result from `peer`, or a result for `peer` has already been reported,
107    /// calling this function has no effect and `false` is returned.
108    pub(crate) fn on_failure(&mut self, peer: &PeerId) -> bool {
109        let mut updated = false;
110
111        if let Some(PeerState {
112            initiated_by,
113            response,
114        }) = self.contacted_peers.get_mut(peer)
115        {
116            updated = self.iters[*initiated_by].on_failure(peer);
117
118            if updated {
119                *response = ResponseState::Failed;
120            }
121
122            for (i, iter) in &mut self.iters.iter_mut().enumerate() {
123                if IteratorIndex(i) != *initiated_by {
124                    // This iterator never triggered an actual request to the
125                    // given peer - thus ignore the returned boolean.
126                    iter.on_failure(peer);
127                }
128            }
129        }
130
131        updated
132    }
133
134    /// Callback for delivering the result of a successful request to a peer.
135    ///
136    /// Delivering results of requests back to the iterator allows the iterator
137    /// to make progress. The iterator is said to make progress either when the
138    /// given `closer_peers` contain a peer closer to the target than any peer
139    /// seen so far, or when the iterator did not yet accumulate `num_results`
140    /// closest peers and `closer_peers` contains a new peer, regardless of its
141    /// distance to the target.
142    ///
143    /// If the iterator is currently waiting for a result from `peer`,
144    /// the iterator state is updated and `true` is returned. In that
145    /// case, after calling this function, `next` should eventually be
146    /// called again to obtain the new state of the iterator.
147    ///
148    /// If the iterator is finished, it is not currently waiting for a
149    /// result from `peer`, or a result for `peer` has already been reported,
150    /// calling this function has no effect and `false` is returned.
151    pub(crate) fn on_success<I>(&mut self, peer: &PeerId, closer_peers: I) -> bool
152    where
153        I: IntoIterator<Item = PeerId>,
154    {
155        let mut updated = false;
156
157        if let Some(PeerState {
158            initiated_by,
159            response,
160        }) = self.contacted_peers.get_mut(peer)
161        {
162            // Pass the new `closer_peers` to the iterator that first yielded
163            // the peer.
164            updated = self.iters[*initiated_by].on_success(peer, closer_peers);
165
166            if updated {
167                // Mark the response as succeeded for future iterators yielding
168                // this peer. There is no need to keep the `closer_peers`
169                // around, given that they are only passed to the first
170                // iterator.
171                *response = ResponseState::Succeeded;
172            }
173
174            for (i, iter) in &mut self.iters.iter_mut().enumerate() {
175                if IteratorIndex(i) != *initiated_by {
176                    // Only report the success to all remaining not-first
177                    // iterators. Do not pass the `closer_peers` in order to
178                    // uphold the S/Kademlia disjoint paths guarantee.
179                    //
180                    // This iterator never triggered an actual request to the
181                    // given peer - thus ignore the returned boolean.
182                    iter.on_success(peer, std::iter::empty());
183                }
184            }
185        }
186
187        updated
188    }
189
190    pub(crate) fn next(&mut self, now: Instant) -> PeersIterState<'_> {
191        let mut state = None;
192
193        // Ensure querying each iterator at most once.
194        for _ in 0..self.iters.len() {
195            let i = self.iter_order.next().expect("Cycle never ends.");
196            let iter = &mut self.iters[i];
197
198            loop {
199                match iter.next(now) {
200                    PeersIterState::Waiting(None) => {
201                        match state {
202                            Some(PeersIterState::Waiting(Some(_))) => {
203                                // [`ClosestDisjointPeersIter::next`] returns immediately once a
204                                // [`ClosestPeersIter`] yielded a peer. Thus this state is
205                                // unreachable.
206                                unreachable!();
207                            }
208                            Some(PeersIterState::Waiting(None)) => {}
209                            Some(PeersIterState::WaitingAtCapacity) => {
210                                // At least one ClosestPeersIter is no longer at capacity, thus the
211                                // composite ClosestDisjointPeersIter is no longer at capacity.
212                                state = Some(PeersIterState::Waiting(None))
213                            }
214                            Some(PeersIterState::Finished) => {
215                                // `state` is never set to `Finished`.
216                                unreachable!();
217                            }
218                            None => state = Some(PeersIterState::Waiting(None)),
219                        };
220
221                        break;
222                    }
223                    PeersIterState::Waiting(Some(peer)) => {
224                        match self.contacted_peers.get_mut(&*peer) {
225                            Some(PeerState { response, .. }) => {
226                                // Another iterator already contacted this peer.
227                                let peer = peer.into_owned();
228
229                                match response {
230                                    // The iterator will be notified later whether the given node
231                                    // was successfully contacted or not. See
232                                    // [`ClosestDisjointPeersIter::on_success`] for details.
233                                    ResponseState::Waiting => {}
234                                    ResponseState::Succeeded => {
235                                        // Given that iterator was not the first to contact the peer
236                                        // it will not be made aware of the closer peers discovered
237                                        // to uphold the S/Kademlia disjoint paths guarantee. See
238                                        // [`ClosestDisjointPeersIter::on_success`] for details.
239                                        iter.on_success(&peer, std::iter::empty());
240                                    }
241                                    ResponseState::Failed => {
242                                        iter.on_failure(&peer);
243                                    }
244                                }
245                            }
246                            None => {
247                                // The iterator is the first to contact this peer.
248                                self.contacted_peers
249                                    .insert(peer.clone().into_owned(), PeerState::new(i));
250                                return PeersIterState::Waiting(Some(Cow::Owned(
251                                    peer.into_owned(),
252                                )));
253                            }
254                        }
255                    }
256                    PeersIterState::WaitingAtCapacity => {
257                        match state {
258                            Some(PeersIterState::Waiting(Some(_))) => {
259                                // [`ClosestDisjointPeersIter::next`] returns immediately once a
260                                // [`ClosestPeersIter`] yielded a peer. Thus this state is
261                                // unreachable.
262                                unreachable!();
263                            }
264                            Some(PeersIterState::Waiting(None)) => {}
265                            Some(PeersIterState::WaitingAtCapacity) => {}
266                            Some(PeersIterState::Finished) => {
267                                // `state` is never set to `Finished`.
268                                unreachable!();
269                            }
270                            None => state = Some(PeersIterState::WaitingAtCapacity),
271                        };
272
273                        break;
274                    }
275                    PeersIterState::Finished => break,
276                }
277            }
278        }
279
280        state.unwrap_or(PeersIterState::Finished)
281    }
282
283    /// Finishes all paths containing one of the given peers.
284    ///
285    /// See [`crate::query::Query::try_finish`] for details.
286    pub(crate) fn finish_paths<'a, I>(&mut self, peers: I) -> bool
287    where
288        I: IntoIterator<Item = &'a PeerId>,
289    {
290        for peer in peers {
291            if let Some(PeerState { initiated_by, .. }) = self.contacted_peers.get_mut(peer) {
292                self.iters[*initiated_by].finish();
293            }
294        }
295
296        self.is_finished()
297    }
298
299    /// Immediately transitions the iterator to [`PeersIterState::Finished`].
300    pub(crate) fn finish(&mut self) {
301        for iter in &mut self.iters {
302            iter.finish();
303        }
304    }
305
306    /// Checks whether the iterator has finished.
307    pub(crate) fn is_finished(&self) -> bool {
308        self.iters.iter().all(|i| i.is_finished())
309    }
310
311    /// Note: In the case of no adversarial peers or connectivity issues along
312    ///       any path, all paths return the same result, deduplicated through
313    ///       the `ResultIter`, thus overall `into_result` returns
314    ///       `num_results`. In the case of adversarial peers or connectivity
315    ///       issues `ClosestDisjointPeersIter` tries to return the
316    ///       `num_results` closest benign peers, but as it can not
317    ///       differentiate benign from faulty paths it as well returns faulty
318    ///       peers and thus overall returns more than `num_results` peers.
319    pub(crate) fn into_result(self) -> impl Iterator<Item = PeerId> {
320        let result_per_path = self
321            .iters
322            .into_iter()
323            .map(|iter| iter.into_result().map(Key::from));
324
325        ResultIter::new(self.target, result_per_path).map(Key::into_preimage)
326    }
327}
328
329/// Index into the [`ClosestDisjointPeersIter`] `iters` vector.
330#[derive(Debug, Clone, Copy, PartialEq, Eq)]
331struct IteratorIndex(usize);
332
333impl Index<IteratorIndex> for Vec<ClosestPeersIter> {
334    type Output = ClosestPeersIter;
335
336    fn index(&self, index: IteratorIndex) -> &Self::Output {
337        &self[index.0]
338    }
339}
340
341impl IndexMut<IteratorIndex> for Vec<ClosestPeersIter> {
342    fn index_mut(&mut self, index: IteratorIndex) -> &mut Self::Output {
343        &mut self[index.0]
344    }
345}
346
347/// State tracking the iterator that yielded (i.e. tried to contact) a peer. See
348/// [`ClosestDisjointPeersIter::on_success`] for details.
349#[derive(Debug, PartialEq, Eq)]
350struct PeerState {
351    /// First iterator to yield the peer. Will be notified both of the outcome
352    /// (success/failure) as well as the closer peers.
353    initiated_by: IteratorIndex,
354    /// Keeping track of the response state. In case other iterators later on
355    /// yield the same peer, they can be notified of the response outcome.
356    response: ResponseState,
357}
358
359impl PeerState {
360    fn new(initiated_by: IteratorIndex) -> Self {
361        PeerState {
362            initiated_by,
363            response: ResponseState::Waiting,
364        }
365    }
366}
367
368#[derive(Debug, PartialEq, Eq)]
369enum ResponseState {
370    Waiting,
371    Succeeded,
372    Failed,
373}
374
375/// Iterator combining the result of multiple [`ClosestPeersIter`] into a single
376/// deduplicated ordered iterator.
377// Note: This operates under the assumption that `I` is ordered.
378#[derive(Clone, Debug)]
379struct ResultIter<I>
380where
381    I: Iterator<Item = Key<PeerId>>,
382{
383    target: KeyBytes,
384    iters: Vec<Peekable<I>>,
385}
386
387impl<I: Iterator<Item = Key<PeerId>>> ResultIter<I> {
388    fn new(target: KeyBytes, iters: impl Iterator<Item = I>) -> Self {
389        ResultIter {
390            target,
391            iters: iters.map(Iterator::peekable).collect(),
392        }
393    }
394}
395
396impl<I: Iterator<Item = Key<PeerId>>> Iterator for ResultIter<I> {
397    type Item = I::Item;
398
399    fn next(&mut self) -> Option<Self::Item> {
400        let target = &self.target;
401
402        self.iters
403            .iter_mut()
404            // Find the iterator with the next closest peer.
405            .fold(Option::<&mut Peekable<_>>::None, |iter_a, iter_b| {
406                let Some(iter_a) = iter_a else {
407                    return Some(iter_b);
408                };
409
410                match (iter_a.peek(), iter_b.peek()) {
411                    (Some(next_a), Some(next_b)) => {
412                        if next_a == next_b {
413                            // Remove from one for deduplication.
414                            iter_b.next();
415                            return Some(iter_a);
416                        }
417
418                        if target.distance(next_a) < target.distance(next_b) {
419                            Some(iter_a)
420                        } else {
421                            Some(iter_b)
422                        }
423                    }
424                    (Some(_), None) => Some(iter_a),
425                    (None, Some(_)) => Some(iter_b),
426                    (None, None) => None,
427                }
428            })
429            // Pop off the next closest peer from that iterator.
430            .and_then(Iterator::next)
431    }
432}
433
434#[cfg(test)]
435mod tests {
436    use std::{collections::HashSet, iter};
437
438    use libp2p_core::multihash::Multihash;
439    use quickcheck::*;
440
441    use super::*;
442    use crate::SHA_256_MH;
443
444    impl Arbitrary for ResultIter<std::vec::IntoIter<Key<PeerId>>> {
445        fn arbitrary(g: &mut Gen) -> Self {
446            let target = Target::arbitrary(g).0;
447            let num_closest_iters = g.gen_range(0..20 + 1);
448            let peers = random_peers(g.gen_range(0..20 * num_closest_iters + 1), g);
449
450            let iters = (0..num_closest_iters).map(|_| {
451                let num_peers = g.gen_range(0..20 + 1);
452                let mut peers = g
453                    .choose_multiple(&peers, num_peers)
454                    .cloned()
455                    .map(Key::from)
456                    .collect::<Vec<_>>();
457
458                peers.sort_unstable_by_key(|a| target.distance(a));
459
460                peers.into_iter()
461            });
462
463            ResultIter::new(target, iters)
464        }
465
466        fn shrink(&self) -> Box<dyn Iterator<Item = Self>> {
467            let peers = self
468                .iters
469                .clone()
470                .into_iter()
471                .flatten()
472                .collect::<HashSet<_>>()
473                .into_iter()
474                .collect::<Vec<_>>();
475
476            let iters = self
477                .iters
478                .clone()
479                .into_iter()
480                .map(|iter| iter.collect::<Vec<_>>())
481                .collect();
482
483            Box::new(ResultIterShrinker {
484                target: self.target,
485                peers,
486                iters,
487            })
488        }
489    }
490
491    struct ResultIterShrinker {
492        target: KeyBytes,
493        peers: Vec<Key<PeerId>>,
494        iters: Vec<Vec<Key<PeerId>>>,
495    }
496
497    impl Iterator for ResultIterShrinker {
498        type Item = ResultIter<std::vec::IntoIter<Key<PeerId>>>;
499
500        /// Return an iterator of [`ResultIter`]s with each of them missing a
501        /// different peer from the original set.
502        fn next(&mut self) -> Option<Self::Item> {
503            // The peer that should not be included.
504            let peer = self.peers.pop()?;
505
506            let iters = self.iters.clone().into_iter().filter_map(|mut iter| {
507                iter.retain(|p| p != &peer);
508                if iter.is_empty() {
509                    return None;
510                }
511                Some(iter.into_iter())
512            });
513
514            Some(ResultIter::new(self.target, iters))
515        }
516    }
517
518    #[derive(Clone, Debug)]
519    struct ArbitraryPeerId(PeerId);
520
521    impl Arbitrary for ArbitraryPeerId {
522        fn arbitrary(g: &mut Gen) -> ArbitraryPeerId {
523            let hash: [u8; 32] = core::array::from_fn(|_| u8::arbitrary(g));
524            let peer_id =
525                PeerId::from_multihash(Multihash::wrap(SHA_256_MH, &hash).unwrap()).unwrap();
526            ArbitraryPeerId(peer_id)
527        }
528    }
529
530    #[derive(Clone, Debug)]
531    struct Target(KeyBytes);
532
533    impl Arbitrary for Target {
534        fn arbitrary(g: &mut Gen) -> Self {
535            let peer_id = ArbitraryPeerId::arbitrary(g).0;
536            Target(Key::from(peer_id).into())
537        }
538    }
539
540    fn random_peers(n: usize, g: &mut Gen) -> Vec<PeerId> {
541        (0..n).map(|_| ArbitraryPeerId::arbitrary(g).0).collect()
542    }
543
544    #[test]
545    fn result_iter_returns_deduplicated_ordered_peer_id_stream() {
546        fn prop(result_iter: ResultIter<std::vec::IntoIter<Key<PeerId>>>) {
547            let expected = {
548                let mut deduplicated = result_iter
549                    .clone()
550                    .iters
551                    .into_iter()
552                    .flatten()
553                    .collect::<HashSet<_>>()
554                    .into_iter()
555                    .collect::<Vec<_>>();
556
557                deduplicated.sort_unstable_by(|a, b| {
558                    result_iter
559                        .target
560                        .distance(a)
561                        .cmp(&result_iter.target.distance(b))
562                });
563
564                deduplicated
565            };
566
567            assert_eq!(expected, result_iter.collect::<Vec<_>>());
568        }
569
570        QuickCheck::new().quickcheck(prop as fn(_))
571    }
572
573    #[derive(Debug, Clone)]
574    struct Parallelism(NonZeroUsize);
575
576    impl Arbitrary for Parallelism {
577        fn arbitrary(g: &mut Gen) -> Self {
578            Parallelism(NonZeroUsize::new(g.gen_range(1..10)).unwrap())
579        }
580    }
581
582    #[derive(Debug, Clone)]
583    struct NumResults(NonZeroUsize);
584
585    impl Arbitrary for NumResults {
586        fn arbitrary(g: &mut Gen) -> Self {
587            NumResults(NonZeroUsize::new(g.gen_range(1..K_VALUE.get())).unwrap())
588        }
589    }
590
591    impl Arbitrary for ClosestPeersIterConfig {
592        fn arbitrary(g: &mut Gen) -> Self {
593            ClosestPeersIterConfig {
594                parallelism: Parallelism::arbitrary(g).0,
595                num_results: NumResults::arbitrary(g).0,
596                peer_timeout: Duration::from_secs(1),
597            }
598        }
599    }
600
601    #[test]
602    fn s_kademlia_disjoint_paths() {
603        let now = Instant::now();
604        let target: KeyBytes = Key::from(PeerId::random()).into();
605
606        let mut pool = [0; 12]
607            .iter()
608            .map(|_| Key::from(PeerId::random()))
609            .collect::<Vec<_>>();
610
611        pool.sort_unstable_by_key(|a| target.distance(a));
612
613        let known_closest_peers = pool.split_off(pool.len() - 3);
614
615        let config = ClosestPeersIterConfig {
616            parallelism: NonZeroUsize::new(3).unwrap(),
617            num_results: NonZeroUsize::new(3).unwrap(),
618            ..ClosestPeersIterConfig::default()
619        };
620
621        let mut peers_iter =
622            ClosestDisjointPeersIter::with_config(config, target, known_closest_peers.clone());
623
624        ////////////////////////////////////////////////////////////////////////
625        // First round.
626
627        for _ in 0..3 {
628            if let PeersIterState::Waiting(Some(Cow::Owned(peer))) = peers_iter.next(now) {
629                assert!(known_closest_peers.contains(&Key::from(peer)));
630            } else {
631                panic!("Expected iterator to return peer to query.");
632            }
633        }
634
635        assert_eq!(PeersIterState::WaitingAtCapacity, peers_iter.next(now),);
636
637        let response_2 = pool.split_off(pool.len() - 3);
638        let response_3 = pool.split_off(pool.len() - 3);
639        // Keys are closer than any of the previous two responses from honest
640        // node 1 and 2.
641        let malicious_response_1 = pool.split_off(pool.len() - 3);
642
643        // Response from malicious peer 1.
644        peers_iter.on_success(
645            known_closest_peers[0].preimage(),
646            malicious_response_1
647                .clone()
648                .into_iter()
649                .map(|k| *k.preimage()),
650        );
651
652        // Response from peer 2.
653        peers_iter.on_success(
654            known_closest_peers[1].preimage(),
655            response_2.clone().into_iter().map(|k| *k.preimage()),
656        );
657
658        // Response from peer 3.
659        peers_iter.on_success(
660            known_closest_peers[2].preimage(),
661            response_3.clone().into_iter().map(|k| *k.preimage()),
662        );
663
664        ////////////////////////////////////////////////////////////////////////
665        // Second round.
666
667        let mut next_to_query = vec![];
668        for _ in 0..3 {
669            if let PeersIterState::Waiting(Some(Cow::Owned(peer))) = peers_iter.next(now) {
670                next_to_query.push(peer)
671            } else {
672                panic!("Expected iterator to return peer to query.");
673            }
674        }
675
676        // Expect a peer from each disjoint path.
677        assert!(next_to_query.contains(malicious_response_1[0].preimage()));
678        assert!(next_to_query.contains(response_2[0].preimage()));
679        assert!(next_to_query.contains(response_3[0].preimage()));
680
681        for peer in next_to_query {
682            peers_iter.on_success(&peer, vec![]);
683        }
684
685        // Mark all remaining peers as succeeded.
686        for _ in 0..6 {
687            if let PeersIterState::Waiting(Some(Cow::Owned(peer))) = peers_iter.next(now) {
688                peers_iter.on_success(&peer, vec![]);
689            } else {
690                panic!("Expected iterator to return peer to query.");
691            }
692        }
693
694        assert_eq!(PeersIterState::Finished, peers_iter.next(now),);
695
696        let final_peers: Vec<_> = peers_iter.into_result().collect();
697
698        // Expect final result to contain peer from each disjoint path, even
699        // though not all are among the best ones.
700        assert!(final_peers.contains(malicious_response_1[0].preimage()));
701        assert!(final_peers.contains(response_2[0].preimage()));
702        assert!(final_peers.contains(response_3[0].preimage()));
703    }
704
705    #[derive(Clone)]
706    struct Graph(HashMap<PeerId, Peer>);
707
708    impl std::fmt::Debug for Graph {
709        fn fmt(&self, fmt: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
710            fmt.debug_list().entries(self.0.keys()).finish()
711        }
712    }
713
714    impl Arbitrary for Graph {
715        fn arbitrary(g: &mut Gen) -> Self {
716            let mut peer_ids = random_peers(g.gen_range(K_VALUE.get()..200), g)
717                .into_iter()
718                .map(|peer_id| (peer_id, Key::from(peer_id)))
719                .collect::<Vec<_>>();
720
721            // Make each peer aware of its direct neighborhood.
722            let mut peers = peer_ids
723                .clone()
724                .into_iter()
725                .map(|(peer_id, key)| {
726                    peer_ids
727                        .sort_unstable_by(|(_, a), (_, b)| key.distance(a).cmp(&key.distance(b)));
728
729                    assert_eq!(peer_id, peer_ids[0].0);
730
731                    let known_peers = peer_ids
732                        .iter()
733                        // Skip itself.
734                        .skip(1)
735                        .take(K_VALUE.get())
736                        .cloned()
737                        .collect::<Vec<_>>();
738
739                    (peer_id, Peer { known_peers })
740                })
741                .collect::<HashMap<_, _>>();
742
743            // Make each peer aware of a random set of other peers within the graph.
744            for (peer_id, peer) in peers.iter_mut() {
745                g.shuffle(&mut peer_ids);
746
747                let num_peers = g.gen_range(K_VALUE.get()..peer_ids.len() + 1);
748                let mut random_peer_ids = g
749                    .choose_multiple(&peer_ids, num_peers)
750                    // Make sure not to include itself.
751                    .filter(|(id, _)| peer_id != id)
752                    .cloned()
753                    .collect::<Vec<_>>();
754
755                peer.known_peers.append(&mut random_peer_ids);
756                peer.known_peers = std::mem::take(&mut peer.known_peers)
757                    // Deduplicate peer ids.
758                    .into_iter()
759                    .collect::<HashSet<_>>()
760                    .into_iter()
761                    .collect();
762            }
763
764            Graph(peers)
765        }
766    }
767
768    impl Graph {
769        fn get_closest_peer(&self, target: &KeyBytes) -> PeerId {
770            *self
771                .0
772                .keys()
773                .map(|peer_id| (target.distance(&Key::from(*peer_id)), peer_id))
774                .fold(None, |acc, (distance_b, peer_id_b)| match acc {
775                    None => Some((distance_b, peer_id_b)),
776                    Some((distance_a, peer_id_a)) => {
777                        if distance_a < distance_b {
778                            Some((distance_a, peer_id_a))
779                        } else {
780                            Some((distance_b, peer_id_b))
781                        }
782                    }
783                })
784                .expect("Graph to have at least one peer.")
785                .1
786        }
787    }
788
789    #[derive(Debug, Clone)]
790    struct Peer {
791        known_peers: Vec<(PeerId, Key<PeerId>)>,
792    }
793
794    impl Peer {
795        fn get_closest_peers(&mut self, target: &KeyBytes) -> Vec<PeerId> {
796            self.known_peers
797                .sort_unstable_by(|(_, a), (_, b)| target.distance(a).cmp(&target.distance(b)));
798
799            self.known_peers
800                .iter()
801                .take(K_VALUE.get())
802                .map(|(id, _)| id)
803                .cloned()
804                .collect()
805        }
806    }
807
808    enum PeerIterator {
809        Disjoint(ClosestDisjointPeersIter),
810        Closest(ClosestPeersIter),
811    }
812
813    impl PeerIterator {
814        fn next(&mut self, now: Instant) -> PeersIterState<'_> {
815            match self {
816                PeerIterator::Disjoint(iter) => iter.next(now),
817                PeerIterator::Closest(iter) => iter.next(now),
818            }
819        }
820
821        fn on_success(&mut self, peer: &PeerId, closer_peers: Vec<PeerId>) {
822            match self {
823                PeerIterator::Disjoint(iter) => iter.on_success(peer, closer_peers),
824                PeerIterator::Closest(iter) => iter.on_success(peer, closer_peers),
825            };
826        }
827
828        fn into_result(self) -> Vec<PeerId> {
829            match self {
830                PeerIterator::Disjoint(iter) => iter.into_result().collect(),
831                PeerIterator::Closest(iter) => iter.into_result().collect(),
832            }
833        }
834    }
835
836    /// Ensure [`ClosestPeersIter`] and [`ClosestDisjointPeersIter`] yield same closest peers.
837    #[test]
838    fn closest_and_disjoint_closest_yield_same_result() {
839        fn prop(
840            target: Target,
841            graph: Graph,
842            parallelism: Parallelism,
843            num_results: NumResults,
844        ) -> TestResult {
845            if parallelism.0 > num_results.0 {
846                return TestResult::discard();
847            }
848
849            let target: KeyBytes = target.0;
850            let closest_peer = graph.get_closest_peer(&target);
851
852            let mut known_closest_peers = graph
853                .0
854                .iter()
855                .take(K_VALUE.get())
856                .map(|(key, _peers)| Key::from(*key))
857                .collect::<Vec<_>>();
858            known_closest_peers.sort_unstable_by_key(|a| target.distance(a));
859
860            let cfg = ClosestPeersIterConfig {
861                parallelism: parallelism.0,
862                num_results: num_results.0,
863                ..ClosestPeersIterConfig::default()
864            };
865
866            let closest = drive_to_finish(
867                PeerIterator::Closest(ClosestPeersIter::with_config(
868                    cfg.clone(),
869                    target,
870                    known_closest_peers.clone(),
871                )),
872                graph.clone(),
873                &target,
874            );
875
876            let disjoint = drive_to_finish(
877                PeerIterator::Disjoint(ClosestDisjointPeersIter::with_config(
878                    cfg,
879                    target,
880                    known_closest_peers.clone(),
881                )),
882                graph,
883                &target,
884            );
885
886            assert!(
887                closest.contains(&closest_peer),
888                "Expected `ClosestPeersIter` to find closest peer.",
889            );
890            assert!(
891                disjoint.contains(&closest_peer),
892                "Expected `ClosestDisjointPeersIter` to find closest peer.",
893            );
894
895            assert!(
896                closest.len() == num_results.0.get(),
897                "Expected `ClosestPeersIter` to find `num_results` closest \
898                 peers."
899            );
900            assert!(
901                disjoint.len() >= num_results.0.get(),
902                "Expected `ClosestDisjointPeersIter` to find at least \
903                 `num_results` closest peers."
904            );
905
906            if closest.len() > disjoint.len() {
907                let closest_only = closest.difference(&disjoint).collect::<Vec<_>>();
908
909                panic!(
910                    "Expected `ClosestDisjointPeersIter` to find all peers \
911                     found by `ClosestPeersIter`, but it did not find {closest_only:?}.",
912                );
913            };
914
915            TestResult::passed()
916        }
917
918        fn drive_to_finish(
919            mut iter: PeerIterator,
920            mut graph: Graph,
921            target: &KeyBytes,
922        ) -> HashSet<PeerId> {
923            let now = Instant::now();
924            loop {
925                match iter.next(now) {
926                    PeersIterState::Waiting(Some(peer_id)) => {
927                        let peer_id = peer_id.clone().into_owned();
928                        let closest_peers =
929                            graph.0.get_mut(&peer_id).unwrap().get_closest_peers(target);
930                        iter.on_success(&peer_id, closest_peers);
931                    }
932                    PeersIterState::WaitingAtCapacity | PeersIterState::Waiting(None) => {
933                        panic!("There is never more than one request in flight.")
934                    }
935                    PeersIterState::Finished => break,
936                }
937            }
938
939            let mut result = iter
940                .into_result()
941                .into_iter()
942                .map(Key::from)
943                .collect::<Vec<_>>();
944            result.sort_unstable_by_key(|a| target.distance(a));
945            result.into_iter().map(|k| k.into_preimage()).collect()
946        }
947
948        QuickCheck::new()
949            .tests(10)
950            .quickcheck(prop as fn(_, _, _, _) -> _)
951    }
952
953    #[test]
954    fn failure_can_not_overwrite_previous_success() {
955        let now = Instant::now();
956        let peer = PeerId::random();
957        let mut iter = ClosestDisjointPeersIter::new(
958            Key::from(PeerId::random()).into(),
959            iter::once(Key::from(peer)),
960        );
961
962        assert!(matches!(iter.next(now), PeersIterState::Waiting(Some(_))));
963
964        // Expect peer to be marked as succeeded.
965        assert!(iter.on_success(&peer, iter::empty()));
966        assert_eq!(
967            iter.contacted_peers.get(&peer),
968            Some(&PeerState {
969                initiated_by: IteratorIndex(0),
970                response: ResponseState::Succeeded,
971            })
972        );
973
974        // Expect peer to stay marked as succeeded.
975        assert!(!iter.on_failure(&peer));
976        assert_eq!(
977            iter.contacted_peers.get(&peer),
978            Some(&PeerState {
979                initiated_by: IteratorIndex(0),
980                response: ResponseState::Succeeded,
981            })
982        );
983    }
984}