libp2p_swarm/behaviour/
peer_addresses.rs

1use std::num::NonZeroUsize;
2
3use libp2p_core::Multiaddr;
4use libp2p_identity::PeerId;
5use lru::LruCache;
6
7use crate::{behaviour::FromSwarm, DialError, DialFailure, NewExternalAddrOfPeer};
8
9/// Struct for tracking peers' external addresses of the [`Swarm`](crate::Swarm).
10#[derive(Debug)]
11pub struct PeerAddresses(LruCache<PeerId, LruCache<Multiaddr, ()>>);
12
13impl PeerAddresses {
14    /// Creates a [`PeerAddresses`] cache with capacity for the given number of peers.
15    ///
16    /// For each peer, we will at most store 10 addresses.
17    pub fn new(number_of_peers: NonZeroUsize) -> Self {
18        Self(LruCache::new(number_of_peers))
19    }
20
21    /// Feed a [`FromSwarm`] event to this struct.
22    ///
23    /// Returns whether the event changed peer's known external addresses.
24    pub fn on_swarm_event(&mut self, event: &FromSwarm) -> bool {
25        match event {
26            FromSwarm::NewExternalAddrOfPeer(NewExternalAddrOfPeer { peer_id, addr }) => {
27                self.add(*peer_id, (*addr).clone())
28            }
29            FromSwarm::DialFailure(DialFailure {
30                peer_id: Some(peer_id),
31                error: DialError::Transport(errors),
32                ..
33            }) => {
34                for (addr, _error) in errors {
35                    self.remove(peer_id, addr);
36                }
37                true
38            }
39            _ => false,
40        }
41    }
42
43    /// Adds address to cache.
44    /// Appends address to the existing set if peer addresses already exist.
45    /// Creates a new cache entry for peer_id if no addresses are present.
46    /// Returns true if the newly added address was not previously in the cache.
47    pub fn add(&mut self, peer: PeerId, address: Multiaddr) -> bool {
48        match prepare_addr(&peer, &address) {
49            Ok(address) => {
50                if let Some(cached) = self.0.get_mut(&peer) {
51                    cached.put(address, ()).is_none()
52                } else {
53                    let mut set = LruCache::new(NonZeroUsize::new(10).expect("10 > 0"));
54                    set.put(address, ());
55                    self.0.put(peer, set);
56
57                    true
58                }
59            }
60            Err(_) => false,
61        }
62    }
63
64    /// Returns peer's external addresses.
65    pub fn get(&mut self, peer: &PeerId) -> impl Iterator<Item = Multiaddr> + '_ {
66        self.0
67            .get(peer)
68            .into_iter()
69            .flat_map(|c| c.iter().map(|(m, ())| m))
70            .cloned()
71    }
72
73    /// Removes address from peer addresses cache.
74    /// Returns true if the address was removed.
75    pub fn remove(&mut self, peer: &PeerId, address: &Multiaddr) -> bool {
76        match self.0.get_mut(peer) {
77            Some(addrs) => match prepare_addr(peer, address) {
78                Ok(address) => addrs.pop(&address).is_some(),
79                Err(_) => false,
80            },
81            None => false,
82        }
83    }
84}
85
86fn prepare_addr(peer: &PeerId, addr: &Multiaddr) -> Result<Multiaddr, Multiaddr> {
87    addr.clone().with_p2p(*peer)
88}
89
90impl Default for PeerAddresses {
91    fn default() -> Self {
92        Self(LruCache::new(NonZeroUsize::new(100).unwrap()))
93    }
94}
95
#[cfg(test)]
mod tests {
    use std::{io, sync::LazyLock};

    use libp2p_core::{
        multiaddr::Protocol,
        transport::{memory::MemoryTransportError, TransportError},
    };

    use super::*;
    use crate::ConnectionId;

    #[test]
    fn new_peer_addr_returns_correct_changed_value() {
        let mut cache = PeerAddresses::default();
        let peer_id = PeerId::random();

        let event = new_external_addr_of_peer1(peer_id);

        // First sighting of the address is a change, the second one is not.
        let changed = cache.on_swarm_event(&event);
        assert!(changed);

        let changed = cache.on_swarm_event(&event);
        assert!(!changed);
    }

