1use std::{
22 future::Future,
23 pin::Pin,
24 sync::{
25 atomic::{AtomicUsize, Ordering},
26 Arc,
27 },
28 task::{Context, Poll},
29};
30
31use futures::{stream::Peekable, Stream, StreamExt};
32
33use crate::types::RpcOut;
34
#[derive(Debug)]
/// Sending side of a pair of outbound RPC queues (priority and non-priority).
///
/// Besides the two channel senders it keeps a receiver handle for each channel so
/// that [`Sender::new_receiver`] can hand out [`Receiver`]s reading from the same
/// queues.
pub(crate) struct Sender {
    /// Maximum number of `RpcOut::Publish` messages that may be counted as queued
    /// at once; set to half of the capacity passed to [`Sender::new`].
    priority_cap: usize,
    /// Count of `RpcOut::Publish` messages currently accounted as queued. The same
    /// counter is shared with every [`Receiver`] created from this sender.
    len: Arc<AtomicUsize>,
    /// Sending half of the priority channel (created unbounded).
    pub(crate) priority_sender: async_channel::Sender<RpcOut>,
    /// Sending half of the non-priority channel (created bounded).
    pub(crate) non_priority_sender: async_channel::Sender<RpcOut>,
    /// Receiving half of the priority channel, cloned for each new [`Receiver`].
    priority_receiver: async_channel::Receiver<RpcOut>,
    /// Receiving half of the non-priority channel, cloned for each new [`Receiver`].
    non_priority_receiver: async_channel::Receiver<RpcOut>,
}
46
47impl Sender {
48 pub(crate) fn new(cap: usize) -> Sender {
50 let (priority_sender, priority_receiver) = async_channel::unbounded();
55 let (non_priority_sender, non_priority_receiver) = async_channel::bounded(cap / 2);
56 let len = Arc::new(AtomicUsize::new(0));
57 Sender {
58 priority_cap: cap / 2,
59 len,
60 priority_sender,
61 non_priority_sender,
62 priority_receiver,
63 non_priority_receiver,
64 }
65 }
66
67 pub(crate) fn new_receiver(&self) -> Receiver {
69 Receiver {
70 priority_queue_len: self.len.clone(),
71 priority: Box::pin(self.priority_receiver.clone().peekable()),
72 non_priority: Box::pin(self.non_priority_receiver.clone().peekable()),
73 }
74 }
75
76 #[allow(clippy::result_large_err)]
77 pub(crate) fn send_message(&self, rpc: RpcOut) -> Result<(), RpcOut> {
78 if let RpcOut::Publish { .. } = rpc {
79 let len = self.len.load(Ordering::Relaxed);
81 if len >= self.priority_cap {
82 return Err(rpc);
83 }
84 self.len.store(len + 1, Ordering::Relaxed);
85 }
86 let sender = match rpc {
87 RpcOut::Publish { .. }
88 | RpcOut::Graft(_)
89 | RpcOut::Prune(_)
90 | RpcOut::Subscribe(_)
91 | RpcOut::Unsubscribe(_) => &self.priority_sender,
92 RpcOut::Forward { .. } | RpcOut::IHave(_) | RpcOut::IWant(_) | RpcOut::IDontWant(_) => {
93 &self.non_priority_sender
94 }
95 };
96 sender.try_send(rpc).map_err(|err| err.into_inner())
97 }
98
99 pub(crate) fn priority_queue_len(&self) -> usize {
101 self.len.load(Ordering::Relaxed)
102 }
103
104 pub(crate) fn non_priority_queue_len(&self) -> usize {
106 self.non_priority_sender.len()
107 }
108}
109
#[derive(Debug)]
/// Receiving side of the outbound RPC queues.
///
/// As a `Stream` it yields messages from the priority queue first and falls back
/// to the non-priority queue when the priority queue has nothing ready.
pub struct Receiver {
    /// Counter of queued `RpcOut::Publish` messages, shared with the `Sender` that
    /// created this receiver.
    pub(crate) priority_queue_len: Arc<AtomicUsize>,
    /// Peekable stream over the priority channel.
    pub(crate) priority: Pin<Box<Peekable<async_channel::Receiver<RpcOut>>>>,
    /// Peekable stream over the non-priority channel.
    pub(crate) non_priority: Pin<Box<Peekable<async_channel::Receiver<RpcOut>>>>,
}
120
121impl Receiver {
122 pub(crate) fn poll_stale(&mut self, cx: &mut Context<'_>) -> Poll<Option<RpcOut>> {
125 let priority = match self.priority.as_mut().poll_peek_mut(cx) {
127 Poll::Ready(Some(RpcOut::Publish {
128 message: _,
129 ref mut timeout,
130 })) => {
131 if Pin::new(timeout).poll(cx).is_ready() {
132 let dropped = futures::ready!(self.priority.poll_next_unpin(cx))
134 .expect("There should be a message");
135 return Poll::Ready(Some(dropped));
136 }
137 Poll::Ready(None)
138 }
139 poll => poll,
140 };
141
142 let non_priority = match self.non_priority.as_mut().poll_peek_mut(cx) {
143 Poll::Ready(Some(RpcOut::Forward {
144 message: _,
145 ref mut timeout,
146 })) => {
147 if Pin::new(timeout).poll(cx).is_ready() {
148 let dropped = futures::ready!(self.non_priority.poll_next_unpin(cx))
150 .expect("There should be a message");
151 return Poll::Ready(Some(dropped));
152 }
153 Poll::Ready(None)
154 }
155 poll => poll,
156 };
157
158 match (priority, non_priority) {
159 (Poll::Ready(None), Poll::Ready(None)) => Poll::Ready(None),
160 _ => Poll::Pending,
161 }
162 }
163
164 pub(crate) fn poll_is_empty(&mut self, cx: &mut Context<'_>) -> bool {
166 matches!(
167 (
168 self.priority.as_mut().poll_peek(cx),
169 self.non_priority.as_mut().poll_peek(cx),
170 ),
171 (Poll::Ready(None), Poll::Ready(None))
172 )
173 }
174}
175
176impl Stream for Receiver {
177 type Item = RpcOut;
178
179 fn poll_next(
180 mut self: std::pin::Pin<&mut Self>,
181 cx: &mut std::task::Context<'_>,
182 ) -> std::task::Poll<Option<Self::Item>> {
183 if let Poll::Ready(rpc) = Pin::new(&mut self.priority).poll_next(cx) {
185 if let Some(RpcOut::Publish { .. }) = rpc {
186 self.priority_queue_len.fetch_sub(1, Ordering::Relaxed);
187 }
188 return Poll::Ready(rpc);
189 }
190 Pin::new(&mut self.non_priority).poll_next(cx)
192 }
193}