libp2p_memory_connection_limits/
lib.rs

1// Copyright 2023 Protocol Labs.
2//
3// Permission is hereby granted, free of charge, to any person obtaining a
4// copy of this software and associated documentation files (the "Software"),
5// to deal in the Software without restriction, including without limitation
6// the rights to use, copy, modify, merge, publish, distribute, sublicense,
7// and/or sell copies of the Software, and to permit persons to whom the
8// Software is furnished to do so, subject to the following conditions:
9//
10// The above copyright notice and this permission notice shall be included in
11// all copies or substantial portions of the Software.
12//
13// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
14// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
18// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
19// DEALINGS IN THE SOFTWARE.
20
21use std::{
22    convert::Infallible,
23    fmt,
24    task::{Context, Poll},
25    time::{Duration, Instant},
26};
27
28use libp2p_core::{transport::PortUse, Endpoint, Multiaddr};
29use libp2p_identity::PeerId;
30use libp2p_swarm::{
31    dummy, ConnectionDenied, ConnectionId, FromSwarm, NetworkBehaviour, THandler, THandlerInEvent,
32    THandlerOutEvent, ToSwarm,
33};
34use sysinfo::MemoryRefreshKind;
35
/// A [`NetworkBehaviour`] that enforces a set of memory usage based limits.
///
/// For these limits to take effect, this needs to be composed
/// into the behaviour tree of your application.
///
/// If a connection is denied due to a limit, either a
/// [`SwarmEvent::IncomingConnectionError`](libp2p_swarm::SwarmEvent::IncomingConnectionError)
/// or [`SwarmEvent::OutgoingConnectionError`](libp2p_swarm::SwarmEvent::OutgoingConnectionError)
/// will be emitted. The [`ListenError::Denied`](libp2p_swarm::ListenError::Denied) and respectively
/// the [`DialError::Denied`](libp2p_swarm::DialError::Denied) variant
/// contain a [`ConnectionDenied`] type that can be downcast to [`MemoryUsageLimitExceeded`] error
/// if (and only if) **this** behaviour denied the connection.
///
/// If you employ multiple [`NetworkBehaviour`]s that manage connections,
/// it may also be a different error.
///
/// [`Behaviour::with_max_bytes`] and [`Behaviour::with_max_percentage`] are mutually exclusive.
/// If you need to employ both of them,
/// compose two instances of [`Behaviour`] into your custom behaviour.
///
/// # Example
///
/// ```rust
/// # use libp2p_identify as identify;
/// # use libp2p_swarm_derive::NetworkBehaviour;
/// # use libp2p_memory_connection_limits as memory_connection_limits;
///
/// #[derive(NetworkBehaviour)]
/// # #[behaviour(prelude = "libp2p_swarm::derive_prelude")]
/// struct MyBehaviour {
///     identify: identify::Behaviour,
///     limits: memory_connection_limits::Behaviour,
/// }
/// ```
#[derive(Debug)]
pub struct Behaviour {
    /// Threshold, in bytes, that the process's physical memory usage is compared against.
    max_allowed_bytes: usize,
    /// Most recently sampled physical memory usage of the process, in bytes.
    process_physical_memory_bytes: usize,
    /// When the behaviour was constructed or, later, when the sample above was last
    /// refreshed successfully.
    last_refreshed: Instant,
}
75
/// Upper bound on the age of the cached process memory statistics.
///
/// A sample older than this is considered stale, and fresh statistics
/// are retrieved the next time the limit is checked.
const MAX_STALE_DURATION: Duration = Duration::from_millis(100);
81
82impl Behaviour {
83    /// Sets the process memory usage threshold in absolute bytes.
84    ///
85    /// New inbound and outbound connections will be denied when the threshold is reached.
86    pub fn with_max_bytes(max_allowed_bytes: usize) -> Self {
87        Self {
88            max_allowed_bytes,
89            process_physical_memory_bytes: memory_stats::memory_stats()
90                .map(|s| s.physical_mem)
91                .unwrap_or_default(),
92            last_refreshed: Instant::now(),
93        }
94    }
95
96    /// Sets the process memory usage threshold in the percentage of the total physical memory.
97    ///
98    /// New inbound and outbound connections will be denied when the threshold is reached.
99    pub fn with_max_percentage(percentage: f64) -> Self {
100        use sysinfo::{RefreshKind, System};
101
102        let system_memory_bytes = System::new_with_specifics(
103            RefreshKind::default().with_memory(MemoryRefreshKind::default().with_ram()),
104        )
105        .total_memory();
106
107        Self::with_max_bytes((system_memory_bytes as f64 * percentage).round() as usize)
108    }
109
110    /// Gets the process memory usage threshold in bytes.
111    pub fn max_allowed_bytes(&self) -> usize {
112        self.max_allowed_bytes
113    }
114
115    fn check_limit(&mut self) -> Result<(), ConnectionDenied> {
116        self.refresh_memory_stats_if_needed();
117
118        if self.process_physical_memory_bytes > self.max_allowed_bytes {
119            return Err(ConnectionDenied::new(MemoryUsageLimitExceeded {
120                process_physical_memory_bytes: self.process_physical_memory_bytes,
121                max_allowed_bytes: self.max_allowed_bytes,
122            }));
123        }
124
125        Ok(())
126    }
127
128    fn refresh_memory_stats_if_needed(&mut self) {
129        let now = Instant::now();
130
131        if self.last_refreshed + MAX_STALE_DURATION > now {
132            // Memory stats are reasonably recent, don't refresh.
133            return;
134        }
135
136        let Some(stats) = memory_stats::memory_stats() else {
137            tracing::warn!("Failed to retrieve process memory stats");
138            return;
139        };
140
141        self.last_refreshed = now;
142        self.process_physical_memory_bytes = stats.physical_mem;
143    }
144}
145
146impl NetworkBehaviour for Behaviour {
147    type ConnectionHandler = dummy::ConnectionHandler;
148    type ToSwarm = Infallible;
149
150    fn handle_pending_inbound_connection(
151        &mut self,
152        _: ConnectionId,
153        _: &Multiaddr,
154        _: &Multiaddr,
155    ) -> Result<(), ConnectionDenied> {
156        self.check_limit()
157    }
158
159    fn handle_established_inbound_connection(
160        &mut self,
161        _: ConnectionId,
162        _: PeerId,
163        _: &Multiaddr,
164        _: &Multiaddr,
165    ) -> Result<THandler<Self>, ConnectionDenied> {
166        Ok(dummy::ConnectionHandler)
167    }
168
169    fn handle_pending_outbound_connection(
170        &mut self,
171        _: ConnectionId,
172        _: Option<PeerId>,
173        _: &[Multiaddr],
174        _: Endpoint,
175    ) -> Result<Vec<Multiaddr>, ConnectionDenied> {
176        self.check_limit()?;
177        Ok(vec![])
178    }
179
180    fn handle_established_outbound_connection(
181        &mut self,
182        _: ConnectionId,
183        _: PeerId,
184        _: &Multiaddr,
185        _: Endpoint,
186        _: PortUse,
187    ) -> Result<THandler<Self>, ConnectionDenied> {
188        Ok(dummy::ConnectionHandler)
189    }
190
191    fn on_swarm_event(&mut self, _: FromSwarm) {}
192
193    fn on_connection_handler_event(
194        &mut self,
195        _id: PeerId,
196        _: ConnectionId,
197        event: THandlerOutEvent<Self>,
198    ) {
199        // TODO: remove when Rust 1.82 is MSRV
200        #[allow(unreachable_patterns)]
201        libp2p_core::util::unreachable(event)
202    }
203
204    fn poll(&mut self, _: &mut Context<'_>) -> Poll<ToSwarm<Self::ToSwarm, THandlerInEvent<Self>>> {
205        Poll::Pending
206    }
207}
208
/// Error reported when a connection is denied because the process's physical
/// memory usage was above the configured limit.
#[derive(Clone, Copy, Debug)]
pub struct MemoryUsageLimitExceeded {
    /// Physical memory usage of the process, in bytes, at the time of the check.
    process_physical_memory_bytes: usize,
    /// The threshold, in bytes, that the usage was compared against.
    max_allowed_bytes: usize,
}
215
216impl MemoryUsageLimitExceeded {
217    pub fn process_physical_memory_bytes(&self) -> usize {
218        self.process_physical_memory_bytes
219    }
220
221    pub fn max_allowed_bytes(&self) -> usize {
222        self.max_allowed_bytes
223    }
224}
225
226impl std::error::Error for MemoryUsageLimitExceeded {}
227
228impl fmt::Display for MemoryUsageLimitExceeded {
229    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
230        write!(
231            f,
232            "process physical memory usage limit exceeded: process memory: {} bytes, max allowed: {} bytes",
233            self.process_physical_memory_bytes,
234            self.max_allowed_bytes,
235        )
236    }
237}