1use std::{collections::HashMap, convert::Infallible, pin::Pin};
22
23use asynchronous_codec::{Decoder, Encoder, Framed};
24use byteorder::{BigEndian, ByteOrder};
25use bytes::BytesMut;
26use futures::prelude::*;
27use libp2p_core::{InboundUpgrade, OutboundUpgrade, UpgradeInfo};
28use libp2p_identity::{PeerId, PublicKey};
29use libp2p_swarm::StreamProtocol;
30use quick_protobuf::{MessageWrite, Writer};
31
32use crate::{
33 config::ValidationMode,
34 handler::HandlerEvent,
35 rpc_proto::proto,
36 topic::TopicHash,
37 types::{
38 ControlAction, Graft, IDontWant, IHave, IWant, MessageId, PeerInfo, PeerKind, Prune,
39 RawMessage, RpcIn, Subscription, SubscriptionAction,
40 },
41 ValidationError,
42};
43
/// Byte prefix prepended to the protobuf-encoded message (with its signature and key fields
/// cleared) before the signature is verified.
pub(crate) const SIGNING_PREFIX: &[u8] = b"libp2p-pubsub:";

/// Protocol id `/meshsub/1.2.0`, tagged as [`PeerKind::Gossipsubv1_2`].
pub(crate) const GOSSIPSUB_1_2_0_PROTOCOL: ProtocolId = ProtocolId {
    protocol: StreamProtocol::new("/meshsub/1.2.0"),
    kind: PeerKind::Gossipsubv1_2,
};

