libp2p_mplex/
lib.rs

1// Copyright 2018 Parity Technologies (UK) Ltd.
2//
3// Permission is hereby granted, free of charge, to any person obtaining a
4// copy of this software and associated documentation files (the "Software"),
5// to deal in the Software without restriction, including without limitation
6// the rights to use, copy, modify, merge, publish, distribute, sublicense,
7// and/or sell copies of the Software, and to permit persons to whom the
8// Software is furnished to do so, subject to the following conditions:
9//
10// The above copyright notice and this permission notice shall be included in
11// all copies or substantial portions of the Software.
12//
13// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
14// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
18// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
19// DEALINGS IN THE SOFTWARE.
20
21//! Implementation of the Stream Multiplexer [Mplex](https://github.com/libp2p/specs/blob/master/mplex/README.md) protocol.
22
23#![cfg_attr(docsrs, feature(doc_cfg, doc_auto_cfg))]
24
25mod codec;
26mod config;
27mod io;
28
29use std::{
30    cmp, iter,
31    pin::Pin,
32    sync::Arc,
33    task::{Context, Poll},
34};
35
36use bytes::Bytes;
37use codec::LocalStreamId;
38pub use config::{Config, MaxBufferBehaviour};
39use futures::{prelude::*, ready};
40use libp2p_core::{
41    muxing::{StreamMuxer, StreamMuxerEvent},
42    upgrade::{InboundConnectionUpgrade, OutboundConnectionUpgrade, UpgradeInfo},
43};
44use parking_lot::Mutex;
45
46#[deprecated = "Use `Config` instead"]
47pub type MplexConfig = Config;
48
49impl UpgradeInfo for Config {
50    type Info = &'static str;
51    type InfoIter = iter::Once<Self::Info>;
52
53    fn protocol_info(&self) -> Self::InfoIter {
54        iter::once(self.protocol_name)
55    }
56}
57
58impl<C> InboundConnectionUpgrade<C> for Config
59where
60    C: AsyncRead + AsyncWrite + Unpin,
61{
62    type Output = Multiplex<C>;
63    type Error = io::Error;
64    type Future = future::Ready<Result<Self::Output, io::Error>>;
65
66    fn upgrade_inbound(self, socket: C, _: Self::Info) -> Self::Future {
67        future::ready(Ok(Multiplex {
68            #[allow(unknown_lints, clippy::arc_with_non_send_sync)] // `T` is not enforced to be `Send` but we don't want to constrain it either.
69            io: Arc::new(Mutex::new(io::Multiplexed::new(socket, self))),
70        }))
71    }
72}
73
74impl<C> OutboundConnectionUpgrade<C> for Config
75where
76    C: AsyncRead + AsyncWrite + Unpin,
77{
78    type Output = Multiplex<C>;
79    type Error = io::Error;
80    type Future = future::Ready<Result<Self::Output, io::Error>>;
81
82    fn upgrade_outbound(self, socket: C, _: Self::Info) -> Self::Future {
83        future::ready(Ok(Multiplex {
84            #[allow(unknown_lints, clippy::arc_with_non_send_sync)] // `T` is not enforced to be `Send` but we don't want to constrain it either.
85            io: Arc::new(Mutex::new(io::Multiplexed::new(socket, self))),
86        }))
87    }
88}
89
90/// Multiplexer. Implements the `StreamMuxer` trait.
91pub struct Multiplex<C> {
92    io: Arc<Mutex<io::Multiplexed<C>>>,
93}
94
95impl<C> StreamMuxer for Multiplex<C>
96where
97    C: AsyncRead + AsyncWrite + Unpin,
98{
99    type Substream = Substream<C>;
100    type Error = io::Error;
101
102    fn poll_inbound(
103        self: Pin<&mut Self>,
104        cx: &mut Context<'_>,
105    ) -> Poll<Result<Self::Substream, Self::Error>> {
106        self.io
107            .lock()
108            .poll_next_stream(cx)
109            .map_ok(|stream_id| Substream::new(stream_id, self.io.clone()))
110    }
111
112    fn poll_outbound(
113        self: Pin<&mut Self>,
114        cx: &mut Context<'_>,
115    ) -> Poll<Result<Self::Substream, Self::Error>> {
116        self.io
117            .lock()
118            .poll_open_stream(cx)
119            .map_ok(|stream_id| Substream::new(stream_id, self.io.clone()))
120    }
121
122    fn poll(
123        self: Pin<&mut Self>,
124        _: &mut Context<'_>,
125    ) -> Poll<Result<StreamMuxerEvent, Self::Error>> {
126        Poll::Pending
127    }
128
129    fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
130        self.io.lock().poll_close(cx)
131    }
132}
133
134impl<C> AsyncRead for Substream<C>
135where
136    C: AsyncRead + AsyncWrite + Unpin,
137{
138    fn poll_read(
139        self: Pin<&mut Self>,
140        cx: &mut Context<'_>,
141        buf: &mut [u8],
142    ) -> Poll<io::Result<usize>> {
143        let this = self.get_mut();
144
145        loop {
146            // Try to read from the current (i.e. last received) frame.
147            if !this.current_data.is_empty() {
148                let len = cmp::min(this.current_data.len(), buf.len());
149                buf[..len].copy_from_slice(&this.current_data.split_to(len));
150                return Poll::Ready(Ok(len));
151            }
152
153            // Read the next data frame from the multiplexed stream.
154            match ready!(this.io.lock().poll_read_stream(cx, this.id))? {
155                Some(data) => {
156                    this.current_data = data;
157                }
158                None => return Poll::Ready(Ok(0)),
159            }
160        }
161    }
162}
163
164impl<C> AsyncWrite for Substream<C>
165where
166    C: AsyncRead + AsyncWrite + Unpin,
167{
168    fn poll_write(
169        self: Pin<&mut Self>,
170        cx: &mut Context<'_>,
171        buf: &[u8],
172    ) -> Poll<io::Result<usize>> {
173        let this = self.get_mut();
174
175        this.io.lock().poll_write_stream(cx, this.id, buf)
176    }
177
178    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
179        let this = self.get_mut();
180
181        this.io.lock().poll_flush_stream(cx, this.id)
182    }
183
184    fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
185        let this = self.get_mut();
186        let mut io = this.io.lock();
187
188        ready!(io.poll_close_stream(cx, this.id))?;
189        ready!(io.poll_flush_stream(cx, this.id))?;
190
191        Poll::Ready(Ok(()))
192    }
193}
194
195/// Active substream to the remote.
196pub struct Substream<C>
197where
198    C: AsyncRead + AsyncWrite + Unpin,
199{
200    /// The unique, local identifier of the substream.
201    id: LocalStreamId,
202    /// The current data frame the substream is reading from.
203    current_data: Bytes,
204    /// Shared reference to the actual muxer.
205    io: Arc<Mutex<io::Multiplexed<C>>>,
206}
207
208impl<C> Substream<C>
209where
210    C: AsyncRead + AsyncWrite + Unpin,
211{
212    fn new(id: LocalStreamId, io: Arc<Mutex<io::Multiplexed<C>>>) -> Self {
213        Self {
214            id,
215            current_data: Bytes::new(),
216            io,
217        }
218    }
219}
220
221impl<C> Drop for Substream<C>
222where
223    C: AsyncRead + AsyncWrite + Unpin,
224{
225    fn drop(&mut self) {
226        self.io.lock().drop_stream(self.id);
227    }
228}