1use std::{collections::BTreeSet, fmt, fmt::Debug};
23
24use futures_timer::Delay;
25use hashlink::LinkedHashMap;
26use libp2p_identity::PeerId;
27use libp2p_swarm::ConnectionId;
28use quick_protobuf::MessageWrite;
29#[cfg(feature = "serde")]
30use serde::{Deserialize, Serialize};
31use web_time::Instant;
32
33use crate::{rpc::Sender, rpc_proto::proto, TopicHash};
34
/// Counters describing messages that could not be delivered.
///
/// NOTE(review): the per-field descriptions are inferred from the field names and from how
/// the totals below combine them; confirm against the code that increments these counters.
#[derive(Clone, Debug, Default)]
pub struct FailedMessages {
    /// Failed messages classified as publishes.
    pub publish: usize,
    /// Failed messages classified as forwards.
    pub forward: usize,
    /// Failures attributed to the priority queue (counted by `total_queue_full`).
    pub priority: usize,
    /// Failures attributed to the non-priority queue (counted by `total_queue_full`).
    pub non_priority: usize,
    /// Failures attributed to a timeout rather than to a full queue.
    pub timeout: usize,
}

impl FailedMessages {
    /// Number of failures attributed to either queue: `priority + non_priority`.
    pub fn total_queue_full(&self) -> usize {
        self.priority + self.non_priority
    }

    /// Total number of failed messages: queue failures plus timeouts.
    ///
    /// Previously this returned the same value as [`Self::total_queue_full`], which left
    /// `timeout` out of the total. `publish` and `forward` are not added here because they
    /// appear to classify the same failures by message kind rather than count additional
    /// ones — TODO confirm against the call sites.
    pub fn total(&self) -> usize {
        self.total_queue_full() + self.timeout
    }
}
62
/// Three-way verdict on a message.
///
/// NOTE(review): how each verdict affects delivery, forwarding and peer scoring is decided
/// by code outside this file; confirm there before relying on the summaries below.
#[derive(Debug)]
pub enum MessageAcceptance {
    /// The message is accepted.
    Accept,
    /// The message is rejected.
    Reject,
    /// The message is ignored (presumably neither accepted nor treated as a rejection).
    Ignore,
}
74
/// Identifier of a message, stored as an owned byte vector.
///
/// Ordering, equality and hashing are all derived from the underlying bytes.
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
#[derive(Clone, PartialEq, Eq, Hash, PartialOrd, Ord)]
pub struct MessageId(pub Vec<u8>);

impl MessageId {
    /// Builds an id by copying `value` into a freshly allocated vector.
    pub fn new(value: &[u8]) -> Self {
        let bytes: Vec<u8> = value.to_owned();
        MessageId(bytes)
    }
}

/// Any value convertible into `Vec<u8>` (e.g. `Vec<u8>`, `&[u8]`, `String`, `&str`) can be
/// turned into a [`MessageId`].
impl<T: Into<Vec<u8>>> From<T> for MessageId {
    fn from(value: T) -> Self {
        let bytes: Vec<u8> = value.into();
        MessageId(bytes)
    }
}
90
91impl std::fmt::Display for MessageId {
92 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
93 write!(f, "{}", hex_fmt::HexFmt(&self.0))
94 }
95}
96
97impl std::fmt::Debug for MessageId {
98 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
99 write!(f, "MessageId({})", hex_fmt::HexFmt(&self.0))
100 }
101}
102
/// Crate-internal bookkeeping kept for a peer.
///
/// NOTE(review): field descriptions are inferred from names and types; the code that
/// maintains them lives outside this file.
#[derive(Debug)]
pub(crate) struct PeerDetails {
    /// Protocol kind recorded for the peer.
    pub(crate) kind: PeerKind,
    /// Outbound flag (presumably whether the connection was dialed by us — confirm).
    pub(crate) outbound: bool,
    /// Connection ids associated with the peer.
    pub(crate) connections: Vec<ConnectionId>,
    /// Topics associated with the peer (presumably its subscriptions — confirm).
    pub(crate) topics: BTreeSet<TopicHash>,
    /// RPC sender handle for this peer.
    pub(crate) sender: Sender,
    /// Message ids mapped to an `Instant`, kept in insertion order
    /// (presumably ids the peer asked not to receive — confirm).
    pub(crate) dont_send: LinkedHashMap<MessageId, Instant>,
}
119
/// Protocol kind of a peer.
///
/// The human-readable label of each variant is provided by `PeerKind::as_static_ref`.
#[derive(Debug, Clone, Copy, PartialEq, Hash, Eq)]
#[cfg_attr(
    feature = "metrics",
    derive(prometheus_client::encoding::EncodeLabelValue)
)]
pub enum PeerKind {
    /// Gossipsub v1.2 peer.
    Gossipsubv1_2,
    /// Gossipsub v1.1 peer.
    Gossipsubv1_1,
    /// Gossipsub v1.0 peer.
    Gossipsub,
    /// Floodsub peer.
    Floodsub,
    /// Peer whose protocol is not supported.
    NotSupported,
}
138
/// A message in its raw form, mirroring the fields of the protobuf `proto::Message`
/// plus a local `validated` flag.
#[derive(Clone, PartialEq, Eq, Hash, Debug)]
pub struct RawMessage {
    /// Optional source peer; serialized into the protobuf `from` field as peer-id bytes.
    pub source: Option<PeerId>,

