1use std::{
12 collections::VecDeque,
13 num::NonZeroUsize,
14 task::{Poll, Waker},
15};
16
17use hashlink::LruCache;
18use libp2p_core::{Multiaddr, PeerId};
19use libp2p_swarm::{behaviour::ConnectionEstablished, DialError, FromSwarm};
20
21use super::Store;
22
/// Event queued by [`MemoryStore`] whenever an address is added to or removed
/// from one of its peer records.
#[derive(Debug, Clone)]
pub enum Event {
    /// An address that was not yet present in the peer's record has been stored.
    PeerAddressAdded {
        /// Peer the address was stored for.
        peer_id: PeerId,
        /// The address that was stored.
        address: Multiaddr,
        /// Permanence flag the address was stored with.
        ///
        /// `true` when the address came in through [`MemoryStore::add_address`],
        /// `false` when it was learned from a swarm event. Permanent addresses
        /// are skipped by the non-forced removals performed on dial errors.
        is_permanent: bool,
    },
    /// An address has been removed from the peer's record.
    PeerAddressRemoved {
        /// Peer the address was removed from.
        peer_id: PeerId,
        /// The address that was removed.
        address: Multiaddr,
    },
}
49
/// In-memory peer store backed by LRU caches.
///
/// `T` is the type of the optional custom data that can be attached to each
/// peer; it defaults to `()`.
pub struct MemoryStore<T = ()> {
    /// Per-peer records, bounded by the configured peer capacity.
    records: LruCache<PeerId, PeerRecord<T>>,
    /// Events waiting to be handed out by `poll`, oldest first.
    pending_events: VecDeque<Event>,
    /// Configuration the store was created with.
    config: Config,
    /// Waker of the task that last polled while no event was pending.
    waker: Option<Waker>,
}
62
63impl<T> MemoryStore<T> {
64 pub fn new(config: Config) -> Self {
66 Self {
67 records: LruCache::new(config.peer_capacity().get()),
68 config,
69 pending_events: VecDeque::default(),
70 waker: None,
71 }
72 }
73
74 pub fn add_address(&mut self, peer: &PeerId, address: &Multiaddr) -> bool {
83 self.add_address_inner(peer, address, true)
84 }
85
86 fn add_address_inner(
90 &mut self,
91 peer: &PeerId,
92 address: &Multiaddr,
93 is_permanent: bool,
94 ) -> bool {
95 let record = self
96 .records
97 .entry(*peer)
98 .or_insert_with(|| PeerRecord::new(self.config.record_capacity));
99 let is_new = record.add_address(address, is_permanent);
100 if is_new {
101 self.push_event_and_wake(Event::PeerAddressAdded {
102 peer_id: *peer,
103 address: address.clone(),
104 is_permanent,
105 });
106 }
107 is_new
108 }
109
110 pub fn remove_address(&mut self, peer: &PeerId, address: &Multiaddr) -> bool {
114 self.remove_address_inner(peer, address, true)
115 }
116
117 fn remove_address_inner(&mut self, peer: &PeerId, address: &Multiaddr, force: bool) -> bool {
122 if let Some(record) = self.records.get_mut(peer) {
123 if record.remove_address(address, force) {
124 if record.addresses.is_empty() && record.custom_data.is_none() {
125 self.records.remove(peer);
126 }
127 self.push_event_and_wake(Event::PeerAddressRemoved {
128 peer_id: *peer,
129 address: address.clone(),
130 });
131 return true;
132 }
133 }
134 false
135 }
136
137 pub fn get_custom_data(&self, peer: &PeerId) -> Option<&T> {
139 self.records.peek(peer).and_then(|r| r.get_custom_data())
140 }
141
142 pub fn take_custom_data(&mut self, peer: &PeerId) -> Option<T> {
144 if let Some(record) = self.records.get_mut(peer) {
145 let data = record.take_custom_data();
146 if record.addresses.is_empty() {
147 self.records.remove(peer);
148 }
149 return data;
150 }
151 None
152 }
153
154 pub fn insert_custom_data(&mut self, peer: &PeerId, custom_data: T) {
156 if let Some(r) = self.records.get_mut(peer) {
157 return r.insert_custom_data(custom_data);
158 }
159 let mut new_record = PeerRecord::new(self.config.record_capacity);
160 new_record.insert_custom_data(custom_data);
161 self.records.insert(*peer, new_record);
162 }
163
164 pub fn get_custom_data_mut(&mut self, peer: &PeerId) -> Option<&mut T> {
166 self.records
167 .get_mut(peer)
168 .and_then(|r| r.get_custom_data_mut())
169 }
170
171 pub fn record_iter(&self) -> impl Iterator<Item = (&PeerId, &PeerRecord<T>)> {
173 self.records.iter()
174 }
175
176 pub fn record_iter_mut(&mut self) -> impl Iterator<Item = (&PeerId, &mut PeerRecord<T>)> {
180 self.records.iter_mut()
181 }
182
183 fn push_event_and_wake(&mut self, event: Event) {
184 self.pending_events.push_back(event);
185 if let Some(waker) = self.waker.take() {
186 waker.wake(); }
188 }
189}
190
191impl<T> Store for MemoryStore<T> {
192 type Event = Event;
193
194 fn on_swarm_event(&mut self, swarm_event: &FromSwarm) {
195 match swarm_event {
196 FromSwarm::NewExternalAddrOfPeer(info) => {
197 self.add_address_inner(&info.peer_id, info.addr, false);
198 }
199 FromSwarm::ConnectionEstablished(ConnectionEstablished {
200 peer_id,
201 failed_addresses,
202 endpoint,
203 ..
204 }) if endpoint.is_dialer() => {
205 if self.config.remove_addr_on_dial_error {
206 for failed_addr in *failed_addresses {
207 self.remove_address_inner(peer_id, failed_addr, false);
208 }
209 }
210 self.add_address_inner(peer_id, endpoint.get_remote_address(), false);
211 }
212 FromSwarm::DialFailure(info) => {
213 if !self.config.remove_addr_on_dial_error {
214 return;
215 }
216
217 let Some(peer) = info.peer_id else {
218 return;
220 };
221
222 match info.error {
223 DialError::WrongPeerId { obtained, address } => {
224 if self.remove_address_inner(&peer, address, false) {
226 self.add_address_inner(obtained, address, false);
227 }
228 }
229 DialError::Transport(errors) => {
230 for (addr, _) in errors {
231 self.remove_address_inner(&peer, addr, false);
232 }
233 }
234 _ => {}
235 }
236 }
237 _ => {}
238 }
239 }
240
241 fn addresses_of_peer(&self, peer: &PeerId) -> Option<impl Iterator<Item = &Multiaddr>> {
242 self.records.peek(peer).map(|record| record.addresses())
243 }
244
245 fn poll(&mut self, cx: &mut std::task::Context<'_>) -> Poll<Self::Event> {
246 match self.pending_events.pop_front() {
247 Some(ev) => Poll::Ready(ev),
248 None => {
249 self.waker = Some(cx.waker().clone());
250 Poll::Pending
251 }
252 }
253 }
254}
255
/// Configuration of a [`MemoryStore`].
#[derive(Debug, Clone)]
pub struct Config {
    /// Capacity of the cache holding the peer records.
    peer_capacity: NonZeroUsize,
    /// Capacity of the address cache inside each peer record.
    record_capacity: NonZeroUsize,
    /// Whether non-permanent addresses are removed when dialing them fails.
    remove_addr_on_dial_error: bool,
}
263
264impl Default for Config {
265 fn default() -> Self {
266 Self {
267 peer_capacity: NonZeroUsize::try_from(1000).expect("1000 > 0"),
268 record_capacity: NonZeroUsize::try_from(8).expect("8 > 0"),
269 remove_addr_on_dial_error: true,
270 }
271 }
272}
273
274impl Config {
275 pub fn peer_capacity(&self) -> &NonZeroUsize {
276 &self.peer_capacity
277 }
278 pub fn set_peer_capacity(mut self, capacity: NonZeroUsize) -> Self {
284 self.peer_capacity = capacity;
285 self
286 }
287 pub fn record_capacity(&self) -> &NonZeroUsize {
288 &self.record_capacity
289 }
290 pub fn set_record_capacity(mut self, capacity: NonZeroUsize) -> Self {
296 self.record_capacity = capacity;
297 self
298 }
299 pub fn is_remove_addr_on_dial_error(&self) -> bool {
300 self.remove_addr_on_dial_error
301 }
302 pub fn set_remove_addr_on_dial_error(mut self, value: bool) -> Self {
315 self.remove_addr_on_dial_error = value;
316 self
317 }
318}
319
/// Everything the store knows about a single peer.
#[derive(Debug, Clone)]
pub struct PeerRecord<T> {
    /// Known addresses of the peer; the value is the address's permanence
    /// flag (`true` = only removable by a forced removal).
    addresses: LruCache<Multiaddr, bool>,
    /// Optional custom data attached to the peer.
    custom_data: Option<T>,
}
330impl<T> PeerRecord<T> {
331 pub(crate) fn new(cap: NonZeroUsize) -> Self {
332 Self {
333 addresses: LruCache::new(cap.get()),
334 custom_data: None,
335 }
336 }
337
338 pub fn addresses(&self) -> impl Iterator<Item = &Multiaddr> {
341 self.addresses.iter().rev().map(|(addr, _)| addr)
342 }
343
344 pub fn add_address(&mut self, address: &Multiaddr, is_permanent: bool) -> bool {
349 if let Some(was_permanent) = self.addresses.get(address) {
350 if !*was_permanent && is_permanent {
351 self.addresses.insert(address.clone(), is_permanent);
352 }
353 false
354 } else {
355 self.addresses.insert(address.clone(), is_permanent);
356 true
357 }
358 }
359
360 pub fn remove_address(&mut self, address: &Multiaddr, force: bool) -> bool {
365 if !force && self.addresses.peek(address) == Some(&true) {
366 return false;
367 }
368 self.addresses.remove(address).is_some()
369 }
370
371 pub fn get_custom_data(&self) -> Option<&T> {
372 self.custom_data.as_ref()
373 }
374
375 pub fn get_custom_data_mut(&mut self) -> Option<&mut T> {
376 self.custom_data.as_mut()
377 }
378
379 pub fn take_custom_data(&mut self) -> Option<T> {
380 self.custom_data.take()
381 }
382
383 pub fn insert_custom_data(&mut self, custom_data: T) {
384 let _ = self.custom_data.insert(custom_data);
385 }
386
387 pub fn is_empty(&self) -> bool {
388 self.addresses.is_empty() && self.custom_data.is_none()
389 }
390}
391
#[cfg(test)]
mod test {
    use std::{num::NonZero, str::FromStr};

