libp2p_kad/behaviour.rs
1// Copyright 2018 Parity Technologies (UK) Ltd.
2//
3// Permission is hereby granted, free of charge, to any person obtaining a
4// copy of this software and associated documentation files (the "Software"),
5// to deal in the Software without restriction, including without limitation
6// the rights to use, copy, modify, merge, publish, distribute, sublicense,
7// and/or sell copies of the Software, and to permit persons to whom the
8// Software is furnished to do so, subject to the following conditions:
9//
10// The above copyright notice and this permission notice shall be included in
11// all copies or substantial portions of the Software.
12//
13// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
14// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
18// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
19// DEALINGS IN THE SOFTWARE.
20
21//! Implementation of the `Kademlia` network behaviour.
22
23mod test;
24
25use std::{
26 collections::{BTreeMap, HashMap, HashSet, VecDeque},
27 fmt,
28 num::NonZeroUsize,
29 task::{Context, Poll, Waker},
30 time::Duration,
31 vec,
32};
33
34use fnv::FnvHashSet;
35use libp2p_core::{transport::PortUse, ConnectedPoint, Endpoint, Multiaddr};
36use libp2p_identity::PeerId;
37use libp2p_swarm::{
38 behaviour::{AddressChange, ConnectionClosed, ConnectionEstablished, DialFailure, FromSwarm},
39 dial_opts::{self, DialOpts},
40 ConnectionDenied, ConnectionHandler, ConnectionId, DialError, ExternalAddresses,
41 ListenAddresses, NetworkBehaviour, NotifyHandler, StreamProtocol, THandler, THandlerInEvent,
42 THandlerOutEvent, ToSwarm,
43};
44use thiserror::Error;
45use tracing::Level;
46use web_time::Instant;
47
48pub use crate::query::QueryStats;
49use crate::{
50 addresses::Addresses,
51 bootstrap,
52 handler::{Handler, HandlerEvent, HandlerIn, RequestId},
53 jobs::*,
54 kbucket::{self, Distance, KBucketConfig, KBucketsTable, NodeStatus},
55 protocol,
56 protocol::{ConnectionType, KadPeer, ProtocolConfig},
57 query::{Query, QueryConfig, QueryId, QueryPool, QueryPoolState},
58 record::{
59 self,
60 store::{self, RecordStore},
61 ProviderRecord, Record,
62 },
63 K_VALUE,
64};
65
/// `Behaviour` is a `NetworkBehaviour` that implements the libp2p
/// Kademlia protocol.
///
/// The type parameter `TStore` is the type of the record storage kept in
/// the `store` field.
pub struct Behaviour<TStore> {
    /// The Kademlia routing table.
    kbuckets: KBucketsTable<kbucket::Key<PeerId>, Addresses>,

    /// The k-bucket insertion strategy.
    kbucket_inserts: BucketInserts,

    /// Configuration of the wire protocol.
    protocol_config: ProtocolConfig,

    /// Configuration of [`RecordStore`] filtering.
    record_filtering: StoreInserts,

    /// The currently active (i.e. in-progress) queries.
    queries: QueryPool,

    /// The currently connected peers.
    ///
    /// This is a superset of the connected peers currently in the routing table.
    connected_peers: FnvHashSet<PeerId>,

    /// Periodic job for re-publication of provider records for keys
    /// provided by the local node.
    ///
    /// `None` when no provider publication interval is configured.
    add_provider_job: Option<AddProviderJob>,

    /// Periodic job for (re-)replication and (re-)publishing of
    /// regular (value-)records.
    ///
    /// `None` when neither a replication nor a publication interval is
    /// configured.
    put_record_job: Option<PutRecordJob>,

    /// The TTL of regular (value-)records.
    record_ttl: Option<Duration>,

    /// The TTL of provider records.
    provider_record_ttl: Option<Duration>,

    /// Queued events to return when the behaviour is being polled.
    queued_events: VecDeque<ToSwarm<Event, HandlerIn>>,

    // NOTE(review): presumably the addresses the local node listens on,
    // kept up to date from swarm events — the update site is not visible here.
    listen_addresses: ListenAddresses,

    // NOTE(review): presumably the externally reachable addresses of the
    // local node — the update site is not visible here.
    external_addresses: ExternalAddresses,

    // NOTE(review): looks like a map from each tracked connection to the peer
    // on its remote end — confirm against the connection event handlers.
    connections: HashMap<ConnectionId, PeerId>,

    /// See [`Config::caching`].
    caching: Caching,

    /// The `PeerId` of the local node, as passed to the constructor.
    local_peer_id: PeerId,

    /// The current mode; a freshly constructed behaviour starts in `Mode::Client`.
    mode: Mode,
    /// Starts out as `true`.
    // NOTE(review): presumably controls whether `mode` is switched
    // automatically — confirm where this flag is read.
    auto_mode: bool,
    // NOTE(review): presumably the waker stored when polling yields no
    // events, so that newly queued events can wake the task — confirm.
    no_events_waker: Option<Waker>,

    /// The record storage.
    store: TStore,

