1use std::{
22 error,
23 marker::PhantomPinned,
24 pin::Pin,
25 task::{Context, Poll},
26};
27
28use either::Either;
29use futures::prelude::*;
30use multiaddr::Multiaddr;
31
32use crate::{
33 connection::ConnectedPoint,
34 transport::{DialOpts, ListenerId, Transport, TransportError, TransportEvent},
35};
36
37#[pin_project::pin_project]
39#[derive(Debug, Clone)]
40pub struct AndThen<T, C> {
41 #[pin]
42 transport: T,
43 fun: C,
44}
45
46impl<T, C> AndThen<T, C> {
47 pub(crate) fn new(transport: T, fun: C) -> Self {
48 AndThen { transport, fun }
49 }
50}
51
52impl<T, C, F, O> Transport for AndThen<T, C>
53where
54 T: Transport,
55 C: FnOnce(T::Output, ConnectedPoint) -> F + Clone,
56 F: TryFuture<Ok = O>,
57 F::Error: error::Error,
58{
59 type Output = O;
60 type Error = Either<T::Error, F::Error>;
61 type ListenerUpgrade = AndThenFuture<T::ListenerUpgrade, C, F>;
62 type Dial = AndThenFuture<T::Dial, C, F>;
63
64 fn listen_on(
65 &mut self,
66 id: ListenerId,
67 addr: Multiaddr,
68 ) -> Result<(), TransportError<Self::Error>> {
69 self.transport
70 .listen_on(id, addr)
71 .map_err(|err| err.map(Either::Left))
72 }
73
74 fn remove_listener(&mut self, id: ListenerId) -> bool {
75 self.transport.remove_listener(id)
76 }
77
78 fn dial(
79 &mut self,
80 addr: Multiaddr,
81 opts: DialOpts,
82 ) -> Result<Self::Dial, TransportError<Self::Error>> {
83 let dialed_fut = self
84 .transport
85 .dial(addr.clone(), opts)
86 .map_err(|err| err.map(Either::Left))?;
87 let future = AndThenFuture {
88 inner: Either::Left(Box::pin(dialed_fut)),
89 args: Some((
90 self.fun.clone(),
91 ConnectedPoint::Dialer {
92 address: addr,
93 role_override: opts.role,
94 port_use: opts.port_use,
95 },
96 )),
97 _marker: PhantomPinned,
98 };
99 Ok(future)
100 }
101
102 fn poll(
103 self: Pin<&mut Self>,
104 cx: &mut Context<'_>,
105 ) -> Poll<TransportEvent<Self::ListenerUpgrade, Self::Error>> {
106 let this = self.project();
107 match this.transport.poll(cx) {
108 Poll::Ready(TransportEvent::Incoming {
109 listener_id,
110 upgrade,
111 local_addr,
112 send_back_addr,
113 }) => {
114 let point = ConnectedPoint::Listener {
115 local_addr: local_addr.clone(),
116 send_back_addr: send_back_addr.clone(),
117 };
118 Poll::Ready(TransportEvent::Incoming {
119 listener_id,
120 upgrade: AndThenFuture {
121 inner: Either::Left(Box::pin(upgrade)),
122 args: Some((this.fun.clone(), point)),
123 _marker: PhantomPinned,
124 },
125 local_addr,
126 send_back_addr,
127 })
128 }
129 Poll::Ready(other) => {
130 let mapped = other
131 .map_upgrade(|_upgrade| unreachable!("case already matched"))
132 .map_err(Either::Left);
133 Poll::Ready(mapped)
134 }
135 Poll::Pending => Poll::Pending,
136 }
137 }
138}
139
140#[derive(Debug)]
144pub struct AndThenFuture<TFut, TMap, TMapOut> {
145 inner: Either<Pin<Box<TFut>>, Pin<Box<TMapOut>>>,
146 args: Option<(TMap, ConnectedPoint)>,
147 _marker: PhantomPinned,
148}
149
150impl<TFut, TMap, TMapOut> Future for AndThenFuture<TFut, TMap, TMapOut>
151where
152 TFut: TryFuture,
153 TMap: FnOnce(TFut::Ok, ConnectedPoint) -> TMapOut,
154 TMapOut: TryFuture,
155{
156 type Output = Result<TMapOut::Ok, Either<TFut::Error, TMapOut::Error>>;
157
158 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
159 loop {
160 let future = match &mut self.inner {
161 Either::Left(future) => {
162 let item = match TryFuture::try_poll(future.as_mut(), cx) {
163 Poll::Ready(Ok(v)) => v,
164 Poll::Ready(Err(err)) => return Poll::Ready(Err(Either::Left(err))),
165 Poll::Pending => return Poll::Pending,
166 };
167 let (f, a) = self
168 .args
169 .take()
170 .expect("AndThenFuture has already finished.");
171 f(item, a)
172 }
173 Either::Right(future) => {
174 return match TryFuture::try_poll(future.as_mut(), cx) {
175 Poll::Ready(Ok(v)) => Poll::Ready(Ok(v)),
176 Poll::Ready(Err(err)) => return Poll::Ready(Err(Either::Right(err))),
177 Poll::Pending => Poll::Pending,
178 }
179 }
180 };
181
182 self.inner = Either::Right(Box::pin(future));
183 }
184 }
185}
186
187impl<TFut, TMap, TMapOut> Unpin for AndThenFuture<TFut, TMap, TMapOut> {}