libp2p_core/transport/
map.rs1use std::{
22 pin::Pin,
23 task::{Context, Poll},
24};
25
26use futures::prelude::*;
27use multiaddr::Multiaddr;
28
29use super::ListenerId;
30use crate::{
31 connection::ConnectedPoint,
32 transport::{DialOpts, Transport, TransportError, TransportEvent},
33};
34
35#[derive(Debug, Copy, Clone)]
37#[pin_project::pin_project]
38pub struct Map<T, F> {
39 #[pin]
40 transport: T,
41 fun: F,
42}
43
44impl<T, F> Map<T, F> {
45 pub(crate) fn new(transport: T, fun: F) -> Self {
46 Map { transport, fun }
47 }
48
49 pub fn inner(&self) -> &T {
50 &self.transport
51 }
52
53 pub fn inner_mut(&mut self) -> &mut T {
54 &mut self.transport
55 }
56}
57
58impl<T, F, D> Transport for Map<T, F>
59where
60 T: Transport,
61 F: FnOnce(T::Output, ConnectedPoint) -> D + Clone,
62{
63 type Output = D;
64 type Error = T::Error;
65 type ListenerUpgrade = MapFuture<T::ListenerUpgrade, F>;
66 type Dial = MapFuture<T::Dial, F>;
67
68 fn listen_on(
69 &mut self,
70 id: ListenerId,
71 addr: Multiaddr,
72 ) -> Result<(), TransportError<Self::Error>> {
73 self.transport.listen_on(id, addr)
74 }
75
76 fn remove_listener(&mut self, id: ListenerId) -> bool {
77 self.transport.remove_listener(id)
78 }
79
80 fn dial(
81 &mut self,
82 addr: Multiaddr,
83 opts: DialOpts,
84 ) -> Result<Self::Dial, TransportError<Self::Error>> {
85 let future = self.transport.dial(addr.clone(), opts)?;
86 let p = ConnectedPoint::Dialer {
87 address: addr,
88 role_override: opts.role,
89 port_use: opts.port_use,
90 };
91 Ok(MapFuture {
92 inner: future,
93 args: Some((self.fun.clone(), p)),
94 })
95 }
96
97 fn poll(
98 self: Pin<&mut Self>,
99 cx: &mut Context<'_>,
100 ) -> Poll<TransportEvent<Self::ListenerUpgrade, Self::Error>> {
101 let this = self.project();
102 match this.transport.poll(cx) {
103 Poll::Ready(TransportEvent::Incoming {
104 listener_id,
105 upgrade,
106 local_addr,
107 send_back_addr,
108 }) => {
109 let point = ConnectedPoint::Listener {
110 local_addr: local_addr.clone(),
111 send_back_addr: send_back_addr.clone(),
112 };
113 Poll::Ready(TransportEvent::Incoming {
114 listener_id,
115 upgrade: MapFuture {
116 inner: upgrade,
117 args: Some((this.fun.clone(), point)),
118 },
119 local_addr,
120 send_back_addr,
121 })
122 }
123 Poll::Ready(other) => {
124 let mapped = other.map_upgrade(|_upgrade| unreachable!("case already matched"));
125 Poll::Ready(mapped)
126 }
127 Poll::Pending => Poll::Pending,
128 }
129 }
130}
131
132#[pin_project::pin_project]
136#[derive(Clone, Debug)]
137pub struct MapFuture<T, F> {
138 #[pin]
139 inner: T,
140 args: Option<(F, ConnectedPoint)>,
141}
142
143impl<T, A, F, B> Future for MapFuture<T, F>
144where
145 T: TryFuture<Ok = A>,
146 F: FnOnce(A, ConnectedPoint) -> B,
147{
148 type Output = Result<B, T::Error>;
149
150 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
151 let this = self.project();
152 let item = match TryFuture::try_poll(this.inner, cx) {
153 Poll::Pending => return Poll::Pending,
154 Poll::Ready(Ok(v)) => v,
155 Poll::Ready(Err(err)) => return Poll::Ready(Err(err)),
156 };
157 let (f, a) = this.args.take().expect("MapFuture has already finished.");
158 Poll::Ready(Ok(f(item, a)))
159 }
160}