libp2p_peer_store/
memory_store.rs

1//! An in-memory [`Store`] implementation.
2//!
3//! ## Usage
4//! ```
5//! use libp2p_peer_store::{memory_store::MemoryStore, Behaviour};
6//!
7//! let store: MemoryStore<()> = MemoryStore::new(Default::default());
8//! let behaviour = Behaviour::new(store);
9//! ```
10
11use std::{
12    collections::{HashMap, VecDeque},
13    num::NonZeroUsize,
14    task::{Poll, Waker},
15};
16
17use libp2p_core::{Multiaddr, PeerId};
18use libp2p_swarm::{behaviour::ConnectionEstablished, DialError, FromSwarm};
19use lru::LruCache;
20
21use super::Store;
22
/// Event emitted from the [`MemoryStore`] to the [`Swarm`](libp2p_swarm::Swarm).
///
/// An event is queued each time an address is actually added to or removed from a peer's
/// record, and queued events are handed out one at a time, oldest first, by [`Store::poll`].
#[derive(Debug, Clone)]
pub enum Event {
    /// A new peer address has been added to the store.
    ///
    /// Only emitted when the address was not already present in the peer's record.
    PeerAddressAdded {
        /// ID of the peer for which the address was added.
        peer_id: PeerId,
        /// The added address.
        address: Multiaddr,
        /// Whether the address will be kept in the store after a dial-failure.
        ///
        /// Set to `true` when an address was added explicitly through
        /// [`MemoryStore::add_address`], `false` if the address was discovered through the
        /// swarm or other behaviours.
        ///
        /// Only relevant when [`Config::is_remove_addr_on_dial_error`] is `true`.
        is_permanent: bool,
    },
    /// A peer address has been removed from the store.
    ///
    /// Emitted for explicit removals and for removals triggered by dial failures.
    /// NOTE(review): no event appears to be emitted when an address is evicted because the
    /// per-peer capacity was reached — confirm whether that is intended.
    PeerAddressRemoved {
        /// ID of the peer for which the address was removed.
        peer_id: PeerId,
        /// The removed address.
        address: Multiaddr,
    },
}
49
50/// A in-memory store that uses LRU cache for bounded storage of addresses
51/// and a frequency-based ordering of addresses.
52#[derive(Default)]
53pub struct MemoryStore<T = ()> {
54    /// The internal store.
55    records: HashMap<PeerId, PeerRecord<T>>,
56    /// Events to emit to [`Behaviour`](crate::Behaviour) and [`Swarm`](libp2p_swarm::Swarm).
57    pending_events: VecDeque<Event>,
58    /// Config of the store.
59    config: Config,
60    /// Waker for store events.
61    waker: Option<Waker>,
62}
63
64impl<T> MemoryStore<T> {
65    /// Create a new [`MemoryStore`] with the given config.
66    pub fn new(config: Config) -> Self {
67        Self {
68            config,
69            records: HashMap::new(),
70            pending_events: VecDeque::default(),
71            waker: None,
72        }
73    }
74
75    /// Add an address for a peer.
76    ///
77    /// The added address will NOT be removed from the store on dial failure. If the added address
78    /// is supposed to be cleared from the store on dial failure, add it by emitting
79    /// [`FromSwarm::NewExternalAddrOfPeer`] to the swarm, e.g. via
80    /// [`Swarm::add_peer_address`](libp2p_swarm::Swarm::add_peer_address).
81    ///
82    /// Returns `true` if the address is new.
83    pub fn add_address(&mut self, peer: &PeerId, address: &Multiaddr) -> bool {
84        self.add_address_inner(peer, address, true)
85    }
86
87    /// Update an address record and notify the swarm.
88    ///
89    /// Returns `true` if the address is new.
90    fn add_address_inner(
91        &mut self,
92        peer: &PeerId,
93        address: &Multiaddr,
94        is_permanent: bool,
95    ) -> bool {
96        let record = self
97            .records
98            .entry(*peer)
99            .or_insert(PeerRecord::new(self.config.record_capacity));
100        let is_new = record.add_address(address, is_permanent);
101        if is_new {
102            self.push_event_and_wake(Event::PeerAddressAdded {
103                peer_id: *peer,
104                address: address.clone(),
105                is_permanent,
106            });
107        }
108        is_new
109    }
110
111    /// Remove an address record.
112    ///
113    /// Returns `true` when the address existed.
114    pub fn remove_address(&mut self, peer: &PeerId, address: &Multiaddr) -> bool {
115        self.remove_address_inner(peer, address, true)
116    }
117
118    /// Remove an address record and notify the swarm.
119    ///
120    /// Returns `true` when the address is removed, `false` if the address didn't exist
121    /// or the address is permanent and `force` false.
122    fn remove_address_inner(&mut self, peer: &PeerId, address: &Multiaddr, force: bool) -> bool {
123        if let Some(record) = self.records.get_mut(peer) {
124            if record.remove_address(address, force) {
125                if record.addresses.is_empty() && record.custom_data.is_none() {
126                    self.records.remove(peer);
127                }
128                self.push_event_and_wake(Event::PeerAddressRemoved {
129                    peer_id: *peer,
130                    address: address.clone(),
131                });
132                return true;
133            }
134        }
135        false
136    }
137
138    /// Get a reference to a peer's custom data.
139    pub fn get_custom_data(&self, peer: &PeerId) -> Option<&T> {
140        self.records.get(peer).and_then(|r| r.get_custom_data())
141    }
142
143    /// Take ownership of the internal data, leaving `None` in its place.
144    pub fn take_custom_data(&mut self, peer: &PeerId) -> Option<T> {
145        if let Some(record) = self.records.get_mut(peer) {
146            let data = record.take_custom_data();
147            if record.addresses.is_empty() {
148                self.records.remove(peer);
149            }
150            return data;
151        }
152        None
153    }
154
155    /// Insert the data, dropping the old data if it exists.
156    pub fn insert_custom_data(&mut self, peer: &PeerId, custom_data: T) {
157        if let Some(r) = self.records.get_mut(peer) {
158            return r.insert_custom_data(custom_data);
159        }
160        let mut new_record = PeerRecord::new(self.config.record_capacity);
161        new_record.insert_custom_data(custom_data);
162        self.records.insert(*peer, new_record);
163    }
164
165    /// Get a mutable reference to a peer's custom data.
166    pub fn get_custom_data_mut(&mut self, peer: &PeerId) -> Option<&mut T> {
167        self.records
168            .get_mut(peer)
169            .and_then(|r| r.get_custom_data_mut())
170    }
171
172    /// Iterate over all internal records.
173    pub fn record_iter(&self) -> impl Iterator<Item = (&PeerId, &PeerRecord<T>)> {
174        self.records.iter()
175    }
176
177    /// Iterate over all internal records mutably.
178    ///
179    /// Changes to the records will not generate an event.
180    pub fn record_iter_mut(&mut self) -> impl Iterator<Item = (&PeerId, &mut PeerRecord<T>)> {
181        self.records.iter_mut()
182    }
183
184    fn push_event_and_wake(&mut self, event: Event) {
185        self.pending_events.push_back(event);
186        if let Some(waker) = self.waker.take() {
187            waker.wake(); // wake up because of update
188        }
189    }
190}
191
192impl<T> Store for MemoryStore<T> {
193    type Event = Event;
194
195    fn on_swarm_event(&mut self, swarm_event: &FromSwarm) {
196        match swarm_event {
197            FromSwarm::NewExternalAddrOfPeer(info) => {
198                self.add_address_inner(&info.peer_id, info.addr, false);
199            }
200            FromSwarm::ConnectionEstablished(ConnectionEstablished {
201                peer_id,
202                failed_addresses,
203                endpoint,
204                ..
205            }) if endpoint.is_dialer() => {
206                if self.config.remove_addr_on_dial_error {
207                    for failed_addr in *failed_addresses {
208                        self.remove_address_inner(peer_id, failed_addr, false);
209                    }
210                }
211                self.add_address_inner(peer_id, endpoint.get_remote_address(), false);
212            }
213            FromSwarm::DialFailure(info) => {
214                if !self.config.remove_addr_on_dial_error {
215                    return;
216                }
217
218                let Some(peer) = info.peer_id else {
219                    // We don't know which peer we are talking about here.
220                    return;
221                };
222
223                match info.error {
224                    DialError::WrongPeerId { obtained, address } => {
225                        // The stored peer id is incorrect, remove incorrect and add correct one.
226                        if self.remove_address_inner(&peer, address, false) {
227                            self.add_address_inner(obtained, address, false);
228                        }
229                    }
230                    DialError::Transport(errors) => {
231                        for (addr, _) in errors {
232                            self.remove_address_inner(&peer, addr, false);
233                        }
234                    }
235                    _ => {}
236                }
237            }
238            _ => {}
239        }
240    }
241
242    fn addresses_of_peer(&self, peer: &PeerId) -> Option<impl Iterator<Item = &Multiaddr>> {
243        self.records.get(peer).map(|record| record.addresses())
244    }
245
246    fn poll(&mut self, cx: &mut std::task::Context<'_>) -> Poll<Self::Event> {
247        match self.pending_events.pop_front() {
248            Some(ev) => Poll::Ready(ev),
249            None => {
250                self.waker = Some(cx.waker().clone());
251                Poll::Pending
252            }
253        }
254    }
255}
256
/// Config for [`MemoryStore`]. The available options are documented via their setters.
#[derive(Debug, Clone)]
pub struct Config {
    record_capacity: NonZeroUsize,
    remove_addr_on_dial_error: bool,
}

