libp2p_gossipsub/
behaviour.rs

1// Copyright 2020 Sigma Prime Pty Ltd.
2//
3// Permission is hereby granted, free of charge, to any person obtaining a
4// copy of this software and associated documentation files (the "Software"),
5// to deal in the Software without restriction, including without limitation
6// the rights to use, copy, modify, merge, publish, distribute, sublicense,
7// and/or sell copies of the Software, and to permit persons to whom the
8// Software is furnished to do so, subject to the following conditions:
9//
10// The above copyright notice and this permission notice shall be included in
11// all copies or substantial portions of the Software.
12//
13// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
14// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
18// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
19// DEALINGS IN THE SOFTWARE.
20
21use std::{
22    cmp::{
23        max,
24        Ordering::{self, Equal},
25    },
26    collections::{BTreeSet, HashMap, HashSet, VecDeque},
27    fmt::{self, Debug},
28    net::IpAddr,
29    task::{Context, Poll},
30    time::Duration,
31};
32
33use futures::FutureExt;
34use futures_timer::Delay;
35use hashlink::LinkedHashMap;
36use libp2p_core::{
37    multiaddr::Protocol::{Ip4, Ip6},
38    transport::PortUse,
39    Endpoint, Multiaddr,
40};
41use libp2p_identity::{Keypair, PeerId};
42use libp2p_swarm::{
43    behaviour::{AddressChange, ConnectionClosed, ConnectionEstablished, FromSwarm},
44    dial_opts::DialOpts,
45    ConnectionDenied, ConnectionId, NetworkBehaviour, NotifyHandler, THandler, THandlerInEvent,
46    THandlerOutEvent, ToSwarm,
47};
48#[cfg(feature = "metrics")]
49use prometheus_client::registry::Registry;
50use quick_protobuf::{MessageWrite, Writer};
51use rand::{seq::SliceRandom, thread_rng};
52use web_time::{Instant, SystemTime};
53
54#[cfg(feature = "metrics")]
55use crate::metrics::{Churn, Config as MetricsConfig, Inclusion, Metrics, Penalty};
56use crate::{
57    backoff::BackoffStorage,
58    config::{Config, ValidationMode},
59    gossip_promises::GossipPromises,
60    handler::{Handler, HandlerEvent, HandlerIn},
61    mcache::MessageCache,
62    peer_score::{PeerScore, PeerScoreParams, PeerScoreState, PeerScoreThresholds, RejectReason},
63    protocol::SIGNING_PREFIX,
64    rpc::Sender,
65    rpc_proto::proto,
66    subscription_filter::{AllowAllSubscriptionFilter, TopicSubscriptionFilter},
67    time_cache::DuplicateCache,
68    topic::{Hasher, Topic, TopicHash},
69    transform::{DataTransform, IdentityTransform},
70    types::{
71        ControlAction, Graft, IDontWant, IHave, IWant, Message, MessageAcceptance, MessageId,
72        PeerDetails, PeerInfo, PeerKind, Prune, RawMessage, RpcOut, Subscription,
73        SubscriptionAction,
74    },
75    FailedMessages, PublishError, SubscriptionError, TopicScoreParams, ValidationError,
76};
77
78#[cfg(test)]
79mod tests;
80
/// Capacity of the IDONTWANT cache.
const IDONTWANT_CAP: usize = 10_000;

/// Time after which an IDONTWANT entry is removed (three seconds).
const IDONTWANT_TIMEOUT: Duration = Duration::from_secs(3);
86
/// Determines if published messages should be signed or not.
///
/// Without signing, a number of privacy preserving modes can be selected.
///
/// NOTE: The default validation settings are to require signatures. The [`ValidationMode`]
/// should be updated in the [`Config`] to allow for unsigned messages.
#[derive(Clone)]
pub enum MessageAuthenticity {
    /// Message signing is enabled. The author will be the owner of the key and the sequence number
    /// will be linearly increasing.
    Signed(Keypair),
    /// Message signing is disabled.
    ///
    /// The specified [`PeerId`] will be used as the author of all published messages. The sequence
    /// number will be randomized.
    Author(PeerId),
    /// Message signing is disabled.
    ///
    /// A random [`PeerId`] will be used when publishing each message. The sequence number will be
    /// randomized.
    RandomAuthor,
    /// Message signing is disabled.
    ///
    /// The author of the message and the sequence numbers are excluded from the message.
    ///
    /// NOTE: Excluding these fields may cause other nodes that enforce validation of these
    /// fields to treat the messages as invalid. See [`ValidationMode`] in the [`Config`]
    /// for how to customise this for rust-libp2p gossipsub. A custom `message_id`
    /// function will need to be set to prevent all messages from a peer being filtered
    /// as duplicates.
    Anonymous,
}
119
120impl MessageAuthenticity {
121    /// Returns true if signing is enabled.
122    pub fn is_signing(&self) -> bool {
123        matches!(self, MessageAuthenticity::Signed(_))
124    }
125
126    pub fn is_anonymous(&self) -> bool {
127        matches!(self, MessageAuthenticity::Anonymous)
128    }
129}
130
/// Event that can be emitted by the gossipsub behaviour.
#[derive(Debug)]
pub enum Event {
    /// A message has been received.
    Message {
        /// The peer that forwarded us this message.
        propagation_source: PeerId,
        /// The [`MessageId`] of the message. This should be referenced by the application when
        /// validating a message (if required).
        message_id: MessageId,
        /// The decompressed message itself.
        message: Message,
    },
    /// A remote subscribed to a topic.
    Subscribed {
        /// Remote that has subscribed.
        peer_id: PeerId,
        /// The topic it has subscribed to.
        topic: TopicHash,
    },
    /// A remote unsubscribed from a topic.
    Unsubscribed {
        /// Remote that has unsubscribed.
        peer_id: PeerId,
        /// The topic it has unsubscribed from.
        topic: TopicHash,
    },
    /// A peer that does not support gossipsub has connected.
    GossipsubNotSupported { peer_id: PeerId },
    /// A peer is not able to download messages in time.
    SlowPeer {
        /// The slow peer.
        peer_id: PeerId,
        /// The types and amounts of failed messages that are occurring for this peer.
        failed_messages: FailedMessages,
    },
}
168
/// A data structure for storing configuration for publishing messages. See [`MessageAuthenticity`]
/// for further details.
///
/// Built from a [`MessageAuthenticity`] via the `From` implementation.
#[allow(clippy::large_enum_variant)]
enum PublishConfig {
    /// Messages are signed.
    Signing {
        /// Key pair the configuration was created from.
        keypair: Keypair,
        /// Peer id derived from the public key of `keypair`.
        author: PeerId,
        /// Protobuf encoding of the public key; `None` when the encoding is at most 42 bytes
        /// long.
        inline_key: Option<Vec<u8>>,
        /// Source of the sequence numbers.
        last_seq_no: SequenceNumber,
    },
    /// Counterpart of [`MessageAuthenticity::Author`]; holds the fixed author.
    Author(PeerId),
    /// Counterpart of [`MessageAuthenticity::RandomAuthor`].
    RandomAuthor,
    /// Counterpart of [`MessageAuthenticity::Anonymous`].
    Anonymous,
}
183
/// A strictly linearly increasing sequence number.
///
/// We start from the current time as unix timestamp in nanoseconds.
#[derive(Debug)]
struct SequenceNumber(u64);

impl SequenceNumber {
    /// Creates a sequence number seeded with the number of nanoseconds elapsed since the unix
    /// epoch.
    ///
    /// # Panics
    ///
    /// Panics if the system clock reports a time before the unix epoch.
    fn new() -> Self {
        let unix_nanos = SystemTime::now()
            .duration_since(SystemTime::UNIX_EPOCH)
            .expect("time to be linear")
            .as_nanos();

        // `as_nanos` returns a `u128`; the cast keeps the low 64 bits, which represent
        // nanosecond timestamps without loss until roughly the year 2554.
        Self(unix_nanos as u64)
    }

    /// Advances the sequence number by one and returns the new value.
    ///
    /// # Panics
    ///
    /// Panics if incrementing would overflow `u64`.
    fn next(&mut self) -> u64 {
        self.0 = self
            .0
            .checked_add(1)
            .expect("to not exhaust u64 space for sequence numbers");

        self.0
    }
}
209
210impl PublishConfig {
211    pub(crate) fn get_own_id(&self) -> Option<&PeerId> {
212        match self {
213            Self::Signing { author, .. } => Some(author),
214            Self::Author(author) => Some(author),
215            _ => None,
216        }
217    }
218}
219
220impl From<MessageAuthenticity> for PublishConfig {
221    fn from(authenticity: MessageAuthenticity) -> Self {
222        match authenticity {
223            MessageAuthenticity::Signed(keypair) => {
224                let public_key = keypair.public();
225                let key_enc = public_key.encode_protobuf();
226                let key = if key_enc.len() <= 42 {
227                    // The public key can be inlined in [`rpc_proto::proto::::Message::from`], so we
228                    // don't include it specifically in the
229                    // [`rpc_proto::proto::Message::key`] field.
230                    None
231                } else {
232                    // Include the protobuf encoding of the public key in the message.
233                    Some(key_enc)
234                };
235
236                PublishConfig::Signing {
237                    keypair,
238                    author: public_key.to_peer_id(),
239                    inline_key: key,
240                    last_seq_no: SequenceNumber::new(),
241                }
242            }
243            MessageAuthenticity::Author(peer_id) => PublishConfig::Author(peer_id),
244            MessageAuthenticity::RandomAuthor => PublishConfig::RandomAuthor,
245            MessageAuthenticity::Anonymous => PublishConfig::Anonymous,
246        }
247    }
248}
249
/// Network behaviour that handles the gossipsub protocol.
///
/// NOTE: Initialisation requires a [`MessageAuthenticity`] and [`Config`] instance. If
/// message signing is disabled, the [`ValidationMode`] in the config should be adjusted to an
/// appropriate level to accept unsigned messages.
///
/// The DataTransform trait allows applications to optionally add extra encoding/decoding
/// functionality to the underlying messages. This is intended for custom compression algorithms.
///
/// The TopicSubscriptionFilter allows applications to implement specific filters on topics to
/// prevent unwanted messages being propagated and evaluated.
pub struct Behaviour<D = IdentityTransform, F = AllowAllSubscriptionFilter> {
    /// Configuration providing gossipsub performance parameters.
    config: Config,

    /// Events that need to be yielded to the outside when polling.
    events: VecDeque<ToSwarm<Event, HandlerIn>>,

    /// Information used for publishing messages.
    publish_config: PublishConfig,

    /// An LRU Time cache for storing seen messages (based on their ID). This cache prevents
    /// duplicates from being propagated to the application and on the network.
    duplicate_cache: DuplicateCache<MessageId>,

    /// A set of connected peers, indexed by their [`PeerId`] tracking both the [`PeerKind`] and
    /// the set of [`ConnectionId`]s.
    connected_peers: HashMap<PeerId, PeerDetails>,

    /// A set of all explicit peers. These are peers that remain connected and we unconditionally
    /// forward messages to, outside of the scoring system.
    explicit_peers: HashSet<PeerId>,

    /// A list of peers that have been blacklisted by the user.
    /// Messages are not sent to and are rejected from these peers.
    blacklisted_peers: HashSet<PeerId>,

    /// Overlay network of connected peers - Maps topics to connected gossipsub peers.
    /// A topic being present as a key means we are subscribed to it.
    mesh: HashMap<TopicHash, BTreeSet<PeerId>>,

    /// Map of topics to list of peers that we publish to, but don't subscribe to.
    fanout: HashMap<TopicHash, BTreeSet<PeerId>>,

    /// The last publish time for fanout topics.
    fanout_last_pub: HashMap<TopicHash, Instant>,

    /// Storage for backoffs
    backoffs: BackoffStorage,

    /// Message cache for the last few heartbeats.
    mcache: MessageCache,

    /// Heartbeat interval stream.
    heartbeat: Delay,

    /// Number of heartbeats since the beginning of time; this allows us to amortize some resource
    /// clean up -- eg backoff clean up.
    heartbeat_ticks: u64,

    /// We remember all peers we found through peer exchange, since those peers are not considered
    /// as safe as randomly discovered outbound peers. This behaviour diverges from the go
    /// implementation to avoid possible love bombing attacks in PX. When disconnecting peers will
    /// be removed from this list which may result in a true outbound rediscovery.
    px_peers: HashSet<PeerId>,

    /// Stores optional peer score data together with thresholds, decay interval and gossip
    /// promises.
    peer_score: PeerScoreState,

    /// Counts the number of `IHAVE` received from each peer since the last heartbeat.
    count_received_ihave: HashMap<PeerId, usize>,

    /// Counts the number of `IWANT` that we sent to each peer since the last heartbeat.
    count_sent_iwant: HashMap<PeerId, usize>,

    /// The filter used to handle message subscriptions.
    subscription_filter: F,

    /// A general transformation function that can be applied to data received from the wire before
    /// calculating the message-id and sending to the application. This is designed to allow the
    /// user to implement arbitrary topic-based compression algorithms.
    data_transform: D,

    /// Keep track of a set of internal metrics relating to gossipsub.
    /// `None` until metrics are enabled through `with_metrics`.
    #[cfg(feature = "metrics")]
    metrics: Option<Metrics>,

    /// Tracks the numbers of failed messages per peer-id.
    failed_messages: HashMap<PeerId, FailedMessages>,