    /// Tracks the status of the current bootstrap.
    bootstrap_status: bootstrap::Status,
}
127
/// The configurable strategies for the insertion of peers
/// and their addresses into the k-buckets of the Kademlia
/// routing table.
///
/// [`Config::new`] selects [`BucketInserts::OnConnected`]; the strategy can
/// be changed via [`Config::set_kbucket_inserts`].
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub enum BucketInserts {
    /// Whenever a connection to a peer is established as a
    /// result of a dialing attempt and that peer is not yet
    /// in the routing table, it is inserted as long as there
    /// is a free slot in the corresponding k-bucket. If the
    /// k-bucket is full but still has a free pending slot,
    /// it may be inserted into the routing table at a later time if an unresponsive
    /// disconnected peer is evicted from the bucket.
    OnConnected,
    /// New peers and addresses are only added to the routing table via
    /// explicit calls to [`Behaviour::add_address`].
    ///
    /// > **Note**: Even though peers can only get into the
    /// > routing table as a result of [`Behaviour::add_address`],
    /// > routing table entries are still updated as peers
    /// > connect and disconnect (i.e. the order of the entries
    /// > as well as the network addresses).
    Manual,
}
151
/// The configurable filtering strategies for the acceptance of
/// incoming records.
///
/// This can be used for e.g. signature verification or validating
/// the accompanying [`Key`].
///
/// [`Config::new`] selects [`StoreInserts::Unfiltered`]; the strategy can be
/// changed via [`Config::set_record_filtering`].
///
/// [`Key`]: crate::record::Key
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub enum StoreInserts {
    /// Whenever a (provider) record is received,
    /// the record is forwarded immediately to the [`RecordStore`].
    Unfiltered,
    /// Whenever a (provider) record is received, an event is emitted.
    /// Provider records generate a [`InboundRequest::AddProvider`] under
    /// [`Event::InboundRequest`], normal records generate a [`InboundRequest::PutRecord`]
    /// under [`Event::InboundRequest`].
    ///
    /// When deemed valid, a (provider) record needs to be explicitly stored in
    /// the [`RecordStore`] via [`RecordStore::put`] or [`RecordStore::add_provider`],
    /// whichever is applicable. A mutable reference to the [`RecordStore`] can
    /// be retrieved via [`Behaviour::store_mut`].
    FilterBoth,
}
175
/// The configuration for the `Kademlia` behaviour.
///
/// The configuration is consumed by [`Behaviour::new`].
#[derive(Debug, Clone)]
pub struct Config {
    /// Configuration of the k-buckets (bucket size and pending timeout).
    kbucket_config: KBucketConfig,
    /// Configuration of queries (timeout, replication factor, parallelism,
    /// disjoint query paths).
    query_config: QueryConfig,
    /// Configuration of the wire protocol (protocol name, maximum packet
    /// size, outbound substreams timeout).
    protocol_config: ProtocolConfig,
    /// TTL of regular (value-)records; `None` means records never expire.
    record_ttl: Option<Duration>,
    /// Interval of (re-)replication of stored records; `None` disables it.
    record_replication_interval: Option<Duration>,
    /// Interval of (re-)publication of stored records; `None` disables it.
    record_publication_interval: Option<Duration>,
    /// Whether incoming records are filtered before being stored.
    record_filtering: StoreInserts,
    /// TTL of provider records; `None` means they never expire.
    provider_record_ttl: Option<Duration>,
    /// Interval at which provider records of the local node are
    /// re-published; `None` disables it.
    provider_publication_interval: Option<Duration>,
    /// The k-bucket insertion strategy for the routing table.
    kbucket_inserts: BucketInserts,
    /// The caching strategy used for successful lookups.
    caching: Caching,
    /// Interval of the periodic bootstrap; `None` disables it.
    periodic_bootstrap_interval: Option<Duration>,
    /// Delay before an automatic bootstrap after a new peer is inserted
    /// into the routing table; `None` disables automatic bootstrap.
    automatic_bootstrap_throttle: Option<Duration>,
}
195
196impl Default for Config {
197 /// Returns the default configuration.
198 ///
199 /// Deprecated: use `Config::new` instead.
200 fn default() -> Self {
201 Self::new(protocol::DEFAULT_PROTO_NAME)
202 }
203}
204
/// The configuration for Kademlia "write-back" caching after successful
/// lookups via [`Behaviour::get_record`].
///
/// [`Config::new`] selects [`Caching::Enabled`] with a `max_peers` of 1; the
/// strategy can be changed via [`Config::set_caching`].
#[derive(Debug, Clone)]
pub enum Caching {
    /// Caching is disabled and the peers closest to records being looked up
    /// that do not return a record are not tracked, i.e.
    /// [`GetRecordOk::FinishedWithNoAdditionalRecord`] is always empty.
    Disabled,
    /// Up to `max_peers` peers not returning a record that are closest to the key
    /// being looked up are tracked and returned in
    /// [`GetRecordOk::FinishedWithNoAdditionalRecord`]. The write-back operation must be
    /// performed explicitly, if desired and after choosing a record from the results, via
    /// [`Behaviour::put_record_to`].
    Enabled { max_peers: u16 },
}
220
221impl Config {
222 /// Builds a new `Config` with the given protocol name.
223 pub fn new(protocol_name: StreamProtocol) -> Self {
224 Config {
225 kbucket_config: KBucketConfig::default(),
226 query_config: QueryConfig::default(),
227 protocol_config: ProtocolConfig::new(protocol_name),
228 record_ttl: Some(Duration::from_secs(48 * 60 * 60)),
229 record_replication_interval: Some(Duration::from_secs(60 * 60)),
230 record_publication_interval: Some(Duration::from_secs(22 * 60 * 60)),
231 record_filtering: StoreInserts::Unfiltered,
232 provider_publication_interval: Some(Duration::from_secs(12 * 60 * 60)),
233 provider_record_ttl: Some(Duration::from_secs(48 * 60 * 60)),
234 kbucket_inserts: BucketInserts::OnConnected,
235 caching: Caching::Enabled { max_peers: 1 },
236 periodic_bootstrap_interval: Some(Duration::from_secs(5 * 60)),
237 automatic_bootstrap_throttle: Some(bootstrap::DEFAULT_AUTOMATIC_THROTTLE),
238 }
239 }
240
241 /// Sets the timeout for a single query.
242 ///
243 /// > **Note**: A single query usually comprises at least as many requests
244 /// > as the replication factor, i.e. this is not a request timeout.
245 ///
246 /// The default is 60 seconds.
247 pub fn set_query_timeout(&mut self, timeout: Duration) -> &mut Self {
248 self.query_config.timeout = timeout;
249 self
250 }
251
252 /// Sets the replication factor to use.
253 ///
254 /// The replication factor determines to how many closest peers
255 /// a record is replicated. The default is [`crate::K_VALUE`].
256 pub fn set_replication_factor(&mut self, replication_factor: NonZeroUsize) -> &mut Self {
257 self.query_config.replication_factor = replication_factor;
258 self
259 }
260
261 /// Sets the allowed level of parallelism for iterative queries.
262 ///
263 /// The `α` parameter in the Kademlia paper. The maximum number of peers
264 /// that an iterative query is allowed to wait for in parallel while
265 /// iterating towards the closest nodes to a target. Defaults to
266 /// `ALPHA_VALUE`.
267 ///
268 /// This only controls the level of parallelism of an iterative query, not
269 /// the level of parallelism of a query to a fixed set of peers.
270 ///
271 /// When used with [`Config::disjoint_query_paths`] it equals
272 /// the amount of disjoint paths used.
273 pub fn set_parallelism(&mut self, parallelism: NonZeroUsize) -> &mut Self {
274 self.query_config.parallelism = parallelism;
275 self
276 }
277
278 /// Require iterative queries to use disjoint paths for increased resiliency
279 /// in the presence of potentially adversarial nodes.
280 ///
281 /// When enabled the number of disjoint paths used equals the configured
282 /// parallelism.
283 ///
284 /// See the S/Kademlia paper for more information on the high level design
285 /// as well as its security improvements.
286 pub fn disjoint_query_paths(&mut self, enabled: bool) -> &mut Self {
287 self.query_config.disjoint_query_paths = enabled;
288 self
289 }
290
291 /// Sets the TTL for stored records.
292 ///
293 /// The TTL should be significantly longer than the (re-)publication
294 /// interval, to avoid premature expiration of records. The default is 36
295 /// hours.
296 ///
297 /// `None` means records never expire.
298 ///
299 /// Does not apply to provider records.
300 pub fn set_record_ttl(&mut self, record_ttl: Option<Duration>) -> &mut Self {
301 self.record_ttl = record_ttl;
302 self
303 }
304
305 /// Sets whether or not records should be filtered before being stored.
306 ///
307 /// See [`StoreInserts`] for the different values.
308 /// Defaults to [`StoreInserts::Unfiltered`].
309 pub fn set_record_filtering(&mut self, filtering: StoreInserts) -> &mut Self {
310 self.record_filtering = filtering;
311 self
312 }
313
314 /// Sets the (re-)replication interval for stored records.
315 ///
316 /// Periodic replication of stored records ensures that the records
317 /// are always replicated to the available nodes closest to the key in the
318 /// context of DHT topology changes (i.e. nodes joining and leaving), thus
319 /// ensuring persistence until the record expires. Replication does not
320 /// prolong the regular lifetime of a record (for otherwise it would live
321 /// forever regardless of the configured TTL). The expiry of a record
322 /// is only extended through re-publication.
323 ///
324 /// This interval should be significantly shorter than the publication
325 /// interval, to ensure persistence between re-publications. The default
326 /// is 1 hour.
327 ///
328 /// `None` means that stored records are never re-replicated.
329 ///
330 /// Does not apply to provider records.
331 pub fn set_replication_interval(&mut self, interval: Option<Duration>) -> &mut Self {
332 self.record_replication_interval = interval;
333 self
334 }
335
336 /// Sets the (re-)publication interval of stored records.
337 ///
338 /// Records persist in the DHT until they expire. By default, published
339 /// records are re-published in regular intervals for as long as the record
340 /// exists in the local storage of the original publisher, thereby extending
341 /// the records lifetime.
342 ///
343 /// This interval should be significantly shorter than the record TTL, to
344 /// ensure records do not expire prematurely. The default is 24 hours.
345 ///
346 /// `None` means that stored records are never automatically re-published.
347 ///
348 /// Does not apply to provider records.
349 pub fn set_publication_interval(&mut self, interval: Option<Duration>) -> &mut Self {
350 self.record_publication_interval = interval;
351 self
352 }
353
354 /// Sets the TTL for provider records.
355 ///
356 /// `None` means that stored provider records never expire.
357 ///
358 /// Must be significantly larger than the provider publication interval.
359 pub fn set_provider_record_ttl(&mut self, ttl: Option<Duration>) -> &mut Self {
360 self.provider_record_ttl = ttl;
361 self
362 }
363
364 /// Sets the interval at which provider records for keys provided
365 /// by the local node are re-published.
366 ///
367 /// `None` means that stored provider records are never automatically
368 /// re-published.
369 ///
370 /// Must be significantly less than the provider record TTL.
371 pub fn set_provider_publication_interval(&mut self, interval: Option<Duration>) -> &mut Self {
372 self.provider_publication_interval = interval;
373 self
374 }
375
376 /// Modifies the maximum allowed size of individual Kademlia packets.
377 ///
378 /// It might be necessary to increase this value if trying to put large
379 /// records.
380 pub fn set_max_packet_size(&mut self, size: usize) -> &mut Self {
381 self.protocol_config.set_max_packet_size(size);
382 self
383 }
384
385 /// Modifies the timeout duration of outbount_substreams.
386 ///
387 /// * Default to `10` seconds.
388 /// * May need to increase this value when sending large records with poor connection.
389 pub fn set_outbound_substreams_timeout(&mut self, timeout: Duration) -> &mut Self {
390 self.protocol_config
391 .set_outbound_substreams_timeout(timeout);
392 self
393 }
394
395 /// Sets the k-bucket insertion strategy for the Kademlia routing table.
396 pub fn set_kbucket_inserts(&mut self, inserts: BucketInserts) -> &mut Self {
397 self.kbucket_inserts = inserts;
398 self
399 }
400
401 /// Sets the [`Caching`] strategy to use for successful lookups.
402 ///
403 /// The default is [`Caching::Enabled`] with a `max_peers` of 1.
404 /// Hence, with default settings and a lookup quorum of 1, a successful lookup
405 /// will result in the record being cached at the closest node to the key that
406 /// did not return the record, i.e. the standard Kademlia behaviour.
407 pub fn set_caching(&mut self, c: Caching) -> &mut Self {
408 self.caching = c;
409 self
410 }
411
412 /// Sets the interval on which [`Behaviour::bootstrap`] is called periodically.
413 ///
414 /// * Default to `5` minutes.
415 /// * Set to `None` to disable periodic bootstrap.
416 pub fn set_periodic_bootstrap_interval(&mut self, interval: Option<Duration>) -> &mut Self {
417 self.periodic_bootstrap_interval = interval;
418 self
419 }
420
421 /// Sets the configuration for the k-buckets.
422 ///
423 /// * Default to K_VALUE.
424 ///
425 /// **WARNING**: setting a `size` higher that `K_VALUE` may imply additional memory allocations.
426 pub fn set_kbucket_size(&mut self, size: NonZeroUsize) -> &mut Self {
427 self.kbucket_config.set_bucket_size(size);
428 self
429 }
430
431 /// Sets the timeout duration after creation of a pending entry after which
432 /// it becomes eligible for insertion into a full bucket, replacing the
433 /// least-recently (dis)connected node.
434 ///
435 /// * Default to `60` s.
436 pub fn set_kbucket_pending_timeout(&mut self, timeout: Duration) -> &mut Self {
437 self.kbucket_config.set_pending_timeout(timeout);
438 self
439 }
440
441 /// Sets the time to wait before calling [`Behaviour::bootstrap`] after a new peer is inserted
442 /// in the routing table. This prevent cascading bootstrap requests when multiple peers are
443 /// inserted into the routing table "at the same time". This also allows to wait a little
444 /// bit for other potential peers to be inserted into the routing table before triggering a
445 /// bootstrap, giving more context to the future bootstrap request.
446 ///
447 /// * Default to `500` ms.
448 /// * Set to `Some(Duration::ZERO)` to never wait before triggering a bootstrap request when a
449 /// new peer is inserted in the routing table.
450 /// * Set to `None` to disable automatic bootstrap (no bootstrap request will be triggered when
451 /// a new peer is inserted in the routing table).
452 #[cfg(test)]
453 pub(crate) fn set_automatic_bootstrap_throttle(
454 &mut self,
455 duration: Option<Duration>,
456 ) -> &mut Self {
457 self.automatic_bootstrap_throttle = duration;
458 self
459 }
460}
461
462impl<TStore> Behaviour<TStore>
463where
464 TStore: RecordStore + Send + 'static,
465{
466 /// Creates a new `Kademlia` network behaviour with a default configuration.
467 pub fn new(id: PeerId, store: TStore) -> Self {
468 Self::with_config(id, store, Default::default())
469 }
470
471 /// Get the protocol name of this kademlia instance.
472 pub fn protocol_names(&self) -> &[StreamProtocol] {
473 self.protocol_config.protocol_names()
474 }
475
476 /// Creates a new `Kademlia` network behaviour with the given configuration.
477 pub fn with_config(id: PeerId, store: TStore, config: Config) -> Self {
478 let local_key = kbucket::Key::from(id);
479
480 let put_record_job = config
481 .record_replication_interval
482 .or(config.record_publication_interval)
483 .map(|interval| {
484 PutRecordJob::new(
485 id,
486 interval,
487 config.record_publication_interval,
488 config.record_ttl,
489 )
490 });
491
492 let add_provider_job = config
493 .provider_publication_interval
494 .map(AddProviderJob::new);
495
496 Behaviour {
497 store,
498 caching: config.caching,
499 kbuckets: KBucketsTable::new(local_key, config.kbucket_config),
500 kbucket_inserts: config.kbucket_inserts,
501 protocol_config: config.protocol_config,
502 record_filtering: config.record_filtering,
503 queued_events: VecDeque::with_capacity(config.query_config.replication_factor.get()),
504 listen_addresses: Default::default(),
505 queries: QueryPool::new(config.query_config),
506 connected_peers: Default::default(),
507 add_provider_job,
508 put_record_job,
509 record_ttl: config.record_ttl,
510 provider_record_ttl: config.provider_record_ttl,
511 external_addresses: Default::default(),
512 local_peer_id: id,
513 connections: Default::default(),
514 mode: Mode::Client,
515 auto_mode: true,
516 no_events_waker: None,
517 bootstrap_status: bootstrap::Status::new(
518 config.periodic_bootstrap_interval,
519 config.automatic_bootstrap_throttle,
520 ),
521 }
522 }
523
524 /// Gets an iterator over immutable references to all running queries.
525 pub fn iter_queries(&self) -> impl Iterator<Item = QueryRef<'_>> {
526 self.queries.iter().filter_map(|query| {
527 if !query.is_finished() {
528 Some(QueryRef { query })
529 } else {
530 None
531 }
532 })
533 }
534
535 /// Gets an iterator over mutable references to all running queries.
536 pub fn iter_queries_mut(&mut self) -> impl Iterator<Item = QueryMut<'_>> {
537 self.queries.iter_mut().filter_map(|query| {
538 if !query.is_finished() {
539 Some(QueryMut { query })
540 } else {
541 None
542 }
543 })
544 }
545
546 /// Gets an immutable reference to a running query, if it exists.
547 pub fn query(&self, id: &QueryId) -> Option<QueryRef<'_>> {
548 self.queries.get(id).and_then(|query| {
549 if !query.is_finished() {
550 Some(QueryRef { query })
551 } else {
552 None
553 }
554 })
555 }
556
557 /// Gets a mutable reference to a running query, if it exists.
558 pub fn query_mut<'a>(&'a mut self, id: &QueryId) -> Option<QueryMut<'a>> {
559 self.queries.get_mut(id).and_then(|query| {
560 if !query.is_finished() {
561 Some(QueryMut { query })
562 } else {
563 None
564 }
565 })
566 }
567
568 /// Adds a known listen address of a peer participating in the DHT to the
569 /// routing table.
570 ///
571 /// Explicitly adding addresses of peers serves two purposes:
572 ///
573 /// 1. In order for a node to join the DHT, it must know about at least one other node of the
574 /// DHT.
575 ///
576 /// 2. When a remote peer initiates a connection and that peer is not yet in the routing
577 /// table, the `Kademlia` behaviour must be informed of an address on which that peer is
578 /// listening for connections before it can be added to the routing table from where it can
579 /// subsequently be discovered by all peers in the DHT.
580 ///
581 /// If the routing table has been updated as a result of this operation,
582 /// a [`Event::RoutingUpdated`] event is emitted.
583 pub fn add_address(&mut self, peer: &PeerId, address: Multiaddr) -> RoutingUpdate {
584 // ensuring address is a fully-qualified /p2p multiaddr
585 let Ok(address) = address.with_p2p(*peer) else {
586 return RoutingUpdate::Failed;
587 };
588 let key = kbucket::Key::from(*peer);
589 match self.kbuckets.entry(&key) {
590 Some(kbucket::Entry::Present(mut entry, _)) => {
591 if entry.value().insert(address) {
592 self.queued_events
593 .push_back(ToSwarm::GenerateEvent(Event::RoutingUpdated {
594 peer: *peer,
595 is_new_peer: false,
596 addresses: entry.value().clone(),
597 old_peer: None,
598 bucket_range: self
599 .kbuckets
600 .bucket(&key)
601 .map(|b| b.range())
602 .expect("Not kbucket::Entry::SelfEntry."),
603 }))
604 }
605 RoutingUpdate::Success
606 }
607 Some(kbucket::Entry::Pending(mut entry, _)) => {
608 entry.value().insert(address);
609 RoutingUpdate::Pending
610 }
611 Some(kbucket::Entry::Absent(entry)) => {
612 let addresses = Addresses::new(address);
613 let status = if self.connected_peers.contains(peer) {
614 NodeStatus::Connected
615 } else {
616 NodeStatus::Disconnected
617 };
618 match entry.insert(addresses.clone(), status) {
619 kbucket::InsertResult::Inserted => {
620 self.bootstrap_on_low_peers();
621
622 self.queued_events.push_back(ToSwarm::GenerateEvent(
623 Event::RoutingUpdated {
624 peer: *peer,
625 is_new_peer: true,
626 addresses,
627 old_peer: None,
628 bucket_range: self
629 .kbuckets
630 .bucket(&key)
631 .map(|b| b.range())
632 .expect("Not kbucket::Entry::SelfEntry."),
633 },
634 ));
635 RoutingUpdate::Success
636 }
637 kbucket::InsertResult::Full => {
638 tracing::debug!(%peer, "Bucket full. Peer not added to routing table");
639 RoutingUpdate::Failed
640 }
641 kbucket::InsertResult::Pending { disconnected } => {
642 self.queued_events.push_back(ToSwarm::Dial {
643 opts: DialOpts::peer_id(disconnected.into_preimage()).build(),
644 });
645 RoutingUpdate::Pending
646 }
647 }
648 }
649 None => RoutingUpdate::Failed,
650 }
651 }
652
653 /// Removes an address of a peer from the routing table.
654 ///
655 /// If the given address is the last address of the peer in the
656 /// routing table, the peer is removed from the routing table
657 /// and `Some` is returned with a view of the removed entry.
658 /// The same applies if the peer is currently pending insertion
659 /// into the routing table.
660 ///
661 /// If the given peer or address is not in the routing table,
662 /// this is a no-op.
663 pub fn remove_address(
664 &mut self,
665 peer: &PeerId,
666 address: &Multiaddr,
667 ) -> Option<kbucket::EntryView<kbucket::Key<PeerId>, Addresses>> {
668 let address = &address.to_owned().with_p2p(*peer).ok()?;
669 let key = kbucket::Key::from(*peer);
670 match self.kbuckets.entry(&key)? {
671 kbucket::Entry::Present(mut entry, _) => {
672 if entry.value().remove(address).is_err() {
673 Some(entry.remove()) // it is the last address, thus remove the peer.
674 } else {
675 None
676 }
677 }
678 kbucket::Entry::Pending(mut entry, _) => {
679 if entry.value().remove(address).is_err() {
680 Some(entry.remove()) // it is the last address, thus remove the peer.
681 } else {
682 None
683 }
684 }
685 kbucket::Entry::Absent(..) => None,
686 }
687 }
688
689 /// Removes a peer from the routing table.
690 ///
691 /// Returns `None` if the peer was not in the routing table,
692 /// not even pending insertion.
693 pub fn remove_peer(
694 &mut self,
695 peer: &PeerId,
696 ) -> Option<kbucket::EntryView<kbucket::Key<PeerId>, Addresses>> {
697 let key = kbucket::Key::from(*peer);
698 match self.kbuckets.entry(&key)? {
699 kbucket::Entry::Present(entry, _) => Some(entry.remove()),
700 kbucket::Entry::Pending(entry, _) => Some(entry.remove()),
701 kbucket::Entry::Absent(..) => None,
702 }
703 }
704
705 /// Returns an iterator over all non-empty buckets in the routing table.
706 pub fn kbuckets(
707 &mut self,
708 ) -> impl Iterator<Item = kbucket::KBucketRef<'_, kbucket::Key<PeerId>, Addresses>> {
709 self.kbuckets.iter().filter(|b| !b.is_empty())
710 }
711
712 /// Returns the k-bucket for the distance to the given key.
713 ///
714 /// Returns `None` if the given key refers to the local key.
715 pub fn kbucket<K>(
716 &mut self,
717 key: K,
718 ) -> Option<kbucket::KBucketRef<'_, kbucket::Key<PeerId>, Addresses>>
719 where
720 K: Into<kbucket::Key<K>> + Clone,
721 {
722 self.kbuckets.bucket(&key.into())
723 }
724
725 /// Initiates an iterative query for the closest peers to the given key.
726 ///
727 /// The result of the query is delivered in a
728 /// [`Event::OutboundQueryProgressed{QueryResult::GetClosestPeers}`].
729 pub fn get_closest_peers<K>(&mut self, key: K) -> QueryId
730 where
731 K: Into<kbucket::Key<K>> + Into<Vec<u8>> + Clone,
732 {
733 self.get_closest_peers_inner(key, K_VALUE)
734 }
735
736 /// Initiates an iterative query for the closest peers to the given key.
737 /// The expected responding peers is specified by `num_results`
738 /// Note that the result is capped after exceeds K_VALUE
739 ///
740 /// The result of the query is delivered in a
741 /// [`Event::OutboundQueryProgressed{QueryResult::GetClosestPeers}`].
742 pub fn get_n_closest_peers<K>(&mut self, key: K, num_results: NonZeroUsize) -> QueryId
743 where
744 K: Into<kbucket::Key<K>> + Into<Vec<u8>> + Clone,
745 {
746 // The inner code never expect higher than K_VALUE results to be returned.
747 // And removing such cap will be tricky,
748 // since it would involve forging a new key and additional requests.
749 // Hence bound to K_VALUE here to set clear expectation and prevent unexpected behaviour.
750 let capped_num_results = std::cmp::min(num_results, K_VALUE);
751 self.get_closest_peers_inner(key, capped_num_results)
752 }
753
754 fn get_closest_peers_inner<K>(&mut self, key: K, num_results: NonZeroUsize) -> QueryId
755 where
756 K: Into<kbucket::Key<K>> + Into<Vec<u8>> + Clone,
757 {
758 let target: kbucket::Key<K> = key.clone().into();
759 let key: Vec<u8> = key.into();
760 let info = QueryInfo::GetClosestPeers {
761 key,
762 step: ProgressStep::first(),
763 num_results,
764 };
765 let peer_keys: Vec<kbucket::Key<PeerId>> = self.kbuckets.closest_keys(&target).collect();
766 self.queries.add_iter_closest(target, peer_keys, info)
767 }
768
769 /// Returns all peers ordered by distance to the given key; takes peers from local routing table
770 /// only.
771 pub fn get_closest_local_peers<'a, K: Clone>(
772 &'a mut self,
773 key: &'a kbucket::Key<K>,
774 ) -> impl Iterator<Item = kbucket::Key<PeerId>> + 'a {
775 self.kbuckets.closest_keys(key)
776 }
777
778 /// Finds the closest peers to a `key` in the context of a request by the `source` peer, such
779 /// that the `source` peer is never included in the result.
780 ///
781 /// Takes peers from local routing table only. Only returns number of peers equal to configured
782 /// replication factor.
783 pub fn find_closest_local_peers<'a, K: Clone>(
784 &'a mut self,
785 key: &'a kbucket::Key<K>,
786 source: &'a PeerId,
787 ) -> impl Iterator<Item = KadPeer> + 'a {
788 self.kbuckets
789 .closest(key)
790 .filter(move |e| e.node.key.preimage() != source)
791 .take(K_VALUE.get())
792 .map(KadPeer::from)
793 }
794
795 /// Performs a lookup for a record in the DHT.
796 ///
797 /// The result of this operation is delivered in a
798 /// [`Event::OutboundQueryProgressed{QueryResult::GetRecord}`].
799 pub fn get_record(&mut self, key: record::Key) -> QueryId {
800 let record = if let Some(record) = self.store.get(&key) {
801 if record.is_expired(Instant::now()) {
802 self.store.remove(&key);
803 None
804 } else {
805 Some(PeerRecord {
806 peer: None,
807 record: record.into_owned(),
808 })
809 }
810 } else {
811 None
812 };
813
814 let step = ProgressStep::first();
815
816 let target = kbucket::Key::new(key.clone());
817 let info = if record.is_some() {
818 QueryInfo::GetRecord {
819 key,
820 step: step.next(),
821 found_a_record: true,
822 cache_candidates: BTreeMap::new(),
823 }
824 } else {
825 QueryInfo::GetRecord {
826 key,
827 step: step.clone(),
828 found_a_record: false,
829 cache_candidates: BTreeMap::new(),
830 }
831 };
832 let peers = self.kbuckets.closest_keys(&target);
833 let id = self.queries.add_iter_closest(target.clone(), peers, info);
834
835 // No queries were actually done for the results yet.
836 let stats = QueryStats::empty();
837
838 if let Some(record) = record {
839 self.queued_events
840 .push_back(ToSwarm::GenerateEvent(Event::OutboundQueryProgressed {
841 id,
842 result: QueryResult::GetRecord(Ok(GetRecordOk::FoundRecord(record))),
843 step,
844 stats,
845 }));
846 }
847
848 id
849 }
850
851 /// Stores a record in the DHT, locally as well as at the nodes
852 /// closest to the key as per the xor distance metric.
853 ///
854 /// Returns `Ok` if a record has been stored locally, providing the
855 /// `QueryId` of the initial query that replicates the record in the DHT.
856 /// The result of the query is eventually reported as a
857 /// [`Event::OutboundQueryProgressed{QueryResult::PutRecord}`].
858 ///
859 /// The record is always stored locally with the given expiration. If the record's
860 /// expiration is `None`, the common case, it does not expire in local storage
861 /// but is still replicated with the configured record TTL. To remove the record
862 /// locally and stop it from being re-published in the DHT, see [`Behaviour::remove_record`].
863 ///
864 /// After the initial publication of the record, it is subject to (re-)replication
865 /// and (re-)publication as per the configured intervals. Periodic (re-)publication
866 /// does not update the record's expiration in local storage, thus a given record
867 /// with an explicit expiration will always expire at that instant and until then
868 /// is subject to regular (re-)replication and (re-)publication.
869 pub fn put_record(
870 &mut self,
871 mut record: Record,
872 quorum: Quorum,
873 ) -> Result<QueryId, store::Error> {
874 record.publisher = Some(*self.kbuckets.local_key().preimage());
875 self.store.put(record.clone())?;
876 record.expires = record
877 .expires
878 .or_else(|| self.record_ttl.map(|ttl| Instant::now() + ttl));
879 let quorum = quorum.eval(self.queries.config().replication_factor);
880 let target = kbucket::Key::new(record.key.clone());
881 let peers = self.kbuckets.closest_keys(&target);
882 let context = PutRecordContext::Publish;
883 let info = QueryInfo::PutRecord {
884 context,
885 record,
886 quorum,
887 phase: PutRecordPhase::GetClosestPeers,
888 };
889 Ok(self.queries.add_iter_closest(target.clone(), peers, info))
890 }
891
892 /// Stores a record at specific peers, without storing it locally.
893 ///
894 /// The given [`Quorum`] is understood in the context of the total
895 /// number of distinct peers given.
896 ///
897 /// If the record's expiration is `None`, the configured record TTL is used.
898 ///
899 /// > **Note**: This is not a regular Kademlia DHT operation. It needs to be
900 /// > used to selectively update or store a record to specific peers
901 /// > for the purpose of e.g. making sure these peers have the latest
902 /// > "version" of a record or to "cache" a record at further peers
903 /// > to increase the lookup success rate on the DHT for other peers.
904 /// >
905 /// > In particular, there is no automatic storing of records performed, and this
906 /// > method must be used to ensure the standard Kademlia
907 /// > procedure of "caching" (i.e. storing) a found record at the closest
908 /// > node to the key that _did not_ return it.
909 pub fn put_record_to<I>(&mut self, mut record: Record, peers: I, quorum: Quorum) -> QueryId
910 where
911 I: ExactSizeIterator<Item = PeerId>,
912 {
913 let quorum = if peers.len() > 0 {
914 quorum.eval(NonZeroUsize::new(peers.len()).expect("> 0"))
915 } else {
916 // If no peers are given, we just let the query fail immediately
917 // due to the fact that the quorum must be at least one, instead of
918 // introducing a new kind of error.
919 NonZeroUsize::new(1).expect("1 > 0")
920 };
921 record.expires = record
922 .expires
923 .or_else(|| self.record_ttl.map(|ttl| Instant::now() + ttl));
924 let context = PutRecordContext::Custom;
925 let info = QueryInfo::PutRecord {
926 context,
927 record,
928 quorum,
929 phase: PutRecordPhase::PutRecord {
930 success: Vec::new(),
931 get_closest_peers_stats: QueryStats::empty(),
932 },
933 };
934 self.queries.add_fixed(peers, info)
935 }
936
937 /// Removes the record with the given key from _local_ storage,
938 /// if the local node is the publisher of the record.
939 ///
940 /// Has no effect if a record for the given key is stored locally but
941 /// the local node is not a publisher of the record.
942 ///
943 /// This is a _local_ operation. However, it also has the effect that
944 /// the record will no longer be periodically re-published, allowing the
945 /// record to eventually expire throughout the DHT.
946 pub fn remove_record(&mut self, key: &record::Key) {
947 if let Some(r) = self.store.get(key) {
948 if r.publisher.as_ref() == Some(self.kbuckets.local_key().preimage()) {
949 self.store.remove(key)
950 }
951 }
952 }
953
/// Gets a mutable reference to the record store.
///
/// The returned reference borrows the behaviour mutably and gives the caller
/// direct access to the underlying `TStore`.
pub fn store_mut(&mut self) -> &mut TStore {
    &mut self.store
}
958
959 /// Bootstraps the local node to join the DHT.
960 ///
961 /// Bootstrapping is a multi-step operation that starts with a lookup of the local node's
962 /// own ID in the DHT. This introduces the local node to the other nodes
963 /// in the DHT and populates its routing table with the closest neighbours.
964 ///
965 /// Subsequently, all buckets farther from the bucket of the closest neighbour are
966 /// refreshed by initiating an additional bootstrapping query for each such
967 /// bucket with random keys.
968 ///
969 /// Returns `Ok` if bootstrapping has been initiated with a self-lookup, providing the
970 /// `QueryId` for the entire bootstrapping process. The progress of bootstrapping is
971 /// reported via [`Event::OutboundQueryProgressed{QueryResult::Bootstrap}`] events,
972 /// with one such event per bootstrapping query.
973 ///
974 /// Returns `Err` if bootstrapping is impossible due an empty routing table.
975 ///
976 /// > **Note**: Bootstrapping requires at least one node of the DHT to be known.
977 /// > See [`Behaviour::add_address`].
978 ///
979 /// > **Note**: Bootstrap does not require to be called manually. It is periodically
980 /// > invoked at regular intervals based on the configured `periodic_bootstrap_interval` (see
981 /// > [`Config::set_periodic_bootstrap_interval`] for details) and it is also automatically
982 /// > invoked
983 /// > when a new peer is inserted in the routing table.
984 /// > This parameter is used to call [`Behaviour::bootstrap`] periodically and automatically
985 /// > to ensure a healthy routing table.
986 pub fn bootstrap(&mut self) -> Result<QueryId, NoKnownPeers> {
987 let local_key = *self.kbuckets.local_key();
988 let info = QueryInfo::Bootstrap {
989 peer: *local_key.preimage(),
990 remaining: None,
991 step: ProgressStep::first(),
992 };
993 let peers = self.kbuckets.closest_keys(&local_key).collect::<Vec<_>>();
994 if peers.is_empty() {
995 self.bootstrap_status.reset_timers();
996 Err(NoKnownPeers())
997 } else {
998 self.bootstrap_status.on_started();
999 Ok(self.queries.add_iter_closest(local_key, peers, info))
1000 }
1001 }
1002
1003 /// Establishes the local node as a provider of a value for the given key.
1004 ///
1005 /// This operation publishes a provider record with the given key and
1006 /// identity of the local node to the peers closest to the key, thus establishing
1007 /// the local node as a provider.
1008 ///
1009 /// Returns `Ok` if a provider record has been stored locally, providing the
1010 /// `QueryId` of the initial query that announces the local node as a provider.
1011 ///
1012 /// The publication of the provider records is periodically repeated as per the
1013 /// configured interval, to renew the expiry and account for changes to the DHT
1014 /// topology. A provider record may be removed from local storage and
1015 /// thus no longer re-published by calling [`Behaviour::stop_providing`].
1016 ///
1017 /// In contrast to the standard Kademlia push-based model for content distribution
1018 /// implemented by [`Behaviour::put_record`], the provider API implements a
1019 /// pull-based model that may be used in addition or as an alternative.
1020 /// The means by which the actual value is obtained from a provider is out of scope
1021 /// of the libp2p Kademlia provider API.
1022 ///
1023 /// The results of the (repeated) provider announcements sent by this node are
1024 /// reported via [`Event::OutboundQueryProgressed{QueryResult::StartProviding}`].
1025 pub fn start_providing(&mut self, key: record::Key) -> Result<QueryId, store::Error> {
1026 // Note: We store our own provider records locally without local addresses
1027 // to avoid redundant storage and outdated addresses. Instead these are
1028 // acquired on demand when returning a `ProviderRecord` for the local node.
1029 let local_addrs = Vec::new();
1030 let record = ProviderRecord::new(
1031 key.clone(),
1032 *self.kbuckets.local_key().preimage(),
1033 local_addrs,
1034 );
1035 self.store.add_provider(record)?;
1036 let target = kbucket::Key::new(key.clone());
1037 let peers = self.kbuckets.closest_keys(&target);
1038 let context = AddProviderContext::Publish;
1039 let info = QueryInfo::AddProvider {
1040 context,
1041 key,
1042 phase: AddProviderPhase::GetClosestPeers,
1043 };
1044 let id = self.queries.add_iter_closest(target.clone(), peers, info);
1045 Ok(id)
1046 }
1047
1048 /// Stops the local node from announcing that it is a provider for the given key.
1049 ///
1050 /// This is a local operation. The local node will still be considered as a
1051 /// provider for the key by other nodes until these provider records expire.
1052 pub fn stop_providing(&mut self, key: &record::Key) {
1053 self.store
1054 .remove_provider(key, self.kbuckets.local_key().preimage());
1055 }
1056
1057 /// Performs a lookup for providers of a value to the given key.
1058 ///
1059 /// The result of this operation is delivered in a
1060 /// reported via [`Event::OutboundQueryProgressed{QueryResult::GetProviders}`].
1061 pub fn get_providers(&mut self, key: record::Key) -> QueryId {
1062 let providers: HashSet<_> = self
1063 .store
1064 .providers(&key)
1065 .into_iter()
1066 .filter(|p| {
1067 if p.is_expired(Instant::now()) {
1068 self.store.remove_provider(&key, &p.provider);
1069 false
1070 } else {
1071 true
1072 }
1073 })
1074 .map(|p| p.provider)
1075 .collect();
1076
1077 let step = ProgressStep::first();
1078
1079 let info = QueryInfo::GetProviders {
1080 key: key.clone(),
1081 providers_found: providers.len(),
1082 step: if providers.is_empty() {
1083 step.clone()
1084 } else {
1085 step.next()
1086 },
1087 };
1088
1089 let target = kbucket::Key::new(key.clone());
1090 let peers = self.kbuckets.closest_keys(&target);
1091 let id = self.queries.add_iter_closest(target.clone(), peers, info);
1092
1093 // No queries were actually done for the results yet.
1094 let stats = QueryStats::empty();
1095
1096 if !providers.is_empty() {
1097 self.queued_events
1098 .push_back(ToSwarm::GenerateEvent(Event::OutboundQueryProgressed {
1099 id,
1100 result: QueryResult::GetProviders(Ok(GetProvidersOk::FoundProviders {
1101 key,
1102 providers,
1103 })),
1104 step,
1105 stats,
1106 }));
1107 }
1108 id
1109 }
1110
1111 /// Set the [`Mode`] in which we should operate.
1112 ///
1113 /// By default, we are in [`Mode::Client`] and will swap into [`Mode::Server`] as soon as we
1114 /// have a confirmed, external address via [`FromSwarm::ExternalAddrConfirmed`].
1115 ///
1116 /// Setting a mode via this function disables this automatic behaviour and unconditionally
1117 /// operates in the specified mode. To reactivate the automatic configuration, pass [`None`]
1118 /// instead.
1119 pub fn set_mode(&mut self, mode: Option<Mode>) {
1120 match mode {
1121 Some(mode) => {
1122 self.mode = mode;
1123 self.auto_mode = false;
1124 self.reconfigure_mode();
1125 }
1126 None => {
1127 self.auto_mode = true;
1128 self.determine_mode_from_external_addresses();
1129 }
1130 }
1131
1132 if let Some(waker) = self.no_events_waker.take() {
1133 waker.wake();
1134 }
1135 }
1136
/// Get the [`Mode`] in which the DHT is currently operating.
///
/// This is the current value, regardless of whether it was set explicitly
/// via [`Behaviour::set_mode`] or derived from the external addresses.
pub fn mode(&self) -> Mode {
    self.mode
}
1141
1142 fn reconfigure_mode(&mut self) {
1143 if self.connections.is_empty() {
1144 return;
1145 }
1146
1147 let num_connections = self.connections.len();
1148
1149 tracing::debug!(
1150 "Re-configuring {} established connection{}",
1151 num_connections,
1152 if num_connections > 1 { "s" } else { "" }
1153 );
1154
1155 self.queued_events
1156 .extend(
1157 self.connections
1158 .iter()
1159 .map(|(conn_id, peer_id)| ToSwarm::NotifyHandler {
1160 peer_id: *peer_id,
1161 handler: NotifyHandler::One(*conn_id),
1162 event: HandlerIn::ReconfigureMode {
1163 new_mode: self.mode,
1164 },
1165 }),
1166 );
1167 }
1168
1169 fn determine_mode_from_external_addresses(&mut self) {
1170 let old_mode = self.mode;
1171
1172 self.mode = match (self.external_addresses.as_slice(), self.mode) {
1173 ([], Mode::Server) => {
1174 tracing::debug!("Switching to client-mode because we no longer have any confirmed external addresses");
1175
1176 Mode::Client
1177 }
1178 ([], Mode::Client) => {
1179 // Previously client-mode, now also client-mode because no external addresses.
1180
1181 Mode::Client
1182 }
1183 (confirmed_external_addresses, Mode::Client) => {
1184 if tracing::enabled!(Level::DEBUG) {
1185 let confirmed_external_addresses =
1186 to_comma_separated_list(confirmed_external_addresses);
1187
1188 tracing::debug!("Switching to server-mode assuming that one of [{confirmed_external_addresses}] is externally reachable");
1189 }
1190
1191 Mode::Server
1192 }
1193 (confirmed_external_addresses, Mode::Server) => {
1194 debug_assert!(
1195 !confirmed_external_addresses.is_empty(),
1196 "Previous match arm handled empty list"
1197 );
1198
1199 // Previously, server-mode, now also server-mode because > 1 external address.
1200 // Don't log anything to avoid spam.
1201 Mode::Server
1202 }
1203 };
1204
1205 self.reconfigure_mode();
1206
1207 if old_mode != self.mode {
1208 self.queued_events
1209 .push_back(ToSwarm::GenerateEvent(Event::ModeChanged {
1210 new_mode: self.mode,
1211 }));
1212 }
1213 }
1214
1215 /// Processes discovered peers from a successful request in an iterative `Query`.
1216 fn discovered<'a, I>(&'a mut self, query_id: &QueryId, source: &PeerId, peers: I)
1217 where
1218 I: Iterator<Item = &'a KadPeer> + Clone,
1219 {
1220 let local_id = self.kbuckets.local_key().preimage();
1221 let others_iter = peers.filter(|p| &p.node_id != local_id);
1222 if let Some(query) = self.queries.get_mut(query_id) {
1223 tracing::trace!(peer=%source, query=?query_id, "Request to peer in query succeeded");
1224 for peer in others_iter.clone() {
1225 tracing::trace!(
1226 ?peer,
1227 %source,
1228 query=?query_id,
1229 "Peer reported by source in query"
1230 );
1231 let addrs = peer.multiaddrs.iter().cloned().collect();
1232 query.peers.addresses.insert(peer.node_id, addrs);
1233 }
1234 query.on_success(source, others_iter.cloned().map(|kp| kp.node_id))
1235 }
1236 }
1237
1238 /// Collects all peers who are known to be providers of the value for a given `Multihash`.
1239 fn provider_peers(&mut self, key: &record::Key, source: &PeerId) -> Vec<KadPeer> {
1240 self.store
1241 .providers(key)
1242 .into_iter()
1243 .filter_map(|p| {
1244 if p.is_expired(Instant::now()) {
1245 self.store.remove_provider(key, &p.provider);
1246 return None;
1247 }
1248
1249 if &p.provider != source {
1250 let node_id = p.provider;
1251 let multiaddrs = p.addresses;
1252 let connection_ty = if self.connected_peers.contains(&node_id) {
1253 ConnectionType::Connected
1254 } else {
1255 ConnectionType::NotConnected
1256 };
1257 if multiaddrs.is_empty() {
1258 // The provider is either the local node and we fill in
1259 // the local addresses on demand, or it is a legacy
1260 // provider record without addresses, in which case we
1261 // try to find addresses in the routing table, as was
1262 // done before provider records were stored along with
1263 // their addresses.
1264 if &node_id == self.kbuckets.local_key().preimage() {
1265 Some(
1266 self.listen_addresses
1267 .iter()
1268 .chain(self.external_addresses.iter())
1269 .cloned()
1270 .collect::<Vec<_>>(),
1271 )
1272 } else {
1273 let key = kbucket::Key::from(node_id);
1274 self.kbuckets
1275 .entry(&key)
1276 .as_mut()
1277 .and_then(|e| e.view())
1278 .map(|e| e.node.value.clone().into_vec())
1279 }
1280 } else {
1281 Some(multiaddrs)
1282 }
1283 .map(|multiaddrs| KadPeer {
1284 node_id,
1285 multiaddrs,
1286 connection_ty,
1287 })
1288 } else {
1289 None
1290 }
1291 })
1292 .take(self.queries.config().replication_factor.get())
1293 .collect()
1294 }
1295
1296 /// Starts an iterative `ADD_PROVIDER` query for the given key.
1297 fn start_add_provider(&mut self, key: record::Key, context: AddProviderContext) {
1298 let info = QueryInfo::AddProvider {
1299 context,
1300 key: key.clone(),
1301 phase: AddProviderPhase::GetClosestPeers,
1302 };
1303 let target = kbucket::Key::new(key);
1304 let peers = self.kbuckets.closest_keys(&target);
1305 self.queries.add_iter_closest(target.clone(), peers, info);
1306 }
1307
1308 /// Starts an iterative `PUT_VALUE` query for the given record.
1309 fn start_put_record(&mut self, record: Record, quorum: Quorum, context: PutRecordContext) {
1310 let quorum = quorum.eval(self.queries.config().replication_factor);
1311 let target = kbucket::Key::new(record.key.clone());
1312 let peers = self.kbuckets.closest_keys(&target);
1313 let info = QueryInfo::PutRecord {
1314 record,
1315 quorum,
1316 context,
1317 phase: PutRecordPhase::GetClosestPeers,
1318 };
1319 self.queries.add_iter_closest(target.clone(), peers, info);
1320 }
1321
/// Updates the routing table with a new connection status and address of a peer.
///
/// What happens depends on the peer's current entry in the routing table:
///
/// * `Present`: the status is updated if it changed and the address, if
///   given, is added to the entry. An [`Event::RoutingUpdated`] is queued
///   only when inserting the address returns `true`.
/// * `Pending`: the address, if given, is added and the status is updated if
///   it changed. No event is queued.
/// * `Absent`: peers that are not connected are ignored. For connected peers
///   the outcome depends on whether an address is known and on the
///   configured [`BucketInserts`] policy (see the comments in the body).
/// * Any other case (including the local node's own key): nothing happens.
fn connection_updated(
    &mut self,
    peer: PeerId,
    address: Option<Multiaddr>,
    new_status: NodeStatus,
) {
    let key = kbucket::Key::from(peer);
    match self.kbuckets.entry(&key) {
        Some(kbucket::Entry::Present(mut entry, old_status)) => {
            if old_status != new_status {
                entry.update(new_status)
            }
            if let Some(address) = address {
                // Only report a routing update if `insert` signals (by
                // returning `true`) that the address set has changed.
                if entry.value().insert(address) {
                    self.queued_events.push_back(ToSwarm::GenerateEvent(
                        Event::RoutingUpdated {
                            peer,
                            is_new_peer: false,
                            addresses: entry.value().clone(),
                            old_peer: None,
                            bucket_range: self
                                .kbuckets
                                .bucket(&key)
                                .map(|b| b.range())
                                .expect("Not kbucket::Entry::SelfEntry."),
                        },
                    ))
                }
            }
        }

        Some(kbucket::Entry::Pending(mut entry, old_status)) => {
            if let Some(address) = address {
                entry.value().insert(address);
            }
            if old_status != new_status {
                entry.update(new_status);
            }
        }

        Some(kbucket::Entry::Absent(entry)) => {
            // Only connected nodes with a known address are newly inserted.
            if new_status != NodeStatus::Connected {
                return;
            }
            match (address, self.kbucket_inserts) {
                // Connected, but without a known address: the peer cannot be
                // added to the routing table; this is only reported.
                (None, _) => {
                    self.queued_events
                        .push_back(ToSwarm::GenerateEvent(Event::UnroutablePeer { peer }));
                }
                // Manual insertion policy: the peer is only reported as
                // routable; the routing table is left untouched here.
                (Some(a), BucketInserts::Manual) => {
                    self.queued_events
                        .push_back(ToSwarm::GenerateEvent(Event::RoutablePeer {
                            peer,
                            address: a,
                        }));
                }
                // Automatic insertion policy: try to insert the peer now.
                (Some(a), BucketInserts::OnConnected) => {
                    let addresses = Addresses::new(a);
                    match entry.insert(addresses.clone(), new_status) {
                        kbucket::InsertResult::Inserted => {
                            // A bootstrap is triggered if the routing table
                            // is still small.
                            self.bootstrap_on_low_peers();

                            let event = Event::RoutingUpdated {
                                peer,
                                is_new_peer: true,
                                addresses,
                                old_peer: None,
                                bucket_range: self
                                    .kbuckets
                                    .bucket(&key)
                                    .map(|b| b.range())
                                    .expect("Not kbucket::Entry::SelfEntry."),
                            };
                            self.queued_events.push_back(ToSwarm::GenerateEvent(event));
                        }
                        kbucket::InsertResult::Full => {
                            tracing::debug!(
                                %peer,
                                "Bucket full. Peer not added to routing table"
                            );
                            let address = addresses.first().clone();
                            self.queued_events.push_back(ToSwarm::GenerateEvent(
                                Event::RoutablePeer { peer, address },
                            ));
                        }
                        kbucket::InsertResult::Pending { disconnected } => {
                            let address = addresses.first().clone();
                            self.queued_events.push_back(ToSwarm::GenerateEvent(
                                Event::PendingRoutablePeer { peer, address },
                            ));

                            // `disconnected` might already be in the process of re-connecting.
                            // In other words `disconnected` might have already re-connected but
                            // is not yet confirmed to support the Kademlia protocol via
                            // [`HandlerEvent::ProtocolConfirmed`].
                            //
                            // Only try dialing peer if not currently connected.
                            if !self.connected_peers.contains(disconnected.preimage()) {
                                self.queued_events.push_back(ToSwarm::Dial {
                                    opts: DialOpts::peer_id(disconnected.into_preimage())
                                        .build(),
                                })
                            }
                        }
                    }
                }
            }
        }
        // Any other entry kind (or no entry at all): nothing to update.
        _ => {}
    }
}
1435
1436 /// A new peer has been inserted in the routing table but we check if the routing
1437 /// table is currently small (less that `K_VALUE` peers are present) and only
1438 /// trigger a bootstrap in that case
1439 fn bootstrap_on_low_peers(&mut self) {
1440 if self
1441 .kbuckets()
1442 .map(|kbucket| kbucket.num_entries())
1443 .sum::<usize>()
1444 < K_VALUE.get()
1445 {
1446 self.bootstrap_status.trigger();
1447 }
1448 }
1449
/// Handles a finished (i.e. successful) query.
///
/// Depending on the kind of query (`q.info`), this either produces the
/// [`Event::OutboundQueryProgressed`] to report, or continues the query under
/// the same `QueryId` with its next phase:
///
/// * `Bootstrap` reports every finished lookup and, as long as buckets remain
///   to be refreshed, continues with the next one.
/// * `AddProvider` and `PutRecord` first locate the closest peers; when that
///   phase finishes, the query continues with the fixed set of peers found
///   and `None` is returned. The event is produced when the second phase
///   finishes, with the statistics of both phases merged.
/// * A finished `PutRecord` query in the `Replicate` context is only logged
///   and yields `None`.
/// * All other queries produce their final event.
fn query_finished(&mut self, q: Query) -> Option<Event> {
    let query_id = q.id();
    tracing::trace!(query=?query_id, "Query finished");
    match q.info {
        QueryInfo::Bootstrap {
            peer,
            remaining,
            mut step,
        } => {
            let local_key = *self.kbuckets.local_key();
            // `remaining` is `None` only for the initial self-lookup, in
            // which case the list of refresh targets is computed now.
            let mut remaining = remaining.unwrap_or_else(|| {
                debug_assert_eq!(&peer, local_key.preimage());
                // The lookup for the local key finished. To complete the bootstrap process,
                // a bucket refresh should be performed for every bucket farther away than
                // the first non-empty bucket (which are most likely no more than the last
                // few, i.e. farthest, buckets).
                self.kbuckets
                    .iter()
                    .skip_while(|b| b.is_empty())
                    .skip(1) // Skip the bucket with the closest neighbour.
                    .map(|b| {
                        // Try to find a key that falls into the bucket. While such keys can
                        // be generated fully deterministically, the current libp2p kademlia
                        // wire protocol requires transmission of the preimages of the actual
                        // keys in the DHT keyspace, hence for now this is just a "best effort"
                        // to find a key that hashes into a specific bucket. The probabilities
                        // of finding a key in the bucket `b` with at most 16 trials are as
                        // follows:
                        //
                        // Pr(bucket-255) = 1 - (1/2)^16   ~= 1
                        // Pr(bucket-254) = 1 - (3/4)^16   ~= 1
                        // Pr(bucket-253) = 1 - (7/8)^16   ~= 0.88
                        // Pr(bucket-252) = 1 - (15/16)^16 ~= 0.64
                        // ...
                        let mut target = kbucket::Key::from(PeerId::random());
                        for _ in 0..16 {
                            let d = local_key.distance(&target);
                            if b.contains(&d) {
                                break;
                            }
                            target = kbucket::Key::from(PeerId::random());
                        }
                        target
                    })
                    .collect::<Vec<_>>()
                    .into_iter()
            });

            // Counted before the next target is taken below, i.e. it
            // includes the lookup that is about to be started.
            let num_remaining = remaining.len() as u32;

            if let Some(target) = remaining.next() {
                let info = QueryInfo::Bootstrap {
                    peer: *target.preimage(),
                    remaining: Some(remaining),
                    step: step.next(),
                };
                let peers = self.kbuckets.closest_keys(&target);
                self.queries
                    .continue_iter_closest(query_id, target, peers, info);
            } else {
                // Nothing left to refresh: this was the last bootstrap step.
                step.last = true;
                self.bootstrap_status.on_finish();
            };

            Some(Event::OutboundQueryProgressed {
                id: query_id,
                stats: q.stats,
                result: QueryResult::Bootstrap(Ok(BootstrapOk {
                    peer,
                    num_remaining,
                })),
                step,
            })
        }

        QueryInfo::GetClosestPeers { key, mut step, .. } => {
            step.last = true;

            Some(Event::OutboundQueryProgressed {
                id: query_id,
                stats: q.stats,
                result: QueryResult::GetClosestPeers(Ok(GetClosestPeersOk {
                    key,
                    peers: q.peers.into_peerinfos_iter().collect(),
                })),
                step,
            })
        }

        QueryInfo::GetProviders { mut step, .. } => {
            step.last = true;

            Some(Event::OutboundQueryProgressed {
                id: query_id,
                stats: q.stats,
                result: QueryResult::GetProviders(Ok(
                    GetProvidersOk::FinishedWithNoAdditionalRecord {
                        closest_peers: q.peers.into_peerids_iter().collect(),
                    },
                )),
                step,
            })
        }

        // First phase of `AddProvider` done: the closest peers are known, so
        // continue with sending the provider record to exactly these peers.
        QueryInfo::AddProvider {
            context,
            key,
            phase: AddProviderPhase::GetClosestPeers,
        } => {
            let provider_id = self.local_peer_id;
            let external_addresses = self.external_addresses.iter().cloned().collect();
            let info = QueryInfo::AddProvider {
                context,
                key,
                phase: AddProviderPhase::AddProvider {
                    provider_id,
                    external_addresses,
                    get_closest_peers_stats: q.stats,
                },
            };
            self.queries
                .continue_fixed(query_id, q.peers.into_peerids_iter(), info);
            None
        }

        // Second phase of `AddProvider` done: report the result, with the
        // statistics of both phases merged.
        QueryInfo::AddProvider {
            context,
            key,
            phase:
                AddProviderPhase::AddProvider {
                    get_closest_peers_stats,
                    ..
                },
        } => match context {
            AddProviderContext::Publish => Some(Event::OutboundQueryProgressed {
                id: query_id,
                stats: get_closest_peers_stats.merge(q.stats),
                result: QueryResult::StartProviding(Ok(AddProviderOk { key })),
                step: ProgressStep::first_and_last(),
            }),
            AddProviderContext::Republish => Some(Event::OutboundQueryProgressed {
                id: query_id,
                stats: get_closest_peers_stats.merge(q.stats),
                result: QueryResult::RepublishProvider(Ok(AddProviderOk { key })),
                step: ProgressStep::first_and_last(),
            }),
        },

        QueryInfo::GetRecord {
            key,
            mut step,
            found_a_record,
            cache_candidates,
        } => {
            step.last = true;

            // Finishing without having found any record is reported as an
            // error; otherwise the query just ends without further records.
            let results = if found_a_record {
                Ok(GetRecordOk::FinishedWithNoAdditionalRecord { cache_candidates })
            } else {
                Err(GetRecordError::NotFound {
                    key,
                    closest_peers: q.peers.into_peerids_iter().collect(),
                })
            };
            Some(Event::OutboundQueryProgressed {
                id: query_id,
                stats: q.stats,
                result: QueryResult::GetRecord(results),
                step,
            })
        }

        // First phase of `PutRecord` done: the closest peers are known, so
        // continue with sending the record to exactly these peers.
        QueryInfo::PutRecord {
            context,
            record,
            quorum,
            phase: PutRecordPhase::GetClosestPeers,
        } => {
            let info = QueryInfo::PutRecord {
                context,
                record,
                quorum,
                phase: PutRecordPhase::PutRecord {
                    success: vec![],
                    get_closest_peers_stats: q.stats,
                },
            };
            self.queries
                .continue_fixed(query_id, q.peers.into_peerids_iter(), info);
            None
        }

        // Second phase of `PutRecord` done: the result depends on whether
        // enough peers confirmed the record to satisfy the quorum.
        QueryInfo::PutRecord {
            context,
            record,
            quorum,
            phase:
                PutRecordPhase::PutRecord {
                    success,
                    get_closest_peers_stats,
                },
        } => {
            let mk_result = |key: record::Key| {
                if success.len() >= quorum.get() {
                    Ok(PutRecordOk { key })
                } else {
                    Err(PutRecordError::QuorumFailed {
                        key,
                        quorum,
                        success,
                    })
                }
            };
            match context {
                PutRecordContext::Publish | PutRecordContext::Custom => {
                    Some(Event::OutboundQueryProgressed {
                        id: query_id,
                        stats: get_closest_peers_stats.merge(q.stats),
                        result: QueryResult::PutRecord(mk_result(record.key)),
                        step: ProgressStep::first_and_last(),
                    })
                }
                PutRecordContext::Republish => Some(Event::OutboundQueryProgressed {
                    id: query_id,
                    stats: get_closest_peers_stats.merge(q.stats),
                    result: QueryResult::RepublishRecord(mk_result(record.key)),
                    step: ProgressStep::first_and_last(),
                }),
                // Replication is not reported as an event, only logged.
                PutRecordContext::Replicate => {
                    tracing::debug!(record=?record.key, "Record replicated");
                    None
                }
            }
        }
    }
}
1687
/// Handles a query that timed out.
///
/// Produces the [`Event::OutboundQueryProgressed`] carrying the timeout error
/// that matches the kind of query (`query.info`). Two cases are special:
///
/// * A timed-out `Bootstrap` lookup does not abort the bootstrap: if refresh
///   targets remain, the query continues with the next one, and the timeout
///   is reported as a progress event.
/// * A timed-out `PutRecord` query in the `Replicate` context is only logged
///   and yields `None`.
fn query_timeout(&mut self, query: Query) -> Option<Event> {
    let query_id = query.id();
    tracing::trace!(query=?query_id, "Query timed out");
    match query.info {
        QueryInfo::Bootstrap {
            peer,
            mut remaining,
            mut step,
        } => {
            // Number of targets left after the next one has been taken below;
            // `None` if no list of targets exists yet (`remaining` is `None`).
            let num_remaining = remaining.as_ref().map(|r| r.len().saturating_sub(1) as u32);

            // Continue with the next bootstrap query if `remaining` is not empty.
            if let Some((target, remaining)) =
                remaining.take().and_then(|mut r| Some((r.next()?, r)))
            {
                let info = QueryInfo::Bootstrap {
                    peer: target.into_preimage(),
                    remaining: Some(remaining),
                    step: step.next(),
                };
                let peers = self.kbuckets.closest_keys(&target);
                self.queries
                    .continue_iter_closest(query_id, target, peers, info);
            } else {
                // Nothing left to continue with: the bootstrap ends here.
                step.last = true;
                self.bootstrap_status.on_finish();
            }

            Some(Event::OutboundQueryProgressed {
                id: query_id,
                stats: query.stats,
                result: QueryResult::Bootstrap(Err(BootstrapError::Timeout {
                    peer,
                    num_remaining,
                })),
                step,
            })
        }

        // The timeout is reported the same way in either phase of the query.
        QueryInfo::AddProvider { context, key, .. } => Some(match context {
            AddProviderContext::Publish => Event::OutboundQueryProgressed {
                id: query_id,
                stats: query.stats,
                result: QueryResult::StartProviding(Err(AddProviderError::Timeout { key })),
                step: ProgressStep::first_and_last(),
            },
            AddProviderContext::Republish => Event::OutboundQueryProgressed {
                id: query_id,
                stats: query.stats,
                result: QueryResult::RepublishProvider(Err(AddProviderError::Timeout { key })),
                step: ProgressStep::first_and_last(),
            },
        }),

        QueryInfo::GetClosestPeers { key, mut step, .. } => {
            step.last = true;
            // The peers collected by the query so far are included in the error.
            Some(Event::OutboundQueryProgressed {
                id: query_id,
                stats: query.stats,
                result: QueryResult::GetClosestPeers(Err(GetClosestPeersError::Timeout {
                    key,
                    peers: query.peers.into_peerinfos_iter().collect(),
                })),
                step,
            })
        }

        QueryInfo::PutRecord {
            record,
            quorum,
            context,
            phase,
        } => {
            // The error lists the peers recorded as successful so far; there
            // are none if the query timed out while still locating peers.
            let err = Err(PutRecordError::Timeout {
                key: record.key,
                quorum,
                success: match phase {
                    PutRecordPhase::GetClosestPeers => vec![],
                    PutRecordPhase::PutRecord { ref success, .. } => success.clone(),
                },
            });
            match context {
                PutRecordContext::Publish | PutRecordContext::Custom => {
                    Some(Event::OutboundQueryProgressed {
                        id: query_id,
                        stats: query.stats,
                        result: QueryResult::PutRecord(err),
                        step: ProgressStep::first_and_last(),
                    })
                }
                PutRecordContext::Republish => Some(Event::OutboundQueryProgressed {
                    id: query_id,
                    stats: query.stats,
                    result: QueryResult::RepublishRecord(err),
                    step: ProgressStep::first_and_last(),
                }),
                // Failed replication is not reported as an event, only
                // logged, at a level that depends on the phase that timed out.
                PutRecordContext::Replicate => match phase {
                    PutRecordPhase::GetClosestPeers => {
                        tracing::warn!(
                            "Locating closest peers for replication failed: {:?}",
                            err
                        );
                        None
                    }
                    PutRecordPhase::PutRecord { .. } => {
                        tracing::debug!("Replicating record failed: {:?}", err);
                        None
                    }
                },
            }
        }

        QueryInfo::GetRecord { key, mut step, .. } => {
            step.last = true;

            Some(Event::OutboundQueryProgressed {
                id: query_id,
                stats: query.stats,
                result: QueryResult::GetRecord(Err(GetRecordError::Timeout { key })),
                step,
            })
        }

        QueryInfo::GetProviders { key, mut step, .. } => {
            step.last = true;

            // The peers collected by the query so far are included in the error.
            Some(Event::OutboundQueryProgressed {
                id: query_id,
                stats: query.stats,
                result: QueryResult::GetProviders(Err(GetProvidersError::Timeout {
                    key,
                    closest_peers: query.peers.into_peerids_iter().collect(),
                })),
                step,
            })
        }
    }
}
1827
1828 /// Processes a record received from a peer.
1829 fn record_received(
1830 &mut self,
1831 source: PeerId,
1832 connection: ConnectionId,
1833 request_id: RequestId,
1834 mut record: Record,
1835 ) {
1836 if record.publisher.as_ref() == Some(self.kbuckets.local_key().preimage()) {
1837 // If the (alleged) publisher is the local node, do nothing. The record of
1838 // the original publisher should never change as a result of replication
1839 // and the publisher is always assumed to have the "right" value.
1840 self.queued_events.push_back(ToSwarm::NotifyHandler {
1841 peer_id: source,
1842 handler: NotifyHandler::One(connection),
1843 event: HandlerIn::PutRecordRes {
1844 key: record.key,
1845 value: record.value,
1846 request_id,
1847 },
1848 });
1849 return;
1850 }
1851
1852 let now = Instant::now();
1853
1854 // Calculate the expiration exponentially inversely proportional to the
1855 // number of nodes between the local node and the closest node to the key
1856 // (beyond the replication factor). This ensures avoiding over-caching
1857 // outside of the k closest nodes to a key.
1858 let target = kbucket::Key::new(record.key.clone());
1859 let num_between = self.kbuckets.count_nodes_between(&target);
1860 let k = self.queries.config().replication_factor.get();
1861 let num_beyond_k = (usize::max(k, num_between) - k) as u32;
1862 let expiration = self
1863 .record_ttl
1864 .map(|ttl| now + exp_decrease(ttl, num_beyond_k));
1865 // The smaller TTL prevails. Only if neither TTL is set is the record
1866 // stored "forever".
1867 record.expires = record.expires.or(expiration).min(expiration);
1868
1869 if let Some(job) = self.put_record_job.as_mut() {
1870 // Ignore the record in the next run of the replication
1871 // job, since we can assume the sender replicated the
1872 // record to the k closest peers. Effectively, only
1873 // one of the k closest peers performs a replication
1874 // in the configured interval, assuming a shared interval.
1875 job.skip(record.key.clone())
1876 }
1877
1878 // While records received from a publisher, as well as records that do
1879 // not exist locally should always (attempted to) be stored, there is a
1880 // choice here w.r.t. the handling of replicated records whose keys refer
1881 // to records that exist locally: The value and / or the publisher may
1882 // either be overridden or left unchanged. At the moment and in the
1883 // absence of a decisive argument for another option, both are always
1884 // overridden as it avoids having to load the existing record in the
1885 // first place.
1886
1887 if !record.is_expired(now) {
1888 // The record is cloned because of the weird libp2p protocol
1889 // requirement to send back the value in the response, although this
1890 // is a waste of resources.
1891 match self.record_filtering {
1892 StoreInserts::Unfiltered => match self.store.put(record.clone()) {
1893 Ok(()) => {
1894 tracing::debug!(
1895 record=?record.key,
1896 "Record stored: {} bytes",
1897 record.value.len()
1898 );
1899 self.queued_events.push_back(ToSwarm::GenerateEvent(
1900 Event::InboundRequest {
1901 request: InboundRequest::PutRecord {
1902 source,
1903 connection,
1904 record: None,
1905 },
1906 },
1907 ));
1908 }
1909 Err(e) => {
1910 tracing::info!("Record not stored: {:?}", e);
1911 self.queued_events.push_back(ToSwarm::NotifyHandler {
1912 peer_id: source,
1913 handler: NotifyHandler::One(connection),
1914 event: HandlerIn::Reset(request_id),
1915 });
1916
1917 return;
1918 }
1919 },
1920 StoreInserts::FilterBoth => {
1921 self.queued_events
1922 .push_back(ToSwarm::GenerateEvent(Event::InboundRequest {
1923 request: InboundRequest::PutRecord {
1924 source,
1925 connection,
1926 record: Some(record.clone()),
1927 },
1928 }));
1929 }
1930 }
1931 }
1932
1933 // The remote receives a [`HandlerIn::PutRecordRes`] even in the
1934 // case where the record is discarded due to being expired. Given that
1935 // the remote sent the local node a [`HandlerEvent::PutRecord`]
1936 // request, the remote perceives the local node as one node among the k
1937 // closest nodes to the target. In addition returning
1938 // [`HandlerIn::PutRecordRes`] does not reveal any internal
1939 // information to a possibly malicious remote node.
1940 self.queued_events.push_back(ToSwarm::NotifyHandler {
1941 peer_id: source,
1942 handler: NotifyHandler::One(connection),
1943 event: HandlerIn::PutRecordRes {
1944 key: record.key,
1945 value: record.value,
1946 request_id,
1947 },
1948 })
1949 }
1950
1951 /// Processes a provider record received from a peer.
1952 fn provider_received(&mut self, key: record::Key, provider: KadPeer) {
1953 if &provider.node_id != self.kbuckets.local_key().preimage() {
1954 let record = ProviderRecord {
1955 key,
1956 provider: provider.node_id,
1957 expires: self.provider_record_ttl.map(|ttl| Instant::now() + ttl),
1958 addresses: provider.multiaddrs,
1959 };
1960 match self.record_filtering {
1961 StoreInserts::Unfiltered => {
1962 if let Err(e) = self.store.add_provider(record) {
1963 tracing::info!("Provider record not stored: {:?}", e);
1964 return;
1965 }
1966
1967 self.queued_events
1968 .push_back(ToSwarm::GenerateEvent(Event::InboundRequest {
1969 request: InboundRequest::AddProvider { record: None },
1970 }));
1971 }
1972 StoreInserts::FilterBoth => {
1973 self.queued_events
1974 .push_back(ToSwarm::GenerateEvent(Event::InboundRequest {
1975 request: InboundRequest::AddProvider {
1976 record: Some(record),
1977 },
1978 }));
1979 }
1980 }
1981 }
1982 }
1983
1984 fn address_failed(&mut self, peer_id: PeerId, address: &Multiaddr) {
1985 let key = kbucket::Key::from(peer_id);
1986
1987 if let Some(addrs) = self.kbuckets.entry(&key).as_mut().and_then(|e| e.value()) {
1988 // TODO: Ideally, the address should only be removed if the error can
1989 // be classified as "permanent" but since `err` is currently a borrowed
1990 // trait object without a `'static` bound, even downcasting for inspection
1991 // of the error is not possible (and also not truly desirable or ergonomic).
1992 // The error passed in should rather be a dedicated enum.
1993 if addrs.remove(address).is_ok() {
1994 tracing::debug!(
1995 peer=%peer_id,
1996 %address,
1997 "Address removed from peer due to error."
1998 );
1999 } else {
2000 // Despite apparently having no reachable address (any longer),
2001 // the peer is kept in the routing table with the last address to avoid
2002 // (temporary) loss of network connectivity to "flush" the routing
2003 // table. Once in, a peer is only removed from the routing table
2004 // if it is the least recently connected peer, currently disconnected
2005 // and is unreachable in the context of another peer pending insertion
2006 // into the same bucket. This is handled transparently by the
2007 // `KBucketsTable` and takes effect through `KBucketsTable::take_applied_pending`
2008 // within `Behaviour::poll`.
2009 tracing::debug!(
2010 peer=%peer_id,
2011 %address,
2012 "Last remaining address of peer is unreachable."
2013 );
2014 }
2015 }
2016
2017 for query in self.queries.iter_mut() {
2018 if let Some(addrs) = query.peers.addresses.get_mut(&peer_id) {
2019 addrs.retain(|a| a != address);
2020 }
2021 }
2022 }
2023
2024 fn on_connection_established(
2025 &mut self,
2026 ConnectionEstablished {
2027 peer_id,
2028 failed_addresses,
2029 other_established,
2030 ..
2031 }: ConnectionEstablished,
2032 ) {
2033 for addr in failed_addresses {
2034 self.address_failed(peer_id, addr);
2035 }
2036
2037 // Peer's first connection.
2038 if other_established == 0 {
2039 self.connected_peers.insert(peer_id);
2040 }
2041 }
2042
2043 fn on_address_change(
2044 &mut self,
2045 AddressChange {
2046 peer_id: peer,
2047 old,
2048 new,
2049 ..
2050 }: AddressChange,
2051 ) {
2052 let (old, new) = (old.get_remote_address(), new.get_remote_address());
2053
2054 // Update routing table.
2055 if let Some(addrs) = self
2056 .kbuckets
2057 .entry(&kbucket::Key::from(peer))
2058 .as_mut()
2059 .and_then(|e| e.value())
2060 {
2061 if addrs.replace(old, new) {
2062 tracing::debug!(
2063 %peer,
2064 old_address=%old,
2065 new_address=%new,
2066 "Old address replaced with new address for peer."
2067 );
2068 } else {
2069 tracing::debug!(
2070 %peer,
2071 old_address=%old,
2072 new_address=%new,
2073 "Old address not replaced with new address for peer as old address wasn't present.",
2074 );
2075 }
2076 } else {
2077 tracing::debug!(
2078 %peer,
2079 old_address=%old,
2080 new_address=%new,
2081 "Old address not replaced with new address for peer as peer is not present in the \
2082 routing table."
2083 );
2084 }
2085
2086 // Update query address cache.
2087 //
2088 // Given two connected nodes: local node A and remote node B. Say node B
2089 // is not in node A's routing table. Additionally node B is part of the
2090 // `Query::addresses` list of an ongoing query on node A. Say Node
2091 // B triggers an address change and then disconnects. Later on the
2092 // earlier mentioned query on node A would like to connect to node B.
2093 // Without replacing the address in the `Query::addresses` set node
2094 // A would attempt to dial the old and not the new address.
2095 //
2096 // While upholding correctness, iterating through all discovered
2097 // addresses of a peer in all currently ongoing queries might have a
2098 // large performance impact. If so, the code below might be worth
2099 // revisiting.
2100 for query in self.queries.iter_mut() {
2101 if let Some(addrs) = query.peers.addresses.get_mut(&peer) {
2102 for addr in addrs.iter_mut() {
2103 if addr == old {
2104 *addr = new.clone();
2105 }
2106 }
2107 }
2108 }
2109 }
2110
2111 fn on_dial_failure(&mut self, DialFailure { peer_id, error, .. }: DialFailure) {
2112 let Some(peer_id) = peer_id else { return };
2113
2114 match error {
2115 DialError::LocalPeerId { .. }
2116 | DialError::WrongPeerId { .. }
2117 | DialError::Aborted
2118 | DialError::Denied { .. }
2119 | DialError::Transport(_)
2120 | DialError::NoAddresses => {
2121 if let DialError::Transport(addresses) = error {
2122 for (addr, _) in addresses {
2123 self.address_failed(peer_id, addr)
2124 }
2125 }
2126
2127 for query in self.queries.iter_mut() {
2128 query.on_failure(&peer_id);
2129 }
2130 }
2131 DialError::DialPeerConditionFalse(
2132 dial_opts::PeerCondition::Disconnected
2133 | dial_opts::PeerCondition::NotDialing
2134 | dial_opts::PeerCondition::DisconnectedAndNotDialing,
2135 ) => {
2136 // We might (still) be connected, or about to be connected, thus do not report the
2137 // failure to the queries.
2138 }
2139 DialError::DialPeerConditionFalse(dial_opts::PeerCondition::Always) => {
2140 unreachable!("DialPeerCondition::Always can not trigger DialPeerConditionFalse.");
2141 }
2142 }
2143 }
2144
2145 fn on_connection_closed(
2146 &mut self,
2147 ConnectionClosed {
2148 peer_id,
2149 remaining_established,
2150 connection_id,
2151 ..
2152 }: ConnectionClosed,
2153 ) {
2154 self.connections.remove(&connection_id);
2155
2156 if remaining_established == 0 {
2157 for query in self.queries.iter_mut() {
2158 query.on_failure(&peer_id);
2159 }
2160 self.connection_updated(peer_id, None, NodeStatus::Disconnected);
2161 self.connected_peers.remove(&peer_id);
2162 }
2163 }
2164
2165 /// Preloads a new [`Handler`] with requests that are waiting
2166 /// to be sent to the newly connected peer.
2167 fn preload_new_handler(
2168 &mut self,
2169 handler: &mut Handler,
2170 connection_id: ConnectionId,
2171 peer: PeerId,
2172 ) {
2173 self.connections.insert(connection_id, peer);
2174 // Queue events for sending pending RPCs to the connected peer.
2175 // There can be only one pending RPC for a particular peer and query per definition.
2176 for (_peer_id, event) in self.queries.iter_mut().filter_map(|q| {
2177 q.pending_rpcs
2178 .iter()
2179 .position(|(p, _)| p == &peer)
2180 .map(|p| q.pending_rpcs.remove(p))
2181 }) {
2182 handler.on_behaviour_event(event)
2183 }
2184 }
2185}
2186
/// Halves the given duration `exp` times (exponential decrease, base 2).
///
/// Only whole seconds of `ttl` are considered. If `exp` is at least the bit
/// width of the seconds counter, the result is zero.
fn exp_decrease(ttl: Duration, exp: u32) -> Duration {
    let whole_secs = ttl.as_secs();
    let decreased = if exp < u64::BITS {
        whole_secs >> exp
    } else {
        0
    };
    Duration::from_secs(decreased)
}
2191
2192impl<TStore> NetworkBehaviour for Behaviour<TStore>
2193where
2194 TStore: RecordStore + Send + 'static,
2195{
2196 type ConnectionHandler = Handler;
2197 type ToSwarm = Event;
2198
2199 fn handle_established_inbound_connection(
2200 &mut self,
2201 connection_id: ConnectionId,
2202 peer: PeerId,
2203 local_addr: &Multiaddr,
2204 remote_addr: &Multiaddr,
2205 ) -> Result<THandler<Self>, ConnectionDenied> {
2206 let connected_point = ConnectedPoint::Listener {
2207 local_addr: local_addr.clone(),
2208 send_back_addr: remote_addr.clone(),
2209 };
2210
2211 let mut handler = Handler::new(
2212 self.protocol_config.clone(),
2213 connected_point,
2214 peer,
2215 self.mode,
2216 );
2217 self.preload_new_handler(&mut handler, connection_id, peer);
2218
2219 Ok(handler)
2220 }
2221
2222 fn handle_established_outbound_connection(
2223 &mut self,
2224 connection_id: ConnectionId,
2225 peer: PeerId,
2226 addr: &Multiaddr,
2227 role_override: Endpoint,
2228 port_use: PortUse,
2229 ) -> Result<THandler<Self>, ConnectionDenied> {
2230 let connected_point = ConnectedPoint::Dialer {
2231 address: addr.clone(),
2232 role_override,
2233 port_use,
2234 };
2235
2236 let mut handler = Handler::new(
2237 self.protocol_config.clone(),
2238 connected_point,
2239 peer,
2240 self.mode,
2241 );
2242 self.preload_new_handler(&mut handler, connection_id, peer);
2243
2244 Ok(handler)
2245 }
2246
2247 fn handle_pending_outbound_connection(
2248 &mut self,
2249 _connection_id: ConnectionId,
2250 maybe_peer: Option<PeerId>,
2251 _addresses: &[Multiaddr],
2252 _effective_role: Endpoint,
2253 ) -> Result<Vec<Multiaddr>, ConnectionDenied> {
2254 let peer_id = match maybe_peer {
2255 None => return Ok(vec![]),
2256 Some(peer) => peer,
2257 };
2258
2259 // We should order addresses from decreasing likelihood of connectivity, so start with
2260 // the addresses of that peer in the k-buckets.
2261 let key = kbucket::Key::from(peer_id);
2262 let mut peer_addrs =
2263 if let Some(kbucket::Entry::Present(mut entry, _)) = self.kbuckets.entry(&key) {
2264 let addrs = entry.value().iter().cloned().collect::<Vec<_>>();
2265 debug_assert!(!addrs.is_empty(), "Empty peer addresses in routing table.");
2266 addrs
2267 } else {
2268 Vec::new()
2269 };
2270
2271 // We add to that a temporary list of addresses from the ongoing queries.
2272 for query in self.queries.iter() {
2273 if let Some(addrs) = query.peers.addresses.get(&peer_id) {
2274 peer_addrs.extend(addrs.iter().cloned())
2275 }
2276 }
2277
2278 Ok(peer_addrs)
2279 }
2280
    /// Dispatches an event emitted by the connection handler of `source` on
    /// `connection`.
    ///
    /// Inbound requests (`*Req`, `AddProvider`, `GetRecord`, `PutRecord`) are
    /// answered by queueing a `ToSwarm::NotifyHandler` for the same
    /// connection and/or an [`Event::InboundRequest`] for the application.
    /// Responses to outbound requests (`*Res`, `QueryError`) are fed into the
    /// query identified by `query_id`, if it is still active.
    fn on_connection_handler_event(
        &mut self,
        source: PeerId,
        connection: ConnectionId,
        event: THandlerOutEvent<Self>,
    ) {
        match event {
            HandlerEvent::ProtocolConfirmed { endpoint } => {
                debug_assert!(self.connected_peers.contains(&source));
                // The remote's address can only be put into the routing table,
                // and thus shared with other nodes, if the local node is the dialer,
                // since the remote address on an inbound connection may be specific
                // to that connection (e.g. typically the TCP port numbers).
                let address = match endpoint {
                    ConnectedPoint::Dialer { address, .. } => Some(address),
                    ConnectedPoint::Listener { .. } => None,
                };

                self.connection_updated(source, address, NodeStatus::Connected);
            }

            // Same address selection as for `ProtocolConfirmed`, but the peer is
            // reported with `NodeStatus::Disconnected`.
            HandlerEvent::ProtocolNotSupported { endpoint } => {
                let address = match endpoint {
                    ConnectedPoint::Dialer { address, .. } => Some(address),
                    ConnectedPoint::Listener { .. } => None,
                };
                self.connection_updated(source, address, NodeStatus::Disconnected);
            }

            // Inbound request for the peers closest to `key`: report it to the
            // application and reply on the same connection.
            HandlerEvent::FindNodeReq { key, request_id } => {
                let closer_peers = self
                    .find_closest_local_peers(&kbucket::Key::new(key), &source)
                    .collect::<Vec<_>>();

                self.queued_events
                    .push_back(ToSwarm::GenerateEvent(Event::InboundRequest {
                        request: InboundRequest::FindNode {
                            num_closer_peers: closer_peers.len(),
                        },
                    }));

                self.queued_events.push_back(ToSwarm::NotifyHandler {
                    peer_id: source,
                    handler: NotifyHandler::One(connection),
                    event: HandlerIn::FindNodeRes {
                        closer_peers,
                        request_id,
                    },
                });
            }

            // Response to an outbound find-node request: feed the reported peers
            // into the originating query.
            HandlerEvent::FindNodeRes {
                closer_peers,
                query_id,
            } => {
                self.discovered(&query_id, &source, closer_peers.iter());
            }

            // Inbound request for providers of `key`: reply with both the known
            // providers and the closest local peers.
            HandlerEvent::GetProvidersReq { key, request_id } => {
                let provider_peers = self.provider_peers(&key, &source);
                let closer_peers = self
                    .find_closest_local_peers(&kbucket::Key::new(key), &source)
                    .collect::<Vec<_>>();

                self.queued_events
                    .push_back(ToSwarm::GenerateEvent(Event::InboundRequest {
                        request: InboundRequest::GetProvider {
                            num_closer_peers: closer_peers.len(),
                            num_provider_peers: provider_peers.len(),
                        },
                    }));

                self.queued_events.push_back(ToSwarm::NotifyHandler {
                    peer_id: source,
                    handler: NotifyHandler::One(connection),
                    event: HandlerIn::GetProvidersRes {
                        closer_peers,
                        provider_peers,
                        request_id,
                    },
                });
            }

            // Response to an outbound get-providers request: both closer peers
            // and providers count as discovered peers; the providers are also
            // reported to the application as a progress event.
            HandlerEvent::GetProvidersRes {
                closer_peers,
                provider_peers,
                query_id,
            } => {
                let peers = closer_peers.iter().chain(provider_peers.iter());
                self.discovered(&query_id, &source, peers);
                if let Some(query) = self.queries.get_mut(&query_id) {
                    let stats = query.stats().clone();
                    if let QueryInfo::GetProviders {
                        ref key,
                        ref mut providers_found,
                        ref mut step,
                        ..
                    } = query.info
                    {
                        *providers_found += provider_peers.len();
                        let providers = provider_peers.iter().map(|p| p.node_id).collect();

                        self.queued_events.push_back(ToSwarm::GenerateEvent(
                            Event::OutboundQueryProgressed {
                                id: query_id,
                                result: QueryResult::GetProviders(Ok(
                                    GetProvidersOk::FoundProviders {
                                        key: key.clone(),
                                        providers,
                                    },
                                )),
                                step: step.clone(),
                                stats,
                            },
                        ));
                        // Advance the step counter for the next progress event.
                        *step = step.next();
                    }
                }
            }
            HandlerEvent::QueryError { query_id, error } => {
                tracing::debug!(
                    peer=%source,
                    query=?query_id,
                    "Request to peer in query failed with {:?}",
                    error
                );
                // If the query to which the error relates is still active,
                // signal the failure w.r.t. `source`.
                if let Some(query) = self.queries.get_mut(&query_id) {
                    query.on_failure(&source)
                }
            }

            HandlerEvent::AddProvider { key, provider } => {
                // Only accept a provider record from a legitimate peer.
                if provider.node_id != source {
                    return;
                }

                self.provider_received(key, provider);
            }

            HandlerEvent::GetRecord { key, request_id } => {
                // Lookup the record locally.
                let record = match self.store.get(&key) {
                    Some(record) => {
                        // Expired records are removed from the store and treated
                        // as absent.
                        if record.is_expired(Instant::now()) {
                            self.store.remove(&key);
                            None
                        } else {
                            Some(record.into_owned())
                        }
                    }
                    None => None,
                };

                let closer_peers = self
                    .find_closest_local_peers(&kbucket::Key::new(key), &source)
                    .collect::<Vec<_>>();

                self.queued_events
                    .push_back(ToSwarm::GenerateEvent(Event::InboundRequest {
                        request: InboundRequest::GetRecord {
                            num_closer_peers: closer_peers.len(),
                            present_locally: record.is_some(),
                        },
                    }));

                self.queued_events.push_back(ToSwarm::NotifyHandler {
                    peer_id: source,
                    handler: NotifyHandler::One(connection),
                    event: HandlerIn::GetRecordRes {
                        record,
                        closer_peers,
                        request_id,
                    },
                });
            }

            // Response to an outbound get-record request.
            HandlerEvent::GetRecordRes {
                record,
                closer_peers,
                query_id,
            } => {
                if let Some(query) = self.queries.get_mut(&query_id) {
                    let stats = query.stats().clone();
                    if let QueryInfo::GetRecord {
                        key,
                        ref mut step,
                        ref mut found_a_record,
                        cache_candidates,
                    } = &mut query.info
                    {
                        if let Some(record) = record {
                            // The peer returned a record: report it as a
                            // progress event.
                            *found_a_record = true;
                            let record = PeerRecord {
                                peer: Some(source),
                                record,
                            };

                            self.queued_events.push_back(ToSwarm::GenerateEvent(
                                Event::OutboundQueryProgressed {
                                    id: query_id,
                                    result: QueryResult::GetRecord(Ok(GetRecordOk::FoundRecord(
                                        record,
                                    ))),
                                    step: step.clone(),
                                    stats,
                                },
                            ));

                            *step = step.next();
                        } else {
                            tracing::trace!(record=?key, %source, "Record not found at source");
                            // With caching enabled, remember this peer as a cache
                            // candidate keyed by its distance to the record key,
                            // keeping at most `max_peers` candidates by dropping
                            // the one with the largest distance.
                            if let Caching::Enabled { max_peers } = self.caching {
                                let source_key = kbucket::Key::from(source);
                                let target_key = kbucket::Key::from(key.clone());
                                let distance = source_key.distance(&target_key);
                                cache_candidates.insert(distance, source);
                                if cache_candidates.len() > max_peers as usize {
                                    cache_candidates.pop_last();
                                }
                            }
                        }
                    }
                }

                // Regardless of whether a record was returned, the closer peers
                // are fed into the query.
                self.discovered(&query_id, &source, closer_peers.iter());
            }

            HandlerEvent::PutRecord { record, request_id } => {
                self.record_received(source, connection, request_id, record);
            }

            // Acknowledgement of an outbound put-record request.
            HandlerEvent::PutRecordRes { query_id, .. } => {
                if let Some(query) = self.queries.get_mut(&query_id) {
                    query.on_success(&source, vec![]);
                    if let QueryInfo::PutRecord {
                        phase: PutRecordPhase::PutRecord { success, .. },
                        quorum,
                        ..
                    } = &mut query.info
                    {
                        success.push(source);

                        // Once enough peers have acknowledged the record, try to
                        // finish the query early.
                        let quorum = quorum.get();
                        if success.len() >= quorum {
                            let peers = success.clone();
                            let finished = query.try_finish(peers.iter());
                            if !finished {
                                tracing::debug!(
                                    peer=%source,
                                    query=?query_id,
                                    "PutRecord query reached quorum ({}/{}) with response \
                                     from peer but could not yet finish.",
                                    peers.len(),
                                    quorum,
                                );
                            }
                        }
                    }
                }
            }
        };
    }
2546
    /// Drives the behaviour and returns the next event for the swarm.
    ///
    /// In order, this:
    /// 1. runs the periodic provider-announcement and record
    ///    replication/publication jobs (bounded by `JOBS_MAX_NEW_QUERIES` and
    ///    the remaining query capacity),
    /// 2. triggers an automatic bootstrap if one is due,
    /// 3. loops: returns queued events, then routing table insertions applied
    ///    from pending entries, then polls the query pool, until an event can
    ///    be returned or nothing is left to do.
    ///
    /// When no event is available the task's waker is stored in
    /// `no_events_waker` and `Poll::Pending` is returned.
    #[tracing::instrument(level = "trace", name = "NetworkBehaviour::poll", skip(self, cx))]
    fn poll(
        &mut self,
        cx: &mut Context<'_>,
    ) -> Poll<ToSwarm<Self::ToSwarm, THandlerInEvent<Self>>> {
        let now = Instant::now();

        // Calculate the available capacity for queries triggered by background jobs.
        let mut jobs_query_capacity = JOBS_MAX_QUERIES.saturating_sub(self.queries.size());

        // Run the periodic provider announcement job.
        //
        // NOTE(review): `jobs_query_capacity` is only reduced on the early
        // `break`; if all `num` polls return `Ready` it is left unchanged
        // before the record job below runs — confirm this is intended.
        if let Some(mut job) = self.add_provider_job.take() {
            let num = usize::min(JOBS_MAX_NEW_QUERIES, jobs_query_capacity);
            for i in 0..num {
                if let Poll::Ready(r) = job.poll(cx, &mut self.store, now) {
                    self.start_add_provider(r.key, AddProviderContext::Republish)
                } else {
                    jobs_query_capacity -= i;
                    break;
                }
            }
            self.add_provider_job = Some(job);
        }

        // Run the periodic record replication / publication job.
        if let Some(mut job) = self.put_record_job.take() {
            let num = usize::min(JOBS_MAX_NEW_QUERIES, jobs_query_capacity);
            for _ in 0..num {
                if let Poll::Ready(r) = job.poll(cx, &mut self.store, now) {
                    // Records published by the local node are republished, all
                    // others are replicated.
                    let context =
                        if r.publisher.as_ref() == Some(self.kbuckets.local_key().preimage()) {
                            PutRecordContext::Republish
                        } else {
                            PutRecordContext::Replicate
                        };
                    self.start_put_record(r, Quorum::All, context)
                } else {
                    break;
                }
            }
            self.put_record_job = Some(job);
        }

        // Poll bootstrap periodically and automatically.
        if let Poll::Ready(()) = self.bootstrap_status.poll_next_bootstrap(cx) {
            if let Err(e) = self.bootstrap() {
                tracing::warn!("Failed to trigger bootstrap: {e}");
            }
        }

        loop {
            // Drain queued events first.
            if let Some(event) = self.queued_events.pop_front() {
                return Poll::Ready(event);
            }

            // Drain applied pending entries from the routing table.
            if let Some(entry) = self.kbuckets.take_applied_pending() {
                let kbucket::Node { key, value } = entry.inserted;
                let peer_id = key.into_preimage();
                // Queued (not returned) so that it is emitted on a later
                // iteration, after the `RoutingUpdated` event returned below.
                self.queued_events
                    .push_back(ToSwarm::NewExternalAddrOfPeer {
                        peer_id,
                        address: value.first().clone(),
                    });
                let event = Event::RoutingUpdated {
                    bucket_range: self
                        .kbuckets
                        .bucket(&key)
                        .map(|b| b.range())
                        .expect("Self to never be applied from pending."),
                    peer: peer_id,
                    is_new_peer: true,
                    addresses: value,
                    old_peer: entry.evicted.map(|n| n.key.into_preimage()),
                };
                return Poll::Ready(ToSwarm::GenerateEvent(event));
            }

            // Look for a finished query.
            loop {
                match self.queries.poll(now) {
                    QueryPoolState::Finished(q) => {
                        if let Some(event) = self.query_finished(q) {
                            return Poll::Ready(ToSwarm::GenerateEvent(event));
                        }
                    }
                    QueryPoolState::Timeout(q) => {
                        if let Some(event) = self.query_timeout(q) {
                            return Poll::Ready(ToSwarm::GenerateEvent(event));
                        }
                    }
                    // A query wants to send a request to `peer_id`.
                    QueryPoolState::Waiting(Some((query, peer_id))) => {
                        let event = query.info.to_request(query.id());
                        // TODO: AddProvider requests yield no response, so the query completes
                        // as soon as all requests have been sent. However, the handler should
                        // better emit an event when the request has been sent (and report
                        // an error if sending fails), instead of immediately reporting
                        // "success" somewhat prematurely here.
                        if let QueryInfo::AddProvider {
                            phase: AddProviderPhase::AddProvider { .. },
                            ..
                        } = &query.info
                        {
                            query.on_success(&peer_id, vec![])
                        }

                        if self.connected_peers.contains(&peer_id) {
                            // Already connected: hand the request to any handler
                            // of that peer.
                            self.queued_events.push_back(ToSwarm::NotifyHandler {
                                peer_id,
                                event,
                                handler: NotifyHandler::Any,
                            });
                        } else if &peer_id != self.kbuckets.local_key().preimage() {
                            // Not connected (and not ourselves): park the request
                            // on the query and dial the peer first.
                            query.pending_rpcs.push((peer_id, event));
                            self.queued_events.push_back(ToSwarm::Dial {
                                opts: DialOpts::peer_id(peer_id).build(),
                            });
                        }
                    }
                    QueryPoolState::Waiting(None) | QueryPoolState::Idle => break,
                }
            }

            // No immediate event was produced as a result of a finished query.
            // If no new events have been queued either, signal `NotReady` to
            // be polled again later.
            if self.queued_events.is_empty() {
                self.no_events_waker = Some(cx.waker().clone());

                return Poll::Pending;
            }
        }
    }
2681
2682 fn on_swarm_event(&mut self, event: FromSwarm) {
2683 self.listen_addresses.on_swarm_event(&event);
2684 let external_addresses_changed = self.external_addresses.on_swarm_event(&event);
2685
2686 if self.auto_mode && external_addresses_changed {
2687 self.determine_mode_from_external_addresses();
2688 }
2689
2690 match event {
2691 FromSwarm::ConnectionEstablished(connection_established) => {
2692 self.on_connection_established(connection_established)
2693 }
2694 FromSwarm::ConnectionClosed(connection_closed) => {
2695 self.on_connection_closed(connection_closed)
2696 }
2697 FromSwarm::DialFailure(dial_failure) => self.on_dial_failure(dial_failure),
2698 FromSwarm::AddressChange(address_change) => self.on_address_change(address_change),
2699 FromSwarm::NewListenAddr(_) if self.connected_peers.is_empty() => {
2700 // A new listen addr was just discovered and we have no connected peers,
2701 // it can mean that our network interfaces were not up but they are now
2702 // so it might be a good idea to trigger a bootstrap.
2703 self.bootstrap_status.trigger();
2704 }
2705 _ => {}
2706 }
2707 }
2708}
2709
/// Peer Info combines a Peer ID with a set of multiaddrs that the peer is listening on.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct PeerInfo {
    /// The ID of the peer.
    pub peer_id: PeerId,
    /// The multiaddrs the peer is listening on.
    pub addrs: Vec<Multiaddr>,
}
2716
/// A quorum w.r.t. the configured replication factor specifies the minimum
/// number of distinct nodes that must be successfully contacted in order
/// for a query to succeed.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub enum Quorum {
    /// A single node suffices.
    One,
    /// More than half of the nodes are required.
    Majority,
    /// All nodes are required.
    All,
    /// The given number of nodes is required, capped at the total.
    N(NonZeroUsize),
}