/// Protocol id `/meshsub/1.1.0`, tagged as [`PeerKind::Gossipsubv1_1`].
pub(crate) const GOSSIPSUB_1_1_0_PROTOCOL: ProtocolId = ProtocolId {
    protocol: StreamProtocol::new("/meshsub/1.1.0"),
    kind: PeerKind::Gossipsubv1_1,
};
/// Protocol id `/meshsub/1.0.0`, tagged as [`PeerKind::Gossipsub`].
pub(crate) const GOSSIPSUB_1_0_0_PROTOCOL: ProtocolId = ProtocolId {
    protocol: StreamProtocol::new("/meshsub/1.0.0"),
    kind: PeerKind::Gossipsub,
};
/// Protocol id `/floodsub/1.0.0`, tagged as [`PeerKind::Floodsub`].
pub(crate) const FLOODSUB_PROTOCOL: ProtocolId = ProtocolId {
    protocol: StreamProtocol::new("/floodsub/1.0.0"),
    kind: PeerKind::Floodsub,
};
63
/// Configuration of the gossipsub wire protocol. It also acts as the inbound and outbound
/// connection upgrade, producing a framed stream driven by [`GossipsubCodec`].
#[derive(Debug, Clone)]
pub struct ProtocolConfig {
    /// Protocol ids returned by `protocol_info` for upgrade negotiation.
    pub(crate) protocol_ids: Vec<ProtocolId>,
    /// Validation mode handed to the codec and applied to every decoded message.
    pub(crate) validation_mode: ValidationMode,
    /// Handed to the codec as its maximum length; also the fallback limit for topics that have
    /// no entry in `max_transmit_sizes`.
    pub(crate) default_max_transmit_size: usize,
    /// Per-topic transmit size limits, keyed by topic hash.
    pub(crate) max_transmit_sizes: HashMap<TopicHash, usize>,
}
76
77impl Default for ProtocolConfig {
78 fn default() -> Self {
79 Self {
80 validation_mode: ValidationMode::Strict,
81 protocol_ids: vec![
82 GOSSIPSUB_1_2_0_PROTOCOL,
83 GOSSIPSUB_1_1_0_PROTOCOL,
84 GOSSIPSUB_1_0_0_PROTOCOL,
85 ],
86 default_max_transmit_size: 65536,
87 max_transmit_sizes: HashMap::new(),
88 }
89 }
90}
91
92impl ProtocolConfig {
93 pub fn max_transmit_size_for_topic(&self, topic: &TopicHash) -> usize {
95 self.max_transmit_sizes
96 .get(topic)
97 .copied()
98 .unwrap_or(self.default_max_transmit_size)
99 }
100}
101
/// A protocol name paired with the kind of peer that speaks it.
#[derive(Clone, Debug, PartialEq)]
pub struct ProtocolId {
    /// The protocol name, e.g. `/meshsub/1.1.0`.
    pub protocol: StreamProtocol,
    /// The peer kind reported once a stream has been upgraded with this protocol.
    pub kind: PeerKind,
}
110
111impl AsRef<str> for ProtocolId {
112 fn as_ref(&self) -> &str {
113 self.protocol.as_ref()
114 }
115}
116
117impl UpgradeInfo for ProtocolConfig {
118 type Info = ProtocolId;
119 type InfoIter = Vec<Self::Info>;
120
121 fn protocol_info(&self) -> Self::InfoIter {
122 self.protocol_ids.clone()
123 }
124}
125
126impl<TSocket> InboundUpgrade<TSocket> for ProtocolConfig
127where
128 TSocket: AsyncRead + AsyncWrite + Unpin + Send + 'static,
129{
130 type Output = (Framed<TSocket, GossipsubCodec>, PeerKind);
131 type Error = Infallible;
132 type Future = Pin<Box<dyn Future<Output = Result<Self::Output, Self::Error>> + Send>>;
133
134 fn upgrade_inbound(self, socket: TSocket, protocol_id: Self::Info) -> Self::Future {
135 Box::pin(future::ok((
136 Framed::new(
137 socket,
138 GossipsubCodec::new(
139 self.default_max_transmit_size,
140 self.validation_mode,
141 self.max_transmit_sizes,
142 ),
143 ),
144 protocol_id.kind,
145 )))
146 }
147}
148
149impl<TSocket> OutboundUpgrade<TSocket> for ProtocolConfig
150where
151 TSocket: AsyncWrite + AsyncRead + Unpin + Send + 'static,
152{
153 type Output = (Framed<TSocket, GossipsubCodec>, PeerKind);
154 type Error = Infallible;
155 type Future = Pin<Box<dyn Future<Output = Result<Self::Output, Self::Error>> + Send>>;
156
157 fn upgrade_outbound(self, socket: TSocket, protocol_id: Self::Info) -> Self::Future {
158 Box::pin(future::ok((
159 Framed::new(
160 socket,
161 GossipsubCodec::new(
162 self.default_max_transmit_size,
163 self.validation_mode,
164 self.max_transmit_sizes,
165 ),
166 ),
167 protocol_id.kind,
168 )))
169 }
170}
171
/// Codec for gossipsub RPC frames: encodes outgoing [`proto::RPC`] values and, while decoding,
/// validates every published message according to the configured validation mode.
pub struct GossipsubCodec {
    /// Determines which message fields are checked while decoding.
    validation_mode: ValidationMode,
    /// The underlying protobuf codec for [`proto::RPC`] frames.
    codec: quick_protobuf_codec::Codec<proto::RPC>,
    /// Per-topic size limits applied to decoded messages, keyed by topic hash.
    max_transmit_sizes: HashMap<TopicHash, usize>,
}
182
183impl GossipsubCodec {
184 pub fn new(
185 max_length: usize,
186 validation_mode: ValidationMode,
187 max_transmit_sizes: HashMap<TopicHash, usize>,
188 ) -> GossipsubCodec {
189 let codec = quick_protobuf_codec::Codec::new(max_length);
190 GossipsubCodec {
191 validation_mode,
192 codec,
193 max_transmit_sizes,
194 }
195 }
196
197 fn max_transmit_size_for_topic(&self, topic: &TopicHash) -> Option<usize> {
199 self.max_transmit_sizes.get(topic).copied()
200 }
201
202 fn verify_signature(message: &proto::Message) -> bool {
206 use quick_protobuf::MessageWrite;
207
208 let Some(from) = message.from.as_ref() else {
209 tracing::debug!("Signature verification failed: No source id given");
210 return false;
211 };
212
213 let Ok(source) = PeerId::from_bytes(from) else {
214 tracing::debug!("Signature verification failed: Invalid Peer Id");
215 return false;
216 };
217
218 let Some(signature) = message.signature.as_ref() else {
219 tracing::debug!("Signature verification failed: No signature provided");
220 return false;
221 };
222
223 let public_key = match message.key.as_deref().map(PublicKey::try_decode_protobuf) {
226 Some(Ok(key)) => key,
227 _ => match PublicKey::try_decode_protobuf(&source.to_bytes()[2..]) {
228 Ok(v) => v,
229 Err(_) => {
230 tracing::warn!("Signature verification failed: No valid public key supplied");
231 return false;
232 }
233 },
234 };
235
236 if source != public_key.to_peer_id() {
238 tracing::warn!(
239 "Signature verification failed: Public key doesn't match source peer id"
240 );
241 return false;
242 }
243
244 let mut message_sig = message.clone();
246 message_sig.signature = None;
247 message_sig.key = None;
248 let mut buf = Vec::with_capacity(message_sig.get_size());
249 let mut writer = Writer::new(&mut buf);
250 message_sig
251 .write_message(&mut writer)
252 .expect("Encoding to succeed");
253 let mut signature_bytes = SIGNING_PREFIX.to_vec();
254 signature_bytes.extend_from_slice(&buf);
255 public_key.verify(&signature_bytes, signature)
256 }
257}
258
259impl Encoder for GossipsubCodec {
260 type Item<'a> = proto::RPC;
261 type Error = quick_protobuf_codec::Error;
262
263 fn encode(&mut self, item: Self::Item<'_>, dst: &mut BytesMut) -> Result<(), Self::Error> {
264 self.codec.encode(item, dst)
265 }
266}
267
268impl Decoder for GossipsubCodec {
269 type Item = HandlerEvent;
270 type Error = quick_protobuf_codec::Error;
271
272 fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
273 let Some(rpc) = self.codec.decode(src)? else {
274 return Ok(None);
275 };
276 let mut messages = Vec::with_capacity(rpc.publish.len());
278 let mut invalid_messages = Vec::new();
280
281 for message in rpc.publish.into_iter() {
282 let topic = TopicHash::from_raw(&message.topic);
283
284 if self
286 .max_transmit_size_for_topic(&topic)
287 .is_some_and(|max| message.get_size() > max)
288 {
289 let message = RawMessage {
290 source: None, data: message.data.unwrap_or_default(),
292 sequence_number: None, topic: TopicHash::from_raw(message.topic),
294 signature: None, key: message.key,
296 validated: false,
297 };
298
299 invalid_messages.push((message, ValidationError::MessageSizeTooLargeForTopic));
300 continue;
301 }
302
303 let mut invalid_kind = None;
305 let mut verify_signature = false;
306 let mut verify_sequence_no = false;
307 let mut verify_source = false;
308
309 match self.validation_mode {
310 ValidationMode::Strict => {
311 verify_signature = true;
313 verify_sequence_no = true;
314 verify_source = true;
315 }
316 ValidationMode::Permissive => {
317 if message.signature.is_some() {
319 verify_signature = true;
320 }
321 if message.seqno.is_some() {
322 verify_sequence_no = true;
323 }
324 if message.from.is_some() {
325 verify_source = true;
326 }
327 }
328 ValidationMode::Anonymous => {
329 if message.signature.is_some() {
330 tracing::warn!(
331 "Signature field was non-empty and anonymous validation mode is set"
332 );
333 invalid_kind = Some(ValidationError::SignaturePresent);
334 } else if message.seqno.is_some() {
335 tracing::warn!(
336 "Sequence number was non-empty and anonymous validation mode is set"
337 );
338 invalid_kind = Some(ValidationError::SequenceNumberPresent);
339 } else if message.from.is_some() {
340 tracing::warn!("Message dropped. Message source was non-empty and anonymous validation mode is set");
341 invalid_kind = Some(ValidationError::MessageSourcePresent);
342 }
343 }
344 ValidationMode::None => {}
345 }
346
347 if let Some(validation_error) = invalid_kind.take() {
350 let message = RawMessage {
351 source: None, data: message.data.unwrap_or_default(),
353 sequence_number: None, topic: TopicHash::from_raw(message.topic),
355 signature: None, key: message.key,
357 validated: false,
358 };
359 invalid_messages.push((message, validation_error));
360 continue;
362 }
363
364 if verify_signature && !GossipsubCodec::verify_signature(&message) {
366 tracing::warn!("Invalid signature for received message");
367 let message = RawMessage {
370 source: None, data: message.data.unwrap_or_default(),
372 sequence_number: None, topic: TopicHash::from_raw(message.topic),
374 signature: None, key: message.key,
376 validated: false,
377 };
378 invalid_messages.push((message, ValidationError::InvalidSignature));
379 continue;
381 }
382
383 let sequence_number = if verify_sequence_no {
385 if let Some(seq_no) = message.seqno {
386 if seq_no.is_empty() {
387 None
388 } else if seq_no.len() != 8 {
389 tracing::debug!(
390 sequence_number=?seq_no,
391 sequence_length=%seq_no.len(),
392 "Invalid sequence number length for received message"
393 );
394
395 let message = RawMessage {
396 source: None, data: message.data.unwrap_or_default(),
398 sequence_number: None, topic: TopicHash::from_raw(message.topic),
400 signature: message.signature, key: message.key,
402 validated: false,
403 };
404 invalid_messages.push((message, ValidationError::InvalidSequenceNumber));
405 continue;
407 } else {
408 Some(BigEndian::read_u64(&seq_no))
410 }
411 } else {
412 tracing::debug!("Sequence number not present but expected");
414 let message = RawMessage {
415 source: None, data: message.data.unwrap_or_default(),
417 sequence_number: None, topic: TopicHash::from_raw(message.topic),
419 signature: message.signature, key: message.key,
421 validated: false,
422 };
423 invalid_messages.push((message, ValidationError::EmptySequenceNumber));
424 continue;
425 }
426 } else {
427 None
429 };
430
431 let source = if verify_source {
433 if let Some(bytes) = message.from {
434 if !bytes.is_empty() {
435 match PeerId::from_bytes(&bytes) {
436 Ok(peer_id) => Some(peer_id), Err(_) => {
438 tracing::debug!("Message source has an invalid PeerId");
440 let message = RawMessage {
441 source: None, data: message.data.unwrap_or_default(),
443 sequence_number,
444 topic: TopicHash::from_raw(message.topic),
445 signature: message.signature, key: message.key,
447 validated: false,
448 };
449 invalid_messages.push((message, ValidationError::InvalidPeerId));
450 continue;
451 }
452 }
453 } else {
454 None
455 }
456 } else {
457 None
458 }
459 } else {
460 None
461 };
462
463 messages.push(RawMessage {
465 source,
466 data: message.data.unwrap_or_default(),
467 sequence_number,
468 topic: TopicHash::from_raw(message.topic),
469 signature: message.signature,
470 key: message.key,
471 validated: false,
472 });
473 }
474
475 let mut control_msgs = Vec::new();
476
477 if let Some(rpc_control) = rpc.control {
478 let ihave_msgs: Vec<ControlAction> = rpc_control
480 .ihave
481 .into_iter()
482 .map(|ihave| {
483 ControlAction::IHave(IHave {
484 topic_hash: TopicHash::from_raw(ihave.topic_id.unwrap_or_default()),
485 message_ids: ihave
486 .message_ids
487 .into_iter()
488 .map(MessageId::from)
489 .collect::<Vec<_>>(),
490 })
491 })
492 .collect();
493
494 let iwant_msgs: Vec<ControlAction> = rpc_control
495 .iwant
496 .into_iter()
497 .map(|iwant| {
498 ControlAction::IWant(IWant {
499 message_ids: iwant
500 .message_ids
501 .into_iter()
502 .map(MessageId::from)
503 .collect::<Vec<_>>(),
504 })
505 })
506 .collect();
507
508 let graft_msgs: Vec<ControlAction> = rpc_control
509 .graft
510 .into_iter()
511 .map(|graft| {
512 ControlAction::Graft(Graft {
513 topic_hash: TopicHash::from_raw(graft.topic_id.unwrap_or_default()),
514 })
515 })
516 .collect();
517
518 let mut prune_msgs = Vec::new();
519
520 for prune in rpc_control.prune {
521 let peers = prune
523 .peers
524 .into_iter()
525 .filter_map(|info| {
526 info.peer_id
527 .as_ref()
528 .and_then(|id| PeerId::from_bytes(id).ok())
529 .map(|peer_id|
530 PeerInfo {
532 peer_id: Some(peer_id),
533 })
534 })
535 .collect::<Vec<PeerInfo>>();
536
537 let topic_hash = TopicHash::from_raw(prune.topic_id.unwrap_or_default());
538 prune_msgs.push(ControlAction::Prune(Prune {
539 topic_hash,
540 peers,
541 backoff: prune.backoff,
542 }));
543 }
544
545 let idontwant_msgs: Vec<ControlAction> = rpc_control
546 .idontwant
547 .into_iter()
548 .map(|idontwant| {
549 ControlAction::IDontWant(IDontWant {
550 message_ids: idontwant
551 .message_ids
552 .into_iter()
553 .map(MessageId::from)
554 .collect::<Vec<_>>(),
555 })
556 })
557 .collect();
558
559 control_msgs.extend(ihave_msgs);
560 control_msgs.extend(iwant_msgs);
561 control_msgs.extend(graft_msgs);
562 control_msgs.extend(prune_msgs);
563 control_msgs.extend(idontwant_msgs);
564 }
565
566 Ok(Some(HandlerEvent::Message {
567 rpc: RpcIn {
568 messages,
569 subscriptions: rpc
570 .subscriptions
571 .into_iter()
572 .map(|sub| Subscription {
573 action: if Some(true) == sub.subscribe {
574 SubscriptionAction::Subscribe
575 } else {
576 SubscriptionAction::Unsubscribe
577 },
578 topic_hash: TopicHash::from_raw(sub.topic_id.unwrap_or_default()),
579 })
580 .collect(),
581 control_msgs,
582 },
583 invalid_messages,
584 }))
585 }
586}
587
#[cfg(test)]
mod tests {
    use std::time::Duration;

