libp2p_relay/behaviour/
rate_limiter.rs

1// Copyright 2021 Protocol Labs.
2//
3// Permission is hereby granted, free of charge, to any person obtaining a
4// copy of this software and associated documentation files (the "Software"),
5// to deal in the Software without restriction, including without limitation
6// the rights to use, copy, modify, merge, publish, distribute, sublicense,
7// and/or sell copies of the Software, and to permit persons to whom the
8// Software is furnished to do so, subject to the following conditions:
9//
10// The above copyright notice and this permission notice shall be included in
11// all copies or substantial portions of the Software.
12//
13// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
14// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
18// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
19// DEALINGS IN THE SOFTWARE.
20
21use std::{
22    collections::{HashMap, VecDeque},
23    hash::Hash,
24    net::IpAddr,
25    num::NonZeroU32,
26    time::Duration,
27};
28
29use libp2p_core::multiaddr::{Multiaddr, Protocol};
30use libp2p_identity::PeerId;
31use web_time::Instant;
32
33/// Allows rate limiting access to some resource based on the [`PeerId`] and
34/// [`Multiaddr`] of a remote peer.
35// See [`new_per_peer`] and [`new_per_ip`] for precast implementations. Use
36// [`GenericRateLimiter`] to build your own, e.g. based on the autonomous system
37// number of a peers IP address.
38pub trait RateLimiter: Send {
39    fn try_next(&mut self, peer: PeerId, addr: &Multiaddr, now: Instant) -> bool;
40}
41
42pub(crate) fn new_per_peer(config: GenericRateLimiterConfig) -> Box<dyn RateLimiter> {
43    let mut limiter = GenericRateLimiter::new(config);
44    Box::new(move |peer_id, _addr: &Multiaddr, now| limiter.try_next(peer_id, now))
45}
46
47pub(crate) fn new_per_ip(config: GenericRateLimiterConfig) -> Box<dyn RateLimiter> {
48    let mut limiter = GenericRateLimiter::new(config);
49    Box::new(move |_peer_id, addr: &Multiaddr, now| {
50        multiaddr_to_ip(addr)
51            .map(|a| limiter.try_next(a, now))
52            .unwrap_or(true)
53    })
54}
55
56impl<T: FnMut(PeerId, &Multiaddr, Instant) -> bool + Send> RateLimiter for T {
57    fn try_next(&mut self, peer: PeerId, addr: &Multiaddr, now: Instant) -> bool {
58        self(peer, addr, now)
59    }
60}
61
62fn multiaddr_to_ip(addr: &Multiaddr) -> Option<IpAddr> {
63    addr.iter().find_map(|p| match p {
64        Protocol::Ip4(addr) => Some(addr.into()),
65        Protocol::Ip6(addr) => Some(addr.into()),
66        _ => None,
67    })
68}
69
70/// Rate limiter using the [Token Bucket] algorithm.
71///
72/// [Token Bucket]: https://en.wikipedia.org/wiki/Token_bucket
73pub(crate) struct GenericRateLimiter<Id> {
74    limit: u32,
75    interval: Duration,
76
77    refill_schedule: VecDeque<(Instant, Id)>,
78    buckets: HashMap<Id, u32>,
79}
80
81/// Configuration for a [`GenericRateLimiter`].
82#[derive(Debug, Clone, Copy)]
83pub(crate) struct GenericRateLimiterConfig {
84    // The maximum number of tokens in the bucket at any point in time.
85    pub(crate) limit: NonZeroU32,
86    // The interval at which a single token is added to the bucket.
87    pub(crate) interval: Duration,
88}
89
90impl<Id: Eq + PartialEq + Hash + Clone> GenericRateLimiter<Id> {
91    pub(crate) fn new(config: GenericRateLimiterConfig) -> Self {
92        assert!(!config.interval.is_zero());
93
94        Self {
95            limit: config.limit.into(),
96            interval: config.interval,
97            refill_schedule: Default::default(),
98            buckets: Default::default(),
99        }
100    }
101
102    pub(crate) fn try_next(&mut self, id: Id, now: Instant) -> bool {
103        self.refill(now);
104
105        match self.buckets.get_mut(&id) {
106            // If the bucket exists, try to take a token.
107            Some(balance) => match balance.checked_sub(1) {
108                Some(a) => {
109                    *balance = a;
110                    true
111                }
112                None => false,
113            },
114            // If the bucket is missing, act like the bucket has `limit` number of tokens. Take one
115            // token and track the new bucket balance.
116            None => {
117                self.buckets.insert(id.clone(), self.limit - 1);
118                self.refill_schedule.push_back((now, id));
119                true
120            }
121        }
122    }
123
124    fn refill(&mut self, now: Instant) {
125        // Note when used with a high number of buckets: This loop refills all the to-be-refilled
126        // buckets at once, thus potentially delaying the parent call to `try_next`.
127        loop {
128            match self.refill_schedule.front() {
129                // Only continue if (a) there is a bucket and (b) the bucket has not already been
130                // refilled recently.
131                Some((last_refill, _)) if now.duration_since(*last_refill) >= self.interval => {}
132                // Otherwise stop refilling. Items in `refill_schedule` are sorted, thus, if the
133                // first ain't ready, none of them are.
134                _ => return,
135            };
136
137            let (last_refill, id) = self
138                .refill_schedule
139                .pop_front()
140                .expect("Queue not to be empty.");
141
142            // Get the current balance of the bucket.
143            let balance = self
144                .buckets
145                .get(&id)
146                .expect("Entry can only be removed via refill.");
147
148            // Calculate the new balance.
149            let duration_since = now.duration_since(last_refill);
150            let new_tokens = duration_since
151                .as_micros()
152                // Note that the use of `as_micros` limits the number of tokens to 10^6 per second.
153                .checked_div(self.interval.as_micros())
154                .and_then(|i| i.try_into().ok())
155                .unwrap_or(u32::MAX);
156            let new_balance = balance.checked_add(new_tokens).unwrap_or(u32::MAX);
157
158            // If the new balance is below the limit, update the bucket.
159            if new_balance < self.limit {
160                self.buckets
161                    .insert(id.clone(), new_balance)
162                    .expect("To override value.");
163                self.refill_schedule.push_back((now, id));
164            } else {
165                // If the balance is above the limit, the bucket can be removed, given that a
166                // non-existing bucket is equivalent to a bucket with `limit` tokens.
167                self.buckets.remove(&id);
168            }
169        }
170    }
171}
172
173#[cfg(test)]
174mod tests {
175    use quickcheck::{QuickCheck, TestResult};
176
177    use super::*;
178
179    #[test]
180    fn first() {
181        let id = 1;
182        let mut l = GenericRateLimiter::new(GenericRateLimiterConfig {
183            limit: NonZeroU32::new(10).unwrap(),
184            interval: Duration::from_secs(1),
185        });
186        assert!(l.try_next(id, Instant::now()));
187    }
188
189    #[test]
190    fn limits() {
191        let id = 1;
192        let now = Instant::now();
193        let mut l = GenericRateLimiter::new(GenericRateLimiterConfig {
194            limit: NonZeroU32::new(10).unwrap(),
195            interval: Duration::from_secs(1),
196        });
197        for _ in 0..10 {
198            assert!(l.try_next(id, now));
199        }
200
201        assert!(!l.try_next(id, now));
202    }
203
204    #[test]
205    fn refills() {
206        let id = 1;
207        let now = Instant::now();
208        let mut l = GenericRateLimiter::new(GenericRateLimiterConfig {
209            limit: NonZeroU32::new(10).unwrap(),
210            interval: Duration::from_secs(1),
211        });
212
213        for _ in 0..10 {
214            assert!(l.try_next(id, now));
215        }
216        assert!(!l.try_next(id, now));
217
218        let now = now + Duration::from_secs(1);
219        assert!(l.try_next(id, now));
220        assert!(!l.try_next(id, now));
221
222        let now = now + Duration::from_secs(10);
223        for _ in 0..10 {
224            assert!(l.try_next(id, now));
225        }
226    }
227
228    #[test]
229    fn move_at_half_interval_steps() {
230        let id = 1;
231        let now = Instant::now();
232        let mut l = GenericRateLimiter::new(GenericRateLimiterConfig {
233            limit: NonZeroU32::new(1).unwrap(),
234            interval: Duration::from_secs(2),
235        });
236
237        assert!(l.try_next(id, now));
238        assert!(!l.try_next(id, now));
239
240        let now = now + Duration::from_secs(1);
241        assert!(!l.try_next(id, now));
242
243        let now = now + Duration::from_secs(1);
244        assert!(l.try_next(id, now));
245    }
246
247    #[test]
248    fn garbage_collects() {
249        let now = Instant::now();
250        let mut l = GenericRateLimiter::new(GenericRateLimiterConfig {
251            limit: NonZeroU32::new(1).unwrap(),
252            interval: Duration::from_secs(1),
253        });
254
255        assert!(l.try_next(1, now));
256
257        let now = now + Duration::from_secs(1);
258        assert!(l.try_next(2, now));
259
260        assert_eq!(l.buckets.len(), 1);
261        assert_eq!(l.refill_schedule.len(), 1);
262    }
263
264    #[test]
265    fn quick_check() {
266        fn prop(limit: NonZeroU32, interval: Duration, events: Vec<(u32, Duration)>) -> TestResult {
267            if interval.is_zero() {
268                return TestResult::discard();
269            }
270
271            let mut now = Instant::now();
272            let mut l = GenericRateLimiter::new(GenericRateLimiterConfig { limit, interval });
273
274            for (id, d) in events {
275                now = if let Some(now) = now.checked_add(d) {
276                    now
277                } else {
278                    return TestResult::discard();
279                };
280                l.try_next(id, now);
281            }
282
283            now = if let Some(now) = interval
284                .checked_mul(limit.into())
285                .and_then(|full_interval| now.checked_add(full_interval))
286            {
287                now
288            } else {
289                return TestResult::discard();
290            };
291            assert!(l.try_next(1, now));
292
293            assert_eq!(l.buckets.len(), 1);
294            assert_eq!(l.refill_schedule.len(), 1);
295
296            TestResult::passed()
297        }
298
299        QuickCheck::new().quickcheck(prop as fn(_, _, _) -> _)
300    }
301}