1use std::{
22 collections::VecDeque,
23 convert::Infallible,
24 fmt, io,
25 task::{Context, Poll},
26 time::Duration,
27};
28
29use futures::{
30 channel::{mpsc, mpsc::Sender, oneshot},
31 future::FutureExt,
32};
33use futures_timer::Delay;
34use libp2p_core::{multiaddr::Protocol, upgrade::ReadyUpgrade, Multiaddr};
35use libp2p_identity::PeerId;
36use libp2p_swarm::{
37 handler::{ConnectionEvent, FullyNegotiatedInbound},
38 ConnectionHandler, ConnectionHandlerEvent, Stream, StreamProtocol, StreamUpgradeError,
39 SubstreamProtocol,
40};
41
42use crate::{
43 client::Connection,
44 priv_client,
45 priv_client::{transport, transport::ToListenerMsg},
46 proto,
47 protocol::{self, inbound_stop, outbound_hop},
48 HOP_PROTOCOL_NAME, STOP_PROTOCOL_NAME,
49};
50
/// Upper bound on the number of inbound circuit denials handled concurrently.
///
/// Denials that do not fit into this budget are dropped with a warning.
const MAX_NUMBER_DENYING_CIRCUIT: usize = 8;
/// Time budget for a single circuit denial.
const DENYING_CIRCUIT_TIMEOUT: Duration = Duration::from_secs(60);

