libp2p_kad/record/store/
memory.rs

1// Copyright 2019 Parity Technologies (UK) Ltd.
2//
3// Permission is hereby granted, free of charge, to any person obtaining a
4// copy of this software and associated documentation files (the "Software"),
5// to deal in the Software without restriction, including without limitation
6// the rights to use, copy, modify, merge, publish, distribute, sublicense,
7// and/or sell copies of the Software, and to permit persons to whom the
8// Software is furnished to do so, subject to the following conditions:
9//
10// The above copyright notice and this permission notice shall be included in
11// all copies or substantial portions of the Software.
12//
13// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
14// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
18// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
19// DEALINGS IN THE SOFTWARE.
20
21use std::{
22    collections::{hash_map, hash_set, HashMap, HashSet},
23    iter,
24};
25
26use smallvec::SmallVec;
27
28use super::*;
29use crate::kbucket;
30
/// In-memory implementation of a `RecordStore`.
///
/// Regular records and provider records are held in separate hash maps and
/// are bounded by the limits in [`MemoryStoreConfig`]. In addition, the
/// provider records whose provider is the local node are tracked in a
/// dedicated set so that they can be enumerated without scanning every key.
pub struct MemoryStore {
    /// The identity of the peer owning the store.
    ///
    /// Its preimage (the `PeerId`) is compared against the provider of each
    /// provider record to decide whether that record also belongs in `provided`.
    local_key: kbucket::Key<PeerId>,
    /// The configuration of the store.
    config: MemoryStoreConfig,
    /// The stored (regular) records, indexed by record key.
    records: HashMap<Key, Record>,
    /// The stored provider records, indexed by the key being provided.
    ///
    /// Each list holds at most one record per provider; `remove_provider`
    /// removes the map entry once its list becomes empty.
    providers: HashMap<Key, SmallVec<[ProviderRecord; K_VALUE.get()]>>,
    /// The set of all provider records for the node identified by `local_key`.
    ///
    /// Must be kept in sync with `providers`: `add_provider` and
    /// `remove_provider` update both collections together.
    provided: HashSet<ProviderRecord>,
}
46
/// Configuration for a `MemoryStore`.
///
/// All limits are enforced when inserting (`put` / `add_provider`); see the
/// individual fields for the exact semantics.
#[derive(Debug, Clone)]
pub struct MemoryStoreConfig {
    /// The maximum number of records.
    ///
    /// Only inserting a record under a *new* key is limited; replacing the
    /// record of an existing key always succeeds.
    pub max_records: usize,
    /// The maximum size of record values, in bytes.
    ///
    /// Note: `put` rejects a value whose length is greater than *or equal to*
    /// this number, so the largest accepted value is `max_value_bytes - 1` bytes.
    pub max_value_bytes: usize,
    /// The maximum number of providers stored for a key.
    ///
    /// This should match up with the chosen replication factor.
    /// Once a key has this many providers, further new providers for that
    /// key are silently ignored (existing providers can still be updated).
    pub max_providers_per_key: usize,
    /// The maximum number of provider records for which the
    /// local node is the provider.
    ///
    /// NOTE(review): `add_provider` compares this limit against the number of
    /// distinct keys in the provider map, regardless of who the provider is,
    /// and fails with `Error::MaxProvidedKeys` when a new key would exceed it.
    pub max_provided_keys: usize,
}
62
63impl Default for MemoryStoreConfig {
64    fn default() -> Self {
65        Self {
66            max_records: 1024,
67            max_value_bytes: 65 * 1024,
68            max_provided_keys: 1024,
69            max_providers_per_key: K_VALUE.get(),
70        }
71    }
72}
73
74impl MemoryStore {
75    /// Creates a new `MemoryRecordStore` with a default configuration.
76    pub fn new(local_id: PeerId) -> Self {
77        Self::with_config(local_id, Default::default())
78    }
79
80    /// Creates a new `MemoryRecordStore` with the given configuration.
81    pub fn with_config(local_id: PeerId, config: MemoryStoreConfig) -> Self {
82        MemoryStore {
83            local_key: kbucket::Key::from(local_id),
84            config,
85            records: HashMap::default(),
86            provided: HashSet::default(),
87            providers: HashMap::default(),
88        }
89    }
90
91    /// Retains the records satisfying a predicate.
92    pub fn retain<F>(&mut self, f: F)
93    where
94        F: FnMut(&Key, &mut Record) -> bool,
95    {
96        self.records.retain(f);
97    }
98}
99
100impl RecordStore for MemoryStore {
101    type RecordsIter<'a> =
102        iter::Map<hash_map::Values<'a, Key, Record>, fn(&'a Record) -> Cow<'a, Record>>;
103
104    type ProvidedIter<'a> = iter::Map<
105        hash_set::Iter<'a, ProviderRecord>,
106        fn(&'a ProviderRecord) -> Cow<'a, ProviderRecord>,
107    >;
108
109    fn get(&self, k: &Key) -> Option<Cow<'_, Record>> {
110        self.records.get(k).map(Cow::Borrowed)
111    }
112
113    fn put(&mut self, r: Record) -> Result<()> {
114        if r.value.len() >= self.config.max_value_bytes {
115            return Err(Error::ValueTooLarge);
116        }
117
118        let num_records = self.records.len();
119
120        match self.records.entry(r.key.clone()) {
121            hash_map::Entry::Occupied(mut e) => {
122                e.insert(r);
123            }
124            hash_map::Entry::Vacant(e) => {
125                if num_records >= self.config.max_records {
126                    return Err(Error::MaxRecords);
127                }
128                e.insert(r);
129            }
130        }
131
132        Ok(())
133    }
134
135    fn remove(&mut self, k: &Key) {
136        self.records.remove(k);
137    }
138
139    fn records(&self) -> Self::RecordsIter<'_> {
140        self.records.values().map(Cow::Borrowed)
141    }
142
143    fn add_provider(&mut self, record: ProviderRecord) -> Result<()> {
144        let num_keys = self.providers.len();
145
146        // Obtain the entry
147        let providers = match self.providers.entry(record.key.clone()) {
148            e @ hash_map::Entry::Occupied(_) => e,
149            e @ hash_map::Entry::Vacant(_) => {
150                if self.config.max_provided_keys == num_keys {
151                    return Err(Error::MaxProvidedKeys);
152                }
153                e
154            }
155        }
156        .or_insert_with(Default::default);
157
158        for p in providers.iter_mut() {
159            if p.provider == record.provider {
160                // In-place update of an existing provider record.
161                if self.local_key.preimage() == &record.provider {
162                    self.provided.remove(p);
163                    self.provided.insert(record.clone());
164                }
165                *p = record;
166                return Ok(());
167            }
168        }
169
170        // If the providers list is full, we ignore the new provider.
171        // This strategy can mitigate Sybil attacks, in which an attacker
172        // floods the network with fake provider records.
173        if providers.len() == self.config.max_providers_per_key {
174            return Ok(());
175        }
176
177        // Otherwise, insert the new provider record.
178        if self.local_key.preimage() == &record.provider {
179            self.provided.insert(record.clone());
180        }
181        providers.push(record);
182
183        Ok(())
184    }
185
186    fn providers(&self, key: &Key) -> Vec<ProviderRecord> {
187        self.providers
188            .get(key)
189            .map_or_else(Vec::new, |ps| ps.clone().into_vec())
190    }
191
192    fn provided(&self) -> Self::ProvidedIter<'_> {
193        self.provided.iter().map(Cow::Borrowed)
194    }
195
196    fn remove_provider(&mut self, key: &Key, provider: &PeerId) {
197        if let hash_map::Entry::Occupied(mut e) = self.providers.entry(key.clone()) {
198            let providers = e.get_mut();
199            if let Some(i) = providers.iter().position(|p| &p.provider == provider) {
200                let p = providers.remove(i);
201                if &p.provider == self.local_key.preimage() {
202                    self.provided.remove(&p);
203                }
204            }
205            if providers.is_empty() {
206                e.remove();
207            }
208        }
209    }
210}
211
#[cfg(test)]
mod tests {
    use quickcheck::*;
    use rand::Rng;

