libp2p_relay/behaviour/
handler.rs

1// Copyright 2021 Protocol Labs.
2//
3// Permission is hereby granted, free of charge, to any person obtaining a
4// copy of this software and associated documentation files (the "Software"),
5// to deal in the Software without restriction, including without limitation
6// the rights to use, copy, modify, merge, publish, distribute, sublicense,
7// and/or sell copies of the Software, and to permit persons to whom the
8// Software is furnished to do so, subject to the following conditions:
9//
10// The above copyright notice and this permission notice shall be included in
11// all copies or substantial portions of the Software.
12//
13// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
14// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
18// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
19// DEALINGS IN THE SOFTWARE.
20
21use std::{
22    collections::{HashMap, VecDeque},
23    fmt, io,
24    task::{Context, Poll},
25    time::Duration,
26};
27
28use bytes::Bytes;
29use either::Either;
30use futures::{
31    future::{BoxFuture, FutureExt, TryFutureExt},
32    io::AsyncWriteExt,
33    stream::{FuturesUnordered, StreamExt},
34};
35use futures_timer::Delay;
36use libp2p_core::{upgrade::ReadyUpgrade, ConnectedPoint, Multiaddr};
37use libp2p_identity::PeerId;
38use libp2p_swarm::{
39    handler::{ConnectionEvent, DialUpgradeError, FullyNegotiatedInbound, FullyNegotiatedOutbound},
40    ConnectionHandler, ConnectionHandlerEvent, ConnectionId, Stream, StreamProtocol,
41    StreamUpgradeError, SubstreamProtocol,
42};
43use web_time::Instant;
44
45use crate::{
46    behaviour::CircuitId,
47    copy_future::CopyFuture,
48    proto,
49    protocol::{inbound_hop, outbound_stop},
50    HOP_PROTOCOL_NAME, STOP_PROTOCOL_NAME,
51};
52
/// Capacity handed to each of the handler's bounded worker collections
/// (`inbound_workers` and `outbound_workers`) in [`Handler::new`]. Each
/// collection gets this capacity separately.
const MAX_CONCURRENT_STREAMS_PER_CONNECTION: usize = 10;
/// Timeout handed to each of the handler's bounded worker collections in
/// [`Handler::new`]; a worker exceeding it is reported as timed out when polled.
const STREAM_TIMEOUT: Duration = Duration::from_secs(60);
55
/// Static configuration of a [`Handler`].
#[derive(Debug, Clone)]
pub struct Config {
    /// Length of the timer armed when a reservation request has been accepted;
    /// once it fires the handler reports `Event::ReservationTimedOut`. Also
    /// passed on to the inbound HOP request handling.
    pub reservation_duration: Duration,
    /// Passed on to the inbound HOP request handling and to the future that
    /// relays data for an accepted circuit.
    // NOTE(review): presumably the upper bound on a circuit's lifetime — confirm
    // against `CopyFuture`.
    pub max_circuit_duration: Duration,
    /// Passed on to the inbound HOP request handling and to the future that
    /// relays data for an accepted circuit.
    // NOTE(review): presumably the upper bound on bytes relayed per circuit —
    // confirm against `CopyFuture`.
    pub max_circuit_bytes: u64,
}
62
/// Commands the behaviour sends to the [`Handler`]
/// (the handler's `ConnectionHandler::FromBehaviour` type).
pub enum In {
    /// Accept the given inbound reservation request, handing `addrs` to the
    /// request's `accept` call.
    AcceptReservationReq {
        inbound_reservation_req: inbound_hop::ReservationReq,
        addrs: Vec<Multiaddr>,
    },
    /// Deny the given inbound reservation request with `status`.
    DenyReservationReq {
        inbound_reservation_req: inbound_hop::ReservationReq,
        status: proto::Status,
    },
    /// Deny the given inbound circuit request with `status`.
    ///
    /// `circuit_id` is echoed back unchanged in the resulting
    /// `Event::CircuitReqDenied` / `Event::CircuitReqDenyFailed`.
    DenyCircuitReq {
        circuit_id: Option<CircuitId>,
        inbound_circuit_req: inbound_hop::CircuitReq,
        status: proto::Status,
    },
    /// Open an outbound STOP substream on this connection on behalf of the
    /// given inbound circuit request.
    ///
    /// The request and the source identifiers are stored until the substream
    /// has been negotiated (or negotiation failed) and are then handed back to
    /// the behaviour in the corresponding event.
    NegotiateOutboundConnect {
        circuit_id: CircuitId,
        inbound_circuit_req: inbound_hop::CircuitReq,
        src_peer_id: PeerId,
        src_connection_id: ConnectionId,
    },
    /// Accept the inbound circuit request and, once accepted, relay data
    /// between the resulting source stream and `dst_stream`.
    ///
    /// `dst_pending_data` is written to the source stream before relaying
    /// starts.
    AcceptAndDriveCircuit {
        circuit_id: CircuitId,
        dst_peer_id: PeerId,
        inbound_circuit_req: inbound_hop::CircuitReq,
        dst_stream: Stream,
        dst_pending_data: Bytes,
    },
}
91
92impl fmt::Debug for In {
93    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
94        match self {
95            In::AcceptReservationReq {
96                inbound_reservation_req: _,
97                addrs,
98            } => f
99                .debug_struct("In::AcceptReservationReq")
100                .field("addrs", addrs)
101                .finish(),
102            In::DenyReservationReq {
103                inbound_reservation_req: _,
104                status,
105            } => f
106                .debug_struct("In::DenyReservationReq")
107                .field("status", status)
108                .finish(),
109            In::DenyCircuitReq {
110                circuit_id,
111                inbound_circuit_req: _,
112                status,
113            } => f
114                .debug_struct("In::DenyCircuitReq")
115                .field("circuit_id", circuit_id)
116                .field("status", status)
117                .finish(),
118            In::NegotiateOutboundConnect {
119                circuit_id,
120                inbound_circuit_req: _,
121                src_peer_id,
122                src_connection_id,
123            } => f
124                .debug_struct("In::NegotiateOutboundConnect")
125                .field("circuit_id", circuit_id)
126                .field("src_peer_id", src_peer_id)
127                .field("src_connection_id", src_connection_id)
128                .finish(),
129            In::AcceptAndDriveCircuit {
130                circuit_id,
131                inbound_circuit_req: _,
132                dst_peer_id,
133                dst_stream: _,
134                dst_pending_data: _,
135            } => f
136                .debug_struct("In::AcceptAndDriveCircuit")
137                .field("circuit_id", circuit_id)
138                .field("dst_peer_id", dst_peer_id)
139                .finish(),
140        }
141    }
142}
143
/// The events produced by the [`Handler`].
///
/// This is the handler's `ConnectionHandler::ToBehaviour` type.
// NOTE(review): presumably the lint is silenced because some variants carry
// whole request/stream values while others are empty — confirm.
#[allow(clippy::large_enum_variant)]
pub enum Event {
    /// An inbound reservation request has been received.
    ReservationReqReceived {
        inbound_reservation_req: inbound_hop::ReservationReq,
        /// The endpoint of the connection this handler belongs to.
        endpoint: ConnectedPoint,
        /// Indicates whether the request replaces an existing reservation.
        renewed: bool,
    },
    /// An inbound reservation request has been accepted.
    ReservationReqAccepted {
        /// Indicates whether the request replaces an existing reservation.
        renewed: bool,
    },
    /// Accepting an inbound reservation request failed.
    ReservationReqAcceptFailed { error: inbound_hop::Error },
    /// An inbound reservation request has been denied.
    ReservationReqDenied { status: proto::Status },
    /// Denying an inbound reservation request has failed.
    ReservationReqDenyFailed { error: inbound_hop::Error },
    /// An inbound reservation has timed out.
    ReservationTimedOut {},
    /// An inbound circuit request has been received.
    CircuitReqReceived {
        inbound_circuit_req: inbound_hop::CircuitReq,
        /// The endpoint of the connection this handler belongs to.
        endpoint: ConnectedPoint,
    },
    /// An inbound circuit request has been denied.
    CircuitReqDenied {
        circuit_id: Option<CircuitId>,
        dst_peer_id: PeerId,
        /// The status the request was denied with.
        status: proto::Status,
    },
    /// Denying an inbound circuit request failed.
    CircuitReqDenyFailed {
        circuit_id: Option<CircuitId>,
        dst_peer_id: PeerId,
        error: inbound_hop::Error,
    },
    /// An inbound circuit request has been accepted.
    CircuitReqAccepted {
        circuit_id: CircuitId,
        dst_peer_id: PeerId,
    },
    /// Accepting an inbound circuit request failed.
    CircuitReqAcceptFailed {
        circuit_id: CircuitId,
        dst_peer_id: PeerId,
        error: inbound_hop::Error,
    },
    /// An outbound substream for an inbound circuit request has been
    /// negotiated.
    OutboundConnectNegotiated {
        circuit_id: CircuitId,
        src_peer_id: PeerId,
        src_connection_id: ConnectionId,
        /// The inbound circuit request this substream was opened for, handed
        /// back to the behaviour.
        inbound_circuit_req: inbound_hop::CircuitReq,
        dst_stream: Stream,
        dst_pending_data: Bytes,
    },
    /// Negotiating an outbound substream for an inbound circuit request failed.
    OutboundConnectNegotiationFailed {
        circuit_id: CircuitId,
        src_peer_id: PeerId,
        src_connection_id: ConnectionId,
        /// The inbound circuit request this substream was opened for, handed
        /// back to the behaviour.
        inbound_circuit_req: inbound_hop::CircuitReq,
        status: proto::Status,
        error: outbound_stop::Error,
    },
    /// An inbound circuit has closed.
    CircuitClosed {
        circuit_id: CircuitId,
        dst_peer_id: PeerId,
        /// `None` when the relaying future finished with `Ok(())`, otherwise
        /// the I/O error it finished with.
        error: Option<std::io::Error>,
    },
}
221
222impl fmt::Debug for Event {
223    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
224        match self {
225            Event::ReservationReqReceived {
226                inbound_reservation_req: _,
227                endpoint,
228                renewed,
229            } => f
230                .debug_struct("Event::ReservationReqReceived")
231                .field("endpoint", endpoint)
232                .field("renewed", renewed)
233                .finish(),
234            Event::ReservationReqAccepted { renewed } => f
235                .debug_struct("Event::ReservationReqAccepted")
236                .field("renewed", renewed)
237                .finish(),
238            Event::ReservationReqAcceptFailed { error } => f
239                .debug_struct("Event::ReservationReqAcceptFailed")
240                .field("error", error)
241                .finish(),
242            Event::ReservationReqDenied { status } => f
243                .debug_struct("Event::ReservationReqDenied")
244                .field("status", status)
245                .finish(),
246            Event::ReservationReqDenyFailed { error } => f
247                .debug_struct("Event::ReservationReqDenyFailed")
248                .field("error", error)
249                .finish(),
250            Event::ReservationTimedOut {} => f.debug_struct("Event::ReservationTimedOut").finish(),
251            Event::CircuitReqReceived {
252                endpoint,
253                inbound_circuit_req: _,
254            } => f
255                .debug_struct("Event::CircuitReqReceived")
256                .field("endpoint", endpoint)
257                .finish(),
258            Event::CircuitReqDenied {
259                circuit_id,
260                dst_peer_id,
261                status,
262            } => f
263                .debug_struct("Event::CircuitReqDenied")
264                .field("circuit_id", circuit_id)
265                .field("dst_peer_id", dst_peer_id)
266                .field("status", status)
267                .finish(),
268            Event::CircuitReqDenyFailed {
269                circuit_id,
270                dst_peer_id,
271                error,
272            } => f
273                .debug_struct("Event::CircuitReqDenyFailed")
274                .field("circuit_id", circuit_id)
275                .field("dst_peer_id", dst_peer_id)
276                .field("error", error)
277                .finish(),
278            Event::CircuitReqAccepted {
279                circuit_id,
280                dst_peer_id,
281            } => f
282                .debug_struct("Event::CircuitReqAccepted")
283                .field("circuit_id", circuit_id)
284                .field("dst_peer_id", dst_peer_id)
285                .finish(),
286            Event::CircuitReqAcceptFailed {
287                circuit_id,
288                dst_peer_id,
289                error,
290            } => f
291                .debug_struct("Event::CircuitReqAcceptFailed")
292                .field("circuit_id", circuit_id)
293                .field("dst_peer_id", dst_peer_id)
294                .field("error", error)
295                .finish(),
296            Event::OutboundConnectNegotiated {
297                circuit_id,
298                src_peer_id,
299                src_connection_id,
300                inbound_circuit_req: _,
301                dst_stream: _,
302                dst_pending_data: _,
303            } => f
304                .debug_struct("Event::OutboundConnectNegotiated")
305                .field("circuit_id", circuit_id)
306                .field("src_peer_id", src_peer_id)
307                .field("src_connection_id", src_connection_id)
308                .finish(),
309            Event::OutboundConnectNegotiationFailed {
310                circuit_id,
311                src_peer_id,
312                src_connection_id,
313                inbound_circuit_req: _,
314                status,
315                error,
316            } => f
317                .debug_struct("Event::OutboundConnectNegotiationFailed")
318                .field("circuit_id", circuit_id)
319                .field("src_peer_id", src_peer_id)
320                .field("src_connection_id", src_connection_id)
321                .field("status", status)
322                .field("error", error)
323                .finish(),
324            Event::CircuitClosed {
325                circuit_id,
326                dst_peer_id,
327                error,
328            } => f
329                .debug_struct("Event::CircuitClosed")
330                .field("circuit_id", circuit_id)
331                .field("dst_peer_id", dst_peer_id)
332                .field("error", error)
333                .finish(),
334        }
335    }
336}
337
/// [`ConnectionHandler`] that manages substreams for a relay on a single
/// connection with a peer.
pub struct Handler {
    /// Endpoint of the connection; cloned into the events that report a newly
    /// received reservation or circuit request.
    endpoint: ConnectedPoint,

