libp2p_gossipsub/
handler.rs

1// Copyright 2020 Sigma Prime Pty Ltd.
2//
3// Permission is hereby granted, free of charge, to any person obtaining a
4// copy of this software and associated documentation files (the "Software"),
5// to deal in the Software without restriction, including without limitation
6// the rights to use, copy, modify, merge, publish, distribute, sublicense,
7// and/or sell copies of the Software, and to permit persons to whom the
8// Software is furnished to do so, subject to the following conditions:
9//
10// The above copyright notice and this permission notice shall be included in
11// all copies or substantial portions of the Software.
12//
13// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
14// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
18// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
19// DEALINGS IN THE SOFTWARE.
20
21use std::{
22    pin::Pin,
23    task::{Context, Poll},
24};
25
26use asynchronous_codec::Framed;
27use futures::{future::Either, prelude::*, StreamExt};
28use libp2p_core::upgrade::DeniedUpgrade;
29use libp2p_swarm::{
30    handler::{
31        ConnectionEvent, ConnectionHandler, ConnectionHandlerEvent, DialUpgradeError,
32        FullyNegotiatedInbound, FullyNegotiatedOutbound, StreamUpgradeError, SubstreamProtocol,
33    },
34    Stream,
35};
36use web_time::Instant;
37
38use crate::{
39    protocol::{GossipsubCodec, ProtocolConfig},
40    rpc::Receiver,
41    rpc_proto::proto,
42    types::{PeerKind, RawMessage, Rpc, RpcOut},
43    ValidationError,
44};
45
/// The event emitted by the [`Handler`]. This informs the behaviour of various events created
/// by the handler.
#[derive(Debug)]
pub enum HandlerEvent {
    /// A GossipsubRPC message has been received. This also contains a list of invalid messages (if
    /// any) that were received.
    Message {
        /// The GossipsubRPC message excluding any invalid messages.
        rpc: Rpc,
        /// Any invalid messages that were received in the RPC, along with the associated
        /// validation error.
        invalid_messages: Vec<(RawMessage, ValidationError)>,
    },
    /// An inbound or outbound substream has been established with the peer and this informs over
    /// which protocol. An enabled handler emits this once, as soon as the peer kind is known; a
    /// handler disabled because protocol negotiation failed emits it once with
    /// [`PeerKind::NotSupported`].
    PeerKind(PeerKind),
    /// A queued outgoing message was dropped because it could not be sent in time.
    MessageDropped(RpcOut),
}
65
/// A message sent from the behaviour to the handler.
///
/// Both variants only toggle whether the handler considers its peer to be part of the mesh,
/// which in turn drives the keep-alive decision for the connection.
#[derive(Debug)]
pub enum HandlerIn {
    /// The peer has joined the mesh.
    JoinedMesh,
    /// The peer has left the mesh.
    LeftMesh,
}
75
/// The maximum number of inbound or outbound substream attempts we allow.
///
/// Gossipsub is supposed to have a single long-lived inbound and outbound substream. On failure we
/// attempt to recreate these. This imposes an upper bound on new substreams before we consider the
/// connection faulty and disable the handler: the handler is disabled as soon as either attempt
/// counter reaches this value. This also protects against potential substream creation loops.
const MAX_SUBSTREAM_ATTEMPTS: usize = 5;
83
/// The gossipsub connection handler.
///
/// It is either actively managing substreams with the peer ([`Handler::Enabled`]) or has been
/// switched off ([`Handler::Disabled`]), in which case inbound upgrades are denied and
/// behaviour messages are dropped.
// The `Enabled` variant carries the full substream state and is therefore much larger than the
// `Disabled` one; the size-difference lint is silenced deliberately.
#[allow(clippy::large_enum_variant)]
pub enum Handler {
    /// The handler is operating normally.
    Enabled(EnabledHandler),
    /// The handler has been disabled; the payload records why.
    Disabled(DisabledHandler),
}
89
/// Protocol Handler that manages a single long-lived inbound and a single long-lived outbound
/// substream with a peer.
pub struct EnabledHandler {
    /// Upgrade configuration for the gossipsub protocol. Used both for accepting inbound
    /// substreams and for requesting the outbound one.
    listen_protocol: ProtocolConfig,

    /// The single long-lived outbound substream. `None` until one has been negotiated, and
    /// again after a send or flush error.
    outbound_substream: Option<OutboundSubstreamState>,

