1use std::marker::PhantomData;
2
3#[cfg(all(not(target_arch = "wasm32"), feature = "websocket"))]
4use libp2p_core::muxing::{StreamMuxer, StreamMuxerBox};
5use libp2p_core::upgrade::{InboundConnectionUpgrade, OutboundConnectionUpgrade};
6#[cfg(all(not(target_arch = "wasm32"), feature = "websocket"))]
7use libp2p_core::Transport;
8#[cfg(any(
9 all(not(target_arch = "wasm32"), feature = "websocket"),
10 feature = "relay"
11))]
12use libp2p_core::{InboundUpgrade, Negotiated, OutboundUpgrade, UpgradeInfo};
13#[cfg(any(
14 all(not(target_arch = "wasm32"), feature = "websocket"),
15 feature = "relay"
16))]
17use libp2p_identity::PeerId;
18
19use super::*;
20use crate::SwarmBuilder;
21
22pub struct WebsocketPhase<T> {
23 pub(crate) transport: T,
24}
25
26macro_rules! impl_websocket_builder {
27 ($providerKebabCase:literal, $providerPascalCase:ty, $dnsTcp:expr, $websocketStream:ty) => {
28 #[cfg(all(not(target_arch = "wasm32"), feature = $providerKebabCase, feature = "websocket"))]
50 impl<T> SwarmBuilder<$providerPascalCase, WebsocketPhase<T>> {
51 pub async fn with_websocket<
52 SecUpgrade,
53 SecStream,
54 SecError,
55 MuxUpgrade,
56 MuxStream,
57 MuxError,
58 >(
59 self,
60 security_upgrade: SecUpgrade,
61 multiplexer_upgrade: MuxUpgrade,
62 ) -> Result<
63 SwarmBuilder<
64 $providerPascalCase,
65 RelayPhase<impl AuthenticatedMultiplexedTransport>,
66 >,
67 WebsocketError<SecUpgrade::Error>,
68 >
69
70 where
71 T: AuthenticatedMultiplexedTransport,
72
73 SecStream: futures::AsyncRead + futures::AsyncWrite + Unpin + Send + 'static,
74 SecError: std::error::Error + Send + Sync + 'static,
75 SecUpgrade: IntoSecurityUpgrade<$websocketStream>,
76 SecUpgrade::Upgrade: InboundConnectionUpgrade<Negotiated<$websocketStream>, Output = (PeerId, SecStream), Error = SecError> + OutboundConnectionUpgrade<Negotiated<$websocketStream>, Output = (PeerId, SecStream), Error = SecError> + Clone + Send + 'static,
77 <SecUpgrade::Upgrade as InboundConnectionUpgrade<Negotiated<$websocketStream>>>::Future: Send,
78 <SecUpgrade::Upgrade as OutboundConnectionUpgrade<Negotiated<$websocketStream>>>::Future: Send,
79 <<<SecUpgrade as IntoSecurityUpgrade<$websocketStream>>::Upgrade as UpgradeInfo>::InfoIter as IntoIterator>::IntoIter: Send,
80 <<SecUpgrade as IntoSecurityUpgrade<$websocketStream>>::Upgrade as UpgradeInfo>::Info: Send,
81
82 MuxStream: StreamMuxer + Send + 'static,
83 MuxStream::Substream: Send + 'static,
84 MuxStream::Error: Send + Sync + 'static,
85 MuxUpgrade: IntoMultiplexerUpgrade<SecStream>,
86 MuxUpgrade::Upgrade: InboundConnectionUpgrade<Negotiated<SecStream>, Output = MuxStream, Error = MuxError> + OutboundConnectionUpgrade<Negotiated<SecStream>, Output = MuxStream, Error = MuxError> + Clone + Send + 'static,
87 <MuxUpgrade::Upgrade as InboundConnectionUpgrade<Negotiated<SecStream>>>::Future: Send,
88 <MuxUpgrade::Upgrade as OutboundConnectionUpgrade<Negotiated<SecStream>>>::Future: Send,
89 MuxError: std::error::Error + Send + Sync + 'static,
90 <<<MuxUpgrade as IntoMultiplexerUpgrade<SecStream>>::Upgrade as UpgradeInfo>::InfoIter as IntoIterator>::IntoIter: Send,
91 <<MuxUpgrade as IntoMultiplexerUpgrade<SecStream>>::Upgrade as UpgradeInfo>::Info: Send,
92
93 {
94 let security_upgrade = security_upgrade.into_security_upgrade(&self.keypair)
95 .map_err(WebsocketErrorInner::SecurityUpgrade)?;
96 let websocket_transport = libp2p_websocket::Config::new(
97 $dnsTcp.await.map_err(WebsocketErrorInner::Dns)?,
98 )
99 .upgrade(libp2p_core::upgrade::Version::V1Lazy)
100 .authenticate(security_upgrade)
101 .multiplex(multiplexer_upgrade.into_multiplexer_upgrade())
102 .map(|(p, c), _| (p, StreamMuxerBox::new(c)));
103
104 Ok(SwarmBuilder {
105 keypair: self.keypair,
106 phantom: PhantomData,
107 phase: RelayPhase {
108 transport: websocket_transport
109 .or_transport(self.phase.transport)
110 .map(|either, _| either.into_inner()),
111 },
112 })
113 }
114 }
115 };
116}
117
118impl_websocket_builder!(
119 "tokio",
120 super::provider::Tokio,
121 futures::future::ready(libp2p_dns::tokio::Transport::system(
124 libp2p_tcp::tokio::Transport::new(libp2p_tcp::Config::default())
125 )),
126 rw_stream_sink::RwStreamSink<libp2p_websocket::BytesConnection<libp2p_tcp::tokio::TcpStream>>
127);
128
129impl<Provider, T: AuthenticatedMultiplexedTransport> SwarmBuilder<Provider, WebsocketPhase<T>> {
130 pub(crate) fn without_websocket(self) -> SwarmBuilder<Provider, RelayPhase<T>> {
131 SwarmBuilder {
132 keypair: self.keypair,
133 phantom: PhantomData,
134 phase: RelayPhase {
135 transport: self.phase.transport,
136 },
137 }
138 }
139}
140
141#[cfg(feature = "relay")]
143impl<T: AuthenticatedMultiplexedTransport, Provider> SwarmBuilder<Provider, WebsocketPhase<T>> {
144 pub fn with_relay_client<SecUpgrade, SecStream, SecError, MuxUpgrade, MuxStream, MuxError>(
146 self,
147 security_upgrade: SecUpgrade,
148 multiplexer_upgrade: MuxUpgrade,
149 ) -> Result<
150 SwarmBuilder<
151 Provider,
152 BandwidthMetricsPhase<impl AuthenticatedMultiplexedTransport, libp2p_relay::client::Behaviour>,
153 >,
154 SecUpgrade::Error,
155 > where
156
157 SecStream: futures::AsyncRead + futures::AsyncWrite + Unpin + Send + 'static,
158 SecError: std::error::Error + Send + Sync + 'static,
159 SecUpgrade: IntoSecurityUpgrade<libp2p_relay::client::Connection>,
160 SecUpgrade::Upgrade: InboundConnectionUpgrade<Negotiated<libp2p_relay::client::Connection>, Output = (PeerId, SecStream), Error = SecError> + OutboundConnectionUpgrade<Negotiated<libp2p_relay::client::Connection>, Output = (PeerId, SecStream), Error = SecError> + Clone + Send + 'static,
161 <SecUpgrade::Upgrade as InboundConnectionUpgrade<Negotiated<libp2p_relay::client::Connection>>>::Future: Send,
162 <SecUpgrade::Upgrade as OutboundConnectionUpgrade<Negotiated<libp2p_relay::client::Connection>>>::Future: Send,
163 <<<SecUpgrade as IntoSecurityUpgrade<libp2p_relay::client::Connection>>::Upgrade as UpgradeInfo>::InfoIter as IntoIterator>::IntoIter: Send,
164 <<SecUpgrade as IntoSecurityUpgrade<libp2p_relay::client::Connection>>::Upgrade as UpgradeInfo>::Info: Send,
165
166 MuxStream: libp2p_core::muxing::StreamMuxer + Send + 'static,
167 MuxStream::Substream: Send + 'static,
168 MuxStream::Error: Send + Sync + 'static,
169 MuxUpgrade: IntoMultiplexerUpgrade<SecStream>,
170 MuxUpgrade::Upgrade: InboundConnectionUpgrade<Negotiated<SecStream>, Output = MuxStream, Error = MuxError> + OutboundConnectionUpgrade<Negotiated<SecStream>, Output = MuxStream, Error = MuxError> + Clone + Send + 'static,
171 <MuxUpgrade::Upgrade as InboundConnectionUpgrade<Negotiated<SecStream>>>::Future: Send,
172 <MuxUpgrade::Upgrade as OutboundConnectionUpgrade<Negotiated<SecStream>>>::Future: Send,
173 MuxError: std::error::Error + Send + Sync + 'static,
174 <<<MuxUpgrade as IntoMultiplexerUpgrade<SecStream>>::Upgrade as UpgradeInfo>::InfoIter as IntoIterator>::IntoIter: Send,
175 <<MuxUpgrade as IntoMultiplexerUpgrade<SecStream>>::Upgrade as UpgradeInfo>::Info: Send,
176 {
177 self.without_websocket()
178 .with_relay_client(security_upgrade, multiplexer_upgrade)
179 }
180}
181#[cfg(feature = "metrics")]
182impl<Provider, T: AuthenticatedMultiplexedTransport> SwarmBuilder<Provider, WebsocketPhase<T>> {
183 pub fn with_bandwidth_metrics(
184 self,
185 registry: &mut libp2p_metrics::Registry,
186 ) -> SwarmBuilder<
187 Provider,
188 BehaviourPhase<impl AuthenticatedMultiplexedTransport, NoRelayBehaviour>,
189 > {
190 self.without_websocket()
191 .without_relay()
192 .with_bandwidth_metrics(registry)
193 }
194}
195impl<Provider, T: AuthenticatedMultiplexedTransport> SwarmBuilder<Provider, WebsocketPhase<T>> {
196 pub fn with_behaviour<B, R: TryIntoBehaviour<B>>(
197 self,
198 constructor: impl FnOnce(&libp2p_identity::Keypair) -> R,
199 ) -> Result<SwarmBuilder<Provider, SwarmPhase<T, B>>, R::Error> {
200 self.without_websocket()
201 .without_relay()
202 .with_behaviour(constructor)
203 }
204}
205
206#[derive(Debug, thiserror::Error)]
207#[error(transparent)]
208#[cfg(all(not(target_arch = "wasm32"), feature = "websocket"))]
209pub struct WebsocketError<Sec>(#[from] WebsocketErrorInner<Sec>);
210
211#[derive(Debug, thiserror::Error)]
212#[cfg(all(not(target_arch = "wasm32"), feature = "websocket"))]
213enum WebsocketErrorInner<Sec> {
214 #[error("SecurityUpgrade")]
215 SecurityUpgrade(Sec),
216 #[cfg(feature = "dns")]
217 #[error("Dns")]
218 Dns(#[from] std::io::Error),
219}