libp2p_gossipsub/
time_cache.rs

1// Copyright 2020 Sigma Prime Pty Ltd.
2//
3// Permission is hereby granted, free of charge, to any person obtaining a
4// copy of this software and associated documentation files (the "Software"),
5// to deal in the Software without restriction, including without limitation
6// the rights to use, copy, modify, merge, publish, distribute, sublicense,
7// and/or sell copies of the Software, and to permit persons to whom the
8// Software is furnished to do so, subject to the following conditions:
9//
10// The above copyright notice and this permission notice shall be included in
11// all copies or substantial portions of the Software.
12//
13// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
14// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
18// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
19// DEALINGS IN THE SOFTWARE.
20
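//! This implements a time-based (TTL) cache for checking gossipsub message duplicates: entries
//! expire a fixed duration after they are inserted and are purged lazily on later accesses.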
22
23use std::{
24    collections::{
25        hash_map::{
26            self,
27            Entry::{Occupied, Vacant},
28        },
29        VecDeque,
30    },
31    time::Duration,
32};
33
34use fnv::FnvHashMap;
35use web_time::Instant;
36
/// An element paired with the instant at which it expires.
///
/// Used both for the values stored in the map and for the keys stored in the expiry queue.
struct ExpiringElement<Element> {
    /// The element that expires.
    element: Element,
    /// The expire time.
    expires: Instant,
}
43
/// A keyed cache whose entries are dropped once their time-to-live has elapsed.
///
/// Expired entries are not removed in the background; they are purged when
/// [`TimeCache::entry`] is called.
pub(crate) struct TimeCache<Key, Value> {
    /// Mapping a key to its value together with its expire time.
    ///
    /// NOTE(review): the earlier wording said the expire time "can be updated through
    /// reinserts"; nothing in this file rewrites it after the initial insert — confirm whether
    /// that is intended.
    map: FnvHashMap<Key, ExpiringElement<Value>>,
    /// An ordered list of keys by expires time. Keys are pushed to the back on insertion and
    /// expired keys are popped from the front.
    list: VecDeque<ExpiringElement<Key>>,
    /// The time elements remain in the cache.
    ttl: Duration,
}
53
/// A view into a [`TimeCache`] entry whose key is already present.
pub(crate) struct OccupiedEntry<'a, K, V> {
    /// The underlying occupied hash-map entry holding the value and its expire time.
    entry: hash_map::OccupiedEntry<'a, K, ExpiringElement<V>>,
}
57
58impl<'a, K, V> OccupiedEntry<'a, K, V>
59where
60    K: Eq + std::hash::Hash + Clone,
61{
62    pub(crate) fn into_mut(self) -> &'a mut V {
63        &mut self.entry.into_mut().element
64    }
65}
66
/// A view into a [`TimeCache`] slot whose key is not present yet.
pub(crate) struct VacantEntry<'a, K, V> {
    /// The expire time that is assigned to the value if one is inserted.
    expiration: Instant,
    /// The underlying vacant hash-map entry.
    entry: hash_map::VacantEntry<'a, K, ExpiringElement<V>>,
    /// The cache's expiry queue; the key is appended to it on insertion.
    list: &'a mut VecDeque<ExpiringElement<K>>,
}
72
73impl<'a, K, V> VacantEntry<'a, K, V>
74where
75    K: Eq + std::hash::Hash + Clone,
76{
77    pub(crate) fn insert(self, value: V) -> &'a mut V {
78        self.list.push_back(ExpiringElement {
79            element: self.entry.key().clone(),
80            expires: self.expiration,
81        });
82        &mut self
83            .entry
84            .insert(ExpiringElement {
85                element: value,
86                expires: self.expiration,
87            })
88            .element
89    }
90}
91
/// A view into a single [`TimeCache`] slot, which is either occupied or vacant.
pub(crate) enum Entry<'a, K: 'a, V: 'a> {
    /// The key is present in the cache.
    Occupied(OccupiedEntry<'a, K, V>),
    /// The key is absent from the cache.
    Vacant(VacantEntry<'a, K, V>),
}
96
97impl<'a, K: 'a, V: 'a> Entry<'a, K, V>
98where
99    K: Eq + std::hash::Hash + Clone,
100{
101    pub(crate) fn or_default(self) -> &'a mut V
102    where
103        V: Default,
104    {
105        match self {
106            Entry::Occupied(entry) => entry.into_mut(),
107            Entry::Vacant(entry) => entry.insert(V::default()),
108        }
109    }
110}
111
112impl<Key, Value> TimeCache<Key, Value>
113where
114    Key: Eq + std::hash::Hash + Clone,
115{
116    pub(crate) fn new(ttl: Duration) -> Self {
117        TimeCache {
118            map: FnvHashMap::default(),
119            list: VecDeque::new(),
120            ttl,
121        }
122    }
123
124    fn remove_expired_keys(&mut self, now: Instant) {
125        while let Some(element) = self.list.pop_front() {
126            if element.expires > now {
127                self.list.push_front(element);
128                break;
129            }
130            if let Occupied(entry) = self.map.entry(element.element.clone()) {
131                if entry.get().expires <= now {
132                    entry.remove();
133                }
134            }
135        }
136    }
137
138    pub(crate) fn entry(&mut self, key: Key) -> Entry<Key, Value> {
139        let now = Instant::now();
140        self.remove_expired_keys(now);
141        match self.map.entry(key) {
142            Occupied(entry) => Entry::Occupied(OccupiedEntry { entry }),
143            Vacant(entry) => Entry::Vacant(VacantEntry {
144                expiration: now + self.ttl,
145                entry,
146                list: &mut self.list,
147            }),
148        }
149    }
150
151    /// Empties the entire cache.
152    #[cfg(test)]
153    pub(crate) fn clear(&mut self) {
154        self.map.clear();
155        self.list.clear();
156    }
157
158    pub(crate) fn contains_key(&self, key: &Key) -> bool {
159        self.map.contains_key(key)
160    }
161}
162
/// A time-limited set of keys, implemented as a [`TimeCache`] with unit values.
pub(crate) struct DuplicateCache<Key>(TimeCache<Key, ()>);
164
165impl<Key> DuplicateCache<Key>
166where
167    Key: Eq + std::hash::Hash + Clone,
168{
169    pub(crate) fn new(ttl: Duration) -> Self {
170        Self(TimeCache::new(ttl))
171    }
172
173    // Inserts new elements and removes any expired elements.
174    //
175    // If the key was not present this returns `true`. If the value was already present this
176    // returns `false`.
177    pub(crate) fn insert(&mut self, key: Key) -> bool {
178        if let Entry::Vacant(entry) = self.0.entry(key) {
179            entry.insert(());
180            true
181        } else {
182            false
183        }
184    }
185
186    pub(crate) fn contains(&self, key: &Key) -> bool {
187        self.0.contains_key(key)
188    }
189}
190
#[cfg(test)]
mod test {
    use super::*;

    /// Keys inserted within the ttl are reported as new once and as duplicates afterwards.
    #[test]
    fn cache_added_entries_exist() {
        let mut cache = DuplicateCache::new(Duration::from_secs(10));

        // The first insertion of each key must be reported as new.
        assert!(cache.insert("t"));
        assert!(cache.insert("e"));

        // Should report that 't' and 'e' already exist.
        assert!(!cache.insert("t"));
        assert!(!cache.insert("e"));
    }

    /// Keys can be inserted as new again once their ttl has elapsed.
    #[test]
    fn cache_entries_expire() {
        let mut cache = DuplicateCache::new(Duration::from_millis(100));

        assert!(cache.insert("t"));
        assert!(!cache.insert("t"));
        assert!(cache.insert("e"));
        assert!(!cache.insert("e"));
        // sleep until cache expiry
        std::thread::sleep(Duration::from_millis(101));
        // add another element; inserting purges the expired keys
        assert!(cache.insert("s"));

        // 't' should have been removed from the cache, so it is new again
        assert!(cache.insert("t"));
    }
}