libp2p_peer_store/
memory_store.rs

1//! An in-memory [`Store`] implementation.
2//!
3//! ## Usage
4//! ```
5//! use libp2p_peer_store::{memory_store::MemoryStore, Behaviour};
6//!
7//! let store: MemoryStore<()> = MemoryStore::new(Default::default());
8//! let behaviour = Behaviour::new(store);
9//! ```
10
11use std::{
12    collections::{HashMap, VecDeque},
13    num::NonZeroUsize,
14    task::Waker,
15};
16
17use libp2p_core::{Multiaddr, PeerId};
18use libp2p_swarm::FromSwarm;
19use lru::LruCache;
20
21use super::Store;
22
/// Event from the store and emitted to [`Swarm`](libp2p_swarm::Swarm).
#[derive(Debug, Clone)]
pub enum Event {
    /// Custom data of the peer has been updated.
    ///
    /// Queued by [`MemoryStore::insert_custom_data`]; carries the ID of the affected peer.
    CustomDataUpdated(PeerId),
}
29
/// An in-memory store that uses an LRU cache for bounded storage of addresses
/// and a recency-based ordering of addresses (most recently used first).
///
/// `T` is the type of the optional custom data attached to each peer; it defaults to `()`.
#[derive(Default)]
pub struct MemoryStore<T = ()> {
    /// The internal store, keyed by peer ID.
    records: HashMap<PeerId, PeerRecord<T>>,
    /// Events to emit to [`Behaviour`](crate::Behaviour) and [`Swarm`](libp2p_swarm::Swarm)
    pending_events: VecDeque<crate::store::Event<Event>>,
    /// Config of the store.
    config: Config,
    /// Waker for store events.
    waker: Option<Waker>,
}
43
44impl<T> MemoryStore<T> {
45    /// Create a new [`MemoryStore`] with the given config.
46    pub fn new(config: Config) -> Self {
47        Self {
48            config,
49            records: HashMap::new(),
50            pending_events: VecDeque::default(),
51            waker: None,
52        }
53    }
54
55    /// Update an address record and notify swarm when the address is new.  
56    /// Returns `true` when the address is new.  
57    pub fn update_address(&mut self, peer: &PeerId, address: &Multiaddr) -> bool {
58        let is_updated = self.update_address_silent(peer, address);
59        if is_updated {
60            self.pending_events
61                .push_back(crate::store::Event::RecordUpdated(*peer));
62            if let Some(waker) = self.waker.take() {
63                waker.wake();
64            }
65        }
66        is_updated
67    }
68
69    /// Update an address record without notifying swarm.  
70    /// Returns `true` when the address is new.  
71    pub fn update_address_silent(&mut self, peer: &PeerId, address: &Multiaddr) -> bool {
72        if let Some(record) = self.records.get_mut(peer) {
73            return record.update_address(address);
74        }
75        let mut new_record = PeerRecord::new(self.config.record_capacity);
76        new_record.update_address(address);
77        self.records.insert(*peer, new_record);
78        true
79    }
80
81    /// Remove an address record.
82    /// Returns `true` when the address is removed.
83    pub fn remove_address(&mut self, peer: &PeerId, address: &Multiaddr) -> bool {
84        self.records
85            .get_mut(peer)
86            .is_some_and(|r| r.remove_address(address))
87    }
88
89    pub fn get_custom_data(&self, peer: &PeerId) -> Option<&T> {
90        self.records.get(peer).and_then(|r| r.get_custom_data())
91    }
92
93    /// Take ownership of the internal data, leaving `None` in its place.
94    pub fn take_custom_data(&mut self, peer: &PeerId) -> Option<T> {
95        self.records
96            .get_mut(peer)
97            .and_then(|r| r.take_custom_data())
98    }
99
100    /// Insert the data and notify the swarm about the update, dropping the old data if it exists.
101    pub fn insert_custom_data(&mut self, peer: &PeerId, custom_data: T) {
102        self.insert_custom_data_silent(peer, custom_data);
103        self.pending_events
104            .push_back(crate::store::Event::Store(Event::CustomDataUpdated(*peer)));
105        if let Some(waker) = self.waker.take() {
106            waker.wake();
107        }
108    }
109
110    /// Insert the data without notifying the swarm. Old data will be dropped if it exists.
111    pub fn insert_custom_data_silent(&mut self, peer: &PeerId, custom_data: T) {
112        if let Some(r) = self.records.get_mut(peer) {
113            return r.insert_custom_data(custom_data);
114        }
115        let mut new_record = PeerRecord::new(self.config.record_capacity);
116        new_record.insert_custom_data(custom_data);
117        self.records.insert(*peer, new_record);
118    }
119
120    /// Iterate over all internal records.
121    pub fn record_iter(&self) -> impl Iterator<Item = (&PeerId, &PeerRecord<T>)> {
122        self.records.iter()
123    }
124
125    /// Iterate over all internal records mutably.  
126    /// Will not wake up the task.
127    pub fn record_iter_mut(&mut self) -> impl Iterator<Item = (&PeerId, &mut PeerRecord<T>)> {
128        self.records.iter_mut()
129    }
130}
131
132impl<T> Store for MemoryStore<T> {
133    type FromStore = Event;
134
135    fn on_swarm_event(&mut self, swarm_event: &FromSwarm) {
136        match swarm_event {
137            FromSwarm::NewExternalAddrOfPeer(info) => {
138                self.update_address(&info.peer_id, info.addr);
139            }
140            FromSwarm::ConnectionEstablished(info) => {
141                let mut is_record_updated = false;
142                for failed_addr in info.failed_addresses {
143                    is_record_updated |= self.remove_address(&info.peer_id, failed_addr);
144                }
145                is_record_updated |=
146                    self.update_address_silent(&info.peer_id, info.endpoint.get_remote_address());
147                if is_record_updated {
148                    self.pending_events
149                        .push_back(crate::store::Event::RecordUpdated(info.peer_id));
150                    if let Some(waker) = self.waker.take() {
151                        waker.wake(); // wake up because of update
152                    }
153                }
154            }
155            _ => {}
156        }
157    }
158
159    fn addresses_of_peer(&self, peer: &PeerId) -> Option<impl Iterator<Item = &Multiaddr>> {
160        self.records.get(peer).map(|record| record.addresses())
161    }
162
163    fn poll(
164        &mut self,
165        cx: &mut std::task::Context<'_>,
166    ) -> Option<crate::store::Event<Self::FromStore>> {
167        if self.pending_events.is_empty() {
168            self.waker = Some(cx.waker().clone());
169        }
170        self.pending_events.pop_front()
171    }
172}
173
/// Config for [`MemoryStore`].
#[derive(Debug, Clone)]
pub struct Config {
    /// The capacity of the address store of a single peer record.
    ///
    /// The least recently used address will be discarded to make room for a new address.
    record_capacity: NonZeroUsize,
}

