1use std::{
24 collections::{hash_map, HashMap, HashSet},
25 net::IpAddr,
26 time::Duration,
27};
28
29use futures_timer::Delay;
30use libp2p_identity::PeerId;
31use web_time::Instant;
32
33use crate::{time_cache::TimeCache, MessageId, TopicHash};
34
35mod params;
36pub use params::{
37 score_parameter_decay, score_parameter_decay_with_base, PeerScoreParams, PeerScoreThresholds,
38 TopicScoreParams,
39};
40
41use crate::ValidationError;
42
43#[cfg(test)]
44mod tests;
45
/// Number of seconds passed (as a `Duration`) to `TimeCache::new` when the delivery-record
/// cache of a `PeerScore` is created.
const TIME_CACHE_DURATION: u64 = 120;
48
/// Whether peer scoring is in use.
pub(crate) enum PeerScoreState {
    /// Scoring is enabled; carries the boxed [`PeerScore`] bookkeeping.
    Active(Box<PeerScore>),
    /// Scoring is disabled; `below_threshold` then always answers `(false, 0.0)`.
    Disabled,
}
55
56impl PeerScoreState {
57 pub(crate) fn below_threshold(
60 &self,
61 peer_id: &PeerId,
62 threshold: impl Fn(&PeerScoreThresholds) -> f64,
63 ) -> (bool, f64) {
64 match self {
65 PeerScoreState::Active(active) => {
66 let score = active.score_report(peer_id).score;
67 (score < threshold(&active.thresholds), score)
68 }
69 PeerScoreState::Disabled => (false, 0.0),
70 }
71 }
72}
73
/// Result of `PeerScore::score_report`: the computed score and, with the `metrics` feature,
/// the penalties that were applied while computing it.
#[derive(Default)]
pub(crate) struct PeerScoreReport {
    /// The peer's total score; `0.0` by default.
    pub(crate) score: f64,
    /// Penalties recorded during the score computation.
    #[cfg(feature = "metrics")]
    pub(crate) penalties: Vec<crate::metrics::Penalty>,
}
82
/// Scoring state for all tracked peers.
pub(crate) struct PeerScore {
    /// Global and per-topic score parameters.
    pub(crate) params: PeerScoreParams,
    /// Score thresholds, consulted e.g. by `PeerScoreState::below_threshold`.
    pub(crate) thresholds: PeerScoreThresholds,
    /// Timer created from `params.decay_interval` at construction time.
    pub(crate) decay_interval: Delay,
    /// Statistics per tracked peer.
    peer_stats: HashMap<PeerId, PeerStats>,
    /// For each IP address, the peers recorded as using it (see `add_ip` / `remove_ip`).
    peer_ips: HashMap<IpAddr, HashSet<PeerId>>,
    /// Delivery records per message id, stored in a time cache.
    deliveries: TimeCache<MessageId, DeliveryRecord>,
    /// Optional hook called with `(peer, topic, seconds)` from `validate_message` and
    /// `duplicated_message`, but only when the peer is in the mesh for that topic.
    message_delivery_time_callback: Option<fn(&PeerId, &TopicHash, f64)>,
}
99
/// Everything that is tracked about a single peer for scoring purposes.
struct PeerStats {
    /// Whether the peer is connected, or when its retained stats expire.
    status: ConnectionStatus,
    /// Per-topic counters.
    topics: HashMap<TopicHash, TopicStats>,
    /// IP addresses recorded for this peer via `PeerScore::add_ip`.
    known_ips: HashSet<IpAddr>,
    /// Behaviour penalty counter; increased by `PeerScore::add_penalty` and decayed in
    /// `PeerScore::refresh_scores`.
    behaviour_penalty: f64,
    /// Score assigned through `PeerScore::set_application_score`.
    application_score: f64,
    /// Slow-peer counter; increased by `PeerScore::failed_message_slow_peer` and decayed in
    /// `PeerScore::refresh_scores`.
    slow_peer_penalty: f64,
}
115
/// Connection state of a tracked peer.
enum ConnectionStatus {
    /// The peer is currently considered connected.
    Connected,
    /// The peer was removed but its stats are retained for a while.
    Disconnected {
        /// Once this instant has passed, `PeerScore::refresh_scores` drops the peer's stats.
        expire: Instant,
    },
}
125
126impl Default for PeerStats {
127 fn default() -> Self {
128 PeerStats {
129 status: ConnectionStatus::Connected,
130 topics: HashMap::new(),
131 known_ips: HashSet::new(),
132 behaviour_penalty: 0f64,
133 application_score: 0f64,
134 slow_peer_penalty: 0f64,
135 }
136 }
137}
138
139impl PeerStats {
140 pub(crate) fn stats_or_default_mut(
144 &mut self,
145 topic_hash: TopicHash,
146 params: &PeerScoreParams,
147 ) -> Option<&mut TopicStats> {
148 #[allow(
149 clippy::map_entry,
150 reason = "False positive, see rust-lang/rust-clippy#14449."
151 )]
152 if params.topics.contains_key(&topic_hash) {
153 Some(self.topics.entry(topic_hash).or_default())
154 } else {
155 self.topics.get_mut(&topic_hash)
156 }
157 }
158}
159
/// Per-topic counters kept for a single peer.
struct TopicStats {
    /// Whether the peer is currently in the mesh for this topic.
    mesh_status: MeshStatus,
    /// Counter of first deliveries; bumped (up to a cap) by
    /// `PeerScore::mark_first_message_delivery`.
    first_message_deliveries: f64,
    /// Set by `PeerScore::refresh_scores` once the mesh time exceeds the topic's
    /// `mesh_message_deliveries_activation`; reset on graft, prune and peer removal.
    mesh_message_deliveries_active: bool,
    /// Counter of deliveries while in the mesh, capped by the topic parameters.
    mesh_message_deliveries: f64,
    /// Accumulated squared delivery deficit, added on prune and on peer removal.
    mesh_failure_penalty: f64,
    /// Counter of invalid messages attributed to the peer.
    invalid_message_deliveries: f64,
}
174
impl TopicStats {
    /// Returns `true` while `mesh_status` is `MeshStatus::Active`.
    pub(crate) fn in_mesh(&self) -> bool {
        matches!(self.mesh_status, MeshStatus::Active { .. })
    }
}
181
/// Mesh membership of a peer for one topic.
enum MeshStatus {
    /// The peer is in the mesh.
    Active {
        /// Instant at which this status was created.
        graft_time: Instant,
        /// Time spent in the mesh; starts at zero.
        mesh_time: Duration,
    },
    /// The peer is not in the mesh.
    InActive,
}

