libp2p_webrtc_websys/
connection.rs1use std::{
4 pin::Pin,
5 task::{ready, Context, Poll, Waker},
6};
7
8use futures::{channel::mpsc, stream::FuturesUnordered, StreamExt};
9use js_sys::{Object, Reflect};
10use libp2p_core::muxing::{StreamMuxer, StreamMuxerEvent};
11use libp2p_webrtc_utils::Fingerprint;
12use send_wrapper::SendWrapper;
13use wasm_bindgen::prelude::*;
14use wasm_bindgen_futures::JsFuture;
15use web_sys::{
16 RtcConfiguration, RtcDataChannel, RtcDataChannelEvent, RtcDataChannelInit, RtcDataChannelType,
17 RtcSessionDescriptionInit,
18};
19
20use super::{Error, Stream};
21use crate::stream::DropListener;
22
23pub struct Connection {
28 inner: SendWrapper<RtcPeerConnection>,
30
31 closed: bool,
33 inbound_data_channels: SendWrapper<mpsc::Receiver<RtcDataChannel>>,
38 drop_listeners: FuturesUnordered<DropListener>,
40 no_drop_listeners_waker: Option<Waker>,
41
42 _ondatachannel_closure: SendWrapper<Closure<dyn FnMut(RtcDataChannelEvent)>>,
43}
44
45impl Connection {
46 pub(crate) fn new(peer_connection: RtcPeerConnection) -> Self {
48 let (mut tx_ondatachannel, rx_ondatachannel) = mpsc::channel(4); let ondatachannel_closure = Closure::new(move |ev: RtcDataChannelEvent| {
53 tracing::trace!("New data channel");
54
55 if let Err(e) = tx_ondatachannel.try_send(ev.channel()) {
56 if e.is_full() {
57 tracing::warn!("Remote is opening too many data channels, we can't keep up!");
58 return;
59 }
60
61 if e.is_disconnected() {
62 tracing::warn!("Receiver is gone, are we shutting down?");
63 }
64 }
65 });
66 peer_connection
67 .inner
68 .set_ondatachannel(Some(ondatachannel_closure.as_ref().unchecked_ref()));
69
70 Self {
71 inner: SendWrapper::new(peer_connection),
72 closed: false,
73 drop_listeners: FuturesUnordered::default(),
74 no_drop_listeners_waker: None,
75 inbound_data_channels: SendWrapper::new(rx_ondatachannel),
76 _ondatachannel_closure: SendWrapper::new(ondatachannel_closure),
77 }
78 }
79
80 fn new_stream_from_data_channel(&mut self, data_channel: RtcDataChannel) -> Stream {
81 let (stream, drop_listener) = Stream::new(data_channel);
82
83 self.drop_listeners.push(drop_listener);
84 if let Some(waker) = self.no_drop_listeners_waker.take() {
85 waker.wake()
86 }
87 stream
88 }
89
90 fn close_connection(&mut self) {
95 if !self.closed {
96 tracing::trace!("connection::close_connection");
97 self.inner.inner.close();
98 self.closed = true;
99 }
100 }
101}
102
103impl Drop for Connection {
104 fn drop(&mut self) {
105 self.close_connection();
106 }
107}
108
109impl StreamMuxer for Connection {
112 type Substream = Stream;
113 type Error = Error;
114
115 fn poll_inbound(
116 mut self: Pin<&mut Self>,
117 cx: &mut Context<'_>,
118 ) -> Poll<Result<Self::Substream, Self::Error>> {
119 match ready!(self.inbound_data_channels.poll_next_unpin(cx)) {
120 Some(data_channel) => {
121 let stream = self.new_stream_from_data_channel(data_channel);
122
123 Poll::Ready(Ok(stream))
124 }
125 None => {
126 tracing::debug!("`Sender` for inbound data channels has been dropped");
129 Poll::Ready(Err(Error::Connection("connection closed".to_owned())))
130 }
131 }
132 }
133
134 fn poll_outbound(
135 mut self: Pin<&mut Self>,
136 _: &mut Context<'_>,
137 ) -> Poll<Result<Self::Substream, Self::Error>> {
138 tracing::trace!("Creating outbound data channel");
139
140 let data_channel = self.inner.new_regular_data_channel();
141 let stream = self.new_stream_from_data_channel(data_channel);
142
143 Poll::Ready(Ok(stream))
144 }
145
146 fn poll_close(
148 mut self: Pin<&mut Self>,
149 _cx: &mut Context<'_>,
150 ) -> Poll<Result<(), Self::Error>> {
151 tracing::trace!("connection::poll_close");
152
153 self.close_connection();
154 Poll::Ready(Ok(()))
155 }
156
157 fn poll(
158 mut self: Pin<&mut Self>,
159 cx: &mut Context<'_>,
160 ) -> Poll<Result<StreamMuxerEvent, Self::Error>> {
161 loop {
162 match ready!(self.drop_listeners.poll_next_unpin(cx)) {
163 Some(Ok(())) => {}
164 Some(Err(e)) => {
165 tracing::debug!("a DropListener failed: {e}")
166 }
167 None => {
168 self.no_drop_listeners_waker = Some(cx.waker().clone());
169 return Poll::Pending;
170 }
171 }
172 }
173 }
174}
175
176pub(crate) struct RtcPeerConnection {
177 inner: web_sys::RtcPeerConnection,
178}
179
180impl RtcPeerConnection {
181 pub(crate) async fn new(algorithm: String) -> Result<Self, Error> {
182 let algo: Object = Object::new();
183 Reflect::set(&algo, &"name".into(), &"ECDSA".into()).unwrap();
184 Reflect::set(&algo, &"namedCurve".into(), &"P-256".into()).unwrap();
185 Reflect::set(&algo, &"hash".into(), &algorithm.into()).unwrap();
186
187 let certificate_promise =
188 web_sys::RtcPeerConnection::generate_certificate_with_object(&algo)
189 .expect("certificate to be valid");
190
191 let certificate = JsFuture::from(certificate_promise).await?;
192
193 let config = RtcConfiguration::default();
194 let certificate_arr = js_sys::Array::new();
196 certificate_arr.push(&certificate);
197 config.set_certificates(&certificate_arr);
198
199 let inner = web_sys::RtcPeerConnection::new_with_configuration(&config)?;
200
201 Ok(Self { inner })
202 }
203
204 pub(crate) fn new_handshake_stream(&self) -> (Stream, DropListener) {
208 Stream::new(self.new_data_channel(true))
209 }
210
211 pub(crate) fn new_regular_data_channel(&self) -> RtcDataChannel {
213 self.new_data_channel(false)
214 }
215
216 fn new_data_channel(&self, negotiated: bool) -> RtcDataChannel {
217 const LABEL: &str = "";
218
219 let dc = match negotiated {
220 true => {
221 let options = RtcDataChannelInit::new();
222 options.set_negotiated(true);
223 options.set_id(0); self.inner
226 .create_data_channel_with_data_channel_dict(LABEL, &options)
227 }
228 false => self.inner.create_data_channel(LABEL),
229 };
230 dc.set_binary_type(RtcDataChannelType::Arraybuffer); dc
233 }
234
235 pub(crate) async fn create_offer(&self) -> Result<String, Error> {
236 let offer = JsFuture::from(self.inner.create_offer()).await?;
237
238 let offer = Reflect::get(&offer, &JsValue::from_str("sdp"))
239 .expect("sdp should be valid")
240 .as_string()
241 .expect("sdp string should be valid string");
242
243 Ok(offer)
244 }
245
246 pub(crate) async fn set_local_description(
247 &self,
248 sdp: RtcSessionDescriptionInit,
249 ) -> Result<(), Error> {
250 let promise = self.inner.set_local_description(&sdp);
251 JsFuture::from(promise).await?;
252
253 Ok(())
254 }
255
256 pub(crate) fn local_fingerprint(&self) -> Result<Fingerprint, Error> {
257 let sdp = &self
258 .inner
259 .local_description()
260 .ok_or_else(|| Error::Js("No local description".to_string()))?
261 .sdp();
262
263 let fingerprint =
264 parse_fingerprint(sdp).ok_or_else(|| Error::Js("No fingerprint in SDP".to_string()))?;
265
266 Ok(fingerprint)
267 }
268
269 pub(crate) async fn set_remote_description(
270 &self,
271 sdp: RtcSessionDescriptionInit,
272 ) -> Result<(), Error> {
273 let promise = self.inner.set_remote_description(&sdp);
274 JsFuture::from(promise).await?;
275
276 Ok(())
277 }
278}
279
280fn parse_fingerprint(sdp: &str) -> Option<Fingerprint> {
282 let lines = sdp.split("\r\n");
284
285 for line in lines {
289 if line.starts_with("a=fingerprint:") {
290 let fingerprint = line.split(' ').nth(1).unwrap();
291 let bytes = hex::decode(fingerprint.replace(':', "")).unwrap();
292 let arr: [u8; 32] = bytes.as_slice().try_into().unwrap();
293 return Some(Fingerprint::raw(arr));
294 }
295 }
296 None
297}
298
299#[cfg(test)]
300mod sdp_tests {
301 use super::*;
302
303 #[test]
304 fn test_fingerprint() {
305 let sdp = "v=0\r\no=- 0 0 IN IP6 ::1\r\ns=-\r\nc=IN IP6 ::1\r\nt=0 0\r\na=ice-lite\r\nm=application 61885 UDP/DTLS/SCTP webrtc-datachannel\r\na=mid:0\r\na=setup:passive\r\na=ice-ufrag:libp2p+webrtc+v1/YwapWySn6fE6L9i47PhlB6X4gzNXcgFs\r\na=ice-pwd:libp2p+webrtc+v1/YwapWySn6fE6L9i47PhlB6X4gzNXcgFs\r\na=fingerprint:sha-256 A8:17:77:1E:02:7E:D1:2B:53:92:70:A6:8E:F9:02:CC:21:72:3A:92:5D:F4:97:5F:27:C4:5E:75:D4:F4:31:89\r\na=sctp-port:5000\r\na=max-message-size:16384\r\na=candidate:1467250027 1 UDP 1467250027 ::1 61885 typ host\r\n";
306
307 let fingerprint = parse_fingerprint(sdp).unwrap();
308
309 assert_eq!(fingerprint.algorithm(), "sha-256");
310 assert_eq!(fingerprint.to_sdp_format(), "A8:17:77:1E:02:7E:D1:2B:53:92:70:A6:8E:F9:02:CC:21:72:3A:92:5D:F4:97:5F:27:C4:5E:75:D4:F4:31:89");
311 }
312}