1use std::{
12 collections::{HashMap, VecDeque},
13 num::NonZeroUsize,
14 task::{Poll, Waker},
15};
16
17use libp2p_core::{Multiaddr, PeerId};
18use libp2p_swarm::{behaviour::ConnectionEstablished, DialError, FromSwarm};
19use lru::LruCache;
20
21use super::Store;
22
/// Event produced by [`MemoryStore`] whenever the set of addresses recorded for a peer changes.
#[derive(Debug, Clone)]
pub enum Event {
    /// An address that was not yet recorded for the peer has been added to the store.
    PeerAddressAdded {
        /// ID of the peer whose record was modified.
        peer_id: PeerId,
        /// The address that was added.
        address: Multiaddr,
        /// `true` when the address was added explicitly through
        /// [`MemoryStore::add_address`], `false` when it was learned from a swarm event.
        ///
        /// Non-permanent addresses may be removed again by the store when dialing them
        /// fails (see [`Config::is_remove_addr_on_dial_error`]).
        is_permanent: bool,
    },
    /// A previously recorded address has been removed from the store.
    PeerAddressRemoved {
        /// ID of the peer whose record was modified.
        peer_id: PeerId,
        /// The address that was removed.
        address: Multiaddr,
    },
}
49
50#[derive(Default)]
53pub struct MemoryStore<T = ()> {
54 records: HashMap<PeerId, PeerRecord<T>>,
56 pending_events: VecDeque<Event>,
58 config: Config,
60 waker: Option<Waker>,
62}
63
64impl<T> MemoryStore<T> {
65 pub fn new(config: Config) -> Self {
67 Self {
68 config,
69 records: HashMap::new(),
70 pending_events: VecDeque::default(),
71 waker: None,
72 }
73 }
74
75 pub fn add_address(&mut self, peer: &PeerId, address: &Multiaddr) -> bool {
84 self.add_address_inner(peer, address, true)
85 }
86
87 fn add_address_inner(
91 &mut self,
92 peer: &PeerId,
93 address: &Multiaddr,
94 is_permanent: bool,
95 ) -> bool {
96 let record = self
97 .records
98 .entry(*peer)
99 .or_insert(PeerRecord::new(self.config.record_capacity));
100 let is_new = record.add_address(address, is_permanent);
101 if is_new {
102 self.push_event_and_wake(Event::PeerAddressAdded {
103 peer_id: *peer,
104 address: address.clone(),
105 is_permanent,
106 });
107 }
108 is_new
109 }
110
111 pub fn remove_address(&mut self, peer: &PeerId, address: &Multiaddr) -> bool {
115 self.remove_address_inner(peer, address, true)
116 }
117
118 fn remove_address_inner(&mut self, peer: &PeerId, address: &Multiaddr, force: bool) -> bool {
123 if let Some(record) = self.records.get_mut(peer) {
124 if record.remove_address(address, force) {
125 if record.addresses.is_empty() && record.custom_data.is_none() {
126 self.records.remove(peer);
127 }
128 self.push_event_and_wake(Event::PeerAddressRemoved {
129 peer_id: *peer,
130 address: address.clone(),
131 });
132 return true;
133 }
134 }
135 false
136 }
137
138 pub fn get_custom_data(&self, peer: &PeerId) -> Option<&T> {
140 self.records.get(peer).and_then(|r| r.get_custom_data())
141 }
142
143 pub fn take_custom_data(&mut self, peer: &PeerId) -> Option<T> {
145 if let Some(record) = self.records.get_mut(peer) {
146 let data = record.take_custom_data();
147 if record.addresses.is_empty() {
148 self.records.remove(peer);
149 }
150 return data;
151 }
152 None
153 }
154
155 pub fn insert_custom_data(&mut self, peer: &PeerId, custom_data: T) {
157 if let Some(r) = self.records.get_mut(peer) {
158 return r.insert_custom_data(custom_data);
159 }
160 let mut new_record = PeerRecord::new(self.config.record_capacity);
161 new_record.insert_custom_data(custom_data);
162 self.records.insert(*peer, new_record);
163 }
164
165 pub fn get_custom_data_mut(&mut self, peer: &PeerId) -> Option<&mut T> {
167 self.records
168 .get_mut(peer)
169 .and_then(|r| r.get_custom_data_mut())
170 }
171
172 pub fn record_iter(&self) -> impl Iterator<Item = (&PeerId, &PeerRecord<T>)> {
174 self.records.iter()
175 }
176
177 pub fn record_iter_mut(&mut self) -> impl Iterator<Item = (&PeerId, &mut PeerRecord<T>)> {
181 self.records.iter_mut()
182 }
183
184 fn push_event_and_wake(&mut self, event: Event) {
185 self.pending_events.push_back(event);
186 if let Some(waker) = self.waker.take() {
187 waker.wake(); }
189 }
190}
191
192impl<T> Store for MemoryStore<T> {
193 type Event = Event;
194
195 fn on_swarm_event(&mut self, swarm_event: &FromSwarm) {
196 match swarm_event {
197 FromSwarm::NewExternalAddrOfPeer(info) => {
198 self.add_address_inner(&info.peer_id, info.addr, false);
199 }
200 FromSwarm::ConnectionEstablished(ConnectionEstablished {
201 peer_id,
202 failed_addresses,
203 endpoint,
204 ..
205 }) if endpoint.is_dialer() => {
206 if self.config.remove_addr_on_dial_error {
207 for failed_addr in *failed_addresses {
208 self.remove_address_inner(peer_id, failed_addr, false);
209 }
210 }
211 self.add_address_inner(peer_id, endpoint.get_remote_address(), false);
212 }
213 FromSwarm::DialFailure(info) => {
214 if !self.config.remove_addr_on_dial_error {
215 return;
216 }
217
218 let Some(peer) = info.peer_id else {
219 return;
221 };
222
223 match info.error {
224 DialError::WrongPeerId { obtained, address } => {
225 if self.remove_address_inner(&peer, address, false) {
227 self.add_address_inner(obtained, address, false);
228 }
229 }
230 DialError::Transport(errors) => {
231 for (addr, _) in errors {
232 self.remove_address_inner(&peer, addr, false);
233 }
234 }
235 _ => {}
236 }
237 }
238 _ => {}
239 }
240 }
241
242 fn addresses_of_peer(&self, peer: &PeerId) -> Option<impl Iterator<Item = &Multiaddr>> {
243 self.records.get(peer).map(|record| record.addresses())
244 }
245
246 fn poll(&mut self, cx: &mut std::task::Context<'_>) -> Poll<Self::Event> {
247 match self.pending_events.pop_front() {
248 Some(ev) => Poll::Ready(ev),
249 None => {
250 self.waker = Some(cx.waker().clone());
251 Poll::Pending
252 }
253 }
254 }
255}
256
/// Configuration for [`MemoryStore`].
#[derive(Debug, Clone)]
pub struct Config {
    /// Capacity of the address cache created for every peer record.
    record_capacity: NonZeroUsize,
    /// Whether non-permanent addresses are removed from the store when dialing them fails.
    remove_addr_on_dial_error: bool,
}

