libp2p/builder/phase/
tcp.rs

1use std::marker::PhantomData;
2
3#[cfg(all(
4    not(target_arch = "wasm32"),
5    any(feature = "tcp", feature = "websocket")
6))]
7use libp2p_core::muxing::{StreamMuxer, StreamMuxerBox};
8#[cfg(all(feature = "websocket", not(target_arch = "wasm32")))]
9use libp2p_core::Transport;
10#[cfg(all(
11    not(target_arch = "wasm32"),
12    any(feature = "tcp", feature = "websocket")
13))]
14use libp2p_core::{
15    upgrade::InboundConnectionUpgrade, upgrade::OutboundConnectionUpgrade, Negotiated, UpgradeInfo,
16};
17
18use super::*;
19use crate::SwarmBuilder;
20
21pub struct TcpPhase {}
22
23macro_rules! impl_tcp_builder {
24    ($providerKebabCase:literal, $providerPascalCase:ty, $path:ident) => {
25        #[cfg(all(
26            not(target_arch = "wasm32"),
27            feature = "tcp",
28            feature = $providerKebabCase,
29        ))]
30        impl SwarmBuilder<$providerPascalCase, TcpPhase> {
31            /// Adds a TCP based transport.
32            ///
33            /// Note that both `security_upgrade` and `multiplexer_upgrade` take function pointers,
34            /// i.e. they take the function themselves (without the invocation via `()`), not the
35            /// result of the function invocation. See example below.
36            ///
37            /// ``` rust
38            /// # use libp2p::SwarmBuilder;
39            /// # use std::error::Error;
40            /// # async fn build_swarm() -> Result<(), Box<dyn Error>> {
41            /// let swarm = SwarmBuilder::with_new_identity()
42            ///     .with_tokio()
43            ///     .with_tcp(
44            ///         Default::default(),
45            ///         (libp2p_tls::Config::new, libp2p_noise::Config::new),
46            ///         libp2p_yamux::Config::default,
47            ///     )?
48            /// # ;
49            /// # Ok(())
50            /// # }
51            /// ```
52            pub fn with_tcp<SecUpgrade, SecStream, SecError, MuxUpgrade, MuxStream, MuxError>(
53                self,
54                tcp_config: libp2p_tcp::Config,
55                security_upgrade: SecUpgrade,
56                multiplexer_upgrade: MuxUpgrade,
57            ) -> Result<
58                SwarmBuilder<$providerPascalCase, QuicPhase<impl AuthenticatedMultiplexedTransport>>,
59            SecUpgrade::Error,
60            >
61            where
62                SecStream: futures::AsyncRead + futures::AsyncWrite + Unpin + Send + 'static,
63                SecError: std::error::Error + Send + Sync + 'static,
64                SecUpgrade: IntoSecurityUpgrade<libp2p_tcp::$path::TcpStream>,
65                SecUpgrade::Upgrade: InboundConnectionUpgrade<Negotiated<libp2p_tcp::$path::TcpStream>, Output = (libp2p_identity::PeerId, SecStream), Error = SecError> + OutboundConnectionUpgrade<Negotiated<libp2p_tcp::$path::TcpStream>, Output = (libp2p_identity::PeerId, SecStream), Error = SecError> + Clone + Send + 'static,
66                <SecUpgrade::Upgrade as InboundConnectionUpgrade<Negotiated<libp2p_tcp::$path::TcpStream>>>::Future: Send,
67                <SecUpgrade::Upgrade as OutboundConnectionUpgrade<Negotiated<libp2p_tcp::$path::TcpStream>>>::Future: Send,
68                <<<SecUpgrade as IntoSecurityUpgrade<libp2p_tcp::$path::TcpStream>>::Upgrade as UpgradeInfo>::InfoIter as IntoIterator>::IntoIter: Send,
69                <<SecUpgrade as IntoSecurityUpgrade<libp2p_tcp::$path::TcpStream>>::Upgrade as UpgradeInfo>::Info: Send,
70
71                MuxStream: StreamMuxer + Send + 'static,
72                MuxStream::Substream: Send + 'static,
73                MuxStream::Error: Send + Sync + 'static,
74                MuxUpgrade: IntoMultiplexerUpgrade<SecStream>,
75                MuxUpgrade::Upgrade: InboundConnectionUpgrade<Negotiated<SecStream>, Output = MuxStream, Error = MuxError> + OutboundConnectionUpgrade<Negotiated<SecStream>, Output = MuxStream, Error = MuxError> + Clone + Send + 'static,
76                <MuxUpgrade::Upgrade as InboundConnectionUpgrade<Negotiated<SecStream>>>::Future: Send,
77                <MuxUpgrade::Upgrade as OutboundConnectionUpgrade<Negotiated<SecStream>>>::Future: Send,
78                MuxError: std::error::Error + Send + Sync + 'static,
79                <<<MuxUpgrade as IntoMultiplexerUpgrade<SecStream>>::Upgrade as UpgradeInfo>::InfoIter as IntoIterator>::IntoIter: Send,
80                <<MuxUpgrade as IntoMultiplexerUpgrade<SecStream>>::Upgrade as UpgradeInfo>::Info: Send,
81            {
82                Ok(SwarmBuilder {
83                    phase: QuicPhase {
84                        transport: libp2p_tcp::$path::Transport::new(tcp_config)
85                            .upgrade(libp2p_core::upgrade::Version::V1Lazy)
86                            .authenticate(
87                                security_upgrade.into_security_upgrade(&self.keypair)?,
88                            )
89                            .multiplex(multiplexer_upgrade.into_multiplexer_upgrade())
90                            .map(|(p, c), _| (p, StreamMuxerBox::new(c))),
91                    },
92                    keypair: self.keypair,
93                    phantom: PhantomData,
94                })
95            }
96        }
97    };
98}
99
100impl_tcp_builder!("tokio", super::provider::Tokio, tokio);
101
102impl<Provider> SwarmBuilder<Provider, TcpPhase> {
103    pub(crate) fn without_tcp(
104        self,
105    ) -> SwarmBuilder<Provider, QuicPhase<impl AuthenticatedMultiplexedTransport>> {
106        SwarmBuilder {
107            keypair: self.keypair,
108            phantom: PhantomData,
109            phase: QuicPhase {
110                transport: libp2p_core::transport::dummy::DummyTransport::new(),
111            },
112        }
113    }
114}
115
116#[cfg(all(not(target_arch = "wasm32"), feature = "quic", feature = "tokio"))]
117impl SwarmBuilder<super::provider::Tokio, TcpPhase> {
118    pub fn with_quic(
119        self,
120    ) -> SwarmBuilder<
121        super::provider::Tokio,
122        OtherTransportPhase<impl AuthenticatedMultiplexedTransport>,
123    > {
124        self.without_tcp().with_quic()
125    }
126}
127#[cfg(all(not(target_arch = "wasm32"), feature = "quic", feature = "tokio"))]
128impl SwarmBuilder<super::provider::Tokio, TcpPhase> {
129    pub fn with_quic_config(
130        self,
131        constructor: impl FnOnce(libp2p_quic::Config) -> libp2p_quic::Config,
132    ) -> SwarmBuilder<
133        super::provider::Tokio,
134        OtherTransportPhase<impl AuthenticatedMultiplexedTransport>,
135    > {
136        self.without_tcp().with_quic_config(constructor)
137    }
138}
139impl<Provider> SwarmBuilder<Provider, TcpPhase> {
140    pub fn with_other_transport<
141        Muxer: libp2p_core::muxing::StreamMuxer + Send + 'static,
142        OtherTransport: Transport<Output = (libp2p_identity::PeerId, Muxer)> + Send + Unpin + 'static,
143        R: TryIntoTransport<OtherTransport>,
144    >(
145        self,
146        constructor: impl FnOnce(&libp2p_identity::Keypair) -> R,
147    ) -> Result<
148        SwarmBuilder<Provider, OtherTransportPhase<impl AuthenticatedMultiplexedTransport>>,
149        R::Error,
150    >
151    where
152        <OtherTransport as Transport>::Error: Send + Sync + 'static,
153        <OtherTransport as Transport>::Dial: Send,
154        <OtherTransport as Transport>::ListenerUpgrade: Send,
155        <Muxer as libp2p_core::muxing::StreamMuxer>::Substream: Send,
156        <Muxer as libp2p_core::muxing::StreamMuxer>::Error: Send + Sync,
157    {
158        self.without_tcp()
159            .without_quic()
160            .with_other_transport(constructor)
161    }
162}
163macro_rules! impl_tcp_phase_with_websocket {
164    ($providerKebabCase:literal, $providerPascalCase:ty, $websocketStream:ty) => {
165        #[cfg(all(feature = $providerKebabCase, not(target_arch = "wasm32"), feature = "websocket"))]
166        impl SwarmBuilder<$providerPascalCase, TcpPhase> {
167            /// See [`SwarmBuilder::with_websocket`].
168            pub async fn with_websocket <
169                SecUpgrade,
170                SecStream,
171                SecError,
172                MuxUpgrade,
173                MuxStream,
174                MuxError,
175            > (
176                self,
177                security_upgrade: SecUpgrade,
178                multiplexer_upgrade: MuxUpgrade,
179            ) -> Result<
180                    SwarmBuilder<
181                        $providerPascalCase,
182                        RelayPhase<impl AuthenticatedMultiplexedTransport>,
183                    >,
184                    WebsocketError<SecUpgrade::Error>,
185                >
186            where
187                SecStream: futures::AsyncRead + futures::AsyncWrite + Unpin + Send + 'static,
188                SecError: std::error::Error + Send + Sync + 'static,
189                SecUpgrade: IntoSecurityUpgrade<$websocketStream>,
190                SecUpgrade::Upgrade: InboundConnectionUpgrade<Negotiated<$websocketStream>, Output = (libp2p_identity::PeerId, SecStream), Error = SecError> + OutboundConnectionUpgrade<Negotiated<$websocketStream>, Output = (libp2p_identity::PeerId, SecStream), Error = SecError> + Clone + Send + 'static,
191            <SecUpgrade::Upgrade as InboundConnectionUpgrade<Negotiated<$websocketStream>>>::Future: Send,
192            <SecUpgrade::Upgrade as OutboundConnectionUpgrade<Negotiated<$websocketStream>>>::Future: Send,
193            <<<SecUpgrade as IntoSecurityUpgrade<$websocketStream>>::Upgrade as UpgradeInfo>::InfoIter as IntoIterator>::IntoIter: Send,
194            <<SecUpgrade as IntoSecurityUpgrade<$websocketStream>>::Upgrade as UpgradeInfo>::Info: Send,
195
196                MuxStream: StreamMuxer + Send + 'static,
197                MuxStream::Substream: Send + 'static,
198                MuxStream::Error: Send + Sync + 'static,
199                MuxUpgrade: IntoMultiplexerUpgrade<SecStream>,
200                MuxUpgrade::Upgrade: InboundConnectionUpgrade<Negotiated<SecStream>, Output = MuxStream, Error = MuxError> + OutboundConnectionUpgrade<Negotiated<SecStream>, Output = MuxStream, Error = MuxError> + Clone + Send + 'static,
201                <MuxUpgrade::Upgrade as InboundConnectionUpgrade<Negotiated<SecStream>>>::Future: Send,
202                <MuxUpgrade::Upgrade as OutboundConnectionUpgrade<Negotiated<SecStream>>>::Future: Send,
203                    MuxError: std::error::Error + Send + Sync + 'static,
204                <<<MuxUpgrade as IntoMultiplexerUpgrade<SecStream>>::Upgrade as UpgradeInfo>::InfoIter as IntoIterator>::IntoIter: Send,
205                <<MuxUpgrade as IntoMultiplexerUpgrade<SecStream>>::Upgrade as UpgradeInfo>::Info: Send,
206            {
207                self.without_tcp()
208                    .without_quic()
209                    .without_any_other_transports()
210                    .without_dns()
211                    .with_websocket(security_upgrade, multiplexer_upgrade)
212                    .await
213            }
214        }
215    }
216}
217impl_tcp_phase_with_websocket!(
218    "tokio",
219    super::provider::Tokio,
220    rw_stream_sink::RwStreamSink<libp2p_websocket::BytesConnection<libp2p_tcp::tokio::TcpStream>>
221);