impl MeshStatus {
    /// Builds an `Active` status grafted "now" with no mesh time accumulated yet.
    pub(crate) fn new_active() -> Self {
        let graft_time = Instant::now();
        Self::Active {
            graft_time,
            mesh_time: Duration::ZERO,
        }
    }
}
202
203impl Default for TopicStats {
204 fn default() -> Self {
205 TopicStats {
206 mesh_status: MeshStatus::InActive,
207 first_message_deliveries: Default::default(),
208 mesh_message_deliveries_active: Default::default(),
209 mesh_message_deliveries: Default::default(),
210 mesh_failure_penalty: Default::default(),
211 invalid_message_deliveries: Default::default(),
212 }
213 }
214}
215
/// What is known about the delivery of one message.
#[derive(PartialEq, Debug)]
struct DeliveryRecord {
    /// Validation outcome of the message so far.
    status: DeliveryStatus,
    /// Instant at which this record was created.
    first_seen: Instant,
    /// Peers recorded by `PeerScore::duplicated_message` as having sent this message.
    peers: HashSet<PeerId>,
}
222
/// Validation state of a message in a [`DeliveryRecord`].
#[derive(PartialEq, Debug)]
enum DeliveryStatus {
    /// No outcome recorded yet; the state of a freshly created record.
    Unknown,
    /// The message was delivered; carries the instant at which it was marked valid.
    Valid(Instant),
    /// The message was rejected as invalid.
    Invalid,
    /// Validation was ignored (`RejectReason::ValidationIgnored`).
    Ignored,
}
234
235impl Default for DeliveryRecord {
236 fn default() -> Self {
237 DeliveryRecord {
238 status: DeliveryStatus::Unknown,
239 first_seen: Instant::now(),
240 peers: HashSet::new(),
241 }
242 }
243}
244
245impl PeerScore {
/// Creates a `PeerScore` without a message-delivery-time callback.
///
/// Shorthand for `new_with_message_delivery_time_callback(params, thresholds, None)`.
#[allow(dead_code)]
pub(crate) fn new(params: PeerScoreParams, thresholds: PeerScoreThresholds) -> Self {
    Self::new_with_message_delivery_time_callback(params, thresholds, None)
}
251
252 pub(crate) fn new_with_message_delivery_time_callback(
253 params: PeerScoreParams,
254 thresholds: PeerScoreThresholds,
255 callback: Option<fn(&PeerId, &TopicHash, f64)>,
256 ) -> Self {
257 PeerScore {
258 decay_interval: Delay::new(params.decay_interval),
259 params,
260 thresholds,
261 peer_stats: HashMap::new(),
262 peer_ips: HashMap::new(),
263 deliveries: TimeCache::new(Duration::from_secs(TIME_CACHE_DURATION)),
264 message_delivery_time_callback: callback,
265 }
266 }
267
268 pub(crate) fn score_report(&self, peer_id: &PeerId) -> PeerScoreReport {
271 let mut report = PeerScoreReport::default();
272 let Some(peer_stats) = self.peer_stats.get(peer_id) else {
273 return report;
274 };
275
276 for (topic, topic_stats) in peer_stats.topics.iter() {
278 if let Some(topic_params) = self.params.topics.get(topic) {
280 let mut topic_score = 0.0;
284
285 if let MeshStatus::Active { mesh_time, .. } = topic_stats.mesh_status {
287 let p1 = {
288 let v = mesh_time.as_secs_f64()
289 / topic_params.time_in_mesh_quantum.as_secs_f64();
290 if v < topic_params.time_in_mesh_cap {
291 v
292 } else {
293 topic_params.time_in_mesh_cap
294 }
295 };
296 topic_score += p1 * topic_params.time_in_mesh_weight;
297 }
298
299 let p2 = {
301 let v = topic_stats.first_message_deliveries;
302 if v < topic_params.first_message_deliveries_cap {
303 v
304 } else {
305 topic_params.first_message_deliveries_cap
306 }
307 };
308 topic_score += p2 * topic_params.first_message_deliveries_weight;
309
310 if topic_stats.mesh_message_deliveries_active
312 && topic_stats.mesh_message_deliveries
313 < topic_params.mesh_message_deliveries_threshold
314 && topic_params.mesh_message_deliveries_weight != 0.0
315 {
316 let deficit = topic_params.mesh_message_deliveries_threshold
317 - topic_stats.mesh_message_deliveries;
318 let p3 = deficit * deficit;
319 let penalty = p3 * topic_params.mesh_message_deliveries_weight;
320
321 topic_score += penalty;
322 #[cfg(feature = "metrics")]
323 report
324 .penalties
325 .push(crate::metrics::Penalty::MessageDeficit);
326 tracing::debug!(
327 peer=%peer_id,
328 %topic,
329 %deficit,
330 penalty=%penalty,
331 "[Penalty] The peer has a mesh deliveries deficit and will be penalized"
332 );
333 }
334
335 let p3b = topic_stats.mesh_failure_penalty;
339 topic_score += p3b * topic_params.mesh_failure_penalty_weight;
340
341 let p4 =
345 topic_stats.invalid_message_deliveries * topic_stats.invalid_message_deliveries;
346 topic_score += p4 * topic_params.invalid_message_deliveries_weight;
347
348 report.score += topic_score * topic_params.topic_weight;
350 }
351 }
352
353 if self.params.topic_score_cap > 0f64 && report.score > self.params.topic_score_cap {
355 report.score = self.params.topic_score_cap;
356 }
357
358 let p5 = peer_stats.application_score;
360 report.score += p5 * self.params.app_specific_weight;
361
362 for ip in peer_stats.known_ips.iter() {
364 if self.params.ip_colocation_factor_whitelist.contains(ip) {
365 continue;
366 }
367
368 if let Some(peers_in_ip) = self.peer_ips.get(ip).map(|peers| peers.len()) {
373 if (peers_in_ip as f64) > self.params.ip_colocation_factor_threshold
374 && self.params.ip_colocation_factor_weight != 0.0
375 {
376 let surplus = (peers_in_ip as f64) - self.params.ip_colocation_factor_threshold;
377 let p6 = surplus * surplus;
378 #[cfg(feature = "metrics")]
379 report.penalties.push(crate::metrics::Penalty::IPColocation);
380 tracing::debug!(
381 peer=%peer_id,
382 surplus_ip=%ip,
383 surplus=%surplus,
384 "[Penalty] The peer gets penalized because of too many peers with the same ip"
385 );
386 report.score += p6 * self.params.ip_colocation_factor_weight;
387 }
388 }
389 }
390
391 if peer_stats.behaviour_penalty > self.params.behaviour_penalty_threshold {
393 let excess = peer_stats.behaviour_penalty - self.params.behaviour_penalty_threshold;
394 let p7 = excess * excess;
395 report.score += p7 * self.params.behaviour_penalty_weight;
396 }
397
398 if peer_stats.slow_peer_penalty > self.params.slow_peer_threshold {
400 let excess = peer_stats.slow_peer_penalty - self.params.slow_peer_threshold;
401 report.score += excess * self.params.slow_peer_weight;
402 }
403
404 report
405 }
406
407 pub(crate) fn add_penalty(&mut self, peer_id: &PeerId, count: usize) {
408 if let Some(peer_stats) = self.peer_stats.get_mut(peer_id) {
409 tracing::debug!(
410 peer=%peer_id,
411 %count,
412 "[Penalty] Behavioral penalty for peer"
413 );
414 peer_stats.behaviour_penalty += count as f64;
415 }
416 }
417
418 fn remove_ips_for_peer(
419 peer_stats: &PeerStats,
420 peer_ips: &mut HashMap<IpAddr, HashSet<PeerId>>,
421 peer_id: &PeerId,
422 ) {
423 for ip in peer_stats.known_ips.iter() {
424 if let Some(peer_set) = peer_ips.get_mut(ip) {
425 peer_set.remove(peer_id);
426 }
427 }
428 }
429
430 pub(crate) fn refresh_scores(&mut self) {
431 let now = Instant::now();
432 let params_ref = &self.params;
433 let peer_ips_ref = &mut self.peer_ips;
434 self.peer_stats.retain(|peer_id, peer_stats| {
435 if let ConnectionStatus::Disconnected { expire } = peer_stats.status {
436 if now > expire {
438 Self::remove_ips_for_peer(peer_stats, peer_ips_ref, peer_id);
440 return false;
442 }
443
444 return true;
449 }
450
451 for (topic, topic_stats) in peer_stats.topics.iter_mut() {
452 if let Some(topic_params) = params_ref.topics.get(topic) {
454 topic_stats.first_message_deliveries *=
456 topic_params.first_message_deliveries_decay;
457 if topic_stats.first_message_deliveries < params_ref.decay_to_zero {
458 topic_stats.first_message_deliveries = 0.0;
459 }
460 topic_stats.mesh_message_deliveries *=
461 topic_params.mesh_message_deliveries_decay;
462 if topic_stats.mesh_message_deliveries < params_ref.decay_to_zero {
463 topic_stats.mesh_message_deliveries = 0.0;
464 }
465 topic_stats.mesh_failure_penalty *= topic_params.mesh_failure_penalty_decay;
466 if topic_stats.mesh_failure_penalty < params_ref.decay_to_zero {
467 topic_stats.mesh_failure_penalty = 0.0;
468 }
469 topic_stats.invalid_message_deliveries *=
470 topic_params.invalid_message_deliveries_decay;
471 if topic_stats.invalid_message_deliveries < params_ref.decay_to_zero {
472 topic_stats.invalid_message_deliveries = 0.0;
473 }
474 if let MeshStatus::Active {
476 ref mut mesh_time,
477 ref mut graft_time,
478 } = topic_stats.mesh_status
479 {
480 *mesh_time = now.duration_since(*graft_time);
481 if *mesh_time > topic_params.mesh_message_deliveries_activation {
482 topic_stats.mesh_message_deliveries_active = true;
483 }
484 }
485 }
486 }
487
488 peer_stats.behaviour_penalty *= params_ref.behaviour_penalty_decay;
490 if peer_stats.behaviour_penalty < params_ref.decay_to_zero {
491 peer_stats.behaviour_penalty = 0.0;
492 }
493
494 peer_stats.slow_peer_penalty *= params_ref.slow_peer_decay;
496 if peer_stats.slow_peer_penalty < params_ref.decay_to_zero {
497 peer_stats.slow_peer_penalty = 0.0;
498 }
499
500 true
501 });
502 }
503
504 pub(crate) fn add_peer(&mut self, peer_id: PeerId) {
507 let peer_stats = self.peer_stats.entry(peer_id).or_default();
508
509 peer_stats.status = ConnectionStatus::Connected;
511 }
512
513 pub(crate) fn add_ip(&mut self, peer_id: &PeerId, ip: IpAddr) {
515 tracing::trace!(peer=%peer_id, %ip, "Add ip for peer");
516 let peer_stats = self.peer_stats.entry(*peer_id).or_default();
517
518 peer_stats.status = ConnectionStatus::Connected;
521
522 peer_stats.known_ips.insert(ip);
524 self.peer_ips.entry(ip).or_default().insert(*peer_id);
525 }
526
527 pub(crate) fn failed_message_slow_peer(&mut self, peer_id: &PeerId) {
529 if let Some(peer_stats) = self.peer_stats.get_mut(peer_id) {
530 peer_stats.slow_peer_penalty += 1.0;
531 tracing::debug!(peer=%peer_id, %peer_stats.slow_peer_penalty, "[Penalty] Expired message penalty.");
532 }
533 }
534
535 pub(crate) fn remove_ip(&mut self, peer_id: &PeerId, ip: &IpAddr) {
537 if let Some(peer_stats) = self.peer_stats.get_mut(peer_id) {
538 peer_stats.known_ips.remove(ip);
539 if let Some(peer_ids) = self.peer_ips.get_mut(ip) {
540 tracing::trace!(peer=%peer_id, %ip, "Remove ip for peer");
541 peer_ids.remove(peer_id);
542 } else {
543 tracing::trace!(
544 peer=%peer_id,
545 %ip,
546 "No entry in peer_ips for ip which should get removed for peer"
547 );
548 }
549 } else {
550 tracing::trace!(
551 peer=%peer_id,
552 %ip,
553 "No peer_stats for peer which should remove the ip"
554 );
555 }
556 }
557
558 pub(crate) fn remove_peer(&mut self, peer_id: &PeerId) {
561 if self.score_report(peer_id).score > 0f64 {
563 if let hash_map::Entry::Occupied(entry) = self.peer_stats.entry(*peer_id) {
564 Self::remove_ips_for_peer(entry.get(), &mut self.peer_ips, peer_id);
565 entry.remove();
566 }
567 return;
568 }
569
570 if let Some(peer_stats) = self.peer_stats.get_mut(peer_id) {
573 for (topic, topic_stats) in peer_stats.topics.iter_mut() {
574 topic_stats.first_message_deliveries = 0f64;
575
576 if let Some(threshold) = self
577 .params
578 .topics
579 .get(topic)
580 .map(|param| param.mesh_message_deliveries_threshold)
581 {
582 if topic_stats.in_mesh()
583 && topic_stats.mesh_message_deliveries_active
584 && topic_stats.mesh_message_deliveries < threshold
585 {
586 let deficit = threshold - topic_stats.mesh_message_deliveries;
587 topic_stats.mesh_failure_penalty += deficit * deficit;
588 }
589 }
590
591 topic_stats.mesh_status = MeshStatus::InActive;
592 topic_stats.mesh_message_deliveries_active = false;
593 }
594
595 peer_stats.status = ConnectionStatus::Disconnected {
596 expire: Instant::now() + self.params.retain_score,
597 };
598 }
599 }
600
601 pub(crate) fn graft(&mut self, peer_id: &PeerId, topic: impl Into<TopicHash>) {
603 let topic = topic.into();
604 if let Some(peer_stats) = self.peer_stats.get_mut(peer_id) {
605 if let Some(topic_stats) = peer_stats.stats_or_default_mut(topic, &self.params) {
607 topic_stats.mesh_status = MeshStatus::new_active();
608 topic_stats.mesh_message_deliveries_active = false;
609 }
610 }
611 }
612
613 pub(crate) fn prune(&mut self, peer_id: &PeerId, topic: TopicHash) {
615 if let Some(peer_stats) = self.peer_stats.get_mut(peer_id) {
616 if let Some(topic_stats) = peer_stats.stats_or_default_mut(topic.clone(), &self.params)
618 {
619 let threshold = self
621 .params
622 .topics
623 .get(&topic)
624 .expect("Topic must exist in order for there to be topic stats")
625 .mesh_message_deliveries_threshold;
626 if topic_stats.mesh_message_deliveries_active
627 && topic_stats.mesh_message_deliveries < threshold
628 {
629 let deficit = threshold - topic_stats.mesh_message_deliveries;
630 topic_stats.mesh_failure_penalty += deficit * deficit;
631 }
632 topic_stats.mesh_message_deliveries_active = false;
633 topic_stats.mesh_status = MeshStatus::InActive;
634 }
635 }
636 }
637
638 pub(crate) fn validate_message(
639 &mut self,
640 from: &PeerId,
641 msg_id: &MessageId,
642 topic_hash: &TopicHash,
643 ) {
644 self.deliveries.entry(msg_id.clone()).or_default();
646
647 if let Some(callback) = self.message_delivery_time_callback {
648 if self
649 .peer_stats
650 .get(from)
651 .and_then(|s| s.topics.get(topic_hash))
652 .map(|ts| ts.in_mesh())
653 .unwrap_or(false)
654 {
655 callback(from, topic_hash, 0.0);
656 }
657 }
658 }
659
660 pub(crate) fn deliver_message(
661 &mut self,
662 from: &PeerId,
663 msg_id: &MessageId,
664 topic_hash: &TopicHash,
665 ) {
666 self.mark_first_message_delivery(from, topic_hash);
667
668 let record = self.deliveries.entry(msg_id.clone()).or_default();
669
670 if record.status != DeliveryStatus::Unknown {
672 tracing::warn!(
673 peer=%from,
674 status=?record.status,
675 first_seen=?record.first_seen.elapsed().as_secs(),
676 "Unexpected delivery trace"
677 );
678 return;
679 }
680
681 record.status = DeliveryStatus::Valid(Instant::now());
683 for peer in record.peers.iter().cloned().collect::<Vec<_>>() {
684 if &peer != from {
687 self.mark_duplicate_message_delivery(&peer, topic_hash, None);
688 }
689 }
690 }
691
/// Penalizes `from` for an invalid message in `topic_hash` without touching any delivery
/// record: logs the rejection and counts one invalid message delivery for the peer.
pub(crate) fn reject_invalid_message(&mut self, from: &PeerId, topic_hash: &TopicHash) {
    tracing::debug!(
        peer=%from,
        "[Penalty] Message from peer rejected because of ValidationError or SelfOrigin"
    );

    self.mark_invalid_message_delivery(from, topic_hash);
}
702
703 pub(crate) fn reject_message(
705 &mut self,
706 from: &PeerId,
707 msg_id: &MessageId,
708 topic_hash: &TopicHash,
709 reason: RejectReason,
710 ) {
711 match reason {
712 RejectReason::ValidationError(_) | RejectReason::SelfOrigin => {
714 self.reject_invalid_message(from, topic_hash);
715 return;
716 }
717 RejectReason::BlackListedPeer | RejectReason::BlackListedSource => {
719 return;
720 }
721 _ => {} }
723
724 let peers: Vec<_> = {
725 let record = self.deliveries.entry(msg_id.clone()).or_default();
726
727 if record.status != DeliveryStatus::Unknown {
730 return;
731 }
732
733 if let RejectReason::ValidationIgnored = reason {
734 record.status = DeliveryStatus::Ignored;
737 record.peers.clear();
738 return;
739 }
740
741 record.status = DeliveryStatus::Invalid;
743 record.peers.drain().collect()
745 };
746
747 self.mark_invalid_message_delivery(from, topic_hash);
748 for peer_id in peers.iter() {
749 self.mark_invalid_message_delivery(peer_id, topic_hash)
750 }
751 }
752
753 pub(crate) fn duplicated_message(
754 &mut self,
755 from: &PeerId,
756 msg_id: &MessageId,
757 topic_hash: &TopicHash,
758 ) {
759 let record = self.deliveries.entry(msg_id.clone()).or_default();
760
761 if record.peers.contains(from) {
762 return;
764 }
765
766 if let Some(callback) = self.message_delivery_time_callback {
767 let time = if let DeliveryStatus::Valid(validated) = record.status {
768 validated.elapsed().as_secs_f64()
769 } else {
770 0.0
771 };
772 if self
773 .peer_stats
774 .get(from)
775 .and_then(|s| s.topics.get(topic_hash))
776 .map(|ts| ts.in_mesh())
777 .unwrap_or(false)
778 {
779 callback(from, topic_hash, time);
780 }
781 }
782
783 match record.status {
784 DeliveryStatus::Unknown => {
785 record.peers.insert(*from);
788 }
789 DeliveryStatus::Valid(validated) => {
790 record.peers.insert(*from);
792 self.mark_duplicate_message_delivery(from, topic_hash, Some(validated));
793 }
794 DeliveryStatus::Invalid => {
795 self.mark_invalid_message_delivery(from, topic_hash);
797 }
798 DeliveryStatus::Ignored => {
799 }
801 }
802 }
803
804 pub(crate) fn set_application_score(&mut self, peer_id: &PeerId, new_score: f64) -> bool {
807 if let Some(peer_stats) = self.peer_stats.get_mut(peer_id) {
808 peer_stats.application_score = new_score;
809 true
810 } else {
811 false
812 }
813 }
814
815 pub(crate) fn set_topic_params(&mut self, topic_hash: TopicHash, params: TopicScoreParams) {
817 use hash_map::Entry::*;
818 match self.params.topics.entry(topic_hash.clone()) {
819 Occupied(mut entry) => {
820 let first_message_deliveries_cap = params.first_message_deliveries_cap;
821 let mesh_message_deliveries_cap = params.mesh_message_deliveries_cap;
822 let old_params = entry.insert(params);
823
824 if old_params.first_message_deliveries_cap > first_message_deliveries_cap {
825 for stats in &mut self.peer_stats.values_mut() {
826 if let Some(tstats) = stats.topics.get_mut(&topic_hash) {
827 if tstats.first_message_deliveries > first_message_deliveries_cap {
828 tstats.first_message_deliveries = first_message_deliveries_cap;
829 }
830 }
831 }
832 }
833
834 if old_params.mesh_message_deliveries_cap > mesh_message_deliveries_cap {
835 for stats in self.peer_stats.values_mut() {
836 if let Some(tstats) = stats.topics.get_mut(&topic_hash) {
837 if tstats.mesh_message_deliveries > mesh_message_deliveries_cap {
838 tstats.mesh_message_deliveries = mesh_message_deliveries_cap;
839 }
840 }
841 }
842 }
843 }
844 Vacant(entry) => {
845 entry.insert(params);
846 }
847 }
848 }
849
/// Returns the score parameters configured for `topic_hash`, or `None` if there are none.
pub(crate) fn get_topic_params(&self, topic_hash: &TopicHash) -> Option<&TopicScoreParams> {
    self.params.topics.get(topic_hash)
}
854
855 fn mark_invalid_message_delivery(&mut self, peer_id: &PeerId, topic_hash: &TopicHash) {
858 if let Some(peer_stats) = self.peer_stats.get_mut(peer_id) {
859 if let Some(topic_stats) =
860 peer_stats.stats_or_default_mut(topic_hash.clone(), &self.params)
861 {
862 tracing::debug!(
863 peer=%peer_id,
864 topic=%topic_hash,
865 "[Penalty] Peer delivered an invalid message in topic and gets penalized \
866 for it",
867 );
868 topic_stats.invalid_message_deliveries += 1f64;
869 }
870 }
871 }
872
873 fn mark_first_message_delivery(&mut self, peer_id: &PeerId, topic_hash: &TopicHash) {
877 if let Some(peer_stats) = self.peer_stats.get_mut(peer_id) {
878 if let Some(topic_stats) =
879 peer_stats.stats_or_default_mut(topic_hash.clone(), &self.params)
880 {
881 let cap = self
882 .params
883 .topics
884 .get(topic_hash)
885 .expect("Topic must exist if there are known topic_stats")
886 .first_message_deliveries_cap;
887 topic_stats.first_message_deliveries =
888 if topic_stats.first_message_deliveries + 1f64 > cap {
889 cap
890 } else {
891 topic_stats.first_message_deliveries + 1f64
892 };
893
894 if let MeshStatus::Active { .. } = topic_stats.mesh_status {
895 let cap = self
896 .params
897 .topics
898 .get(topic_hash)
899 .expect("Topic must exist if there are known topic_stats")
900 .mesh_message_deliveries_cap;
901
902 topic_stats.mesh_message_deliveries =
903 if topic_stats.mesh_message_deliveries + 1f64 > cap {
904 cap
905 } else {
906 topic_stats.mesh_message_deliveries + 1f64
907 };
908 }
909 }
910 }
911 }
912
913 fn mark_duplicate_message_delivery(
916 &mut self,
917 peer_id: &PeerId,
918 topic_hash: &TopicHash,
919 validated_time: Option<Instant>,
920 ) {
921 if let Some(peer_stats) = self.peer_stats.get_mut(peer_id) {
922 let now = if validated_time.is_some() {
923 Some(Instant::now())
924 } else {
925 None
926 };
927 if let Some(topic_stats) =
928 peer_stats.stats_or_default_mut(topic_hash.clone(), &self.params)
929 {
930 if let MeshStatus::Active { .. } = topic_stats.mesh_status {
931 let topic_params = self
932 .params
933 .topics
934 .get(topic_hash)
935 .expect("Topic must exist if there are known topic_stats");
936
937 let mut falls_in_mesh_deliver_window = true;
942 if let Some(validated_time) = validated_time {
943 if let Some(now) = &now {
944 let window_time = validated_time
946 .checked_add(topic_params.mesh_message_deliveries_window)
947 .unwrap_or(*now);
948 if now > &window_time {
949 falls_in_mesh_deliver_window = false;
950 }
951 }
952 }
953
954 if falls_in_mesh_deliver_window {
955 let cap = topic_params.mesh_message_deliveries_cap;
956 topic_stats.mesh_message_deliveries =
957 if topic_stats.mesh_message_deliveries + 1f64 > cap {
958 cap
959 } else {
960 topic_stats.mesh_message_deliveries + 1f64
961 };
962 }
963 }
964 }
965 }
966 }
967
968 pub(crate) fn mesh_message_deliveries(&self, peer: &PeerId, topic: &TopicHash) -> Option<f64> {
969 self.peer_stats
970 .get(peer)
971 .and_then(|s| s.topics.get(topic))
972 .map(|t| t.mesh_message_deliveries)
973 }
974}
975
/// Why a message was rejected; decides how `PeerScore::reject_message` treats the sender.
#[derive(Clone, Copy)]
pub(crate) enum RejectReason {
    /// The message failed with a validation error; the sender is penalized.
    ValidationError(ValidationError),
    /// Handled like `ValidationError`: the sender is penalized.
    SelfOrigin,
    /// No scoring action is taken.
    BlackListedPeer,
    /// No scoring action is taken.
    BlackListedSource,
    /// The delivery record is marked as ignored; nobody is penalized.
    ValidationIgnored,
    /// The delivery record is marked invalid; the sender and all peers recorded for the
    /// message are penalized.
    ValidationFailed,
}