libp2p_ping/protocol.rs
1// Copyright 2018 Parity Technologies (UK) Ltd.
2//
3// Permission is hereby granted, free of charge, to any person obtaining a
4// copy of this software and associated documentation files (the "Software"),
5// to deal in the Software without restriction, including without limitation
6// the rights to use, copy, modify, merge, publish, distribute, sublicense,
7// and/or sell copies of the Software, and to permit persons to whom the
8// Software is furnished to do so, subject to the following conditions:
9//
10// The above copyright notice and this permission notice shall be included in
11// all copies or substantial portions of the Software.
12//
13// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
14// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
18// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
19// DEALINGS IN THE SOFTWARE.
20
21use std::{io, time::Duration};
22
23use futures::prelude::*;
24use libp2p_swarm::StreamProtocol;
25use rand::{distributions, prelude::*};
26use web_time::Instant;
27
28pub const PROTOCOL_NAME: StreamProtocol = StreamProtocol::new("/ipfs/ping/1.0.0");
29
30/// The `Ping` protocol upgrade.
31///
32/// The ping protocol sends 32 bytes of random data in configurable
33/// intervals over a single outbound substream, expecting to receive
34/// the same bytes as a response. At the same time, incoming pings
35/// on inbound substreams are answered by sending back the received bytes.
36///
37/// At most a single inbound and outbound substream is kept open at
38/// any time. In case of a ping timeout or another error on a substream, the
39/// substream is dropped.
40///
41/// Successful pings report the round-trip time.
42///
43/// > **Note**: The round-trip time of a ping may be subject to delays induced
44/// > by the underlying transport, e.g. in the case of TCP there is
45/// > Nagle's algorithm, delayed acks and similar configuration options
46/// > which can affect latencies especially on otherwise low-volume
47/// > connections.
48const PING_SIZE: usize = 32;
49
50/// Sends a ping and waits for the pong.
51pub(crate) async fn send_ping<S>(mut stream: S) -> io::Result<(S, Duration)>
52where
53 S: AsyncRead + AsyncWrite + Unpin,
54{
55 let payload: [u8; PING_SIZE] = thread_rng().sample(distributions::Standard);
56 stream.write_all(&payload).await?;
57 stream.flush().await?;
58 let started = Instant::now();
59 let mut recv_payload = [0u8; PING_SIZE];
60 stream.read_exact(&mut recv_payload).await?;
61 if recv_payload == payload {
62 Ok((stream, started.elapsed()))
63 } else {
64 Err(io::Error::new(
65 io::ErrorKind::InvalidData,
66 "Ping payload mismatch",
67 ))
68 }
69}
70
71/// Waits for a ping and sends a pong.
72pub(crate) async fn recv_ping<S>(mut stream: S) -> io::Result<S>
73where
74 S: AsyncRead + AsyncWrite + Unpin,
75{
76 let mut payload = [0u8; PING_SIZE];
77 stream.read_exact(&mut payload).await?;
78 stream.write_all(&payload).await?;
79 stream.flush().await?;
80 Ok(stream)
81}
82
83#[cfg(test)]
84mod tests {
85 use futures::StreamExt;
86 use libp2p_core::{
87 multiaddr::multiaddr,
88 transport::{memory::MemoryTransport, DialOpts, ListenerId, PortUse, Transport},
89 Endpoint,
90 };
91
92 use super::*;
93
94 #[tokio::test]
95 async fn ping_pong() {
96 let mem_addr = multiaddr![Memory(thread_rng().gen::<u64>())];
97 let mut transport = MemoryTransport::new().boxed();
98 transport.listen_on(ListenerId::next(), mem_addr).unwrap();
99
100 let listener_addr = transport
101 .select_next_some()
102 .now_or_never()
103 .and_then(|ev| ev.into_new_address())
104 .expect("MemoryTransport not listening on an address!");
105
106 tokio::spawn(async move {
107 let transport_event = transport.next().await.unwrap();
108 let (listener_upgrade, _) = transport_event.into_incoming().unwrap();
109 let conn = listener_upgrade.await.unwrap();
110 recv_ping(conn).await.unwrap();
111 });
112
113 let c = MemoryTransport::new()
114 .dial(
115 listener_addr,
116 DialOpts {
117 role: Endpoint::Dialer,
118 port_use: PortUse::Reuse,
119 },
120 )
121 .unwrap()
122 .await
123 .unwrap();
124 let (_, rtt) = send_ping(c).await.unwrap();
125 assert!(rtt > Duration::from_secs(0));
126 }
127}