    #[test]
    fn new_peer_addr_saves_peer_addrs() {
        let mut cache = PeerAddresses::default();
        let peer_id = PeerId::random();
        let event = new_external_addr_of_peer1(peer_id);

        let changed = cache.on_swarm_event(&event);
        assert!(changed);

        let addr1 = MEMORY_ADDR_1000.clone().with_p2p(peer_id).unwrap();
        let cached = cache.get(&peer_id).collect::<Vec<Multiaddr>>();
        assert_eq!(cached, vec![addr1]);

        let event = new_external_addr_of_peer2(peer_id);
        let changed = cache.on_swarm_event(&event);

        let addr1 = MEMORY_ADDR_1000.clone().with_p2p(peer_id).unwrap();
        let addr2 = MEMORY_ADDR_2000.clone().with_p2p(peer_id).unwrap();

        let cached = cache.get(&peer_id).collect::<Vec<Multiaddr>>();
        assert!(cached.contains(&addr1));
        assert!(cached.contains(&addr2));
        assert_eq!(cached.len(), 2);

        assert!(changed);
    }

    #[test]
    fn existing_addr_is_not_added_to_cache() {
        let mut cache = PeerAddresses::default();
        let peer_id = PeerId::random();

        let event = new_external_addr_of_peer1(peer_id);

        let addr1 = MEMORY_ADDR_1000.clone().with_p2p(peer_id).unwrap();
        let changed = cache.on_swarm_event(&event);
        let cached = cache.get(&peer_id).collect::<Vec<Multiaddr>>();
        assert!(changed);
        assert_eq!(cached, vec![addr1]);

        let addr1 = MEMORY_ADDR_1000.clone().with_p2p(peer_id).unwrap();
        let changed = cache.on_swarm_event(&event);
        let cached = cache.get(&peer_id).collect::<Vec<Multiaddr>>();
        assert!(!changed);
        assert_eq!(cached, [addr1]);
    }

    #[test]
    fn addresses_of_peer_are_removed_when_received_dial_failure() {
        let mut cache = PeerAddresses::default();
        let peer_id = PeerId::random();

        let addr: Multiaddr = "/ip4/127.0.0.1/tcp/8080".parse().unwrap();
        let addr2: Multiaddr = "/ip4/127.0.0.1/tcp/8081".parse().unwrap();
        let addr3: Multiaddr = "/ip4/127.0.0.1/tcp/8082".parse().unwrap();

        cache.add(peer_id, addr.clone());
        cache.add(peer_id, addr2.clone());
        cache.add(peer_id, addr3.clone());

        let error = DialError::Transport(prepare_errors(vec![addr, addr3]));

        let event = FromSwarm::DialFailure(DialFailure {
            peer_id: Some(peer_id),
            error: &error,
            connection_id: ConnectionId::new_unchecked(8),
        });

        let changed = cache.on_swarm_event(&event);

        assert!(changed);

        let cached = cache.get(&peer_id).collect::<Vec<Multiaddr>>();
        let expected = prepare_expected_addrs(peer_id, [addr2].into_iter());

        assert_eq!(cached, expected);
    }

    #[test]
    fn remove_removes_address_if_present() {
        let mut cache = PeerAddresses::default();
        let peer_id = PeerId::random();
        let addr: Multiaddr = "/ip4/127.0.0.1/tcp/8080".parse().unwrap();

        cache.add(peer_id, addr.clone());

        assert!(cache.remove(&peer_id, &addr));
    }

    #[test]
    fn remove_returns_false_if_address_not_present() {
        let mut cache = PeerAddresses::default();
        let peer_id = PeerId::random();
        let known: Multiaddr = "/ip4/127.0.0.1/tcp/8080".parse().unwrap();
        let unknown: Multiaddr = "/ip4/127.0.0.1/tcp/8081".parse().unwrap();

        // The peer is known, but only under a different address.
        cache.add(peer_id, known.clone());

        assert!(!cache.remove(&peer_id, &unknown));

        // The address that was never targeted must still be cached.
        let cached = cache.get(&peer_id).collect::<Vec<Multiaddr>>();
        let expected = prepare_expected_addrs(peer_id, [known].into_iter());
        assert_eq!(cached, expected);
    }

    #[test]
    fn remove_returns_false_if_peer_not_present() {
        let mut cache = PeerAddresses::default();
        let peer_id = PeerId::random();
        let addr: Multiaddr = "/ip4/127.0.0.1/tcp/8080".parse().unwrap();

        assert!(!cache.remove(&peer_id, &addr));
    }

