1use std::{
22 pin::Pin,
23 task::{Context, Poll},
24};
25
26use asynchronous_codec::Framed;
27use futures::{future::Either, prelude::*, StreamExt};
28use libp2p_core::upgrade::DeniedUpgrade;
29use libp2p_swarm::{
30 handler::{
31 ConnectionEvent, ConnectionHandler, ConnectionHandlerEvent, DialUpgradeError,
32 FullyNegotiatedInbound, FullyNegotiatedOutbound, StreamUpgradeError, SubstreamProtocol,
33 },
34 Stream,
35};
36use web_time::Instant;
37
38use crate::{
39 protocol::{GossipsubCodec, ProtocolConfig},
40 rpc::Receiver,
41 rpc_proto::proto,
42 types::{PeerKind, RawMessage, Rpc, RpcOut},
43 ValidationError,
44};
45
46#[derive(Debug)]
49pub enum HandlerEvent {
50 Message {
53 rpc: Rpc,
55 invalid_messages: Vec<(RawMessage, ValidationError)>,
58 },
59 PeerKind(PeerKind),
62 MessageDropped(RpcOut),
64}
65
/// Commands the behaviour sends to the handler.
///
/// Both variants are field-less, so the enum is a bare discriminant and does
/// not need a `clippy::large_enum_variant` exemption.
#[derive(Debug)]
pub enum HandlerIn {
    /// Marks the connection as being in the mesh. An enabled handler sets its
    /// `in_mesh` flag, which makes `connection_keep_alive` return `true`.
    JoinedMesh,
    /// Marks the connection as no longer in the mesh. An enabled handler
    /// clears its `in_mesh` flag, so `connection_keep_alive` returns `false`.
    LeftMesh,
}
75
/// Threshold for substream events on one connection.
///
/// Inbound and outbound connection events are counted separately; when either
/// counter reaches this value the handler switches to
/// `DisabledHandler::MaxSubstreamAttempts`.
const MAX_SUBSTREAM_ATTEMPTS: usize = 5;
83
84#[allow(clippy::large_enum_variant)]
85pub enum Handler {
86 Enabled(EnabledHandler),
87 Disabled(DisabledHandler),
88}
89
90pub struct EnabledHandler {
92 listen_protocol: ProtocolConfig,
94
95 outbound_substream: Option<OutboundSubstreamState>,
97
98 inbound_substream: Option<InboundSubstreamState>,
100
101 send_queue: Receiver,
103
104 outbound_substream_establishing: bool,
107
108 outbound_substream_attempts: usize,
110
111 inbound_substream_attempts: usize,
113
114 peer_kind: Option<PeerKind>,
116
117 peer_kind_sent: bool,
120
121 last_io_activity: Instant,
122
123 in_mesh: bool,
126}
127
/// Reason a handler has been switched off.
pub enum DisabledHandler {
    /// Outbound protocol negotiation failed: the remote does not support the
    /// protocol on this connection.
    ProtocolUnsupported {
        /// Whether `PeerKind::NotSupported` has already been reported to the
        /// behaviour; it is reported exactly once.
        peer_kind_sent: bool,
    },
    /// The inbound or the outbound substream event counter reached
    /// `MAX_SUBSTREAM_ATTEMPTS`.
    MaxSubstreamAttempts,
}
140
141enum InboundSubstreamState {
143 WaitingInput(Framed<Stream, GossipsubCodec>),
145 Closing(Framed<Stream, GossipsubCodec>),
147 Poisoned,
149}
150
151enum OutboundSubstreamState {
153 WaitingOutput(Framed<Stream, GossipsubCodec>),
155 PendingSend(Framed<Stream, GossipsubCodec>, proto::RPC),
157 PendingFlush(Framed<Stream, GossipsubCodec>),
159 Poisoned,
161}
162
163impl Handler {
164 pub fn new(protocol_config: ProtocolConfig, message_queue: Receiver) -> Self {
166 Handler::Enabled(EnabledHandler {
167 listen_protocol: protocol_config,
168 inbound_substream: None,
169 outbound_substream: None,
170 outbound_substream_establishing: false,
171 outbound_substream_attempts: 0,
172 inbound_substream_attempts: 0,
173 send_queue: message_queue,
174 peer_kind: None,
175 peer_kind_sent: false,
176 last_io_activity: Instant::now(),
177 in_mesh: false,
178 })
179 }
180}
181
182impl EnabledHandler {
183 fn on_fully_negotiated_inbound(
184 &mut self,
185 (substream, peer_kind): (Framed<Stream, GossipsubCodec>, PeerKind),
186 ) {
187 if self.peer_kind.is_none() {
189 self.peer_kind = Some(peer_kind);
190 }
191
192 tracing::trace!("New inbound substream request");
194 self.inbound_substream = Some(InboundSubstreamState::WaitingInput(substream));
195 }
196
197 fn on_fully_negotiated_outbound(
198 &mut self,
199 FullyNegotiatedOutbound { protocol, .. }: FullyNegotiatedOutbound<
200 <Handler as ConnectionHandler>::OutboundProtocol,
201 >,
202 ) {
203 let (substream, peer_kind) = protocol;
204
205 if self.peer_kind.is_none() {
207 self.peer_kind = Some(peer_kind);
208 }
209
210 assert!(
211 self.outbound_substream.is_none(),
212 "Established an outbound substream with one already available"
213 );
214 self.outbound_substream = Some(OutboundSubstreamState::WaitingOutput(substream));
215 }
216
217 fn poll(
218 &mut self,
219 cx: &mut Context<'_>,
220 ) -> Poll<
221 ConnectionHandlerEvent<
222 <Handler as ConnectionHandler>::OutboundProtocol,
223 (),
224 <Handler as ConnectionHandler>::ToBehaviour,
225 >,
226 > {
227 if !self.peer_kind_sent {
228 if let Some(peer_kind) = self.peer_kind.as_ref() {
229 self.peer_kind_sent = true;
230 return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(
231 HandlerEvent::PeerKind(*peer_kind),
232 ));
233 }
234 }
235
236 if !self.send_queue.poll_is_empty(cx)
238 && self.outbound_substream.is_none()
239 && !self.outbound_substream_establishing
240 {
241 self.outbound_substream_establishing = true;
242 return Poll::Ready(ConnectionHandlerEvent::OutboundSubstreamRequest {
243 protocol: SubstreamProtocol::new(self.listen_protocol.clone(), ()),
244 });
245 }
246
247 loop {
249 match std::mem::replace(
250 &mut self.outbound_substream,
251 Some(OutboundSubstreamState::Poisoned),
252 ) {
253 Some(OutboundSubstreamState::WaitingOutput(substream)) => {
255 if let Poll::Ready(Some(mut message)) = self.send_queue.poll_next_unpin(cx) {
256 match message {
257 RpcOut::Publish {
258 message: _,
259 ref mut timeout,
260 }
261 | RpcOut::Forward {
262 message: _,
263 ref mut timeout,
264 } => {
265 if Pin::new(timeout).poll(cx).is_ready() {
266 self.outbound_substream =
268 Some(OutboundSubstreamState::WaitingOutput(substream));
269 return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(
270 HandlerEvent::MessageDropped(message),
271 ));
272 }
273 }
274 _ => {} }
276 self.outbound_substream = Some(OutboundSubstreamState::PendingSend(
277 substream,
278 message.into_protobuf(),
279 ));
280 continue;
281 }
282
283 self.outbound_substream =
284 Some(OutboundSubstreamState::WaitingOutput(substream));
285 break;
286 }
287 Some(OutboundSubstreamState::PendingSend(mut substream, message)) => {
288 match Sink::poll_ready(Pin::new(&mut substream), cx) {
289 Poll::Ready(Ok(())) => {
290 match Sink::start_send(Pin::new(&mut substream), message) {
291 Ok(()) => {
292 self.outbound_substream =
293 Some(OutboundSubstreamState::PendingFlush(substream))
294 }
295 Err(e) => {
296 tracing::debug!(
297 "Failed to send message on outbound stream: {e}"
298 );
299 self.outbound_substream = None;
300 break;
301 }
302 }
303 }
304 Poll::Ready(Err(e)) => {
305 tracing::debug!("Failed to send message on outbound stream: {e}");
306 self.outbound_substream = None;
307 break;
308 }
309 Poll::Pending => {
310 self.outbound_substream =
311 Some(OutboundSubstreamState::PendingSend(substream, message));
312 break;
313 }
314 }
315 }
316 Some(OutboundSubstreamState::PendingFlush(mut substream)) => {
317 match Sink::poll_flush(Pin::new(&mut substream), cx) {
318 Poll::Ready(Ok(())) => {
319 self.last_io_activity = Instant::now();
320 self.outbound_substream =
321 Some(OutboundSubstreamState::WaitingOutput(substream))
322 }
323 Poll::Ready(Err(e)) => {
324 tracing::debug!("Failed to flush outbound stream: {e}");
325 self.outbound_substream = None;
326 break;
327 }
328 Poll::Pending => {
329 self.outbound_substream =
330 Some(OutboundSubstreamState::PendingFlush(substream));
331 break;
332 }
333 }
334 }
335 None => {
336 self.outbound_substream = None;
337 break;
338 }
339 Some(OutboundSubstreamState::Poisoned) => {
340 unreachable!("Error occurred during outbound stream processing")
341 }
342 }
343 }
344
345 loop {
347 match std::mem::replace(
348 &mut self.inbound_substream,
349 Some(InboundSubstreamState::Poisoned),
350 ) {
351 Some(InboundSubstreamState::WaitingInput(mut substream)) => {
353 match substream.poll_next_unpin(cx) {
354 Poll::Ready(Some(Ok(message))) => {
355 self.last_io_activity = Instant::now();
356 self.inbound_substream =
357 Some(InboundSubstreamState::WaitingInput(substream));
358 return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(message));
359 }
360 Poll::Ready(Some(Err(error))) => {
361 tracing::debug!("Failed to read from inbound stream: {error}");
362 self.inbound_substream =
366 Some(InboundSubstreamState::Closing(substream));
367 }
368 Poll::Ready(None) => {
370 tracing::debug!("Inbound stream closed by remote");
371 self.inbound_substream =
372 Some(InboundSubstreamState::Closing(substream));
373 }
374 Poll::Pending => {
375 self.inbound_substream =
376 Some(InboundSubstreamState::WaitingInput(substream));
377 break;
378 }
379 }
380 }
381 Some(InboundSubstreamState::Closing(mut substream)) => {
382 match Sink::poll_close(Pin::new(&mut substream), cx) {
383 Poll::Ready(res) => {
384 if let Err(e) = res {
385 tracing::debug!("Inbound substream error while closing: {e}");
389 }
390 self.inbound_substream = None;
391 break;
392 }
393 Poll::Pending => {
394 self.inbound_substream =
395 Some(InboundSubstreamState::Closing(substream));
396 break;
397 }
398 }
399 }
400 None => {
401 self.inbound_substream = None;
402 break;
403 }
404 Some(InboundSubstreamState::Poisoned) => {
405 unreachable!("Error occurred during inbound stream processing")
406 }
407 }
408 }
409
410 if let Poll::Ready(Some(rpc)) = self.send_queue.poll_stale(cx) {
412 return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(
413 HandlerEvent::MessageDropped(rpc),
414 ));
415 }
416
417 Poll::Pending
418 }
419}
420
421impl ConnectionHandler for Handler {
422 type FromBehaviour = HandlerIn;
423 type ToBehaviour = HandlerEvent;
424 type InboundOpenInfo = ();
425 type InboundProtocol = either::Either<ProtocolConfig, DeniedUpgrade>;
426 type OutboundOpenInfo = ();
427 type OutboundProtocol = ProtocolConfig;
428
429 fn listen_protocol(&self) -> SubstreamProtocol<Self::InboundProtocol> {
430 match self {
431 Handler::Enabled(handler) => {
432 SubstreamProtocol::new(either::Either::Left(handler.listen_protocol.clone()), ())
433 }
434 Handler::Disabled(_) => {
435 SubstreamProtocol::new(either::Either::Right(DeniedUpgrade), ())
436 }
437 }
438 }
439
440 fn on_behaviour_event(&mut self, message: HandlerIn) {
441 match self {
442 Handler::Enabled(handler) => match message {
443 HandlerIn::JoinedMesh => {
444 handler.in_mesh = true;
445 }
446 HandlerIn::LeftMesh => {
447 handler.in_mesh = false;
448 }
449 },
450 Handler::Disabled(_) => {
451 tracing::debug!(?message, "Handler is disabled. Dropping message");
452 }
453 }
454 }
455
456 fn connection_keep_alive(&self) -> bool {
457 matches!(self, Handler::Enabled(h) if h.in_mesh)
458 }
459
460 #[tracing::instrument(level = "trace", name = "ConnectionHandler::poll", skip(self, cx))]
461 fn poll(
462 &mut self,
463 cx: &mut Context<'_>,
464 ) -> Poll<ConnectionHandlerEvent<Self::OutboundProtocol, (), Self::ToBehaviour>> {
465 match self {
466 Handler::Enabled(handler) => handler.poll(cx),
467 Handler::Disabled(DisabledHandler::ProtocolUnsupported { peer_kind_sent }) => {
468 if !*peer_kind_sent {
469 *peer_kind_sent = true;
470 return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(
471 HandlerEvent::PeerKind(PeerKind::NotSupported),
472 ));
473 }
474
475 Poll::Pending
476 }
477 Handler::Disabled(DisabledHandler::MaxSubstreamAttempts) => Poll::Pending,
478 }
479 }
480
481 fn on_connection_event(
482 &mut self,
483 event: ConnectionEvent<Self::InboundProtocol, Self::OutboundProtocol>,
484 ) {
485 match self {
486 Handler::Enabled(handler) => {
487 if event.is_inbound() {
488 handler.inbound_substream_attempts += 1;
489
490 if handler.inbound_substream_attempts == MAX_SUBSTREAM_ATTEMPTS {
491 tracing::warn!(
492 "The maximum number of inbound substreams attempts has been exceeded"
493 );
494 *self = Handler::Disabled(DisabledHandler::MaxSubstreamAttempts);
495 return;
496 }
497 }
498
499 if event.is_outbound() {
500 handler.outbound_substream_establishing = false;
501
502 handler.outbound_substream_attempts += 1;
503
504 if handler.outbound_substream_attempts == MAX_SUBSTREAM_ATTEMPTS {
505 tracing::warn!(
506 "The maximum number of outbound substream attempts has been exceeded"
507 );
508 *self = Handler::Disabled(DisabledHandler::MaxSubstreamAttempts);
509 return;
510 }
511 }
512
513 match event {
514 ConnectionEvent::FullyNegotiatedInbound(FullyNegotiatedInbound {
515 protocol,
516 ..
517 }) => match protocol {
518 Either::Left(protocol) => handler.on_fully_negotiated_inbound(protocol),
519 Either::Right(v) => libp2p_core::util::unreachable(v),
520 },
521 ConnectionEvent::FullyNegotiatedOutbound(fully_negotiated_outbound) => {
522 handler.on_fully_negotiated_outbound(fully_negotiated_outbound)
523 }
524 ConnectionEvent::DialUpgradeError(DialUpgradeError {
525 error: StreamUpgradeError::Timeout,
526 ..
527 }) => {
528 tracing::debug!("Dial upgrade error: Protocol negotiation timeout");
529 }
530 ConnectionEvent::DialUpgradeError(DialUpgradeError {
531 error: StreamUpgradeError::Apply(e),
532 ..
533 }) => libp2p_core::util::unreachable(e),
534 ConnectionEvent::DialUpgradeError(DialUpgradeError {
535 error: StreamUpgradeError::NegotiationFailed,
536 ..
537 }) => {
538 tracing::debug!(
540 "The remote peer does not support gossipsub on this connection"
541 );
542 *self = Handler::Disabled(DisabledHandler::ProtocolUnsupported {
543 peer_kind_sent: false,
544 });
545 }
546 ConnectionEvent::DialUpgradeError(DialUpgradeError {
547 error: StreamUpgradeError::Io(e),
548 ..
549 }) => {
550 tracing::debug!("Protocol negotiation failed: {e}")
551 }
552 _ => {}
553 }
554 }
555 Handler::Disabled(_) => {}
556 }
557 }
558}