1use std::{
24 collections::VecDeque,
25 io,
26 task::{Context, Poll},
27 time::Duration,
28};
29
30use either::Either;
31use futures::future;
32use libp2p_core::{
33 multiaddr::Multiaddr,
34 upgrade::{DeniedUpgrade, ReadyUpgrade},
35 ConnectedPoint,
36};
37use libp2p_swarm::{
38 handler::{
39 ConnectionEvent, DialUpgradeError, FullyNegotiatedInbound, FullyNegotiatedOutbound,
40 ListenUpgradeError,
41 },
42 ConnectionHandler, ConnectionHandlerEvent, StreamProtocol, StreamUpgradeError,
43 SubstreamProtocol,
44};
45use protocol::{inbound, outbound};
46
47use crate::{behaviour::MAX_NUMBER_OF_UPGRADE_ATTEMPTS, protocol, PROTOCOL_NAME};
48
/// Instructions the behaviour can send to a [`Handler`].
#[derive(Debug)]
pub enum Command {
    /// Ask the handler to request a new outbound substream for `PROTOCOL_NAME`.
    ///
    /// Handling this command also counts as one upgrade attempt.
    Connect,
}
53
54#[derive(Debug)]
55pub enum Event {
56 InboundConnectNegotiated { remote_addrs: Vec<Multiaddr> },
57 OutboundConnectNegotiated { remote_addrs: Vec<Multiaddr> },
58 InboundConnectFailed { error: inbound::Error },
59 OutboundConnectFailed { error: outbound::Error },
60}
61
62pub struct Handler {
63 endpoint: ConnectedPoint,
64 queued_events: VecDeque<
66 ConnectionHandlerEvent<
67 <Self as ConnectionHandler>::OutboundProtocol,
68 (),
69 <Self as ConnectionHandler>::ToBehaviour,
70 >,
71 >,
72
73 inbound_stream: futures_bounded::FuturesSet<Result<Vec<Multiaddr>, inbound::Error>>,
75
76 outbound_stream: futures_bounded::FuturesSet<Result<Vec<Multiaddr>, outbound::Error>>,
78
79 holepunch_candidates: Vec<Multiaddr>,
81
82 attempts: u8,
83}
84
85impl Handler {
86 pub fn new(endpoint: ConnectedPoint, holepunch_candidates: Vec<Multiaddr>) -> Self {
87 Self {
88 endpoint,
89 queued_events: Default::default(),
90 inbound_stream: futures_bounded::FuturesSet::new(Duration::from_secs(10), 1),
91 outbound_stream: futures_bounded::FuturesSet::new(Duration::from_secs(10), 1),
92 holepunch_candidates,
93 attempts: 0,
94 }
95 }
96
97 fn on_fully_negotiated_inbound(
98 &mut self,
99 FullyNegotiatedInbound {
100 protocol: output, ..
101 }: FullyNegotiatedInbound<<Self as ConnectionHandler>::InboundProtocol>,
102 ) {
103 match output {
104 future::Either::Left(stream) => {
105 if self
106 .inbound_stream
107 .try_push(inbound::handshake(
108 stream,
109 self.holepunch_candidates.clone(),
110 ))
111 .is_err()
112 {
113 tracing::warn!(
114 "New inbound connect stream while still upgrading previous one. Replacing previous with new.",
115 );
116 }
117 self.attempts += 1;
118 }
119 future::Either::Right(output) => libp2p_core::util::unreachable(output),
122 }
123 }
124
125 fn on_fully_negotiated_outbound(
126 &mut self,
127 FullyNegotiatedOutbound {
128 protocol: stream, ..
129 }: FullyNegotiatedOutbound<<Self as ConnectionHandler>::OutboundProtocol>,
130 ) {
131 assert!(
132 self.endpoint.is_listener(),
133 "A connection dialer never initiates a connection upgrade."
134 );
135 if self
136 .outbound_stream
137 .try_push(outbound::handshake(
138 stream,
139 self.holepunch_candidates.clone(),
140 ))
141 .is_err()
142 {
143 tracing::warn!(
144 "New outbound connect stream while still upgrading previous one. Replacing previous with new.",
145 );
146 }
147 }
148
149 fn on_listen_upgrade_error(
150 &mut self,
151 ListenUpgradeError { error, .. }: ListenUpgradeError<
152 (),
153 <Self as ConnectionHandler>::InboundProtocol,
154 >,
155 ) {
156 libp2p_core::util::unreachable(error.into_inner());
157 }
158
159 fn on_dial_upgrade_error(
160 &mut self,
161 DialUpgradeError { error, .. }: DialUpgradeError<
162 (),
163 <Self as ConnectionHandler>::OutboundProtocol,
164 >,
165 ) {
166 let error = match error {
167 StreamUpgradeError::Apply(v) => libp2p_core::util::unreachable(v),
168 StreamUpgradeError::NegotiationFailed => outbound::Error::Unsupported,
169 StreamUpgradeError::Io(e) => outbound::Error::Io(e),
170 StreamUpgradeError::Timeout => outbound::Error::Io(io::ErrorKind::TimedOut.into()),
171 };
172
173 self.queued_events
174 .push_back(ConnectionHandlerEvent::NotifyBehaviour(
175 Event::OutboundConnectFailed { error },
176 ))
177 }
178}
179
180impl ConnectionHandler for Handler {
181 type FromBehaviour = Command;
182 type ToBehaviour = Event;
183 type InboundProtocol = Either<ReadyUpgrade<StreamProtocol>, DeniedUpgrade>;
184 type OutboundProtocol = ReadyUpgrade<StreamProtocol>;
185 type OutboundOpenInfo = ();
186 type InboundOpenInfo = ();
187
188 fn listen_protocol(&self) -> SubstreamProtocol<Self::InboundProtocol> {
189 match self.endpoint {
190 ConnectedPoint::Dialer { .. } => {
191 SubstreamProtocol::new(Either::Left(ReadyUpgrade::new(PROTOCOL_NAME)), ())
192 }
193 ConnectedPoint::Listener { .. } => {
194 SubstreamProtocol::new(Either::Right(DeniedUpgrade), ())
200 }
201 }
202 }
203
204 fn on_behaviour_event(&mut self, event: Self::FromBehaviour) {
205 match event {
206 Command::Connect => {
207 self.queued_events
208 .push_back(ConnectionHandlerEvent::OutboundSubstreamRequest {
209 protocol: SubstreamProtocol::new(ReadyUpgrade::new(PROTOCOL_NAME), ()),
210 });
211 self.attempts += 1;
212 }
213 }
214 }
215
216 fn connection_keep_alive(&self) -> bool {
217 if self.attempts < MAX_NUMBER_OF_UPGRADE_ATTEMPTS {
218 return true;
219 }
220
221 false
222 }
223
224 #[tracing::instrument(level = "trace", name = "ConnectionHandler::poll", skip(self, cx))]
225 fn poll(
226 &mut self,
227 cx: &mut Context<'_>,
228 ) -> Poll<ConnectionHandlerEvent<Self::OutboundProtocol, (), Self::ToBehaviour>> {
229 if let Some(event) = self.queued_events.pop_front() {
231 return Poll::Ready(event);
232 }
233
234 match self.inbound_stream.poll_unpin(cx) {
235 Poll::Ready(Ok(Ok(addresses))) => {
236 return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(
237 Event::InboundConnectNegotiated {
238 remote_addrs: addresses,
239 },
240 ))
241 }
242 Poll::Ready(Ok(Err(error))) => {
243 return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(
244 Event::InboundConnectFailed { error },
245 ))
246 }
247 Poll::Ready(Err(futures_bounded::Timeout { .. })) => {
248 return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(
249 Event::InboundConnectFailed {
250 error: inbound::Error::Io(io::ErrorKind::TimedOut.into()),
251 },
252 ))
253 }
254 Poll::Pending => {}
255 }
256
257 match self.outbound_stream.poll_unpin(cx) {
258 Poll::Ready(Ok(Ok(addresses))) => {
259 return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(
260 Event::OutboundConnectNegotiated {
261 remote_addrs: addresses,
262 },
263 ))
264 }
265 Poll::Ready(Ok(Err(error))) => {
266 return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(
267 Event::OutboundConnectFailed { error },
268 ))
269 }
270 Poll::Ready(Err(futures_bounded::Timeout { .. })) => {
271 return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(
272 Event::OutboundConnectFailed {
273 error: outbound::Error::Io(io::ErrorKind::TimedOut.into()),
274 },
275 ))
276 }
277 Poll::Pending => {}
278 }
279
280 Poll::Pending
281 }
282
283 fn on_connection_event(
284 &mut self,
285 event: ConnectionEvent<Self::InboundProtocol, Self::OutboundProtocol>,
286 ) {
287 match event {
288 ConnectionEvent::FullyNegotiatedInbound(fully_negotiated_inbound) => {
289 self.on_fully_negotiated_inbound(fully_negotiated_inbound)
290 }
291 ConnectionEvent::FullyNegotiatedOutbound(fully_negotiated_outbound) => {
292 self.on_fully_negotiated_outbound(fully_negotiated_outbound)
293 }
294 ConnectionEvent::ListenUpgradeError(listen_upgrade_error) => {
295 self.on_listen_upgrade_error(listen_upgrade_error)
296 }
297 ConnectionEvent::DialUpgradeError(dial_upgrade_error) => {
298 self.on_dial_upgrade_error(dial_upgrade_error)
299 }
300 _ => {}
301 }
302 }
303}