libp2p_perf/client/
handler.rs
1use std::{
22 collections::VecDeque,
23 task::{Context, Poll},
24};
25
26use futures::{
27 stream::{BoxStream, SelectAll},
28 StreamExt,
29};
30use libp2p_core::upgrade::{DeniedUpgrade, ReadyUpgrade};
31use libp2p_swarm::{
32 handler::{
33 ConnectionEvent, DialUpgradeError, FullyNegotiatedInbound, FullyNegotiatedOutbound,
34 ListenUpgradeError,
35 },
36 ConnectionHandler, ConnectionHandlerEvent, StreamProtocol, SubstreamProtocol,
37};
38
39use crate::{
40 client::{RunError, RunId},
41 RunParams, RunUpdate,
42};
43
44#[derive(Debug)]
45pub struct Command {
46 pub(crate) id: RunId,
47 pub(crate) params: RunParams,
48}
49
50#[derive(Debug)]
51pub struct Event {
52 pub(crate) id: RunId,
53 pub(crate) result: Result<RunUpdate, RunError>,
54}
55
56pub struct Handler {
57 queued_events: VecDeque<
59 ConnectionHandlerEvent<
60 <Self as ConnectionHandler>::OutboundProtocol,
61 (),
62 <Self as ConnectionHandler>::ToBehaviour,
63 >,
64 >,
65
66 requested_streams: VecDeque<Command>,
67
68 outbound: SelectAll<BoxStream<'static, (RunId, Result<crate::RunUpdate, std::io::Error>)>>,
69}
70
71impl Handler {
72 pub fn new() -> Self {
73 Self {
74 queued_events: Default::default(),
75 requested_streams: Default::default(),
76 outbound: Default::default(),
77 }
78 }
79}
80
81impl Default for Handler {
82 fn default() -> Self {
83 Self::new()
84 }
85}
86
87impl ConnectionHandler for Handler {
88 type FromBehaviour = Command;
89 type ToBehaviour = Event;
90 type InboundProtocol = DeniedUpgrade;
91 type OutboundProtocol = ReadyUpgrade<StreamProtocol>;
92 type OutboundOpenInfo = ();
93 type InboundOpenInfo = ();
94
95 fn listen_protocol(&self) -> SubstreamProtocol<Self::InboundProtocol> {
96 SubstreamProtocol::new(DeniedUpgrade, ())
97 }
98
99 fn on_behaviour_event(&mut self, command: Self::FromBehaviour) {
100 self.requested_streams.push_back(command);
101 self.queued_events
102 .push_back(ConnectionHandlerEvent::OutboundSubstreamRequest {
103 protocol: SubstreamProtocol::new(ReadyUpgrade::new(crate::PROTOCOL_NAME), ()),
104 })
105 }
106
107 fn on_connection_event(
108 &mut self,
109 event: ConnectionEvent<Self::InboundProtocol, Self::OutboundProtocol>,
110 ) {
111 match event {
112 ConnectionEvent::FullyNegotiatedInbound(FullyNegotiatedInbound {
113 protocol, ..
114 }) => libp2p_core::util::unreachable(protocol),
115 ConnectionEvent::FullyNegotiatedOutbound(FullyNegotiatedOutbound {
116 protocol,
117 info: (),
118 }) => {
119 let Command { id, params } = self
120 .requested_streams
121 .pop_front()
122 .expect("opened a stream without a pending command");
123 self.outbound.push(
124 crate::protocol::send_receive(params, protocol)
125 .map(move |result| (id, result))
126 .boxed(),
127 );
128 }
129
130 ConnectionEvent::AddressChange(_)
131 | ConnectionEvent::LocalProtocolsChange(_)
132 | ConnectionEvent::RemoteProtocolsChange(_) => {}
133 ConnectionEvent::DialUpgradeError(DialUpgradeError { info: (), error }) => {
134 let Command { id, .. } = self
135 .requested_streams
136 .pop_front()
137 .expect("requested stream without pending command");
138 self.queued_events
139 .push_back(ConnectionHandlerEvent::NotifyBehaviour(Event {
140 id,
141 result: Err(error.into()),
142 }));
143 }
144 ConnectionEvent::ListenUpgradeError(ListenUpgradeError { info: (), error }) => {
145 libp2p_core::util::unreachable(error)
146 }
147 _ => {}
148 }
149 }
150
151 #[tracing::instrument(level = "trace", name = "ConnectionHandler::poll", skip(self, cx))]
152 fn poll(
153 &mut self,
154 cx: &mut Context<'_>,
155 ) -> Poll<ConnectionHandlerEvent<Self::OutboundProtocol, (), Self::ToBehaviour>> {
156 if let Some(event) = self.queued_events.pop_front() {
157 return Poll::Ready(event);
158 }
159
160 if let Poll::Ready(Some((id, result))) = self.outbound.poll_next_unpin(cx) {
161 return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(Event {
162 id,
163 result: result.map_err(Into::into),
164 }));
165 }
166
167 Poll::Pending
168 }
169}