impl Quorum {
    /// Resolves the quorum to a concrete node count for the given `total`
    /// number of peers. The result never exceeds `total`.
    fn eval(&self, total: NonZeroUsize) -> NonZeroUsize {
        match *self {
            Quorum::All => total,
            Quorum::N(requested) => total.min(requested),
            Quorum::One => NonZeroUsize::new(1).expect("1 != 0"),
            Quorum::Majority => {
                let more_than_half = total.get() / 2 + 1;
                NonZeroUsize::new(more_than_half).expect("n + 1 != 0")
            }
        }
    }
}
2739
/// A record either received by the given peer or retrieved from the local
/// record store.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct PeerRecord {
    /// The peer from whom the record was received. `None` if the record was
    /// retrieved from local storage.
    pub peer: Option<PeerId>,
    /// The record itself.
    pub record: Record,
}
2749
2750//////////////////////////////////////////////////////////////////////////////
2751// Events
2752
/// The events produced by the `Kademlia` behaviour.
///
/// See [`NetworkBehaviour::poll`].
#[derive(Debug, Clone)]
#[allow(clippy::large_enum_variant)]
pub enum Event {
    /// An inbound request has been received and handled.
    // Note on the difference between 'request' and 'query': A request is a
    // single request-response style exchange with a single remote peer. A query
    // is made of multiple requests across multiple remote peers.
    InboundRequest { request: InboundRequest },

