1use std::{collections::BTreeSet, fmt, fmt::Debug};
23
24use futures_timer::Delay;
25use hashlink::LinkedHashMap;
26use libp2p_identity::PeerId;
27use libp2p_swarm::ConnectionId;
28use quick_protobuf::MessageWrite;
29#[cfg(feature = "serde")]
30use serde::{Deserialize, Serialize};
31use web_time::Instant;
32
33use crate::{rpc::Sender, rpc_proto::proto, TopicHash};
34
35#[derive(Clone, Debug, Default)]
37pub struct FailedMessages {
38 pub publish: usize,
40 pub forward: usize,
42 pub priority: usize,
44 pub non_priority: usize,
47 pub timeout: usize,
49}
50
51impl FailedMessages {
52 pub fn total_queue_full(&self) -> usize {
54 self.priority + self.non_priority
55 }
56
57 pub fn total(&self) -> usize {
59 self.priority + self.non_priority
60 }
61}
62
63#[derive(Debug)]
64pub enum MessageAcceptance {
66 Accept,
68 Reject,
70 Ignore,
73}
74
75#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
76#[derive(Clone, PartialEq, Eq, Hash, PartialOrd, Ord)]
77pub struct MessageId(pub Vec<u8>);
78
79impl MessageId {
80 pub fn new(value: &[u8]) -> Self {
81 Self(value.to_vec())
82 }
83}
84
85impl<T: Into<Vec<u8>>> From<T> for MessageId {
86 fn from(value: T) -> Self {
87 Self(value.into())
88 }
89}
90
91impl std::fmt::Display for MessageId {
92 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
93 write!(f, "{}", hex_fmt::HexFmt(&self.0))
94 }
95}
96
97impl std::fmt::Debug for MessageId {
98 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
99 write!(f, "MessageId({})", hex_fmt::HexFmt(&self.0))
100 }
101}
102
103#[derive(Debug)]
104pub(crate) struct PeerDetails {
106 pub(crate) kind: PeerKind,
108 pub(crate) outbound: bool,
110 pub(crate) connections: Vec<ConnectionId>,
112 pub(crate) topics: BTreeSet<TopicHash>,
114 pub(crate) sender: Sender,
116 pub(crate) dont_send: LinkedHashMap<MessageId, Instant>,
118}
119
120#[derive(Debug, Clone, Copy, PartialEq, Hash, Eq)]
122#[cfg_attr(
123 feature = "metrics",
124 derive(prometheus_client::encoding::EncodeLabelValue)
125)]
126pub enum PeerKind {
127 Gossipsubv1_2,
129 Gossipsubv1_1,
131 Gossipsub,
133 Floodsub,
135 NotSupported,
137}
138
139#[derive(Clone, PartialEq, Eq, Hash, Debug)]
141pub struct RawMessage {
142 pub source: Option<PeerId>,
144
145 pub data: Vec<u8>,
147
148 pub sequence_number: Option<u64>,
150
151 pub topic: TopicHash,
153
154 pub signature: Option<Vec<u8>>,
156
157 pub key: Option<Vec<u8>>,
159
160 pub validated: bool,
162}
163
164impl PeerKind {
165 pub(crate) fn is_gossipsub(&self) -> bool {
167 matches!(
168 self,
169 Self::Gossipsubv1_2 | Self::Gossipsubv1_1 | Self::Gossipsub
170 )
171 }
172}
173
174impl RawMessage {
175 pub fn raw_protobuf_len(&self) -> usize {
177 let message = proto::Message {
178 from: self.source.map(|m| m.to_bytes()),
179 data: Some(self.data.clone()),
180 seqno: self.sequence_number.map(|s| s.to_be_bytes().to_vec()),
181 topic: TopicHash::into_string(self.topic.clone()),
182 signature: self.signature.clone(),
183 key: self.key.clone(),
184 };
185 message.get_size()
186 }
187}
188
189impl From<RawMessage> for proto::Message {
190 fn from(raw: RawMessage) -> Self {
191 proto::Message {
192 from: raw.source.map(|m| m.to_bytes()),
193 data: Some(raw.data),
194 seqno: raw.sequence_number.map(|s| s.to_be_bytes().to_vec()),
195 topic: TopicHash::into_string(raw.topic),
196 signature: raw.signature,
197 key: raw.key,
198 }
199 }
200}
201
202#[derive(Clone, PartialEq, Eq, Hash)]
205pub struct Message {
206 pub source: Option<PeerId>,
208
209 pub data: Vec<u8>,
211
212 pub sequence_number: Option<u64>,
214
215 pub topic: TopicHash,
217}
218
219impl fmt::Debug for Message {
220 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
221 f.debug_struct("Message")
222 .field(
223 "data",
224 &format_args!("{:<20}", &hex_fmt::HexFmt(&self.data)),
225 )
226 .field("source", &self.source)
227 .field("sequence_number", &self.sequence_number)
228 .field("topic", &self.topic)
229 .finish()
230 }
231}
232
233#[derive(Debug, Clone, PartialEq, Eq, Hash)]
235pub struct Subscription {
236 pub action: SubscriptionAction,
238 pub topic_hash: TopicHash,
240}
241
242#[derive(Debug, Clone, PartialEq, Eq, Hash)]
244pub enum SubscriptionAction {
245 Subscribe,
247 Unsubscribe,
249}
250
251#[derive(Debug, Clone, PartialEq, Eq, Hash)]
252pub(crate) struct PeerInfo {
253 pub(crate) peer_id: Option<PeerId>,
254 }
258
259#[derive(Debug, Clone, PartialEq, Eq, Hash)]
261pub enum ControlAction {
262 IHave(IHave),
264 IWant(IWant),
267 Graft(Graft),
269 Prune(Prune),
271 IDontWant(IDontWant),
274}
275
276#[derive(Debug, Clone, PartialEq, Eq, Hash)]
278pub struct IHave {
279 pub(crate) topic_hash: TopicHash,
281 pub(crate) message_ids: Vec<MessageId>,
283}
284
285#[derive(Debug, Clone, PartialEq, Eq, Hash)]
287pub struct IWant {
288 pub(crate) message_ids: Vec<MessageId>,
290}
291
292#[derive(Debug, Clone, PartialEq, Eq, Hash)]
294pub struct Graft {
295 pub(crate) topic_hash: TopicHash,
297}
298
299#[derive(Debug, Clone, PartialEq, Eq, Hash)]
301pub struct Prune {
302 pub(crate) topic_hash: TopicHash,
304 pub(crate) peers: Vec<PeerInfo>,
306 pub(crate) backoff: Option<u64>,
308}
309
310#[derive(Debug, Clone, PartialEq, Eq, Hash)]
312pub struct IDontWant {
313 pub(crate) message_ids: Vec<MessageId>,
315}
316
317#[derive(Debug)]
319pub enum RpcOut {
320 Publish { message: RawMessage, timeout: Delay },
323 Forward { message: RawMessage, timeout: Delay },
326 Subscribe(TopicHash),
328 Unsubscribe(TopicHash),
330 Graft(Graft),
332 Prune(Prune),
334 IHave(IHave),
336 IWant(IWant),
338 IDontWant(IDontWant),
341}
342
343impl RpcOut {
344 pub fn into_protobuf(self) -> proto::RPC {
347 self.into()
348 }
349}
350
351impl From<RpcOut> for proto::RPC {
352 fn from(rpc: RpcOut) -> Self {
354 match rpc {
355 RpcOut::Publish {
356 message,
357 timeout: _,
358 } => proto::RPC {
359 subscriptions: Vec::new(),
360 publish: vec![message.into()],
361 control: None,
362 },
363 RpcOut::Forward {
364 message,
365 timeout: _,
366 } => proto::RPC {
367 publish: vec![message.into()],
368 subscriptions: Vec::new(),
369 control: None,
370 },
371 RpcOut::Subscribe(topic) => proto::RPC {
372 publish: Vec::new(),
373 subscriptions: vec![proto::SubOpts {
374 subscribe: Some(true),
375 topic_id: Some(topic.into_string()),
376 }],
377 control: None,
378 },
379 RpcOut::Unsubscribe(topic) => proto::RPC {
380 publish: Vec::new(),
381 subscriptions: vec![proto::SubOpts {
382 subscribe: Some(false),
383 topic_id: Some(topic.into_string()),
384 }],
385 control: None,
386 },
387 RpcOut::IHave(IHave {
388 topic_hash,
389 message_ids,
390 }) => proto::RPC {
391 publish: Vec::new(),
392 subscriptions: Vec::new(),
393 control: Some(proto::ControlMessage {
394 ihave: vec![proto::ControlIHave {
395 topic_id: Some(topic_hash.into_string()),
396 message_ids: message_ids.into_iter().map(|msg_id| msg_id.0).collect(),
397 }],
398 iwant: vec![],
399 graft: vec![],
400 prune: vec![],
401 idontwant: vec![],
402 }),
403 },
404 RpcOut::IWant(IWant { message_ids }) => proto::RPC {
405 publish: Vec::new(),
406 subscriptions: Vec::new(),
407 control: Some(proto::ControlMessage {
408 ihave: vec![],
409 iwant: vec![proto::ControlIWant {
410 message_ids: message_ids.into_iter().map(|msg_id| msg_id.0).collect(),
411 }],
412 graft: vec![],
413 prune: vec![],
414 idontwant: vec![],
415 }),
416 },
417 RpcOut::Graft(Graft { topic_hash }) => proto::RPC {
418 publish: Vec::new(),
419 subscriptions: vec![],
420 control: Some(proto::ControlMessage {
421 ihave: vec![],
422 iwant: vec![],
423 graft: vec![proto::ControlGraft {
424 topic_id: Some(topic_hash.into_string()),
425 }],
426 prune: vec![],
427 idontwant: vec![],
428 }),
429 },
430 RpcOut::Prune(Prune {
431 topic_hash,
432 peers,
433 backoff,
434 }) => {
435 proto::RPC {
436 publish: Vec::new(),
437 subscriptions: vec![],
438 control: Some(proto::ControlMessage {
439 ihave: vec![],
440 iwant: vec![],
441 graft: vec![],
442 prune: vec![proto::ControlPrune {
443 topic_id: Some(topic_hash.into_string()),
444 peers: peers
445 .into_iter()
446 .map(|info| proto::PeerInfo {
447 peer_id: info.peer_id.map(|id| id.to_bytes()),
448 signed_peer_record: None,
450 })
451 .collect(),
452 backoff,
453 }],
454 idontwant: vec![],
455 }),
456 }
457 }
458 RpcOut::IDontWant(IDontWant { message_ids }) => proto::RPC {
459 publish: Vec::new(),
460 subscriptions: Vec::new(),
461 control: Some(proto::ControlMessage {
462 ihave: vec![],
463 iwant: vec![],
464 graft: vec![],
465 prune: vec![],
466 idontwant: vec![proto::ControlIDontWant {
467 message_ids: message_ids.into_iter().map(|msg_id| msg_id.0).collect(),
468 }],
469 }),
470 },
471 }
472 }
473}
474
475#[derive(Clone, PartialEq, Eq, Hash)]
477pub struct RpcIn {
478 pub messages: Vec<RawMessage>,
480 pub subscriptions: Vec<Subscription>,
482 pub control_msgs: Vec<ControlAction>,
484}
485
486impl fmt::Debug for RpcIn {
487 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
488 let mut b = f.debug_struct("GossipsubRpc");
489 if !self.messages.is_empty() {
490 b.field("messages", &self.messages);
491 }
492 if !self.subscriptions.is_empty() {
493 b.field("subscriptions", &self.subscriptions);
494 }
495 if !self.control_msgs.is_empty() {
496 b.field("control_msgs", &self.control_msgs);
497 }
498 b.finish()
499 }
500}
501
502impl PeerKind {
503 pub fn as_static_ref(&self) -> &'static str {
504 match self {
505 Self::NotSupported => "Not Supported",
506 Self::Floodsub => "Floodsub",
507 Self::Gossipsub => "Gossipsub v1.0",
508 Self::Gossipsubv1_1 => "Gossipsub v1.1",
509 Self::Gossipsubv1_2 => "Gossipsub v1.2",
510 }
511 }
512}
513
514impl AsRef<str> for PeerKind {
515 fn as_ref(&self) -> &str {
516 self.as_static_ref()
517 }
518}
519
520impl fmt::Display for PeerKind {
521 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
522 f.write_str(self.as_ref())
523 }
524}