1use std::{
12 collections::{HashMap, VecDeque},
13 num::NonZeroUsize,
14 task::Waker,
15};
16
17use libp2p_core::{Multiaddr, PeerId};
18use libp2p_swarm::FromSwarm;
19use lru::LruCache;
20
21use super::Store;
22
23#[derive(Debug, Clone)]
25pub enum Event {
26 CustomDataUpdated(PeerId),
28}
29
30#[derive(Default)]
33pub struct MemoryStore<T = ()> {
34 records: HashMap<PeerId, PeerRecord<T>>,
36 pending_events: VecDeque<crate::store::Event<Event>>,
38 config: Config,
40 waker: Option<Waker>,
42}
43
44impl<T> MemoryStore<T> {
45 pub fn new(config: Config) -> Self {
47 Self {
48 config,
49 records: HashMap::new(),
50 pending_events: VecDeque::default(),
51 waker: None,
52 }
53 }
54
55 pub fn update_address(&mut self, peer: &PeerId, address: &Multiaddr) -> bool {
58 let is_updated = self.update_address_silent(peer, address);
59 if is_updated {
60 self.pending_events
61 .push_back(crate::store::Event::RecordUpdated(*peer));
62 if let Some(waker) = self.waker.take() {
63 waker.wake();
64 }
65 }
66 is_updated
67 }
68
69 pub fn update_address_silent(&mut self, peer: &PeerId, address: &Multiaddr) -> bool {
72 if let Some(record) = self.records.get_mut(peer) {
73 return record.update_address(address);
74 }
75 let mut new_record = PeerRecord::new(self.config.record_capacity);
76 new_record.update_address(address);
77 self.records.insert(*peer, new_record);
78 true
79 }
80
81 pub fn remove_address(&mut self, peer: &PeerId, address: &Multiaddr) -> bool {
84 self.records
85 .get_mut(peer)
86 .is_some_and(|r| r.remove_address(address))
87 }
88
89 pub fn get_custom_data(&self, peer: &PeerId) -> Option<&T> {
90 self.records.get(peer).and_then(|r| r.get_custom_data())
91 }
92
93 pub fn take_custom_data(&mut self, peer: &PeerId) -> Option<T> {
95 self.records
96 .get_mut(peer)
97 .and_then(|r| r.take_custom_data())
98 }
99
100 pub fn insert_custom_data(&mut self, peer: &PeerId, custom_data: T) {
102 self.insert_custom_data_silent(peer, custom_data);
103 self.pending_events
104 .push_back(crate::store::Event::Store(Event::CustomDataUpdated(*peer)));
105 if let Some(waker) = self.waker.take() {
106 waker.wake();
107 }
108 }
109
110 pub fn insert_custom_data_silent(&mut self, peer: &PeerId, custom_data: T) {
112 if let Some(r) = self.records.get_mut(peer) {
113 return r.insert_custom_data(custom_data);
114 }
115 let mut new_record = PeerRecord::new(self.config.record_capacity);
116 new_record.insert_custom_data(custom_data);
117 self.records.insert(*peer, new_record);
118 }
119
120 pub fn record_iter(&self) -> impl Iterator<Item = (&PeerId, &PeerRecord<T>)> {
122 self.records.iter()
123 }
124
125 pub fn record_iter_mut(&mut self) -> impl Iterator<Item = (&PeerId, &mut PeerRecord<T>)> {
128 self.records.iter_mut()
129 }
130}
131
132impl<T> Store for MemoryStore<T> {
133 type FromStore = Event;
134
135 fn on_swarm_event(&mut self, swarm_event: &FromSwarm) {
136 match swarm_event {
137 FromSwarm::NewExternalAddrOfPeer(info) => {
138 self.update_address(&info.peer_id, info.addr);
139 }
140 FromSwarm::ConnectionEstablished(info) => {
141 let mut is_record_updated = false;
142 for failed_addr in info.failed_addresses {
143 is_record_updated |= self.remove_address(&info.peer_id, failed_addr);
144 }
145 is_record_updated |=
146 self.update_address_silent(&info.peer_id, info.endpoint.get_remote_address());
147 if is_record_updated {
148 self.pending_events
149 .push_back(crate::store::Event::RecordUpdated(info.peer_id));
150 if let Some(waker) = self.waker.take() {
151 waker.wake(); }
153 }
154 }
155 _ => {}
156 }
157 }
158
159 fn addresses_of_peer(&self, peer: &PeerId) -> Option<impl Iterator<Item = &Multiaddr>> {
160 self.records.get(peer).map(|record| record.addresses())
161 }
162
163 fn poll(
164 &mut self,
165 cx: &mut std::task::Context<'_>,
166 ) -> Option<crate::store::Event<Self::FromStore>> {
167 if self.pending_events.is_empty() {
168 self.waker = Some(cx.waker().clone());
169 }
170 self.pending_events.pop_front()
171 }
172}
173
/// Configuration of a [`MemoryStore`].
#[derive(Clone, Debug)]
pub struct Config {
    /// Capacity handed to every newly created [`PeerRecord`], bounding the
    /// number of addresses kept per peer.
    record_capacity: NonZeroUsize,
}
181
182impl Default for Config {
183 fn default() -> Self {
184 Self {
185 record_capacity: NonZeroUsize::try_from(8).expect("8 > 0"),
186 }
187 }
188}
189
190impl Config {
191 pub fn record_capacity(&self) -> &NonZeroUsize {
194 &self.record_capacity
195 }
196 pub fn set_record_capacity(mut self, capacity: NonZeroUsize) -> Self {
198 self.record_capacity = capacity;
199 self
200 }
201}
202
203#[derive(Debug, Clone)]
205pub struct PeerRecord<T> {
206 addresses: LruCache<Multiaddr, ()>,
209 custom_data: Option<T>,
211}
212impl<T> PeerRecord<T> {
213 pub(crate) fn new(cap: NonZeroUsize) -> Self {
214 Self {
215 addresses: LruCache::new(cap),
216 custom_data: None,
217 }
218 }
219
220 pub fn addresses(&self) -> impl Iterator<Item = &Multiaddr> {
223 self.addresses.iter().map(|(addr, _)| addr)
224 }
225
226 pub fn update_address(&mut self, address: &Multiaddr) -> bool {
230 if self.addresses.get(address).is_some() {
231 return false;
232 }
233 self.addresses.get_or_insert(address.clone(), || ());
234 true
235 }
236
237 pub fn remove_address(&mut self, address: &Multiaddr) -> bool {
240 self.addresses.pop(address).is_some()
241 }
242
243 pub fn get_custom_data(&self) -> Option<&T> {
244 self.custom_data.as_ref()
245 }
246
247 pub fn take_custom_data(&mut self) -> Option<T> {
248 self.custom_data.take()
249 }
250
251 pub fn insert_custom_data(&mut self, custom_data: T) {
252 let _ = self.custom_data.insert(custom_data);
253 }
254}
255
#[cfg(test)]
mod test {
    use std::{num::NonZero, str::FromStr};

