1use std::{
24 collections::{hash_map, HashMap, HashSet},
25 net::IpAddr,
26 time::Duration,
27};
28
29use futures_timer::Delay;
30use libp2p_identity::PeerId;
31use web_time::Instant;
32
33use crate::{time_cache::TimeCache, MessageId, TopicHash};
34
35mod params;
36pub use params::{
37 score_parameter_decay, score_parameter_decay_with_base, PeerScoreParams, PeerScoreThresholds,
38 TopicScoreParams,
39};
40
41use crate::ValidationError;
42
43#[cfg(test)]
44mod tests;
45
/// Lifetime, in seconds, handed to the `deliveries` [`TimeCache`] when a [`PeerScore`] is
/// constructed.
const TIME_CACHE_DURATION: u64 = 120;
48
/// Whether peer scoring is switched on, and if so the scoring state itself.
pub(crate) enum PeerScoreState {
    /// Scoring is enabled; the boxed [`PeerScore`] holds all scoring state.
    Active(Box<PeerScore>),
    /// Scoring is disabled; `below_threshold` reports `(false, 0.0)` for every peer.
    Disabled,
}
55
56impl PeerScoreState {
57 pub(crate) fn below_threshold(
60 &self,
61 peer_id: &PeerId,
62 threshold: impl Fn(&PeerScoreThresholds) -> f64,
63 ) -> (bool, f64) {
64 match self {
65 PeerScoreState::Active(active) => {
66 let score = active.score_report(peer_id).score;
67 (score < threshold(&active.thresholds), score)
68 }
69 PeerScoreState::Disabled => (false, 0.0),
70 }
71 }
72}
73
/// Result of `PeerScore::score_report`: the computed score and, with the `metrics` feature,
/// the penalties that were applied while computing it.
#[derive(Default)]
pub(crate) struct PeerScoreReport {
    /// The peer's total score.
    pub(crate) score: f64,
    /// Penalties recorded while the score was computed.
    #[cfg(feature = "metrics")]
    pub(crate) penalties: Vec<crate::metrics::Penalty>,
}
82
/// Scoring state for all known peers.
pub(crate) struct PeerScore {
    /// The score parameters, including the per-topic parameters.
    pub(crate) params: PeerScoreParams,
    /// The thresholds consulted by `PeerScoreState::below_threshold`.
    pub(crate) thresholds: PeerScoreThresholds,
    /// Timer created from `params.decay_interval`.
    // NOTE(review): presumably polled by the owner to drive `refresh_scores` — confirm.
    pub(crate) decay_interval: Delay,
    /// Per-peer statistics.
    peer_stats: HashMap<PeerId, PeerStats>,
    /// For every tracked IP, the peers recorded at that IP (input to the colocation penalty).
    peer_ips: HashMap<IpAddr, HashSet<PeerId>>,
    /// Delivery records by message id, kept in a time-bounded cache.
    deliveries: TimeCache<MessageId, DeliveryRecord>,
    /// Optional callback invoked with `(peer, topic, seconds)` for in-mesh senders from
    /// `validate_message` and `duplicated_message`.
    message_delivery_time_callback: Option<fn(&PeerId, &TopicHash, f64)>,
}
99
/// General statistics kept for a single peer.
struct PeerStats {
    /// Whether the peer is connected, or disconnected with a retention deadline.
    status: ConnectionStatus,
    /// Statistics per topic.
    topics: HashMap<TopicHash, TopicStats>,
    /// IP addresses recorded for this peer through `add_ip`.
    known_ips: HashSet<IpAddr>,
    /// Behavioural penalty counter; incremented by `add_penalty`, decayed in `refresh_scores`.
    behaviour_penalty: f64,
    /// Score assigned through `set_application_score`.
    application_score: f64,
    /// Slow-peer counter; incremented by `failed_message_slow_peer`, decayed in
    /// `refresh_scores`.
    slow_peer_penalty: f64,
}
115
/// Connection state of a peer as tracked by the scoring logic.
enum ConnectionStatus {
    /// The peer is connected.
    Connected,
    /// The peer is disconnected; its stats are retained until `expire`.
    Disconnected {
        /// Instant after which `refresh_scores` discards the peer's stats.
        expire: Instant,
    },
}
125
126impl Default for PeerStats {
127 fn default() -> Self {
128 PeerStats {
129 status: ConnectionStatus::Connected,
130 topics: HashMap::new(),
131 known_ips: HashSet::new(),
132 behaviour_penalty: 0f64,
133 application_score: 0f64,
134 slow_peer_penalty: 0f64,
135 }
136 }
137}
138
139impl PeerStats {
140 pub(crate) fn stats_or_default_mut(
144 &mut self,
145 topic_hash: TopicHash,
146 params: &PeerScoreParams,
147 ) -> Option<&mut TopicStats> {
148 #[allow(
149 clippy::map_entry,
150 reason = "False positive, see rust-lang/rust-clippy#14449."
151 )]
152 if params.topics.contains_key(&topic_hash) {
153 Some(self.topics.entry(topic_hash).or_default())
154 } else {
155 self.topics.get_mut(&topic_hash)
156 }
157 }
158}
159
/// Statistics kept for one peer in one topic.
struct TopicStats {
    /// Whether the peer is currently in the mesh for this topic, and since when.
    mesh_status: MeshStatus,
    /// Counter of first message deliveries (capped, decayed in `refresh_scores`).
    first_message_deliveries: f64,
    /// Set by `refresh_scores` once the mesh time exceeds the topic's activation period;
    /// gates the mesh delivery deficit penalty.
    mesh_message_deliveries_active: bool,
    /// Counter of mesh message deliveries (capped, decayed in `refresh_scores`).
    mesh_message_deliveries: f64,
    /// Accumulated squared delivery deficit, added on `prune` and `remove_peer`.
    mesh_failure_penalty: f64,
    /// Counter of invalid message deliveries (decayed in `refresh_scores`).
    invalid_message_deliveries: f64,
}
174
175impl TopicStats {
176 pub(crate) fn in_mesh(&self) -> bool {
178 matches!(self.mesh_status, MeshStatus::Active { .. })
179 }
180}
181
/// Mesh membership of a peer in a topic.
enum MeshStatus {
    /// The peer is in the mesh.
    Active {
        /// Instant at which this status was created (see `MeshStatus::new_active`).
        graft_time: Instant,
        /// Time in the mesh as of the last `refresh_scores` run (`now - graft_time`).
        mesh_time: Duration,
    },
    /// The peer is not in the mesh.
    InActive,
}
192
193impl MeshStatus {
194 pub(crate) fn new_active() -> Self {
196 MeshStatus::Active {
197 graft_time: Instant::now(),
198 mesh_time: Duration::from_secs(0),
199 }
200 }
201}
202
203impl Default for TopicStats {
204 fn default() -> Self {
205 TopicStats {
206 mesh_status: MeshStatus::InActive,
207 first_message_deliveries: Default::default(),
208 mesh_message_deliveries_active: Default::default(),
209 mesh_message_deliveries: Default::default(),
210 mesh_failure_penalty: Default::default(),
211 invalid_message_deliveries: Default::default(),
212 }
213 }
214}
215
/// Bookkeeping for a single message id in the `deliveries` cache.
#[derive(PartialEq, Debug)]
struct DeliveryRecord {
    /// Validation outcome recorded so far.
    status: DeliveryStatus,
    /// Instant at which the record was created.
    first_seen: Instant,
    /// Peers recorded by `duplicated_message` as having sent this message.
    peers: HashSet<PeerId>,
}
222
/// Validation outcome of a tracked message.
#[derive(PartialEq, Debug)]
enum DeliveryStatus {
    /// No outcome recorded yet (the default for a new record).
    Unknown,
    /// Marked valid by `deliver_message` at the contained instant.
    Valid(Instant),
    /// Marked invalid by `reject_message`.
    Invalid,
    /// Marked ignored by `reject_message` (`RejectReason::ValidationIgnored`).
    Ignored,
}
234
235impl Default for DeliveryRecord {
236 fn default() -> Self {
237 DeliveryRecord {
238 status: DeliveryStatus::Unknown,
239 first_seen: Instant::now(),
240 peers: HashSet::new(),
241 }
242 }
243}
244
245impl PeerScore {
246 #[allow(dead_code)]
248 pub(crate) fn new(params: PeerScoreParams, thresholds: PeerScoreThresholds) -> Self {
249 Self::new_with_message_delivery_time_callback(params, thresholds, None)
250 }
251
252 pub(crate) fn new_with_message_delivery_time_callback(
253 params: PeerScoreParams,
254 thresholds: PeerScoreThresholds,
255 callback: Option<fn(&PeerId, &TopicHash, f64)>,
256 ) -> Self {
257 PeerScore {
258 decay_interval: Delay::new(params.decay_interval),
259 params,
260 thresholds,
261 peer_stats: HashMap::new(),
262 peer_ips: HashMap::new(),
263 deliveries: TimeCache::new(Duration::from_secs(TIME_CACHE_DURATION)),
264 message_delivery_time_callback: callback,
265 }
266 }
267
268 pub(crate) fn score_report(&self, peer_id: &PeerId) -> PeerScoreReport {
271 let mut report = PeerScoreReport::default();
272 let Some(peer_stats) = self.peer_stats.get(peer_id) else {
273 return report;
274 };
275
276 for (topic, topic_stats) in peer_stats.topics.iter() {
278 if let Some(topic_params) = self.params.topics.get(topic) {
280 let mut topic_score = 0.0;
284
285 if let MeshStatus::Active { mesh_time, .. } = topic_stats.mesh_status {
287 let p1 = {
288 let v = mesh_time.as_secs_f64()
289 / topic_params.time_in_mesh_quantum.as_secs_f64();
290 if v < topic_params.time_in_mesh_cap {
291 v
292 } else {
293 topic_params.time_in_mesh_cap
294 }
295 };
296 topic_score += p1 * topic_params.time_in_mesh_weight;
297 }
298
299 let p2 = {
301 let v = topic_stats.first_message_deliveries;
302 if v < topic_params.first_message_deliveries_cap {
303 v
304 } else {
305 topic_params.first_message_deliveries_cap
306 }
307 };
308 topic_score += p2 * topic_params.first_message_deliveries_weight;
309
310 if topic_stats.mesh_message_deliveries_active
312 && topic_stats.mesh_message_deliveries
313 < topic_params.mesh_message_deliveries_threshold
314 {
315 let deficit = topic_params.mesh_message_deliveries_threshold
316 - topic_stats.mesh_message_deliveries;
317 let p3 = deficit * deficit;
318 topic_score += p3 * topic_params.mesh_message_deliveries_weight;
319 #[cfg(feature = "metrics")]
320 report
321 .penalties
322 .push(crate::metrics::Penalty::MessageDeficit);
323 tracing::debug!(
324 peer=%peer_id,
325 %topic,
326 %deficit,
327 penalty=%topic_score,
328 "[Penalty] The peer has a mesh deliveries deficit and will be penalized"
329 );
330 }
331
332 let p3b = topic_stats.mesh_failure_penalty;
336 topic_score += p3b * topic_params.mesh_failure_penalty_weight;
337
338 let p4 =
342 topic_stats.invalid_message_deliveries * topic_stats.invalid_message_deliveries;
343 topic_score += p4 * topic_params.invalid_message_deliveries_weight;
344
345 report.score += topic_score * topic_params.topic_weight;
347 }
348 }
349
350 if self.params.topic_score_cap > 0f64 && report.score > self.params.topic_score_cap {
352 report.score = self.params.topic_score_cap;
353 }
354
355 let p5 = peer_stats.application_score;
357 report.score += p5 * self.params.app_specific_weight;
358
359 for ip in peer_stats.known_ips.iter() {
361 if self.params.ip_colocation_factor_whitelist.contains(ip) {
362 continue;
363 }
364
365 if let Some(peers_in_ip) = self.peer_ips.get(ip).map(|peers| peers.len()) {
370 if (peers_in_ip as f64) > self.params.ip_colocation_factor_threshold {
371 let surplus = (peers_in_ip as f64) - self.params.ip_colocation_factor_threshold;
372 let p6 = surplus * surplus;
373 #[cfg(feature = "metrics")]
374 report.penalties.push(crate::metrics::Penalty::IPColocation);
375 tracing::debug!(
376 peer=%peer_id,
377 surplus_ip=%ip,
378 surplus=%surplus,
379 "[Penalty] The peer gets penalized because of too many peers with the same ip"
380 );
381 report.score += p6 * self.params.ip_colocation_factor_weight;
382 }
383 }
384 }
385
386 if peer_stats.behaviour_penalty > self.params.behaviour_penalty_threshold {
388 let excess = peer_stats.behaviour_penalty - self.params.behaviour_penalty_threshold;
389 let p7 = excess * excess;
390 report.score += p7 * self.params.behaviour_penalty_weight;
391 }
392
393 if peer_stats.slow_peer_penalty > self.params.slow_peer_threshold {
395 let excess = peer_stats.slow_peer_penalty - self.params.slow_peer_threshold;
396 report.score += excess * self.params.slow_peer_weight;
397 }
398
399 report
400 }
401
402 pub(crate) fn add_penalty(&mut self, peer_id: &PeerId, count: usize) {
403 if let Some(peer_stats) = self.peer_stats.get_mut(peer_id) {
404 tracing::debug!(
405 peer=%peer_id,
406 %count,
407 "[Penalty] Behavioral penalty for peer"
408 );
409 peer_stats.behaviour_penalty += count as f64;
410 }
411 }
412
413 fn remove_ips_for_peer(
414 peer_stats: &PeerStats,
415 peer_ips: &mut HashMap<IpAddr, HashSet<PeerId>>,
416 peer_id: &PeerId,
417 ) {
418 for ip in peer_stats.known_ips.iter() {
419 if let Some(peer_set) = peer_ips.get_mut(ip) {
420 peer_set.remove(peer_id);
421 }
422 }
423 }
424
425 pub(crate) fn refresh_scores(&mut self) {
426 let now = Instant::now();
427 let params_ref = &self.params;
428 let peer_ips_ref = &mut self.peer_ips;
429 self.peer_stats.retain(|peer_id, peer_stats| {
430 if let ConnectionStatus::Disconnected { expire } = peer_stats.status {
431 if now > expire {
433 Self::remove_ips_for_peer(peer_stats, peer_ips_ref, peer_id);
435 return false;
437 }
438
439 return true;
444 }
445
446 for (topic, topic_stats) in peer_stats.topics.iter_mut() {
447 if let Some(topic_params) = params_ref.topics.get(topic) {
449 topic_stats.first_message_deliveries *=
451 topic_params.first_message_deliveries_decay;
452 if topic_stats.first_message_deliveries < params_ref.decay_to_zero {
453 topic_stats.first_message_deliveries = 0.0;
454 }
455 topic_stats.mesh_message_deliveries *=
456 topic_params.mesh_message_deliveries_decay;
457 if topic_stats.mesh_message_deliveries < params_ref.decay_to_zero {
458 topic_stats.mesh_message_deliveries = 0.0;
459 }
460 topic_stats.mesh_failure_penalty *= topic_params.mesh_failure_penalty_decay;
461 if topic_stats.mesh_failure_penalty < params_ref.decay_to_zero {
462 topic_stats.mesh_failure_penalty = 0.0;
463 }
464 topic_stats.invalid_message_deliveries *=
465 topic_params.invalid_message_deliveries_decay;
466 if topic_stats.invalid_message_deliveries < params_ref.decay_to_zero {
467 topic_stats.invalid_message_deliveries = 0.0;
468 }
469 if let MeshStatus::Active {
471 ref mut mesh_time,
472 ref mut graft_time,
473 } = topic_stats.mesh_status
474 {
475 *mesh_time = now.duration_since(*graft_time);
476 if *mesh_time > topic_params.mesh_message_deliveries_activation {
477 topic_stats.mesh_message_deliveries_active = true;
478 }
479 }
480 }
481 }
482
483 peer_stats.behaviour_penalty *= params_ref.behaviour_penalty_decay;
485 if peer_stats.behaviour_penalty < params_ref.decay_to_zero {
486 peer_stats.behaviour_penalty = 0.0;
487 }
488
489 peer_stats.slow_peer_penalty *= params_ref.slow_peer_decay;
491 if peer_stats.slow_peer_penalty < params_ref.decay_to_zero {
492 peer_stats.slow_peer_penalty = 0.0;
493 }
494
495 true
496 });
497 }
498
499 pub(crate) fn add_peer(&mut self, peer_id: PeerId) {
502 let peer_stats = self.peer_stats.entry(peer_id).or_default();
503
504 peer_stats.status = ConnectionStatus::Connected;
506 }
507
508 pub(crate) fn add_ip(&mut self, peer_id: &PeerId, ip: IpAddr) {
510 tracing::trace!(peer=%peer_id, %ip, "Add ip for peer");
511 let peer_stats = self.peer_stats.entry(*peer_id).or_default();
512
513 peer_stats.status = ConnectionStatus::Connected;
516
517 peer_stats.known_ips.insert(ip);
519 self.peer_ips.entry(ip).or_default().insert(*peer_id);
520 }
521
522 pub(crate) fn failed_message_slow_peer(&mut self, peer_id: &PeerId) {
524 if let Some(peer_stats) = self.peer_stats.get_mut(peer_id) {
525 peer_stats.slow_peer_penalty += 1.0;
526 tracing::debug!(peer=%peer_id, %peer_stats.slow_peer_penalty, "[Penalty] Expired message penalty.");
527 }
528 }
529
530 pub(crate) fn remove_ip(&mut self, peer_id: &PeerId, ip: &IpAddr) {
532 if let Some(peer_stats) = self.peer_stats.get_mut(peer_id) {
533 peer_stats.known_ips.remove(ip);
534 if let Some(peer_ids) = self.peer_ips.get_mut(ip) {
535 tracing::trace!(peer=%peer_id, %ip, "Remove ip for peer");
536 peer_ids.remove(peer_id);
537 } else {
538 tracing::trace!(
539 peer=%peer_id,
540 %ip,
541 "No entry in peer_ips for ip which should get removed for peer"
542 );
543 }
544 } else {
545 tracing::trace!(
546 peer=%peer_id,
547 %ip,
548 "No peer_stats for peer which should remove the ip"
549 );
550 }
551 }
552
553 pub(crate) fn remove_peer(&mut self, peer_id: &PeerId) {
556 if self.score_report(peer_id).score > 0f64 {
558 if let hash_map::Entry::Occupied(entry) = self.peer_stats.entry(*peer_id) {
559 Self::remove_ips_for_peer(entry.get(), &mut self.peer_ips, peer_id);
560 entry.remove();
561 }
562 return;
563 }
564
565 if let Some(peer_stats) = self.peer_stats.get_mut(peer_id) {
568 for (topic, topic_stats) in peer_stats.topics.iter_mut() {
569 topic_stats.first_message_deliveries = 0f64;
570
571 if let Some(threshold) = self
572 .params
573 .topics
574 .get(topic)
575 .map(|param| param.mesh_message_deliveries_threshold)
576 {
577 if topic_stats.in_mesh()
578 && topic_stats.mesh_message_deliveries_active
579 && topic_stats.mesh_message_deliveries < threshold
580 {
581 let deficit = threshold - topic_stats.mesh_message_deliveries;
582 topic_stats.mesh_failure_penalty += deficit * deficit;
583 }
584 }
585
586 topic_stats.mesh_status = MeshStatus::InActive;
587 topic_stats.mesh_message_deliveries_active = false;
588 }
589
590 peer_stats.status = ConnectionStatus::Disconnected {
591 expire: Instant::now() + self.params.retain_score,
592 };
593 }
594 }
595
596 pub(crate) fn graft(&mut self, peer_id: &PeerId, topic: impl Into<TopicHash>) {
598 let topic = topic.into();
599 if let Some(peer_stats) = self.peer_stats.get_mut(peer_id) {
600 if let Some(topic_stats) = peer_stats.stats_or_default_mut(topic, &self.params) {
602 topic_stats.mesh_status = MeshStatus::new_active();
603 topic_stats.mesh_message_deliveries_active = false;
604 }
605 }
606 }
607
608 pub(crate) fn prune(&mut self, peer_id: &PeerId, topic: TopicHash) {
610 if let Some(peer_stats) = self.peer_stats.get_mut(peer_id) {
611 if let Some(topic_stats) = peer_stats.stats_or_default_mut(topic.clone(), &self.params)
613 {
614 let threshold = self
616 .params
617 .topics
618 .get(&topic)
619 .expect("Topic must exist in order for there to be topic stats")
620 .mesh_message_deliveries_threshold;
621 if topic_stats.mesh_message_deliveries_active
622 && topic_stats.mesh_message_deliveries < threshold
623 {
624 let deficit = threshold - topic_stats.mesh_message_deliveries;
625 topic_stats.mesh_failure_penalty += deficit * deficit;
626 }
627 topic_stats.mesh_message_deliveries_active = false;
628 topic_stats.mesh_status = MeshStatus::InActive;
629 }
630 }
631 }
632
633 pub(crate) fn validate_message(
634 &mut self,
635 from: &PeerId,
636 msg_id: &MessageId,
637 topic_hash: &TopicHash,
638 ) {
639 self.deliveries.entry(msg_id.clone()).or_default();
641
642 if let Some(callback) = self.message_delivery_time_callback {
643 if self
644 .peer_stats
645 .get(from)
646 .and_then(|s| s.topics.get(topic_hash))
647 .map(|ts| ts.in_mesh())
648 .unwrap_or(false)
649 {
650 callback(from, topic_hash, 0.0);
651 }
652 }
653 }
654
655 pub(crate) fn deliver_message(
656 &mut self,
657 from: &PeerId,
658 msg_id: &MessageId,
659 topic_hash: &TopicHash,
660 ) {
661 self.mark_first_message_delivery(from, topic_hash);
662
663 let record = self.deliveries.entry(msg_id.clone()).or_default();
664
665 if record.status != DeliveryStatus::Unknown {
667 tracing::warn!(
668 peer=%from,
669 status=?record.status,
670 first_seen=?record.first_seen.elapsed().as_secs(),
671 "Unexpected delivery trace"
672 );
673 return;
674 }
675
676 record.status = DeliveryStatus::Valid(Instant::now());
678 for peer in record.peers.iter().cloned().collect::<Vec<_>>() {
679 if &peer != from {
682 self.mark_duplicate_message_delivery(&peer, topic_hash, None);
683 }
684 }
685 }
686
687 pub(crate) fn reject_invalid_message(&mut self, from: &PeerId, topic_hash: &TopicHash) {
690 tracing::debug!(
691 peer=%from,
692 "[Penalty] Message from peer rejected because of ValidationError or SelfOrigin"
693 );
694
695 self.mark_invalid_message_delivery(from, topic_hash);
696 }
697
698 pub(crate) fn reject_message(
700 &mut self,
701 from: &PeerId,
702 msg_id: &MessageId,
703 topic_hash: &TopicHash,
704 reason: RejectReason,
705 ) {
706 match reason {
707 RejectReason::ValidationError(_) | RejectReason::SelfOrigin => {
709 self.reject_invalid_message(from, topic_hash);
710 return;
711 }
712 RejectReason::BlackListedPeer | RejectReason::BlackListedSource => {
714 return;
715 }
716 _ => {} }
718
719 let peers: Vec<_> = {
720 let record = self.deliveries.entry(msg_id.clone()).or_default();
721
722 if record.status != DeliveryStatus::Unknown {
725 return;
726 }
727
728 if let RejectReason::ValidationIgnored = reason {
729 record.status = DeliveryStatus::Ignored;
732 record.peers.clear();
733 return;
734 }
735
736 record.status = DeliveryStatus::Invalid;
738 record.peers.drain().collect()
740 };
741
742 self.mark_invalid_message_delivery(from, topic_hash);
743 for peer_id in peers.iter() {
744 self.mark_invalid_message_delivery(peer_id, topic_hash)
745 }
746 }
747
748 pub(crate) fn duplicated_message(
749 &mut self,
750 from: &PeerId,
751 msg_id: &MessageId,
752 topic_hash: &TopicHash,
753 ) {
754 let record = self.deliveries.entry(msg_id.clone()).or_default();
755
756 if record.peers.contains(from) {
757 return;
759 }
760
761 if let Some(callback) = self.message_delivery_time_callback {
762 let time = if let DeliveryStatus::Valid(validated) = record.status {
763 validated.elapsed().as_secs_f64()
764 } else {
765 0.0
766 };
767 if self
768 .peer_stats
769 .get(from)
770 .and_then(|s| s.topics.get(topic_hash))
771 .map(|ts| ts.in_mesh())
772 .unwrap_or(false)
773 {
774 callback(from, topic_hash, time);
775 }
776 }
777
778 match record.status {
779 DeliveryStatus::Unknown => {
780 record.peers.insert(*from);
783 }
784 DeliveryStatus::Valid(validated) => {
785 record.peers.insert(*from);
787 self.mark_duplicate_message_delivery(from, topic_hash, Some(validated));
788 }
789 DeliveryStatus::Invalid => {
790 self.mark_invalid_message_delivery(from, topic_hash);
792 }
793 DeliveryStatus::Ignored => {
794 }
796 }
797 }
798
799 pub(crate) fn set_application_score(&mut self, peer_id: &PeerId, new_score: f64) -> bool {
802 if let Some(peer_stats) = self.peer_stats.get_mut(peer_id) {
803 peer_stats.application_score = new_score;
804 true
805 } else {
806 false
807 }
808 }
809
810 pub(crate) fn set_topic_params(&mut self, topic_hash: TopicHash, params: TopicScoreParams) {
812 use hash_map::Entry::*;
813 match self.params.topics.entry(topic_hash.clone()) {
814 Occupied(mut entry) => {
815 let first_message_deliveries_cap = params.first_message_deliveries_cap;
816 let mesh_message_deliveries_cap = params.mesh_message_deliveries_cap;
817 let old_params = entry.insert(params);
818
819 if old_params.first_message_deliveries_cap > first_message_deliveries_cap {
820 for stats in &mut self.peer_stats.values_mut() {
821 if let Some(tstats) = stats.topics.get_mut(&topic_hash) {
822 if tstats.first_message_deliveries > first_message_deliveries_cap {
823 tstats.first_message_deliveries = first_message_deliveries_cap;
824 }
825 }
826 }
827 }
828
829 if old_params.mesh_message_deliveries_cap > mesh_message_deliveries_cap {
830 for stats in self.peer_stats.values_mut() {
831 if let Some(tstats) = stats.topics.get_mut(&topic_hash) {
832 if tstats.mesh_message_deliveries > mesh_message_deliveries_cap {
833 tstats.mesh_message_deliveries = mesh_message_deliveries_cap;
834 }
835 }
836 }
837 }
838 }
839 Vacant(entry) => {
840 entry.insert(params);
841 }
842 }
843 }
844
845 pub(crate) fn get_topic_params(&self, topic_hash: &TopicHash) -> Option<&TopicScoreParams> {
847 self.params.topics.get(topic_hash)
848 }
849
850 fn mark_invalid_message_delivery(&mut self, peer_id: &PeerId, topic_hash: &TopicHash) {
853 if let Some(peer_stats) = self.peer_stats.get_mut(peer_id) {
854 if let Some(topic_stats) =
855 peer_stats.stats_or_default_mut(topic_hash.clone(), &self.params)
856 {
857 tracing::debug!(
858 peer=%peer_id,
859 topic=%topic_hash,
860 "[Penalty] Peer delivered an invalid message in topic and gets penalized \
861 for it",
862 );
863 topic_stats.invalid_message_deliveries += 1f64;
864 }
865 }
866 }
867
868 fn mark_first_message_delivery(&mut self, peer_id: &PeerId, topic_hash: &TopicHash) {
872 if let Some(peer_stats) = self.peer_stats.get_mut(peer_id) {
873 if let Some(topic_stats) =
874 peer_stats.stats_or_default_mut(topic_hash.clone(), &self.params)
875 {
876 let cap = self
877 .params
878 .topics
879 .get(topic_hash)
880 .expect("Topic must exist if there are known topic_stats")
881 .first_message_deliveries_cap;
882 topic_stats.first_message_deliveries =
883 if topic_stats.first_message_deliveries + 1f64 > cap {
884 cap
885 } else {
886 topic_stats.first_message_deliveries + 1f64
887 };
888
889 if let MeshStatus::Active { .. } = topic_stats.mesh_status {
890 let cap = self
891 .params
892 .topics
893 .get(topic_hash)
894 .expect("Topic must exist if there are known topic_stats")
895 .mesh_message_deliveries_cap;
896
897 topic_stats.mesh_message_deliveries =
898 if topic_stats.mesh_message_deliveries + 1f64 > cap {
899 cap
900 } else {
901 topic_stats.mesh_message_deliveries + 1f64
902 };
903 }
904 }
905 }
906 }
907
908 fn mark_duplicate_message_delivery(
911 &mut self,
912 peer_id: &PeerId,
913 topic_hash: &TopicHash,
914 validated_time: Option<Instant>,
915 ) {
916 if let Some(peer_stats) = self.peer_stats.get_mut(peer_id) {
917 let now = if validated_time.is_some() {
918 Some(Instant::now())
919 } else {
920 None
921 };
922 if let Some(topic_stats) =
923 peer_stats.stats_or_default_mut(topic_hash.clone(), &self.params)
924 {
925 if let MeshStatus::Active { .. } = topic_stats.mesh_status {
926 let topic_params = self
927 .params
928 .topics
929 .get(topic_hash)
930 .expect("Topic must exist if there are known topic_stats");
931
932 let mut falls_in_mesh_deliver_window = true;
937 if let Some(validated_time) = validated_time {
938 if let Some(now) = &now {
939 let window_time = validated_time
941 .checked_add(topic_params.mesh_message_deliveries_window)
942 .unwrap_or(*now);
943 if now > &window_time {
944 falls_in_mesh_deliver_window = false;
945 }
946 }
947 }
948
949 if falls_in_mesh_deliver_window {
950 let cap = topic_params.mesh_message_deliveries_cap;
951 topic_stats.mesh_message_deliveries =
952 if topic_stats.mesh_message_deliveries + 1f64 > cap {
953 cap
954 } else {
955 topic_stats.mesh_message_deliveries + 1f64
956 };
957 }
958 }
959 }
960 }
961 }
962
963 pub(crate) fn mesh_message_deliveries(&self, peer: &PeerId, topic: &TopicHash) -> Option<f64> {
964 self.peer_stats
965 .get(peer)
966 .and_then(|s| s.topics.get(topic))
967 .map(|t| t.mesh_message_deliveries)
968 }
969}
970
/// Reason for which a message was rejected; determines how `PeerScore::reject_message`
/// treats the sender.
#[derive(Clone, Copy)]
pub(crate) enum RejectReason {
    /// The message failed with a validation error; the sender is penalized and the message
    /// is not tracked.
    ValidationError(ValidationError),
    /// Handled like `ValidationError`: the sender is penalized.
    SelfOrigin,
    /// No scoring effect in `reject_message`.
    BlackListedPeer,
    /// No scoring effect in `reject_message`.
    BlackListedSource,
    /// The delivery record is marked ignored; nobody is penalized.
    ValidationIgnored,
    /// The delivery record is marked invalid; the sender and all peers already recorded for
    /// the message are penalized.
    ValidationFailed,
}