impl Default for Config {
    /// Default config: room for 8 addresses per record.
    fn default() -> Self {
        let record_capacity = NonZeroUsize::new(8).expect("8 > 0");
        Self { record_capacity }
    }
}

impl Config {
    /// Capacity for address records.
    ///
    /// The least recently used address will be dropped to make room for a new address.
    pub fn record_capacity(&self) -> &NonZeroUsize {
        &self.record_capacity
    }

    /// Set the capacity for address records, returning the updated config (builder style).
    pub fn set_record_capacity(self, capacity: NonZeroUsize) -> Self {
        let mut updated = self;
        updated.record_capacity = capacity;
        updated
    }
}
202
203/// Internal record of [`MemoryStore`].
204#[derive(Debug, Clone)]
205pub struct PeerRecord<T> {
206    /// A LRU(Least Recently Used) cache for addresses.  
207    /// Will delete the least-recently-used record when full.
208    addresses: LruCache<Multiaddr, ()>,
209    /// Custom data attached to the peer.
210    custom_data: Option<T>,
211}
212impl<T> PeerRecord<T> {
213    pub(crate) fn new(cap: NonZeroUsize) -> Self {
214        Self {
215            addresses: LruCache::new(cap),
216            custom_data: None,
217        }
218    }
219
220    /// Iterate over all addresses. More recently-used address comes first.
221    /// Does not change the order.
222    pub fn addresses(&self) -> impl Iterator<Item = &Multiaddr> {
223        self.addresses.iter().map(|(addr, _)| addr)
224    }
225
226    /// Update the address in the LRU cache, promote it to the front if it exists,
227    /// insert it to the front if not.
228    /// Returns true when the address is new.
229    pub fn update_address(&mut self, address: &Multiaddr) -> bool {
230        if self.addresses.get(address).is_some() {
231            return false;
232        }
233        self.addresses.get_or_insert(address.clone(), || ());
234        true
235    }
236
237    /// Remove the address in the LRU cache regardless of its position.
238    /// Returns true when the address is removed, false when not exist.
239    pub fn remove_address(&mut self, address: &Multiaddr) -> bool {
240        self.addresses.pop(address).is_some()
241    }
242
243    pub fn get_custom_data(&self) -> Option<&T> {
244        self.custom_data.as_ref()
245    }
246
247    pub fn take_custom_data(&mut self) -> Option<T> {
248        self.custom_data.take()
249    }
250
251    pub fn insert_custom_data(&mut self, custom_data: T) {
252        let _ = self.custom_data.insert(custom_data);
253    }
254}
255
256#[cfg(test)]
257mod test {
258    use std::{num::NonZero, str::FromStr};
259
260    use libp2p::Swarm;
261    use libp2p_core::{Multiaddr, PeerId};
262    use libp2p_swarm::{NetworkBehaviour, SwarmEvent};
263    use libp2p_swarm_test::SwarmExt;
264
265    use super::MemoryStore;
266    use crate::Store;
267
268    #[test]
269    fn recent_use_bubble_up() {
270        let mut store: MemoryStore = MemoryStore::new(Default::default());
271        let peer = PeerId::random();
272        let addr1 = Multiaddr::from_str("/ip4/127.0.0.1").expect("parsing to succeed");
273        let addr2 = Multiaddr::from_str("/ip4/127.0.0.2").expect("parsing to succeed");
274        let addr3 = Multiaddr::from_str("/ip4/127.0.0.3").expect("parsing to succeed");
275        store.update_address(&peer, &addr1);
276        store.update_address(&peer, &addr2);
277        store.update_address(&peer, &addr3);
278        assert!(
279            store
280                .records
281                .get(&peer)
282                .expect("peer to be in the store")
283                .addresses()
284                .collect::<Vec<_>>()
285                == vec![&addr3, &addr2, &addr1]
286        );
287        store.update_address(&peer, &addr1);
288        assert!(
289            store
290                .records
291                .get(&peer)
292                .expect("peer to be in the store")
293                .addresses()
294                .collect::<Vec<_>>()
295                == vec![&addr1, &addr3, &addr2]
296        );
297        store.update_address(&peer, &addr3);
298        assert!(
299            store
300                .records
301                .get(&peer)
302                .expect("peer to be in the store")
303                .addresses()
304                .collect::<Vec<_>>()
305                == vec![&addr3, &addr1, &addr2]
306        );
307    }
308
309    #[test]
310    fn bounded_store() {
311        let mut store: MemoryStore = MemoryStore::new(Default::default());
312        let peer = PeerId::random();
313        for i in 1..10 {
314            let addr_string = format!("/ip4/127.0.0.{}", i);
315            store.update_address(
316                &peer,
317                &Multiaddr::from_str(&addr_string).expect("parsing to succeed"),
318            );
319        }
320        let first_record = Multiaddr::from_str("/ip4/127.0.0.1").expect("parsing to succeed");
321        assert!(!store
322            .addresses_of_peer(&peer)
323            .expect("peer to be in the store")
324            .any(|addr| *addr == first_record));
325        let second_record = Multiaddr::from_str("/ip4/127.0.0.2").expect("parsing to succeed");
326        assert!(
327            *store
328                .addresses_of_peer(&peer)
329                .expect("peer to be in the store")
330                .last()
331                .expect("addr to exist")
332                == second_record
333        );
334    }
335
    /// Two swarms connect to each other twice (the second time on a new listen
    /// address); the test checks that the peer store records the remote
    /// addresses and reports `RecordUpdated` for the remote peer.
    #[test]
    fn update_address_on_connect() {
        /// Waits on `swarm` for a `RecordUpdated` event concerning `expected_peer`;
        /// other events are skipped.
        async fn expect_record_update(
            swarm: &mut Swarm<crate::Behaviour<MemoryStore>>,
            expected_peer: PeerId,
        ) {
            swarm
                .wait(|ev| match ev {
                    SwarmEvent::Behaviour(crate::Event::RecordUpdated { peer }) => {
                        (peer == expected_peer).then_some(())
                    }
                    _ => None,
                })
                .await
        }

        // Both stores are limited to two addresses per peer.
        let store1: MemoryStore<()> = MemoryStore::new(
            crate::memory_store::Config::default()
                .set_record_capacity(NonZero::new(2).expect("2 > 0")),
        );
        let mut swarm1 = Swarm::new_ephemeral_tokio(|_| crate::Behaviour::new(store1));
        let store2: MemoryStore<()> = MemoryStore::new(
            crate::memory_store::Config::default()
                .set_record_capacity(NonZero::new(2).expect("2 > 0")),
        );
        let mut swarm2 = Swarm::new_ephemeral_tokio(|_| crate::Behaviour::new(store2));

        let rt = tokio::runtime::Runtime::new().unwrap();

        rt.block_on(async {
            // First connection: swarm2 dials the listen address of swarm1.
            let (listen_addr, _) = swarm1.listen().with_memory_addr_external().await;
            let swarm1_peer_id = *swarm1.local_peer_id();
            swarm2.dial(listen_addr.clone()).expect("dial to succeed");
            // swarm1 is driven in a spawned task while swarm2 is driven here.
            let handle = spawn_wait_conn_established(swarm1);
            swarm2
                .wait(|ev| match ev {
                    SwarmEvent::ConnectionEstablished { .. } => Some(()),
                    _ => None,
                })
                .await;
            let mut swarm1 = handle.await.expect("future to complete");
            assert!(swarm2
                .behaviour()
                .address_of_peer(&swarm1_peer_id)
                .expect("swarm should be connected and record about it should be created")
                .any(|addr| *addr == listen_addr));
            expect_record_update(&mut swarm1, *swarm2.local_peer_id()).await;
            // Second connection: swarm1 listens on a new address and swarm2 dials it explicitly.
            let (new_listen_addr, _) = swarm1.listen().with_memory_addr_external().await;
            let handle = spawn_wait_conn_established(swarm1);
            swarm2
                .dial(
                    // NOTE(review): `PeerCondition::Always` is presumably needed because
                    // swarm2 is already connected to swarm1 at this point; confirm.
                    libp2p_swarm::dial_opts::DialOpts::peer_id(swarm1_peer_id)
                        .condition(libp2p_swarm::dial_opts::PeerCondition::Always)
                        .addresses(vec![new_listen_addr.clone()])
                        .build(),
                )
                .expect("dial to succeed");
            swarm2
                .wait(|ev| match ev {
                    SwarmEvent::ConnectionEstablished { .. } => Some(()),
                    _ => None,
                })
                .await;
            handle.await.expect("future to complete");
            expect_record_update(&mut swarm2, swarm1_peer_id).await;
            // The address in store will contain peer ID.
            let new_listen_addr = new_listen_addr
                .with_p2p(swarm1_peer_id)
                .expect("extend to succeed");
            // Most recently used address first: the new address, then the original one.
            assert!(
                swarm2
                    .behaviour()
                    .address_of_peer(&swarm1_peer_id)
                    .expect("peer to exist")
                    .collect::<Vec<_>>()
                    == vec![&new_listen_addr, &listen_addr]
            );
        })
    }
415
    /// Combines the peer store with Identify and checks that addresses pushed
    /// through Identify end up in the peer store of the remote swarm.
    #[test]
    fn identify_external_addr_report() {
        /// Test behaviour: the peer store under test plus Identify as an address source.
        #[derive(NetworkBehaviour)]
        struct Behaviour {
            peer_store: crate::Behaviour<MemoryStore>,
            identify: libp2p::identify::Behaviour,
        }
        /// Waits on `swarm` for a peer-store `RecordUpdated` event concerning
        /// `expected_peer`; other events are skipped.
        async fn expect_record_update(swarm: &mut Swarm<Behaviour>, expected_peer: PeerId) {
            swarm
                .wait(|ev| match ev {
                    SwarmEvent::Behaviour(BehaviourEvent::PeerStore(
                        crate::Event::RecordUpdated { peer },
                    )) => (peer == expected_peer).then_some(()),
                    _ => None,
                })
                .await
        }
        /// Builds a swarm whose peer store keeps at most four addresses per peer.
        fn build_swarm() -> Swarm<Behaviour> {
            Swarm::new_ephemeral_tokio(|kp| Behaviour {
                peer_store: crate::Behaviour::new(MemoryStore::new(
                    crate::memory_store::Config::default()
                        .set_record_capacity(NonZero::new(4).expect("4 > 0")),
                )),
                identify: libp2p::identify::Behaviour::new(libp2p::identify::Config::new(
                    "/TODO/0.0.1".to_string(),
                    kp.public(),
                )),
            })
        }
        let mut swarm1 = build_swarm();
        let mut swarm2 = build_swarm();
        let rt = tokio::runtime::Runtime::new().unwrap();

        rt.block_on(async {
            let (listen_addr, _) = swarm1.listen().with_memory_addr_external().await;
            let swarm1_peer_id = *swarm1.local_peer_id();
            let swarm2_peer_id = *swarm2.local_peer_id();
            swarm2.dial(listen_addr.clone()).expect("dial to succeed");
            // Both swarms are driven in spawned tasks until the connection is established.
            let handle = spawn_wait_conn_established(swarm1);
            let mut swarm2 = spawn_wait_conn_established(swarm2)
                .await
                .expect("future to complete");
            let mut swarm1 = handle.await.expect("future to complete");
            // Expecting update from direct connection.
            expect_record_update(&mut swarm2, swarm1_peer_id).await;
            assert!(swarm2
                .behaviour()
                .peer_store
                .address_of_peer(&swarm1_peer_id)
                .expect("swarm should be connected and record about it should be created")
                .any(|addr| *addr == listen_addr));
            expect_record_update(&mut swarm1, *swarm2.local_peer_id()).await;
            swarm1.next_swarm_event().await; // skip `identify::Event::Sent`
            swarm1.next_swarm_event().await; // skip `identify::Event::Received`
            let (new_listen_addr, _) = swarm1.listen().with_memory_addr_external().await;
            // Push the updated listen addresses of swarm1 to swarm2 via Identify.
            swarm1.behaviour_mut().identify.push([swarm2_peer_id]);
            tokio::spawn(swarm1.loop_on_next());
            // Expecting 3 updates from Identify:
            // 2 pair of mem and tcp address for two calls to `<Swarm as SwarmExt>::listen()`
            // with one address already present through direct connection.
            // FLAKY: tcp addresses are not explicitly marked as external addresses.
            expect_record_update(&mut swarm2, swarm1_peer_id).await;
            expect_record_update(&mut swarm2, swarm1_peer_id).await;
            expect_record_update(&mut swarm2, swarm1_peer_id).await;
            // The address in store won't contain peer ID because it is from Identify.
            assert!(swarm2
                .behaviour()
                .peer_store
                .address_of_peer(&swarm1_peer_id)
                .expect("peer to exist")
                .any(|addr| *addr == new_listen_addr));
        })
    }
489
490    fn spawn_wait_conn_established<T>(mut swarm: Swarm<T>) -> tokio::task::JoinHandle<Swarm<T>>
491    where
492        T: NetworkBehaviour + Send + Sync,
493        Swarm<T>: SwarmExt,
494    {
495        tokio::spawn(async move {
496            swarm
497                .wait(|ev| match ev {
498                    SwarmEvent::ConnectionEstablished { .. } => Some(()),
499                    _ => None,
500                })
501                .await;
502            swarm
503        })
504    }
505}