    /// Message payload.
    pub data: Vec<u8>,

    /// Optional sequence number; serialized as 8 big-endian bytes in the `seqno` field.
    pub sequence_number: Option<u64>,

    /// Topic of the message.
    pub topic: TopicHash,

    /// Optional signature bytes, passed through to the protobuf unchanged.
    pub signature: Option<Vec<u8>>,

    /// Optional key bytes, passed through to the protobuf unchanged.
    pub key: Option<Vec<u8>>,

    /// Validation flag; it is not part of the protobuf encoding produced in this file.
    pub validated: bool,
}
163
164impl PeerKind {
165 pub(crate) fn is_gossipsub(&self) -> bool {
167 matches!(
168 self,
169 Self::Gossipsubv1_2 | Self::Gossipsubv1_1 | Self::Gossipsub
170 )
171 }
172}
173
174impl RawMessage {
175 pub fn raw_protobuf_len(&self) -> usize {
177 let message = proto::Message {
178 from: self.source.map(|m| m.to_bytes()),
179 data: Some(self.data.clone()),
180 seqno: self.sequence_number.map(|s| s.to_be_bytes().to_vec()),
181 topic: TopicHash::into_string(self.topic.clone()),
182 signature: self.signature.clone(),
183 key: self.key.clone(),
184 };
185 message.get_size()
186 }
187}
188
189impl From<RawMessage> for proto::Message {
190 fn from(raw: RawMessage) -> Self {
191 proto::Message {
192 from: raw.source.map(|m| m.to_bytes()),
193 data: Some(raw.data),
194 seqno: raw.sequence_number.map(|s| s.to_be_bytes().to_vec()),
195 topic: TopicHash::into_string(raw.topic),
196 signature: raw.signature,
197 key: raw.key,
198 }
199 }
200}
201
/// A message without the `signature`, `key` and `validated` fields carried by
/// [`RawMessage`].
#[derive(Clone, PartialEq, Eq, Hash)]
pub struct Message {
    /// Optional source peer of the message.
    pub source: Option<PeerId>,

    /// Message payload.
    pub data: Vec<u8>,

    /// Optional sequence number.
    pub sequence_number: Option<u64>,

