libp2p_metrics/
swarm.rs

1// Copyright 2021 Protocol Labs.
2//
3// Permission is hereby granted, free of charge, to any person obtaining a
4// copy of this software and associated documentation files (the "Software"),
5// to deal in the Software without restriction, including without limitation
6// the rights to use, copy, modify, merge, publish, distribute, sublicense,
7// and/or sell copies of the Software, and to permit persons to whom the
8// Software is furnished to do so, subject to the following conditions:
9//
10// The above copyright notice and this permission notice shall be included in
11// all copies or substantial portions of the Software.
12//
13// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
14// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
18// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
19// DEALINGS IN THE SOFTWARE.
20
21use std::{
22    collections::HashMap,
23    sync::{Arc, Mutex},
24};
25
26use libp2p_swarm::{ConnectionId, DialError, SwarmEvent};
27use prometheus_client::{
28    encoding::{EncodeLabelSet, EncodeLabelValue},
29    metrics::{
30        counter::Counter,
31        family::Family,
32        histogram::{exponential_buckets, Histogram},
33    },
34    registry::{Registry, Unit},
35};
36use web_time::Instant;
37
38use crate::protocol_stack;
39
40pub(crate) struct Metrics {
41    connections_incoming: Family<AddressLabels, Counter>,
42    connections_incoming_error: Family<IncomingConnectionErrorLabels, Counter>,
43
44    connections_established: Family<ConnectionLabels, Counter>,
45    connections_establishment_duration: Family<ConnectionLabels, Histogram>,
46    connections_duration: Family<ConnectionClosedLabels, Histogram>,
47
48    new_listen_addr: Family<AddressLabels, Counter>,
49    expired_listen_addr: Family<AddressLabels, Counter>,
50
51    external_addr_candidates: Family<AddressLabels, Counter>,
52    external_addr_confirmed: Family<AddressLabels, Counter>,
53    external_addr_expired: Family<AddressLabels, Counter>,
54
55    listener_closed: Family<AddressLabels, Counter>,
56    listener_error: Counter,
57
58    dial_attempt: Counter,
59    outgoing_connection_error: Family<OutgoingConnectionErrorLabels, Counter>,
60
61    connections: Arc<Mutex<HashMap<ConnectionId, Instant>>>,
62}
63
64impl Metrics {
65    pub(crate) fn new(registry: &mut Registry) -> Self {
66        let sub_registry = registry.sub_registry_with_prefix("swarm");
67
68        let connections_incoming = Family::default();
69        sub_registry.register(
70            "connections_incoming",
71            "Number of incoming connections per address stack",
72            connections_incoming.clone(),
73        );
74
75        let connections_incoming_error = Family::default();
76        sub_registry.register(
77            "connections_incoming_error",
78            "Number of incoming connection errors",
79            connections_incoming_error.clone(),
80        );
81
82        let new_listen_addr = Family::default();
83        sub_registry.register(
84            "new_listen_addr",
85            "Number of new listen addresses",
86            new_listen_addr.clone(),
87        );
88
89        let expired_listen_addr = Family::default();
90        sub_registry.register(
91            "expired_listen_addr",
92            "Number of expired listen addresses",
93            expired_listen_addr.clone(),
94        );
95
96        let external_addr_candidates = Family::default();
97        sub_registry.register(
98            "external_addr_candidates",
99            "Number of new external address candidates",
100            external_addr_candidates.clone(),
101        );
102
103        let external_addr_confirmed = Family::default();
104        sub_registry.register(
105            "external_addr_confirmed",
106            "Number of confirmed external addresses",
107            external_addr_confirmed.clone(),
108        );
109
110        let external_addr_expired = Family::default();
111        sub_registry.register(
112            "external_addr_expired",
113            "Number of expired external addresses",
114            external_addr_expired.clone(),
115        );
116
117        let listener_closed = Family::default();
118        sub_registry.register(
119            "listener_closed",
120            "Number of listeners closed",
121            listener_closed.clone(),
122        );
123
124        let listener_error = Counter::default();
125        sub_registry.register(
126            "listener_error",
127            "Number of listener errors",
128            listener_error.clone(),
129        );
130
131        let dial_attempt = Counter::default();
132        sub_registry.register(
133            "dial_attempt",
134            "Number of dial attempts",
135            dial_attempt.clone(),
136        );
137
138        let outgoing_connection_error = Family::default();
139        sub_registry.register(
140            "outgoing_connection_error",
141            "Number outgoing connection errors",
142            outgoing_connection_error.clone(),
143        );
144
145        let connections_established = Family::default();
146        sub_registry.register(
147            "connections_established",
148            "Number of connections established",
149            connections_established.clone(),
150        );
151
152        let connections_establishment_duration = {
153            let constructor: fn() -> Histogram =
154                || Histogram::new(exponential_buckets(0.01, 1.5, 20));
155            Family::new_with_constructor(constructor)
156        };
157        sub_registry.register(
158            "connections_establishment_duration",
159            "Time it took (locally) to establish connections",
160            connections_establishment_duration.clone(),
161        );
162
163        let connections_duration = {
164            let constructor: fn() -> Histogram =
165                || Histogram::new(exponential_buckets(0.01, 3.0, 20));
166            Family::new_with_constructor(constructor)
167        };
168        sub_registry.register_with_unit(
169            "connections_duration",
170            "Time a connection was alive",
171            Unit::Seconds,
172            connections_duration.clone(),
173        );
174
175        Self {
176            connections_incoming,
177            connections_incoming_error,
178            connections_established,
179            new_listen_addr,
180            expired_listen_addr,
181            external_addr_candidates,
182            external_addr_confirmed,
183            external_addr_expired,
184            listener_closed,
185            listener_error,
186            dial_attempt,
187            outgoing_connection_error,
188            connections_establishment_duration,
189            connections_duration,
190            connections: Default::default(),
191        }
192    }
193}
194
195impl<TBvEv> super::Recorder<SwarmEvent<TBvEv>> for Metrics {
196    fn record(&self, event: &SwarmEvent<TBvEv>) {
197        match event {
198            SwarmEvent::Behaviour(_) => {}
199            SwarmEvent::ConnectionEstablished {
200                endpoint,
201                established_in: time_taken,
202                connection_id,
203                ..
204            } => {
205                let labels = ConnectionLabels {
206                    role: endpoint.into(),
207                    protocols: protocol_stack::as_string(endpoint.get_remote_address()),
208                };
209                self.connections_established.get_or_create(&labels).inc();
210                self.connections_establishment_duration
211                    .get_or_create(&labels)
212                    .observe(time_taken.as_secs_f64());
213                self.connections
214                    .lock()
215                    .expect("lock not to be poisoned")
216                    .insert(*connection_id, Instant::now());
217            }
218            SwarmEvent::ConnectionClosed {
219                endpoint,
220                connection_id,
221                cause,
222                ..
223            } => {
224                let labels = ConnectionClosedLabels {
225                    connection: ConnectionLabels {
226                        role: endpoint.into(),
227                        protocols: protocol_stack::as_string(endpoint.get_remote_address()),
228                    },
229                    cause: cause.as_ref().map(Into::into),
230                };
231
232                // Only record connection duration if we have a record of when it was established.
233                // This gracefully handles cases where ConnectionClosed events are received
234                // for connections that were established before metrics collection started.
235                if let Some(established_time) = self
236                    .connections
237                    .lock()
238                    .expect("lock not to be poisoned")
239                    .remove(connection_id)
240                {
241                    self.connections_duration
242                        .get_or_create(&labels)
243                        .observe(established_time.elapsed().as_secs_f64());
244                }
245            }
246            SwarmEvent::IncomingConnection { send_back_addr, .. } => {
247                self.connections_incoming
248                    .get_or_create(&AddressLabels {
249                        protocols: protocol_stack::as_string(send_back_addr),
250                    })
251                    .inc();
252            }
253            SwarmEvent::IncomingConnectionError {
254                error,
255                send_back_addr,
256                ..
257            } => {
258                self.connections_incoming_error
259                    .get_or_create(&IncomingConnectionErrorLabels {
260                        error: error.into(),
261                        protocols: protocol_stack::as_string(send_back_addr),
262                    })
263                    .inc();
264            }
265            SwarmEvent::OutgoingConnectionError { error, peer_id, .. } => {
266                let peer = match peer_id {
267                    Some(_) => PeerStatus::Known,
268                    None => PeerStatus::Unknown,
269                };
270
271                let record = |error| {
272                    self.outgoing_connection_error
273                        .get_or_create(&OutgoingConnectionErrorLabels { peer, error })
274                        .inc();
275                };
276
277                match error {
278                    DialError::Transport(errors) => {
279                        for (_multiaddr, error) in errors {
280                            match error {
281                                libp2p_core::transport::TransportError::MultiaddrNotSupported(
282                                    _,
283                                ) => {
284                                    record(OutgoingConnectionError::TransportMultiaddrNotSupported)
285                                }
286                                libp2p_core::transport::TransportError::Other(_) => {
287                                    record(OutgoingConnectionError::TransportOther)
288                                }
289                            };
290                        }
291                    }
292                    DialError::LocalPeerId { .. } => record(OutgoingConnectionError::LocalPeerId),
293                    DialError::NoAddresses => record(OutgoingConnectionError::NoAddresses),
294                    DialError::DialPeerConditionFalse(_) => {
295                        record(OutgoingConnectionError::DialPeerConditionFalse)
296                    }
297                    DialError::Aborted => record(OutgoingConnectionError::Aborted),
298                    DialError::WrongPeerId { .. } => record(OutgoingConnectionError::WrongPeerId),
299                    DialError::Denied { .. } => record(OutgoingConnectionError::Denied),
300                };
301            }
302            SwarmEvent::NewListenAddr { address, .. } => {
303                self.new_listen_addr
304                    .get_or_create(&AddressLabels {
305                        protocols: protocol_stack::as_string(address),
306                    })
307                    .inc();
308            }
309            SwarmEvent::ExpiredListenAddr { address, .. } => {
310                self.expired_listen_addr
311                    .get_or_create(&AddressLabels {
312                        protocols: protocol_stack::as_string(address),
313                    })
314                    .inc();
315            }
316            SwarmEvent::ListenerClosed { addresses, .. } => {
317                for address in addresses {
318                    self.listener_closed
319                        .get_or_create(&AddressLabels {
320                            protocols: protocol_stack::as_string(address),
321                        })
322                        .inc();
323                }
324            }
325            SwarmEvent::ListenerError { .. } => {
326                self.listener_error.inc();
327            }
328            SwarmEvent::Dialing { .. } => {
329                self.dial_attempt.inc();
330            }
331            SwarmEvent::NewExternalAddrCandidate { address } => {
332                self.external_addr_candidates
333                    .get_or_create(&AddressLabels {
334                        protocols: protocol_stack::as_string(address),
335                    })
336                    .inc();
337            }
338            SwarmEvent::ExternalAddrConfirmed { address } => {
339                self.external_addr_confirmed
340                    .get_or_create(&AddressLabels {
341                        protocols: protocol_stack::as_string(address),
342                    })
343                    .inc();
344            }
345            SwarmEvent::ExternalAddrExpired { address } => {
346                self.external_addr_expired
347                    .get_or_create(&AddressLabels {
348                        protocols: protocol_stack::as_string(address),
349                    })
350                    .inc();
351            }
352            _ => {}
353        }
354    }
355}
356
357#[derive(EncodeLabelSet, Hash, Clone, Eq, PartialEq, Debug)]
358struct ConnectionLabels {
359    role: Role,
360    protocols: String,
361}
362
363#[derive(EncodeLabelSet, Hash, Clone, Eq, PartialEq, Debug)]
364struct ConnectionClosedLabels {
365    cause: Option<ConnectionError>,
366    #[prometheus(flatten)]
367    connection: ConnectionLabels,
368}
369
370#[derive(EncodeLabelValue, Hash, Clone, Eq, PartialEq, Debug)]
371enum ConnectionError {
372    Io,
373    KeepAliveTimeout,
374}
375
376impl From<&libp2p_swarm::ConnectionError> for ConnectionError {
377    fn from(value: &libp2p_swarm::ConnectionError) -> Self {
378        match value {
379            libp2p_swarm::ConnectionError::IO(_) => ConnectionError::Io,
380            libp2p_swarm::ConnectionError::KeepAliveTimeout => ConnectionError::KeepAliveTimeout,
381        }
382    }
383}
384
385#[derive(EncodeLabelSet, Hash, Clone, Eq, PartialEq, Debug)]
386struct AddressLabels {
387    protocols: String,
388}
389
390#[derive(EncodeLabelValue, Hash, Clone, Eq, PartialEq, Debug)]
391enum Role {
392    Dialer,
393    Listener,
394}
395
396impl From<&libp2p_core::ConnectedPoint> for Role {
397    fn from(point: &libp2p_core::ConnectedPoint) -> Self {
398        match point {
399            libp2p_core::ConnectedPoint::Dialer { .. } => Role::Dialer,
400            libp2p_core::ConnectedPoint::Listener { .. } => Role::Listener,
401        }
402    }
403}
404
405#[derive(EncodeLabelSet, Hash, Clone, Eq, PartialEq, Debug)]
406struct OutgoingConnectionErrorLabels {
407    peer: PeerStatus,
408    error: OutgoingConnectionError,
409}
410
411#[derive(EncodeLabelValue, Hash, Clone, Eq, PartialEq, Copy, Debug)]
412enum PeerStatus {
413    Known,
414    Unknown,
415}
416
417#[derive(EncodeLabelValue, Hash, Clone, Eq, PartialEq, Debug)]
418enum OutgoingConnectionError {
419    LocalPeerId,
420    NoAddresses,
421    DialPeerConditionFalse,
422    Aborted,
423    WrongPeerId,
424    TransportMultiaddrNotSupported,
425    TransportOther,
426    Denied,
427}
428
429#[derive(EncodeLabelSet, Hash, Clone, Eq, PartialEq, Debug)]
430struct IncomingConnectionErrorLabels {
431    error: IncomingConnectionError,
432    protocols: String,
433}
434
435#[derive(EncodeLabelValue, Hash, Clone, Eq, PartialEq, Debug)]
436enum IncomingConnectionError {
437    WrongPeerId,
438    LocalPeerId,
439    TransportErrorMultiaddrNotSupported,
440    TransportErrorOther,
441    Aborted,
442    Denied,
443}
444
445impl From<&libp2p_swarm::ListenError> for IncomingConnectionError {
446    fn from(error: &libp2p_swarm::ListenError) -> Self {
447        match error {
448            libp2p_swarm::ListenError::WrongPeerId { .. } => IncomingConnectionError::WrongPeerId,
449            libp2p_swarm::ListenError::LocalPeerId { .. } => IncomingConnectionError::LocalPeerId,
450            libp2p_swarm::ListenError::Transport(
451                libp2p_core::transport::TransportError::MultiaddrNotSupported(_),
452            ) => IncomingConnectionError::TransportErrorMultiaddrNotSupported,
453            libp2p_swarm::ListenError::Transport(
454                libp2p_core::transport::TransportError::Other(_),
455            ) => IncomingConnectionError::TransportErrorOther,
456            libp2p_swarm::ListenError::Aborted => IncomingConnectionError::Aborted,
457            libp2p_swarm::ListenError::Denied { .. } => IncomingConnectionError::Denied,
458        }
459    }
460}
461
#[cfg(test)]
mod tests {
    use std::{num::NonZeroU32, time::Duration};

