1use std::{
22 collections::HashMap,
23 sync::{Arc, Mutex},
24};
25
26use libp2p_swarm::{ConnectionId, DialError, SwarmEvent};
27use prometheus_client::{
28 encoding::{EncodeLabelSet, EncodeLabelValue},
29 metrics::{
30 counter::Counter,
31 family::Family,
32 histogram::{exponential_buckets, Histogram},
33 },
34 registry::{Registry, Unit},
35};
36use web_time::Instant;
37
38use crate::protocol_stack;
39
/// Prometheus metrics derived from swarm events.
///
/// All metrics are created and registered by [`Metrics::new`] in a sub-registry
/// with the `swarm` prefix and are updated from the `Recorder` implementation.
pub(crate) struct Metrics {
    /// Counts `IncomingConnection` events, labelled by the send-back address.
    connections_incoming: Family<AddressLabels, Counter>,
    /// Counts `IncomingConnectionError` events, labelled by error kind and
    /// send-back address.
    connections_incoming_error: Family<IncomingConnectionErrorLabels, Counter>,

    /// Counts `ConnectionEstablished` events per role and remote address.
    connections_established: Family<ConnectionLabels, Counter>,
    /// Observes the `established_in` duration of `ConnectionEstablished` events,
    /// in seconds.
    connections_establishment_duration: Family<ConnectionLabels, Histogram>,
    /// Observes, in seconds, the time elapsed between recording a connection's
    /// `ConnectionEstablished` event and recording its `ConnectionClosed` event.
    connections_duration: Family<ConnectionClosedLabels, Histogram>,

    /// Counts `NewListenAddr` events per address.
    new_listen_addr: Family<AddressLabels, Counter>,
    /// Counts `ExpiredListenAddr` events per address.
    expired_listen_addr: Family<AddressLabels, Counter>,

    /// Counts `NewExternalAddrCandidate` events per address.
    external_addr_candidates: Family<AddressLabels, Counter>,
    /// Counts `ExternalAddrConfirmed` events per address.
    external_addr_confirmed: Family<AddressLabels, Counter>,
    /// Counts `ExternalAddrExpired` events per address.
    external_addr_expired: Family<AddressLabels, Counter>,

    /// Incremented once for every address carried by a `ListenerClosed` event.
    listener_closed: Family<AddressLabels, Counter>,
    /// Counts `ListenerError` events.
    listener_error: Counter,

    /// Counts `Dialing` events.
    dial_attempt: Counter,
    /// Counts outgoing connection errors, labelled by peer status and error kind.
    outgoing_connection_error: Family<OutgoingConnectionErrorLabels, Counter>,

