1use std::{
22 cmp::{max, Ordering, Ordering::Equal},
23 collections::{BTreeSet, HashMap, HashSet, VecDeque},
24 fmt,
25 fmt::Debug,
26 net::IpAddr,
27 task::{Context, Poll},
28 time::Duration,
29};
30
31use futures::FutureExt;
32use futures_timer::Delay;
33use hashlink::LinkedHashMap;
34use libp2p_core::{
35 multiaddr::Protocol::{Ip4, Ip6},
36 transport::PortUse,
37 Endpoint, Multiaddr,
38};
39use libp2p_identity::{Keypair, PeerId};
40use libp2p_swarm::{
41 behaviour::{AddressChange, ConnectionClosed, ConnectionEstablished, FromSwarm},
42 dial_opts::DialOpts,
43 ConnectionDenied, ConnectionId, NetworkBehaviour, NotifyHandler, THandler, THandlerInEvent,
44 THandlerOutEvent, ToSwarm,
45};
46use prometheus_client::registry::Registry;
47use quick_protobuf::{MessageWrite, Writer};
48use rand::{seq::SliceRandom, thread_rng};
49use web_time::{Instant, SystemTime};
50
51use crate::{
52 backoff::BackoffStorage,
53 config::{Config, ValidationMode},
54 gossip_promises::GossipPromises,
55 handler::{Handler, HandlerEvent, HandlerIn},
56 mcache::MessageCache,
57 metrics::{Churn, Config as MetricsConfig, Inclusion, Metrics, Penalty},
58 peer_score::{PeerScore, PeerScoreParams, PeerScoreThresholds, RejectReason},
59 protocol::SIGNING_PREFIX,
60 rpc::Sender,
61 rpc_proto::proto,
62 subscription_filter::{AllowAllSubscriptionFilter, TopicSubscriptionFilter},
63 time_cache::DuplicateCache,
64 topic::{Hasher, Topic, TopicHash},
65 transform::{DataTransform, IdentityTransform},
66 types::{
67 ControlAction, Graft, IDontWant, IHave, IWant, Message, MessageAcceptance, MessageId,
68 PeerConnections, PeerInfo, PeerKind, Prune, RawMessage, RpcOut, Subscription,
69 SubscriptionAction,
70 },
71 FailedMessages, PublishError, SubscriptionError, TopicScoreParams, ValidationError,
72};
73
74#[cfg(test)]
75mod tests;
76
/// Size limit (10 000) used for IDONTWANT bookkeeping.
///
/// NOTE(review): not referenced in the part of the file reviewed here; presumably the maximum
/// number of IDONTWANT message ids remembered per peer — confirm at the usage site.
const IDONTWANT_CAP: usize = 10_000;

/// Three-second duration used for IDONTWANT bookkeeping.
///
/// NOTE(review): not referenced in the part of the file reviewed here; presumably how long a
/// remembered IDONTWANT message id is kept before it is discarded — confirm at the usage site.
const IDONTWANT_TIMEOUT: Duration = Duration::new(3, 0);
82
/// Selects the identity under which the local node publishes messages.
///
/// The chosen value is validated together with the configured validation mode and converted
/// into the internal `PublishConfig` when a `Behaviour` is constructed.
#[derive(Clone)]
pub enum MessageAuthenticity {
    /// Publish with the given keypair. The author is the peer id derived from the keypair's
    /// public key, and a sequence-number counter is kept for this mode.
    Signed(Keypair),
    /// Publish with the given peer id as author; no keypair is held.
    Author(PeerId),
    /// Publish without a fixed author.
    ///
    /// NOTE(review): presumably a fresh random author is generated per message — the message
    /// construction code is outside this view; confirm.
    RandomAuthor,
    /// Publish without any author identity.
    ///
    /// NOTE(review): which message fields are omitted in this mode is decided outside this
    /// view; confirm.
    Anonymous,
}
115
116impl MessageAuthenticity {
117 pub fn is_signing(&self) -> bool {
119 matches!(self, MessageAuthenticity::Signed(_))
120 }
121
122 pub fn is_anonymous(&self) -> bool {
123 matches!(self, MessageAuthenticity::Anonymous)
124 }
125}
126
/// Events this behaviour reports to the application.
///
/// NOTE(review): none of the variants are constructed in the part of the file reviewed here.
/// The per-variant notes below are derived from names and field types only — confirm them
/// against the emission sites.
#[derive(Debug)]
pub enum Event {
    /// A message was received.
    Message {
        /// Peer the message was received from (presumably the forwarder, which need not be
        /// the author — confirm).
        propagation_source: PeerId,
        /// Id of the message.
        message_id: MessageId,
        /// The message itself.
        message: Message,
    },
    /// A peer subscribed to a topic.
    Subscribed {
        /// The peer that subscribed.
        peer_id: PeerId,
        /// The topic it subscribed to.
        topic: TopicHash,
    },
    /// A peer unsubscribed from a topic.
    Unsubscribed {
        /// The peer that unsubscribed.
        peer_id: PeerId,
        /// The topic it unsubscribed from.
        topic: TopicHash,
    },
    /// The given peer does not support gossipsub (presumably detected during protocol
    /// negotiation — confirm).
    GossipsubNotSupported { peer_id: PeerId },
    /// Messages to the given peer could not be delivered.
    ///
    /// NOTE(review): presumably raised when the peer's send queue is full or messages time
    /// out — confirm.
    SlowPeer {
        /// The affected peer.
        peer_id: PeerId,
        /// Record of the messages that failed for this peer.
        failed_messages: FailedMessages,
    },
}
164
/// Internal publishing configuration, built from a `MessageAuthenticity`.
// The `Signing` variant carries a keypair, an optional encoded key and a counter, which makes
// it much larger than the other variants; the lint is silenced rather than boxing the data.
#[allow(clippy::large_enum_variant)]
enum PublishConfig {
    /// Publish with a keypair.
    Signing {
        /// Keypair supplied by the user.
        keypair: Keypair,
        /// Peer id derived from the keypair's public key.
        author: PeerId,
        /// Protobuf-encoded public key; `None` when the encoding is at most 42 bytes long.
        inline_key: Option<Vec<u8>>,
        /// Counter for sequence numbers, seeded when the configuration is created.
        last_seq_no: SequenceNumber,
    },
    /// Publish with a fixed author peer id and no keypair.
    Author(PeerId),
    /// Publish without a fixed author (see `MessageAuthenticity::RandomAuthor`).
    RandomAuthor,
    /// Publish without an author identity (see `MessageAuthenticity::Anonymous`).
    Anonymous,
}
179
/// Strictly increasing counter used for message sequence numbers.
#[derive(Debug)]
struct SequenceNumber(u64);

impl SequenceNumber {
    /// Creates a counter seeded with the current wall-clock time, expressed as nanoseconds
    /// since the Unix epoch.
    ///
    /// # Panics
    ///
    /// Panics if the system clock reports a time before the Unix epoch.
    fn new() -> Self {
        let now = SystemTime::now();
        let since_epoch = now
            .duration_since(SystemTime::UNIX_EPOCH)
            .expect("time to be linear");

        // `as_nanos` yields a `u128`; the cast keeps the low 64 bits.
        Self(since_epoch.as_nanos() as u64)
    }

    /// Advances the counter by one and returns the new value.
    ///
    /// # Panics
    ///
    /// Panics if the counter would overflow `u64`.
    fn next(&mut self) -> u64 {
        let Self(current) = self;
        *current = current
            .checked_add(1)
            .expect("to not exhaust u64 space for sequence numbers");
        *current
    }
}
205
206impl PublishConfig {
207 pub(crate) fn get_own_id(&self) -> Option<&PeerId> {
208 match self {
209 Self::Signing { author, .. } => Some(author),
210 Self::Author(author) => Some(author),
211 _ => None,
212 }
213 }
214}
215
216impl From<MessageAuthenticity> for PublishConfig {
217 fn from(authenticity: MessageAuthenticity) -> Self {
218 match authenticity {
219 MessageAuthenticity::Signed(keypair) => {
220 let public_key = keypair.public();
221 let key_enc = public_key.encode_protobuf();
222 let key = if key_enc.len() <= 42 {
223 None
227 } else {
228 Some(key_enc)
230 };
231
232 PublishConfig::Signing {
233 keypair,
234 author: public_key.to_peer_id(),
235 inline_key: key,
236 last_seq_no: SequenceNumber::new(),
237 }
238 }
239 MessageAuthenticity::Author(peer_id) => PublishConfig::Author(peer_id),
240 MessageAuthenticity::RandomAuthor => PublishConfig::RandomAuthor,
241 MessageAuthenticity::Anonymous => PublishConfig::Anonymous,
242 }
243 }
244}
245
/// State of the gossipsub network behaviour.
///
/// `D` is the data transform applied to message payloads and `F` is the filter consulted
/// before subscribing to a topic.
pub struct Behaviour<D = IdentityTransform, F = AllowAllSubscriptionFilter> {
    /// Configuration supplied at construction.
    config: Config,

    /// Queue of events destined for the swarm (for example dial requests for explicit peers).
    events: VecDeque<ToSwarm<Event, HandlerIn>>,

    /// Authoring mode, derived from the `MessageAuthenticity` given at construction.
    publish_config: PublishConfig,

    /// Ids of messages already known; `publish` refuses ids found here and IHAVE handling
    /// skips them.
    duplicate_cache: DuplicateCache<MessageId>,

    /// Per-peer connection state (peer kind, subscribed topics, IDONTWANT records) for every
    /// connected peer.
    connected_peers: HashMap<PeerId, PeerConnections>,

    /// Peers registered through `add_explicit_peer`; they are excluded from mesh and fanout
    /// selection and their GRAFTs are refused.
    explicit_peers: HashSet<PeerId>,

    /// Peers registered through `blacklist_peer`.
    /// NOTE(review): the effect of being blacklisted is implemented outside this view.
    blacklisted_peers: HashSet<PeerId>,

    /// Mesh peers per topic. The key set is the set of topics we are subscribed to.
    mesh: HashMap<TopicHash, BTreeSet<PeerId>>,

    /// Peers used for publishing to topics that are not in the mesh.
    fanout: HashMap<TopicHash, BTreeSet<PeerId>>,

    /// Time of the most recent publish per fanout topic.
    fanout_last_pub: HashMap<TopicHash, Instant>,

    /// Backoff bookkeeping for pruned (topic, peer) pairs.
    backoffs: BackoffStorage,

    /// Cache of recent messages, used to answer IWANT requests and to hold messages pending
    /// validation.
    mcache: MessageCache,

    /// Heartbeat timer, initially set to the heartbeat interval plus the initial delay.
    heartbeat: Delay,

    /// Counter initialised to zero.
    /// NOTE(review): presumably the number of heartbeats run so far — confirm.
    heartbeat_ticks: u64,

    /// NOTE(review): not used in the part of the file reviewed here; presumably peers learnt
    /// through peer exchange — confirm.
    px_peers: HashSet<PeerId>,

    /// Consulted when handling a GRAFT for a full mesh: peers in this set are still admitted.
    /// NOTE(review): presumably peers we hold an outbound connection to — confirm.
    outbound_peers: HashSet<PeerId>,

    /// Peer scoring state, its thresholds and the decay timer; `None` until scoring is
    /// enabled through `with_peer_score`.
    peer_score: Option<(PeerScore, PeerScoreThresholds, Delay)>,

    /// Number of IHAVE messages received per peer, compared against `max_ihave_messages`.
    count_received_ihave: HashMap<PeerId, usize>,

    /// Number of message ids requested per peer through IWANT, compared against
    /// `max_ihave_length`.
    count_sent_iwant: HashMap<PeerId, usize>,

    /// Ids of messages we published as random-author or anonymous while self-origin messages
    /// are disallowed.
    published_message_ids: DuplicateCache<MessageId>,

    /// Filter consulted before subscribing to a topic.
    subscription_filter: F,

    /// Transform applied to outbound message payloads before they are sent.
    data_transform: D,

    /// Metrics sink; `None` when metrics were not requested at construction.
    metrics: Option<Metrics>,

    /// NOTE(review): not used in the part of the file reviewed here; presumably per-peer
    /// records of messages that could not be sent — confirm.
    failed_messages: HashMap<PeerId, FailedMessages>,

