1use std::{
22 cmp::{
23 max,
24 Ordering::{self, Equal},
25 },
26 collections::{BTreeSet, HashMap, HashSet, VecDeque},
27 fmt::{self, Debug},
28 net::IpAddr,
29 task::{Context, Poll},
30 time::Duration,
31};
32
33use futures::FutureExt;
34use futures_timer::Delay;
35use hashlink::LinkedHashMap;
36use libp2p_core::{
37 multiaddr::Protocol::{Ip4, Ip6},
38 transport::PortUse,
39 Endpoint, Multiaddr,
40};
41use libp2p_identity::{Keypair, PeerId};
42use libp2p_swarm::{
43 behaviour::{AddressChange, ConnectionClosed, ConnectionEstablished, FromSwarm},
44 dial_opts::DialOpts,
45 ConnectionDenied, ConnectionId, NetworkBehaviour, NotifyHandler, THandler, THandlerInEvent,
46 THandlerOutEvent, ToSwarm,
47};
48#[cfg(feature = "metrics")]
49use prometheus_client::registry::Registry;
50use quick_protobuf::{MessageWrite, Writer};
51use rand::{seq::SliceRandom, thread_rng};
52use web_time::{Instant, SystemTime};
53
54#[cfg(feature = "metrics")]
55use crate::metrics::{Churn, Config as MetricsConfig, Inclusion, Metrics, Penalty};
56use crate::{
57 backoff::BackoffStorage,
58 config::{Config, ValidationMode},
59 gossip_promises::GossipPromises,
60 handler::{Handler, HandlerEvent, HandlerIn},
61 mcache::MessageCache,
62 peer_score::{PeerScore, PeerScoreParams, PeerScoreState, PeerScoreThresholds, RejectReason},
63 protocol::SIGNING_PREFIX,
64 rpc::Sender,
65 rpc_proto::proto,
66 subscription_filter::{AllowAllSubscriptionFilter, TopicSubscriptionFilter},
67 time_cache::DuplicateCache,
68 topic::{Hasher, Topic, TopicHash},
69 transform::{DataTransform, IdentityTransform},
70 types::{
71 ControlAction, Graft, IDontWant, IHave, IWant, Message, MessageAcceptance, MessageId,
72 PeerDetails, PeerInfo, PeerKind, Prune, RawMessage, RpcOut, Subscription,
73 SubscriptionAction,
74 },
75 FailedMessages, PublishError, SubscriptionError, TopicScoreParams, ValidationError,
76};
77
78#[cfg(test)]
79mod tests;
80
81const IDONTWANT_CAP: usize = 10_000;
83
84const IDONTWANT_TIMEOUT: Duration = Duration::new(3, 0);
86
87#[derive(Clone)]
94pub enum MessageAuthenticity {
95 Signed(Keypair),
98 Author(PeerId),
103 RandomAuthor,
108 Anonymous,
118}
119
120impl MessageAuthenticity {
121 pub fn is_signing(&self) -> bool {
123 matches!(self, MessageAuthenticity::Signed(_))
124 }
125
126 pub fn is_anonymous(&self) -> bool {
127 matches!(self, MessageAuthenticity::Anonymous)
128 }
129}
130
131#[derive(Debug)]
133pub enum Event {
134 Message {
136 propagation_source: PeerId,
138 message_id: MessageId,
141 message: Message,
143 },
144 Subscribed {
146 peer_id: PeerId,
148 topic: TopicHash,
150 },
151 Unsubscribed {
153 peer_id: PeerId,
155 topic: TopicHash,
157 },
158 GossipsubNotSupported { peer_id: PeerId },
160 SlowPeer {
162 peer_id: PeerId,
164 failed_messages: FailedMessages,
166 },
167}
168
169#[allow(clippy::large_enum_variant)]
172enum PublishConfig {
173 Signing {
174 keypair: Keypair,
175 author: PeerId,
176 inline_key: Option<Vec<u8>>,
177 last_seq_no: SequenceNumber,
178 },
179 Author(PeerId),
180 RandomAuthor,
181 Anonymous,
182}
183
184#[derive(Debug)]
188struct SequenceNumber(u64);
189
190impl SequenceNumber {
191 fn new() -> Self {
192 let unix_timestamp = SystemTime::now()
193 .duration_since(SystemTime::UNIX_EPOCH)
194 .expect("time to be linear")
195 .as_nanos();
196
197 Self(unix_timestamp as u64)
198 }
199
200 fn next(&mut self) -> u64 {
201 self.0 = self
202 .0
203 .checked_add(1)
204 .expect("to not exhaust u64 space for sequence numbers");
205
206 self.0
207 }
208}
209
210impl PublishConfig {
211 pub(crate) fn get_own_id(&self) -> Option<&PeerId> {
212 match self {
213 Self::Signing { author, .. } => Some(author),
214 Self::Author(author) => Some(author),
215 _ => None,
216 }
217 }
218}
219
220impl From<MessageAuthenticity> for PublishConfig {
221 fn from(authenticity: MessageAuthenticity) -> Self {
222 match authenticity {
223 MessageAuthenticity::Signed(keypair) => {
224 let public_key = keypair.public();
225 let key_enc = public_key.encode_protobuf();
226 let key = if key_enc.len() <= 42 {
227 None
231 } else {
232 Some(key_enc)
234 };
235
236 PublishConfig::Signing {
237 keypair,
238 author: public_key.to_peer_id(),
239 inline_key: key,
240 last_seq_no: SequenceNumber::new(),
241 }
242 }
243 MessageAuthenticity::Author(peer_id) => PublishConfig::Author(peer_id),
244 MessageAuthenticity::RandomAuthor => PublishConfig::RandomAuthor,
245 MessageAuthenticity::Anonymous => PublishConfig::Anonymous,
246 }
247 }
248}
249
250pub struct Behaviour<D = IdentityTransform, F = AllowAllSubscriptionFilter> {
262 config: Config,
264
265 events: VecDeque<ToSwarm<Event, HandlerIn>>,
267
268 publish_config: PublishConfig,
270
271 duplicate_cache: DuplicateCache<MessageId>,
274
275 connected_peers: HashMap<PeerId, PeerDetails>,
278
279 explicit_peers: HashSet<PeerId>,
282
283 blacklisted_peers: HashSet<PeerId>,
286
287 mesh: HashMap<TopicHash, BTreeSet<PeerId>>,
289
290 fanout: HashMap<TopicHash, BTreeSet<PeerId>>,
292
293 fanout_last_pub: HashMap<TopicHash, Instant>,
295
296 backoffs: BackoffStorage,
298
299 mcache: MessageCache,
301
302 heartbeat: Delay,
304
305 heartbeat_ticks: u64,
308
309 px_peers: HashSet<PeerId>,
314
315 peer_score: PeerScoreState,
318
319 count_received_ihave: HashMap<PeerId, usize>,
321
322 count_sent_iwant: HashMap<PeerId, usize>,
324
325 subscription_filter: F,
327
328 data_transform: D,
332
333 #[cfg(feature = "metrics")]
335 metrics: Option<Metrics>,
336
337 failed_messages: HashMap<PeerId, FailedMessages>,
339
340 gossip_promises: GossipPromises,
342}
343
344impl<D, F> Behaviour<D, F>
345where
346 D: DataTransform + Default,
347 F: TopicSubscriptionFilter + Default,
348{
349 pub fn new(privacy: MessageAuthenticity, config: Config) -> Result<Self, &'static str> {
352 Self::new_with_subscription_filter_and_transform(
353 privacy,
354 config,
355 F::default(),
356 D::default(),
357 )
358 }
359}
360
361impl<D, F> Behaviour<D, F>
362where
363 D: DataTransform + Default,
364 F: TopicSubscriptionFilter,
365{
366 pub fn new_with_subscription_filter(
369 privacy: MessageAuthenticity,
370 config: Config,
371 subscription_filter: F,
372 ) -> Result<Self, &'static str> {
373 Self::new_with_subscription_filter_and_transform(
374 privacy,
375 config,
376 subscription_filter,
377 D::default(),
378 )
379 }
380}
381
382impl<D, F> Behaviour<D, F>
383where
384 D: DataTransform,
385 F: TopicSubscriptionFilter + Default,
386{
387 pub fn new_with_transform(
391 privacy: MessageAuthenticity,
392 config: Config,
393 data_transform: D,
394 ) -> Result<Self, &'static str> {
395 Self::new_with_subscription_filter_and_transform(
396 privacy,
397 config,
398 F::default(),
399 data_transform,
400 )
401 }
402}
403
404impl<D, F> Behaviour<D, F>
405where
406 D: DataTransform,
407 F: TopicSubscriptionFilter,
408{
409 pub fn new_with_subscription_filter_and_transform(
413 privacy: MessageAuthenticity,
414 config: Config,
415 subscription_filter: F,
416 data_transform: D,
417 ) -> Result<Self, &'static str> {
418 validate_config(&privacy, config.validation_mode())?;
423
424 Ok(Behaviour {
425 #[cfg(feature = "metrics")]
426 metrics: None,
427 events: VecDeque::new(),
428 publish_config: privacy.into(),
429 duplicate_cache: DuplicateCache::new(config.duplicate_cache_time()),
430 explicit_peers: HashSet::new(),
431 blacklisted_peers: HashSet::new(),
432 mesh: HashMap::new(),
433 fanout: HashMap::new(),
434 fanout_last_pub: HashMap::new(),
435 backoffs: BackoffStorage::new(
436 &config.prune_backoff(),
437 config.heartbeat_interval(),
438 config.backoff_slack(),
439 ),
440 mcache: MessageCache::new(config.history_gossip(), config.history_length()),
441 heartbeat: Delay::new(config.heartbeat_interval() + config.heartbeat_initial_delay()),
442 heartbeat_ticks: 0,
443 px_peers: HashSet::new(),
444 peer_score: PeerScoreState::Disabled,
445 count_received_ihave: HashMap::new(),
446 count_sent_iwant: HashMap::new(),
447 connected_peers: HashMap::new(),
448 config,
449 subscription_filter,
450 data_transform,
451 failed_messages: Default::default(),
452 gossip_promises: Default::default(),
453 })
454 }
455
456 #[cfg(feature = "metrics")]
459 pub fn with_metrics(
460 mut self,
461 metrics_registry: &mut Registry,
462 metrics_config: MetricsConfig,
463 ) -> Self {
464 self.metrics = Some(Metrics::new(metrics_registry, metrics_config));
465 self
466 }
467}
468
469impl<D, F> Behaviour<D, F>
470where
471 D: DataTransform + Send + 'static,
472 F: TopicSubscriptionFilter + Send + 'static,
473{
474 pub fn topics(&self) -> impl Iterator<Item = &TopicHash> {
476 self.mesh.keys()
477 }
478
479 pub fn mesh_peers(&self, topic_hash: &TopicHash) -> impl Iterator<Item = &PeerId> {
481 self.mesh.get(topic_hash).into_iter().flat_map(|x| x.iter())
482 }
483
484 pub fn all_mesh_peers(&self) -> impl Iterator<Item = &PeerId> {
485 let mut res = BTreeSet::new();
486 for peers in self.mesh.values() {
487 res.extend(peers);
488 }
489 res.into_iter()
490 }
491
492 pub fn all_peers(&self) -> impl Iterator<Item = (&PeerId, Vec<&TopicHash>)> {
494 self.connected_peers
495 .iter()
496 .map(|(peer_id, peer)| (peer_id, peer.topics.iter().collect()))
497 }
498
499 pub fn peer_protocol(&self) -> impl Iterator<Item = (&PeerId, &PeerKind)> {
501 self.connected_peers.iter().map(|(k, v)| (k, &v.kind))
502 }
503
504 pub fn peer_score(&self, peer_id: &PeerId) -> Option<f64> {
506 match &self.peer_score {
507 PeerScoreState::Active(peer_score) => Some(peer_score.score_report(peer_id).score),
508 PeerScoreState::Disabled => None,
509 }
510 }
511
512 pub fn subscribe<H: Hasher>(&mut self, topic: &Topic<H>) -> Result<bool, SubscriptionError> {
517 let topic_hash = topic.hash();
518 if !self.subscription_filter.can_subscribe(&topic_hash) {
519 return Err(SubscriptionError::NotAllowed);
520 }
521
522 if self.mesh.contains_key(&topic_hash) {
523 tracing::debug!(%topic, "Topic is already in the mesh");
524 return Ok(false);
525 }
526
527 for peer_id in self.connected_peers.keys().copied().collect::<Vec<_>>() {
529 tracing::debug!(%peer_id, "Sending SUBSCRIBE to peer");
530 let event = RpcOut::Subscribe(topic_hash.clone());
531 self.send_message(peer_id, event);
532 }
533
534 self.join(&topic_hash);
537 tracing::debug!(%topic, "Subscribed to topic");
538 Ok(true)
539 }
540
541 pub fn unsubscribe<H: Hasher>(&mut self, topic: &Topic<H>) -> bool {
545 let topic_hash = topic.hash();
546
547 if !self.mesh.contains_key(&topic_hash) {
548 tracing::debug!(topic=%topic_hash, "Already unsubscribed from topic");
549 return false;
551 }
552
553 for peer in self.connected_peers.keys().copied().collect::<Vec<_>>() {
555 tracing::debug!(%peer, "Sending UNSUBSCRIBE to peer");
556 let event = RpcOut::Unsubscribe(topic_hash.clone());
557 self.send_message(peer, event);
558 }
559
560 self.leave(&topic_hash);
563
564 tracing::debug!(topic=%topic_hash, "Unsubscribed from topic");
565 true
566 }
567
568 pub fn publish(
570 &mut self,
571 topic: impl Into<TopicHash>,
572 data: impl Into<Vec<u8>>,
573 ) -> Result<MessageId, PublishError> {
574 let data = data.into();
575 let topic = topic.into();
576
577 let transformed_data = self
579 .data_transform
580 .outbound_transform(&topic.clone(), data.clone())?;
581
582 let max_transmit_size_for_topic = self
583 .config
584 .protocol_config()
585 .max_transmit_size_for_topic(&topic);
586
587 if transformed_data.len() > max_transmit_size_for_topic {
589 return Err(PublishError::MessageTooLarge);
590 }
591
592 let mesh_n = self.config.mesh_n_for_topic(&topic);
593 let raw_message = self.build_raw_message(topic, transformed_data)?;
594
595 let msg_id = self.config.message_id(&Message {
597 source: raw_message.source,
598 data, sequence_number: raw_message.sequence_number,
600 topic: raw_message.topic.clone(),
601 });
602
603 if self.duplicate_cache.contains(&msg_id) {
605 tracing::warn!(
608 message_id=%msg_id,
609 "Not publishing a message that has already been published"
610 );
611 return Err(PublishError::Duplicate);
612 }
613
614 tracing::trace!(message_id=%msg_id, "Publishing message");
615
616 let topic_hash = raw_message.topic.clone();
617
618 let mut peers_on_topic = self
619 .connected_peers
620 .iter()
621 .filter(|(_, p)| p.topics.contains(&topic_hash))
622 .map(|(peer_id, _)| peer_id)
623 .peekable();
624
625 if peers_on_topic.peek().is_none() {
626 return Err(PublishError::NoPeersSubscribedToTopic);
627 }
628
629 let mut recipient_peers = HashSet::new();
630 if self.config.flood_publish() {
631 recipient_peers.extend(peers_on_topic.filter(|p| {
633 self.explicit_peers.contains(*p)
634 || !self
635 .peer_score
636 .below_threshold(p, |ts| ts.publish_threshold)
637 .0
638 }));
639 } else {
640 match self.mesh.get(&topic_hash) {
641 Some(mesh_peers) => {
643 let needed_extra_peers = mesh_n.saturating_sub(mesh_peers.len());
646
647 if needed_extra_peers > 0 {
648 let peer_list = get_random_peers(
653 &self.connected_peers,
654 &topic_hash,
655 needed_extra_peers,
656 |peer| {
657 !mesh_peers.contains(peer)
658 && !self.explicit_peers.contains(peer)
659 && !self
660 .peer_score
661 .below_threshold(peer, |ts| ts.publish_threshold)
662 .0
663 },
664 );
665 recipient_peers.extend(peer_list);
666 }
667
668 recipient_peers.extend(mesh_peers);
669 }
670 None => {
672 tracing::debug!(topic=%topic_hash, "Topic not in the mesh");
673 let fanout_peers = self
675 .fanout
676 .get(&topic_hash)
677 .filter(|peers| !peers.is_empty());
678 if let Some(peers) = fanout_peers {
680 for peer in peers {
681 recipient_peers.insert(*peer);
682 }
683 } else {
684 let new_peers =
686 get_random_peers(&self.connected_peers, &topic_hash, mesh_n, {
687 |p| {
688 !self.explicit_peers.contains(p)
689 && !self
690 .peer_score
691 .below_threshold(p, |ts| ts.publish_threshold)
692 .0
693 }
694 });
695 self.fanout.insert(topic_hash.clone(), new_peers.clone());
697 for peer in new_peers {
698 tracing::debug!(%peer, "Peer added to fanout");
699 recipient_peers.insert(peer);
700 }
701 }
702 self.fanout_last_pub
704 .insert(topic_hash.clone(), Instant::now());
705 }
706 }
707
708 recipient_peers
710 .extend(peers_on_topic.filter(|peer_id| self.explicit_peers.contains(peer_id)));
711
712 for (peer, connections) in &self.connected_peers {
714 if connections.kind == PeerKind::Floodsub
715 && connections.topics.contains(&topic_hash)
716 && !self
717 .peer_score
718 .below_threshold(peer, |ts| ts.publish_threshold)
719 .0
720 {
721 recipient_peers.insert(*peer);
722 }
723 }
724 }
725
726 self.duplicate_cache.insert(msg_id.clone());
729 self.mcache.put(&msg_id, raw_message.clone());
730
731 self.gossip_promises.message_delivered(&msg_id);
733
734 let mut publish_failed = true;
736 for peer_id in recipient_peers.iter() {
737 tracing::trace!(peer=%peer_id, "Sending message to peer");
738 if raw_message.raw_protobuf_len() > self.config.idontwant_message_size_threshold()
741 && self.config.idontwant_on_publish()
742 {
743 self.send_message(
744 *peer_id,
745 RpcOut::IDontWant(IDontWant {
746 message_ids: vec![msg_id.clone()],
747 }),
748 );
749 }
750
751 if self.send_message(
752 *peer_id,
753 RpcOut::Publish {
754 message: raw_message.clone(),
755 timeout: Delay::new(self.config.publish_queue_duration()),
756 },
757 ) {
758 publish_failed = false
759 }
760 }
761
762 if recipient_peers.is_empty() {
763 return Err(PublishError::NoPeersSubscribedToTopic);
764 }
765
766 if publish_failed {
767 return Err(PublishError::AllQueuesFull(recipient_peers.len()));
768 }
769
770 tracing::debug!(message_id=%msg_id, "Published message");
771
772 #[cfg(feature = "metrics")]
773 if let Some(metrics) = self.metrics.as_mut() {
774 metrics.register_published_message(&topic_hash);
775 }
776
777 Ok(msg_id)
778 }
779
780 pub fn report_message_validation_result(
800 &mut self,
801 msg_id: &MessageId,
802 propagation_source: &PeerId,
803 acceptance: MessageAcceptance,
804 ) -> bool {
805 let reject_reason = match acceptance {
806 MessageAcceptance::Accept => {
807 let (raw_message, originating_peers) = match self.mcache.validate(msg_id) {
808 Some((raw_message, originating_peers)) => {
809 (raw_message.clone(), originating_peers)
810 }
811 None => {
812 tracing::warn!(
813 message_id=%msg_id,
814 "Message not in cache. Ignoring forwarding"
815 );
816 #[cfg(feature = "metrics")]
817 if let Some(metrics) = self.metrics.as_mut() {
818 metrics.memcache_miss();
819 }
820 return false;
821 }
822 };
823
824 #[cfg(feature = "metrics")]
825 if let Some(metrics) = self.metrics.as_mut() {
826 metrics.register_msg_validation(&raw_message.topic, &acceptance);
827 }
828
829 self.forward_msg(
830 msg_id,
831 raw_message,
832 Some(propagation_source),
833 originating_peers,
834 );
835 return true;
836 }
837 MessageAcceptance::Reject => RejectReason::ValidationFailed,
838 MessageAcceptance::Ignore => RejectReason::ValidationIgnored,
839 };
840
841 if let Some((raw_message, originating_peers)) = self.mcache.remove(msg_id) {
842 #[cfg(feature = "metrics")]
843 if let Some(metrics) = self.metrics.as_mut() {
844 metrics.register_msg_validation(&raw_message.topic, &acceptance);
845 }
846
847 if let PeerScoreState::Active(peer_score) = &mut self.peer_score {
850 peer_score.reject_message(
851 propagation_source,
852 msg_id,
853 &raw_message.topic,
854 reject_reason,
855 );
856 for peer in originating_peers.iter() {
857 peer_score.reject_message(peer, msg_id, &raw_message.topic, reject_reason);
858 }
859 }
860 true
861 } else {
862 tracing::warn!(message_id=%msg_id, "Rejected message not in cache");
863 false
864 }
865 }
866
867 pub fn add_explicit_peer(&mut self, peer_id: &PeerId) {
869 tracing::debug!(peer=%peer_id, "Adding explicit peer");
870
871 self.explicit_peers.insert(*peer_id);
872
873 self.check_explicit_peer_connection(peer_id);
874 }
875
876 pub fn remove_explicit_peer(&mut self, peer_id: &PeerId) {
879 tracing::debug!(peer=%peer_id, "Removing explicit peer");
880 self.explicit_peers.remove(peer_id);
881 }
882
883 pub fn blacklist_peer(&mut self, peer_id: &PeerId) {
886 if self.blacklisted_peers.insert(*peer_id) {
887 tracing::debug!(peer=%peer_id, "Peer has been blacklisted");
888 }
889 }
890
891 pub fn remove_blacklisted_peer(&mut self, peer_id: &PeerId) {
893 if self.blacklisted_peers.remove(peer_id) {
894 tracing::debug!(peer=%peer_id, "Peer has been removed from the blacklist");
895 }
896 }
897
898 pub fn with_peer_score(
902 &mut self,
903 params: PeerScoreParams,
904 threshold: PeerScoreThresholds,
905 ) -> Result<(), String> {
906 self.with_peer_score_and_message_delivery_time_callback(params, threshold, None)
907 }
908
909 pub fn with_peer_score_and_message_delivery_time_callback(
912 &mut self,
913 params: PeerScoreParams,
914 thresholds: PeerScoreThresholds,
915 callback: Option<fn(&PeerId, &TopicHash, f64)>,
916 ) -> Result<(), String> {
917 params.validate()?;
918 thresholds.validate()?;
919
920 if let PeerScoreState::Active(_) = self.peer_score {
921 return Err("Peer score set twice".into());
922 }
923
924 let peer_score =
925 PeerScore::new_with_message_delivery_time_callback(params, thresholds, callback);
926 self.peer_score = PeerScoreState::Active(Box::new(peer_score));
927 Ok(())
928 }
929
930 pub fn set_topic_params<H: Hasher>(
934 &mut self,
935 topic: Topic<H>,
936 params: TopicScoreParams,
937 ) -> Result<(), &'static str> {
938 if let PeerScoreState::Active(peer_score) = &mut self.peer_score {
939 peer_score.set_topic_params(topic.hash(), params);
940 Ok(())
941 } else {
942 Err("Peer score must be initialised with `with_peer_score()`")
943 }
944 }
945
946 pub fn get_topic_params<H: Hasher>(&self, topic: &Topic<H>) -> Option<&TopicScoreParams> {
948 match &self.peer_score {
949 PeerScoreState::Active(peer_score) => peer_score.get_topic_params(&topic.hash()),
950 PeerScoreState::Disabled => None,
951 }
952 }
953
954 pub fn set_application_score(&mut self, peer_id: &PeerId, new_score: f64) -> bool {
957 if let PeerScoreState::Active(peer_score) = &mut self.peer_score {
958 peer_score.set_application_score(peer_id, new_score)
959 } else {
960 false
961 }
962 }
963
964 fn join(&mut self, topic_hash: &TopicHash) {
966 let mut added_peers = HashSet::new();
967 let mesh_n = self.config.mesh_n_for_topic(topic_hash);
968 #[cfg(feature = "metrics")]
969 if let Some(m) = self.metrics.as_mut() {
970 m.joined(topic_hash)
971 }
972
973 self.mesh.entry(topic_hash.clone()).or_default();
975
976 if let Some((_, mut peers)) = self.fanout.remove_entry(topic_hash) {
979 tracing::debug!(
980 topic=%topic_hash,
981 "JOIN: Removing peers from the fanout for topic"
982 );
983
984 peers.retain(|p| {
986 !self.explicit_peers.contains(p)
987 && !self.peer_score.below_threshold(p, |_| 0.0).0
988 && !self.backoffs.is_backoff_with_slack(topic_hash, p)
989 });
990
991 let add_peers = std::cmp::min(peers.len(), mesh_n);
994 tracing::debug!(
995 topic=%topic_hash,
996 "JOIN: Adding {:?} peers from the fanout for topic",
997 add_peers
998 );
999 added_peers.extend(peers.iter().take(add_peers));
1000
1001 self.mesh.insert(
1002 topic_hash.clone(),
1003 peers.into_iter().take(add_peers).collect(),
1004 );
1005
1006 self.fanout_last_pub.remove(topic_hash);
1008 }
1009
1010 #[cfg(feature = "metrics")]
1011 if let Some(m) = self.metrics.as_mut() {
1012 m.peers_included(topic_hash, Inclusion::Fanout, added_peers.len())
1013 }
1014
1015 if added_peers.len() < mesh_n {
1017 let random_added = get_random_peers(
1019 &self.connected_peers,
1020 topic_hash,
1021 mesh_n - added_peers.len(),
1022 |peer| {
1023 !added_peers.contains(peer)
1024 && !self.explicit_peers.contains(peer)
1025 && !self.peer_score.below_threshold(peer, |_| 0.0).0
1026 && !self.backoffs.is_backoff_with_slack(topic_hash, peer)
1027 },
1028 );
1029
1030 added_peers.extend(random_added.clone());
1031 tracing::debug!(
1033 "JOIN: Inserting {:?} random peers into the mesh",
1034 random_added.len()
1035 );
1036
1037 #[cfg(feature = "metrics")]
1038 if let Some(m) = self.metrics.as_mut() {
1039 m.peers_included(topic_hash, Inclusion::Random, random_added.len())
1040 }
1041
1042 let mesh_peers = self.mesh.entry(topic_hash.clone()).or_default();
1043 mesh_peers.extend(random_added);
1044 }
1045
1046 for peer_id in added_peers {
1047 tracing::debug!(peer=%peer_id, "JOIN: Sending Graft message to peer");
1049 if let PeerScoreState::Active(peer_score) = &mut self.peer_score {
1050 peer_score.graft(&peer_id, topic_hash.clone());
1051 }
1052 self.send_message(
1053 peer_id,
1054 RpcOut::Graft(Graft {
1055 topic_hash: topic_hash.clone(),
1056 }),
1057 );
1058
1059 peer_added_to_mesh(
1061 peer_id,
1062 vec![topic_hash],
1063 &self.mesh,
1064 &mut self.events,
1065 &self.connected_peers,
1066 );
1067 }
1068
1069 #[cfg(feature = "metrics")]
1070 {
1071 let mesh_peers = self.mesh_peers(topic_hash).count();
1072 if let Some(m) = self.metrics.as_mut() {
1073 m.set_mesh_peers(topic_hash, mesh_peers)
1074 }
1075 }
1076
1077 tracing::debug!(topic=%topic_hash, "Completed JOIN for topic");
1078 }
1079
1080 fn make_prune(
1082 &mut self,
1083 topic_hash: &TopicHash,
1084 peer: &PeerId,
1085 do_px: bool,
1086 on_unsubscribe: bool,
1087 ) -> Prune {
1088 if let PeerScoreState::Active(peer_score) = &mut self.peer_score {
1089 peer_score.prune(peer, topic_hash.clone());
1090 }
1091
1092 match self.connected_peers.get(peer).map(|v| &v.kind) {
1093 Some(PeerKind::Floodsub) => {
1094 tracing::error!("Attempted to prune a Floodsub peer");
1095 }
1096 Some(PeerKind::Gossipsub) => {
1097 return Prune {
1099 topic_hash: topic_hash.clone(),
1100 peers: Vec::new(),
1101 backoff: None,
1102 };
1103 }
1104 None => {
1105 tracing::error!("Attempted to Prune an unknown peer");
1106 }
1107 _ => {} }
1109
1110 let peers = if do_px {
1112 get_random_peers(
1113 &self.connected_peers,
1114 topic_hash,
1115 self.config.prune_peers(),
1116 |p| p != peer && !self.peer_score.below_threshold(p, |_| 0.0).0,
1117 )
1118 .into_iter()
1119 .map(|p| PeerInfo { peer_id: Some(p) })
1120 .collect()
1121 } else {
1122 Vec::new()
1123 };
1124
1125 let backoff = if on_unsubscribe {
1126 self.config.unsubscribe_backoff()
1127 } else {
1128 self.config.prune_backoff()
1129 };
1130
1131 self.backoffs.update_backoff(topic_hash, peer, backoff);
1133
1134 Prune {
1135 topic_hash: topic_hash.clone(),
1136 peers,
1137 backoff: Some(backoff.as_secs()),
1138 }
1139 }
1140
1141 fn leave(&mut self, topic_hash: &TopicHash) {
1143 tracing::debug!(topic=%topic_hash, "Running LEAVE for topic");
1144
1145 if let Some((_, peers)) = self.mesh.remove_entry(topic_hash) {
1147 #[cfg(feature = "metrics")]
1148 if let Some(m) = self.metrics.as_mut() {
1149 m.left(topic_hash)
1150 }
1151 for peer_id in peers {
1152 tracing::debug!(%peer_id, "LEAVE: Sending PRUNE to peer");
1154
1155 let on_unsubscribe = true;
1156 let prune =
1157 self.make_prune(topic_hash, &peer_id, self.config.do_px(), on_unsubscribe);
1158 self.send_message(peer_id, RpcOut::Prune(prune));
1159
1160 peer_removed_from_mesh(
1162 peer_id,
1163 topic_hash,
1164 &self.mesh,
1165 &mut self.events,
1166 &self.connected_peers,
1167 );
1168 }
1169 }
1170 tracing::debug!(topic=%topic_hash, "Completed LEAVE for topic");
1171 }
1172
1173 fn check_explicit_peer_connection(&mut self, peer_id: &PeerId) {
1175 if !self.connected_peers.contains_key(peer_id) {
1176 tracing::debug!(peer=%peer_id, "Connecting to explicit peer");
1178 self.events.push_back(ToSwarm::Dial {
1179 opts: DialOpts::peer_id(*peer_id).build(),
1180 });
1181 }
1182 }
1183
1184 fn handle_ihave(&mut self, peer_id: &PeerId, ihave_msgs: Vec<(TopicHash, Vec<MessageId>)>) {
1187 if let (true, score) = self
1189 .peer_score
1190 .below_threshold(peer_id, |ts| ts.gossip_threshold)
1191 {
1192 tracing::debug!(
1193 peer=%peer_id,
1194 %score,
1195 "IHAVE: ignoring peer with score below threshold"
1196 );
1197 return;
1198 }
1199
1200 let peer_have = self.count_received_ihave.entry(*peer_id).or_insert(0);
1202 *peer_have += 1;
1203 if *peer_have > self.config.max_ihave_messages() {
1204 tracing::debug!(
1205 peer=%peer_id,
1206 "IHAVE: peer has advertised too many times ({}) within this heartbeat \
1207 interval; ignoring",
1208 *peer_have
1209 );
1210 return;
1211 }
1212
1213 if let Some(iasked) = self.count_sent_iwant.get(peer_id) {
1214 if *iasked >= self.config.max_ihave_length() {
1215 tracing::debug!(
1216 peer=%peer_id,
1217 "IHAVE: peer has already advertised too many messages ({}); ignoring",
1218 *iasked
1219 );
1220 return;
1221 }
1222 }
1223
1224 tracing::trace!(peer=%peer_id, "Handling IHAVE for peer");
1225
1226 let mut iwant_ids = HashSet::new();
1227
1228 for (topic, ids) in ihave_msgs {
1229 if !self.mesh.contains_key(&topic) {
1231 tracing::debug!(
1232 %topic,
1233 "IHAVE: Ignoring IHAVE - Not subscribed to topic"
1234 );
1235 continue;
1236 }
1237
1238 for id in ids.into_iter().filter(|id| {
1239 if self.duplicate_cache.contains(id) {
1240 return false;
1241 }
1242
1243 !self.gossip_promises.contains(id)
1244 }) {
1245 if iwant_ids.insert(id) {
1247 #[cfg(feature = "metrics")]
1249 if let Some(metrics) = self.metrics.as_mut() {
1250 metrics.register_iwant(&topic);
1251 }
1252 }
1253 }
1254 }
1255
1256 if !iwant_ids.is_empty() {
1257 let iasked = self.count_sent_iwant.entry(*peer_id).or_insert(0);
1258 let mut iask = iwant_ids.len();
1259 if *iasked + iask > self.config.max_ihave_length() {
1260 iask = self.config.max_ihave_length().saturating_sub(*iasked);
1261 }
1262
1263 tracing::debug!(
1265 peer=%peer_id,
1266 "IHAVE: Asking for {} out of {} messages from peer",
1267 iask,
1268 iwant_ids.len()
1269 );
1270
1271 let mut iwant_ids_vec: Vec<_> = iwant_ids.into_iter().collect();
1273 let mut rng = thread_rng();
1274 iwant_ids_vec.partial_shuffle(&mut rng, iask);
1275
1276 iwant_ids_vec.truncate(iask);
1277 *iasked += iask;
1278
1279 self.gossip_promises.add_promise(
1280 *peer_id,
1281 &iwant_ids_vec,
1282 Instant::now() + self.config.iwant_followup_time(),
1283 );
1284 tracing::trace!(
1285 peer=%peer_id,
1286 "IHAVE: Asking for the following messages from peer: {:?}",
1287 iwant_ids_vec
1288 );
1289
1290 self.send_message(
1291 *peer_id,
1292 RpcOut::IWant(IWant {
1293 message_ids: iwant_ids_vec,
1294 }),
1295 );
1296 }
1297 tracing::trace!(peer=%peer_id, "Completed IHAVE handling for peer");
1298 }
1299
1300 fn handle_iwant(&mut self, peer_id: &PeerId, iwant_msgs: Vec<MessageId>) {
1303 if let (true, score) = self
1305 .peer_score
1306 .below_threshold(peer_id, |ts| ts.gossip_threshold)
1307 {
1308 tracing::debug!(
1309 peer=%peer_id,
1310 "IWANT: ignoring peer with score below threshold [score = {}]",
1311 score
1312 );
1313 return;
1314 }
1315
1316 tracing::debug!(peer=%peer_id, "Handling IWANT for peer");
1317
1318 for id in iwant_msgs {
1319 if let Some((msg, count)) = self
1322 .mcache
1323 .get_with_iwant_counts(&id, peer_id)
1324 .map(|(msg, count)| (msg.clone(), count))
1325 {
1326 if count > self.config.gossip_retransimission() {
1327 tracing::debug!(
1328 peer=%peer_id,
1329 message_id=%id,
1330 "IWANT: Peer has asked for message too many times; ignoring request"
1331 );
1332 } else {
1333 if let Some(peer) = self.connected_peers.get_mut(peer_id) {
1334 if peer.dont_send.contains_key(&id) {
1335 tracing::debug!(%peer_id, message_id=%id, "Peer already sent IDONTWANT for this message");
1336 continue;
1337 }
1338 }
1339
1340 tracing::debug!(peer=%peer_id, "IWANT: Sending cached messages to peer");
1341 self.send_message(
1342 *peer_id,
1343 RpcOut::Forward {
1344 message: msg,
1345 timeout: Delay::new(self.config.forward_queue_duration()),
1346 },
1347 );
1348 }
1349 }
1350 }
1351 tracing::debug!(peer=%peer_id, "Completed IWANT handling for peer");
1352 }
1353
1354 fn handle_graft(&mut self, peer_id: &PeerId, topics: Vec<TopicHash>) {
1357 tracing::debug!(peer=%peer_id, "Handling GRAFT message for peer");
1358
1359 let mut to_prune_topics = HashSet::new();
1360
1361 let mut do_px = self.config.do_px();
1362
1363 let Some(connected_peer) = self.connected_peers.get_mut(peer_id) else {
1364 tracing::error!(peer_id = %peer_id, "Peer non-existent when handling graft");
1365 return;
1366 };
1367 let is_outbound = connected_peer.outbound;
1369
1370 for topic in &topics {
1373 if connected_peer.topics.insert(topic.clone()) {
1374 #[cfg(feature = "metrics")]
1375 if let Some(m) = self.metrics.as_mut() {
1376 m.inc_topic_peers(topic);
1377 }
1378 }
1379 }
1380
1381 if self.explicit_peers.contains(peer_id) {
1383 tracing::warn!(peer=%peer_id, "GRAFT: ignoring request from direct peer");
1384 to_prune_topics = topics.into_iter().collect();
1386 do_px = false
1388 } else {
1389 let (below_zero, score) = self.peer_score.below_threshold(peer_id, |_| 0.0);
1390 let now = Instant::now();
1391 for topic_hash in topics {
1392 if let Some(peers) = self.mesh.get_mut(&topic_hash) {
1393 if peers.contains(peer_id) {
1395 tracing::debug!(
1396 peer=%peer_id,
1397 topic=%&topic_hash,
1398 "GRAFT: Received graft for peer that is already in topic"
1399 );
1400 continue;
1401 }
1402
1403 if let Some(backoff_time) = self.backoffs.get_backoff_time(&topic_hash, peer_id)
1405 {
1406 if backoff_time > now {
1407 tracing::warn!(
1408 peer=%peer_id,
1409 "[Penalty] Peer attempted graft within backoff time, penalizing"
1410 );
1411 if let PeerScoreState::Active(peer_score) = &mut self.peer_score {
1413 #[cfg(feature = "metrics")]
1414 if let Some(metrics) = self.metrics.as_mut() {
1415 metrics.register_score_penalty(Penalty::GraftBackoff);
1416 }
1417 peer_score.add_penalty(peer_id, 1);
1418
1419 #[allow(unknown_lints, clippy::unchecked_duration_subtraction)]
1422 let flood_cutoff = (backoff_time
1423 + self.config.graft_flood_threshold())
1424 - self.config.prune_backoff();
1425 if flood_cutoff > now {
1426 peer_score.add_penalty(peer_id, 1);
1428 }
1429 }
1430 do_px = false;
1432
1433 to_prune_topics.insert(topic_hash.clone());
1434 continue;
1435 }
1436 }
1437
1438 if below_zero {
1440 tracing::debug!(
1442 peer=%peer_id,
1443 %score,
1444 topic=%topic_hash,
1445 "GRAFT: ignoring peer with negative score"
1446 );
1447 to_prune_topics.insert(topic_hash.clone());
1450 do_px = false;
1452 continue;
1453 }
1454
1455 let mesh_n_high = self.config.mesh_n_high_for_topic(&topic_hash);
1458
1459 if peers.len() >= mesh_n_high && !is_outbound {
1460 to_prune_topics.insert(topic_hash.clone());
1461 continue;
1462 }
1463
1464 tracing::debug!(
1466 peer=%peer_id,
1467 topic=%topic_hash,
1468 "GRAFT: Mesh link added for peer in topic"
1469 );
1470
1471 if peers.insert(*peer_id) {
1472 #[cfg(feature = "metrics")]
1473 if let Some(m) = self.metrics.as_mut() {
1474 m.peers_included(&topic_hash, Inclusion::Subscribed, 1)
1475 }
1476 }
1477
1478 peer_added_to_mesh(
1480 *peer_id,
1481 vec![&topic_hash],
1482 &self.mesh,
1483 &mut self.events,
1484 &self.connected_peers,
1485 );
1486
1487 if let PeerScoreState::Active(peer_score) = &mut self.peer_score {
1488 peer_score.graft(peer_id, topic_hash);
1489 }
1490 } else {
1491 do_px = false;
1493 tracing::debug!(
1494 peer=%peer_id,
1495 topic=%topic_hash,
1496 "GRAFT: Received graft for unknown topic from peer"
1497 );
1498 continue;
1500 }
1501 }
1502 }
1503
1504 if !to_prune_topics.is_empty() {
1505 let on_unsubscribe = false;
1507
1508 for prune in to_prune_topics
1509 .iter()
1510 .map(|t| self.make_prune(t, peer_id, do_px, on_unsubscribe))
1511 .collect::<Vec<_>>()
1512 {
1513 self.send_message(*peer_id, RpcOut::Prune(prune));
1514 }
1515 tracing::debug!(
1517 peer=%peer_id,
1518 "GRAFT: Not subscribed to topics - Sending PRUNE to peer"
1519 );
1520 }
1521 tracing::debug!(peer=%peer_id, "Completed GRAFT handling for peer");
1522 }
1523
1524 fn remove_peer_from_mesh(
1526 &mut self,
1527 peer_id: &PeerId,
1528 topic_hash: &TopicHash,
1529 backoff: Option<u64>,
1530 always_update_backoff: bool,
1531 ) -> bool {
1532 let mut peer_removed = false;
1533 if let Some(peers) = self.mesh.get_mut(topic_hash) {
1534 peer_removed = peers.remove(peer_id);
1536 if peer_removed {
1537 tracing::debug!(
1538 peer=%peer_id,
1539 topic=%topic_hash,
1540 "PRUNE: Removing peer from the mesh for topic"
1541 );
1542
1543 if let PeerScoreState::Active(peer_score) = &mut self.peer_score {
1544 peer_score.prune(peer_id, topic_hash.clone());
1545 }
1546
1547 peer_removed_from_mesh(
1549 *peer_id,
1550 topic_hash,
1551 &self.mesh,
1552 &mut self.events,
1553 &self.connected_peers,
1554 );
1555 }
1556 }
1557 if always_update_backoff || peer_removed {
1558 let time = if let Some(backoff) = backoff {
1559 Duration::from_secs(backoff)
1560 } else {
1561 self.config.prune_backoff()
1562 };
1563 self.backoffs.update_backoff(topic_hash, peer_id, time);
1565 }
1566 peer_removed
1567 }
1568
1569 fn handle_prune(
1571 &mut self,
1572 peer_id: &PeerId,
1573 prune_data: Vec<(TopicHash, Vec<PeerInfo>, Option<u64>)>,
1574 ) {
1575 tracing::debug!(peer=%peer_id, "Handling PRUNE message for peer");
1576 let (below_threshold, score) = self
1577 .peer_score
1578 .below_threshold(peer_id, |ts| ts.accept_px_threshold);
1579 for (topic_hash, px, backoff) in prune_data {
1580 if self.remove_peer_from_mesh(peer_id, &topic_hash, backoff, true) {
1581 #[cfg(feature = "metrics")]
1582 if let Some(m) = self.metrics.as_mut() {
1583 m.peers_removed(&topic_hash, Churn::Prune, 1);
1584 }
1585 }
1586
1587 if self.mesh.contains_key(&topic_hash) {
1588 if !px.is_empty() {
1590 if below_threshold {
1592 tracing::debug!(
1593 peer=%peer_id,
1594 %score,
1595 topic=%topic_hash,
1596 "PRUNE: ignoring PX from peer with insufficient score"
1597 );
1598 continue;
1599 }
1600
1601 if self.config.prune_peers() > 0 {
1608 self.px_connect(px);
1609 }
1610 }
1611 }
1612 }
1613 tracing::debug!(peer=%peer_id, "Completed PRUNE handling for peer");
1614 }
1615
1616 fn px_connect(&mut self, mut px: Vec<PeerInfo>) {
1617 let n = self.config.prune_peers();
1618 px.retain(|p| p.peer_id.is_some());
1623 if px.len() > n {
1624 let mut rng = thread_rng();
1626 px.partial_shuffle(&mut rng, n);
1627 px = px.into_iter().take(n).collect();
1628 }
1629
1630 for p in px {
1631 if let Some(peer_id) = p.peer_id {
1634 self.px_peers.insert(peer_id);
1636
1637 self.events.push_back(ToSwarm::Dial {
1639 opts: DialOpts::peer_id(peer_id).build(),
1640 });
1641 }
1642 }
1643 }
1644
1645 fn message_is_valid(
1648 &mut self,
1649 msg_id: &MessageId,
1650 raw_message: &mut RawMessage,
1651 propagation_source: &PeerId,
1652 ) -> bool {
1653 tracing::debug!(
1654 peer=%propagation_source,
1655 message_id=%msg_id,
1656 "Handling message from peer"
1657 );
1658
1659 if self.blacklisted_peers.contains(propagation_source) {
1661 tracing::debug!(
1662 peer=%propagation_source,
1663 "Rejecting message from blacklisted peer"
1664 );
1665 self.gossip_promises
1666 .reject_message(msg_id, &RejectReason::BlackListedPeer);
1667 if let PeerScoreState::Active(peer_score) = &mut self.peer_score {
1668 peer_score.reject_message(
1669 propagation_source,
1670 msg_id,
1671 &raw_message.topic,
1672 RejectReason::BlackListedPeer,
1673 );
1674 }
1675 return false;
1676 }
1677
1678 if let Some(source) = raw_message.source.as_ref() {
1680 if self.blacklisted_peers.contains(source) {
1681 tracing::debug!(
1682 peer=%propagation_source,
1683 %source,
1684 "Rejecting message from peer because of blacklisted source"
1685 );
1686 self.handle_invalid_message(
1687 propagation_source,
1688 &raw_message.topic,
1689 Some(msg_id),
1690 RejectReason::BlackListedSource,
1691 );
1692 return false;
1693 }
1694 }
1695
1696 if !self.config.validate_messages() {
1700 raw_message.validated = true;
1701 }
1702
1703 let self_published = !self.config.allow_self_origin()
1705 && if let Some(own_id) = self.publish_config.get_own_id() {
1706 own_id != propagation_source
1707 && raw_message.source.as_ref().is_some_and(|s| s == own_id)
1708 } else {
1709 false
1710 };
1711
1712 if self_published {
1713 tracing::debug!(
1714 message_id=%msg_id,
1715 source=%propagation_source,
1716 "Dropping message claiming to be from self but forwarded from source"
1717 );
1718 self.handle_invalid_message(
1719 propagation_source,
1720 &raw_message.topic,
1721 Some(msg_id),
1722 RejectReason::SelfOrigin,
1723 );
1724 return false;
1725 }
1726
1727 true
1728 }
1729
1730 fn handle_received_message(
1734 &mut self,
1735 mut raw_message: RawMessage,
1736 propagation_source: &PeerId,
1737 ) {
1738 #[cfg(feature = "metrics")]
1740 if let Some(metrics) = self.metrics.as_mut() {
1741 metrics.msg_recvd_unfiltered(&raw_message.topic, raw_message.raw_protobuf_len());
1742 }
1743
1744 let message = match self.data_transform.inbound_transform(raw_message.clone()) {
1746 Ok(message) => message,
1747 Err(e) => {
1748 tracing::debug!("Invalid message. Transform error: {:?}", e);
1749 self.handle_invalid_message(
1751 propagation_source,
1752 &raw_message.topic,
1753 None,
1754 RejectReason::ValidationError(ValidationError::TransformFailed),
1755 );
1756 return;
1757 }
1758 };
1759
1760 let msg_id = self.config.message_id(&message);
1762
1763 if raw_message.raw_protobuf_len() > self.config.idontwant_message_size_threshold() {
1765 let recipient_peers = self
1766 .mesh
1767 .get(&message.topic)
1768 .map(|mesh| mesh.iter())
1769 .unwrap_or_default()
1770 .copied()
1771 .chain(self.gossip_promises.peers_for_message(&msg_id))
1772 .filter(|peer_id| {
1773 peer_id != propagation_source && Some(peer_id) != message.source.as_ref()
1774 })
1775 .collect::<Vec<PeerId>>();
1776
1777 for peer_id in recipient_peers {
1778 self.send_message(
1779 peer_id,
1780 RpcOut::IDontWant(IDontWant {
1781 message_ids: vec![msg_id.clone()],
1782 }),
1783 );
1784 }
1785 }
1786
1787 if !self.message_is_valid(&msg_id, &mut raw_message, propagation_source) {
1791 return;
1792 }
1793
1794 if !self.duplicate_cache.insert(msg_id.clone()) {
1795 tracing::debug!(message_id=%msg_id, "Message already received, ignoring");
1796 if let PeerScoreState::Active(peer_score) = &mut self.peer_score {
1797 peer_score.duplicated_message(propagation_source, &msg_id, &message.topic);
1798 }
1799 self.mcache.observe_duplicate(&msg_id, propagation_source);
1800 return;
1801 }
1802
1803 tracing::debug!(
1804 message_id=%msg_id,
1805 "Put message in duplicate_cache and resolve promises"
1806 );
1807
1808 #[cfg(feature = "metrics")]
1810 if let Some(metrics) = self.metrics.as_mut() {
1811 metrics.msg_recvd(&message.topic);
1812 }
1813
1814 self.gossip_promises.message_delivered(&msg_id);
1817
1818 if let PeerScoreState::Active(peer_score) = &mut self.peer_score {
1820 peer_score.validate_message(propagation_source, &msg_id, &message.topic);
1821 }
1822
1823 self.mcache.put(&msg_id, raw_message.clone());
1825
1826 #[allow(
1828 clippy::map_entry,
1829 reason = "False positive, see rust-lang/rust-clippy#14449."
1830 )]
1831 if self.mesh.contains_key(&message.topic) {
1832 tracing::debug!("Sending received message to user");
1833 self.events
1834 .push_back(ToSwarm::GenerateEvent(Event::Message {
1835 propagation_source: *propagation_source,
1836 message_id: msg_id.clone(),
1837 message,
1838 }));
1839 } else {
1840 tracing::debug!(
1841 topic=%message.topic,
1842 "Received message on a topic we are not subscribed to"
1843 );
1844 return;
1845 }
1846
1847 if !self.config.validate_messages() {
1849 self.forward_msg(
1850 &msg_id,
1851 raw_message,
1852 Some(propagation_source),
1853 HashSet::new(),
1854 );
1855 tracing::debug!(message_id=%msg_id, "Completed message handling for message");
1856 }
1857 }
1858
1859 fn handle_invalid_message(
1861 &mut self,
1862 propagation_source: &PeerId,
1863 topic_hash: &TopicHash,
1864 message_id: Option<&MessageId>,
1865 reject_reason: RejectReason,
1866 ) {
1867 #[cfg(feature = "metrics")]
1868 if let Some(metrics) = self.metrics.as_mut() {
1869 metrics.register_invalid_message(topic_hash);
1870 }
1871 if let Some(msg_id) = message_id {
1872 self.gossip_promises.reject_message(msg_id, &reject_reason);
1874 }
1875 if let PeerScoreState::Active(peer_score) = &mut self.peer_score {
1876 if let Some(msg_id) = message_id {
1878 peer_score.reject_message(propagation_source, msg_id, topic_hash, reject_reason);
1881 } else {
1882 peer_score.reject_invalid_message(propagation_source, topic_hash);
1886 }
1887 }
1888 }
1889
1890 fn handle_received_subscriptions(
1892 &mut self,
1893 subscriptions: &[Subscription],
1894 propagation_source: &PeerId,
1895 ) {
1896 tracing::trace!(
1897 source=%propagation_source,
1898 "Handling subscriptions: {:?}",
1899 subscriptions,
1900 );
1901
1902 let mut unsubscribed_peers = Vec::new();
1903
1904 let Some(peer) = self.connected_peers.get_mut(propagation_source) else {
1905 tracing::error!(
1906 peer=%propagation_source,
1907 "Subscription by unknown peer"
1908 );
1909 return;
1910 };
1911
1912 let mut topics_to_graft = Vec::new();
1914
1915 let mut application_event = Vec::new();
1917
1918 let filtered_topics = match self
1919 .subscription_filter
1920 .filter_incoming_subscriptions(subscriptions, &peer.topics)
1921 {
1922 Ok(topics) => topics,
1923 Err(s) => {
1924 tracing::error!(
1925 peer=%propagation_source,
1926 "Subscription filter error: {}; ignoring RPC from peer",
1927 s
1928 );
1929 return;
1930 }
1931 };
1932
1933 for subscription in filtered_topics {
1934 let topic_hash = &subscription.topic_hash;
1936
1937 match subscription.action {
1938 SubscriptionAction::Subscribe => {
1939 if peer.topics.insert(topic_hash.clone()) {
1940 tracing::debug!(
1941 peer=%propagation_source,
1942 topic=%topic_hash,
1943 "SUBSCRIPTION: Adding gossip peer to topic"
1944 );
1945
1946 #[cfg(feature = "metrics")]
1947 if let Some(m) = self.metrics.as_mut() {
1948 m.inc_topic_peers(topic_hash);
1949 }
1950 }
1951
1952 if !self.explicit_peers.contains(propagation_source)
1954 && peer.kind.is_gossipsub()
1955 && !self
1956 .peer_score
1957 .below_threshold(propagation_source, |_| 0.0)
1958 .0
1959 && !self
1960 .backoffs
1961 .is_backoff_with_slack(topic_hash, propagation_source)
1962 {
1963 if let Some(peers) = self.mesh.get_mut(topic_hash) {
1964 let mesh_n_low = self.config.mesh_n_low_for_topic(topic_hash);
1965
1966 if peers.len() < mesh_n_low && peers.insert(*propagation_source) {
1967 tracing::debug!(
1968 peer=%propagation_source,
1969 topic=%topic_hash,
1970 "SUBSCRIPTION: Adding peer to the mesh for topic"
1971 );
1972 #[cfg(feature = "metrics")]
1973 if let Some(m) = self.metrics.as_mut() {
1974 m.peers_included(topic_hash, Inclusion::Subscribed, 1)
1975 }
1976 tracing::debug!(
1978 peer=%propagation_source,
1979 topic=%topic_hash,
1980 "Sending GRAFT to peer for topic"
1981 );
1982 if let PeerScoreState::Active(peer_score) = &mut self.peer_score {
1983 peer_score.graft(propagation_source, topic_hash.clone());
1984 }
1985 topics_to_graft.push(topic_hash.clone());
1986 }
1987 }
1988 }
1989 application_event.push(ToSwarm::GenerateEvent(Event::Subscribed {
1991 peer_id: *propagation_source,
1992 topic: topic_hash.clone(),
1993 }));
1994 }
1995 SubscriptionAction::Unsubscribe => {
1996 if peer.topics.remove(topic_hash) {
1997 tracing::debug!(
1998 peer=%propagation_source,
1999 topic=%topic_hash,
2000 "SUBSCRIPTION: Removing gossip peer from topic"
2001 );
2002
2003 #[cfg(feature = "metrics")]
2004 if let Some(m) = self.metrics.as_mut() {
2005 m.dec_topic_peers(topic_hash);
2006 }
2007 }
2008
2009 unsubscribed_peers.push((*propagation_source, topic_hash.clone()));
2010 application_event.push(ToSwarm::GenerateEvent(Event::Unsubscribed {
2012 peer_id: *propagation_source,
2013 topic: topic_hash.clone(),
2014 }));
2015 }
2016 }
2017 }
2018
2019 for (peer_id, topic_hash) in unsubscribed_peers {
2021 self.fanout
2022 .get_mut(&topic_hash)
2023 .map(|peers| peers.remove(&peer_id));
2024 if self.remove_peer_from_mesh(&peer_id, &topic_hash, None, false) {
2025 #[cfg(feature = "metrics")]
2026 if let Some(m) = self.metrics.as_mut() {
2027 m.peers_removed(&topic_hash, Churn::Unsub, 1);
2028 }
2029 };
2030 }
2031
2032 let topics_joined = topics_to_graft.iter().collect::<Vec<_>>();
2034 if !topics_joined.is_empty() {
2035 peer_added_to_mesh(
2036 *propagation_source,
2037 topics_joined,
2038 &self.mesh,
2039 &mut self.events,
2040 &self.connected_peers,
2041 );
2042 }
2043
2044 for topic_hash in topics_to_graft.into_iter() {
2047 self.send_message(*propagation_source, RpcOut::Graft(Graft { topic_hash }));
2048 }
2049
2050 for event in application_event {
2052 self.events.push_back(event);
2053 }
2054
2055 tracing::trace!(
2056 source=%propagation_source,
2057 "Completed handling subscriptions from source"
2058 );
2059 }
2060
2061 fn apply_iwant_penalties(&mut self) {
2063 if let PeerScoreState::Active(peer_score) = &mut self.peer_score {
2064 for (peer, count) in self.gossip_promises.get_broken_promises() {
2065 peer_score.add_penalty(&peer, count);
2066 #[cfg(feature = "metrics")]
2067 if let Some(metrics) = self.metrics.as_mut() {
2068 metrics.register_score_penalty(Penalty::BrokenPromise);
2069 }
2070 }
2071 }
2072 }
2073
2074 fn heartbeat(&mut self) {
2076 #[cfg(feature = "metrics")]
2077 let start = Instant::now();
2078
2079 #[cfg(feature = "metrics")]
2083 if let Some(m) = &mut self.metrics {
2084 for sender_queue in self.connected_peers.values().map(|v| &v.sender) {
2085 m.observe_priority_queue_size(sender_queue.priority_queue_len());
2086 m.observe_non_priority_queue_size(sender_queue.non_priority_queue_len());
2087 }
2088 }
2089
2090 self.heartbeat_ticks += 1;
2091
2092 let mut to_graft = HashMap::new();
2093 let mut to_prune = HashMap::new();
2094 let mut no_px = HashSet::new();
2095
2096 self.backoffs.heartbeat();
2098
2099 self.count_sent_iwant.clear();
2101 self.count_received_ihave.clear();
2102
2103 self.apply_iwant_penalties();
2105
2106 if self.heartbeat_ticks % self.config.check_explicit_peers_ticks() == 0 {
2108 for p in self.explicit_peers.clone() {
2109 self.check_explicit_peer_connection(&p);
2110 }
2111 }
2112
2113 let mut scores = HashMap::with_capacity(self.connected_peers.len());
2115 if let PeerScoreState::Active(peer_score) = &self.peer_score {
2116 for peer_id in self.connected_peers.keys() {
2117 #[allow(unused_variables)]
2118 let report = scores
2119 .entry(peer_id)
2120 .or_insert_with(|| peer_score.score_report(peer_id));
2121
2122 #[cfg(feature = "metrics")]
2123 if let Some(metrics) = self.metrics.as_mut() {
2124 for penalty in &report.penalties {
2125 metrics.register_score_penalty(*penalty);
2126 }
2127 }
2128 }
2129 }
2130
2131 for (topic_hash, peers) in self.mesh.iter_mut() {
2133 let explicit_peers = &self.explicit_peers;
2134 let backoffs = &self.backoffs;
2135
2136 let mesh_n = self.config.mesh_n_for_topic(topic_hash);
2137 let mesh_n_low = self.config.mesh_n_low_for_topic(topic_hash);
2138 let mesh_n_high = self.config.mesh_n_high_for_topic(topic_hash);
2139 let mesh_outbound_min = self.config.mesh_outbound_min_for_topic(topic_hash);
2140
2141 let mut to_remove_peers = Vec::new();
2145 for peer_id in peers.iter() {
2146 let peer_score = scores.get(peer_id).map(|r| r.score).unwrap_or_default();
2147
2148 #[cfg(feature = "metrics")]
2150 if let Some(metrics) = self.metrics.as_mut() {
2151 metrics.observe_mesh_peers_score(topic_hash, peer_score);
2152 }
2153
2154 if peer_score < 0.0 {
2155 tracing::debug!(
2156 peer=%peer_id,
2157 score=%peer_score,
2158 topic=%topic_hash,
2159 "HEARTBEAT: Prune peer with negative score"
2160 );
2161
2162 let current_topic = to_prune.entry(*peer_id).or_insert_with(Vec::new);
2163 current_topic.push(topic_hash.clone());
2164 no_px.insert(*peer_id);
2165 to_remove_peers.push(*peer_id);
2166 }
2167 }
2168
2169 #[cfg(feature = "metrics")]
2170 if let Some(m) = self.metrics.as_mut() {
2171 m.peers_removed(topic_hash, Churn::BadScore, to_remove_peers.len())
2172 }
2173
2174 for peer_id in to_remove_peers {
2175 peers.remove(&peer_id);
2176 }
2177
2178 if peers.len() < mesh_n_low {
2180 tracing::debug!(
2181 topic=%topic_hash,
2182 "HEARTBEAT: Mesh low. Topic contains: {} needs: {}",
2183 peers.len(),
2184 self.config.mesh_n()
2185 );
2186 let desired_peers = mesh_n - peers.len();
2188 let peer_list =
2189 get_random_peers(&self.connected_peers, topic_hash, desired_peers, |peer| {
2190 !peers.contains(peer)
2191 && !explicit_peers.contains(peer)
2192 && !backoffs.is_backoff_with_slack(topic_hash, peer)
2193 && scores.get(peer).map(|r| r.score).unwrap_or_default() >= 0.0
2194 });
2195 for peer in &peer_list {
2196 let current_topic = to_graft.entry(*peer).or_insert_with(Vec::new);
2197 current_topic.push(topic_hash.clone());
2198 }
2199 tracing::debug!("Updating mesh, new mesh: {:?}", peer_list);
2201 #[cfg(feature = "metrics")]
2202 if let Some(m) = self.metrics.as_mut() {
2203 m.peers_included(topic_hash, Inclusion::Random, peer_list.len())
2204 }
2205 peers.extend(peer_list);
2206 }
2207
2208 if peers.len() > mesh_n_high {
2210 tracing::debug!(
2211 topic=%topic_hash,
2212 "HEARTBEAT: Mesh high. Topic contains: {} will reduce to: {}",
2213 peers.len(),
2214 self.config.mesh_n()
2215 );
2216 let excess_peer_no = peers.len() - mesh_n;
2217
2218 let mut rng = thread_rng();
2220 let mut shuffled = peers.iter().copied().collect::<Vec<_>>();
2221 shuffled.shuffle(&mut rng);
2222 shuffled.sort_by(|p1, p2| {
2223 let score_p1 = scores.get(p1).map(|r| r.score).unwrap_or_default();
2224 let score_p2 = scores.get(p2).map(|r| r.score).unwrap_or_default();
2225
2226 score_p1.partial_cmp(&score_p2).unwrap_or(Ordering::Equal)
2227 });
2228 shuffled[..peers.len() - self.config.retain_scores()].shuffle(&mut rng);
2230
2231 let mut outbound = shuffled
2233 .iter()
2234 .filter(|peer_id| {
2235 self.connected_peers
2236 .get(peer_id)
2237 .is_some_and(|peer| peer.outbound)
2238 })
2239 .count();
2240
2241 let mut removed = 0;
2244 for peer in shuffled {
2245 if removed == excess_peer_no {
2246 break;
2247 }
2248 if self
2249 .connected_peers
2250 .get(&peer)
2251 .is_some_and(|peer| peer.outbound)
2252 {
2253 if outbound <= mesh_outbound_min {
2254 continue;
2256 }
2257 outbound -= 1;
2259 }
2260
2261 peers.remove(&peer);
2263 let current_topic = to_prune.entry(peer).or_insert_with(Vec::new);
2264 current_topic.push(topic_hash.clone());
2265 removed += 1;
2266 }
2267
2268 #[cfg(feature = "metrics")]
2269 if let Some(m) = self.metrics.as_mut() {
2270 m.peers_removed(topic_hash, Churn::Excess, removed)
2271 }
2272 }
2273
2274 if peers.len() >= mesh_n_low {
2276 let outbound = peers
2278 .iter()
2279 .filter(|peer_id| {
2280 self.connected_peers
2281 .get(peer_id)
2282 .is_some_and(|peer| peer.outbound)
2283 })
2284 .count();
2285
2286 if outbound < mesh_outbound_min {
2288 let needed = mesh_outbound_min - outbound;
2289 let peer_list =
2290 get_random_peers(&self.connected_peers, topic_hash, needed, |peer_id| {
2291 !peers.contains(peer_id)
2292 && !explicit_peers.contains(peer_id)
2293 && !backoffs.is_backoff_with_slack(topic_hash, peer_id)
2294 && scores.get(peer_id).map(|r| r.score).unwrap_or_default() >= 0.0
2295 && self
2296 .connected_peers
2297 .get(peer_id)
2298 .is_some_and(|peer| peer.outbound)
2299 });
2300
2301 for peer in &peer_list {
2302 let current_topic = to_graft.entry(*peer).or_insert_with(Vec::new);
2303 current_topic.push(topic_hash.clone());
2304 }
2305 tracing::debug!("Updating mesh, new mesh: {:?}", peer_list);
2307 #[cfg(feature = "metrics")]
2308 if let Some(m) = self.metrics.as_mut() {
2309 m.peers_included(topic_hash, Inclusion::Outbound, peer_list.len())
2310 }
2311 peers.extend(peer_list);
2312 }
2313 }
2314
2315 if self.heartbeat_ticks % self.config.opportunistic_graft_ticks() == 0
2317 && peers.len() > 1
2318 {
2319 if let PeerScoreState::Active(peer_score) = &self.peer_score {
2320 let mut peers_by_score: Vec<_> = peers.iter().collect();
2330 peers_by_score.sort_by(|p1, p2| {
2331 let p1_score = scores.get(p1).map(|r| r.score).unwrap_or_default();
2332 let p2_score = scores.get(p2).map(|r| r.score).unwrap_or_default();
2333 p1_score.partial_cmp(&p2_score).unwrap_or(Equal)
2334 });
2335
2336 let middle = peers_by_score.len() / 2;
2337 let median = if peers_by_score.len() % 2 == 0 {
2338 let sub_middle_peer = *peers_by_score
2339 .get(middle - 1)
2340 .expect("middle < vector length and middle > 0 since peers.len() > 0");
2341 let sub_middle_score = scores
2342 .get(sub_middle_peer)
2343 .map(|r| r.score)
2344 .unwrap_or_default();
2345 let middle_peer =
2346 *peers_by_score.get(middle).expect("middle < vector length");
2347 let middle_score =
2348 scores.get(middle_peer).map(|r| r.score).unwrap_or_default();
2349
2350 (sub_middle_score + middle_score) * 0.5
2351 } else {
2352 scores
2353 .get(*peers_by_score.get(middle).expect("middle < vector length"))
2354 .map(|r| r.score)
2355 .unwrap_or_default()
2356 };
2357
2358 if median < peer_score.thresholds.opportunistic_graft_threshold {
2361 let peer_list = get_random_peers(
2362 &self.connected_peers,
2363 topic_hash,
2364 self.config.opportunistic_graft_peers(),
2365 |peer_id| {
2366 !peers.contains(peer_id)
2367 && !explicit_peers.contains(peer_id)
2368 && !backoffs.is_backoff_with_slack(topic_hash, peer_id)
2369 && scores.get(peer_id).map(|r| r.score).unwrap_or_default()
2370 > median
2371 },
2372 );
2373 for peer in &peer_list {
2374 let current_topic = to_graft.entry(*peer).or_insert_with(Vec::new);
2375 current_topic.push(topic_hash.clone());
2376 }
2377 tracing::debug!(
2379 topic=%topic_hash,
2380 "Opportunistically graft in topic with peers {:?}",
2381 peer_list
2382 );
2383 #[cfg(feature = "metrics")]
2384 if let Some(m) = self.metrics.as_mut() {
2385 m.peers_included(topic_hash, Inclusion::Random, peer_list.len())
2386 }
2387 peers.extend(peer_list);
2388 }
2389 }
2390 }
2391 #[cfg(feature = "metrics")]
2393 if let Some(m) = self.metrics.as_mut() {
2394 m.set_mesh_peers(topic_hash, peers.len())
2395 }
2396 }
2397
2398 {
2400 let fanout = &mut self.fanout; let fanout_ttl = self.config.fanout_ttl();
2402 self.fanout_last_pub.retain(|topic_hash, last_pub_time| {
2403 if *last_pub_time + fanout_ttl < Instant::now() {
2404 tracing::debug!(
2405 topic=%topic_hash,
2406 "HEARTBEAT: Fanout topic removed due to timeout"
2407 );
2408 fanout.remove(topic_hash);
2409 return false;
2410 }
2411 true
2412 });
2413 }
2414
2415 for (topic_hash, peers) in self.fanout.iter_mut() {
2418 let mut to_remove_peers = Vec::new();
2419 let publish_threshold = match &self.peer_score {
2420 PeerScoreState::Active(peer_score) => peer_score.thresholds.publish_threshold,
2421 _ => 0.0,
2422 };
2423 let mesh_n = self.config.mesh_n_for_topic(topic_hash);
2424
2425 for peer_id in peers.iter() {
2426 let peer_score = scores.get(peer_id).map(|r| r.score).unwrap_or_default();
2428 match self.connected_peers.get(peer_id) {
2429 Some(peer) => {
2430 if !peer.topics.contains(topic_hash) || peer_score < publish_threshold {
2431 tracing::debug!(
2432 topic=%topic_hash,
2433 "HEARTBEAT: Peer removed from fanout for topic"
2434 );
2435 to_remove_peers.push(*peer_id);
2436 }
2437 }
2438 None => {
2439 to_remove_peers.push(*peer_id);
2441 }
2442 }
2443 }
2444 for to_remove in to_remove_peers {
2445 peers.remove(&to_remove);
2446 }
2447
2448 if peers.len() < mesh_n {
2450 tracing::debug!(
2451 "HEARTBEAT: Fanout low. Contains: {:?} needs: {:?}",
2452 peers.len(),
2453 mesh_n
2454 );
2455 let needed_peers = mesh_n - peers.len();
2456 let explicit_peers = &self.explicit_peers;
2457 let new_peers =
2458 get_random_peers(&self.connected_peers, topic_hash, needed_peers, |peer_id| {
2459 !peers.contains(peer_id)
2460 && !explicit_peers.contains(peer_id)
2461 && !self
2462 .peer_score
2463 .below_threshold(peer_id, |ts| ts.publish_threshold)
2464 .0
2465 });
2466 peers.extend(new_peers);
2467 }
2468 }
2469
2470 if let PeerScoreState::Active(peer_score) = &self.peer_score {
2471 tracing::trace!("Mesh message deliveries: {:?}", {
2472 self.mesh
2473 .iter()
2474 .map(|(t, peers)| {
2475 (
2476 t.clone(),
2477 peers
2478 .iter()
2479 .map(|p| {
2480 (*p, peer_score.mesh_message_deliveries(p, t).unwrap_or(0.0))
2481 })
2482 .collect::<HashMap<PeerId, f64>>(),
2483 )
2484 })
2485 .collect::<HashMap<TopicHash, HashMap<PeerId, f64>>>()
2486 })
2487 }
2488
2489 self.emit_gossip();
2490
2491 if !to_graft.is_empty() | !to_prune.is_empty() {
2493 self.send_graft_prune(to_graft, to_prune, no_px);
2494 }
2495
2496 self.mcache.shift();
2498
2499 for (peer_id, failed_messages) in self.failed_messages.drain() {
2501 tracing::debug!("Peer couldn't consume messages: {:?}", failed_messages);
2502 self.events
2503 .push_back(ToSwarm::GenerateEvent(Event::SlowPeer {
2504 peer_id,
2505 failed_messages,
2506 }));
2507 }
2508 self.failed_messages.shrink_to_fit();
2509
2510 for peer in self.connected_peers.values_mut() {
2512 while let Some((_front, instant)) = peer.dont_send.front() {
2513 if (*instant + IDONTWANT_TIMEOUT) >= Instant::now() {
2514 break;
2515 } else {
2516 peer.dont_send.pop_front();
2517 }
2518 }
2519 }
2520
2521 #[cfg(feature = "metrics")]
2522 if let Some(metrics) = self.metrics.as_mut() {
2523 let duration = u64::try_from(start.elapsed().as_millis()).unwrap_or(u64::MAX);
2524 metrics.observe_heartbeat_duration(duration);
2525 }
2526 }
2527
2528 fn emit_gossip(&mut self) {
2531 let mut rng = thread_rng();
2532 let mut messages = Vec::new();
2533 for (topic_hash, peers) in self.mesh.iter().chain(self.fanout.iter()) {
2534 let mut message_ids = self.mcache.get_gossip_message_ids(topic_hash);
2535 if message_ids.is_empty() {
2536 continue;
2537 }
2538
2539 if message_ids.len() > self.config.max_ihave_length() {
2541 tracing::debug!(
2543 "too many messages for gossip; will truncate IHAVE list ({} messages)",
2544 message_ids.len()
2545 );
2546 } else {
2547 message_ids.shuffle(&mut rng);
2549 }
2550
2551 let n_map = |m| {
2553 max(
2554 self.config.gossip_lazy(),
2555 (self.config.gossip_factor() * m as f64) as usize,
2556 )
2557 };
2558 let to_msg_peers =
2560 get_random_peers_dynamic(&self.connected_peers, topic_hash, n_map, |peer| {
2561 !peers.contains(peer)
2562 && !self.explicit_peers.contains(peer)
2563 && !self
2564 .peer_score
2565 .below_threshold(peer, |ts| ts.gossip_threshold)
2566 .0
2567 });
2568
2569 tracing::debug!("Gossiping IHAVE to {} peers", to_msg_peers.len());
2570
2571 for peer_id in to_msg_peers {
2572 let mut peer_message_ids = message_ids.clone();
2573
2574 if peer_message_ids.len() > self.config.max_ihave_length() {
2575 peer_message_ids.partial_shuffle(&mut rng, self.config.max_ihave_length());
2579 peer_message_ids.truncate(self.config.max_ihave_length());
2580 }
2581
2582 messages.push((
2584 peer_id,
2585 RpcOut::IHave(IHave {
2586 topic_hash: topic_hash.clone(),
2587 message_ids: peer_message_ids,
2588 }),
2589 ));
2590 }
2591 }
2592 for (peer_id, message) in messages {
2593 self.send_message(peer_id, message);
2594 }
2595 }
2596
2597 fn send_graft_prune(
2600 &mut self,
2601 to_graft: HashMap<PeerId, Vec<TopicHash>>,
2602 mut to_prune: HashMap<PeerId, Vec<TopicHash>>,
2603 no_px: HashSet<PeerId>,
2604 ) {
2605 for (peer_id, topics) in to_graft.into_iter() {
2607 for topic in &topics {
2608 if let PeerScoreState::Active(peer_score) = &mut self.peer_score {
2610 peer_score.graft(&peer_id, topic.clone());
2611 }
2612
2613 peer_added_to_mesh(
2616 peer_id,
2617 vec![topic],
2618 &self.mesh,
2619 &mut self.events,
2620 &self.connected_peers,
2621 );
2622 }
2623 let rpc_msgs = topics.iter().map(|topic_hash| {
2624 RpcOut::Graft(Graft {
2625 topic_hash: topic_hash.clone(),
2626 })
2627 });
2628
2629 let prune_msgs = to_prune
2636 .remove(&peer_id)
2637 .into_iter()
2638 .flatten()
2639 .map(|topic_hash| {
2640 let prune = self.make_prune(
2641 &topic_hash,
2642 &peer_id,
2643 self.config.do_px() && !no_px.contains(&peer_id),
2644 false,
2645 );
2646 RpcOut::Prune(prune)
2647 });
2648
2649 for msg in rpc_msgs.chain(prune_msgs).collect::<Vec<_>>() {
2651 self.send_message(peer_id, msg);
2652 }
2653 }
2654
2655 for (peer_id, topics) in to_prune.iter() {
2658 for topic_hash in topics {
2659 let prune = self.make_prune(
2660 topic_hash,
2661 peer_id,
2662 self.config.do_px() && !no_px.contains(peer_id),
2663 false,
2664 );
2665 self.send_message(*peer_id, RpcOut::Prune(prune));
2666
2667 peer_removed_from_mesh(
2669 *peer_id,
2670 topic_hash,
2671 &self.mesh,
2672 &mut self.events,
2673 &self.connected_peers,
2674 );
2675 }
2676 }
2677 }
2678
2679 fn forward_msg(
2683 &mut self,
2684 msg_id: &MessageId,
2685 message: RawMessage,
2686 propagation_source: Option<&PeerId>,
2687 originating_peers: HashSet<PeerId>,
2688 ) -> bool {
2689 if let PeerScoreState::Active(peer_score) = &mut self.peer_score {
2691 if let Some(peer) = propagation_source {
2692 peer_score.deliver_message(peer, msg_id, &message.topic);
2693 }
2694 }
2695
2696 tracing::debug!(message_id=%msg_id, "Forwarding message");
2697 let mut recipient_peers = HashSet::new();
2698
2699 for (peer_id, peer) in &self.connected_peers {
2703 if Some(peer_id) != propagation_source
2704 && !originating_peers.contains(peer_id)
2705 && Some(peer_id) != message.source.as_ref()
2706 && peer.topics.contains(&message.topic)
2707 && (self.explicit_peers.contains(peer_id)
2708 || (peer.kind == PeerKind::Floodsub
2709 && !self
2710 .peer_score
2711 .below_threshold(peer_id, |ts| ts.publish_threshold)
2712 .0))
2713 {
2714 recipient_peers.insert(*peer_id);
2715 }
2716 }
2717
2718 let topic = &message.topic;
2720 if let Some(mesh_peers) = self.mesh.get(topic) {
2722 for peer_id in mesh_peers {
2723 if Some(peer_id) != propagation_source
2724 && !originating_peers.contains(peer_id)
2725 && Some(peer_id) != message.source.as_ref()
2726 {
2727 recipient_peers.insert(*peer_id);
2728 }
2729 }
2730 }
2731
2732 if recipient_peers.is_empty() {
2733 return false;
2734 }
2735
2736 for peer_id in recipient_peers.iter() {
2738 if let Some(peer) = self.connected_peers.get_mut(peer_id) {
2739 if peer.dont_send.contains_key(msg_id) {
2740 tracing::debug!(%peer_id, message_id=%msg_id, "Peer doesn't want message");
2741 continue;
2742 }
2743
2744 tracing::debug!(%peer_id, message_id=%msg_id, "Sending message to peer");
2745
2746 self.send_message(
2747 *peer_id,
2748 RpcOut::Forward {
2749 message: message.clone(),
2750 timeout: Delay::new(self.config.forward_queue_duration()),
2751 },
2752 );
2753 }
2754 }
2755 tracing::debug!("Completed forwarding message");
2756 true
2757 }
2758
2759 pub(crate) fn build_raw_message(
2761 &mut self,
2762 topic: TopicHash,
2763 data: Vec<u8>,
2764 ) -> Result<RawMessage, PublishError> {
2765 match &mut self.publish_config {
2766 PublishConfig::Signing {
2767 ref keypair,
2768 author,
2769 inline_key,
2770 last_seq_no,
2771 } => {
2772 let sequence_number = last_seq_no.next();
2773
2774 let signature = {
2775 let message = proto::Message {
2776 from: Some(author.to_bytes()),
2777 data: Some(data.clone()),
2778 seqno: Some(sequence_number.to_be_bytes().to_vec()),
2779 topic: topic.clone().into_string(),
2780 signature: None,
2781 key: None,
2782 };
2783
2784 let mut buf = Vec::with_capacity(message.get_size());
2785 let mut writer = Writer::new(&mut buf);
2786
2787 message
2788 .write_message(&mut writer)
2789 .expect("Encoding to succeed");
2790
2791 let mut signature_bytes = SIGNING_PREFIX.to_vec();
2793 signature_bytes.extend_from_slice(&buf);
2794 Some(keypair.sign(&signature_bytes)?)
2795 };
2796
2797 Ok(RawMessage {
2798 source: Some(*author),
2799 data,
2800 sequence_number: Some(sequence_number),
2803 topic,
2804 signature,
2805 key: inline_key.clone(),
2806 validated: true, })
2808 }
2809 PublishConfig::Author(peer_id) => {
2810 Ok(RawMessage {
2811 source: Some(*peer_id),
2812 data,
2813 sequence_number: Some(rand::random()),
2816 topic,
2817 signature: None,
2818 key: None,
2819 validated: true, })
2821 }
2822 PublishConfig::RandomAuthor => {
2823 Ok(RawMessage {
2824 source: Some(PeerId::random()),
2825 data,
2826 sequence_number: Some(rand::random()),
2829 topic,
2830 signature: None,
2831 key: None,
2832 validated: true, })
2834 }
2835 PublishConfig::Anonymous => {
2836 Ok(RawMessage {
2837 source: None,
2838 data,
2839 sequence_number: None,
2842 topic,
2843 signature: None,
2844 key: None,
2845 validated: true, })
2847 }
2848 }
2849 }
2850
2851 fn send_message(&mut self, peer_id: PeerId, rpc: RpcOut) -> bool {
2858 #[cfg(feature = "metrics")]
2859 if let Some(m) = self.metrics.as_mut() {
2860 if let RpcOut::Publish { ref message, .. } | RpcOut::Forward { ref message, .. } = rpc {
2861 m.msg_sent(&message.topic, message.raw_protobuf_len());
2863 }
2864 }
2865
2866 let Some(peer) = &mut self.connected_peers.get_mut(&peer_id) else {
2867 tracing::error!(peer = %peer_id,
2868 "Could not send rpc to connection handler, peer doesn't exist in connected peer list");
2869 return false;
2870 };
2871
2872 if !matches!(peer.kind, PeerKind::Gossipsubv1_2) && matches!(rpc, RpcOut::IDontWant(..)) {
2873 tracing::trace!(peer=%peer_id, "Won't send IDONTWANT message for message to peer as it doesn't support Gossipsub v1.2");
2874 return false;
2875 }
2876
2877 match peer.sender.send_message(rpc) {
2879 Ok(()) => true,
2880 Err(rpc) => {
2881 tracing::warn!(peer=%peer_id, "Send Queue full. Could not send {:?}.", rpc);
2883
2884 let failed_messages = self.failed_messages.entry(peer_id).or_default();
2886 match rpc {
2887 RpcOut::Publish { .. } => {
2888 failed_messages.priority += 1;
2889 failed_messages.publish += 1;
2890 }
2891 RpcOut::Forward { .. } => {
2892 failed_messages.non_priority += 1;
2893 failed_messages.forward += 1;
2894 }
2895 RpcOut::IWant(_) | RpcOut::IHave(_) | RpcOut::IDontWant(_) => {
2896 failed_messages.non_priority += 1;
2897 }
2898 RpcOut::Graft(_)
2899 | RpcOut::Prune(_)
2900 | RpcOut::Subscribe(_)
2901 | RpcOut::Unsubscribe(_) => {
2902 unreachable!("Channel for highpriority control messages is unbounded and should always be open.")
2903 }
2904 }
2905
2906 if let PeerScoreState::Active(peer_score) = &mut self.peer_score {
2908 peer_score.failed_message_slow_peer(&peer_id);
2909 }
2910
2911 false
2912 }
2913 }
2914 }
2915
2916 fn on_connection_established(
2917 &mut self,
2918 ConnectionEstablished {
2919 peer_id,
2920 endpoint,
2921 other_established,
2922 ..
2923 }: ConnectionEstablished,
2924 ) {
2925 if let PeerScoreState::Active(peer_score) = &mut self.peer_score {
2927 if let Some(ip) = get_ip_addr(endpoint.get_remote_address()) {
2928 peer_score.add_ip(&peer_id, ip);
2929 } else {
2930 tracing::trace!(
2931 peer=%peer_id,
2932 "Couldn't extract ip from endpoint of peer with endpoint {:?}",
2933 endpoint
2934 )
2935 }
2936 }
2937
2938 if other_established > 0 {
2939 return; }
2941
2942 if let PeerScoreState::Active(peer_score) = &mut self.peer_score {
2943 peer_score.add_peer(peer_id);
2944 }
2945
2946 if self.blacklisted_peers.contains(&peer_id) {
2948 tracing::debug!(peer=%peer_id, "Ignoring connection from blacklisted peer");
2949 return;
2950 }
2951
2952 tracing::debug!(peer=%peer_id, "New peer connected");
2953 for topic_hash in self.mesh.clone().into_keys() {
2955 self.send_message(peer_id, RpcOut::Subscribe(topic_hash));
2956 }
2957 }
2958
2959 fn on_connection_closed(
2960 &mut self,
2961 ConnectionClosed {
2962 peer_id,
2963 connection_id,
2964 endpoint,
2965 remaining_established,
2966 ..
2967 }: ConnectionClosed,
2968 ) {
2969 if let PeerScoreState::Active(peer_score) = &mut self.peer_score {
2971 if let Some(ip) = get_ip_addr(endpoint.get_remote_address()) {
2972 peer_score.remove_ip(&peer_id, &ip);
2973 } else {
2974 tracing::trace!(
2975 peer=%peer_id,
2976 "Couldn't extract ip from endpoint of peer with endpoint {:?}",
2977 endpoint
2978 )
2979 }
2980 }
2981
2982 if remaining_established != 0 {
2983 if let Some(peer) = self.connected_peers.get_mut(&peer_id) {
2985 let index = peer
2986 .connections
2987 .iter()
2988 .position(|v| v == &connection_id)
2989 .expect("Previously established connection to peer must be present");
2990 peer.connections.remove(index);
2991
2992 if !peer.connections.is_empty() {
2995 for topic in &peer.topics {
2996 if let Some(mesh_peers) = self.mesh.get(topic) {
2997 if mesh_peers.contains(&peer_id) {
2998 self.events.push_back(ToSwarm::NotifyHandler {
2999 peer_id,
3000 event: HandlerIn::JoinedMesh,
3001 handler: NotifyHandler::One(peer.connections[0]),
3002 });
3003 break;
3004 }
3005 }
3006 }
3007 }
3008 }
3009 } else {
3010 tracing::debug!(peer=%peer_id, "Peer disconnected");
3012 let Some(connected_peer) = self.connected_peers.get(&peer_id) else {
3013 tracing::error!(peer_id = %peer_id, "Peer non-existent when handling disconnection");
3014 return;
3015 };
3016
3017 for topic in &connected_peer.topics {
3019 if let Some(mesh_peers) = self.mesh.get_mut(topic) {
3021 if mesh_peers.remove(&peer_id) {
3023 #[cfg(feature = "metrics")]
3024 if let Some(m) = self.metrics.as_mut() {
3025 m.peers_removed(topic, Churn::Dc, 1);
3026 m.set_mesh_peers(topic, mesh_peers.len());
3027 }
3028 };
3029 }
3030
3031 #[cfg(feature = "metrics")]
3032 if let Some(m) = self.metrics.as_mut() {
3033 m.dec_topic_peers(topic);
3034 }
3035
3036 self.fanout
3038 .get_mut(topic)
3039 .map(|peers| peers.remove(&peer_id));
3040 }
3041
3042 self.px_peers.remove(&peer_id);
3044
3045 #[cfg(feature = "metrics")]
3047 if let Some(metrics) = self.metrics.as_mut() {
3048 metrics.peer_protocol_disconnected(connected_peer.kind);
3049 }
3050
3051 self.connected_peers.remove(&peer_id);
3052
3053 if let PeerScoreState::Active(peer_score) = &mut self.peer_score {
3054 peer_score.remove_peer(&peer_id);
3055 }
3056 }
3057 }
3058
3059 fn on_address_change(
3060 &mut self,
3061 AddressChange {
3062 peer_id,
3063 old: endpoint_old,
3064 new: endpoint_new,
3065 ..
3066 }: AddressChange,
3067 ) {
3068 if let PeerScoreState::Active(peer_score) = &mut self.peer_score {
3070 if let Some(ip) = get_ip_addr(endpoint_old.get_remote_address()) {
3071 peer_score.remove_ip(&peer_id, &ip);
3072 } else {
3073 tracing::trace!(
3074 peer=%&peer_id,
3075 "Couldn't extract ip from endpoint of peer with endpoint {:?}",
3076 endpoint_old
3077 )
3078 }
3079 if let Some(ip) = get_ip_addr(endpoint_new.get_remote_address()) {
3080 peer_score.add_ip(&peer_id, ip);
3081 } else {
3082 tracing::trace!(
3083 peer=%peer_id,
3084 "Couldn't extract ip from endpoint of peer with endpoint {:?}",
3085 endpoint_new
3086 )
3087 }
3088 }
3089 }
3090
3091 #[cfg(feature = "metrics")]
3092 pub fn register_topics_for_metrics(&mut self, topics: Vec<TopicHash>) {
3094 if let Some(metrics) = &mut self.metrics {
3095 metrics.register_allowed_topics(topics);
3096 }
3097 }
3098}
3099
3100fn get_ip_addr(addr: &Multiaddr) -> Option<IpAddr> {
3101 addr.iter().find_map(|p| match p {
3102 Ip4(addr) => Some(IpAddr::V4(addr)),
3103 Ip6(addr) => Some(IpAddr::V6(addr)),
3104 _ => None,
3105 })
3106}
3107
3108impl<C, F> NetworkBehaviour for Behaviour<C, F>
3109where
3110 C: Send + 'static + DataTransform,
3111 F: Send + 'static + TopicSubscriptionFilter,
3112{
3113 type ConnectionHandler = Handler;
3114 type ToSwarm = Event;
3115
3116 fn handle_established_inbound_connection(
3117 &mut self,
3118 connection_id: ConnectionId,
3119 peer_id: PeerId,
3120 _: &Multiaddr,
3121 _: &Multiaddr,
3122 ) -> Result<THandler<Self>, ConnectionDenied> {
3123 let connected_peer = self
3129 .connected_peers
3130 .entry(peer_id)
3131 .or_insert_with(|| PeerDetails {
3132 kind: PeerKind::Floodsub,
3133 connections: vec![],
3134 outbound: false,
3135 sender: Sender::new(self.config.connection_handler_queue_len()),
3136 topics: Default::default(),
3137 dont_send: LinkedHashMap::new(),
3138 });
3139 connected_peer.connections.push(connection_id);
3141
3142 Ok(Handler::new(
3143 self.config.protocol_config(),
3144 connected_peer.sender.new_receiver(),
3145 ))
3146 }
3147
3148 fn handle_established_outbound_connection(
3149 &mut self,
3150 connection_id: ConnectionId,
3151 peer_id: PeerId,
3152 _: &Multiaddr,
3153 _: Endpoint,
3154 _: PortUse,
3155 ) -> Result<THandler<Self>, ConnectionDenied> {
3156 let connected_peer = self
3157 .connected_peers
3158 .entry(peer_id)
3159 .or_insert_with(|| PeerDetails {
3160 kind: PeerKind::Floodsub,
3161 connections: vec![],
3162 outbound: !self.px_peers.contains(&peer_id),
3165 sender: Sender::new(self.config.connection_handler_queue_len()),
3166 topics: Default::default(),
3167 dont_send: LinkedHashMap::new(),
3168 });
3169 connected_peer.connections.push(connection_id);
3171
3172 Ok(Handler::new(
3173 self.config.protocol_config(),
3174 connected_peer.sender.new_receiver(),
3175 ))
3176 }
3177
3178 fn on_connection_handler_event(
3179 &mut self,
3180 propagation_source: PeerId,
3181 _connection_id: ConnectionId,
3182 handler_event: THandlerOutEvent<Self>,
3183 ) {
3184 match handler_event {
3185 HandlerEvent::PeerKind(kind) => {
3186 #[cfg(feature = "metrics")]
3189 if let Some(metrics) = self.metrics.as_mut() {
3190 metrics.peer_protocol_connected(kind);
3191 }
3192
3193 if let PeerKind::NotSupported = kind {
3194 tracing::debug!(
3195 peer=%propagation_source,
3196 "Peer does not support gossipsub protocols"
3197 );
3198 self.events
3199 .push_back(ToSwarm::GenerateEvent(Event::GossipsubNotSupported {
3200 peer_id: propagation_source,
3201 }));
3202 } else if let Some(conn) = self.connected_peers.get_mut(&propagation_source) {
3203 tracing::debug!(
3207 peer=%propagation_source,
3208 peer_type=%kind,
3209 "New peer type found for peer"
3210 );
3211 if let PeerKind::Floodsub = conn.kind {
3212 conn.kind = kind;
3213 }
3214 }
3215 }
3216 HandlerEvent::MessageDropped(rpc) => {
3217 if let PeerScoreState::Active(peer_score) = &mut self.peer_score {
3219 peer_score.failed_message_slow_peer(&propagation_source);
3220 }
3221
3222 let failed_messages = self.failed_messages.entry(propagation_source).or_default();
3224 failed_messages.timeout += 1;
3225 match rpc {
3226 RpcOut::Publish { .. } => {
3227 failed_messages.publish += 1;
3228 }
3229 RpcOut::Forward { .. } => {
3230 failed_messages.forward += 1;
3231 }
3232 _ => {}
3233 }
3234
3235 #[cfg(feature = "metrics")]
3237 if let Some(metrics) = self.metrics.as_mut() {
3238 match rpc {
3239 RpcOut::Publish { message, .. } => {
3240 metrics.publish_msg_dropped(&message.topic);
3241 metrics.timeout_msg_dropped(&message.topic);
3242 }
3243 RpcOut::Forward { message, .. } => {
3244 metrics.forward_msg_dropped(&message.topic);
3245 metrics.timeout_msg_dropped(&message.topic);
3246 }
3247 _ => {}
3248 }
3249 }
3250 }
3251 HandlerEvent::Message {
3252 rpc,
3253 invalid_messages,
3254 } => {
3255 if !rpc.subscriptions.is_empty() {
3260 self.handle_received_subscriptions(&rpc.subscriptions, &propagation_source);
3261 }
3262
3263 if let (true, _) = self
3265 .peer_score
3266 .below_threshold(&propagation_source, |pst| pst.graylist_threshold)
3267 {
3268 tracing::debug!(peer=%propagation_source, "RPC Dropped from greylisted peer");
3269 return;
3270 }
3271
3272 if let PeerScoreState::Active(_) = self.peer_score {
3274 for (raw_message, validation_error) in invalid_messages {
3275 self.handle_invalid_message(
3276 &propagation_source,
3277 &raw_message.topic,
3278 None,
3279 RejectReason::ValidationError(validation_error),
3280 )
3281 }
3282 } else {
3283 for (message, validation_error) in invalid_messages {
3285 tracing::warn!(
3286 peer=%propagation_source,
3287 source=?message.source,
3288 "Invalid message from peer. Reason: {:?}",
3289 validation_error,
3290 );
3291 }
3292 }
3293
3294 for (count, raw_message) in rpc.messages.into_iter().enumerate() {
3296 if self
3298 .config
3299 .max_messages_per_rpc()
3300 .is_some_and(|max_msg| count >= max_msg)
3301 {
3302 tracing::warn!("Received more messages than permitted. Ignoring further messages. Processed: {}", count);
3303 break;
3304 }
3305 self.handle_received_message(raw_message, &propagation_source);
3306 }
3307
3308 let mut ihave_msgs = vec![];
3312 let mut graft_msgs = vec![];
3313 let mut prune_msgs = vec![];
3314 for (count, control_msg) in rpc.control_msgs.into_iter().enumerate() {
3315 if self
3317 .config
3318 .max_messages_per_rpc()
3319 .is_some_and(|max_msg| count >= max_msg)
3320 {
3321 tracing::warn!("Received more control messages than permitted. Ignoring further messages. Processed: {}", count);
3322 break;
3323 }
3324
3325 match control_msg {
3326 ControlAction::IHave(IHave {
3327 topic_hash,
3328 message_ids,
3329 }) => {
3330 ihave_msgs.push((topic_hash, message_ids));
3331 }
3332 ControlAction::IWant(IWant { message_ids }) => {
3333 self.handle_iwant(&propagation_source, message_ids)
3334 }
3335 ControlAction::Graft(Graft { topic_hash }) => graft_msgs.push(topic_hash),
3336 ControlAction::Prune(Prune {
3337 topic_hash,
3338 peers,
3339 backoff,
3340 }) => prune_msgs.push((topic_hash, peers, backoff)),
3341 ControlAction::IDontWant(IDontWant { message_ids }) => {
3342 let Some(peer) = self.connected_peers.get_mut(&propagation_source)
3343 else {
3344 tracing::error!(peer = %propagation_source,
3345 "Could not handle IDONTWANT, peer doesn't exist in connected peer list");
3346 continue;
3347 };
3348 #[cfg(feature = "metrics")]
3349 if let Some(metrics) = self.metrics.as_mut() {
3350 metrics.register_idontwant(message_ids.len());
3351 }
3352 for message_id in message_ids {
3353 peer.dont_send.insert(message_id, Instant::now());
3354 if peer.dont_send.len() > IDONTWANT_CAP {
3356 peer.dont_send.pop_front();
3357 }
3358 }
3359 }
3360 }
3361 }
3362 if !ihave_msgs.is_empty() {
3363 self.handle_ihave(&propagation_source, ihave_msgs);
3364 }
3365 if !graft_msgs.is_empty() {
3366 self.handle_graft(&propagation_source, graft_msgs);
3367 }
3368 if !prune_msgs.is_empty() {
3369 self.handle_prune(&propagation_source, prune_msgs);
3370 }
3371 }
3372 }
3373 }
3374
3375 #[tracing::instrument(level = "trace", name = "NetworkBehaviour::poll", skip(self, cx))]
3376 fn poll(
3377 &mut self,
3378 cx: &mut Context<'_>,
3379 ) -> Poll<ToSwarm<Self::ToSwarm, THandlerInEvent<Self>>> {
3380 if let Some(event) = self.events.pop_front() {
3381 return Poll::Ready(event);
3382 }
3383
3384 if let PeerScoreState::Active(peer_score) = &mut self.peer_score {
3386 if peer_score.decay_interval.poll_unpin(cx).is_ready() {
3387 peer_score.refresh_scores();
3388 peer_score
3389 .decay_interval
3390 .reset(peer_score.params.decay_interval);
3391 }
3392 }
3393
3394 if self.heartbeat.poll_unpin(cx).is_ready() {
3395 self.heartbeat();
3396 self.heartbeat.reset(self.config.heartbeat_interval());
3397 }
3398
3399 Poll::Pending
3400 }
3401
3402 fn on_swarm_event(&mut self, event: FromSwarm) {
3403 match event {
3404 FromSwarm::ConnectionEstablished(connection_established) => {
3405 self.on_connection_established(connection_established)
3406 }
3407 FromSwarm::ConnectionClosed(connection_closed) => {
3408 self.on_connection_closed(connection_closed)
3409 }
3410 FromSwarm::AddressChange(address_change) => self.on_address_change(address_change),
3411 _ => {}
3412 }
3413 }
3414}
3415
3416fn peer_added_to_mesh(
3420 peer_id: PeerId,
3421 new_topics: Vec<&TopicHash>,
3422 mesh: &HashMap<TopicHash, BTreeSet<PeerId>>,
3423 events: &mut VecDeque<ToSwarm<Event, HandlerIn>>,
3424 connections: &HashMap<PeerId, PeerDetails>,
3425) {
3426 let connection_id = match connections.get(&peer_id) {
3428 Some(p) => p
3429 .connections
3430 .first()
3431 .expect("There should be at least one connection to a peer."),
3432 None => {
3433 tracing::error!(peer_id=%peer_id, "Peer not existent when added to the mesh");
3434 return;
3435 }
3436 };
3437
3438 if let Some(peer) = connections.get(&peer_id) {
3439 for topic in &peer.topics {
3440 if !new_topics.contains(&topic) {
3441 if let Some(mesh_peers) = mesh.get(topic) {
3442 if mesh_peers.contains(&peer_id) {
3443 return;
3445 }
3446 }
3447 }
3448 }
3449 }
3450 events.push_back(ToSwarm::NotifyHandler {
3452 peer_id,
3453 event: HandlerIn::JoinedMesh,
3454 handler: NotifyHandler::One(*connection_id),
3455 });
3456}
3457
3458fn peer_removed_from_mesh(
3462 peer_id: PeerId,
3463 old_topic: &TopicHash,
3464 mesh: &HashMap<TopicHash, BTreeSet<PeerId>>,
3465 events: &mut VecDeque<ToSwarm<Event, HandlerIn>>,
3466 connections: &HashMap<PeerId, PeerDetails>,
3467) {
3468 let connection_id = match connections.get(&peer_id) {
3470 Some(p) => p
3471 .connections
3472 .first()
3473 .expect("There should be at least one connection to a peer."),
3474 None => {
3475 tracing::error!(peer_id=%peer_id, "Peer not existent when removed from mesh");
3476 return;
3477 }
3478 };
3479
3480 if let Some(peer) = connections.get(&peer_id) {
3481 for topic in &peer.topics {
3482 if topic != old_topic {
3483 if let Some(mesh_peers) = mesh.get(topic) {
3484 if mesh_peers.contains(&peer_id) {
3485 return;
3487 }
3488 }
3489 }
3490 }
3491 }
3492 events.push_back(ToSwarm::NotifyHandler {
3494 peer_id,
3495 event: HandlerIn::LeftMesh,
3496 handler: NotifyHandler::One(*connection_id),
3497 });
3498}
3499
3500fn get_random_peers_dynamic(
3504 connected_peers: &HashMap<PeerId, PeerDetails>,
3505 topic_hash: &TopicHash,
3506 n_map: impl Fn(usize) -> usize,
3508 mut f: impl FnMut(&PeerId) -> bool,
3509) -> BTreeSet<PeerId> {
3510 let mut gossip_peers = connected_peers
3511 .iter()
3512 .filter(|(_, p)| p.topics.contains(topic_hash))
3513 .filter(|(peer_id, _)| f(peer_id))
3514 .filter(|(_, p)| p.kind.is_gossipsub())
3515 .map(|(peer_id, _)| *peer_id)
3516 .collect::<Vec<PeerId>>();
3517
3518 let n = n_map(gossip_peers.len());
3520 if gossip_peers.len() <= n {
3521 tracing::debug!("RANDOM PEERS: Got {:?} peers", gossip_peers.len());
3522 return gossip_peers.into_iter().collect();
3523 }
3524
3525 let mut rng = thread_rng();
3527 gossip_peers.partial_shuffle(&mut rng, n);
3528
3529 tracing::debug!("RANDOM PEERS: Got {:?} peers", n);
3530
3531 gossip_peers.into_iter().take(n).collect()
3532}
3533
3534fn get_random_peers(
3537 connected_peers: &HashMap<PeerId, PeerDetails>,
3538 topic_hash: &TopicHash,
3539 n: usize,
3540 f: impl FnMut(&PeerId) -> bool,
3541) -> BTreeSet<PeerId> {
3542 get_random_peers_dynamic(connected_peers, topic_hash, |_| n, f)
3543}
3544
3545fn validate_config(
3548 authenticity: &MessageAuthenticity,
3549 validation_mode: &ValidationMode,
3550) -> Result<(), &'static str> {
3551 match validation_mode {
3552 ValidationMode::Anonymous => {
3553 if authenticity.is_signing() {
3554 return Err("Cannot enable message signing with an Anonymous validation mode. Consider changing either the ValidationMode or MessageAuthenticity");
3555 }
3556
3557 if !authenticity.is_anonymous() {
3558 return Err("Published messages contain an author but incoming messages with an author will be rejected. Consider adjusting the validation or privacy settings in the config");
3559 }
3560 }
3561 ValidationMode::Strict => {
3562 if !authenticity.is_signing() {
3563 return Err(
3564 "Messages will be
3565 published unsigned and incoming unsigned messages will be rejected. Consider adjusting
3566 the validation or privacy settings in the config"
3567 );
3568 }
3569 }
3570 _ => {}
3571 }
3572 Ok(())
3573}
3574
3575impl<C: DataTransform, F: TopicSubscriptionFilter> fmt::Debug for Behaviour<C, F> {
3576 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
3577 f.debug_struct("Behaviour")
3578 .field("config", &self.config)
3579 .field("events", &self.events.len())
3580 .field("publish_config", &self.publish_config)
3581 .field("mesh", &self.mesh)
3582 .field("fanout", &self.fanout)
3583 .field("fanout_last_pub", &self.fanout_last_pub)
3584 .field("mcache", &self.mcache)
3585 .field("heartbeat", &self.heartbeat)
3586 .finish()
3587 }
3588}
3589
3590impl fmt::Debug for PublishConfig {
3591 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
3592 match self {
3593 PublishConfig::Signing { author, .. } => {
3594 f.write_fmt(format_args!("PublishConfig::Signing({author})"))
3595 }
3596 PublishConfig::Author(author) => {
3597 f.write_fmt(format_args!("PublishConfig::Author({author})"))
3598 }
3599 PublishConfig::RandomAuthor => f.write_fmt(format_args!("PublishConfig::RandomAuthor")),
3600 PublishConfig::Anonymous => f.write_fmt(format_args!("PublishConfig::Anonymous")),
3601 }
3602 }
3603}