libp2p_kad/behaviour.rs

// Copyright 2018 Parity Technologies (UK) Ltd.
//
// Permission is hereby granted, free of charge, to any person obtaining a
// copy of this software and associated documentation files (the "Software"),
// to deal in the Software without restriction, including without limitation
// the rights to use, copy, modify, merge, publish, distribute, sublicense,
// and/or sell copies of the Software, and to permit persons to whom the
// Software is furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.

//! Implementation of the `Kademlia` network behaviour.

mod test;

use std::{
    collections::{BTreeMap, HashMap, HashSet, VecDeque},
    fmt,
    num::NonZeroUsize,
    task::{Context, Poll, Waker},
    time::Duration,
    vec,
};

use fnv::FnvHashSet;
use libp2p_core::{transport::PortUse, ConnectedPoint, Endpoint, Multiaddr};
use libp2p_identity::PeerId;
use libp2p_swarm::{
    behaviour::{AddressChange, ConnectionClosed, ConnectionEstablished, DialFailure, FromSwarm},
    dial_opts::{self, DialOpts},
    ConnectionDenied, ConnectionHandler, ConnectionId, DialError, ExternalAddresses,
    ListenAddresses, NetworkBehaviour, NotifyHandler, StreamProtocol, THandler, THandlerInEvent,
    THandlerOutEvent, ToSwarm,
};
use thiserror::Error;
use tracing::Level;
use web_time::Instant;

pub use crate::query::QueryStats;
use crate::{
    addresses::Addresses,
    bootstrap,
    handler::{Handler, HandlerEvent, HandlerIn, RequestId},
    jobs::*,
    kbucket::{self, Distance, KBucketConfig, KBucketsTable, NodeStatus},
    protocol,
    protocol::{ConnectionType, KadPeer, ProtocolConfig},
    query::{Query, QueryConfig, QueryId, QueryPool, QueryPoolState},
    record::{
        self,
        store::{self, RecordStore},
        ProviderRecord, Record,
    },
    K_VALUE,
};

/// `Behaviour` is a `NetworkBehaviour` that implements the libp2p
/// Kademlia protocol.
pub struct Behaviour<TStore> {
    /// The Kademlia routing table.
    kbuckets: KBucketsTable<kbucket::Key<PeerId>, Addresses>,

    /// The k-bucket insertion strategy.
    kbucket_inserts: BucketInserts,

    /// Configuration of the wire protocol.
    protocol_config: ProtocolConfig,

    /// Configuration of [`RecordStore`] filtering.
    record_filtering: StoreInserts,

    /// The currently active (i.e. in-progress) queries.
    queries: QueryPool,

    /// The currently connected peers.
    ///
    /// This is a superset of the connected peers currently in the routing table.
    connected_peers: FnvHashSet<PeerId>,

    /// Periodic job for re-publication of provider records for keys
    /// provided by the local node.
    add_provider_job: Option<AddProviderJob>,

    /// Periodic job for (re-)replication and (re-)publishing of
    /// regular (value-)records.
    put_record_job: Option<PutRecordJob>,

    /// The TTL of regular (value-)records.
    record_ttl: Option<Duration>,

    /// The TTL of provider records.
    provider_record_ttl: Option<Duration>,

    /// Queued events to return when the behaviour is being polled.
    queued_events: VecDeque<ToSwarm<Event, HandlerIn>>,

    listen_addresses: ListenAddresses,

    external_addresses: ExternalAddresses,

    connections: HashMap<ConnectionId, PeerId>,

    /// See [`Config::caching`].
    caching: Caching,

    local_peer_id: PeerId,

    mode: Mode,
    auto_mode: bool,
    no_events_waker: Option<Waker>,

    /// The record storage.
    store: TStore,

    /// Tracks the status of the current bootstrap.
    bootstrap_status: bootstrap::Status,
}

/// The configurable strategies for the insertion of peers
/// and their addresses into the k-buckets of the Kademlia
/// routing table.
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub enum BucketInserts {
    /// Whenever a connection to a peer is established as a
    /// result of a dialing attempt and that peer is not yet
    /// in the routing table, it is inserted as long as there
    /// is a free slot in the corresponding k-bucket. If the
    /// k-bucket is full but still has a free pending slot,
    /// it may be inserted into the routing table at a later time if an unresponsive
    /// disconnected peer is evicted from the bucket.
    OnConnected,
    /// New peers and addresses are only added to the routing table via
    /// explicit calls to [`Behaviour::add_address`].
    ///
    /// > **Note**: Even though peers can only get into the
    /// > routing table as a result of [`Behaviour::add_address`],
    /// > routing table entries are still updated as peers
    /// > connect and disconnect (i.e. the order of the entries
    /// > as well as the network addresses).
    Manual,
}

/// The configurable filtering strategies for the acceptance of
/// incoming records.
///
/// This can be used for e.g. signature verification or validating
/// the accompanying [`Key`].
///
/// [`Key`]: crate::record::Key
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub enum StoreInserts {
    /// Whenever a (provider) record is received,
    /// the record is forwarded immediately to the [`RecordStore`].
    Unfiltered,
    /// Whenever a (provider) record is received, an event is emitted.
    /// Provider records generate a [`InboundRequest::AddProvider`] under
    /// [`Event::InboundRequest`], normal records generate a [`InboundRequest::PutRecord`]
    /// under [`Event::InboundRequest`].
    ///
    /// When deemed valid, a (provider) record needs to be explicitly stored in
    /// the [`RecordStore`] via [`RecordStore::put`] or [`RecordStore::add_provider`],
    /// whichever is applicable. A mutable reference to the [`RecordStore`] can
    /// be retrieved via [`Behaviour::store_mut`].
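    ///
    /// A handling sketch added for illustration (assumes the swarm event loop
    /// and the `Event::InboundRequest` shape; `behaviour` and `event` are
    /// hypothetical bindings):
    ///
    /// ```ignore
    /// if let Event::InboundRequest {
    ///     request: InboundRequest::PutRecord { record: Some(record), .. },
    /// } = event
    /// {
    ///     // Validate `record` here (e.g. verify a signature), then store it explicitly.
    ///     behaviour.store_mut().put(record).expect("record accepted by store");
    /// }
    /// ```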
    FilterBoth,
}

/// The configuration for the `Kademlia` behaviour.
///
/// The configuration is consumed by [`Behaviour::new`].
#[derive(Debug, Clone)]
pub struct Config {
    kbucket_config: KBucketConfig,
    query_config: QueryConfig,
    protocol_config: ProtocolConfig,
    record_ttl: Option<Duration>,
    record_replication_interval: Option<Duration>,
    record_publication_interval: Option<Duration>,
    record_filtering: StoreInserts,
    provider_record_ttl: Option<Duration>,
    provider_publication_interval: Option<Duration>,
    kbucket_inserts: BucketInserts,
    caching: Caching,
    periodic_bootstrap_interval: Option<Duration>,
    automatic_bootstrap_throttle: Option<Duration>,
}

impl Default for Config {
    /// Returns the default configuration.
    ///
    /// Deprecated: use `Config::new` instead.
    fn default() -> Self {
        Self::new(protocol::DEFAULT_PROTO_NAME)
    }
}

/// The configuration for Kademlia "write-back" caching after successful
/// lookups via [`Behaviour::get_record`].
#[derive(Debug, Clone)]
pub enum Caching {
    /// Caching is disabled and the peers closest to records being looked up
    /// that do not return a record are not tracked, i.e.
    /// [`GetRecordOk::FinishedWithNoAdditionalRecord`] is always empty.
    Disabled,
    /// Up to `max_peers` peers not returning a record that are closest to the key
    /// being looked up are tracked and returned in
    /// [`GetRecordOk::FinishedWithNoAdditionalRecord`]. The write-back operation must be
    /// performed explicitly, if desired and after choosing a record from the results, via
    /// [`Behaviour::put_record_to`].
    Enabled { max_peers: u16 },
}

impl Config {
    /// Builds a new `Config` with the given protocol name.
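    ///
    /// A minimal sketch added for illustration (`/my-app/kad/1.0.0` is a
    /// hypothetical protocol name):
    ///
    /// ```ignore
    /// use std::time::Duration;
    ///
    /// use libp2p_swarm::StreamProtocol;
    ///
    /// let mut config = Config::new(StreamProtocol::new("/my-app/kad/1.0.0"));
    /// config
    ///     .set_query_timeout(Duration::from_secs(120))
    ///     .set_record_filtering(StoreInserts::FilterBoth);
    /// ```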
    pub fn new(protocol_name: StreamProtocol) -> Self {
        Config {
            kbucket_config: KBucketConfig::default(),
            query_config: QueryConfig::default(),
            protocol_config: ProtocolConfig::new(protocol_name),
            record_ttl: Some(Duration::from_secs(48 * 60 * 60)),
            record_replication_interval: Some(Duration::from_secs(60 * 60)),
            record_publication_interval: Some(Duration::from_secs(22 * 60 * 60)),
            record_filtering: StoreInserts::Unfiltered,
            provider_publication_interval: Some(Duration::from_secs(12 * 60 * 60)),
            provider_record_ttl: Some(Duration::from_secs(48 * 60 * 60)),
            kbucket_inserts: BucketInserts::OnConnected,
            caching: Caching::Enabled { max_peers: 1 },
            periodic_bootstrap_interval: Some(Duration::from_secs(5 * 60)),
            automatic_bootstrap_throttle: Some(bootstrap::DEFAULT_AUTOMATIC_THROTTLE),
        }
    }

    /// Sets the timeout for a single query.
    ///
    /// > **Note**: A single query usually comprises at least as many requests
    /// > as the replication factor, i.e. this is not a request timeout.
    ///
    /// The default is 60 seconds.
    pub fn set_query_timeout(&mut self, timeout: Duration) -> &mut Self {
        self.query_config.timeout = timeout;
        self
    }

    /// Sets the replication factor to use.
    ///
    /// The replication factor determines to how many closest peers
    /// a record is replicated. The default is [`crate::K_VALUE`].
    pub fn set_replication_factor(&mut self, replication_factor: NonZeroUsize) -> &mut Self {
        self.query_config.replication_factor = replication_factor;
        self
    }

    /// Sets the allowed level of parallelism for iterative queries.
    ///
    /// The `α` parameter in the Kademlia paper. The maximum number of peers
    /// that an iterative query is allowed to wait for in parallel while
    /// iterating towards the closest nodes to a target. Defaults to
    /// `ALPHA_VALUE`.
    ///
    /// This only controls the level of parallelism of an iterative query, not
    /// the level of parallelism of a query to a fixed set of peers.
    ///
    /// When used with [`Config::disjoint_query_paths`] it equals
    /// the amount of disjoint paths used.
    pub fn set_parallelism(&mut self, parallelism: NonZeroUsize) -> &mut Self {
        self.query_config.parallelism = parallelism;
        self
    }

    /// Require iterative queries to use disjoint paths for increased resiliency
    /// in the presence of potentially adversarial nodes.
    ///
    /// When enabled the number of disjoint paths used equals the configured
    /// parallelism.
    ///
    /// See the S/Kademlia paper for more information on the high level design
    /// as well as its security improvements.
    pub fn disjoint_query_paths(&mut self, enabled: bool) -> &mut Self {
        self.query_config.disjoint_query_paths = enabled;
        self
    }

    /// Sets the TTL for stored records.
    ///
    /// The TTL should be significantly longer than the (re-)publication
    /// interval, to avoid premature expiration of records. The default is
    /// 48 hours.
    ///
    /// `None` means records never expire.
    ///
    /// Does not apply to provider records.
    pub fn set_record_ttl(&mut self, record_ttl: Option<Duration>) -> &mut Self {
        self.record_ttl = record_ttl;
        self
    }

    /// Sets whether or not records should be filtered before being stored.
    ///
    /// See [`StoreInserts`] for the different values.
    /// Defaults to [`StoreInserts::Unfiltered`].
    pub fn set_record_filtering(&mut self, filtering: StoreInserts) -> &mut Self {
        self.record_filtering = filtering;
        self
    }

    /// Sets the (re-)replication interval for stored records.
    ///
    /// Periodic replication of stored records ensures that the records
    /// are always replicated to the available nodes closest to the key in the
    /// context of DHT topology changes (i.e. nodes joining and leaving), thus
    /// ensuring persistence until the record expires. Replication does not
    /// prolong the regular lifetime of a record (for otherwise it would live
    /// forever regardless of the configured TTL). The expiry of a record
    /// is only extended through re-publication.
    ///
    /// This interval should be significantly shorter than the publication
    /// interval, to ensure persistence between re-publications. The default
    /// is 1 hour.
    ///
    /// `None` means that stored records are never re-replicated.
    ///
    /// Does not apply to provider records.
    pub fn set_replication_interval(&mut self, interval: Option<Duration>) -> &mut Self {
        self.record_replication_interval = interval;
        self
    }

    /// Sets the (re-)publication interval of stored records.
    ///
    /// Records persist in the DHT until they expire. By default, published
    /// records are re-published in regular intervals for as long as the record
    /// exists in the local storage of the original publisher, thereby extending
    /// the record's lifetime.
    ///
    /// This interval should be significantly shorter than the record TTL, to
    /// ensure records do not expire prematurely. The default is 22 hours.
    ///
    /// `None` means that stored records are never automatically re-published.
    ///
    /// Does not apply to provider records.
    pub fn set_publication_interval(&mut self, interval: Option<Duration>) -> &mut Self {
        self.record_publication_interval = interval;
        self
    }

    /// Sets the TTL for provider records.
    ///
    /// `None` means that stored provider records never expire.
    ///
    /// Must be significantly larger than the provider publication interval.
    pub fn set_provider_record_ttl(&mut self, ttl: Option<Duration>) -> &mut Self {
        self.provider_record_ttl = ttl;
        self
    }

    /// Sets the interval at which provider records for keys provided
    /// by the local node are re-published.
    ///
    /// `None` means that stored provider records are never automatically
    /// re-published.
    ///
    /// Must be significantly less than the provider record TTL.
    pub fn set_provider_publication_interval(&mut self, interval: Option<Duration>) -> &mut Self {
        self.provider_publication_interval = interval;
        self
    }

    /// Modifies the maximum allowed size of individual Kademlia packets.
    ///
    /// It might be necessary to increase this value if trying to put large
    /// records.
    pub fn set_max_packet_size(&mut self, size: usize) -> &mut Self {
        self.protocol_config.set_max_packet_size(size);
        self
    }

    /// Modifies the timeout duration of outbound substreams.
    ///
    /// * Defaults to `10` seconds.
    /// * May need to be increased when sending large records over a poor connection.
    pub fn set_substreams_timeout(&mut self, timeout: Duration) -> &mut Self {
        self.protocol_config.set_substreams_timeout(timeout);
        self
    }

    /// Sets the k-bucket insertion strategy for the Kademlia routing table.
    pub fn set_kbucket_inserts(&mut self, inserts: BucketInserts) -> &mut Self {
        self.kbucket_inserts = inserts;
        self
    }

    /// Sets the [`Caching`] strategy to use for successful lookups.
    ///
    /// The default is [`Caching::Enabled`] with a `max_peers` of 1.
    /// Hence, with default settings and a lookup quorum of 1, a successful lookup
    /// will result in the record being cached at the closest node to the key that
    /// did not return the record, i.e. the standard Kademlia behaviour.
    pub fn set_caching(&mut self, c: Caching) -> &mut Self {
        self.caching = c;
        self
    }

    /// Sets the interval on which [`Behaviour::bootstrap`] is called periodically.
    ///
    /// * Defaults to `5` minutes.
    /// * Set to `None` to disable periodic bootstrap.
    pub fn set_periodic_bootstrap_interval(&mut self, interval: Option<Duration>) -> &mut Self {
        self.periodic_bootstrap_interval = interval;
        self
    }

    /// Sets the configuration for the k-buckets.
    ///
    /// * Defaults to `K_VALUE`.
    ///
    /// **WARNING**: setting a `size` higher than `K_VALUE` may imply additional memory allocations.
    pub fn set_kbucket_size(&mut self, size: NonZeroUsize) -> &mut Self {
        self.kbucket_config.set_bucket_size(size);
        self
    }

    /// Sets the timeout duration after creation of a pending entry after which
    /// it becomes eligible for insertion into a full bucket, replacing the
    /// least-recently (dis)connected node.
    ///
    /// * Defaults to `60` seconds.
    pub fn set_kbucket_pending_timeout(&mut self, timeout: Duration) -> &mut Self {
        self.kbucket_config.set_pending_timeout(timeout);
        self
    }

    /// Sets the time to wait before calling [`Behaviour::bootstrap`] after a new peer is inserted
    /// in the routing table. This prevents cascading bootstrap requests when multiple peers are
    /// inserted into the routing table "at the same time". It also allows waiting a little
    /// for other potential peers to be inserted into the routing table before triggering a
    /// bootstrap, giving more context to the future bootstrap request.
    ///
    /// * Defaults to `500` ms.
    /// * Set to `Some(Duration::ZERO)` to never wait before triggering a bootstrap request when a
    ///   new peer is inserted in the routing table.
    /// * Set to `None` to disable automatic bootstrap (no bootstrap request will be triggered when
    ///   a new peer is inserted in the routing table).
    #[cfg(test)]
    pub(crate) fn set_automatic_bootstrap_throttle(
        &mut self,
        duration: Option<Duration>,
    ) -> &mut Self {
        self.automatic_bootstrap_throttle = duration;
        self
    }
}

impl<TStore> Behaviour<TStore>
where
    TStore: RecordStore + Send + 'static,
{
    /// Creates a new `Kademlia` network behaviour with a default configuration.
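    ///
    /// A construction sketch added for illustration (assumes the in-memory
    /// `MemoryStore` shipped with this crate):
    ///
    /// ```ignore
    /// use libp2p_identity::PeerId;
    /// use libp2p_kad::{store::MemoryStore, Behaviour};
    ///
    /// let local_peer_id = PeerId::random();
    /// let store = MemoryStore::new(local_peer_id);
    /// let behaviour = Behaviour::new(local_peer_id, store);
    /// ```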
    pub fn new(id: PeerId, store: TStore) -> Self {
        Self::with_config(id, store, Default::default())
    }

    /// Gets the protocol names of this Kademlia instance.
    pub fn protocol_names(&self) -> &[StreamProtocol] {
        self.protocol_config.protocol_names()
    }

    /// Creates a new `Kademlia` network behaviour with the given configuration.
    pub fn with_config(id: PeerId, store: TStore, config: Config) -> Self {
        let local_key = kbucket::Key::from(id);

        let put_record_job = config
            .record_replication_interval
            .or(config.record_publication_interval)
            .map(|interval| {
                PutRecordJob::new(
                    id,
                    interval,
                    config.record_publication_interval,
                    config.record_ttl,
                )
            });

        let add_provider_job = config
            .provider_publication_interval
            .map(AddProviderJob::new);

        Behaviour {
            store,
            caching: config.caching,
            kbuckets: KBucketsTable::new(local_key, config.kbucket_config),
            kbucket_inserts: config.kbucket_inserts,
            protocol_config: config.protocol_config,
            record_filtering: config.record_filtering,
            queued_events: VecDeque::with_capacity(config.query_config.replication_factor.get()),
            listen_addresses: Default::default(),
            queries: QueryPool::new(config.query_config),
            connected_peers: Default::default(),
            add_provider_job,
            put_record_job,
            record_ttl: config.record_ttl,
            provider_record_ttl: config.provider_record_ttl,
            external_addresses: Default::default(),
            local_peer_id: id,
            connections: Default::default(),
            mode: Mode::Client,
            auto_mode: true,
            no_events_waker: None,
            bootstrap_status: bootstrap::Status::new(
                config.periodic_bootstrap_interval,
                config.automatic_bootstrap_throttle,
            ),
        }
    }

    /// Gets an iterator over immutable references to all running queries.
    pub fn iter_queries(&self) -> impl Iterator<Item = QueryRef<'_>> {
        self.queries.iter().filter_map(|query| {
            if !query.is_finished() {
                Some(QueryRef { query })
            } else {
                None
            }
        })
    }

    /// Gets an iterator over mutable references to all running queries.
    pub fn iter_queries_mut(&mut self) -> impl Iterator<Item = QueryMut<'_>> {
        self.queries.iter_mut().filter_map(|query| {
            if !query.is_finished() {
                Some(QueryMut { query })
            } else {
                None
            }
        })
    }

    /// Gets an immutable reference to a running query, if it exists.
    pub fn query(&self, id: &QueryId) -> Option<QueryRef<'_>> {
        self.queries.get(id).and_then(|query| {
            if !query.is_finished() {
                Some(QueryRef { query })
            } else {
                None
            }
        })
    }

    /// Gets a mutable reference to a running query, if it exists.
    pub fn query_mut<'a>(&'a mut self, id: &QueryId) -> Option<QueryMut<'a>> {
        self.queries.get_mut(id).and_then(|query| {
            if !query.is_finished() {
                Some(QueryMut { query })
            } else {
                None
            }
        })
    }

    /// Adds a known listen address of a peer participating in the DHT to the
    /// routing table.
    ///
    /// Explicitly adding addresses of peers serves two purposes:
    ///
    ///   1. In order for a node to join the DHT, it must know about at least one other node of the
    ///      DHT.
    ///
    ///   2. When a remote peer initiates a connection and that peer is not yet in the routing
    ///      table, the `Kademlia` behaviour must be informed of an address on which that peer is
    ///      listening for connections before it can be added to the routing table from where it can
    ///      subsequently be discovered by all peers in the DHT.
    ///
    /// If the routing table has been updated as a result of this operation,
    /// a [`Event::RoutingUpdated`] event is emitted.
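    ///
    /// A usage sketch added for illustration (assumes `behaviour` is a
    /// `Behaviour` instance; the address is from the documentation range and
    /// the peer id is random):
    ///
    /// ```ignore
    /// let peer = PeerId::random();
    /// let addr: Multiaddr = "/ip4/192.0.2.1/tcp/4001".parse().unwrap();
    /// behaviour.add_address(&peer, addr);
    /// ```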
    pub fn add_address(&mut self, peer: &PeerId, address: Multiaddr) -> RoutingUpdate {
        // ensuring address is a fully-qualified /p2p multiaddr
        let Ok(address) = address.with_p2p(*peer) else {
            return RoutingUpdate::Failed;
        };
        let key = kbucket::Key::from(*peer);
        match self.kbuckets.entry(&key) {
            Some(kbucket::Entry::Present(mut entry, _)) => {
                if entry.value().insert(address) {
                    self.queued_events
                        .push_back(ToSwarm::GenerateEvent(Event::RoutingUpdated {
                            peer: *peer,
                            is_new_peer: false,
                            addresses: entry.value().clone(),
                            old_peer: None,
                            bucket_range: self
                                .kbuckets
                                .bucket(&key)
                                .map(|b| b.range())
                                .expect("Not kbucket::Entry::SelfEntry."),
                        }))
                }
                RoutingUpdate::Success
            }
            Some(kbucket::Entry::Pending(mut entry, _)) => {
                entry.value().insert(address);
                RoutingUpdate::Pending
            }
            Some(kbucket::Entry::Absent(entry)) => {
                let addresses = Addresses::new(address);
                let status = if self.connected_peers.contains(peer) {
                    NodeStatus::Connected
                } else {
                    NodeStatus::Disconnected
                };
                match entry.insert(addresses.clone(), status) {
                    kbucket::InsertResult::Inserted => {
                        self.bootstrap_on_low_peers();

                        self.queued_events.push_back(ToSwarm::GenerateEvent(
                            Event::RoutingUpdated {
                                peer: *peer,
                                is_new_peer: true,
                                addresses,
                                old_peer: None,
                                bucket_range: self
                                    .kbuckets
                                    .bucket(&key)
                                    .map(|b| b.range())
                                    .expect("Not kbucket::Entry::SelfEntry."),
                            },
                        ));
                        RoutingUpdate::Success
                    }
                    kbucket::InsertResult::Full => {
                        tracing::debug!(%peer, "Bucket full. Peer not added to routing table");
                        RoutingUpdate::Failed
                    }
                    kbucket::InsertResult::Pending { disconnected } => {
                        self.queued_events.push_back(ToSwarm::Dial {
                            opts: DialOpts::peer_id(disconnected.into_preimage()).build(),
                        });
                        RoutingUpdate::Pending
                    }
                }
            }
            None => RoutingUpdate::Failed,
        }
    }

    /// Removes an address of a peer from the routing table.
    ///
    /// If the given address is the last address of the peer in the
    /// routing table, the peer is removed from the routing table
    /// and `Some` is returned with a view of the removed entry.
    /// The same applies if the peer is currently pending insertion
    /// into the routing table.
    ///
    /// If the given peer or address is not in the routing table,
    /// this is a no-op.
    pub fn remove_address(
        &mut self,
        peer: &PeerId,
        address: &Multiaddr,
    ) -> Option<kbucket::EntryView<kbucket::Key<PeerId>, Addresses>> {
        let address = &address.to_owned().with_p2p(*peer).ok()?;
        let key = kbucket::Key::from(*peer);
        match self.kbuckets.entry(&key)? {
            kbucket::Entry::Present(mut entry, _) => {
                if entry.value().remove(address).is_err() {
                    Some(entry.remove()) // it is the last address, thus remove the peer.
                } else {
                    None
                }
            }
            kbucket::Entry::Pending(mut entry, _) => {
                if entry.value().remove(address).is_err() {
                    Some(entry.remove()) // it is the last address, thus remove the peer.
                } else {
                    None
                }
            }
            kbucket::Entry::Absent(..) => None,
        }
    }

    /// Removes a peer from the routing table.
    ///
    /// Returns `None` if the peer was not in the routing table,
    /// not even pending insertion.
    pub fn remove_peer(
        &mut self,
        peer: &PeerId,
    ) -> Option<kbucket::EntryView<kbucket::Key<PeerId>, Addresses>> {
        let key = kbucket::Key::from(*peer);
        match self.kbuckets.entry(&key)? {
            kbucket::Entry::Present(entry, _) => Some(entry.remove()),
            kbucket::Entry::Pending(entry, _) => Some(entry.remove()),
            kbucket::Entry::Absent(..) => None,
        }
    }

    /// Returns an iterator over all non-empty buckets in the routing table.
    pub fn kbuckets(
        &mut self,
    ) -> impl Iterator<Item = kbucket::KBucketRef<'_, kbucket::Key<PeerId>, Addresses>> {
        self.kbuckets.iter().filter(|b| !b.is_empty())
    }

    /// Returns the k-bucket for the distance to the given key.
    ///
    /// Returns `None` if the given key refers to the local key.
    pub fn kbucket<K>(
        &mut self,
        key: K,
    ) -> Option<kbucket::KBucketRef<'_, kbucket::Key<PeerId>, Addresses>>
    where
        K: Into<kbucket::Key<K>> + Clone,
    {
        self.kbuckets.bucket(&key.into())
    }

    /// Initiates an iterative query for the closest peers to the given key.
    ///
    /// The result of the query is delivered in a
    /// [`Event::OutboundQueryProgressed{QueryResult::GetClosestPeers}`].
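    ///
    /// A usage sketch added for illustration (assumes `behaviour` is a
    /// `Behaviour` instance); looking up a random `PeerId` is a common way to
    /// refresh a region of the keyspace:
    ///
    /// ```ignore
    /// let query_id = behaviour.get_closest_peers(PeerId::random());
    /// ```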
    pub fn get_closest_peers<K>(&mut self, key: K) -> QueryId
    where
        K: Into<kbucket::Key<K>> + Into<Vec<u8>> + Clone,
    {
        self.get_closest_peers_inner(key, K_VALUE)
    }

    /// Initiates an iterative query for the closest peers to the given key.
    /// The expected number of responding peers is specified by `num_results`.
    /// Note that the result is capped at `K_VALUE`.
    ///
    /// The result of the query is delivered in a
    /// [`Event::OutboundQueryProgressed{QueryResult::GetClosestPeers}`].
    pub fn get_n_closest_peers<K>(&mut self, key: K, num_results: NonZeroUsize) -> QueryId
    where
        K: Into<kbucket::Key<K>> + Into<Vec<u8>> + Clone,
    {
        // The inner code never expects more than K_VALUE results to be returned.
        // Removing this cap would be tricky, since it would involve forging a new
        // key and issuing additional requests.
        // Hence we bound `num_results` to K_VALUE here to set clear expectations
        // and prevent unexpected behaviour.
        let capped_num_results = std::cmp::min(num_results, K_VALUE);
        self.get_closest_peers_inner(key, capped_num_results)
    }

    fn get_closest_peers_inner<K>(&mut self, key: K, num_results: NonZeroUsize) -> QueryId
    where
        K: Into<kbucket::Key<K>> + Into<Vec<u8>> + Clone,
    {
        let target: kbucket::Key<K> = key.clone().into();
        let key: Vec<u8> = key.into();
        let info = QueryInfo::GetClosestPeers {
            key,
            step: ProgressStep::first(),
            num_results,
        };
        let peer_keys: Vec<kbucket::Key<PeerId>> = self.kbuckets.closest_keys(&target).collect();
        self.queries.add_iter_closest(target, peer_keys, info)
    }

    /// Returns all peers ordered by distance to the given key; takes peers from local routing table
    /// only.
    pub fn get_closest_local_peers<'a, K: Clone>(
        &'a mut self,
        key: &'a kbucket::Key<K>,
    ) -> impl Iterator<Item = kbucket::Key<PeerId>> + 'a {
        self.kbuckets.closest_keys(key)
    }

    /// Finds the closest peers to a `key` in the context of a request by the `source` peer, such
    /// that the `source` peer is never included in the result.
    ///
    /// Takes peers from the local routing table only. Returns at most [`K_VALUE`] peers.
    pub fn find_closest_local_peers<'a, K: Clone>(
        &'a mut self,
        key: &'a kbucket::Key<K>,
        source: &'a PeerId,
    ) -> impl Iterator<Item = KadPeer> + 'a {
        self.kbuckets
            .closest(key)
            .filter(move |e| e.node.key.preimage() != source)
            .take(K_VALUE.get())
            .map(KadPeer::from)
    }

    /// Performs a lookup for a record in the DHT.
    ///
    /// The result of this operation is delivered in a
    /// [`Event::OutboundQueryProgressed{QueryResult::GetRecord}`].
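    ///
    /// A usage sketch added for illustration (assumes `behaviour` is a
    /// `Behaviour` instance; the key bytes are arbitrary):
    ///
    /// ```ignore
    /// let key = record::Key::from(b"my-key".to_vec());
    /// let query_id = behaviour.get_record(key);
    /// ```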
    pub fn get_record(&mut self, key: record::Key) -> QueryId {
        let record = if let Some(record) = self.store.get(&key) {
            if record.is_expired(Instant::now()) {
                self.store.remove(&key);
                None
            } else {
                Some(PeerRecord {
                    peer: None,
                    record: record.into_owned(),
                })
            }
        } else {
            None
        };

        let step = ProgressStep::first();

        let target = kbucket::Key::new(key.clone());
        let info = if record.is_some() {
            QueryInfo::GetRecord {
                key,
                step: step.next(),
                found_a_record: true,
                cache_candidates: BTreeMap::new(),
            }
        } else {
            QueryInfo::GetRecord {
                key,
                step: step.clone(),
                found_a_record: false,
                cache_candidates: BTreeMap::new(),
            }
        };
        let peers = self.kbuckets.closest_keys(&target);
        let id = self.queries.add_iter_closest(target.clone(), peers, info);

        // No queries were actually done for the results yet.
        let stats = QueryStats::empty();

        if let Some(record) = record {
            self.queued_events
                .push_back(ToSwarm::GenerateEvent(Event::OutboundQueryProgressed {
                    id,
                    result: QueryResult::GetRecord(Ok(GetRecordOk::FoundRecord(record))),
                    step,
                    stats,
                }));
        }

        id
    }

    /// Stores a record in the DHT, locally as well as at the nodes
    /// closest to the key as per the xor distance metric.
    ///
    /// Returns `Ok` if a record has been stored locally, providing the
    /// `QueryId` of the initial query that replicates the record in the DHT.
    /// The result of the query is eventually reported as a
    /// [`Event::OutboundQueryProgressed{QueryResult::PutRecord}`].
    ///
    /// The record is always stored locally with the given expiration. If the record's
    /// expiration is `None`, the common case, it does not expire in local storage
    /// but is still replicated with the configured record TTL. To remove the record
    /// locally and stop it from being re-published in the DHT, see [`Behaviour::remove_record`].
    ///
    /// After the initial publication of the record, it is subject to (re-)replication
    /// and (re-)publication as per the configured intervals. Periodic (re-)publication
    /// does not update the record's expiration in local storage, thus a given record
    /// with an explicit expiration will always expire at that instant and until then
    /// is subject to regular (re-)replication and (re-)publication.
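    ///
    /// A publishing sketch added for illustration (assumes `behaviour` is a
    /// `Behaviour` instance; key and value bytes are arbitrary):
    ///
    /// ```ignore
    /// let record = Record::new(record::Key::from(b"my-key".to_vec()), b"my-value".to_vec());
    /// let query_id = behaviour.put_record(record, Quorum::One)?;
    /// ```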
    pub fn put_record(
        &mut self,
        mut record: Record,
        quorum: Quorum,
    ) -> Result<QueryId, store::Error> {
        record.publisher = Some(*self.kbuckets.local_key().preimage());
        self.store.put(record.clone())?;
        record.expires = record
            .expires
            .or_else(|| self.record_ttl.map(|ttl| Instant::now() + ttl));
        let quorum = quorum.eval(self.queries.config().replication_factor);
        let target = kbucket::Key::new(record.key.clone());
        let peers = self.kbuckets.closest_keys(&target);
        let context = PutRecordContext::Publish;
        let info = QueryInfo::PutRecord {
            context,
            record,
            quorum,
            phase: PutRecordPhase::GetClosestPeers,
        };
        Ok(self.queries.add_iter_closest(target.clone(), peers, info))
    }

    /// Stores a record at specific peers, without storing it locally.
    ///
    /// The given [`Quorum`] is understood in the context of the total
    /// number of distinct peers given.
    ///
    /// If the record's expiration is `None`, the configured record TTL is used.
    ///
    /// > **Note**: This is not a regular Kademlia DHT operation. It may be
    /// > used to selectively update or store a record to specific peers
    /// > for the purpose of e.g. making sure these peers have the latest
    /// > "version" of a record or to "cache" a record at further peers
    /// > to increase the lookup success rate on the DHT for other peers.
    /// >
    /// > In particular, there is no automatic storing of records performed, and this
    /// > method must be used to ensure the standard Kademlia
    /// > procedure of "caching" (i.e. storing) a found record at the closest
    /// > node to the key that _did not_ return it.
    pub fn put_record_to<I>(&mut self, mut record: Record, peers: I, quorum: Quorum) -> QueryId
    where
        I: ExactSizeIterator<Item = PeerId>,
    {
        let quorum = if peers.len() > 0 {
            quorum.eval(NonZeroUsize::new(peers.len()).expect("> 0"))
        } else {
            // If no peers are given, we just let the query fail immediately
            // due to the fact that the quorum must be at least one, instead of
            // introducing a new kind of error.
            NonZeroUsize::new(1).expect("1 > 0")
        };
        record.expires = record
            .expires
            .or_else(|| self.record_ttl.map(|ttl| Instant::now() + ttl));
        let context = PutRecordContext::Custom;
        let info = QueryInfo::PutRecord {
            context,
            record,
            quorum,
            phase: PutRecordPhase::PutRecord {
                success: Vec::new(),
                get_closest_peers_stats: QueryStats::empty(),
            },
        };
        self.queries.add_fixed(peers, info)
    }

    /// Removes the record with the given key from _local_ storage,
    /// if the local node is the publisher of the record.
    ///
    /// Has no effect if a record for the given key is stored locally but
    /// the local node is not a publisher of the record.
    ///
    /// This is a _local_ operation. However, it also has the effect that
    /// the record will no longer be periodically re-published, allowing the
    /// record to eventually expire throughout the DHT.
    pub fn remove_record(&mut self, key: &record::Key) {
        if let Some(r) = self.store.get(key) {
            if r.publisher.as_ref() == Some(self.kbuckets.local_key().preimage()) {
                self.store.remove(key)
            }
        }
    }

    /// Gets a mutable reference to the record store.
    pub fn store_mut(&mut self) -> &mut TStore {
        &mut self.store
    }

    /// Bootstraps the local node to join the DHT.
    ///
    /// Bootstrapping is a multi-step operation that starts with a lookup of the local node's
    /// own ID in the DHT. This introduces the local node to the other nodes
    /// in the DHT and populates its routing table with the closest neighbours.
    ///
    /// Subsequently, all buckets farther from the bucket of the closest neighbour are
    /// refreshed by initiating an additional bootstrapping query for each such
    /// bucket with random keys.
    ///
    /// Returns `Ok` if bootstrapping has been initiated with a self-lookup, providing the
    /// `QueryId` for the entire bootstrapping process. The progress of bootstrapping is
    /// reported via [`Event::OutboundQueryProgressed{QueryResult::Bootstrap}`] events,
    /// with one such event per bootstrapping query.
    ///
    /// Returns `Err` if bootstrapping is impossible due to an empty routing table.
    ///
    /// > **Note**: Bootstrapping requires at least one node of the DHT to be known.
    /// > See [`Behaviour::add_address`].
    ///
    /// > **Note**: Bootstrap does not need to be called manually. It is invoked
    /// > periodically at regular intervals based on the configured
    /// > `periodic_bootstrap_interval` (see [`Config::set_periodic_bootstrap_interval`]
    /// > for details), and it is also invoked automatically whenever a new peer is
    /// > inserted into the routing table, in order to maintain a healthy routing table.
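    ///
    /// A usage sketch added for illustration (assumes `behaviour` is a
    /// `Behaviour` instance and `bootstrap_peer`/`bootstrap_addr` were learned
    /// out of band):
    ///
    /// ```ignore
    /// behaviour.add_address(&bootstrap_peer, bootstrap_addr);
    /// match behaviour.bootstrap() {
    ///     Ok(query_id) => { /* progress arrives via `OutboundQueryProgressed` events */ }
    ///     Err(NoKnownPeers()) => { /* add at least one known peer first */ }
    /// }
    /// ```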
    pub fn bootstrap(&mut self) -> Result<QueryId, NoKnownPeers> {
        let local_key = *self.kbuckets.local_key();
        let info = QueryInfo::Bootstrap {
            peer: *local_key.preimage(),
            remaining: None,
            step: ProgressStep::first(),
        };
        let peers = self.kbuckets.closest_keys(&local_key).collect::<Vec<_>>();
        if peers.is_empty() {
            self.bootstrap_status.reset_timers();
            Err(NoKnownPeers())
        } else {
            self.bootstrap_status.on_started();
            Ok(self.queries.add_iter_closest(local_key, peers, info))
        }
    }

    /// Establishes the local node as a provider of a value for the given key.
    ///
    /// This operation publishes a provider record with the given key and
    /// identity of the local node to the peers closest to the key, thus establishing
    /// the local node as a provider.
    ///
    /// Returns `Ok` if a provider record has been stored locally, providing the
    /// `QueryId` of the initial query that announces the local node as a provider.
    ///
    /// The publication of the provider records is periodically repeated as per the
    /// configured interval, to renew the expiry and account for changes to the DHT
    /// topology. A provider record may be removed from local storage and
    /// thus no longer re-published by calling [`Behaviour::stop_providing`].
    ///
    /// In contrast to the standard Kademlia push-based model for content distribution
    /// implemented by [`Behaviour::put_record`], the provider API implements a
    /// pull-based model that may be used in addition or as an alternative.
    /// The means by which the actual value is obtained from a provider is out of scope
    /// of the libp2p Kademlia provider API.
    ///
    /// The results of the (repeated) provider announcements sent by this node are
    /// reported via [`Event::OutboundQueryProgressed{QueryResult::StartProviding}`].
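    ///
    /// A usage sketch added for illustration (assumes `behaviour` is a
    /// `Behaviour` instance; the key bytes are arbitrary):
    ///
    /// ```ignore
    /// let key = record::Key::from(b"my-content".to_vec());
    /// let query_id = behaviour.start_providing(key)?;
    /// ```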
    pub fn start_providing(&mut self, key: record::Key) -> Result<QueryId, store::Error> {
        // Note: We store our own provider records locally without local addresses
        // to avoid redundant storage and outdated addresses. Instead these are
        // acquired on demand when returning a `ProviderRecord` for the local node.
        let local_addrs = Vec::new();
        let record = ProviderRecord::new(
            key.clone(),
            *self.kbuckets.local_key().preimage(),
            local_addrs,
        );
        self.store.add_provider(record)?;
        let target = kbucket::Key::new(key.clone());
        let peers = self.kbuckets.closest_keys(&target);
        let context = AddProviderContext::Publish;
        let info = QueryInfo::AddProvider {
            context,
            key,
            phase: AddProviderPhase::GetClosestPeers,
        };
        let id = self.queries.add_iter_closest(target.clone(), peers, info);
        Ok(id)
    }

    /// Stops the local node from announcing that it is a provider for the given key.
    ///
    /// This is a local operation. The local node will still be considered as a
    /// provider for the key by other nodes until these provider records expire.
    pub fn stop_providing(&mut self, key: &record::Key) {
        self.store
            .remove_provider(key, self.kbuckets.local_key().preimage());
    }

    /// Performs a lookup for providers of a value for the given key.
    ///
    /// The result of this operation is reported via
    /// [`Event::OutboundQueryProgressed{QueryResult::GetProviders}`].
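    ///
    /// A usage sketch added for illustration (assumes `behaviour` is a
    /// `Behaviour` instance and `key` was built as in the examples above):
    ///
    /// ```ignore
    /// let query_id = behaviour.get_providers(key);
    /// ```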
    pub fn get_providers(&mut self, key: record::Key) -> QueryId {
        let providers: HashSet<_> = self
            .store
            .providers(&key)
            .into_iter()
            .filter(|p| {
                if p.is_expired(Instant::now()) {
                    self.store.remove_provider(&key, &p.provider);
                    false
                } else {
                    true
                }
            })
            .map(|p| p.provider)
            .collect();

        let step = ProgressStep::first();

        let info = QueryInfo::GetProviders {
            key: key.clone(),
            providers_found: providers.len(),
            step: if providers.is_empty() {
                step.clone()
            } else {
                step.next()
            },
        };

        let target = kbucket::Key::new(key.clone());
        let peers = self.kbuckets.closest_keys(&target);
        let id = self.queries.add_iter_closest(target.clone(), peers, info);

        // No queries were actually done for the results yet.
        let stats = QueryStats::empty();

        if !providers.is_empty() {
            self.queued_events
                .push_back(ToSwarm::GenerateEvent(Event::OutboundQueryProgressed {
                    id,
                    result: QueryResult::GetProviders(Ok(GetProvidersOk::FoundProviders {
                        key,
                        providers,
                    })),
                    step,
                    stats,
                }));
        }
        id
    }

    /// Set the [`Mode`] in which we should operate.
    ///
    /// By default, we are in [`Mode::Client`] and will swap into [`Mode::Server`] as soon as we
    /// have a confirmed, external address via [`FromSwarm::ExternalAddrConfirmed`].
    ///
    /// Setting a mode via this function disables this automatic behaviour and unconditionally
    /// operates in the specified mode. To reactivate the automatic configuration, pass [`None`]
    /// instead.
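    ///
    /// A usage sketch added for illustration (assumes `behaviour` is a
    /// `Behaviour` instance):
    ///
    /// ```ignore
    /// // Unconditionally operate as a DHT server, e.g. when an external address is known a priori.
    /// behaviour.set_mode(Some(Mode::Server));
    ///
    /// // Later, return to automatic mode selection based on confirmed external addresses.
    /// behaviour.set_mode(None);
    /// ```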
    pub fn set_mode(&mut self, mode: Option<Mode>) {
        match mode {
            Some(mode) => {
                self.mode = mode;
                self.auto_mode = false;
                self.reconfigure_mode();
            }
            None => {
                self.auto_mode = true;
                self.determine_mode_from_external_addresses();
            }
        }

        if let Some(waker) = self.no_events_waker.take() {
            waker.wake();
        }
    }

    /// Get the [`Mode`] in which the DHT is currently operating.
    pub fn mode(&self) -> Mode {
        self.mode
    }

    fn reconfigure_mode(&mut self) {
        if self.connections.is_empty() {
            return;
        }

        let num_connections = self.connections.len();

        tracing::debug!(
            "Re-configuring {} established connection{}",
            num_connections,
            if num_connections > 1 { "s" } else { "" }
        );

        self.queued_events
            .extend(
                self.connections
                    .iter()
                    .map(|(conn_id, peer_id)| ToSwarm::NotifyHandler {
                        peer_id: *peer_id,
                        handler: NotifyHandler::One(*conn_id),
                        event: HandlerIn::ReconfigureMode {
                            new_mode: self.mode,
                        },
                    }),
            );
    }

    fn determine_mode_from_external_addresses(&mut self) {
        let old_mode = self.mode;

        self.mode = match (self.external_addresses.as_slice(), self.mode) {
            ([], Mode::Server) => {
                tracing::debug!("Switching to client-mode because we no longer have any confirmed external addresses");

                Mode::Client
            }
            ([], Mode::Client) => {
                // Previously client-mode, now also client-mode because no external addresses.

                Mode::Client
            }
            (confirmed_external_addresses, Mode::Client) => {
                if tracing::enabled!(Level::DEBUG) {
                    let confirmed_external_addresses =
                        to_comma_separated_list(confirmed_external_addresses);

                    tracing::debug!("Switching to server-mode assuming that one of [{confirmed_external_addresses}] is externally reachable");
                }

                Mode::Server
            }
            (confirmed_external_addresses, Mode::Server) => {
                debug_assert!(
                    !confirmed_external_addresses.is_empty(),
                    "Previous match arm handled empty list"
                );

                // Previously server-mode, now also server-mode because we still have at
                // least one external address. Don't log anything to avoid spam.
1200                Mode::Server
1201            }
1202        };
1203
1204        self.reconfigure_mode();
1205
1206        if old_mode != self.mode {
1207            self.queued_events
1208                .push_back(ToSwarm::GenerateEvent(Event::ModeChanged {
1209                    new_mode: self.mode,
1210                }));
1211        }
1212    }
1213
1214    /// Processes discovered peers from a successful request in an iterative `Query`.
1215    fn discovered<'a, I>(&'a mut self, query_id: &QueryId, source: &PeerId, peers: I)
1216    where
1217        I: Iterator<Item = &'a KadPeer> + Clone,
1218    {
1219        let local_id = self.kbuckets.local_key().preimage();
1220        let others_iter = peers.filter(|p| &p.node_id != local_id);
1221        if let Some(query) = self.queries.get_mut(query_id) {
1222            tracing::trace!(peer=%source, query=?query_id, "Request to peer in query succeeded");
1223            for peer in others_iter.clone() {
1224                tracing::trace!(
1225                    ?peer,
1226                    %source,
1227                    query=?query_id,
1228                    "Peer reported by source in query"
1229                );
1230                let addrs = peer.multiaddrs.iter().cloned().collect();
1231                query.peers.addresses.insert(peer.node_id, addrs);
1232            }
1233            query.on_success(source, others_iter.cloned().map(|kp| kp.node_id))
1234        }
1235    }
1236
1237    /// Collects all peers who are known to be providers of the value for a given `Multihash`.
1238    fn provider_peers(&mut self, key: &record::Key, source: &PeerId) -> Vec<KadPeer> {
1239        self.store
1240            .providers(key)
1241            .into_iter()
1242            .filter_map(|p| {
1243                if p.is_expired(Instant::now()) {
1244                    self.store.remove_provider(key, &p.provider);
1245                    return None;
1246                }
1247
1248                if &p.provider != source {
1249                    let node_id = p.provider;
1250                    let multiaddrs = p.addresses;
1251                    let connection_ty = if self.connected_peers.contains(&node_id) {
1252                        ConnectionType::Connected
1253                    } else {
1254                        ConnectionType::NotConnected
1255                    };
1256                    if multiaddrs.is_empty() {
1257                        // The provider is either the local node and we fill in
1258                        // the local addresses on demand, or it is a legacy
1259                        // provider record without addresses, in which case we
1260                        // try to find addresses in the routing table, as was
1261                        // done before provider records were stored along with
1262                        // their addresses.
1263                        if &node_id == self.kbuckets.local_key().preimage() {
1264                            Some(
1265                                self.listen_addresses
1266                                    .iter()
1267                                    .chain(self.external_addresses.iter())
1268                                    .cloned()
1269                                    .collect::<Vec<_>>(),
1270                            )
1271                        } else {
1272                            let key = kbucket::Key::from(node_id);
1273                            self.kbuckets
1274                                .entry(&key)
1275                                .as_mut()
1276                                .and_then(|e| e.view())
1277                                .map(|e| e.node.value.clone().into_vec())
1278                        }
1279                    } else {
1280                        Some(multiaddrs)
1281                    }
1282                    .map(|multiaddrs| KadPeer {
1283                        node_id,
1284                        multiaddrs,
1285                        connection_ty,
1286                    })
1287                } else {
1288                    None
1289                }
1290            })
1291            .take(self.queries.config().replication_factor.get())
1292            .collect()
1293    }
1294
1295    /// Starts an iterative `ADD_PROVIDER` query for the given key.
1296    fn start_add_provider(&mut self, key: record::Key, context: AddProviderContext) {
1297        let info = QueryInfo::AddProvider {
1298            context,
1299            key: key.clone(),
1300            phase: AddProviderPhase::GetClosestPeers,
1301        };
1302        let target = kbucket::Key::new(key);
1303        let peers = self.kbuckets.closest_keys(&target);
1304        self.queries.add_iter_closest(target.clone(), peers, info);
1305    }
1306
1307    /// Starts an iterative `PUT_VALUE` query for the given record.
1308    fn start_put_record(&mut self, record: Record, quorum: Quorum, context: PutRecordContext) {
1309        let quorum = quorum.eval(self.queries.config().replication_factor);
1310        let target = kbucket::Key::new(record.key.clone());
1311        let peers = self.kbuckets.closest_keys(&target);
1312        let info = QueryInfo::PutRecord {
1313            record,
1314            quorum,
1315            context,
1316            phase: PutRecordPhase::GetClosestPeers,
1317        };
1318        self.queries.add_iter_closest(target.clone(), peers, info);
1319    }
1320
1321    /// Updates the routing table with a new connection status and address of a peer.
1322    fn connection_updated(
1323        &mut self,
1324        peer: PeerId,
1325        address: Option<Multiaddr>,
1326        new_status: NodeStatus,
1327    ) {
1328        let key = kbucket::Key::from(peer);
1329        match self.kbuckets.entry(&key) {
1330            Some(kbucket::Entry::Present(mut entry, old_status)) => {
1331                if old_status != new_status {
1332                    entry.update(new_status)
1333                }
1334                if let Some(address) = address {
1335                    if entry.value().insert(address) {
1336                        self.queued_events.push_back(ToSwarm::GenerateEvent(
1337                            Event::RoutingUpdated {
1338                                peer,
1339                                is_new_peer: false,
1340                                addresses: entry.value().clone(),
1341                                old_peer: None,
1342                                bucket_range: self
1343                                    .kbuckets
1344                                    .bucket(&key)
1345                                    .map(|b| b.range())
1346                                    .expect("Not kbucket::Entry::SelfEntry."),
1347                            },
1348                        ))
1349                    }
1350                }
1351            }
1352
1353            Some(kbucket::Entry::Pending(mut entry, old_status)) => {
1354                if let Some(address) = address {
1355                    entry.value().insert(address);
1356                }
1357                if old_status != new_status {
1358                    entry.update(new_status);
1359                }
1360            }
1361
1362            Some(kbucket::Entry::Absent(entry)) => {
1363                // Only connected nodes with a known address are newly inserted.
1364                if new_status != NodeStatus::Connected {
1365                    return;
1366                }
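                // With a known address, the configured insertion strategy decides whether
                // the peer is inserted into the routing table or merely reported as
                // routable; without an address it can only be reported as unroutable.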
1367                match (address, self.kbucket_inserts) {
1368                    (None, _) => {
1369                        self.queued_events
1370                            .push_back(ToSwarm::GenerateEvent(Event::UnroutablePeer { peer }));
1371                    }
1372                    (Some(a), BucketInserts::Manual) => {
1373                        self.queued_events
1374                            .push_back(ToSwarm::GenerateEvent(Event::RoutablePeer {
1375                                peer,
1376                                address: a,
1377                            }));
1378                    }
1379                    (Some(a), BucketInserts::OnConnected) => {
1380                        let addresses = Addresses::new(a);
1381                        match entry.insert(addresses.clone(), new_status) {
1382                            kbucket::InsertResult::Inserted => {
1383                                self.bootstrap_on_low_peers();
1384
1385                                let event = Event::RoutingUpdated {
1386                                    peer,
1387                                    is_new_peer: true,
1388                                    addresses,
1389                                    old_peer: None,
1390                                    bucket_range: self
1391                                        .kbuckets
1392                                        .bucket(&key)
1393                                        .map(|b| b.range())
1394                                        .expect("Not kbucket::Entry::SelfEntry."),
1395                                };
1396                                self.queued_events.push_back(ToSwarm::GenerateEvent(event));
1397                            }
1398                            kbucket::InsertResult::Full => {
1399                                tracing::debug!(
1400                                    %peer,
1401                                    "Bucket full. Peer not added to routing table"
1402                                );
1403                                let address = addresses.first().clone();
1404                                self.queued_events.push_back(ToSwarm::GenerateEvent(
1405                                    Event::RoutablePeer { peer, address },
1406                                ));
1407                            }
1408                            kbucket::InsertResult::Pending { disconnected } => {
1409                                let address = addresses.first().clone();
1410                                self.queued_events.push_back(ToSwarm::GenerateEvent(
1411                                    Event::PendingRoutablePeer { peer, address },
1412                                ));
1413
1414                                // `disconnected` might already be in the process of re-connecting.
1415                                // In other words `disconnected` might have already re-connected but
1416                                // is not yet confirmed to support the Kademlia protocol via
1417                                // [`HandlerEvent::ProtocolConfirmed`].
1418                                //
1419                                // Only try dialing peer if not currently connected.
1420                                if !self.connected_peers.contains(disconnected.preimage()) {
1421                                    self.queued_events.push_back(ToSwarm::Dial {
1422                                        opts: DialOpts::peer_id(disconnected.into_preimage())
1423                                            .build(),
1424                                    })
1425                                }
1426                            }
1427                        }
1428                    }
1429                }
1430            }
1431            _ => {}
1432        }
1433    }
1434
1435    /// After a new peer has been inserted in the routing table, checks whether the
1436    /// routing table is still small (fewer than `K_VALUE` peers present) and triggers
1437    /// a bootstrap only in that case.
1438    fn bootstrap_on_low_peers(&mut self) {
1439        if self
1440            .kbuckets()
1441            .map(|kbucket| kbucket.num_entries())
1442            .sum::<usize>()
1443            < K_VALUE.get()
1444        {
1445            self.bootstrap_status.trigger();
1446        }
1447    }
1448
1449    /// Handles a finished (i.e. successful) query.
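    /// Depending on the query type, this either yields an event for the caller or
    /// transitions the query into its next phase (e.g. from locating the closest
    /// peers to sending out `PUT_VALUE` or `ADD_PROVIDER` requests).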
1450    fn query_finished(&mut self, q: Query) -> Option<Event> {
1451        let query_id = q.id();
1452        tracing::trace!(query=?query_id, "Query finished");
1453        match q.info {
1454            QueryInfo::Bootstrap {
1455                peer,
1456                remaining,
1457                mut step,
1458            } => {
1459                let local_key = *self.kbuckets.local_key();
1460                let mut remaining = remaining.unwrap_or_else(|| {
1461                    debug_assert_eq!(&peer, local_key.preimage());
1462                    // The lookup for the local key finished. To complete the bootstrap process,
1463                    // a bucket refresh should be performed for every bucket farther away than
1464                    // the first non-empty bucket (which are most likely no more than the last
1465                    // few, i.e. farthest, buckets).
1466                    self.kbuckets
1467                        .iter()
1468                        .skip_while(|b| b.is_empty())
1469                        .skip(1) // Skip the bucket with the closest neighbour.
1470                        .map(|b| {
1471                            // Try to find a key that falls into the bucket. While such keys can
1472                            // be generated fully deterministically, the current libp2p kademlia
1473                            // wire protocol requires transmission of the preimages of the actual
1474                            // keys in the DHT keyspace, hence for now this is just a "best effort"
1475                            // to find a key that hashes into a specific bucket. The probabilities
1476                            // of finding a key in the bucket `b` with at most 16 trials are as
1477                            // follows:
1478                            //
1479                            // Pr(bucket-255) = 1 - (1/2)^16   ~= 1
1480                            // Pr(bucket-254) = 1 - (3/4)^16   ~= 1
1481                            // Pr(bucket-253) = 1 - (7/8)^16   ~= 0.88
1482                            // Pr(bucket-252) = 1 - (15/16)^16 ~= 0.64
1483                            // ...
1484                            let mut target = kbucket::Key::from(PeerId::random());
1485                            for _ in 0..16 {
1486                                let d = local_key.distance(&target);
1487                                if b.contains(&d) {
1488                                    break;
1489                                }
1490                                target = kbucket::Key::from(PeerId::random());
1491                            }
1492                            target
1493                        })
1494                        .collect::<Vec<_>>()
1495                        .into_iter()
1496                });
1497
1498                let num_remaining = remaining.len() as u32;
1499
1500                if let Some(target) = remaining.next() {
1501                    let info = QueryInfo::Bootstrap {
1502                        peer: *target.preimage(),
1503                        remaining: Some(remaining),
1504                        step: step.next(),
1505                    };
1506                    let peers = self.kbuckets.closest_keys(&target);
1507                    self.queries
1508                        .continue_iter_closest(query_id, target, peers, info);
1509                } else {
1510                    step.last = true;
1511                    self.bootstrap_status.on_finish();
1512                };
1513
1514                Some(Event::OutboundQueryProgressed {
1515                    id: query_id,
1516                    stats: q.stats,
1517                    result: QueryResult::Bootstrap(Ok(BootstrapOk {
1518                        peer,
1519                        num_remaining,
1520                    })),
1521                    step,
1522                })
1523            }
1524
1525            QueryInfo::GetClosestPeers { key, mut step, .. } => {
1526                step.last = true;
1527
1528                Some(Event::OutboundQueryProgressed {
1529                    id: query_id,
1530                    stats: q.stats,
1531                    result: QueryResult::GetClosestPeers(Ok(GetClosestPeersOk {
1532                        key,
1533                        peers: q.peers.into_peerinfos_iter().collect(),
1534                    })),
1535                    step,
1536                })
1537            }
1538
1539            QueryInfo::GetProviders { mut step, .. } => {
1540                step.last = true;
1541
1542                Some(Event::OutboundQueryProgressed {
1543                    id: query_id,
1544                    stats: q.stats,
1545                    result: QueryResult::GetProviders(Ok(
1546                        GetProvidersOk::FinishedWithNoAdditionalRecord {
1547                            closest_peers: q.peers.into_peerids_iter().collect(),
1548                        },
1549                    )),
1550                    step,
1551                })
1552            }
1553
1554            QueryInfo::AddProvider {
1555                context,
1556                key,
1557                phase: AddProviderPhase::GetClosestPeers,
1558            } => {
1559                let provider_id = self.local_peer_id;
1560                let external_addresses = self.external_addresses.iter().cloned().collect();
1561                let info = QueryInfo::AddProvider {
1562                    context,
1563                    key,
1564                    phase: AddProviderPhase::AddProvider {
1565                        provider_id,
1566                        external_addresses,
1567                        get_closest_peers_stats: q.stats,
1568                    },
1569                };
1570                self.queries
1571                    .continue_fixed(query_id, q.peers.into_peerids_iter(), info);
1572                None
1573            }
1574
1575            QueryInfo::AddProvider {
1576                context,
1577                key,
1578                phase:
1579                    AddProviderPhase::AddProvider {
1580                        get_closest_peers_stats,
1581                        ..
1582                    },
1583            } => match context {
1584                AddProviderContext::Publish => Some(Event::OutboundQueryProgressed {
1585                    id: query_id,
1586                    stats: get_closest_peers_stats.merge(q.stats),
1587                    result: QueryResult::StartProviding(Ok(AddProviderOk { key })),
1588                    step: ProgressStep::first_and_last(),
1589                }),
1590                AddProviderContext::Republish => Some(Event::OutboundQueryProgressed {
1591                    id: query_id,
1592                    stats: get_closest_peers_stats.merge(q.stats),
1593                    result: QueryResult::RepublishProvider(Ok(AddProviderOk { key })),
1594                    step: ProgressStep::first_and_last(),
1595                }),
1596            },
1597
1598            QueryInfo::GetRecord {
1599                key,
1600                mut step,
1601                found_a_record,
1602                cache_candidates,
1603            } => {
1604                step.last = true;
1605
1606                let results = if found_a_record {
1607                    Ok(GetRecordOk::FinishedWithNoAdditionalRecord { cache_candidates })
1608                } else {
1609                    Err(GetRecordError::NotFound {
1610                        key,
1611                        closest_peers: q.peers.into_peerids_iter().collect(),
1612                    })
1613                };
1614                Some(Event::OutboundQueryProgressed {
1615                    id: query_id,
1616                    stats: q.stats,
1617                    result: QueryResult::GetRecord(results),
1618                    step,
1619                })
1620            }
1621
1622            QueryInfo::PutRecord {
1623                context,
1624                record,
1625                quorum,
1626                phase: PutRecordPhase::GetClosestPeers,
1627            } => {
1628                let info = QueryInfo::PutRecord {
1629                    context,
1630                    record,
1631                    quorum,
1632                    phase: PutRecordPhase::PutRecord {
1633                        success: vec![],
1634                        get_closest_peers_stats: q.stats,
1635                    },
1636                };
1637                self.queries
1638                    .continue_fixed(query_id, q.peers.into_peerids_iter(), info);
1639                None
1640            }
1641
1642            QueryInfo::PutRecord {
1643                context,
1644                record,
1645                quorum,
1646                phase:
1647                    PutRecordPhase::PutRecord {
1648                        success,
1649                        get_closest_peers_stats,
1650                    },
1651            } => {
1652                let mk_result = |key: record::Key| {
1653                    if success.len() >= quorum.get() {
1654                        Ok(PutRecordOk { key })
1655                    } else {
1656                        Err(PutRecordError::QuorumFailed {
1657                            key,
1658                            quorum,
1659                            success,
1660                        })
1661                    }
1662                };
1663                match context {
1664                    PutRecordContext::Publish | PutRecordContext::Custom => {
1665                        Some(Event::OutboundQueryProgressed {
1666                            id: query_id,
1667                            stats: get_closest_peers_stats.merge(q.stats),
1668                            result: QueryResult::PutRecord(mk_result(record.key)),
1669                            step: ProgressStep::first_and_last(),
1670                        })
1671                    }
1672                    PutRecordContext::Republish => Some(Event::OutboundQueryProgressed {
1673                        id: query_id,
1674                        stats: get_closest_peers_stats.merge(q.stats),
1675                        result: QueryResult::RepublishRecord(mk_result(record.key)),
1676                        step: ProgressStep::first_and_last(),
1677                    }),
1678                    PutRecordContext::Replicate => {
1679                        tracing::debug!(record=?record.key, "Record replicated");
1680                        None
1681                    }
1682                }
1683            }
1684        }
1685    }
1686
1687    /// Handles a query that timed out.
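    /// Like `query_finished`, but reports timeout errors. A timed-out bootstrap query
    /// still proceeds to the next bucket refresh target, if any.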
1688    fn query_timeout(&mut self, query: Query) -> Option<Event> {
1689        let query_id = query.id();
1690        tracing::trace!(query=?query_id, "Query timed out");
1691        match query.info {
1692            QueryInfo::Bootstrap {
1693                peer,
1694                mut remaining,
1695                mut step,
1696            } => {
1697                let num_remaining = remaining.as_ref().map(|r| r.len().saturating_sub(1) as u32);
1698
1699                // Continue with the next bootstrap query if `remaining` is not empty.
1700                if let Some((target, remaining)) =
1701                    remaining.take().and_then(|mut r| Some((r.next()?, r)))
1702                {
1703                    let info = QueryInfo::Bootstrap {
1704                        peer: target.into_preimage(),
1705                        remaining: Some(remaining),
1706                        step: step.next(),
1707                    };
1708                    let peers = self.kbuckets.closest_keys(&target);
1709                    self.queries
1710                        .continue_iter_closest(query_id, target, peers, info);
1711                } else {
1712                    step.last = true;
1713                    self.bootstrap_status.on_finish();
1714                }
1715
1716                Some(Event::OutboundQueryProgressed {
1717                    id: query_id,
1718                    stats: query.stats,
1719                    result: QueryResult::Bootstrap(Err(BootstrapError::Timeout {
1720                        peer,
1721                        num_remaining,
1722                    })),
1723                    step,
1724                })
1725            }
1726
1727            QueryInfo::AddProvider { context, key, .. } => Some(match context {
1728                AddProviderContext::Publish => Event::OutboundQueryProgressed {
1729                    id: query_id,
1730                    stats: query.stats,
1731                    result: QueryResult::StartProviding(Err(AddProviderError::Timeout { key })),
1732                    step: ProgressStep::first_and_last(),
1733                },
1734                AddProviderContext::Republish => Event::OutboundQueryProgressed {
1735                    id: query_id,
1736                    stats: query.stats,
1737                    result: QueryResult::RepublishProvider(Err(AddProviderError::Timeout { key })),
1738                    step: ProgressStep::first_and_last(),
1739                },
1740            }),
1741
1742            QueryInfo::GetClosestPeers { key, mut step, .. } => {
1743                step.last = true;
1744                Some(Event::OutboundQueryProgressed {
1745                    id: query_id,
1746                    stats: query.stats,
1747                    result: QueryResult::GetClosestPeers(Err(GetClosestPeersError::Timeout {
1748                        key,
1749                        peers: query.peers.into_peerinfos_iter().collect(),
1750                    })),
1751                    step,
1752                })
1753            }
1754
1755            QueryInfo::PutRecord {
1756                record,
1757                quorum,
1758                context,
1759                phase,
1760            } => {
1761                let err = Err(PutRecordError::Timeout {
1762                    key: record.key,
1763                    quorum,
1764                    success: match phase {
1765                        PutRecordPhase::GetClosestPeers => vec![],
1766                        PutRecordPhase::PutRecord { ref success, .. } => success.clone(),
1767                    },
1768                });
1769                match context {
1770                    PutRecordContext::Publish | PutRecordContext::Custom => {
1771                        Some(Event::OutboundQueryProgressed {
1772                            id: query_id,
1773                            stats: query.stats,
1774                            result: QueryResult::PutRecord(err),
1775                            step: ProgressStep::first_and_last(),
1776                        })
1777                    }
1778                    PutRecordContext::Republish => Some(Event::OutboundQueryProgressed {
1779                        id: query_id,
1780                        stats: query.stats,
1781                        result: QueryResult::RepublishRecord(err),
1782                        step: ProgressStep::first_and_last(),
1783                    }),
1784                    PutRecordContext::Replicate => match phase {
1785                        PutRecordPhase::GetClosestPeers => {
1786                            tracing::warn!(
1787                                "Locating closest peers for replication failed: {:?}",
1788                                err
1789                            );
1790                            None
1791                        }
1792                        PutRecordPhase::PutRecord { .. } => {
1793                            tracing::debug!("Replicating record failed: {:?}", err);
1794                            None
1795                        }
1796                    },
1797                }
1798            }
1799
1800            QueryInfo::GetRecord { key, mut step, .. } => {
1801                step.last = true;
1802
1803                Some(Event::OutboundQueryProgressed {
1804                    id: query_id,
1805                    stats: query.stats,
1806                    result: QueryResult::GetRecord(Err(GetRecordError::Timeout { key })),
1807                    step,
1808                })
1809            }
1810
1811            QueryInfo::GetProviders { key, mut step, .. } => {
1812                step.last = true;
1813
1814                Some(Event::OutboundQueryProgressed {
1815                    id: query_id,
1816                    stats: query.stats,
1817                    result: QueryResult::GetProviders(Err(GetProvidersError::Timeout {
1818                        key,
1819                        closest_peers: query.peers.into_peerids_iter().collect(),
1820                    })),
1821                    step,
1822                })
1823            }
1824        }
1825    }
1826
1827    /// Processes a record received from a peer.
1828    fn record_received(
1829        &mut self,
1830        source: PeerId,
1831        connection: ConnectionId,
1832        request_id: RequestId,
1833        mut record: Record,
1834    ) {
1835        if record.publisher.as_ref() == Some(self.kbuckets.local_key().preimage()) {
1836            // If the (alleged) publisher is the local node, do nothing. The record of
1837            // the original publisher should never change as a result of replication
1838            // and the publisher is always assumed to have the "right" value.
1839            self.queued_events.push_back(ToSwarm::NotifyHandler {
1840                peer_id: source,
1841                handler: NotifyHandler::One(connection),
1842                event: HandlerIn::PutRecordRes {
1843                    key: record.key,
1844                    value: record.value,
1845                    request_id,
1846                },
1847            });
1848            return;
1849        }
1850
1851        let now = Instant::now();
1852
1853        // Calculate the expiration time: it decreases exponentially with the number
1854        // of nodes between the local node and the closest node to the key
1855        // (beyond the replication factor). This avoids over-caching the record
1856        // outside of the k closest nodes to a key.
1857        let target = kbucket::Key::new(record.key.clone());
1858        let num_between = self.kbuckets.count_nodes_between(&target);
1859        let k = self.queries.config().replication_factor.get();
1860        let num_beyond_k = (usize::max(k, num_between) - k) as u32;
1861        let expiration = self
1862            .record_ttl
1863            .map(|ttl| now + exp_decrease(ttl, num_beyond_k));
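        // For example, with a replication factor of 20 and 23 nodes closer to the key
        // than the local node, `num_beyond_k` is 3 and the TTL is reduced to one eighth
        // of its configured value.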
1864        // The smaller TTL prevails. Only if neither TTL is set is the record
1865        // stored "forever".
1866        record.expires = record.expires.or(expiration).min(expiration);
1867
1868        if let Some(job) = self.put_record_job.as_mut() {
1869            // Ignore the record in the next run of the replication
1870            // job, since we can assume the sender replicated the
1871            // record to the k closest peers. Effectively, only
1872            // one of the k closest peers performs a replication
1873            // in the configured interval, assuming a shared interval.
1874            job.skip(record.key.clone())
1875        }
1876
1877        // While records received from a publisher, as well as records that do
1878        // not exist locally, should always be stored (or at least an attempt made),
1879        // there is a choice here w.r.t. the handling of replicated records whose keys
1880        // refer to records that exist locally: the value and/or the publisher may
1881        // either be overridden or left unchanged. At the moment, and in the
1882        // absence of a decisive argument for another option, both are always
1883        // overridden, as this avoids having to load the existing record in the
1884        // first place.
1885
1886        if !record.is_expired(now) {
1887            // The record is cloned because of the weird libp2p protocol
1888            // requirement to send back the value in the response, although this
1889            // is a waste of resources.
1890            match self.record_filtering {
1891                StoreInserts::Unfiltered => match self.store.put(record.clone()) {
1892                    Ok(()) => {
1893                        tracing::debug!(
1894                            record=?record.key,
1895                            "Record stored: {} bytes",
1896                            record.value.len()
1897                        );
1898                        self.queued_events.push_back(ToSwarm::GenerateEvent(
1899                            Event::InboundRequest {
1900                                request: InboundRequest::PutRecord {
1901                                    source,
1902                                    connection,
1903                                    record: None,
1904                                },
1905                            },
1906                        ));
1907                    }
1908                    Err(e) => {
1909                        tracing::info!("Record not stored: {:?}", e);
1910                        self.queued_events.push_back(ToSwarm::NotifyHandler {
1911                            peer_id: source,
1912                            handler: NotifyHandler::One(connection),
1913                            event: HandlerIn::Reset(request_id),
1914                        });
1915
1916                        return;
1917                    }
1918                },
1919                StoreInserts::FilterBoth => {
1920                    self.queued_events
1921                        .push_back(ToSwarm::GenerateEvent(Event::InboundRequest {
1922                            request: InboundRequest::PutRecord {
1923                                source,
1924                                connection,
1925                                record: Some(record.clone()),
1926                            },
1927                        }));
1928                }
1929            }
1930        }
1931
1932        // The remote receives a [`HandlerIn::PutRecordRes`] even in the
1933        // case where the record is discarded due to being expired. Given that
1934        // the remote sent the local node a [`HandlerEvent::PutRecord`]
1935        // request, the remote perceives the local node as one node among the k
1936        // closest nodes to the target. In addition returning
1937        // [`HandlerIn::PutRecordRes`] does not reveal any internal
1938        // information to a possibly malicious remote node.
1939        self.queued_events.push_back(ToSwarm::NotifyHandler {
1940            peer_id: source,
1941            handler: NotifyHandler::One(connection),
1942            event: HandlerIn::PutRecordRes {
1943                key: record.key,
1944                value: record.value,
1945                request_id,
1946            },
1947        })
1948    }
1949
1950    /// Processes a provider record received from a peer.
1951    fn provider_received(&mut self, key: record::Key, provider: KadPeer) {
1952        if &provider.node_id != self.kbuckets.local_key().preimage() {
1953            let record = ProviderRecord {
1954                key,
1955                provider: provider.node_id,
1956                expires: self.provider_record_ttl.map(|ttl| Instant::now() + ttl),
1957                addresses: provider.multiaddrs,
1958            };
1959            match self.record_filtering {
1960                StoreInserts::Unfiltered => {
1961                    if let Err(e) = self.store.add_provider(record) {
1962                        tracing::info!("Provider record not stored: {:?}", e);
1963                        return;
1964                    }
1965
1966                    self.queued_events
1967                        .push_back(ToSwarm::GenerateEvent(Event::InboundRequest {
1968                            request: InboundRequest::AddProvider { record: None },
1969                        }));
1970                }
1971                StoreInserts::FilterBoth => {
1972                    self.queued_events
1973                        .push_back(ToSwarm::GenerateEvent(Event::InboundRequest {
1974                            request: InboundRequest::AddProvider {
1975                                record: Some(record),
1976                            },
1977                        }));
1978                }
1979            }
1980        }
1981    }
1982
1983    fn address_failed(&mut self, peer_id: PeerId, address: &Multiaddr) {
1984        let key = kbucket::Key::from(peer_id);
1985
1986        if let Some(addrs) = self.kbuckets.entry(&key).as_mut().and_then(|e| e.value()) {
1987            // TODO: Ideally, the address should only be removed if the error can
1988            // be classified as "permanent" but since `err` is currently a borrowed
1989            // trait object without a `'static` bound, even downcasting for inspection
1990            // of the error is not possible (and also not truly desirable or ergonomic).
1991            // The error passed in should rather be a dedicated enum.
1992            if addrs.remove(address).is_ok() {
1993                tracing::debug!(
1994                    peer=%peer_id,
1995                    %address,
1996                    "Address removed from peer due to error."
1997                );
1998            } else {
1999                // Despite apparently having no reachable address (any longer),
2000                // the peer is kept in the routing table with the last known address, so
2001                // that a (temporary) loss of network connectivity does not "flush" the
2002                // routing table. Once in, a peer is only removed from the routing table
2003                // if it is the least recently connected peer, currently disconnected
2004                // and is unreachable in the context of another peer pending insertion
2005                // into the same bucket. This is handled transparently by the
2006                // `KBucketsTable` and takes effect through `KBucketsTable::take_applied_pending`
2007                // within `Behaviour::poll`.
2008                tracing::debug!(
2009                    peer=%peer_id,
2010                    %address,
2011                    "Last remaining address of peer is unreachable."
2012                );
2013            }
2014        }
2015
2016        for query in self.queries.iter_mut() {
2017            if let Some(addrs) = query.peers.addresses.get_mut(&peer_id) {
2018                addrs.retain(|a| a != address);
2019            }
2020        }
2021    }
2022
2023    fn on_connection_established(
2024        &mut self,
2025        ConnectionEstablished {
2026            peer_id,
2027            failed_addresses,
2028            other_established,
2029            ..
2030        }: ConnectionEstablished,
2031    ) {
2032        for addr in failed_addresses {
2033            self.address_failed(peer_id, addr);
2034        }
2035
2036        // Peer's first connection.
2037        if other_established == 0 {
2038            self.connected_peers.insert(peer_id);
2039        }
2040    }
2041
2042    fn on_address_change(
2043        &mut self,
2044        AddressChange {
2045            peer_id: peer,
2046            old,
2047            new,
2048            ..
2049        }: AddressChange,
2050    ) {
2051        let (old, new) = (old.get_remote_address(), new.get_remote_address());
2052
2053        // Update routing table.
2054        if let Some(addrs) = self
2055            .kbuckets
2056            .entry(&kbucket::Key::from(peer))
2057            .as_mut()
2058            .and_then(|e| e.value())
2059        {
2060            if addrs.replace(old, new) {
2061                tracing::debug!(
2062                    %peer,
2063                    old_address=%old,
2064                    new_address=%new,
2065                    "Old address replaced with new address for peer."
2066                );
2067            } else {
2068                tracing::debug!(
2069                    %peer,
2070                    old_address=%old,
2071                    new_address=%new,
2072                    "Old address not replaced with new address for peer as old address wasn't present.",
2073                );
2074            }
2075        } else {
2076            tracing::debug!(
2077                %peer,
2078                old_address=%old,
2079                new_address=%new,
2080                "Old address not replaced with new address for peer as peer is not present in the \
2081                 routing table."
2082            );
2083        }
2084
2085        // Update query address cache.
2086        //
2087        // Given two connected nodes: local node A and remote node B. Say node B
2088        // is not in node A's routing table. Additionally node B is part of the
2089        // `Query::addresses` list of an ongoing query on node A. Say node
2090        // B triggers an address change and then disconnects. Later on, the
2091        // aforementioned query on node A wants to connect to node B.
2092        // Without replacing the address in the `Query::addresses` set node
2093        // A would attempt to dial the old and not the new address.
2094        //
2095        // While upholding correctness, iterating through all discovered
2096        // addresses of a peer in all currently ongoing queries might have a
2097        // large performance impact. If so, the code below might be worth
2098        // revisiting.
2099        for query in self.queries.iter_mut() {
2100            if let Some(addrs) = query.peers.addresses.get_mut(&peer) {
2101                for addr in addrs.iter_mut() {
2102                    if addr == old {
2103                        *addr = new.clone();
2104                    }
2105                }
2106            }
2107        }
2108    }
2109
2110    fn on_dial_failure(&mut self, DialFailure { peer_id, error, .. }: DialFailure) {
2111        let Some(peer_id) = peer_id else { return };
2112
2113        match error {
2114            DialError::LocalPeerId { .. }
2115            | DialError::WrongPeerId { .. }
2116            | DialError::Aborted
2117            | DialError::Denied { .. }
2118            | DialError::Transport(_)
2119            | DialError::NoAddresses => {
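                // These errors are treated as failures of this dial attempt: remove any
                // addresses that failed and report the failure to all ongoing queries
                // involving this peer.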
2120                if let DialError::Transport(addresses) = error {
2121                    for (addr, _) in addresses {
2122                        self.address_failed(peer_id, addr)
2123                    }
2124                }
2125
2126                for query in self.queries.iter_mut() {
2127                    query.on_failure(&peer_id);
2128                }
2129            }
2130            DialError::DialPeerConditionFalse(
2131                dial_opts::PeerCondition::Disconnected
2132                | dial_opts::PeerCondition::NotDialing
2133                | dial_opts::PeerCondition::DisconnectedAndNotDialing,
2134            ) => {
2135                // We might (still) be connected, or about to be connected, thus do not report the
2136                // failure to the queries.
2137            }
2138            DialError::DialPeerConditionFalse(dial_opts::PeerCondition::Always) => {
2139                unreachable!("DialPeerCondition::Always cannot trigger DialPeerConditionFalse.");
2140            }
2141        }
2142    }
2143
2144    fn on_connection_closed(
2145        &mut self,
2146        ConnectionClosed {
2147            peer_id,
2148            remaining_established,
2149            connection_id,
2150            ..
2151        }: ConnectionClosed,
2152    ) {
2153        self.connections.remove(&connection_id);
2154
2155        if remaining_established == 0 {
2156            for query in self.queries.iter_mut() {
2157                query.on_failure(&peer_id);
2158            }
2159            self.connection_updated(peer_id, None, NodeStatus::Disconnected);
2160            self.connected_peers.remove(&peer_id);
2161        }
2162    }
2163
2164    /// Preloads a new [`Handler`] with requests that are waiting
2165    /// to be sent to the newly connected peer.
2166    fn preload_new_handler(
2167        &mut self,
2168        handler: &mut Handler,
2169        connection_id: ConnectionId,
2170        peer: PeerId,
2171    ) {
2172        self.connections.insert(connection_id, peer);
2173        // Queue events for sending pending RPCs to the connected peer.
2174        // By definition, there can be only one pending RPC per peer and query.
2175        for (_peer_id, event) in self.queries.iter_mut().filter_map(|q| {
2176            q.pending_rpcs
2177                .iter()
2178                .position(|(p, _)| p == &peer)
2179                .map(|p| q.pending_rpcs.remove(p))
2180        }) {
2181            handler.on_behaviour_event(event)
2182        }
2183    }
2184}
2185
2186/// Exponentially decrease the given duration (base 2).
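/// For example, `exp_decrease(Duration::from_secs(64), 2)` yields 16 seconds; shifting
/// by 64 or more bits yields a zero duration.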
2187fn exp_decrease(ttl: Duration, exp: u32) -> Duration {
2188    Duration::from_secs(ttl.as_secs().checked_shr(exp).unwrap_or(0))
2189}
2190
2191impl<TStore> NetworkBehaviour for Behaviour<TStore>
2192where
2193    TStore: RecordStore + Send + 'static,
2194{
2195    type ConnectionHandler = Handler;
2196    type ToSwarm = Event;
2197
2198    fn handle_established_inbound_connection(
2199        &mut self,
2200        connection_id: ConnectionId,
2201        peer: PeerId,
2202        local_addr: &Multiaddr,
2203        remote_addr: &Multiaddr,
2204    ) -> Result<THandler<Self>, ConnectionDenied> {
2205        let connected_point = ConnectedPoint::Listener {
2206            local_addr: local_addr.clone(),
2207            send_back_addr: remote_addr.clone(),
2208        };
2209
2210        let mut handler = Handler::new(
2211            self.protocol_config.clone(),
2212            connected_point,
2213            peer,
2214            self.mode,
2215        );
2216        self.preload_new_handler(&mut handler, connection_id, peer);
2217
2218        Ok(handler)
2219    }
2220
2221    fn handle_established_outbound_connection(
2222        &mut self,
2223        connection_id: ConnectionId,
2224        peer: PeerId,
2225        addr: &Multiaddr,
2226        role_override: Endpoint,
2227        port_use: PortUse,
2228    ) -> Result<THandler<Self>, ConnectionDenied> {
2229        let connected_point = ConnectedPoint::Dialer {
2230            address: addr.clone(),
2231            role_override,
2232            port_use,
2233        };
2234
2235        let mut handler = Handler::new(
2236            self.protocol_config.clone(),
2237            connected_point,
2238            peer,
2239            self.mode,
2240        );
2241        self.preload_new_handler(&mut handler, connection_id, peer);
2242
2243        Ok(handler)
2244    }
2245
2246    fn handle_pending_outbound_connection(
2247        &mut self,
2248        _connection_id: ConnectionId,
2249        maybe_peer: Option<PeerId>,
2250        _addresses: &[Multiaddr],
2251        _effective_role: Endpoint,
2252    ) -> Result<Vec<Multiaddr>, ConnectionDenied> {
2253        let Some(peer_id) = maybe_peer else {
2254            return Ok(vec![]);
2255        };
2256
2257        // We should order addresses by decreasing likelihood of connectivity, so start with
2258        // the addresses of that peer in the k-buckets.
2259        let key = kbucket::Key::from(peer_id);
2260        let mut peer_addrs =
2261            if let Some(kbucket::Entry::Present(mut entry, _)) = self.kbuckets.entry(&key) {
2262                let addrs = entry.value().iter().cloned().collect::<Vec<_>>();
2263                debug_assert!(!addrs.is_empty(), "Empty peer addresses in routing table.");
2264                addrs
2265            } else {
2266                Vec::new()
2267            };
2268
2269        // We add to that a temporary list of addresses from the ongoing queries.
2270        for query in self.queries.iter() {
2271            if let Some(addrs) = query.peers.addresses.get(&peer_id) {
2272                peer_addrs.extend(addrs.iter().cloned())
2273            }
2274        }
2275
2276        Ok(peer_addrs)
2277    }
2278
2279    fn on_connection_handler_event(
2280        &mut self,
2281        source: PeerId,
2282        connection: ConnectionId,
2283        event: THandlerOutEvent<Self>,
2284    ) {
2285        match event {
2286            HandlerEvent::ProtocolConfirmed { endpoint } => {
2287                debug_assert!(self.connected_peers.contains(&source));
2288                // The remote's address can only be put into the routing table,
2289                // and thus shared with other nodes, if the local node is the dialer,
2290                // since the remote address on an inbound connection may be specific
2291                // to that connection (e.g. the TCP port numbers).
2292                let address = match endpoint {
2293                    ConnectedPoint::Dialer { address, .. } => Some(address),
2294                    ConnectedPoint::Listener { .. } => None,
2295                };
2296
2297                self.connection_updated(source, address, NodeStatus::Connected);
2298            }
2299
2300            HandlerEvent::ProtocolNotSupported { endpoint } => {
2301                let address = match endpoint {
2302                    ConnectedPoint::Dialer { address, .. } => Some(address),
2303                    ConnectedPoint::Listener { .. } => None,
2304                };
2305                self.connection_updated(source, address, NodeStatus::Disconnected);
2306            }
2307
2308            HandlerEvent::FindNodeReq { key, request_id } => {
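                // Answer the FIND_NODE request with the peers closest to the key that
                // are known locally; the requesting peer is excluded from the result.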
2309                let closer_peers = self
2310                    .find_closest_local_peers(&kbucket::Key::new(key), &source)
2311                    .collect::<Vec<_>>();
2312
2313                self.queued_events
2314                    .push_back(ToSwarm::GenerateEvent(Event::InboundRequest {
2315                        request: InboundRequest::FindNode {
2316                            num_closer_peers: closer_peers.len(),
2317                        },
2318                    }));
2319
2320                self.queued_events.push_back(ToSwarm::NotifyHandler {
2321                    peer_id: source,
2322                    handler: NotifyHandler::One(connection),
2323                    event: HandlerIn::FindNodeRes {
2324                        closer_peers,
2325                        request_id,
2326                    },
2327                });
2328            }
2329
2330            HandlerEvent::FindNodeRes {
2331                closer_peers,
2332                query_id,
2333            } => {
2334                self.discovered(&query_id, &source, closer_peers.iter());
2335            }
2336
2337            HandlerEvent::GetProvidersReq { key, request_id } => {
2338                let provider_peers = self.provider_peers(&key, &source);
2339                let closer_peers = self
2340                    .find_closest_local_peers(&kbucket::Key::new(key), &source)
2341                    .collect::<Vec<_>>();
2342
2343                self.queued_events
2344                    .push_back(ToSwarm::GenerateEvent(Event::InboundRequest {
2345                        request: InboundRequest::GetProvider {
2346                            num_closer_peers: closer_peers.len(),
2347                            num_provider_peers: provider_peers.len(),
2348                        },
2349                    }));
2350
2351                self.queued_events.push_back(ToSwarm::NotifyHandler {
2352                    peer_id: source,
2353                    handler: NotifyHandler::One(connection),
2354                    event: HandlerIn::GetProvidersRes {
2355                        closer_peers,
2356                        provider_peers,
2357                        request_id,
2358                    },
2359                });
2360            }
2361
2362            HandlerEvent::GetProvidersRes {
2363                closer_peers,
2364                provider_peers,
2365                query_id,
2366            } => {
2367                let peers = closer_peers.iter().chain(provider_peers.iter());
2368                self.discovered(&query_id, &source, peers);
2369                if let Some(query) = self.queries.get_mut(&query_id) {
2370                    let stats = query.stats().clone();
2371                    if let QueryInfo::GetProviders {
2372                        ref key,
2373                        ref mut providers_found,
2374                        ref mut step,
2375                        ..
2376                    } = query.info
2377                    {
2378                        *providers_found += provider_peers.len();
2379                        let providers = provider_peers.iter().map(|p| p.node_id).collect();
2380
2381                        self.queued_events.push_back(ToSwarm::GenerateEvent(
2382                            Event::OutboundQueryProgressed {
2383                                id: query_id,
2384                                result: QueryResult::GetProviders(Ok(
2385                                    GetProvidersOk::FoundProviders {
2386                                        key: key.clone(),
2387                                        providers,
2388                                    },
2389                                )),
2390                                step: step.clone(),
2391                                stats,
2392                            },
2393                        ));
2394                        *step = step.next();
2395                    }
2396                }
2397            }
2398            HandlerEvent::QueryError { query_id, error } => {
2399                tracing::debug!(
2400                    peer=%source,
2401                    query=?query_id,
2402                    "Request to peer in query failed with {:?}",
2403                    error
2404                );
2405                // If the query to which the error relates is still active,
2406                // signal the failure w.r.t. `source`.
2407                if let Some(query) = self.queries.get_mut(&query_id) {
2408                    query.on_failure(&source)
2409                }
2410            }
2411
2412            HandlerEvent::AddProvider { key, provider } => {
2413                // Only accept a provider record from a legitimate peer.
2414                if provider.node_id != source {
2415                    return;
2416                }
2417
2418                self.provider_received(key, provider);
2419            }
2420
2421            HandlerEvent::GetRecord { key, request_id } => {
2422                // Lookup the record locally.
2423                let record = match self.store.get(&key) {
2424                    Some(record) => {
2425                        if record.is_expired(Instant::now()) {
2426                            self.store.remove(&key);
2427                            None
2428                        } else {
2429                            Some(record.into_owned())
2430                        }
2431                    }
2432                    None => None,
2433                };
2434
2435                let closer_peers = self
2436                    .find_closest_local_peers(&kbucket::Key::new(key), &source)
2437                    .collect::<Vec<_>>();
2438
2439                self.queued_events
2440                    .push_back(ToSwarm::GenerateEvent(Event::InboundRequest {
2441                        request: InboundRequest::GetRecord {
2442                            num_closer_peers: closer_peers.len(),
2443                            present_locally: record.is_some(),
2444                        },
2445                    }));
2446
2447                self.queued_events.push_back(ToSwarm::NotifyHandler {
2448                    peer_id: source,
2449                    handler: NotifyHandler::One(connection),
2450                    event: HandlerIn::GetRecordRes {
2451                        record,
2452                        closer_peers,
2453                        request_id,
2454                    },
2455                });
2456            }
2457
2458            HandlerEvent::GetRecordRes {
2459                record,
2460                closer_peers,
2461                query_id,
2462            } => {
2463                if let Some(query) = self.queries.get_mut(&query_id) {
2464                    let stats = query.stats().clone();
2465                    if let QueryInfo::GetRecord {
2466                        key,
2467                        ref mut step,
2468                        ref mut found_a_record,
2469                        cache_candidates,
2470                    } = &mut query.info
2471                    {
2472                        if let Some(record) = record {
2473                            *found_a_record = true;
2474                            let record = PeerRecord {
2475                                peer: Some(source),
2476                                record,
2477                            };
2478
2479                            self.queued_events.push_back(ToSwarm::GenerateEvent(
2480                                Event::OutboundQueryProgressed {
2481                                    id: query_id,
2482                                    result: QueryResult::GetRecord(Ok(GetRecordOk::FoundRecord(
2483                                        record,
2484                                    ))),
2485                                    step: step.clone(),
2486                                    stats,
2487                                },
2488                            ));
2489
2490                            *step = step.next();
2491                        } else {
2492                            tracing::trace!(record=?key, %source, "Record not found at source");
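                            // Remember peers close to the key that did not return the
                            // record as candidates for caching it once it is found. The
                            // map is keyed by distance to the key, so `pop_last` evicts
                            // the farthest candidate once more than `max_peers` are
                            // tracked.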
2493                            if let Caching::Enabled { max_peers } = self.caching {
2494                                let source_key = kbucket::Key::from(source);
2495                                let target_key = kbucket::Key::from(key.clone());
2496                                let distance = source_key.distance(&target_key);
2497                                cache_candidates.insert(distance, source);
2498                                if cache_candidates.len() > max_peers as usize {
2499                                    cache_candidates.pop_last();
2500                                }
2501                            }
2502                        }
2503                    }
2504                }
2505
2506                self.discovered(&query_id, &source, closer_peers.iter());
2507            }
2508
2509            HandlerEvent::PutRecord { record, request_id } => {
2510                self.record_received(source, connection, request_id, record);
2511            }
2512
2513            HandlerEvent::PutRecordRes { query_id, .. } => {
2514                if let Some(query) = self.queries.get_mut(&query_id) {
2515                    query.on_success(&source, vec![]);
2516                    if let QueryInfo::PutRecord {
2517                        phase: PutRecordPhase::PutRecord { success, .. },
2518                        quorum,
2519                        ..
2520                    } = &mut query.info
2521                    {
2522                        success.push(source);
2523
2524                        let quorum = quorum.get();
2525                        if success.len() >= quorum {
2526                            let peers = success.clone();
2527                            let finished = query.try_finish(peers.iter());
2528                            if !finished {
2529                                tracing::debug!(
2530                                    peer=%source,
2531                                    query=?query_id,
2532                                    "PutRecord query reached quorum ({}/{}) with response \
2533                                     from peer but could not yet finish.",
2534                                    peers.len(),
2535                                    quorum,
2536                                );
2537                            }
2538                        }
2539                    }
2540                }
2541            }
2542        };
2543    }
2544
2545    #[tracing::instrument(level = "trace", name = "NetworkBehaviour::poll", skip(self, cx))]
2546    fn poll(
2547        &mut self,
2548        cx: &mut Context<'_>,
2549    ) -> Poll<ToSwarm<Self::ToSwarm, THandlerInEvent<Self>>> {
2550        let now = Instant::now();
2551
2552        // Calculate the available capacity for queries triggered by background jobs.
2553        let mut jobs_query_capacity = JOBS_MAX_QUERIES.saturating_sub(self.queries.size());
2554
2555        // Run the periodic provider announcement job.
2556        if let Some(mut job) = self.add_provider_job.take() {
2557            let num = usize::min(JOBS_MAX_NEW_QUERIES, jobs_query_capacity);
2558            for i in 0..num {
2559                if let Poll::Ready(r) = job.poll(cx, &mut self.store, now) {
2560                    self.start_add_provider(r.key, AddProviderContext::Republish)
2561                } else {
2562                    jobs_query_capacity -= i;
2563                    break;
2564                }
2565            }
2566            self.add_provider_job = Some(job);
2567        }
2568
2569        // Run the periodic record replication / publication job.
2570        if let Some(mut job) = self.put_record_job.take() {
2571            let num = usize::min(JOBS_MAX_NEW_QUERIES, jobs_query_capacity);
2572            for _ in 0..num {
2573                if let Poll::Ready(r) = job.poll(cx, &mut self.store, now) {
2574                    let context =
2575                        if r.publisher.as_ref() == Some(self.kbuckets.local_key().preimage()) {
2576                            PutRecordContext::Republish
2577                        } else {
2578                            PutRecordContext::Replicate
2579                        };
2580                    self.start_put_record(r, Quorum::All, context)
2581                } else {
2582                    break;
2583                }
2584            }
2585            self.put_record_job = Some(job);
2586        }
2587
2588        // Poll bootstrap periodically and automatically.
2589        if let Poll::Ready(()) = self.bootstrap_status.poll_next_bootstrap(cx) {
2590            if let Err(e) = self.bootstrap() {
2591                tracing::warn!("Failed to trigger bootstrap: {e}");
2592            }
2593        }
2594
2595        loop {
2596            // Drain queued events first.
2597            if let Some(event) = self.queued_events.pop_front() {
2598                return Poll::Ready(event);
2599            }
2600
2601            // Drain applied pending entries from the routing table.
2602            if let Some(entry) = self.kbuckets.take_applied_pending() {
2603                let kbucket::Node { key, value } = entry.inserted;
2604                let peer_id = key.into_preimage();
2605                self.queued_events
2606                    .push_back(ToSwarm::NewExternalAddrOfPeer {
2607                        peer_id,
2608                        address: value.first().clone(),
2609                    });
2610                let event = Event::RoutingUpdated {
2611                    bucket_range: self
2612                        .kbuckets
2613                        .bucket(&key)
2614                        .map(|b| b.range())
2615                        .expect("Self to never be applied from pending."),
2616                    peer: peer_id,
2617                    is_new_peer: true,
2618                    addresses: value,
2619                    old_peer: entry.evicted.map(|n| n.key.into_preimage()),
2620                };
2621                return Poll::Ready(ToSwarm::GenerateEvent(event));
2622            }
2623
2624            // Look for a finished query.
2625            loop {
2626                match self.queries.poll(now) {
2627                    QueryPoolState::Finished(q) => {
2628                        if let Some(event) = self.query_finished(q) {
2629                            return Poll::Ready(ToSwarm::GenerateEvent(event));
2630                        }
2631                    }
2632                    QueryPoolState::Timeout(q) => {
2633                        if let Some(event) = self.query_timeout(q) {
2634                            return Poll::Ready(ToSwarm::GenerateEvent(event));
2635                        }
2636                    }
2637                    QueryPoolState::Waiting(Some((query, peer_id))) => {
2638                        let event = query.info.to_request(query.id());
2639                        // TODO: AddProvider requests yield no response, so the query completes
2640                        // as soon as all requests have been sent. However, the handler should
2641                        // better emit an event when the request has been sent (and report
2642                        // an error if sending fails), instead of immediately reporting
2643                        // "success" somewhat prematurely here.
2644                        if let QueryInfo::AddProvider {
2645                            phase: AddProviderPhase::AddProvider { .. },
2646                            ..
2647                        } = &query.info
2648                        {
2649                            query.on_success(&peer_id, vec![])
2650                        }
2651
2652                        if self.connected_peers.contains(&peer_id) {
2653                            self.queued_events.push_back(ToSwarm::NotifyHandler {
2654                                peer_id,
2655                                event,
2656                                handler: NotifyHandler::Any,
2657                            });
2658                        } else if &peer_id != self.kbuckets.local_key().preimage() {
2659                            query.pending_rpcs.push((peer_id, event));
2660                            self.queued_events.push_back(ToSwarm::Dial {
2661                                opts: DialOpts::peer_id(peer_id).build(),
2662                            });
2663                        }
2664                    }
2665                    QueryPoolState::Waiting(None) | QueryPoolState::Idle => break,
2666                }
2667            }
2668
2669            // No immediate event was produced as a result of a finished query.
2670            // If no new events have been queued either, register the current task's
2671            // waker and return `Poll::Pending` to be polled again later.
2672            if self.queued_events.is_empty() {
2673                self.no_events_waker = Some(cx.waker().clone());
2674
2675                return Poll::Pending;
2676            }
2677        }
2678    }
2679
2680    fn on_swarm_event(&mut self, event: FromSwarm) {
2681        self.listen_addresses.on_swarm_event(&event);
2682        let external_addresses_changed = self.external_addresses.on_swarm_event(&event);
2683
2684        if self.auto_mode && external_addresses_changed {
2685            self.determine_mode_from_external_addresses();
2686        }
2687
2688        match event {
2689            FromSwarm::ConnectionEstablished(connection_established) => {
2690                self.on_connection_established(connection_established)
2691            }
2692            FromSwarm::ConnectionClosed(connection_closed) => {
2693                self.on_connection_closed(connection_closed)
2694            }
2695            FromSwarm::DialFailure(dial_failure) => self.on_dial_failure(dial_failure),
2696            FromSwarm::AddressChange(address_change) => self.on_address_change(address_change),
2697            FromSwarm::NewListenAddr(_) if self.connected_peers.is_empty() => {
2698                // A new listen address was discovered while we have no connected peers.
2699                // This can mean that our network interfaces were down but are up again,
2700                // so it is a good moment to trigger a bootstrap.
2701                self.bootstrap_status.trigger();
2702            }
2703            _ => {}
2704        }
2705    }
2706}
2707
2708/// `PeerInfo` combines a peer ID with a set of multiaddresses the peer is listening on.
2709#[derive(Debug, Clone, PartialEq, Eq)]
2710pub struct PeerInfo {
2711    pub peer_id: PeerId,
2712    pub addrs: Vec<Multiaddr>,
2713}
2714
2715/// A quorum w.r.t. the configured replication factor specifies the minimum
2716/// number of distinct nodes that must be successfully contacted in order
2717/// for a query to succeed.
2718#[derive(Debug, Copy, Clone, PartialEq, Eq)]
2719pub enum Quorum {
2720    One,
2721    Majority,
2722    All,
2723    N(NonZeroUsize),
2724}
2725
2726impl Quorum {
2727    /// Evaluates the quorum w.r.t. a given total (number of peers).
2728    fn eval(&self, total: NonZeroUsize) -> NonZeroUsize {
2729        match self {
2730            Quorum::One => NonZeroUsize::new(1).expect("1 != 0"),
2731            Quorum::Majority => NonZeroUsize::new(total.get() / 2 + 1).expect("n + 1 != 0"),
2732            Quorum::All => total,
2733            Quorum::N(n) => NonZeroUsize::min(total, *n),
2734        }
2735    }
2736}
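
// A small illustrative sketch of how the variants above evaluate against a
// replication factor of 20; the expected values simply mirror `Quorum::eval`.
#[cfg(test)]
mod quorum_eval_examples {
    use std::num::NonZeroUsize;

    use super::Quorum;

    #[test]
    fn eval_against_a_total_of_20() {
        let total = NonZeroUsize::new(20).expect("20 != 0");
        assert_eq!(Quorum::One.eval(total).get(), 1);
        assert_eq!(Quorum::Majority.eval(total).get(), 11); // 20 / 2 + 1
        assert_eq!(Quorum::All.eval(total).get(), 20);
        // `N` is capped at the given total.
        let n = NonZeroUsize::new(25).expect("25 != 0");
        assert_eq!(Quorum::N(n).eval(total).get(), 20);
    }
}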
2737
2738/// A record either received by the given peer or retrieved from the local
2739/// record store.
2740#[derive(Debug, Clone, PartialEq, Eq)]
2741pub struct PeerRecord {
2742    /// The peer from whom the record was received. `None` if the record was
2743    /// retrieved from local storage.
2744    pub peer: Option<PeerId>,
2745    pub record: Record,
2746}
2747
2748//////////////////////////////////////////////////////////////////////////////
2749// Events
2750
2751/// The events produced by the `Kademlia` behaviour.
2752///
2753/// See [`NetworkBehaviour::poll`].
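///
/// A rough usage sketch: in an application these events typically arrive
/// wrapped in `SwarmEvent::Behaviour`. The `swarm` value and the enclosing
/// async context are assumptions of the example, not part of this crate.
///
/// ```ignore
/// use futures::StreamExt;
/// use libp2p_swarm::SwarmEvent;
///
/// loop {
///     match swarm.select_next_some().await {
///         SwarmEvent::Behaviour(Event::OutboundQueryProgressed { id, result, step, .. }) => {
///             tracing::info!(query = ?id, last = step.last, ?result, "query progressed");
///         }
///         SwarmEvent::Behaviour(Event::RoutingUpdated { peer, is_new_peer, .. }) => {
///             tracing::info!(%peer, is_new_peer, "routing table updated");
///         }
///         _ => {}
///     }
/// }
/// ```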
2754#[derive(Debug, Clone)]
2755#[allow(clippy::large_enum_variant)]
2756pub enum Event {
2757    /// An inbound request has been received and handled.
2758    // Note on the difference between 'request' and 'query': A request is a
2759    // single request-response style exchange with a single remote peer. A query
2760    // is made of multiple requests across multiple remote peers.
2761    InboundRequest { request: InboundRequest },
2762
2763    /// An outbound query has made progress.
2764    OutboundQueryProgressed {
2765        /// The ID of the query that finished.
2766        id: QueryId,
2767        /// The intermediate result of the query.
2768        result: QueryResult,
2769        /// Execution statistics from the query.
2770        stats: QueryStats,
2771        /// Indicates which event this is, if there are multiple responses for a single query.
2772        step: ProgressStep,
2773    },
2774
2775    /// The routing table has been updated with a new peer and / or
2776    /// address, thereby possibly evicting another peer.
2777    RoutingUpdated {
2778        /// The ID of the peer that was added or updated.
2779        peer: PeerId,
2780        /// Whether this is a new peer and was thus just added to the routing
2781        /// table, or whether it is an existing peer whose addresses changed.
2782        is_new_peer: bool,
2783        /// The full list of known addresses of `peer`.
2784        addresses: Addresses,
2785        /// The inclusive minimum and maximum distance for
2786        /// the bucket of the peer.
2787        bucket_range: (Distance, Distance),
2788        /// The ID of the peer that was evicted from the routing table to make
2789        /// room for the new peer, if any.
2790        old_peer: Option<PeerId>,
2791    },
2792
2793    /// A peer has connected for whom no listen address is known.
2794    ///
2795    /// If the peer is to be added to the routing table, a known
2796    /// listen address for the peer must be provided via [`Behaviour::add_address`].
2797    UnroutablePeer { peer: PeerId },
2798
2799    /// A connection to a peer has been established for whom a listen address
2800    /// is known but the peer has not been added to the routing table either
2801    /// because [`BucketInserts::Manual`] is configured or because
2802    /// the corresponding bucket is full.
2803    ///
2804    /// If the peer is to be included in the routing table, it must be
2805    /// explicitly added via [`Behaviour::add_address`], possibly after
2806    /// removing another peer.
2807    ///
2808    /// See [`Behaviour::kbucket`] for insight into the contents of
2809    /// the k-bucket of `peer`.
2810    RoutablePeer { peer: PeerId, address: Multiaddr },
2811
2812    /// A connection to a peer has been established for whom a listen address
2813    /// is known but the peer is only pending insertion into the routing table
2814    /// if the least-recently disconnected peer is unresponsive, i.e. the peer
2815    /// may not make it into the routing table.
2816    ///
2817    /// If the peer is to be unconditionally included in the routing table,
2818    /// it should be explicitly added via [`Behaviour::add_address`] after
2819    /// removing another peer.
2820    ///
2821    /// See [`Behaviour::kbucket`] for insight into the contents of
2822    /// the k-bucket of `peer`.
2823    PendingRoutablePeer { peer: PeerId, address: Multiaddr },
2824
2825    /// This peer's mode has been updated automatically.
2826    ///
2827    /// This happens in response to an external
2828    /// address being added or removed.
2829    ModeChanged { new_mode: Mode },
2830}
2831
2832/// Information about progress events.
2833#[derive(Debug, Clone)]
2834pub struct ProgressStep {
2835    /// The index of this event within the sequence of progress events for the query.
2836    pub count: NonZeroUsize,
2837    /// Is this the final event?
2838    pub last: bool,
2839}
2840
2841impl ProgressStep {
2842    fn first() -> Self {
2843        Self {
2844            count: NonZeroUsize::new(1).expect("1 to be greater than 0."),
2845            last: false,
2846        }
2847    }
2848
2849    fn first_and_last() -> Self {
2850        let mut first = ProgressStep::first();
2851        first.last = true;
2852        first
2853    }
2854
2855    fn next(&self) -> Self {
2856        assert!(!self.last);
2857        let count = NonZeroUsize::new(self.count.get() + 1).expect("Adding 1 not to result in 0.");
2858
2859        Self { count, last: false }
2860    }
2861}
2862
2863/// Information about a received and handled inbound request.
2864#[derive(Debug, Clone)]
2865pub enum InboundRequest {
2866    /// Request for the list of nodes whose IDs are the closest to `key`.
2867    FindNode { num_closer_peers: usize },
2868    /// Same as `FindNode`, but should also return the entries of the local
2869    /// providers list for this key.
2870    GetProvider {
2871        num_closer_peers: usize,
2872        num_provider_peers: usize,
2873    },
2874    /// A peer sent an add provider request.
2875    /// If filtering [`StoreInserts::FilterBoth`] is enabled, the [`ProviderRecord`] is
2876    /// included.
2877    ///
2878    /// See [`StoreInserts`] and [`Config::set_record_filtering`] for details.
2879    AddProvider { record: Option<ProviderRecord> },
2880    /// Request to retrieve a record.
2881    GetRecord {
2882        num_closer_peers: usize,
2883        present_locally: bool,
2884    },
2885    /// A peer sent a put record request.
2886    /// If filtering [`StoreInserts::FilterBoth`] is enabled, the [`Record`] is included.
2887    ///
2888    /// See [`StoreInserts`] and [`Config::set_record_filtering`].
2889    PutRecord {
2890        source: PeerId,
2891        connection: ConnectionId,
2892        record: Option<Record>,
2893    },
2894}
2895
2896/// The results of Kademlia queries.
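///
/// A rough sketch of dispatching on the result inside an
/// [`Event::OutboundQueryProgressed`] handler (`result` is the event's
/// field; most arms are elided):
///
/// ```ignore
/// match result {
///     QueryResult::GetRecord(Ok(GetRecordOk::FoundRecord(peer_record))) => {
///         tracing::info!(key = ?peer_record.record.key, "found record");
///     }
///     QueryResult::PutRecord(Err(PutRecordError::QuorumFailed { key, quorum, .. })) => {
///         tracing::warn!(?key, %quorum, "put_record did not reach its quorum");
///     }
///     QueryResult::Bootstrap(Ok(BootstrapOk { num_remaining, .. })) => {
///         tracing::debug!(num_remaining, "bootstrap step finished");
///     }
///     _ => {}
/// }
/// ```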
2897#[derive(Debug, Clone)]
2898pub enum QueryResult {
2899    /// The result of [`Behaviour::bootstrap`].
2900    Bootstrap(BootstrapResult),
2901
2902    /// The result of [`Behaviour::get_closest_peers`].
2903    GetClosestPeers(GetClosestPeersResult),
2904
2905    /// The result of [`Behaviour::get_providers`].
2906    GetProviders(GetProvidersResult),
2907
2908    /// The result of [`Behaviour::start_providing`].
2909    StartProviding(AddProviderResult),
2910
2911    /// The result of an (automatic) republishing of a provider record.
2912    RepublishProvider(AddProviderResult),
2913
2914    /// The result of [`Behaviour::get_record`].
2915    GetRecord(GetRecordResult),
2916
2917    /// The result of [`Behaviour::put_record`].
2918    PutRecord(PutRecordResult),
2919
2920    /// The result of an (automatic) republishing of a (value-)record.
2921    RepublishRecord(PutRecordResult),
2922}
2923
2924/// The result of [`Behaviour::get_record`].
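///
/// A rough sketch of starting such a query (`behaviour` is an assumption of
/// the example; the result is later reported via
/// [`Event::OutboundQueryProgressed`]):
///
/// ```ignore
/// let query_id = behaviour.get_record(record::Key::new(b"my-key"));
/// ```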
2925pub type GetRecordResult = Result<GetRecordOk, GetRecordError>;
2926
2927/// The successful result of [`Behaviour::get_record`].
2928#[derive(Debug, Clone)]
2929#[allow(clippy::large_enum_variant)]
2930pub enum GetRecordOk {
2931    FoundRecord(PeerRecord),
2932    FinishedWithNoAdditionalRecord {
2933        /// If caching is enabled, these are the peers closest
2934        /// _to the record key_ (not the local node) that were queried but
2935        /// did not return the record, sorted by distance to the record key
2936        /// from closest to farthest. How many of these are tracked is configured
2937        /// by [`Config::set_caching`].
2938        ///
2939        /// Writing the record back to the cache at these peers is a manual
2940        /// operation, i.e. you may wish to use these candidates with [`Behaviour::put_record_to`]
2941        /// after selecting one of the returned records.
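        ///
        /// A rough sketch of such a write-back (`behaviour` and `found` are
        /// assumptions of the example; `found` is a previously received
        /// [`PeerRecord`]):
        ///
        /// ```ignore
        /// behaviour.put_record_to(
        ///     found.record.clone(),
        ///     cache_candidates.values().cloned(),
        ///     Quorum::One,
        /// );
        /// ```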
2942        cache_candidates: BTreeMap<kbucket::Distance, PeerId>,
2943    },
2944}
2945
2946/// The error result of [`Behaviour::get_record`].
2947#[derive(Debug, Clone, Error)]
2948pub enum GetRecordError {
2949    #[error("the record was not found")]
2950    NotFound {
2951        key: record::Key,
2952        closest_peers: Vec<PeerId>,
2953    },
2954    #[error("the quorum failed; needed {quorum} peers")]
2955    QuorumFailed {
2956        key: record::Key,
2957        records: Vec<PeerRecord>,
2958        quorum: NonZeroUsize,
2959    },
2960    #[error("the request timed out")]
2961    Timeout { key: record::Key },
2962}
2963
2964impl GetRecordError {
2965    /// Gets the key of the record for which the operation failed.
2966    pub fn key(&self) -> &record::Key {
2967        match self {
2968            GetRecordError::QuorumFailed { key, .. } => key,
2969            GetRecordError::Timeout { key, .. } => key,
2970            GetRecordError::NotFound { key, .. } => key,
2971        }
2972    }
2973
2974    /// Extracts the key of the record for which the operation failed,
2975    /// consuming the error.
2976    pub fn into_key(self) -> record::Key {
2977        match self {
2978            GetRecordError::QuorumFailed { key, .. } => key,
2979            GetRecordError::Timeout { key, .. } => key,
2980            GetRecordError::NotFound { key, .. } => key,
2981        }
2982    }
2983}
2984
2985/// The result of [`Behaviour::put_record`].
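///
/// A rough sketch of starting such a query (`behaviour` is an assumption of
/// the example; `put_record` also stores the record locally, which is why it
/// can fail early with a store error):
///
/// ```ignore
/// let record = Record::new(record::Key::new(b"my-key"), b"my-value".to_vec());
/// let query_id = behaviour.put_record(record, Quorum::One)?;
/// ```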
2986pub type PutRecordResult = Result<PutRecordOk, PutRecordError>;
2987
2988/// The successful result of [`Behaviour::put_record`].
2989#[derive(Debug, Clone)]
2990pub struct PutRecordOk {
2991    pub key: record::Key,
2992}
2993
2994/// The error result of [`Behaviour::put_record`].
2995#[derive(Debug, Clone, Error)]
2996pub enum PutRecordError {
2997    #[error("the quorum failed; needed {quorum} peers")]
2998    QuorumFailed {
2999        key: record::Key,
3000        /// [`PeerId`]s of the peers the record was successfully stored on.
3001        success: Vec<PeerId>,
3002        quorum: NonZeroUsize,
3003    },
3004    #[error("the request timed out")]
3005    Timeout {
3006        key: record::Key,
3007        /// [`PeerId`]s of the peers the record was successfully stored on.
3008        success: Vec<PeerId>,
3009        quorum: NonZeroUsize,
3010    },
3011}
3012
3013impl PutRecordError {
3014    /// Gets the key of the record for which the operation failed.
3015    pub fn key(&self) -> &record::Key {
3016        match self {
3017            PutRecordError::QuorumFailed { key, .. } => key,
3018            PutRecordError::Timeout { key, .. } => key,
3019        }
3020    }
3021
3022    /// Extracts the key of the record for which the operation failed,
3023    /// consuming the error.
3024    pub fn into_key(self) -> record::Key {
3025        match self {
3026            PutRecordError::QuorumFailed { key, .. } => key,
3027            PutRecordError::Timeout { key, .. } => key,
3028        }
3029    }
3030}
3031
3032/// The result of [`Behaviour::bootstrap`].
3033pub type BootstrapResult = Result<BootstrapOk, BootstrapError>;
3034
3035/// The successful result of [`Behaviour::bootstrap`].
3036#[derive(Debug, Clone)]
3037pub struct BootstrapOk {
3038    pub peer: PeerId,
3039    pub num_remaining: u32,
3040}
3041
3042/// The error result of [`Behaviour::bootstrap`].
3043#[derive(Debug, Clone, Error)]
3044pub enum BootstrapError {
3045    #[error("the request timed out")]
3046    Timeout {
3047        peer: PeerId,
3048        num_remaining: Option<u32>,
3049    },
3050}
3051
3052/// The result of [`Behaviour::get_closest_peers`].
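///
/// A rough sketch of starting such a query, here as a "random walk" towards
/// a random peer ID (`behaviour` is an assumption of the example):
///
/// ```ignore
/// let query_id = behaviour.get_closest_peers(PeerId::random());
/// ```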
3053pub type GetClosestPeersResult = Result<GetClosestPeersOk, GetClosestPeersError>;
3054
3055/// The successful result of [`Behaviour::get_closest_peers`].
3056#[derive(Debug, Clone)]
3057pub struct GetClosestPeersOk {
3058    pub key: Vec<u8>,
3059    pub peers: Vec<PeerInfo>,
3060}
3061
3062/// The error result of [`Behaviour::get_closest_peers`].
3063#[derive(Debug, Clone, Error)]
3064pub enum GetClosestPeersError {
3065    #[error("the request timed out")]
3066    Timeout { key: Vec<u8>, peers: Vec<PeerInfo> },
3067}
3068
3069impl GetClosestPeersError {
3070    /// Gets the key for which the operation failed.
3071    pub fn key(&self) -> &Vec<u8> {
3072        match self {
3073            GetClosestPeersError::Timeout { key, .. } => key,
3074        }
3075    }
3076
3077    /// Extracts the key for which the operation failed,
3078    /// consuming the error.
3079    pub fn into_key(self) -> Vec<u8> {
3080        match self {
3081            GetClosestPeersError::Timeout { key, .. } => key,
3082        }
3083    }
3084}
3085
3086/// The result of [`Behaviour::get_providers`].
3087pub type GetProvidersResult = Result<GetProvidersOk, GetProvidersError>;
3088
3089/// The successful result of [`Behaviour::get_providers`].
3090#[derive(Debug, Clone)]
3091pub enum GetProvidersOk {
3092    FoundProviders {
3093        key: record::Key,
3094        /// The new set of providers discovered.
3095        providers: HashSet<PeerId>,
3096    },
3097    FinishedWithNoAdditionalRecord {
3098        closest_peers: Vec<PeerId>,
3099    },
3100}
3101
3102/// The error result of [`Behaviour::get_providers`].
3103#[derive(Debug, Clone, Error)]
3104pub enum GetProvidersError {
3105    #[error("the request timed out")]
3106    Timeout {
3107        key: record::Key,
3108        closest_peers: Vec<PeerId>,
3109    },
3110}
3111
3112impl GetProvidersError {
3113    /// Gets the key for which the operation failed.
3114    pub fn key(&self) -> &record::Key {
3115        match self {
3116            GetProvidersError::Timeout { key, .. } => key,
3117        }
3118    }
3119
3120    /// Extracts the key for which the operation failed,
3121    /// consuming the error.
3122    pub fn into_key(self) -> record::Key {
3123        match self {
3124            GetProvidersError::Timeout { key, .. } => key,
3125        }
3126    }
3127}
3128
3129/// The result of publishing a provider record.
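///
/// A rough sketch of announcing the local node as a provider and later
/// looking providers up again (`behaviour` is an assumption of the example;
/// results arrive via [`Event::OutboundQueryProgressed`]):
///
/// ```ignore
/// let key = record::Key::new(b"content-hash");
/// behaviour.start_providing(key.clone())?;
/// let query_id = behaviour.get_providers(key);
/// ```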
3130pub type AddProviderResult = Result<AddProviderOk, AddProviderError>;
3131
3132/// The successful result of publishing a provider record.
3133#[derive(Debug, Clone)]
3134pub struct AddProviderOk {
3135    pub key: record::Key,
3136}
3137
3138/// The possible errors when publishing a provider record.
3139#[derive(Debug, Clone, Error)]
3140pub enum AddProviderError {
3141    #[error("the request timed out")]
3142    Timeout { key: record::Key },
3143}
3144
3145impl AddProviderError {
3146    /// Gets the key for which the operation failed.
3147    pub fn key(&self) -> &record::Key {
3148        match self {
3149            AddProviderError::Timeout { key, .. } => key,
3150        }
3151    }
3152
3153    /// Extracts the key for which the operation failed, consuming the error.
3154    pub fn into_key(self) -> record::Key {
3155        match self {
3156            AddProviderError::Timeout { key, .. } => key,
3157        }
3158    }
3159}
3160
3161impl From<kbucket::EntryView<kbucket::Key<PeerId>, Addresses>> for KadPeer {
3162    fn from(e: kbucket::EntryView<kbucket::Key<PeerId>, Addresses>) -> KadPeer {
3163        KadPeer {
3164            node_id: e.node.key.into_preimage(),
3165            multiaddrs: e.node.value.into_vec(),
3166            connection_ty: match e.status {
3167                NodeStatus::Connected => ConnectionType::Connected,
3168                NodeStatus::Disconnected => ConnectionType::NotConnected,
3169            },
3170        }
3171    }
3172}
3173
3174/// The context of a [`QueryInfo::AddProvider`] query.
3175#[derive(Debug, Copy, Clone, PartialEq, Eq)]
3176pub enum AddProviderContext {
3177    /// The context is a [`Behaviour::start_providing`] operation.
3178    Publish,
3179    /// The context is periodic republishing of provider announcements
3180    /// initiated earlier via [`Behaviour::start_providing`].
3181    Republish,
3182}
3183
3184/// The context of a [`QueryInfo::PutRecord`] query.
3185#[derive(Debug, Copy, Clone, PartialEq, Eq)]
3186pub enum PutRecordContext {
3187    /// The context is a [`Behaviour::put_record`] operation.
3188    Publish,
3189    /// The context is periodic republishing of records stored
3190    /// earlier via [`Behaviour::put_record`].
3191    Republish,
3192    /// The context is periodic replication (i.e. without extending
3193    /// the record TTL) of stored records received earlier from another peer.
3194    Replicate,
3195    /// The context is a custom store operation targeting specific
3196    /// peers initiated by [`Behaviour::put_record_to`].
3197    Custom,
3198}
3199
3200/// Information about a running query.
3201#[derive(Debug, Clone)]
3202pub enum QueryInfo {
3203    /// A query initiated by [`Behaviour::bootstrap`].
3204    Bootstrap {
3205        /// The targeted peer ID.
3206        peer: PeerId,
3207        /// The remaining random peer IDs to query, one per
3208        /// bucket that still needs refreshing.
3209        ///
3210        /// This is `None` if the initial self-lookup has not
3211        /// yet completed and `Some` with an exhausted iterator
3212        /// if bootstrapping is complete.
3213        remaining: Option<vec::IntoIter<kbucket::Key<PeerId>>>,
3214        step: ProgressStep,
3215    },
3216
3217    /// A (repeated) query initiated by [`Behaviour::get_closest_peers`].
3218    GetClosestPeers {
3219        /// The key being queried (the preimage).
3220        key: Vec<u8>,
3221        /// Current index of events.
3222        step: ProgressStep,
3223        /// The expected number of responding peers.
3224        num_results: NonZeroUsize,
3225    },
3226
3227    /// A (repeated) query initiated by [`Behaviour::get_providers`].
3228    GetProviders {
3229        /// The key for which to search for providers.
3230        key: record::Key,
3231        /// The number of providers found so far.
3232        providers_found: usize,
3233        /// Current index of events.
3234        step: ProgressStep,
3235    },
3236
3237    /// A (repeated) query initiated by [`Behaviour::start_providing`].
3238    AddProvider {
3239        /// The record key.
3240        key: record::Key,
3241        /// The current phase of the query.
3242        phase: AddProviderPhase,
3243        /// The execution context of the query.
3244        context: AddProviderContext,
3245    },
3246
3247    /// A (repeated) query initiated by [`Behaviour::put_record`].
3248    PutRecord {
3249        record: Record,
3250        /// The expected quorum of responses w.r.t. the replication factor.
3251        quorum: NonZeroUsize,
3252        /// The current phase of the query.
3253        phase: PutRecordPhase,
3254        /// The execution context of the query.
3255        context: PutRecordContext,
3256    },
3257
3258    /// A (repeated) query initiated by [`Behaviour::get_record`].
3259    GetRecord {
3260        /// The key to look for.
3261        key: record::Key,
3262        /// Current index of events.
3263        step: ProgressStep,
3264        /// Did we find at least one record?
3265        found_a_record: bool,
3266        /// The peers closest to the `key` that were queried but did not return a record,
3267        /// i.e. the peers that are candidates for caching the record.
3268        cache_candidates: BTreeMap<kbucket::Distance, PeerId>,
3269    },
3270}
3271
3272impl QueryInfo {
3273    /// Creates an event for a handler to issue an outgoing request in the
3274    /// context of a query.
3275    fn to_request(&self, query_id: QueryId) -> HandlerIn {
3276        match &self {
3277            QueryInfo::Bootstrap { peer, .. } => HandlerIn::FindNodeReq {
3278                key: peer.to_bytes(),
3279                query_id,
3280            },
3281            QueryInfo::GetClosestPeers { key, .. } => HandlerIn::FindNodeReq {
3282                key: key.clone(),
3283                query_id,
3284            },
3285            QueryInfo::GetProviders { key, .. } => HandlerIn::GetProvidersReq {
3286                key: key.clone(),
3287                query_id,
3288            },
3289            QueryInfo::AddProvider { key, phase, .. } => match phase {
3290                AddProviderPhase::GetClosestPeers => HandlerIn::FindNodeReq {
3291                    key: key.to_vec(),
3292                    query_id,
3293                },
3294                AddProviderPhase::AddProvider {
3295                    provider_id,
3296                    external_addresses,
3297                    ..
3298                } => HandlerIn::AddProvider {
3299                    key: key.clone(),
3300                    provider: crate::protocol::KadPeer {
3301                        node_id: *provider_id,
3302                        multiaddrs: external_addresses.clone(),
3303                        connection_ty: crate::protocol::ConnectionType::Connected,
3304                    },
3305                    query_id,
3306                },
3307            },
3308            QueryInfo::GetRecord { key, .. } => HandlerIn::GetRecord {
3309                key: key.clone(),
3310                query_id,
3311            },
3312            QueryInfo::PutRecord { record, phase, .. } => match phase {
3313                PutRecordPhase::GetClosestPeers => HandlerIn::FindNodeReq {
3314                    key: record.key.to_vec(),
3315                    query_id,
3316                },
3317                PutRecordPhase::PutRecord { .. } => HandlerIn::PutRecord {
3318                    record: record.clone(),
3319                    query_id,
3320                },
3321            },
3322        }
3323    }
3324}
3325
3326/// The phases of a [`QueryInfo::AddProvider`] query.
3327#[derive(Debug, Clone)]
3328pub enum AddProviderPhase {
3329    /// The query is searching for the closest nodes to the record key.
3330    GetClosestPeers,
3331
3332    /// The query advertises the local node as a provider for the key to
3333    /// the closest nodes to the key.
3334    AddProvider {
3335        /// The local peer ID that is advertised as a provider.
3336        provider_id: PeerId,
3337        /// The external addresses of the provider being advertised.
3338        external_addresses: Vec<Multiaddr>,
3339        /// Query statistics from the finished `GetClosestPeers` phase.
3340        get_closest_peers_stats: QueryStats,
3341    },
3342}
3343
3344/// The phases of a [`QueryInfo::PutRecord`] query.
3345#[derive(Debug, Clone, PartialEq, Eq)]
3346pub enum PutRecordPhase {
3347    /// The query is searching for the closest nodes to the record key.
3348    GetClosestPeers,
3349
3350    /// The query is replicating the record to the closest nodes to the key.
3351    PutRecord {
3352        /// A list of peers the given record has been successfully replicated to.
3353        success: Vec<PeerId>,
3354        /// Query statistics from the finished `GetClosestPeers` phase.
3355        get_closest_peers_stats: QueryStats,
3356    },
3357}
3358
3359/// A mutable reference to a running query.
3360pub struct QueryMut<'a> {
3361    query: &'a mut Query,
3362}
3363
3364impl QueryMut<'_> {
3365    pub fn id(&self) -> QueryId {
3366        self.query.id()
3367    }
3368
3369    /// Gets information about the type and state of the query.
3370    pub fn info(&self) -> &QueryInfo {
3371        &self.query.info
3372    }
3373
3374    /// Gets execution statistics about the query.
3375    ///
3376    /// For a multi-phase query such as `put_record`, these are the
3377    /// statistics of the current phase.
3378    pub fn stats(&self) -> &QueryStats {
3379        self.query.stats()
3380    }
3381
3382    /// Finishes the query as soon as possible, without waiting for the
3383    /// regular termination conditions.
3384    pub fn finish(&mut self) {
3385        self.query.finish()
3386    }
3387}
3388
3389/// An immutable reference to a running query.
3390pub struct QueryRef<'a> {
3391    query: &'a Query,
3392}
3393
3394impl QueryRef<'_> {
3395    pub fn id(&self) -> QueryId {
3396        self.query.id()
3397    }
3398
3399    /// Gets information about the type and state of the query.
3400    pub fn info(&self) -> &QueryInfo {
3401        &self.query.info
3402    }
3403
3404    /// Gets execution statistics about the query.
3405    ///
3406    /// For a multi-phase query such as `put_record`, these are the
3407    /// statistics of the current phase.
3408    pub fn stats(&self) -> &QueryStats {
3409        self.query.stats()
3410    }
3411}
3412
3413/// An operation failed due to there being no known peers in the routing table.
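///
/// Returned e.g. by [`Behaviour::bootstrap`] when the routing table is empty.
/// A rough sketch of the usual remedy (`behaviour`, `boot_peer` and
/// `boot_addr` are assumptions of the example):
///
/// ```ignore
/// behaviour.add_address(&boot_peer, boot_addr);
/// behaviour.bootstrap().expect("the routing table is no longer empty");
/// ```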
3414#[derive(Debug, Clone)]
3415pub struct NoKnownPeers();
3416
3417impl fmt::Display for NoKnownPeers {
3418    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
3419        write!(f, "No known peers.")
3420    }
3421}
3422
3423impl std::error::Error for NoKnownPeers {}
3424
3425/// The possible outcomes of [`Behaviour::add_address`].
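///
/// A rough sketch of handling the outcome when seeding the routing table
/// (`behaviour`, `peer_id` and `addr` are assumptions of the example):
///
/// ```ignore
/// match behaviour.add_address(&peer_id, addr) {
///     RoutingUpdate::Success => { /* the peer is now routable */ }
///     RoutingUpdate::Pending => { /* wait for `Event::RoutingUpdated` */ }
///     RoutingUpdate::Failed => { /* bucket full or invalid peer ID */ }
/// }
/// ```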
3426#[derive(Debug, Clone, Copy, PartialEq, Eq)]
3427pub enum RoutingUpdate {
3428    /// The given peer and address has been added to the routing
3429    /// table.
3430    Success,
3431    /// The peer and address is pending insertion into
3432    /// the routing table, if a disconnected peer fails
3433    /// to respond. If the given peer and address ends up
3434    /// in the routing table, [`Event::RoutingUpdated`]
3435    /// is eventually emitted.
3436    Pending,
3437    /// The routing table update failed, either because the
3438    /// corresponding bucket for the peer is full and the
3439    /// pending slot(s) are occupied, or because the given
3440    /// peer ID is deemed invalid (e.g. refers to the local
3441    /// peer ID).
3442    Failed,
3443}
3444
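/// The mode in which the local node participates in the DHT.
///
/// In client mode the node issues queries but does not answer inbound
/// requests; in server mode it also serves requests from other peers. The
/// mode is normally derived from the presence of confirmed external
/// addresses, but it can be pinned manually. A rough sketch (`behaviour` is
/// an assumption of the example):
///
/// ```ignore
/// // Force server mode regardless of confirmed external addresses.
/// behaviour.set_mode(Some(Mode::Server));
/// // Switch back to automatic mode selection.
/// behaviour.set_mode(None);
/// ```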
3445#[derive(PartialEq, Copy, Clone, Debug)]
3446pub enum Mode {
3447    Client,
3448    Server,
3449}
3450
3451impl fmt::Display for Mode {
3452    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
3453        match self {
3454            Mode::Client => write!(f, "client"),
3455            Mode::Server => write!(f, "server"),
3456        }
3457    }
3458}
3459
3460fn to_comma_separated_list<T>(confirmed_external_addresses: &[T]) -> String
3461where
3462    T: ToString,
3463{
3464    confirmed_external_addresses
3465        .iter()
3466        .map(|addr| addr.to_string())
3467        .collect::<Vec<_>>()
3468        .join(", ")
3469}