libp2p_gossipsub/
peer_score.rs

1// Copyright 2020 Sigma Prime Pty Ltd.
2//
3// Permission is hereby granted, free of charge, to any person obtaining a
4// copy of this software and associated documentation files (the "Software"),
5// to deal in the Software without restriction, including without limitation
6// the rights to use, copy, modify, merge, publish, distribute, sublicense,
7// and/or sell copies of the Software, and to permit persons to whom the
8// Software is furnished to do so, subject to the following conditions:
9//
10// The above copyright notice and this permission notice shall be included in
11// all copies or substantial portions of the Software.
12//
13// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
14// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
18// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
19// DEALINGS IN THE SOFTWARE.
20
21//! Manages and stores the Scoring logic of a particular peer on the gossipsub behaviour.
22
23use std::{
24    collections::{hash_map, HashMap, HashSet},
25    net::IpAddr,
26    time::Duration,
27};
28
29use futures_timer::Delay;
30use libp2p_identity::PeerId;
31use web_time::Instant;
32
33use crate::{time_cache::TimeCache, MessageId, TopicHash};
34
35mod params;
36pub use params::{
37    score_parameter_decay, score_parameter_decay_with_base, PeerScoreParams, PeerScoreThresholds,
38    TopicScoreParams,
39};
40
41use crate::ValidationError;
42
43#[cfg(test)]
44mod tests;
45
/// The number of seconds a message's delivery record is kept in the `deliveries` time cache
/// of [`PeerScore`].
const TIME_CACHE_DURATION: u64 = 120;
48
/// Represents the state of the peer scoring system, which can either be active
/// with a configured [`PeerScore`], or disabled entirely.
pub(crate) enum PeerScoreState {
    /// Scoring is enabled; holds the (boxed) scoring state.
    Active(Box<PeerScore>),
    /// Scoring is disabled; no peer is ever reported as being below a threshold.
    Disabled,
}
55
56impl PeerScoreState {
57    /// Determines if a peer's score is below a given `PeerScoreThreshold` chosen via the
58    /// `threshold` parameter.
59    pub(crate) fn below_threshold(
60        &self,
61        peer_id: &PeerId,
62        threshold: impl Fn(&PeerScoreThresholds) -> f64,
63    ) -> (bool, f64) {
64        match self {
65            PeerScoreState::Active(active) => {
66                let score = active.score_report(peer_id).score;
67                (score < threshold(&active.thresholds), score)
68            }
69            PeerScoreState::Disabled => (false, 0.0),
70        }
71    }
72}
73
/// Result of a peer score calculation, detailing the peer's
/// computed score and a list of any incurred penalties.
///
/// The default report has a score of `0.0` (and no penalties).
#[derive(Default)]
pub(crate) struct PeerScoreReport {
    /// The overall score computed for the peer.
    pub(crate) score: f64,
    /// Penalties recorded while computing the score. Only present with the `metrics` feature.
    #[cfg(feature = "metrics")]
    pub(crate) penalties: Vec<crate::metrics::Penalty>,
}
82
/// Scoring state for all tracked peers: the scoring parameters and thresholds, per-peer
/// statistics, per-IP peer tracking and message delivery records.
pub(crate) struct PeerScore {
    /// The score parameters.
    pub(crate) params: PeerScoreParams,
    /// The score threshold.
    pub(crate) thresholds: PeerScoreThresholds,
    /// The peer score decay interval timer, created from `params.decay_interval`.
    pub(crate) decay_interval: Delay,
    /// The stats per PeerId.
    peer_stats: HashMap<PeerId, PeerStats>,
    /// Tracking peers per IP.
    peer_ips: HashMap<IpAddr, HashSet<PeerId>>,
    /// Message delivery tracking. This is a time-cache of [`DeliveryRecord`]s.
    deliveries: TimeCache<MessageId, DeliveryRecord>,
    /// Callback for monitoring message delivery times. It is invoked with the sending peer, the
    /// topic and a delay in seconds, and only for peers that are in the mesh of that topic.
    message_delivery_time_callback: Option<fn(&PeerId, &TopicHash, f64)>,
}
99
/// General statistics for a given gossipsub peer.
struct PeerStats {
    /// Connection status of the peer.
    status: ConnectionStatus,
    /// Stats per topic.
    topics: HashMap<TopicHash, TopicStats>,
    /// IP tracking for individual peers.
    known_ips: HashSet<IpAddr>,
    /// Behaviour penalty that is applied to the peer, assigned by the behaviour.
    behaviour_penalty: f64,
    /// Application specific score. Can be manipulated by calling
    /// `PeerScore::set_application_score`.
    application_score: f64,
    /// Penalty counter tracking whether this peer consumes messages fast enough or not.
    slow_peer_penalty: f64,
}
115
/// Whether a tracked peer is currently connected, or disconnected with its score still retained.
enum ConnectionStatus {
    /// The peer is connected.
    Connected,
    /// The peer is disconnected.
    Disconnected {
        /// Expiration time of the score state for disconnected peers.
        expire: Instant,
    },
}
125
126impl Default for PeerStats {
127    fn default() -> Self {
128        PeerStats {
129            status: ConnectionStatus::Connected,
130            topics: HashMap::new(),
131            known_ips: HashSet::new(),
132            behaviour_penalty: 0f64,
133            application_score: 0f64,
134            slow_peer_penalty: 0f64,
135        }
136    }
137}
138
139impl PeerStats {
140    /// Returns a mutable reference to topic stats if they exist, otherwise if the supplied
141    /// parameters score the topic, inserts the default stats and returns a reference to those.
142    /// If neither apply, returns None.
143    pub(crate) fn stats_or_default_mut(
144        &mut self,
145        topic_hash: TopicHash,
146        params: &PeerScoreParams,
147    ) -> Option<&mut TopicStats> {
148        #[allow(
149            clippy::map_entry,
150            reason = "False positive, see rust-lang/rust-clippy#14449."
151        )]
152        if params.topics.contains_key(&topic_hash) {
153            Some(self.topics.entry(topic_hash).or_default())
154        } else {
155            self.topics.get_mut(&topic_hash)
156        }
157    }
158}
159
/// Stats assigned to peer for each topic.
struct TopicStats {
    /// Whether the peer is currently in the mesh for this topic (and since when).
    mesh_status: MeshStatus,
    /// Number of first message deliveries.
    first_message_deliveries: f64,
    /// True if the peer has been in the mesh for enough time to activate mesh message deliveries.
    mesh_message_deliveries_active: bool,
    /// Number of message deliveries from the mesh.
    mesh_message_deliveries: f64,
    /// Mesh rate failure penalty.
    mesh_failure_penalty: f64,
    /// Invalid message counter.
    invalid_message_deliveries: f64,
}
174
175impl TopicStats {
176    /// Returns true if the peer is in the `mesh`.
177    pub(crate) fn in_mesh(&self) -> bool {
178        matches!(self.mesh_status, MeshStatus::Active { .. })
179    }
180}
181
/// Status defining a peer's inclusion in the mesh and associated parameters.
enum MeshStatus {
    /// The peer is in the mesh.
    Active {
        /// The time the peer was last GRAFTed.
        graft_time: Instant,
        /// The time the peer has been in the mesh.
        mesh_time: Duration,
    },
    /// The peer is not in the mesh.
    InActive,
}
192
193impl MeshStatus {
194    /// Initialises a new [`MeshStatus::Active`] mesh status.
195    pub(crate) fn new_active() -> Self {
196        MeshStatus::Active {
197            graft_time: Instant::now(),
198            mesh_time: Duration::from_secs(0),
199        }
200    }
201}
202
203impl Default for TopicStats {
204    fn default() -> Self {
205        TopicStats {
206            mesh_status: MeshStatus::InActive,
207            first_message_deliveries: Default::default(),
208            mesh_message_deliveries_active: Default::default(),
209            mesh_message_deliveries: Default::default(),
210            mesh_failure_penalty: Default::default(),
211            invalid_message_deliveries: Default::default(),
212        }
213    }
214}
215
/// Tracks the validation status of a single message and the peers that sent it to us.
#[derive(PartialEq, Debug)]
struct DeliveryRecord {
    /// Validation status of the message.
    status: DeliveryStatus,
    /// The time this record was created.
    first_seen: Instant,
    /// Peers recorded as having sent us a duplicate of the message.
    peers: HashSet<PeerId>,
}
222
/// Validation outcome of a tracked message.
#[derive(PartialEq, Debug)]
enum DeliveryStatus {
    /// Don't know (yet) if the message is valid.
    Unknown,
    /// The message is valid together with the validated time.
    Valid(Instant),
    /// The message is invalid.
    Invalid,
    /// Instructed by the validator to ignore the message.
    Ignored,
}
234
235impl Default for DeliveryRecord {
236    fn default() -> Self {
237        DeliveryRecord {
238            status: DeliveryStatus::Unknown,
239            first_seen: Instant::now(),
240            peers: HashSet::new(),
241        }
242    }
243}
244
245impl PeerScore {
    /// Creates a new [`PeerScore`] using a given set of peer scoring parameters.
    ///
    /// No message delivery time callback is installed; this simply delegates to
    /// [`PeerScore::new_with_message_delivery_time_callback`] with `None`.
    #[allow(dead_code)]
    pub(crate) fn new(params: PeerScoreParams, thresholds: PeerScoreThresholds) -> Self {
        Self::new_with_message_delivery_time_callback(params, thresholds, None)
    }
251
252    pub(crate) fn new_with_message_delivery_time_callback(
253        params: PeerScoreParams,
254        thresholds: PeerScoreThresholds,
255        callback: Option<fn(&PeerId, &TopicHash, f64)>,
256    ) -> Self {
257        PeerScore {
258            decay_interval: Delay::new(params.decay_interval),
259            params,
260            thresholds,
261            peer_stats: HashMap::new(),
262            peer_ips: HashMap::new(),
263            deliveries: TimeCache::new(Duration::from_secs(TIME_CACHE_DURATION)),
264            message_delivery_time_callback: callback,
265        }
266    }
267
268    /// Returns the score report for a peer, with applied penalties.
269    /// This is called from the heartbeat
270    pub(crate) fn score_report(&self, peer_id: &PeerId) -> PeerScoreReport {
271        let mut report = PeerScoreReport::default();
272        let Some(peer_stats) = self.peer_stats.get(peer_id) else {
273            return report;
274        };
275
276        // topic scores
277        for (topic, topic_stats) in peer_stats.topics.iter() {
278            // topic parameters
279            if let Some(topic_params) = self.params.topics.get(topic) {
280                // we are tracking the topic
281
282                // the topic score
283                let mut topic_score = 0.0;
284
285                // P1: time in mesh
286                if let MeshStatus::Active { mesh_time, .. } = topic_stats.mesh_status {
287                    let p1 = {
288                        let v = mesh_time.as_secs_f64()
289                            / topic_params.time_in_mesh_quantum.as_secs_f64();
290                        if v < topic_params.time_in_mesh_cap {
291                            v
292                        } else {
293                            topic_params.time_in_mesh_cap
294                        }
295                    };
296                    topic_score += p1 * topic_params.time_in_mesh_weight;
297                }
298
299                // P2: first message deliveries
300                let p2 = {
301                    let v = topic_stats.first_message_deliveries;
302                    if v < topic_params.first_message_deliveries_cap {
303                        v
304                    } else {
305                        topic_params.first_message_deliveries_cap
306                    }
307                };
308                topic_score += p2 * topic_params.first_message_deliveries_weight;
309
310                // P3: mesh message deliveries
311                if topic_stats.mesh_message_deliveries_active
312                    && topic_stats.mesh_message_deliveries
313                        < topic_params.mesh_message_deliveries_threshold
314                    && topic_params.mesh_message_deliveries_weight != 0.0
315                {
316                    let deficit = topic_params.mesh_message_deliveries_threshold
317                        - topic_stats.mesh_message_deliveries;
318                    let p3 = deficit * deficit;
319                    let penalty = p3 * topic_params.mesh_message_deliveries_weight;
320
321                    topic_score += penalty;
322                    #[cfg(feature = "metrics")]
323                    report
324                        .penalties
325                        .push(crate::metrics::Penalty::MessageDeficit);
326                    tracing::debug!(
327                        peer=%peer_id,
328                        %topic,
329                        %deficit,
330                        penalty=%penalty,
331                        "[Penalty] The peer has a mesh deliveries deficit and will be penalized"
332                    );
333                }
334
335                // P3b:
336                // NOTE: the weight of P3b is negative (validated in TopicScoreParams.validate), so
337                // this detracts.
338                let p3b = topic_stats.mesh_failure_penalty;
339                topic_score += p3b * topic_params.mesh_failure_penalty_weight;
340
341                // P4: invalid messages
342                // NOTE: the weight of P4 is negative (validated in TopicScoreParams.validate), so
343                // this detracts.
344                let p4 =
345                    topic_stats.invalid_message_deliveries * topic_stats.invalid_message_deliveries;
346                topic_score += p4 * topic_params.invalid_message_deliveries_weight;
347
348                // update score, mixing with topic weight
349                report.score += topic_score * topic_params.topic_weight;
350            }
351        }
352
353        // apply the topic score cap, if any
354        if self.params.topic_score_cap > 0f64 && report.score > self.params.topic_score_cap {
355            report.score = self.params.topic_score_cap;
356        }
357
358        // P5: application-specific score
359        let p5 = peer_stats.application_score;
360        report.score += p5 * self.params.app_specific_weight;
361
362        // P6: IP collocation factor
363        for ip in peer_stats.known_ips.iter() {
364            if self.params.ip_colocation_factor_whitelist.contains(ip) {
365                continue;
366            }
367
368            // P6 has a cliff (ip_colocation_factor_threshold); it's only applied if
369            // at least that many peers are connected to us from that source IP
370            // addr. It is quadratic, and the weight is negative (validated by
371            // peer_score_params.validate()).
372            if let Some(peers_in_ip) = self.peer_ips.get(ip).map(|peers| peers.len()) {
373                if (peers_in_ip as f64) > self.params.ip_colocation_factor_threshold
374                    && self.params.ip_colocation_factor_weight != 0.0
375                {
376                    let surplus = (peers_in_ip as f64) - self.params.ip_colocation_factor_threshold;
377                    let p6 = surplus * surplus;
378                    #[cfg(feature = "metrics")]
379                    report.penalties.push(crate::metrics::Penalty::IPColocation);
380                    tracing::debug!(
381                        peer=%peer_id,
382                        surplus_ip=%ip,
383                        surplus=%surplus,
384                        "[Penalty] The peer gets penalized because of too many peers with the same ip"
385                    );
386                    report.score += p6 * self.params.ip_colocation_factor_weight;
387                }
388            }
389        }
390
391        // P7: behavioural pattern penalty.
392        if peer_stats.behaviour_penalty > self.params.behaviour_penalty_threshold {
393            let excess = peer_stats.behaviour_penalty - self.params.behaviour_penalty_threshold;
394            let p7 = excess * excess;
395            report.score += p7 * self.params.behaviour_penalty_weight;
396        }
397
398        // Slow peer weighting.
399        if peer_stats.slow_peer_penalty > self.params.slow_peer_threshold {
400            let excess = peer_stats.slow_peer_penalty - self.params.slow_peer_threshold;
401            report.score += excess * self.params.slow_peer_weight;
402        }
403
404        report
405    }
406
407    pub(crate) fn add_penalty(&mut self, peer_id: &PeerId, count: usize) {
408        if let Some(peer_stats) = self.peer_stats.get_mut(peer_id) {
409            tracing::debug!(
410                peer=%peer_id,
411                %count,
412                "[Penalty] Behavioral penalty for peer"
413            );
414            peer_stats.behaviour_penalty += count as f64;
415        }
416    }
417
418    fn remove_ips_for_peer(
419        peer_stats: &PeerStats,
420        peer_ips: &mut HashMap<IpAddr, HashSet<PeerId>>,
421        peer_id: &PeerId,
422    ) {
423        for ip in peer_stats.known_ips.iter() {
424            if let Some(peer_set) = peer_ips.get_mut(ip) {
425                peer_set.remove(peer_id);
426            }
427        }
428    }
429
430    pub(crate) fn refresh_scores(&mut self) {
431        let now = Instant::now();
432        let params_ref = &self.params;
433        let peer_ips_ref = &mut self.peer_ips;
434        self.peer_stats.retain(|peer_id, peer_stats| {
435            if let ConnectionStatus::Disconnected { expire } = peer_stats.status {
436                // has the retention period expired?
437                if now > expire {
438                    // yes, throw it away (but clean up the IP tracking first)
439                    Self::remove_ips_for_peer(peer_stats, peer_ips_ref, peer_id);
440                    // re address this, use retain or entry
441                    return false;
442                }
443
444                // we don't decay retained scores, as the peer is not active.
445                // this way the peer cannot reset a negative score by simply disconnecting and
446                // reconnecting, unless the retention period has elapsed.
447                // similarly, a well behaved peer does not lose its score by getting disconnected.
448                return true;
449            }
450
451            for (topic, topic_stats) in peer_stats.topics.iter_mut() {
452                // the topic parameters
453                if let Some(topic_params) = params_ref.topics.get(topic) {
454                    // decay counters
455                    topic_stats.first_message_deliveries *=
456                        topic_params.first_message_deliveries_decay;
457                    if topic_stats.first_message_deliveries < params_ref.decay_to_zero {
458                        topic_stats.first_message_deliveries = 0.0;
459                    }
460                    topic_stats.mesh_message_deliveries *=
461                        topic_params.mesh_message_deliveries_decay;
462                    if topic_stats.mesh_message_deliveries < params_ref.decay_to_zero {
463                        topic_stats.mesh_message_deliveries = 0.0;
464                    }
465                    topic_stats.mesh_failure_penalty *= topic_params.mesh_failure_penalty_decay;
466                    if topic_stats.mesh_failure_penalty < params_ref.decay_to_zero {
467                        topic_stats.mesh_failure_penalty = 0.0;
468                    }
469                    topic_stats.invalid_message_deliveries *=
470                        topic_params.invalid_message_deliveries_decay;
471                    if topic_stats.invalid_message_deliveries < params_ref.decay_to_zero {
472                        topic_stats.invalid_message_deliveries = 0.0;
473                    }
474                    // update mesh time and activate mesh message delivery parameter if need be
475                    if let MeshStatus::Active {
476                        ref mut mesh_time,
477                        ref mut graft_time,
478                    } = topic_stats.mesh_status
479                    {
480                        *mesh_time = now.duration_since(*graft_time);
481                        if *mesh_time > topic_params.mesh_message_deliveries_activation {
482                            topic_stats.mesh_message_deliveries_active = true;
483                        }
484                    }
485                }
486            }
487
488            // decay P7 counter
489            peer_stats.behaviour_penalty *= params_ref.behaviour_penalty_decay;
490            if peer_stats.behaviour_penalty < params_ref.decay_to_zero {
491                peer_stats.behaviour_penalty = 0.0;
492            }
493
494            // decay slow peer score
495            peer_stats.slow_peer_penalty *= params_ref.slow_peer_decay;
496            if peer_stats.slow_peer_penalty < params_ref.decay_to_zero {
497                peer_stats.slow_peer_penalty = 0.0;
498            }
499
500            true
501        });
502    }
503
504    /// Adds a connected peer to [`PeerScore`], initialising with empty ips (ips get added later
505    /// through add_ip.
506    pub(crate) fn add_peer(&mut self, peer_id: PeerId) {
507        let peer_stats = self.peer_stats.entry(peer_id).or_default();
508
509        // mark the peer as connected
510        peer_stats.status = ConnectionStatus::Connected;
511    }
512
513    /// Adds a new ip to a peer, if the peer is not yet known creates a new peer_stats entry for it
514    pub(crate) fn add_ip(&mut self, peer_id: &PeerId, ip: IpAddr) {
515        tracing::trace!(peer=%peer_id, %ip, "Add ip for peer");
516        let peer_stats = self.peer_stats.entry(*peer_id).or_default();
517
518        // Mark the peer as connected (currently the default is connected, but we don't want to
519        // rely on the default).
520        peer_stats.status = ConnectionStatus::Connected;
521
522        // Insert the ip
523        peer_stats.known_ips.insert(ip);
524        self.peer_ips.entry(ip).or_default().insert(*peer_id);
525    }
526
527    /// Indicate that a peer has been too slow to consume a message.
528    pub(crate) fn failed_message_slow_peer(&mut self, peer_id: &PeerId) {
529        if let Some(peer_stats) = self.peer_stats.get_mut(peer_id) {
530            peer_stats.slow_peer_penalty += 1.0;
531            tracing::debug!(peer=%peer_id, %peer_stats.slow_peer_penalty, "[Penalty] Expired message penalty.");
532        }
533    }
534
535    /// Removes an ip from a peer
536    pub(crate) fn remove_ip(&mut self, peer_id: &PeerId, ip: &IpAddr) {
537        if let Some(peer_stats) = self.peer_stats.get_mut(peer_id) {
538            peer_stats.known_ips.remove(ip);
539            if let Some(peer_ids) = self.peer_ips.get_mut(ip) {
540                tracing::trace!(peer=%peer_id, %ip, "Remove ip for peer");
541                peer_ids.remove(peer_id);
542            } else {
543                tracing::trace!(
544                    peer=%peer_id,
545                    %ip,
546                    "No entry in peer_ips for ip which should get removed for peer"
547                );
548            }
549        } else {
550            tracing::trace!(
551                peer=%peer_id,
552                %ip,
553                "No peer_stats for peer which should remove the ip"
554            );
555        }
556    }
557
558    /// Removes a peer from the score table. This retains peer statistics if their score is
559    /// non-positive.
560    pub(crate) fn remove_peer(&mut self, peer_id: &PeerId) {
561        // we only retain non-positive scores of peers
562        if self.score_report(peer_id).score > 0f64 {
563            if let hash_map::Entry::Occupied(entry) = self.peer_stats.entry(*peer_id) {
564                Self::remove_ips_for_peer(entry.get(), &mut self.peer_ips, peer_id);
565                entry.remove();
566            }
567            return;
568        }
569
570        // if the peer is retained (including it's score) the `first_message_delivery` counters
571        // are reset to 0 and mesh delivery penalties applied.
572        if let Some(peer_stats) = self.peer_stats.get_mut(peer_id) {
573            for (topic, topic_stats) in peer_stats.topics.iter_mut() {
574                topic_stats.first_message_deliveries = 0f64;
575
576                if let Some(threshold) = self
577                    .params
578                    .topics
579                    .get(topic)
580                    .map(|param| param.mesh_message_deliveries_threshold)
581                {
582                    if topic_stats.in_mesh()
583                        && topic_stats.mesh_message_deliveries_active
584                        && topic_stats.mesh_message_deliveries < threshold
585                    {
586                        let deficit = threshold - topic_stats.mesh_message_deliveries;
587                        topic_stats.mesh_failure_penalty += deficit * deficit;
588                    }
589                }
590
591                topic_stats.mesh_status = MeshStatus::InActive;
592                topic_stats.mesh_message_deliveries_active = false;
593            }
594
595            peer_stats.status = ConnectionStatus::Disconnected {
596                expire: Instant::now() + self.params.retain_score,
597            };
598        }
599    }
600
601    /// Handles scoring functionality as a peer GRAFTs to a topic.
602    pub(crate) fn graft(&mut self, peer_id: &PeerId, topic: impl Into<TopicHash>) {
603        let topic = topic.into();
604        if let Some(peer_stats) = self.peer_stats.get_mut(peer_id) {
605            // if we are scoring the topic, update the mesh status.
606            if let Some(topic_stats) = peer_stats.stats_or_default_mut(topic, &self.params) {
607                topic_stats.mesh_status = MeshStatus::new_active();
608                topic_stats.mesh_message_deliveries_active = false;
609            }
610        }
611    }
612
613    /// Handles scoring functionality as a peer PRUNEs from a topic.
614    pub(crate) fn prune(&mut self, peer_id: &PeerId, topic: TopicHash) {
615        if let Some(peer_stats) = self.peer_stats.get_mut(peer_id) {
616            // if we are scoring the topic, update the mesh status.
617            if let Some(topic_stats) = peer_stats.stats_or_default_mut(topic.clone(), &self.params)
618            {
619                // sticky mesh delivery rate failure penalty
620                let threshold = self
621                    .params
622                    .topics
623                    .get(&topic)
624                    .expect("Topic must exist in order for there to be topic stats")
625                    .mesh_message_deliveries_threshold;
626                if topic_stats.mesh_message_deliveries_active
627                    && topic_stats.mesh_message_deliveries < threshold
628                {
629                    let deficit = threshold - topic_stats.mesh_message_deliveries;
630                    topic_stats.mesh_failure_penalty += deficit * deficit;
631                }
632                topic_stats.mesh_message_deliveries_active = false;
633                topic_stats.mesh_status = MeshStatus::InActive;
634            }
635        }
636    }
637
638    pub(crate) fn validate_message(
639        &mut self,
640        from: &PeerId,
641        msg_id: &MessageId,
642        topic_hash: &TopicHash,
643    ) {
644        // adds an empty record with the message id
645        self.deliveries.entry(msg_id.clone()).or_default();
646
647        if let Some(callback) = self.message_delivery_time_callback {
648            if self
649                .peer_stats
650                .get(from)
651                .and_then(|s| s.topics.get(topic_hash))
652                .map(|ts| ts.in_mesh())
653                .unwrap_or(false)
654            {
655                callback(from, topic_hash, 0.0);
656            }
657        }
658    }
659
660    pub(crate) fn deliver_message(
661        &mut self,
662        from: &PeerId,
663        msg_id: &MessageId,
664        topic_hash: &TopicHash,
665    ) {
666        self.mark_first_message_delivery(from, topic_hash);
667
668        let record = self.deliveries.entry(msg_id.clone()).or_default();
669
670        // this should be the first delivery trace
671        if record.status != DeliveryStatus::Unknown {
672            tracing::warn!(
673                peer=%from,
674                status=?record.status,
675                first_seen=?record.first_seen.elapsed().as_secs(),
676                "Unexpected delivery trace"
677            );
678            return;
679        }
680
681        // mark the message as valid and reward mesh peers that have already forwarded it to us
682        record.status = DeliveryStatus::Valid(Instant::now());
683        for peer in record.peers.iter().cloned().collect::<Vec<_>>() {
684            // this check is to make sure a peer can't send us a message twice and get a double
685            // count if it is a first delivery
686            if &peer != from {
687                self.mark_duplicate_message_delivery(&peer, topic_hash, None);
688            }
689        }
690    }
691
    /// Similar to `reject_message` except does not require the message id or reason for an invalid
    /// message.
    ///
    /// Logs the rejection and counts an invalid message delivery against `from` for
    /// `topic_hash`. No delivery record is created or updated.
    pub(crate) fn reject_invalid_message(&mut self, from: &PeerId, topic_hash: &TopicHash) {
        tracing::debug!(
            peer=%from,
            "[Penalty] Message from peer rejected because of ValidationError or SelfOrigin"
        );

        self.mark_invalid_message_delivery(from, topic_hash);
    }
702
703    // Reject a message.
704    pub(crate) fn reject_message(
705        &mut self,
706        from: &PeerId,
707        msg_id: &MessageId,
708        topic_hash: &TopicHash,
709        reason: RejectReason,
710    ) {
711        match reason {
712            // these messages are not tracked, but the peer is penalized as they are invalid
713            RejectReason::ValidationError(_) | RejectReason::SelfOrigin => {
714                self.reject_invalid_message(from, topic_hash);
715                return;
716            }
717            // we ignore those messages, so do nothing.
718            RejectReason::BlackListedPeer | RejectReason::BlackListedSource => {
719                return;
720            }
721            _ => {} // the rest are handled after record creation
722        }
723
724        let peers: Vec<_> = {
725            let record = self.deliveries.entry(msg_id.clone()).or_default();
726
727            // Multiple peers can now reject the same message as we track which peers send us the
728            // message. If we have already updated the status, return.
729            if record.status != DeliveryStatus::Unknown {
730                return;
731            }
732
733            if let RejectReason::ValidationIgnored = reason {
734                // we were explicitly instructed by the validator to ignore the message but not
735                // penalize the peer
736                record.status = DeliveryStatus::Ignored;
737                record.peers.clear();
738                return;
739            }
740
741            // mark the message as invalid and penalize peers that have already forwarded it.
742            record.status = DeliveryStatus::Invalid;
743            // release the delivery time tracking map to free some memory early
744            record.peers.drain().collect()
745        };
746
747        self.mark_invalid_message_delivery(from, topic_hash);
748        for peer_id in peers.iter() {
749            self.mark_invalid_message_delivery(peer_id, topic_hash)
750        }
751    }
752
753    pub(crate) fn duplicated_message(
754        &mut self,
755        from: &PeerId,
756        msg_id: &MessageId,
757        topic_hash: &TopicHash,
758    ) {
759        let record = self.deliveries.entry(msg_id.clone()).or_default();
760
761        if record.peers.contains(from) {
762            // we have already seen this duplicate!
763            return;
764        }
765
766        if let Some(callback) = self.message_delivery_time_callback {
767            let time = if let DeliveryStatus::Valid(validated) = record.status {
768                validated.elapsed().as_secs_f64()
769            } else {
770                0.0
771            };
772            if self
773                .peer_stats
774                .get(from)
775                .and_then(|s| s.topics.get(topic_hash))
776                .map(|ts| ts.in_mesh())
777                .unwrap_or(false)
778            {
779                callback(from, topic_hash, time);
780            }
781        }
782
783        match record.status {
784            DeliveryStatus::Unknown => {
785                // the message is being validated; track the peer delivery and wait for
786                // the Deliver/Reject notification.
787                record.peers.insert(*from);
788            }
789            DeliveryStatus::Valid(validated) => {
790                // mark the peer delivery time to only count a duplicate delivery once.
791                record.peers.insert(*from);
792                self.mark_duplicate_message_delivery(from, topic_hash, Some(validated));
793            }
794            DeliveryStatus::Invalid => {
795                // we no longer track delivery time
796                self.mark_invalid_message_delivery(from, topic_hash);
797            }
798            DeliveryStatus::Ignored => {
799                // the message was ignored; do nothing (we don't know if it was valid)
800            }
801        }
802    }
803
804    /// Sets the application specific score for a peer. Returns true if the peer is the peer is
805    /// connected or if the score of the peer is not yet expired and false otherwise.
806    pub(crate) fn set_application_score(&mut self, peer_id: &PeerId, new_score: f64) -> bool {
807        if let Some(peer_stats) = self.peer_stats.get_mut(peer_id) {
808            peer_stats.application_score = new_score;
809            true
810        } else {
811            false
812        }
813    }
814
815    /// Sets scoring parameters for a topic.
816    pub(crate) fn set_topic_params(&mut self, topic_hash: TopicHash, params: TopicScoreParams) {
817        use hash_map::Entry::*;
818        match self.params.topics.entry(topic_hash.clone()) {
819            Occupied(mut entry) => {
820                let first_message_deliveries_cap = params.first_message_deliveries_cap;
821                let mesh_message_deliveries_cap = params.mesh_message_deliveries_cap;
822                let old_params = entry.insert(params);
823
824                if old_params.first_message_deliveries_cap > first_message_deliveries_cap {
825                    for stats in &mut self.peer_stats.values_mut() {
826                        if let Some(tstats) = stats.topics.get_mut(&topic_hash) {
827                            if tstats.first_message_deliveries > first_message_deliveries_cap {
828                                tstats.first_message_deliveries = first_message_deliveries_cap;
829                            }
830                        }
831                    }
832                }
833
834                if old_params.mesh_message_deliveries_cap > mesh_message_deliveries_cap {
835                    for stats in self.peer_stats.values_mut() {
836                        if let Some(tstats) = stats.topics.get_mut(&topic_hash) {
837                            if tstats.mesh_message_deliveries > mesh_message_deliveries_cap {
838                                tstats.mesh_message_deliveries = mesh_message_deliveries_cap;
839                            }
840                        }
841                    }
842                }
843            }
844            Vacant(entry) => {
845                entry.insert(params);
846            }
847        }
848    }
849
850    /// Returns a scoring parameters for a topic if existent.
851    pub(crate) fn get_topic_params(&self, topic_hash: &TopicHash) -> Option<&TopicScoreParams> {
852        self.params.topics.get(topic_hash)
853    }
854
855    /// Increments the "invalid message deliveries" counter for all scored topics the message
856    /// is published in.
857    fn mark_invalid_message_delivery(&mut self, peer_id: &PeerId, topic_hash: &TopicHash) {
858        if let Some(peer_stats) = self.peer_stats.get_mut(peer_id) {
859            if let Some(topic_stats) =
860                peer_stats.stats_or_default_mut(topic_hash.clone(), &self.params)
861            {
862                tracing::debug!(
863                    peer=%peer_id,
864                    topic=%topic_hash,
865                    "[Penalty] Peer delivered an invalid message in topic and gets penalized \
866                    for it",
867                );
868                topic_stats.invalid_message_deliveries += 1f64;
869            }
870        }
871    }
872
873    /// Increments the "first message deliveries" counter for all scored topics the message is
874    /// published in, as well as the "mesh message deliveries" counter, if the peer is in the
875    /// mesh for the topic.
876    fn mark_first_message_delivery(&mut self, peer_id: &PeerId, topic_hash: &TopicHash) {
877        if let Some(peer_stats) = self.peer_stats.get_mut(peer_id) {
878            if let Some(topic_stats) =
879                peer_stats.stats_or_default_mut(topic_hash.clone(), &self.params)
880            {
881                let cap = self
882                    .params
883                    .topics
884                    .get(topic_hash)
885                    .expect("Topic must exist if there are known topic_stats")
886                    .first_message_deliveries_cap;
887                topic_stats.first_message_deliveries =
888                    if topic_stats.first_message_deliveries + 1f64 > cap {
889                        cap
890                    } else {
891                        topic_stats.first_message_deliveries + 1f64
892                    };
893
894                if let MeshStatus::Active { .. } = topic_stats.mesh_status {
895                    let cap = self
896                        .params
897                        .topics
898                        .get(topic_hash)
899                        .expect("Topic must exist if there are known topic_stats")
900                        .mesh_message_deliveries_cap;
901
902                    topic_stats.mesh_message_deliveries =
903                        if topic_stats.mesh_message_deliveries + 1f64 > cap {
904                            cap
905                        } else {
906                            topic_stats.mesh_message_deliveries + 1f64
907                        };
908                }
909            }
910        }
911    }
912
913    /// Increments the "mesh message deliveries" counter for messages we've seen before, as long the
914    /// message was received within the P3 window.
915    fn mark_duplicate_message_delivery(
916        &mut self,
917        peer_id: &PeerId,
918        topic_hash: &TopicHash,
919        validated_time: Option<Instant>,
920    ) {
921        if let Some(peer_stats) = self.peer_stats.get_mut(peer_id) {
922            let now = if validated_time.is_some() {
923                Some(Instant::now())
924            } else {
925                None
926            };
927            if let Some(topic_stats) =
928                peer_stats.stats_or_default_mut(topic_hash.clone(), &self.params)
929            {
930                if let MeshStatus::Active { .. } = topic_stats.mesh_status {
931                    let topic_params = self
932                        .params
933                        .topics
934                        .get(topic_hash)
935                        .expect("Topic must exist if there are known topic_stats");
936
937                    // check against the mesh delivery window -- if the validated time is passed as
938                    // 0, then the message was received before we finished
939                    // validation and thus falls within the mesh
940                    // delivery window.
941                    let mut falls_in_mesh_deliver_window = true;
942                    if let Some(validated_time) = validated_time {
943                        if let Some(now) = &now {
944                            // should always be true
945                            let window_time = validated_time
946                                .checked_add(topic_params.mesh_message_deliveries_window)
947                                .unwrap_or(*now);
948                            if now > &window_time {
949                                falls_in_mesh_deliver_window = false;
950                            }
951                        }
952                    }
953
954                    if falls_in_mesh_deliver_window {
955                        let cap = topic_params.mesh_message_deliveries_cap;
956                        topic_stats.mesh_message_deliveries =
957                            if topic_stats.mesh_message_deliveries + 1f64 > cap {
958                                cap
959                            } else {
960                                topic_stats.mesh_message_deliveries + 1f64
961                            };
962                    }
963                }
964            }
965        }
966    }
967
968    pub(crate) fn mesh_message_deliveries(&self, peer: &PeerId, topic: &TopicHash) -> Option<f64> {
969        self.peer_stats
970            .get(peer)
971            .and_then(|s| s.topics.get(topic))
972            .map(|t| t.mesh_message_deliveries)
973    }
974}
975
976/// The reason a Gossipsub message has been rejected.
977#[derive(Clone, Copy)]
978pub(crate) enum RejectReason {
979    /// The message failed the configured validation during decoding.
980    ValidationError(ValidationError),
981    /// The message source is us.
982    SelfOrigin,
983    /// The peer that sent the message was blacklisted.
984    BlackListedPeer,
985    /// The source (from field) of the message was blacklisted.
986    BlackListedSource,
987    /// The validation was ignored.
988    ValidationIgnored,
989    /// The validation failed.
990    ValidationFailed,
991}