    use libp2p_core::{transport::PortUse, ConnectedPoint, Endpoint};
    use libp2p_identity::PeerId;
    use libp2p_swarm::{ConnectionId, SwarmEvent};
    use prometheus_client::registry::Registry;

    use super::*;
    use crate::Recorder;

    /// Builds the dialer endpoint shared by the tests in this module.
    fn dialer_endpoint() -> ConnectedPoint {
        ConnectedPoint::Dialer {
            address: "/ip4/127.0.0.1/tcp/8080".parse().unwrap(),
            role_override: Endpoint::Dialer,
            port_use: PortUse::New,
        }
    }

    /// Builds a `ConnectionClosed` event without a cause for a random peer.
    fn closed_event(connection_id: ConnectionId, endpoint: ConnectedPoint) -> SwarmEvent<()> {
        SwarmEvent::ConnectionClosed {
            peer_id: PeerId::random(),
            connection_id,
            endpoint,
            num_established: 0,
            cause: None,
        }
    }

    /// Closing a connection that was never reported as established must be
    /// ignored rather than panic.
    #[test]
    fn test_connection_closed_without_established() {
        let mut registry = Registry::default();
        let metrics = Metrics::new(&mut registry);

        // The connection id below was never handed to the recorder.
        let untracked = closed_event(ConnectionId::new_unchecked(1), dialer_endpoint());

        // This should NOT panic.
        metrics.record(&untracked);

        // Nothing was tracked beforehand, so nothing may be tracked now.
        let tracked = metrics.connections.lock().expect("lock not to be poisoned");
        assert!(tracked.is_empty());
    }

    /// An established connection is tracked until its close event arrives.
    #[test]
    fn test_connection_established_then_closed() {
        let mut registry = Registry::default();
        let metrics = Metrics::new(&mut registry);

        let connection_id = ConnectionId::new_unchecked(1);
        let endpoint = dialer_endpoint();

        // Step 1: report the connection as established.
        let established = SwarmEvent::<()>::ConnectionEstablished {
            peer_id: PeerId::random(),
            connection_id,
            endpoint: endpoint.clone(),
            num_established: NonZeroU32::new(1).unwrap(),
            concurrent_dial_errors: None,
            established_in: Duration::from_millis(100),
        };
        metrics.record(&established);

        // The guard is scoped so the lock is released before the next
        // `record` call takes it again.
        {
            let tracked = metrics.connections.lock().expect("lock not to be poisoned");
            assert!(tracked.contains_key(&connection_id));
        }

        // Step 2: report the same connection as closed.
        metrics.record(&closed_event(connection_id, endpoint));

        // The start time must have been dropped from the tracking map.
        let tracked = metrics.connections.lock().expect("lock not to be poisoned");
        assert!(!tracked.contains_key(&connection_id));
    }
}
547}