    /// An outbound query has made progress.
    OutboundQueryProgressed {
        /// The ID of the query that made progress.
        id: QueryId,
        /// The intermediate result of the query.
        result: QueryResult,
        /// Execution statistics from the query.
        stats: QueryStats,
        /// Indicates which event this is, if there are multiple responses for a single query.
        step: ProgressStep,
    },

    /// The routing table has been updated with a new peer and / or
    /// address, thereby possibly evicting another peer.
    RoutingUpdated {
        /// The ID of the peer that was added or updated.
        peer: PeerId,
        /// Whether this is a new peer and was thus just added to the routing
        /// table, or whether it is an existing peer whose addresses changed.
        is_new_peer: bool,
        /// The full list of known addresses of `peer`.
        addresses: Addresses,
        /// The minimum inclusive and maximum inclusive distance for
        /// the bucket of the peer.
        bucket_range: (Distance, Distance),
        /// The ID of the peer that was evicted from the routing table to make
        /// room for the new peer, if any.
        old_peer: Option<PeerId>,
    },

    /// A peer has connected for whom no listen address is known.
    ///
    /// If the peer is to be added to the routing table, a known
    /// listen address for the peer must be provided via [`Behaviour::add_address`].
    UnroutablePeer { peer: PeerId },

    /// A connection to a peer has been established for whom a listen address
    /// is known but the peer has not been added to the routing table either
    /// because [`BucketInserts::Manual`] is configured or because
    /// the corresponding bucket is full.
    ///
    /// If the peer is to be included in the routing table, it
    /// must be explicitly added via [`Behaviour::add_address`], possibly after
    /// removing another peer.
    ///
    /// See [`Behaviour::kbucket`] for insight into the contents of
    /// the k-bucket of `peer`.
    RoutablePeer { peer: PeerId, address: Multiaddr },

