libp2p_gossipsub/
protocol.rs

1// Copyright 2020 Sigma Prime Pty Ltd.
2//
3// Permission is hereby granted, free of charge, to any person obtaining a
4// copy of this software and associated documentation files (the "Software"),
5// to deal in the Software without restriction, including without limitation
6// the rights to use, copy, modify, merge, publish, distribute, sublicense,
7// and/or sell copies of the Software, and to permit persons to whom the
8// Software is furnished to do so, subject to the following conditions:
9//
10// The above copyright notice and this permission notice shall be included in
11// all copies or substantial portions of the Software.
12//
13// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
14// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
18// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
19// DEALINGS IN THE SOFTWARE.
20
21use std::{convert::Infallible, pin::Pin};
22
23use asynchronous_codec::{Decoder, Encoder, Framed};
24use byteorder::{BigEndian, ByteOrder};
25use bytes::BytesMut;
26use futures::prelude::*;
27use libp2p_core::{InboundUpgrade, OutboundUpgrade, UpgradeInfo};
28use libp2p_identity::{PeerId, PublicKey};
29use libp2p_swarm::StreamProtocol;
30use quick_protobuf::Writer;
31
32use crate::{
33    config::ValidationMode,
34    handler::HandlerEvent,
35    rpc_proto::proto,
36    topic::TopicHash,
37    types::{
38        ControlAction, Graft, IDontWant, IHave, IWant, MessageId, PeerInfo, PeerKind, Prune,
39        RawMessage, Rpc, Subscription, SubscriptionAction,
40    },
41    ValidationError,
42};
43
44pub(crate) const SIGNING_PREFIX: &[u8] = b"libp2p-pubsub:";
45
46pub(crate) const GOSSIPSUB_1_2_0_PROTOCOL: ProtocolId = ProtocolId {
47    protocol: StreamProtocol::new("/meshsub/1.2.0"),
48    kind: PeerKind::Gossipsubv1_2,
49};
50
51pub(crate) const GOSSIPSUB_1_1_0_PROTOCOL: ProtocolId = ProtocolId {
52    protocol: StreamProtocol::new("/meshsub/1.1.0"),
53    kind: PeerKind::Gossipsubv1_1,
54};
55pub(crate) const GOSSIPSUB_1_0_0_PROTOCOL: ProtocolId = ProtocolId {
56    protocol: StreamProtocol::new("/meshsub/1.0.0"),
57    kind: PeerKind::Gossipsub,
58};
59pub(crate) const FLOODSUB_PROTOCOL: ProtocolId = ProtocolId {
60    protocol: StreamProtocol::new("/floodsub/1.0.0"),
61    kind: PeerKind::Floodsub,
62};
63
64/// Implementation of [`InboundUpgrade`] and [`OutboundUpgrade`] for the Gossipsub protocol.
65#[derive(Debug, Clone)]
66pub struct ProtocolConfig {
67    /// The Gossipsub protocol id to listen on.
68    pub(crate) protocol_ids: Vec<ProtocolId>,
69    /// The maximum transmit size for a packet.
70    pub(crate) max_transmit_size: usize,
71    /// Determines the level of validation to be done on incoming messages.
72    pub(crate) validation_mode: ValidationMode,
73}
74
75impl Default for ProtocolConfig {
76    fn default() -> Self {
77        Self {
78            max_transmit_size: 65536,
79            validation_mode: ValidationMode::Strict,
80            protocol_ids: vec![
81                GOSSIPSUB_1_2_0_PROTOCOL,
82                GOSSIPSUB_1_1_0_PROTOCOL,
83                GOSSIPSUB_1_0_0_PROTOCOL,
84            ],
85        }
86    }
87}
88
89/// The protocol ID
90#[derive(Clone, Debug, PartialEq)]
91pub struct ProtocolId {
92    /// The RPC message type/name.
93    pub protocol: StreamProtocol,
94    /// The type of protocol we support
95    pub kind: PeerKind,
96}
97
98impl AsRef<str> for ProtocolId {
99    fn as_ref(&self) -> &str {
100        self.protocol.as_ref()
101    }
102}
103
104impl UpgradeInfo for ProtocolConfig {
105    type Info = ProtocolId;
106    type InfoIter = Vec<Self::Info>;
107
108    fn protocol_info(&self) -> Self::InfoIter {
109        self.protocol_ids.clone()
110    }
111}
112
113impl<TSocket> InboundUpgrade<TSocket> for ProtocolConfig
114where
115    TSocket: AsyncRead + AsyncWrite + Unpin + Send + 'static,
116{
117    type Output = (Framed<TSocket, GossipsubCodec>, PeerKind);
118    type Error = Infallible;
119    type Future = Pin<Box<dyn Future<Output = Result<Self::Output, Self::Error>> + Send>>;
120
121    fn upgrade_inbound(self, socket: TSocket, protocol_id: Self::Info) -> Self::Future {
122        Box::pin(future::ok((
123            Framed::new(
124                socket,
125                GossipsubCodec::new(self.max_transmit_size, self.validation_mode),
126            ),
127            protocol_id.kind,
128        )))
129    }
130}
131
132impl<TSocket> OutboundUpgrade<TSocket> for ProtocolConfig
133where
134    TSocket: AsyncWrite + AsyncRead + Unpin + Send + 'static,
135{
136    type Output = (Framed<TSocket, GossipsubCodec>, PeerKind);
137    type Error = Infallible;
138    type Future = Pin<Box<dyn Future<Output = Result<Self::Output, Self::Error>> + Send>>;
139
140    fn upgrade_outbound(self, socket: TSocket, protocol_id: Self::Info) -> Self::Future {
141        Box::pin(future::ok((
142            Framed::new(
143                socket,
144                GossipsubCodec::new(self.max_transmit_size, self.validation_mode),
145            ),
146            protocol_id.kind,
147        )))
148    }
149}
150
151// Gossip codec for the framing
152
153pub struct GossipsubCodec {
154    /// Determines the level of validation performed on incoming messages.
155    validation_mode: ValidationMode,
156    /// The codec to handle common encoding/decoding of protobuf messages
157    codec: quick_protobuf_codec::Codec<proto::RPC>,
158}
159
160impl GossipsubCodec {
161    pub fn new(max_length: usize, validation_mode: ValidationMode) -> GossipsubCodec {
162        let codec = quick_protobuf_codec::Codec::new(max_length);
163        GossipsubCodec {
164            validation_mode,
165            codec,
166        }
167    }
168
169    /// Verifies a gossipsub message. This returns either a success or failure. All errors
170    /// are logged, which prevents error handling in the codec and handler. We simply drop invalid
171    /// messages and log warnings, rather than propagating errors through the codec.
172    fn verify_signature(message: &proto::Message) -> bool {
173        use quick_protobuf::MessageWrite;
174
175        let Some(from) = message.from.as_ref() else {
176            tracing::debug!("Signature verification failed: No source id given");
177            return false;
178        };
179
180        let Ok(source) = PeerId::from_bytes(from) else {
181            tracing::debug!("Signature verification failed: Invalid Peer Id");
182            return false;
183        };
184
185        let Some(signature) = message.signature.as_ref() else {
186            tracing::debug!("Signature verification failed: No signature provided");
187            return false;
188        };
189
190        // If there is a key value in the protobuf, use that key otherwise the key must be
191        // obtained from the inlined source peer_id.
192        let public_key = match message.key.as_deref().map(PublicKey::try_decode_protobuf) {
193            Some(Ok(key)) => key,
194            _ => match PublicKey::try_decode_protobuf(&source.to_bytes()[2..]) {
195                Ok(v) => v,
196                Err(_) => {
197                    tracing::warn!("Signature verification failed: No valid public key supplied");
198                    return false;
199                }
200            },
201        };
202
203        // The key must match the peer_id
204        if source != public_key.to_peer_id() {
205            tracing::warn!(
206                "Signature verification failed: Public key doesn't match source peer id"
207            );
208            return false;
209        }
210
211        // Construct the signature bytes
212        let mut message_sig = message.clone();
213        message_sig.signature = None;
214        message_sig.key = None;
215        let mut buf = Vec::with_capacity(message_sig.get_size());
216        let mut writer = Writer::new(&mut buf);
217        message_sig
218            .write_message(&mut writer)
219            .expect("Encoding to succeed");
220        let mut signature_bytes = SIGNING_PREFIX.to_vec();
221        signature_bytes.extend_from_slice(&buf);
222        public_key.verify(&signature_bytes, signature)
223    }
224}
225
226impl Encoder for GossipsubCodec {
227    type Item<'a> = proto::RPC;
228    type Error = quick_protobuf_codec::Error;
229
230    fn encode(&mut self, item: Self::Item<'_>, dst: &mut BytesMut) -> Result<(), Self::Error> {
231        self.codec.encode(item, dst)
232    }
233}
234
235impl Decoder for GossipsubCodec {
236    type Item = HandlerEvent;
237    type Error = quick_protobuf_codec::Error;
238
239    fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
240        let Some(rpc) = self.codec.decode(src)? else {
241            return Ok(None);
242        };
243        // Store valid messages.
244        let mut messages = Vec::with_capacity(rpc.publish.len());
245        // Store any invalid messages.
246        let mut invalid_messages = Vec::new();
247
248        for message in rpc.publish.into_iter() {
249            // Keep track of the type of invalid message.
250            let mut invalid_kind = None;
251            let mut verify_signature = false;
252            let mut verify_sequence_no = false;
253            let mut verify_source = false;
254
255            match self.validation_mode {
256                ValidationMode::Strict => {
257                    // Validate everything
258                    verify_signature = true;
259                    verify_sequence_no = true;
260                    verify_source = true;
261                }
262                ValidationMode::Permissive => {
263                    // If the fields exist, validate them
264                    if message.signature.is_some() {
265                        verify_signature = true;
266                    }
267                    if message.seqno.is_some() {
268                        verify_sequence_no = true;
269                    }
270                    if message.from.is_some() {
271                        verify_source = true;
272                    }
273                }
274                ValidationMode::Anonymous => {
275                    if message.signature.is_some() {
276                        tracing::warn!(
277                            "Signature field was non-empty and anonymous validation mode is set"
278                        );
279                        invalid_kind = Some(ValidationError::SignaturePresent);
280                    } else if message.seqno.is_some() {
281                        tracing::warn!(
282                            "Sequence number was non-empty and anonymous validation mode is set"
283                        );
284                        invalid_kind = Some(ValidationError::SequenceNumberPresent);
285                    } else if message.from.is_some() {
286                        tracing::warn!("Message dropped. Message source was non-empty and anonymous validation mode is set");
287                        invalid_kind = Some(ValidationError::MessageSourcePresent);
288                    }
289                }
290                ValidationMode::None => {}
291            }
292
293            // If the initial validation logic failed, add the message to invalid messages and
294            // continue processing the others.
295            if let Some(validation_error) = invalid_kind.take() {
296                let message = RawMessage {
297                    source: None, // don't bother inform the application
298                    data: message.data.unwrap_or_default(),
299                    sequence_number: None, // don't inform the application
300                    topic: TopicHash::from_raw(message.topic),
301                    signature: None, // don't inform the application
302                    key: message.key,
303                    validated: false,
304                };
305                invalid_messages.push((message, validation_error));
306                // proceed to the next message
307                continue;
308            }
309
310            // verify message signatures if required
311            if verify_signature && !GossipsubCodec::verify_signature(&message) {
312                tracing::warn!("Invalid signature for received message");
313
314                // Build the invalid message (ignoring further validation of sequence number
315                // and source)
316                let message = RawMessage {
317                    source: None, // don't bother inform the application
318                    data: message.data.unwrap_or_default(),
319                    sequence_number: None, // don't inform the application
320                    topic: TopicHash::from_raw(message.topic),
321                    signature: None, // don't inform the application
322                    key: message.key,
323                    validated: false,
324                };
325                invalid_messages.push((message, ValidationError::InvalidSignature));
326                // proceed to the next message
327                continue;
328            }
329
330            // ensure the sequence number is a u64
331            let sequence_number = if verify_sequence_no {
332                if let Some(seq_no) = message.seqno {
333                    if seq_no.is_empty() {
334                        None
335                    } else if seq_no.len() != 8 {
336                        tracing::debug!(
337                            sequence_number=?seq_no,
338                            sequence_length=%seq_no.len(),
339                            "Invalid sequence number length for received message"
340                        );
341                        let message = RawMessage {
342                            source: None, // don't bother inform the application
343                            data: message.data.unwrap_or_default(),
344                            sequence_number: None, // don't inform the application
345                            topic: TopicHash::from_raw(message.topic),
346                            signature: message.signature, // don't inform the application
347                            key: message.key,
348                            validated: false,
349                        };
350                        invalid_messages.push((message, ValidationError::InvalidSequenceNumber));
351                        // proceed to the next message
352                        continue;
353                    } else {
354                        // valid sequence number
355                        Some(BigEndian::read_u64(&seq_no))
356                    }
357                } else {
358                    // sequence number was not present
359                    tracing::debug!("Sequence number not present but expected");
360                    let message = RawMessage {
361                        source: None, // don't bother inform the application
362                        data: message.data.unwrap_or_default(),
363                        sequence_number: None, // don't inform the application
364                        topic: TopicHash::from_raw(message.topic),
365                        signature: message.signature, // don't inform the application
366                        key: message.key,
367                        validated: false,
368                    };
369                    invalid_messages.push((message, ValidationError::EmptySequenceNumber));
370                    continue;
371                }
372            } else {
373                // Do not verify the sequence number, consider it empty
374                None
375            };
376
377            // Verify the message source if required
378            let source = if verify_source {
379                if let Some(bytes) = message.from {
380                    if !bytes.is_empty() {
381                        match PeerId::from_bytes(&bytes) {
382                            Ok(peer_id) => Some(peer_id), // valid peer id
383                            Err(_) => {
384                                // invalid peer id, add to invalid messages
385                                tracing::debug!("Message source has an invalid PeerId");
386                                let message = RawMessage {
387                                    source: None, // don't bother inform the application
388                                    data: message.data.unwrap_or_default(),
389                                    sequence_number,
390                                    topic: TopicHash::from_raw(message.topic),
391                                    signature: message.signature, // don't inform the application
392                                    key: message.key,
393                                    validated: false,
394                                };
395                                invalid_messages.push((message, ValidationError::InvalidPeerId));
396                                continue;
397                            }
398                        }
399                    } else {
400                        None
401                    }
402                } else {
403                    None
404                }
405            } else {
406                None
407            };
408
409            // This message has passed all validation, add it to the validated messages.
410            messages.push(RawMessage {
411                source,
412                data: message.data.unwrap_or_default(),
413                sequence_number,
414                topic: TopicHash::from_raw(message.topic),
415                signature: message.signature,
416                key: message.key,
417                validated: false,
418            });
419        }
420
421        let mut control_msgs = Vec::new();
422
423        if let Some(rpc_control) = rpc.control {
424            // Collect the gossipsub control messages
425            let ihave_msgs: Vec<ControlAction> = rpc_control
426                .ihave
427                .into_iter()
428                .map(|ihave| {
429                    ControlAction::IHave(IHave {
430                        topic_hash: TopicHash::from_raw(ihave.topic_id.unwrap_or_default()),
431                        message_ids: ihave
432                            .message_ids
433                            .into_iter()
434                            .map(MessageId::from)
435                            .collect::<Vec<_>>(),
436                    })
437                })
438                .collect();
439
440            let iwant_msgs: Vec<ControlAction> = rpc_control
441                .iwant
442                .into_iter()
443                .map(|iwant| {
444                    ControlAction::IWant(IWant {
445                        message_ids: iwant
446                            .message_ids
447                            .into_iter()
448                            .map(MessageId::from)
449                            .collect::<Vec<_>>(),
450                    })
451                })
452                .collect();
453
454            let graft_msgs: Vec<ControlAction> = rpc_control
455                .graft
456                .into_iter()
457                .map(|graft| {
458                    ControlAction::Graft(Graft {
459                        topic_hash: TopicHash::from_raw(graft.topic_id.unwrap_or_default()),
460                    })
461                })
462                .collect();
463
464            let mut prune_msgs = Vec::new();
465
466            for prune in rpc_control.prune {
467                // filter out invalid peers
468                let peers = prune
469                    .peers
470                    .into_iter()
471                    .filter_map(|info| {
472                        info.peer_id
473                            .as_ref()
474                            .and_then(|id| PeerId::from_bytes(id).ok())
475                            .map(|peer_id|
476                                    //TODO signedPeerRecord, see https://github.com/libp2p/specs/pull/217
477                                    PeerInfo {
478                                        peer_id: Some(peer_id),
479                                    })
480                    })
481                    .collect::<Vec<PeerInfo>>();
482
483                let topic_hash = TopicHash::from_raw(prune.topic_id.unwrap_or_default());
484                prune_msgs.push(ControlAction::Prune(Prune {
485                    topic_hash,
486                    peers,
487                    backoff: prune.backoff,
488                }));
489            }
490
491            let idontwant_msgs: Vec<ControlAction> = rpc_control
492                .idontwant
493                .into_iter()
494                .map(|idontwant| {
495                    ControlAction::IDontWant(IDontWant {
496                        message_ids: idontwant
497                            .message_ids
498                            .into_iter()
499                            .map(MessageId::from)
500                            .collect::<Vec<_>>(),
501                    })
502                })
503                .collect();
504
505            control_msgs.extend(ihave_msgs);
506            control_msgs.extend(iwant_msgs);
507            control_msgs.extend(graft_msgs);
508            control_msgs.extend(prune_msgs);
509            control_msgs.extend(idontwant_msgs);
510        }
511
512        Ok(Some(HandlerEvent::Message {
513            rpc: Rpc {
514                messages,
515                subscriptions: rpc
516                    .subscriptions
517                    .into_iter()
518                    .map(|sub| Subscription {
519                        action: if Some(true) == sub.subscribe {
520                            SubscriptionAction::Subscribe
521                        } else {
522                            SubscriptionAction::Unsubscribe
523                        },
524                        topic_hash: TopicHash::from_raw(sub.topic_id.unwrap_or_default()),
525                    })
526                    .collect(),
527                control_msgs,
528            },
529            invalid_messages,
530        }))
531    }
532}
533
534#[cfg(test)]
535mod tests {
536    use libp2p_identity::Keypair;
537    use quickcheck::*;
538
539    use super::*;
540    use crate::{
541        config::Config, Behaviour, ConfigBuilder, IdentTopic as Topic, MessageAuthenticity, Version,
542    };
543
544    #[derive(Clone, Debug)]
545    struct Message(RawMessage);
546
547    impl Arbitrary for Message {
548        fn arbitrary(g: &mut Gen) -> Self {
549            let keypair = TestKeypair::arbitrary(g);
550
551            // generate an arbitrary GossipsubMessage using the behaviour signing functionality
552            let config = Config::default();
553            let mut gs: Behaviour =
554                Behaviour::new(MessageAuthenticity::Signed(keypair.0), config).unwrap();
555            let mut data_g = quickcheck::Gen::new(10024);
556            let data = (0..u8::arbitrary(&mut data_g))
557                .map(|_| u8::arbitrary(g))
558                .collect::<Vec<_>>();
559            let topic_id = TopicId::arbitrary(g).0;
560            Message(gs.build_raw_message(topic_id, data).unwrap())
561        }
562    }
563
564    #[derive(Clone, Debug)]
565    struct TopicId(TopicHash);
566
567    impl Arbitrary for TopicId {
568        fn arbitrary(g: &mut Gen) -> Self {
569            let mut data_g = quickcheck::Gen::new(1024);
570            let topic_string: String = (0..u8::arbitrary(&mut data_g))
571                .map(|_| char::arbitrary(g))
572                .collect::<String>();
573            TopicId(Topic::new(topic_string).into())
574        }
575    }
576
577    #[derive(Clone)]
578    struct TestKeypair(Keypair);
579
580    impl Arbitrary for TestKeypair {
581        fn arbitrary(_g: &mut Gen) -> Self {
582            // Small enough to be inlined.
583            TestKeypair(Keypair::generate_ed25519())
584        }
585    }
586
587    impl std::fmt::Debug for TestKeypair {
588        fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
589            f.debug_struct("TestKeypair")
590                .field("public", &self.0.public())
591                .finish()
592        }
593    }
594
595    #[test]
596    /// Test that RPC messages can be encoded and decoded successfully.
597    fn encode_decode() {
598        fn prop(message: Message) {
599            let message = message.0;
600
601            let rpc = Rpc {
602                messages: vec![message.clone()],
603                subscriptions: vec![],
604                control_msgs: vec![],
605            };
606
607            let mut codec = GossipsubCodec::new(u32::MAX as usize, ValidationMode::Strict);
608            let mut buf = BytesMut::new();
609            codec.encode(rpc.into_protobuf(), &mut buf).unwrap();
610            let decoded_rpc = codec.decode(&mut buf).unwrap().unwrap();
611            // mark as validated as its a published message
612            match decoded_rpc {
613                HandlerEvent::Message { mut rpc, .. } => {
614                    rpc.messages[0].validated = true;
615
616                    assert_eq!(vec![message], rpc.messages);
617                }
618                _ => panic!("Must decode a message"),
619            }
620        }
621
622        QuickCheck::new().quickcheck(prop as fn(_) -> _)
623    }
624
625    #[test]
626    fn support_floodsub_with_custom_protocol() {
627        let protocol_config = ConfigBuilder::default()
628            .protocol_id("/foosub", Version::V1_1)
629            .support_floodsub()
630            .build()
631            .unwrap()
632            .protocol_config();
633
634        assert_eq!(protocol_config.protocol_ids[0].protocol, "/foosub");
635        assert_eq!(protocol_config.protocol_ids[1].protocol, "/floodsub/1.0.0");
636    }
637}