    /// Tracks recently sent `IWANT` messages and checks if peers respond to them.
    gossip_promises: GossipPromises,
}
343
344impl<D, F> Behaviour<D, F>
345where
346    D: DataTransform + Default,
347    F: TopicSubscriptionFilter + Default,
348{
349    /// Creates a Gossipsub [`Behaviour`] struct given a set of parameters specified via a
350    /// [`Config`]. This has no subscription filter and uses no compression.
351    pub fn new(privacy: MessageAuthenticity, config: Config) -> Result<Self, &'static str> {
352        Self::new_with_subscription_filter_and_transform(
353            privacy,
354            config,
355            F::default(),
356            D::default(),
357        )
358    }
359}
360
361impl<D, F> Behaviour<D, F>
362where
363    D: DataTransform + Default,
364    F: TopicSubscriptionFilter,
365{
366    /// Creates a Gossipsub [`Behaviour`] struct given a set of parameters specified via a
367    /// [`Config`] and a custom subscription filter.
368    pub fn new_with_subscription_filter(
369        privacy: MessageAuthenticity,
370        config: Config,
371        subscription_filter: F,
372    ) -> Result<Self, &'static str> {
373        Self::new_with_subscription_filter_and_transform(
374            privacy,
375            config,
376            subscription_filter,
377            D::default(),
378        )
379    }
380}
381
382impl<D, F> Behaviour<D, F>
383where
384    D: DataTransform,
385    F: TopicSubscriptionFilter + Default,
386{
387    /// Creates a Gossipsub [`Behaviour`] struct given a set of parameters specified via a
388    /// [`Config`] and a custom data transform.
389    /// Metrics are disabled by default.
390    pub fn new_with_transform(
391        privacy: MessageAuthenticity,
392        config: Config,
393        data_transform: D,
394    ) -> Result<Self, &'static str> {
395        Self::new_with_subscription_filter_and_transform(
396            privacy,
397            config,
398            F::default(),
399            data_transform,
400        )
401    }
402}
403
404impl<D, F> Behaviour<D, F>
405where
406    D: DataTransform,
407    F: TopicSubscriptionFilter,
408{
409    /// Creates a Gossipsub [`Behaviour`] struct given a set of parameters specified via a
410    /// [`Config`] and a custom subscription filter and data transform.
411    /// Metrics are disabled by default.
412    pub fn new_with_subscription_filter_and_transform(
413        privacy: MessageAuthenticity,
414        config: Config,
415        subscription_filter: F,
416        data_transform: D,
417    ) -> Result<Self, &'static str> {
418        // Set up the router given the configuration settings.
419
420        // We do not allow configurations where a published message would also be rejected if it
421        // were received locally.
422        validate_config(&privacy, config.validation_mode())?;
423
424        Ok(Behaviour {
425            #[cfg(feature = "metrics")]
426            metrics: None,
427            events: VecDeque::new(),
428            publish_config: privacy.into(),
429            duplicate_cache: DuplicateCache::new(config.duplicate_cache_time()),
430            explicit_peers: HashSet::new(),
431            blacklisted_peers: HashSet::new(),
432            mesh: HashMap::new(),
433            fanout: HashMap::new(),
434            fanout_last_pub: HashMap::new(),
435            backoffs: BackoffStorage::new(
436                &config.prune_backoff(),
437                config.heartbeat_interval(),
438                config.backoff_slack(),
439            ),
440            mcache: MessageCache::new(config.history_gossip(), config.history_length()),
441            heartbeat: Delay::new(config.heartbeat_interval() + config.heartbeat_initial_delay()),
442            heartbeat_ticks: 0,
443            px_peers: HashSet::new(),
444            peer_score: PeerScoreState::Disabled,
445            count_received_ihave: HashMap::new(),
446            count_sent_iwant: HashMap::new(),
447            connected_peers: HashMap::new(),
448            config,
449            subscription_filter,
450            data_transform,
451            failed_messages: Default::default(),
452            gossip_promises: Default::default(),
453        })
454    }
455
456    /// Allow the [`Behaviour`] to also record metrics.
457    /// Metrics can be evaluated by passing a reference to a [`Registry`].
458    #[cfg(feature = "metrics")]
459    pub fn with_metrics(
460        mut self,
461        metrics_registry: &mut Registry,
462        metrics_config: MetricsConfig,
463    ) -> Self {
464        self.metrics = Some(Metrics::new(metrics_registry, metrics_config));
465        self
466    }
467}
468
469impl<D, F> Behaviour<D, F>
470where
471    D: DataTransform + Send + 'static,
472    F: TopicSubscriptionFilter + Send + 'static,
473{
474    /// Lists the hashes of the topics we are currently subscribed to.
475    pub fn topics(&self) -> impl Iterator<Item = &TopicHash> {
476        self.mesh.keys()
477    }
478
479    /// Lists all mesh peers for a certain topic hash.
480    pub fn mesh_peers(&self, topic_hash: &TopicHash) -> impl Iterator<Item = &PeerId> {
481        self.mesh.get(topic_hash).into_iter().flat_map(|x| x.iter())
482    }
483
484    pub fn all_mesh_peers(&self) -> impl Iterator<Item = &PeerId> {
485        let mut res = BTreeSet::new();
486        for peers in self.mesh.values() {
487            res.extend(peers);
488        }
489        res.into_iter()
490    }
491
492    /// Lists all known peers and their associated subscribed topics.
493    pub fn all_peers(&self) -> impl Iterator<Item = (&PeerId, Vec<&TopicHash>)> {
494        self.connected_peers
495            .iter()
496            .map(|(peer_id, peer)| (peer_id, peer.topics.iter().collect()))
497    }
498
499    /// Lists all known peers and their associated protocol.
500    pub fn peer_protocol(&self) -> impl Iterator<Item = (&PeerId, &PeerKind)> {
501        self.connected_peers.iter().map(|(k, v)| (k, &v.kind))
502    }
503
504    /// Returns the gossipsub score for a given peer, if one exists.
505    pub fn peer_score(&self, peer_id: &PeerId) -> Option<f64> {
506        match &self.peer_score {
507            PeerScoreState::Active(peer_score) => Some(peer_score.score_report(peer_id).score),
508            PeerScoreState::Disabled => None,
509        }
510    }
511
512    /// Subscribe to a topic.
513    ///
514    /// Returns [`Ok(true)`] if the subscription worked. Returns [`Ok(false)`] if we were already
515    /// subscribed.
516    pub fn subscribe<H: Hasher>(&mut self, topic: &Topic<H>) -> Result<bool, SubscriptionError> {
517        let topic_hash = topic.hash();
518        if !self.subscription_filter.can_subscribe(&topic_hash) {
519            return Err(SubscriptionError::NotAllowed);
520        }
521
522        if self.mesh.contains_key(&topic_hash) {
523            tracing::debug!(%topic, "Topic is already in the mesh");
524            return Ok(false);
525        }
526
527        // send subscription request to all peers
528        for peer_id in self.connected_peers.keys().copied().collect::<Vec<_>>() {
529            tracing::debug!(%peer_id, "Sending SUBSCRIBE to peer");
530            let event = RpcOut::Subscribe(topic_hash.clone());
531            self.send_message(peer_id, event);
532        }
533
534        // call JOIN(topic)
535        // this will add new peers to the mesh for the topic
536        self.join(&topic_hash);
537        tracing::debug!(%topic, "Subscribed to topic");
538        Ok(true)
539    }
540
541    /// Unsubscribes from a topic.
542    ///
543    /// Returns `true` if we were subscribed to this topic.
544    pub fn unsubscribe<H: Hasher>(&mut self, topic: &Topic<H>) -> bool {
545        let topic_hash = topic.hash();
546
547        if !self.mesh.contains_key(&topic_hash) {
548            tracing::debug!(topic=%topic_hash, "Already unsubscribed from topic");
549            // we are not subscribed
550            return false;
551        }
552
553        // announce to all peers
554        for peer in self.connected_peers.keys().copied().collect::<Vec<_>>() {
555            tracing::debug!(%peer, "Sending UNSUBSCRIBE to peer");
556            let event = RpcOut::Unsubscribe(topic_hash.clone());
557            self.send_message(peer, event);
558        }
559
560        // call LEAVE(topic)
561        // this will remove the topic from the mesh
562        self.leave(&topic_hash);
563
564        tracing::debug!(topic=%topic_hash, "Unsubscribed from topic");
565        true
566    }
567
568    /// Publishes a message with multiple topics to the network.
569    pub fn publish(
570        &mut self,
571        topic: impl Into<TopicHash>,
572        data: impl Into<Vec<u8>>,
573    ) -> Result<MessageId, PublishError> {
574        let data = data.into();
575        let topic = topic.into();
576
577        // Transform the data before building a raw_message.
578        let transformed_data = self
579            .data_transform
580            .outbound_transform(&topic.clone(), data.clone())?;
581
582        let max_transmit_size_for_topic = self
583            .config
584            .protocol_config()
585            .max_transmit_size_for_topic(&topic);
586
587        // check that the size doesn't exceed the max transmission size.
588        if transformed_data.len() > max_transmit_size_for_topic {
589            return Err(PublishError::MessageTooLarge);
590        }
591
592        let mesh_n = self.config.mesh_n_for_topic(&topic);
593        let raw_message = self.build_raw_message(topic, transformed_data)?;
594
595        // calculate the message id from the un-transformed data
596        let msg_id = self.config.message_id(&Message {
597            source: raw_message.source,
598            data, // the uncompressed form
599            sequence_number: raw_message.sequence_number,
600            topic: raw_message.topic.clone(),
601        });
602
603        // Check the if the message has been published before
604        if self.duplicate_cache.contains(&msg_id) {
605            // This message has already been seen. We don't re-publish messages that have already
606            // been published on the network.
607            tracing::warn!(
608                message_id=%msg_id,
609                "Not publishing a message that has already been published"
610            );
611            return Err(PublishError::Duplicate);
612        }
613
614        tracing::trace!(message_id=%msg_id, "Publishing message");
615
616        let topic_hash = raw_message.topic.clone();
617
618        let mut peers_on_topic = self
619            .connected_peers
620            .iter()
621            .filter(|(_, p)| p.topics.contains(&topic_hash))
622            .map(|(peer_id, _)| peer_id)
623            .peekable();
624
625        if peers_on_topic.peek().is_none() {
626            return Err(PublishError::NoPeersSubscribedToTopic);
627        }
628
629        let mut recipient_peers = HashSet::new();
630        if self.config.flood_publish() {
631            // Forward to all peers above score and all explicit peers
632            recipient_peers.extend(peers_on_topic.filter(|p| {
633                self.explicit_peers.contains(*p)
634                    || !self
635                        .peer_score
636                        .below_threshold(p, |ts| ts.publish_threshold)
637                        .0
638            }));
639        } else {
640            match self.mesh.get(&topic_hash) {
641                // Mesh peers
642                Some(mesh_peers) => {
643                    // We have a mesh set. We want to make sure to publish to at least `mesh_n`
644                    // peers (if possible).
645                    let needed_extra_peers = mesh_n.saturating_sub(mesh_peers.len());
646
647                    if needed_extra_peers > 0 {
648                        // We don't have `mesh_n` peers in our mesh, we will randomly select extras
649                        // and publish to them.
650
651                        // Get a random set of peers that are appropriate to send messages too.
652                        let peer_list = get_random_peers(
653                            &self.connected_peers,
654                            &topic_hash,
655                            needed_extra_peers,
656                            |peer| {
657                                !mesh_peers.contains(peer)
658                                    && !self.explicit_peers.contains(peer)
659                                    && !self
660                                        .peer_score
661                                        .below_threshold(peer, |ts| ts.publish_threshold)
662                                        .0
663                            },
664                        );
665                        recipient_peers.extend(peer_list);
666                    }
667
668                    recipient_peers.extend(mesh_peers);
669                }
670                // Gossipsub peers
671                None => {
672                    tracing::debug!(topic=%topic_hash, "Topic not in the mesh");
673                    // `fanout_peers` is always non-empty if it's `Some`.
674                    let fanout_peers = self
675                        .fanout
676                        .get(&topic_hash)
677                        .filter(|peers| !peers.is_empty());
678                    // If we have fanout peers add them to the map.
679                    if let Some(peers) = fanout_peers {
680                        for peer in peers {
681                            recipient_peers.insert(*peer);
682                        }
683                    } else {
684                        // We have no fanout peers, select mesh_n of them and add them to the fanout
685                        let new_peers =
686                            get_random_peers(&self.connected_peers, &topic_hash, mesh_n, {
687                                |p| {
688                                    !self.explicit_peers.contains(p)
689                                        && !self
690                                            .peer_score
691                                            .below_threshold(p, |ts| ts.publish_threshold)
692                                            .0
693                                }
694                            });
695                        // Add the new peers to the fanout and recipient peers
696                        self.fanout.insert(topic_hash.clone(), new_peers.clone());
697                        for peer in new_peers {
698                            tracing::debug!(%peer, "Peer added to fanout");
699                            recipient_peers.insert(peer);
700                        }
701                    }
702                    // We are publishing to fanout peers - update the time we published
703                    self.fanout_last_pub
704                        .insert(topic_hash.clone(), Instant::now());
705                }
706            }
707
708            // Explicit peers that are part of the topic
709            recipient_peers
710                .extend(peers_on_topic.filter(|peer_id| self.explicit_peers.contains(peer_id)));
711
712            // Floodsub peers
713            for (peer, connections) in &self.connected_peers {
714                if connections.kind == PeerKind::Floodsub
715                    && connections.topics.contains(&topic_hash)
716                    && !self
717                        .peer_score
718                        .below_threshold(peer, |ts| ts.publish_threshold)
719                        .0
720                {
721                    recipient_peers.insert(*peer);
722                }
723            }
724        }
725
726        // If the message isn't a duplicate and we have sent it to some peers add it to the
727        // duplicate cache and memcache.
728        self.duplicate_cache.insert(msg_id.clone());
729        self.mcache.put(&msg_id, raw_message.clone());
730
731        // Consider the message as delivered for gossip promises.
732        self.gossip_promises.message_delivered(&msg_id);
733
734        // Send to peers we know are subscribed to the topic.
735        let mut publish_failed = true;
736        for peer_id in recipient_peers.iter() {
737            tracing::trace!(peer=%peer_id, "Sending message to peer");
738            // If enabled, Send first an IDONTWANT so that if we are slower than forwarders
739            // publishing the original message we don't receive it back.
740            if raw_message.raw_protobuf_len() > self.config.idontwant_message_size_threshold()
741                && self.config.idontwant_on_publish()
742            {
743                self.send_message(
744                    *peer_id,
745                    RpcOut::IDontWant(IDontWant {
746                        message_ids: vec![msg_id.clone()],
747                    }),
748                );
749            }
750
751            if self.send_message(
752                *peer_id,
753                RpcOut::Publish {
754                    message: raw_message.clone(),
755                    timeout: Delay::new(self.config.publish_queue_duration()),
756                },
757            ) {
758                publish_failed = false
759            }
760        }
761
762        if recipient_peers.is_empty() {
763            return Err(PublishError::NoPeersSubscribedToTopic);
764        }
765
766        if publish_failed {
767            return Err(PublishError::AllQueuesFull(recipient_peers.len()));
768        }
769
770        tracing::debug!(message_id=%msg_id, "Published message");
771
772        #[cfg(feature = "metrics")]
773        if let Some(metrics) = self.metrics.as_mut() {
774            metrics.register_published_message(&topic_hash);
775        }
776
777        Ok(msg_id)
778    }
779
780    /// This function should be called when [`Config::validate_messages()`] is `true` after
781    /// the message got validated by the caller. Messages are stored in the ['Memcache'] and
782    /// validation is expected to be fast enough that the messages should still exist in the cache.
783    /// There are three possible validation outcomes and the outcome is given in acceptance.
784    ///
785    /// If acceptance = [`MessageAcceptance::Accept`] the message will get propagated to the
786    /// network. The `propagation_source` parameter indicates who the message was received by and
787    /// will not be forwarded back to that peer.
788    ///
789    /// If acceptance = [`MessageAcceptance::Reject`] the message will be deleted from the memcache
790    /// and the Pâ‚„ penalty will be applied to the `propagation_source`.
791    //
792    /// If acceptance = [`MessageAcceptance::Ignore`] the message will be deleted from the memcache
793    /// but no Pâ‚„ penalty will be applied.
794    ///
795    /// This function will return true if the message was found in the cache and false if was not
796    /// in the cache anymore.
797    ///
798    /// This should only be called once per message.
799    pub fn report_message_validation_result(
800        &mut self,
801        msg_id: &MessageId,
802        propagation_source: &PeerId,
803        acceptance: MessageAcceptance,
804    ) -> bool {
805        let reject_reason = match acceptance {
806            MessageAcceptance::Accept => {
807                let (raw_message, originating_peers) = match self.mcache.validate(msg_id) {
808                    Some((raw_message, originating_peers)) => {
809                        (raw_message.clone(), originating_peers)
810                    }
811                    None => {
812                        tracing::warn!(
813                            message_id=%msg_id,
814                            "Message not in cache. Ignoring forwarding"
815                        );
816                        #[cfg(feature = "metrics")]
817                        if let Some(metrics) = self.metrics.as_mut() {
818                            metrics.memcache_miss();
819                        }
820                        return false;
821                    }
822                };
823
824                #[cfg(feature = "metrics")]
825                if let Some(metrics) = self.metrics.as_mut() {
826                    metrics.register_msg_validation(&raw_message.topic, &acceptance);
827                }
828
829                self.forward_msg(
830                    msg_id,
831                    raw_message,
832                    Some(propagation_source),
833                    originating_peers,
834                );
835                return true;
836            }
837            MessageAcceptance::Reject => RejectReason::ValidationFailed,
838            MessageAcceptance::Ignore => RejectReason::ValidationIgnored,
839        };
840
841        if let Some((raw_message, originating_peers)) = self.mcache.remove(msg_id) {
842            #[cfg(feature = "metrics")]
843            if let Some(metrics) = self.metrics.as_mut() {
844                metrics.register_msg_validation(&raw_message.topic, &acceptance);
845            }
846
847            // Tell peer_score about reject
848            // Reject the original source, and any duplicates we've seen from other peers.
849            if let PeerScoreState::Active(peer_score) = &mut self.peer_score {
850                peer_score.reject_message(
851                    propagation_source,
852                    msg_id,
853                    &raw_message.topic,
854                    reject_reason,
855                );
856                for peer in originating_peers.iter() {
857                    peer_score.reject_message(peer, msg_id, &raw_message.topic, reject_reason);
858                }
859            }
860            true
861        } else {
862            tracing::warn!(message_id=%msg_id, "Rejected message not in cache");
863            false
864        }
865    }
866
867    /// Adds a new peer to the list of explicitly connected peers.
868    pub fn add_explicit_peer(&mut self, peer_id: &PeerId) {
869        tracing::debug!(peer=%peer_id, "Adding explicit peer");
870
871        self.explicit_peers.insert(*peer_id);
872
873        self.check_explicit_peer_connection(peer_id);
874    }
875
876    /// This removes the peer from explicitly connected peers, note that this does not disconnect
877    /// the peer.
878    pub fn remove_explicit_peer(&mut self, peer_id: &PeerId) {
879        tracing::debug!(peer=%peer_id, "Removing explicit peer");
880        self.explicit_peers.remove(peer_id);
881    }
882
883    /// Blacklists a peer. All messages from this peer will be rejected and any message that was
884    /// created by this peer will be rejected.
885    pub fn blacklist_peer(&mut self, peer_id: &PeerId) {
886        if self.blacklisted_peers.insert(*peer_id) {
887            tracing::debug!(peer=%peer_id, "Peer has been blacklisted");
888        }
889    }
890
891    /// Removes a peer from the blacklist if it has previously been blacklisted.
892    pub fn remove_blacklisted_peer(&mut self, peer_id: &PeerId) {
893        if self.blacklisted_peers.remove(peer_id) {
894            tracing::debug!(peer=%peer_id, "Peer has been removed from the blacklist");
895        }
896    }
897
898    /// Activates the peer scoring system with the given parameters. This will reset all scores
899    /// if there was already another peer scoring system activated. Returns an error if the
900    /// params are not valid or if they got already set.
901    pub fn with_peer_score(
902        &mut self,
903        params: PeerScoreParams,
904        threshold: PeerScoreThresholds,
905    ) -> Result<(), String> {
906        self.with_peer_score_and_message_delivery_time_callback(params, threshold, None)
907    }
908
909    /// Activates the peer scoring system with the given parameters and a message delivery time
910    /// callback. Returns an error if the parameters got already set.
911    pub fn with_peer_score_and_message_delivery_time_callback(
912        &mut self,
913        params: PeerScoreParams,
914        thresholds: PeerScoreThresholds,
915        callback: Option<fn(&PeerId, &TopicHash, f64)>,
916    ) -> Result<(), String> {
917        params.validate()?;
918        thresholds.validate()?;
919
920        if let PeerScoreState::Active(_) = self.peer_score {
921            return Err("Peer score set twice".into());
922        }
923
924        let peer_score =
925            PeerScore::new_with_message_delivery_time_callback(params, thresholds, callback);
926        self.peer_score = PeerScoreState::Active(Box::new(peer_score));
927        Ok(())
928    }
929
930    /// Sets scoring parameters for a topic.
931    ///
932    /// The [`Self::with_peer_score()`] must first be called to initialise peer scoring.
933    pub fn set_topic_params<H: Hasher>(
934        &mut self,
935        topic: Topic<H>,
936        params: TopicScoreParams,
937    ) -> Result<(), &'static str> {
938        if let PeerScoreState::Active(peer_score) = &mut self.peer_score {
939            peer_score.set_topic_params(topic.hash(), params);
940            Ok(())
941        } else {
942            Err("Peer score must be initialised with `with_peer_score()`")
943        }
944    }
945
946    /// Returns a scoring parameters for a topic if existent.
947    pub fn get_topic_params<H: Hasher>(&self, topic: &Topic<H>) -> Option<&TopicScoreParams> {
948        match &self.peer_score {
949            PeerScoreState::Active(peer_score) => peer_score.get_topic_params(&topic.hash()),
950            PeerScoreState::Disabled => None,
951        }
952    }
953
954    /// Sets the application specific score for a peer. Returns true if scoring is active and
955    /// the peer is connected or if the score of the peer is not yet expired, false otherwise.
956    pub fn set_application_score(&mut self, peer_id: &PeerId, new_score: f64) -> bool {
957        if let PeerScoreState::Active(peer_score) = &mut self.peer_score {
958            peer_score.set_application_score(peer_id, new_score)
959        } else {
960            false
961        }
962    }
963
    /// Gossipsub JOIN(topic) - adds topic peers to mesh and sends them GRAFT messages.
    ///
    /// Peers are taken first from the topic's fanout entry (which is removed), then topped up
    /// with randomly selected connected peers until `mesh_n` peers were added. Explicit peers,
    /// peers scoring below zero and peers that are still backed off are never selected.
    /// Every added peer is sent a GRAFT and reported through `peer_added_to_mesh`.
    fn join(&mut self, topic_hash: &TopicHash) {
        // Every peer added by this call; each of them gets a GRAFT at the end.
        let mut added_peers = HashSet::new();
        let mesh_n = self.config.mesh_n_for_topic(topic_hash);
        #[cfg(feature = "metrics")]
        if let Some(m) = self.metrics.as_mut() {
            m.joined(topic_hash)
        }

        // Always construct a mesh regardless if we find peers or not.
        self.mesh.entry(topic_hash.clone()).or_default();

        // check if we have mesh_n peers in fanout[topic] and add them to the mesh if we do,
        // removing the fanout entry.
        if let Some((_, mut peers)) = self.fanout.remove_entry(topic_hash) {
            tracing::debug!(
                topic=%topic_hash,
                "JOIN: Removing peers from the fanout for topic"
            );

            // remove explicit peers, peers with negative scores, and backoffed peers
            peers.retain(|p| {
                !self.explicit_peers.contains(p)
                    && !self.peer_score.below_threshold(p, |_| 0.0).0
                    && !self.backoffs.is_backoff_with_slack(topic_hash, p)
            });

            // Add up to mesh_n of them to the mesh
            // NOTE: These aren't randomly added, currently FIFO
            let add_peers = std::cmp::min(peers.len(), mesh_n);
            tracing::debug!(
                topic=%topic_hash,
                "JOIN: Adding {:?} peers from the fanout for topic",
                add_peers
            );
            // Both `take(add_peers)` calls walk `peers` in the same order, so the peers
            // recorded here are the ones inserted into the mesh below.
            added_peers.extend(peers.iter().take(add_peers));

            // NOTE(review): `insert` replaces whatever set is stored for this topic, so peers
            // already in the mesh would be dropped — presumably JOIN is only invoked for
            // topics that are not in the mesh yet; confirm against the caller.
            self.mesh.insert(
                topic_hash.clone(),
                peers.into_iter().take(add_peers).collect(),
            );

            // remove the last published time
            self.fanout_last_pub.remove(topic_hash);
        }

        #[cfg(feature = "metrics")]
        if let Some(m) = self.metrics.as_mut() {
            m.peers_included(topic_hash, Inclusion::Fanout, added_peers.len())
        }

        // check if we need to get more peers, which we randomly select
        if added_peers.len() < mesh_n {
            // get the peers
            let random_added = get_random_peers(
                &self.connected_peers,
                topic_hash,
                mesh_n - added_peers.len(),
                // Same eligibility rules as for fanout peers, plus skipping peers that were
                // already taken from the fanout.
                |peer| {
                    !added_peers.contains(peer)
                        && !self.explicit_peers.contains(peer)
                        && !self.peer_score.below_threshold(peer, |_| 0.0).0
                        && !self.backoffs.is_backoff_with_slack(topic_hash, peer)
                },
            );

            added_peers.extend(random_added.clone());
            // add them to the mesh
            tracing::debug!(
                "JOIN: Inserting {:?} random peers into the mesh",
                random_added.len()
            );

            #[cfg(feature = "metrics")]
            if let Some(m) = self.metrics.as_mut() {
                m.peers_included(topic_hash, Inclusion::Random, random_added.len())
            }

            let mesh_peers = self.mesh.entry(topic_hash.clone()).or_default();
            mesh_peers.extend(random_added);
        }

        for peer_id in added_peers {
            // Send a GRAFT control message
            tracing::debug!(peer=%peer_id, "JOIN: Sending Graft message to peer");
            // Record the graft with peer scoring (when active) before sending the message.
            if let PeerScoreState::Active(peer_score) = &mut self.peer_score {
                peer_score.graft(&peer_id, topic_hash.clone());
            }
            self.send_message(
                peer_id,
                RpcOut::Graft(Graft {
                    topic_hash: topic_hash.clone(),
                }),
            );

            // If the peer did not previously exist in any mesh, inform the handler
            peer_added_to_mesh(
                peer_id,
                vec![topic_hash],
                &self.mesh,
                &mut self.events,
                &self.connected_peers,
            );
        }

        #[cfg(feature = "metrics")]
        {
            // Computed before borrowing `self.metrics` mutably.
            let mesh_peers = self.mesh_peers(topic_hash).count();
            if let Some(m) = self.metrics.as_mut() {
                m.set_mesh_peers(topic_hash, mesh_peers)
            }
        }

        tracing::debug!(topic=%topic_hash, "Completed JOIN for topic");
    }
