libp2p_gossipsub/
rpc.rs

1// Copyright 2020 Sigma Prime Pty Ltd.
2//
3// Permission is hereby granted, free of charge, to any person obtaining a
4// copy of this software and associated documentation files (the "Software"),
5// to deal in the Software without restriction, including without limitation
6// the rights to use, copy, modify, merge, publish, distribute, sublicense,
7// and/or sell copies of the Software, and to permit persons to whom the
8// Software is furnished to do so, subject to the following conditions:
9//
10// The above copyright notice and this permission notice shall be included in
11// all copies or substantial portions of the Software.
12//
13// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
14// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
18// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
19// DEALINGS IN THE SOFTWARE.
20
21use std::{
22    future::Future,
23    pin::Pin,
24    sync::{
25        atomic::{AtomicUsize, Ordering},
26        Arc,
27    },
28    task::{Context, Poll},
29};
30
31use futures::{stream::Peekable, Stream, StreamExt};
32
33use crate::types::RpcOut;
34
35/// `RpcOut` sender that is priority aware.
36#[derive(Debug)]
37pub(crate) struct Sender {
38    /// Capacity of the priority channel for `Publish` messages.
39    priority_cap: usize,
40    len: Arc<AtomicUsize>,
41    pub(crate) priority_sender: async_channel::Sender<RpcOut>,
42    pub(crate) non_priority_sender: async_channel::Sender<RpcOut>,
43    priority_receiver: async_channel::Receiver<RpcOut>,
44    non_priority_receiver: async_channel::Receiver<RpcOut>,
45}
46
47impl Sender {
48    /// Create a RpcSender.
49    pub(crate) fn new(cap: usize) -> Sender {
50        // We intentionally do not bound the channel, as we still need to send control messages
51        // such as `GRAFT`, `PRUNE`, `SUBSCRIBE`, and `UNSUBSCRIBE`.
52        // That's also why we define `cap` and divide it by two,
53        // to ensure there is capacity for both priority and non_priority messages.
54        let (priority_sender, priority_receiver) = async_channel::unbounded();
55        let (non_priority_sender, non_priority_receiver) = async_channel::bounded(cap / 2);
56        let len = Arc::new(AtomicUsize::new(0));
57        Sender {
58            priority_cap: cap / 2,
59            len,
60            priority_sender,
61            non_priority_sender,
62            priority_receiver,
63            non_priority_receiver,
64        }
65    }
66
67    /// Create a new Receiver to the sender.
68    pub(crate) fn new_receiver(&self) -> Receiver {
69        Receiver {
70            priority_queue_len: self.len.clone(),
71            priority: Box::pin(self.priority_receiver.clone().peekable()),
72            non_priority: Box::pin(self.non_priority_receiver.clone().peekable()),
73        }
74    }
75
76    #[allow(clippy::result_large_err)]
77    pub(crate) fn send_message(&self, rpc: RpcOut) -> Result<(), RpcOut> {
78        if let RpcOut::Publish { .. } = rpc {
79            // Update number of publish message in queue.
80            let len = self.len.load(Ordering::Relaxed);
81            if len >= self.priority_cap {
82                return Err(rpc);
83            }
84            self.len.store(len + 1, Ordering::Relaxed);
85        }
86        let sender = match rpc {
87            RpcOut::Publish { .. }
88            | RpcOut::Graft(_)
89            | RpcOut::Prune(_)
90            | RpcOut::Subscribe(_)
91            | RpcOut::Unsubscribe(_) => &self.priority_sender,
92            RpcOut::Forward { .. } | RpcOut::IHave(_) | RpcOut::IWant(_) | RpcOut::IDontWant(_) => {
93                &self.non_priority_sender
94            }
95        };
96        sender.try_send(rpc).map_err(|err| err.into_inner())
97    }
98
99    /// Returns the current size of the priority queue.
100    #[cfg(feature = "metrics")]
101    pub(crate) fn priority_queue_len(&self) -> usize {
102        self.len.load(Ordering::Relaxed)
103    }
104
105    /// Returns the current size of the non-priority queue.
106    #[cfg(feature = "metrics")]
107    pub(crate) fn non_priority_queue_len(&self) -> usize {
108        self.non_priority_sender.len()
109    }
110}
111
112/// `RpcOut` sender that is priority aware.
113#[derive(Debug)]
114pub struct Receiver {
115    /// The maximum length of the priority queue.
116    pub(crate) priority_queue_len: Arc<AtomicUsize>,
117    /// The priority queue receiver.
118    pub(crate) priority: Pin<Box<Peekable<async_channel::Receiver<RpcOut>>>>,
119    /// The non priority queue receiver.
120    pub(crate) non_priority: Pin<Box<Peekable<async_channel::Receiver<RpcOut>>>>,
121}
122
123impl Receiver {
124    // Peek the next message in the queues and return it if its timeout has elapsed.
125    // Returns `None` if there aren't any more messages on the stream or none is stale.
126    pub(crate) fn poll_stale(&mut self, cx: &mut Context<'_>) -> Poll<Option<RpcOut>> {
127        // Peek priority queue.
128        let priority = match self.priority.as_mut().poll_peek_mut(cx) {
129            Poll::Ready(Some(RpcOut::Publish {
130                message: _,
131                ref mut timeout,
132            })) => {
133                if Pin::new(timeout).poll(cx).is_ready() {
134                    // Return the message.
135                    let dropped = futures::ready!(self.priority.poll_next_unpin(cx))
136                        .expect("There should be a message");
137                    return Poll::Ready(Some(dropped));
138                }
139                Poll::Ready(None)
140            }
141            poll => poll,
142        };
143
144        let non_priority = match self.non_priority.as_mut().poll_peek_mut(cx) {
145            Poll::Ready(Some(RpcOut::Forward {
146                message: _,
147                ref mut timeout,
148            })) => {
149                if Pin::new(timeout).poll(cx).is_ready() {
150                    // Return the message.
151                    let dropped = futures::ready!(self.non_priority.poll_next_unpin(cx))
152                        .expect("There should be a message");
153                    return Poll::Ready(Some(dropped));
154                }
155                Poll::Ready(None)
156            }
157            poll => poll,
158        };
159
160        match (priority, non_priority) {
161            (Poll::Ready(None), Poll::Ready(None)) => Poll::Ready(None),
162            _ => Poll::Pending,
163        }
164    }
165
166    /// Poll queues and return true if both are empty.
167    pub(crate) fn poll_is_empty(&mut self, cx: &mut Context<'_>) -> bool {
168        matches!(
169            (
170                self.priority.as_mut().poll_peek(cx),
171                self.non_priority.as_mut().poll_peek(cx),
172            ),
173            (Poll::Ready(None), Poll::Ready(None))
174        )
175    }
176}
177
178impl Stream for Receiver {
179    type Item = RpcOut;
180
181    fn poll_next(
182        mut self: std::pin::Pin<&mut Self>,
183        cx: &mut std::task::Context<'_>,
184    ) -> std::task::Poll<Option<Self::Item>> {
185        // The priority queue is first polled.
186        if let Poll::Ready(rpc) = Pin::new(&mut self.priority).poll_next(cx) {
187            if let Some(RpcOut::Publish { .. }) = rpc {
188                self.priority_queue_len.fetch_sub(1, Ordering::Relaxed);
189            }
190            return Poll::Ready(rpc);
191        }
192        // Then we poll the non priority.
193        Pin::new(&mut self.non_priority).poll_next(cx)
194    }
195}