libp2p_relay/
priv_client.rs

1// Copyright 2021 Protocol Labs.
2//
3// Permission is hereby granted, free of charge, to any person obtaining a
4// copy of this software and associated documentation files (the "Software"),
5// to deal in the Software without restriction, including without limitation
6// the rights to use, copy, modify, merge, publish, distribute, sublicense,
7// and/or sell copies of the Software, and to permit persons to whom the
8// Software is furnished to do so, subject to the following conditions:
9//
10// The above copyright notice and this permission notice shall be included in
11// all copies or substantial portions of the Software.
12//
13// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
14// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
18// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
19// DEALINGS IN THE SOFTWARE.
20
21//! [`NetworkBehaviour`] to act as a circuit relay v2 **client**.
22
23pub(crate) mod handler;
24pub(crate) mod transport;
25
26use std::{
27    collections::{hash_map, HashMap, VecDeque},
28    convert::Infallible,
29    io::{Error, ErrorKind, IoSlice},
30    pin::Pin,
31    task::{Context, Poll},
32};
33
34use bytes::Bytes;
35use either::Either;
36use futures::{
37    channel::mpsc::Receiver,
38    future::{BoxFuture, FutureExt},
39    io::{AsyncRead, AsyncWrite},
40    ready,
41    stream::StreamExt,
42};
43use libp2p_core::{multiaddr::Protocol, transport::PortUse, Endpoint, Multiaddr};
44use libp2p_identity::PeerId;
45use libp2p_swarm::{
46    behaviour::{ConnectionClosed, ConnectionEstablished, FromSwarm},
47    dial_opts::DialOpts,
48    dummy, ConnectionDenied, ConnectionHandler, ConnectionId, DialFailure, NetworkBehaviour,
49    NotifyHandler, Stream, THandler, THandlerInEvent, THandlerOutEvent, ToSwarm,
50};
51use transport::Transport;
52
53use crate::{
54    multiaddr_ext::MultiaddrExt,
55    priv_client::handler::Handler,
56    protocol::{self, inbound_stop},
57};
58
/// The events produced by the client `Behaviour`.
///
/// Each variant is built in `on_connection_handler_event` from the matching
/// `handler::Event` variant and handed to the swarm as a generated event.
#[derive(Debug)]
pub enum Event {
    /// An outbound reservation has been accepted.
    ReservationReqAccepted {
        /// The peer whose connection handler reported the accepted reservation.
        relay_peer_id: PeerId,
        /// Indicates whether the request replaces an existing reservation.
        renewal: bool,
        /// Limit reported by the handler together with the acceptance, if any.
        // NOTE(review): presumably the limit the relay imposes on circuits —
        // confirm against `protocol::Limit`.
        limit: Option<protocol::Limit>,
    },
    /// An outbound circuit has been established.
    OutboundCircuitEstablished {
        /// The peer whose connection handler reported the established circuit.
        relay_peer_id: PeerId,
        /// Limit reported by the handler for this circuit, if any.
        limit: Option<protocol::Limit>,
    },
    /// An inbound circuit has been established.
    InboundCircuitEstablished {
        /// The source peer, as reported by the handler event.
        src_peer_id: PeerId,
        /// Limit reported by the handler for this circuit, if any.
        limit: Option<protocol::Limit>,
    },
}
79
/// State of a reservation address stored in `Behaviour::reservation_addresses`.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
enum ReservationStatus {
    /// The reservation has been requested but no acceptance has been seen yet.
    ///
    /// Entries are inserted in this state when a listen request is processed.
    Pending,
    /// The handler reported the reservation as accepted and the address was
    /// announced to the swarm as a confirmed external address.
    ///
    /// Only entries in this state produce an `ExternalAddrExpired` action when
    /// their connection closes.
    Confirmed,
}
85
/// [`NetworkBehaviour`] implementation of the relay client
/// functionality of the circuit relay v2 protocol.
pub struct Behaviour {
    /// The local peer ID. It is passed to every new `Handler` and appended to
    /// the `/p2p-circuit` addresses stored in `reservation_addresses`.
    local_peer_id: PeerId,

