1use std::{error::Error, path::PathBuf, str::FromStr};
2
3use base64::Engine;
4use clap::Parser;
5use futures::stream::StreamExt;
6use libp2p::{
7 identify, identity,
8 identity::PeerId,
9 kad,
10 metrics::{Metrics, Recorder},
11 noise,
12 swarm::SwarmEvent,
13 tcp, yamux,
14};
15use prometheus_client::{metrics::info::Info, registry::Registry};
16use tracing_subscriber::EnvFilter;
17use zeroize::Zeroizing;
18
19mod behaviour;
20mod config;
21mod http_service;
22
23#[derive(Debug, Parser)]
24#[command(name = "libp2p server", about = "A rust-libp2p server binary.")]
25struct Opts {
26 #[arg(long)]
28 config: PathBuf,
29
30 #[arg(long, default_value = "/metrics")]
32 metrics_path: String,
33
34 #[arg(long)]
36 enable_kademlia: bool,
37
38 #[arg(long)]
40 enable_autonat: bool,
41}
42
43#[tokio::main]
44async fn main() -> Result<(), Box<dyn Error>> {
45 let _ = tracing_subscriber::fmt()
46 .with_env_filter(EnvFilter::from_default_env())
47 .try_init();
48
49 let opt = Opts::parse();
50
51 let config = Zeroizing::new(config::Config::from_file(opt.config.as_path())?);
52
53 let mut metric_registry = Registry::default();
54
55 let local_keypair = {
56 let keypair = identity::Keypair::from_protobuf_encoding(&Zeroizing::new(
57 base64::engine::general_purpose::STANDARD
58 .decode(config.identity.priv_key.as_bytes())?,
59 ))?;
60
61 let peer_id = keypair.public().into();
62 assert_eq!(
63 PeerId::from_str(&config.identity.peer_id)?,
64 peer_id,
65 "Expect peer id derived from private key and peer id retrieved from config to match."
66 );
67
68 keypair
69 };
70
71 let mut swarm = libp2p::SwarmBuilder::with_existing_identity(local_keypair)
72 .with_tokio()
73 .with_tcp(
74 tcp::Config::default().nodelay(true),
75 noise::Config::new,
76 yamux::Config::default,
77 )?
78 .with_quic()
79 .with_dns()?
80 .with_websocket(noise::Config::new, yamux::Config::default)
81 .await?
82 .with_bandwidth_metrics(&mut metric_registry)
83 .with_behaviour(|key| {
84 behaviour::Behaviour::new(key.public(), opt.enable_kademlia, opt.enable_autonat)
85 })?
86 .build();
87
88 if config.addresses.swarm.is_empty() {
89 tracing::warn!("No listen addresses configured");
90 }
91 for address in &config.addresses.swarm {
92 match swarm.listen_on(address.clone()) {
93 Ok(_) => {}
94 Err(e @ libp2p::TransportError::MultiaddrNotSupported(_)) => {
95 tracing::warn!(%address, "Failed to listen on address, continuing anyways, {e}")
96 }
97 Err(e) => return Err(e.into()),
98 }
99 }
100
101 if config.addresses.append_announce.is_empty() {
102 tracing::warn!("No external addresses configured");
103 }
104 for address in &config.addresses.append_announce {
105 swarm.add_external_address(address.clone())
106 }
107 tracing::info!(
108 "External addresses: {:?}",
109 swarm.external_addresses().collect::<Vec<_>>()
110 );
111
112 let metrics = Metrics::new(&mut metric_registry);
113 let build_info = Info::new(vec![("version".to_string(), env!("CARGO_PKG_VERSION"))]);
114 metric_registry.register(
115 "build",
116 "A metric with a constant '1' value labeled by version",
117 build_info,
118 );
119 tokio::spawn(async move {
120 if let Err(e) = http_service::metrics_server(metric_registry, opt.metrics_path).await {
121 tracing::error!("Metrics server failed: {e}");
122 }
123 });
124
125 loop {
126 let event = swarm.next().await.expect("Swarm not to terminate.");
127 metrics.record(&event);
128 match event {
129 SwarmEvent::Behaviour(behaviour::BehaviourEvent::Identify(e)) => {
130 tracing::info!("{:?}", e);
131 metrics.record(&e);
132
133 if let identify::Event::Received {
134 peer_id,
135 info:
136 identify::Info {
137 listen_addrs,
138 protocols,
139 ..
140 },
141 ..
142 } = e
143 {
144 if protocols.iter().any(|p| *p == kad::PROTOCOL_NAME) {
145 for addr in listen_addrs {
146 swarm
147 .behaviour_mut()
148 .kademlia
149 .as_mut()
150 .map(|k| k.add_address(&peer_id, addr));
151 }
152 }
153 }
154 }
155 SwarmEvent::Behaviour(behaviour::BehaviourEvent::Ping(e)) => {
156 tracing::debug!("{:?}", e);
157 metrics.record(&e);
158 }
159 SwarmEvent::Behaviour(behaviour::BehaviourEvent::Kademlia(e)) => {
160 tracing::debug!("{:?}", e);
161 metrics.record(&e);
162 }
163 SwarmEvent::Behaviour(behaviour::BehaviourEvent::Relay(e)) => {
164 tracing::info!("{:?}", e);
165 metrics.record(&e)
166 }
167 SwarmEvent::Behaviour(behaviour::BehaviourEvent::Autonat(e)) => {
168 tracing::info!("{:?}", e);
169 }
172 SwarmEvent::NewListenAddr { address, .. } => {
173 tracing::info!(%address, "Listening on address");
174 }
175 _ => {}
176 }
177 }
178}