libp2p_gossipsub/
types.rs

1// Copyright 2020 Sigma Prime Pty Ltd.
2//
3// Permission is hereby granted, free of charge, to any person obtaining a
4// copy of this software and associated documentation files (the "Software"),
5// to deal in the Software without restriction, including without limitation
6// the rights to use, copy, modify, merge, publish, distribute, sublicense,
7// and/or sell copies of the Software, and to permit persons to whom the
8// Software is furnished to do so, subject to the following conditions:
9//
10// The above copyright notice and this permission notice shall be included in
11// all copies or substantial portions of the Software.
12//
13// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
14// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
18// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
19// DEALINGS IN THE SOFTWARE.
20
21//! A collection of types using the Gossipsub system.
22use std::{collections::BTreeSet, fmt, fmt::Debug};
23
24use futures_timer::Delay;
25use hashlink::LinkedHashMap;
26use libp2p_identity::PeerId;
27use libp2p_swarm::ConnectionId;
28use quick_protobuf::MessageWrite;
29#[cfg(feature = "serde")]
30use serde::{Deserialize, Serialize};
31use web_time::Instant;
32
33use crate::{rpc::Sender, rpc_proto::proto, TopicHash};
34
/// Messages that have expired while attempting to be sent to a peer.
///
/// All counters start at zero via [`Default`].
#[derive(Clone, Debug, Default)]
pub struct FailedMessages {
    /// The number of publish messages that failed to be published in a heartbeat.
    pub publish: usize,
    /// The number of forward messages that failed to be published in a heartbeat.
    pub forward: usize,
    /// The number of messages that failed to be sent to the priority queue as it was full.
    pub priority: usize,
    /// The number of messages that failed to be sent to the non-priority queue as it was
    /// full.
    pub non_priority: usize,
    /// The number of messages that timed out and could not be sent.
    pub timeout: usize,
}

impl FailedMessages {
    /// The total number of messages that failed due to the queue being full.
    ///
    /// This is the sum of the `priority` and `non_priority` counters.
    pub fn total_queue_full(&self) -> usize {
        self.priority + self.non_priority
    }

    /// The total failed messages in a heartbeat.
    ///
    /// Sums both failure causes tracked by this struct: messages rejected because a queue
    /// was full ([`Self::total_queue_full`]) and messages that timed out (`timeout`).
    pub fn total(&self) -> usize {
        // NOTE(review): `publish` / `forward` are presumably a per-kind breakdown of these
        // same failures, so they are not added here to avoid double counting — confirm
        // against the code that increments the counters.
        self.total_queue_full() + self.timeout
    }
}
62
#[derive(Debug)]
/// Validation kinds from the application for received messages.
///
/// Each variant states what should happen to the message (delivery and forwarding) and
/// whether the P₄ penalty is triggered.
pub enum MessageAcceptance {
    /// The message is considered valid, and it should be delivered and forwarded to the network.
    Accept,
    /// The message is considered invalid, and it should be rejected and trigger the P₄ penalty.
    Reject,
    /// The message is neither delivered nor forwarded to the network, but the router does not
    /// trigger the P₄ penalty.
    Ignore,
}
74
/// Identifier of a message, stored as an opaque sequence of bytes.
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
#[derive(Clone, PartialEq, Eq, Hash, PartialOrd, Ord)]
pub struct MessageId(pub Vec<u8>);

impl MessageId {
    /// Builds a `MessageId` by copying the given bytes.
    pub fn new(value: &[u8]) -> Self {
        MessageId(Vec::from(value))
    }
}

