1use std::{
22 future::Future,
23 pin::Pin,
24 sync::{
25 atomic::{AtomicUsize, Ordering},
26 Arc,
27 },
28 task::{Context, Poll},
29};
30
31use futures::{stream::Peekable, Stream, StreamExt};
32
33use crate::types::RpcOut;
34
35#[derive(Debug)]
37pub(crate) struct Sender {
38 priority_cap: usize,
40 len: Arc<AtomicUsize>,
41 pub(crate) priority_sender: async_channel::Sender<RpcOut>,
42 pub(crate) non_priority_sender: async_channel::Sender<RpcOut>,
43 priority_receiver: async_channel::Receiver<RpcOut>,
44 non_priority_receiver: async_channel::Receiver<RpcOut>,
45}
46
47impl Sender {
48 pub(crate) fn new(cap: usize) -> Sender {
50 let (priority_sender, priority_receiver) = async_channel::unbounded();
55 let (non_priority_sender, non_priority_receiver) = async_channel::bounded(cap / 2);
56 let len = Arc::new(AtomicUsize::new(0));
57 Sender {
58 priority_cap: cap / 2,
59 len,
60 priority_sender,
61 non_priority_sender,
62 priority_receiver,
63 non_priority_receiver,
64 }
65 }
66
67 pub(crate) fn new_receiver(&self) -> Receiver {
69 Receiver {
70 priority_queue_len: self.len.clone(),
71 priority: Box::pin(self.priority_receiver.clone().peekable()),
72 non_priority: Box::pin(self.non_priority_receiver.clone().peekable()),
73 }
74 }
75
76 #[allow(clippy::result_large_err)]
77 pub(crate) fn send_message(&self, rpc: RpcOut) -> Result<(), RpcOut> {
78 if let RpcOut::Publish { .. } = rpc {
79 let len = self.len.load(Ordering::Relaxed);
81 if len >= self.priority_cap {
82 return Err(rpc);
83 }
84 self.len.store(len + 1, Ordering::Relaxed);
85 }
86 let sender = match rpc {
87 RpcOut::Publish { .. }
88 | RpcOut::Graft(_)
89 | RpcOut::Prune(_)
90 | RpcOut::Subscribe(_)
91 | RpcOut::Unsubscribe(_) => &self.priority_sender,
92 RpcOut::Forward { .. } | RpcOut::IHave(_) | RpcOut::IWant(_) | RpcOut::IDontWant(_) => {
93 &self.non_priority_sender
94 }
95 };
96 sender.try_send(rpc).map_err(|err| err.into_inner())
97 }
98
99 #[cfg(feature = "metrics")]
101 pub(crate) fn priority_queue_len(&self) -> usize {
102 self.len.load(Ordering::Relaxed)
103 }
104
105 #[cfg(feature = "metrics")]
107 pub(crate) fn non_priority_queue_len(&self) -> usize {
108 self.non_priority_sender.len()
109 }
110}
111
112#[derive(Debug)]
114pub struct Receiver {
115 pub(crate) priority_queue_len: Arc<AtomicUsize>,
117 pub(crate) priority: Pin<Box<Peekable<async_channel::Receiver<RpcOut>>>>,
119 pub(crate) non_priority: Pin<Box<Peekable<async_channel::Receiver<RpcOut>>>>,
121}
122
123impl Receiver {
124 pub(crate) fn poll_stale(&mut self, cx: &mut Context<'_>) -> Poll<Option<RpcOut>> {
127 let priority = match self.priority.as_mut().poll_peek_mut(cx) {
129 Poll::Ready(Some(RpcOut::Publish {
130 message: _,
131 ref mut timeout,
132 })) => {
133 if Pin::new(timeout).poll(cx).is_ready() {
134 let dropped = futures::ready!(self.priority.poll_next_unpin(cx))
136 .expect("There should be a message");
137 return Poll::Ready(Some(dropped));
138 }
139 Poll::Ready(None)
140 }
141 poll => poll,
142 };
143
144 let non_priority = match self.non_priority.as_mut().poll_peek_mut(cx) {
145 Poll::Ready(Some(RpcOut::Forward {
146 message: _,
147 ref mut timeout,
148 })) => {
149 if Pin::new(timeout).poll(cx).is_ready() {
150 let dropped = futures::ready!(self.non_priority.poll_next_unpin(cx))
152 .expect("There should be a message");
153 return Poll::Ready(Some(dropped));
154 }
155 Poll::Ready(None)
156 }
157 poll => poll,
158 };
159
160 match (priority, non_priority) {
161 (Poll::Ready(None), Poll::Ready(None)) => Poll::Ready(None),
162 _ => Poll::Pending,
163 }
164 }
165
166 pub(crate) fn poll_is_empty(&mut self, cx: &mut Context<'_>) -> bool {
168 matches!(
169 (
170 self.priority.as_mut().poll_peek(cx),
171 self.non_priority.as_mut().poll_peek(cx),
172 ),
173 (Poll::Ready(None), Poll::Ready(None))
174 )
175 }
176}
177
178impl Stream for Receiver {
179 type Item = RpcOut;
180
181 fn poll_next(
182 mut self: std::pin::Pin<&mut Self>,
183 cx: &mut std::task::Context<'_>,
184 ) -> std::task::Poll<Option<Self::Item>> {
185 if let Poll::Ready(rpc) = Pin::new(&mut self.priority).poll_next(cx) {
187 if let Some(RpcOut::Publish { .. }) = rpc {
188 self.priority_queue_len.fetch_sub(1, Ordering::Relaxed);
189 }
190 return Poll::Ready(rpc);
191 }
192 Pin::new(&mut self.non_priority).poll_next(cx)
194 }
195}