    use futures_timer::Delay;
    use libp2p_identity::Keypair;
    use quickcheck::*;

    use super::*;
    use crate::{
        config::Config, types::RpcOut, Behaviour, ConfigBuilder, IdentTopic as Topic,
        MessageAuthenticity, Version,
    };

    /// A raw message built by a freshly created, signing behaviour.
    #[derive(Clone, Debug)]
    struct Message(RawMessage);

    impl Arbitrary for Message {
        fn arbitrary(g: &mut Gen) -> Self {
            let TestKeypair(keypair) = TestKeypair::arbitrary(g);

            let mut behaviour: Behaviour =
                Behaviour::new(MessageAuthenticity::Signed(keypair), Config::default()).unwrap();

            // The payload length comes from a separate generator; the bytes come from `g`.
            let mut length_gen = quickcheck::Gen::new(10024);
            let payload_len = u8::arbitrary(&mut length_gen);
            let payload: Vec<u8> = (0..payload_len).map(|_| u8::arbitrary(g)).collect();

            let TopicId(topic) = TopicId::arbitrary(g);
            Message(behaviour.build_raw_message(topic, payload).unwrap())
        }
    }

    /// A topic hash derived from an arbitrary topic string.
    #[derive(Clone, Debug)]
    struct TopicId(TopicHash);

