1use std::{
22 collections::{hash_map::Entry, HashMap, HashSet, VecDeque},
23 num::NonZeroUsize,
24 sync::Arc,
25 task::{Context, Poll},
26 time::Duration,
27};
28
29use libp2p_core::{
30 multiaddr::{self, Protocol},
31 transport::PortUse,
32 ConnectedPoint, Endpoint, Multiaddr,
33};
34use libp2p_identity::{Keypair, PeerId, PublicKey};
35use libp2p_swarm::{
36 behaviour::{ConnectionClosed, ConnectionEstablished, DialFailure, FromSwarm},
37 ConnectionDenied, ConnectionId, DialError, ExternalAddresses, ListenAddresses,
38 NetworkBehaviour, NotifyHandler, PeerAddresses, StreamUpgradeError, THandler, THandlerInEvent,
39 THandlerOutEvent, ToSwarm, _address_translation,
40};
41
42use crate::{
43 handler::{self, Handler, InEvent},
44 protocol::{Info, UpgradeError},
45};
46
47fn is_quic_addr(addr: &Multiaddr, v1: bool) -> bool {
49 use Protocol::*;
50 let mut iter = addr.iter();
51 let Some(first) = iter.next() else {
52 return false;
53 };
54 let Some(second) = iter.next() else {
55 return false;
56 };
57 let Some(third) = iter.next() else {
58 return false;
59 };
60 let fourth = iter.next();
61 let fifth = iter.next();
62
63 matches!(first, Ip4(_) | Ip6(_) | Dns(_) | Dns4(_) | Dns6(_))
64 && matches!(second, Udp(_))
65 && if v1 {
66 matches!(third, QuicV1)
67 } else {
68 matches!(third, Quic)
69 }
70 && matches!(fourth, Some(P2p(_)) | None)
71 && fifth.is_none()
72}
73
74fn is_tcp_addr(addr: &Multiaddr) -> bool {
75 use Protocol::*;
76
77 let mut iter = addr.iter();
78
79 let first = match iter.next() {
80 None => return false,
81 Some(p) => p,
82 };
83 let second = match iter.next() {
84 None => return false,
85 Some(p) => p,
86 };
87
88 matches!(first, Ip4(_) | Ip6(_) | Dns(_) | Dns4(_) | Dns6(_)) && matches!(second, Tcp(_))
89}
90
/// Network behaviour of the identify protocol.
///
/// It hands a [`Handler`] to every established connection, turns the
/// handlers' results into [`Event`]s and reports addresses that remotes
/// observed for the local node via [`ToSwarm::NewExternalAddrCandidate`].
pub struct Behaviour {
    /// Settings that every new connection handler is created from.
    config: Config,
    /// Remote address of every established connection, grouped by peer.
    connected: HashMap<PeerId, HashMap<ConnectionId, Multiaddr>>,

    /// The most recent address each connection's remote reported observing
    /// the local node at.
    our_observed_addresses: HashMap<ConnectionId, Multiaddr>,

    /// Outbound connections that were dialed with [`PortUse::New`]. Addresses
    /// observed on these connections go through address translation before
    /// being reported as external address candidates.
    outbound_connections_with_ephemeral_port: HashSet<ConnectionId>,

    /// Events queued for the swarm; `poll` hands them out one at a time.
    events: VecDeque<ToSwarm<Event, InEvent>>,
    /// Listen addresses learned from identified peers. The cache is disabled
    /// when the configured cache size is zero.
    discovered_peers: PeerCache,

    /// Listen addresses of the local node, updated from swarm events.
    listen_addresses: ListenAddresses,
    /// External addresses of the local node, updated from swarm events.
    external_addresses: ExternalAddresses,
}
115
/// Configuration of the identify [`Behaviour`].
///
/// Built through [`Config::new`] or [`Config::new_with_signed_peer_record`]
/// and adjusted with the `with_*` builder methods.
#[non_exhaustive]
#[derive(Debug, Clone)]
pub struct Config {
    /// Protocol version string handed to every connection handler.
    protocol_version: String,
    /// Key material of the local node: either only the public key or the
    /// full keypair, depending on which constructor was used.
    local_key: Arc<KeyType>,
    /// Agent version string handed to every connection handler.
    ///
    /// Defaults to `rust-libp2p/<version of this crate>`.
    agent_version: String,
    /// Interval handed to every connection handler.
    /// NOTE(review): presumably the delay between periodic identify requests;
    /// confirm against the handler implementation.
    ///
    /// Defaults to 5 minutes.
    interval: Duration,

