libp2p_relay/behaviour/
handler.rs

1// Copyright 2021 Protocol Labs.
2//
3// Permission is hereby granted, free of charge, to any person obtaining a
4// copy of this software and associated documentation files (the "Software"),
5// to deal in the Software without restriction, including without limitation
6// the rights to use, copy, modify, merge, publish, distribute, sublicense,
7// and/or sell copies of the Software, and to permit persons to whom the
8// Software is furnished to do so, subject to the following conditions:
9//
10// The above copyright notice and this permission notice shall be included in
11// all copies or substantial portions of the Software.
12//
13// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
14// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
18// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
19// DEALINGS IN THE SOFTWARE.
20
21use std::{
22    collections::{HashMap, VecDeque},
23    fmt, io,
24    task::{Context, Poll},
25    time::Duration,
26};
27
28use bytes::Bytes;
29use either::Either;
30use futures::{
31    future::{BoxFuture, FutureExt, TryFutureExt},
32    io::AsyncWriteExt,
33    stream::{FuturesUnordered, StreamExt},
34};
35use futures_timer::Delay;
36use libp2p_core::{upgrade::ReadyUpgrade, ConnectedPoint, Multiaddr};
37use libp2p_identity::PeerId;
38use libp2p_swarm::{
39    handler::{ConnectionEvent, DialUpgradeError, FullyNegotiatedInbound, FullyNegotiatedOutbound},
40    ConnectionHandler, ConnectionHandlerEvent, ConnectionId, Stream, StreamProtocol,
41    StreamUpgradeError, SubstreamProtocol,
42};
43use web_time::Instant;
44
45use crate::{
46    behaviour::CircuitId,
47    copy_future::CopyFuture,
48    proto,
49    protocol::{inbound_hop, outbound_stop},
50    HOP_PROTOCOL_NAME, STOP_PROTOCOL_NAME,
51};
52
/// Capacity handed to both bounded worker collections of a [`Handler`]
/// (`inbound_workers` and `outbound_workers`).
///
/// Pushing a worker beyond this capacity fails and the corresponding stream is
/// dropped.
const MAX_CONCURRENT_STREAMS_PER_CONNECTION: usize = 10;
/// Timeout handed to both bounded worker collections of a [`Handler`].
///
/// A worker that has not finished within this time is reported as timed out
/// when the handler is polled.
const STREAM_TIMEOUT: Duration = Duration::from_secs(60);
55
/// Static configuration of a [`Handler`].
#[derive(Debug, Clone)]
pub struct Config {
    /// Length of the timer that is (re)started whenever a reservation request
    /// has been accepted successfully. Also passed to the inbound HOP request
    /// handling.
    pub reservation_duration: Duration,
    /// Duration limit passed to the inbound HOP request handling and to the
    /// future that relays data for an accepted circuit.
    // NOTE(review): presumably the maximum lifetime of a relayed circuit;
    // enforcement happens outside this file - confirm in `CopyFuture`.
    pub max_circuit_duration: Duration,
    /// Byte limit passed to the inbound HOP request handling and to the
    /// future that relays data for an accepted circuit.
    // NOTE(review): presumably the maximum number of bytes relayed per
    // circuit; enforcement happens outside this file - confirm in `CopyFuture`.
    pub max_circuit_bytes: u64,
}
62
/// Commands sent from the behaviour to the [`Handler`].
pub enum In {
    /// Accept the given inbound reservation request, answering it with
    /// `addrs`.
    ///
    /// Replaces any reservation accept/deny operation that is still in
    /// progress on this handler.
    AcceptReservationReq {
        inbound_reservation_req: inbound_hop::ReservationReq,
        addrs: Vec<Multiaddr>,
    },
    /// Deny the given inbound reservation request with `status`.
    ///
    /// Replaces any reservation accept/deny operation that is still in
    /// progress on this handler.
    DenyReservationReq {
        inbound_reservation_req: inbound_hop::ReservationReq,
        status: proto::Status,
    },
    /// Deny the given inbound circuit request with `status`.
    ///
    /// `circuit_id` is echoed back unchanged in the resulting
    /// [`Event::CircuitReqDenied`] or [`Event::CircuitReqDenyFailed`].
    DenyCircuitReq {
        circuit_id: Option<CircuitId>,
        inbound_circuit_req: inbound_hop::CircuitReq,
        status: proto::Status,
    },
    /// Request a new outbound substream (STOP protocol) on this connection for
    /// the given circuit.
    ///
    /// The source-side values are stored and handed back in
    /// [`Event::OutboundConnectNegotiated`] or
    /// [`Event::OutboundConnectNegotiationFailed`].
    NegotiateOutboundConnect {
        circuit_id: CircuitId,
        inbound_circuit_req: inbound_hop::CircuitReq,
        src_peer_id: PeerId,
        src_connection_id: ConnectionId,
    },
    /// Accept the inbound circuit request and, once accepted, relay data
    /// between the resulting source stream and `dst_stream`.
    ///
    /// `dst_pending_data` is written to the source stream before relaying
    /// starts.
    AcceptAndDriveCircuit {
        circuit_id: CircuitId,
        dst_peer_id: PeerId,
        inbound_circuit_req: inbound_hop::CircuitReq,
        dst_stream: Stream,
        dst_pending_data: Bytes,
    },
}
91
92impl fmt::Debug for In {
93    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
94        match self {
95            In::AcceptReservationReq {
96                inbound_reservation_req: _,
97                addrs,
98            } => f
99                .debug_struct("In::AcceptReservationReq")
100                .field("addrs", addrs)
101                .finish(),
102            In::DenyReservationReq {
103                inbound_reservation_req: _,
104                status,
105            } => f
106                .debug_struct("In::DenyReservationReq")
107                .field("status", status)
108                .finish(),
109            In::DenyCircuitReq {
110                circuit_id,
111                inbound_circuit_req: _,
112                status,
113            } => f
114                .debug_struct("In::DenyCircuitReq")
115                .field("circuit_id", circuit_id)
116                .field("status", status)
117                .finish(),
118            In::NegotiateOutboundConnect {
119                circuit_id,
120                inbound_circuit_req: _,
121                src_peer_id,
122                src_connection_id,
123            } => f
124                .debug_struct("In::NegotiateOutboundConnect")
125                .field("circuit_id", circuit_id)
126                .field("src_peer_id", src_peer_id)
127                .field("src_connection_id", src_connection_id)
128                .finish(),
129            In::AcceptAndDriveCircuit {
130                circuit_id,
131                inbound_circuit_req: _,
132                dst_peer_id,
133                dst_stream: _,
134                dst_pending_data: _,
135            } => f
136                .debug_struct("In::AcceptAndDriveCircuit")
137                .field("circuit_id", circuit_id)
138                .field("dst_peer_id", dst_peer_id)
139                .finish(),
140        }
141    }
142}
143
/// The events produced by the [`Handler`].
// NOTE(review): the lint is presumably silenced because variants carrying
// requests and streams are much larger than the empty ones - confirm.
#[allow(clippy::large_enum_variant)]
pub enum Event {
    /// An inbound reservation request has been received.
    ReservationReqReceived {
        inbound_reservation_req: inbound_hop::ReservationReq,
        /// Copy of the endpoint this handler was created with.
        endpoint: ConnectedPoint,
        /// Indicates whether the request replaces an existing reservation.
        ///
        /// Set when a reservation timer is active on this handler at the time
        /// the request is received.
        renewed: bool,
    },
    /// An inbound reservation request has been accepted.
    ReservationReqAccepted {
        /// Indicates whether the request replaces an existing reservation.
        ///
        /// Set when accepting replaced a reservation timer that was still
        /// active.
        renewed: bool,
    },
    /// Accepting an inbound reservation request failed.
    ReservationReqAcceptFailed { error: inbound_hop::Error },
    /// An inbound reservation request has been denied.
    ReservationReqDenied {},
    /// Denying an inbound reservation request has failed.
    ReservationReqDenyFailed { error: inbound_hop::Error },
    /// An inbound reservation has timed out.
    ReservationTimedOut {},
    /// An inbound circuit request has been received.
    CircuitReqReceived {
        inbound_circuit_req: inbound_hop::CircuitReq,
        /// Copy of the endpoint this handler was created with.
        endpoint: ConnectedPoint,
    },
    /// An inbound circuit request has been denied.
    CircuitReqDenied {
        /// The id given in the corresponding [`In::DenyCircuitReq`].
        circuit_id: Option<CircuitId>,
        dst_peer_id: PeerId,
    },
    /// Denying an inbound circuit request failed.
    CircuitReqDenyFailed {
        /// The id given in the corresponding [`In::DenyCircuitReq`].
        circuit_id: Option<CircuitId>,
        dst_peer_id: PeerId,
        error: inbound_hop::Error,
    },
    /// An inbound circuit request has been accepted.
    ///
    /// Emitted after the future relaying data for the circuit has been set up.
    CircuitReqAccepted {
        circuit_id: CircuitId,
        dst_peer_id: PeerId,
    },
    /// Accepting an inbound circuit request failed.
    CircuitReqAcceptFailed {
        circuit_id: CircuitId,
        dst_peer_id: PeerId,
        error: inbound_hop::Error,
    },
    /// An outbound substream for an inbound circuit request has been
    /// negotiated.
    ///
    /// Hands back the values given in [`In::NegotiateOutboundConnect`]
    /// together with the negotiated destination stream.
    OutboundConnectNegotiated {
        circuit_id: CircuitId,
        src_peer_id: PeerId,
        src_connection_id: ConnectionId,
        inbound_circuit_req: inbound_hop::CircuitReq,
        dst_stream: Stream,
        dst_pending_data: Bytes,
    },
    /// Negotiating an outbound substream for an inbound circuit request failed.
    ///
    /// Hands back the values given in [`In::NegotiateOutboundConnect`].
    OutboundConnectNegotiationFailed {
        circuit_id: CircuitId,
        src_peer_id: PeerId,
        src_connection_id: ConnectionId,
        inbound_circuit_req: inbound_hop::CircuitReq,
        // NOTE(review): presumably the status to report back to the source
        // peer when the circuit request gets denied - confirm in the behaviour.
        status: proto::Status,
        error: outbound_stop::Error,
    },
    /// An inbound circuit has closed.
    CircuitClosed {
        circuit_id: CircuitId,
        dst_peer_id: PeerId,
        /// `None` when the relaying future finished successfully, otherwise
        /// the I/O error it ended with.
        error: Option<std::io::Error>,
    },
}
220
221impl fmt::Debug for Event {
222    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
223        match self {
224            Event::ReservationReqReceived {
225                inbound_reservation_req: _,
226                endpoint,
227                renewed,
228            } => f
229                .debug_struct("Event::ReservationReqReceived")
230                .field("endpoint", endpoint)
231                .field("renewed", renewed)
232                .finish(),
233            Event::ReservationReqAccepted { renewed } => f
234                .debug_struct("Event::ReservationReqAccepted")
235                .field("renewed", renewed)
236                .finish(),
237            Event::ReservationReqAcceptFailed { error } => f
238                .debug_struct("Event::ReservationReqAcceptFailed")
239                .field("error", error)
240                .finish(),
241            Event::ReservationReqDenied {} => {
242                f.debug_struct("Event::ReservationReqDenied").finish()
243            }
244            Event::ReservationReqDenyFailed { error } => f
245                .debug_struct("Event::ReservationReqDenyFailed")
246                .field("error", error)
247                .finish(),
248            Event::ReservationTimedOut {} => f.debug_struct("Event::ReservationTimedOut").finish(),
249            Event::CircuitReqReceived {
250                endpoint,
251                inbound_circuit_req: _,
252            } => f
253                .debug_struct("Event::CircuitReqReceived")
254                .field("endpoint", endpoint)
255                .finish(),
256            Event::CircuitReqDenied {
257                circuit_id,
258                dst_peer_id,
259            } => f
260                .debug_struct("Event::CircuitReqDenied")
261                .field("circuit_id", circuit_id)
262                .field("dst_peer_id", dst_peer_id)
263                .finish(),
264            Event::CircuitReqDenyFailed {
265                circuit_id,
266                dst_peer_id,
267                error,
268            } => f
269                .debug_struct("Event::CircuitReqDenyFailed")
270                .field("circuit_id", circuit_id)
271                .field("dst_peer_id", dst_peer_id)
272                .field("error", error)
273                .finish(),
274            Event::CircuitReqAccepted {
275                circuit_id,
276                dst_peer_id,
277            } => f
278                .debug_struct("Event::CircuitReqAccepted")
279                .field("circuit_id", circuit_id)
280                .field("dst_peer_id", dst_peer_id)
281                .finish(),
282            Event::CircuitReqAcceptFailed {
283                circuit_id,
284                dst_peer_id,
285                error,
286            } => f
287                .debug_struct("Event::CircuitReqAcceptFailed")
288                .field("circuit_id", circuit_id)
289                .field("dst_peer_id", dst_peer_id)
290                .field("error", error)
291                .finish(),
292            Event::OutboundConnectNegotiated {
293                circuit_id,
294                src_peer_id,
295                src_connection_id,
296                inbound_circuit_req: _,
297                dst_stream: _,
298                dst_pending_data: _,
299            } => f
300                .debug_struct("Event::OutboundConnectNegotiated")
301                .field("circuit_id", circuit_id)
302                .field("src_peer_id", src_peer_id)
303                .field("src_connection_id", src_connection_id)
304                .finish(),
305            Event::OutboundConnectNegotiationFailed {
306                circuit_id,
307                src_peer_id,
308                src_connection_id,
309                inbound_circuit_req: _,
310                status,
311                error,
312            } => f
313                .debug_struct("Event::OutboundConnectNegotiationFailed")
314                .field("circuit_id", circuit_id)
315                .field("src_peer_id", src_peer_id)
316                .field("src_connection_id", src_connection_id)
317                .field("status", status)
318                .field("error", error)
319                .finish(),
320            Event::CircuitClosed {
321                circuit_id,
322                dst_peer_id,
323                error,
324            } => f
325                .debug_struct("Event::CircuitClosed")
326                .field("circuit_id", circuit_id)
327                .field("dst_peer_id", dst_peer_id)
328                .field("error", error)
329                .finish(),
330        }
331    }
332}
333
/// [`ConnectionHandler`] that manages substreams for a relay on a single
/// connection with a peer.
pub struct Handler {
    /// The endpoint passed to [`Handler::new`]; a copy is attached to the
    /// request-received events reported to the behaviour.
    endpoint: ConnectedPoint,