/// Any value convertible into a byte vector can be turned into a `MessageId`.
impl<T> From<T> for MessageId
where
    T: Into<Vec<u8>>,
{
    fn from(value: T) -> Self {
        let bytes: Vec<u8> = value.into();
        MessageId(bytes)
    }
}
90
91impl std::fmt::Display for MessageId {
92    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
93        write!(f, "{}", hex_fmt::HexFmt(&self.0))
94    }
95}
96
97impl std::fmt::Debug for MessageId {
98    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
99        write!(f, "MessageId({})", hex_fmt::HexFmt(&self.0))
100    }
101}
102
#[derive(Debug)]
/// Connected peer details.
///
/// Crate-internal record grouping what is tracked about a connected peer: protocol kind,
/// connections, subscriptions, the RPC sender and the "don't send" message ids.
pub(crate) struct PeerDetails {
    /// The kind of protocol the peer supports.
    pub(crate) kind: PeerKind,
    /// If the peer is an outbound connection.
    pub(crate) outbound: bool,
    /// Its current connections.
    pub(crate) connections: Vec<ConnectionId>,
    /// Subscribed topics.
    pub(crate) topics: BTreeSet<TopicHash>,
    /// The rpc sender to the connection handler(s).
    pub(crate) sender: Sender,
    /// Ids of messages that should not be sent to this peer, each paired with an [`Instant`].
    // NOTE(review): presumably filled from received IDONTWANT messages, with the instant
    // used to expire entries — confirm against the behaviour code.
    pub(crate) dont_send: LinkedHashMap<MessageId, Instant>,
}
119
/// Describes the types of peers that can exist in the gossipsub context.
///
/// When the `metrics` feature is enabled the type additionally derives
/// `prometheus_client::encoding::EncodeLabelValue`.
#[derive(Debug, Clone, Copy, PartialEq, Hash, Eq)]
#[cfg_attr(
    feature = "metrics",
    derive(prometheus_client::encoding::EncodeLabelValue)
)]
pub enum PeerKind {
    /// A gossipsub 1.2 peer.
    Gossipsubv1_2,
    /// A gossipsub 1.1 peer.
    Gossipsubv1_1,
    /// A gossipsub 1.0 peer.
    Gossipsub,
    /// A floodsub peer.
    Floodsub,
    /// The peer doesn't support any of the protocols.
    NotSupported,
}
138
/// A message received by the gossipsub system and stored locally in caches.
///
/// Carries the signature and key material alongside the payload, plus a flag recording
/// whether the application has validated it.
#[derive(Clone, PartialEq, Eq, Hash, Debug)]
pub struct RawMessage {
    /// Id of the peer that published this message.
    pub source: Option<PeerId>,

    /// Content of the message. Its meaning is out of scope of this library.
    pub data: Vec<u8>,

    /// A random sequence number.
    pub sequence_number: Option<u64>,

    /// The topic this message belongs to.
    pub topic: TopicHash,

    /// The signature of the message if it's signed.
    pub signature: Option<Vec<u8>>,

    /// The public key of the message if it is signed and the source [`PeerId`] cannot be inlined.
    pub key: Option<Vec<u8>>,

    /// Flag indicating if this message has been validated by the application or not.
    pub validated: bool,
}
163
164impl PeerKind {
165    /// Returns true if peer speaks any gossipsub version.
166    pub(crate) fn is_gossipsub(&self) -> bool {
167        matches!(
168            self,
169            Self::Gossipsubv1_2 | Self::Gossipsubv1_1 | Self::Gossipsub
170        )
171    }
172}
173
174impl RawMessage {
175    /// Calculates the encoded length of this message (used for calculating metrics).
176    pub fn raw_protobuf_len(&self) -> usize {
177        let message = proto::Message {
178            from: self.source.map(|m| m.to_bytes()),
179            data: Some(self.data.clone()),
180            seqno: self.sequence_number.map(|s| s.to_be_bytes().to_vec()),
181            topic: TopicHash::into_string(self.topic.clone()),
182            signature: self.signature.clone(),
183            key: self.key.clone(),
184        };
185        message.get_size()
186    }
187}
188
189impl From<RawMessage> for proto::Message {
190    fn from(raw: RawMessage) -> Self {
191        proto::Message {
192            from: raw.source.map(|m| m.to_bytes()),
193            data: Some(raw.data),
194            seqno: raw.sequence_number.map(|s| s.to_be_bytes().to_vec()),
195            topic: TopicHash::into_string(raw.topic),
196            signature: raw.signature,
197            key: raw.key,
198        }
199    }
200}
201
/// The message sent to the user after a [`RawMessage`] has been transformed by a
/// [`crate::DataTransform`].
///
/// Unlike [`RawMessage`] it carries no signature, key or validation flag.
#[derive(Clone, PartialEq, Eq, Hash)]
pub struct Message {
    /// Id of the peer that published this message.
    pub source: Option<PeerId>,

    /// Content of the message.
    pub data: Vec<u8>,

    /// A random sequence number.
    pub sequence_number: Option<u64>,