    /// A connection to a peer has been established for whom a listen address
    /// is known but the peer is only pending insertion into the routing table
    /// if the least-recently disconnected peer is unresponsive, i.e. the peer
    /// may not make it into the routing table.
    ///
    /// If the peer is to be unconditionally included in the routing table,
    /// it should be explicitly added via [`Behaviour::add_address`] after
    /// removing another peer.
    ///
    /// See [`Behaviour::kbucket`] for insight into the contents of
    /// the k-bucket of `peer`.
    PendingRoutablePeer { peer: PeerId, address: Multiaddr },

    /// This peer's mode has been updated automatically.
    ///
    /// This happens in response to an external
    /// address being added or removed.
    ModeChanged { new_mode: Mode },
}
2833
/// Information about progress events.
#[derive(Debug, Clone)]
pub struct ProgressStep {
    /// The 1-based index of this event within the query's progress events.
    pub count: NonZeroUsize,
    /// Is this the final event?
    pub last: bool,
}

impl ProgressStep {
    /// The first, non-final step (count 1).
    fn first() -> Self {
        let count = NonZeroUsize::new(1).expect("1 to be greater than 0.");
        Self { count, last: false }
    }

    /// The first step, which is at the same time the final one.
    fn first_and_last() -> Self {
        Self {
            last: true,
            ..ProgressStep::first()
        }
    }