    /// Topic of the message.
    pub topic: TopicHash,
}
218
219impl fmt::Debug for Message {
220 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
221 f.debug_struct("Message")
222 .field(
223 "data",
224 &format_args!("{:<20}", &hex_fmt::HexFmt(&self.data)),
225 )
226 .field("source", &self.source)
227 .field("sequence_number", &self.sequence_number)
228 .field("topic", &self.topic)
229 .finish()
230 }
231}
232
/// A subscribe or unsubscribe request for a single topic.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct Subscription {
    /// Whether this is a subscribe or an unsubscribe.
    pub action: SubscriptionAction,
    /// The topic the action applies to.
    pub topic_hash: TopicHash,
}
241
/// Direction of a [`Subscription`].
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum SubscriptionAction {
    /// Subscribe; encoded as `subscribe: Some(true)` in the protobuf `SubOpts`.
    Subscribe,
    /// Unsubscribe; encoded as `subscribe: Some(false)` in the protobuf `SubOpts`.
    Unsubscribe,
}
250
/// Peer information attached to a [`Prune`] message.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub(crate) struct PeerInfo {
    /// Optional peer id; serialized as peer-id bytes when present.
    pub(crate) peer_id: Option<PeerId>,
}
258
/// A control message carried by an [`Rpc`]; each variant wraps its payload struct.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum ControlAction {
    /// An `IHave` payload: a topic plus a list of message ids.
    IHave(IHave),
    /// An `IWant` payload: a list of message ids.
    IWant(IWant),
    /// A `Graft` payload: a topic.
    Graft(Graft),
    /// A `Prune` payload: a topic, a peer list and an optional backoff.
    Prune(Prune),
    /// An `IDontWant` payload: a list of message ids.
    IDontWant(IDontWant),
}
275
/// Payload of an IHAVE control message.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct IHave {
    /// Topic the listed messages belong to.
    pub(crate) topic_hash: TopicHash,
    /// Ids of the messages being announced.
    pub(crate) message_ids: Vec<MessageId>,
}
284
/// Payload of an IWANT control message.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct IWant {
    /// Ids of the messages being requested.
    pub(crate) message_ids: Vec<MessageId>,
}
291
/// Payload of a GRAFT control message.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct Graft {
    /// Topic the graft applies to.
    pub(crate) topic_hash: TopicHash,
}
298
/// Payload of a PRUNE control message.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct Prune {
    /// Topic the prune applies to.
    pub(crate) topic_hash: TopicHash,
    /// Peer entries sent along with the prune.
    pub(crate) peers: Vec<PeerInfo>,
    /// Optional backoff value, passed to the protobuf unchanged
    /// (unit not visible here — presumably seconds; confirm).
    pub(crate) backoff: Option<u64>,
}
309
/// Payload of an IDONTWANT control message.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct IDontWant {
    /// Ids of the messages the sender does not want to receive.
    pub(crate) message_ids: Vec<MessageId>,
}
316
/// A single outbound RPC item; each variant converts into one `proto::RPC`.
#[derive(Debug)]
pub enum RpcOut {
    /// A message to publish. `timeout` is not part of the protobuf encoding.
    Publish { message: RawMessage, timeout: Delay },
    /// A message to forward. `timeout` is not part of the protobuf encoding.
    Forward { message: RawMessage, timeout: Delay },
    /// Subscribe to a topic.
    Subscribe(TopicHash),
    /// Unsubscribe from a topic.
    Unsubscribe(TopicHash),
    /// A GRAFT control message.
    Graft(Graft),
    /// A PRUNE control message.
    Prune(Prune),
    /// An IHAVE control message.
    IHave(IHave),
    /// An IWANT control message.
    IWant(IWant),
    /// An IDONTWANT control message.
    IDontWant(IDontWant),
}
342
343impl RpcOut {
344 pub fn into_protobuf(self) -> proto::RPC {
347 self.into()
348 }
349}
350
351impl From<RpcOut> for proto::RPC {
352 fn from(rpc: RpcOut) -> Self {
354 match rpc {
355 RpcOut::Publish {
356 message,
357 timeout: _,
358 } => proto::RPC {
359 subscriptions: Vec::new(),
360 publish: vec![message.into()],
361 control: None,
362 },
363 RpcOut::Forward {
364 message,
365 timeout: _,
366 } => proto::RPC {
367 publish: vec![message.into()],
368 subscriptions: Vec::new(),
369 control: None,
370 },
371 RpcOut::Subscribe(topic) => proto::RPC {
372 publish: Vec::new(),
373 subscriptions: vec![proto::SubOpts {
374 subscribe: Some(true),
375 topic_id: Some(topic.into_string()),
376 }],
377 control: None,
378 },
379 RpcOut::Unsubscribe(topic) => proto::RPC {
380 publish: Vec::new(),
381 subscriptions: vec![proto::SubOpts {
382 subscribe: Some(false),
383 topic_id: Some(topic.into_string()),
384 }],
385 control: None,
386 },
387 RpcOut::IHave(IHave {
388 topic_hash,
389 message_ids,
390 }) => proto::RPC {
391 publish: Vec::new(),
392 subscriptions: Vec::new(),
393 control: Some(proto::ControlMessage {
394 ihave: vec![proto::ControlIHave {
395 topic_id: Some(topic_hash.into_string()),
396 message_ids: message_ids.into_iter().map(|msg_id| msg_id.0).collect(),
397 }],
398 iwant: vec![],
399 graft: vec![],
400 prune: vec![],
401 idontwant: vec![],
402 }),
403 },
404 RpcOut::IWant(IWant { message_ids }) => proto::RPC {
405 publish: Vec::new(),
406 subscriptions: Vec::new(),
407 control: Some(proto::ControlMessage {
408 ihave: vec![],
409 iwant: vec![proto::ControlIWant {
410 message_ids: message_ids.into_iter().map(|msg_id| msg_id.0).collect(),
411 }],
412 graft: vec![],
413 prune: vec![],
414 idontwant: vec![],
415 }),
416 },
417 RpcOut::Graft(Graft { topic_hash }) => proto::RPC {
418 publish: Vec::new(),
419 subscriptions: vec![],
420 control: Some(proto::ControlMessage {
421 ihave: vec![],
422 iwant: vec![],
423 graft: vec![proto::ControlGraft {
424 topic_id: Some(topic_hash.into_string()),
425 }],
426 prune: vec![],
427 idontwant: vec![],
428 }),
429 },
430 RpcOut::Prune(Prune {
431 topic_hash,
432 peers,
433 backoff,
434 }) => {
435 proto::RPC {
436 publish: Vec::new(),
437 subscriptions: vec![],
438 control: Some(proto::ControlMessage {
439 ihave: vec![],
440 iwant: vec![],
441 graft: vec![],
442 prune: vec![proto::ControlPrune {
443 topic_id: Some(topic_hash.into_string()),
444 peers: peers
445 .into_iter()
446 .map(|info| proto::PeerInfo {
447 peer_id: info.peer_id.map(|id| id.to_bytes()),
448 signed_peer_record: None,
450 })
451 .collect(),
452 backoff,
453 }],
454 idontwant: vec![],
455 }),
456 }
457 }
458 RpcOut::IDontWant(IDontWant { message_ids }) => proto::RPC {
459 publish: Vec::new(),
460 subscriptions: Vec::new(),
461 control: Some(proto::ControlMessage {
462 ihave: vec![],
463 iwant: vec![],
464 graft: vec![],
465 prune: vec![],
466 idontwant: vec![proto::ControlIDontWant {
467 message_ids: message_ids.into_iter().map(|msg_id| msg_id.0).collect(),
468 }],
469 }),
470 },
471 }
472 }
473}
474
/// An RPC bundling messages, subscription changes and control messages.
///
/// Converts into `proto::RPC` via `From<Rpc>`; `Debug` output omits empty lists.
#[derive(Clone, PartialEq, Eq, Hash)]
pub struct Rpc {
    /// Messages contained in the RPC.
    pub messages: Vec<RawMessage>,
    /// Subscription changes contained in the RPC.
    pub subscriptions: Vec<Subscription>,
    /// Control messages contained in the RPC.
    pub control_msgs: Vec<ControlAction>,
}
485
486impl Rpc {
487 pub fn into_protobuf(self) -> proto::RPC {
490 self.into()
491 }
492}
493
494impl From<Rpc> for proto::RPC {
495 fn from(rpc: Rpc) -> Self {
497 let mut publish = Vec::new();
499
500 for message in rpc.messages.into_iter() {
501 let message = proto::Message {
502 from: message.source.map(|m| m.to_bytes()),
503 data: Some(message.data),
504 seqno: message.sequence_number.map(|s| s.to_be_bytes().to_vec()),
505 topic: TopicHash::into_string(message.topic),
506 signature: message.signature,
507 key: message.key,
508 };
509
510 publish.push(message);
511 }
512
513 let subscriptions = rpc
515 .subscriptions
516 .into_iter()
517 .map(|sub| proto::SubOpts {
518 subscribe: Some(sub.action == SubscriptionAction::Subscribe),
519 topic_id: Some(sub.topic_hash.into_string()),
520 })
521 .collect::<Vec<_>>();
522
523 let mut control = proto::ControlMessage {
525 ihave: Vec::new(),
526 iwant: Vec::new(),
527 graft: Vec::new(),
528 prune: Vec::new(),
529 idontwant: Vec::new(),
530 };
531
532 let empty_control_msg = rpc.control_msgs.is_empty();
533
534 for action in rpc.control_msgs {
535 match action {
536 ControlAction::IHave(IHave {
538 topic_hash,
539 message_ids,
540 }) => {
541 let rpc_ihave = proto::ControlIHave {
542 topic_id: Some(topic_hash.into_string()),
543 message_ids: message_ids.into_iter().map(|msg_id| msg_id.0).collect(),
544 };
545 control.ihave.push(rpc_ihave);
546 }
547 ControlAction::IWant(IWant { message_ids }) => {
548 let rpc_iwant = proto::ControlIWant {
549 message_ids: message_ids.into_iter().map(|msg_id| msg_id.0).collect(),
550 };
551 control.iwant.push(rpc_iwant);
552 }
553 ControlAction::Graft(Graft { topic_hash }) => {
554 let rpc_graft = proto::ControlGraft {
555 topic_id: Some(topic_hash.into_string()),
556 };
557 control.graft.push(rpc_graft);
558 }
559 ControlAction::Prune(Prune {
560 topic_hash,
561 peers,
562 backoff,
563 }) => {
564 let rpc_prune = proto::ControlPrune {
565 topic_id: Some(topic_hash.into_string()),
566 peers: peers
567 .into_iter()
568 .map(|info| proto::PeerInfo {
569 peer_id: info.peer_id.map(|id| id.to_bytes()),
570 signed_peer_record: None,
572 })
573 .collect(),
574 backoff,
575 };
576 control.prune.push(rpc_prune);
577 }
578 ControlAction::IDontWant(IDontWant { message_ids }) => {
579 let rpc_idontwant = proto::ControlIDontWant {
580 message_ids: message_ids.into_iter().map(|msg_id| msg_id.0).collect(),
581 };
582 control.idontwant.push(rpc_idontwant);
583 }
584 }
585 }
586
587 proto::RPC {
588 subscriptions,
589 publish,
590 control: if empty_control_msg {
591 None
592 } else {
593 Some(control)
594 },
595 }
596 }
597}
598
599impl fmt::Debug for Rpc {
600 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
601 let mut b = f.debug_struct("GossipsubRpc");
602 if !self.messages.is_empty() {
603 b.field("messages", &self.messages);
604 }
605 if !self.subscriptions.is_empty() {
606 b.field("subscriptions", &self.subscriptions);
607 }
608 if !self.control_msgs.is_empty() {
609 b.field("control_msgs", &self.control_msgs);
610 }
611 b.finish()
612 }
613}
614
615impl PeerKind {
616 pub fn as_static_ref(&self) -> &'static str {
617 match self {
618 Self::NotSupported => "Not Supported",
619 Self::Floodsub => "Floodsub",
620 Self::Gossipsub => "Gossipsub v1.0",
621 Self::Gossipsubv1_1 => "Gossipsub v1.1",
622 Self::Gossipsubv1_2 => "Gossipsub v1.2",
623 }
624 }
625}
626
627impl AsRef<str> for PeerKind {
628 fn as_ref(&self) -> &str {
629 self.as_static_ref()
630 }
631}
632
633impl fmt::Display for PeerKind {
634 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
635 f.write_str(self.as_ref())
636 }
637}