// libp2p_ping/handler.rs

// Copyright 2019 Parity Technologies (UK) Ltd.
//
// Permission is hereby granted, free of charge, to any person obtaining a
// copy of this software and associated documentation files (the "Software"),
// to deal in the Software without restriction, including without limitation
// the rights to use, copy, modify, merge, publish, distribute, sublicense,
// and/or sell copies of the Software, and to permit persons to whom the
// Software is furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.
20
21use std::{
22    collections::VecDeque,
23    convert::Infallible,
24    error::Error,
25    fmt, io,
26    task::{Context, Poll},
27    time::Duration,
28};
29
30use futures::{
31    future::{BoxFuture, Either},
32    prelude::*,
33};
34use futures_timer::Delay;
35use libp2p_core::upgrade::ReadyUpgrade;
36use libp2p_swarm::{
37    handler::{ConnectionEvent, DialUpgradeError, FullyNegotiatedInbound, FullyNegotiatedOutbound},
38    ConnectionHandler, ConnectionHandlerEvent, Stream, StreamProtocol, StreamUpgradeError,
39    SubstreamProtocol,
40};
41
42use crate::{protocol, PROTOCOL_NAME};
43
/// The configuration for outbound pings.
///
/// Create one with [`Config::new`] (or [`Default::default`]) and adjust it
/// with the consuming `with_*` setters, e.g.
/// `Config::new().with_interval(Duration::from_secs(5))`.
#[derive(Debug, Clone)]
pub struct Config {
    /// The timeout of an outbound ping.
    timeout: Duration,
    /// The duration between outbound pings.
    interval: Duration,
}

impl Config {
    /// Creates a new [`Config`] with the following default settings:
    ///
    ///   * [`Config::with_interval`] 15s
    ///   * [`Config::with_timeout`] 20s
    ///
    /// These settings have the following effect:
    ///
    ///   * A ping is sent every 15 seconds on a healthy connection.
    ///   * Every ping sent must yield a response within 20 seconds in order to be successful.
    pub fn new() -> Self {
        Self {
            timeout: Duration::from_secs(20),
            interval: Duration::from_secs(15),
        }
    }

    /// Sets the ping timeout.
    ///
    /// Consumes `self` and returns the updated configuration so that calls
    /// can be chained.
    pub fn with_timeout(mut self, d: Duration) -> Self {
        self.timeout = d;
        self
    }

    /// Sets the ping interval.
    ///
    /// Consumes `self` and returns the updated configuration so that calls
    /// can be chained.
    pub fn with_interval(mut self, d: Duration) -> Self {
        self.interval = d;
        self
    }
}

impl Default for Config {
    /// Equivalent to [`Config::new`].
    fn default() -> Self {
        Self::new()
    }
}
88
/// An outbound ping failure.
#[derive(Debug)]
pub enum Failure {
    /// The ping timed out, i.e. no response was received within the
    /// configured ping timeout.
    Timeout,
    /// The peer does not support the ping protocol.
    Unsupported,
    /// The ping failed for reasons other than a timeout.
    Other {
        /// The underlying error; also exposed through [`Error::source`].
        error: Box<dyn std::error::Error + Send + Sync + 'static>,
    },
}

impl Failure {
    /// Boxes an arbitrary error into [`Failure::Other`].
    fn other(e: impl std::error::Error + Send + Sync + 'static) -> Self {
        Self::Other { error: Box::new(e) }
    }
}

impl fmt::Display for Failure {
    /// Writes a short human-readable description; for [`Failure::Other`]
    /// the wrapped error's own `Display` output is appended.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            Failure::Timeout => f.write_str("Ping timeout"),
            Failure::Other { error } => write!(f, "Ping error: {error}"),
            Failure::Unsupported => f.write_str("Ping protocol not supported"),
        }
    }
}

