1use std::time::Duration;
22
23use futures::{
24 future::{select, Either},
25 AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt, FutureExt, SinkExt, Stream, StreamExt,
26};
27use futures_timer::Delay;
28use web_time::Instant;
29
30use crate::{Final, Intermediate, Run, RunDuration, RunParams, RunUpdate};
31
32const BUF: [u8; 1024] = [0; 1024];
33const REPORT_INTERVAL: Duration = Duration::from_secs(1);
34
35pub(crate) fn send_receive<S: AsyncRead + AsyncWrite + Unpin + Send + 'static>(
36 params: RunParams,
37 stream: S,
38) -> impl Stream<Item = Result<RunUpdate, std::io::Error>> {
39 let (sender, receiver) = futures::channel::mpsc::channel(0);
42 let receiver = receiver.fuse();
43 let inner = send_receive_inner(params, stream, sender).fuse();
44
45 futures::stream::select(
46 receiver.map(|progressed| Ok(RunUpdate::Intermediate(progressed))),
47 inner
48 .map(|finished| finished.map(RunUpdate::Final))
49 .into_stream(),
50 )
51}
52
53async fn send_receive_inner<S: AsyncRead + AsyncWrite + Unpin + Send + 'static>(
54 params: RunParams,
55 mut stream: S,
56 mut progress: futures::channel::mpsc::Sender<Intermediate>,
57) -> Result<Final, std::io::Error> {
58 let mut delay = Delay::new(REPORT_INTERVAL);
59
60 let RunParams {
61 to_send,
62 to_receive,
63 } = params;
64
65 let mut receive_buf = vec![0; 1024];
66 let to_receive_bytes = (to_receive as u64).to_be_bytes();
67 stream.write_all(&to_receive_bytes).await?;
68
69 let write_start = Instant::now();
70 let mut intermittent_start = Instant::now();
71 let mut sent = 0;
72 let mut intermittent_sent = 0;
73
74 while sent < to_send {
75 let n = std::cmp::min(to_send - sent, BUF.len());
76 let buf = &BUF[..n];
77
78 let mut write = stream.write(buf);
79 sent += loop {
80 match select(&mut delay, &mut write).await {
81 Either::Left((_, _)) => {
82 delay.reset(REPORT_INTERVAL);
83 progress
84 .send(Intermediate {
85 duration: intermittent_start.elapsed(),
86 sent: sent - intermittent_sent,
87 received: 0,
88 })
89 .await
90 .expect("receiver not to be dropped");
91 intermittent_start = Instant::now();
92 intermittent_sent = sent;
93 }
94 Either::Right((n, _)) => break n?,
95 }
96 }
97 }
98
99 loop {
100 match select(&mut delay, stream.close()).await {
101 Either::Left((_, _)) => {
102 delay.reset(REPORT_INTERVAL);
103 progress
104 .send(Intermediate {
105 duration: intermittent_start.elapsed(),
106 sent: sent - intermittent_sent,
107 received: 0,
108 })
109 .await
110 .expect("receiver not to be dropped");
111 intermittent_start = Instant::now();
112 intermittent_sent = sent;
113 }
114 Either::Right((Ok(_), _)) => break,
115 Either::Right((Err(e), _)) => return Err(e),
116 }
117 }
118
119 let write_done = Instant::now();
120 let mut received = 0;
121 let mut intermittend_received = 0;
122
123 while received < to_receive {
124 let mut read = stream.read(&mut receive_buf);
125 received += loop {
126 match select(&mut delay, &mut read).await {
127 Either::Left((_, _)) => {
128 delay.reset(REPORT_INTERVAL);
129 progress
130 .send(Intermediate {
131 duration: intermittent_start.elapsed(),
132 sent: sent - intermittent_sent,
133 received: received - intermittend_received,
134 })
135 .await
136 .expect("receiver not to be dropped");
137 intermittent_start = Instant::now();
138 intermittent_sent = sent;
139 intermittend_received = received;
140 }
141 Either::Right((n, _)) => break n?,
142 }
143 }
144 }
145
146 let read_done = Instant::now();
147
148 Ok(Final {
149 duration: RunDuration {
150 upload: write_done.duration_since(write_start),
151 download: read_done.duration_since(write_done),
152 },
153 })
154}
155
156pub(crate) async fn receive_send<S: AsyncRead + AsyncWrite + Unpin>(
157 mut stream: S,
158) -> Result<Run, std::io::Error> {
159 let to_send = {
160 let mut buf = [0; 8];
161 stream.read_exact(&mut buf).await?;
162
163 u64::from_be_bytes(buf) as usize
164 };
165
166 let read_start = Instant::now();
167
168 let mut receive_buf = vec![0; 1024];
169 let mut received = 0;
170 loop {
171 let n = stream.read(&mut receive_buf).await?;
172 received += n;
173 if n == 0 {
174 break;
175 }
176 }
177
178 let read_done = Instant::now();
179
180 let mut sent = 0;
181 while sent < to_send {
182 let n = std::cmp::min(to_send - sent, BUF.len());
183 let buf = &BUF[..n];
184
185 sent += stream.write(buf).await?;
186 }
187
188 stream.close().await?;
189 let write_done = Instant::now();
190
191 Ok(Run {
192 params: RunParams {
193 to_send: sent,
194 to_receive: received,
195 },
196 duration: RunDuration {
197 upload: write_done.duration_since(read_done),
198 download: read_done.duration_since(read_start),
199 },
200 })
201}