1use std::marker::PhantomData;
2
3#[cfg(all(not(target_arch = "wasm32"), feature = "websocket"))]
4use libp2p_core::muxing::{StreamMuxer, StreamMuxerBox};
5use libp2p_core::upgrade::{InboundConnectionUpgrade, OutboundConnectionUpgrade};
6#[cfg(all(not(target_arch = "wasm32"), feature = "websocket"))]
7use libp2p_core::Transport;
8#[cfg(any(
9 all(not(target_arch = "wasm32"), feature = "websocket"),
10 feature = "relay"
11))]
12use libp2p_core::{InboundUpgrade, Negotiated, OutboundUpgrade, UpgradeInfo};
13#[cfg(any(
14 all(not(target_arch = "wasm32"), feature = "websocket"),
15 feature = "relay"
16))]
17use libp2p_identity::PeerId;
18
19use super::*;
20use crate::SwarmBuilder;
21
/// Builder phase in which a websocket transport can be layered on top of the
/// transport assembled so far (`with_websocket`) or skipped altogether
/// (`without_websocket`).
pub struct WebsocketPhase<T> {
    /// Transport carried into this phase; it is moved into the following
    /// `RelayPhase`, either unchanged or combined with a websocket transport.
    pub(crate) transport: T,
}
25
// Generates the provider-specific `with_websocket` builder step.
//
// Arguments, in order:
// 1. the cargo feature name of the provider (string literal, used in `cfg`),
// 2. the provider marker type,
// 3. an awaitable expression yielding a `Result` with the DNS-over-TCP
//    transport (its error is mapped to `WebsocketErrorInner::Dns`),
// 4. the stream type the security upgrade is applied to.
macro_rules! impl_websocket_builder {
    ($provider_feature:literal, $provider:ty, $dns_tcp:expr, $ws_stream:ty) => {
        #[cfg(all(
            not(target_arch = "wasm32"),
            feature = $provider_feature,
            feature = "websocket"
        ))]
        impl<T> SwarmBuilder<$provider, WebsocketPhase<T>> {
            /// Adds a websocket transport and advances the builder to the relay phase.
            ///
            /// A `libp2p_websocket::Config` is created over the provider's
            /// DNS/TCP transport, upgraded with `Version::V1Lazy`, authenticated
            /// with the security upgrade derived from the builder's keypair and
            /// multiplexed with `multiplexer_upgrade`. The result is combined
            /// with the transport already held by this phase via `or_transport`.
            ///
            /// # Errors
            ///
            /// Returns a [`WebsocketError`] when the security upgrade cannot be
            /// built from the keypair or when the DNS/TCP transport expression
            /// resolves to an error.
            pub async fn with_websocket<
                SecUpgrade,
                SecStream,
                SecError,
                MuxUpgrade,
                MuxStream,
                MuxError,
            >(
                self,
                security_upgrade: SecUpgrade,
                multiplexer_upgrade: MuxUpgrade,
            ) -> Result<
                SwarmBuilder<$provider, RelayPhase<impl AuthenticatedMultiplexedTransport>>,
                WebsocketError<SecUpgrade::Error>,
            >
            where
                T: AuthenticatedMultiplexedTransport,

                // Security upgrade bounds.
                SecStream: futures::AsyncRead + futures::AsyncWrite + Unpin + Send + 'static,
                SecError: std::error::Error + Send + Sync + 'static,
                SecUpgrade: IntoSecurityUpgrade<$ws_stream>,
                SecUpgrade::Upgrade: InboundConnectionUpgrade<
                        Negotiated<$ws_stream>,
                        Output = (PeerId, SecStream),
                        Error = SecError,
                    > + OutboundConnectionUpgrade<
                        Negotiated<$ws_stream>,
                        Output = (PeerId, SecStream),
                        Error = SecError,
                    > + Clone
                    + Send
                    + 'static,
                <SecUpgrade::Upgrade as InboundConnectionUpgrade<Negotiated<$ws_stream>>>::Future:
                    Send,
                <SecUpgrade::Upgrade as OutboundConnectionUpgrade<Negotiated<$ws_stream>>>::Future:
                    Send,
                <<<SecUpgrade as IntoSecurityUpgrade<$ws_stream>>::Upgrade as UpgradeInfo>::InfoIter as IntoIterator>::IntoIter:
                    Send,
                <<SecUpgrade as IntoSecurityUpgrade<$ws_stream>>::Upgrade as UpgradeInfo>::Info:
                    Send,

                // Multiplexer upgrade bounds.
                MuxStream: StreamMuxer + Send + 'static,
                MuxStream::Substream: Send + 'static,
                MuxStream::Error: Send + Sync + 'static,
                MuxUpgrade: IntoMultiplexerUpgrade<SecStream>,
                MuxUpgrade::Upgrade: InboundConnectionUpgrade<
                        Negotiated<SecStream>,
                        Output = MuxStream,
                        Error = MuxError,
                    > + OutboundConnectionUpgrade<
                        Negotiated<SecStream>,
                        Output = MuxStream,
                        Error = MuxError,
                    > + Clone
                    + Send
                    + 'static,
                <MuxUpgrade::Upgrade as InboundConnectionUpgrade<Negotiated<SecStream>>>::Future:
                    Send,
                <MuxUpgrade::Upgrade as OutboundConnectionUpgrade<Negotiated<SecStream>>>::Future:
                    Send,
                MuxError: std::error::Error + Send + Sync + 'static,
                <<<MuxUpgrade as IntoMultiplexerUpgrade<SecStream>>::Upgrade as UpgradeInfo>::InfoIter as IntoIterator>::IntoIter:
                    Send,
                <<MuxUpgrade as IntoMultiplexerUpgrade<SecStream>>::Upgrade as UpgradeInfo>::Info:
                    Send,
            {
                // Derive the concrete security upgrade first; on failure the
                // DNS/TCP expression below is never awaited.
                let security_upgrade = security_upgrade
                    .into_security_upgrade(&self.keypair)
                    .map_err(WebsocketErrorInner::SecurityUpgrade)?;

                let dns_tcp_transport = $dns_tcp.await.map_err(WebsocketErrorInner::Dns)?;

                let websocket_transport = libp2p_websocket::Config::new(dns_tcp_transport)
                    .upgrade(libp2p_core::upgrade::Version::V1Lazy)
                    .authenticate(security_upgrade)
                    .multiplex(multiplexer_upgrade.into_multiplexer_upgrade())
                    .map(|(peer_id, muxer), _| (peer_id, StreamMuxerBox::new(muxer)));

                // Merge with the transport built by earlier phases and unwrap
                // the resulting either-output into a single output type.
                let transport = websocket_transport
                    .or_transport(self.phase.transport)
                    .map(|either, _| either.into_inner());

                Ok(SwarmBuilder {
                    keypair: self.keypair,
                    phantom: PhantomData,
                    phase: RelayPhase { transport },
                })
            }
        }
    };
}
117
// `with_websocket` for the async-std provider (requires the "async-std" feature).
impl_websocket_builder!(
    "async-std",
    super::provider::AsyncStd,
    // Awaited inside the macro before being handed to the websocket config.
    libp2p_dns::async_std::Transport::system(libp2p_tcp::async_io::Transport::new(
        libp2p_tcp::Config::default(),
    )),
    rw_stream_sink::RwStreamSink<
        libp2p_websocket::BytesConnection<libp2p_tcp::async_io::TcpStream>,
    >
);
// `with_websocket` for the tokio provider (requires the "tokio" feature).
impl_websocket_builder!(
    "tokio",
    super::provider::Tokio,
    // The macro always `.await`s this expression, so the tokio DNS transport
    // constructor is wrapped in an already-completed future to fit that shape.
    futures::future::ready(libp2p_dns::tokio::Transport::system(
        libp2p_tcp::tokio::Transport::new(libp2p_tcp::Config::default())
    )),
    rw_stream_sink::RwStreamSink<libp2p_websocket::BytesConnection<libp2p_tcp::tokio::TcpStream>>
);
138
139impl<Provider, T: AuthenticatedMultiplexedTransport> SwarmBuilder<Provider, WebsocketPhase<T>> {
140 pub(crate) fn without_websocket(self) -> SwarmBuilder<Provider, RelayPhase<T>> {
141 SwarmBuilder {
142 keypair: self.keypair,
143 phantom: PhantomData,
144 phase: RelayPhase {
145 transport: self.phase.transport,
146 },
147 }
148 }
149}
150
#[cfg(feature = "relay")]
impl<T, Provider> SwarmBuilder<Provider, WebsocketPhase<T>>
where
    T: AuthenticatedMultiplexedTransport,
{
    /// Shortcut that skips the websocket phase and forwards both upgrades to
    /// `with_relay_client` of the relay-phase builder.
    ///
    /// # Errors
    ///
    /// Propagates the `SecUpgrade::Error` returned by the relay-phase
    /// `with_relay_client`.
    pub fn with_relay_client<SecUpgrade, SecStream, SecError, MuxUpgrade, MuxStream, MuxError>(
        self,
        security_upgrade: SecUpgrade,
        multiplexer_upgrade: MuxUpgrade,
    ) -> Result<
        SwarmBuilder<
            Provider,
            BandwidthLoggingPhase<
                impl AuthenticatedMultiplexedTransport,
                libp2p_relay::client::Behaviour,
            >,
        >,
        SecUpgrade::Error,
    >
    where
        // Security upgrade bounds.
        SecStream: futures::AsyncRead + futures::AsyncWrite + Unpin + Send + 'static,
        SecError: std::error::Error + Send + Sync + 'static,
        SecUpgrade: IntoSecurityUpgrade<libp2p_relay::client::Connection>,
        SecUpgrade::Upgrade: InboundConnectionUpgrade<
                Negotiated<libp2p_relay::client::Connection>,
                Output = (PeerId, SecStream),
                Error = SecError,
            > + OutboundConnectionUpgrade<
                Negotiated<libp2p_relay::client::Connection>,
                Output = (PeerId, SecStream),
                Error = SecError,
            > + Clone
            + Send
            + 'static,
        <SecUpgrade::Upgrade as InboundConnectionUpgrade<
            Negotiated<libp2p_relay::client::Connection>,
        >>::Future: Send,
        <SecUpgrade::Upgrade as OutboundConnectionUpgrade<
            Negotiated<libp2p_relay::client::Connection>,
        >>::Future: Send,
        <<<SecUpgrade as IntoSecurityUpgrade<libp2p_relay::client::Connection>>::Upgrade as UpgradeInfo>::InfoIter as IntoIterator>::IntoIter:
            Send,
        <<SecUpgrade as IntoSecurityUpgrade<libp2p_relay::client::Connection>>::Upgrade as UpgradeInfo>::Info:
            Send,

        // Multiplexer upgrade bounds. `StreamMuxer` is spelled with its full
        // path because the short import is only present with the websocket feature.
        MuxStream: libp2p_core::muxing::StreamMuxer + Send + 'static,
        MuxStream::Substream: Send + 'static,
        MuxStream::Error: Send + Sync + 'static,
        MuxUpgrade: IntoMultiplexerUpgrade<SecStream>,
        MuxUpgrade::Upgrade: InboundConnectionUpgrade<
                Negotiated<SecStream>,
                Output = MuxStream,
                Error = MuxError,
            > + OutboundConnectionUpgrade<
                Negotiated<SecStream>,
                Output = MuxStream,
                Error = MuxError,
            > + Clone
            + Send
            + 'static,
        <MuxUpgrade::Upgrade as InboundConnectionUpgrade<Negotiated<SecStream>>>::Future: Send,
        <MuxUpgrade::Upgrade as OutboundConnectionUpgrade<Negotiated<SecStream>>>::Future: Send,
        MuxError: std::error::Error + Send + Sync + 'static,
        <<<MuxUpgrade as IntoMultiplexerUpgrade<SecStream>>::Upgrade as UpgradeInfo>::InfoIter as IntoIterator>::IntoIter:
            Send,
        <<MuxUpgrade as IntoMultiplexerUpgrade<SecStream>>::Upgrade as UpgradeInfo>::Info: Send,
    {
        let relay_phase_builder = self.without_websocket();
        relay_phase_builder.with_relay_client(security_upgrade, multiplexer_upgrade)
    }
}
#[cfg(feature = "metrics")]
impl<Provider, T> SwarmBuilder<Provider, WebsocketPhase<T>>
where
    T: AuthenticatedMultiplexedTransport,
{
    /// Shortcut that skips the websocket, relay and bandwidth-logging steps
    /// and then calls `with_bandwidth_metrics` with the given `registry`.
    pub fn with_bandwidth_metrics(
        self,
        registry: &mut libp2p_metrics::Registry,
    ) -> SwarmBuilder<
        Provider,
        BehaviourPhase<impl AuthenticatedMultiplexedTransport, NoRelayBehaviour>,
    > {
        let after_websocket = self.without_websocket();
        let after_relay = after_websocket.without_relay();
        let after_logging = after_relay.without_bandwidth_logging();
        after_logging.with_bandwidth_metrics(registry)
    }
}
206impl<Provider, T: AuthenticatedMultiplexedTransport> SwarmBuilder<Provider, WebsocketPhase<T>> {
207 pub fn with_behaviour<B, R: TryIntoBehaviour<B>>(
208 self,
209 constructor: impl FnOnce(&libp2p_identity::Keypair) -> R,
210 ) -> Result<SwarmBuilder<Provider, SwarmPhase<T, B>>, R::Error> {
211 self.without_websocket()
212 .without_relay()
213 .without_bandwidth_logging()
214 .with_behaviour(constructor)
215 }
216}
217
/// Error returned by `with_websocket`.
///
/// Opaque wrapper around the private `WebsocketErrorInner`; `Display` and
/// `source` are forwarded to the wrapped value (`#[error(transparent)]`).
/// `Sec` is the error type of the security upgrade construction.
#[derive(Debug, thiserror::Error)]
#[error(transparent)]
#[cfg(all(not(target_arch = "wasm32"), feature = "websocket"))]
pub struct WebsocketError<Sec>(#[from] WebsocketErrorInner<Sec>);
222
/// Concrete failure causes behind [`WebsocketError`].
///
/// Kept private so the set of variants can change without affecting the
/// public error type.
#[derive(Debug, thiserror::Error)]
#[cfg(all(not(target_arch = "wasm32"), feature = "websocket"))]
enum WebsocketErrorInner<Sec> {
    /// Building the security upgrade from the keypair failed.
    ///
    /// The display text is a fixed label, so the underlying error is exposed
    /// through `Error::source()`; without `#[source]` it would be reachable
    /// only via `Debug`.
    #[error("SecurityUpgrade")]
    SecurityUpgrade(#[source] Sec),
    /// Setting up the DNS transport failed with an I/O error.
    #[cfg(feature = "dns")]
    #[error("Dns")]
    Dns(#[from] std::io::Error),
}