libp2p_relay/priv_client/
handler.rs

// Copyright 2021 Protocol Labs.
//
// Permission is hereby granted, free of charge, to any person obtaining a
// copy of this software and associated documentation files (the "Software"),
// to deal in the Software without restriction, including without limitation
// the rights to use, copy, modify, merge, publish, distribute, sublicense,
// and/or sell copies of the Software, and to permit persons to whom the
// Software is furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.
20
21use std::{
22    collections::VecDeque,
23    convert::Infallible,
24    fmt, io,
25    task::{Context, Poll},
26    time::Duration,
27};
28
29use futures::{
30    channel::{mpsc, mpsc::Sender, oneshot},
31    future::FutureExt,
32};
33use futures_timer::Delay;
34use libp2p_core::{multiaddr::Protocol, upgrade::ReadyUpgrade, Multiaddr};
35use libp2p_identity::PeerId;
36use libp2p_swarm::{
37    handler::{ConnectionEvent, FullyNegotiatedInbound},
38    ConnectionHandler, ConnectionHandlerEvent, Stream, StreamProtocol, StreamUpgradeError,
39    SubstreamProtocol,
40};
41
42use crate::{
43    client::Connection,
44    priv_client,
45    priv_client::{transport, transport::ToListenerMsg},
46    proto,
47    protocol::{self, inbound_stop, outbound_hop},
48    HOP_PROTOCOL_NAME, STOP_PROTOCOL_NAME,
49};
50
/// Timeout applied to each in-flight denial of an inbound circuit.
const DENYING_CIRCUIT_TIMEOUT: Duration = Duration::from_secs(60);
/// The maximum number of circuits being denied concurrently.
///
/// Circuits to be denied exceeding the limit are dropped.
const MAX_NUMBER_DENYING_CIRCUIT: usize = 8;

