1use std::{
24    collections::{hash_map, HashMap, HashSet},
25    net::IpAddr,
26    time::Duration,
27};
28
29use futures_timer::Delay;
30use libp2p_identity::PeerId;
31use web_time::Instant;
32
33use crate::{time_cache::TimeCache, MessageId, TopicHash};
34
35mod params;
36pub use params::{
37    score_parameter_decay, score_parameter_decay_with_base, PeerScoreParams, PeerScoreThresholds,
38    TopicScoreParams,
39};
40
41use crate::ValidationError;
42
43#[cfg(test)]
44mod tests;
45
/// Duration, in seconds, passed to `TimeCache::new` when the per-message delivery record
/// cache (`PeerScore::deliveries`) is created.
const TIME_CACHE_DURATION: u64 = 120;
48
/// Whether peer scoring is in use.
///
/// `Active` carries the boxed [`PeerScore`]; `Disabled` carries nothing, and
/// `below_threshold` then always answers `(false, 0.0)`.
pub(crate) enum PeerScoreState {
    /// Scoring is enabled and backed by the contained [`PeerScore`].
    Active(Box<PeerScore>),
    /// Scoring is turned off.
    Disabled,
}
55
56impl PeerScoreState {
57    pub(crate) fn below_threshold(
60        &self,
61        peer_id: &PeerId,
62        threshold: impl Fn(&PeerScoreThresholds) -> f64,
63    ) -> (bool, f64) {
64        match self {
65            PeerScoreState::Active(active) => {
66                let score = active.score_report(peer_id).score;
67                (score < threshold(&active.thresholds), score)
68            }
69            PeerScoreState::Disabled => (false, 0.0),
70        }
71    }
72}
73
/// Result of scoring a single peer, as produced by `PeerScore::score_report`.
#[derive(Default)]
pub(crate) struct PeerScoreReport {
    /// The peer's total score; `0.0` by default (also what an unknown peer gets).
    pub(crate) score: f64,
    /// Penalties that contributed to the score. Only present with the `metrics` feature.
    #[cfg(feature = "metrics")]
    pub(crate) penalties: Vec<crate::metrics::Penalty>,
}
82
/// Scoring state for all peers known to this node.
pub(crate) struct PeerScore {
    /// Scoring parameters, including the per-topic parameters in `params.topics`.
    pub(crate) params: PeerScoreParams,
    /// Thresholds that callers compare scores against (see
    /// `PeerScoreState::below_threshold`).
    pub(crate) thresholds: PeerScoreThresholds,
    /// Timer created from `params.decay_interval`; it is not polled in this file.
    // NOTE(review): presumably the owner polls this and calls `refresh_scores` — confirm.
    pub(crate) decay_interval: Delay,
    /// Per-peer statistics.
    peer_stats: HashMap<PeerId, PeerStats>,
    /// For every IP, the peers that were registered with it through `add_ip`.
    peer_ips: HashMap<IpAddr, HashSet<PeerId>>,
    /// Delivery records per message id, kept in a time-bounded cache.
    deliveries: TimeCache<MessageId, DeliveryRecord>,
    /// Optional callback `(peer, topic, seconds)` invoked from `validate_message` (with
    /// `0.0`) and `duplicated_message` when the sending peer is in the mesh for the topic.
    message_delivery_time_callback: Option<fn(&PeerId, &TopicHash, f64)>,
}
99
/// Everything tracked about one peer for scoring purposes.
struct PeerStats {
    /// Whether the peer is connected, or disconnected with a retention deadline.
    status: ConnectionStatus,
    /// Per-topic statistics.
    topics: HashMap<TopicHash, TopicStats>,
    /// IP addresses registered for this peer via `add_ip`.
    known_ips: HashSet<IpAddr>,
    /// Behaviour penalty counter; raised by `add_penalty`, decayed by `refresh_scores`.
    behaviour_penalty: f64,
    /// Application-provided score, set through `set_application_score`.
    application_score: f64,
    /// Slow-peer counter; raised by `failed_message_slow_peer`, decayed by `refresh_scores`.
    slow_peer_penalty: f64,
}
115
/// Connection state of a peer as seen by the scoring logic.
enum ConnectionStatus {
    /// The peer is currently connected.
    Connected,
    /// The peer has disconnected; its stats are kept until `expire`.
    Disconnected {
        /// Instant after which `refresh_scores` drops the peer's stats.
        expire: Instant,
    },
}
125
126impl Default for PeerStats {
127    fn default() -> Self {
128        PeerStats {
129            status: ConnectionStatus::Connected,
130            topics: HashMap::new(),
131            known_ips: HashSet::new(),
132            behaviour_penalty: 0f64,
133            application_score: 0f64,
134            slow_peer_penalty: 0f64,
135        }
136    }
137}
138
139impl PeerStats {
140    pub(crate) fn stats_or_default_mut(
144        &mut self,
145        topic_hash: TopicHash,
146        params: &PeerScoreParams,
147    ) -> Option<&mut TopicStats> {
148        #[allow(
149            clippy::map_entry,
150            reason = "False positive, see rust-lang/rust-clippy#14449."
151        )]
152        if params.topics.contains_key(&topic_hash) {
153            Some(self.topics.entry(topic_hash).or_default())
154        } else {
155            self.topics.get_mut(&topic_hash)
156        }
157    }
158}
159
/// Per-topic statistics kept for a peer.
struct TopicStats {
    /// Whether the peer is in the mesh for this topic, and since when.
    mesh_status: MeshStatus,
    /// Counter of first deliveries; bumped (up to a cap) by `mark_first_message_delivery`.
    first_message_deliveries: f64,
    /// Set by `refresh_scores` once the time in mesh exceeds the topic's
    /// `mesh_message_deliveries_activation`; reset on graft, prune and peer removal.
    mesh_message_deliveries_active: bool,
    /// Counter of deliveries made while in the mesh (capped).
    mesh_message_deliveries: f64,
    /// Accumulated squared delivery deficit, added on prune / peer removal.
    mesh_failure_penalty: f64,
    /// Counter of invalid messages; bumped by `mark_invalid_message_delivery`.
    invalid_message_deliveries: f64,
}
174
175impl TopicStats {
176    pub(crate) fn in_mesh(&self) -> bool {
178        matches!(self.mesh_status, MeshStatus::Active { .. })
179    }
180}
181
/// Mesh membership of a peer within a single topic.
enum MeshStatus {
    /// The peer is in the mesh.
    Active {
        /// When the peer was grafted.
        graft_time: Instant,
        /// Time spent in the mesh; recomputed from `graft_time` by `refresh_scores`.
        mesh_time: Duration,
    },
    /// The peer is not in the mesh.
    InActive,
}