impl Default for Config {
    fn default() -> Self {
        Self {
            record_capacity: NonZeroUsize::new(8).expect("8 > 0"),
            remove_addr_on_dial_error: true,
        }
    }
}

impl Config {
    /// The configured per-peer address capacity, see [`Config::set_record_capacity`].
    pub fn record_capacity(&self) -> &NonZeroUsize {
        &self.record_capacity
    }

    /// The capacity of an address store, i.e. the maximum number of addresses kept per peer.
    ///
    /// The least recently used address will be discarded to make room for a new address.
    ///
    /// `8` by default.
    pub fn set_record_capacity(self, capacity: NonZeroUsize) -> Self {
        Self {
            record_capacity: capacity,
            ..self
        }
    }

    /// Whether addresses are removed on dial failure, see
    /// [`Config::set_remove_addr_on_dial_error`].
    pub fn is_remove_addr_on_dial_error(&self) -> bool {
        self.remove_addr_on_dial_error
    }

    /// If set to `true`, the store will remove addresses if the swarm indicates a dial failure.
    /// More specifically:
    /// - Failed dials indicated in [`ConnectionEstablished`]'s `failed_addresses` will be removed.
    /// - On [`DialError::WrongPeerId`], the address will be removed from the incorrect peer's
    ///   record and re-added to the correct peer's record.
    /// - On [`DialError::Transport`], all failed addresses will be removed.
    ///
    /// Addresses added explicitly through [`MemoryStore::add_address`] are never removed by
    /// this logic. All other dial errors, including [`DialError::LocalPeerId`], currently leave
    /// the store untouched.
    ///
    /// If set to `false`, the logic above is not applied and the store only removes addresses
    /// through calls to [`MemoryStore::remove_address`].
    ///
    /// `true` by default.
    pub fn set_remove_addr_on_dial_error(self, value: bool) -> Self {
        Self {
            remove_addr_on_dial_error: value,
            ..self
        }
    }
}
306
307/// Internal record of [`MemoryStore`].
308#[derive(Debug, Clone)]
309pub struct PeerRecord<T> {
310    /// A LRU(Least Recently Used) cache for addresses.
311    /// Will delete the least-recently-used record when full.
312    /// If the associated `bool` is true, the address can only be force-removed.
313    addresses: LruCache<Multiaddr, bool>,
314    /// Custom data attached to the peer.
315    custom_data: Option<T>,
316}
317impl<T> PeerRecord<T> {
318    pub(crate) fn new(cap: NonZeroUsize) -> Self {
319        Self {
320            addresses: LruCache::new(cap),
321            custom_data: None,
322        }
323    }
324
325    /// Iterate over all addresses. More recently-used address comes first.
326    /// Does not change the order.
327    pub fn addresses(&self) -> impl Iterator<Item = &Multiaddr> {
328        self.addresses.iter().map(|(addr, _)| addr)
329    }
330
331    /// Update the address in the LRU cache, promote it to the front if it exists,
332    /// insert it to the front if not.
333    ///
334    /// Returns true when the address is new.
335    pub fn add_address(&mut self, address: &Multiaddr, is_permanent: bool) -> bool {
336        if let Some(was_permanent) = self.addresses.get(address) {
337            if !*was_permanent && is_permanent {
338                self.addresses
339                    .get_or_insert(address.clone(), || is_permanent);
340            }
341            return false;
342        }
343        self.addresses
344            .get_or_insert(address.clone(), || is_permanent);
345        true
346    }
347
348    /// Remove the address in the LRU cache regardless of its position.
349    ///
350    /// Returns true when the address is removed, false when it didn't exist
351    /// or it is permanent and `force` is false.
352    pub fn remove_address(&mut self, address: &Multiaddr, force: bool) -> bool {
353        if !force && self.addresses.peek(address) == Some(&true) {
354            return false;
355        }
356        self.addresses.pop(address).is_some()
357    }
358
359    pub fn get_custom_data(&self) -> Option<&T> {
360        self.custom_data.as_ref()
361    }
362
363    pub fn get_custom_data_mut(&mut self) -> Option<&mut T> {
364        self.custom_data.as_mut()
365    }
366
367    pub fn take_custom_data(&mut self) -> Option<T> {
368        self.custom_data.take()
369    }
370
371    pub fn insert_custom_data(&mut self, custom_data: T) {
372        let _ = self.custom_data.insert(custom_data);
373    }
374
375    pub fn is_empty(&self) -> bool {
376        self.addresses.is_empty() && self.custom_data.is_none()
377    }
378}
379
380#[cfg(test)]
381mod test {
382    use std::{num::NonZero, str::FromStr};
383
384    use libp2p::identify;
385    use libp2p_core::{multiaddr::Protocol, Multiaddr, PeerId};
386    use libp2p_swarm::{NetworkBehaviour, Swarm, SwarmEvent};
387    use libp2p_swarm_test::SwarmExt;
388
389    use super::{Event, MemoryStore};
390    use crate::Store;
391
392    #[test]
393    fn recent_use_bubble_up() {
394        let mut store: MemoryStore = MemoryStore::new(Default::default());
395        let peer = PeerId::random();
396        let addr1 = Multiaddr::from_str("/ip4/127.0.0.1").expect("parsing to succeed");
397        let addr2 = Multiaddr::from_str("/ip4/127.0.0.2").expect("parsing to succeed");
398        let addr3 = Multiaddr::from_str("/ip4/127.0.0.3").expect("parsing to succeed");
399        store.add_address(&peer, &addr1);
400        store.add_address(&peer, &addr2);
401        store.add_address(&peer, &addr3);
402        assert!(
403            store
404                .records
405                .get(&peer)
406                .expect("peer to be in the store")
407                .addresses()
408                .collect::<Vec<_>>()
409                == vec![&addr3, &addr2, &addr1]
410        );
411        store.add_address(&peer, &addr1);
412        assert!(
413            store
414                .records
415                .get(&peer)
416                .expect("peer to be in the store")
417                .addresses()
418                .collect::<Vec<_>>()
419                == vec![&addr1, &addr3, &addr2]
420        );
421        store.add_address(&peer, &addr3);
422        assert!(
423            store
424                .records
425                .get(&peer)
426                .expect("peer to be in the store")
427                .addresses()
428                .collect::<Vec<_>>()
429                == vec![&addr3, &addr1, &addr2]
430        );
431    }
432
433    #[test]
434    fn bounded_store() {
435        let mut store: MemoryStore = MemoryStore::new(Default::default());
436        let peer = PeerId::random();
437        for i in 1..10 {
438            let addr_string = format!("/ip4/127.0.0.{i}");
439            store.add_address(
440                &peer,
441                &Multiaddr::from_str(&addr_string).expect("parsing to succeed"),
442            );
443        }
444        let first_record = Multiaddr::from_str("/ip4/127.0.0.1").expect("parsing to succeed");
445        assert!(!store
446            .addresses_of_peer(&peer)
447            .expect("peer to be in the store")
448            .any(|addr| *addr == first_record));
449        let second_record = Multiaddr::from_str("/ip4/127.0.0.2").expect("parsing to succeed");
450        assert!(
451            *store
452                .addresses_of_peer(&peer)
453                .expect("peer to be in the store")
454                .last()
455                .expect("addr to exist")
456                == second_record
457        );
458    }
459
    /// Checks that the dialing side records the remote address of every outbound connection
    /// it establishes, newest first, while the listening side records nothing.
    #[test]
    fn update_address_on_connect() {
        /// Waits for the next behaviour event and asserts that it is a non-permanent
        /// `PeerAddressAdded` for `expected_peer` (and for `expected_address`, if given).
        async fn expect_record_update(
            swarm: &mut Swarm<crate::Behaviour<MemoryStore>>,
            expected_peer: PeerId,
            expected_address: Option<&Multiaddr>,
        ) {
            match swarm.next_behaviour_event().await {
                Event::PeerAddressAdded {
                    peer_id,
                    address,
                    is_permanent,
                } => {
                    assert_eq!(peer_id, expected_peer);
                    assert!(expected_address.is_none_or(|a| *a == address));
                    assert!(!is_permanent)
                }
                ev => panic!("Unexpected event {ev:?}."),
            }
        }

        // Both stores are limited to two addresses per peer.
        let store1: MemoryStore<()> = MemoryStore::new(
            crate::memory_store::Config::default()
                .set_record_capacity(NonZero::new(2).expect("2 > 0")),
        );
        let mut swarm1 = Swarm::new_ephemeral_tokio(|_| crate::Behaviour::new(store1));
        let store2: MemoryStore<()> = MemoryStore::new(
            crate::memory_store::Config::default()
                .set_record_capacity(NonZero::new(2).expect("2 > 0")),
        );
        let mut swarm2 = Swarm::new_ephemeral_tokio(|_| crate::Behaviour::new(store2));

        let rt = tokio::runtime::Runtime::new().unwrap();

        rt.block_on(async {
            // swarm1 listens, swarm2 dials it.
            let (mut listen_addr, _) = swarm1.listen().with_memory_addr_external().await;
            let swarm1_peer_id = *swarm1.local_peer_id();
            let swarm2_peer_id = *swarm2.local_peer_id();
            swarm2.connect(&mut swarm1).await;

            // The recorded address is expected to carry the `/p2p/<peer id>` suffix.
            listen_addr.push(Protocol::P2p(swarm1_peer_id));
            expect_record_update(&mut swarm2, swarm1_peer_id, Some(&listen_addr)).await;
            assert!(swarm2
                .behaviour()
                .address_of_peer(&swarm1_peer_id)
                .expect("swarm should be connected and record about it should be created")
                .any(|addr| *addr == listen_addr));
            // Address from connection is not stored on the listener side.
            assert!(swarm1
                .behaviour()
                .address_of_peer(&swarm2_peer_id)
                .is_none());
            // Open a second listener on swarm1 and dial exactly that address from swarm2.
            let (mut new_listen_addr, _) = swarm1.listen().with_memory_addr_external().await;
            // swarm1 is driven in the background from here on.
            tokio::spawn(swarm1.loop_on_next());
            swarm2
                .dial(
                    libp2p_swarm::dial_opts::DialOpts::peer_id(swarm1_peer_id)
                        .condition(libp2p_swarm::dial_opts::PeerCondition::Always)
                        .addresses(vec![new_listen_addr.clone()])
                        .build(),
                )
                .expect("dial to succeed");
            swarm2
                .wait(|ev| match ev {
                    SwarmEvent::ConnectionEstablished { .. } => Some(()),
                    _ => None,
                })
                .await;
            new_listen_addr.push(Protocol::P2p(swarm1_peer_id));
            expect_record_update(&mut swarm2, swarm1_peer_id, Some(&new_listen_addr)).await;
            // The address in store will contain peer ID.
            let new_listen_addr = new_listen_addr
                .with_p2p(swarm1_peer_id)
                .expect("extend to succeed");
            // Most recently used address comes first.
            assert_eq!(
                swarm2
                    .behaviour()
                    .address_of_peer(&swarm1_peer_id)
                    .expect("peer to exist")
                    .collect::<Vec<_>>(),
                vec![&new_listen_addr, &listen_addr]
            );
        })
    }
544
    /// Checks that addresses arriving while an identify behaviour runs next to the peer store
    /// end up in the store, in addition to the address of the direct connection.
    #[test]
    fn identify_external_addr_report() {
        /// Peer store combined with identify.
        #[derive(NetworkBehaviour)]
        struct Behaviour {
            peer_store: crate::Behaviour<MemoryStore>,
            identify: identify::Behaviour,
        }
        /// Skips non-peer-store events until the next peer store event arrives and asserts
        /// that it is a non-permanent `PeerAddressAdded` for `expected_peer` (and for
        /// `expected_address`, if given). Any other peer store event fails the test.
        async fn expect_record_update(
            swarm: &mut Swarm<Behaviour>,
            expected_peer: PeerId,
            expected_address: Option<&Multiaddr>,
        ) {
            loop {
                match swarm.next_behaviour_event().await {
                    BehaviourEvent::PeerStore(Event::PeerAddressAdded {
                        peer_id,
                        address,
                        is_permanent,
                    }) => {
                        assert_eq!(peer_id, expected_peer);
                        assert!(expected_address.is_none_or(|a| *a == address));
                        assert!(!is_permanent);
                        break;
                    }
                    ev @ BehaviourEvent::PeerStore(_) => panic!("Unexpected event {ev:?}."),
                    _ => {}
                }
            }
        }
        /// Builds a swarm whose peer store keeps at most four addresses per peer.
        fn build_swarm() -> Swarm<Behaviour> {
            Swarm::new_ephemeral_tokio(|kp| Behaviour {
                peer_store: crate::Behaviour::new(MemoryStore::new(
                    crate::memory_store::Config::default()
                        .set_record_capacity(NonZero::new(4).expect("4 > 0")),
                )),
                identify: identify::Behaviour::new(identify::Config::new(
                    "/TODO/0.0.1".to_string(),
                    kp.public(),
                )),
            })
        }
        let mut swarm1 = build_swarm();
        let mut swarm2 = build_swarm();
        let rt = tokio::runtime::Runtime::new().unwrap();

        rt.block_on(async {
            // swarm1 listens, swarm2 dials it.
            let (mut listen_addr, _) = swarm1.listen().with_memory_addr_external().await;
            let swarm1_peer_id = *swarm1.local_peer_id();
            let swarm2_peer_id = *swarm2.local_peer_id();
            swarm2.connect(&mut swarm1).await;

            // The address recorded from the connection carries the `/p2p/<peer id>` suffix.
            listen_addr.push(Protocol::P2p(swarm1_peer_id));
            expect_record_update(&mut swarm2, swarm1_peer_id, Some(&listen_addr)).await;

            // At this point only the directly dialed address is expected to be known.
            assert_eq!(
                swarm2
                    .behaviour()
                    .peer_store
                    .address_of_peer(&swarm1_peer_id)
                    .expect("swarm should be connected and record about it should be created")
                    .collect::<Vec<_>>(),
                vec![&listen_addr]
            );

            // Let the initial identify exchange finish on swarm1 before continuing.
            assert!(matches!(
                swarm1.next_behaviour_event().await,
                BehaviourEvent::Identify(identify::Event::Sent { .. })
            ));
            assert!(matches!(
                swarm1.next_behaviour_event().await,
                BehaviourEvent::Identify(identify::Event::Received { .. })
            ));
            // Add a second listener on swarm1 and push an identify update to swarm2.
            let (new_listen_addr, _) = swarm1.listen().with_memory_addr_external().await;
            swarm1.behaviour_mut().identify.push([swarm2_peer_id]);
            // swarm1 is driven in the background from here on.
            tokio::spawn(swarm1.loop_on_next());
            // Expecting 3 updates from Identify:
            // 2 pairs of mem and tcp addresses for the two calls to `<Swarm as SwarmExt>::listen()`,
            // with one address already present through the direct connection.
            // FLAKY: tcp addresses are not explicitly marked as external addresses.
            expect_record_update(&mut swarm2, swarm1_peer_id, None).await;
            expect_record_update(&mut swarm2, swarm1_peer_id, None).await;
            expect_record_update(&mut swarm2, swarm1_peer_id, None).await;
            // The address in store won't contain peer ID because it is from Identify.
            let known_listen_addresses = swarm2
                .behaviour()
                .peer_store
                .address_of_peer(&swarm1_peer_id)
                .expect("peer to exist")
                .collect::<Vec<_>>();
            assert!(known_listen_addresses.contains(&&new_listen_addr));
            // One address from the connection plus three from Identify fill the capacity of 4.
            assert_eq!(known_listen_addresses.len(), 4)
        })
    }
638}