impl Default for Config {
    /// Defaults to a capacity of 8 addresses per record, with removal on dial error enabled.
    fn default() -> Self {
        let record_capacity = NonZeroUsize::new(8).expect("8 > 0");
        Self {
            record_capacity,
            remove_addr_on_dial_error: true,
        }
    }
}

impl Config {
    /// Returns the capacity of the address cache of each peer record.
    pub fn record_capacity(&self) -> &NonZeroUsize {
        &self.record_capacity
    }

    /// Returns the config with the per-record address capacity set to `capacity`.
    pub fn set_record_capacity(self, capacity: NonZeroUsize) -> Self {
        Self {
            record_capacity: capacity,
            ..self
        }
    }

    /// Returns whether addresses are removed from the store when dialing them fails.
    pub fn is_remove_addr_on_dial_error(&self) -> bool {
        self.remove_addr_on_dial_error
    }

    /// Returns the config with removal of addresses on dial error set to `value`.
    pub fn set_remove_addr_on_dial_error(self, value: bool) -> Self {
        Self {
            remove_addr_on_dial_error: value,
            ..self
        }
    }
}
306
/// Record kept by [`MemoryStore`] for a single peer.
#[derive(Debug, Clone)]
pub struct PeerRecord<T> {
    /// Bounded LRU cache of the peer's addresses.
    ///
    /// The associated `bool` marks an address as permanent: when it is `true` the
    /// address is only removed by a forced removal.
    addresses: LruCache<Multiaddr, bool>,
    /// Custom data attached to the peer, if any.
    custom_data: Option<T>,
}
317impl<T> PeerRecord<T> {
318 pub(crate) fn new(cap: NonZeroUsize) -> Self {
319 Self {
320 addresses: LruCache::new(cap),
321 custom_data: None,
322 }
323 }
324
325 pub fn addresses(&self) -> impl Iterator<Item = &Multiaddr> {
328 self.addresses.iter().map(|(addr, _)| addr)
329 }
330
331 pub fn add_address(&mut self, address: &Multiaddr, is_permanent: bool) -> bool {
336 if let Some(was_permanent) = self.addresses.get(address) {
337 if !*was_permanent && is_permanent {
338 self.addresses
339 .get_or_insert(address.clone(), || is_permanent);
340 }
341 return false;
342 }
343 self.addresses
344 .get_or_insert(address.clone(), || is_permanent);
345 true
346 }
347
348 pub fn remove_address(&mut self, address: &Multiaddr, force: bool) -> bool {
353 if !force && self.addresses.peek(address) == Some(&true) {
354 return false;
355 }
356 self.addresses.pop(address).is_some()
357 }
358
359 pub fn get_custom_data(&self) -> Option<&T> {
360 self.custom_data.as_ref()
361 }
362
363 pub fn get_custom_data_mut(&mut self) -> Option<&mut T> {
364 self.custom_data.as_mut()
365 }
366
367 pub fn take_custom_data(&mut self) -> Option<T> {
368 self.custom_data.take()
369 }
370
371 pub fn insert_custom_data(&mut self, custom_data: T) {
372 let _ = self.custom_data.insert(custom_data);
373 }
374
375 pub fn is_empty(&self) -> bool {
376 self.addresses.is_empty() && self.custom_data.is_none()
377 }
378}
379
/// Tests for [`MemoryStore`]: LRU ordering and bounding of addresses, and the
/// store's reaction to swarm events in small two-node networks.
#[cfg(test)]
mod test {
    use std::{num::NonZero, str::FromStr};