impl MeshStatus {
    /// Builds an `Active` status grafted right now, with no accumulated mesh time.
    pub(crate) fn new_active() -> Self {
        let graft_time = Instant::now();
        Self::Active {
            graft_time,
            mesh_time: Duration::ZERO,
        }
    }
}
202
203impl Default for TopicStats {
204    fn default() -> Self {
205        TopicStats {
206            mesh_status: MeshStatus::InActive,
207            first_message_deliveries: Default::default(),
208            mesh_message_deliveries_active: Default::default(),
209            mesh_message_deliveries: Default::default(),
210            mesh_failure_penalty: Default::default(),
211            invalid_message_deliveries: Default::default(),
212        }
213    }
214}
215
/// What is known about the delivery of one message (keyed by message id in
/// `PeerScore::deliveries`).
#[derive(PartialEq, Debug)]
struct DeliveryRecord {
    /// Validation outcome recorded for the message so far.
    status: DeliveryStatus,
    /// When the record was created.
    first_seen: Instant,
    /// Peers recorded by `duplicated_message` as having sent this message as well.
    peers: HashSet<PeerId>,
}
222
/// Validation state of a message inside a [`DeliveryRecord`].
#[derive(PartialEq, Debug)]
enum DeliveryStatus {
    /// No outcome recorded yet (the initial state).
    Unknown,
    /// Delivered as valid; carries the instant `deliver_message` recorded it.
    Valid(Instant),
    /// Rejected as invalid (set by `reject_message`).
    Invalid,
    /// Validation was ignored (set by `reject_message` for `ValidationIgnored`).
    Ignored,
}
234
235impl Default for DeliveryRecord {
236    fn default() -> Self {
237        DeliveryRecord {
238            status: DeliveryStatus::Unknown,
239            first_seen: Instant::now(),
240            peers: HashSet::new(),
241        }
242    }
243}
244
245impl PeerScore {
    /// Creates a new [`PeerScore`] from `params` and `thresholds`, without a message
    /// delivery time callback.
    #[allow(dead_code)]
    pub(crate) fn new(params: PeerScoreParams, thresholds: PeerScoreThresholds) -> Self {
        Self::new_with_message_delivery_time_callback(params, thresholds, None)
    }
251
252    pub(crate) fn new_with_message_delivery_time_callback(
253        params: PeerScoreParams,
254        thresholds: PeerScoreThresholds,
255        callback: Option<fn(&PeerId, &TopicHash, f64)>,
256    ) -> Self {
257        PeerScore {
258            decay_interval: Delay::new(params.decay_interval),
259            params,
260            thresholds,
261            peer_stats: HashMap::new(),
262            peer_ips: HashMap::new(),
263            deliveries: TimeCache::new(Duration::from_secs(TIME_CACHE_DURATION)),
264            message_delivery_time_callback: callback,
265        }
266    }
267
    /// Computes the score of `peer_id` together with (under the `metrics` feature) the
    /// penalties that were applied.
    ///
    /// An unknown peer yields the default report (score `0.0`). Otherwise the score is the
    /// sum of the weighted per-topic components (optionally capped by
    /// `params.topic_score_cap`), the application score, the IP colocation component, the
    /// behaviour penalty component and the slow peer component.
    pub(crate) fn score_report(&self, peer_id: &PeerId) -> PeerScoreReport {
        let mut report = PeerScoreReport::default();
        let Some(peer_stats) = self.peer_stats.get(peer_id) else {
            return report;
        };

        // Topic components; topics without score parameters contribute nothing.
        for (topic, topic_stats) in peer_stats.topics.iter() {
            if let Some(topic_params) = self.params.topics.get(topic) {
                let mut topic_score = 0.0;

                // P1: time in mesh, measured in quanta and capped. Only counted while the
                // peer is in the mesh.
                if let MeshStatus::Active { mesh_time, .. } = topic_stats.mesh_status {
                    let p1 = {
                        let v = mesh_time.as_secs_f64()
                            / topic_params.time_in_mesh_quantum.as_secs_f64();
                        if v < topic_params.time_in_mesh_cap {
                            v
                        } else {
                            topic_params.time_in_mesh_cap
                        }
                    };
                    topic_score += p1 * topic_params.time_in_mesh_weight;
                }

                // P2: first message deliveries, capped.
                let p2 = {
                    let v = topic_stats.first_message_deliveries;
                    if v < topic_params.first_message_deliveries_cap {
                        v
                    } else {
                        topic_params.first_message_deliveries_cap
                    }
                };
                topic_score += p2 * topic_params.first_message_deliveries_weight;

                // P3: squared mesh delivery deficit. Applied only once delivery tracking is
                // active, the counter is below the threshold, and the weight is non-zero.
                if topic_stats.mesh_message_deliveries_active
                    && topic_stats.mesh_message_deliveries
                        < topic_params.mesh_message_deliveries_threshold
                    && topic_params.mesh_message_deliveries_weight != 0.0
                {
                    let deficit = topic_params.mesh_message_deliveries_threshold
                        - topic_stats.mesh_message_deliveries;
                    let p3 = deficit * deficit;
                    let penalty = p3 * topic_params.mesh_message_deliveries_weight;

                    topic_score += penalty;
                    #[cfg(feature = "metrics")]
                    report
                        .penalties
                        .push(crate::metrics::Penalty::MessageDeficit);
                    tracing::debug!(
                        peer=%peer_id,
                        %topic,
                        %deficit,
                        penalty=%penalty,
                        "[Penalty] The peer has a mesh deliveries deficit and will be penalized"
                    );
                }

                // P3b: accumulated mesh failure penalty (added on prune / peer removal).
                let p3b = topic_stats.mesh_failure_penalty;
                topic_score += p3b * topic_params.mesh_failure_penalty_weight;

                // P4: squared count of invalid message deliveries.
                let p4 =
                    topic_stats.invalid_message_deliveries * topic_stats.invalid_message_deliveries;
                topic_score += p4 * topic_params.invalid_message_deliveries_weight;

                // Scale the topic's contribution by the topic weight.
                report.score += topic_score * topic_params.topic_weight;
            }
        }

        // Cap the combined topic score; a cap of zero or less disables capping.
        if self.params.topic_score_cap > 0f64 && report.score > self.params.topic_score_cap {
            report.score = self.params.topic_score_cap;
        }

        // P5: application-specific score.
        let p5 = peer_stats.application_score;
        report.score += p5 * self.params.app_specific_weight;

        // P6: IP colocation. Whitelisted IPs are skipped; for the others the squared
        // surplus of peers sharing the IP over the threshold is applied.
        for ip in peer_stats.known_ips.iter() {
            if self.params.ip_colocation_factor_whitelist.contains(ip) {
                continue;
            }

            if let Some(peers_in_ip) = self.peer_ips.get(ip).map(|peers| peers.len()) {
                if (peers_in_ip as f64) > self.params.ip_colocation_factor_threshold
                    && self.params.ip_colocation_factor_weight != 0.0
                {
                    let surplus = (peers_in_ip as f64) - self.params.ip_colocation_factor_threshold;
                    let p6 = surplus * surplus;
                    #[cfg(feature = "metrics")]
                    report.penalties.push(crate::metrics::Penalty::IPColocation);
                    tracing::debug!(
                        peer=%peer_id,
                        surplus_ip=%ip,
                        surplus=%surplus,
                        "[Penalty] The peer gets penalized because of too many peers with the same ip"
                    );
                    report.score += p6 * self.params.ip_colocation_factor_weight;
                }
            }
        }

        // P7: squared excess of the behaviour penalty over its threshold.
        if peer_stats.behaviour_penalty > self.params.behaviour_penalty_threshold {
            let excess = peer_stats.behaviour_penalty - self.params.behaviour_penalty_threshold;
            let p7 = excess * excess;
            report.score += p7 * self.params.behaviour_penalty_weight;
        }

        // Slow peer component: linear (not squared) in the excess over the threshold.
        if peer_stats.slow_peer_penalty > self.params.slow_peer_threshold {
            let excess = peer_stats.slow_peer_penalty - self.params.slow_peer_threshold;
            report.score += excess * self.params.slow_peer_weight;
        }

        report
    }
406
407    pub(crate) fn add_penalty(&mut self, peer_id: &PeerId, count: usize) {
408        if let Some(peer_stats) = self.peer_stats.get_mut(peer_id) {
409            tracing::debug!(
410                peer=%peer_id,
411                %count,
412                "[Penalty] Behavioral penalty for peer"
413            );
414            peer_stats.behaviour_penalty += count as f64;
415        }
416    }
417
418    fn remove_ips_for_peer(
419        peer_stats: &PeerStats,
420        peer_ips: &mut HashMap<IpAddr, HashSet<PeerId>>,
421        peer_id: &PeerId,
422    ) {
423        for ip in peer_stats.known_ips.iter() {
424            if let Some(peer_set) = peer_ips.get_mut(ip) {
425                peer_set.remove(peer_id);
426            }
427        }
428    }
429
430    pub(crate) fn refresh_scores(&mut self) {
431        let now = Instant::now();
432        let params_ref = &self.params;
433        let peer_ips_ref = &mut self.peer_ips;
434        self.peer_stats.retain(|peer_id, peer_stats| {
435            if let ConnectionStatus::Disconnected { expire } = peer_stats.status {
436                if now > expire {
438                    Self::remove_ips_for_peer(peer_stats, peer_ips_ref, peer_id);
440                    return false;
442                }
443
444                return true;
449            }
450
451            for (topic, topic_stats) in peer_stats.topics.iter_mut() {
452                if let Some(topic_params) = params_ref.topics.get(topic) {
454                    topic_stats.first_message_deliveries *=
456                        topic_params.first_message_deliveries_decay;
457                    if topic_stats.first_message_deliveries < params_ref.decay_to_zero {
458                        topic_stats.first_message_deliveries = 0.0;
459                    }
460                    topic_stats.mesh_message_deliveries *=
461                        topic_params.mesh_message_deliveries_decay;
462                    if topic_stats.mesh_message_deliveries < params_ref.decay_to_zero {
463                        topic_stats.mesh_message_deliveries = 0.0;
464                    }
465                    topic_stats.mesh_failure_penalty *= topic_params.mesh_failure_penalty_decay;
466                    if topic_stats.mesh_failure_penalty < params_ref.decay_to_zero {
467                        topic_stats.mesh_failure_penalty = 0.0;
468                    }
469                    topic_stats.invalid_message_deliveries *=
470                        topic_params.invalid_message_deliveries_decay;
471                    if topic_stats.invalid_message_deliveries < params_ref.decay_to_zero {
472                        topic_stats.invalid_message_deliveries = 0.0;
473                    }
474                    if let MeshStatus::Active {
476                        ref mut mesh_time,
477                        ref mut graft_time,
478                    } = topic_stats.mesh_status
479                    {
480                        *mesh_time = now.duration_since(*graft_time);
481                        if *mesh_time > topic_params.mesh_message_deliveries_activation {
482                            topic_stats.mesh_message_deliveries_active = true;
483                        }
484                    }
485                }
486            }
487
488            peer_stats.behaviour_penalty *= params_ref.behaviour_penalty_decay;
490            if peer_stats.behaviour_penalty < params_ref.decay_to_zero {
491                peer_stats.behaviour_penalty = 0.0;
492            }
493
494            peer_stats.slow_peer_penalty *= params_ref.slow_peer_decay;
496            if peer_stats.slow_peer_penalty < params_ref.decay_to_zero {
497                peer_stats.slow_peer_penalty = 0.0;
498            }
499
500            true
501        });
502    }
503
504    pub(crate) fn add_peer(&mut self, peer_id: PeerId) {
507        let peer_stats = self.peer_stats.entry(peer_id).or_default();
508
509        peer_stats.status = ConnectionStatus::Connected;
511    }
512
513    pub(crate) fn add_ip(&mut self, peer_id: &PeerId, ip: IpAddr) {
515        tracing::trace!(peer=%peer_id, %ip, "Add ip for peer");
516        let peer_stats = self.peer_stats.entry(*peer_id).or_default();
517
518        peer_stats.status = ConnectionStatus::Connected;
521
522        peer_stats.known_ips.insert(ip);
524        self.peer_ips.entry(ip).or_default().insert(*peer_id);
525    }
526
527    pub(crate) fn failed_message_slow_peer(&mut self, peer_id: &PeerId) {
529        if let Some(peer_stats) = self.peer_stats.get_mut(peer_id) {
530            peer_stats.slow_peer_penalty += 1.0;
531            tracing::debug!(peer=%peer_id, %peer_stats.slow_peer_penalty, "[Penalty] Expired message penalty.");
532        }
533    }
534
535    pub(crate) fn remove_ip(&mut self, peer_id: &PeerId, ip: &IpAddr) {
537        if let Some(peer_stats) = self.peer_stats.get_mut(peer_id) {
538            peer_stats.known_ips.remove(ip);
539            if let Some(peer_ids) = self.peer_ips.get_mut(ip) {
540                tracing::trace!(peer=%peer_id, %ip, "Remove ip for peer");
541                peer_ids.remove(peer_id);
542            } else {
543                tracing::trace!(
544                    peer=%peer_id,
545                    %ip,
546                    "No entry in peer_ips for ip which should get removed for peer"
547                );
548            }
549        } else {
550            tracing::trace!(
551                peer=%peer_id,
552                %ip,
553                "No peer_stats for peer which should remove the ip"
554            );
555        }
556    }
557
558    pub(crate) fn remove_peer(&mut self, peer_id: &PeerId) {
561        if self.score_report(peer_id).score > 0f64 {
563            if let hash_map::Entry::Occupied(entry) = self.peer_stats.entry(*peer_id) {
564                Self::remove_ips_for_peer(entry.get(), &mut self.peer_ips, peer_id);
565                entry.remove();
566            }
567            return;
568        }
569
570        if let Some(peer_stats) = self.peer_stats.get_mut(peer_id) {
573            for (topic, topic_stats) in peer_stats.topics.iter_mut() {
574                topic_stats.first_message_deliveries = 0f64;
575
576                if let Some(threshold) = self
577                    .params
578                    .topics
579                    .get(topic)
580                    .map(|param| param.mesh_message_deliveries_threshold)
581                {
582                    if topic_stats.in_mesh()
583                        && topic_stats.mesh_message_deliveries_active
584                        && topic_stats.mesh_message_deliveries < threshold
585                    {
586                        let deficit = threshold - topic_stats.mesh_message_deliveries;
587                        topic_stats.mesh_failure_penalty += deficit * deficit;
588                    }
589                }
590
591                topic_stats.mesh_status = MeshStatus::InActive;
592                topic_stats.mesh_message_deliveries_active = false;
593            }
594
595            peer_stats.status = ConnectionStatus::Disconnected {
596                expire: Instant::now() + self.params.retain_score,
597            };
598        }
599    }
600
601    pub(crate) fn graft(&mut self, peer_id: &PeerId, topic: impl Into<TopicHash>) {
603        let topic = topic.into();
604        if let Some(peer_stats) = self.peer_stats.get_mut(peer_id) {
605            if let Some(topic_stats) = peer_stats.stats_or_default_mut(topic, &self.params) {
607                topic_stats.mesh_status = MeshStatus::new_active();
608                topic_stats.mesh_message_deliveries_active = false;
609            }
610        }
611    }
612
613    pub(crate) fn prune(&mut self, peer_id: &PeerId, topic: TopicHash) {
615        if let Some(peer_stats) = self.peer_stats.get_mut(peer_id) {
616            if let Some(topic_stats) = peer_stats.stats_or_default_mut(topic.clone(), &self.params)
618            {
619                let threshold = self
621                    .params
622                    .topics
623                    .get(&topic)
624                    .expect("Topic must exist in order for there to be topic stats")
625                    .mesh_message_deliveries_threshold;
626                if topic_stats.mesh_message_deliveries_active
627                    && topic_stats.mesh_message_deliveries < threshold
628                {
629                    let deficit = threshold - topic_stats.mesh_message_deliveries;
630                    topic_stats.mesh_failure_penalty += deficit * deficit;
631                }
632                topic_stats.mesh_message_deliveries_active = false;
633                topic_stats.mesh_status = MeshStatus::InActive;
634            }
635        }
636    }
637
638    pub(crate) fn validate_message(
639        &mut self,
640        from: &PeerId,
641        msg_id: &MessageId,
642        topic_hash: &TopicHash,
643    ) {
644        self.deliveries.entry(msg_id.clone()).or_default();
646
647        if let Some(callback) = self.message_delivery_time_callback {
648            if self
649                .peer_stats
650                .get(from)
651                .and_then(|s| s.topics.get(topic_hash))
652                .map(|ts| ts.in_mesh())
653                .unwrap_or(false)
654            {
655                callback(from, topic_hash, 0.0);
656            }
657        }
658    }
659
660    pub(crate) fn deliver_message(
661        &mut self,
662        from: &PeerId,
663        msg_id: &MessageId,
664        topic_hash: &TopicHash,
665    ) {
666        self.mark_first_message_delivery(from, topic_hash);
667
668        let record = self.deliveries.entry(msg_id.clone()).or_default();
669
670        if record.status != DeliveryStatus::Unknown {
672            tracing::warn!(
673                peer=%from,
674                status=?record.status,
675                first_seen=?record.first_seen.elapsed().as_secs(),
676                "Unexpected delivery trace"
677            );
678            return;
679        }
680
681        record.status = DeliveryStatus::Valid(Instant::now());
683        for peer in record.peers.iter().cloned().collect::<Vec<_>>() {
684            if &peer != from {
687                self.mark_duplicate_message_delivery(&peer, topic_hash, None);
688            }
689        }
690    }
691
    /// Penalizes `from` for an invalid message in `topic_hash` without touching any
    /// delivery record (no message id is involved).
    pub(crate) fn reject_invalid_message(&mut self, from: &PeerId, topic_hash: &TopicHash) {
        tracing::debug!(
            peer=%from,
            "[Penalty] Message from peer rejected because of ValidationError or SelfOrigin"
        );

        self.mark_invalid_message_delivery(from, topic_hash);
    }
702
703    pub(crate) fn reject_message(
705        &mut self,
706        from: &PeerId,
707        msg_id: &MessageId,
708        topic_hash: &TopicHash,
709        reason: RejectReason,
710    ) {
711        match reason {
712            RejectReason::ValidationError(_) | RejectReason::SelfOrigin => {
714                self.reject_invalid_message(from, topic_hash);
715                return;
716            }
717            RejectReason::BlackListedPeer | RejectReason::BlackListedSource => {
719                return;
720            }
721            _ => {} }
723
724        let peers: Vec<_> = {
725            let record = self.deliveries.entry(msg_id.clone()).or_default();
726
727            if record.status != DeliveryStatus::Unknown {
730                return;
731            }
732
733            if let RejectReason::ValidationIgnored = reason {
734                record.status = DeliveryStatus::Ignored;
737                record.peers.clear();
738                return;
739            }
740
741            record.status = DeliveryStatus::Invalid;
743            record.peers.drain().collect()
745        };
746
747        self.mark_invalid_message_delivery(from, topic_hash);
748        for peer_id in peers.iter() {
749            self.mark_invalid_message_delivery(peer_id, topic_hash)
750        }
751    }
752
    /// Handles a duplicate of message `msg_id` received from `from`.
    ///
    /// A peer already attached to the delivery record is ignored. Otherwise the optional
    /// delivery time callback is invoked (for mesh peers only) and the peer is handled
    /// according to the record's status: remembered while `Unknown`, remembered and
    /// credited when `Valid`, penalized when `Invalid`, and ignored when `Ignored`.
    pub(crate) fn duplicated_message(
        &mut self,
        from: &PeerId,
        msg_id: &MessageId,
        topic_hash: &TopicHash,
    ) {
        let record = self.deliveries.entry(msg_id.clone()).or_default();

        // Each peer is handled at most once per message.
        if record.peers.contains(from) {
            return;
        }

        if let Some(callback) = self.message_delivery_time_callback {
            // Seconds since the message was marked valid; 0.0 if it has not been.
            let time = if let DeliveryStatus::Valid(validated) = record.status {
                validated.elapsed().as_secs_f64()
            } else {
                0.0
            };
            // The callback is only invoked for peers in the mesh of this topic.
            if self
                .peer_stats
                .get(from)
                .and_then(|s| s.topics.get(topic_hash))
                .map(|ts| ts.in_mesh())
                .unwrap_or(false)
            {
                callback(from, topic_hash, time);
            }
        }

        match record.status {
            DeliveryStatus::Unknown => {
                // No outcome yet: remember the peer so that `deliver_message` or
                // `reject_message` can credit or penalize it later.
                record.peers.insert(*from);
            }
            DeliveryStatus::Valid(validated) => {
                // Remember the peer and credit the duplicate relative to the time the
                // message was marked valid.
                record.peers.insert(*from);
                self.mark_duplicate_message_delivery(from, topic_hash, Some(validated));
            }
            DeliveryStatus::Invalid => {
                // The message is already known to be invalid: penalize the peer.
                self.mark_invalid_message_delivery(from, topic_hash);
            }
            DeliveryStatus::Ignored => {
                // Ignored messages neither credit nor penalize anyone.
            }
        }
    }
803
804    pub(crate) fn set_application_score(&mut self, peer_id: &PeerId, new_score: f64) -> bool {
807        if let Some(peer_stats) = self.peer_stats.get_mut(peer_id) {
808            peer_stats.application_score = new_score;
809            true
810        } else {
811            false
812        }
813    }
814
815    pub(crate) fn set_topic_params(&mut self, topic_hash: TopicHash, params: TopicScoreParams) {
817        use hash_map::Entry::*;
818        match self.params.topics.entry(topic_hash.clone()) {
819            Occupied(mut entry) => {
820                let first_message_deliveries_cap = params.first_message_deliveries_cap;
821                let mesh_message_deliveries_cap = params.mesh_message_deliveries_cap;
822                let old_params = entry.insert(params);
823
824                if old_params.first_message_deliveries_cap > first_message_deliveries_cap {
825                    for stats in &mut self.peer_stats.values_mut() {
826                        if let Some(tstats) = stats.topics.get_mut(&topic_hash) {
827                            if tstats.first_message_deliveries > first_message_deliveries_cap {
828                                tstats.first_message_deliveries = first_message_deliveries_cap;
829                            }
830                        }
831                    }
832                }
833
834                if old_params.mesh_message_deliveries_cap > mesh_message_deliveries_cap {
835                    for stats in self.peer_stats.values_mut() {
836                        if let Some(tstats) = stats.topics.get_mut(&topic_hash) {
837                            if tstats.mesh_message_deliveries > mesh_message_deliveries_cap {
838                                tstats.mesh_message_deliveries = mesh_message_deliveries_cap;
839                            }
840                        }
841                    }
842                }
843            }
844            Vacant(entry) => {
845                entry.insert(params);
846            }
847        }
848    }
849
    /// Returns the score parameters configured for `topic_hash`, if any.
    pub(crate) fn get_topic_params(&self, topic_hash: &TopicHash) -> Option<&TopicScoreParams> {
        self.params.topics.get(topic_hash)
    }
854
855    fn mark_invalid_message_delivery(&mut self, peer_id: &PeerId, topic_hash: &TopicHash) {
858        if let Some(peer_stats) = self.peer_stats.get_mut(peer_id) {
859            if let Some(topic_stats) =
860                peer_stats.stats_or_default_mut(topic_hash.clone(), &self.params)
861            {
862                tracing::debug!(
863                    peer=%peer_id,
864                    topic=%topic_hash,
865                    "[Penalty] Peer delivered an invalid message in topic and gets penalized \
866                    for it",
867                );
868                topic_stats.invalid_message_deliveries += 1f64;
869            }
870        }
871    }
872
873    fn mark_first_message_delivery(&mut self, peer_id: &PeerId, topic_hash: &TopicHash) {
877        if let Some(peer_stats) = self.peer_stats.get_mut(peer_id) {
878            if let Some(topic_stats) =
879                peer_stats.stats_or_default_mut(topic_hash.clone(), &self.params)
880            {
881                let cap = self
882                    .params
883                    .topics
884                    .get(topic_hash)
885                    .expect("Topic must exist if there are known topic_stats")
886                    .first_message_deliveries_cap;
887                topic_stats.first_message_deliveries =
888                    if topic_stats.first_message_deliveries + 1f64 > cap {
889                        cap
890                    } else {
891                        topic_stats.first_message_deliveries + 1f64
892                    };
893
894                if let MeshStatus::Active { .. } = topic_stats.mesh_status {
895                    let cap = self
896                        .params
897                        .topics
898                        .get(topic_hash)
899                        .expect("Topic must exist if there are known topic_stats")
900                        .mesh_message_deliveries_cap;
901
902                    topic_stats.mesh_message_deliveries =
903                        if topic_stats.mesh_message_deliveries + 1f64 > cap {
904                            cap
905                        } else {
906                            topic_stats.mesh_message_deliveries + 1f64
907                        };
908                }
909            }
910        }
911    }
912
913    fn mark_duplicate_message_delivery(
916        &mut self,
917        peer_id: &PeerId,
918        topic_hash: &TopicHash,
919        validated_time: Option<Instant>,
920    ) {
921        if let Some(peer_stats) = self.peer_stats.get_mut(peer_id) {
922            let now = if validated_time.is_some() {
923                Some(Instant::now())
924            } else {
925                None
926            };
927            if let Some(topic_stats) =
928                peer_stats.stats_or_default_mut(topic_hash.clone(), &self.params)
929            {
930                if let MeshStatus::Active { .. } = topic_stats.mesh_status {
931                    let topic_params = self
932                        .params
933                        .topics
934                        .get(topic_hash)
935                        .expect("Topic must exist if there are known topic_stats");
936
937                    let mut falls_in_mesh_deliver_window = true;
942                    if let Some(validated_time) = validated_time {
943                        if let Some(now) = &now {
944                            let window_time = validated_time
946                                .checked_add(topic_params.mesh_message_deliveries_window)
947                                .unwrap_or(*now);
948                            if now > &window_time {
949                                falls_in_mesh_deliver_window = false;
950                            }
951                        }
952                    }
953
954                    if falls_in_mesh_deliver_window {
955                        let cap = topic_params.mesh_message_deliveries_cap;
956                        topic_stats.mesh_message_deliveries =
957                            if topic_stats.mesh_message_deliveries + 1f64 > cap {
958                                cap
959                            } else {
960                                topic_stats.mesh_message_deliveries + 1f64
961                            };
962                    }
963                }
964            }
965        }
966    }
967
968    pub(crate) fn mesh_message_deliveries(&self, peer: &PeerId, topic: &TopicHash) -> Option<f64> {
969        self.peer_stats
970            .get(peer)
971            .and_then(|s| s.topics.get(topic))
972            .map(|t| t.mesh_message_deliveries)
973    }
974}
975
/// Why a message was rejected; determines how `PeerScore::reject_message` reacts.
#[derive(Clone, Copy)]
pub(crate) enum RejectReason {
    /// The message failed with the contained validation error; the sender is penalized.
    ValidationError(ValidationError),
    /// Treated like `ValidationError`: the sender is penalized.
    // NOTE(review): presumably "the message claims to originate from us" — confirm.
    SelfOrigin,
    /// The sending peer is blacklisted; nothing is recorded for scoring.
    BlackListedPeer,
    /// The message source is blacklisted; nothing is recorded for scoring.
    BlackListedSource,
    /// Validation was ignored; the delivery record is marked `Ignored`, nobody is penalized.
    ValidationIgnored,
    /// Validation failed; the sender and all peers attached to the record are penalized.
    ValidationFailed,
}