    /// Instant at which each connection's `ConnectionEstablished` event was
    /// recorded, keyed by connection id. An entry is removed again when the
    /// matching `ConnectionClosed` event is recorded.
    connections: Arc<Mutex<HashMap<ConnectionId, Instant>>>,
}
63
64impl Metrics {
65 pub(crate) fn new(registry: &mut Registry) -> Self {
66 let sub_registry = registry.sub_registry_with_prefix("swarm");
67
68 let connections_incoming = Family::default();
69 sub_registry.register(
70 "connections_incoming",
71 "Number of incoming connections per address stack",
72 connections_incoming.clone(),
73 );
74
75 let connections_incoming_error = Family::default();
76 sub_registry.register(
77 "connections_incoming_error",
78 "Number of incoming connection errors",
79 connections_incoming_error.clone(),
80 );
81
82 let new_listen_addr = Family::default();
83 sub_registry.register(
84 "new_listen_addr",
85 "Number of new listen addresses",
86 new_listen_addr.clone(),
87 );
88
89 let expired_listen_addr = Family::default();
90 sub_registry.register(
91 "expired_listen_addr",
92 "Number of expired listen addresses",
93 expired_listen_addr.clone(),
94 );
95
96 let external_addr_candidates = Family::default();
97 sub_registry.register(
98 "external_addr_candidates",
99 "Number of new external address candidates",
100 external_addr_candidates.clone(),
101 );
102
103 let external_addr_confirmed = Family::default();
104 sub_registry.register(
105 "external_addr_confirmed",
106 "Number of confirmed external addresses",
107 external_addr_confirmed.clone(),
108 );
109
110 let external_addr_expired = Family::default();
111 sub_registry.register(
112 "external_addr_expired",
113 "Number of expired external addresses",
114 external_addr_expired.clone(),
115 );
116
117 let listener_closed = Family::default();
118 sub_registry.register(
119 "listener_closed",
120 "Number of listeners closed",
121 listener_closed.clone(),
122 );
123
124 let listener_error = Counter::default();
125 sub_registry.register(
126 "listener_error",
127 "Number of listener errors",
128 listener_error.clone(),
129 );
130
131 let dial_attempt = Counter::default();
132 sub_registry.register(
133 "dial_attempt",
134 "Number of dial attempts",
135 dial_attempt.clone(),
136 );
137
138 let outgoing_connection_error = Family::default();
139 sub_registry.register(
140 "outgoing_connection_error",
141 "Number outgoing connection errors",
142 outgoing_connection_error.clone(),
143 );
144
145 let connections_established = Family::default();
146 sub_registry.register(
147 "connections_established",
148 "Number of connections established",
149 connections_established.clone(),
150 );
151
152 let connections_establishment_duration = {
153 let constructor: fn() -> Histogram =
154 || Histogram::new(exponential_buckets(0.01, 1.5, 20));
155 Family::new_with_constructor(constructor)
156 };
157 sub_registry.register(
158 "connections_establishment_duration",
159 "Time it took (locally) to establish connections",
160 connections_establishment_duration.clone(),
161 );
162
163 let connections_duration = {
164 let constructor: fn() -> Histogram =
165 || Histogram::new(exponential_buckets(0.01, 3.0, 20));
166 Family::new_with_constructor(constructor)
167 };
168 sub_registry.register_with_unit(
169 "connections_duration",
170 "Time a connection was alive",
171 Unit::Seconds,
172 connections_duration.clone(),
173 );
174
175 Self {
176 connections_incoming,
177 connections_incoming_error,
178 connections_established,
179 new_listen_addr,
180 expired_listen_addr,
181 external_addr_candidates,
182 external_addr_confirmed,
183 external_addr_expired,
184 listener_closed,
185 listener_error,
186 dial_attempt,
187 outgoing_connection_error,
188 connections_establishment_duration,
189 connections_duration,
190 connections: Default::default(),
191 }
192 }
193}
194
195impl<TBvEv> super::Recorder<SwarmEvent<TBvEv>> for Metrics {
196 fn record(&self, event: &SwarmEvent<TBvEv>) {
197 match event {
198 SwarmEvent::Behaviour(_) => {}
199 SwarmEvent::ConnectionEstablished {
200 endpoint,
201 established_in: time_taken,
202 connection_id,
203 ..
204 } => {
205 let labels = ConnectionLabels {
206 role: endpoint.into(),
207 protocols: protocol_stack::as_string(endpoint.get_remote_address()),
208 };
209 self.connections_established.get_or_create(&labels).inc();
210 self.connections_establishment_duration
211 .get_or_create(&labels)
212 .observe(time_taken.as_secs_f64());
213 self.connections
214 .lock()
215 .expect("lock not to be poisoned")
216 .insert(*connection_id, Instant::now());
217 }
218 SwarmEvent::ConnectionClosed {
219 endpoint,
220 connection_id,
221 cause,
222 ..
223 } => {
224 let labels = ConnectionClosedLabels {
225 connection: ConnectionLabels {
226 role: endpoint.into(),
227 protocols: protocol_stack::as_string(endpoint.get_remote_address()),
228 },
229 cause: cause.as_ref().map(Into::into),
230 };
231
232 if let Some(established_time) = self
236 .connections
237 .lock()
238 .expect("lock not to be poisoned")
239 .remove(connection_id)
240 {
241 self.connections_duration
242 .get_or_create(&labels)
243 .observe(established_time.elapsed().as_secs_f64());
244 }
245 }
246 SwarmEvent::IncomingConnection { send_back_addr, .. } => {
247 self.connections_incoming
248 .get_or_create(&AddressLabels {
249 protocols: protocol_stack::as_string(send_back_addr),
250 })
251 .inc();
252 }
253 SwarmEvent::IncomingConnectionError {
254 error,
255 send_back_addr,
256 ..
257 } => {
258 self.connections_incoming_error
259 .get_or_create(&IncomingConnectionErrorLabels {
260 error: error.into(),
261 protocols: protocol_stack::as_string(send_back_addr),
262 })
263 .inc();
264 }
265 SwarmEvent::OutgoingConnectionError { error, peer_id, .. } => {
266 let peer = match peer_id {
267 Some(_) => PeerStatus::Known,
268 None => PeerStatus::Unknown,
269 };
270
271 let record = |error| {
272 self.outgoing_connection_error
273 .get_or_create(&OutgoingConnectionErrorLabels { peer, error })
274 .inc();
275 };
276
277 match error {
278 DialError::Transport(errors) => {
279 for (_multiaddr, error) in errors {
280 match error {
281 libp2p_core::transport::TransportError::MultiaddrNotSupported(
282 _,
283 ) => {
284 record(OutgoingConnectionError::TransportMultiaddrNotSupported)
285 }
286 libp2p_core::transport::TransportError::Other(_) => {
287 record(OutgoingConnectionError::TransportOther)
288 }
289 };
290 }
291 }
292 DialError::LocalPeerId { .. } => record(OutgoingConnectionError::LocalPeerId),
293 DialError::NoAddresses => record(OutgoingConnectionError::NoAddresses),
294 DialError::DialPeerConditionFalse(_) => {
295 record(OutgoingConnectionError::DialPeerConditionFalse)
296 }
297 DialError::Aborted => record(OutgoingConnectionError::Aborted),
298 DialError::WrongPeerId { .. } => record(OutgoingConnectionError::WrongPeerId),
299 DialError::Denied { .. } => record(OutgoingConnectionError::Denied),
300 };
301 }
302 SwarmEvent::NewListenAddr { address, .. } => {
303 self.new_listen_addr
304 .get_or_create(&AddressLabels {
305 protocols: protocol_stack::as_string(address),
306 })
307 .inc();
308 }
309 SwarmEvent::ExpiredListenAddr { address, .. } => {
310 self.expired_listen_addr
311 .get_or_create(&AddressLabels {
312 protocols: protocol_stack::as_string(address),
313 })
314 .inc();
315 }
316 SwarmEvent::ListenerClosed { addresses, .. } => {
317 for address in addresses {
318 self.listener_closed
319 .get_or_create(&AddressLabels {
320 protocols: protocol_stack::as_string(address),
321 })
322 .inc();
323 }
324 }
325 SwarmEvent::ListenerError { .. } => {
326 self.listener_error.inc();
327 }
328 SwarmEvent::Dialing { .. } => {
329 self.dial_attempt.inc();
330 }
331 SwarmEvent::NewExternalAddrCandidate { address } => {
332 self.external_addr_candidates
333 .get_or_create(&AddressLabels {
334 protocols: protocol_stack::as_string(address),
335 })
336 .inc();
337 }
338 SwarmEvent::ExternalAddrConfirmed { address } => {
339 self.external_addr_confirmed
340 .get_or_create(&AddressLabels {
341 protocols: protocol_stack::as_string(address),
342 })
343 .inc();
344 }
345 SwarmEvent::ExternalAddrExpired { address } => {
346 self.external_addr_expired
347 .get_or_create(&AddressLabels {
348 protocols: protocol_stack::as_string(address),
349 })
350 .inc();
351 }
352 _ => {}
353 }
354 }
355}
356
/// Label set describing a connection.
#[derive(EncodeLabelSet, Hash, Clone, Eq, PartialEq, Debug)]
struct ConnectionLabels {
    /// Whether the local node dialed or listened, derived from the connected point.
    role: Role,
    /// Output of `protocol_stack::as_string` for the connection's remote address.
    protocols: String,
}
362
/// Label set used when observing the duration of a closed connection.
#[derive(EncodeLabelSet, Hash, Clone, Eq, PartialEq, Debug)]
struct ConnectionClosedLabels {
    /// Why the connection was closed; `None` when the event carries no cause.
    cause: Option<ConnectionError>,
    /// Labels of the connection itself, marked `flatten` for the label-set derive.
    #[prometheus(flatten)]
    connection: ConnectionLabels,
}
369
/// Label value mirroring the variants of `libp2p_swarm::ConnectionError`.
#[derive(EncodeLabelValue, Hash, Clone, Eq, PartialEq, Debug)]
enum ConnectionError {
    /// Corresponds to `libp2p_swarm::ConnectionError::IO`.
    Io,
    /// Corresponds to `libp2p_swarm::ConnectionError::KeepAliveTimeout`.
    KeepAliveTimeout,
}
375
376impl From<&libp2p_swarm::ConnectionError> for ConnectionError {
377 fn from(value: &libp2p_swarm::ConnectionError) -> Self {
378 match value {
379 libp2p_swarm::ConnectionError::IO(_) => ConnectionError::Io,
380 libp2p_swarm::ConnectionError::KeepAliveTimeout => ConnectionError::KeepAliveTimeout,
381 }
382 }
383}
384
/// Label set for metrics keyed only by an address.
#[derive(EncodeLabelSet, Hash, Clone, Eq, PartialEq, Debug)]
struct AddressLabels {
    /// Output of `protocol_stack::as_string` for the address in question.
    protocols: String,
}
389
/// Label value for the local side's role in a connection.
#[derive(EncodeLabelValue, Hash, Clone, Eq, PartialEq, Debug)]
enum Role {
    /// The connected point is `ConnectedPoint::Dialer`.
    Dialer,
    /// The connected point is `ConnectedPoint::Listener`.
    Listener,
}
395
396impl From<&libp2p_core::ConnectedPoint> for Role {
397 fn from(point: &libp2p_core::ConnectedPoint) -> Self {
398 match point {
399 libp2p_core::ConnectedPoint::Dialer { .. } => Role::Dialer,
400 libp2p_core::ConnectedPoint::Listener { .. } => Role::Listener,
401 }
402 }
403}
404
/// Label set for the outgoing connection error counter.
#[derive(EncodeLabelSet, Hash, Clone, Eq, PartialEq, Debug)]
struct OutgoingConnectionErrorLabels {
    /// Whether the failed dial had a peer id attached.
    peer: PeerStatus,
    /// Kind of error that caused the dial to fail.
    error: OutgoingConnectionError,
}
410
/// Label value telling whether an outgoing connection error carried a peer id.
#[derive(EncodeLabelValue, Hash, Clone, Eq, PartialEq, Copy, Debug)]
enum PeerStatus {
    /// The event's `peer_id` was `Some`.
    Known,
    /// The event's `peer_id` was `None`.
    Unknown,
}
416
/// Label value for the kind of a `DialError`, without any payload.
///
/// The two `Transport*` variants split `DialError::Transport` by the kind of
/// each contained transport error.
#[derive(EncodeLabelValue, Hash, Clone, Eq, PartialEq, Debug)]
enum OutgoingConnectionError {
    /// Corresponds to `DialError::LocalPeerId`.
    LocalPeerId,
    /// Corresponds to `DialError::NoAddresses`.
    NoAddresses,
    /// Corresponds to `DialError::DialPeerConditionFalse`.
    DialPeerConditionFalse,
    /// Corresponds to `DialError::Aborted`.
    Aborted,
    /// Corresponds to `DialError::WrongPeerId`.
    WrongPeerId,
    /// A transport error of kind `TransportError::MultiaddrNotSupported`.
    TransportMultiaddrNotSupported,
    /// A transport error of kind `TransportError::Other`.
    TransportOther,
    /// Corresponds to `DialError::Denied`.
    Denied,
}
428
/// Label set for the incoming connection error counter.
#[derive(EncodeLabelSet, Hash, Clone, Eq, PartialEq, Debug)]
struct IncomingConnectionErrorLabels {
    /// Kind of error that made the incoming connection fail.
    error: IncomingConnectionError,
    /// Output of `protocol_stack::as_string` for the event's send-back address.
    protocols: String,
}
434
/// Label value for the kind of a `libp2p_swarm::ListenError`, without any payload.
#[derive(EncodeLabelValue, Hash, Clone, Eq, PartialEq, Debug)]
enum IncomingConnectionError {
    /// Corresponds to `ListenError::WrongPeerId`.
    WrongPeerId,
    /// Corresponds to `ListenError::LocalPeerId`.
    LocalPeerId,
    /// `ListenError::Transport` wrapping `TransportError::MultiaddrNotSupported`.
    TransportErrorMultiaddrNotSupported,
    /// `ListenError::Transport` wrapping `TransportError::Other`.
    TransportErrorOther,
    /// Corresponds to `ListenError::Aborted`.
    Aborted,
    /// Corresponds to `ListenError::Denied`.
    Denied,
}
444
445impl From<&libp2p_swarm::ListenError> for IncomingConnectionError {
446 fn from(error: &libp2p_swarm::ListenError) -> Self {
447 match error {
448 libp2p_swarm::ListenError::WrongPeerId { .. } => IncomingConnectionError::WrongPeerId,
449 libp2p_swarm::ListenError::LocalPeerId { .. } => IncomingConnectionError::LocalPeerId,
450 libp2p_swarm::ListenError::Transport(
451 libp2p_core::transport::TransportError::MultiaddrNotSupported(_),
452 ) => IncomingConnectionError::TransportErrorMultiaddrNotSupported,
453 libp2p_swarm::ListenError::Transport(
454 libp2p_core::transport::TransportError::Other(_),
455 ) => IncomingConnectionError::TransportErrorOther,
456 libp2p_swarm::ListenError::Aborted => IncomingConnectionError::Aborted,
457 libp2p_swarm::ListenError::Denied { .. } => IncomingConnectionError::Denied,
458 }
459 }
460}
461
#[cfg(test)]
mod tests {
    use std::time::Duration;

