libp2p_server/
main.rs

1use std::{error::Error, path::PathBuf, str::FromStr};
2
3use base64::Engine;
4use clap::Parser;
5use futures::stream::StreamExt;
6use libp2p::{
7    identify, identity,
8    identity::PeerId,
9    kad,
10    metrics::{Metrics, Recorder},
11    noise,
12    swarm::SwarmEvent,
13    tcp, yamux,
14};
15use prometheus_client::{metrics::info::Info, registry::Registry};
16use tracing_subscriber::EnvFilter;
17use zeroize::Zeroizing;
18
19mod behaviour;
20mod config;
21mod http_service;
22
23#[derive(Debug, Parser)]
24#[command(name = "libp2p server", about = "A rust-libp2p server binary.")]
25struct Opts {
26    /// Path to IPFS config file.
27    #[arg(long)]
28    config: PathBuf,
29
30    /// Metric endpoint path.
31    #[arg(long, default_value = "/metrics")]
32    metrics_path: String,
33
34    /// Whether to run the libp2p Kademlia protocol and join the IPFS DHT.
35    #[arg(long)]
36    enable_kademlia: bool,
37
38    /// Whether to run the libp2p Autonat protocol.
39    #[arg(long)]
40    enable_autonat: bool,
41}
42
43#[tokio::main]
44async fn main() -> Result<(), Box<dyn Error>> {
45    let _ = tracing_subscriber::fmt()
46        .with_env_filter(EnvFilter::from_default_env())
47        .try_init();
48
49    let opt = Opts::parse();
50
51    let config = Zeroizing::new(config::Config::from_file(opt.config.as_path())?);
52
53    let mut metric_registry = Registry::default();
54
55    let local_keypair = {
56        let keypair = identity::Keypair::from_protobuf_encoding(&Zeroizing::new(
57            base64::engine::general_purpose::STANDARD
58                .decode(config.identity.priv_key.as_bytes())?,
59        ))?;
60
61        let peer_id = keypair.public().into();
62        assert_eq!(
63            PeerId::from_str(&config.identity.peer_id)?,
64            peer_id,
65            "Expect peer id derived from private key and peer id retrieved from config to match."
66        );
67
68        keypair
69    };
70
71    let mut swarm = libp2p::SwarmBuilder::with_existing_identity(local_keypair)
72        .with_tokio()
73        .with_tcp(
74            tcp::Config::default().nodelay(true),
75            noise::Config::new,
76            yamux::Config::default,
77        )?
78        .with_quic()
79        .with_dns()?
80        .with_websocket(noise::Config::new, yamux::Config::default)
81        .await?
82        .with_bandwidth_metrics(&mut metric_registry)
83        .with_behaviour(|key| {
84            behaviour::Behaviour::new(key.public(), opt.enable_kademlia, opt.enable_autonat)
85        })?
86        .build();
87
88    if config.addresses.swarm.is_empty() {
89        tracing::warn!("No listen addresses configured");
90    }
91    for address in &config.addresses.swarm {
92        match swarm.listen_on(address.clone()) {
93            Ok(_) => {}
94            Err(e @ libp2p::TransportError::MultiaddrNotSupported(_)) => {
95                tracing::warn!(%address, "Failed to listen on address, continuing anyways, {e}")
96            }
97            Err(e) => return Err(e.into()),
98        }
99    }
100
101    if config.addresses.append_announce.is_empty() {
102        tracing::warn!("No external addresses configured");
103    }
104    for address in &config.addresses.append_announce {
105        swarm.add_external_address(address.clone())
106    }
107    tracing::info!(
108        "External addresses: {:?}",
109        swarm.external_addresses().collect::<Vec<_>>()
110    );
111
112    let metrics = Metrics::new(&mut metric_registry);
113    let build_info = Info::new(vec![("version".to_string(), env!("CARGO_PKG_VERSION"))]);
114    metric_registry.register(
115        "build",
116        "A metric with a constant '1' value labeled by version",
117        build_info,
118    );
119    tokio::spawn(async move {
120        if let Err(e) = http_service::metrics_server(metric_registry, opt.metrics_path).await {
121            tracing::error!("Metrics server failed: {e}");
122        }
123    });
124
125    loop {
126        let event = swarm.next().await.expect("Swarm not to terminate.");
127        metrics.record(&event);
128        match event {
129            SwarmEvent::Behaviour(behaviour::BehaviourEvent::Identify(e)) => {
130                tracing::info!("{:?}", e);
131                metrics.record(&e);
132
133                if let identify::Event::Received {
134                    peer_id,
135                    info:
136                        identify::Info {
137                            listen_addrs,
138                            protocols,
139                            ..
140                        },
141                    ..
142                } = e
143                {
144                    if protocols.iter().any(|p| *p == kad::PROTOCOL_NAME) {
145                        for addr in listen_addrs {
146                            swarm
147                                .behaviour_mut()
148                                .kademlia
149                                .as_mut()
150                                .map(|k| k.add_address(&peer_id, addr));
151                        }
152                    }
153                }
154            }
155            SwarmEvent::Behaviour(behaviour::BehaviourEvent::Ping(e)) => {
156                tracing::debug!("{:?}", e);
157                metrics.record(&e);
158            }
159            SwarmEvent::Behaviour(behaviour::BehaviourEvent::Kademlia(e)) => {
160                tracing::debug!("{:?}", e);
161                metrics.record(&e);
162            }
163            SwarmEvent::Behaviour(behaviour::BehaviourEvent::Relay(e)) => {
164                tracing::info!("{:?}", e);
165                metrics.record(&e)
166            }
167            SwarmEvent::Behaviour(behaviour::BehaviourEvent::Autonat(e)) => {
168                tracing::info!("{:?}", e);
169                // TODO: Add metric recording for `NatStatus`.
170                // metrics.record(&e)
171            }
172            SwarmEvent::NewListenAddr { address, .. } => {
173                tracing::info!(%address, "Listening on address");
174            }
175            _ => {}
176        }
177    }
178}