1use std::{convert::Infallible, pin::Pin};
22
23use asynchronous_codec::{Decoder, Encoder, Framed};
24use byteorder::{BigEndian, ByteOrder};
25use bytes::BytesMut;
26use futures::prelude::*;
27use libp2p_core::{InboundUpgrade, OutboundUpgrade, UpgradeInfo};
28use libp2p_identity::{PeerId, PublicKey};
29use libp2p_swarm::StreamProtocol;
30use quick_protobuf::Writer;
31
32use crate::{
33 config::ValidationMode,
34 handler::HandlerEvent,
35 rpc_proto::proto,
36 topic::TopicHash,
37 types::{
38 ControlAction, Graft, IDontWant, IHave, IWant, MessageId, PeerInfo, PeerKind, Prune,
39 RawMessage, Rpc, Subscription, SubscriptionAction,
40 },
41 ValidationError,
42};
43
44pub(crate) const SIGNING_PREFIX: &[u8] = b"libp2p-pubsub:";
45
46pub(crate) const GOSSIPSUB_1_2_0_PROTOCOL: ProtocolId = ProtocolId {
47 protocol: StreamProtocol::new("/meshsub/1.2.0"),
48 kind: PeerKind::Gossipsubv1_2,
49};
50
51pub(crate) const GOSSIPSUB_1_1_0_PROTOCOL: ProtocolId = ProtocolId {
52 protocol: StreamProtocol::new("/meshsub/1.1.0"),
53 kind: PeerKind::Gossipsubv1_1,
54};
55pub(crate) const GOSSIPSUB_1_0_0_PROTOCOL: ProtocolId = ProtocolId {
56 protocol: StreamProtocol::new("/meshsub/1.0.0"),
57 kind: PeerKind::Gossipsub,
58};
59pub(crate) const FLOODSUB_PROTOCOL: ProtocolId = ProtocolId {
60 protocol: StreamProtocol::new("/floodsub/1.0.0"),
61 kind: PeerKind::Floodsub,
62};
63
64#[derive(Debug, Clone)]
66pub struct ProtocolConfig {
67 pub(crate) protocol_ids: Vec<ProtocolId>,
69 pub(crate) max_transmit_size: usize,
71 pub(crate) validation_mode: ValidationMode,
73}
74
75impl Default for ProtocolConfig {
76 fn default() -> Self {
77 Self {
78 max_transmit_size: 65536,
79 validation_mode: ValidationMode::Strict,
80 protocol_ids: vec![
81 GOSSIPSUB_1_2_0_PROTOCOL,
82 GOSSIPSUB_1_1_0_PROTOCOL,
83 GOSSIPSUB_1_0_0_PROTOCOL,
84 ],
85 }
86 }
87}
88
89#[derive(Clone, Debug, PartialEq)]
91pub struct ProtocolId {
92 pub protocol: StreamProtocol,
94 pub kind: PeerKind,
96}
97
98impl AsRef<str> for ProtocolId {
99 fn as_ref(&self) -> &str {
100 self.protocol.as_ref()
101 }
102}
103
104impl UpgradeInfo for ProtocolConfig {
105 type Info = ProtocolId;
106 type InfoIter = Vec<Self::Info>;
107
108 fn protocol_info(&self) -> Self::InfoIter {
109 self.protocol_ids.clone()
110 }
111}
112
113impl<TSocket> InboundUpgrade<TSocket> for ProtocolConfig
114where
115 TSocket: AsyncRead + AsyncWrite + Unpin + Send + 'static,
116{
117 type Output = (Framed<TSocket, GossipsubCodec>, PeerKind);
118 type Error = Infallible;
119 type Future = Pin<Box<dyn Future<Output = Result<Self::Output, Self::Error>> + Send>>;
120
121 fn upgrade_inbound(self, socket: TSocket, protocol_id: Self::Info) -> Self::Future {
122 Box::pin(future::ok((
123 Framed::new(
124 socket,
125 GossipsubCodec::new(self.max_transmit_size, self.validation_mode),
126 ),
127 protocol_id.kind,
128 )))
129 }
130}
131
132impl<TSocket> OutboundUpgrade<TSocket> for ProtocolConfig
133where
134 TSocket: AsyncWrite + AsyncRead + Unpin + Send + 'static,
135{
136 type Output = (Framed<TSocket, GossipsubCodec>, PeerKind);
137 type Error = Infallible;
138 type Future = Pin<Box<dyn Future<Output = Result<Self::Output, Self::Error>> + Send>>;
139
140 fn upgrade_outbound(self, socket: TSocket, protocol_id: Self::Info) -> Self::Future {
141 Box::pin(future::ok((
142 Framed::new(
143 socket,
144 GossipsubCodec::new(self.max_transmit_size, self.validation_mode),
145 ),
146 protocol_id.kind,
147 )))
148 }
149}
150
151pub struct GossipsubCodec {
154 validation_mode: ValidationMode,
156 codec: quick_protobuf_codec::Codec<proto::RPC>,
158}
159
160impl GossipsubCodec {
161 pub fn new(max_length: usize, validation_mode: ValidationMode) -> GossipsubCodec {
162 let codec = quick_protobuf_codec::Codec::new(max_length);
163 GossipsubCodec {
164 validation_mode,
165 codec,
166 }
167 }
168
169 fn verify_signature(message: &proto::Message) -> bool {
173 use quick_protobuf::MessageWrite;
174
175 let Some(from) = message.from.as_ref() else {
176 tracing::debug!("Signature verification failed: No source id given");
177 return false;
178 };
179
180 let Ok(source) = PeerId::from_bytes(from) else {
181 tracing::debug!("Signature verification failed: Invalid Peer Id");
182 return false;
183 };
184
185 let Some(signature) = message.signature.as_ref() else {
186 tracing::debug!("Signature verification failed: No signature provided");
187 return false;
188 };
189
190 let public_key = match message.key.as_deref().map(PublicKey::try_decode_protobuf) {
193 Some(Ok(key)) => key,
194 _ => match PublicKey::try_decode_protobuf(&source.to_bytes()[2..]) {
195 Ok(v) => v,
196 Err(_) => {
197 tracing::warn!("Signature verification failed: No valid public key supplied");
198 return false;
199 }
200 },
201 };
202
203 if source != public_key.to_peer_id() {
205 tracing::warn!(
206 "Signature verification failed: Public key doesn't match source peer id"
207 );
208 return false;
209 }
210
211 let mut message_sig = message.clone();
213 message_sig.signature = None;
214 message_sig.key = None;
215 let mut buf = Vec::with_capacity(message_sig.get_size());
216 let mut writer = Writer::new(&mut buf);
217 message_sig
218 .write_message(&mut writer)
219 .expect("Encoding to succeed");
220 let mut signature_bytes = SIGNING_PREFIX.to_vec();
221 signature_bytes.extend_from_slice(&buf);
222 public_key.verify(&signature_bytes, signature)
223 }
224}
225
226impl Encoder for GossipsubCodec {
227 type Item<'a> = proto::RPC;
228 type Error = quick_protobuf_codec::Error;
229
230 fn encode(&mut self, item: Self::Item<'_>, dst: &mut BytesMut) -> Result<(), Self::Error> {
231 self.codec.encode(item, dst)
232 }
233}
234
235impl Decoder for GossipsubCodec {
236 type Item = HandlerEvent;
237 type Error = quick_protobuf_codec::Error;
238
239 fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
240 let Some(rpc) = self.codec.decode(src)? else {
241 return Ok(None);
242 };
243 let mut messages = Vec::with_capacity(rpc.publish.len());
245 let mut invalid_messages = Vec::new();
247
248 for message in rpc.publish.into_iter() {
249 let mut invalid_kind = None;
251 let mut verify_signature = false;
252 let mut verify_sequence_no = false;
253 let mut verify_source = false;
254
255 match self.validation_mode {
256 ValidationMode::Strict => {
257 verify_signature = true;
259 verify_sequence_no = true;
260 verify_source = true;
261 }
262 ValidationMode::Permissive => {
263 if message.signature.is_some() {
265 verify_signature = true;
266 }
267 if message.seqno.is_some() {
268 verify_sequence_no = true;
269 }
270 if message.from.is_some() {
271 verify_source = true;
272 }
273 }
274 ValidationMode::Anonymous => {
275 if message.signature.is_some() {
276 tracing::warn!(
277 "Signature field was non-empty and anonymous validation mode is set"
278 );
279 invalid_kind = Some(ValidationError::SignaturePresent);
280 } else if message.seqno.is_some() {
281 tracing::warn!(
282 "Sequence number was non-empty and anonymous validation mode is set"
283 );
284 invalid_kind = Some(ValidationError::SequenceNumberPresent);
285 } else if message.from.is_some() {
286 tracing::warn!("Message dropped. Message source was non-empty and anonymous validation mode is set");
287 invalid_kind = Some(ValidationError::MessageSourcePresent);
288 }
289 }
290 ValidationMode::None => {}
291 }
292
293 if let Some(validation_error) = invalid_kind.take() {
296 let message = RawMessage {
297 source: None, data: message.data.unwrap_or_default(),
299 sequence_number: None, topic: TopicHash::from_raw(message.topic),
301 signature: None, key: message.key,
303 validated: false,
304 };
305 invalid_messages.push((message, validation_error));
306 continue;
308 }
309
310 if verify_signature && !GossipsubCodec::verify_signature(&message) {
312 tracing::warn!("Invalid signature for received message");
313
314 let message = RawMessage {
317 source: None, data: message.data.unwrap_or_default(),
319 sequence_number: None, topic: TopicHash::from_raw(message.topic),
321 signature: None, key: message.key,
323 validated: false,
324 };
325 invalid_messages.push((message, ValidationError::InvalidSignature));
326 continue;
328 }
329
330 let sequence_number = if verify_sequence_no {
332 if let Some(seq_no) = message.seqno {
333 if seq_no.is_empty() {
334 None
335 } else if seq_no.len() != 8 {
336 tracing::debug!(
337 sequence_number=?seq_no,
338 sequence_length=%seq_no.len(),
339 "Invalid sequence number length for received message"
340 );
341 let message = RawMessage {
342 source: None, data: message.data.unwrap_or_default(),
344 sequence_number: None, topic: TopicHash::from_raw(message.topic),
346 signature: message.signature, key: message.key,
348 validated: false,
349 };
350 invalid_messages.push((message, ValidationError::InvalidSequenceNumber));
351 continue;
353 } else {
354 Some(BigEndian::read_u64(&seq_no))
356 }
357 } else {
358 tracing::debug!("Sequence number not present but expected");
360 let message = RawMessage {
361 source: None, data: message.data.unwrap_or_default(),
363 sequence_number: None, topic: TopicHash::from_raw(message.topic),
365 signature: message.signature, key: message.key,
367 validated: false,
368 };
369 invalid_messages.push((message, ValidationError::EmptySequenceNumber));
370 continue;
371 }
372 } else {
373 None
375 };
376
377 let source = if verify_source {
379 if let Some(bytes) = message.from {
380 if !bytes.is_empty() {
381 match PeerId::from_bytes(&bytes) {
382 Ok(peer_id) => Some(peer_id), Err(_) => {
384 tracing::debug!("Message source has an invalid PeerId");
386 let message = RawMessage {
387 source: None, data: message.data.unwrap_or_default(),
389 sequence_number,
390 topic: TopicHash::from_raw(message.topic),
391 signature: message.signature, key: message.key,
393 validated: false,
394 };
395 invalid_messages.push((message, ValidationError::InvalidPeerId));
396 continue;
397 }
398 }
399 } else {
400 None
401 }
402 } else {
403 None
404 }
405 } else {
406 None
407 };
408
409 messages.push(RawMessage {
411 source,
412 data: message.data.unwrap_or_default(),
413 sequence_number,
414 topic: TopicHash::from_raw(message.topic),
415 signature: message.signature,
416 key: message.key,
417 validated: false,
418 });
419 }
420
421 let mut control_msgs = Vec::new();
422
423 if let Some(rpc_control) = rpc.control {
424 let ihave_msgs: Vec<ControlAction> = rpc_control
426 .ihave
427 .into_iter()
428 .map(|ihave| {
429 ControlAction::IHave(IHave {
430 topic_hash: TopicHash::from_raw(ihave.topic_id.unwrap_or_default()),
431 message_ids: ihave
432 .message_ids
433 .into_iter()
434 .map(MessageId::from)
435 .collect::<Vec<_>>(),
436 })
437 })
438 .collect();
439
440 let iwant_msgs: Vec<ControlAction> = rpc_control
441 .iwant
442 .into_iter()
443 .map(|iwant| {
444 ControlAction::IWant(IWant {
445 message_ids: iwant
446 .message_ids
447 .into_iter()
448 .map(MessageId::from)
449 .collect::<Vec<_>>(),
450 })
451 })
452 .collect();
453
454 let graft_msgs: Vec<ControlAction> = rpc_control
455 .graft
456 .into_iter()
457 .map(|graft| {
458 ControlAction::Graft(Graft {
459 topic_hash: TopicHash::from_raw(graft.topic_id.unwrap_or_default()),
460 })
461 })
462 .collect();
463
464 let mut prune_msgs = Vec::new();
465
466 for prune in rpc_control.prune {
467 let peers = prune
469 .peers
470 .into_iter()
471 .filter_map(|info| {
472 info.peer_id
473 .as_ref()
474 .and_then(|id| PeerId::from_bytes(id).ok())
475 .map(|peer_id|
476 PeerInfo {
478 peer_id: Some(peer_id),
479 })
480 })
481 .collect::<Vec<PeerInfo>>();
482
483 let topic_hash = TopicHash::from_raw(prune.topic_id.unwrap_or_default());
484 prune_msgs.push(ControlAction::Prune(Prune {
485 topic_hash,
486 peers,
487 backoff: prune.backoff,
488 }));
489 }
490
491 let idontwant_msgs: Vec<ControlAction> = rpc_control
492 .idontwant
493 .into_iter()
494 .map(|idontwant| {
495 ControlAction::IDontWant(IDontWant {
496 message_ids: idontwant
497 .message_ids
498 .into_iter()
499 .map(MessageId::from)
500 .collect::<Vec<_>>(),
501 })
502 })
503 .collect();
504
505 control_msgs.extend(ihave_msgs);
506 control_msgs.extend(iwant_msgs);
507 control_msgs.extend(graft_msgs);
508 control_msgs.extend(prune_msgs);
509 control_msgs.extend(idontwant_msgs);
510 }
511
512 Ok(Some(HandlerEvent::Message {
513 rpc: Rpc {
514 messages,
515 subscriptions: rpc
516 .subscriptions
517 .into_iter()
518 .map(|sub| Subscription {
519 action: if Some(true) == sub.subscribe {
520 SubscriptionAction::Subscribe
521 } else {
522 SubscriptionAction::Unsubscribe
523 },
524 topic_hash: TopicHash::from_raw(sub.topic_id.unwrap_or_default()),
525 })
526 .collect(),
527 control_msgs,
528 },
529 invalid_messages,
530 }))
531 }
532}
533
534#[cfg(test)]
535mod tests {
536 use libp2p_identity::Keypair;
537 use quickcheck::*;
538
539 use super::*;
540 use crate::{
541 config::Config, Behaviour, ConfigBuilder, IdentTopic as Topic, MessageAuthenticity, Version,
542 };
543
544 #[derive(Clone, Debug)]
545 struct Message(RawMessage);
546
547 impl Arbitrary for Message {
548 fn arbitrary(g: &mut Gen) -> Self {
549 let keypair = TestKeypair::arbitrary(g);
550
551 let config = Config::default();
553 let mut gs: Behaviour =
554 Behaviour::new(MessageAuthenticity::Signed(keypair.0), config).unwrap();
555 let mut data_g = quickcheck::Gen::new(10024);
556 let data = (0..u8::arbitrary(&mut data_g))
557 .map(|_| u8::arbitrary(g))
558 .collect::<Vec<_>>();
559 let topic_id = TopicId::arbitrary(g).0;
560 Message(gs.build_raw_message(topic_id, data).unwrap())
561 }
562 }
563
564 #[derive(Clone, Debug)]
565 struct TopicId(TopicHash);
566
567 impl Arbitrary for TopicId {
568 fn arbitrary(g: &mut Gen) -> Self {
569 let mut data_g = quickcheck::Gen::new(1024);
570 let topic_string: String = (0..u8::arbitrary(&mut data_g))
571 .map(|_| char::arbitrary(g))
572 .collect::<String>();
573 TopicId(Topic::new(topic_string).into())
574 }
575 }
576
577 #[derive(Clone)]
578 struct TestKeypair(Keypair);
579
580 impl Arbitrary for TestKeypair {
581 fn arbitrary(_g: &mut Gen) -> Self {
582 TestKeypair(Keypair::generate_ed25519())
584 }
585 }
586
587 impl std::fmt::Debug for TestKeypair {
588 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
589 f.debug_struct("TestKeypair")
590 .field("public", &self.0.public())
591 .finish()
592 }
593 }
594
595 #[test]
596 fn encode_decode() {
598 fn prop(message: Message) {
599 let message = message.0;
600
601 let rpc = Rpc {
602 messages: vec![message.clone()],
603 subscriptions: vec![],
604 control_msgs: vec![],
605 };
606
607 let mut codec = GossipsubCodec::new(u32::MAX as usize, ValidationMode::Strict);
608 let mut buf = BytesMut::new();
609 codec.encode(rpc.into_protobuf(), &mut buf).unwrap();
610 let decoded_rpc = codec.decode(&mut buf).unwrap().unwrap();
611 match decoded_rpc {
613 HandlerEvent::Message { mut rpc, .. } => {
614 rpc.messages[0].validated = true;
615
616 assert_eq!(vec![message], rpc.messages);
617 }
618 _ => panic!("Must decode a message"),
619 }
620 }
621
622 QuickCheck::new().quickcheck(prop as fn(_) -> _)
623 }
624
625 #[test]
626 fn support_floodsub_with_custom_protocol() {
627 let protocol_config = ConfigBuilder::default()
628 .protocol_id("/foosub", Version::V1_1)
629 .support_floodsub()
630 .build()
631 .unwrap()
632 .protocol_config();
633
634 assert_eq!(protocol_config.protocol_ids[0].protocol, "/foosub");
635 assert_eq!(protocol_config.protocol_ids[1].protocol, "/floodsub/1.0.0");
636 }
637}