    /// The non-final step following this one.
    ///
    /// Panics if this step is already marked as the last one.
    fn next(&self) -> Self {
        assert!(!self.last);
        let successor = self.count.get() + 1;

        Self {
            count: NonZeroUsize::new(successor).expect("Adding 1 not to result in 0."),
            last: false,
        }
    }
}
2864
/// Information about a received and handled inbound request.
#[derive(Debug, Clone)]
pub enum InboundRequest {
    /// Request for the list of nodes whose IDs are the closest to `key`.
    FindNode { num_closer_peers: usize },
    /// Same as `FindNode`, but should also return the entries of the local
    /// providers list for this key.
    GetProvider {
        num_closer_peers: usize,
        num_provider_peers: usize,
    },
    /// A peer sent an add provider request.
    /// If filtering [`StoreInserts::FilterBoth`] is enabled, the [`ProviderRecord`] is
    /// included.
    ///
    /// See [`StoreInserts`] and [`Config::set_record_filtering`] for details.
    AddProvider { record: Option<ProviderRecord> },
    /// Request to retrieve a record.
    GetRecord {
        num_closer_peers: usize,
        present_locally: bool,
    },
    /// A peer sent a put record request.
    /// If filtering [`StoreInserts::FilterBoth`] is enabled, the [`Record`] is included.
    ///
    /// See [`StoreInserts`] and [`Config::set_record_filtering`].
    PutRecord {
        source: PeerId,
        connection: ConnectionId,
        record: Option<Record>,
    },
}
2897
/// The results of Kademlia queries.
///
/// Each variant wraps the `Result` type alias of the operation that
/// produced it.
#[derive(Debug, Clone)]
pub enum QueryResult {
    /// The result of [`Behaviour::bootstrap`].
    Bootstrap(BootstrapResult),

