1use std::{
22 collections::HashMap,
23 sync::{Arc, Mutex},
24};
25
26use libp2p_swarm::{ConnectionId, DialError, SwarmEvent};
27use prometheus_client::{
28 encoding::{EncodeLabelSet, EncodeLabelValue},
29 metrics::{
30 counter::Counter,
31 family::Family,
32 histogram::{exponential_buckets, Histogram},
33 },
34 registry::{Registry, Unit},
35};
36use web_time::Instant;
37
38use crate::protocol_stack;
39
40pub(crate) struct Metrics {
41 connections_incoming: Family<AddressLabels, Counter>,
42 connections_incoming_error: Family<IncomingConnectionErrorLabels, Counter>,
43
44 connections_established: Family<ConnectionLabels, Counter>,
45 connections_establishment_duration: Family<ConnectionLabels, Histogram>,
46 connections_duration: Family<ConnectionClosedLabels, Histogram>,
47
48 new_listen_addr: Family<AddressLabels, Counter>,
49 expired_listen_addr: Family<AddressLabels, Counter>,
50
51 external_addr_candidates: Family<AddressLabels, Counter>,
52 external_addr_confirmed: Family<AddressLabels, Counter>,
53 external_addr_expired: Family<AddressLabels, Counter>,
54
55 listener_closed: Family<AddressLabels, Counter>,
56 listener_error: Counter,
57
58 dial_attempt: Counter,
59 outgoing_connection_error: Family<OutgoingConnectionErrorLabels, Counter>,
60
61 connections: Arc<Mutex<HashMap<ConnectionId, Instant>>>,
62}
63
64impl Metrics {
65 pub(crate) fn new(registry: &mut Registry) -> Self {
66 let sub_registry = registry.sub_registry_with_prefix("swarm");
67
68 let connections_incoming = Family::default();
69 sub_registry.register(
70 "connections_incoming",
71 "Number of incoming connections per address stack",
72 connections_incoming.clone(),
73 );
74
75 let connections_incoming_error = Family::default();
76 sub_registry.register(
77 "connections_incoming_error",
78 "Number of incoming connection errors",
79 connections_incoming_error.clone(),
80 );
81
82 let new_listen_addr = Family::default();
83 sub_registry.register(
84 "new_listen_addr",
85 "Number of new listen addresses",
86 new_listen_addr.clone(),
87 );
88
89 let expired_listen_addr = Family::default();
90 sub_registry.register(
91 "expired_listen_addr",
92 "Number of expired listen addresses",
93 expired_listen_addr.clone(),
94 );
95
96 let external_addr_candidates = Family::default();
97 sub_registry.register(
98 "external_addr_candidates",
99 "Number of new external address candidates",
100 external_addr_candidates.clone(),
101 );
102
103 let external_addr_confirmed = Family::default();
104 sub_registry.register(
105 "external_addr_confirmed",
106 "Number of confirmed external addresses",
107 external_addr_confirmed.clone(),
108 );
109
110 let external_addr_expired = Family::default();
111 sub_registry.register(
112 "external_addr_expired",
113 "Number of expired external addresses",
114 external_addr_expired.clone(),
115 );
116
117 let listener_closed = Family::default();
118 sub_registry.register(
119 "listener_closed",
120 "Number of listeners closed",
121 listener_closed.clone(),
122 );
123
124 let listener_error = Counter::default();
125 sub_registry.register(
126 "listener_error",
127 "Number of listener errors",
128 listener_error.clone(),
129 );
130
131 let dial_attempt = Counter::default();
132 sub_registry.register(
133 "dial_attempt",
134 "Number of dial attempts",
135 dial_attempt.clone(),
136 );
137
138 let outgoing_connection_error = Family::default();
139 sub_registry.register(
140 "outgoing_connection_error",
141 "Number outgoing connection errors",
142 outgoing_connection_error.clone(),
143 );
144
145 let connections_established = Family::default();
146 sub_registry.register(
147 "connections_established",
148 "Number of connections established",
149 connections_established.clone(),
150 );
151
152 let connections_establishment_duration = {
153 let constructor: fn() -> Histogram =
154 || Histogram::new(exponential_buckets(0.01, 1.5, 20));
155 Family::new_with_constructor(constructor)
156 };
157 sub_registry.register(
158 "connections_establishment_duration",
159 "Time it took (locally) to establish connections",
160 connections_establishment_duration.clone(),
161 );
162
163 let connections_duration = {
164 let constructor: fn() -> Histogram =
165 || Histogram::new(exponential_buckets(0.01, 3.0, 20));
166 Family::new_with_constructor(constructor)
167 };
168 sub_registry.register_with_unit(
169 "connections_duration",
170 "Time a connection was alive",
171 Unit::Seconds,
172 connections_duration.clone(),
173 );
174
175 Self {
176 connections_incoming,
177 connections_incoming_error,
178 connections_established,
179 new_listen_addr,
180 expired_listen_addr,
181 external_addr_candidates,
182 external_addr_confirmed,
183 external_addr_expired,
184 listener_closed,
185 listener_error,
186 dial_attempt,
187 outgoing_connection_error,
188 connections_establishment_duration,
189 connections_duration,
190 connections: Default::default(),
191 }
192 }
193}
194
195impl<TBvEv> super::Recorder<SwarmEvent<TBvEv>> for Metrics {
196 fn record(&self, event: &SwarmEvent<TBvEv>) {
197 match event {
198 SwarmEvent::Behaviour(_) => {}
199 SwarmEvent::ConnectionEstablished {
200 endpoint,
201 established_in: time_taken,
202 connection_id,
203 ..
204 } => {
205 let labels = ConnectionLabels {
206 role: endpoint.into(),
207 protocols: protocol_stack::as_string(endpoint.get_remote_address()),
208 };
209 self.connections_established.get_or_create(&labels).inc();
210 self.connections_establishment_duration
211 .get_or_create(&labels)
212 .observe(time_taken.as_secs_f64());
213 self.connections
214 .lock()
215 .expect("lock not to be poisoned")
216 .insert(*connection_id, Instant::now());
217 }
218 SwarmEvent::ConnectionClosed {
219 endpoint,
220 connection_id,
221 cause,
222 ..
223 } => {
224 let labels = ConnectionClosedLabels {
225 connection: ConnectionLabels {
226 role: endpoint.into(),
227 protocols: protocol_stack::as_string(endpoint.get_remote_address()),
228 },
229 cause: cause.as_ref().map(Into::into),
230 };
231 self.connections_duration.get_or_create(&labels).observe(
232 self.connections
233 .lock()
234 .expect("lock not to be poisoned")
235 .remove(connection_id)
236 .expect("closed connection to previously be established")
237 .elapsed()
238 .as_secs_f64(),
239 );
240 }
241 SwarmEvent::IncomingConnection { send_back_addr, .. } => {
242 self.connections_incoming
243 .get_or_create(&AddressLabels {
244 protocols: protocol_stack::as_string(send_back_addr),
245 })
246 .inc();
247 }
248 SwarmEvent::IncomingConnectionError {
249 error,
250 send_back_addr,
251 ..
252 } => {
253 self.connections_incoming_error
254 .get_or_create(&IncomingConnectionErrorLabels {
255 error: error.into(),
256 protocols: protocol_stack::as_string(send_back_addr),
257 })
258 .inc();
259 }
260 SwarmEvent::OutgoingConnectionError { error, peer_id, .. } => {
261 let peer = match peer_id {
262 Some(_) => PeerStatus::Known,
263 None => PeerStatus::Unknown,
264 };
265
266 let record = |error| {
267 self.outgoing_connection_error
268 .get_or_create(&OutgoingConnectionErrorLabels { peer, error })
269 .inc();
270 };
271
272 match error {
273 DialError::Transport(errors) => {
274 for (_multiaddr, error) in errors {
275 match error {
276 libp2p_core::transport::TransportError::MultiaddrNotSupported(
277 _,
278 ) => {
279 record(OutgoingConnectionError::TransportMultiaddrNotSupported)
280 }
281 libp2p_core::transport::TransportError::Other(_) => {
282 record(OutgoingConnectionError::TransportOther)
283 }
284 };
285 }
286 }
287 DialError::LocalPeerId { .. } => record(OutgoingConnectionError::LocalPeerId),
288 DialError::NoAddresses => record(OutgoingConnectionError::NoAddresses),
289 DialError::DialPeerConditionFalse(_) => {
290 record(OutgoingConnectionError::DialPeerConditionFalse)
291 }
292 DialError::Aborted => record(OutgoingConnectionError::Aborted),
293 DialError::WrongPeerId { .. } => record(OutgoingConnectionError::WrongPeerId),
294 DialError::Denied { .. } => record(OutgoingConnectionError::Denied),
295 };
296 }
297 SwarmEvent::NewListenAddr { address, .. } => {
298 self.new_listen_addr
299 .get_or_create(&AddressLabels {
300 protocols: protocol_stack::as_string(address),
301 })
302 .inc();
303 }
304 SwarmEvent::ExpiredListenAddr { address, .. } => {
305 self.expired_listen_addr
306 .get_or_create(&AddressLabels {
307 protocols: protocol_stack::as_string(address),
308 })
309 .inc();
310 }
311 SwarmEvent::ListenerClosed { addresses, .. } => {
312 for address in addresses {
313 self.listener_closed
314 .get_or_create(&AddressLabels {
315 protocols: protocol_stack::as_string(address),
316 })
317 .inc();
318 }
319 }
320 SwarmEvent::ListenerError { .. } => {
321 self.listener_error.inc();
322 }
323 SwarmEvent::Dialing { .. } => {
324 self.dial_attempt.inc();
325 }
326 SwarmEvent::NewExternalAddrCandidate { address } => {
327 self.external_addr_candidates
328 .get_or_create(&AddressLabels {
329 protocols: protocol_stack::as_string(address),
330 })
331 .inc();
332 }
333 SwarmEvent::ExternalAddrConfirmed { address } => {
334 self.external_addr_confirmed
335 .get_or_create(&AddressLabels {
336 protocols: protocol_stack::as_string(address),
337 })
338 .inc();
339 }
340 SwarmEvent::ExternalAddrExpired { address } => {
341 self.external_addr_expired
342 .get_or_create(&AddressLabels {
343 protocols: protocol_stack::as_string(address),
344 })
345 .inc();
346 }
347 _ => {}
348 }
349 }
350}
351
352#[derive(EncodeLabelSet, Hash, Clone, Eq, PartialEq, Debug)]
353struct ConnectionLabels {
354 role: Role,
355 protocols: String,
356}
357
358#[derive(EncodeLabelSet, Hash, Clone, Eq, PartialEq, Debug)]
359struct ConnectionClosedLabels {
360 cause: Option<ConnectionError>,
361 #[prometheus(flatten)]
362 connection: ConnectionLabels,
363}
364
365#[derive(EncodeLabelValue, Hash, Clone, Eq, PartialEq, Debug)]
366enum ConnectionError {
367 Io,
368 KeepAliveTimeout,
369}
370
371impl From<&libp2p_swarm::ConnectionError> for ConnectionError {
372 fn from(value: &libp2p_swarm::ConnectionError) -> Self {
373 match value {
374 libp2p_swarm::ConnectionError::IO(_) => ConnectionError::Io,
375 libp2p_swarm::ConnectionError::KeepAliveTimeout => ConnectionError::KeepAliveTimeout,
376 }
377 }
378}
379
380#[derive(EncodeLabelSet, Hash, Clone, Eq, PartialEq, Debug)]
381struct AddressLabels {
382 protocols: String,
383}
384
385#[derive(EncodeLabelValue, Hash, Clone, Eq, PartialEq, Debug)]
386enum Role {
387 Dialer,
388 Listener,
389}
390
391impl From<&libp2p_core::ConnectedPoint> for Role {
392 fn from(point: &libp2p_core::ConnectedPoint) -> Self {
393 match point {
394 libp2p_core::ConnectedPoint::Dialer { .. } => Role::Dialer,
395 libp2p_core::ConnectedPoint::Listener { .. } => Role::Listener,
396 }
397 }
398}
399
400#[derive(EncodeLabelSet, Hash, Clone, Eq, PartialEq, Debug)]
401struct OutgoingConnectionErrorLabels {
402 peer: PeerStatus,
403 error: OutgoingConnectionError,
404}
405
406#[derive(EncodeLabelValue, Hash, Clone, Eq, PartialEq, Copy, Debug)]
407enum PeerStatus {
408 Known,
409 Unknown,
410}
411
412#[derive(EncodeLabelValue, Hash, Clone, Eq, PartialEq, Debug)]
413enum OutgoingConnectionError {
414 LocalPeerId,
415 NoAddresses,
416 DialPeerConditionFalse,
417 Aborted,
418 WrongPeerId,
419 TransportMultiaddrNotSupported,
420 TransportOther,
421 Denied,
422}
423
424#[derive(EncodeLabelSet, Hash, Clone, Eq, PartialEq, Debug)]
425struct IncomingConnectionErrorLabels {
426 error: IncomingConnectionError,
427 protocols: String,
428}
429
430#[derive(EncodeLabelValue, Hash, Clone, Eq, PartialEq, Debug)]
431enum IncomingConnectionError {
432 WrongPeerId,
433 LocalPeerId,
434 TransportErrorMultiaddrNotSupported,
435 TransportErrorOther,
436 Aborted,
437 Denied,
438}
439
440impl From<&libp2p_swarm::ListenError> for IncomingConnectionError {
441 fn from(error: &libp2p_swarm::ListenError) -> Self {
442 match error {
443 libp2p_swarm::ListenError::WrongPeerId { .. } => IncomingConnectionError::WrongPeerId,
444 libp2p_swarm::ListenError::LocalPeerId { .. } => IncomingConnectionError::LocalPeerId,
445 libp2p_swarm::ListenError::Transport(
446 libp2p_core::transport::TransportError::MultiaddrNotSupported(_),
447 ) => IncomingConnectionError::TransportErrorMultiaddrNotSupported,
448 libp2p_swarm::ListenError::Transport(
449 libp2p_core::transport::TransportError::Other(_),
450 ) => IncomingConnectionError::TransportErrorOther,
451 libp2p_swarm::ListenError::Aborted => IncomingConnectionError::Aborted,
452 libp2p_swarm::ListenError::Denied { .. } => IncomingConnectionError::Denied,
453 }
454 }
455}