1use std::{
2 collections::{hash_map, HashMap, HashSet},
3 error::Error,
4 time::Duration,
5};
6
7use futures::{
8 channel::{mpsc, oneshot},
9 prelude::*,
10 StreamExt,
11};
12use libp2p::{
13 core::Multiaddr,
14 identity, kad,
15 multiaddr::Protocol,
16 noise,
17 request_response::{self, OutboundRequestId, ProtocolSupport, ResponseChannel},
18 swarm::{NetworkBehaviour, Swarm, SwarmEvent},
19 tcp, yamux, PeerId, StreamProtocol,
20};
21use serde::{Deserialize, Serialize};
22
23pub(crate) async fn new(
31 secret_key_seed: Option<u8>,
32) -> Result<(Client, impl Stream<Item = Event>, EventLoop), Box<dyn Error>> {
33 let id_keys = match secret_key_seed {
35 Some(seed) => {
36 let mut bytes = [0u8; 32];
37 bytes[0] = seed;
38 identity::Keypair::ed25519_from_bytes(bytes).unwrap()
39 }
40 None => identity::Keypair::generate_ed25519(),
41 };
42 let peer_id = id_keys.public().to_peer_id();
43
44 let mut swarm = libp2p::SwarmBuilder::with_existing_identity(id_keys)
45 .with_tokio()
46 .with_tcp(
47 tcp::Config::default(),
48 noise::Config::new,
49 yamux::Config::default,
50 )?
51 .with_behaviour(|key| Behaviour {
52 kademlia: kad::Behaviour::new(
53 peer_id,
54 kad::store::MemoryStore::new(key.public().to_peer_id()),
55 ),
56 request_response: request_response::cbor::Behaviour::new(
57 [(
58 StreamProtocol::new("/file-exchange/1"),
59 ProtocolSupport::Full,
60 )],
61 request_response::Config::default(),
62 ),
63 })?
64 .with_swarm_config(|c| c.with_idle_connection_timeout(Duration::from_secs(60)))
65 .build();
66
67 swarm
68 .behaviour_mut()
69 .kademlia
70 .set_mode(Some(kad::Mode::Server));
71
72 let (command_sender, command_receiver) = mpsc::channel(0);
73 let (event_sender, event_receiver) = mpsc::channel(0);
74
75 Ok((
76 Client {
77 sender: command_sender,
78 },
79 event_receiver,
80 EventLoop::new(swarm, command_receiver, event_sender),
81 ))
82}
83
/// Cloneable handle for submitting [`Command`]s to the network event loop.
#[derive(Clone)]
pub(crate) struct Client {
    /// Sending half of the command channel; the receiving half is owned by
    /// the `EventLoop`.
    sender: mpsc::Sender<Command>,
}
88
89impl Client {
90 pub(crate) async fn start_listening(
92 &mut self,
93 addr: Multiaddr,
94 ) -> Result<(), Box<dyn Error + Send>> {
95 let (sender, receiver) = oneshot::channel();
96 self.sender
97 .send(Command::StartListening { addr, sender })
98 .await
99 .expect("Command receiver not to be dropped.");
100 receiver.await.expect("Sender not to be dropped.")
101 }
102
103 pub(crate) async fn dial(
105 &mut self,
106 peer_id: PeerId,
107 peer_addr: Multiaddr,
108 ) -> Result<(), Box<dyn Error + Send>> {
109 let (sender, receiver) = oneshot::channel();
110 self.sender
111 .send(Command::Dial {
112 peer_id,
113 peer_addr,
114 sender,
115 })
116 .await
117 .expect("Command receiver not to be dropped.");
118 receiver.await.expect("Sender not to be dropped.")
119 }
120
121 pub(crate) async fn start_providing(&mut self, file_name: String) {
123 let (sender, receiver) = oneshot::channel();
124 self.sender
125 .send(Command::StartProviding { file_name, sender })
126 .await
127 .expect("Command receiver not to be dropped.");
128 receiver.await.expect("Sender not to be dropped.");
129 }
130
131 pub(crate) async fn get_providers(&mut self, file_name: String) -> HashSet<PeerId> {
133 let (sender, receiver) = oneshot::channel();
134 self.sender
135 .send(Command::GetProviders { file_name, sender })
136 .await
137 .expect("Command receiver not to be dropped.");
138 receiver.await.expect("Sender not to be dropped.")
139 }
140
141 pub(crate) async fn request_file(
143 &mut self,
144 peer: PeerId,
145 file_name: String,
146 ) -> Result<Vec<u8>, Box<dyn Error + Send>> {
147 let (sender, receiver) = oneshot::channel();
148 self.sender
149 .send(Command::RequestFile {
150 file_name,
151 peer,
152 sender,
153 })
154 .await
155 .expect("Command receiver not to be dropped.");
156 receiver.await.expect("Sender not be dropped.")
157 }
158
159 pub(crate) async fn respond_file(
161 &mut self,
162 file: Vec<u8>,
163 channel: ResponseChannel<FileResponse>,
164 ) {
165 self.sender
166 .send(Command::RespondFile { file, channel })
167 .await
168 .expect("Command receiver not to be dropped.");
169 }
170}
171
172pub(crate) struct EventLoop {
173 swarm: Swarm<Behaviour>,
174 command_receiver: mpsc::Receiver<Command>,
175 event_sender: mpsc::Sender<Event>,
176 pending_dial: HashMap<PeerId, oneshot::Sender<Result<(), Box<dyn Error + Send>>>>,
177 pending_start_providing: HashMap<kad::QueryId, oneshot::Sender<()>>,
178 pending_get_providers: HashMap<kad::QueryId, oneshot::Sender<HashSet<PeerId>>>,
179 pending_request_file:
180 HashMap<OutboundRequestId, oneshot::Sender<Result<Vec<u8>, Box<dyn Error + Send>>>>,
181}
182
183impl EventLoop {
184 fn new(
185 swarm: Swarm<Behaviour>,
186 command_receiver: mpsc::Receiver<Command>,
187 event_sender: mpsc::Sender<Event>,
188 ) -> Self {
189 Self {
190 swarm,
191 command_receiver,
192 event_sender,
193 pending_dial: Default::default(),
194 pending_start_providing: Default::default(),
195 pending_get_providers: Default::default(),
196 pending_request_file: Default::default(),
197 }
198 }
199
200 pub(crate) async fn run(mut self) {
201 loop {
202 tokio::select! {
203 event = self.swarm.select_next_some() => self.handle_event(event).await,
204 command = self.command_receiver.next() => match command {
205 Some(c) => self.handle_command(c).await,
206 None=> return,
208 },
209 }
210 }
211 }
212
213 async fn handle_event(&mut self, event: SwarmEvent<BehaviourEvent>) {
214 match event {
215 SwarmEvent::Behaviour(BehaviourEvent::Kademlia(
216 kad::Event::OutboundQueryProgressed {
217 id,
218 result: kad::QueryResult::StartProviding(_),
219 ..
220 },
221 )) => {
222 let sender: oneshot::Sender<()> = self
223 .pending_start_providing
224 .remove(&id)
225 .expect("Completed query to be previously pending.");
226 let _ = sender.send(());
227 }
228 SwarmEvent::Behaviour(BehaviourEvent::Kademlia(
229 kad::Event::OutboundQueryProgressed {
230 id,
231 result:
232 kad::QueryResult::GetProviders(Ok(kad::GetProvidersOk::FoundProviders {
233 providers,
234 ..
235 })),
236 ..
237 },
238 )) => {
239 if let Some(sender) = self.pending_get_providers.remove(&id) {
240 sender.send(providers).expect("Receiver not to be dropped");
241
242 self.swarm
244 .behaviour_mut()
245 .kademlia
246 .query_mut(&id)
247 .unwrap()
248 .finish();
249 }
250 }
251 SwarmEvent::Behaviour(BehaviourEvent::Kademlia(
252 kad::Event::OutboundQueryProgressed {
253 result:
254 kad::QueryResult::GetProviders(Ok(
255 kad::GetProvidersOk::FinishedWithNoAdditionalRecord { .. },
256 )),
257 ..
258 },
259 )) => {}
260 SwarmEvent::Behaviour(BehaviourEvent::Kademlia(_)) => {}
261 SwarmEvent::Behaviour(BehaviourEvent::RequestResponse(
262 request_response::Event::Message { message, .. },
263 )) => match message {
264 request_response::Message::Request {
265 request, channel, ..
266 } => {
267 self.event_sender
268 .send(Event::InboundRequest {
269 request: request.0,
270 channel,
271 })
272 .await
273 .expect("Event receiver not to be dropped.");
274 }
275 request_response::Message::Response {
276 request_id,
277 response,
278 } => {
279 let _ = self
280 .pending_request_file
281 .remove(&request_id)
282 .expect("Request to still be pending.")
283 .send(Ok(response.0));
284 }
285 },
286 SwarmEvent::Behaviour(BehaviourEvent::RequestResponse(
287 request_response::Event::OutboundFailure {
288 request_id, error, ..
289 },
290 )) => {
291 let _ = self
292 .pending_request_file
293 .remove(&request_id)
294 .expect("Request to still be pending.")
295 .send(Err(Box::new(error)));
296 }
297 SwarmEvent::Behaviour(BehaviourEvent::RequestResponse(
298 request_response::Event::ResponseSent { .. },
299 )) => {}
300 SwarmEvent::NewListenAddr { address, .. } => {
301 let local_peer_id = *self.swarm.local_peer_id();
302 eprintln!(
303 "Local node is listening on {:?}",
304 address.with(Protocol::P2p(local_peer_id))
305 );
306 }
307 SwarmEvent::IncomingConnection { .. } => {}
308 SwarmEvent::ConnectionEstablished {
309 peer_id, endpoint, ..
310 } => {
311 if endpoint.is_dialer() {
312 if let Some(sender) = self.pending_dial.remove(&peer_id) {
313 let _ = sender.send(Ok(()));
314 }
315 }
316 }
317 SwarmEvent::ConnectionClosed { .. } => {}
318 SwarmEvent::OutgoingConnectionError { peer_id, error, .. } => {
319 if let Some(peer_id) = peer_id {
320 if let Some(sender) = self.pending_dial.remove(&peer_id) {
321 let _ = sender.send(Err(Box::new(error)));
322 }
323 }
324 }
325 SwarmEvent::IncomingConnectionError { .. } => {}
326 SwarmEvent::Dialing {
327 peer_id: Some(peer_id),
328 ..
329 } => eprintln!("Dialing {peer_id}"),
330 e => panic!("{e:?}"),
331 }
332 }
333
334 async fn handle_command(&mut self, command: Command) {
335 match command {
336 Command::StartListening { addr, sender } => {
337 let _ = match self.swarm.listen_on(addr) {
338 Ok(_) => sender.send(Ok(())),
339 Err(e) => sender.send(Err(Box::new(e))),
340 };
341 }
342 Command::Dial {
343 peer_id,
344 peer_addr,
345 sender,
346 } => {
347 if let hash_map::Entry::Vacant(e) = self.pending_dial.entry(peer_id) {
348 self.swarm
349 .behaviour_mut()
350 .kademlia
351 .add_address(&peer_id, peer_addr.clone());
352 match self.swarm.dial(peer_addr.with(Protocol::P2p(peer_id))) {
353 Ok(()) => {
354 e.insert(sender);
355 }
356 Err(e) => {
357 let _ = sender.send(Err(Box::new(e)));
358 }
359 }
360 } else {
361 todo!("Already dialing peer.");
362 }
363 }
364 Command::StartProviding { file_name, sender } => {
365 let query_id = self
366 .swarm
367 .behaviour_mut()
368 .kademlia
369 .start_providing(file_name.into_bytes().into())
370 .expect("No store error.");
371 self.pending_start_providing.insert(query_id, sender);
372 }
373 Command::GetProviders { file_name, sender } => {
374 let query_id = self
375 .swarm
376 .behaviour_mut()
377 .kademlia
378 .get_providers(file_name.into_bytes().into());
379 self.pending_get_providers.insert(query_id, sender);
380 }
381 Command::RequestFile {
382 file_name,
383 peer,
384 sender,
385 } => {
386 let request_id = self
387 .swarm
388 .behaviour_mut()
389 .request_response
390 .send_request(&peer, FileRequest(file_name));
391 self.pending_request_file.insert(request_id, sender);
392 }
393 Command::RespondFile { file, channel } => {
394 self.swarm
395 .behaviour_mut()
396 .request_response
397 .send_response(channel, FileResponse(file))
398 .expect("Connection to peer to be still open.");
399 }
400 }
401 }
402}
403
/// Combined network behaviour of the node: file exchange over
/// request/response plus Kademlia.
#[derive(NetworkBehaviour)]
struct Behaviour {
    /// CBOR request/response protocol carrying `FileRequest` and `FileResponse`.
    request_response: request_response::cbor::Behaviour<FileRequest, FileResponse>,
    /// Kademlia behaviour backed by an in-memory record store.
    kademlia: kad::Behaviour<kad::store::MemoryStore>,
}
409
410#[derive(Debug)]
411enum Command {
412 StartListening {
413 addr: Multiaddr,
414 sender: oneshot::Sender<Result<(), Box<dyn Error + Send>>>,
415 },
416 Dial {
417 peer_id: PeerId,
418 peer_addr: Multiaddr,
419 sender: oneshot::Sender<Result<(), Box<dyn Error + Send>>>,
420 },
421 StartProviding {
422 file_name: String,
423 sender: oneshot::Sender<()>,
424 },
425 GetProviders {
426 file_name: String,
427 sender: oneshot::Sender<HashSet<PeerId>>,
428 },
429 RequestFile {
430 file_name: String,
431 peer: PeerId,
432 sender: oneshot::Sender<Result<Vec<u8>, Box<dyn Error + Send>>>,
433 },
434 RespondFile {
435 file: Vec<u8>,
436 channel: ResponseChannel<FileResponse>,
437 },
438}
439
/// Events the event loop surfaces to the application.
#[derive(Debug)]
pub(crate) enum Event {
    /// A remote peer sent a request.
    InboundRequest {
        /// The string carried by the incoming `FileRequest`.
        request: String,
        /// Channel on which the response to this request has to be sent.
        channel: ResponseChannel<FileResponse>,
    },
}
447
/// Request message of the file-exchange protocol; wraps the requested file
/// name.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
struct FileRequest(String);
/// Response message of the file-exchange protocol; wraps the file's bytes.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub(crate) struct FileResponse(Vec<u8>);