distributed_key_value_store_example/
main.rs

1// Copyright 20l9 Parity Technologies (UK) Ltd.
2//
3// Permission is hereby granted, free of charge, to any person obtaining a
4// copy of this software and associated documentation files (the "Software"),
5// to deal in the Software without restriction, including without limitation
6// the rights to use, copy, modify, merge, publish, distribute, sublicense,
7// and/or sell copies of the Software, and to permit persons to whom the
8// Software is furnished to do so, subject to the following conditions:
9//
10// The above copyright notice and this permission notice shall be included in
11// all copies or substantial portions of the Software.
12//
13// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
14// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
18// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
19// DEALINGS IN THE SOFTWARE.
20
21#![doc = include_str!("../README.md")]
22
23use std::error::Error;
24
25use futures::stream::StreamExt;
26use libp2p::{
27    kad,
28    kad::{store::MemoryStore, Mode},
29    mdns, noise,
30    swarm::{NetworkBehaviour, SwarmEvent},
31    tcp, yamux,
32};
33use tokio::{
34    io::{self, AsyncBufReadExt},
35    select,
36};
37use tracing_subscriber::EnvFilter;
38
39#[tokio::main]
40async fn main() -> Result<(), Box<dyn Error>> {
41    let _ = tracing_subscriber::fmt()
42        .with_env_filter(EnvFilter::from_default_env())
43        .try_init();
44
45    // We create a custom network behaviour that combines Kademlia and mDNS.
46    #[derive(NetworkBehaviour)]
47    struct Behaviour {
48        kademlia: kad::Behaviour<MemoryStore>,
49        mdns: mdns::tokio::Behaviour,
50    }
51
52    let mut swarm = libp2p::SwarmBuilder::with_new_identity()
53        .with_tokio()
54        .with_tcp(
55            tcp::Config::default(),
56            noise::Config::new,
57            yamux::Config::default,
58        )?
59        .with_behaviour(|key| {
60            Ok(Behaviour {
61                kademlia: kad::Behaviour::new(
62                    key.public().to_peer_id(),
63                    MemoryStore::new(key.public().to_peer_id()),
64                ),
65                mdns: mdns::tokio::Behaviour::new(
66                    mdns::Config::default(),
67                    key.public().to_peer_id(),
68                )?,
69            })
70        })?
71        .build();
72
73    swarm.behaviour_mut().kademlia.set_mode(Some(Mode::Server));
74
75    // Read full lines from stdin
76    let mut stdin = io::BufReader::new(io::stdin()).lines();
77
78    // Listen on all interfaces and whatever port the OS assigns.
79    swarm.listen_on("/ip4/0.0.0.0/tcp/0".parse()?)?;
80
81    // Kick it off.
82    loop {
83        select! {
84        Ok(Some(line)) = stdin.next_line() => {
85            handle_input_line(&mut swarm.behaviour_mut().kademlia, line);
86        }
87        event = swarm.select_next_some() => match event {
88            SwarmEvent::NewListenAddr { address, .. } => {
89                println!("Listening in {address:?}");
90            },
91            SwarmEvent::Behaviour(BehaviourEvent::Mdns(mdns::Event::Discovered(list))) => {
92                for (peer_id, multiaddr) in list {
93                    swarm.behaviour_mut().kademlia.add_address(&peer_id, multiaddr);
94                }
95            }
96            SwarmEvent::Behaviour(BehaviourEvent::Kademlia(kad::Event::OutboundQueryProgressed { result, ..})) => {
97                match result {
98                    kad::QueryResult::GetProviders(Ok(kad::GetProvidersOk::FoundProviders { key, providers, .. })) => {
99                        for peer in providers {
100                            println!(
101                                "Peer {peer:?} provides key {:?}",
102                                std::str::from_utf8(key.as_ref()).unwrap()
103                            );
104                        }
105                    }
106                    kad::QueryResult::GetProviders(Err(err)) => {
107                        eprintln!("Failed to get providers: {err:?}");
108                    }
109                    kad::QueryResult::GetRecord(Ok(
110                        kad::GetRecordOk::FoundRecord(kad::PeerRecord {
111                            record: kad::Record { key, value, .. },
112                            ..
113                        })
114                    )) => {
115                        println!(
116                            "Got record {:?} {:?}",
117                            std::str::from_utf8(key.as_ref()).unwrap(),
118                            std::str::from_utf8(&value).unwrap(),
119                        );
120                    }
121                    kad::QueryResult::GetRecord(Ok(_)) => {}
122                    kad::QueryResult::GetRecord(Err(err)) => {
123                        eprintln!("Failed to get record: {err:?}");
124                    }
125                    kad::QueryResult::PutRecord(Ok(kad::PutRecordOk { key })) => {
126                        println!(
127                            "Successfully put record {:?}",
128                            std::str::from_utf8(key.as_ref()).unwrap()
129                        );
130                    }
131                    kad::QueryResult::PutRecord(Err(err)) => {
132                        eprintln!("Failed to put record: {err:?}");
133                    }
134                    kad::QueryResult::StartProviding(Ok(kad::AddProviderOk { key })) => {
135                        println!(
136                            "Successfully put provider record {:?}",
137                            std::str::from_utf8(key.as_ref()).unwrap()
138                        );
139                    }
140                    kad::QueryResult::StartProviding(Err(err)) => {
141                        eprintln!("Failed to put provider record: {err:?}");
142                    }
143                    _ => {}
144                }
145            }
146            _ => {}
147        }
148        }
149    }
150}
151
152fn handle_input_line(kademlia: &mut kad::Behaviour<MemoryStore>, line: String) {
153    let mut args = line.split(' ');
154
155    match args.next() {
156        Some("GET") => {
157            let key = {
158                match args.next() {
159                    Some(key) => kad::RecordKey::new(&key),
160                    None => {
161                        eprintln!("Expected key");
162                        return;
163                    }
164                }
165            };
166            kademlia.get_record(key);
167        }
168        Some("GET_PROVIDERS") => {
169            let key = {
170                match args.next() {
171                    Some(key) => kad::RecordKey::new(&key),
172                    None => {
173                        eprintln!("Expected key");
174                        return;
175                    }
176                }
177            };
178            kademlia.get_providers(key);
179        }
180        Some("PUT") => {
181            let key = {
182                match args.next() {
183                    Some(key) => kad::RecordKey::new(&key),
184                    None => {
185                        eprintln!("Expected key");
186                        return;
187                    }
188                }
189            };
190            let value = {
191                match args.next() {
192                    Some(value) => value.as_bytes().to_vec(),
193                    None => {
194                        eprintln!("Expected value");
195                        return;
196                    }
197                }
198            };
199            let record = kad::Record {
200                key,
201                value,
202                publisher: None,
203                expires: None,
204            };
205            kademlia
206                .put_record(record, kad::Quorum::One)
207                .expect("Failed to store record locally.");
208        }
209        Some("PUT_PROVIDER") => {
210            let key = {
211                match args.next() {
212                    Some(key) => kad::RecordKey::new(&key),
213                    None => {
214                        eprintln!("Expected key");
215                        return;
216                    }
217                }
218            };
219
220            kademlia
221                .start_providing(key)
222                .expect("Failed to start providing key");
223        }
224        _ => {
225            eprintln!("expected GET, GET_PROVIDERS, PUT or PUT_PROVIDER");
226        }
227    }
228}