    use libp2p::identify;
    use libp2p_core::{multiaddr::Protocol, Multiaddr, PeerId};
    use libp2p_swarm::{NetworkBehaviour, Swarm, SwarmEvent};
    use libp2p_swarm_test::SwarmExt;

    use super::{Event, MemoryStore};
    use crate::Store;

    /// Re-adding a known address moves it to the front of the iteration order.
    #[test]
    fn recent_use_bubble_up() {
        let mut store: MemoryStore = MemoryStore::new(Default::default());
        let peer = PeerId::random();
        let addr1 = Multiaddr::from_str("/ip4/127.0.0.1").expect("parsing to succeed");
        let addr2 = Multiaddr::from_str("/ip4/127.0.0.2").expect("parsing to succeed");
        let addr3 = Multiaddr::from_str("/ip4/127.0.0.3").expect("parsing to succeed");
        store.add_address(&peer, &addr1);
        store.add_address(&peer, &addr2);
        store.add_address(&peer, &addr3);
        assert_eq!(
            store
                .records
                .get(&peer)
                .expect("peer to be in the store")
                .addresses()
                .collect::<Vec<_>>(),
            vec![&addr3, &addr2, &addr1]
        );
        store.add_address(&peer, &addr1);
        assert_eq!(
            store
                .records
                .get(&peer)
                .expect("peer to be in the store")
                .addresses()
                .collect::<Vec<_>>(),
            vec![&addr1, &addr3, &addr2]
        );
        store.add_address(&peer, &addr3);
        assert_eq!(
            store
                .records
                .get(&peer)
                .expect("peer to be in the store")
                .addresses()
                .collect::<Vec<_>>(),
            vec![&addr3, &addr1, &addr2]
        );
    }

