libp2p/builder/phase/
tcp.rs

1use std::marker::PhantomData;
2
3#[cfg(all(
4    not(target_arch = "wasm32"),
5    any(feature = "tcp", feature = "websocket")
6))]
7use libp2p_core::muxing::{StreamMuxer, StreamMuxerBox};
8#[cfg(all(feature = "websocket", not(target_arch = "wasm32")))]
9use libp2p_core::Transport;
10#[cfg(all(
11    not(target_arch = "wasm32"),
12    any(feature = "tcp", feature = "websocket")
13))]
14use libp2p_core::{
15    upgrade::InboundConnectionUpgrade, upgrade::OutboundConnectionUpgrade, Negotiated, UpgradeInfo,
16};
17
18use super::*;
19use crate::SwarmBuilder;
20
21pub struct TcpPhase {}
22
23macro_rules! impl_tcp_builder {
24    ($providerKebabCase:literal, $providerPascalCase:ty, $path:ident) => {
25        #[cfg(all(
26            not(target_arch = "wasm32"),
27            feature = "tcp",
28            feature = $providerKebabCase,
29        ))]
30        impl SwarmBuilder<$providerPascalCase, TcpPhase> {
31            /// Adds a TCP based transport.
32            ///
33            /// Note that both `security_upgrade` and `multiplexer_upgrade` take function pointers,
34            /// i.e. they take the function themselves (without the invocation via `()`), not the
35            /// result of the function invocation. See example below.
36            ///
37            /// ``` rust
38            /// # use libp2p::SwarmBuilder;
39            /// # use std::error::Error;
40            /// # async fn build_swarm() -> Result<(), Box<dyn Error>> {
41            /// let swarm = SwarmBuilder::with_new_identity()
42            ///     .with_tokio()
43            ///     .with_tcp(
44            ///         Default::default(),
45            ///         (libp2p_tls::Config::new, libp2p_noise::Config::new),
46            ///         libp2p_yamux::Config::default,
47            ///     )?
48            /// # ;
49            /// # Ok(())
50            /// # }
51            /// ```
52            pub fn with_tcp<SecUpgrade, SecStream, SecError, MuxUpgrade, MuxStream, MuxError>(
53                self,
54                tcp_config: libp2p_tcp::Config,
55                security_upgrade: SecUpgrade,
56                multiplexer_upgrade: MuxUpgrade,
57            ) -> Result<
58                SwarmBuilder<$providerPascalCase, QuicPhase<impl AuthenticatedMultiplexedTransport>>,
59            SecUpgrade::Error,
60            >
61            where
62                SecStream: futures::AsyncRead + futures::AsyncWrite + Unpin + Send + 'static,
63                SecError: std::error::Error + Send + Sync + 'static,
64                SecUpgrade: IntoSecurityUpgrade<libp2p_tcp::$path::TcpStream>,
65                SecUpgrade::Upgrade: InboundConnectionUpgrade<Negotiated<libp2p_tcp::$path::TcpStream>, Output = (libp2p_identity::PeerId, SecStream), Error = SecError> + OutboundConnectionUpgrade<Negotiated<libp2p_tcp::$path::TcpStream>, Output = (libp2p_identity::PeerId, SecStream), Error = SecError> + Clone + Send + 'static,
66                <SecUpgrade::Upgrade as InboundConnectionUpgrade<Negotiated<libp2p_tcp::$path::TcpStream>>>::Future: Send,
67                <SecUpgrade::Upgrade as OutboundConnectionUpgrade<Negotiated<libp2p_tcp::$path::TcpStream>>>::Future: Send,
68                <<<SecUpgrade as IntoSecurityUpgrade<libp2p_tcp::$path::TcpStream>>::Upgrade as UpgradeInfo>::InfoIter as IntoIterator>::IntoIter: Send,
69                <<SecUpgrade as IntoSecurityUpgrade<libp2p_tcp::$path::TcpStream>>::Upgrade as UpgradeInfo>::Info: Send,
70
71                MuxStream: StreamMuxer + Send + 'static,
72                MuxStream::Substream: Send + 'static,
73                MuxStream::Error: Send + Sync + 'static,
74                MuxUpgrade: IntoMultiplexerUpgrade<SecStream>,
75                MuxUpgrade::Upgrade: InboundConnectionUpgrade<Negotiated<SecStream>, Output = MuxStream, Error = MuxError> + OutboundConnectionUpgrade<Negotiated<SecStream>, Output = MuxStream, Error = MuxError> + Clone + Send + 'static,
76                <MuxUpgrade::Upgrade as InboundConnectionUpgrade<Negotiated<SecStream>>>::Future: Send,
77                <MuxUpgrade::Upgrade as OutboundConnectionUpgrade<Negotiated<SecStream>>>::Future: Send,
78                MuxError: std::error::Error + Send + Sync + 'static,
79                <<<MuxUpgrade as IntoMultiplexerUpgrade<SecStream>>::Upgrade as UpgradeInfo>::InfoIter as IntoIterator>::IntoIter: Send,
80                <<MuxUpgrade as IntoMultiplexerUpgrade<SecStream>>::Upgrade as UpgradeInfo>::Info: Send,
81            {
82                Ok(SwarmBuilder {
83                    phase: QuicPhase {
84                        transport: libp2p_tcp::$path::Transport::new(tcp_config)
85                            .upgrade(libp2p_core::upgrade::Version::V1Lazy)
86                            .authenticate(
87                                security_upgrade.into_security_upgrade(&self.keypair)?,
88                            )
89                            .multiplex(multiplexer_upgrade.into_multiplexer_upgrade())
90                            .map(|(p, c), _| (p, StreamMuxerBox::new(c))),
91                    },
92                    keypair: self.keypair,
93                    phantom: PhantomData,
94                })
95            }
96        }
97    };
98}
99
100impl_tcp_builder!("async-std", super::provider::AsyncStd, async_io);
101impl_tcp_builder!("tokio", super::provider::Tokio, tokio);
102
103impl<Provider> SwarmBuilder<Provider, TcpPhase> {
104    pub(crate) fn without_tcp(
105        self,
106    ) -> SwarmBuilder<Provider, QuicPhase<impl AuthenticatedMultiplexedTransport>> {
107        SwarmBuilder {
108            keypair: self.keypair,
109            phantom: PhantomData,
110            phase: QuicPhase {
111                transport: libp2p_core::transport::dummy::DummyTransport::new(),
112            },
113        }
114    }
115}
116
117#[cfg(all(not(target_arch = "wasm32"), feature = "quic", feature = "tokio"))]
118impl SwarmBuilder<super::provider::Tokio, TcpPhase> {
119    pub fn with_quic(
120        self,
121    ) -> SwarmBuilder<
122        super::provider::Tokio,
123        OtherTransportPhase<impl AuthenticatedMultiplexedTransport>,
124    > {
125        self.without_tcp().with_quic()
126    }
127}
128#[cfg(all(not(target_arch = "wasm32"), feature = "quic", feature = "tokio"))]
129impl SwarmBuilder<super::provider::Tokio, TcpPhase> {
130    pub fn with_quic_config(
131        self,
132        constructor: impl FnOnce(libp2p_quic::Config) -> libp2p_quic::Config,
133    ) -> SwarmBuilder<
134        super::provider::Tokio,
135        OtherTransportPhase<impl AuthenticatedMultiplexedTransport>,
136    > {
137        self.without_tcp().with_quic_config(constructor)
138    }
139}
140impl<Provider> SwarmBuilder<Provider, TcpPhase> {
141    pub fn with_other_transport<
142        Muxer: libp2p_core::muxing::StreamMuxer + Send + 'static,
143        OtherTransport: Transport<Output = (libp2p_identity::PeerId, Muxer)> + Send + Unpin + 'static,
144        R: TryIntoTransport<OtherTransport>,
145    >(
146        self,
147        constructor: impl FnOnce(&libp2p_identity::Keypair) -> R,
148    ) -> Result<
149        SwarmBuilder<Provider, OtherTransportPhase<impl AuthenticatedMultiplexedTransport>>,
150        R::Error,
151    >
152    where
153        <OtherTransport as Transport>::Error: Send + Sync + 'static,
154        <OtherTransport as Transport>::Dial: Send,
155        <OtherTransport as Transport>::ListenerUpgrade: Send,
156        <Muxer as libp2p_core::muxing::StreamMuxer>::Substream: Send,
157        <Muxer as libp2p_core::muxing::StreamMuxer>::Error: Send + Sync,
158    {
159        self.without_tcp()
160            .without_quic()
161            .with_other_transport(constructor)
162    }
163}
164macro_rules! impl_tcp_phase_with_websocket {
165    ($providerKebabCase:literal, $providerPascalCase:ty, $websocketStream:ty) => {
166        #[cfg(all(feature = $providerKebabCase, not(target_arch = "wasm32"), feature = "websocket"))]
167        impl SwarmBuilder<$providerPascalCase, TcpPhase> {
168            /// See [`SwarmBuilder::with_websocket`].
169            pub async fn with_websocket <
170                SecUpgrade,
171                SecStream,
172                SecError,
173                MuxUpgrade,
174                MuxStream,
175                MuxError,
176            > (
177                self,
178                security_upgrade: SecUpgrade,
179                multiplexer_upgrade: MuxUpgrade,
180            ) -> Result<
181                    SwarmBuilder<
182                        $providerPascalCase,
183                        RelayPhase<impl AuthenticatedMultiplexedTransport>,
184                    >,
185                    WebsocketError<SecUpgrade::Error>,
186                >
187            where
188                SecStream: futures::AsyncRead + futures::AsyncWrite + Unpin + Send + 'static,
189                SecError: std::error::Error + Send + Sync + 'static,
190                SecUpgrade: IntoSecurityUpgrade<$websocketStream>,
191                SecUpgrade::Upgrade: InboundConnectionUpgrade<Negotiated<$websocketStream>, Output = (libp2p_identity::PeerId, SecStream), Error = SecError> + OutboundConnectionUpgrade<Negotiated<$websocketStream>, Output = (libp2p_identity::PeerId, SecStream), Error = SecError> + Clone + Send + 'static,
192            <SecUpgrade::Upgrade as InboundConnectionUpgrade<Negotiated<$websocketStream>>>::Future: Send,
193            <SecUpgrade::Upgrade as OutboundConnectionUpgrade<Negotiated<$websocketStream>>>::Future: Send,
194            <<<SecUpgrade as IntoSecurityUpgrade<$websocketStream>>::Upgrade as UpgradeInfo>::InfoIter as IntoIterator>::IntoIter: Send,
195            <<SecUpgrade as IntoSecurityUpgrade<$websocketStream>>::Upgrade as UpgradeInfo>::Info: Send,
196
197                MuxStream: StreamMuxer + Send + 'static,
198                MuxStream::Substream: Send + 'static,
199                MuxStream::Error: Send + Sync + 'static,
200                MuxUpgrade: IntoMultiplexerUpgrade<SecStream>,
201                MuxUpgrade::Upgrade: InboundConnectionUpgrade<Negotiated<SecStream>, Output = MuxStream, Error = MuxError> + OutboundConnectionUpgrade<Negotiated<SecStream>, Output = MuxStream, Error = MuxError> + Clone + Send + 'static,
202                <MuxUpgrade::Upgrade as InboundConnectionUpgrade<Negotiated<SecStream>>>::Future: Send,
203                <MuxUpgrade::Upgrade as OutboundConnectionUpgrade<Negotiated<SecStream>>>::Future: Send,
204                    MuxError: std::error::Error + Send + Sync + 'static,
205                <<<MuxUpgrade as IntoMultiplexerUpgrade<SecStream>>::Upgrade as UpgradeInfo>::InfoIter as IntoIterator>::IntoIter: Send,
206                <<MuxUpgrade as IntoMultiplexerUpgrade<SecStream>>::Upgrade as UpgradeInfo>::Info: Send,
207            {
208                self.without_tcp()
209                    .without_quic()
210                    .without_any_other_transports()
211                    .without_dns()
212                    .with_websocket(security_upgrade, multiplexer_upgrade)
213                    .await
214            }
215        }
216    }
217}
218impl_tcp_phase_with_websocket!(
219    "async-std",
220    super::provider::AsyncStd,
221    rw_stream_sink::RwStreamSink<
222        libp2p_websocket::BytesConnection<libp2p_tcp::async_io::TcpStream>,
223    >
224);
225impl_tcp_phase_with_websocket!(
226    "tokio",
227    super::provider::Tokio,
228    rw_stream_sink::RwStreamSink<libp2p_websocket::BytesConnection<libp2p_tcp::tokio::TcpStream>>
229);