    use libp2p::Swarm;
    use libp2p_core::{Multiaddr, PeerId};
    use libp2p_swarm::{NetworkBehaviour, SwarmEvent};
    use libp2p_swarm_test::SwarmExt;

    use super::MemoryStore;
    use crate::Store;

    /// Build the address `/ip4/127.0.0.<last_octet>`.
    fn loopback(last_octet: u8) -> Multiaddr {
        Multiaddr::from_str(&format!("/ip4/127.0.0.{last_octet}")).expect("parsing to succeed")
    }

    /// Addresses stored for `peer`, in the iteration order of its record.
    fn stored_addresses<'a>(store: &'a MemoryStore, peer: &PeerId) -> Vec<&'a Multiaddr> {
        store
            .records
            .get(peer)
            .expect("peer to be in the store")
            .addresses()
            .collect()
    }

    /// Updating an address that is already stored moves it to the front.
    #[test]
    fn recent_use_bubble_up() {
        let mut store: MemoryStore = MemoryStore::new(Default::default());
        let peer = PeerId::random();
        let addr1 = loopback(1);
        let addr2 = loopback(2);
        let addr3 = loopback(3);
        store.update_address(&peer, &addr1);
        store.update_address(&peer, &addr2);
        store.update_address(&peer, &addr3);
        assert_eq!(
            stored_addresses(&store, &peer),
            vec![&addr3, &addr2, &addr1]
        );
        store.update_address(&peer, &addr1);
        assert_eq!(
            stored_addresses(&store, &peer),
            vec![&addr1, &addr3, &addr2]
        );
        store.update_address(&peer, &addr3);
        assert_eq!(
            stored_addresses(&store, &peer),
            vec![&addr3, &addr1, &addr2]
        );
    }

    /// A record never holds more addresses than the configured capacity;
    /// the oldest address is discarded first.
    #[test]
    fn bounded_store() {
        let mut store: MemoryStore = MemoryStore::new(Default::default());
        let peer = PeerId::random();
        // Nine addresses against the default capacity of eight.
        for i in 1..10 {
            store.update_address(&peer, &loopback(i));
        }
        let first_record = loopback(1);
        assert!(!store
            .addresses_of_peer(&peer)
            .expect("peer to be in the store")
            .any(|addr| *addr == first_record));
        let second_record = loopback(2);
        assert_eq!(
            store
                .addresses_of_peer(&peer)
                .expect("peer to be in the store")
                .last()
                .expect("addr to exist"),
            &second_record
        );
    }

    /// Establishing a connection records the remote address of the peer.
    #[test]
    fn update_address_on_connect() {
        async fn expect_record_update(
            swarm: &mut Swarm<crate::Behaviour<MemoryStore>>,
            expected_peer: PeerId,
        ) {
            swarm
                .wait(|ev| match ev {
                    SwarmEvent::Behaviour(crate::Event::RecordUpdated { peer }) => {
                        (peer == expected_peer).then_some(())
                    }
                    _ => None,
                })
                .await
        }

        let store1: MemoryStore<()> = MemoryStore::new(
            crate::memory_store::Config::default()
                .set_record_capacity(NonZero::new(2).expect("2 > 0")),
        );
        let mut swarm1 = Swarm::new_ephemeral_tokio(|_| crate::Behaviour::new(store1));
        let store2: MemoryStore<()> = MemoryStore::new(
            crate::memory_store::Config::default()
                .set_record_capacity(NonZero::new(2).expect("2 > 0")),
        );
        let mut swarm2 = Swarm::new_ephemeral_tokio(|_| crate::Behaviour::new(store2));

        let rt = tokio::runtime::Runtime::new().unwrap();

        rt.block_on(async {
            // First connection: swarm2 dials the listen address of swarm1.
            let (listen_addr, _) = swarm1.listen().with_memory_addr_external().await;
            let swarm1_peer_id = *swarm1.local_peer_id();
            swarm2.dial(listen_addr.clone()).expect("dial to succeed");
            let handle = spawn_wait_conn_established(swarm1);
            swarm2
                .wait(|ev| match ev {
                    SwarmEvent::ConnectionEstablished { .. } => Some(()),
                    _ => None,
                })
                .await;
            let mut swarm1 = handle.await.expect("future to complete");
            assert!(swarm2
                .behaviour()
                .address_of_peer(&swarm1_peer_id)
                .expect("swarm should be connected and record about it should be created")
                .any(|addr| *addr == listen_addr));
            expect_record_update(&mut swarm1, *swarm2.local_peer_id()).await;

            // Second connection: swarm2 dials a new listen address of swarm1.
            let (new_listen_addr, _) = swarm1.listen().with_memory_addr_external().await;
            let handle = spawn_wait_conn_established(swarm1);
            swarm2
                .dial(
                    libp2p_swarm::dial_opts::DialOpts::peer_id(swarm1_peer_id)
                        .condition(libp2p_swarm::dial_opts::PeerCondition::Always)
                        .addresses(vec![new_listen_addr.clone()])
                        .build(),
                )
                .expect("dial to succeed");
            swarm2
                .wait(|ev| match ev {
                    SwarmEvent::ConnectionEstablished { .. } => Some(()),
                    _ => None,
                })
                .await;
            handle.await.expect("future to complete");
            expect_record_update(&mut swarm2, swarm1_peer_id).await;
            // The address recorded for the second connection is compared with
            // its `/p2p` suffix, presumably because dialing by peer ID
            // appends it (TODO confirm).
            let new_listen_addr = new_listen_addr
                .with_p2p(swarm1_peer_id)
                .expect("extend to succeed");
            assert_eq!(
                swarm2
                    .behaviour()
                    .address_of_peer(&swarm1_peer_id)
                    .expect("peer to exist")
                    .collect::<Vec<_>>(),
                vec![&new_listen_addr, &listen_addr]
            );
        })
    }

    /// Addresses pushed through identify end up in the store of the remote.
    #[test]
    fn identify_external_addr_report() {
        #[derive(NetworkBehaviour)]
        struct Behaviour {
            peer_store: crate::Behaviour<MemoryStore>,
            identify: libp2p::identify::Behaviour,
        }
        async fn expect_record_update(swarm: &mut Swarm<Behaviour>, expected_peer: PeerId) {
            swarm
                .wait(|ev| match ev {
                    SwarmEvent::Behaviour(BehaviourEvent::PeerStore(
                        crate::Event::RecordUpdated { peer },
                    )) => (peer == expected_peer).then_some(()),
                    _ => None,
                })
                .await
        }
        fn build_swarm() -> Swarm<Behaviour> {
            Swarm::new_ephemeral_tokio(|kp| Behaviour {
                peer_store: crate::Behaviour::new(MemoryStore::new(
                    crate::memory_store::Config::default()
                        .set_record_capacity(NonZero::new(4).expect("4 > 0")),
                )),
                identify: libp2p::identify::Behaviour::new(libp2p::identify::Config::new(
                    "/TODO/0.0.1".to_string(),
                    kp.public(),
                )),
            })
        }
        let mut swarm1 = build_swarm();
        let mut swarm2 = build_swarm();
        let rt = tokio::runtime::Runtime::new().unwrap();

        rt.block_on(async {
            let (listen_addr, _) = swarm1.listen().with_memory_addr_external().await;
            let swarm1_peer_id = *swarm1.local_peer_id();
            let swarm2_peer_id = *swarm2.local_peer_id();
            swarm2.dial(listen_addr.clone()).expect("dial to succeed");
            let handle = spawn_wait_conn_established(swarm1);
            let mut swarm2 = spawn_wait_conn_established(swarm2)
                .await
                .expect("future to complete");
            let mut swarm1 = handle.await.expect("future to complete");
            expect_record_update(&mut swarm2, swarm1_peer_id).await;
            assert!(swarm2
                .behaviour()
                .peer_store
                .address_of_peer(&swarm1_peer_id)
                .expect("swarm should be connected and record about it should be created")
                .any(|addr| *addr == listen_addr));
            expect_record_update(&mut swarm1, *swarm2.local_peer_id()).await;
            // Drain two further events from swarm1 before it starts
            // listening again; presumably events produced by the identify
            // exchange (TODO confirm which ones).
            swarm1.next_swarm_event().await;
            swarm1.next_swarm_event().await;
            let (new_listen_addr, _) = swarm1.listen().with_memory_addr_external().await;
            swarm1.behaviour_mut().identify.push([swarm2_peer_id]);
            tokio::spawn(swarm1.loop_on_next());
            // Three record updates are awaited on swarm2 after the identify
            // push; NOTE(review): the exact count depends on how many
            // addresses identify reports -- confirm if this turns flaky.
            for _ in 0..3 {
                expect_record_update(&mut swarm2, swarm1_peer_id).await;
            }
            assert!(swarm2
                .behaviour()
                .peer_store
                .address_of_peer(&swarm1_peer_id)
                .expect("peer to exist")
                .any(|addr| *addr == new_listen_addr));
        })
    }

    /// Drive `swarm` on a spawned task until it reports an established
    /// connection, then hand the swarm back through the join handle.
    fn spawn_wait_conn_established<T>(mut swarm: Swarm<T>) -> tokio::task::JoinHandle<Swarm<T>>
    where
        T: NetworkBehaviour + Send + Sync,
        Swarm<T>: SwarmExt,
    {
        tokio::spawn(async move {
            swarm
                .wait(|ev| match ev {
                    SwarmEvent::ConnectionEstablished { .. } => Some(()),
                    _ => None,
                })
                .await;
            swarm
        })
    }
}