distributed_key_value_store_example/
main.rs1#![doc = include_str!("../README.md")]
22
23use std::error::Error;
24
25use futures::stream::StreamExt;
26use libp2p::{
27 kad,
28 kad::{store::MemoryStore, Mode},
29 mdns, noise,
30 swarm::{NetworkBehaviour, SwarmEvent},
31 tcp, yamux,
32};
33use tokio::{
34 io::{self, AsyncBufReadExt},
35 select,
36};
37use tracing_subscriber::EnvFilter;
38
39#[tokio::main]
40async fn main() -> Result<(), Box<dyn Error>> {
41 let _ = tracing_subscriber::fmt()
42 .with_env_filter(EnvFilter::from_default_env())
43 .try_init();
44
45 #[derive(NetworkBehaviour)]
47 struct Behaviour {
48 kademlia: kad::Behaviour<MemoryStore>,
49 mdns: mdns::tokio::Behaviour,
50 }
51
52 let mut swarm = libp2p::SwarmBuilder::with_new_identity()
53 .with_tokio()
54 .with_tcp(
55 tcp::Config::default(),
56 noise::Config::new,
57 yamux::Config::default,
58 )?
59 .with_behaviour(|key| {
60 Ok(Behaviour {
61 kademlia: kad::Behaviour::new(
62 key.public().to_peer_id(),
63 MemoryStore::new(key.public().to_peer_id()),
64 ),
65 mdns: mdns::tokio::Behaviour::new(
66 mdns::Config::default(),
67 key.public().to_peer_id(),
68 )?,
69 })
70 })?
71 .build();
72
73 swarm.behaviour_mut().kademlia.set_mode(Some(Mode::Server));
74
75 let mut stdin = io::BufReader::new(io::stdin()).lines();
77
78 swarm.listen_on("/ip4/0.0.0.0/tcp/0".parse()?)?;
80
81 loop {
83 select! {
84 Ok(Some(line)) = stdin.next_line() => {
85 handle_input_line(&mut swarm.behaviour_mut().kademlia, line);
86 }
87 event = swarm.select_next_some() => match event {
88 SwarmEvent::NewListenAddr { address, .. } => {
89 println!("Listening in {address:?}");
90 },
91 SwarmEvent::Behaviour(BehaviourEvent::Mdns(mdns::Event::Discovered(list))) => {
92 for (peer_id, multiaddr) in list {
93 swarm.behaviour_mut().kademlia.add_address(&peer_id, multiaddr);
94 }
95 }
96 SwarmEvent::Behaviour(BehaviourEvent::Kademlia(kad::Event::OutboundQueryProgressed { result, ..})) => {
97 match result {
98 kad::QueryResult::GetProviders(Ok(kad::GetProvidersOk::FoundProviders { key, providers, .. })) => {
99 for peer in providers {
100 println!(
101 "Peer {peer:?} provides key {:?}",
102 std::str::from_utf8(key.as_ref()).unwrap()
103 );
104 }
105 }
106 kad::QueryResult::GetProviders(Err(err)) => {
107 eprintln!("Failed to get providers: {err:?}");
108 }
109 kad::QueryResult::GetRecord(Ok(
110 kad::GetRecordOk::FoundRecord(kad::PeerRecord {
111 record: kad::Record { key, value, .. },
112 ..
113 })
114 )) => {
115 println!(
116 "Got record {:?} {:?}",
117 std::str::from_utf8(key.as_ref()).unwrap(),
118 std::str::from_utf8(&value).unwrap(),
119 );
120 }
121 kad::QueryResult::GetRecord(Ok(_)) => {}
122 kad::QueryResult::GetRecord(Err(err)) => {
123 eprintln!("Failed to get record: {err:?}");
124 }
125 kad::QueryResult::PutRecord(Ok(kad::PutRecordOk { key })) => {
126 println!(
127 "Successfully put record {:?}",
128 std::str::from_utf8(key.as_ref()).unwrap()
129 );
130 }
131 kad::QueryResult::PutRecord(Err(err)) => {
132 eprintln!("Failed to put record: {err:?}");
133 }
134 kad::QueryResult::StartProviding(Ok(kad::AddProviderOk { key })) => {
135 println!(
136 "Successfully put provider record {:?}",
137 std::str::from_utf8(key.as_ref()).unwrap()
138 );
139 }
140 kad::QueryResult::StartProviding(Err(err)) => {
141 eprintln!("Failed to put provider record: {err:?}");
142 }
143 _ => {}
144 }
145 }
146 _ => {}
147 }
148 }
149 }
150}
151
152fn handle_input_line(kademlia: &mut kad::Behaviour<MemoryStore>, line: String) {
153 let mut args = line.split(' ');
154
155 match args.next() {
156 Some("GET") => {
157 let key = {
158 match args.next() {
159 Some(key) => kad::RecordKey::new(&key),
160 None => {
161 eprintln!("Expected key");
162 return;
163 }
164 }
165 };
166 kademlia.get_record(key);
167 }
168 Some("GET_PROVIDERS") => {
169 let key = {
170 match args.next() {
171 Some(key) => kad::RecordKey::new(&key),
172 None => {
173 eprintln!("Expected key");
174 return;
175 }
176 }
177 };
178 kademlia.get_providers(key);
179 }
180 Some("PUT") => {
181 let key = {
182 match args.next() {
183 Some(key) => kad::RecordKey::new(&key),
184 None => {
185 eprintln!("Expected key");
186 return;
187 }
188 }
189 };
190 let value = {
191 match args.next() {
192 Some(value) => value.as_bytes().to_vec(),
193 None => {
194 eprintln!("Expected value");
195 return;
196 }
197 }
198 };
199 let record = kad::Record {
200 key,
201 value,
202 publisher: None,
203 expires: None,
204 };
205 kademlia
206 .put_record(record, kad::Quorum::One)
207 .expect("Failed to store record locally.");
208 }
209 Some("PUT_PROVIDER") => {
210 let key = {
211 match args.next() {
212 Some(key) => kad::RecordKey::new(&key),
213 None => {
214 eprintln!("Expected key");
215 return;
216 }
217 }
218 };
219
220 kademlia
221 .start_providing(key)
222 .expect("Failed to start providing key");
223 }
224 _ => {
225 eprintln!("expected GET, GET_PROVIDERS, PUT or PUT_PROVIDER");
226 }
227 }
228}