libp2p_gossipsub/
peer_score.rs

1// Copyright 2020 Sigma Prime Pty Ltd.
2//
3// Permission is hereby granted, free of charge, to any person obtaining a
4// copy of this software and associated documentation files (the "Software"),
5// to deal in the Software without restriction, including without limitation
6// the rights to use, copy, modify, merge, publish, distribute, sublicense,
7// and/or sell copies of the Software, and to permit persons to whom the
8// Software is furnished to do so, subject to the following conditions:
9//
10// The above copyright notice and this permission notice shall be included in
11// all copies or substantial portions of the Software.
12//
13// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
14// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
18// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
19// DEALINGS IN THE SOFTWARE.
20
21//! Manages and stores the Scoring logic of a particular peer on the gossipsub behaviour.
22
23use std::{
24    collections::{hash_map, HashMap, HashSet},
25    net::IpAddr,
26    time::Duration,
27};
28
29use libp2p_identity::PeerId;
30use web_time::Instant;
31
32use crate::{
33    metrics::{Metrics, Penalty},
34    time_cache::TimeCache,
35    MessageId, TopicHash,
36};
37
38mod params;
39pub use params::{
40    score_parameter_decay, score_parameter_decay_with_base, PeerScoreParams, PeerScoreThresholds,
41    TopicScoreParams,
42};
43
44use crate::ValidationError;
45
46#[cfg(test)]
47mod tests;
48
49/// The number of seconds delivery messages are stored in the cache.
50const TIME_CACHE_DURATION: u64 = 120;
51
52pub(crate) struct PeerScore {
53    /// The score parameters.
54    pub(crate) params: PeerScoreParams,
55    /// The stats per PeerId.
56    peer_stats: HashMap<PeerId, PeerStats>,
57    /// Tracking peers per IP.
58    peer_ips: HashMap<IpAddr, HashSet<PeerId>>,
59    /// Message delivery tracking. This is a time-cache of [`DeliveryRecord`]s.
60    deliveries: TimeCache<MessageId, DeliveryRecord>,
61    /// Callback for monitoring message delivery times.
62    message_delivery_time_callback: Option<fn(&PeerId, &TopicHash, f64)>,
63}
64
65/// General statistics for a given gossipsub peer.
66struct PeerStats {
67    /// Connection status of the peer.
68    status: ConnectionStatus,
69    /// Stats per topic.
70    topics: HashMap<TopicHash, TopicStats>,
71    /// IP tracking for individual peers.
72    known_ips: HashSet<IpAddr>,
73    /// Behaviour penalty that is applied to the peer, assigned by the behaviour.
74    behaviour_penalty: f64,
75    /// Application specific score. Can be manipulated by calling PeerScore::set_application_score
76    application_score: f64,
77    /// Scoring based on how whether this peer consumes messages fast enough or not.
78    slow_peer_penalty: f64,
79}
80
81enum ConnectionStatus {
82    /// The peer is connected.
83    Connected,
84    /// The peer is disconnected
85    Disconnected {
86        /// Expiration time of the score state for disconnected peers.
87        expire: Instant,
88    },
89}
90
91impl Default for PeerStats {
92    fn default() -> Self {
93        PeerStats {
94            status: ConnectionStatus::Connected,
95            topics: HashMap::new(),
96            known_ips: HashSet::new(),
97            behaviour_penalty: 0f64,
98            application_score: 0f64,
99            slow_peer_penalty: 0f64,
100        }
101    }
102}
103
104impl PeerStats {
105    /// Returns a mutable reference to topic stats if they exist, otherwise if the supplied
106    /// parameters score the topic, inserts the default stats and returns a reference to those.
107    /// If neither apply, returns None.
108    pub(crate) fn stats_or_default_mut(
109        &mut self,
110        topic_hash: TopicHash,
111        params: &PeerScoreParams,
112    ) -> Option<&mut TopicStats> {
113        if params.topics.contains_key(&topic_hash) {
114            Some(self.topics.entry(topic_hash).or_default())
115        } else {
116            self.topics.get_mut(&topic_hash)
117        }
118    }
119}
120
121/// Stats assigned to peer for each topic.
122struct TopicStats {
123    mesh_status: MeshStatus,
124    /// Number of first message deliveries.
125    first_message_deliveries: f64,
126    /// True if the peer has been in the mesh for enough time to activate mesh message deliveries.
127    mesh_message_deliveries_active: bool,
128    /// Number of message deliveries from the mesh.
129    mesh_message_deliveries: f64,
130    /// Mesh rate failure penalty.
131    mesh_failure_penalty: f64,
132    /// Invalid message counter.
133    invalid_message_deliveries: f64,
134}
135
136impl TopicStats {
137    /// Returns true if the peer is in the `mesh`.
138    pub(crate) fn in_mesh(&self) -> bool {
139        matches!(self.mesh_status, MeshStatus::Active { .. })
140    }
141}
142
143/// Status defining a peer's inclusion in the mesh and associated parameters.
144enum MeshStatus {
145    Active {
146        /// The time the peer was last GRAFTed;
147        graft_time: Instant,
148        /// The time the peer has been in the mesh.
149        mesh_time: Duration,
150    },
151    InActive,
152}
153
154impl MeshStatus {
155    /// Initialises a new [`MeshStatus::Active`] mesh status.
156    pub(crate) fn new_active() -> Self {
157        MeshStatus::Active {
158            graft_time: Instant::now(),
159            mesh_time: Duration::from_secs(0),
160        }
161    }
162}
163
164impl Default for TopicStats {
165    fn default() -> Self {
166        TopicStats {
167            mesh_status: MeshStatus::InActive,
168            first_message_deliveries: Default::default(),
169            mesh_message_deliveries_active: Default::default(),
170            mesh_message_deliveries: Default::default(),
171            mesh_failure_penalty: Default::default(),
172            invalid_message_deliveries: Default::default(),
173        }
174    }
175}
176
177#[derive(PartialEq, Debug)]
178struct DeliveryRecord {
179    status: DeliveryStatus,
180    first_seen: Instant,
181    peers: HashSet<PeerId>,
182}
183
184#[derive(PartialEq, Debug)]
185enum DeliveryStatus {
186    /// Don't know (yet) if the message is valid.
187    Unknown,
188    /// The message is valid together with the validated time.
189    Valid(Instant),
190    /// The message is invalid.
191    Invalid,
192    /// Instructed by the validator to ignore the message.
193    Ignored,
194}
195
196impl Default for DeliveryRecord {
197    fn default() -> Self {
198        DeliveryRecord {
199            status: DeliveryStatus::Unknown,
200            first_seen: Instant::now(),
201            peers: HashSet::new(),
202        }
203    }
204}
205
206impl PeerScore {
207    /// Creates a new [`PeerScore`] using a given set of peer scoring parameters.
208    #[allow(dead_code)]
209    pub(crate) fn new(params: PeerScoreParams) -> Self {
210        Self::new_with_message_delivery_time_callback(params, None)
211    }
212
213    pub(crate) fn new_with_message_delivery_time_callback(
214        params: PeerScoreParams,
215        callback: Option<fn(&PeerId, &TopicHash, f64)>,
216    ) -> Self {
217        PeerScore {
218            params,
219            peer_stats: HashMap::new(),
220            peer_ips: HashMap::new(),
221            deliveries: TimeCache::new(Duration::from_secs(TIME_CACHE_DURATION)),
222            message_delivery_time_callback: callback,
223        }
224    }
225
226    /// Returns the score for a peer
227    pub(crate) fn score(&self, peer_id: &PeerId) -> f64 {
228        self.metric_score(peer_id, None)
229    }
230
231    /// Returns the score for a peer, logging metrics. This is called from the heartbeat and
232    /// increments the metric counts for penalties.
233    pub(crate) fn metric_score(&self, peer_id: &PeerId, mut metrics: Option<&mut Metrics>) -> f64 {
234        let Some(peer_stats) = self.peer_stats.get(peer_id) else {
235            return 0.0;
236        };
237        let mut score = 0.0;
238
239        // topic scores
240        for (topic, topic_stats) in peer_stats.topics.iter() {
241            // topic parameters
242            if let Some(topic_params) = self.params.topics.get(topic) {
243                // we are tracking the topic
244
245                // the topic score
246                let mut topic_score = 0.0;
247
248                // P1: time in mesh
249                if let MeshStatus::Active { mesh_time, .. } = topic_stats.mesh_status {
250                    let p1 = {
251                        let v = mesh_time.as_secs_f64()
252                            / topic_params.time_in_mesh_quantum.as_secs_f64();
253                        if v < topic_params.time_in_mesh_cap {
254                            v
255                        } else {
256                            topic_params.time_in_mesh_cap
257                        }
258                    };
259                    topic_score += p1 * topic_params.time_in_mesh_weight;
260                }
261
262                // P2: first message deliveries
263                let p2 = {
264                    let v = topic_stats.first_message_deliveries;
265                    if v < topic_params.first_message_deliveries_cap {
266                        v
267                    } else {
268                        topic_params.first_message_deliveries_cap
269                    }
270                };
271                topic_score += p2 * topic_params.first_message_deliveries_weight;
272
273                // P3: mesh message deliveries
274                if topic_stats.mesh_message_deliveries_active
275                    && topic_stats.mesh_message_deliveries
276                        < topic_params.mesh_message_deliveries_threshold
277                {
278                    let deficit = topic_params.mesh_message_deliveries_threshold
279                        - topic_stats.mesh_message_deliveries;
280                    let p3 = deficit * deficit;
281                    topic_score += p3 * topic_params.mesh_message_deliveries_weight;
282                    if let Some(metrics) = metrics.as_mut() {
283                        metrics.register_score_penalty(Penalty::MessageDeficit);
284                    }
285                    tracing::debug!(
286                        peer=%peer_id,
287                        %topic,
288                        %deficit,
289                        penalty=%topic_score,
290                        "[Penalty] The peer has a mesh deliveries deficit and will be penalized"
291                    );
292                }
293
294                // P3b:
295                // NOTE: the weight of P3b is negative (validated in TopicScoreParams.validate), so
296                // this detracts.
297                let p3b = topic_stats.mesh_failure_penalty;
298                topic_score += p3b * topic_params.mesh_failure_penalty_weight;
299
300                // P4: invalid messages
301                // NOTE: the weight of P4 is negative (validated in TopicScoreParams.validate), so
302                // this detracts.
303                let p4 =
304                    topic_stats.invalid_message_deliveries * topic_stats.invalid_message_deliveries;
305                topic_score += p4 * topic_params.invalid_message_deliveries_weight;
306
307                // update score, mixing with topic weight
308                score += topic_score * topic_params.topic_weight;
309            }
310        }
311
312        // apply the topic score cap, if any
313        if self.params.topic_score_cap > 0f64 && score > self.params.topic_score_cap {
314            score = self.params.topic_score_cap;
315        }
316
317        // P5: application-specific score
318        let p5 = peer_stats.application_score;
319        score += p5 * self.params.app_specific_weight;
320
321        // P6: IP collocation factor
322        for ip in peer_stats.known_ips.iter() {
323            if self.params.ip_colocation_factor_whitelist.contains(ip) {
324                continue;
325            }
326
327            // P6 has a cliff (ip_colocation_factor_threshold); it's only applied iff
328            // at least that many peers are connected to us from that source IP
329            // addr. It is quadratic, and the weight is negative (validated by
330            // peer_score_params.validate()).
331            if let Some(peers_in_ip) = self.peer_ips.get(ip).map(|peers| peers.len()) {
332                if (peers_in_ip as f64) > self.params.ip_colocation_factor_threshold {
333                    let surplus = (peers_in_ip as f64) - self.params.ip_colocation_factor_threshold;
334                    let p6 = surplus * surplus;
335                    if let Some(metrics) = metrics.as_mut() {
336                        metrics.register_score_penalty(Penalty::IPColocation);
337                    }
338                    tracing::debug!(
339                        peer=%peer_id,
340                        surplus_ip=%ip,
341                        surplus=%surplus,
342                        "[Penalty] The peer gets penalized because of too many peers with the same ip"
343                    );
344                    score += p6 * self.params.ip_colocation_factor_weight;
345                }
346            }
347        }
348
349        // P7: behavioural pattern penalty.
350        if peer_stats.behaviour_penalty > self.params.behaviour_penalty_threshold {
351            let excess = peer_stats.behaviour_penalty - self.params.behaviour_penalty_threshold;
352            let p7 = excess * excess;
353            score += p7 * self.params.behaviour_penalty_weight;
354        }
355
356        // Slow peer weighting.
357        if peer_stats.slow_peer_penalty > self.params.slow_peer_threshold {
358            let excess = peer_stats.slow_peer_penalty - self.params.slow_peer_threshold;
359            score += excess * self.params.slow_peer_weight;
360        }
361
362        score
363    }
364
365    pub(crate) fn add_penalty(&mut self, peer_id: &PeerId, count: usize) {
366        if let Some(peer_stats) = self.peer_stats.get_mut(peer_id) {
367            tracing::debug!(
368                peer=%peer_id,
369                %count,
370                "[Penalty] Behavioral penalty for peer"
371            );
372            peer_stats.behaviour_penalty += count as f64;
373        }
374    }
375
376    fn remove_ips_for_peer(
377        peer_stats: &PeerStats,
378        peer_ips: &mut HashMap<IpAddr, HashSet<PeerId>>,
379        peer_id: &PeerId,
380    ) {
381        for ip in peer_stats.known_ips.iter() {
382            if let Some(peer_set) = peer_ips.get_mut(ip) {
383                peer_set.remove(peer_id);
384            }
385        }
386    }
387
388    pub(crate) fn refresh_scores(&mut self) {
389        let now = Instant::now();
390        let params_ref = &self.params;
391        let peer_ips_ref = &mut self.peer_ips;
392        self.peer_stats.retain(|peer_id, peer_stats| {
393            if let ConnectionStatus::Disconnected { expire } = peer_stats.status {
394                // has the retention period expired?
395                if now > expire {
396                    // yes, throw it away (but clean up the IP tracking first)
397                    Self::remove_ips_for_peer(peer_stats, peer_ips_ref, peer_id);
398                    // re address this, use retain or entry
399                    return false;
400                }
401
402                // we don't decay retained scores, as the peer is not active.
403                // this way the peer cannot reset a negative score by simply disconnecting and
404                // reconnecting, unless the retention period has elapsed.
405                // similarly, a well behaved peer does not lose its score by getting disconnected.
406                return true;
407            }
408
409            for (topic, topic_stats) in peer_stats.topics.iter_mut() {
410                // the topic parameters
411                if let Some(topic_params) = params_ref.topics.get(topic) {
412                    // decay counters
413                    topic_stats.first_message_deliveries *=
414                        topic_params.first_message_deliveries_decay;
415                    if topic_stats.first_message_deliveries < params_ref.decay_to_zero {
416                        topic_stats.first_message_deliveries = 0.0;
417                    }
418                    topic_stats.mesh_message_deliveries *=
419                        topic_params.mesh_message_deliveries_decay;
420                    if topic_stats.mesh_message_deliveries < params_ref.decay_to_zero {
421                        topic_stats.mesh_message_deliveries = 0.0;
422                    }
423                    topic_stats.mesh_failure_penalty *= topic_params.mesh_failure_penalty_decay;
424                    if topic_stats.mesh_failure_penalty < params_ref.decay_to_zero {
425                        topic_stats.mesh_failure_penalty = 0.0;
426                    }
427                    topic_stats.invalid_message_deliveries *=
428                        topic_params.invalid_message_deliveries_decay;
429                    if topic_stats.invalid_message_deliveries < params_ref.decay_to_zero {
430                        topic_stats.invalid_message_deliveries = 0.0;
431                    }
432                    // update mesh time and activate mesh message delivery parameter if need be
433                    if let MeshStatus::Active {
434                        ref mut mesh_time,
435                        ref mut graft_time,
436                    } = topic_stats.mesh_status
437                    {
438                        *mesh_time = now.duration_since(*graft_time);
439                        if *mesh_time > topic_params.mesh_message_deliveries_activation {
440                            topic_stats.mesh_message_deliveries_active = true;
441                        }
442                    }
443                }
444            }
445
446            // decay P7 counter
447            peer_stats.behaviour_penalty *= params_ref.behaviour_penalty_decay;
448            if peer_stats.behaviour_penalty < params_ref.decay_to_zero {
449                peer_stats.behaviour_penalty = 0.0;
450            }
451
452            // decay slow peer score
453            peer_stats.slow_peer_penalty *= params_ref.slow_peer_decay;
454            if peer_stats.slow_peer_penalty < params_ref.decay_to_zero {
455                peer_stats.slow_peer_penalty = 0.0;
456            }
457
458            true
459        });
460    }
461
462    /// Adds a connected peer to [`PeerScore`], initialising with empty ips (ips get added later
463    /// through add_ip.
464    pub(crate) fn add_peer(&mut self, peer_id: PeerId) {
465        let peer_stats = self.peer_stats.entry(peer_id).or_default();
466
467        // mark the peer as connected
468        peer_stats.status = ConnectionStatus::Connected;
469    }
470
471    /// Adds a new ip to a peer, if the peer is not yet known creates a new peer_stats entry for it
472    pub(crate) fn add_ip(&mut self, peer_id: &PeerId, ip: IpAddr) {
473        tracing::trace!(peer=%peer_id, %ip, "Add ip for peer");
474        let peer_stats = self.peer_stats.entry(*peer_id).or_default();
475
476        // Mark the peer as connected (currently the default is connected, but we don't want to
477        // rely on the default).
478        peer_stats.status = ConnectionStatus::Connected;
479
480        // Insert the ip
481        peer_stats.known_ips.insert(ip);
482        self.peer_ips.entry(ip).or_default().insert(*peer_id);
483    }
484
485    /// Indicate that a peer has been too slow to consume a message.
486    pub(crate) fn failed_message_slow_peer(&mut self, peer_id: &PeerId) {
487        if let Some(peer_stats) = self.peer_stats.get_mut(peer_id) {
488            peer_stats.slow_peer_penalty += 1.0;
489            tracing::debug!(peer=%peer_id, %peer_stats.slow_peer_penalty, "[Penalty] Expired message penalty.");
490        }
491    }
492
493    /// Removes an ip from a peer
494    pub(crate) fn remove_ip(&mut self, peer_id: &PeerId, ip: &IpAddr) {
495        if let Some(peer_stats) = self.peer_stats.get_mut(peer_id) {
496            peer_stats.known_ips.remove(ip);
497            if let Some(peer_ids) = self.peer_ips.get_mut(ip) {
498                tracing::trace!(peer=%peer_id, %ip, "Remove ip for peer");
499                peer_ids.remove(peer_id);
500            } else {
501                tracing::trace!(
502                    peer=%peer_id,
503                    %ip,
504                    "No entry in peer_ips for ip which should get removed for peer"
505                );
506            }
507        } else {
508            tracing::trace!(
509                peer=%peer_id,
510                %ip,
511                "No peer_stats for peer which should remove the ip"
512            );
513        }
514    }
515
516    /// Removes a peer from the score table. This retains peer statistics if their score is
517    /// non-positive.
518    pub(crate) fn remove_peer(&mut self, peer_id: &PeerId) {
519        // we only retain non-positive scores of peers
520        if self.score(peer_id) > 0f64 {
521            if let hash_map::Entry::Occupied(entry) = self.peer_stats.entry(*peer_id) {
522                Self::remove_ips_for_peer(entry.get(), &mut self.peer_ips, peer_id);
523                entry.remove();
524            }
525            return;
526        }
527
528        // if the peer is retained (including it's score) the `first_message_delivery` counters
529        // are reset to 0 and mesh delivery penalties applied.
530        if let Some(peer_stats) = self.peer_stats.get_mut(peer_id) {
531            for (topic, topic_stats) in peer_stats.topics.iter_mut() {
532                topic_stats.first_message_deliveries = 0f64;
533
534                if let Some(threshold) = self
535                    .params
536                    .topics
537                    .get(topic)
538                    .map(|param| param.mesh_message_deliveries_threshold)
539                {
540                    if topic_stats.in_mesh()
541                        && topic_stats.mesh_message_deliveries_active
542                        && topic_stats.mesh_message_deliveries < threshold
543                    {
544                        let deficit = threshold - topic_stats.mesh_message_deliveries;
545                        topic_stats.mesh_failure_penalty += deficit * deficit;
546                    }
547                }
548
549                topic_stats.mesh_status = MeshStatus::InActive;
550                topic_stats.mesh_message_deliveries_active = false;
551            }
552
553            peer_stats.status = ConnectionStatus::Disconnected {
554                expire: Instant::now() + self.params.retain_score,
555            };
556        }
557    }
558
559    /// Handles scoring functionality as a peer GRAFTs to a topic.
560    pub(crate) fn graft(&mut self, peer_id: &PeerId, topic: impl Into<TopicHash>) {
561        let topic = topic.into();
562        if let Some(peer_stats) = self.peer_stats.get_mut(peer_id) {
563            // if we are scoring the topic, update the mesh status.
564            if let Some(topic_stats) = peer_stats.stats_or_default_mut(topic, &self.params) {
565                topic_stats.mesh_status = MeshStatus::new_active();
566                topic_stats.mesh_message_deliveries_active = false;
567            }
568        }
569    }
570
571    /// Handles scoring functionality as a peer PRUNEs from a topic.
572    pub(crate) fn prune(&mut self, peer_id: &PeerId, topic: TopicHash) {
573        if let Some(peer_stats) = self.peer_stats.get_mut(peer_id) {
574            // if we are scoring the topic, update the mesh status.
575            if let Some(topic_stats) = peer_stats.stats_or_default_mut(topic.clone(), &self.params)
576            {
577                // sticky mesh delivery rate failure penalty
578                let threshold = self
579                    .params
580                    .topics
581                    .get(&topic)
582                    .expect("Topic must exist in order for there to be topic stats")
583                    .mesh_message_deliveries_threshold;
584                if topic_stats.mesh_message_deliveries_active
585                    && topic_stats.mesh_message_deliveries < threshold
586                {
587                    let deficit = threshold - topic_stats.mesh_message_deliveries;
588                    topic_stats.mesh_failure_penalty += deficit * deficit;
589                }
590                topic_stats.mesh_message_deliveries_active = false;
591                topic_stats.mesh_status = MeshStatus::InActive;
592            }
593        }
594    }
595
596    pub(crate) fn validate_message(
597        &mut self,
598        from: &PeerId,
599        msg_id: &MessageId,
600        topic_hash: &TopicHash,
601    ) {
602        // adds an empty record with the message id
603        self.deliveries.entry(msg_id.clone()).or_default();
604
605        if let Some(callback) = self.message_delivery_time_callback {
606            if self
607                .peer_stats
608                .get(from)
609                .and_then(|s| s.topics.get(topic_hash))
610                .map(|ts| ts.in_mesh())
611                .unwrap_or(false)
612            {
613                callback(from, topic_hash, 0.0);
614            }
615        }
616    }
617
618    pub(crate) fn deliver_message(
619        &mut self,
620        from: &PeerId,
621        msg_id: &MessageId,
622        topic_hash: &TopicHash,
623    ) {
624        self.mark_first_message_delivery(from, topic_hash);
625
626        let record = self.deliveries.entry(msg_id.clone()).or_default();
627
628        // this should be the first delivery trace
629        if record.status != DeliveryStatus::Unknown {
630            tracing::warn!(
631                peer=%from,
632                status=?record.status,
633                first_seen=?record.first_seen.elapsed().as_secs(),
634                "Unexpected delivery trace"
635            );
636            return;
637        }
638
639        // mark the message as valid and reward mesh peers that have already forwarded it to us
640        record.status = DeliveryStatus::Valid(Instant::now());
641        for peer in record.peers.iter().cloned().collect::<Vec<_>>() {
642            // this check is to make sure a peer can't send us a message twice and get a double
643            // count if it is a first delivery
644            if &peer != from {
645                self.mark_duplicate_message_delivery(&peer, topic_hash, None);
646            }
647        }
648    }
649
650    /// Similar to `reject_message` except does not require the message id or reason for an invalid
651    /// message.
652    pub(crate) fn reject_invalid_message(&mut self, from: &PeerId, topic_hash: &TopicHash) {
653        tracing::debug!(
654            peer=%from,
655            "[Penalty] Message from peer rejected because of ValidationError or SelfOrigin"
656        );
657
658        self.mark_invalid_message_delivery(from, topic_hash);
659    }
660
661    // Reject a message.
662    pub(crate) fn reject_message(
663        &mut self,
664        from: &PeerId,
665        msg_id: &MessageId,
666        topic_hash: &TopicHash,
667        reason: RejectReason,
668    ) {
669        match reason {
670            // these messages are not tracked, but the peer is penalized as they are invalid
671            RejectReason::ValidationError(_) | RejectReason::SelfOrigin => {
672                self.reject_invalid_message(from, topic_hash);
673                return;
674            }
675            // we ignore those messages, so do nothing.
676            RejectReason::BlackListedPeer | RejectReason::BlackListedSource => {
677                return;
678            }
679            _ => {} // the rest are handled after record creation
680        }
681
682        let peers: Vec<_> = {
683            let record = self.deliveries.entry(msg_id.clone()).or_default();
684
685            // Multiple peers can now reject the same message as we track which peers send us the
686            // message. If we have already updated the status, return.
687            if record.status != DeliveryStatus::Unknown {
688                return;
689            }
690
691            if let RejectReason::ValidationIgnored = reason {
692                // we were explicitly instructed by the validator to ignore the message but not
693                // penalize the peer
694                record.status = DeliveryStatus::Ignored;
695                record.peers.clear();
696                return;
697            }
698
699            // mark the message as invalid and penalize peers that have already forwarded it.
700            record.status = DeliveryStatus::Invalid;
701            // release the delivery time tracking map to free some memory early
702            record.peers.drain().collect()
703        };
704
705        self.mark_invalid_message_delivery(from, topic_hash);
706        for peer_id in peers.iter() {
707            self.mark_invalid_message_delivery(peer_id, topic_hash)
708        }
709    }
710
711    pub(crate) fn duplicated_message(
712        &mut self,
713        from: &PeerId,
714        msg_id: &MessageId,
715        topic_hash: &TopicHash,
716    ) {
717        let record = self.deliveries.entry(msg_id.clone()).or_default();
718
719        if record.peers.contains(from) {
720            // we have already seen this duplicate!
721            return;
722        }
723
724        if let Some(callback) = self.message_delivery_time_callback {
725            let time = if let DeliveryStatus::Valid(validated) = record.status {
726                validated.elapsed().as_secs_f64()
727            } else {
728                0.0
729            };
730            if self
731                .peer_stats
732                .get(from)
733                .and_then(|s| s.topics.get(topic_hash))
734                .map(|ts| ts.in_mesh())
735                .unwrap_or(false)
736            {
737                callback(from, topic_hash, time);
738            }
739        }
740
741        match record.status {
742            DeliveryStatus::Unknown => {
743                // the message is being validated; track the peer delivery and wait for
744                // the Deliver/Reject notification.
745                record.peers.insert(*from);
746            }
747            DeliveryStatus::Valid(validated) => {
748                // mark the peer delivery time to only count a duplicate delivery once.
749                record.peers.insert(*from);
750                self.mark_duplicate_message_delivery(from, topic_hash, Some(validated));
751            }
752            DeliveryStatus::Invalid => {
753                // we no longer track delivery time
754                self.mark_invalid_message_delivery(from, topic_hash);
755            }
756            DeliveryStatus::Ignored => {
757                // the message was ignored; do nothing (we don't know if it was valid)
758            }
759        }
760    }
761
762    /// Sets the application specific score for a peer. Returns true if the peer is the peer is
763    /// connected or if the score of the peer is not yet expired and false otherwise.
764    pub(crate) fn set_application_score(&mut self, peer_id: &PeerId, new_score: f64) -> bool {
765        if let Some(peer_stats) = self.peer_stats.get_mut(peer_id) {
766            peer_stats.application_score = new_score;
767            true
768        } else {
769            false
770        }
771    }
772
773    /// Sets scoring parameters for a topic.
774    pub(crate) fn set_topic_params(&mut self, topic_hash: TopicHash, params: TopicScoreParams) {
775        use hash_map::Entry::*;
776        match self.params.topics.entry(topic_hash.clone()) {
777            Occupied(mut entry) => {
778                let first_message_deliveries_cap = params.first_message_deliveries_cap;
779                let mesh_message_deliveries_cap = params.mesh_message_deliveries_cap;
780                let old_params = entry.insert(params);
781
782                if old_params.first_message_deliveries_cap > first_message_deliveries_cap {
783                    for stats in &mut self.peer_stats.values_mut() {
784                        if let Some(tstats) = stats.topics.get_mut(&topic_hash) {
785                            if tstats.first_message_deliveries > first_message_deliveries_cap {
786                                tstats.first_message_deliveries = first_message_deliveries_cap;
787                            }
788                        }
789                    }
790                }
791
792                if old_params.mesh_message_deliveries_cap > mesh_message_deliveries_cap {
793                    for stats in self.peer_stats.values_mut() {
794                        if let Some(tstats) = stats.topics.get_mut(&topic_hash) {
795                            if tstats.mesh_message_deliveries > mesh_message_deliveries_cap {
796                                tstats.mesh_message_deliveries = mesh_message_deliveries_cap;
797                            }
798                        }
799                    }
800                }
801            }
802            Vacant(entry) => {
803                entry.insert(params);
804            }
805        }
806    }
807
808    /// Returns a scoring parameters for a topic if existent.
809    pub(crate) fn get_topic_params(&self, topic_hash: &TopicHash) -> Option<&TopicScoreParams> {
810        self.params.topics.get(topic_hash)
811    }
812
813    /// Increments the "invalid message deliveries" counter for all scored topics the message
814    /// is published in.
815    fn mark_invalid_message_delivery(&mut self, peer_id: &PeerId, topic_hash: &TopicHash) {
816        if let Some(peer_stats) = self.peer_stats.get_mut(peer_id) {
817            if let Some(topic_stats) =
818                peer_stats.stats_or_default_mut(topic_hash.clone(), &self.params)
819            {
820                tracing::debug!(
821                    peer=%peer_id,
822                    topic=%topic_hash,
823                    "[Penalty] Peer delivered an invalid message in topic and gets penalized \
824                    for it",
825                );
826                topic_stats.invalid_message_deliveries += 1f64;
827            }
828        }
829    }
830
831    /// Increments the "first message deliveries" counter for all scored topics the message is
832    /// published in, as well as the "mesh message deliveries" counter, if the peer is in the
833    /// mesh for the topic.
834    fn mark_first_message_delivery(&mut self, peer_id: &PeerId, topic_hash: &TopicHash) {
835        if let Some(peer_stats) = self.peer_stats.get_mut(peer_id) {
836            if let Some(topic_stats) =
837                peer_stats.stats_or_default_mut(topic_hash.clone(), &self.params)
838            {
839                let cap = self
840                    .params
841                    .topics
842                    .get(topic_hash)
843                    .expect("Topic must exist if there are known topic_stats")
844                    .first_message_deliveries_cap;
845                topic_stats.first_message_deliveries =
846                    if topic_stats.first_message_deliveries + 1f64 > cap {
847                        cap
848                    } else {
849                        topic_stats.first_message_deliveries + 1f64
850                    };
851
852                if let MeshStatus::Active { .. } = topic_stats.mesh_status {
853                    let cap = self
854                        .params
855                        .topics
856                        .get(topic_hash)
857                        .expect("Topic must exist if there are known topic_stats")
858                        .mesh_message_deliveries_cap;
859
860                    topic_stats.mesh_message_deliveries =
861                        if topic_stats.mesh_message_deliveries + 1f64 > cap {
862                            cap
863                        } else {
864                            topic_stats.mesh_message_deliveries + 1f64
865                        };
866                }
867            }
868        }
869    }
870
871    /// Increments the "mesh message deliveries" counter for messages we've seen before, as long the
872    /// message was received within the P3 window.
873    fn mark_duplicate_message_delivery(
874        &mut self,
875        peer_id: &PeerId,
876        topic_hash: &TopicHash,
877        validated_time: Option<Instant>,
878    ) {
879        if let Some(peer_stats) = self.peer_stats.get_mut(peer_id) {
880            let now = if validated_time.is_some() {
881                Some(Instant::now())
882            } else {
883                None
884            };
885            if let Some(topic_stats) =
886                peer_stats.stats_or_default_mut(topic_hash.clone(), &self.params)
887            {
888                if let MeshStatus::Active { .. } = topic_stats.mesh_status {
889                    let topic_params = self
890                        .params
891                        .topics
892                        .get(topic_hash)
893                        .expect("Topic must exist if there are known topic_stats");
894
895                    // check against the mesh delivery window -- if the validated time is passed as
896                    // 0, then the message was received before we finished
897                    // validation and thus falls within the mesh
898                    // delivery window.
899                    let mut falls_in_mesh_deliver_window = true;
900                    if let Some(validated_time) = validated_time {
901                        if let Some(now) = &now {
902                            // should always be true
903                            let window_time = validated_time
904                                .checked_add(topic_params.mesh_message_deliveries_window)
905                                .unwrap_or(*now);
906                            if now > &window_time {
907                                falls_in_mesh_deliver_window = false;
908                            }
909                        }
910                    }
911
912                    if falls_in_mesh_deliver_window {
913                        let cap = topic_params.mesh_message_deliveries_cap;
914                        topic_stats.mesh_message_deliveries =
915                            if topic_stats.mesh_message_deliveries + 1f64 > cap {
916                                cap
917                            } else {
918                                topic_stats.mesh_message_deliveries + 1f64
919                            };
920                    }
921                }
922            }
923        }
924    }
925
926    pub(crate) fn mesh_message_deliveries(&self, peer: &PeerId, topic: &TopicHash) -> Option<f64> {
927        self.peer_stats
928            .get(peer)
929            .and_then(|s| s.topics.get(topic))
930            .map(|t| t.mesh_message_deliveries)
931    }
932}
933
934/// The reason a Gossipsub message has been rejected.
935#[derive(Clone, Copy)]
936pub(crate) enum RejectReason {
937    /// The message failed the configured validation during decoding.
938    ValidationError(ValidationError),
939    /// The message source is us.
940    SelfOrigin,
941    /// The peer that sent the message was blacklisted.
942    BlackListedPeer,
943    /// The source (from field) of the message was blacklisted.
944    BlackListedSource,
945    /// The validation was ignored.
946    ValidationIgnored,
947    /// The validation failed.
948    ValidationFailed,
949}