1use std::{collections::BTreeSet, fmt, fmt::Debug};
23
24use futures_timer::Delay;
25use hashlink::LinkedHashMap;
26use libp2p_identity::PeerId;
27use libp2p_swarm::ConnectionId;
28use quick_protobuf::MessageWrite;
29#[cfg(feature = "serde")]
30use serde::{Deserialize, Serialize};
31use web_time::Instant;
32
33use crate::{rpc::Sender, rpc_proto::proto, TopicHash};
34
35#[derive(Clone, Debug, Default)]
37pub struct FailedMessages {
38    pub publish: usize,
40    pub forward: usize,
42    pub priority: usize,
44    pub non_priority: usize,
47    pub timeout: usize,
49}
50
51impl FailedMessages {
52    pub fn total_queue_full(&self) -> usize {
54        self.priority + self.non_priority
55    }
56
57    pub fn total(&self) -> usize {
59        self.priority + self.non_priority
60    }
61}
62
63#[derive(Debug)]
64pub enum MessageAcceptance {
66    Accept,
68    Reject,
70    Ignore,
73}
74
75#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
76#[derive(Clone, PartialEq, Eq, Hash, PartialOrd, Ord)]
77pub struct MessageId(pub Vec<u8>);
78
79impl MessageId {
80    pub fn new(value: &[u8]) -> Self {
81        Self(value.to_vec())
82    }
83}
84
85impl<T: Into<Vec<u8>>> From<T> for MessageId {
86    fn from(value: T) -> Self {
87        Self(value.into())
88    }
89}
90
91impl std::fmt::Display for MessageId {
92    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
93        write!(f, "{}", hex_fmt::HexFmt(&self.0))
94    }
95}
96
97impl std::fmt::Debug for MessageId {
98    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
99        write!(f, "MessageId({})", hex_fmt::HexFmt(&self.0))
100    }
101}
102
103#[derive(Debug)]
104pub(crate) struct PeerDetails {
106    pub(crate) kind: PeerKind,
108    pub(crate) outbound: bool,
110    pub(crate) connections: Vec<ConnectionId>,
112    pub(crate) topics: BTreeSet<TopicHash>,
114    pub(crate) sender: Sender,
116    pub(crate) dont_send: LinkedHashMap<MessageId, Instant>,
118}
119
120#[derive(Debug, Clone, Copy, PartialEq, Hash, Eq)]
122#[cfg_attr(
123    feature = "metrics",
124    derive(prometheus_client::encoding::EncodeLabelValue)
125)]
126pub enum PeerKind {
127    Gossipsubv1_2,
129    Gossipsubv1_1,
131    Gossipsub,
133    Floodsub,
135    NotSupported,
137}
138
139#[derive(Clone, PartialEq, Eq, Hash, Debug)]
141pub struct RawMessage {
142    pub source: Option<PeerId>,
144
145    pub data: Vec<u8>,
147
148    pub sequence_number: Option<u64>,
150
151    pub topic: TopicHash,
153
154    pub signature: Option<Vec<u8>>,
156
157    pub key: Option<Vec<u8>>,
159
160    pub validated: bool,
162}
163
164impl PeerKind {
165    pub(crate) fn is_gossipsub(&self) -> bool {
167        matches!(
168            self,
169            Self::Gossipsubv1_2 | Self::Gossipsubv1_1 | Self::Gossipsub
170        )
171    }
172}
173
174impl RawMessage {
175    pub fn raw_protobuf_len(&self) -> usize {
177        let message = proto::Message {
178            from: self.source.map(|m| m.to_bytes()),
179            data: Some(self.data.clone()),
180            seqno: self.sequence_number.map(|s| s.to_be_bytes().to_vec()),
181            topic: TopicHash::into_string(self.topic.clone()),
182            signature: self.signature.clone(),
183            key: self.key.clone(),
184        };
185        message.get_size()
186    }
187}
188
189impl From<RawMessage> for proto::Message {
190    fn from(raw: RawMessage) -> Self {
191        proto::Message {
192            from: raw.source.map(|m| m.to_bytes()),
193            data: Some(raw.data),
194            seqno: raw.sequence_number.map(|s| s.to_be_bytes().to_vec()),
195            topic: TopicHash::into_string(raw.topic),
196            signature: raw.signature,
197            key: raw.key,
198        }
199    }
200}
201
202#[derive(Clone, PartialEq, Eq, Hash)]
205pub struct Message {
206    pub source: Option<PeerId>,
208
209    pub data: Vec<u8>,
211
212    pub sequence_number: Option<u64>,
214
215    pub topic: TopicHash,
217}
218
219impl fmt::Debug for Message {
220    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
221        f.debug_struct("Message")
222            .field(
223                "data",
224                &format_args!("{:<20}", &hex_fmt::HexFmt(&self.data)),
225            )
226            .field("source", &self.source)
227            .field("sequence_number", &self.sequence_number)
228            .field("topic", &self.topic)
229            .finish()
230    }
231}
232
233#[derive(Debug, Clone, PartialEq, Eq, Hash)]
235pub struct Subscription {
236    pub action: SubscriptionAction,
238    pub topic_hash: TopicHash,
240}
241
242#[derive(Debug, Clone, PartialEq, Eq, Hash)]
244pub enum SubscriptionAction {
245    Subscribe,
247    Unsubscribe,
249}
250
251#[derive(Debug, Clone, PartialEq, Eq, Hash)]
252pub(crate) struct PeerInfo {
253    pub(crate) peer_id: Option<PeerId>,
254    }
258
259#[derive(Debug, Clone, PartialEq, Eq, Hash)]
261pub enum ControlAction {
262    IHave(IHave),
264    IWant(IWant),
267    Graft(Graft),
269    Prune(Prune),
271    IDontWant(IDontWant),
274}
275
276#[derive(Debug, Clone, PartialEq, Eq, Hash)]
278pub struct IHave {
279    pub(crate) topic_hash: TopicHash,
281    pub(crate) message_ids: Vec<MessageId>,
283}
284
285#[derive(Debug, Clone, PartialEq, Eq, Hash)]
287pub struct IWant {
288    pub(crate) message_ids: Vec<MessageId>,
290}
291
292#[derive(Debug, Clone, PartialEq, Eq, Hash)]
294pub struct Graft {
295    pub(crate) topic_hash: TopicHash,
297}
298
299#[derive(Debug, Clone, PartialEq, Eq, Hash)]
301pub struct Prune {
302    pub(crate) topic_hash: TopicHash,
304    pub(crate) peers: Vec<PeerInfo>,
306    pub(crate) backoff: Option<u64>,
308}
309
310#[derive(Debug, Clone, PartialEq, Eq, Hash)]
312pub struct IDontWant {
313    pub(crate) message_ids: Vec<MessageId>,
315}
316
317#[derive(Debug)]
319pub enum RpcOut {
320    Publish { message: RawMessage, timeout: Delay },
323    Forward { message: RawMessage, timeout: Delay },
326    Subscribe(TopicHash),
328    Unsubscribe(TopicHash),
330    Graft(Graft),
332    Prune(Prune),
334    IHave(IHave),
336    IWant(IWant),
338    IDontWant(IDontWant),
341}
342
343impl RpcOut {
344    pub fn into_protobuf(self) -> proto::RPC {
347        self.into()
348    }
349}
350
351impl From<RpcOut> for proto::RPC {
352    fn from(rpc: RpcOut) -> Self {
354        match rpc {
355            RpcOut::Publish {
356                message,
357                timeout: _,
358            } => proto::RPC {
359                subscriptions: Vec::new(),
360                publish: vec![message.into()],
361                control: None,
362            },
363            RpcOut::Forward {
364                message,
365                timeout: _,
366            } => proto::RPC {
367                publish: vec![message.into()],
368                subscriptions: Vec::new(),
369                control: None,
370            },
371            RpcOut::Subscribe(topic) => proto::RPC {
372                publish: Vec::new(),
373                subscriptions: vec![proto::SubOpts {
374                    subscribe: Some(true),
375                    topic_id: Some(topic.into_string()),
376                }],
377                control: None,
378            },
379            RpcOut::Unsubscribe(topic) => proto::RPC {
380                publish: Vec::new(),
381                subscriptions: vec![proto::SubOpts {
382                    subscribe: Some(false),
383                    topic_id: Some(topic.into_string()),
384                }],
385                control: None,
386            },
387            RpcOut::IHave(IHave {
388                topic_hash,
389                message_ids,
390            }) => proto::RPC {
391                publish: Vec::new(),
392                subscriptions: Vec::new(),
393                control: Some(proto::ControlMessage {
394                    ihave: vec![proto::ControlIHave {
395                        topic_id: Some(topic_hash.into_string()),
396                        message_ids: message_ids.into_iter().map(|msg_id| msg_id.0).collect(),
397                    }],
398                    iwant: vec![],
399                    graft: vec![],
400                    prune: vec![],
401                    idontwant: vec![],
402                }),
403            },
404            RpcOut::IWant(IWant { message_ids }) => proto::RPC {
405                publish: Vec::new(),
406                subscriptions: Vec::new(),
407                control: Some(proto::ControlMessage {
408                    ihave: vec![],
409                    iwant: vec![proto::ControlIWant {
410                        message_ids: message_ids.into_iter().map(|msg_id| msg_id.0).collect(),
411                    }],
412                    graft: vec![],
413                    prune: vec![],
414                    idontwant: vec![],
415                }),
416            },
417            RpcOut::Graft(Graft { topic_hash }) => proto::RPC {
418                publish: Vec::new(),
419                subscriptions: vec![],
420                control: Some(proto::ControlMessage {
421                    ihave: vec![],
422                    iwant: vec![],
423                    graft: vec![proto::ControlGraft {
424                        topic_id: Some(topic_hash.into_string()),
425                    }],
426                    prune: vec![],
427                    idontwant: vec![],
428                }),
429            },
430            RpcOut::Prune(Prune {
431                topic_hash,
432                peers,
433                backoff,
434            }) => {
435                proto::RPC {
436                    publish: Vec::new(),
437                    subscriptions: vec![],
438                    control: Some(proto::ControlMessage {
439                        ihave: vec![],
440                        iwant: vec![],
441                        graft: vec![],
442                        prune: vec![proto::ControlPrune {
443                            topic_id: Some(topic_hash.into_string()),
444                            peers: peers
445                                .into_iter()
446                                .map(|info| proto::PeerInfo {
447                                    peer_id: info.peer_id.map(|id| id.to_bytes()),
448                                    signed_peer_record: None,
450                                })
451                                .collect(),
452                            backoff,
453                        }],
454                        idontwant: vec![],
455                    }),
456                }
457            }
458            RpcOut::IDontWant(IDontWant { message_ids }) => proto::RPC {
459                publish: Vec::new(),
460                subscriptions: Vec::new(),
461                control: Some(proto::ControlMessage {
462                    ihave: vec![],
463                    iwant: vec![],
464                    graft: vec![],
465                    prune: vec![],
466                    idontwant: vec![proto::ControlIDontWant {
467                        message_ids: message_ids.into_iter().map(|msg_id| msg_id.0).collect(),
468                    }],
469                }),
470            },
471        }
472    }
473}
474
475#[derive(Clone, PartialEq, Eq, Hash)]
477pub struct RpcIn {
478    pub messages: Vec<RawMessage>,
480    pub subscriptions: Vec<Subscription>,
482    pub control_msgs: Vec<ControlAction>,
484}
485
486impl fmt::Debug for RpcIn {
487    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
488        let mut b = f.debug_struct("GossipsubRpc");
489        if !self.messages.is_empty() {
490            b.field("messages", &self.messages);
491        }
492        if !self.subscriptions.is_empty() {
493            b.field("subscriptions", &self.subscriptions);
494        }
495        if !self.control_msgs.is_empty() {
496            b.field("control_msgs", &self.control_msgs);
497        }
498        b.finish()
499    }
500}
501
502impl PeerKind {
503    pub fn as_static_ref(&self) -> &'static str {
504        match self {
505            Self::NotSupported => "Not Supported",
506            Self::Floodsub => "Floodsub",
507            Self::Gossipsub => "Gossipsub v1.0",
508            Self::Gossipsubv1_1 => "Gossipsub v1.1",
509            Self::Gossipsubv1_2 => "Gossipsub v1.2",
510        }
511    }
512}
513
514impl AsRef<str> for PeerKind {
515    fn as_ref(&self) -> &str {
516        self.as_static_ref()
517    }
518}
519
520impl fmt::Display for PeerKind {
521    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
522        f.write_str(self.as_ref())
523    }
524}