    /// Static [`Handler`] [`Config`].
    config: Config,

    /// Queue of events to return when polled.
    queued_events: VecDeque<
        ConnectionHandlerEvent<
            <Self as ConnectionHandler>::OutboundProtocol,
            (),
            <Self as ConnectionHandler>::ToBehaviour,
        >,
    >,

    /// The point in time when this connection started idling.
    ///
    /// While `None`, the handler asks for the connection to be kept alive.
    idle_at: Option<Instant>,

    /// Future handling inbound reservation request.
    ///
    /// At most one accept/deny operation is tracked; a new one replaces the
    /// previous one.
    reservation_request_future: Option<ReservationRequestFuture>,
    /// Timeout for the currently active reservation.
    active_reservation: Option<Delay>,

    /// Futures accepting an inbound circuit request.
    circuit_accept_futures: Futures<Result<CircuitParts, (CircuitId, PeerId, inbound_hop::Error)>>,
    /// Futures denying an inbound circuit request.
    circuit_deny_futures: Futures<(Option<CircuitId>, PeerId, Result<(), inbound_hop::Error>)>,
    /// Futures relaying data for circuit between two peers.
    circuits: Futures<(CircuitId, PeerId, Result<(), std::io::Error>)>,

    /// We issue a stream upgrade for each [`PendingConnect`] request.
    ///
    /// Entries are consumed front to back, one per negotiated outbound stream
    /// or dial upgrade error.
    pending_connect_requests: VecDeque<PendingConnect>,

