libp2p_webrtc_utils/stream/
drop_listener.rs

1// Copyright 2022 Parity Technologies (UK) Ltd.
2//
3// Permission is hereby granted, free of charge, to any person obtaining a
4// copy of this software and associated documentation files (the "Software"),
5// to deal in the Software without restriction, including without limitation
6// the rights to use, copy, modify, merge, publish, distribute, sublicense,
7// and/or sell copies of the Software, and to permit persons to whom the
8// Software is furnished to do so, subject to the following conditions:
9//
10// The above copyright notice and this permission notice shall be included in
11// all copies or substantial portions of the Software.
12//
13// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
14// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
18// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
19// DEALINGS IN THE SOFTWARE.
20
21use std::{
22    future::Future,
23    io,
24    pin::Pin,
25    task::{Context, Poll},
26};
27
28use futures::{
29    channel::{oneshot, oneshot::Canceled},
30    AsyncRead, AsyncWrite, FutureExt, SinkExt,
31};
32
33use crate::{
34    proto::{Flag, Message},
35    stream::framed_dc::FramedDc,
36};
37
38#[must_use]
39pub struct DropListener<T> {
40    state: State<T>,
41}
42
43impl<T> DropListener<T> {
44    pub fn new(stream: FramedDc<T>, receiver: oneshot::Receiver<GracefullyClosed>) -> Self {
45        Self {
46            state: State::Idle { stream, receiver },
47        }
48    }
49}
50
51enum State<T> {
52    /// The [`DropListener`] is idle and waiting to be activated.
53    Idle {
54        stream: FramedDc<T>,
55        receiver: oneshot::Receiver<GracefullyClosed>,
56    },
57    /// The stream got dropped and we are sending a reset flag.
58    SendingReset {
59        stream: FramedDc<T>,
60    },
61    Flushing {
62        stream: FramedDc<T>,
63    },
64    /// Bad state transition.
65    Poisoned,
66}
67
68impl<T> Future for DropListener<T>
69where
70    T: AsyncRead + AsyncWrite + Unpin,
71{
72    type Output = io::Result<()>;
73
74    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
75        let state = &mut self.get_mut().state;
76
77        loop {
78            match std::mem::replace(state, State::Poisoned) {
79                State::Idle {
80                    stream,
81                    mut receiver,
82                } => match receiver.poll_unpin(cx) {
83                    Poll::Ready(Ok(GracefullyClosed {})) => {
84                        return Poll::Ready(Ok(()));
85                    }
86                    Poll::Ready(Err(Canceled)) => {
87                        tracing::info!("Stream dropped without graceful close, sending Reset");
88                        *state = State::SendingReset { stream };
89                        continue;
90                    }
91                    Poll::Pending => {
92                        *state = State::Idle { stream, receiver };
93                        return Poll::Pending;
94                    }
95                },
96                State::SendingReset { mut stream } => match stream.poll_ready_unpin(cx)? {
97                    Poll::Ready(()) => {
98                        stream.start_send_unpin(Message {
99                            flag: Some(Flag::RESET),
100                            message: None,
101                        })?;
102                        *state = State::Flushing { stream };
103                        continue;
104                    }
105                    Poll::Pending => {
106                        *state = State::SendingReset { stream };
107                        return Poll::Pending;
108                    }
109                },
110                State::Flushing { mut stream } => match stream.poll_flush_unpin(cx)? {
111                    Poll::Ready(()) => return Poll::Ready(Ok(())),
112                    Poll::Pending => {
113                        *state = State::Flushing { stream };
114                        return Poll::Pending;
115                    }
116                },
117                State::Poisoned => {
118                    unreachable!()
119                }
120            }
121        }
122    }
123}
124
/// Marker message indicating that our stream got gracefully closed.
///
/// It carries no data; receiving it is the whole signal.
#[derive(Debug)]
pub struct GracefullyClosed {}