libp2p/builder/phase/
websocket.rs

1use std::marker::PhantomData;
2
3#[cfg(all(not(target_arch = "wasm32"), feature = "websocket"))]
4use libp2p_core::muxing::{StreamMuxer, StreamMuxerBox};
5use libp2p_core::upgrade::{InboundConnectionUpgrade, OutboundConnectionUpgrade};
6#[cfg(all(not(target_arch = "wasm32"), feature = "websocket"))]
7use libp2p_core::Transport;
8#[cfg(any(
9    all(not(target_arch = "wasm32"), feature = "websocket"),
10    feature = "relay"
11))]
12use libp2p_core::{InboundUpgrade, Negotiated, OutboundUpgrade, UpgradeInfo};
13#[cfg(any(
14    all(not(target_arch = "wasm32"), feature = "websocket"),
15    feature = "relay"
16))]
17use libp2p_identity::PeerId;
18
19use super::*;
20use crate::SwarmBuilder;
21
/// Builder phase in which a websocket transport may be added.
///
/// `transport` is the transport handed over by the preceding phases. It is either combined
/// with a websocket transport (`with_websocket`) or forwarded unchanged
/// (`without_websocket`) when the builder advances to `RelayPhase`.
pub struct WebsocketPhase<T> {
    pub(crate) transport: T,
}
25
// Generates `with_websocket` for one runtime provider.
//
// * `$providerKebabCase`  - cargo feature name that gates the generated impl.
// * `$providerPascalCase` - provider marker type the impl is written for.
// * `$dnsTcp`             - expression that is `.await`ed and must resolve to a `Result`
//                           holding the base transport the websocket layer is built on.
// * `$websocketStream`    - stream type the security upgrade is applied to.
macro_rules! impl_websocket_builder {
    ($providerKebabCase:literal, $providerPascalCase:ty, $dnsTcp:expr, $websocketStream:ty) => {
        /// Adds a websocket client transport.
        ///
        /// The websocket transport is authenticated with `security_upgrade`, multiplexed
        /// with `multiplexer_upgrade` and then combined (via `or_transport`) with the
        /// transport configured in the earlier phases.
        ///
        /// Both `security_upgrade` and `multiplexer_upgrade` expect the constructor
        /// functions themselves (written without the trailing `()`), not the values
        /// produced by calling them. See the example below.
        ///
        /// An error is returned when the security upgrade cannot be built from the
        /// keypair or when setting up the underlying DNS/TCP transport fails.
        ///
        /// ``` rust
        /// # use libp2p::SwarmBuilder;
        /// # use std::error::Error;
        /// # async fn build_swarm() -> Result<(), Box<dyn Error>> {
        /// let swarm = SwarmBuilder::with_new_identity()
        ///     .with_tokio()
        ///     .with_websocket(
        ///         (libp2p_tls::Config::new, libp2p_noise::Config::new),
        ///         libp2p_yamux::Config::default,
        ///     )
        ///     .await?
        /// # ;
        /// # Ok(())
        /// # }
        /// ```
        #[cfg(all(not(target_arch = "wasm32"), feature = $providerKebabCase, feature = "websocket"))]
        impl<T> SwarmBuilder<$providerPascalCase, WebsocketPhase<T>> {
            pub async fn with_websocket<
                SecUpgrade,
                SecStream,
                SecError,
                MuxUpgrade,
                MuxStream,
                MuxError,
            >(
                self,
                security_upgrade: SecUpgrade,
                multiplexer_upgrade: MuxUpgrade,
            ) -> Result<
                SwarmBuilder<
                    $providerPascalCase,
                    RelayPhase<impl AuthenticatedMultiplexedTransport>,
                >,
                WebsocketError<SecUpgrade::Error>,
            >
            where
                T: AuthenticatedMultiplexedTransport,

                // Security (authentication) upgrade bounds.
                SecStream: futures::AsyncRead + futures::AsyncWrite + Unpin + Send + 'static,
                SecError: std::error::Error + Send + Sync + 'static,
                SecUpgrade: IntoSecurityUpgrade<$websocketStream>,
                SecUpgrade::Upgrade:
                    InboundConnectionUpgrade<
                        Negotiated<$websocketStream>,
                        Output = (PeerId, SecStream),
                        Error = SecError,
                    >
                    + OutboundConnectionUpgrade<
                        Negotiated<$websocketStream>,
                        Output = (PeerId, SecStream),
                        Error = SecError,
                    >
                    + Clone
                    + Send
                    + 'static,
                <SecUpgrade::Upgrade as InboundConnectionUpgrade<Negotiated<$websocketStream>>>::Future: Send,
                <SecUpgrade::Upgrade as OutboundConnectionUpgrade<Negotiated<$websocketStream>>>::Future: Send,
                <<<SecUpgrade as IntoSecurityUpgrade<$websocketStream>>::Upgrade as UpgradeInfo>::InfoIter as IntoIterator>::IntoIter: Send,
                <<SecUpgrade as IntoSecurityUpgrade<$websocketStream>>::Upgrade as UpgradeInfo>::Info: Send,

                // Stream multiplexer upgrade bounds.
                MuxStream: StreamMuxer + Send + 'static,
                MuxStream::Substream: Send + 'static,
                MuxStream::Error: Send + Sync + 'static,
                MuxUpgrade: IntoMultiplexerUpgrade<SecStream>,
                MuxUpgrade::Upgrade:
                    InboundConnectionUpgrade<
                        Negotiated<SecStream>,
                        Output = MuxStream,
                        Error = MuxError,
                    >
                    + OutboundConnectionUpgrade<
                        Negotiated<SecStream>,
                        Output = MuxStream,
                        Error = MuxError,
                    >
                    + Clone
                    + Send
                    + 'static,
                <MuxUpgrade::Upgrade as InboundConnectionUpgrade<Negotiated<SecStream>>>::Future: Send,
                <MuxUpgrade::Upgrade as OutboundConnectionUpgrade<Negotiated<SecStream>>>::Future: Send,
                MuxError: std::error::Error + Send + Sync + 'static,
                <<<MuxUpgrade as IntoMultiplexerUpgrade<SecStream>>::Upgrade as UpgradeInfo>::InfoIter as IntoIterator>::IntoIter: Send,
                <<MuxUpgrade as IntoMultiplexerUpgrade<SecStream>>::Upgrade as UpgradeInfo>::Info: Send,
            {
                // Build the security upgrade first so that its error takes precedence
                // over any failure while setting up the base transport.
                let authenticator = security_upgrade
                    .into_security_upgrade(&self.keypair)
                    .map_err(WebsocketErrorInner::SecurityUpgrade)?;

                // Base transport the websocket layer runs on.
                let base_transport = $dnsTcp.await.map_err(WebsocketErrorInner::Dns)?;

                let websocket_transport = libp2p_websocket::Config::new(base_transport)
                    .upgrade(libp2p_core::upgrade::Version::V1Lazy)
                    .authenticate(authenticator)
                    .multiplex(multiplexer_upgrade.into_multiplexer_upgrade())
                    .map(|(peer_id, muxer), _| (peer_id, StreamMuxerBox::new(muxer)));

                // Try the websocket transport first and fall back to the transport that
                // was configured in the earlier phases.
                let transport = websocket_transport
                    .or_transport(self.phase.transport)
                    .map(|either, _| either.into_inner());

                Ok(SwarmBuilder {
                    keypair: self.keypair,
                    phantom: PhantomData,
                    phase: RelayPhase { transport },
                })
            }
        }
    };
}
117
118impl_websocket_builder!(
119    "async-std",
120    super::provider::AsyncStd,
121    libp2p_dns::async_std::Transport::system(libp2p_tcp::async_io::Transport::new(
122        libp2p_tcp::Config::default(),
123    )),
124    rw_stream_sink::RwStreamSink<
125        libp2p_websocket::BytesConnection<libp2p_tcp::async_io::TcpStream>,
126    >
127);
128impl_websocket_builder!(
129    "tokio",
130    super::provider::Tokio,
131    // Note this is an unnecessary await for Tokio Websocket (i.e. tokio dns) in order to be
132    // consistent with above AsyncStd construction.
133    futures::future::ready(libp2p_dns::tokio::Transport::system(
134        libp2p_tcp::tokio::Transport::new(libp2p_tcp::Config::default())
135    )),
136    rw_stream_sink::RwStreamSink<libp2p_websocket::BytesConnection<libp2p_tcp::tokio::TcpStream>>
137);
138
139impl<Provider, T: AuthenticatedMultiplexedTransport> SwarmBuilder<Provider, WebsocketPhase<T>> {
140    pub(crate) fn without_websocket(self) -> SwarmBuilder<Provider, RelayPhase<T>> {
141        SwarmBuilder {
142            keypair: self.keypair,
143            phantom: PhantomData,
144            phase: RelayPhase {
145                transport: self.phase.transport,
146            },
147        }
148    }
149}
150
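// Shortcuts: methods of later phases, callable directly from the websocket phase. Each
// one first skips the websocket phase (and any further phases in between).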
#[cfg(feature = "relay")]
impl<T: AuthenticatedMultiplexedTransport, Provider> SwarmBuilder<Provider, WebsocketPhase<T>> {
    /// Skips the websocket phase and adds a relay client.
    ///
    /// See [`SwarmBuilder::with_relay_client`].
    pub fn with_relay_client<SecUpgrade, SecStream, SecError, MuxUpgrade, MuxStream, MuxError>(
        self,
        security_upgrade: SecUpgrade,
        multiplexer_upgrade: MuxUpgrade,
    ) -> Result<
        SwarmBuilder<
            Provider,
            BandwidthLoggingPhase<impl AuthenticatedMultiplexedTransport, libp2p_relay::client::Behaviour>,
        >,
        SecUpgrade::Error,
    >
    where
        // Security (authentication) upgrade bounds.
        SecStream: futures::AsyncRead + futures::AsyncWrite + Unpin + Send + 'static,
        SecError: std::error::Error + Send + Sync + 'static,
        SecUpgrade: IntoSecurityUpgrade<libp2p_relay::client::Connection>,
        SecUpgrade::Upgrade:
            InboundConnectionUpgrade<
                Negotiated<libp2p_relay::client::Connection>,
                Output = (PeerId, SecStream),
                Error = SecError,
            >
            + OutboundConnectionUpgrade<
                Negotiated<libp2p_relay::client::Connection>,
                Output = (PeerId, SecStream),
                Error = SecError,
            >
            + Clone
            + Send
            + 'static,
        <SecUpgrade::Upgrade as InboundConnectionUpgrade<Negotiated<libp2p_relay::client::Connection>>>::Future: Send,
        <SecUpgrade::Upgrade as OutboundConnectionUpgrade<Negotiated<libp2p_relay::client::Connection>>>::Future: Send,
        <<<SecUpgrade as IntoSecurityUpgrade<libp2p_relay::client::Connection>>::Upgrade as UpgradeInfo>::InfoIter as IntoIterator>::IntoIter: Send,
        <<SecUpgrade as IntoSecurityUpgrade<libp2p_relay::client::Connection>>::Upgrade as UpgradeInfo>::Info: Send,

        // Stream multiplexer upgrade bounds.
        MuxStream: libp2p_core::muxing::StreamMuxer + Send + 'static,
        MuxStream::Substream: Send + 'static,
        MuxStream::Error: Send + Sync + 'static,
        MuxUpgrade: IntoMultiplexerUpgrade<SecStream>,
        MuxUpgrade::Upgrade:
            InboundConnectionUpgrade<
                Negotiated<SecStream>,
                Output = MuxStream,
                Error = MuxError,
            >
            + OutboundConnectionUpgrade<
                Negotiated<SecStream>,
                Output = MuxStream,
                Error = MuxError,
            >
            + Clone
            + Send
            + 'static,
        <MuxUpgrade::Upgrade as InboundConnectionUpgrade<Negotiated<SecStream>>>::Future: Send,
        <MuxUpgrade::Upgrade as OutboundConnectionUpgrade<Negotiated<SecStream>>>::Future: Send,
        MuxError: std::error::Error + Send + Sync + 'static,
        <<<MuxUpgrade as IntoMultiplexerUpgrade<SecStream>>::Upgrade as UpgradeInfo>::InfoIter as IntoIterator>::IntoIter: Send,
        <<MuxUpgrade as IntoMultiplexerUpgrade<SecStream>>::Upgrade as UpgradeInfo>::Info: Send,
    {
        // Advance past the websocket phase, then delegate to the relay-phase method.
        let builder = self.without_websocket();
        builder.with_relay_client(security_upgrade, multiplexer_upgrade)
    }
}
#[cfg(feature = "metrics")]
impl<Provider, T> SwarmBuilder<Provider, WebsocketPhase<T>>
where
    T: AuthenticatedMultiplexedTransport,
{
    /// Skips the websocket, relay and bandwidth-logging phases and registers bandwidth
    /// metrics with `registry`.
    pub fn with_bandwidth_metrics(
        self,
        registry: &mut libp2p_metrics::Registry,
    ) -> SwarmBuilder<
        Provider,
        BehaviourPhase<impl AuthenticatedMultiplexedTransport, NoRelayBehaviour>,
    > {
        // Walk through the intermediate phases one at a time without configuring them.
        let builder = self.without_websocket();
        let builder = builder.without_relay();
        let builder = builder.without_bandwidth_logging();
        builder.with_bandwidth_metrics(registry)
    }
}
206impl<Provider, T: AuthenticatedMultiplexedTransport> SwarmBuilder<Provider, WebsocketPhase<T>> {
207    pub fn with_behaviour<B, R: TryIntoBehaviour<B>>(
208        self,
209        constructor: impl FnOnce(&libp2p_identity::Keypair) -> R,
210    ) -> Result<SwarmBuilder<Provider, SwarmPhase<T, B>>, R::Error> {
211        self.without_websocket()
212            .without_relay()
213            .without_bandwidth_logging()
214            .with_behaviour(constructor)
215    }
216}
217
/// Error returned by `with_websocket`.
///
/// Opaque wrapper around the private `WebsocketErrorInner`; display and source are
/// forwarded to the wrapped value (`#[error(transparent)]`). `Sec` is the error type of
/// the security upgrade.
#[cfg(all(not(target_arch = "wasm32"), feature = "websocket"))]
#[derive(Debug, thiserror::Error)]
#[error(transparent)]
pub struct WebsocketError<Sec>(#[from] WebsocketErrorInner<Sec>);
222
/// Private failure cases behind `WebsocketError`.
///
/// NOTE(review): `Dns` only exists when the `dns` feature is enabled, while
/// `impl_websocket_builder!` constructs it under the `websocket` gate alone - confirm
/// that enabling `websocket` always enables `dns`.
#[cfg(all(not(target_arch = "wasm32"), feature = "websocket"))]
#[derive(Debug, thiserror::Error)]
enum WebsocketErrorInner<Sec> {
    /// Building the security upgrade from the keypair failed.
    #[error("SecurityUpgrade")]
    SecurityUpgrade(Sec),
    /// Setting up the DNS/TCP transport underneath the websocket layer failed.
    #[cfg(feature = "dns")]
    #[error("Dns")]
    Dns(#[from] std::io::Error),
}