1#![cfg_attr(docsrs, feature(doc_cfg, doc_auto_cfg))]
24
25pub mod error;
26pub mod framed;
27mod quicksink;
28pub mod tls;
29
30use std::{
31 io,
32 pin::Pin,
33 task::{Context, Poll},
34};
35
36use error::Error;
37use framed::{Connection, Incoming};
38use futures::{future::BoxFuture, prelude::*, ready};
39use libp2p_core::{
40 connection::ConnectedPoint,
41 multiaddr::Multiaddr,
42 transport::{map::MapFuture, DialOpts, ListenerId, TransportError, TransportEvent},
43 Transport,
44};
45use rw_stream_sink::RwStreamSink;
46
47#[deprecated = "Use `Config` instead"]
139pub type WsConfig<Transport> = Config<Transport>;
140
141#[derive(Debug)]
142pub struct Config<T: Transport>
143where
144 T: Transport,
145 T::Output: AsyncRead + AsyncWrite + Send + Unpin + 'static,
146{
147 transport: libp2p_core::transport::map::Map<framed::Config<T>, WrapperFn<T::Output>>,
148}
149
150impl<T: Transport> Config<T>
151where
152 T: Transport + Send + Unpin + 'static,
153 T::Error: Send + 'static,
154 T::Dial: Send + 'static,
155 T::ListenerUpgrade: Send + 'static,
156 T::Output: AsyncRead + AsyncWrite + Send + Unpin + 'static,
157{
158 pub fn new(transport: T) -> Self {
167 Self {
168 transport: framed::Config::new(transport).map(wrap_connection as WrapperFn<T::Output>),
169 }
170 }
171
172 pub fn max_redirects(&self) -> u8 {
174 self.transport.inner().max_redirects()
175 }
176
177 pub fn set_max_redirects(&mut self, max: u8) -> &mut Self {
179 self.transport.inner_mut().set_max_redirects(max);
180 self
181 }
182
183 pub fn max_data_size(&self) -> usize {
185 self.transport.inner().max_data_size()
186 }
187
188 pub fn set_max_data_size(&mut self, size: usize) -> &mut Self {
190 self.transport.inner_mut().set_max_data_size(size);
191 self
192 }
193
194 pub fn set_tls_config(&mut self, c: tls::Config) -> &mut Self {
196 self.transport.inner_mut().set_tls_config(c);
197 self
198 }
199}
200
201impl<T> Transport for Config<T>
202where
203 T: Transport + Send + Unpin + 'static,
204 T::Error: Send + 'static,
205 T::Dial: Send + 'static,
206 T::ListenerUpgrade: Send + 'static,
207 T::Output: AsyncRead + AsyncWrite + Unpin + Send + 'static,
208{
209 type Output = RwStreamSink<BytesConnection<T::Output>>;
210 type Error = Error<T::Error>;
211 type ListenerUpgrade = MapFuture<InnerFuture<T::Output, T::Error>, WrapperFn<T::Output>>;
212 type Dial = MapFuture<InnerFuture<T::Output, T::Error>, WrapperFn<T::Output>>;
213
214 fn listen_on(
215 &mut self,
216 id: ListenerId,
217 addr: Multiaddr,
218 ) -> Result<(), TransportError<Self::Error>> {
219 self.transport.listen_on(id, addr)
220 }
221
222 fn remove_listener(&mut self, id: ListenerId) -> bool {
223 self.transport.remove_listener(id)
224 }
225
226 fn dial(
227 &mut self,
228 addr: Multiaddr,
229 opts: DialOpts,
230 ) -> Result<Self::Dial, TransportError<Self::Error>> {
231 self.transport.dial(addr, opts)
232 }
233
234 fn poll(
235 mut self: Pin<&mut Self>,
236 cx: &mut Context<'_>,
237 ) -> Poll<TransportEvent<Self::ListenerUpgrade, Self::Error>> {
238 Pin::new(&mut self.transport).poll(cx)
239 }
240}
241
242pub type InnerFuture<T, E> = BoxFuture<'static, Result<Connection<T>, Error<E>>>;
244
245pub type WrapperFn<T> = fn(Connection<T>, ConnectedPoint) -> RwStreamSink<BytesConnection<T>>;
247
248fn wrap_connection<T>(c: Connection<T>, _: ConnectedPoint) -> RwStreamSink<BytesConnection<T>>
251where
252 T: AsyncRead + AsyncWrite + Send + Unpin + 'static,
253{
254 RwStreamSink::new(BytesConnection(c))
255}
256
/// Newtype around a framed [`Connection`] that exposes it as a stream and
/// sink of raw `Vec<u8>` payloads (see the `Stream` and `Sink` impls).
#[derive(Debug)]
pub struct BytesConnection<T>(Connection<T>);
260
261impl<T> Stream for BytesConnection<T>
262where
263 T: AsyncRead + AsyncWrite + Send + Unpin + 'static,
264{
265 type Item = io::Result<Vec<u8>>;
266
267 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
268 loop {
269 if let Some(item) = ready!(self.0.try_poll_next_unpin(cx)?) {
270 if let Incoming::Data(payload) = item {
271 return Poll::Ready(Some(Ok(payload.into_bytes())));
272 }
273 } else {
274 return Poll::Ready(None);
275 }
276 }
277 }
278}
279
280impl<T> Sink<Vec<u8>> for BytesConnection<T>
281where
282 T: AsyncRead + AsyncWrite + Send + Unpin + 'static,
283{
284 type Error = io::Error;
285
286 fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
287 Pin::new(&mut self.0).poll_ready(cx)
288 }
289
290 fn start_send(mut self: Pin<&mut Self>, item: Vec<u8>) -> io::Result<()> {
291 Pin::new(&mut self.0).start_send(framed::OutgoingData::Binary(item))
292 }
293
294 fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
295 Pin::new(&mut self.0).poll_flush(cx)
296 }
297
298 fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
299 Pin::new(&mut self.0).poll_close(cx)
300 }
301}
302
#[cfg(test)]
mod tests {
    use futures::prelude::*;
    use libp2p_core::{
        multiaddr::Protocol,
        transport::{DialOpts, ListenerId, PortUse},
        Endpoint, Multiaddr, Transport,
    };
    use libp2p_identity::PeerId;
    use libp2p_tcp as tcp;

