file_sharing_example/
network.rs

1use std::{
2    collections::{hash_map, HashMap, HashSet},
3    error::Error,
4    time::Duration,
5};
6
7use futures::{
8    channel::{mpsc, oneshot},
9    prelude::*,
10    StreamExt,
11};
12use libp2p::{
13    core::Multiaddr,
14    identity, kad,
15    multiaddr::Protocol,
16    noise,
17    request_response::{self, OutboundRequestId, ProtocolSupport, ResponseChannel},
18    swarm::{NetworkBehaviour, Swarm, SwarmEvent},
19    tcp, yamux, PeerId, StreamProtocol,
20};
21use serde::{Deserialize, Serialize};
22
23/// Creates the network components, namely:
24///
25/// - The network client to interact with the network layer from anywhere within your application.
26///
27/// - The network event stream, e.g. for incoming requests.
28///
29/// - The network task driving the network itself.
30pub(crate) async fn new(
31    secret_key_seed: Option<u8>,
32) -> Result<(Client, impl Stream<Item = Event>, EventLoop), Box<dyn Error>> {
33    // Create a public/private key pair, either random or based on a seed.
34    let id_keys = match secret_key_seed {
35        Some(seed) => {
36            let mut bytes = [0u8; 32];
37            bytes[0] = seed;
38            identity::Keypair::ed25519_from_bytes(bytes).unwrap()
39        }
40        None => identity::Keypair::generate_ed25519(),
41    };
42    let peer_id = id_keys.public().to_peer_id();
43
44    let mut swarm = libp2p::SwarmBuilder::with_existing_identity(id_keys)
45        .with_tokio()
46        .with_tcp(
47            tcp::Config::default(),
48            noise::Config::new,
49            yamux::Config::default,
50        )?
51        .with_behaviour(|key| Behaviour {
52            kademlia: kad::Behaviour::new(
53                peer_id,
54                kad::store::MemoryStore::new(key.public().to_peer_id()),
55            ),
56            request_response: request_response::cbor::Behaviour::new(
57                [(
58                    StreamProtocol::new("/file-exchange/1"),
59                    ProtocolSupport::Full,
60                )],
61                request_response::Config::default(),
62            ),
63        })?
64        .with_swarm_config(|c| c.with_idle_connection_timeout(Duration::from_secs(60)))
65        .build();
66
67    swarm
68        .behaviour_mut()
69        .kademlia
70        .set_mode(Some(kad::Mode::Server));
71
72    let (command_sender, command_receiver) = mpsc::channel(0);
73    let (event_sender, event_receiver) = mpsc::channel(0);
74
75    Ok((
76        Client {
77            sender: command_sender,
78        },
79        event_receiver,
80        EventLoop::new(swarm, command_receiver, event_sender),
81    ))
82}
83
84#[derive(Clone)]
85pub(crate) struct Client {
86    sender: mpsc::Sender<Command>,
87}
88
89impl Client {
90    /// Listen for incoming connections on the given address.
91    pub(crate) async fn start_listening(
92        &mut self,
93        addr: Multiaddr,
94    ) -> Result<(), Box<dyn Error + Send>> {
95        let (sender, receiver) = oneshot::channel();
96        self.sender
97            .send(Command::StartListening { addr, sender })
98            .await
99            .expect("Command receiver not to be dropped.");
100        receiver.await.expect("Sender not to be dropped.")
101    }
102
103    /// Dial the given peer at the given address.
104    pub(crate) async fn dial(
105        &mut self,
106        peer_id: PeerId,
107        peer_addr: Multiaddr,
108    ) -> Result<(), Box<dyn Error + Send>> {
109        let (sender, receiver) = oneshot::channel();
110        self.sender
111            .send(Command::Dial {
112                peer_id,
113                peer_addr,
114                sender,
115            })
116            .await
117            .expect("Command receiver not to be dropped.");
118        receiver.await.expect("Sender not to be dropped.")
119    }
120
121    /// Advertise the local node as the provider of the given file on the DHT.
122    pub(crate) async fn start_providing(&mut self, file_name: String) {
123        let (sender, receiver) = oneshot::channel();
124        self.sender
125            .send(Command::StartProviding { file_name, sender })
126            .await
127            .expect("Command receiver not to be dropped.");
128        receiver.await.expect("Sender not to be dropped.");
129    }
130
131    /// Find the providers for the given file on the DHT.
132    pub(crate) async fn get_providers(&mut self, file_name: String) -> HashSet<PeerId> {
133        let (sender, receiver) = oneshot::channel();
134        self.sender
135            .send(Command::GetProviders { file_name, sender })
136            .await
137            .expect("Command receiver not to be dropped.");
138        receiver.await.expect("Sender not to be dropped.")
139    }
140
141    /// Request the content of the given file from the given peer.
142    pub(crate) async fn request_file(
143        &mut self,
144        peer: PeerId,
145        file_name: String,
146    ) -> Result<Vec<u8>, Box<dyn Error + Send>> {
147        let (sender, receiver) = oneshot::channel();
148        self.sender
149            .send(Command::RequestFile {
150                file_name,
151                peer,
152                sender,
153            })
154            .await
155            .expect("Command receiver not to be dropped.");
156        receiver.await.expect("Sender not be dropped.")
157    }
158
159    /// Respond with the provided file content to the given request.
160    pub(crate) async fn respond_file(
161        &mut self,
162        file: Vec<u8>,
163        channel: ResponseChannel<FileResponse>,
164    ) {
165        self.sender
166            .send(Command::RespondFile { file, channel })
167            .await
168            .expect("Command receiver not to be dropped.");
169    }
170}
171
172pub(crate) struct EventLoop {
173    swarm: Swarm<Behaviour>,
174    command_receiver: mpsc::Receiver<Command>,
175    event_sender: mpsc::Sender<Event>,
176    pending_dial: HashMap<PeerId, oneshot::Sender<Result<(), Box<dyn Error + Send>>>>,
177    pending_start_providing: HashMap<kad::QueryId, oneshot::Sender<()>>,
178    pending_get_providers: HashMap<kad::QueryId, oneshot::Sender<HashSet<PeerId>>>,
179    pending_request_file:
180        HashMap<OutboundRequestId, oneshot::Sender<Result<Vec<u8>, Box<dyn Error + Send>>>>,
181}
182
183impl EventLoop {
184    fn new(
185        swarm: Swarm<Behaviour>,
186        command_receiver: mpsc::Receiver<Command>,
187        event_sender: mpsc::Sender<Event>,
188    ) -> Self {
189        Self {
190            swarm,
191            command_receiver,
192            event_sender,
193            pending_dial: Default::default(),
194            pending_start_providing: Default::default(),
195            pending_get_providers: Default::default(),
196            pending_request_file: Default::default(),
197        }
198    }
199
200    pub(crate) async fn run(mut self) {
201        loop {
202            tokio::select! {
203                event = self.swarm.select_next_some() => self.handle_event(event).await,
204                command = self.command_receiver.next() => match command {
205                    Some(c) => self.handle_command(c).await,
206                    // Command channel closed, thus shutting down the network event loop.
207                    None=>  return,
208                },
209            }
210        }
211    }
212
213    async fn handle_event(&mut self, event: SwarmEvent<BehaviourEvent>) {
214        match event {
215            SwarmEvent::Behaviour(BehaviourEvent::Kademlia(
216                kad::Event::OutboundQueryProgressed {
217                    id,
218                    result: kad::QueryResult::StartProviding(_),
219                    ..
220                },
221            )) => {
222                let sender: oneshot::Sender<()> = self
223                    .pending_start_providing
224                    .remove(&id)
225                    .expect("Completed query to be previously pending.");
226                let _ = sender.send(());
227            }
228            SwarmEvent::Behaviour(BehaviourEvent::Kademlia(
229                kad::Event::OutboundQueryProgressed {
230                    id,
231                    result:
232                        kad::QueryResult::GetProviders(Ok(kad::GetProvidersOk::FoundProviders {
233                            providers,
234                            ..
235                        })),
236                    ..
237                },
238            )) => {
239                if let Some(sender) = self.pending_get_providers.remove(&id) {
240                    sender.send(providers).expect("Receiver not to be dropped");
241
242                    // Finish the query. We are only interested in the first result.
243                    self.swarm
244                        .behaviour_mut()
245                        .kademlia
246                        .query_mut(&id)
247                        .unwrap()
248                        .finish();
249                }
250            }
251            SwarmEvent::Behaviour(BehaviourEvent::Kademlia(
252                kad::Event::OutboundQueryProgressed {
253                    result:
254                        kad::QueryResult::GetProviders(Ok(
255                            kad::GetProvidersOk::FinishedWithNoAdditionalRecord { .. },
256                        )),
257                    ..
258                },
259            )) => {}
260            SwarmEvent::Behaviour(BehaviourEvent::Kademlia(_)) => {}
261            SwarmEvent::Behaviour(BehaviourEvent::RequestResponse(
262                request_response::Event::Message { message, .. },
263            )) => match message {
264                request_response::Message::Request {
265                    request, channel, ..
266                } => {
267                    self.event_sender
268                        .send(Event::InboundRequest {
269                            request: request.0,
270                            channel,
271                        })
272                        .await
273                        .expect("Event receiver not to be dropped.");
274                }
275                request_response::Message::Response {
276                    request_id,
277                    response,
278                } => {
279                    let _ = self
280                        .pending_request_file
281                        .remove(&request_id)
282                        .expect("Request to still be pending.")
283                        .send(Ok(response.0));
284                }
285            },
286            SwarmEvent::Behaviour(BehaviourEvent::RequestResponse(
287                request_response::Event::OutboundFailure {
288                    request_id, error, ..
289                },
290            )) => {
291                let _ = self
292                    .pending_request_file
293                    .remove(&request_id)
294                    .expect("Request to still be pending.")
295                    .send(Err(Box::new(error)));
296            }
297            SwarmEvent::Behaviour(BehaviourEvent::RequestResponse(
298                request_response::Event::ResponseSent { .. },
299            )) => {}
300            SwarmEvent::NewListenAddr { address, .. } => {
301                let local_peer_id = *self.swarm.local_peer_id();
302                eprintln!(
303                    "Local node is listening on {:?}",
304                    address.with(Protocol::P2p(local_peer_id))
305                );
306            }
307            SwarmEvent::IncomingConnection { .. } => {}
308            SwarmEvent::ConnectionEstablished {
309                peer_id, endpoint, ..
310            } => {
311                if endpoint.is_dialer() {
312                    if let Some(sender) = self.pending_dial.remove(&peer_id) {
313                        let _ = sender.send(Ok(()));
314                    }
315                }
316            }
317            SwarmEvent::ConnectionClosed { .. } => {}
318            SwarmEvent::OutgoingConnectionError { peer_id, error, .. } => {
319                if let Some(peer_id) = peer_id {
320                    if let Some(sender) = self.pending_dial.remove(&peer_id) {
321                        let _ = sender.send(Err(Box::new(error)));
322                    }
323                }
324            }
325            SwarmEvent::IncomingConnectionError { .. } => {}
326            SwarmEvent::Dialing {
327                peer_id: Some(peer_id),
328                ..
329            } => eprintln!("Dialing {peer_id}"),
330            e => panic!("{e:?}"),
331        }
332    }
333
334    async fn handle_command(&mut self, command: Command) {
335        match command {
336            Command::StartListening { addr, sender } => {
337                let _ = match self.swarm.listen_on(addr) {
338                    Ok(_) => sender.send(Ok(())),
339                    Err(e) => sender.send(Err(Box::new(e))),
340                };
341            }
342            Command::Dial {
343                peer_id,
344                peer_addr,
345                sender,
346            } => {
347                if let hash_map::Entry::Vacant(e) = self.pending_dial.entry(peer_id) {
348                    self.swarm
349                        .behaviour_mut()
350                        .kademlia
351                        .add_address(&peer_id, peer_addr.clone());
352                    match self.swarm.dial(peer_addr.with(Protocol::P2p(peer_id))) {
353                        Ok(()) => {
354                            e.insert(sender);
355                        }
356                        Err(e) => {
357                            let _ = sender.send(Err(Box::new(e)));
358                        }
359                    }
360                } else {
361                    todo!("Already dialing peer.");
362                }
363            }
364            Command::StartProviding { file_name, sender } => {
365                let query_id = self
366                    .swarm
367                    .behaviour_mut()
368                    .kademlia
369                    .start_providing(file_name.into_bytes().into())
370                    .expect("No store error.");
371                self.pending_start_providing.insert(query_id, sender);
372            }
373            Command::GetProviders { file_name, sender } => {
374                let query_id = self
375                    .swarm
376                    .behaviour_mut()
377                    .kademlia
378                    .get_providers(file_name.into_bytes().into());
379                self.pending_get_providers.insert(query_id, sender);
380            }
381            Command::RequestFile {
382                file_name,
383                peer,
384                sender,
385            } => {
386                let request_id = self
387                    .swarm
388                    .behaviour_mut()
389                    .request_response
390                    .send_request(&peer, FileRequest(file_name));
391                self.pending_request_file.insert(request_id, sender);
392            }
393            Command::RespondFile { file, channel } => {
394                self.swarm
395                    .behaviour_mut()
396                    .request_response
397                    .send_response(channel, FileResponse(file))
398                    .expect("Connection to peer to be still open.");
399            }
400        }
401    }
402}
403
404#[derive(NetworkBehaviour)]
405struct Behaviour {
406    request_response: request_response::cbor::Behaviour<FileRequest, FileResponse>,
407    kademlia: kad::Behaviour<kad::store::MemoryStore>,
408}
409
410#[derive(Debug)]
411enum Command {
412    StartListening {
413        addr: Multiaddr,
414        sender: oneshot::Sender<Result<(), Box<dyn Error + Send>>>,
415    },
416    Dial {
417        peer_id: PeerId,
418        peer_addr: Multiaddr,
419        sender: oneshot::Sender<Result<(), Box<dyn Error + Send>>>,
420    },
421    StartProviding {
422        file_name: String,
423        sender: oneshot::Sender<()>,
424    },
425    GetProviders {
426        file_name: String,
427        sender: oneshot::Sender<HashSet<PeerId>>,
428    },
429    RequestFile {
430        file_name: String,
431        peer: PeerId,
432        sender: oneshot::Sender<Result<Vec<u8>, Box<dyn Error + Send>>>,
433    },
434    RespondFile {
435        file: Vec<u8>,
436        channel: ResponseChannel<FileResponse>,
437    },
438}
439
440#[derive(Debug)]
441pub(crate) enum Event {
442    InboundRequest {
443        request: String,
444        channel: ResponseChannel<FileResponse>,
445    },
446}
447
448// Simple file exchange protocol
449#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
450struct FileRequest(String);
451#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
452pub(crate) struct FileResponse(Vec<u8>);