libp2p_perf/server/
handler.rs1use std::{
22 convert::Infallible,
23 task::{Context, Poll},
24};
25
26use futures::FutureExt;
27use libp2p_core::upgrade::{DeniedUpgrade, ReadyUpgrade};
28use libp2p_swarm::{
29 handler::{
30 ConnectionEvent, DialUpgradeError, FullyNegotiatedInbound, FullyNegotiatedOutbound,
31 ListenUpgradeError,
32 },
33 ConnectionHandler, ConnectionHandlerEvent, StreamProtocol, SubstreamProtocol,
34};
35use tracing::error;
36
37use crate::Run;
38
39#[derive(Debug)]
40pub struct Event {
41 pub stats: Run,
42}
43
44pub struct Handler {
45 inbound: futures_bounded::FuturesSet<Result<Run, std::io::Error>>,
46}
47
48impl Handler {
49 pub fn new() -> Self {
50 Self {
51 inbound: futures_bounded::FuturesSet::new(
52 crate::RUN_TIMEOUT,
53 crate::MAX_PARALLEL_RUNS_PER_CONNECTION,
54 ),
55 }
56 }
57}
58
59impl Default for Handler {
60 fn default() -> Self {
61 Self::new()
62 }
63}
64
65impl ConnectionHandler for Handler {
66 type FromBehaviour = Infallible;
67 type ToBehaviour = Event;
68 type InboundProtocol = ReadyUpgrade<StreamProtocol>;
69 type OutboundProtocol = DeniedUpgrade;
70 type OutboundOpenInfo = Infallible;
71 type InboundOpenInfo = ();
72
73 fn listen_protocol(&self) -> SubstreamProtocol<Self::InboundProtocol> {
74 SubstreamProtocol::new(ReadyUpgrade::new(crate::PROTOCOL_NAME), ())
75 }
76
77 fn on_behaviour_event(&mut self, v: Self::FromBehaviour) {
78 libp2p_core::util::unreachable(v)
79 }
80
81 fn on_connection_event(
82 &mut self,
83 event: ConnectionEvent<Self::InboundProtocol, Self::OutboundProtocol, (), Infallible>,
84 ) {
85 match event {
86 ConnectionEvent::FullyNegotiatedInbound(FullyNegotiatedInbound {
87 protocol,
88 info: _,
89 }) => {
90 if self
91 .inbound
92 .try_push(crate::protocol::receive_send(protocol).boxed())
93 .is_err()
94 {
95 tracing::warn!("Dropping inbound stream because we are at capacity");
96 }
97 }
98 ConnectionEvent::FullyNegotiatedOutbound(FullyNegotiatedOutbound { info, .. }) => {
99 libp2p_core::util::unreachable(info)
100 }
101
102 ConnectionEvent::DialUpgradeError(DialUpgradeError { info, .. }) => {
103 libp2p_core::util::unreachable(info)
104 }
105 ConnectionEvent::AddressChange(_)
106 | ConnectionEvent::LocalProtocolsChange(_)
107 | ConnectionEvent::RemoteProtocolsChange(_) => {}
108 ConnectionEvent::ListenUpgradeError(ListenUpgradeError { info: (), error }) => {
109 libp2p_core::util::unreachable(error)
110 }
111 _ => {}
112 }
113 }
114
115 #[tracing::instrument(level = "trace", name = "ConnectionHandler::poll", skip(self, cx))]
116 fn poll(
117 &mut self,
118 cx: &mut Context<'_>,
119 ) -> Poll<ConnectionHandlerEvent<Self::OutboundProtocol, Infallible, Self::ToBehaviour>> {
120 loop {
121 match self.inbound.poll_unpin(cx) {
122 Poll::Ready(Ok(Ok(stats))) => {
123 return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(Event { stats }))
124 }
125 Poll::Ready(Ok(Err(e))) => {
126 error!("{e:?}");
127 continue;
128 }
129 Poll::Ready(Err(e @ futures_bounded::Timeout { .. })) => {
130 error!("inbound perf request timed out: {e}");
131 continue;
132 }
133 Poll::Pending => {}
134 }
135
136 return Poll::Pending;
137 }
138 }
139}