libp2p_metrics/
swarm.rs

1// Copyright 2021 Protocol Labs.
2//
3// Permission is hereby granted, free of charge, to any person obtaining a
4// copy of this software and associated documentation files (the "Software"),
5// to deal in the Software without restriction, including without limitation
6// the rights to use, copy, modify, merge, publish, distribute, sublicense,
7// and/or sell copies of the Software, and to permit persons to whom the
8// Software is furnished to do so, subject to the following conditions:
9//
10// The above copyright notice and this permission notice shall be included in
11// all copies or substantial portions of the Software.
12//
13// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
14// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
18// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
19// DEALINGS IN THE SOFTWARE.
20
21use std::{
22    collections::HashMap,
23    sync::{Arc, Mutex},
24};
25
26use libp2p_swarm::{ConnectionId, DialError, SwarmEvent};
27use prometheus_client::{
28    encoding::{EncodeLabelSet, EncodeLabelValue},
29    metrics::{
30        counter::Counter,
31        family::Family,
32        histogram::{exponential_buckets, Histogram},
33    },
34    registry::{Registry, Unit},
35};
36use web_time::Instant;
37
38use crate::protocol_stack;
39
40pub(crate) struct Metrics {
41    connections_incoming: Family<AddressLabels, Counter>,
42    connections_incoming_error: Family<IncomingConnectionErrorLabels, Counter>,
43
44    connections_established: Family<ConnectionLabels, Counter>,
45    connections_establishment_duration: Family<ConnectionLabels, Histogram>,
46    connections_duration: Family<ConnectionClosedLabels, Histogram>,
47
48    new_listen_addr: Family<AddressLabels, Counter>,
49    expired_listen_addr: Family<AddressLabels, Counter>,
50
51    external_addr_candidates: Family<AddressLabels, Counter>,
52    external_addr_confirmed: Family<AddressLabels, Counter>,
53    external_addr_expired: Family<AddressLabels, Counter>,
54
55    listener_closed: Family<AddressLabels, Counter>,
56    listener_error: Counter,
57
58    dial_attempt: Counter,
59    outgoing_connection_error: Family<OutgoingConnectionErrorLabels, Counter>,
60
61    connections: Arc<Mutex<HashMap<ConnectionId, Instant>>>,
62}
63
64impl Metrics {
65    pub(crate) fn new(registry: &mut Registry) -> Self {
66        let sub_registry = registry.sub_registry_with_prefix("swarm");
67
68        let connections_incoming = Family::default();
69        sub_registry.register(
70            "connections_incoming",
71            "Number of incoming connections per address stack",
72            connections_incoming.clone(),
73        );
74
75        let connections_incoming_error = Family::default();
76        sub_registry.register(
77            "connections_incoming_error",
78            "Number of incoming connection errors",
79            connections_incoming_error.clone(),
80        );
81
82        let new_listen_addr = Family::default();
83        sub_registry.register(
84            "new_listen_addr",
85            "Number of new listen addresses",
86            new_listen_addr.clone(),
87        );
88
89        let expired_listen_addr = Family::default();
90        sub_registry.register(
91            "expired_listen_addr",
92            "Number of expired listen addresses",
93            expired_listen_addr.clone(),
94        );
95
96        let external_addr_candidates = Family::default();
97        sub_registry.register(
98            "external_addr_candidates",
99            "Number of new external address candidates",
100            external_addr_candidates.clone(),
101        );
102
103        let external_addr_confirmed = Family::default();
104        sub_registry.register(
105            "external_addr_confirmed",
106            "Number of confirmed external addresses",
107            external_addr_confirmed.clone(),
108        );
109
110        let external_addr_expired = Family::default();
111        sub_registry.register(
112            "external_addr_expired",
113            "Number of expired external addresses",
114            external_addr_expired.clone(),
115        );
116
117        let listener_closed = Family::default();
118        sub_registry.register(
119            "listener_closed",
120            "Number of listeners closed",
121            listener_closed.clone(),
122        );
123
124        let listener_error = Counter::default();
125        sub_registry.register(
126            "listener_error",
127            "Number of listener errors",
128            listener_error.clone(),
129        );
130
131        let dial_attempt = Counter::default();
132        sub_registry.register(
133            "dial_attempt",
134            "Number of dial attempts",
135            dial_attempt.clone(),
136        );
137
138        let outgoing_connection_error = Family::default();
139        sub_registry.register(
140            "outgoing_connection_error",
141            "Number outgoing connection errors",
142            outgoing_connection_error.clone(),
143        );
144
145        let connections_established = Family::default();
146        sub_registry.register(
147            "connections_established",
148            "Number of connections established",
149            connections_established.clone(),
150        );
151
152        let connections_establishment_duration = {
153            let constructor: fn() -> Histogram =
154                || Histogram::new(exponential_buckets(0.01, 1.5, 20));
155            Family::new_with_constructor(constructor)
156        };
157        sub_registry.register(
158            "connections_establishment_duration",
159            "Time it took (locally) to establish connections",
160            connections_establishment_duration.clone(),
161        );
162
163        let connections_duration = {
164            let constructor: fn() -> Histogram =
165                || Histogram::new(exponential_buckets(0.01, 3.0, 20));
166            Family::new_with_constructor(constructor)
167        };
168        sub_registry.register_with_unit(
169            "connections_duration",
170            "Time a connection was alive",
171            Unit::Seconds,
172            connections_duration.clone(),
173        );
174
175        Self {
176            connections_incoming,
177            connections_incoming_error,
178            connections_established,
179            new_listen_addr,
180            expired_listen_addr,
181            external_addr_candidates,
182            external_addr_confirmed,
183            external_addr_expired,
184            listener_closed,
185            listener_error,
186            dial_attempt,
187            outgoing_connection_error,
188            connections_establishment_duration,
189            connections_duration,
190            connections: Default::default(),
191        }
192    }
193}
194
195impl<TBvEv> super::Recorder<SwarmEvent<TBvEv>> for Metrics {
196    fn record(&self, event: &SwarmEvent<TBvEv>) {
197        match event {
198            SwarmEvent::Behaviour(_) => {}
199            SwarmEvent::ConnectionEstablished {
200                endpoint,
201                established_in: time_taken,
202                connection_id,
203                ..
204            } => {
205                let labels = ConnectionLabels {
206                    role: endpoint.into(),
207                    protocols: protocol_stack::as_string(endpoint.get_remote_address()),
208                };
209                self.connections_established.get_or_create(&labels).inc();
210                self.connections_establishment_duration
211                    .get_or_create(&labels)
212                    .observe(time_taken.as_secs_f64());
213                self.connections
214                    .lock()
215                    .expect("lock not to be poisoned")
216                    .insert(*connection_id, Instant::now());
217            }
218            SwarmEvent::ConnectionClosed {
219                endpoint,
220                connection_id,
221                cause,
222                ..
223            } => {
224                let labels = ConnectionClosedLabels {
225                    connection: ConnectionLabels {
226                        role: endpoint.into(),
227                        protocols: protocol_stack::as_string(endpoint.get_remote_address()),
228                    },
229                    cause: cause.as_ref().map(Into::into),
230                };
231                self.connections_duration.get_or_create(&labels).observe(
232                    self.connections
233                        .lock()
234                        .expect("lock not to be poisoned")
235                        .remove(connection_id)
236                        .expect("closed connection to previously be established")
237                        .elapsed()
238                        .as_secs_f64(),
239                );
240            }
241            SwarmEvent::IncomingConnection { send_back_addr, .. } => {
242                self.connections_incoming
243                    .get_or_create(&AddressLabels {
244                        protocols: protocol_stack::as_string(send_back_addr),
245                    })
246                    .inc();
247            }
248            SwarmEvent::IncomingConnectionError {
249                error,
250                send_back_addr,
251                ..
252            } => {
253                self.connections_incoming_error
254                    .get_or_create(&IncomingConnectionErrorLabels {
255                        error: error.into(),
256                        protocols: protocol_stack::as_string(send_back_addr),
257                    })
258                    .inc();
259            }
260            SwarmEvent::OutgoingConnectionError { error, peer_id, .. } => {
261                let peer = match peer_id {
262                    Some(_) => PeerStatus::Known,
263                    None => PeerStatus::Unknown,
264                };
265
266                let record = |error| {
267                    self.outgoing_connection_error
268                        .get_or_create(&OutgoingConnectionErrorLabels { peer, error })
269                        .inc();
270                };
271
272                match error {
273                    DialError::Transport(errors) => {
274                        for (_multiaddr, error) in errors {
275                            match error {
276                                libp2p_core::transport::TransportError::MultiaddrNotSupported(
277                                    _,
278                                ) => {
279                                    record(OutgoingConnectionError::TransportMultiaddrNotSupported)
280                                }
281                                libp2p_core::transport::TransportError::Other(_) => {
282                                    record(OutgoingConnectionError::TransportOther)
283                                }
284                            };
285                        }
286                    }
287                    DialError::LocalPeerId { .. } => record(OutgoingConnectionError::LocalPeerId),
288                    DialError::NoAddresses => record(OutgoingConnectionError::NoAddresses),
289                    DialError::DialPeerConditionFalse(_) => {
290                        record(OutgoingConnectionError::DialPeerConditionFalse)
291                    }
292                    DialError::Aborted => record(OutgoingConnectionError::Aborted),
293                    DialError::WrongPeerId { .. } => record(OutgoingConnectionError::WrongPeerId),
294                    DialError::Denied { .. } => record(OutgoingConnectionError::Denied),
295                };
296            }
297            SwarmEvent::NewListenAddr { address, .. } => {
298                self.new_listen_addr
299                    .get_or_create(&AddressLabels {
300                        protocols: protocol_stack::as_string(address),
301                    })
302                    .inc();
303            }
304            SwarmEvent::ExpiredListenAddr { address, .. } => {
305                self.expired_listen_addr
306                    .get_or_create(&AddressLabels {
307                        protocols: protocol_stack::as_string(address),
308                    })
309                    .inc();
310            }
311            SwarmEvent::ListenerClosed { addresses, .. } => {
312                for address in addresses {
313                    self.listener_closed
314                        .get_or_create(&AddressLabels {
315                            protocols: protocol_stack::as_string(address),
316                        })
317                        .inc();
318                }
319            }
320            SwarmEvent::ListenerError { .. } => {
321                self.listener_error.inc();
322            }
323            SwarmEvent::Dialing { .. } => {
324                self.dial_attempt.inc();
325            }
326            SwarmEvent::NewExternalAddrCandidate { address } => {
327                self.external_addr_candidates
328                    .get_or_create(&AddressLabels {
329                        protocols: protocol_stack::as_string(address),
330                    })
331                    .inc();
332            }
333            SwarmEvent::ExternalAddrConfirmed { address } => {
334                self.external_addr_confirmed
335                    .get_or_create(&AddressLabels {
336                        protocols: protocol_stack::as_string(address),
337                    })
338                    .inc();
339            }
340            SwarmEvent::ExternalAddrExpired { address } => {
341                self.external_addr_expired
342                    .get_or_create(&AddressLabels {
343                        protocols: protocol_stack::as_string(address),
344                    })
345                    .inc();
346            }
347            _ => {}
348        }
349    }
350}
351
352#[derive(EncodeLabelSet, Hash, Clone, Eq, PartialEq, Debug)]
353struct ConnectionLabels {
354    role: Role,
355    protocols: String,
356}
357
358#[derive(EncodeLabelSet, Hash, Clone, Eq, PartialEq, Debug)]
359struct ConnectionClosedLabels {
360    cause: Option<ConnectionError>,
361    #[prometheus(flatten)]
362    connection: ConnectionLabels,
363}
364
365#[derive(EncodeLabelValue, Hash, Clone, Eq, PartialEq, Debug)]
366enum ConnectionError {
367    Io,
368    KeepAliveTimeout,
369}
370
371impl From<&libp2p_swarm::ConnectionError> for ConnectionError {
372    fn from(value: &libp2p_swarm::ConnectionError) -> Self {
373        match value {
374            libp2p_swarm::ConnectionError::IO(_) => ConnectionError::Io,
375            libp2p_swarm::ConnectionError::KeepAliveTimeout => ConnectionError::KeepAliveTimeout,
376        }
377    }
378}
379
380#[derive(EncodeLabelSet, Hash, Clone, Eq, PartialEq, Debug)]
381struct AddressLabels {
382    protocols: String,
383}
384
385#[derive(EncodeLabelValue, Hash, Clone, Eq, PartialEq, Debug)]
386enum Role {
387    Dialer,
388    Listener,
389}
390
391impl From<&libp2p_core::ConnectedPoint> for Role {
392    fn from(point: &libp2p_core::ConnectedPoint) -> Self {
393        match point {
394            libp2p_core::ConnectedPoint::Dialer { .. } => Role::Dialer,
395            libp2p_core::ConnectedPoint::Listener { .. } => Role::Listener,
396        }
397    }
398}
399
400#[derive(EncodeLabelSet, Hash, Clone, Eq, PartialEq, Debug)]
401struct OutgoingConnectionErrorLabels {
402    peer: PeerStatus,
403    error: OutgoingConnectionError,
404}
405
406#[derive(EncodeLabelValue, Hash, Clone, Eq, PartialEq, Copy, Debug)]
407enum PeerStatus {
408    Known,
409    Unknown,
410}
411
412#[derive(EncodeLabelValue, Hash, Clone, Eq, PartialEq, Debug)]
413enum OutgoingConnectionError {
414    LocalPeerId,
415    NoAddresses,
416    DialPeerConditionFalse,
417    Aborted,
418    WrongPeerId,
419    TransportMultiaddrNotSupported,
420    TransportOther,
421    Denied,
422}
423
424#[derive(EncodeLabelSet, Hash, Clone, Eq, PartialEq, Debug)]
425struct IncomingConnectionErrorLabels {
426    error: IncomingConnectionError,
427    protocols: String,
428}
429
430#[derive(EncodeLabelValue, Hash, Clone, Eq, PartialEq, Debug)]
431enum IncomingConnectionError {
432    WrongPeerId,
433    LocalPeerId,
434    TransportErrorMultiaddrNotSupported,
435    TransportErrorOther,
436    Aborted,
437    Denied,
438}
439
440impl From<&libp2p_swarm::ListenError> for IncomingConnectionError {
441    fn from(error: &libp2p_swarm::ListenError) -> Self {
442        match error {
443            libp2p_swarm::ListenError::WrongPeerId { .. } => IncomingConnectionError::WrongPeerId,
444            libp2p_swarm::ListenError::LocalPeerId { .. } => IncomingConnectionError::LocalPeerId,
445            libp2p_swarm::ListenError::Transport(
446                libp2p_core::transport::TransportError::MultiaddrNotSupported(_),
447            ) => IncomingConnectionError::TransportErrorMultiaddrNotSupported,
448            libp2p_swarm::ListenError::Transport(
449                libp2p_core::transport::TransportError::Other(_),
450            ) => IncomingConnectionError::TransportErrorOther,
451            libp2p_swarm::ListenError::Aborted => IncomingConnectionError::Aborted,
452            libp2p_swarm::ListenError::Denied { .. } => IncomingConnectionError::Denied,
453        }
454    }
455}