    /// Whether a change of the local listen addresses queues an identify push
    /// to every connected peer.
    ///
    /// Defaults to `false`.
    push_listen_addr_updates: bool,

    /// Capacity of the cache of discovered peer addresses. A value of zero
    /// disables the cache.
    ///
    /// Defaults to 100.
    cache_size: usize,

    /// Whether listen addresses are left out of the address set given to the
    /// connection handlers, so that only external addresses remain.
    ///
    /// Defaults to `false`.
    hide_listen_addrs: bool,
}
161
162impl Config {
163 pub fn new(protocol_version: String, local_public_key: PublicKey) -> Self {
168 Self::new_with_key(protocol_version, local_public_key)
169 }
170
171 pub fn new_with_signed_peer_record(protocol_version: String, local_keypair: &Keypair) -> Self {
176 Self::new_with_key(protocol_version, local_keypair)
177 }
178
179 fn new_with_key(protocol_version: String, key: impl Into<KeyType>) -> Self {
180 Self {
181 protocol_version,
182 agent_version: format!("rust-libp2p/{}", env!("CARGO_PKG_VERSION")),
183 local_key: Arc::new(key.into()),
184 interval: Duration::from_secs(5 * 60),
185 push_listen_addr_updates: false,
186 cache_size: 100,
187 hide_listen_addrs: false,
188 }
189 }
190
191 pub fn with_agent_version(mut self, v: String) -> Self {
193 self.agent_version = v;
194 self
195 }
196
197 pub fn with_interval(mut self, d: Duration) -> Self {
200 self.interval = d;
201 self
202 }
203
204 pub fn with_push_listen_addr_updates(mut self, b: bool) -> Self {
208 self.push_listen_addr_updates = b;
209 self
210 }
211
212 pub fn with_cache_size(mut self, cache_size: usize) -> Self {
214 self.cache_size = cache_size;
215 self
216 }
217
218 pub fn with_hide_listen_addrs(mut self, b: bool) -> Self {
220 self.hide_listen_addrs = b;
221 self
222 }
223
224 pub fn protocol_version(&self) -> &str {
226 &self.protocol_version
227 }
228
229 pub fn local_public_key(&self) -> &PublicKey {
231 self.local_key.public_key()
232 }
233
234 pub fn agent_version(&self) -> &str {
236 &self.agent_version
237 }
238
239 pub fn interval(&self) -> Duration {
241 self.interval
242 }
243
244 pub fn push_listen_addr_updates(&self) -> bool {
246 self.push_listen_addr_updates
247 }
248
249 pub fn cache_size(&self) -> usize {
251 self.cache_size
252 }
253
254 pub fn hide_listen_addrs(&self) -> bool {
256 self.hide_listen_addrs
257 }
258}
259
260impl Behaviour {
261 pub fn new(config: Config) -> Self {
263 let discovered_peers = match NonZeroUsize::new(config.cache_size) {
264 None => PeerCache::disabled(),
265 Some(size) => PeerCache::enabled(size),
266 };
267
268 Self {
269 config,
270 connected: HashMap::new(),
271 our_observed_addresses: Default::default(),
272 outbound_connections_with_ephemeral_port: Default::default(),
273 events: VecDeque::new(),
274 discovered_peers,
275 listen_addresses: Default::default(),
276 external_addresses: Default::default(),
277 }
278 }
279
280 pub fn push<I>(&mut self, peers: I)
282 where
283 I: IntoIterator<Item = PeerId>,
284 {
285 for p in peers {
286 if !self.connected.contains_key(&p) {
287 tracing::debug!(peer=%p, "Not pushing to peer because we are not connected");
288 continue;
289 }
290
291 self.events.push_back(ToSwarm::NotifyHandler {
292 peer_id: p,
293 handler: NotifyHandler::Any,
294 event: InEvent::Push,
295 });
296 }
297 }
298
299 fn on_connection_established(
300 &mut self,
301 ConnectionEstablished {
302 peer_id,
303 connection_id: conn,
304 endpoint,
305 failed_addresses,
306 ..
307 }: ConnectionEstablished,
308 ) {
309 let addr = match endpoint {
310 ConnectedPoint::Dialer { address, .. } => address.clone(),
311 ConnectedPoint::Listener { send_back_addr, .. } => send_back_addr.clone(),
312 };
313
314 self.connected
315 .entry(peer_id)
316 .or_default()
317 .insert(conn, addr);
318
319 if let Some(cache) = self.discovered_peers.0.as_mut() {
320 for addr in failed_addresses {
321 cache.remove(&peer_id, addr);
322 }
323 }
324 }
325
326 fn all_addresses(&self) -> HashSet<Multiaddr> {
327 let mut addrs = HashSet::from_iter(self.external_addresses.iter().cloned());
328 if !self.config.hide_listen_addrs {
329 addrs.extend(self.listen_addresses.iter().cloned());
330 };
331 addrs
332 }
333
334 fn emit_new_external_addr_candidate_event(
335 &mut self,
336 connection_id: ConnectionId,
337 observed: &Multiaddr,
338 ) {
339 if self
340 .outbound_connections_with_ephemeral_port
341 .contains(&connection_id)
342 {
343 let translated_addresses = {
347 let mut addrs: Vec<_> = self
348 .listen_addresses
349 .iter()
350 .filter_map(|server| {
351 if (is_tcp_addr(server) && is_tcp_addr(observed))
352 || (is_quic_addr(server, true) && is_quic_addr(observed, true))
353 || (is_quic_addr(server, false) && is_quic_addr(observed, false))
354 {
355 _address_translation(server, observed)
356 } else {
357 None
358 }
359 })
360 .collect();
361
362 addrs.sort_unstable();
364 addrs.dedup();
365 addrs
366 };
367
368 if translated_addresses.is_empty() {
370 self.events
371 .push_back(ToSwarm::NewExternalAddrCandidate(observed.clone()));
372 } else {
373 for addr in translated_addresses {
374 self.events
375 .push_back(ToSwarm::NewExternalAddrCandidate(addr));
376 }
377 }
378 return;
379 }
380
381 self.events
384 .push_back(ToSwarm::NewExternalAddrCandidate(observed.clone()));
385 }
386}
387
388impl NetworkBehaviour for Behaviour {
389 type ConnectionHandler = Handler;
390 type ToSwarm = Event;
391
392 fn handle_established_inbound_connection(
393 &mut self,
394 _: ConnectionId,
395 peer: PeerId,
396 _: &Multiaddr,
397 remote_addr: &Multiaddr,
398 ) -> Result<THandler<Self>, ConnectionDenied> {
399 Ok(Handler::new(
400 self.config.interval,
401 peer,
402 self.config.local_key.clone(),
403 self.config.protocol_version.clone(),
404 self.config.agent_version.clone(),
405 remote_addr.clone(),
406 self.all_addresses(),
407 ))
408 }
409
410 fn handle_established_outbound_connection(
411 &mut self,
412 connection_id: ConnectionId,
413 peer: PeerId,
414 addr: &Multiaddr,
415 _: Endpoint,
416 port_use: PortUse,
417 ) -> Result<THandler<Self>, ConnectionDenied> {
418 let mut addr = addr.clone();
423 if matches!(addr.iter().last(), Some(multiaddr::Protocol::P2p(_))) {
424 addr.pop();
425 }
426
427 if port_use == PortUse::New {
428 self.outbound_connections_with_ephemeral_port
429 .insert(connection_id);
430 }
431
432 Ok(Handler::new(
433 self.config.interval,
434 peer,
435 self.config.local_key.clone(),
436 self.config.protocol_version.clone(),
437 self.config.agent_version.clone(),
438 addr.clone(),
441 self.all_addresses(),
442 ))
443 }
444
445 fn on_connection_handler_event(
446 &mut self,
447 peer_id: PeerId,
448 connection_id: ConnectionId,
449 event: THandlerOutEvent<Self>,
450 ) {
451 match event {
452 handler::Event::Identified(mut info) => {
453 info.listen_addrs
455 .retain(|addr| multiaddr_matches_peer_id(addr, &peer_id));
456
457 let observed = info.observed_addr.clone();
458 self.events
459 .push_back(ToSwarm::GenerateEvent(Event::Received {
460 connection_id,
461 peer_id,
462 info: info.clone(),
463 }));
464
465 if let Some(ref mut discovered_peers) = self.discovered_peers.0 {
466 for address in &info.listen_addrs {
467 if discovered_peers.add(peer_id, address.clone()) {
468 self.events.push_back(ToSwarm::NewExternalAddrOfPeer {
469 peer_id,
470 address: address.clone(),
471 });
472 }
473 }
474 }
475
476 match self.our_observed_addresses.entry(connection_id) {
477 Entry::Vacant(not_yet_observed) => {
478 not_yet_observed.insert(observed.clone());
479 self.emit_new_external_addr_candidate_event(connection_id, &observed);
480 }
481 Entry::Occupied(already_observed) if already_observed.get() == &observed => {
482 }
484 Entry::Occupied(mut already_observed) => {
485 tracing::info!(
486 old_address=%already_observed.get(),
487 new_address=%observed,
488 "Our observed address on connection {connection_id} changed",
489 );
490
491 *already_observed.get_mut() = observed.clone();
492 self.emit_new_external_addr_candidate_event(connection_id, &observed);
493 }
494 }
495 }
496 handler::Event::Identification => {
497 self.events.push_back(ToSwarm::GenerateEvent(Event::Sent {
498 connection_id,
499 peer_id,
500 }));
501 }
502 handler::Event::IdentificationPushed(info) => {
503 self.events.push_back(ToSwarm::GenerateEvent(Event::Pushed {
504 connection_id,
505 peer_id,
506 info,
507 }));
508 }
509 handler::Event::IdentificationError(error) => {
510 self.events.push_back(ToSwarm::GenerateEvent(Event::Error {
511 connection_id,
512 peer_id,
513 error,
514 }));
515 }
516 }
517 }
518
519 #[tracing::instrument(level = "trace", name = "NetworkBehaviour::poll", skip(self))]
520 fn poll(&mut self, _: &mut Context<'_>) -> Poll<ToSwarm<Self::ToSwarm, THandlerInEvent<Self>>> {
521 if let Some(event) = self.events.pop_front() {
522 return Poll::Ready(event);
523 }
524
525 Poll::Pending
526 }
527
528 fn handle_pending_outbound_connection(
529 &mut self,
530 _connection_id: ConnectionId,
531 maybe_peer: Option<PeerId>,
532 _addresses: &[Multiaddr],
533 _effective_role: Endpoint,
534 ) -> Result<Vec<Multiaddr>, ConnectionDenied> {
535 let peer = match maybe_peer {
536 None => return Ok(vec![]),
537 Some(peer) => peer,
538 };
539
540 Ok(self.discovered_peers.get(&peer))
541 }
542
543 fn on_swarm_event(&mut self, event: FromSwarm) {
544 let listen_addr_changed = self.listen_addresses.on_swarm_event(&event);
545 let external_addr_changed = self.external_addresses.on_swarm_event(&event);
546
547 if listen_addr_changed || external_addr_changed {
548 let change_events = self
550 .connected
551 .iter()
552 .flat_map(|(peer, map)| map.keys().map(|id| (*peer, id)))
553 .map(|(peer_id, connection_id)| ToSwarm::NotifyHandler {
554 peer_id,
555 handler: NotifyHandler::One(*connection_id),
556 event: InEvent::AddressesChanged(self.all_addresses()),
557 })
558 .collect::<Vec<_>>();
559
560 self.events.extend(change_events)
561 }
562
563 if listen_addr_changed && self.config.push_listen_addr_updates {
564 let push_events = self.connected.keys().map(|peer| ToSwarm::NotifyHandler {
566 peer_id: *peer,
567 handler: NotifyHandler::Any,
568 event: InEvent::Push,
569 });
570
571 self.events.extend(push_events);
572 }
573
574 match event {
575 FromSwarm::ConnectionEstablished(connection_established) => {
576 self.on_connection_established(connection_established)
577 }
578 FromSwarm::ConnectionClosed(ConnectionClosed {
579 peer_id,
580 connection_id,
581 remaining_established,
582 ..
583 }) => {
584 if remaining_established == 0 {
585 self.connected.remove(&peer_id);
586 } else if let Some(addrs) = self.connected.get_mut(&peer_id) {
587 addrs.remove(&connection_id);
588 }
589
590 self.our_observed_addresses.remove(&connection_id);
591 self.outbound_connections_with_ephemeral_port
592 .remove(&connection_id);
593 }
594 FromSwarm::DialFailure(DialFailure {
595 peer_id: Some(peer_id),
596 error,
597 ..
598 }) => {
599 if let Some(cache) = self.discovered_peers.0.as_mut() {
600 match error {
601 DialError::Transport(errors) => {
602 for (addr, _error) in errors {
603 cache.remove(&peer_id, addr);
604 }
605 }
606 DialError::WrongPeerId { address, .. }
607 | DialError::LocalPeerId { address } => {
608 cache.remove(&peer_id, address);
609 }
610 _ => (),
611 };
612 }
613 }
614 _ => {}
615 }
616 }
617}
618
/// Event emitted by the identify [`Behaviour`].
///
/// Every variant carries the connection and peer the event relates to.
#[allow(clippy::large_enum_variant)]
#[derive(Debug)]
pub enum Event {
    /// Emitted when a connection handler reports `handler::Event::Identified`,
    /// i.e. identify info arrived from the remote.
    Received {
        /// Connection the info was received on.
        connection_id: ConnectionId,
        /// Peer the info was received from.
        peer_id: PeerId,
        /// The received info. Listen addresses whose trailing `/p2p` component
        /// names a different peer have already been removed.
        info: Info,
    },
    /// Emitted when a connection handler reports
    /// `handler::Event::Identification`.
    /// NOTE(review): presumably the local info was sent in response to a
    /// remote request; confirm against the handler.
    Sent {
        /// Connection the handler event came from.
        connection_id: ConnectionId,
        /// Peer on the other end of that connection.
        peer_id: PeerId,
    },
    /// Emitted when a connection handler reports
    /// `handler::Event::IdentificationPushed`.
    Pushed {
        /// Connection the handler event came from.
        connection_id: ConnectionId,
        /// Peer on the other end of that connection.
        peer_id: PeerId,
        /// The info the handler attached to its pushed event.
        info: Info,
    },
    /// Emitted when a connection handler reports
    /// `handler::Event::IdentificationError`.
    Error {
        /// Connection the error occurred on.
        connection_id: ConnectionId,
        /// Peer on the other end of that connection.
        peer_id: PeerId,
        /// The error reported by the handler.
        error: StreamUpgradeError<UpgradeError>,
    },
}
661
662impl Event {
663 pub fn connection_id(&self) -> ConnectionId {
664 match self {
665 Event::Received { connection_id, .. }
666 | Event::Sent { connection_id, .. }
667 | Event::Pushed { connection_id, .. }
668 | Event::Error { connection_id, .. } => *connection_id,
669 }
670 }
671}
672
673fn multiaddr_matches_peer_id(addr: &Multiaddr, peer_id: &PeerId) -> bool {
676 let last_component = addr.iter().last();
677 if let Some(multiaddr::Protocol::P2p(multi_addr_peer_id)) = last_component {
678 return multi_addr_peer_id == *peer_id;
679 }
680 true
681}
682
683struct PeerCache(Option<PeerAddresses>);
684
685impl PeerCache {
686 fn disabled() -> Self {
687 Self(None)
688 }
689
690 fn enabled(size: NonZeroUsize) -> Self {
691 Self(Some(PeerAddresses::new(size)))
692 }
693
694 fn get(&mut self, peer: &PeerId) -> Vec<Multiaddr> {
695 if let Some(cache) = self.0.as_mut() {
696 cache.get(peer).collect()
697 } else {
698 Vec::new()
699 }
700 }
701}
702
/// Key material the identify configuration holds for the local node: either
/// only the public key, or the full keypair together with its public key.
#[derive(Debug, Clone)]
#[allow(clippy::large_enum_variant)]
pub(crate) enum KeyType {
    /// Only the public key is known.
    PublicKey(PublicKey),
    /// The full keypair is known.
    Keypair {
        /// The keypair itself.
        keypair: Keypair,
        /// Public key of `keypair`, stored so it can be handed out by
        /// reference.
        public_key: PublicKey,
    },
}
712impl From<PublicKey> for KeyType {
713 fn from(value: PublicKey) -> Self {
714 Self::PublicKey(value)
715 }
716}
717impl From<&Keypair> for KeyType {
718 fn from(value: &Keypair) -> Self {
719 Self::Keypair {
720 public_key: value.public(),
721 keypair: value.clone(),
722 }
723 }
724}
725impl KeyType {
726 pub(crate) fn public_key(&self) -> &PublicKey {
727 match &self {
728 KeyType::PublicKey(pubkey) => pubkey,
729 KeyType::Keypair { public_key, .. } => public_key,
730 }
731 }
732}
733
#[cfg(test)]
mod tests {
    use super::*;

    /// An address passes when it ends in the expected peer id or carries no
    /// peer id at all, and fails when it ends in a different peer id.
    #[test]
    fn check_multiaddr_matches_peer_id() {
        let expected = PeerId::random();
        let stranger = PeerId::random();

        let bare: Multiaddr = "/ip4/147.75.69.143/tcp/4001"
            .parse()
            .expect("failed to parse multiaddr");

        let mut with_expected = bare.clone();
        with_expected.push(multiaddr::Protocol::P2p(expected));

        let mut with_stranger = bare.clone();
        with_stranger.push(multiaddr::Protocol::P2p(stranger));

        assert!(multiaddr_matches_peer_id(&with_expected, &expected));
        assert!(!multiaddr_matches_peer_id(&with_stranger, &expected));
        assert!(multiaddr_matches_peer_id(&bare, &expected));
    }
}