impl Error for Failure {
    /// Returns the wrapped error for [`Failure::Other`]; the other variants
    /// have no underlying cause.
    fn source(&self) -> Option<&(dyn Error + 'static)> {
        match self {
            Failure::Timeout => None,
            Failure::Other { error } => Some(&**error),
            Failure::Unsupported => None,
        }
    }
}
128
129/// Protocol handler that handles pinging the remote at a regular period
130/// and answering ping queries.
131pub struct Handler {
132    /// Configuration options.
133    config: Config,
134    /// The timer used for the delay to the next ping.
135    interval: Delay,
136    /// Outbound ping failures that are pending to be processed by `poll()`.
137    pending_errors: VecDeque<Failure>,
138    /// The number of consecutive ping failures that occurred.
139    ///
140    /// Each successful ping resets this counter to 0.
141    failures: u32,
142    /// The outbound ping state.
143    outbound: Option<OutboundState>,
144    /// The inbound pong handler, i.e. if there is an inbound
145    /// substream, this is always a future that waits for the
146    /// next inbound ping to be answered.
147    inbound: Option<PongFuture>,
148    /// Tracks the state of our handler.
149    state: State,
150}
151
/// Whether the handler is still pinging the remote or has given up because
/// the remote does not support the ping protocol.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum State {
    /// We are inactive because the other peer doesn't support ping.
    Inactive {
        /// Whether or not we've reported the missing support yet.
        ///
        /// This is used to avoid repeated events being emitted for a specific connection.
        reported: bool,
    },
    /// We are actively pinging the other peer.
    Active,
}
164
165impl Handler {
166    /// Builds a new [`Handler`] with the given configuration.
167    pub fn new(config: Config) -> Self {
168        Handler {
169            config,
170            interval: Delay::new(Duration::new(0, 0)),
171            pending_errors: VecDeque::with_capacity(2),
172            failures: 0,
173            outbound: None,
174            inbound: None,
175            state: State::Active,
176        }
177    }
178
179    fn on_dial_upgrade_error(
180        &mut self,
181        DialUpgradeError { error, .. }: DialUpgradeError<
182            (),
183            <Self as ConnectionHandler>::OutboundProtocol,
184        >,
185    ) {
186        self.outbound = None; // Request a new substream on the next `poll`.
187
188        // Timer is already polled and expired before substream request is initiated
189        // and will be polled again later on in our `poll` because we reset `self.outbound`.
190        //
191        // `futures-timer` allows an expired timer to be polled again and returns
192        // immediately `Poll::Ready`. However in its WASM implementation there is
193        // a bug that causes the expired timer to panic.
194        // This is a workaround until a proper fix is merged and released.
195        // See libp2p/rust-libp2p#5447 for more info.
196        //
197        // TODO: remove when async-rs/futures-timer#74 gets merged.
198        self.interval.reset(Duration::new(0, 0));
199
200        let error = match error {
201            StreamUpgradeError::NegotiationFailed => {
202                debug_assert_eq!(self.state, State::Active);
203
204                self.state = State::Inactive { reported: false };
205                return;
206            }
207            // Note: This timeout only covers protocol negotiation.
208            StreamUpgradeError::Timeout => Failure::Other {
209                error: Box::new(std::io::Error::new(
210                    std::io::ErrorKind::TimedOut,
211                    "ping protocol negotiation timed out",
212                )),
213            },
214            StreamUpgradeError::Apply(e) => libp2p_core::util::unreachable(e),
215            StreamUpgradeError::Io(e) => Failure::Other { error: Box::new(e) },
216        };
217
218        self.pending_errors.push_front(error);
219    }
220}
221
222impl ConnectionHandler for Handler {
223    type FromBehaviour = Infallible;
224    type ToBehaviour = Result<Duration, Failure>;
225    type InboundProtocol = ReadyUpgrade<StreamProtocol>;
226    type OutboundProtocol = ReadyUpgrade<StreamProtocol>;
227    type OutboundOpenInfo = ();
228    type InboundOpenInfo = ();
229
230    fn listen_protocol(&self) -> SubstreamProtocol<ReadyUpgrade<StreamProtocol>> {
231        SubstreamProtocol::new(ReadyUpgrade::new(PROTOCOL_NAME), ())
232    }
233
234    fn on_behaviour_event(&mut self, _: Infallible) {}
235
236    #[tracing::instrument(level = "trace", name = "ConnectionHandler::poll", skip(self, cx))]
237    fn poll(
238        &mut self,
239        cx: &mut Context<'_>,
240    ) -> Poll<ConnectionHandlerEvent<ReadyUpgrade<StreamProtocol>, (), Result<Duration, Failure>>>
241    {
242        match self.state {
243            State::Inactive { reported: true } => {
244                return Poll::Pending; // nothing to do on this connection
245            }
246            State::Inactive { reported: false } => {
247                self.state = State::Inactive { reported: true };
248                return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(Err(
249                    Failure::Unsupported,
250                )));
251            }
252            State::Active => {}
253        }
254
255        // Respond to inbound pings.
256        if let Some(fut) = self.inbound.as_mut() {
257            match fut.poll_unpin(cx) {
258                Poll::Pending => {}
259                Poll::Ready(Err(e)) => {
260                    tracing::debug!("Inbound ping error: {:?}", e);
261                    self.inbound = None;
262                }
263                Poll::Ready(Ok(stream)) => {
264                    tracing::trace!("answered inbound ping from peer");
265
266                    // A ping from a remote peer has been answered, wait for the next.
267                    self.inbound = Some(protocol::recv_ping(stream).boxed());
268                }
269            }
270        }
271
272        loop {
273            // Check for outbound ping failures.
274            if let Some(error) = self.pending_errors.pop_back() {
275                tracing::debug!("Ping failure: {:?}", error);
276
277                self.failures += 1;
278
279                // Note: For backward-compatibility the first failure is always "free"
280                // and silent. This allows peers who use a new substream
281                // for each ping to have successful ping exchanges with peers
282                // that use a single substream, since every successful ping
283                // resets `failures` to `0`.
284                if self.failures > 1 {
285                    return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(Err(error)));
286                }
287            }
288
289            // Continue outbound pings.
290            match self.outbound.take() {
291                Some(OutboundState::Ping(mut ping)) => match ping.poll_unpin(cx) {
292                    Poll::Pending => {
293                        self.outbound = Some(OutboundState::Ping(ping));
294                        break;
295                    }
296                    Poll::Ready(Ok((stream, rtt))) => {
297                        tracing::debug!(?rtt, "ping succeeded");
298                        self.failures = 0;
299                        self.interval.reset(self.config.interval);
300                        self.outbound = Some(OutboundState::Idle(stream));
301                        return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(Ok(rtt)));
302                    }
303                    Poll::Ready(Err(e)) => {
304                        self.interval.reset(self.config.interval);
305                        self.pending_errors.push_front(e);
306                    }
307                },
308                Some(OutboundState::Idle(stream)) => match self.interval.poll_unpin(cx) {
309                    Poll::Pending => {
310                        self.outbound = Some(OutboundState::Idle(stream));
311                        break;
312                    }
313                    Poll::Ready(()) => {
314                        self.outbound = Some(OutboundState::Ping(
315                            send_ping(stream, self.config.timeout).boxed(),
316                        ));
317                    }
318                },
319                Some(OutboundState::OpenStream) => {
320                    self.outbound = Some(OutboundState::OpenStream);
321                    break;
322                }
323                None => match self.interval.poll_unpin(cx) {
324                    Poll::Pending => break,
325                    Poll::Ready(()) => {
326                        self.outbound = Some(OutboundState::OpenStream);
327                        let protocol = SubstreamProtocol::new(ReadyUpgrade::new(PROTOCOL_NAME), ());
328                        return Poll::Ready(ConnectionHandlerEvent::OutboundSubstreamRequest {
329                            protocol,
330                        });
331                    }
332                },
333            }
334        }
335
336        Poll::Pending
337    }
338
339    fn on_connection_event(
340        &mut self,
341        event: ConnectionEvent<Self::InboundProtocol, Self::OutboundProtocol>,
342    ) {
343        match event {
344            ConnectionEvent::FullyNegotiatedInbound(FullyNegotiatedInbound {
345                protocol: mut stream,
346                ..
347            }) => {
348                stream.ignore_for_keep_alive();
349                self.inbound = Some(protocol::recv_ping(stream).boxed());
350            }
351            ConnectionEvent::FullyNegotiatedOutbound(FullyNegotiatedOutbound {
352                protocol: mut stream,
353                ..
354            }) => {
355                stream.ignore_for_keep_alive();
356                self.outbound = Some(OutboundState::Ping(
357                    send_ping(stream, self.config.timeout).boxed(),
358                ));
359            }
360            ConnectionEvent::DialUpgradeError(dial_upgrade_error) => {
361                self.on_dial_upgrade_error(dial_upgrade_error)
362            }
363            _ => {}
364        }
365    }
366}
367
368type PingFuture = BoxFuture<'static, Result<(Stream, Duration), Failure>>;
369type PongFuture = BoxFuture<'static, Result<Stream, io::Error>>;
370
371/// The current state w.r.t. outbound pings.
372enum OutboundState {
373    /// A new substream is being negotiated for the ping protocol.
374    OpenStream,
375    /// The substream is idle, waiting to send the next ping.
376    Idle(Stream),
377    /// A ping is being sent and the response awaited.
378    Ping(PingFuture),
379}
380
381/// A wrapper around [`protocol::send_ping`] that enforces a time out.
382async fn send_ping(stream: Stream, timeout: Duration) -> Result<(Stream, Duration), Failure> {
383    let ping = protocol::send_ping(stream);
384    futures::pin_mut!(ping);
385
386    match future::select(ping, Delay::new(timeout)).await {
387        Either::Left((Ok((stream, rtt)), _)) => Ok((stream, rtt)),
388        Either::Left((Err(e), _)) => Err(Failure::other(e)),
389        Either::Right(((), _)) => Err(Failure::Timeout),
390    }
391}