libp2p_perf/client/
handler.rs

1// Copyright 2023 Protocol Labs.
2//
3// Permission is hereby granted, free of charge, to any person obtaining a
4// copy of this software and associated documentation files (the "Software"),
5// to deal in the Software without restriction, including without limitation
6// the rights to use, copy, modify, merge, publish, distribute, sublicense,
7// and/or sell copies of the Software, and to permit persons to whom the
8// Software is furnished to do so, subject to the following conditions:
9//
10// The above copyright notice and this permission notice shall be included in
11// all copies or substantial portions of the Software.
12//
13// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
14// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
18// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
19// DEALINGS IN THE SOFTWARE.
20
21use std::{
22    collections::VecDeque,
23    task::{Context, Poll},
24};
25
26use futures::{
27    stream::{BoxStream, SelectAll},
28    StreamExt,
29};
30use libp2p_core::upgrade::{DeniedUpgrade, ReadyUpgrade};
31use libp2p_swarm::{
32    handler::{
33        ConnectionEvent, DialUpgradeError, FullyNegotiatedInbound, FullyNegotiatedOutbound,
34        ListenUpgradeError,
35    },
36    ConnectionHandler, ConnectionHandlerEvent, StreamProtocol, SubstreamProtocol,
37};
38
39use crate::{
40    client::{RunError, RunId},
41    RunParams, RunUpdate,
42};
43
44#[derive(Debug)]
45pub struct Command {
46    pub(crate) id: RunId,
47    pub(crate) params: RunParams,
48}
49
50#[derive(Debug)]
51pub struct Event {
52    pub(crate) id: RunId,
53    pub(crate) result: Result<RunUpdate, RunError>,
54}
55
56pub struct Handler {
57    /// Queue of events to return when polled.
58    queued_events: VecDeque<
59        ConnectionHandlerEvent<
60            <Self as ConnectionHandler>::OutboundProtocol,
61            (),
62            <Self as ConnectionHandler>::ToBehaviour,
63        >,
64    >,
65
66    requested_streams: VecDeque<Command>,
67
68    outbound: SelectAll<BoxStream<'static, (RunId, Result<crate::RunUpdate, std::io::Error>)>>,
69}
70
71impl Handler {
72    pub fn new() -> Self {
73        Self {
74            queued_events: Default::default(),
75            requested_streams: Default::default(),
76            outbound: Default::default(),
77        }
78    }
79}
80
81impl Default for Handler {
82    fn default() -> Self {
83        Self::new()
84    }
85}
86
87impl ConnectionHandler for Handler {
88    type FromBehaviour = Command;
89    type ToBehaviour = Event;
90    type InboundProtocol = DeniedUpgrade;
91    type OutboundProtocol = ReadyUpgrade<StreamProtocol>;
92    type OutboundOpenInfo = ();
93    type InboundOpenInfo = ();
94
95    fn listen_protocol(&self) -> SubstreamProtocol<Self::InboundProtocol> {
96        SubstreamProtocol::new(DeniedUpgrade, ())
97    }
98
99    fn on_behaviour_event(&mut self, command: Self::FromBehaviour) {
100        self.requested_streams.push_back(command);
101        self.queued_events
102            .push_back(ConnectionHandlerEvent::OutboundSubstreamRequest {
103                protocol: SubstreamProtocol::new(ReadyUpgrade::new(crate::PROTOCOL_NAME), ()),
104            })
105    }
106
107    fn on_connection_event(
108        &mut self,
109        event: ConnectionEvent<Self::InboundProtocol, Self::OutboundProtocol>,
110    ) {
111        match event {
112            ConnectionEvent::FullyNegotiatedInbound(FullyNegotiatedInbound {
113                protocol, ..
114            }) => libp2p_core::util::unreachable(protocol),
115            ConnectionEvent::FullyNegotiatedOutbound(FullyNegotiatedOutbound {
116                protocol,
117                info: (),
118            }) => {
119                let Command { id, params } = self
120                    .requested_streams
121                    .pop_front()
122                    .expect("opened a stream without a pending command");
123                self.outbound.push(
124                    crate::protocol::send_receive(params, protocol)
125                        .map(move |result| (id, result))
126                        .boxed(),
127                );
128            }
129
130            ConnectionEvent::AddressChange(_)
131            | ConnectionEvent::LocalProtocolsChange(_)
132            | ConnectionEvent::RemoteProtocolsChange(_) => {}
133            ConnectionEvent::DialUpgradeError(DialUpgradeError { info: (), error }) => {
134                let Command { id, .. } = self
135                    .requested_streams
136                    .pop_front()
137                    .expect("requested stream without pending command");
138                self.queued_events
139                    .push_back(ConnectionHandlerEvent::NotifyBehaviour(Event {
140                        id,
141                        result: Err(error.into()),
142                    }));
143            }
144            ConnectionEvent::ListenUpgradeError(ListenUpgradeError { info: (), error }) => {
145                libp2p_core::util::unreachable(error)
146            }
147            _ => {}
148        }
149    }
150
151    #[tracing::instrument(level = "trace", name = "ConnectionHandler::poll", skip(self, cx))]
152    fn poll(
153        &mut self,
154        cx: &mut Context<'_>,
155    ) -> Poll<ConnectionHandlerEvent<Self::OutboundProtocol, (), Self::ToBehaviour>> {
156        if let Some(event) = self.queued_events.pop_front() {
157            return Poll::Ready(event);
158        }
159
160        if let Poll::Ready(Some((id, result))) = self.outbound.poll_next_unpin(cx) {
161            return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(Event {
162                id,
163                result: result.map_err(Into::into),
164            }));
165        }
166
167        Poll::Pending
168    }
169}