    /// Tracker for messages requested through IWANT and not yet delivered.
    gossip_promises: GossipPromises,
}
346
347impl<D, F> Behaviour<D, F>
348where
349 D: DataTransform + Default,
350 F: TopicSubscriptionFilter + Default,
351{
352 pub fn new(privacy: MessageAuthenticity, config: Config) -> Result<Self, &'static str> {
355 Self::new_with_subscription_filter_and_transform(
356 privacy,
357 config,
358 None,
359 F::default(),
360 D::default(),
361 )
362 }
363
364 pub fn new_with_metrics(
368 privacy: MessageAuthenticity,
369 config: Config,
370 metrics_registry: &mut Registry,
371 metrics_config: MetricsConfig,
372 ) -> Result<Self, &'static str> {
373 Self::new_with_subscription_filter_and_transform(
374 privacy,
375 config,
376 Some((metrics_registry, metrics_config)),
377 F::default(),
378 D::default(),
379 )
380 }
381}
382
383impl<D, F> Behaviour<D, F>
384where
385 D: DataTransform + Default,
386 F: TopicSubscriptionFilter,
387{
388 pub fn new_with_subscription_filter(
391 privacy: MessageAuthenticity,
392 config: Config,
393 metrics: Option<(&mut Registry, MetricsConfig)>,
394 subscription_filter: F,
395 ) -> Result<Self, &'static str> {
396 Self::new_with_subscription_filter_and_transform(
397 privacy,
398 config,
399 metrics,
400 subscription_filter,
401 D::default(),
402 )
403 }
404}
405
406impl<D, F> Behaviour<D, F>
407where
408 D: DataTransform,
409 F: TopicSubscriptionFilter + Default,
410{
411 pub fn new_with_transform(
414 privacy: MessageAuthenticity,
415 config: Config,
416 metrics: Option<(&mut Registry, MetricsConfig)>,
417 data_transform: D,
418 ) -> Result<Self, &'static str> {
419 Self::new_with_subscription_filter_and_transform(
420 privacy,
421 config,
422 metrics,
423 F::default(),
424 data_transform,
425 )
426 }
427}
428
429impl<D, F> Behaviour<D, F>
430where
431 D: DataTransform,
432 F: TopicSubscriptionFilter,
433{
434 pub fn new_with_subscription_filter_and_transform(
437 privacy: MessageAuthenticity,
438 config: Config,
439 metrics: Option<(&mut Registry, MetricsConfig)>,
440 subscription_filter: F,
441 data_transform: D,
442 ) -> Result<Self, &'static str> {
443 validate_config(&privacy, config.validation_mode())?;
448
449 Ok(Behaviour {
450 metrics: metrics.map(|(registry, cfg)| Metrics::new(registry, cfg)),
451 events: VecDeque::new(),
452 publish_config: privacy.into(),
453 duplicate_cache: DuplicateCache::new(config.duplicate_cache_time()),
454 explicit_peers: HashSet::new(),
455 blacklisted_peers: HashSet::new(),
456 mesh: HashMap::new(),
457 fanout: HashMap::new(),
458 fanout_last_pub: HashMap::new(),
459 backoffs: BackoffStorage::new(
460 &config.prune_backoff(),
461 config.heartbeat_interval(),
462 config.backoff_slack(),
463 ),
464 mcache: MessageCache::new(config.history_gossip(), config.history_length()),
465 heartbeat: Delay::new(config.heartbeat_interval() + config.heartbeat_initial_delay()),
466 heartbeat_ticks: 0,
467 px_peers: HashSet::new(),
468 outbound_peers: HashSet::new(),
469 peer_score: None,
470 count_received_ihave: HashMap::new(),
471 count_sent_iwant: HashMap::new(),
472 connected_peers: HashMap::new(),
473 published_message_ids: DuplicateCache::new(config.published_message_ids_cache_time()),
474 config,
475 subscription_filter,
476 data_transform,
477 failed_messages: Default::default(),
478 gossip_promises: Default::default(),
479 })
480 }
481}
482
483impl<D, F> Behaviour<D, F>
484where
485 D: DataTransform + Send + 'static,
486 F: TopicSubscriptionFilter + Send + 'static,
487{
488 pub fn topics(&self) -> impl Iterator<Item = &TopicHash> {
490 self.mesh.keys()
491 }
492
493 pub fn mesh_peers(&self, topic_hash: &TopicHash) -> impl Iterator<Item = &PeerId> {
495 self.mesh.get(topic_hash).into_iter().flat_map(|x| x.iter())
496 }
497
498 pub fn all_mesh_peers(&self) -> impl Iterator<Item = &PeerId> {
499 let mut res = BTreeSet::new();
500 for peers in self.mesh.values() {
501 res.extend(peers);
502 }
503 res.into_iter()
504 }
505
506 pub fn all_peers(&self) -> impl Iterator<Item = (&PeerId, Vec<&TopicHash>)> {
508 self.connected_peers
509 .iter()
510 .map(|(peer_id, peer)| (peer_id, peer.topics.iter().collect()))
511 }
512
513 pub fn peer_protocol(&self) -> impl Iterator<Item = (&PeerId, &PeerKind)> {
515 self.connected_peers.iter().map(|(k, v)| (k, &v.kind))
516 }
517
518 pub fn peer_score(&self, peer_id: &PeerId) -> Option<f64> {
520 self.peer_score
521 .as_ref()
522 .map(|(score, ..)| score.score(peer_id))
523 }
524
525 pub fn subscribe<H: Hasher>(&mut self, topic: &Topic<H>) -> Result<bool, SubscriptionError> {
530 tracing::debug!(%topic, "Subscribing to topic");
531 let topic_hash = topic.hash();
532 if !self.subscription_filter.can_subscribe(&topic_hash) {
533 return Err(SubscriptionError::NotAllowed);
534 }
535
536 if self.mesh.contains_key(&topic_hash) {
537 tracing::debug!(%topic, "Topic is already in the mesh");
538 return Ok(false);
539 }
540
541 for peer_id in self.connected_peers.keys().copied().collect::<Vec<_>>() {
543 tracing::debug!(%peer_id, "Sending SUBSCRIBE to peer");
544 let event = RpcOut::Subscribe(topic_hash.clone());
545 self.send_message(peer_id, event);
546 }
547
548 self.join(&topic_hash);
551 tracing::debug!(%topic, "Subscribed to topic");
552 Ok(true)
553 }
554
555 pub fn unsubscribe<H: Hasher>(&mut self, topic: &Topic<H>) -> bool {
559 tracing::debug!(%topic, "Unsubscribing from topic");
560 let topic_hash = topic.hash();
561
562 if !self.mesh.contains_key(&topic_hash) {
563 tracing::debug!(topic=%topic_hash, "Already unsubscribed from topic");
564 return false;
566 }
567
568 for peer in self.connected_peers.keys().copied().collect::<Vec<_>>() {
570 tracing::debug!(%peer, "Sending UNSUBSCRIBE to peer");
571 let event = RpcOut::Unsubscribe(topic_hash.clone());
572 self.send_message(peer, event);
573 }
574
575 self.leave(&topic_hash);
578
579 tracing::debug!(topic=%topic_hash, "Unsubscribed from topic");
580 true
581 }
582
/// Publishes `data` on `topic` and returns the id of the published message.
///
/// The payload is passed through the outbound data transform, wrapped into a raw message,
/// and sent to a set of recipients chosen according to the configuration (flood publishing,
/// mesh, or fanout). The message is also recorded in the duplicate cache and message cache.
///
/// # Errors
///
/// - any error returned by the outbound transform or by `build_raw_message`,
/// - `PublishError::MessageTooLarge` when the transformed payload exceeds
///   `max_transmit_size`,
/// - `PublishError::Duplicate` when the message id is already in the duplicate cache,
/// - `PublishError::NoPeersSubscribedToTopic` when no connected peer is subscribed to the
///   topic, or when recipient selection yields no peers,
/// - `PublishError::AllQueuesFull(n)` when none of the `n` `send_message` calls reported
///   success.
pub fn publish(
    &mut self,
    topic: impl Into<TopicHash>,
    data: impl Into<Vec<u8>>,
) -> Result<MessageId, PublishError> {
    let data = data.into();
    let topic = topic.into();

    // The transform gets a copy; the untransformed payload is still needed for the
    // message id below.
    let transformed_data = self
        .data_transform
        .outbound_transform(&topic, data.clone())?;

    // The size limit applies to the transformed payload.
    if transformed_data.len() > self.config.max_transmit_size() {
        return Err(PublishError::MessageTooLarge);
    }

    let raw_message = self.build_raw_message(topic, transformed_data)?;

    // The message id is computed over the untransformed payload.
    let msg_id = self.config.message_id(&Message {
        source: raw_message.source,
        data,
        sequence_number: raw_message.sequence_number,
        topic: raw_message.topic.clone(),
    });

    // Refuse to publish a message we already know about.
    if self.duplicate_cache.contains(&msg_id) {
        tracing::warn!(
            message=%msg_id,
            "Not publishing a message that has already been published"
        );
        return Err(PublishError::Duplicate);
    }

    tracing::trace!(message=%msg_id, "Publishing message");

    let topic_hash = raw_message.topic.clone();

    // Connected peers subscribed to the topic. Peekable so emptiness can be checked without
    // consuming the iterator, which is used again below.
    let mut peers_on_topic = self
        .connected_peers
        .iter()
        .filter(|(_, p)| p.topics.contains(&topic_hash))
        .map(|(peer_id, _)| peer_id)
        .peekable();

    if peers_on_topic.peek().is_none() {
        return Err(PublishError::NoPeersSubscribedToTopic);
    }

    let mut recipient_peers = HashSet::new();
    if self.config.flood_publish() {
        // Flood publishing: every subscribed peer that is explicit or whose score is not
        // below the publish threshold.
        recipient_peers.extend(peers_on_topic.filter(|p| {
            self.explicit_peers.contains(*p)
                || !self.score_below_threshold(p, |ts| ts.publish_threshold).0
        }));
    } else {
        match self.mesh.get(&topic_hash) {
            // We are subscribed: send to the mesh.
            Some(mesh_peers) => {
                // When the mesh holds fewer than `mesh_n` peers, top up with random
                // subscribed peers that are not in the mesh, not explicit, and not below the
                // publish threshold.
                let needed_extra_peers = self.config.mesh_n().saturating_sub(mesh_peers.len());

                if needed_extra_peers > 0 {
                    let peer_list = get_random_peers(
                        &self.connected_peers,
                        &topic_hash,
                        needed_extra_peers,
                        |peer| {
                            !mesh_peers.contains(peer)
                                && !self.explicit_peers.contains(peer)
                                && !self
                                    .score_below_threshold(peer, |pst| pst.publish_threshold)
                                    .0
                        },
                    );
                    recipient_peers.extend(peer_list);
                }

                recipient_peers.extend(mesh_peers);
            }
            // We are not subscribed: send to the fanout.
            None => {
                tracing::debug!(topic=%topic_hash, "Topic not in the mesh");
                // An existing but empty fanout set is treated like a missing one.
                let fanout_peers = self
                    .fanout
                    .get(&topic_hash)
                    .filter(|peers| !peers.is_empty());
                if let Some(peers) = fanout_peers {
                    for peer in peers {
                        recipient_peers.insert(*peer);
                    }
                } else {
                    // No usable fanout: pick up to `mesh_n` random subscribed peers that are
                    // not explicit and not below the publish threshold, and remember them as
                    // the fanout for this topic.
                    let mesh_n = self.config.mesh_n();
                    let new_peers =
                        get_random_peers(&self.connected_peers, &topic_hash, mesh_n, {
                            |p| {
                                !self.explicit_peers.contains(p)
                                    && !self
                                        .score_below_threshold(p, |pst| pst.publish_threshold)
                                        .0
                            }
                        });
                    self.fanout.insert(topic_hash.clone(), new_peers.clone());
                    for peer in new_peers {
                        tracing::debug!(%peer, "Peer added to fanout");
                        recipient_peers.insert(peer);
                    }
                }
                // Record when this fanout topic was last published to.
                self.fanout_last_pub
                    .insert(topic_hash.clone(), Instant::now());
            }
        }

        // Explicit peers subscribed to the topic always receive the message.
        recipient_peers
            .extend(peers_on_topic.filter(|peer_id| self.explicit_peers.contains(peer_id)));

        // Floodsub peers subscribed to the topic receive it too, unless their score is below
        // the publish threshold.
        for (peer, connections) in &self.connected_peers {
            if connections.kind == PeerKind::Floodsub
                && connections.topics.contains(&topic_hash)
                && !self
                    .score_below_threshold(peer, |ts| ts.publish_threshold)
                    .0
            {
                recipient_peers.insert(*peer);
            }
        }
    }

    // NOTE(review): the message is recorded as seen and cached before the recipient set is
    // checked for emptiness below, so a publish that fails with `NoPeersSubscribedToTopic`
    // or `AllQueuesFull` still makes a later retry of the same message fail with
    // `Duplicate`. Confirm this is intended.
    self.duplicate_cache.insert(msg_id.clone());
    self.mcache.put(&msg_id, raw_message.clone());

    // Our own message counts as delivered for any outstanding gossip promise.
    self.gossip_promises.message_delivered(&msg_id);

    // Without a stable author, remember the ids of our own messages (only when self-origin
    // messages are disallowed).
    if let PublishConfig::RandomAuthor | PublishConfig::Anonymous = self.publish_config {
        if !self.config.allow_self_origin() {
            self.published_message_ids.insert(msg_id.clone());
        }
    }

    // The publish counts as failed unless at least one send reports success.
    let mut publish_failed = true;
    for peer_id in recipient_peers.iter() {
        tracing::trace!(peer=%peer_id, "Sending message to peer");
        if self.send_message(
            *peer_id,
            RpcOut::Publish {
                message: raw_message.clone(),
                timeout: Delay::new(self.config.publish_queue_duration()),
            },
        ) {
            publish_failed = false
        }
    }

    if recipient_peers.is_empty() {
        return Err(PublishError::NoPeersSubscribedToTopic);
    }

    if publish_failed {
        return Err(PublishError::AllQueuesFull(recipient_peers.len()));
    }

    // Optionally announce IDONTWANT for large messages we publish ourselves.
    if raw_message.raw_protobuf_len() > self.config.idontwant_message_size_threshold()
        && self.config.idontwant_on_publish()
    {
        self.send_idontwant(&raw_message, &msg_id, raw_message.source.as_ref());
    }

    tracing::debug!(message=%msg_id, "Published message");

    if let Some(metrics) = self.metrics.as_mut() {
        metrics.register_published_message(&topic_hash);
    }

    Ok(msg_id)
}
784
785 pub fn report_message_validation_result(
805 &mut self,
806 msg_id: &MessageId,
807 propagation_source: &PeerId,
808 acceptance: MessageAcceptance,
809 ) -> bool {
810 let reject_reason = match acceptance {
811 MessageAcceptance::Accept => {
812 let (raw_message, originating_peers) = match self.mcache.validate(msg_id) {
813 Some((raw_message, originating_peers)) => {
814 (raw_message.clone(), originating_peers)
815 }
816 None => {
817 tracing::warn!(
818 message=%msg_id,
819 "Message not in cache. Ignoring forwarding"
820 );
821 if let Some(metrics) = self.metrics.as_mut() {
822 metrics.memcache_miss();
823 }
824 return false;
825 }
826 };
827
828 if let Some(metrics) = self.metrics.as_mut() {
829 metrics.register_msg_validation(&raw_message.topic, &acceptance);
830 }
831
832 self.forward_msg(
833 msg_id,
834 raw_message,
835 Some(propagation_source),
836 originating_peers,
837 );
838 return true;
839 }
840 MessageAcceptance::Reject => RejectReason::ValidationFailed,
841 MessageAcceptance::Ignore => RejectReason::ValidationIgnored,
842 };
843
844 if let Some((raw_message, originating_peers)) = self.mcache.remove(msg_id) {
845 if let Some(metrics) = self.metrics.as_mut() {
846 metrics.register_msg_validation(&raw_message.topic, &acceptance);
847 }
848
849 if let Some((peer_score, ..)) = &mut self.peer_score {
852 peer_score.reject_message(
853 propagation_source,
854 msg_id,
855 &raw_message.topic,
856 reject_reason,
857 );
858 for peer in originating_peers.iter() {
859 peer_score.reject_message(peer, msg_id, &raw_message.topic, reject_reason);
860 }
861 }
862 true
863 } else {
864 tracing::warn!(message=%msg_id, "Rejected message not in cache");
865 false
866 }
867 }
868
869 pub fn add_explicit_peer(&mut self, peer_id: &PeerId) {
871 tracing::debug!(peer=%peer_id, "Adding explicit peer");
872
873 self.explicit_peers.insert(*peer_id);
874
875 self.check_explicit_peer_connection(peer_id);
876 }
877
878 pub fn remove_explicit_peer(&mut self, peer_id: &PeerId) {
881 tracing::debug!(peer=%peer_id, "Removing explicit peer");
882 self.explicit_peers.remove(peer_id);
883 }
884
885 pub fn blacklist_peer(&mut self, peer_id: &PeerId) {
888 if self.blacklisted_peers.insert(*peer_id) {
889 tracing::debug!(peer=%peer_id, "Peer has been blacklisted");
890 }
891 }
892
893 pub fn remove_blacklisted_peer(&mut self, peer_id: &PeerId) {
895 if self.blacklisted_peers.remove(peer_id) {
896 tracing::debug!(peer=%peer_id, "Peer has been removed from the blacklist");
897 }
898 }
899
900 pub fn with_peer_score(
904 &mut self,
905 params: PeerScoreParams,
906 threshold: PeerScoreThresholds,
907 ) -> Result<(), String> {
908 self.with_peer_score_and_message_delivery_time_callback(params, threshold, None)
909 }
910
911 pub fn with_peer_score_and_message_delivery_time_callback(
914 &mut self,
915 params: PeerScoreParams,
916 threshold: PeerScoreThresholds,
917 callback: Option<fn(&PeerId, &TopicHash, f64)>,
918 ) -> Result<(), String> {
919 params.validate()?;
920 threshold.validate()?;
921
922 if self.peer_score.is_some() {
923 return Err("Peer score set twice".into());
924 }
925
926 let interval = Delay::new(params.decay_interval);
927 let peer_score = PeerScore::new_with_message_delivery_time_callback(params, callback);
928 self.peer_score = Some((peer_score, threshold, interval));
929 Ok(())
930 }
931
932 pub fn set_topic_params<H: Hasher>(
936 &mut self,
937 topic: Topic<H>,
938 params: TopicScoreParams,
939 ) -> Result<(), &'static str> {
940 if let Some((peer_score, ..)) = &mut self.peer_score {
941 peer_score.set_topic_params(topic.hash(), params);
942 Ok(())
943 } else {
944 Err("Peer score must be initialised with `with_peer_score()`")
945 }
946 }
947
948 pub fn get_topic_params<H: Hasher>(&self, topic: &Topic<H>) -> Option<&TopicScoreParams> {
950 self.peer_score.as_ref()?.0.get_topic_params(&topic.hash())
951 }
952
953 pub fn set_application_score(&mut self, peer_id: &PeerId, new_score: f64) -> bool {
956 if let Some((peer_score, ..)) = &mut self.peer_score {
957 peer_score.set_application_score(peer_id, new_score)
958 } else {
959 false
960 }
961 }
962
/// Joins the mesh for `topic_hash`.
///
/// Does nothing when the topic is already in the mesh. Otherwise the mesh is seeded with
/// eligible peers from the topic's fanout (which is removed), topped up to `mesh_n` with
/// random eligible subscribed peers, and every added peer is sent a GRAFT.
///
/// A peer is eligible when it is not an explicit peer, its score is not below zero, and it
/// is not in backoff (with slack) for the topic.
fn join(&mut self, topic_hash: &TopicHash) {
    tracing::debug!(topic=%topic_hash, "Running JOIN for topic");

    if self.mesh.contains_key(topic_hash) {
        tracing::debug!(topic=%topic_hash, "JOIN: The topic is already in the mesh, ignoring JOIN");
        return;
    }

    // Every peer added to the mesh by this call; each of them is grafted below.
    let mut added_peers = HashSet::new();

    if let Some(m) = self.metrics.as_mut() {
        m.joined(topic_hash)
    }

    // Promote fanout peers first; the fanout entry is removed in the process.
    if let Some((_, mut peers)) = self.fanout.remove_entry(topic_hash) {
        tracing::debug!(
            topic=%topic_hash,
            "JOIN: Removing peers from the fanout for topic"
        );

        // Keep only eligible peers.
        peers.retain(|p| {
            !self.explicit_peers.contains(p)
                && !self.score_below_threshold(p, |_| 0.0).0
                && !self.backoffs.is_backoff_with_slack(topic_hash, p)
        });

        // Take at most `mesh_n` of them.
        let add_peers = std::cmp::min(peers.len(), self.config.mesh_n());
        tracing::debug!(
            topic=%topic_hash,
            "JOIN: Adding {:?} peers from the fanout for topic",
            add_peers
        );
        added_peers.extend(peers.iter().take(add_peers));

        self.mesh.insert(
            topic_hash.clone(),
            peers.into_iter().take(add_peers).collect(),
        );

        // The topic is no longer a fanout topic.
        self.fanout_last_pub.remove(topic_hash);
    }

    // Number of peers that came from the fanout (zero when there was none).
    let fanaout_added = added_peers.len();
    if let Some(m) = self.metrics.as_mut() {
        m.peers_included(topic_hash, Inclusion::Fanout, fanaout_added)
    }

    // Top up with random eligible peers when the fanout did not provide `mesh_n` peers.
    if added_peers.len() < self.config.mesh_n() {
        let new_peers = get_random_peers(
            &self.connected_peers,
            topic_hash,
            self.config.mesh_n() - added_peers.len(),
            |peer| {
                !added_peers.contains(peer)
                    && !self.explicit_peers.contains(peer)
                    && !self.score_below_threshold(peer, |_| 0.0).0
                    && !self.backoffs.is_backoff_with_slack(topic_hash, peer)
            },
        );
        added_peers.extend(new_peers.clone());
        tracing::debug!(
            "JOIN: Inserting {:?} random peers into the mesh",
            new_peers.len()
        );
        // Creates the mesh entry when no fanout existed for the topic.
        let mesh_peers = self.mesh.entry(topic_hash.clone()).or_default();
        mesh_peers.extend(new_peers);
    }

    let random_added = added_peers.len() - fanaout_added;
    if let Some(m) = self.metrics.as_mut() {
        m.peers_included(topic_hash, Inclusion::Random, random_added)
    }

    // Tell every added peer that it is now part of our mesh.
    for peer_id in added_peers {
        tracing::debug!(peer=%peer_id, "JOIN: Sending Graft message to peer");
        if let Some((peer_score, ..)) = &mut self.peer_score {
            peer_score.graft(&peer_id, topic_hash.clone());
        }
        self.send_message(
            peer_id,
            RpcOut::Graft(Graft {
                topic_hash: topic_hash.clone(),
            }),
        );

        peer_added_to_mesh(
            peer_id,
            vec![topic_hash],
            &self.mesh,
            &mut self.events,
            &self.connected_peers,
        );
    }

    let mesh_peers = self.mesh_peers(topic_hash).count();
    if let Some(m) = self.metrics.as_mut() {
        m.set_mesh_peers(topic_hash, mesh_peers)
    }

    tracing::debug!(topic=%topic_hash, "Completed JOIN for topic");
}
1077
1078 fn make_prune(
1080 &mut self,
1081 topic_hash: &TopicHash,
1082 peer: &PeerId,
1083 do_px: bool,
1084 on_unsubscribe: bool,
1085 ) -> Prune {
1086 if let Some((peer_score, ..)) = &mut self.peer_score {
1087 peer_score.prune(peer, topic_hash.clone());
1088 }
1089
1090 match self.connected_peers.get(peer).map(|v| &v.kind) {
1091 Some(PeerKind::Floodsub) => {
1092 tracing::error!("Attempted to prune a Floodsub peer");
1093 }
1094 Some(PeerKind::Gossipsub) => {
1095 return Prune {
1097 topic_hash: topic_hash.clone(),
1098 peers: Vec::new(),
1099 backoff: None,
1100 };
1101 }
1102 None => {
1103 tracing::error!("Attempted to Prune an unknown peer");
1104 }
1105 _ => {} }
1107
1108 let peers = if do_px {
1110 get_random_peers(
1111 &self.connected_peers,
1112 topic_hash,
1113 self.config.prune_peers(),
1114 |p| p != peer && !self.score_below_threshold(p, |_| 0.0).0,
1115 )
1116 .into_iter()
1117 .map(|p| PeerInfo { peer_id: Some(p) })
1118 .collect()
1119 } else {
1120 Vec::new()
1121 };
1122
1123 let backoff = if on_unsubscribe {
1124 self.config.unsubscribe_backoff()
1125 } else {
1126 self.config.prune_backoff()
1127 };
1128
1129 self.backoffs.update_backoff(topic_hash, peer, backoff);
1131
1132 Prune {
1133 topic_hash: topic_hash.clone(),
1134 peers,
1135 backoff: Some(backoff.as_secs()),
1136 }
1137 }
1138
1139 fn leave(&mut self, topic_hash: &TopicHash) {
1141 tracing::debug!(topic=%topic_hash, "Running LEAVE for topic");
1142
1143 if let Some((_, peers)) = self.mesh.remove_entry(topic_hash) {
1145 if let Some(m) = self.metrics.as_mut() {
1146 m.left(topic_hash)
1147 }
1148 for peer_id in peers {
1149 tracing::debug!(%peer_id, "LEAVE: Sending PRUNE to peer");
1151
1152 let on_unsubscribe = true;
1153 let prune =
1154 self.make_prune(topic_hash, &peer_id, self.config.do_px(), on_unsubscribe);
1155 self.send_message(peer_id, RpcOut::Prune(prune));
1156
1157 peer_removed_from_mesh(
1159 peer_id,
1160 topic_hash,
1161 &self.mesh,
1162 &mut self.events,
1163 &self.connected_peers,
1164 );
1165 }
1166 }
1167 tracing::debug!(topic=%topic_hash, "Completed LEAVE for topic");
1168 }
1169
1170 fn check_explicit_peer_connection(&mut self, peer_id: &PeerId) {
1172 if !self.connected_peers.contains_key(peer_id) {
1173 tracing::debug!(peer=%peer_id, "Connecting to explicit peer");
1175 self.events.push_back(ToSwarm::Dial {
1176 opts: DialOpts::peer_id(*peer_id).build(),
1177 });
1178 }
1179 }
1180
1181 fn score_below_threshold(
1184 &self,
1185 peer_id: &PeerId,
1186 threshold: impl Fn(&PeerScoreThresholds) -> f64,
1187 ) -> (bool, f64) {
1188 Self::score_below_threshold_from_scores(&self.peer_score, peer_id, threshold)
1189 }
1190
1191 fn score_below_threshold_from_scores(
1192 peer_score: &Option<(PeerScore, PeerScoreThresholds, Delay)>,
1193 peer_id: &PeerId,
1194 threshold: impl Fn(&PeerScoreThresholds) -> f64,
1195 ) -> (bool, f64) {
1196 if let Some((peer_score, thresholds, ..)) = peer_score {
1197 let score = peer_score.score(peer_id);
1198 if score < threshold(thresholds) {
1199 return (true, score);
1200 }
1201 (false, score)
1202 } else {
1203 (false, 0.0)
1204 }
1205 }
1206
/// Handles an IHAVE control message from `peer_id`.
///
/// `ihave_msgs` pairs each advertised topic with the message ids the peer claims to have.
/// Ids for subscribed topics that are neither already seen nor already requested are
/// collected, and a random subset (bounded by `max_ihave_length`, counted per peer) is
/// requested with a single IWANT. A gossip promise is registered for the requested ids.
///
/// The message is ignored when the peer's score is below the gossip threshold, when the
/// peer has sent more than `max_ihave_messages` IHAVEs, or when we already requested
/// `max_ihave_length` ids from it.
fn handle_ihave(&mut self, peer_id: &PeerId, ihave_msgs: Vec<(TopicHash, Vec<MessageId>)>) {
    // Ignore gossip from peers whose score is below the gossip threshold.
    if let (true, score) = self.score_below_threshold(peer_id, |pst| pst.gossip_threshold) {
        tracing::debug!(
            peer=%peer_id,
            %score,
            "IHAVE: ignoring peer with score below threshold"
        );
        return;
    }

    // Flood protection: count IHAVE messages per peer (the wording of the log message
    // below indicates the count is per heartbeat interval).
    let peer_have = self.count_received_ihave.entry(*peer_id).or_insert(0);
    *peer_have += 1;
    if *peer_have > self.config.max_ihave_messages() {
        tracing::debug!(
            peer=%peer_id,
            "IHAVE: peer has advertised too many times ({}) within this heartbeat \
            interval; ignoring",
            *peer_have
        );
        return;
    }

    // Stop when we already asked this peer for the maximum number of ids.
    if let Some(iasked) = self.count_sent_iwant.get(peer_id) {
        if *iasked >= self.config.max_ihave_length() {
            tracing::debug!(
                peer=%peer_id,
                "IHAVE: peer has already advertised too many messages ({}); ignoring",
                *iasked
            );
            return;
        }
    }

    tracing::trace!(peer=%peer_id, "Handling IHAVE for peer");

    let mut iwant_ids = HashSet::new();

    // An id is wanted when it has not been seen and no promise is outstanding for it.
    let want_message = |id: &MessageId| {
        if self.duplicate_cache.contains(id) {
            return false;
        }

        !self.gossip_promises.contains(id)
    };

    for (topic, ids) in ihave_msgs {
        // Only topics we are subscribed to are of interest.
        if !self.mesh.contains_key(&topic) {
            tracing::debug!(
                %topic,
                "IHAVE: Ignoring IHAVE - Not subscribed to topic"
            );
            continue;
        }

        for id in ids.into_iter().filter(want_message) {
            // Count each wanted id once, even when it is advertised repeatedly.
            if iwant_ids.insert(id) {
                if let Some(metrics) = self.metrics.as_mut() {
                    metrics.register_iwant(&topic);
                }
            }
        }
    }

    if !iwant_ids.is_empty() {
        let iasked = self.count_sent_iwant.entry(*peer_id).or_insert(0);
        let mut iask = iwant_ids.len();
        // Cap the request so the per-peer total stays within `max_ihave_length`.
        if *iasked + iask > self.config.max_ihave_length() {
            iask = self.config.max_ihave_length().saturating_sub(*iasked);
        }

        tracing::debug!(
            peer=%peer_id,
            "IHAVE: Asking for {} out of {} messages from peer",
            iask,
            iwant_ids.len()
        );

        // Pick `iask` ids at random: shuffle that many into place, then drop the rest.
        let mut iwant_ids_vec: Vec<_> = iwant_ids.into_iter().collect();
        let mut rng = thread_rng();
        iwant_ids_vec.partial_shuffle(&mut rng, iask);

        iwant_ids_vec.truncate(iask);
        *iasked += iask;

        // Expect delivery of the requested messages within the follow-up time.
        self.gossip_promises.add_promise(
            *peer_id,
            &iwant_ids_vec,
            Instant::now() + self.config.iwant_followup_time(),
        );
        tracing::trace!(
            peer=%peer_id,
            "IHAVE: Asking for the following messages from peer: {:?}",
            iwant_ids_vec
        );

        self.send_message(
            *peer_id,
            RpcOut::IWant(IWant {
                message_ids: iwant_ids_vec,
            }),
        );
    }
    tracing::trace!(peer=%peer_id, "Completed IHAVE handling for peer");
}
1320
1321 fn handle_iwant(&mut self, peer_id: &PeerId, iwant_msgs: Vec<MessageId>) {
1324 if let (true, score) = self.score_below_threshold(peer_id, |pst| pst.gossip_threshold) {
1326 tracing::debug!(
1327 peer=%peer_id,
1328 "IWANT: ignoring peer with score below threshold [score = {}]",
1329 score
1330 );
1331 return;
1332 }
1333
1334 tracing::debug!(peer=%peer_id, "Handling IWANT for peer");
1335
1336 for id in iwant_msgs {
1337 if let Some((msg, count)) = self
1340 .mcache
1341 .get_with_iwant_counts(&id, peer_id)
1342 .map(|(msg, count)| (msg.clone(), count))
1343 {
1344 if count > self.config.gossip_retransimission() {
1345 tracing::debug!(
1346 peer=%peer_id,
1347 message=%id,
1348 "IWANT: Peer has asked for message too many times; ignoring request"
1349 );
1350 } else {
1351 if let Some(peer) = self.connected_peers.get_mut(peer_id) {
1352 if peer.dont_send.contains_key(&id) {
1353 tracing::debug!(%peer_id, message=%id, "Peer already sent IDONTWANT for this message");
1354 continue;
1355 }
1356 }
1357
1358 tracing::debug!(peer=%peer_id, "IWANT: Sending cached messages to peer");
1359 self.send_message(
1360 *peer_id,
1361 RpcOut::Forward {
1362 message: msg,
1363 timeout: Delay::new(self.config.forward_queue_duration()),
1364 },
1365 );
1366 }
1367 }
1368 }
1369 tracing::debug!(peer=%peer_id, "Completed IWANT handling for peer");
1370 }
1371
    /// Handles GRAFT control messages received from `peer_id` for the given `topics`.
    ///
    /// Every grafted topic is first recorded as a subscription of the peer. Each graft is then
    /// either accepted (the peer is inserted into our mesh for that topic) or answered with a
    /// PRUNE. A PRUNE is sent when:
    /// - the peer is one of our explicit peers (all topics are pruned),
    /// - the peer grafts while a backoff for the topic is still running (also penalised),
    /// - the peer's score is below zero, or
    /// - the mesh already holds at least `mesh_n_high` peers and the peer is not outbound.
    ///
    /// Grafts for topics we have no mesh for are ignored without a reply. Grafts from peers
    /// that are not in `connected_peers` are dropped with an error log.
    fn handle_graft(&mut self, peer_id: &PeerId, topics: Vec<TopicHash>) {
        tracing::debug!(peer=%peer_id, "Handling GRAFT message for peer");

        // Topics whose graft is refused; a PRUNE is sent for each of them at the end.
        let mut to_prune_topics = HashSet::new();

        // Whether the PRUNEs may carry peer exchange. Starts from the configuration and is
        // cleared by several of the rejection paths below.
        let mut do_px = self.config.do_px();

        let Some(connected_peer) = self.connected_peers.get_mut(peer_id) else {
            tracing::error!(peer_id = %peer_id, "Peer non-existent when handling graft");
            return;
        };

        for topic in &topics {
            // Record the grafted topic as a subscription of the peer; the topic-peer metric is
            // only incremented when the topic was not recorded before.
            if connected_peer.topics.insert(topic.clone()) {
                if let Some(m) = self.metrics.as_mut() {
                    m.inc_topic_peers(topic);
                }
            }
        }

        if self.explicit_peers.contains(peer_id) {
            // Explicit peers are never grafted: refuse every topic and do not exchange peers.
            tracing::warn!(peer=%peer_id, "GRAFT: ignoring request from direct peer");
            to_prune_topics = topics.into_iter().collect();
            do_px = false
        } else {
            // The score check against zero is evaluated once for the whole message.
            let (below_zero, score) = self.score_below_threshold(peer_id, |_| 0.0);
            let now = Instant::now();
            for topic_hash in topics {
                if let Some(peers) = self.mesh.get_mut(&topic_hash) {
                    // Already a mesh member for this topic: nothing to do.
                    if peers.contains(peer_id) {
                        tracing::debug!(
                            peer=%peer_id,
                            topic=%&topic_hash,
                            "GRAFT: Received graft for peer that is already in topic"
                        );
                        continue;
                    }

                    // Refuse grafts that arrive while a backoff for this (topic, peer) pair is
                    // still in the future.
                    if let Some(backoff_time) = self.backoffs.get_backoff_time(&topic_hash, peer_id)
                    {
                        if backoff_time > now {
                            tracing::warn!(
                                peer=%peer_id,
                                "[Penalty] Peer attempted graft within backoff time, penalizing"
                            );
                            if let Some((peer_score, ..)) = &mut self.peer_score {
                                if let Some(metrics) = self.metrics.as_mut() {
                                    metrics.register_score_penalty(Penalty::GraftBackoff);
                                }
                                peer_score.add_penalty(peer_id, 1);

                                // Shift the backoff expiry forward by the flood threshold and
                                // back by the configured prune backoff; if that instant is
                                // still in the future a second penalty is applied.
                                // NOTE(review): presumably this approximates "time of the
                                // prune + flood threshold", which only holds when the backoff
                                // was set from `prune_backoff` - confirm.
                                #[allow(unknown_lints, clippy::unchecked_duration_subtraction)]
                                let flood_cutoff = (backoff_time
                                    + self.config.graft_flood_threshold())
                                    - self.config.prune_backoff();
                                if flood_cutoff > now {
                                    peer_score.add_penalty(peer_id, 1);
                                }
                            }
                            // No peer exchange for a peer that ignored its backoff.
                            do_px = false;

                            to_prune_topics.insert(topic_hash.clone());
                            continue;
                        }
                    }

                    // Peers with a score below zero are refused with a PRUNE and get no peer
                    // exchange.
                    if below_zero {
                        tracing::debug!(
                            peer=%peer_id,
                            %score,
                            topic=%topic_hash,
                            "GRAFT: ignoring peer with negative score"
                        );
                        to_prune_topics.insert(topic_hash.clone());
                        do_px = false;
                        continue;
                    }

                    // The mesh is at its upper bound: only outbound peers are still admitted.
                    if peers.len() >= self.config.mesh_n_high()
                        && !self.outbound_peers.contains(peer_id)
                    {
                        to_prune_topics.insert(topic_hash.clone());
                        continue;
                    }

                    tracing::debug!(
                        peer=%peer_id,
                        topic=%topic_hash,
                        "GRAFT: Mesh link added for peer in topic"
                    );

                    // Accept the graft; the inclusion metric is only recorded for a new entry.
                    if peers.insert(*peer_id) {
                        if let Some(m) = self.metrics.as_mut() {
                            m.peers_included(&topic_hash, Inclusion::Subscribed, 1)
                        }
                    }

                    // Let the mesh-membership helper queue any resulting events.
                    peer_added_to_mesh(
                        *peer_id,
                        vec![&topic_hash],
                        &self.mesh,
                        &mut self.events,
                        &self.connected_peers,
                    );

                    // Inform scoring about the new mesh membership.
                    if let Some((peer_score, ..)) = &mut self.peer_score {
                        peer_score.graft(peer_id, topic_hash);
                    }
                } else {
                    // We have no mesh for this topic: ignore the graft and disable peer
                    // exchange for any PRUNE sent in response to this message.
                    do_px = false;
                    tracing::debug!(
                        peer=%peer_id,
                        topic=%topic_hash,
                        "GRAFT: Received graft for unknown topic from peer"
                    );
                    continue;
                }
            }
        }

        if !to_prune_topics.is_empty() {
            // These prunes are a reaction to a refused graft, not to an unsubscription.
            let on_unsubscribe = false;

            // All PRUNEs are built first (collected into a Vec) and only then sent.
            for prune in to_prune_topics
                .iter()
                .map(|t| self.make_prune(t, peer_id, do_px, on_unsubscribe))
                .collect::<Vec<_>>()
            {
                self.send_message(*peer_id, RpcOut::Prune(prune));
            }
            tracing::debug!(
                peer=%peer_id,
                "GRAFT: Not subscribed to topics -  Sending PRUNE to peer"
            );
        }
        tracing::debug!(peer=%peer_id, "Completed GRAFT handling for peer");
    }
