libp2p_core/transport/
boxed.rs1use std::{
22 error::Error,
23 fmt, io,
24 pin::Pin,
25 task::{Context, Poll},
26};
27
28use futures::{prelude::*, stream::FusedStream};
29use multiaddr::Multiaddr;
30
31use crate::transport::{DialOpts, ListenerId, Transport, TransportError, TransportEvent};
32
33pub(crate) fn boxed<T>(transport: T) -> Boxed<T::Output>
35where
36 T: Transport + Send + Unpin + 'static,
37 T::Error: Send + Sync,
38 T::Dial: Send + 'static,
39 T::ListenerUpgrade: Send + 'static,
40{
41 Boxed {
42 inner: Box::new(transport) as Box<_>,
43 }
44}
45
46pub struct Boxed<O> {
50 inner: Box<dyn Abstract<O> + Send + Unpin>,
51}
52
/// Boxed, pinned, `Send` future returned by a dial attempt; resolves to the
/// output `O` or an `io::Error`.
type Dial<O> = Pin<Box<dyn Future<Output = io::Result<O>> + Send>>;
/// Boxed, pinned, `Send` future carried by transport events for an incoming
/// connection upgrade; resolves to the output `O` or an `io::Error`.
type ListenerUpgrade<O> = Pin<Box<dyn Future<Output = io::Result<O>> + Send>>;
55
56trait Abstract<O> {
57 fn listen_on(
58 &mut self,
59 id: ListenerId,
60 addr: Multiaddr,
61 ) -> Result<(), TransportError<io::Error>>;
62 fn remove_listener(&mut self, id: ListenerId) -> bool;
63 fn dial(
64 &mut self,
65 addr: Multiaddr,
66 opts: DialOpts,
67 ) -> Result<Dial<O>, TransportError<io::Error>>;
68 fn poll(
69 self: Pin<&mut Self>,
70 cx: &mut Context<'_>,
71 ) -> Poll<TransportEvent<ListenerUpgrade<O>, io::Error>>;
72}
73
74impl<T, O> Abstract<O> for T
75where
76 T: Transport<Output = O> + 'static,
77 T::Error: Send + Sync,
78 T::Dial: Send + 'static,
79 T::ListenerUpgrade: Send + 'static,
80{
81 fn listen_on(
82 &mut self,
83 id: ListenerId,
84 addr: Multiaddr,
85 ) -> Result<(), TransportError<io::Error>> {
86 Transport::listen_on(self, id, addr).map_err(|e| e.map(box_err))
87 }
88
89 fn remove_listener(&mut self, id: ListenerId) -> bool {
90 Transport::remove_listener(self, id)
91 }
92
93 fn dial(
94 &mut self,
95 addr: Multiaddr,
96 opts: DialOpts,
97 ) -> Result<Dial<O>, TransportError<io::Error>> {
98 let fut = Transport::dial(self, addr, opts)
99 .map(|r| r.map_err(box_err))
100 .map_err(|e| e.map(box_err))?;
101 Ok(Box::pin(fut) as Dial<_>)
102 }
103
104 fn poll(
105 self: Pin<&mut Self>,
106 cx: &mut Context<'_>,
107 ) -> Poll<TransportEvent<ListenerUpgrade<O>, io::Error>> {
108 self.poll(cx).map(|event| {
109 event
110 .map_upgrade(|upgrade| {
111 let up = upgrade.map_err(box_err);
112 Box::pin(up) as ListenerUpgrade<O>
113 })
114 .map_err(box_err)
115 })
116 }
117}
118
119impl<O> fmt::Debug for Boxed<O> {
120 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
121 write!(f, "BoxedTransport")
122 }
123}
124
125impl<O> Transport for Boxed<O> {
126 type Output = O;
127 type Error = io::Error;
128 type ListenerUpgrade = ListenerUpgrade<O>;
129 type Dial = Dial<O>;
130
131 fn listen_on(
132 &mut self,
133 id: ListenerId,
134 addr: Multiaddr,
135 ) -> Result<(), TransportError<Self::Error>> {
136 self.inner.listen_on(id, addr)
137 }
138
139 fn remove_listener(&mut self, id: ListenerId) -> bool {
140 self.inner.remove_listener(id)
141 }
142
143 fn dial(
144 &mut self,
145 addr: Multiaddr,
146 opts: DialOpts,
147 ) -> Result<Self::Dial, TransportError<Self::Error>> {
148 self.inner.dial(addr, opts)
149 }
150
151 fn poll(
152 mut self: Pin<&mut Self>,
153 cx: &mut Context<'_>,
154 ) -> Poll<TransportEvent<Self::ListenerUpgrade, Self::Error>> {
155 Pin::new(self.inner.as_mut()).poll(cx)
156 }
157}
158
159impl<O> Stream for Boxed<O> {
160 type Item = TransportEvent<ListenerUpgrade<O>, io::Error>;
161
162 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
163 Transport::poll(self, cx).map(Some)
164 }
165}
166
167impl<O> FusedStream for Boxed<O> {
168 fn is_terminated(&self) -> bool {
169 false
170 }
171}
172
/// Converts any thread-safe error into an `io::Error` of kind
/// `io::ErrorKind::Other`, keeping the original error as the inner payload.
fn box_err<E>(e: E) -> io::Error
where
    E: Error + Send + Sync + 'static,
{
    let kind = io::ErrorKind::Other;
    io::Error::new(kind, e)
}