libp2p_core/transport/
upgrade.rs

1// Copyright 2017-2019 Parity Technologies (UK) Ltd.
2//
3// Permission is hereby granted, free of charge, to any person obtaining a
4// copy of this software and associated documentation files (the "Software"),
5// to deal in the Software without restriction, including without limitation
6// the rights to use, copy, modify, merge, publish, distribute, sublicense,
7// and/or sell copies of the Software, and to permit persons to whom the
8// Software is furnished to do so, subject to the following conditions:
9//
10// The above copyright notice and this permission notice shall be included in
11// all copies or substantial portions of the Software.
12//
13// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
14// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
18// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
19// DEALINGS IN THE SOFTWARE.
20
21//! Configuration of transport protocol upgrades.
22
23use std::{
24    error::Error,
25    fmt,
26    pin::Pin,
27    task::{Context, Poll},
28    time::Duration,
29};
30
31use futures::{prelude::*, ready};
32use libp2p_identity::PeerId;
33use multiaddr::Multiaddr;
34
35pub use crate::upgrade::Version;
36use crate::{
37    connection::ConnectedPoint,
38    muxing::{StreamMuxer, StreamMuxerBox},
39    transport::{
40        and_then::AndThen, boxed::boxed, timeout::TransportTimeout, DialOpts, ListenerId,
41        Transport, TransportError, TransportEvent,
42    },
43    upgrade::{
44        self, apply_inbound, apply_outbound, InboundConnectionUpgrade, InboundUpgradeApply,
45        OutboundConnectionUpgrade, OutboundUpgradeApply, UpgradeError,
46    },
47    Negotiated,
48};
49
50/// A `Builder` facilitates upgrading of a [`Transport`] for use with
51/// a `Swarm`.
52///
53/// The upgrade process is defined by the following stages:
54///
55///    [`authenticate`](Builder::authenticate)`{1}`
56/// -> [`apply`](Authenticated::apply)`{*}`
57/// -> [`multiplex`](Authenticated::multiplex)`{1}`
58///
59/// It thus enforces the following invariants on every transport
60/// obtained from [`multiplex`](Authenticated::multiplex):
61///
62///   1. The transport must be [authenticated](Builder::authenticate) and
63///      [multiplexed](Authenticated::multiplex).
64///   2. Authentication must precede the negotiation of a multiplexer.
65///   3. Applying a multiplexer is the last step in the upgrade process.
66///   4. The [`Transport::Output`] conforms to the requirements of a `Swarm`, namely a tuple of a
67///      [`PeerId`] (from the authentication upgrade) and a [`StreamMuxer`] (from the multiplexing
68///      upgrade).
69#[derive(Clone)]
70pub struct Builder<T> {
71    inner: T,
72    version: upgrade::Version,
73}
74
75impl<T> Builder<T>
76where
77    T: Transport,
78    T::Error: 'static,
79{
80    /// Creates a `Builder` over the given (base) `Transport`.
81    pub fn new(inner: T, version: upgrade::Version) -> Builder<T> {
82        Builder { inner, version }
83    }
84
85    /// Upgrades the transport to perform authentication of the remote.
86    ///
87    /// The supplied upgrade receives the I/O resource `C` and must
88    /// produce a pair `(PeerId, D)`, where `D` is a new I/O resource.
89    /// The upgrade must thus at a minimum identify the remote, which typically
90    /// involves the use of a cryptographic authentication protocol in the
91    /// context of establishing a secure channel.
92    ///
93    /// ## Transitions
94    ///
95    ///   * I/O upgrade: `C -> (PeerId, D)`.
96    ///   * Transport output: `C -> (PeerId, D)`
97    pub fn authenticate<C, D, U, E>(
98        self,
99        upgrade: U,
100    ) -> Authenticated<AndThen<T, impl FnOnce(C, ConnectedPoint) -> Authenticate<C, U> + Clone>>
101    where
102        T: Transport<Output = C>,
103        C: AsyncRead + AsyncWrite + Unpin,
104        D: AsyncRead + AsyncWrite + Unpin,
105        U: InboundConnectionUpgrade<Negotiated<C>, Output = (PeerId, D), Error = E>,
106        U: OutboundConnectionUpgrade<Negotiated<C>, Output = (PeerId, D), Error = E> + Clone,
107        E: Error + 'static,
108    {
109        let version = self.version;
110        Authenticated(Builder::new(
111            self.inner.and_then(move |conn, endpoint| Authenticate {
112                inner: upgrade::apply(conn, upgrade, endpoint, version),
113            }),
114            version,
115        ))
116    }
117}
118
119/// An upgrade that authenticates the remote peer, typically
120/// in the context of negotiating a secure channel.
121///
122/// Configured through [`Builder::authenticate`].
123#[pin_project::pin_project]
124pub struct Authenticate<C, U>
125where
126    C: AsyncRead + AsyncWrite + Unpin,
127    U: InboundConnectionUpgrade<Negotiated<C>> + OutboundConnectionUpgrade<Negotiated<C>>,
128{
129    #[pin]
130    inner: EitherUpgrade<C, U>,
131}
132
133impl<C, U> Future for Authenticate<C, U>
134where
135    C: AsyncRead + AsyncWrite + Unpin,
136    U: InboundConnectionUpgrade<Negotiated<C>>
137        + OutboundConnectionUpgrade<
138            Negotiated<C>,
139            Output = <U as InboundConnectionUpgrade<Negotiated<C>>>::Output,
140            Error = <U as InboundConnectionUpgrade<Negotiated<C>>>::Error,
141        >,
142{
143    type Output = <EitherUpgrade<C, U> as Future>::Output;
144
145    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
146        let this = self.project();
147        Future::poll(this.inner, cx)
148    }
149}
150
151/// An upgrade that negotiates a (sub)stream multiplexer on
152/// top of an authenticated transport.
153///
154/// Configured through [`Authenticated::multiplex`].
155#[pin_project::pin_project]
156pub struct Multiplex<C, U>
157where
158    C: AsyncRead + AsyncWrite + Unpin,
159    U: InboundConnectionUpgrade<Negotiated<C>> + OutboundConnectionUpgrade<Negotiated<C>>,
160{
161    peer_id: Option<PeerId>,
162    #[pin]
163    upgrade: EitherUpgrade<C, U>,
164}
165
166impl<C, U, M, E> Future for Multiplex<C, U>
167where
168    C: AsyncRead + AsyncWrite + Unpin,
169    U: InboundConnectionUpgrade<Negotiated<C>, Output = M, Error = E>,
170    U: OutboundConnectionUpgrade<Negotiated<C>, Output = M, Error = E>,
171{
172    type Output = Result<(PeerId, M), UpgradeError<E>>;
173
174    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
175        let this = self.project();
176        let m = match ready!(Future::poll(this.upgrade, cx)) {
177            Ok(m) => m,
178            Err(err) => return Poll::Ready(Err(err)),
179        };
180        let i = this
181            .peer_id
182            .take()
183            .expect("Multiplex future polled after completion.");
184        Poll::Ready(Ok((i, m)))
185    }
186}
187
188/// A transport with peer authentication, obtained from [`Builder::authenticate`].
189#[derive(Clone)]
190pub struct Authenticated<T>(Builder<T>);
191
192impl<T> Authenticated<T>
193where
194    T: Transport,
195    T::Error: 'static,
196{
197    /// Applies an arbitrary upgrade.
198    ///
199    /// The upgrade receives the I/O resource (i.e. connection) `C` and
200    /// must produce a new I/O resource `D`. Any number of such upgrades
201    /// can be performed.
202    ///
203    /// ## Transitions
204    ///
205    ///   * I/O upgrade: `C -> D`.
206    ///   * Transport output: `(PeerId, C) -> (PeerId, D)`.
207    pub fn apply<C, D, U, E>(self, upgrade: U) -> Authenticated<Upgrade<T, U>>
208    where
209        T: Transport<Output = (PeerId, C)>,
210        C: AsyncRead + AsyncWrite + Unpin,
211        D: AsyncRead + AsyncWrite + Unpin,
212        U: InboundConnectionUpgrade<Negotiated<C>, Output = D, Error = E>,
213        U: OutboundConnectionUpgrade<Negotiated<C>, Output = D, Error = E> + Clone,
214        E: Error + 'static,
215    {
216        Authenticated(Builder::new(
217            Upgrade::new(self.0.inner, upgrade),
218            self.0.version,
219        ))
220    }
221
222    /// Upgrades the transport with a (sub)stream multiplexer.
223    ///
224    /// The supplied upgrade receives the I/O resource `C` and must
225    /// produce a [`StreamMuxer`] `M`. The transport must already be authenticated.
226    /// This ends the (regular) transport upgrade process.
227    ///
228    /// ## Transitions
229    ///
230    ///   * I/O upgrade: `C -> M`.
231    ///   * Transport output: `(PeerId, C) -> (PeerId, M)`.
232    pub fn multiplex<C, M, U, E>(
233        self,
234        upgrade: U,
235    ) -> Multiplexed<AndThen<T, impl FnOnce((PeerId, C), ConnectedPoint) -> Multiplex<C, U> + Clone>>
236    where
237        T: Transport<Output = (PeerId, C)>,
238        C: AsyncRead + AsyncWrite + Unpin,
239        M: StreamMuxer,
240        U: InboundConnectionUpgrade<Negotiated<C>, Output = M, Error = E>,
241        U: OutboundConnectionUpgrade<Negotiated<C>, Output = M, Error = E> + Clone,
242        E: Error + 'static,
243    {
244        let version = self.0.version;
245        Multiplexed(self.0.inner.and_then(move |(i, c), endpoint| {
246            let upgrade = upgrade::apply(c, upgrade, endpoint, version);
247            Multiplex {
248                peer_id: Some(i),
249                upgrade,
250            }
251        }))
252    }
253
254    /// Like [`Authenticated::multiplex`] but accepts a function which returns the upgrade.
255    ///
256    /// The supplied function is applied to [`PeerId`] and [`ConnectedPoint`]
257    /// and returns an upgrade which receives the I/O resource `C` and must
258    /// produce a [`StreamMuxer`] `M`. The transport must already be authenticated.
259    /// This ends the (regular) transport upgrade process.
260    ///
261    /// ## Transitions
262    ///
263    ///   * I/O upgrade: `C -> M`.
264    ///   * Transport output: `(PeerId, C) -> (PeerId, M)`.
265    pub fn multiplex_ext<C, M, U, E, F>(
266        self,
267        up: F,
268    ) -> Multiplexed<AndThen<T, impl FnOnce((PeerId, C), ConnectedPoint) -> Multiplex<C, U> + Clone>>
269    where
270        T: Transport<Output = (PeerId, C)>,
271        C: AsyncRead + AsyncWrite + Unpin,
272        M: StreamMuxer,
273        U: InboundConnectionUpgrade<Negotiated<C>, Output = M, Error = E>,
274        U: OutboundConnectionUpgrade<Negotiated<C>, Output = M, Error = E> + Clone,
275        E: Error + 'static,
276        F: for<'a> FnOnce(&'a PeerId, &'a ConnectedPoint) -> U + Clone,
277    {
278        let version = self.0.version;
279        Multiplexed(self.0.inner.and_then(move |(peer_id, c), endpoint| {
280            let upgrade = upgrade::apply(c, up(&peer_id, &endpoint), endpoint, version);
281            Multiplex {
282                peer_id: Some(peer_id),
283                upgrade,
284            }
285        }))
286    }
287}
288
289/// A authenticated and multiplexed transport, obtained from
290/// [`Authenticated::multiplex`].
291#[derive(Clone)]
292#[pin_project::pin_project]
293pub struct Multiplexed<T>(#[pin] T);
294
295impl<T> Multiplexed<T> {
296    /// Boxes the authenticated, multiplexed transport, including
297    /// the [`StreamMuxer`] and custom transport errors.
298    pub fn boxed<M>(self) -> super::Boxed<(PeerId, StreamMuxerBox)>
299    where
300        T: Transport<Output = (PeerId, M)> + Sized + Send + Unpin + 'static,
301        T::Dial: Send + 'static,
302        T::ListenerUpgrade: Send + 'static,
303        T::Error: Send + Sync,
304        M: StreamMuxer + Send + 'static,
305        M::Substream: Send + 'static,
306        M::Error: Send + Sync + 'static,
307    {
308        boxed(self.map(|(i, m), _| (i, StreamMuxerBox::new(m))))
309    }
310
311    /// Adds a timeout to the setup and protocol upgrade process for all
312    /// inbound and outbound connections established through the transport.
313    pub fn timeout(self, timeout: Duration) -> Multiplexed<TransportTimeout<T>> {
314        Multiplexed(TransportTimeout::new(self.0, timeout))
315    }
316
317    /// Adds a timeout to the setup and protocol upgrade process for all
318    /// outbound connections established through the transport.
319    pub fn outbound_timeout(self, timeout: Duration) -> Multiplexed<TransportTimeout<T>> {
320        Multiplexed(TransportTimeout::with_outgoing_timeout(self.0, timeout))
321    }
322
323    /// Adds a timeout to the setup and protocol upgrade process for all
324    /// inbound connections established through the transport.
325    pub fn inbound_timeout(self, timeout: Duration) -> Multiplexed<TransportTimeout<T>> {
326        Multiplexed(TransportTimeout::with_ingoing_timeout(self.0, timeout))
327    }
328}
329
330impl<T> Transport for Multiplexed<T>
331where
332    T: Transport,
333{
334    type Output = T::Output;
335    type Error = T::Error;
336    type ListenerUpgrade = T::ListenerUpgrade;
337    type Dial = T::Dial;
338
339    fn dial(
340        &mut self,
341        addr: Multiaddr,
342        opts: DialOpts,
343    ) -> Result<Self::Dial, TransportError<Self::Error>> {
344        self.0.dial(addr, opts)
345    }
346
347    fn remove_listener(&mut self, id: ListenerId) -> bool {
348        self.0.remove_listener(id)
349    }
350
351    fn listen_on(
352        &mut self,
353        id: ListenerId,
354        addr: Multiaddr,
355    ) -> Result<(), TransportError<Self::Error>> {
356        self.0.listen_on(id, addr)
357    }
358
359    fn poll(
360        self: Pin<&mut Self>,
361        cx: &mut Context<'_>,
362    ) -> Poll<TransportEvent<Self::ListenerUpgrade, Self::Error>> {
363        self.project().0.poll(cx)
364    }
365}
366
367/// An inbound or outbound upgrade.
368type EitherUpgrade<C, U> = future::Either<InboundUpgradeApply<C, U>, OutboundUpgradeApply<C, U>>;
369
370/// A custom upgrade on an [`Authenticated`] transport.
371///
372/// See [`Transport::upgrade`]
373#[derive(Debug, Copy, Clone)]
374#[pin_project::pin_project]
375pub struct Upgrade<T, U> {
376    #[pin]
377    inner: T,
378    upgrade: U,
379}
380
381impl<T, U> Upgrade<T, U> {
382    pub fn new(inner: T, upgrade: U) -> Self {
383        Upgrade { inner, upgrade }
384    }
385}
386
387impl<T, C, D, U, E> Transport for Upgrade<T, U>
388where
389    T: Transport<Output = (PeerId, C)>,
390    T::Error: 'static,
391    C: AsyncRead + AsyncWrite + Unpin,
392    U: InboundConnectionUpgrade<Negotiated<C>, Output = D, Error = E>,
393    U: OutboundConnectionUpgrade<Negotiated<C>, Output = D, Error = E> + Clone,
394    E: Error + 'static,
395{
396    type Output = (PeerId, D);
397    type Error = TransportUpgradeError<T::Error, E>;
398    type ListenerUpgrade = ListenerUpgradeFuture<T::ListenerUpgrade, U, C>;
399    type Dial = DialUpgradeFuture<T::Dial, U, C>;
400
401    fn dial(
402        &mut self,
403        addr: Multiaddr,
404        opts: DialOpts,
405    ) -> Result<Self::Dial, TransportError<Self::Error>> {
406        let future = self
407            .inner
408            .dial(addr, opts)
409            .map_err(|err| err.map(TransportUpgradeError::Transport))?;
410        Ok(DialUpgradeFuture {
411            future: Box::pin(future),
412            upgrade: future::Either::Left(Some(self.upgrade.clone())),
413        })
414    }
415
416    fn remove_listener(&mut self, id: ListenerId) -> bool {
417        self.inner.remove_listener(id)
418    }
419
420    fn listen_on(
421        &mut self,
422        id: ListenerId,
423        addr: Multiaddr,
424    ) -> Result<(), TransportError<Self::Error>> {
425        self.inner
426            .listen_on(id, addr)
427            .map_err(|err| err.map(TransportUpgradeError::Transport))
428    }
429
430    fn poll(
431        self: Pin<&mut Self>,
432        cx: &mut Context<'_>,
433    ) -> Poll<TransportEvent<Self::ListenerUpgrade, Self::Error>> {
434        let this = self.project();
435        let upgrade = this.upgrade.clone();
436        this.inner.poll(cx).map(|event| {
437            event
438                .map_upgrade(move |future| ListenerUpgradeFuture {
439                    future: Box::pin(future),
440                    upgrade: future::Either::Left(Some(upgrade)),
441                })
442                .map_err(TransportUpgradeError::Transport)
443        })
444    }
445}
446
447/// Errors produced by a transport upgrade.
448#[derive(Debug)]
449pub enum TransportUpgradeError<T, U> {
450    /// Error in the transport.
451    Transport(T),
452    /// Error while upgrading to a protocol.
453    Upgrade(UpgradeError<U>),
454}
455
456impl<T, U> fmt::Display for TransportUpgradeError<T, U>
457where
458    T: fmt::Display,
459    U: fmt::Display,
460{
461    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
462        match self {
463            TransportUpgradeError::Transport(e) => write!(f, "Transport error: {e}"),
464            TransportUpgradeError::Upgrade(e) => write!(f, "Upgrade error: {e}"),
465        }
466    }
467}
468
469impl<T, U> Error for TransportUpgradeError<T, U>
470where
471    T: Error + 'static,
472    U: Error + 'static,
473{
474    fn source(&self) -> Option<&(dyn Error + 'static)> {
475        match self {
476            TransportUpgradeError::Transport(e) => Some(e),
477            TransportUpgradeError::Upgrade(e) => Some(e),
478        }
479    }
480}
481
482/// The [`Transport::Dial`] future of an [`Upgrade`]d transport.
483pub struct DialUpgradeFuture<F, U, C>
484where
485    U: OutboundConnectionUpgrade<Negotiated<C>>,
486    C: AsyncRead + AsyncWrite + Unpin,
487{
488    future: Pin<Box<F>>,
489    upgrade: future::Either<Option<U>, (PeerId, OutboundUpgradeApply<C, U>)>,
490}
491
492impl<F, U, C, D> Future for DialUpgradeFuture<F, U, C>
493where
494    F: TryFuture<Ok = (PeerId, C)>,
495    C: AsyncRead + AsyncWrite + Unpin,
496    U: OutboundConnectionUpgrade<Negotiated<C>, Output = D>,
497    U::Error: Error,
498{
499    type Output = Result<(PeerId, D), TransportUpgradeError<F::Error, U::Error>>;
500
501    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
502        // We use a `this` variable because the compiler can't mutably borrow multiple times
503        // across a `Deref`.
504        let this = &mut *self;
505
506        loop {
507            this.upgrade = match this.upgrade {
508                future::Either::Left(ref mut up) => {
509                    let (i, c) = match ready!(TryFuture::try_poll(this.future.as_mut(), cx)
510                        .map_err(TransportUpgradeError::Transport))
511                    {
512                        Ok(v) => v,
513                        Err(err) => return Poll::Ready(Err(err)),
514                    };
515                    let u = up
516                        .take()
517                        .expect("DialUpgradeFuture is constructed with Either::Left(Some).");
518                    future::Either::Right((i, apply_outbound(c, u, upgrade::Version::V1)))
519                }
520                future::Either::Right((i, ref mut up)) => {
521                    let d = match ready!(
522                        Future::poll(Pin::new(up), cx).map_err(TransportUpgradeError::Upgrade)
523                    ) {
524                        Ok(d) => d,
525                        Err(err) => return Poll::Ready(Err(err)),
526                    };
527                    return Poll::Ready(Ok((i, d)));
528                }
529            }
530        }
531    }
532}
533
534impl<F, U, C> Unpin for DialUpgradeFuture<F, U, C>
535where
536    U: OutboundConnectionUpgrade<Negotiated<C>>,
537    C: AsyncRead + AsyncWrite + Unpin,
538{
539}
540
541/// The [`Transport::ListenerUpgrade`] future of an [`Upgrade`]d transport.
542pub struct ListenerUpgradeFuture<F, U, C>
543where
544    C: AsyncRead + AsyncWrite + Unpin,
545    U: InboundConnectionUpgrade<Negotiated<C>>,
546{
547    future: Pin<Box<F>>,
548    upgrade: future::Either<Option<U>, (PeerId, InboundUpgradeApply<C, U>)>,
549}
550
551impl<F, U, C, D> Future for ListenerUpgradeFuture<F, U, C>
552where
553    F: TryFuture<Ok = (PeerId, C)>,
554    C: AsyncRead + AsyncWrite + Unpin,
555    U: InboundConnectionUpgrade<Negotiated<C>, Output = D>,
556    U::Error: Error,
557{
558    type Output = Result<(PeerId, D), TransportUpgradeError<F::Error, U::Error>>;
559
560    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
561        // We use a `this` variable because the compiler can't mutably borrow multiple times
562        // across a `Deref`.
563        let this = &mut *self;
564
565        loop {
566            this.upgrade = match this.upgrade {
567                future::Either::Left(ref mut up) => {
568                    let (i, c) = match ready!(TryFuture::try_poll(this.future.as_mut(), cx)
569                        .map_err(TransportUpgradeError::Transport))
570                    {
571                        Ok(v) => v,
572                        Err(err) => return Poll::Ready(Err(err)),
573                    };
574                    let u = up
575                        .take()
576                        .expect("ListenerUpgradeFuture is constructed with Either::Left(Some).");
577                    future::Either::Right((i, apply_inbound(c, u)))
578                }
579                future::Either::Right((i, ref mut up)) => {
580                    let d = match ready!(TryFuture::try_poll(Pin::new(up), cx)
581                        .map_err(TransportUpgradeError::Upgrade))
582                    {
583                        Ok(v) => v,
584                        Err(err) => return Poll::Ready(Err(err)),
585                    };
586                    return Poll::Ready(Ok((i, d)));
587                }
588            }
589        }
590    }
591}
592
593impl<F, U, C> Unpin for ListenerUpgradeFuture<F, U, C>
594where
595    C: AsyncRead + AsyncWrite + Unpin,
596    U: InboundConnectionUpgrade<Negotiated<C>>,
597{
598}