1use std::{net::SocketAddr, str::FromStr};
22
23use anyhow::{bail, Result};
24use clap::Parser;
25use futures::StreamExt;
26use libp2p::{
27 core::{multiaddr::Protocol, upgrade, Multiaddr},
28 identity::PeerId,
29 swarm::{NetworkBehaviour, Swarm, SwarmEvent},
30 SwarmBuilder,
31};
32use libp2p_perf::{client, server, Final, Intermediate, Run, RunParams, RunUpdate};
33use serde::{Deserialize, Serialize};
34use tracing_subscriber::EnvFilter;
35use web_time::{Duration, Instant};
36
37#[derive(Debug, Parser)]
38#[command(name = "libp2p perf client")]
39struct Opts {
40 #[arg(long)]
41 server_address: Option<SocketAddr>,
42 #[arg(long)]
43 transport: Option<Transport>,
44 #[arg(long)]
45 upload_bytes: Option<usize>,
46 #[arg(long)]
47 download_bytes: Option<usize>,
48
49 #[arg(long)]
51 run_server: bool,
52}
53
/// Transport the perf client uses to reach the server.
///
/// Values are parsed from their command-line spelling through the type's
/// `FromStr` implementation.
#[derive(Debug, Clone)]
pub enum Transport {
    /// Dial the server address as `<ip>/tcp/<port>`.
    Tcp,
    /// Dial the server address as `<ip>/udp/<port>/quic-v1`.
    QuicV1,
}
60
61impl FromStr for Transport {
62 type Err = anyhow::Error;
63
64 fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
65 Ok(match s {
66 "tcp" => Self::Tcp,
67 "quic-v1" => Self::QuicV1,
68 other => bail!("unknown transport {other}"),
69 })
70 }
71}
72
73#[tokio::main]
74async fn main() -> Result<()> {
75 let _ = tracing_subscriber::fmt()
76 .with_env_filter(EnvFilter::from_default_env())
77 .try_init();
78
79 let opts = Opts::parse();
80 match opts {
81 Opts {
82 server_address: Some(server_address),
83 transport: None,
84 upload_bytes: None,
85 download_bytes: None,
86 run_server: true,
87 } => server(server_address).await?,
88 Opts {
89 server_address: Some(server_address),
90 transport: Some(transport),
91 upload_bytes,
92 download_bytes,
93 run_server: false,
94 } => {
95 client(server_address, transport, upload_bytes, download_bytes).await?;
96 }
97 _ => panic!("invalid command line arguments: {opts:?}"),
98 };
99
100 Ok(())
101}
102
103async fn server(server_address: SocketAddr) -> Result<()> {
104 let mut swarm = swarm::<libp2p_perf::server::Behaviour>().await?;
105
106 swarm.listen_on(
107 Multiaddr::empty()
108 .with(server_address.ip().into())
109 .with(Protocol::Tcp(server_address.port())),
110 )?;
111
112 swarm
113 .listen_on(
114 Multiaddr::empty()
115 .with(server_address.ip().into())
116 .with(Protocol::Udp(server_address.port()))
117 .with(Protocol::QuicV1),
118 )
119 .unwrap();
120
121 tokio::spawn(async move {
122 loop {
123 match swarm.next().await.unwrap() {
124 SwarmEvent::NewListenAddr { address, .. } => {
125 tracing::info!(%address, "Listening on address");
126 }
127 SwarmEvent::IncomingConnection { .. } => {}
128 e @ SwarmEvent::IncomingConnectionError { .. } => {
129 tracing::error!("{e:?}");
130 }
131 SwarmEvent::ConnectionEstablished {
132 peer_id, endpoint, ..
133 } => {
134 tracing::info!(peer=%peer_id, ?endpoint, "Established new connection");
135 }
136 SwarmEvent::ConnectionClosed { .. } => {}
137 SwarmEvent::Behaviour(server::Event { .. }) => {
138 tracing::info!("Finished run",)
139 }
140 e => panic!("{e:?}"),
141 }
142 }
143 })
144 .await
145 .unwrap();
146
147 Ok(())
148}
149
150async fn client(
151 server_address: SocketAddr,
152 transport: Transport,
153 upload_bytes: Option<usize>,
154 download_bytes: Option<usize>,
155) -> Result<()> {
156 let server_address = match transport {
157 Transport::Tcp => Multiaddr::empty()
158 .with(server_address.ip().into())
159 .with(Protocol::Tcp(server_address.port())),
160 Transport::QuicV1 => Multiaddr::empty()
161 .with(server_address.ip().into())
162 .with(Protocol::Udp(server_address.port()))
163 .with(Protocol::QuicV1),
164 };
165 let params = RunParams {
166 to_send: upload_bytes.unwrap(),
167 to_receive: download_bytes.unwrap(),
168 };
169 let mut swarm = swarm().await?;
170
171 tokio::spawn(async move {
172 tracing::info!("start benchmark: custom");
173
174 let start = Instant::now();
175
176 let server_peer_id = connect(&mut swarm, server_address.clone()).await?;
177
178 perf(&mut swarm, server_peer_id, params).await?;
179
180 println!(
181 "{}",
182 serde_json::to_string(&BenchmarkResult {
183 upload_bytes: params.to_send,
184 download_bytes: params.to_receive,
185 r#type: "final".to_string(),
186 time_seconds: start.elapsed().as_secs_f64(),
187 })
188 .unwrap()
189 );
190
191 anyhow::Ok(())
192 })
193 .await??;
194
195 Ok(())
196}
197
198#[derive(Serialize, Deserialize)]
199#[serde(rename_all = "camelCase")]
200struct BenchmarkResult {
201 r#type: String,
202 time_seconds: f64,
203 upload_bytes: usize,
204 download_bytes: usize,
205}
206
207async fn swarm<B: NetworkBehaviour + Default>() -> Result<Swarm<B>> {
208 let swarm = SwarmBuilder::with_new_identity()
209 .with_tokio()
210 .with_tcp(
211 libp2p_tcp::Config::default().nodelay(true),
212 libp2p_tls::Config::new,
213 libp2p_yamux::Config::default,
214 )?
215 .with_quic()
216 .with_dns()?
217 .with_behaviour(|_| B::default())?
218 .with_swarm_config(|cfg| {
219 cfg.with_substream_upgrade_protocol_override(upgrade::Version::V1Lazy)
220 .with_idle_connection_timeout(Duration::from_secs(60 * 5))
221 })
222 .build();
223
224 Ok(swarm)
225}
226
227async fn connect(
228 swarm: &mut Swarm<client::Behaviour>,
229 server_address: Multiaddr,
230) -> Result<PeerId> {
231 let start = Instant::now();
232 swarm.dial(server_address.clone()).unwrap();
233
234 let server_peer_id = match swarm.next().await.unwrap() {
235 SwarmEvent::ConnectionEstablished { peer_id, .. } => peer_id,
236 SwarmEvent::OutgoingConnectionError { peer_id, error, .. } => {
237 bail!("Outgoing connection error to {:?}: {:?}", peer_id, error);
238 }
239 e => panic!("{e:?}"),
240 };
241
242 let duration = start.elapsed();
243 let duration_seconds = duration.as_secs_f64();
244
245 tracing::info!(elapsed_time=%format!("{duration_seconds:.4} s"));
246
247 Ok(server_peer_id)
248}
249
250async fn perf(
251 swarm: &mut Swarm<client::Behaviour>,
252 server_peer_id: PeerId,
253 params: RunParams,
254) -> Result<Run> {
255 swarm.behaviour_mut().perf(server_peer_id, params)?;
256
257 let duration = loop {
258 match swarm.next().await.unwrap() {
259 SwarmEvent::Behaviour(client::Event {
260 id: _,
261 result: Ok(RunUpdate::Intermediate(progressed)),
262 }) => {
263 tracing::info!("{progressed}");
264
265 let Intermediate {
266 duration,
267 sent,
268 received,
269 } = progressed;
270
271 println!(
272 "{}",
273 serde_json::to_string(&BenchmarkResult {
274 r#type: "intermediate".to_string(),
275 time_seconds: duration.as_secs_f64(),
276 upload_bytes: sent,
277 download_bytes: received,
278 })
279 .unwrap()
280 );
281 }
282 SwarmEvent::Behaviour(client::Event {
283 id: _,
284 result: Ok(RunUpdate::Final(Final { duration })),
285 }) => break duration,
286 e => panic!("{e:?}"),
287 };
288 };
289
290 let run = Run { params, duration };
291
292 tracing::info!("{run}");
293
294 Ok(run)
295}