1079
1080    /// Creates a PRUNE gossipsub action.
1081    fn make_prune(
1082        &mut self,
1083        topic_hash: &TopicHash,
1084        peer: &PeerId,
1085        do_px: bool,
1086        on_unsubscribe: bool,
1087    ) -> Prune {
1088        if let PeerScoreState::Active(peer_score) = &mut self.peer_score {
1089            peer_score.prune(peer, topic_hash.clone());
1090        }
1091
1092        match self.connected_peers.get(peer).map(|v| &v.kind) {
1093            Some(PeerKind::Floodsub) => {
1094                tracing::error!("Attempted to prune a Floodsub peer");
1095            }
1096            Some(PeerKind::Gossipsub) => {
1097                // GossipSub v1.0 -- no peer exchange, the peer won't be able to parse it anyway
1098                return Prune {
1099                    topic_hash: topic_hash.clone(),
1100                    peers: Vec::new(),
1101                    backoff: None,
1102                };
1103            }
1104            None => {
1105                tracing::error!("Attempted to Prune an unknown peer");
1106            }
1107            _ => {} // Gossipsub 1.1 peer perform the `Prune`
1108        }
1109
1110        // Select peers for peer exchange
1111        let peers = if do_px {
1112            get_random_peers(
1113                &self.connected_peers,
1114                topic_hash,
1115                self.config.prune_peers(),
1116                |p| p != peer && !self.peer_score.below_threshold(p, |_| 0.0).0,
1117            )
1118            .into_iter()
1119            .map(|p| PeerInfo { peer_id: Some(p) })
1120            .collect()
1121        } else {
1122            Vec::new()
1123        };
1124
1125        let backoff = if on_unsubscribe {
1126            self.config.unsubscribe_backoff()
1127        } else {
1128            self.config.prune_backoff()
1129        };
1130
1131        // update backoff
1132        self.backoffs.update_backoff(topic_hash, peer, backoff);
1133
1134        Prune {
1135            topic_hash: topic_hash.clone(),
1136            peers,
1137            backoff: Some(backoff.as_secs()),
1138        }
1139    }
1140
1141    /// Gossipsub LEAVE(topic) - Notifies mesh\[topic\] peers with PRUNE messages.
1142    fn leave(&mut self, topic_hash: &TopicHash) {
1143        tracing::debug!(topic=%topic_hash, "Running LEAVE for topic");
1144
1145        // If our mesh contains the topic, send prune to peers and delete it from the mesh
1146        if let Some((_, peers)) = self.mesh.remove_entry(topic_hash) {
1147            #[cfg(feature = "metrics")]
1148            if let Some(m) = self.metrics.as_mut() {
1149                m.left(topic_hash)
1150            }
1151            for peer_id in peers {
1152                // Send a PRUNE control message
1153                tracing::debug!(%peer_id, "LEAVE: Sending PRUNE to peer");
1154
1155                let on_unsubscribe = true;
1156                let prune =
1157                    self.make_prune(topic_hash, &peer_id, self.config.do_px(), on_unsubscribe);
1158                self.send_message(peer_id, RpcOut::Prune(prune));
1159
1160                // If the peer did not previously exist in any mesh, inform the handler
1161                peer_removed_from_mesh(
1162                    peer_id,
1163                    topic_hash,
1164                    &self.mesh,
1165                    &mut self.events,
1166                    &self.connected_peers,
1167                );
1168            }
1169        }
1170        tracing::debug!(topic=%topic_hash, "Completed LEAVE for topic");
1171    }
1172
1173    /// Checks if the given peer is still connected and if not dials the peer again.
1174    fn check_explicit_peer_connection(&mut self, peer_id: &PeerId) {
1175        if !self.connected_peers.contains_key(peer_id) {
1176            // Connect to peer
1177            tracing::debug!(peer=%peer_id, "Connecting to explicit peer");
1178            self.events.push_back(ToSwarm::Dial {
1179                opts: DialOpts::peer_id(*peer_id).build(),
1180            });
1181        }
1182    }
1183
1184    /// Handles an IHAVE control message. Checks our cache of messages. If the message is unknown,
1185    /// requests it with an IWANT control message.
1186    fn handle_ihave(&mut self, peer_id: &PeerId, ihave_msgs: Vec<(TopicHash, Vec<MessageId>)>) {
1187        // We ignore IHAVE gossip from any peer whose score is below the gossip threshold
1188        if let (true, score) = self
1189            .peer_score
1190            .below_threshold(peer_id, |ts| ts.gossip_threshold)
1191        {
1192            tracing::debug!(
1193                peer=%peer_id,
1194                %score,
1195                "IHAVE: ignoring peer with score below threshold"
1196            );
1197            return;
1198        }
1199
1200        // IHAVE flood protection
1201        let peer_have = self.count_received_ihave.entry(*peer_id).or_insert(0);
1202        *peer_have += 1;
1203        if *peer_have > self.config.max_ihave_messages() {
1204            tracing::debug!(
1205                peer=%peer_id,
1206                "IHAVE: peer has advertised too many times ({}) within this heartbeat \
1207            interval; ignoring",
1208                *peer_have
1209            );
1210            return;
1211        }
1212
1213        if let Some(iasked) = self.count_sent_iwant.get(peer_id) {
1214            if *iasked >= self.config.max_ihave_length() {
1215                tracing::debug!(
1216                    peer=%peer_id,
1217                    "IHAVE: peer has already advertised too many messages ({}); ignoring",
1218                    *iasked
1219                );
1220                return;
1221            }
1222        }
1223
1224        tracing::trace!(peer=%peer_id, "Handling IHAVE for peer");
1225
1226        let mut iwant_ids = HashSet::new();
1227
1228        for (topic, ids) in ihave_msgs {
1229            // only process the message if we are subscribed
1230            if !self.mesh.contains_key(&topic) {
1231                tracing::debug!(
1232                    %topic,
1233                    "IHAVE: Ignoring IHAVE - Not subscribed to topic"
1234                );
1235                continue;
1236            }
1237
1238            for id in ids.into_iter().filter(|id| {
1239                if self.duplicate_cache.contains(id) {
1240                    return false;
1241                }
1242
1243                !self.gossip_promises.contains(id)
1244            }) {
1245                // have not seen this message and are not currently requesting it
1246                if iwant_ids.insert(id) {
1247                    // Register the IWANT metric
1248                    #[cfg(feature = "metrics")]
1249                    if let Some(metrics) = self.metrics.as_mut() {
1250                        metrics.register_iwant(&topic);
1251                    }
1252                }
1253            }
1254        }
1255
1256        if !iwant_ids.is_empty() {
1257            let iasked = self.count_sent_iwant.entry(*peer_id).or_insert(0);
1258            let mut iask = iwant_ids.len();
1259            if *iasked + iask > self.config.max_ihave_length() {
1260                iask = self.config.max_ihave_length().saturating_sub(*iasked);
1261            }
1262
1263            // Send the list of IWANT control messages
1264            tracing::debug!(
1265                peer=%peer_id,
1266                "IHAVE: Asking for {} out of {} messages from peer",
1267                iask,
1268                iwant_ids.len()
1269            );
1270
1271            // Ask in random order
1272            let mut iwant_ids_vec: Vec<_> = iwant_ids.into_iter().collect();
1273            let mut rng = thread_rng();
1274            iwant_ids_vec.partial_shuffle(&mut rng, iask);
1275
1276            iwant_ids_vec.truncate(iask);
1277            *iasked += iask;
1278
1279            self.gossip_promises.add_promise(
1280                *peer_id,
1281                &iwant_ids_vec,
1282                Instant::now() + self.config.iwant_followup_time(),
1283            );
1284            tracing::trace!(
1285                peer=%peer_id,
1286                "IHAVE: Asking for the following messages from peer: {:?}",
1287                iwant_ids_vec
1288            );
1289
1290            self.send_message(
1291                *peer_id,
1292                RpcOut::IWant(IWant {
1293                    message_ids: iwant_ids_vec,
1294                }),
1295            );
1296        }
1297        tracing::trace!(peer=%peer_id, "Completed IHAVE handling for peer");
1298    }
1299
1300    /// Handles an IWANT control message. Checks our cache of messages. If the message exists it is
1301    /// forwarded to the requesting peer.
1302    fn handle_iwant(&mut self, peer_id: &PeerId, iwant_msgs: Vec<MessageId>) {
1303        // We ignore IWANT gossip from any peer whose score is below the gossip threshold
1304        if let (true, score) = self
1305            .peer_score
1306            .below_threshold(peer_id, |ts| ts.gossip_threshold)
1307        {
1308            tracing::debug!(
1309                peer=%peer_id,
1310                "IWANT: ignoring peer with score below threshold [score = {}]",
1311                score
1312            );
1313            return;
1314        }
1315
1316        tracing::debug!(peer=%peer_id, "Handling IWANT for peer");
1317
1318        for id in iwant_msgs {
1319            // If we have it and the IHAVE count is not above the threshold,
1320            // forward the message.
1321            if let Some((msg, count)) = self
1322                .mcache
1323                .get_with_iwant_counts(&id, peer_id)
1324                .map(|(msg, count)| (msg.clone(), count))
1325            {
1326                if count > self.config.gossip_retransimission() {
1327                    tracing::debug!(
1328                        peer=%peer_id,
1329                        message_id=%id,
1330                        "IWANT: Peer has asked for message too many times; ignoring request"
1331                    );
1332                } else {
1333                    if let Some(peer) = self.connected_peers.get_mut(peer_id) {
1334                        if peer.dont_send.contains_key(&id) {
1335                            tracing::debug!(%peer_id, message_id=%id, "Peer already sent IDONTWANT for this message");
1336                            continue;
1337                        }
1338                    }
1339
1340                    tracing::debug!(peer=%peer_id, "IWANT: Sending cached messages to peer");
1341                    self.send_message(
1342                        *peer_id,
1343                        RpcOut::Forward {
1344                            message: msg,
1345                            timeout: Delay::new(self.config.forward_queue_duration()),
1346                        },
1347                    );
1348                }
1349            }
1350        }
1351        tracing::debug!(peer=%peer_id, "Completed IWANT handling for peer");
1352    }
1353
    /// Handles GRAFT control messages. If subscribed to the topic, adds the peer to mesh, if not,
    /// responds with PRUNE messages.
    ///
    /// A PRUNE is also sent for a topic when the peer is an explicit peer, grafts while we
    /// are still backing it off (which additionally incurs a behavioural penalty when scoring
    /// is active), has a score below zero, or when the mesh is already at `mesh_n_high` and
    /// the peer is not outbound. GRAFTs for topics we have no mesh for are ignored without a
    /// PRUNE. Peer exchange is disabled for all PRUNEs of this call as soon as any of the
    /// explicit-peer, backoff, negative-score or unknown-topic cases occurs.
    fn handle_graft(&mut self, peer_id: &PeerId, topics: Vec<TopicHash>) {
        tracing::debug!(peer=%peer_id, "Handling GRAFT message for peer");

        // Topics for which the GRAFT is refused and a PRUNE is sent back.
        let mut to_prune_topics = HashSet::new();

        // Starts from the configured value and can only be switched off below.
        let mut do_px = self.config.do_px();

        let Some(connected_peer) = self.connected_peers.get_mut(peer_id) else {
            tracing::error!(peer_id = %peer_id, "Peer non-existent when handling graft");
            return;
        };
        // Needs to be here to comply with the borrow checker.
        let is_outbound = connected_peer.outbound;

        // For each topic, if a peer has grafted us, then we necessarily must be in their mesh
        // and they must be subscribed to the topic. Ensure we have recorded the mapping.
        for topic in &topics {
            if connected_peer.topics.insert(topic.clone()) {
                #[cfg(feature = "metrics")]
                if let Some(m) = self.metrics.as_mut() {
                    m.inc_topic_peers(topic);
                }
            }
        }

        // we don't GRAFT to/from explicit peers; complain loudly if this happens
        if self.explicit_peers.contains(peer_id) {
            tracing::warn!(peer=%peer_id, "GRAFT: ignoring request from direct peer");
            // this is possibly a bug from non-reciprocal configuration; send a PRUNE for all topics
            to_prune_topics = topics.into_iter().collect();
            // but don't PX
            do_px = false
        } else {
            // The score check against zero is evaluated once and reused for every topic.
            let (below_zero, score) = self.peer_score.below_threshold(peer_id, |_| 0.0);
            let now = Instant::now();
            for topic_hash in topics {
                if let Some(peers) = self.mesh.get_mut(&topic_hash) {
                    // if the peer is already in the mesh ignore the graft
                    if peers.contains(peer_id) {
                        tracing::debug!(
                            peer=%peer_id,
                            topic=%&topic_hash,
                            "GRAFT: Received graft for peer that is already in topic"
                        );
                        continue;
                    }

                    // make sure we are not backing off that peer
                    if let Some(backoff_time) = self.backoffs.get_backoff_time(&topic_hash, peer_id)
                    {
                        if backoff_time > now {
                            tracing::warn!(
                                peer=%peer_id,
                                "[Penalty] Peer attempted graft within backoff time, penalizing"
                            );
                            // add behavioural penalty
                            if let PeerScoreState::Active(peer_score) = &mut self.peer_score {
                                #[cfg(feature = "metrics")]
                                if let Some(metrics) = self.metrics.as_mut() {
                                    metrics.register_score_penalty(Penalty::GraftBackoff);
                                }
                                peer_score.add_penalty(peer_id, 1);

                                // check the flood cutoff
                                // See: https://github.com/rust-lang/rust-clippy/issues/10061
                                #[allow(unknown_lints, clippy::unchecked_duration_subtraction)]
                                let flood_cutoff = (backoff_time
                                    + self.config.graft_flood_threshold())
                                    - self.config.prune_backoff();
                                // NOTE(review): the cutoff is derived from `prune_backoff`,
                                // presumably assuming the recorded backoff used that duration
                                // — confirm for backoffs set with other durations.
                                if flood_cutoff > now {
                                    // extra penalty
                                    peer_score.add_penalty(peer_id, 1);
                                }
                            }
                            // no PX
                            do_px = false;

                            to_prune_topics.insert(topic_hash.clone());
                            continue;
                        }
                    }

                    // check the score
                    if below_zero {
                        // we don't GRAFT peers with negative score
                        tracing::debug!(
                            peer=%peer_id,
                            %score,
                            topic=%topic_hash,
                            "GRAFT: ignoring peer with negative score"
                        );
                        // we do send them PRUNE however, because it's a matter of protocol
                        // correctness
                        to_prune_topics.insert(topic_hash.clone());
                        // but we won't PX to them
                        do_px = false;
                        continue;
                    }

                    // check mesh upper bound and only allow graft if the upper bound is not reached
                    // or if it is an outbound peer
                    let mesh_n_high = self.config.mesh_n_high_for_topic(&topic_hash);

                    if peers.len() >= mesh_n_high && !is_outbound {
                        // Refused because the mesh is full; `do_px` is left unchanged here.
                        to_prune_topics.insert(topic_hash.clone());
                        continue;
                    }

                    // add peer to the mesh
                    tracing::debug!(
                        peer=%peer_id,
                        topic=%topic_hash,
                        "GRAFT: Mesh link added for peer in topic"
                    );

                    if peers.insert(*peer_id) {
                        #[cfg(feature = "metrics")]
                        if let Some(m) = self.metrics.as_mut() {
                            m.peers_included(&topic_hash, Inclusion::Subscribed, 1)
                        }
                    }

                    // If the peer did not previously exist in any mesh, inform the handler
                    peer_added_to_mesh(
                        *peer_id,
                        vec![&topic_hash],
                        &self.mesh,
                        &mut self.events,
                        &self.connected_peers,
                    );

                    if let PeerScoreState::Active(peer_score) = &mut self.peer_score {
                        peer_score.graft(peer_id, topic_hash);
                    }
                } else {
                    // don't do PX when there is an unknown topic to avoid leaking our peers
                    do_px = false;
                    tracing::debug!(
                        peer=%peer_id,
                        topic=%topic_hash,
                        "GRAFT: Received graft for unknown topic from peer"
                    );
                    // spam hardening: ignore GRAFTs for unknown topics
                    continue;
                }
            }
        }

        if !to_prune_topics.is_empty() {
            // build the prune messages to send
            let on_unsubscribe = false;

            // All PRUNEs are built (and collected) first because `make_prune` needs
            // `&mut self`, as does `send_message`. They use the final value of `do_px`.
            for prune in to_prune_topics
                .iter()
                .map(|t| self.make_prune(t, peer_id, do_px, on_unsubscribe))
                .collect::<Vec<_>>()
            {
                self.send_message(*peer_id, RpcOut::Prune(prune));
            }
            // Send the prune messages to the peer
            tracing::debug!(
                peer=%peer_id,
                "GRAFT: Not subscribed to topics -  Sending PRUNE to peer"
            );
        }
        tracing::debug!(peer=%peer_id, "Completed GRAFT handling for peer");
    }