/// Timeout applied to each in-flight reservation, outbound connect and inbound circuit future.
const STREAM_TIMEOUT: Duration = Duration::from_secs(60);
/// Capacity of each bounded set of in-flight stream futures kept by one handler.
const MAX_CONCURRENT_STREAMS_PER_CONNECTION: usize = 10;
59
60pub enum In {
61    Reserve {
62        to_listener: mpsc::Sender<transport::ToListenerMsg>,
63    },
64    EstablishCircuit {
65        dst_peer_id: PeerId,
66        to_dial: oneshot::Sender<Result<priv_client::Connection, outbound_hop::ConnectError>>,
67    },
68}
69
70impl fmt::Debug for In {
71    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
72        match self {
73            In::Reserve { to_listener: _ } => f.debug_struct("In::Reserve").finish(),
74            In::EstablishCircuit {
75                dst_peer_id,
76                to_dial: _,
77            } => f
78                .debug_struct("In::EstablishCircuit")
79                .field("dst_peer_id", dst_peer_id)
80                .finish(),
81        }
82    }
83}
84
85#[derive(Debug)]
86pub enum Event {
87    ReservationReqAccepted {
88        /// Indicates whether the request replaces an existing reservation.
89        renewal: bool,
90        limit: Option<protocol::Limit>,
91    },
92    /// An outbound circuit has been established.
93    OutboundCircuitEstablished { limit: Option<protocol::Limit> },
94    /// An inbound circuit has been established.
95    InboundCircuitEstablished {
96        src_peer_id: PeerId,
97        limit: Option<protocol::Limit>,
98    },
99}
100
101pub struct Handler {
102    local_peer_id: PeerId,
103    remote_peer_id: PeerId,
104    remote_addr: Multiaddr,
105
106    /// Queue of events to return when polled.
107    queued_events: VecDeque<
108        ConnectionHandlerEvent<
109            <Handler as ConnectionHandler>::OutboundProtocol,
110            (),
111            <Handler as ConnectionHandler>::ToBehaviour,
112        >,
113    >,
114
115    pending_streams: VecDeque<oneshot::Sender<Result<Stream, StreamUpgradeError<Infallible>>>>,
116
117    inflight_reserve_requests: futures_bounded::FuturesTupleSet<
118        Result<outbound_hop::Reservation, outbound_hop::ReserveError>,
119        mpsc::Sender<transport::ToListenerMsg>,
120    >,
121
122    inflight_outbound_connect_requests: futures_bounded::FuturesTupleSet<
123        Result<outbound_hop::Circuit, outbound_hop::ConnectError>,
124        oneshot::Sender<Result<priv_client::Connection, outbound_hop::ConnectError>>,
125    >,
126
127    inflight_inbound_circuit_requests:
128        futures_bounded::FuturesSet<Result<inbound_stop::Circuit, inbound_stop::Error>>,
129
130    inflight_outbound_circuit_deny_requests:
131        futures_bounded::FuturesSet<Result<(), inbound_stop::Error>>,
132
133    reservation: Reservation,
134}
135
136impl Handler {
137    pub fn new(local_peer_id: PeerId, remote_peer_id: PeerId, remote_addr: Multiaddr) -> Self {
138        Self {
139            local_peer_id,
140            remote_peer_id,
141            remote_addr,
142            queued_events: Default::default(),
143            pending_streams: Default::default(),
144            inflight_reserve_requests: futures_bounded::FuturesTupleSet::new(
145                STREAM_TIMEOUT,
146                MAX_CONCURRENT_STREAMS_PER_CONNECTION,
147            ),
148            inflight_inbound_circuit_requests: futures_bounded::FuturesSet::new(
149                STREAM_TIMEOUT,
150                MAX_CONCURRENT_STREAMS_PER_CONNECTION,
151            ),
152            inflight_outbound_connect_requests: futures_bounded::FuturesTupleSet::new(
153                STREAM_TIMEOUT,
154                MAX_CONCURRENT_STREAMS_PER_CONNECTION,
155            ),
156            inflight_outbound_circuit_deny_requests: futures_bounded::FuturesSet::new(
157                DENYING_CIRCUIT_TIMEOUT,
158                MAX_NUMBER_DENYING_CIRCUIT,
159            ),
160            reservation: Reservation::None,
161        }
162    }
163
164    fn insert_to_deny_futs(&mut self, circuit: inbound_stop::Circuit) {
165        let src_peer_id = circuit.src_peer_id();
166
167        if self
168            .inflight_outbound_circuit_deny_requests
169            .try_push(circuit.deny(proto::Status::NO_RESERVATION))
170            .is_err()
171        {
172            tracing::warn!(
173                peer=%src_peer_id,
174                "Dropping existing inbound circuit request to be denied from peer in favor of new one"
175            )
176        }
177    }
178
179    fn make_new_reservation(&mut self, to_listener: Sender<ToListenerMsg>) {
180        let (sender, receiver) = oneshot::channel();
181
182        self.pending_streams.push_back(sender);
183        self.queued_events
184            .push_back(ConnectionHandlerEvent::OutboundSubstreamRequest {
185                protocol: SubstreamProtocol::new(ReadyUpgrade::new(HOP_PROTOCOL_NAME), ()),
186            });
187        let result = self.inflight_reserve_requests.try_push(
188            async move {
189                let stream = receiver
190                    .await
191                    .map_err(|_| io::Error::from(io::ErrorKind::BrokenPipe))?
192                    .map_err(into_reserve_error)?;
193
194                let reservation = outbound_hop::make_reservation(stream).await?;
195
196                Ok(reservation)
197            },
198            to_listener,
199        );
200
201        if result.is_err() {
202            tracing::warn!("Dropping in-flight reservation request because we are at capacity");
203        }
204    }
205
206    fn establish_new_circuit(
207        &mut self,
208        to_dial: oneshot::Sender<Result<Connection, outbound_hop::ConnectError>>,
209        dst_peer_id: PeerId,
210    ) {
211        let (sender, receiver) = oneshot::channel();
212
213        self.pending_streams.push_back(sender);
214        self.queued_events
215            .push_back(ConnectionHandlerEvent::OutboundSubstreamRequest {
216                protocol: SubstreamProtocol::new(ReadyUpgrade::new(HOP_PROTOCOL_NAME), ()),
217            });
218        let result = self.inflight_outbound_connect_requests.try_push(
219            async move {
220                let stream = receiver
221                    .await
222                    .map_err(|_| io::Error::from(io::ErrorKind::BrokenPipe))?
223                    .map_err(into_connect_error)?;
224
225                outbound_hop::open_circuit(stream, dst_peer_id).await
226            },
227            to_dial,
228        );
229
230        if result.is_err() {
231            tracing::warn!("Dropping in-flight connect request because we are at capacity")
232        }
233    }
234}
235
236impl ConnectionHandler for Handler {
237    type FromBehaviour = In;
238    type ToBehaviour = Event;
239    type InboundProtocol = ReadyUpgrade<StreamProtocol>;
240    type InboundOpenInfo = ();
241    type OutboundProtocol = ReadyUpgrade<StreamProtocol>;
242    type OutboundOpenInfo = ();
243
244    fn listen_protocol(&self) -> SubstreamProtocol<Self::InboundProtocol> {
245        SubstreamProtocol::new(ReadyUpgrade::new(STOP_PROTOCOL_NAME), ())
246    }
247
248    fn on_behaviour_event(&mut self, event: Self::FromBehaviour) {
249        match event {
250            In::Reserve { to_listener } => {
251                self.make_new_reservation(to_listener);
252            }
253            In::EstablishCircuit {
254                to_dial,
255                dst_peer_id,
256            } => {
257                self.establish_new_circuit(to_dial, dst_peer_id);
258            }
259        }
260    }
261
262    fn connection_keep_alive(&self) -> bool {
263        self.reservation.is_some()
264    }
265
266    #[tracing::instrument(level = "trace", name = "ConnectionHandler::poll", skip(self, cx))]
267    fn poll(
268        &mut self,
269        cx: &mut Context<'_>,
270    ) -> Poll<ConnectionHandlerEvent<Self::OutboundProtocol, (), Self::ToBehaviour>> {
271        loop {
272            // Reservations
273            match self.inflight_reserve_requests.poll_unpin(cx) {
274                Poll::Ready((
275                    Ok(Ok(outbound_hop::Reservation {
276                        renewal_timeout,
277                        addrs,
278                        limit,
279                    })),
280                    to_listener,
281                )) => {
282                    return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(
283                        self.reservation.accepted(
284                            renewal_timeout,
285                            addrs,
286                            to_listener,
287                            self.local_peer_id,
288                            limit,
289                        ),
290                    ));
291                }
292                Poll::Ready((Ok(Err(error)), mut to_listener)) => {
293                    if let Err(e) =
294                        to_listener.try_send(transport::ToListenerMsg::Reservation(Err(error)))
295                    {
296                        tracing::debug!("Unable to send error to listener: {}", e.into_send_error())
297                    }
298                    self.reservation.failed();
299                    continue;
300                }
301                Poll::Ready((Err(futures_bounded::Timeout { .. }), mut to_listener)) => {
302                    if let Err(e) =
303                        to_listener.try_send(transport::ToListenerMsg::Reservation(Err(
304                            outbound_hop::ReserveError::Io(io::ErrorKind::TimedOut.into()),
305                        )))
306                    {
307                        tracing::debug!("Unable to send error to listener: {}", e.into_send_error())
308                    }
309                    self.reservation.failed();
310                    continue;
311                }
312                Poll::Pending => {}
313            }
314
315            // Circuits
316            match self.inflight_outbound_connect_requests.poll_unpin(cx) {
317                Poll::Ready((
318                    Ok(Ok(outbound_hop::Circuit {
319                        limit,
320                        read_buffer,
321                        stream,
322                    })),
323                    to_dialer,
324                )) => {
325                    if to_dialer
326                        .send(Ok(priv_client::Connection {
327                            state: priv_client::ConnectionState::new_outbound(stream, read_buffer),
328                        }))
329                        .is_err()
330                    {
331                        tracing::debug!(
332                            "Dropping newly established circuit because the listener is gone"
333                        );
334                        continue;
335                    }
336
337                    return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(
338                        Event::OutboundCircuitEstablished { limit },
339                    ));
340                }
341                Poll::Ready((Ok(Err(error)), to_dialer)) => {
342                    let _ = to_dialer.send(Err(error));
343                    continue;
344                }
345                Poll::Ready((Err(futures_bounded::Timeout { .. }), to_dialer)) => {
346                    if to_dialer
347                        .send(Err(outbound_hop::ConnectError::Io(
348                            io::ErrorKind::TimedOut.into(),
349                        )))
350                        .is_err()
351                    {
352                        tracing::debug!("Unable to send error to dialer")
353                    }
354                    self.reservation.failed();
355                    continue;
356                }
357                Poll::Pending => {}
358            }
359
360            // Return queued events.
361            if let Some(event) = self.queued_events.pop_front() {
362                return Poll::Ready(event);
363            }
364
365            match self.inflight_inbound_circuit_requests.poll_unpin(cx) {
366                Poll::Ready(Ok(Ok(circuit))) => match &mut self.reservation {
367                    Reservation::Accepted { pending_msgs, .. }
368                    | Reservation::Renewing { pending_msgs, .. } => {
369                        let src_peer_id = circuit.src_peer_id();
370                        let limit = circuit.limit();
371
372                        let connection = super::ConnectionState::new_inbound(circuit);
373
374                        pending_msgs.push_back(
375                            transport::ToListenerMsg::IncomingRelayedConnection {
376                                stream: super::Connection { state: connection },
377                                src_peer_id,
378                                relay_peer_id: self.remote_peer_id,
379                                relay_addr: self.remote_addr.clone(),
380                            },
381                        );
382                        return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(
383                            Event::InboundCircuitEstablished { src_peer_id, limit },
384                        ));
385                    }
386                    Reservation::None => {
387                        self.insert_to_deny_futs(circuit);
388                        continue;
389                    }
390                },
391                Poll::Ready(Ok(Err(e))) => {
392                    tracing::debug!("An inbound circuit request failed: {e}");
393                    continue;
394                }
395                Poll::Ready(Err(e)) => {
396                    tracing::debug!("An inbound circuit request timed out: {e}");
397                    continue;
398                }
399                Poll::Pending => {}
400            }
401
402            if let Poll::Ready(Some(to_listener)) = self.reservation.poll(cx) {
403                self.make_new_reservation(to_listener);
404                continue;
405            }
406
407            // Deny incoming circuit requests.
408            match self.inflight_outbound_circuit_deny_requests.poll_unpin(cx) {
409                Poll::Ready(Ok(Ok(()))) => continue,
410                Poll::Ready(Ok(Err(error))) => {
411                    tracing::debug!("Denying inbound circuit failed: {error}");
412                    continue;
413                }
414                Poll::Ready(Err(futures_bounded::Timeout { .. })) => {
415                    tracing::debug!("Denying inbound circuit timed out");
416                    continue;
417                }
418                Poll::Pending => {}
419            }
420
421            return Poll::Pending;
422        }
423    }
424
425    fn on_connection_event(
426        &mut self,
427        event: ConnectionEvent<Self::InboundProtocol, Self::OutboundProtocol>,
428    ) {
429        match event {
430            ConnectionEvent::FullyNegotiatedInbound(FullyNegotiatedInbound {
431                protocol: stream,
432                ..
433            }) => {
434                if self
435                    .inflight_inbound_circuit_requests
436                    .try_push(inbound_stop::handle_open_circuit(stream))
437                    .is_err()
438                {
439                    tracing::warn!("Dropping inbound stream because we are at capacity")
440                }
441            }
442            ConnectionEvent::FullyNegotiatedOutbound(ev) => {
443                if let Some(next) = self.pending_streams.pop_front() {
444                    let _ = next.send(Ok(ev.protocol));
445                }
446            }
447            ConnectionEvent::ListenUpgradeError(ev) => libp2p_core::util::unreachable(ev.error),
448            ConnectionEvent::DialUpgradeError(ev) => {
449                if let Some(next) = self.pending_streams.pop_front() {
450                    let _ = next.send(Err(ev.error));
451                }
452            }
453            _ => {}
454        }
455    }
456}
457
458enum Reservation {
459    /// The Reservation is accepted by the relay.
460    Accepted {
461        renewal_timeout: Delay,
462        /// Buffer of messages to be send to the transport listener.
463        pending_msgs: VecDeque<transport::ToListenerMsg>,
464        to_listener: mpsc::Sender<transport::ToListenerMsg>,
465    },
466    /// The reservation is being renewed with the relay.
467    Renewing {
468        /// Buffer of messages to be send to the transport listener.
469        pending_msgs: VecDeque<transport::ToListenerMsg>,
470    },
471    None,
472}
473
474impl Reservation {
475    fn accepted(
476        &mut self,
477        renewal_timeout: Delay,
478        addrs: Vec<Multiaddr>,
479        to_listener: mpsc::Sender<transport::ToListenerMsg>,
480        local_peer_id: PeerId,
481        limit: Option<protocol::Limit>,
482    ) -> Event {
483        let (renewal, mut pending_msgs) = match std::mem::replace(self, Self::None) {
484            Reservation::Accepted { pending_msgs, .. }
485            | Reservation::Renewing { pending_msgs, .. } => (true, pending_msgs),
486            Reservation::None => (false, VecDeque::new()),
487        };
488
489        pending_msgs.push_back(transport::ToListenerMsg::Reservation(Ok(
490            transport::Reservation {
491                addrs: addrs
492                    .into_iter()
493                    .map(|a| {
494                        a.with(Protocol::P2pCircuit)
495                            .with(Protocol::P2p(local_peer_id))
496                    })
497                    .collect(),
498            },
499        )));
500
501        *self = Reservation::Accepted {
502            renewal_timeout,
503            pending_msgs,
504            to_listener,
505        };
506
507        Event::ReservationReqAccepted { renewal, limit }
508    }
509
510    fn is_some(&self) -> bool {
511        matches!(self, Self::Accepted { .. } | Self::Renewing { .. })
512    }
513
514    /// Marks the current reservation as failed.
515    fn failed(&mut self) {
516        *self = Reservation::None;
517    }
518
519    fn forward_messages_to_transport_listener(&mut self, cx: &mut Context<'_>) {
520        if let Reservation::Accepted {
521            pending_msgs,
522            to_listener,
523            ..
524        } = self
525        {
526            if !pending_msgs.is_empty() {
527                match to_listener.poll_ready(cx) {
528                    Poll::Ready(Ok(())) => {
529                        if let Err(e) = to_listener
530                            .start_send(pending_msgs.pop_front().expect("Called !is_empty()."))
531                        {
532                            tracing::debug!("Failed to sent pending message to listener: {:?}", e);
533                            *self = Reservation::None;
534                        }
535                    }
536                    Poll::Ready(Err(e)) => {
537                        tracing::debug!("Channel to listener failed: {:?}", e);
538                        *self = Reservation::None;
539                    }
540                    Poll::Pending => {}
541                }
542            }
543        }
544    }
545
546    fn poll(
547        &mut self,
548        cx: &mut Context<'_>,
549    ) -> Poll<Option<mpsc::Sender<transport::ToListenerMsg>>> {
550        self.forward_messages_to_transport_listener(cx);
551
552        // Check renewal timeout if any.
553        let (next_reservation, poll_val) = match std::mem::replace(self, Reservation::None) {
554            Reservation::Accepted {
555                mut renewal_timeout,
556                pending_msgs,
557                to_listener,
558            } => match renewal_timeout.poll_unpin(cx) {
559                Poll::Ready(()) => (
560                    Reservation::Renewing { pending_msgs },
561                    Poll::Ready(Some(to_listener)),
562                ),
563                Poll::Pending => (
564                    Reservation::Accepted {
565                        renewal_timeout,
566                        pending_msgs,
567                        to_listener,
568                    },
569                    Poll::Pending,
570                ),
571            },
572            r => (r, Poll::Pending),
573        };
574        *self = next_reservation;
575
576        poll_val
577    }
578}
579
580fn into_reserve_error(e: StreamUpgradeError<Infallible>) -> outbound_hop::ReserveError {
581    match e {
582        StreamUpgradeError::Timeout => {
583            outbound_hop::ReserveError::Io(io::ErrorKind::TimedOut.into())
584        }
585        StreamUpgradeError::Apply(never) => libp2p_core::util::unreachable(never),
586        StreamUpgradeError::NegotiationFailed => outbound_hop::ReserveError::Unsupported,
587        StreamUpgradeError::Io(e) => outbound_hop::ReserveError::Io(e),
588    }
589}
590
591fn into_connect_error(e: StreamUpgradeError<Infallible>) -> outbound_hop::ConnectError {
592    match e {
593        StreamUpgradeError::Timeout => {
594            outbound_hop::ConnectError::Io(io::ErrorKind::TimedOut.into())
595        }
596        StreamUpgradeError::Apply(never) => libp2p_core::util::unreachable(never),
597        StreamUpgradeError::NegotiationFailed => outbound_hop::ConnectError::Unsupported,
598        StreamUpgradeError::Io(e) => outbound_hop::ConnectError::Io(e),
599    }
600}