libp2p_webrtc_utils/stream/drop_listener.rs
1// Copyright 2022 Parity Technologies (UK) Ltd.
2//
3// Permission is hereby granted, free of charge, to any person obtaining a
4// copy of this software and associated documentation files (the "Software"),
5// to deal in the Software without restriction, including without limitation
6// the rights to use, copy, modify, merge, publish, distribute, sublicense,
7// and/or sell copies of the Software, and to permit persons to whom the
8// Software is furnished to do so, subject to the following conditions:
9//
10// The above copyright notice and this permission notice shall be included in
11// all copies or substantial portions of the Software.
12//
13// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
14// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
18// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
19// DEALINGS IN THE SOFTWARE.
20
21use std::{
22 future::Future,
23 io,
24 pin::Pin,
25 task::{Context, Poll},
26};
27
28use futures::{
29 channel::{oneshot, oneshot::Canceled},
30 AsyncRead, AsyncWrite, FutureExt, SinkExt,
31};
32
33use crate::{
34 proto::{Flag, Message},
35 stream::framed_dc::FramedDc,
36};
37
38#[must_use]
39pub struct DropListener<T> {
40 state: State<T>,
41}
42
43impl<T> DropListener<T> {
44 pub fn new(stream: FramedDc<T>, receiver: oneshot::Receiver<GracefullyClosed>) -> Self {
45 Self {
46 state: State::Idle { stream, receiver },
47 }
48 }
49}
50
51enum State<T> {
52 /// The [`DropListener`] is idle and waiting to be activated.
53 Idle {
54 stream: FramedDc<T>,
55 receiver: oneshot::Receiver<GracefullyClosed>,
56 },
57 /// The stream got dropped and we are sending a reset flag.
58 SendingReset {
59 stream: FramedDc<T>,
60 },
61 Flushing {
62 stream: FramedDc<T>,
63 },
64 /// Bad state transition.
65 Poisoned,
66}
67
68impl<T> Future for DropListener<T>
69where
70 T: AsyncRead + AsyncWrite + Unpin,
71{
72 type Output = io::Result<()>;
73
74 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
75 let state = &mut self.get_mut().state;
76
77 loop {
78 match std::mem::replace(state, State::Poisoned) {
79 State::Idle {
80 stream,
81 mut receiver,
82 } => match receiver.poll_unpin(cx) {
83 Poll::Ready(Ok(GracefullyClosed {})) => {
84 return Poll::Ready(Ok(()));
85 }
86 Poll::Ready(Err(Canceled)) => {
87 tracing::info!("Stream dropped without graceful close, sending Reset");
88 *state = State::SendingReset { stream };
89 continue;
90 }
91 Poll::Pending => {
92 *state = State::Idle { stream, receiver };
93 return Poll::Pending;
94 }
95 },
96 State::SendingReset { mut stream } => match stream.poll_ready_unpin(cx)? {
97 Poll::Ready(()) => {
98 stream.start_send_unpin(Message {
99 flag: Some(Flag::RESET),
100 message: None,
101 })?;
102 *state = State::Flushing { stream };
103 continue;
104 }
105 Poll::Pending => {
106 *state = State::SendingReset { stream };
107 return Poll::Pending;
108 }
109 },
110 State::Flushing { mut stream } => match stream.poll_flush_unpin(cx)? {
111 Poll::Ready(()) => return Poll::Ready(Ok(())),
112 Poll::Pending => {
113 *state = State::Flushing { stream };
114 return Poll::Pending;
115 }
116 },
117 State::Poisoned => {
118 unreachable!()
119 }
120 }
121 }
122 }
123}
124
/// Marker value signalling that our stream was closed gracefully.
///
/// It carries no data; receiving it is the whole message.
pub struct GracefullyClosed {}