    /// Receiving end of the channel on which the paired [`Transport`] sends its
    /// listen and dial requests; drained in `poll`.
    from_transport: Receiver<transport::TransportToBehaviourMsg>,
    /// Set of directly connected peers, i.e. not connected via a relayed
    /// connection.
    ///
    /// Each peer maps to the IDs of its direct connections, in the order they
    /// were established.
    directly_connected_peers: HashMap<PeerId, Vec<ConnectionId>>,

    /// Stores the address of a pending or confirmed reservation.
    ///
    /// This is indexed by the [`ConnectionId`] to a relay server and the address is the
    /// `/p2p-circuit` address we reserved on it.
    reservation_addresses: HashMap<ConnectionId, (Multiaddr, ReservationStatus)>,

    /// Queue of actions to return when polled.
    queued_actions: VecDeque<ToSwarm<Event, Either<handler::In, Infallible>>>,

    /// Handler commands waiting for a dial started by this behaviour, keyed by
    /// the [`ConnectionId`] of that dial.
    ///
    /// An entry is taken out when the connection with that ID is established
    /// and is discarded when the dial fails.
    pending_handler_commands: HashMap<ConnectionId, handler::In>,
}
107
108/// Create a new client relay [`Behaviour`] with it's corresponding [`Transport`].
109pub fn new(local_peer_id: PeerId) -> (Transport, Behaviour) {
110    let (transport, from_transport) = Transport::new();
111    let behaviour = Behaviour {
112        local_peer_id,
113        from_transport,
114        directly_connected_peers: Default::default(),
115        reservation_addresses: Default::default(),
116        queued_actions: Default::default(),
117        pending_handler_commands: Default::default(),
118    };
119    (transport, behaviour)
120}
121
122impl Behaviour {
123    fn on_connection_closed(
124        &mut self,
125        ConnectionClosed {
126            peer_id,
127            connection_id,
128            endpoint,
129            ..
130        }: ConnectionClosed,
131    ) {
132        if !endpoint.is_relayed() {
133            match self.directly_connected_peers.entry(peer_id) {
134                hash_map::Entry::Occupied(mut connections) => {
135                    let position = connections
136                        .get()
137                        .iter()
138                        .position(|c| c == &connection_id)
139                        .expect("Connection to be known.");
140                    connections.get_mut().remove(position);
141
142                    if connections.get().is_empty() {
143                        connections.remove();
144                    }
145                }
146                hash_map::Entry::Vacant(_) => {
147                    unreachable!("`on_connection_closed` for unconnected peer.")
148                }
149            };
150            if let Some((addr, ReservationStatus::Confirmed)) =
151                self.reservation_addresses.remove(&connection_id)
152            {
153                self.queued_actions
154                    .push_back(ToSwarm::ExternalAddrExpired(addr));
155            }
156        }
157    }
158}
159
160impl NetworkBehaviour for Behaviour {
161    type ConnectionHandler = Either<Handler, dummy::ConnectionHandler>;
162    type ToSwarm = Event;
163
164    fn handle_established_inbound_connection(
165        &mut self,
166        connection_id: ConnectionId,
167        peer: PeerId,
168        local_addr: &Multiaddr,
169        remote_addr: &Multiaddr,
170    ) -> Result<THandler<Self>, ConnectionDenied> {
171        let pending_handler_command = self.pending_handler_commands.remove(&connection_id);
172
173        if local_addr.is_relayed() {
174            return Ok(Either::Right(dummy::ConnectionHandler));
175        }
176        let mut handler = Handler::new(self.local_peer_id, peer, remote_addr.clone());
177
178        if let Some(event) = pending_handler_command {
179            handler.on_behaviour_event(event)
180        }
181
182        Ok(Either::Left(handler))
183    }
184
185    fn handle_established_outbound_connection(
186        &mut self,
187        connection_id: ConnectionId,
188        peer: PeerId,
189        addr: &Multiaddr,
190        _: Endpoint,
191        _: PortUse,
192    ) -> Result<THandler<Self>, ConnectionDenied> {
193        let pending_handler_command = self.pending_handler_commands.remove(&connection_id);
194
195        if addr.is_relayed() {
196            return Ok(Either::Right(dummy::ConnectionHandler));
197        }
198
199        let mut handler = Handler::new(self.local_peer_id, peer, addr.clone());
200
201        if let Some(event) = pending_handler_command {
202            handler.on_behaviour_event(event)
203        }
204
205        Ok(Either::Left(handler))
206    }
207
208    fn on_swarm_event(&mut self, event: FromSwarm) {
209        match event {
210            FromSwarm::ConnectionEstablished(ConnectionEstablished {
211                peer_id,
212                connection_id,
213                endpoint,
214                ..
215            }) if !endpoint.is_relayed() => {
216                self.directly_connected_peers
217                    .entry(peer_id)
218                    .or_default()
219                    .push(connection_id);
220            }
221            FromSwarm::ConnectionClosed(connection_closed) => {
222                self.on_connection_closed(connection_closed)
223            }
224            FromSwarm::DialFailure(DialFailure { connection_id, .. }) => {
225                self.reservation_addresses.remove(&connection_id);
226                self.pending_handler_commands.remove(&connection_id);
227            }
228            _ => {}
229        }
230    }
231
232    fn on_connection_handler_event(
233        &mut self,
234        event_source: PeerId,
235        connection: ConnectionId,
236        handler_event: THandlerOutEvent<Self>,
237    ) {
238        let handler_event = match handler_event {
239            Either::Left(e) => e,
240            Either::Right(v) => libp2p_core::util::unreachable(v),
241        };
242
243        let event = match handler_event {
244            handler::Event::ReservationReqAccepted { renewal, limit } => {
245                let (addr, status) = self
246                    .reservation_addresses
247                    .get_mut(&connection)
248                    .expect("Relay connection exist");
249
250                if !renewal && *status == ReservationStatus::Pending {
251                    *status = ReservationStatus::Confirmed;
252                    self.queued_actions
253                        .push_back(ToSwarm::ExternalAddrConfirmed(addr.clone()));
254                }
255
256                Event::ReservationReqAccepted {
257                    relay_peer_id: event_source,
258                    renewal,
259                    limit,
260                }
261            }
262            handler::Event::OutboundCircuitEstablished { limit } => {
263                Event::OutboundCircuitEstablished {
264                    relay_peer_id: event_source,
265                    limit,
266                }
267            }
268            handler::Event::InboundCircuitEstablished { src_peer_id, limit } => {
269                Event::InboundCircuitEstablished { src_peer_id, limit }
270            }
271        };
272
273        self.queued_actions.push_back(ToSwarm::GenerateEvent(event));
274    }
275
276    #[tracing::instrument(level = "trace", name = "NetworkBehaviour::poll", skip(self, cx))]
277    fn poll(
278        &mut self,
279        cx: &mut Context<'_>,
280    ) -> Poll<ToSwarm<Self::ToSwarm, THandlerInEvent<Self>>> {
281        if let Some(action) = self.queued_actions.pop_front() {
282            return Poll::Ready(action);
283        }
284
285        let action = match ready!(self.from_transport.poll_next_unpin(cx)) {
286            Some(transport::TransportToBehaviourMsg::ListenReq {
287                relay_peer_id,
288                relay_addr,
289                to_listener,
290            }) => {
291                match self
292                    .directly_connected_peers
293                    .get(&relay_peer_id)
294                    .and_then(|cs| cs.first())
295                {
296                    Some(connection_id) => {
297                        self.reservation_addresses.insert(
298                            *connection_id,
299                            (
300                                relay_addr
301                                    .with(Protocol::P2p(relay_peer_id))
302                                    .with(Protocol::P2pCircuit)
303                                    .with(Protocol::P2p(self.local_peer_id)),
304                                ReservationStatus::Pending,
305                            ),
306                        );
307
308                        ToSwarm::NotifyHandler {
309                            peer_id: relay_peer_id,
310                            handler: NotifyHandler::One(*connection_id),
311                            event: Either::Left(handler::In::Reserve { to_listener }),
312                        }
313                    }
314                    None => {
315                        let opts = DialOpts::peer_id(relay_peer_id)
316                            .addresses(vec![relay_addr.clone()])
317                            .extend_addresses_through_behaviour()
318                            .build();
319                        let relayed_connection_id = opts.connection_id();
320
321                        self.reservation_addresses.insert(
322                            relayed_connection_id,
323                            (
324                                relay_addr
325                                    .with(Protocol::P2p(relay_peer_id))
326                                    .with(Protocol::P2pCircuit)
327                                    .with(Protocol::P2p(self.local_peer_id)),
328                                ReservationStatus::Pending,
329                            ),
330                        );
331
332                        self.pending_handler_commands
333                            .insert(relayed_connection_id, handler::In::Reserve { to_listener });
334                        ToSwarm::Dial { opts }
335                    }
336                }
337            }
338            Some(transport::TransportToBehaviourMsg::DialReq {
339                relay_addr,
340                relay_peer_id,
341                dst_peer_id,
342                send_back,
343                ..
344            }) => {
345                match self
346                    .directly_connected_peers
347                    .get(&relay_peer_id)
348                    .and_then(|cs| cs.first())
349                {
350                    Some(connection_id) => ToSwarm::NotifyHandler {
351                        peer_id: relay_peer_id,
352                        handler: NotifyHandler::One(*connection_id),
353                        event: Either::Left(handler::In::EstablishCircuit {
354                            to_dial: send_back,
355                            dst_peer_id,
356                        }),
357                    },
358                    None => {
359                        let opts = DialOpts::peer_id(relay_peer_id)
360                            .addresses(vec![relay_addr])
361                            .extend_addresses_through_behaviour()
362                            .build();
363                        let connection_id = opts.connection_id();
364
365                        self.pending_handler_commands.insert(
366                            connection_id,
367                            handler::In::EstablishCircuit {
368                                to_dial: send_back,
369                                dst_peer_id,
370                            },
371                        );
372
373                        ToSwarm::Dial { opts }
374                    }
375                }
376            }
377            None => unreachable!(
378                "`relay::Behaviour` polled after channel from \
379                     `Transport` has been closed. Unreachable under \
380                     the assumption that the `client::Behaviour` is never polled after \
381                     `client::Transport` is dropped.",
382            ),
383        };
384
385        Poll::Ready(action)
386    }
387}
388
/// Represents a connection to another peer via a relay.
///
/// Internally, this uses a stream to the relay.
pub struct Connection {
    /// Current state of the connection: either still accepting an inbound
    /// circuit or operational on top of a stream.
    pub(crate) state: ConnectionState,
}
395
/// The states a relayed [`Connection`] can be in.
pub(crate) enum ConnectionState {
    /// An inbound circuit that is still being accepted.
    InboundAccepting {
        /// Future that resolves to the next state, or to an I/O error if
        /// accepting failed. It is polled by the `AsyncRead`/`AsyncWrite`
        /// methods of [`Connection`].
        accept: BoxFuture<'static, Result<ConnectionState, Error>>,
    },
    /// The circuit is established and data can flow.
    Operational {
        /// Bytes handed out to readers before anything is read from `substream`.
        read_buffer: Bytes,
        /// The stream that reads and writes are forwarded to.
        substream: Stream,
    },
}

