libp2p_memory_connection_limits/
lib.rs1use std::{
22 convert::Infallible,
23 fmt,
24 task::{Context, Poll},
25 time::{Duration, Instant},
26};
27
28use libp2p_core::{transport::PortUse, Endpoint, Multiaddr};
29use libp2p_identity::PeerId;
30use libp2p_swarm::{
31 dummy, ConnectionDenied, ConnectionId, FromSwarm, NetworkBehaviour, THandler, THandlerInEvent,
32 THandlerOutEvent, ToSwarm,
33};
34use sysinfo::MemoryRefreshKind;
35
/// A [`NetworkBehaviour`] that denies new connections while the process uses too much memory.
///
/// Pending inbound and outbound connections are rejected with a [`ConnectionDenied`] wrapping
/// [`MemoryUsageLimitExceeded`] whenever the last sampled physical memory usage of the process
/// is above the configured limit. The sample is cached and re-read at most once every 100 ms.
///
/// `Debug` is derived so the behaviour can be logged and embedded in composite types that derive
/// `Debug` themselves.
#[derive(Debug)]
pub struct Behaviour {
    /// Upper bound on the process' physical memory usage, in bytes.
    max_allowed_bytes: usize,
    /// Most recently sampled physical memory usage of the process, in bytes.
    process_physical_memory_bytes: usize,
    /// When `process_physical_memory_bytes` was last sampled successfully.
    last_refreshed: Instant,
}
75
/// How long a sampled memory reading may be reused before it is considered stale.
///
/// Once this much time has passed since the last successful sample, the next limit check reads
/// the process memory statistics again.
const MAX_STALE_DURATION: Duration = Duration::from_millis(100);
81
82impl Behaviour {
83 pub fn with_max_bytes(max_allowed_bytes: usize) -> Self {
87 Self {
88 max_allowed_bytes,
89 process_physical_memory_bytes: memory_stats::memory_stats()
90 .map(|s| s.physical_mem)
91 .unwrap_or_default(),
92 last_refreshed: Instant::now(),
93 }
94 }
95
96 pub fn with_max_percentage(percentage: f64) -> Self {
100 use sysinfo::{RefreshKind, System};
101
102 let system_memory_bytes = System::new_with_specifics(
103 RefreshKind::default().with_memory(MemoryRefreshKind::default().with_ram()),
104 )
105 .total_memory();
106
107 Self::with_max_bytes((system_memory_bytes as f64 * percentage).round() as usize)
108 }
109
110 pub fn max_allowed_bytes(&self) -> usize {
112 self.max_allowed_bytes
113 }
114
115 fn check_limit(&mut self) -> Result<(), ConnectionDenied> {
116 self.refresh_memory_stats_if_needed();
117
118 if self.process_physical_memory_bytes > self.max_allowed_bytes {
119 return Err(ConnectionDenied::new(MemoryUsageLimitExceeded {
120 process_physical_memory_bytes: self.process_physical_memory_bytes,
121 max_allowed_bytes: self.max_allowed_bytes,
122 }));
123 }
124
125 Ok(())
126 }
127
128 fn refresh_memory_stats_if_needed(&mut self) {
129 let now = Instant::now();
130
131 if self.last_refreshed + MAX_STALE_DURATION > now {
132 return;
134 }
135
136 let Some(stats) = memory_stats::memory_stats() else {
137 tracing::warn!("Failed to retrieve process memory stats");
138 return;
139 };
140
141 self.last_refreshed = now;
142 self.process_physical_memory_bytes = stats.physical_mem;
143 }
144}
145
146impl NetworkBehaviour for Behaviour {
147 type ConnectionHandler = dummy::ConnectionHandler;
148 type ToSwarm = Infallible;
149
150 fn handle_pending_inbound_connection(
151 &mut self,
152 _: ConnectionId,
153 _: &Multiaddr,
154 _: &Multiaddr,
155 ) -> Result<(), ConnectionDenied> {
156 self.check_limit()
157 }
158
159 fn handle_established_inbound_connection(
160 &mut self,
161 _: ConnectionId,
162 _: PeerId,
163 _: &Multiaddr,
164 _: &Multiaddr,
165 ) -> Result<THandler<Self>, ConnectionDenied> {
166 Ok(dummy::ConnectionHandler)
167 }
168
169 fn handle_pending_outbound_connection(
170 &mut self,
171 _: ConnectionId,
172 _: Option<PeerId>,
173 _: &[Multiaddr],
174 _: Endpoint,
175 ) -> Result<Vec<Multiaddr>, ConnectionDenied> {
176 self.check_limit()?;
177 Ok(vec![])
178 }
179
180 fn handle_established_outbound_connection(
181 &mut self,
182 _: ConnectionId,
183 _: PeerId,
184 _: &Multiaddr,
185 _: Endpoint,
186 _: PortUse,
187 ) -> Result<THandler<Self>, ConnectionDenied> {
188 Ok(dummy::ConnectionHandler)
189 }
190
191 fn on_swarm_event(&mut self, _: FromSwarm) {}
192
193 fn on_connection_handler_event(
194 &mut self,
195 _id: PeerId,
196 _: ConnectionId,
197 event: THandlerOutEvent<Self>,
198 ) {
199 #[allow(unreachable_patterns)]
201 libp2p_core::util::unreachable(event)
202 }
203
204 fn poll(&mut self, _: &mut Context<'_>) -> Poll<ToSwarm<Self::ToSwarm, THandlerInEvent<Self>>> {
205 Poll::Pending
206 }
207}
208
/// Error recording that the process' physical memory usage is above the configured limit.
///
/// It carries both the sampled usage and the limit it was compared against.
#[derive(Clone, Copy, Debug)]
pub struct MemoryUsageLimitExceeded {
    /// Sampled physical memory usage of the process, in bytes.
    process_physical_memory_bytes: usize,
    /// Limit the usage was compared against, in bytes.
    max_allowed_bytes: usize,
}

impl MemoryUsageLimitExceeded {
    /// Returns the physical memory usage of the process, in bytes, recorded in this error.
    pub fn process_physical_memory_bytes(&self) -> usize {
        let Self {
            process_physical_memory_bytes,
            ..
        } = *self;
        process_physical_memory_bytes
    }

    /// Returns the memory limit, in bytes, recorded in this error.
    pub fn max_allowed_bytes(&self) -> usize {
        let Self {
            max_allowed_bytes, ..
        } = *self;
        max_allowed_bytes
    }
}

impl std::error::Error for MemoryUsageLimitExceeded {}

impl fmt::Display for MemoryUsageLimitExceeded {
    /// Writes a one-line message containing both the usage and the limit in bytes.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let used_bytes = self.process_physical_memory_bytes;
        let allowed_bytes = self.max_allowed_bytes;

        write!(
            f,
            "process physical memory usage limit exceeded: process memory: {} bytes, max allowed: {} bytes",
            used_bytes, allowed_bytes,
        )
    }
}