1536
1537 fn remove_peer_from_mesh(
1538 &mut self,
1539 peer_id: &PeerId,
1540 topic_hash: &TopicHash,
1541 backoff: Option<u64>,
1542 always_update_backoff: bool,
1543 reason: Churn,
1544 ) {
1545 let mut update_backoff = always_update_backoff;
1546 if let Some(peers) = self.mesh.get_mut(topic_hash) {
1547 if peers.remove(peer_id) {
1549 tracing::debug!(
1550 peer=%peer_id,
1551 topic=%topic_hash,
1552 "PRUNE: Removing peer from the mesh for topic"
1553 );
1554 if let Some(m) = self.metrics.as_mut() {
1555 m.peers_removed(topic_hash, reason, 1)
1556 }
1557
1558 if let Some((peer_score, ..)) = &mut self.peer_score {
1559 peer_score.prune(peer_id, topic_hash.clone());
1560 }
1561
1562 update_backoff = true;
1563
1564 peer_removed_from_mesh(
1566 *peer_id,
1567 topic_hash,
1568 &self.mesh,
1569 &mut self.events,
1570 &self.connected_peers,
1571 );
1572 }
1573 }
1574 if update_backoff {
1575 let time = if let Some(backoff) = backoff {
1576 Duration::from_secs(backoff)
1577 } else {
1578 self.config.prune_backoff()
1579 };
1580 self.backoffs.update_backoff(topic_hash, peer_id, time);
1582 }
1583 }
1584
1585 fn handle_prune(
1587 &mut self,
1588 peer_id: &PeerId,
1589 prune_data: Vec<(TopicHash, Vec<PeerInfo>, Option<u64>)>,
1590 ) {
1591 tracing::debug!(peer=%peer_id, "Handling PRUNE message for peer");
1592 let (below_threshold, score) =
1593 self.score_below_threshold(peer_id, |pst| pst.accept_px_threshold);
1594 for (topic_hash, px, backoff) in prune_data {
1595 self.remove_peer_from_mesh(peer_id, &topic_hash, backoff, true, Churn::Prune);
1596
1597 if self.mesh.contains_key(&topic_hash) {
1598 if !px.is_empty() {
1600 if below_threshold {
1602 tracing::debug!(
1603 peer=%peer_id,
1604 %score,
1605 topic=%topic_hash,
1606 "PRUNE: ignoring PX from peer with insufficient score"
1607 );
1608 continue;
1609 }
1610
1611 if self.config.prune_peers() > 0 {
1618 self.px_connect(px);
1619 }
1620 }
1621 }
1622 }
1623 tracing::debug!(peer=%peer_id, "Completed PRUNE handling for peer");
1624 }
1625
1626 fn px_connect(&mut self, mut px: Vec<PeerInfo>) {
1627 let n = self.config.prune_peers();
1628 px.retain(|p| p.peer_id.is_some());
1633 if px.len() > n {
1634 let mut rng = thread_rng();
1636 px.partial_shuffle(&mut rng, n);
1637 px = px.into_iter().take(n).collect();
1638 }
1639
1640 for p in px {
1641 if let Some(peer_id) = p.peer_id {
1644 self.px_peers.insert(peer_id);
1646
1647 self.events.push_back(ToSwarm::Dial {
1649 opts: DialOpts::peer_id(peer_id).build(),
1650 });
1651 }
1652 }
1653 }
1654
1655 fn message_is_valid(
1658 &mut self,
1659 msg_id: &MessageId,
1660 raw_message: &mut RawMessage,
1661 propagation_source: &PeerId,
1662 ) -> bool {
1663 tracing::debug!(
1664 peer=%propagation_source,
1665 message=%msg_id,
1666 "Handling message from peer"
1667 );
1668
1669 if self.blacklisted_peers.contains(propagation_source) {
1671 tracing::debug!(
1672 peer=%propagation_source,
1673 "Rejecting message from blacklisted peer"
1674 );
1675 self.gossip_promises
1676 .reject_message(msg_id, &RejectReason::BlackListedPeer);
1677 if let Some((peer_score, ..)) = &mut self.peer_score {
1678 peer_score.reject_message(
1679 propagation_source,
1680 msg_id,
1681 &raw_message.topic,
1682 RejectReason::BlackListedPeer,
1683 );
1684 }
1685 return false;
1686 }
1687
1688 if let Some(source) = raw_message.source.as_ref() {
1690 if self.blacklisted_peers.contains(source) {
1691 tracing::debug!(
1692 peer=%propagation_source,
1693 %source,
1694 "Rejecting message from peer because of blacklisted source"
1695 );
1696 self.handle_invalid_message(
1697 propagation_source,
1698 &raw_message.topic,
1699 Some(msg_id),
1700 RejectReason::BlackListedSource,
1701 );
1702 return false;
1703 }
1704 }
1705
1706 if !self.config.validate_messages() {
1710 raw_message.validated = true;
1711 }
1712
1713 let self_published = !self.config.allow_self_origin()
1715 && if let Some(own_id) = self.publish_config.get_own_id() {
1716 own_id != propagation_source
1717 && raw_message.source.as_ref().is_some_and(|s| s == own_id)
1718 } else {
1719 self.published_message_ids.contains(msg_id)
1720 };
1721
1722 if self_published {
1723 tracing::debug!(
1724 message=%msg_id,
1725 source=%propagation_source,
1726 "Dropping message claiming to be from self but forwarded from source"
1727 );
1728 self.handle_invalid_message(
1729 propagation_source,
1730 &raw_message.topic,
1731 Some(msg_id),
1732 RejectReason::SelfOrigin,
1733 );
1734 return false;
1735 }
1736
1737 true
1738 }
1739
1740 fn handle_received_message(
1744 &mut self,
1745 mut raw_message: RawMessage,
1746 propagation_source: &PeerId,
1747 ) {
1748 if let Some(metrics) = self.metrics.as_mut() {
1750 metrics.msg_recvd_unfiltered(&raw_message.topic, raw_message.raw_protobuf_len());
1751 }
1752
1753 let message = match self.data_transform.inbound_transform(raw_message.clone()) {
1755 Ok(message) => message,
1756 Err(e) => {
1757 tracing::debug!("Invalid message. Transform error: {:?}", e);
1758 self.handle_invalid_message(
1760 propagation_source,
1761 &raw_message.topic,
1762 None,
1763 RejectReason::ValidationError(ValidationError::TransformFailed),
1764 );
1765 return;
1766 }
1767 };
1768
1769 let msg_id = self.config.message_id(&message);
1771
1772 if raw_message.raw_protobuf_len() > self.config.idontwant_message_size_threshold() {
1774 self.send_idontwant(&raw_message, &msg_id, Some(propagation_source));
1775 }
1776
1777 if !self.message_is_valid(&msg_id, &mut raw_message, propagation_source) {
1781 return;
1782 }
1783
1784 if !self.duplicate_cache.insert(msg_id.clone()) {
1785 tracing::debug!(message=%msg_id, "Message already received, ignoring");
1786 if let Some((peer_score, ..)) = &mut self.peer_score {
1787 peer_score.duplicated_message(propagation_source, &msg_id, &message.topic);
1788 }
1789 self.mcache.observe_duplicate(&msg_id, propagation_source);
1790 return;
1791 }
1792
1793 tracing::debug!(
1794 message=%msg_id,
1795 "Put message in duplicate_cache and resolve promises"
1796 );
1797
1798 if let Some(metrics) = self.metrics.as_mut() {
1800 metrics.msg_recvd(&message.topic);
1801 }
1802
1803 self.gossip_promises.message_delivered(&msg_id);
1806
1807 if let Some((peer_score, ..)) = &mut self.peer_score {
1809 peer_score.validate_message(propagation_source, &msg_id, &message.topic);
1810 }
1811
1812 self.mcache.put(&msg_id, raw_message.clone());
1814
1815 if self.mesh.contains_key(&message.topic) {
1817 tracing::debug!("Sending received message to user");
1818 self.events
1819 .push_back(ToSwarm::GenerateEvent(Event::Message {
1820 propagation_source: *propagation_source,
1821 message_id: msg_id.clone(),
1822 message,
1823 }));
1824 } else {
1825 tracing::debug!(
1826 topic=%message.topic,
1827 "Received message on a topic we are not subscribed to"
1828 );
1829 return;
1830 }
1831
1832 if !self.config.validate_messages() {
1834 self.forward_msg(
1835 &msg_id,
1836 raw_message,
1837 Some(propagation_source),
1838 HashSet::new(),
1839 );
1840 tracing::debug!(message=%msg_id, "Completed message handling for message");
1841 }
1842 }
1843
1844 fn handle_invalid_message(
1846 &mut self,
1847 propagation_source: &PeerId,
1848 topic_hash: &TopicHash,
1849 message_id: Option<&MessageId>,
1850 reject_reason: RejectReason,
1851 ) {
1852 if let Some(metrics) = self.metrics.as_mut() {
1853 metrics.register_invalid_message(topic_hash);
1854 }
1855 if let Some(msg_id) = message_id {
1856 self.gossip_promises.reject_message(msg_id, &reject_reason);
1858 }
1859 if let Some((peer_score, ..)) = &mut self.peer_score {
1860 if let Some(msg_id) = message_id {
1862 peer_score.reject_message(propagation_source, msg_id, topic_hash, reject_reason);
1865 } else {
1866 peer_score.reject_invalid_message(propagation_source, topic_hash);
1870 }
1871 }
1872 }
1873
    /// Handles subscription changes announced by `propagation_source`.
    ///
    /// The announced subscriptions are first passed through the subscription filter; a filter
    /// error discards the whole batch. For each remaining entry the peer's topic set is
    /// updated and a `Subscribed` / `Unsubscribed` event is prepared for the application.
    /// A subscribing peer is additionally grafted into our mesh for the topic when the mesh
    /// holds fewer than `mesh_n_low` peers and the peer is eligible (see below). An
    /// unsubscribing peer is removed from the fanout and mesh of the topic.
    ///
    /// Subscriptions from peers that are not in `connected_peers` are dropped with an error
    /// log.
    fn handle_received_subscriptions(
        &mut self,
        subscriptions: &[Subscription],
        propagation_source: &PeerId,
    ) {
        tracing::debug!(
            source=%propagation_source,
            "Handling subscriptions: {:?}",
            subscriptions,
        );

        // (peer, topic) pairs to remove from fanout and mesh once the loop is done.
        let mut unsubscribed_peers = Vec::new();

        let Some(peer) = self.connected_peers.get_mut(propagation_source) else {
            tracing::error!(
                peer=%propagation_source,
                "Subscription by unknown peer"
            );
            return;
        };

        // Topics for which the peer was added to our mesh and a GRAFT has to be sent.
        let mut topics_to_graft = Vec::new();

        // Application events, queued only after all control traffic has been prepared.
        let mut application_event = Vec::new();

        // Let the subscription filter decide which announcements are processed; it is given
        // the topics currently recorded for the peer.
        let filtered_topics = match self
            .subscription_filter
            .filter_incoming_subscriptions(subscriptions, &peer.topics)
        {
            Ok(topics) => topics,
            Err(s) => {
                tracing::error!(
                    peer=%propagation_source,
                    "Subscription filter error: {}; ignoring RPC from peer",
                    s
                );
                return;
            }
        };

        for subscription in filtered_topics {
            let topic_hash = &subscription.topic_hash;

            match subscription.action {
                SubscriptionAction::Subscribe => {
                    // Record the subscription; metrics only count a newly recorded topic.
                    if peer.topics.insert(topic_hash.clone()) {
                        tracing::debug!(
                            peer=%propagation_source,
                            topic=%topic_hash,
                            "SUBSCRIPTION: Adding gossip peer to topic"
                        );

                        if let Some(m) = self.metrics.as_mut() {
                            m.inc_topic_peers(topic_hash);
                        }
                    }

                    // The peer is eligible for our mesh if it is not an explicit peer, speaks
                    // gossipsub, does not have a score below zero and is not in backoff (with
                    // slack) for the topic.
                    if !self.explicit_peers.contains(propagation_source)
                        && peer.kind.is_gossipsub()
                        && !Self::score_below_threshold_from_scores(
                            &self.peer_score,
                            propagation_source,
                            |_| 0.0,
                        )
                        .0
                        && !self
                            .backoffs
                            .is_backoff_with_slack(topic_hash, propagation_source)
                    {
                        if let Some(peers) = self.mesh.get_mut(topic_hash) {
                            // Only fill the mesh while it is below the lower bound; `insert`
                            // returning `false` means the peer was already a member.
                            if peers.len() < self.config.mesh_n_low()
                                && peers.insert(*propagation_source)
                            {
                                tracing::debug!(
                                    peer=%propagation_source,
                                    topic=%topic_hash,
                                    "SUBSCRIPTION: Adding peer to the mesh for topic"
                                );
                                if let Some(m) = self.metrics.as_mut() {
                                    m.peers_included(topic_hash, Inclusion::Subscribed, 1)
                                }
                                tracing::debug!(
                                    peer=%propagation_source,
                                    topic=%topic_hash,
                                    "Sending GRAFT to peer for topic"
                                );
                                if let Some((peer_score, ..)) = &mut self.peer_score {
                                    peer_score.graft(propagation_source, topic_hash.clone());
                                }
                                // The GRAFT itself is sent after the loop.
                                topics_to_graft.push(topic_hash.clone());
                            }
                        }
                    }
                    // The application is notified of every (filtered) subscribe announcement,
                    // whether or not the topic was newly recorded.
                    application_event.push(ToSwarm::GenerateEvent(Event::Subscribed {
                        peer_id: *propagation_source,
                        topic: topic_hash.clone(),
                    }));
                }
                SubscriptionAction::Unsubscribe => {
                    // Forget the subscription; metrics only count a topic that was recorded.
                    if peer.topics.remove(topic_hash) {
                        tracing::debug!(
                            peer=%propagation_source,
                            topic=%topic_hash,
                            "SUBSCRIPTION: Removing gossip peer from topic"
                        );

                        if let Some(m) = self.metrics.as_mut() {
                            m.dec_topic_peers(topic_hash);
                        }
                    }

                    unsubscribed_peers.push((*propagation_source, topic_hash.clone()));
                    application_event.push(ToSwarm::GenerateEvent(Event::Unsubscribed {
                        peer_id: *propagation_source,
                        topic: topic_hash.clone(),
                    }));
                }
            }
        }

        // Drop unsubscribed peers from fanout and mesh. The backoff is only updated if the
        // peer actually was in the mesh (`always_update_backoff` is `false`).
        for (peer_id, topic_hash) in unsubscribed_peers {
            self.fanout
                .get_mut(&topic_hash)
                .map(|peers| peers.remove(&peer_id));
            self.remove_peer_from_mesh(&peer_id, &topic_hash, None, false, Churn::Unsub);
        }

        // Report all newly joined meshes to the mesh-membership helper in one call.
        let topics_joined = topics_to_graft.iter().collect::<Vec<_>>();
        if !topics_joined.is_empty() {
            peer_added_to_mesh(
                *propagation_source,
                topics_joined,
                &self.mesh,
                &mut self.events,
                &self.connected_peers,
            );
        }

        // Send one GRAFT per topic the peer was added to.
        for topic_hash in topics_to_graft.into_iter() {
            self.send_message(*propagation_source, RpcOut::Graft(Graft { topic_hash }));
        }

        // Finally hand the prepared events to the application.
        for event in application_event {
            self.events.push_back(event);
        }

        tracing::trace!(
            source=%propagation_source,
            "Completed handling subscriptions from source"
        );
    }
