libp2p_quic/
connection.rs1mod connecting;
22mod stream;
23
24use std::{
25 pin::Pin,
26 task::{Context, Poll},
27};
28
29pub use connecting::Connecting;
30use futures::{future::BoxFuture, FutureExt};
31use libp2p_core::muxing::{StreamMuxer, StreamMuxerEvent};
32pub use stream::Stream;
33
34use crate::{ConnectionError, Error};
35
36pub struct Connection {
38 connection: quinn::Connection,
40 incoming: Option<
42 BoxFuture<'static, Result<(quinn::SendStream, quinn::RecvStream), quinn::ConnectionError>>,
43 >,
44 outgoing: Option<
46 BoxFuture<'static, Result<(quinn::SendStream, quinn::RecvStream), quinn::ConnectionError>>,
47 >,
48 closing: Option<BoxFuture<'static, quinn::ConnectionError>>,
50}
51
52impl Connection {
53 fn new(connection: quinn::Connection) -> Self {
58 Self {
59 connection,
60 incoming: None,
61 outgoing: None,
62 closing: None,
63 }
64 }
65}
66
67impl StreamMuxer for Connection {
68 type Substream = Stream;
69 type Error = Error;
70
71 fn poll_inbound(
72 self: Pin<&mut Self>,
73 cx: &mut Context<'_>,
74 ) -> Poll<Result<Self::Substream, Self::Error>> {
75 let this = self.get_mut();
76
77 let incoming = this.incoming.get_or_insert_with(|| {
78 let connection = this.connection.clone();
79 async move { connection.accept_bi().await }.boxed()
80 });
81
82 let (send, recv) = futures::ready!(incoming.poll_unpin(cx)).map_err(ConnectionError)?;
83 this.incoming.take();
84 let stream = Stream::new(send, recv);
85 Poll::Ready(Ok(stream))
86 }
87
88 fn poll_outbound(
89 self: Pin<&mut Self>,
90 cx: &mut Context<'_>,
91 ) -> Poll<Result<Self::Substream, Self::Error>> {
92 let this = self.get_mut();
93
94 let outgoing = this.outgoing.get_or_insert_with(|| {
95 let connection = this.connection.clone();
96 async move { connection.open_bi().await }.boxed()
97 });
98
99 let (send, recv) = futures::ready!(outgoing.poll_unpin(cx)).map_err(ConnectionError)?;
100 this.outgoing.take();
101 let stream = Stream::new(send, recv);
102 Poll::Ready(Ok(stream))
103 }
104
105 fn poll(
106 self: Pin<&mut Self>,
107 _cx: &mut Context<'_>,
108 ) -> Poll<Result<StreamMuxerEvent, Self::Error>> {
109 Poll::Pending
112 }
113
114 fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
115 let this = self.get_mut();
116
117 let closing = this.closing.get_or_insert_with(|| {
118 this.connection.close(From::from(0u32), &[]);
119 let connection = this.connection.clone();
120 async move { connection.closed().await }.boxed()
121 });
122
123 match futures::ready!(closing.poll_unpin(cx)) {
124 quinn::ConnectionError::LocallyClosed => {}
126 error => return Poll::Ready(Err(Error::Connection(ConnectionError(error)))),
127 };
128
129 Poll::Ready(Ok(()))
130 }
131}