libp2p_metrics/
identify.rs1use std::{
22 collections::HashMap,
23 sync::{Arc, Mutex},
24};
25
26use libp2p_identity::PeerId;
27use libp2p_swarm::StreamProtocol;
28use prometheus_client::{
29 collector::Collector,
30 encoding::{DescriptorEncoder, EncodeMetric},
31 metrics::{counter::Counter, gauge::ConstGauge, MetricType},
32 registry::Registry,
33};
34
35use crate::protocol_stack;
36
37const ALLOWED_PROTOCOLS: &[StreamProtocol] = &[
38 #[cfg(feature = "dcutr")]
39 libp2p_dcutr::PROTOCOL_NAME,
40 libp2p_identify::PROTOCOL_NAME,
43 libp2p_identify::PUSH_PROTOCOL_NAME,
44 #[cfg(feature = "kad")]
45 libp2p_kad::PROTOCOL_NAME,
46 #[cfg(feature = "ping")]
47 libp2p_ping::PROTOCOL_NAME,
48 #[cfg(feature = "relay")]
49 libp2p_relay::STOP_PROTOCOL_NAME,
50 #[cfg(feature = "relay")]
51 libp2p_relay::HOP_PROTOCOL_NAME,
52];
53
54pub(crate) struct Metrics {
55 peers: Peers,
56 error: Counter,
57 pushed: Counter,
58 received: Counter,
59 sent: Counter,
60}
61
62impl Metrics {
63 pub(crate) fn new(registry: &mut Registry) -> Self {
64 let sub_registry = registry.sub_registry_with_prefix("identify");
65
66 let peers = Peers::default();
67 sub_registry.register_collector(Box::new(peers.clone()));
68
69 let error = Counter::default();
70 sub_registry.register(
71 "errors",
72 "Number of errors while attempting to identify the remote",
73 error.clone(),
74 );
75
76 let pushed = Counter::default();
77 sub_registry.register(
78 "pushed",
79 "Number of times identification information of the local node has \
80 been actively pushed to a peer.",
81 pushed.clone(),
82 );
83
84 let received = Counter::default();
85 sub_registry.register(
86 "received",
87 "Number of times identification information has been received from \
88 a peer",
89 received.clone(),
90 );
91
92 let sent = Counter::default();
93 sub_registry.register(
94 "sent",
95 "Number of times identification information of the local node has \
96 been sent to a peer in response to an identification request",
97 sent.clone(),
98 );
99
100 Self {
101 peers,
102 error,
103 pushed,
104 received,
105 sent,
106 }
107 }
108}
109
110impl super::Recorder<libp2p_identify::Event> for Metrics {
111 fn record(&self, event: &libp2p_identify::Event) {
112 match event {
113 libp2p_identify::Event::Error { .. } => {
114 self.error.inc();
115 }
116 libp2p_identify::Event::Pushed { .. } => {
117 self.pushed.inc();
118 }
119 libp2p_identify::Event::Received { peer_id, info, .. } => {
120 self.received.inc();
121 self.peers.record(*peer_id, info.clone());
122 }
123 libp2p_identify::Event::Sent { .. } => {
124 self.sent.inc();
125 }
126 }
127 }
128}
129
130impl<TBvEv> super::Recorder<libp2p_swarm::SwarmEvent<TBvEv>> for Metrics {
131 fn record(&self, event: &libp2p_swarm::SwarmEvent<TBvEv>) {
132 if let libp2p_swarm::SwarmEvent::ConnectionClosed {
133 peer_id,
134 num_established,
135 ..
136 } = event
137 {
138 if *num_established == 0 {
139 self.peers.remove(*peer_id);
140 }
141 }
142 }
143}
144
145#[derive(Default, Debug, Clone)]
146struct Peers(Arc<Mutex<HashMap<PeerId, libp2p_identify::Info>>>);
147
148impl Peers {
149 fn record(&self, peer_id: PeerId, info: libp2p_identify::Info) {
150 self.0.lock().unwrap().insert(peer_id, info);
151 }
152
153 fn remove(&self, peer_id: PeerId) {
154 self.0.lock().unwrap().remove(&peer_id);
155 }
156}
157
158impl Collector for Peers {
159 fn encode(&self, mut encoder: DescriptorEncoder) -> Result<(), std::fmt::Error> {
160 let mut count_by_protocols: HashMap<String, i64> = Default::default();
161 let mut count_by_listen_addresses: HashMap<String, i64> = Default::default();
162 let mut count_by_observed_addresses: HashMap<String, i64> = Default::default();
163
164 for (_, peer_info) in self.0.lock().unwrap().iter() {
165 {
166 let mut protocols: Vec<_> = peer_info
167 .protocols
168 .iter()
169 .map(|p| {
170 if ALLOWED_PROTOCOLS.contains(p) {
171 p.to_string()
172 } else {
173 "unrecognized".to_string()
174 }
175 })
176 .collect();
177 protocols.sort();
178 protocols.dedup();
179
180 for protocol in protocols.into_iter() {
181 let count = count_by_protocols.entry(protocol).or_default();
182 *count += 1;
183 }
184 }
185
186 {
187 let mut addrs: Vec<_> = peer_info
188 .listen_addrs
189 .iter()
190 .map(protocol_stack::as_string)
191 .collect();
192 addrs.sort();
193 addrs.dedup();
194
195 for addr in addrs {
196 let count = count_by_listen_addresses.entry(addr).or_default();
197 *count += 1;
198 }
199 }
200
201 {
202 let count = count_by_observed_addresses
203 .entry(protocol_stack::as_string(&peer_info.observed_addr))
204 .or_default();
205 *count += 1;
206 }
207 }
208
209 {
210 let mut family_encoder = encoder.encode_descriptor(
211 "remote_protocols",
212 "Number of connected nodes supporting a specific protocol, with \"unrecognized\" for each peer supporting one or more unrecognized protocols",
213 None,
214 MetricType::Gauge,
215 )?;
216 for (protocol, count) in count_by_protocols.into_iter() {
217 let labels = [("protocol", protocol)];
218 let metric_encoder = family_encoder.encode_family(&labels)?;
219 let metric = ConstGauge::new(count);
220 metric.encode(metric_encoder)?;
221 }
222 }
223
224 {
225 let mut family_encoder = encoder.encode_descriptor(
226 "remote_listen_addresses",
227 "Number of connected nodes advertising a specific listen address",
228 None,
229 MetricType::Gauge,
230 )?;
231 for (protocol, count) in count_by_listen_addresses.into_iter() {
232 let labels = [("listen_address", protocol)];
233 let metric_encoder = family_encoder.encode_family(&labels)?;
234 ConstGauge::new(count).encode(metric_encoder)?;
235 }
236 }
237
238 {
239 let mut family_encoder = encoder.encode_descriptor(
240 "local_observed_addresses",
241 "Number of connected nodes observing the local node at a specific address",
242 None,
243 MetricType::Gauge,
244 )?;
245 for (protocol, count) in count_by_observed_addresses.into_iter() {
246 let labels = [("observed_address", protocol)];
247 let metric_encoder = family_encoder.encode_family(&labels)?;
248 ConstGauge::new(count).encode(metric_encoder)?;
249 }
250 }
251
252 Ok(())
253 }
254}