    /// The result of [`Behaviour::get_closest_peers`].
    GetClosestPeers(GetClosestPeersResult),

    /// The result of [`Behaviour::get_providers`].
    GetProviders(GetProvidersResult),

    /// The result of [`Behaviour::start_providing`].
    StartProviding(AddProviderResult),

    /// The result of an (automatic) republishing of a provider record.
    RepublishProvider(AddProviderResult),

    /// The result of [`Behaviour::get_record`].
    GetRecord(GetRecordResult),

    /// The result of [`Behaviour::put_record`].
    PutRecord(PutRecordResult),

    /// The result of an (automatic) republishing of a (value-)record.
    RepublishRecord(PutRecordResult),
}
2925
/// The result of [`Behaviour::get_record`]: [`GetRecordOk`] on success,
/// [`GetRecordError`] on failure.
pub type GetRecordResult = Result<GetRecordOk, GetRecordError>;
2928
/// The successful result of [`Behaviour::get_record`].
#[derive(Debug, Clone)]
// NOTE(review): the size lint is silenced instead of boxing a variant,
// presumably to keep the public shape of the enum unchanged; confirm.
#[allow(clippy::large_enum_variant)]
pub enum GetRecordOk {
    /// A record was found; the payload is the found [`PeerRecord`].
    FoundRecord(PeerRecord),
    /// The query finished without yielding an additional record.
    FinishedWithNoAdditionalRecord {
        /// If caching is enabled, these are the peers closest
        /// _to the record key_ (not the local node) that were queried but
        /// did not return the record, sorted by distance to the record key
        /// from closest to farthest. How many of these are tracked is configured
        /// by [`Config::set_caching`].
        ///
        /// Writing back the cache at these peers is a manual operation,
        /// i.e. you may wish to use these candidates with [`Behaviour::put_record_to`]
        /// after selecting one of the returned records.
        cache_candidates: BTreeMap<kbucket::Distance, PeerId>,
    },
}
2947
2948/// The error result of [`Behaviour::get_record`].
2949#[derive(Debug, Clone, Error)]
2950pub enum GetRecordError {
2951 #[error("the record was not found")]
2952 NotFound {
2953 key: record::Key,
2954 closest_peers: Vec<PeerId>,
2955 },
2956 #[error("the quorum failed; needed {quorum} peers")]
2957 QuorumFailed {
2958 key: record::Key,
2959 records: Vec<PeerRecord>,
2960 quorum: NonZeroUsize,
2961 },
2962 #[error("the request timed out")]
2963 Timeout { key: record::Key },
2964}
2965
2966impl GetRecordError {
2967 /// Gets the key of the record for which the operation failed.
2968 pub fn key(&self) -> &record::Key {
2969 match self {
2970 GetRecordError::QuorumFailed { key, .. } => key,
2971 GetRecordError::Timeout { key, .. } => key,
2972 GetRecordError::NotFound { key, .. } => key,
2973 }
2974 }
2975
2976 /// Extracts the key of the record for which the operation failed,
2977 /// consuming the error.
2978 pub fn into_key(self) -> record::Key {
2979 match self {
2980 GetRecordError::QuorumFailed { key, .. } => key,
2981 GetRecordError::Timeout { key, .. } => key,
2982 GetRecordError::NotFound { key, .. } => key,
2983 }
2984 }
2985}
2986
/// The result of [`Behaviour::put_record`]: [`PutRecordOk`] on success,
/// [`PutRecordError`] on failure.
///
/// Also carried by [`QueryResult::RepublishRecord`].
pub type PutRecordResult = Result<PutRecordOk, PutRecordError>;
2989
/// The successful result of [`Behaviour::put_record`].
#[derive(Debug, Clone)]
pub struct PutRecordOk {
    /// The key of the record the operation was performed for.
    pub key: record::Key,
}
2995
2996/// The error result of [`Behaviour::put_record`].
2997#[derive(Debug, Clone, Error)]
2998pub enum PutRecordError {
2999 #[error("the quorum failed; needed {quorum} peers")]
3000 QuorumFailed {
3001 key: record::Key,
3002 /// [`PeerId`]s of the peers the record was successfully stored on.
3003 success: Vec<PeerId>,
3004 quorum: NonZeroUsize,
3005 },
3006 #[error("the request timed out")]
3007 Timeout {
3008 key: record::Key,
3009 /// [`PeerId`]s of the peers the record was successfully stored on.
3010 success: Vec<PeerId>,
3011 quorum: NonZeroUsize,
3012 },
3013}
3014
3015impl PutRecordError {
3016 /// Gets the key of the record for which the operation failed.
3017 pub fn key(&self) -> &record::Key {
3018 match self {
3019 PutRecordError::QuorumFailed { key, .. } => key,
3020 PutRecordError::Timeout { key, .. } => key,
3021 }
3022 }
3023
3024 /// Extracts the key of the record for which the operation failed,
3025 /// consuming the error.
3026 pub fn into_key(self) -> record::Key {
3027 match self {
3028 PutRecordError::QuorumFailed { key, .. } => key,
3029 PutRecordError::Timeout { key, .. } => key,
3030 }
3031 }
3032}
3033
/// The result of [`Behaviour::bootstrap`]: [`BootstrapOk`] on success,
/// [`BootstrapError`] on failure.
pub type BootstrapResult = Result<BootstrapOk, BootstrapError>;
3036
/// The successful result of [`Behaviour::bootstrap`].
#[derive(Debug, Clone)]
pub struct BootstrapOk {
    // NOTE(review): presumably the peer ID targeted by the finished lookup
    // (cf. `QueryInfo::Bootstrap::peer`); confirm.
    pub peer: PeerId,
    // NOTE(review): presumably the number of lookups still outstanding
    // (cf. `QueryInfo::Bootstrap::remaining`); confirm.
    pub num_remaining: u32,
}
3043
/// The error result of [`Behaviour::bootstrap`].
#[derive(Debug, Clone, Error)]
pub enum BootstrapError {
    /// The request timed out.
    // NOTE(review): `num_remaining` is optional here, unlike in `BootstrapOk`;
    // presumably it is `None` while the number of remaining lookups is not
    // yet known (cf. `QueryInfo::Bootstrap::remaining`); confirm.
    #[error("the request timed out")]
    Timeout {
        peer: PeerId,
        num_remaining: Option<u32>,
    },
}
3053
/// The result of [`Behaviour::get_closest_peers`]: [`GetClosestPeersOk`] on
/// success, [`GetClosestPeersError`] on failure.
pub type GetClosestPeersResult = Result<GetClosestPeersOk, GetClosestPeersError>;
3056
/// The successful result of [`Behaviour::get_closest_peers`].
#[derive(Debug, Clone)]
pub struct GetClosestPeersOk {
    // NOTE(review): presumably the queried key, as in
    // `QueryInfo::GetClosestPeers::key`; confirm.
    pub key: Vec<u8>,
    // NOTE(review): presumably the closest peers found for `key`; confirm.
    pub peers: Vec<PeerInfo>,
}
3063
3064/// The error result of [`Behaviour::get_closest_peers`].
3065#[derive(Debug, Clone, Error)]
3066pub enum GetClosestPeersError {
3067 #[error("the request timed out")]
3068 Timeout { key: Vec<u8>, peers: Vec<PeerInfo> },
3069}
3070
3071impl GetClosestPeersError {
3072 /// Gets the key for which the operation failed.
3073 pub fn key(&self) -> &Vec<u8> {
3074 match self {
3075 GetClosestPeersError::Timeout { key, .. } => key,
3076 }
3077 }
3078
3079 /// Extracts the key for which the operation failed,
3080 /// consuming the error.
3081 pub fn into_key(self) -> Vec<u8> {
3082 match self {
3083 GetClosestPeersError::Timeout { key, .. } => key,
3084 }
3085 }
3086}
3087
/// The result of [`Behaviour::get_providers`]: [`GetProvidersOk`] on success,
/// [`GetProvidersError`] on failure.
pub type GetProvidersResult = Result<GetProvidersOk, GetProvidersError>;
3090
/// The successful result of [`Behaviour::get_providers`].
#[derive(Debug, Clone)]
pub enum GetProvidersOk {
    /// Providers were found for `key`.
    FoundProviders {
        key: record::Key,
        /// The new set of providers discovered.
        providers: HashSet<PeerId>,
    },
    /// The query finished without yielding an additional record.
    FinishedWithNoAdditionalRecord {
        closest_peers: Vec<PeerId>,
    },
}
3103
3104/// The error result of [`Behaviour::get_providers`].
3105#[derive(Debug, Clone, Error)]
3106pub enum GetProvidersError {
3107 #[error("the request timed out")]
3108 Timeout {
3109 key: record::Key,
3110 closest_peers: Vec<PeerId>,
3111 },
3112}
3113
3114impl GetProvidersError {
3115 /// Gets the key for which the operation failed.
3116 pub fn key(&self) -> &record::Key {
3117 match self {
3118 GetProvidersError::Timeout { key, .. } => key,
3119 }
3120 }
3121
3122 /// Extracts the key for which the operation failed,
3123 /// consuming the error.
3124 pub fn into_key(self) -> record::Key {
3125 match self {
3126 GetProvidersError::Timeout { key, .. } => key,
3127 }
3128 }
3129}
3130
/// The result of publishing a provider record: [`AddProviderOk`] on success,
/// [`AddProviderError`] on failure.
///
/// Carried by [`QueryResult::StartProviding`] and
/// [`QueryResult::RepublishProvider`].
pub type AddProviderResult = Result<AddProviderOk, AddProviderError>;
3133
/// The successful result of publishing a provider record.
#[derive(Debug, Clone)]
pub struct AddProviderOk {
    /// The key the operation was performed for.
    pub key: record::Key,
}
3139
3140/// The possible errors when publishing a provider record.
3141#[derive(Debug, Clone, Error)]
3142pub enum AddProviderError {
3143 #[error("the request timed out")]
3144 Timeout { key: record::Key },
3145}
3146
3147impl AddProviderError {
3148 /// Gets the key for which the operation failed.
3149 pub fn key(&self) -> &record::Key {
3150 match self {
3151 AddProviderError::Timeout { key, .. } => key,
3152 }
3153 }
3154
3155 /// Extracts the key for which the operation failed,
3156 pub fn into_key(self) -> record::Key {
3157 match self {
3158 AddProviderError::Timeout { key, .. } => key,
3159 }
3160 }
3161}
3162
3163impl From<kbucket::EntryView<kbucket::Key<PeerId>, Addresses>> for KadPeer {
3164 fn from(e: kbucket::EntryView<kbucket::Key<PeerId>, Addresses>) -> KadPeer {
3165 KadPeer {
3166 node_id: e.node.key.into_preimage(),
3167 multiaddrs: e.node.value.into_vec(),
3168 connection_ty: match e.status {
3169 NodeStatus::Connected => ConnectionType::Connected,
3170 NodeStatus::Disconnected => ConnectionType::NotConnected,
3171 },
3172 }
3173 }
3174}
3175
/// The context of a [`QueryInfo::AddProvider`] query.
///
/// Stored in the `context` field of [`QueryInfo::AddProvider`].
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub enum AddProviderContext {
    /// The context is a [`Behaviour::start_providing`] operation.
    Publish,
    /// The context is periodic republishing of provider announcements
    /// initiated earlier via [`Behaviour::start_providing`].
    Republish,
}
3185
/// The context of a [`QueryInfo::PutRecord`] query.
///
/// Stored in the `context` field of [`QueryInfo::PutRecord`].
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub enum PutRecordContext {
    /// The context is a [`Behaviour::put_record`] operation.
    Publish,
    /// The context is periodic republishing of records stored
    /// earlier via [`Behaviour::put_record`].
    Republish,
    /// The context is periodic replication (i.e. without extending
    /// the record TTL) of stored records received earlier from another peer.
    Replicate,
    /// The context is a custom store operation targeting specific
    /// peers initiated by [`Behaviour::put_record_to`].
    Custom,
}
3201
/// Information about a running query.
///
/// Each variant describes one kind of query together with the state
/// tracked for it.
#[derive(Debug, Clone)]
pub enum QueryInfo {
    /// A query initiated by [`Behaviour::bootstrap`].
    Bootstrap {
        /// The targeted peer ID.
        peer: PeerId,
        /// The remaining random peer IDs to query, one per
        /// bucket that still needs refreshing.
        ///
        /// This is `None` if the initial self-lookup has not
        /// yet completed and `Some` with an exhausted iterator
        /// if bootstrapping is complete.
        remaining: Option<vec::IntoIter<kbucket::Key<PeerId>>>,
        /// Current index of events.
        step: ProgressStep,
    },

