1use std::{
24 error::Error,
25 fmt,
26 pin::Pin,
27 task::{Context, Poll},
28 time::Duration,
29};
30
31use futures::{prelude::*, ready};
32use libp2p_identity::PeerId;
33use multiaddr::Multiaddr;
34
35pub use crate::upgrade::Version;
36use crate::{
37 connection::ConnectedPoint,
38 muxing::{StreamMuxer, StreamMuxerBox},
39 transport::{
40 and_then::AndThen, boxed::boxed, timeout::TransportTimeout, DialOpts, ListenerId,
41 Transport, TransportError, TransportEvent,
42 },
43 upgrade::{
44 self, apply_inbound, apply_outbound, InboundConnectionUpgrade, InboundUpgradeApply,
45 OutboundConnectionUpgrade, OutboundUpgradeApply, UpgradeError,
46 },
47 Negotiated,
48};
49
50#[derive(Clone)]
70pub struct Builder<T> {
71 inner: T,
72 version: upgrade::Version,
73}
74
75impl<T> Builder<T>
76where
77 T: Transport,
78 T::Error: 'static,
79{
80 pub fn new(inner: T, version: upgrade::Version) -> Builder<T> {
82 Builder { inner, version }
83 }
84
85 pub fn authenticate<C, D, U, E>(
98 self,
99 upgrade: U,
100 ) -> Authenticated<AndThen<T, impl FnOnce(C, ConnectedPoint) -> Authenticate<C, U> + Clone>>
101 where
102 T: Transport<Output = C>,
103 C: AsyncRead + AsyncWrite + Unpin,
104 D: AsyncRead + AsyncWrite + Unpin,
105 U: InboundConnectionUpgrade<Negotiated<C>, Output = (PeerId, D), Error = E>,
106 U: OutboundConnectionUpgrade<Negotiated<C>, Output = (PeerId, D), Error = E> + Clone,
107 E: Error + 'static,
108 {
109 let version = self.version;
110 Authenticated(Builder::new(
111 self.inner.and_then(move |conn, endpoint| Authenticate {
112 inner: upgrade::apply(conn, upgrade, endpoint, version),
113 }),
114 version,
115 ))
116 }
117}
118
119#[pin_project::pin_project]
124pub struct Authenticate<C, U>
125where
126 C: AsyncRead + AsyncWrite + Unpin,
127 U: InboundConnectionUpgrade<Negotiated<C>> + OutboundConnectionUpgrade<Negotiated<C>>,
128{
129 #[pin]
130 inner: EitherUpgrade<C, U>,
131}
132
133impl<C, U> Future for Authenticate<C, U>
134where
135 C: AsyncRead + AsyncWrite + Unpin,
136 U: InboundConnectionUpgrade<Negotiated<C>>
137 + OutboundConnectionUpgrade<
138 Negotiated<C>,
139 Output = <U as InboundConnectionUpgrade<Negotiated<C>>>::Output,
140 Error = <U as InboundConnectionUpgrade<Negotiated<C>>>::Error,
141 >,
142{
143 type Output = <EitherUpgrade<C, U> as Future>::Output;
144
145 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
146 let this = self.project();
147 Future::poll(this.inner, cx)
148 }
149}
150
151#[pin_project::pin_project]
156pub struct Multiplex<C, U>
157where
158 C: AsyncRead + AsyncWrite + Unpin,
159 U: InboundConnectionUpgrade<Negotiated<C>> + OutboundConnectionUpgrade<Negotiated<C>>,
160{
161 peer_id: Option<PeerId>,
162 #[pin]
163 upgrade: EitherUpgrade<C, U>,
164}
165
166impl<C, U, M, E> Future for Multiplex<C, U>
167where
168 C: AsyncRead + AsyncWrite + Unpin,
169 U: InboundConnectionUpgrade<Negotiated<C>, Output = M, Error = E>,
170 U: OutboundConnectionUpgrade<Negotiated<C>, Output = M, Error = E>,
171{
172 type Output = Result<(PeerId, M), UpgradeError<E>>;
173
174 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
175 let this = self.project();
176 let m = match ready!(Future::poll(this.upgrade, cx)) {
177 Ok(m) => m,
178 Err(err) => return Poll::Ready(Err(err)),
179 };
180 let i = this
181 .peer_id
182 .take()
183 .expect("Multiplex future polled after completion.");
184 Poll::Ready(Ok((i, m)))
185 }
186}
187
188#[derive(Clone)]
190pub struct Authenticated<T>(Builder<T>);
191
192impl<T> Authenticated<T>
193where
194 T: Transport,
195 T::Error: 'static,
196{
197 pub fn apply<C, D, U, E>(self, upgrade: U) -> Authenticated<Upgrade<T, U>>
208 where
209 T: Transport<Output = (PeerId, C)>,
210 C: AsyncRead + AsyncWrite + Unpin,
211 D: AsyncRead + AsyncWrite + Unpin,
212 U: InboundConnectionUpgrade<Negotiated<C>, Output = D, Error = E>,
213 U: OutboundConnectionUpgrade<Negotiated<C>, Output = D, Error = E> + Clone,
214 E: Error + 'static,
215 {
216 Authenticated(Builder::new(
217 Upgrade::new(self.0.inner, upgrade),
218 self.0.version,
219 ))
220 }
221
222 pub fn multiplex<C, M, U, E>(
233 self,
234 upgrade: U,
235 ) -> Multiplexed<AndThen<T, impl FnOnce((PeerId, C), ConnectedPoint) -> Multiplex<C, U> + Clone>>
236 where
237 T: Transport<Output = (PeerId, C)>,
238 C: AsyncRead + AsyncWrite + Unpin,
239 M: StreamMuxer,
240 U: InboundConnectionUpgrade<Negotiated<C>, Output = M, Error = E>,
241 U: OutboundConnectionUpgrade<Negotiated<C>, Output = M, Error = E> + Clone,
242 E: Error + 'static,
243 {
244 let version = self.0.version;
245 Multiplexed(self.0.inner.and_then(move |(i, c), endpoint| {
246 let upgrade = upgrade::apply(c, upgrade, endpoint, version);
247 Multiplex {
248 peer_id: Some(i),
249 upgrade,
250 }
251 }))
252 }
253
254 pub fn multiplex_ext<C, M, U, E, F>(
266 self,
267 up: F,
268 ) -> Multiplexed<AndThen<T, impl FnOnce((PeerId, C), ConnectedPoint) -> Multiplex<C, U> + Clone>>
269 where
270 T: Transport<Output = (PeerId, C)>,
271 C: AsyncRead + AsyncWrite + Unpin,
272 M: StreamMuxer,
273 U: InboundConnectionUpgrade<Negotiated<C>, Output = M, Error = E>,
274 U: OutboundConnectionUpgrade<Negotiated<C>, Output = M, Error = E> + Clone,
275 E: Error + 'static,
276 F: for<'a> FnOnce(&'a PeerId, &'a ConnectedPoint) -> U + Clone,
277 {
278 let version = self.0.version;
279 Multiplexed(self.0.inner.and_then(move |(peer_id, c), endpoint| {
280 let upgrade = upgrade::apply(c, up(&peer_id, &endpoint), endpoint, version);
281 Multiplex {
282 peer_id: Some(peer_id),
283 upgrade,
284 }
285 }))
286 }
287}
288
289#[derive(Clone)]
292#[pin_project::pin_project]
293pub struct Multiplexed<T>(#[pin] T);
294
295impl<T> Multiplexed<T> {
296 pub fn boxed<M>(self) -> super::Boxed<(PeerId, StreamMuxerBox)>
299 where
300 T: Transport<Output = (PeerId, M)> + Sized + Send + Unpin + 'static,
301 T::Dial: Send + 'static,
302 T::ListenerUpgrade: Send + 'static,
303 T::Error: Send + Sync,
304 M: StreamMuxer + Send + 'static,
305 M::Substream: Send + 'static,
306 M::Error: Send + Sync + 'static,
307 {
308 boxed(self.map(|(i, m), _| (i, StreamMuxerBox::new(m))))
309 }
310
311 pub fn timeout(self, timeout: Duration) -> Multiplexed<TransportTimeout<T>> {
314 Multiplexed(TransportTimeout::new(self.0, timeout))
315 }
316
317 pub fn outbound_timeout(self, timeout: Duration) -> Multiplexed<TransportTimeout<T>> {
320 Multiplexed(TransportTimeout::with_outgoing_timeout(self.0, timeout))
321 }
322
323 pub fn inbound_timeout(self, timeout: Duration) -> Multiplexed<TransportTimeout<T>> {
326 Multiplexed(TransportTimeout::with_ingoing_timeout(self.0, timeout))
327 }
328}
329
330impl<T> Transport for Multiplexed<T>
331where
332 T: Transport,
333{
334 type Output = T::Output;
335 type Error = T::Error;
336 type ListenerUpgrade = T::ListenerUpgrade;
337 type Dial = T::Dial;
338
339 fn dial(
340 &mut self,
341 addr: Multiaddr,
342 opts: DialOpts,
343 ) -> Result<Self::Dial, TransportError<Self::Error>> {
344 self.0.dial(addr, opts)
345 }
346
347 fn remove_listener(&mut self, id: ListenerId) -> bool {
348 self.0.remove_listener(id)
349 }
350
351 fn listen_on(
352 &mut self,
353 id: ListenerId,
354 addr: Multiaddr,
355 ) -> Result<(), TransportError<Self::Error>> {
356 self.0.listen_on(id, addr)
357 }
358
359 fn poll(
360 self: Pin<&mut Self>,
361 cx: &mut Context<'_>,
362 ) -> Poll<TransportEvent<Self::ListenerUpgrade, Self::Error>> {
363 self.project().0.poll(cx)
364 }
365}
366
367type EitherUpgrade<C, U> = future::Either<InboundUpgradeApply<C, U>, OutboundUpgradeApply<C, U>>;
369
370#[derive(Debug, Copy, Clone)]
374#[pin_project::pin_project]
375pub struct Upgrade<T, U> {
376 #[pin]
377 inner: T,
378 upgrade: U,
379}
380
381impl<T, U> Upgrade<T, U> {
382 pub fn new(inner: T, upgrade: U) -> Self {
383 Upgrade { inner, upgrade }
384 }
385}
386
387impl<T, C, D, U, E> Transport for Upgrade<T, U>
388where
389 T: Transport<Output = (PeerId, C)>,
390 T::Error: 'static,
391 C: AsyncRead + AsyncWrite + Unpin,
392 U: InboundConnectionUpgrade<Negotiated<C>, Output = D, Error = E>,
393 U: OutboundConnectionUpgrade<Negotiated<C>, Output = D, Error = E> + Clone,
394 E: Error + 'static,
395{
396 type Output = (PeerId, D);
397 type Error = TransportUpgradeError<T::Error, E>;
398 type ListenerUpgrade = ListenerUpgradeFuture<T::ListenerUpgrade, U, C>;
399 type Dial = DialUpgradeFuture<T::Dial, U, C>;
400
401 fn dial(
402 &mut self,
403 addr: Multiaddr,
404 opts: DialOpts,
405 ) -> Result<Self::Dial, TransportError<Self::Error>> {
406 let future = self
407 .inner
408 .dial(addr, opts)
409 .map_err(|err| err.map(TransportUpgradeError::Transport))?;
410 Ok(DialUpgradeFuture {
411 future: Box::pin(future),
412 upgrade: future::Either::Left(Some(self.upgrade.clone())),
413 })
414 }
415
416 fn remove_listener(&mut self, id: ListenerId) -> bool {
417 self.inner.remove_listener(id)
418 }
419
420 fn listen_on(
421 &mut self,
422 id: ListenerId,
423 addr: Multiaddr,
424 ) -> Result<(), TransportError<Self::Error>> {
425 self.inner
426 .listen_on(id, addr)
427 .map_err(|err| err.map(TransportUpgradeError::Transport))
428 }
429
430 fn poll(
431 self: Pin<&mut Self>,
432 cx: &mut Context<'_>,
433 ) -> Poll<TransportEvent<Self::ListenerUpgrade, Self::Error>> {
434 let this = self.project();
435 let upgrade = this.upgrade.clone();
436 this.inner.poll(cx).map(|event| {
437 event
438 .map_upgrade(move |future| ListenerUpgradeFuture {
439 future: Box::pin(future),
440 upgrade: future::Either::Left(Some(upgrade)),
441 })
442 .map_err(TransportUpgradeError::Transport)
443 })
444 }
445}
446
447#[derive(Debug)]
449pub enum TransportUpgradeError<T, U> {
450 Transport(T),
452 Upgrade(UpgradeError<U>),
454}
455
456impl<T, U> fmt::Display for TransportUpgradeError<T, U>
457where
458 T: fmt::Display,
459 U: fmt::Display,
460{
461 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
462 match self {
463 TransportUpgradeError::Transport(e) => write!(f, "Transport error: {e}"),
464 TransportUpgradeError::Upgrade(e) => write!(f, "Upgrade error: {e}"),
465 }
466 }
467}
468
469impl<T, U> Error for TransportUpgradeError<T, U>
470where
471 T: Error + 'static,
472 U: Error + 'static,
473{
474 fn source(&self) -> Option<&(dyn Error + 'static)> {
475 match self {
476 TransportUpgradeError::Transport(e) => Some(e),
477 TransportUpgradeError::Upgrade(e) => Some(e),
478 }
479 }
480}
481
482pub struct DialUpgradeFuture<F, U, C>
484where
485 U: OutboundConnectionUpgrade<Negotiated<C>>,
486 C: AsyncRead + AsyncWrite + Unpin,
487{
488 future: Pin<Box<F>>,
489 upgrade: future::Either<Option<U>, (PeerId, OutboundUpgradeApply<C, U>)>,
490}
491
492impl<F, U, C, D> Future for DialUpgradeFuture<F, U, C>
493where
494 F: TryFuture<Ok = (PeerId, C)>,
495 C: AsyncRead + AsyncWrite + Unpin,
496 U: OutboundConnectionUpgrade<Negotiated<C>, Output = D>,
497 U::Error: Error,
498{
499 type Output = Result<(PeerId, D), TransportUpgradeError<F::Error, U::Error>>;
500
501 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
502 let this = &mut *self;
505
506 loop {
507 this.upgrade = match this.upgrade {
508 future::Either::Left(ref mut up) => {
509 let (i, c) = match ready!(TryFuture::try_poll(this.future.as_mut(), cx)
510 .map_err(TransportUpgradeError::Transport))
511 {
512 Ok(v) => v,
513 Err(err) => return Poll::Ready(Err(err)),
514 };
515 let u = up
516 .take()
517 .expect("DialUpgradeFuture is constructed with Either::Left(Some).");
518 future::Either::Right((i, apply_outbound(c, u, upgrade::Version::V1)))
519 }
520 future::Either::Right((i, ref mut up)) => {
521 let d = match ready!(
522 Future::poll(Pin::new(up), cx).map_err(TransportUpgradeError::Upgrade)
523 ) {
524 Ok(d) => d,
525 Err(err) => return Poll::Ready(Err(err)),
526 };
527 return Poll::Ready(Ok((i, d)));
528 }
529 }
530 }
531 }
532}
533
534impl<F, U, C> Unpin for DialUpgradeFuture<F, U, C>
535where
536 U: OutboundConnectionUpgrade<Negotiated<C>>,
537 C: AsyncRead + AsyncWrite + Unpin,
538{
539}
540
541pub struct ListenerUpgradeFuture<F, U, C>
543where
544 C: AsyncRead + AsyncWrite + Unpin,
545 U: InboundConnectionUpgrade<Negotiated<C>>,
546{
547 future: Pin<Box<F>>,
548 upgrade: future::Either<Option<U>, (PeerId, InboundUpgradeApply<C, U>)>,
549}
550
551impl<F, U, C, D> Future for ListenerUpgradeFuture<F, U, C>
552where
553 F: TryFuture<Ok = (PeerId, C)>,
554 C: AsyncRead + AsyncWrite + Unpin,
555 U: InboundConnectionUpgrade<Negotiated<C>, Output = D>,
556 U::Error: Error,
557{
558 type Output = Result<(PeerId, D), TransportUpgradeError<F::Error, U::Error>>;
559
560 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
561 let this = &mut *self;
564
565 loop {
566 this.upgrade = match this.upgrade {
567 future::Either::Left(ref mut up) => {
568 let (i, c) = match ready!(TryFuture::try_poll(this.future.as_mut(), cx)
569 .map_err(TransportUpgradeError::Transport))
570 {
571 Ok(v) => v,
572 Err(err) => return Poll::Ready(Err(err)),
573 };
574 let u = up
575 .take()
576 .expect("ListenerUpgradeFuture is constructed with Either::Left(Some).");
577 future::Either::Right((i, apply_inbound(c, u)))
578 }
579 future::Either::Right((i, ref mut up)) => {
580 let d = match ready!(TryFuture::try_poll(Pin::new(up), cx)
581 .map_err(TransportUpgradeError::Upgrade))
582 {
583 Ok(v) => v,
584 Err(err) => return Poll::Ready(Err(err)),
585 };
586 return Poll::Ready(Ok((i, d)));
587 }
588 }
589 }
590 }
591}
592
593impl<F, U, C> Unpin for ListenerUpgradeFuture<F, U, C>
594where
595 C: AsyncRead + AsyncWrite + Unpin,
596 U: InboundConnectionUpgrade<Negotiated<C>>,
597{
598}