libp2p_gossipsub/
protocol.rs

1// Copyright 2020 Sigma Prime Pty Ltd.
2//
3// Permission is hereby granted, free of charge, to any person obtaining a
4// copy of this software and associated documentation files (the "Software"),
5// to deal in the Software without restriction, including without limitation
6// the rights to use, copy, modify, merge, publish, distribute, sublicense,
7// and/or sell copies of the Software, and to permit persons to whom the
8// Software is furnished to do so, subject to the following conditions:
9//
10// The above copyright notice and this permission notice shall be included in
11// all copies or substantial portions of the Software.
12//
13// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
14// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
18// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
19// DEALINGS IN THE SOFTWARE.
20
21use std::{collections::HashMap, convert::Infallible, pin::Pin};
22
23use asynchronous_codec::{Decoder, Encoder, Framed};
24use byteorder::{BigEndian, ByteOrder};
25use bytes::BytesMut;
26use futures::prelude::*;
27use libp2p_core::{InboundUpgrade, OutboundUpgrade, UpgradeInfo};
28use libp2p_identity::{PeerId, PublicKey};
29use libp2p_swarm::StreamProtocol;
30use quick_protobuf::{MessageWrite, Writer};
31
32use crate::{
33    config::ValidationMode,
34    handler::HandlerEvent,
35    rpc_proto::proto,
36    topic::TopicHash,
37    types::{
38        ControlAction, Graft, IDontWant, IHave, IWant, MessageId, PeerInfo, PeerKind, Prune,
39        RawMessage, RpcIn, Subscription, SubscriptionAction,
40    },
41    ValidationError,
42};
43
44pub(crate) const SIGNING_PREFIX: &[u8] = b"libp2p-pubsub:";
45
46pub(crate) const GOSSIPSUB_1_2_0_PROTOCOL: ProtocolId = ProtocolId {
47    protocol: StreamProtocol::new("/meshsub/1.2.0"),
48    kind: PeerKind::Gossipsubv1_2,
49};
50
51pub(crate) const GOSSIPSUB_1_1_0_PROTOCOL: ProtocolId = ProtocolId {
52    protocol: StreamProtocol::new("/meshsub/1.1.0"),
53    kind: PeerKind::Gossipsubv1_1,
54};
55pub(crate) const GOSSIPSUB_1_0_0_PROTOCOL: ProtocolId = ProtocolId {
56    protocol: StreamProtocol::new("/meshsub/1.0.0"),
57    kind: PeerKind::Gossipsub,
58};
59pub(crate) const FLOODSUB_PROTOCOL: ProtocolId = ProtocolId {
60    protocol: StreamProtocol::new("/floodsub/1.0.0"),
61    kind: PeerKind::Floodsub,
62};
63
64/// Implementation of [`InboundUpgrade`] and [`OutboundUpgrade`] for the Gossipsub protocol.
65#[derive(Debug, Clone)]
66pub struct ProtocolConfig {
67    /// The Gossipsub protocol id to listen on.
68    pub(crate) protocol_ids: Vec<ProtocolId>,
69    /// Determines the level of validation to be done on incoming messages.
70    pub(crate) validation_mode: ValidationMode,
71    /// The default max transmit size.
72    pub(crate) default_max_transmit_size: usize,
73    /// The max transmit sizes for a topic.
74    pub(crate) max_transmit_sizes: HashMap<TopicHash, usize>,
75}
76
77impl Default for ProtocolConfig {
78    fn default() -> Self {
79        Self {
80            validation_mode: ValidationMode::Strict,
81            protocol_ids: vec![
82                GOSSIPSUB_1_2_0_PROTOCOL,
83                GOSSIPSUB_1_1_0_PROTOCOL,
84                GOSSIPSUB_1_0_0_PROTOCOL,
85            ],
86            default_max_transmit_size: 65536,
87            max_transmit_sizes: HashMap::new(),
88        }
89    }
90}
91
92impl ProtocolConfig {
93    /// Get the max transmit size for a given topic, falling back to the default.
94    pub fn max_transmit_size_for_topic(&self, topic: &TopicHash) -> usize {
95        self.max_transmit_sizes
96            .get(topic)
97            .copied()
98            .unwrap_or(self.default_max_transmit_size)
99    }
100}
101
102/// The protocol ID
103#[derive(Clone, Debug, PartialEq)]
104pub struct ProtocolId {
105    /// The RPC message type/name.
106    pub protocol: StreamProtocol,
107    /// The type of protocol we support
108    pub kind: PeerKind,
109}
110
111impl AsRef<str> for ProtocolId {
112    fn as_ref(&self) -> &str {
113        self.protocol.as_ref()
114    }
115}
116
117impl UpgradeInfo for ProtocolConfig {
118    type Info = ProtocolId;
119    type InfoIter = Vec<Self::Info>;
120
121    fn protocol_info(&self) -> Self::InfoIter {
122        self.protocol_ids.clone()
123    }
124}
125
126impl<TSocket> InboundUpgrade<TSocket> for ProtocolConfig
127where
128    TSocket: AsyncRead + AsyncWrite + Unpin + Send + 'static,
129{
130    type Output = (Framed<TSocket, GossipsubCodec>, PeerKind);
131    type Error = Infallible;
132    type Future = Pin<Box<dyn Future<Output = Result<Self::Output, Self::Error>> + Send>>;
133
134    fn upgrade_inbound(self, socket: TSocket, protocol_id: Self::Info) -> Self::Future {
135        Box::pin(future::ok((
136            Framed::new(
137                socket,
138                GossipsubCodec::new(
139                    self.default_max_transmit_size,
140                    self.validation_mode,
141                    self.max_transmit_sizes,
142                ),
143            ),
144            protocol_id.kind,
145        )))
146    }
147}
148
149impl<TSocket> OutboundUpgrade<TSocket> for ProtocolConfig
150where
151    TSocket: AsyncWrite + AsyncRead + Unpin + Send + 'static,
152{
153    type Output = (Framed<TSocket, GossipsubCodec>, PeerKind);
154    type Error = Infallible;
155    type Future = Pin<Box<dyn Future<Output = Result<Self::Output, Self::Error>> + Send>>;
156
157    fn upgrade_outbound(self, socket: TSocket, protocol_id: Self::Info) -> Self::Future {
158        Box::pin(future::ok((
159            Framed::new(
160                socket,
161                GossipsubCodec::new(
162                    self.default_max_transmit_size,
163                    self.validation_mode,
164                    self.max_transmit_sizes,
165                ),
166            ),
167            protocol_id.kind,
168        )))
169    }
170}
171
172// Gossip codec for the framing
173
174pub struct GossipsubCodec {
175    /// Determines the level of validation performed on incoming messages.
176    validation_mode: ValidationMode,
177    /// The codec to handle common encoding/decoding of protobuf messages
178    codec: quick_protobuf_codec::Codec<proto::RPC>,
179    /// Maximum transmit sizes per topic, with a default if not specified.
180    max_transmit_sizes: HashMap<TopicHash, usize>,
181}
182
183impl GossipsubCodec {
184    pub fn new(
185        max_length: usize,
186        validation_mode: ValidationMode,
187        max_transmit_sizes: HashMap<TopicHash, usize>,
188    ) -> GossipsubCodec {
189        let codec = quick_protobuf_codec::Codec::new(max_length);
190        GossipsubCodec {
191            validation_mode,
192            codec,
193            max_transmit_sizes,
194        }
195    }
196
197    /// Get the max transmit size for a given topic if it exists.
198    fn max_transmit_size_for_topic(&self, topic: &TopicHash) -> Option<usize> {
199        self.max_transmit_sizes.get(topic).copied()
200    }
201
202    /// Verifies a gossipsub message. This returns either a success or failure. All errors
203    /// are logged, which prevents error handling in the codec and handler. We simply drop invalid
204    /// messages and log warnings, rather than propagating errors through the codec.
205    fn verify_signature(message: &proto::Message) -> bool {
206        use quick_protobuf::MessageWrite;
207
208        let Some(from) = message.from.as_ref() else {
209            tracing::debug!("Signature verification failed: No source id given");
210            return false;
211        };
212
213        let Ok(source) = PeerId::from_bytes(from) else {
214            tracing::debug!("Signature verification failed: Invalid Peer Id");
215            return false;
216        };
217
218        let Some(signature) = message.signature.as_ref() else {
219            tracing::debug!("Signature verification failed: No signature provided");
220            return false;
221        };
222
223        // If there is a key value in the protobuf, use that key otherwise the key must be
224        // obtained from the inlined source peer_id.
225        let public_key = match message.key.as_deref().map(PublicKey::try_decode_protobuf) {
226            Some(Ok(key)) => key,
227            _ => match PublicKey::try_decode_protobuf(&source.to_bytes()[2..]) {
228                Ok(v) => v,
229                Err(_) => {
230                    tracing::warn!("Signature verification failed: No valid public key supplied");
231                    return false;
232                }
233            },
234        };
235
236        // The key must match the peer_id
237        if source != public_key.to_peer_id() {
238            tracing::warn!(
239                "Signature verification failed: Public key doesn't match source peer id"
240            );
241            return false;
242        }
243
244        // Construct the signature bytes
245        let mut message_sig = message.clone();
246        message_sig.signature = None;
247        message_sig.key = None;
248        let mut buf = Vec::with_capacity(message_sig.get_size());
249        let mut writer = Writer::new(&mut buf);
250        message_sig
251            .write_message(&mut writer)
252            .expect("Encoding to succeed");
253        let mut signature_bytes = SIGNING_PREFIX.to_vec();
254        signature_bytes.extend_from_slice(&buf);
255        public_key.verify(&signature_bytes, signature)
256    }
257}
258
259impl Encoder for GossipsubCodec {
260    type Item<'a> = proto::RPC;
261    type Error = quick_protobuf_codec::Error;
262
263    fn encode(&mut self, item: Self::Item<'_>, dst: &mut BytesMut) -> Result<(), Self::Error> {
264        self.codec.encode(item, dst)
265    }
266}
267
268impl Decoder for GossipsubCodec {
269    type Item = HandlerEvent;
270    type Error = quick_protobuf_codec::Error;
271
272    fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
273        let Some(rpc) = self.codec.decode(src)? else {
274            return Ok(None);
275        };
276        // Store valid messages.
277        let mut messages = Vec::with_capacity(rpc.publish.len());
278        // Store any invalid messages.
279        let mut invalid_messages = Vec::new();
280
281        for message in rpc.publish.into_iter() {
282            let topic = TopicHash::from_raw(&message.topic);
283
284            // Check the message size to ensure it doesn't bypass the configured max.
285            if self
286                .max_transmit_size_for_topic(&topic)
287                .is_some_and(|max| message.get_size() > max)
288            {
289                let message = RawMessage {
290                    source: None, // don't bother inform the application
291                    data: message.data.unwrap_or_default(),
292                    sequence_number: None, // don't inform the application
293                    topic: TopicHash::from_raw(message.topic),
294                    signature: None, // don't inform the application
295                    key: message.key,
296                    validated: false,
297                };
298
299                invalid_messages.push((message, ValidationError::MessageSizeTooLargeForTopic));
300                continue;
301            }
302
303            // Keep track of the type of invalid message.
304            let mut invalid_kind = None;
305            let mut verify_signature = false;
306            let mut verify_sequence_no = false;
307            let mut verify_source = false;
308
309            match self.validation_mode {
310                ValidationMode::Strict => {
311                    // Validate everything
312                    verify_signature = true;
313                    verify_sequence_no = true;
314                    verify_source = true;
315                }
316                ValidationMode::Permissive => {
317                    // If the fields exist, validate them
318                    if message.signature.is_some() {
319                        verify_signature = true;
320                    }
321                    if message.seqno.is_some() {
322                        verify_sequence_no = true;
323                    }
324                    if message.from.is_some() {
325                        verify_source = true;
326                    }
327                }
328                ValidationMode::Anonymous => {
329                    if message.signature.is_some() {
330                        tracing::warn!(
331                            "Signature field was non-empty and anonymous validation mode is set"
332                        );
333                        invalid_kind = Some(ValidationError::SignaturePresent);
334                    } else if message.seqno.is_some() {
335                        tracing::warn!(
336                            "Sequence number was non-empty and anonymous validation mode is set"
337                        );
338                        invalid_kind = Some(ValidationError::SequenceNumberPresent);
339                    } else if message.from.is_some() {
340                        tracing::warn!("Message dropped. Message source was non-empty and anonymous validation mode is set");
341                        invalid_kind = Some(ValidationError::MessageSourcePresent);
342                    }
343                }
344                ValidationMode::None => {}
345            }
346
347            // If the initial validation logic failed, add the message to invalid messages and
348            // continue processing the others.
349            if let Some(validation_error) = invalid_kind.take() {
350                let message = RawMessage {
351                    source: None, // don't bother inform the application
352                    data: message.data.unwrap_or_default(),
353                    sequence_number: None, // don't inform the application
354                    topic: TopicHash::from_raw(message.topic),
355                    signature: None, // don't inform the application
356                    key: message.key,
357                    validated: false,
358                };
359                invalid_messages.push((message, validation_error));
360                // proceed to the next message
361                continue;
362            }
363
364            // verify message signatures if required
365            if verify_signature && !GossipsubCodec::verify_signature(&message) {
366                tracing::warn!("Invalid signature for received message");
367                // Build the invalid message (ignoring further validation of sequence number
368                // and source)
369                let message = RawMessage {
370                    source: None, // don't bother inform the application
371                    data: message.data.unwrap_or_default(),
372                    sequence_number: None, // don't inform the application
373                    topic: TopicHash::from_raw(message.topic),
374                    signature: None, // don't inform the application
375                    key: message.key,
376                    validated: false,
377                };
378                invalid_messages.push((message, ValidationError::InvalidSignature));
379                // proceed to the next message
380                continue;
381            }
382
383            // ensure the sequence number is a u64
384            let sequence_number = if verify_sequence_no {
385                if let Some(seq_no) = message.seqno {
386                    if seq_no.is_empty() {
387                        None
388                    } else if seq_no.len() != 8 {
389                        tracing::debug!(
390                            sequence_number=?seq_no,
391                            sequence_length=%seq_no.len(),
392                            "Invalid sequence number length for received message"
393                        );
394
395                        let message = RawMessage {
396                            source: None, // don't bother inform the application
397                            data: message.data.unwrap_or_default(),
398                            sequence_number: None, // don't inform the application
399                            topic: TopicHash::from_raw(message.topic),
400                            signature: message.signature, // don't inform the application
401                            key: message.key,
402                            validated: false,
403                        };
404                        invalid_messages.push((message, ValidationError::InvalidSequenceNumber));
405                        // proceed to the next message
406                        continue;
407                    } else {
408                        // valid sequence number
409                        Some(BigEndian::read_u64(&seq_no))
410                    }
411                } else {
412                    // sequence number was not present
413                    tracing::debug!("Sequence number not present but expected");
414                    let message = RawMessage {
415                        source: None, // don't bother inform the application
416                        data: message.data.unwrap_or_default(),
417                        sequence_number: None, // don't inform the application
418                        topic: TopicHash::from_raw(message.topic),
419                        signature: message.signature, // don't inform the application
420                        key: message.key,
421                        validated: false,
422                    };
423                    invalid_messages.push((message, ValidationError::EmptySequenceNumber));
424                    continue;
425                }
426            } else {
427                // Do not verify the sequence number, consider it empty
428                None
429            };
430
431            // Verify the message source if required
432            let source = if verify_source {
433                if let Some(bytes) = message.from {
434                    if !bytes.is_empty() {
435                        match PeerId::from_bytes(&bytes) {
436                            Ok(peer_id) => Some(peer_id), // valid peer id
437                            Err(_) => {
438                                // invalid peer id, add to invalid messages
439                                tracing::debug!("Message source has an invalid PeerId");
440                                let message = RawMessage {
441                                    source: None, // don't bother inform the application
442                                    data: message.data.unwrap_or_default(),
443                                    sequence_number,
444                                    topic: TopicHash::from_raw(message.topic),
445                                    signature: message.signature, // don't inform the application
446                                    key: message.key,
447                                    validated: false,
448                                };
449                                invalid_messages.push((message, ValidationError::InvalidPeerId));
450                                continue;
451                            }
452                        }
453                    } else {
454                        None
455                    }
456                } else {
457                    None
458                }
459            } else {
460                None
461            };
462
463            // This message has passed all validation, add it to the validated messages.
464            messages.push(RawMessage {
465                source,
466                data: message.data.unwrap_or_default(),
467                sequence_number,
468                topic: TopicHash::from_raw(message.topic),
469                signature: message.signature,
470                key: message.key,
471                validated: false,
472            });
473        }
474
475        let mut control_msgs = Vec::new();
476
477        if let Some(rpc_control) = rpc.control {
478            // Collect the gossipsub control messages
479            let ihave_msgs: Vec<ControlAction> = rpc_control
480                .ihave
481                .into_iter()
482                .map(|ihave| {
483                    ControlAction::IHave(IHave {
484                        topic_hash: TopicHash::from_raw(ihave.topic_id.unwrap_or_default()),
485                        message_ids: ihave
486                            .message_ids
487                            .into_iter()
488                            .map(MessageId::from)
489                            .collect::<Vec<_>>(),
490                    })
491                })
492                .collect();
493
494            let iwant_msgs: Vec<ControlAction> = rpc_control
495                .iwant
496                .into_iter()
497                .map(|iwant| {
498                    ControlAction::IWant(IWant {
499                        message_ids: iwant
500                            .message_ids
501                            .into_iter()
502                            .map(MessageId::from)
503                            .collect::<Vec<_>>(),
504                    })
505                })
506                .collect();
507
508            let graft_msgs: Vec<ControlAction> = rpc_control
509                .graft
510                .into_iter()
511                .map(|graft| {
512                    ControlAction::Graft(Graft {
513                        topic_hash: TopicHash::from_raw(graft.topic_id.unwrap_or_default()),
514                    })
515                })
516                .collect();
517
518            let mut prune_msgs = Vec::new();
519
520            for prune in rpc_control.prune {
521                // filter out invalid peers
522                let peers = prune
523                    .peers
524                    .into_iter()
525                    .filter_map(|info| {
526                        info.peer_id
527                            .as_ref()
528                            .and_then(|id| PeerId::from_bytes(id).ok())
529                            .map(|peer_id|
530                                    //TODO signedPeerRecord, see https://github.com/libp2p/specs/pull/217
531                                    PeerInfo {
532                                        peer_id: Some(peer_id),
533                                    })
534                    })
535                    .collect::<Vec<PeerInfo>>();
536
537                let topic_hash = TopicHash::from_raw(prune.topic_id.unwrap_or_default());
538                prune_msgs.push(ControlAction::Prune(Prune {
539                    topic_hash,
540                    peers,
541                    backoff: prune.backoff,
542                }));
543            }
544
545            let idontwant_msgs: Vec<ControlAction> = rpc_control
546                .idontwant
547                .into_iter()
548                .map(|idontwant| {
549                    ControlAction::IDontWant(IDontWant {
550                        message_ids: idontwant
551                            .message_ids
552                            .into_iter()
553                            .map(MessageId::from)
554                            .collect::<Vec<_>>(),
555                    })
556                })
557                .collect();
558
559            control_msgs.extend(ihave_msgs);
560            control_msgs.extend(iwant_msgs);
561            control_msgs.extend(graft_msgs);
562            control_msgs.extend(prune_msgs);
563            control_msgs.extend(idontwant_msgs);
564        }
565
566        Ok(Some(HandlerEvent::Message {
567            rpc: RpcIn {
568                messages,
569                subscriptions: rpc
570                    .subscriptions
571                    .into_iter()
572                    .map(|sub| Subscription {
573                        action: if Some(true) == sub.subscribe {
574                            SubscriptionAction::Subscribe
575                        } else {
576                            SubscriptionAction::Unsubscribe
577                        },
578                        topic_hash: TopicHash::from_raw(sub.topic_id.unwrap_or_default()),
579                    })
580                    .collect(),
581                control_msgs,
582            },
583            invalid_messages,
584        }))
585    }
586}
587
#[cfg(test)]
mod tests {
    use std::time::Duration;

