libp2p_metrics/
identify.rs

1// Copyright 2021 Protocol Labs.
2//
3// Permission is hereby granted, free of charge, to any person obtaining a
4// copy of this software and associated documentation files (the "Software"),
5// to deal in the Software without restriction, including without limitation
6// the rights to use, copy, modify, merge, publish, distribute, sublicense,
7// and/or sell copies of the Software, and to permit persons to whom the
8// Software is furnished to do so, subject to the following conditions:
9//
10// The above copyright notice and this permission notice shall be included in
11// all copies or substantial portions of the Software.
12//
13// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
14// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
18// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
19// DEALINGS IN THE SOFTWARE.
20
21use std::{
22    collections::HashMap,
23    sync::{Arc, Mutex},
24};
25
26use libp2p_identity::PeerId;
27use libp2p_swarm::StreamProtocol;
28use prometheus_client::{
29    collector::Collector,
30    encoding::{DescriptorEncoder, EncodeMetric},
31    metrics::{counter::Counter, gauge::ConstGauge, MetricType},
32    registry::Registry,
33};
34
35use crate::protocol_stack;
36
37const ALLOWED_PROTOCOLS: &[StreamProtocol] = &[
38    #[cfg(feature = "dcutr")]
39    libp2p_dcutr::PROTOCOL_NAME,
40    // #[cfg(feature = "gossipsub")]
41    // TODO: Add Gossipsub protocol name
42    libp2p_identify::PROTOCOL_NAME,
43    libp2p_identify::PUSH_PROTOCOL_NAME,
44    #[cfg(feature = "kad")]
45    libp2p_kad::PROTOCOL_NAME,
46    #[cfg(feature = "ping")]
47    libp2p_ping::PROTOCOL_NAME,
48    #[cfg(feature = "relay")]
49    libp2p_relay::STOP_PROTOCOL_NAME,
50    #[cfg(feature = "relay")]
51    libp2p_relay::HOP_PROTOCOL_NAME,
52];
53
54pub(crate) struct Metrics {
55    peers: Peers,
56    error: Counter,
57    pushed: Counter,
58    received: Counter,
59    sent: Counter,
60}
61
62impl Metrics {
63    pub(crate) fn new(registry: &mut Registry) -> Self {
64        let sub_registry = registry.sub_registry_with_prefix("identify");
65
66        let peers = Peers::default();
67        sub_registry.register_collector(Box::new(peers.clone()));
68
69        let error = Counter::default();
70        sub_registry.register(
71            "errors",
72            "Number of errors while attempting to identify the remote",
73            error.clone(),
74        );
75
76        let pushed = Counter::default();
77        sub_registry.register(
78            "pushed",
79            "Number of times identification information of the local node has \
80             been actively pushed to a peer.",
81            pushed.clone(),
82        );
83
84        let received = Counter::default();
85        sub_registry.register(
86            "received",
87            "Number of times identification information has been received from \
88             a peer",
89            received.clone(),
90        );
91
92        let sent = Counter::default();
93        sub_registry.register(
94            "sent",
95            "Number of times identification information of the local node has \
96             been sent to a peer in response to an identification request",
97            sent.clone(),
98        );
99
100        Self {
101            peers,
102            error,
103            pushed,
104            received,
105            sent,
106        }
107    }
108}
109
110impl super::Recorder<libp2p_identify::Event> for Metrics {
111    fn record(&self, event: &libp2p_identify::Event) {
112        match event {
113            libp2p_identify::Event::Error { .. } => {
114                self.error.inc();
115            }
116            libp2p_identify::Event::Pushed { .. } => {
117                self.pushed.inc();
118            }
119            libp2p_identify::Event::Received { peer_id, info, .. } => {
120                self.received.inc();
121                self.peers.record(*peer_id, info.clone());
122            }
123            libp2p_identify::Event::Sent { .. } => {
124                self.sent.inc();
125            }
126        }
127    }
128}
129
130impl<TBvEv> super::Recorder<libp2p_swarm::SwarmEvent<TBvEv>> for Metrics {
131    fn record(&self, event: &libp2p_swarm::SwarmEvent<TBvEv>) {
132        if let libp2p_swarm::SwarmEvent::ConnectionClosed {
133            peer_id,
134            num_established,
135            ..
136        } = event
137        {
138            if *num_established == 0 {
139                self.peers.remove(*peer_id);
140            }
141        }
142    }
143}
144
145#[derive(Default, Debug, Clone)]
146struct Peers(Arc<Mutex<HashMap<PeerId, libp2p_identify::Info>>>);
147
148impl Peers {
149    fn record(&self, peer_id: PeerId, info: libp2p_identify::Info) {
150        self.0.lock().unwrap().insert(peer_id, info);
151    }
152
153    fn remove(&self, peer_id: PeerId) {
154        self.0.lock().unwrap().remove(&peer_id);
155    }
156}
157
158impl Collector for Peers {
159    fn encode(&self, mut encoder: DescriptorEncoder) -> Result<(), std::fmt::Error> {
160        let mut count_by_protocols: HashMap<String, i64> = Default::default();
161        let mut count_by_listen_addresses: HashMap<String, i64> = Default::default();
162        let mut count_by_observed_addresses: HashMap<String, i64> = Default::default();
163
164        for (_, peer_info) in self.0.lock().unwrap().iter() {
165            {
166                let mut protocols: Vec<_> = peer_info
167                    .protocols
168                    .iter()
169                    .map(|p| {
170                        if ALLOWED_PROTOCOLS.contains(p) {
171                            p.to_string()
172                        } else {
173                            "unrecognized".to_string()
174                        }
175                    })
176                    .collect();
177                protocols.sort();
178                protocols.dedup();
179
180                for protocol in protocols.into_iter() {
181                    let count = count_by_protocols.entry(protocol).or_default();
182                    *count += 1;
183                }
184            }
185
186            {
187                let mut addrs: Vec<_> = peer_info
188                    .listen_addrs
189                    .iter()
190                    .map(protocol_stack::as_string)
191                    .collect();
192                addrs.sort();
193                addrs.dedup();
194
195                for addr in addrs {
196                    let count = count_by_listen_addresses.entry(addr).or_default();
197                    *count += 1;
198                }
199            }
200
201            {
202                let count = count_by_observed_addresses
203                    .entry(protocol_stack::as_string(&peer_info.observed_addr))
204                    .or_default();
205                *count += 1;
206            }
207        }
208
209        {
210            let mut family_encoder = encoder.encode_descriptor(
211                "remote_protocols",
212                "Number of connected nodes supporting a specific protocol, with \"unrecognized\" for each peer supporting one or more unrecognized protocols",
213                None,
214                MetricType::Gauge,
215            )?;
216            for (protocol, count) in count_by_protocols.into_iter() {
217                let labels = [("protocol", protocol)];
218                let metric_encoder = family_encoder.encode_family(&labels)?;
219                let metric = ConstGauge::new(count);
220                metric.encode(metric_encoder)?;
221            }
222        }
223
224        {
225            let mut family_encoder = encoder.encode_descriptor(
226                "remote_listen_addresses",
227                "Number of connected nodes advertising a specific listen address",
228                None,
229                MetricType::Gauge,
230            )?;
231            for (protocol, count) in count_by_listen_addresses.into_iter() {
232                let labels = [("listen_address", protocol)];
233                let metric_encoder = family_encoder.encode_family(&labels)?;
234                ConstGauge::new(count).encode(metric_encoder)?;
235            }
236        }
237
238        {
239            let mut family_encoder = encoder.encode_descriptor(
240                "local_observed_addresses",
241                "Number of connected nodes observing the local node at a specific address",
242                None,
243                MetricType::Gauge,
244            )?;
245            for (protocol, count) in count_by_observed_addresses.into_iter() {
246                let labels = [("observed_address", protocol)];
247                let metric_encoder = family_encoder.encode_family(&labels)?;
248                ConstGauge::new(count).encode(metric_encoder)?;
249            }
250        }
251
252        Ok(())
253    }
254}