/// Capacity of each in-flight request set (reservations, outbound connects, inbound circuits).
const MAX_CONCURRENT_STREAMS_PER_CONNECTION: usize = 10;
/// Time budget for each request tracked in those sets.
const STREAM_TIMEOUT: Duration = Duration::from_secs(60);
59
60pub enum In {
61 Reserve {
62 to_listener: mpsc::Sender<transport::ToListenerMsg>,
63 },
64 EstablishCircuit {
65 dst_peer_id: PeerId,
66 to_dial: oneshot::Sender<Result<priv_client::Connection, outbound_hop::ConnectError>>,
67 },
68}
69
70impl fmt::Debug for In {
71 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
72 match self {
73 In::Reserve { to_listener: _ } => f.debug_struct("In::Reserve").finish(),
74 In::EstablishCircuit {
75 dst_peer_id,
76 to_dial: _,
77 } => f
78 .debug_struct("In::EstablishCircuit")
79 .field("dst_peer_id", dst_peer_id)
80 .finish(),
81 }
82 }
83}
84
85#[derive(Debug)]
86pub enum Event {
87 ReservationReqAccepted {
88 renewal: bool,
90 limit: Option<protocol::Limit>,
91 },
92 OutboundCircuitEstablished { limit: Option<protocol::Limit> },
94 InboundCircuitEstablished {
96 src_peer_id: PeerId,
97 limit: Option<protocol::Limit>,
98 },
99}
100
101pub struct Handler {
102 local_peer_id: PeerId,
103 remote_peer_id: PeerId,
104 remote_addr: Multiaddr,
105
106 queued_events: VecDeque<
108 ConnectionHandlerEvent<
109 <Handler as ConnectionHandler>::OutboundProtocol,
110 (),
111 <Handler as ConnectionHandler>::ToBehaviour,
112 >,
113 >,
114
115 pending_streams: VecDeque<oneshot::Sender<Result<Stream, StreamUpgradeError<Infallible>>>>,
116
117 inflight_reserve_requests: futures_bounded::FuturesTupleSet<
118 Result<outbound_hop::Reservation, outbound_hop::ReserveError>,
119 mpsc::Sender<transport::ToListenerMsg>,
120 >,
121
122 inflight_outbound_connect_requests: futures_bounded::FuturesTupleSet<
123 Result<outbound_hop::Circuit, outbound_hop::ConnectError>,
124 oneshot::Sender<Result<priv_client::Connection, outbound_hop::ConnectError>>,
125 >,
126
127 inflight_inbound_circuit_requests:
128 futures_bounded::FuturesSet<Result<inbound_stop::Circuit, inbound_stop::Error>>,
129
130 inflight_outbound_circuit_deny_requests:
131 futures_bounded::FuturesSet<Result<(), inbound_stop::Error>>,
132
133 reservation: Reservation,
134}
135
136impl Handler {
137 pub fn new(local_peer_id: PeerId, remote_peer_id: PeerId, remote_addr: Multiaddr) -> Self {
138 Self {
139 local_peer_id,
140 remote_peer_id,
141 remote_addr,
142 queued_events: Default::default(),
143 pending_streams: Default::default(),
144 inflight_reserve_requests: futures_bounded::FuturesTupleSet::new(
145 STREAM_TIMEOUT,
146 MAX_CONCURRENT_STREAMS_PER_CONNECTION,
147 ),
148 inflight_inbound_circuit_requests: futures_bounded::FuturesSet::new(
149 STREAM_TIMEOUT,
150 MAX_CONCURRENT_STREAMS_PER_CONNECTION,
151 ),
152 inflight_outbound_connect_requests: futures_bounded::FuturesTupleSet::new(
153 STREAM_TIMEOUT,
154 MAX_CONCURRENT_STREAMS_PER_CONNECTION,
155 ),
156 inflight_outbound_circuit_deny_requests: futures_bounded::FuturesSet::new(
157 DENYING_CIRCUIT_TIMEOUT,
158 MAX_NUMBER_DENYING_CIRCUIT,
159 ),
160 reservation: Reservation::None,
161 }
162 }
163
164 fn insert_to_deny_futs(&mut self, circuit: inbound_stop::Circuit) {
165 let src_peer_id = circuit.src_peer_id();
166
167 if self
168 .inflight_outbound_circuit_deny_requests
169 .try_push(circuit.deny(proto::Status::NO_RESERVATION))
170 .is_err()
171 {
172 tracing::warn!(
173 peer=%src_peer_id,
174 "Dropping existing inbound circuit request to be denied from peer in favor of new one"
175 )
176 }
177 }
178
179 fn make_new_reservation(&mut self, to_listener: Sender<ToListenerMsg>) {
180 let (sender, receiver) = oneshot::channel();
181
182 self.pending_streams.push_back(sender);
183 self.queued_events
184 .push_back(ConnectionHandlerEvent::OutboundSubstreamRequest {
185 protocol: SubstreamProtocol::new(ReadyUpgrade::new(HOP_PROTOCOL_NAME), ()),
186 });
187 let result = self.inflight_reserve_requests.try_push(
188 async move {
189 let stream = receiver
190 .await
191 .map_err(|_| io::Error::from(io::ErrorKind::BrokenPipe))?
192 .map_err(into_reserve_error)?;
193
194 let reservation = outbound_hop::make_reservation(stream).await?;
195
196 Ok(reservation)
197 },
198 to_listener,
199 );
200
201 if result.is_err() {
202 tracing::warn!("Dropping in-flight reservation request because we are at capacity");
203 }
204 }
205
206 fn establish_new_circuit(
207 &mut self,
208 to_dial: oneshot::Sender<Result<Connection, outbound_hop::ConnectError>>,
209 dst_peer_id: PeerId,
210 ) {
211 let (sender, receiver) = oneshot::channel();
212
213 self.pending_streams.push_back(sender);
214 self.queued_events
215 .push_back(ConnectionHandlerEvent::OutboundSubstreamRequest {
216 protocol: SubstreamProtocol::new(ReadyUpgrade::new(HOP_PROTOCOL_NAME), ()),
217 });
218 let result = self.inflight_outbound_connect_requests.try_push(
219 async move {
220 let stream = receiver
221 .await
222 .map_err(|_| io::Error::from(io::ErrorKind::BrokenPipe))?
223 .map_err(into_connect_error)?;
224
225 outbound_hop::open_circuit(stream, dst_peer_id).await
226 },
227 to_dial,
228 );
229
230 if result.is_err() {
231 tracing::warn!("Dropping in-flight connect request because we are at capacity")
232 }
233 }
234}
235
236impl ConnectionHandler for Handler {
237 type FromBehaviour = In;
238 type ToBehaviour = Event;
239 type InboundProtocol = ReadyUpgrade<StreamProtocol>;
240 type InboundOpenInfo = ();
241 type OutboundProtocol = ReadyUpgrade<StreamProtocol>;
242 type OutboundOpenInfo = ();
243
244 fn listen_protocol(&self) -> SubstreamProtocol<Self::InboundProtocol> {
245 SubstreamProtocol::new(ReadyUpgrade::new(STOP_PROTOCOL_NAME), ())
246 }
247
248 fn on_behaviour_event(&mut self, event: Self::FromBehaviour) {
249 match event {
250 In::Reserve { to_listener } => {
251 self.make_new_reservation(to_listener);
252 }
253 In::EstablishCircuit {
254 to_dial,
255 dst_peer_id,
256 } => {
257 self.establish_new_circuit(to_dial, dst_peer_id);
258 }
259 }
260 }
261
262 fn connection_keep_alive(&self) -> bool {
263 self.reservation.is_some()
264 }
265
266 #[tracing::instrument(level = "trace", name = "ConnectionHandler::poll", skip(self, cx))]
267 fn poll(
268 &mut self,
269 cx: &mut Context<'_>,
270 ) -> Poll<ConnectionHandlerEvent<Self::OutboundProtocol, (), Self::ToBehaviour>> {
271 loop {
272 match self.inflight_reserve_requests.poll_unpin(cx) {
274 Poll::Ready((
275 Ok(Ok(outbound_hop::Reservation {
276 renewal_timeout,
277 addrs,
278 limit,
279 })),
280 to_listener,
281 )) => {
282 return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(
283 self.reservation.accepted(
284 renewal_timeout,
285 addrs,
286 to_listener,
287 self.local_peer_id,
288 limit,
289 ),
290 ));
291 }
292 Poll::Ready((Ok(Err(error)), mut to_listener)) => {
293 if let Err(e) =
294 to_listener.try_send(transport::ToListenerMsg::Reservation(Err(error)))
295 {
296 tracing::debug!("Unable to send error to listener: {}", e.into_send_error())
297 }
298 self.reservation.failed();
299 continue;
300 }
301 Poll::Ready((Err(futures_bounded::Timeout { .. }), mut to_listener)) => {
302 if let Err(e) =
303 to_listener.try_send(transport::ToListenerMsg::Reservation(Err(
304 outbound_hop::ReserveError::Io(io::ErrorKind::TimedOut.into()),
305 )))
306 {
307 tracing::debug!("Unable to send error to listener: {}", e.into_send_error())
308 }
309 self.reservation.failed();
310 continue;
311 }
312 Poll::Pending => {}
313 }
314
315 match self.inflight_outbound_connect_requests.poll_unpin(cx) {
317 Poll::Ready((
318 Ok(Ok(outbound_hop::Circuit {
319 limit,
320 read_buffer,
321 stream,
322 })),
323 to_dialer,
324 )) => {
325 if to_dialer
326 .send(Ok(priv_client::Connection {
327 state: priv_client::ConnectionState::new_outbound(stream, read_buffer),
328 }))
329 .is_err()
330 {
331 tracing::debug!(
332 "Dropping newly established circuit because the listener is gone"
333 );
334 continue;
335 }
336
337 return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(
338 Event::OutboundCircuitEstablished { limit },
339 ));
340 }
341 Poll::Ready((Ok(Err(error)), to_dialer)) => {
342 let _ = to_dialer.send(Err(error));
343 continue;
344 }
345 Poll::Ready((Err(futures_bounded::Timeout { .. }), to_dialer)) => {
346 if to_dialer
347 .send(Err(outbound_hop::ConnectError::Io(
348 io::ErrorKind::TimedOut.into(),
349 )))
350 .is_err()
351 {
352 tracing::debug!("Unable to send error to dialer")
353 }
354 self.reservation.failed();
355 continue;
356 }
357 Poll::Pending => {}
358 }
359
360 if let Some(event) = self.queued_events.pop_front() {
362 return Poll::Ready(event);
363 }
364
365 match self.inflight_inbound_circuit_requests.poll_unpin(cx) {
366 Poll::Ready(Ok(Ok(circuit))) => match &mut self.reservation {
367 Reservation::Accepted { pending_msgs, .. }
368 | Reservation::Renewing { pending_msgs, .. } => {
369 let src_peer_id = circuit.src_peer_id();
370 let limit = circuit.limit();
371
372 let connection = super::ConnectionState::new_inbound(circuit);
373
374 pending_msgs.push_back(
375 transport::ToListenerMsg::IncomingRelayedConnection {
376 stream: super::Connection { state: connection },
377 src_peer_id,
378 relay_peer_id: self.remote_peer_id,
379 relay_addr: self.remote_addr.clone(),
380 },
381 );
382 return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(
383 Event::InboundCircuitEstablished { src_peer_id, limit },
384 ));
385 }
386 Reservation::None => {
387 self.insert_to_deny_futs(circuit);
388 continue;
389 }
390 },
391 Poll::Ready(Ok(Err(e))) => {
392 tracing::debug!("An inbound circuit request failed: {e}");
393 continue;
394 }
395 Poll::Ready(Err(e)) => {
396 tracing::debug!("An inbound circuit request timed out: {e}");
397 continue;
398 }
399 Poll::Pending => {}
400 }
401
402 if let Poll::Ready(Some(to_listener)) = self.reservation.poll(cx) {
403 self.make_new_reservation(to_listener);
404 continue;
405 }
406
407 match self.inflight_outbound_circuit_deny_requests.poll_unpin(cx) {
409 Poll::Ready(Ok(Ok(()))) => continue,
410 Poll::Ready(Ok(Err(error))) => {
411 tracing::debug!("Denying inbound circuit failed: {error}");
412 continue;
413 }
414 Poll::Ready(Err(futures_bounded::Timeout { .. })) => {
415 tracing::debug!("Denying inbound circuit timed out");
416 continue;
417 }
418 Poll::Pending => {}
419 }
420
421 return Poll::Pending;
422 }
423 }
424
425 fn on_connection_event(
426 &mut self,
427 event: ConnectionEvent<Self::InboundProtocol, Self::OutboundProtocol>,
428 ) {
429 match event {
430 ConnectionEvent::FullyNegotiatedInbound(FullyNegotiatedInbound {
431 protocol: stream,
432 ..
433 }) => {
434 if self
435 .inflight_inbound_circuit_requests
436 .try_push(inbound_stop::handle_open_circuit(stream))
437 .is_err()
438 {
439 tracing::warn!("Dropping inbound stream because we are at capacity")
440 }
441 }
442 ConnectionEvent::FullyNegotiatedOutbound(ev) => {
443 if let Some(next) = self.pending_streams.pop_front() {
444 let _ = next.send(Ok(ev.protocol));
445 }
446 }
447 ConnectionEvent::ListenUpgradeError(ev) => libp2p_core::util::unreachable(ev.error),
448 ConnectionEvent::DialUpgradeError(ev) => {
449 if let Some(next) = self.pending_streams.pop_front() {
450 let _ = next.send(Err(ev.error));
451 }
452 }
453 _ => {}
454 }
455 }
456}
457
458enum Reservation {
459 Accepted {
461 renewal_timeout: Delay,
462 pending_msgs: VecDeque<transport::ToListenerMsg>,
464 to_listener: mpsc::Sender<transport::ToListenerMsg>,
465 },
466 Renewing {
468 pending_msgs: VecDeque<transport::ToListenerMsg>,
470 },
471 None,
472}
473
474impl Reservation {
475 fn accepted(
476 &mut self,
477 renewal_timeout: Delay,
478 addrs: Vec<Multiaddr>,
479 to_listener: mpsc::Sender<transport::ToListenerMsg>,
480 local_peer_id: PeerId,
481 limit: Option<protocol::Limit>,
482 ) -> Event {
483 let (renewal, mut pending_msgs) = match std::mem::replace(self, Self::None) {
484 Reservation::Accepted { pending_msgs, .. }
485 | Reservation::Renewing { pending_msgs, .. } => (true, pending_msgs),
486 Reservation::None => (false, VecDeque::new()),
487 };
488
489 pending_msgs.push_back(transport::ToListenerMsg::Reservation(Ok(
490 transport::Reservation {
491 addrs: addrs
492 .into_iter()
493 .map(|a| {
494 a.with(Protocol::P2pCircuit)
495 .with(Protocol::P2p(local_peer_id))
496 })
497 .collect(),
498 },
499 )));
500
501 *self = Reservation::Accepted {
502 renewal_timeout,
503 pending_msgs,
504 to_listener,
505 };
506
507 Event::ReservationReqAccepted { renewal, limit }
508 }
509
510 fn is_some(&self) -> bool {
511 matches!(self, Self::Accepted { .. } | Self::Renewing { .. })
512 }
513
514 fn failed(&mut self) {
516 *self = Reservation::None;
517 }
518
519 fn forward_messages_to_transport_listener(&mut self, cx: &mut Context<'_>) {
520 if let Reservation::Accepted {
521 pending_msgs,
522 to_listener,
523 ..
524 } = self
525 {
526 if !pending_msgs.is_empty() {
527 match to_listener.poll_ready(cx) {
528 Poll::Ready(Ok(())) => {
529 if let Err(e) = to_listener
530 .start_send(pending_msgs.pop_front().expect("Called !is_empty()."))
531 {
532 tracing::debug!("Failed to sent pending message to listener: {:?}", e);
533 *self = Reservation::None;
534 }
535 }
536 Poll::Ready(Err(e)) => {
537 tracing::debug!("Channel to listener failed: {:?}", e);
538 *self = Reservation::None;
539 }
540 Poll::Pending => {}
541 }
542 }
543 }
544 }
545
546 fn poll(
547 &mut self,
548 cx: &mut Context<'_>,
549 ) -> Poll<Option<mpsc::Sender<transport::ToListenerMsg>>> {
550 self.forward_messages_to_transport_listener(cx);
551
552 let (next_reservation, poll_val) = match std::mem::replace(self, Reservation::None) {
554 Reservation::Accepted {
555 mut renewal_timeout,
556 pending_msgs,
557 to_listener,
558 } => match renewal_timeout.poll_unpin(cx) {
559 Poll::Ready(()) => (
560 Reservation::Renewing { pending_msgs },
561 Poll::Ready(Some(to_listener)),
562 ),
563 Poll::Pending => (
564 Reservation::Accepted {
565 renewal_timeout,
566 pending_msgs,
567 to_listener,
568 },
569 Poll::Pending,
570 ),
571 },
572 r => (r, Poll::Pending),
573 };
574 *self = next_reservation;
575
576 poll_val
577 }
578}
579
580fn into_reserve_error(e: StreamUpgradeError<Infallible>) -> outbound_hop::ReserveError {
581 match e {
582 StreamUpgradeError::Timeout => {
583 outbound_hop::ReserveError::Io(io::ErrorKind::TimedOut.into())
584 }
585 StreamUpgradeError::Apply(never) => libp2p_core::util::unreachable(never),
586 StreamUpgradeError::NegotiationFailed => outbound_hop::ReserveError::Unsupported,
587 StreamUpgradeError::Io(e) => outbound_hop::ReserveError::Io(e),
588 }
589}
590
591fn into_connect_error(e: StreamUpgradeError<Infallible>) -> outbound_hop::ConnectError {
592 match e {
593 StreamUpgradeError::Timeout => {
594 outbound_hop::ConnectError::Io(io::ErrorKind::TimedOut.into())
595 }
596 StreamUpgradeError::Apply(never) => libp2p_core::util::unreachable(never),
597 StreamUpgradeError::NegotiationFailed => outbound_hop::ConnectError::Unsupported,
598 StreamUpgradeError::Io(e) => outbound_hop::ConnectError::Io(e),
599 }
600}