    use super::*;
    use crate::SHA_256_MH;

    /// Wraps 32 random bytes as a digest under the `SHA_256_MH` code, giving a
    /// fresh key for each call.
    fn random_multihash() -> Multihash<64> {
        Multihash::wrap(SHA_256_MH, &rand::thread_rng().gen::<[u8; 32]>()).unwrap()
    }

    /// A record that was put can be read back and is gone after removal.
    #[test]
    fn put_get_remove_record() {
        fn prop(r: Record) {
            let mut store = MemoryStore::new(PeerId::random());
            assert!(store.put(r.clone()).is_ok());
            assert_eq!(Some(Cow::Borrowed(&r)), store.get(&r.key));
            store.remove(&r.key);
            assert!(store.get(&r.key).is_none());
        }
        quickcheck(prop as fn(_))
    }

    /// A provider record that was added is listed and is gone after removal.
    #[test]
    fn add_get_remove_provider() {
        fn prop(r: ProviderRecord) {
            let mut store = MemoryStore::new(PeerId::random());
            assert!(store.add_provider(r.clone()).is_ok());
            assert!(store.providers(&r.key).contains(&r));
            store.remove_provider(&r.key, &r.provider);
            assert!(!store.providers(&r.key).contains(&r));
        }
        quickcheck(prop as fn(_))
    }