1523
1524    /// Removes the specified peer from the mesh, returning true if it was present.
1525    fn remove_peer_from_mesh(
1526        &mut self,
1527        peer_id: &PeerId,
1528        topic_hash: &TopicHash,
1529        backoff: Option<u64>,
1530        always_update_backoff: bool,
1531    ) -> bool {
1532        let mut peer_removed = false;
1533        if let Some(peers) = self.mesh.get_mut(topic_hash) {
1534            // remove the peer if it exists in the mesh
1535            peer_removed = peers.remove(peer_id);
1536            if peer_removed {
1537                tracing::debug!(
1538                    peer=%peer_id,
1539                    topic=%topic_hash,
1540                    "PRUNE: Removing peer from the mesh for topic"
1541                );
1542
1543                if let PeerScoreState::Active(peer_score) = &mut self.peer_score {
1544                    peer_score.prune(peer_id, topic_hash.clone());
1545                }
1546
1547                // inform the handler
1548                peer_removed_from_mesh(
1549                    *peer_id,
1550                    topic_hash,
1551                    &self.mesh,
1552                    &mut self.events,
1553                    &self.connected_peers,
1554                );
1555            }
1556        }
1557        if always_update_backoff || peer_removed {
1558            let time = if let Some(backoff) = backoff {
1559                Duration::from_secs(backoff)
1560            } else {
1561                self.config.prune_backoff()
1562            };
1563            // is there a backoff specified by the peer? if so obey it.
1564            self.backoffs.update_backoff(topic_hash, peer_id, time);
1565        }
1566        peer_removed
1567    }
1568
1569    /// Handles PRUNE control messages. Removes peer from the mesh.
1570    fn handle_prune(
1571        &mut self,
1572        peer_id: &PeerId,
1573        prune_data: Vec<(TopicHash, Vec<PeerInfo>, Option<u64>)>,
1574    ) {
1575        tracing::debug!(peer=%peer_id, "Handling PRUNE message for peer");
1576        let (below_threshold, score) = self
1577            .peer_score
1578            .below_threshold(peer_id, |ts| ts.accept_px_threshold);
1579        for (topic_hash, px, backoff) in prune_data {
1580            if self.remove_peer_from_mesh(peer_id, &topic_hash, backoff, true) {
1581                #[cfg(feature = "metrics")]
1582                if let Some(m) = self.metrics.as_mut() {
1583                    m.peers_removed(&topic_hash, Churn::Prune, 1);
1584                }
1585            }
1586
1587            if self.mesh.contains_key(&topic_hash) {
1588                // connect to px peers
1589                if !px.is_empty() {
1590                    // we ignore PX from peers with insufficient score
1591                    if below_threshold {
1592                        tracing::debug!(
1593                            peer=%peer_id,
1594                            %score,
1595                            topic=%topic_hash,
1596                            "PRUNE: ignoring PX from peer with insufficient score"
1597                        );
1598                        continue;
1599                    }
1600
1601                    // NOTE: We cannot dial any peers from PX currently as we typically will not
1602                    // know their multiaddr. Until SignedRecords are spec'd this
1603                    // remains a stub. By default `config.prune_peers()` is set to zero and
1604                    // this is skipped. If the user modifies this, this will only be able to
1605                    // dial already known peers (from an external discovery mechanism for
1606                    // example).
1607                    if self.config.prune_peers() > 0 {
1608                        self.px_connect(px);
1609                    }
1610                }
1611            }
1612        }
1613        tracing::debug!(peer=%peer_id, "Completed PRUNE handling for peer");
1614    }
1615
1616    fn px_connect(&mut self, mut px: Vec<PeerInfo>) {
1617        let n = self.config.prune_peers();
1618        // Ignore peerInfo with no ID
1619        //
1620        // TODO: Once signed records are spec'd: Can we use peerInfo without any IDs if they have a
1621        // signed peer record?
1622        px.retain(|p| p.peer_id.is_some());
1623        if px.len() > n {
1624            // only use at most prune_peers many random peers
1625            let mut rng = thread_rng();
1626            px.partial_shuffle(&mut rng, n);
1627            px = px.into_iter().take(n).collect();
1628        }
1629
1630        for p in px {
1631            // TODO: Once signed records are spec'd: extract signed peer record if given and handle
1632            // it, see https://github.com/libp2p/specs/pull/217
1633            if let Some(peer_id) = p.peer_id {
1634                // mark as px peer
1635                self.px_peers.insert(peer_id);
1636
1637                // dial peer
1638                self.events.push_back(ToSwarm::Dial {
1639                    opts: DialOpts::peer_id(peer_id).build(),
1640                });
1641            }
1642        }
1643    }
1644
1645    /// Applies some basic checks to whether this message is valid. Does not apply user validation
1646    /// checks.
1647    fn message_is_valid(
1648        &mut self,
1649        msg_id: &MessageId,
1650        raw_message: &mut RawMessage,
1651        propagation_source: &PeerId,
1652    ) -> bool {
1653        tracing::debug!(
1654            peer=%propagation_source,
1655            message_id=%msg_id,
1656            "Handling message from peer"
1657        );
1658
1659        // Reject any message from a blacklisted peer
1660        if self.blacklisted_peers.contains(propagation_source) {
1661            tracing::debug!(
1662                peer=%propagation_source,
1663                "Rejecting message from blacklisted peer"
1664            );
1665            self.gossip_promises
1666                .reject_message(msg_id, &RejectReason::BlackListedPeer);
1667            if let PeerScoreState::Active(peer_score) = &mut self.peer_score {
1668                peer_score.reject_message(
1669                    propagation_source,
1670                    msg_id,
1671                    &raw_message.topic,
1672                    RejectReason::BlackListedPeer,
1673                );
1674            }
1675            return false;
1676        }
1677
1678        // Also reject any message that originated from a blacklisted peer
1679        if let Some(source) = raw_message.source.as_ref() {
1680            if self.blacklisted_peers.contains(source) {
1681                tracing::debug!(
1682                    peer=%propagation_source,
1683                    %source,
1684                    "Rejecting message from peer because of blacklisted source"
1685                );
1686                self.handle_invalid_message(
1687                    propagation_source,
1688                    &raw_message.topic,
1689                    Some(msg_id),
1690                    RejectReason::BlackListedSource,
1691                );
1692                return false;
1693            }
1694        }
1695
1696        // If we are not validating messages, assume this message is validated
1697        // This will allow the message to be gossiped without explicitly calling
1698        // `validate_message`.
1699        if !self.config.validate_messages() {
1700            raw_message.validated = true;
1701        }
1702
1703        // reject messages claiming to be from ourselves but not locally published
1704        let self_published = !self.config.allow_self_origin()
1705            && if let Some(own_id) = self.publish_config.get_own_id() {
1706                own_id != propagation_source
1707                    && raw_message.source.as_ref().is_some_and(|s| s == own_id)
1708            } else {
1709                false
1710            };
1711
1712        if self_published {
1713            tracing::debug!(
1714                message_id=%msg_id,
1715                source=%propagation_source,
1716                "Dropping message claiming to be from self but forwarded from source"
1717            );
1718            self.handle_invalid_message(
1719                propagation_source,
1720                &raw_message.topic,
1721                Some(msg_id),
1722                RejectReason::SelfOrigin,
1723            );
1724            return false;
1725        }
1726
1727        true
1728    }
1729
1730    /// Handles a newly received [`RawMessage`].
1731    ///
1732    /// Forwards the message to all peers in the mesh.
1733    fn handle_received_message(
1734        &mut self,
1735        mut raw_message: RawMessage,
1736        propagation_source: &PeerId,
1737    ) {
1738        // Record the received metric
1739        #[cfg(feature = "metrics")]
1740        if let Some(metrics) = self.metrics.as_mut() {
1741            metrics.msg_recvd_unfiltered(&raw_message.topic, raw_message.raw_protobuf_len());
1742        }
1743
1744        // Try and perform the data transform to the message. If it fails, consider it invalid.
1745        let message = match self.data_transform.inbound_transform(raw_message.clone()) {
1746            Ok(message) => message,
1747            Err(e) => {
1748                tracing::debug!("Invalid message. Transform error: {:?}", e);
1749                // Reject the message and return
1750                self.handle_invalid_message(
1751                    propagation_source,
1752                    &raw_message.topic,
1753                    None,
1754                    RejectReason::ValidationError(ValidationError::TransformFailed),
1755                );
1756                return;
1757            }
1758        };
1759
1760        // Calculate the message id on the transformed data.
1761        let msg_id = self.config.message_id(&message);
1762
1763        // Broadcast IDONTWANT messages
1764        if raw_message.raw_protobuf_len() > self.config.idontwant_message_size_threshold() {
1765            let recipient_peers = self
1766                .mesh
1767                .get(&message.topic)
1768                .map(|mesh| mesh.iter())
1769                .unwrap_or_default()
1770                .copied()
1771                .chain(self.gossip_promises.peers_for_message(&msg_id))
1772                .filter(|peer_id| {
1773                    peer_id != propagation_source && Some(peer_id) != message.source.as_ref()
1774                })
1775                .collect::<Vec<PeerId>>();
1776
1777            for peer_id in recipient_peers {
1778                self.send_message(
1779                    peer_id,
1780                    RpcOut::IDontWant(IDontWant {
1781                        message_ids: vec![msg_id.clone()],
1782                    }),
1783                );
1784            }
1785        }
1786
1787        // Check the validity of the message
1788        // Peers get penalized if this message is invalid. We don't add it to the duplicate cache
1789        // and instead continually penalize peers that repeatedly send this message.
1790        if !self.message_is_valid(&msg_id, &mut raw_message, propagation_source) {
1791            return;
1792        }
1793
1794        if !self.duplicate_cache.insert(msg_id.clone()) {
1795            tracing::debug!(message_id=%msg_id, "Message already received, ignoring");
1796            if let PeerScoreState::Active(peer_score) = &mut self.peer_score {
1797                peer_score.duplicated_message(propagation_source, &msg_id, &message.topic);
1798            }
1799            self.mcache.observe_duplicate(&msg_id, propagation_source);
1800            return;
1801        }
1802
1803        tracing::debug!(
1804            message_id=%msg_id,
1805            "Put message in duplicate_cache and resolve promises"
1806        );
1807
1808        // Record the received message with the metrics
1809        #[cfg(feature = "metrics")]
1810        if let Some(metrics) = self.metrics.as_mut() {
1811            metrics.msg_recvd(&message.topic);
1812        }
1813
1814        // Tells score that message arrived (but is maybe not fully validated yet).
1815        // Consider the message as delivered for gossip promises.
1816        self.gossip_promises.message_delivered(&msg_id);
1817
1818        // Tells score that message arrived (but is maybe not fully validated yet).
1819        if let PeerScoreState::Active(peer_score) = &mut self.peer_score {
1820            peer_score.validate_message(propagation_source, &msg_id, &message.topic);
1821        }
1822
1823        // Add the message to our memcache
1824        self.mcache.put(&msg_id, raw_message.clone());
1825
1826        // Dispatch the message to the user if we are subscribed to any of the topics
1827        #[allow(
1828            clippy::map_entry,
1829            reason = "False positive, see rust-lang/rust-clippy#14449."
1830        )]
1831        if self.mesh.contains_key(&message.topic) {
1832            tracing::debug!("Sending received message to user");
1833            self.events
1834                .push_back(ToSwarm::GenerateEvent(Event::Message {
1835                    propagation_source: *propagation_source,
1836                    message_id: msg_id.clone(),
1837                    message,
1838                }));
1839        } else {
1840            tracing::debug!(
1841                topic=%message.topic,
1842                "Received message on a topic we are not subscribed to"
1843            );
1844            return;
1845        }
1846
1847        // forward the message to mesh peers, if no validation is required
1848        if !self.config.validate_messages() {
1849            self.forward_msg(
1850                &msg_id,
1851                raw_message,
1852                Some(propagation_source),
1853                HashSet::new(),
1854            );
1855            tracing::debug!(message_id=%msg_id, "Completed message handling for message");
1856        }
1857    }
1858
1859    // Handles invalid messages received.
1860    fn handle_invalid_message(
1861        &mut self,
1862        propagation_source: &PeerId,
1863        topic_hash: &TopicHash,
1864        message_id: Option<&MessageId>,
1865        reject_reason: RejectReason,
1866    ) {
1867        #[cfg(feature = "metrics")]
1868        if let Some(metrics) = self.metrics.as_mut() {
1869            metrics.register_invalid_message(topic_hash);
1870        }
1871        if let Some(msg_id) = message_id {
1872            // Valid transformation without peer scoring
1873            self.gossip_promises.reject_message(msg_id, &reject_reason);
1874        }
1875        if let PeerScoreState::Active(peer_score) = &mut self.peer_score {
1876            // The compiler will optimize this pattern-matching
1877            if let Some(msg_id) = message_id {
1878                // The message itself is valid, but is from a banned peer or
1879                // claiming to be self-origin but is actually forwarded from other peers.
1880                peer_score.reject_message(propagation_source, msg_id, topic_hash, reject_reason);
1881            } else {
1882                // The message is invalid, we reject it ignoring any gossip promises. If a peer is
1883                // advertising this message via an IHAVE and it's invalid it will be double
1884                // penalized, one for sending us an invalid and again for breaking a promise.
1885                peer_score.reject_invalid_message(propagation_source, topic_hash);
1886            }
1887        }
1888    }
1889
1890    /// Handles received subscriptions.
1891    fn handle_received_subscriptions(
1892        &mut self,
1893        subscriptions: &[Subscription],
1894        propagation_source: &PeerId,
1895    ) {
1896        tracing::trace!(
1897            source=%propagation_source,
1898            "Handling subscriptions: {:?}",
1899            subscriptions,
1900        );
1901
1902        let mut unsubscribed_peers = Vec::new();
1903
1904        let Some(peer) = self.connected_peers.get_mut(propagation_source) else {
1905            tracing::error!(
1906                peer=%propagation_source,
1907                "Subscription by unknown peer"
1908            );
1909            return;
1910        };
1911
1912        // Collect potential graft topics for the peer.
1913        let mut topics_to_graft = Vec::new();
1914
1915        // Notify the application about the subscription, after the grafts are sent.
1916        let mut application_event = Vec::new();
1917
1918        let filtered_topics = match self
1919            .subscription_filter
1920            .filter_incoming_subscriptions(subscriptions, &peer.topics)
1921        {
1922            Ok(topics) => topics,
1923            Err(s) => {
1924                tracing::error!(
1925                    peer=%propagation_source,
1926                    "Subscription filter error: {}; ignoring RPC from peer",
1927                    s
1928                );
1929                return;
1930            }
1931        };
1932
1933        for subscription in filtered_topics {
1934            // get the peers from the mapping, or insert empty lists if the topic doesn't exist
1935            let topic_hash = &subscription.topic_hash;
1936
1937            match subscription.action {
1938                SubscriptionAction::Subscribe => {
1939                    if peer.topics.insert(topic_hash.clone()) {
1940                        tracing::debug!(
1941                            peer=%propagation_source,
1942                            topic=%topic_hash,
1943                            "SUBSCRIPTION: Adding gossip peer to topic"
1944                        );
1945
1946                        #[cfg(feature = "metrics")]
1947                        if let Some(m) = self.metrics.as_mut() {
1948                            m.inc_topic_peers(topic_hash);
1949                        }
1950                    }
1951
1952                    // if the mesh needs peers add the peer to the mesh
1953                    if !self.explicit_peers.contains(propagation_source)
1954                        && peer.kind.is_gossipsub()
1955                        && !self
1956                            .peer_score
1957                            .below_threshold(propagation_source, |_| 0.0)
1958                            .0
1959                        && !self
1960                            .backoffs
1961                            .is_backoff_with_slack(topic_hash, propagation_source)
1962                    {
1963                        if let Some(peers) = self.mesh.get_mut(topic_hash) {
1964                            let mesh_n_low = self.config.mesh_n_low_for_topic(topic_hash);
1965
1966                            if peers.len() < mesh_n_low && peers.insert(*propagation_source) {
1967                                tracing::debug!(
1968                                    peer=%propagation_source,
1969                                    topic=%topic_hash,
1970                                    "SUBSCRIPTION: Adding peer to the mesh for topic"
1971                                );
1972                                #[cfg(feature = "metrics")]
1973                                if let Some(m) = self.metrics.as_mut() {
1974                                    m.peers_included(topic_hash, Inclusion::Subscribed, 1)
1975                                }
1976                                // send graft to the peer
1977                                tracing::debug!(
1978                                    peer=%propagation_source,
1979                                    topic=%topic_hash,
1980                                    "Sending GRAFT to peer for topic"
1981                                );
1982                                if let PeerScoreState::Active(peer_score) = &mut self.peer_score {
1983                                    peer_score.graft(propagation_source, topic_hash.clone());
1984                                }
1985                                topics_to_graft.push(topic_hash.clone());
1986                            }
1987                        }
1988                    }
1989                    // generates a subscription event to be polled
1990                    application_event.push(ToSwarm::GenerateEvent(Event::Subscribed {
1991                        peer_id: *propagation_source,
1992                        topic: topic_hash.clone(),
1993                    }));
1994                }
1995                SubscriptionAction::Unsubscribe => {
1996                    if peer.topics.remove(topic_hash) {
1997                        tracing::debug!(
1998                            peer=%propagation_source,
1999                            topic=%topic_hash,
2000                            "SUBSCRIPTION: Removing gossip peer from topic"
2001                        );
2002
2003                        #[cfg(feature = "metrics")]
2004                        if let Some(m) = self.metrics.as_mut() {
2005                            m.dec_topic_peers(topic_hash);
2006                        }
2007                    }
2008
2009                    unsubscribed_peers.push((*propagation_source, topic_hash.clone()));
2010                    // generate an unsubscribe event to be polled
2011                    application_event.push(ToSwarm::GenerateEvent(Event::Unsubscribed {
2012                        peer_id: *propagation_source,
2013                        topic: topic_hash.clone(),
2014                    }));
2015                }
2016            }
2017        }
2018
2019        // remove unsubscribed peers from the mesh and fanout if they exist there.
2020        for (peer_id, topic_hash) in unsubscribed_peers {
2021            self.fanout
2022                .get_mut(&topic_hash)
2023                .map(|peers| peers.remove(&peer_id));
2024            if self.remove_peer_from_mesh(&peer_id, &topic_hash, None, false) {
2025                #[cfg(feature = "metrics")]
2026                if let Some(m) = self.metrics.as_mut() {
2027                    m.peers_removed(&topic_hash, Churn::Unsub, 1);
2028                }
2029            };
2030        }
2031
2032        // Potentially inform the handler if we have added this peer to a mesh for the first time.
2033        let topics_joined = topics_to_graft.iter().collect::<Vec<_>>();
2034        if !topics_joined.is_empty() {
2035            peer_added_to_mesh(
2036                *propagation_source,
2037                topics_joined,
2038                &self.mesh,
2039                &mut self.events,
2040                &self.connected_peers,
2041            );
2042        }
2043
2044        // If we need to send grafts to peer, do so immediately, rather than waiting for the
2045        // heartbeat.
2046        for topic_hash in topics_to_graft.into_iter() {
2047            self.send_message(*propagation_source, RpcOut::Graft(Graft { topic_hash }));
2048        }
2049
2050        // Notify the application of the subscriptions
2051        for event in application_event {
2052            self.events.push_back(event);
2053        }
2054
2055        tracing::trace!(
2056            source=%propagation_source,
2057            "Completed handling subscriptions from source"
2058        );
2059    }
2060
2061    /// Applies penalties to peers that did not respond to our IWANT requests.
2062    fn apply_iwant_penalties(&mut self) {
2063        if let PeerScoreState::Active(peer_score) = &mut self.peer_score {
2064            for (peer, count) in self.gossip_promises.get_broken_promises() {
2065                peer_score.add_penalty(&peer, count);
2066                #[cfg(feature = "metrics")]
2067                if let Some(metrics) = self.metrics.as_mut() {
2068                    metrics.register_score_penalty(Penalty::BrokenPromise);
2069                }
2070            }
2071        }
2072    }
2073
2074    /// Heartbeat function which shifts the memcache and updates the mesh.
2075    fn heartbeat(&mut self) {
2076        #[cfg(feature = "metrics")]
2077        let start = Instant::now();
2078
2079        // Every heartbeat we sample the send queues to add to our metrics. We do this intentionally
2080        // before we add all the gossip from this heartbeat in order to gain a true measure of
2081        // steady-state size of the queues.
2082        #[cfg(feature = "metrics")]
2083        if let Some(m) = &mut self.metrics {
2084            for sender_queue in self.connected_peers.values().map(|v| &v.sender) {
2085                m.observe_priority_queue_size(sender_queue.priority_queue_len());
2086                m.observe_non_priority_queue_size(sender_queue.non_priority_queue_len());
2087            }
2088        }
2089
2090        self.heartbeat_ticks += 1;
2091
2092        let mut to_graft = HashMap::new();
2093        let mut to_prune = HashMap::new();
2094        let mut no_px = HashSet::new();
2095
2096        // clean up expired backoffs
2097        self.backoffs.heartbeat();
2098
2099        // clean up ihave counters
2100        self.count_sent_iwant.clear();
2101        self.count_received_ihave.clear();
2102
2103        // apply iwant penalties
2104        self.apply_iwant_penalties();
2105
2106        // check connections to explicit peers
2107        if self.heartbeat_ticks % self.config.check_explicit_peers_ticks() == 0 {
2108            for p in self.explicit_peers.clone() {
2109                self.check_explicit_peer_connection(&p);
2110            }
2111        }
2112
2113        // Cache the scores of all connected peers, and record metrics for current penalties.
2114        let mut scores = HashMap::with_capacity(self.connected_peers.len());
2115        if let PeerScoreState::Active(peer_score) = &self.peer_score {
2116            for peer_id in self.connected_peers.keys() {
2117                #[allow(unused_variables)]
2118                let report = scores
2119                    .entry(peer_id)
2120                    .or_insert_with(|| peer_score.score_report(peer_id));
2121
2122                #[cfg(feature = "metrics")]
2123                if let Some(metrics) = self.metrics.as_mut() {
2124                    for penalty in &report.penalties {
2125                        metrics.register_score_penalty(*penalty);
2126                    }
2127                }
2128            }
2129        }
2130
2131        // maintain the mesh for each topic
2132        for (topic_hash, peers) in self.mesh.iter_mut() {
2133            let explicit_peers = &self.explicit_peers;
2134            let backoffs = &self.backoffs;
2135
2136            let mesh_n = self.config.mesh_n_for_topic(topic_hash);
2137            let mesh_n_low = self.config.mesh_n_low_for_topic(topic_hash);
2138            let mesh_n_high = self.config.mesh_n_high_for_topic(topic_hash);
2139            let mesh_outbound_min = self.config.mesh_outbound_min_for_topic(topic_hash);
2140
2141            // drop all peers with negative score, without PX
2142            // if there is at some point a stable retain method for BTreeSet the following can be
2143            // written more efficiently with retain.
2144            let mut to_remove_peers = Vec::new();
2145            for peer_id in peers.iter() {
2146                let peer_score = scores.get(peer_id).map(|r| r.score).unwrap_or_default();
2147
2148                // Record the score per mesh
2149                #[cfg(feature = "metrics")]
2150                if let Some(metrics) = self.metrics.as_mut() {
2151                    metrics.observe_mesh_peers_score(topic_hash, peer_score);
2152                }
2153
2154                if peer_score < 0.0 {
2155                    tracing::debug!(
2156                        peer=%peer_id,
2157                        score=%peer_score,
2158                        topic=%topic_hash,
2159                        "HEARTBEAT: Prune peer with negative score"
2160                    );
2161
2162                    let current_topic = to_prune.entry(*peer_id).or_insert_with(Vec::new);
2163                    current_topic.push(topic_hash.clone());
2164                    no_px.insert(*peer_id);
2165                    to_remove_peers.push(*peer_id);
2166                }
2167            }
2168
2169            #[cfg(feature = "metrics")]
2170            if let Some(m) = self.metrics.as_mut() {
2171                m.peers_removed(topic_hash, Churn::BadScore, to_remove_peers.len())
2172            }
2173
2174            for peer_id in to_remove_peers {
2175                peers.remove(&peer_id);
2176            }
2177
2178            // too little peers - add some
2179            if peers.len() < mesh_n_low {
2180                tracing::debug!(
2181                    topic=%topic_hash,
2182                    "HEARTBEAT: Mesh low. Topic contains: {} needs: {}",
2183                    peers.len(),
2184                    self.config.mesh_n()
2185                );
2186                // not enough peers - get mesh_n - current_length more
2187                let desired_peers = mesh_n - peers.len();
2188                let peer_list =
2189                    get_random_peers(&self.connected_peers, topic_hash, desired_peers, |peer| {
2190                        !peers.contains(peer)
2191                            && !explicit_peers.contains(peer)
2192                            && !backoffs.is_backoff_with_slack(topic_hash, peer)
2193                            && scores.get(peer).map(|r| r.score).unwrap_or_default() >= 0.0
2194                    });
2195                for peer in &peer_list {
2196                    let current_topic = to_graft.entry(*peer).or_insert_with(Vec::new);
2197                    current_topic.push(topic_hash.clone());
2198                }
2199                // update the mesh
2200                tracing::debug!("Updating mesh, new mesh: {:?}", peer_list);
2201                #[cfg(feature = "metrics")]
2202                if let Some(m) = self.metrics.as_mut() {
2203                    m.peers_included(topic_hash, Inclusion::Random, peer_list.len())
2204                }
2205                peers.extend(peer_list);
2206            }
2207
2208            // too many peers - remove some
2209            if peers.len() > mesh_n_high {
2210                tracing::debug!(
2211                    topic=%topic_hash,
2212                    "HEARTBEAT: Mesh high. Topic contains: {} will reduce to: {}",
2213                    peers.len(),
2214                    self.config.mesh_n()
2215                );
2216                let excess_peer_no = peers.len() - mesh_n;
2217
2218                // shuffle the peers and then sort by score ascending beginning with the worst
2219                let mut rng = thread_rng();
2220                let mut shuffled = peers.iter().copied().collect::<Vec<_>>();
2221                shuffled.shuffle(&mut rng);
2222                shuffled.sort_by(|p1, p2| {
2223                    let score_p1 = scores.get(p1).map(|r| r.score).unwrap_or_default();
2224                    let score_p2 = scores.get(p2).map(|r| r.score).unwrap_or_default();
2225
2226                    score_p1.partial_cmp(&score_p2).unwrap_or(Ordering::Equal)
2227                });
2228                // shuffle everything except the last retain_scores many peers (the best ones)
2229                shuffled[..peers.len() - self.config.retain_scores()].shuffle(&mut rng);
2230
2231                // count total number of outbound peers
2232                let mut outbound = shuffled
2233                    .iter()
2234                    .filter(|peer_id| {
2235                        self.connected_peers
2236                            .get(peer_id)
2237                            .is_some_and(|peer| peer.outbound)
2238                    })
2239                    .count();
2240
2241                // remove the first excess_peer_no allowed (by outbound restrictions) peers adding
2242                // them to to_prune
2243                let mut removed = 0;
2244                for peer in shuffled {
2245                    if removed == excess_peer_no {
2246                        break;
2247                    }
2248                    if self
2249                        .connected_peers
2250                        .get(&peer)
2251                        .is_some_and(|peer| peer.outbound)
2252                    {
2253                        if outbound <= mesh_outbound_min {
2254                            // do not remove anymore outbound peers
2255                            continue;
2256                        }
2257                        // an outbound peer gets removed
2258                        outbound -= 1;
2259                    }
2260
2261                    // remove the peer
2262                    peers.remove(&peer);
2263                    let current_topic = to_prune.entry(peer).or_insert_with(Vec::new);
2264                    current_topic.push(topic_hash.clone());
2265                    removed += 1;
2266                }
2267
2268                #[cfg(feature = "metrics")]
2269                if let Some(m) = self.metrics.as_mut() {
2270                    m.peers_removed(topic_hash, Churn::Excess, removed)
2271                }
2272            }
2273
2274            // do we have enough outbound peers?
2275            if peers.len() >= mesh_n_low {
2276                // count number of outbound peers we have
2277                let outbound = peers
2278                    .iter()
2279                    .filter(|peer_id| {
2280                        self.connected_peers
2281                            .get(peer_id)
2282                            .is_some_and(|peer| peer.outbound)
2283                    })
2284                    .count();
2285
2286                // if we have not enough outbound peers, graft to some new outbound peers
2287                if outbound < mesh_outbound_min {
2288                    let needed = mesh_outbound_min - outbound;
2289                    let peer_list =
2290                        get_random_peers(&self.connected_peers, topic_hash, needed, |peer_id| {
2291                            !peers.contains(peer_id)
2292                                && !explicit_peers.contains(peer_id)
2293                                && !backoffs.is_backoff_with_slack(topic_hash, peer_id)
2294                                && scores.get(peer_id).map(|r| r.score).unwrap_or_default() >= 0.0
2295                                && self
2296                                    .connected_peers
2297                                    .get(peer_id)
2298                                    .is_some_and(|peer| peer.outbound)
2299                        });
2300
2301                    for peer in &peer_list {
2302                        let current_topic = to_graft.entry(*peer).or_insert_with(Vec::new);
2303                        current_topic.push(topic_hash.clone());
2304                    }
2305                    // update the mesh
2306                    tracing::debug!("Updating mesh, new mesh: {:?}", peer_list);
2307                    #[cfg(feature = "metrics")]
2308                    if let Some(m) = self.metrics.as_mut() {
2309                        m.peers_included(topic_hash, Inclusion::Outbound, peer_list.len())
2310                    }
2311                    peers.extend(peer_list);
2312                }
2313            }
2314
2315            // should we try to improve the mesh with opportunistic grafting?
2316            if self.heartbeat_ticks % self.config.opportunistic_graft_ticks() == 0
2317                && peers.len() > 1
2318            {
2319                if let PeerScoreState::Active(peer_score) = &self.peer_score {
2320                    // Opportunistic grafting works as follows: we check the median score of peers
2321                    // in the mesh; if this score is below the opportunisticGraftThreshold, we
2322                    // select a few peers at random with score over the median.
2323                    // The intention is to (slowly) improve an underperforming mesh by introducing
2324                    // good scoring peers that may have been gossiping at us. This allows us to
2325                    // get out of sticky situations where we are stuck with poor peers and also
2326                    // recover from churn of good peers.
2327
2328                    // now compute the median peer score in the mesh
2329                    let mut peers_by_score: Vec<_> = peers.iter().collect();
2330                    peers_by_score.sort_by(|p1, p2| {
2331                        let p1_score = scores.get(p1).map(|r| r.score).unwrap_or_default();
2332                        let p2_score = scores.get(p2).map(|r| r.score).unwrap_or_default();
2333                        p1_score.partial_cmp(&p2_score).unwrap_or(Equal)
2334                    });
2335
2336                    let middle = peers_by_score.len() / 2;
2337                    let median = if peers_by_score.len() % 2 == 0 {
2338                        let sub_middle_peer = *peers_by_score
2339                            .get(middle - 1)
2340                            .expect("middle < vector length and middle > 0 since peers.len() > 0");
2341                        let sub_middle_score = scores
2342                            .get(sub_middle_peer)
2343                            .map(|r| r.score)
2344                            .unwrap_or_default();
2345                        let middle_peer =
2346                            *peers_by_score.get(middle).expect("middle < vector length");
2347                        let middle_score =
2348                            scores.get(middle_peer).map(|r| r.score).unwrap_or_default();
2349
2350                        (sub_middle_score + middle_score) * 0.5
2351                    } else {
2352                        scores
2353                            .get(*peers_by_score.get(middle).expect("middle < vector length"))
2354                            .map(|r| r.score)
2355                            .unwrap_or_default()
2356                    };
2357
2358                    // if the median score is below the threshold, select a better peer (if any) and
2359                    // GRAFT
2360                    if median < peer_score.thresholds.opportunistic_graft_threshold {
2361                        let peer_list = get_random_peers(
2362                            &self.connected_peers,
2363                            topic_hash,
2364                            self.config.opportunistic_graft_peers(),
2365                            |peer_id| {
2366                                !peers.contains(peer_id)
2367                                    && !explicit_peers.contains(peer_id)
2368                                    && !backoffs.is_backoff_with_slack(topic_hash, peer_id)
2369                                    && scores.get(peer_id).map(|r| r.score).unwrap_or_default()
2370                                        > median
2371                            },
2372                        );
2373                        for peer in &peer_list {
2374                            let current_topic = to_graft.entry(*peer).or_insert_with(Vec::new);
2375                            current_topic.push(topic_hash.clone());
2376                        }
2377                        // update the mesh
2378                        tracing::debug!(
2379                            topic=%topic_hash,
2380                            "Opportunistically graft in topic with peers {:?}",
2381                            peer_list
2382                        );
2383                        #[cfg(feature = "metrics")]
2384                        if let Some(m) = self.metrics.as_mut() {
2385                            m.peers_included(topic_hash, Inclusion::Random, peer_list.len())
2386                        }
2387                        peers.extend(peer_list);
2388                    }
2389                }
2390            }
2391            // Register the final count of peers in the mesh
2392            #[cfg(feature = "metrics")]
2393            if let Some(m) = self.metrics.as_mut() {
2394                m.set_mesh_peers(topic_hash, peers.len())
2395            }
2396        }
2397
2398        // remove expired fanout topics
2399        {
2400            let fanout = &mut self.fanout; // help the borrow checker
2401            let fanout_ttl = self.config.fanout_ttl();
2402            self.fanout_last_pub.retain(|topic_hash, last_pub_time| {
2403                if *last_pub_time + fanout_ttl < Instant::now() {
2404                    tracing::debug!(
2405                        topic=%topic_hash,
2406                        "HEARTBEAT: Fanout topic removed due to timeout"
2407                    );
2408                    fanout.remove(topic_hash);
2409                    return false;
2410                }
2411                true
2412            });
2413        }
2414
2415        // maintain fanout
2416        // check if our peers are still a part of the topic
2417        for (topic_hash, peers) in self.fanout.iter_mut() {
2418            let mut to_remove_peers = Vec::new();
2419            let publish_threshold = match &self.peer_score {
2420                PeerScoreState::Active(peer_score) => peer_score.thresholds.publish_threshold,
2421                _ => 0.0,
2422            };
2423            let mesh_n = self.config.mesh_n_for_topic(topic_hash);
2424
2425            for peer_id in peers.iter() {
2426                // is the peer still subscribed to the topic?
2427                let peer_score = scores.get(peer_id).map(|r| r.score).unwrap_or_default();
2428                match self.connected_peers.get(peer_id) {
2429                    Some(peer) => {
2430                        if !peer.topics.contains(topic_hash) || peer_score < publish_threshold {
2431                            tracing::debug!(
2432                                topic=%topic_hash,
2433                                "HEARTBEAT: Peer removed from fanout for topic"
2434                            );
2435                            to_remove_peers.push(*peer_id);
2436                        }
2437                    }
2438                    None => {
2439                        // remove if the peer has disconnected
2440                        to_remove_peers.push(*peer_id);
2441                    }
2442                }
2443            }
2444            for to_remove in to_remove_peers {
2445                peers.remove(&to_remove);
2446            }
2447
2448            // not enough peers
2449            if peers.len() < mesh_n {
2450                tracing::debug!(
2451                    "HEARTBEAT: Fanout low. Contains: {:?} needs: {:?}",
2452                    peers.len(),
2453                    mesh_n
2454                );
2455                let needed_peers = mesh_n - peers.len();
2456                let explicit_peers = &self.explicit_peers;
2457                let new_peers =
2458                    get_random_peers(&self.connected_peers, topic_hash, needed_peers, |peer_id| {
2459                        !peers.contains(peer_id)
2460                            && !explicit_peers.contains(peer_id)
2461                            && !self
2462                                .peer_score
2463                                .below_threshold(peer_id, |ts| ts.publish_threshold)
2464                                .0
2465                    });
2466                peers.extend(new_peers);
2467            }
2468        }
2469
2470        if let PeerScoreState::Active(peer_score) = &self.peer_score {
2471            tracing::trace!("Mesh message deliveries: {:?}", {
2472                self.mesh
2473                    .iter()
2474                    .map(|(t, peers)| {
2475                        (
2476                            t.clone(),
2477                            peers
2478                                .iter()
2479                                .map(|p| {
2480                                    (*p, peer_score.mesh_message_deliveries(p, t).unwrap_or(0.0))
2481                                })
2482                                .collect::<HashMap<PeerId, f64>>(),
2483                        )
2484                    })
2485                    .collect::<HashMap<TopicHash, HashMap<PeerId, f64>>>()
2486            })
2487        }
2488
2489        self.emit_gossip();
2490
2491        // send graft/prunes
2492        if !to_graft.is_empty() | !to_prune.is_empty() {
2493            self.send_graft_prune(to_graft, to_prune, no_px);
2494        }
2495
2496        // shift the memcache
2497        self.mcache.shift();
2498
2499        // Report expired messages
2500        for (peer_id, failed_messages) in self.failed_messages.drain() {
2501            tracing::debug!("Peer couldn't consume messages: {:?}", failed_messages);
2502            self.events
2503                .push_back(ToSwarm::GenerateEvent(Event::SlowPeer {
2504                    peer_id,
2505                    failed_messages,
2506                }));
2507        }
2508        self.failed_messages.shrink_to_fit();
2509
2510        // Flush stale IDONTWANTs.
2511        for peer in self.connected_peers.values_mut() {
2512            while let Some((_front, instant)) = peer.dont_send.front() {
2513                if (*instant + IDONTWANT_TIMEOUT) >= Instant::now() {
2514                    break;
2515                } else {
2516                    peer.dont_send.pop_front();
2517                }
2518            }
2519        }
2520
2521        #[cfg(feature = "metrics")]
2522        if let Some(metrics) = self.metrics.as_mut() {
2523            let duration = u64::try_from(start.elapsed().as_millis()).unwrap_or(u64::MAX);
2524            metrics.observe_heartbeat_duration(duration);
2525        }
2526    }
2527
2528    /// Emits gossip - Send IHAVE messages to a random set of gossip peers. This is applied to mesh
2529    /// and fanout peers
2530    fn emit_gossip(&mut self) {
2531        let mut rng = thread_rng();
2532        let mut messages = Vec::new();
2533        for (topic_hash, peers) in self.mesh.iter().chain(self.fanout.iter()) {
2534            let mut message_ids = self.mcache.get_gossip_message_ids(topic_hash);
2535            if message_ids.is_empty() {
2536                continue;
2537            }
2538
2539            // if we are emitting more than GossipSubMaxIHaveLength message_ids, truncate the list
2540            if message_ids.len() > self.config.max_ihave_length() {
2541                // we do the truncation (with shuffling) per peer below
2542                tracing::debug!(
2543                    "too many messages for gossip; will truncate IHAVE list ({} messages)",
2544                    message_ids.len()
2545                );
2546            } else {
2547                // shuffle to emit in random order
2548                message_ids.shuffle(&mut rng);
2549            }
2550
2551            // dynamic number of peers to gossip based on `gossip_factor` with minimum `gossip_lazy`
2552            let n_map = |m| {
2553                max(
2554                    self.config.gossip_lazy(),
2555                    (self.config.gossip_factor() * m as f64) as usize,
2556                )
2557            };
2558            // get gossip_lazy random peers
2559            let to_msg_peers =
2560                get_random_peers_dynamic(&self.connected_peers, topic_hash, n_map, |peer| {
2561                    !peers.contains(peer)
2562                        && !self.explicit_peers.contains(peer)
2563                        && !self
2564                            .peer_score
2565                            .below_threshold(peer, |ts| ts.gossip_threshold)
2566                            .0
2567                });
2568
2569            tracing::debug!("Gossiping IHAVE to {} peers", to_msg_peers.len());
2570
2571            for peer_id in to_msg_peers {
2572                let mut peer_message_ids = message_ids.clone();
2573
2574                if peer_message_ids.len() > self.config.max_ihave_length() {
2575                    // We do this per peer so that we emit a different set for each peer.
2576                    // we have enough redundancy in the system that this will significantly increase
2577                    // the message coverage when we do truncate.
2578                    peer_message_ids.partial_shuffle(&mut rng, self.config.max_ihave_length());
2579                    peer_message_ids.truncate(self.config.max_ihave_length());
2580                }
2581
2582                // send an IHAVE message
2583                messages.push((
2584                    peer_id,
2585                    RpcOut::IHave(IHave {
2586                        topic_hash: topic_hash.clone(),
2587                        message_ids: peer_message_ids,
2588                    }),
2589                ));
2590            }
2591        }
2592        for (peer_id, message) in messages {
2593            self.send_message(peer_id, message);
2594        }
2595    }
2596
2597    /// Handles multiple GRAFT/PRUNE messages and coalesces them into chunked gossip control
2598    /// messages.
2599    fn send_graft_prune(
2600        &mut self,
2601        to_graft: HashMap<PeerId, Vec<TopicHash>>,
2602        mut to_prune: HashMap<PeerId, Vec<TopicHash>>,
2603        no_px: HashSet<PeerId>,
2604    ) {
2605        // handle the grafts and overlapping prunes per peer
2606        for (peer_id, topics) in to_graft.into_iter() {
2607            for topic in &topics {
2608                // inform scoring of graft
2609                if let PeerScoreState::Active(peer_score) = &mut self.peer_score {
2610                    peer_score.graft(&peer_id, topic.clone());
2611                }
2612
2613                // inform the handler of the peer being added to the mesh
2614                // If the peer did not previously exist in any mesh, inform the handler
2615                peer_added_to_mesh(
2616                    peer_id,
2617                    vec![topic],
2618                    &self.mesh,
2619                    &mut self.events,
2620                    &self.connected_peers,
2621                );
2622            }
2623            let rpc_msgs = topics.iter().map(|topic_hash| {
2624                RpcOut::Graft(Graft {
2625                    topic_hash: topic_hash.clone(),
2626                })
2627            });
2628
2629            // If there are prunes associated with the same peer add them.
2630            // NOTE: In this case a peer has been added to a topic mesh, and removed from another.
2631            // It therefore must be in at least one mesh and we do not need to inform the handler
2632            // of its removal from another.
2633
2634            // The following prunes are not due to unsubscribing.
2635            let prune_msgs = to_prune
2636                .remove(&peer_id)
2637                .into_iter()
2638                .flatten()
2639                .map(|topic_hash| {
2640                    let prune = self.make_prune(
2641                        &topic_hash,
2642                        &peer_id,
2643                        self.config.do_px() && !no_px.contains(&peer_id),
2644                        false,
2645                    );
2646                    RpcOut::Prune(prune)
2647                });
2648
2649            // send the rpc messages
2650            for msg in rpc_msgs.chain(prune_msgs).collect::<Vec<_>>() {
2651                self.send_message(peer_id, msg);
2652            }
2653        }
2654
2655        // handle the remaining prunes
2656        // The following prunes are not due to unsubscribing.
2657        for (peer_id, topics) in to_prune.iter() {
2658            for topic_hash in topics {
2659                let prune = self.make_prune(
2660                    topic_hash,
2661                    peer_id,
2662                    self.config.do_px() && !no_px.contains(peer_id),
2663                    false,
2664                );
2665                self.send_message(*peer_id, RpcOut::Prune(prune));
2666
2667                // inform the handler
2668                peer_removed_from_mesh(
2669                    *peer_id,
2670                    topic_hash,
2671                    &self.mesh,
2672                    &mut self.events,
2673                    &self.connected_peers,
2674                );
2675            }
2676        }
2677    }
2678
2679    /// Helper function which forwards a message to mesh\[topic\] peers.
2680    ///
2681    /// Returns true if at least one peer was messaged.
2682    fn forward_msg(
2683        &mut self,
2684        msg_id: &MessageId,
2685        message: RawMessage,
2686        propagation_source: Option<&PeerId>,
2687        originating_peers: HashSet<PeerId>,
2688    ) -> bool {
2689        // message is fully validated inform peer_score
2690        if let PeerScoreState::Active(peer_score) = &mut self.peer_score {
2691            if let Some(peer) = propagation_source {
2692                peer_score.deliver_message(peer, msg_id, &message.topic);
2693            }
2694        }
2695
2696        tracing::debug!(message_id=%msg_id, "Forwarding message");
2697        let mut recipient_peers = HashSet::new();
2698
2699        // Populate the recipient peers mapping
2700
2701        // Add explicit peers and floodsub peers
2702        for (peer_id, peer) in &self.connected_peers {
2703            if Some(peer_id) != propagation_source
2704                && !originating_peers.contains(peer_id)
2705                && Some(peer_id) != message.source.as_ref()
2706                && peer.topics.contains(&message.topic)
2707                && (self.explicit_peers.contains(peer_id)
2708                    || (peer.kind == PeerKind::Floodsub
2709                        && !self
2710                            .peer_score
2711                            .below_threshold(peer_id, |ts| ts.publish_threshold)
2712                            .0))
2713            {
2714                recipient_peers.insert(*peer_id);
2715            }
2716        }
2717
2718        // add mesh peers
2719        let topic = &message.topic;
2720        // mesh
2721        if let Some(mesh_peers) = self.mesh.get(topic) {
2722            for peer_id in mesh_peers {
2723                if Some(peer_id) != propagation_source
2724                    && !originating_peers.contains(peer_id)
2725                    && Some(peer_id) != message.source.as_ref()
2726                {
2727                    recipient_peers.insert(*peer_id);
2728                }
2729            }
2730        }
2731
2732        if recipient_peers.is_empty() {
2733            return false;
2734        }
2735
2736        // forward the message to peers
2737        for peer_id in recipient_peers.iter() {
2738            if let Some(peer) = self.connected_peers.get_mut(peer_id) {
2739                if peer.dont_send.contains_key(msg_id) {
2740                    tracing::debug!(%peer_id, message_id=%msg_id, "Peer doesn't want message");
2741                    continue;
2742                }
2743
2744                tracing::debug!(%peer_id, message_id=%msg_id, "Sending message to peer");
2745
2746                self.send_message(
2747                    *peer_id,
2748                    RpcOut::Forward {
2749                        message: message.clone(),
2750                        timeout: Delay::new(self.config.forward_queue_duration()),
2751                    },
2752                );
2753            }
2754        }
2755        tracing::debug!("Completed forwarding message");
2756        true
2757    }
2758
2759    /// Constructs a [`RawMessage`] performing message signing if required.
2760    pub(crate) fn build_raw_message(
2761        &mut self,
2762        topic: TopicHash,
2763        data: Vec<u8>,
2764    ) -> Result<RawMessage, PublishError> {
2765        match &mut self.publish_config {
2766            PublishConfig::Signing {
2767                ref keypair,
2768                author,
2769                inline_key,
2770                last_seq_no,
2771            } => {
2772                let sequence_number = last_seq_no.next();
2773
2774                let signature = {
2775                    let message = proto::Message {
2776                        from: Some(author.to_bytes()),
2777                        data: Some(data.clone()),
2778                        seqno: Some(sequence_number.to_be_bytes().to_vec()),
2779                        topic: topic.clone().into_string(),
2780                        signature: None,
2781                        key: None,
2782                    };
2783
2784                    let mut buf = Vec::with_capacity(message.get_size());
2785                    let mut writer = Writer::new(&mut buf);
2786
2787                    message
2788                        .write_message(&mut writer)
2789                        .expect("Encoding to succeed");
2790
2791                    // the signature is over the bytes "libp2p-pubsub:<protobuf-message>"
2792                    let mut signature_bytes = SIGNING_PREFIX.to_vec();
2793                    signature_bytes.extend_from_slice(&buf);
2794                    Some(keypair.sign(&signature_bytes)?)
2795                };
2796
2797                Ok(RawMessage {
2798                    source: Some(*author),
2799                    data,
2800                    // To be interoperable with the go-implementation this is treated as a 64-bit
2801                    // big-endian uint.
2802                    sequence_number: Some(sequence_number),
2803                    topic,
2804                    signature,
2805                    key: inline_key.clone(),
2806                    validated: true, // all published messages are valid
2807                })
2808            }
2809            PublishConfig::Author(peer_id) => {
2810                Ok(RawMessage {
2811                    source: Some(*peer_id),
2812                    data,
2813                    // To be interoperable with the go-implementation this is treated as a 64-bit
2814                    // big-endian uint.
2815                    sequence_number: Some(rand::random()),
2816                    topic,
2817                    signature: None,
2818                    key: None,
2819                    validated: true, // all published messages are valid
2820                })
2821            }
2822            PublishConfig::RandomAuthor => {
2823                Ok(RawMessage {
2824                    source: Some(PeerId::random()),
2825                    data,
2826                    // To be interoperable with the go-implementation this is treated as a 64-bit
2827                    // big-endian uint.
2828                    sequence_number: Some(rand::random()),
2829                    topic,
2830                    signature: None,
2831                    key: None,
2832                    validated: true, // all published messages are valid
2833                })
2834            }
2835            PublishConfig::Anonymous => {
2836                Ok(RawMessage {
2837                    source: None,
2838                    data,
2839                    // To be interoperable with the go-implementation this is treated as a 64-bit
2840                    // big-endian uint.
2841                    sequence_number: None,
2842                    topic,
2843                    signature: None,
2844                    key: None,
2845                    validated: true, // all published messages are valid
2846                })
2847            }
2848        }
2849    }
2850
2851    /// Send a [`RpcOut`] message to a peer.
2852    ///
2853    /// Returns `true` if sending was successful, `false` otherwise.
2854    /// The method will update the peer score and failed message counter if
2855    /// sending the message failed due to the channel to the connection handler being
2856    /// full (which indicates a slow peer).
2857    fn send_message(&mut self, peer_id: PeerId, rpc: RpcOut) -> bool {
2858        #[cfg(feature = "metrics")]
2859        if let Some(m) = self.metrics.as_mut() {
2860            if let RpcOut::Publish { ref message, .. } | RpcOut::Forward { ref message, .. } = rpc {
2861                // register bytes sent on the internal metrics.
2862                m.msg_sent(&message.topic, message.raw_protobuf_len());
2863            }
2864        }
2865
2866        let Some(peer) = &mut self.connected_peers.get_mut(&peer_id) else {
2867            tracing::error!(peer = %peer_id,
2868                    "Could not send rpc to connection handler, peer doesn't exist in connected peer list");
2869            return false;
2870        };
2871
2872        if !matches!(peer.kind, PeerKind::Gossipsubv1_2) && matches!(rpc, RpcOut::IDontWant(..)) {
2873            tracing::trace!(peer=%peer_id, "Won't send IDONTWANT message for message to peer as it doesn't support Gossipsub v1.2");
2874            return false;
2875        }
2876
2877        // Try sending the message to the connection handler.
2878        match peer.sender.send_message(rpc) {
2879            Ok(()) => true,
2880            Err(rpc) => {
2881                // Sending failed because the channel is full.
2882                tracing::warn!(peer=%peer_id, "Send Queue full. Could not send {:?}.", rpc);
2883
2884                // Update failed message counter.
2885                let failed_messages = self.failed_messages.entry(peer_id).or_default();
2886                match rpc {
2887                    RpcOut::Publish { .. } => {
2888                        failed_messages.priority += 1;
2889                        failed_messages.publish += 1;
2890                    }
2891                    RpcOut::Forward { .. } => {
2892                        failed_messages.non_priority += 1;
2893                        failed_messages.forward += 1;
2894                    }
2895                    RpcOut::IWant(_) | RpcOut::IHave(_) | RpcOut::IDontWant(_) => {
2896                        failed_messages.non_priority += 1;
2897                    }
2898                    RpcOut::Graft(_)
2899                    | RpcOut::Prune(_)
2900                    | RpcOut::Subscribe(_)
2901                    | RpcOut::Unsubscribe(_) => {
2902                        unreachable!("Channel for highpriority control messages is unbounded and should always be open.")
2903                    }
2904                }
2905
2906                // Update peer score.
2907                if let PeerScoreState::Active(peer_score) = &mut self.peer_score {
2908                    peer_score.failed_message_slow_peer(&peer_id);
2909                }
2910
2911                false
2912            }
2913        }
2914    }
2915
2916    fn on_connection_established(
2917        &mut self,
2918        ConnectionEstablished {
2919            peer_id,
2920            endpoint,
2921            other_established,
2922            ..
2923        }: ConnectionEstablished,
2924    ) {
2925        // Add the IP to the peer scoring system
2926        if let PeerScoreState::Active(peer_score) = &mut self.peer_score {
2927            if let Some(ip) = get_ip_addr(endpoint.get_remote_address()) {
2928                peer_score.add_ip(&peer_id, ip);
2929            } else {
2930                tracing::trace!(
2931                    peer=%peer_id,
2932                    "Couldn't extract ip from endpoint of peer with endpoint {:?}",
2933                    endpoint
2934                )
2935            }
2936        }
2937
2938        if other_established > 0 {
2939            return; // Not our first connection to this peer, hence nothing to do.
2940        }
2941
2942        if let PeerScoreState::Active(peer_score) = &mut self.peer_score {
2943            peer_score.add_peer(peer_id);
2944        }
2945
2946        // Ignore connections from blacklisted peers.
2947        if self.blacklisted_peers.contains(&peer_id) {
2948            tracing::debug!(peer=%peer_id, "Ignoring connection from blacklisted peer");
2949            return;
2950        }
2951
2952        tracing::debug!(peer=%peer_id, "New peer connected");
2953        // We need to send our subscriptions to the newly-connected node.
2954        for topic_hash in self.mesh.clone().into_keys() {
2955            self.send_message(peer_id, RpcOut::Subscribe(topic_hash));
2956        }
2957    }
2958
2959    fn on_connection_closed(
2960        &mut self,
2961        ConnectionClosed {
2962            peer_id,
2963            connection_id,
2964            endpoint,
2965            remaining_established,
2966            ..
2967        }: ConnectionClosed,
2968    ) {
2969        // Remove IP from peer scoring system
2970        if let PeerScoreState::Active(peer_score) = &mut self.peer_score {
2971            if let Some(ip) = get_ip_addr(endpoint.get_remote_address()) {
2972                peer_score.remove_ip(&peer_id, &ip);
2973            } else {
2974                tracing::trace!(
2975                    peer=%peer_id,
2976                    "Couldn't extract ip from endpoint of peer with endpoint {:?}",
2977                    endpoint
2978                )
2979            }
2980        }
2981
2982        if remaining_established != 0 {
2983            // Remove the connection from the list
2984            if let Some(peer) = self.connected_peers.get_mut(&peer_id) {
2985                let index = peer
2986                    .connections
2987                    .iter()
2988                    .position(|v| v == &connection_id)
2989                    .expect("Previously established connection to peer must be present");
2990                peer.connections.remove(index);
2991
2992                // If there are more connections and this peer is in a mesh, inform the first
2993                // connection handler.
2994                if !peer.connections.is_empty() {
2995                    for topic in &peer.topics {
2996                        if let Some(mesh_peers) = self.mesh.get(topic) {
2997                            if mesh_peers.contains(&peer_id) {
2998                                self.events.push_back(ToSwarm::NotifyHandler {
2999                                    peer_id,
3000                                    event: HandlerIn::JoinedMesh,
3001                                    handler: NotifyHandler::One(peer.connections[0]),
3002                                });
3003                                break;
3004                            }
3005                        }
3006                    }
3007                }
3008            }
3009        } else {
3010            // remove from mesh, topic_peers, peer_topic and the fanout
3011            tracing::debug!(peer=%peer_id, "Peer disconnected");
3012            let Some(connected_peer) = self.connected_peers.get(&peer_id) else {
3013                tracing::error!(peer_id = %peer_id, "Peer non-existent when handling disconnection");
3014                return;
3015            };
3016
3017            // remove peer from all mappings
3018            for topic in &connected_peer.topics {
3019                // check the mesh for the topic
3020                if let Some(mesh_peers) = self.mesh.get_mut(topic) {
3021                    // check if the peer is in the mesh and remove it
3022                    if mesh_peers.remove(&peer_id) {
3023                        #[cfg(feature = "metrics")]
3024                        if let Some(m) = self.metrics.as_mut() {
3025                            m.peers_removed(topic, Churn::Dc, 1);
3026                            m.set_mesh_peers(topic, mesh_peers.len());
3027                        }
3028                    };
3029                }
3030
3031                #[cfg(feature = "metrics")]
3032                if let Some(m) = self.metrics.as_mut() {
3033                    m.dec_topic_peers(topic);
3034                }
3035
3036                // remove from fanout
3037                self.fanout
3038                    .get_mut(topic)
3039                    .map(|peers| peers.remove(&peer_id));
3040            }
3041
3042            // Forget px and outbound status for this peer
3043            self.px_peers.remove(&peer_id);
3044
3045            // If metrics are enabled, register the disconnection of a peer based on its protocol.
3046            #[cfg(feature = "metrics")]
3047            if let Some(metrics) = self.metrics.as_mut() {
3048                metrics.peer_protocol_disconnected(connected_peer.kind);
3049            }
3050
3051            self.connected_peers.remove(&peer_id);
3052
3053            if let PeerScoreState::Active(peer_score) = &mut self.peer_score {
3054                peer_score.remove_peer(&peer_id);
3055            }
3056        }
3057    }
3058
3059    fn on_address_change(
3060        &mut self,
3061        AddressChange {
3062            peer_id,
3063            old: endpoint_old,
3064            new: endpoint_new,
3065            ..
3066        }: AddressChange,
3067    ) {
3068        // Exchange IP in peer scoring system
3069        if let PeerScoreState::Active(peer_score) = &mut self.peer_score {
3070            if let Some(ip) = get_ip_addr(endpoint_old.get_remote_address()) {
3071                peer_score.remove_ip(&peer_id, &ip);
3072            } else {
3073                tracing::trace!(
3074                    peer=%&peer_id,
3075                    "Couldn't extract ip from endpoint of peer with endpoint {:?}",
3076                    endpoint_old
3077                )
3078            }
3079            if let Some(ip) = get_ip_addr(endpoint_new.get_remote_address()) {
3080                peer_score.add_ip(&peer_id, ip);
3081            } else {
3082                tracing::trace!(
3083                    peer=%peer_id,
3084                    "Couldn't extract ip from endpoint of peer with endpoint {:?}",
3085                    endpoint_new
3086                )
3087            }
3088        }
3089    }
3090
3091    #[cfg(feature = "metrics")]
3092    /// Register topics to ensure metrics are recorded correctly for these topics.
3093    pub fn register_topics_for_metrics(&mut self, topics: Vec<TopicHash>) {
3094        if let Some(metrics) = &mut self.metrics {
3095            metrics.register_allowed_topics(topics);
3096        }
3097    }
3098}
3099
3100fn get_ip_addr(addr: &Multiaddr) -> Option<IpAddr> {
3101    addr.iter().find_map(|p| match p {
3102        Ip4(addr) => Some(IpAddr::V4(addr)),
3103        Ip6(addr) => Some(IpAddr::V6(addr)),
3104        _ => None,
3105    })
3106}
3107
3108impl<C, F> NetworkBehaviour for Behaviour<C, F>
3109where
3110    C: Send + 'static + DataTransform,
3111    F: Send + 'static + TopicSubscriptionFilter,
3112{
3113    type ConnectionHandler = Handler;
3114    type ToSwarm = Event;
3115
3116    fn handle_established_inbound_connection(
3117        &mut self,
3118        connection_id: ConnectionId,
3119        peer_id: PeerId,
3120        _: &Multiaddr,
3121        _: &Multiaddr,
3122    ) -> Result<THandler<Self>, ConnectionDenied> {
3123        // By default we assume a peer is only a floodsub peer.
3124        //
3125        // The protocol negotiation occurs once a message is sent/received. Once this happens we
3126        // update the type of peer that this is in order to determine which kind of routing should
3127        // occur.
3128        let connected_peer = self
3129            .connected_peers
3130            .entry(peer_id)
3131            .or_insert_with(|| PeerDetails {
3132                kind: PeerKind::Floodsub,
3133                connections: vec![],
3134                outbound: false,
3135                sender: Sender::new(self.config.connection_handler_queue_len()),
3136                topics: Default::default(),
3137                dont_send: LinkedHashMap::new(),
3138            });
3139        // Add the new connection
3140        connected_peer.connections.push(connection_id);
3141
3142        Ok(Handler::new(
3143            self.config.protocol_config(),
3144            connected_peer.sender.new_receiver(),
3145        ))
3146    }
3147
3148    fn handle_established_outbound_connection(
3149        &mut self,
3150        connection_id: ConnectionId,
3151        peer_id: PeerId,
3152        _: &Multiaddr,
3153        _: Endpoint,
3154        _: PortUse,
3155    ) -> Result<THandler<Self>, ConnectionDenied> {
3156        let connected_peer = self
3157            .connected_peers
3158            .entry(peer_id)
3159            .or_insert_with(|| PeerDetails {
3160                kind: PeerKind::Floodsub,
3161                connections: vec![],
3162                // Diverging from the go implementation we only want to consider a peer as outbound
3163                // peer if its first connection is outbound.
3164                outbound: !self.px_peers.contains(&peer_id),
3165                sender: Sender::new(self.config.connection_handler_queue_len()),
3166                topics: Default::default(),
3167                dont_send: LinkedHashMap::new(),
3168            });
3169        // Add the new connection
3170        connected_peer.connections.push(connection_id);
3171
3172        Ok(Handler::new(
3173            self.config.protocol_config(),
3174            connected_peer.sender.new_receiver(),
3175        ))
3176    }
3177
    /// Handles an event emitted by the connection handler of `propagation_source`.
    ///
    /// * `HandlerEvent::PeerKind`: records the protocol the peer speaks, or emits
    ///   `Event::GossipsubNotSupported` if it supports none of the gossipsub protocols.
    /// * `HandlerEvent::MessageDropped`: accounts for a message the handler dropped, in the
    ///   peer score, in `failed_messages` and (if enabled) in the metrics.
    /// * `HandlerEvent::Message`: processes a received RPC: subscriptions first, then (unless
    ///   the peer is below the graylist threshold) invalid messages, messages and control
    ///   messages.
    fn on_connection_handler_event(
        &mut self,
        propagation_source: PeerId,
        _connection_id: ConnectionId,
        handler_event: THandlerOutEvent<Self>,
    ) {
        match handler_event {
            HandlerEvent::PeerKind(kind) => {
                // We have identified the protocol this peer is using

                #[cfg(feature = "metrics")]
                if let Some(metrics) = self.metrics.as_mut() {
                    metrics.peer_protocol_connected(kind);
                }

                if let PeerKind::NotSupported = kind {
                    tracing::debug!(
                        peer=%propagation_source,
                        "Peer does not support gossipsub protocols"
                    );
                    self.events
                        .push_back(ToSwarm::GenerateEvent(Event::GossipsubNotSupported {
                            peer_id: propagation_source,
                        }));
                } else if let Some(conn) = self.connected_peers.get_mut(&propagation_source) {
                    // Only change the value if the old value is Floodsub (the default set in
                    // `handle_established_inbound_connection` and
                    // `handle_established_outbound_connection`).
                    // All other PeerKind changes are ignored. Note that the debug line below is
                    // logged either way.
                    tracing::debug!(
                        peer=%propagation_source,
                        peer_type=%kind,
                        "New peer type found for peer"
                    );
                    if let PeerKind::Floodsub = conn.kind {
                        conn.kind = kind;
                    }
                }
            }
            HandlerEvent::MessageDropped(rpc) => {
                // Account for this in the scoring logic
                if let PeerScoreState::Active(peer_score) = &mut self.peer_score {
                    peer_score.failed_message_slow_peer(&propagation_source);
                }

                // Keep track of expired messages for the application layer.
                // Every dropped RPC counts as a timeout; publish and forward messages are
                // additionally counted per kind.
                let failed_messages = self.failed_messages.entry(propagation_source).or_default();
                failed_messages.timeout += 1;
                match rpc {
                    RpcOut::Publish { .. } => {
                        failed_messages.publish += 1;
                    }
                    RpcOut::Forward { .. } => {
                        failed_messages.forward += 1;
                    }
                    _ => {}
                }

                // Record metrics on the failure.
                #[cfg(feature = "metrics")]
                if let Some(metrics) = self.metrics.as_mut() {
                    match rpc {
                        RpcOut::Publish { message, .. } => {
                            metrics.publish_msg_dropped(&message.topic);
                            metrics.timeout_msg_dropped(&message.topic);
                        }
                        RpcOut::Forward { message, .. } => {
                            metrics.forward_msg_dropped(&message.topic);
                            metrics.timeout_msg_dropped(&message.topic);
                        }
                        _ => {}
                    }
                }
            }
            HandlerEvent::Message {
                rpc,
                invalid_messages,
            } => {
                // Handle the gossipsub RPC

                // Handle subscriptions
                // Update connected peers topics
                // (This happens before the graylist check below, i.e. also for graylisted
                // peers.)
                if !rpc.subscriptions.is_empty() {
                    self.handle_received_subscriptions(&rpc.subscriptions, &propagation_source);
                }

                // Check if peer is graylisted in which case we ignore the event
                if let (true, _) = self
                    .peer_score
                    .below_threshold(&propagation_source, |pst| pst.graylist_threshold)
                {
                    tracing::debug!(peer=%propagation_source, "RPC Dropped from greylisted peer");
                    return;
                }

                // Handle any invalid messages from this peer
                if let PeerScoreState::Active(_) = self.peer_score {
                    for (raw_message, validation_error) in invalid_messages {
                        self.handle_invalid_message(
                            &propagation_source,
                            &raw_message.topic,
                            None,
                            RejectReason::ValidationError(validation_error),
                        )
                    }
                } else {
                    // log the invalid messages
                    for (message, validation_error) in invalid_messages {
                        tracing::warn!(
                            peer=%propagation_source,
                            source=?message.source,
                            "Invalid message from peer. Reason: {:?}",
                            validation_error,
                        );
                    }
                }

                // Handle messages
                for (count, raw_message) in rpc.messages.into_iter().enumerate() {
                    // Only process the amount of messages the configuration allows.
                    if self
                        .config
                        .max_messages_per_rpc()
                        .is_some_and(|max_msg| count >= max_msg)
                    {
                        tracing::warn!("Received more messages than permitted. Ignoring further messages. Processed: {}", count);
                        break;
                    }
                    self.handle_received_message(raw_message, &propagation_source);
                }

                // Handle control messages
                // group some control messages, this minimises SendEvents (code is simplified to
                // handle each event at a time however)
                let mut ihave_msgs = vec![];
                let mut graft_msgs = vec![];
                let mut prune_msgs = vec![];
                for (count, control_msg) in rpc.control_msgs.into_iter().enumerate() {
                    // Only process the amount of messages the configuration allows.
                    // (The same `max_messages_per_rpc` limit is applied to control messages.)
                    if self
                        .config
                        .max_messages_per_rpc()
                        .is_some_and(|max_msg| count >= max_msg)
                    {
                        tracing::warn!("Received more control messages than permitted. Ignoring further messages. Processed: {}", count);
                        break;
                    }

                    match control_msg {
                        ControlAction::IHave(IHave {
                            topic_hash,
                            message_ids,
                        }) => {
                            ihave_msgs.push((topic_hash, message_ids));
                        }
                        // IWANT is handled immediately rather than being batched.
                        ControlAction::IWant(IWant { message_ids }) => {
                            self.handle_iwant(&propagation_source, message_ids)
                        }
                        ControlAction::Graft(Graft { topic_hash }) => graft_msgs.push(topic_hash),
                        ControlAction::Prune(Prune {
                            topic_hash,
                            peers,
                            backoff,
                        }) => prune_msgs.push((topic_hash, peers, backoff)),
                        ControlAction::IDontWant(IDontWant { message_ids }) => {
                            let Some(peer) = self.connected_peers.get_mut(&propagation_source)
                            else {
                                tracing::error!(peer = %propagation_source,
                                    "Could not handle IDONTWANT, peer doesn't exist in connected peer list");
                                continue;
                            };
                            #[cfg(feature = "metrics")]
                            if let Some(metrics) = self.metrics.as_mut() {
                                metrics.register_idontwant(message_ids.len());
                            }
                            // Remember, together with the time of receipt, each message id the
                            // peer asked not to be sent.
                            for message_id in message_ids {
                                peer.dont_send.insert(message_id, Instant::now());
                                // Don't exceed capacity.
                                if peer.dont_send.len() > IDONTWANT_CAP {
                                    peer.dont_send.pop_front();
                                }
                            }
                        }
                    }
                }
                // Process the batched control messages: IHAVE first, then GRAFT, then PRUNE.
                if !ihave_msgs.is_empty() {
                    self.handle_ihave(&propagation_source, ihave_msgs);
                }
                if !graft_msgs.is_empty() {
                    self.handle_graft(&propagation_source, graft_msgs);
                }
                if !prune_msgs.is_empty() {
                    self.handle_prune(&propagation_source, prune_msgs);
                }
            }
        }
    }
3374
3375    #[tracing::instrument(level = "trace", name = "NetworkBehaviour::poll", skip(self, cx))]
3376    fn poll(
3377        &mut self,
3378        cx: &mut Context<'_>,
3379    ) -> Poll<ToSwarm<Self::ToSwarm, THandlerInEvent<Self>>> {
3380        if let Some(event) = self.events.pop_front() {
3381            return Poll::Ready(event);
3382        }
3383
3384        // update scores
3385        if let PeerScoreState::Active(peer_score) = &mut self.peer_score {
3386            if peer_score.decay_interval.poll_unpin(cx).is_ready() {
3387                peer_score.refresh_scores();
3388                peer_score
3389                    .decay_interval
3390                    .reset(peer_score.params.decay_interval);
3391            }
3392        }
3393
3394        if self.heartbeat.poll_unpin(cx).is_ready() {
3395            self.heartbeat();
3396            self.heartbeat.reset(self.config.heartbeat_interval());
3397        }
3398
3399        Poll::Pending
3400    }
3401
3402    fn on_swarm_event(&mut self, event: FromSwarm) {
3403        match event {
3404            FromSwarm::ConnectionEstablished(connection_established) => {
3405                self.on_connection_established(connection_established)
3406            }
3407            FromSwarm::ConnectionClosed(connection_closed) => {
3408                self.on_connection_closed(connection_closed)
3409            }
3410            FromSwarm::AddressChange(address_change) => self.on_address_change(address_change),
3411            _ => {}
3412        }
3413    }
3414}
3415
3416/// This is called when peers are added to any mesh. It checks if the peer existed
3417/// in any other mesh. If this is the first mesh they have joined, it queues a message to notify
3418/// the appropriate connection handler to maintain a connection.
3419fn peer_added_to_mesh(
3420    peer_id: PeerId,
3421    new_topics: Vec<&TopicHash>,
3422    mesh: &HashMap<TopicHash, BTreeSet<PeerId>>,
3423    events: &mut VecDeque<ToSwarm<Event, HandlerIn>>,
3424    connections: &HashMap<PeerId, PeerDetails>,
3425) {
3426    // Ensure there is an active connection
3427    let connection_id = match connections.get(&peer_id) {
3428        Some(p) => p
3429            .connections
3430            .first()
3431            .expect("There should be at least one connection to a peer."),
3432        None => {
3433            tracing::error!(peer_id=%peer_id, "Peer not existent when added to the mesh");
3434            return;
3435        }
3436    };
3437
3438    if let Some(peer) = connections.get(&peer_id) {
3439        for topic in &peer.topics {
3440            if !new_topics.contains(&topic) {
3441                if let Some(mesh_peers) = mesh.get(topic) {
3442                    if mesh_peers.contains(&peer_id) {
3443                        // the peer is already in a mesh for another topic
3444                        return;
3445                    }
3446                }
3447            }
3448        }
3449    }
3450    // This is the first mesh the peer has joined, inform the handler
3451    events.push_back(ToSwarm::NotifyHandler {
3452        peer_id,
3453        event: HandlerIn::JoinedMesh,
3454        handler: NotifyHandler::One(*connection_id),
3455    });
3456}
3457
3458/// This is called when peers are removed from a mesh. It checks if the peer exists
3459/// in any other mesh. If this is the last mesh they have joined, we return true, in order to
3460/// notify the handler to no longer maintain a connection.
3461fn peer_removed_from_mesh(
3462    peer_id: PeerId,
3463    old_topic: &TopicHash,
3464    mesh: &HashMap<TopicHash, BTreeSet<PeerId>>,
3465    events: &mut VecDeque<ToSwarm<Event, HandlerIn>>,
3466    connections: &HashMap<PeerId, PeerDetails>,
3467) {
3468    // Ensure there is an active connection
3469    let connection_id = match connections.get(&peer_id) {
3470        Some(p) => p
3471            .connections
3472            .first()
3473            .expect("There should be at least one connection to a peer."),
3474        None => {
3475            tracing::error!(peer_id=%peer_id, "Peer not existent when removed from mesh");
3476            return;
3477        }
3478    };
3479
3480    if let Some(peer) = connections.get(&peer_id) {
3481        for topic in &peer.topics {
3482            if topic != old_topic {
3483                if let Some(mesh_peers) = mesh.get(topic) {
3484                    if mesh_peers.contains(&peer_id) {
3485                        // the peer exists in another mesh still
3486                        return;
3487                    }
3488                }
3489            }
3490        }
3491    }
3492    // The peer is not in any other mesh, inform the handler
3493    events.push_back(ToSwarm::NotifyHandler {
3494        peer_id,
3495        event: HandlerIn::LeftMesh,
3496        handler: NotifyHandler::One(*connection_id),
3497    });
3498}
3499
3500/// Helper function to get a subset of random gossipsub peers for a `topic_hash`
3501/// filtered by the function `f`. The number of peers to get equals the output of `n_map`
3502/// that gets as input the number of filtered peers.
3503fn get_random_peers_dynamic(
3504    connected_peers: &HashMap<PeerId, PeerDetails>,
3505    topic_hash: &TopicHash,
3506    // maps the number of total peers to the number of selected peers
3507    n_map: impl Fn(usize) -> usize,
3508    mut f: impl FnMut(&PeerId) -> bool,
3509) -> BTreeSet<PeerId> {
3510    let mut gossip_peers = connected_peers
3511        .iter()
3512        .filter(|(_, p)| p.topics.contains(topic_hash))
3513        .filter(|(peer_id, _)| f(peer_id))
3514        .filter(|(_, p)| p.kind.is_gossipsub())
3515        .map(|(peer_id, _)| *peer_id)
3516        .collect::<Vec<PeerId>>();
3517
3518    // if we have less than needed, return them
3519    let n = n_map(gossip_peers.len());
3520    if gossip_peers.len() <= n {
3521        tracing::debug!("RANDOM PEERS: Got {:?} peers", gossip_peers.len());
3522        return gossip_peers.into_iter().collect();
3523    }
3524
3525    // we have more peers than needed, shuffle them and return n of them
3526    let mut rng = thread_rng();
3527    gossip_peers.partial_shuffle(&mut rng, n);
3528
3529    tracing::debug!("RANDOM PEERS: Got {:?} peers", n);
3530
3531    gossip_peers.into_iter().take(n).collect()
3532}
3533
/// Helper function to get a set of `n` random gossipsub peers for a `topic_hash`
/// filtered by the function `f`.
///
/// This is `get_random_peers_dynamic` with a fixed target of `n` peers, regardless of how many
/// peers pass the filter. Fewer than `n` peers are returned if fewer are eligible.
fn get_random_peers(
    connected_peers: &HashMap<PeerId, PeerDetails>,
    topic_hash: &TopicHash,
    n: usize,
    f: impl FnMut(&PeerId) -> bool,
) -> BTreeSet<PeerId> {
    // The closure ignores the number of eligible peers and always asks for `n`.
    get_random_peers_dynamic(connected_peers, topic_hash, |_| n, f)
}
3544
3545/// Validates the combination of signing, privacy and message validation to ensure the
3546/// configuration will not reject published messages.
3547fn validate_config(
3548    authenticity: &MessageAuthenticity,
3549    validation_mode: &ValidationMode,
3550) -> Result<(), &'static str> {
3551    match validation_mode {
3552        ValidationMode::Anonymous => {
3553            if authenticity.is_signing() {
3554                return Err("Cannot enable message signing with an Anonymous validation mode. Consider changing either the ValidationMode or MessageAuthenticity");
3555            }
3556
3557            if !authenticity.is_anonymous() {
3558                return Err("Published messages contain an author but incoming messages with an author will be rejected. Consider adjusting the validation or privacy settings in the config");
3559            }
3560        }
3561        ValidationMode::Strict => {
3562            if !authenticity.is_signing() {
3563                return Err(
3564                    "Messages will be
3565                published unsigned and incoming unsigned messages will be rejected. Consider adjusting
3566                the validation or privacy settings in the config"
3567                );
3568            }
3569        }
3570        _ => {}
3571    }
3572    Ok(())
3573}
3574
3575impl<C: DataTransform, F: TopicSubscriptionFilter> fmt::Debug for Behaviour<C, F> {
3576    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
3577        f.debug_struct("Behaviour")
3578            .field("config", &self.config)
3579            .field("events", &self.events.len())
3580            .field("publish_config", &self.publish_config)
3581            .field("mesh", &self.mesh)
3582            .field("fanout", &self.fanout)
3583            .field("fanout_last_pub", &self.fanout_last_pub)
3584            .field("mcache", &self.mcache)
3585            .field("heartbeat", &self.heartbeat)
3586            .finish()
3587    }
3588}
3589
3590impl fmt::Debug for PublishConfig {
3591    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
3592        match self {
3593            PublishConfig::Signing { author, .. } => {
3594                f.write_fmt(format_args!("PublishConfig::Signing({author})"))
3595            }
3596            PublishConfig::Author(author) => {
3597                f.write_fmt(format_args!("PublishConfig::Author({author})"))
3598            }
3599            PublishConfig::RandomAuthor => f.write_fmt(format_args!("PublishConfig::RandomAuthor")),
3600            PublishConfig::Anonymous => f.write_fmt(format_args!("PublishConfig::Anonymous")),
3601        }
3602    }
3603}