libp2p_mplex/io.rs

1// Copyright 2018 Parity Technologies (UK) Ltd.
2//
3// Permission is hereby granted, free of charge, to any person obtaining a
4// copy of this software and associated documentation files (the "Software"),
5// to deal in the Software without restriction, including without limitation
6// the rights to use, copy, modify, merge, publish, distribute, sublicense,
7// and/or sell copies of the Software, and to permit persons to whom the
8// Software is furnished to do so, subject to the following conditions:
9//
10// The above copyright notice and this permission notice shall be included in
11// all copies or substantial portions of the Software.
12//
13// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
14// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
18// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
19// DEALINGS IN THE SOFTWARE.
20
21pub(crate) use std::io::{Error, Result};
22use std::{
23    cmp,
24    collections::VecDeque,
25    fmt, io, mem,
26    sync::Arc,
27    task::{Context, Poll, Waker},
28};
29
30use asynchronous_codec::Framed;
31use bytes::Bytes;
32use futures::{
33    prelude::*,
34    ready,
35    stream::Fuse,
36    task::{waker_ref, ArcWake, AtomicWaker, WakerRef},
37};
38use nohash_hasher::{IntMap, IntSet};
39use parking_lot::Mutex;
40use smallvec::SmallVec;
41
42use crate::{
43    codec::{Codec, Frame, LocalStreamId, RemoteStreamId},
44    Config, MaxBufferBehaviour,
45};
46/// A connection identifier.
47///
48/// Randomly generated and mainly intended to improve log output
49/// by scoping substream IDs to a connection.
50#[derive(Clone, Copy)]
51struct ConnectionId(u64);
52
53impl fmt::Debug for ConnectionId {
54    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
55        write!(f, "{:16x}", self.0)
56    }
57}
58
59impl fmt::Display for ConnectionId {
60    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
61        write!(f, "{:16x}", self.0)
62    }
63}
64/// A multiplexed I/O stream.
65pub(crate) struct Multiplexed<C> {
66    /// A unique ID for the multiplexed stream (i.e. connection).
67    id: ConnectionId,
68    /// The current operating status of the multiplex stream.
69    status: Status,
70    /// The underlying multiplexed I/O stream.
71    io: Fuse<Framed<C, Codec>>,
72    /// The configuration.
73    config: Config,
74    /// The buffer of new inbound substreams that have not yet
75    /// been drained by `poll_next_stream`. This buffer is
76    /// effectively bounded by `max_substreams - substreams.len()`.
77    open_buffer: VecDeque<LocalStreamId>,
78    /// Substreams with a pending flush due to a newly sent outbound `Open`
79    /// frame; reading frames for such a substream first completes that flush.
80    pending_flush_open: IntSet<LocalStreamId>,
81    /// The stream that currently blocks reading for all streams
82    /// due to a full buffer, if any. Only applicable for use
83    /// with [`MaxBufferBehaviour::Block`].
84    blocking_stream: Option<LocalStreamId>,
85    /// Pending frames to send at the next opportunity.
86    ///
87    /// An opportunity for sending pending frames is every flush
88    /// or read operation. In the former case, sending of all
89    /// pending frames must complete before the flush can complete.
90    /// In the latter case, the read operation can proceed even
91    /// if some or all of the pending frames cannot be sent.
92    pending_frames: VecDeque<Frame<LocalStreamId>>,
93    /// The managed substreams.
94    substreams: IntMap<LocalStreamId, SubstreamState>,
95    /// The ID for the next outbound substream.
96    next_outbound_stream_id: LocalStreamId,
97    /// Registry of wakers for pending tasks interested in reading.
98    notifier_read: Arc<NotifierRead>,
99    /// Registry of wakers for pending tasks interested in writing.
100    notifier_write: Arc<NotifierWrite>,
101    /// Registry of wakers for pending tasks interested in opening
102    /// an outbound substream, when the configured limit is reached.
103    ///
104    /// As soon as the number of substreams drops below this limit,
105    /// these tasks are woken.
106    notifier_open: NotifierOpen,
107}
108
109/// The operation status of a `Multiplexed` I/O stream.
110#[derive(Debug)]
111enum Status {
112    /// The stream is considered open and healthy.
113    Open,
114    /// The stream has been actively closed.
115    Closed,
116    /// The stream has encountered a fatal error.
117    Err(io::Error),
118}
119
120impl<C> Multiplexed<C>
121where
122    C: AsyncRead + AsyncWrite + Unpin,
123{
124    /// Creates a new multiplexed I/O stream.
125    pub(crate) fn new(io: C, config: Config) -> Self {
126        let id = ConnectionId(rand::random());
127        tracing::debug!(connection=%id, "New multiplexed connection");
128        Multiplexed {
129            id,
130            config,
131            status: Status::Open,
132            io: Framed::new(io, Codec::new()).fuse(),
133            open_buffer: Default::default(),
134            substreams: Default::default(),
135            pending_flush_open: Default::default(),
136            pending_frames: Default::default(),
137            blocking_stream: None,
138            next_outbound_stream_id: LocalStreamId::dialer(0),
139            notifier_read: Arc::new(NotifierRead {
140                read_stream: Mutex::new(Default::default()),
141                next_stream: AtomicWaker::new(),
142            }),
143            notifier_write: Arc::new(NotifierWrite {
144                pending: Mutex::new(Default::default()),
145            }),
146            notifier_open: NotifierOpen {
147                pending: Default::default(),
148            },
149        }
150    }
151
152    /// Flushes the underlying I/O stream.
153    pub(crate) fn poll_flush(&mut self, cx: &Context<'_>) -> Poll<io::Result<()>> {
154        match &self.status {
155            Status::Closed => return Poll::Ready(Ok(())),
156            Status::Err(e) => return Poll::Ready(Err(io::Error::new(e.kind(), e.to_string()))),
157            Status::Open => {}
158        }
159
160        // Send any pending frames.
161        ready!(self.send_pending_frames(cx))?;
162
163        // Flush the underlying I/O stream.
164        let waker = NotifierWrite::register(&self.notifier_write, cx.waker());
165        match ready!(self.io.poll_flush_unpin(&mut Context::from_waker(&waker))) {
166            Err(e) => Poll::Ready(self.on_error(e)),
167            Ok(()) => {
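                // The flush reached the underlying I/O stream, so all previously
                // sent `Open` frames are now on the wire and no substream requires
                // a pre-read flush any longer; the set can be cleared wholesale.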
168                self.pending_flush_open = Default::default();
169                Poll::Ready(Ok(()))
170            }
171        }
172    }
173
174    /// Closes the underlying I/O stream.
175    ///
176    /// > **Note**: No `Close` or `Reset` frames are sent on open substreams
177    /// > before closing the underlying connection. However, the connection
178    /// > close implies a flush of any frames already sent.
179    pub(crate) fn poll_close(&mut self, cx: &Context<'_>) -> Poll<io::Result<()>> {
180        match &self.status {
181            Status::Closed => return Poll::Ready(Ok(())),
182            Status::Err(e) => return Poll::Ready(Err(io::Error::new(e.kind(), e.to_string()))),
183            Status::Open => {}
184        }
185
186        // Note: We do not make the effort to send pending `Reset` frames
187        // here; we only close (and thus flush) the underlying I/O stream.
188
189        let waker = NotifierWrite::register(&self.notifier_write, cx.waker());
190        match self.io.poll_close_unpin(&mut Context::from_waker(&waker)) {
191            Poll::Pending => Poll::Pending,
192            Poll::Ready(Err(e)) => Poll::Ready(self.on_error(e)),
193            Poll::Ready(Ok(())) => {
194                self.pending_frames = VecDeque::new();
195                // We do not support read-after-close on the underlying
196                // I/O stream, hence clearing the buffer and substreams.
197                self.open_buffer = Default::default();
198                self.substreams = Default::default();
199                self.status = Status::Closed;
200                Poll::Ready(Ok(()))
201            }
202        }
203    }
204
205    /// Waits for a new inbound substream, returning the corresponding `LocalStreamId`.
206    ///
207    /// If the number of already used substreams (i.e. substreams that have not
208    /// yet been dropped via `drop_stream`) reaches the configured
209    /// `max_substreams`, any further inbound substreams are immediately reset
210    /// until existing substreams are dropped.
211    ///
212    /// Data frames read for existing substreams in the context of this
213    /// method call are buffered and tasks interested in reading from
214    /// these substreams are woken. If a substream buffer is full and
215    /// [`MaxBufferBehaviour::Block`] is used, this method is blocked
216    /// (i.e. `Pending`) on some task reading from the substream whose
217    /// buffer is full.
218    pub(crate) fn poll_next_stream(&mut self, cx: &Context<'_>) -> Poll<io::Result<LocalStreamId>> {
219        self.guard_open()?;
220
221        // Try to read from the buffer first.
222        if let Some(stream_id) = self.open_buffer.pop_back() {
223            return Poll::Ready(Ok(stream_id));
224        }
225
226        debug_assert!(self.open_buffer.is_empty());
227        let mut num_buffered = 0;
228
229        loop {
230            // Whenever we may have completely filled a substream
231            // buffer while waiting for the next inbound stream,
232            // yield to give the current task a chance to read
233            // from the respective substreams.
234            if num_buffered == self.config.max_buffer_len {
235                cx.waker().wake_by_ref();
236                return Poll::Pending;
237            }
238
239            // Wait for the next inbound `Open` frame.
240            match ready!(self.poll_read_frame(cx, None))? {
241                Frame::Open { stream_id } => {
242                    if let Some(id) = self.on_open(stream_id)? {
243                        return Poll::Ready(Ok(id));
244                    }
245                }
246                Frame::Data { stream_id, data } => {
247                    self.buffer(stream_id.into_local(), data)?;
248                    num_buffered += 1;
249                }
250                Frame::Close { stream_id } => {
251                    self.on_close(stream_id.into_local());
252                }
253                Frame::Reset { stream_id } => self.on_reset(stream_id.into_local()),
254            }
255        }
256    }
257
258    /// Creates a new (outbound) substream, returning the allocated stream ID.
259    pub(crate) fn poll_open_stream(&mut self, cx: &Context<'_>) -> Poll<io::Result<LocalStreamId>> {
260        self.guard_open()?;
261
262        // Check the stream limits.
263        if self.substreams.len() >= self.config.max_substreams {
264            tracing::debug!(
265                connection=%self.id,
266                total_substreams=%self.substreams.len(),
267                max_substreams=%self.config.max_substreams,
268                "Maximum number of substreams reached"
269            );
270            self.notifier_open.register(cx.waker());
271            return Poll::Pending;
272        }
273
274        // Send the `Open` frame.
275        let waker = NotifierWrite::register(&self.notifier_write, cx.waker());
276        match ready!(self.io.poll_ready_unpin(&mut Context::from_waker(&waker))) {
277            Ok(()) => {
278                let stream_id = self.next_outbound_stream_id();
279                let frame = Frame::Open { stream_id };
280                match self.io.start_send_unpin(frame) {
281                    Ok(()) => {
282                        self.substreams.insert(
283                            stream_id,
284                            SubstreamState::Open {
285                                buf: Default::default(),
286                            },
287                        );
288                        tracing::debug!(
289                            connection=%self.id,
290                            substream=%stream_id,
291                            total_substreams=%self.substreams.len(),
292                            "New outbound substream"
293                        );
294                        // The flush is delayed and the `Open` frame may be sent
295                        // together with other frames in the same transport packet.
296                        self.pending_flush_open.insert(stream_id);
297                        Poll::Ready(Ok(stream_id))
298                    }
299                    Err(e) => Poll::Ready(self.on_error(e)),
300                }
301            }
302            Err(e) => Poll::Ready(self.on_error(e)),
303        }
304    }
305
306    /// Immediately drops a substream.
307    ///
308    /// All locally allocated resources for the dropped substream
309    /// are freed and the substream becomes unavailable for both
310    /// reading and writing immediately. The remote is informed
311    /// based on the current state of the substream:
312    ///
313    /// * If the substream was open, a `Reset` frame is sent at the next opportunity.
314    /// * If the substream was half-closed, i.e. a `Close` frame has already been sent, nothing
315    ///   further happens.
316    /// * If the substream was half-closed by the remote, i.e. a `Close` frame has already been
317    ///   received, a `Close` frame is sent at the next opportunity.
318    ///
319    /// If the multiplexed stream is closed or encountered
320    /// an error earlier, or there is no known substream with
321    /// the given ID, this is a no-op.
322    ///
323    /// > **Note**: All substreams obtained via `poll_next_stream`
324    /// > or `poll_open_stream` must eventually be "dropped" by
325    /// > calling this method when they are no longer used.
326    pub(crate) fn drop_stream(&mut self, id: LocalStreamId) {
327        // Check if the underlying stream is ok.
328        match self.status {
329            Status::Closed | Status::Err(_) => return,
330            Status::Open => {}
331        }
332
333        // If there is still a task waker interested in reading from that
334        // stream, wake it so it is not left dangling and can notice that
335        // the stream is gone. In contrast, wakers for write operations
336        // are all woken on every new write opportunity.
337        self.notifier_read.wake_read_stream(id);
338
339        // Remove the substream, scheduling pending frames as necessary.
340        match self.substreams.remove(&id) {
341            None => {}
342            Some(state) => {
343                // If we fell below the substream limit, notify tasks that had
344                // interest in opening an outbound substream earlier.
345                let below_limit = self.substreams.len() == self.config.max_substreams - 1;
346                if below_limit {
347                    self.notifier_open.wake_all();
348                }
349                // Schedule any pending final frames to send, if necessary.
350                match state {
351                    SubstreamState::Closed { .. } => {}
352                    SubstreamState::SendClosed { .. } => {}
353                    SubstreamState::Reset { .. } => {}
354                    SubstreamState::RecvClosed { .. } => {
355                        if self.check_max_pending_frames().is_err() {
356                            return;
357                        }
358                        tracing::trace!(
359                            connection=%self.id,
360                            substream=%id,
361                            "Pending close for substream"
362                        );
363                        self.pending_frames
364                            .push_front(Frame::Close { stream_id: id });
365                    }
366                    SubstreamState::Open { .. } => {
367                        if self.check_max_pending_frames().is_err() {
368                            return;
369                        }
370                        tracing::trace!(
371                            connection=%self.id,
372                            substream=%id,
373                            "Pending reset for substream"
374                        );
375                        self.pending_frames
376                            .push_front(Frame::Reset { stream_id: id });
377                    }
378                }
379            }
380        }
381    }
382
383    /// Writes data to a substream.
384    pub(crate) fn poll_write_stream(
385        &mut self,
386        cx: &Context<'_>,
387        id: LocalStreamId,
388        buf: &[u8],
389    ) -> Poll<io::Result<usize>> {
390        self.guard_open()?;
391
392        // Check if the stream is open for writing.
393        match self.substreams.get(&id) {
394            None | Some(SubstreamState::Reset { .. }) => {
395                return Poll::Ready(Err(io::ErrorKind::BrokenPipe.into()))
396            }
397            Some(SubstreamState::SendClosed { .. }) | Some(SubstreamState::Closed { .. }) => {
398                return Poll::Ready(Err(io::ErrorKind::WriteZero.into()))
399            }
400            Some(SubstreamState::Open { .. }) | Some(SubstreamState::RecvClosed { .. }) => {
401                // Substream is writeable. Continue.
402            }
403        }
404
405        // Determine the size of the frame to send.
406        let frame_len = cmp::min(buf.len(), self.config.split_send_size);
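        // Only up to `split_send_size` bytes go into a single frame; the number
        // of bytes actually consumed is returned below, so callers following the
        // usual `AsyncWrite` contract submit the remainder in subsequent calls.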
407
408        // Send the data frame.
409        ready!(self.poll_send_frame(cx, || {
410            let data = Bytes::copy_from_slice(&buf[..frame_len]);
411            Frame::Data {
412                stream_id: id,
413                data,
414            }
415        }))?;
416
417        Poll::Ready(Ok(frame_len))
418    }
419
420    /// Reads data from a substream.
421    ///
422    /// Data frames read for substreams other than `id` in the context
423    /// of this method call are buffered and tasks interested in reading
424    /// from these substreams are woken. If a substream buffer is full
425    /// and [`MaxBufferBehaviour::Block`] is used, reading the next data
426    /// frame for `id` is blocked on some task reading from the blocking
427    /// stream's full buffer first.
428    ///
429    /// New inbound substreams (i.e. `Open` frames) read in the context of
430    /// this method call are buffered up to the configured `max_substreams`
431    /// and under consideration of the number of already used substreams,
432    /// thereby waking the task that last called `poll_next_stream`, if any.
433    /// Inbound substreams received in excess of that limit are immediately reset.
434    pub(crate) fn poll_read_stream(
435        &mut self,
436        cx: &Context<'_>,
437        id: LocalStreamId,
438    ) -> Poll<io::Result<Option<Bytes>>> {
439        self.guard_open()?;
440
441        // Try to read from the buffer first.
442        if let Some(state) = self.substreams.get_mut(&id) {
443            let buf = state.recv_buf();
444            if !buf.is_empty() {
445                if self.blocking_stream == Some(id) {
446                    // Unblock reading new frames.
447                    self.blocking_stream = None;
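                    // Waking the read notifier wakes every task that was parked
                    // while reads were blocked on this stream's full buffer,
                    // including the task waiting for the next inbound stream.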
448                    ArcWake::wake_by_ref(&self.notifier_read);
449                }
450                let data = buf.remove(0);
451                return Poll::Ready(Ok(Some(data)));
452            }
453            // If the stream buffer "spilled" onto the heap, free that memory.
454            buf.shrink_to_fit();
455        }
456
457        let mut num_buffered = 0;
458
459        loop {
460            // Whenever we may have completely filled a substream
461            // buffer of another substream while waiting for the
462            // next frame for `id`, yield to give the current task
463            // a chance to read from the other substream(s).
464            if num_buffered == self.config.max_buffer_len {
465                cx.waker().wake_by_ref();
466                return Poll::Pending;
467            }
468
469            // Check if the targeted substream (if any) reached EOF.
470            if !self.can_read(&id) {
471                // Note: Contrary to what is recommended by the spec, we must
472                // return "EOF" also when the stream has been reset by the
473                // remote, as the `StreamMuxer::read_substream` contract only
474                // permits errors on "terminal" conditions, e.g. if the connection
475                // has been closed or on protocol misbehaviour.
476                return Poll::Ready(Ok(None));
477            }
478
479            // Read the next frame.
480            match ready!(self.poll_read_frame(cx, Some(id)))? {
481                Frame::Data { data, stream_id } if stream_id.into_local() == id => {
482                    return Poll::Ready(Ok(Some(data)))
483                }
484                Frame::Data { stream_id, data } => {
485                    // The data frame is for a different stream than the one
486                    // currently being polled, so it needs to be buffered and
487                    // the interested tasks notified.
488                    self.buffer(stream_id.into_local(), data)?;
489                    num_buffered += 1;
490                }
491                frame @ Frame::Open { .. } => {
492                    if let Some(id) = self.on_open(frame.remote_id())? {
493                        self.open_buffer.push_front(id);
494                        tracing::trace!(
495                            connection=%self.id,
496                            inbound_stream=%id,
497                            inbound_buffer_len=%self.open_buffer.len(),
498                            "Buffered new inbound stream"
499                        );
500                        self.notifier_read.wake_next_stream();
501                    }
502                }
503                Frame::Close { stream_id } => {
504                    let stream_id = stream_id.into_local();
505                    self.on_close(stream_id);
506                    if id == stream_id {
507                        return Poll::Ready(Ok(None));
508                    }
509                }
510                Frame::Reset { stream_id } => {
511                    let stream_id = stream_id.into_local();
512                    self.on_reset(stream_id);
513                    if id == stream_id {
514                        return Poll::Ready(Ok(None));
515                    }
516                }
517            }
518        }
519    }
520
521    /// Flushes a substream.
522    ///
523    /// > **Note**: This is equivalent to `poll_flush()`, i.e. to flushing
524    /// > all substreams, except that this operation returns an error if
525    /// > the underlying I/O stream is already closed.
526    pub(crate) fn poll_flush_stream(
527        &mut self,
528        cx: &Context<'_>,
529        id: LocalStreamId,
530    ) -> Poll<io::Result<()>> {
531        self.guard_open()?;
532
533        ready!(self.poll_flush(cx))?;
534        tracing::trace!(
535            connection=%self.id,
536            substream=%id,
537            "Flushed substream"
538        );
539
540        Poll::Ready(Ok(()))
541    }
542
543    /// Closes a stream for writing.
544    ///
545    /// > **Note**: As opposed to `poll_close()`, a flush is not implied.
546    pub(crate) fn poll_close_stream(
547        &mut self,
548        cx: &Context<'_>,
549        id: LocalStreamId,
550    ) -> Poll<io::Result<()>> {
551        self.guard_open()?;
552
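        // The state is removed and re-inserted below so that the receive buffer
        // can be moved into the follow-up state without cloning it.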
553        match self.substreams.remove(&id) {
554            None => Poll::Ready(Ok(())),
555            Some(SubstreamState::SendClosed { buf }) => {
556                self.substreams
557                    .insert(id, SubstreamState::SendClosed { buf });
558                Poll::Ready(Ok(()))
559            }
560            Some(SubstreamState::Closed { buf }) => {
561                self.substreams.insert(id, SubstreamState::Closed { buf });
562                Poll::Ready(Ok(()))
563            }
564            Some(SubstreamState::Reset { buf }) => {
565                self.substreams.insert(id, SubstreamState::Reset { buf });
566                Poll::Ready(Ok(()))
567            }
568            Some(SubstreamState::Open { buf }) => {
569                if self
570                    .poll_send_frame(cx, || Frame::Close { stream_id: id })?
571                    .is_pending()
572                {
573                    self.substreams.insert(id, SubstreamState::Open { buf });
574                    Poll::Pending
575                } else {
576                    tracing::debug!(
577                        connection=%self.id,
578                        substream=%id,
579                        "Closed substream (half-close)"
580                    );
581                    self.substreams
582                        .insert(id, SubstreamState::SendClosed { buf });
583                    Poll::Ready(Ok(()))
584                }
585            }
586            Some(SubstreamState::RecvClosed { buf }) => {
587                if self
588                    .poll_send_frame(cx, || Frame::Close { stream_id: id })?
589                    .is_pending()
590                {
591                    self.substreams
592                        .insert(id, SubstreamState::RecvClosed { buf });
593                    Poll::Pending
594                } else {
595                    tracing::debug!(
596                        connection=%self.id,
597                        substream=%id,
598                        "Closed substream"
599                    );
600                    self.substreams.insert(id, SubstreamState::Closed { buf });
601                    Poll::Ready(Ok(()))
602                }
603            }
604        }
605    }
606
607    /// Sends a (lazily constructed) mplex frame on the underlying I/O stream.
608    ///
609    /// The frame is only constructed if the underlying sink is ready to
610    /// send another frame.
611    fn poll_send_frame<F>(&mut self, cx: &Context<'_>, frame: F) -> Poll<io::Result<()>>
612    where
613        F: FnOnce() -> Frame<LocalStreamId>,
614    {
615        let waker = NotifierWrite::register(&self.notifier_write, cx.waker());
616        match ready!(self.io.poll_ready_unpin(&mut Context::from_waker(&waker))) {
617            Ok(()) => {
618                let frame = frame();
619                tracing::trace!(connection=%self.id, ?frame, "Sending frame");
620                match self.io.start_send_unpin(frame) {
621                    Ok(()) => Poll::Ready(Ok(())),
622                    Err(e) => Poll::Ready(self.on_error(e)),
623                }
624            }
625            Err(e) => Poll::Ready(self.on_error(e)),
626        }
627    }
628
629    /// Reads the next frame from the underlying I/O stream.
630    ///
631    /// The given `stream_id` identifies the substream that the current
632    /// task is interested in and wants to be woken up for whenever new
633    /// frames can be read. `None` means interest in frames for any
634    /// substream.
635    fn poll_read_frame(
636        &mut self,
637        cx: &Context<'_>,
638        stream_id: Option<LocalStreamId>,
639    ) -> Poll<io::Result<Frame<RemoteStreamId>>> {
640        // Try to send pending frames, if there are any, without blocking.
641        if let Poll::Ready(Err(e)) = self.send_pending_frames(cx) {
642            return Poll::Ready(Err(e));
643        }
644
645        // Perform any pending flush before reading.
646        if let Some(id) = &stream_id {
647            if self.pending_flush_open.contains(id) {
648                tracing::trace!(
649                    connection=%self.id,
650                    substream=%id,
651                    "Executing pending flush for substream"
652                );
653                ready!(self.poll_flush(cx))?;
654                self.pending_flush_open = Default::default();
655            }
656        }
657
658        // Check if there is a blocked stream.
659        if let Some(blocked_id) = &self.blocking_stream {
660            // We have a blocked stream and cannot continue reading
661            // new frames until frames are taken from the blocked stream's
662            // buffer.
663
664            // Try to wake a pending reader of the blocked stream.
665            if !self.notifier_read.wake_read_stream(*blocked_id) {
666                // No task dedicated to the blocked stream was woken, so schedule
667                // this task again to have a chance at progress.
668                tracing::trace!(
669                    connection=%self.id,
670                    "No task to read from blocked stream. Waking current task."
671                );
672                cx.waker().wake_by_ref();
673            } else if let Some(id) = stream_id {
674                // We woke some other task, but are still interested in
675                // reading `Data` frames from the current stream when unblocked.
676                debug_assert!(
677                    blocked_id != &id,
678                    "Unexpected attempt at reading a new \
679                    frame from a substream with a full buffer."
680                );
681                let _ = NotifierRead::register_read_stream(&self.notifier_read, cx.waker(), id);
682            } else {
683                // We woke some other task but are still interested in
684                // reading new `Open` frames when unblocked.
685                let _ = NotifierRead::register_next_stream(&self.notifier_read, cx.waker());
686            }
687
688            return Poll::Pending;
689        }
690
691        // Try to read another frame from the underlying I/O stream.
692        let waker = match stream_id {
693            Some(id) => NotifierRead::register_read_stream(&self.notifier_read, cx.waker(), id),
694            None => NotifierRead::register_next_stream(&self.notifier_read, cx.waker()),
695        };
696        match ready!(self.io.poll_next_unpin(&mut Context::from_waker(&waker))) {
697            Some(Ok(frame)) => {
698                tracing::trace!(connection=%self.id, ?frame, "Received frame");
699                Poll::Ready(Ok(frame))
700            }
701            Some(Err(e)) => Poll::Ready(self.on_error(e)),
702            None => Poll::Ready(self.on_error(io::ErrorKind::UnexpectedEof.into())),
703        }
704    }
705
706    /// Processes an inbound `Open` frame.
707    fn on_open(&mut self, id: RemoteStreamId) -> io::Result<Option<LocalStreamId>> {
708        let id = id.into_local();
709
710        if self.substreams.contains_key(&id) {
711            tracing::debug!(
712                connection=%self.id,
713                substream=%id,
714                "Received unexpected `Open` frame for open substream",
715            );
716            return self.on_error(io::Error::new(
717                io::ErrorKind::Other,
718                "Protocol error: Received `Open` frame for open substream.",
719            ));
720        }
721
722        if self.substreams.len() >= self.config.max_substreams {
723            tracing::debug!(
724                connection=%self.id,
725                max_substreams=%self.config.max_substreams,
726                "Maximum number of substreams exceeded"
727            );
728            self.check_max_pending_frames()?;
729            tracing::debug!(
730                connection=%self.id,
731                substream=%id,
732                "Pending reset for new substream"
733            );
734            self.pending_frames
735                .push_front(Frame::Reset { stream_id: id });
736            return Ok(None);
737        }
738
739        self.substreams.insert(
740            id,
741            SubstreamState::Open {
742                buf: Default::default(),
743            },
744        );
745
746        tracing::debug!(
747            connection=%self.id,
748            substream=%id,
749            total_substreams=%self.substreams.len(),
750            "New inbound substream"
751        );
752
753        Ok(Some(id))
754    }
755
756    /// Processes an inbound `Reset` frame.
757    fn on_reset(&mut self, id: LocalStreamId) {
758        if let Some(state) = self.substreams.remove(&id) {
759            match state {
760                SubstreamState::Closed { .. } => {
761                    tracing::trace!(
762                        connection=%self.id,
763                        substream=%id,
764                        "Ignoring reset for mutually closed substream"
765                    );
766                }
767                SubstreamState::Reset { .. } => {
768                    tracing::trace!(
769                        connection=%self.id,
770                        substream=%id,
771                        "Ignoring redundant reset for already reset substream"
772                    );
773                }
774                SubstreamState::RecvClosed { buf }
775                | SubstreamState::SendClosed { buf }
776                | SubstreamState::Open { buf } => {
777                    tracing::debug!(
778                        connection=%self.id,
779                        substream=%id,
780                        "Substream reset by remote"
781                    );
782                    self.substreams.insert(id, SubstreamState::Reset { buf });
783                    // Notify tasks interested in reading from that stream,
784                    // so they may read the EOF.
785                    NotifierRead::wake_read_stream(&self.notifier_read, id);
786                }
787            }
788        } else {
789            tracing::trace!(
790                connection=%self.id,
791                substream=%id,
792                "Ignoring `Reset` for unknown substream, possibly dropped earlier"
793            );
794        }
795    }
796
797    /// Processes an inbound `Close` frame.
798    fn on_close(&mut self, id: LocalStreamId) {
799        if let Some(state) = self.substreams.remove(&id) {
800            match state {
801                SubstreamState::RecvClosed { .. } | SubstreamState::Closed { .. } => {
802                    tracing::debug!(
803                        connection=%self.id,
804                        substream=%id,
805                        "Ignoring `Close` frame for closed substream"
806                    );
807                    self.substreams.insert(id, state);
808                }
809                SubstreamState::Reset { buf } => {
810                    tracing::debug!(
811                        connection=%self.id,
812                        substream=%id,
813                        "Ignoring `Close` frame for already reset substream"
814                    );
815                    self.substreams.insert(id, SubstreamState::Reset { buf });
816                }
817                SubstreamState::SendClosed { buf } => {
818                    tracing::debug!(
819                        connection=%self.id,
820                        substream=%id,
821                        "Substream closed by remote (SendClosed -> Closed)"
822                    );
823                    self.substreams.insert(id, SubstreamState::Closed { buf });
824                    // Notify tasks interested in reading, so they may read the EOF.
825                    self.notifier_read.wake_read_stream(id);
826                }
827                SubstreamState::Open { buf } => {
828                    tracing::debug!(
829                        connection=%self.id,
830                        substream=%id,
831                        "Substream closed by remote (Open -> RecvClosed)"
832                    );
833                    self.substreams
834                        .insert(id, SubstreamState::RecvClosed { buf });
835                    // Notify tasks interested in reading, so they may read the EOF.
836                    self.notifier_read.wake_read_stream(id);
837                }
838            }
839        } else {
840            tracing::trace!(
841                connection=%self.id,
842                substream=%id,
843                "Ignoring `Close` for unknown substream, possibly dropped earlier."
844            );
845        }
846    }
847
848    /// Generates the next outbound stream ID.
849    fn next_outbound_stream_id(&mut self) -> LocalStreamId {
850        let id = self.next_outbound_stream_id;
851        self.next_outbound_stream_id = self.next_outbound_stream_id.next();
852        id
853    }
854
855    /// Checks whether a substream is open for reading.
856    fn can_read(&self, id: &LocalStreamId) -> bool {
857        matches!(
858            self.substreams.get(id),
859            Some(SubstreamState::Open { .. }) | Some(SubstreamState::SendClosed { .. })
860        )
861    }
862
863    /// Sends pending frames, without flushing.
864    fn send_pending_frames(&mut self, cx: &Context<'_>) -> Poll<io::Result<()>> {
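        // Frames are scheduled with `push_front` and taken with `pop_back`,
        // i.e. they are sent in the order in which they were queued. A frame
        // that cannot be sent right now is pushed back and retried at the
        // next opportunity.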
865        while let Some(frame) = self.pending_frames.pop_back() {
866            if self.poll_send_frame(cx, || frame.clone())?.is_pending() {
867                self.pending_frames.push_back(frame);
868                return Poll::Pending;
869            }
870        }
871
872        Poll::Ready(Ok(()))
873    }
874
875    /// Records a fatal error for the multiplexed I/O stream.
876    fn on_error<T>(&mut self, e: io::Error) -> io::Result<T> {
877        tracing::debug!(
878            connection=%self.id,
879            "Multiplexed connection failed: {:?}",
880            e
881        );
882        self.status = Status::Err(io::Error::new(e.kind(), e.to_string()));
883        self.pending_frames = Default::default();
884        self.substreams = Default::default();
885        self.open_buffer = Default::default();
886        Err(e)
887    }
888
889    /// Checks that the multiplexed stream has status `Ok`,
890    /// i.e. is not closed and did not encounter a fatal error.
891    fn guard_open(&self) -> io::Result<()> {
892        match &self.status {
893            Status::Closed => Err(io::Error::new(io::ErrorKind::Other, "Connection is closed")),
894            Status::Err(e) => Err(io::Error::new(e.kind(), e.to_string())),
895            Status::Open => Ok(()),
896        }
897    }
898
899    /// Checks that the permissible limit for pending outgoing frames
900    /// has not been reached.
901    fn check_max_pending_frames(&mut self) -> io::Result<()> {
902        if self.pending_frames.len() >= self.config.max_substreams + EXTRA_PENDING_FRAMES {
903            return self.on_error(io::Error::new(
904                io::ErrorKind::Other,
905                "Too many pending frames.",
906            ));
907        }
908        Ok(())
909    }
910
911    /// Buffers a data frame for a particular substream, if possible.
912    ///
913    /// If the new data frame exceeds the `max_buffer_len` for the buffer
914    /// of the substream, the behaviour depends on the configured
915    /// [`MaxBufferBehaviour`]. Note that the excess frame is still
916    /// buffered in that case (but no further frames will be).
917    ///
918    /// Fails the entire multiplexed stream if too many pending `Reset`
919    /// frames accumulate when using [`MaxBufferBehaviour::ResetStream`].
920    fn buffer(&mut self, id: LocalStreamId, data: Bytes) -> io::Result<()> {
921        let Some(state) = self.substreams.get_mut(&id) else {
922            tracing::trace!(
923                connection=%self.id,
924                substream=%id,
925                data=?data,
926                "Dropping data for unknown substream"
927            );
928            return Ok(());
929        };
930
931        let Some(buf) = state.recv_buf_open() else {
932            tracing::trace!(
933                connection=%self.id,
934                substream=%id,
935                data=?data,
936                "Dropping data for closed or reset substream",
937            );
938            return Ok(());
939        };
940
941        debug_assert!(buf.len() <= self.config.max_buffer_len);
942        tracing::trace!(
943            connection=%self.id,
944            substream=%id,
945            data=?data,
946            data_buffer=%buf.len() + 1,
947            "Buffering data for substream"
948        );
949        buf.push(data);
950        self.notifier_read.wake_read_stream(id);
951        if buf.len() > self.config.max_buffer_len {
952            tracing::debug!(
953                connection=%self.id,
954                substream=%id,
955                "Frame buffer of substream is full"
956            );
957            match self.config.max_buffer_behaviour {
958                MaxBufferBehaviour::ResetStream => {
959                    let buf = buf.clone();
960                    self.check_max_pending_frames()?;
961                    self.substreams.insert(id, SubstreamState::Reset { buf });
962                    tracing::debug!(
963                        connection=%self.id,
964                        substream=%id,
965                        "Pending reset for stream"
966                    );
967                    self.pending_frames
968                        .push_front(Frame::Reset { stream_id: id });
969                }
970                MaxBufferBehaviour::Block => {
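                    // Remember the stream whose buffer overflowed; `poll_read_frame`
                    // stops reading new frames from the connection until a reader
                    // drains this buffer and `blocking_stream` is cleared again.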
971                    self.blocking_stream = Some(id);
972                }
973            }
974        }
975
976        Ok(())
977    }
978}
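
// A minimal, hypothetical sketch (not part of this module) of how the
// poll-based API above can be driven from async code, assuming `conn`
// implements `AsyncRead + AsyncWrite + Unpin` and `config` is a `Config`:
//
//     let mut mpx = Multiplexed::new(conn, config);
//     let id = futures::future::poll_fn(|cx| mpx.poll_open_stream(cx)).await?;
//     futures::future::poll_fn(|cx| mpx.poll_write_stream(cx, id, b"ping")).await?;
//     futures::future::poll_fn(|cx| mpx.poll_flush_stream(cx, id)).await?;
//     mpx.drop_stream(id);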
979
980type RecvBuf = SmallVec<[Bytes; 10]>;
981
982/// The operating states of a substream.
983#[derive(Clone, Debug)]
984enum SubstreamState {
985    /// An `Open` frame has been received or sent.
986    Open { buf: RecvBuf },
987    /// A `Close` frame has been sent, but the stream is still open
988    /// for reading (half-close).
989    SendClosed { buf: RecvBuf },
990    /// A `Close` frame has been received but the stream is still
991    /// open for writing (remote half-close).
992    RecvClosed { buf: RecvBuf },
993    /// A `Close` frame has been sent and received but the stream
994    /// has not yet been dropped and may still have buffered
995    /// frames to read.
996    Closed { buf: RecvBuf },
997    /// The stream has been reset by the local or remote peer but has
998    /// not yet been dropped and may still have buffered frames to read.
999    Reset { buf: RecvBuf },
1000}
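
// Typical transitions between these states, as implemented by the frame
// handlers and close/drop methods above:
//
// * `Open` -> `SendClosed` when a `Close` frame is sent locally,
// * `Open` -> `RecvClosed` when a `Close` frame is received from the remote,
// * `SendClosed` / `RecvClosed` -> `Closed` once both sides have sent `Close`,
// * any of the above -> `Reset` when the remote resets the substream or its
//   receive buffer overflows with `MaxBufferBehaviour::ResetStream`.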
1001
1002impl SubstreamState {
1003    /// Mutably borrows the substream's receive buffer.
1004    fn recv_buf(&mut self) -> &mut RecvBuf {
1005        match self {
1006            SubstreamState::Open { buf } => buf,
1007            SubstreamState::SendClosed { buf } => buf,
1008            SubstreamState::RecvClosed { buf } => buf,
1009            SubstreamState::Closed { buf } => buf,
1010            SubstreamState::Reset { buf } => buf,
1011        }
1012    }
1013
1014    /// Mutably borrows the substream's receive buffer if the substream
1015    /// is still open for reading, `None` otherwise.
1016    fn recv_buf_open(&mut self) -> Option<&mut RecvBuf> {
1017        match self {
1018            SubstreamState::Open { buf } => Some(buf),
1019            SubstreamState::SendClosed { buf } => Some(buf),
1020            SubstreamState::RecvClosed { .. } => None,
1021            SubstreamState::Closed { .. } => None,
1022            SubstreamState::Reset { .. } => None,
1023        }
1024    }
1025}
1026
1027struct NotifierRead {
1028    /// The waker of the currently pending task that last
1029    /// called `poll_next_stream`, if any.
1030    next_stream: AtomicWaker,
1031    /// The wakers of currently pending tasks that last
1032    /// called `poll_read_stream` for a particular substream.
1033    read_stream: Mutex<IntMap<LocalStreamId, Waker>>,
1034}
1035
1036impl NotifierRead {
1037    /// Registers a task to be woken up when new `Data` frames for a particular
1038    /// stream can be read.
1039    ///
1040    /// The returned waker should be passed to an I/O read operation
1041    /// that schedules a wakeup, if the operation is pending.
1042    #[must_use]
1043    fn register_read_stream<'a>(
1044        self: &'a Arc<Self>,
1045        waker: &Waker,
1046        id: LocalStreamId,
1047    ) -> WakerRef<'a> {
1048        let mut pending = self.read_stream.lock();
1049        pending.insert(id, waker.clone());
1050        waker_ref(self)
1051    }
1052
1053    /// Registers a task to be woken up when new `Open` frames can be read.
1054    ///
1055    /// The returned waker should be passed to an I/O read operation
1056    /// that schedules a wakeup, if the operation is pending.
1057    #[must_use]
1058    fn register_next_stream<'a>(self: &'a Arc<Self>, waker: &Waker) -> WakerRef<'a> {
1059        self.next_stream.register(waker);
1060        waker_ref(self)
1061    }
1062
1063    /// Wakes the task pending on `poll_read_stream` for the
1064    /// specified stream, if any.
1065    fn wake_read_stream(&self, id: LocalStreamId) -> bool {
1066        let mut pending = self.read_stream.lock();
1067
1068        if let Some(waker) = pending.remove(&id) {
1069            waker.wake();
1070            return true;
1071        }
1072
1073        false
1074    }
1075
1076    /// Wakes the task pending on `poll_next_stream`, if any.
1077    fn wake_next_stream(&self) {
1078        self.next_stream.wake();
1079    }
1080}
1081
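// This notifier is passed as the waker for read operations on the underlying
// I/O stream. Since it is unknown which substream the next readable frame
// belongs to, a wakeup wakes every pending reader as well as the task waiting
// for the next inbound stream.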
1082impl ArcWake for NotifierRead {
1083    fn wake_by_ref(this: &Arc<Self>) {
1084        let wakers = mem::take(&mut *this.read_stream.lock());
1085        for (_, waker) in wakers {
1086            waker.wake();
1087        }
1088        this.wake_next_stream();
1089    }
1090}
1091
1092struct NotifierWrite {
1093    /// List of wakers to wake when write operations on the
1094    /// underlying I/O stream can proceed.
1095    pending: Mutex<Vec<Waker>>,
1096}
1097
1098impl NotifierWrite {
1099    /// Registers interest of a task in writing to some substream.
1100    ///
1101    /// The returned waker should be passed to an I/O write operation
1102    /// that schedules a wakeup if the operation is pending.
1103    #[must_use]
1104    fn register<'a>(self: &'a Arc<Self>, waker: &Waker) -> WakerRef<'a> {
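        // Register the waker only if an equivalent one (per `will_wake`) is not
        // already present, so repeated polls by the same task do not grow the list.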
1105        let mut pending = self.pending.lock();
1106        if pending.iter().all(|w| !w.will_wake(waker)) {
1107            pending.push(waker.clone());
1108        }
1109        waker_ref(self)
1110    }
1111}
1112
1113impl ArcWake for NotifierWrite {
1114    fn wake_by_ref(this: &Arc<Self>) {
1115        let wakers = mem::take(&mut *this.pending.lock());
1116        for waker in wakers {
1117            waker.wake();
1118        }
1119    }
1120}
1121
1122struct NotifierOpen {
1123    /// Wakers of pending tasks interested in creating new
1124    /// outbound substreams.
1125    pending: Vec<Waker>,
1126}
1127
1128impl NotifierOpen {
1129    /// Registers interest of a task in opening a new outbound substream.
1130    fn register(&mut self, waker: &Waker) {
1131        if self.pending.iter().all(|w| !w.will_wake(waker)) {
1132            self.pending.push(waker.clone());
1133        }
1134    }
1135
1136    fn wake_all(&mut self) {
1137        let wakers = mem::take(&mut self.pending);
1138        for waker in wakers {
1139            waker.wake();
1140        }
1141    }
1142}
1143
1144/// The maximum number of pending `Reset` or `Close` frames we are willing
1145/// to buffer beyond the configured substream limit. This extra leeway
1146/// bounds resource usage while allowing some back-pressure when sending
1147/// out these frames.
1148///
1149/// If too many pending frames accumulate, the multiplexed stream is
1150/// considered unhealthy and terminates with an error.
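///
/// For example, with `max_substreams = 128`, `check_max_pending_frames` fails
/// the connection once `128 + 1000 = 1128` frames are pending.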
1151const EXTRA_PENDING_FRAMES: usize = 1000;
1152
1153#[cfg(test)]
1154mod tests {
1155    use std::{collections::HashSet, num::NonZeroU8, ops::DerefMut, pin::Pin};
1156
1157    use async_std::task;
1158    use asynchronous_codec::{Decoder, Encoder};
1159    use bytes::BytesMut;
1160    use quickcheck::*;
1161
1162    use super::*;
1163
1164    impl Arbitrary for MaxBufferBehaviour {
1165        fn arbitrary(g: &mut Gen) -> MaxBufferBehaviour {
1166            *g.choose(&[MaxBufferBehaviour::Block, MaxBufferBehaviour::ResetStream])
1167                .unwrap()
1168        }
1169    }
1170
1171    impl Arbitrary for Config {
1172        fn arbitrary(g: &mut Gen) -> Config {
1173            Config {
1174                max_substreams: g.gen_range(1..100),
1175                max_buffer_len: g.gen_range(1..1000),
1176                max_buffer_behaviour: MaxBufferBehaviour::arbitrary(g),
1177                split_send_size: g.gen_range(1..10000),
1178                protocol_name: crate::config::DEFAULT_MPLEX_PROTOCOL_NAME,
1179            }
1180        }
1181    }
1182
1183    /// Memory-backed "connection".
1184    struct Connection {
1185        /// The buffer that the `Multiplexed` stream reads from.
1186        r_buf: BytesMut,
1187        /// The buffer that the `Multiplexed` stream writes to.
1188        w_buf: BytesMut,
1189        /// Whether the connection should fail with `UnexpectedEof` on the next read.
1190        eof: bool,
1191    }
1192
1193    impl AsyncRead for Connection {
1194        fn poll_read(
1195            mut self: Pin<&mut Self>,
1196            _: &mut Context<'_>,
1197            buf: &mut [u8],
1198        ) -> Poll<io::Result<usize>> {
1199            if self.eof {
1200                return Poll::Ready(Err(io::ErrorKind::UnexpectedEof.into()));
1201            }
1202            let n = std::cmp::min(buf.len(), self.r_buf.len());
1203            let data = self.r_buf.split_to(n);
1204            buf[..n].copy_from_slice(&data[..]);
1205            if n == 0 {
1206                Poll::Pending
1207            } else {
1208                Poll::Ready(Ok(n))
1209            }
1210        }
1211    }
1212
1213    impl AsyncWrite for Connection {
1214        fn poll_write(
1215            mut self: Pin<&mut Self>,
1216            _: &mut Context<'_>,
1217            buf: &[u8],
1218        ) -> Poll<io::Result<usize>> {
1219            self.w_buf.extend_from_slice(buf);
1220            Poll::Ready(Ok(buf.len()))
1221        }
1222
1223        fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> {
1224            Poll::Ready(Ok(()))
1225        }
1226
1227        fn poll_close(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> {
1228            Poll::Ready(Ok(()))
1229        }
1230    }
1231
1232    #[test]
1233    fn max_buffer_behaviour() {
1234        use tracing_subscriber::EnvFilter;
1235        let _ = tracing_subscriber::fmt()
1236            .with_env_filter(EnvFilter::from_default_env())
1237            .try_init();
1238
1239        fn prop(cfg: Config, overflow: NonZeroU8) {
1240            let mut r_buf = BytesMut::new();
1241            let mut codec = Codec::new();
1242
1243            // Open the maximum number of inbound streams.
1244            for i in 0..cfg.max_substreams {
1245                let stream_id = LocalStreamId::dialer(i as u64);
1246                codec.encode(Frame::Open { stream_id }, &mut r_buf).unwrap();
1247            }
1248
1249            // Send more data on stream 0 than the buffer permits.
1250            let stream_id = LocalStreamId::dialer(0);
1251            let data = Bytes::from("Hello world");
1252            for _ in 0..cfg.max_buffer_len + overflow.get() as usize {
1253                codec
1254                    .encode(
1255                        Frame::Data {
1256                            stream_id,
1257                            data: data.clone(),
1258                        },
1259                        &mut r_buf,
1260                    )
1261                    .unwrap();
1262            }
1263
1264            // Setup the multiplexed connection.
1265            let conn = Connection {
1266                r_buf,
1267                w_buf: BytesMut::new(),
1268                eof: false,
1269            };
1270            let mut m = Multiplexed::new(conn, cfg.clone());
1271
1272            task::block_on(future::poll_fn(move |cx| {
1273                // Receive all inbound streams.
1274                for i in 0..cfg.max_substreams {
1275                    match m.poll_next_stream(cx) {
1276                        Poll::Pending => panic!("Expected new inbound stream."),
1277                        Poll::Ready(Err(e)) => panic!("{e:?}"),
1278                        Poll::Ready(Ok(id)) => {
1279                            assert_eq!(id, LocalStreamId::listener(i as u64));
1280                        }
1281                    };
1282                }
1283
1284                // Polling again for an inbound stream should yield `Pending`
1285                // after reading and buffering data frames up to the limit.
1286                let id = LocalStreamId::listener(0);
1287                match m.poll_next_stream(cx) {
1288                    Poll::Ready(r) => panic!("Unexpected result for next stream: {r:?}"),
1289                    Poll::Pending => {
1290                        // We expect the implementation to yield when the buffer
1291                        // is full but before it is exceeded and the max buffer
1292                        // behaviour takes effect, giving the current task a
1293                        // chance to read from the buffer. Here we just read
1294                        // again to provoke the max buffer behaviour.
1295                        assert_eq!(
1296                            m.substreams.get_mut(&id).unwrap().recv_buf().len(),
1297                            cfg.max_buffer_len
1298                        );
1299                        match m.poll_next_stream(cx) {
1300                            Poll::Ready(r) => panic!("Unexpected result for next stream: {r:?}"),
1301                            Poll::Pending => {
1302                                // Expect the buffer for stream 0 to be exceeded, triggering
1303                                // the max. buffer behaviour.
1304                                assert_eq!(
1305                                    m.substreams.get_mut(&id).unwrap().recv_buf().len(),
1306                                    cfg.max_buffer_len + 1
1307                                );
1308                            }
1309                        }
1310                    }
1311                }
1312
1313                // Expect either a `Reset` to be sent or all reads to be
1314                // blocked `Pending`, depending on the `MaxBufferBehaviour`.
1315                match cfg.max_buffer_behaviour {
1316                    MaxBufferBehaviour::ResetStream => {
1317                        let _ = m.poll_flush_stream(cx, id);
1318                        let w_buf = &mut m.io.get_mut().deref_mut().w_buf;
1319                        let frame = codec.decode(w_buf).unwrap();
1320                        let stream_id = stream_id.into_remote();
1321                        assert_eq!(frame, Some(Frame::Reset { stream_id }));
1322                    }
1323                    MaxBufferBehaviour::Block => {
1324                        assert!(m.poll_next_stream(cx).is_pending());
1325                        for i in 1..cfg.max_substreams {
1326                            let id = LocalStreamId::listener(i as u64);
1327                            assert!(m.poll_read_stream(cx, id).is_pending());
1328                        }
1329                    }
1330                }
1331
1332                // Drain the buffer by reading from the stream.
1333                for _ in 0..cfg.max_buffer_len + 1 {
1334                    match m.poll_read_stream(cx, id) {
1335                        Poll::Ready(Ok(Some(bytes))) => {
1336                            assert_eq!(bytes, data);
1337                        }
1338                        x => panic!("Unexpected: {x:?}"),
1339                    }
1340                }
1341
1342                // Read from the stream after the buffer has been drained,
1343                // expecting either EOF or further data, depending on
1344                // the `MaxBufferBehaviour`.
1345                match cfg.max_buffer_behaviour {
1346                    MaxBufferBehaviour::ResetStream => {
1347                        // Expect to read EOF
1348                        match m.poll_read_stream(cx, id) {
1349                            Poll::Ready(Ok(None)) => {}
1350                            poll => panic!("Unexpected: {poll:?}"),
1351                        }
1352                    }
1353                    MaxBufferBehaviour::Block => {
1354                        // Expect to be able to continue reading.
1355                        match m.poll_read_stream(cx, id) {
1356                            Poll::Ready(Ok(Some(bytes))) => assert_eq!(bytes, data),
1357                            Poll::Pending => assert_eq!(overflow.get(), 1),
1358                            poll => panic!("Unexpected: {poll:?}"),
1359                        }
1360                    }
1361                }
1362
1363                Poll::Ready(())
1364            }));
1365        }
1366
1367        quickcheck(prop as fn(_, _))
1368    }
1369
1370    #[test]
1371    fn close_on_error() {
1372        use tracing_subscriber::EnvFilter;
1373        let _ = tracing_subscriber::fmt()
1374            .with_env_filter(EnvFilter::from_default_env())
1375            .try_init();
1376
1377        fn prop(cfg: Config, num_streams: NonZeroU8) {
1378            let num_streams = cmp::min(cfg.max_substreams, num_streams.get() as usize);
1379
1380            // Setup the multiplexed connection.
1381            let conn = Connection {
1382                r_buf: BytesMut::new(),
1383                w_buf: BytesMut::new(),
1384                eof: false,
1385            };
1386            let mut m = Multiplexed::new(conn, cfg);
1387
1388            // Run the test.
1389            let mut opened = HashSet::new();
1390            task::block_on(future::poll_fn(move |cx| {
1391                // Open a number of streams.
1392                for _ in 0..num_streams {
1393                    let id = ready!(m.poll_open_stream(cx)).unwrap();
1394                    assert!(opened.insert(id));
1395                    assert!(m.poll_read_stream(cx, id).is_pending());
1396                }
1397
1398                // Abruptly "close" the connection.
1399                m.io.get_mut().deref_mut().eof = true;
1400
1401                // Reading from any stream should yield an error and all streams
1402                // should be closed due to the failed connection.
1403                assert!(opened.iter().all(|id| match m.poll_read_stream(cx, *id) {
1404                    Poll::Ready(Err(e)) => e.kind() == io::ErrorKind::UnexpectedEof,
1405                    _ => false,
1406                }));
1407
1408                assert!(m.substreams.is_empty());
1409
1410                Poll::Ready(())
1411            }))
1412        }
1413
1414        quickcheck(prop as fn(_, _))
1415    }
1416}