    use libp2p::identify;
    use libp2p_core::{multiaddr::Protocol, Multiaddr, PeerId};
    use libp2p_swarm::{NetworkBehaviour, Swarm, SwarmEvent};
    use libp2p_swarm_test::SwarmExt;

    use super::{Event, MemoryStore};
    use crate::Store;

    /// Re-adding a known address moves it to the front of the address list.
    #[test]
    fn recent_use_bubble_up() {
        let mut store: MemoryStore = MemoryStore::new(Default::default());
        let peer = PeerId::random();
        let addr1 = Multiaddr::from_str("/ip4/127.0.0.1").expect("parsing to succeed");
        let addr2 = Multiaddr::from_str("/ip4/127.0.0.2").expect("parsing to succeed");
        let addr3 = Multiaddr::from_str("/ip4/127.0.0.3").expect("parsing to succeed");
        store.add_address(&peer, &addr1);
        store.add_address(&peer, &addr2);
        store.add_address(&peer, &addr3);
        // Most recently added address comes first.
        assert!(
            store
                .records
                .get(&peer)
                .expect("peer to be in the store")
                .addresses()
                .collect::<Vec<_>>()
                == vec![&addr3, &addr2, &addr1]
        );
        // Adding an already known address promotes it to the front.
        store.add_address(&peer, &addr1);
        assert!(
            store
                .records
                .get(&peer)
                .expect("peer to be in the store")
                .addresses()
                .collect::<Vec<_>>()
                == vec![&addr1, &addr3, &addr2]
        );
        store.add_address(&peer, &addr3);
        assert!(
            store
                .records
                .get(&peer)
                .expect("peer to be in the store")
                .addresses()
                .collect::<Vec<_>>()
                == vec![&addr3, &addr1, &addr2]
        );
    }

