libp2p_dcutr/handler/
relayed.rs

1// Copyright 2021 Protocol Labs.
2//
3// Permission is hereby granted, free of charge, to any person obtaining a
4// copy of this software and associated documentation files (the "Software"),
5// to deal in the Software without restriction, including without limitation
6// the rights to use, copy, modify, merge, publish, distribute, sublicense,
7// and/or sell copies of the Software, and to permit persons to whom the
8// Software is furnished to do so, subject to the following conditions:
9//
10// The above copyright notice and this permission notice shall be included in
11// all copies or substantial portions of the Software.
12//
13// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
14// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
18// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
19// DEALINGS IN THE SOFTWARE.
20
//! [`ConnectionHandler`] handling a relayed connection that may be upgraded to a direct connection.
22
23use std::{
24    collections::VecDeque,
25    io,
26    task::{Context, Poll},
27    time::Duration,
28};
29
30use either::Either;
31use futures::future;
32use libp2p_core::{
33    multiaddr::Multiaddr,
34    upgrade::{DeniedUpgrade, ReadyUpgrade},
35    ConnectedPoint,
36};
37use libp2p_swarm::{
38    handler::{
39        ConnectionEvent, DialUpgradeError, FullyNegotiatedInbound, FullyNegotiatedOutbound,
40        ListenUpgradeError,
41    },
42    ConnectionHandler, ConnectionHandlerEvent, StreamProtocol, StreamUpgradeError,
43    SubstreamProtocol,
44};
45use protocol::{inbound, outbound};
46
47use crate::{behaviour::MAX_NUMBER_OF_UPGRADE_ATTEMPTS, protocol, PROTOCOL_NAME};
48
/// Instructions the behaviour sends to a relayed-connection [`Handler`].
#[derive(Debug)]
pub enum Command {
    /// Ask the handler to open an outbound substream for the connection-upgrade protocol.
    Connect,
}
53
54#[derive(Debug)]
55pub enum Event {
56    InboundConnectNegotiated { remote_addrs: Vec<Multiaddr> },
57    OutboundConnectNegotiated { remote_addrs: Vec<Multiaddr> },
58    InboundConnectFailed { error: inbound::Error },
59    OutboundConnectFailed { error: outbound::Error },
60}
61
62pub struct Handler {
63    endpoint: ConnectedPoint,
64    /// Queue of events to return when polled.
65    queued_events: VecDeque<
66        ConnectionHandlerEvent<
67            <Self as ConnectionHandler>::OutboundProtocol,
68            (),
69            <Self as ConnectionHandler>::ToBehaviour,
70        >,
71    >,
72
73    // Inbound DCUtR handshakes
74    inbound_stream: futures_bounded::FuturesSet<Result<Vec<Multiaddr>, inbound::Error>>,
75
76    // Outbound DCUtR handshake.
77    outbound_stream: futures_bounded::FuturesSet<Result<Vec<Multiaddr>, outbound::Error>>,
78
79    /// The addresses we will send to the other party for hole-punching attempts.
80    holepunch_candidates: Vec<Multiaddr>,
81
82    attempts: u8,
83}
84
85impl Handler {
86    pub fn new(endpoint: ConnectedPoint, holepunch_candidates: Vec<Multiaddr>) -> Self {
87        Self {
88            endpoint,
89            queued_events: Default::default(),
90            inbound_stream: futures_bounded::FuturesSet::new(Duration::from_secs(10), 1),
91            outbound_stream: futures_bounded::FuturesSet::new(Duration::from_secs(10), 1),
92            holepunch_candidates,
93            attempts: 0,
94        }
95    }
96
97    fn on_fully_negotiated_inbound(
98        &mut self,
99        FullyNegotiatedInbound {
100            protocol: output, ..
101        }: FullyNegotiatedInbound<<Self as ConnectionHandler>::InboundProtocol>,
102    ) {
103        match output {
104            future::Either::Left(stream) => {
105                if self
106                    .inbound_stream
107                    .try_push(inbound::handshake(
108                        stream,
109                        self.holepunch_candidates.clone(),
110                    ))
111                    .is_err()
112                {
113                    tracing::warn!(
114                        "New inbound connect stream while still upgrading previous one. Replacing previous with new.",
115                    );
116                }
117                self.attempts += 1;
118            }
119            // A connection listener denies all incoming substreams, thus none can ever be fully
120            // negotiated.
121            future::Either::Right(output) => libp2p_core::util::unreachable(output),
122        }
123    }
124
125    fn on_fully_negotiated_outbound(
126        &mut self,
127        FullyNegotiatedOutbound {
128            protocol: stream, ..
129        }: FullyNegotiatedOutbound<<Self as ConnectionHandler>::OutboundProtocol>,
130    ) {
131        assert!(
132            self.endpoint.is_listener(),
133            "A connection dialer never initiates a connection upgrade."
134        );
135        if self
136            .outbound_stream
137            .try_push(outbound::handshake(
138                stream,
139                self.holepunch_candidates.clone(),
140            ))
141            .is_err()
142        {
143            tracing::warn!(
144                "New outbound connect stream while still upgrading previous one. Replacing previous with new.",
145            );
146        }
147    }
148
149    fn on_listen_upgrade_error(
150        &mut self,
151        ListenUpgradeError { error, .. }: ListenUpgradeError<
152            (),
153            <Self as ConnectionHandler>::InboundProtocol,
154        >,
155    ) {
156        libp2p_core::util::unreachable(error.into_inner());
157    }
158
159    fn on_dial_upgrade_error(
160        &mut self,
161        DialUpgradeError { error, .. }: DialUpgradeError<
162            (),
163            <Self as ConnectionHandler>::OutboundProtocol,
164        >,
165    ) {
166        let error = match error {
167            StreamUpgradeError::Apply(v) => libp2p_core::util::unreachable(v),
168            StreamUpgradeError::NegotiationFailed => outbound::Error::Unsupported,
169            StreamUpgradeError::Io(e) => outbound::Error::Io(e),
170            StreamUpgradeError::Timeout => outbound::Error::Io(io::ErrorKind::TimedOut.into()),
171        };
172
173        self.queued_events
174            .push_back(ConnectionHandlerEvent::NotifyBehaviour(
175                Event::OutboundConnectFailed { error },
176            ))
177    }
178}
179
180impl ConnectionHandler for Handler {
181    type FromBehaviour = Command;
182    type ToBehaviour = Event;
183    type InboundProtocol = Either<ReadyUpgrade<StreamProtocol>, DeniedUpgrade>;
184    type OutboundProtocol = ReadyUpgrade<StreamProtocol>;
185    type OutboundOpenInfo = ();
186    type InboundOpenInfo = ();
187
188    fn listen_protocol(&self) -> SubstreamProtocol<Self::InboundProtocol> {
189        match self.endpoint {
190            ConnectedPoint::Dialer { .. } => {
191                SubstreamProtocol::new(Either::Left(ReadyUpgrade::new(PROTOCOL_NAME)), ())
192            }
193            ConnectedPoint::Listener { .. } => {
194                // By the protocol specification the listening side of a relayed connection
195                // initiates the _direct connection upgrade_. In other words the listening side of
196                // the relayed connection opens a substream to the dialing side. (Connection roles
197                // and substream roles are reversed.) The listening side on a relayed connection
198                // never expects incoming substreams, hence the denied upgrade below.
199                SubstreamProtocol::new(Either::Right(DeniedUpgrade), ())
200            }
201        }
202    }
203
204    fn on_behaviour_event(&mut self, event: Self::FromBehaviour) {
205        match event {
206            Command::Connect => {
207                self.queued_events
208                    .push_back(ConnectionHandlerEvent::OutboundSubstreamRequest {
209                        protocol: SubstreamProtocol::new(ReadyUpgrade::new(PROTOCOL_NAME), ()),
210                    });
211                self.attempts += 1;
212            }
213        }
214    }
215
216    fn connection_keep_alive(&self) -> bool {
217        if self.attempts < MAX_NUMBER_OF_UPGRADE_ATTEMPTS {
218            return true;
219        }
220
221        false
222    }
223
224    #[tracing::instrument(level = "trace", name = "ConnectionHandler::poll", skip(self, cx))]
225    fn poll(
226        &mut self,
227        cx: &mut Context<'_>,
228    ) -> Poll<ConnectionHandlerEvent<Self::OutboundProtocol, (), Self::ToBehaviour>> {
229        // Return queued events.
230        if let Some(event) = self.queued_events.pop_front() {
231            return Poll::Ready(event);
232        }
233
234        match self.inbound_stream.poll_unpin(cx) {
235            Poll::Ready(Ok(Ok(addresses))) => {
236                return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(
237                    Event::InboundConnectNegotiated {
238                        remote_addrs: addresses,
239                    },
240                ))
241            }
242            Poll::Ready(Ok(Err(error))) => {
243                return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(
244                    Event::InboundConnectFailed { error },
245                ))
246            }
247            Poll::Ready(Err(futures_bounded::Timeout { .. })) => {
248                return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(
249                    Event::InboundConnectFailed {
250                        error: inbound::Error::Io(io::ErrorKind::TimedOut.into()),
251                    },
252                ))
253            }
254            Poll::Pending => {}
255        }
256
257        match self.outbound_stream.poll_unpin(cx) {
258            Poll::Ready(Ok(Ok(addresses))) => {
259                return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(
260                    Event::OutboundConnectNegotiated {
261                        remote_addrs: addresses,
262                    },
263                ))
264            }
265            Poll::Ready(Ok(Err(error))) => {
266                return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(
267                    Event::OutboundConnectFailed { error },
268                ))
269            }
270            Poll::Ready(Err(futures_bounded::Timeout { .. })) => {
271                return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(
272                    Event::OutboundConnectFailed {
273                        error: outbound::Error::Io(io::ErrorKind::TimedOut.into()),
274                    },
275                ))
276            }
277            Poll::Pending => {}
278        }
279
280        Poll::Pending
281    }
282
283    fn on_connection_event(
284        &mut self,
285        event: ConnectionEvent<Self::InboundProtocol, Self::OutboundProtocol>,
286    ) {
287        match event {
288            ConnectionEvent::FullyNegotiatedInbound(fully_negotiated_inbound) => {
289                self.on_fully_negotiated_inbound(fully_negotiated_inbound)
290            }
291            ConnectionEvent::FullyNegotiatedOutbound(fully_negotiated_outbound) => {
292                self.on_fully_negotiated_outbound(fully_negotiated_outbound)
293            }
294            ConnectionEvent::ListenUpgradeError(listen_upgrade_error) => {
295                self.on_listen_upgrade_error(listen_upgrade_error)
296            }
297            ConnectionEvent::DialUpgradeError(dial_upgrade_error) => {
298                self.on_dial_upgrade_error(dial_upgrade_error)
299            }
300            _ => {}
301        }
302    }
303}