    /// Static [`Handler`] [`Config`].
    config: Config,

    /// Queue of events to return when polled.
    queued_events: VecDeque<
        ConnectionHandlerEvent<
            <Self as ConnectionHandler>::OutboundProtocol,
            (),
            <Self as ConnectionHandler>::ToBehaviour,
        >,
    >,

    /// The point in time when this connection started idling.
    ///
    /// While `None`, the handler asks for the connection to be kept alive.
    idle_at: Option<Instant>,

    /// Future handling inbound reservation request.
    ///
    /// At most one is kept; a newer accept/deny command replaces it.
    reservation_request_future: Option<ReservationRequestFuture>,
    /// Timeout for the currently active reservation.
    active_reservation: Option<Delay>,

    /// Futures accepting an inbound circuit request.
    circuit_accept_futures: Futures<Result<CircuitParts, (CircuitId, PeerId, inbound_hop::Error)>>,
    /// Futures denying an inbound circuit request.
    circuit_deny_futures: Futures<(
        Option<CircuitId>,
        PeerId,
        proto::Status,
        Result<(), inbound_hop::Error>,
    )>,
    /// Futures relaying data for circuit between two peers.
    circuits: Futures<(CircuitId, PeerId, Result<(), std::io::Error>)>,

    /// We issue a stream upgrade for each [`PendingConnect`] request.
    ///
    /// Entries are consumed front-first as outbound substreams are negotiated
    /// or fail to be negotiated.
    pending_connect_requests: VecDeque<PendingConnect>,