    /// A `CONNECT` request is in flight for these circuits.
    ///
    /// Entries are removed when the matching worker in `outbound_workers`
    /// completes, fails or times out.
    active_connect_requests: HashMap<CircuitId, PendingConnect>,

    /// Bounded set of workers, one per negotiated inbound stream, each
    /// resolving to either a reservation request or a circuit request.
    inbound_workers: futures_bounded::FuturesSet<
        Result<Either<inbound_hop::ReservationReq, inbound_hop::CircuitReq>, inbound_hop::Error>,
    >,
    /// Bounded map of workers, keyed by circuit, each driving one outbound
    /// `CONNECT` request on a negotiated outbound stream.
    outbound_workers: futures_bounded::FuturesMap<
        CircuitId,
        Result<outbound_stop::Circuit, outbound_stop::Error>,
    >,
}
380
381impl Handler {
382    pub fn new(config: Config, endpoint: ConnectedPoint) -> Handler {
383        Handler {
384            inbound_workers: futures_bounded::FuturesSet::new(
385                STREAM_TIMEOUT,
386                MAX_CONCURRENT_STREAMS_PER_CONNECTION,
387            ),
388            outbound_workers: futures_bounded::FuturesMap::new(
389                STREAM_TIMEOUT,
390                MAX_CONCURRENT_STREAMS_PER_CONNECTION,
391            ),
392            endpoint,
393            config,
394            queued_events: Default::default(),
395            idle_at: None,
396            reservation_request_future: Default::default(),
397            circuit_accept_futures: Default::default(),
398            circuit_deny_futures: Default::default(),
399            circuits: Default::default(),
400            active_reservation: Default::default(),
401            pending_connect_requests: Default::default(),
402            active_connect_requests: Default::default(),
403        }
404    }
405
406    fn on_fully_negotiated_inbound(&mut self, stream: Stream) {
407        if self
408            .inbound_workers
409            .try_push(inbound_hop::handle_inbound_request(
410                stream,
411                self.config.reservation_duration,
412                self.config.max_circuit_duration,
413                self.config.max_circuit_bytes,
414            ))
415            .is_err()
416        {
417            tracing::warn!("Dropping inbound stream because we are at capacity")
418        }
419    }
420
421    fn on_fully_negotiated_outbound(&mut self, stream: Stream) {
422        let connect = self
423            .pending_connect_requests
424            .pop_front()
425            .expect("opened a stream without a pending stop command");
426
427        if self
428            .outbound_workers
429            .try_push(
430                connect.circuit_id,
431                outbound_stop::connect(
432                    stream,
433                    connect.src_peer_id,
434                    connect.max_circuit_duration,
435                    connect.max_circuit_bytes,
436                ),
437            )
438            .is_err()
439        {
440            tracing::warn!("Dropping outbound stream because we are at capacity")
441        }
442
443        self.active_connect_requests
444            .insert(connect.circuit_id, connect);
445    }
446
447    fn on_dial_upgrade_error(
448        &mut self,
449        DialUpgradeError { error, .. }: DialUpgradeError<
450            (),
451            <Self as ConnectionHandler>::OutboundProtocol,
452        >,
453    ) {
454        let error = match error {
455            StreamUpgradeError::Timeout => outbound_stop::Error::Io(io::ErrorKind::TimedOut.into()),
456            StreamUpgradeError::NegotiationFailed => outbound_stop::Error::Unsupported,
457            StreamUpgradeError::Io(e) => outbound_stop::Error::Io(e),
458            StreamUpgradeError::Apply(v) => libp2p_core::util::unreachable(v),
459        };
460
461        let stop_command = self
462            .pending_connect_requests
463            .pop_front()
464            .expect("failed to open a stream without a pending stop command");
465
466        self.queued_events
467            .push_back(ConnectionHandlerEvent::NotifyBehaviour(
468                Event::OutboundConnectNegotiationFailed {
469                    circuit_id: stop_command.circuit_id,
470                    src_peer_id: stop_command.src_peer_id,
471                    src_connection_id: stop_command.src_connection_id,
472                    inbound_circuit_req: stop_command.inbound_circuit_req,
473                    status: proto::Status::CONNECTION_FAILED,
474                    error,
475                },
476            ));
477    }
478}
479
/// The reservation operation currently in progress on a [`Handler`].
///
/// Both variants wrap a future with the same output type; the variant decides
/// which event is reported to the behaviour once the future resolves.
enum ReservationRequestFuture {
    /// An inbound reservation request is being accepted.
    Accepting(BoxFuture<'static, Result<(), inbound_hop::Error>>),
    /// An inbound reservation request is being denied.
    Denying(BoxFuture<'static, Result<(), inbound_hop::Error>>),
}
484
/// Shorthand for an unordered collection of boxed `'static` futures that all
/// resolve to `T`.
type Futures<T> = FuturesUnordered<BoxFuture<'static, T>>;
486
487impl ConnectionHandler for Handler {
488    type FromBehaviour = In;
489    type ToBehaviour = Event;
490    type InboundProtocol = ReadyUpgrade<StreamProtocol>;
491    type InboundOpenInfo = ();
492    type OutboundProtocol = ReadyUpgrade<StreamProtocol>;
493    type OutboundOpenInfo = ();
494
495    fn listen_protocol(&self) -> SubstreamProtocol<Self::InboundProtocol> {
496        SubstreamProtocol::new(ReadyUpgrade::new(HOP_PROTOCOL_NAME), ())
497    }
498
499    fn on_behaviour_event(&mut self, event: Self::FromBehaviour) {
500        match event {
501            In::AcceptReservationReq {
502                inbound_reservation_req,
503                addrs,
504            } => {
505                if self
506                    .reservation_request_future
507                    .replace(ReservationRequestFuture::Accepting(
508                        inbound_reservation_req.accept(addrs).err_into().boxed(),
509                    ))
510                    .is_some()
511                {
512                    tracing::warn!("Dropping existing deny/accept future in favor of new one")
513                }
514            }
515            In::DenyReservationReq {
516                inbound_reservation_req,
517                status,
518            } => {
519                if self
520                    .reservation_request_future
521                    .replace(ReservationRequestFuture::Denying(
522                        inbound_reservation_req.deny(status).err_into().boxed(),
523                    ))
524                    .is_some()
525                {
526                    tracing::warn!("Dropping existing deny/accept future in favor of new one")
527                }
528            }
529            In::NegotiateOutboundConnect {
530                circuit_id,
531                inbound_circuit_req,
532                src_peer_id,
533                src_connection_id,
534            } => {
535                self.pending_connect_requests.push_back(PendingConnect::new(
536                    circuit_id,
537                    inbound_circuit_req,
538                    src_peer_id,
539                    src_connection_id,
540                    &self.config,
541                ));
542                self.queued_events
543                    .push_back(ConnectionHandlerEvent::OutboundSubstreamRequest {
544                        protocol: SubstreamProtocol::new(ReadyUpgrade::new(STOP_PROTOCOL_NAME), ()),
545                    });
546            }
547            In::DenyCircuitReq {
548                circuit_id,
549                inbound_circuit_req,
550                status,
551            } => {
552                let dst_peer_id = inbound_circuit_req.dst();
553                self.circuit_deny_futures.push(
554                    inbound_circuit_req
555                        .deny(status)
556                        .err_into()
557                        .map(move |result| (circuit_id, dst_peer_id, result))
558                        .boxed(),
559                );
560            }
561            In::AcceptAndDriveCircuit {
562                circuit_id,
563                dst_peer_id,
564                inbound_circuit_req,
565                dst_stream,
566                dst_pending_data,
567            } => {
568                self.circuit_accept_futures.push(
569                    inbound_circuit_req
570                        .accept()
571                        .err_into()
572                        .map_ok(move |(src_stream, src_pending_data)| CircuitParts {
573                            circuit_id,
574                            src_stream,
575                            src_pending_data,
576                            dst_peer_id,
577                            dst_stream,
578                            dst_pending_data,
579                        })
580                        .map_err(move |e| (circuit_id, dst_peer_id, e))
581                        .boxed(),
582                );
583            }
584        }
585    }
586
587    fn connection_keep_alive(&self) -> bool {
588        let Some(idle_at) = self.idle_at else {
589            return true;
590        };
591
592        Instant::now().duration_since(idle_at) <= Duration::from_secs(10)
593    }
594
595    #[tracing::instrument(level = "trace", name = "ConnectionHandler::poll", skip(self, cx))]
596    fn poll(
597        &mut self,
598        cx: &mut Context<'_>,
599    ) -> Poll<ConnectionHandlerEvent<Self::OutboundProtocol, (), Self::ToBehaviour>> {
600        // Return queued events.
601        if let Some(event) = self.queued_events.pop_front() {
602            return Poll::Ready(event);
603        }
604
605        // Progress existing circuits.
606        if let Poll::Ready(Some((circuit_id, dst_peer_id, result))) =
607            self.circuits.poll_next_unpin(cx)
608        {
609            match result {
610                Ok(()) => {
611                    return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(
612                        Event::CircuitClosed {
613                            circuit_id,
614                            dst_peer_id,
615                            error: None,
616                        },
617                    ))
618                }
619                Err(e) => {
620                    return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(
621                        Event::CircuitClosed {
622                            circuit_id,
623                            dst_peer_id,
624                            error: Some(e),
625                        },
626                    ))
627                }
628            }
629        }
630
631        // Process inbound protocol workers
632        loop {
633            match self.inbound_workers.poll_unpin(cx) {
634                Poll::Ready(Ok(Ok(Either::Left(inbound_reservation_req)))) => {
635                    return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(
636                        Event::ReservationReqReceived {
637                            inbound_reservation_req,
638                            endpoint: self.endpoint.clone(),
639                            renewed: self.active_reservation.is_some(),
640                        },
641                    ));
642                }
643                Poll::Ready(Ok(Ok(Either::Right(inbound_circuit_req)))) => {
644                    return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(
645                        Event::CircuitReqReceived {
646                            inbound_circuit_req,
647                            endpoint: self.endpoint.clone(),
648                        },
649                    ));
650                }
651                Poll::Ready(Err(e)) => {
652                    tracing::debug!("Inbound stream operation timed out: {e}");
653                    continue;
654                }
655                Poll::Ready(Ok(Err(e))) => {
656                    tracing::debug!("Inbound stream operation failed: {e}");
657                    continue;
658                }
659                Poll::Pending => {
660                    break;
661                }
662            }
663        }
664
665        // Process outbound protocol workers
666        match self.outbound_workers.poll_unpin(cx) {
667            Poll::Ready((id, Ok(Ok(circuit)))) => {
668                let connect = self
669                    .active_connect_requests
670                    .remove(&id)
671                    .expect("must have pending connect");
672
673                return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(
674                    Event::OutboundConnectNegotiated {
675                        circuit_id: id,
676                        src_peer_id: connect.src_peer_id,
677                        src_connection_id: connect.src_connection_id,
678                        inbound_circuit_req: connect.inbound_circuit_req,
679                        dst_stream: circuit.dst_stream,
680                        dst_pending_data: circuit.dst_pending_data,
681                    },
682                ));
683            }
684            Poll::Ready((id, Ok(Err(error)))) => {
685                let connect = self
686                    .active_connect_requests
687                    .remove(&id)
688                    .expect("must have pending connect");
689
690                return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(
691                    Event::OutboundConnectNegotiationFailed {
692                        circuit_id: connect.circuit_id,
693                        src_peer_id: connect.src_peer_id,
694                        src_connection_id: connect.src_connection_id,
695                        inbound_circuit_req: connect.inbound_circuit_req,
696                        status: error.to_status(),
697                        error,
698                    },
699                ));
700            }
701            Poll::Ready((id, Err(futures_bounded::Timeout { .. }))) => {
702                let connect = self
703                    .active_connect_requests
704                    .remove(&id)
705                    .expect("must have pending connect");
706
707                return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(
708                    Event::OutboundConnectNegotiationFailed {
709                        circuit_id: connect.circuit_id,
710                        src_peer_id: connect.src_peer_id,
711                        src_connection_id: connect.src_connection_id,
712                        inbound_circuit_req: connect.inbound_circuit_req,
713                        status: proto::Status::CONNECTION_FAILED, // Best fit?
714                        error: outbound_stop::Error::Io(io::ErrorKind::TimedOut.into()),
715                    },
716                ));
717            }
718            Poll::Pending => {}
719        }
720
721        // Deny new circuits.
722        if let Poll::Ready(Some((circuit_id, dst_peer_id, result))) =
723            self.circuit_deny_futures.poll_next_unpin(cx)
724        {
725            match result {
726                Ok(()) => {
727                    return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(
728                        Event::CircuitReqDenied {
729                            circuit_id,
730                            dst_peer_id,
731                        },
732                    ));
733                }
734                Err(error) => {
735                    return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(
736                        Event::CircuitReqDenyFailed {
737                            circuit_id,
738                            dst_peer_id,
739                            error,
740                        },
741                    ));
742                }
743            }
744        }
745
746        // Accept new circuits.
747        if let Poll::Ready(Some(result)) = self.circuit_accept_futures.poll_next_unpin(cx) {
748            match result {
749                Ok(parts) => {
750                    let CircuitParts {
751                        circuit_id,
752                        mut src_stream,
753                        src_pending_data,
754                        dst_peer_id,
755                        mut dst_stream,
756                        dst_pending_data,
757                    } = parts;
758                    let max_circuit_duration = self.config.max_circuit_duration;
759                    let max_circuit_bytes = self.config.max_circuit_bytes;
760
761                    let circuit = async move {
762                        let (result_1, result_2) = futures::future::join(
763                            src_stream.write_all(&dst_pending_data),
764                            dst_stream.write_all(&src_pending_data),
765                        )
766                        .await;
767                        result_1?;
768                        result_2?;
769
770                        CopyFuture::new(
771                            src_stream,
772                            dst_stream,
773                            max_circuit_duration,
774                            max_circuit_bytes,
775                        )
776                        .await?;
777
778                        Ok(())
779                    }
780                    .map(move |r| (circuit_id, dst_peer_id, r))
781                    .boxed();
782
783                    self.circuits.push(circuit);
784
785                    return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(
786                        Event::CircuitReqAccepted {
787                            circuit_id,
788                            dst_peer_id,
789                        },
790                    ));
791                }
792                Err((circuit_id, dst_peer_id, error)) => {
793                    return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(
794                        Event::CircuitReqAcceptFailed {
795                            circuit_id,
796                            dst_peer_id,
797                            error,
798                        },
799                    ));
800                }
801            }
802        }
803
804        // Check active reservation.
805        if let Some(Poll::Ready(())) = self
806            .active_reservation
807            .as_mut()
808            .map(|fut| fut.poll_unpin(cx))
809        {
810            self.active_reservation = None;
811            return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(
812                Event::ReservationTimedOut {},
813            ));
814        }
815
816        // Progress reservation request.
817        match self.reservation_request_future.as_mut() {
818            Some(ReservationRequestFuture::Accepting(fut)) => {
819                if let Poll::Ready(result) = fut.poll_unpin(cx) {
820                    self.reservation_request_future = None;
821
822                    match result {
823                        Ok(()) => {
824                            let renewed = self
825                                .active_reservation
826                                .replace(Delay::new(self.config.reservation_duration))
827                                .is_some();
828                            return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(
829                                Event::ReservationReqAccepted { renewed },
830                            ));
831                        }
832                        Err(error) => {
833                            return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(
834                                Event::ReservationReqAcceptFailed { error },
835                            ));
836                        }
837                    }
838                }
839            }
840            Some(ReservationRequestFuture::Denying(fut)) => {
841                if let Poll::Ready(result) = fut.poll_unpin(cx) {
842                    self.reservation_request_future = None;
843
844                    match result {
845                        Ok(()) => {
846                            return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(
847                                Event::ReservationReqDenied {},
848                            ))
849                        }
850                        Err(error) => {
851                            return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(
852                                Event::ReservationReqDenyFailed { error },
853                            ));
854                        }
855                    }
856                }
857            }
858            None => {}
859        }
860
861        // Check keep alive status.
862        if self.active_reservation.is_none() {
863            if self.idle_at.is_none() {
864                self.idle_at = Some(Instant::now());
865            }
866        } else {
867            self.idle_at = None;
868        }
869
870        Poll::Pending
871    }
872
873    fn on_connection_event(
874        &mut self,
875        event: ConnectionEvent<Self::InboundProtocol, Self::OutboundProtocol>,
876    ) {
877        match event {
878            ConnectionEvent::FullyNegotiatedInbound(FullyNegotiatedInbound {
879                protocol: stream,
880                ..
881            }) => {
882                self.on_fully_negotiated_inbound(stream);
883            }
884            ConnectionEvent::FullyNegotiatedOutbound(FullyNegotiatedOutbound {
885                protocol: stream,
886                ..
887            }) => {
888                self.on_fully_negotiated_outbound(stream);
889            }
890            ConnectionEvent::DialUpgradeError(dial_upgrade_error) => {
891                self.on_dial_upgrade_error(dial_upgrade_error);
892            }
893            _ => {}
894        }
895    }
896}
897
898struct CircuitParts {
899    circuit_id: CircuitId,
900    src_stream: Stream,
901    src_pending_data: Bytes,
902    dst_peer_id: PeerId,
903    dst_stream: Stream,
904    dst_pending_data: Bytes,
905}
906
907/// Holds everything we know about a to-be-issued `CONNECT` request to a peer.
908struct PendingConnect {
909    circuit_id: CircuitId,
910    inbound_circuit_req: inbound_hop::CircuitReq,
911    src_peer_id: PeerId,
912    src_connection_id: ConnectionId,
913    max_circuit_duration: Duration,
914    max_circuit_bytes: u64,
915}
916
917impl PendingConnect {
918    fn new(
919        circuit_id: CircuitId,
920        inbound_circuit_req: inbound_hop::CircuitReq,
921        src_peer_id: PeerId,
922        src_connection_id: ConnectionId,
923        config: &Config,
924    ) -> Self {
925        Self {
926            circuit_id,
927            inbound_circuit_req,
928            src_peer_id,
929            src_connection_id,
930            max_circuit_duration: config.max_circuit_duration,
931            max_circuit_bytes: config.max_circuit_bytes,
932        }
933    }
934}