1use std::{
22 cmp::{
23 max,
24 Ordering::{self, Equal},
25 },
26 collections::{BTreeSet, HashMap, HashSet, VecDeque},
27 fmt::{self, Debug},
28 net::IpAddr,
29 task::{Context, Poll},
30 time::Duration,
31};
32
33use futures::FutureExt;
34use futures_timer::Delay;
35use hashlink::LinkedHashMap;
36use libp2p_core::{
37 multiaddr::Protocol::{Ip4, Ip6},
38 transport::PortUse,
39 Endpoint, Multiaddr,
40};
41use libp2p_identity::{Keypair, PeerId};
42use libp2p_swarm::{
43 behaviour::{AddressChange, ConnectionClosed, ConnectionEstablished, FromSwarm},
44 dial_opts::DialOpts,
45 ConnectionDenied, ConnectionId, NetworkBehaviour, NotifyHandler, THandler, THandlerInEvent,
46 THandlerOutEvent, ToSwarm,
47};
48#[cfg(feature = "metrics")]
49use prometheus_client::registry::Registry;
50use quick_protobuf::{MessageWrite, Writer};
51use rand::{seq::SliceRandom, thread_rng};
52use web_time::{Instant, SystemTime};
53
54#[cfg(feature = "metrics")]
55use crate::metrics::{Churn, Config as MetricsConfig, Inclusion, Metrics, Penalty};
56use crate::{
57 backoff::BackoffStorage,
58 config::{Config, ValidationMode},
59 gossip_promises::GossipPromises,
60 handler::{Handler, HandlerEvent, HandlerIn},
61 mcache::MessageCache,
62 peer_score::{PeerScore, PeerScoreParams, PeerScoreState, PeerScoreThresholds, RejectReason},
63 protocol::SIGNING_PREFIX,
64 rpc::Sender,
65 rpc_proto::proto,
66 subscription_filter::{AllowAllSubscriptionFilter, TopicSubscriptionFilter},
67 time_cache::DuplicateCache,
68 topic::{Hasher, Topic, TopicHash},
69 transform::{DataTransform, IdentityTransform},
70 types::{
71 ControlAction, Graft, IDontWant, IHave, IWant, Message, MessageAcceptance, MessageId,
72 PeerDetails, PeerInfo, PeerKind, Prune, RawMessage, RpcOut, Subscription,
73 SubscriptionAction,
74 },
75 FailedMessages, PublishError, SubscriptionError, TopicScoreParams, ValidationError,
76};
77
78#[cfg(test)]
79mod tests;
80
/// Size limit (10 000) associated with IDONTWANT bookkeeping.
///
/// NOTE(review): the usage site is outside this excerpt; presumably this caps how many
/// IDONTWANT message ids are remembered per peer — confirm where it is consumed.
const IDONTWANT_CAP: usize = 10_000;

/// Three-second timeout associated with IDONTWANT bookkeeping.
///
/// NOTE(review): the usage site is outside this excerpt; presumably this is how long a
/// remembered IDONTWANT entry is kept — confirm where it is consumed.
const IDONTWANT_TIMEOUT: Duration = Duration::from_secs(3);
86
87#[derive(Clone)]
94pub enum MessageAuthenticity {
95 Signed(Keypair),
98 Author(PeerId),
103 RandomAuthor,
108 Anonymous,
118}
119
120impl MessageAuthenticity {
121 pub fn is_signing(&self) -> bool {
123 matches!(self, MessageAuthenticity::Signed(_))
124 }
125
126 pub fn is_anonymous(&self) -> bool {
127 matches!(self, MessageAuthenticity::Anonymous)
128 }
129}
130
131#[derive(Debug)]
133pub enum Event {
134 Message {
136 propagation_source: PeerId,
138 message_id: MessageId,
141 message: Message,
143 },
144 Subscribed {
146 peer_id: PeerId,
148 topic: TopicHash,
150 },
151 Unsubscribed {
153 peer_id: PeerId,
155 topic: TopicHash,
157 },
158 GossipsubNotSupported { peer_id: PeerId },
160 SlowPeer {
162 peer_id: PeerId,
164 failed_messages: FailedMessages,
166 },
167}
168
169#[allow(clippy::large_enum_variant)]
172enum PublishConfig {
173 Signing {
174 keypair: Keypair,
175 author: PeerId,
176 inline_key: Option<Vec<u8>>,
177 last_seq_no: SequenceNumber,
178 },
179 Author(PeerId),
180 RandomAuthor,
181 Anonymous,
182}
183
/// A strictly increasing `u64` counter used for message sequence numbers.
#[derive(Debug)]
struct SequenceNumber(u64);

impl SequenceNumber {
    /// Seeds the counter with the current time, in nanoseconds since the Unix epoch.
    ///
    /// # Panics
    ///
    /// Panics if the system clock reports a time earlier than the Unix epoch.
    fn new() -> Self {
        let since_epoch = SystemTime::now()
            .duration_since(SystemTime::UNIX_EPOCH)
            .expect("time to be linear");
        let nanos = since_epoch.as_nanos();

        // `as_nanos` yields a `u128`; only the low 64 bits are kept.
        Self(nanos as u64)
    }

