1use std::{io, time::Duration};
2
3use anyhow::{Context, Result};
4use futures::{AsyncReadExt, AsyncWriteExt, StreamExt};
5use libp2p::{multiaddr::Protocol, Multiaddr, PeerId, Stream, StreamProtocol};
6use libp2p_stream as stream;
7use rand::RngCore;
8use tracing::level_filters::LevelFilter;
9use tracing_subscriber::EnvFilter;
10
/// Protocol name of the echo protocol.
///
/// `main` registers it to accept inbound streams and `connection_handler` uses it when opening
/// outbound streams.
const ECHO_PROTOCOL: StreamProtocol = StreamProtocol::new("/echo");
12
13#[tokio::main]
14async fn main() -> Result<()> {
15 tracing_subscriber::fmt()
16 .with_env_filter(
17 EnvFilter::builder()
18 .with_default_directive(LevelFilter::INFO.into())
19 .from_env()?,
20 )
21 .init();
22
23 let maybe_address = std::env::args()
24 .nth(1)
25 .map(|arg| arg.parse::<Multiaddr>())
26 .transpose()
27 .context("Failed to parse argument as `Multiaddr`")?;
28
29 let mut swarm = libp2p::SwarmBuilder::with_new_identity()
30 .with_tokio()
31 .with_quic()
32 .with_behaviour(|_| stream::Behaviour::new())?
33 .with_swarm_config(|c| c.with_idle_connection_timeout(Duration::from_secs(10)))
34 .build();
35
36 swarm.listen_on("/ip4/127.0.0.1/udp/0/quic-v1".parse()?)?;
37
38 let mut incoming_streams = swarm
39 .behaviour()
40 .new_control()
41 .accept(ECHO_PROTOCOL)
42 .unwrap();
43
44 tokio::spawn(async move {
50 while let Some((peer, stream)) = incoming_streams.next().await {
57 match echo(stream).await {
58 Ok(n) => {
59 tracing::info!(%peer, "Echoed {n} bytes!");
60 }
61 Err(e) => {
62 tracing::warn!(%peer, "Echo failed: {e}");
63 continue;
64 }
65 };
66 }
67 });
68
69 if let Some(address) = maybe_address {
71 let Some(Protocol::P2p(peer_id)) = address.iter().last() else {
72 anyhow::bail!("Provided address does not end in `/p2p`");
73 };
74
75 swarm.dial(address)?;
76
77 tokio::spawn(connection_handler(peer_id, swarm.behaviour().new_control()));
78 }
79
80 loop {
82 let event = swarm.next().await.expect("never terminates");
83
84 match event {
85 libp2p::swarm::SwarmEvent::NewListenAddr { address, .. } => {
86 let listen_address = address.with_p2p(*swarm.local_peer_id()).unwrap();
87 tracing::info!(%listen_address);
88 }
89 event => tracing::trace!(?event),
90 }
91 }
92}
93
94async fn connection_handler(peer: PeerId, mut control: stream::Control) {
96 loop {
97 tokio::time::sleep(Duration::from_secs(1)).await; let stream = match control.open_stream(peer, ECHO_PROTOCOL).await {
100 Ok(stream) => stream,
101 Err(error @ stream::OpenStreamError::UnsupportedProtocol(_)) => {
102 tracing::info!(%peer, %error);
103 return;
104 }
105 Err(error) => {
106 tracing::debug!(%peer, %error);
110 continue;
111 }
112 };
113
114 if let Err(e) = send(stream).await {
115 tracing::warn!(%peer, "Echo protocol failed: {e}");
116 continue;
117 }
118
119 tracing::info!(%peer, "Echo complete!")
120 }
121}
122
123async fn echo(mut stream: Stream) -> io::Result<usize> {
124 let mut total = 0;
125
126 let mut buf = [0u8; 100];
127
128 loop {
129 let read = stream.read(&mut buf).await?;
130 if read == 0 {
131 return Ok(total);
132 }
133
134 total += read;
135 stream.write_all(&buf[..read]).await?;
136 }
137}
138
139async fn send(mut stream: Stream) -> io::Result<()> {
140 let num_bytes = rand::random::<usize>() % 1000;
141
142 let mut bytes = vec![0; num_bytes];
143 rand::thread_rng().fill_bytes(&mut bytes);
144
145 stream.write_all(&bytes).await?;
146
147 let mut buf = vec![0; num_bytes];
148 stream.read_exact(&mut buf).await?;
149
150 if bytes != buf {
151 return Err(io::Error::new(io::ErrorKind::Other, "incorrect echo"));
152 }
153
154 stream.close().await?;
155
156 Ok(())
157}