    /// Adding more addresses than the record capacity evicts the oldest ones.
    #[test]
    fn bounded_store() {
        let mut store: MemoryStore = MemoryStore::new(Default::default());
        let peer = PeerId::random();
        // Nine addresses are added while the default capacity is eight.
        for i in 1..10 {
            let addr_string = format!("/ip4/127.0.0.{i}");
            store.add_address(
                &peer,
                &Multiaddr::from_str(&addr_string).expect("parsing to succeed"),
            );
        }
        // The first address has been evicted ...
        let first_record = Multiaddr::from_str("/ip4/127.0.0.1").expect("parsing to succeed");
        assert!(!store
            .addresses_of_peer(&peer)
            .expect("peer to be in the store")
            .any(|addr| *addr == first_record));
        // ... which makes the second address the least recently used one.
        let second_record = Multiaddr::from_str("/ip4/127.0.0.2").expect("parsing to succeed");
        assert!(
            *store
                .addresses_of_peer(&peer)
                .expect("peer to be in the store")
                .last()
                .expect("addr to exist")
                == second_record
        );
    }

    /// The dialing side records the address of each connection it establishes.
    #[test]
    fn update_address_on_connect() {
        /// Awaits the next behaviour event and asserts that it is a non-permanent
        /// `PeerAddressAdded` for `expected_peer` (and `expected_address`, if given).
        async fn expect_record_update(
            swarm: &mut Swarm<crate::Behaviour<MemoryStore>>,
            expected_peer: PeerId,
            expected_address: Option<&Multiaddr>,
        ) {
            match swarm.next_behaviour_event().await {
                Event::PeerAddressAdded {
                    peer_id,
                    address,
                    is_permanent,
                } => {
                    assert_eq!(peer_id, expected_peer);
                    assert!(expected_address.is_none_or(|a| *a == address));
                    assert!(!is_permanent)
                }
                ev => panic!("Unexpected event {ev:?}."),
            }
        }

        let store1: MemoryStore<()> = MemoryStore::new(
            crate::memory_store::Config::default()
                .set_record_capacity(NonZero::new(2).expect("2 > 0")),
        );
        let mut swarm1 = Swarm::new_ephemeral_tokio(|_| crate::Behaviour::new(store1));
        let store2: MemoryStore<()> = MemoryStore::new(
            crate::memory_store::Config::default()
                .set_record_capacity(NonZero::new(2).expect("2 > 0")),
        );
        let mut swarm2 = Swarm::new_ephemeral_tokio(|_| crate::Behaviour::new(store2));

        let rt = tokio::runtime::Runtime::new().unwrap();

        rt.block_on(async {
            let (mut listen_addr, _) = swarm1.listen().with_memory_addr_external().await;
            let swarm1_peer_id = *swarm1.local_peer_id();
            let swarm2_peer_id = *swarm2.local_peer_id();
            swarm2.connect(&mut swarm1).await;

            // The address is expected in the store with the peer id appended.
            listen_addr.push(Protocol::P2p(swarm1_peer_id));
            expect_record_update(&mut swarm2, swarm1_peer_id, Some(&listen_addr)).await;
            assert!(swarm2
                .behaviour()
                .address_of_peer(&swarm1_peer_id)
                .expect("swarm should be connected and record about it should be created")
                .any(|addr| *addr == listen_addr));
            // Only the dialing side records the remote address on connection establishment.
            assert!(swarm1
                .behaviour()
                .address_of_peer(&swarm2_peer_id)
                .is_none());
            let (mut new_listen_addr, _) = swarm1.listen().with_memory_addr_external().await;
            // Keep driving swarm1 in the background so it can accept the second dial.
            tokio::spawn(swarm1.loop_on_next());
            swarm2
                .dial(
                    libp2p_swarm::dial_opts::DialOpts::peer_id(swarm1_peer_id)
                        .condition(libp2p_swarm::dial_opts::PeerCondition::Always)
                        .addresses(vec![new_listen_addr.clone()])
                        .build(),
                )
                .expect("dial to succeed");
            swarm2
                .wait(|ev| match ev {
                    SwarmEvent::ConnectionEstablished { .. } => Some(()),
                    _ => None,
                })
                .await;
            new_listen_addr.push(Protocol::P2p(swarm1_peer_id));
            expect_record_update(&mut swarm2, swarm1_peer_id, Some(&new_listen_addr)).await;
            // NOTE(review): the peer id was already pushed above, so this `with_p2p`
            // presumably leaves the address unchanged — confirm.
            let new_listen_addr = new_listen_addr
                .with_p2p(swarm1_peer_id)
                .expect("extend to succeed");
            // The newest address is listed first.
            assert_eq!(
                swarm2
                    .behaviour()
                    .address_of_peer(&swarm1_peer_id)
                    .expect("peer to exist")
                    .collect::<Vec<_>>(),
                vec![&new_listen_addr, &listen_addr]
            );
        })
    }

