1use std::marker::PhantomData;
2
3#[cfg(all(
4 not(target_arch = "wasm32"),
5 any(feature = "tcp", feature = "websocket")
6))]
7use libp2p_core::muxing::{StreamMuxer, StreamMuxerBox};
8#[cfg(all(feature = "websocket", not(target_arch = "wasm32")))]
9use libp2p_core::Transport;
10#[cfg(all(
11 not(target_arch = "wasm32"),
12 any(feature = "tcp", feature = "websocket")
13))]
14use libp2p_core::{
15 upgrade::InboundConnectionUpgrade, upgrade::OutboundConnectionUpgrade, Negotiated, UpgradeInfo,
16};
17
18use super::*;
19use crate::SwarmBuilder;
20
/// Phase type of a [`SwarmBuilder`] that is at the TCP configuration step.
///
/// The struct carries no data; it is only used as the phase type parameter in
/// `SwarmBuilder<Provider, TcpPhase>`, on which this file defines methods that either
/// add a TCP transport (`with_tcp`), skip it (`without_tcp`) or jump ahead to a later
/// phase.
pub struct TcpPhase {}
22
// Generates `SwarmBuilder::<Provider, TcpPhase>::with_tcp` for one runtime provider.
//
// Arguments:
// * `$providerKebabCase` - cargo feature name that gates the generated impl.
// * `$providerPascalCase` - provider type used as the first `SwarmBuilder` parameter.
// * `$path` - module inside `libp2p_tcp` that supplies `Transport` and `TcpStream`.
macro_rules! impl_tcp_builder {
    ($providerKebabCase:literal, $providerPascalCase:ty, $path:ident) => {
        #[cfg(all(
            not(target_arch = "wasm32"),
            feature = "tcp",
            feature = $providerKebabCase,
        ))]
        impl SwarmBuilder<$providerPascalCase, TcpPhase> {
            /// Adds a TCP transport and advances the builder to [`QuicPhase`].
            ///
            /// The TCP transport is created from `tcp_config`, upgraded with
            /// `Version::V1Lazy`, authenticated with the upgrade obtained from
            /// `security_upgrade` (which is given the builder's keypair) and multiplexed
            /// with the upgrade obtained from `multiplexer_upgrade`. The resulting muxer
            /// is boxed into a `StreamMuxerBox`.
            ///
            /// # Errors
            ///
            /// Returns `SecUpgrade::Error` when turning `security_upgrade` into a
            /// security upgrade fails.
            pub fn with_tcp<SecUpgrade, SecStream, SecError, MuxUpgrade, MuxStream, MuxError>(
                self,
                tcp_config: libp2p_tcp::Config,
                security_upgrade: SecUpgrade,
                multiplexer_upgrade: MuxUpgrade,
            ) -> Result<
                SwarmBuilder<$providerPascalCase, QuicPhase<impl AuthenticatedMultiplexedTransport>>,
                SecUpgrade::Error,
            >
            where
                // --- security layer ---
                SecStream: futures::AsyncRead + futures::AsyncWrite + Unpin + Send + 'static,
                SecError: std::error::Error + Send + Sync + 'static,
                SecUpgrade: IntoSecurityUpgrade<libp2p_tcp::$path::TcpStream>,
                SecUpgrade::Upgrade: InboundConnectionUpgrade<
                        Negotiated<libp2p_tcp::$path::TcpStream>,
                        Output = (libp2p_identity::PeerId, SecStream),
                        Error = SecError,
                    > + OutboundConnectionUpgrade<
                        Negotiated<libp2p_tcp::$path::TcpStream>,
                        Output = (libp2p_identity::PeerId, SecStream),
                        Error = SecError,
                    > + Clone
                    + Send
                    + 'static,
                <SecUpgrade::Upgrade as InboundConnectionUpgrade<
                    Negotiated<libp2p_tcp::$path::TcpStream>,
                >>::Future: Send,
                <SecUpgrade::Upgrade as OutboundConnectionUpgrade<
                    Negotiated<libp2p_tcp::$path::TcpStream>,
                >>::Future: Send,
                <<<SecUpgrade as IntoSecurityUpgrade<libp2p_tcp::$path::TcpStream>>::Upgrade as UpgradeInfo>::InfoIter as IntoIterator>::IntoIter: Send,
                <<SecUpgrade as IntoSecurityUpgrade<libp2p_tcp::$path::TcpStream>>::Upgrade as UpgradeInfo>::Info: Send,

                // --- multiplexing layer ---
                MuxStream: StreamMuxer + Send + 'static,
                MuxStream::Substream: Send + 'static,
                MuxStream::Error: Send + Sync + 'static,
                MuxUpgrade: IntoMultiplexerUpgrade<SecStream>,
                MuxUpgrade::Upgrade: InboundConnectionUpgrade<
                        Negotiated<SecStream>,
                        Output = MuxStream,
                        Error = MuxError,
                    > + OutboundConnectionUpgrade<
                        Negotiated<SecStream>,
                        Output = MuxStream,
                        Error = MuxError,
                    > + Clone
                    + Send
                    + 'static,
                <MuxUpgrade::Upgrade as InboundConnectionUpgrade<Negotiated<SecStream>>>::Future: Send,
                <MuxUpgrade::Upgrade as OutboundConnectionUpgrade<Negotiated<SecStream>>>::Future: Send,
                MuxError: std::error::Error + Send + Sync + 'static,
                <<<MuxUpgrade as IntoMultiplexerUpgrade<SecStream>>::Upgrade as UpgradeInfo>::InfoIter as IntoIterator>::IntoIter: Send,
                <<MuxUpgrade as IntoMultiplexerUpgrade<SecStream>>::Upgrade as UpgradeInfo>::Info: Send,
            {
                // Create the TCP transport first, then derive the security upgrade, so a
                // failing security upgrade is reported only after the transport exists.
                let upgradable = libp2p_tcp::$path::Transport::new(tcp_config)
                    .upgrade(libp2p_core::upgrade::Version::V1Lazy);
                let security = security_upgrade.into_security_upgrade(&self.keypair)?;

                let multiplexed = upgradable
                    .authenticate(security)
                    .multiplex(multiplexer_upgrade.into_multiplexer_upgrade());

                // Erase the concrete muxer type behind `StreamMuxerBox`.
                let transport =
                    multiplexed.map(|(peer_id, muxer), _| (peer_id, StreamMuxerBox::new(muxer)));

                Ok(SwarmBuilder {
                    phase: QuicPhase { transport },
                    keypair: self.keypair,
                    phantom: PhantomData,
                })
            }
        }
    };
}
99
// Generate `with_tcp` for the Tokio provider (gated on the "tokio" feature), using the
// `Transport` and `TcpStream` types from `libp2p_tcp::tokio`.
impl_tcp_builder!("tokio", super::provider::Tokio, tokio);
101
102impl<Provider> SwarmBuilder<Provider, TcpPhase> {
103 pub(crate) fn without_tcp(
104 self,
105 ) -> SwarmBuilder<Provider, QuicPhase<impl AuthenticatedMultiplexedTransport>> {
106 SwarmBuilder {
107 keypair: self.keypair,
108 phantom: PhantomData,
109 phase: QuicPhase {
110 transport: libp2p_core::transport::dummy::DummyTransport::new(),
111 },
112 }
113 }
114}
115
#[cfg(all(not(target_arch = "wasm32"), feature = "quic", feature = "tokio"))]
impl SwarmBuilder<super::provider::Tokio, TcpPhase> {
    /// Shortcut that skips the TCP step and adds QUIC directly.
    ///
    /// Equivalent to calling `without_tcp()` followed by `with_quic()` on the resulting
    /// builder.
    pub fn with_quic(
        self,
    ) -> SwarmBuilder<
        super::provider::Tokio,
        OtherTransportPhase<impl AuthenticatedMultiplexedTransport>,
    > {
        let quic_phase = self.without_tcp();
        quic_phase.with_quic()
    }
}
#[cfg(all(not(target_arch = "wasm32"), feature = "quic", feature = "tokio"))]
impl SwarmBuilder<super::provider::Tokio, TcpPhase> {
    /// Shortcut that skips the TCP step and adds QUIC with a customised configuration.
    ///
    /// `constructor` is forwarded untouched to `with_quic_config` of the builder
    /// returned by `without_tcp()`.
    pub fn with_quic_config(
        self,
        constructor: impl FnOnce(libp2p_quic::Config) -> libp2p_quic::Config,
    ) -> SwarmBuilder<
        super::provider::Tokio,
        OtherTransportPhase<impl AuthenticatedMultiplexedTransport>,
    > {
        let quic_phase = self.without_tcp();
        quic_phase.with_quic_config(constructor)
    }
}
139impl<Provider> SwarmBuilder<Provider, TcpPhase> {
140 pub fn with_other_transport<
141 Muxer: libp2p_core::muxing::StreamMuxer + Send + 'static,
142 OtherTransport: Transport<Output = (libp2p_identity::PeerId, Muxer)> + Send + Unpin + 'static,
143 R: TryIntoTransport<OtherTransport>,
144 >(
145 self,
146 constructor: impl FnOnce(&libp2p_identity::Keypair) -> R,
147 ) -> Result<
148 SwarmBuilder<Provider, OtherTransportPhase<impl AuthenticatedMultiplexedTransport>>,
149 R::Error,
150 >
151 where
152 <OtherTransport as Transport>::Error: Send + Sync + 'static,
153 <OtherTransport as Transport>::Dial: Send,
154 <OtherTransport as Transport>::ListenerUpgrade: Send,
155 <Muxer as libp2p_core::muxing::StreamMuxer>::Substream: Send,
156 <Muxer as libp2p_core::muxing::StreamMuxer>::Error: Send + Sync,
157 {
158 self.without_tcp()
159 .without_quic()
160 .with_other_transport(constructor)
161 }
162}
// Generates the `with_websocket` shortcut on `SwarmBuilder<Provider, TcpPhase>` for one
// runtime provider.
//
// Arguments:
// * `$providerKebabCase` - cargo feature name that gates the generated impl.
// * `$providerPascalCase` - provider type used as the first `SwarmBuilder` parameter.
// * `$websocketStream` - stream type the security upgrade is applied to.
macro_rules! impl_tcp_phase_with_websocket {
    ($providerKebabCase:literal, $providerPascalCase:ty, $websocketStream:ty) => {
        #[cfg(all(feature = $providerKebabCase, not(target_arch = "wasm32"), feature = "websocket"))]
        impl SwarmBuilder<$providerPascalCase, TcpPhase> {
            /// Shortcut that skips the TCP, QUIC, other-transport and DNS steps and adds
            /// a websocket transport.
            ///
            /// Both upgrades are forwarded untouched to `with_websocket` of the builder
            /// obtained after the `without_*` calls.
            ///
            /// # Errors
            ///
            /// Propagates the `WebsocketError` returned by the delegated call.
            pub async fn with_websocket<
                SecUpgrade,
                SecStream,
                SecError,
                MuxUpgrade,
                MuxStream,
                MuxError,
            >(
                self,
                security_upgrade: SecUpgrade,
                multiplexer_upgrade: MuxUpgrade,
            ) -> Result<
                SwarmBuilder<
                    $providerPascalCase,
                    RelayPhase<impl AuthenticatedMultiplexedTransport>,
                >,
                WebsocketError<SecUpgrade::Error>,
            >
            where
                // --- security layer ---
                SecStream: futures::AsyncRead + futures::AsyncWrite + Unpin + Send + 'static,
                SecError: std::error::Error + Send + Sync + 'static,
                SecUpgrade: IntoSecurityUpgrade<$websocketStream>,
                SecUpgrade::Upgrade: InboundConnectionUpgrade<
                        Negotiated<$websocketStream>,
                        Output = (libp2p_identity::PeerId, SecStream),
                        Error = SecError,
                    > + OutboundConnectionUpgrade<
                        Negotiated<$websocketStream>,
                        Output = (libp2p_identity::PeerId, SecStream),
                        Error = SecError,
                    > + Clone
                    + Send
                    + 'static,
                <SecUpgrade::Upgrade as InboundConnectionUpgrade<Negotiated<$websocketStream>>>::Future: Send,
                <SecUpgrade::Upgrade as OutboundConnectionUpgrade<Negotiated<$websocketStream>>>::Future: Send,
                <<<SecUpgrade as IntoSecurityUpgrade<$websocketStream>>::Upgrade as UpgradeInfo>::InfoIter as IntoIterator>::IntoIter: Send,
                <<SecUpgrade as IntoSecurityUpgrade<$websocketStream>>::Upgrade as UpgradeInfo>::Info: Send,

                // --- multiplexing layer ---
                MuxStream: StreamMuxer + Send + 'static,
                MuxStream::Substream: Send + 'static,
                MuxStream::Error: Send + Sync + 'static,
                MuxUpgrade: IntoMultiplexerUpgrade<SecStream>,
                MuxUpgrade::Upgrade: InboundConnectionUpgrade<
                        Negotiated<SecStream>,
                        Output = MuxStream,
                        Error = MuxError,
                    > + OutboundConnectionUpgrade<
                        Negotiated<SecStream>,
                        Output = MuxStream,
                        Error = MuxError,
                    > + Clone
                    + Send
                    + 'static,
                <MuxUpgrade::Upgrade as InboundConnectionUpgrade<Negotiated<SecStream>>>::Future: Send,
                <MuxUpgrade::Upgrade as OutboundConnectionUpgrade<Negotiated<SecStream>>>::Future: Send,
                MuxError: std::error::Error + Send + Sync + 'static,
                <<<MuxUpgrade as IntoMultiplexerUpgrade<SecStream>>::Upgrade as UpgradeInfo>::InfoIter as IntoIterator>::IntoIter: Send,
                <<MuxUpgrade as IntoMultiplexerUpgrade<SecStream>>::Upgrade as UpgradeInfo>::Info: Send,
            {
                // Walk through every phase that precedes the websocket step without
                // configuring anything in it.
                let websocket_ready = self
                    .without_tcp()
                    .without_quic()
                    .without_any_other_transports()
                    .without_dns();

                websocket_ready
                    .with_websocket(security_upgrade, multiplexer_upgrade)
                    .await
            }
        }
    }
}
// Generate the `with_websocket` shortcut for the Tokio provider (gated on the "tokio"
// feature). The security upgrade is applied to a websocket byte connection over a Tokio
// TCP stream, wrapped in `RwStreamSink`.
impl_tcp_phase_with_websocket!(
    "tokio",
    super::provider::Tokio,
    rw_stream_sink::RwStreamSink<libp2p_websocket::BytesConnection<libp2p_tcp::tokio::TcpStream>>
);