    /// The single long-lived inbound substream. `None` until the remote opens one, and again
    /// after it has been closed.
    inbound_substream: Option<InboundSubstreamState>,

    /// Queue of values that we want to send to the remote.
    send_queue: Receiver,

    /// Flag indicating that an outbound substream is being established to prevent duplicate
    /// requests.
    outbound_substream_establishing: bool,

    /// The number of outbound substream attempts seen so far. Incremented for every outbound
    /// connection event, whether the attempt succeeded or failed.
    outbound_substream_attempts: usize,

    /// The number of inbound substream attempts seen so far. Incremented for every inbound
    /// connection event.
    inbound_substream_attempts: usize,

    /// The type of peer this handler is associated to. Set by the first substream that is
    /// successfully negotiated and never overwritten afterwards.
    peer_kind: Option<PeerKind>,

    /// Keeps track of whether we have sent the peer kind to the behaviour.
    // NOTE: Use this flag rather than checking the substream count each poll.
    peer_kind_sent: bool,

    /// The time of the most recent successful outbound flush or received inbound message.
    /// Initialised to the time the handler was created.
    last_io_activity: Instant,

    /// Keeps track of whether this connection is for a peer in the mesh. This is used to make
    /// decisions about the keep alive state for this connection.
    in_mesh: bool,
}
127
/// The reason a [`Handler`] has been disabled.
pub enum DisabledHandler {
    /// If the peer doesn't support the gossipsub protocol we do not immediately disconnect.
    /// Rather, we disable the handler and prevent any incoming or outgoing substreams from being
    /// established.
    ProtocolUnsupported {
        /// Keeps track of whether we have sent the peer kind to the behaviour.
        peer_kind_sent: bool,
    },
    /// The maximum number of inbound or outbound substream attempts have happened and thereby the
    /// handler has been disabled.
    MaxSubstreamAttempts,
}
140
/// State of the inbound substream, i.e. the substream opened by the remote.
enum InboundSubstreamState {
    /// Waiting for a message from the remote. The idle state for an inbound substream.
    WaitingInput(Framed<Stream, GossipsubCodec>),
    /// The substream is being closed, either because the remote closed it or because reading
    /// from it failed.
    Closing(Framed<Stream, GossipsubCodec>),
    /// Placeholder stored while the state is being processed. Observing it at the start of
    /// processing means an earlier step did not store a follow-up state.
    Poisoned,
}
150
/// State of the outbound substream, i.e. the substream opened by us.
enum OutboundSubstreamState {
    /// Waiting for the user to send a message. The idle state for an outbound substream.
    WaitingOutput(Framed<Stream, GossipsubCodec>),
    /// Waiting to send a message to the remote. The message is handed to the sink as soon as
    /// the sink reports that it is ready.
    PendingSend(Framed<Stream, GossipsubCodec>, proto::RPC),
    /// Waiting to flush the substream so that the data arrives to the remote.
    PendingFlush(Framed<Stream, GossipsubCodec>),
    /// Placeholder stored while the state is being processed. Observing it at the start of
    /// processing means an earlier step did not store a follow-up state.
    Poisoned,
}
162
163impl Handler {
164    /// Builds a new [`Handler`].
165    pub fn new(protocol_config: ProtocolConfig, message_queue: Receiver) -> Self {
166        Handler::Enabled(EnabledHandler {
167            listen_protocol: protocol_config,
168            inbound_substream: None,
169            outbound_substream: None,
170            outbound_substream_establishing: false,
171            outbound_substream_attempts: 0,
172            inbound_substream_attempts: 0,
173            send_queue: message_queue,
174            peer_kind: None,
175            peer_kind_sent: false,
176            last_io_activity: Instant::now(),
177            in_mesh: false,
178        })
179    }
180}
181
182impl EnabledHandler {
183    fn on_fully_negotiated_inbound(
184        &mut self,
185        (substream, peer_kind): (Framed<Stream, GossipsubCodec>, PeerKind),
186    ) {
187        // update the known kind of peer
188        if self.peer_kind.is_none() {
189            self.peer_kind = Some(peer_kind);
190        }
191
192        // new inbound substream. Replace the current one, if it exists.
193        tracing::trace!("New inbound substream request");
194        self.inbound_substream = Some(InboundSubstreamState::WaitingInput(substream));
195    }
196
197    fn on_fully_negotiated_outbound(
198        &mut self,
199        FullyNegotiatedOutbound { protocol, .. }: FullyNegotiatedOutbound<
200            <Handler as ConnectionHandler>::OutboundProtocol,
201        >,
202    ) {
203        let (substream, peer_kind) = protocol;
204
205        // update the known kind of peer
206        if self.peer_kind.is_none() {
207            self.peer_kind = Some(peer_kind);
208        }
209
210        assert!(
211            self.outbound_substream.is_none(),
212            "Established an outbound substream with one already available"
213        );
214        self.outbound_substream = Some(OutboundSubstreamState::WaitingOutput(substream));
215    }
216
217    fn poll(
218        &mut self,
219        cx: &mut Context<'_>,
220    ) -> Poll<
221        ConnectionHandlerEvent<
222            <Handler as ConnectionHandler>::OutboundProtocol,
223            (),
224            <Handler as ConnectionHandler>::ToBehaviour,
225        >,
226    > {
227        if !self.peer_kind_sent {
228            if let Some(peer_kind) = self.peer_kind.as_ref() {
229                self.peer_kind_sent = true;
230                return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(
231                    HandlerEvent::PeerKind(*peer_kind),
232                ));
233            }
234        }
235
236        // determine if we need to create the outbound stream
237        if !self.send_queue.poll_is_empty(cx)
238            && self.outbound_substream.is_none()
239            && !self.outbound_substream_establishing
240        {
241            self.outbound_substream_establishing = true;
242            return Poll::Ready(ConnectionHandlerEvent::OutboundSubstreamRequest {
243                protocol: SubstreamProtocol::new(self.listen_protocol.clone(), ()),
244            });
245        }
246
247        // process outbound stream
248        loop {
249            match std::mem::replace(
250                &mut self.outbound_substream,
251                Some(OutboundSubstreamState::Poisoned),
252            ) {
253                // outbound idle state
254                Some(OutboundSubstreamState::WaitingOutput(substream)) => {
255                    if let Poll::Ready(Some(mut message)) = self.send_queue.poll_next_unpin(cx) {
256                        match message {
257                            RpcOut::Publish {
258                                message: _,
259                                ref mut timeout,
260                            }
261                            | RpcOut::Forward {
262                                message: _,
263                                ref mut timeout,
264                            } => {
265                                if Pin::new(timeout).poll(cx).is_ready() {
266                                    // Inform the behaviour and end the poll.
267                                    self.outbound_substream =
268                                        Some(OutboundSubstreamState::WaitingOutput(substream));
269                                    return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(
270                                        HandlerEvent::MessageDropped(message),
271                                    ));
272                                }
273                            }
274                            _ => {} // All other messages are not time-bound.
275                        }
276                        self.outbound_substream = Some(OutboundSubstreamState::PendingSend(
277                            substream,
278                            message.into_protobuf(),
279                        ));
280                        continue;
281                    }
282
283                    self.outbound_substream =
284                        Some(OutboundSubstreamState::WaitingOutput(substream));
285                    break;
286                }
287                Some(OutboundSubstreamState::PendingSend(mut substream, message)) => {
288                    match Sink::poll_ready(Pin::new(&mut substream), cx) {
289                        Poll::Ready(Ok(())) => {
290                            match Sink::start_send(Pin::new(&mut substream), message) {
291                                Ok(()) => {
292                                    self.outbound_substream =
293                                        Some(OutboundSubstreamState::PendingFlush(substream))
294                                }
295                                Err(e) => {
296                                    tracing::debug!(
297                                        "Failed to send message on outbound stream: {e}"
298                                    );
299                                    self.outbound_substream = None;
300                                    break;
301                                }
302                            }
303                        }
304                        Poll::Ready(Err(e)) => {
305                            tracing::debug!("Failed to send message on outbound stream: {e}");
306                            self.outbound_substream = None;
307                            break;
308                        }
309                        Poll::Pending => {
310                            self.outbound_substream =
311                                Some(OutboundSubstreamState::PendingSend(substream, message));
312                            break;
313                        }
314                    }
315                }
316                Some(OutboundSubstreamState::PendingFlush(mut substream)) => {
317                    match Sink::poll_flush(Pin::new(&mut substream), cx) {
318                        Poll::Ready(Ok(())) => {
319                            self.last_io_activity = Instant::now();
320                            self.outbound_substream =
321                                Some(OutboundSubstreamState::WaitingOutput(substream))
322                        }
323                        Poll::Ready(Err(e)) => {
324                            tracing::debug!("Failed to flush outbound stream: {e}");
325                            self.outbound_substream = None;
326                            break;
327                        }
328                        Poll::Pending => {
329                            self.outbound_substream =
330                                Some(OutboundSubstreamState::PendingFlush(substream));
331                            break;
332                        }
333                    }
334                }
335                None => {
336                    self.outbound_substream = None;
337                    break;
338                }
339                Some(OutboundSubstreamState::Poisoned) => {
340                    unreachable!("Error occurred during outbound stream processing")
341                }
342            }
343        }
344
345        // Handle inbound messages.
346        loop {
347            match std::mem::replace(
348                &mut self.inbound_substream,
349                Some(InboundSubstreamState::Poisoned),
350            ) {
351                // inbound idle state
352                Some(InboundSubstreamState::WaitingInput(mut substream)) => {
353                    match substream.poll_next_unpin(cx) {
354                        Poll::Ready(Some(Ok(message))) => {
355                            self.last_io_activity = Instant::now();
356                            self.inbound_substream =
357                                Some(InboundSubstreamState::WaitingInput(substream));
358                            return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(message));
359                        }
360                        Poll::Ready(Some(Err(error))) => {
361                            tracing::debug!("Failed to read from inbound stream: {error}");
362                            // Close this side of the stream. If the
363                            // peer is still around, they will re-establish their
364                            // outbound stream i.e. our inbound stream.
365                            self.inbound_substream =
366                                Some(InboundSubstreamState::Closing(substream));
367                        }
368                        // peer closed the stream
369                        Poll::Ready(None) => {
370                            tracing::debug!("Inbound stream closed by remote");
371                            self.inbound_substream =
372                                Some(InboundSubstreamState::Closing(substream));
373                        }
374                        Poll::Pending => {
375                            self.inbound_substream =
376                                Some(InboundSubstreamState::WaitingInput(substream));
377                            break;
378                        }
379                    }
380                }
381                Some(InboundSubstreamState::Closing(mut substream)) => {
382                    match Sink::poll_close(Pin::new(&mut substream), cx) {
383                        Poll::Ready(res) => {
384                            if let Err(e) = res {
385                                // Don't close the connection but just drop the inbound substream.
386                                // In case the remote has more to send, they will open up a new
387                                // substream.
388                                tracing::debug!("Inbound substream error while closing: {e}");
389                            }
390                            self.inbound_substream = None;
391                            break;
392                        }
393                        Poll::Pending => {
394                            self.inbound_substream =
395                                Some(InboundSubstreamState::Closing(substream));
396                            break;
397                        }
398                    }
399                }
400                None => {
401                    self.inbound_substream = None;
402                    break;
403                }
404                Some(InboundSubstreamState::Poisoned) => {
405                    unreachable!("Error occurred during inbound stream processing")
406                }
407            }
408        }
409
410        // Drop the next message in queue if it's stale.
411        if let Poll::Ready(Some(rpc)) = self.send_queue.poll_stale(cx) {
412            return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(
413                HandlerEvent::MessageDropped(rpc),
414            ));
415        }
416
417        Poll::Pending
418    }
419}
420
421impl ConnectionHandler for Handler {
422    type FromBehaviour = HandlerIn;
423    type ToBehaviour = HandlerEvent;
424    type InboundOpenInfo = ();
425    type InboundProtocol = either::Either<ProtocolConfig, DeniedUpgrade>;
426    type OutboundOpenInfo = ();
427    type OutboundProtocol = ProtocolConfig;
428
429    fn listen_protocol(&self) -> SubstreamProtocol<Self::InboundProtocol> {
430        match self {
431            Handler::Enabled(handler) => {
432                SubstreamProtocol::new(either::Either::Left(handler.listen_protocol.clone()), ())
433            }
434            Handler::Disabled(_) => {
435                SubstreamProtocol::new(either::Either::Right(DeniedUpgrade), ())
436            }
437        }
438    }
439
440    fn on_behaviour_event(&mut self, message: HandlerIn) {
441        match self {
442            Handler::Enabled(handler) => match message {
443                HandlerIn::JoinedMesh => {
444                    handler.in_mesh = true;
445                }
446                HandlerIn::LeftMesh => {
447                    handler.in_mesh = false;
448                }
449            },
450            Handler::Disabled(_) => {
451                tracing::debug!(?message, "Handler is disabled. Dropping message");
452            }
453        }
454    }
455
456    fn connection_keep_alive(&self) -> bool {
457        matches!(self, Handler::Enabled(h) if h.in_mesh)
458    }
459
460    #[tracing::instrument(level = "trace", name = "ConnectionHandler::poll", skip(self, cx))]
461    fn poll(
462        &mut self,
463        cx: &mut Context<'_>,
464    ) -> Poll<ConnectionHandlerEvent<Self::OutboundProtocol, (), Self::ToBehaviour>> {
465        match self {
466            Handler::Enabled(handler) => handler.poll(cx),
467            Handler::Disabled(DisabledHandler::ProtocolUnsupported { peer_kind_sent }) => {
468                if !*peer_kind_sent {
469                    *peer_kind_sent = true;
470                    return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(
471                        HandlerEvent::PeerKind(PeerKind::NotSupported),
472                    ));
473                }
474
475                Poll::Pending
476            }
477            Handler::Disabled(DisabledHandler::MaxSubstreamAttempts) => Poll::Pending,
478        }
479    }
480
481    fn on_connection_event(
482        &mut self,
483        event: ConnectionEvent<Self::InboundProtocol, Self::OutboundProtocol>,
484    ) {
485        match self {
486            Handler::Enabled(handler) => {
487                if event.is_inbound() {
488                    handler.inbound_substream_attempts += 1;
489
490                    if handler.inbound_substream_attempts == MAX_SUBSTREAM_ATTEMPTS {
491                        tracing::warn!(
492                            "The maximum number of inbound substreams attempts has been exceeded"
493                        );
494                        *self = Handler::Disabled(DisabledHandler::MaxSubstreamAttempts);
495                        return;
496                    }
497                }
498
499                if event.is_outbound() {
500                    handler.outbound_substream_establishing = false;
501
502                    handler.outbound_substream_attempts += 1;
503
504                    if handler.outbound_substream_attempts == MAX_SUBSTREAM_ATTEMPTS {
505                        tracing::warn!(
506                            "The maximum number of outbound substream attempts has been exceeded"
507                        );
508                        *self = Handler::Disabled(DisabledHandler::MaxSubstreamAttempts);
509                        return;
510                    }
511                }
512
513                match event {
514                    ConnectionEvent::FullyNegotiatedInbound(FullyNegotiatedInbound {
515                        protocol,
516                        ..
517                    }) => match protocol {
518                        Either::Left(protocol) => handler.on_fully_negotiated_inbound(protocol),
519                        Either::Right(v) => libp2p_core::util::unreachable(v),
520                    },
521                    ConnectionEvent::FullyNegotiatedOutbound(fully_negotiated_outbound) => {
522                        handler.on_fully_negotiated_outbound(fully_negotiated_outbound)
523                    }
524                    ConnectionEvent::DialUpgradeError(DialUpgradeError {
525                        error: StreamUpgradeError::Timeout,
526                        ..
527                    }) => {
528                        tracing::debug!("Dial upgrade error: Protocol negotiation timeout");
529                    }
530                    ConnectionEvent::DialUpgradeError(DialUpgradeError {
531                        error: StreamUpgradeError::Apply(e),
532                        ..
533                    }) => libp2p_core::util::unreachable(e),
534                    ConnectionEvent::DialUpgradeError(DialUpgradeError {
535                        error: StreamUpgradeError::NegotiationFailed,
536                        ..
537                    }) => {
538                        // The protocol is not supported
539                        tracing::debug!(
540                            "The remote peer does not support gossipsub on this connection"
541                        );
542                        *self = Handler::Disabled(DisabledHandler::ProtocolUnsupported {
543                            peer_kind_sent: false,
544                        });
545                    }
546                    ConnectionEvent::DialUpgradeError(DialUpgradeError {
547                        error: StreamUpgradeError::Io(e),
548                        ..
549                    }) => {
550                        tracing::debug!("Protocol negotiation failed: {e}")
551                    }
552                    _ => {}
553                }
554            }
555            Handler::Disabled(_) => {}
556        }
557    }
558}