    /// A record keeps at most `record_capacity` addresses and evicts the
    /// oldest one first.
    #[test]
    fn bounded_store() {
        let mut store: MemoryStore = MemoryStore::new(Default::default());
        let peer = PeerId::random();
        // Nine addresses against the default record capacity of eight.
        for i in 1..10 {
            let addr_string = format!("/ip4/127.0.0.{i}");
            store.add_address(
                &peer,
                &Multiaddr::from_str(&addr_string).expect("parsing to succeed"),
            );
        }
        let first_record = Multiaddr::from_str("/ip4/127.0.0.1").expect("parsing to succeed");
        assert!(!store
            .addresses_of_peer(&peer)
            .expect("peer to be in the store")
            .any(|addr| *addr == first_record));
        // Iteration is newest first, so the last item is the oldest survivor.
        let second_record = Multiaddr::from_str("/ip4/127.0.0.2").expect("parsing to succeed");
        assert_eq!(
            store
                .addresses_of_peer(&peer)
                .expect("peer to be in the store")
                .last()
                .expect("addr to exist"),
            &second_record
        );
    }

    /// The dialing side records the address it connected to; the listening
    /// side records nothing.
    #[test]
    fn update_address_on_connect() {
        async fn expect_record_update(
            swarm: &mut Swarm<crate::Behaviour<MemoryStore>>,
            expected_peer: PeerId,
            expected_address: Option<&Multiaddr>,
        ) {
            match swarm.next_behaviour_event().await {
                Event::PeerAddressAdded {
                    peer_id,
                    address,
                    is_permanent,
                } => {
                    assert_eq!(peer_id, expected_peer);
                    assert!(expected_address.is_none_or(|a| *a == address));
                    assert!(!is_permanent)
                }
                ev => panic!("Unexpected event {ev:?}."),
            }
        }

        let store1: MemoryStore<()> = MemoryStore::new(
            crate::memory_store::Config::default()
                .set_record_capacity(NonZero::new(2).expect("2 > 0")),
        );
        let mut swarm1 = Swarm::new_ephemeral_tokio(|_| crate::Behaviour::new(store1));
        let store2: MemoryStore<()> = MemoryStore::new(
            crate::memory_store::Config::default()
                .set_record_capacity(NonZero::new(2).expect("2 > 0")),
        );
        let mut swarm2 = Swarm::new_ephemeral_tokio(|_| crate::Behaviour::new(store2));

        let rt = tokio::runtime::Runtime::new().unwrap();

        rt.block_on(async {
            let (mut listen_addr, _) = swarm1.listen().with_memory_addr_external().await;
            let swarm1_peer_id = *swarm1.local_peer_id();
            let swarm2_peer_id = *swarm2.local_peer_id();
            swarm2.connect(&mut swarm1).await;

            listen_addr.push(Protocol::P2p(swarm1_peer_id));
            expect_record_update(&mut swarm2, swarm1_peer_id, Some(&listen_addr)).await;
            assert!(swarm2
                .behaviour()
                .address_of_peer(&swarm1_peer_id)
                .expect("swarm should be connected and record about it should be created")
                .any(|addr| *addr == listen_addr));
            // The store only records the remote address when it was the dialer.
            assert!(swarm1
                .behaviour()
                .address_of_peer(&swarm2_peer_id)
                .is_none());
            let (mut new_listen_addr, _) = swarm1.listen().with_memory_addr_external().await;
            tokio::spawn(swarm1.loop_on_next());
            swarm2
                .dial(
                    libp2p_swarm::dial_opts::DialOpts::peer_id(swarm1_peer_id)
                        .condition(libp2p_swarm::dial_opts::PeerCondition::Always)
                        .addresses(vec![new_listen_addr.clone()])
                        .build(),
                )
                .expect("dial to succeed");
            swarm2
                .wait(|ev| match ev {
                    SwarmEvent::ConnectionEstablished { .. } => Some(()),
                    _ => None,
                })
                .await;
            new_listen_addr.push(Protocol::P2p(swarm1_peer_id));
            expect_record_update(&mut swarm2, swarm1_peer_id, Some(&new_listen_addr)).await;
            let new_listen_addr = new_listen_addr
                .with_p2p(swarm1_peer_id)
                .expect("extend to succeed");
            // Newest address first.
            assert_eq!(
                swarm2
                    .behaviour()
                    .address_of_peer(&swarm1_peer_id)
                    .expect("peer to exist")
                    .collect::<Vec<_>>(),
                vec![&new_listen_addr, &listen_addr]
            );
        })
    }