    #[test]
    fn remove_removes_address_provided_in_param() {
        let mut cache = PeerAddresses::default();
        let peer_id = PeerId::random();
        let addr: Multiaddr = "/ip4/127.0.0.1/tcp/8080".parse().unwrap();
        let addr2: Multiaddr = "/ip4/127.0.0.1/tcp/8081".parse().unwrap();
        let addr3: Multiaddr = "/ip4/127.0.0.1/tcp/8082".parse().unwrap();

        cache.add(peer_id, addr.clone());
        cache.add(peer_id, addr2.clone());
        cache.add(peer_id, addr3.clone());

        assert!(cache.remove(&peer_id, &addr2));

        let mut cached = cache.get(&peer_id).collect::<Vec<Multiaddr>>();
        cached.sort();

        let expected = prepare_expected_addrs(peer_id, [addr, addr3].into_iter());

        assert_eq!(cached, expected);
    }

    #[test]
    fn add_adds_new_address_to_cache() {
        let mut cache = PeerAddresses::default();
        let peer_id = PeerId::random();
        let addr: Multiaddr = "/ip4/127.0.0.1/tcp/8080".parse().unwrap();

        assert!(cache.add(peer_id, addr.clone()));

        let mut cached = cache.get(&peer_id).collect::<Vec<Multiaddr>>();
        cached.sort();
        let expected = prepare_expected_addrs(peer_id, [addr].into_iter());

        assert_eq!(cached, expected);
    }

    #[test]
    fn add_adds_address_to_cache_to_existing_key() {
        let mut cache = PeerAddresses::default();
        let peer_id = PeerId::random();
        let addr: Multiaddr = "/ip4/127.0.0.1/tcp/8080".parse().unwrap();
        let addr2: Multiaddr = "/ip4/127.0.0.1/tcp/8081".parse().unwrap();
        let addr3: Multiaddr = "/ip4/127.0.0.1/tcp/8082".parse().unwrap();

        assert!(cache.add(peer_id, addr.clone()));

        cache.add(peer_id, addr2.clone());
        cache.add(peer_id, addr3.clone());

        let expected = prepare_expected_addrs(peer_id, [addr, addr2, addr3].into_iter());

        let mut cached = cache.get(&peer_id).collect::<Vec<Multiaddr>>();
        cached.sort();

        assert_eq!(cached, expected);
    }

    /// Appends `peer_id` to every address (dropping those for which that fails) and
    /// returns the result sorted, ready to be compared against sorted cache content.
    fn prepare_expected_addrs(
        peer_id: PeerId,
        addrs: impl Iterator<Item = Multiaddr>,
    ) -> Vec<Multiaddr> {
        let mut addrs = addrs
            .filter_map(|a| a.with_p2p(peer_id).ok())
            .collect::<Vec<Multiaddr>>();
        addrs.sort();
        addrs
    }

    /// Event announcing [`MEMORY_ADDR_1000`] as an external address of `peer_id`.
    fn new_external_addr_of_peer1(peer_id: PeerId) -> FromSwarm<'static> {
        FromSwarm::NewExternalAddrOfPeer(NewExternalAddrOfPeer {
            peer_id,
            addr: &MEMORY_ADDR_1000,
        })
    }

    /// Event announcing [`MEMORY_ADDR_2000`] as an external address of `peer_id`.
    fn new_external_addr_of_peer2(peer_id: PeerId) -> FromSwarm<'static> {
        FromSwarm::NewExternalAddrOfPeer(NewExternalAddrOfPeer {
            peer_id,
            addr: &MEMORY_ADDR_2000,
        })
    }

    /// Pairs every address with an "unreachable" transport error, in the shape carried by
    /// [`DialError::Transport`].
    fn prepare_errors(addrs: Vec<Multiaddr>) -> Vec<(Multiaddr, TransportError<io::Error>)> {
        addrs
            .into_iter()
            .map(|addr| {
                (
                    addr,
                    TransportError::Other(io::Error::other(MemoryTransportError::Unreachable)),
                )
            })
            .collect()
    }

    static MEMORY_ADDR_1000: LazyLock<Multiaddr> =
        LazyLock::new(|| Multiaddr::empty().with(Protocol::Memory(1000)));
    static MEMORY_ADDR_2000: LazyLock<Multiaddr> =
        LazyLock::new(|| Multiaddr::empty().with(Protocol::Memory(2000)));
}