    use libp2p_core::ConnectedPoint;
    use libp2p_swarm::{ConnectionId, SwarmEvent};
    use prometheus_client::registry::Registry;

    use super::*;
    use crate::Recorder;

    /// Builds a fresh set of metrics backed by a throwaway registry.
    fn new_metrics() -> Metrics {
        let mut registry = Registry::default();
        Metrics::new(&mut registry)
    }

    /// Dialer endpoint shared by all tests.
    fn dialer_endpoint() -> ConnectedPoint {
        ConnectedPoint::Dialer {
            address: "/ip4/127.0.0.1/tcp/8080".parse().unwrap(),
            role_override: libp2p_core::Endpoint::Dialer,
            port_use: libp2p_core::transport::PortUse::New,
        }
    }

    /// `ConnectionClosed` event without a cause for the given connection.
    fn closed_event(connection_id: ConnectionId, endpoint: ConnectedPoint) -> SwarmEvent<()> {
        SwarmEvent::ConnectionClosed {
            peer_id: libp2p_identity::PeerId::random(),
            connection_id,
            endpoint,
            num_established: 0,
            cause: None,
        }
    }

    /// Whether `metrics` currently tracks a start time for `connection_id`.
    fn is_tracked(metrics: &Metrics, connection_id: &ConnectionId) -> bool {
        metrics
            .connections
            .lock()
            .expect("lock not to be poisoned")
            .contains_key(connection_id)
    }

    #[test]
    fn test_connection_closed_without_established() {
        let metrics = new_metrics();
        let connection_id = ConnectionId::new_unchecked(1);

        // Closing a connection that was never seen as established must be a no-op.
        metrics.record(&closed_event(connection_id, dialer_endpoint()));

        let tracked = metrics.connections.lock().expect("lock not to be poisoned");
        assert!(tracked.is_empty());
    }

    #[test]
    fn test_connection_established_then_closed() {
        let metrics = new_metrics();
        let connection_id = ConnectionId::new_unchecked(1);
        let endpoint = dialer_endpoint();

        let established_event = SwarmEvent::<()>::ConnectionEstablished {
            peer_id: libp2p_identity::PeerId::random(),
            connection_id,
            endpoint: endpoint.clone(),
            num_established: std::num::NonZeroU32::new(1).unwrap(),
            concurrent_dial_errors: None,
            established_in: Duration::from_millis(100),
        };
        metrics.record(&established_event);

        // The connection is tracked once established ...
        assert!(is_tracked(&metrics, &connection_id));

        metrics.record(&closed_event(connection_id, endpoint));

        // ... and forgotten again once closed.
        assert!(!is_tracked(&metrics, &connection_id));
    }
}