2038
2039 fn apply_iwant_penalties(&mut self) {
2041 if let Some((peer_score, ..)) = &mut self.peer_score {
2042 for (peer, count) in self.gossip_promises.get_broken_promises() {
2043 peer_score.add_penalty(&peer, count);
2044 if let Some(metrics) = self.metrics.as_mut() {
2045 metrics.register_score_penalty(Penalty::BrokenPromise);
2046 }
2047 }
2048 }
2049 }
2050
2051 fn heartbeat(&mut self) {
2053 tracing::debug!("Starting heartbeat");
2054 let start = Instant::now();
2055
2056 if let Some(m) = &mut self.metrics {
2060 for sender_queue in self.connected_peers.values().map(|v| &v.sender) {
2061 m.observe_priority_queue_size(sender_queue.priority_queue_len());
2062 m.observe_non_priority_queue_size(sender_queue.non_priority_queue_len());
2063 }
2064 }
2065
2066 self.heartbeat_ticks += 1;
2067
2068 let mut to_graft = HashMap::new();
2069 let mut to_prune = HashMap::new();
2070 let mut no_px = HashSet::new();
2071
2072 self.backoffs.heartbeat();
2074
2075 self.count_sent_iwant.clear();
2077 self.count_received_ihave.clear();
2078
2079 self.apply_iwant_penalties();
2081
2082 if self.heartbeat_ticks % self.config.check_explicit_peers_ticks() == 0 {
2084 for p in self.explicit_peers.clone() {
2085 self.check_explicit_peer_connection(&p);
2086 }
2087 }
2088
2089 let mut scores = HashMap::with_capacity(self.connected_peers.len());
2091 if let Some((peer_score, ..)) = &self.peer_score {
2092 for peer_id in self.connected_peers.keys() {
2093 scores
2094 .entry(peer_id)
2095 .or_insert_with(|| peer_score.metric_score(peer_id, self.metrics.as_mut()));
2096 }
2097 }
2098
2099 for (topic_hash, peers) in self.mesh.iter_mut() {
2101 let explicit_peers = &self.explicit_peers;
2102 let backoffs = &self.backoffs;
2103 let outbound_peers = &self.outbound_peers;
2104
2105 let mut to_remove_peers = Vec::new();
2109 for peer_id in peers.iter() {
2110 let peer_score = *scores.get(peer_id).unwrap_or(&0.0);
2111
2112 if let Some(metrics) = self.metrics.as_mut() {
2114 metrics.observe_mesh_peers_score(topic_hash, peer_score);
2115 }
2116
2117 if peer_score < 0.0 {
2118 tracing::debug!(
2119 peer=%peer_id,
2120 score=%peer_score,
2121 topic=%topic_hash,
2122 "HEARTBEAT: Prune peer with negative score"
2123 );
2124
2125 let current_topic = to_prune.entry(*peer_id).or_insert_with(Vec::new);
2126 current_topic.push(topic_hash.clone());
2127 no_px.insert(*peer_id);
2128 to_remove_peers.push(*peer_id);
2129 }
2130 }
2131
2132 if let Some(m) = self.metrics.as_mut() {
2133 m.peers_removed(topic_hash, Churn::BadScore, to_remove_peers.len())
2134 }
2135
2136 for peer_id in to_remove_peers {
2137 peers.remove(&peer_id);
2138 }
2139
2140 if peers.len() < self.config.mesh_n_low() {
2142 tracing::debug!(
2143 topic=%topic_hash,
2144 "HEARTBEAT: Mesh low. Topic contains: {} needs: {}",
2145 peers.len(),
2146 self.config.mesh_n_low()
2147 );
2148 let desired_peers = self.config.mesh_n() - peers.len();
2150 let peer_list =
2151 get_random_peers(&self.connected_peers, topic_hash, desired_peers, |peer| {
2152 !peers.contains(peer)
2153 && !explicit_peers.contains(peer)
2154 && !backoffs.is_backoff_with_slack(topic_hash, peer)
2155 && *scores.get(peer).unwrap_or(&0.0) >= 0.0
2156 });
2157 for peer in &peer_list {
2158 let current_topic = to_graft.entry(*peer).or_insert_with(Vec::new);
2159 current_topic.push(topic_hash.clone());
2160 }
2161 tracing::debug!("Updating mesh, new mesh: {:?}", peer_list);
2163 if let Some(m) = self.metrics.as_mut() {
2164 m.peers_included(topic_hash, Inclusion::Random, peer_list.len())
2165 }
2166 peers.extend(peer_list);
2167 }
2168
2169 if peers.len() > self.config.mesh_n_high() {
2171 tracing::debug!(
2172 topic=%topic_hash,
2173 "HEARTBEAT: Mesh high. Topic contains: {} needs: {}",
2174 peers.len(),
2175 self.config.mesh_n_high()
2176 );
2177 let excess_peer_no = peers.len() - self.config.mesh_n();
2178
2179 let mut rng = thread_rng();
2181 let mut shuffled = peers.iter().copied().collect::<Vec<_>>();
2182 shuffled.shuffle(&mut rng);
2183 shuffled.sort_by(|p1, p2| {
2184 let score_p1 = *scores.get(p1).unwrap_or(&0.0);
2185 let score_p2 = *scores.get(p2).unwrap_or(&0.0);
2186
2187 score_p1.partial_cmp(&score_p2).unwrap_or(Ordering::Equal)
2188 });
2189 shuffled[..peers.len() - self.config.retain_scores()].shuffle(&mut rng);
2191
2192 let mut outbound = {
2194 let outbound_peers = &self.outbound_peers;
2195 shuffled
2196 .iter()
2197 .filter(|p| outbound_peers.contains(*p))
2198 .count()
2199 };
2200
2201 let mut removed = 0;
2204 for peer in shuffled {
2205 if removed == excess_peer_no {
2206 break;
2207 }
2208 if self.outbound_peers.contains(&peer) {
2209 if outbound <= self.config.mesh_outbound_min() {
2210 continue;
2212 }
2213 outbound -= 1;
2215 }
2216
2217 peers.remove(&peer);
2219 let current_topic = to_prune.entry(peer).or_insert_with(Vec::new);
2220 current_topic.push(topic_hash.clone());
2221 removed += 1;
2222 }
2223
2224 if let Some(m) = self.metrics.as_mut() {
2225 m.peers_removed(topic_hash, Churn::Excess, removed)
2226 }
2227 }
2228
2229 if peers.len() >= self.config.mesh_n_low() {
2231 let outbound = { peers.iter().filter(|p| outbound_peers.contains(*p)).count() };
2233
2234 if outbound < self.config.mesh_outbound_min() {
2236 let needed = self.config.mesh_outbound_min() - outbound;
2237 let peer_list =
2238 get_random_peers(&self.connected_peers, topic_hash, needed, |peer| {
2239 !peers.contains(peer)
2240 && !explicit_peers.contains(peer)
2241 && !backoffs.is_backoff_with_slack(topic_hash, peer)
2242 && *scores.get(peer).unwrap_or(&0.0) >= 0.0
2243 && outbound_peers.contains(peer)
2244 });
2245 for peer in &peer_list {
2246 let current_topic = to_graft.entry(*peer).or_insert_with(Vec::new);
2247 current_topic.push(topic_hash.clone());
2248 }
2249 tracing::debug!("Updating mesh, new mesh: {:?}", peer_list);
2251 if let Some(m) = self.metrics.as_mut() {
2252 m.peers_included(topic_hash, Inclusion::Outbound, peer_list.len())
2253 }
2254 peers.extend(peer_list);
2255 }
2256 }
2257
2258 if self.heartbeat_ticks % self.config.opportunistic_graft_ticks() == 0
2260 && peers.len() > 1
2261 && self.peer_score.is_some()
2262 {
2263 if let Some((_, thresholds, _)) = &self.peer_score {
2264 let mut peers_by_score: Vec<_> = peers.iter().collect();
2274 peers_by_score.sort_by(|p1, p2| {
2275 let p1_score = *scores.get(p1).unwrap_or(&0.0);
2276 let p2_score = *scores.get(p2).unwrap_or(&0.0);
2277 p1_score.partial_cmp(&p2_score).unwrap_or(Equal)
2278 });
2279
2280 let middle = peers_by_score.len() / 2;
2281 let median = if peers_by_score.len() % 2 == 0 {
2282 let sub_middle_peer = *peers_by_score
2283 .get(middle - 1)
2284 .expect("middle < vector length and middle > 0 since peers.len() > 0");
2285 let sub_middle_score = *scores.get(sub_middle_peer).unwrap_or(&0.0);
2286 let middle_peer =
2287 *peers_by_score.get(middle).expect("middle < vector length");
2288 let middle_score = *scores.get(middle_peer).unwrap_or(&0.0);
2289
2290 (sub_middle_score + middle_score) * 0.5
2291 } else {
2292 *scores
2293 .get(*peers_by_score.get(middle).expect("middle < vector length"))
2294 .unwrap_or(&0.0)
2295 };
2296
2297 if median < thresholds.opportunistic_graft_threshold {
2300 let peer_list = get_random_peers(
2301 &self.connected_peers,
2302 topic_hash,
2303 self.config.opportunistic_graft_peers(),
2304 |peer_id| {
2305 !peers.contains(peer_id)
2306 && !explicit_peers.contains(peer_id)
2307 && !backoffs.is_backoff_with_slack(topic_hash, peer_id)
2308 && *scores.get(peer_id).unwrap_or(&0.0) > median
2309 },
2310 );
2311 for peer in &peer_list {
2312 let current_topic = to_graft.entry(*peer).or_insert_with(Vec::new);
2313 current_topic.push(topic_hash.clone());
2314 }
2315 tracing::debug!(
2317 topic=%topic_hash,
2318 "Opportunistically graft in topic with peers {:?}",
2319 peer_list
2320 );
2321 if let Some(m) = self.metrics.as_mut() {
2322 m.peers_included(topic_hash, Inclusion::Random, peer_list.len())
2323 }
2324 peers.extend(peer_list);
2325 }
2326 }
2327 }
2328 if let Some(m) = self.metrics.as_mut() {
2330 m.set_mesh_peers(topic_hash, peers.len())
2331 }
2332 }
2333
2334 {
2336 let fanout = &mut self.fanout; let fanout_ttl = self.config.fanout_ttl();
2338 self.fanout_last_pub.retain(|topic_hash, last_pub_time| {
2339 if *last_pub_time + fanout_ttl < Instant::now() {
2340 tracing::debug!(
2341 topic=%topic_hash,
2342 "HEARTBEAT: Fanout topic removed due to timeout"
2343 );
2344 fanout.remove(topic_hash);
2345 return false;
2346 }
2347 true
2348 });
2349 }
2350
2351 for (topic_hash, peers) in self.fanout.iter_mut() {
2354 let mut to_remove_peers = Vec::new();
2355 let publish_threshold = match &self.peer_score {
2356 Some((_, thresholds, _)) => thresholds.publish_threshold,
2357 _ => 0.0,
2358 };
2359 for peer_id in peers.iter() {
2360 let peer_score = *scores.get(peer_id).unwrap_or(&0.0);
2362 match self.connected_peers.get(peer_id) {
2363 Some(peer) => {
2364 if !peer.topics.contains(topic_hash) || peer_score < publish_threshold {
2365 tracing::debug!(
2366 topic=%topic_hash,
2367 "HEARTBEAT: Peer removed from fanout for topic"
2368 );
2369 to_remove_peers.push(*peer_id);
2370 }
2371 }
2372 None => {
2373 to_remove_peers.push(*peer_id);
2375 }
2376 }
2377 }
2378 for to_remove in to_remove_peers {
2379 peers.remove(&to_remove);
2380 }
2381
2382 if peers.len() < self.config.mesh_n() {
2384 tracing::debug!(
2385 "HEARTBEAT: Fanout low. Contains: {:?} needs: {:?}",
2386 peers.len(),
2387 self.config.mesh_n()
2388 );
2389 let needed_peers = self.config.mesh_n() - peers.len();
2390 let explicit_peers = &self.explicit_peers;
2391 let new_peers =
2392 get_random_peers(&self.connected_peers, topic_hash, needed_peers, |peer_id| {
2393 !peers.contains(peer_id)
2394 && !explicit_peers.contains(peer_id)
2395 && *scores.get(peer_id).unwrap_or(&0.0) < publish_threshold
2396 });
2397 peers.extend(new_peers);
2398 }
2399 }
2400
2401 if self.peer_score.is_some() {
2402 tracing::trace!("Mesh message deliveries: {:?}", {
2403 self.mesh
2404 .iter()
2405 .map(|(t, peers)| {
2406 (
2407 t.clone(),
2408 peers
2409 .iter()
2410 .map(|p| {
2411 (
2412 *p,
2413 self.peer_score
2414 .as_ref()
2415 .expect("peer_score.is_some()")
2416 .0
2417 .mesh_message_deliveries(p, t)
2418 .unwrap_or(0.0),
2419 )
2420 })
2421 .collect::<HashMap<PeerId, f64>>(),
2422 )
2423 })
2424 .collect::<HashMap<TopicHash, HashMap<PeerId, f64>>>()
2425 })
2426 }
2427
2428 self.emit_gossip();
2429
2430 if !to_graft.is_empty() | !to_prune.is_empty() {
2432 self.send_graft_prune(to_graft, to_prune, no_px);
2433 }
2434
2435 self.mcache.shift();
2437
2438 for (peer_id, failed_messages) in self.failed_messages.drain() {
2440 tracing::debug!("Peer couldn't consume messages: {:?}", failed_messages);
2441 self.events
2442 .push_back(ToSwarm::GenerateEvent(Event::SlowPeer {
2443 peer_id,
2444 failed_messages,
2445 }));
2446 }
2447 self.failed_messages.shrink_to_fit();
2448
2449 for peer in self.connected_peers.values_mut() {
2451 while let Some((_front, instant)) = peer.dont_send.front() {
2452 if (*instant + IDONTWANT_TIMEOUT) >= Instant::now() {
2453 break;
2454 } else {
2455 peer.dont_send.pop_front();
2456 }
2457 }
2458 }
2459
2460 tracing::debug!("Completed Heartbeat");
2461 if let Some(metrics) = self.metrics.as_mut() {
2462 let duration = u64::try_from(start.elapsed().as_millis()).unwrap_or(u64::MAX);
2463 metrics.observe_heartbeat_duration(duration);
2464 }
2465 }
2466
2467 fn emit_gossip(&mut self) {
2470 let mut rng = thread_rng();
2471 let mut messages = Vec::new();
2472 for (topic_hash, peers) in self.mesh.iter().chain(self.fanout.iter()) {
2473 let mut message_ids = self.mcache.get_gossip_message_ids(topic_hash);
2474 if message_ids.is_empty() {
2475 continue;
2476 }
2477
2478 if message_ids.len() > self.config.max_ihave_length() {
2480 tracing::debug!(
2482 "too many messages for gossip; will truncate IHAVE list ({} messages)",
2483 message_ids.len()
2484 );
2485 } else {
2486 message_ids.shuffle(&mut rng);
2488 }
2489
2490 let n_map = |m| {
2492 max(
2493 self.config.gossip_lazy(),
2494 (self.config.gossip_factor() * m as f64) as usize,
2495 )
2496 };
2497 let to_msg_peers =
2499 get_random_peers_dynamic(&self.connected_peers, topic_hash, n_map, |peer| {
2500 !peers.contains(peer)
2501 && !self.explicit_peers.contains(peer)
2502 && !self.score_below_threshold(peer, |ts| ts.gossip_threshold).0
2503 });
2504
2505 tracing::debug!("Gossiping IHAVE to {} peers", to_msg_peers.len());
2506
2507 for peer_id in to_msg_peers {
2508 let mut peer_message_ids = message_ids.clone();
2509
2510 if peer_message_ids.len() > self.config.max_ihave_length() {
2511 peer_message_ids.partial_shuffle(&mut rng, self.config.max_ihave_length());
2515 peer_message_ids.truncate(self.config.max_ihave_length());
2516 }
2517
2518 messages.push((
2520 peer_id,
2521 RpcOut::IHave(IHave {
2522 topic_hash: topic_hash.clone(),
2523 message_ids: peer_message_ids,
2524 }),
2525 ));
2526 }
2527 }
2528 for (peer_id, message) in messages {
2529 self.send_message(peer_id, message);
2530 }
2531 }
2532
2533 fn send_graft_prune(
2536 &mut self,
2537 to_graft: HashMap<PeerId, Vec<TopicHash>>,
2538 mut to_prune: HashMap<PeerId, Vec<TopicHash>>,
2539 no_px: HashSet<PeerId>,
2540 ) {
2541 for (peer_id, topics) in to_graft.into_iter() {
2543 for topic in &topics {
2544 if let Some((peer_score, ..)) = &mut self.peer_score {
2546 peer_score.graft(&peer_id, topic.clone());
2547 }
2548
2549 peer_added_to_mesh(
2552 peer_id,
2553 vec![topic],
2554 &self.mesh,
2555 &mut self.events,
2556 &self.connected_peers,
2557 );
2558 }
2559 let rpc_msgs = topics.iter().map(|topic_hash| {
2560 RpcOut::Graft(Graft {
2561 topic_hash: topic_hash.clone(),
2562 })
2563 });
2564
2565 let prune_msgs = to_prune
2572 .remove(&peer_id)
2573 .into_iter()
2574 .flatten()
2575 .map(|topic_hash| {
2576 let prune = self.make_prune(
2577 &topic_hash,
2578 &peer_id,
2579 self.config.do_px() && !no_px.contains(&peer_id),
2580 false,
2581 );
2582 RpcOut::Prune(prune)
2583 });
2584
2585 for msg in rpc_msgs.chain(prune_msgs).collect::<Vec<_>>() {
2587 self.send_message(peer_id, msg);
2588 }
2589 }
2590
2591 for (peer_id, topics) in to_prune.iter() {
2594 for topic_hash in topics {
2595 let prune = self.make_prune(
2596 topic_hash,
2597 peer_id,
2598 self.config.do_px() && !no_px.contains(peer_id),
2599 false,
2600 );
2601 self.send_message(*peer_id, RpcOut::Prune(prune));
2602
2603 peer_removed_from_mesh(
2605 *peer_id,
2606 topic_hash,
2607 &self.mesh,
2608 &mut self.events,
2609 &self.connected_peers,
2610 );
2611 }
2612 }
2613 }
2614
2615 fn send_idontwant(
2617 &mut self,
2618 message: &RawMessage,
2619 msg_id: &MessageId,
2620 propagation_source: Option<&PeerId>,
2621 ) {
2622 let Some(mesh_peers) = self.mesh.get(&message.topic) else {
2623 return;
2624 };
2625
2626 let iwant_peers = self.gossip_promises.peers_for_message(msg_id);
2627
2628 let recipient_peers: Vec<PeerId> = mesh_peers
2629 .iter()
2630 .chain(iwant_peers.iter())
2631 .filter(|&peer_id| {
2632 Some(peer_id) != propagation_source && Some(peer_id) != message.source.as_ref()
2633 })
2634 .cloned()
2635 .collect();
2636
2637 for peer_id in recipient_peers {
2638 let Some(peer) = self.connected_peers.get_mut(&peer_id) else {
2639 tracing::error!(peer = %peer_id,
2640 "Could not IDONTWANT, peer doesn't exist in connected peer list");
2641 continue;
2642 };
2643
2644 if peer.kind != PeerKind::Gossipsubv1_2 {
2646 continue;
2647 }
2648
2649 self.send_message(
2650 peer_id,
2651 RpcOut::IDontWant(IDontWant {
2652 message_ids: vec![msg_id.clone()],
2653 }),
2654 );
2655 }
2656 }
2657
2658 fn forward_msg(
2662 &mut self,
2663 msg_id: &MessageId,
2664 message: RawMessage,
2665 propagation_source: Option<&PeerId>,
2666 originating_peers: HashSet<PeerId>,
2667 ) -> bool {
2668 if let Some((peer_score, ..)) = &mut self.peer_score {
2670 if let Some(peer) = propagation_source {
2671 peer_score.deliver_message(peer, msg_id, &message.topic);
2672 }
2673 }
2674
2675 tracing::debug!(message=%msg_id, "Forwarding message");
2676 let mut recipient_peers = HashSet::new();
2677
2678 for (peer_id, peer) in &self.connected_peers {
2682 if Some(peer_id) != propagation_source
2683 && !originating_peers.contains(peer_id)
2684 && Some(peer_id) != message.source.as_ref()
2685 && peer.topics.contains(&message.topic)
2686 && (self.explicit_peers.contains(peer_id)
2687 || (peer.kind == PeerKind::Floodsub
2688 && !self
2689 .score_below_threshold(peer_id, |ts| ts.publish_threshold)
2690 .0))
2691 {
2692 recipient_peers.insert(*peer_id);
2693 }
2694 }
2695
2696 let topic = &message.topic;
2698 if let Some(mesh_peers) = self.mesh.get(topic) {
2700 for peer_id in mesh_peers {
2701 if Some(peer_id) != propagation_source
2702 && !originating_peers.contains(peer_id)
2703 && Some(peer_id) != message.source.as_ref()
2704 {
2705 recipient_peers.insert(*peer_id);
2706 }
2707 }
2708 }
2709
2710 if recipient_peers.is_empty() {
2711 return false;
2712 }
2713
2714 for peer_id in recipient_peers.iter() {
2716 if let Some(peer) = self.connected_peers.get_mut(peer_id) {
2717 if peer.dont_send.contains_key(msg_id) {
2718 tracing::debug!(%peer_id, message=%msg_id, "Peer doesn't want message");
2719 continue;
2720 }
2721
2722 tracing::debug!(%peer_id, message=%msg_id, "Sending message to peer");
2723
2724 self.send_message(
2725 *peer_id,
2726 RpcOut::Forward {
2727 message: message.clone(),
2728 timeout: Delay::new(self.config.forward_queue_duration()),
2729 },
2730 );
2731 }
2732 }
2733 tracing::debug!("Completed forwarding message");
2734 true
2735 }
2736
2737 pub(crate) fn build_raw_message(
2739 &mut self,
2740 topic: TopicHash,
2741 data: Vec<u8>,
2742 ) -> Result<RawMessage, PublishError> {
2743 match &mut self.publish_config {
2744 PublishConfig::Signing {
2745 ref keypair,
2746 author,
2747 inline_key,
2748 last_seq_no,
2749 } => {
2750 let sequence_number = last_seq_no.next();
2751
2752 let signature = {
2753 let message = proto::Message {
2754 from: Some(author.to_bytes()),
2755 data: Some(data.clone()),
2756 seqno: Some(sequence_number.to_be_bytes().to_vec()),
2757 topic: topic.clone().into_string(),
2758 signature: None,
2759 key: None,
2760 };
2761
2762 let mut buf = Vec::with_capacity(message.get_size());
2763 let mut writer = Writer::new(&mut buf);
2764
2765 message
2766 .write_message(&mut writer)
2767 .expect("Encoding to succeed");
2768
2769 let mut signature_bytes = SIGNING_PREFIX.to_vec();
2771 signature_bytes.extend_from_slice(&buf);
2772 Some(keypair.sign(&signature_bytes)?)
2773 };
2774
2775 Ok(RawMessage {
2776 source: Some(*author),
2777 data,
2778 sequence_number: Some(sequence_number),
2781 topic,
2782 signature,
2783 key: inline_key.clone(),
2784 validated: true, })
2786 }
2787 PublishConfig::Author(peer_id) => {
2788 Ok(RawMessage {
2789 source: Some(*peer_id),
2790 data,
2791 sequence_number: Some(rand::random()),
2794 topic,
2795 signature: None,
2796 key: None,
2797 validated: true, })
2799 }
2800 PublishConfig::RandomAuthor => {
2801 Ok(RawMessage {
2802 source: Some(PeerId::random()),
2803 data,
2804 sequence_number: Some(rand::random()),
2807 topic,
2808 signature: None,
2809 key: None,
2810 validated: true, })
2812 }
2813 PublishConfig::Anonymous => {
2814 Ok(RawMessage {
2815 source: None,
2816 data,
2817 sequence_number: None,
2820 topic,
2821 signature: None,
2822 key: None,
2823 validated: true, })
2825 }
2826 }
2827 }
2828
2829 fn send_message(&mut self, peer_id: PeerId, rpc: RpcOut) -> bool {
2836 if let Some(m) = self.metrics.as_mut() {
2837 if let RpcOut::Publish { ref message, .. } | RpcOut::Forward { ref message, .. } = rpc {
2838 m.msg_sent(&message.topic, message.raw_protobuf_len());
2840 }
2841 }
2842
2843 let Some(peer) = &mut self.connected_peers.get_mut(&peer_id) else {
2844 tracing::error!(peer = %peer_id,
2845 "Could not send rpc to connection handler, peer doesn't exist in connected peer list");
2846 return false;
2847 };
2848
2849 match peer.sender.send_message(rpc) {
2851 Ok(()) => true,
2852 Err(rpc) => {
2853 tracing::warn!(peer=%peer_id, "Send Queue full. Could not send {:?}.", rpc);
2855
2856 let failed_messages = self.failed_messages.entry(peer_id).or_default();
2858 match rpc {
2859 RpcOut::Publish { .. } => {
2860 failed_messages.priority += 1;
2861 failed_messages.publish += 1;
2862 }
2863 RpcOut::Forward { .. } => {
2864 failed_messages.non_priority += 1;
2865 failed_messages.forward += 1;
2866 }
2867 RpcOut::IWant(_) | RpcOut::IHave(_) | RpcOut::IDontWant(_) => {
2868 failed_messages.non_priority += 1;
2869 }
2870 RpcOut::Graft(_)
2871 | RpcOut::Prune(_)
2872 | RpcOut::Subscribe(_)
2873 | RpcOut::Unsubscribe(_) => {
2874 unreachable!("Channel for highpriority control messages is unbounded and should always be open.")
2875 }
2876 }
2877
2878 if let Some((peer_score, ..)) = &mut self.peer_score {
2880 peer_score.failed_message_slow_peer(&peer_id);
2881 }
2882
2883 false
2884 }
2885 }
2886 }
2887
2888 fn on_connection_established(
2889 &mut self,
2890 ConnectionEstablished {
2891 peer_id,
2892 endpoint,
2893 other_established,
2894 ..
2895 }: ConnectionEstablished,
2896 ) {
2897 if endpoint.is_dialer() && other_established == 0 && !self.px_peers.contains(&peer_id) {
2901 self.outbound_peers.insert(peer_id);
2904 }
2905
2906 if let Some((peer_score, ..)) = &mut self.peer_score {
2908 if let Some(ip) = get_ip_addr(endpoint.get_remote_address()) {
2909 peer_score.add_ip(&peer_id, ip);
2910 } else {
2911 tracing::trace!(
2912 peer=%peer_id,
2913 "Couldn't extract ip from endpoint of peer with endpoint {:?}",
2914 endpoint
2915 )
2916 }
2917 }
2918
2919 if other_established > 0 {
2920 return; }
2922
2923 if let Some((peer_score, ..)) = &mut self.peer_score {
2924 peer_score.add_peer(peer_id);
2925 }
2926
2927 if self.blacklisted_peers.contains(&peer_id) {
2929 tracing::debug!(peer=%peer_id, "Ignoring connection from blacklisted peer");
2930 return;
2931 }
2932
2933 tracing::debug!(peer=%peer_id, "New peer connected");
2934 for topic_hash in self.mesh.clone().into_keys() {
2936 self.send_message(peer_id, RpcOut::Subscribe(topic_hash));
2937 }
2938 }
2939
2940 fn on_connection_closed(
2941 &mut self,
2942 ConnectionClosed {
2943 peer_id,
2944 connection_id,
2945 endpoint,
2946 remaining_established,
2947 ..
2948 }: ConnectionClosed,
2949 ) {
2950 if let Some((peer_score, ..)) = &mut self.peer_score {
2952 if let Some(ip) = get_ip_addr(endpoint.get_remote_address()) {
2953 peer_score.remove_ip(&peer_id, &ip);
2954 } else {
2955 tracing::trace!(
2956 peer=%peer_id,
2957 "Couldn't extract ip from endpoint of peer with endpoint {:?}",
2958 endpoint
2959 )
2960 }
2961 }
2962
2963 if remaining_established != 0 {
2964 if let Some(peer) = self.connected_peers.get_mut(&peer_id) {
2966 let index = peer
2967 .connections
2968 .iter()
2969 .position(|v| v == &connection_id)
2970 .expect("Previously established connection to peer must be present");
2971 peer.connections.remove(index);
2972
2973 if !peer.connections.is_empty() {
2976 for topic in &peer.topics {
2977 if let Some(mesh_peers) = self.mesh.get(topic) {
2978 if mesh_peers.contains(&peer_id) {
2979 self.events.push_back(ToSwarm::NotifyHandler {
2980 peer_id,
2981 event: HandlerIn::JoinedMesh,
2982 handler: NotifyHandler::One(peer.connections[0]),
2983 });
2984 break;
2985 }
2986 }
2987 }
2988 }
2989 }
2990 } else {
2991 tracing::debug!(peer=%peer_id, "Peer disconnected");
2993 let Some(connected_peer) = self.connected_peers.get(&peer_id) else {
2994 tracing::error!(peer_id = %peer_id, "Peer non-existent when handling disconnection");
2995 return;
2996 };
2997
2998 for topic in &connected_peer.topics {
3000 if let Some(mesh_peers) = self.mesh.get_mut(topic) {
3002 if mesh_peers.remove(&peer_id) {
3004 if let Some(m) = self.metrics.as_mut() {
3005 m.peers_removed(topic, Churn::Dc, 1);
3006 m.set_mesh_peers(topic, mesh_peers.len());
3007 }
3008 };
3009 }
3010
3011 if let Some(m) = self.metrics.as_mut() {
3012 m.dec_topic_peers(topic);
3013 }
3014
3015 self.fanout
3017 .get_mut(topic)
3018 .map(|peers| peers.remove(&peer_id));
3019 }
3020
3021 self.px_peers.remove(&peer_id);
3023 self.outbound_peers.remove(&peer_id);
3024
3025 if let Some(metrics) = self.metrics.as_mut() {
3027 metrics.peer_protocol_disconnected(connected_peer.kind);
3028 }
3029
3030 self.connected_peers.remove(&peer_id);
3031
3032 if let Some((peer_score, ..)) = &mut self.peer_score {
3033 peer_score.remove_peer(&peer_id);
3034 }
3035 }
3036 }
3037
3038 fn on_address_change(
3039 &mut self,
3040 AddressChange {
3041 peer_id,
3042 old: endpoint_old,
3043 new: endpoint_new,
3044 ..
3045 }: AddressChange,
3046 ) {
3047 if let Some((peer_score, ..)) = &mut self.peer_score {
3049 if let Some(ip) = get_ip_addr(endpoint_old.get_remote_address()) {
3050 peer_score.remove_ip(&peer_id, &ip);
3051 } else {
3052 tracing::trace!(
3053 peer=%&peer_id,
3054 "Couldn't extract ip from endpoint of peer with endpoint {:?}",
3055 endpoint_old
3056 )
3057 }
3058 if let Some(ip) = get_ip_addr(endpoint_new.get_remote_address()) {
3059 peer_score.add_ip(&peer_id, ip);
3060 } else {
3061 tracing::trace!(
3062 peer=%peer_id,
3063 "Couldn't extract ip from endpoint of peer with endpoint {:?}",
3064 endpoint_new
3065 )
3066 }
3067 }
3068 }
3069
3070 pub fn register_topics_for_metrics(&mut self, topics: Vec<TopicHash>) {
3072 if let Some(metrics) = &mut self.metrics {
3073 metrics.register_allowed_topics(topics);
3074 }
3075 }
3076}
3077
3078fn get_ip_addr(addr: &Multiaddr) -> Option<IpAddr> {
3079 addr.iter().find_map(|p| match p {
3080 Ip4(addr) => Some(IpAddr::V4(addr)),
3081 Ip6(addr) => Some(IpAddr::V6(addr)),
3082 _ => None,
3083 })
3084}
3085
3086impl<C, F> NetworkBehaviour for Behaviour<C, F>
3087where
3088 C: Send + 'static + DataTransform,
3089 F: Send + 'static + TopicSubscriptionFilter,
3090{
3091 type ConnectionHandler = Handler;
3092 type ToSwarm = Event;
3093
3094 fn handle_established_inbound_connection(
3095 &mut self,
3096 connection_id: ConnectionId,
3097 peer_id: PeerId,
3098 _: &Multiaddr,
3099 _: &Multiaddr,
3100 ) -> Result<THandler<Self>, ConnectionDenied> {
3101 let connected_peer = self
3107 .connected_peers
3108 .entry(peer_id)
3109 .or_insert(PeerConnections {
3110 kind: PeerKind::Floodsub,
3111 connections: vec![],
3112 sender: Sender::new(self.config.connection_handler_queue_len()),
3113 topics: Default::default(),
3114 dont_send: LinkedHashMap::new(),
3115 });
3116 connected_peer.connections.push(connection_id);
3118
3119 Ok(Handler::new(
3120 self.config.protocol_config(),
3121 connected_peer.sender.new_receiver(),
3122 ))
3123 }
3124
3125 fn handle_established_outbound_connection(
3126 &mut self,
3127 connection_id: ConnectionId,
3128 peer_id: PeerId,
3129 _: &Multiaddr,
3130 _: Endpoint,
3131 _: PortUse,
3132 ) -> Result<THandler<Self>, ConnectionDenied> {
3133 let connected_peer = self
3134 .connected_peers
3135 .entry(peer_id)
3136 .or_insert(PeerConnections {
3137 kind: PeerKind::Floodsub,
3138 connections: vec![],
3139 sender: Sender::new(self.config.connection_handler_queue_len()),
3140 topics: Default::default(),
3141 dont_send: LinkedHashMap::new(),
3142 });
3143 connected_peer.connections.push(connection_id);
3145
3146 Ok(Handler::new(
3147 self.config.protocol_config(),
3148 connected_peer.sender.new_receiver(),
3149 ))
3150 }
3151
/// Processes an event reported by the connection handler of `propagation_source`.
///
/// * `PeerKind`: records the protocol kind of the peer, or emits
///   `Event::GossipsubNotSupported` if the peer supports none of the protocols.
/// * `MessageDropped`: penalises the peer as a slow peer and updates the failed-message
///   counters and the drop metrics.
/// * `Message`: handles the subscriptions, invalid messages, messages and control messages of
///   a received RPC, in that order.
fn on_connection_handler_event(
    &mut self,
    propagation_source: PeerId,
    _connection_id: ConnectionId,
    handler_event: THandlerOutEvent<Self>,
) {
    match handler_event {
        HandlerEvent::PeerKind(kind) => {
            // The kind is counted in the metrics for every `PeerKind` event received.
            if let Some(metrics) = self.metrics.as_mut() {
                metrics.peer_protocol_connected(kind);
            }

            if let PeerKind::NotSupported = kind {
                tracing::debug!(
                    peer=%propagation_source,
                    "Peer does not support gossipsub protocols"
                );
                self.events
                    .push_back(ToSwarm::GenerateEvent(Event::GossipsubNotSupported {
                        peer_id: propagation_source,
                    }));
            } else if let Some(conn) = self.connected_peers.get_mut(&propagation_source) {
                tracing::debug!(
                    peer=%propagation_source,
                    peer_type=%kind,
                    "New peer type found for peer"
                );
                // Only the initial `Floodsub` placeholder is replaced; a kind that was
                // already recorded is kept.
                if let PeerKind::Floodsub = conn.kind {
                    conn.kind = kind;
                }
            }
        }
        HandlerEvent::MessageDropped(rpc) => {
            // The handler dropped a queued message: treat the peer as slow.
            if let Some((peer_score, _, _)) = &mut self.peer_score {
                peer_score.failed_message_slow_peer(&propagation_source);
            }

            // Every dropped message counts as a timeout; publish and forward drops are
            // additionally counted per kind.
            let failed_messages = self.failed_messages.entry(propagation_source).or_default();
            failed_messages.timeout += 1;
            match rpc {
                RpcOut::Publish { .. } => {
                    failed_messages.publish += 1;
                }
                RpcOut::Forward { .. } => {
                    failed_messages.forward += 1;
                }
                _ => {}
            }

            // Mirror the drop in the per-topic metrics.
            if let Some(metrics) = self.metrics.as_mut() {
                match rpc {
                    RpcOut::Publish { message, .. } => {
                        metrics.publish_msg_dropped(&message.topic);
                        metrics.timeout_msg_dropped(&message.topic);
                    }
                    RpcOut::Forward { message, .. } => {
                        metrics.forward_msg_dropped(&message.topic);
                        metrics.timeout_msg_dropped(&message.topic);
                    }
                    _ => {}
                }
            }
        }
        HandlerEvent::Message {
            rpc,
            invalid_messages,
        } => {
            // Subscriptions are handled before the graylist check below, i.e. also for
            // peers whose remaining RPC content is dropped.
            if !rpc.subscriptions.is_empty() {
                self.handle_received_subscriptions(&rpc.subscriptions, &propagation_source);
            }

            // Ignore the rest of the RPC if the peer's score is below the graylist threshold.
            if let (true, _) =
                self.score_below_threshold(&propagation_source, |pst| pst.graylist_threshold)
            {
                tracing::debug!(peer=%propagation_source, "RPC Dropped from greylisted peer");
                return;
            }

            // Invalid messages are passed to `handle_invalid_message` when peer scoring is
            // enabled and are only logged otherwise.
            if self.peer_score.is_some() {
                for (raw_message, validation_error) in invalid_messages {
                    self.handle_invalid_message(
                        &propagation_source,
                        &raw_message.topic,
                        None,
                        RejectReason::ValidationError(validation_error),
                    )
                }
            } else {
                for (message, validation_error) in invalid_messages {
                    tracing::warn!(
                        peer=%propagation_source,
                        source=?message.source,
                        "Invalid message from peer. Reason: {:?}",
                        validation_error,
                    );
                }
            }

            // Handle the messages, at most `max_messages_per_rpc` of them if a limit is
            // configured.
            for (count, raw_message) in rpc.messages.into_iter().enumerate() {
                if self
                    .config
                    .max_messages_per_rpc()
                    .is_some_and(|max_msg| count >= max_msg)
                {
                    tracing::warn!("Received more messages than permitted. Ignoring further messages. Processed: {}", count);
                    break;
                }
                self.handle_received_message(raw_message, &propagation_source);
            }

            // IHAVE, GRAFT and PRUNE messages are collected and handled as batches after the
            // loop; IWANT and IDONTWANT are handled as they are encountered. The same
            // `max_messages_per_rpc` limit applies to the control messages.
            let mut ihave_msgs = vec![];
            let mut graft_msgs = vec![];
            let mut prune_msgs = vec![];
            for (count, control_msg) in rpc.control_msgs.into_iter().enumerate() {
                if self
                    .config
                    .max_messages_per_rpc()
                    .is_some_and(|max_msg| count >= max_msg)
                {
                    tracing::warn!("Received more control messages than permitted. Ignoring further messages. Processed: {}", count);
                    break;
                }

                match control_msg {
                    ControlAction::IHave(IHave {
                        topic_hash,
                        message_ids,
                    }) => {
                        ihave_msgs.push((topic_hash, message_ids));
                    }
                    ControlAction::IWant(IWant { message_ids }) => {
                        self.handle_iwant(&propagation_source, message_ids)
                    }
                    ControlAction::Graft(Graft { topic_hash }) => graft_msgs.push(topic_hash),
                    ControlAction::Prune(Prune {
                        topic_hash,
                        peers,
                        backoff,
                    }) => prune_msgs.push((topic_hash, peers, backoff)),
                    ControlAction::IDontWant(IDontWant { message_ids }) => {
                        // Skip this control message if the peer is unknown.
                        let Some(peer) = self.connected_peers.get_mut(&propagation_source)
                        else {
                            tracing::error!(peer = %propagation_source,
                                "Could not handle IDONTWANT, peer doesn't exist in connected peer list");
                            continue;
                        };
                        if let Some(metrics) = self.metrics.as_mut() {
                            metrics.register_idontwant(message_ids.len());
                        }
                        // Remember each id together with the time it was received and keep
                        // the map at `IDONTWANT_CAP` entries by dropping the front entry.
                        for message_id in message_ids {
                            peer.dont_send.insert(message_id, Instant::now());
                            if peer.dont_send.len() > IDONTWANT_CAP {
                                peer.dont_send.pop_front();
                            }
                        }
                    }
                }
            }
            if !ihave_msgs.is_empty() {
                self.handle_ihave(&propagation_source, ihave_msgs);
            }
            if !graft_msgs.is_empty() {
                self.handle_graft(&propagation_source, graft_msgs);
            }
            if !prune_msgs.is_empty() {
                self.handle_prune(&propagation_source, prune_msgs);
            }
        }
    }
}
3344
3345 #[tracing::instrument(level = "trace", name = "NetworkBehaviour::poll", skip(self, cx))]
3346 fn poll(
3347 &mut self,
3348 cx: &mut Context<'_>,
3349 ) -> Poll<ToSwarm<Self::ToSwarm, THandlerInEvent<Self>>> {
3350 if let Some(event) = self.events.pop_front() {
3351 return Poll::Ready(event);
3352 }
3353
3354 if let Some((peer_score, _, delay)) = &mut self.peer_score {
3356 if delay.poll_unpin(cx).is_ready() {
3357 peer_score.refresh_scores();
3358 delay.reset(peer_score.params.decay_interval);
3359 }
3360 }
3361
3362 if self.heartbeat.poll_unpin(cx).is_ready() {
3363 self.heartbeat();
3364 self.heartbeat.reset(self.config.heartbeat_interval());
3365 }
3366
3367 Poll::Pending
3368 }
3369
3370 fn on_swarm_event(&mut self, event: FromSwarm) {
3371 match event {
3372 FromSwarm::ConnectionEstablished(connection_established) => {
3373 self.on_connection_established(connection_established)
3374 }
3375 FromSwarm::ConnectionClosed(connection_closed) => {
3376 self.on_connection_closed(connection_closed)
3377 }
3378 FromSwarm::AddressChange(address_change) => self.on_address_change(address_change),
3379 _ => {}
3380 }
3381 }
3382}
3383
3384fn peer_added_to_mesh(
3388 peer_id: PeerId,
3389 new_topics: Vec<&TopicHash>,
3390 mesh: &HashMap<TopicHash, BTreeSet<PeerId>>,
3391 events: &mut VecDeque<ToSwarm<Event, HandlerIn>>,
3392 connections: &HashMap<PeerId, PeerConnections>,
3393) {
3394 let connection_id = match connections.get(&peer_id) {
3396 Some(p) => p
3397 .connections
3398 .first()
3399 .expect("There should be at least one connection to a peer."),
3400 None => {
3401 tracing::error!(peer_id=%peer_id, "Peer not existent when added to the mesh");
3402 return;
3403 }
3404 };
3405
3406 if let Some(peer) = connections.get(&peer_id) {
3407 for topic in &peer.topics {
3408 if !new_topics.contains(&topic) {
3409 if let Some(mesh_peers) = mesh.get(topic) {
3410 if mesh_peers.contains(&peer_id) {
3411 return;
3413 }
3414 }
3415 }
3416 }
3417 }
3418 events.push_back(ToSwarm::NotifyHandler {
3420 peer_id,
3421 event: HandlerIn::JoinedMesh,
3422 handler: NotifyHandler::One(*connection_id),
3423 });
3424}
3425
3426fn peer_removed_from_mesh(
3430 peer_id: PeerId,
3431 old_topic: &TopicHash,
3432 mesh: &HashMap<TopicHash, BTreeSet<PeerId>>,
3433 events: &mut VecDeque<ToSwarm<Event, HandlerIn>>,
3434 connections: &HashMap<PeerId, PeerConnections>,
3435) {
3436 let connection_id = match connections.get(&peer_id) {
3438 Some(p) => p
3439 .connections
3440 .first()
3441 .expect("There should be at least one connection to a peer."),
3442 None => {
3443 tracing::error!(peer_id=%peer_id, "Peer not existent when removed from mesh");
3444 return;
3445 }
3446 };
3447
3448 if let Some(peer) = connections.get(&peer_id) {
3449 for topic in &peer.topics {
3450 if topic != old_topic {
3451 if let Some(mesh_peers) = mesh.get(topic) {
3452 if mesh_peers.contains(&peer_id) {
3453 return;
3455 }
3456 }
3457 }
3458 }
3459 }
3460 events.push_back(ToSwarm::NotifyHandler {
3462 peer_id,
3463 event: HandlerIn::LeftMesh,
3464 handler: NotifyHandler::One(*connection_id),
3465 });
3466}
3467
3468fn get_random_peers_dynamic(
3472 connected_peers: &HashMap<PeerId, PeerConnections>,
3473 topic_hash: &TopicHash,
3474 n_map: impl Fn(usize) -> usize,
3476 mut f: impl FnMut(&PeerId) -> bool,
3477) -> BTreeSet<PeerId> {
3478 let mut gossip_peers = connected_peers
3479 .iter()
3480 .filter(|(_, p)| p.topics.contains(topic_hash))
3481 .filter(|(peer_id, _)| f(peer_id))
3482 .filter(|(_, p)| p.kind.is_gossipsub())
3483 .map(|(peer_id, _)| *peer_id)
3484 .collect::<Vec<PeerId>>();
3485
3486 let n = n_map(gossip_peers.len());
3488 if gossip_peers.len() <= n {
3489 tracing::debug!("RANDOM PEERS: Got {:?} peers", gossip_peers.len());
3490 return gossip_peers.into_iter().collect();
3491 }
3492
3493 let mut rng = thread_rng();
3495 gossip_peers.partial_shuffle(&mut rng, n);
3496
3497 tracing::debug!("RANDOM PEERS: Got {:?} peers", n);
3498
3499 gossip_peers.into_iter().take(n).collect()
3500}
3501
3502fn get_random_peers(
3505 connected_peers: &HashMap<PeerId, PeerConnections>,
3506 topic_hash: &TopicHash,
3507 n: usize,
3508 f: impl FnMut(&PeerId) -> bool,
3509) -> BTreeSet<PeerId> {
3510 get_random_peers_dynamic(connected_peers, topic_hash, |_| n, f)
3511}
3512
3513fn validate_config(
3516 authenticity: &MessageAuthenticity,
3517 validation_mode: &ValidationMode,
3518) -> Result<(), &'static str> {
3519 match validation_mode {
3520 ValidationMode::Anonymous => {
3521 if authenticity.is_signing() {
3522 return Err("Cannot enable message signing with an Anonymous validation mode. Consider changing either the ValidationMode or MessageAuthenticity");
3523 }
3524
3525 if !authenticity.is_anonymous() {
3526 return Err("Published messages contain an author but incoming messages with an author will be rejected. Consider adjusting the validation or privacy settings in the config");
3527 }
3528 }
3529 ValidationMode::Strict => {
3530 if !authenticity.is_signing() {
3531 return Err(
3532 "Messages will be
3533 published unsigned and incoming unsigned messages will be rejected. Consider adjusting
3534 the validation or privacy settings in the config"
3535 );
3536 }
3537 }
3538 _ => {}
3539 }
3540 Ok(())
3541}
3542
3543impl<C: DataTransform, F: TopicSubscriptionFilter> fmt::Debug for Behaviour<C, F> {
3544 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
3545 f.debug_struct("Behaviour")
3546 .field("config", &self.config)
3547 .field("events", &self.events.len())
3548 .field("publish_config", &self.publish_config)
3549 .field("mesh", &self.mesh)
3550 .field("fanout", &self.fanout)
3551 .field("fanout_last_pub", &self.fanout_last_pub)
3552 .field("mcache", &self.mcache)
3553 .field("heartbeat", &self.heartbeat)
3554 .finish()
3555 }
3556}
3557
3558impl fmt::Debug for PublishConfig {
3559 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
3560 match self {
3561 PublishConfig::Signing { author, .. } => {
3562 f.write_fmt(format_args!("PublishConfig::Signing({author})"))
3563 }
3564 PublishConfig::Author(author) => {
3565 f.write_fmt(format_args!("PublishConfig::Author({author})"))
3566 }
3567 PublishConfig::RandomAuthor => f.write_fmt(format_args!("PublishConfig::RandomAuthor")),
3568 PublishConfig::Anonymous => f.write_fmt(format_args!("PublishConfig::Anonymous")),
3569 }
3570 }
3571}