libp2p/builder/phase/
websocket.rs

1use std::marker::PhantomData;
2
3#[cfg(all(not(target_arch = "wasm32"), feature = "websocket"))]
4use libp2p_core::muxing::{StreamMuxer, StreamMuxerBox};
5use libp2p_core::upgrade::{InboundConnectionUpgrade, OutboundConnectionUpgrade};
6#[cfg(all(not(target_arch = "wasm32"), feature = "websocket"))]
7use libp2p_core::Transport;
8#[cfg(any(
9    all(not(target_arch = "wasm32"), feature = "websocket"),
10    feature = "relay"
11))]
12use libp2p_core::{InboundUpgrade, Negotiated, OutboundUpgrade, UpgradeInfo};
13#[cfg(any(
14    all(not(target_arch = "wasm32"), feature = "websocket"),
15    feature = "relay"
16))]
17use libp2p_identity::PeerId;
18
19use super::*;
20use crate::SwarmBuilder;
21
22pub struct WebsocketPhase<T> {
23    pub(crate) transport: T,
24}
25
26macro_rules! impl_websocket_builder {
27    ($providerKebabCase:literal, $providerPascalCase:ty, $dnsTcp:expr, $websocketStream:ty) => {
28        /// Adds a websocket client transport.
29        ///
30        /// Note that both `security_upgrade` and `multiplexer_upgrade` take function pointers,
31        /// i.e. they take the function themselves (without the invocation via `()`), not the
32        /// result of the function invocation. See example below.
33        ///
34        /// ``` rust
35        /// # use libp2p::SwarmBuilder;
36        /// # use std::error::Error;
37        /// # async fn build_swarm() -> Result<(), Box<dyn Error>> {
38        /// let swarm = SwarmBuilder::with_new_identity()
39        ///     .with_tokio()
40        ///     .with_websocket(
41        ///         (libp2p_tls::Config::new, libp2p_noise::Config::new),
42        ///         libp2p_yamux::Config::default,
43        ///     )
44        ///     .await?
45        /// # ;
46        /// # Ok(())
47        /// # }
48        /// ```
49        #[cfg(all(not(target_arch = "wasm32"), feature = $providerKebabCase, feature = "websocket"))]
50        impl<T> SwarmBuilder<$providerPascalCase, WebsocketPhase<T>> {
51            pub async fn with_websocket<
52                SecUpgrade,
53                SecStream,
54                SecError,
55                MuxUpgrade,
56                MuxStream,
57                MuxError,
58            >(
59                self,
60                security_upgrade: SecUpgrade,
61                multiplexer_upgrade: MuxUpgrade,
62            ) -> Result<
63                SwarmBuilder<
64                    $providerPascalCase,
65                    RelayPhase<impl AuthenticatedMultiplexedTransport>,
66                >,
67                WebsocketError<SecUpgrade::Error>,
68            >
69
70            where
71                T: AuthenticatedMultiplexedTransport,
72
73                SecStream: futures::AsyncRead + futures::AsyncWrite + Unpin + Send + 'static,
74                SecError: std::error::Error + Send + Sync + 'static,
75                SecUpgrade: IntoSecurityUpgrade<$websocketStream>,
76                SecUpgrade::Upgrade: InboundConnectionUpgrade<Negotiated<$websocketStream>, Output = (PeerId, SecStream), Error = SecError> + OutboundConnectionUpgrade<Negotiated<$websocketStream>, Output = (PeerId, SecStream), Error = SecError> + Clone + Send + 'static,
77                <SecUpgrade::Upgrade as InboundConnectionUpgrade<Negotiated<$websocketStream>>>::Future: Send,
78                <SecUpgrade::Upgrade as OutboundConnectionUpgrade<Negotiated<$websocketStream>>>::Future: Send,
79                <<<SecUpgrade as IntoSecurityUpgrade<$websocketStream>>::Upgrade as UpgradeInfo>::InfoIter as IntoIterator>::IntoIter: Send,
80                <<SecUpgrade as IntoSecurityUpgrade<$websocketStream>>::Upgrade as UpgradeInfo>::Info: Send,
81
82                MuxStream: StreamMuxer + Send + 'static,
83                MuxStream::Substream: Send + 'static,
84                MuxStream::Error: Send + Sync + 'static,
85                MuxUpgrade: IntoMultiplexerUpgrade<SecStream>,
86                MuxUpgrade::Upgrade: InboundConnectionUpgrade<Negotiated<SecStream>, Output = MuxStream, Error = MuxError> + OutboundConnectionUpgrade<Negotiated<SecStream>, Output = MuxStream, Error = MuxError> + Clone + Send + 'static,
87                <MuxUpgrade::Upgrade as InboundConnectionUpgrade<Negotiated<SecStream>>>::Future: Send,
88                <MuxUpgrade::Upgrade as OutboundConnectionUpgrade<Negotiated<SecStream>>>::Future: Send,
89                MuxError: std::error::Error + Send + Sync + 'static,
90                <<<MuxUpgrade as IntoMultiplexerUpgrade<SecStream>>::Upgrade as UpgradeInfo>::InfoIter as IntoIterator>::IntoIter: Send,
91                <<MuxUpgrade as IntoMultiplexerUpgrade<SecStream>>::Upgrade as UpgradeInfo>::Info: Send,
92
93            {
94                let security_upgrade = security_upgrade.into_security_upgrade(&self.keypair)
95                    .map_err(WebsocketErrorInner::SecurityUpgrade)?;
96                let websocket_transport = libp2p_websocket::Config::new(
97                    $dnsTcp.await.map_err(WebsocketErrorInner::Dns)?,
98                )
99                    .upgrade(libp2p_core::upgrade::Version::V1Lazy)
100                    .authenticate(security_upgrade)
101                    .multiplex(multiplexer_upgrade.into_multiplexer_upgrade())
102                    .map(|(p, c), _| (p, StreamMuxerBox::new(c)));
103
104                Ok(SwarmBuilder {
105                    keypair: self.keypair,
106                    phantom: PhantomData,
107                    phase: RelayPhase {
108                        transport: websocket_transport
109                            .or_transport(self.phase.transport)
110                            .map(|either, _| either.into_inner()),
111                    },
112                })
113            }
114        }
115    };
116}
117
118impl_websocket_builder!(
119    "tokio",
120    super::provider::Tokio,
121    // Note this is an unnecessary await for Tokio Websocket (i.e. tokio dns) in order to be
122    // consistent with above AsyncStd construction.
123    futures::future::ready(libp2p_dns::tokio::Transport::system(
124        libp2p_tcp::tokio::Transport::new(libp2p_tcp::Config::default())
125    )),
126    rw_stream_sink::RwStreamSink<libp2p_websocket::BytesConnection<libp2p_tcp::tokio::TcpStream>>
127);
128
129impl<Provider, T: AuthenticatedMultiplexedTransport> SwarmBuilder<Provider, WebsocketPhase<T>> {
130    pub(crate) fn without_websocket(self) -> SwarmBuilder<Provider, RelayPhase<T>> {
131        SwarmBuilder {
132            keypair: self.keypair,
133            phantom: PhantomData,
134            phase: RelayPhase {
135                transport: self.phase.transport,
136            },
137        }
138    }
139}
140
141// Shortcuts
142#[cfg(feature = "relay")]
143impl<T: AuthenticatedMultiplexedTransport, Provider> SwarmBuilder<Provider, WebsocketPhase<T>> {
144    /// See [`SwarmBuilder::with_relay_client`].
145    pub fn with_relay_client<SecUpgrade, SecStream, SecError, MuxUpgrade, MuxStream, MuxError>(
146        self,
147        security_upgrade: SecUpgrade,
148        multiplexer_upgrade: MuxUpgrade,
149    ) -> Result<
150        SwarmBuilder<
151            Provider,
152            BandwidthMetricsPhase<impl AuthenticatedMultiplexedTransport, libp2p_relay::client::Behaviour>,
153        >,
154        SecUpgrade::Error,
155        > where
156
157        SecStream: futures::AsyncRead + futures::AsyncWrite + Unpin + Send + 'static,
158        SecError: std::error::Error + Send + Sync + 'static,
159        SecUpgrade: IntoSecurityUpgrade<libp2p_relay::client::Connection>,
160        SecUpgrade::Upgrade: InboundConnectionUpgrade<Negotiated<libp2p_relay::client::Connection>, Output = (PeerId, SecStream), Error = SecError> + OutboundConnectionUpgrade<Negotiated<libp2p_relay::client::Connection>, Output = (PeerId, SecStream), Error = SecError> + Clone + Send + 'static,
161    <SecUpgrade::Upgrade as InboundConnectionUpgrade<Negotiated<libp2p_relay::client::Connection>>>::Future: Send,
162    <SecUpgrade::Upgrade as OutboundConnectionUpgrade<Negotiated<libp2p_relay::client::Connection>>>::Future: Send,
163    <<<SecUpgrade as IntoSecurityUpgrade<libp2p_relay::client::Connection>>::Upgrade as UpgradeInfo>::InfoIter as IntoIterator>::IntoIter: Send,
164    <<SecUpgrade as IntoSecurityUpgrade<libp2p_relay::client::Connection>>::Upgrade as UpgradeInfo>::Info: Send,
165
166        MuxStream: libp2p_core::muxing::StreamMuxer + Send + 'static,
167        MuxStream::Substream: Send + 'static,
168        MuxStream::Error: Send + Sync + 'static,
169        MuxUpgrade: IntoMultiplexerUpgrade<SecStream>,
170        MuxUpgrade::Upgrade: InboundConnectionUpgrade<Negotiated<SecStream>, Output = MuxStream, Error = MuxError> + OutboundConnectionUpgrade<Negotiated<SecStream>, Output = MuxStream, Error = MuxError> + Clone + Send + 'static,
171    <MuxUpgrade::Upgrade as InboundConnectionUpgrade<Negotiated<SecStream>>>::Future: Send,
172    <MuxUpgrade::Upgrade as OutboundConnectionUpgrade<Negotiated<SecStream>>>::Future: Send,
173        MuxError: std::error::Error + Send + Sync + 'static,
174    <<<MuxUpgrade as IntoMultiplexerUpgrade<SecStream>>::Upgrade as UpgradeInfo>::InfoIter as IntoIterator>::IntoIter: Send,
175    <<MuxUpgrade as IntoMultiplexerUpgrade<SecStream>>::Upgrade as UpgradeInfo>::Info: Send,
176    {
177        self.without_websocket()
178            .with_relay_client(security_upgrade, multiplexer_upgrade)
179    }
180}
181#[cfg(feature = "metrics")]
182impl<Provider, T: AuthenticatedMultiplexedTransport> SwarmBuilder<Provider, WebsocketPhase<T>> {
183    pub fn with_bandwidth_metrics(
184        self,
185        registry: &mut libp2p_metrics::Registry,
186    ) -> SwarmBuilder<
187        Provider,
188        BehaviourPhase<impl AuthenticatedMultiplexedTransport, NoRelayBehaviour>,
189    > {
190        self.without_websocket()
191            .without_relay()
192            .with_bandwidth_metrics(registry)
193    }
194}
195impl<Provider, T: AuthenticatedMultiplexedTransport> SwarmBuilder<Provider, WebsocketPhase<T>> {
196    pub fn with_behaviour<B, R: TryIntoBehaviour<B>>(
197        self,
198        constructor: impl FnOnce(&libp2p_identity::Keypair) -> R,
199    ) -> Result<SwarmBuilder<Provider, SwarmPhase<T, B>>, R::Error> {
200        self.without_websocket()
201            .without_relay()
202            .with_behaviour(constructor)
203    }
204}
205
206#[derive(Debug, thiserror::Error)]
207#[error(transparent)]
208#[cfg(all(not(target_arch = "wasm32"), feature = "websocket"))]
209pub struct WebsocketError<Sec>(#[from] WebsocketErrorInner<Sec>);
210
211#[derive(Debug, thiserror::Error)]
212#[cfg(all(not(target_arch = "wasm32"), feature = "websocket"))]
213enum WebsocketErrorInner<Sec> {
214    #[error("SecurityUpgrade")]
215    SecurityUpgrade(Sec),
216    #[cfg(feature = "dns")]
217    #[error("Dns")]
218    Dns(#[from] std::io::Error),
219}