    /// Addresses pushed by a peer through identify end up in the store.
    #[test]
    fn identify_external_addr_report() {
        #[derive(NetworkBehaviour)]
        struct Behaviour {
            peer_store: crate::Behaviour<MemoryStore>,
            identify: identify::Behaviour,
        }
        /// Skips events of other behaviours until the peer store emits an event, then
        /// asserts that it is a non-permanent `PeerAddressAdded` for `expected_peer`
        /// (and `expected_address`, if given).
        async fn expect_record_update(
            swarm: &mut Swarm<Behaviour>,
            expected_peer: PeerId,
            expected_address: Option<&Multiaddr>,
        ) {
            loop {
                match swarm.next_behaviour_event().await {
                    BehaviourEvent::PeerStore(Event::PeerAddressAdded {
                        peer_id,
                        address,
                        is_permanent,
                    }) => {
                        assert_eq!(peer_id, expected_peer);
                        assert!(expected_address.is_none_or(|a| *a == address));
                        assert!(!is_permanent);
                        break;
                    }
                    ev @ BehaviourEvent::PeerStore(_) => panic!("Unexpected event {ev:?}."),
                    _ => {}
                }
            }
        }
        /// Builds a swarm combining a peer store (capacity 4 per record) with identify.
        fn build_swarm() -> Swarm<Behaviour> {
            Swarm::new_ephemeral_tokio(|kp| Behaviour {
                peer_store: crate::Behaviour::new(MemoryStore::new(
                    crate::memory_store::Config::default()
                        .set_record_capacity(NonZero::new(4).expect("4 > 0")),
                )),
                identify: identify::Behaviour::new(identify::Config::new(
                    "/TODO/0.0.1".to_string(),
                    kp.public(),
                )),
            })
        }
        let mut swarm1 = build_swarm();
        let mut swarm2 = build_swarm();
        let rt = tokio::runtime::Runtime::new().unwrap();

        rt.block_on(async {
            let (mut listen_addr, _) = swarm1.listen().with_memory_addr_external().await;
            let swarm1_peer_id = *swarm1.local_peer_id();
            let swarm2_peer_id = *swarm2.local_peer_id();
            swarm2.connect(&mut swarm1).await;

            // The address is expected in the store with the peer id appended.
            listen_addr.push(Protocol::P2p(swarm1_peer_id));
            expect_record_update(&mut swarm2, swarm1_peer_id, Some(&listen_addr)).await;

            assert_eq!(
                swarm2
                    .behaviour()
                    .peer_store
                    .address_of_peer(&swarm1_peer_id)
                    .expect("swarm should be connected and record about it should be created")
                    .collect::<Vec<_>>(),
                vec![&listen_addr]
            );

            // Drain the identify events on swarm1 before pushing new information.
            assert!(matches!(
                swarm1.next_behaviour_event().await,
                BehaviourEvent::Identify(identify::Event::Sent { .. })
            ));
            assert!(matches!(
                swarm1.next_behaviour_event().await,
                BehaviourEvent::Identify(identify::Event::Received { .. })
            ));
            let (new_listen_addr, _) = swarm1.listen().with_memory_addr_external().await;
            swarm1.behaviour_mut().identify.push([swarm2_peer_id]);
            // Keep driving swarm1 in the background so the identify push gets sent.
            tokio::spawn(swarm1.loop_on_next());
            // Three further additions are expected. Presumably identify reports all of
            // swarm1's listen addresses and one of them is already stored — TODO confirm.
            expect_record_update(&mut swarm2, swarm1_peer_id, None).await;
            expect_record_update(&mut swarm2, swarm1_peer_id, None).await;
            expect_record_update(&mut swarm2, swarm1_peer_id, None).await;
            let known_listen_addresses = swarm2
                .behaviour()
                .peer_store
                .address_of_peer(&swarm1_peer_id)
                .expect("peer to exist")
                .collect::<Vec<_>>();
            assert!(known_listen_addresses.contains(&&new_listen_addr));
            assert_eq!(known_listen_addresses.len(), 4)
        })
    }
}