libp2p_core/muxing.rs
1// Copyright 2018 Parity Technologies (UK) Ltd.
2//
3// Permission is hereby granted, free of charge, to any person obtaining a
4// copy of this software and associated documentation files (the "Software"),
5// to deal in the Software without restriction, including without limitation
6// the rights to use, copy, modify, merge, publish, distribute, sublicense,
7// and/or sell copies of the Software, and to permit persons to whom the
8// Software is furnished to do so, subject to the following conditions:
9//
10// The above copyright notice and this permission notice shall be included in
11// all copies or substantial portions of the Software.
12//
13// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
14// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
18// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
19// DEALINGS IN THE SOFTWARE.
20
21//! Muxing is the process of splitting a connection into multiple substreams.
22//!
//! The main item of this module is the `StreamMuxer` trait. An implementation of `StreamMuxer`
//! owns a connection and lets you open and close substreams on it.
25//!
26//! > **Note**: You normally don't need to use the methods of the `StreamMuxer` directly, as this
27//! > is managed by the library's internals.
28//!
29//! Each substream of a connection is an isolated stream of data. All the substreams are muxed
30//! together so that the data read from or written to each substream doesn't influence the other
31//! substreams.
32//!
33//! In the context of libp2p, each substream can use a different protocol. Contrary to opening a
34//! connection, opening a substream is almost free in terms of resources. This means that you
35//! shouldn't hesitate to rapidly open and close substreams, and to design protocols that don't
36//! require maintaining long-lived channels of communication.
37//!
38//! > **Example**: The Kademlia protocol opens a new substream for each request it wants to
39//! > perform. Multiple requests can be performed simultaneously by opening multiple
40//! > substreams, without having to worry about associating responses with the
41//! > right request.
42//!
43//! # Implementing a muxing protocol
44//!
45//! In order to implement a muxing protocol, create an object that implements the `UpgradeInfo`,
46//! `InboundUpgrade` and `OutboundUpgrade` traits. See the `upgrade` module for more information.
47//! The `Output` associated type of the `InboundUpgrade` and `OutboundUpgrade` traits should be
48//! identical, and should be an object that implements the `StreamMuxer` trait.
49//!
50//! The upgrade process will take ownership of the connection, which makes it possible for the
51//! implementation of `StreamMuxer` to control everything that happens on the wire.
52
53use std::{future::Future, pin::Pin};
54
55use futures::{
56 task::{Context, Poll},
57 AsyncRead, AsyncWrite,
58};
59use multiaddr::Multiaddr;
60
61pub use self::boxed::{StreamMuxerBox, SubstreamBox};
62
63mod boxed;
64
65/// Provides multiplexing for a connection by allowing users to open substreams.
66///
67/// A substream created by a [`StreamMuxer`] is a type that implements [`AsyncRead`] and
68/// [`AsyncWrite`]. The [`StreamMuxer`] itself is modelled closely after [`AsyncWrite`]. It features
69/// `poll`-style functions that allow the implementation to make progress on various tasks.
70pub trait StreamMuxer {
71 /// Type of the object that represents the raw substream where data can be read and written.
72 type Substream: AsyncRead + AsyncWrite;
73
74 /// Error type of the muxer
75 type Error: std::error::Error;
76
77 /// Poll for new inbound substreams.
78 ///
79 /// This function should be called whenever callers are ready to accept more inbound streams. In
80 /// other words, callers may exercise back-pressure on incoming streams by not calling this
81 /// function if a certain limit is hit.
82 fn poll_inbound(
83 self: Pin<&mut Self>,
84 cx: &mut Context<'_>,
85 ) -> Poll<Result<Self::Substream, Self::Error>>;
86
87 /// Poll for a new, outbound substream.
88 fn poll_outbound(
89 self: Pin<&mut Self>,
90 cx: &mut Context<'_>,
91 ) -> Poll<Result<Self::Substream, Self::Error>>;
92
93 /// Poll to close this [`StreamMuxer`].
94 ///
95 /// After this has returned `Poll::Ready(Ok(()))`, the muxer has become useless and may be
96 /// safely dropped.
97 ///
98 /// > **Note**: You are encouraged to call this method and wait for it to return `Ready`, so
99 /// > that the remote is properly informed of the shutdown. However, apart from
100 /// > properly informing the remote, there is no difference between this and
101 /// > immediately dropping the muxer.
102 fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>>;
103
104 /// Poll to allow the underlying connection to make progress.
105 ///
106 /// In contrast to all other `poll`-functions on [`StreamMuxer`], this function MUST be called
107 /// unconditionally. Because it will be called regardless, this function can be used by
108 /// implementations to return events about the underlying connection that the caller MUST deal
109 /// with.
110 fn poll(
111 self: Pin<&mut Self>,
112 cx: &mut Context<'_>,
113 ) -> Poll<Result<StreamMuxerEvent, Self::Error>>;
114}
115
116/// An event produced by a [`StreamMuxer`].
117#[derive(Debug)]
118pub enum StreamMuxerEvent {
119 /// The address of the remote has changed.
120 AddressChange(Multiaddr),
121}
122
123/// Extension trait for [`StreamMuxer`].
124pub trait StreamMuxerExt: StreamMuxer + Sized {
125 /// Convenience function for calling [`StreamMuxer::poll_inbound`]
126 /// for [`StreamMuxer`]s that are `Unpin`.
127 fn poll_inbound_unpin(
128 &mut self,
129 cx: &mut Context<'_>,
130 ) -> Poll<Result<Self::Substream, Self::Error>>
131 where
132 Self: Unpin,
133 {
134 Pin::new(self).poll_inbound(cx)
135 }
136
137 /// Convenience function for calling [`StreamMuxer::poll_outbound`]
138 /// for [`StreamMuxer`]s that are `Unpin`.
139 fn poll_outbound_unpin(
140 &mut self,
141 cx: &mut Context<'_>,
142 ) -> Poll<Result<Self::Substream, Self::Error>>
143 where
144 Self: Unpin,
145 {
146 Pin::new(self).poll_outbound(cx)
147 }
148
149 /// Convenience function for calling [`StreamMuxer::poll`]
150 /// for [`StreamMuxer`]s that are `Unpin`.
151 fn poll_unpin(&mut self, cx: &mut Context<'_>) -> Poll<Result<StreamMuxerEvent, Self::Error>>
152 where
153 Self: Unpin,
154 {
155 Pin::new(self).poll(cx)
156 }
157
158 /// Convenience function for calling [`StreamMuxer::poll_close`]
159 /// for [`StreamMuxer`]s that are `Unpin`.
160 fn poll_close_unpin(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>>
161 where
162 Self: Unpin,
163 {
164 Pin::new(self).poll_close(cx)
165 }
166
167 /// Returns a future for closing this [`StreamMuxer`].
168 fn close(self) -> Close<Self> {
169 Close(self)
170 }
171}
172
173impl<S> StreamMuxerExt for S where S: StreamMuxer {}
174
/// Future returned by [`StreamMuxerExt::close`]; it owns the muxer that is being closed.
///
/// `Close<S>` implements `Future` only when `S` is an `Unpin` [`StreamMuxer`]. Polling it
/// forwards to the muxer's `poll_close`, so the future's output is that method's result.
///
/// Like any future, it does nothing until it is polled: creating a `Close` and dropping it
/// without awaiting it never notifies the remote of the shutdown.
#[derive(Debug)]
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct Close<S>(S);
176
177impl<S> Future for Close<S>
178where
179 S: StreamMuxer + Unpin,
180{
181 type Output = Result<(), S::Error>;
182
183 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
184 self.0.poll_close_unpin(cx)
185 }
186}