libp2p_gossipsub/
rpc.rs

1// Copyright 2020 Sigma Prime Pty Ltd.
2//
3// Permission is hereby granted, free of charge, to any person obtaining a
4// copy of this software and associated documentation files (the "Software"),
5// to deal in the Software without restriction, including without limitation
6// the rights to use, copy, modify, merge, publish, distribute, sublicense,
7// and/or sell copies of the Software, and to permit persons to whom the
8// Software is furnished to do so, subject to the following conditions:
9//
10// The above copyright notice and this permission notice shall be included in
11// all copies or substantial portions of the Software.
12//
13// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
14// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
18// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
19// DEALINGS IN THE SOFTWARE.
20
21use std::{
22    future::Future,
23    pin::Pin,
24    sync::{
25        atomic::{AtomicUsize, Ordering},
26        Arc,
27    },
28    task::{Context, Poll},
29};
30
31use futures::{stream::Peekable, Stream, StreamExt};
32
33use crate::types::RpcOut;
34
/// `RpcOut` sender that is priority aware.
///
/// Holds both halves of two `async_channel` queues: a priority queue and a
/// non-priority queue. The receiving halves are kept so that `new_receiver`
/// can hand out clones of them.
#[derive(Debug)]
pub(crate) struct Sender {
    /// Capacity of the priority channel for `Publish` messages.
    priority_cap: usize,
    /// Number of `Publish` messages currently accounted for in the priority
    /// queue. The same counter is shared with every `Receiver` created through
    /// `new_receiver`.
    len: Arc<AtomicUsize>,
    /// Sending half of the priority queue.
    pub(crate) priority_sender: async_channel::Sender<RpcOut>,
    /// Sending half of the non-priority queue.
    pub(crate) non_priority_sender: async_channel::Sender<RpcOut>,
    /// Receiving half of the priority queue, cloned for each new `Receiver`.
    priority_receiver: async_channel::Receiver<RpcOut>,
    /// Receiving half of the non-priority queue, cloned for each new `Receiver`.
    non_priority_receiver: async_channel::Receiver<RpcOut>,
}
46
47impl Sender {
48    /// Create a RpcSender.
49    pub(crate) fn new(cap: usize) -> Sender {
50        // We intentionally do not bound the channel, as we still need to send control messages
51        // such as `GRAFT`, `PRUNE`, `SUBSCRIBE`, and `UNSUBSCRIBE`.
52        // That's also why we define `cap` and divide it by two,
53        // to ensure there is capacity for both priority and non_priority messages.
54        let (priority_sender, priority_receiver) = async_channel::unbounded();
55        let (non_priority_sender, non_priority_receiver) = async_channel::bounded(cap / 2);
56        let len = Arc::new(AtomicUsize::new(0));
57        Sender {
58            priority_cap: cap / 2,
59            len,
60            priority_sender,
61            non_priority_sender,
62            priority_receiver,
63            non_priority_receiver,
64        }
65    }
66
67    /// Create a new Receiver to the sender.
68    pub(crate) fn new_receiver(&self) -> Receiver {
69        Receiver {
70            priority_queue_len: self.len.clone(),
71            priority: Box::pin(self.priority_receiver.clone().peekable()),
72            non_priority: Box::pin(self.non_priority_receiver.clone().peekable()),
73        }
74    }
75
76    #[allow(clippy::result_large_err)]
77    pub(crate) fn send_message(&self, rpc: RpcOut) -> Result<(), RpcOut> {
78        if let RpcOut::Publish { .. } = rpc {
79            // Update number of publish message in queue.
80            let len = self.len.load(Ordering::Relaxed);
81            if len >= self.priority_cap {
82                return Err(rpc);
83            }
84            self.len.store(len + 1, Ordering::Relaxed);
85        }
86        let sender = match rpc {
87            RpcOut::Publish { .. }
88            | RpcOut::Graft(_)
89            | RpcOut::Prune(_)
90            | RpcOut::Subscribe(_)
91            | RpcOut::Unsubscribe(_) => &self.priority_sender,
92            RpcOut::Forward { .. } | RpcOut::IHave(_) | RpcOut::IWant(_) | RpcOut::IDontWant(_) => {
93                &self.non_priority_sender
94            }
95        };
96        sender.try_send(rpc).map_err(|err| err.into_inner())
97    }
98
99    /// Returns the current size of the priority queue.
100    pub(crate) fn priority_queue_len(&self) -> usize {
101        self.len.load(Ordering::Relaxed)
102    }
103
104    /// Returns the current size of the non-priority queue.
105    pub(crate) fn non_priority_queue_len(&self) -> usize {
106        self.non_priority_sender.len()
107    }
108}
109
/// `RpcOut` receiver that is priority aware.
///
/// Wraps the receiving halves of the priority and non-priority queues as
/// peekable streams.
#[derive(Debug)]
pub struct Receiver {
    /// Shared counter of `Publish` messages currently accounted for in the
    /// priority queue (this is the live count, not a maximum).
    pub(crate) priority_queue_len: Arc<AtomicUsize>,
    /// The priority queue receiver.
    pub(crate) priority: Pin<Box<Peekable<async_channel::Receiver<RpcOut>>>>,
    /// The non priority queue receiver.
    pub(crate) non_priority: Pin<Box<Peekable<async_channel::Receiver<RpcOut>>>>,
}
120
121impl Receiver {
122    // Peek the next message in the queues and return it if its timeout has elapsed.
123    // Returns `None` if there aren't any more messages on the stream or none is stale.
124    pub(crate) fn poll_stale(&mut self, cx: &mut Context<'_>) -> Poll<Option<RpcOut>> {
125        // Peek priority queue.
126        let priority = match self.priority.as_mut().poll_peek_mut(cx) {
127            Poll::Ready(Some(RpcOut::Publish {
128                message: _,
129                ref mut timeout,
130            })) => {
131                if Pin::new(timeout).poll(cx).is_ready() {
132                    // Return the message.
133                    let dropped = futures::ready!(self.priority.poll_next_unpin(cx))
134                        .expect("There should be a message");
135                    return Poll::Ready(Some(dropped));
136                }
137                Poll::Ready(None)
138            }
139            poll => poll,
140        };
141
142        let non_priority = match self.non_priority.as_mut().poll_peek_mut(cx) {
143            Poll::Ready(Some(RpcOut::Forward {
144                message: _,
145                ref mut timeout,
146            })) => {
147                if Pin::new(timeout).poll(cx).is_ready() {
148                    // Return the message.
149                    let dropped = futures::ready!(self.non_priority.poll_next_unpin(cx))
150                        .expect("There should be a message");
151                    return Poll::Ready(Some(dropped));
152                }
153                Poll::Ready(None)
154            }
155            poll => poll,
156        };
157
158        match (priority, non_priority) {
159            (Poll::Ready(None), Poll::Ready(None)) => Poll::Ready(None),
160            _ => Poll::Pending,
161        }
162    }
163
164    /// Poll queues and return true if both are empty.
165    pub(crate) fn poll_is_empty(&mut self, cx: &mut Context<'_>) -> bool {
166        matches!(
167            (
168                self.priority.as_mut().poll_peek(cx),
169                self.non_priority.as_mut().poll_peek(cx),
170            ),
171            (Poll::Ready(None), Poll::Ready(None))
172        )
173    }
174}
175
176impl Stream for Receiver {
177    type Item = RpcOut;
178
179    fn poll_next(
180        mut self: std::pin::Pin<&mut Self>,
181        cx: &mut std::task::Context<'_>,
182    ) -> std::task::Poll<Option<Self::Item>> {
183        // The priority queue is first polled.
184        if let Poll::Ready(rpc) = Pin::new(&mut self.priority).poll_next(cx) {
185            if let Some(RpcOut::Publish { .. }) = rpc {
186                self.priority_queue_len.fetch_sub(1, Ordering::Relaxed);
187            }
188            return Poll::Ready(rpc);
189        }
190        // Then we poll the non priority.
191        Pin::new(&mut self.non_priority).poll_next(cx)
192    }
193}