
// Copyright 2019 Parity Technologies (UK) Ltd.
//
// Permission is hereby granted, free of charge, to any person obtaining a
// copy of this software and associated documentation files (the "Software"),
// to deal in the Software without restriction, including without limitation
// the rights to use, copy, modify, merge, publish, distribute, sublicense,
// and/or sell copies of the Software, and to permit persons to whom the
// Software is furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.

//! Periodic (background) jobs.
//!
//! ## Record Persistence & Expiry
//!
//! To ensure persistence of records in the DHT, a Kademlia node
//! must periodically (re-)publish and (re-)replicate its records:
//!
//!   1. (Re-)publishing: The original publisher or provider of a record must regularly re-publish
//!      in order to prolong the expiration.
//!
//!   2. (Re-)replication: Every node storing a replica of a record must regularly re-replicate it
//!      to the closest nodes to the key in order to ensure the record is present at these nodes.
//!
//! Re-publishing primarily ensures persistence of the record beyond its
//! initial TTL, for as long as the publisher stores (or provides) the record,
//! whilst (re-)replication primarily ensures persistence for the duration
//! of the TTL in the light of topology changes. Consequently, replication
//! intervals should be shorter than publication intervals and
//! publication intervals should be shorter than the TTL.
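//!
//! As a purely illustrative example of this ordering (not a statement of
//! this crate's configuration defaults): a replication interval of 1 hour
//! and a publication interval of 24 hours are compatible with a record
//! TTL of 36 hours.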
//!
//! This module implements two periodic jobs:
//!
//!   * [`PutRecordJob`]: For (re-)publication and (re-)replication of regular (value-)records.
//!
//!   * [`AddProviderJob`]: For (re-)publication of provider records. Provider records currently
//!     have no separate replication mechanism.
//!
//! A periodic job is driven like a `Future` or `Stream` by `poll`ing it;
//! a sketch of this polling contract follows the note below.
//! Once a job starts running it emits records to send to the `k` closest
//! nodes to the key, where `k` is the replication factor.
//!
//! Furthermore, these jobs perform double-duty by removing expired records
//! from the `RecordStore` on every run. Expired records are never emitted
//! by the jobs.
//!
//! > **Note**: The current implementation takes a snapshot of the records
//! > to replicate from the `RecordStore` when it starts and thus, to account
//! > for the worst case, it temporarily requires additional memory proportional
//! > to the size of all stored records. As a job runs, the records are moved
//! > out of the job to the consumer, where they can be dropped after being sent.
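//!
//! The following sketch illustrates the polling contract. The jobs are
//! crate-private, so this is not a public API example, and `job`, `store`,
//! `cx` and `send_to_closest_peers` are hypothetical stand-ins for state
//! owned by the consumer (e.g. the `Behaviour`):
//!
//! ```ignore
//! // Drain the job of all records that are due, until it returns
//! // `Poll::Pending`. (The real consumer additionally bounds the number
//! // of new queries it starts per poll, see `JOBS_MAX_NEW_QUERIES`.)
//! while let Poll::Ready(record) = job.poll(cx, &mut store, Instant::now()) {
//!     // A non-expired record to (re-)publish or (re-)replicate.
//!     send_to_closest_peers(record);
//! }
//! // `Poll::Pending` has registered the task to be woken when the next
//! // periodic run is due.
//! ```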

use std::{
    collections::HashSet,
    pin::Pin,
    task::{Context, Poll},
    time::Duration,
    vec,
};

use futures::prelude::*;
use futures_timer::Delay;
use libp2p_identity::PeerId;
use web_time::Instant;

use crate::record::{self, store::RecordStore, ProviderRecord, Record};

/// The maximum number of in-flight queries up to which background jobs
/// may still start new queries on an invocation of `Behaviour::poll`.
pub(crate) const JOBS_MAX_QUERIES: usize = 100;

/// The maximum number of new queries started by a background job
/// per invocation of `Behaviour::poll`.
pub(crate) const JOBS_MAX_NEW_QUERIES: usize = 10;

/// A background job run periodically.
#[derive(Debug)]
struct PeriodicJob<T> {
    interval: Duration,
    state: PeriodicJobState<T>,
}

impl<T> PeriodicJob<T> {
    #[cfg(test)]
    fn is_running(&self) -> bool {
        match self.state {
            PeriodicJobState::Running(..) => true,
            PeriodicJobState::Waiting(..) => false,
        }
    }

    /// Cuts short the remaining delay, if the job is currently waiting
    /// for the delay to expire.
    #[cfg(test)]
    fn asap(&mut self) {
        if let PeriodicJobState::Waiting(delay, deadline) = &mut self.state {
            // Move the deadline into the past so that `check_ready`
            // reports the job as ready on the next poll.
            let new_deadline = Instant::now().checked_sub(Duration::from_secs(1)).unwrap();
            *deadline = new_deadline;
            delay.reset(Duration::from_secs(1));
        }
    }

    /// Returns `true` if the job is currently not running but ready
    /// to be run, `false` otherwise.
    fn check_ready(&mut self, cx: &mut Context<'_>, now: Instant) -> bool {
        if let PeriodicJobState::Waiting(delay, deadline) = &mut self.state {
            // Polling the `Delay` registers the task for wakeup, so the
            // caller is notified once the job becomes ready.
            if now >= *deadline || !Future::poll(Pin::new(delay), cx).is_pending() {
                return true;
            }
        }
        false
    }
}

/// The state of a background job run periodically.
#[derive(Debug)]
enum PeriodicJobState<T> {
    Running(T),
    Waiting(Delay, Instant),
}

//////////////////////////////////////////////////////////////////////////////
// PutRecordJob

/// Periodic job for replicating / publishing records.
pub(crate) struct PutRecordJob {
    local_id: PeerId,
    next_publish: Option<Instant>,
    publish_interval: Option<Duration>,
    record_ttl: Option<Duration>,
    skipped: HashSet<record::Key>,
    inner: PeriodicJob<vec::IntoIter<Record>>,
}

impl PutRecordJob {
    /// Creates a new periodic job for replicating and re-publishing
    /// locally stored records.
    pub(crate) fn new(
        local_id: PeerId,
        replicate_interval: Duration,
        publish_interval: Option<Duration>,
        record_ttl: Option<Duration>,
    ) -> Self {
        let now = Instant::now();
        let deadline = now + replicate_interval;
        let delay = Delay::new(replicate_interval);
        let next_publish = publish_interval.map(|i| now + i);
        Self {
            local_id,
            next_publish,
            publish_interval,
            record_ttl,
            skipped: HashSet::new(),
            inner: PeriodicJob {
                interval: replicate_interval,
                state: PeriodicJobState::Waiting(delay, deadline),
            },
        }
    }

    /// Adds the key of a record that is ignored on the current or
    /// next run of the job.
    pub(crate) fn skip(&mut self, key: record::Key) {
        self.skipped.insert(key);
    }

    /// Checks whether the job is currently running.
    #[cfg(test)]
    pub(crate) fn is_running(&self) -> bool {
        self.inner.is_running()
    }

    /// Cuts short the remaining delay, if the job is currently waiting
    /// for the delay to expire.
    ///
    /// The job is guaranteed to run on the next invocation of `poll`.
    #[cfg(test)]
    pub(crate) fn asap(&mut self, publish: bool) {
        if publish {
            self.next_publish = Some(Instant::now().checked_sub(Duration::from_secs(1)).unwrap())
        }
        self.inner.asap()
    }

    /// Polls the job for records to replicate.
    ///
    /// Must be called in the context of a task. When `Poll::Pending` is returned,
    /// the current task is registered to be notified when the job is ready
    /// to be run.
    pub(crate) fn poll<T>(
        &mut self,
        cx: &mut Context<'_>,
        store: &mut T,
        now: Instant,
    ) -> Poll<Record>
    where
        T: RecordStore,
    {
        if self.inner.check_ready(cx, now) {
            // Determine whether this run is also a (re-)publication run
            // for records published by the local node.
            let publish = self.next_publish.is_some_and(|t_pub| now >= t_pub);
            let records = store
                .records()
                .filter_map(|r| {
                    let is_publisher = r.publisher.as_ref() == Some(&self.local_id);
                    if self.skipped.contains(&r.key) || (!publish && is_publisher) {
                        None
                    } else {
                        let mut record = r.into_owned();
                        if publish && is_publisher {
                            // If the record has no explicit expiration yet,
                            // give it one based on the configured TTL.
                            record.expires = record
                                .expires
                                .or_else(|| self.record_ttl.map(|ttl| now + ttl));
                        }
                        Some(record)
                    }
                })
                .collect::<Vec<_>>()
                .into_iter();

            // Schedule the next publishing run.
            if publish {
                self.next_publish = self.publish_interval.map(|i| now + i);
            }

            self.skipped.clear();

            self.inner.state = PeriodicJobState::Running(records);
        }

        if let PeriodicJobState::Running(records) = &mut self.inner.state {
            for r in records {
                if r.is_expired(now) {
                    store.remove(&r.key)
                } else {
                    return Poll::Ready(r);
                }
            }

            // Wait for the next run.
            let deadline = now + self.inner.interval;
            let delay = Delay::new(self.inner.interval);
            self.inner.state = PeriodicJobState::Waiting(delay, deadline);
            assert!(!self.inner.check_ready(cx, now));
        }

        Poll::Pending
    }
}

//////////////////////////////////////////////////////////////////////////////
// AddProviderJob

/// Periodic job for replicating provider records.
pub(crate) struct AddProviderJob {
    inner: PeriodicJob<vec::IntoIter<ProviderRecord>>,
}

impl AddProviderJob {
    /// Creates a new periodic job for provider announcements.
    pub(crate) fn new(interval: Duration) -> Self {
        let now = Instant::now();
        Self {
            inner: PeriodicJob {
                interval,
                state: {
                    let deadline = now + interval;
                    PeriodicJobState::Waiting(Delay::new(interval), deadline)
                },
            },
        }
    }

    /// Checks whether the job is currently running.
    #[cfg(test)]
    pub(crate) fn is_running(&self) -> bool {
        self.inner.is_running()
    }

    /// Cuts short the remaining delay, if the job is currently waiting
    /// for the delay to expire.
    ///
    /// The job is guaranteed to run on the next invocation of `poll`.
    #[cfg(test)]
    pub(crate) fn asap(&mut self) {
        self.inner.asap()
    }

    /// Polls the job for provider records to replicate.
    ///
    /// Must be called in the context of a task. When `Poll::Pending` is returned,
    /// the current task is registered to be notified when the job is ready
    /// to be run.
    pub(crate) fn poll<T>(
        &mut self,
        cx: &mut Context<'_>,
        store: &mut T,
        now: Instant,
    ) -> Poll<ProviderRecord>
    where
        T: RecordStore,
    {
        if self.inner.check_ready(cx, now) {
            // Snapshot the provider records published by the local node.
            let records = store
                .provided()
                .map(|r| r.into_owned())
                .collect::<Vec<_>>()
                .into_iter();
            self.inner.state = PeriodicJobState::Running(records);
        }

        if let PeriodicJobState::Running(records) = &mut self.inner.state {
            for r in records {
                if r.is_expired(now) {
                    store.remove_provider(&r.key, &r.provider)
                } else {
                    return Poll::Ready(r);
                }
            }

            // Wait for the next run.
            let deadline = now + self.inner.interval;
            let delay = Delay::new(self.inner.interval);
            self.inner.state = PeriodicJobState::Waiting(delay, deadline);
            assert!(!self.inner.check_ready(cx, now));
        }

        Poll::Pending
    }
}

#[cfg(test)]
mod tests {
    use futures::{executor::block_on, future::poll_fn};
    use quickcheck::*;
    use rand::Rng;

    use super::*;
    use crate::record::store::MemoryStore;

    fn rand_put_record_job() -> PutRecordJob {
        let mut rng = rand::thread_rng();
        let id = PeerId::random();
        let replicate_interval = Duration::from_secs(rng.gen_range(1..60));
        let publish_interval = Some(replicate_interval * rng.gen_range(1..10));
        let record_ttl = Some(Duration::from_secs(rng.gen_range(1..600)));
        PutRecordJob::new(id, replicate_interval, publish_interval, record_ttl)
    }

    fn rand_add_provider_job() -> AddProviderJob {
        let mut rng = rand::thread_rng();
        let interval = Duration::from_secs(rng.gen_range(1..60));
        AddProviderJob::new(interval)
    }

    #[test]
    fn new_job_not_running() {
        let job = rand_put_record_job();
        assert!(!job.is_running());
        let job = rand_add_provider_job();
        assert!(!job.is_running());
    }

    #[test]
    fn run_put_record_job() {
        fn prop(records: Vec<Record>) {
            let mut job = rand_put_record_job();
            // Fill a record store.
            let mut store = MemoryStore::new(job.local_id);
            for r in records {
                let _ = store.put(r);
            }

            block_on(poll_fn(|ctx| {
                let now = Instant::now() + job.inner.interval;
                // All (non-expired) records in the store must be yielded by the job.
                for r in store.records().map(|r| r.into_owned()).collect::<Vec<_>>() {
                    if !r.is_expired(now) {
                        assert_eq!(job.poll(ctx, &mut store, now), Poll::Ready(r));
                        assert!(job.is_running());
                    }
                }
                assert_eq!(job.poll(ctx, &mut store, now), Poll::Pending);
                assert!(!job.is_running());
                Poll::Ready(())
            }));
        }

        quickcheck(prop as fn(_))
    }

    #[test]
    fn run_add_provider_job() {
        fn prop(records: Vec<ProviderRecord>) {
            let mut job = rand_add_provider_job();
            let id = PeerId::random();
            // Fill a record store.
            let mut store = MemoryStore::new(id);
            for mut r in records {
                r.provider = id;
                let _ = store.add_provider(r);
            }

            block_on(poll_fn(|ctx| {
                let now = Instant::now() + job.inner.interval;
                // All (non-expired) records in the store must be yielded by the job.
                for r in store.provided().map(|r| r.into_owned()).collect::<Vec<_>>() {
                    if !r.is_expired(now) {
                        assert_eq!(job.poll(ctx, &mut store, now), Poll::Ready(r));
                        assert!(job.is_running());
                    }
                }
                assert_eq!(job.poll(ctx, &mut store, now), Poll::Pending);
                assert!(!job.is_running());
                Poll::Ready(())
            }));
        }

        quickcheck(prop as fn(_))
    }
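
    // A sketch of an additional property test in the style of the tests
    // above, relying on the same `quickcheck` `Arbitrary` instances:
    // `PutRecordJob::skip` documents that a skipped key is ignored on the
    // current or next run, so a record whose key was skipped must not be
    // yielded by the job's next run.
    #[test]
    fn skipped_record_not_yielded() {
        fn prop(record: Record) {
            let mut job = rand_put_record_job();
            // Fill a record store with a single record and skip its key.
            let mut store = MemoryStore::new(job.local_id);
            let _ = store.put(record.clone());
            job.skip(record.key.clone());

            block_on(poll_fn(|ctx| {
                let now = Instant::now() + job.inner.interval;
                // The run starts, but the skipped key is filtered out, so
                // the job yields nothing and goes back to waiting.
                assert_eq!(job.poll(ctx, &mut store, now), Poll::Pending);
                assert!(!job.is_running());
                Poll::Ready(())
            }));
        }

        quickcheck(prop as fn(_))
    }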
}