libp2p_mplex/
io.rs

1// Copyright 2018 Parity Technologies (UK) Ltd.
2//
3// Permission is hereby granted, free of charge, to any person obtaining a
4// copy of this software and associated documentation files (the "Software"),
5// to deal in the Software without restriction, including without limitation
6// the rights to use, copy, modify, merge, publish, distribute, sublicense,
7// and/or sell copies of the Software, and to permit persons to whom the
8// Software is furnished to do so, subject to the following conditions:
9//
10// The above copyright notice and this permission notice shall be included in
11// all copies or substantial portions of the Software.
12//
13// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
14// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
18// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
19// DEALINGS IN THE SOFTWARE.
20
21pub(crate) use std::io::{Error, Result};
22use std::{
23    cmp,
24    collections::VecDeque,
25    fmt, io, mem,
26    sync::Arc,
27    task::{Context, Poll, Waker},
28};
29
30use asynchronous_codec::Framed;
31use bytes::Bytes;
32use futures::{
33    prelude::*,
34    ready,
35    stream::Fuse,
36    task::{waker_ref, ArcWake, AtomicWaker, WakerRef},
37};
38use nohash_hasher::{IntMap, IntSet};
39use parking_lot::Mutex;
40use smallvec::SmallVec;
41
42use crate::{
43    codec::{Codec, Frame, LocalStreamId, RemoteStreamId},
44    Config, MaxBufferBehaviour,
45};
46/// A connection identifier.
47///
48/// Randomly generated and mainly intended to improve log output
49/// by scoping substream IDs to a connection.
50#[derive(Clone, Copy)]
51struct ConnectionId(u64);
52
53impl fmt::Debug for ConnectionId {
54    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
55        write!(f, "{:16x}", self.0)
56    }
57}
58
59impl fmt::Display for ConnectionId {
60    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
61        write!(f, "{:16x}", self.0)
62    }
63}
64/// A multiplexed I/O stream.
65pub(crate) struct Multiplexed<C> {
66    /// A unique ID for the multiplexed stream (i.e. connection).
67    id: ConnectionId,
68    /// The current operating status of the multiplex stream.
69    status: Status,
70    /// The underlying multiplexed I/O stream.
71    io: Fuse<Framed<C, Codec>>,
72    /// The configuration.
73    config: Config,
74    /// The buffer of new inbound substreams that have not yet
75    /// been drained by `poll_next_stream`. This buffer is
76    /// effectively bounded by `max_substreams - substreams.len()`.
77    open_buffer: VecDeque<LocalStreamId>,
78    /// Whether a flush is pending due to one or more new outbound
79    /// `Open` frames, before reading frames can proceed.
80    pending_flush_open: IntSet<LocalStreamId>,
81    /// The stream that currently blocks reading for all streams
82    /// due to a full buffer, if any. Only applicable for use
83    /// with [`MaxBufferBehaviour::Block`].
84    blocking_stream: Option<LocalStreamId>,
85    /// Pending frames to send at the next opportunity.
86    ///
87    /// An opportunity for sending pending frames is every flush
88    /// or read operation. In the former case, sending of all
89    /// pending frames must complete before the flush can complete.
90    /// In the latter case, the read operation can proceed even
91    /// if some or all of the pending frames cannot be sent.
92    pending_frames: VecDeque<Frame<LocalStreamId>>,
93    /// The managed substreams.
94    substreams: IntMap<LocalStreamId, SubstreamState>,
95    /// The ID for the next outbound substream.
96    next_outbound_stream_id: LocalStreamId,
97    /// Registry of wakers for pending tasks interested in reading.
98    notifier_read: Arc<NotifierRead>,
99    /// Registry of wakers for pending tasks interested in writing.
100    notifier_write: Arc<NotifierWrite>,
101    /// Registry of wakers for pending tasks interested in opening
102    /// an outbound substream, when the configured limit is reached.
103    ///
104    /// As soon as the number of substreams drops below this limit,
105    /// these tasks are woken.
106    notifier_open: NotifierOpen,
107}
108
109/// The operation status of a `Multiplexed` I/O stream.
110#[derive(Debug)]
111enum Status {
112    /// The stream is considered open and healthy.
113    Open,
114    /// The stream has been actively closed.
115    Closed,
116    /// The stream has encountered a fatal error.
117    Err(io::Error),
118}
119
120impl<C> Multiplexed<C>
121where
122    C: AsyncRead + AsyncWrite + Unpin,
123{
124    /// Creates a new multiplexed I/O stream.
125    pub(crate) fn new(io: C, config: Config) -> Self {
126        let id = ConnectionId(rand::random());
127        tracing::debug!(connection=%id, "New multiplexed connection");
128        Multiplexed {
129            id,
130            config,
131            status: Status::Open,
132            io: Framed::new(io, Codec::new()).fuse(),
133            open_buffer: Default::default(),
134            substreams: Default::default(),
135            pending_flush_open: Default::default(),
136            pending_frames: Default::default(),
137            blocking_stream: None,
138            next_outbound_stream_id: LocalStreamId::dialer(0),
139            notifier_read: Arc::new(NotifierRead {
140                read_stream: Mutex::new(Default::default()),
141                next_stream: AtomicWaker::new(),
142            }),
143            notifier_write: Arc::new(NotifierWrite {
144                pending: Mutex::new(Default::default()),
145            }),
146            notifier_open: NotifierOpen {
147                pending: Default::default(),
148            },
149        }
150    }
151
152    /// Flushes the underlying I/O stream.
153    pub(crate) fn poll_flush(&mut self, cx: &Context<'_>) -> Poll<io::Result<()>> {
154        match &self.status {
155            Status::Closed => return Poll::Ready(Ok(())),
156            Status::Err(e) => return Poll::Ready(Err(io::Error::new(e.kind(), e.to_string()))),
157            Status::Open => {}
158        }
159
160        // Send any pending frames.
161        ready!(self.send_pending_frames(cx))?;
162
163        // Flush the underlying I/O stream.
164        let waker = NotifierWrite::register(&self.notifier_write, cx.waker());
165        match ready!(self.io.poll_flush_unpin(&mut Context::from_waker(&waker))) {
166            Err(e) => Poll::Ready(self.on_error(e)),
167            Ok(()) => {
168                self.pending_flush_open = Default::default();
169                Poll::Ready(Ok(()))
170            }
171        }
172    }
173
174    /// Closes the underlying I/O stream.
175    ///
176    /// > **Note**: No `Close` or `Reset` frames are sent on open substreams
177    /// > before closing the underlying connection. However, the connection
178    /// > close implies a flush of any frames already sent.
179    pub(crate) fn poll_close(&mut self, cx: &Context<'_>) -> Poll<io::Result<()>> {
180        match &self.status {
181            Status::Closed => return Poll::Ready(Ok(())),
182            Status::Err(e) => return Poll::Ready(Err(io::Error::new(e.kind(), e.to_string()))),
183            Status::Open => {}
184        }
185
186        // Note: We do not make the effort to send pending `Reset` frames
187        // here, we only close (and thus flush) the underlying I/O stream.
188
189        let waker = NotifierWrite::register(&self.notifier_write, cx.waker());
190        match self.io.poll_close_unpin(&mut Context::from_waker(&waker)) {
191            Poll::Pending => Poll::Pending,
192            Poll::Ready(Err(e)) => Poll::Ready(self.on_error(e)),
193            Poll::Ready(Ok(())) => {
194                self.pending_frames = VecDeque::new();
195                // We do not support read-after-close on the underlying
196                // I/O stream, hence clearing the buffer and substreams.
197                self.open_buffer = Default::default();
198                self.substreams = Default::default();
199                self.status = Status::Closed;
200                Poll::Ready(Ok(()))
201            }
202        }
203    }
204
205    /// Waits for a new inbound substream, returning the corresponding `LocalStreamId`.
206    ///
207    /// If the number of already used substreams (i.e. substreams that have not
208    /// yet been dropped via `drop_substream`) reaches the configured
209    /// `max_substreams`, any further inbound substreams are immediately reset
210    /// until existing substreams are dropped.
211    ///
212    /// Data frames read for existing substreams in the context of this
213    /// method call are buffered and tasks interested in reading from
214    /// these substreams are woken. If a substream buffer is full and
215    /// [`MaxBufferBehaviour::Block`] is used, this method is blocked
216    /// (i.e. `Pending`) on some task reading from the substream whose
217    /// buffer is full.
218    pub(crate) fn poll_next_stream(&mut self, cx: &Context<'_>) -> Poll<io::Result<LocalStreamId>> {
219        self.guard_open()?;
220
221        // Try to read from the buffer first.
222        if let Some(stream_id) = self.open_buffer.pop_back() {
223            return Poll::Ready(Ok(stream_id));
224        }
225
226        debug_assert!(self.open_buffer.is_empty());
227        let mut num_buffered = 0;
228
229        loop {
230            // Whenever we may have completely filled a substream
231            // buffer while waiting for the next inbound stream,
232            // yield to give the current task a chance to read
233            // from the respective substreams.
234            if num_buffered == self.config.max_buffer_len {
235                cx.waker().wake_by_ref();
236                return Poll::Pending;
237            }
238
239            // Wait for the next inbound `Open` frame.
240            match ready!(self.poll_read_frame(cx, None))? {
241                Frame::Open { stream_id } => {
242                    if let Some(id) = self.on_open(stream_id)? {
243                        return Poll::Ready(Ok(id));
244                    }
245                }
246                Frame::Data { stream_id, data } => {
247                    self.buffer(stream_id.into_local(), data)?;
248                    num_buffered += 1;
249                }
250                Frame::Close { stream_id } => {
251                    self.on_close(stream_id.into_local());
252                }
253                Frame::Reset { stream_id } => self.on_reset(stream_id.into_local()),
254            }
255        }
256    }
257
258    /// Creates a new (outbound) substream, returning the allocated stream ID.
259    pub(crate) fn poll_open_stream(&mut self, cx: &Context<'_>) -> Poll<io::Result<LocalStreamId>> {
260        self.guard_open()?;
261
262        // Check the stream limits.
263        if self.substreams.len() >= self.config.max_substreams {
264            tracing::debug!(
265                connection=%self.id,
266                total_substreams=%self.substreams.len(),
267                max_substreams=%self.config.max_substreams,
268                "Maximum number of substreams reached"
269            );
270            self.notifier_open.register(cx.waker());
271            return Poll::Pending;
272        }
273
274        // Send the `Open` frame.
275        let waker = NotifierWrite::register(&self.notifier_write, cx.waker());
276        match ready!(self.io.poll_ready_unpin(&mut Context::from_waker(&waker))) {
277            Ok(()) => {
278                let stream_id = self.next_outbound_stream_id();
279                let frame = Frame::Open { stream_id };
280                match self.io.start_send_unpin(frame) {
281                    Ok(()) => {
282                        self.substreams.insert(
283                            stream_id,
284                            SubstreamState::Open {
285                                buf: Default::default(),
286                            },
287                        );
288                        tracing::debug!(
289                            connection=%self.id,
290                            substream=%stream_id,
291                            total_substreams=%self.substreams.len(),
292                            "New outbound substream"
293                        );
294                        // The flush is delayed and the `Open` frame may be sent
295                        // together with other frames in the same transport packet.
296                        self.pending_flush_open.insert(stream_id);
297                        Poll::Ready(Ok(stream_id))
298                    }
299                    Err(e) => Poll::Ready(self.on_error(e)),
300                }
301            }
302            Err(e) => Poll::Ready(self.on_error(e)),
303        }
304    }
305
306    /// Immediately drops a substream.
307    ///
308    /// All locally allocated resources for the dropped substream
309    /// are freed and the substream becomes unavailable for both
310    /// reading and writing immediately. The remote is informed
311    /// based on the current state of the substream:
312    ///
313    /// * If the substream was open, a `Reset` frame is sent at the next opportunity.
314    /// * If the substream was half-closed, i.e. a `Close` frame has already been sent, nothing
315    ///   further happens.
316    /// * If the substream was half-closed by the remote, i.e. a `Close` frame has already been
317    ///   received, a `Close` frame is sent at the next opportunity.
318    ///
319    /// If the multiplexed stream is closed or encountered
320    /// an error earlier, or there is no known substream with
321    /// the given ID, this is a no-op.
322    ///
323    /// > **Note**: All substreams obtained via `poll_next_stream`
324    /// > or `poll_open_stream` must eventually be "dropped" by
325    /// > calling this method when they are no longer used.
326    pub(crate) fn drop_stream(&mut self, id: LocalStreamId) {
327        // Check if the underlying stream is ok.
328        match self.status {
329            Status::Closed | Status::Err(_) => return,
330            Status::Open => {}
331        }
332
333        // If there is still a task waker interested in reading from that
334        // stream, wake it to avoid leaving it dangling and notice that
335        // the stream is gone. In contrast, wakers for write operations
336        // are all woken on every new write opportunity.
337        self.notifier_read.wake_read_stream(id);
338
339        // Remove the substream, scheduling pending frames as necessary.
340        match self.substreams.remove(&id) {
341            None => {}
342            Some(state) => {
343                // If we fell below the substream limit, notify tasks that had
344                // interest in opening an outbound substream earlier.
345                let below_limit = self.substreams.len() == self.config.max_substreams - 1;
346                if below_limit {
347                    self.notifier_open.wake_all();
348                }
349                // Schedule any pending final frames to send, if necessary.
350                match state {
351                    SubstreamState::Closed { .. } => {}
352                    SubstreamState::SendClosed { .. } => {}
353                    SubstreamState::Reset { .. } => {}
354                    SubstreamState::RecvClosed { .. } => {
355                        if self.check_max_pending_frames().is_err() {
356                            return;
357                        }
358                        tracing::trace!(
359                            connection=%self.id,
360                            substream=%id,
361                            "Pending close for substream"
362                        );
363                        self.pending_frames
364                            .push_front(Frame::Close { stream_id: id });
365                    }
366                    SubstreamState::Open { .. } => {
367                        if self.check_max_pending_frames().is_err() {
368                            return;
369                        }
370                        tracing::trace!(
371                            connection=%self.id,
372                            substream=%id,
373                            "Pending reset for substream"
374                        );
375                        self.pending_frames
376                            .push_front(Frame::Reset { stream_id: id });
377                    }
378                }
379            }
380        }
381    }
382
383    /// Writes data to a substream.
384    pub(crate) fn poll_write_stream(
385        &mut self,
386        cx: &Context<'_>,
387        id: LocalStreamId,
388        buf: &[u8],
389    ) -> Poll<io::Result<usize>> {
390        self.guard_open()?;
391
392        // Check if the stream is open for writing.
393        match self.substreams.get(&id) {
394            None | Some(SubstreamState::Reset { .. }) => {
395                return Poll::Ready(Err(io::ErrorKind::BrokenPipe.into()))
396            }
397            Some(SubstreamState::SendClosed { .. }) | Some(SubstreamState::Closed { .. }) => {
398                return Poll::Ready(Err(io::ErrorKind::WriteZero.into()))
399            }
400            Some(SubstreamState::Open { .. }) | Some(SubstreamState::RecvClosed { .. }) => {
401                // Substream is writeable. Continue.
402            }
403        }
404
405        // Determine the size of the frame to send.
406        let frame_len = cmp::min(buf.len(), self.config.split_send_size);
407
408        // Send the data frame.
409        ready!(self.poll_send_frame(cx, || {
410            let data = Bytes::copy_from_slice(&buf[..frame_len]);
411            Frame::Data {
412                stream_id: id,
413                data,
414            }
415        }))?;
416
417        Poll::Ready(Ok(frame_len))
418    }
419
420    /// Reads data from a substream.
421    ///
422    /// Data frames read for substreams other than `id` in the context
423    /// of this method call are buffered and tasks interested in reading
424    /// from these substreams are woken. If a substream buffer is full
425    /// and [`MaxBufferBehaviour::Block`] is used, reading the next data
426    /// frame for `id` is blocked on some task reading from the blocking
427    /// stream's full buffer first.
428    ///
429    /// New inbound substreams (i.e. `Open` frames) read in the context of
430    /// this method call are buffered up to the configured `max_substreams`
431    /// and under consideration of the number of already used substreams,
432    /// thereby waking the task that last called `poll_next_stream`, if any.
433    /// Inbound substreams received in excess of that limit are immediately reset.
434    pub(crate) fn poll_read_stream(
435        &mut self,
436        cx: &Context<'_>,
437        id: LocalStreamId,
438    ) -> Poll<io::Result<Option<Bytes>>> {
439        self.guard_open()?;
440
441        // Try to read from the buffer first.
442        if let Some(state) = self.substreams.get_mut(&id) {
443            let buf = state.recv_buf();
444            if !buf.is_empty() {
445                if self.blocking_stream == Some(id) {
446                    // Unblock reading new frames.
447                    self.blocking_stream = None;
448                    ArcWake::wake_by_ref(&self.notifier_read);
449                }
450                let data = buf.remove(0);
451                return Poll::Ready(Ok(Some(data)));
452            }
453            // If the stream buffer "spilled" onto the heap, free that memory.
454            buf.shrink_to_fit();
455        }
456
457        let mut num_buffered = 0;
458
459        loop {
460            // Whenever we may have completely filled a substream
461            // buffer of another substream while waiting for the
462            // next frame for `id`, yield to give the current task
463            // a chance to read from the other substream(s).
464            if num_buffered == self.config.max_buffer_len {
465                cx.waker().wake_by_ref();
466                return Poll::Pending;
467            }
468
469            // Check if the targeted substream (if any) reached EOF.
470            if !self.can_read(&id) {
471                // Note: Contrary to what is recommended by the spec, we must
472                // return "EOF" also when the stream has been reset by the
473                // remote, as the `StreamMuxer::read_substream` contract only
474                // permits errors on "terminal" conditions, e.g. if the connection
475                // has been closed or on protocol misbehaviour.
476                return Poll::Ready(Ok(None));
477            }
478
479            // Read the next frame.
480            match ready!(self.poll_read_frame(cx, Some(id)))? {
481                Frame::Data { data, stream_id } if stream_id.into_local() == id => {
482                    return Poll::Ready(Ok(Some(data)))
483                }
484                Frame::Data { stream_id, data } => {
485                    // The data frame is for a different stream than the one
486                    // currently being polled, so it needs to be buffered and
487                    // the interested tasks notified.
488                    self.buffer(stream_id.into_local(), data)?;
489                    num_buffered += 1;
490                }
491                frame @ Frame::Open { .. } => {
492                    if let Some(id) = self.on_open(frame.remote_id())? {
493                        self.open_buffer.push_front(id);
494                        tracing::trace!(
495                            connection=%self.id,
496                            inbound_stream=%id,
497                            inbound_buffer_len=%self.open_buffer.len(),
498                            "Buffered new inbound stream"
499                        );
500                        self.notifier_read.wake_next_stream();
501                    }
502                }
503                Frame::Close { stream_id } => {
504                    let stream_id = stream_id.into_local();
505                    self.on_close(stream_id);
506                    if id == stream_id {
507                        return Poll::Ready(Ok(None));
508                    }
509                }
510                Frame::Reset { stream_id } => {
511                    let stream_id = stream_id.into_local();
512                    self.on_reset(stream_id);
513                    if id == stream_id {
514                        return Poll::Ready(Ok(None));
515                    }
516                }
517            }
518        }
519    }
520
521    /// Flushes a substream.
522    ///
523    /// > **Note**: This is equivalent to `poll_flush()`, i.e. to flushing
524    /// > all substreams, except that this operation returns an error if
525    /// > the underlying I/O stream is already closed.
526    pub(crate) fn poll_flush_stream(
527        &mut self,
528        cx: &Context<'_>,
529        id: LocalStreamId,
530    ) -> Poll<io::Result<()>> {
531        self.guard_open()?;
532
533        ready!(self.poll_flush(cx))?;
534        tracing::trace!(
535            connection=%self.id,
536            substream=%id,
537            "Flushed substream"
538        );
539
540        Poll::Ready(Ok(()))
541    }
542
543    /// Closes a stream for writing.
544    ///
545    /// > **Note**: As opposed to `poll_close()`, a flush it not implied.
546    pub(crate) fn poll_close_stream(
547        &mut self,
548        cx: &Context<'_>,
549        id: LocalStreamId,
550    ) -> Poll<io::Result<()>> {
551        self.guard_open()?;
552
553        match self.substreams.remove(&id) {
554            None => Poll::Ready(Ok(())),
555            Some(SubstreamState::SendClosed { buf }) => {
556                self.substreams
557                    .insert(id, SubstreamState::SendClosed { buf });
558                Poll::Ready(Ok(()))
559            }
560            Some(SubstreamState::Closed { buf }) => {
561                self.substreams.insert(id, SubstreamState::Closed { buf });
562                Poll::Ready(Ok(()))
563            }
564            Some(SubstreamState::Reset { buf }) => {
565                self.substreams.insert(id, SubstreamState::Reset { buf });
566                Poll::Ready(Ok(()))
567            }
568            Some(SubstreamState::Open { buf }) => {
569                if self
570                    .poll_send_frame(cx, || Frame::Close { stream_id: id })?
571                    .is_pending()
572                {
573                    self.substreams.insert(id, SubstreamState::Open { buf });
574                    Poll::Pending
575                } else {
576                    tracing::debug!(
577                        connection=%self.id,
578                        substream=%id,
579                        "Closed substream (half-close)"
580                    );
581                    self.substreams
582                        .insert(id, SubstreamState::SendClosed { buf });
583                    Poll::Ready(Ok(()))
584                }
585            }
586            Some(SubstreamState::RecvClosed { buf }) => {
587                if self
588                    .poll_send_frame(cx, || Frame::Close { stream_id: id })?
589                    .is_pending()
590                {
591                    self.substreams
592                        .insert(id, SubstreamState::RecvClosed { buf });
593                    Poll::Pending
594                } else {
595                    tracing::debug!(
596                        connection=%self.id,
597                        substream=%id,
598                        "Closed substream"
599                    );
600                    self.substreams.insert(id, SubstreamState::Closed { buf });
601                    Poll::Ready(Ok(()))
602                }
603            }
604        }
605    }
606
607    /// Sends a (lazily constructed) mplex frame on the underlying I/O stream.
608    ///
609    /// The frame is only constructed if the underlying sink is ready to
610    /// send another frame.
611    fn poll_send_frame<F>(&mut self, cx: &Context<'_>, frame: F) -> Poll<io::Result<()>>
612    where
613        F: FnOnce() -> Frame<LocalStreamId>,
614    {
615        let waker = NotifierWrite::register(&self.notifier_write, cx.waker());
616        match ready!(self.io.poll_ready_unpin(&mut Context::from_waker(&waker))) {
617            Ok(()) => {
618                let frame = frame();
619                tracing::trace!(connection=%self.id, ?frame, "Sending frame");
620                match self.io.start_send_unpin(frame) {
621                    Ok(()) => Poll::Ready(Ok(())),
622                    Err(e) => Poll::Ready(self.on_error(e)),
623                }
624            }
625            Err(e) => Poll::Ready(self.on_error(e)),
626        }
627    }
628
629    /// Reads the next frame from the underlying I/O stream.
630    ///
631    /// The given `stream_id` identifies the substream in which
632    /// the current task is interested and wants to be woken up for,
633    /// in case new frames can be read. `None` means interest in
634    /// frames for any substream.
635    fn poll_read_frame(
636        &mut self,
637        cx: &Context<'_>,
638        stream_id: Option<LocalStreamId>,
639    ) -> Poll<io::Result<Frame<RemoteStreamId>>> {
640        // Try to send pending frames, if there are any, without blocking,
641        if let Poll::Ready(Err(e)) = self.send_pending_frames(cx) {
642            return Poll::Ready(Err(e));
643        }
644
645        // Perform any pending flush before reading.
646        if let Some(id) = &stream_id {
647            if self.pending_flush_open.contains(id) {
648                tracing::trace!(
649                    connection=%self.id,
650                    substream=%id,
651                    "Executing pending flush for substream"
652                );
653                ready!(self.poll_flush(cx))?;
654                self.pending_flush_open = Default::default();
655            }
656        }
657
658        // Check if there is a blocked stream.
659        if let Some(blocked_id) = &self.blocking_stream {
660            // We have a blocked stream and cannot continue reading
661            // new frames until frames are taken from the blocked stream's
662            // buffer.
663
664            // Try to wake a pending reader of the blocked stream.
665            if !self.notifier_read.wake_read_stream(*blocked_id) {
666                // No task dedicated to the blocked stream woken, so schedule
667                // this task again to have a chance at progress.
668                tracing::trace!(
669                    connection=%self.id,
670                    "No task to read from blocked stream. Waking current task."
671                );
672                cx.waker().wake_by_ref();
673            } else if let Some(id) = stream_id {
674                // We woke some other task, but are still interested in
675                // reading `Data` frames from the current stream when unblocked.
676                debug_assert!(
677                    blocked_id != &id,
678                    "Unexpected attempt at reading a new \
679                    frame from a substream with a full buffer."
680                );
681                let _ = NotifierRead::register_read_stream(&self.notifier_read, cx.waker(), id);
682            } else {
683                // We woke some other task but are still interested in
684                // reading new `Open` frames when unblocked.
685                let _ = NotifierRead::register_next_stream(&self.notifier_read, cx.waker());
686            }
687
688            return Poll::Pending;
689        }
690
691        // Try to read another frame from the underlying I/O stream.
692        let waker = match stream_id {
693            Some(id) => NotifierRead::register_read_stream(&self.notifier_read, cx.waker(), id),
694            None => NotifierRead::register_next_stream(&self.notifier_read, cx.waker()),
695        };
696        match ready!(self.io.poll_next_unpin(&mut Context::from_waker(&waker))) {
697            Some(Ok(frame)) => {
698                tracing::trace!(connection=%self.id, ?frame, "Received frame");
699                Poll::Ready(Ok(frame))
700            }
701            Some(Err(e)) => Poll::Ready(self.on_error(e)),
702            None => Poll::Ready(self.on_error(io::ErrorKind::UnexpectedEof.into())),
703        }
704    }
705
706    /// Processes an inbound `Open` frame.
707    fn on_open(&mut self, id: RemoteStreamId) -> io::Result<Option<LocalStreamId>> {
708        let id = id.into_local();
709
710        if self.substreams.contains_key(&id) {
711            tracing::debug!(
712                connection=%self.id,
713                substream=%id,
714                "Received unexpected `Open` frame for open substream",
715            );
716            return self.on_error(io::Error::other(
717                "Protocol error: Received `Open` frame for open substream.",
718            ));
719        }
720
721        if self.substreams.len() >= self.config.max_substreams {
722            tracing::debug!(
723                connection=%self.id,
724                max_substreams=%self.config.max_substreams,
725                "Maximum number of substreams exceeded"
726            );
727            self.check_max_pending_frames()?;
728            tracing::debug!(
729                connection=%self.id,
730                substream=%id,
731                "Pending reset for new substream"
732            );
733            self.pending_frames
734                .push_front(Frame::Reset { stream_id: id });
735            return Ok(None);
736        }
737
738        self.substreams.insert(
739            id,
740            SubstreamState::Open {
741                buf: Default::default(),
742            },
743        );
744
745        tracing::debug!(
746            connection=%self.id,
747            substream=%id,
748            total_substreams=%self.substreams.len(),
749            "New inbound substream"
750        );
751
752        Ok(Some(id))
753    }
754
755    /// Processes an inbound `Reset` frame.
756    fn on_reset(&mut self, id: LocalStreamId) {
757        if let Some(state) = self.substreams.remove(&id) {
758            match state {
759                SubstreamState::Closed { .. } => {
760                    tracing::trace!(
761                        connection=%self.id,
762                        substream=%id,
763                        "Ignoring reset for mutually closed substream"
764                    );
765                }
766                SubstreamState::Reset { .. } => {
767                    tracing::trace!(
768                        connection=%self.id,
769                        substream=%id,
770                        "Ignoring redundant reset for already reset substream"
771                    );
772                }
773                SubstreamState::RecvClosed { buf }
774                | SubstreamState::SendClosed { buf }
775                | SubstreamState::Open { buf } => {
776                    tracing::debug!(
777                        connection=%self.id,
778                        substream=%id,
779                        "Substream reset by remote"
780                    );
781                    self.substreams.insert(id, SubstreamState::Reset { buf });
782                    // Notify tasks interested in reading from that stream,
783                    // so they may read the EOF.
784                    NotifierRead::wake_read_stream(&self.notifier_read, id);
785                }
786            }
787        } else {
788            tracing::trace!(
789                connection=%self.id,
790                substream=%id,
791                "Ignoring `Reset` for unknown substream, possibly dropped earlier"
792            );
793        }
794    }
795
796    /// Processes an inbound `Close` frame.
797    fn on_close(&mut self, id: LocalStreamId) {
798        if let Some(state) = self.substreams.remove(&id) {
799            match state {
800                SubstreamState::RecvClosed { .. } | SubstreamState::Closed { .. } => {
801                    tracing::debug!(
802                        connection=%self.id,
803                        substream=%id,
804                        "Ignoring `Close` frame for closed substream"
805                    );
806                    self.substreams.insert(id, state);
807                }
808                SubstreamState::Reset { buf } => {
809                    tracing::debug!(
810                        connection=%self.id,
811                        substream=%id,
812                        "Ignoring `Close` frame for already reset substream"
813                    );
814                    self.substreams.insert(id, SubstreamState::Reset { buf });
815                }
816                SubstreamState::SendClosed { buf } => {
817                    tracing::debug!(
818                        connection=%self.id,
819                        substream=%id,
820                        "Substream closed by remote (SendClosed -> Closed)"
821                    );
822                    self.substreams.insert(id, SubstreamState::Closed { buf });
823                    // Notify tasks interested in reading, so they may read the EOF.
824                    self.notifier_read.wake_read_stream(id);
825                }
826                SubstreamState::Open { buf } => {
827                    tracing::debug!(
828                        connection=%self.id,
829                        substream=%id,
830                        "Substream closed by remote (Open -> RecvClosed)"
831                    );
832                    self.substreams
833                        .insert(id, SubstreamState::RecvClosed { buf });
834                    // Notify tasks interested in reading, so they may read the EOF.
835                    self.notifier_read.wake_read_stream(id);
836                }
837            }
838        } else {
839            tracing::trace!(
840                connection=%self.id,
841                substream=%id,
842                "Ignoring `Close` for unknown substream, possibly dropped earlier."
843            );
844        }
845    }
846
847    /// Generates the next outbound stream ID.
848    fn next_outbound_stream_id(&mut self) -> LocalStreamId {
849        let id = self.next_outbound_stream_id;
850        self.next_outbound_stream_id = self.next_outbound_stream_id.next();
851        id
852    }
853
854    /// Checks whether a substream is open for reading.
855    fn can_read(&self, id: &LocalStreamId) -> bool {
856        matches!(
857            self.substreams.get(id),
858            Some(SubstreamState::Open { .. }) | Some(SubstreamState::SendClosed { .. })
859        )
860    }
861
862    /// Sends pending frames, without flushing.
863    fn send_pending_frames(&mut self, cx: &Context<'_>) -> Poll<io::Result<()>> {
864        while let Some(frame) = self.pending_frames.pop_back() {
865            if self.poll_send_frame(cx, || frame.clone())?.is_pending() {
866                self.pending_frames.push_back(frame);
867                return Poll::Pending;
868            }
869        }
870
871        Poll::Ready(Ok(()))
872    }
873
874    /// Records a fatal error for the multiplexed I/O stream.
875    fn on_error<T>(&mut self, e: io::Error) -> io::Result<T> {
876        tracing::debug!(
877            connection=%self.id,
878            "Multiplexed connection failed: {:?}",
879            e
880        );
881        self.status = Status::Err(io::Error::new(e.kind(), e.to_string()));
882        self.pending_frames = Default::default();
883        self.substreams = Default::default();
884        self.open_buffer = Default::default();
885        Err(e)
886    }
887
888    /// Checks that the multiplexed stream has status `Ok`,
889    /// i.e. is not closed and did not encounter a fatal error.
890    fn guard_open(&self) -> io::Result<()> {
891        match &self.status {
892            Status::Closed => Err(io::Error::other("Connection is closed")),
893            Status::Err(e) => Err(io::Error::new(e.kind(), e.to_string())),
894            Status::Open => Ok(()),
895        }
896    }
897
898    /// Checks that the permissible limit for pending outgoing frames
899    /// has not been reached.
900    fn check_max_pending_frames(&mut self) -> io::Result<()> {
901        if self.pending_frames.len() >= self.config.max_substreams + EXTRA_PENDING_FRAMES {
902            return self.on_error(io::Error::other("Too many pending frames."));
903        }
904        Ok(())
905    }
906
907    /// Buffers a data frame for a particular substream, if possible.
908    ///
909    /// If the new data frame exceeds the `max_buffer_len` for the buffer
910    /// of the substream, the behaviour depends on the configured
911    /// [`MaxBufferBehaviour`]. Note that the excess frame is still
912    /// buffered in that case (but no further frames will be).
913    ///
914    /// Fails the entire multiplexed stream if too many pending `Reset`
915    /// frames accumulate when using [`MaxBufferBehaviour::ResetStream`].
916    fn buffer(&mut self, id: LocalStreamId, data: Bytes) -> io::Result<()> {
917        let Some(state) = self.substreams.get_mut(&id) else {
918            tracing::trace!(
919                connection=%self.id,
920                substream=%id,
921                data=?data,
922                "Dropping data for unknown substream"
923            );
924            return Ok(());
925        };
926
927        let Some(buf) = state.recv_buf_open() else {
928            tracing::trace!(
929                connection=%self.id,
930                substream=%id,
931                data=?data,
932                "Dropping data for closed or reset substream",
933            );
934            return Ok(());
935        };
936
937        debug_assert!(buf.len() <= self.config.max_buffer_len);
938        tracing::trace!(
939            connection=%self.id,
940            substream=%id,
941            data=?data,
942            data_buffer=%buf.len() + 1,
943            "Buffering data for substream"
944        );
945        buf.push(data);
946        self.notifier_read.wake_read_stream(id);
947        if buf.len() > self.config.max_buffer_len {
948            tracing::debug!(
949                connection=%self.id,
950                substream=%id,
951                "Frame buffer of substream is full"
952            );
953            match self.config.max_buffer_behaviour {
954                MaxBufferBehaviour::ResetStream => {
955                    let buf = buf.clone();
956                    self.check_max_pending_frames()?;
957                    self.substreams.insert(id, SubstreamState::Reset { buf });
958                    tracing::debug!(
959                        connection=%self.id,
960                        substream=%id,
961                        "Pending reset for stream"
962                    );
963                    self.pending_frames
964                        .push_front(Frame::Reset { stream_id: id });
965                }
966                MaxBufferBehaviour::Block => {
967                    self.blocking_stream = Some(id);
968                }
969            }
970        }
971
972        Ok(())
973    }
974}
975
976type RecvBuf = SmallVec<[Bytes; 10]>;
977
978/// The operating states of a substream.
979#[derive(Clone, Debug)]
980enum SubstreamState {
981    /// An `Open` frame has been received or sent.
982    Open { buf: RecvBuf },
983    /// A `Close` frame has been sent, but the stream is still open
984    /// for reading (half-close).
985    SendClosed { buf: RecvBuf },
986    /// A `Close` frame has been received but the stream is still
987    /// open for writing (remote half-close).
988    RecvClosed { buf: RecvBuf },
989    /// A `Close` frame has been sent and received but the stream
990    /// has not yet been dropped and may still have buffered
991    /// frames to read.
992    Closed { buf: RecvBuf },
993    /// The stream has been reset by the local or remote peer but has
994    /// not yet been dropped and may still have buffered frames to read.
995    Reset { buf: RecvBuf },
996}
997
998impl SubstreamState {
999    /// Mutably borrows the substream's receive buffer.
1000    fn recv_buf(&mut self) -> &mut RecvBuf {
1001        match self {
1002            SubstreamState::Open { buf } => buf,
1003            SubstreamState::SendClosed { buf } => buf,
1004            SubstreamState::RecvClosed { buf } => buf,
1005            SubstreamState::Closed { buf } => buf,
1006            SubstreamState::Reset { buf } => buf,
1007        }
1008    }
1009
1010    /// Mutably borrows the substream's receive buffer if the substream
1011    /// is still open for reading, `None` otherwise.
1012    fn recv_buf_open(&mut self) -> Option<&mut RecvBuf> {
1013        match self {
1014            SubstreamState::Open { buf } => Some(buf),
1015            SubstreamState::SendClosed { buf } => Some(buf),
1016            SubstreamState::RecvClosed { .. } => None,
1017            SubstreamState::Closed { .. } => None,
1018            SubstreamState::Reset { .. } => None,
1019        }
1020    }
1021}
1022
1023struct NotifierRead {
1024    /// The waker of the currently pending task that last
1025    /// called `poll_next_stream`, if any.
1026    next_stream: AtomicWaker,
1027    /// The wakers of currently pending tasks that last
1028    /// called `poll_read_stream` for a particular substream.
1029    read_stream: Mutex<IntMap<LocalStreamId, Waker>>,
1030}
1031
1032impl NotifierRead {
1033    /// Registers a task to be woken up when new `Data` frames for a particular
1034    /// stream can be read.
1035    ///
1036    /// The returned waker should be passed to an I/O read operation
1037    /// that schedules a wakeup, if the operation is pending.
1038    #[must_use]
1039    fn register_read_stream<'a>(
1040        self: &'a Arc<Self>,
1041        waker: &Waker,
1042        id: LocalStreamId,
1043    ) -> WakerRef<'a> {
1044        let mut pending = self.read_stream.lock();
1045        pending.insert(id, waker.clone());
1046        waker_ref(self)
1047    }
1048
1049    /// Registers a task to be woken up when new `Open` frames can be read.
1050    ///
1051    /// The returned waker should be passed to an I/O read operation
1052    /// that schedules a wakeup, if the operation is pending.
1053    #[must_use]
1054    fn register_next_stream<'a>(self: &'a Arc<Self>, waker: &Waker) -> WakerRef<'a> {
1055        self.next_stream.register(waker);
1056        waker_ref(self)
1057    }
1058
1059    /// Wakes the task pending on `poll_read_stream` for the
1060    /// specified stream, if any.
1061    fn wake_read_stream(&self, id: LocalStreamId) -> bool {
1062        let mut pending = self.read_stream.lock();
1063
1064        if let Some(waker) = pending.remove(&id) {
1065            waker.wake();
1066            return true;
1067        }
1068
1069        false
1070    }
1071
1072    /// Wakes the task pending on `poll_next_stream`, if any.
1073    fn wake_next_stream(&self) {
1074        self.next_stream.wake();
1075    }
1076}
1077
1078impl ArcWake for NotifierRead {
1079    fn wake_by_ref(this: &Arc<Self>) {
1080        let wakers = mem::take(&mut *this.read_stream.lock());
1081        for (_, waker) in wakers {
1082            waker.wake();
1083        }
1084        this.wake_next_stream();
1085    }
1086}
1087
1088struct NotifierWrite {
1089    /// List of wakers to wake when write operations on the
1090    /// underlying I/O stream can proceed.
1091    pending: Mutex<Vec<Waker>>,
1092}
1093
1094impl NotifierWrite {
1095    /// Registers interest of a task in writing to some substream.
1096    ///
1097    /// The returned waker should be passed to an I/O write operation
1098    /// that schedules a wakeup if the operation is pending.
1099    #[must_use]
1100    fn register<'a>(self: &'a Arc<Self>, waker: &Waker) -> WakerRef<'a> {
1101        let mut pending = self.pending.lock();
1102        if pending.iter().all(|w| !w.will_wake(waker)) {
1103            pending.push(waker.clone());
1104        }
1105        waker_ref(self)
1106    }
1107}
1108
1109impl ArcWake for NotifierWrite {
1110    fn wake_by_ref(this: &Arc<Self>) {
1111        let wakers = mem::take(&mut *this.pending.lock());
1112        for waker in wakers {
1113            waker.wake();
1114        }
1115    }
1116}
1117
1118struct NotifierOpen {
1119    /// Wakers of pending tasks interested in creating new
1120    /// outbound substreams.
1121    pending: Vec<Waker>,
1122}
1123
1124impl NotifierOpen {
1125    /// Registers interest of a task in opening a new outbound substream.
1126    fn register(&mut self, waker: &Waker) {
1127        if self.pending.iter().all(|w| !w.will_wake(waker)) {
1128            self.pending.push(waker.clone());
1129        }
1130    }
1131
1132    fn wake_all(&mut self) {
1133        let wakers = mem::take(&mut self.pending);
1134        for waker in wakers {
1135            waker.wake();
1136        }
1137    }
1138}
1139
1140/// The maximum number of pending reset or close frames to send
1141/// we are willing to buffer beyond the configured substream limit.
1142/// This extra leeway bounds resource usage while allowing some
1143/// back-pressure when sending out these frames.
1144///
1145/// If too many pending frames accumulate, the multiplexed stream is
1146/// considered unhealthy and terminates with an error.
1147const EXTRA_PENDING_FRAMES: usize = 1000;
1148
1149#[cfg(test)]
1150mod tests {
1151    use std::{collections::HashSet, num::NonZeroU8, ops::DerefMut, pin::Pin};
1152
1153    use asynchronous_codec::{Decoder, Encoder};
1154    use bytes::BytesMut;
1155    use quickcheck::*;
1156    use tokio::runtime::Runtime;
1157
1158    use super::*;
1159
1160    impl Arbitrary for MaxBufferBehaviour {
1161        fn arbitrary(g: &mut Gen) -> MaxBufferBehaviour {
1162            *g.choose(&[MaxBufferBehaviour::Block, MaxBufferBehaviour::ResetStream])
1163                .unwrap()
1164        }
1165    }
1166
1167    impl Arbitrary for Config {
1168        fn arbitrary(g: &mut Gen) -> Config {
1169            Config {
1170                max_substreams: g.gen_range(1..100),
1171                max_buffer_len: g.gen_range(1..1000),
1172                max_buffer_behaviour: MaxBufferBehaviour::arbitrary(g),
1173                split_send_size: g.gen_range(1..10000),
1174                protocol_name: crate::config::DEFAULT_MPLEX_PROTOCOL_NAME,
1175            }
1176        }
1177    }
1178
1179    /// Memory-backed "connection".
1180    struct Connection {
1181        /// The buffer that the `Multiplexed` stream reads from.
1182        r_buf: BytesMut,
1183        /// The buffer that the `Multiplexed` stream writes to.
1184        w_buf: BytesMut,
1185        /// Whether the connection should return EOF on the next read.
1186        eof: bool,
1187    }
1188
1189    impl AsyncRead for Connection {
1190        fn poll_read(
1191            mut self: Pin<&mut Self>,
1192            _: &mut Context<'_>,
1193            buf: &mut [u8],
1194        ) -> Poll<io::Result<usize>> {
1195            if self.eof {
1196                return Poll::Ready(Err(io::ErrorKind::UnexpectedEof.into()));
1197            }
1198            let n = std::cmp::min(buf.len(), self.r_buf.len());
1199            let data = self.r_buf.split_to(n);
1200            buf[..n].copy_from_slice(&data[..]);
1201            if n == 0 {
1202                Poll::Pending
1203            } else {
1204                Poll::Ready(Ok(n))
1205            }
1206        }
1207    }
1208
1209    impl AsyncWrite for Connection {
1210        fn poll_write(
1211            mut self: Pin<&mut Self>,
1212            _: &mut Context<'_>,
1213            buf: &[u8],
1214        ) -> Poll<io::Result<usize>> {
1215            self.w_buf.extend_from_slice(buf);
1216            Poll::Ready(Ok(buf.len()))
1217        }
1218
1219        fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> {
1220            Poll::Ready(Ok(()))
1221        }
1222
1223        fn poll_close(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> {
1224            Poll::Ready(Ok(()))
1225        }
1226    }
1227
1228    #[test]
1229    fn max_buffer_behaviour() {
1230        use tracing_subscriber::EnvFilter;
1231        let _ = tracing_subscriber::fmt()
1232            .with_env_filter(EnvFilter::from_default_env())
1233            .try_init();
1234
1235        fn prop(cfg: Config, overflow: NonZeroU8) {
1236            let mut r_buf = BytesMut::new();
1237            let mut codec = Codec::new();
1238
1239            // Open the maximum number of inbound streams.
1240            for i in 0..cfg.max_substreams {
1241                let stream_id = LocalStreamId::dialer(i as u64);
1242                codec.encode(Frame::Open { stream_id }, &mut r_buf).unwrap();
1243            }
1244
1245            // Send more data on stream 0 than the buffer permits.
1246            let stream_id = LocalStreamId::dialer(0);
1247            let data = Bytes::from("Hello world");
1248            for _ in 0..cfg.max_buffer_len + overflow.get() as usize {
1249                codec
1250                    .encode(
1251                        Frame::Data {
1252                            stream_id,
1253                            data: data.clone(),
1254                        },
1255                        &mut r_buf,
1256                    )
1257                    .unwrap();
1258            }
1259
1260            // Setup the multiplexed connection.
1261            let conn = Connection {
1262                r_buf,
1263                w_buf: BytesMut::new(),
1264                eof: false,
1265            };
1266            let mut m = Multiplexed::new(conn, cfg.clone());
1267
1268            let rt = Runtime::new().unwrap();
1269            rt.block_on(future::poll_fn(move |cx| {
1270                // Receive all inbound streams.
1271                for i in 0..cfg.max_substreams {
1272                    match m.poll_next_stream(cx) {
1273                        Poll::Pending => panic!("Expected new inbound stream."),
1274                        Poll::Ready(Err(e)) => panic!("{e:?}"),
1275                        Poll::Ready(Ok(id)) => {
1276                            assert_eq!(id, LocalStreamId::listener(i as u64));
1277                        }
1278                    };
1279                }
1280
1281                // Polling again for an inbound stream should yield `Pending`
1282                // after reading and buffering data frames up to the limit.
1283                let id = LocalStreamId::listener(0);
1284                match m.poll_next_stream(cx) {
1285                    Poll::Ready(r) => panic!("Unexpected result for next stream: {r:?}"),
1286                    Poll::Pending => {
1287                        // We expect the implementation to yield when the buffer
1288                        // is full but before it is exceeded and the max buffer
1289                        // behaviour takes effect, giving the current task a
1290                        // chance to read from the buffer. Here we just read
1291                        // again to provoke the max buffer behaviour.
1292                        assert_eq!(
1293                            m.substreams.get_mut(&id).unwrap().recv_buf().len(),
1294                            cfg.max_buffer_len
1295                        );
1296                        match m.poll_next_stream(cx) {
1297                            Poll::Ready(r) => panic!("Unexpected result for next stream: {r:?}"),
1298                            Poll::Pending => {
1299                                // Expect the buffer for stream 0 to be exceeded, triggering
1300                                // the max. buffer behaviour.
1301                                assert_eq!(
1302                                    m.substreams.get_mut(&id).unwrap().recv_buf().len(),
1303                                    cfg.max_buffer_len + 1
1304                                );
1305                            }
1306                        }
1307                    }
1308                }
1309
1310                // Expect either a `Reset` to be sent or all reads to be
1311                // blocked `Pending`, depending on the `MaxBufferBehaviour`.
1312                match cfg.max_buffer_behaviour {
1313                    MaxBufferBehaviour::ResetStream => {
1314                        let _ = m.poll_flush_stream(cx, id);
1315                        let w_buf = &mut m.io.get_mut().deref_mut().w_buf;
1316                        let frame = codec.decode(w_buf).unwrap();
1317                        let stream_id = stream_id.into_remote();
1318                        assert_eq!(frame, Some(Frame::Reset { stream_id }));
1319                    }
1320                    MaxBufferBehaviour::Block => {
1321                        assert!(m.poll_next_stream(cx).is_pending());
1322                        for i in 1..cfg.max_substreams {
1323                            let id = LocalStreamId::listener(i as u64);
1324                            assert!(m.poll_read_stream(cx, id).is_pending());
1325                        }
1326                    }
1327                }
1328
1329                // Drain the buffer by reading from the stream.
1330                for _ in 0..cfg.max_buffer_len + 1 {
1331                    match m.poll_read_stream(cx, id) {
1332                        Poll::Ready(Ok(Some(bytes))) => {
1333                            assert_eq!(bytes, data);
1334                        }
1335                        x => panic!("Unexpected: {x:?}"),
1336                    }
1337                }
1338
1339                // Read from the stream after the buffer has been drained,
1340                // expecting either EOF or further data, depending on
1341                // the `MaxBufferBehaviour`.
1342                match cfg.max_buffer_behaviour {
1343                    MaxBufferBehaviour::ResetStream => {
1344                        // Expect to read EOF
1345                        match m.poll_read_stream(cx, id) {
1346                            Poll::Ready(Ok(None)) => {}
1347                            poll => panic!("Unexpected: {poll:?}"),
1348                        }
1349                    }
1350                    MaxBufferBehaviour::Block => {
1351                        // Expect to be able to continue reading.
1352                        match m.poll_read_stream(cx, id) {
1353                            Poll::Ready(Ok(Some(bytes))) => assert_eq!(bytes, data),
1354                            Poll::Pending => assert_eq!(overflow.get(), 1),
1355                            poll => panic!("Unexpected: {poll:?}"),
1356                        }
1357                    }
1358                }
1359
1360                Poll::Ready(())
1361            }));
1362        }
1363
1364        quickcheck(prop as fn(_, _))
1365    }
1366
1367    #[test]
1368    fn close_on_error() {
1369        use tracing_subscriber::EnvFilter;
1370        let _ = tracing_subscriber::fmt()
1371            .with_env_filter(EnvFilter::from_default_env())
1372            .try_init();
1373
1374        fn prop(cfg: Config, num_streams: NonZeroU8) {
1375            let num_streams = cmp::min(cfg.max_substreams, num_streams.get() as usize);
1376
1377            // Setup the multiplexed connection.
1378            let conn = Connection {
1379                r_buf: BytesMut::new(),
1380                w_buf: BytesMut::new(),
1381                eof: false,
1382            };
1383            let mut m = Multiplexed::new(conn, cfg);
1384
1385            // Run the test.
1386            let mut opened = HashSet::new();
1387            let rt = Runtime::new().unwrap();
1388            rt.block_on(future::poll_fn(move |cx| {
1389                // Open a number of streams.
1390                for _ in 0..num_streams {
1391                    let id = ready!(m.poll_open_stream(cx)).unwrap();
1392                    assert!(opened.insert(id));
1393                    assert!(m.poll_read_stream(cx, id).is_pending());
1394                }
1395
1396                // Abruptly "close" the connection.
1397                m.io.get_mut().deref_mut().eof = true;
1398
1399                // Reading from any stream should yield an error and all streams
1400                // should be closed due to the failed connection.
1401                assert!(opened.iter().all(|id| match m.poll_read_stream(cx, *id) {
1402                    Poll::Ready(Err(e)) => e.kind() == io::ErrorKind::UnexpectedEof,
1403                    _ => false,
1404                }));
1405
1406                assert!(m.substreams.is_empty());
1407
1408                Poll::Ready(())
1409            }))
1410        }
1411
1412        quickcheck(prop as fn(_, _))
1413    }
1414}