1use std::{
22 collections::{HashMap, VecDeque},
23 fmt, io,
24 task::{Context, Poll},
25 time::Duration,
26};
27
28use bytes::Bytes;
29use either::Either;
30use futures::{
31 future::{BoxFuture, FutureExt, TryFutureExt},
32 io::AsyncWriteExt,
33 stream::{FuturesUnordered, StreamExt},
34};
35use futures_timer::Delay;
36use libp2p_core::{upgrade::ReadyUpgrade, ConnectedPoint, Multiaddr};
37use libp2p_identity::PeerId;
38use libp2p_swarm::{
39 handler::{ConnectionEvent, DialUpgradeError, FullyNegotiatedInbound, FullyNegotiatedOutbound},
40 ConnectionHandler, ConnectionHandlerEvent, ConnectionId, Stream, StreamProtocol,
41 StreamUpgradeError, SubstreamProtocol,
42};
43use web_time::Instant;
44
45use crate::{
46 behaviour::CircuitId,
47 copy_future::CopyFuture,
48 proto,
49 protocol::{inbound_hop, outbound_stop},
50 HOP_PROTOCOL_NAME, STOP_PROTOCOL_NAME,
51};
52
/// Capacity given to each of the handler's bounded worker collections
/// (inbound HOP requests and outbound STOP negotiations are bounded separately).
const MAX_CONCURRENT_STREAMS_PER_CONNECTION: usize = 10;

