libp2p_floodsub/
layer.rs

1// Copyright 2018 Parity Technologies (UK) Ltd.
2//
3// Permission is hereby granted, free of charge, to any person obtaining a
4// copy of this software and associated documentation files (the "Software"),
5// to deal in the Software without restriction, including without limitation
6// the rights to use, copy, modify, merge, publish, distribute, sublicense,
7// and/or sell copies of the Software, and to permit persons to whom the
8// Software is furnished to do so, subject to the following conditions:
9//
10// The above copyright notice and this permission notice shall be included in
11// all copies or substantial portions of the Software.
12//
13// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
14// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
18// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
19// DEALINGS IN THE SOFTWARE.
20
21use std::{
22    collections::{
23        hash_map::{DefaultHasher, HashMap},
24        VecDeque,
25    },
26    iter,
27    task::{Context, Poll},
28};
29
30use bytes::Bytes;
31use cuckoofilter::{CuckooError, CuckooFilter};
32use fnv::FnvHashSet;
33use libp2p_core::{transport::PortUse, Endpoint, Multiaddr};
34use libp2p_identity::PeerId;
35use libp2p_swarm::{
36    behaviour::{ConnectionClosed, ConnectionEstablished, FromSwarm},
37    dial_opts::DialOpts,
38    CloseConnection, ConnectionDenied, ConnectionId, NetworkBehaviour, NotifyHandler,
39    OneShotHandler, THandler, THandlerInEvent, THandlerOutEvent, ToSwarm,
40};
41use smallvec::SmallVec;
42
43use crate::{
44    protocol::{
45        FloodsubMessage, FloodsubProtocol, FloodsubRpc, FloodsubSubscription,
46        FloodsubSubscriptionAction,
47    },
48    topic::Topic,
49    Config,
50};
51
52#[deprecated = "Use `Behaviour` instead."]
53pub type Floodsub = Behaviour;
54
55/// Network behaviour that handles the floodsub protocol.
56pub struct Behaviour {
57    /// Events that need to be yielded to the outside when polling.
58    events: VecDeque<ToSwarm<Event, FloodsubRpc>>,
59
60    config: Config,
61
62    /// List of peers to send messages to.
63    target_peers: FnvHashSet<PeerId>,
64
65    /// List of peers the network is connected to, and the topics that they're subscribed to.
66    // TODO: filter out peers that don't support floodsub, so that we avoid hammering them with
67    //       opened substreams
68    connected_peers: HashMap<PeerId, SmallVec<[Topic; 8]>>,
69
70    // List of topics we're subscribed to. Necessary to filter out messages that we receive
71    // erroneously.
72    subscribed_topics: SmallVec<[Topic; 16]>,
73
74    // We keep track of the messages we received (in the format `hash(source ID, seq_no)`) so that
75    // we don't dispatch the same message twice if we receive it twice on the network.
76    received: CuckooFilter<DefaultHasher>,
77}
78
79impl Behaviour {
80    /// Creates a `Floodsub` with default configuration.
81    pub fn new(local_peer_id: PeerId) -> Self {
82        Self::from_config(Config::new(local_peer_id))
83    }
84
85    /// Creates a `Floodsub` with the given configuration.
86    pub fn from_config(config: Config) -> Self {
87        Behaviour {
88            events: VecDeque::new(),
89            config,
90            target_peers: FnvHashSet::default(),
91            connected_peers: HashMap::new(),
92            subscribed_topics: SmallVec::new(),
93            received: CuckooFilter::new(),
94        }
95    }
96
97    /// Add a node to the list of nodes to propagate messages to.
98    #[inline]
99    pub fn add_node_to_partial_view(&mut self, peer_id: PeerId) {
100        // Send our topics to this node if we're already connected to it.
101        if self.connected_peers.contains_key(&peer_id) {
102            for topic in self.subscribed_topics.iter().cloned() {
103                self.events.push_back(ToSwarm::NotifyHandler {
104                    peer_id,
105                    handler: NotifyHandler::Any,
106                    event: FloodsubRpc {
107                        messages: Vec::new(),
108                        subscriptions: vec![FloodsubSubscription {
109                            topic,
110                            action: FloodsubSubscriptionAction::Subscribe,
111                        }],
112                    },
113                });
114            }
115        }
116
117        if self.target_peers.insert(peer_id) {
118            self.events.push_back(ToSwarm::Dial {
119                opts: DialOpts::peer_id(peer_id).build(),
120            });
121        }
122    }
123
124    /// Remove a node from the list of nodes to propagate messages to.
125    #[inline]
126    pub fn remove_node_from_partial_view(&mut self, peer_id: &PeerId) {
127        self.target_peers.remove(peer_id);
128    }
129
130    /// Subscribes to a topic.
131    ///
132    /// Returns true if the subscription worked. Returns false if we were already subscribed.
133    pub fn subscribe(&mut self, topic: Topic) -> bool {
134        if self.subscribed_topics.iter().any(|t| t.id() == topic.id()) {
135            return false;
136        }
137
138        for peer in self.connected_peers.keys() {
139            self.events.push_back(ToSwarm::NotifyHandler {
140                peer_id: *peer,
141                handler: NotifyHandler::Any,
142                event: FloodsubRpc {
143                    messages: Vec::new(),
144                    subscriptions: vec![FloodsubSubscription {
145                        topic: topic.clone(),
146                        action: FloodsubSubscriptionAction::Subscribe,
147                    }],
148                },
149            });
150        }
151
152        self.subscribed_topics.push(topic);
153        true
154    }
155
156    /// Unsubscribes from a topic.
157    ///
158    /// Note that this only requires the topic name.
159    ///
160    /// Returns true if we were subscribed to this topic.
161    pub fn unsubscribe(&mut self, topic: Topic) -> bool {
162        let Some(pos) = self.subscribed_topics.iter().position(|t| *t == topic) else {
163            return false;
164        };
165
166        self.subscribed_topics.remove(pos);
167
168        for peer in self.connected_peers.keys() {
169            self.events.push_back(ToSwarm::NotifyHandler {
170                peer_id: *peer,
171                handler: NotifyHandler::Any,
172                event: FloodsubRpc {
173                    messages: Vec::new(),
174                    subscriptions: vec![FloodsubSubscription {
175                        topic: topic.clone(),
176                        action: FloodsubSubscriptionAction::Unsubscribe,
177                    }],
178                },
179            });
180        }
181
182        true
183    }
184
185    /// Publishes a message to the network, if we're subscribed to the topic only.
186    pub fn publish(&mut self, topic: impl Into<Topic>, data: impl Into<Bytes>) {
187        self.publish_many(iter::once(topic), data)
188    }
189
190    /// Publishes a message to the network, even if we're not subscribed to the topic.
191    pub fn publish_any(&mut self, topic: impl Into<Topic>, data: impl Into<Bytes>) {
192        self.publish_many_any(iter::once(topic), data)
193    }
194
195    /// Publishes a message with multiple topics to the network.
196    ///
197    ///
198    /// > **Note**: Doesn't do anything if we're not subscribed to any of the topics.
199    pub fn publish_many(
200        &mut self,
201        topic: impl IntoIterator<Item = impl Into<Topic>>,
202        data: impl Into<Bytes>,
203    ) {
204        self.publish_many_inner(topic, data, true)
205    }
206
207    /// Publishes a message with multiple topics to the network, even if we're not subscribed to any
208    /// of the topics.
209    pub fn publish_many_any(
210        &mut self,
211        topic: impl IntoIterator<Item = impl Into<Topic>>,
212        data: impl Into<Bytes>,
213    ) {
214        self.publish_many_inner(topic, data, false)
215    }
216
217    fn publish_many_inner(
218        &mut self,
219        topic: impl IntoIterator<Item = impl Into<Topic>>,
220        data: impl Into<Bytes>,
221        check_self_subscriptions: bool,
222    ) {
223        let message = FloodsubMessage {
224            source: self.config.local_peer_id,
225            data: data.into(),
226            // If the sequence numbers are predictable, then an attacker could flood the network
227            // with packets with the predetermined sequence numbers and absorb our legitimate
228            // messages. We therefore use a random number.
229            sequence_number: rand::random::<[u8; 20]>().to_vec(),
230            topics: topic.into_iter().map(Into::into).collect(),
231        };
232
233        let self_subscribed = self
234            .subscribed_topics
235            .iter()
236            .any(|t| message.topics.iter().any(|u| t == u));
237        if self_subscribed {
238            if let Err(e @ CuckooError::NotEnoughSpace) = self.received.add(&message) {
239                tracing::warn!(
240                    "Message was added to 'received' Cuckoofilter but some \
241                     other message was removed as a consequence: {}",
242                    e,
243                );
244            }
245            if self.config.subscribe_local_messages {
246                self.events
247                    .push_back(ToSwarm::GenerateEvent(Event::Message(message.clone())));
248            }
249        }
250        // Don't publish the message if we have to check subscriptions
251        // and we're not subscribed ourselves to any of the topics.
252        if check_self_subscriptions && !self_subscribed {
253            return;
254        }
255
256        // Send to peers we know are subscribed to the topic.
257        for (peer_id, sub_topic) in self.connected_peers.iter() {
258            // Peer must be in a communication list.
259            if !self.target_peers.contains(peer_id) {
260                continue;
261            }
262
263            // Peer must be subscribed for the topic.
264            if !sub_topic
265                .iter()
266                .any(|t| message.topics.iter().any(|u| t == u))
267            {
268                continue;
269            }
270
271            self.events.push_back(ToSwarm::NotifyHandler {
272                peer_id: *peer_id,
273                handler: NotifyHandler::Any,
274                event: FloodsubRpc {
275                    subscriptions: Vec::new(),
276                    messages: vec![message.clone()],
277                },
278            });
279        }
280    }
281
282    fn on_connection_established(
283        &mut self,
284        ConnectionEstablished {
285            peer_id,
286            other_established,
287            ..
288        }: ConnectionEstablished,
289    ) {
290        if other_established > 0 {
291            // We only care about the first time a peer connects.
292            return;
293        }
294
295        // We need to send our subscriptions to the newly-connected node.
296        if self.target_peers.contains(&peer_id) {
297            for topic in self.subscribed_topics.iter().cloned() {
298                self.events.push_back(ToSwarm::NotifyHandler {
299                    peer_id,
300                    handler: NotifyHandler::Any,
301                    event: FloodsubRpc {
302                        messages: Vec::new(),
303                        subscriptions: vec![FloodsubSubscription {
304                            topic,
305                            action: FloodsubSubscriptionAction::Subscribe,
306                        }],
307                    },
308                });
309            }
310        }
311
312        self.connected_peers.insert(peer_id, SmallVec::new());
313    }
314
315    fn on_connection_closed(
316        &mut self,
317        ConnectionClosed {
318            peer_id,
319            remaining_established,
320            ..
321        }: ConnectionClosed,
322    ) {
323        if remaining_established > 0 {
324            // we only care about peer disconnections
325            return;
326        }
327
328        let was_in = self.connected_peers.remove(&peer_id);
329        debug_assert!(was_in.is_some());
330
331        // We can be disconnected by the remote in case of inactivity for example, so we always
332        // try to reconnect.
333        if self.target_peers.contains(&peer_id) {
334            self.events.push_back(ToSwarm::Dial {
335                opts: DialOpts::peer_id(peer_id).build(),
336            });
337        }
338    }
339}
340
341impl NetworkBehaviour for Behaviour {
342    type ConnectionHandler = OneShotHandler<FloodsubProtocol, FloodsubRpc, InnerMessage>;
343    type ToSwarm = Event;
344
345    fn handle_established_inbound_connection(
346        &mut self,
347        _: ConnectionId,
348        _: PeerId,
349        _: &Multiaddr,
350        _: &Multiaddr,
351    ) -> Result<THandler<Self>, ConnectionDenied> {
352        Ok(Default::default())
353    }
354
355    fn handle_established_outbound_connection(
356        &mut self,
357        _: ConnectionId,
358        _: PeerId,
359        _: &Multiaddr,
360        _: Endpoint,
361        _: PortUse,
362    ) -> Result<THandler<Self>, ConnectionDenied> {
363        Ok(Default::default())
364    }
365
366    fn on_connection_handler_event(
367        &mut self,
368        propagation_source: PeerId,
369        connection_id: ConnectionId,
370        event: THandlerOutEvent<Self>,
371    ) {
372        // We ignore successful sends or timeouts.
373        let event = match event {
374            Ok(InnerMessage::Rx(event)) => event,
375            Ok(InnerMessage::Sent) => return,
376            Err(e) => {
377                tracing::debug!("Failed to send floodsub message: {e}");
378                self.events.push_back(ToSwarm::CloseConnection {
379                    peer_id: propagation_source,
380                    connection: CloseConnection::One(connection_id),
381                });
382                return;
383            }
384        };
385
386        // Update connected peers topics
387        for subscription in event.subscriptions {
388            let remote_peer_topics = self.connected_peers
389                .get_mut(&propagation_source)
390                .expect("connected_peers is kept in sync with the peers we are connected to; we are guaranteed to only receive events from connected peers; QED");
391            match subscription.action {
392                FloodsubSubscriptionAction::Subscribe => {
393                    if !remote_peer_topics.contains(&subscription.topic) {
394                        remote_peer_topics.push(subscription.topic.clone());
395                    }
396                    self.events
397                        .push_back(ToSwarm::GenerateEvent(Event::Subscribed {
398                            peer_id: propagation_source,
399                            topic: subscription.topic,
400                        }));
401                }
402                FloodsubSubscriptionAction::Unsubscribe => {
403                    if let Some(pos) = remote_peer_topics
404                        .iter()
405                        .position(|t| t == &subscription.topic)
406                    {
407                        remote_peer_topics.remove(pos);
408                    }
409                    self.events
410                        .push_back(ToSwarm::GenerateEvent(Event::Unsubscribed {
411                            peer_id: propagation_source,
412                            topic: subscription.topic,
413                        }));
414                }
415            }
416        }
417
418        // List of messages we're going to propagate on the network.
419        let mut rpcs_to_dispatch: Vec<(PeerId, FloodsubRpc)> = Vec::new();
420
421        for message in event.messages {
422            // Use `self.received` to skip the messages that we have already received in the past.
423            // Note that this can result in false positives.
424            match self.received.test_and_add(&message) {
425                Ok(true) => {}         // Message  was added.
426                Ok(false) => continue, // Message already existed.
427                Err(e @ CuckooError::NotEnoughSpace) => {
428                    // Message added, but some other removed.
429                    tracing::warn!(
430                        "Message was added to 'received' Cuckoofilter but some \
431                         other message was removed as a consequence: {}",
432                        e,
433                    );
434                }
435            }
436
437            // Add the message to be dispatched to the user.
438            if self
439                .subscribed_topics
440                .iter()
441                .any(|t| message.topics.iter().any(|u| t == u))
442            {
443                let event = Event::Message(message.clone());
444                self.events.push_back(ToSwarm::GenerateEvent(event));
445            }
446
447            // Propagate the message to everyone else who is subscribed to any of the topics.
448            for (peer_id, subscr_topics) in self.connected_peers.iter() {
449                if peer_id == &propagation_source {
450                    continue;
451                }
452
453                // Peer must be in a communication list.
454                if !self.target_peers.contains(peer_id) {
455                    continue;
456                }
457
458                // Peer must be subscribed for the topic.
459                if !subscr_topics
460                    .iter()
461                    .any(|t| message.topics.iter().any(|u| t == u))
462                {
463                    continue;
464                }
465
466                if let Some(pos) = rpcs_to_dispatch.iter().position(|(p, _)| p == peer_id) {
467                    rpcs_to_dispatch[pos].1.messages.push(message.clone());
468                } else {
469                    rpcs_to_dispatch.push((
470                        *peer_id,
471                        FloodsubRpc {
472                            subscriptions: Vec::new(),
473                            messages: vec![message.clone()],
474                        },
475                    ));
476                }
477            }
478        }
479
480        for (peer_id, rpc) in rpcs_to_dispatch {
481            self.events.push_back(ToSwarm::NotifyHandler {
482                peer_id,
483                handler: NotifyHandler::Any,
484                event: rpc,
485            });
486        }
487    }
488
489    #[tracing::instrument(level = "trace", name = "NetworkBehaviour::poll", skip(self))]
490    fn poll(&mut self, _: &mut Context<'_>) -> Poll<ToSwarm<Self::ToSwarm, THandlerInEvent<Self>>> {
491        if let Some(event) = self.events.pop_front() {
492            return Poll::Ready(event);
493        }
494
495        Poll::Pending
496    }
497
498    fn on_swarm_event(&mut self, event: FromSwarm) {
499        match event {
500            FromSwarm::ConnectionEstablished(connection_established) => {
501                self.on_connection_established(connection_established)
502            }
503            FromSwarm::ConnectionClosed(connection_closed) => {
504                self.on_connection_closed(connection_closed)
505            }
506            _ => {}
507        }
508    }
509}
510
511/// Transmission between the `OneShotHandler` and the `FloodsubHandler`.
512#[derive(Debug)]
513pub enum InnerMessage {
514    /// We received an RPC from a remote.
515    Rx(FloodsubRpc),
516    /// We successfully sent an RPC request.
517    Sent,
518}
519
520impl From<FloodsubRpc> for InnerMessage {
521    #[inline]
522    fn from(rpc: FloodsubRpc) -> InnerMessage {
523        InnerMessage::Rx(rpc)
524    }
525}
526
527impl From<()> for InnerMessage {
528    #[inline]
529    fn from(_: ()) -> InnerMessage {
530        InnerMessage::Sent
531    }
532}
533
534#[deprecated = "Use `Event` instead."]
535pub type FloodsubEvent = Event;
536
537/// Event that can happen on the floodsub behaviour.
538#[derive(Debug)]
539pub enum Event {
540    /// A message has been received.
541    Message(FloodsubMessage),
542
543    /// A remote subscribed to a topic.
544    Subscribed {
545        /// Remote that has subscribed.
546        peer_id: PeerId,
547        /// The topic it has subscribed to.
548        topic: Topic,
549    },
550
551    /// A remote unsubscribed from a topic.
552    Unsubscribed {
553        /// Remote that has unsubscribed.
554        peer_id: PeerId,
555        /// The topic it has subscribed from.
556        topic: Topic,
557    },
558}