libp2p_kad/query/peers/
fixed.rs

// Copyright 2019 Parity Technologies (UK) Ltd.
//
// Permission is hereby granted, free of charge, to any person obtaining a
// copy of this software and associated documentation files (the "Software"),
// to deal in the Software without restriction, including without limitation
// the rights to use, copy, modify, merge, publish, distribute, sublicense,
// and/or sell copies of the Software, and to permit persons to whom the
// Software is furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.
20
21use std::{collections::hash_map::Entry, num::NonZeroUsize, vec};
22
23use fnv::FnvHashMap;
24
25use super::*;
26
/// A peer iterator for a fixed set of peers.
///
/// Peers are handed out one at a time by `next`, with at most `parallelism`
/// of them awaiting a reported result at any moment. Results are reported
/// back through `on_success` and `on_failure`.
pub(crate) struct FixedPeersIter {
    /// The permitted parallelism, i.e. number of pending results.
    parallelism: NonZeroUsize,

    /// The state of peers emitted by the iterator.
    ///
    /// A peer is only inserted here once `next` has yielded it, so this map
    /// is also what `next` consults to skip duplicates in the backlog.
    peers: FnvHashMap<PeerId, PeerState>,

    /// The backlog of peers that can still be emitted.
    iter: vec::IntoIter<PeerId>,

    /// The internal state of the iterator.
    state: State,
}
41
/// The overall state of a `FixedPeersIter`.
#[derive(Debug, PartialEq, Eq)]
enum State {
    /// The iterator is still active. `num_waiting` counts the peers that have
    /// been yielded by `next` but for which neither `on_success` nor
    /// `on_failure` has been reported yet.
    Waiting { num_waiting: usize },
    /// The iterator is done: `next` only reports completion and result
    /// callbacks are ignored.
    Finished,
}
47
/// The state of a single peer that has been emitted by the iterator.
///
/// Derives `Debug` (like the sibling `State` enum) so that peer states can be
/// used in `assert_eq!` and in `{:?}` diagnostics.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
enum PeerState {
    /// The iterator is waiting for a result to be reported back for the peer.
    Waiting,

    /// The iterator has been informed that the attempt to contact the peer failed.
    Failed,

    /// The iterator has been informed of a successful result from the peer.
    Succeeded,
}
59
60impl FixedPeersIter {
61    #[allow(clippy::needless_collect)]
62    pub(crate) fn new<I>(peers: I, parallelism: NonZeroUsize) -> Self
63    where
64        I: IntoIterator<Item = PeerId>,
65    {
66        let peers = peers.into_iter().collect::<Vec<_>>();
67
68        Self {
69            parallelism,
70            peers: FnvHashMap::default(),
71            iter: peers.into_iter(),
72            state: State::Waiting { num_waiting: 0 },
73        }
74    }
75
76    /// Callback for delivering the result of a successful request to a peer.
77    ///
78    /// If the iterator is currently waiting for a result from `peer`,
79    /// the iterator state is updated and `true` is returned. In that
80    /// case, after calling this function, `next` should eventually be
81    /// called again to obtain the new state of the iterator.
82    ///
83    /// If the iterator is finished, it is not currently waiting for a
84    /// result from `peer`, or a result for `peer` has already been reported,
85    /// calling this function has no effect and `false` is returned.
86    pub(crate) fn on_success(&mut self, peer: &PeerId) -> bool {
87        if let State::Waiting { num_waiting } = &mut self.state {
88            if let Some(state @ PeerState::Waiting) = self.peers.get_mut(peer) {
89                *state = PeerState::Succeeded;
90                *num_waiting -= 1;
91                return true;
92            }
93        }
94        false
95    }
96
97    /// Callback for informing the iterator about a failed request to a peer.
98    ///
99    /// If the iterator is currently waiting for a result from `peer`,
100    /// the iterator state is updated and `true` is returned. In that
101    /// case, after calling this function, `next` should eventually be
102    /// called again to obtain the new state of the iterator.
103    ///
104    /// If the iterator is finished, it is not currently waiting for a
105    /// result from `peer`, or a result for `peer` has already been reported,
106    /// calling this function has no effect and `false` is returned.
107    pub(crate) fn on_failure(&mut self, peer: &PeerId) -> bool {
108        if let State::Waiting { num_waiting } = &mut self.state {
109            if let Some(state @ PeerState::Waiting) = self.peers.get_mut(peer) {
110                *state = PeerState::Failed;
111                *num_waiting -= 1;
112                return true;
113            }
114        }
115        false
116    }
117
118    pub(crate) fn finish(&mut self) {
119        if let State::Waiting { .. } = self.state {
120            self.state = State::Finished
121        }
122    }
123
124    /// Checks whether the iterator has finished.
125    pub(crate) fn is_finished(&self) -> bool {
126        self.state == State::Finished
127    }
128
129    pub(crate) fn next(&mut self) -> PeersIterState<'_> {
130        match &mut self.state {
131            State::Finished => PeersIterState::Finished,
132            State::Waiting { num_waiting } => {
133                if *num_waiting >= self.parallelism.get() {
134                    return PeersIterState::WaitingAtCapacity;
135                }
136                loop {
137                    match self.iter.next() {
138                        None => {
139                            if *num_waiting == 0 {
140                                self.state = State::Finished;
141                                return PeersIterState::Finished;
142                            } else {
143                                return PeersIterState::Waiting(None);
144                            }
145                        }
146                        Some(p) => match self.peers.entry(p) {
147                            Entry::Occupied(_) => {} // skip duplicates
148                            Entry::Vacant(e) => {
149                                *num_waiting += 1;
150                                e.insert(PeerState::Waiting);
151                                return PeersIterState::Waiting(Some(Cow::Owned(p)));
152                            }
153                        },
154                    }
155                }
156            }
157        }
158    }
159
160    pub(crate) fn into_result(self) -> impl Iterator<Item = PeerId> {
161        self.peers.into_iter().filter_map(|(p, s)| {
162            if let PeerState::Succeeded = s {
163                Some(p)
164            } else {
165                None
166            }
167        })
168    }
169}
170
#[cfg(test)]
mod test {
    use super::*;

    /// With a parallelism of one, reporting a failure for the only pending
    /// peer must free the slot so that the next peer can be yielded.
    #[test]
    fn decrease_num_waiting_on_failure() {
        let candidates = vec![PeerId::random(), PeerId::random()];
        let limit = NonZeroUsize::new(1).unwrap();
        let mut iter = FixedPeersIter::new(candidates, limit);

        // The first call must hand out a peer, which we immediately fail.
        let first = match iter.next() {
            PeersIterState::Waiting(Some(peer)) => peer.into_owned(),
            _ => panic!("Expected iterator to yield peer."),
        };
        iter.on_failure(&first);

        // The failure released the single slot, so a second peer is due.
        match iter.next() {
            PeersIterState::WaitingAtCapacity => panic!(
                "Expected iterator to return another peer given that the \
                 previous `on_failure` call should have allowed another peer \
                 to be queried.",
            ),
            PeersIterState::Waiting(Some(_)) => {}
            _ => panic!("Expected iterator to yield peer."),
        }
    }
}