    /// A (repeated) query initiated by [`Behaviour::get_closest_peers`].
    GetClosestPeers {
        /// The key being queried (the preimage).
        key: Vec<u8>,
        /// Current index of events.
        step: ProgressStep,
        /// Specifies expected number of responding peers
        num_results: NonZeroUsize,
    },

    /// A (repeated) query initiated by [`Behaviour::get_providers`].
    GetProviders {
        /// The key for which to search for providers.
        key: record::Key,
        /// The number of providers found so far.
        providers_found: usize,
        /// Current index of events.
        step: ProgressStep,
    },

    /// A (repeated) query initiated by [`Behaviour::start_providing`].
    AddProvider {
        /// The record key.
        key: record::Key,
        /// The current phase of the query.
        phase: AddProviderPhase,
        /// The execution context of the query.
        context: AddProviderContext,
    },

    /// A (repeated) query initiated by [`Behaviour::put_record`].
    PutRecord {
        /// The record being put; it is sent to peers during the
        /// [`PutRecordPhase::PutRecord`] phase.
        record: Record,
        /// The expected quorum of responses w.r.t. the replication factor.
        quorum: NonZeroUsize,
        /// The current phase of the query.
        phase: PutRecordPhase,
        /// The execution context of the query.
        context: PutRecordContext,
    },

    /// A (repeated) query initiated by [`Behaviour::get_record`].
    GetRecord {
        /// The key to look for.
        key: record::Key,
        /// Current index of events.
        step: ProgressStep,
        /// Did we find at least one record?
        found_a_record: bool,
        /// The peers closest to the `key` that were queried but did not return a record,
        /// i.e. the peers that are candidates for caching the record.
        cache_candidates: BTreeMap<kbucket::Distance, PeerId>,
    },
}
3273
3274impl QueryInfo {
3275 /// Creates an event for a handler to issue an outgoing request in the
3276 /// context of a query.
3277 fn to_request(&self, query_id: QueryId) -> HandlerIn {
3278 match &self {
3279 QueryInfo::Bootstrap { peer, .. } => HandlerIn::FindNodeReq {
3280 key: peer.to_bytes(),
3281 query_id,
3282 },
3283 QueryInfo::GetClosestPeers { key, .. } => HandlerIn::FindNodeReq {
3284 key: key.clone(),
3285 query_id,
3286 },
3287 QueryInfo::GetProviders { key, .. } => HandlerIn::GetProvidersReq {
3288 key: key.clone(),
3289 query_id,
3290 },
3291 QueryInfo::AddProvider { key, phase, .. } => match phase {
3292 AddProviderPhase::GetClosestPeers => HandlerIn::FindNodeReq {
3293 key: key.to_vec(),
3294 query_id,
3295 },
3296 AddProviderPhase::AddProvider {
3297 provider_id,
3298 external_addresses,
3299 ..
3300 } => HandlerIn::AddProvider {
3301 key: key.clone(),
3302 provider: crate::protocol::KadPeer {
3303 node_id: *provider_id,
3304 multiaddrs: external_addresses.clone(),
3305 connection_ty: crate::protocol::ConnectionType::Connected,
3306 },
3307 query_id,
3308 },
3309 },
3310 QueryInfo::GetRecord { key, .. } => HandlerIn::GetRecord {
3311 key: key.clone(),
3312 query_id,
3313 },
3314 QueryInfo::PutRecord { record, phase, .. } => match phase {
3315 PutRecordPhase::GetClosestPeers => HandlerIn::FindNodeReq {
3316 key: record.key.to_vec(),
3317 query_id,
3318 },
3319 PutRecordPhase::PutRecord { .. } => HandlerIn::PutRecord {
3320 record: record.clone(),
3321 query_id,
3322 },
3323 },
3324 }
3325 }
3326}
3327
/// The phases of a [`QueryInfo::AddProvider`] query.
///
/// Stored in the `phase` field of [`QueryInfo::AddProvider`].
#[derive(Debug, Clone)]
pub enum AddProviderPhase {
    /// The query is searching for the closest nodes to the record key.
    GetClosestPeers,

    /// The query advertises the local node as a provider for the key to
    /// the closest nodes to the key.
    AddProvider {
        /// The local peer ID that is advertised as a provider.
        provider_id: PeerId,
        /// The external addresses of the provider being advertised.
        external_addresses: Vec<Multiaddr>,
        /// Query statistics from the finished `GetClosestPeers` phase.
        get_closest_peers_stats: QueryStats,
    },
}
3345
/// The phases of a [`QueryInfo::PutRecord`] query.
///
/// Stored in the `phase` field of [`QueryInfo::PutRecord`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum PutRecordPhase {
    /// The query is searching for the closest nodes to the record key.
    GetClosestPeers,

    /// The query is replicating the record to the closest nodes to the key.
    PutRecord {
        /// A list of peers the given record has been successfully replicated to.
        success: Vec<PeerId>,
        /// Query statistics from the finished `GetClosestPeers` phase.
        get_closest_peers_stats: QueryStats,
    },
}
3360
/// A mutable reference to a running query.
pub struct QueryMut<'a> {
    // The borrowed query that every method below delegates to.
    query: &'a mut Query,
}

impl QueryMut<'_> {
    /// Gets the ID of the query.
    pub fn id(&self) -> QueryId {
        self.query.id()
    }

    /// Gets information about the type and state of the query.
    pub fn info(&self) -> &QueryInfo {
        &self.query.info
    }

    /// Gets execution statistics about the query.
    ///
    /// For a multi-phase query such as `put_record`, these are the
    /// statistics of the current phase.
    pub fn stats(&self) -> &QueryStats {
        self.query.stats()
    }

    /// Finishes the query as soon as possible, without waiting for the
    /// regular termination conditions.
    pub fn finish(&mut self) {
        self.query.finish()
    }
}
3390
/// An immutable reference to a running query.
pub struct QueryRef<'a> {
    // The borrowed query that every method below delegates to.
    query: &'a Query,
}

impl QueryRef<'_> {
    /// Gets the ID of the query.
    pub fn id(&self) -> QueryId {
        self.query.id()
    }

    /// Gets information about the type and state of the query.
    pub fn info(&self) -> &QueryInfo {
        &self.query.info
    }

    /// Gets execution statistics about the query.
    ///
    /// For a multi-phase query such as `put_record`, these are the
    /// statistics of the current phase.
    pub fn stats(&self) -> &QueryStats {
        self.query.stats()
    }
}
3414
/// An operation failed due to no known peers in the routing table.
#[derive(Debug, Clone)]
pub struct NoKnownPeers();

impl std::error::Error for NoKnownPeers {}

impl fmt::Display for NoKnownPeers {
    /// Writes the fixed message `No known peers.`.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str("No known peers.")
    }
}
3426
/// The possible outcomes of [`Behaviour::add_address`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RoutingUpdate {
    /// The given peer and address have been added to the routing
    /// table.
    Success,
    /// The peer and address are pending insertion into
    /// the routing table, if a disconnected peer fails
    /// to respond. If the given peer and address end up
    /// in the routing table, [`Event::RoutingUpdated`]
    /// is eventually emitted.
    Pending,
    /// The routing table update failed, either because the
    /// corresponding bucket for the peer is full and the
    /// pending slot(s) are occupied, or because the given
    /// peer ID is deemed invalid (e.g. refers to the local
    /// peer ID).
    Failed,
}
3446
/// The mode of a peer: either [`Mode::Client`] or [`Mode::Server`].
///
/// Its `Display` output is the lowercase name, `client` or `server`.
#[derive(PartialEq, Copy, Clone, Debug)]
pub enum Mode {
    Client,
    Server,
}

impl fmt::Display for Mode {
    /// Writes the lowercase name of the mode.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let label = match self {
            Mode::Client => "client",
            Mode::Server => "server",
        };
        f.write_str(label)
    }
}
3461
/// Renders every element of the slice via `ToString` and joins the results
/// with `", "`.
///
/// An empty slice yields an empty string.
fn to_comma_separated_list<T>(confirmed_external_addresses: &[T]) -> String
where
    T: ToString,
{
    let mut joined = String::new();
    for (position, item) in confirmed_external_addresses.iter().enumerate() {
        // The separator goes between elements only, never before the first.
        if position > 0 {
            joined.push_str(", ");
        }
        joined.push_str(&item.to_string());
    }
    joined
}