    /// A `CONNECT` request is in flight for these circuits.
    active_connect_requests: HashMap<CircuitId, PendingConnect>,

    /// Bounded set of futures, each processing the request arriving on one
    /// negotiated inbound substream.
    inbound_workers: futures_bounded::FuturesSet<
        Result<Either<inbound_hop::ReservationReq, inbound_hop::CircuitReq>, inbound_hop::Error>,
    >,
    /// Bounded map of futures, keyed by circuit, each driving the exchange on
    /// one negotiated outbound substream.
    outbound_workers: futures_bounded::FuturesMap<
        CircuitId,
        Result<outbound_stop::Circuit, outbound_stop::Error>,
    >,
}
389
390impl Handler {
391    pub fn new(config: Config, endpoint: ConnectedPoint) -> Handler {
392        Handler {
393            inbound_workers: futures_bounded::FuturesSet::new(
394                STREAM_TIMEOUT,
395                MAX_CONCURRENT_STREAMS_PER_CONNECTION,
396            ),
397            outbound_workers: futures_bounded::FuturesMap::new(
398                STREAM_TIMEOUT,
399                MAX_CONCURRENT_STREAMS_PER_CONNECTION,
400            ),
401            endpoint,
402            config,
403            queued_events: Default::default(),
404            idle_at: None,
405            reservation_request_future: Default::default(),
406            circuit_accept_futures: Default::default(),
407            circuit_deny_futures: Default::default(),
408            circuits: Default::default(),
409            active_reservation: Default::default(),
410            pending_connect_requests: Default::default(),
411            active_connect_requests: Default::default(),
412        }
413    }
414
415    fn on_fully_negotiated_inbound(&mut self, stream: Stream) {
416        if self
417            .inbound_workers
418            .try_push(inbound_hop::handle_inbound_request(
419                stream,
420                self.config.reservation_duration,
421                self.config.max_circuit_duration,
422                self.config.max_circuit_bytes,
423            ))
424            .is_err()
425        {
426            tracing::warn!("Dropping inbound stream because we are at capacity")
427        }
428    }
429
430    fn on_fully_negotiated_outbound(&mut self, stream: Stream) {
431        let connect = self
432            .pending_connect_requests
433            .pop_front()
434            .expect("opened a stream without a pending stop command");
435
436        if self
437            .outbound_workers
438            .try_push(
439                connect.circuit_id,
440                outbound_stop::connect(
441                    stream,
442                    connect.src_peer_id,
443                    connect.max_circuit_duration,
444                    connect.max_circuit_bytes,
445                ),
446            )
447            .is_err()
448        {
449            tracing::warn!("Dropping outbound stream because we are at capacity")
450        }
451
452        self.active_connect_requests
453            .insert(connect.circuit_id, connect);
454    }
455
456    fn on_dial_upgrade_error(
457        &mut self,
458        DialUpgradeError { error, .. }: DialUpgradeError<
459            (),
460            <Self as ConnectionHandler>::OutboundProtocol,
461        >,
462    ) {
463        let error = match error {
464            StreamUpgradeError::Timeout => outbound_stop::Error::Io(io::ErrorKind::TimedOut.into()),
465            StreamUpgradeError::NegotiationFailed => outbound_stop::Error::Unsupported,
466            StreamUpgradeError::Io(e) => outbound_stop::Error::Io(e),
467            StreamUpgradeError::Apply(v) => libp2p_core::util::unreachable(v),
468        };
469
470        let stop_command = self
471            .pending_connect_requests
472            .pop_front()
473            .expect("failed to open a stream without a pending stop command");
474
475        self.queued_events
476            .push_back(ConnectionHandlerEvent::NotifyBehaviour(
477                Event::OutboundConnectNegotiationFailed {
478                    circuit_id: stop_command.circuit_id,
479                    src_peer_id: stop_command.src_peer_id,
480                    src_connection_id: stop_command.src_connection_id,
481                    inbound_circuit_req: stop_command.inbound_circuit_req,
482                    status: proto::Status::CONNECTION_FAILED,
483                    error,
484                },
485            ));
486    }
487}
488
/// The in-flight answer to an inbound reservation request.
enum ReservationRequestFuture {
    /// The request is being accepted; resolves to the outcome of accepting it.
    Accepting(BoxFuture<'static, Result<(), inbound_hop::Error>>),
    /// The request is being denied; resolves to the status it is denied with
    /// together with the outcome of denying it.
    Denying(BoxFuture<'static, (proto::Status, Result<(), inbound_hop::Error>)>),
}
493
/// An unordered collection of boxed `'static` futures that each resolve to `T`.
type Futures<T> = FuturesUnordered<BoxFuture<'static, T>>;
495
496impl ConnectionHandler for Handler {
    // Commands received from the behaviour.
    type FromBehaviour = In;
    // Events reported to the behaviour.
    type ToBehaviour = Event;
    // Inbound and outbound substreams are both negotiated with a plain
    // `ReadyUpgrade` over a `StreamProtocol`; no per-substream info is attached
    // in either direction.
    type InboundProtocol = ReadyUpgrade<StreamProtocol>;
    type InboundOpenInfo = ();
    type OutboundProtocol = ReadyUpgrade<StreamProtocol>;
    type OutboundOpenInfo = ();
503
504    fn listen_protocol(&self) -> SubstreamProtocol<Self::InboundProtocol> {
505        SubstreamProtocol::new(ReadyUpgrade::new(HOP_PROTOCOL_NAME), ())
506    }
507
508    fn on_behaviour_event(&mut self, event: Self::FromBehaviour) {
509        match event {
510            In::AcceptReservationReq {
511                inbound_reservation_req,
512                addrs,
513            } => {
514                if self
515                    .reservation_request_future
516                    .replace(ReservationRequestFuture::Accepting(
517                        inbound_reservation_req.accept(addrs).err_into().boxed(),
518                    ))
519                    .is_some()
520                {
521                    tracing::warn!("Dropping existing deny/accept future in favor of new one")
522                }
523            }
524            In::DenyReservationReq {
525                inbound_reservation_req,
526                status,
527            } => {
528                if self
529                    .reservation_request_future
530                    .replace(ReservationRequestFuture::Denying(
531                        inbound_reservation_req
532                            .deny(status)
533                            .err_into()
534                            .map(move |result| (status, result))
535                            .boxed(),
536                    ))
537                    .is_some()
538                {
539                    tracing::warn!("Dropping existing deny/accept future in favor of new one")
540                }
541            }
542            In::NegotiateOutboundConnect {
543                circuit_id,
544                inbound_circuit_req,
545                src_peer_id,
546                src_connection_id,
547            } => {
548                self.pending_connect_requests.push_back(PendingConnect::new(
549                    circuit_id,
550                    inbound_circuit_req,
551                    src_peer_id,
552                    src_connection_id,
553                    &self.config,
554                ));
555                self.queued_events
556                    .push_back(ConnectionHandlerEvent::OutboundSubstreamRequest {
557                        protocol: SubstreamProtocol::new(ReadyUpgrade::new(STOP_PROTOCOL_NAME), ()),
558                    });
559            }
560            In::DenyCircuitReq {
561                circuit_id,
562                inbound_circuit_req,
563                status,
564            } => {
565                let dst_peer_id = inbound_circuit_req.dst();
566                self.circuit_deny_futures.push(
567                    inbound_circuit_req
568                        .deny(status)
569                        .err_into()
570                        .map(move |result| (circuit_id, dst_peer_id, status, result))
571                        .boxed(),
572                );
573            }
574            In::AcceptAndDriveCircuit {
575                circuit_id,
576                dst_peer_id,
577                inbound_circuit_req,
578                dst_stream,
579                dst_pending_data,
580            } => {
581                self.circuit_accept_futures.push(
582                    inbound_circuit_req
583                        .accept()
584                        .err_into()
585                        .map_ok(move |(src_stream, src_pending_data)| CircuitParts {
586                            circuit_id,
587                            src_stream,
588                            src_pending_data,
589                            dst_peer_id,
590                            dst_stream,
591                            dst_pending_data,
592                        })
593                        .map_err(move |e| (circuit_id, dst_peer_id, e))
594                        .boxed(),
595                );
596            }
597        }
598    }
599
600    fn connection_keep_alive(&self) -> bool {
601        let Some(idle_at) = self.idle_at else {
602            return true;
603        };
604
605        Instant::now().duration_since(idle_at) <= Duration::from_secs(10)
606    }
607
608    #[tracing::instrument(level = "trace", name = "ConnectionHandler::poll", skip(self, cx))]
609    fn poll(
610        &mut self,
611        cx: &mut Context<'_>,
612    ) -> Poll<ConnectionHandlerEvent<Self::OutboundProtocol, (), Self::ToBehaviour>> {
613        // Return queued events.
614        if let Some(event) = self.queued_events.pop_front() {
615            return Poll::Ready(event);
616        }
617
618        // Progress existing circuits.
619        if let Poll::Ready(Some((circuit_id, dst_peer_id, result))) =
620            self.circuits.poll_next_unpin(cx)
621        {
622            match result {
623                Ok(()) => {
624                    return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(
625                        Event::CircuitClosed {
626                            circuit_id,
627                            dst_peer_id,
628                            error: None,
629                        },
630                    ))
631                }
632                Err(e) => {
633                    return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(
634                        Event::CircuitClosed {
635                            circuit_id,
636                            dst_peer_id,
637                            error: Some(e),
638                        },
639                    ))
640                }
641            }
642        }
643
644        // Process inbound protocol workers
645        loop {
646            match self.inbound_workers.poll_unpin(cx) {
647                Poll::Ready(Ok(Ok(Either::Left(inbound_reservation_req)))) => {
648                    return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(
649                        Event::ReservationReqReceived {
650                            inbound_reservation_req,
651                            endpoint: self.endpoint.clone(),
652                            renewed: self.active_reservation.is_some(),
653                        },
654                    ));
655                }
656                Poll::Ready(Ok(Ok(Either::Right(inbound_circuit_req)))) => {
657                    return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(
658                        Event::CircuitReqReceived {
659                            inbound_circuit_req,
660                            endpoint: self.endpoint.clone(),
661                        },
662                    ));
663                }
664                Poll::Ready(Err(e)) => {
665                    tracing::debug!("Inbound stream operation timed out: {e}");
666                    continue;
667                }
668                Poll::Ready(Ok(Err(e))) => {
669                    tracing::debug!("Inbound stream operation failed: {e}");
670                    continue;
671                }
672                Poll::Pending => {
673                    break;
674                }
675            }
676        }
677
678        // Process outbound protocol workers
679        match self.outbound_workers.poll_unpin(cx) {
680            Poll::Ready((id, Ok(Ok(circuit)))) => {
681                let connect = self
682                    .active_connect_requests
683                    .remove(&id)
684                    .expect("must have pending connect");
685
686                return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(
687                    Event::OutboundConnectNegotiated {
688                        circuit_id: id,
689                        src_peer_id: connect.src_peer_id,
690                        src_connection_id: connect.src_connection_id,
691                        inbound_circuit_req: connect.inbound_circuit_req,
692                        dst_stream: circuit.dst_stream,
693                        dst_pending_data: circuit.dst_pending_data,
694                    },
695                ));
696            }
697            Poll::Ready((id, Ok(Err(error)))) => {
698                let connect = self
699                    .active_connect_requests
700                    .remove(&id)
701                    .expect("must have pending connect");
702
703                return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(
704                    Event::OutboundConnectNegotiationFailed {
705                        circuit_id: connect.circuit_id,
706                        src_peer_id: connect.src_peer_id,
707                        src_connection_id: connect.src_connection_id,
708                        inbound_circuit_req: connect.inbound_circuit_req,
709                        status: error.to_status(),
710                        error,
711                    },
712                ));
713            }
714            Poll::Ready((id, Err(futures_bounded::Timeout { .. }))) => {
715                let connect = self
716                    .active_connect_requests
717                    .remove(&id)
718                    .expect("must have pending connect");
719
720                return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(
721                    Event::OutboundConnectNegotiationFailed {
722                        circuit_id: connect.circuit_id,
723                        src_peer_id: connect.src_peer_id,
724                        src_connection_id: connect.src_connection_id,
725                        inbound_circuit_req: connect.inbound_circuit_req,
726                        status: proto::Status::CONNECTION_FAILED, // Best fit?
727                        error: outbound_stop::Error::Io(io::ErrorKind::TimedOut.into()),
728                    },
729                ));
730            }
731            Poll::Pending => {}
732        }
733
734        // Deny new circuits.
735        if let Poll::Ready(Some((circuit_id, dst_peer_id, status, result))) =
736            self.circuit_deny_futures.poll_next_unpin(cx)
737        {
738            match result {
739                Ok(()) => {
740                    return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(
741                        Event::CircuitReqDenied {
742                            circuit_id,
743                            dst_peer_id,
744                            status,
745                        },
746                    ));
747                }
748                Err(error) => {
749                    return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(
750                        Event::CircuitReqDenyFailed {
751                            circuit_id,
752                            dst_peer_id,
753                            error,
754                        },
755                    ));
756                }
757            }
758        }
759
760        // Accept new circuits.
761        if let Poll::Ready(Some(result)) = self.circuit_accept_futures.poll_next_unpin(cx) {
762            match result {
763                Ok(parts) => {
764                    let CircuitParts {
765                        circuit_id,
766                        mut src_stream,
767                        src_pending_data,
768                        dst_peer_id,
769                        mut dst_stream,
770                        dst_pending_data,
771                    } = parts;
772                    let max_circuit_duration = self.config.max_circuit_duration;
773                    let max_circuit_bytes = self.config.max_circuit_bytes;
774
775                    let circuit = async move {
776                        let (result_1, result_2) = futures::future::join(
777                            src_stream.write_all(&dst_pending_data),
778                            dst_stream.write_all(&src_pending_data),
779                        )
780                        .await;
781                        result_1?;
782                        result_2?;
783
784                        CopyFuture::new(
785                            src_stream,
786                            dst_stream,
787                            max_circuit_duration,
788                            max_circuit_bytes,
789                        )
790                        .await?;
791
792                        Ok(())
793                    }
794                    .map(move |r| (circuit_id, dst_peer_id, r))
795                    .boxed();
796
797                    self.circuits.push(circuit);
798
799                    return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(
800                        Event::CircuitReqAccepted {
801                            circuit_id,
802                            dst_peer_id,
803                        },
804                    ));
805                }
806                Err((circuit_id, dst_peer_id, error)) => {
807                    return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(
808                        Event::CircuitReqAcceptFailed {
809                            circuit_id,
810                            dst_peer_id,
811                            error,
812                        },
813                    ));
814                }
815            }
816        }
817
818        // Check active reservation.
819        if let Some(Poll::Ready(())) = self
820            .active_reservation
821            .as_mut()
822            .map(|fut| fut.poll_unpin(cx))
823        {
824            self.active_reservation = None;
825            return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(
826                Event::ReservationTimedOut {},
827            ));
828        }
829
830        // Progress reservation request.
831        match self.reservation_request_future.as_mut() {
832            Some(ReservationRequestFuture::Accepting(fut)) => {
833                if let Poll::Ready(result) = fut.poll_unpin(cx) {
834                    self.reservation_request_future = None;
835
836                    match result {
837                        Ok(()) => {
838                            let renewed = self
839                                .active_reservation
840                                .replace(Delay::new(self.config.reservation_duration))
841                                .is_some();
842                            return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(
843                                Event::ReservationReqAccepted { renewed },
844                            ));
845                        }
846                        Err(error) => {
847                            return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(
848                                Event::ReservationReqAcceptFailed { error },
849                            ));
850                        }
851                    }
852                }
853            }
854            Some(ReservationRequestFuture::Denying(fut)) => {
855                if let Poll::Ready((status, result)) = fut.poll_unpin(cx) {
856                    self.reservation_request_future = None;
857
858                    match result {
859                        Ok(()) => {
860                            return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(
861                                Event::ReservationReqDenied { status },
862                            ))
863                        }
864                        Err(error) => {
865                            return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(
866                                Event::ReservationReqDenyFailed { error },
867                            ));
868                        }
869                    }
870                }
871            }
872            None => {}
873        }
874
875        // Check keep alive status.
876        if self.active_reservation.is_none() {
877            if self.idle_at.is_none() {
878                self.idle_at = Some(Instant::now());
879            }
880        } else {
881            self.idle_at = None;
882        }
883
884        Poll::Pending
885    }
886
887    fn on_connection_event(
888        &mut self,
889        event: ConnectionEvent<Self::InboundProtocol, Self::OutboundProtocol>,
890    ) {
891        match event {
892            ConnectionEvent::FullyNegotiatedInbound(FullyNegotiatedInbound {
893                protocol: stream,
894                ..
895            }) => {
896                self.on_fully_negotiated_inbound(stream);
897            }
898            ConnectionEvent::FullyNegotiatedOutbound(FullyNegotiatedOutbound {
899                protocol: stream,
900                ..
901            }) => {
902                self.on_fully_negotiated_outbound(stream);
903            }
904            ConnectionEvent::DialUpgradeError(dial_upgrade_error) => {
905                self.on_dial_upgrade_error(dial_upgrade_error);
906            }
907            _ => {}
908        }
909    }
910}
911
912struct CircuitParts {
913    circuit_id: CircuitId,
914    src_stream: Stream,
915    src_pending_data: Bytes,
916    dst_peer_id: PeerId,
917    dst_stream: Stream,
918    dst_pending_data: Bytes,
919}
920
921/// Holds everything we know about a to-be-issued `CONNECT` request to a peer.
922struct PendingConnect {
923    circuit_id: CircuitId,
924    inbound_circuit_req: inbound_hop::CircuitReq,
925    src_peer_id: PeerId,
926    src_connection_id: ConnectionId,
927    max_circuit_duration: Duration,
928    max_circuit_bytes: u64,
929}
930
931impl PendingConnect {
932    fn new(
933        circuit_id: CircuitId,
934        inbound_circuit_req: inbound_hop::CircuitReq,
935        src_peer_id: PeerId,
936        src_connection_id: ConnectionId,
937        config: &Config,
938    ) -> Self {
939        Self {
940            circuit_id,
941            inbound_circuit_req,
942            src_peer_id,
943            src_connection_id,
944            max_circuit_duration: config.max_circuit_duration,
945            max_circuit_bytes: config.max_circuit_bytes,
946        }
947    }
948}