perf/
perf.rs

1// Copyright 2023 Protocol Labs.
2//
3// Permission is hereby granted, free of charge, to any person obtaining a
4// copy of this software and associated documentation files (the "Software"),
5// to deal in the Software without restriction, including without limitation
6// the rights to use, copy, modify, merge, publish, distribute, sublicense,
7// and/or sell copies of the Software, and to permit persons to whom the
8// Software is furnished to do so, subject to the following conditions:
9//
10// The above copyright notice and this permission notice shall be included in
11// all copies or substantial portions of the Software.
12//
13// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
14// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
18// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
19// DEALINGS IN THE SOFTWARE.
20
21use std::{net::SocketAddr, str::FromStr};
22
23use anyhow::{bail, Result};
24use clap::Parser;
25use futures::StreamExt;
26use libp2p::{
27    core::{multiaddr::Protocol, upgrade, Multiaddr},
28    identity::PeerId,
29    swarm::{NetworkBehaviour, Swarm, SwarmEvent},
30    SwarmBuilder,
31};
32use libp2p_perf::{client, server, Final, Intermediate, Run, RunParams, RunUpdate};
33use serde::{Deserialize, Serialize};
34use tracing_subscriber::EnvFilter;
35use web_time::{Duration, Instant};
36
37#[derive(Debug, Parser)]
38#[command(name = "libp2p perf client")]
39struct Opts {
40    #[arg(long)]
41    server_address: Option<SocketAddr>,
42    #[arg(long)]
43    transport: Option<Transport>,
44    #[arg(long)]
45    upload_bytes: Option<usize>,
46    #[arg(long)]
47    download_bytes: Option<usize>,
48
49    /// Run in server mode.
50    #[arg(long)]
51    run_server: bool,
52}
53
/// Supported transports by rust-libp2p.
///
/// Parsed from the command line through its [`FromStr`] implementation, which
/// accepts `"tcp"` and `"quic-v1"`.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum Transport {
    /// Dial the server over TCP (`/ip*/<addr>/tcp/<port>`).
    Tcp,
    /// Dial the server over QUIC v1 (`/ip*/<addr>/udp/<port>/quic-v1`).
    QuicV1,
}
60
61impl FromStr for Transport {
62    type Err = anyhow::Error;
63
64    fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
65        Ok(match s {
66            "tcp" => Self::Tcp,
67            "quic-v1" => Self::QuicV1,
68            other => bail!("unknown transport {other}"),
69        })
70    }
71}
72
73#[tokio::main]
74async fn main() -> Result<()> {
75    let _ = tracing_subscriber::fmt()
76        .with_env_filter(EnvFilter::from_default_env())
77        .try_init();
78
79    let opts = Opts::parse();
80    match opts {
81        Opts {
82            server_address: Some(server_address),
83            transport: None,
84            upload_bytes: None,
85            download_bytes: None,
86            run_server: true,
87        } => server(server_address).await?,
88        Opts {
89            server_address: Some(server_address),
90            transport: Some(transport),
91            upload_bytes,
92            download_bytes,
93            run_server: false,
94        } => {
95            client(server_address, transport, upload_bytes, download_bytes).await?;
96        }
97        _ => panic!("invalid command line arguments: {opts:?}"),
98    };
99
100    Ok(())
101}
102
103async fn server(server_address: SocketAddr) -> Result<()> {
104    let mut swarm = swarm::<libp2p_perf::server::Behaviour>().await?;
105
106    swarm.listen_on(
107        Multiaddr::empty()
108            .with(server_address.ip().into())
109            .with(Protocol::Tcp(server_address.port())),
110    )?;
111
112    swarm
113        .listen_on(
114            Multiaddr::empty()
115                .with(server_address.ip().into())
116                .with(Protocol::Udp(server_address.port()))
117                .with(Protocol::QuicV1),
118        )
119        .unwrap();
120
121    tokio::spawn(async move {
122        loop {
123            match swarm.next().await.unwrap() {
124                SwarmEvent::NewListenAddr { address, .. } => {
125                    tracing::info!(%address, "Listening on address");
126                }
127                SwarmEvent::IncomingConnection { .. } => {}
128                e @ SwarmEvent::IncomingConnectionError { .. } => {
129                    tracing::error!("{e:?}");
130                }
131                SwarmEvent::ConnectionEstablished {
132                    peer_id, endpoint, ..
133                } => {
134                    tracing::info!(peer=%peer_id, ?endpoint, "Established new connection");
135                }
136                SwarmEvent::ConnectionClosed { .. } => {}
137                SwarmEvent::Behaviour(server::Event { .. }) => {
138                    tracing::info!("Finished run",)
139                }
140                e => panic!("{e:?}"),
141            }
142        }
143    })
144    .await
145    .unwrap();
146
147    Ok(())
148}
149
150async fn client(
151    server_address: SocketAddr,
152    transport: Transport,
153    upload_bytes: Option<usize>,
154    download_bytes: Option<usize>,
155) -> Result<()> {
156    let server_address = match transport {
157        Transport::Tcp => Multiaddr::empty()
158            .with(server_address.ip().into())
159            .with(Protocol::Tcp(server_address.port())),
160        Transport::QuicV1 => Multiaddr::empty()
161            .with(server_address.ip().into())
162            .with(Protocol::Udp(server_address.port()))
163            .with(Protocol::QuicV1),
164    };
165    let params = RunParams {
166        to_send: upload_bytes.unwrap(),
167        to_receive: download_bytes.unwrap(),
168    };
169    let mut swarm = swarm().await?;
170
171    tokio::spawn(async move {
172        tracing::info!("start benchmark: custom");
173
174        let start = Instant::now();
175
176        let server_peer_id = connect(&mut swarm, server_address.clone()).await?;
177
178        perf(&mut swarm, server_peer_id, params).await?;
179
180        println!(
181            "{}",
182            serde_json::to_string(&BenchmarkResult {
183                upload_bytes: params.to_send,
184                download_bytes: params.to_receive,
185                r#type: "final".to_string(),
186                time_seconds: start.elapsed().as_secs_f64(),
187            })
188            .unwrap()
189        );
190
191        anyhow::Ok(())
192    })
193    .await??;
194
195    Ok(())
196}
197
198#[derive(Serialize, Deserialize)]
199#[serde(rename_all = "camelCase")]
200struct BenchmarkResult {
201    r#type: String,
202    time_seconds: f64,
203    upload_bytes: usize,
204    download_bytes: usize,
205}
206
207async fn swarm<B: NetworkBehaviour + Default>() -> Result<Swarm<B>> {
208    let swarm = SwarmBuilder::with_new_identity()
209        .with_tokio()
210        .with_tcp(
211            libp2p_tcp::Config::default().nodelay(true),
212            libp2p_tls::Config::new,
213            libp2p_yamux::Config::default,
214        )?
215        .with_quic()
216        .with_dns()?
217        .with_behaviour(|_| B::default())?
218        .with_swarm_config(|cfg| {
219            cfg.with_substream_upgrade_protocol_override(upgrade::Version::V1Lazy)
220                .with_idle_connection_timeout(Duration::from_secs(60 * 5))
221        })
222        .build();
223
224    Ok(swarm)
225}
226
227async fn connect(
228    swarm: &mut Swarm<client::Behaviour>,
229    server_address: Multiaddr,
230) -> Result<PeerId> {
231    let start = Instant::now();
232    swarm.dial(server_address.clone()).unwrap();
233
234    let server_peer_id = match swarm.next().await.unwrap() {
235        SwarmEvent::ConnectionEstablished { peer_id, .. } => peer_id,
236        SwarmEvent::OutgoingConnectionError { peer_id, error, .. } => {
237            bail!("Outgoing connection error to {:?}: {:?}", peer_id, error);
238        }
239        e => panic!("{e:?}"),
240    };
241
242    let duration = start.elapsed();
243    let duration_seconds = duration.as_secs_f64();
244
245    tracing::info!(elapsed_time=%format!("{duration_seconds:.4} s"));
246
247    Ok(server_peer_id)
248}
249
250async fn perf(
251    swarm: &mut Swarm<client::Behaviour>,
252    server_peer_id: PeerId,
253    params: RunParams,
254) -> Result<Run> {
255    swarm.behaviour_mut().perf(server_peer_id, params)?;
256
257    let duration = loop {
258        match swarm.next().await.unwrap() {
259            SwarmEvent::Behaviour(client::Event {
260                id: _,
261                result: Ok(RunUpdate::Intermediate(progressed)),
262            }) => {
263                tracing::info!("{progressed}");
264
265                let Intermediate {
266                    duration,
267                    sent,
268                    received,
269                } = progressed;
270
271                println!(
272                    "{}",
273                    serde_json::to_string(&BenchmarkResult {
274                        r#type: "intermediate".to_string(),
275                        time_seconds: duration.as_secs_f64(),
276                        upload_bytes: sent,
277                        download_bytes: received,
278                    })
279                    .unwrap()
280                );
281            }
282            SwarmEvent::Behaviour(client::Event {
283                id: _,
284                result: Ok(RunUpdate::Final(Final { duration })),
285            }) => break duration,
286            e => panic!("{e:?}"),
287        };
288    };
289
290    let run = Run { params, duration };
291
292    tracing::info!("{run}");
293
294    Ok(run)
295}