1#![doc = include_str!("../README.md")]
22
23use std::{
24 num::NonZeroUsize,
25 ops::Add,
26 time::{Duration, Instant},
27};
28
29use anyhow::{bail, Result};
30use clap::Parser;
31use futures::StreamExt;
32use libp2p::{
33 bytes::BufMut,
34 identity, kad, noise,
35 swarm::{StreamProtocol, SwarmEvent},
36 tcp, yamux, PeerId,
37};
38use tracing_subscriber::EnvFilter;
39
40const BOOTNODES: [&str; 4] = [
41 "QmNnooDu7bfjPFoTZYxMNLWUQJyrVwtbZg5gBMjTezGAJN",
42 "QmQCU2EcMqAqQPR2i9bChDtGNJchTbq5TbXJJ16u19uLTa",
43 "QmbLHAnMoJPWSCR5Zhtx6BHJX9KiKNN6tpvbUcqanj75Nb",
44 "QmcZf59bWwK5XFi76CZX8cbJ4BhTzzA3gU1ZjYZcYW3dwt",
45];
46
47const IPFS_PROTO_NAME: StreamProtocol = StreamProtocol::new("/ipfs/kad/1.0.0");
48
49#[tokio::main]
50async fn main() -> Result<()> {
51 let _ = tracing_subscriber::fmt()
52 .with_env_filter(EnvFilter::from_default_env())
53 .try_init();
54
55 let local_key = identity::Keypair::generate_ed25519();
57
58 let mut swarm = libp2p::SwarmBuilder::with_existing_identity(local_key.clone())
59 .with_tokio()
60 .with_tcp(
61 tcp::Config::default(),
62 noise::Config::new,
63 yamux::Config::default,
64 )?
65 .with_dns()?
66 .with_behaviour(|key| {
67 let mut cfg = kad::Config::new(IPFS_PROTO_NAME);
69 cfg.set_query_timeout(Duration::from_secs(5 * 60));
70 let store = kad::store::MemoryStore::new(key.public().to_peer_id());
71 kad::Behaviour::with_config(key.public().to_peer_id(), store, cfg)
72 })?
73 .build();
74
75 for peer in &BOOTNODES {
79 swarm
80 .behaviour_mut()
81 .add_address(&peer.parse()?, "/dnsaddr/bootstrap.libp2p.io".parse()?);
82 }
83
84 let cli_opt = Opt::parse();
85
86 match cli_opt.argument {
87 CliArgument::GetPeers { peer_id } => {
88 let peer_id = peer_id.unwrap_or(PeerId::random());
89 println!("Searching for the closest peers to {peer_id}");
90 swarm.behaviour_mut().get_closest_peers(peer_id);
91 }
92 CliArgument::PutPkRecord {} => {
93 println!("Putting PK record into the DHT");
94
95 let mut pk_record_key = vec![];
96 pk_record_key.put_slice("/pk/".as_bytes());
97 pk_record_key.put_slice(swarm.local_peer_id().to_bytes().as_slice());
98
99 let mut pk_record =
100 kad::Record::new(pk_record_key, local_key.public().encode_protobuf());
101 pk_record.publisher = Some(*swarm.local_peer_id());
102 pk_record.expires = Some(Instant::now().add(Duration::from_secs(60)));
103
104 swarm
105 .behaviour_mut()
106 .put_record(pk_record, kad::Quorum::N(NonZeroUsize::new(3).unwrap()))?;
107 }
108 }
109
110 loop {
111 let event = swarm.select_next_some().await;
112
113 match event {
114 SwarmEvent::Behaviour(kad::Event::OutboundQueryProgressed {
115 result: kad::QueryResult::GetClosestPeers(Ok(ok)),
116 ..
117 }) => {
118 if ok.peers.is_empty() {
121 bail!("Query finished with no closest peers.")
122 }
123
124 println!("Query finished with closest peers: {:#?}", ok.peers);
125
126 return Ok(());
127 }
128 SwarmEvent::Behaviour(kad::Event::OutboundQueryProgressed {
129 result:
130 kad::QueryResult::GetClosestPeers(Err(kad::GetClosestPeersError::Timeout {
131 ..
132 })),
133 ..
134 }) => {
135 bail!("Query for closest peers timed out")
136 }
137 SwarmEvent::Behaviour(kad::Event::OutboundQueryProgressed {
138 result: kad::QueryResult::PutRecord(Ok(_)),
139 ..
140 }) => {
141 println!("Successfully inserted the PK record");
142
143 return Ok(());
144 }
145 SwarmEvent::Behaviour(kad::Event::OutboundQueryProgressed {
146 result: kad::QueryResult::PutRecord(Err(err)),
147 ..
148 }) => {
149 bail!(anyhow::Error::new(err).context("Failed to insert the PK record"));
150 }
151 _ => {}
152 }
153 }
154}
155
156#[derive(Parser, Debug)]
157#[command(name = "libp2p Kademlia DHT example")]
158struct Opt {
159 #[command(subcommand)]
160 argument: CliArgument,
161}
162
163#[derive(Debug, Parser)]
164enum CliArgument {
165 GetPeers {
166 #[arg(long)]
167 peer_id: Option<PeerId>,
168 },
169 PutPkRecord {},
170}