1use std::{
22 collections::HashMap,
23 iter::{Cycle, Map, Peekable},
24 ops::{Index, IndexMut, Range},
25};
26
27use super::*;
28
29pub(crate) struct ClosestDisjointPeersIter {
32 target: KeyBytes,
33
34 iters: Vec<ClosestPeersIter>,
36 iter_order: Cycle<Map<Range<usize>, fn(usize) -> IteratorIndex>>,
39
40 contacted_peers: HashMap<PeerId, PeerState>,
47}
48
49impl ClosestDisjointPeersIter {
50 #[cfg(test)]
52 pub(crate) fn new<I>(target: KeyBytes, known_closest_peers: I) -> Self
53 where
54 I: IntoIterator<Item = Key<PeerId>>,
55 {
56 Self::with_config(
57 ClosestPeersIterConfig::default(),
58 target,
59 known_closest_peers,
60 )
61 }
62
63 pub(crate) fn with_config<I, T>(
65 config: ClosestPeersIterConfig,
66 target: T,
67 known_closest_peers: I,
68 ) -> Self
69 where
70 I: IntoIterator<Item = Key<PeerId>>,
71 T: Into<KeyBytes> + Clone,
72 {
73 let peers = known_closest_peers
74 .into_iter()
75 .take(K_VALUE.get())
76 .collect::<Vec<_>>();
77 let iters = (0..config.parallelism.get())
78 .map(|_| ClosestPeersIter::with_config(config.clone(), target.clone(), peers.clone()))
84 .collect::<Vec<_>>();
85
86 let iters_len = iters.len();
87
88 ClosestDisjointPeersIter {
89 target: target.into(),
90 iters,
91 iter_order: (0..iters_len)
92 .map(IteratorIndex as fn(usize) -> IteratorIndex)
93 .cycle(),
94 contacted_peers: HashMap::new(),
95 }
96 }
97
98 pub(crate) fn on_failure(&mut self, peer: &PeerId) -> bool {
109 let mut updated = false;
110
111 if let Some(PeerState {
112 initiated_by,
113 response,
114 }) = self.contacted_peers.get_mut(peer)
115 {
116 updated = self.iters[*initiated_by].on_failure(peer);
117
118 if updated {
119 *response = ResponseState::Failed;
120 }
121
122 for (i, iter) in &mut self.iters.iter_mut().enumerate() {
123 if IteratorIndex(i) != *initiated_by {
124 iter.on_failure(peer);
127 }
128 }
129 }
130
131 updated
132 }
133
134 pub(crate) fn on_success<I>(&mut self, peer: &PeerId, closer_peers: I) -> bool
152 where
153 I: IntoIterator<Item = PeerId>,
154 {
155 let mut updated = false;
156
157 if let Some(PeerState {
158 initiated_by,
159 response,
160 }) = self.contacted_peers.get_mut(peer)
161 {
162 updated = self.iters[*initiated_by].on_success(peer, closer_peers);
165
166 if updated {
167 *response = ResponseState::Succeeded;
172 }
173
174 for (i, iter) in &mut self.iters.iter_mut().enumerate() {
175 if IteratorIndex(i) != *initiated_by {
176 iter.on_success(peer, std::iter::empty());
183 }
184 }
185 }
186
187 updated
188 }
189
190 pub(crate) fn next(&mut self, now: Instant) -> PeersIterState<'_> {
191 let mut state = None;
192
193 for _ in 0..self.iters.len() {
195 let i = self.iter_order.next().expect("Cycle never ends.");
196 let iter = &mut self.iters[i];
197
198 loop {
199 match iter.next(now) {
200 PeersIterState::Waiting(None) => {
201 match state {
202 Some(PeersIterState::Waiting(Some(_))) => {
203 unreachable!();
207 }
208 Some(PeersIterState::Waiting(None)) => {}
209 Some(PeersIterState::WaitingAtCapacity) => {
210 state = Some(PeersIterState::Waiting(None))
213 }
214 Some(PeersIterState::Finished) => {
215 unreachable!();
217 }
218 None => state = Some(PeersIterState::Waiting(None)),
219 };
220
221 break;
222 }
223 PeersIterState::Waiting(Some(peer)) => {
224 match self.contacted_peers.get_mut(&*peer) {
225 Some(PeerState { response, .. }) => {
226 let peer = peer.into_owned();
228
229 match response {
230 ResponseState::Waiting => {}
234 ResponseState::Succeeded => {
235 iter.on_success(&peer, std::iter::empty());
240 }
241 ResponseState::Failed => {
242 iter.on_failure(&peer);
243 }
244 }
245 }
246 None => {
247 self.contacted_peers
249 .insert(peer.clone().into_owned(), PeerState::new(i));
250 return PeersIterState::Waiting(Some(Cow::Owned(
251 peer.into_owned(),
252 )));
253 }
254 }
255 }
256 PeersIterState::WaitingAtCapacity => {
257 match state {
258 Some(PeersIterState::Waiting(Some(_))) => {
259 unreachable!();
263 }
264 Some(PeersIterState::Waiting(None)) => {}
265 Some(PeersIterState::WaitingAtCapacity) => {}
266 Some(PeersIterState::Finished) => {
267 unreachable!();
269 }
270 None => state = Some(PeersIterState::WaitingAtCapacity),
271 };
272
273 break;
274 }
275 PeersIterState::Finished => break,
276 }
277 }
278 }
279
280 state.unwrap_or(PeersIterState::Finished)
281 }
282
283 pub(crate) fn finish_paths<'a, I>(&mut self, peers: I) -> bool
287 where
288 I: IntoIterator<Item = &'a PeerId>,
289 {
290 for peer in peers {
291 if let Some(PeerState { initiated_by, .. }) = self.contacted_peers.get_mut(peer) {
292 self.iters[*initiated_by].finish();
293 }
294 }
295
296 self.is_finished()
297 }
298
299 pub(crate) fn finish(&mut self) {
301 for iter in &mut self.iters {
302 iter.finish();
303 }
304 }
305
306 pub(crate) fn is_finished(&self) -> bool {
308 self.iters.iter().all(|i| i.is_finished())
309 }
310
311 pub(crate) fn into_result(self) -> impl Iterator<Item = PeerId> {
320 let result_per_path = self
321 .iters
322 .into_iter()
323 .map(|iter| iter.into_result().map(Key::from));
324
325 ResultIter::new(self.target, result_per_path).map(Key::into_preimage)
326 }
327}
328
329#[derive(Debug, Clone, Copy, PartialEq, Eq)]
331struct IteratorIndex(usize);
332
333impl Index<IteratorIndex> for Vec<ClosestPeersIter> {
334 type Output = ClosestPeersIter;
335
336 fn index(&self, index: IteratorIndex) -> &Self::Output {
337 &self[index.0]
338 }
339}
340
341impl IndexMut<IteratorIndex> for Vec<ClosestPeersIter> {
342 fn index_mut(&mut self, index: IteratorIndex) -> &mut Self::Output {
343 &mut self[index.0]
344 }
345}
346
347#[derive(Debug, PartialEq, Eq)]
350struct PeerState {
351 initiated_by: IteratorIndex,
354 response: ResponseState,
357}
358
359impl PeerState {
360 fn new(initiated_by: IteratorIndex) -> Self {
361 PeerState {
362 initiated_by,
363 response: ResponseState::Waiting,
364 }
365 }
366}
367
368#[derive(Debug, PartialEq, Eq)]
369enum ResponseState {
370 Waiting,
371 Succeeded,
372 Failed,
373}
374
375#[derive(Clone, Debug)]
379struct ResultIter<I>
380where
381 I: Iterator<Item = Key<PeerId>>,
382{
383 target: KeyBytes,
384 iters: Vec<Peekable<I>>,
385}
386
387impl<I: Iterator<Item = Key<PeerId>>> ResultIter<I> {
388 fn new(target: KeyBytes, iters: impl Iterator<Item = I>) -> Self {
389 ResultIter {
390 target,
391 iters: iters.map(Iterator::peekable).collect(),
392 }
393 }
394}
395
396impl<I: Iterator<Item = Key<PeerId>>> Iterator for ResultIter<I> {
397 type Item = I::Item;
398
399 fn next(&mut self) -> Option<Self::Item> {
400 let target = &self.target;
401
402 self.iters
403 .iter_mut()
404 .fold(Option::<&mut Peekable<_>>::None, |iter_a, iter_b| {
406 let Some(iter_a) = iter_a else {
407 return Some(iter_b);
408 };
409
410 match (iter_a.peek(), iter_b.peek()) {
411 (Some(next_a), Some(next_b)) => {
412 if next_a == next_b {
413 iter_b.next();
415 return Some(iter_a);
416 }
417
418 if target.distance(next_a) < target.distance(next_b) {
419 Some(iter_a)
420 } else {
421 Some(iter_b)
422 }
423 }
424 (Some(_), None) => Some(iter_a),
425 (None, Some(_)) => Some(iter_b),
426 (None, None) => None,
427 }
428 })
429 .and_then(Iterator::next)
431 }
432}
433
434#[cfg(test)]
435mod tests {
436 use std::{collections::HashSet, iter};
437
438 use libp2p_core::multihash::Multihash;
439 use quickcheck::*;
440
441 use super::*;
442 use crate::SHA_256_MH;
443
444 impl Arbitrary for ResultIter<std::vec::IntoIter<Key<PeerId>>> {
445 fn arbitrary(g: &mut Gen) -> Self {
446 let target = Target::arbitrary(g).0;
447 let num_closest_iters = g.gen_range(0..20 + 1);
448 let peers = random_peers(g.gen_range(0..20 * num_closest_iters + 1), g);
449
450 let iters = (0..num_closest_iters).map(|_| {
451 let num_peers = g.gen_range(0..20 + 1);
452 let mut peers = g
453 .choose_multiple(&peers, num_peers)
454 .cloned()
455 .map(Key::from)
456 .collect::<Vec<_>>();
457
458 peers.sort_unstable_by_key(|a| target.distance(a));
459
460 peers.into_iter()
461 });
462
463 ResultIter::new(target, iters)
464 }
465
466 fn shrink(&self) -> Box<dyn Iterator<Item = Self>> {
467 let peers = self
468 .iters
469 .clone()
470 .into_iter()
471 .flatten()
472 .collect::<HashSet<_>>()
473 .into_iter()
474 .collect::<Vec<_>>();
475
476 let iters = self
477 .iters
478 .clone()
479 .into_iter()
480 .map(|iter| iter.collect::<Vec<_>>())
481 .collect();
482
483 Box::new(ResultIterShrinker {
484 target: self.target,
485 peers,
486 iters,
487 })
488 }
489 }
490
491 struct ResultIterShrinker {
492 target: KeyBytes,
493 peers: Vec<Key<PeerId>>,
494 iters: Vec<Vec<Key<PeerId>>>,
495 }
496
497 impl Iterator for ResultIterShrinker {
498 type Item = ResultIter<std::vec::IntoIter<Key<PeerId>>>;
499
500 fn next(&mut self) -> Option<Self::Item> {
503 let peer = self.peers.pop()?;
505
506 let iters = self.iters.clone().into_iter().filter_map(|mut iter| {
507 iter.retain(|p| p != &peer);
508 if iter.is_empty() {
509 return None;
510 }
511 Some(iter.into_iter())
512 });
513
514 Some(ResultIter::new(self.target, iters))
515 }
516 }
517
518 #[derive(Clone, Debug)]
519 struct ArbitraryPeerId(PeerId);
520
521 impl Arbitrary for ArbitraryPeerId {
522 fn arbitrary(g: &mut Gen) -> ArbitraryPeerId {
523 let hash: [u8; 32] = core::array::from_fn(|_| u8::arbitrary(g));
524 let peer_id =
525 PeerId::from_multihash(Multihash::wrap(SHA_256_MH, &hash).unwrap()).unwrap();
526 ArbitraryPeerId(peer_id)
527 }
528 }
529
530 #[derive(Clone, Debug)]
531 struct Target(KeyBytes);
532
533 impl Arbitrary for Target {
534 fn arbitrary(g: &mut Gen) -> Self {
535 let peer_id = ArbitraryPeerId::arbitrary(g).0;
536 Target(Key::from(peer_id).into())
537 }
538 }
539
540 fn random_peers(n: usize, g: &mut Gen) -> Vec<PeerId> {
541 (0..n).map(|_| ArbitraryPeerId::arbitrary(g).0).collect()
542 }
543
544 #[test]
545 fn result_iter_returns_deduplicated_ordered_peer_id_stream() {
546 fn prop(result_iter: ResultIter<std::vec::IntoIter<Key<PeerId>>>) {
547 let expected = {
548 let mut deduplicated = result_iter
549 .clone()
550 .iters
551 .into_iter()
552 .flatten()
553 .collect::<HashSet<_>>()
554 .into_iter()
555 .collect::<Vec<_>>();
556
557 deduplicated.sort_unstable_by(|a, b| {
558 result_iter
559 .target
560 .distance(a)
561 .cmp(&result_iter.target.distance(b))
562 });
563
564 deduplicated
565 };
566
567 assert_eq!(expected, result_iter.collect::<Vec<_>>());
568 }
569
570 QuickCheck::new().quickcheck(prop as fn(_))
571 }
572
573 #[derive(Debug, Clone)]
574 struct Parallelism(NonZeroUsize);
575
576 impl Arbitrary for Parallelism {
577 fn arbitrary(g: &mut Gen) -> Self {
578 Parallelism(NonZeroUsize::new(g.gen_range(1..10)).unwrap())
579 }
580 }
581
582 #[derive(Debug, Clone)]
583 struct NumResults(NonZeroUsize);
584
585 impl Arbitrary for NumResults {
586 fn arbitrary(g: &mut Gen) -> Self {
587 NumResults(NonZeroUsize::new(g.gen_range(1..K_VALUE.get())).unwrap())
588 }
589 }
590
591 impl Arbitrary for ClosestPeersIterConfig {
592 fn arbitrary(g: &mut Gen) -> Self {
593 ClosestPeersIterConfig {
594 parallelism: Parallelism::arbitrary(g).0,
595 num_results: NumResults::arbitrary(g).0,
596 peer_timeout: Duration::from_secs(1),
597 }
598 }
599 }
600
601 #[test]
602 fn s_kademlia_disjoint_paths() {
603 let now = Instant::now();
604 let target: KeyBytes = Key::from(PeerId::random()).into();
605
606 let mut pool = [0; 12]
607 .iter()
608 .map(|_| Key::from(PeerId::random()))
609 .collect::<Vec<_>>();
610
611 pool.sort_unstable_by_key(|a| target.distance(a));
612
613 let known_closest_peers = pool.split_off(pool.len() - 3);
614
615 let config = ClosestPeersIterConfig {
616 parallelism: NonZeroUsize::new(3).unwrap(),
617 num_results: NonZeroUsize::new(3).unwrap(),
618 ..ClosestPeersIterConfig::default()
619 };
620
621 let mut peers_iter =
622 ClosestDisjointPeersIter::with_config(config, target, known_closest_peers.clone());
623
624 for _ in 0..3 {
628 if let PeersIterState::Waiting(Some(Cow::Owned(peer))) = peers_iter.next(now) {
629 assert!(known_closest_peers.contains(&Key::from(peer)));
630 } else {
631 panic!("Expected iterator to return peer to query.");
632 }
633 }
634
635 assert_eq!(PeersIterState::WaitingAtCapacity, peers_iter.next(now),);
636
637 let response_2 = pool.split_off(pool.len() - 3);
638 let response_3 = pool.split_off(pool.len() - 3);
639 let malicious_response_1 = pool.split_off(pool.len() - 3);
642
643 peers_iter.on_success(
645 known_closest_peers[0].preimage(),
646 malicious_response_1
647 .clone()
648 .into_iter()
649 .map(|k| *k.preimage()),
650 );
651
652 peers_iter.on_success(
654 known_closest_peers[1].preimage(),
655 response_2.clone().into_iter().map(|k| *k.preimage()),
656 );
657
658 peers_iter.on_success(
660 known_closest_peers[2].preimage(),
661 response_3.clone().into_iter().map(|k| *k.preimage()),
662 );
663
664 let mut next_to_query = vec![];
668 for _ in 0..3 {
669 if let PeersIterState::Waiting(Some(Cow::Owned(peer))) = peers_iter.next(now) {
670 next_to_query.push(peer)
671 } else {
672 panic!("Expected iterator to return peer to query.");
673 }
674 }
675
676 assert!(next_to_query.contains(malicious_response_1[0].preimage()));
678 assert!(next_to_query.contains(response_2[0].preimage()));
679 assert!(next_to_query.contains(response_3[0].preimage()));
680
681 for peer in next_to_query {
682 peers_iter.on_success(&peer, vec![]);
683 }
684
685 for _ in 0..6 {
687 if let PeersIterState::Waiting(Some(Cow::Owned(peer))) = peers_iter.next(now) {
688 peers_iter.on_success(&peer, vec![]);
689 } else {
690 panic!("Expected iterator to return peer to query.");
691 }
692 }
693
694 assert_eq!(PeersIterState::Finished, peers_iter.next(now),);
695
696 let final_peers: Vec<_> = peers_iter.into_result().collect();
697
698 assert!(final_peers.contains(malicious_response_1[0].preimage()));
701 assert!(final_peers.contains(response_2[0].preimage()));
702 assert!(final_peers.contains(response_3[0].preimage()));
703 }
704
705 #[derive(Clone)]
706 struct Graph(HashMap<PeerId, Peer>);
707
708 impl std::fmt::Debug for Graph {
709 fn fmt(&self, fmt: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
710 fmt.debug_list().entries(self.0.keys()).finish()
711 }
712 }
713
714 impl Arbitrary for Graph {
715 fn arbitrary(g: &mut Gen) -> Self {
716 let mut peer_ids = random_peers(g.gen_range(K_VALUE.get()..200), g)
717 .into_iter()
718 .map(|peer_id| (peer_id, Key::from(peer_id)))
719 .collect::<Vec<_>>();
720
721 let mut peers = peer_ids
723 .clone()
724 .into_iter()
725 .map(|(peer_id, key)| {
726 peer_ids
727 .sort_unstable_by(|(_, a), (_, b)| key.distance(a).cmp(&key.distance(b)));
728
729 assert_eq!(peer_id, peer_ids[0].0);
730
731 let known_peers = peer_ids
732 .iter()
733 .skip(1)
735 .take(K_VALUE.get())
736 .cloned()
737 .collect::<Vec<_>>();
738
739 (peer_id, Peer { known_peers })
740 })
741 .collect::<HashMap<_, _>>();
742
743 for (peer_id, peer) in peers.iter_mut() {
745 g.shuffle(&mut peer_ids);
746
747 let num_peers = g.gen_range(K_VALUE.get()..peer_ids.len() + 1);
748 let mut random_peer_ids = g
749 .choose_multiple(&peer_ids, num_peers)
750 .filter(|(id, _)| peer_id != id)
752 .cloned()
753 .collect::<Vec<_>>();
754
755 peer.known_peers.append(&mut random_peer_ids);
756 peer.known_peers = std::mem::take(&mut peer.known_peers)
757 .into_iter()
759 .collect::<HashSet<_>>()
760 .into_iter()
761 .collect();
762 }
763
764 Graph(peers)
765 }
766 }
767
768 impl Graph {
769 fn get_closest_peer(&self, target: &KeyBytes) -> PeerId {
770 *self
771 .0
772 .keys()
773 .map(|peer_id| (target.distance(&Key::from(*peer_id)), peer_id))
774 .fold(None, |acc, (distance_b, peer_id_b)| match acc {
775 None => Some((distance_b, peer_id_b)),
776 Some((distance_a, peer_id_a)) => {
777 if distance_a < distance_b {
778 Some((distance_a, peer_id_a))
779 } else {
780 Some((distance_b, peer_id_b))
781 }
782 }
783 })
784 .expect("Graph to have at least one peer.")
785 .1
786 }
787 }
788
789 #[derive(Debug, Clone)]
790 struct Peer {
791 known_peers: Vec<(PeerId, Key<PeerId>)>,
792 }
793
794 impl Peer {
795 fn get_closest_peers(&mut self, target: &KeyBytes) -> Vec<PeerId> {
796 self.known_peers
797 .sort_unstable_by(|(_, a), (_, b)| target.distance(a).cmp(&target.distance(b)));
798
799 self.known_peers
800 .iter()
801 .take(K_VALUE.get())
802 .map(|(id, _)| id)
803 .cloned()
804 .collect()
805 }
806 }
807
808 enum PeerIterator {
809 Disjoint(ClosestDisjointPeersIter),
810 Closest(ClosestPeersIter),
811 }
812
813 impl PeerIterator {
814 fn next(&mut self, now: Instant) -> PeersIterState<'_> {
815 match self {
816 PeerIterator::Disjoint(iter) => iter.next(now),
817 PeerIterator::Closest(iter) => iter.next(now),
818 }
819 }
820
821 fn on_success(&mut self, peer: &PeerId, closer_peers: Vec<PeerId>) {
822 match self {
823 PeerIterator::Disjoint(iter) => iter.on_success(peer, closer_peers),
824 PeerIterator::Closest(iter) => iter.on_success(peer, closer_peers),
825 };
826 }
827
828 fn into_result(self) -> Vec<PeerId> {
829 match self {
830 PeerIterator::Disjoint(iter) => iter.into_result().collect(),
831 PeerIterator::Closest(iter) => iter.into_result().collect(),
832 }
833 }
834 }
835
836 #[test]
838 fn closest_and_disjoint_closest_yield_same_result() {
839 fn prop(
840 target: Target,
841 graph: Graph,
842 parallelism: Parallelism,
843 num_results: NumResults,
844 ) -> TestResult {
845 if parallelism.0 > num_results.0 {
846 return TestResult::discard();
847 }
848
849 let target: KeyBytes = target.0;
850 let closest_peer = graph.get_closest_peer(&target);
851
852 let mut known_closest_peers = graph
853 .0
854 .iter()
855 .take(K_VALUE.get())
856 .map(|(key, _peers)| Key::from(*key))
857 .collect::<Vec<_>>();
858 known_closest_peers.sort_unstable_by_key(|a| target.distance(a));
859
860 let cfg = ClosestPeersIterConfig {
861 parallelism: parallelism.0,
862 num_results: num_results.0,
863 ..ClosestPeersIterConfig::default()
864 };
865
866 let closest = drive_to_finish(
867 PeerIterator::Closest(ClosestPeersIter::with_config(
868 cfg.clone(),
869 target,
870 known_closest_peers.clone(),
871 )),
872 graph.clone(),
873 &target,
874 );
875
876 let disjoint = drive_to_finish(
877 PeerIterator::Disjoint(ClosestDisjointPeersIter::with_config(
878 cfg,
879 target,
880 known_closest_peers.clone(),
881 )),
882 graph,
883 &target,
884 );
885
886 assert!(
887 closest.contains(&closest_peer),
888 "Expected `ClosestPeersIter` to find closest peer.",
889 );
890 assert!(
891 disjoint.contains(&closest_peer),
892 "Expected `ClosestDisjointPeersIter` to find closest peer.",
893 );
894
895 assert!(
896 closest.len() == num_results.0.get(),
897 "Expected `ClosestPeersIter` to find `num_results` closest \
898 peers."
899 );
900 assert!(
901 disjoint.len() >= num_results.0.get(),
902 "Expected `ClosestDisjointPeersIter` to find at least \
903 `num_results` closest peers."
904 );
905
906 if closest.len() > disjoint.len() {
907 let closest_only = closest.difference(&disjoint).collect::<Vec<_>>();
908
909 panic!(
910 "Expected `ClosestDisjointPeersIter` to find all peers \
911 found by `ClosestPeersIter`, but it did not find {closest_only:?}.",
912 );
913 };
914
915 TestResult::passed()
916 }
917
918 fn drive_to_finish(
919 mut iter: PeerIterator,
920 mut graph: Graph,
921 target: &KeyBytes,
922 ) -> HashSet<PeerId> {
923 let now = Instant::now();
924 loop {
925 match iter.next(now) {
926 PeersIterState::Waiting(Some(peer_id)) => {
927 let peer_id = peer_id.clone().into_owned();
928 let closest_peers =
929 graph.0.get_mut(&peer_id).unwrap().get_closest_peers(target);
930 iter.on_success(&peer_id, closest_peers);
931 }
932 PeersIterState::WaitingAtCapacity | PeersIterState::Waiting(None) => {
933 panic!("There is never more than one request in flight.")
934 }
935 PeersIterState::Finished => break,
936 }
937 }
938
939 let mut result = iter
940 .into_result()
941 .into_iter()
942 .map(Key::from)
943 .collect::<Vec<_>>();
944 result.sort_unstable_by_key(|a| target.distance(a));
945 result.into_iter().map(|k| k.into_preimage()).collect()
946 }
947
948 QuickCheck::new()
949 .tests(10)
950 .quickcheck(prop as fn(_, _, _, _) -> _)
951 }
952
953 #[test]
954 fn failure_can_not_overwrite_previous_success() {
955 let now = Instant::now();
956 let peer = PeerId::random();
957 let mut iter = ClosestDisjointPeersIter::new(
958 Key::from(PeerId::random()).into(),
959 iter::once(Key::from(peer)),
960 );
961
962 assert!(matches!(iter.next(now), PeersIterState::Waiting(Some(_))));
963
964 assert!(iter.on_success(&peer, iter::empty()));
966 assert_eq!(
967 iter.contacted_peers.get(&peer),
968 Some(&PeerState {
969 initiated_by: IteratorIndex(0),
970 response: ResponseState::Succeeded,
971 })
972 );
973
974 assert!(!iter.on_failure(&peer));
976 assert_eq!(
977 iter.contacted_peers.get(&peer),
978 Some(&PeerState {
979 initiated_by: IteratorIndex(0),
980 response: ResponseState::Succeeded,
981 })
982 );
983 }
984}