libp2p_core/transport/
map.rs

1// Copyright 2017 Parity Technologies (UK) Ltd.
2//
3// Permission is hereby granted, free of charge, to any person obtaining a
4// copy of this software and associated documentation files (the "Software"),
5// to deal in the Software without restriction, including without limitation
6// the rights to use, copy, modify, merge, publish, distribute, sublicense,
7// and/or sell copies of the Software, and to permit persons to whom the
8// Software is furnished to do so, subject to the following conditions:
9//
10// The above copyright notice and this permission notice shall be included in
11// all copies or substantial portions of the Software.
12//
13// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
14// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
18// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
19// DEALINGS IN THE SOFTWARE.
20
21use std::{
22    pin::Pin,
23    task::{Context, Poll},
24};
25
26use futures::prelude::*;
27use multiaddr::Multiaddr;
28
29use super::ListenerId;
30use crate::{
31    connection::ConnectedPoint,
32    transport::{DialOpts, Transport, TransportError, TransportEvent},
33};
34
35/// See `Transport::map`.
36#[derive(Debug, Copy, Clone)]
37#[pin_project::pin_project]
38pub struct Map<T, F> {
39    #[pin]
40    transport: T,
41    fun: F,
42}
43
44impl<T, F> Map<T, F> {
45    pub(crate) fn new(transport: T, fun: F) -> Self {
46        Map { transport, fun }
47    }
48
49    pub fn inner(&self) -> &T {
50        &self.transport
51    }
52
53    pub fn inner_mut(&mut self) -> &mut T {
54        &mut self.transport
55    }
56}
57
58impl<T, F, D> Transport for Map<T, F>
59where
60    T: Transport,
61    F: FnOnce(T::Output, ConnectedPoint) -> D + Clone,
62{
63    type Output = D;
64    type Error = T::Error;
65    type ListenerUpgrade = MapFuture<T::ListenerUpgrade, F>;
66    type Dial = MapFuture<T::Dial, F>;
67
68    fn listen_on(
69        &mut self,
70        id: ListenerId,
71        addr: Multiaddr,
72    ) -> Result<(), TransportError<Self::Error>> {
73        self.transport.listen_on(id, addr)
74    }
75
76    fn remove_listener(&mut self, id: ListenerId) -> bool {
77        self.transport.remove_listener(id)
78    }
79
80    fn dial(
81        &mut self,
82        addr: Multiaddr,
83        opts: DialOpts,
84    ) -> Result<Self::Dial, TransportError<Self::Error>> {
85        let future = self.transport.dial(addr.clone(), opts)?;
86        let p = ConnectedPoint::Dialer {
87            address: addr,
88            role_override: opts.role,
89            port_use: opts.port_use,
90        };
91        Ok(MapFuture {
92            inner: future,
93            args: Some((self.fun.clone(), p)),
94        })
95    }
96
97    fn poll(
98        self: Pin<&mut Self>,
99        cx: &mut Context<'_>,
100    ) -> Poll<TransportEvent<Self::ListenerUpgrade, Self::Error>> {
101        let this = self.project();
102        match this.transport.poll(cx) {
103            Poll::Ready(TransportEvent::Incoming {
104                listener_id,
105                upgrade,
106                local_addr,
107                send_back_addr,
108            }) => {
109                let point = ConnectedPoint::Listener {
110                    local_addr: local_addr.clone(),
111                    send_back_addr: send_back_addr.clone(),
112                };
113                Poll::Ready(TransportEvent::Incoming {
114                    listener_id,
115                    upgrade: MapFuture {
116                        inner: upgrade,
117                        args: Some((this.fun.clone(), point)),
118                    },
119                    local_addr,
120                    send_back_addr,
121                })
122            }
123            Poll::Ready(other) => {
124                let mapped = other.map_upgrade(|_upgrade| unreachable!("case already matched"));
125                Poll::Ready(mapped)
126            }
127            Poll::Pending => Poll::Pending,
128        }
129    }
130}
131
132/// Custom `Future` to avoid boxing.
133///
134/// Applies a function to the inner future's result.
135#[pin_project::pin_project]
136#[derive(Clone, Debug)]
137pub struct MapFuture<T, F> {
138    #[pin]
139    inner: T,
140    args: Option<(F, ConnectedPoint)>,
141}
142
143impl<T, A, F, B> Future for MapFuture<T, F>
144where
145    T: TryFuture<Ok = A>,
146    F: FnOnce(A, ConnectedPoint) -> B,
147{
148    type Output = Result<B, T::Error>;
149
150    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
151        let this = self.project();
152        let item = match TryFuture::try_poll(this.inner, cx) {
153            Poll::Pending => return Poll::Pending,
154            Poll::Ready(Ok(v)) => v,
155            Poll::Ready(Err(err)) => return Poll::Ready(Err(err)),
156        };
157        let (f, a) = this.args.take().expect("MapFuture has already finished.");
158        Poll::Ready(Ok(f(item, a)))
159    }
160}