libp2p_gossipsub/behaviour.rs

1// Copyright 2020 Sigma Prime Pty Ltd.
2//
3// Permission is hereby granted, free of charge, to any person obtaining a
4// copy of this software and associated documentation files (the "Software"),
5// to deal in the Software without restriction, including without limitation
6// the rights to use, copy, modify, merge, publish, distribute, sublicense,
7// and/or sell copies of the Software, and to permit persons to whom the
8// Software is furnished to do so, subject to the following conditions:
9//
10// The above copyright notice and this permission notice shall be included in
11// all copies or substantial portions of the Software.
12//
13// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
14// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
18// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
19// DEALINGS IN THE SOFTWARE.
20
21use std::{
22    cmp::{
23        max,
24        Ordering::{self, Equal},
25    },
26    collections::{BTreeSet, HashMap, HashSet, VecDeque},
27    fmt::{self, Debug},
28    net::IpAddr,
29    task::{Context, Poll},
30    time::Duration,
31};
32
33use futures::FutureExt;
34use futures_timer::Delay;
35use hashlink::LinkedHashMap;
36use libp2p_core::{
37    multiaddr::Protocol::{Ip4, Ip6},
38    transport::PortUse,
39    Endpoint, Multiaddr,
40};
41use libp2p_identity::{Keypair, PeerId};
42use libp2p_swarm::{
43    behaviour::{AddressChange, ConnectionClosed, ConnectionEstablished, FromSwarm},
44    dial_opts::DialOpts,
45    ConnectionDenied, ConnectionId, NetworkBehaviour, NotifyHandler, THandler, THandlerInEvent,
46    THandlerOutEvent, ToSwarm,
47};
48#[cfg(feature = "metrics")]
49use prometheus_client::registry::Registry;
50use quick_protobuf::{MessageWrite, Writer};
51use rand::{seq::SliceRandom, thread_rng};
52use web_time::{Instant, SystemTime};
53
54#[cfg(feature = "metrics")]
55use crate::metrics::{Churn, Config as MetricsConfig, Inclusion, Metrics, Penalty};
56use crate::{
57    backoff::BackoffStorage,
58    config::{Config, ValidationMode},
59    gossip_promises::GossipPromises,
60    handler::{Handler, HandlerEvent, HandlerIn},
61    mcache::MessageCache,
62    peer_score::{PeerScore, PeerScoreParams, PeerScoreState, PeerScoreThresholds, RejectReason},
63    protocol::SIGNING_PREFIX,
64    rpc::Sender,
65    rpc_proto::proto,
66    subscription_filter::{AllowAllSubscriptionFilter, TopicSubscriptionFilter},
67    time_cache::DuplicateCache,
68    topic::{Hasher, Topic, TopicHash},
69    transform::{DataTransform, IdentityTransform},
70    types::{
71        ControlAction, Graft, IDontWant, IHave, IWant, Message, MessageAcceptance, MessageId,
72        PeerDetails, PeerInfo, PeerKind, Prune, RawMessage, RpcOut, Subscription,
73        SubscriptionAction,
74    },
75    FailedMessages, PublishError, SubscriptionError, TopicScoreParams, ValidationError,
76};
77
78#[cfg(test)]
79mod tests;
80
81/// IDONTWANT cache capacity.
82const IDONTWANT_CAP: usize = 10_000;
83
84/// IDONTWANT timeout before removal.
85const IDONTWANT_TIMEOUT: Duration = Duration::new(3, 0);
86
87/// Determines if published messages should be signed or not.
88///
89/// Without signing, a number of privacy preserving modes can be selected.
90///
91/// NOTE: The default validation settings are to require signatures. The [`ValidationMode`]
92/// should be updated in the [`Config`] to allow for unsigned messages.
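///
/// A minimal, illustrative sketch of selecting an authenticity mode (the generated keypair
/// below is an assumption for the example, not part of this module):
///
/// ```ignore
/// use libp2p_gossipsub::MessageAuthenticity;
/// use libp2p_identity::Keypair;
///
/// // Sign all published messages with a fresh Ed25519 key.
/// let keypair = Keypair::generate_ed25519();
/// let authenticity = MessageAuthenticity::Signed(keypair);
/// assert!(authenticity.is_signing());
/// ```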
93#[derive(Clone)]
94pub enum MessageAuthenticity {
95    /// Message signing is enabled. The author will be the owner of the key and the sequence number
96    /// will be linearly increasing.
97    Signed(Keypair),
98    /// Message signing is disabled.
99    ///
100    /// The specified [`PeerId`] will be used as the author of all published messages. The sequence
101    /// number will be randomized.
102    Author(PeerId),
103    /// Message signing is disabled.
104    ///
105    /// A random [`PeerId`] will be used when publishing each message. The sequence number will be
106    /// randomized.
107    RandomAuthor,
108    /// Message signing is disabled.
109    ///
110    /// The author of the message and the sequence numbers are excluded from the message.
111    ///
112    /// NOTE: Excluding these fields may cause these messages to be treated as invalid by nodes
113    /// that enforce validation of these fields. See [`ValidationMode`] in the [`Config`]
114    /// for how to customise this for rust-libp2p gossipsub.  A custom `message_id`
115    /// function will need to be set to prevent all messages from a peer being filtered
116    /// as duplicates.
117    Anonymous,
118}
119
120impl MessageAuthenticity {
121    /// Returns true if signing is enabled.
122    pub fn is_signing(&self) -> bool {
123        matches!(self, MessageAuthenticity::Signed(_))
124    }
125
126    pub fn is_anonymous(&self) -> bool {
127        matches!(self, MessageAuthenticity::Anonymous)
128    }
129}
130
131/// Event that can be emitted by the gossipsub behaviour.
132#[derive(Debug)]
133pub enum Event {
134    /// A message has been received.
135    Message {
136        /// The peer that forwarded us this message.
137        propagation_source: PeerId,
138        /// The [`MessageId`] of the message. This should be referenced by the application when
139        /// validating a message (if required).
140        message_id: MessageId,
141        /// The decompressed message itself.
142        message: Message,
143    },
144    /// A remote subscribed to a topic.
145    Subscribed {
146        /// Remote that has subscribed.
147        peer_id: PeerId,
148        /// The topic it has subscribed to.
149        topic: TopicHash,
150    },
151    /// A remote unsubscribed from a topic.
152    Unsubscribed {
153        /// Remote that has unsubscribed.
154        peer_id: PeerId,
155        /// The topic it has unsubscribed from.
156        topic: TopicHash,
157    },
158    /// A peer that does not support gossipsub has connected.
159    GossipsubNotSupported { peer_id: PeerId },
160    /// A peer is not able to download messages in time.
161    SlowPeer {
162        /// The peer_id
163        peer_id: PeerId,
164        /// The types and amounts of failed messages that are occurring for this peer.
165        failed_messages: FailedMessages,
166    },
167}
168
169/// A data structure for storing configuration for publishing messages. See [`MessageAuthenticity`]
170/// for further details.
171#[allow(clippy::large_enum_variant)]
172enum PublishConfig {
173    Signing {
174        keypair: Keypair,
175        author: PeerId,
176        inline_key: Option<Vec<u8>>,
177        last_seq_no: SequenceNumber,
178    },
179    Author(PeerId),
180    RandomAuthor,
181    Anonymous,
182}
183
184/// A strictly linearly increasing sequence number.
185///
186/// We start from the current time as a unix timestamp in nanoseconds.
187#[derive(Debug)]
188struct SequenceNumber(u64);
189
190impl SequenceNumber {
191    fn new() -> Self {
192        let unix_timestamp = SystemTime::now()
193            .duration_since(SystemTime::UNIX_EPOCH)
194            .expect("time to be linear")
195            .as_nanos();
196
197        Self(unix_timestamp as u64)
198    }
199
200    fn next(&mut self) -> u64 {
201        self.0 = self
202            .0
203            .checked_add(1)
204            .expect("to not exhaust u64 space for sequence numbers");
205
206        self.0
207    }
208}
209
210impl PublishConfig {
211    pub(crate) fn get_own_id(&self) -> Option<&PeerId> {
212        match self {
213            Self::Signing { author, .. } => Some(author),
214            Self::Author(author) => Some(author),
215            _ => None,
216        }
217    }
218}
219
220impl From<MessageAuthenticity> for PublishConfig {
221    fn from(authenticity: MessageAuthenticity) -> Self {
222        match authenticity {
223            MessageAuthenticity::Signed(keypair) => {
224                let public_key = keypair.public();
225                let key_enc = public_key.encode_protobuf();
226                let key = if key_enc.len() <= 42 {
227                    // The public key can be inlined in [`rpc_proto::proto::Message::from`], so we
228                    // don't include it specifically in the
229                    // [`rpc_proto::proto::Message::key`] field.
230                    None
231                } else {
232                    // Include the protobuf encoding of the public key in the message.
233                    Some(key_enc)
234                };
235
236                PublishConfig::Signing {
237                    keypair,
238                    author: public_key.to_peer_id(),
239                    inline_key: key,
240                    last_seq_no: SequenceNumber::new(),
241                }
242            }
243            MessageAuthenticity::Author(peer_id) => PublishConfig::Author(peer_id),
244            MessageAuthenticity::RandomAuthor => PublishConfig::RandomAuthor,
245            MessageAuthenticity::Anonymous => PublishConfig::Anonymous,
246        }
247    }
248}
249
250/// Network behaviour that handles the gossipsub protocol.
251///
252/// NOTE: Initialisation requires a [`MessageAuthenticity`] and [`Config`] instance. If
253/// message signing is disabled, the [`ValidationMode`] in the config should be adjusted to an
254/// appropriate level to accept unsigned messages.
255///
256/// The DataTransform trait allows applications to optionally add extra encoding/decoding
257/// functionality to the underlying messages. This is intended for custom compression algorithms.
258///
259/// The TopicSubscriptionFilter allows applications to implement specific filters on topics to
260/// prevent unwanted messages being propagated and evaluated.
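///
/// A rough construction sketch, assuming the crate-root re-exports (`Behaviour`, `Config`,
/// `MessageAuthenticity`) and the default type parameters; illustrative rather than a doctest:
///
/// ```ignore
/// use libp2p_gossipsub as gossipsub;
/// use libp2p_identity::Keypair;
///
/// let keypair = Keypair::generate_ed25519();
/// // Uses the default IdentityTransform and AllowAllSubscriptionFilter.
/// let behaviour: gossipsub::Behaviour = gossipsub::Behaviour::new(
///     gossipsub::MessageAuthenticity::Signed(keypair),
///     gossipsub::Config::default(),
/// )
/// .expect("valid configuration");
/// ```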
261pub struct Behaviour<D = IdentityTransform, F = AllowAllSubscriptionFilter> {
262    /// Configuration providing gossipsub performance parameters.
263    config: Config,
264
265    /// Events that need to be yielded to the outside when polling.
266    events: VecDeque<ToSwarm<Event, HandlerIn>>,
267
268    /// Information used for publishing messages.
269    publish_config: PublishConfig,
270
271    /// An LRU Time cache for storing seen messages (based on their ID). This cache prevents
272    /// duplicates from being propagated to the application and on the network.
273    duplicate_cache: DuplicateCache<MessageId>,
274
275    /// A set of connected peers, indexed by their [`PeerId`] tracking both the [`PeerKind`] and
276    /// the set of [`ConnectionId`]s.
277    connected_peers: HashMap<PeerId, PeerDetails>,
278
279    /// A set of all explicit peers. These are peers that remain connected and we unconditionally
280    /// forward messages to, outside of the scoring system.
281    explicit_peers: HashSet<PeerId>,
282
283    /// A list of peers that have been blacklisted by the user.
284    /// Messages are not sent to and are rejected from these peers.
285    blacklisted_peers: HashSet<PeerId>,
286
287    /// Overlay network of connected peers - Maps topics to connected gossipsub peers.
288    mesh: HashMap<TopicHash, BTreeSet<PeerId>>,
289
290    /// Map of topics to list of peers that we publish to, but don't subscribe to.
291    fanout: HashMap<TopicHash, BTreeSet<PeerId>>,
292
293    /// The last publish time for fanout topics.
294    fanout_last_pub: HashMap<TopicHash, Instant>,
295
296    /// Storage for backoffs
297    backoffs: BackoffStorage,
298
299    /// Message cache for the last few heartbeats.
300    mcache: MessageCache,
301
302    /// Heartbeat interval stream.
303    heartbeat: Delay,
304
305    /// Number of heartbeats since the beginning of time; this allows us to amortize some resource
306    /// clean up -- eg backoff clean up.
307    heartbeat_ticks: u64,
308
309    /// We remember all peers we found through peer exchange, since those peers are not considered
310    /// as safe as randomly discovered outbound peers. This behaviour diverges from the go
311    /// implementation to avoid possible love bombing attacks in PX. When a peer disconnects, it is
312    /// removed from this list, which may result in a true outbound rediscovery.
313    px_peers: HashSet<PeerId>,
314
315    /// Stores optional peer score data together with thresholds, decay interval and gossip
316    /// promises.
317    peer_score: PeerScoreState,
318
319    /// Counts the number of `IHAVE` received from each peer since the last heartbeat.
320    count_received_ihave: HashMap<PeerId, usize>,
321
322    /// Counts the number of `IWANT` that we sent to each peer since the last heartbeat.
323    count_sent_iwant: HashMap<PeerId, usize>,
324
325    /// Short term cache for published message ids. This is used for penalizing peers sending
326    /// our own messages back if the messages are anonymous or use a random author.
327    published_message_ids: DuplicateCache<MessageId>,
328
329    /// The filter used to handle message subscriptions.
330    subscription_filter: F,
331
332    /// A general transformation function that can be applied to data received from the wire before
333    /// calculating the message-id and sending to the application. This is designed to allow the
334    /// user to implement arbitrary topic-based compression algorithms.
335    data_transform: D,
336
337    /// Keep track of a set of internal metrics relating to gossipsub.
338    #[cfg(feature = "metrics")]
339    metrics: Option<Metrics>,
340
341    /// Tracks the numbers of failed messages per peer-id.
342    failed_messages: HashMap<PeerId, FailedMessages>,
343
344    /// Tracks recently sent `IWANT` messages and checks if peers respond to them.
345    gossip_promises: GossipPromises,
346}
347
348impl<D, F> Behaviour<D, F>
349where
350    D: DataTransform + Default,
351    F: TopicSubscriptionFilter + Default,
352{
353    /// Creates a Gossipsub [`Behaviour`] struct given a set of parameters specified via a
354    /// [`Config`]. This has no subscription filter and uses no compression.
355    pub fn new(privacy: MessageAuthenticity, config: Config) -> Result<Self, &'static str> {
356        Self::new_with_subscription_filter_and_transform(
357            privacy,
358            config,
359            F::default(),
360            D::default(),
361        )
362    }
363
364    /// Allow the [`Behaviour`] to also record metrics.
365    /// Metrics can be evaluated by passing a reference to a [`Registry`].
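    ///
    /// Sketch, assuming the `metrics` feature is enabled and that the default metrics
    /// configuration is acceptable:
    ///
    /// ```ignore
    /// let mut registry = prometheus_client::registry::Registry::default();
    /// let behaviour = behaviour.with_metrics(&mut registry, Default::default());
    /// ```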
366    #[cfg(feature = "metrics")]
367    pub fn with_metrics(
368        mut self,
369        metrics_registry: &mut Registry,
370        metrics_config: MetricsConfig,
371    ) -> Self {
372        self.metrics = Some(Metrics::new(metrics_registry, metrics_config));
373        self
374    }
375}
376
377impl<D, F> Behaviour<D, F>
378where
379    D: DataTransform + Default,
380    F: TopicSubscriptionFilter,
381{
382    /// Creates a Gossipsub [`Behaviour`] struct given a set of parameters specified via a
383    /// [`Config`] and a custom subscription filter.
384    pub fn new_with_subscription_filter(
385        privacy: MessageAuthenticity,
386        config: Config,
387        subscription_filter: F,
388    ) -> Result<Self, &'static str> {
389        Self::new_with_subscription_filter_and_transform(
390            privacy,
391            config,
392            subscription_filter,
393            D::default(),
394        )
395    }
396}
397
398impl<D, F> Behaviour<D, F>
399where
400    D: DataTransform,
401    F: TopicSubscriptionFilter + Default,
402{
403    /// Creates a Gossipsub [`Behaviour`] struct given a set of parameters specified via a
404    /// [`Config`] and a custom data transform.
405    /// Metrics are disabled by default.
406    pub fn new_with_transform(
407        privacy: MessageAuthenticity,
408        config: Config,
409        data_transform: D,
410    ) -> Result<Self, &'static str> {
411        Self::new_with_subscription_filter_and_transform(
412            privacy,
413            config,
414            F::default(),
415            data_transform,
416        )
417    }
418}
419
420impl<D, F> Behaviour<D, F>
421where
422    D: DataTransform,
423    F: TopicSubscriptionFilter,
424{
425    /// Creates a Gossipsub [`Behaviour`] struct given a set of parameters specified via a
426    /// [`Config`] and a custom subscription filter and data transform.
427    /// Metrics are disabled by default.
428    pub fn new_with_subscription_filter_and_transform(
429        privacy: MessageAuthenticity,
430        config: Config,
431        subscription_filter: F,
432        data_transform: D,
433    ) -> Result<Self, &'static str> {
434        // Set up the router given the configuration settings.
435
436        // We do not allow configurations where a published message would also be rejected if it
437        // were received locally.
438        validate_config(&privacy, config.validation_mode())?;
439
440        Ok(Behaviour {
441            #[cfg(feature = "metrics")]
442            metrics: None,
443            events: VecDeque::new(),
444            publish_config: privacy.into(),
445            duplicate_cache: DuplicateCache::new(config.duplicate_cache_time()),
446            explicit_peers: HashSet::new(),
447            blacklisted_peers: HashSet::new(),
448            mesh: HashMap::new(),
449            fanout: HashMap::new(),
450            fanout_last_pub: HashMap::new(),
451            backoffs: BackoffStorage::new(
452                &config.prune_backoff(),
453                config.heartbeat_interval(),
454                config.backoff_slack(),
455            ),
456            mcache: MessageCache::new(config.history_gossip(), config.history_length()),
457            heartbeat: Delay::new(config.heartbeat_interval() + config.heartbeat_initial_delay()),
458            heartbeat_ticks: 0,
459            px_peers: HashSet::new(),
460            peer_score: PeerScoreState::Disabled,
461            count_received_ihave: HashMap::new(),
462            count_sent_iwant: HashMap::new(),
463            connected_peers: HashMap::new(),
464            published_message_ids: DuplicateCache::new(config.published_message_ids_cache_time()),
465            config,
466            subscription_filter,
467            data_transform,
468            failed_messages: Default::default(),
469            gossip_promises: Default::default(),
470        })
471    }
472}
473
474impl<D, F> Behaviour<D, F>
475where
476    D: DataTransform + Send + 'static,
477    F: TopicSubscriptionFilter + Send + 'static,
478{
479    /// Lists the hashes of the topics we are currently subscribed to.
480    pub fn topics(&self) -> impl Iterator<Item = &TopicHash> {
481        self.mesh.keys()
482    }
483
484    /// Lists all mesh peers for a certain topic hash.
485    pub fn mesh_peers(&self, topic_hash: &TopicHash) -> impl Iterator<Item = &PeerId> {
486        self.mesh.get(topic_hash).into_iter().flat_map(|x| x.iter())
487    }
488
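    /// Lists the mesh peers across all subscribed topics, deduplicated.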
489    pub fn all_mesh_peers(&self) -> impl Iterator<Item = &PeerId> {
490        let mut res = BTreeSet::new();
491        for peers in self.mesh.values() {
492            res.extend(peers);
493        }
494        res.into_iter()
495    }
496
497    /// Lists all known peers and their associated subscribed topics.
498    pub fn all_peers(&self) -> impl Iterator<Item = (&PeerId, Vec<&TopicHash>)> {
499        self.connected_peers
500            .iter()
501            .map(|(peer_id, peer)| (peer_id, peer.topics.iter().collect()))
502    }
503
504    /// Lists all known peers and their associated protocol.
505    pub fn peer_protocol(&self) -> impl Iterator<Item = (&PeerId, &PeerKind)> {
506        self.connected_peers.iter().map(|(k, v)| (k, &v.kind))
507    }
508
509    /// Returns the gossipsub score for a given peer, if one exists.
510    pub fn peer_score(&self, peer_id: &PeerId) -> Option<f64> {
511        match &self.peer_score {
512            PeerScoreState::Active(peer_score) => Some(peer_score.score_report(peer_id).score),
513            PeerScoreState::Disabled => None,
514        }
515    }
516
517    /// Subscribe to a topic.
518    ///
519    /// Returns [`Ok(true)`] if the subscription worked. Returns [`Ok(false)`] if we were already
520    /// subscribed.
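    ///
    /// A small usage sketch (the `behaviour` binding and topic name are assumptions):
    ///
    /// ```ignore
    /// use libp2p_gossipsub::IdentTopic;
    ///
    /// let topic = IdentTopic::new("example-topic");
    /// // The first call returns Ok(true); subscribing again returns Ok(false).
    /// assert!(behaviour.subscribe(&topic)?);
    /// ```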
521    pub fn subscribe<H: Hasher>(&mut self, topic: &Topic<H>) -> Result<bool, SubscriptionError> {
522        tracing::debug!(%topic, "Subscribing to topic");
523        let topic_hash = topic.hash();
524        if !self.subscription_filter.can_subscribe(&topic_hash) {
525            return Err(SubscriptionError::NotAllowed);
526        }
527
528        if self.mesh.contains_key(&topic_hash) {
529            tracing::debug!(%topic, "Topic is already in the mesh");
530            return Ok(false);
531        }
532
533        // send subscription request to all peers
534        for peer_id in self.connected_peers.keys().copied().collect::<Vec<_>>() {
535            tracing::debug!(%peer_id, "Sending SUBSCRIBE to peer");
536            let event = RpcOut::Subscribe(topic_hash.clone());
537            self.send_message(peer_id, event);
538        }
539
540        // call JOIN(topic)
541        // this will add new peers to the mesh for the topic
542        self.join(&topic_hash);
543        tracing::debug!(%topic, "Subscribed to topic");
544        Ok(true)
545    }
546
547    /// Unsubscribes from a topic.
548    ///
549    /// Returns `true` if we were subscribed to this topic.
550    pub fn unsubscribe<H: Hasher>(&mut self, topic: &Topic<H>) -> bool {
551        tracing::debug!(%topic, "Unsubscribing from topic");
552        let topic_hash = topic.hash();
553
554        if !self.mesh.contains_key(&topic_hash) {
555            tracing::debug!(topic=%topic_hash, "Already unsubscribed from topic");
556            // we are not subscribed
557            return false;
558        }
559
560        // announce to all peers
561        for peer in self.connected_peers.keys().copied().collect::<Vec<_>>() {
562            tracing::debug!(%peer, "Sending UNSUBSCRIBE to peer");
563            let event = RpcOut::Unsubscribe(topic_hash.clone());
564            self.send_message(peer, event);
565        }
566
567        // call LEAVE(topic)
568        // this will remove the topic from the mesh
569        self.leave(&topic_hash);
570
571        tracing::debug!(topic=%topic_hash, "Unsubscribed from topic");
572        true
573    }
574
575    /// Publishes a message on the given topic to the network.
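    ///
    /// A hedged usage sketch (the `behaviour` binding and topic name are assumptions):
    ///
    /// ```ignore
    /// use libp2p_gossipsub::IdentTopic;
    ///
    /// let topic = IdentTopic::new("example-topic");
    /// // Fails with `PublishError::NoPeersSubscribedToTopic` if no connected peer
    /// // is subscribed to the topic yet.
    /// let message_id = behaviour.publish(topic, b"hello world".to_vec())?;
    /// ```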
576    pub fn publish(
577        &mut self,
578        topic: impl Into<TopicHash>,
579        data: impl Into<Vec<u8>>,
580    ) -> Result<MessageId, PublishError> {
581        let data = data.into();
582        let topic = topic.into();
583
584        // Transform the data before building a raw_message.
585        let transformed_data = self
586            .data_transform
587            .outbound_transform(&topic.clone(), data.clone())?;
588
589        let max_transmit_size_for_topic = self
590            .config
591            .protocol_config()
592            .max_transmit_size_for_topic(&topic);
593
594        // check that the size doesn't exceed the max transmission size.
595        if transformed_data.len() > max_transmit_size_for_topic {
596            return Err(PublishError::MessageTooLarge);
597        }
598
599        let mesh_n = self.config.mesh_n_for_topic(&topic);
600        let raw_message = self.build_raw_message(topic, transformed_data)?;
601
602        // calculate the message id from the un-transformed data
603        let msg_id = self.config.message_id(&Message {
604            source: raw_message.source,
605            data, // the uncompressed form
606            sequence_number: raw_message.sequence_number,
607            topic: raw_message.topic.clone(),
608        });
609
610        // Check if the message has been published before
611        if self.duplicate_cache.contains(&msg_id) {
612            // This message has already been seen. We don't re-publish messages that have already
613            // been published on the network.
614            tracing::warn!(
615                message_id=%msg_id,
616                "Not publishing a message that has already been published"
617            );
618            return Err(PublishError::Duplicate);
619        }
620
621        tracing::trace!(message_id=%msg_id, "Publishing message");
622
623        let topic_hash = raw_message.topic.clone();
624
625        let mut peers_on_topic = self
626            .connected_peers
627            .iter()
628            .filter(|(_, p)| p.topics.contains(&topic_hash))
629            .map(|(peer_id, _)| peer_id)
630            .peekable();
631
632        if peers_on_topic.peek().is_none() {
633            return Err(PublishError::NoPeersSubscribedToTopic);
634        }
635
636        let mut recipient_peers = HashSet::new();
637        if self.config.flood_publish() {
638            // Forward to all peers above score and all explicit peers
639            recipient_peers.extend(peers_on_topic.filter(|p| {
640                self.explicit_peers.contains(*p)
641                    || !self
642                        .peer_score
643                        .below_threshold(p, |ts| ts.publish_threshold)
644                        .0
645            }));
646        } else {
647            match self.mesh.get(&topic_hash) {
648                // Mesh peers
649                Some(mesh_peers) => {
650                    // We have a mesh set. We want to make sure to publish to at least `mesh_n`
651                    // peers (if possible).
652                    let needed_extra_peers = mesh_n.saturating_sub(mesh_peers.len());
653
654                    if needed_extra_peers > 0 {
655                        // We don't have `mesh_n` peers in our mesh, we will randomly select extras
656                        // and publish to them.
657
658                        // Get a random set of peers that are appropriate to send messages to.
659                        let peer_list = get_random_peers(
660                            &self.connected_peers,
661                            &topic_hash,
662                            needed_extra_peers,
663                            |peer| {
664                                !mesh_peers.contains(peer)
665                                    && !self.explicit_peers.contains(peer)
666                                    && !self
667                                        .peer_score
668                                        .below_threshold(peer, |ts| ts.publish_threshold)
669                                        .0
670                            },
671                        );
672                        recipient_peers.extend(peer_list);
673                    }
674
675                    recipient_peers.extend(mesh_peers);
676                }
677                // Gossipsub peers
678                None => {
679                    tracing::debug!(topic=%topic_hash, "Topic not in the mesh");
680                    // `fanout_peers` is always non-empty if it's `Some`.
681                    let fanout_peers = self
682                        .fanout
683                        .get(&topic_hash)
684                        .filter(|peers| !peers.is_empty());
685                    // If we have fanout peers add them to the map.
686                    if let Some(peers) = fanout_peers {
687                        for peer in peers {
688                            recipient_peers.insert(*peer);
689                        }
690                    } else {
691                        // We have no fanout peers, select mesh_n of them and add them to the fanout
692                        let new_peers =
693                            get_random_peers(&self.connected_peers, &topic_hash, mesh_n, {
694                                |p| {
695                                    !self.explicit_peers.contains(p)
696                                        && !self
697                                            .peer_score
698                                            .below_threshold(p, |ts| ts.publish_threshold)
699                                            .0
700                                }
701                            });
702                        // Add the new peers to the fanout and recipient peers
703                        self.fanout.insert(topic_hash.clone(), new_peers.clone());
704                        for peer in new_peers {
705                            tracing::debug!(%peer, "Peer added to fanout");
706                            recipient_peers.insert(peer);
707                        }
708                    }
709                    // We are publishing to fanout peers - update the time we published
710                    self.fanout_last_pub
711                        .insert(topic_hash.clone(), Instant::now());
712                }
713            }
714
715            // Explicit peers that are part of the topic
716            recipient_peers
717                .extend(peers_on_topic.filter(|peer_id| self.explicit_peers.contains(peer_id)));
718
719            // Floodsub peers
720            for (peer, connections) in &self.connected_peers {
721                if connections.kind == PeerKind::Floodsub
722                    && connections.topics.contains(&topic_hash)
723                    && !self
724                        .peer_score
725                        .below_threshold(peer, |ts| ts.publish_threshold)
726                        .0
727                {
728                    recipient_peers.insert(*peer);
729                }
730            }
731        }
732
733        // If the message isn't a duplicate and we have sent it to some peers, add it to the
734        // duplicate cache and memcache.
735        self.duplicate_cache.insert(msg_id.clone());
736        self.mcache.put(&msg_id, raw_message.clone());
737
738        // Consider the message as delivered for gossip promises.
739        self.gossip_promises.message_delivered(&msg_id);
740
741        // If the message is anonymous or has a random author, add it to the published message ids
742        // cache.
743        if let PublishConfig::RandomAuthor | PublishConfig::Anonymous = self.publish_config {
744            if !self.config.allow_self_origin() {
745                self.published_message_ids.insert(msg_id.clone());
746            }
747        }
748
749        // Send to peers we know are subscribed to the topic.
750        let mut publish_failed = true;
751        for peer_id in recipient_peers.iter() {
752            tracing::trace!(peer=%peer_id, "Sending message to peer");
753            // If enabled, send an IDONTWANT first so that if we are slower than forwarders
754            // publishing the original message, we don't receive it back.
755            if raw_message.raw_protobuf_len() > self.config.idontwant_message_size_threshold()
756                && self.config.idontwant_on_publish()
757            {
758                self.send_message(
759                    *peer_id,
760                    RpcOut::IDontWant(IDontWant {
761                        message_ids: vec![msg_id.clone()],
762                    }),
763                );
764            }
765
766            if self.send_message(
767                *peer_id,
768                RpcOut::Publish {
769                    message: raw_message.clone(),
770                    timeout: Delay::new(self.config.publish_queue_duration()),
771                },
772            ) {
773                publish_failed = false
774            }
775        }
776
777        if recipient_peers.is_empty() {
778            return Err(PublishError::NoPeersSubscribedToTopic);
779        }
780
781        if publish_failed {
782            return Err(PublishError::AllQueuesFull(recipient_peers.len()));
783        }
784
785        tracing::debug!(message_id=%msg_id, "Published message");
786
787        #[cfg(feature = "metrics")]
788        if let Some(metrics) = self.metrics.as_mut() {
789            metrics.register_published_message(&topic_hash);
790        }
791
792        Ok(msg_id)
793    }
794
795    /// This function should be called when [`Config::validate_messages()`] is `true` after
796    /// the message got validated by the caller. Messages are stored in the `MessageCache` and
797    /// validation is expected to be fast enough that the messages should still exist in the cache.
798    /// There are three possible validation outcomes and the outcome is given in acceptance.
799    ///
800    /// If acceptance = [`MessageAcceptance::Accept`] the message will get propagated to the
801    /// network. The `propagation_source` parameter indicates who the message was received by and
802    /// will not be forwarded back to that peer.
803    ///
804    /// If acceptance = [`MessageAcceptance::Reject`] the message will be deleted from the memcache
805    /// and the P₄ penalty will be applied to the `propagation_source`.
806    ///
807    /// If acceptance = [`MessageAcceptance::Ignore`] the message will be deleted from the memcache
808    /// but no P₄ penalty will be applied.
809    ///
810    /// This function will return true if the message was found in the cache and false if it was
811    /// no longer in the cache.
812    ///
813    /// This should only be called once per message.
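    ///
    /// Call-pattern sketch when manual validation is enabled; the surrounding event handling
    /// and bindings are assumptions, not code from this crate:
    ///
    /// ```ignore
    /// use libp2p_gossipsub::MessageAcceptance;
    ///
    /// // After receiving `Event::Message { propagation_source, message_id, .. }` and
    /// // validating the payload at the application layer:
    /// behaviour.report_message_validation_result(
    ///     &message_id,
    ///     &propagation_source,
    ///     MessageAcceptance::Accept,
    /// );
    /// ```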
814    pub fn report_message_validation_result(
815        &mut self,
816        msg_id: &MessageId,
817        propagation_source: &PeerId,
818        acceptance: MessageAcceptance,
819    ) -> bool {
820        let reject_reason = match acceptance {
821            MessageAcceptance::Accept => {
822                let (raw_message, originating_peers) = match self.mcache.validate(msg_id) {
823                    Some((raw_message, originating_peers)) => {
824                        (raw_message.clone(), originating_peers)
825                    }
826                    None => {
827                        tracing::warn!(
828                            message_id=%msg_id,
829                            "Message not in cache. Ignoring forwarding"
830                        );
831                        #[cfg(feature = "metrics")]
832                        if let Some(metrics) = self.metrics.as_mut() {
833                            metrics.memcache_miss();
834                        }
835                        return false;
836                    }
837                };
838
839                #[cfg(feature = "metrics")]
840                if let Some(metrics) = self.metrics.as_mut() {
841                    metrics.register_msg_validation(&raw_message.topic, &acceptance);
842                }
843
844                self.forward_msg(
845                    msg_id,
846                    raw_message,
847                    Some(propagation_source),
848                    originating_peers,
849                );
850                return true;
851            }
852            MessageAcceptance::Reject => RejectReason::ValidationFailed,
853            MessageAcceptance::Ignore => RejectReason::ValidationIgnored,
854        };
855
856        if let Some((raw_message, originating_peers)) = self.mcache.remove(msg_id) {
857            #[cfg(feature = "metrics")]
858            if let Some(metrics) = self.metrics.as_mut() {
859                metrics.register_msg_validation(&raw_message.topic, &acceptance);
860            }
861
862            // Tell peer_score about reject
863            // Reject the original source, and any duplicates we've seen from other peers.
864            if let PeerScoreState::Active(peer_score) = &mut self.peer_score {
865                peer_score.reject_message(
866                    propagation_source,
867                    msg_id,
868                    &raw_message.topic,
869                    reject_reason,
870                );
871                for peer in originating_peers.iter() {
872                    peer_score.reject_message(peer, msg_id, &raw_message.topic, reject_reason);
873                }
874            }
875            true
876        } else {
877            tracing::warn!(message_id=%msg_id, "Rejected message not in cache");
878            false
879        }
880    }
881
882    /// Adds a new peer to the list of explicitly connected peers.
883    pub fn add_explicit_peer(&mut self, peer_id: &PeerId) {
884        tracing::debug!(peer=%peer_id, "Adding explicit peer");
885
886        self.explicit_peers.insert(*peer_id);
887
888        self.check_explicit_peer_connection(peer_id);
889    }
890
891    /// This removes the peer from the set of explicitly connected peers. Note that this does not
892    /// disconnect the peer.
893    pub fn remove_explicit_peer(&mut self, peer_id: &PeerId) {
894        tracing::debug!(peer=%peer_id, "Removing explicit peer");
895        self.explicit_peers.remove(peer_id);
896    }
897
898    /// Blacklists a peer. All messages from this peer will be rejected and any message that was
899    /// created by this peer will be rejected.
900    pub fn blacklist_peer(&mut self, peer_id: &PeerId) {
901        if self.blacklisted_peers.insert(*peer_id) {
902            tracing::debug!(peer=%peer_id, "Peer has been blacklisted");
903        }
904    }
905
906    /// Removes a peer from the blacklist if it has previously been blacklisted.
907    pub fn remove_blacklisted_peer(&mut self, peer_id: &PeerId) {
908        if self.blacklisted_peers.remove(peer_id) {
909            tracing::debug!(peer=%peer_id, "Peer has been removed from the blacklist");
910        }
911    }
912
913    /// Activates the peer scoring system with the given parameters. This will reset all scores
914    /// if there was already another peer scoring system activated. Returns an error if the
915    /// params are not valid or if they were already set.
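    ///
    /// A minimal sketch using the default parameters (both parameter types implement
    /// `Default`):
    ///
    /// ```ignore
    /// use libp2p_gossipsub::{PeerScoreParams, PeerScoreThresholds};
    ///
    /// behaviour
    ///     .with_peer_score(PeerScoreParams::default(), PeerScoreThresholds::default())
    ///     .expect("valid parameters, not already set");
    /// ```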
916    pub fn with_peer_score(
917        &mut self,
918        params: PeerScoreParams,
919        threshold: PeerScoreThresholds,
920    ) -> Result<(), String> {
921        self.with_peer_score_and_message_delivery_time_callback(params, threshold, None)
922    }
923
924    /// Activates the peer scoring system with the given parameters and a message delivery time
925    /// callback. Returns an error if the parameters were already set.
926    pub fn with_peer_score_and_message_delivery_time_callback(
927        &mut self,
928        params: PeerScoreParams,
929        thresholds: PeerScoreThresholds,
930        callback: Option<fn(&PeerId, &TopicHash, f64)>,
931    ) -> Result<(), String> {
932        params.validate()?;
933        thresholds.validate()?;
934
935        if let PeerScoreState::Active(_) = self.peer_score {
936            return Err("Peer score set twice".into());
937        }
938
939        let peer_score =
940            PeerScore::new_with_message_delivery_time_callback(params, thresholds, callback);
941        self.peer_score = PeerScoreState::Active(Box::new(peer_score));
942        Ok(())
943    }
944
945    /// Sets scoring parameters for a topic.
946    ///
947    /// [`Self::with_peer_score()`] must first be called to initialise peer scoring.
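    ///
    /// Sketch, assuming peer scoring was already activated via [`Self::with_peer_score()`]:
    ///
    /// ```ignore
    /// use libp2p_gossipsub::{IdentTopic, TopicScoreParams};
    ///
    /// let topic = IdentTopic::new("example-topic");
    /// behaviour.set_topic_params(topic, TopicScoreParams::default())?;
    /// ```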
948    pub fn set_topic_params<H: Hasher>(
949        &mut self,
950        topic: Topic<H>,
951        params: TopicScoreParams,
952    ) -> Result<(), &'static str> {
953        if let PeerScoreState::Active(peer_score) = &mut self.peer_score {
954            peer_score.set_topic_params(topic.hash(), params);
955            Ok(())
956        } else {
957            Err("Peer score must be initialised with `with_peer_score()`")
958        }
959    }
960
961    /// Returns the scoring parameters for a topic, if they exist.
962    pub fn get_topic_params<H: Hasher>(&self, topic: &Topic<H>) -> Option<&TopicScoreParams> {
963        match &self.peer_score {
964            PeerScoreState::Active(peer_score) => peer_score.get_topic_params(&topic.hash()),
965            PeerScoreState::Disabled => None,
966        }
967    }
968
969    /// Sets the application-specific score for a peer. Returns true if scoring is active and
970    /// the peer is connected or if the score of the peer is not yet expired, false otherwise.
971    pub fn set_application_score(&mut self, peer_id: &PeerId, new_score: f64) -> bool {
972        if let PeerScoreState::Active(peer_score) = &mut self.peer_score {
973            peer_score.set_application_score(peer_id, new_score)
974        } else {
975            false
976        }
977    }
978
979    /// Gossipsub JOIN(topic) - adds topic peers to mesh and sends them GRAFT messages.
980    fn join(&mut self, topic_hash: &TopicHash) {
981        tracing::debug!(topic=%topic_hash, "Running JOIN for topic");
982
983        let mut added_peers = HashSet::new();
984        let mesh_n = self.config.mesh_n_for_topic(topic_hash);
985        #[cfg(feature = "metrics")]
986        if let Some(m) = self.metrics.as_mut() {
987            m.joined(topic_hash)
988        }
989
990        // check if we have mesh_n peers in fanout[topic] and add them to the mesh if we do,
991        // removing the fanout entry.
992        if let Some((_, mut peers)) = self.fanout.remove_entry(topic_hash) {
993            tracing::debug!(
994                topic=%topic_hash,
995                "JOIN: Removing peers from the fanout for topic"
996            );
997
998            // remove explicit peers, peers with negative scores, and backoffed peers
999            peers.retain(|p| {
1000                !self.explicit_peers.contains(p)
1001                    && !self.peer_score.below_threshold(p, |_| 0.0).0
1002                    && !self.backoffs.is_backoff_with_slack(topic_hash, p)
1003            });
1004
1005            // Add up to mesh_n of them to the mesh
1006            // NOTE: These aren't randomly added, currently FIFO
1007            let add_peers = std::cmp::min(peers.len(), mesh_n);
1008            tracing::debug!(
1009                topic=%topic_hash,
1010                "JOIN: Adding {:?} peers from the fanout for topic",
1011                add_peers
1012            );
1013            added_peers.extend(peers.iter().take(add_peers));
1014
1015            self.mesh.insert(
1016                topic_hash.clone(),
1017                peers.into_iter().take(add_peers).collect(),
1018            );
1019
1020            // remove the last published time
1021            self.fanout_last_pub.remove(topic_hash);
1022        }
1023
1024        #[cfg(feature = "metrics")]
1025        if let Some(m) = self.metrics.as_mut() {
1026            m.peers_included(topic_hash, Inclusion::Fanout, added_peers.len())
1027        }
1028
1029        // check if we need to get more peers, which we randomly select
1030        if added_peers.len() < mesh_n {
1031            // get the peers
1032            let random_added = get_random_peers(
1033                &self.connected_peers,
1034                topic_hash,
1035                mesh_n - added_peers.len(),
1036                |peer| {
1037                    !added_peers.contains(peer)
1038                        && !self.explicit_peers.contains(peer)
1039                        && !self.peer_score.below_threshold(peer, |_| 0.0).0
1040                        && !self.backoffs.is_backoff_with_slack(topic_hash, peer)
1041                },
1042            );
1043
1044            added_peers.extend(random_added.clone());
1045            // add them to the mesh
1046            tracing::debug!(
1047                "JOIN: Inserting {:?} random peers into the mesh",
1048                random_added.len()
1049            );
1050
1051            #[cfg(feature = "metrics")]
1052            if let Some(m) = self.metrics.as_mut() {
1053                m.peers_included(topic_hash, Inclusion::Random, random_added.len())
1054            }
1055
1056            let mesh_peers = self.mesh.entry(topic_hash.clone()).or_default();
1057            mesh_peers.extend(random_added);
1058        }
1059
1060        for peer_id in added_peers {
1061            // Send a GRAFT control message
1062            tracing::debug!(peer=%peer_id, "JOIN: Sending Graft message to peer");
1063            if let PeerScoreState::Active(peer_score) = &mut self.peer_score {
1064                peer_score.graft(&peer_id, topic_hash.clone());
1065            }
1066            self.send_message(
1067                peer_id,
1068                RpcOut::Graft(Graft {
1069                    topic_hash: topic_hash.clone(),
1070                }),
1071            );
1072
1073            // If the peer did not previously exist in any mesh, inform the handler
1074            peer_added_to_mesh(
1075                peer_id,
1076                vec![topic_hash],
1077                &self.mesh,
1078                &mut self.events,
1079                &self.connected_peers,
1080            );
1081        }
1082
1083        #[cfg(feature = "metrics")]
1084        {
1085            let mesh_peers = self.mesh_peers(topic_hash).count();
1086            if let Some(m) = self.metrics.as_mut() {
1087                m.set_mesh_peers(topic_hash, mesh_peers)
1088            }
1089        }
1090
1091        tracing::debug!(topic=%topic_hash, "Completed JOIN for topic");
1092    }
1093
1094    /// Creates a PRUNE gossipsub action.
1095    fn make_prune(
1096        &mut self,
1097        topic_hash: &TopicHash,
1098        peer: &PeerId,
1099        do_px: bool,
1100        on_unsubscribe: bool,
1101    ) -> Prune {
1102        if let PeerScoreState::Active(peer_score) = &mut self.peer_score {
1103            peer_score.prune(peer, topic_hash.clone());
1104        }
1105
1106        match self.connected_peers.get(peer).map(|v| &v.kind) {
1107            Some(PeerKind::Floodsub) => {
1108                tracing::error!("Attempted to prune a Floodsub peer");
1109            }
1110            Some(PeerKind::Gossipsub) => {
1111                // GossipSub v1.0 -- no peer exchange, the peer won't be able to parse it anyway
1112                return Prune {
1113                    topic_hash: topic_hash.clone(),
1114                    peers: Vec::new(),
1115                    backoff: None,
1116                };
1117            }
1118            None => {
1119                tracing::error!("Attempted to Prune an unknown peer");
1120            }
1121            _ => {} // Gossipsub v1.1 peers perform the `Prune`
1122        }
1123
1124        // Select peers for peer exchange
1125        let peers = if do_px {
1126            get_random_peers(
1127                &self.connected_peers,
1128                topic_hash,
1129                self.config.prune_peers(),
1130                |p| p != peer && !self.peer_score.below_threshold(p, |_| 0.0).0,
1131            )
1132            .into_iter()
1133            .map(|p| PeerInfo { peer_id: Some(p) })
1134            .collect()
1135        } else {
1136            Vec::new()
1137        };
1138
1139        let backoff = if on_unsubscribe {
1140            self.config.unsubscribe_backoff()
1141        } else {
1142            self.config.prune_backoff()
1143        };
1144
1145        // update backoff
1146        self.backoffs.update_backoff(topic_hash, peer, backoff);
1147
1148        Prune {
1149            topic_hash: topic_hash.clone(),
1150            peers,
1151            backoff: Some(backoff.as_secs()),
1152        }
1153    }
1154
1155    /// Gossipsub LEAVE(topic) - Notifies mesh\[topic\] peers with PRUNE messages.
1156    fn leave(&mut self, topic_hash: &TopicHash) {
1157        tracing::debug!(topic=%topic_hash, "Running LEAVE for topic");
1158
1159        // If our mesh contains the topic, send prune to peers and delete it from the mesh
1160        if let Some((_, peers)) = self.mesh.remove_entry(topic_hash) {
1161            #[cfg(feature = "metrics")]
1162            if let Some(m) = self.metrics.as_mut() {
1163                m.left(topic_hash)
1164            }
1165            for peer_id in peers {
1166                // Send a PRUNE control message
1167                tracing::debug!(%peer_id, "LEAVE: Sending PRUNE to peer");
1168
1169                let on_unsubscribe = true;
1170                let prune =
1171                    self.make_prune(topic_hash, &peer_id, self.config.do_px(), on_unsubscribe);
1172                self.send_message(peer_id, RpcOut::Prune(prune));
1173
1174                // If the peer is no longer in any mesh, inform the handler
1175                peer_removed_from_mesh(
1176                    peer_id,
1177                    topic_hash,
1178                    &self.mesh,
1179                    &mut self.events,
1180                    &self.connected_peers,
1181                );
1182            }
1183        }
1184        tracing::debug!(topic=%topic_hash, "Completed LEAVE for topic");
1185    }
1186
1187    /// Checks if the given peer is still connected and if not dials the peer again.
1188    fn check_explicit_peer_connection(&mut self, peer_id: &PeerId) {
1189        if !self.connected_peers.contains_key(peer_id) {
1190            // Connect to peer
1191            tracing::debug!(peer=%peer_id, "Connecting to explicit peer");
1192            self.events.push_back(ToSwarm::Dial {
1193                opts: DialOpts::peer_id(*peer_id).build(),
1194            });
1195        }
1196    }
1197
1198    /// Handles an IHAVE control message. Checks our cache of messages. If the message is unknown,
1199    /// requests it with an IWANT control message.
1200    fn handle_ihave(&mut self, peer_id: &PeerId, ihave_msgs: Vec<(TopicHash, Vec<MessageId>)>) {
1201        // We ignore IHAVE gossip from any peer whose score is below the gossip threshold
1202        if let (true, score) = self
1203            .peer_score
1204            .below_threshold(peer_id, |ts| ts.gossip_threshold)
1205        {
1206            tracing::debug!(
1207                peer=%peer_id,
1208                %score,
1209                "IHAVE: ignoring peer with score below threshold"
1210            );
1211            return;
1212        }
1213
1214        // IHAVE flood protection
1215        let peer_have = self.count_received_ihave.entry(*peer_id).or_insert(0);
1216        *peer_have += 1;
1217        if *peer_have > self.config.max_ihave_messages() {
1218            tracing::debug!(
1219                peer=%peer_id,
1220                "IHAVE: peer has advertised too many times ({}) within this heartbeat \
1221            interval; ignoring",
1222                *peer_have
1223            );
1224            return;
1225        }
1226
1227        if let Some(iasked) = self.count_sent_iwant.get(peer_id) {
1228            if *iasked >= self.config.max_ihave_length() {
1229                tracing::debug!(
1230                    peer=%peer_id,
1231                    "IHAVE: peer has already advertised too many messages ({}); ignoring",
1232                    *iasked
1233                );
1234                return;
1235            }
1236        }
1237
1238        tracing::trace!(peer=%peer_id, "Handling IHAVE for peer");
1239
1240        let mut iwant_ids = HashSet::new();
1241
1242        for (topic, ids) in ihave_msgs {
1243            // only process the message if we are subscribed
1244            if !self.mesh.contains_key(&topic) {
1245                tracing::debug!(
1246                    %topic,
1247                    "IHAVE: Ignoring IHAVE - Not subscribed to topic"
1248                );
1249                continue;
1250            }
1251
1252            for id in ids.into_iter().filter(|id| {
1253                if self.duplicate_cache.contains(id) {
1254                    return false;
1255                }
1256
1257                !self.gossip_promises.contains(id)
1258            }) {
1259                // have not seen this message and are not currently requesting it
1260                if iwant_ids.insert(id) {
1261                    // Register the IWANT metric
1262                    #[cfg(feature = "metrics")]
1263                    if let Some(metrics) = self.metrics.as_mut() {
1264                        metrics.register_iwant(&topic);
1265                    }
1266                }
1267            }
1268        }
1269
1270        if !iwant_ids.is_empty() {
1271            let iasked = self.count_sent_iwant.entry(*peer_id).or_insert(0);
1272            let mut iask = iwant_ids.len();
1273            if *iasked + iask > self.config.max_ihave_length() {
1274                iask = self.config.max_ihave_length().saturating_sub(*iasked);
1275            }
1276
1277            // Send the list of IWANT control messages
1278            tracing::debug!(
1279                peer=%peer_id,
1280                "IHAVE: Asking for {} out of {} messages from peer",
1281                iask,
1282                iwant_ids.len()
1283            );
1284
1285            // Ask in random order
1286            let mut iwant_ids_vec: Vec<_> = iwant_ids.into_iter().collect();
1287            let mut rng = thread_rng();
1288            iwant_ids_vec.partial_shuffle(&mut rng, iask);
1289
1290            iwant_ids_vec.truncate(iask);
1291            *iasked += iask;
1292
1293            self.gossip_promises.add_promise(
1294                *peer_id,
1295                &iwant_ids_vec,
1296                Instant::now() + self.config.iwant_followup_time(),
1297            );
1298            tracing::trace!(
1299                peer=%peer_id,
1300                "IHAVE: Asking for the following messages from peer: {:?}",
1301                iwant_ids_vec
1302            );
1303
1304            self.send_message(
1305                *peer_id,
1306                RpcOut::IWant(IWant {
1307                    message_ids: iwant_ids_vec,
1308                }),
1309            );
1310        }
1311        tracing::trace!(peer=%peer_id, "Completed IHAVE handling for peer");
1312    }
1313
1314    /// Handles an IWANT control message. Checks our cache of messages. If the message exists it is
1315    /// forwarded to the requesting peer.
1316    fn handle_iwant(&mut self, peer_id: &PeerId, iwant_msgs: Vec<MessageId>) {
1317        // We ignore IWANT gossip from any peer whose score is below the gossip threshold
1318        if let (true, score) = self
1319            .peer_score
1320            .below_threshold(peer_id, |ts| ts.gossip_threshold)
1321        {
1322            tracing::debug!(
1323                peer=%peer_id,
1324                "IWANT: ignoring peer with score below threshold [score = {}]",
1325                score
1326            );
1327            return;
1328        }
1329
1330        tracing::debug!(peer=%peer_id, "Handling IWANT for peer");
1331
1332        for id in iwant_msgs {
1333            // If we have it and the IHAVE count is not above the threshold,
1334            // forward the message.
1335            if let Some((msg, count)) = self
1336                .mcache
1337                .get_with_iwant_counts(&id, peer_id)
1338                .map(|(msg, count)| (msg.clone(), count))
1339            {
1340                if count > self.config.gossip_retransimission() {
1341                    tracing::debug!(
1342                        peer=%peer_id,
1343                        message_id=%id,
1344                        "IWANT: Peer has asked for message too many times; ignoring request"
1345                    );
1346                } else {
1347                    if let Some(peer) = self.connected_peers.get_mut(peer_id) {
1348                        if peer.dont_send.contains_key(&id) {
1349                            tracing::debug!(%peer_id, message_id=%id, "Peer already sent IDONTWANT for this message");
1350                            continue;
1351                        }
1352                    }
1353
1354                    tracing::debug!(peer=%peer_id, "IWANT: Sending cached messages to peer");
1355                    self.send_message(
1356                        *peer_id,
1357                        RpcOut::Forward {
1358                            message: msg,
1359                            timeout: Delay::new(self.config.forward_queue_duration()),
1360                        },
1361                    );
1362                }
1363            }
1364        }
1365        tracing::debug!(peer=%peer_id, "Completed IWANT handling for peer");
1366    }
1367
1368    /// Handles GRAFT control messages. If subscribed to the topic, adds the peer to mesh, if not,
1369    /// responds with PRUNE messages.
1370    fn handle_graft(&mut self, peer_id: &PeerId, topics: Vec<TopicHash>) {
1371        tracing::debug!(peer=%peer_id, "Handling GRAFT message for peer");
1372
1373        let mut to_prune_topics = HashSet::new();
1374
1375        let mut do_px = self.config.do_px();
1376
1377        let Some(connected_peer) = self.connected_peers.get_mut(peer_id) else {
1378            tracing::error!(peer_id = %peer_id, "Peer non-existent when handling graft");
1379            return;
1380        };
1381        // Needs to be here to comply with the borrow checker.
1382        let is_outbound = connected_peer.outbound;
1383
1384        // For each topic, if a peer has grafted us, then we necessarily must be in their mesh
1385        // and they must be subscribed to the topic. Ensure we have recorded the mapping.
1386        for topic in &topics {
1387            if connected_peer.topics.insert(topic.clone()) {
1388                #[cfg(feature = "metrics")]
1389                if let Some(m) = self.metrics.as_mut() {
1390                    m.inc_topic_peers(topic);
1391                }
1392            }
1393        }
1394
1395        // we don't GRAFT to/from explicit peers; complain loudly if this happens
1396        if self.explicit_peers.contains(peer_id) {
1397            tracing::warn!(peer=%peer_id, "GRAFT: ignoring request from direct peer");
1398            // this is possibly a bug from non-reciprocal configuration; send a PRUNE for all topics
1399            to_prune_topics = topics.into_iter().collect();
1400            // but don't PX
1401            do_px = false;
1402        } else {
1403            let (below_zero, score) = self.peer_score.below_threshold(peer_id, |_| 0.0);
1404            let now = Instant::now();
1405            for topic_hash in topics {
1406                if let Some(peers) = self.mesh.get_mut(&topic_hash) {
1407                    // if the peer is already in the mesh ignore the graft
1408                    if peers.contains(peer_id) {
1409                        tracing::debug!(
1410                            peer=%peer_id,
1411                            topic=%&topic_hash,
1412                            "GRAFT: Received graft for peer that is already in topic"
1413                        );
1414                        continue;
1415                    }
1416
1417                    // make sure we are not backing off that peer
1418                    if let Some(backoff_time) = self.backoffs.get_backoff_time(&topic_hash, peer_id)
1419                    {
1420                        if backoff_time > now {
1421                            tracing::warn!(
1422                                peer=%peer_id,
1423                                "[Penalty] Peer attempted graft within backoff time, penalizing"
1424                            );
1425                            // add behavioural penalty
1426                            if let PeerScoreState::Active(peer_score) = &mut self.peer_score {
1427                                #[cfg(feature = "metrics")]
1428                                if let Some(metrics) = self.metrics.as_mut() {
1429                                    metrics.register_score_penalty(Penalty::GraftBackoff);
1430                                }
1431                                peer_score.add_penalty(peer_id, 1);
1432
1433                                // check the flood cutoff
1434                                // See: https://github.com/rust-lang/rust-clippy/issues/10061
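                                // The backoff normally runs for prune_backoff from the PRUNE,
                                // so this reduces to prune_time + graft_flood_threshold: with
                                // the defaults (60 s backoff, 10 s flood threshold) a re-GRAFT
                                // within 10 s of the PRUNE earns the extra penalty.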
1435                                #[allow(unknown_lints, clippy::unchecked_duration_subtraction)]
1436                                let flood_cutoff = (backoff_time
1437                                    + self.config.graft_flood_threshold())
1438                                    - self.config.prune_backoff();
1439                                if flood_cutoff > now {
1440                                    // extra penalty
1441                                    peer_score.add_penalty(peer_id, 1);
1442                                }
1443                            }
1444                            // no PX
1445                            do_px = false;
1446
1447                            to_prune_topics.insert(topic_hash.clone());
1448                            continue;
1449                        }
1450                    }
1451
1452                    // check the score
1453                    if below_zero {
1454                        // we don't GRAFT peers with negative score
1455                        tracing::debug!(
1456                            peer=%peer_id,
1457                            %score,
1458                            topic=%topic_hash,
1459                            "GRAFT: ignoring peer with negative score"
1460                        );
1461                        // we do send them PRUNE however, because it's a matter of protocol
1462                        // correctness
1463                        to_prune_topics.insert(topic_hash.clone());
1464                        // but we won't PX to them
1465                        do_px = false;
1466                        continue;
1467                    }
1468
1469                    // check mesh upper bound and only allow graft if the upper bound is not reached
1470                    // or if it is an outbound peer
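                    // (e.g. with the default mesh_n_high of 12, a full 12-peer mesh only
                    // accepts further GRAFTs from outbound peers)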
1471                    let mesh_n_high = self.config.mesh_n_high_for_topic(&topic_hash);
1472
1473                    if peers.len() >= mesh_n_high && !is_outbound {
1474                        to_prune_topics.insert(topic_hash.clone());
1475                        continue;
1476                    }
1477
1478                    // add peer to the mesh
1479                    tracing::debug!(
1480                        peer=%peer_id,
1481                        topic=%topic_hash,
1482                        "GRAFT: Mesh link added for peer in topic"
1483                    );
1484
1485                    if peers.insert(*peer_id) {
1486                        #[cfg(feature = "metrics")]
1487                        if let Some(m) = self.metrics.as_mut() {
1488                            m.peers_included(&topic_hash, Inclusion::Subscribed, 1)
1489                        }
1490                    }
1491
1492                    // If the peer did not previously exist in any mesh, inform the handler
1493                    peer_added_to_mesh(
1494                        *peer_id,
1495                        vec![&topic_hash],
1496                        &self.mesh,
1497                        &mut self.events,
1498                        &self.connected_peers,
1499                    );
1500
1501                    if let PeerScoreState::Active(peer_score) = &mut self.peer_score {
1502                        peer_score.graft(peer_id, topic_hash);
1503                    }
1504                } else {
1505                    // don't do PX when there is an unknown topic to avoid leaking our peers
1506                    do_px = false;
1507                    tracing::debug!(
1508                        peer=%peer_id,
1509                        topic=%topic_hash,
1510                        "GRAFT: Received graft for unknown topic from peer"
1511                    );
1512                    // spam hardening: ignore GRAFTs for unknown topics
1513                    continue;
1514                }
1515            }
1516        }
1517
1518        if !to_prune_topics.is_empty() {
1519            // build the prune messages to send
1520            let on_unsubscribe = false;
1521
1522            for prune in to_prune_topics
1523                .iter()
1524                .map(|t| self.make_prune(t, peer_id, do_px, on_unsubscribe))
1525                .collect::<Vec<_>>()
1526            {
1527                self.send_message(*peer_id, RpcOut::Prune(prune));
1528            }
1529            // The PRUNE messages have been sent to the peer
1530            tracing::debug!(
1531                peer=%peer_id,
1532                "GRAFT: Not subscribed to topics -  Sending PRUNE to peer"
1533                "GRAFT: Not subscribed to topics - Sending PRUNE to peer"
1534        }
1535        tracing::debug!(peer=%peer_id, "Completed GRAFT handling for peer");
1536    }
1537
1538    /// Removes the specified peer from the mesh, returning true if it was present.
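    ///
    /// When `always_update_backoff` is set, or the peer was actually removed, a backoff entry
    /// is recorded using the peer-supplied backoff if present and the configured
    /// `prune_backoff` otherwise.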
1539    fn remove_peer_from_mesh(
1540        &mut self,
1541        peer_id: &PeerId,
1542        topic_hash: &TopicHash,
1543        backoff: Option<u64>,
1544        always_update_backoff: bool,
1545    ) -> bool {
1546        let mut peer_removed = false;
1547        if let Some(peers) = self.mesh.get_mut(topic_hash) {
1548            // remove the peer if it exists in the mesh
1549            peer_removed = peers.remove(peer_id);
1550            if peer_removed {
1551                tracing::debug!(
1552                    peer=%peer_id,
1553                    topic=%topic_hash,
1554                    "PRUNE: Removing peer from the mesh for topic"
1555                );
1556
1557                if let PeerScoreState::Active(peer_score) = &mut self.peer_score {
1558                    peer_score.prune(peer_id, topic_hash.clone());
1559                }
1560
1561                // inform the handler
1562                peer_removed_from_mesh(
1563                    *peer_id,
1564                    topic_hash,
1565                    &self.mesh,
1566                    &mut self.events,
1567                    &self.connected_peers,
1568                );
1569            }
1570        }
1571        if always_update_backoff || peer_removed {
1572            let time = if let Some(backoff) = backoff {
1573                Duration::from_secs(backoff)
1574            } else {
1575                self.config.prune_backoff()
1576            };
1577            // is there a backoff specified by the peer? if so obey it.
1578            self.backoffs.update_backoff(topic_hash, peer_id, time);
1579        }
1580        peer_removed
1581    }
1582
1583    /// Handles PRUNE control messages. Removes peer from the mesh.
1584    fn handle_prune(
1585        &mut self,
1586        peer_id: &PeerId,
1587        prune_data: Vec<(TopicHash, Vec<PeerInfo>, Option<u64>)>,
1588    ) {
1589        tracing::debug!(peer=%peer_id, "Handling PRUNE message for peer");
1590        let (below_threshold, score) = self
1591            .peer_score
1592            .below_threshold(peer_id, |ts| ts.accept_px_threshold);
1593        for (topic_hash, px, backoff) in prune_data {
1594            if self.remove_peer_from_mesh(peer_id, &topic_hash, backoff, true) {
1595                #[cfg(feature = "metrics")]
1596                if let Some(m) = self.metrics.as_mut() {
1597                    m.peers_removed(&topic_hash, Churn::Prune, 1);
1598                }
1599            }
1600
1601            if self.mesh.contains_key(&topic_hash) {
1602                // connect to px peers
1603                if !px.is_empty() {
1604                    // we ignore PX from peers with insufficient score
1605                    if below_threshold {
1606                        tracing::debug!(
1607                            peer=%peer_id,
1608                            %score,
1609                            topic=%topic_hash,
1610                            "PRUNE: ignoring PX from peer with insufficient score"
1611                        );
1612                        continue;
1613                    }
1614
1615                    // NOTE: We cannot dial any peers from PX currently as we typically will not
1616                    // know their multiaddr. Until SignedRecords are spec'd this
1617                    // remains a stub. By default `config.prune_peers()` is set to zero and
1618                    // this is skipped. If the user modifies this, this will only be able to
1619                    // dial already known peers (from an external discovery mechanism for
1620                    // example).
1621                    if self.config.prune_peers() > 0 {
1622                        self.px_connect(px);
1623                    }
1624                }
1625            }
1626        }
1627        tracing::debug!(peer=%peer_id, "Completed PRUNE handling for peer");
1628    }
1629
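    /// Dials a random subset (at most `prune_peers`) of the peers offered in a PRUNE's peer
    /// exchange list. Entries without a peer id are ignored until signed records are
    /// supported.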
1630    fn px_connect(&mut self, mut px: Vec<PeerInfo>) {
1631        let n = self.config.prune_peers();
1632        // Ignore peerInfo with no ID
1633        //
1634        // TODO: Once signed records are spec'd: Can we use peerInfo without any IDs if they have a
1635        // signed peer record?
1636        px.retain(|p| p.peer_id.is_some());
1637        if px.len() > n {
1638            // only use at most prune_peers many random peers
1639            let mut rng = thread_rng();
1640            px.partial_shuffle(&mut rng, n);
1641            px = px.into_iter().take(n).collect();
1642        }
1643
1644        for p in px {
1645            // TODO: Once signed records are spec'd: extract signed peer record if given and handle
1646            // it, see https://github.com/libp2p/specs/pull/217
1647            if let Some(peer_id) = p.peer_id {
1648                // mark as px peer
1649                self.px_peers.insert(peer_id);
1650
1651                // dial peer
1652                self.events.push_back(ToSwarm::Dial {
1653                    opts: DialOpts::peer_id(peer_id).build(),
1654                });
1655            }
1656        }
1657    }
1658
1659    /// Applies some basic checks to whether this message is valid. Does not apply user validation
1660    /// checks.
1661    fn message_is_valid(
1662        &mut self,
1663        msg_id: &MessageId,
1664        raw_message: &mut RawMessage,
1665        propagation_source: &PeerId,
1666    ) -> bool {
1667        tracing::debug!(
1668            peer=%propagation_source,
1669            message_id=%msg_id,
1670            "Handling message from peer"
1671        );
1672
1673        // Reject any message from a blacklisted peer
1674        if self.blacklisted_peers.contains(propagation_source) {
1675            tracing::debug!(
1676                peer=%propagation_source,
1677                "Rejecting message from blacklisted peer"
1678            );
1679            self.gossip_promises
1680                .reject_message(msg_id, &RejectReason::BlackListedPeer);
1681            if let PeerScoreState::Active(peer_score) = &mut self.peer_score {
1682                peer_score.reject_message(
1683                    propagation_source,
1684                    msg_id,
1685                    &raw_message.topic,
1686                    RejectReason::BlackListedPeer,
1687                );
1688            }
1689            return false;
1690        }
1691
1692        // Also reject any message that originated from a blacklisted peer
1693        if let Some(source) = raw_message.source.as_ref() {
1694            if self.blacklisted_peers.contains(source) {
1695                tracing::debug!(
1696                    peer=%propagation_source,
1697                    %source,
1698                    "Rejecting message from peer because of blacklisted source"
1699                );
1700                self.handle_invalid_message(
1701                    propagation_source,
1702                    &raw_message.topic,
1703                    Some(msg_id),
1704                    RejectReason::BlackListedSource,
1705                );
1706                return false;
1707            }
1708        }
1709
1710        // If we are not validating messages, assume this message is validated
1711        // This will allow the message to be gossiped without explicitly calling
1712        // `validate_message`.
1713        if !self.config.validate_messages() {
1714            raw_message.validated = true;
1715        }
1716
1717        // reject messages claiming to be from ourselves but not locally published
1718        let self_published = !self.config.allow_self_origin()
1719            && if let Some(own_id) = self.publish_config.get_own_id() {
1720                own_id != propagation_source
1721                    && raw_message.source.as_ref().is_some_and(|s| s == own_id)
1722            } else {
1723                self.published_message_ids.contains(msg_id)
1724            };
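        // (when no own author id is configured, e.g. for anonymous publishing, we fall back to
        // checking the id against our recently published message ids)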
1725
1726        if self_published {
1727            tracing::debug!(
1728                message_id=%msg_id,
1729                source=%propagation_source,
1730                "Dropping message claiming to be from self but forwarded from source"
1731            );
1732            self.handle_invalid_message(
1733                propagation_source,
1734                &raw_message.topic,
1735                Some(msg_id),
1736                RejectReason::SelfOrigin,
1737            );
1738            return false;
1739        }
1740
1741        true
1742    }
1743
1744    /// Handles a newly received [`RawMessage`].
1745    ///
1746    /// Forwards the message to all peers in the mesh.
1747    fn handle_received_message(
1748        &mut self,
1749        mut raw_message: RawMessage,
1750        propagation_source: &PeerId,
1751    ) {
1752        // Record the received metric
1753        #[cfg(feature = "metrics")]
1754        if let Some(metrics) = self.metrics.as_mut() {
1755            metrics.msg_recvd_unfiltered(&raw_message.topic, raw_message.raw_protobuf_len());
1756        }
1757
1758        // Try and perform the data transform to the message. If it fails, consider it invalid.
1759        let message = match self.data_transform.inbound_transform(raw_message.clone()) {
1760            Ok(message) => message,
1761            Err(e) => {
1762                tracing::debug!("Invalid message. Transform error: {:?}", e);
1763                // Reject the message and return
1764                self.handle_invalid_message(
1765                    propagation_source,
1766                    &raw_message.topic,
1767                    None,
1768                    RejectReason::ValidationError(ValidationError::TransformFailed),
1769                );
1770                return;
1771            }
1772        };
1773
1774        // Calculate the message id on the transformed data.
1775        let msg_id = self.config.message_id(&message);
1776
1777        // Broadcast IDONTWANT messages
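        // Only done for messages above the configured size threshold: telling peers early that
        // we already have the message spares them sending us a large duplicate.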
1778        if raw_message.raw_protobuf_len() > self.config.idontwant_message_size_threshold() {
1779            let recipient_peers = self
1780                .mesh
1781                .get(&message.topic)
1782                .map(|mesh| mesh.iter())
1783                .unwrap_or_default()
1784                .copied()
1785                .chain(self.gossip_promises.peers_for_message(&msg_id))
1786                .filter(|peer_id| {
1787                    peer_id != propagation_source && Some(peer_id) != message.source.as_ref()
1788                })
1789                .collect::<Vec<PeerId>>();
1790
1791            for peer_id in recipient_peers {
1792                self.send_message(
1793                    peer_id,
1794                    RpcOut::IDontWant(IDontWant {
1795                        message_ids: vec![msg_id.clone()],
1796                    }),
1797                );
1798            }
1799        }
1800
1801        // Check the validity of the message
1802        // Peers get penalized if this message is invalid. We don't add it to the duplicate cache
1803        // and instead continually penalize peers that repeatedly send this message.
1804        if !self.message_is_valid(&msg_id, &mut raw_message, propagation_source) {
1805            return;
1806        }
1807
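        // Only the first arrival of a message is processed further; duplicates are merely
        // reported to the peer score and the message cache.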
1808        if !self.duplicate_cache.insert(msg_id.clone()) {
1809            tracing::debug!(message_id=%msg_id, "Message already received, ignoring");
1810            if let PeerScoreState::Active(peer_score) = &mut self.peer_score {
1811                peer_score.duplicated_message(propagation_source, &msg_id, &message.topic);
1812            }
1813            self.mcache.observe_duplicate(&msg_id, propagation_source);
1814            return;
1815        }
1816
1817        tracing::debug!(
1818            message_id=%msg_id,
1819            "Put message in duplicate_cache and resolve promises"
1820        );
1821
1822        // Record the received message with the metrics
1823        #[cfg(feature = "metrics")]
1824        if let Some(metrics) = self.metrics.as_mut() {
1825            metrics.msg_recvd(&message.topic);
1826        }
1827
1828        // Consider the message as delivered for the purposes of gossip promises, even if it
1829        // is not fully validated yet.
1830        self.gossip_promises.message_delivered(&msg_id);
1831
1832        // Tells score that message arrived (but is maybe not fully validated yet).
1833        if let PeerScoreState::Active(peer_score) = &mut self.peer_score {
1834            peer_score.validate_message(propagation_source, &msg_id, &message.topic);
1835        }
1836
1837        // Add the message to our memcache
1838        self.mcache.put(&msg_id, raw_message.clone());
1839
1840        // Dispatch the message to the user if we are subscribed to any of the topics
1841        #[allow(
1842            clippy::map_entry,
1843            reason = "False positive, see rust-lang/rust-clippy#14449."
1844        )]
1845        if self.mesh.contains_key(&message.topic) {
1846            tracing::debug!("Sending received message to user");
1847            self.events
1848                .push_back(ToSwarm::GenerateEvent(Event::Message {
1849                    propagation_source: *propagation_source,
1850                    message_id: msg_id.clone(),
1851                    message,
1852                }));
1853        } else {
1854            tracing::debug!(
1855                topic=%message.topic,
1856                "Received message on a topic we are not subscribed to"
1857            );
1858            return;
1859        }
1860
1861        // forward the message to mesh peers, if no validation is required
1862        if !self.config.validate_messages() {
1863            self.forward_msg(
1864                &msg_id,
1865                raw_message,
1866                Some(propagation_source),
1867                HashSet::new(),
1868            );
1869            tracing::debug!(message_id=%msg_id, "Completed message handling for message");
1870        }
1871    }
1872
1873    // Handles invalid messages received.
1874    fn handle_invalid_message(
1875        &mut self,
1876        propagation_source: &PeerId,
1877        topic_hash: &TopicHash,
1878        message_id: Option<&MessageId>,
1879        reject_reason: RejectReason,
1880    ) {
1881        #[cfg(feature = "metrics")]
1882        if let Some(metrics) = self.metrics.as_mut() {
1883            metrics.register_invalid_message(topic_hash);
1884        }
1885        if let Some(msg_id) = message_id {
1886            // The message transformed correctly, so reject its gossip promise regardless of peer scoring
1887            self.gossip_promises.reject_message(msg_id, &reject_reason);
1888        }
1889        if let PeerScoreState::Active(peer_score) = &mut self.peer_score {
1890            // The compiler will optimize this pattern-matching
1891            if let Some(msg_id) = message_id {
1892                // The message itself is valid, but is from a banned peer or
1893                // claiming to be self-origin but is actually forwarded from other peers.
1894                peer_score.reject_message(propagation_source, msg_id, topic_hash, reject_reason);
1895            } else {
1896                // The message is invalid, so we reject it regardless of any gossip promises. If a peer
1897                // is advertising this message via an IHAVE and it is invalid, it will be penalized
1898                // twice: once for sending us an invalid message and again for breaking a promise.
1899                peer_score.reject_invalid_message(propagation_source, topic_hash);
1900            }
1901        }
1902    }
1903
1904    /// Handles received subscriptions.
1905    fn handle_received_subscriptions(
1906        &mut self,
1907        subscriptions: &[Subscription],
1908        propagation_source: &PeerId,
1909    ) {
1910        tracing::debug!(
1911            source=%propagation_source,
1912            "Handling subscriptions: {:?}",
1913            subscriptions,
1914        );
1915
1916        let mut unsubscribed_peers = Vec::new();
1917
1918        let Some(peer) = self.connected_peers.get_mut(propagation_source) else {
1919            tracing::error!(
1920                peer=%propagation_source,
1921                "Subscription by unknown peer"
1922            );
1923            return;
1924        };
1925
1926        // Collect potential graft topics for the peer.
1927        let mut topics_to_graft = Vec::new();
1928
1929        // Notify the application about the subscription, after the grafts are sent.
1930        let mut application_event = Vec::new();
1931
1932        let filtered_topics = match self
1933            .subscription_filter
1934            .filter_incoming_subscriptions(subscriptions, &peer.topics)
1935        {
1936            Ok(topics) => topics,
1937            Err(s) => {
1938                tracing::error!(
1939                    peer=%propagation_source,
1940                    "Subscription filter error: {}; ignoring RPC from peer",
1941                    s
1942                );
1943                return;
1944            }
1945        };
1946
1947        for subscription in filtered_topics {
1948            // the topic this subscription action applies to
1949            let topic_hash = &subscription.topic_hash;
1950
1951            match subscription.action {
1952                SubscriptionAction::Subscribe => {
1953                    if peer.topics.insert(topic_hash.clone()) {
1954                        tracing::debug!(
1955                            peer=%propagation_source,
1956                            topic=%topic_hash,
1957                            "SUBSCRIPTION: Adding gossip peer to topic"
1958                        );
1959
1960                        #[cfg(feature = "metrics")]
1961                        if let Some(m) = self.metrics.as_mut() {
1962                            m.inc_topic_peers(topic_hash);
1963                        }
1964                    }
1965
1966                    // if the mesh needs peers add the peer to the mesh
1967                    if !self.explicit_peers.contains(propagation_source)
1968                        && peer.kind.is_gossipsub()
1969                        && !self
1970                            .peer_score
1971                            .below_threshold(propagation_source, |_| 0.0)
1972                            .0
1973                        && !self
1974                            .backoffs
1975                            .is_backoff_with_slack(topic_hash, propagation_source)
1976                    {
1977                        if let Some(peers) = self.mesh.get_mut(topic_hash) {
1978                            let mesh_n_low = self.config.mesh_n_low_for_topic(topic_hash);
1979
1980                            if peers.len() < mesh_n_low && peers.insert(*propagation_source) {
1981                                tracing::debug!(
1982                                    peer=%propagation_source,
1983                                    topic=%topic_hash,
1984                                    "SUBSCRIPTION: Adding peer to the mesh for topic"
1985                                );
1986                                #[cfg(feature = "metrics")]
1987                                if let Some(m) = self.metrics.as_mut() {
1988                                    m.peers_included(topic_hash, Inclusion::Subscribed, 1)
1989                                }
1990                                // send graft to the peer
1991                                tracing::debug!(
1992                                    peer=%propagation_source,
1993                                    topic=%topic_hash,
1994                                    "Sending GRAFT to peer for topic"
1995                                );
1996                                if let PeerScoreState::Active(peer_score) = &mut self.peer_score {
1997                                    peer_score.graft(propagation_source, topic_hash.clone());
1998                                }
1999                                topics_to_graft.push(topic_hash.clone());
2000                            }
2001                        }
2002                    }
2003                    // generates a subscription event to be polled
2004                    application_event.push(ToSwarm::GenerateEvent(Event::Subscribed {
2005                        peer_id: *propagation_source,
2006                        topic: topic_hash.clone(),
2007                    }));
2008                }
2009                SubscriptionAction::Unsubscribe => {
2010                    if peer.topics.remove(topic_hash) {
2011                        tracing::debug!(
2012                            peer=%propagation_source,
2013                            topic=%topic_hash,
2014                            "SUBSCRIPTION: Removing gossip peer from topic"
2015                        );
2016
2017                        #[cfg(feature = "metrics")]
2018                        if let Some(m) = self.metrics.as_mut() {
2019                            m.dec_topic_peers(topic_hash);
2020                        }
2021                    }
2022
2023                    unsubscribed_peers.push((*propagation_source, topic_hash.clone()));
2024                    // generate an unsubscribe event to be polled
2025                    application_event.push(ToSwarm::GenerateEvent(Event::Unsubscribed {
2026                        peer_id: *propagation_source,
2027                        topic: topic_hash.clone(),
2028                    }));
2029                }
2030            }
2031        }
2032
2033        // remove unsubscribed peers from the mesh and fanout if they exist there.
2034        for (peer_id, topic_hash) in unsubscribed_peers {
2035            if let Some(peers) = self.fanout.get_mut(&topic_hash) {
2036                peers.remove(&peer_id);
2037            }
2038            if self.remove_peer_from_mesh(&peer_id, &topic_hash, None, false) {
2039                #[cfg(feature = "metrics")]
2040                if let Some(m) = self.metrics.as_mut() {
2041                    m.peers_removed(&topic_hash, Churn::Unsub, 1);
2042                }
2043            }
2044        }
2045
2046        // Potentially inform the handler if we have added this peer to a mesh for the first time.
2047        let topics_joined = topics_to_graft.iter().collect::<Vec<_>>();
2048        if !topics_joined.is_empty() {
2049            peer_added_to_mesh(
2050                *propagation_source,
2051                topics_joined,
2052                &self.mesh,
2053                &mut self.events,
2054                &self.connected_peers,
2055            );
2056        }
2057
2058        // If we need to send grafts to the peer, do so immediately rather than waiting for the
2059        // heartbeat.
2060        for topic_hash in topics_to_graft.into_iter() {
2061            self.send_message(*propagation_source, RpcOut::Graft(Graft { topic_hash }));
2062        }
2063
2064        // Notify the application of the subscriptions
2065        for event in application_event {
2066            self.events.push_back(event);
2067        }
2068
2069        tracing::trace!(
2070            source=%propagation_source,
2071            "Completed handling subscriptions from source"
2072        );
2073    }
2074
2075    /// Applies penalties to peers that did not respond to our IWANT requests.
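    /// Each broken promise reported by `gossip_promises` adds one behavioural penalty to the
    /// offending peer's score.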
2076    fn apply_iwant_penalties(&mut self) {
2077        if let PeerScoreState::Active(peer_score) = &mut self.peer_score {
2078            for (peer, count) in self.gossip_promises.get_broken_promises() {
2079                peer_score.add_penalty(&peer, count);
2080                #[cfg(feature = "metrics")]
2081                if let Some(metrics) = self.metrics.as_mut() {
2082                    metrics.register_score_penalty(Penalty::BrokenPromise);
2083                }
2084            }
2085        }
2086    }
2087
2088    /// Heartbeat function which shifts the memcache and updates the mesh.
2089    fn heartbeat(&mut self) {
2090        tracing::debug!("Starting heartbeat");
2091        #[cfg(feature = "metrics")]
2092        let start = Instant::now();
2093
2094        // Every heartbeat we sample the send queues for our metrics. We do this intentionally
2095        // before adding the gossip generated by this heartbeat, in order to get a true measure
2096        // of the steady-state size of the queues.
2097        #[cfg(feature = "metrics")]
2098        if let Some(m) = &mut self.metrics {
2099            for sender_queue in self.connected_peers.values().map(|v| &v.sender) {
2100                m.observe_priority_queue_size(sender_queue.priority_queue_len());
2101                m.observe_non_priority_queue_size(sender_queue.non_priority_queue_len());
2102            }
2103        }
2104
2105        self.heartbeat_ticks += 1;
2106
2107        let mut to_graft = HashMap::new();
2108        let mut to_prune = HashMap::new();
2109        let mut no_px = HashSet::new();
2110
2111        // clean up expired backoffs
2112        self.backoffs.heartbeat();
2113
2114        // clean up ihave counters
2115        self.count_sent_iwant.clear();
2116        self.count_received_ihave.clear();
2117
2118        // apply iwant penalties
2119        self.apply_iwant_penalties();
2120
2121        // check connections to explicit peers
2122        if self.heartbeat_ticks % self.config.check_explicit_peers_ticks() == 0 {
2123            for p in self.explicit_peers.clone() {
2124                self.check_explicit_peer_connection(&p);
2125            }
2126        }
2127
2128        // Cache the scores of all connected peers, and record metrics for current penalties.
2129        let mut scores = HashMap::with_capacity(self.connected_peers.len());
2130        if let PeerScoreState::Active(peer_score) = &self.peer_score {
2131            for peer_id in self.connected_peers.keys() {
2132                #[allow(unused_variables)]
2133                let report = scores
2134                    .entry(peer_id)
2135                    .or_insert_with(|| peer_score.score_report(peer_id));
2136
2137                #[cfg(feature = "metrics")]
2138                if let Some(metrics) = self.metrics.as_mut() {
2139                    for penalty in &report.penalties {
2140                        metrics.register_score_penalty(*penalty);
2141                    }
2142                }
2143            }
2144        }
2145
2146        // maintain the mesh for each topic
2147        for (topic_hash, peers) in self.mesh.iter_mut() {
2148            let explicit_peers = &self.explicit_peers;
2149            let backoffs = &self.backoffs;
2150
2151            let mesh_n = self.config.mesh_n_for_topic(topic_hash);
2152            let mesh_n_low = self.config.mesh_n_low_for_topic(topic_hash);
2153            let mesh_n_high = self.config.mesh_n_high_for_topic(topic_hash);
2154            let mesh_outbound_min = self.config.mesh_outbound_min_for_topic(topic_hash);
2155
2156            // drop all peers with negative score, without PX
2157            // if BTreeSet ever gains a stable retain method, the following can be written more
2158            // efficiently with retain.
2159            let mut to_remove_peers = Vec::new();
2160            for peer_id in peers.iter() {
2161                let peer_score = scores.get(peer_id).map(|r| r.score).unwrap_or_default();
2162
2163                // Record the score per mesh
2164                #[cfg(feature = "metrics")]
2165                if let Some(metrics) = self.metrics.as_mut() {
2166                    metrics.observe_mesh_peers_score(topic_hash, peer_score);
2167                }
2168
2169                if peer_score < 0.0 {
2170                    tracing::debug!(
2171                        peer=%peer_id,
2172                        score=%peer_score,
2173                        topic=%topic_hash,
2174                        "HEARTBEAT: Prune peer with negative score"
2175                    );
2176
2177                    let current_topic = to_prune.entry(*peer_id).or_insert_with(Vec::new);
2178                    current_topic.push(topic_hash.clone());
2179                    no_px.insert(*peer_id);
2180                    to_remove_peers.push(*peer_id);
2181                }
2182            }
2183
2184            #[cfg(feature = "metrics")]
2185            if let Some(m) = self.metrics.as_mut() {
2186                m.peers_removed(topic_hash, Churn::BadScore, to_remove_peers.len())
2187            }
2188
2189            for peer_id in to_remove_peers {
2190                peers.remove(&peer_id);
2191            }
2192
2193            // too few peers - add some
2194            if peers.len() < mesh_n_low {
2195                tracing::debug!(
2196                    topic=%topic_hash,
2197                    "HEARTBEAT: Mesh low. Topic contains: {} needs: {}",
2198                    peers.len(),
2199                    mesh_n
2200                );
2201                // not enough peers - get mesh_n - current_length more
2202                let desired_peers = mesh_n - peers.len();
2203                let peer_list =
2204                    get_random_peers(&self.connected_peers, topic_hash, desired_peers, |peer| {
2205                        !peers.contains(peer)
2206                            && !explicit_peers.contains(peer)
2207                            && !backoffs.is_backoff_with_slack(topic_hash, peer)
2208                            && scores.get(peer).map(|r| r.score).unwrap_or_default() >= 0.0
2209                    });
2210                for peer in &peer_list {
2211                    let current_topic = to_graft.entry(*peer).or_insert_with(Vec::new);
2212                    current_topic.push(topic_hash.clone());
2213                }
2214                // update the mesh
2215                tracing::debug!("Updating mesh, new mesh: {:?}", peer_list);
2216                #[cfg(feature = "metrics")]
2217                if let Some(m) = self.metrics.as_mut() {
2218                    m.peers_included(topic_hash, Inclusion::Random, peer_list.len())
2219                }
2220                peers.extend(peer_list);
2221            }
2222
2223            // too many peers - remove some
2224            if peers.len() > mesh_n_high {
2225                tracing::debug!(
2226                    topic=%topic_hash,
2227                    "HEARTBEAT: Mesh high. Topic contains: {} will reduce to: {}",
2228                    peers.len(),
2229                    mesh_n
2230                );
2231                let excess_peer_no = peers.len() - mesh_n;
2232
2233                // shuffle the peers and then sort by score ascending beginning with the worst
2234                let mut rng = thread_rng();
2235                let mut shuffled = peers.iter().copied().collect::<Vec<_>>();
2236                shuffled.shuffle(&mut rng);
2237                shuffled.sort_by(|p1, p2| {
2238                    let score_p1 = scores.get(p1).map(|r| r.score).unwrap_or_default();
2239                    let score_p2 = scores.get(p2).map(|r| r.score).unwrap_or_default();
2240
2241                    score_p1.partial_cmp(&score_p2).unwrap_or(Ordering::Equal)
2242                });
2243                // shuffle everything except the last retain_scores many peers (the best ones)
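                // e.g. with the default retain_scores of 4, the 4 best-scoring peers keep their
                // position at the end of the list and are therefore considered for removal last,
                // while pruning among the rest is randomised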
2244                shuffled[..peers.len() - self.config.retain_scores()].shuffle(&mut rng);
2245
2246                // count total number of outbound peers
2247                let mut outbound = shuffled
2248                    .iter()
2249                    .filter(|peer_id| {
2250                        self.connected_peers
2251                            .get(peer_id)
2252                            .is_some_and(|peer| peer.outbound)
2253                    })
2254                    .count();
2255
2256                // remove the first excess_peer_no peers permitted by the outbound restrictions,
2257                // adding them to to_prune
2258                let mut removed = 0;
2259                for peer in shuffled {
2260                    if removed == excess_peer_no {
2261                        break;
2262                    }
2263                    if self
2264                        .connected_peers
2265                        .get(&peer)
2266                        .is_some_and(|peer| peer.outbound)
2267                    {
2268                        if outbound <= mesh_outbound_min {
2269                            // do not remove any more outbound peers
2270                            continue;
2271                        }
2272                        // an outbound peer gets removed
2273                        outbound -= 1;
2274                    }
2275
2276                    // remove the peer
2277                    peers.remove(&peer);
2278                    let current_topic = to_prune.entry(peer).or_insert_with(Vec::new);
2279                    current_topic.push(topic_hash.clone());
2280                    removed += 1;
2281                }
2282
2283                #[cfg(feature = "metrics")]
2284                if let Some(m) = self.metrics.as_mut() {
2285                    m.peers_removed(topic_hash, Churn::Excess, removed)
2286                }
2287            }
2288
2289            // do we have enough outbound peers?
2290            if peers.len() >= mesh_n_low {
2291                // count number of outbound peers we have
2292                let outbound = peers
2293                    .iter()
2294                    .filter(|peer_id| {
2295                        self.connected_peers
2296                            .get(peer_id)
2297                            .is_some_and(|peer| peer.outbound)
2298                    })
2299                    .count();
2300
2301                // if we do not have enough outbound peers, graft to some new outbound peers
2302                if outbound < mesh_outbound_min {
2303                    let needed = mesh_outbound_min - outbound;
2304                    let peer_list =
2305                        get_random_peers(&self.connected_peers, topic_hash, needed, |peer_id| {
2306                            !peers.contains(peer_id)
2307                                && !explicit_peers.contains(peer_id)
2308                                && !backoffs.is_backoff_with_slack(topic_hash, peer_id)
2309                                && scores.get(peer_id).map(|r| r.score).unwrap_or_default() >= 0.0
2310                                && self
2311                                    .connected_peers
2312                                    .get(peer_id)
2313                                    .is_some_and(|peer| peer.outbound)
2314                        });
2315
2316                    for peer in &peer_list {
2317                        let current_topic = to_graft.entry(*peer).or_insert_with(Vec::new);
2318                        current_topic.push(topic_hash.clone());
2319                    }
2320                    // update the mesh
2321                    tracing::debug!("Updating mesh, new mesh: {:?}", peer_list);
2322                    #[cfg(feature = "metrics")]
2323                    if let Some(m) = self.metrics.as_mut() {
2324                        m.peers_included(topic_hash, Inclusion::Outbound, peer_list.len())
2325                    }
2326                    peers.extend(peer_list);
2327                }
2328            }
2329
2330            // should we try to improve the mesh with opportunistic grafting?
2331            if self.heartbeat_ticks % self.config.opportunistic_graft_ticks() == 0
2332                && peers.len() > 1
2333            {
2334                if let PeerScoreState::Active(peer_score) = &self.peer_score {
2335                    // Opportunistic grafting works as follows: we check the median score of peers
2336                    // in the mesh; if this score is below the opportunisticGraftThreshold, we
2337                    // select a few peers at random with score over the median.
2338                    // The intention is to (slowly) improve an underperforming mesh by introducing
2339                    // good scoring peers that may have been gossiping at us. This allows us to
2340                    // get out of sticky situations where we are stuck with poor peers and also
2341                    // recover from churn of good peers.
2342
2343                    // now compute the median peer score in the mesh
2344                    let mut peers_by_score: Vec<_> = peers.iter().collect();
2345                    peers_by_score.sort_by(|p1, p2| {
2346                        let p1_score = scores.get(p1).map(|r| r.score).unwrap_or_default();
2347                        let p2_score = scores.get(p2).map(|r| r.score).unwrap_or_default();
2348                        p1_score.partial_cmp(&p2_score).unwrap_or(Equal)
2349                    });
2350
2351                    let middle = peers_by_score.len() / 2;
2352                    let median = if peers_by_score.len() % 2 == 0 {
2353                        let sub_middle_peer = *peers_by_score
2354                            .get(middle - 1)
2355                            .expect("middle < vector length and middle > 0 since peers.len() > 0");
2356                        let sub_middle_score = scores
2357                            .get(sub_middle_peer)
2358                            .map(|r| r.score)
2359                            .unwrap_or_default();
2360                        let middle_peer =
2361                            *peers_by_score.get(middle).expect("middle < vector length");
2362                        let middle_score =
2363                            scores.get(middle_peer).map(|r| r.score).unwrap_or_default();
2364
2365                        (sub_middle_score + middle_score) * 0.5
2366                    } else {
2367                        scores
2368                            .get(*peers_by_score.get(middle).expect("middle < vector length"))
2369                            .map(|r| r.score)
2370                            .unwrap_or_default()
2371                    };
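                    // e.g. four mesh peers with scores [-4.0, 1.0, 2.0, 8.0] yield a median of
                    // (1.0 + 2.0) * 0.5 = 1.5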
2372
2373                    // if the median score is below the threshold, select a better peer (if any) and
2374                    // GRAFT
2375                    if median < peer_score.thresholds.opportunistic_graft_threshold {
2376                        let peer_list = get_random_peers(
2377                            &self.connected_peers,
2378                            topic_hash,
2379                            self.config.opportunistic_graft_peers(),
2380                            |peer_id| {
2381                                !peers.contains(peer_id)
2382                                    && !explicit_peers.contains(peer_id)
2383                                    && !backoffs.is_backoff_with_slack(topic_hash, peer_id)
2384                                    && scores.get(peer_id).map(|r| r.score).unwrap_or_default()
2385                                        > median
2386                            },
2387                        );
2388                        for peer in &peer_list {
2389                            let current_topic = to_graft.entry(*peer).or_insert_with(Vec::new);
2390                            current_topic.push(topic_hash.clone());
2391                        }
2392                        // update the mesh
2393                        tracing::debug!(
2394                            topic=%topic_hash,
2395                            "Opportunistically graft in topic with peers {:?}",
2396                            peer_list
2397                        );
2398                        #[cfg(feature = "metrics")]
2399                        if let Some(m) = self.metrics.as_mut() {
2400                            m.peers_included(topic_hash, Inclusion::Random, peer_list.len())
2401                        }
2402                        peers.extend(peer_list);
2403                    }
2404                }
2405            }
2406            // Register the final count of peers in the mesh
2407            #[cfg(feature = "metrics")]
2408            if let Some(m) = self.metrics.as_mut() {
2409                m.set_mesh_peers(topic_hash, peers.len())
2410            }
2411        }
2412
2413        // remove expired fanout topics
2414        {
2415            let fanout = &mut self.fanout; // help the borrow checker
2416            let fanout_ttl = self.config.fanout_ttl();
2417            self.fanout_last_pub.retain(|topic_hash, last_pub_time| {
2418                if *last_pub_time + fanout_ttl < Instant::now() {
2419                    tracing::debug!(
2420                        topic=%topic_hash,
2421                        "HEARTBEAT: Fanout topic removed due to timeout"
2422                    );
2423                    fanout.remove(topic_hash);
2424                    return false;
2425                }
2426                true
2427            });
2428        }
2429
2430        // maintain fanout
2431        // check if our peers are still a part of the topic
2432        for (topic_hash, peers) in self.fanout.iter_mut() {
2433            let mut to_remove_peers = Vec::new();
2434            let publish_threshold = match &self.peer_score {
2435                PeerScoreState::Active(peer_score) => peer_score.thresholds.publish_threshold,
2436                _ => 0.0,
2437            };
2438            let mesh_n = self.config.mesh_n_for_topic(topic_hash);
2439
2440            for peer_id in peers.iter() {
2441                // is the peer still subscribed to the topic?
2442                let peer_score = scores.get(peer_id).map(|r| r.score).unwrap_or_default();
2443                match self.connected_peers.get(peer_id) {
2444                    Some(peer) => {
2445                        if !peer.topics.contains(topic_hash) || peer_score < publish_threshold {
2446                            tracing::debug!(
2447                                topic=%topic_hash,
2448                                "HEARTBEAT: Peer removed from fanout for topic"
2449                            );
2450                            to_remove_peers.push(*peer_id);
2451                        }
2452                    }
2453                    None => {
2454                        // remove if the peer has disconnected
2455                        to_remove_peers.push(*peer_id);
2456                    }
2457                }
2458            }
2459            for to_remove in to_remove_peers {
2460                peers.remove(&to_remove);
2461            }
2462
2463            // not enough peers
2464            if peers.len() < mesh_n {
2465                tracing::debug!(
2466                    "HEARTBEAT: Fanout low. Contains: {:?} needs: {:?}",
2467                    peers.len(),
2468                    mesh_n
2469                );
2470                let needed_peers = mesh_n - peers.len();
2471                let explicit_peers = &self.explicit_peers;
2472                let new_peers =
2473                    get_random_peers(&self.connected_peers, topic_hash, needed_peers, |peer_id| {
2474                        !peers.contains(peer_id)
2475                            && !explicit_peers.contains(peer_id)
2476                            && scores.get(peer_id).map(|r| r.score).unwrap_or_default()
2477                                >= publish_threshold
2478                    });
2479                peers.extend(new_peers);
2480            }
2481        }
2482
2483        if let PeerScoreState::Active(peer_score) = &self.peer_score {
2484            tracing::trace!("Mesh message deliveries: {:?}", {
2485                self.mesh
2486                    .iter()
2487                    .map(|(t, peers)| {
2488                        (
2489                            t.clone(),
2490                            peers
2491                                .iter()
2492                                .map(|p| {
2493                                    (*p, peer_score.mesh_message_deliveries(p, t).unwrap_or(0.0))
2494                                })
2495                                .collect::<HashMap<PeerId, f64>>(),
2496                        )
2497                    })
2498                    .collect::<HashMap<TopicHash, HashMap<PeerId, f64>>>()
2499            })
2500        }
2501
2502        self.emit_gossip();
2503
2504        // send graft/prunes
2505        if !to_graft.is_empty() || !to_prune.is_empty() {
2506            self.send_graft_prune(to_graft, to_prune, no_px);
2507        }
2508
2509        // shift the memcache
2510        self.mcache.shift();
2511
2512        // Report expired messages
2513        for (peer_id, failed_messages) in self.failed_messages.drain() {
2514            tracing::debug!("Peer couldn't consume messages: {:?}", failed_messages);
2515            self.events
2516                .push_back(ToSwarm::GenerateEvent(Event::SlowPeer {
2517                    peer_id,
2518                    failed_messages,
2519                }));
2520        }
2521        self.failed_messages.shrink_to_fit();
2522
2523        // Flush stale IDONTWANTs.
2524        for peer in self.connected_peers.values_mut() {
2525            while let Some((_front, instant)) = peer.dont_send.front() {
2526                if (*instant + IDONTWANT_TIMEOUT) >= Instant::now() {
2527                    break;
2528                } else {
2529                    peer.dont_send.pop_front();
2530                }
2531            }
2532        }
2533
2534        tracing::debug!("Completed Heartbeat");
2535        #[cfg(feature = "metrics")]
2536        if let Some(metrics) = self.metrics.as_mut() {
2537            let duration = u64::try_from(start.elapsed().as_millis()).unwrap_or(u64::MAX);
2538            metrics.observe_heartbeat_duration(duration);
2539        }
2540    }
2541
2542    /// Emits gossip - Send IHAVE messages to a random set of gossip peers. This is applied to mesh
2543    /// and fanout peers
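    ///
    /// IHAVE lists are capped at `max_ihave_length` message ids per peer; when the cache holds
    /// more, each peer receives a different randomly chosen subset.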
2544    fn emit_gossip(&mut self) {
2545        let mut rng = thread_rng();
2546        let mut messages = Vec::new();
2547        for (topic_hash, peers) in self.mesh.iter().chain(self.fanout.iter()) {
2548            let mut message_ids = self.mcache.get_gossip_message_ids(topic_hash);
2549            if message_ids.is_empty() {
2550                continue;
2551            }
2552
2553            // if we are emitting more than GossipSubMaxIHaveLength message_ids, truncate the list
2554            if message_ids.len() > self.config.max_ihave_length() {
2555                // we do the truncation (with shuffling) per peer below
2556                tracing::debug!(
2557                    "too many messages for gossip; will truncate IHAVE list ({} messages)",
2558                    message_ids.len()
2559                );
2560            } else {
2561                // shuffle to emit in random order
2562                message_ids.shuffle(&mut rng);
2563            }
2564
2565            // dynamic number of peers to gossip based on `gossip_factor` with minimum `gossip_lazy`
2566            let n_map = |m| {
2567                max(
2568                    self.config.gossip_lazy(),
2569                    (self.config.gossip_factor() * m as f64) as usize,
2570                )
2571            };
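            // e.g. with the defaults gossip_lazy = 6 and gossip_factor = 0.25, a topic with 40
            // eligible peers is gossiped to max(6, 40 / 4) = 10 of them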
2572            // get gossip_lazy random peers
2573            let to_msg_peers =
2574                get_random_peers_dynamic(&self.connected_peers, topic_hash, n_map, |peer| {
2575                    !peers.contains(peer)
2576                        && !self.explicit_peers.contains(peer)
2577                        && !self
2578                            .peer_score
2579                            .below_threshold(peer, |ts| ts.gossip_threshold)
2580                            .0
2581                });
2582
2583            tracing::debug!("Gossiping IHAVE to {} peers", to_msg_peers.len());
2584
2585            for peer_id in to_msg_peers {
2586                let mut peer_message_ids = message_ids.clone();
2587
2588                if peer_message_ids.len() > self.config.max_ihave_length() {
2589                    // We do this per peer so that we emit a different set for each peer.
2590                    // There is enough redundancy in the system that this significantly increases
2591                    // message coverage when we do truncate.
2592                    peer_message_ids.partial_shuffle(&mut rng, self.config.max_ihave_length());
2593                    peer_message_ids.truncate(self.config.max_ihave_length());
2594                }
2595
2596                // send an IHAVE message
2597                messages.push((
2598                    peer_id,
2599                    RpcOut::IHave(IHave {
2600                        topic_hash: topic_hash.clone(),
2601                        message_ids: peer_message_ids,
2602                    }),
2603                ));
2604            }
2605        }
2606        for (peer_id, message) in messages {
2607            self.send_message(peer_id, message);
2608        }
2609    }
2610
2611    /// Handles multiple GRAFT/PRUNE requests, sending the corresponding GRAFT and PRUNE
2612    /// control messages to each affected peer.
2613    fn send_graft_prune(
2614        &mut self,
2615        to_graft: HashMap<PeerId, Vec<TopicHash>>,
2616        mut to_prune: HashMap<PeerId, Vec<TopicHash>>,
2617        no_px: HashSet<PeerId>,
2618    ) {
2619        // handle the grafts and overlapping prunes per peer
2620        for (peer_id, topics) in to_graft.into_iter() {
2621            for topic in &topics {
2622                // inform scoring of graft
2623                if let PeerScoreState::Active(peer_score) = &mut self.peer_score {
2624                    peer_score.graft(&peer_id, topic.clone());
2625                }
2626
2627                // If the peer did not previously exist in any mesh, inform the handler
2628                // that it has been added to one.
2629                peer_added_to_mesh(
2630                    peer_id,
2631                    vec![topic],
2632                    &self.mesh,
2633                    &mut self.events,
2634                    &self.connected_peers,
2635                );
2636            }
2637            let rpc_msgs = topics.iter().map(|topic_hash| {
2638                RpcOut::Graft(Graft {
2639                    topic_hash: topic_hash.clone(),
2640                })
2641            });
2642
2643            // If there are prunes associated with the same peer add them.
2644            // NOTE: In this case a peer has been added to a topic mesh, and removed from another.
2645            // It therefore must be in at least one mesh and we do not need to inform the handler
2646            // of its removal from another.
2647
2648            // The following prunes are not due to unsubscribing.
2649            let prune_msgs = to_prune
2650                .remove(&peer_id)
2651                .into_iter()
2652                .flatten()
2653                .map(|topic_hash| {
2654                    let prune = self.make_prune(
2655                        &topic_hash,
2656                        &peer_id,
2657                        self.config.do_px() && !no_px.contains(&peer_id),
2658                        false,
2659                    );
2660                    RpcOut::Prune(prune)
2661                });
2662
2663            // send the rpc messages
2664            for msg in rpc_msgs.chain(prune_msgs).collect::<Vec<_>>() {
2665                self.send_message(peer_id, msg);
2666            }
2667        }
2668
2669        // handle the remaining prunes
2670        // The following prunes are not due to unsubscribing.
2671        for (peer_id, topics) in to_prune.iter() {
2672            for topic_hash in topics {
2673                let prune = self.make_prune(
2674                    topic_hash,
2675                    peer_id,
2676                    self.config.do_px() && !no_px.contains(peer_id),
2677                    false,
2678                );
2679                self.send_message(*peer_id, RpcOut::Prune(prune));
2680
2681                // inform the handler
2682                peer_removed_from_mesh(
2683                    *peer_id,
2684                    topic_hash,
2685                    &self.mesh,
2686                    &mut self.events,
2687                    &self.connected_peers,
2688                );
2689            }
2690        }
2691    }
2692
2693    /// Helper function which forwards a message to mesh\[topic\], explicit and Floodsub peers.
2694    ///
2695    /// Returns true if at least one peer was messaged.
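    ///
    /// Peers that have advertised the message id via IDONTWANT are skipped.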
2696    fn forward_msg(
2697        &mut self,
2698        msg_id: &MessageId,
2699        message: RawMessage,
2700        propagation_source: Option<&PeerId>,
2701        originating_peers: HashSet<PeerId>,
2702    ) -> bool {
2703        // The message has been fully validated; inform the peer score.
2704        if let PeerScoreState::Active(peer_score) = &mut self.peer_score {
2705            if let Some(peer) = propagation_source {
2706                peer_score.deliver_message(peer, msg_id, &message.topic);
2707            }
2708        }
2709
2710        tracing::debug!(message_id=%msg_id, "Forwarding message");
2711        let mut recipient_peers = HashSet::new();
2712
2713        // Populate the recipient peers mapping
2714
2715        // Add explicit peers and floodsub peers
2716        for (peer_id, peer) in &self.connected_peers {
2717            if Some(peer_id) != propagation_source
2718                && !originating_peers.contains(peer_id)
2719                && Some(peer_id) != message.source.as_ref()
2720                && peer.topics.contains(&message.topic)
2721                && (self.explicit_peers.contains(peer_id)
2722                    || (peer.kind == PeerKind::Floodsub
2723                        && !self
2724                            .peer_score
2725                            .below_threshold(peer_id, |ts| ts.publish_threshold)
2726                            .0))
2727            {
2728                recipient_peers.insert(*peer_id);
2729            }
2730        }
2731
2732        // add mesh peers
2733        let topic = &message.topic;
2734        // mesh
2735        if let Some(mesh_peers) = self.mesh.get(topic) {
2736            for peer_id in mesh_peers {
2737                if Some(peer_id) != propagation_source
2738                    && !originating_peers.contains(peer_id)
2739                    && Some(peer_id) != message.source.as_ref()
2740                {
2741                    recipient_peers.insert(*peer_id);
2742                }
2743            }
2744        }
2745
2746        if recipient_peers.is_empty() {
2747            return false;
2748        }
2749
2750        // forward the message to peers
2751        for peer_id in recipient_peers.iter() {
2752            if let Some(peer) = self.connected_peers.get_mut(peer_id) {
2753                if peer.dont_send.contains_key(msg_id) {
2754                    tracing::debug!(%peer_id, message_id=%msg_id, "Peer doesn't want message");
2755                    continue;
2756                }
2757
2758                tracing::debug!(%peer_id, message_id=%msg_id, "Sending message to peer");
2759
2760                self.send_message(
2761                    *peer_id,
2762                    RpcOut::Forward {
2763                        message: message.clone(),
2764                        timeout: Delay::new(self.config.forward_queue_duration()),
2765                    },
2766                );
2767            }
2768        }
2769        tracing::debug!("Completed forwarding message");
2770        true
2771    }
2772
2773    /// Constructs a [`RawMessage`] performing message signing if required.
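    ///
    /// When signing is enabled, the signature covers the bytes of `SIGNING_PREFIX`
    /// ("libp2p-pubsub:") followed by the protobuf encoding of the message with its
    /// `signature` and `key` fields unset.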
2774    pub(crate) fn build_raw_message(
2775        &mut self,
2776        topic: TopicHash,
2777        data: Vec<u8>,
2778    ) -> Result<RawMessage, PublishError> {
2779        match &mut self.publish_config {
2780            PublishConfig::Signing {
2781                ref keypair,
2782                author,
2783                inline_key,
2784                last_seq_no,
2785            } => {
2786                let sequence_number = last_seq_no.next();
2787
2788                let signature = {
2789                    let message = proto::Message {
2790                        from: Some(author.to_bytes()),
2791                        data: Some(data.clone()),
2792                        seqno: Some(sequence_number.to_be_bytes().to_vec()),
2793                        topic: topic.clone().into_string(),
2794                        signature: None,
2795                        key: None,
2796                    };
2797
2798                    let mut buf = Vec::with_capacity(message.get_size());
2799                    let mut writer = Writer::new(&mut buf);
2800
2801                    message
2802                        .write_message(&mut writer)
2803                        .expect("Encoding to succeed");
2804
2805                    // the signature is over the bytes "libp2p-pubsub:<protobuf-message>"
2806                    let mut signature_bytes = SIGNING_PREFIX.to_vec();
2807                    signature_bytes.extend_from_slice(&buf);
2808                    Some(keypair.sign(&signature_bytes)?)
2809                };
2810
2811                Ok(RawMessage {
2812                    source: Some(*author),
2813                    data,
2814                    // To be interoperable with the go-implementation this is treated as a 64-bit
2815                    // big-endian uint.
2816                    sequence_number: Some(sequence_number),
2817                    topic,
2818                    signature,
2819                    key: inline_key.clone(),
2820                    validated: true, // all published messages are valid
2821                })
2822            }
2823            PublishConfig::Author(peer_id) => {
2824                Ok(RawMessage {
2825                    source: Some(*peer_id),
2826                    data,
2827                    // To be interoperable with the go-implementation this is treated as a 64-bit
2828                    // big-endian uint.
2829                    sequence_number: Some(rand::random()),
2830                    topic,
2831                    signature: None,
2832                    key: None,
2833                    validated: true, // all published messages are valid
2834                })
2835            }
2836            PublishConfig::RandomAuthor => {
2837                Ok(RawMessage {
2838                    source: Some(PeerId::random()),
2839                    data,
2840                    // To be interoperable with the go-implementation this is treated as a 64-bit
2841                    // big-endian uint.
2842                    sequence_number: Some(rand::random()),
2843                    topic,
2844                    signature: None,
2845                    key: None,
2846                    validated: true, // all published messages are valid
2847                })
2848            }
2849            PublishConfig::Anonymous => {
2850                Ok(RawMessage {
2851                    source: None,
2852                    data,
2853                    // To be interoperable with the go-implementation this is treated as a 64-bit
2854                    // big-endian uint.
2855                    sequence_number: None,
2856                    topic,
2857                    signature: None,
2858                    key: None,
2859                    validated: true, // all published messages are valid
2860                })
2861            }
2862        }
2863    }
2864
2865    /// Send a [`RpcOut`] message to a peer.
2866    ///
2867    /// Returns `true` if sending was successful, `false` otherwise.
2868    /// The method will update the peer score and failed message counter if
2869    /// sending the message failed due to the channel to the connection handler being
2870    /// full (which indicates a slow peer).
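    ///
    /// Publish, forward and gossip (IHAVE/IWANT/IDONTWANT) messages may fail if the queue to the
    /// connection handler is full; GRAFT, PRUNE, SUBSCRIBE and UNSUBSCRIBE are sent on an
    /// unbounded channel and are expected to always succeed.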
2871    fn send_message(&mut self, peer_id: PeerId, rpc: RpcOut) -> bool {
2872        #[cfg(feature = "metrics")]
2873        if let Some(m) = self.metrics.as_mut() {
2874            if let RpcOut::Publish { ref message, .. } | RpcOut::Forward { ref message, .. } = rpc {
2875                // register bytes sent on the internal metrics.
2876                m.msg_sent(&message.topic, message.raw_protobuf_len());
2877            }
2878        }
2879
2880        let Some(peer) = &mut self.connected_peers.get_mut(&peer_id) else {
2881            tracing::error!(peer = %peer_id,
2882                    "Could not send rpc to connection handler, peer doesn't exist in connected peer list");
2883            return false;
2884        };
2885
2886        if !matches!(peer.kind, PeerKind::Gossipsubv1_2) && matches!(rpc, RpcOut::IDontWant(..)) {
2887            tracing::trace!(peer=%peer_id, "Won't send IDONTWANT message to peer as it doesn't support Gossipsub v1.2");
2888            return false;
2889        }
2890
2891        // Try sending the message to the connection handler.
2892        match peer.sender.send_message(rpc) {
2893            Ok(()) => true,
2894            Err(rpc) => {
2895                // Sending failed because the channel is full.
2896                tracing::warn!(peer=%peer_id, "Send Queue full. Could not send {:?}.", rpc);
2897
2898                // Update failed message counter.
2899                let failed_messages = self.failed_messages.entry(peer_id).or_default();
2900                match rpc {
2901                    RpcOut::Publish { .. } => {
2902                        failed_messages.priority += 1;
2903                        failed_messages.publish += 1;
2904                    }
2905                    RpcOut::Forward { .. } => {
2906                        failed_messages.non_priority += 1;
2907                        failed_messages.forward += 1;
2908                    }
2909                    RpcOut::IWant(_) | RpcOut::IHave(_) | RpcOut::IDontWant(_) => {
2910                        failed_messages.non_priority += 1;
2911                    }
2912                    RpcOut::Graft(_)
2913                    | RpcOut::Prune(_)
2914                    | RpcOut::Subscribe(_)
2915                    | RpcOut::Unsubscribe(_) => {
2916                        unreachable!("Channel for high-priority control messages is unbounded and should always be open.")
2917                    }
2918                }
2919
2920                // Update peer score.
2921                if let PeerScoreState::Active(peer_score) = &mut self.peer_score {
2922                    peer_score.failed_message_slow_peer(&peer_id);
2923                }
2924
2925                false
2926            }
2927        }
2928    }
2929
2930    fn on_connection_established(
2931        &mut self,
2932        ConnectionEstablished {
2933            peer_id,
2934            endpoint,
2935            other_established,
2936            ..
2937        }: ConnectionEstablished,
2938    ) {
2939        // Add the IP to the peer scoring system
2940        if let PeerScoreState::Active(peer_score) = &mut self.peer_score {
2941            if let Some(ip) = get_ip_addr(endpoint.get_remote_address()) {
2942                peer_score.add_ip(&peer_id, ip);
2943            } else {
2944                tracing::trace!(
2945                    peer=%peer_id,
2946                    "Couldn't extract ip from endpoint of peer with endpoint {:?}",
2947                    endpoint
2948                )
2949            }
2950        }
2951
2952        if other_established > 0 {
2953            return; // Not our first connection to this peer, hence nothing to do.
2954        }
2955
2956        if let PeerScoreState::Active(peer_score) = &mut self.peer_score {
2957            peer_score.add_peer(peer_id);
2958        }
2959
2960        // Ignore connections from blacklisted peers.
2961        if self.blacklisted_peers.contains(&peer_id) {
2962            tracing::debug!(peer=%peer_id, "Ignoring connection from blacklisted peer");
2963            return;
2964        }
2965
2966        tracing::debug!(peer=%peer_id, "New peer connected");
2967        // We need to send our subscriptions to the newly-connected node.
2968        for topic_hash in self.mesh.clone().into_keys() {
2969            self.send_message(peer_id, RpcOut::Subscribe(topic_hash));
2970        }
2971    }
2972
2973    fn on_connection_closed(
2974        &mut self,
2975        ConnectionClosed {
2976            peer_id,
2977            connection_id,
2978            endpoint,
2979            remaining_established,
2980            ..
2981        }: ConnectionClosed,
2982    ) {
2983        // Remove IP from peer scoring system
2984        if let PeerScoreState::Active(peer_score) = &mut self.peer_score {
2985            if let Some(ip) = get_ip_addr(endpoint.get_remote_address()) {
2986                peer_score.remove_ip(&peer_id, &ip);
2987            } else {
2988                tracing::trace!(
2989                    peer=%peer_id,
2990                    "Couldn't extract ip from endpoint of peer with endpoint {:?}",
2991                    endpoint
2992                )
2993            }
2994        }
2995
2996        if remaining_established != 0 {
2997            // Remove the connection from the list
2998            if let Some(peer) = self.connected_peers.get_mut(&peer_id) {
2999                let index = peer
3000                    .connections
3001                    .iter()
3002                    .position(|v| v == &connection_id)
3003                    .expect("Previously established connection to peer must be present");
3004                peer.connections.remove(index);
3005
3006                // If there are more connections and this peer is in a mesh, inform the first
3007                // connection handler.
3008                if !peer.connections.is_empty() {
3009                    for topic in &peer.topics {
3010                        if let Some(mesh_peers) = self.mesh.get(topic) {
3011                            if mesh_peers.contains(&peer_id) {
3012                                self.events.push_back(ToSwarm::NotifyHandler {
3013                                    peer_id,
3014                                    event: HandlerIn::JoinedMesh,
3015                                    handler: NotifyHandler::One(peer.connections[0]),
3016                                });
3017                                break;
3018                            }
3019                        }
3020                    }
3021                }
3022            }
3023        } else {
3024            // Remove the peer from the mesh, fanout, px list and connected peers.
3025            tracing::debug!(peer=%peer_id, "Peer disconnected");
3026            let Some(connected_peer) = self.connected_peers.get(&peer_id) else {
3027                tracing::error!(peer_id = %peer_id, "Peer non-existent when handling disconnection");
3028                return;
3029            };
3030
3031            // remove peer from all mappings
3032            for topic in &connected_peer.topics {
3033                // check the mesh for the topic
3034                if let Some(mesh_peers) = self.mesh.get_mut(topic) {
3035                    // check if the peer is in the mesh and remove it
3036                    if mesh_peers.remove(&peer_id) {
3037                        #[cfg(feature = "metrics")]
3038                        if let Some(m) = self.metrics.as_mut() {
3039                            m.peers_removed(topic, Churn::Dc, 1);
3040                            m.set_mesh_peers(topic, mesh_peers.len());
3041                        }
3042                    };
3043                }
3044
3045                #[cfg(feature = "metrics")]
3046                if let Some(m) = self.metrics.as_mut() {
3047                    m.dec_topic_peers(topic);
3048                }
3049
3050                // remove from fanout
3051                self.fanout
3052                    .get_mut(topic)
3053                    .map(|peers| peers.remove(&peer_id));
3054            }
3055
3056            // Forget px and outbound status for this peer
3057            self.px_peers.remove(&peer_id);
3058
3059            // If metrics are enabled, register the disconnection of a peer based on its protocol.
3060            #[cfg(feature = "metrics")]
3061            if let Some(metrics) = self.metrics.as_mut() {
3062                metrics.peer_protocol_disconnected(connected_peer.kind);
3063            }
3064
3065            self.connected_peers.remove(&peer_id);
3066
3067            if let PeerScoreState::Active(peer_score) = &mut self.peer_score {
3068                peer_score.remove_peer(&peer_id);
3069            }
3070        }
3071    }
3072
3073    fn on_address_change(
3074        &mut self,
3075        AddressChange {
3076            peer_id,
3077            old: endpoint_old,
3078            new: endpoint_new,
3079            ..
3080        }: AddressChange,
3081    ) {
3082        // Exchange IP in peer scoring system
3083        if let PeerScoreState::Active(peer_score) = &mut self.peer_score {
3084            if let Some(ip) = get_ip_addr(endpoint_old.get_remote_address()) {
3085                peer_score.remove_ip(&peer_id, &ip);
3086            } else {
3087                tracing::trace!(
3088                    peer=%&peer_id,
3089                    "Couldn't extract ip from endpoint of peer with endpoint {:?}",
3090                    endpoint_old
3091                )
3092            }
3093            if let Some(ip) = get_ip_addr(endpoint_new.get_remote_address()) {
3094                peer_score.add_ip(&peer_id, ip);
3095            } else {
3096                tracing::trace!(
3097                    peer=%peer_id,
3098                    "Couldn't extract ip from endpoint of peer with endpoint {:?}",
3099                    endpoint_new
3100                )
3101            }
3102        }
3103    }
3104
3105    #[cfg(feature = "metrics")]
3106    /// Register topics to ensure metrics are recorded correctly for these topics.
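    ///
    /// A minimal usage sketch (illustrative; assumes a constructed `Behaviour` named `gossipsub`
    /// and the crate's identity-hash topic alias `IdentTopic`):
    ///
    /// ```ignore
    /// let topic = IdentTopic::new("example-topic");
    /// gossipsub.register_topics_for_metrics(vec![topic.hash()]);
    /// ```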
3107    pub fn register_topics_for_metrics(&mut self, topics: Vec<TopicHash>) {
3108        if let Some(metrics) = &mut self.metrics {
3109            metrics.register_allowed_topics(topics);
3110        }
3111    }
3112}
3113
3114fn get_ip_addr(addr: &Multiaddr) -> Option<IpAddr> {
3115    addr.iter().find_map(|p| match p {
3116        Ip4(addr) => Some(IpAddr::V4(addr)),
3117        Ip6(addr) => Some(IpAddr::V6(addr)),
3118        _ => None,
3119    })
3120}
3121
3122impl<C, F> NetworkBehaviour for Behaviour<C, F>
3123where
3124    C: Send + 'static + DataTransform,
3125    F: Send + 'static + TopicSubscriptionFilter,
3126{
3127    type ConnectionHandler = Handler;
3128    type ToSwarm = Event;
3129
3130    fn handle_established_inbound_connection(
3131        &mut self,
3132        connection_id: ConnectionId,
3133        peer_id: PeerId,
3134        _: &Multiaddr,
3135        _: &Multiaddr,
3136    ) -> Result<THandler<Self>, ConnectionDenied> {
3137        // By default we assume a peer is only a floodsub peer.
3138        //
3139        // The protocol negotiation occurs once a message is sent/received. Once this happens we
3140        // update the type of peer that this is in order to determine which kind of routing should
3141        // occur.
3142        let connected_peer = self.connected_peers.entry(peer_id).or_insert(PeerDetails {
3143            kind: PeerKind::Floodsub,
3144            connections: vec![],
3145            outbound: false,
3146            sender: Sender::new(self.config.connection_handler_queue_len()),
3147            topics: Default::default(),
3148            dont_send: LinkedHashMap::new(),
3149        });
3150        // Add the new connection
3151        connected_peer.connections.push(connection_id);
3152
3153        Ok(Handler::new(
3154            self.config.protocol_config(),
3155            connected_peer.sender.new_receiver(),
3156        ))
3157    }
3158
3159    fn handle_established_outbound_connection(
3160        &mut self,
3161        connection_id: ConnectionId,
3162        peer_id: PeerId,
3163        _: &Multiaddr,
3164        _: Endpoint,
3165        _: PortUse,
3166    ) -> Result<THandler<Self>, ConnectionDenied> {
3167        let connected_peer = self.connected_peers.entry(peer_id).or_insert(PeerDetails {
3168            kind: PeerKind::Floodsub,
3169            connections: vec![],
3170            // Diverging from the go implementation, we only want to consider a peer as an
3171            // outbound peer if its first connection is outbound.
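            // Peers dialled as a result of peer exchange (`px_peers`) are therefore not marked
            // as outbound.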
3172            outbound: !self.px_peers.contains(&peer_id),
3173            sender: Sender::new(self.config.connection_handler_queue_len()),
3174            topics: Default::default(),
3175            dont_send: LinkedHashMap::new(),
3176        });
3177        // Add the new connection
3178        connected_peer.connections.push(connection_id);
3179
3180        Ok(Handler::new(
3181            self.config.protocol_config(),
3182            connected_peer.sender.new_receiver(),
3183        ))
3184    }
3185
3186    fn on_connection_handler_event(
3187        &mut self,
3188        propagation_source: PeerId,
3189        _connection_id: ConnectionId,
3190        handler_event: THandlerOutEvent<Self>,
3191    ) {
3192        match handler_event {
3193            HandlerEvent::PeerKind(kind) => {
3194                // We have identified the protocol this peer is using
3195
3196                #[cfg(feature = "metrics")]
3197                if let Some(metrics) = self.metrics.as_mut() {
3198                    metrics.peer_protocol_connected(kind);
3199                }
3200
3201                if let PeerKind::NotSupported = kind {
3202                    tracing::debug!(
3203                        peer=%propagation_source,
3204                        "Peer does not support gossipsub protocols"
3205                    );
3206                    self.events
3207                        .push_back(ToSwarm::GenerateEvent(Event::GossipsubNotSupported {
3208                            peer_id: propagation_source,
3209                        }));
3210                } else if let Some(conn) = self.connected_peers.get_mut(&propagation_source) {
3211                    // Only change the value if the old value is Floodsub (the default set in
3212                    // `handle_established_inbound/outbound_connection`).
3213                    // All other PeerKind changes are ignored.
3214                    tracing::debug!(
3215                        peer=%propagation_source,
3216                        peer_type=%kind,
3217                        "New peer type found for peer"
3218                    );
3219                    if let PeerKind::Floodsub = conn.kind {
3220                        conn.kind = kind;
3221                    }
3222                }
3223            }
3224            HandlerEvent::MessageDropped(rpc) => {
3225                // Account for this in the scoring logic
3226                if let PeerScoreState::Active(peer_score) = &mut self.peer_score {
3227                    peer_score.failed_message_slow_peer(&propagation_source);
3228                }
3229
3230                // Keep track of expired messages for the application layer.
3231                let failed_messages = self.failed_messages.entry(propagation_source).or_default();
3232                failed_messages.timeout += 1;
3233                match rpc {
3234                    RpcOut::Publish { .. } => {
3235                        failed_messages.publish += 1;
3236                    }
3237                    RpcOut::Forward { .. } => {
3238                        failed_messages.forward += 1;
3239                    }
3240                    _ => {}
3241                }
3242
3243                // Record metrics on the failure.
3244                #[cfg(feature = "metrics")]
3245                if let Some(metrics) = self.metrics.as_mut() {
3246                    match rpc {
3247                        RpcOut::Publish { message, .. } => {
3248                            metrics.publish_msg_dropped(&message.topic);
3249                            metrics.timeout_msg_dropped(&message.topic);
3250                        }
3251                        RpcOut::Forward { message, .. } => {
3252                            metrics.forward_msg_dropped(&message.topic);
3253                            metrics.timeout_msg_dropped(&message.topic);
3254                        }
3255                        _ => {}
3256                    }
3257                }
3258            }
3259            HandlerEvent::Message {
3260                rpc,
3261                invalid_messages,
3262            } => {
3263                // Handle the gossipsub RPC
3264
3265                // Handle subscriptions
3266                // Update connected peers topics
3267                if !rpc.subscriptions.is_empty() {
3268                    self.handle_received_subscriptions(&rpc.subscriptions, &propagation_source);
3269                }
3270
3271                // Check if peer is graylisted in which case we ignore the event
3272                if let (true, _) = self
3273                    .peer_score
3274                    .below_threshold(&propagation_source, |pst| pst.graylist_threshold)
3275                {
3276                    tracing::debug!(peer=%propagation_source, "RPC Dropped from graylisted peer");
3277                    return;
3278                }
3279
3280                // Handle any invalid messages from this peer
3281                if let PeerScoreState::Active(_) = self.peer_score {
3282                    for (raw_message, validation_error) in invalid_messages {
3283                        self.handle_invalid_message(
3284                            &propagation_source,
3285                            &raw_message.topic,
3286                            None,
3287                            RejectReason::ValidationError(validation_error),
3288                        )
3289                    }
3290                } else {
3291                    // log the invalid messages
3292                    for (message, validation_error) in invalid_messages {
3293                        tracing::warn!(
3294                            peer=%propagation_source,
3295                            source=?message.source,
3296                            "Invalid message from peer. Reason: {:?}",
3297                            validation_error,
3298                        );
3299                    }
3300                }
3301
3302                // Handle messages
3303                for (count, raw_message) in rpc.messages.into_iter().enumerate() {
3304                    // Only process the amount of messages the configuration allows.
3305                    if self
3306                        .config
3307                        .max_messages_per_rpc()
3308                        .is_some_and(|max_msg| count >= max_msg)
3309                    {
3310                        tracing::warn!("Received more messages than permitted. Ignoring further messages. Processed: {}", count);
3311                        break;
3312                    }
3313                    self.handle_received_message(raw_message, &propagation_source);
3314                }
3315
3316                // Handle control messages
3317                // Group some control messages together; this minimises the number of send
3318                // events (the code below is simplified to handle each action type separately).
3319                let mut ihave_msgs = vec![];
3320                let mut graft_msgs = vec![];
3321                let mut prune_msgs = vec![];
3322                for (count, control_msg) in rpc.control_msgs.into_iter().enumerate() {
3323                    // Only process the amount of messages the configuration allows.
3324                    if self
3325                        .config
3326                        .max_messages_per_rpc()
3327                        .is_some_and(|max_msg| count >= max_msg)
3328                    {
3329                        tracing::warn!("Received more control messages than permitted. Ignoring further messages. Processed: {}", count);
3330                        break;
3331                    }
3332
3333                    match control_msg {
3334                        ControlAction::IHave(IHave {
3335                            topic_hash,
3336                            message_ids,
3337                        }) => {
3338                            ihave_msgs.push((topic_hash, message_ids));
3339                        }
3340                        ControlAction::IWant(IWant { message_ids }) => {
3341                            self.handle_iwant(&propagation_source, message_ids)
3342                        }
3343                        ControlAction::Graft(Graft { topic_hash }) => graft_msgs.push(topic_hash),
3344                        ControlAction::Prune(Prune {
3345                            topic_hash,
3346                            peers,
3347                            backoff,
3348                        }) => prune_msgs.push((topic_hash, peers, backoff)),
3349                        ControlAction::IDontWant(IDontWant { message_ids }) => {
3350                            let Some(peer) = self.connected_peers.get_mut(&propagation_source)
3351                            else {
3352                                tracing::error!(peer = %propagation_source,
3353                                    "Could not handle IDONTWANT, peer doesn't exist in connected peer list");
3354                                continue;
3355                            };
3356                            #[cfg(feature = "metrics")]
3357                            if let Some(metrics) = self.metrics.as_mut() {
3358                                metrics.register_idontwant(message_ids.len());
3359                            }
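                            // Record the ids this peer no longer wants. The LinkedHashMap keeps
                            // insertion order, so the oldest entries are evicted first when
                            // IDONTWANT_CAP is exceeded (and during the heartbeat once they are
                            // older than IDONTWANT_TIMEOUT).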
3360                            for message_id in message_ids {
3361                                peer.dont_send.insert(message_id, Instant::now());
3362                                // Don't exceed capacity.
3363                                if peer.dont_send.len() > IDONTWANT_CAP {
3364                                    peer.dont_send.pop_front();
3365                                }
3366                            }
3367                        }
3368                    }
3369                }
3370                if !ihave_msgs.is_empty() {
3371                    self.handle_ihave(&propagation_source, ihave_msgs);
3372                }
3373                if !graft_msgs.is_empty() {
3374                    self.handle_graft(&propagation_source, graft_msgs);
3375                }
3376                if !prune_msgs.is_empty() {
3377                    self.handle_prune(&propagation_source, prune_msgs);
3378                }
3379            }
3380        }
3381    }
3382
3383    #[tracing::instrument(level = "trace", name = "NetworkBehaviour::poll", skip(self, cx))]
3384    fn poll(
3385        &mut self,
3386        cx: &mut Context<'_>,
3387    ) -> Poll<ToSwarm<Self::ToSwarm, THandlerInEvent<Self>>> {
3388        if let Some(event) = self.events.pop_front() {
3389            return Poll::Ready(event);
3390        }
3391
3392        // update scores
3393        if let PeerScoreState::Active(peer_score) = &mut self.peer_score {
3394            if peer_score.decay_interval.poll_unpin(cx).is_ready() {
3395                peer_score.refresh_scores();
3396                peer_score
3397                    .decay_interval
3398                    .reset(peer_score.params.decay_interval);
3399            }
3400        }
3401
3402        if self.heartbeat.poll_unpin(cx).is_ready() {
3403            self.heartbeat();
3404            self.heartbeat.reset(self.config.heartbeat_interval());
3405        }
3406
3407        Poll::Pending
3408    }
3409
3410    fn on_swarm_event(&mut self, event: FromSwarm) {
3411        match event {
3412            FromSwarm::ConnectionEstablished(connection_established) => {
3413                self.on_connection_established(connection_established)
3414            }
3415            FromSwarm::ConnectionClosed(connection_closed) => {
3416                self.on_connection_closed(connection_closed)
3417            }
3418            FromSwarm::AddressChange(address_change) => self.on_address_change(address_change),
3419            _ => {}
3420        }
3421    }
3422}
3423
3424/// This is called when peers are added to any mesh. It checks if the peer existed
3425/// in any other mesh. If this is the first mesh they have joined, it queues a message to notify
3426/// the appropriate connection handler to maintain a connection.
3427fn peer_added_to_mesh(
3428    peer_id: PeerId,
3429    new_topics: Vec<&TopicHash>,
3430    mesh: &HashMap<TopicHash, BTreeSet<PeerId>>,
3431    events: &mut VecDeque<ToSwarm<Event, HandlerIn>>,
3432    connections: &HashMap<PeerId, PeerDetails>,
3433) {
3434    // Ensure there is an active connection
3435    let connection_id = match connections.get(&peer_id) {
3436        Some(p) => p
3437            .connections
3438            .first()
3439            .expect("There should be at least one connection to a peer."),
3440        None => {
3441            tracing::error!(peer_id=%peer_id, "Peer not existent when added to the mesh");
3442            return;
3443        }
3444    };
3445
3446    if let Some(peer) = connections.get(&peer_id) {
3447        for topic in &peer.topics {
3448            if !new_topics.contains(&topic) {
3449                if let Some(mesh_peers) = mesh.get(topic) {
3450                    if mesh_peers.contains(&peer_id) {
3451                        // the peer is already in a mesh for another topic
3452                        return;
3453                    }
3454                }
3455            }
3456        }
3457    }
3458    // This is the first mesh the peer has joined, inform the handler
3459    events.push_back(ToSwarm::NotifyHandler {
3460        peer_id,
3461        event: HandlerIn::JoinedMesh,
3462        handler: NotifyHandler::One(*connection_id),
3463    });
3464}
3465
3466/// This is called when peers are removed from a mesh. It checks if the peer exists
3467/// in any other mesh. If this was the last mesh they were part of, it queues a message to
3468/// notify the handler to no longer maintain a connection.
3469fn peer_removed_from_mesh(
3470    peer_id: PeerId,
3471    old_topic: &TopicHash,
3472    mesh: &HashMap<TopicHash, BTreeSet<PeerId>>,
3473    events: &mut VecDeque<ToSwarm<Event, HandlerIn>>,
3474    connections: &HashMap<PeerId, PeerDetails>,
3475) {
3476    // Ensure there is an active connection
3477    let connection_id = match connections.get(&peer_id) {
3478        Some(p) => p
3479            .connections
3480            .first()
3481            .expect("There should be at least one connection to a peer."),
3482        None => {
3483            tracing::error!(peer_id=%peer_id, "Peer not existent when removed from mesh");
3484            return;
3485        }
3486    };
3487
3488    if let Some(peer) = connections.get(&peer_id) {
3489        for topic in &peer.topics {
3490            if topic != old_topic {
3491                if let Some(mesh_peers) = mesh.get(topic) {
3492                    if mesh_peers.contains(&peer_id) {
3493                        // the peer exists in another mesh still
3494                        return;
3495                    }
3496                }
3497            }
3498        }
3499    }
3500    // The peer is not in any other mesh, inform the handler
3501    events.push_back(ToSwarm::NotifyHandler {
3502        peer_id,
3503        event: HandlerIn::LeftMesh,
3504        handler: NotifyHandler::One(*connection_id),
3505    });
3506}
3507
3508/// Helper function to get a subset of random gossipsub peers for a `topic_hash`
3509/// filtered by the function `f`. The number of peers to get equals the output of `n_map`
3510/// that gets as input the number of filtered peers.
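/// Only peers speaking a gossipsub protocol are considered. If the filtered set contains no
/// more than `n_map(..)` peers, all of them are returned without shuffling.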
3511fn get_random_peers_dynamic(
3512    connected_peers: &HashMap<PeerId, PeerDetails>,
3513    topic_hash: &TopicHash,
3514    // maps the number of total peers to the number of selected peers
3515    n_map: impl Fn(usize) -> usize,
3516    mut f: impl FnMut(&PeerId) -> bool,
3517) -> BTreeSet<PeerId> {
3518    let mut gossip_peers = connected_peers
3519        .iter()
3520        .filter(|(_, p)| p.topics.contains(topic_hash))
3521        .filter(|(peer_id, _)| f(peer_id))
3522        .filter(|(_, p)| p.kind.is_gossipsub())
3523        .map(|(peer_id, _)| *peer_id)
3524        .collect::<Vec<PeerId>>();
3525
3526    // If we have no more peers than needed, return them all.
3527    let n = n_map(gossip_peers.len());
3528    if gossip_peers.len() <= n {
3529        tracing::debug!("RANDOM PEERS: Got {:?} peers", gossip_peers.len());
3530        return gossip_peers.into_iter().collect();
3531    }
3532
3533    // we have more peers than needed, shuffle them and return n of them
3534    let mut rng = thread_rng();
3535    gossip_peers.partial_shuffle(&mut rng, n);
3536
3537    tracing::debug!("RANDOM PEERS: Got {:?} peers", n);
3538
3539    gossip_peers.into_iter().take(n).collect()
3540}
3541
3542/// Helper function to get a set of `n` random gossipsub peers for a `topic_hash`
3543/// filtered by the function `f`.
3544fn get_random_peers(
3545    connected_peers: &HashMap<PeerId, PeerDetails>,
3546    topic_hash: &TopicHash,
3547    n: usize,
3548    f: impl FnMut(&PeerId) -> bool,
3549) -> BTreeSet<PeerId> {
3550    get_random_peers_dynamic(connected_peers, topic_hash, |_| n, f)
3551}
3552
3553/// Validates the combination of signing, privacy and message validation to ensure the
3554/// configuration will not reject published messages.
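///
/// In particular, `ValidationMode::Anonymous` requires a non-signing, anonymous
/// `MessageAuthenticity`, while `ValidationMode::Strict` requires a signing one; the remaining
/// validation modes accept any authenticity setting.
///
/// A minimal sketch of the accepted and rejected combinations (illustrative only):
///
/// ```ignore
/// use libp2p_identity::Keypair;
///
/// // Signing authenticity with strict validation is accepted.
/// let keypair = Keypair::generate_ed25519();
/// assert!(validate_config(&MessageAuthenticity::Signed(keypair), &ValidationMode::Strict).is_ok());
///
/// // Anonymous publishing with strict validation is rejected.
/// assert!(validate_config(&MessageAuthenticity::Anonymous, &ValidationMode::Strict).is_err());
/// ```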
3555fn validate_config(
3556    authenticity: &MessageAuthenticity,
3557    validation_mode: &ValidationMode,
3558) -> Result<(), &'static str> {
3559    match validation_mode {
3560        ValidationMode::Anonymous => {
3561            if authenticity.is_signing() {
3562                return Err("Cannot enable message signing with an Anonymous validation mode. Consider changing either the ValidationMode or MessageAuthenticity");
3563            }
3564
3565            if !authenticity.is_anonymous() {
3566                return Err("Published messages contain an author but incoming messages with an author will be rejected. Consider adjusting the validation or privacy settings in the config");
3567            }
3568        }
3569        ValidationMode::Strict => {
3570            if !authenticity.is_signing() {
3571                return Err(
3572                    "Messages will be published unsigned and incoming unsigned messages will \
3573                     be rejected. Consider adjusting the validation or privacy settings in \
3574                     the config"
3575                );
3576            }
3577        }
3578        _ => {}
3579    }
3580    Ok(())
3581}
3582
3583impl<C: DataTransform, F: TopicSubscriptionFilter> fmt::Debug for Behaviour<C, F> {
3584    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
3585        f.debug_struct("Behaviour")
3586            .field("config", &self.config)
3587            .field("events", &self.events.len())
3588            .field("publish_config", &self.publish_config)
3589            .field("mesh", &self.mesh)
3590            .field("fanout", &self.fanout)
3591            .field("fanout_last_pub", &self.fanout_last_pub)
3592            .field("mcache", &self.mcache)
3593            .field("heartbeat", &self.heartbeat)
3594            .finish()
3595    }
3596}
3597
3598impl fmt::Debug for PublishConfig {
3599    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
3600        match self {
3601            PublishConfig::Signing { author, .. } => {
3602                f.write_fmt(format_args!("PublishConfig::Signing({author})"))
3603            }
3604            PublishConfig::Author(author) => {
3605                f.write_fmt(format_args!("PublishConfig::Author({author})"))
3606            }
3607            PublishConfig::RandomAuthor => f.write_fmt(format_args!("PublishConfig::RandomAuthor")),
3608            PublishConfig::Anonymous => f.write_fmt(format_args!("PublishConfig::Anonymous")),
3609        }
3610    }
3611}