libp2p_quic/
connection.rs

1// Copyright 2020 Parity Technologies (UK) Ltd.
2//
3// Permission is hereby granted, free of charge, to any person obtaining a
4// copy of this software and associated documentation files (the "Software"),
5// to deal in the Software without restriction, including without limitation
6// the rights to use, copy, modify, merge, publish, distribute, sublicense,
7// and/or sell copies of the Software, and to permit persons to whom the
8// Software is furnished to do so, subject to the following conditions:
9//
10// The above copyright notice and this permission notice shall be included in
11// all copies or substantial portions of the Software.
12//
13// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
14// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
18// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
19// DEALINGS IN THE SOFTWARE.
20
21mod connecting;
22mod stream;
23
24use std::{
25    pin::Pin,
26    task::{Context, Poll},
27};
28
29pub use connecting::Connecting;
30use futures::{future::BoxFuture, FutureExt};
31use libp2p_core::muxing::{StreamMuxer, StreamMuxerEvent};
32pub use stream::Stream;
33
34use crate::{ConnectionError, Error};
35
36/// State for a single opened QUIC connection.
37pub struct Connection {
38    /// Underlying connection.
39    connection: quinn::Connection,
40    /// Future for accepting a new incoming bidirectional stream.
41    incoming: Option<
42        BoxFuture<'static, Result<(quinn::SendStream, quinn::RecvStream), quinn::ConnectionError>>,
43    >,
44    /// Future for opening a new outgoing bidirectional stream.
45    outgoing: Option<
46        BoxFuture<'static, Result<(quinn::SendStream, quinn::RecvStream), quinn::ConnectionError>>,
47    >,
48    /// Future to wait for the connection to be closed.
49    closing: Option<BoxFuture<'static, quinn::ConnectionError>>,
50}
51
52impl Connection {
53    /// Build a [`Connection`] from raw components.
54    ///
55    /// This function assumes that the [`quinn::Connection`] is completely fresh and none of
56    /// its methods has ever been called. Failure to comply might lead to logic errors and panics.
57    fn new(connection: quinn::Connection) -> Self {
58        Self {
59            connection,
60            incoming: None,
61            outgoing: None,
62            closing: None,
63        }
64    }
65}
66
67impl StreamMuxer for Connection {
68    type Substream = Stream;
69    type Error = Error;
70
71    fn poll_inbound(
72        self: Pin<&mut Self>,
73        cx: &mut Context<'_>,
74    ) -> Poll<Result<Self::Substream, Self::Error>> {
75        let this = self.get_mut();
76
77        let incoming = this.incoming.get_or_insert_with(|| {
78            let connection = this.connection.clone();
79            async move { connection.accept_bi().await }.boxed()
80        });
81
82        let (send, recv) = futures::ready!(incoming.poll_unpin(cx)).map_err(ConnectionError)?;
83        this.incoming.take();
84        let stream = Stream::new(send, recv);
85        Poll::Ready(Ok(stream))
86    }
87
88    fn poll_outbound(
89        self: Pin<&mut Self>,
90        cx: &mut Context<'_>,
91    ) -> Poll<Result<Self::Substream, Self::Error>> {
92        let this = self.get_mut();
93
94        let outgoing = this.outgoing.get_or_insert_with(|| {
95            let connection = this.connection.clone();
96            async move { connection.open_bi().await }.boxed()
97        });
98
99        let (send, recv) = futures::ready!(outgoing.poll_unpin(cx)).map_err(ConnectionError)?;
100        this.outgoing.take();
101        let stream = Stream::new(send, recv);
102        Poll::Ready(Ok(stream))
103    }
104
105    fn poll(
106        self: Pin<&mut Self>,
107        _cx: &mut Context<'_>,
108    ) -> Poll<Result<StreamMuxerEvent, Self::Error>> {
109        // TODO: If connection migration is enabled (currently disabled) address
110        // change on the connection needs to be handled.
111        Poll::Pending
112    }
113
114    fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
115        let this = self.get_mut();
116
117        let closing = this.closing.get_or_insert_with(|| {
118            this.connection.close(From::from(0u32), &[]);
119            let connection = this.connection.clone();
120            async move { connection.closed().await }.boxed()
121        });
122
123        match futures::ready!(closing.poll_unpin(cx)) {
124            // Expected error given that `connection.close` was called above.
125            quinn::ConnectionError::LocallyClosed => {}
126            error => return Poll::Ready(Err(Error::Connection(ConnectionError(error)))),
127        };
128
129        Poll::Ready(Ok(()))
130    }
131}