libp2p_core/transport/
and_then.rs

1// Copyright 2018 Parity Technologies (UK) Ltd.
2//
3// Permission is hereby granted, free of charge, to any person obtaining a
4// copy of this software and associated documentation files (the "Software"),
5// to deal in the Software without restriction, including without limitation
6// the rights to use, copy, modify, merge, publish, distribute, sublicense,
7// and/or sell copies of the Software, and to permit persons to whom the
8// Software is furnished to do so, subject to the following conditions:
9//
10// The above copyright notice and this permission notice shall be included in
11// all copies or substantial portions of the Software.
12//
13// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
14// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
18// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
19// DEALINGS IN THE SOFTWARE.
20
21use std::{
22    error,
23    marker::PhantomPinned,
24    pin::Pin,
25    task::{Context, Poll},
26};
27
28use either::Either;
29use futures::prelude::*;
30use multiaddr::Multiaddr;
31
32use crate::{
33    connection::ConnectedPoint,
34    transport::{DialOpts, ListenerId, Transport, TransportError, TransportEvent},
35};
36
37/// See the [`Transport::and_then`] method.
38#[pin_project::pin_project]
39#[derive(Debug, Clone)]
40pub struct AndThen<T, C> {
41    #[pin]
42    transport: T,
43    fun: C,
44}
45
46impl<T, C> AndThen<T, C> {
47    pub(crate) fn new(transport: T, fun: C) -> Self {
48        AndThen { transport, fun }
49    }
50}
51
52impl<T, C, F, O> Transport for AndThen<T, C>
53where
54    T: Transport,
55    C: FnOnce(T::Output, ConnectedPoint) -> F + Clone,
56    F: TryFuture<Ok = O>,
57    F::Error: error::Error,
58{
59    type Output = O;
60    type Error = Either<T::Error, F::Error>;
61    type ListenerUpgrade = AndThenFuture<T::ListenerUpgrade, C, F>;
62    type Dial = AndThenFuture<T::Dial, C, F>;
63
64    fn listen_on(
65        &mut self,
66        id: ListenerId,
67        addr: Multiaddr,
68    ) -> Result<(), TransportError<Self::Error>> {
69        self.transport
70            .listen_on(id, addr)
71            .map_err(|err| err.map(Either::Left))
72    }
73
74    fn remove_listener(&mut self, id: ListenerId) -> bool {
75        self.transport.remove_listener(id)
76    }
77
78    fn dial(
79        &mut self,
80        addr: Multiaddr,
81        opts: DialOpts,
82    ) -> Result<Self::Dial, TransportError<Self::Error>> {
83        let dialed_fut = self
84            .transport
85            .dial(addr.clone(), opts)
86            .map_err(|err| err.map(Either::Left))?;
87        let future = AndThenFuture {
88            inner: Either::Left(Box::pin(dialed_fut)),
89            args: Some((
90                self.fun.clone(),
91                ConnectedPoint::Dialer {
92                    address: addr,
93                    role_override: opts.role,
94                    port_use: opts.port_use,
95                },
96            )),
97            _marker: PhantomPinned,
98        };
99        Ok(future)
100    }
101
102    fn poll(
103        self: Pin<&mut Self>,
104        cx: &mut Context<'_>,
105    ) -> Poll<TransportEvent<Self::ListenerUpgrade, Self::Error>> {
106        let this = self.project();
107        match this.transport.poll(cx) {
108            Poll::Ready(TransportEvent::Incoming {
109                listener_id,
110                upgrade,
111                local_addr,
112                send_back_addr,
113            }) => {
114                let point = ConnectedPoint::Listener {
115                    local_addr: local_addr.clone(),
116                    send_back_addr: send_back_addr.clone(),
117                };
118                Poll::Ready(TransportEvent::Incoming {
119                    listener_id,
120                    upgrade: AndThenFuture {
121                        inner: Either::Left(Box::pin(upgrade)),
122                        args: Some((this.fun.clone(), point)),
123                        _marker: PhantomPinned,
124                    },
125                    local_addr,
126                    send_back_addr,
127                })
128            }
129            Poll::Ready(other) => {
130                let mapped = other
131                    .map_upgrade(|_upgrade| unreachable!("case already matched"))
132                    .map_err(Either::Left);
133                Poll::Ready(mapped)
134            }
135            Poll::Pending => Poll::Pending,
136        }
137    }
138}
139
140/// Custom `Future` to avoid boxing.
141///
142/// Applies a function to the result of the inner future.
143#[derive(Debug)]
144pub struct AndThenFuture<TFut, TMap, TMapOut> {
145    inner: Either<Pin<Box<TFut>>, Pin<Box<TMapOut>>>,
146    args: Option<(TMap, ConnectedPoint)>,
147    _marker: PhantomPinned,
148}
149
150impl<TFut, TMap, TMapOut> Future for AndThenFuture<TFut, TMap, TMapOut>
151where
152    TFut: TryFuture,
153    TMap: FnOnce(TFut::Ok, ConnectedPoint) -> TMapOut,
154    TMapOut: TryFuture,
155{
156    type Output = Result<TMapOut::Ok, Either<TFut::Error, TMapOut::Error>>;
157
158    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
159        loop {
160            let future = match &mut self.inner {
161                Either::Left(future) => {
162                    let item = match TryFuture::try_poll(future.as_mut(), cx) {
163                        Poll::Ready(Ok(v)) => v,
164                        Poll::Ready(Err(err)) => return Poll::Ready(Err(Either::Left(err))),
165                        Poll::Pending => return Poll::Pending,
166                    };
167                    let (f, a) = self
168                        .args
169                        .take()
170                        .expect("AndThenFuture has already finished.");
171                    f(item, a)
172                }
173                Either::Right(future) => {
174                    return match TryFuture::try_poll(future.as_mut(), cx) {
175                        Poll::Ready(Ok(v)) => Poll::Ready(Ok(v)),
176                        Poll::Ready(Err(err)) => return Poll::Ready(Err(Either::Right(err))),
177                        Poll::Pending => Poll::Pending,
178                    }
179                }
180            };
181
182            self.inner = Either::Right(Box::pin(future));
183        }
184    }
185}
186
187impl<TFut, TMap, TMapOut> Unpin for AndThenFuture<TFut, TMap, TMapOut> {}