libp2p_core/muxing/
boxed.rs

1use std::{
2    error::Error,
3    fmt, io,
4    io::{IoSlice, IoSliceMut},
5    pin::Pin,
6    task::{Context, Poll},
7};
8
9use futures::{AsyncRead, AsyncWrite};
10use pin_project::pin_project;
11
12use crate::muxing::{StreamMuxer, StreamMuxerEvent};
13
14/// Abstract `StreamMuxer`.
15pub struct StreamMuxerBox {
16    inner: Pin<Box<dyn StreamMuxer<Substream = SubstreamBox, Error = io::Error> + Send>>,
17}
18
19impl fmt::Debug for StreamMuxerBox {
20    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
21        f.debug_struct("StreamMuxerBox").finish_non_exhaustive()
22    }
23}
24
25/// Abstract type for asynchronous reading and writing.
26///
27/// A [`SubstreamBox`] erases the concrete type it is given and only retains its `AsyncRead`
28/// and `AsyncWrite` capabilities.
29pub struct SubstreamBox(Pin<Box<dyn AsyncReadWrite + Send>>);
30
31#[pin_project]
32struct Wrap<T>
33where
34    T: StreamMuxer,
35{
36    #[pin]
37    inner: T,
38}
39
40impl<T> StreamMuxer for Wrap<T>
41where
42    T: StreamMuxer,
43    T::Substream: Send + 'static,
44    T::Error: Send + Sync + 'static,
45{
46    type Substream = SubstreamBox;
47    type Error = io::Error;
48
49    fn poll_inbound(
50        self: Pin<&mut Self>,
51        cx: &mut Context<'_>,
52    ) -> Poll<Result<Self::Substream, Self::Error>> {
53        self.project()
54            .inner
55            .poll_inbound(cx)
56            .map_ok(SubstreamBox::new)
57            .map_err(into_io_error)
58    }
59
60    fn poll_outbound(
61        self: Pin<&mut Self>,
62        cx: &mut Context<'_>,
63    ) -> Poll<Result<Self::Substream, Self::Error>> {
64        self.project()
65            .inner
66            .poll_outbound(cx)
67            .map_ok(SubstreamBox::new)
68            .map_err(into_io_error)
69    }
70
71    #[inline]
72    fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
73        self.project().inner.poll_close(cx).map_err(into_io_error)
74    }
75
76    fn poll(
77        self: Pin<&mut Self>,
78        cx: &mut Context<'_>,
79    ) -> Poll<Result<StreamMuxerEvent, Self::Error>> {
80        self.project().inner.poll(cx).map_err(into_io_error)
81    }
82}
83
/// Wraps `err` in an [`io::Error`] of kind [`io::ErrorKind::Other`], keeping the
/// original error as the payload.
fn into_io_error<E: Error + Send + Sync + 'static>(err: E) -> io::Error {
    let payload: Box<dyn Error + Send + Sync> = Box::new(err);
    io::Error::new(io::ErrorKind::Other, payload)
}
90
91impl StreamMuxerBox {
92    /// Turns a stream muxer into a `StreamMuxerBox`.
93    pub fn new<T>(muxer: T) -> StreamMuxerBox
94    where
95        T: StreamMuxer + Send + 'static,
96        T::Substream: Send + 'static,
97        T::Error: Send + Sync + 'static,
98    {
99        let wrap = Wrap { inner: muxer };
100
101        StreamMuxerBox {
102            inner: Box::pin(wrap),
103        }
104    }
105
106    fn project(
107        self: Pin<&mut Self>,
108    ) -> Pin<&mut (dyn StreamMuxer<Substream = SubstreamBox, Error = io::Error> + Send)> {
109        self.get_mut().inner.as_mut()
110    }
111}
112
113impl StreamMuxer for StreamMuxerBox {
114    type Substream = SubstreamBox;
115    type Error = io::Error;
116
117    fn poll_inbound(
118        self: Pin<&mut Self>,
119        cx: &mut Context<'_>,
120    ) -> Poll<Result<Self::Substream, Self::Error>> {
121        self.project().poll_inbound(cx)
122    }
123
124    fn poll_outbound(
125        self: Pin<&mut Self>,
126        cx: &mut Context<'_>,
127    ) -> Poll<Result<Self::Substream, Self::Error>> {
128        self.project().poll_outbound(cx)
129    }
130
131    #[inline]
132    fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
133        self.project().poll_close(cx)
134    }
135
136    fn poll(
137        self: Pin<&mut Self>,
138        cx: &mut Context<'_>,
139    ) -> Poll<Result<StreamMuxerEvent, Self::Error>> {
140        self.project().poll(cx)
141    }
142}
143
144impl SubstreamBox {
145    /// Construct a new [`SubstreamBox`] from something
146    /// that implements [`AsyncRead`] and [`AsyncWrite`].
147    pub fn new<S: AsyncRead + AsyncWrite + Send + 'static>(stream: S) -> Self {
148        Self(Box::pin(stream))
149    }
150}
151
152impl fmt::Debug for SubstreamBox {
153    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
154        write!(f, "SubstreamBox({})", self.0.type_name())
155    }
156}
157
158/// Workaround because Rust does not allow `Box<dyn AsyncRead + AsyncWrite>`.
159trait AsyncReadWrite: AsyncRead + AsyncWrite {
160    /// Helper function to capture the erased inner type.
161    ///
162    /// Used to make the [`Debug`] implementation of [`SubstreamBox`] more useful.
163    fn type_name(&self) -> &'static str;
164}
165
166impl<S> AsyncReadWrite for S
167where
168    S: AsyncRead + AsyncWrite,
169{
170    fn type_name(&self) -> &'static str {
171        std::any::type_name::<S>()
172    }
173}
174
175impl AsyncRead for SubstreamBox {
176    fn poll_read(
177        mut self: Pin<&mut Self>,
178        cx: &mut Context<'_>,
179        buf: &mut [u8],
180    ) -> Poll<std::io::Result<usize>> {
181        self.0.as_mut().poll_read(cx, buf)
182    }
183
184    fn poll_read_vectored(
185        mut self: Pin<&mut Self>,
186        cx: &mut Context<'_>,
187        bufs: &mut [IoSliceMut<'_>],
188    ) -> Poll<std::io::Result<usize>> {
189        self.0.as_mut().poll_read_vectored(cx, bufs)
190    }
191}
192
193impl AsyncWrite for SubstreamBox {
194    fn poll_write(
195        mut self: Pin<&mut Self>,
196        cx: &mut Context<'_>,
197        buf: &[u8],
198    ) -> Poll<std::io::Result<usize>> {
199        self.0.as_mut().poll_write(cx, buf)
200    }
201
202    fn poll_write_vectored(
203        mut self: Pin<&mut Self>,
204        cx: &mut Context<'_>,
205        bufs: &[IoSlice<'_>],
206    ) -> Poll<std::io::Result<usize>> {
207        self.0.as_mut().poll_write_vectored(cx, bufs)
208    }
209
210    fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<std::io::Result<()>> {
211        self.0.as_mut().poll_flush(cx)
212    }
213
214    fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<std::io::Result<()>> {
215        self.0.as_mut().poll_close(cx)
216    }
217}