libp2p_kad/behaviour.rs

1// Copyright 2018 Parity Technologies (UK) Ltd.
2//
3// Permission is hereby granted, free of charge, to any person obtaining a
4// copy of this software and associated documentation files (the "Software"),
5// to deal in the Software without restriction, including without limitation
6// the rights to use, copy, modify, merge, publish, distribute, sublicense,
7// and/or sell copies of the Software, and to permit persons to whom the
8// Software is furnished to do so, subject to the following conditions:
9//
10// The above copyright notice and this permission notice shall be included in
11// all copies or substantial portions of the Software.
12//
13// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
14// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
18// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
19// DEALINGS IN THE SOFTWARE.
20
21//! Implementation of the `Kademlia` network behaviour.
22
23mod test;
24
25use std::{
26    collections::{BTreeMap, HashMap, HashSet, VecDeque},
27    fmt,
28    num::NonZeroUsize,
29    task::{Context, Poll, Waker},
30    time::Duration,
31    vec,
32};
33
34use fnv::FnvHashSet;
35use libp2p_core::{transport::PortUse, ConnectedPoint, Endpoint, Multiaddr};
36use libp2p_identity::PeerId;
37use libp2p_swarm::{
38    behaviour::{AddressChange, ConnectionClosed, ConnectionEstablished, DialFailure, FromSwarm},
39    dial_opts::{self, DialOpts},
40    ConnectionDenied, ConnectionHandler, ConnectionId, DialError, ExternalAddresses,
41    ListenAddresses, NetworkBehaviour, NotifyHandler, StreamProtocol, THandler, THandlerInEvent,
42    THandlerOutEvent, ToSwarm,
43};
44use thiserror::Error;
45use tracing::Level;
46use web_time::Instant;
47
48pub use crate::query::QueryStats;
49use crate::{
50    addresses::Addresses,
51    bootstrap,
52    handler::{Handler, HandlerEvent, HandlerIn, RequestId},
53    jobs::*,
54    kbucket::{self, Distance, KBucketConfig, KBucketsTable, NodeStatus},
55    protocol,
56    protocol::{ConnectionType, KadPeer, ProtocolConfig},
57    query::{Query, QueryConfig, QueryId, QueryPool, QueryPoolState},
58    record::{
59        self,
60        store::{self, RecordStore},
61        ProviderRecord, Record,
62    },
63    K_VALUE,
64};
65
66/// `Behaviour` is a `NetworkBehaviour` that implements the libp2p
67/// Kademlia protocol.
68pub struct Behaviour<TStore> {
69    /// The Kademlia routing table.
70    kbuckets: KBucketsTable<kbucket::Key<PeerId>, Addresses>,
71
72    /// The k-bucket insertion strategy.
73    kbucket_inserts: BucketInserts,
74
75    /// Configuration of the wire protocol.
76    protocol_config: ProtocolConfig,
77
78    /// Configuration of [`RecordStore`] filtering.
79    record_filtering: StoreInserts,
80
81    /// The currently active (i.e. in-progress) queries.
82    queries: QueryPool,
83
84    /// The currently connected peers.
85    ///
86    /// This is a superset of the connected peers currently in the routing table.
87    connected_peers: FnvHashSet<PeerId>,
88
89    /// Periodic job for re-publication of provider records for keys
90    /// provided by the local node.
91    add_provider_job: Option<AddProviderJob>,
92
93    /// Periodic job for (re-)replication and (re-)publishing of
94    /// regular (value-)records.
95    put_record_job: Option<PutRecordJob>,
96
97    /// The TTL of regular (value-)records.
98    record_ttl: Option<Duration>,
99
100    /// The TTL of provider records.
101    provider_record_ttl: Option<Duration>,
102
103    /// Queued events to return when the behaviour is being polled.
104    queued_events: VecDeque<ToSwarm<Event, HandlerIn>>,
105
106    listen_addresses: ListenAddresses,
107
108    external_addresses: ExternalAddresses,
109
110    connections: HashMap<ConnectionId, PeerId>,
111
112    /// See [`Config::caching`].
113    caching: Caching,
114
115    local_peer_id: PeerId,
116
117    mode: Mode,
118    auto_mode: bool,
119    no_events_waker: Option<Waker>,
120
121    /// The record storage.
122    store: TStore,
123
124    /// Tracks the status of the current bootstrap.
125    bootstrap_status: bootstrap::Status,
126}
127
128/// The configurable strategies for the insertion of peers
129/// and their addresses into the k-buckets of the Kademlia
130/// routing table.
131#[derive(Copy, Clone, Debug, PartialEq, Eq)]
132pub enum BucketInserts {
133    /// Whenever a connection to a peer is established as a
134    /// result of a dialing attempt and that peer is not yet
135    /// in the routing table, it is inserted as long as there
136    /// is a free slot in the corresponding k-bucket. If the
137    /// k-bucket is full but still has a free pending slot,
138    /// it may be inserted into the routing table at a later time if an unresponsive
139    /// disconnected peer is evicted from the bucket.
140    OnConnected,
141    /// New peers and addresses are only added to the routing table via
142    /// explicit calls to [`Behaviour::add_address`].
143    ///
144    /// > **Note**: Even though peers can only get into the
145    /// > routing table as a result of [`Behaviour::add_address`],
146    /// > routing table entries are still updated as peers
147    /// > connect and disconnect (i.e. the order of the entries
148    /// > as well as the network addresses).
149    Manual,
150}
151
152/// The configurable filtering strategies for the acceptance of
153/// incoming records.
154///
155/// This can be used for e.g. signature verification or validating
156/// the accompanying [`Key`].
157///
158/// [`Key`]: crate::record::Key
159#[derive(Copy, Clone, Debug, PartialEq, Eq)]
160pub enum StoreInserts {
161    /// Whenever a (provider) record is received,
162    /// the record is forwarded immediately to the [`RecordStore`].
163    Unfiltered,
164    /// Whenever a (provider) record is received, an event is emitted.
165    /// Provider records generate an [`InboundRequest::AddProvider`] under
166    /// [`Event::InboundRequest`], regular records generate an [`InboundRequest::PutRecord`]
167    /// under [`Event::InboundRequest`].
168    ///
169    /// When deemed valid, a (provider) record needs to be explicitly stored in
170    /// the [`RecordStore`] via [`RecordStore::put`] or [`RecordStore::add_provider`],
171    /// whichever is applicable. A mutable reference to the [`RecordStore`] can
172    /// be retrieved via [`Behaviour::store_mut`].
173    FilterBoth,
174}
175
176/// The configuration for the `Kademlia` behaviour.
177///
178/// The configuration is consumed by [`Behaviour::new`].
179#[derive(Debug, Clone)]
180pub struct Config {
181    kbucket_config: KBucketConfig,
182    query_config: QueryConfig,
183    protocol_config: ProtocolConfig,
184    record_ttl: Option<Duration>,
185    record_replication_interval: Option<Duration>,
186    record_publication_interval: Option<Duration>,
187    record_filtering: StoreInserts,
188    provider_record_ttl: Option<Duration>,
189    provider_publication_interval: Option<Duration>,
190    kbucket_inserts: BucketInserts,
191    caching: Caching,
192    periodic_bootstrap_interval: Option<Duration>,
193    automatic_bootstrap_throttle: Option<Duration>,
194}
195
196impl Default for Config {
197    /// Returns the default configuration.
198    ///
199    /// Deprecated: use `Config::new` instead.
200    fn default() -> Self {
201        Self::new(protocol::DEFAULT_PROTO_NAME)
202    }
203}
204
205/// The configuration for Kademlia "write-back" caching after successful
206/// lookups via [`Behaviour::get_record`].
207#[derive(Debug, Clone)]
208pub enum Caching {
209    /// Caching is disabled and the peers closest to records being looked up
210    /// that do not return a record are not tracked, i.e.
211    /// [`GetRecordOk::FinishedWithNoAdditionalRecord`] is always empty.
212    Disabled,
213    /// Up to `max_peers` peers not returning a record that are closest to the key
214    /// being looked up are tracked and returned in
215    /// [`GetRecordOk::FinishedWithNoAdditionalRecord`]. The write-back operation must be
216    /// performed explicitly, if desired and after choosing a record from the results, via
217    /// [`Behaviour::put_record_to`].
218    Enabled { max_peers: u16 },
219}
220
221impl Config {
222    /// Builds a new `Config` with the given protocol name.
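    ///
    /// A minimal illustrative sketch (the protocol name and values below are
    /// placeholders, not recommendations): build a `Config` and tune a few knobs
    /// before passing it to [`Behaviour::with_config`].
    ///
    /// ```no_run
    /// use std::time::Duration;
    ///
    /// use libp2p_kad::Config;
    /// use libp2p_swarm::StreamProtocol;
    ///
    /// let mut config = Config::new(StreamProtocol::new("/my-app/kad/1.0.0"));
    /// config
    ///     .set_query_timeout(Duration::from_secs(60))
    ///     .set_record_ttl(Some(Duration::from_secs(48 * 60 * 60)));
    /// ```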
223    pub fn new(protocol_name: StreamProtocol) -> Self {
224        Config {
225            kbucket_config: KBucketConfig::default(),
226            query_config: QueryConfig::default(),
227            protocol_config: ProtocolConfig::new(protocol_name),
228            record_ttl: Some(Duration::from_secs(48 * 60 * 60)),
229            record_replication_interval: Some(Duration::from_secs(60 * 60)),
230            record_publication_interval: Some(Duration::from_secs(22 * 60 * 60)),
231            record_filtering: StoreInserts::Unfiltered,
232            provider_publication_interval: Some(Duration::from_secs(12 * 60 * 60)),
233            provider_record_ttl: Some(Duration::from_secs(48 * 60 * 60)),
234            kbucket_inserts: BucketInserts::OnConnected,
235            caching: Caching::Enabled { max_peers: 1 },
236            periodic_bootstrap_interval: Some(Duration::from_secs(5 * 60)),
237            automatic_bootstrap_throttle: Some(bootstrap::DEFAULT_AUTOMATIC_THROTTLE),
238        }
239    }
240
241    /// Sets the timeout for a single query.
242    ///
243    /// > **Note**: A single query usually comprises at least as many requests
244    /// > as the replication factor, i.e. this is not a request timeout.
245    ///
246    /// The default is 60 seconds.
247    pub fn set_query_timeout(&mut self, timeout: Duration) -> &mut Self {
248        self.query_config.timeout = timeout;
249        self
250    }
251
252    /// Sets the replication factor to use.
253    ///
254    /// The replication factor determines to how many closest peers
255    /// a record is replicated. The default is [`crate::K_VALUE`].
256    pub fn set_replication_factor(&mut self, replication_factor: NonZeroUsize) -> &mut Self {
257        self.query_config.replication_factor = replication_factor;
258        self
259    }
260
261    /// Sets the allowed level of parallelism for iterative queries.
262    ///
263    /// The `α` parameter in the Kademlia paper. The maximum number of peers
264    /// that an iterative query is allowed to wait for in parallel while
265    /// iterating towards the closest nodes to a target. Defaults to
266    /// `ALPHA_VALUE`.
267    ///
268    /// This only controls the level of parallelism of an iterative query, not
269    /// the level of parallelism of a query to a fixed set of peers.
270    ///
271    /// When used with [`Config::disjoint_query_paths`] it equals
272    /// the amount of disjoint paths used.
273    pub fn set_parallelism(&mut self, parallelism: NonZeroUsize) -> &mut Self {
274        self.query_config.parallelism = parallelism;
275        self
276    }
277
278    /// Require iterative queries to use disjoint paths for increased resiliency
279    /// in the presence of potentially adversarial nodes.
280    ///
281    /// When enabled the number of disjoint paths used equals the configured
282    /// parallelism.
283    ///
284    /// See the S/Kademlia paper for more information on the high level design
285    /// as well as its security improvements.
286    pub fn disjoint_query_paths(&mut self, enabled: bool) -> &mut Self {
287        self.query_config.disjoint_query_paths = enabled;
288        self
289    }
290
291    /// Sets the TTL for stored records.
292    ///
293    /// The TTL should be significantly longer than the (re-)publication
294    /// interval, to avoid premature expiration of records. The default is 48
295    /// hours.
296    ///
297    /// `None` means records never expire.
298    ///
299    /// Does not apply to provider records.
300    pub fn set_record_ttl(&mut self, record_ttl: Option<Duration>) -> &mut Self {
301        self.record_ttl = record_ttl;
302        self
303    }
304
305    /// Sets whether or not records should be filtered before being stored.
306    ///
307    /// See [`StoreInserts`] for the different values.
308    /// Defaults to [`StoreInserts::Unfiltered`].
309    pub fn set_record_filtering(&mut self, filtering: StoreInserts) -> &mut Self {
310        self.record_filtering = filtering;
311        self
312    }
313
314    /// Sets the (re-)replication interval for stored records.
315    ///
316    /// Periodic replication of stored records ensures that the records
317    /// are always replicated to the available nodes closest to the key in the
318    /// context of DHT topology changes (i.e. nodes joining and leaving), thus
319    /// ensuring persistence until the record expires. Replication does not
320    /// prolong the regular lifetime of a record (for otherwise it would live
321    /// forever regardless of the configured TTL). The expiry of a record
322    /// is only extended through re-publication.
323    ///
324    /// This interval should be significantly shorter than the publication
325    /// interval, to ensure persistence between re-publications. The default
326    /// is 1 hour.
327    ///
328    /// `None` means that stored records are never re-replicated.
329    ///
330    /// Does not apply to provider records.
331    pub fn set_replication_interval(&mut self, interval: Option<Duration>) -> &mut Self {
332        self.record_replication_interval = interval;
333        self
334    }
335
336    /// Sets the (re-)publication interval of stored records.
337    ///
338    /// Records persist in the DHT until they expire. By default, published
339    /// records are re-published in regular intervals for as long as the record
340    /// exists in the local storage of the original publisher, thereby extending
341    /// the record's lifetime.
342    ///
343    /// This interval should be significantly shorter than the record TTL, to
344    /// ensure records do not expire prematurely. The default is 22 hours.
345    ///
346    /// `None` means that stored records are never automatically re-published.
347    ///
348    /// Does not apply to provider records.
349    pub fn set_publication_interval(&mut self, interval: Option<Duration>) -> &mut Self {
350        self.record_publication_interval = interval;
351        self
352    }
353
354    /// Sets the TTL for provider records.
355    ///
356    /// `None` means that stored provider records never expire.
357    ///
358    /// Must be significantly larger than the provider publication interval.
359    pub fn set_provider_record_ttl(&mut self, ttl: Option<Duration>) -> &mut Self {
360        self.provider_record_ttl = ttl;
361        self
362    }
363
364    /// Sets the interval at which provider records for keys provided
365    /// by the local node are re-published.
366    ///
367    /// `None` means that stored provider records are never automatically
368    /// re-published.
369    ///
370    /// Must be significantly less than the provider record TTL.
371    pub fn set_provider_publication_interval(&mut self, interval: Option<Duration>) -> &mut Self {
372        self.provider_publication_interval = interval;
373        self
374    }
375
376    /// Modifies the maximum allowed size of individual Kademlia packets.
377    ///
378    /// It might be necessary to increase this value if trying to put large
379    /// records.
380    pub fn set_max_packet_size(&mut self, size: usize) -> &mut Self {
381        self.protocol_config.set_max_packet_size(size);
382        self
383    }
384
385    /// Modifies the timeout duration of outbound substreams.
386    ///
387    /// * Defaults to `10` seconds.
388    /// * May need to be increased when sending large records over a poor connection.
389    pub fn set_outbound_substreams_timeout(&mut self, timeout: Duration) -> &mut Self {
390        self.protocol_config
391            .set_outbound_substreams_timeout(timeout);
392        self
393    }
394
395    /// Sets the k-bucket insertion strategy for the Kademlia routing table.
396    pub fn set_kbucket_inserts(&mut self, inserts: BucketInserts) -> &mut Self {
397        self.kbucket_inserts = inserts;
398        self
399    }
400
401    /// Sets the [`Caching`] strategy to use for successful lookups.
402    ///
403    /// The default is [`Caching::Enabled`] with a `max_peers` of 1.
404    /// Hence, with default settings and a lookup quorum of 1, a successful lookup
405    /// will result in the record being cached at the closest node to the key that
406    /// did not return the record, i.e. the standard Kademlia behaviour.
407    pub fn set_caching(&mut self, c: Caching) -> &mut Self {
408        self.caching = c;
409        self
410    }
411
412    /// Sets the interval on which [`Behaviour::bootstrap`] is called periodically.
413    ///
414    /// * Defaults to `5` minutes.
415    /// * Set to `None` to disable periodic bootstrap.
416    pub fn set_periodic_bootstrap_interval(&mut self, interval: Option<Duration>) -> &mut Self {
417        self.periodic_bootstrap_interval = interval;
418        self
419    }
420
421    /// Sets the configuration for the k-buckets.
422    ///
423    /// * Defaults to `K_VALUE`.
424    ///
425    /// **WARNING**: setting a `size` higher than `K_VALUE` may imply additional memory allocations.
426    pub fn set_kbucket_size(&mut self, size: NonZeroUsize) -> &mut Self {
427        self.kbucket_config.set_bucket_size(size);
428        self
429    }
430
431    /// Sets the timeout duration after creation of a pending entry after which
432    /// it becomes eligible for insertion into a full bucket, replacing the
433    /// least-recently (dis)connected node.
434    ///
435    /// * Defaults to `60` seconds.
436    pub fn set_kbucket_pending_timeout(&mut self, timeout: Duration) -> &mut Self {
437        self.kbucket_config.set_pending_timeout(timeout);
438        self
439    }
440
441    /// Sets the time to wait before calling [`Behaviour::bootstrap`] after a new peer is inserted
442    /// in the routing table. This prevents cascading bootstrap requests when multiple peers are
443    /// inserted into the routing table "at the same time". It also allows waiting a little
444    /// for other potential peers to be inserted into the routing table before triggering a
445    /// bootstrap, giving more context to the upcoming bootstrap request.
446    ///
447    /// * Defaults to `500` ms.
448    /// * Set to `Some(Duration::ZERO)` to never wait before triggering a bootstrap request when a
449    ///   new peer is inserted in the routing table.
450    /// * Set to `None` to disable automatic bootstrap (no bootstrap request will be triggered when
451    ///   a new peer is inserted in the routing table).
452    #[cfg(test)]
453    pub(crate) fn set_automatic_bootstrap_throttle(
454        &mut self,
455        duration: Option<Duration>,
456    ) -> &mut Self {
457        self.automatic_bootstrap_throttle = duration;
458        self
459    }
460}
461
462impl<TStore> Behaviour<TStore>
463where
464    TStore: RecordStore + Send + 'static,
465{
466    /// Creates a new `Kademlia` network behaviour with a default configuration.
467    pub fn new(id: PeerId, store: TStore) -> Self {
468        Self::with_config(id, store, Default::default())
469    }
470
471    /// Get the protocol name of this kademlia instance.
472    pub fn protocol_names(&self) -> &[StreamProtocol] {
473        self.protocol_config.protocol_names()
474    }
475
476    /// Creates a new `Kademlia` network behaviour with the given configuration.
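    ///
    /// A minimal construction sketch using the in-memory [`RecordStore`] shipped
    /// with this crate (the protocol name is a placeholder):
    ///
    /// ```no_run
    /// use libp2p_identity::PeerId;
    /// use libp2p_kad::{store::MemoryStore, Behaviour, Config};
    /// use libp2p_swarm::StreamProtocol;
    ///
    /// let local_peer_id = PeerId::random();
    /// let config = Config::new(StreamProtocol::new("/my-app/kad/1.0.0"));
    /// let store = MemoryStore::new(local_peer_id);
    /// let _behaviour = Behaviour::with_config(local_peer_id, store, config);
    /// ```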
477    pub fn with_config(id: PeerId, store: TStore, config: Config) -> Self {
478        let local_key = kbucket::Key::from(id);
479
480        let put_record_job = config
481            .record_replication_interval
482            .or(config.record_publication_interval)
483            .map(|interval| {
484                PutRecordJob::new(
485                    id,
486                    interval,
487                    config.record_publication_interval,
488                    config.record_ttl,
489                )
490            });
491
492        let add_provider_job = config
493            .provider_publication_interval
494            .map(AddProviderJob::new);
495
496        Behaviour {
497            store,
498            caching: config.caching,
499            kbuckets: KBucketsTable::new(local_key, config.kbucket_config),
500            kbucket_inserts: config.kbucket_inserts,
501            protocol_config: config.protocol_config,
502            record_filtering: config.record_filtering,
503            queued_events: VecDeque::with_capacity(config.query_config.replication_factor.get()),
504            listen_addresses: Default::default(),
505            queries: QueryPool::new(config.query_config),
506            connected_peers: Default::default(),
507            add_provider_job,
508            put_record_job,
509            record_ttl: config.record_ttl,
510            provider_record_ttl: config.provider_record_ttl,
511            external_addresses: Default::default(),
512            local_peer_id: id,
513            connections: Default::default(),
514            mode: Mode::Client,
515            auto_mode: true,
516            no_events_waker: None,
517            bootstrap_status: bootstrap::Status::new(
518                config.periodic_bootstrap_interval,
519                config.automatic_bootstrap_throttle,
520            ),
521        }
522    }
523
524    /// Gets an iterator over immutable references to all running queries.
525    pub fn iter_queries(&self) -> impl Iterator<Item = QueryRef<'_>> {
526        self.queries.iter().filter_map(|query| {
527            if !query.is_finished() {
528                Some(QueryRef { query })
529            } else {
530                None
531            }
532        })
533    }
534
535    /// Gets an iterator over mutable references to all running queries.
536    pub fn iter_queries_mut(&mut self) -> impl Iterator<Item = QueryMut<'_>> {
537        self.queries.iter_mut().filter_map(|query| {
538            if !query.is_finished() {
539                Some(QueryMut { query })
540            } else {
541                None
542            }
543        })
544    }
545
546    /// Gets an immutable reference to a running query, if it exists.
547    pub fn query(&self, id: &QueryId) -> Option<QueryRef<'_>> {
548        self.queries.get(id).and_then(|query| {
549            if !query.is_finished() {
550                Some(QueryRef { query })
551            } else {
552                None
553            }
554        })
555    }
556
557    /// Gets a mutable reference to a running query, if it exists.
558    pub fn query_mut<'a>(&'a mut self, id: &QueryId) -> Option<QueryMut<'a>> {
559        self.queries.get_mut(id).and_then(|query| {
560            if !query.is_finished() {
561                Some(QueryMut { query })
562            } else {
563                None
564            }
565        })
566    }
567
568    /// Adds a known listen address of a peer participating in the DHT to the
569    /// routing table.
570    ///
571    /// Explicitly adding addresses of peers serves two purposes:
572    ///
573    ///   1. In order for a node to join the DHT, it must know about at least one other node of the
574    ///      DHT.
575    ///
576    ///   2. When a remote peer initiates a connection and that peer is not yet in the routing
577    ///      table, the `Kademlia` behaviour must be informed of an address on which that peer is
578    ///      listening for connections before it can be added to the routing table from where it can
579    ///      subsequently be discovered by all peers in the DHT.
580    ///
581    /// If the routing table has been updated as a result of this operation,
582    /// a [`Event::RoutingUpdated`] event is emitted.
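    ///
    /// A minimal sketch (the address is a placeholder and `bootnode` stands for a
    /// real peer id learned out-of-band):
    ///
    /// ```no_run
    /// # use libp2p_core::Multiaddr;
    /// # use libp2p_identity::PeerId;
    /// # use libp2p_kad::{store::MemoryStore, Behaviour};
    /// # fn example(kad: &mut Behaviour<MemoryStore>, bootnode: PeerId) {
    /// let addr: Multiaddr = "/ip4/203.0.113.7/tcp/4001".parse().unwrap();
    /// kad.add_address(&bootnode, addr);
    /// # }
    /// ```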
583    pub fn add_address(&mut self, peer: &PeerId, address: Multiaddr) -> RoutingUpdate {
584        // ensuring address is a fully-qualified /p2p multiaddr
585        let Ok(address) = address.with_p2p(*peer) else {
586            return RoutingUpdate::Failed;
587        };
588        let key = kbucket::Key::from(*peer);
589        match self.kbuckets.entry(&key) {
590            Some(kbucket::Entry::Present(mut entry, _)) => {
591                if entry.value().insert(address) {
592                    self.queued_events
593                        .push_back(ToSwarm::GenerateEvent(Event::RoutingUpdated {
594                            peer: *peer,
595                            is_new_peer: false,
596                            addresses: entry.value().clone(),
597                            old_peer: None,
598                            bucket_range: self
599                                .kbuckets
600                                .bucket(&key)
601                                .map(|b| b.range())
602                                .expect("Not kbucket::Entry::SelfEntry."),
603                        }))
604                }
605                RoutingUpdate::Success
606            }
607            Some(kbucket::Entry::Pending(mut entry, _)) => {
608                entry.value().insert(address);
609                RoutingUpdate::Pending
610            }
611            Some(kbucket::Entry::Absent(entry)) => {
612                let addresses = Addresses::new(address);
613                let status = if self.connected_peers.contains(peer) {
614                    NodeStatus::Connected
615                } else {
616                    NodeStatus::Disconnected
617                };
618                match entry.insert(addresses.clone(), status) {
619                    kbucket::InsertResult::Inserted => {
620                        self.bootstrap_on_low_peers();
621
622                        self.queued_events.push_back(ToSwarm::GenerateEvent(
623                            Event::RoutingUpdated {
624                                peer: *peer,
625                                is_new_peer: true,
626                                addresses,
627                                old_peer: None,
628                                bucket_range: self
629                                    .kbuckets
630                                    .bucket(&key)
631                                    .map(|b| b.range())
632                                    .expect("Not kbucket::Entry::SelfEntry."),
633                            },
634                        ));
635                        RoutingUpdate::Success
636                    }
637                    kbucket::InsertResult::Full => {
638                        tracing::debug!(%peer, "Bucket full. Peer not added to routing table");
639                        RoutingUpdate::Failed
640                    }
641                    kbucket::InsertResult::Pending { disconnected } => {
642                        self.queued_events.push_back(ToSwarm::Dial {
643                            opts: DialOpts::peer_id(disconnected.into_preimage()).build(),
644                        });
645                        RoutingUpdate::Pending
646                    }
647                }
648            }
649            None => RoutingUpdate::Failed,
650        }
651    }
652
653    /// Removes an address of a peer from the routing table.
654    ///
655    /// If the given address is the last address of the peer in the
656    /// routing table, the peer is removed from the routing table
657    /// and `Some` is returned with a view of the removed entry.
658    /// The same applies if the peer is currently pending insertion
659    /// into the routing table.
660    ///
661    /// If the given peer or address is not in the routing table,
662    /// this is a no-op.
663    pub fn remove_address(
664        &mut self,
665        peer: &PeerId,
666        address: &Multiaddr,
667    ) -> Option<kbucket::EntryView<kbucket::Key<PeerId>, Addresses>> {
668        let address = &address.to_owned().with_p2p(*peer).ok()?;
669        let key = kbucket::Key::from(*peer);
670        match self.kbuckets.entry(&key)? {
671            kbucket::Entry::Present(mut entry, _) => {
672                if entry.value().remove(address).is_err() {
673                    Some(entry.remove()) // it is the last address, thus remove the peer.
674                } else {
675                    None
676                }
677            }
678            kbucket::Entry::Pending(mut entry, _) => {
679                if entry.value().remove(address).is_err() {
680                    Some(entry.remove()) // it is the last address, thus remove the peer.
681                } else {
682                    None
683                }
684            }
685            kbucket::Entry::Absent(..) => None,
686        }
687    }
688
689    /// Removes a peer from the routing table.
690    ///
691    /// Returns `None` if the peer was not in the routing table,
692    /// not even pending insertion.
693    pub fn remove_peer(
694        &mut self,
695        peer: &PeerId,
696    ) -> Option<kbucket::EntryView<kbucket::Key<PeerId>, Addresses>> {
697        let key = kbucket::Key::from(*peer);
698        match self.kbuckets.entry(&key)? {
699            kbucket::Entry::Present(entry, _) => Some(entry.remove()),
700            kbucket::Entry::Pending(entry, _) => Some(entry.remove()),
701            kbucket::Entry::Absent(..) => None,
702        }
703    }
704
705    /// Returns an iterator over all non-empty buckets in the routing table.
706    pub fn kbuckets(
707        &mut self,
708    ) -> impl Iterator<Item = kbucket::KBucketRef<'_, kbucket::Key<PeerId>, Addresses>> {
709        self.kbuckets.iter().filter(|b| !b.is_empty())
710    }
711
712    /// Returns the k-bucket for the distance to the given key.
713    ///
714    /// Returns `None` if the given key refers to the local key.
715    pub fn kbucket<K>(
716        &mut self,
717        key: K,
718    ) -> Option<kbucket::KBucketRef<'_, kbucket::Key<PeerId>, Addresses>>
719    where
720        K: Into<kbucket::Key<K>> + Clone,
721    {
722        self.kbuckets.bucket(&key.into())
723    }
724
725    /// Initiates an iterative query for the closest peers to the given key.
726    ///
727    /// The result of the query is delivered in a
728    /// [`Event::OutboundQueryProgressed{QueryResult::GetClosestPeers}`].
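    ///
    /// A minimal sketch: querying for the peers closest to a random [`PeerId`] is a
    /// common way to refresh knowledge of a region of the keyspace.
    ///
    /// ```no_run
    /// # use libp2p_identity::PeerId;
    /// # use libp2p_kad::{store::MemoryStore, Behaviour};
    /// # fn example(kad: &mut Behaviour<MemoryStore>) {
    /// let _query_id = kad.get_closest_peers(PeerId::random());
    /// # }
    /// ```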
729    pub fn get_closest_peers<K>(&mut self, key: K) -> QueryId
730    where
731        K: Into<kbucket::Key<K>> + Into<Vec<u8>> + Clone,
732    {
733        self.get_closest_peers_inner(key, K_VALUE)
734    }
735
736    /// Initiates an iterative query for the closest peers to the given key.
737    /// The number of closest peers expected in the result is specified by `num_results`.
738    /// Note that the result is capped at `K_VALUE`.
739    ///
740    /// The result of the query is delivered in a
741    /// [`Event::OutboundQueryProgressed{QueryResult::GetClosestPeers}`].
742    pub fn get_n_closest_peers<K>(&mut self, key: K, num_results: NonZeroUsize) -> QueryId
743    where
744        K: Into<kbucket::Key<K>> + Into<Vec<u8>> + Clone,
745    {
746        // The inner code never expects more than K_VALUE results to be returned.
747        // Removing that cap would be tricky, since it would involve forging a new key
748        // and issuing additional requests.
749        // Hence `num_results` is bounded to K_VALUE here to set a clear expectation and prevent unexpected behaviour.
750        let capped_num_results = std::cmp::min(num_results, K_VALUE);
751        self.get_closest_peers_inner(key, capped_num_results)
752    }
753
754    fn get_closest_peers_inner<K>(&mut self, key: K, num_results: NonZeroUsize) -> QueryId
755    where
756        K: Into<kbucket::Key<K>> + Into<Vec<u8>> + Clone,
757    {
758        let target: kbucket::Key<K> = key.clone().into();
759        let key: Vec<u8> = key.into();
760        let info = QueryInfo::GetClosestPeers {
761            key,
762            step: ProgressStep::first(),
763            num_results,
764        };
765        let peer_keys: Vec<kbucket::Key<PeerId>> = self.kbuckets.closest_keys(&target).collect();
766        self.queries.add_iter_closest(target, peer_keys, info)
767    }
768
769    /// Returns all peers ordered by distance to the given key, taken from the local
770    /// routing table only.
771    pub fn get_closest_local_peers<'a, K: Clone>(
772        &'a mut self,
773        key: &'a kbucket::Key<K>,
774    ) -> impl Iterator<Item = kbucket::Key<PeerId>> + 'a {
775        self.kbuckets.closest_keys(key)
776    }
777
778    /// Finds the closest peers to a `key` in the context of a request by the `source` peer, such
779    /// that the `source` peer is never included in the result.
780    ///
781    /// Takes peers from the local routing table only. Returns at most [`K_VALUE`]
782    /// peers.
783    pub fn find_closest_local_peers<'a, K: Clone>(
784        &'a mut self,
785        key: &'a kbucket::Key<K>,
786        source: &'a PeerId,
787    ) -> impl Iterator<Item = KadPeer> + 'a {
788        self.kbuckets
789            .closest(key)
790            .filter(move |e| e.node.key.preimage() != source)
791            .take(K_VALUE.get())
792            .map(KadPeer::from)
793    }
794
795    /// Performs a lookup for a record in the DHT.
796    ///
797    /// The result of this operation is delivered in a
798    /// [`Event::OutboundQueryProgressed{QueryResult::GetRecord}`].
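    ///
    /// A minimal sketch (the key is a placeholder):
    ///
    /// ```no_run
    /// # use libp2p_kad::{store::MemoryStore, Behaviour, RecordKey};
    /// # fn example(kad: &mut Behaviour<MemoryStore>) {
    /// let _query_id = kad.get_record(RecordKey::new(&b"my-key"));
    /// # }
    /// ```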
799    pub fn get_record(&mut self, key: record::Key) -> QueryId {
800        let record = if let Some(record) = self.store.get(&key) {
801            if record.is_expired(Instant::now()) {
802                self.store.remove(&key);
803                None
804            } else {
805                Some(PeerRecord {
806                    peer: None,
807                    record: record.into_owned(),
808                })
809            }
810        } else {
811            None
812        };
813
814        let step = ProgressStep::first();
815
816        let target = kbucket::Key::new(key.clone());
817        let info = if record.is_some() {
818            QueryInfo::GetRecord {
819                key,
820                step: step.next(),
821                found_a_record: true,
822                cache_candidates: BTreeMap::new(),
823            }
824        } else {
825            QueryInfo::GetRecord {
826                key,
827                step: step.clone(),
828                found_a_record: false,
829                cache_candidates: BTreeMap::new(),
830            }
831        };
832        let peers = self.kbuckets.closest_keys(&target);
833        let id = self.queries.add_iter_closest(target.clone(), peers, info);
834
835        // No queries were actually done for the results yet.
836        let stats = QueryStats::empty();
837
838        if let Some(record) = record {
839            self.queued_events
840                .push_back(ToSwarm::GenerateEvent(Event::OutboundQueryProgressed {
841                    id,
842                    result: QueryResult::GetRecord(Ok(GetRecordOk::FoundRecord(record))),
843                    step,
844                    stats,
845                }));
846        }
847
848        id
849    }
850
851    /// Stores a record in the DHT, locally as well as at the nodes
852    /// closest to the key as per the xor distance metric.
853    ///
854    /// Returns `Ok` if a record has been stored locally, providing the
855    /// `QueryId` of the initial query that replicates the record in the DHT.
856    /// The result of the query is eventually reported as a
857    /// [`Event::OutboundQueryProgressed{QueryResult::PutRecord}`].
858    ///
859    /// The record is always stored locally with the given expiration. If the record's
860    /// expiration is `None`, the common case, it does not expire in local storage
861    /// but is still replicated with the configured record TTL. To remove the record
862    /// locally and stop it from being re-published in the DHT, see [`Behaviour::remove_record`].
863    ///
864    /// After the initial publication of the record, it is subject to (re-)replication
865    /// and (re-)publication as per the configured intervals. Periodic (re-)publication
866    /// does not update the record's expiration in local storage, thus a given record
867    /// with an explicit expiration will always expire at that instant and until then
868    /// is subject to regular (re-)replication and (re-)publication.
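    ///
    /// A minimal sketch (key and value are placeholders):
    ///
    /// ```no_run
    /// # use libp2p_kad::{store::MemoryStore, Behaviour, Quorum, Record};
    /// # fn example(kad: &mut Behaviour<MemoryStore>) {
    /// let record = Record::new(b"my-key".to_vec(), b"my-value".to_vec());
    /// let _query_id = kad
    ///     .put_record(record, Quorum::One)
    ///     .expect("storing the record locally failed");
    /// # }
    /// ```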
869    pub fn put_record(
870        &mut self,
871        mut record: Record,
872        quorum: Quorum,
873    ) -> Result<QueryId, store::Error> {
874        record.publisher = Some(*self.kbuckets.local_key().preimage());
875        self.store.put(record.clone())?;
876        record.expires = record
877            .expires
878            .or_else(|| self.record_ttl.map(|ttl| Instant::now() + ttl));
879        let quorum = quorum.eval(self.queries.config().replication_factor);
880        let target = kbucket::Key::new(record.key.clone());
881        let peers = self.kbuckets.closest_keys(&target);
882        let context = PutRecordContext::Publish;
883        let info = QueryInfo::PutRecord {
884            context,
885            record,
886            quorum,
887            phase: PutRecordPhase::GetClosestPeers,
888        };
889        Ok(self.queries.add_iter_closest(target.clone(), peers, info))
890    }
891
892    /// Stores a record at specific peers, without storing it locally.
893    ///
894    /// The given [`Quorum`] is understood in the context of the total
895    /// number of distinct peers given.
896    ///
897    /// If the record's expiration is `None`, the configured record TTL is used.
898    ///
899    /// > **Note**: This is not a regular Kademlia DHT operation. It may be
900    /// > used to selectively update or store a record to specific peers
901    /// > for the purpose of e.g. making sure these peers have the latest
902    /// > "version" of a record or to "cache" a record at further peers
903    /// > to increase the lookup success rate on the DHT for other peers.
904    /// >
905    /// > In particular, there is no automatic storing of records performed, and this
906    /// > method must be used to ensure the standard Kademlia
907    /// > procedure of "caching" (i.e. storing) a found record at the closest
908    /// > node to the key that _did not_ return it.
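    ///
    /// A sketch of the "write-back" caching pattern described above, assuming a
    /// lookup has just finished with [`GetRecordOk::FinishedWithNoAdditionalRecord`]
    /// and `found` is the record chosen from the earlier lookup results:
    ///
    /// ```no_run
    /// # use libp2p_kad::{store::MemoryStore, Behaviour, Event, GetRecordOk, QueryResult, Quorum, Record};
    /// # fn on_event(kad: &mut Behaviour<MemoryStore>, event: Event, found: Record) {
    /// if let Event::OutboundQueryProgressed {
    ///     result:
    ///         QueryResult::GetRecord(Ok(GetRecordOk::FinishedWithNoAdditionalRecord {
    ///             cache_candidates,
    ///         })),
    ///     ..
    /// } = event
    /// {
    ///     // Cache the chosen record at the closest peers that did not return it.
    ///     kad.put_record_to(found, cache_candidates.into_values(), Quorum::One);
    /// }
    /// # }
    /// ```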
909    pub fn put_record_to<I>(&mut self, mut record: Record, peers: I, quorum: Quorum) -> QueryId
910    where
911        I: ExactSizeIterator<Item = PeerId>,
912    {
913        let quorum = if peers.len() > 0 {
914            quorum.eval(NonZeroUsize::new(peers.len()).expect("> 0"))
915        } else {
916            // If no peers are given, we just let the query fail immediately
917            // due to the fact that the quorum must be at least one, instead of
918            // introducing a new kind of error.
919            NonZeroUsize::new(1).expect("1 > 0")
920        };
921        record.expires = record
922            .expires
923            .or_else(|| self.record_ttl.map(|ttl| Instant::now() + ttl));
924        let context = PutRecordContext::Custom;
925        let info = QueryInfo::PutRecord {
926            context,
927            record,
928            quorum,
929            phase: PutRecordPhase::PutRecord {
930                success: Vec::new(),
931                get_closest_peers_stats: QueryStats::empty(),
932            },
933        };
934        self.queries.add_fixed(peers, info)
935    }
936
937    /// Removes the record with the given key from _local_ storage,
938    /// if the local node is the publisher of the record.
939    ///
940    /// Has no effect if a record for the given key is stored locally but
941    /// the local node is not a publisher of the record.
942    ///
943    /// This is a _local_ operation. However, it also has the effect that
944    /// the record will no longer be periodically re-published, allowing the
945    /// record to eventually expire throughout the DHT.
946    pub fn remove_record(&mut self, key: &record::Key) {
947        if let Some(r) = self.store.get(key) {
948            if r.publisher.as_ref() == Some(self.kbuckets.local_key().preimage()) {
949                self.store.remove(key)
950            }
951        }
952    }
953
954    /// Gets a mutable reference to the record store.
955    pub fn store_mut(&mut self) -> &mut TStore {
956        &mut self.store
957    }
958
959    /// Bootstraps the local node to join the DHT.
960    ///
961    /// Bootstrapping is a multi-step operation that starts with a lookup of the local node's
962    /// own ID in the DHT. This introduces the local node to the other nodes
963    /// in the DHT and populates its routing table with the closest neighbours.
964    ///
965    /// Subsequently, all buckets farther from the bucket of the closest neighbour are
966    /// refreshed by initiating an additional bootstrapping query for each such
967    /// bucket with random keys.
968    ///
969    /// Returns `Ok` if bootstrapping has been initiated with a self-lookup, providing the
970    /// `QueryId` for the entire bootstrapping process. The progress of bootstrapping is
971    /// reported via [`Event::OutboundQueryProgressed{QueryResult::Bootstrap}`] events,
972    /// with one such event per bootstrapping query.
973    ///
974    /// Returns `Err` if bootstrapping is impossible due to an empty routing table.
975    ///
976    /// > **Note**: Bootstrapping requires at least one node of the DHT to be known.
977    /// > See [`Behaviour::add_address`].
978    ///
979    /// > **Note**: Bootstrap does not need to be called manually. It is invoked
980    /// > periodically at the configured `periodic_bootstrap_interval` (see
981    /// > [`Config::set_periodic_bootstrap_interval`] for details) and it is also
982    /// > invoked automatically whenever a new peer is inserted in the routing table.
983    /// > Both mechanisms call [`Behaviour::bootstrap`] to keep the routing table
984    /// > healthy.
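    ///
    /// A minimal sketch (peer id and address are placeholders): add at least one
    /// known peer, then trigger a bootstrap.
    ///
    /// ```no_run
    /// # use libp2p_identity::PeerId;
    /// # use libp2p_kad::{store::MemoryStore, Behaviour};
    /// # fn example(kad: &mut Behaviour<MemoryStore>, bootnode: PeerId) {
    /// kad.add_address(&bootnode, "/ip4/203.0.113.7/tcp/4001".parse().unwrap());
    /// let _query_id = kad.bootstrap().expect("routing table is not empty");
    /// # }
    /// ```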
986    pub fn bootstrap(&mut self) -> Result<QueryId, NoKnownPeers> {
987        let local_key = *self.kbuckets.local_key();
988        let info = QueryInfo::Bootstrap {
989            peer: *local_key.preimage(),
990            remaining: None,
991            step: ProgressStep::first(),
992        };
993        let peers = self.kbuckets.closest_keys(&local_key).collect::<Vec<_>>();
994        if peers.is_empty() {
995            self.bootstrap_status.reset_timers();
996            Err(NoKnownPeers())
997        } else {
998            self.bootstrap_status.on_started();
999            Ok(self.queries.add_iter_closest(local_key, peers, info))
1000        }
1001    }
1002
1003    /// Establishes the local node as a provider of a value for the given key.
1004    ///
1005    /// This operation publishes a provider record with the given key and
1006    /// identity of the local node to the peers closest to the key, thus establishing
1007    /// the local node as a provider.
1008    ///
1009    /// Returns `Ok` if a provider record has been stored locally, providing the
1010    /// `QueryId` of the initial query that announces the local node as a provider.
1011    ///
1012    /// The publication of the provider records is periodically repeated as per the
1013    /// configured interval, to renew the expiry and account for changes to the DHT
1014    /// topology. A provider record may be removed from local storage and
1015    /// thus no longer re-published by calling [`Behaviour::stop_providing`].
1016    ///
1017    /// In contrast to the standard Kademlia push-based model for content distribution
1018    /// implemented by [`Behaviour::put_record`], the provider API implements a
1019    /// pull-based model that may be used in addition or as an alternative.
1020    /// The means by which the actual value is obtained from a provider is out of scope
1021    /// of the libp2p Kademlia provider API.
1022    ///
1023    /// The results of the (repeated) provider announcements sent by this node are
1024    /// reported via [`Event::OutboundQueryProgressed{QueryResult::StartProviding}`].
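    ///
    /// A minimal sketch (the key is a placeholder):
    ///
    /// ```no_run
    /// # use libp2p_kad::{store::MemoryStore, Behaviour, RecordKey};
    /// # fn example(kad: &mut Behaviour<MemoryStore>) {
    /// let _query_id = kad
    ///     .start_providing(RecordKey::new(&b"file-abc"))
    ///     .expect("storing the provider record locally failed");
    /// # }
    /// ```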
1025    pub fn start_providing(&mut self, key: record::Key) -> Result<QueryId, store::Error> {
1026        // Note: We store our own provider records locally without local addresses
1027        // to avoid redundant storage and outdated addresses. Instead these are
1028        // acquired on demand when returning a `ProviderRecord` for the local node.
1029        let local_addrs = Vec::new();
1030        let record = ProviderRecord::new(
1031            key.clone(),
1032            *self.kbuckets.local_key().preimage(),
1033            local_addrs,
1034        );
1035        self.store.add_provider(record)?;
1036        let target = kbucket::Key::new(key.clone());
1037        let peers = self.kbuckets.closest_keys(&target);
1038        let context = AddProviderContext::Publish;
1039        let info = QueryInfo::AddProvider {
1040            context,
1041            key,
1042            phase: AddProviderPhase::GetClosestPeers,
1043        };
1044        let id = self.queries.add_iter_closest(target.clone(), peers, info);
1045        Ok(id)
1046    }
1047
1048    /// Stops the local node from announcing that it is a provider for the given key.
1049    ///
1050    /// This is a local operation. The local node will still be considered as a
1051    /// provider for the key by other nodes until these provider records expire.
1052    pub fn stop_providing(&mut self, key: &record::Key) {
1053        self.store
1054            .remove_provider(key, self.kbuckets.local_key().preimage());
1055    }
1056
1057    /// Performs a lookup for providers of a value for the given key.
1058    ///
1059    /// The result of this operation is
1060    /// reported via [`Event::OutboundQueryProgressed{QueryResult::GetProviders}`].
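    ///
    /// A minimal sketch (the key is a placeholder):
    ///
    /// ```no_run
    /// # use libp2p_kad::{store::MemoryStore, Behaviour, RecordKey};
    /// # fn example(kad: &mut Behaviour<MemoryStore>) {
    /// let _query_id = kad.get_providers(RecordKey::new(&b"file-abc"));
    /// # }
    /// ```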
1061    pub fn get_providers(&mut self, key: record::Key) -> QueryId {
1062        let providers: HashSet<_> = self
1063            .store
1064            .providers(&key)
1065            .into_iter()
1066            .filter(|p| {
1067                if p.is_expired(Instant::now()) {
1068                    self.store.remove_provider(&key, &p.provider);
1069                    false
1070                } else {
1071                    true
1072                }
1073            })
1074            .map(|p| p.provider)
1075            .collect();
1076
1077        let step = ProgressStep::first();
1078
1079        let info = QueryInfo::GetProviders {
1080            key: key.clone(),
1081            providers_found: providers.len(),
1082            step: if providers.is_empty() {
1083                step.clone()
1084            } else {
1085                step.next()
1086            },
1087        };
1088
1089        let target = kbucket::Key::new(key.clone());
1090        let peers = self.kbuckets.closest_keys(&target);
1091        let id = self.queries.add_iter_closest(target.clone(), peers, info);
1092
1093        // No queries were actually done for the results yet.
1094        let stats = QueryStats::empty();
1095
1096        if !providers.is_empty() {
1097            self.queued_events
1098                .push_back(ToSwarm::GenerateEvent(Event::OutboundQueryProgressed {
1099                    id,
1100                    result: QueryResult::GetProviders(Ok(GetProvidersOk::FoundProviders {
1101                        key,
1102                        providers,
1103                    })),
1104                    step,
1105                    stats,
1106                }));
1107        }
1108        id
1109    }
1110
1111    /// Set the [`Mode`] in which we should operate.
1112    ///
1113    /// By default, we are in [`Mode::Client`] and will swap into [`Mode::Server`] as soon as we
1114    /// have a confirmed external address via [`FromSwarm::ExternalAddrConfirmed`].
1115    ///
1116    /// Setting a mode via this function disables this automatic behaviour and unconditionally
1117    /// operates in the specified mode. To reactivate the automatic configuration, pass [`None`]
1118    /// instead.
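    ///
    /// A minimal sketch:
    ///
    /// ```no_run
    /// # use libp2p_kad::{store::MemoryStore, Behaviour, Mode};
    /// # fn example(kad: &mut Behaviour<MemoryStore>) {
    /// kad.set_mode(Some(Mode::Server)); // unconditionally act as a DHT server
    /// kad.set_mode(None); // return to automatic client/server selection
    /// # }
    /// ```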
1119    pub fn set_mode(&mut self, mode: Option<Mode>) {
1120        match mode {
1121            Some(mode) => {
1122                self.mode = mode;
1123                self.auto_mode = false;
1124                self.reconfigure_mode();
1125            }
1126            None => {
1127                self.auto_mode = true;
1128                self.determine_mode_from_external_addresses();
1129            }
1130        }
1131
1132        if let Some(waker) = self.no_events_waker.take() {
1133            waker.wake();
1134        }
1135    }
1136
1137    /// Get the [`Mode`] in which the DHT is currently operating.
1138    pub fn mode(&self) -> Mode {
1139        self.mode
1140    }
1141
1142    fn reconfigure_mode(&mut self) {
1143        if self.connections.is_empty() {
1144            return;
1145        }
1146
1147        let num_connections = self.connections.len();
1148
1149        tracing::debug!(
1150            "Re-configuring {} established connection{}",
1151            num_connections,
1152            if num_connections > 1 { "s" } else { "" }
1153        );
1154
1155        self.queued_events
1156            .extend(
1157                self.connections
1158                    .iter()
1159                    .map(|(conn_id, peer_id)| ToSwarm::NotifyHandler {
1160                        peer_id: *peer_id,
1161                        handler: NotifyHandler::One(*conn_id),
1162                        event: HandlerIn::ReconfigureMode {
1163                            new_mode: self.mode,
1164                        },
1165                    }),
1166            );
1167    }
1168
1169    fn determine_mode_from_external_addresses(&mut self) {
1170        let old_mode = self.mode;
1171
1172        self.mode = match (self.external_addresses.as_slice(), self.mode) {
1173            ([], Mode::Server) => {
1174                tracing::debug!("Switching to client-mode because we no longer have any confirmed external addresses");
1175
1176                Mode::Client
1177            }
1178            ([], Mode::Client) => {
1179                // Previously client-mode, now also client-mode because no external addresses.
1180
1181                Mode::Client
1182            }
1183            (confirmed_external_addresses, Mode::Client) => {
1184                if tracing::enabled!(Level::DEBUG) {
1185                    let confirmed_external_addresses =
1186                        to_comma_separated_list(confirmed_external_addresses);
1187
1188                    tracing::debug!("Switching to server-mode assuming that one of [{confirmed_external_addresses}] is externally reachable");
1189                }
1190
1191                Mode::Server
1192            }
1193            (confirmed_external_addresses, Mode::Server) => {
1194                debug_assert!(
1195                    !confirmed_external_addresses.is_empty(),
1196                    "Previous match arm handled empty list"
1197                );
1198
1199                // Previously server-mode, now also server-mode because we still have at least
1200                // one confirmed external address. Don't log anything to avoid spam.
1201                Mode::Server
1202            }
1203        };
1204
1205        self.reconfigure_mode();
1206
1207        if old_mode != self.mode {
1208            self.queued_events
1209                .push_back(ToSwarm::GenerateEvent(Event::ModeChanged {
1210                    new_mode: self.mode,
1211                }));
1212        }
1213    }
1214
1215    /// Processes discovered peers from a successful request in an iterative `Query`.
1216    fn discovered<'a, I>(&'a mut self, query_id: &QueryId, source: &PeerId, peers: I)
1217    where
1218        I: Iterator<Item = &'a KadPeer> + Clone,
1219    {
1220        let local_id = self.kbuckets.local_key().preimage();
1221        let others_iter = peers.filter(|p| &p.node_id != local_id);
1222        if let Some(query) = self.queries.get_mut(query_id) {
1223            tracing::trace!(peer=%source, query=?query_id, "Request to peer in query succeeded");
1224            for peer in others_iter.clone() {
1225                tracing::trace!(
1226                    ?peer,
1227                    %source,
1228                    query=?query_id,
1229                    "Peer reported by source in query"
1230                );
1231                let addrs = peer.multiaddrs.iter().cloned().collect();
1232                query.peers.addresses.insert(peer.node_id, addrs);
1233            }
1234            query.on_success(source, others_iter.cloned().map(|kp| kp.node_id))
1235        }
1236    }
1237
1238    /// Collects all peers who are known to be providers of the value for a given `Multihash`.
1239    fn provider_peers(&mut self, key: &record::Key, source: &PeerId) -> Vec<KadPeer> {
1240        self.store
1241            .providers(key)
1242            .into_iter()
1243            .filter_map(|p| {
1244                if p.is_expired(Instant::now()) {
1245                    self.store.remove_provider(key, &p.provider);
1246                    return None;
1247                }
1248
1249                if &p.provider != source {
1250                    let node_id = p.provider;
1251                    let multiaddrs = p.addresses;
1252                    let connection_ty = if self.connected_peers.contains(&node_id) {
1253                        ConnectionType::Connected
1254                    } else {
1255                        ConnectionType::NotConnected
1256                    };
1257                    if multiaddrs.is_empty() {
1258                        // The provider is either the local node and we fill in
1259                        // the local addresses on demand, or it is a legacy
1260                        // provider record without addresses, in which case we
1261                        // try to find addresses in the routing table, as was
1262                        // done before provider records were stored along with
1263                        // their addresses.
1264                        if &node_id == self.kbuckets.local_key().preimage() {
1265                            Some(
1266                                self.listen_addresses
1267                                    .iter()
1268                                    .chain(self.external_addresses.iter())
1269                                    .cloned()
1270                                    .collect::<Vec<_>>(),
1271                            )
1272                        } else {
1273                            let key = kbucket::Key::from(node_id);
1274                            self.kbuckets
1275                                .entry(&key)
1276                                .as_mut()
1277                                .and_then(|e| e.view())
1278                                .map(|e| e.node.value.clone().into_vec())
1279                        }
1280                    } else {
1281                        Some(multiaddrs)
1282                    }
1283                    .map(|multiaddrs| KadPeer {
1284                        node_id,
1285                        multiaddrs,
1286                        connection_ty,
1287                    })
1288                } else {
1289                    None
1290                }
1291            })
1292            .take(self.queries.config().replication_factor.get())
1293            .collect()
1294    }
1295
1296    /// Starts an iterative `ADD_PROVIDER` query for the given key.
1297    fn start_add_provider(&mut self, key: record::Key, context: AddProviderContext) {
1298        let info = QueryInfo::AddProvider {
1299            context,
1300            key: key.clone(),
1301            phase: AddProviderPhase::GetClosestPeers,
1302        };
1303        let target = kbucket::Key::new(key);
1304        let peers = self.kbuckets.closest_keys(&target);
1305        self.queries.add_iter_closest(target.clone(), peers, info);
1306    }
1307
1308    /// Starts an iterative `PUT_VALUE` query for the given record.
1309    fn start_put_record(&mut self, record: Record, quorum: Quorum, context: PutRecordContext) {
1310        let quorum = quorum.eval(self.queries.config().replication_factor);
1311        let target = kbucket::Key::new(record.key.clone());
1312        let peers = self.kbuckets.closest_keys(&target);
1313        let info = QueryInfo::PutRecord {
1314            record,
1315            quorum,
1316            context,
1317            phase: PutRecordPhase::GetClosestPeers,
1318        };
1319        self.queries.add_iter_closest(target.clone(), peers, info);
1320    }
1321
1322    /// Updates the routing table with a new connection status and address of a peer.
1323    fn connection_updated(
1324        &mut self,
1325        peer: PeerId,
1326        address: Option<Multiaddr>,
1327        new_status: NodeStatus,
1328    ) {
1329        let key = kbucket::Key::from(peer);
1330        match self.kbuckets.entry(&key) {
1331            Some(kbucket::Entry::Present(mut entry, old_status)) => {
1332                if old_status != new_status {
1333                    entry.update(new_status)
1334                }
1335                if let Some(address) = address {
1336                    if entry.value().insert(address) {
1337                        self.queued_events.push_back(ToSwarm::GenerateEvent(
1338                            Event::RoutingUpdated {
1339                                peer,
1340                                is_new_peer: false,
1341                                addresses: entry.value().clone(),
1342                                old_peer: None,
1343                                bucket_range: self
1344                                    .kbuckets
1345                                    .bucket(&key)
1346                                    .map(|b| b.range())
1347                                    .expect("Not kbucket::Entry::SelfEntry."),
1348                            },
1349                        ))
1350                    }
1351                }
1352            }
1353
1354            Some(kbucket::Entry::Pending(mut entry, old_status)) => {
1355                if let Some(address) = address {
1356                    entry.value().insert(address);
1357                }
1358                if old_status != new_status {
1359                    entry.update(new_status);
1360                }
1361            }
1362
1363            Some(kbucket::Entry::Absent(entry)) => {
1364                // Only connected nodes with a known address are newly inserted.
1365                if new_status != NodeStatus::Connected {
1366                    return;
1367                }
1368                match (address, self.kbucket_inserts) {
1369                    (None, _) => {
1370                        self.queued_events
1371                            .push_back(ToSwarm::GenerateEvent(Event::UnroutablePeer { peer }));
1372                    }
1373                    (Some(a), BucketInserts::Manual) => {
1374                        self.queued_events
1375                            .push_back(ToSwarm::GenerateEvent(Event::RoutablePeer {
1376                                peer,
1377                                address: a,
1378                            }));
1379                    }
1380                    (Some(a), BucketInserts::OnConnected) => {
1381                        let addresses = Addresses::new(a);
1382                        match entry.insert(addresses.clone(), new_status) {
1383                            kbucket::InsertResult::Inserted => {
1384                                self.bootstrap_on_low_peers();
1385
1386                                let event = Event::RoutingUpdated {
1387                                    peer,
1388                                    is_new_peer: true,
1389                                    addresses,
1390                                    old_peer: None,
1391                                    bucket_range: self
1392                                        .kbuckets
1393                                        .bucket(&key)
1394                                        .map(|b| b.range())
1395                                        .expect("Not kbucket::Entry::SelfEntry."),
1396                                };
1397                                self.queued_events.push_back(ToSwarm::GenerateEvent(event));
1398                            }
1399                            kbucket::InsertResult::Full => {
1400                                tracing::debug!(
1401                                    %peer,
1402                                    "Bucket full. Peer not added to routing table"
1403                                );
1404                                let address = addresses.first().clone();
1405                                self.queued_events.push_back(ToSwarm::GenerateEvent(
1406                                    Event::RoutablePeer { peer, address },
1407                                ));
1408                            }
1409                            kbucket::InsertResult::Pending { disconnected } => {
1410                                let address = addresses.first().clone();
1411                                self.queued_events.push_back(ToSwarm::GenerateEvent(
1412                                    Event::PendingRoutablePeer { peer, address },
1413                                ));
1414
1415                                // `disconnected` might already be in the process of re-connecting.
1416                                // In other words, `disconnected` might have already re-connected but
1417                                // is not yet confirmed to support the Kademlia protocol via
1418                                // [`HandlerEvent::ProtocolConfirmed`].
1419                                //
1420                                // Only try dialing peer if not currently connected.
1421                                if !self.connected_peers.contains(disconnected.preimage()) {
1422                                    self.queued_events.push_back(ToSwarm::Dial {
1423                                        opts: DialOpts::peer_id(disconnected.into_preimage())
1424                                            .build(),
1425                                    })
1426                                }
1427                            }
1428                        }
1429                    }
1430                }
1431            }
1432            _ => {}
1433        }
1434    }
1435
1436    /// Called after a new peer has been inserted in the routing table. If the
1437    /// routing table is currently small (fewer than `K_VALUE` peers are present),
1438    /// a bootstrap is triggered.
1439    fn bootstrap_on_low_peers(&mut self) {
1440        if self
1441            .kbuckets()
1442            .map(|kbucket| kbucket.num_entries())
1443            .sum::<usize>()
1444            < K_VALUE.get()
1445        {
1446            self.bootstrap_status.trigger();
1447        }
1448    }
1449
1450    /// Handles a finished (i.e. successful) query.
1451    fn query_finished(&mut self, q: Query) -> Option<Event> {
1452        let query_id = q.id();
1453        tracing::trace!(query=?query_id, "Query finished");
1454        match q.info {
1455            QueryInfo::Bootstrap {
1456                peer,
1457                remaining,
1458                mut step,
1459            } => {
1460                let local_key = *self.kbuckets.local_key();
1461                let mut remaining = remaining.unwrap_or_else(|| {
1462                    debug_assert_eq!(&peer, local_key.preimage());
1463                    // The lookup for the local key finished. To complete the bootstrap process,
1464                    // a bucket refresh should be performed for every bucket farther away than
1465                    // the first non-empty bucket (most likely no more than the last few,
1466                    // i.e. farthest, buckets).
1467                    self.kbuckets
1468                        .iter()
1469                        .skip_while(|b| b.is_empty())
1470                        .skip(1) // Skip the bucket with the closest neighbour.
1471                        .map(|b| {
1472                            // Try to find a key that falls into the bucket. While such keys can
1473                            // be generated fully deterministically, the current libp2p kademlia
1474                            // wire protocol requires transmission of the preimages of the actual
1475                            // keys in the DHT keyspace, hence for now this is just a "best effort"
1476                            // to find a key that hashes into a specific bucket. The probabilities
1477                            // of finding a key in the bucket `b` with at most 16 trials are as
1478                            // follows:
1479                            //
1480                            // Pr(bucket-255) = 1 - (1/2)^16   ~= 1
1481                            // Pr(bucket-254) = 1 - (3/4)^16   ~= 1
1482                            // Pr(bucket-253) = 1 - (7/8)^16   ~= 0.88
1483                            // Pr(bucket-252) = 1 - (15/16)^16 ~= 0.64
1484                            // ...
1485                            let mut target = kbucket::Key::from(PeerId::random());
1486                            for _ in 0..16 {
1487                                let d = local_key.distance(&target);
1488                                if b.contains(&d) {
1489                                    break;
1490                                }
1491                                target = kbucket::Key::from(PeerId::random());
1492                            }
1493                            target
1494                        })
1495                        .collect::<Vec<_>>()
1496                        .into_iter()
1497                });
1498
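                    // Each remaining key refreshes one further bucket; the bootstrap
                    // is complete once the iterator is exhausted.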
1499                let num_remaining = remaining.len() as u32;
1500
1501                if let Some(target) = remaining.next() {
1502                    let info = QueryInfo::Bootstrap {
1503                        peer: *target.preimage(),
1504                        remaining: Some(remaining),
1505                        step: step.next(),
1506                    };
1507                    let peers = self.kbuckets.closest_keys(&target);
1508                    self.queries
1509                        .continue_iter_closest(query_id, target, peers, info);
1510                } else {
1511                    step.last = true;
1512                    self.bootstrap_status.on_finish();
1513                };
1514
1515                Some(Event::OutboundQueryProgressed {
1516                    id: query_id,
1517                    stats: q.stats,
1518                    result: QueryResult::Bootstrap(Ok(BootstrapOk {
1519                        peer,
1520                        num_remaining,
1521                    })),
1522                    step,
1523                })
1524            }
1525
1526            QueryInfo::GetClosestPeers { key, mut step, .. } => {
1527                step.last = true;
1528
1529                Some(Event::OutboundQueryProgressed {
1530                    id: query_id,
1531                    stats: q.stats,
1532                    result: QueryResult::GetClosestPeers(Ok(GetClosestPeersOk {
1533                        key,
1534                        peers: q.peers.into_peerinfos_iter().collect(),
1535                    })),
1536                    step,
1537                })
1538            }
1539
1540            QueryInfo::GetProviders { mut step, .. } => {
1541                step.last = true;
1542
1543                Some(Event::OutboundQueryProgressed {
1544                    id: query_id,
1545                    stats: q.stats,
1546                    result: QueryResult::GetProviders(Ok(
1547                        GetProvidersOk::FinishedWithNoAdditionalRecord {
1548                            closest_peers: q.peers.into_peerids_iter().collect(),
1549                        },
1550                    )),
1551                    step,
1552                })
1553            }
1554
1555            QueryInfo::AddProvider {
1556                context,
1557                key,
1558                phase: AddProviderPhase::GetClosestPeers,
1559            } => {
1560                let provider_id = self.local_peer_id;
1561                let external_addresses = self.external_addresses.iter().cloned().collect();
1562                let info = QueryInfo::AddProvider {
1563                    context,
1564                    key,
1565                    phase: AddProviderPhase::AddProvider {
1566                        provider_id,
1567                        external_addresses,
1568                        get_closest_peers_stats: q.stats,
1569                    },
1570                };
1571                self.queries
1572                    .continue_fixed(query_id, q.peers.into_peerids_iter(), info);
1573                None
1574            }
1575
1576            QueryInfo::AddProvider {
1577                context,
1578                key,
1579                phase:
1580                    AddProviderPhase::AddProvider {
1581                        get_closest_peers_stats,
1582                        ..
1583                    },
1584            } => match context {
1585                AddProviderContext::Publish => Some(Event::OutboundQueryProgressed {
1586                    id: query_id,
1587                    stats: get_closest_peers_stats.merge(q.stats),
1588                    result: QueryResult::StartProviding(Ok(AddProviderOk { key })),
1589                    step: ProgressStep::first_and_last(),
1590                }),
1591                AddProviderContext::Republish => Some(Event::OutboundQueryProgressed {
1592                    id: query_id,
1593                    stats: get_closest_peers_stats.merge(q.stats),
1594                    result: QueryResult::RepublishProvider(Ok(AddProviderOk { key })),
1595                    step: ProgressStep::first_and_last(),
1596                }),
1597            },
1598
1599            QueryInfo::GetRecord {
1600                key,
1601                mut step,
1602                found_a_record,
1603                cache_candidates,
1604            } => {
1605                step.last = true;
1606
1607                let results = if found_a_record {
1608                    Ok(GetRecordOk::FinishedWithNoAdditionalRecord { cache_candidates })
1609                } else {
1610                    Err(GetRecordError::NotFound {
1611                        key,
1612                        closest_peers: q.peers.into_peerids_iter().collect(),
1613                    })
1614                };
1615                Some(Event::OutboundQueryProgressed {
1616                    id: query_id,
1617                    stats: q.stats,
1618                    result: QueryResult::GetRecord(results),
1619                    step,
1620                })
1621            }
1622
1623            QueryInfo::PutRecord {
1624                context,
1625                record,
1626                quorum,
1627                phase: PutRecordPhase::GetClosestPeers,
1628            } => {
1629                let info = QueryInfo::PutRecord {
1630                    context,
1631                    record,
1632                    quorum,
1633                    phase: PutRecordPhase::PutRecord {
1634                        success: vec![],
1635                        get_closest_peers_stats: q.stats,
1636                    },
1637                };
1638                self.queries
1639                    .continue_fixed(query_id, q.peers.into_peerids_iter(), info);
1640                None
1641            }
1642
1643            QueryInfo::PutRecord {
1644                context,
1645                record,
1646                quorum,
1647                phase:
1648                    PutRecordPhase::PutRecord {
1649                        success,
1650                        get_closest_peers_stats,
1651                    },
1652            } => {
1653                let mk_result = |key: record::Key| {
1654                    if success.len() >= quorum.get() {
1655                        Ok(PutRecordOk { key })
1656                    } else {
1657                        Err(PutRecordError::QuorumFailed {
1658                            key,
1659                            quorum,
1660                            success,
1661                        })
1662                    }
1663                };
1664                match context {
1665                    PutRecordContext::Publish | PutRecordContext::Custom => {
1666                        Some(Event::OutboundQueryProgressed {
1667                            id: query_id,
1668                            stats: get_closest_peers_stats.merge(q.stats),
1669                            result: QueryResult::PutRecord(mk_result(record.key)),
1670                            step: ProgressStep::first_and_last(),
1671                        })
1672                    }
1673                    PutRecordContext::Republish => Some(Event::OutboundQueryProgressed {
1674                        id: query_id,
1675                        stats: get_closest_peers_stats.merge(q.stats),
1676                        result: QueryResult::RepublishRecord(mk_result(record.key)),
1677                        step: ProgressStep::first_and_last(),
1678                    }),
1679                    PutRecordContext::Replicate => {
1680                        tracing::debug!(record=?record.key, "Record replicated");
1681                        None
1682                    }
1683                }
1684            }
1685        }
1686    }
1687
1688    /// Handles a query that timed out.
1689    fn query_timeout(&mut self, query: Query) -> Option<Event> {
1690        let query_id = query.id();
1691        tracing::trace!(query=?query_id, "Query timed out");
1692        match query.info {
1693            QueryInfo::Bootstrap {
1694                peer,
1695                mut remaining,
1696                mut step,
1697            } => {
1698                let num_remaining = remaining.as_ref().map(|r| r.len().saturating_sub(1) as u32);
1699
1700                // Continue with the next bootstrap query if `remaining` is not empty.
1701                if let Some((target, remaining)) =
1702                    remaining.take().and_then(|mut r| Some((r.next()?, r)))
1703                {
1704                    let info = QueryInfo::Bootstrap {
1705                        peer: target.into_preimage(),
1706                        remaining: Some(remaining),
1707                        step: step.next(),
1708                    };
1709                    let peers = self.kbuckets.closest_keys(&target);
1710                    self.queries
1711                        .continue_iter_closest(query_id, target, peers, info);
1712                } else {
1713                    step.last = true;
1714                    self.bootstrap_status.on_finish();
1715                }
1716
1717                Some(Event::OutboundQueryProgressed {
1718                    id: query_id,
1719                    stats: query.stats,
1720                    result: QueryResult::Bootstrap(Err(BootstrapError::Timeout {
1721                        peer,
1722                        num_remaining,
1723                    })),
1724                    step,
1725                })
1726            }
1727
1728            QueryInfo::AddProvider { context, key, .. } => Some(match context {
1729                AddProviderContext::Publish => Event::OutboundQueryProgressed {
1730                    id: query_id,
1731                    stats: query.stats,
1732                    result: QueryResult::StartProviding(Err(AddProviderError::Timeout { key })),
1733                    step: ProgressStep::first_and_last(),
1734                },
1735                AddProviderContext::Republish => Event::OutboundQueryProgressed {
1736                    id: query_id,
1737                    stats: query.stats,
1738                    result: QueryResult::RepublishProvider(Err(AddProviderError::Timeout { key })),
1739                    step: ProgressStep::first_and_last(),
1740                },
1741            }),
1742
1743            QueryInfo::GetClosestPeers { key, mut step, .. } => {
1744                step.last = true;
1745                Some(Event::OutboundQueryProgressed {
1746                    id: query_id,
1747                    stats: query.stats,
1748                    result: QueryResult::GetClosestPeers(Err(GetClosestPeersError::Timeout {
1749                        key,
1750                        peers: query.peers.into_peerinfos_iter().collect(),
1751                    })),
1752                    step,
1753                })
1754            }
1755
1756            QueryInfo::PutRecord {
1757                record,
1758                quorum,
1759                context,
1760                phase,
1761            } => {
1762                let err = Err(PutRecordError::Timeout {
1763                    key: record.key,
1764                    quorum,
1765                    success: match phase {
1766                        PutRecordPhase::GetClosestPeers => vec![],
1767                        PutRecordPhase::PutRecord { ref success, .. } => success.clone(),
1768                    },
1769                });
1770                match context {
1771                    PutRecordContext::Publish | PutRecordContext::Custom => {
1772                        Some(Event::OutboundQueryProgressed {
1773                            id: query_id,
1774                            stats: query.stats,
1775                            result: QueryResult::PutRecord(err),
1776                            step: ProgressStep::first_and_last(),
1777                        })
1778                    }
1779                    PutRecordContext::Republish => Some(Event::OutboundQueryProgressed {
1780                        id: query_id,
1781                        stats: query.stats,
1782                        result: QueryResult::RepublishRecord(err),
1783                        step: ProgressStep::first_and_last(),
1784                    }),
1785                    PutRecordContext::Replicate => match phase {
1786                        PutRecordPhase::GetClosestPeers => {
1787                            tracing::warn!(
1788                                "Locating closest peers for replication failed: {:?}",
1789                                err
1790                            );
1791                            None
1792                        }
1793                        PutRecordPhase::PutRecord { .. } => {
1794                            tracing::debug!("Replicating record failed: {:?}", err);
1795                            None
1796                        }
1797                    },
1798                }
1799            }
1800
1801            QueryInfo::GetRecord { key, mut step, .. } => {
1802                step.last = true;
1803
1804                Some(Event::OutboundQueryProgressed {
1805                    id: query_id,
1806                    stats: query.stats,
1807                    result: QueryResult::GetRecord(Err(GetRecordError::Timeout { key })),
1808                    step,
1809                })
1810            }
1811
1812            QueryInfo::GetProviders { key, mut step, .. } => {
1813                step.last = true;
1814
1815                Some(Event::OutboundQueryProgressed {
1816                    id: query_id,
1817                    stats: query.stats,
1818                    result: QueryResult::GetProviders(Err(GetProvidersError::Timeout {
1819                        key,
1820                        closest_peers: query.peers.into_peerids_iter().collect(),
1821                    })),
1822                    step,
1823                })
1824            }
1825        }
1826    }
1827
1828    /// Processes a record received from a peer.
1829    fn record_received(
1830        &mut self,
1831        source: PeerId,
1832        connection: ConnectionId,
1833        request_id: RequestId,
1834        mut record: Record,
1835    ) {
1836        if record.publisher.as_ref() == Some(self.kbuckets.local_key().preimage()) {
1837            // If the (alleged) publisher is the local node, do nothing. The record of
1838            // the original publisher should never change as a result of replication
1839            // and the publisher is always assumed to have the "right" value.
1840            self.queued_events.push_back(ToSwarm::NotifyHandler {
1841                peer_id: source,
1842                handler: NotifyHandler::One(connection),
1843                event: HandlerIn::PutRecordRes {
1844                    key: record.key,
1845                    value: record.value,
1846                    request_id,
1847                },
1848            });
1849            return;
1850        }
1851
1852        let now = Instant::now();
1853
1854        // Calculate the expiration with an exponential decay based on the number
1855        // of nodes between the local node and the closest node to the key (beyond
1856        // the replication factor). This avoids over-caching outside of the k
1857        // closest nodes to a key.
1858        let target = kbucket::Key::new(record.key.clone());
1859        let num_between = self.kbuckets.count_nodes_between(&target);
1860        let k = self.queries.config().replication_factor.get();
1861        let num_beyond_k = (usize::max(k, num_between) - k) as u32;
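            // E.g. num_between = 22 with k = 20 yields num_beyond_k = 2, i.e. the TTL
            // is divided by 2^2 = 4 (see `exp_decrease` below).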
1862        let expiration = self
1863            .record_ttl
1864            .map(|ttl| now + exp_decrease(ttl, num_beyond_k));
1865        // The smaller of the two expirations prevails. Note that `None` orders before
1866        // `Some`, so without a local record TTL the record is stored "forever".
1867        record.expires = record.expires.or(expiration).min(expiration);
1868
1869        if let Some(job) = self.put_record_job.as_mut() {
1870            // Ignore the record in the next run of the replication
1871            // job, since we can assume the sender replicated the
1872            // record to the k closest peers. Effectively, only
1873            // one of the k closest peers performs a replication
1874            // in the configured interval, assuming a shared interval.
1875            job.skip(record.key.clone())
1876        }
1877
1878        // While records received from a publisher, as well as records that do
1879        // not exist locally, should always be stored (or at least attempted),
1880        // there is a choice here w.r.t. the handling of replicated records whose
1881        // keys refer to records that exist locally: the value and / or the
1882        // publisher may either be overridden or left unchanged. At the moment,
1883        // and in the absence of a decisive argument for another option, both are
1884        // always overridden, as this avoids having to load the existing record
1885        // in the first place.
1886
1887        if !record.is_expired(now) {
1888            // The record is cloned because of the weird libp2p protocol
1889            // requirement to send back the value in the response, although this
1890            // is a waste of resources.
1891            match self.record_filtering {
1892                StoreInserts::Unfiltered => match self.store.put(record.clone()) {
1893                    Ok(()) => {
1894                        tracing::debug!(
1895                            record=?record.key,
1896                            "Record stored: {} bytes",
1897                            record.value.len()
1898                        );
1899                        self.queued_events.push_back(ToSwarm::GenerateEvent(
1900                            Event::InboundRequest {
1901                                request: InboundRequest::PutRecord {
1902                                    source,
1903                                    connection,
1904                                    record: None,
1905                                },
1906                            },
1907                        ));
1908                    }
1909                    Err(e) => {
1910                        tracing::info!("Record not stored: {:?}", e);
1911                        self.queued_events.push_back(ToSwarm::NotifyHandler {
1912                            peer_id: source,
1913                            handler: NotifyHandler::One(connection),
1914                            event: HandlerIn::Reset(request_id),
1915                        });
1916
1917                        return;
1918                    }
1919                },
1920                StoreInserts::FilterBoth => {
1921                    self.queued_events
1922                        .push_back(ToSwarm::GenerateEvent(Event::InboundRequest {
1923                            request: InboundRequest::PutRecord {
1924                                source,
1925                                connection,
1926                                record: Some(record.clone()),
1927                            },
1928                        }));
1929                }
1930            }
1931        }
1932
1933        // The remote receives a [`HandlerIn::PutRecordRes`] even in the
1934        // case where the record is discarded due to being expired. Given that
1935        // the remote sent the local node a [`HandlerEvent::PutRecord`]
1936        // request, the remote perceives the local node as one node among the k
1937        // closest nodes to the target. In addition, returning
1938        // [`HandlerIn::PutRecordRes`] does not reveal any internal
1939        // information to a possibly malicious remote node.
1940        self.queued_events.push_back(ToSwarm::NotifyHandler {
1941            peer_id: source,
1942            handler: NotifyHandler::One(connection),
1943            event: HandlerIn::PutRecordRes {
1944                key: record.key,
1945                value: record.value,
1946                request_id,
1947            },
1948        })
1949    }
1950
1951    /// Processes a provider record received from a peer.
1952    fn provider_received(&mut self, key: record::Key, provider: KadPeer) {
1953        if &provider.node_id != self.kbuckets.local_key().preimage() {
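                // Only provider records pointing to other peers are stored; announcements
                // naming the local node itself as the provider are ignored.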
1954            let record = ProviderRecord {
1955                key,
1956                provider: provider.node_id,
1957                expires: self.provider_record_ttl.map(|ttl| Instant::now() + ttl),
1958                addresses: provider.multiaddrs,
1959            };
1960            match self.record_filtering {
1961                StoreInserts::Unfiltered => {
1962                    if let Err(e) = self.store.add_provider(record) {
1963                        tracing::info!("Provider record not stored: {:?}", e);
1964                        return;
1965                    }
1966
1967                    self.queued_events
1968                        .push_back(ToSwarm::GenerateEvent(Event::InboundRequest {
1969                            request: InboundRequest::AddProvider { record: None },
1970                        }));
1971                }
1972                StoreInserts::FilterBoth => {
1973                    self.queued_events
1974                        .push_back(ToSwarm::GenerateEvent(Event::InboundRequest {
1975                            request: InboundRequest::AddProvider {
1976                                record: Some(record),
1977                            },
1978                        }));
1979                }
1980            }
1981        }
1982    }
1983
1984    fn address_failed(&mut self, peer_id: PeerId, address: &Multiaddr) {
1985        let key = kbucket::Key::from(peer_id);
1986
1987        if let Some(addrs) = self.kbuckets.entry(&key).as_mut().and_then(|e| e.value()) {
1988            // TODO: Ideally, the address should only be removed if the error can
1989            // be classified as "permanent" but since `err` is currently a borrowed
1990            // trait object without a `'static` bound, even downcasting for inspection
1991            // of the error is not possible (and also not truly desirable or ergonomic).
1992            // The error passed in should rather be a dedicated enum.
1993            if addrs.remove(address).is_ok() {
1994                tracing::debug!(
1995                    peer=%peer_id,
1996                    %address,
1997                    "Address removed from peer due to error."
1998                );
1999            } else {
2000                // Despite apparently having no reachable address (any longer),
2001                // the peer is kept in the routing table with the last address, so
2002                // that a (temporary) loss of network connectivity does not "flush"
2003                // the routing table. Once in, a peer is only removed from the
2004                // routing table if it is the least recently connected peer, is
2005                // currently disconnected and is unreachable in the context of
2006                // another peer pending insertion into the same bucket. This is
2007                // handled transparently by the `KBucketsTable` and takes effect
2008                // through `KBucketsTable::take_applied_pending` within `Behaviour::poll`.
2009                tracing::debug!(
2010                    peer=%peer_id,
2011                    %address,
2012                    "Last remaining address of peer is unreachable."
2013                );
2014            }
2015        }
2016
2017        for query in self.queries.iter_mut() {
2018            if let Some(addrs) = query.peers.addresses.get_mut(&peer_id) {
2019                addrs.retain(|a| a != address);
2020            }
2021        }
2022    }
2023
2024    fn on_connection_established(
2025        &mut self,
2026        ConnectionEstablished {
2027            peer_id,
2028            failed_addresses,
2029            other_established,
2030            ..
2031        }: ConnectionEstablished,
2032    ) {
2033        for addr in failed_addresses {
2034            self.address_failed(peer_id, addr);
2035        }
2036
2037        // Peer's first connection.
2038        if other_established == 0 {
2039            self.connected_peers.insert(peer_id);
2040        }
2041    }
2042
2043    fn on_address_change(
2044        &mut self,
2045        AddressChange {
2046            peer_id: peer,
2047            old,
2048            new,
2049            ..
2050        }: AddressChange,
2051    ) {
2052        let (old, new) = (old.get_remote_address(), new.get_remote_address());
2053
2054        // Update routing table.
2055        if let Some(addrs) = self
2056            .kbuckets
2057            .entry(&kbucket::Key::from(peer))
2058            .as_mut()
2059            .and_then(|e| e.value())
2060        {
2061            if addrs.replace(old, new) {
2062                tracing::debug!(
2063                    %peer,
2064                    old_address=%old,
2065                    new_address=%new,
2066                    "Old address replaced with new address for peer."
2067                );
2068            } else {
2069                tracing::debug!(
2070                    %peer,
2071                    old_address=%old,
2072                    new_address=%new,
2073                    "Old address not replaced with new address for peer as old address wasn't present.",
2074                );
2075            }
2076        } else {
2077            tracing::debug!(
2078                %peer,
2079                old_address=%old,
2080                new_address=%new,
2081                "Old address not replaced with new address for peer as peer is not present in the \
2082                 routing table."
2083            );
2084        }
2085
2086        // Update query address cache.
2087        //
2088        // Given two connected nodes: local node A and remote node B. Say node B
2089        // is not in node A's routing table. Additionally node B is part of the
2090        // `Query::addresses` list of an ongoing query on node A. Say Node
2091        // B triggers an address change and then disconnects. Later on the
2092        // earlier mentioned query on node A would like to connect to node B.
2093        // Without replacing the address in the `Query::addresses` set, node
2094        // A would attempt to dial the old address instead of the new one.
2095        //
2096        // While upholding correctness, iterating through all discovered
2097        // addresses of a peer in all currently ongoing queries might have a
2098        // large performance impact. If so, the code below might be worth
2099        // revisiting.
2100        for query in self.queries.iter_mut() {
2101            if let Some(addrs) = query.peers.addresses.get_mut(&peer) {
2102                for addr in addrs.iter_mut() {
2103                    if addr == old {
2104                        *addr = new.clone();
2105                    }
2106                }
2107            }
2108        }
2109    }
2110
2111    fn on_dial_failure(&mut self, DialFailure { peer_id, error, .. }: DialFailure) {
2112        let Some(peer_id) = peer_id else { return };
2113
2114        match error {
2115            DialError::LocalPeerId { .. }
2116            | DialError::WrongPeerId { .. }
2117            | DialError::Aborted
2118            | DialError::Denied { .. }
2119            | DialError::Transport(_)
2120            | DialError::NoAddresses => {
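                    // The dial failed: remove the failed addresses (for transport errors)
                    // and report the failure to all ongoing queries.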
2121                if let DialError::Transport(addresses) = error {
2122                    for (addr, _) in addresses {
2123                        self.address_failed(peer_id, addr)
2124                    }
2125                }
2126
2127                for query in self.queries.iter_mut() {
2128                    query.on_failure(&peer_id);
2129                }
2130            }
2131            DialError::DialPeerConditionFalse(
2132                dial_opts::PeerCondition::Disconnected
2133                | dial_opts::PeerCondition::NotDialing
2134                | dial_opts::PeerCondition::DisconnectedAndNotDialing,
2135            ) => {
2136                // We might (still) be connected, or about to be connected, thus do not report the
2137                // failure to the queries.
2138            }
2139            DialError::DialPeerConditionFalse(dial_opts::PeerCondition::Always) => {
2140                unreachable!("DialPeerCondition::Always can not trigger DialPeerConditionFalse.");
2141            }
2142        }
2143    }
2144
2145    fn on_connection_closed(
2146        &mut self,
2147        ConnectionClosed {
2148            peer_id,
2149            remaining_established,
2150            connection_id,
2151            ..
2152        }: ConnectionClosed,
2153    ) {
2154        self.connections.remove(&connection_id);
2155
2156        if remaining_established == 0 {
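                // The last connection to the peer is gone: report the failure to all
                // ongoing queries and mark the peer as disconnected in the routing table.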
2157            for query in self.queries.iter_mut() {
2158                query.on_failure(&peer_id);
2159            }
2160            self.connection_updated(peer_id, None, NodeStatus::Disconnected);
2161            self.connected_peers.remove(&peer_id);
2162        }
2163    }
2164
2165    /// Preloads a new [`Handler`] with requests that are waiting
2166    /// to be sent to the newly connected peer.
2167    fn preload_new_handler(
2168        &mut self,
2169        handler: &mut Handler,
2170        connection_id: ConnectionId,
2171        peer: PeerId,
2172    ) {
2173        self.connections.insert(connection_id, peer);
2174        // Queue events for sending pending RPCs to the connected peer.
2175        // By definition, there can be only one pending RPC per peer and query.
2176        for (_peer_id, event) in self.queries.iter_mut().filter_map(|q| {
2177            q.pending_rpcs
2178                .iter()
2179                .position(|(p, _)| p == &peer)
2180                .map(|p| q.pending_rpcs.remove(p))
2181        }) {
2182            handler.on_behaviour_event(event)
2183        }
2184    }
2185}
2186
2187/// Exponentially decrease the given duration (base 2).
2188fn exp_decrease(ttl: Duration, exp: u32) -> Duration {
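    /// E.g. `exp_decrease(ttl, 2)` yields `ttl / 4` at second granularity; a shift of
    /// 64 bits or more yields a zero duration.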
2189    Duration::from_secs(ttl.as_secs().checked_shr(exp).unwrap_or(0))
2190}
2191
2192impl<TStore> NetworkBehaviour for Behaviour<TStore>
2193where
2194    TStore: RecordStore + Send + 'static,
2195{
2196    type ConnectionHandler = Handler;
2197    type ToSwarm = Event;
2198
2199    fn handle_established_inbound_connection(
2200        &mut self,
2201        connection_id: ConnectionId,
2202        peer: PeerId,
2203        local_addr: &Multiaddr,
2204        remote_addr: &Multiaddr,
2205    ) -> Result<THandler<Self>, ConnectionDenied> {
2206        let connected_point = ConnectedPoint::Listener {
2207            local_addr: local_addr.clone(),
2208            send_back_addr: remote_addr.clone(),
2209        };
2210
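            // Create the handler for this connection and preload it with any RPCs
            // already pending for this peer.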
2211        let mut handler = Handler::new(
2212            self.protocol_config.clone(),
2213            connected_point,
2214            peer,
2215            self.mode,
2216        );
2217        self.preload_new_handler(&mut handler, connection_id, peer);
2218
2219        Ok(handler)
2220    }
2221
2222    fn handle_established_outbound_connection(
2223        &mut self,
2224        connection_id: ConnectionId,
2225        peer: PeerId,
2226        addr: &Multiaddr,
2227        role_override: Endpoint,
2228        port_use: PortUse,
2229    ) -> Result<THandler<Self>, ConnectionDenied> {
2230        let connected_point = ConnectedPoint::Dialer {
2231            address: addr.clone(),
2232            role_override,
2233            port_use,
2234        };
2235
2236        let mut handler = Handler::new(
2237            self.protocol_config.clone(),
2238            connected_point,
2239            peer,
2240            self.mode,
2241        );
2242        self.preload_new_handler(&mut handler, connection_id, peer);
2243
2244        Ok(handler)
2245    }
2246
2247    fn handle_pending_outbound_connection(
2248        &mut self,
2249        _connection_id: ConnectionId,
2250        maybe_peer: Option<PeerId>,
2251        _addresses: &[Multiaddr],
2252        _effective_role: Endpoint,
2253    ) -> Result<Vec<Multiaddr>, ConnectionDenied> {
2254        let peer_id = match maybe_peer {
2255            None => return Ok(vec![]),
2256            Some(peer) => peer,
2257        };
2258
2259        // Addresses should be ordered by decreasing likelihood of connectivity, so start
2260        // with the addresses of that peer in the k-buckets.
2261        let key = kbucket::Key::from(peer_id);
2262        let mut peer_addrs =
2263            if let Some(kbucket::Entry::Present(mut entry, _)) = self.kbuckets.entry(&key) {
2264                let addrs = entry.value().iter().cloned().collect::<Vec<_>>();
2265                debug_assert!(!addrs.is_empty(), "Empty peer addresses in routing table.");
2266                addrs
2267            } else {
2268                Vec::new()
2269            };
2270
2271        // We add to that a temporary list of addresses from the ongoing queries.
2272        for query in self.queries.iter() {
2273            if let Some(addrs) = query.peers.addresses.get(&peer_id) {
2274                peer_addrs.extend(addrs.iter().cloned())
2275            }
2276        }
2277
2278        Ok(peer_addrs)
2279    }
2280
2281    fn on_connection_handler_event(
2282        &mut self,
2283        source: PeerId,
2284        connection: ConnectionId,
2285        event: THandlerOutEvent<Self>,
2286    ) {
2287        match event {
2288            HandlerEvent::ProtocolConfirmed { endpoint } => {
2289                debug_assert!(self.connected_peers.contains(&source));
2290                // The remote's address can only be put into the routing table,
2291                // and thus shared with other nodes, if the local node is the dialer,
2292                // since the remote address on an inbound connection may be specific
2293                // to that connection (e.g. typically the TCP port numbers).
2294                let address = match endpoint {
2295                    ConnectedPoint::Dialer { address, .. } => Some(address),
2296                    ConnectedPoint::Listener { .. } => None,
2297                };
2298
2299                self.connection_updated(source, address, NodeStatus::Connected);
2300            }
2301
2302            HandlerEvent::ProtocolNotSupported { endpoint } => {
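                    // The peer does not support the Kademlia protocol; update the routing
                    // table with a `Disconnected` status (such peers are never newly inserted).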
2303                let address = match endpoint {
2304                    ConnectedPoint::Dialer { address, .. } => Some(address),
2305                    ConnectedPoint::Listener { .. } => None,
2306                };
2307                self.connection_updated(source, address, NodeStatus::Disconnected);
2308            }
2309
2310            HandlerEvent::FindNodeReq { key, request_id } => {
2311                let closer_peers = self
2312                    .find_closest_local_peers(&kbucket::Key::new(key), &source)
2313                    .collect::<Vec<_>>();
2314
2315                self.queued_events
2316                    .push_back(ToSwarm::GenerateEvent(Event::InboundRequest {
2317                        request: InboundRequest::FindNode {
2318                            num_closer_peers: closer_peers.len(),
2319                        },
2320                    }));
2321
2322                self.queued_events.push_back(ToSwarm::NotifyHandler {
2323                    peer_id: source,
2324                    handler: NotifyHandler::One(connection),
2325                    event: HandlerIn::FindNodeRes {
2326                        closer_peers,
2327                        request_id,
2328                    },
2329                });
2330            }
2331
2332            HandlerEvent::FindNodeRes {
2333                closer_peers,
2334                query_id,
2335            } => {
2336                self.discovered(&query_id, &source, closer_peers.iter());
2337            }
2338
2339            HandlerEvent::GetProvidersReq { key, request_id } => {
2340                let provider_peers = self.provider_peers(&key, &source);
2341                let closer_peers = self
2342                    .find_closest_local_peers(&kbucket::Key::new(key), &source)
2343                    .collect::<Vec<_>>();
2344
2345                self.queued_events
2346                    .push_back(ToSwarm::GenerateEvent(Event::InboundRequest {
2347                        request: InboundRequest::GetProvider {
2348                            num_closer_peers: closer_peers.len(),
2349                            num_provider_peers: provider_peers.len(),
2350                        },
2351                    }));
2352
2353                self.queued_events.push_back(ToSwarm::NotifyHandler {
2354                    peer_id: source,
2355                    handler: NotifyHandler::One(connection),
2356                    event: HandlerIn::GetProvidersRes {
2357                        closer_peers,
2358                        provider_peers,
2359                        request_id,
2360                    },
2361                });
2362            }
2363
2364            HandlerEvent::GetProvidersRes {
2365                closer_peers,
2366                provider_peers,
2367                query_id,
2368            } => {
2369                let peers = closer_peers.iter().chain(provider_peers.iter());
2370                self.discovered(&query_id, &source, peers);
2371                if let Some(query) = self.queries.get_mut(&query_id) {
2372                    let stats = query.stats().clone();
2373                    if let QueryInfo::GetProviders {
2374                        ref key,
2375                        ref mut providers_found,
2376                        ref mut step,
2377                        ..
2378                    } = query.info
2379                    {
2380                        *providers_found += provider_peers.len();
2381                        let providers = provider_peers.iter().map(|p| p.node_id).collect();
2382
2383                        self.queued_events.push_back(ToSwarm::GenerateEvent(
2384                            Event::OutboundQueryProgressed {
2385                                id: query_id,
2386                                result: QueryResult::GetProviders(Ok(
2387                                    GetProvidersOk::FoundProviders {
2388                                        key: key.clone(),
2389                                        providers,
2390                                    },
2391                                )),
2392                                step: step.clone(),
2393                                stats,
2394                            },
2395                        ));
2396                        *step = step.next();
2397                    }
2398                }
2399            }
2400            HandlerEvent::QueryError { query_id, error } => {
2401                tracing::debug!(
2402                    peer=%source,
2403                    query=?query_id,
2404                    "Request to peer in query failed with {:?}",
2405                    error
2406                );
2407                // If the query to which the error relates is still active,
2408                // signal the failure w.r.t. `source`.
2409                if let Some(query) = self.queries.get_mut(&query_id) {
2410                    query.on_failure(&source)
2411                }
2412            }
2413
2414            HandlerEvent::AddProvider { key, provider } => {
2415                // Only accept a provider record from a legitimate peer.
2416                if provider.node_id != source {
2417                    return;
2418                }
2419
2420                self.provider_received(key, provider);
2421            }
2422
2423            HandlerEvent::GetRecord { key, request_id } => {
2424                // Lookup the record locally.
2425                let record = match self.store.get(&key) {
2426                    Some(record) => {
2427                        if record.is_expired(Instant::now()) {
2428                            self.store.remove(&key);
2429                            None
2430                        } else {
2431                            Some(record.into_owned())
2432                        }
2433                    }
2434                    None => None,
2435                };
2436
2437                let closer_peers = self
2438                    .find_closest_local_peers(&kbucket::Key::new(key), &source)
2439                    .collect::<Vec<_>>();
2440
2441                self.queued_events
2442                    .push_back(ToSwarm::GenerateEvent(Event::InboundRequest {
2443                        request: InboundRequest::GetRecord {
2444                            num_closer_peers: closer_peers.len(),
2445                            present_locally: record.is_some(),
2446                        },
2447                    }));
2448
2449                self.queued_events.push_back(ToSwarm::NotifyHandler {
2450                    peer_id: source,
2451                    handler: NotifyHandler::One(connection),
2452                    event: HandlerIn::GetRecordRes {
2453                        record,
2454                        closer_peers,
2455                        request_id,
2456                    },
2457                });
2458            }
2459
2460            HandlerEvent::GetRecordRes {
2461                record,
2462                closer_peers,
2463                query_id,
2464            } => {
2465                if let Some(query) = self.queries.get_mut(&query_id) {
2466                    let stats = query.stats().clone();
2467                    if let QueryInfo::GetRecord {
2468                        key,
2469                        ref mut step,
2470                        ref mut found_a_record,
2471                        cache_candidates,
2472                    } = &mut query.info
2473                    {
2474                        if let Some(record) = record {
2475                            *found_a_record = true;
2476                            let record = PeerRecord {
2477                                peer: Some(source),
2478                                record,
2479                            };
2480
2481                            self.queued_events.push_back(ToSwarm::GenerateEvent(
2482                                Event::OutboundQueryProgressed {
2483                                    id: query_id,
2484                                    result: QueryResult::GetRecord(Ok(GetRecordOk::FoundRecord(
2485                                        record,
2486                                    ))),
2487                                    step: step.clone(),
2488                                    stats,
2489                                },
2490                            ));
2491
2492                            *step = step.next();
2493                        } else {
2494                            tracing::trace!(record=?key, %source, "Record not found at source");
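                                // Peers that did not return the record become candidates
                                // for caching it; only the `max_peers` candidates closest
                                // to the key are kept (the farthest entry is evicted).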
2495                            if let Caching::Enabled { max_peers } = self.caching {
2496                                let source_key = kbucket::Key::from(source);
2497                                let target_key = kbucket::Key::from(key.clone());
2498                                let distance = source_key.distance(&target_key);
2499                                cache_candidates.insert(distance, source);
2500                                if cache_candidates.len() > max_peers as usize {
2501                                    cache_candidates.pop_last();
2502                                }
2503                            }
2504                        }
2505                    }
2506                }
2507
2508                self.discovered(&query_id, &source, closer_peers.iter());
2509            }
2510
2511            HandlerEvent::PutRecord { record, request_id } => {
2512                self.record_received(source, connection, request_id, record);
2513            }
2514
2515            HandlerEvent::PutRecordRes { query_id, .. } => {
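                    // A peer acknowledged storing the record: count it towards the quorum
                    // and try to finish the query early once the quorum has been reached.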
2516                if let Some(query) = self.queries.get_mut(&query_id) {
2517                    query.on_success(&source, vec![]);
2518                    if let QueryInfo::PutRecord {
2519                        phase: PutRecordPhase::PutRecord { success, .. },
2520                        quorum,
2521                        ..
2522                    } = &mut query.info
2523                    {
2524                        success.push(source);
2525
2526                        let quorum = quorum.get();
2527                        if success.len() >= quorum {
2528                            let peers = success.clone();
2529                            let finished = query.try_finish(peers.iter());
2530                            if !finished {
2531                                tracing::debug!(
2532                                    peer=%source,
2533                                    query=?query_id,
2534                                    "PutRecord query reached quorum ({}/{}) with response \
2535                                     from peer but could not yet finish.",
2536                                    peers.len(),
2537                                    quorum,
2538                                );
2539                            }
2540                        }
2541                    }
2542                }
2543            }
2544        };
2545    }
2546
2547    #[tracing::instrument(level = "trace", name = "NetworkBehaviour::poll", skip(self, cx))]
2548    fn poll(
2549        &mut self,
2550        cx: &mut Context<'_>,
2551    ) -> Poll<ToSwarm<Self::ToSwarm, THandlerInEvent<Self>>> {
2552        let now = Instant::now();
2553
2554        // Calculate the available capacity for queries triggered by background jobs.
2555        let mut jobs_query_capacity = JOBS_MAX_QUERIES.saturating_sub(self.queries.size());
2556
2557        // Run the periodic provider announcement job.
2558        if let Some(mut job) = self.add_provider_job.take() {
2559            let num = usize::min(JOBS_MAX_NEW_QUERIES, jobs_query_capacity);
2560            for i in 0..num {
2561                if let Poll::Ready(r) = job.poll(cx, &mut self.store, now) {
2562                    self.start_add_provider(r.key, AddProviderContext::Republish)
2563                } else {
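                        // The job is not ready; `i` queries have already been started in
                        // this loop, so reduce the remaining capacity accordingly.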
2564                    jobs_query_capacity -= i;
2565                    break;
2566                }
2567            }
2568            self.add_provider_job = Some(job);
2569        }
2570
2571        // Run the periodic record replication / publication job.
2572        if let Some(mut job) = self.put_record_job.take() {
2573            let num = usize::min(JOBS_MAX_NEW_QUERIES, jobs_query_capacity);
2574            for _ in 0..num {
2575                if let Poll::Ready(r) = job.poll(cx, &mut self.store, now) {
2576                    let context =
2577                        if r.publisher.as_ref() == Some(self.kbuckets.local_key().preimage()) {
2578                            PutRecordContext::Republish
2579                        } else {
2580                            PutRecordContext::Replicate
2581                        };
2582                    self.start_put_record(r, Quorum::All, context)
2583                } else {
2584                    break;
2585                }
2586            }
2587            self.put_record_job = Some(job);
2588        }
2589
2590        // Poll bootstrap periodically and automatically.
2591        if let Poll::Ready(()) = self.bootstrap_status.poll_next_bootstrap(cx) {
2592            if let Err(e) = self.bootstrap() {
2593                tracing::warn!("Failed to trigger bootstrap: {e}");
2594            }
2595        }
2596
2597        loop {
2598            // Drain queued events first.
2599            if let Some(event) = self.queued_events.pop_front() {
2600                return Poll::Ready(event);
2601            }
2602
2603            // Drain applied pending entries from the routing table.
2604            if let Some(entry) = self.kbuckets.take_applied_pending() {
2605                let kbucket::Node { key, value } = entry.inserted;
2606                let peer_id = key.into_preimage();
2607                self.queued_events
2608                    .push_back(ToSwarm::NewExternalAddrOfPeer {
2609                        peer_id,
2610                        address: value.first().clone(),
2611                    });
2612                let event = Event::RoutingUpdated {
2613                    bucket_range: self
2614                        .kbuckets
2615                        .bucket(&key)
2616                        .map(|b| b.range())
2617                        .expect("Self to never be applied from pending."),
2618                    peer: peer_id,
2619                    is_new_peer: true,
2620                    addresses: value,
2621                    old_peer: entry.evicted.map(|n| n.key.into_preimage()),
2622                };
2623                return Poll::Ready(ToSwarm::GenerateEvent(event));
2624            }
2625
2626            // Look for a finished query.
2627            loop {
2628                match self.queries.poll(now) {
2629                    QueryPoolState::Finished(q) => {
2630                        if let Some(event) = self.query_finished(q) {
2631                            return Poll::Ready(ToSwarm::GenerateEvent(event));
2632                        }
2633                    }
2634                    QueryPoolState::Timeout(q) => {
2635                        if let Some(event) = self.query_timeout(q) {
2636                            return Poll::Ready(ToSwarm::GenerateEvent(event));
2637                        }
2638                    }
2639                    QueryPoolState::Waiting(Some((query, peer_id))) => {
2640                        let event = query.info.to_request(query.id());
2641                        // TODO: AddProvider requests yield no response, so the query completes
2642                        // as soon as all requests have been sent. However, the handler should
2643                        // better emit an event when the request has been sent (and report
2644                        // an error if sending fails), instead of immediately reporting
2645                        // "success" somewhat prematurely here.
2646                        if let QueryInfo::AddProvider {
2647                            phase: AddProviderPhase::AddProvider { .. },
2648                            ..
2649                        } = &query.info
2650                        {
2651                            query.on_success(&peer_id, vec![])
2652                        }
2653
2654                        if self.connected_peers.contains(&peer_id) {
2655                            self.queued_events.push_back(ToSwarm::NotifyHandler {
2656                                peer_id,
2657                                event,
2658                                handler: NotifyHandler::Any,
2659                            });
2660                        } else if &peer_id != self.kbuckets.local_key().preimage() {
2661                            query.pending_rpcs.push((peer_id, event));
2662                            self.queued_events.push_back(ToSwarm::Dial {
2663                                opts: DialOpts::peer_id(peer_id).build(),
2664                            });
2665                        }
2666                    }
2667                    QueryPoolState::Waiting(None) | QueryPoolState::Idle => break,
2668                }
2669            }
2670
2671            // No immediate event was produced as a result of a finished query.
2672            // If no new events have been queued either, signal `Poll::Pending` to
2673            // be polled again later.
2674            if self.queued_events.is_empty() {
2675                self.no_events_waker = Some(cx.waker().clone());
2676
2677                return Poll::Pending;
2678            }
2679        }
2680    }
2681
2682    fn on_swarm_event(&mut self, event: FromSwarm) {
2683        self.listen_addresses.on_swarm_event(&event);
2684        let external_addresses_changed = self.external_addresses.on_swarm_event(&event);
2685
2686        if self.auto_mode && external_addresses_changed {
2687            self.determine_mode_from_external_addresses();
2688        }
2689
2690        match event {
2691            FromSwarm::ConnectionEstablished(connection_established) => {
2692                self.on_connection_established(connection_established)
2693            }
2694            FromSwarm::ConnectionClosed(connection_closed) => {
2695                self.on_connection_closed(connection_closed)
2696            }
2697            FromSwarm::DialFailure(dial_failure) => self.on_dial_failure(dial_failure),
2698            FromSwarm::AddressChange(address_change) => self.on_address_change(address_change),
2699            FromSwarm::NewListenAddr(_) if self.connected_peers.is_empty() => {
2700                // A new listen address was just discovered and we have no connected
2701                // peers. This can mean that our network interfaces were down but are
2702                // up again, so it is a good idea to trigger a bootstrap.
2703                self.bootstrap_status.trigger();
2704            }
2705            _ => {}
2706        }
2707    }
2708}
2709
2710/// `PeerInfo` combines a peer ID with a set of multiaddrs that the peer is listening on.
2711#[derive(Debug, Clone, PartialEq, Eq)]
2712pub struct PeerInfo {
2713    pub peer_id: PeerId,
2714    pub addrs: Vec<Multiaddr>,
2715}
2716
2717/// A quorum w.r.t. the configured replication factor specifies the minimum
2718/// number of distinct nodes that must be successfully contacted in order
2719/// for a query to succeed.
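///
/// As a rough illustration (assuming a replication factor of 20, the default
/// `K_VALUE`), the variants resolve to the following peer counts; the private
/// `eval` helper below implements this mapping:
///
/// ```ignore
/// use std::num::NonZeroUsize;
///
/// let total = NonZeroUsize::new(20).expect("20 != 0");
/// assert_eq!(Quorum::One.eval(total).get(), 1);
/// assert_eq!(Quorum::Majority.eval(total).get(), 11); // 20 / 2 + 1
/// assert_eq!(Quorum::All.eval(total).get(), 20);
/// assert_eq!(Quorum::N(NonZeroUsize::new(3).unwrap()).eval(total).get(), 3);
/// ```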
2720#[derive(Debug, Copy, Clone, PartialEq, Eq)]
2721pub enum Quorum {
2722    One,
2723    Majority,
2724    All,
2725    N(NonZeroUsize),
2726}
2727
2728impl Quorum {
2729    /// Evaluates the quorum w.r.t. a given total (number of peers).
2730    fn eval(&self, total: NonZeroUsize) -> NonZeroUsize {
2731        match self {
2732            Quorum::One => NonZeroUsize::new(1).expect("1 != 0"),
2733            Quorum::Majority => NonZeroUsize::new(total.get() / 2 + 1).expect("n + 1 != 0"),
2734            Quorum::All => total,
2735            Quorum::N(n) => NonZeroUsize::min(total, *n),
2736        }
2737    }
2738}
2739
2740/// A record either received by the given peer or retrieved from the local
2741/// record store.
2742#[derive(Debug, Clone, PartialEq, Eq)]
2743pub struct PeerRecord {
2744    /// The peer from whom the record was received. `None` if the record was
2745    /// retrieved from local storage.
2746    pub peer: Option<PeerId>,
2747    pub record: Record,
2748}
2749
2750//////////////////////////////////////////////////////////////////////////////
2751// Events
2752
2753/// The events produced by the `Kademlia` behaviour.
2754///
2755/// See [`NetworkBehaviour::poll`].
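///
/// An illustrative sketch of consuming these events, assuming a `Swarm` named
/// `swarm` that wraps this behaviour and the `futures` `StreamExt` extension
/// trait:
///
/// ```ignore
/// use futures::StreamExt;
/// use libp2p_swarm::SwarmEvent;
///
/// loop {
///     match swarm.select_next_some().await {
///         SwarmEvent::Behaviour(Event::OutboundQueryProgressed { id, result, step, .. }) => {
///             tracing::info!(query=?id, last=%step.last, ?result, "query progressed");
///         }
///         SwarmEvent::Behaviour(Event::RoutingUpdated { peer, is_new_peer, .. }) => {
///             tracing::debug!(%peer, %is_new_peer, "routing table updated");
///         }
///         _ => {}
///     }
/// }
/// ```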
2756#[derive(Debug, Clone)]
2757#[allow(clippy::large_enum_variant)]
2758pub enum Event {
2759    /// An inbound request has been received and handled.
2760    // Note on the difference between 'request' and 'query': A request is a
2761    // single request-response style exchange with a single remote peer. A query
2762    // is made of multiple requests across multiple remote peers.
2763    InboundRequest { request: InboundRequest },
2764
2765    /// An outbound query has made progress.
2766    OutboundQueryProgressed {
2767        /// The ID of the query that finished.
2768        id: QueryId,
2769        /// The intermediate result of the query.
2770        result: QueryResult,
2771        /// Execution statistics from the query.
2772        stats: QueryStats,
2773        /// Indicates which event this is, if there are multiple responses for a single query.
2774        step: ProgressStep,
2775    },
2776
2777    /// The routing table has been updated with a new peer and / or
2778    /// address, thereby possibly evicting another peer.
2779    RoutingUpdated {
2780        /// The ID of the peer that was added or updated.
2781        peer: PeerId,
2782        /// Whether this is a new peer and was thus just added to the routing
2783        /// table, or whether it is an existing peer whose addresses changed.
2784        is_new_peer: bool,
2785        /// The full list of known addresses of `peer`.
2786        addresses: Addresses,
2787        /// The minimum inclusive and maximum inclusive distance for
2788        /// the bucket of the peer.
2789        bucket_range: (Distance, Distance),
2790        /// The ID of the peer that was evicted from the routing table to make
2791        /// room for the new peer, if any.
2792        old_peer: Option<PeerId>,
2793    },
2794
2795    /// A peer has connected for whom no listen address is known.
2796    ///
2797    /// If the peer is to be added to the routing table, a known
2798    /// listen address for the peer must be provided via [`Behaviour::add_address`].
2799    UnroutablePeer { peer: PeerId },
2800
2801    /// A connection to a peer has been established for whom a listen address
2802    /// is known but the peer has not been added to the routing table either
2803    /// because [`BucketInserts::Manual`] is configured or because
2804    /// the corresponding bucket is full.
2805    ///
2806    /// If the peer is to be included in the routing table, it must
2807    /// be explicitly added via [`Behaviour::add_address`], possibly after
2808    /// removing another peer.
2809    ///
2810    /// See [`Behaviour::kbucket`] for insight into the contents of
2811    /// the k-bucket of `peer`.
2812    RoutablePeer { peer: PeerId, address: Multiaddr },
2813
2814    /// A connection to a peer has been established for whom a listen address
2815    /// is known but the peer is only pending insertion into the routing table
2816    /// if the least-recently disconnected peer is unresponsive, i.e. the peer
2817    /// may not make it into the routing table.
2818    ///
2819    /// If the peer is to be unconditionally included in the routing table,
2820    /// it should be explicitly added via [`Behaviour::add_address`] after
2821    /// removing another peer.
2822    ///
2823    /// See [`Behaviour::kbucket`] for insight into the contents of
2824    /// the k-bucket of `peer`.
2825    PendingRoutablePeer { peer: PeerId, address: Multiaddr },
2826
2827    /// This peer's mode has been updated automatically.
2828    ///
2829    /// This happens in response to an external
2830    /// address being added or removed.
2831    ModeChanged { new_mode: Mode },
2832}
2833
2834/// Information about progress events.
2835#[derive(Debug, Clone)]
2836pub struct ProgressStep {
2837    /// The 1-based index of this event within the query's series of progress events.
2838    pub count: NonZeroUsize,
2839    /// Is this the final event?
2840    pub last: bool,
2841}
2842
2843impl ProgressStep {
2844    fn first() -> Self {
2845        Self {
2846            count: NonZeroUsize::new(1).expect("1 to be greater than 0."),
2847            last: false,
2848        }
2849    }
2850
2851    fn first_and_last() -> Self {
2852        let mut first = ProgressStep::first();
2853        first.last = true;
2854        first
2855    }
2856
2857    fn next(&self) -> Self {
2858        assert!(!self.last);
2859        let count = NonZeroUsize::new(self.count.get() + 1).expect("Adding 1 not to result in 0.");
2860
2861        Self { count, last: false }
2862    }
2863}
2864
2865/// Information about a received and handled inbound request.
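///
/// A minimal sketch (assuming the behaviour was configured with
/// [`Config::set_record_filtering`] set to [`StoreInserts::FilterBoth`] and is
/// reachable as `behaviour`) of validating and storing a filtered inbound
/// record manually:
///
/// ```ignore
/// use libp2p_kad::store::RecordStore;
///
/// if let Event::InboundRequest {
///     request: InboundRequest::PutRecord { record: Some(record), .. },
/// } = event
/// {
///     // Apply application-specific validation before accepting the record.
///     if record.value.len() <= 1024 {
///         behaviour
///             .store_mut()
///             .put(record)
///             .expect("failed to store record");
///     }
/// }
/// ```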
2866#[derive(Debug, Clone)]
2867pub enum InboundRequest {
2868    /// Request for the list of nodes whose IDs are the closest to `key`.
2869    FindNode { num_closer_peers: usize },
2870    /// Same as `FindNode`, but should also return the entries of the local
2871    /// providers list for this key.
2872    GetProvider {
2873        num_closer_peers: usize,
2874        num_provider_peers: usize,
2875    },
2876    /// A peer sent an add provider request.
2877    /// If filtering [`StoreInserts::FilterBoth`] is enabled, the [`ProviderRecord`] is
2878    /// included.
2879    ///
2880    /// See [`StoreInserts`] and [`Config::set_record_filtering`] for details.
2881    AddProvider { record: Option<ProviderRecord> },
2882    /// Request to retrieve a record.
2883    GetRecord {
2884        num_closer_peers: usize,
2885        present_locally: bool,
2886    },
2887    /// A peer sent a put record request.
2888    /// If filtering [`StoreInserts::FilterBoth`] is enabled, the [`Record`] is included.
2889    ///
2890    /// See [`StoreInserts`] and [`Config::set_record_filtering`].
2891    PutRecord {
2892        source: PeerId,
2893        connection: ConnectionId,
2894        record: Option<Record>,
2895    },
2896}
2897
2898/// The results of Kademlia queries.
2899#[derive(Debug, Clone)]
2900pub enum QueryResult {
2901    /// The result of [`Behaviour::bootstrap`].
2902    Bootstrap(BootstrapResult),
2903
2904    /// The result of [`Behaviour::get_closest_peers`].
2905    GetClosestPeers(GetClosestPeersResult),
2906
2907    /// The result of [`Behaviour::get_providers`].
2908    GetProviders(GetProvidersResult),
2909
2910    /// The result of [`Behaviour::start_providing`].
2911    StartProviding(AddProviderResult),
2912
2913    /// The result of an (automatic) republishing of a provider record.
2914    RepublishProvider(AddProviderResult),
2915
2916    /// The result of [`Behaviour::get_record`].
2917    GetRecord(GetRecordResult),
2918
2919    /// The result of [`Behaviour::put_record`].
2920    PutRecord(PutRecordResult),
2921
2922    /// The result of an (automatic) republishing of a (value-)record.
2923    RepublishRecord(PutRecordResult),
2924}
2925
2926/// The result of [`Behaviour::get_record`].
2927pub type GetRecordResult = Result<GetRecordOk, GetRecordError>;
2928
2929/// The successful result of [`Behaviour::get_record`].
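///
/// A rough sketch of the manual cache write-back described on
/// [`GetRecordOk::FinishedWithNoAdditionalRecord`], assuming a mutable
/// `behaviour`, a successful result `ok`, and a `record` selected from an
/// earlier `FoundRecord`:
///
/// ```ignore
/// if let GetRecordOk::FinishedWithNoAdditionalRecord { cache_candidates } = ok {
///     // The map is ordered by distance to the record key, closest first.
///     if let Some((_distance, peer)) = cache_candidates.into_iter().next() {
///         behaviour.put_record_to(record.clone(), std::iter::once(peer), Quorum::One);
///     }
/// }
/// ```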
2930#[derive(Debug, Clone)]
2931#[allow(clippy::large_enum_variant)]
2932pub enum GetRecordOk {
2933    FoundRecord(PeerRecord),
2934    FinishedWithNoAdditionalRecord {
2935        /// If caching is enabled, these are the peers closest
2936        /// _to the record key_ (not the local node) that were queried but
2937        /// did not return the record, sorted by distance to the record key
2938        /// from closest to farthest. How many of these are tracked is configured
2939        /// by [`Config::set_caching`].
2940        ///
2941        /// Writing back the cache at these peers is a manual operation,
2942        /// i.e. you may wish to use these candidates with [`Behaviour::put_record_to`]
2943        /// after selecting one of the returned records.
2944        cache_candidates: BTreeMap<kbucket::Distance, PeerId>,
2945    },
2946}
2947
2948/// The error result of [`Behaviour::get_record`].
2949#[derive(Debug, Clone, Error)]
2950pub enum GetRecordError {
2951    #[error("the record was not found")]
2952    NotFound {
2953        key: record::Key,
2954        closest_peers: Vec<PeerId>,
2955    },
2956    #[error("the quorum failed; needed {quorum} peers")]
2957    QuorumFailed {
2958        key: record::Key,
2959        records: Vec<PeerRecord>,
2960        quorum: NonZeroUsize,
2961    },
2962    #[error("the request timed out")]
2963    Timeout { key: record::Key },
2964}
2965
2966impl GetRecordError {
2967    /// Gets the key of the record for which the operation failed.
2968    pub fn key(&self) -> &record::Key {
2969        match self {
2970            GetRecordError::QuorumFailed { key, .. } => key,
2971            GetRecordError::Timeout { key, .. } => key,
2972            GetRecordError::NotFound { key, .. } => key,
2973        }
2974    }
2975
2976    /// Extracts the key of the record for which the operation failed,
2977    /// consuming the error.
2978    pub fn into_key(self) -> record::Key {
2979        match self {
2980            GetRecordError::QuorumFailed { key, .. } => key,
2981            GetRecordError::Timeout { key, .. } => key,
2982            GetRecordError::NotFound { key, .. } => key,
2983        }
2984    }
2985}
2986
2987/// The result of [`Behaviour::put_record`].
2988pub type PutRecordResult = Result<PutRecordOk, PutRecordError>;
2989
2990/// The successful result of [`Behaviour::put_record`].
2991#[derive(Debug, Clone)]
2992pub struct PutRecordOk {
2993    pub key: record::Key,
2994}
2995
2996/// The error result of [`Behaviour::put_record`].
2997#[derive(Debug, Clone, Error)]
2998pub enum PutRecordError {
2999    #[error("the quorum failed; needed {quorum} peers")]
3000    QuorumFailed {
3001        key: record::Key,
3002        /// [`PeerId`]s of the peers the record was successfully stored on.
3003        success: Vec<PeerId>,
3004        quorum: NonZeroUsize,
3005    },
3006    #[error("the request timed out")]
3007    Timeout {
3008        key: record::Key,
3009        /// [`PeerId`]s of the peers the record was successfully stored on.
3010        success: Vec<PeerId>,
3011        quorum: NonZeroUsize,
3012    },
3013}
3014
3015impl PutRecordError {
3016    /// Gets the key of the record for which the operation failed.
3017    pub fn key(&self) -> &record::Key {
3018        match self {
3019            PutRecordError::QuorumFailed { key, .. } => key,
3020            PutRecordError::Timeout { key, .. } => key,
3021        }
3022    }
3023
3024    /// Extracts the key of the record for which the operation failed,
3025    /// consuming the error.
3026    pub fn into_key(self) -> record::Key {
3027        match self {
3028            PutRecordError::QuorumFailed { key, .. } => key,
3029            PutRecordError::Timeout { key, .. } => key,
3030        }
3031    }
3032}
3033
3034/// The result of [`Behaviour::bootstrap`].
3035pub type BootstrapResult = Result<BootstrapOk, BootstrapError>;
3036
3037/// The successful result of [`Behaviour::bootstrap`].
3038#[derive(Debug, Clone)]
3039pub struct BootstrapOk {
3040    pub peer: PeerId,
3041    pub num_remaining: u32,
3042}
3043
3044/// The error result of [`Behaviour::bootstrap`].
3045#[derive(Debug, Clone, Error)]
3046pub enum BootstrapError {
3047    #[error("the request timed out")]
3048    Timeout {
3049        peer: PeerId,
3050        num_remaining: Option<u32>,
3051    },
3052}
3053
3054/// The result of [`Behaviour::get_closest_peers`].
3055pub type GetClosestPeersResult = Result<GetClosestPeersOk, GetClosestPeersError>;
3056
3057/// The successful result of [`Behaviour::get_closest_peers`].
3058#[derive(Debug, Clone)]
3059pub struct GetClosestPeersOk {
3060    pub key: Vec<u8>,
3061    pub peers: Vec<PeerInfo>,
3062}
3063
3064/// The error result of [`Behaviour::get_closest_peers`].
3065#[derive(Debug, Clone, Error)]
3066pub enum GetClosestPeersError {
3067    #[error("the request timed out")]
3068    Timeout { key: Vec<u8>, peers: Vec<PeerInfo> },
3069}
3070
3071impl GetClosestPeersError {
3072    /// Gets the key for which the operation failed.
3073    pub fn key(&self) -> &Vec<u8> {
3074        match self {
3075            GetClosestPeersError::Timeout { key, .. } => key,
3076        }
3077    }
3078
3079    /// Extracts the key for which the operation failed,
3080    /// consuming the error.
3081    pub fn into_key(self) -> Vec<u8> {
3082        match self {
3083            GetClosestPeersError::Timeout { key, .. } => key,
3084        }
3085    }
3086}
3087
3088/// The result of [`Behaviour::get_providers`].
3089pub type GetProvidersResult = Result<GetProvidersOk, GetProvidersError>;
3090
3091/// The successful result of [`Behaviour::get_providers`].
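///
/// Since a `get_providers` query reports providers incrementally, callers
/// typically accumulate them across progress events. A minimal sketch, assuming
/// an `all_providers` set kept alongside the event loop and a query `result`
/// taken from [`Event::OutboundQueryProgressed`]:
///
/// ```ignore
/// if let QueryResult::GetProviders(Ok(ok)) = result {
///     match ok {
///         GetProvidersOk::FoundProviders { providers, .. } => {
///             // Each progress event only carries newly discovered providers.
///             all_providers.extend(providers);
///         }
///         GetProvidersOk::FinishedWithNoAdditionalRecord { .. } => {
///             tracing::info!("found {} providers in total", all_providers.len());
///         }
///     }
/// }
/// ```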
3092#[derive(Debug, Clone)]
3093pub enum GetProvidersOk {
3094    FoundProviders {
3095        key: record::Key,
3096        /// The new set of providers discovered.
3097        providers: HashSet<PeerId>,
3098    },
3099    FinishedWithNoAdditionalRecord {
3100        closest_peers: Vec<PeerId>,
3101    },
3102}
3103
3104/// The error result of [`Behaviour::get_providers`].
3105#[derive(Debug, Clone, Error)]
3106pub enum GetProvidersError {
3107    #[error("the request timed out")]
3108    Timeout {
3109        key: record::Key,
3110        closest_peers: Vec<PeerId>,
3111    },
3112}
3113
3114impl GetProvidersError {
3115    /// Gets the key for which the operation failed.
3116    pub fn key(&self) -> &record::Key {
3117        match self {
3118            GetProvidersError::Timeout { key, .. } => key,
3119        }
3120    }
3121
3122    /// Extracts the key for which the operation failed,
3123    /// consuming the error.
3124    pub fn into_key(self) -> record::Key {
3125        match self {
3126            GetProvidersError::Timeout { key, .. } => key,
3127        }
3128    }
3129}
3130
3131/// The result of publishing a provider record.
3132pub type AddProviderResult = Result<AddProviderOk, AddProviderError>;
3133
3134/// The successful result of publishing a provider record.
3135#[derive(Debug, Clone)]
3136pub struct AddProviderOk {
3137    pub key: record::Key,
3138}
3139
3140/// The possible errors when publishing a provider record.
3141#[derive(Debug, Clone, Error)]
3142pub enum AddProviderError {
3143    #[error("the request timed out")]
3144    Timeout { key: record::Key },
3145}
3146
3147impl AddProviderError {
3148    /// Gets the key for which the operation failed.
3149    pub fn key(&self) -> &record::Key {
3150        match self {
3151            AddProviderError::Timeout { key, .. } => key,
3152        }
3153    }
3154
3155    /// Extracts the key for which the operation failed, consuming the error.
3156    pub fn into_key(self) -> record::Key {
3157        match self {
3158            AddProviderError::Timeout { key, .. } => key,
3159        }
3160    }
3161}
3162
3163impl From<kbucket::EntryView<kbucket::Key<PeerId>, Addresses>> for KadPeer {
3164    fn from(e: kbucket::EntryView<kbucket::Key<PeerId>, Addresses>) -> KadPeer {
3165        KadPeer {
3166            node_id: e.node.key.into_preimage(),
3167            multiaddrs: e.node.value.into_vec(),
3168            connection_ty: match e.status {
3169                NodeStatus::Connected => ConnectionType::Connected,
3170                NodeStatus::Disconnected => ConnectionType::NotConnected,
3171            },
3172        }
3173    }
3174}
3175
3176/// The context of a [`QueryInfo::AddProvider`] query.
3177#[derive(Debug, Copy, Clone, PartialEq, Eq)]
3178pub enum AddProviderContext {
3179    /// The context is a [`Behaviour::start_providing`] operation.
3180    Publish,
3181    /// The context is periodic republishing of provider announcements
3182    /// initiated earlier via [`Behaviour::start_providing`].
3183    Republish,
3184}
3185
3186/// The context of a [`QueryInfo::PutRecord`] query.
3187#[derive(Debug, Copy, Clone, PartialEq, Eq)]
3188pub enum PutRecordContext {
3189    /// The context is a [`Behaviour::put_record`] operation.
3190    Publish,
3191    /// The context is periodic republishing of records stored
3192    /// earlier via [`Behaviour::put_record`].
3193    Republish,
3194    /// The context is periodic replication (i.e. without extending
3195    /// the record TTL) of stored records received earlier from another peer.
3196    Replicate,
3197    /// The context is a custom store operation targeting specific
3198    /// peers initiated by [`Behaviour::put_record_to`].
3199    Custom,
3200}
3201
3202/// Information about a running query.
3203#[derive(Debug, Clone)]
3204pub enum QueryInfo {
3205    /// A query initiated by [`Behaviour::bootstrap`].
3206    Bootstrap {
3207        /// The targeted peer ID.
3208        peer: PeerId,
3209        /// The remaining random peer IDs to query, one per
3210        /// bucket that still needs refreshing.
3211        ///
3212        /// This is `None` if the initial self-lookup has not
3213        /// yet completed and `Some` with an exhausted iterator
3214        /// if bootstrapping is complete.
3215        remaining: Option<vec::IntoIter<kbucket::Key<PeerId>>>,
3216        step: ProgressStep,
3217    },
3218
3219    /// A (repeated) query initiated by [`Behaviour::get_closest_peers`].
3220    GetClosestPeers {
3221        /// The key being queried (the preimage).
3222        key: Vec<u8>,
3223        /// Current index of events.
3224        step: ProgressStep,
3225        /// The expected number of responding peers.
3226        num_results: NonZeroUsize,
3227    },
3228
3229    /// A (repeated) query initiated by [`Behaviour::get_providers`].
3230    GetProviders {
3231        /// The key for which to search for providers.
3232        key: record::Key,
3233        /// The number of providers found so far.
3234        providers_found: usize,
3235        /// Current index of events.
3236        step: ProgressStep,
3237    },
3238
3239    /// A (repeated) query initiated by [`Behaviour::start_providing`].
3240    AddProvider {
3241        /// The record key.
3242        key: record::Key,
3243        /// The current phase of the query.
3244        phase: AddProviderPhase,
3245        /// The execution context of the query.
3246        context: AddProviderContext,
3247    },
3248
3249    /// A (repeated) query initiated by [`Behaviour::put_record`].
3250    PutRecord {
3251        record: Record,
3252        /// The expected quorum of responses w.r.t. the replication factor.
3253        quorum: NonZeroUsize,
3254        /// The current phase of the query.
3255        phase: PutRecordPhase,
3256        /// The execution context of the query.
3257        context: PutRecordContext,
3258    },
3259
3260    /// A (repeated) query initiated by [`Behaviour::get_record`].
3261    GetRecord {
3262        /// The key to look for.
3263        key: record::Key,
3264        /// Current index of events.
3265        step: ProgressStep,
3266        /// Did we find at least one record?
3267        found_a_record: bool,
3268        /// The peers closest to the `key` that were queried but did not return a record,
3269        /// i.e. the peers that are candidates for caching the record.
3270        cache_candidates: BTreeMap<kbucket::Distance, PeerId>,
3271    },
3272}
3273
3274impl QueryInfo {
3275    /// Creates an event for a handler to issue an outgoing request in the
3276    /// context of a query.
3277    fn to_request(&self, query_id: QueryId) -> HandlerIn {
3278        match &self {
3279            QueryInfo::Bootstrap { peer, .. } => HandlerIn::FindNodeReq {
3280                key: peer.to_bytes(),
3281                query_id,
3282            },
3283            QueryInfo::GetClosestPeers { key, .. } => HandlerIn::FindNodeReq {
3284                key: key.clone(),
3285                query_id,
3286            },
3287            QueryInfo::GetProviders { key, .. } => HandlerIn::GetProvidersReq {
3288                key: key.clone(),
3289                query_id,
3290            },
3291            QueryInfo::AddProvider { key, phase, .. } => match phase {
3292                AddProviderPhase::GetClosestPeers => HandlerIn::FindNodeReq {
3293                    key: key.to_vec(),
3294                    query_id,
3295                },
3296                AddProviderPhase::AddProvider {
3297                    provider_id,
3298                    external_addresses,
3299                    ..
3300                } => HandlerIn::AddProvider {
3301                    key: key.clone(),
3302                    provider: crate::protocol::KadPeer {
3303                        node_id: *provider_id,
3304                        multiaddrs: external_addresses.clone(),
3305                        connection_ty: crate::protocol::ConnectionType::Connected,
3306                    },
3307                    query_id,
3308                },
3309            },
3310            QueryInfo::GetRecord { key, .. } => HandlerIn::GetRecord {
3311                key: key.clone(),
3312                query_id,
3313            },
3314            QueryInfo::PutRecord { record, phase, .. } => match phase {
3315                PutRecordPhase::GetClosestPeers => HandlerIn::FindNodeReq {
3316                    key: record.key.to_vec(),
3317                    query_id,
3318                },
3319                PutRecordPhase::PutRecord { .. } => HandlerIn::PutRecord {
3320                    record: record.clone(),
3321                    query_id,
3322                },
3323            },
3324        }
3325    }
3326}
3327
3328/// The phases of a [`QueryInfo::AddProvider`] query.
3329#[derive(Debug, Clone)]
3330pub enum AddProviderPhase {
3331    /// The query is searching for the closest nodes to the record key.
3332    GetClosestPeers,
3333
3334    /// The query advertises the local node as a provider for the key to
3335    /// the closest nodes to the key.
3336    AddProvider {
3337        /// The local peer ID that is advertised as a provider.
3338        provider_id: PeerId,
3339        /// The external addresses of the provider being advertised.
3340        external_addresses: Vec<Multiaddr>,
3341        /// Query statistics from the finished `GetClosestPeers` phase.
3342        get_closest_peers_stats: QueryStats,
3343    },
3344}
3345
3346/// The phases of a [`QueryInfo::PutRecord`] query.
3347#[derive(Debug, Clone, PartialEq, Eq)]
3348pub enum PutRecordPhase {
3349    /// The query is searching for the closest nodes to the record key.
3350    GetClosestPeers,
3351
3352    /// The query is replicating the record to the closest nodes to the key.
3353    PutRecord {
3354        /// A list of peers the given record has been successfully replicated to.
3355        success: Vec<PeerId>,
3356        /// Query statistics from the finished `GetClosestPeers` phase.
3357        get_closest_peers_stats: QueryStats,
3358    },
3359}
3360
3361/// A mutable reference to a running query.
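///
/// Obtained via [`Behaviour::query_mut`]. A minimal sketch, assuming a mutable
/// `behaviour` and the `QueryId` returned when the query was started:
///
/// ```ignore
/// if let Some(mut query) = behaviour.query_mut(&query_id) {
///     // Cut the query short once enough progress has been observed.
///     tracing::debug!(requests = query.stats().num_requests(), "finishing query early");
///     query.finish();
/// }
/// ```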
3362pub struct QueryMut<'a> {
3363    query: &'a mut Query,
3364}
3365
3366impl QueryMut<'_> {
3367    pub fn id(&self) -> QueryId {
3368        self.query.id()
3369    }
3370
3371    /// Gets information about the type and state of the query.
3372    pub fn info(&self) -> &QueryInfo {
3373        &self.query.info
3374    }
3375
3376    /// Gets execution statistics about the query.
3377    ///
3378    /// For a multi-phase query such as `put_record`, these are the
3379    /// statistics of the current phase.
3380    pub fn stats(&self) -> &QueryStats {
3381        self.query.stats()
3382    }
3383
3384    /// Finishes the query as soon as possible, without waiting for the
3385    /// regular termination conditions.
3386    pub fn finish(&mut self) {
3387        self.query.finish()
3388    }
3389}
3390
3391/// An immutable reference to a running query.
3392pub struct QueryRef<'a> {
3393    query: &'a Query,
3394}
3395
3396impl QueryRef<'_> {
3397    pub fn id(&self) -> QueryId {
3398        self.query.id()
3399    }
3400
3401    /// Gets information about the type and state of the query.
3402    pub fn info(&self) -> &QueryInfo {
3403        &self.query.info
3404    }
3405
3406    /// Gets execution statistics about the query.
3407    ///
3408    /// For a multi-phase query such as `put_record`, these are the
3409    /// statistics of the current phase.
3410    pub fn stats(&self) -> &QueryStats {
3411        self.query.stats()
3412    }
3413}
3414
3415/// An operation failed due to no known peers in the routing table.
3416#[derive(Debug, Clone)]
3417pub struct NoKnownPeers();
3418
3419impl fmt::Display for NoKnownPeers {
3420    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
3421        write!(f, "No known peers.")
3422    }
3423}
3424
3425impl std::error::Error for NoKnownPeers {}
3426
3427/// The possible outcomes of [`Behaviour::add_address`].
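///
/// A minimal sketch of reacting to the outcome, assuming a mutable `behaviour`
/// together with a known `peer_id` and `addr`:
///
/// ```ignore
/// match behaviour.add_address(&peer_id, addr) {
///     RoutingUpdate::Success => tracing::debug!(%peer_id, "added to routing table"),
///     RoutingUpdate::Pending => tracing::debug!(%peer_id, "pending insertion into routing table"),
///     RoutingUpdate::Failed => tracing::warn!(%peer_id, "could not add to routing table"),
/// }
/// ```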
3428#[derive(Debug, Clone, Copy, PartialEq, Eq)]
3429pub enum RoutingUpdate {
3430    /// The given peer and address have been added to the routing
3431    /// table.
3432    Success,
3433    /// The peer and address are pending insertion into
3434    /// the routing table, if a disconnected peer fails
3435    /// to respond. If the given peer and address end up
3436    /// in the routing table, [`Event::RoutingUpdated`]
3437    /// is eventually emitted.
3438    Pending,
3439    /// The routing table update failed, either because the
3440    /// corresponding bucket for the peer is full and the
3441    /// pending slot(s) are occupied, or because the given
3442    /// peer ID is deemed invalid (e.g. refers to the local
3443    /// peer ID).
3444    Failed,
3445}
3446
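/// The mode the local node operates in.
///
/// In server mode the node answers inbound Kademlia requests and can be added
/// to other peers' routing tables; in client mode it only issues outbound
/// queries. When automatic mode is active, the mode is derived from whether
/// confirmed external addresses are present.
///
/// A minimal sketch of overriding the automatic selection, assuming a mutable
/// `behaviour`:
///
/// ```ignore
/// // Force server mode regardless of confirmed external addresses.
/// behaviour.set_mode(Some(Mode::Server));
///
/// // Hand control back to automatic mode selection.
/// behaviour.set_mode(None);
/// ```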
3447#[derive(PartialEq, Copy, Clone, Debug)]
3448pub enum Mode {
3449    Client,
3450    Server,
3451}
3452
3453impl fmt::Display for Mode {
3454    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
3455        match self {
3456            Mode::Client => write!(f, "client"),
3457            Mode::Server => write!(f, "server"),
3458        }
3459    }
3460}
3461
3462fn to_comma_separated_list<T>(confirmed_external_addresses: &[T]) -> String
3463where
3464    T: ToString,
3465{
3466    confirmed_external_addresses
3467        .iter()
3468        .map(|addr| addr.to_string())
3469        .collect::<Vec<_>>()
3470        .join(", ")
3471}