libp2p_kad/jobs.rs
// Copyright 2019 Parity Technologies (UK) Ltd.
//
// Permission is hereby granted, free of charge, to any person obtaining a
// copy of this software and associated documentation files (the "Software"),
// to deal in the Software without restriction, including without limitation
// the rights to use, copy, modify, merge, publish, distribute, sublicense,
// and/or sell copies of the Software, and to permit persons to whom the
// Software is furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.

//! Periodic (background) jobs.
//!
//! ## Record Persistence & Expiry
//!
//! To ensure persistence of records in the DHT, a Kademlia node
//! must periodically (re-)publish and (re-)replicate its records:
//!
//! 1. (Re-)publishing: The original publisher or provider of a record must regularly re-publish
//!    in order to prolong the expiration.
//!
//! 2. (Re-)replication: Every node storing a replica of a record must regularly re-replicate it
//!    to the closest nodes to the key in order to ensure the record is present at these nodes.
//!
//! Re-publishing primarily ensures persistence of the record beyond its
//! initial TTL, for as long as the publisher stores (or provides) the record,
//! whilst (re-)replication primarily ensures persistence for the duration
//! of the TTL in the light of topology changes. Consequently, replication
//! intervals should be shorter than publication intervals and
//! publication intervals should be shorter than the TTL.
//!
//! This module implements two periodic jobs:
//!
//! * [`PutRecordJob`]: For (re-)publication and (re-)replication of regular (value-)records.
//!
//! * [`AddProviderJob`]: For (re-)publication of provider records. Provider records currently
//!   have no separate replication mechanism.
//!
//! A periodic job is driven like a `Future` or `Stream` by `poll`ing it.
//! Once a job starts running it emits records to send to the `k` closest
//! nodes to the key, where `k` is the replication factor.
//!
//! Furthermore, these jobs perform double-duty by removing expired records
//! from the `RecordStore` on every run. Expired records are never emitted
//! by the jobs.
//!
//! > **Note**: The current implementation takes a snapshot of the records
//! > to replicate from the `RecordStore` when it starts and thus, to account
//! > for the worst case, it temporarily requires additional memory proportional
//! > to the size of all stored records. As a job runs, the records are moved
//! > out of the job to the consumer, where they can be dropped after being sent.
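//!
//! A minimal sketch of this polling pattern (not the actual `Behaviour` code;
//! `cx`, `record_store` and the various intervals are assumed to be supplied by
//! the caller):
//!
//! ```ignore
//! let mut put_record_job =
//!     PutRecordJob::new(local_id, replicate_interval, publish_interval, record_ttl);
//! let mut add_provider_job = AddProviderJob::new(provider_publish_interval);
//!
//! // Inside a task context, e.g. the behaviour's `poll`:
//! let now = Instant::now();
//! while let Poll::Ready(record) = put_record_job.poll(cx, &mut record_store, now) {
//!     // Send `record` to the `k` closest peers to `record.key`.
//! }
//! while let Poll::Ready(provider) = add_provider_job.poll(cx, &mut record_store, now) {
//!     // Announce `provider` to the `k` closest peers to `provider.key`.
//! }
//! ```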

use std::{
    collections::HashSet,
    pin::Pin,
    task::{Context, Poll},
    time::Duration,
    vec,
};

use futures::prelude::*;
use futures_timer::Delay;
use libp2p_identity::PeerId;
use web_time::Instant;

use crate::record::{self, store::RecordStore, ProviderRecord, Record};

/// The maximum number of in-flight queries up to which background jobs
/// are allowed to start new queries on an invocation of
/// `Behaviour::poll`.
pub(crate) const JOBS_MAX_QUERIES: usize = 100;

/// The maximum number of new queries started by a background job
/// per invocation of `Behaviour::poll`.
pub(crate) const JOBS_MAX_NEW_QUERIES: usize = 10;
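
// Illustrative sketch of how these two limits are intended to interact (this is
// not the actual `Behaviour::poll` code; `num_running_queries` is an assumed
// placeholder for the number of queries currently in flight):
//
//     let capacity = JOBS_MAX_QUERIES.saturating_sub(num_running_queries);
//     let new_queries = usize::min(JOBS_MAX_NEW_QUERIES, capacity);
//     // ...start at most `new_queries` queries from the background jobs...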

/// A background job run periodically.
#[derive(Debug)]
struct PeriodicJob<T> {
    interval: Duration,
    state: PeriodicJobState<T>,
}

impl<T> PeriodicJob<T> {
    #[cfg(test)]
    fn is_running(&self) -> bool {
        match self.state {
            PeriodicJobState::Running(..) => true,
            PeriodicJobState::Waiting(..) => false,
        }
    }

    /// Cuts short the remaining delay, if the job is currently waiting
    /// for the delay to expire.
    #[cfg(test)]
    fn asap(&mut self) {
        if let PeriodicJobState::Waiting(delay, deadline) = &mut self.state {
            let new_deadline = Instant::now().checked_sub(Duration::from_secs(1)).unwrap();
            *deadline = new_deadline;
            delay.reset(Duration::from_secs(1));
        }
    }

    /// Returns `true` if the job is currently not running but ready
    /// to be run, `false` otherwise.
    fn check_ready(&mut self, cx: &mut Context<'_>, now: Instant) -> bool {
        if let PeriodicJobState::Waiting(delay, deadline) = &mut self.state {
            // Polling the `Delay` also registers the current task to be woken
            // once the delay elapses.
            if now >= *deadline || !Future::poll(Pin::new(delay), cx).is_pending() {
                return true;
            }
        }
        false
    }
}

/// The state of a background job run periodically.
#[derive(Debug)]
enum PeriodicJobState<T> {
    Running(T),
    Waiting(Delay, Instant),
}

//////////////////////////////////////////////////////////////////////////////
// PutRecordJob

/// Periodic job for replicating / publishing records.
pub(crate) struct PutRecordJob {
    local_id: PeerId,
    next_publish: Option<Instant>,
    publish_interval: Option<Duration>,
    record_ttl: Option<Duration>,
    skipped: HashSet<record::Key>,
    inner: PeriodicJob<vec::IntoIter<Record>>,
}

impl PutRecordJob {
    /// Creates a new periodic job for replicating and re-publishing
    /// locally stored records.
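    ///
    /// A sketch of a construction that follows the interval ordering recommended in
    /// the module documentation (replication interval < publication interval < TTL).
    /// `local_id` is assumed to be the local peer's `PeerId`, and the concrete
    /// durations are illustrative placeholders, not prescribed defaults:
    ///
    /// ```ignore
    /// let job = PutRecordJob::new(
    ///     local_id,
    ///     Duration::from_secs(60 * 60),            // re-replicate every hour
    ///     Some(Duration::from_secs(24 * 60 * 60)), // re-publish every 24 hours
    ///     Some(Duration::from_secs(36 * 60 * 60)), // records expire after 36 hours
    /// );
    /// ```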
    pub(crate) fn new(
        local_id: PeerId,
        replicate_interval: Duration,
        publish_interval: Option<Duration>,
        record_ttl: Option<Duration>,
    ) -> Self {
        let now = Instant::now();
        let deadline = now + replicate_interval;
        let delay = Delay::new(replicate_interval);
        let next_publish = publish_interval.map(|i| now + i);
        Self {
            local_id,
            next_publish,
            publish_interval,
            record_ttl,
            skipped: HashSet::new(),
            inner: PeriodicJob {
                interval: replicate_interval,
                state: PeriodicJobState::Waiting(delay, deadline),
            },
        }
    }

    /// Adds the key of a record that is ignored on the current or
    /// next run of the job.
    pub(crate) fn skip(&mut self, key: record::Key) {
        self.skipped.insert(key);
    }

    /// Checks whether the job is currently running.
    #[cfg(test)]
    pub(crate) fn is_running(&self) -> bool {
        self.inner.is_running()
    }

    /// Cuts short the remaining delay, if the job is currently waiting
    /// for the delay to expire.
    ///
    /// The job is guaranteed to run on the next invocation of `poll`.
    #[cfg(test)]
    pub(crate) fn asap(&mut self, publish: bool) {
        if publish {
            self.next_publish = Some(Instant::now().checked_sub(Duration::from_secs(1)).unwrap())
        }
        self.inner.asap()
    }

    /// Polls the job for records to replicate.
    ///
    /// Must be called in the context of a task. When `Poll::Pending` is returned,
    /// the current task is registered to be notified when the job is ready
    /// to be run.
    pub(crate) fn poll<T>(
        &mut self,
        cx: &mut Context<'_>,
        store: &mut T,
        now: Instant,
    ) -> Poll<Record>
    where
        T: RecordStore,
    {
        if self.inner.check_ready(cx, now) {
            let publish = self.next_publish.is_some_and(|t_pub| now >= t_pub);
            let records = store
                .records()
                .filter_map(|r| {
                    let is_publisher = r.publisher.as_ref() == Some(&self.local_id);
                    if self.skipped.contains(&r.key) || (!publish && is_publisher) {
                        None
                    } else {
                        let mut record = r.into_owned();
                        if publish && is_publisher {
                            record.expires = record
                                .expires
                                .or_else(|| self.record_ttl.map(|ttl| now + ttl));
                        }
                        Some(record)
                    }
                })
                .collect::<Vec<_>>()
                .into_iter();

            // Schedule the next publishing run.
            if publish {
                self.next_publish = self.publish_interval.map(|i| now + i);
            }

            self.skipped.clear();

            self.inner.state = PeriodicJobState::Running(records);
        }

        if let PeriodicJobState::Running(records) = &mut self.inner.state {
            for r in records {
                if r.is_expired(now) {
                    store.remove(&r.key)
                } else {
                    return Poll::Ready(r);
                }
            }

            // Wait for the next run.
            let deadline = now + self.inner.interval;
            let delay = Delay::new(self.inner.interval);
            self.inner.state = PeriodicJobState::Waiting(delay, deadline);
            assert!(!self.inner.check_ready(cx, now));
        }

        Poll::Pending
    }
}

//////////////////////////////////////////////////////////////////////////////
// AddProviderJob

/// Periodic job for replicating provider records.
pub(crate) struct AddProviderJob {
    inner: PeriodicJob<vec::IntoIter<ProviderRecord>>,
}

impl AddProviderJob {
    /// Creates a new periodic job for provider announcements.
    pub(crate) fn new(interval: Duration) -> Self {
        let now = Instant::now();
        Self {
            inner: PeriodicJob {
                interval,
                state: {
                    let deadline = now + interval;
                    PeriodicJobState::Waiting(Delay::new(interval), deadline)
                },
            },
        }
    }

    /// Checks whether the job is currently running.
    #[cfg(test)]
    pub(crate) fn is_running(&self) -> bool {
        self.inner.is_running()
    }

    /// Cuts short the remaining delay, if the job is currently waiting
    /// for the delay to expire.
    ///
    /// The job is guaranteed to run on the next invocation of `poll`.
    #[cfg(test)]
    pub(crate) fn asap(&mut self) {
        self.inner.asap()
    }

    /// Polls the job for provider records to replicate.
    ///
    /// Must be called in the context of a task. When `Poll::Pending` is returned,
    /// the current task is registered to be notified when the job is ready
    /// to be run.
    pub(crate) fn poll<T>(
        &mut self,
        cx: &mut Context<'_>,
        store: &mut T,
        now: Instant,
    ) -> Poll<ProviderRecord>
    where
        T: RecordStore,
    {
        if self.inner.check_ready(cx, now) {
            let records = store
                .provided()
                .map(|r| r.into_owned())
                .collect::<Vec<_>>()
                .into_iter();
            self.inner.state = PeriodicJobState::Running(records);
        }

        if let PeriodicJobState::Running(keys) = &mut self.inner.state {
            for r in keys {
                if r.is_expired(now) {
                    store.remove_provider(&r.key, &r.provider)
                } else {
                    return Poll::Ready(r);
                }
            }

            let deadline = now + self.inner.interval;
            let delay = Delay::new(self.inner.interval);
            self.inner.state = PeriodicJobState::Waiting(delay, deadline);
            assert!(!self.inner.check_ready(cx, now));
        }

        Poll::Pending
    }
}

#[cfg(test)]
mod tests {
    use futures::{executor::block_on, future::poll_fn};
    use quickcheck::*;
    use rand::Rng;

    use super::*;
    use crate::record::store::MemoryStore;

    fn rand_put_record_job() -> PutRecordJob {
        let mut rng = rand::thread_rng();
        let id = PeerId::random();
        let replicate_interval = Duration::from_secs(rng.gen_range(1..60));
        let publish_interval = Some(replicate_interval * rng.gen_range(1..10));
        let record_ttl = Some(Duration::from_secs(rng.gen_range(1..600)));
        PutRecordJob::new(id, replicate_interval, publish_interval, record_ttl)
    }

    fn rand_add_provider_job() -> AddProviderJob {
        let mut rng = rand::thread_rng();
        let interval = Duration::from_secs(rng.gen_range(1..60));
        AddProviderJob::new(interval)
    }

    #[test]
    fn new_job_not_running() {
        let job = rand_put_record_job();
        assert!(!job.is_running());
        let job = rand_add_provider_job();
        assert!(!job.is_running());
    }

    #[test]
    fn run_put_record_job() {
        fn prop(records: Vec<Record>) {
            let mut job = rand_put_record_job();
            // Fill a record store.
            let mut store = MemoryStore::new(job.local_id);
            for r in records {
                let _ = store.put(r);
            }

            block_on(poll_fn(|ctx| {
                let now = Instant::now() + job.inner.interval;
                // All (non-expired) records in the store must be yielded by the job.
                for r in store.records().map(|r| r.into_owned()).collect::<Vec<_>>() {
                    if !r.is_expired(now) {
                        assert_eq!(job.poll(ctx, &mut store, now), Poll::Ready(r));
                        assert!(job.is_running());
                    }
                }
                assert_eq!(job.poll(ctx, &mut store, now), Poll::Pending);
                assert!(!job.is_running());
                Poll::Ready(())
            }));
        }

        quickcheck(prop as fn(_))
    }

    #[test]
    fn run_add_provider_job() {
        fn prop(records: Vec<ProviderRecord>) {
            let mut job = rand_add_provider_job();
            let id = PeerId::random();
            // Fill a record store.
            let mut store = MemoryStore::new(id);
            for mut r in records {
                r.provider = id;
                let _ = store.add_provider(r);
            }

            block_on(poll_fn(|ctx| {
                let now = Instant::now() + job.inner.interval;
                // All (non-expired) records in the store must be yielded by the job.
                for r in store.provided().map(|r| r.into_owned()).collect::<Vec<_>>() {
                    if !r.is_expired(now) {
                        assert_eq!(job.poll(ctx, &mut store, now), Poll::Ready(r));
                        assert!(job.is_running());
                    }
                }
                assert_eq!(job.poll(ctx, &mut store, now), Poll::Pending);
                assert!(!job.is_running());
                Poll::Ready(())
            }));
        }

        quickcheck(prop as fn(_))
    }
}
427}