libp2p_kad/query/peers/fixed.rs
1// Copyright 2019 Parity Technologies (UK) Ltd.
2//
3// Permission is hereby granted, free of charge, to any person obtaining a
4// copy of this software and associated documentation files (the "Software"),
5// to deal in the Software without restriction, including without limitation
6// the rights to use, copy, modify, merge, publish, distribute, sublicense,
7// and/or sell copies of the Software, and to permit persons to whom the
8// Software is furnished to do so, subject to the following conditions:
9//
10// The above copyright notice and this permission notice shall be included in
11// all copies or substantial portions of the Software.
12//
13// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
14// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
18// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
19// DEALINGS IN THE SOFTWARE.
20
21use std::{collections::hash_map::Entry, num::NonZeroUsize, vec};
22
23use fnv::FnvHashMap;
24
25use super::*;
26
27/// A peer iterator for a fixed set of peers.
28pub(crate) struct FixedPeersIter {
29 /// The permitted parallelism, i.e. number of pending results.
30 parallelism: NonZeroUsize,
31
32 /// The state of peers emitted by the iterator.
33 peers: FnvHashMap<PeerId, PeerState>,
34
35 /// The backlog of peers that can still be emitted.
36 iter: vec::IntoIter<PeerId>,
37
38 /// The internal state of the iterator.
39 state: State,
40}
41
/// The overall progress of a `FixedPeersIter`.
#[derive(Debug, PartialEq, Eq)]
enum State {
    /// The iterator is still active.
    Waiting {
        /// Number of emitted peers for which neither a success nor a failure
        /// has been reported yet.
        num_waiting: usize,
    },

    /// The iterator is done and will not emit any further peers.
    Finished,
}
47
/// The state of a single peer that has been emitted by the iterator.
///
/// `Debug` is derived (like on the sibling `State` enum) so that values can
/// be printed in logs and assertion messages.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
enum PeerState {
    /// The iterator is waiting for a result to be reported back for the peer.
    Waiting,

    /// The iterator has been informed that the attempt to contact the peer failed.
    Failed,

    /// The iterator has been informed of a successful result from the peer.
    Succeeded,
}
59
60impl FixedPeersIter {
61 #[allow(clippy::needless_collect)]
62 pub(crate) fn new<I>(peers: I, parallelism: NonZeroUsize) -> Self
63 where
64 I: IntoIterator<Item = PeerId>,
65 {
66 let peers = peers.into_iter().collect::<Vec<_>>();
67
68 Self {
69 parallelism,
70 peers: FnvHashMap::default(),
71 iter: peers.into_iter(),
72 state: State::Waiting { num_waiting: 0 },
73 }
74 }
75
76 /// Callback for delivering the result of a successful request to a peer.
77 ///
78 /// If the iterator is currently waiting for a result from `peer`,
79 /// the iterator state is updated and `true` is returned. In that
80 /// case, after calling this function, `next` should eventually be
81 /// called again to obtain the new state of the iterator.
82 ///
83 /// If the iterator is finished, it is not currently waiting for a
84 /// result from `peer`, or a result for `peer` has already been reported,
85 /// calling this function has no effect and `false` is returned.
86 pub(crate) fn on_success(&mut self, peer: &PeerId) -> bool {
87 if let State::Waiting { num_waiting } = &mut self.state {
88 if let Some(state @ PeerState::Waiting) = self.peers.get_mut(peer) {
89 *state = PeerState::Succeeded;
90 *num_waiting -= 1;
91 return true;
92 }
93 }
94 false
95 }
96
97 /// Callback for informing the iterator about a failed request to a peer.
98 ///
99 /// If the iterator is currently waiting for a result from `peer`,
100 /// the iterator state is updated and `true` is returned. In that
101 /// case, after calling this function, `next` should eventually be
102 /// called again to obtain the new state of the iterator.
103 ///
104 /// If the iterator is finished, it is not currently waiting for a
105 /// result from `peer`, or a result for `peer` has already been reported,
106 /// calling this function has no effect and `false` is returned.
107 pub(crate) fn on_failure(&mut self, peer: &PeerId) -> bool {
108 if let State::Waiting { num_waiting } = &mut self.state {
109 if let Some(state @ PeerState::Waiting) = self.peers.get_mut(peer) {
110 *state = PeerState::Failed;
111 *num_waiting -= 1;
112 return true;
113 }
114 }
115 false
116 }
117
118 pub(crate) fn finish(&mut self) {
119 if let State::Waiting { .. } = self.state {
120 self.state = State::Finished
121 }
122 }
123
124 /// Checks whether the iterator has finished.
125 pub(crate) fn is_finished(&self) -> bool {
126 self.state == State::Finished
127 }
128
129 pub(crate) fn next(&mut self) -> PeersIterState<'_> {
130 match &mut self.state {
131 State::Finished => PeersIterState::Finished,
132 State::Waiting { num_waiting } => {
133 if *num_waiting >= self.parallelism.get() {
134 return PeersIterState::WaitingAtCapacity;
135 }
136 loop {
137 match self.iter.next() {
138 None => {
139 if *num_waiting == 0 {
140 self.state = State::Finished;
141 return PeersIterState::Finished;
142 } else {
143 return PeersIterState::Waiting(None);
144 }
145 }
146 Some(p) => match self.peers.entry(p) {
147 Entry::Occupied(_) => {} // skip duplicates
148 Entry::Vacant(e) => {
149 *num_waiting += 1;
150 e.insert(PeerState::Waiting);
151 return PeersIterState::Waiting(Some(Cow::Owned(p)));
152 }
153 },
154 }
155 }
156 }
157 }
158 }
159
160 pub(crate) fn into_result(self) -> impl Iterator<Item = PeerId> {
161 self.peers.into_iter().filter_map(|(p, s)| {
162 if let PeerState::Succeeded = s {
163 Some(p)
164 } else {
165 None
166 }
167 })
168 }
169}
170
#[cfg(test)]
mod test {
    use super::*;

    /// With a parallelism of one, reporting a failure for the only pending
    /// peer must free its slot so that the next peer can be emitted.
    #[test]
    fn decrease_num_waiting_on_failure() {
        let candidates = vec![PeerId::random(), PeerId::random()];
        let parallelism = NonZeroUsize::new(1).unwrap();
        let mut iter = FixedPeersIter::new(candidates, parallelism);

        // The first call must hand out a peer, which then fails.
        let first = match iter.next() {
            PeersIterState::Waiting(Some(peer)) => peer.into_owned(),
            _ => panic!("Expected iterator to yield peer."),
        };
        iter.on_failure(&first);

        // The failure released the single slot, so another peer is due.
        let second = iter.next();
        if let PeersIterState::WaitingAtCapacity = second {
            panic!(
                "Expected iterator to return another peer given that the \
                 previous `on_failure` call should have allowed another peer \
                 to be queried.",
            );
        }
        if !matches!(second, PeersIterState::Waiting(Some(_))) {
            panic!("Expected iterator to yield peer.");
        }
    }
}
200}