libp2p/builder/phase/
tcp.rs

1use std::marker::PhantomData;
2
3#[cfg(all(
4    not(target_arch = "wasm32"),
5    any(feature = "tcp", feature = "websocket")
6))]
7use libp2p_core::muxing::{StreamMuxer, StreamMuxerBox};
8#[cfg(all(feature = "websocket", not(target_arch = "wasm32")))]
9use libp2p_core::Transport;
10#[cfg(all(
11    not(target_arch = "wasm32"),
12    any(feature = "tcp", feature = "websocket")
13))]
14use libp2p_core::{
15    upgrade::InboundConnectionUpgrade, upgrade::OutboundConnectionUpgrade, Negotiated, UpgradeInfo,
16};
17
18use super::*;
19use crate::SwarmBuilder;
20
/// Marker type for the [`SwarmBuilder`] phase in which a TCP transport can be added.
///
/// The type carries no data; it only selects which builder methods are available
/// (for example `with_tcp`, or the shortcuts that skip TCP via `without_tcp`).
pub struct TcpPhase {}
22
// Generates `SwarmBuilder::<Provider, TcpPhase>::with_tcp` for one runtime provider.
//
// * `$providerKebabCase`  - cargo feature name that gates the impl (e.g. `"tokio"`).
// * `$providerPascalCase` - provider type used as the first `SwarmBuilder` parameter.
// * `$path`               - module inside `libp2p_tcp` holding `Transport`/`TcpStream`.
macro_rules! impl_tcp_builder {
    ($providerKebabCase:literal, $providerPascalCase:ty, $path:ident) => {
        #[cfg(all(
            not(target_arch = "wasm32"),
            feature = "tcp",
            feature = $providerKebabCase,
        ))]
        impl SwarmBuilder<$providerPascalCase, TcpPhase> {
            /// Adds a TCP based transport.
            ///
            /// `security_upgrade` and `multiplexer_upgrade` are passed as functions, i.e. the
            /// function itself (without calling it via `()`), not the value the function
            /// returns. See the example below.
            ///
            /// # Errors
            ///
            /// Returns `SecUpgrade::Error` when turning `security_upgrade` into a security
            /// upgrade for the builder's keypair fails.
            ///
            /// # Examples
            ///
            /// ``` rust
            /// # use libp2p::SwarmBuilder;
            /// # use std::error::Error;
            /// # async fn build_swarm() -> Result<(), Box<dyn Error>> {
            /// let swarm = SwarmBuilder::with_new_identity()
            ///     .with_tokio()
            ///     .with_tcp(
            ///         Default::default(),
            ///         (libp2p_tls::Config::new, libp2p_noise::Config::new),
            ///         libp2p_yamux::Config::default,
            ///     )?
            /// # ;
            /// # Ok(())
            /// # }
            /// ```
            pub fn with_tcp<SecUpgrade, SecStream, SecError, MuxUpgrade, MuxStream, MuxError>(
                self,
                tcp_config: libp2p_tcp::Config,
                security_upgrade: SecUpgrade,
                multiplexer_upgrade: MuxUpgrade,
            ) -> Result<
                SwarmBuilder<$providerPascalCase, QuicPhase<impl AuthenticatedMultiplexedTransport>>,
                SecUpgrade::Error,
            >
            where
                // Security layer, applied directly on top of the raw TCP stream.
                SecStream: futures::AsyncRead + futures::AsyncWrite + Unpin + Send + 'static,
                SecError: std::error::Error + Send + Sync + 'static,
                SecUpgrade: IntoSecurityUpgrade<libp2p_tcp::$path::TcpStream>,
                SecUpgrade::Upgrade: InboundConnectionUpgrade<
                        Negotiated<libp2p_tcp::$path::TcpStream>,
                        Output = (libp2p_identity::PeerId, SecStream),
                        Error = SecError,
                    > + OutboundConnectionUpgrade<
                        Negotiated<libp2p_tcp::$path::TcpStream>,
                        Output = (libp2p_identity::PeerId, SecStream),
                        Error = SecError,
                    > + Clone
                    + Send
                    + 'static,
                <SecUpgrade::Upgrade as InboundConnectionUpgrade<
                    Negotiated<libp2p_tcp::$path::TcpStream>,
                >>::Future: Send,
                <SecUpgrade::Upgrade as OutboundConnectionUpgrade<
                    Negotiated<libp2p_tcp::$path::TcpStream>,
                >>::Future: Send,
                <<SecUpgrade::Upgrade as UpgradeInfo>::InfoIter as IntoIterator>::IntoIter: Send,
                <SecUpgrade::Upgrade as UpgradeInfo>::Info: Send,

                // Multiplexing layer, applied on top of the secured stream.
                MuxStream: StreamMuxer + Send + 'static,
                MuxStream::Substream: Send + 'static,
                MuxStream::Error: Send + Sync + 'static,
                MuxError: std::error::Error + Send + Sync + 'static,
                MuxUpgrade: IntoMultiplexerUpgrade<SecStream>,
                MuxUpgrade::Upgrade: InboundConnectionUpgrade<
                        Negotiated<SecStream>,
                        Output = MuxStream,
                        Error = MuxError,
                    > + OutboundConnectionUpgrade<
                        Negotiated<SecStream>,
                        Output = MuxStream,
                        Error = MuxError,
                    > + Clone
                    + Send
                    + 'static,
                <MuxUpgrade::Upgrade as InboundConnectionUpgrade<Negotiated<SecStream>>>::Future:
                    Send,
                <MuxUpgrade::Upgrade as OutboundConnectionUpgrade<Negotiated<SecStream>>>::Future:
                    Send,
                <<MuxUpgrade::Upgrade as UpgradeInfo>::InfoIter as IntoIterator>::IntoIter: Send,
                <MuxUpgrade::Upgrade as UpgradeInfo>::Info: Send,
            {
                // Raw TCP transport, wrapped in an upgrade builder.
                let upgrading = libp2p_tcp::$path::Transport::new(tcp_config)
                    .upgrade(libp2p_core::upgrade::Version::V1Lazy);

                // A failing security-upgrade conversion aborts the builder here via `?`.
                let authenticated = upgrading
                    .authenticate(security_upgrade.into_security_upgrade(&self.keypair)?);

                let multiplexed =
                    authenticated.multiplex(multiplexer_upgrade.into_multiplexer_upgrade());

                // Box the muxer so the phase stores a uniform `(PeerId, StreamMuxerBox)` output.
                let transport =
                    multiplexed.map(|(peer_id, muxer), _| (peer_id, StreamMuxerBox::new(muxer)));

                Ok(SwarmBuilder {
                    phase: QuicPhase { transport },
                    keypair: self.keypair,
                    phantom: PhantomData,
                })
            }
        }
    };
}

