1use std::{
22 collections::{hash_map::Entry, HashMap, HashSet, VecDeque},
23 num::NonZeroUsize,
24 sync::Arc,
25 task::{Context, Poll},
26 time::Duration,
27};
28
29use libp2p_core::{
30 multiaddr::{self, Protocol},
31 transport::PortUse,
32 ConnectedPoint, Endpoint, Multiaddr,
33};
34use libp2p_identity::{Keypair, PeerId, PublicKey};
35use libp2p_swarm::{
36 behaviour::{ConnectionClosed, ConnectionEstablished, DialFailure, FromSwarm},
37 ConnectionDenied, ConnectionId, DialError, ExternalAddresses, ListenAddresses,
38 NetworkBehaviour, NotifyHandler, PeerAddresses, StreamUpgradeError, THandler, THandlerInEvent,
39 THandlerOutEvent, ToSwarm, _address_translation,
40};
41
42use crate::{
43 handler::{self, Handler, InEvent},
44 protocol::{Info, UpgradeError},
45};
46
47fn is_quic_addr(addr: &Multiaddr, v1: bool) -> bool {
49 use Protocol::*;
50 let mut iter = addr.iter();
51 let Some(first) = iter.next() else {
52 return false;
53 };
54 let Some(second) = iter.next() else {
55 return false;
56 };
57 let Some(third) = iter.next() else {
58 return false;
59 };
60 let fourth = iter.next();
61 let fifth = iter.next();
62
63 matches!(first, Ip4(_) | Ip6(_) | Dns(_) | Dns4(_) | Dns6(_))
64 && matches!(second, Udp(_))
65 && if v1 {
66 matches!(third, QuicV1)
67 } else {
68 matches!(third, Quic)
69 }
70 && matches!(fourth, Some(P2p(_)) | None)
71 && fifth.is_none()
72}
73
74fn is_tcp_addr(addr: &Multiaddr) -> bool {
75 use Protocol::*;
76
77 let mut iter = addr.iter();
78
79 let Some(first) = iter.next() else {
80 return false;
81 };
82
83 let Some(second) = iter.next() else {
84 return false;
85 };
86
87 matches!(first, Ip4(_) | Ip6(_) | Dns(_) | Dns4(_) | Dns6(_)) && matches!(second, Tcp(_))
88}
89
90pub struct Behaviour {
96 config: Config,
97 connected: HashMap<PeerId, HashMap<ConnectionId, Multiaddr>>,
99
100 our_observed_addresses: HashMap<ConnectionId, Multiaddr>,
102
103 outbound_connections_with_ephemeral_port: HashSet<ConnectionId>,
105
106 events: VecDeque<ToSwarm<Event, InEvent>>,
108 discovered_peers: PeerCache,
110
111 listen_addresses: ListenAddresses,
112 external_addresses: ExternalAddresses,
113}
114
115#[non_exhaustive]
117#[derive(Debug, Clone)]
118pub struct Config {
119 protocol_version: String,
122 local_key: Arc<KeyType>,
126 agent_version: String,
131 interval: Duration,
137
138 push_listen_addr_updates: bool,
147
148 cache_size: usize,
153
154 hide_listen_addrs: bool,
159}
160
161impl Config {
162 pub fn new(protocol_version: String, local_public_key: PublicKey) -> Self {
167 Self::new_with_key(protocol_version, local_public_key)
168 }
169
170 pub fn new_with_signed_peer_record(protocol_version: String, local_keypair: &Keypair) -> Self {
175 Self::new_with_key(protocol_version, local_keypair)
176 }
177
178 fn new_with_key(protocol_version: String, key: impl Into<KeyType>) -> Self {
179 Self {
180 protocol_version,
181 agent_version: format!("rust-libp2p/{}", env!("CARGO_PKG_VERSION")),
182 local_key: Arc::new(key.into()),
183 interval: Duration::from_secs(5 * 60),
184 push_listen_addr_updates: false,
185 cache_size: 100,
186 hide_listen_addrs: false,
187 }
188 }
189
190 pub fn with_agent_version(mut self, v: String) -> Self {
192 self.agent_version = v;
193 self
194 }
195
196 pub fn with_interval(mut self, d: Duration) -> Self {
199 self.interval = d;
200 self
201 }
202
203 pub fn with_push_listen_addr_updates(mut self, b: bool) -> Self {
207 self.push_listen_addr_updates = b;
208 self
209 }
210
211 pub fn with_cache_size(mut self, cache_size: usize) -> Self {
213 self.cache_size = cache_size;
214 self
215 }
216
217 pub fn with_hide_listen_addrs(mut self, b: bool) -> Self {
219 self.hide_listen_addrs = b;
220 self
221 }
222
223 pub fn protocol_version(&self) -> &str {
225 &self.protocol_version
226 }
227
228 pub fn local_public_key(&self) -> &PublicKey {
230 self.local_key.public_key()
231 }
232
233 pub fn agent_version(&self) -> &str {
235 &self.agent_version
236 }
237
238 pub fn interval(&self) -> Duration {
240 self.interval
241 }
242
243 pub fn push_listen_addr_updates(&self) -> bool {
245 self.push_listen_addr_updates
246 }
247
248 pub fn cache_size(&self) -> usize {
250 self.cache_size
251 }
252
253 pub fn hide_listen_addrs(&self) -> bool {
255 self.hide_listen_addrs
256 }
257}
258
259impl Behaviour {
260 pub fn new(config: Config) -> Self {
262 let discovered_peers = match NonZeroUsize::new(config.cache_size) {
263 None => PeerCache::disabled(),
264 Some(size) => PeerCache::enabled(size),
265 };
266
267 Self {
268 config,
269 connected: HashMap::new(),
270 our_observed_addresses: Default::default(),
271 outbound_connections_with_ephemeral_port: Default::default(),
272 events: VecDeque::new(),
273 discovered_peers,
274 listen_addresses: Default::default(),
275 external_addresses: Default::default(),
276 }
277 }
278
279 pub fn push<I>(&mut self, peers: I)
281 where
282 I: IntoIterator<Item = PeerId>,
283 {
284 for p in peers {
285 if !self.connected.contains_key(&p) {
286 tracing::debug!(peer=%p, "Not pushing to peer because we are not connected");
287 continue;
288 }
289
290 self.events.push_back(ToSwarm::NotifyHandler {
291 peer_id: p,
292 handler: NotifyHandler::Any,
293 event: InEvent::Push,
294 });
295 }
296 }
297
298 fn on_connection_established(
299 &mut self,
300 ConnectionEstablished {
301 peer_id,
302 connection_id: conn,
303 endpoint,
304 failed_addresses,
305 ..
306 }: ConnectionEstablished,
307 ) {
308 let addr = match endpoint {
309 ConnectedPoint::Dialer { address, .. } => address.clone(),
310 ConnectedPoint::Listener { send_back_addr, .. } => send_back_addr.clone(),
311 };
312
313 self.connected
314 .entry(peer_id)
315 .or_default()
316 .insert(conn, addr);
317
318 if let Some(cache) = self.discovered_peers.0.as_mut() {
319 for addr in failed_addresses {
320 cache.remove(&peer_id, addr);
321 }
322 }
323 }
324
325 fn all_addresses(&self) -> HashSet<Multiaddr> {
326 let mut addrs = HashSet::from_iter(self.external_addresses.iter().cloned());
327 if !self.config.hide_listen_addrs {
328 addrs.extend(self.listen_addresses.iter().cloned());
329 };
330 addrs
331 }
332
333 fn emit_new_external_addr_candidate_event(
334 &mut self,
335 connection_id: ConnectionId,
336 observed: &Multiaddr,
337 ) {
338 if self
339 .outbound_connections_with_ephemeral_port
340 .contains(&connection_id)
341 {
342 let translated_addresses = {
346 let mut addrs: Vec<_> = self
347 .listen_addresses
348 .iter()
349 .filter_map(|server| {
350 if (is_tcp_addr(server) && is_tcp_addr(observed))
351 || (is_quic_addr(server, true) && is_quic_addr(observed, true))
352 || (is_quic_addr(server, false) && is_quic_addr(observed, false))
353 {
354 _address_translation(server, observed)
355 } else {
356 None
357 }
358 })
359 .collect();
360
361 addrs.sort_unstable();
363 addrs.dedup();
364 addrs
365 };
366
367 if translated_addresses.is_empty() {
369 self.events
370 .push_back(ToSwarm::NewExternalAddrCandidate(observed.clone()));
371 } else {
372 for addr in translated_addresses {
373 self.events
374 .push_back(ToSwarm::NewExternalAddrCandidate(addr));
375 }
376 }
377 return;
378 }
379
380 self.events
383 .push_back(ToSwarm::NewExternalAddrCandidate(observed.clone()));
384 }
385}
386
387impl NetworkBehaviour for Behaviour {
388 type ConnectionHandler = Handler;
389 type ToSwarm = Event;
390
391 fn handle_established_inbound_connection(
392 &mut self,
393 _: ConnectionId,
394 peer: PeerId,
395 _: &Multiaddr,
396 remote_addr: &Multiaddr,
397 ) -> Result<THandler<Self>, ConnectionDenied> {
398 Ok(Handler::new(
399 self.config.interval,
400 peer,
401 self.config.local_key.clone(),
402 self.config.protocol_version.clone(),
403 self.config.agent_version.clone(),
404 remote_addr.clone(),
405 self.all_addresses(),
406 ))
407 }
408
409 fn handle_established_outbound_connection(
410 &mut self,
411 connection_id: ConnectionId,
412 peer: PeerId,
413 addr: &Multiaddr,
414 _: Endpoint,
415 port_use: PortUse,
416 ) -> Result<THandler<Self>, ConnectionDenied> {
417 let mut addr = addr.clone();
422 if matches!(addr.iter().last(), Some(multiaddr::Protocol::P2p(_))) {
423 addr.pop();
424 }
425
426 if port_use == PortUse::New {
427 self.outbound_connections_with_ephemeral_port
428 .insert(connection_id);
429 }
430
431 Ok(Handler::new(
432 self.config.interval,
433 peer,
434 self.config.local_key.clone(),
435 self.config.protocol_version.clone(),
436 self.config.agent_version.clone(),
437 addr.clone(),
440 self.all_addresses(),
441 ))
442 }
443
444 fn on_connection_handler_event(
445 &mut self,
446 peer_id: PeerId,
447 connection_id: ConnectionId,
448 event: THandlerOutEvent<Self>,
449 ) {
450 match event {
451 handler::Event::Identified(mut info) => {
452 info.listen_addrs
454 .retain(|addr| multiaddr_matches_peer_id(addr, &peer_id));
455
456 let observed = info.observed_addr.clone();
457 self.events
458 .push_back(ToSwarm::GenerateEvent(Event::Received {
459 connection_id,
460 peer_id,
461 info: info.clone(),
462 }));
463
464 if let Some(ref mut discovered_peers) = self.discovered_peers.0 {
465 for address in &info.listen_addrs {
466 if discovered_peers.add(peer_id, address.clone()) {
467 self.events.push_back(ToSwarm::NewExternalAddrOfPeer {
468 peer_id,
469 address: address.clone(),
470 });
471 }
472 }
473 }
474
475 match self.our_observed_addresses.entry(connection_id) {
476 Entry::Vacant(not_yet_observed) => {
477 not_yet_observed.insert(observed.clone());
478 self.emit_new_external_addr_candidate_event(connection_id, &observed);
479 }
480 Entry::Occupied(already_observed) if already_observed.get() == &observed => {
481 }
483 Entry::Occupied(mut already_observed) => {
484 tracing::info!(
485 old_address=%already_observed.get(),
486 new_address=%observed,
487 "Our observed address on connection {connection_id} changed",
488 );
489
490 *already_observed.get_mut() = observed.clone();
491 self.emit_new_external_addr_candidate_event(connection_id, &observed);
492 }
493 }
494 }
495 handler::Event::Identification => {
496 self.events.push_back(ToSwarm::GenerateEvent(Event::Sent {
497 connection_id,
498 peer_id,
499 }));
500 }
501 handler::Event::IdentificationPushed(info) => {
502 self.events.push_back(ToSwarm::GenerateEvent(Event::Pushed {
503 connection_id,
504 peer_id,
505 info,
506 }));
507 }
508 handler::Event::IdentificationError(error) => {
509 self.events.push_back(ToSwarm::GenerateEvent(Event::Error {
510 connection_id,
511 peer_id,
512 error,
513 }));
514 }
515 }
516 }
517
518 #[tracing::instrument(level = "trace", name = "NetworkBehaviour::poll", skip(self))]
519 fn poll(&mut self, _: &mut Context<'_>) -> Poll<ToSwarm<Self::ToSwarm, THandlerInEvent<Self>>> {
520 if let Some(event) = self.events.pop_front() {
521 return Poll::Ready(event);
522 }
523
524 Poll::Pending
525 }
526
527 fn handle_pending_outbound_connection(
528 &mut self,
529 _connection_id: ConnectionId,
530 maybe_peer: Option<PeerId>,
531 _addresses: &[Multiaddr],
532 _effective_role: Endpoint,
533 ) -> Result<Vec<Multiaddr>, ConnectionDenied> {
534 let Some(peer) = maybe_peer else {
535 return Ok(vec![]);
536 };
537
538 Ok(self.discovered_peers.get(&peer))
539 }
540
541 fn on_swarm_event(&mut self, event: FromSwarm) {
542 let listen_addr_changed = self.listen_addresses.on_swarm_event(&event);
543 let external_addr_changed = self.external_addresses.on_swarm_event(&event);
544
545 if listen_addr_changed || external_addr_changed {
546 let change_events = self
548 .connected
549 .iter()
550 .flat_map(|(peer, map)| map.keys().map(|id| (*peer, id)))
551 .map(|(peer_id, connection_id)| ToSwarm::NotifyHandler {
552 peer_id,
553 handler: NotifyHandler::One(*connection_id),
554 event: InEvent::AddressesChanged(self.all_addresses()),
555 })
556 .collect::<Vec<_>>();
557
558 self.events.extend(change_events)
559 }
560
561 if listen_addr_changed && self.config.push_listen_addr_updates {
562 let push_events = self.connected.keys().map(|peer| ToSwarm::NotifyHandler {
564 peer_id: *peer,
565 handler: NotifyHandler::Any,
566 event: InEvent::Push,
567 });
568
569 self.events.extend(push_events);
570 }
571
572 match event {
573 FromSwarm::ConnectionEstablished(connection_established) => {
574 self.on_connection_established(connection_established)
575 }
576 FromSwarm::ConnectionClosed(ConnectionClosed {
577 peer_id,
578 connection_id,
579 remaining_established,
580 ..
581 }) => {
582 if remaining_established == 0 {
583 self.connected.remove(&peer_id);
584 } else if let Some(addrs) = self.connected.get_mut(&peer_id) {
585 addrs.remove(&connection_id);
586 }
587
588 self.our_observed_addresses.remove(&connection_id);
589 self.outbound_connections_with_ephemeral_port
590 .remove(&connection_id);
591 }
592 FromSwarm::DialFailure(DialFailure {
593 peer_id: Some(peer_id),
594 error,
595 ..
596 }) => {
597 if let Some(cache) = self.discovered_peers.0.as_mut() {
598 match error {
599 DialError::Transport(errors) => {
600 for (addr, _error) in errors {
601 cache.remove(&peer_id, addr);
602 }
603 }
604 DialError::WrongPeerId { address, .. }
605 | DialError::LocalPeerId { address } => {
606 cache.remove(&peer_id, address);
607 }
608 _ => (),
609 };
610 }
611 }
612 _ => {}
613 }
614 }
615}
616
617#[allow(clippy::large_enum_variant)]
619#[derive(Debug)]
620pub enum Event {
621 Received {
623 connection_id: ConnectionId,
625 peer_id: PeerId,
627 info: Info,
629 },
630 Sent {
633 connection_id: ConnectionId,
635 peer_id: PeerId,
637 },
638 Pushed {
641 connection_id: ConnectionId,
643 peer_id: PeerId,
645 info: Info,
648 },
649 Error {
651 connection_id: ConnectionId,
653 peer_id: PeerId,
655 error: StreamUpgradeError<UpgradeError>,
657 },
658}
659
660impl Event {
661 pub fn connection_id(&self) -> ConnectionId {
662 match self {
663 Event::Received { connection_id, .. }
664 | Event::Sent { connection_id, .. }
665 | Event::Pushed { connection_id, .. }
666 | Event::Error { connection_id, .. } => *connection_id,
667 }
668 }
669}
670
671fn multiaddr_matches_peer_id(addr: &Multiaddr, peer_id: &PeerId) -> bool {
674 let last_component = addr.iter().last();
675 if let Some(multiaddr::Protocol::P2p(multi_addr_peer_id)) = last_component {
676 return multi_addr_peer_id == *peer_id;
677 }
678 true
679}
680
681struct PeerCache(Option<PeerAddresses>);
682
683impl PeerCache {
684 fn disabled() -> Self {
685 Self(None)
686 }
687
688 fn enabled(size: NonZeroUsize) -> Self {
689 Self(Some(PeerAddresses::new(size)))
690 }
691
692 fn get(&mut self, peer: &PeerId) -> Vec<Multiaddr> {
693 if let Some(cache) = self.0.as_mut() {
694 cache.get(peer).collect()
695 } else {
696 Vec::new()
697 }
698 }
699}
700
701#[derive(Debug, Clone)]
702#[allow(clippy::large_enum_variant)]
703pub(crate) enum KeyType {
704 PublicKey(PublicKey),
705 Keypair {
706 keypair: Keypair,
707 public_key: PublicKey,
708 },
709}
710impl From<PublicKey> for KeyType {
711 fn from(value: PublicKey) -> Self {
712 Self::PublicKey(value)
713 }
714}
715impl From<&Keypair> for KeyType {
716 fn from(value: &Keypair) -> Self {
717 Self::Keypair {
718 public_key: value.public(),
719 keypair: value.clone(),
720 }
721 }
722}
723impl KeyType {
724 pub(crate) fn public_key(&self) -> &PublicKey {
725 match &self {
726 KeyType::PublicKey(pubkey) => pubkey,
727 KeyType::Keypair { public_key, .. } => public_key,
728 }
729 }
730}
731
732#[cfg(test)]
733mod tests {
734 use super::*;
735
736 #[test]
737 fn check_multiaddr_matches_peer_id() {
738 let peer_id = PeerId::random();
739 let other_peer_id = PeerId::random();
740 let mut addr: Multiaddr = "/ip4/147.75.69.143/tcp/4001"
741 .parse()
742 .expect("failed to parse multiaddr");
743
744 let addr_without_peer_id: Multiaddr = addr.clone();
745 let mut addr_with_other_peer_id = addr.clone();
746
747 addr.push(multiaddr::Protocol::P2p(peer_id));
748 addr_with_other_peer_id.push(multiaddr::Protocol::P2p(other_peer_id));
749
750 assert!(multiaddr_matches_peer_id(&addr, &peer_id));
751 assert!(!multiaddr_matches_peer_id(
752 &addr_with_other_peer_id,
753 &peer_id
754 ));
755 assert!(multiaddr_matches_peer_id(&addr_without_peer_id, &peer_id));
756 }
757}