1use std::marker::PhantomData;
2
3#[cfg(all(
4 not(target_arch = "wasm32"),
5 any(feature = "tcp", feature = "websocket")
6))]
7use libp2p_core::muxing::{StreamMuxer, StreamMuxerBox};
8#[cfg(all(feature = "websocket", not(target_arch = "wasm32")))]
9use libp2p_core::Transport;
10#[cfg(all(
11 not(target_arch = "wasm32"),
12 any(feature = "tcp", feature = "websocket")
13))]
14use libp2p_core::{
15 upgrade::InboundConnectionUpgrade, upgrade::OutboundConnectionUpgrade, Negotiated, UpgradeInfo,
16};
17
18use super::*;
19use crate::SwarmBuilder;
20
21pub struct TcpPhase {}
22
23macro_rules! impl_tcp_builder {
24 ($providerKebabCase:literal, $providerPascalCase:ty, $path:ident) => {
25 #[cfg(all(
26 not(target_arch = "wasm32"),
27 feature = "tcp",
28 feature = $providerKebabCase,
29 ))]
30 impl SwarmBuilder<$providerPascalCase, TcpPhase> {
31 pub fn with_tcp<SecUpgrade, SecStream, SecError, MuxUpgrade, MuxStream, MuxError>(
53 self,
54 tcp_config: libp2p_tcp::Config,
55 security_upgrade: SecUpgrade,
56 multiplexer_upgrade: MuxUpgrade,
57 ) -> Result<
58 SwarmBuilder<$providerPascalCase, QuicPhase<impl AuthenticatedMultiplexedTransport>>,
59 SecUpgrade::Error,
60 >
61 where
62 SecStream: futures::AsyncRead + futures::AsyncWrite + Unpin + Send + 'static,
63 SecError: std::error::Error + Send + Sync + 'static,
64 SecUpgrade: IntoSecurityUpgrade<libp2p_tcp::$path::TcpStream>,
65 SecUpgrade::Upgrade: InboundConnectionUpgrade<Negotiated<libp2p_tcp::$path::TcpStream>, Output = (libp2p_identity::PeerId, SecStream), Error = SecError> + OutboundConnectionUpgrade<Negotiated<libp2p_tcp::$path::TcpStream>, Output = (libp2p_identity::PeerId, SecStream), Error = SecError> + Clone + Send + 'static,
66 <SecUpgrade::Upgrade as InboundConnectionUpgrade<Negotiated<libp2p_tcp::$path::TcpStream>>>::Future: Send,
67 <SecUpgrade::Upgrade as OutboundConnectionUpgrade<Negotiated<libp2p_tcp::$path::TcpStream>>>::Future: Send,
68 <<<SecUpgrade as IntoSecurityUpgrade<libp2p_tcp::$path::TcpStream>>::Upgrade as UpgradeInfo>::InfoIter as IntoIterator>::IntoIter: Send,
69 <<SecUpgrade as IntoSecurityUpgrade<libp2p_tcp::$path::TcpStream>>::Upgrade as UpgradeInfo>::Info: Send,
70
71 MuxStream: StreamMuxer + Send + 'static,
72 MuxStream::Substream: Send + 'static,
73 MuxStream::Error: Send + Sync + 'static,
74 MuxUpgrade: IntoMultiplexerUpgrade<SecStream>,
75 MuxUpgrade::Upgrade: InboundConnectionUpgrade<Negotiated<SecStream>, Output = MuxStream, Error = MuxError> + OutboundConnectionUpgrade<Negotiated<SecStream>, Output = MuxStream, Error = MuxError> + Clone + Send + 'static,
76 <MuxUpgrade::Upgrade as InboundConnectionUpgrade<Negotiated<SecStream>>>::Future: Send,
77 <MuxUpgrade::Upgrade as OutboundConnectionUpgrade<Negotiated<SecStream>>>::Future: Send,
78 MuxError: std::error::Error + Send + Sync + 'static,
79 <<<MuxUpgrade as IntoMultiplexerUpgrade<SecStream>>::Upgrade as UpgradeInfo>::InfoIter as IntoIterator>::IntoIter: Send,
80 <<MuxUpgrade as IntoMultiplexerUpgrade<SecStream>>::Upgrade as UpgradeInfo>::Info: Send,
81 {
82 Ok(SwarmBuilder {
83 phase: QuicPhase {
84 transport: libp2p_tcp::$path::Transport::new(tcp_config)
85 .upgrade(libp2p_core::upgrade::Version::V1Lazy)
86 .authenticate(
87 security_upgrade.into_security_upgrade(&self.keypair)?,
88 )
89 .multiplex(multiplexer_upgrade.into_multiplexer_upgrade())
90 .map(|(p, c), _| (p, StreamMuxerBox::new(c))),
91 },
92 keypair: self.keypair,
93 phantom: PhantomData,
94 })
95 }
96 }
97 };
98}
99
100impl_tcp_builder!("async-std", super::provider::AsyncStd, async_io);
101impl_tcp_builder!("tokio", super::provider::Tokio, tokio);
102
103impl<Provider> SwarmBuilder<Provider, TcpPhase> {
104 pub(crate) fn without_tcp(
105 self,
106 ) -> SwarmBuilder<Provider, QuicPhase<impl AuthenticatedMultiplexedTransport>> {
107 SwarmBuilder {
108 keypair: self.keypair,
109 phantom: PhantomData,
110 phase: QuicPhase {
111 transport: libp2p_core::transport::dummy::DummyTransport::new(),
112 },
113 }
114 }
115}
116
117#[cfg(all(not(target_arch = "wasm32"), feature = "quic", feature = "tokio"))]
118impl SwarmBuilder<super::provider::Tokio, TcpPhase> {
119 pub fn with_quic(
120 self,
121 ) -> SwarmBuilder<
122 super::provider::Tokio,
123 OtherTransportPhase<impl AuthenticatedMultiplexedTransport>,
124 > {
125 self.without_tcp().with_quic()
126 }
127}
128#[cfg(all(not(target_arch = "wasm32"), feature = "quic", feature = "tokio"))]
129impl SwarmBuilder<super::provider::Tokio, TcpPhase> {
130 pub fn with_quic_config(
131 self,
132 constructor: impl FnOnce(libp2p_quic::Config) -> libp2p_quic::Config,
133 ) -> SwarmBuilder<
134 super::provider::Tokio,
135 OtherTransportPhase<impl AuthenticatedMultiplexedTransport>,
136 > {
137 self.without_tcp().with_quic_config(constructor)
138 }
139}
140impl<Provider> SwarmBuilder<Provider, TcpPhase> {
141 pub fn with_other_transport<
142 Muxer: libp2p_core::muxing::StreamMuxer + Send + 'static,
143 OtherTransport: Transport<Output = (libp2p_identity::PeerId, Muxer)> + Send + Unpin + 'static,
144 R: TryIntoTransport<OtherTransport>,
145 >(
146 self,
147 constructor: impl FnOnce(&libp2p_identity::Keypair) -> R,
148 ) -> Result<
149 SwarmBuilder<Provider, OtherTransportPhase<impl AuthenticatedMultiplexedTransport>>,
150 R::Error,
151 >
152 where
153 <OtherTransport as Transport>::Error: Send + Sync + 'static,
154 <OtherTransport as Transport>::Dial: Send,
155 <OtherTransport as Transport>::ListenerUpgrade: Send,
156 <Muxer as libp2p_core::muxing::StreamMuxer>::Substream: Send,
157 <Muxer as libp2p_core::muxing::StreamMuxer>::Error: Send + Sync,
158 {
159 self.without_tcp()
160 .without_quic()
161 .with_other_transport(constructor)
162 }
163}
164macro_rules! impl_tcp_phase_with_websocket {
165 ($providerKebabCase:literal, $providerPascalCase:ty, $websocketStream:ty) => {
166 #[cfg(all(feature = $providerKebabCase, not(target_arch = "wasm32"), feature = "websocket"))]
167 impl SwarmBuilder<$providerPascalCase, TcpPhase> {
168 pub async fn with_websocket <
170 SecUpgrade,
171 SecStream,
172 SecError,
173 MuxUpgrade,
174 MuxStream,
175 MuxError,
176 > (
177 self,
178 security_upgrade: SecUpgrade,
179 multiplexer_upgrade: MuxUpgrade,
180 ) -> Result<
181 SwarmBuilder<
182 $providerPascalCase,
183 RelayPhase<impl AuthenticatedMultiplexedTransport>,
184 >,
185 WebsocketError<SecUpgrade::Error>,
186 >
187 where
188 SecStream: futures::AsyncRead + futures::AsyncWrite + Unpin + Send + 'static,
189 SecError: std::error::Error + Send + Sync + 'static,
190 SecUpgrade: IntoSecurityUpgrade<$websocketStream>,
191 SecUpgrade::Upgrade: InboundConnectionUpgrade<Negotiated<$websocketStream>, Output = (libp2p_identity::PeerId, SecStream), Error = SecError> + OutboundConnectionUpgrade<Negotiated<$websocketStream>, Output = (libp2p_identity::PeerId, SecStream), Error = SecError> + Clone + Send + 'static,
192 <SecUpgrade::Upgrade as InboundConnectionUpgrade<Negotiated<$websocketStream>>>::Future: Send,
193 <SecUpgrade::Upgrade as OutboundConnectionUpgrade<Negotiated<$websocketStream>>>::Future: Send,
194 <<<SecUpgrade as IntoSecurityUpgrade<$websocketStream>>::Upgrade as UpgradeInfo>::InfoIter as IntoIterator>::IntoIter: Send,
195 <<SecUpgrade as IntoSecurityUpgrade<$websocketStream>>::Upgrade as UpgradeInfo>::Info: Send,
196
197 MuxStream: StreamMuxer + Send + 'static,
198 MuxStream::Substream: Send + 'static,
199 MuxStream::Error: Send + Sync + 'static,
200 MuxUpgrade: IntoMultiplexerUpgrade<SecStream>,
201 MuxUpgrade::Upgrade: InboundConnectionUpgrade<Negotiated<SecStream>, Output = MuxStream, Error = MuxError> + OutboundConnectionUpgrade<Negotiated<SecStream>, Output = MuxStream, Error = MuxError> + Clone + Send + 'static,
202 <MuxUpgrade::Upgrade as InboundConnectionUpgrade<Negotiated<SecStream>>>::Future: Send,
203 <MuxUpgrade::Upgrade as OutboundConnectionUpgrade<Negotiated<SecStream>>>::Future: Send,
204 MuxError: std::error::Error + Send + Sync + 'static,
205 <<<MuxUpgrade as IntoMultiplexerUpgrade<SecStream>>::Upgrade as UpgradeInfo>::InfoIter as IntoIterator>::IntoIter: Send,
206 <<MuxUpgrade as IntoMultiplexerUpgrade<SecStream>>::Upgrade as UpgradeInfo>::Info: Send,
207 {
208 self.without_tcp()
209 .without_quic()
210 .without_any_other_transports()
211 .without_dns()
212 .with_websocket(security_upgrade, multiplexer_upgrade)
213 .await
214 }
215 }
216 }
217}
218impl_tcp_phase_with_websocket!(
219 "async-std",
220 super::provider::AsyncStd,
221 rw_stream_sink::RwStreamSink<
222 libp2p_websocket::BytesConnection<libp2p_tcp::async_io::TcpStream>,
223 >
224);
225impl_tcp_phase_with_websocket!(
226 "tokio",
227 super::provider::Tokio,
228 rw_stream_sink::RwStreamSink<libp2p_websocket::BytesConnection<libp2p_tcp::tokio::TcpStream>>
229);