1use std::{
24 collections::{hash_map, HashMap, HashSet},
25 net::IpAddr,
26 time::Duration,
27};
28
29use libp2p_identity::PeerId;
30use web_time::Instant;
31
32use crate::{
33 metrics::{Metrics, Penalty},
34 time_cache::TimeCache,
35 MessageId, TopicHash,
36};
37
38mod params;
39pub use params::{
40 score_parameter_decay, score_parameter_decay_with_base, PeerScoreParams, PeerScoreThresholds,
41 TopicScoreParams,
42};
43
44use crate::ValidationError;
45
46#[cfg(test)]
47mod tests;
48
/// Duration, in seconds, handed to the `TimeCache` that stores the message delivery records.
const TIME_CACHE_DURATION: u64 = 120;
51
/// Keeps the per-peer statistics and turns them into a single score per peer.
pub(crate) struct PeerScore {
    /// The parameters (weights, caps, thresholds, decays) used to compute and refresh scores.
    pub(crate) params: PeerScoreParams,
    /// The statistics recorded for every tracked peer.
    peer_stats: HashMap<PeerId, PeerStats>,
    /// For every IP address, the set of peers that were registered with it via `add_ip`.
    peer_ips: HashMap<IpAddr, HashSet<PeerId>>,
    /// Delivery records of recently seen messages, keyed by message id.
    deliveries: TimeCache<MessageId, DeliveryRecord>,
    /// Optional callback that is invoked with the sending peer, the topic and a delay in
    /// seconds whenever a peer that is in the mesh for that topic sends a message
    /// (`0.0` if the message has not been validated yet).
    message_delivery_time_callback: Option<fn(&PeerId, &TopicHash, f64)>,
}
64
/// General statistics recorded for a single peer.
struct PeerStats {
    /// Whether the peer is currently connected, or when its retained stats expire.
    status: ConnectionStatus,
    /// Statistics per topic.
    topics: HashMap<TopicHash, TopicStats>,
    /// IP addresses that were registered for this peer via `add_ip`.
    known_ips: HashSet<IpAddr>,
    /// Behaviour penalty counter; increased by `add_penalty` and decayed by `refresh_scores`.
    behaviour_penalty: f64,
    /// Score assigned by the application via `set_application_score`.
    application_score: f64,
    /// Slow-peer penalty counter; increased by `failed_message_slow_peer` and decayed by
    /// `refresh_scores`.
    slow_peer_penalty: f64,
}
80
/// Connection state of a tracked peer.
enum ConnectionStatus {
    /// The peer is connected.
    Connected,
    /// The peer is disconnected and its stats are only being retained.
    Disconnected {
        /// Point in time after which `refresh_scores` drops the retained stats.
        expire: Instant,
    },
}
90
91impl Default for PeerStats {
92 fn default() -> Self {
93 PeerStats {
94 status: ConnectionStatus::Connected,
95 topics: HashMap::new(),
96 known_ips: HashSet::new(),
97 behaviour_penalty: 0f64,
98 application_score: 0f64,
99 slow_peer_penalty: 0f64,
100 }
101 }
102}
103
104impl PeerStats {
105 pub(crate) fn stats_or_default_mut(
109 &mut self,
110 topic_hash: TopicHash,
111 params: &PeerScoreParams,
112 ) -> Option<&mut TopicStats> {
113 if params.topics.contains_key(&topic_hash) {
114 Some(self.topics.entry(topic_hash).or_default())
115 } else {
116 self.topics.get_mut(&topic_hash)
117 }
118 }
119}
120
/// Statistics recorded for a peer in a single topic.
struct TopicStats {
    /// Whether the peer is currently in the mesh for this topic.
    mesh_status: MeshStatus,
    /// Counter of first message deliveries; capped when incremented and decayed on refresh.
    first_message_deliveries: f64,
    /// Set by `refresh_scores` once the time in mesh exceeds the topic's
    /// `mesh_message_deliveries_activation`; gates the mesh delivery deficit penalty.
    mesh_message_deliveries_active: bool,
    /// Counter of message deliveries while in the mesh; capped when incremented and decayed
    /// on refresh.
    mesh_message_deliveries: f64,
    /// Accumulated squared delivery deficit, added when the peer is pruned or removed while
    /// below the delivery threshold.
    mesh_failure_penalty: f64,
    /// Counter of invalid message deliveries; decayed on refresh.
    invalid_message_deliveries: f64,
}
135
136impl TopicStats {
137 pub(crate) fn in_mesh(&self) -> bool {
139 matches!(self.mesh_status, MeshStatus::Active { .. })
140 }
141}
142
/// Mesh membership of a peer in a topic.
enum MeshStatus {
    /// The peer is in the mesh.
    Active {
        /// The point in time at which the peer was grafted.
        graft_time: Instant,
        /// Time spent in the mesh; recomputed from `graft_time` by `refresh_scores`.
        mesh_time: Duration,
    },
    /// The peer is not in the mesh.
    InActive,
}
153
154impl MeshStatus {
155 pub(crate) fn new_active() -> Self {
157 MeshStatus::Active {
158 graft_time: Instant::now(),
159 mesh_time: Duration::from_secs(0),
160 }
161 }
162}
163
164impl Default for TopicStats {
165 fn default() -> Self {
166 TopicStats {
167 mesh_status: MeshStatus::InActive,
168 first_message_deliveries: Default::default(),
169 mesh_message_deliveries_active: Default::default(),
170 mesh_message_deliveries: Default::default(),
171 mesh_failure_penalty: Default::default(),
172 invalid_message_deliveries: Default::default(),
173 }
174 }
175}
176
/// Bookkeeping for a single message id: its validation state and who sent it to us.
#[derive(PartialEq, Debug)]
struct DeliveryRecord {
    /// Current validation state of the message.
    status: DeliveryStatus,
    /// When the record was created.
    first_seen: Instant,
    /// Peers that sent us a duplicate of the message while it was unknown or valid.
    peers: HashSet<PeerId>,
}
183
/// Validation state of a message in a `DeliveryRecord`.
#[derive(PartialEq, Debug)]
enum DeliveryStatus {
    /// No verdict has been recorded yet (the state of a new record).
    Unknown,
    /// The message was delivered as valid at the contained instant.
    Valid(Instant),
    /// The message was rejected as invalid.
    Invalid,
    /// Validation was ignored; nobody is rewarded or penalized for the message.
    Ignored,
}
195
196impl Default for DeliveryRecord {
197 fn default() -> Self {
198 DeliveryRecord {
199 status: DeliveryStatus::Unknown,
200 first_seen: Instant::now(),
201 peers: HashSet::new(),
202 }
203 }
204}
205
206impl PeerScore {
207 #[allow(dead_code)]
209 pub(crate) fn new(params: PeerScoreParams) -> Self {
210 Self::new_with_message_delivery_time_callback(params, None)
211 }
212
213 pub(crate) fn new_with_message_delivery_time_callback(
214 params: PeerScoreParams,
215 callback: Option<fn(&PeerId, &TopicHash, f64)>,
216 ) -> Self {
217 PeerScore {
218 params,
219 peer_stats: HashMap::new(),
220 peer_ips: HashMap::new(),
221 deliveries: TimeCache::new(Duration::from_secs(TIME_CACHE_DURATION)),
222 message_delivery_time_callback: callback,
223 }
224 }
225
226 pub(crate) fn score(&self, peer_id: &PeerId) -> f64 {
228 self.metric_score(peer_id, None)
229 }
230
231 pub(crate) fn metric_score(&self, peer_id: &PeerId, mut metrics: Option<&mut Metrics>) -> f64 {
234 let Some(peer_stats) = self.peer_stats.get(peer_id) else {
235 return 0.0;
236 };
237 let mut score = 0.0;
238
239 for (topic, topic_stats) in peer_stats.topics.iter() {
241 if let Some(topic_params) = self.params.topics.get(topic) {
243 let mut topic_score = 0.0;
247
248 if let MeshStatus::Active { mesh_time, .. } = topic_stats.mesh_status {
250 let p1 = {
251 let v = mesh_time.as_secs_f64()
252 / topic_params.time_in_mesh_quantum.as_secs_f64();
253 if v < topic_params.time_in_mesh_cap {
254 v
255 } else {
256 topic_params.time_in_mesh_cap
257 }
258 };
259 topic_score += p1 * topic_params.time_in_mesh_weight;
260 }
261
262 let p2 = {
264 let v = topic_stats.first_message_deliveries;
265 if v < topic_params.first_message_deliveries_cap {
266 v
267 } else {
268 topic_params.first_message_deliveries_cap
269 }
270 };
271 topic_score += p2 * topic_params.first_message_deliveries_weight;
272
273 if topic_stats.mesh_message_deliveries_active
275 && topic_stats.mesh_message_deliveries
276 < topic_params.mesh_message_deliveries_threshold
277 {
278 let deficit = topic_params.mesh_message_deliveries_threshold
279 - topic_stats.mesh_message_deliveries;
280 let p3 = deficit * deficit;
281 topic_score += p3 * topic_params.mesh_message_deliveries_weight;
282 if let Some(metrics) = metrics.as_mut() {
283 metrics.register_score_penalty(Penalty::MessageDeficit);
284 }
285 tracing::debug!(
286 peer=%peer_id,
287 %topic,
288 %deficit,
289 penalty=%topic_score,
290 "[Penalty] The peer has a mesh deliveries deficit and will be penalized"
291 );
292 }
293
294 let p3b = topic_stats.mesh_failure_penalty;
298 topic_score += p3b * topic_params.mesh_failure_penalty_weight;
299
300 let p4 =
304 topic_stats.invalid_message_deliveries * topic_stats.invalid_message_deliveries;
305 topic_score += p4 * topic_params.invalid_message_deliveries_weight;
306
307 score += topic_score * topic_params.topic_weight;
309 }
310 }
311
312 if self.params.topic_score_cap > 0f64 && score > self.params.topic_score_cap {
314 score = self.params.topic_score_cap;
315 }
316
317 let p5 = peer_stats.application_score;
319 score += p5 * self.params.app_specific_weight;
320
321 for ip in peer_stats.known_ips.iter() {
323 if self.params.ip_colocation_factor_whitelist.contains(ip) {
324 continue;
325 }
326
327 if let Some(peers_in_ip) = self.peer_ips.get(ip).map(|peers| peers.len()) {
332 if (peers_in_ip as f64) > self.params.ip_colocation_factor_threshold {
333 let surplus = (peers_in_ip as f64) - self.params.ip_colocation_factor_threshold;
334 let p6 = surplus * surplus;
335 if let Some(metrics) = metrics.as_mut() {
336 metrics.register_score_penalty(Penalty::IPColocation);
337 }
338 tracing::debug!(
339 peer=%peer_id,
340 surplus_ip=%ip,
341 surplus=%surplus,
342 "[Penalty] The peer gets penalized because of too many peers with the same ip"
343 );
344 score += p6 * self.params.ip_colocation_factor_weight;
345 }
346 }
347 }
348
349 if peer_stats.behaviour_penalty > self.params.behaviour_penalty_threshold {
351 let excess = peer_stats.behaviour_penalty - self.params.behaviour_penalty_threshold;
352 let p7 = excess * excess;
353 score += p7 * self.params.behaviour_penalty_weight;
354 }
355
356 if peer_stats.slow_peer_penalty > self.params.slow_peer_threshold {
358 let excess = peer_stats.slow_peer_penalty - self.params.slow_peer_threshold;
359 score += excess * self.params.slow_peer_weight;
360 }
361
362 score
363 }
364
365 pub(crate) fn add_penalty(&mut self, peer_id: &PeerId, count: usize) {
366 if let Some(peer_stats) = self.peer_stats.get_mut(peer_id) {
367 tracing::debug!(
368 peer=%peer_id,
369 %count,
370 "[Penalty] Behavioral penalty for peer"
371 );
372 peer_stats.behaviour_penalty += count as f64;
373 }
374 }
375
376 fn remove_ips_for_peer(
377 peer_stats: &PeerStats,
378 peer_ips: &mut HashMap<IpAddr, HashSet<PeerId>>,
379 peer_id: &PeerId,
380 ) {
381 for ip in peer_stats.known_ips.iter() {
382 if let Some(peer_set) = peer_ips.get_mut(ip) {
383 peer_set.remove(peer_id);
384 }
385 }
386 }
387
388 pub(crate) fn refresh_scores(&mut self) {
389 let now = Instant::now();
390 let params_ref = &self.params;
391 let peer_ips_ref = &mut self.peer_ips;
392 self.peer_stats.retain(|peer_id, peer_stats| {
393 if let ConnectionStatus::Disconnected { expire } = peer_stats.status {
394 if now > expire {
396 Self::remove_ips_for_peer(peer_stats, peer_ips_ref, peer_id);
398 return false;
400 }
401
402 return true;
407 }
408
409 for (topic, topic_stats) in peer_stats.topics.iter_mut() {
410 if let Some(topic_params) = params_ref.topics.get(topic) {
412 topic_stats.first_message_deliveries *=
414 topic_params.first_message_deliveries_decay;
415 if topic_stats.first_message_deliveries < params_ref.decay_to_zero {
416 topic_stats.first_message_deliveries = 0.0;
417 }
418 topic_stats.mesh_message_deliveries *=
419 topic_params.mesh_message_deliveries_decay;
420 if topic_stats.mesh_message_deliveries < params_ref.decay_to_zero {
421 topic_stats.mesh_message_deliveries = 0.0;
422 }
423 topic_stats.mesh_failure_penalty *= topic_params.mesh_failure_penalty_decay;
424 if topic_stats.mesh_failure_penalty < params_ref.decay_to_zero {
425 topic_stats.mesh_failure_penalty = 0.0;
426 }
427 topic_stats.invalid_message_deliveries *=
428 topic_params.invalid_message_deliveries_decay;
429 if topic_stats.invalid_message_deliveries < params_ref.decay_to_zero {
430 topic_stats.invalid_message_deliveries = 0.0;
431 }
432 if let MeshStatus::Active {
434 ref mut mesh_time,
435 ref mut graft_time,
436 } = topic_stats.mesh_status
437 {
438 *mesh_time = now.duration_since(*graft_time);
439 if *mesh_time > topic_params.mesh_message_deliveries_activation {
440 topic_stats.mesh_message_deliveries_active = true;
441 }
442 }
443 }
444 }
445
446 peer_stats.behaviour_penalty *= params_ref.behaviour_penalty_decay;
448 if peer_stats.behaviour_penalty < params_ref.decay_to_zero {
449 peer_stats.behaviour_penalty = 0.0;
450 }
451
452 peer_stats.slow_peer_penalty *= params_ref.slow_peer_decay;
454 if peer_stats.slow_peer_penalty < params_ref.decay_to_zero {
455 peer_stats.slow_peer_penalty = 0.0;
456 }
457
458 true
459 });
460 }
461
462 pub(crate) fn add_peer(&mut self, peer_id: PeerId) {
465 let peer_stats = self.peer_stats.entry(peer_id).or_default();
466
467 peer_stats.status = ConnectionStatus::Connected;
469 }
470
471 pub(crate) fn add_ip(&mut self, peer_id: &PeerId, ip: IpAddr) {
473 tracing::trace!(peer=%peer_id, %ip, "Add ip for peer");
474 let peer_stats = self.peer_stats.entry(*peer_id).or_default();
475
476 peer_stats.status = ConnectionStatus::Connected;
479
480 peer_stats.known_ips.insert(ip);
482 self.peer_ips.entry(ip).or_default().insert(*peer_id);
483 }
484
485 pub(crate) fn failed_message_slow_peer(&mut self, peer_id: &PeerId) {
487 if let Some(peer_stats) = self.peer_stats.get_mut(peer_id) {
488 peer_stats.slow_peer_penalty += 1.0;
489 tracing::debug!(peer=%peer_id, %peer_stats.slow_peer_penalty, "[Penalty] Expired message penalty.");
490 }
491 }
492
493 pub(crate) fn remove_ip(&mut self, peer_id: &PeerId, ip: &IpAddr) {
495 if let Some(peer_stats) = self.peer_stats.get_mut(peer_id) {
496 peer_stats.known_ips.remove(ip);
497 if let Some(peer_ids) = self.peer_ips.get_mut(ip) {
498 tracing::trace!(peer=%peer_id, %ip, "Remove ip for peer");
499 peer_ids.remove(peer_id);
500 } else {
501 tracing::trace!(
502 peer=%peer_id,
503 %ip,
504 "No entry in peer_ips for ip which should get removed for peer"
505 );
506 }
507 } else {
508 tracing::trace!(
509 peer=%peer_id,
510 %ip,
511 "No peer_stats for peer which should remove the ip"
512 );
513 }
514 }
515
516 pub(crate) fn remove_peer(&mut self, peer_id: &PeerId) {
519 if self.score(peer_id) > 0f64 {
521 if let hash_map::Entry::Occupied(entry) = self.peer_stats.entry(*peer_id) {
522 Self::remove_ips_for_peer(entry.get(), &mut self.peer_ips, peer_id);
523 entry.remove();
524 }
525 return;
526 }
527
528 if let Some(peer_stats) = self.peer_stats.get_mut(peer_id) {
531 for (topic, topic_stats) in peer_stats.topics.iter_mut() {
532 topic_stats.first_message_deliveries = 0f64;
533
534 if let Some(threshold) = self
535 .params
536 .topics
537 .get(topic)
538 .map(|param| param.mesh_message_deliveries_threshold)
539 {
540 if topic_stats.in_mesh()
541 && topic_stats.mesh_message_deliveries_active
542 && topic_stats.mesh_message_deliveries < threshold
543 {
544 let deficit = threshold - topic_stats.mesh_message_deliveries;
545 topic_stats.mesh_failure_penalty += deficit * deficit;
546 }
547 }
548
549 topic_stats.mesh_status = MeshStatus::InActive;
550 topic_stats.mesh_message_deliveries_active = false;
551 }
552
553 peer_stats.status = ConnectionStatus::Disconnected {
554 expire: Instant::now() + self.params.retain_score,
555 };
556 }
557 }
558
559 pub(crate) fn graft(&mut self, peer_id: &PeerId, topic: impl Into<TopicHash>) {
561 let topic = topic.into();
562 if let Some(peer_stats) = self.peer_stats.get_mut(peer_id) {
563 if let Some(topic_stats) = peer_stats.stats_or_default_mut(topic, &self.params) {
565 topic_stats.mesh_status = MeshStatus::new_active();
566 topic_stats.mesh_message_deliveries_active = false;
567 }
568 }
569 }
570
571 pub(crate) fn prune(&mut self, peer_id: &PeerId, topic: TopicHash) {
573 if let Some(peer_stats) = self.peer_stats.get_mut(peer_id) {
574 if let Some(topic_stats) = peer_stats.stats_or_default_mut(topic.clone(), &self.params)
576 {
577 let threshold = self
579 .params
580 .topics
581 .get(&topic)
582 .expect("Topic must exist in order for there to be topic stats")
583 .mesh_message_deliveries_threshold;
584 if topic_stats.mesh_message_deliveries_active
585 && topic_stats.mesh_message_deliveries < threshold
586 {
587 let deficit = threshold - topic_stats.mesh_message_deliveries;
588 topic_stats.mesh_failure_penalty += deficit * deficit;
589 }
590 topic_stats.mesh_message_deliveries_active = false;
591 topic_stats.mesh_status = MeshStatus::InActive;
592 }
593 }
594 }
595
596 pub(crate) fn validate_message(
597 &mut self,
598 from: &PeerId,
599 msg_id: &MessageId,
600 topic_hash: &TopicHash,
601 ) {
602 self.deliveries.entry(msg_id.clone()).or_default();
604
605 if let Some(callback) = self.message_delivery_time_callback {
606 if self
607 .peer_stats
608 .get(from)
609 .and_then(|s| s.topics.get(topic_hash))
610 .map(|ts| ts.in_mesh())
611 .unwrap_or(false)
612 {
613 callback(from, topic_hash, 0.0);
614 }
615 }
616 }
617
618 pub(crate) fn deliver_message(
619 &mut self,
620 from: &PeerId,
621 msg_id: &MessageId,
622 topic_hash: &TopicHash,
623 ) {
624 self.mark_first_message_delivery(from, topic_hash);
625
626 let record = self.deliveries.entry(msg_id.clone()).or_default();
627
628 if record.status != DeliveryStatus::Unknown {
630 tracing::warn!(
631 peer=%from,
632 status=?record.status,
633 first_seen=?record.first_seen.elapsed().as_secs(),
634 "Unexpected delivery trace"
635 );
636 return;
637 }
638
639 record.status = DeliveryStatus::Valid(Instant::now());
641 for peer in record.peers.iter().cloned().collect::<Vec<_>>() {
642 if &peer != from {
645 self.mark_duplicate_message_delivery(&peer, topic_hash, None);
646 }
647 }
648 }
649
650 pub(crate) fn reject_invalid_message(&mut self, from: &PeerId, topic_hash: &TopicHash) {
653 tracing::debug!(
654 peer=%from,
655 "[Penalty] Message from peer rejected because of ValidationError or SelfOrigin"
656 );
657
658 self.mark_invalid_message_delivery(from, topic_hash);
659 }
660
661 pub(crate) fn reject_message(
663 &mut self,
664 from: &PeerId,
665 msg_id: &MessageId,
666 topic_hash: &TopicHash,
667 reason: RejectReason,
668 ) {
669 match reason {
670 RejectReason::ValidationError(_) | RejectReason::SelfOrigin => {
672 self.reject_invalid_message(from, topic_hash);
673 return;
674 }
675 RejectReason::BlackListedPeer | RejectReason::BlackListedSource => {
677 return;
678 }
679 _ => {} }
681
682 let peers: Vec<_> = {
683 let record = self.deliveries.entry(msg_id.clone()).or_default();
684
685 if record.status != DeliveryStatus::Unknown {
688 return;
689 }
690
691 if let RejectReason::ValidationIgnored = reason {
692 record.status = DeliveryStatus::Ignored;
695 record.peers.clear();
696 return;
697 }
698
699 record.status = DeliveryStatus::Invalid;
701 record.peers.drain().collect()
703 };
704
705 self.mark_invalid_message_delivery(from, topic_hash);
706 for peer_id in peers.iter() {
707 self.mark_invalid_message_delivery(peer_id, topic_hash)
708 }
709 }
710
711 pub(crate) fn duplicated_message(
712 &mut self,
713 from: &PeerId,
714 msg_id: &MessageId,
715 topic_hash: &TopicHash,
716 ) {
717 let record = self.deliveries.entry(msg_id.clone()).or_default();
718
719 if record.peers.contains(from) {
720 return;
722 }
723
724 if let Some(callback) = self.message_delivery_time_callback {
725 let time = if let DeliveryStatus::Valid(validated) = record.status {
726 validated.elapsed().as_secs_f64()
727 } else {
728 0.0
729 };
730 if self
731 .peer_stats
732 .get(from)
733 .and_then(|s| s.topics.get(topic_hash))
734 .map(|ts| ts.in_mesh())
735 .unwrap_or(false)
736 {
737 callback(from, topic_hash, time);
738 }
739 }
740
741 match record.status {
742 DeliveryStatus::Unknown => {
743 record.peers.insert(*from);
746 }
747 DeliveryStatus::Valid(validated) => {
748 record.peers.insert(*from);
750 self.mark_duplicate_message_delivery(from, topic_hash, Some(validated));
751 }
752 DeliveryStatus::Invalid => {
753 self.mark_invalid_message_delivery(from, topic_hash);
755 }
756 DeliveryStatus::Ignored => {
757 }
759 }
760 }
761
762 pub(crate) fn set_application_score(&mut self, peer_id: &PeerId, new_score: f64) -> bool {
765 if let Some(peer_stats) = self.peer_stats.get_mut(peer_id) {
766 peer_stats.application_score = new_score;
767 true
768 } else {
769 false
770 }
771 }
772
773 pub(crate) fn set_topic_params(&mut self, topic_hash: TopicHash, params: TopicScoreParams) {
775 use hash_map::Entry::*;
776 match self.params.topics.entry(topic_hash.clone()) {
777 Occupied(mut entry) => {
778 let first_message_deliveries_cap = params.first_message_deliveries_cap;
779 let mesh_message_deliveries_cap = params.mesh_message_deliveries_cap;
780 let old_params = entry.insert(params);
781
782 if old_params.first_message_deliveries_cap > first_message_deliveries_cap {
783 for stats in &mut self.peer_stats.values_mut() {
784 if let Some(tstats) = stats.topics.get_mut(&topic_hash) {
785 if tstats.first_message_deliveries > first_message_deliveries_cap {
786 tstats.first_message_deliveries = first_message_deliveries_cap;
787 }
788 }
789 }
790 }
791
792 if old_params.mesh_message_deliveries_cap > mesh_message_deliveries_cap {
793 for stats in self.peer_stats.values_mut() {
794 if let Some(tstats) = stats.topics.get_mut(&topic_hash) {
795 if tstats.mesh_message_deliveries > mesh_message_deliveries_cap {
796 tstats.mesh_message_deliveries = mesh_message_deliveries_cap;
797 }
798 }
799 }
800 }
801 }
802 Vacant(entry) => {
803 entry.insert(params);
804 }
805 }
806 }
807
808 pub(crate) fn get_topic_params(&self, topic_hash: &TopicHash) -> Option<&TopicScoreParams> {
810 self.params.topics.get(topic_hash)
811 }
812
813 fn mark_invalid_message_delivery(&mut self, peer_id: &PeerId, topic_hash: &TopicHash) {
816 if let Some(peer_stats) = self.peer_stats.get_mut(peer_id) {
817 if let Some(topic_stats) =
818 peer_stats.stats_or_default_mut(topic_hash.clone(), &self.params)
819 {
820 tracing::debug!(
821 peer=%peer_id,
822 topic=%topic_hash,
823 "[Penalty] Peer delivered an invalid message in topic and gets penalized \
824 for it",
825 );
826 topic_stats.invalid_message_deliveries += 1f64;
827 }
828 }
829 }
830
831 fn mark_first_message_delivery(&mut self, peer_id: &PeerId, topic_hash: &TopicHash) {
835 if let Some(peer_stats) = self.peer_stats.get_mut(peer_id) {
836 if let Some(topic_stats) =
837 peer_stats.stats_or_default_mut(topic_hash.clone(), &self.params)
838 {
839 let cap = self
840 .params
841 .topics
842 .get(topic_hash)
843 .expect("Topic must exist if there are known topic_stats")
844 .first_message_deliveries_cap;
845 topic_stats.first_message_deliveries =
846 if topic_stats.first_message_deliveries + 1f64 > cap {
847 cap
848 } else {
849 topic_stats.first_message_deliveries + 1f64
850 };
851
852 if let MeshStatus::Active { .. } = topic_stats.mesh_status {
853 let cap = self
854 .params
855 .topics
856 .get(topic_hash)
857 .expect("Topic must exist if there are known topic_stats")
858 .mesh_message_deliveries_cap;
859
860 topic_stats.mesh_message_deliveries =
861 if topic_stats.mesh_message_deliveries + 1f64 > cap {
862 cap
863 } else {
864 topic_stats.mesh_message_deliveries + 1f64
865 };
866 }
867 }
868 }
869 }
870
871 fn mark_duplicate_message_delivery(
874 &mut self,
875 peer_id: &PeerId,
876 topic_hash: &TopicHash,
877 validated_time: Option<Instant>,
878 ) {
879 if let Some(peer_stats) = self.peer_stats.get_mut(peer_id) {
880 let now = if validated_time.is_some() {
881 Some(Instant::now())
882 } else {
883 None
884 };
885 if let Some(topic_stats) =
886 peer_stats.stats_or_default_mut(topic_hash.clone(), &self.params)
887 {
888 if let MeshStatus::Active { .. } = topic_stats.mesh_status {
889 let topic_params = self
890 .params
891 .topics
892 .get(topic_hash)
893 .expect("Topic must exist if there are known topic_stats");
894
895 let mut falls_in_mesh_deliver_window = true;
900 if let Some(validated_time) = validated_time {
901 if let Some(now) = &now {
902 let window_time = validated_time
904 .checked_add(topic_params.mesh_message_deliveries_window)
905 .unwrap_or(*now);
906 if now > &window_time {
907 falls_in_mesh_deliver_window = false;
908 }
909 }
910 }
911
912 if falls_in_mesh_deliver_window {
913 let cap = topic_params.mesh_message_deliveries_cap;
914 topic_stats.mesh_message_deliveries =
915 if topic_stats.mesh_message_deliveries + 1f64 > cap {
916 cap
917 } else {
918 topic_stats.mesh_message_deliveries + 1f64
919 };
920 }
921 }
922 }
923 }
924 }
925
926 pub(crate) fn mesh_message_deliveries(&self, peer: &PeerId, topic: &TopicHash) -> Option<f64> {
927 self.peer_stats
928 .get(peer)
929 .and_then(|s| s.topics.get(topic))
930 .map(|t| t.mesh_message_deliveries)
931 }
932}
933
/// The reason a message was rejected, which determines how `PeerScore::reject_message`
/// scores the peers involved.
#[derive(Clone, Copy)]
pub(crate) enum RejectReason {
    /// The message failed with the contained validation error; the sender is penalized.
    ValidationError(ValidationError),
    /// The message originated from ourselves; the sender is penalized.
    SelfOrigin,
    /// The sending peer is blacklisted; the rejection is not scored.
    BlackListedPeer,
    /// The source of the message is blacklisted; the rejection is not scored.
    BlackListedSource,
    /// Validation was ignored; the delivery record is marked as ignored without a penalty.
    ValidationIgnored,
    /// Validation failed; the sender and all peers recorded as senders of the message are
    /// penalized.
    ValidationFailed,
}