libp2p_gossipsub/
peer_score.rs

1// Copyright 2020 Sigma Prime Pty Ltd.
2//
3// Permission is hereby granted, free of charge, to any person obtaining a
4// copy of this software and associated documentation files (the "Software"),
5// to deal in the Software without restriction, including without limitation
6// the rights to use, copy, modify, merge, publish, distribute, sublicense,
7// and/or sell copies of the Software, and to permit persons to whom the
8// Software is furnished to do so, subject to the following conditions:
9//
10// The above copyright notice and this permission notice shall be included in
11// all copies or substantial portions of the Software.
12//
13// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
14// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
18// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
19// DEALINGS IN THE SOFTWARE.
20
21//! Manages and stores the Scoring logic of a particular peer on the gossipsub behaviour.
22
23use std::{
24    collections::{hash_map, HashMap, HashSet},
25    net::IpAddr,
26    time::Duration,
27};
28
29use futures_timer::Delay;
30use libp2p_identity::PeerId;
31use web_time::Instant;
32
33use crate::{time_cache::TimeCache, MessageId, TopicHash};
34
35mod params;
36pub use params::{
37    score_parameter_decay, score_parameter_decay_with_base, PeerScoreParams, PeerScoreThresholds,
38    TopicScoreParams,
39};
40
41use crate::ValidationError;
42
43#[cfg(test)]
44mod tests;
45
46/// The number of seconds delivery messages are stored in the cache.
47const TIME_CACHE_DURATION: u64 = 120;
48
49/// Represents the state of the peer scoring system, which can either be active
50/// with a configured `PeerScore`, or disabled entirely.
51pub(crate) enum PeerScoreState {
52    Active(Box<PeerScore>),
53    Disabled,
54}
55
56impl PeerScoreState {
57    /// Determines if a peer's score is below a given `PeerScoreThreshold` chosen via the
58    /// `threshold` parameter.
59    pub(crate) fn below_threshold(
60        &self,
61        peer_id: &PeerId,
62        threshold: impl Fn(&PeerScoreThresholds) -> f64,
63    ) -> (bool, f64) {
64        match self {
65            PeerScoreState::Active(active) => {
66                let score = active.score_report(peer_id).score;
67                (score < threshold(&active.thresholds), score)
68            }
69            PeerScoreState::Disabled => (false, 0.0),
70        }
71    }
72}
73
74/// Result of a peer score calculation, detailing the peer's
75/// computed score and a list of any incurred penalties.
76#[derive(Default)]
77pub(crate) struct PeerScoreReport {
78    pub(crate) score: f64,
79    #[cfg(feature = "metrics")]
80    pub(crate) penalties: Vec<crate::metrics::Penalty>,
81}
82
83pub(crate) struct PeerScore {
84    /// The score parameters.
85    pub(crate) params: PeerScoreParams,
86    /// The score threshold.
87    pub(crate) thresholds: PeerScoreThresholds,
88    /// The peer score decay interval.
89    pub(crate) decay_interval: Delay,
90    /// The stats per PeerId.
91    peer_stats: HashMap<PeerId, PeerStats>,
92    /// Tracking peers per IP.
93    peer_ips: HashMap<IpAddr, HashSet<PeerId>>,
94    /// Message delivery tracking. This is a time-cache of [`DeliveryRecord`]s.
95    deliveries: TimeCache<MessageId, DeliveryRecord>,
96    /// Callback for monitoring message delivery times.
97    message_delivery_time_callback: Option<fn(&PeerId, &TopicHash, f64)>,
98}
99
100/// General statistics for a given gossipsub peer.
101struct PeerStats {
102    /// Connection status of the peer.
103    status: ConnectionStatus,
104    /// Stats per topic.
105    topics: HashMap<TopicHash, TopicStats>,
106    /// IP tracking for individual peers.
107    known_ips: HashSet<IpAddr>,
108    /// Behaviour penalty that is applied to the peer, assigned by the behaviour.
109    behaviour_penalty: f64,
110    /// Application specific score. Can be manipulated by calling PeerScore::set_application_score
111    application_score: f64,
112    /// Scoring based on how whether this peer consumes messages fast enough or not.
113    slow_peer_penalty: f64,
114}
115
116enum ConnectionStatus {
117    /// The peer is connected.
118    Connected,
119    /// The peer is disconnected
120    Disconnected {
121        /// Expiration time of the score state for disconnected peers.
122        expire: Instant,
123    },
124}
125
126impl Default for PeerStats {
127    fn default() -> Self {
128        PeerStats {
129            status: ConnectionStatus::Connected,
130            topics: HashMap::new(),
131            known_ips: HashSet::new(),
132            behaviour_penalty: 0f64,
133            application_score: 0f64,
134            slow_peer_penalty: 0f64,
135        }
136    }
137}
138
139impl PeerStats {
140    /// Returns a mutable reference to topic stats if they exist, otherwise if the supplied
141    /// parameters score the topic, inserts the default stats and returns a reference to those.
142    /// If neither apply, returns None.
143    pub(crate) fn stats_or_default_mut(
144        &mut self,
145        topic_hash: TopicHash,
146        params: &PeerScoreParams,
147    ) -> Option<&mut TopicStats> {
148        #[allow(
149            clippy::map_entry,
150            reason = "False positive, see rust-lang/rust-clippy#14449."
151        )]
152        if params.topics.contains_key(&topic_hash) {
153            Some(self.topics.entry(topic_hash).or_default())
154        } else {
155            self.topics.get_mut(&topic_hash)
156        }
157    }
158}
159
160/// Stats assigned to peer for each topic.
161struct TopicStats {
162    mesh_status: MeshStatus,
163    /// Number of first message deliveries.
164    first_message_deliveries: f64,
165    /// True if the peer has been in the mesh for enough time to activate mesh message deliveries.
166    mesh_message_deliveries_active: bool,
167    /// Number of message deliveries from the mesh.
168    mesh_message_deliveries: f64,
169    /// Mesh rate failure penalty.
170    mesh_failure_penalty: f64,
171    /// Invalid message counter.
172    invalid_message_deliveries: f64,
173}
174
175impl TopicStats {
176    /// Returns true if the peer is in the `mesh`.
177    pub(crate) fn in_mesh(&self) -> bool {
178        matches!(self.mesh_status, MeshStatus::Active { .. })
179    }
180}
181
182/// Status defining a peer's inclusion in the mesh and associated parameters.
183enum MeshStatus {
184    Active {
185        /// The time the peer was last GRAFTed;
186        graft_time: Instant,
187        /// The time the peer has been in the mesh.
188        mesh_time: Duration,
189    },
190    InActive,
191}
192
193impl MeshStatus {
194    /// Initialises a new [`MeshStatus::Active`] mesh status.
195    pub(crate) fn new_active() -> Self {
196        MeshStatus::Active {
197            graft_time: Instant::now(),
198            mesh_time: Duration::from_secs(0),
199        }
200    }
201}
202
203impl Default for TopicStats {
204    fn default() -> Self {
205        TopicStats {
206            mesh_status: MeshStatus::InActive,
207            first_message_deliveries: Default::default(),
208            mesh_message_deliveries_active: Default::default(),
209            mesh_message_deliveries: Default::default(),
210            mesh_failure_penalty: Default::default(),
211            invalid_message_deliveries: Default::default(),
212        }
213    }
214}
215
216#[derive(PartialEq, Debug)]
217struct DeliveryRecord {
218    status: DeliveryStatus,
219    first_seen: Instant,
220    peers: HashSet<PeerId>,
221}
222
223#[derive(PartialEq, Debug)]
224enum DeliveryStatus {
225    /// Don't know (yet) if the message is valid.
226    Unknown,
227    /// The message is valid together with the validated time.
228    Valid(Instant),
229    /// The message is invalid.
230    Invalid,
231    /// Instructed by the validator to ignore the message.
232    Ignored,
233}
234
235impl Default for DeliveryRecord {
236    fn default() -> Self {
237        DeliveryRecord {
238            status: DeliveryStatus::Unknown,
239            first_seen: Instant::now(),
240            peers: HashSet::new(),
241        }
242    }
243}
244
245impl PeerScore {
246    /// Creates a new [`PeerScore`] using a given set of peer scoring parameters.
247    #[allow(dead_code)]
248    pub(crate) fn new(params: PeerScoreParams, thresholds: PeerScoreThresholds) -> Self {
249        Self::new_with_message_delivery_time_callback(params, thresholds, None)
250    }
251
252    pub(crate) fn new_with_message_delivery_time_callback(
253        params: PeerScoreParams,
254        thresholds: PeerScoreThresholds,
255        callback: Option<fn(&PeerId, &TopicHash, f64)>,
256    ) -> Self {
257        PeerScore {
258            decay_interval: Delay::new(params.decay_interval),
259            params,
260            thresholds,
261            peer_stats: HashMap::new(),
262            peer_ips: HashMap::new(),
263            deliveries: TimeCache::new(Duration::from_secs(TIME_CACHE_DURATION)),
264            message_delivery_time_callback: callback,
265        }
266    }
267
268    /// Returns the score report for a peer, with applied penalties.
269    /// This is called from the heartbeat
270    pub(crate) fn score_report(&self, peer_id: &PeerId) -> PeerScoreReport {
271        let mut report = PeerScoreReport::default();
272        let Some(peer_stats) = self.peer_stats.get(peer_id) else {
273            return report;
274        };
275
276        // topic scores
277        for (topic, topic_stats) in peer_stats.topics.iter() {
278            // topic parameters
279            if let Some(topic_params) = self.params.topics.get(topic) {
280                // we are tracking the topic
281
282                // the topic score
283                let mut topic_score = 0.0;
284
285                // P1: time in mesh
286                if let MeshStatus::Active { mesh_time, .. } = topic_stats.mesh_status {
287                    let p1 = {
288                        let v = mesh_time.as_secs_f64()
289                            / topic_params.time_in_mesh_quantum.as_secs_f64();
290                        if v < topic_params.time_in_mesh_cap {
291                            v
292                        } else {
293                            topic_params.time_in_mesh_cap
294                        }
295                    };
296                    topic_score += p1 * topic_params.time_in_mesh_weight;
297                }
298
299                // P2: first message deliveries
300                let p2 = {
301                    let v = topic_stats.first_message_deliveries;
302                    if v < topic_params.first_message_deliveries_cap {
303                        v
304                    } else {
305                        topic_params.first_message_deliveries_cap
306                    }
307                };
308                topic_score += p2 * topic_params.first_message_deliveries_weight;
309
310                // P3: mesh message deliveries
311                if topic_stats.mesh_message_deliveries_active
312                    && topic_stats.mesh_message_deliveries
313                        < topic_params.mesh_message_deliveries_threshold
314                {
315                    let deficit = topic_params.mesh_message_deliveries_threshold
316                        - topic_stats.mesh_message_deliveries;
317                    let p3 = deficit * deficit;
318                    topic_score += p3 * topic_params.mesh_message_deliveries_weight;
319                    #[cfg(feature = "metrics")]
320                    report
321                        .penalties
322                        .push(crate::metrics::Penalty::MessageDeficit);
323                    tracing::debug!(
324                        peer=%peer_id,
325                        %topic,
326                        %deficit,
327                        penalty=%topic_score,
328                        "[Penalty] The peer has a mesh deliveries deficit and will be penalized"
329                    );
330                }
331
332                // P3b:
333                // NOTE: the weight of P3b is negative (validated in TopicScoreParams.validate), so
334                // this detracts.
335                let p3b = topic_stats.mesh_failure_penalty;
336                topic_score += p3b * topic_params.mesh_failure_penalty_weight;
337
338                // P4: invalid messages
339                // NOTE: the weight of P4 is negative (validated in TopicScoreParams.validate), so
340                // this detracts.
341                let p4 =
342                    topic_stats.invalid_message_deliveries * topic_stats.invalid_message_deliveries;
343                topic_score += p4 * topic_params.invalid_message_deliveries_weight;
344
345                // update score, mixing with topic weight
346                report.score += topic_score * topic_params.topic_weight;
347            }
348        }
349
350        // apply the topic score cap, if any
351        if self.params.topic_score_cap > 0f64 && report.score > self.params.topic_score_cap {
352            report.score = self.params.topic_score_cap;
353        }
354
355        // P5: application-specific score
356        let p5 = peer_stats.application_score;
357        report.score += p5 * self.params.app_specific_weight;
358
359        // P6: IP collocation factor
360        for ip in peer_stats.known_ips.iter() {
361            if self.params.ip_colocation_factor_whitelist.contains(ip) {
362                continue;
363            }
364
365            // P6 has a cliff (ip_colocation_factor_threshold); it's only applied if
366            // at least that many peers are connected to us from that source IP
367            // addr. It is quadratic, and the weight is negative (validated by
368            // peer_score_params.validate()).
369            if let Some(peers_in_ip) = self.peer_ips.get(ip).map(|peers| peers.len()) {
370                if (peers_in_ip as f64) > self.params.ip_colocation_factor_threshold {
371                    let surplus = (peers_in_ip as f64) - self.params.ip_colocation_factor_threshold;
372                    let p6 = surplus * surplus;
373                    #[cfg(feature = "metrics")]
374                    report.penalties.push(crate::metrics::Penalty::IPColocation);
375                    tracing::debug!(
376                        peer=%peer_id,
377                        surplus_ip=%ip,
378                        surplus=%surplus,
379                        "[Penalty] The peer gets penalized because of too many peers with the same ip"
380                    );
381                    report.score += p6 * self.params.ip_colocation_factor_weight;
382                }
383            }
384        }
385
386        // P7: behavioural pattern penalty.
387        if peer_stats.behaviour_penalty > self.params.behaviour_penalty_threshold {
388            let excess = peer_stats.behaviour_penalty - self.params.behaviour_penalty_threshold;
389            let p7 = excess * excess;
390            report.score += p7 * self.params.behaviour_penalty_weight;
391        }
392
393        // Slow peer weighting.
394        if peer_stats.slow_peer_penalty > self.params.slow_peer_threshold {
395            let excess = peer_stats.slow_peer_penalty - self.params.slow_peer_threshold;
396            report.score += excess * self.params.slow_peer_weight;
397        }
398
399        report
400    }
401
402    pub(crate) fn add_penalty(&mut self, peer_id: &PeerId, count: usize) {
403        if let Some(peer_stats) = self.peer_stats.get_mut(peer_id) {
404            tracing::debug!(
405                peer=%peer_id,
406                %count,
407                "[Penalty] Behavioral penalty for peer"
408            );
409            peer_stats.behaviour_penalty += count as f64;
410        }
411    }
412
413    fn remove_ips_for_peer(
414        peer_stats: &PeerStats,
415        peer_ips: &mut HashMap<IpAddr, HashSet<PeerId>>,
416        peer_id: &PeerId,
417    ) {
418        for ip in peer_stats.known_ips.iter() {
419            if let Some(peer_set) = peer_ips.get_mut(ip) {
420                peer_set.remove(peer_id);
421            }
422        }
423    }
424
425    pub(crate) fn refresh_scores(&mut self) {
426        let now = Instant::now();
427        let params_ref = &self.params;
428        let peer_ips_ref = &mut self.peer_ips;
429        self.peer_stats.retain(|peer_id, peer_stats| {
430            if let ConnectionStatus::Disconnected { expire } = peer_stats.status {
431                // has the retention period expired?
432                if now > expire {
433                    // yes, throw it away (but clean up the IP tracking first)
434                    Self::remove_ips_for_peer(peer_stats, peer_ips_ref, peer_id);
435                    // re address this, use retain or entry
436                    return false;
437                }
438
439                // we don't decay retained scores, as the peer is not active.
440                // this way the peer cannot reset a negative score by simply disconnecting and
441                // reconnecting, unless the retention period has elapsed.
442                // similarly, a well behaved peer does not lose its score by getting disconnected.
443                return true;
444            }
445
446            for (topic, topic_stats) in peer_stats.topics.iter_mut() {
447                // the topic parameters
448                if let Some(topic_params) = params_ref.topics.get(topic) {
449                    // decay counters
450                    topic_stats.first_message_deliveries *=
451                        topic_params.first_message_deliveries_decay;
452                    if topic_stats.first_message_deliveries < params_ref.decay_to_zero {
453                        topic_stats.first_message_deliveries = 0.0;
454                    }
455                    topic_stats.mesh_message_deliveries *=
456                        topic_params.mesh_message_deliveries_decay;
457                    if topic_stats.mesh_message_deliveries < params_ref.decay_to_zero {
458                        topic_stats.mesh_message_deliveries = 0.0;
459                    }
460                    topic_stats.mesh_failure_penalty *= topic_params.mesh_failure_penalty_decay;
461                    if topic_stats.mesh_failure_penalty < params_ref.decay_to_zero {
462                        topic_stats.mesh_failure_penalty = 0.0;
463                    }
464                    topic_stats.invalid_message_deliveries *=
465                        topic_params.invalid_message_deliveries_decay;
466                    if topic_stats.invalid_message_deliveries < params_ref.decay_to_zero {
467                        topic_stats.invalid_message_deliveries = 0.0;
468                    }
469                    // update mesh time and activate mesh message delivery parameter if need be
470                    if let MeshStatus::Active {
471                        ref mut mesh_time,
472                        ref mut graft_time,
473                    } = topic_stats.mesh_status
474                    {
475                        *mesh_time = now.duration_since(*graft_time);
476                        if *mesh_time > topic_params.mesh_message_deliveries_activation {
477                            topic_stats.mesh_message_deliveries_active = true;
478                        }
479                    }
480                }
481            }
482
483            // decay P7 counter
484            peer_stats.behaviour_penalty *= params_ref.behaviour_penalty_decay;
485            if peer_stats.behaviour_penalty < params_ref.decay_to_zero {
486                peer_stats.behaviour_penalty = 0.0;
487            }
488
489            // decay slow peer score
490            peer_stats.slow_peer_penalty *= params_ref.slow_peer_decay;
491            if peer_stats.slow_peer_penalty < params_ref.decay_to_zero {
492                peer_stats.slow_peer_penalty = 0.0;
493            }
494
495            true
496        });
497    }
498
499    /// Adds a connected peer to [`PeerScore`], initialising with empty ips (ips get added later
500    /// through add_ip.
501    pub(crate) fn add_peer(&mut self, peer_id: PeerId) {
502        let peer_stats = self.peer_stats.entry(peer_id).or_default();
503
504        // mark the peer as connected
505        peer_stats.status = ConnectionStatus::Connected;
506    }
507
508    /// Adds a new ip to a peer, if the peer is not yet known creates a new peer_stats entry for it
509    pub(crate) fn add_ip(&mut self, peer_id: &PeerId, ip: IpAddr) {
510        tracing::trace!(peer=%peer_id, %ip, "Add ip for peer");
511        let peer_stats = self.peer_stats.entry(*peer_id).or_default();
512
513        // Mark the peer as connected (currently the default is connected, but we don't want to
514        // rely on the default).
515        peer_stats.status = ConnectionStatus::Connected;
516
517        // Insert the ip
518        peer_stats.known_ips.insert(ip);
519        self.peer_ips.entry(ip).or_default().insert(*peer_id);
520    }
521
522    /// Indicate that a peer has been too slow to consume a message.
523    pub(crate) fn failed_message_slow_peer(&mut self, peer_id: &PeerId) {
524        if let Some(peer_stats) = self.peer_stats.get_mut(peer_id) {
525            peer_stats.slow_peer_penalty += 1.0;
526            tracing::debug!(peer=%peer_id, %peer_stats.slow_peer_penalty, "[Penalty] Expired message penalty.");
527        }
528    }
529
530    /// Removes an ip from a peer
531    pub(crate) fn remove_ip(&mut self, peer_id: &PeerId, ip: &IpAddr) {
532        if let Some(peer_stats) = self.peer_stats.get_mut(peer_id) {
533            peer_stats.known_ips.remove(ip);
534            if let Some(peer_ids) = self.peer_ips.get_mut(ip) {
535                tracing::trace!(peer=%peer_id, %ip, "Remove ip for peer");
536                peer_ids.remove(peer_id);
537            } else {
538                tracing::trace!(
539                    peer=%peer_id,
540                    %ip,
541                    "No entry in peer_ips for ip which should get removed for peer"
542                );
543            }
544        } else {
545            tracing::trace!(
546                peer=%peer_id,
547                %ip,
548                "No peer_stats for peer which should remove the ip"
549            );
550        }
551    }
552
553    /// Removes a peer from the score table. This retains peer statistics if their score is
554    /// non-positive.
555    pub(crate) fn remove_peer(&mut self, peer_id: &PeerId) {
556        // we only retain non-positive scores of peers
557        if self.score_report(peer_id).score > 0f64 {
558            if let hash_map::Entry::Occupied(entry) = self.peer_stats.entry(*peer_id) {
559                Self::remove_ips_for_peer(entry.get(), &mut self.peer_ips, peer_id);
560                entry.remove();
561            }
562            return;
563        }
564
565        // if the peer is retained (including it's score) the `first_message_delivery` counters
566        // are reset to 0 and mesh delivery penalties applied.
567        if let Some(peer_stats) = self.peer_stats.get_mut(peer_id) {
568            for (topic, topic_stats) in peer_stats.topics.iter_mut() {
569                topic_stats.first_message_deliveries = 0f64;
570
571                if let Some(threshold) = self
572                    .params
573                    .topics
574                    .get(topic)
575                    .map(|param| param.mesh_message_deliveries_threshold)
576                {
577                    if topic_stats.in_mesh()
578                        && topic_stats.mesh_message_deliveries_active
579                        && topic_stats.mesh_message_deliveries < threshold
580                    {
581                        let deficit = threshold - topic_stats.mesh_message_deliveries;
582                        topic_stats.mesh_failure_penalty += deficit * deficit;
583                    }
584                }
585
586                topic_stats.mesh_status = MeshStatus::InActive;
587                topic_stats.mesh_message_deliveries_active = false;
588            }
589
590            peer_stats.status = ConnectionStatus::Disconnected {
591                expire: Instant::now() + self.params.retain_score,
592            };
593        }
594    }
595
596    /// Handles scoring functionality as a peer GRAFTs to a topic.
597    pub(crate) fn graft(&mut self, peer_id: &PeerId, topic: impl Into<TopicHash>) {
598        let topic = topic.into();
599        if let Some(peer_stats) = self.peer_stats.get_mut(peer_id) {
600            // if we are scoring the topic, update the mesh status.
601            if let Some(topic_stats) = peer_stats.stats_or_default_mut(topic, &self.params) {
602                topic_stats.mesh_status = MeshStatus::new_active();
603                topic_stats.mesh_message_deliveries_active = false;
604            }
605        }
606    }
607
608    /// Handles scoring functionality as a peer PRUNEs from a topic.
609    pub(crate) fn prune(&mut self, peer_id: &PeerId, topic: TopicHash) {
610        if let Some(peer_stats) = self.peer_stats.get_mut(peer_id) {
611            // if we are scoring the topic, update the mesh status.
612            if let Some(topic_stats) = peer_stats.stats_or_default_mut(topic.clone(), &self.params)
613            {
614                // sticky mesh delivery rate failure penalty
615                let threshold = self
616                    .params
617                    .topics
618                    .get(&topic)
619                    .expect("Topic must exist in order for there to be topic stats")
620                    .mesh_message_deliveries_threshold;
621                if topic_stats.mesh_message_deliveries_active
622                    && topic_stats.mesh_message_deliveries < threshold
623                {
624                    let deficit = threshold - topic_stats.mesh_message_deliveries;
625                    topic_stats.mesh_failure_penalty += deficit * deficit;
626                }
627                topic_stats.mesh_message_deliveries_active = false;
628                topic_stats.mesh_status = MeshStatus::InActive;
629            }
630        }
631    }
632
633    pub(crate) fn validate_message(
634        &mut self,
635        from: &PeerId,
636        msg_id: &MessageId,
637        topic_hash: &TopicHash,
638    ) {
639        // adds an empty record with the message id
640        self.deliveries.entry(msg_id.clone()).or_default();
641
642        if let Some(callback) = self.message_delivery_time_callback {
643            if self
644                .peer_stats
645                .get(from)
646                .and_then(|s| s.topics.get(topic_hash))
647                .map(|ts| ts.in_mesh())
648                .unwrap_or(false)
649            {
650                callback(from, topic_hash, 0.0);
651            }
652        }
653    }
654
655    pub(crate) fn deliver_message(
656        &mut self,
657        from: &PeerId,
658        msg_id: &MessageId,
659        topic_hash: &TopicHash,
660    ) {
661        self.mark_first_message_delivery(from, topic_hash);
662
663        let record = self.deliveries.entry(msg_id.clone()).or_default();
664
665        // this should be the first delivery trace
666        if record.status != DeliveryStatus::Unknown {
667            tracing::warn!(
668                peer=%from,
669                status=?record.status,
670                first_seen=?record.first_seen.elapsed().as_secs(),
671                "Unexpected delivery trace"
672            );
673            return;
674        }
675
676        // mark the message as valid and reward mesh peers that have already forwarded it to us
677        record.status = DeliveryStatus::Valid(Instant::now());
678        for peer in record.peers.iter().cloned().collect::<Vec<_>>() {
679            // this check is to make sure a peer can't send us a message twice and get a double
680            // count if it is a first delivery
681            if &peer != from {
682                self.mark_duplicate_message_delivery(&peer, topic_hash, None);
683            }
684        }
685    }
686
687    /// Similar to `reject_message` except does not require the message id or reason for an invalid
688    /// message.
689    pub(crate) fn reject_invalid_message(&mut self, from: &PeerId, topic_hash: &TopicHash) {
690        tracing::debug!(
691            peer=%from,
692            "[Penalty] Message from peer rejected because of ValidationError or SelfOrigin"
693        );
694
695        self.mark_invalid_message_delivery(from, topic_hash);
696    }
697
698    // Reject a message.
699    pub(crate) fn reject_message(
700        &mut self,
701        from: &PeerId,
702        msg_id: &MessageId,
703        topic_hash: &TopicHash,
704        reason: RejectReason,
705    ) {
706        match reason {
707            // these messages are not tracked, but the peer is penalized as they are invalid
708            RejectReason::ValidationError(_) | RejectReason::SelfOrigin => {
709                self.reject_invalid_message(from, topic_hash);
710                return;
711            }
712            // we ignore those messages, so do nothing.
713            RejectReason::BlackListedPeer | RejectReason::BlackListedSource => {
714                return;
715            }
716            _ => {} // the rest are handled after record creation
717        }
718
719        let peers: Vec<_> = {
720            let record = self.deliveries.entry(msg_id.clone()).or_default();
721
722            // Multiple peers can now reject the same message as we track which peers send us the
723            // message. If we have already updated the status, return.
724            if record.status != DeliveryStatus::Unknown {
725                return;
726            }
727
728            if let RejectReason::ValidationIgnored = reason {
729                // we were explicitly instructed by the validator to ignore the message but not
730                // penalize the peer
731                record.status = DeliveryStatus::Ignored;
732                record.peers.clear();
733                return;
734            }
735
736            // mark the message as invalid and penalize peers that have already forwarded it.
737            record.status = DeliveryStatus::Invalid;
738            // release the delivery time tracking map to free some memory early
739            record.peers.drain().collect()
740        };
741
742        self.mark_invalid_message_delivery(from, topic_hash);
743        for peer_id in peers.iter() {
744            self.mark_invalid_message_delivery(peer_id, topic_hash)
745        }
746    }
747
748    pub(crate) fn duplicated_message(
749        &mut self,
750        from: &PeerId,
751        msg_id: &MessageId,
752        topic_hash: &TopicHash,
753    ) {
754        let record = self.deliveries.entry(msg_id.clone()).or_default();
755
756        if record.peers.contains(from) {
757            // we have already seen this duplicate!
758            return;
759        }
760
761        if let Some(callback) = self.message_delivery_time_callback {
762            let time = if let DeliveryStatus::Valid(validated) = record.status {
763                validated.elapsed().as_secs_f64()
764            } else {
765                0.0
766            };
767            if self
768                .peer_stats
769                .get(from)
770                .and_then(|s| s.topics.get(topic_hash))
771                .map(|ts| ts.in_mesh())
772                .unwrap_or(false)
773            {
774                callback(from, topic_hash, time);
775            }
776        }
777
778        match record.status {
779            DeliveryStatus::Unknown => {
780                // the message is being validated; track the peer delivery and wait for
781                // the Deliver/Reject notification.
782                record.peers.insert(*from);
783            }
784            DeliveryStatus::Valid(validated) => {
785                // mark the peer delivery time to only count a duplicate delivery once.
786                record.peers.insert(*from);
787                self.mark_duplicate_message_delivery(from, topic_hash, Some(validated));
788            }
789            DeliveryStatus::Invalid => {
790                // we no longer track delivery time
791                self.mark_invalid_message_delivery(from, topic_hash);
792            }
793            DeliveryStatus::Ignored => {
794                // the message was ignored; do nothing (we don't know if it was valid)
795            }
796        }
797    }
798
799    /// Sets the application specific score for a peer. Returns true if the peer is the peer is
800    /// connected or if the score of the peer is not yet expired and false otherwise.
801    pub(crate) fn set_application_score(&mut self, peer_id: &PeerId, new_score: f64) -> bool {
802        if let Some(peer_stats) = self.peer_stats.get_mut(peer_id) {
803            peer_stats.application_score = new_score;
804            true
805        } else {
806            false
807        }
808    }
809
810    /// Sets scoring parameters for a topic.
811    pub(crate) fn set_topic_params(&mut self, topic_hash: TopicHash, params: TopicScoreParams) {
812        use hash_map::Entry::*;
813        match self.params.topics.entry(topic_hash.clone()) {
814            Occupied(mut entry) => {
815                let first_message_deliveries_cap = params.first_message_deliveries_cap;
816                let mesh_message_deliveries_cap = params.mesh_message_deliveries_cap;
817                let old_params = entry.insert(params);
818
819                if old_params.first_message_deliveries_cap > first_message_deliveries_cap {
820                    for stats in &mut self.peer_stats.values_mut() {
821                        if let Some(tstats) = stats.topics.get_mut(&topic_hash) {
822                            if tstats.first_message_deliveries > first_message_deliveries_cap {
823                                tstats.first_message_deliveries = first_message_deliveries_cap;
824                            }
825                        }
826                    }
827                }
828
829                if old_params.mesh_message_deliveries_cap > mesh_message_deliveries_cap {
830                    for stats in self.peer_stats.values_mut() {
831                        if let Some(tstats) = stats.topics.get_mut(&topic_hash) {
832                            if tstats.mesh_message_deliveries > mesh_message_deliveries_cap {
833                                tstats.mesh_message_deliveries = mesh_message_deliveries_cap;
834                            }
835                        }
836                    }
837                }
838            }
839            Vacant(entry) => {
840                entry.insert(params);
841            }
842        }
843    }
844
845    /// Returns a scoring parameters for a topic if existent.
846    pub(crate) fn get_topic_params(&self, topic_hash: &TopicHash) -> Option<&TopicScoreParams> {
847        self.params.topics.get(topic_hash)
848    }
849
850    /// Increments the "invalid message deliveries" counter for all scored topics the message
851    /// is published in.
852    fn mark_invalid_message_delivery(&mut self, peer_id: &PeerId, topic_hash: &TopicHash) {
853        if let Some(peer_stats) = self.peer_stats.get_mut(peer_id) {
854            if let Some(topic_stats) =
855                peer_stats.stats_or_default_mut(topic_hash.clone(), &self.params)
856            {
857                tracing::debug!(
858                    peer=%peer_id,
859                    topic=%topic_hash,
860                    "[Penalty] Peer delivered an invalid message in topic and gets penalized \
861                    for it",
862                );
863                topic_stats.invalid_message_deliveries += 1f64;
864            }
865        }
866    }
867
868    /// Increments the "first message deliveries" counter for all scored topics the message is
869    /// published in, as well as the "mesh message deliveries" counter, if the peer is in the
870    /// mesh for the topic.
871    fn mark_first_message_delivery(&mut self, peer_id: &PeerId, topic_hash: &TopicHash) {
872        if let Some(peer_stats) = self.peer_stats.get_mut(peer_id) {
873            if let Some(topic_stats) =
874                peer_stats.stats_or_default_mut(topic_hash.clone(), &self.params)
875            {
876                let cap = self
877                    .params
878                    .topics
879                    .get(topic_hash)
880                    .expect("Topic must exist if there are known topic_stats")
881                    .first_message_deliveries_cap;
882                topic_stats.first_message_deliveries =
883                    if topic_stats.first_message_deliveries + 1f64 > cap {
884                        cap
885                    } else {
886                        topic_stats.first_message_deliveries + 1f64
887                    };
888
889                if let MeshStatus::Active { .. } = topic_stats.mesh_status {
890                    let cap = self
891                        .params
892                        .topics
893                        .get(topic_hash)
894                        .expect("Topic must exist if there are known topic_stats")
895                        .mesh_message_deliveries_cap;
896
897                    topic_stats.mesh_message_deliveries =
898                        if topic_stats.mesh_message_deliveries + 1f64 > cap {
899                            cap
900                        } else {
901                            topic_stats.mesh_message_deliveries + 1f64
902                        };
903                }
904            }
905        }
906    }
907
908    /// Increments the "mesh message deliveries" counter for messages we've seen before, as long the
909    /// message was received within the P3 window.
910    fn mark_duplicate_message_delivery(
911        &mut self,
912        peer_id: &PeerId,
913        topic_hash: &TopicHash,
914        validated_time: Option<Instant>,
915    ) {
916        if let Some(peer_stats) = self.peer_stats.get_mut(peer_id) {
917            let now = if validated_time.is_some() {
918                Some(Instant::now())
919            } else {
920                None
921            };
922            if let Some(topic_stats) =
923                peer_stats.stats_or_default_mut(topic_hash.clone(), &self.params)
924            {
925                if let MeshStatus::Active { .. } = topic_stats.mesh_status {
926                    let topic_params = self
927                        .params
928                        .topics
929                        .get(topic_hash)
930                        .expect("Topic must exist if there are known topic_stats");
931
932                    // check against the mesh delivery window -- if the validated time is passed as
933                    // 0, then the message was received before we finished
934                    // validation and thus falls within the mesh
935                    // delivery window.
936                    let mut falls_in_mesh_deliver_window = true;
937                    if let Some(validated_time) = validated_time {
938                        if let Some(now) = &now {
939                            // should always be true
940                            let window_time = validated_time
941                                .checked_add(topic_params.mesh_message_deliveries_window)
942                                .unwrap_or(*now);
943                            if now > &window_time {
944                                falls_in_mesh_deliver_window = false;
945                            }
946                        }
947                    }
948
949                    if falls_in_mesh_deliver_window {
950                        let cap = topic_params.mesh_message_deliveries_cap;
951                        topic_stats.mesh_message_deliveries =
952                            if topic_stats.mesh_message_deliveries + 1f64 > cap {
953                                cap
954                            } else {
955                                topic_stats.mesh_message_deliveries + 1f64
956                            };
957                    }
958                }
959            }
960        }
961    }
962
963    pub(crate) fn mesh_message_deliveries(&self, peer: &PeerId, topic: &TopicHash) -> Option<f64> {
964        self.peer_stats
965            .get(peer)
966            .and_then(|s| s.topics.get(topic))
967            .map(|t| t.mesh_message_deliveries)
968    }
969}
970
971/// The reason a Gossipsub message has been rejected.
972#[derive(Clone, Copy)]
973pub(crate) enum RejectReason {
974    /// The message failed the configured validation during decoding.
975    ValidationError(ValidationError),
976    /// The message source is us.
977    SelfOrigin,
978    /// The peer that sent the message was blacklisted.
979    BlackListedPeer,
980    /// The source (from field) of the message was blacklisted.
981    BlackListedSource,
982    /// The validation was ignored.
983    ValidationIgnored,
984    /// The validation failed.
985    ValidationFailed,
986}