    use futures_timer::Delay;
    use libp2p_identity::Keypair;
    use quickcheck::*;

    use super::*;
    use crate::{
        config::Config, types::RpcOut, Behaviour, ConfigBuilder, IdentTopic as Topic,
        MessageAuthenticity, Version,
    };

    /// Wrapper that lets quickcheck generate signed [`RawMessage`]s.
    #[derive(Clone, Debug)]
    struct Message(RawMessage);

    impl Arbitrary for Message {
        fn arbitrary(g: &mut Gen) -> Self {
            let TestKeypair(keypair) = TestKeypair::arbitrary(g);

            // Let a behaviour with signing enabled build (and sign) the message.
            let mut gs: Behaviour =
                Behaviour::new(MessageAuthenticity::Signed(keypair), Config::default()).unwrap();

            // The payload length comes from a dedicated generator, its bytes from `g`.
            let mut len_gen = quickcheck::Gen::new(10024);
            let payload_len = u8::arbitrary(&mut len_gen);
            let payload: Vec<u8> = (0..payload_len).map(|_| u8::arbitrary(g)).collect();

            let TopicId(topic) = TopicId::arbitrary(g);
            Message(gs.build_raw_message(topic, payload).unwrap())
        }
    }

    /// Wrapper that lets quickcheck generate [`TopicHash`]es from random topic names.
    #[derive(Clone, Debug)]
    struct TopicId(TopicHash);