    /// Records provided by the local node show up in (and vanish from) `provided()`.
    #[test]
    fn provided() {
        let id = PeerId::random();
        let mut store = MemoryStore::new(id);
        let key = random_multihash();
        let rec = ProviderRecord::new(key, id, Vec::new());
        assert!(store.add_provider(rec.clone()).is_ok());
        assert_eq!(
            vec![Cow::Borrowed(&rec)],
            store.provided().collect::<Vec<_>>()
        );
        store.remove_provider(&rec.key, &id);
        assert_eq!(store.provided().count(), 0);
    }

    /// Re-adding a record for the same provider replaces the stored one.
    #[test]
    fn update_provider() {
        let mut store = MemoryStore::new(PeerId::random());
        let key = random_multihash();
        let prv = PeerId::random();
        let mut rec = ProviderRecord::new(key, prv, Vec::new());
        assert!(store.add_provider(rec.clone()).is_ok());
        assert_eq!(vec![rec.clone()], store.providers(&rec.key));
        rec.expires = Some(Instant::now());
        assert!(store.add_provider(rec.clone()).is_ok());
        assert_eq!(vec![rec.clone()], store.providers(&rec.key));
    }

    /// Re-adding a local record also replaces the copy held in `provided()`.
    #[test]
    fn update_provided() {
        let prv = PeerId::random();
        let mut store = MemoryStore::new(prv);
        let key = random_multihash();
        let mut rec = ProviderRecord::new(key, prv, Vec::new());
        assert!(store.add_provider(rec.clone()).is_ok());
        assert_eq!(
            vec![Cow::Borrowed(&rec)],
            store.provided().collect::<Vec<_>>()
        );
        rec.expires = Some(Instant::now());
        assert!(store.add_provider(rec.clone()).is_ok());
        assert_eq!(
            vec![Cow::Borrowed(&rec)],
            store.provided().collect::<Vec<_>>()
        );
    }

    /// Once a key is saturated, further providers are accepted but not stored.
    #[test]
    fn max_providers_per_key() {
        let config = MemoryStoreConfig::default();
        let max_providers = config.max_providers_per_key;
        let key = Key::from(random_multihash());

        let mut store = MemoryStore::with_config(PeerId::random(), config);
        for _ in 0..max_providers {
            let rec = ProviderRecord::new(key.clone(), PeerId::random(), Vec::new());
            assert!(store.add_provider(rec).is_ok());
        }

        // The new provider cannot be added because the key is already saturated.
        let peer = PeerId::random();
        let rec = ProviderRecord::new(key.clone(), peer, Vec::new());
        assert!(store.add_provider(rec.clone()).is_ok());
        assert!(!store.providers(&rec.key).contains(&rec));
    }

    /// Adding a provider for a new key fails once the key limit is reached.
    #[test]
    fn max_provided_keys() {
        let mut store = MemoryStore::new(PeerId::random());
        for _ in 0..store.config.max_provided_keys {
            let key = random_multihash();
            let prv = PeerId::random();
            let rec = ProviderRecord::new(key, prv, Vec::new());
            // Filling the store up to the limit must succeed; otherwise the
            // final assertion below would not be testing the limit itself.
            assert!(store.add_provider(rec).is_ok());
        }
        let key = random_multihash();
        let prv = PeerId::random();
        let rec = ProviderRecord::new(key, prv, Vec::new());
        assert!(
            matches!(store.add_provider(rec), Err(Error::MaxProvidedKeys)),
            "Unexpected result"
        );
    }
}