libp2p_gossipsub/
behaviour.rs

// Copyright 2020 Sigma Prime Pty Ltd.
//
// Permission is hereby granted, free of charge, to any person obtaining a
// copy of this software and associated documentation files (the "Software"),
// to deal in the Software without restriction, including without limitation
// the rights to use, copy, modify, merge, publish, distribute, sublicense,
// and/or sell copies of the Software, and to permit persons to whom the
// Software is furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.

use std::{
    cmp::{max, Ordering, Ordering::Equal},
    collections::{BTreeSet, HashMap, HashSet, VecDeque},
    fmt,
    fmt::Debug,
    net::IpAddr,
    task::{Context, Poll},
    time::Duration,
};

use futures::FutureExt;
use futures_timer::Delay;
use hashlink::LinkedHashMap;
use libp2p_core::{
    multiaddr::Protocol::{Ip4, Ip6},
    transport::PortUse,
    Endpoint, Multiaddr,
};
use libp2p_identity::{Keypair, PeerId};
use libp2p_swarm::{
    behaviour::{AddressChange, ConnectionClosed, ConnectionEstablished, FromSwarm},
    dial_opts::DialOpts,
    ConnectionDenied, ConnectionId, NetworkBehaviour, NotifyHandler, THandler, THandlerInEvent,
    THandlerOutEvent, ToSwarm,
};
use prometheus_client::registry::Registry;
use quick_protobuf::{MessageWrite, Writer};
use rand::{seq::SliceRandom, thread_rng};
use web_time::{Instant, SystemTime};

use crate::{
    backoff::BackoffStorage,
    config::{Config, ValidationMode},
    gossip_promises::GossipPromises,
    handler::{Handler, HandlerEvent, HandlerIn},
    mcache::MessageCache,
    metrics::{Churn, Config as MetricsConfig, Inclusion, Metrics, Penalty},
    peer_score::{PeerScore, PeerScoreParams, PeerScoreThresholds, RejectReason},
    protocol::SIGNING_PREFIX,
    rpc::Sender,
    rpc_proto::proto,
    subscription_filter::{AllowAllSubscriptionFilter, TopicSubscriptionFilter},
    time_cache::DuplicateCache,
    topic::{Hasher, Topic, TopicHash},
    transform::{DataTransform, IdentityTransform},
    types::{
        ControlAction, Graft, IDontWant, IHave, IWant, Message, MessageAcceptance, MessageId,
        PeerConnections, PeerInfo, PeerKind, Prune, RawMessage, RpcOut, Subscription,
        SubscriptionAction,
    },
    FailedMessages, PublishError, SubscriptionError, TopicScoreParams, ValidationError,
};

#[cfg(test)]
mod tests;

/// IDONTWANT cache capacity.
const IDONTWANT_CAP: usize = 10_000;

/// IDONTWANT timeout before removal.
const IDONTWANT_TIMEOUT: Duration = Duration::new(3, 0);

/// Determines if published messages should be signed or not.
///
/// Without signing, a number of privacy preserving modes can be selected.
///
/// NOTE: The default validation settings are to require signatures. The [`ValidationMode`]
/// should be updated in the [`Config`] to allow for unsigned messages.
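///
/// A minimal sketch of choosing an authenticity mode (not compiled as a doctest here), assuming
/// `libp2p_identity` is available:
///
/// ```ignore
/// use libp2p_identity::Keypair;
///
/// // Sign all published messages with a fresh Ed25519 key.
/// let signed = MessageAuthenticity::Signed(Keypair::generate_ed25519());
///
/// // Or publish unsigned messages under a fixed author (requires a relaxed `ValidationMode`).
/// let unsigned = MessageAuthenticity::Author(Keypair::generate_ed25519().public().to_peer_id());
/// ```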
#[derive(Clone)]
pub enum MessageAuthenticity {
    /// Message signing is enabled. The author will be the owner of the key and the sequence number
    /// will be linearly increasing.
    Signed(Keypair),
    /// Message signing is disabled.
    ///
    /// The specified [`PeerId`] will be used as the author of all published messages. The sequence
    /// number will be randomized.
    Author(PeerId),
    /// Message signing is disabled.
    ///
    /// A random [`PeerId`] will be used when publishing each message. The sequence number will be
    /// randomized.
    RandomAuthor,
    /// Message signing is disabled.
    ///
    /// The author of the message and the sequence numbers are excluded from the message.
    ///
    /// NOTE: Excluding these fields may cause these messages to be considered invalid by other
    /// nodes that enforce validation of these fields. See [`ValidationMode`] in the [`Config`]
    /// for how to customise this for rust-libp2p gossipsub. A custom `message_id`
    /// function will need to be set to prevent all messages from a peer being filtered
    /// as duplicates.
    Anonymous,
}

impl MessageAuthenticity {
    /// Returns true if signing is enabled.
    pub fn is_signing(&self) -> bool {
        matches!(self, MessageAuthenticity::Signed(_))
    }

    pub fn is_anonymous(&self) -> bool {
        matches!(self, MessageAuthenticity::Anonymous)
    }
}

/// Event that can be emitted by the gossipsub behaviour.
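///
/// A sketch of handling these events in an application's swarm loop (the `event` binding is
/// hypothetical; not compiled as a doctest here):
///
/// ```ignore
/// match event {
///     Event::Message { propagation_source, message_id, message } => {
///         println!("{propagation_source} delivered {message_id}: {} bytes", message.data.len());
///     }
///     Event::Subscribed { peer_id, topic } => println!("{peer_id} subscribed to {topic}"),
///     _ => {}
/// }
/// ```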
#[derive(Debug)]
pub enum Event {
    /// A message has been received.
    Message {
        /// The peer that forwarded us this message.
        propagation_source: PeerId,
        /// The [`MessageId`] of the message. This should be referenced by the application when
        /// validating a message (if required).
        message_id: MessageId,
        /// The decompressed message itself.
        message: Message,
    },
    /// A remote subscribed to a topic.
    Subscribed {
        /// Remote that has subscribed.
        peer_id: PeerId,
        /// The topic it has subscribed to.
        topic: TopicHash,
    },
    /// A remote unsubscribed from a topic.
    Unsubscribed {
        /// Remote that has unsubscribed.
        peer_id: PeerId,
        /// The topic it has unsubscribed from.
        topic: TopicHash,
    },
    /// A peer that does not support gossipsub has connected.
    GossipsubNotSupported { peer_id: PeerId },
    /// A peer is not able to download messages in time.
    SlowPeer {
        /// The peer_id
        peer_id: PeerId,
        /// The types and amounts of failed messages that are occurring for this peer.
        failed_messages: FailedMessages,
    },
}

/// A data structure for storing configuration for publishing messages. See [`MessageAuthenticity`]
/// for further details.
#[allow(clippy::large_enum_variant)]
enum PublishConfig {
    Signing {
        keypair: Keypair,
        author: PeerId,
        inline_key: Option<Vec<u8>>,
        last_seq_no: SequenceNumber,
    },
    Author(PeerId),
    RandomAuthor,
    Anonymous,
}

/// A strictly linearly increasing sequence number.
///
/// We start from the current time as a unix timestamp in nanoseconds.
#[derive(Debug)]
struct SequenceNumber(u64);

impl SequenceNumber {
    fn new() -> Self {
        let unix_timestamp = SystemTime::now()
            .duration_since(SystemTime::UNIX_EPOCH)
            .expect("time to be linear")
            .as_nanos();

        Self(unix_timestamp as u64)
    }

    fn next(&mut self) -> u64 {
        self.0 = self
            .0
            .checked_add(1)
            .expect("to not exhaust u64 space for sequence numbers");

        self.0
    }
}

impl PublishConfig {
    pub(crate) fn get_own_id(&self) -> Option<&PeerId> {
        match self {
            Self::Signing { author, .. } => Some(author),
            Self::Author(author) => Some(author),
            _ => None,
        }
    }
}

impl From<MessageAuthenticity> for PublishConfig {
    fn from(authenticity: MessageAuthenticity) -> Self {
        match authenticity {
            MessageAuthenticity::Signed(keypair) => {
                let public_key = keypair.public();
                let key_enc = public_key.encode_protobuf();
                let key = if key_enc.len() <= 42 {
                    // The public key can be inlined in [`rpc_proto::proto::Message::from`], so we
                    // don't include it specifically in the
                    // [`rpc_proto::proto::Message::key`] field.
                    None
                } else {
                    // Include the protobuf encoding of the public key in the message.
                    Some(key_enc)
                };

                PublishConfig::Signing {
                    keypair,
                    author: public_key.to_peer_id(),
                    inline_key: key,
                    last_seq_no: SequenceNumber::new(),
                }
            }
            MessageAuthenticity::Author(peer_id) => PublishConfig::Author(peer_id),
            MessageAuthenticity::RandomAuthor => PublishConfig::RandomAuthor,
            MessageAuthenticity::Anonymous => PublishConfig::Anonymous,
        }
    }
}

/// Network behaviour that handles the gossipsub protocol.
///
/// NOTE: Initialisation requires a [`MessageAuthenticity`] and [`Config`] instance. If
/// message signing is disabled, the [`ValidationMode`] in the config should be adjusted to an
/// appropriate level to accept unsigned messages.
///
/// The DataTransform trait allows applications to optionally add extra encoding/decoding
/// functionality to the underlying messages. This is intended for custom compression algorithms.
///
/// The TopicSubscriptionFilter allows applications to implement specific filters on topics to
/// prevent unwanted messages being propagated and evaluated.
pub struct Behaviour<D = IdentityTransform, F = AllowAllSubscriptionFilter> {
    /// Configuration providing gossipsub performance parameters.
    config: Config,

    /// Events that need to be yielded to the outside when polling.
    events: VecDeque<ToSwarm<Event, HandlerIn>>,

    /// Information used for publishing messages.
    publish_config: PublishConfig,

    /// An LRU Time cache for storing seen messages (based on their ID). This cache prevents
    /// duplicates from being propagated to the application and on the network.
    duplicate_cache: DuplicateCache<MessageId>,

    /// A set of connected peers, indexed by their [`PeerId`] tracking both the [`PeerKind`] and
    /// the set of [`ConnectionId`]s.
    connected_peers: HashMap<PeerId, PeerConnections>,

    /// A set of all explicit peers. These are peers that remain connected and we unconditionally
    /// forward messages to, outside of the scoring system.
    explicit_peers: HashSet<PeerId>,

    /// A list of peers that have been blacklisted by the user.
    /// Messages are not sent to and are rejected from these peers.
    blacklisted_peers: HashSet<PeerId>,

    /// Overlay network of connected peers - Maps topics to connected gossipsub peers.
    mesh: HashMap<TopicHash, BTreeSet<PeerId>>,

    /// Map of topics to list of peers that we publish to, but don't subscribe to.
    fanout: HashMap<TopicHash, BTreeSet<PeerId>>,

    /// The last publish time for fanout topics.
    fanout_last_pub: HashMap<TopicHash, Instant>,

    /// Storage for backoffs
    backoffs: BackoffStorage,

    /// Message cache for the last few heartbeats.
    mcache: MessageCache,

    /// Heartbeat interval stream.
    heartbeat: Delay,

    /// Number of heartbeats since the beginning of time; this allows us to amortize some resource
    /// clean up -- eg backoff clean up.
    heartbeat_ticks: u64,

    /// We remember all peers we found through peer exchange, since those peers are not considered
    /// as safe as randomly discovered outbound peers. This behaviour diverges from the go
    /// implementation to avoid possible love bombing attacks in PX. When a peer disconnects it is
    /// removed from this list, which may result in a true outbound rediscovery.
    px_peers: HashSet<PeerId>,

    /// Set of connected outbound peers (we only consider true outbound peers found through
    /// discovery and not by PX).
    outbound_peers: HashSet<PeerId>,

    /// Stores optional peer score data together with thresholds, decay interval and gossip
    /// promises.
    peer_score: Option<(PeerScore, PeerScoreThresholds, Delay)>,

    /// Counts the number of `IHAVE` received from each peer since the last heartbeat.
    count_received_ihave: HashMap<PeerId, usize>,

    /// Counts the number of `IWANT` that we sent to each peer since the last heartbeat.
    count_sent_iwant: HashMap<PeerId, usize>,

    /// Short term cache for published message ids. This is used for penalizing peers sending
    /// our own messages back if the messages are anonymous or use a random author.
    published_message_ids: DuplicateCache<MessageId>,

    /// The filter used to handle message subscriptions.
    subscription_filter: F,

    /// A general transformation function that can be applied to data received from the wire before
    /// calculating the message-id and sending to the application. This is designed to allow the
    /// user to implement arbitrary topic-based compression algorithms.
    data_transform: D,

    /// Keep track of a set of internal metrics relating to gossipsub.
    metrics: Option<Metrics>,

    /// Tracks the numbers of failed messages per peer-id.
    failed_messages: HashMap<PeerId, FailedMessages>,

    /// Tracks recently sent `IWANT` messages and checks if peers respond to them.
    gossip_promises: GossipPromises,
}

impl<D, F> Behaviour<D, F>
where
    D: DataTransform + Default,
    F: TopicSubscriptionFilter + Default,
{
    /// Creates a Gossipsub [`Behaviour`] struct given a set of parameters specified via a
    /// [`Config`]. This has no subscription filter and uses no compression.
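    ///
    /// A minimal construction sketch (not compiled as a doctest here), assuming the crate is
    /// depended on as `libp2p_gossipsub` and the default transform/filter type parameters:
    ///
    /// ```ignore
    /// use libp2p_gossipsub::{Behaviour, Config, MessageAuthenticity};
    /// use libp2p_identity::Keypair;
    ///
    /// let keypair = Keypair::generate_ed25519();
    /// let behaviour: Behaviour =
    ///     Behaviour::new(MessageAuthenticity::Signed(keypair), Config::default())
    ///         .expect("valid gossipsub configuration");
    /// ```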
    pub fn new(privacy: MessageAuthenticity, config: Config) -> Result<Self, &'static str> {
        Self::new_with_subscription_filter_and_transform(
            privacy,
            config,
            None,
            F::default(),
            D::default(),
        )
    }

    /// Creates a Gossipsub [`Behaviour`] struct given a set of parameters specified via a
    /// [`Config`]. This has no subscription filter and uses no compression.
    /// Metrics can be evaluated by passing a reference to a [`Registry`].
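    ///
    /// A sketch of wiring up metrics (not compiled as a doctest here); the metrics configuration
    /// is assumed to have a usable `Default`:
    ///
    /// ```ignore
    /// use libp2p_gossipsub::{Behaviour, Config, MessageAuthenticity};
    /// use libp2p_identity::Keypair;
    /// use prometheus_client::registry::Registry;
    ///
    /// let mut registry = Registry::default();
    /// let behaviour: Behaviour = Behaviour::new_with_metrics(
    ///     MessageAuthenticity::Signed(Keypair::generate_ed25519()),
    ///     Config::default(),
    ///     &mut registry,
    ///     Default::default(),
    /// )
    /// .expect("valid gossipsub configuration");
    /// ```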
    pub fn new_with_metrics(
        privacy: MessageAuthenticity,
        config: Config,
        metrics_registry: &mut Registry,
        metrics_config: MetricsConfig,
    ) -> Result<Self, &'static str> {
        Self::new_with_subscription_filter_and_transform(
            privacy,
            config,
            Some((metrics_registry, metrics_config)),
            F::default(),
            D::default(),
        )
    }
}

impl<D, F> Behaviour<D, F>
where
    D: DataTransform + Default,
    F: TopicSubscriptionFilter,
{
    /// Creates a Gossipsub [`Behaviour`] struct given a set of parameters specified via a
    /// [`Config`] and a custom subscription filter.
    pub fn new_with_subscription_filter(
        privacy: MessageAuthenticity,
        config: Config,
        metrics: Option<(&mut Registry, MetricsConfig)>,
        subscription_filter: F,
    ) -> Result<Self, &'static str> {
        Self::new_with_subscription_filter_and_transform(
            privacy,
            config,
            metrics,
            subscription_filter,
            D::default(),
        )
    }
}

impl<D, F> Behaviour<D, F>
where
    D: DataTransform,
    F: TopicSubscriptionFilter + Default,
{
    /// Creates a Gossipsub [`Behaviour`] struct given a set of parameters specified via a
    /// [`Config`] and a custom data transform.
    pub fn new_with_transform(
        privacy: MessageAuthenticity,
        config: Config,
        metrics: Option<(&mut Registry, MetricsConfig)>,
        data_transform: D,
    ) -> Result<Self, &'static str> {
        Self::new_with_subscription_filter_and_transform(
            privacy,
            config,
            metrics,
            F::default(),
            data_transform,
        )
    }
}

impl<D, F> Behaviour<D, F>
where
    D: DataTransform,
    F: TopicSubscriptionFilter,
{
    /// Creates a Gossipsub [`Behaviour`] struct given a set of parameters specified via a
    /// [`Config`] and a custom subscription filter and data transform.
    pub fn new_with_subscription_filter_and_transform(
        privacy: MessageAuthenticity,
        config: Config,
        metrics: Option<(&mut Registry, MetricsConfig)>,
        subscription_filter: F,
        data_transform: D,
    ) -> Result<Self, &'static str> {
        // Set up the router given the configuration settings.

        // We do not allow configurations where a published message would also be rejected if it
        // were received locally.
        validate_config(&privacy, config.validation_mode())?;

        Ok(Behaviour {
            metrics: metrics.map(|(registry, cfg)| Metrics::new(registry, cfg)),
            events: VecDeque::new(),
            publish_config: privacy.into(),
            duplicate_cache: DuplicateCache::new(config.duplicate_cache_time()),
            explicit_peers: HashSet::new(),
            blacklisted_peers: HashSet::new(),
            mesh: HashMap::new(),
            fanout: HashMap::new(),
            fanout_last_pub: HashMap::new(),
            backoffs: BackoffStorage::new(
                &config.prune_backoff(),
                config.heartbeat_interval(),
                config.backoff_slack(),
            ),
            mcache: MessageCache::new(config.history_gossip(), config.history_length()),
            heartbeat: Delay::new(config.heartbeat_interval() + config.heartbeat_initial_delay()),
            heartbeat_ticks: 0,
            px_peers: HashSet::new(),
            outbound_peers: HashSet::new(),
            peer_score: None,
            count_received_ihave: HashMap::new(),
            count_sent_iwant: HashMap::new(),
            connected_peers: HashMap::new(),
            published_message_ids: DuplicateCache::new(config.published_message_ids_cache_time()),
            config,
            subscription_filter,
            data_transform,
            failed_messages: Default::default(),
            gossip_promises: Default::default(),
        })
    }
}

impl<D, F> Behaviour<D, F>
where
    D: DataTransform + Send + 'static,
    F: TopicSubscriptionFilter + Send + 'static,
{
    /// Lists the hashes of the topics we are currently subscribed to.
    pub fn topics(&self) -> impl Iterator<Item = &TopicHash> {
        self.mesh.keys()
    }

    /// Lists all mesh peers for a certain topic hash.
    pub fn mesh_peers(&self, topic_hash: &TopicHash) -> impl Iterator<Item = &PeerId> {
        self.mesh.get(topic_hash).into_iter().flat_map(|x| x.iter())
    }

    pub fn all_mesh_peers(&self) -> impl Iterator<Item = &PeerId> {
        let mut res = BTreeSet::new();
        for peers in self.mesh.values() {
            res.extend(peers);
        }
        res.into_iter()
    }

    /// Lists all known peers and their associated subscribed topics.
    pub fn all_peers(&self) -> impl Iterator<Item = (&PeerId, Vec<&TopicHash>)> {
        self.connected_peers
            .iter()
            .map(|(peer_id, peer)| (peer_id, peer.topics.iter().collect()))
    }

    /// Lists all known peers and their associated protocol.
    pub fn peer_protocol(&self) -> impl Iterator<Item = (&PeerId, &PeerKind)> {
        self.connected_peers.iter().map(|(k, v)| (k, &v.kind))
    }

    /// Returns the gossipsub score for a given peer, if one exists.
    pub fn peer_score(&self, peer_id: &PeerId) -> Option<f64> {
        self.peer_score
            .as_ref()
            .map(|(score, ..)| score.score(peer_id))
    }

    /// Subscribe to a topic.
    ///
    /// Returns [`Ok(true)`] if the subscription worked. Returns [`Ok(false)`] if we were already
    /// subscribed.
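    ///
    /// A short usage sketch (not compiled as a doctest here), assuming a `behaviour` constructed
    /// as above and the crate's `IdentTopic` alias:
    ///
    /// ```ignore
    /// use libp2p_gossipsub::IdentTopic;
    ///
    /// let topic = IdentTopic::new("example-topic");
    /// // Ok(true) for a new subscription, Ok(false) if we were already subscribed.
    /// let newly_subscribed = behaviour.subscribe(&topic)?;
    /// ```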
    pub fn subscribe<H: Hasher>(&mut self, topic: &Topic<H>) -> Result<bool, SubscriptionError> {
        tracing::debug!(%topic, "Subscribing to topic");
        let topic_hash = topic.hash();
        if !self.subscription_filter.can_subscribe(&topic_hash) {
            return Err(SubscriptionError::NotAllowed);
        }

        if self.mesh.contains_key(&topic_hash) {
            tracing::debug!(%topic, "Topic is already in the mesh");
            return Ok(false);
        }

        // send subscription request to all peers
        for peer_id in self.connected_peers.keys().copied().collect::<Vec<_>>() {
            tracing::debug!(%peer_id, "Sending SUBSCRIBE to peer");
            let event = RpcOut::Subscribe(topic_hash.clone());
            self.send_message(peer_id, event);
        }

        // call JOIN(topic)
        // this will add new peers to the mesh for the topic
        self.join(&topic_hash);
        tracing::debug!(%topic, "Subscribed to topic");
        Ok(true)
    }

    /// Unsubscribes from a topic.
    ///
    /// Returns `true` if we were subscribed to this topic.
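    ///
    /// A short usage sketch (not compiled as a doctest here), mirroring the `subscribe` example:
    ///
    /// ```ignore
    /// let was_subscribed = behaviour.unsubscribe(&IdentTopic::new("example-topic"));
    /// ```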
    pub fn unsubscribe<H: Hasher>(&mut self, topic: &Topic<H>) -> bool {
        tracing::debug!(%topic, "Unsubscribing from topic");
        let topic_hash = topic.hash();

        if !self.mesh.contains_key(&topic_hash) {
            tracing::debug!(topic=%topic_hash, "Already unsubscribed from topic");
            // we are not subscribed
            return false;
        }

        // announce to all peers
        for peer in self.connected_peers.keys().copied().collect::<Vec<_>>() {
            tracing::debug!(%peer, "Sending UNSUBSCRIBE to peer");
            let event = RpcOut::Unsubscribe(topic_hash.clone());
            self.send_message(peer, event);
        }

        // call LEAVE(topic)
        // this will remove the topic from the mesh
        self.leave(&topic_hash);

        tracing::debug!(topic=%topic_hash, "Unsubscribed from topic");
        true
    }

    /// Publishes a message to the network on the given topic.
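    ///
    /// A short usage sketch (not compiled as a doctest here):
    ///
    /// ```ignore
    /// let topic = IdentTopic::new("example-topic");
    /// match behaviour.publish(topic, b"hello world".to_vec()) {
    ///     Ok(message_id) => tracing::info!(%message_id, "published"),
    ///     Err(e) => tracing::warn!("publish failed: {e:?}"),
    /// }
    /// ```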
    pub fn publish(
        &mut self,
        topic: impl Into<TopicHash>,
        data: impl Into<Vec<u8>>,
    ) -> Result<MessageId, PublishError> {
        let data = data.into();
        let topic = topic.into();

        // Transform the data before building a raw_message.
        let transformed_data = self
            .data_transform
            .outbound_transform(&topic, data.clone())?;

        // check that the size doesn't exceed the max transmission size.
        if transformed_data.len() > self.config.max_transmit_size() {
            return Err(PublishError::MessageTooLarge);
        }

        let raw_message = self.build_raw_message(topic, transformed_data)?;

        // calculate the message id from the un-transformed data
        let msg_id = self.config.message_id(&Message {
            source: raw_message.source,
            data, // the uncompressed form
            sequence_number: raw_message.sequence_number,
            topic: raw_message.topic.clone(),
        });

        // Check if the message has been published before
        if self.duplicate_cache.contains(&msg_id) {
            // This message has already been seen. We don't re-publish messages that have already
            // been published on the network.
            tracing::warn!(
                message=%msg_id,
                "Not publishing a message that has already been published"
            );
            return Err(PublishError::Duplicate);
        }

        tracing::trace!(message=%msg_id, "Publishing message");

        let topic_hash = raw_message.topic.clone();

        let mut peers_on_topic = self
            .connected_peers
            .iter()
            .filter(|(_, p)| p.topics.contains(&topic_hash))
            .map(|(peer_id, _)| peer_id)
            .peekable();

        if peers_on_topic.peek().is_none() {
            return Err(PublishError::NoPeersSubscribedToTopic);
        }

        let mut recipient_peers = HashSet::new();
        if self.config.flood_publish() {
            // Forward to all peers above score and all explicit peers
            recipient_peers.extend(peers_on_topic.filter(|p| {
                self.explicit_peers.contains(*p)
                    || !self.score_below_threshold(p, |ts| ts.publish_threshold).0
            }));
        } else {
            match self.mesh.get(&topic_hash) {
                // Mesh peers
                Some(mesh_peers) => {
                    // We have a mesh set. We want to make sure to publish to at least `mesh_n`
                    // peers (if possible).
                    let needed_extra_peers = self.config.mesh_n().saturating_sub(mesh_peers.len());

                    if needed_extra_peers > 0 {
                        // We don't have `mesh_n` peers in our mesh, we will randomly select extras
                        // and publish to them.

                        // Get a random set of peers that are appropriate to send messages to.
                        let peer_list = get_random_peers(
                            &self.connected_peers,
                            &topic_hash,
                            needed_extra_peers,
                            |peer| {
                                !mesh_peers.contains(peer)
                                    && !self.explicit_peers.contains(peer)
                                    && !self
                                        .score_below_threshold(peer, |pst| pst.publish_threshold)
                                        .0
                            },
                        );
                        recipient_peers.extend(peer_list);
                    }

                    recipient_peers.extend(mesh_peers);
                }
                // Gossipsub peers
                None => {
                    tracing::debug!(topic=%topic_hash, "Topic not in the mesh");
                    // `fanout_peers` is always non-empty if it's `Some`.
                    let fanout_peers = self
                        .fanout
                        .get(&topic_hash)
                        .filter(|peers| !peers.is_empty());
                    // If we have fanout peers add them to the map.
                    if let Some(peers) = fanout_peers {
                        for peer in peers {
                            recipient_peers.insert(*peer);
                        }
                    } else {
                        // We have no fanout peers, select mesh_n of them and add them to the fanout
                        let mesh_n = self.config.mesh_n();
                        let new_peers =
                            get_random_peers(&self.connected_peers, &topic_hash, mesh_n, {
                                |p| {
                                    !self.explicit_peers.contains(p)
                                        && !self
                                            .score_below_threshold(p, |pst| pst.publish_threshold)
                                            .0
                                }
                            });
                        // Add the new peers to the fanout and recipient peers
                        self.fanout.insert(topic_hash.clone(), new_peers.clone());
                        for peer in new_peers {
                            tracing::debug!(%peer, "Peer added to fanout");
                            recipient_peers.insert(peer);
                        }
                    }
                    // We are publishing to fanout peers - update the time we published
                    self.fanout_last_pub
                        .insert(topic_hash.clone(), Instant::now());
                }
            }

            // Explicit peers that are part of the topic
            recipient_peers
                .extend(peers_on_topic.filter(|peer_id| self.explicit_peers.contains(peer_id)));

            // Floodsub peers
            for (peer, connections) in &self.connected_peers {
                if connections.kind == PeerKind::Floodsub
                    && connections.topics.contains(&topic_hash)
                    && !self
                        .score_below_threshold(peer, |ts| ts.publish_threshold)
                        .0
                {
                    recipient_peers.insert(*peer);
                }
            }
        }

        // If the message isn't a duplicate and we have sent it to some peers add it to the
        // duplicate cache and memcache.
        self.duplicate_cache.insert(msg_id.clone());
        self.mcache.put(&msg_id, raw_message.clone());

        // Consider the message as delivered for gossip promises.
        self.gossip_promises.message_delivered(&msg_id);

        // If the message is anonymous or has a random author add it to the published message ids
        // cache.
        if let PublishConfig::RandomAuthor | PublishConfig::Anonymous = self.publish_config {
            if !self.config.allow_self_origin() {
                self.published_message_ids.insert(msg_id.clone());
            }
        }

        // Send to peers we know are subscribed to the topic.
        let mut publish_failed = true;
        for peer_id in recipient_peers.iter() {
            tracing::trace!(peer=%peer_id, "Sending message to peer");
            if self.send_message(
                *peer_id,
                RpcOut::Publish {
                    message: raw_message.clone(),
                    timeout: Delay::new(self.config.publish_queue_duration()),
                },
            ) {
                publish_failed = false
            }
        }

        if recipient_peers.is_empty() {
            return Err(PublishError::NoPeersSubscribedToTopic);
        }

        if publish_failed {
            return Err(PublishError::AllQueuesFull(recipient_peers.len()));
        }

        // Broadcast IDONTWANT messages
        if raw_message.raw_protobuf_len() > self.config.idontwant_message_size_threshold()
            && self.config.idontwant_on_publish()
        {
            self.send_idontwant(&raw_message, &msg_id, raw_message.source.as_ref());
        }

        tracing::debug!(message=%msg_id, "Published message");

        if let Some(metrics) = self.metrics.as_mut() {
            metrics.register_published_message(&topic_hash);
        }

        Ok(msg_id)
    }

    /// This function should be called when [`Config::validate_messages()`] is `true` after
    /// the message has been validated by the caller. Messages are stored in the [`MessageCache`]
    /// and validation is expected to be fast enough that the messages should still exist in the
    /// cache. There are three possible validation outcomes and the outcome is given in acceptance.
    ///
    /// If acceptance = [`MessageAcceptance::Accept`] the message will get propagated to the
    /// network. The `propagation_source` parameter indicates who the message was received by and
    /// will not be forwarded back to that peer.
    ///
    /// If acceptance = [`MessageAcceptance::Reject`] the message will be deleted from the memcache
    /// and the P₄ penalty will be applied to the `propagation_source`.
    ///
    /// If acceptance = [`MessageAcceptance::Ignore`] the message will be deleted from the memcache
    /// but no P₄ penalty will be applied.
    ///
    /// This function will return true if the message was found in the cache and false if it was
    /// no longer in the cache.
    ///
    /// This should only be called once per message.
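    ///
    /// A usage sketch (not compiled as a doctest here), assuming the application has finished
    /// validating a message received in an [`Event::Message`]:
    ///
    /// ```ignore
    /// let in_cache = behaviour.report_message_validation_result(
    ///     &message_id,
    ///     &propagation_source,
    ///     MessageAcceptance::Accept,
    /// );
    /// ```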
    pub fn report_message_validation_result(
        &mut self,
        msg_id: &MessageId,
        propagation_source: &PeerId,
        acceptance: MessageAcceptance,
    ) -> bool {
        let reject_reason = match acceptance {
            MessageAcceptance::Accept => {
                let (raw_message, originating_peers) = match self.mcache.validate(msg_id) {
                    Some((raw_message, originating_peers)) => {
                        (raw_message.clone(), originating_peers)
                    }
                    None => {
                        tracing::warn!(
                            message=%msg_id,
                            "Message not in cache. Ignoring forwarding"
                        );
                        if let Some(metrics) = self.metrics.as_mut() {
                            metrics.memcache_miss();
                        }
                        return false;
                    }
                };

                if let Some(metrics) = self.metrics.as_mut() {
                    metrics.register_msg_validation(&raw_message.topic, &acceptance);
                }

                self.forward_msg(
                    msg_id,
                    raw_message,
                    Some(propagation_source),
                    originating_peers,
                );
                return true;
            }
            MessageAcceptance::Reject => RejectReason::ValidationFailed,
            MessageAcceptance::Ignore => RejectReason::ValidationIgnored,
        };

        if let Some((raw_message, originating_peers)) = self.mcache.remove(msg_id) {
            if let Some(metrics) = self.metrics.as_mut() {
                metrics.register_msg_validation(&raw_message.topic, &acceptance);
            }

            // Tell peer_score about reject
            // Reject the original source, and any duplicates we've seen from other peers.
            if let Some((peer_score, ..)) = &mut self.peer_score {
                peer_score.reject_message(
                    propagation_source,
                    msg_id,
                    &raw_message.topic,
                    reject_reason,
                );
                for peer in originating_peers.iter() {
                    peer_score.reject_message(peer, msg_id, &raw_message.topic, reject_reason);
                }
            }
            true
        } else {
            tracing::warn!(message=%msg_id, "Rejected message not in cache");
            false
        }
    }

    /// Adds a new peer to the list of explicitly connected peers.
    pub fn add_explicit_peer(&mut self, peer_id: &PeerId) {
        tracing::debug!(peer=%peer_id, "Adding explicit peer");

        self.explicit_peers.insert(*peer_id);

        self.check_explicit_peer_connection(peer_id);
    }

    /// Removes the peer from the list of explicitly connected peers; note that this does not
    /// disconnect the peer.
    pub fn remove_explicit_peer(&mut self, peer_id: &PeerId) {
        tracing::debug!(peer=%peer_id, "Removing explicit peer");
        self.explicit_peers.remove(peer_id);
    }

    /// Blacklists a peer. All messages from this peer will be rejected and any message that was
    /// created by this peer will be rejected.
    pub fn blacklist_peer(&mut self, peer_id: &PeerId) {
        if self.blacklisted_peers.insert(*peer_id) {
            tracing::debug!(peer=%peer_id, "Peer has been blacklisted");
        }
    }

    /// Removes a peer from the blacklist if it has previously been blacklisted.
    pub fn remove_blacklisted_peer(&mut self, peer_id: &PeerId) {
        if self.blacklisted_peers.remove(peer_id) {
            tracing::debug!(peer=%peer_id, "Peer has been removed from the blacklist");
        }
    }

    /// Activates the peer scoring system with the given parameters. This will reset all scores
    /// if there was already another peer scoring system activated. Returns an error if the
    /// params are not valid or if they have already been set.
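    ///
    /// A usage sketch with the default parameters (not compiled as a doctest here):
    ///
    /// ```ignore
    /// use libp2p_gossipsub::{PeerScoreParams, PeerScoreThresholds};
    ///
    /// behaviour
    ///     .with_peer_score(PeerScoreParams::default(), PeerScoreThresholds::default())
    ///     .expect("valid score parameters");
    /// ```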
    pub fn with_peer_score(
        &mut self,
        params: PeerScoreParams,
        threshold: PeerScoreThresholds,
    ) -> Result<(), String> {
        self.with_peer_score_and_message_delivery_time_callback(params, threshold, None)
    }

    /// Activates the peer scoring system with the given parameters and a message delivery time
    /// callback. Returns an error if the parameters have already been set.
    pub fn with_peer_score_and_message_delivery_time_callback(
        &mut self,
        params: PeerScoreParams,
        threshold: PeerScoreThresholds,
        callback: Option<fn(&PeerId, &TopicHash, f64)>,
    ) -> Result<(), String> {
        params.validate()?;
        threshold.validate()?;

        if self.peer_score.is_some() {
            return Err("Peer score set twice".into());
        }

        let interval = Delay::new(params.decay_interval);
        let peer_score = PeerScore::new_with_message_delivery_time_callback(params, callback);
        self.peer_score = Some((peer_score, threshold, interval));
        Ok(())
    }

    /// Sets scoring parameters for a topic.
    ///
    /// [`Self::with_peer_score()`] must first be called to initialise peer scoring.
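    ///
    /// A usage sketch (not compiled as a doctest here):
    ///
    /// ```ignore
    /// use libp2p_gossipsub::{IdentTopic, TopicScoreParams};
    ///
    /// behaviour
    ///     .set_topic_params(IdentTopic::new("example-topic"), TopicScoreParams::default())
    ///     .expect("peer scoring must be initialised first");
    /// ```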
    pub fn set_topic_params<H: Hasher>(
        &mut self,
        topic: Topic<H>,
        params: TopicScoreParams,
    ) -> Result<(), &'static str> {
        if let Some((peer_score, ..)) = &mut self.peer_score {
            peer_score.set_topic_params(topic.hash(), params);
            Ok(())
        } else {
            Err("Peer score must be initialised with `with_peer_score()`")
        }
    }

    /// Returns the scoring parameters for a topic, if they exist.
    pub fn get_topic_params<H: Hasher>(&self, topic: &Topic<H>) -> Option<&TopicScoreParams> {
        self.peer_score.as_ref()?.0.get_topic_params(&topic.hash())
    }

    /// Sets the application-specific score for a peer. Returns true if scoring is active and
    /// the peer is connected, or if the peer's score has not yet expired; false otherwise.
    pub fn set_application_score(&mut self, peer_id: &PeerId, new_score: f64) -> bool {
        if let Some((peer_score, ..)) = &mut self.peer_score {
            peer_score.set_application_score(peer_id, new_score)
        } else {
            false
        }
    }

    /// Gossipsub JOIN(topic) - adds topic peers to mesh and sends them GRAFT messages.
    fn join(&mut self, topic_hash: &TopicHash) {
        tracing::debug!(topic=%topic_hash, "Running JOIN for topic");

        // if we are already in the mesh, return
        if self.mesh.contains_key(topic_hash) {
            tracing::debug!(topic=%topic_hash, "JOIN: The topic is already in the mesh, ignoring JOIN");
            return;
        }

        let mut added_peers = HashSet::new();

        if let Some(m) = self.metrics.as_mut() {
            m.joined(topic_hash)
        }

        // check if we have mesh_n peers in fanout[topic] and add them to the mesh if we do,
        // removing the fanout entry.
        if let Some((_, mut peers)) = self.fanout.remove_entry(topic_hash) {
            tracing::debug!(
                topic=%topic_hash,
                "JOIN: Removing peers from the fanout for topic"
            );

            // remove explicit peers, peers with negative scores, and backoffed peers
            peers.retain(|p| {
                !self.explicit_peers.contains(p)
                    && !self.score_below_threshold(p, |_| 0.0).0
                    && !self.backoffs.is_backoff_with_slack(topic_hash, p)
            });

            // Add up to mesh_n of them to the mesh
            // NOTE: These aren't randomly added, currently FIFO
            let add_peers = std::cmp::min(peers.len(), self.config.mesh_n());
            tracing::debug!(
                topic=%topic_hash,
                "JOIN: Adding {:?} peers from the fanout for topic",
                add_peers
            );
            added_peers.extend(peers.iter().take(add_peers));

            self.mesh.insert(
                topic_hash.clone(),
                peers.into_iter().take(add_peers).collect(),
            );

            // remove the last published time
            self.fanout_last_pub.remove(topic_hash);
        }

        let fanout_added = added_peers.len();
        if let Some(m) = self.metrics.as_mut() {
            m.peers_included(topic_hash, Inclusion::Fanout, fanout_added)
        }

        // check if we need to get more peers, which we randomly select
        if added_peers.len() < self.config.mesh_n() {
            // get the peers
            let new_peers = get_random_peers(
                &self.connected_peers,
                topic_hash,
                self.config.mesh_n() - added_peers.len(),
                |peer| {
                    !added_peers.contains(peer)
                        && !self.explicit_peers.contains(peer)
                        && !self.score_below_threshold(peer, |_| 0.0).0
                        && !self.backoffs.is_backoff_with_slack(topic_hash, peer)
                },
            );
            added_peers.extend(new_peers.clone());
            // add them to the mesh
            tracing::debug!(
                "JOIN: Inserting {:?} random peers into the mesh",
                new_peers.len()
            );
            let mesh_peers = self.mesh.entry(topic_hash.clone()).or_default();
            mesh_peers.extend(new_peers);
        }

        let random_added = added_peers.len() - fanout_added;
        if let Some(m) = self.metrics.as_mut() {
            m.peers_included(topic_hash, Inclusion::Random, random_added)
        }

        for peer_id in added_peers {
            // Send a GRAFT control message
            tracing::debug!(peer=%peer_id, "JOIN: Sending Graft message to peer");
            if let Some((peer_score, ..)) = &mut self.peer_score {
                peer_score.graft(&peer_id, topic_hash.clone());
            }
            self.send_message(
                peer_id,
                RpcOut::Graft(Graft {
                    topic_hash: topic_hash.clone(),
                }),
            );

            // If the peer did not previously exist in any mesh, inform the handler
            peer_added_to_mesh(
                peer_id,
                vec![topic_hash],
                &self.mesh,
                &mut self.events,
                &self.connected_peers,
            );
        }

        let mesh_peers = self.mesh_peers(topic_hash).count();
        if let Some(m) = self.metrics.as_mut() {
            m.set_mesh_peers(topic_hash, mesh_peers)
        }

        tracing::debug!(topic=%topic_hash, "Completed JOIN for topic");
    }

    /// Creates a PRUNE gossipsub action.
    fn make_prune(
        &mut self,
        topic_hash: &TopicHash,
        peer: &PeerId,
        do_px: bool,
        on_unsubscribe: bool,
    ) -> Prune {
        if let Some((peer_score, ..)) = &mut self.peer_score {
            peer_score.prune(peer, topic_hash.clone());
        }

        match self.connected_peers.get(peer).map(|v| &v.kind) {
            Some(PeerKind::Floodsub) => {
                tracing::error!("Attempted to prune a Floodsub peer");
            }
            Some(PeerKind::Gossipsub) => {
                // GossipSub v1.0 -- no peer exchange, the peer won't be able to parse it anyway
                return Prune {
                    topic_hash: topic_hash.clone(),
                    peers: Vec::new(),
                    backoff: None,
                };
            }
            None => {
                tracing::error!("Attempted to Prune an unknown peer");
            }
            _ => {} // Gossipsub v1.1 peers perform the `Prune`
1106        }
1107
1108        // Select peers for peer exchange
1109        let peers = if do_px {
1110            get_random_peers(
1111                &self.connected_peers,
1112                topic_hash,
1113                self.config.prune_peers(),
1114                |p| p != peer && !self.score_below_threshold(p, |_| 0.0).0,
1115            )
1116            .into_iter()
1117            .map(|p| PeerInfo { peer_id: Some(p) })
1118            .collect()
1119        } else {
1120            Vec::new()
1121        };
1122
1123        let backoff = if on_unsubscribe {
1124            self.config.unsubscribe_backoff()
1125        } else {
1126            self.config.prune_backoff()
1127        };
1128
1129        // update backoff
1130        self.backoffs.update_backoff(topic_hash, peer, backoff);
1131
1132        Prune {
1133            topic_hash: topic_hash.clone(),
1134            peers,
1135            backoff: Some(backoff.as_secs()),
1136        }
1137    }
1138
1139    /// Gossipsub LEAVE(topic) - Notifies mesh\[topic\] peers with PRUNE messages.
1140    fn leave(&mut self, topic_hash: &TopicHash) {
1141        tracing::debug!(topic=%topic_hash, "Running LEAVE for topic");
1142
1143        // If our mesh contains the topic, send prune to peers and delete it from the mesh
1144        if let Some((_, peers)) = self.mesh.remove_entry(topic_hash) {
1145            if let Some(m) = self.metrics.as_mut() {
1146                m.left(topic_hash)
1147            }
1148            for peer_id in peers {
1149                // Send a PRUNE control message
1150                tracing::debug!(%peer_id, "LEAVE: Sending PRUNE to peer");
1151
1152                let on_unsubscribe = true;
1153                let prune =
1154                    self.make_prune(topic_hash, &peer_id, self.config.do_px(), on_unsubscribe);
1155                self.send_message(peer_id, RpcOut::Prune(prune));
1156
1157                // If the peer did not previously exist in any mesh, inform the handler
1158                peer_removed_from_mesh(
1159                    peer_id,
1160                    topic_hash,
1161                    &self.mesh,
1162                    &mut self.events,
1163                    &self.connected_peers,
1164                );
1165            }
1166        }
1167        tracing::debug!(topic=%topic_hash, "Completed LEAVE for topic");
1168    }
1169
1170    /// Checks if the given peer is still connected and if not dials the peer again.
1171    fn check_explicit_peer_connection(&mut self, peer_id: &PeerId) {
1172        if !self.connected_peers.contains_key(peer_id) {
1173            // Connect to peer
1174            tracing::debug!(peer=%peer_id, "Connecting to explicit peer");
1175            self.events.push_back(ToSwarm::Dial {
1176                opts: DialOpts::peer_id(*peer_id).build(),
1177            });
1178        }
1179    }
1180
1181    /// Determines if a peer's score is below a given `PeerScoreThreshold` chosen via the
1182    /// `threshold` parameter.
1183    fn score_below_threshold(
1184        &self,
1185        peer_id: &PeerId,
1186        threshold: impl Fn(&PeerScoreThresholds) -> f64,
1187    ) -> (bool, f64) {
1188        Self::score_below_threshold_from_scores(&self.peer_score, peer_id, threshold)
1189    }
1190
1191    fn score_below_threshold_from_scores(
1192        peer_score: &Option<(PeerScore, PeerScoreThresholds, Delay)>,
1193        peer_id: &PeerId,
1194        threshold: impl Fn(&PeerScoreThresholds) -> f64,
1195    ) -> (bool, f64) {
1196        if let Some((peer_score, thresholds, ..)) = peer_score {
1197            let score = peer_score.score(peer_id);
1198            if score < threshold(thresholds) {
1199                return (true, score);
1200            }
1201            (false, score)
1202        } else {
1203            (false, 0.0)
1204        }
1205    }
1206
1207    /// Handles an IHAVE control message. Checks our cache of messages. If the message is unknown,
1208    /// requests it with an IWANT control message.
1209    fn handle_ihave(&mut self, peer_id: &PeerId, ihave_msgs: Vec<(TopicHash, Vec<MessageId>)>) {
1210        // We ignore IHAVE gossip from any peer whose score is below the gossip threshold
1211        if let (true, score) = self.score_below_threshold(peer_id, |pst| pst.gossip_threshold) {
1212            tracing::debug!(
1213                peer=%peer_id,
1214                %score,
1215                "IHAVE: ignoring peer with score below threshold"
1216            );
1217            return;
1218        }
1219
1220        // IHAVE flood protection
1221        let peer_have = self.count_received_ihave.entry(*peer_id).or_insert(0);
1222        *peer_have += 1;
1223        if *peer_have > self.config.max_ihave_messages() {
1224            tracing::debug!(
1225                peer=%peer_id,
1226                "IHAVE: peer has advertised too many times ({}) within this heartbeat \
1227            interval; ignoring",
1228                *peer_have
1229            );
1230            return;
1231        }
1232
1233        if let Some(iasked) = self.count_sent_iwant.get(peer_id) {
1234            if *iasked >= self.config.max_ihave_length() {
1235                tracing::debug!(
1236                    peer=%peer_id,
1237                    "IHAVE: peer has already advertised too many messages ({}); ignoring",
1238                    *iasked
1239                );
1240                return;
1241            }
1242        }
1243
1244        tracing::trace!(peer=%peer_id, "Handling IHAVE for peer");
1245
1246        let mut iwant_ids = HashSet::new();
1247
1248        let want_message = |id: &MessageId| {
1249            if self.duplicate_cache.contains(id) {
1250                return false;
1251            }
1252
1253            !self.gossip_promises.contains(id)
1254        };
1255
1256        for (topic, ids) in ihave_msgs {
1257            // only process the message if we are subscribed
1258            if !self.mesh.contains_key(&topic) {
1259                tracing::debug!(
1260                    %topic,
1261                    "IHAVE: Ignoring IHAVE - Not subscribed to topic"
1262                );
1263                continue;
1264            }
1265
1266            for id in ids.into_iter().filter(want_message) {
1267                // have not seen this message and are not currently requesting it
1268                if iwant_ids.insert(id) {
1269                    // Register the IWANT metric
1270                    if let Some(metrics) = self.metrics.as_mut() {
1271                        metrics.register_iwant(&topic);
1272                    }
1273                }
1274            }
1275        }
1276
1277        if !iwant_ids.is_empty() {
1278            let iasked = self.count_sent_iwant.entry(*peer_id).or_insert(0);
1279            let mut iask = iwant_ids.len();
1280            if *iasked + iask > self.config.max_ihave_length() {
1281                iask = self.config.max_ihave_length().saturating_sub(*iasked);
1282            }
1283
1284            // Send the list of IWANT control messages
1285            tracing::debug!(
1286                peer=%peer_id,
1287                "IHAVE: Asking for {} out of {} messages from peer",
1288                iask,
1289                iwant_ids.len()
1290            );
1291
1292            // Ask in random order
1293            let mut iwant_ids_vec: Vec<_> = iwant_ids.into_iter().collect();
1294            let mut rng = thread_rng();
1295            iwant_ids_vec.partial_shuffle(&mut rng, iask);
1296
1297            iwant_ids_vec.truncate(iask);
1298            *iasked += iask;
1299
1300            self.gossip_promises.add_promise(
1301                *peer_id,
1302                &iwant_ids_vec,
1303                Instant::now() + self.config.iwant_followup_time(),
1304            );
1305            tracing::trace!(
1306                peer=%peer_id,
1307                "IHAVE: Asking for the following messages from peer: {:?}",
1308                iwant_ids_vec
1309            );
1310
1311            self.send_message(
1312                *peer_id,
1313                RpcOut::IWant(IWant {
1314                    message_ids: iwant_ids_vec,
1315                }),
1316            );
1317        }
1318        tracing::trace!(peer=%peer_id, "Completed IHAVE handling for peer");
1319    }
1320
1321    /// Handles an IWANT control message. Checks our cache of messages. If the message exists it is
1322    /// forwarded to the requesting peer.
1323    fn handle_iwant(&mut self, peer_id: &PeerId, iwant_msgs: Vec<MessageId>) {
1324        // We ignore IWANT gossip from any peer whose score is below the gossip threshold
1325        if let (true, score) = self.score_below_threshold(peer_id, |pst| pst.gossip_threshold) {
1326            tracing::debug!(
1327                peer=%peer_id,
1328                "IWANT: ignoring peer with score below threshold [score = {}]",
1329                score
1330            );
1331            return;
1332        }
1333
1334        tracing::debug!(peer=%peer_id, "Handling IWANT for peer");
1335
1336        for id in iwant_msgs {
1337            // If we have it and the IHAVE count is not above the threshold,
1338            // forward the message.
1339            if let Some((msg, count)) = self
1340                .mcache
1341                .get_with_iwant_counts(&id, peer_id)
1342                .map(|(msg, count)| (msg.clone(), count))
1343            {
1344                if count > self.config.gossip_retransimission() {
1345                    tracing::debug!(
1346                        peer=%peer_id,
1347                        message=%id,
1348                        "IWANT: Peer has asked for message too many times; ignoring request"
1349                    );
1350                } else {
1351                    if let Some(peer) = self.connected_peers.get_mut(peer_id) {
1352                        if peer.dont_send.contains_key(&id) {
1353                            tracing::debug!(%peer_id, message=%id, "Peer already sent IDONTWANT for this message");
1354                            continue;
1355                        }
1356                    }
1357
1358                    tracing::debug!(peer=%peer_id, "IWANT: Sending cached messages to peer");
1359                    self.send_message(
1360                        *peer_id,
1361                        RpcOut::Forward {
1362                            message: msg,
1363                            timeout: Delay::new(self.config.forward_queue_duration()),
1364                        },
1365                    );
1366                }
1367            }
1368        }
1369        tracing::debug!(peer=%peer_id, "Completed IWANT handling for peer");
1370    }
1371
1372    /// Handles GRAFT control messages. If subscribed to the topic, adds the peer to mesh, if not,
1373    /// responds with PRUNE messages.
1374    fn handle_graft(&mut self, peer_id: &PeerId, topics: Vec<TopicHash>) {
1375        tracing::debug!(peer=%peer_id, "Handling GRAFT message for peer");
1376
1377        let mut to_prune_topics = HashSet::new();
1378
1379        let mut do_px = self.config.do_px();
1380
1381        let Some(connected_peer) = self.connected_peers.get_mut(peer_id) else {
1382            tracing::error!(peer_id = %peer_id, "Peer non-existent when handling graft");
1383            return;
1384        };
1385
1386        // For each topic, if a peer has grafted us, then we necessarily must be in their mesh
1387        // and they must be subscribed to the topic. Ensure we have recorded the mapping.
1388        for topic in &topics {
1389            if connected_peer.topics.insert(topic.clone()) {
1390                if let Some(m) = self.metrics.as_mut() {
1391                    m.inc_topic_peers(topic);
1392                }
1393            }
1394        }
1395
1396        // we don't GRAFT to/from explicit peers; complain loudly if this happens
1397        if self.explicit_peers.contains(peer_id) {
1398            tracing::warn!(peer=%peer_id, "GRAFT: ignoring request from direct peer");
1399            // this is possibly a bug from non-reciprocal configuration; send a PRUNE for all topics
1400            to_prune_topics = topics.into_iter().collect();
1401            // but don't PX
1402            do_px = false;
1403        } else {
1404            let (below_zero, score) = self.score_below_threshold(peer_id, |_| 0.0);
1405            let now = Instant::now();
1406            for topic_hash in topics {
1407                if let Some(peers) = self.mesh.get_mut(&topic_hash) {
1408                    // if the peer is already in the mesh ignore the graft
1409                    if peers.contains(peer_id) {
1410                        tracing::debug!(
1411                            peer=%peer_id,
1412                            topic=%&topic_hash,
1413                            "GRAFT: Received graft for peer that is already in topic"
1414                        );
1415                        continue;
1416                    }
1417
1418                    // make sure we are not backing off that peer
1419                    if let Some(backoff_time) = self.backoffs.get_backoff_time(&topic_hash, peer_id)
1420                    {
1421                        if backoff_time > now {
1422                            tracing::warn!(
1423                                peer=%peer_id,
1424                                "[Penalty] Peer attempted graft within backoff time, penalizing"
1425                            );
1426                            // add behavioural penalty
1427                            if let Some((peer_score, ..)) = &mut self.peer_score {
1428                                if let Some(metrics) = self.metrics.as_mut() {
1429                                    metrics.register_score_penalty(Penalty::GraftBackoff);
1430                                }
1431                                peer_score.add_penalty(peer_id, 1);
1432
1433                                // check the flood cutoff
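                                // backoff_time - prune_backoff() approximates when the PRUNE was
                                // sent; grafting again within graft_flood_threshold() of that
                                // point is treated as graft flooding and earns an extra penalty.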
1434                                // See: https://github.com/rust-lang/rust-clippy/issues/10061
1435                                #[allow(unknown_lints, clippy::unchecked_duration_subtraction)]
1436                                let flood_cutoff = (backoff_time
1437                                    + self.config.graft_flood_threshold())
1438                                    - self.config.prune_backoff();
1439                                if flood_cutoff > now {
1440                                    // extra penalty
1441                                    peer_score.add_penalty(peer_id, 1);
1442                                }
1443                            }
1444                            // no PX
1445                            do_px = false;
1446
1447                            to_prune_topics.insert(topic_hash.clone());
1448                            continue;
1449                        }
1450                    }
1451
1452                    // check the score
1453                    if below_zero {
1454                        // we don't GRAFT peers with negative score
1455                        tracing::debug!(
1456                            peer=%peer_id,
1457                            %score,
1458                            topic=%topic_hash,
1459                            "GRAFT: ignoring peer with negative score"
1460                        );
1461                        // we do send them PRUNE however, because it's a matter of protocol
1462                        // correctness
1463                        to_prune_topics.insert(topic_hash.clone());
1464                        // but we won't PX to them
1465                        do_px = false;
1466                        continue;
1467                    }
1468
1469                    // check mesh upper bound and only allow graft if the upper bound is not reached
1470                    // or if it is an outbound peer
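                    // Outbound peers are exempt from this limit so that the mesh keeps
                    // connections we dialled ourselves, which defends against mesh takeover by
                    // inbound peers.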
1471                    if peers.len() >= self.config.mesh_n_high()
1472                        && !self.outbound_peers.contains(peer_id)
1473                    {
1474                        to_prune_topics.insert(topic_hash.clone());
1475                        continue;
1476                    }
1477
1478                    // add peer to the mesh
1479                    tracing::debug!(
1480                        peer=%peer_id,
1481                        topic=%topic_hash,
1482                        "GRAFT: Mesh link added for peer in topic"
1483                    );
1484
1485                    if peers.insert(*peer_id) {
1486                        if let Some(m) = self.metrics.as_mut() {
1487                            m.peers_included(&topic_hash, Inclusion::Subscribed, 1)
1488                        }
1489                    }
1490
1491                    // If the peer did not previously exist in any mesh, inform the handler
1492                    peer_added_to_mesh(
1493                        *peer_id,
1494                        vec![&topic_hash],
1495                        &self.mesh,
1496                        &mut self.events,
1497                        &self.connected_peers,
1498                    );
1499
1500                    if let Some((peer_score, ..)) = &mut self.peer_score {
1501                        peer_score.graft(peer_id, topic_hash);
1502                    }
1503                } else {
1504                    // don't do PX when there is an unknown topic to avoid leaking our peers
1505                    do_px = false;
1506                    tracing::debug!(
1507                        peer=%peer_id,
1508                        topic=%topic_hash,
1509                        "GRAFT: Received graft for unknown topic from peer"
1510                    );
1511                    // spam hardening: ignore GRAFTs for unknown topics
1512                    continue;
1513                }
1514            }
1515        }
1516
1517        if !to_prune_topics.is_empty() {
1518            // build the prune messages to send
1519            let on_unsubscribe = false;
1520
1521            for prune in to_prune_topics
1522                .iter()
1523                .map(|t| self.make_prune(t, peer_id, do_px, on_unsubscribe))
1524                .collect::<Vec<_>>()
1525            {
1526                self.send_message(*peer_id, RpcOut::Prune(prune));
1527            }
1528            // Send the prune messages to the peer
1529            tracing::debug!(
1530                peer=%peer_id,
1531                "GRAFT: Sending PRUNE to peer for rejected topics"
1532            );
1533        }
1534        tracing::debug!(peer=%peer_id, "Completed GRAFT handling for peer");
1535    }
1536
1537    fn remove_peer_from_mesh(
1538        &mut self,
1539        peer_id: &PeerId,
1540        topic_hash: &TopicHash,
1541        backoff: Option<u64>,
1542        always_update_backoff: bool,
1543        reason: Churn,
1544    ) {
1545        let mut update_backoff = always_update_backoff;
1546        if let Some(peers) = self.mesh.get_mut(topic_hash) {
1547            // remove the peer if it exists in the mesh
1548            if peers.remove(peer_id) {
1549                tracing::debug!(
1550                    peer=%peer_id,
1551                    topic=%topic_hash,
1552                    "PRUNE: Removing peer from the mesh for topic"
1553                );
1554                if let Some(m) = self.metrics.as_mut() {
1555                    m.peers_removed(topic_hash, reason, 1)
1556                }
1557
1558                if let Some((peer_score, ..)) = &mut self.peer_score {
1559                    peer_score.prune(peer_id, topic_hash.clone());
1560                }
1561
1562                update_backoff = true;
1563
1564                // inform the handler
1565                peer_removed_from_mesh(
1566                    *peer_id,
1567                    topic_hash,
1568                    &self.mesh,
1569                    &mut self.events,
1570                    &self.connected_peers,
1571                );
1572            }
1573        }
1574        if update_backoff {
1575            // Obey the backoff duration specified by the peer, if any; else use the configured prune backoff.
1576            let time = if let Some(backoff) = backoff {
1577                Duration::from_secs(backoff)
1578            } else {
1579                self.config.prune_backoff()
1580            };
1581            self.backoffs.update_backoff(topic_hash, peer_id, time);
1582        }
1583    }
1584
1585    /// Handles PRUNE control messages. Removes peer from the mesh.
1586    fn handle_prune(
1587        &mut self,
1588        peer_id: &PeerId,
1589        prune_data: Vec<(TopicHash, Vec<PeerInfo>, Option<u64>)>,
1590    ) {
1591        tracing::debug!(peer=%peer_id, "Handling PRUNE message for peer");
1592        let (below_threshold, score) =
1593            self.score_below_threshold(peer_id, |pst| pst.accept_px_threshold);
1594        for (topic_hash, px, backoff) in prune_data {
1595            self.remove_peer_from_mesh(peer_id, &topic_hash, backoff, true, Churn::Prune);
1596
1597            if self.mesh.contains_key(&topic_hash) {
1598                // connect to px peers
1599                if !px.is_empty() {
1600                    // we ignore PX from peers with insufficient score
1601                    if below_threshold {
1602                        tracing::debug!(
1603                            peer=%peer_id,
1604                            %score,
1605                            topic=%topic_hash,
1606                            "PRUNE: ignoring PX from peer with insufficient score"
1607                        );
1608                        continue;
1609                    }
1610
1611                    // NOTE: We cannot dial any peers from PX currently as we typically will not
1612                    // know their multiaddr. Until SignedRecords are spec'd this
1613                    // remains a stub. By default `config.prune_peers()` is set to zero and
1614                    // this is skipped. If the user modifies this, this will only be able to
1615                    // dial already known peers (from an external discovery mechanism for
1616                    // example).
1617                    if self.config.prune_peers() > 0 {
1618                        self.px_connect(px);
1619                    }
1620                }
1621            }
1622        }
1623        tracing::debug!(peer=%peer_id, "Completed PRUNE handling for peer");
1624    }
1625
1626    fn px_connect(&mut self, mut px: Vec<PeerInfo>) {
1627        let n = self.config.prune_peers();
1628        // Ignore peerInfo with no ID
1629        //
1630        // TODO: Once signed records are spec'd: Can we use peerInfo without any IDs if they have a
1631        // signed peer record?
1632        px.retain(|p| p.peer_id.is_some());
1633        if px.len() > n {
1634            // only use at most prune_peers many random peers
1635            let mut rng = thread_rng();
1636            px.partial_shuffle(&mut rng, n);
1637            px = px.into_iter().take(n).collect();
1638        }
1639
1640        for p in px {
1641            // TODO: Once signed records are spec'd: extract signed peer record if given and handle
1642            // it, see https://github.com/libp2p/specs/pull/217
1643            if let Some(peer_id) = p.peer_id {
1644                // mark as px peer
1645                self.px_peers.insert(peer_id);
1646
1647                // dial peer
1648                self.events.push_back(ToSwarm::Dial {
1649                    opts: DialOpts::peer_id(peer_id).build(),
1650                });
1651            }
1652        }
1653    }
1654
1655    /// Applies some basic checks to whether this message is valid. Does not apply user validation
1656    /// checks.
1657    fn message_is_valid(
1658        &mut self,
1659        msg_id: &MessageId,
1660        raw_message: &mut RawMessage,
1661        propagation_source: &PeerId,
1662    ) -> bool {
1663        tracing::debug!(
1664            peer=%propagation_source,
1665            message=%msg_id,
1666            "Handling message from peer"
1667        );
1668
1669        // Reject any message from a blacklisted peer
1670        if self.blacklisted_peers.contains(propagation_source) {
1671            tracing::debug!(
1672                peer=%propagation_source,
1673                "Rejecting message from blacklisted peer"
1674            );
1675            self.gossip_promises
1676                .reject_message(msg_id, &RejectReason::BlackListedPeer);
1677            if let Some((peer_score, ..)) = &mut self.peer_score {
1678                peer_score.reject_message(
1679                    propagation_source,
1680                    msg_id,
1681                    &raw_message.topic,
1682                    RejectReason::BlackListedPeer,
1683                );
1684            }
1685            return false;
1686        }
1687
1688        // Also reject any message that originated from a blacklisted peer
1689        if let Some(source) = raw_message.source.as_ref() {
1690            if self.blacklisted_peers.contains(source) {
1691                tracing::debug!(
1692                    peer=%propagation_source,
1693                    %source,
1694                    "Rejecting message from peer because of blacklisted source"
1695                );
1696                self.handle_invalid_message(
1697                    propagation_source,
1698                    &raw_message.topic,
1699                    Some(msg_id),
1700                    RejectReason::BlackListedSource,
1701                );
1702                return false;
1703            }
1704        }
1705
1706        // If we are not validating messages, assume this message is validated
1707        // This will allow the message to be gossiped without explicitly calling
1708        // `validate_message`.
1709        if !self.config.validate_messages() {
1710            raw_message.validated = true;
1711        }
1712
1713        // reject messages claiming to be from ourselves but not locally published
1714        let self_published = !self.config.allow_self_origin()
1715            && if let Some(own_id) = self.publish_config.get_own_id() {
1716                own_id != propagation_source
1717                    && raw_message.source.as_ref().is_some_and(|s| s == own_id)
1718            } else {
1719                self.published_message_ids.contains(msg_id)
1720            };
1721
1722        if self_published {
1723            tracing::debug!(
1724                message=%msg_id,
1725                source=%propagation_source,
1726                "Dropping message claiming to be from self but forwarded from source"
1727            );
1728            self.handle_invalid_message(
1729                propagation_source,
1730                &raw_message.topic,
1731                Some(msg_id),
1732                RejectReason::SelfOrigin,
1733            );
1734            return false;
1735        }
1736
1737        true
1738    }
1739
1740    /// Handles a newly received [`RawMessage`].
1741    ///
1742    /// Forwards the message to all peers in the mesh.
1743    fn handle_received_message(
1744        &mut self,
1745        mut raw_message: RawMessage,
1746        propagation_source: &PeerId,
1747    ) {
1748        // Record the received metric
1749        if let Some(metrics) = self.metrics.as_mut() {
1750            metrics.msg_recvd_unfiltered(&raw_message.topic, raw_message.raw_protobuf_len());
1751        }
1752
1753        // Try to apply the data transform to the message. If it fails, consider it invalid.
1754        let message = match self.data_transform.inbound_transform(raw_message.clone()) {
1755            Ok(message) => message,
1756            Err(e) => {
1757                tracing::debug!("Invalid message. Transform error: {:?}", e);
1758                // Reject the message and return
1759                self.handle_invalid_message(
1760                    propagation_source,
1761                    &raw_message.topic,
1762                    None,
1763                    RejectReason::ValidationError(ValidationError::TransformFailed),
1764                );
1765                return;
1766            }
1767        };
1768
1769        // Calculate the message id on the transformed data.
1770        let msg_id = self.config.message_id(&message);
1771
1772        // Broadcast IDONTWANT messages
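        // Only messages above the configured size threshold are worth suppressing with
        // IDONTWANT; for small messages the control overhead would outweigh the savings.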
1773        if raw_message.raw_protobuf_len() > self.config.idontwant_message_size_threshold() {
1774            self.send_idontwant(&raw_message, &msg_id, Some(propagation_source));
1775        }
1776
1777        // Check the validity of the message
1778        // Peers get penalized if this message is invalid. We don't add it to the duplicate cache
1779        // and instead continually penalize peers that repeatedly send this message.
1780        if !self.message_is_valid(&msg_id, &mut raw_message, propagation_source) {
1781            return;
1782        }
1783
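        // `insert` returns false if the id was already present, i.e. we have already seen this
        // message within the duplicate-cache window and only need to record the duplicate.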
1784        if !self.duplicate_cache.insert(msg_id.clone()) {
1785            tracing::debug!(message=%msg_id, "Message already received, ignoring");
1786            if let Some((peer_score, ..)) = &mut self.peer_score {
1787                peer_score.duplicated_message(propagation_source, &msg_id, &message.topic);
1788            }
1789            self.mcache.observe_duplicate(&msg_id, propagation_source);
1790            return;
1791        }
1792
1793        tracing::debug!(
1794            message=%msg_id,
1795            "Put message in duplicate_cache and resolve promises"
1796        );
1797
1798        // Record the received message with the metrics
1799        if let Some(metrics) = self.metrics.as_mut() {
1800            metrics.msg_recvd(&message.topic);
1801        }
1802
1804        // Consider the message as delivered for gossip promises.
1805        self.gossip_promises.message_delivered(&msg_id);
1806
1807        // Tells score that message arrived (but is maybe not fully validated yet).
1808        if let Some((peer_score, ..)) = &mut self.peer_score {
1809            peer_score.validate_message(propagation_source, &msg_id, &message.topic);
1810        }
1811
1812        // Add the message to our memcache
1813        self.mcache.put(&msg_id, raw_message.clone());
1814
1815        // Dispatch the message to the user if we are subscribed to any of the topics
1816        if self.mesh.contains_key(&message.topic) {
1817            tracing::debug!("Sending received message to user");
1818            self.events
1819                .push_back(ToSwarm::GenerateEvent(Event::Message {
1820                    propagation_source: *propagation_source,
1821                    message_id: msg_id.clone(),
1822                    message,
1823                }));
1824        } else {
1825            tracing::debug!(
1826                topic=%message.topic,
1827                "Received message on a topic we are not subscribed to"
1828            );
1829            return;
1830        }
1831
1832        // forward the message to mesh peers, if no validation is required
1833        if !self.config.validate_messages() {
1834            self.forward_msg(
1835                &msg_id,
1836                raw_message,
1837                Some(propagation_source),
1838                HashSet::new(),
1839            );
1840            tracing::debug!(message=%msg_id, "Completed message handling for message");
1841        }
1842    }
1843
1844    // Handles invalid messages received.
1845    fn handle_invalid_message(
1846        &mut self,
1847        propagation_source: &PeerId,
1848        topic_hash: &TopicHash,
1849        message_id: Option<&MessageId>,
1850        reject_reason: RejectReason,
1851    ) {
1852        if let Some(metrics) = self.metrics.as_mut() {
1853            metrics.register_invalid_message(topic_hash);
1854        }
1855        if let Some(msg_id) = message_id {
1856            // The transform succeeded, so we have an id; reject its gossip promises even without peer scoring.
1857            self.gossip_promises.reject_message(msg_id, &reject_reason);
1858        }
1859        if let Some((peer_score, ..)) = &mut self.peer_score {
1860            if let Some(msg_id) = message_id {
1861                // The message itself decoded and transformed correctly, but it was rejected
1862                // for another reason, e.g. it came from a blacklisted peer or falsely claims
1863                // to originate from us while actually being forwarded by another peer.
1864                peer_score.reject_message(propagation_source, msg_id, topic_hash, reject_reason);
1865            } else {
1866                // The message is invalid, so we reject it, ignoring any gossip promises. If a
1867                // peer advertised it via an IHAVE and it turns out to be invalid, it is penalized
1868                // twice: once for sending an invalid message and again for breaking its promise.
1869                peer_score.reject_invalid_message(propagation_source, topic_hash);
1870            }
1871        }
1872    }
1873
1874    /// Handles received subscriptions.
1875    fn handle_received_subscriptions(
1876        &mut self,
1877        subscriptions: &[Subscription],
1878        propagation_source: &PeerId,
1879    ) {
1880        tracing::debug!(
1881            source=%propagation_source,
1882            "Handling subscriptions: {:?}",
1883            subscriptions,
1884        );
1885
1886        let mut unsubscribed_peers = Vec::new();
1887
1888        let Some(peer) = self.connected_peers.get_mut(propagation_source) else {
1889            tracing::error!(
1890                peer=%propagation_source,
1891                "Subscription by unknown peer"
1892            );
1893            return;
1894        };
1895
1896        // Collect potential graft topics for the peer.
1897        let mut topics_to_graft = Vec::new();
1898
1899        // Notify the application about the subscription, after the grafts are sent.
1900        let mut application_event = Vec::new();
1901
1902        let filtered_topics = match self
1903            .subscription_filter
1904            .filter_incoming_subscriptions(subscriptions, &peer.topics)
1905        {
1906            Ok(topics) => topics,
1907            Err(s) => {
1908                tracing::error!(
1909                    peer=%propagation_source,
1910                    "Subscription filter error: {}; ignoring RPC from peer",
1911                    s
1912                );
1913                return;
1914            }
1915        };
1916
1917        for subscription in filtered_topics {
1918            // The topic this subscription action refers to.
1919            let topic_hash = &subscription.topic_hash;
1920
1921            match subscription.action {
1922                SubscriptionAction::Subscribe => {
1923                    if peer.topics.insert(topic_hash.clone()) {
1924                        tracing::debug!(
1925                            peer=%propagation_source,
1926                            topic=%topic_hash,
1927                            "SUBSCRIPTION: Adding gossip peer to topic"
1928                        );
1929
1930                        if let Some(m) = self.metrics.as_mut() {
1931                            m.inc_topic_peers(topic_hash);
1932                        }
1933                    }
1934
1935                    // if the mesh needs peers, add the peer to the mesh
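                    // A peer is only eligible here if it is not an explicit peer, speaks the
                    // gossipsub protocol, has a non-negative score and is not currently backed
                    // off for this topic.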
1936                    if !self.explicit_peers.contains(propagation_source)
1937                        && peer.kind.is_gossipsub()
1938                        && !Self::score_below_threshold_from_scores(
1939                            &self.peer_score,
1940                            propagation_source,
1941                            |_| 0.0,
1942                        )
1943                        .0
1944                        && !self
1945                            .backoffs
1946                            .is_backoff_with_slack(topic_hash, propagation_source)
1947                    {
1948                        if let Some(peers) = self.mesh.get_mut(topic_hash) {
1949                            if peers.len() < self.config.mesh_n_low()
1950                                && peers.insert(*propagation_source)
1951                            {
1952                                tracing::debug!(
1953                                    peer=%propagation_source,
1954                                    topic=%topic_hash,
1955                                    "SUBSCRIPTION: Adding peer to the mesh for topic"
1956                                );
1957                                if let Some(m) = self.metrics.as_mut() {
1958                                    m.peers_included(topic_hash, Inclusion::Subscribed, 1)
1959                                }
1960                                // send graft to the peer
1961                                tracing::debug!(
1962                                    peer=%propagation_source,
1963                                    topic=%topic_hash,
1964                                    "Sending GRAFT to peer for topic"
1965                                );
1966                                if let Some((peer_score, ..)) = &mut self.peer_score {
1967                                    peer_score.graft(propagation_source, topic_hash.clone());
1968                                }
1969                                topics_to_graft.push(topic_hash.clone());
1970                            }
1971                        }
1972                    }
1973                    // generate a subscription event to be polled
1974                    application_event.push(ToSwarm::GenerateEvent(Event::Subscribed {
1975                        peer_id: *propagation_source,
1976                        topic: topic_hash.clone(),
1977                    }));
1978                }
1979                SubscriptionAction::Unsubscribe => {
1980                    if peer.topics.remove(topic_hash) {
1981                        tracing::debug!(
1982                            peer=%propagation_source,
1983                            topic=%topic_hash,
1984                            "SUBSCRIPTION: Removing gossip peer from topic"
1985                        );
1986
1987                        if let Some(m) = self.metrics.as_mut() {
1988                            m.dec_topic_peers(topic_hash);
1989                        }
1990                    }
1991
1992                    unsubscribed_peers.push((*propagation_source, topic_hash.clone()));
1993                    // generate an unsubscribe event to be polled
1994                    application_event.push(ToSwarm::GenerateEvent(Event::Unsubscribed {
1995                        peer_id: *propagation_source,
1996                        topic: topic_hash.clone(),
1997                    }));
1998                }
1999            }
2000        }
2001
2002        // remove unsubscribed peers from the mesh and fanout if they exist there.
2003        for (peer_id, topic_hash) in unsubscribed_peers {
2004            if let Some(peers) = self.fanout.get_mut(&topic_hash) {
2005                peers.remove(&peer_id);
2006            }
2007            self.remove_peer_from_mesh(&peer_id, &topic_hash, None, false, Churn::Unsub);
2008        }
2009
2010        // Potentially inform the handler if we have added this peer to a mesh for the first time.
2011        let topics_joined = topics_to_graft.iter().collect::<Vec<_>>();
2012        if !topics_joined.is_empty() {
2013            peer_added_to_mesh(
2014                *propagation_source,
2015                topics_joined,
2016                &self.mesh,
2017                &mut self.events,
2018                &self.connected_peers,
2019            );
2020        }
2021
2022        // If we need to send grafts to the peer, do so immediately rather than waiting for the
2023        // heartbeat.
2024        for topic_hash in topics_to_graft.into_iter() {
2025            self.send_message(*propagation_source, RpcOut::Graft(Graft { topic_hash }));
2026        }
2027
2028        // Notify the application of the subscriptions
2029        for event in application_event {
2030            self.events.push_back(event);
2031        }
2032
2033        tracing::trace!(
2034            source=%propagation_source,
2035            "Completed handling subscriptions from source"
2036        );
2037    }
2038
2039    /// Applies penalties to peers that did not respond to our IWANT requests.
2040    fn apply_iwant_penalties(&mut self) {
2041        if let Some((peer_score, ..)) = &mut self.peer_score {
2042            for (peer, count) in self.gossip_promises.get_broken_promises() {
2043                peer_score.add_penalty(&peer, count);
2044                if let Some(metrics) = self.metrics.as_mut() {
2045                    metrics.register_score_penalty(Penalty::BrokenPromise);
2046                }
2047            }
2048        }
2049    }
2050
2051    /// Heartbeat function which shifts the memcache and updates the mesh.
2052    fn heartbeat(&mut self) {
2053        tracing::debug!("Starting heartbeat");
2054        let start = Instant::now();
2055
2056        // Every heartbeat we sample the send queues for our metrics. We intentionally do this
2057        // before adding the gossip generated by this heartbeat, to get a true measure of the
2058        // steady-state size of the queues.
2059        if let Some(m) = &mut self.metrics {
2060            for sender_queue in self.connected_peers.values().map(|v| &v.sender) {
2061                m.observe_priority_queue_size(sender_queue.priority_queue_len());
2062                m.observe_non_priority_queue_size(sender_queue.non_priority_queue_len());
2063            }
2064        }
2065
2066        self.heartbeat_ticks += 1;
2067
2068        let mut to_graft = HashMap::new();
2069        let mut to_prune = HashMap::new();
2070        let mut no_px = HashSet::new();
2071
2072        // clean up expired backoffs
2073        self.backoffs.heartbeat();
2074
2075        // clean up ihave counters
2076        self.count_sent_iwant.clear();
2077        self.count_received_ihave.clear();
2078
2079        // apply iwant penalties
2080        self.apply_iwant_penalties();
2081
2082        // check connections to explicit peers
2083        if self.heartbeat_ticks % self.config.check_explicit_peers_ticks() == 0 {
2084            for p in self.explicit_peers.clone() {
2085                self.check_explicit_peer_connection(&p);
2086            }
2087        }
2088
2089        // Cache the scores of all connected peers, and record metrics for current penalties.
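        // Doing this once up front avoids repeatedly querying the peer-score state while the
        // meshes are mutated below.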
2090        let mut scores = HashMap::with_capacity(self.connected_peers.len());
2091        if let Some((peer_score, ..)) = &self.peer_score {
2092            for peer_id in self.connected_peers.keys() {
2093                scores
2094                    .entry(peer_id)
2095                    .or_insert_with(|| peer_score.metric_score(peer_id, self.metrics.as_mut()));
2096            }
2097        }
2098
2099        // maintain the mesh for each topic
2100        for (topic_hash, peers) in self.mesh.iter_mut() {
2101            let explicit_peers = &self.explicit_peers;
2102            let backoffs = &self.backoffs;
2103            let outbound_peers = &self.outbound_peers;
2104
2105            // drop all peers with negative score, without PX
2106            // If BTreeSet ever gains a stable `retain` method, the following can be written
2107            // more efficiently with it.
2108            let mut to_remove_peers = Vec::new();
2109            for peer_id in peers.iter() {
2110                let peer_score = *scores.get(peer_id).unwrap_or(&0.0);
2111
2112                // Record the score per mesh
2113                if let Some(metrics) = self.metrics.as_mut() {
2114                    metrics.observe_mesh_peers_score(topic_hash, peer_score);
2115                }
2116
2117                if peer_score < 0.0 {
2118                    tracing::debug!(
2119                        peer=%peer_id,
2120                        score=%peer_score,
2121                        topic=%topic_hash,
2122                        "HEARTBEAT: Prune peer with negative score"
2123                    );
2124
2125                    let current_topic = to_prune.entry(*peer_id).or_insert_with(Vec::new);
2126                    current_topic.push(topic_hash.clone());
2127                    no_px.insert(*peer_id);
2128                    to_remove_peers.push(*peer_id);
2129                }
2130            }
2131
2132            if let Some(m) = self.metrics.as_mut() {
2133                m.peers_removed(topic_hash, Churn::BadScore, to_remove_peers.len())
2134            }
2135
2136            for peer_id in to_remove_peers {
2137                peers.remove(&peer_id);
2138            }
2139
2140            // too few peers - add some
2141            if peers.len() < self.config.mesh_n_low() {
2142                tracing::debug!(
2143                    topic=%topic_hash,
2144                    "HEARTBEAT: Mesh low. Topic contains: {} needs: {}",
2145                    peers.len(),
2146                    self.config.mesh_n_low()
2147                );
2148                // not enough peers - get mesh_n - current_length more
2149                let desired_peers = self.config.mesh_n() - peers.len();
2150                let peer_list =
2151                    get_random_peers(&self.connected_peers, topic_hash, desired_peers, |peer| {
2152                        !peers.contains(peer)
2153                            && !explicit_peers.contains(peer)
2154                            && !backoffs.is_backoff_with_slack(topic_hash, peer)
2155                            && *scores.get(peer).unwrap_or(&0.0) >= 0.0
2156                    });
2157                for peer in &peer_list {
2158                    let current_topic = to_graft.entry(*peer).or_insert_with(Vec::new);
2159                    current_topic.push(topic_hash.clone());
2160                }
2161                // update the mesh
2162                tracing::debug!("Updating mesh, new mesh: {:?}", peer_list);
2163                if let Some(m) = self.metrics.as_mut() {
2164                    m.peers_included(topic_hash, Inclusion::Random, peer_list.len())
2165                }
2166                peers.extend(peer_list);
2167            }
2168
2169            // too many peers - remove some
2170            if peers.len() > self.config.mesh_n_high() {
2171                tracing::debug!(
2172                    topic=%topic_hash,
2173                    "HEARTBEAT: Mesh high. Topic contains: {} needs: {}",
2174                    peers.len(),
2175                    self.config.mesh_n_high()
2176                );
2177                let excess_peer_no = peers.len() - self.config.mesh_n();
2178
2179                // shuffle the peers and then sort by score ascending beginning with the worst
2180                let mut rng = thread_rng();
2181                let mut shuffled = peers.iter().copied().collect::<Vec<_>>();
2182                shuffled.shuffle(&mut rng);
2183                shuffled.sort_by(|p1, p2| {
2184                    let score_p1 = *scores.get(p1).unwrap_or(&0.0);
2185                    let score_p2 = *scores.get(p2).unwrap_or(&0.0);
2186
2187                    score_p1.partial_cmp(&score_p2).unwrap_or(Ordering::Equal)
2188                });
2189                // shuffle everything except the last retain_scores many peers (the best ones)
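                // Keeping the highest-scoring retain_scores peers at the back means they are the
                // last candidates considered for removal below.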
2190                shuffled[..peers.len() - self.config.retain_scores()].shuffle(&mut rng);
2191
2192                // count total number of outbound peers
2193                let mut outbound = {
2194                    let outbound_peers = &self.outbound_peers;
2195                    shuffled
2196                        .iter()
2197                        .filter(|p| outbound_peers.contains(*p))
2198                        .count()
2199                };
2200
2201                // remove the first excess_peer_no peers permitted by the outbound restrictions,
2202                // adding them to to_prune
2203                let mut removed = 0;
2204                for peer in shuffled {
2205                    if removed == excess_peer_no {
2206                        break;
2207                    }
2208                    if self.outbound_peers.contains(&peer) {
2209                        if outbound <= self.config.mesh_outbound_min() {
2210                            // do not remove any more outbound peers
2211                            continue;
2212                        }
2213                        // an outbound peer gets removed
2214                        outbound -= 1;
2215                    }
2216
2217                    // remove the peer
2218                    peers.remove(&peer);
2219                    let current_topic = to_prune.entry(peer).or_insert_with(Vec::new);
2220                    current_topic.push(topic_hash.clone());
2221                    removed += 1;
2222                }
2223
2224                if let Some(m) = self.metrics.as_mut() {
2225                    m.peers_removed(topic_hash, Churn::Excess, removed)
2226                }
2227            }
2228
2229            // do we have enough outbound peers?
2230            if peers.len() >= self.config.mesh_n_low() {
2231                // count number of outbound peers we have
2232                let outbound = { peers.iter().filter(|p| outbound_peers.contains(*p)).count() };
2233
2234                // if we do not have enough outbound peers, graft some new outbound peers
2235                if outbound < self.config.mesh_outbound_min() {
2236                    let needed = self.config.mesh_outbound_min() - outbound;
2237                    let peer_list =
2238                        get_random_peers(&self.connected_peers, topic_hash, needed, |peer| {
2239                            !peers.contains(peer)
2240                                && !explicit_peers.contains(peer)
2241                                && !backoffs.is_backoff_with_slack(topic_hash, peer)
2242                                && *scores.get(peer).unwrap_or(&0.0) >= 0.0
2243                                && outbound_peers.contains(peer)
2244                        });
2245                    for peer in &peer_list {
2246                        let current_topic = to_graft.entry(*peer).or_insert_with(Vec::new);
2247                        current_topic.push(topic_hash.clone());
2248                    }
2249                    // update the mesh
2250                    tracing::debug!("Updating mesh, new mesh: {:?}", peer_list);
2251                    if let Some(m) = self.metrics.as_mut() {
2252                        m.peers_included(topic_hash, Inclusion::Outbound, peer_list.len())
2253                    }
2254                    peers.extend(peer_list);
2255                }
2256            }
2257
2258            // should we try to improve the mesh with opportunistic grafting?
2259            if self.heartbeat_ticks % self.config.opportunistic_graft_ticks() == 0
2260                && peers.len() > 1
2261                && self.peer_score.is_some()
2262            {
2263                if let Some((_, thresholds, _)) = &self.peer_score {
2264                    // Opportunistic grafting works as follows: we check the median score of peers
2265                    // in the mesh; if this score is below the opportunisticGraftThreshold, we
2266                    // select a few peers at random with score over the median.
2267                    // The intention is to (slowly) improve an underperforming mesh by introducing
2268                    // good scoring peers that may have been gossiping at us. This allows us to
2269                    // get out of sticky situations where we are stuck with poor peers and also
2270                    // recover from churn of good peers.
2271
2272                    // now compute the median peer score in the mesh
2273                    let mut peers_by_score: Vec<_> = peers.iter().collect();
2274                    peers_by_score.sort_by(|p1, p2| {
2275                        let p1_score = *scores.get(p1).unwrap_or(&0.0);
2276                        let p2_score = *scores.get(p2).unwrap_or(&0.0);
2277                        p1_score.partial_cmp(&p2_score).unwrap_or(Equal)
2278                    });
2279
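                    // With an even number of peers the median is the mean of the two middle
                    // scores, e.g. for sorted scores [-2.0, 0.0, 1.0, 5.0] it is (0.0 + 1.0) / 2.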
2280                    let middle = peers_by_score.len() / 2;
2281                    let median = if peers_by_score.len() % 2 == 0 {
2282                        let sub_middle_peer = *peers_by_score
2283                            .get(middle - 1)
2284                            .expect("middle < vector length and middle > 0 since peers.len() > 0");
2285                        let sub_middle_score = *scores.get(sub_middle_peer).unwrap_or(&0.0);
2286                        let middle_peer =
2287                            *peers_by_score.get(middle).expect("middle < vector length");
2288                        let middle_score = *scores.get(middle_peer).unwrap_or(&0.0);
2289
2290                        (sub_middle_score + middle_score) * 0.5
2291                    } else {
2292                        *scores
2293                            .get(*peers_by_score.get(middle).expect("middle < vector length"))
2294                            .unwrap_or(&0.0)
2295                    };
2296
2297                    // if the median score is below the threshold, select a better peer (if any) and
2298                    // GRAFT
2299                    if median < thresholds.opportunistic_graft_threshold {
2300                        let peer_list = get_random_peers(
2301                            &self.connected_peers,
2302                            topic_hash,
2303                            self.config.opportunistic_graft_peers(),
2304                            |peer_id| {
2305                                !peers.contains(peer_id)
2306                                    && !explicit_peers.contains(peer_id)
2307                                    && !backoffs.is_backoff_with_slack(topic_hash, peer_id)
2308                                    && *scores.get(peer_id).unwrap_or(&0.0) > median
2309                            },
2310                        );
2311                        for peer in &peer_list {
2312                            let current_topic = to_graft.entry(*peer).or_insert_with(Vec::new);
2313                            current_topic.push(topic_hash.clone());
2314                        }
2315                        // update the mesh
2316                        tracing::debug!(
2317                            topic=%topic_hash,
2318                            "Opportunistically graft in topic with peers {:?}",
2319                            peer_list
2320                        );
2321                        if let Some(m) = self.metrics.as_mut() {
2322                            m.peers_included(topic_hash, Inclusion::Random, peer_list.len())
2323                        }
2324                        peers.extend(peer_list);
2325                    }
2326                }
2327            }
2328            // Register the final count of peers in the mesh
2329            if let Some(m) = self.metrics.as_mut() {
2330                m.set_mesh_peers(topic_hash, peers.len())
2331            }
2332        }
2333
2334        // remove expired fanout topics
2335        {
2336            let fanout = &mut self.fanout; // help the borrow checker
2337            let fanout_ttl = self.config.fanout_ttl();
2338            self.fanout_last_pub.retain(|topic_hash, last_pub_time| {
2339                if *last_pub_time + fanout_ttl < Instant::now() {
2340                    tracing::debug!(
2341                        topic=%topic_hash,
2342                        "HEARTBEAT: Fanout topic removed due to timeout"
2343                    );
2344                    fanout.remove(topic_hash);
2345                    return false;
2346                }
2347                true
2348            });
2349        }
2350
2351        // maintain fanout
2352        // check if our peers are still a part of the topic
2353        for (topic_hash, peers) in self.fanout.iter_mut() {
2354            let mut to_remove_peers = Vec::new();
2355            let publish_threshold = match &self.peer_score {
2356                Some((_, thresholds, _)) => thresholds.publish_threshold,
2357                _ => 0.0,
2358            };
2359            for peer_id in peers.iter() {
2360                // is the peer still subscribed to the topic?
2361                let peer_score = *scores.get(peer_id).unwrap_or(&0.0);
2362                match self.connected_peers.get(peer_id) {
2363                    Some(peer) => {
2364                        if !peer.topics.contains(topic_hash) || peer_score < publish_threshold {
2365                            tracing::debug!(
2366                                topic=%topic_hash,
2367                                "HEARTBEAT: Peer removed from fanout for topic"
2368                            );
2369                            to_remove_peers.push(*peer_id);
2370                        }
2371                    }
2372                    None => {
2373                        // remove if the peer has disconnected
2374                        to_remove_peers.push(*peer_id);
2375                    }
2376                }
2377            }
2378            for to_remove in to_remove_peers {
2379                peers.remove(&to_remove);
2380            }
2381
2382            // not enough peers
2383            if peers.len() < self.config.mesh_n() {
2384                tracing::debug!(
2385                    "HEARTBEAT: Fanout low. Contains: {:?} needs: {:?}",
2386                    peers.len(),
2387                    self.config.mesh_n()
2388                );
2389                let needed_peers = self.config.mesh_n() - peers.len();
2390                let explicit_peers = &self.explicit_peers;
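                // Select random subscribed peers that are not already in the fanout, are not
                // explicit peers, and whose score meets the publish threshold.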
2391                let new_peers =
2392                    get_random_peers(&self.connected_peers, topic_hash, needed_peers, |peer_id| {
2393                        !peers.contains(peer_id)
2394                            && !explicit_peers.contains(peer_id)
2395                            && *scores.get(peer_id).unwrap_or(&0.0) >= publish_threshold
2396                    });
2397                peers.extend(new_peers);
2398            }
2399        }
2400
2401        if self.peer_score.is_some() {
2402            tracing::trace!("Mesh message deliveries: {:?}", {
2403                self.mesh
2404                    .iter()
2405                    .map(|(t, peers)| {
2406                        (
2407                            t.clone(),
2408                            peers
2409                                .iter()
2410                                .map(|p| {
2411                                    (
2412                                        *p,
2413                                        self.peer_score
2414                                            .as_ref()
2415                                            .expect("peer_score.is_some()")
2416                                            .0
2417                                            .mesh_message_deliveries(p, t)
2418                                            .unwrap_or(0.0),
2419                                    )
2420                                })
2421                                .collect::<HashMap<PeerId, f64>>(),
2422                        )
2423                    })
2424                    .collect::<HashMap<TopicHash, HashMap<PeerId, f64>>>()
2425            })
2426        }
2427
2428        self.emit_gossip();
2429
2430        // send graft/prunes
2431        if !to_graft.is_empty() || !to_prune.is_empty() {
2432            self.send_graft_prune(to_graft, to_prune, no_px);
2433        }
2434
2435        // shift the memcache
2436        self.mcache.shift();
2437
2438        // Report expired messages
2439        for (peer_id, failed_messages) in self.failed_messages.drain() {
2440            tracing::debug!("Peer couldn't consume messages: {:?}", failed_messages);
2441            self.events
2442                .push_back(ToSwarm::GenerateEvent(Event::SlowPeer {
2443                    peer_id,
2444                    failed_messages,
2445                }));
2446        }
2447        self.failed_messages.shrink_to_fit();
2448
2449        // Flush stale IDONTWANTs.
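        // Entries in `dont_send` are kept in insertion order, so we can stop at the first entry
        // that has not yet expired.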
2450        for peer in self.connected_peers.values_mut() {
2451            while let Some((_front, instant)) = peer.dont_send.front() {
2452                if (*instant + IDONTWANT_TIMEOUT) >= Instant::now() {
2453                    break;
2454                } else {
2455                    peer.dont_send.pop_front();
2456                }
2457            }
2458        }
2459
2460        tracing::debug!("Completed Heartbeat");
2461        if let Some(metrics) = self.metrics.as_mut() {
2462            let duration = u64::try_from(start.elapsed().as_millis()).unwrap_or(u64::MAX);
2463            metrics.observe_heartbeat_duration(duration);
2464        }
2465    }
2466
2467    /// Emits gossip - Send IHAVE messages to a random set of gossip peers. This is applied to mesh
2468    /// and fanout peers
2469    fn emit_gossip(&mut self) {
2470        let mut rng = thread_rng();
2471        let mut messages = Vec::new();
2472        for (topic_hash, peers) in self.mesh.iter().chain(self.fanout.iter()) {
2473            let mut message_ids = self.mcache.get_gossip_message_ids(topic_hash);
2474            if message_ids.is_empty() {
2475                continue;
2476            }
2477
2478            // if we are emitting more than max_ihave_length message_ids, truncate the list
2479            if message_ids.len() > self.config.max_ihave_length() {
2480                // we do the truncation (with shuffling) per peer below
2481                tracing::debug!(
2482                    "too many messages for gossip; will truncate IHAVE list ({} messages)",
2483                    message_ids.len()
2484                );
2485            } else {
2486                // shuffle to emit in random order
2487                message_ids.shuffle(&mut rng);
2488            }
2489
2490            // dynamic number of peers to gossip based on `gossip_factor` with minimum `gossip_lazy`
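            // For example, with gossip_lazy = 6 and gossip_factor = 0.25, a topic with 40
            // eligible peers would be gossiped to max(6, 10) = 10 of them.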
2491            let n_map = |m| {
2492                max(
2493                    self.config.gossip_lazy(),
2494                    (self.config.gossip_factor() * m as f64) as usize,
2495                )
2496            };
2497            // select up to that many random peers to gossip to
2498            let to_msg_peers =
2499                get_random_peers_dynamic(&self.connected_peers, topic_hash, n_map, |peer| {
2500                    !peers.contains(peer)
2501                        && !self.explicit_peers.contains(peer)
2502                        && !self.score_below_threshold(peer, |ts| ts.gossip_threshold).0
2503                });
2504
2505            tracing::debug!("Gossiping IHAVE to {} peers", to_msg_peers.len());
2506
2507            for peer_id in to_msg_peers {
2508                let mut peer_message_ids = message_ids.clone();
2509
2510                if peer_message_ids.len() > self.config.max_ihave_length() {
2511                    // We do this per peer so that each peer receives a different subset.
2512                    // There is enough redundancy in the system that this significantly
2513                    // increases message coverage when we do truncate.
2514                    peer_message_ids.partial_shuffle(&mut rng, self.config.max_ihave_length());
2515                    peer_message_ids.truncate(self.config.max_ihave_length());
2516                }
2517
2518                // send an IHAVE message
2519                messages.push((
2520                    peer_id,
2521                    RpcOut::IHave(IHave {
2522                        topic_hash: topic_hash.clone(),
2523                        message_ids: peer_message_ids,
2524                    }),
2525                ));
2526            }
2527        }
2528        for (peer_id, message) in messages {
2529            self.send_message(peer_id, message);
2530        }
2531    }
2532
2533    /// Handles multiple GRAFT/PRUNE messages and coalesces them into chunked gossip control
2534    /// messages.
2535    fn send_graft_prune(
2536        &mut self,
2537        to_graft: HashMap<PeerId, Vec<TopicHash>>,
2538        mut to_prune: HashMap<PeerId, Vec<TopicHash>>,
2539        no_px: HashSet<PeerId>,
2540    ) {
2541        // handle the grafts and overlapping prunes per peer
2542        for (peer_id, topics) in to_graft.into_iter() {
2543            for topic in &topics {
2544                // inform scoring of graft
2545                if let Some((peer_score, ..)) = &mut self.peer_score {
2546                    peer_score.graft(&peer_id, topic.clone());
2547                }
2548
2549                // inform the handler of the peer being added to the mesh
2550                // If the peer did not previously exist in any mesh, inform the handler
2551                peer_added_to_mesh(
2552                    peer_id,
2553                    vec![topic],
2554                    &self.mesh,
2555                    &mut self.events,
2556                    &self.connected_peers,
2557                );
2558            }
2559            let rpc_msgs = topics.iter().map(|topic_hash| {
2560                RpcOut::Graft(Graft {
2561                    topic_hash: topic_hash.clone(),
2562                })
2563            });
2564
2565            // If there are prunes associated with the same peer add them.
2566            // NOTE: In this case a peer has been added to a topic mesh, and removed from another.
2567            // It therefore must be in at least one mesh and we do not need to inform the handler
2568            // of its removal from another.
2569
2570            // The following prunes are not due to unsubscribing.
2571            let prune_msgs = to_prune
2572                .remove(&peer_id)
2573                .into_iter()
2574                .flatten()
2575                .map(|topic_hash| {
2576                    let prune = self.make_prune(
2577                        &topic_hash,
2578                        &peer_id,
2579                        self.config.do_px() && !no_px.contains(&peer_id),
2580                        false,
2581                    );
2582                    RpcOut::Prune(prune)
2583                });
2584
2585            // send the rpc messages
2586            for msg in rpc_msgs.chain(prune_msgs).collect::<Vec<_>>() {
2587                self.send_message(peer_id, msg);
2588            }
2589        }
2590
2591        // handle the remaining prunes
2592        // The following prunes are not due to unsubscribing.
2593        for (peer_id, topics) in to_prune.iter() {
2594            for topic_hash in topics {
2595                let prune = self.make_prune(
2596                    topic_hash,
2597                    peer_id,
2598                    self.config.do_px() && !no_px.contains(peer_id),
2599                    false,
2600                );
2601                self.send_message(*peer_id, RpcOut::Prune(prune));
2602
2603                // inform the handler
2604                peer_removed_from_mesh(
2605                    *peer_id,
2606                    topic_hash,
2607                    &self.mesh,
2608                    &mut self.events,
2609                    &self.connected_peers,
2610                );
2611            }
2612        }
2613    }
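
    // Worked example for `send_graft_prune` (illustrative, not part of the implementation):
    // with `to_graft = {P1: [A]}` and `to_prune = {P1: [B], P2: [C]}`, the first loop sends P1
    // a GRAFT for topic A together with a PRUNE for topic B, while P2 only receives a PRUNE for
    // topic C from the trailing prune loop.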
2614
2615    /// Helper function which sends an IDONTWANT message to mesh\[topic\] peers and to peers from which the message was requested via IWANT.
2616    fn send_idontwant(
2617        &mut self,
2618        message: &RawMessage,
2619        msg_id: &MessageId,
2620        propagation_source: Option<&PeerId>,
2621    ) {
2622        let Some(mesh_peers) = self.mesh.get(&message.topic) else {
2623            return;
2624        };
2625
2626        let iwant_peers = self.gossip_promises.peers_for_message(msg_id);
2627
2628        let recipient_peers: Vec<PeerId> = mesh_peers
2629            .iter()
2630            .chain(iwant_peers.iter())
2631            .filter(|&peer_id| {
2632                Some(peer_id) != propagation_source && Some(peer_id) != message.source.as_ref()
2633            })
2634            .cloned()
2635            .collect();
2636
2637        for peer_id in recipient_peers {
2638            let Some(peer) = self.connected_peers.get_mut(&peer_id) else {
2639                tracing::error!(peer = %peer_id,
2640                    "Could not send IDONTWANT, peer doesn't exist in connected peer list");
2641                continue;
2642            };
2643
2644            // Only gossipsub 1.2 peers support IDONTWANT.
2645            if peer.kind != PeerKind::Gossipsubv1_2 {
2646                continue;
2647            }
2648
2649            self.send_message(
2650                peer_id,
2651                RpcOut::IDontWant(IDontWant {
2652                    message_ids: vec![msg_id.clone()],
2653                }),
2654            );
2655        }
2656    }
2657
2658    /// Helper function which forwards a message to mesh\[topic\] peers as well as subscribed explicit and floodsub peers.
2659    ///
2660    /// Returns true if at least one peer was messaged.
2661    fn forward_msg(
2662        &mut self,
2663        msg_id: &MessageId,
2664        message: RawMessage,
2665        propagation_source: Option<&PeerId>,
2666        originating_peers: HashSet<PeerId>,
2667    ) -> bool {
2668        // The message is fully validated; inform peer_score.
2669        if let Some((peer_score, ..)) = &mut self.peer_score {
2670            if let Some(peer) = propagation_source {
2671                peer_score.deliver_message(peer, msg_id, &message.topic);
2672            }
2673        }
2674
2675        tracing::debug!(message=%msg_id, "Forwarding message");
2676        let mut recipient_peers = HashSet::new();
2677
2678        // Populate the recipient peers mapping
2679
2680        // Add explicit peers and floodsub peers
2681        for (peer_id, peer) in &self.connected_peers {
2682            if Some(peer_id) != propagation_source
2683                && !originating_peers.contains(peer_id)
2684                && Some(peer_id) != message.source.as_ref()
2685                && peer.topics.contains(&message.topic)
2686                && (self.explicit_peers.contains(peer_id)
2687                    || (peer.kind == PeerKind::Floodsub
2688                        && !self
2689                            .score_below_threshold(peer_id, |ts| ts.publish_threshold)
2690                            .0))
2691            {
2692                recipient_peers.insert(*peer_id);
2693            }
2694        }
2695
2696        // add mesh peers
2697        let topic = &message.topic;
2698        // mesh
2699        if let Some(mesh_peers) = self.mesh.get(topic) {
2700            for peer_id in mesh_peers {
2701                if Some(peer_id) != propagation_source
2702                    && !originating_peers.contains(peer_id)
2703                    && Some(peer_id) != message.source.as_ref()
2704                {
2705                    recipient_peers.insert(*peer_id);
2706                }
2707            }
2708        }
2709
2710        if recipient_peers.is_empty() {
2711            return false;
2712        }
2713
2714        // forward the message to peers
2715        for peer_id in recipient_peers.iter() {
2716            if let Some(peer) = self.connected_peers.get_mut(peer_id) {
2717                if peer.dont_send.contains_key(msg_id) {
2718                    tracing::debug!(%peer_id, message=%msg_id, "Peer doesn't want message");
2719                    continue;
2720                }
2721
2722                tracing::debug!(%peer_id, message=%msg_id, "Sending message to peer");
2723
2724                self.send_message(
2725                    *peer_id,
2726                    RpcOut::Forward {
2727                        message: message.clone(),
2728                        timeout: Delay::new(self.config.forward_queue_duration()),
2729                    },
2730                );
2731            }
2732        }
2733        tracing::debug!("Completed forwarding message");
2734        true
2735    }
2736
2737    /// Constructs a [`RawMessage`] performing message signing if required.
2738    pub(crate) fn build_raw_message(
2739        &mut self,
2740        topic: TopicHash,
2741        data: Vec<u8>,
2742    ) -> Result<RawMessage, PublishError> {
2743        match &mut self.publish_config {
2744            PublishConfig::Signing {
2745                ref keypair,
2746                author,
2747                inline_key,
2748                last_seq_no,
2749            } => {
2750                let sequence_number = last_seq_no.next();
2751
2752                let signature = {
2753                    let message = proto::Message {
2754                        from: Some(author.to_bytes()),
2755                        data: Some(data.clone()),
2756                        seqno: Some(sequence_number.to_be_bytes().to_vec()),
2757                        topic: topic.clone().into_string(),
2758                        signature: None,
2759                        key: None,
2760                    };
2761
2762                    let mut buf = Vec::with_capacity(message.get_size());
2763                    let mut writer = Writer::new(&mut buf);
2764
2765                    message
2766                        .write_message(&mut writer)
2767                        .expect("Encoding to succeed");
2768
2769                    // the signature is over the bytes "libp2p-pubsub:<protobuf-message>"
2770                    let mut signature_bytes = SIGNING_PREFIX.to_vec();
2771                    signature_bytes.extend_from_slice(&buf);
2772                    Some(keypair.sign(&signature_bytes)?)
2773                };
2774
2775                Ok(RawMessage {
2776                    source: Some(*author),
2777                    data,
2778                    // To be interoperable with the go-implementation this is treated as a 64-bit
2779                    // big-endian uint.
2780                    sequence_number: Some(sequence_number),
2781                    topic,
2782                    signature,
2783                    key: inline_key.clone(),
2784                    validated: true, // all published messages are valid
2785                })
2786            }
2787            PublishConfig::Author(peer_id) => {
2788                Ok(RawMessage {
2789                    source: Some(*peer_id),
2790                    data,
2791                    // To be interoperable with the go-implementation this is treated as a 64-bit
2792                    // big-endian uint.
2793                    sequence_number: Some(rand::random()),
2794                    topic,
2795                    signature: None,
2796                    key: None,
2797                    validated: true, // all published messages are valid
2798                })
2799            }
2800            PublishConfig::RandomAuthor => {
2801                Ok(RawMessage {
2802                    source: Some(PeerId::random()),
2803                    data,
2804                    // To be interoperable with the go-implementation this is treated as a 64-bit
2805                    // big-endian uint.
2806                    sequence_number: Some(rand::random()),
2807                    topic,
2808                    signature: None,
2809                    key: None,
2810                    validated: true, // all published messages are valid
2811                })
2812            }
2813            PublishConfig::Anonymous => {
2814                Ok(RawMessage {
2815                    source: None,
2816                    data,
2817                    // To be interoperable with the go-implementation this is treated as a 64-bit
2818                    // big-endian uint.
2819                    sequence_number: None,
2820                    topic,
2821                    signature: None,
2822                    key: None,
2823                    validated: true, // all published messages are valid
2824                })
2825            }
2826        }
2827    }
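
    // Illustrative sketch (not part of this behaviour): a receiver can verify a signed message
    // by re-encoding the protobuf with the `signature` and `key` fields unset and checking the
    // same prefixed bytes, e.g.:
    //
    //     let mut signed_bytes = SIGNING_PREFIX.to_vec();
    //     signed_bytes.extend_from_slice(&encoded_message_without_signature);
    //     let valid = author_public_key.verify(&signed_bytes, &signature);
    //
    // Here `encoded_message_without_signature` and `author_public_key` are assumed to have been
    // recovered from the incoming RPC; the crate performs the real check while decoding incoming
    // RPCs, depending on the configured `ValidationMode`.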
2828
2829    /// Send a [`RpcOut`] message to a peer.
2830    ///
2831    /// Returns `true` if sending was successful, `false` otherwise.
2832    /// The method will update the peer score and failed message counter if
2833    /// sending the message failed due to the channel to the connection handler being
2834    /// full (which indicates a slow peer).
2835    fn send_message(&mut self, peer_id: PeerId, rpc: RpcOut) -> bool {
2836        if let Some(m) = self.metrics.as_mut() {
2837            if let RpcOut::Publish { ref message, .. } | RpcOut::Forward { ref message, .. } = rpc {
2838                // register bytes sent on the internal metrics.
2839                m.msg_sent(&message.topic, message.raw_protobuf_len());
2840            }
2841        }
2842
2843        let Some(peer) = &mut self.connected_peers.get_mut(&peer_id) else {
2844            tracing::error!(peer = %peer_id,
2845                    "Could not send rpc to connection handler, peer doesn't exist in connected peer list");
2846            return false;
2847        };
2848
2849        // Try sending the message to the connection handler.
2850        match peer.sender.send_message(rpc) {
2851            Ok(()) => true,
2852            Err(rpc) => {
2853                // Sending failed because the channel is full.
2854                tracing::warn!(peer=%peer_id, "Send queue full. Could not send {:?}.", rpc);
2855
2856                // Update failed message counter.
2857                let failed_messages = self.failed_messages.entry(peer_id).or_default();
2858                match rpc {
2859                    RpcOut::Publish { .. } => {
2860                        failed_messages.priority += 1;
2861                        failed_messages.publish += 1;
2862                    }
2863                    RpcOut::Forward { .. } => {
2864                        failed_messages.non_priority += 1;
2865                        failed_messages.forward += 1;
2866                    }
2867                    RpcOut::IWant(_) | RpcOut::IHave(_) | RpcOut::IDontWant(_) => {
2868                        failed_messages.non_priority += 1;
2869                    }
2870                    RpcOut::Graft(_)
2871                    | RpcOut::Prune(_)
2872                    | RpcOut::Subscribe(_)
2873                    | RpcOut::Unsubscribe(_) => {
2874                        unreachable!("Channel for high-priority control messages is unbounded and should always be open.")
2875                    }
2876                }
2877
2878                // Update peer score.
2879                if let Some((peer_score, ..)) = &mut self.peer_score {
2880                    peer_score.failed_message_slow_peer(&peer_id);
2881                }
2882
2883                false
2884            }
2885        }
2886    }
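
    // Illustrative call pattern (hypothetical caller, not taken from this file): callers mostly
    // ignore the returned boolean, but a caller that cares about delivery could react to a full
    // send queue, e.g.:
    //
    //     if !self.send_message(peer_id, RpcOut::IWant(IWant { message_ids })) {
    //         tracing::debug!(%peer_id, "IWANT dropped because the send queue was full");
    //     }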
2887
2888    fn on_connection_established(
2889        &mut self,
2890        ConnectionEstablished {
2891            peer_id,
2892            endpoint,
2893            other_established,
2894            ..
2895        }: ConnectionEstablished,
2896    ) {
2897        // Diverging from the go implementation, we only want to consider a peer as an outbound
2898        // peer if its first connection is outbound.
2899
2900        if endpoint.is_dialer() && other_established == 0 && !self.px_peers.contains(&peer_id) {
2901            // The first connection is outbound and it is not a peer from peer exchange => mark
2902            // it as an outbound peer.
2903            self.outbound_peers.insert(peer_id);
2904        }
2905
2906        // Add the IP to the peer scoring system
2907        if let Some((peer_score, ..)) = &mut self.peer_score {
2908            if let Some(ip) = get_ip_addr(endpoint.get_remote_address()) {
2909                peer_score.add_ip(&peer_id, ip);
2910            } else {
2911                tracing::trace!(
2912                    peer=%peer_id,
2913                    "Couldn't extract an IP address from the peer's endpoint {:?}",
2914                    endpoint
2915                )
2916            }
2917        }
2918
2919        if other_established > 0 {
2920            return; // Not our first connection to this peer, hence nothing to do.
2921        }
2922
2923        if let Some((peer_score, ..)) = &mut self.peer_score {
2924            peer_score.add_peer(peer_id);
2925        }
2926
2927        // Ignore connections from blacklisted peers.
2928        if self.blacklisted_peers.contains(&peer_id) {
2929            tracing::debug!(peer=%peer_id, "Ignoring connection from blacklisted peer");
2930            return;
2931        }
2932
2933        tracing::debug!(peer=%peer_id, "New peer connected");
2934        // We need to send our subscriptions to the newly-connected node.
2935        for topic_hash in self.mesh.clone().into_keys() {
2936            self.send_message(peer_id, RpcOut::Subscribe(topic_hash));
2937        }
2938    }
2939
2940    fn on_connection_closed(
2941        &mut self,
2942        ConnectionClosed {
2943            peer_id,
2944            connection_id,
2945            endpoint,
2946            remaining_established,
2947            ..
2948        }: ConnectionClosed,
2949    ) {
2950        // Remove IP from peer scoring system
2951        if let Some((peer_score, ..)) = &mut self.peer_score {
2952            if let Some(ip) = get_ip_addr(endpoint.get_remote_address()) {
2953                peer_score.remove_ip(&peer_id, &ip);
2954            } else {
2955                tracing::trace!(
2956                    peer=%peer_id,
2957                    "Couldn't extract an IP address from the peer's endpoint {:?}",
2958                    endpoint
2959                )
2960            }
2961        }
2962
2963        if remaining_established != 0 {
2964            // Remove the connection from the list
2965            if let Some(peer) = self.connected_peers.get_mut(&peer_id) {
2966                let index = peer
2967                    .connections
2968                    .iter()
2969                    .position(|v| v == &connection_id)
2970                    .expect("Previously established connection to peer must be present");
2971                peer.connections.remove(index);
2972
2973                // If there are more connections and this peer is in a mesh, inform the first
2974                // connection handler.
2975                if !peer.connections.is_empty() {
2976                    for topic in &peer.topics {
2977                        if let Some(mesh_peers) = self.mesh.get(topic) {
2978                            if mesh_peers.contains(&peer_id) {
2979                                self.events.push_back(ToSwarm::NotifyHandler {
2980                                    peer_id,
2981                                    event: HandlerIn::JoinedMesh,
2982                                    handler: NotifyHandler::One(peer.connections[0]),
2983                                });
2984                                break;
2985                            }
2986                        }
2987                    }
2988                }
2989            }
2990        } else {
2991            // remove the peer from the mesh, the fanout and all other mappings
2992            tracing::debug!(peer=%peer_id, "Peer disconnected");
2993            let Some(connected_peer) = self.connected_peers.get(&peer_id) else {
2994                tracing::error!(peer_id = %peer_id, "Peer non-existent when handling disconnection");
2995                return;
2996            };
2997
2998            // remove peer from all mappings
2999            for topic in &connected_peer.topics {
3000                // check the mesh for the topic
3001                if let Some(mesh_peers) = self.mesh.get_mut(topic) {
3002                    // check if the peer is in the mesh and remove it
3003                    if mesh_peers.remove(&peer_id) {
3004                        if let Some(m) = self.metrics.as_mut() {
3005                            m.peers_removed(topic, Churn::Dc, 1);
3006                            m.set_mesh_peers(topic, mesh_peers.len());
3007                        }
3008                    };
3009                }
3010
3011                if let Some(m) = self.metrics.as_mut() {
3012                    m.dec_topic_peers(topic);
3013                }
3014
3015                // remove from fanout
3016                if let Some(peers) = self.fanout.get_mut(topic) {
3017                    peers.remove(&peer_id);
3018                }
3019            }
3020
3021            // Forget px and outbound status for this peer
3022            self.px_peers.remove(&peer_id);
3023            self.outbound_peers.remove(&peer_id);
3024
3025            // If metrics are enabled, register the disconnection of a peer based on its protocol.
3026            if let Some(metrics) = self.metrics.as_mut() {
3027                metrics.peer_protocol_disconnected(connected_peer.kind);
3028            }
3029
3030            self.connected_peers.remove(&peer_id);
3031
3032            if let Some((peer_score, ..)) = &mut self.peer_score {
3033                peer_score.remove_peer(&peer_id);
3034            }
3035        }
3036    }
3037
3038    fn on_address_change(
3039        &mut self,
3040        AddressChange {
3041            peer_id,
3042            old: endpoint_old,
3043            new: endpoint_new,
3044            ..
3045        }: AddressChange,
3046    ) {
3047        // Exchange IP in peer scoring system
3048        if let Some((peer_score, ..)) = &mut self.peer_score {
3049            if let Some(ip) = get_ip_addr(endpoint_old.get_remote_address()) {
3050                peer_score.remove_ip(&peer_id, &ip);
3051            } else {
3052                tracing::trace!(
3053                    peer=%&peer_id,
3054                    "Couldn't extract an IP address from the peer's endpoint {:?}",
3055                    endpoint_old
3056                )
3057            }
3058            if let Some(ip) = get_ip_addr(endpoint_new.get_remote_address()) {
3059                peer_score.add_ip(&peer_id, ip);
3060            } else {
3061                tracing::trace!(
3062                    peer=%peer_id,
3063                    "Couldn't extract an IP address from the peer's endpoint {:?}",
3064                    endpoint_new
3065                )
3066            }
3067        }
3068    }
3069
3070    /// Register topics to ensure metrics are recorded correctly for these topics.
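    ///
    /// A minimal illustration (assuming `gossipsub` is a [`Behaviour`] constructed with metrics
    /// enabled and using the crate's `IdentTopic` re-export; the topic name is made up):
    ///
    /// ```ignore
    /// let topic = libp2p_gossipsub::IdentTopic::new("example-topic");
    /// gossipsub.register_topics_for_metrics(vec![topic.hash()]);
    /// ```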
3071    pub fn register_topics_for_metrics(&mut self, topics: Vec<TopicHash>) {
3072        if let Some(metrics) = &mut self.metrics {
3073            metrics.register_allowed_topics(topics);
3074        }
3075    }
3076}
3077
3078fn get_ip_addr(addr: &Multiaddr) -> Option<IpAddr> {
3079    addr.iter().find_map(|p| match p {
3080        Ip4(addr) => Some(IpAddr::V4(addr)),
3081        Ip6(addr) => Some(IpAddr::V6(addr)),
3082        _ => None,
3083    })
3084}
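
// For example (illustrative): "/ip4/192.0.2.1/tcp/4001" yields `Some(IpAddr::V4(..))`, while a
// multiaddr without an ip4/ip6 component (e.g. a DNS-only address) yields `None`.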
3085
3086impl<C, F> NetworkBehaviour for Behaviour<C, F>
3087where
3088    C: Send + 'static + DataTransform,
3089    F: Send + 'static + TopicSubscriptionFilter,
3090{
3091    type ConnectionHandler = Handler;
3092    type ToSwarm = Event;
3093
3094    fn handle_established_inbound_connection(
3095        &mut self,
3096        connection_id: ConnectionId,
3097        peer_id: PeerId,
3098        _: &Multiaddr,
3099        _: &Multiaddr,
3100    ) -> Result<THandler<Self>, ConnectionDenied> {
3101        // By default we assume a peer is only a floodsub peer.
3102        //
3103        // The protocol negotiation occurs once a message is sent/received. Once this happens, we
3104        // update the peer's kind in order to determine which kind of routing should
3105        // occur.
3106        let connected_peer = self
3107            .connected_peers
3108            .entry(peer_id)
3109            .or_insert(PeerConnections {
3110                kind: PeerKind::Floodsub,
3111                connections: vec![],
3112                sender: Sender::new(self.config.connection_handler_queue_len()),
3113                topics: Default::default(),
3114                dont_send: LinkedHashMap::new(),
3115            });
3116        // Add the new connection
3117        connected_peer.connections.push(connection_id);
3118
3119        Ok(Handler::new(
3120            self.config.protocol_config(),
3121            connected_peer.sender.new_receiver(),
3122        ))
3123    }
3124
3125    fn handle_established_outbound_connection(
3126        &mut self,
3127        connection_id: ConnectionId,
3128        peer_id: PeerId,
3129        _: &Multiaddr,
3130        _: Endpoint,
3131        _: PortUse,
3132    ) -> Result<THandler<Self>, ConnectionDenied> {
3133        let connected_peer = self
3134            .connected_peers
3135            .entry(peer_id)
3136            .or_insert(PeerConnections {
3137                kind: PeerKind::Floodsub,
3138                connections: vec![],
3139                sender: Sender::new(self.config.connection_handler_queue_len()),
3140                topics: Default::default(),
3141                dont_send: LinkedHashMap::new(),
3142            });
3143        // Add the new connection
3144        connected_peer.connections.push(connection_id);
3145
3146        Ok(Handler::new(
3147            self.config.protocol_config(),
3148            connected_peer.sender.new_receiver(),
3149        ))
3150    }
3151
3152    fn on_connection_handler_event(
3153        &mut self,
3154        propagation_source: PeerId,
3155        _connection_id: ConnectionId,
3156        handler_event: THandlerOutEvent<Self>,
3157    ) {
3158        match handler_event {
3159            HandlerEvent::PeerKind(kind) => {
3160                // We have identified the protocol this peer is using
3161
3162                if let Some(metrics) = self.metrics.as_mut() {
3163                    metrics.peer_protocol_connected(kind);
3164                }
3165
3166                if let PeerKind::NotSupported = kind {
3167                    tracing::debug!(
3168                        peer=%propagation_source,
3169                        "Peer does not support gossipsub protocols"
3170                    );
3171                    self.events
3172                        .push_back(ToSwarm::GenerateEvent(Event::GossipsubNotSupported {
3173                            peer_id: propagation_source,
3174                        }));
3175                } else if let Some(conn) = self.connected_peers.get_mut(&propagation_source) {
3176                    // Only change the value if the old value is Floodsub (the default set when
3177                    // the connection was established in `handle_established_*_connection`).
3178                    // All other PeerKind changes are ignored.
3179                    tracing::debug!(
3180                        peer=%propagation_source,
3181                        peer_type=%kind,
3182                        "New peer type found for peer"
3183                    );
3184                    if let PeerKind::Floodsub = conn.kind {
3185                        conn.kind = kind;
3186                    }
3187                }
3188            }
3189            HandlerEvent::MessageDropped(rpc) => {
3190                // Account for this in the scoring logic
3191                if let Some((peer_score, _, _)) = &mut self.peer_score {
3192                    peer_score.failed_message_slow_peer(&propagation_source);
3193                }
3194
3195                // Keep track of expired messages for the application layer.
3196                let failed_messages = self.failed_messages.entry(propagation_source).or_default();
3197                failed_messages.timeout += 1;
3198                match rpc {
3199                    RpcOut::Publish { .. } => {
3200                        failed_messages.publish += 1;
3201                    }
3202                    RpcOut::Forward { .. } => {
3203                        failed_messages.forward += 1;
3204                    }
3205                    _ => {}
3206                }
3207
3208                // Record metrics on the failure.
3209                if let Some(metrics) = self.metrics.as_mut() {
3210                    match rpc {
3211                        RpcOut::Publish { message, .. } => {
3212                            metrics.publish_msg_dropped(&message.topic);
3213                            metrics.timeout_msg_dropped(&message.topic);
3214                        }
3215                        RpcOut::Forward { message, .. } => {
3216                            metrics.forward_msg_dropped(&message.topic);
3217                            metrics.timeout_msg_dropped(&message.topic);
3218                        }
3219                        _ => {}
3220                    }
3221                }
3222            }
3223            HandlerEvent::Message {
3224                rpc,
3225                invalid_messages,
3226            } => {
3227                // Handle the gossipsub RPC
3228
3229                // Handle subscriptions
3230                // Update connected peers topics
3231                if !rpc.subscriptions.is_empty() {
3232                    self.handle_received_subscriptions(&rpc.subscriptions, &propagation_source);
3233                }
3234
3235                // Check if peer is graylisted in which case we ignore the event
3236                if let (true, _) =
3237                    self.score_below_threshold(&propagation_source, |pst| pst.graylist_threshold)
3238                {
3239                    tracing::debug!(peer=%propagation_source, "RPC dropped from graylisted peer");
3240                    return;
3241                }
3242
3243                // Handle any invalid messages from this peer
3244                if self.peer_score.is_some() {
3245                    for (raw_message, validation_error) in invalid_messages {
3246                        self.handle_invalid_message(
3247                            &propagation_source,
3248                            &raw_message.topic,
3249                            None,
3250                            RejectReason::ValidationError(validation_error),
3251                        )
3252                    }
3253                } else {
3254                    // log the invalid messages
3255                    for (message, validation_error) in invalid_messages {
3256                        tracing::warn!(
3257                            peer=%propagation_source,
3258                            source=?message.source,
3259                            "Invalid message from peer. Reason: {:?}",
3260                            validation_error,
3261                        );
3262                    }
3263                }
3264
3265                // Handle messages
3266                for (count, raw_message) in rpc.messages.into_iter().enumerate() {
3267                    // Only process the amount of messages the configuration allows.
3268                    if self
3269                        .config
3270                        .max_messages_per_rpc()
3271                        .is_some_and(|max_msg| count >= max_msg)
3272                    {
3273                        tracing::warn!("Received more messages than permitted. Ignoring further messages. Processed: {}", count);
3274                        break;
3275                    }
3276                    self.handle_received_message(raw_message, &propagation_source);
3277                }
3278
3279                // Handle control messages
3280                // Group certain control messages so they can be handled in batches; this
3281                // minimises the number of send events generated.
3282                let mut ihave_msgs = vec![];
3283                let mut graft_msgs = vec![];
3284                let mut prune_msgs = vec![];
3285                for (count, control_msg) in rpc.control_msgs.into_iter().enumerate() {
3286                    // Only process the amount of messages the configuration allows.
3287                    if self
3288                        .config
3289                        .max_messages_per_rpc()
3290                        .is_some_and(|max_msg| count >= max_msg)
3291                    {
3292                        tracing::warn!("Received more control messages than permitted. Ignoring further messages. Processed: {}", count);
3293                        break;
3294                    }
3295
3296                    match control_msg {
3297                        ControlAction::IHave(IHave {
3298                            topic_hash,
3299                            message_ids,
3300                        }) => {
3301                            ihave_msgs.push((topic_hash, message_ids));
3302                        }
3303                        ControlAction::IWant(IWant { message_ids }) => {
3304                            self.handle_iwant(&propagation_source, message_ids)
3305                        }
3306                        ControlAction::Graft(Graft { topic_hash }) => graft_msgs.push(topic_hash),
3307                        ControlAction::Prune(Prune {
3308                            topic_hash,
3309                            peers,
3310                            backoff,
3311                        }) => prune_msgs.push((topic_hash, peers, backoff)),
3312                        ControlAction::IDontWant(IDontWant { message_ids }) => {
3313                            let Some(peer) = self.connected_peers.get_mut(&propagation_source)
3314                            else {
3315                                tracing::error!(peer = %propagation_source,
3316                                    "Could not handle IDONTWANT, peer doesn't exist in connected peer list");
3317                                continue;
3318                            };
3319                            if let Some(metrics) = self.metrics.as_mut() {
3320                                metrics.register_idontwant(message_ids.len());
3321                            }
3322                            for message_id in message_ids {
3323                                peer.dont_send.insert(message_id, Instant::now());
3324                                // Don't exceed capacity.
3325                                if peer.dont_send.len() > IDONTWANT_CAP {
3326                                    peer.dont_send.pop_front();
3327                                }
3328                            }
3329                        }
3330                    }
3331                }
3332                if !ihave_msgs.is_empty() {
3333                    self.handle_ihave(&propagation_source, ihave_msgs);
3334                }
3335                if !graft_msgs.is_empty() {
3336                    self.handle_graft(&propagation_source, graft_msgs);
3337                }
3338                if !prune_msgs.is_empty() {
3339                    self.handle_prune(&propagation_source, prune_msgs);
3340                }
3341            }
3342        }
3343    }
3344
3345    #[tracing::instrument(level = "trace", name = "NetworkBehaviour::poll", skip(self, cx))]
3346    fn poll(
3347        &mut self,
3348        cx: &mut Context<'_>,
3349    ) -> Poll<ToSwarm<Self::ToSwarm, THandlerInEvent<Self>>> {
3350        if let Some(event) = self.events.pop_front() {
3351            return Poll::Ready(event);
3352        }
3353
3354        // update scores
3355        if let Some((peer_score, _, delay)) = &mut self.peer_score {
3356            if delay.poll_unpin(cx).is_ready() {
3357                peer_score.refresh_scores();
3358                delay.reset(peer_score.params.decay_interval);
3359            }
3360        }
3361
3362        if self.heartbeat.poll_unpin(cx).is_ready() {
3363            self.heartbeat();
3364            self.heartbeat.reset(self.config.heartbeat_interval());
3365        }
3366
3367        Poll::Pending
3368    }
3369
3370    fn on_swarm_event(&mut self, event: FromSwarm) {
3371        match event {
3372            FromSwarm::ConnectionEstablished(connection_established) => {
3373                self.on_connection_established(connection_established)
3374            }
3375            FromSwarm::ConnectionClosed(connection_closed) => {
3376                self.on_connection_closed(connection_closed)
3377            }
3378            FromSwarm::AddressChange(address_change) => self.on_address_change(address_change),
3379            _ => {}
3380        }
3381    }
3382}
3383
3384/// This is called when a peer is added to any mesh. It checks if the peer already exists
3385/// in any other mesh. If this is the first mesh the peer has joined, it queues a message to
3386/// notify the appropriate connection handler to maintain a connection.
3387fn peer_added_to_mesh(
3388    peer_id: PeerId,
3389    new_topics: Vec<&TopicHash>,
3390    mesh: &HashMap<TopicHash, BTreeSet<PeerId>>,
3391    events: &mut VecDeque<ToSwarm<Event, HandlerIn>>,
3392    connections: &HashMap<PeerId, PeerConnections>,
3393) {
3394    // Ensure there is an active connection
3395    let connection_id = match connections.get(&peer_id) {
3396        Some(p) => p
3397            .connections
3398            .first()
3399            .expect("There should be at least one connection to a peer."),
3400        None => {
3401            tracing::error!(peer_id=%peer_id, "Peer non-existent when added to the mesh");
3402            return;
3403        }
3404    };
3405
3406    if let Some(peer) = connections.get(&peer_id) {
3407        for topic in &peer.topics {
3408            if !new_topics.contains(&topic) {
3409                if let Some(mesh_peers) = mesh.get(topic) {
3410                    if mesh_peers.contains(&peer_id) {
3411                        // the peer is already in a mesh for another topic
3412                        return;
3413                    }
3414                }
3415            }
3416        }
3417    }
3418    // This is the first mesh the peer has joined, inform the handler
3419    events.push_back(ToSwarm::NotifyHandler {
3420        peer_id,
3421        event: HandlerIn::JoinedMesh,
3422        handler: NotifyHandler::One(*connection_id),
3423    });
3424}
3425
3426/// This is called when a peer is removed from a mesh. It checks if the peer exists
3427/// in any other mesh. If this was the last mesh the peer was part of, it queues a message to
3428/// notify the handler to no longer maintain a connection.
3429fn peer_removed_from_mesh(
3430    peer_id: PeerId,
3431    old_topic: &TopicHash,
3432    mesh: &HashMap<TopicHash, BTreeSet<PeerId>>,
3433    events: &mut VecDeque<ToSwarm<Event, HandlerIn>>,
3434    connections: &HashMap<PeerId, PeerConnections>,
3435) {
3436    // Ensure there is an active connection
3437    let connection_id = match connections.get(&peer_id) {
3438        Some(p) => p
3439            .connections
3440            .first()
3441            .expect("There should be at least one connection to a peer."),
3442        None => {
3443            tracing::error!(peer_id=%peer_id, "Peer non-existent when removed from mesh");
3444            return;
3445        }
3446    };
3447
3448    if let Some(peer) = connections.get(&peer_id) {
3449        for topic in &peer.topics {
3450            if topic != old_topic {
3451                if let Some(mesh_peers) = mesh.get(topic) {
3452                    if mesh_peers.contains(&peer_id) {
3453                        // the peer exists in another mesh still
3454                        return;
3455                    }
3456                }
3457            }
3458        }
3459    }
3460    // The peer is not in any other mesh, inform the handler
3461    events.push_back(ToSwarm::NotifyHandler {
3462        peer_id,
3463        event: HandlerIn::LeftMesh,
3464        handler: NotifyHandler::One(*connection_id),
3465    });
3466}
3467
3468/// Helper function to get a subset of random gossipsub peers for a `topic_hash`
3469/// filtered by the function `f`. The number of peers returned equals the output of `n_map`,
3470/// which receives the number of filtered peers as its input.
3471fn get_random_peers_dynamic(
3472    connected_peers: &HashMap<PeerId, PeerConnections>,
3473    topic_hash: &TopicHash,
3474    // maps the number of total peers to the number of selected peers
3475    n_map: impl Fn(usize) -> usize,
3476    mut f: impl FnMut(&PeerId) -> bool,
3477) -> BTreeSet<PeerId> {
3478    let mut gossip_peers = connected_peers
3479        .iter()
3480        .filter(|(_, p)| p.topics.contains(topic_hash))
3481        .filter(|(peer_id, _)| f(peer_id))
3482        .filter(|(_, p)| p.kind.is_gossipsub())
3483        .map(|(peer_id, _)| *peer_id)
3484        .collect::<Vec<PeerId>>();
3485
3486    // if we have fewer peers than needed, return them all
3487    let n = n_map(gossip_peers.len());
3488    if gossip_peers.len() <= n {
3489        tracing::debug!("RANDOM PEERS: Got {:?} peers", gossip_peers.len());
3490        return gossip_peers.into_iter().collect();
3491    }
3492
3493    // we have more peers than needed, shuffle them and return n of them
3494    let mut rng = thread_rng();
3495    gossip_peers.partial_shuffle(&mut rng, n);
3496
3497    tracing::debug!("RANDOM PEERS: Got {:?} peers", n);
3498
3499    gossip_peers.into_iter().take(n).collect()
3500}
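
// Illustrative use of `get_random_peers_dynamic` (hypothetical call site): select up to six
// gossip peers for a topic while excluding peers that are already in the mesh:
//
//     let selected = get_random_peers_dynamic(
//         &self.connected_peers,
//         &topic_hash,
//         |available| available.min(6),
//         |peer| !mesh_peers.contains(peer),
//     );
//
// `n_map` receives the number of peers that passed the filters, so `available.min(6)` never
// requests more peers than exist.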
3501
3502/// Helper function to get a set of `n` random gossipsub peers for a `topic_hash`
3503/// filtered by the function `f`.
3504fn get_random_peers(
3505    connected_peers: &HashMap<PeerId, PeerConnections>,
3506    topic_hash: &TopicHash,
3507    n: usize,
3508    f: impl FnMut(&PeerId) -> bool,
3509) -> BTreeSet<PeerId> {
3510    get_random_peers_dynamic(connected_peers, topic_hash, |_| n, f)
3511}
3512
3513/// Validates the combination of signing, privacy and message validation to ensure the
3514/// configuration will not reject published messages.
3515fn validate_config(
3516    authenticity: &MessageAuthenticity,
3517    validation_mode: &ValidationMode,
3518) -> Result<(), &'static str> {
3519    match validation_mode {
3520        ValidationMode::Anonymous => {
3521            if authenticity.is_signing() {
3522                return Err("Cannot enable message signing with an Anonymous validation mode. Consider changing either the ValidationMode or MessageAuthenticity");
3523            }
3524
3525            if !authenticity.is_anonymous() {
3526                return Err("Published messages contain an author but incoming messages with an author will be rejected. Consider adjusting the validation or privacy settings in the config");
3527            }
3528        }
3529        ValidationMode::Strict => {
3530            if !authenticity.is_signing() {
3531                return Err(
3532                    "Messages will be published unsigned and incoming unsigned messages will \
3533                     be rejected. Consider adjusting the validation or privacy settings in \
3534                     the config",
3535                );
3536            }
3537        }
3538        _ => {}
3539    }
3540    Ok(())
3541}
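
// Illustrative expectations for `validate_config` (a sketch, not a test):
//
//     assert!(validate_config(&MessageAuthenticity::Anonymous, &ValidationMode::Anonymous).is_ok());
//     assert!(validate_config(&MessageAuthenticity::Anonymous, &ValidationMode::Strict).is_err());
//     assert!(validate_config(&MessageAuthenticity::RandomAuthor, &ValidationMode::Anonymous).is_err());
//
// Signing authenticity (e.g. `MessageAuthenticity::Signed`) pairs with `ValidationMode::Strict`,
// while anonymous publishing requires the `Anonymous` validation mode.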
3542
3543impl<C: DataTransform, F: TopicSubscriptionFilter> fmt::Debug for Behaviour<C, F> {
3544    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
3545        f.debug_struct("Behaviour")
3546            .field("config", &self.config)
3547            .field("events", &self.events.len())
3548            .field("publish_config", &self.publish_config)
3549            .field("mesh", &self.mesh)
3550            .field("fanout", &self.fanout)
3551            .field("fanout_last_pub", &self.fanout_last_pub)
3552            .field("mcache", &self.mcache)
3553            .field("heartbeat", &self.heartbeat)
3554            .finish()
3555    }
3556}
3557
3558impl fmt::Debug for PublishConfig {
3559    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
3560        match self {
3561            PublishConfig::Signing { author, .. } => {
3562                f.write_fmt(format_args!("PublishConfig::Signing({author})"))
3563            }
3564            PublishConfig::Author(author) => {
3565                f.write_fmt(format_args!("PublishConfig::Author({author})"))
3566            }
3567            PublishConfig::RandomAuthor => f.write_fmt(format_args!("PublishConfig::RandomAuthor")),
3568            PublishConfig::Anonymous => f.write_fmt(format_args!("PublishConfig::Anonymous")),
3569        }
3570    }
3571}