libp2p_perf/
protocol.rs

1// Copyright 2023 Protocol Labs.
2//
3// Permission is hereby granted, free of charge, to any person obtaining a
4// copy of this software and associated documentation files (the "Software"),
5// to deal in the Software without restriction, including without limitation
6// the rights to use, copy, modify, merge, publish, distribute, sublicense,
7// and/or sell copies of the Software, and to permit persons to whom the
8// Software is furnished to do so, subject to the following conditions:
9//
10// The above copyright notice and this permission notice shall be included in
11// all copies or substantial portions of the Software.
12//
13// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
14// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
18// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
19// DEALINGS IN THE SOFTWARE.
20
21use std::time::Duration;
22
23use futures::{
24    future::{select, Either},
25    AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt, FutureExt, SinkExt, Stream, StreamExt,
26};
27use futures_timer::Delay;
28use web_time::Instant;
29
30use crate::{Final, Intermediate, Run, RunDuration, RunParams, RunUpdate};
31
32const BUF: [u8; 1024] = [0; 1024];
33const REPORT_INTERVAL: Duration = Duration::from_secs(1);
34
35pub(crate) fn send_receive<S: AsyncRead + AsyncWrite + Unpin + Send + 'static>(
36    params: RunParams,
37    stream: S,
38) -> impl Stream<Item = Result<RunUpdate, std::io::Error>> {
39    // Use a channel to simulate a generator. `send_receive_inner` can `yield` events through the
40    // channel.
41    let (sender, receiver) = futures::channel::mpsc::channel(0);
42    let receiver = receiver.fuse();
43    let inner = send_receive_inner(params, stream, sender).fuse();
44
45    futures::stream::select(
46        receiver.map(|progressed| Ok(RunUpdate::Intermediate(progressed))),
47        inner
48            .map(|finished| finished.map(RunUpdate::Final))
49            .into_stream(),
50    )
51}
52
53async fn send_receive_inner<S: AsyncRead + AsyncWrite + Unpin + Send + 'static>(
54    params: RunParams,
55    mut stream: S,
56    mut progress: futures::channel::mpsc::Sender<Intermediate>,
57) -> Result<Final, std::io::Error> {
58    let mut delay = Delay::new(REPORT_INTERVAL);
59
60    let RunParams {
61        to_send,
62        to_receive,
63    } = params;
64
65    let mut receive_buf = vec![0; 1024];
66    let to_receive_bytes = (to_receive as u64).to_be_bytes();
67    stream.write_all(&to_receive_bytes).await?;
68
69    let write_start = Instant::now();
70    let mut intermittent_start = Instant::now();
71    let mut sent = 0;
72    let mut intermittent_sent = 0;
73
74    while sent < to_send {
75        let n = std::cmp::min(to_send - sent, BUF.len());
76        let buf = &BUF[..n];
77
78        let mut write = stream.write(buf);
79        sent += loop {
80            match select(&mut delay, &mut write).await {
81                Either::Left((_, _)) => {
82                    delay.reset(REPORT_INTERVAL);
83                    progress
84                        .send(Intermediate {
85                            duration: intermittent_start.elapsed(),
86                            sent: sent - intermittent_sent,
87                            received: 0,
88                        })
89                        .await
90                        .expect("receiver not to be dropped");
91                    intermittent_start = Instant::now();
92                    intermittent_sent = sent;
93                }
94                Either::Right((n, _)) => break n?,
95            }
96        }
97    }
98
99    loop {
100        match select(&mut delay, stream.close()).await {
101            Either::Left((_, _)) => {
102                delay.reset(REPORT_INTERVAL);
103                progress
104                    .send(Intermediate {
105                        duration: intermittent_start.elapsed(),
106                        sent: sent - intermittent_sent,
107                        received: 0,
108                    })
109                    .await
110                    .expect("receiver not to be dropped");
111                intermittent_start = Instant::now();
112                intermittent_sent = sent;
113            }
114            Either::Right((Ok(_), _)) => break,
115            Either::Right((Err(e), _)) => return Err(e),
116        }
117    }
118
119    let write_done = Instant::now();
120    let mut received = 0;
121    let mut intermittend_received = 0;
122
123    while received < to_receive {
124        let mut read = stream.read(&mut receive_buf);
125        received += loop {
126            match select(&mut delay, &mut read).await {
127                Either::Left((_, _)) => {
128                    delay.reset(REPORT_INTERVAL);
129                    progress
130                        .send(Intermediate {
131                            duration: intermittent_start.elapsed(),
132                            sent: sent - intermittent_sent,
133                            received: received - intermittend_received,
134                        })
135                        .await
136                        .expect("receiver not to be dropped");
137                    intermittent_start = Instant::now();
138                    intermittent_sent = sent;
139                    intermittend_received = received;
140                }
141                Either::Right((n, _)) => break n?,
142            }
143        }
144    }
145
146    let read_done = Instant::now();
147
148    Ok(Final {
149        duration: RunDuration {
150            upload: write_done.duration_since(write_start),
151            download: read_done.duration_since(write_done),
152        },
153    })
154}
155
156pub(crate) async fn receive_send<S: AsyncRead + AsyncWrite + Unpin>(
157    mut stream: S,
158) -> Result<Run, std::io::Error> {
159    let to_send = {
160        let mut buf = [0; 8];
161        stream.read_exact(&mut buf).await?;
162
163        u64::from_be_bytes(buf) as usize
164    };
165
166    let read_start = Instant::now();
167
168    let mut receive_buf = vec![0; 1024];
169    let mut received = 0;
170    loop {
171        let n = stream.read(&mut receive_buf).await?;
172        received += n;
173        if n == 0 {
174            break;
175        }
176    }
177
178    let read_done = Instant::now();
179
180    let mut sent = 0;
181    while sent < to_send {
182        let n = std::cmp::min(to_send - sent, BUF.len());
183        let buf = &BUF[..n];
184
185        sent += stream.write(buf).await?;
186    }
187
188    stream.close().await?;
189    let write_done = Instant::now();
190
191    Ok(Run {
192        params: RunParams {
193            to_send: sent,
194            to_receive: received,
195        },
196        duration: RunDuration {
197            upload: write_done.duration_since(read_done),
198            download: read_done.duration_since(read_start),
199        },
200    })
201}