    /// Addresses reported by the remote through identify end up in the store.
    #[test]
    fn identify_external_addr_report() {
        #[derive(NetworkBehaviour)]
        struct Behaviour {
            peer_store: crate::Behaviour<MemoryStore>,
            identify: identify::Behaviour,
        }
        // Wait for the next store event, ignoring events of other behaviours.
        async fn expect_record_update(
            swarm: &mut Swarm<Behaviour>,
            expected_peer: PeerId,
            expected_address: Option<&Multiaddr>,
        ) {
            loop {
                match swarm.next_behaviour_event().await {
                    BehaviourEvent::PeerStore(Event::PeerAddressAdded {
                        peer_id,
                        address,
                        is_permanent,
                    }) => {
                        assert_eq!(peer_id, expected_peer);
                        assert!(expected_address.is_none_or(|a| *a == address));
                        assert!(!is_permanent);
                        break;
                    }
                    ev @ BehaviourEvent::PeerStore(_) => panic!("Unexpected event {ev:?}."),
                    _ => {}
                }
            }
        }
        fn build_swarm() -> Swarm<Behaviour> {
            Swarm::new_ephemeral_tokio(|kp| Behaviour {
                peer_store: crate::Behaviour::new(MemoryStore::new(
                    crate::memory_store::Config::default()
                        .set_record_capacity(NonZero::new(4).expect("4 > 0")),
                )),
                identify: identify::Behaviour::new(identify::Config::new(
                    "/TODO/0.0.1".to_string(),
                    kp.public(),
                )),
            })
        }
        let mut swarm1 = build_swarm();
        let mut swarm2 = build_swarm();
        let rt = tokio::runtime::Runtime::new().unwrap();

        rt.block_on(async {
            let (mut listen_addr, _) = swarm1.listen().with_memory_addr_external().await;
            let swarm1_peer_id = *swarm1.local_peer_id();
            let swarm2_peer_id = *swarm2.local_peer_id();
            swarm2.connect(&mut swarm1).await;

            listen_addr.push(Protocol::P2p(swarm1_peer_id));
            expect_record_update(&mut swarm2, swarm1_peer_id, Some(&listen_addr)).await;

            assert_eq!(
                swarm2
                    .behaviour()
                    .peer_store
                    .address_of_peer(&swarm1_peer_id)
                    .expect("swarm should be connected and record about it should be created")
                    .collect::<Vec<_>>(),
                vec![&listen_addr]
            );

            assert!(matches!(
                swarm1.next_behaviour_event().await,
                BehaviourEvent::Identify(identify::Event::Sent { .. })
            ));
            assert!(matches!(
                swarm1.next_behaviour_event().await,
                BehaviourEvent::Identify(identify::Event::Received { .. })
            ));
            let (new_listen_addr, _) = swarm1.listen().with_memory_addr_external().await;
            swarm1.behaviour_mut().identify.push([swarm2_peer_id]);
            tokio::spawn(swarm1.loop_on_next());
            // Three further additions are expected after the identify push
            // (presumably the remaining listen addresses reported by swarm1 —
            // TODO confirm against `SwarmExt::listen`).
            expect_record_update(&mut swarm2, swarm1_peer_id, None).await;
            expect_record_update(&mut swarm2, swarm1_peer_id, None).await;
            expect_record_update(&mut swarm2, swarm1_peer_id, None).await;
            let known_listen_addresses = swarm2
                .behaviour()
                .peer_store
                .address_of_peer(&swarm1_peer_id)
                .expect("peer to exist")
                .collect::<Vec<_>>();
            assert!(known_listen_addresses.contains(&&new_listen_addr));
            assert_eq!(known_listen_addresses.len(), 4)
        })
    }
}