libp2p_gossipsub/
backoff.rs

1// Copyright 2020 Sigma Prime Pty Ltd.
2//
3// Permission is hereby granted, free of charge, to any person obtaining a
4// copy of this software and associated documentation files (the "Software"),
5// to deal in the Software without restriction, including without limitation
6// the rights to use, copy, modify, merge, publish, distribute, sublicense,
7// and/or sell copies of the Software, and to permit persons to whom the
8// Software is furnished to do so, subject to the following conditions:
9//
10// The above copyright notice and this permission notice shall be included in
11// all copies or substantial portions of the Software.
12//
13// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
14// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
18// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
19// DEALINGS IN THE SOFTWARE.
20
//! Data structure for efficiently storing the known backoffs that result from pruning peers.
22use std::{
23    collections::{
24        hash_map::{Entry, HashMap},
25        HashSet,
26    },
27    time::Duration,
28};
29
30use libp2p_identity::PeerId;
31use web_time::Instant;
32
33use crate::topic::TopicHash;
34
/// Position of a slot inside `BackoffStorage::backoffs_by_heartbeat`.
///
/// Wrapping the raw `usize` keeps slot indices from being mixed up with other integers.
#[derive(Copy, Clone)]
struct HeartbeatIndex(usize);
37
/// Stores backoffs in an efficient manner.
///
/// Every backoff is tracked twice: once in `backoffs` for direct lookup by topic and peer, and
/// once in `backoffs_by_heartbeat`, a cyclic list of slots that lets `heartbeat` inspect only
/// the entries scheduled for the current slot instead of scanning everything.
pub(crate) struct BackoffStorage {
    /// For every topic and peer: the instant at which the backoff ends, together with the index
    /// of the `backoffs_by_heartbeat` slot the (topic, peer) pair is currently filed under.
    backoffs: HashMap<TopicHash, HashMap<PeerId, (Instant, HeartbeatIndex)>>,
    /// The (topic, peer) pairs to re-check at each heartbeat. The vector is used cyclically;
    /// the slot belonging to the current heartbeat is `heartbeat_index`.
    backoffs_by_heartbeat: Vec<HashSet<(TopicHash, PeerId)>>,
    /// The index in the backoffs_by_heartbeat vector corresponding to the current heartbeat.
    heartbeat_index: HeartbeatIndex,
    /// The heartbeat interval duration from the config.
    heartbeat_interval: Duration,
    /// Backoff slack from the config, counted in heartbeats: expired entries are kept for an
    /// extra `heartbeat_interval * backoff_slack` before being removed.
    backoff_slack: u32,
}
52
53impl BackoffStorage {
54    fn heartbeats(d: &Duration, heartbeat_interval: &Duration) -> usize {
55        d.as_nanos().div_ceil(heartbeat_interval.as_nanos()) as usize
56    }
57
58    pub(crate) fn new(
59        prune_backoff: &Duration,
60        heartbeat_interval: Duration,
61        backoff_slack: u32,
62    ) -> BackoffStorage {
63        // We add one additional slot for partial heartbeat
64        let max_heartbeats =
65            Self::heartbeats(prune_backoff, &heartbeat_interval) + backoff_slack as usize + 1;
66        BackoffStorage {
67            backoffs: HashMap::new(),
68            backoffs_by_heartbeat: vec![HashSet::new(); max_heartbeats],
69            heartbeat_index: HeartbeatIndex(0),
70            heartbeat_interval,
71            backoff_slack,
72        }
73    }
74
75    /// Updates the backoff for a peer (if there is already a more restrictive backoff then this
76    /// call doesn't change anything).
77    pub(crate) fn update_backoff(&mut self, topic: &TopicHash, peer: &PeerId, time: Duration) {
78        let instant = Instant::now() + time;
79        let insert_into_backoffs_by_heartbeat =
80            |heartbeat_index: HeartbeatIndex,
81             backoffs_by_heartbeat: &mut Vec<HashSet<_>>,
82             heartbeat_interval,
83             backoff_slack| {
84                let pair = (topic.clone(), *peer);
85                let index = (heartbeat_index.0
86                    + Self::heartbeats(&time, heartbeat_interval)
87                    + backoff_slack as usize)
88                    % backoffs_by_heartbeat.len();
89                backoffs_by_heartbeat[index].insert(pair);
90                HeartbeatIndex(index)
91            };
92        match self.backoffs.entry(topic.clone()).or_default().entry(*peer) {
93            Entry::Occupied(mut o) => {
94                let (backoff, index) = o.get();
95                if backoff < &instant {
96                    let pair = (topic.clone(), *peer);
97                    if let Some(s) = self.backoffs_by_heartbeat.get_mut(index.0) {
98                        s.remove(&pair);
99                    }
100                    let index = insert_into_backoffs_by_heartbeat(
101                        self.heartbeat_index,
102                        &mut self.backoffs_by_heartbeat,
103                        &self.heartbeat_interval,
104                        self.backoff_slack,
105                    );
106                    o.insert((instant, index));
107                }
108            }
109            Entry::Vacant(v) => {
110                let index = insert_into_backoffs_by_heartbeat(
111                    self.heartbeat_index,
112                    &mut self.backoffs_by_heartbeat,
113                    &self.heartbeat_interval,
114                    self.backoff_slack,
115                );
116                v.insert((instant, index));
117            }
118        };
119    }
120
121    /// Checks if a given peer is backoffed for the given topic. This method respects the
122    /// configured BACKOFF_SLACK and may return true even if the backup is already over.
123    /// It is guaranteed to return false if the backoff is not over and eventually if enough time
124    /// passed true if the backoff is over.
125    ///
126    /// This method should be used for deciding if we can already send a GRAFT to a previously
127    /// backoffed peer.
128    pub(crate) fn is_backoff_with_slack(&self, topic: &TopicHash, peer: &PeerId) -> bool {
129        self.backoffs
130            .get(topic)
131            .is_some_and(|m| m.contains_key(peer))
132    }
133
134    pub(crate) fn get_backoff_time(&self, topic: &TopicHash, peer: &PeerId) -> Option<Instant> {
135        Self::get_backoff_time_from_backoffs(&self.backoffs, topic, peer)
136    }
137
138    fn get_backoff_time_from_backoffs(
139        backoffs: &HashMap<TopicHash, HashMap<PeerId, (Instant, HeartbeatIndex)>>,
140        topic: &TopicHash,
141        peer: &PeerId,
142    ) -> Option<Instant> {
143        backoffs
144            .get(topic)
145            .and_then(|m| m.get(peer).map(|(i, _)| *i))
146    }
147
148    /// Applies a heartbeat. That should be called regularly in intervals of length
149    /// `heartbeat_interval`.
150    pub(crate) fn heartbeat(&mut self) {
151        // Clean up backoffs_by_heartbeat
152        if let Some(s) = self.backoffs_by_heartbeat.get_mut(self.heartbeat_index.0) {
153            let backoffs = &mut self.backoffs;
154            let slack = self.heartbeat_interval * self.backoff_slack;
155            let now = Instant::now();
156            s.retain(|(topic, peer)| {
157                let keep = match Self::get_backoff_time_from_backoffs(backoffs, topic, peer) {
158                    Some(backoff_time) => backoff_time + slack > now,
159                    None => false,
160                };
161                if !keep {
162                    // remove from backoffs
163                    if let Entry::Occupied(mut m) = backoffs.entry(topic.clone()) {
164                        if m.get_mut().remove(peer).is_some() && m.get().is_empty() {
165                            m.remove();
166                        }
167                    }
168                }
169
170                keep
171            });
172        }
173
174        // Increase heartbeat index
175        self.heartbeat_index =
176            HeartbeatIndex((self.heartbeat_index.0 + 1) % self.backoffs_by_heartbeat.len());
177    }
178}