libp2p_peer_store/
memory_store.rs

1//! An in-memory [`Store`] implementation.
2//!
3//! ## Usage
4//! ```
5//! use libp2p_peer_store::{memory_store::MemoryStore, Behaviour};
6//!
7//! let store: MemoryStore<()> = MemoryStore::new(Default::default());
8//! let behaviour = Behaviour::new(store);
9//! ```
10
11use std::{
12    collections::VecDeque,
13    num::NonZeroUsize,
14    task::{Poll, Waker},
15};
16
17use hashlink::LruCache;
18use libp2p_core::{Multiaddr, PeerId};
19use libp2p_swarm::{behaviour::ConnectionEstablished, DialError, FromSwarm};
20
21use super::Store;
22
23/// Event emitted from the [`MemoryStore`] to the [`Swarm`](libp2p_swarm::Swarm).
24#[derive(Debug, Clone)]
25pub enum Event {
26    /// A new peer address has been added to the store.
27    PeerAddressAdded {
28        /// ID of the peer for which the address was added.
29        peer_id: PeerId,
30        /// The added address.
31        address: Multiaddr,
32        /// Whether the address will be kept in the store after a dial-failure.
33        ///
34        /// Set to `true` when an address was added explicitly through
35        /// [`MemoryStore::add_address`], `false` if the address was discovered through the
36        /// swarm or other behaviors.
37        ///
38        /// Only relevant when [`Config::is_remove_addr_on_dial_error`] is `true`.
39        is_permanent: bool,
40    },
41    /// A peer address has been removed from the store.
42    PeerAddressRemoved {
43        /// ID of the peer for which the address was removed.
44        peer_id: PeerId,
45        /// The removed address.
46        address: Multiaddr,
47    },
48}
49
50/// A in-memory store that uses LRU cache for bounded storage of addresses
51/// and a frequency-based ordering of addresses.
52pub struct MemoryStore<T = ()> {
53    /// The internal store.
54    records: LruCache<PeerId, PeerRecord<T>>,
55    /// Events to emit to [`Behaviour`](crate::Behaviour) and [`Swarm`](libp2p_swarm::Swarm).
56    pending_events: VecDeque<Event>,
57    /// Config of the store.
58    config: Config,
59    /// Waker for store events.
60    waker: Option<Waker>,
61}
62
63impl<T> MemoryStore<T> {
64    /// Create a new [`MemoryStore`] with the given config.
65    pub fn new(config: Config) -> Self {
66        Self {
67            records: LruCache::new(config.peer_capacity().get()),
68            config,
69            pending_events: VecDeque::default(),
70            waker: None,
71        }
72    }
73
74    /// Add an address for a peer.
75    ///
76    /// The added address will NOT be removed from the store on dial failure. If the added address
77    /// is supposed to be cleared from the store on dial failure, add it by emitting
78    /// [`FromSwarm::NewExternalAddrOfPeer`] to the swarm, e.g. via
79    /// [`Swarm::add_peer_address`](libp2p_swarm::Swarm::add_peer_address).
80    ///
81    /// Returns `true` if the address is new.
82    pub fn add_address(&mut self, peer: &PeerId, address: &Multiaddr) -> bool {
83        self.add_address_inner(peer, address, true)
84    }
85
86    /// Update an address record and notify the swarm.
87    ///
88    /// Returns `true` if the address is new.
89    fn add_address_inner(
90        &mut self,
91        peer: &PeerId,
92        address: &Multiaddr,
93        is_permanent: bool,
94    ) -> bool {
95        let record = self
96            .records
97            .entry(*peer)
98            .or_insert_with(|| PeerRecord::new(self.config.record_capacity));
99        let is_new = record.add_address(address, is_permanent);
100        if is_new {
101            self.push_event_and_wake(Event::PeerAddressAdded {
102                peer_id: *peer,
103                address: address.clone(),
104                is_permanent,
105            });
106        }
107        is_new
108    }
109
110    /// Remove an address record.
111    ///
112    /// Returns `true` when the address existed.
113    pub fn remove_address(&mut self, peer: &PeerId, address: &Multiaddr) -> bool {
114        self.remove_address_inner(peer, address, true)
115    }
116
117    /// Remove an address record and notify the swarm.
118    ///
119    /// Returns `true` when the address is removed, `false` if the address didn't exist
120    /// or the address is permanent and `force` false.
121    fn remove_address_inner(&mut self, peer: &PeerId, address: &Multiaddr, force: bool) -> bool {
122        if let Some(record) = self.records.get_mut(peer) {
123            if record.remove_address(address, force) {
124                if record.addresses.is_empty() && record.custom_data.is_none() {
125                    self.records.remove(peer);
126                }
127                self.push_event_and_wake(Event::PeerAddressRemoved {
128                    peer_id: *peer,
129                    address: address.clone(),
130                });
131                return true;
132            }
133        }
134        false
135    }
136
137    /// Get a reference to a peer's custom data.
138    pub fn get_custom_data(&self, peer: &PeerId) -> Option<&T> {
139        self.records.peek(peer).and_then(|r| r.get_custom_data())
140    }
141
142    /// Take ownership of the internal data, leaving `None` in its place.
143    pub fn take_custom_data(&mut self, peer: &PeerId) -> Option<T> {
144        if let Some(record) = self.records.get_mut(peer) {
145            let data = record.take_custom_data();
146            if record.addresses.is_empty() {
147                self.records.remove(peer);
148            }
149            return data;
150        }
151        None
152    }
153
154    /// Insert the data, dropping the old data if it exists.
155    pub fn insert_custom_data(&mut self, peer: &PeerId, custom_data: T) {
156        if let Some(r) = self.records.get_mut(peer) {
157            return r.insert_custom_data(custom_data);
158        }
159        let mut new_record = PeerRecord::new(self.config.record_capacity);
160        new_record.insert_custom_data(custom_data);
161        self.records.insert(*peer, new_record);
162    }
163
164    /// Get a mutable reference to a peer's custom data.
165    pub fn get_custom_data_mut(&mut self, peer: &PeerId) -> Option<&mut T> {
166        self.records
167            .get_mut(peer)
168            .and_then(|r| r.get_custom_data_mut())
169    }
170
171    /// Iterate over all internal records.
172    pub fn record_iter(&self) -> impl Iterator<Item = (&PeerId, &PeerRecord<T>)> {
173        self.records.iter()
174    }
175
176    /// Iterate over all internal records mutably.
177    ///
178    /// Changes to the records will not generate an event.
179    pub fn record_iter_mut(&mut self) -> impl Iterator<Item = (&PeerId, &mut PeerRecord<T>)> {
180        self.records.iter_mut()
181    }
182
183    fn push_event_and_wake(&mut self, event: Event) {
184        self.pending_events.push_back(event);
185        if let Some(waker) = self.waker.take() {
186            waker.wake(); // wake up because of update
187        }
188    }
189}
190
191impl<T> Store for MemoryStore<T> {
192    type Event = Event;
193
194    fn on_swarm_event(&mut self, swarm_event: &FromSwarm) {
195        match swarm_event {
196            FromSwarm::NewExternalAddrOfPeer(info) => {
197                self.add_address_inner(&info.peer_id, info.addr, false);
198            }
199            FromSwarm::ConnectionEstablished(ConnectionEstablished {
200                peer_id,
201                failed_addresses,
202                endpoint,
203                ..
204            }) if endpoint.is_dialer() => {
205                if self.config.remove_addr_on_dial_error {
206                    for failed_addr in *failed_addresses {
207                        self.remove_address_inner(peer_id, failed_addr, false);
208                    }
209                }
210                self.add_address_inner(peer_id, endpoint.get_remote_address(), false);
211            }
212            FromSwarm::DialFailure(info) => {
213                if !self.config.remove_addr_on_dial_error {
214                    return;
215                }
216
217                let Some(peer) = info.peer_id else {
218                    // We don't know which peer we are talking about here.
219                    return;
220                };
221
222                match info.error {
223                    DialError::WrongPeerId { obtained, address } => {
224                        // The stored peer id is incorrect, remove incorrect and add correct one.
225                        if self.remove_address_inner(&peer, address, false) {
226                            self.add_address_inner(obtained, address, false);
227                        }
228                    }
229                    DialError::Transport(errors) => {
230                        for (addr, _) in errors {
231                            self.remove_address_inner(&peer, addr, false);
232                        }
233                    }
234                    _ => {}
235                }
236            }
237            _ => {}
238        }
239    }
240
241    fn addresses_of_peer(&self, peer: &PeerId) -> Option<impl Iterator<Item = &Multiaddr>> {
242        self.records.peek(peer).map(|record| record.addresses())
243    }
244
245    fn poll(&mut self, cx: &mut std::task::Context<'_>) -> Poll<Self::Event> {
246        match self.pending_events.pop_front() {
247            Some(ev) => Poll::Ready(ev),
248            None => {
249                self.waker = Some(cx.waker().clone());
250                Poll::Pending
251            }
252        }
253    }
254}
255
256/// Config for [`MemoryStore`]. The available options are documented via their setters.
257#[derive(Debug, Clone)]
258pub struct Config {
259    peer_capacity: NonZeroUsize,
260    record_capacity: NonZeroUsize,
261    remove_addr_on_dial_error: bool,
262}
263
264impl Default for Config {
265    fn default() -> Self {
266        Self {
267            peer_capacity: NonZeroUsize::try_from(1000).expect("1000 > 0"),
268            record_capacity: NonZeroUsize::try_from(8).expect("8 > 0"),
269            remove_addr_on_dial_error: true,
270        }
271    }
272}
273
274impl Config {
275    pub fn peer_capacity(&self) -> &NonZeroUsize {
276        &self.peer_capacity
277    }
278    /// The capacity of the address store per peer.
279    ///
280    /// The least recently updated peer will be discarded to make room for a new peer.
281    ///
282    /// `1000` by default.
283    pub fn set_peer_capacity(mut self, capacity: NonZeroUsize) -> Self {
284        self.peer_capacity = capacity;
285        self
286    }
287    pub fn record_capacity(&self) -> &NonZeroUsize {
288        &self.record_capacity
289    }
290    /// The capacity of the address store per peer.
291    ///
292    /// The least active address will be discarded to make room for a new address.
293    ///
294    /// `8` by default.
295    pub fn set_record_capacity(mut self, capacity: NonZeroUsize) -> Self {
296        self.record_capacity = capacity;
297        self
298    }
299    pub fn is_remove_addr_on_dial_error(&self) -> bool {
300        self.remove_addr_on_dial_error
301    }
302    /// If set to `true`, the store will remove addresses if the swarm indicates a dial failure.
303    /// More specifically:
304    /// - Failed dials indicated in [`ConnectionEstablished`]'s `failed_addresses` will be removed.
305    /// - [`DialError::LocalPeerId`] causes the full peer entry to be removed.
306    /// - On [`DialError::WrongPeerId`], the address will be removed from the incorrect peer's
307    ///   record and re-added to the correct peer's record.
308    /// - On [`DialError::Transport`], all failed addresses will be removed.
309    ///
310    /// If set to `false`, the logic above is not applied and the store only removes addresses
311    /// through calls to [`MemoryStore::remove_address`].
312    ///
313    /// `true` by default.
314    pub fn set_remove_addr_on_dial_error(mut self, value: bool) -> Self {
315        self.remove_addr_on_dial_error = value;
316        self
317    }
318}
319
320/// Internal record of [`MemoryStore`].
321#[derive(Debug, Clone)]
322pub struct PeerRecord<T> {
323    /// A LRU(Least Recently Used) cache for addresses.
324    /// Will delete the least-recently-used record when full.
325    /// If the associated `bool` is true, the address can only be force-removed.
326    addresses: LruCache<Multiaddr, bool>,
327    /// Custom data attached to the peer.
328    custom_data: Option<T>,
329}
330impl<T> PeerRecord<T> {
331    pub(crate) fn new(cap: NonZeroUsize) -> Self {
332        Self {
333            addresses: LruCache::new(cap.get()),
334            custom_data: None,
335        }
336    }
337
338    /// Iterate over all addresses. More recently-used address comes first.
339    /// Does not change the order.
340    pub fn addresses(&self) -> impl Iterator<Item = &Multiaddr> {
341        self.addresses.iter().rev().map(|(addr, _)| addr)
342    }
343
344    /// Update the address in the LRU cache, promote it to the front if it exists,
345    /// insert it to the front if not.
346    ///
347    /// Returns true when the address is new.
348    pub fn add_address(&mut self, address: &Multiaddr, is_permanent: bool) -> bool {
349        if let Some(was_permanent) = self.addresses.get(address) {
350            if !*was_permanent && is_permanent {
351                self.addresses.insert(address.clone(), is_permanent);
352            }
353            false
354        } else {
355            self.addresses.insert(address.clone(), is_permanent);
356            true
357        }
358    }
359
360    /// Remove the address in the LRU cache regardless of its position.
361    ///
362    /// Returns true when the address is removed, false when it didn't exist
363    /// or it is permanent and `force` is false.
364    pub fn remove_address(&mut self, address: &Multiaddr, force: bool) -> bool {
365        if !force && self.addresses.peek(address) == Some(&true) {
366            return false;
367        }
368        self.addresses.remove(address).is_some()
369    }
370
371    pub fn get_custom_data(&self) -> Option<&T> {
372        self.custom_data.as_ref()
373    }
374
375    pub fn get_custom_data_mut(&mut self) -> Option<&mut T> {
376        self.custom_data.as_mut()
377    }
378
379    pub fn take_custom_data(&mut self) -> Option<T> {
380        self.custom_data.take()
381    }
382
383    pub fn insert_custom_data(&mut self, custom_data: T) {
384        let _ = self.custom_data.insert(custom_data);
385    }
386
387    pub fn is_empty(&self) -> bool {
388        self.addresses.is_empty() && self.custom_data.is_none()
389    }
390}
391
392#[cfg(test)]
393mod test {
394    use std::{num::NonZero, str::FromStr};
395
396    use libp2p::identify;
397    use libp2p_core::{multiaddr::Protocol, Multiaddr, PeerId};
398    use libp2p_swarm::{NetworkBehaviour, Swarm, SwarmEvent};
399    use libp2p_swarm_test::SwarmExt;
400
401    use super::{Event, MemoryStore};
402    use crate::Store;
403
404    #[test]
405    fn recent_use_bubble_up() {
406        let mut store: MemoryStore = MemoryStore::new(Default::default());
407        let peer = PeerId::random();
408        let addr1 = Multiaddr::from_str("/ip4/127.0.0.1").expect("parsing to succeed");
409        let addr2 = Multiaddr::from_str("/ip4/127.0.0.2").expect("parsing to succeed");
410        let addr3 = Multiaddr::from_str("/ip4/127.0.0.3").expect("parsing to succeed");
411        store.add_address(&peer, &addr1);
412        store.add_address(&peer, &addr2);
413        store.add_address(&peer, &addr3);
414        assert_eq!(
415            store
416                .records
417                .get(&peer)
418                .expect("peer to be in the store")
419                .addresses()
420                .collect::<Vec<_>>(),
421            vec![&addr3, &addr2, &addr1]
422        );
423        store.add_address(&peer, &addr1);
424        assert!(
425            store
426                .records
427                .get(&peer)
428                .expect("peer to be in the store")
429                .addresses()
430                .collect::<Vec<_>>()
431                == vec![&addr1, &addr3, &addr2]
432        );
433        store.add_address(&peer, &addr3);
434        assert!(
435            store
436                .records
437                .get(&peer)
438                .expect("peer to be in the store")
439                .addresses()
440                .collect::<Vec<_>>()
441                == vec![&addr3, &addr1, &addr2]
442        );
443    }
444
445    #[test]
446    fn bounded_store() {
447        let mut store: MemoryStore = MemoryStore::new(Default::default());
448        let peer = PeerId::random();
449        for i in 1..10 {
450            let addr_string = format!("/ip4/127.0.0.{i}");
451            store.add_address(
452                &peer,
453                &Multiaddr::from_str(&addr_string).expect("parsing to succeed"),
454            );
455        }
456        let first_record = Multiaddr::from_str("/ip4/127.0.0.1").expect("parsing to succeed");
457        assert!(!store
458            .addresses_of_peer(&peer)
459            .expect("peer to be in the store")
460            .any(|addr| *addr == first_record));
461        let second_record = Multiaddr::from_str("/ip4/127.0.0.2").expect("parsing to succeed");
462        assert!(
463            *store
464                .addresses_of_peer(&peer)
465                .expect("peer to be in the store")
466                .last()
467                .expect("addr to exist")
468                == second_record
469        );
470    }
471
472    #[test]
473    fn update_address_on_connect() {
474        async fn expect_record_update(
475            swarm: &mut Swarm<crate::Behaviour<MemoryStore>>,
476            expected_peer: PeerId,
477            expected_address: Option<&Multiaddr>,
478        ) {
479            match swarm.next_behaviour_event().await {
480                Event::PeerAddressAdded {
481                    peer_id,
482                    address,
483                    is_permanent,
484                } => {
485                    assert_eq!(peer_id, expected_peer);
486                    assert!(expected_address.is_none_or(|a| *a == address));
487                    assert!(!is_permanent)
488                }
489                ev => panic!("Unexpected event {ev:?}."),
490            }
491        }
492
493        let store1: MemoryStore<()> = MemoryStore::new(
494            crate::memory_store::Config::default()
495                .set_record_capacity(NonZero::new(2).expect("2 > 0")),
496        );
497        let mut swarm1 = Swarm::new_ephemeral_tokio(|_| crate::Behaviour::new(store1));
498        let store2: MemoryStore<()> = MemoryStore::new(
499            crate::memory_store::Config::default()
500                .set_record_capacity(NonZero::new(2).expect("2 > 0")),
501        );
502        let mut swarm2 = Swarm::new_ephemeral_tokio(|_| crate::Behaviour::new(store2));
503
504        let rt = tokio::runtime::Runtime::new().unwrap();
505
506        rt.block_on(async {
507            let (mut listen_addr, _) = swarm1.listen().with_memory_addr_external().await;
508            let swarm1_peer_id = *swarm1.local_peer_id();
509            let swarm2_peer_id = *swarm2.local_peer_id();
510            swarm2.connect(&mut swarm1).await;
511
512            listen_addr.push(Protocol::P2p(swarm1_peer_id));
513            expect_record_update(&mut swarm2, swarm1_peer_id, Some(&listen_addr)).await;
514            assert!(swarm2
515                .behaviour()
516                .address_of_peer(&swarm1_peer_id)
517                .expect("swarm should be connected and record about it should be created")
518                .any(|addr| *addr == listen_addr));
519            // Address from connection is not stored on the listener side.
520            assert!(swarm1
521                .behaviour()
522                .address_of_peer(&swarm2_peer_id)
523                .is_none());
524            let (mut new_listen_addr, _) = swarm1.listen().with_memory_addr_external().await;
525            tokio::spawn(swarm1.loop_on_next());
526            swarm2
527                .dial(
528                    libp2p_swarm::dial_opts::DialOpts::peer_id(swarm1_peer_id)
529                        .condition(libp2p_swarm::dial_opts::PeerCondition::Always)
530                        .addresses(vec![new_listen_addr.clone()])
531                        .build(),
532                )
533                .expect("dial to succeed");
534            swarm2
535                .wait(|ev| match ev {
536                    SwarmEvent::ConnectionEstablished { .. } => Some(()),
537                    _ => None,
538                })
539                .await;
540            new_listen_addr.push(Protocol::P2p(swarm1_peer_id));
541            expect_record_update(&mut swarm2, swarm1_peer_id, Some(&new_listen_addr)).await;
542            // The address in store will contain peer ID.
543            let new_listen_addr = new_listen_addr
544                .with_p2p(swarm1_peer_id)
545                .expect("extend to succeed");
546            assert_eq!(
547                swarm2
548                    .behaviour()
549                    .address_of_peer(&swarm1_peer_id)
550                    .expect("peer to exist")
551                    .collect::<Vec<_>>(),
552                vec![&new_listen_addr, &listen_addr]
553            );
554        })
555    }
556
557    #[test]
558    fn identify_external_addr_report() {
559        #[derive(NetworkBehaviour)]
560        struct Behaviour {
561            peer_store: crate::Behaviour<MemoryStore>,
562            identify: identify::Behaviour,
563        }
564        async fn expect_record_update(
565            swarm: &mut Swarm<Behaviour>,
566            expected_peer: PeerId,
567            expected_address: Option<&Multiaddr>,
568        ) {
569            loop {
570                match swarm.next_behaviour_event().await {
571                    BehaviourEvent::PeerStore(Event::PeerAddressAdded {
572                        peer_id,
573                        address,
574                        is_permanent,
575                    }) => {
576                        assert_eq!(peer_id, expected_peer);
577                        assert!(expected_address.is_none_or(|a| *a == address));
578                        assert!(!is_permanent);
579                        break;
580                    }
581                    ev @ BehaviourEvent::PeerStore(_) => panic!("Unexpected event {ev:?}."),
582                    _ => {}
583                }
584            }
585        }
586        fn build_swarm() -> Swarm<Behaviour> {
587            Swarm::new_ephemeral_tokio(|kp| Behaviour {
588                peer_store: crate::Behaviour::new(MemoryStore::new(
589                    crate::memory_store::Config::default()
590                        .set_record_capacity(NonZero::new(4).expect("4 > 0")),
591                )),
592                identify: identify::Behaviour::new(identify::Config::new(
593                    "/TODO/0.0.1".to_string(),
594                    kp.public(),
595                )),
596            })
597        }
598        let mut swarm1 = build_swarm();
599        let mut swarm2 = build_swarm();
600        let rt = tokio::runtime::Runtime::new().unwrap();
601
602        rt.block_on(async {
603            let (mut listen_addr, _) = swarm1.listen().with_memory_addr_external().await;
604            let swarm1_peer_id = *swarm1.local_peer_id();
605            let swarm2_peer_id = *swarm2.local_peer_id();
606            swarm2.connect(&mut swarm1).await;
607
608            listen_addr.push(Protocol::P2p(swarm1_peer_id));
609            expect_record_update(&mut swarm2, swarm1_peer_id, Some(&listen_addr)).await;
610
611            assert_eq!(
612                swarm2
613                    .behaviour()
614                    .peer_store
615                    .address_of_peer(&swarm1_peer_id)
616                    .expect("swarm should be connected and record about it should be created")
617                    .collect::<Vec<_>>(),
618                vec![&listen_addr]
619            );
620
621            assert!(matches!(
622                swarm1.next_behaviour_event().await,
623                BehaviourEvent::Identify(identify::Event::Sent { .. })
624            ));
625            assert!(matches!(
626                swarm1.next_behaviour_event().await,
627                BehaviourEvent::Identify(identify::Event::Received { .. })
628            ));
629            let (new_listen_addr, _) = swarm1.listen().with_memory_addr_external().await;
630            swarm1.behaviour_mut().identify.push([swarm2_peer_id]);
631            tokio::spawn(swarm1.loop_on_next());
632            // Expecting 3 updates from Identify:
633            // 2 pair of mem and tcp address for two calls to `<Swarm as SwarmExt>::listen()`
634            // with one address already present through direct connection.
635            // FLAKY: tcp addresses are not explicitly marked as external addresses.
636            expect_record_update(&mut swarm2, swarm1_peer_id, None).await;
637            expect_record_update(&mut swarm2, swarm1_peer_id, None).await;
638            expect_record_update(&mut swarm2, swarm1_peer_id, None).await;
639            // The address in store won't contain peer ID because it is from Identify.
640            let known_listen_addresses = swarm2
641                .behaviour()
642                .peer_store
643                .address_of_peer(&swarm1_peer_id)
644                .expect("peer to exist")
645                .collect::<Vec<_>>();
646            assert!(known_listen_addresses.contains(&&new_listen_addr));
647            assert_eq!(known_listen_addresses.len(), 4)
648        })
649    }
650}