    impl Arbitrary for TopicId {
        fn arbitrary(g: &mut Gen) -> Self {
            let mut length_gen = quickcheck::Gen::new(1024);
            let char_count = u8::arbitrary(&mut length_gen);

            let mut name = String::new();
            for _ in 0..char_count {
                name.push(char::arbitrary(g));
            }

            TopicId(Topic::new(name).into())
        }
    }

    /// Wrapper around a freshly generated ed25519 keypair.
    #[derive(Clone)]
    struct TestKeypair(Keypair);

    impl Arbitrary for TestKeypair {
        fn arbitrary(_g: &mut Gen) -> Self {
            let keypair = Keypair::generate_ed25519();
            TestKeypair(keypair)
        }
    }

    impl std::fmt::Debug for TestKeypair {
        /// Prints only the public half of the keypair.
        fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
            let public = self.0.public();
            f.debug_struct("TestKeypair")
                .field("public", &public)
                .finish()
        }
    }

    /// Encoding a publish RPC and decoding it again under strict validation must give back
    /// the original message.
    #[test]
    fn encode_decode() {
        fn prop(message: Message) {
            let Message(expected) = message;

            let outbound = RpcOut::Publish {
                message: expected.clone(),
                timeout: Delay::new(Duration::from_secs(1)),
            };

            let mut codec =
                GossipsubCodec::new(u32::MAX as usize, ValidationMode::Strict, HashMap::new());
            let mut wire = BytesMut::new();
            codec.encode(outbound.into_protobuf(), &mut wire).unwrap();

            let decoded = codec.decode(&mut wire).unwrap().unwrap();
            let HandlerEvent::Message { mut rpc, .. } = decoded else {
                panic!("Must decode a message")
            };

            // The decoder always emits `validated: false`; flip it so the comparison with the
            // original message can succeed.
            rpc.messages[0].validated = true;
            assert_eq!(vec![expected], rpc.messages);
        }

        QuickCheck::new().quickcheck(prop as fn(_) -> _)
    }

    /// With a custom protocol id plus floodsub support, the custom id is listed first and
    /// floodsub second.
    #[test]
    fn support_floodsub_with_custom_protocol() {
        let config = ConfigBuilder::default()
            .protocol_id("/foosub", Version::V1_1)
            .support_floodsub()
            .build()
            .unwrap();
        let protocol_config = config.protocol_config();

        assert_eq!(protocol_config.protocol_ids[0].protocol, "/foosub");
        assert_eq!(protocol_config.protocol_ids[1].protocol, "/floodsub/1.0.0");
    }
}