1use std::{
22 collections::{HashMap, VecDeque},
23 fmt, io,
24 task::{Context, Poll},
25 time::Duration,
26};
27
28use bytes::Bytes;
29use either::Either;
30use futures::{
31 future::{BoxFuture, FutureExt, TryFutureExt},
32 io::AsyncWriteExt,
33 stream::{FuturesUnordered, StreamExt},
34};
35use futures_timer::Delay;
36use libp2p_core::{upgrade::ReadyUpgrade, ConnectedPoint, Multiaddr};
37use libp2p_identity::PeerId;
38use libp2p_swarm::{
39 handler::{ConnectionEvent, DialUpgradeError, FullyNegotiatedInbound, FullyNegotiatedOutbound},
40 ConnectionHandler, ConnectionHandlerEvent, ConnectionId, Stream, StreamProtocol,
41 StreamUpgradeError, SubstreamProtocol,
42};
43use web_time::Instant;
44
45use crate::{
46 behaviour::CircuitId,
47 copy_future::CopyFuture,
48 proto,
49 protocol::{inbound_hop, outbound_stop},
50 HOP_PROTOCOL_NAME, STOP_PROTOCOL_NAME,
51};
52
53const MAX_CONCURRENT_STREAMS_PER_CONNECTION: usize = 10;
54const STREAM_TIMEOUT: Duration = Duration::from_secs(60);
55
56#[derive(Debug, Clone)]
57pub struct Config {
58 pub reservation_duration: Duration,
59 pub max_circuit_duration: Duration,
60 pub max_circuit_bytes: u64,
61}
62
63pub enum In {
64 AcceptReservationReq {
65 inbound_reservation_req: inbound_hop::ReservationReq,
66 addrs: Vec<Multiaddr>,
67 },
68 DenyReservationReq {
69 inbound_reservation_req: inbound_hop::ReservationReq,
70 status: proto::Status,
71 },
72 DenyCircuitReq {
73 circuit_id: Option<CircuitId>,
74 inbound_circuit_req: inbound_hop::CircuitReq,
75 status: proto::Status,
76 },
77 NegotiateOutboundConnect {
78 circuit_id: CircuitId,
79 inbound_circuit_req: inbound_hop::CircuitReq,
80 src_peer_id: PeerId,
81 src_connection_id: ConnectionId,
82 },
83 AcceptAndDriveCircuit {
84 circuit_id: CircuitId,
85 dst_peer_id: PeerId,
86 inbound_circuit_req: inbound_hop::CircuitReq,
87 dst_stream: Stream,
88 dst_pending_data: Bytes,
89 },
90}
91
92impl fmt::Debug for In {
93 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
94 match self {
95 In::AcceptReservationReq {
96 inbound_reservation_req: _,
97 addrs,
98 } => f
99 .debug_struct("In::AcceptReservationReq")
100 .field("addrs", addrs)
101 .finish(),
102 In::DenyReservationReq {
103 inbound_reservation_req: _,
104 status,
105 } => f
106 .debug_struct("In::DenyReservationReq")
107 .field("status", status)
108 .finish(),
109 In::DenyCircuitReq {
110 circuit_id,
111 inbound_circuit_req: _,
112 status,
113 } => f
114 .debug_struct("In::DenyCircuitReq")
115 .field("circuit_id", circuit_id)
116 .field("status", status)
117 .finish(),
118 In::NegotiateOutboundConnect {
119 circuit_id,
120 inbound_circuit_req: _,
121 src_peer_id,
122 src_connection_id,
123 } => f
124 .debug_struct("In::NegotiateOutboundConnect")
125 .field("circuit_id", circuit_id)
126 .field("src_peer_id", src_peer_id)
127 .field("src_connection_id", src_connection_id)
128 .finish(),
129 In::AcceptAndDriveCircuit {
130 circuit_id,
131 inbound_circuit_req: _,
132 dst_peer_id,
133 dst_stream: _,
134 dst_pending_data: _,
135 } => f
136 .debug_struct("In::AcceptAndDriveCircuit")
137 .field("circuit_id", circuit_id)
138 .field("dst_peer_id", dst_peer_id)
139 .finish(),
140 }
141 }
142}
143
144#[allow(clippy::large_enum_variant)]
146pub enum Event {
147 ReservationReqReceived {
149 inbound_reservation_req: inbound_hop::ReservationReq,
150 endpoint: ConnectedPoint,
151 renewed: bool,
153 },
154 ReservationReqAccepted {
156 renewed: bool,
158 },
159 ReservationReqAcceptFailed { error: inbound_hop::Error },
161 ReservationReqDenied {},
163 ReservationReqDenyFailed { error: inbound_hop::Error },
165 ReservationTimedOut {},
167 CircuitReqReceived {
169 inbound_circuit_req: inbound_hop::CircuitReq,
170 endpoint: ConnectedPoint,
171 },
172 CircuitReqDenied {
174 circuit_id: Option<CircuitId>,
175 dst_peer_id: PeerId,
176 },
177 CircuitReqDenyFailed {
179 circuit_id: Option<CircuitId>,
180 dst_peer_id: PeerId,
181 error: inbound_hop::Error,
182 },
183 CircuitReqAccepted {
185 circuit_id: CircuitId,
186 dst_peer_id: PeerId,
187 },
188 CircuitReqAcceptFailed {
190 circuit_id: CircuitId,
191 dst_peer_id: PeerId,
192 error: inbound_hop::Error,
193 },
194 OutboundConnectNegotiated {
197 circuit_id: CircuitId,
198 src_peer_id: PeerId,
199 src_connection_id: ConnectionId,
200 inbound_circuit_req: inbound_hop::CircuitReq,
201 dst_stream: Stream,
202 dst_pending_data: Bytes,
203 },
204 OutboundConnectNegotiationFailed {
206 circuit_id: CircuitId,
207 src_peer_id: PeerId,
208 src_connection_id: ConnectionId,
209 inbound_circuit_req: inbound_hop::CircuitReq,
210 status: proto::Status,
211 error: outbound_stop::Error,
212 },
213 CircuitClosed {
215 circuit_id: CircuitId,
216 dst_peer_id: PeerId,
217 error: Option<std::io::Error>,
218 },
219}
220
221impl fmt::Debug for Event {
222 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
223 match self {
224 Event::ReservationReqReceived {
225 inbound_reservation_req: _,
226 endpoint,
227 renewed,
228 } => f
229 .debug_struct("Event::ReservationReqReceived")
230 .field("endpoint", endpoint)
231 .field("renewed", renewed)
232 .finish(),
233 Event::ReservationReqAccepted { renewed } => f
234 .debug_struct("Event::ReservationReqAccepted")
235 .field("renewed", renewed)
236 .finish(),
237 Event::ReservationReqAcceptFailed { error } => f
238 .debug_struct("Event::ReservationReqAcceptFailed")
239 .field("error", error)
240 .finish(),
241 Event::ReservationReqDenied {} => {
242 f.debug_struct("Event::ReservationReqDenied").finish()
243 }
244 Event::ReservationReqDenyFailed { error } => f
245 .debug_struct("Event::ReservationReqDenyFailed")
246 .field("error", error)
247 .finish(),
248 Event::ReservationTimedOut {} => f.debug_struct("Event::ReservationTimedOut").finish(),
249 Event::CircuitReqReceived {
250 endpoint,
251 inbound_circuit_req: _,
252 } => f
253 .debug_struct("Event::CircuitReqReceived")
254 .field("endpoint", endpoint)
255 .finish(),
256 Event::CircuitReqDenied {
257 circuit_id,
258 dst_peer_id,
259 } => f
260 .debug_struct("Event::CircuitReqDenied")
261 .field("circuit_id", circuit_id)
262 .field("dst_peer_id", dst_peer_id)
263 .finish(),
264 Event::CircuitReqDenyFailed {
265 circuit_id,
266 dst_peer_id,
267 error,
268 } => f
269 .debug_struct("Event::CircuitReqDenyFailed")
270 .field("circuit_id", circuit_id)
271 .field("dst_peer_id", dst_peer_id)
272 .field("error", error)
273 .finish(),
274 Event::CircuitReqAccepted {
275 circuit_id,
276 dst_peer_id,
277 } => f
278 .debug_struct("Event::CircuitReqAccepted")
279 .field("circuit_id", circuit_id)
280 .field("dst_peer_id", dst_peer_id)
281 .finish(),
282 Event::CircuitReqAcceptFailed {
283 circuit_id,
284 dst_peer_id,
285 error,
286 } => f
287 .debug_struct("Event::CircuitReqAcceptFailed")
288 .field("circuit_id", circuit_id)
289 .field("dst_peer_id", dst_peer_id)
290 .field("error", error)
291 .finish(),
292 Event::OutboundConnectNegotiated {
293 circuit_id,
294 src_peer_id,
295 src_connection_id,
296 inbound_circuit_req: _,
297 dst_stream: _,
298 dst_pending_data: _,
299 } => f
300 .debug_struct("Event::OutboundConnectNegotiated")
301 .field("circuit_id", circuit_id)
302 .field("src_peer_id", src_peer_id)
303 .field("src_connection_id", src_connection_id)
304 .finish(),
305 Event::OutboundConnectNegotiationFailed {
306 circuit_id,
307 src_peer_id,
308 src_connection_id,
309 inbound_circuit_req: _,
310 status,
311 error,
312 } => f
313 .debug_struct("Event::OutboundConnectNegotiationFailed")
314 .field("circuit_id", circuit_id)
315 .field("src_peer_id", src_peer_id)
316 .field("src_connection_id", src_connection_id)
317 .field("status", status)
318 .field("error", error)
319 .finish(),
320 Event::CircuitClosed {
321 circuit_id,
322 dst_peer_id,
323 error,
324 } => f
325 .debug_struct("Event::CircuitClosed")
326 .field("circuit_id", circuit_id)
327 .field("dst_peer_id", dst_peer_id)
328 .field("error", error)
329 .finish(),
330 }
331 }
332}
333
334pub struct Handler {
337 endpoint: ConnectedPoint,
338
339 config: Config,
341
342 queued_events: VecDeque<
344 ConnectionHandlerEvent<
345 <Self as ConnectionHandler>::OutboundProtocol,
346 (),
347 <Self as ConnectionHandler>::ToBehaviour,
348 >,
349 >,
350
351 idle_at: Option<Instant>,
353
354 reservation_request_future: Option<ReservationRequestFuture>,
356 active_reservation: Option<Delay>,
358
359 circuit_accept_futures: Futures<Result<CircuitParts, (CircuitId, PeerId, inbound_hop::Error)>>,
361 circuit_deny_futures: Futures<(Option<CircuitId>, PeerId, Result<(), inbound_hop::Error>)>,
363 circuits: Futures<(CircuitId, PeerId, Result<(), std::io::Error>)>,
365
366 pending_connect_requests: VecDeque<PendingConnect>,
368
369 active_connect_requests: HashMap<CircuitId, PendingConnect>,
371
372 inbound_workers: futures_bounded::FuturesSet<
373 Result<Either<inbound_hop::ReservationReq, inbound_hop::CircuitReq>, inbound_hop::Error>,
374 >,
375 outbound_workers: futures_bounded::FuturesMap<
376 CircuitId,
377 Result<outbound_stop::Circuit, outbound_stop::Error>,
378 >,
379}
380
381impl Handler {
382 pub fn new(config: Config, endpoint: ConnectedPoint) -> Handler {
383 Handler {
384 inbound_workers: futures_bounded::FuturesSet::new(
385 STREAM_TIMEOUT,
386 MAX_CONCURRENT_STREAMS_PER_CONNECTION,
387 ),
388 outbound_workers: futures_bounded::FuturesMap::new(
389 STREAM_TIMEOUT,
390 MAX_CONCURRENT_STREAMS_PER_CONNECTION,
391 ),
392 endpoint,
393 config,
394 queued_events: Default::default(),
395 idle_at: None,
396 reservation_request_future: Default::default(),
397 circuit_accept_futures: Default::default(),
398 circuit_deny_futures: Default::default(),
399 circuits: Default::default(),
400 active_reservation: Default::default(),
401 pending_connect_requests: Default::default(),
402 active_connect_requests: Default::default(),
403 }
404 }
405
406 fn on_fully_negotiated_inbound(&mut self, stream: Stream) {
407 if self
408 .inbound_workers
409 .try_push(inbound_hop::handle_inbound_request(
410 stream,
411 self.config.reservation_duration,
412 self.config.max_circuit_duration,
413 self.config.max_circuit_bytes,
414 ))
415 .is_err()
416 {
417 tracing::warn!("Dropping inbound stream because we are at capacity")
418 }
419 }
420
421 fn on_fully_negotiated_outbound(&mut self, stream: Stream) {
422 let connect = self
423 .pending_connect_requests
424 .pop_front()
425 .expect("opened a stream without a pending stop command");
426
427 if self
428 .outbound_workers
429 .try_push(
430 connect.circuit_id,
431 outbound_stop::connect(
432 stream,
433 connect.src_peer_id,
434 connect.max_circuit_duration,
435 connect.max_circuit_bytes,
436 ),
437 )
438 .is_err()
439 {
440 tracing::warn!("Dropping outbound stream because we are at capacity")
441 }
442
443 self.active_connect_requests
444 .insert(connect.circuit_id, connect);
445 }
446
447 fn on_dial_upgrade_error(
448 &mut self,
449 DialUpgradeError { error, .. }: DialUpgradeError<
450 (),
451 <Self as ConnectionHandler>::OutboundProtocol,
452 >,
453 ) {
454 let error = match error {
455 StreamUpgradeError::Timeout => outbound_stop::Error::Io(io::ErrorKind::TimedOut.into()),
456 StreamUpgradeError::NegotiationFailed => outbound_stop::Error::Unsupported,
457 StreamUpgradeError::Io(e) => outbound_stop::Error::Io(e),
458 StreamUpgradeError::Apply(v) => libp2p_core::util::unreachable(v),
459 };
460
461 let stop_command = self
462 .pending_connect_requests
463 .pop_front()
464 .expect("failed to open a stream without a pending stop command");
465
466 self.queued_events
467 .push_back(ConnectionHandlerEvent::NotifyBehaviour(
468 Event::OutboundConnectNegotiationFailed {
469 circuit_id: stop_command.circuit_id,
470 src_peer_id: stop_command.src_peer_id,
471 src_connection_id: stop_command.src_connection_id,
472 inbound_circuit_req: stop_command.inbound_circuit_req,
473 status: proto::Status::CONNECTION_FAILED,
474 error,
475 },
476 ));
477 }
478}
479
480enum ReservationRequestFuture {
481 Accepting(BoxFuture<'static, Result<(), inbound_hop::Error>>),
482 Denying(BoxFuture<'static, Result<(), inbound_hop::Error>>),
483}
484
485type Futures<T> = FuturesUnordered<BoxFuture<'static, T>>;
486
487impl ConnectionHandler for Handler {
488 type FromBehaviour = In;
489 type ToBehaviour = Event;
490 type InboundProtocol = ReadyUpgrade<StreamProtocol>;
491 type InboundOpenInfo = ();
492 type OutboundProtocol = ReadyUpgrade<StreamProtocol>;
493 type OutboundOpenInfo = ();
494
495 fn listen_protocol(&self) -> SubstreamProtocol<Self::InboundProtocol> {
496 SubstreamProtocol::new(ReadyUpgrade::new(HOP_PROTOCOL_NAME), ())
497 }
498
499 fn on_behaviour_event(&mut self, event: Self::FromBehaviour) {
500 match event {
501 In::AcceptReservationReq {
502 inbound_reservation_req,
503 addrs,
504 } => {
505 if self
506 .reservation_request_future
507 .replace(ReservationRequestFuture::Accepting(
508 inbound_reservation_req.accept(addrs).err_into().boxed(),
509 ))
510 .is_some()
511 {
512 tracing::warn!("Dropping existing deny/accept future in favor of new one")
513 }
514 }
515 In::DenyReservationReq {
516 inbound_reservation_req,
517 status,
518 } => {
519 if self
520 .reservation_request_future
521 .replace(ReservationRequestFuture::Denying(
522 inbound_reservation_req.deny(status).err_into().boxed(),
523 ))
524 .is_some()
525 {
526 tracing::warn!("Dropping existing deny/accept future in favor of new one")
527 }
528 }
529 In::NegotiateOutboundConnect {
530 circuit_id,
531 inbound_circuit_req,
532 src_peer_id,
533 src_connection_id,
534 } => {
535 self.pending_connect_requests.push_back(PendingConnect::new(
536 circuit_id,
537 inbound_circuit_req,
538 src_peer_id,
539 src_connection_id,
540 &self.config,
541 ));
542 self.queued_events
543 .push_back(ConnectionHandlerEvent::OutboundSubstreamRequest {
544 protocol: SubstreamProtocol::new(ReadyUpgrade::new(STOP_PROTOCOL_NAME), ()),
545 });
546 }
547 In::DenyCircuitReq {
548 circuit_id,
549 inbound_circuit_req,
550 status,
551 } => {
552 let dst_peer_id = inbound_circuit_req.dst();
553 self.circuit_deny_futures.push(
554 inbound_circuit_req
555 .deny(status)
556 .err_into()
557 .map(move |result| (circuit_id, dst_peer_id, result))
558 .boxed(),
559 );
560 }
561 In::AcceptAndDriveCircuit {
562 circuit_id,
563 dst_peer_id,
564 inbound_circuit_req,
565 dst_stream,
566 dst_pending_data,
567 } => {
568 self.circuit_accept_futures.push(
569 inbound_circuit_req
570 .accept()
571 .err_into()
572 .map_ok(move |(src_stream, src_pending_data)| CircuitParts {
573 circuit_id,
574 src_stream,
575 src_pending_data,
576 dst_peer_id,
577 dst_stream,
578 dst_pending_data,
579 })
580 .map_err(move |e| (circuit_id, dst_peer_id, e))
581 .boxed(),
582 );
583 }
584 }
585 }
586
587 fn connection_keep_alive(&self) -> bool {
588 let Some(idle_at) = self.idle_at else {
589 return true;
590 };
591
592 Instant::now().duration_since(idle_at) <= Duration::from_secs(10)
593 }
594
595 #[tracing::instrument(level = "trace", name = "ConnectionHandler::poll", skip(self, cx))]
596 fn poll(
597 &mut self,
598 cx: &mut Context<'_>,
599 ) -> Poll<ConnectionHandlerEvent<Self::OutboundProtocol, (), Self::ToBehaviour>> {
600 if let Some(event) = self.queued_events.pop_front() {
602 return Poll::Ready(event);
603 }
604
605 if let Poll::Ready(Some((circuit_id, dst_peer_id, result))) =
607 self.circuits.poll_next_unpin(cx)
608 {
609 match result {
610 Ok(()) => {
611 return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(
612 Event::CircuitClosed {
613 circuit_id,
614 dst_peer_id,
615 error: None,
616 },
617 ))
618 }
619 Err(e) => {
620 return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(
621 Event::CircuitClosed {
622 circuit_id,
623 dst_peer_id,
624 error: Some(e),
625 },
626 ))
627 }
628 }
629 }
630
631 loop {
633 match self.inbound_workers.poll_unpin(cx) {
634 Poll::Ready(Ok(Ok(Either::Left(inbound_reservation_req)))) => {
635 return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(
636 Event::ReservationReqReceived {
637 inbound_reservation_req,
638 endpoint: self.endpoint.clone(),
639 renewed: self.active_reservation.is_some(),
640 },
641 ));
642 }
643 Poll::Ready(Ok(Ok(Either::Right(inbound_circuit_req)))) => {
644 return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(
645 Event::CircuitReqReceived {
646 inbound_circuit_req,
647 endpoint: self.endpoint.clone(),
648 },
649 ));
650 }
651 Poll::Ready(Err(e)) => {
652 tracing::debug!("Inbound stream operation timed out: {e}");
653 continue;
654 }
655 Poll::Ready(Ok(Err(e))) => {
656 tracing::debug!("Inbound stream operation failed: {e}");
657 continue;
658 }
659 Poll::Pending => {
660 break;
661 }
662 }
663 }
664
665 match self.outbound_workers.poll_unpin(cx) {
667 Poll::Ready((id, Ok(Ok(circuit)))) => {
668 let connect = self
669 .active_connect_requests
670 .remove(&id)
671 .expect("must have pending connect");
672
673 return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(
674 Event::OutboundConnectNegotiated {
675 circuit_id: id,
676 src_peer_id: connect.src_peer_id,
677 src_connection_id: connect.src_connection_id,
678 inbound_circuit_req: connect.inbound_circuit_req,
679 dst_stream: circuit.dst_stream,
680 dst_pending_data: circuit.dst_pending_data,
681 },
682 ));
683 }
684 Poll::Ready((id, Ok(Err(error)))) => {
685 let connect = self
686 .active_connect_requests
687 .remove(&id)
688 .expect("must have pending connect");
689
690 return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(
691 Event::OutboundConnectNegotiationFailed {
692 circuit_id: connect.circuit_id,
693 src_peer_id: connect.src_peer_id,
694 src_connection_id: connect.src_connection_id,
695 inbound_circuit_req: connect.inbound_circuit_req,
696 status: error.to_status(),
697 error,
698 },
699 ));
700 }
701 Poll::Ready((id, Err(futures_bounded::Timeout { .. }))) => {
702 let connect = self
703 .active_connect_requests
704 .remove(&id)
705 .expect("must have pending connect");
706
707 return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(
708 Event::OutboundConnectNegotiationFailed {
709 circuit_id: connect.circuit_id,
710 src_peer_id: connect.src_peer_id,
711 src_connection_id: connect.src_connection_id,
712 inbound_circuit_req: connect.inbound_circuit_req,
713 status: proto::Status::CONNECTION_FAILED, error: outbound_stop::Error::Io(io::ErrorKind::TimedOut.into()),
715 },
716 ));
717 }
718 Poll::Pending => {}
719 }
720
721 if let Poll::Ready(Some((circuit_id, dst_peer_id, result))) =
723 self.circuit_deny_futures.poll_next_unpin(cx)
724 {
725 match result {
726 Ok(()) => {
727 return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(
728 Event::CircuitReqDenied {
729 circuit_id,
730 dst_peer_id,
731 },
732 ));
733 }
734 Err(error) => {
735 return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(
736 Event::CircuitReqDenyFailed {
737 circuit_id,
738 dst_peer_id,
739 error,
740 },
741 ));
742 }
743 }
744 }
745
746 if let Poll::Ready(Some(result)) = self.circuit_accept_futures.poll_next_unpin(cx) {
748 match result {
749 Ok(parts) => {
750 let CircuitParts {
751 circuit_id,
752 mut src_stream,
753 src_pending_data,
754 dst_peer_id,
755 mut dst_stream,
756 dst_pending_data,
757 } = parts;
758 let max_circuit_duration = self.config.max_circuit_duration;
759 let max_circuit_bytes = self.config.max_circuit_bytes;
760
761 let circuit = async move {
762 let (result_1, result_2) = futures::future::join(
763 src_stream.write_all(&dst_pending_data),
764 dst_stream.write_all(&src_pending_data),
765 )
766 .await;
767 result_1?;
768 result_2?;
769
770 CopyFuture::new(
771 src_stream,
772 dst_stream,
773 max_circuit_duration,
774 max_circuit_bytes,
775 )
776 .await?;
777
778 Ok(())
779 }
780 .map(move |r| (circuit_id, dst_peer_id, r))
781 .boxed();
782
783 self.circuits.push(circuit);
784
785 return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(
786 Event::CircuitReqAccepted {
787 circuit_id,
788 dst_peer_id,
789 },
790 ));
791 }
792 Err((circuit_id, dst_peer_id, error)) => {
793 return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(
794 Event::CircuitReqAcceptFailed {
795 circuit_id,
796 dst_peer_id,
797 error,
798 },
799 ));
800 }
801 }
802 }
803
804 if let Some(Poll::Ready(())) = self
806 .active_reservation
807 .as_mut()
808 .map(|fut| fut.poll_unpin(cx))
809 {
810 self.active_reservation = None;
811 return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(
812 Event::ReservationTimedOut {},
813 ));
814 }
815
816 match self.reservation_request_future.as_mut() {
818 Some(ReservationRequestFuture::Accepting(fut)) => {
819 if let Poll::Ready(result) = fut.poll_unpin(cx) {
820 self.reservation_request_future = None;
821
822 match result {
823 Ok(()) => {
824 let renewed = self
825 .active_reservation
826 .replace(Delay::new(self.config.reservation_duration))
827 .is_some();
828 return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(
829 Event::ReservationReqAccepted { renewed },
830 ));
831 }
832 Err(error) => {
833 return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(
834 Event::ReservationReqAcceptFailed { error },
835 ));
836 }
837 }
838 }
839 }
840 Some(ReservationRequestFuture::Denying(fut)) => {
841 if let Poll::Ready(result) = fut.poll_unpin(cx) {
842 self.reservation_request_future = None;
843
844 match result {
845 Ok(()) => {
846 return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(
847 Event::ReservationReqDenied {},
848 ))
849 }
850 Err(error) => {
851 return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(
852 Event::ReservationReqDenyFailed { error },
853 ));
854 }
855 }
856 }
857 }
858 None => {}
859 }
860
861 if self.active_reservation.is_none() {
863 if self.idle_at.is_none() {
864 self.idle_at = Some(Instant::now());
865 }
866 } else {
867 self.idle_at = None;
868 }
869
870 Poll::Pending
871 }
872
873 fn on_connection_event(
874 &mut self,
875 event: ConnectionEvent<Self::InboundProtocol, Self::OutboundProtocol>,
876 ) {
877 match event {
878 ConnectionEvent::FullyNegotiatedInbound(FullyNegotiatedInbound {
879 protocol: stream,
880 ..
881 }) => {
882 self.on_fully_negotiated_inbound(stream);
883 }
884 ConnectionEvent::FullyNegotiatedOutbound(FullyNegotiatedOutbound {
885 protocol: stream,
886 ..
887 }) => {
888 self.on_fully_negotiated_outbound(stream);
889 }
890 ConnectionEvent::DialUpgradeError(dial_upgrade_error) => {
891 self.on_dial_upgrade_error(dial_upgrade_error);
892 }
893 _ => {}
894 }
895 }
896}
897
898struct CircuitParts {
899 circuit_id: CircuitId,
900 src_stream: Stream,
901 src_pending_data: Bytes,
902 dst_peer_id: PeerId,
903 dst_stream: Stream,
904 dst_pending_data: Bytes,
905}
906
907struct PendingConnect {
909 circuit_id: CircuitId,
910 inbound_circuit_req: inbound_hop::CircuitReq,
911 src_peer_id: PeerId,
912 src_connection_id: ConnectionId,
913 max_circuit_duration: Duration,
914 max_circuit_bytes: u64,
915}
916
917impl PendingConnect {
918 fn new(
919 circuit_id: CircuitId,
920 inbound_circuit_req: inbound_hop::CircuitReq,
921 src_peer_id: PeerId,
922 src_connection_id: ConnectionId,
923 config: &Config,
924 ) -> Self {
925 Self {
926 circuit_id,
927 inbound_circuit_req,
928 src_peer_id,
929 src_connection_id,
930 max_circuit_duration: config.max_circuit_duration,
931 max_circuit_bytes: config.max_circuit_bytes,
932 }
933 }
934}