    /// Advances the counter by one and returns the new value.
    ///
    /// # Panics
    ///
    /// Panics if the counter would overflow `u64`.
    fn next(&mut self) -> u64 {
        let incremented = self
            .0
            .checked_add(1)
            .expect("to not exhaust u64 space for sequence numbers");
        self.0 = incremented;

        incremented
    }
}
209
210impl PublishConfig {
211 pub(crate) fn get_own_id(&self) -> Option<&PeerId> {
212 match self {
213 Self::Signing { author, .. } => Some(author),
214 Self::Author(author) => Some(author),
215 _ => None,
216 }
217 }
218}
219
220impl From<MessageAuthenticity> for PublishConfig {
221 fn from(authenticity: MessageAuthenticity) -> Self {
222 match authenticity {
223 MessageAuthenticity::Signed(keypair) => {
224 let public_key = keypair.public();
225 let key_enc = public_key.encode_protobuf();
226 let key = if key_enc.len() <= 42 {
227 None
231 } else {
232 Some(key_enc)
234 };
235
236 PublishConfig::Signing {
237 keypair,
238 author: public_key.to_peer_id(),
239 inline_key: key,
240 last_seq_no: SequenceNumber::new(),
241 }
242 }
243 MessageAuthenticity::Author(peer_id) => PublishConfig::Author(peer_id),
244 MessageAuthenticity::RandomAuthor => PublishConfig::RandomAuthor,
245 MessageAuthenticity::Anonymous => PublishConfig::Anonymous,
246 }
247 }
248}
249
250pub struct Behaviour<D = IdentityTransform, F = AllowAllSubscriptionFilter> {
262 config: Config,
264
265 events: VecDeque<ToSwarm<Event, HandlerIn>>,
267
268 publish_config: PublishConfig,
270
271 duplicate_cache: DuplicateCache<MessageId>,
274
275 connected_peers: HashMap<PeerId, PeerDetails>,
278
279 explicit_peers: HashSet<PeerId>,
282
283 blacklisted_peers: HashSet<PeerId>,
286
287 mesh: HashMap<TopicHash, BTreeSet<PeerId>>,
289
290 fanout: HashMap<TopicHash, BTreeSet<PeerId>>,
292
293 fanout_last_pub: HashMap<TopicHash, Instant>,
295
296 backoffs: BackoffStorage,
298
299 mcache: MessageCache,
301
302 heartbeat: Delay,
304
305 heartbeat_ticks: u64,
308
309 px_peers: HashSet<PeerId>,
314
315 peer_score: PeerScoreState,
318
319 count_received_ihave: HashMap<PeerId, usize>,
321
322 count_sent_iwant: HashMap<PeerId, usize>,
324
325 published_message_ids: DuplicateCache<MessageId>,
328
329 subscription_filter: F,
331
332 data_transform: D,
336
337 #[cfg(feature = "metrics")]
339 metrics: Option<Metrics>,
340
341 failed_messages: HashMap<PeerId, FailedMessages>,
343
344 gossip_promises: GossipPromises,
346}
347
348impl<D, F> Behaviour<D, F>
349where
350 D: DataTransform + Default,
351 F: TopicSubscriptionFilter + Default,
352{
353 pub fn new(privacy: MessageAuthenticity, config: Config) -> Result<Self, &'static str> {
356 Self::new_with_subscription_filter_and_transform(
357 privacy,
358 config,
359 F::default(),
360 D::default(),
361 )
362 }
363
364 #[cfg(feature = "metrics")]
367 pub fn with_metrics(
368 mut self,
369 metrics_registry: &mut Registry,
370 metrics_config: MetricsConfig,
371 ) -> Self {
372 self.metrics = Some(Metrics::new(metrics_registry, metrics_config));
373 self
374 }
375}
376
377impl<D, F> Behaviour<D, F>
378where
379 D: DataTransform + Default,
380 F: TopicSubscriptionFilter,
381{
382 pub fn new_with_subscription_filter(
385 privacy: MessageAuthenticity,
386 config: Config,
387 subscription_filter: F,
388 ) -> Result<Self, &'static str> {
389 Self::new_with_subscription_filter_and_transform(
390 privacy,
391 config,
392 subscription_filter,
393 D::default(),
394 )
395 }
396}
397
398impl<D, F> Behaviour<D, F>
399where
400 D: DataTransform,
401 F: TopicSubscriptionFilter + Default,
402{
403 pub fn new_with_transform(
407 privacy: MessageAuthenticity,
408 config: Config,
409 data_transform: D,
410 ) -> Result<Self, &'static str> {
411 Self::new_with_subscription_filter_and_transform(
412 privacy,
413 config,
414 F::default(),
415 data_transform,
416 )
417 }
418}
419
420impl<D, F> Behaviour<D, F>
421where
422 D: DataTransform,
423 F: TopicSubscriptionFilter,
424{
425 pub fn new_with_subscription_filter_and_transform(
429 privacy: MessageAuthenticity,
430 config: Config,
431 subscription_filter: F,
432 data_transform: D,
433 ) -> Result<Self, &'static str> {
434 validate_config(&privacy, config.validation_mode())?;
439
440 Ok(Behaviour {
441 #[cfg(feature = "metrics")]
442 metrics: None,
443 events: VecDeque::new(),
444 publish_config: privacy.into(),
445 duplicate_cache: DuplicateCache::new(config.duplicate_cache_time()),
446 explicit_peers: HashSet::new(),
447 blacklisted_peers: HashSet::new(),
448 mesh: HashMap::new(),
449 fanout: HashMap::new(),
450 fanout_last_pub: HashMap::new(),
451 backoffs: BackoffStorage::new(
452 &config.prune_backoff(),
453 config.heartbeat_interval(),
454 config.backoff_slack(),
455 ),
456 mcache: MessageCache::new(config.history_gossip(), config.history_length()),
457 heartbeat: Delay::new(config.heartbeat_interval() + config.heartbeat_initial_delay()),
458 heartbeat_ticks: 0,
459 px_peers: HashSet::new(),
460 peer_score: PeerScoreState::Disabled,
461 count_received_ihave: HashMap::new(),
462 count_sent_iwant: HashMap::new(),
463 connected_peers: HashMap::new(),
464 published_message_ids: DuplicateCache::new(config.published_message_ids_cache_time()),
465 config,
466 subscription_filter,
467 data_transform,
468 failed_messages: Default::default(),
469 gossip_promises: Default::default(),
470 })
471 }
472}
473
474impl<D, F> Behaviour<D, F>
475where
476 D: DataTransform + Send + 'static,
477 F: TopicSubscriptionFilter + Send + 'static,
478{
479 pub fn topics(&self) -> impl Iterator<Item = &TopicHash> {
481 self.mesh.keys()
482 }
483
484 pub fn mesh_peers(&self, topic_hash: &TopicHash) -> impl Iterator<Item = &PeerId> {
486 self.mesh.get(topic_hash).into_iter().flat_map(|x| x.iter())
487 }
488
489 pub fn all_mesh_peers(&self) -> impl Iterator<Item = &PeerId> {
490 let mut res = BTreeSet::new();
491 for peers in self.mesh.values() {
492 res.extend(peers);
493 }
494 res.into_iter()
495 }
496
497 pub fn all_peers(&self) -> impl Iterator<Item = (&PeerId, Vec<&TopicHash>)> {
499 self.connected_peers
500 .iter()
501 .map(|(peer_id, peer)| (peer_id, peer.topics.iter().collect()))
502 }
503
504 pub fn peer_protocol(&self) -> impl Iterator<Item = (&PeerId, &PeerKind)> {
506 self.connected_peers.iter().map(|(k, v)| (k, &v.kind))
507 }
508
509 pub fn peer_score(&self, peer_id: &PeerId) -> Option<f64> {
511 match &self.peer_score {
512 PeerScoreState::Active(peer_score) => Some(peer_score.score_report(peer_id).score),
513 PeerScoreState::Disabled => None,
514 }
515 }
516
517 pub fn subscribe<H: Hasher>(&mut self, topic: &Topic<H>) -> Result<bool, SubscriptionError> {
522 tracing::debug!(%topic, "Subscribing to topic");
523 let topic_hash = topic.hash();
524 if !self.subscription_filter.can_subscribe(&topic_hash) {
525 return Err(SubscriptionError::NotAllowed);
526 }
527
528 if self.mesh.contains_key(&topic_hash) {
529 tracing::debug!(%topic, "Topic is already in the mesh");
530 return Ok(false);
531 }
532
533 for peer_id in self.connected_peers.keys().copied().collect::<Vec<_>>() {
535 tracing::debug!(%peer_id, "Sending SUBSCRIBE to peer");
536 let event = RpcOut::Subscribe(topic_hash.clone());
537 self.send_message(peer_id, event);
538 }
539
540 self.join(&topic_hash);
543 tracing::debug!(%topic, "Subscribed to topic");
544 Ok(true)
545 }
546
547 pub fn unsubscribe<H: Hasher>(&mut self, topic: &Topic<H>) -> bool {
551 tracing::debug!(%topic, "Unsubscribing from topic");
552 let topic_hash = topic.hash();
553
554 if !self.mesh.contains_key(&topic_hash) {
555 tracing::debug!(topic=%topic_hash, "Already unsubscribed from topic");
556 return false;
558 }
559
560 for peer in self.connected_peers.keys().copied().collect::<Vec<_>>() {
562 tracing::debug!(%peer, "Sending UNSUBSCRIBE to peer");
563 let event = RpcOut::Unsubscribe(topic_hash.clone());
564 self.send_message(peer, event);
565 }
566
567 self.leave(&topic_hash);
570
571 tracing::debug!(topic=%topic_hash, "Unsubscribed from topic");
572 true
573 }
574
575 pub fn publish(
577 &mut self,
578 topic: impl Into<TopicHash>,
579 data: impl Into<Vec<u8>>,
580 ) -> Result<MessageId, PublishError> {
581 let data = data.into();
582 let topic = topic.into();
583
584 let transformed_data = self
586 .data_transform
587 .outbound_transform(&topic.clone(), data.clone())?;
588
589 let max_transmit_size_for_topic = self
590 .config
591 .protocol_config()
592 .max_transmit_size_for_topic(&topic);
593
594 if transformed_data.len() > max_transmit_size_for_topic {
596 return Err(PublishError::MessageTooLarge);
597 }
598
599 let mesh_n = self.config.mesh_n_for_topic(&topic);
600 let raw_message = self.build_raw_message(topic, transformed_data)?;
601
602 let msg_id = self.config.message_id(&Message {
604 source: raw_message.source,
605 data, sequence_number: raw_message.sequence_number,
607 topic: raw_message.topic.clone(),
608 });
609
610 if self.duplicate_cache.contains(&msg_id) {
612 tracing::warn!(
615 message_id=%msg_id,
616 "Not publishing a message that has already been published"
617 );
618 return Err(PublishError::Duplicate);
619 }
620
621 tracing::trace!(message_id=%msg_id, "Publishing message");
622
623 let topic_hash = raw_message.topic.clone();
624
625 let mut peers_on_topic = self
626 .connected_peers
627 .iter()
628 .filter(|(_, p)| p.topics.contains(&topic_hash))
629 .map(|(peer_id, _)| peer_id)
630 .peekable();
631
632 if peers_on_topic.peek().is_none() {
633 return Err(PublishError::NoPeersSubscribedToTopic);
634 }
635
636 let mut recipient_peers = HashSet::new();
637 if self.config.flood_publish() {
638 recipient_peers.extend(peers_on_topic.filter(|p| {
640 self.explicit_peers.contains(*p)
641 || !self
642 .peer_score
643 .below_threshold(p, |ts| ts.publish_threshold)
644 .0
645 }));
646 } else {
647 match self.mesh.get(&topic_hash) {
648 Some(mesh_peers) => {
650 let needed_extra_peers = mesh_n.saturating_sub(mesh_peers.len());
653
654 if needed_extra_peers > 0 {
655 let peer_list = get_random_peers(
660 &self.connected_peers,
661 &topic_hash,
662 needed_extra_peers,
663 |peer| {
664 !mesh_peers.contains(peer)
665 && !self.explicit_peers.contains(peer)
666 && !self
667 .peer_score
668 .below_threshold(peer, |ts| ts.publish_threshold)
669 .0
670 },
671 );
672 recipient_peers.extend(peer_list);
673 }
674
675 recipient_peers.extend(mesh_peers);
676 }
677 None => {
679 tracing::debug!(topic=%topic_hash, "Topic not in the mesh");
680 let fanout_peers = self
682 .fanout
683 .get(&topic_hash)
684 .filter(|peers| !peers.is_empty());
685 if let Some(peers) = fanout_peers {
687 for peer in peers {
688 recipient_peers.insert(*peer);
689 }
690 } else {
691 let new_peers =
693 get_random_peers(&self.connected_peers, &topic_hash, mesh_n, {
694 |p| {
695 !self.explicit_peers.contains(p)
696 && !self
697 .peer_score
698 .below_threshold(p, |ts| ts.publish_threshold)
699 .0
700 }
701 });
702 self.fanout.insert(topic_hash.clone(), new_peers.clone());
704 for peer in new_peers {
705 tracing::debug!(%peer, "Peer added to fanout");
706 recipient_peers.insert(peer);
707 }
708 }
709 self.fanout_last_pub
711 .insert(topic_hash.clone(), Instant::now());
712 }
713 }
714
715 recipient_peers
717 .extend(peers_on_topic.filter(|peer_id| self.explicit_peers.contains(peer_id)));
718
719 for (peer, connections) in &self.connected_peers {
721 if connections.kind == PeerKind::Floodsub
722 && connections.topics.contains(&topic_hash)
723 && !self
724 .peer_score
725 .below_threshold(peer, |ts| ts.publish_threshold)
726 .0
727 {
728 recipient_peers.insert(*peer);
729 }
730 }
731 }
732
733 self.duplicate_cache.insert(msg_id.clone());
736 self.mcache.put(&msg_id, raw_message.clone());
737
738 self.gossip_promises.message_delivered(&msg_id);
740
741 if let PublishConfig::RandomAuthor | PublishConfig::Anonymous = self.publish_config {
744 if !self.config.allow_self_origin() {
745 self.published_message_ids.insert(msg_id.clone());
746 }
747 }
748
749 let mut publish_failed = true;
751 for peer_id in recipient_peers.iter() {
752 tracing::trace!(peer=%peer_id, "Sending message to peer");
753 if raw_message.raw_protobuf_len() > self.config.idontwant_message_size_threshold()
756 && self.config.idontwant_on_publish()
757 {
758 self.send_message(
759 *peer_id,
760 RpcOut::IDontWant(IDontWant {
761 message_ids: vec![msg_id.clone()],
762 }),
763 );
764 }
765
766 if self.send_message(
767 *peer_id,
768 RpcOut::Publish {
769 message: raw_message.clone(),
770 timeout: Delay::new(self.config.publish_queue_duration()),
771 },
772 ) {
773 publish_failed = false
774 }
775 }
776
777 if recipient_peers.is_empty() {
778 return Err(PublishError::NoPeersSubscribedToTopic);
779 }
780
781 if publish_failed {
782 return Err(PublishError::AllQueuesFull(recipient_peers.len()));
783 }
784
785 tracing::debug!(message_id=%msg_id, "Published message");
786
787 #[cfg(feature = "metrics")]
788 if let Some(metrics) = self.metrics.as_mut() {
789 metrics.register_published_message(&topic_hash);
790 }
791
792 Ok(msg_id)
793 }
794
795 pub fn report_message_validation_result(
815 &mut self,
816 msg_id: &MessageId,
817 propagation_source: &PeerId,
818 acceptance: MessageAcceptance,
819 ) -> bool {
820 let reject_reason = match acceptance {
821 MessageAcceptance::Accept => {
822 let (raw_message, originating_peers) = match self.mcache.validate(msg_id) {
823 Some((raw_message, originating_peers)) => {
824 (raw_message.clone(), originating_peers)
825 }
826 None => {
827 tracing::warn!(
828 message_id=%msg_id,
829 "Message not in cache. Ignoring forwarding"
830 );
831 #[cfg(feature = "metrics")]
832 if let Some(metrics) = self.metrics.as_mut() {
833 metrics.memcache_miss();
834 }
835 return false;
836 }
837 };
838
839 #[cfg(feature = "metrics")]
840 if let Some(metrics) = self.metrics.as_mut() {
841 metrics.register_msg_validation(&raw_message.topic, &acceptance);
842 }
843
844 self.forward_msg(
845 msg_id,
846 raw_message,
847 Some(propagation_source),
848 originating_peers,
849 );
850 return true;
851 }
852 MessageAcceptance::Reject => RejectReason::ValidationFailed,
853 MessageAcceptance::Ignore => RejectReason::ValidationIgnored,
854 };
855
856 if let Some((raw_message, originating_peers)) = self.mcache.remove(msg_id) {
857 #[cfg(feature = "metrics")]
858 if let Some(metrics) = self.metrics.as_mut() {
859 metrics.register_msg_validation(&raw_message.topic, &acceptance);
860 }
861
862 if let PeerScoreState::Active(peer_score) = &mut self.peer_score {
865 peer_score.reject_message(
866 propagation_source,
867 msg_id,
868 &raw_message.topic,
869 reject_reason,
870 );
871 for peer in originating_peers.iter() {
872 peer_score.reject_message(peer, msg_id, &raw_message.topic, reject_reason);
873 }
874 }
875 true
876 } else {
877 tracing::warn!(message_id=%msg_id, "Rejected message not in cache");
878 false
879 }
880 }
881
882 pub fn add_explicit_peer(&mut self, peer_id: &PeerId) {
884 tracing::debug!(peer=%peer_id, "Adding explicit peer");
885
886 self.explicit_peers.insert(*peer_id);
887
888 self.check_explicit_peer_connection(peer_id);
889 }
890
891 pub fn remove_explicit_peer(&mut self, peer_id: &PeerId) {
894 tracing::debug!(peer=%peer_id, "Removing explicit peer");
895 self.explicit_peers.remove(peer_id);
896 }
897
898 pub fn blacklist_peer(&mut self, peer_id: &PeerId) {
901 if self.blacklisted_peers.insert(*peer_id) {
902 tracing::debug!(peer=%peer_id, "Peer has been blacklisted");
903 }
904 }
905
906 pub fn remove_blacklisted_peer(&mut self, peer_id: &PeerId) {
908 if self.blacklisted_peers.remove(peer_id) {
909 tracing::debug!(peer=%peer_id, "Peer has been removed from the blacklist");
910 }
911 }
912
913 pub fn with_peer_score(
917 &mut self,
918 params: PeerScoreParams,
919 threshold: PeerScoreThresholds,
920 ) -> Result<(), String> {
921 self.with_peer_score_and_message_delivery_time_callback(params, threshold, None)
922 }
923
924 pub fn with_peer_score_and_message_delivery_time_callback(
927 &mut self,
928 params: PeerScoreParams,
929 thresholds: PeerScoreThresholds,
930 callback: Option<fn(&PeerId, &TopicHash, f64)>,
931 ) -> Result<(), String> {
932 params.validate()?;
933 thresholds.validate()?;
934
935 if let PeerScoreState::Active(_) = self.peer_score {
936 return Err("Peer score set twice".into());
937 }
938
939 let peer_score =
940 PeerScore::new_with_message_delivery_time_callback(params, thresholds, callback);
941 self.peer_score = PeerScoreState::Active(Box::new(peer_score));
942 Ok(())
943 }
944
945 pub fn set_topic_params<H: Hasher>(
949 &mut self,
950 topic: Topic<H>,
951 params: TopicScoreParams,
952 ) -> Result<(), &'static str> {
953 if let PeerScoreState::Active(peer_score) = &mut self.peer_score {
954 peer_score.set_topic_params(topic.hash(), params);
955 Ok(())
956 } else {
957 Err("Peer score must be initialised with `with_peer_score()`")
958 }
959 }
960
961 pub fn get_topic_params<H: Hasher>(&self, topic: &Topic<H>) -> Option<&TopicScoreParams> {
963 match &self.peer_score {
964 PeerScoreState::Active(peer_score) => peer_score.get_topic_params(&topic.hash()),
965 PeerScoreState::Disabled => None,
966 }
967 }
968
969 pub fn set_application_score(&mut self, peer_id: &PeerId, new_score: f64) -> bool {
972 if let PeerScoreState::Active(peer_score) = &mut self.peer_score {
973 peer_score.set_application_score(peer_id, new_score)
974 } else {
975 false
976 }
977 }
978
979 fn join(&mut self, topic_hash: &TopicHash) {
981 tracing::debug!(topic=%topic_hash, "Running JOIN for topic");
982
983 let mut added_peers = HashSet::new();
984 let mesh_n = self.config.mesh_n_for_topic(topic_hash);
985 #[cfg(feature = "metrics")]
986 if let Some(m) = self.metrics.as_mut() {
987 m.joined(topic_hash)
988 }
989
990 if let Some((_, mut peers)) = self.fanout.remove_entry(topic_hash) {
993 tracing::debug!(
994 topic=%topic_hash,
995 "JOIN: Removing peers from the fanout for topic"
996 );
997
998 peers.retain(|p| {
1000 !self.explicit_peers.contains(p)
1001 && !self.peer_score.below_threshold(p, |_| 0.0).0
1002 && !self.backoffs.is_backoff_with_slack(topic_hash, p)
1003 });
1004
1005 let add_peers = std::cmp::min(peers.len(), mesh_n);
1008 tracing::debug!(
1009 topic=%topic_hash,
1010 "JOIN: Adding {:?} peers from the fanout for topic",
1011 add_peers
1012 );
1013 added_peers.extend(peers.iter().take(add_peers));
1014
1015 self.mesh.insert(
1016 topic_hash.clone(),
1017 peers.into_iter().take(add_peers).collect(),
1018 );
1019
1020 self.fanout_last_pub.remove(topic_hash);
1022 }
1023
1024 #[cfg(feature = "metrics")]
1025 if let Some(m) = self.metrics.as_mut() {
1026 m.peers_included(topic_hash, Inclusion::Fanout, added_peers.len())
1027 }
1028
1029 if added_peers.len() < mesh_n {
1031 let random_added = get_random_peers(
1033 &self.connected_peers,
1034 topic_hash,
1035 mesh_n - added_peers.len(),
1036 |peer| {
1037 !added_peers.contains(peer)
1038 && !self.explicit_peers.contains(peer)
1039 && !self.peer_score.below_threshold(peer, |_| 0.0).0
1040 && !self.backoffs.is_backoff_with_slack(topic_hash, peer)
1041 },
1042 );
1043
1044 added_peers.extend(random_added.clone());
1045 tracing::debug!(
1047 "JOIN: Inserting {:?} random peers into the mesh",
1048 random_added.len()
1049 );
1050
1051 #[cfg(feature = "metrics")]
1052 if let Some(m) = self.metrics.as_mut() {
1053 m.peers_included(topic_hash, Inclusion::Random, random_added.len())
1054 }
1055
1056 let mesh_peers = self.mesh.entry(topic_hash.clone()).or_default();
1057 mesh_peers.extend(random_added);
1058 }
1059
1060 for peer_id in added_peers {
1061 tracing::debug!(peer=%peer_id, "JOIN: Sending Graft message to peer");
1063 if let PeerScoreState::Active(peer_score) = &mut self.peer_score {
1064 peer_score.graft(&peer_id, topic_hash.clone());
1065 }
1066 self.send_message(
1067 peer_id,
1068 RpcOut::Graft(Graft {
1069 topic_hash: topic_hash.clone(),
1070 }),
1071 );
1072
1073 peer_added_to_mesh(
1075 peer_id,
1076 vec![topic_hash],
1077 &self.mesh,
1078 &mut self.events,
1079 &self.connected_peers,
1080 );
1081 }
1082
1083 #[cfg(feature = "metrics")]
1084 {
1085 let mesh_peers = self.mesh_peers(topic_hash).count();
1086 if let Some(m) = self.metrics.as_mut() {
1087 m.set_mesh_peers(topic_hash, mesh_peers)
1088 }
1089 }
1090
1091 tracing::debug!(topic=%topic_hash, "Completed JOIN for topic");
1092 }
1093
1094 fn make_prune(
1096 &mut self,
1097 topic_hash: &TopicHash,
1098 peer: &PeerId,
1099 do_px: bool,
1100 on_unsubscribe: bool,
1101 ) -> Prune {
1102 if let PeerScoreState::Active(peer_score) = &mut self.peer_score {
1103 peer_score.prune(peer, topic_hash.clone());
1104 }
1105
1106 match self.connected_peers.get(peer).map(|v| &v.kind) {
1107 Some(PeerKind::Floodsub) => {
1108 tracing::error!("Attempted to prune a Floodsub peer");
1109 }
1110 Some(PeerKind::Gossipsub) => {
1111 return Prune {
1113 topic_hash: topic_hash.clone(),
1114 peers: Vec::new(),
1115 backoff: None,
1116 };
1117 }
1118 None => {
1119 tracing::error!("Attempted to Prune an unknown peer");
1120 }
1121 _ => {} }
1123
1124 let peers = if do_px {
1126 get_random_peers(
1127 &self.connected_peers,
1128 topic_hash,
1129 self.config.prune_peers(),
1130 |p| p != peer && !self.peer_score.below_threshold(p, |_| 0.0).0,
1131 )
1132 .into_iter()
1133 .map(|p| PeerInfo { peer_id: Some(p) })
1134 .collect()
1135 } else {
1136 Vec::new()
1137 };
1138
1139 let backoff = if on_unsubscribe {
1140 self.config.unsubscribe_backoff()
1141 } else {
1142 self.config.prune_backoff()
1143 };
1144
1145 self.backoffs.update_backoff(topic_hash, peer, backoff);
1147
1148 Prune {
1149 topic_hash: topic_hash.clone(),
1150 peers,
1151 backoff: Some(backoff.as_secs()),
1152 }
1153 }
1154
1155 fn leave(&mut self, topic_hash: &TopicHash) {
1157 tracing::debug!(topic=%topic_hash, "Running LEAVE for topic");
1158
1159 if let Some((_, peers)) = self.mesh.remove_entry(topic_hash) {
1161 #[cfg(feature = "metrics")]
1162 if let Some(m) = self.metrics.as_mut() {
1163 m.left(topic_hash)
1164 }
1165 for peer_id in peers {
1166 tracing::debug!(%peer_id, "LEAVE: Sending PRUNE to peer");
1168
1169 let on_unsubscribe = true;
1170 let prune =
1171 self.make_prune(topic_hash, &peer_id, self.config.do_px(), on_unsubscribe);
1172 self.send_message(peer_id, RpcOut::Prune(prune));
1173
1174 peer_removed_from_mesh(
1176 peer_id,
1177 topic_hash,
1178 &self.mesh,
1179 &mut self.events,
1180 &self.connected_peers,
1181 );
1182 }
1183 }
1184 tracing::debug!(topic=%topic_hash, "Completed LEAVE for topic");
1185 }
1186
1187 fn check_explicit_peer_connection(&mut self, peer_id: &PeerId) {
1189 if !self.connected_peers.contains_key(peer_id) {
1190 tracing::debug!(peer=%peer_id, "Connecting to explicit peer");
1192 self.events.push_back(ToSwarm::Dial {
1193 opts: DialOpts::peer_id(*peer_id).build(),
1194 });
1195 }
1196 }
1197
1198 fn handle_ihave(&mut self, peer_id: &PeerId, ihave_msgs: Vec<(TopicHash, Vec<MessageId>)>) {
1201 if let (true, score) = self
1203 .peer_score
1204 .below_threshold(peer_id, |ts| ts.gossip_threshold)
1205 {
1206 tracing::debug!(
1207 peer=%peer_id,
1208 %score,
1209 "IHAVE: ignoring peer with score below threshold"
1210 );
1211 return;
1212 }
1213
1214 let peer_have = self.count_received_ihave.entry(*peer_id).or_insert(0);
1216 *peer_have += 1;
1217 if *peer_have > self.config.max_ihave_messages() {
1218 tracing::debug!(
1219 peer=%peer_id,
1220 "IHAVE: peer has advertised too many times ({}) within this heartbeat \
1221 interval; ignoring",
1222 *peer_have
1223 );
1224 return;
1225 }
1226
1227 if let Some(iasked) = self.count_sent_iwant.get(peer_id) {
1228 if *iasked >= self.config.max_ihave_length() {
1229 tracing::debug!(
1230 peer=%peer_id,
1231 "IHAVE: peer has already advertised too many messages ({}); ignoring",
1232 *iasked
1233 );
1234 return;
1235 }
1236 }
1237
1238 tracing::trace!(peer=%peer_id, "Handling IHAVE for peer");
1239
1240 let mut iwant_ids = HashSet::new();
1241
1242 for (topic, ids) in ihave_msgs {
1243 if !self.mesh.contains_key(&topic) {
1245 tracing::debug!(
1246 %topic,
1247 "IHAVE: Ignoring IHAVE - Not subscribed to topic"
1248 );
1249 continue;
1250 }
1251
1252 for id in ids.into_iter().filter(|id| {
1253 if self.duplicate_cache.contains(id) {
1254 return false;
1255 }
1256
1257 !self.gossip_promises.contains(id)
1258 }) {
1259 if iwant_ids.insert(id) {
1261 #[cfg(feature = "metrics")]
1263 if let Some(metrics) = self.metrics.as_mut() {
1264 metrics.register_iwant(&topic);
1265 }
1266 }
1267 }
1268 }
1269
1270 if !iwant_ids.is_empty() {
1271 let iasked = self.count_sent_iwant.entry(*peer_id).or_insert(0);
1272 let mut iask = iwant_ids.len();
1273 if *iasked + iask > self.config.max_ihave_length() {
1274 iask = self.config.max_ihave_length().saturating_sub(*iasked);
1275 }
1276
1277 tracing::debug!(
1279 peer=%peer_id,
1280 "IHAVE: Asking for {} out of {} messages from peer",
1281 iask,
1282 iwant_ids.len()
1283 );
1284
1285 let mut iwant_ids_vec: Vec<_> = iwant_ids.into_iter().collect();
1287 let mut rng = thread_rng();
1288 iwant_ids_vec.partial_shuffle(&mut rng, iask);
1289
1290 iwant_ids_vec.truncate(iask);
1291 *iasked += iask;
1292
1293 self.gossip_promises.add_promise(
1294 *peer_id,
1295 &iwant_ids_vec,
1296 Instant::now() + self.config.iwant_followup_time(),
1297 );
1298 tracing::trace!(
1299 peer=%peer_id,
1300 "IHAVE: Asking for the following messages from peer: {:?}",
1301 iwant_ids_vec
1302 );
1303
1304 self.send_message(
1305 *peer_id,
1306 RpcOut::IWant(IWant {
1307 message_ids: iwant_ids_vec,
1308 }),
1309 );
1310 }
1311 tracing::trace!(peer=%peer_id, "Completed IHAVE handling for peer");
1312 }
1313
1314 fn handle_iwant(&mut self, peer_id: &PeerId, iwant_msgs: Vec<MessageId>) {
1317 if let (true, score) = self
1319 .peer_score
1320 .below_threshold(peer_id, |ts| ts.gossip_threshold)
1321 {
1322 tracing::debug!(
1323 peer=%peer_id,
1324 "IWANT: ignoring peer with score below threshold [score = {}]",
1325 score
1326 );
1327 return;
1328 }
1329
1330 tracing::debug!(peer=%peer_id, "Handling IWANT for peer");
1331
1332 for id in iwant_msgs {
1333 if let Some((msg, count)) = self
1336 .mcache
1337 .get_with_iwant_counts(&id, peer_id)
1338 .map(|(msg, count)| (msg.clone(), count))
1339 {
1340 if count > self.config.gossip_retransimission() {
1341 tracing::debug!(
1342 peer=%peer_id,
1343 message_id=%id,
1344 "IWANT: Peer has asked for message too many times; ignoring request"
1345 );
1346 } else {
1347 if let Some(peer) = self.connected_peers.get_mut(peer_id) {
1348 if peer.dont_send.contains_key(&id) {
1349 tracing::debug!(%peer_id, message_id=%id, "Peer already sent IDONTWANT for this message");
1350 continue;
1351 }
1352 }
1353
1354 tracing::debug!(peer=%peer_id, "IWANT: Sending cached messages to peer");
1355 self.send_message(
1356 *peer_id,
1357 RpcOut::Forward {
1358 message: msg,
1359 timeout: Delay::new(self.config.forward_queue_duration()),
1360 },
1361 );
1362 }
1363 }
1364 }
1365 tracing::debug!(peer=%peer_id, "Completed IWANT handling for peer");
1366 }
1367
1368 fn handle_graft(&mut self, peer_id: &PeerId, topics: Vec<TopicHash>) {
1371 tracing::debug!(peer=%peer_id, "Handling GRAFT message for peer");
1372
1373 let mut to_prune_topics = HashSet::new();
1374
1375 let mut do_px = self.config.do_px();
1376
1377 let Some(connected_peer) = self.connected_peers.get_mut(peer_id) else {
1378 tracing::error!(peer_id = %peer_id, "Peer non-existent when handling graft");
1379 return;
1380 };
1381 let is_outbound = connected_peer.outbound;
1383
1384 for topic in &topics {
1387 if connected_peer.topics.insert(topic.clone()) {
1388 #[cfg(feature = "metrics")]
1389 if let Some(m) = self.metrics.as_mut() {
1390 m.inc_topic_peers(topic);
1391 }
1392 }
1393 }
1394
1395 if self.explicit_peers.contains(peer_id) {
1397 tracing::warn!(peer=%peer_id, "GRAFT: ignoring request from direct peer");
1398 to_prune_topics = topics.into_iter().collect();
1400 do_px = false
1402 } else {
1403 let (below_zero, score) = self.peer_score.below_threshold(peer_id, |_| 0.0);
1404 let now = Instant::now();
1405 for topic_hash in topics {
1406 if let Some(peers) = self.mesh.get_mut(&topic_hash) {
1407 if peers.contains(peer_id) {
1409 tracing::debug!(
1410 peer=%peer_id,
1411 topic=%&topic_hash,
1412 "GRAFT: Received graft for peer that is already in topic"
1413 );
1414 continue;
1415 }
1416
1417 if let Some(backoff_time) = self.backoffs.get_backoff_time(&topic_hash, peer_id)
1419 {
1420 if backoff_time > now {
1421 tracing::warn!(
1422 peer=%peer_id,
1423 "[Penalty] Peer attempted graft within backoff time, penalizing"
1424 );
1425 if let PeerScoreState::Active(peer_score) = &mut self.peer_score {
1427 #[cfg(feature = "metrics")]
1428 if let Some(metrics) = self.metrics.as_mut() {
1429 metrics.register_score_penalty(Penalty::GraftBackoff);
1430 }
1431 peer_score.add_penalty(peer_id, 1);
1432
1433 #[allow(unknown_lints, clippy::unchecked_duration_subtraction)]
1436 let flood_cutoff = (backoff_time
1437 + self.config.graft_flood_threshold())
1438 - self.config.prune_backoff();
1439 if flood_cutoff > now {
1440 peer_score.add_penalty(peer_id, 1);
1442 }
1443 }
1444 do_px = false;
1446
1447 to_prune_topics.insert(topic_hash.clone());
1448 continue;
1449 }
1450 }
1451
1452 if below_zero {
1454 tracing::debug!(
1456 peer=%peer_id,
1457 %score,
1458 topic=%topic_hash,
1459 "GRAFT: ignoring peer with negative score"
1460 );
1461 to_prune_topics.insert(topic_hash.clone());
1464 do_px = false;
1466 continue;
1467 }
1468
1469 let mesh_n_high = self.config.mesh_n_high_for_topic(&topic_hash);
1472
1473 if peers.len() >= mesh_n_high && !is_outbound {
1474 to_prune_topics.insert(topic_hash.clone());
1475 continue;
1476 }
1477
1478 tracing::debug!(
1480 peer=%peer_id,
1481 topic=%topic_hash,
1482 "GRAFT: Mesh link added for peer in topic"
1483 );
1484
1485 if peers.insert(*peer_id) {
1486 #[cfg(feature = "metrics")]
1487 if let Some(m) = self.metrics.as_mut() {
1488 m.peers_included(&topic_hash, Inclusion::Subscribed, 1)
1489 }
1490 }
1491
1492 peer_added_to_mesh(
1494 *peer_id,
1495 vec![&topic_hash],
1496 &self.mesh,
1497 &mut self.events,
1498 &self.connected_peers,
1499 );
1500
1501 if let PeerScoreState::Active(peer_score) = &mut self.peer_score {
1502 peer_score.graft(peer_id, topic_hash);
1503 }
1504 } else {
1505 do_px = false;
1507 tracing::debug!(
1508 peer=%peer_id,
1509 topic=%topic_hash,
1510 "GRAFT: Received graft for unknown topic from peer"
1511 );
1512 continue;
1514 }
1515 }
1516 }
1517
1518 if !to_prune_topics.is_empty() {
1519 let on_unsubscribe = false;
1521
1522 for prune in to_prune_topics
1523 .iter()
1524 .map(|t| self.make_prune(t, peer_id, do_px, on_unsubscribe))
1525 .collect::<Vec<_>>()
1526 {
1527 self.send_message(*peer_id, RpcOut::Prune(prune));
1528 }
1529 tracing::debug!(
1531 peer=%peer_id,
1532 "GRAFT: Not subscribed to topics - Sending PRUNE to peer"
1533 );
1534 }
1535 tracing::debug!(peer=%peer_id, "Completed GRAFT handling for peer");
1536 }
1537
1538 fn remove_peer_from_mesh(
1540 &mut self,
1541 peer_id: &PeerId,
1542 topic_hash: &TopicHash,
1543 backoff: Option<u64>,
1544 always_update_backoff: bool,
1545 ) -> bool {
1546 let mut peer_removed = false;
1547 if let Some(peers) = self.mesh.get_mut(topic_hash) {
1548 peer_removed = peers.remove(peer_id);
1550 if peer_removed {
1551 tracing::debug!(
1552 peer=%peer_id,
1553 topic=%topic_hash,
1554 "PRUNE: Removing peer from the mesh for topic"
1555 );
1556
1557 if let PeerScoreState::Active(peer_score) = &mut self.peer_score {
1558 peer_score.prune(peer_id, topic_hash.clone());
1559 }
1560
1561 peer_removed_from_mesh(
1563 *peer_id,
1564 topic_hash,
1565 &self.mesh,
1566 &mut self.events,
1567 &self.connected_peers,
1568 );
1569 }
1570 }
1571 if always_update_backoff || peer_removed {
1572 let time = if let Some(backoff) = backoff {
1573 Duration::from_secs(backoff)
1574 } else {
1575 self.config.prune_backoff()
1576 };
1577 self.backoffs.update_backoff(topic_hash, peer_id, time);
1579 }
1580 peer_removed
1581 }
1582
1583 fn handle_prune(
1585 &mut self,
1586 peer_id: &PeerId,
1587 prune_data: Vec<(TopicHash, Vec<PeerInfo>, Option<u64>)>,
1588 ) {
1589 tracing::debug!(peer=%peer_id, "Handling PRUNE message for peer");
1590 let (below_threshold, score) = self
1591 .peer_score
1592 .below_threshold(peer_id, |ts| ts.accept_px_threshold);
1593 for (topic_hash, px, backoff) in prune_data {
1594 if self.remove_peer_from_mesh(peer_id, &topic_hash, backoff, true) {
1595 #[cfg(feature = "metrics")]
1596 if let Some(m) = self.metrics.as_mut() {
1597 m.peers_removed(&topic_hash, Churn::Prune, 1);
1598 }
1599 }
1600
1601 if self.mesh.contains_key(&topic_hash) {
1602 if !px.is_empty() {
1604 if below_threshold {
1606 tracing::debug!(
1607 peer=%peer_id,
1608 %score,
1609 topic=%topic_hash,
1610 "PRUNE: ignoring PX from peer with insufficient score"
1611 );
1612 continue;
1613 }
1614
1615 if self.config.prune_peers() > 0 {
1622 self.px_connect(px);
1623 }
1624 }
1625 }
1626 }
1627 tracing::debug!(peer=%peer_id, "Completed PRUNE handling for peer");
1628 }
1629
1630 fn px_connect(&mut self, mut px: Vec<PeerInfo>) {
1631 let n = self.config.prune_peers();
1632 px.retain(|p| p.peer_id.is_some());
1637 if px.len() > n {
1638 let mut rng = thread_rng();
1640 px.partial_shuffle(&mut rng, n);
1641 px = px.into_iter().take(n).collect();
1642 }
1643
1644 for p in px {
1645 if let Some(peer_id) = p.peer_id {
1648 self.px_peers.insert(peer_id);
1650
1651 self.events.push_back(ToSwarm::Dial {
1653 opts: DialOpts::peer_id(peer_id).build(),
1654 });
1655 }
1656 }
1657 }
1658
1659 fn message_is_valid(
1662 &mut self,
1663 msg_id: &MessageId,
1664 raw_message: &mut RawMessage,
1665 propagation_source: &PeerId,
1666 ) -> bool {
1667 tracing::debug!(
1668 peer=%propagation_source,
1669 message_id=%msg_id,
1670 "Handling message from peer"
1671 );
1672
1673 if self.blacklisted_peers.contains(propagation_source) {
1675 tracing::debug!(
1676 peer=%propagation_source,
1677 "Rejecting message from blacklisted peer"
1678 );
1679 self.gossip_promises
1680 .reject_message(msg_id, &RejectReason::BlackListedPeer);
1681 if let PeerScoreState::Active(peer_score) = &mut self.peer_score {
1682 peer_score.reject_message(
1683 propagation_source,
1684 msg_id,
1685 &raw_message.topic,
1686 RejectReason::BlackListedPeer,
1687 );
1688 }
1689 return false;
1690 }
1691
1692 if let Some(source) = raw_message.source.as_ref() {
1694 if self.blacklisted_peers.contains(source) {
1695 tracing::debug!(
1696 peer=%propagation_source,
1697 %source,
1698 "Rejecting message from peer because of blacklisted source"
1699 );
1700 self.handle_invalid_message(
1701 propagation_source,
1702 &raw_message.topic,
1703 Some(msg_id),
1704 RejectReason::BlackListedSource,
1705 );
1706 return false;
1707 }
1708 }
1709
1710 if !self.config.validate_messages() {
1714 raw_message.validated = true;
1715 }
1716
1717 let self_published = !self.config.allow_self_origin()
1719 && if let Some(own_id) = self.publish_config.get_own_id() {
1720 own_id != propagation_source
1721 && raw_message.source.as_ref().is_some_and(|s| s == own_id)
1722 } else {
1723 self.published_message_ids.contains(msg_id)
1724 };
1725
1726 if self_published {
1727 tracing::debug!(
1728 message_id=%msg_id,
1729 source=%propagation_source,
1730 "Dropping message claiming to be from self but forwarded from source"
1731 );
1732 self.handle_invalid_message(
1733 propagation_source,
1734 &raw_message.topic,
1735 Some(msg_id),
1736 RejectReason::SelfOrigin,
1737 );
1738 return false;
1739 }
1740
1741 true
1742 }
1743
1744 fn handle_received_message(
1748 &mut self,
1749 mut raw_message: RawMessage,
1750 propagation_source: &PeerId,
1751 ) {
1752 #[cfg(feature = "metrics")]
1754 if let Some(metrics) = self.metrics.as_mut() {
1755 metrics.msg_recvd_unfiltered(&raw_message.topic, raw_message.raw_protobuf_len());
1756 }
1757
1758 let message = match self.data_transform.inbound_transform(raw_message.clone()) {
1760 Ok(message) => message,
1761 Err(e) => {
1762 tracing::debug!("Invalid message. Transform error: {:?}", e);
1763 self.handle_invalid_message(
1765 propagation_source,
1766 &raw_message.topic,
1767 None,
1768 RejectReason::ValidationError(ValidationError::TransformFailed),
1769 );
1770 return;
1771 }
1772 };
1773
1774 let msg_id = self.config.message_id(&message);
1776
1777 if raw_message.raw_protobuf_len() > self.config.idontwant_message_size_threshold() {
1779 let recipient_peers = self
1780 .mesh
1781 .get(&message.topic)
1782 .map(|mesh| mesh.iter())
1783 .unwrap_or_default()
1784 .copied()
1785 .chain(self.gossip_promises.peers_for_message(&msg_id))
1786 .filter(|peer_id| {
1787 peer_id != propagation_source && Some(peer_id) != message.source.as_ref()
1788 })
1789 .collect::<Vec<PeerId>>();
1790
1791 for peer_id in recipient_peers {
1792 self.send_message(
1793 peer_id,
1794 RpcOut::IDontWant(IDontWant {
1795 message_ids: vec![msg_id.clone()],
1796 }),
1797 );
1798 }
1799 }
1800
1801 if !self.message_is_valid(&msg_id, &mut raw_message, propagation_source) {
1805 return;
1806 }
1807
1808 if !self.duplicate_cache.insert(msg_id.clone()) {
1809 tracing::debug!(message_id=%msg_id, "Message already received, ignoring");
1810 if let PeerScoreState::Active(peer_score) = &mut self.peer_score {
1811 peer_score.duplicated_message(propagation_source, &msg_id, &message.topic);
1812 }
1813 self.mcache.observe_duplicate(&msg_id, propagation_source);
1814 return;
1815 }
1816
1817 tracing::debug!(
1818 message_id=%msg_id,
1819 "Put message in duplicate_cache and resolve promises"
1820 );
1821
1822 #[cfg(feature = "metrics")]
1824 if let Some(metrics) = self.metrics.as_mut() {
1825 metrics.msg_recvd(&message.topic);
1826 }
1827
1828 self.gossip_promises.message_delivered(&msg_id);
1831
1832 if let PeerScoreState::Active(peer_score) = &mut self.peer_score {
1834 peer_score.validate_message(propagation_source, &msg_id, &message.topic);
1835 }
1836
1837 self.mcache.put(&msg_id, raw_message.clone());
1839
1840 #[allow(
1842 clippy::map_entry,
1843 reason = "False positive, see rust-lang/rust-clippy#14449."
1844 )]
1845 if self.mesh.contains_key(&message.topic) {
1846 tracing::debug!("Sending received message to user");
1847 self.events
1848 .push_back(ToSwarm::GenerateEvent(Event::Message {
1849 propagation_source: *propagation_source,
1850 message_id: msg_id.clone(),
1851 message,
1852 }));
1853 } else {
1854 tracing::debug!(
1855 topic=%message.topic,
1856 "Received message on a topic we are not subscribed to"
1857 );
1858 return;
1859 }
1860
1861 if !self.config.validate_messages() {
1863 self.forward_msg(
1864 &msg_id,
1865 raw_message,
1866 Some(propagation_source),
1867 HashSet::new(),
1868 );
1869 tracing::debug!(message_id=%msg_id, "Completed message handling for message");
1870 }
1871 }
1872
1873 fn handle_invalid_message(
1875 &mut self,
1876 propagation_source: &PeerId,
1877 topic_hash: &TopicHash,
1878 message_id: Option<&MessageId>,
1879 reject_reason: RejectReason,
1880 ) {
1881 #[cfg(feature = "metrics")]
1882 if let Some(metrics) = self.metrics.as_mut() {
1883 metrics.register_invalid_message(topic_hash);
1884 }
1885 if let Some(msg_id) = message_id {
1886 self.gossip_promises.reject_message(msg_id, &reject_reason);
1888 }
1889 if let PeerScoreState::Active(peer_score) = &mut self.peer_score {
1890 if let Some(msg_id) = message_id {
1892 peer_score.reject_message(propagation_source, msg_id, topic_hash, reject_reason);
1895 } else {
1896 peer_score.reject_invalid_message(propagation_source, topic_hash);
1900 }
1901 }
1902 }
1903
1904 fn handle_received_subscriptions(
1906 &mut self,
1907 subscriptions: &[Subscription],
1908 propagation_source: &PeerId,
1909 ) {
1910 tracing::debug!(
1911 source=%propagation_source,
1912 "Handling subscriptions: {:?}",
1913 subscriptions,
1914 );
1915
1916 let mut unsubscribed_peers = Vec::new();
1917
1918 let Some(peer) = self.connected_peers.get_mut(propagation_source) else {
1919 tracing::error!(
1920 peer=%propagation_source,
1921 "Subscription by unknown peer"
1922 );
1923 return;
1924 };
1925
1926 let mut topics_to_graft = Vec::new();
1928
1929 let mut application_event = Vec::new();
1931
1932 let filtered_topics = match self
1933 .subscription_filter
1934 .filter_incoming_subscriptions(subscriptions, &peer.topics)
1935 {
1936 Ok(topics) => topics,
1937 Err(s) => {
1938 tracing::error!(
1939 peer=%propagation_source,
1940 "Subscription filter error: {}; ignoring RPC from peer",
1941 s
1942 );
1943 return;
1944 }
1945 };
1946
1947 for subscription in filtered_topics {
1948 let topic_hash = &subscription.topic_hash;
1950
1951 match subscription.action {
1952 SubscriptionAction::Subscribe => {
1953 if peer.topics.insert(topic_hash.clone()) {
1954 tracing::debug!(
1955 peer=%propagation_source,
1956 topic=%topic_hash,
1957 "SUBSCRIPTION: Adding gossip peer to topic"
1958 );
1959
1960 #[cfg(feature = "metrics")]
1961 if let Some(m) = self.metrics.as_mut() {
1962 m.inc_topic_peers(topic_hash);
1963 }
1964 }
1965
1966 if !self.explicit_peers.contains(propagation_source)
1968 && peer.kind.is_gossipsub()
1969 && !self
1970 .peer_score
1971 .below_threshold(propagation_source, |_| 0.0)
1972 .0
1973 && !self
1974 .backoffs
1975 .is_backoff_with_slack(topic_hash, propagation_source)
1976 {
1977 if let Some(peers) = self.mesh.get_mut(topic_hash) {
1978 let mesh_n_low = self.config.mesh_n_low_for_topic(topic_hash);
1979
1980 if peers.len() < mesh_n_low && peers.insert(*propagation_source) {
1981 tracing::debug!(
1982 peer=%propagation_source,
1983 topic=%topic_hash,
1984 "SUBSCRIPTION: Adding peer to the mesh for topic"
1985 );
1986 #[cfg(feature = "metrics")]
1987 if let Some(m) = self.metrics.as_mut() {
1988 m.peers_included(topic_hash, Inclusion::Subscribed, 1)
1989 }
1990 tracing::debug!(
1992 peer=%propagation_source,
1993 topic=%topic_hash,
1994 "Sending GRAFT to peer for topic"
1995 );
1996 if let PeerScoreState::Active(peer_score) = &mut self.peer_score {
1997 peer_score.graft(propagation_source, topic_hash.clone());
1998 }
1999 topics_to_graft.push(topic_hash.clone());
2000 }
2001 }
2002 }
2003 application_event.push(ToSwarm::GenerateEvent(Event::Subscribed {
2005 peer_id: *propagation_source,
2006 topic: topic_hash.clone(),
2007 }));
2008 }
2009 SubscriptionAction::Unsubscribe => {
2010 if peer.topics.remove(topic_hash) {
2011 tracing::debug!(
2012 peer=%propagation_source,
2013 topic=%topic_hash,
2014 "SUBSCRIPTION: Removing gossip peer from topic"
2015 );
2016
2017 #[cfg(feature = "metrics")]
2018 if let Some(m) = self.metrics.as_mut() {
2019 m.dec_topic_peers(topic_hash);
2020 }
2021 }
2022
2023 unsubscribed_peers.push((*propagation_source, topic_hash.clone()));
2024 application_event.push(ToSwarm::GenerateEvent(Event::Unsubscribed {
2026 peer_id: *propagation_source,
2027 topic: topic_hash.clone(),
2028 }));
2029 }
2030 }
2031 }
2032
2033 for (peer_id, topic_hash) in unsubscribed_peers {
2035 self.fanout
2036 .get_mut(&topic_hash)
2037 .map(|peers| peers.remove(&peer_id));
2038 if self.remove_peer_from_mesh(&peer_id, &topic_hash, None, false) {
2039 #[cfg(feature = "metrics")]
2040 if let Some(m) = self.metrics.as_mut() {
2041 m.peers_removed(&topic_hash, Churn::Unsub, 1);
2042 }
2043 };
2044 }
2045
2046 let topics_joined = topics_to_graft.iter().collect::<Vec<_>>();
2048 if !topics_joined.is_empty() {
2049 peer_added_to_mesh(
2050 *propagation_source,
2051 topics_joined,
2052 &self.mesh,
2053 &mut self.events,
2054 &self.connected_peers,
2055 );
2056 }
2057
2058 for topic_hash in topics_to_graft.into_iter() {
2061 self.send_message(*propagation_source, RpcOut::Graft(Graft { topic_hash }));
2062 }
2063
2064 for event in application_event {
2066 self.events.push_back(event);
2067 }
2068
2069 tracing::trace!(
2070 source=%propagation_source,
2071 "Completed handling subscriptions from source"
2072 );
2073 }
2074
2075 fn apply_iwant_penalties(&mut self) {
2077 if let PeerScoreState::Active(peer_score) = &mut self.peer_score {
2078 for (peer, count) in self.gossip_promises.get_broken_promises() {
2079 peer_score.add_penalty(&peer, count);
2080 #[cfg(feature = "metrics")]
2081 if let Some(metrics) = self.metrics.as_mut() {
2082 metrics.register_score_penalty(Penalty::BrokenPromise);
2083 }
2084 }
2085 }
2086 }
2087
2088 fn heartbeat(&mut self) {
2090 tracing::debug!("Starting heartbeat");
2091 #[cfg(feature = "metrics")]
2092 let start = Instant::now();
2093
2094 #[cfg(feature = "metrics")]
2098 if let Some(m) = &mut self.metrics {
2099 for sender_queue in self.connected_peers.values().map(|v| &v.sender) {
2100 m.observe_priority_queue_size(sender_queue.priority_queue_len());
2101 m.observe_non_priority_queue_size(sender_queue.non_priority_queue_len());
2102 }
2103 }
2104
2105 self.heartbeat_ticks += 1;
2106
2107 let mut to_graft = HashMap::new();
2108 let mut to_prune = HashMap::new();
2109 let mut no_px = HashSet::new();
2110
2111 self.backoffs.heartbeat();
2113
2114 self.count_sent_iwant.clear();
2116 self.count_received_ihave.clear();
2117
2118 self.apply_iwant_penalties();
2120
2121 if self.heartbeat_ticks % self.config.check_explicit_peers_ticks() == 0 {
2123 for p in self.explicit_peers.clone() {
2124 self.check_explicit_peer_connection(&p);
2125 }
2126 }
2127
2128 let mut scores = HashMap::with_capacity(self.connected_peers.len());
2130 if let PeerScoreState::Active(peer_score) = &self.peer_score {
2131 for peer_id in self.connected_peers.keys() {
2132 #[allow(unused_variables)]
2133 let report = scores
2134 .entry(peer_id)
2135 .or_insert_with(|| peer_score.score_report(peer_id));
2136
2137 #[cfg(feature = "metrics")]
2138 if let Some(metrics) = self.metrics.as_mut() {
2139 for penalty in &report.penalties {
2140 metrics.register_score_penalty(*penalty);
2141 }
2142 }
2143 }
2144 }
2145
2146 for (topic_hash, peers) in self.mesh.iter_mut() {
2148 let explicit_peers = &self.explicit_peers;
2149 let backoffs = &self.backoffs;
2150
2151 let mesh_n = self.config.mesh_n_for_topic(topic_hash);
2152 let mesh_n_low = self.config.mesh_n_low_for_topic(topic_hash);
2153 let mesh_n_high = self.config.mesh_n_high_for_topic(topic_hash);
2154 let mesh_outbound_min = self.config.mesh_outbound_min_for_topic(topic_hash);
2155
2156 let mut to_remove_peers = Vec::new();
2160 for peer_id in peers.iter() {
2161 let peer_score = scores.get(peer_id).map(|r| r.score).unwrap_or_default();
2162
2163 #[cfg(feature = "metrics")]
2165 if let Some(metrics) = self.metrics.as_mut() {
2166 metrics.observe_mesh_peers_score(topic_hash, peer_score);
2167 }
2168
2169 if peer_score < 0.0 {
2170 tracing::debug!(
2171 peer=%peer_id,
2172 score=%peer_score,
2173 topic=%topic_hash,
2174 "HEARTBEAT: Prune peer with negative score"
2175 );
2176
2177 let current_topic = to_prune.entry(*peer_id).or_insert_with(Vec::new);
2178 current_topic.push(topic_hash.clone());
2179 no_px.insert(*peer_id);
2180 to_remove_peers.push(*peer_id);
2181 }
2182 }
2183
2184 #[cfg(feature = "metrics")]
2185 if let Some(m) = self.metrics.as_mut() {
2186 m.peers_removed(topic_hash, Churn::BadScore, to_remove_peers.len())
2187 }
2188
2189 for peer_id in to_remove_peers {
2190 peers.remove(&peer_id);
2191 }
2192
2193 if peers.len() < mesh_n_low {
2195 tracing::debug!(
2196 topic=%topic_hash,
2197 "HEARTBEAT: Mesh low. Topic contains: {} needs: {}",
2198 peers.len(),
2199 self.config.mesh_n()
2200 );
2201 let desired_peers = mesh_n - peers.len();
2203 let peer_list =
2204 get_random_peers(&self.connected_peers, topic_hash, desired_peers, |peer| {
2205 !peers.contains(peer)
2206 && !explicit_peers.contains(peer)
2207 && !backoffs.is_backoff_with_slack(topic_hash, peer)
2208 && scores.get(peer).map(|r| r.score).unwrap_or_default() >= 0.0
2209 });
2210 for peer in &peer_list {
2211 let current_topic = to_graft.entry(*peer).or_insert_with(Vec::new);
2212 current_topic.push(topic_hash.clone());
2213 }
2214 tracing::debug!("Updating mesh, new mesh: {:?}", peer_list);
2216 #[cfg(feature = "metrics")]
2217 if let Some(m) = self.metrics.as_mut() {
2218 m.peers_included(topic_hash, Inclusion::Random, peer_list.len())
2219 }
2220 peers.extend(peer_list);
2221 }
2222
2223 if peers.len() > mesh_n_high {
2225 tracing::debug!(
2226 topic=%topic_hash,
2227 "HEARTBEAT: Mesh high. Topic contains: {} will reduce to: {}",
2228 peers.len(),
2229 self.config.mesh_n()
2230 );
2231 let excess_peer_no = peers.len() - mesh_n;
2232
2233 let mut rng = thread_rng();
2235 let mut shuffled = peers.iter().copied().collect::<Vec<_>>();
2236 shuffled.shuffle(&mut rng);
2237 shuffled.sort_by(|p1, p2| {
2238 let score_p1 = scores.get(p1).map(|r| r.score).unwrap_or_default();
2239 let score_p2 = scores.get(p2).map(|r| r.score).unwrap_or_default();
2240
2241 score_p1.partial_cmp(&score_p2).unwrap_or(Ordering::Equal)
2242 });
2243 shuffled[..peers.len() - self.config.retain_scores()].shuffle(&mut rng);
2245
2246 let mut outbound = shuffled
2248 .iter()
2249 .filter(|peer_id| {
2250 self.connected_peers
2251 .get(peer_id)
2252 .is_some_and(|peer| peer.outbound)
2253 })
2254 .count();
2255
2256 let mut removed = 0;
2259 for peer in shuffled {
2260 if removed == excess_peer_no {
2261 break;
2262 }
2263 if self
2264 .connected_peers
2265 .get(&peer)
2266 .is_some_and(|peer| peer.outbound)
2267 {
2268 if outbound <= mesh_outbound_min {
2269 continue;
2271 }
2272 outbound -= 1;
2274 }
2275
2276 peers.remove(&peer);
2278 let current_topic = to_prune.entry(peer).or_insert_with(Vec::new);
2279 current_topic.push(topic_hash.clone());
2280 removed += 1;
2281 }
2282
2283 #[cfg(feature = "metrics")]
2284 if let Some(m) = self.metrics.as_mut() {
2285 m.peers_removed(topic_hash, Churn::Excess, removed)
2286 }
2287 }
2288
2289 if peers.len() >= mesh_n_low {
2291 let outbound = peers
2293 .iter()
2294 .filter(|peer_id| {
2295 self.connected_peers
2296 .get(peer_id)
2297 .is_some_and(|peer| peer.outbound)
2298 })
2299 .count();
2300
2301 if outbound < mesh_outbound_min {
2303 let needed = mesh_outbound_min - outbound;
2304 let peer_list =
2305 get_random_peers(&self.connected_peers, topic_hash, needed, |peer_id| {
2306 !peers.contains(peer_id)
2307 && !explicit_peers.contains(peer_id)
2308 && !backoffs.is_backoff_with_slack(topic_hash, peer_id)
2309 && scores.get(peer_id).map(|r| r.score).unwrap_or_default() >= 0.0
2310 && self
2311 .connected_peers
2312 .get(peer_id)
2313 .is_some_and(|peer| peer.outbound)
2314 });
2315
2316 for peer in &peer_list {
2317 let current_topic = to_graft.entry(*peer).or_insert_with(Vec::new);
2318 current_topic.push(topic_hash.clone());
2319 }
2320 tracing::debug!("Updating mesh, new mesh: {:?}", peer_list);
2322 #[cfg(feature = "metrics")]
2323 if let Some(m) = self.metrics.as_mut() {
2324 m.peers_included(topic_hash, Inclusion::Outbound, peer_list.len())
2325 }
2326 peers.extend(peer_list);
2327 }
2328 }
2329
2330 if self.heartbeat_ticks % self.config.opportunistic_graft_ticks() == 0
2332 && peers.len() > 1
2333 {
2334 if let PeerScoreState::Active(peer_score) = &self.peer_score {
2335 let mut peers_by_score: Vec<_> = peers.iter().collect();
2345 peers_by_score.sort_by(|p1, p2| {
2346 let p1_score = scores.get(p1).map(|r| r.score).unwrap_or_default();
2347 let p2_score = scores.get(p2).map(|r| r.score).unwrap_or_default();
2348 p1_score.partial_cmp(&p2_score).unwrap_or(Equal)
2349 });
2350
2351 let middle = peers_by_score.len() / 2;
2352 let median = if peers_by_score.len() % 2 == 0 {
2353 let sub_middle_peer = *peers_by_score
2354 .get(middle - 1)
2355 .expect("middle < vector length and middle > 0 since peers.len() > 0");
2356 let sub_middle_score = scores
2357 .get(sub_middle_peer)
2358 .map(|r| r.score)
2359 .unwrap_or_default();
2360 let middle_peer =
2361 *peers_by_score.get(middle).expect("middle < vector length");
2362 let middle_score =
2363 scores.get(middle_peer).map(|r| r.score).unwrap_or_default();
2364
2365 (sub_middle_score + middle_score) * 0.5
2366 } else {
2367 scores
2368 .get(*peers_by_score.get(middle).expect("middle < vector length"))
2369 .map(|r| r.score)
2370 .unwrap_or_default()
2371 };
2372
2373 if median < peer_score.thresholds.opportunistic_graft_threshold {
2376 let peer_list = get_random_peers(
2377 &self.connected_peers,
2378 topic_hash,
2379 self.config.opportunistic_graft_peers(),
2380 |peer_id| {
2381 !peers.contains(peer_id)
2382 && !explicit_peers.contains(peer_id)
2383 && !backoffs.is_backoff_with_slack(topic_hash, peer_id)
2384 && scores.get(peer_id).map(|r| r.score).unwrap_or_default()
2385 > median
2386 },
2387 );
2388 for peer in &peer_list {
2389 let current_topic = to_graft.entry(*peer).or_insert_with(Vec::new);
2390 current_topic.push(topic_hash.clone());
2391 }
2392 tracing::debug!(
2394 topic=%topic_hash,
2395 "Opportunistically graft in topic with peers {:?}",
2396 peer_list
2397 );
2398 #[cfg(feature = "metrics")]
2399 if let Some(m) = self.metrics.as_mut() {
2400 m.peers_included(topic_hash, Inclusion::Random, peer_list.len())
2401 }
2402 peers.extend(peer_list);
2403 }
2404 }
2405 }
2406 #[cfg(feature = "metrics")]
2408 if let Some(m) = self.metrics.as_mut() {
2409 m.set_mesh_peers(topic_hash, peers.len())
2410 }
2411 }
2412
2413 {
2415 let fanout = &mut self.fanout; let fanout_ttl = self.config.fanout_ttl();
2417 self.fanout_last_pub.retain(|topic_hash, last_pub_time| {
2418 if *last_pub_time + fanout_ttl < Instant::now() {
2419 tracing::debug!(
2420 topic=%topic_hash,
2421 "HEARTBEAT: Fanout topic removed due to timeout"
2422 );
2423 fanout.remove(topic_hash);
2424 return false;
2425 }
2426 true
2427 });
2428 }
2429
2430 for (topic_hash, peers) in self.fanout.iter_mut() {
2433 let mut to_remove_peers = Vec::new();
2434 let publish_threshold = match &self.peer_score {
2435 PeerScoreState::Active(peer_score) => peer_score.thresholds.publish_threshold,
2436 _ => 0.0,
2437 };
2438 let mesh_n = self.config.mesh_n_for_topic(topic_hash);
2439
2440 for peer_id in peers.iter() {
2441 let peer_score = scores.get(peer_id).map(|r| r.score).unwrap_or_default();
2443 match self.connected_peers.get(peer_id) {
2444 Some(peer) => {
2445 if !peer.topics.contains(topic_hash) || peer_score < publish_threshold {
2446 tracing::debug!(
2447 topic=%topic_hash,
2448 "HEARTBEAT: Peer removed from fanout for topic"
2449 );
2450 to_remove_peers.push(*peer_id);
2451 }
2452 }
2453 None => {
2454 to_remove_peers.push(*peer_id);
2456 }
2457 }
2458 }
2459 for to_remove in to_remove_peers {
2460 peers.remove(&to_remove);
2461 }
2462
2463 if peers.len() < mesh_n {
2465 tracing::debug!(
2466 "HEARTBEAT: Fanout low. Contains: {:?} needs: {:?}",
2467 peers.len(),
2468 mesh_n
2469 );
2470 let needed_peers = mesh_n - peers.len();
2471 let explicit_peers = &self.explicit_peers;
2472 let new_peers =
2473 get_random_peers(&self.connected_peers, topic_hash, needed_peers, |peer_id| {
2474 !peers.contains(peer_id)
2475 && !explicit_peers.contains(peer_id)
2476 && scores.get(peer_id).map(|r| r.score).unwrap_or_default()
2477 < publish_threshold
2478 });
2479 peers.extend(new_peers);
2480 }
2481 }
2482
2483 if let PeerScoreState::Active(peer_score) = &self.peer_score {
2484 tracing::trace!("Mesh message deliveries: {:?}", {
2485 self.mesh
2486 .iter()
2487 .map(|(t, peers)| {
2488 (
2489 t.clone(),
2490 peers
2491 .iter()
2492 .map(|p| {
2493 (*p, peer_score.mesh_message_deliveries(p, t).unwrap_or(0.0))
2494 })
2495 .collect::<HashMap<PeerId, f64>>(),
2496 )
2497 })
2498 .collect::<HashMap<TopicHash, HashMap<PeerId, f64>>>()
2499 })
2500 }
2501
2502 self.emit_gossip();
2503
2504 if !to_graft.is_empty() | !to_prune.is_empty() {
2506 self.send_graft_prune(to_graft, to_prune, no_px);
2507 }
2508
2509 self.mcache.shift();
2511
2512 for (peer_id, failed_messages) in self.failed_messages.drain() {
2514 tracing::debug!("Peer couldn't consume messages: {:?}", failed_messages);
2515 self.events
2516 .push_back(ToSwarm::GenerateEvent(Event::SlowPeer {
2517 peer_id,
2518 failed_messages,
2519 }));
2520 }
2521 self.failed_messages.shrink_to_fit();
2522
2523 for peer in self.connected_peers.values_mut() {
2525 while let Some((_front, instant)) = peer.dont_send.front() {
2526 if (*instant + IDONTWANT_TIMEOUT) >= Instant::now() {
2527 break;
2528 } else {
2529 peer.dont_send.pop_front();
2530 }
2531 }
2532 }
2533
2534 tracing::debug!("Completed Heartbeat");
2535 #[cfg(feature = "metrics")]
2536 if let Some(metrics) = self.metrics.as_mut() {
2537 let duration = u64::try_from(start.elapsed().as_millis()).unwrap_or(u64::MAX);
2538 metrics.observe_heartbeat_duration(duration);
2539 }
2540 }
2541
2542 fn emit_gossip(&mut self) {
2545 let mut rng = thread_rng();
2546 let mut messages = Vec::new();
2547 for (topic_hash, peers) in self.mesh.iter().chain(self.fanout.iter()) {
2548 let mut message_ids = self.mcache.get_gossip_message_ids(topic_hash);
2549 if message_ids.is_empty() {
2550 continue;
2551 }
2552
2553 if message_ids.len() > self.config.max_ihave_length() {
2555 tracing::debug!(
2557 "too many messages for gossip; will truncate IHAVE list ({} messages)",
2558 message_ids.len()
2559 );
2560 } else {
2561 message_ids.shuffle(&mut rng);
2563 }
2564
2565 let n_map = |m| {
2567 max(
2568 self.config.gossip_lazy(),
2569 (self.config.gossip_factor() * m as f64) as usize,
2570 )
2571 };
2572 let to_msg_peers =
2574 get_random_peers_dynamic(&self.connected_peers, topic_hash, n_map, |peer| {
2575 !peers.contains(peer)
2576 && !self.explicit_peers.contains(peer)
2577 && !self
2578 .peer_score
2579 .below_threshold(peer, |ts| ts.gossip_threshold)
2580 .0
2581 });
2582
2583 tracing::debug!("Gossiping IHAVE to {} peers", to_msg_peers.len());
2584
2585 for peer_id in to_msg_peers {
2586 let mut peer_message_ids = message_ids.clone();
2587
2588 if peer_message_ids.len() > self.config.max_ihave_length() {
2589 peer_message_ids.partial_shuffle(&mut rng, self.config.max_ihave_length());
2593 peer_message_ids.truncate(self.config.max_ihave_length());
2594 }
2595
2596 messages.push((
2598 peer_id,
2599 RpcOut::IHave(IHave {
2600 topic_hash: topic_hash.clone(),
2601 message_ids: peer_message_ids,
2602 }),
2603 ));
2604 }
2605 }
2606 for (peer_id, message) in messages {
2607 self.send_message(peer_id, message);
2608 }
2609 }
2610
2611 fn send_graft_prune(
2614 &mut self,
2615 to_graft: HashMap<PeerId, Vec<TopicHash>>,
2616 mut to_prune: HashMap<PeerId, Vec<TopicHash>>,
2617 no_px: HashSet<PeerId>,
2618 ) {
2619 for (peer_id, topics) in to_graft.into_iter() {
2621 for topic in &topics {
2622 if let PeerScoreState::Active(peer_score) = &mut self.peer_score {
2624 peer_score.graft(&peer_id, topic.clone());
2625 }
2626
2627 peer_added_to_mesh(
2630 peer_id,
2631 vec![topic],
2632 &self.mesh,
2633 &mut self.events,
2634 &self.connected_peers,
2635 );
2636 }
2637 let rpc_msgs = topics.iter().map(|topic_hash| {
2638 RpcOut::Graft(Graft {
2639 topic_hash: topic_hash.clone(),
2640 })
2641 });
2642
2643 let prune_msgs = to_prune
2650 .remove(&peer_id)
2651 .into_iter()
2652 .flatten()
2653 .map(|topic_hash| {
2654 let prune = self.make_prune(
2655 &topic_hash,
2656 &peer_id,
2657 self.config.do_px() && !no_px.contains(&peer_id),
2658 false,
2659 );
2660 RpcOut::Prune(prune)
2661 });
2662
2663 for msg in rpc_msgs.chain(prune_msgs).collect::<Vec<_>>() {
2665 self.send_message(peer_id, msg);
2666 }
2667 }
2668
2669 for (peer_id, topics) in to_prune.iter() {
2672 for topic_hash in topics {
2673 let prune = self.make_prune(
2674 topic_hash,
2675 peer_id,
2676 self.config.do_px() && !no_px.contains(peer_id),
2677 false,
2678 );
2679 self.send_message(*peer_id, RpcOut::Prune(prune));
2680
2681 peer_removed_from_mesh(
2683 *peer_id,
2684 topic_hash,
2685 &self.mesh,
2686 &mut self.events,
2687 &self.connected_peers,
2688 );
2689 }
2690 }
2691 }
2692
2693 fn forward_msg(
2697 &mut self,
2698 msg_id: &MessageId,
2699 message: RawMessage,
2700 propagation_source: Option<&PeerId>,
2701 originating_peers: HashSet<PeerId>,
2702 ) -> bool {
2703 if let PeerScoreState::Active(peer_score) = &mut self.peer_score {
2705 if let Some(peer) = propagation_source {
2706 peer_score.deliver_message(peer, msg_id, &message.topic);
2707 }
2708 }
2709
2710 tracing::debug!(message_id=%msg_id, "Forwarding message");
2711 let mut recipient_peers = HashSet::new();
2712
2713 for (peer_id, peer) in &self.connected_peers {
2717 if Some(peer_id) != propagation_source
2718 && !originating_peers.contains(peer_id)
2719 && Some(peer_id) != message.source.as_ref()
2720 && peer.topics.contains(&message.topic)
2721 && (self.explicit_peers.contains(peer_id)
2722 || (peer.kind == PeerKind::Floodsub
2723 && !self
2724 .peer_score
2725 .below_threshold(peer_id, |ts| ts.publish_threshold)
2726 .0))
2727 {
2728 recipient_peers.insert(*peer_id);
2729 }
2730 }
2731
2732 let topic = &message.topic;
2734 if let Some(mesh_peers) = self.mesh.get(topic) {
2736 for peer_id in mesh_peers {
2737 if Some(peer_id) != propagation_source
2738 && !originating_peers.contains(peer_id)
2739 && Some(peer_id) != message.source.as_ref()
2740 {
2741 recipient_peers.insert(*peer_id);
2742 }
2743 }
2744 }
2745
2746 if recipient_peers.is_empty() {
2747 return false;
2748 }
2749
2750 for peer_id in recipient_peers.iter() {
2752 if let Some(peer) = self.connected_peers.get_mut(peer_id) {
2753 if peer.dont_send.contains_key(msg_id) {
2754 tracing::debug!(%peer_id, message_id=%msg_id, "Peer doesn't want message");
2755 continue;
2756 }
2757
2758 tracing::debug!(%peer_id, message_id=%msg_id, "Sending message to peer");
2759
2760 self.send_message(
2761 *peer_id,
2762 RpcOut::Forward {
2763 message: message.clone(),
2764 timeout: Delay::new(self.config.forward_queue_duration()),
2765 },
2766 );
2767 }
2768 }
2769 tracing::debug!("Completed forwarding message");
2770 true
2771 }
2772
2773 pub(crate) fn build_raw_message(
2775 &mut self,
2776 topic: TopicHash,
2777 data: Vec<u8>,
2778 ) -> Result<RawMessage, PublishError> {
2779 match &mut self.publish_config {
2780 PublishConfig::Signing {
2781 ref keypair,
2782 author,
2783 inline_key,
2784 last_seq_no,
2785 } => {
2786 let sequence_number = last_seq_no.next();
2787
2788 let signature = {
2789 let message = proto::Message {
2790 from: Some(author.to_bytes()),
2791 data: Some(data.clone()),
2792 seqno: Some(sequence_number.to_be_bytes().to_vec()),
2793 topic: topic.clone().into_string(),
2794 signature: None,
2795 key: None,
2796 };
2797
2798 let mut buf = Vec::with_capacity(message.get_size());
2799 let mut writer = Writer::new(&mut buf);
2800
2801 message
2802 .write_message(&mut writer)
2803 .expect("Encoding to succeed");
2804
2805 let mut signature_bytes = SIGNING_PREFIX.to_vec();
2807 signature_bytes.extend_from_slice(&buf);
2808 Some(keypair.sign(&signature_bytes)?)
2809 };
2810
2811 Ok(RawMessage {
2812 source: Some(*author),
2813 data,
2814 sequence_number: Some(sequence_number),
2817 topic,
2818 signature,
2819 key: inline_key.clone(),
2820 validated: true, })
2822 }
2823 PublishConfig::Author(peer_id) => {
2824 Ok(RawMessage {
2825 source: Some(*peer_id),
2826 data,
2827 sequence_number: Some(rand::random()),
2830 topic,
2831 signature: None,
2832 key: None,
2833 validated: true, })
2835 }
2836 PublishConfig::RandomAuthor => {
2837 Ok(RawMessage {
2838 source: Some(PeerId::random()),
2839 data,
2840 sequence_number: Some(rand::random()),
2843 topic,
2844 signature: None,
2845 key: None,
2846 validated: true, })
2848 }
2849 PublishConfig::Anonymous => {
2850 Ok(RawMessage {
2851 source: None,
2852 data,
2853 sequence_number: None,
2856 topic,
2857 signature: None,
2858 key: None,
2859 validated: true, })
2861 }
2862 }
2863 }
2864
2865 fn send_message(&mut self, peer_id: PeerId, rpc: RpcOut) -> bool {
2872 #[cfg(feature = "metrics")]
2873 if let Some(m) = self.metrics.as_mut() {
2874 if let RpcOut::Publish { ref message, .. } | RpcOut::Forward { ref message, .. } = rpc {
2875 m.msg_sent(&message.topic, message.raw_protobuf_len());
2877 }
2878 }
2879
2880 let Some(peer) = &mut self.connected_peers.get_mut(&peer_id) else {
2881 tracing::error!(peer = %peer_id,
2882 "Could not send rpc to connection handler, peer doesn't exist in connected peer list");
2883 return false;
2884 };
2885
2886 if !matches!(peer.kind, PeerKind::Gossipsubv1_2) && matches!(rpc, RpcOut::IDontWant(..)) {
2887 tracing::trace!(peer=%peer_id, "Won't send IDONTWANT message for message to peer as it doesn't support Gossipsub v1.2");
2888 return false;
2889 }
2890
2891 match peer.sender.send_message(rpc) {
2893 Ok(()) => true,
2894 Err(rpc) => {
2895 tracing::warn!(peer=%peer_id, "Send Queue full. Could not send {:?}.", rpc);
2897
2898 let failed_messages = self.failed_messages.entry(peer_id).or_default();
2900 match rpc {
2901 RpcOut::Publish { .. } => {
2902 failed_messages.priority += 1;
2903 failed_messages.publish += 1;
2904 }
2905 RpcOut::Forward { .. } => {
2906 failed_messages.non_priority += 1;
2907 failed_messages.forward += 1;
2908 }
2909 RpcOut::IWant(_) | RpcOut::IHave(_) | RpcOut::IDontWant(_) => {
2910 failed_messages.non_priority += 1;
2911 }
2912 RpcOut::Graft(_)
2913 | RpcOut::Prune(_)
2914 | RpcOut::Subscribe(_)
2915 | RpcOut::Unsubscribe(_) => {
2916 unreachable!("Channel for highpriority control messages is unbounded and should always be open.")
2917 }
2918 }
2919
2920 if let PeerScoreState::Active(peer_score) = &mut self.peer_score {
2922 peer_score.failed_message_slow_peer(&peer_id);
2923 }
2924
2925 false
2926 }
2927 }
2928 }
2929
2930 fn on_connection_established(
2931 &mut self,
2932 ConnectionEstablished {
2933 peer_id,
2934 endpoint,
2935 other_established,
2936 ..
2937 }: ConnectionEstablished,
2938 ) {
2939 if let PeerScoreState::Active(peer_score) = &mut self.peer_score {
2941 if let Some(ip) = get_ip_addr(endpoint.get_remote_address()) {
2942 peer_score.add_ip(&peer_id, ip);
2943 } else {
2944 tracing::trace!(
2945 peer=%peer_id,
2946 "Couldn't extract ip from endpoint of peer with endpoint {:?}",
2947 endpoint
2948 )
2949 }
2950 }
2951
2952 if other_established > 0 {
2953 return; }
2955
2956 if let PeerScoreState::Active(peer_score) = &mut self.peer_score {
2957 peer_score.add_peer(peer_id);
2958 }
2959
2960 if self.blacklisted_peers.contains(&peer_id) {
2962 tracing::debug!(peer=%peer_id, "Ignoring connection from blacklisted peer");
2963 return;
2964 }
2965
2966 tracing::debug!(peer=%peer_id, "New peer connected");
2967 for topic_hash in self.mesh.clone().into_keys() {
2969 self.send_message(peer_id, RpcOut::Subscribe(topic_hash));
2970 }
2971 }
2972
2973 fn on_connection_closed(
2974 &mut self,
2975 ConnectionClosed {
2976 peer_id,
2977 connection_id,
2978 endpoint,
2979 remaining_established,
2980 ..
2981 }: ConnectionClosed,
2982 ) {
2983 if let PeerScoreState::Active(peer_score) = &mut self.peer_score {
2985 if let Some(ip) = get_ip_addr(endpoint.get_remote_address()) {
2986 peer_score.remove_ip(&peer_id, &ip);
2987 } else {
2988 tracing::trace!(
2989 peer=%peer_id,
2990 "Couldn't extract ip from endpoint of peer with endpoint {:?}",
2991 endpoint
2992 )
2993 }
2994 }
2995
2996 if remaining_established != 0 {
2997 if let Some(peer) = self.connected_peers.get_mut(&peer_id) {
2999 let index = peer
3000 .connections
3001 .iter()
3002 .position(|v| v == &connection_id)
3003 .expect("Previously established connection to peer must be present");
3004 peer.connections.remove(index);
3005
3006 if !peer.connections.is_empty() {
3009 for topic in &peer.topics {
3010 if let Some(mesh_peers) = self.mesh.get(topic) {
3011 if mesh_peers.contains(&peer_id) {
3012 self.events.push_back(ToSwarm::NotifyHandler {
3013 peer_id,
3014 event: HandlerIn::JoinedMesh,
3015 handler: NotifyHandler::One(peer.connections[0]),
3016 });
3017 break;
3018 }
3019 }
3020 }
3021 }
3022 }
3023 } else {
3024 tracing::debug!(peer=%peer_id, "Peer disconnected");
3026 let Some(connected_peer) = self.connected_peers.get(&peer_id) else {
3027 tracing::error!(peer_id = %peer_id, "Peer non-existent when handling disconnection");
3028 return;
3029 };
3030
3031 for topic in &connected_peer.topics {
3033 if let Some(mesh_peers) = self.mesh.get_mut(topic) {
3035 if mesh_peers.remove(&peer_id) {
3037 #[cfg(feature = "metrics")]
3038 if let Some(m) = self.metrics.as_mut() {
3039 m.peers_removed(topic, Churn::Dc, 1);
3040 m.set_mesh_peers(topic, mesh_peers.len());
3041 }
3042 };
3043 }
3044
3045 #[cfg(feature = "metrics")]
3046 if let Some(m) = self.metrics.as_mut() {
3047 m.dec_topic_peers(topic);
3048 }
3049
3050 self.fanout
3052 .get_mut(topic)
3053 .map(|peers| peers.remove(&peer_id));
3054 }
3055
3056 self.px_peers.remove(&peer_id);
3058
3059 #[cfg(feature = "metrics")]
3061 if let Some(metrics) = self.metrics.as_mut() {
3062 metrics.peer_protocol_disconnected(connected_peer.kind);
3063 }
3064
3065 self.connected_peers.remove(&peer_id);
3066
3067 if let PeerScoreState::Active(peer_score) = &mut self.peer_score {
3068 peer_score.remove_peer(&peer_id);
3069 }
3070 }
3071 }
3072
3073 fn on_address_change(
3074 &mut self,
3075 AddressChange {
3076 peer_id,
3077 old: endpoint_old,
3078 new: endpoint_new,
3079 ..
3080 }: AddressChange,
3081 ) {
3082 if let PeerScoreState::Active(peer_score) = &mut self.peer_score {
3084 if let Some(ip) = get_ip_addr(endpoint_old.get_remote_address()) {
3085 peer_score.remove_ip(&peer_id, &ip);
3086 } else {
3087 tracing::trace!(
3088 peer=%&peer_id,
3089 "Couldn't extract ip from endpoint of peer with endpoint {:?}",
3090 endpoint_old
3091 )
3092 }
3093 if let Some(ip) = get_ip_addr(endpoint_new.get_remote_address()) {
3094 peer_score.add_ip(&peer_id, ip);
3095 } else {
3096 tracing::trace!(
3097 peer=%peer_id,
3098 "Couldn't extract ip from endpoint of peer with endpoint {:?}",
3099 endpoint_new
3100 )
3101 }
3102 }
3103 }
3104
/// Passes `topics` to the metrics as allowed topics.
///
/// Does nothing when no metrics are configured.
#[cfg(feature = "metrics")]
pub fn register_topics_for_metrics(&mut self, topics: Vec<TopicHash>) {
    let Some(metrics) = self.metrics.as_mut() else {
        return;
    };
    metrics.register_allowed_topics(topics);
}
3112}
3113
3114fn get_ip_addr(addr: &Multiaddr) -> Option<IpAddr> {
3115 addr.iter().find_map(|p| match p {
3116 Ip4(addr) => Some(IpAddr::V4(addr)),
3117 Ip6(addr) => Some(IpAddr::V6(addr)),
3118 _ => None,
3119 })
3120}
3121
3122impl<C, F> NetworkBehaviour for Behaviour<C, F>
3123where
3124 C: Send + 'static + DataTransform,
3125 F: Send + 'static + TopicSubscriptionFilter,
3126{
3127 type ConnectionHandler = Handler;
3128 type ToSwarm = Event;
3129
3130 fn handle_established_inbound_connection(
3131 &mut self,
3132 connection_id: ConnectionId,
3133 peer_id: PeerId,
3134 _: &Multiaddr,
3135 _: &Multiaddr,
3136 ) -> Result<THandler<Self>, ConnectionDenied> {
3137 let connected_peer = self.connected_peers.entry(peer_id).or_insert(PeerDetails {
3143 kind: PeerKind::Floodsub,
3144 connections: vec![],
3145 outbound: false,
3146 sender: Sender::new(self.config.connection_handler_queue_len()),
3147 topics: Default::default(),
3148 dont_send: LinkedHashMap::new(),
3149 });
3150 connected_peer.connections.push(connection_id);
3152
3153 Ok(Handler::new(
3154 self.config.protocol_config(),
3155 connected_peer.sender.new_receiver(),
3156 ))
3157 }
3158
3159 fn handle_established_outbound_connection(
3160 &mut self,
3161 connection_id: ConnectionId,
3162 peer_id: PeerId,
3163 _: &Multiaddr,
3164 _: Endpoint,
3165 _: PortUse,
3166 ) -> Result<THandler<Self>, ConnectionDenied> {
3167 let connected_peer = self.connected_peers.entry(peer_id).or_insert(PeerDetails {
3168 kind: PeerKind::Floodsub,
3169 connections: vec![],
3170 outbound: !self.px_peers.contains(&peer_id),
3173 sender: Sender::new(self.config.connection_handler_queue_len()),
3174 topics: Default::default(),
3175 dont_send: LinkedHashMap::new(),
3176 });
3177 connected_peer.connections.push(connection_id);
3179
3180 Ok(Handler::new(
3181 self.config.protocol_config(),
3182 connected_peer.sender.new_receiver(),
3183 ))
3184 }
3185
3186 fn on_connection_handler_event(
3187 &mut self,
3188 propagation_source: PeerId,
3189 _connection_id: ConnectionId,
3190 handler_event: THandlerOutEvent<Self>,
3191 ) {
3192 match handler_event {
3193 HandlerEvent::PeerKind(kind) => {
3194 #[cfg(feature = "metrics")]
3197 if let Some(metrics) = self.metrics.as_mut() {
3198 metrics.peer_protocol_connected(kind);
3199 }
3200
3201 if let PeerKind::NotSupported = kind {
3202 tracing::debug!(
3203 peer=%propagation_source,
3204 "Peer does not support gossipsub protocols"
3205 );
3206 self.events
3207 .push_back(ToSwarm::GenerateEvent(Event::GossipsubNotSupported {
3208 peer_id: propagation_source,
3209 }));
3210 } else if let Some(conn) = self.connected_peers.get_mut(&propagation_source) {
3211 tracing::debug!(
3215 peer=%propagation_source,
3216 peer_type=%kind,
3217 "New peer type found for peer"
3218 );
3219 if let PeerKind::Floodsub = conn.kind {
3220 conn.kind = kind;
3221 }
3222 }
3223 }
3224 HandlerEvent::MessageDropped(rpc) => {
3225 if let PeerScoreState::Active(peer_score) = &mut self.peer_score {
3227 peer_score.failed_message_slow_peer(&propagation_source);
3228 }
3229
3230 let failed_messages = self.failed_messages.entry(propagation_source).or_default();
3232 failed_messages.timeout += 1;
3233 match rpc {
3234 RpcOut::Publish { .. } => {
3235 failed_messages.publish += 1;
3236 }
3237 RpcOut::Forward { .. } => {
3238 failed_messages.forward += 1;
3239 }
3240 _ => {}
3241 }
3242
3243 #[cfg(feature = "metrics")]
3245 if let Some(metrics) = self.metrics.as_mut() {
3246 match rpc {
3247 RpcOut::Publish { message, .. } => {
3248 metrics.publish_msg_dropped(&message.topic);
3249 metrics.timeout_msg_dropped(&message.topic);
3250 }
3251 RpcOut::Forward { message, .. } => {
3252 metrics.forward_msg_dropped(&message.topic);
3253 metrics.timeout_msg_dropped(&message.topic);
3254 }
3255 _ => {}
3256 }
3257 }
3258 }
3259 HandlerEvent::Message {
3260 rpc,
3261 invalid_messages,
3262 } => {
3263 if !rpc.subscriptions.is_empty() {
3268 self.handle_received_subscriptions(&rpc.subscriptions, &propagation_source);
3269 }
3270
3271 if let (true, _) = self
3273 .peer_score
3274 .below_threshold(&propagation_source, |pst| pst.graylist_threshold)
3275 {
3276 tracing::debug!(peer=%propagation_source, "RPC Dropped from greylisted peer");
3277 return;
3278 }
3279
3280 if let PeerScoreState::Active(_) = self.peer_score {
3282 for (raw_message, validation_error) in invalid_messages {
3283 self.handle_invalid_message(
3284 &propagation_source,
3285 &raw_message.topic,
3286 None,
3287 RejectReason::ValidationError(validation_error),
3288 )
3289 }
3290 } else {
3291 for (message, validation_error) in invalid_messages {
3293 tracing::warn!(
3294 peer=%propagation_source,
3295 source=?message.source,
3296 "Invalid message from peer. Reason: {:?}",
3297 validation_error,
3298 );
3299 }
3300 }
3301
3302 for (count, raw_message) in rpc.messages.into_iter().enumerate() {
3304 if self
3306 .config
3307 .max_messages_per_rpc()
3308 .is_some_and(|max_msg| count >= max_msg)
3309 {
3310 tracing::warn!("Received more messages than permitted. Ignoring further messages. Processed: {}", count);
3311 break;
3312 }
3313 self.handle_received_message(raw_message, &propagation_source);
3314 }
3315
3316 let mut ihave_msgs = vec![];
3320 let mut graft_msgs = vec![];
3321 let mut prune_msgs = vec![];
3322 for (count, control_msg) in rpc.control_msgs.into_iter().enumerate() {
3323 if self
3325 .config
3326 .max_messages_per_rpc()
3327 .is_some_and(|max_msg| count >= max_msg)
3328 {
3329 tracing::warn!("Received more control messages than permitted. Ignoring further messages. Processed: {}", count);
3330 break;
3331 }
3332
3333 match control_msg {
3334 ControlAction::IHave(IHave {
3335 topic_hash,
3336 message_ids,
3337 }) => {
3338 ihave_msgs.push((topic_hash, message_ids));
3339 }
3340 ControlAction::IWant(IWant { message_ids }) => {
3341 self.handle_iwant(&propagation_source, message_ids)
3342 }
3343 ControlAction::Graft(Graft { topic_hash }) => graft_msgs.push(topic_hash),
3344 ControlAction::Prune(Prune {
3345 topic_hash,
3346 peers,
3347 backoff,
3348 }) => prune_msgs.push((topic_hash, peers, backoff)),
3349 ControlAction::IDontWant(IDontWant { message_ids }) => {
3350 let Some(peer) = self.connected_peers.get_mut(&propagation_source)
3351 else {
3352 tracing::error!(peer = %propagation_source,
3353 "Could not handle IDONTWANT, peer doesn't exist in connected peer list");
3354 continue;
3355 };
3356 #[cfg(feature = "metrics")]
3357 if let Some(metrics) = self.metrics.as_mut() {
3358 metrics.register_idontwant(message_ids.len());
3359 }
3360 for message_id in message_ids {
3361 peer.dont_send.insert(message_id, Instant::now());
3362 if peer.dont_send.len() > IDONTWANT_CAP {
3364 peer.dont_send.pop_front();
3365 }
3366 }
3367 }
3368 }
3369 }
3370 if !ihave_msgs.is_empty() {
3371 self.handle_ihave(&propagation_source, ihave_msgs);
3372 }
3373 if !graft_msgs.is_empty() {
3374 self.handle_graft(&propagation_source, graft_msgs);
3375 }
3376 if !prune_msgs.is_empty() {
3377 self.handle_prune(&propagation_source, prune_msgs);
3378 }
3379 }
3380 }
3381 }
3382
3383 #[tracing::instrument(level = "trace", name = "NetworkBehaviour::poll", skip(self, cx))]
3384 fn poll(
3385 &mut self,
3386 cx: &mut Context<'_>,
3387 ) -> Poll<ToSwarm<Self::ToSwarm, THandlerInEvent<Self>>> {
3388 if let Some(event) = self.events.pop_front() {
3389 return Poll::Ready(event);
3390 }
3391
3392 if let PeerScoreState::Active(peer_score) = &mut self.peer_score {
3394 if peer_score.decay_interval.poll_unpin(cx).is_ready() {
3395 peer_score.refresh_scores();
3396 peer_score
3397 .decay_interval
3398 .reset(peer_score.params.decay_interval);
3399 }
3400 }
3401
3402 if self.heartbeat.poll_unpin(cx).is_ready() {
3403 self.heartbeat();
3404 self.heartbeat.reset(self.config.heartbeat_interval());
3405 }
3406
3407 Poll::Pending
3408 }
3409
3410 fn on_swarm_event(&mut self, event: FromSwarm) {
3411 match event {
3412 FromSwarm::ConnectionEstablished(connection_established) => {
3413 self.on_connection_established(connection_established)
3414 }
3415 FromSwarm::ConnectionClosed(connection_closed) => {
3416 self.on_connection_closed(connection_closed)
3417 }
3418 FromSwarm::AddressChange(address_change) => self.on_address_change(address_change),
3419 _ => {}
3420 }
3421 }
3422}
3423
3424fn peer_added_to_mesh(
3428 peer_id: PeerId,
3429 new_topics: Vec<&TopicHash>,
3430 mesh: &HashMap<TopicHash, BTreeSet<PeerId>>,
3431 events: &mut VecDeque<ToSwarm<Event, HandlerIn>>,
3432 connections: &HashMap<PeerId, PeerDetails>,
3433) {
3434 let connection_id = match connections.get(&peer_id) {
3436 Some(p) => p
3437 .connections
3438 .first()
3439 .expect("There should be at least one connection to a peer."),
3440 None => {
3441 tracing::error!(peer_id=%peer_id, "Peer not existent when added to the mesh");
3442 return;
3443 }
3444 };
3445
3446 if let Some(peer) = connections.get(&peer_id) {
3447 for topic in &peer.topics {
3448 if !new_topics.contains(&topic) {
3449 if let Some(mesh_peers) = mesh.get(topic) {
3450 if mesh_peers.contains(&peer_id) {
3451 return;
3453 }
3454 }
3455 }
3456 }
3457 }
3458 events.push_back(ToSwarm::NotifyHandler {
3460 peer_id,
3461 event: HandlerIn::JoinedMesh,
3462 handler: NotifyHandler::One(*connection_id),
3463 });
3464}
3465
3466fn peer_removed_from_mesh(
3470 peer_id: PeerId,
3471 old_topic: &TopicHash,
3472 mesh: &HashMap<TopicHash, BTreeSet<PeerId>>,
3473 events: &mut VecDeque<ToSwarm<Event, HandlerIn>>,
3474 connections: &HashMap<PeerId, PeerDetails>,
3475) {
3476 let connection_id = match connections.get(&peer_id) {
3478 Some(p) => p
3479 .connections
3480 .first()
3481 .expect("There should be at least one connection to a peer."),
3482 None => {
3483 tracing::error!(peer_id=%peer_id, "Peer not existent when removed from mesh");
3484 return;
3485 }
3486 };
3487
3488 if let Some(peer) = connections.get(&peer_id) {
3489 for topic in &peer.topics {
3490 if topic != old_topic {
3491 if let Some(mesh_peers) = mesh.get(topic) {
3492 if mesh_peers.contains(&peer_id) {
3493 return;
3495 }
3496 }
3497 }
3498 }
3499 }
3500 events.push_back(ToSwarm::NotifyHandler {
3502 peer_id,
3503 event: HandlerIn::LeftMesh,
3504 handler: NotifyHandler::One(*connection_id),
3505 });
3506}
3507
3508fn get_random_peers_dynamic(
3512 connected_peers: &HashMap<PeerId, PeerDetails>,
3513 topic_hash: &TopicHash,
3514 n_map: impl Fn(usize) -> usize,
3516 mut f: impl FnMut(&PeerId) -> bool,
3517) -> BTreeSet<PeerId> {
3518 let mut gossip_peers = connected_peers
3519 .iter()
3520 .filter(|(_, p)| p.topics.contains(topic_hash))
3521 .filter(|(peer_id, _)| f(peer_id))
3522 .filter(|(_, p)| p.kind.is_gossipsub())
3523 .map(|(peer_id, _)| *peer_id)
3524 .collect::<Vec<PeerId>>();
3525
3526 let n = n_map(gossip_peers.len());
3528 if gossip_peers.len() <= n {
3529 tracing::debug!("RANDOM PEERS: Got {:?} peers", gossip_peers.len());
3530 return gossip_peers.into_iter().collect();
3531 }
3532
3533 let mut rng = thread_rng();
3535 gossip_peers.partial_shuffle(&mut rng, n);
3536
3537 tracing::debug!("RANDOM PEERS: Got {:?} peers", n);
3538
3539 gossip_peers.into_iter().take(n).collect()
3540}
3541
3542fn get_random_peers(
3545 connected_peers: &HashMap<PeerId, PeerDetails>,
3546 topic_hash: &TopicHash,
3547 n: usize,
3548 f: impl FnMut(&PeerId) -> bool,
3549) -> BTreeSet<PeerId> {
3550 get_random_peers_dynamic(connected_peers, topic_hash, |_| n, f)
3551}
3552
3553fn validate_config(
3556 authenticity: &MessageAuthenticity,
3557 validation_mode: &ValidationMode,
3558) -> Result<(), &'static str> {
3559 match validation_mode {
3560 ValidationMode::Anonymous => {
3561 if authenticity.is_signing() {
3562 return Err("Cannot enable message signing with an Anonymous validation mode. Consider changing either the ValidationMode or MessageAuthenticity");
3563 }
3564
3565 if !authenticity.is_anonymous() {
3566 return Err("Published messages contain an author but incoming messages with an author will be rejected. Consider adjusting the validation or privacy settings in the config");
3567 }
3568 }
3569 ValidationMode::Strict => {
3570 if !authenticity.is_signing() {
3571 return Err(
3572 "Messages will be
3573 published unsigned and incoming unsigned messages will be rejected. Consider adjusting
3574 the validation or privacy settings in the config"
3575 );
3576 }
3577 }
3578 _ => {}
3579 }
3580 Ok(())
3581}
3582
3583impl<C: DataTransform, F: TopicSubscriptionFilter> fmt::Debug for Behaviour<C, F> {
3584 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
3585 f.debug_struct("Behaviour")
3586 .field("config", &self.config)
3587 .field("events", &self.events.len())
3588 .field("publish_config", &self.publish_config)
3589 .field("mesh", &self.mesh)
3590 .field("fanout", &self.fanout)
3591 .field("fanout_last_pub", &self.fanout_last_pub)
3592 .field("mcache", &self.mcache)
3593 .field("heartbeat", &self.heartbeat)
3594 .finish()
3595 }
3596}
3597
3598impl fmt::Debug for PublishConfig {
3599 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
3600 match self {
3601 PublishConfig::Signing { author, .. } => {
3602 f.write_fmt(format_args!("PublishConfig::Signing({author})"))
3603 }
3604 PublishConfig::Author(author) => {
3605 f.write_fmt(format_args!("PublishConfig::Author({author})"))
3606 }
3607 PublishConfig::RandomAuthor => f.write_fmt(format_args!("PublishConfig::RandomAuthor")),
3608 PublishConfig::Anonymous => f.write_fmt(format_args!("PublishConfig::Anonymous")),
3609 }
3610 }
3611}