    impl Arbitrary for TopicId {
        fn arbitrary(g: &mut Gen) -> Self {
            // The name length comes from a dedicated generator, its characters from `g`.
            let mut len_gen = quickcheck::Gen::new(1024);
            let name_len = u8::arbitrary(&mut len_gen);
            let name: String = (0..name_len).map(|_| char::arbitrary(g)).collect();
            TopicId(Topic::new(name).into())
        }
    }

    /// Wrapper that lets quickcheck generate freshly created keypairs.
    #[derive(Clone)]
    struct TestKeypair(Keypair);

    impl Arbitrary for TestKeypair {
        fn arbitrary(_g: &mut Gen) -> Self {
            // Small enough to be inlined.
            TestKeypair(Keypair::generate_ed25519())
        }
    }

    impl std::fmt::Debug for TestKeypair {
        /// Prints only the public half of the keypair.
        fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
            let TestKeypair(keypair) = self;
            f.debug_struct("TestKeypair")
                .field("public", &keypair.public())
                .finish()
        }
    }

    /// Test that RPC messages can be encoded and decoded successfully.
    #[test]
    fn encode_decode() {
        fn prop(message: Message) {
            let Message(expected) = message;

            let outgoing = RpcOut::Publish {
                message: expected.clone(),
                timeout: Delay::new(Duration::from_secs(1)),
            };

            let mut codec =
                GossipsubCodec::new(u32::MAX as usize, ValidationMode::Strict, HashMap::new());
            let mut buf = BytesMut::new();
            codec.encode(outgoing.into_protobuf(), &mut buf).unwrap();

            let HandlerEvent::Message { mut rpc, .. } = codec.decode(&mut buf).unwrap().unwrap()
            else {
                panic!("Must decode a message");
            };

            // mark as validated as its a published message
            rpc.messages[0].validated = true;

            assert_eq!(vec![expected], rpc.messages);
        }

        QuickCheck::new().quickcheck(prop as fn(_) -> _)
    }

    /// A custom protocol id comes first and the floodsub protocol id follows it.
    #[test]
    fn support_floodsub_with_custom_protocol() {
        let config = ConfigBuilder::default()
            .protocol_id("/foosub", Version::V1_1)
            .support_floodsub()
            .build()
            .unwrap();
        let protocol_config = config.protocol_config();

        assert_eq!(protocol_config.protocol_ids[0].protocol, "/foosub");
        assert_eq!(protocol_config.protocol_ids[1].protocol, "/floodsub/1.0.0");
    }
}