libp2p_gossipsub/
backoff.rs1use std::{
23 collections::{
24 hash_map::{Entry, HashMap},
25 HashSet,
26 },
27 time::Duration,
28};
29
30use libp2p_identity::PeerId;
31use web_time::Instant;
32
33use crate::topic::TopicHash;
34
/// Position of a slot inside the `backoffs_by_heartbeat` wheel of [`BackoffStorage`].
#[derive(Clone, Copy)]
struct HeartbeatIndex(usize);
37
38pub(crate) struct BackoffStorage {
40 backoffs: HashMap<TopicHash, HashMap<PeerId, (Instant, HeartbeatIndex)>>,
42 backoffs_by_heartbeat: Vec<HashSet<(TopicHash, PeerId)>>,
45 heartbeat_index: HeartbeatIndex,
47 heartbeat_interval: Duration,
49 backoff_slack: u32,
51}
52
53impl BackoffStorage {
54 fn heartbeats(d: &Duration, heartbeat_interval: &Duration) -> usize {
55 d.as_nanos().div_ceil(heartbeat_interval.as_nanos()) as usize
56 }
57
58 pub(crate) fn new(
59 prune_backoff: &Duration,
60 heartbeat_interval: Duration,
61 backoff_slack: u32,
62 ) -> BackoffStorage {
63 let max_heartbeats =
65 Self::heartbeats(prune_backoff, &heartbeat_interval) + backoff_slack as usize + 1;
66 BackoffStorage {
67 backoffs: HashMap::new(),
68 backoffs_by_heartbeat: vec![HashSet::new(); max_heartbeats],
69 heartbeat_index: HeartbeatIndex(0),
70 heartbeat_interval,
71 backoff_slack,
72 }
73 }
74
75 pub(crate) fn update_backoff(&mut self, topic: &TopicHash, peer: &PeerId, time: Duration) {
78 let instant = Instant::now() + time;
79 let insert_into_backoffs_by_heartbeat =
80 |heartbeat_index: HeartbeatIndex,
81 backoffs_by_heartbeat: &mut Vec<HashSet<_>>,
82 heartbeat_interval,
83 backoff_slack| {
84 let pair = (topic.clone(), *peer);
85 let index = (heartbeat_index.0
86 + Self::heartbeats(&time, heartbeat_interval)
87 + backoff_slack as usize)
88 % backoffs_by_heartbeat.len();
89 backoffs_by_heartbeat[index].insert(pair);
90 HeartbeatIndex(index)
91 };
92 match self.backoffs.entry(topic.clone()).or_default().entry(*peer) {
93 Entry::Occupied(mut o) => {
94 let (backoff, index) = o.get();
95 if backoff < &instant {
96 let pair = (topic.clone(), *peer);
97 if let Some(s) = self.backoffs_by_heartbeat.get_mut(index.0) {
98 s.remove(&pair);
99 }
100 let index = insert_into_backoffs_by_heartbeat(
101 self.heartbeat_index,
102 &mut self.backoffs_by_heartbeat,
103 &self.heartbeat_interval,
104 self.backoff_slack,
105 );
106 o.insert((instant, index));
107 }
108 }
109 Entry::Vacant(v) => {
110 let index = insert_into_backoffs_by_heartbeat(
111 self.heartbeat_index,
112 &mut self.backoffs_by_heartbeat,
113 &self.heartbeat_interval,
114 self.backoff_slack,
115 );
116 v.insert((instant, index));
117 }
118 };
119 }
120
121 pub(crate) fn is_backoff_with_slack(&self, topic: &TopicHash, peer: &PeerId) -> bool {
129 self.backoffs
130 .get(topic)
131 .is_some_and(|m| m.contains_key(peer))
132 }
133
134 pub(crate) fn get_backoff_time(&self, topic: &TopicHash, peer: &PeerId) -> Option<Instant> {
135 Self::get_backoff_time_from_backoffs(&self.backoffs, topic, peer)
136 }
137
138 fn get_backoff_time_from_backoffs(
139 backoffs: &HashMap<TopicHash, HashMap<PeerId, (Instant, HeartbeatIndex)>>,
140 topic: &TopicHash,
141 peer: &PeerId,
142 ) -> Option<Instant> {
143 backoffs
144 .get(topic)
145 .and_then(|m| m.get(peer).map(|(i, _)| *i))
146 }
147
148 pub(crate) fn heartbeat(&mut self) {
151 if let Some(s) = self.backoffs_by_heartbeat.get_mut(self.heartbeat_index.0) {
153 let backoffs = &mut self.backoffs;
154 let slack = self.heartbeat_interval * self.backoff_slack;
155 let now = Instant::now();
156 s.retain(|(topic, peer)| {
157 let keep = match Self::get_backoff_time_from_backoffs(backoffs, topic, peer) {
158 Some(backoff_time) => backoff_time + slack > now,
159 None => false,
160 };
161 if !keep {
162 if let Entry::Occupied(mut m) = backoffs.entry(topic.clone()) {
164 if m.get_mut().remove(peer).is_some() && m.get().is_empty() {
165 m.remove();
166 }
167 }
168 }
169
170 keep
171 });
172 }
173
174 self.heartbeat_index =
176 HeartbeatIndex((self.heartbeat_index.0 + 1) % self.backoffs_by_heartbeat.len());
177 }
178}