    use super::Config;

    /// Builds a `Config` on top of an async-io TCP transport with default
    /// TCP settings.
    fn new_ws_config() -> Config<tcp::async_io::Transport> {
        let tcp_transport = tcp::async_io::Transport::new(tcp::Config::default());
        Config::new(tcp_transport)
    }

    #[test]
    fn dialer_connects_to_listener_ipv4() {
        let listen_addr: Multiaddr = "/ip4/127.0.0.1/tcp/0/ws".parse().unwrap();
        futures::executor::block_on(connect(listen_addr))
    }

    #[test]
    fn dialer_connects_to_listener_ipv6() {
        let listen_addr: Multiaddr = "/ip6/::1/tcp/0/ws".parse().unwrap();
        futures::executor::block_on(connect(listen_addr))
    }

    /// Listens on `listen_addr`, dials the reported address from a second
    /// transport, and requires both sides of the connection to succeed.
    async fn connect(listen_addr: Multiaddr) {
        let mut listener = new_ws_config().boxed();
        listener
            .listen_on(ListenerId::next(), listen_addr)
            .expect("listener");

        // The first event is expected to report the concrete listen address.
        let first_event = listener.next().await.expect("no error");
        let addr = first_event.into_new_address().expect("listen address");

        // The reported address must end in `/ws` and carry a real TCP port
        // instead of the requested wildcard port 0.
        assert_eq!(Some(Protocol::Ws("/".into())), addr.iter().nth(2));
        assert_ne!(Some(Protocol::Tcp(0)), addr.iter().nth(1));

        let inbound = async move {
            let (conn, _addr) = listener
                .select_next_some()
                .map(|ev| ev.into_incoming())
                .await
                .unwrap();
            conn.await
        };

        let dial_opts = DialOpts {
            role: Endpoint::Dialer,
            port_use: PortUse::New,
        };
        let outbound = new_ws_config()
            .boxed()
            .dial(addr.with(Protocol::P2p(PeerId::random())), dial_opts)
            .unwrap();

        let (inbound_result, outbound_result) = futures::join!(inbound, outbound);
        inbound_result.and(outbound_result).unwrap();
    }
}