1use std::{
22 net::SocketAddr,
23 sync::{
24 atomic::{AtomicBool, Ordering},
25 Arc,
26 },
27 time::Duration,
28};
29
30use futures::{channel::oneshot, future::Either};
31use futures_timer::Delay;
32use libp2p_identity as identity;
33use libp2p_identity::PeerId;
34use libp2p_webrtc_utils::{noise, Fingerprint};
35use webrtc::{
36 api::{setting_engine::SettingEngine, APIBuilder},
37 data::data_channel::DataChannel,
38 data_channel::data_channel_init::RTCDataChannelInit,
39 dtls_transport::dtls_role::DTLSRole,
40 ice::{network_type::NetworkType, udp_mux::UDPMux, udp_network::UDPNetwork},
41 peer_connection::{configuration::RTCConfiguration, RTCPeerConnection},
42};
43
44use crate::tokio::{error::Error, sdp, sdp::random_ufrag, stream::Stream, Connection};
45
46pub(crate) async fn outbound(
48 addr: SocketAddr,
49 config: RTCConfiguration,
50 udp_mux: Arc<dyn UDPMux + Send + Sync>,
51 client_fingerprint: Fingerprint,
52 server_fingerprint: Fingerprint,
53 id_keys: identity::Keypair,
54) -> Result<(PeerId, Connection), Error> {
55 tracing::debug!(address=%addr, "new outbound connection to address");
56
57 let (peer_connection, ufrag) = new_outbound_connection(addr, config, udp_mux).await?;
58
59 let offer = peer_connection.create_offer(None).await?;
60 tracing::debug!(offer=%offer.sdp, "created SDP offer for outbound connection");
61 peer_connection.set_local_description(offer).await?;
62
63 let answer = sdp::answer(addr, server_fingerprint, &ufrag);
64 tracing::debug!(?answer, "calculated SDP answer for outbound connection");
65 peer_connection.set_remote_description(answer).await?; let data_channel = create_substream_for_noise_handshake(&peer_connection).await?;
68 let peer_id = noise::outbound(
69 id_keys,
70 data_channel,
71 server_fingerprint,
72 client_fingerprint,
73 )
74 .await?;
75
76 Ok((peer_id, Connection::new(peer_connection).await))
77}
78
79pub(crate) async fn inbound(
81 addr: SocketAddr,
82 config: RTCConfiguration,
83 udp_mux: Arc<dyn UDPMux + Send + Sync>,
84 server_fingerprint: Fingerprint,
85 remote_ufrag: String,
86 id_keys: identity::Keypair,
87) -> Result<(PeerId, Connection), Error> {
88 tracing::debug!(address=%addr, ufrag=%remote_ufrag, "new inbound connection from address");
89
90 let peer_connection = new_inbound_connection(addr, config, udp_mux, &remote_ufrag).await?;
91
92 let offer = sdp::offer(addr, &remote_ufrag);
93 tracing::debug!(?offer, "calculated SDP offer for inbound connection");
94 peer_connection.set_remote_description(offer).await?;
95
96 let answer = peer_connection.create_answer(None).await?;
97 tracing::debug!(?answer, "created SDP answer for inbound connection");
98 peer_connection.set_local_description(answer).await?; let data_channel = create_substream_for_noise_handshake(&peer_connection).await?;
101 let client_fingerprint = get_remote_fingerprint(&peer_connection).await;
102 let peer_id = noise::inbound(
103 id_keys,
104 data_channel,
105 client_fingerprint,
106 server_fingerprint,
107 )
108 .await?;
109
110 Ok((peer_id, Connection::new(peer_connection).await))
111}
112
113async fn new_outbound_connection(
114 addr: SocketAddr,
115 config: RTCConfiguration,
116 udp_mux: Arc<dyn UDPMux + Send + Sync>,
117) -> Result<(RTCPeerConnection, String), Error> {
118 let ufrag = random_ufrag();
119 let se = setting_engine(udp_mux, &ufrag, addr);
120
121 let connection = APIBuilder::new()
122 .with_setting_engine(se)
123 .build()
124 .new_peer_connection(config)
125 .await?;
126
127 Ok((connection, ufrag))
128}
129
130async fn new_inbound_connection(
131 addr: SocketAddr,
132 config: RTCConfiguration,
133 udp_mux: Arc<dyn UDPMux + Send + Sync>,
134 ufrag: &str,
135) -> Result<RTCPeerConnection, Error> {
136 let mut se = setting_engine(udp_mux, ufrag, addr);
137 {
138 se.set_lite(true);
139 se.disable_certificate_fingerprint_verification(true);
140 se.set_answering_dtls_role(DTLSRole::Server)?;
145 }
146
147 let connection = APIBuilder::new()
148 .with_setting_engine(se)
149 .build()
150 .new_peer_connection(config)
151 .await?;
152
153 Ok(connection)
154}
155
156fn setting_engine(
157 udp_mux: Arc<dyn UDPMux + Send + Sync>,
158 ufrag: &str,
159 addr: SocketAddr,
160) -> SettingEngine {
161 let mut se = SettingEngine::default();
162
163 se.set_ice_credentials(ufrag.to_owned(), ufrag.to_owned());
166
167 se.set_udp_network(UDPNetwork::Muxed(udp_mux.clone()));
168
169 se.detach_data_channels();
171
172 let network_type = match addr {
177 SocketAddr::V4(_) => NetworkType::Udp4,
178 SocketAddr::V6(_) => NetworkType::Udp6,
179 };
180 se.set_network_types(vec![network_type]);
181
182 se.set_ip_filter(Box::new({
186 let once = AtomicBool::new(true);
187 move |_ip| {
188 if once.load(Ordering::Relaxed) {
189 once.store(false, Ordering::Relaxed);
190 return true;
191 }
192 false
193 }
194 }));
195
196 se
197}
198
199async fn get_remote_fingerprint(conn: &RTCPeerConnection) -> Fingerprint {
201 let cert_bytes = conn.sctp().transport().get_remote_certificate().await;
202
203 Fingerprint::from_certificate(&cert_bytes)
204}
205
206async fn create_substream_for_noise_handshake(conn: &RTCPeerConnection) -> Result<Stream, Error> {
207 let data_channel = conn
209 .create_data_channel(
210 "",
211 Some(RTCDataChannelInit {
212 negotiated: Some(0), ..RTCDataChannelInit::default()
214 }),
215 )
216 .await?;
217
218 let (tx, rx) = oneshot::channel::<Arc<DataChannel>>();
219
220 crate::tokio::connection::register_data_channel_open_handler(data_channel, tx).await;
222
223 let channel = match futures::future::select(rx, Delay::new(Duration::from_secs(10))).await {
224 Either::Left((Ok(channel), _)) => channel,
225 Either::Left((Err(_), _)) => {
226 return Err(Error::Internal("failed to open data channel".to_owned()))
227 }
228 Either::Right(((), _)) => {
229 return Err(Error::Internal(
230 "data channel opening took longer than 10 seconds (see logs)".into(),
231 ))
232 }
233 };
234
235 let (substream, drop_listener) = Stream::new(channel);
236 drop(drop_listener); Ok(substream)
239}