/// Timeout given to each of the handler's bounded worker collections; a worker
/// that does not finish within this duration is reported as timed out.
const STREAM_TIMEOUT: Duration = Duration::from_secs(60);
55
/// Static limits applied by a relay [`Handler`].
#[derive(Clone, Debug)]
pub struct Config {
    /// Forwarded to the inbound HOP request handling and used as the delay
    /// after which an accepted reservation is reported as timed out.
    pub reservation_duration: Duration,
    /// Duration limit forwarded to the inbound HOP request handling, the
    /// outbound STOP negotiation and the copy future driving a circuit.
    pub max_circuit_duration: Duration,
    /// Byte limit forwarded to the inbound HOP request handling, the
    /// outbound STOP negotiation and the copy future driving a circuit.
    pub max_circuit_bytes: u64,
}
62
63pub enum In {
64 AcceptReservationReq {
65 inbound_reservation_req: inbound_hop::ReservationReq,
66 addrs: Vec<Multiaddr>,
67 },
68 DenyReservationReq {
69 inbound_reservation_req: inbound_hop::ReservationReq,
70 status: proto::Status,
71 },
72 DenyCircuitReq {
73 circuit_id: Option<CircuitId>,
74 inbound_circuit_req: inbound_hop::CircuitReq,
75 status: proto::Status,
76 },
77 NegotiateOutboundConnect {
78 circuit_id: CircuitId,
79 inbound_circuit_req: inbound_hop::CircuitReq,
80 src_peer_id: PeerId,
81 src_connection_id: ConnectionId,
82 },
83 AcceptAndDriveCircuit {
84 circuit_id: CircuitId,
85 dst_peer_id: PeerId,
86 inbound_circuit_req: inbound_hop::CircuitReq,
87 dst_stream: Stream,
88 dst_pending_data: Bytes,
89 },
90}
91
92impl fmt::Debug for In {
93 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
94 match self {
95 In::AcceptReservationReq {
96 inbound_reservation_req: _,
97 addrs,
98 } => f
99 .debug_struct("In::AcceptReservationReq")
100 .field("addrs", addrs)
101 .finish(),
102 In::DenyReservationReq {
103 inbound_reservation_req: _,
104 status,
105 } => f
106 .debug_struct("In::DenyReservationReq")
107 .field("status", status)
108 .finish(),
109 In::DenyCircuitReq {
110 circuit_id,
111 inbound_circuit_req: _,
112 status,
113 } => f
114 .debug_struct("In::DenyCircuitReq")
115 .field("circuit_id", circuit_id)
116 .field("status", status)
117 .finish(),
118 In::NegotiateOutboundConnect {
119 circuit_id,
120 inbound_circuit_req: _,
121 src_peer_id,
122 src_connection_id,
123 } => f
124 .debug_struct("In::NegotiateOutboundConnect")
125 .field("circuit_id", circuit_id)
126 .field("src_peer_id", src_peer_id)
127 .field("src_connection_id", src_connection_id)
128 .finish(),
129 In::AcceptAndDriveCircuit {
130 circuit_id,
131 inbound_circuit_req: _,
132 dst_peer_id,
133 dst_stream: _,
134 dst_pending_data: _,
135 } => f
136 .debug_struct("In::AcceptAndDriveCircuit")
137 .field("circuit_id", circuit_id)
138 .field("dst_peer_id", dst_peer_id)
139 .finish(),
140 }
141 }
142}
143
144#[allow(clippy::large_enum_variant)]
146pub enum Event {
147 ReservationReqReceived {
149 inbound_reservation_req: inbound_hop::ReservationReq,
150 endpoint: ConnectedPoint,
151 renewed: bool,
153 },
154 ReservationReqAccepted {
156 renewed: bool,
158 },
159 ReservationReqAcceptFailed { error: inbound_hop::Error },
161 ReservationReqDenied { status: proto::Status },
163 ReservationReqDenyFailed { error: inbound_hop::Error },
165 ReservationTimedOut {},
167 CircuitReqReceived {
169 inbound_circuit_req: inbound_hop::CircuitReq,
170 endpoint: ConnectedPoint,
171 },
172 CircuitReqDenied {
174 circuit_id: Option<CircuitId>,
175 dst_peer_id: PeerId,
176 status: proto::Status,
177 },
178 CircuitReqDenyFailed {
180 circuit_id: Option<CircuitId>,
181 dst_peer_id: PeerId,
182 error: inbound_hop::Error,
183 },
184 CircuitReqAccepted {
186 circuit_id: CircuitId,
187 dst_peer_id: PeerId,
188 },
189 CircuitReqAcceptFailed {
191 circuit_id: CircuitId,
192 dst_peer_id: PeerId,
193 error: inbound_hop::Error,
194 },
195 OutboundConnectNegotiated {
198 circuit_id: CircuitId,
199 src_peer_id: PeerId,
200 src_connection_id: ConnectionId,
201 inbound_circuit_req: inbound_hop::CircuitReq,
202 dst_stream: Stream,
203 dst_pending_data: Bytes,
204 },
205 OutboundConnectNegotiationFailed {
207 circuit_id: CircuitId,
208 src_peer_id: PeerId,
209 src_connection_id: ConnectionId,
210 inbound_circuit_req: inbound_hop::CircuitReq,
211 status: proto::Status,
212 error: outbound_stop::Error,
213 },
214 CircuitClosed {
216 circuit_id: CircuitId,
217 dst_peer_id: PeerId,
218 error: Option<std::io::Error>,
219 },
220}
221
222impl fmt::Debug for Event {
223 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
224 match self {
225 Event::ReservationReqReceived {
226 inbound_reservation_req: _,
227 endpoint,
228 renewed,
229 } => f
230 .debug_struct("Event::ReservationReqReceived")
231 .field("endpoint", endpoint)
232 .field("renewed", renewed)
233 .finish(),
234 Event::ReservationReqAccepted { renewed } => f
235 .debug_struct("Event::ReservationReqAccepted")
236 .field("renewed", renewed)
237 .finish(),
238 Event::ReservationReqAcceptFailed { error } => f
239 .debug_struct("Event::ReservationReqAcceptFailed")
240 .field("error", error)
241 .finish(),
242 Event::ReservationReqDenied { status } => f
243 .debug_struct("Event::ReservationReqDenied")
244 .field("status", status)
245 .finish(),
246 Event::ReservationReqDenyFailed { error } => f
247 .debug_struct("Event::ReservationReqDenyFailed")
248 .field("error", error)
249 .finish(),
250 Event::ReservationTimedOut {} => f.debug_struct("Event::ReservationTimedOut").finish(),
251 Event::CircuitReqReceived {
252 endpoint,
253 inbound_circuit_req: _,
254 } => f
255 .debug_struct("Event::CircuitReqReceived")
256 .field("endpoint", endpoint)
257 .finish(),
258 Event::CircuitReqDenied {
259 circuit_id,
260 dst_peer_id,
261 status,
262 } => f
263 .debug_struct("Event::CircuitReqDenied")
264 .field("circuit_id", circuit_id)
265 .field("dst_peer_id", dst_peer_id)
266 .field("status", status)
267 .finish(),
268 Event::CircuitReqDenyFailed {
269 circuit_id,
270 dst_peer_id,
271 error,
272 } => f
273 .debug_struct("Event::CircuitReqDenyFailed")
274 .field("circuit_id", circuit_id)
275 .field("dst_peer_id", dst_peer_id)
276 .field("error", error)
277 .finish(),
278 Event::CircuitReqAccepted {
279 circuit_id,
280 dst_peer_id,
281 } => f
282 .debug_struct("Event::CircuitReqAccepted")
283 .field("circuit_id", circuit_id)
284 .field("dst_peer_id", dst_peer_id)
285 .finish(),
286 Event::CircuitReqAcceptFailed {
287 circuit_id,
288 dst_peer_id,
289 error,
290 } => f
291 .debug_struct("Event::CircuitReqAcceptFailed")
292 .field("circuit_id", circuit_id)
293 .field("dst_peer_id", dst_peer_id)
294 .field("error", error)
295 .finish(),
296 Event::OutboundConnectNegotiated {
297 circuit_id,
298 src_peer_id,
299 src_connection_id,
300 inbound_circuit_req: _,
301 dst_stream: _,
302 dst_pending_data: _,
303 } => f
304 .debug_struct("Event::OutboundConnectNegotiated")
305 .field("circuit_id", circuit_id)
306 .field("src_peer_id", src_peer_id)
307 .field("src_connection_id", src_connection_id)
308 .finish(),
309 Event::OutboundConnectNegotiationFailed {
310 circuit_id,
311 src_peer_id,
312 src_connection_id,
313 inbound_circuit_req: _,
314 status,
315 error,
316 } => f
317 .debug_struct("Event::OutboundConnectNegotiationFailed")
318 .field("circuit_id", circuit_id)
319 .field("src_peer_id", src_peer_id)
320 .field("src_connection_id", src_connection_id)
321 .field("status", status)
322 .field("error", error)
323 .finish(),
324 Event::CircuitClosed {
325 circuit_id,
326 dst_peer_id,
327 error,
328 } => f
329 .debug_struct("Event::CircuitClosed")
330 .field("circuit_id", circuit_id)
331 .field("dst_peer_id", dst_peer_id)
332 .field("error", error)
333 .finish(),
334 }
335 }
336}
337
338pub struct Handler {
341 endpoint: ConnectedPoint,
342
343 config: Config,
345
346 queued_events: VecDeque<
348 ConnectionHandlerEvent<
349 <Self as ConnectionHandler>::OutboundProtocol,
350 (),
351 <Self as ConnectionHandler>::ToBehaviour,
352 >,
353 >,
354
355 idle_at: Option<Instant>,
357
358 reservation_request_future: Option<ReservationRequestFuture>,
360 active_reservation: Option<Delay>,
362
363 circuit_accept_futures: Futures<Result<CircuitParts, (CircuitId, PeerId, inbound_hop::Error)>>,
365 circuit_deny_futures: Futures<(
367 Option<CircuitId>,
368 PeerId,
369 proto::Status,
370 Result<(), inbound_hop::Error>,
371 )>,
372 circuits: Futures<(CircuitId, PeerId, Result<(), std::io::Error>)>,
374
375 pending_connect_requests: VecDeque<PendingConnect>,
377
378 active_connect_requests: HashMap<CircuitId, PendingConnect>,
380
381 inbound_workers: futures_bounded::FuturesSet<
382 Result<Either<inbound_hop::ReservationReq, inbound_hop::CircuitReq>, inbound_hop::Error>,
383 >,
384 outbound_workers: futures_bounded::FuturesMap<
385 CircuitId,
386 Result<outbound_stop::Circuit, outbound_stop::Error>,
387 >,
388}
389
390impl Handler {
391 pub fn new(config: Config, endpoint: ConnectedPoint) -> Handler {
392 Handler {
393 inbound_workers: futures_bounded::FuturesSet::new(
394 STREAM_TIMEOUT,
395 MAX_CONCURRENT_STREAMS_PER_CONNECTION,
396 ),
397 outbound_workers: futures_bounded::FuturesMap::new(
398 STREAM_TIMEOUT,
399 MAX_CONCURRENT_STREAMS_PER_CONNECTION,
400 ),
401 endpoint,
402 config,
403 queued_events: Default::default(),
404 idle_at: None,
405 reservation_request_future: Default::default(),
406 circuit_accept_futures: Default::default(),
407 circuit_deny_futures: Default::default(),
408 circuits: Default::default(),
409 active_reservation: Default::default(),
410 pending_connect_requests: Default::default(),
411 active_connect_requests: Default::default(),
412 }
413 }
414
415 fn on_fully_negotiated_inbound(&mut self, stream: Stream) {
416 if self
417 .inbound_workers
418 .try_push(inbound_hop::handle_inbound_request(
419 stream,
420 self.config.reservation_duration,
421 self.config.max_circuit_duration,
422 self.config.max_circuit_bytes,
423 ))
424 .is_err()
425 {
426 tracing::warn!("Dropping inbound stream because we are at capacity")
427 }
428 }
429
430 fn on_fully_negotiated_outbound(&mut self, stream: Stream) {
431 let connect = self
432 .pending_connect_requests
433 .pop_front()
434 .expect("opened a stream without a pending stop command");
435
436 if self
437 .outbound_workers
438 .try_push(
439 connect.circuit_id,
440 outbound_stop::connect(
441 stream,
442 connect.src_peer_id,
443 connect.max_circuit_duration,
444 connect.max_circuit_bytes,
445 ),
446 )
447 .is_err()
448 {
449 tracing::warn!("Dropping outbound stream because we are at capacity")
450 }
451
452 self.active_connect_requests
453 .insert(connect.circuit_id, connect);
454 }
455
456 fn on_dial_upgrade_error(
457 &mut self,
458 DialUpgradeError { error, .. }: DialUpgradeError<
459 (),
460 <Self as ConnectionHandler>::OutboundProtocol,
461 >,
462 ) {
463 let error = match error {
464 StreamUpgradeError::Timeout => outbound_stop::Error::Io(io::ErrorKind::TimedOut.into()),
465 StreamUpgradeError::NegotiationFailed => outbound_stop::Error::Unsupported,
466 StreamUpgradeError::Io(e) => outbound_stop::Error::Io(e),
467 StreamUpgradeError::Apply(v) => libp2p_core::util::unreachable(v),
468 };
469
470 let stop_command = self
471 .pending_connect_requests
472 .pop_front()
473 .expect("failed to open a stream without a pending stop command");
474
475 self.queued_events
476 .push_back(ConnectionHandlerEvent::NotifyBehaviour(
477 Event::OutboundConnectNegotiationFailed {
478 circuit_id: stop_command.circuit_id,
479 src_peer_id: stop_command.src_peer_id,
480 src_connection_id: stop_command.src_connection_id,
481 inbound_circuit_req: stop_command.inbound_circuit_req,
482 status: proto::Status::CONNECTION_FAILED,
483 error,
484 },
485 ));
486 }
487}
488
489enum ReservationRequestFuture {
490 Accepting(BoxFuture<'static, Result<(), inbound_hop::Error>>),
491 Denying(BoxFuture<'static, (proto::Status, Result<(), inbound_hop::Error>)>),
492}
493
494type Futures<T> = FuturesUnordered<BoxFuture<'static, T>>;
495
496impl ConnectionHandler for Handler {
497 type FromBehaviour = In;
498 type ToBehaviour = Event;
499 type InboundProtocol = ReadyUpgrade<StreamProtocol>;
500 type InboundOpenInfo = ();
501 type OutboundProtocol = ReadyUpgrade<StreamProtocol>;
502 type OutboundOpenInfo = ();
503
504 fn listen_protocol(&self) -> SubstreamProtocol<Self::InboundProtocol> {
505 SubstreamProtocol::new(ReadyUpgrade::new(HOP_PROTOCOL_NAME), ())
506 }
507
508 fn on_behaviour_event(&mut self, event: Self::FromBehaviour) {
509 match event {
510 In::AcceptReservationReq {
511 inbound_reservation_req,
512 addrs,
513 } => {
514 if self
515 .reservation_request_future
516 .replace(ReservationRequestFuture::Accepting(
517 inbound_reservation_req.accept(addrs).err_into().boxed(),
518 ))
519 .is_some()
520 {
521 tracing::warn!("Dropping existing deny/accept future in favor of new one")
522 }
523 }
524 In::DenyReservationReq {
525 inbound_reservation_req,
526 status,
527 } => {
528 if self
529 .reservation_request_future
530 .replace(ReservationRequestFuture::Denying(
531 inbound_reservation_req
532 .deny(status)
533 .err_into()
534 .map(move |result| (status, result))
535 .boxed(),
536 ))
537 .is_some()
538 {
539 tracing::warn!("Dropping existing deny/accept future in favor of new one")
540 }
541 }
542 In::NegotiateOutboundConnect {
543 circuit_id,
544 inbound_circuit_req,
545 src_peer_id,
546 src_connection_id,
547 } => {
548 self.pending_connect_requests.push_back(PendingConnect::new(
549 circuit_id,
550 inbound_circuit_req,
551 src_peer_id,
552 src_connection_id,
553 &self.config,
554 ));
555 self.queued_events
556 .push_back(ConnectionHandlerEvent::OutboundSubstreamRequest {
557 protocol: SubstreamProtocol::new(ReadyUpgrade::new(STOP_PROTOCOL_NAME), ()),
558 });
559 }
560 In::DenyCircuitReq {
561 circuit_id,
562 inbound_circuit_req,
563 status,
564 } => {
565 let dst_peer_id = inbound_circuit_req.dst();
566 self.circuit_deny_futures.push(
567 inbound_circuit_req
568 .deny(status)
569 .err_into()
570 .map(move |result| (circuit_id, dst_peer_id, status, result))
571 .boxed(),
572 );
573 }
574 In::AcceptAndDriveCircuit {
575 circuit_id,
576 dst_peer_id,
577 inbound_circuit_req,
578 dst_stream,
579 dst_pending_data,
580 } => {
581 self.circuit_accept_futures.push(
582 inbound_circuit_req
583 .accept()
584 .err_into()
585 .map_ok(move |(src_stream, src_pending_data)| CircuitParts {
586 circuit_id,
587 src_stream,
588 src_pending_data,
589 dst_peer_id,
590 dst_stream,
591 dst_pending_data,
592 })
593 .map_err(move |e| (circuit_id, dst_peer_id, e))
594 .boxed(),
595 );
596 }
597 }
598 }
599
600 fn connection_keep_alive(&self) -> bool {
601 let Some(idle_at) = self.idle_at else {
602 return true;
603 };
604
605 Instant::now().duration_since(idle_at) <= Duration::from_secs(10)
606 }
607
608 #[tracing::instrument(level = "trace", name = "ConnectionHandler::poll", skip(self, cx))]
609 fn poll(
610 &mut self,
611 cx: &mut Context<'_>,
612 ) -> Poll<ConnectionHandlerEvent<Self::OutboundProtocol, (), Self::ToBehaviour>> {
613 if let Some(event) = self.queued_events.pop_front() {
615 return Poll::Ready(event);
616 }
617
618 if let Poll::Ready(Some((circuit_id, dst_peer_id, result))) =
620 self.circuits.poll_next_unpin(cx)
621 {
622 match result {
623 Ok(()) => {
624 return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(
625 Event::CircuitClosed {
626 circuit_id,
627 dst_peer_id,
628 error: None,
629 },
630 ))
631 }
632 Err(e) => {
633 return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(
634 Event::CircuitClosed {
635 circuit_id,
636 dst_peer_id,
637 error: Some(e),
638 },
639 ))
640 }
641 }
642 }
643
644 loop {
646 match self.inbound_workers.poll_unpin(cx) {
647 Poll::Ready(Ok(Ok(Either::Left(inbound_reservation_req)))) => {
648 return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(
649 Event::ReservationReqReceived {
650 inbound_reservation_req,
651 endpoint: self.endpoint.clone(),
652 renewed: self.active_reservation.is_some(),
653 },
654 ));
655 }
656 Poll::Ready(Ok(Ok(Either::Right(inbound_circuit_req)))) => {
657 return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(
658 Event::CircuitReqReceived {
659 inbound_circuit_req,
660 endpoint: self.endpoint.clone(),
661 },
662 ));
663 }
664 Poll::Ready(Err(e)) => {
665 tracing::debug!("Inbound stream operation timed out: {e}");
666 continue;
667 }
668 Poll::Ready(Ok(Err(e))) => {
669 tracing::debug!("Inbound stream operation failed: {e}");
670 continue;
671 }
672 Poll::Pending => {
673 break;
674 }
675 }
676 }
677
678 match self.outbound_workers.poll_unpin(cx) {
680 Poll::Ready((id, Ok(Ok(circuit)))) => {
681 let connect = self
682 .active_connect_requests
683 .remove(&id)
684 .expect("must have pending connect");
685
686 return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(
687 Event::OutboundConnectNegotiated {
688 circuit_id: id,
689 src_peer_id: connect.src_peer_id,
690 src_connection_id: connect.src_connection_id,
691 inbound_circuit_req: connect.inbound_circuit_req,
692 dst_stream: circuit.dst_stream,
693 dst_pending_data: circuit.dst_pending_data,
694 },
695 ));
696 }
697 Poll::Ready((id, Ok(Err(error)))) => {
698 let connect = self
699 .active_connect_requests
700 .remove(&id)
701 .expect("must have pending connect");
702
703 return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(
704 Event::OutboundConnectNegotiationFailed {
705 circuit_id: connect.circuit_id,
706 src_peer_id: connect.src_peer_id,
707 src_connection_id: connect.src_connection_id,
708 inbound_circuit_req: connect.inbound_circuit_req,
709 status: error.to_status(),
710 error,
711 },
712 ));
713 }
714 Poll::Ready((id, Err(futures_bounded::Timeout { .. }))) => {
715 let connect = self
716 .active_connect_requests
717 .remove(&id)
718 .expect("must have pending connect");
719
720 return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(
721 Event::OutboundConnectNegotiationFailed {
722 circuit_id: connect.circuit_id,
723 src_peer_id: connect.src_peer_id,
724 src_connection_id: connect.src_connection_id,
725 inbound_circuit_req: connect.inbound_circuit_req,
726 status: proto::Status::CONNECTION_FAILED, error: outbound_stop::Error::Io(io::ErrorKind::TimedOut.into()),
728 },
729 ));
730 }
731 Poll::Pending => {}
732 }
733
734 if let Poll::Ready(Some((circuit_id, dst_peer_id, status, result))) =
736 self.circuit_deny_futures.poll_next_unpin(cx)
737 {
738 match result {
739 Ok(()) => {
740 return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(
741 Event::CircuitReqDenied {
742 circuit_id,
743 dst_peer_id,
744 status,
745 },
746 ));
747 }
748 Err(error) => {
749 return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(
750 Event::CircuitReqDenyFailed {
751 circuit_id,
752 dst_peer_id,
753 error,
754 },
755 ));
756 }
757 }
758 }
759
760 if let Poll::Ready(Some(result)) = self.circuit_accept_futures.poll_next_unpin(cx) {
762 match result {
763 Ok(parts) => {
764 let CircuitParts {
765 circuit_id,
766 mut src_stream,
767 src_pending_data,
768 dst_peer_id,
769 mut dst_stream,
770 dst_pending_data,
771 } = parts;
772 let max_circuit_duration = self.config.max_circuit_duration;
773 let max_circuit_bytes = self.config.max_circuit_bytes;
774
775 let circuit = async move {
776 let (result_1, result_2) = futures::future::join(
777 src_stream.write_all(&dst_pending_data),
778 dst_stream.write_all(&src_pending_data),
779 )
780 .await;
781 result_1?;
782 result_2?;
783
784 CopyFuture::new(
785 src_stream,
786 dst_stream,
787 max_circuit_duration,
788 max_circuit_bytes,
789 )
790 .await?;
791
792 Ok(())
793 }
794 .map(move |r| (circuit_id, dst_peer_id, r))
795 .boxed();
796
797 self.circuits.push(circuit);
798
799 return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(
800 Event::CircuitReqAccepted {
801 circuit_id,
802 dst_peer_id,
803 },
804 ));
805 }
806 Err((circuit_id, dst_peer_id, error)) => {
807 return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(
808 Event::CircuitReqAcceptFailed {
809 circuit_id,
810 dst_peer_id,
811 error,
812 },
813 ));
814 }
815 }
816 }
817
818 if let Some(Poll::Ready(())) = self
820 .active_reservation
821 .as_mut()
822 .map(|fut| fut.poll_unpin(cx))
823 {
824 self.active_reservation = None;
825 return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(
826 Event::ReservationTimedOut {},
827 ));
828 }
829
830 match self.reservation_request_future.as_mut() {
832 Some(ReservationRequestFuture::Accepting(fut)) => {
833 if let Poll::Ready(result) = fut.poll_unpin(cx) {
834 self.reservation_request_future = None;
835
836 match result {
837 Ok(()) => {
838 let renewed = self
839 .active_reservation
840 .replace(Delay::new(self.config.reservation_duration))
841 .is_some();
842 return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(
843 Event::ReservationReqAccepted { renewed },
844 ));
845 }
846 Err(error) => {
847 return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(
848 Event::ReservationReqAcceptFailed { error },
849 ));
850 }
851 }
852 }
853 }
854 Some(ReservationRequestFuture::Denying(fut)) => {
855 if let Poll::Ready((status, result)) = fut.poll_unpin(cx) {
856 self.reservation_request_future = None;
857
858 match result {
859 Ok(()) => {
860 return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(
861 Event::ReservationReqDenied { status },
862 ))
863 }
864 Err(error) => {
865 return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(
866 Event::ReservationReqDenyFailed { error },
867 ));
868 }
869 }
870 }
871 }
872 None => {}
873 }
874
875 if self.active_reservation.is_none() {
877 if self.idle_at.is_none() {
878 self.idle_at = Some(Instant::now());
879 }
880 } else {
881 self.idle_at = None;
882 }
883
884 Poll::Pending
885 }
886
887 fn on_connection_event(
888 &mut self,
889 event: ConnectionEvent<Self::InboundProtocol, Self::OutboundProtocol>,
890 ) {
891 match event {
892 ConnectionEvent::FullyNegotiatedInbound(FullyNegotiatedInbound {
893 protocol: stream,
894 ..
895 }) => {
896 self.on_fully_negotiated_inbound(stream);
897 }
898 ConnectionEvent::FullyNegotiatedOutbound(FullyNegotiatedOutbound {
899 protocol: stream,
900 ..
901 }) => {
902 self.on_fully_negotiated_outbound(stream);
903 }
904 ConnectionEvent::DialUpgradeError(dial_upgrade_error) => {
905 self.on_dial_upgrade_error(dial_upgrade_error);
906 }
907 _ => {}
908 }
909 }
910}
911
912struct CircuitParts {
913 circuit_id: CircuitId,
914 src_stream: Stream,
915 src_pending_data: Bytes,
916 dst_peer_id: PeerId,
917 dst_stream: Stream,
918 dst_pending_data: Bytes,
919}
920
921struct PendingConnect {
923 circuit_id: CircuitId,
924 inbound_circuit_req: inbound_hop::CircuitReq,
925 src_peer_id: PeerId,
926 src_connection_id: ConnectionId,
927 max_circuit_duration: Duration,
928 max_circuit_bytes: u64,
929}
930
931impl PendingConnect {
932 fn new(
933 circuit_id: CircuitId,
934 inbound_circuit_req: inbound_hop::CircuitReq,
935 src_peer_id: PeerId,
936 src_connection_id: ConnectionId,
937 config: &Config,
938 ) -> Self {
939 Self {
940 circuit_id,
941 inbound_circuit_req,
942 src_peer_id,
943 src_connection_id,
944 max_circuit_duration: config.max_circuit_duration,
945 max_circuit_bytes: config.max_circuit_bytes,
946 }
947 }
948}