// `Connection`'s poll methods assign through `Pin<&mut Self>`, which requires
// `Connection` — and therefore its only field — to be `Unpin`.
impl Unpin for ConnectionState {}
407
408impl ConnectionState {
409    pub(crate) fn new_inbound(circuit: inbound_stop::Circuit) -> Self {
410        ConnectionState::InboundAccepting {
411            accept: async {
412                let (substream, read_buffer) = circuit
413                    .accept()
414                    .await
415                    .map_err(|e| Error::new(ErrorKind::Other, e))?;
416                Ok(ConnectionState::Operational {
417                    read_buffer,
418                    substream,
419                })
420            }
421            .boxed(),
422        }
423    }
424
425    pub(crate) fn new_outbound(substream: Stream, read_buffer: Bytes) -> Self {
426        ConnectionState::Operational {
427            substream,
428            read_buffer,
429        }
430    }
431}
432
433impl AsyncWrite for Connection {
434    fn poll_write(
435        mut self: Pin<&mut Self>,
436        cx: &mut Context,
437        buf: &[u8],
438    ) -> Poll<Result<usize, Error>> {
439        loop {
440            match &mut self.state {
441                ConnectionState::InboundAccepting { accept } => {
442                    *self = Connection {
443                        state: ready!(accept.poll_unpin(cx))?,
444                    };
445                }
446                ConnectionState::Operational { substream, .. } => {
447                    return Pin::new(substream).poll_write(cx, buf);
448                }
449            }
450        }
451    }
452    fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Error>> {
453        loop {
454            match &mut self.state {
455                ConnectionState::InboundAccepting { accept } => {
456                    *self = Connection {
457                        state: ready!(accept.poll_unpin(cx))?,
458                    };
459                }
460                ConnectionState::Operational { substream, .. } => {
461                    return Pin::new(substream).poll_flush(cx);
462                }
463            }
464        }
465    }
466    fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Error>> {
467        loop {
468            match &mut self.state {
469                ConnectionState::InboundAccepting { accept } => {
470                    *self = Connection {
471                        state: ready!(accept.poll_unpin(cx))?,
472                    };
473                }
474                ConnectionState::Operational { substream, .. } => {
475                    return Pin::new(substream).poll_close(cx);
476                }
477            }
478        }
479    }
480
481    fn poll_write_vectored(
482        mut self: Pin<&mut Self>,
483        cx: &mut Context,
484        bufs: &[IoSlice],
485    ) -> Poll<Result<usize, Error>> {
486        loop {
487            match &mut self.state {
488                ConnectionState::InboundAccepting { accept } => {
489                    *self = Connection {
490                        state: ready!(accept.poll_unpin(cx))?,
491                    };
492                }
493                ConnectionState::Operational { substream, .. } => {
494                    return Pin::new(substream).poll_write_vectored(cx, bufs);
495                }
496            }
497        }
498    }
499}
500
501impl AsyncRead for Connection {
502    fn poll_read(
503        mut self: Pin<&mut Self>,
504        cx: &mut Context<'_>,
505        buf: &mut [u8],
506    ) -> Poll<Result<usize, Error>> {
507        loop {
508            match &mut self.state {
509                ConnectionState::InboundAccepting { accept } => {
510                    *self = Connection {
511                        state: ready!(accept.poll_unpin(cx))?,
512                    };
513                }
514                ConnectionState::Operational {
515                    read_buffer,
516                    substream,
517                    ..
518                } => {
519                    if !read_buffer.is_empty() {
520                        let n = std::cmp::min(read_buffer.len(), buf.len());
521                        let data = read_buffer.split_to(n);
522                        buf[0..n].copy_from_slice(&data[..]);
523                        return Poll::Ready(Ok(n));
524                    }
525
526                    return Pin::new(substream).poll_read(cx, buf);
527                }
528            }
529        }
530    }
531}