// One `with_tcp` implementation per supported runtime provider.
impl_tcp_builder!("async-std", super::provider::AsyncStd, async_io);
impl_tcp_builder!("tokio", super::provider::Tokio, tokio);
102
103impl<Provider> SwarmBuilder<Provider, TcpPhase> {
104    pub(crate) fn without_tcp(
105        self,
106    ) -> SwarmBuilder<Provider, QuicPhase<impl AuthenticatedMultiplexedTransport>> {
107        SwarmBuilder {
108            keypair: self.keypair,
109            phantom: PhantomData,
110            phase: QuicPhase {
111                transport: libp2p_core::transport::dummy::DummyTransport::new(),
112            },
113        }
114    }
115}
116
// Shortcuts: methods that skip the TCP phase and forward to a later builder phase.
#[cfg(all(not(target_arch = "wasm32"), feature = "quic", feature = "async-std"))]
impl SwarmBuilder<super::provider::AsyncStd, TcpPhase> {
    /// Shortcut that skips TCP (via `without_tcp`) and then calls `with_quic` on the
    /// resulting QUIC-phase builder.
    pub fn with_quic(
        self,
    ) -> SwarmBuilder<
        super::provider::AsyncStd,
        OtherTransportPhase<impl AuthenticatedMultiplexedTransport>,
    > {
        let quic_phase_builder = self.without_tcp();
        quic_phase_builder.with_quic()
    }
}
#[cfg(all(not(target_arch = "wasm32"), feature = "quic", feature = "tokio"))]
impl SwarmBuilder<super::provider::Tokio, TcpPhase> {
    /// Shortcut that skips TCP (via `without_tcp`) and then calls `with_quic` on the
    /// resulting QUIC-phase builder.
    pub fn with_quic(
        self,
    ) -> SwarmBuilder<
        super::provider::Tokio,
        OtherTransportPhase<impl AuthenticatedMultiplexedTransport>,
    > {
        let quic_phase_builder = self.without_tcp();
        quic_phase_builder.with_quic()
    }
}
#[cfg(all(not(target_arch = "wasm32"), feature = "quic", feature = "async-std"))]
impl SwarmBuilder<super::provider::AsyncStd, TcpPhase> {
    /// Shortcut that skips TCP (via `without_tcp`) and then calls `with_quic_config`
    /// on the resulting QUIC-phase builder.
    ///
    /// `constructor` is forwarded unchanged; it maps one `libp2p_quic::Config` to another.
    pub fn with_quic_config(
        self,
        constructor: impl FnOnce(libp2p_quic::Config) -> libp2p_quic::Config,
    ) -> SwarmBuilder<
        super::provider::AsyncStd,
        OtherTransportPhase<impl AuthenticatedMultiplexedTransport>,
    > {
        let quic_phase_builder = self.without_tcp();
        quic_phase_builder.with_quic_config(constructor)
    }
}
#[cfg(all(not(target_arch = "wasm32"), feature = "quic", feature = "tokio"))]
impl SwarmBuilder<super::provider::Tokio, TcpPhase> {
    /// Shortcut that skips TCP (via `without_tcp`) and then calls `with_quic_config`
    /// on the resulting QUIC-phase builder.
    ///
    /// `constructor` is forwarded unchanged; it maps one `libp2p_quic::Config` to another.
    pub fn with_quic_config(
        self,
        constructor: impl FnOnce(libp2p_quic::Config) -> libp2p_quic::Config,
    ) -> SwarmBuilder<
        super::provider::Tokio,
        OtherTransportPhase<impl AuthenticatedMultiplexedTransport>,
    > {
        let quic_phase_builder = self.without_tcp();
        quic_phase_builder.with_quic_config(constructor)
    }
}
164impl<Provider> SwarmBuilder<Provider, TcpPhase> {
165    pub fn with_other_transport<
166        Muxer: libp2p_core::muxing::StreamMuxer + Send + 'static,
167        OtherTransport: Transport<Output = (libp2p_identity::PeerId, Muxer)> + Send + Unpin + 'static,
168        R: TryIntoTransport<OtherTransport>,
169    >(
170        self,
171        constructor: impl FnOnce(&libp2p_identity::Keypair) -> R,
172    ) -> Result<
173        SwarmBuilder<Provider, OtherTransportPhase<impl AuthenticatedMultiplexedTransport>>,
174        R::Error,
175    >
176    where
177        <OtherTransport as Transport>::Error: Send + Sync + 'static,
178        <OtherTransport as Transport>::Dial: Send,
179        <OtherTransport as Transport>::ListenerUpgrade: Send,
180        <Muxer as libp2p_core::muxing::StreamMuxer>::Substream: Send,
181        <Muxer as libp2p_core::muxing::StreamMuxer>::Error: Send + Sync,
182    {
183        self.without_tcp()
184            .without_quic()
185            .with_other_transport(constructor)
186    }
187}
// Generates the `with_websocket` shortcut on `SwarmBuilder<Provider, TcpPhase>` for one
// runtime provider.
//
// * `$providerKebabCase`  - cargo feature name that gates the impl (e.g. `"tokio"`).
// * `$providerPascalCase` - provider type used as the first `SwarmBuilder` parameter.
// * `$websocketStream`    - stream type the security upgrade is applied to.
macro_rules! impl_tcp_phase_with_websocket {
    ($providerKebabCase:literal, $providerPascalCase:ty, $websocketStream:ty) => {
        #[cfg(all(
            feature = $providerKebabCase,
            not(target_arch = "wasm32"),
            feature = "websocket"
        ))]
        impl SwarmBuilder<$providerPascalCase, TcpPhase> {
            /// See [`SwarmBuilder::with_websocket`].
            ///
            /// Shortcut that skips the TCP, QUIC, other-transport and DNS steps before
            /// forwarding both upgrades to `with_websocket`.
            pub async fn with_websocket<
                SecUpgrade,
                SecStream,
                SecError,
                MuxUpgrade,
                MuxStream,
                MuxError,
            >(
                self,
                security_upgrade: SecUpgrade,
                multiplexer_upgrade: MuxUpgrade,
            ) -> Result<
                SwarmBuilder<
                    $providerPascalCase,
                    RelayPhase<impl AuthenticatedMultiplexedTransport>,
                >,
                WebsocketError<SecUpgrade::Error>,
            >
            where
                // Security layer, applied on top of the websocket stream.
                SecStream: futures::AsyncRead + futures::AsyncWrite + Unpin + Send + 'static,
                SecError: std::error::Error + Send + Sync + 'static,
                SecUpgrade: IntoSecurityUpgrade<$websocketStream>,
                SecUpgrade::Upgrade: InboundConnectionUpgrade<
                        Negotiated<$websocketStream>,
                        Output = (libp2p_identity::PeerId, SecStream),
                        Error = SecError,
                    > + OutboundConnectionUpgrade<
                        Negotiated<$websocketStream>,
                        Output = (libp2p_identity::PeerId, SecStream),
                        Error = SecError,
                    > + Clone
                    + Send
                    + 'static,
                <SecUpgrade::Upgrade as InboundConnectionUpgrade<
                    Negotiated<$websocketStream>,
                >>::Future: Send,
                <SecUpgrade::Upgrade as OutboundConnectionUpgrade<
                    Negotiated<$websocketStream>,
                >>::Future: Send,
                <<SecUpgrade::Upgrade as UpgradeInfo>::InfoIter as IntoIterator>::IntoIter: Send,
                <SecUpgrade::Upgrade as UpgradeInfo>::Info: Send,

                // Multiplexing layer, applied on top of the secured stream.
                MuxStream: StreamMuxer + Send + 'static,
                MuxStream::Substream: Send + 'static,
                MuxStream::Error: Send + Sync + 'static,
                MuxError: std::error::Error + Send + Sync + 'static,
                MuxUpgrade: IntoMultiplexerUpgrade<SecStream>,
                MuxUpgrade::Upgrade: InboundConnectionUpgrade<
                        Negotiated<SecStream>,
                        Output = MuxStream,
                        Error = MuxError,
                    > + OutboundConnectionUpgrade<
                        Negotiated<SecStream>,
                        Output = MuxStream,
                        Error = MuxError,
                    > + Clone
                    + Send
                    + 'static,
                <MuxUpgrade::Upgrade as InboundConnectionUpgrade<Negotiated<SecStream>>>::Future:
                    Send,
                <MuxUpgrade::Upgrade as OutboundConnectionUpgrade<Negotiated<SecStream>>>::Future:
                    Send,
                <<MuxUpgrade::Upgrade as UpgradeInfo>::InfoIter as IntoIterator>::IntoIter: Send,
                <MuxUpgrade::Upgrade as UpgradeInfo>::Info: Send,
            {
                // Walk past every step that precedes the websocket one, configuring none.
                let builder = self
                    .without_tcp()
                    .without_quic()
                    .without_any_other_transports()
                    .without_dns();

                builder
                    .with_websocket(security_upgrade, multiplexer_upgrade)
                    .await
            }
        }
    };
}

// One `with_websocket` shortcut per supported runtime provider.
impl_tcp_phase_with_websocket!(
    "async-std",
    super::provider::AsyncStd,
    rw_stream_sink::RwStreamSink<
        libp2p_websocket::BytesConnection<libp2p_tcp::async_io::TcpStream>,
    >
);
impl_tcp_phase_with_websocket!(
    "tokio",
    super::provider::Tokio,
    rw_stream_sink::RwStreamSink<
        libp2p_websocket::BytesConnection<libp2p_tcp::tokio::TcpStream>,
    >
);