libp2p_perf/server/
handler.rs

1// Copyright 2023 Protocol Labs.
2//
3// Permission is hereby granted, free of charge, to any person obtaining a
4// copy of this software and associated documentation files (the "Software"),
5// to deal in the Software without restriction, including without limitation
6// the rights to use, copy, modify, merge, publish, distribute, sublicense,
7// and/or sell copies of the Software, and to permit persons to whom the
8// Software is furnished to do so, subject to the following conditions:
9//
10// The above copyright notice and this permission notice shall be included in
11// all copies or substantial portions of the Software.
12//
13// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
14// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
18// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
19// DEALINGS IN THE SOFTWARE.
20
21use std::{
22    convert::Infallible,
23    task::{Context, Poll},
24};
25
26use futures::FutureExt;
27use libp2p_core::upgrade::{DeniedUpgrade, ReadyUpgrade};
28use libp2p_swarm::{
29    handler::{
30        ConnectionEvent, DialUpgradeError, FullyNegotiatedInbound, FullyNegotiatedOutbound,
31        ListenUpgradeError,
32    },
33    ConnectionHandler, ConnectionHandlerEvent, StreamProtocol, SubstreamProtocol,
34};
35use tracing::error;
36
37use crate::Run;
38
39#[derive(Debug)]
40pub struct Event {
41    pub stats: Run,
42}
43
44pub struct Handler {
45    inbound: futures_bounded::FuturesSet<Result<Run, std::io::Error>>,
46}
47
48impl Handler {
49    pub fn new() -> Self {
50        Self {
51            inbound: futures_bounded::FuturesSet::new(
52                crate::RUN_TIMEOUT,
53                crate::MAX_PARALLEL_RUNS_PER_CONNECTION,
54            ),
55        }
56    }
57}
58
59impl Default for Handler {
60    fn default() -> Self {
61        Self::new()
62    }
63}
64
65impl ConnectionHandler for Handler {
66    type FromBehaviour = Infallible;
67    type ToBehaviour = Event;
68    type InboundProtocol = ReadyUpgrade<StreamProtocol>;
69    type OutboundProtocol = DeniedUpgrade;
70    type OutboundOpenInfo = Infallible;
71    type InboundOpenInfo = ();
72
73    fn listen_protocol(&self) -> SubstreamProtocol<Self::InboundProtocol> {
74        SubstreamProtocol::new(ReadyUpgrade::new(crate::PROTOCOL_NAME), ())
75    }
76
77    fn on_behaviour_event(&mut self, v: Self::FromBehaviour) {
78        libp2p_core::util::unreachable(v)
79    }
80
81    fn on_connection_event(
82        &mut self,
83        event: ConnectionEvent<Self::InboundProtocol, Self::OutboundProtocol, (), Infallible>,
84    ) {
85        match event {
86            ConnectionEvent::FullyNegotiatedInbound(FullyNegotiatedInbound {
87                protocol,
88                info: _,
89            }) => {
90                if self
91                    .inbound
92                    .try_push(crate::protocol::receive_send(protocol).boxed())
93                    .is_err()
94                {
95                    tracing::warn!("Dropping inbound stream because we are at capacity");
96                }
97            }
98            ConnectionEvent::FullyNegotiatedOutbound(FullyNegotiatedOutbound { info, .. }) => {
99                libp2p_core::util::unreachable(info)
100            }
101
102            ConnectionEvent::DialUpgradeError(DialUpgradeError { info, .. }) => {
103                libp2p_core::util::unreachable(info)
104            }
105            ConnectionEvent::AddressChange(_)
106            | ConnectionEvent::LocalProtocolsChange(_)
107            | ConnectionEvent::RemoteProtocolsChange(_) => {}
108            ConnectionEvent::ListenUpgradeError(ListenUpgradeError { info: (), error }) => {
109                libp2p_core::util::unreachable(error)
110            }
111            _ => {}
112        }
113    }
114
115    #[tracing::instrument(level = "trace", name = "ConnectionHandler::poll", skip(self, cx))]
116    fn poll(
117        &mut self,
118        cx: &mut Context<'_>,
119    ) -> Poll<ConnectionHandlerEvent<Self::OutboundProtocol, Infallible, Self::ToBehaviour>> {
120        loop {
121            match self.inbound.poll_unpin(cx) {
122                Poll::Ready(Ok(Ok(stats))) => {
123                    return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(Event { stats }))
124                }
125                Poll::Ready(Ok(Err(e))) => {
126                    error!("{e:?}");
127                    continue;
128                }
129                Poll::Ready(Err(e @ futures_bounded::Timeout { .. })) => {
130                    error!("inbound perf request timed out: {e}");
131                    continue;
132                }
133                Poll::Pending => {}
134            }
135
136            return Poll::Pending;
137        }
138    }
139}