    /// The topic this message belongs to.
    pub topic: TopicHash,
}
218
219impl fmt::Debug for Message {
220    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
221        f.debug_struct("Message")
222            .field(
223                "data",
224                &format_args!("{:<20}", &hex_fmt::HexFmt(&self.data)),
225            )
226            .field("source", &self.source)
227            .field("sequence_number", &self.sequence_number)
228            .field("topic", &self.topic)
229            .finish()
230    }
231}
232
/// A subscription received by the gossipsub system.
///
/// Pairs a topic with the action (subscribe or unsubscribe) requested for it.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct Subscription {
    /// Action to perform.
    pub action: SubscriptionAction,
    /// The topic from which to subscribe or unsubscribe.
    pub topic_hash: TopicHash,
}
241
/// Action that a subscription wants to perform.
///
/// Used as the `action` field of [`Subscription`].
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum SubscriptionAction {
    /// The remote wants to subscribe to the given topic.
    Subscribe,
    /// The remote wants to unsubscribe from the given topic.
    Unsubscribe,
}
250
/// Information about a peer, as carried in the `peers` list of a [`Prune`] message.
///
/// Currently this holds only an optional peer id.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub(crate) struct PeerInfo {
    /// Id of the peer, if present.
    pub(crate) peer_id: Option<PeerId>,
    // TODO add this when RFC: Signed Address Records got added to the spec (see pull request
    // https://github.com/libp2p/specs/pull/217)
    // pub signed_peer_record: ?,
}
258
/// A Control message received by the gossipsub system.
///
/// Each variant wraps the struct holding that control message's payload.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum ControlAction {
    /// Node broadcasts known messages per topic - IHave control message.
    IHave(IHave),
    /// The node requests specific message ids (peer_id + sequence_number) - IWant control
    /// message.
    IWant(IWant),
    /// The node has been added to the mesh - Graft control message.
    Graft(Graft),
    /// The node has been removed from the mesh - Prune control message.
    Prune(Prune),
    /// The node requests us to not forward message ids (peer_id + sequence_number) - IDontWant
    /// control message.
    IDontWant(IDontWant),
}
275
/// Node broadcasts known messages per topic - IHave control message.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct IHave {
    /// The topic of the messages.
    pub(crate) topic_hash: TopicHash,
    /// A list of known message ids (peer_id + sequence_number), each stored as a
    /// [`MessageId`].
    pub(crate) message_ids: Vec<MessageId>,
}
284
/// The node requests specific message ids (peer_id + sequence_number) - IWant control message.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct IWant {
    /// The list of requested message ids (peer_id + sequence_number), each stored as a
    /// [`MessageId`].
    pub(crate) message_ids: Vec<MessageId>,
}
291
/// The node has been added to the mesh - Graft control message.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct Graft {
    /// The mesh topic the peer should be added to.
    pub(crate) topic_hash: TopicHash,
}
298
/// The node has been removed from the mesh - Prune control message.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct Prune {
    /// The mesh topic the peer should be removed from.
    pub(crate) topic_hash: TopicHash,
    /// A list of peers to be proposed to the removed peer as peer exchange.
    pub(crate) peers: Vec<PeerInfo>,
    /// The backoff time in seconds before we allow to reconnect; `None` when no backoff is
    /// specified.
    pub(crate) backoff: Option<u64>,
}
309
/// The node requests us to not forward message ids - IDontWant control message.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct IDontWant {
    /// The list of message ids that should not be forwarded to the requesting node.
    pub(crate) message_ids: Vec<MessageId>,
}
316
/// A Gossipsub RPC message sent.
///
/// Each variant describes a single outgoing item; it is turned into a `proto::RPC` via
/// [`RpcOut::into_protobuf`].
#[derive(Debug)]
pub enum RpcOut {
    /// Publish a Gossipsub message on network. `timeout` limits the duration the message
    /// can wait to be sent before it is abandoned.
    Publish { message: RawMessage, timeout: Delay },
    /// Forward a Gossipsub message on network. `timeout` limits the duration the message
    /// can wait to be sent before it is abandoned.
    Forward { message: RawMessage, timeout: Delay },
    /// Subscribe a topic.
    Subscribe(TopicHash),
    /// Unsubscribe a topic.
    Unsubscribe(TopicHash),
    /// Send a GRAFT control message.
    Graft(Graft),
    /// Send a PRUNE control message.
    Prune(Prune),
    /// Send a IHave control message.
    IHave(IHave),
    /// Send a IWant control message.
    IWant(IWant),
    /// Send a IDontWant control message, asking the receiver to not forward the listed
    /// message ids (peer_id + sequence_number) to us.
    IDontWant(IDontWant),
}
342
343impl RpcOut {
344    /// Converts the GossipsubRPC into its protobuf format.
345    // A convenience function to avoid explicitly specifying types.
346    pub fn into_protobuf(self) -> proto::RPC {
347        self.into()
348    }
349}
350
351impl From<RpcOut> for proto::RPC {
352    /// Converts the RPC into protobuf format.
353    fn from(rpc: RpcOut) -> Self {
354        match rpc {
355            RpcOut::Publish {
356                message,
357                timeout: _,
358            } => proto::RPC {
359                subscriptions: Vec::new(),
360                publish: vec![message.into()],
361                control: None,
362            },
363            RpcOut::Forward {
364                message,
365                timeout: _,
366            } => proto::RPC {
367                publish: vec![message.into()],
368                subscriptions: Vec::new(),
369                control: None,
370            },
371            RpcOut::Subscribe(topic) => proto::RPC {
372                publish: Vec::new(),
373                subscriptions: vec![proto::SubOpts {
374                    subscribe: Some(true),
375                    topic_id: Some(topic.into_string()),
376                }],
377                control: None,
378            },
379            RpcOut::Unsubscribe(topic) => proto::RPC {
380                publish: Vec::new(),
381                subscriptions: vec![proto::SubOpts {
382                    subscribe: Some(false),
383                    topic_id: Some(topic.into_string()),
384                }],
385                control: None,
386            },
387            RpcOut::IHave(IHave {
388                topic_hash,
389                message_ids,
390            }) => proto::RPC {
391                publish: Vec::new(),
392                subscriptions: Vec::new(),
393                control: Some(proto::ControlMessage {
394                    ihave: vec![proto::ControlIHave {
395                        topic_id: Some(topic_hash.into_string()),
396                        message_ids: message_ids.into_iter().map(|msg_id| msg_id.0).collect(),
397                    }],
398                    iwant: vec![],
399                    graft: vec![],
400                    prune: vec![],
401                    idontwant: vec![],
402                }),
403            },
404            RpcOut::IWant(IWant { message_ids }) => proto::RPC {
405                publish: Vec::new(),
406                subscriptions: Vec::new(),
407                control: Some(proto::ControlMessage {
408                    ihave: vec![],
409                    iwant: vec![proto::ControlIWant {
410                        message_ids: message_ids.into_iter().map(|msg_id| msg_id.0).collect(),
411                    }],
412                    graft: vec![],
413                    prune: vec![],
414                    idontwant: vec![],
415                }),
416            },
417            RpcOut::Graft(Graft { topic_hash }) => proto::RPC {
418                publish: Vec::new(),
419                subscriptions: vec![],
420                control: Some(proto::ControlMessage {
421                    ihave: vec![],
422                    iwant: vec![],
423                    graft: vec![proto::ControlGraft {
424                        topic_id: Some(topic_hash.into_string()),
425                    }],
426                    prune: vec![],
427                    idontwant: vec![],
428                }),
429            },
430            RpcOut::Prune(Prune {
431                topic_hash,
432                peers,
433                backoff,
434            }) => {
435                proto::RPC {
436                    publish: Vec::new(),
437                    subscriptions: vec![],
438                    control: Some(proto::ControlMessage {
439                        ihave: vec![],
440                        iwant: vec![],
441                        graft: vec![],
442                        prune: vec![proto::ControlPrune {
443                            topic_id: Some(topic_hash.into_string()),
444                            peers: peers
445                                .into_iter()
446                                .map(|info| proto::PeerInfo {
447                                    peer_id: info.peer_id.map(|id| id.to_bytes()),
448                                    // TODO, see https://github.com/libp2p/specs/pull/217
449                                    signed_peer_record: None,
450                                })
451                                .collect(),
452                            backoff,
453                        }],
454                        idontwant: vec![],
455                    }),
456                }
457            }
458            RpcOut::IDontWant(IDontWant { message_ids }) => proto::RPC {
459                publish: Vec::new(),
460                subscriptions: Vec::new(),
461                control: Some(proto::ControlMessage {
462                    ihave: vec![],
463                    iwant: vec![],
464                    graft: vec![],
465                    prune: vec![],
466                    idontwant: vec![proto::ControlIDontWant {
467                        message_ids: message_ids.into_iter().map(|msg_id| msg_id.0).collect(),
468                    }],
469                }),
470            },
471        }
472    }
473}
474
/// An RPC received/sent.
///
/// Groups the messages, subscription changes and control messages carried by one RPC.
#[derive(Clone, PartialEq, Eq, Hash)]
pub struct Rpc {
    /// List of messages that were part of this RPC query.
    pub messages: Vec<RawMessage>,
    /// List of subscriptions.
    pub subscriptions: Vec<Subscription>,
    /// List of Gossipsub control messages.
    pub control_msgs: Vec<ControlAction>,
}
485
486impl Rpc {
487    /// Converts the GossipsubRPC into its protobuf format.
488    // A convenience function to avoid explicitly specifying types.
489    pub fn into_protobuf(self) -> proto::RPC {
490        self.into()
491    }
492}
493
494impl From<Rpc> for proto::RPC {
495    /// Converts the RPC into protobuf format.
496    fn from(rpc: Rpc) -> Self {
497        // Messages
498        let mut publish = Vec::new();
499
500        for message in rpc.messages.into_iter() {
501            let message = proto::Message {
502                from: message.source.map(|m| m.to_bytes()),
503                data: Some(message.data),
504                seqno: message.sequence_number.map(|s| s.to_be_bytes().to_vec()),
505                topic: TopicHash::into_string(message.topic),
506                signature: message.signature,
507                key: message.key,
508            };
509
510            publish.push(message);
511        }
512
513        // subscriptions
514        let subscriptions = rpc
515            .subscriptions
516            .into_iter()
517            .map(|sub| proto::SubOpts {
518                subscribe: Some(sub.action == SubscriptionAction::Subscribe),
519                topic_id: Some(sub.topic_hash.into_string()),
520            })
521            .collect::<Vec<_>>();
522
523        // control messages
524        let mut control = proto::ControlMessage {
525            ihave: Vec::new(),
526            iwant: Vec::new(),
527            graft: Vec::new(),
528            prune: Vec::new(),
529            idontwant: Vec::new(),
530        };
531
532        let empty_control_msg = rpc.control_msgs.is_empty();
533
534        for action in rpc.control_msgs {
535            match action {
536                // collect all ihave messages
537                ControlAction::IHave(IHave {
538                    topic_hash,
539                    message_ids,
540                }) => {
541                    let rpc_ihave = proto::ControlIHave {
542                        topic_id: Some(topic_hash.into_string()),
543                        message_ids: message_ids.into_iter().map(|msg_id| msg_id.0).collect(),
544                    };
545                    control.ihave.push(rpc_ihave);
546                }
547                ControlAction::IWant(IWant { message_ids }) => {
548                    let rpc_iwant = proto::ControlIWant {
549                        message_ids: message_ids.into_iter().map(|msg_id| msg_id.0).collect(),
550                    };
551                    control.iwant.push(rpc_iwant);
552                }
553                ControlAction::Graft(Graft { topic_hash }) => {
554                    let rpc_graft = proto::ControlGraft {
555                        topic_id: Some(topic_hash.into_string()),
556                    };
557                    control.graft.push(rpc_graft);
558                }
559                ControlAction::Prune(Prune {
560                    topic_hash,
561                    peers,
562                    backoff,
563                }) => {
564                    let rpc_prune = proto::ControlPrune {
565                        topic_id: Some(topic_hash.into_string()),
566                        peers: peers
567                            .into_iter()
568                            .map(|info| proto::PeerInfo {
569                                peer_id: info.peer_id.map(|id| id.to_bytes()),
570                                // TODO, see https://github.com/libp2p/specs/pull/217
571                                signed_peer_record: None,
572                            })
573                            .collect(),
574                        backoff,
575                    };
576                    control.prune.push(rpc_prune);
577                }
578                ControlAction::IDontWant(IDontWant { message_ids }) => {
579                    let rpc_idontwant = proto::ControlIDontWant {
580                        message_ids: message_ids.into_iter().map(|msg_id| msg_id.0).collect(),
581                    };
582                    control.idontwant.push(rpc_idontwant);
583                }
584            }
585        }
586
587        proto::RPC {
588            subscriptions,
589            publish,
590            control: if empty_control_msg {
591                None
592            } else {
593                Some(control)
594            },
595        }
596    }
597}
598
599impl fmt::Debug for Rpc {
600    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
601        let mut b = f.debug_struct("GossipsubRpc");
602        if !self.messages.is_empty() {
603            b.field("messages", &self.messages);
604        }
605        if !self.subscriptions.is_empty() {
606            b.field("subscriptions", &self.subscriptions);
607        }
608        if !self.control_msgs.is_empty() {
609            b.field("control_msgs", &self.control_msgs);
610        }
611        b.finish()
612    }
613}
614
615impl PeerKind {
616    pub fn as_static_ref(&self) -> &'static str {
617        match self {
618            Self::NotSupported => "Not Supported",
619            Self::Floodsub => "Floodsub",
620            Self::Gossipsub => "Gossipsub v1.0",
621            Self::Gossipsubv1_1 => "Gossipsub v1.1",
622            Self::Gossipsubv1_2 => "Gossipsub v1.2",
623        }
624    }
625}
626
627impl AsRef<str> for PeerKind {
628    fn as_ref(&self) -> &str {
629        self.as_static_ref()
630    }
631}
632
633impl fmt::Display for PeerKind {
634    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
635        f.write_str(self.as_ref())
636    }
637}