libp2p_gossipsub/
types.rs

1// Copyright 2020 Sigma Prime Pty Ltd.
2//
3// Permission is hereby granted, free of charge, to any person obtaining a
4// copy of this software and associated documentation files (the "Software"),
5// to deal in the Software without restriction, including without limitation
6// the rights to use, copy, modify, merge, publish, distribute, sublicense,
7// and/or sell copies of the Software, and to permit persons to whom the
8// Software is furnished to do so, subject to the following conditions:
9//
10// The above copyright notice and this permission notice shall be included in
11// all copies or substantial portions of the Software.
12//
13// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
14// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
18// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
19// DEALINGS IN THE SOFTWARE.
20
21//! A collection of types using the Gossipsub system.
22use std::{collections::BTreeSet, fmt, fmt::Debug};
23
24use futures_timer::Delay;
25use hashlink::LinkedHashMap;
26use libp2p_identity::PeerId;
27use libp2p_swarm::ConnectionId;
28use quick_protobuf::MessageWrite;
29#[cfg(feature = "serde")]
30use serde::{Deserialize, Serialize};
31use web_time::Instant;
32
33use crate::{rpc::Sender, rpc_proto::proto, TopicHash};
34
35/// Messages that have expired while attempting to be sent to a peer.
36#[derive(Clone, Debug, Default)]
37pub struct FailedMessages {
38    /// The number of publish messages that failed to be published in a heartbeat.
39    pub publish: usize,
40    /// The number of forward messages that failed to be published in a heartbeat.
41    pub forward: usize,
42    /// The number of messages that were failed to be sent to the priority queue as it was full.
43    pub priority: usize,
44    /// The number of messages that were failed to be sent to the non-priority queue as it was
45    /// full.
46    pub non_priority: usize,
47    /// The number of messages that timed out and could not be sent.
48    pub timeout: usize,
49}
50
51impl FailedMessages {
52    /// The total number of messages that failed due to the queue being full.
53    pub fn total_queue_full(&self) -> usize {
54        self.priority + self.non_priority
55    }
56
57    /// The total failed messages in a heartbeat.
58    pub fn total(&self) -> usize {
59        self.priority + self.non_priority
60    }
61}
62
63#[derive(Debug)]
64/// Validation kinds from the application for received messages.
65pub enum MessageAcceptance {
66    /// The message is considered valid, and it should be delivered and forwarded to the network.
67    Accept,
68    /// The message is considered invalid, and it should be rejected and trigger the P₄ penalty.
69    Reject,
70    /// The message is neither delivered nor forwarded to the network, but the router does not
71    /// trigger the P₄ penalty.
72    Ignore,
73}
74
75#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
76#[derive(Clone, PartialEq, Eq, Hash, PartialOrd, Ord)]
77pub struct MessageId(pub Vec<u8>);
78
79impl MessageId {
80    pub fn new(value: &[u8]) -> Self {
81        Self(value.to_vec())
82    }
83}
84
85impl<T: Into<Vec<u8>>> From<T> for MessageId {
86    fn from(value: T) -> Self {
87        Self(value.into())
88    }
89}
90
91impl std::fmt::Display for MessageId {
92    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
93        write!(f, "{}", hex_fmt::HexFmt(&self.0))
94    }
95}
96
97impl std::fmt::Debug for MessageId {
98    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
99        write!(f, "MessageId({})", hex_fmt::HexFmt(&self.0))
100    }
101}
102
103#[derive(Debug)]
104/// Connected peer details.
105pub(crate) struct PeerDetails {
106    /// The kind of protocol the peer supports.
107    pub(crate) kind: PeerKind,
108    /// If the peer is an outbound connection.
109    pub(crate) outbound: bool,
110    /// Its current connections.
111    pub(crate) connections: Vec<ConnectionId>,
112    /// Subscribed topics.
113    pub(crate) topics: BTreeSet<TopicHash>,
114    /// The rpc sender to the connection handler(s).
115    pub(crate) sender: Sender,
116    /// Don't send messages.
117    pub(crate) dont_send: LinkedHashMap<MessageId, Instant>,
118}
119
120/// Describes the types of peers that can exist in the gossipsub context.
121#[derive(Debug, Clone, Copy, PartialEq, Hash, Eq)]
122#[cfg_attr(
123    feature = "metrics",
124    derive(prometheus_client::encoding::EncodeLabelValue)
125)]
126pub enum PeerKind {
127    /// A gossipsub 1.2 peer.
128    Gossipsubv1_2,
129    /// A gossipsub 1.1 peer.
130    Gossipsubv1_1,
131    /// A gossipsub 1.0 peer.
132    Gossipsub,
133    /// A floodsub peer.
134    Floodsub,
135    /// The peer doesn't support any of the protocols.
136    NotSupported,
137}
138
139/// A message received by the gossipsub system and stored locally in caches..
140#[derive(Clone, PartialEq, Eq, Hash, Debug)]
141pub struct RawMessage {
142    /// Id of the peer that published this message.
143    pub source: Option<PeerId>,
144
145    /// Content of the message. Its meaning is out of scope of this library.
146    pub data: Vec<u8>,
147
148    /// A random sequence number.
149    pub sequence_number: Option<u64>,
150
151    /// The topic this message belongs to
152    pub topic: TopicHash,
153
154    /// The signature of the message if it's signed.
155    pub signature: Option<Vec<u8>>,
156
157    /// The public key of the message if it is signed and the source [`PeerId`] cannot be inlined.
158    pub key: Option<Vec<u8>>,
159
160    /// Flag indicating if this message has been validated by the application or not.
161    pub validated: bool,
162}
163
164impl PeerKind {
165    /// Returns true if peer speaks any gossipsub version.
166    pub(crate) fn is_gossipsub(&self) -> bool {
167        matches!(
168            self,
169            Self::Gossipsubv1_2 | Self::Gossipsubv1_1 | Self::Gossipsub
170        )
171    }
172}
173
174impl RawMessage {
175    /// Calculates the encoded length of this message (used for calculating metrics).
176    pub fn raw_protobuf_len(&self) -> usize {
177        let message = proto::Message {
178            from: self.source.map(|m| m.to_bytes()),
179            data: Some(self.data.clone()),
180            seqno: self.sequence_number.map(|s| s.to_be_bytes().to_vec()),
181            topic: TopicHash::into_string(self.topic.clone()),
182            signature: self.signature.clone(),
183            key: self.key.clone(),
184        };
185        message.get_size()
186    }
187}
188
189impl From<RawMessage> for proto::Message {
190    fn from(raw: RawMessage) -> Self {
191        proto::Message {
192            from: raw.source.map(|m| m.to_bytes()),
193            data: Some(raw.data),
194            seqno: raw.sequence_number.map(|s| s.to_be_bytes().to_vec()),
195            topic: TopicHash::into_string(raw.topic),
196            signature: raw.signature,
197            key: raw.key,
198        }
199    }
200}
201
202/// The message sent to the user after a [`RawMessage`] has been transformed by a
203/// [`crate::DataTransform`].
204#[derive(Clone, PartialEq, Eq, Hash)]
205pub struct Message {
206    /// Id of the peer that published this message.
207    pub source: Option<PeerId>,
208
209    /// Content of the message.
210    pub data: Vec<u8>,
211
212    /// A random sequence number.
213    pub sequence_number: Option<u64>,
214
215    /// The topic this message belongs to
216    pub topic: TopicHash,
217}
218
219impl fmt::Debug for Message {
220    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
221        f.debug_struct("Message")
222            .field(
223                "data",
224                &format_args!("{:<20}", &hex_fmt::HexFmt(&self.data)),
225            )
226            .field("source", &self.source)
227            .field("sequence_number", &self.sequence_number)
228            .field("topic", &self.topic)
229            .finish()
230    }
231}
232
233/// A subscription received by the gossipsub system.
234#[derive(Debug, Clone, PartialEq, Eq, Hash)]
235pub struct Subscription {
236    /// Action to perform.
237    pub action: SubscriptionAction,
238    /// The topic from which to subscribe or unsubscribe.
239    pub topic_hash: TopicHash,
240}
241
242/// Action that a subscription wants to perform.
243#[derive(Debug, Clone, PartialEq, Eq, Hash)]
244pub enum SubscriptionAction {
245    /// The remote wants to subscribe to the given topic.
246    Subscribe,
247    /// The remote wants to unsubscribe from the given topic.
248    Unsubscribe,
249}
250
251#[derive(Debug, Clone, PartialEq, Eq, Hash)]
252pub(crate) struct PeerInfo {
253    pub(crate) peer_id: Option<PeerId>,
254    // TODO add this when RFC: Signed Address Records got added to the spec (see pull request
255    // https://github.com/libp2p/specs/pull/217)
256    // pub signed_peer_record: ?,
257}
258
259/// A Control message received by the gossipsub system.
260#[derive(Debug, Clone, PartialEq, Eq, Hash)]
261pub enum ControlAction {
262    /// Node broadcasts known messages per topic - IHave control message.
263    IHave(IHave),
264    /// The node requests specific message ids (peer_id + sequence _number) - IWant control
265    /// message.
266    IWant(IWant),
267    /// The node has been added to the mesh - Graft control message.
268    Graft(Graft),
269    /// The node has been removed from the mesh - Prune control message.
270    Prune(Prune),
271    /// The node requests us to not forward message ids (peer_id + sequence _number) - IDontWant
272    /// control message.
273    IDontWant(IDontWant),
274}
275
276/// Node broadcasts known messages per topic - IHave control message.
277#[derive(Debug, Clone, PartialEq, Eq, Hash)]
278pub struct IHave {
279    /// The topic of the messages.
280    pub(crate) topic_hash: TopicHash,
281    /// A list of known message ids (peer_id + sequence _number) as a string.
282    pub(crate) message_ids: Vec<MessageId>,
283}
284
285/// The node requests specific message ids (peer_id + sequence _number) - IWant control message.
286#[derive(Debug, Clone, PartialEq, Eq, Hash)]
287pub struct IWant {
288    /// A list of known message ids (peer_id + sequence _number) as a string.
289    pub(crate) message_ids: Vec<MessageId>,
290}
291
292/// The node has been added to the mesh - Graft control message.
293#[derive(Debug, Clone, PartialEq, Eq, Hash)]
294pub struct Graft {
295    /// The mesh topic the peer should be added to.
296    pub(crate) topic_hash: TopicHash,
297}
298
299/// The node has been removed from the mesh - Prune control message.
300#[derive(Debug, Clone, PartialEq, Eq, Hash)]
301pub struct Prune {
302    /// The mesh topic the peer should be removed from.
303    pub(crate) topic_hash: TopicHash,
304    /// A list of peers to be proposed to the removed peer as peer exchange
305    pub(crate) peers: Vec<PeerInfo>,
306    /// The backoff time in seconds before we allow to reconnect
307    pub(crate) backoff: Option<u64>,
308}
309
310/// The node requests us to not forward message ids - IDontWant control message.
311#[derive(Debug, Clone, PartialEq, Eq, Hash)]
312pub struct IDontWant {
313    /// A list of known message ids.
314    pub(crate) message_ids: Vec<MessageId>,
315}
316
317/// A Gossipsub RPC message sent.
318#[derive(Debug)]
319pub enum RpcOut {
320    /// Publish a Gossipsub message on network.`timeout` limits the duration the message
321    /// can wait to be sent before it is abandoned.
322    Publish { message: RawMessage, timeout: Delay },
323    /// Forward a Gossipsub message on network. `timeout` limits the duration the message
324    /// can wait to be sent before it is abandoned.
325    Forward { message: RawMessage, timeout: Delay },
326    /// Subscribe a topic.
327    Subscribe(TopicHash),
328    /// Unsubscribe a topic.
329    Unsubscribe(TopicHash),
330    /// Send a GRAFT control message.
331    Graft(Graft),
332    /// Send a PRUNE control message.
333    Prune(Prune),
334    /// Send a IHave control message.
335    IHave(IHave),
336    /// Send a IWant control message.
337    IWant(IWant),
338    /// The node requests us to not forward message ids (peer_id + sequence _number) - IDontWant
339    /// control message.
340    IDontWant(IDontWant),
341}
342
343impl RpcOut {
344    /// Converts the GossipsubRPC into its protobuf format.
345    // A convenience function to avoid explicitly specifying types.
346    pub fn into_protobuf(self) -> proto::RPC {
347        self.into()
348    }
349}
350
351impl From<RpcOut> for proto::RPC {
352    /// Converts the RPC into protobuf format.
353    fn from(rpc: RpcOut) -> Self {
354        match rpc {
355            RpcOut::Publish {
356                message,
357                timeout: _,
358            } => proto::RPC {
359                subscriptions: Vec::new(),
360                publish: vec![message.into()],
361                control: None,
362            },
363            RpcOut::Forward {
364                message,
365                timeout: _,
366            } => proto::RPC {
367                publish: vec![message.into()],
368                subscriptions: Vec::new(),
369                control: None,
370            },
371            RpcOut::Subscribe(topic) => proto::RPC {
372                publish: Vec::new(),
373                subscriptions: vec![proto::SubOpts {
374                    subscribe: Some(true),
375                    topic_id: Some(topic.into_string()),
376                }],
377                control: None,
378            },
379            RpcOut::Unsubscribe(topic) => proto::RPC {
380                publish: Vec::new(),
381                subscriptions: vec![proto::SubOpts {
382                    subscribe: Some(false),
383                    topic_id: Some(topic.into_string()),
384                }],
385                control: None,
386            },
387            RpcOut::IHave(IHave {
388                topic_hash,
389                message_ids,
390            }) => proto::RPC {
391                publish: Vec::new(),
392                subscriptions: Vec::new(),
393                control: Some(proto::ControlMessage {
394                    ihave: vec![proto::ControlIHave {
395                        topic_id: Some(topic_hash.into_string()),
396                        message_ids: message_ids.into_iter().map(|msg_id| msg_id.0).collect(),
397                    }],
398                    iwant: vec![],
399                    graft: vec![],
400                    prune: vec![],
401                    idontwant: vec![],
402                }),
403            },
404            RpcOut::IWant(IWant { message_ids }) => proto::RPC {
405                publish: Vec::new(),
406                subscriptions: Vec::new(),
407                control: Some(proto::ControlMessage {
408                    ihave: vec![],
409                    iwant: vec![proto::ControlIWant {
410                        message_ids: message_ids.into_iter().map(|msg_id| msg_id.0).collect(),
411                    }],
412                    graft: vec![],
413                    prune: vec![],
414                    idontwant: vec![],
415                }),
416            },
417            RpcOut::Graft(Graft { topic_hash }) => proto::RPC {
418                publish: Vec::new(),
419                subscriptions: vec![],
420                control: Some(proto::ControlMessage {
421                    ihave: vec![],
422                    iwant: vec![],
423                    graft: vec![proto::ControlGraft {
424                        topic_id: Some(topic_hash.into_string()),
425                    }],
426                    prune: vec![],
427                    idontwant: vec![],
428                }),
429            },
430            RpcOut::Prune(Prune {
431                topic_hash,
432                peers,
433                backoff,
434            }) => {
435                proto::RPC {
436                    publish: Vec::new(),
437                    subscriptions: vec![],
438                    control: Some(proto::ControlMessage {
439                        ihave: vec![],
440                        iwant: vec![],
441                        graft: vec![],
442                        prune: vec![proto::ControlPrune {
443                            topic_id: Some(topic_hash.into_string()),
444                            peers: peers
445                                .into_iter()
446                                .map(|info| proto::PeerInfo {
447                                    peer_id: info.peer_id.map(|id| id.to_bytes()),
448                                    // TODO, see https://github.com/libp2p/specs/pull/217
449                                    signed_peer_record: None,
450                                })
451                                .collect(),
452                            backoff,
453                        }],
454                        idontwant: vec![],
455                    }),
456                }
457            }
458            RpcOut::IDontWant(IDontWant { message_ids }) => proto::RPC {
459                publish: Vec::new(),
460                subscriptions: Vec::new(),
461                control: Some(proto::ControlMessage {
462                    ihave: vec![],
463                    iwant: vec![],
464                    graft: vec![],
465                    prune: vec![],
466                    idontwant: vec![proto::ControlIDontWant {
467                        message_ids: message_ids.into_iter().map(|msg_id| msg_id.0).collect(),
468                    }],
469                }),
470            },
471        }
472    }
473}
474
475/// A Gossipsub RPC message received.
476#[derive(Clone, PartialEq, Eq, Hash)]
477pub struct RpcIn {
478    /// List of messages that were part of this RPC query.
479    pub messages: Vec<RawMessage>,
480    /// List of subscriptions.
481    pub subscriptions: Vec<Subscription>,
482    /// List of Gossipsub control messages.
483    pub control_msgs: Vec<ControlAction>,
484}
485
486impl fmt::Debug for RpcIn {
487    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
488        let mut b = f.debug_struct("GossipsubRpc");
489        if !self.messages.is_empty() {
490            b.field("messages", &self.messages);
491        }
492        if !self.subscriptions.is_empty() {
493            b.field("subscriptions", &self.subscriptions);
494        }
495        if !self.control_msgs.is_empty() {
496            b.field("control_msgs", &self.control_msgs);
497        }
498        b.finish()
499    }
500}
501
502impl PeerKind {
503    pub fn as_static_ref(&self) -> &'static str {
504        match self {
505            Self::NotSupported => "Not Supported",
506            Self::Floodsub => "Floodsub",
507            Self::Gossipsub => "Gossipsub v1.0",
508            Self::Gossipsubv1_1 => "Gossipsub v1.1",
509            Self::Gossipsubv1_2 => "Gossipsub v1.2",
510        }
511    }
512}
513
514impl AsRef<str> for PeerKind {
515    fn as_ref(&self) -> &str {
516        self.as_static_ref()
517    }
518}
519
520impl fmt::Display for PeerKind {
521    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
522        f.write_str(self.as_ref())
523    }
524}