libp2p_kad/behaviour.rs
1// Copyright 2018 Parity Technologies (UK) Ltd.
2//
3// Permission is hereby granted, free of charge, to any person obtaining a
4// copy of this software and associated documentation files (the "Software"),
5// to deal in the Software without restriction, including without limitation
6// the rights to use, copy, modify, merge, publish, distribute, sublicense,
7// and/or sell copies of the Software, and to permit persons to whom the
8// Software is furnished to do so, subject to the following conditions:
9//
10// The above copyright notice and this permission notice shall be included in
11// all copies or substantial portions of the Software.
12//
13// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
14// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
18// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
19// DEALINGS IN THE SOFTWARE.
20
21//! Implementation of the `Kademlia` network behaviour.
22
23mod test;
24
25use std::{
26 collections::{BTreeMap, HashMap, HashSet, VecDeque},
27 fmt,
28 num::NonZeroUsize,
29 task::{Context, Poll, Waker},
30 time::Duration,
31 vec,
32};
33
34use fnv::FnvHashSet;
35use libp2p_core::{transport::PortUse, ConnectedPoint, Endpoint, Multiaddr};
36use libp2p_identity::PeerId;
37use libp2p_swarm::{
38 behaviour::{AddressChange, ConnectionClosed, ConnectionEstablished, DialFailure, FromSwarm},
39 dial_opts::{self, DialOpts},
40 ConnectionDenied, ConnectionHandler, ConnectionId, DialError, ExternalAddresses,
41 ListenAddresses, NetworkBehaviour, NotifyHandler, StreamProtocol, THandler, THandlerInEvent,
42 THandlerOutEvent, ToSwarm,
43};
44use thiserror::Error;
45use tracing::Level;
46use web_time::Instant;
47
48pub use crate::query::QueryStats;
49use crate::{
50 addresses::Addresses,
51 bootstrap,
52 handler::{Handler, HandlerEvent, HandlerIn, RequestId},
53 jobs::*,
54 kbucket::{self, Distance, KBucketConfig, KBucketsTable, NodeStatus},
55 protocol,
56 protocol::{ConnectionType, KadPeer, ProtocolConfig},
57 query::{Query, QueryConfig, QueryId, QueryPool, QueryPoolState},
58 record::{
59 self,
60 store::{self, RecordStore},
61 ProviderRecord, Record,
62 },
63 K_VALUE,
64};
65
66/// `Behaviour` is a `NetworkBehaviour` that implements the libp2p
67/// Kademlia protocol.
68pub struct Behaviour<TStore> {
69 /// The Kademlia routing table.
70 kbuckets: KBucketsTable<kbucket::Key<PeerId>, Addresses>,
71
72 /// The k-bucket insertion strategy.
73 kbucket_inserts: BucketInserts,
74
75 /// Configuration of the wire protocol.
76 protocol_config: ProtocolConfig,
77
78 /// Configuration of [`RecordStore`] filtering.
79 record_filtering: StoreInserts,
80
81 /// The currently active (i.e. in-progress) queries.
82 queries: QueryPool,
83
84 /// The currently connected peers.
85 ///
86 /// This is a superset of the connected peers currently in the routing table.
87 connected_peers: FnvHashSet<PeerId>,
88
89 /// Periodic job for re-publication of provider records for keys
90 /// provided by the local node.
91 add_provider_job: Option<AddProviderJob>,
92
93 /// Periodic job for (re-)replication and (re-)publishing of
94 /// regular (value-)records.
95 put_record_job: Option<PutRecordJob>,
96
97 /// The TTL of regular (value-)records.
98 record_ttl: Option<Duration>,
99
100 /// The TTL of provider records.
101 provider_record_ttl: Option<Duration>,
102
103 /// Queued events to return when the behaviour is being polled.
104 queued_events: VecDeque<ToSwarm<Event, HandlerIn>>,
105
106 listen_addresses: ListenAddresses,
107
108 external_addresses: ExternalAddresses,
109
110 connections: HashMap<ConnectionId, PeerId>,
111
112 /// See [`Config::caching`].
113 caching: Caching,
114
115 local_peer_id: PeerId,
116
117 mode: Mode,
118 auto_mode: bool,
119 no_events_waker: Option<Waker>,
120
121 /// The record storage.
122 store: TStore,
123
124 /// Tracks the status of the current bootstrap.
125 bootstrap_status: bootstrap::Status,
126}
127
128/// The configurable strategies for the insertion of peers
129/// and their addresses into the k-buckets of the Kademlia
130/// routing table.
131#[derive(Copy, Clone, Debug, PartialEq, Eq)]
132pub enum BucketInserts {
133 /// Whenever a connection to a peer is established as a
134 /// result of a dialing attempt and that peer is not yet
135 /// in the routing table, it is inserted as long as there
136 /// is a free slot in the corresponding k-bucket. If the
137 /// k-bucket is full but still has a free pending slot,
138 /// it may be inserted into the routing table at a later time if an unresponsive
139 /// disconnected peer is evicted from the bucket.
140 OnConnected,
141 /// New peers and addresses are only added to the routing table via
142 /// explicit calls to [`Behaviour::add_address`].
143 ///
144 /// > **Note**: Even though peers can only get into the
145 /// > routing table as a result of [`Behaviour::add_address`],
146 /// > routing table entries are still updated as peers
147 /// > connect and disconnect (i.e. the order of the entries
148 /// > as well as the network addresses).
149 Manual,
150}
151
152/// The configurable filtering strategies for the acceptance of
153/// incoming records.
154///
155/// This can be used for e.g. signature verification or validating
156/// the accompanying [`Key`].
157///
158/// [`Key`]: crate::record::Key
159#[derive(Copy, Clone, Debug, PartialEq, Eq)]
160pub enum StoreInserts {
161 /// Whenever a (provider) record is received,
162 /// the record is forwarded immediately to the [`RecordStore`].
163 Unfiltered,
164 /// Whenever a (provider) record is received, an event is emitted.
165 /// Provider records generate a [`InboundRequest::AddProvider`] under
166 /// [`Event::InboundRequest`], normal records generate a [`InboundRequest::PutRecord`]
167 /// under [`Event::InboundRequest`].
168 ///
169 /// When deemed valid, a (provider) record needs to be explicitly stored in
170 /// the [`RecordStore`] via [`RecordStore::put`] or [`RecordStore::add_provider`],
171 /// whichever is applicable. A mutable reference to the [`RecordStore`] can
172 /// be retrieved via [`Behaviour::store_mut`].
173 FilterBoth,
174}
175
176/// The configuration for the `Kademlia` behaviour.
177///
178/// The configuration is consumed by [`Behaviour::new`].
179#[derive(Debug, Clone)]
180pub struct Config {
181 kbucket_config: KBucketConfig,
182 query_config: QueryConfig,
183 protocol_config: ProtocolConfig,
184 record_ttl: Option<Duration>,
185 record_replication_interval: Option<Duration>,
186 record_publication_interval: Option<Duration>,
187 record_filtering: StoreInserts,
188 provider_record_ttl: Option<Duration>,
189 provider_publication_interval: Option<Duration>,
190 kbucket_inserts: BucketInserts,
191 caching: Caching,
192 periodic_bootstrap_interval: Option<Duration>,
193 automatic_bootstrap_throttle: Option<Duration>,
194}
195
196impl Default for Config {
197 /// Returns the default configuration.
198 ///
199 /// Deprecated: use `Config::new` instead.
200 fn default() -> Self {
201 Self::new(protocol::DEFAULT_PROTO_NAME)
202 }
203}
204
205/// The configuration for Kademlia "write-back" caching after successful
206/// lookups via [`Behaviour::get_record`].
207#[derive(Debug, Clone)]
208pub enum Caching {
209 /// Caching is disabled and the peers closest to records being looked up
210 /// that do not return a record are not tracked, i.e.
211 /// [`GetRecordOk::FinishedWithNoAdditionalRecord`] is always empty.
212 Disabled,
213 /// Up to `max_peers` peers not returning a record that are closest to the key
214 /// being looked up are tracked and returned in
215 /// [`GetRecordOk::FinishedWithNoAdditionalRecord`]. The write-back operation must be
216 /// performed explicitly, if desired and after choosing a record from the results, via
217 /// [`Behaviour::put_record_to`].
218 Enabled { max_peers: u16 },
219}
220
221impl Config {
222 /// Builds a new `Config` with the given protocol name.
223 pub fn new(protocol_name: StreamProtocol) -> Self {
224 Config {
225 kbucket_config: KBucketConfig::default(),
226 query_config: QueryConfig::default(),
227 protocol_config: ProtocolConfig::new(protocol_name),
228 record_ttl: Some(Duration::from_secs(48 * 60 * 60)),
229 record_replication_interval: Some(Duration::from_secs(60 * 60)),
230 record_publication_interval: Some(Duration::from_secs(22 * 60 * 60)),
231 record_filtering: StoreInserts::Unfiltered,
232 provider_publication_interval: Some(Duration::from_secs(12 * 60 * 60)),
233 provider_record_ttl: Some(Duration::from_secs(48 * 60 * 60)),
234 kbucket_inserts: BucketInserts::OnConnected,
235 caching: Caching::Enabled { max_peers: 1 },
236 periodic_bootstrap_interval: Some(Duration::from_secs(5 * 60)),
237 automatic_bootstrap_throttle: Some(bootstrap::DEFAULT_AUTOMATIC_THROTTLE),
238 }
239 }
240
241 /// Sets the timeout for a single query.
242 ///
243 /// > **Note**: A single query usually comprises at least as many requests
244 /// > as the replication factor, i.e. this is not a request timeout.
245 ///
246 /// The default is 60 seconds.
247 pub fn set_query_timeout(&mut self, timeout: Duration) -> &mut Self {
248 self.query_config.timeout = timeout;
249 self
250 }
251
252 /// Sets the replication factor to use.
253 ///
254 /// The replication factor determines to how many closest peers
255 /// a record is replicated. The default is [`crate::K_VALUE`].
256 pub fn set_replication_factor(&mut self, replication_factor: NonZeroUsize) -> &mut Self {
257 self.query_config.replication_factor = replication_factor;
258 self
259 }
260
261 /// Sets the allowed level of parallelism for iterative queries.
262 ///
263 /// The `α` parameter in the Kademlia paper. The maximum number of peers
264 /// that an iterative query is allowed to wait for in parallel while
265 /// iterating towards the closest nodes to a target. Defaults to
266 /// `ALPHA_VALUE`.
267 ///
268 /// This only controls the level of parallelism of an iterative query, not
269 /// the level of parallelism of a query to a fixed set of peers.
270 ///
271 /// When used with [`Config::disjoint_query_paths`] it equals
272 /// the amount of disjoint paths used.
273 pub fn set_parallelism(&mut self, parallelism: NonZeroUsize) -> &mut Self {
274 self.query_config.parallelism = parallelism;
275 self
276 }
277
278 /// Require iterative queries to use disjoint paths for increased resiliency
279 /// in the presence of potentially adversarial nodes.
280 ///
281 /// When enabled the number of disjoint paths used equals the configured
282 /// parallelism.
283 ///
284 /// See the S/Kademlia paper for more information on the high level design
285 /// as well as its security improvements.
286 pub fn disjoint_query_paths(&mut self, enabled: bool) -> &mut Self {
287 self.query_config.disjoint_query_paths = enabled;
288 self
289 }
290
291 /// Sets the TTL for stored records.
292 ///
293 /// The TTL should be significantly longer than the (re-)publication
294 /// interval, to avoid premature expiration of records. The default is 36
295 /// hours.
296 ///
297 /// `None` means records never expire.
298 ///
299 /// Does not apply to provider records.
300 pub fn set_record_ttl(&mut self, record_ttl: Option<Duration>) -> &mut Self {
301 self.record_ttl = record_ttl;
302 self
303 }
304
305 /// Sets whether or not records should be filtered before being stored.
306 ///
307 /// See [`StoreInserts`] for the different values.
308 /// Defaults to [`StoreInserts::Unfiltered`].
309 pub fn set_record_filtering(&mut self, filtering: StoreInserts) -> &mut Self {
310 self.record_filtering = filtering;
311 self
312 }
313
314 /// Sets the (re-)replication interval for stored records.
315 ///
316 /// Periodic replication of stored records ensures that the records
317 /// are always replicated to the available nodes closest to the key in the
318 /// context of DHT topology changes (i.e. nodes joining and leaving), thus
319 /// ensuring persistence until the record expires. Replication does not
320 /// prolong the regular lifetime of a record (for otherwise it would live
321 /// forever regardless of the configured TTL). The expiry of a record
322 /// is only extended through re-publication.
323 ///
324 /// This interval should be significantly shorter than the publication
325 /// interval, to ensure persistence between re-publications. The default
326 /// is 1 hour.
327 ///
328 /// `None` means that stored records are never re-replicated.
329 ///
330 /// Does not apply to provider records.
331 pub fn set_replication_interval(&mut self, interval: Option<Duration>) -> &mut Self {
332 self.record_replication_interval = interval;
333 self
334 }
335
336 /// Sets the (re-)publication interval of stored records.
337 ///
338 /// Records persist in the DHT until they expire. By default, published
339 /// records are re-published in regular intervals for as long as the record
340 /// exists in the local storage of the original publisher, thereby extending
341 /// the records lifetime.
342 ///
343 /// This interval should be significantly shorter than the record TTL, to
344 /// ensure records do not expire prematurely. The default is 24 hours.
345 ///
346 /// `None` means that stored records are never automatically re-published.
347 ///
348 /// Does not apply to provider records.
349 pub fn set_publication_interval(&mut self, interval: Option<Duration>) -> &mut Self {
350 self.record_publication_interval = interval;
351 self
352 }
353
354 /// Sets the TTL for provider records.
355 ///
356 /// `None` means that stored provider records never expire.
357 ///
358 /// Must be significantly larger than the provider publication interval.
359 pub fn set_provider_record_ttl(&mut self, ttl: Option<Duration>) -> &mut Self {
360 self.provider_record_ttl = ttl;
361 self
362 }
363
364 /// Sets the interval at which provider records for keys provided
365 /// by the local node are re-published.
366 ///
367 /// `None` means that stored provider records are never automatically
368 /// re-published.
369 ///
370 /// Must be significantly less than the provider record TTL.
371 pub fn set_provider_publication_interval(&mut self, interval: Option<Duration>) -> &mut Self {
372 self.provider_publication_interval = interval;
373 self
374 }
375
376 /// Modifies the maximum allowed size of individual Kademlia packets.
377 ///
378 /// It might be necessary to increase this value if trying to put large
379 /// records.
380 pub fn set_max_packet_size(&mut self, size: usize) -> &mut Self {
381 self.protocol_config.set_max_packet_size(size);
382 self
383 }
384
385 /// Modifies the timeout duration of outbound substreams.
386 ///
387 /// * Default to `10` seconds.
388 /// * May need to increase this value when sending large records with poor connection.
389 pub fn set_substreams_timeout(&mut self, timeout: Duration) -> &mut Self {
390 self.protocol_config.set_substreams_timeout(timeout);
391 self
392 }
393
394 /// Sets the k-bucket insertion strategy for the Kademlia routing table.
395 pub fn set_kbucket_inserts(&mut self, inserts: BucketInserts) -> &mut Self {
396 self.kbucket_inserts = inserts;
397 self
398 }
399
400 /// Sets the [`Caching`] strategy to use for successful lookups.
401 ///
402 /// The default is [`Caching::Enabled`] with a `max_peers` of 1.
403 /// Hence, with default settings and a lookup quorum of 1, a successful lookup
404 /// will result in the record being cached at the closest node to the key that
405 /// did not return the record, i.e. the standard Kademlia behaviour.
406 pub fn set_caching(&mut self, c: Caching) -> &mut Self {
407 self.caching = c;
408 self
409 }
410
411 /// Sets the interval on which [`Behaviour::bootstrap`] is called periodically.
412 ///
413 /// * Default to `5` minutes.
414 /// * Set to `None` to disable periodic bootstrap.
415 pub fn set_periodic_bootstrap_interval(&mut self, interval: Option<Duration>) -> &mut Self {
416 self.periodic_bootstrap_interval = interval;
417 self
418 }
419
420 /// Sets the configuration for the k-buckets.
421 ///
422 /// * Default to K_VALUE.
423 ///
424 /// **WARNING**: setting a `size` higher that `K_VALUE` may imply additional memory allocations.
425 pub fn set_kbucket_size(&mut self, size: NonZeroUsize) -> &mut Self {
426 self.kbucket_config.set_bucket_size(size);
427 self
428 }
429
430 /// Sets the timeout duration after creation of a pending entry after which
431 /// it becomes eligible for insertion into a full bucket, replacing the
432 /// least-recently (dis)connected node.
433 ///
434 /// * Default to `60` s.
435 pub fn set_kbucket_pending_timeout(&mut self, timeout: Duration) -> &mut Self {
436 self.kbucket_config.set_pending_timeout(timeout);
437 self
438 }
439
440 /// Sets the time to wait before calling [`Behaviour::bootstrap`] after a new peer is inserted
441 /// in the routing table. This prevent cascading bootstrap requests when multiple peers are
442 /// inserted into the routing table "at the same time". This also allows to wait a little
443 /// bit for other potential peers to be inserted into the routing table before triggering a
444 /// bootstrap, giving more context to the future bootstrap request.
445 ///
446 /// * Default to `500` ms.
447 /// * Set to `Some(Duration::ZERO)` to never wait before triggering a bootstrap request when a
448 /// new peer is inserted in the routing table.
449 /// * Set to `None` to disable automatic bootstrap (no bootstrap request will be triggered when
450 /// a new peer is inserted in the routing table).
451 #[cfg(test)]
452 pub(crate) fn set_automatic_bootstrap_throttle(
453 &mut self,
454 duration: Option<Duration>,
455 ) -> &mut Self {
456 self.automatic_bootstrap_throttle = duration;
457 self
458 }
459}
460
461impl<TStore> Behaviour<TStore>
462where
463 TStore: RecordStore + Send + 'static,
464{
465 /// Creates a new `Kademlia` network behaviour with a default configuration.
466 pub fn new(id: PeerId, store: TStore) -> Self {
467 Self::with_config(id, store, Default::default())
468 }
469
470 /// Get the protocol name of this kademlia instance.
471 pub fn protocol_names(&self) -> &[StreamProtocol] {
472 self.protocol_config.protocol_names()
473 }
474
475 /// Creates a new `Kademlia` network behaviour with the given configuration.
476 pub fn with_config(id: PeerId, store: TStore, config: Config) -> Self {
477 let local_key = kbucket::Key::from(id);
478
479 let put_record_job = config
480 .record_replication_interval
481 .or(config.record_publication_interval)
482 .map(|interval| {
483 PutRecordJob::new(
484 id,
485 interval,
486 config.record_publication_interval,
487 config.record_ttl,
488 )
489 });
490
491 let add_provider_job = config
492 .provider_publication_interval
493 .map(AddProviderJob::new);
494
495 Behaviour {
496 store,
497 caching: config.caching,
498 kbuckets: KBucketsTable::new(local_key, config.kbucket_config),
499 kbucket_inserts: config.kbucket_inserts,
500 protocol_config: config.protocol_config,
501 record_filtering: config.record_filtering,
502 queued_events: VecDeque::with_capacity(config.query_config.replication_factor.get()),
503 listen_addresses: Default::default(),
504 queries: QueryPool::new(config.query_config),
505 connected_peers: Default::default(),
506 add_provider_job,
507 put_record_job,
508 record_ttl: config.record_ttl,
509 provider_record_ttl: config.provider_record_ttl,
510 external_addresses: Default::default(),
511 local_peer_id: id,
512 connections: Default::default(),
513 mode: Mode::Client,
514 auto_mode: true,
515 no_events_waker: None,
516 bootstrap_status: bootstrap::Status::new(
517 config.periodic_bootstrap_interval,
518 config.automatic_bootstrap_throttle,
519 ),
520 }
521 }
522
523 /// Gets an iterator over immutable references to all running queries.
524 pub fn iter_queries(&self) -> impl Iterator<Item = QueryRef<'_>> {
525 self.queries.iter().filter_map(|query| {
526 if !query.is_finished() {
527 Some(QueryRef { query })
528 } else {
529 None
530 }
531 })
532 }
533
534 /// Gets an iterator over mutable references to all running queries.
535 pub fn iter_queries_mut(&mut self) -> impl Iterator<Item = QueryMut<'_>> {
536 self.queries.iter_mut().filter_map(|query| {
537 if !query.is_finished() {
538 Some(QueryMut { query })
539 } else {
540 None
541 }
542 })
543 }
544
545 /// Gets an immutable reference to a running query, if it exists.
546 pub fn query(&self, id: &QueryId) -> Option<QueryRef<'_>> {
547 self.queries.get(id).and_then(|query| {
548 if !query.is_finished() {
549 Some(QueryRef { query })
550 } else {
551 None
552 }
553 })
554 }
555
556 /// Gets a mutable reference to a running query, if it exists.
557 pub fn query_mut<'a>(&'a mut self, id: &QueryId) -> Option<QueryMut<'a>> {
558 self.queries.get_mut(id).and_then(|query| {
559 if !query.is_finished() {
560 Some(QueryMut { query })
561 } else {
562 None
563 }
564 })
565 }
566
567 /// Adds a known listen address of a peer participating in the DHT to the
568 /// routing table.
569 ///
570 /// Explicitly adding addresses of peers serves two purposes:
571 ///
572 /// 1. In order for a node to join the DHT, it must know about at least one other node of the
573 /// DHT.
574 ///
575 /// 2. When a remote peer initiates a connection and that peer is not yet in the routing
576 /// table, the `Kademlia` behaviour must be informed of an address on which that peer is
577 /// listening for connections before it can be added to the routing table from where it can
578 /// subsequently be discovered by all peers in the DHT.
579 ///
580 /// If the routing table has been updated as a result of this operation,
581 /// a [`Event::RoutingUpdated`] event is emitted.
582 pub fn add_address(&mut self, peer: &PeerId, address: Multiaddr) -> RoutingUpdate {
583 // ensuring address is a fully-qualified /p2p multiaddr
584 let Ok(address) = address.with_p2p(*peer) else {
585 return RoutingUpdate::Failed;
586 };
587 let key = kbucket::Key::from(*peer);
588 match self.kbuckets.entry(&key) {
589 Some(kbucket::Entry::Present(mut entry, _)) => {
590 if entry.value().insert(address) {
591 self.queued_events
592 .push_back(ToSwarm::GenerateEvent(Event::RoutingUpdated {
593 peer: *peer,
594 is_new_peer: false,
595 addresses: entry.value().clone(),
596 old_peer: None,
597 bucket_range: self
598 .kbuckets
599 .bucket(&key)
600 .map(|b| b.range())
601 .expect("Not kbucket::Entry::SelfEntry."),
602 }))
603 }
604 RoutingUpdate::Success
605 }
606 Some(kbucket::Entry::Pending(mut entry, _)) => {
607 entry.value().insert(address);
608 RoutingUpdate::Pending
609 }
610 Some(kbucket::Entry::Absent(entry)) => {
611 let addresses = Addresses::new(address);
612 let status = if self.connected_peers.contains(peer) {
613 NodeStatus::Connected
614 } else {
615 NodeStatus::Disconnected
616 };
617 match entry.insert(addresses.clone(), status) {
618 kbucket::InsertResult::Inserted => {
619 self.bootstrap_on_low_peers();
620
621 self.queued_events.push_back(ToSwarm::GenerateEvent(
622 Event::RoutingUpdated {
623 peer: *peer,
624 is_new_peer: true,
625 addresses,
626 old_peer: None,
627 bucket_range: self
628 .kbuckets
629 .bucket(&key)
630 .map(|b| b.range())
631 .expect("Not kbucket::Entry::SelfEntry."),
632 },
633 ));
634 RoutingUpdate::Success
635 }
636 kbucket::InsertResult::Full => {
637 tracing::debug!(%peer, "Bucket full. Peer not added to routing table");
638 RoutingUpdate::Failed
639 }
640 kbucket::InsertResult::Pending { disconnected } => {
641 self.queued_events.push_back(ToSwarm::Dial {
642 opts: DialOpts::peer_id(disconnected.into_preimage()).build(),
643 });
644 RoutingUpdate::Pending
645 }
646 }
647 }
648 None => RoutingUpdate::Failed,
649 }
650 }
651
652 /// Removes an address of a peer from the routing table.
653 ///
654 /// If the given address is the last address of the peer in the
655 /// routing table, the peer is removed from the routing table
656 /// and `Some` is returned with a view of the removed entry.
657 /// The same applies if the peer is currently pending insertion
658 /// into the routing table.
659 ///
660 /// If the given peer or address is not in the routing table,
661 /// this is a no-op.
662 pub fn remove_address(
663 &mut self,
664 peer: &PeerId,
665 address: &Multiaddr,
666 ) -> Option<kbucket::EntryView<kbucket::Key<PeerId>, Addresses>> {
667 let address = &address.to_owned().with_p2p(*peer).ok()?;
668 let key = kbucket::Key::from(*peer);
669 match self.kbuckets.entry(&key)? {
670 kbucket::Entry::Present(mut entry, _) => {
671 if entry.value().remove(address).is_err() {
672 Some(entry.remove()) // it is the last address, thus remove the peer.
673 } else {
674 None
675 }
676 }
677 kbucket::Entry::Pending(mut entry, _) => {
678 if entry.value().remove(address).is_err() {
679 Some(entry.remove()) // it is the last address, thus remove the peer.
680 } else {
681 None
682 }
683 }
684 kbucket::Entry::Absent(..) => None,
685 }
686 }
687
688 /// Removes a peer from the routing table.
689 ///
690 /// Returns `None` if the peer was not in the routing table,
691 /// not even pending insertion.
692 pub fn remove_peer(
693 &mut self,
694 peer: &PeerId,
695 ) -> Option<kbucket::EntryView<kbucket::Key<PeerId>, Addresses>> {
696 let key = kbucket::Key::from(*peer);
697 match self.kbuckets.entry(&key)? {
698 kbucket::Entry::Present(entry, _) => Some(entry.remove()),
699 kbucket::Entry::Pending(entry, _) => Some(entry.remove()),
700 kbucket::Entry::Absent(..) => None,
701 }
702 }
703
704 /// Returns an iterator over all non-empty buckets in the routing table.
705 pub fn kbuckets(
706 &mut self,
707 ) -> impl Iterator<Item = kbucket::KBucketRef<'_, kbucket::Key<PeerId>, Addresses>> {
708 self.kbuckets.iter().filter(|b| !b.is_empty())
709 }
710
711 /// Returns the k-bucket for the distance to the given key.
712 ///
713 /// Returns `None` if the given key refers to the local key.
714 pub fn kbucket<K>(
715 &mut self,
716 key: K,
717 ) -> Option<kbucket::KBucketRef<'_, kbucket::Key<PeerId>, Addresses>>
718 where
719 K: Into<kbucket::Key<K>> + Clone,
720 {
721 self.kbuckets.bucket(&key.into())
722 }
723
724 /// Initiates an iterative query for the closest peers to the given key.
725 ///
726 /// The result of the query is delivered in a
727 /// [`Event::OutboundQueryProgressed{QueryResult::GetClosestPeers}`].
728 pub fn get_closest_peers<K>(&mut self, key: K) -> QueryId
729 where
730 K: Into<kbucket::Key<K>> + Into<Vec<u8>> + Clone,
731 {
732 self.get_closest_peers_inner(key, K_VALUE)
733 }
734
735 /// Initiates an iterative query for the closest peers to the given key.
736 /// The expected responding peers is specified by `num_results`
737 /// Note that the result is capped after exceeds K_VALUE
738 ///
739 /// The result of the query is delivered in a
740 /// [`Event::OutboundQueryProgressed{QueryResult::GetClosestPeers}`].
741 pub fn get_n_closest_peers<K>(&mut self, key: K, num_results: NonZeroUsize) -> QueryId
742 where
743 K: Into<kbucket::Key<K>> + Into<Vec<u8>> + Clone,
744 {
745 // The inner code never expect higher than K_VALUE results to be returned.
746 // And removing such cap will be tricky,
747 // since it would involve forging a new key and additional requests.
748 // Hence bound to K_VALUE here to set clear expectation and prevent unexpected behaviour.
749 let capped_num_results = std::cmp::min(num_results, K_VALUE);
750 self.get_closest_peers_inner(key, capped_num_results)
751 }
752
753 fn get_closest_peers_inner<K>(&mut self, key: K, num_results: NonZeroUsize) -> QueryId
754 where
755 K: Into<kbucket::Key<K>> + Into<Vec<u8>> + Clone,
756 {
757 let target: kbucket::Key<K> = key.clone().into();
758 let key: Vec<u8> = key.into();
759 let info = QueryInfo::GetClosestPeers {
760 key,
761 step: ProgressStep::first(),
762 num_results,
763 };
764 let peer_keys: Vec<kbucket::Key<PeerId>> = self.kbuckets.closest_keys(&target).collect();
765 self.queries.add_iter_closest(target, peer_keys, info)
766 }
767
768 /// Returns all peers ordered by distance to the given key; takes peers from local routing table
769 /// only.
770 pub fn get_closest_local_peers<'a, K: Clone>(
771 &'a mut self,
772 key: &'a kbucket::Key<K>,
773 ) -> impl Iterator<Item = kbucket::Key<PeerId>> + 'a {
774 self.kbuckets.closest_keys(key)
775 }
776
777 /// Finds the closest peers to a `key` in the context of a request by the `source` peer, such
778 /// that the `source` peer is never included in the result.
779 ///
780 /// Takes peers from local routing table only. Only returns number of peers equal to configured
781 /// replication factor.
782 pub fn find_closest_local_peers<'a, K: Clone>(
783 &'a mut self,
784 key: &'a kbucket::Key<K>,
785 source: &'a PeerId,
786 ) -> impl Iterator<Item = KadPeer> + 'a {
787 self.kbuckets
788 .closest(key)
789 .filter(move |e| e.node.key.preimage() != source)
790 .take(K_VALUE.get())
791 .map(KadPeer::from)
792 }
793
794 /// Performs a lookup for a record in the DHT.
795 ///
796 /// The result of this operation is delivered in a
797 /// [`Event::OutboundQueryProgressed{QueryResult::GetRecord}`].
798 pub fn get_record(&mut self, key: record::Key) -> QueryId {
799 let record = if let Some(record) = self.store.get(&key) {
800 if record.is_expired(Instant::now()) {
801 self.store.remove(&key);
802 None
803 } else {
804 Some(PeerRecord {
805 peer: None,
806 record: record.into_owned(),
807 })
808 }
809 } else {
810 None
811 };
812
813 let step = ProgressStep::first();
814
815 let target = kbucket::Key::new(key.clone());
816 let info = if record.is_some() {
817 QueryInfo::GetRecord {
818 key,
819 step: step.next(),
820 found_a_record: true,
821 cache_candidates: BTreeMap::new(),
822 }
823 } else {
824 QueryInfo::GetRecord {
825 key,
826 step: step.clone(),
827 found_a_record: false,
828 cache_candidates: BTreeMap::new(),
829 }
830 };
831 let peers = self.kbuckets.closest_keys(&target);
832 let id = self.queries.add_iter_closest(target.clone(), peers, info);
833
834 // No queries were actually done for the results yet.
835 let stats = QueryStats::empty();
836
837 if let Some(record) = record {
838 self.queued_events
839 .push_back(ToSwarm::GenerateEvent(Event::OutboundQueryProgressed {
840 id,
841 result: QueryResult::GetRecord(Ok(GetRecordOk::FoundRecord(record))),
842 step,
843 stats,
844 }));
845 }
846
847 id
848 }
849
850 /// Stores a record in the DHT, locally as well as at the nodes
851 /// closest to the key as per the xor distance metric.
852 ///
853 /// Returns `Ok` if a record has been stored locally, providing the
854 /// `QueryId` of the initial query that replicates the record in the DHT.
855 /// The result of the query is eventually reported as a
856 /// [`Event::OutboundQueryProgressed{QueryResult::PutRecord}`].
857 ///
858 /// The record is always stored locally with the given expiration. If the record's
859 /// expiration is `None`, the common case, it does not expire in local storage
860 /// but is still replicated with the configured record TTL. To remove the record
861 /// locally and stop it from being re-published in the DHT, see [`Behaviour::remove_record`].
862 ///
863 /// After the initial publication of the record, it is subject to (re-)replication
864 /// and (re-)publication as per the configured intervals. Periodic (re-)publication
865 /// does not update the record's expiration in local storage, thus a given record
866 /// with an explicit expiration will always expire at that instant and until then
867 /// is subject to regular (re-)replication and (re-)publication.
868 pub fn put_record(
869 &mut self,
870 mut record: Record,
871 quorum: Quorum,
872 ) -> Result<QueryId, store::Error> {
873 record.publisher = Some(*self.kbuckets.local_key().preimage());
874 self.store.put(record.clone())?;
875 record.expires = record
876 .expires
877 .or_else(|| self.record_ttl.map(|ttl| Instant::now() + ttl));
878 let quorum = quorum.eval(self.queries.config().replication_factor);
879 let target = kbucket::Key::new(record.key.clone());
880 let peers = self.kbuckets.closest_keys(&target);
881 let context = PutRecordContext::Publish;
882 let info = QueryInfo::PutRecord {
883 context,
884 record,
885 quorum,
886 phase: PutRecordPhase::GetClosestPeers,
887 };
888 Ok(self.queries.add_iter_closest(target.clone(), peers, info))
889 }
890
891 /// Stores a record at specific peers, without storing it locally.
892 ///
893 /// The given [`Quorum`] is understood in the context of the total
894 /// number of distinct peers given.
895 ///
896 /// If the record's expiration is `None`, the configured record TTL is used.
897 ///
898 /// > **Note**: This is not a regular Kademlia DHT operation. It needs to be
899 /// > used to selectively update or store a record to specific peers
900 /// > for the purpose of e.g. making sure these peers have the latest
901 /// > "version" of a record or to "cache" a record at further peers
902 /// > to increase the lookup success rate on the DHT for other peers.
903 /// >
904 /// > In particular, there is no automatic storing of records performed, and this
905 /// > method must be used to ensure the standard Kademlia
906 /// > procedure of "caching" (i.e. storing) a found record at the closest
907 /// > node to the key that _did not_ return it.
908 pub fn put_record_to<I>(&mut self, mut record: Record, peers: I, quorum: Quorum) -> QueryId
909 where
910 I: ExactSizeIterator<Item = PeerId>,
911 {
912 let quorum = if peers.len() > 0 {
913 quorum.eval(NonZeroUsize::new(peers.len()).expect("> 0"))
914 } else {
915 // If no peers are given, we just let the query fail immediately
916 // due to the fact that the quorum must be at least one, instead of
917 // introducing a new kind of error.
918 NonZeroUsize::new(1).expect("1 > 0")
919 };
920 record.expires = record
921 .expires
922 .or_else(|| self.record_ttl.map(|ttl| Instant::now() + ttl));
923 let context = PutRecordContext::Custom;
924 let info = QueryInfo::PutRecord {
925 context,
926 record,
927 quorum,
928 phase: PutRecordPhase::PutRecord {
929 success: Vec::new(),
930 get_closest_peers_stats: QueryStats::empty(),
931 },
932 };
933 self.queries.add_fixed(peers, info)
934 }
935
936 /// Removes the record with the given key from _local_ storage,
937 /// if the local node is the publisher of the record.
938 ///
939 /// Has no effect if a record for the given key is stored locally but
940 /// the local node is not a publisher of the record.
941 ///
942 /// This is a _local_ operation. However, it also has the effect that
943 /// the record will no longer be periodically re-published, allowing the
944 /// record to eventually expire throughout the DHT.
945 pub fn remove_record(&mut self, key: &record::Key) {
946 if let Some(r) = self.store.get(key) {
947 if r.publisher.as_ref() == Some(self.kbuckets.local_key().preimage()) {
948 self.store.remove(key)
949 }
950 }
951 }
952
953 /// Gets a mutable reference to the record store.
954 pub fn store_mut(&mut self) -> &mut TStore {
955 &mut self.store
956 }
957
958 /// Bootstraps the local node to join the DHT.
959 ///
960 /// Bootstrapping is a multi-step operation that starts with a lookup of the local node's
961 /// own ID in the DHT. This introduces the local node to the other nodes
962 /// in the DHT and populates its routing table with the closest neighbours.
963 ///
964 /// Subsequently, all buckets farther from the bucket of the closest neighbour are
965 /// refreshed by initiating an additional bootstrapping query for each such
966 /// bucket with random keys.
967 ///
968 /// Returns `Ok` if bootstrapping has been initiated with a self-lookup, providing the
969 /// `QueryId` for the entire bootstrapping process. The progress of bootstrapping is
970 /// reported via [`Event::OutboundQueryProgressed{QueryResult::Bootstrap}`] events,
971 /// with one such event per bootstrapping query.
972 ///
973 /// Returns `Err` if bootstrapping is impossible due an empty routing table.
974 ///
975 /// > **Note**: Bootstrapping requires at least one node of the DHT to be known.
976 /// > See [`Behaviour::add_address`].
977 ///
978 /// > **Note**: Bootstrap does not require to be called manually. It is periodically
979 /// > invoked at regular intervals based on the configured `periodic_bootstrap_interval` (see
980 /// > [`Config::set_periodic_bootstrap_interval`] for details) and it is also automatically
981 /// > invoked
982 /// > when a new peer is inserted in the routing table.
983 /// > This parameter is used to call [`Behaviour::bootstrap`] periodically and automatically
984 /// > to ensure a healthy routing table.
985 pub fn bootstrap(&mut self) -> Result<QueryId, NoKnownPeers> {
986 let local_key = *self.kbuckets.local_key();
987 let info = QueryInfo::Bootstrap {
988 peer: *local_key.preimage(),
989 remaining: None,
990 step: ProgressStep::first(),
991 };
992 let peers = self.kbuckets.closest_keys(&local_key).collect::<Vec<_>>();
993 if peers.is_empty() {
994 self.bootstrap_status.reset_timers();
995 Err(NoKnownPeers())
996 } else {
997 self.bootstrap_status.on_started();
998 Ok(self.queries.add_iter_closest(local_key, peers, info))
999 }
1000 }
1001
1002 /// Establishes the local node as a provider of a value for the given key.
1003 ///
1004 /// This operation publishes a provider record with the given key and
1005 /// identity of the local node to the peers closest to the key, thus establishing
1006 /// the local node as a provider.
1007 ///
1008 /// Returns `Ok` if a provider record has been stored locally, providing the
1009 /// `QueryId` of the initial query that announces the local node as a provider.
1010 ///
1011 /// The publication of the provider records is periodically repeated as per the
1012 /// configured interval, to renew the expiry and account for changes to the DHT
1013 /// topology. A provider record may be removed from local storage and
1014 /// thus no longer re-published by calling [`Behaviour::stop_providing`].
1015 ///
1016 /// In contrast to the standard Kademlia push-based model for content distribution
1017 /// implemented by [`Behaviour::put_record`], the provider API implements a
1018 /// pull-based model that may be used in addition or as an alternative.
1019 /// The means by which the actual value is obtained from a provider is out of scope
1020 /// of the libp2p Kademlia provider API.
1021 ///
1022 /// The results of the (repeated) provider announcements sent by this node are
1023 /// reported via [`Event::OutboundQueryProgressed{QueryResult::StartProviding}`].
1024 pub fn start_providing(&mut self, key: record::Key) -> Result<QueryId, store::Error> {
1025 // Note: We store our own provider records locally without local addresses
1026 // to avoid redundant storage and outdated addresses. Instead these are
1027 // acquired on demand when returning a `ProviderRecord` for the local node.
1028 let local_addrs = Vec::new();
1029 let record = ProviderRecord::new(
1030 key.clone(),
1031 *self.kbuckets.local_key().preimage(),
1032 local_addrs,
1033 );
1034 self.store.add_provider(record)?;
1035 let target = kbucket::Key::new(key.clone());
1036 let peers = self.kbuckets.closest_keys(&target);
1037 let context = AddProviderContext::Publish;
1038 let info = QueryInfo::AddProvider {
1039 context,
1040 key,
1041 phase: AddProviderPhase::GetClosestPeers,
1042 };
1043 let id = self.queries.add_iter_closest(target.clone(), peers, info);
1044 Ok(id)
1045 }
1046
1047 /// Stops the local node from announcing that it is a provider for the given key.
1048 ///
1049 /// This is a local operation. The local node will still be considered as a
1050 /// provider for the key by other nodes until these provider records expire.
1051 pub fn stop_providing(&mut self, key: &record::Key) {
1052 self.store
1053 .remove_provider(key, self.kbuckets.local_key().preimage());
1054 }
1055
1056 /// Performs a lookup for providers of a value to the given key.
1057 ///
1058 /// The result of this operation is delivered in a
1059 /// reported via [`Event::OutboundQueryProgressed{QueryResult::GetProviders}`].
1060 pub fn get_providers(&mut self, key: record::Key) -> QueryId {
1061 let providers: HashSet<_> = self
1062 .store
1063 .providers(&key)
1064 .into_iter()
1065 .filter(|p| {
1066 if p.is_expired(Instant::now()) {
1067 self.store.remove_provider(&key, &p.provider);
1068 false
1069 } else {
1070 true
1071 }
1072 })
1073 .map(|p| p.provider)
1074 .collect();
1075
1076 let step = ProgressStep::first();
1077
1078 let info = QueryInfo::GetProviders {
1079 key: key.clone(),
1080 providers_found: providers.len(),
1081 step: if providers.is_empty() {
1082 step.clone()
1083 } else {
1084 step.next()
1085 },
1086 };
1087
1088 let target = kbucket::Key::new(key.clone());
1089 let peers = self.kbuckets.closest_keys(&target);
1090 let id = self.queries.add_iter_closest(target.clone(), peers, info);
1091
1092 // No queries were actually done for the results yet.
1093 let stats = QueryStats::empty();
1094
1095 if !providers.is_empty() {
1096 self.queued_events
1097 .push_back(ToSwarm::GenerateEvent(Event::OutboundQueryProgressed {
1098 id,
1099 result: QueryResult::GetProviders(Ok(GetProvidersOk::FoundProviders {
1100 key,
1101 providers,
1102 })),
1103 step,
1104 stats,
1105 }));
1106 }
1107 id
1108 }
1109
1110 /// Set the [`Mode`] in which we should operate.
1111 ///
1112 /// By default, we are in [`Mode::Client`] and will swap into [`Mode::Server`] as soon as we
1113 /// have a confirmed, external address via [`FromSwarm::ExternalAddrConfirmed`].
1114 ///
1115 /// Setting a mode via this function disables this automatic behaviour and unconditionally
1116 /// operates in the specified mode. To reactivate the automatic configuration, pass [`None`]
1117 /// instead.
1118 pub fn set_mode(&mut self, mode: Option<Mode>) {
1119 match mode {
1120 Some(mode) => {
1121 self.mode = mode;
1122 self.auto_mode = false;
1123 self.reconfigure_mode();
1124 }
1125 None => {
1126 self.auto_mode = true;
1127 self.determine_mode_from_external_addresses();
1128 }
1129 }
1130
1131 if let Some(waker) = self.no_events_waker.take() {
1132 waker.wake();
1133 }
1134 }
1135
1136 /// Get the [`Mode`] in which the DHT is currently operating.
1137 pub fn mode(&self) -> Mode {
1138 self.mode
1139 }
1140
1141 fn reconfigure_mode(&mut self) {
1142 if self.connections.is_empty() {
1143 return;
1144 }
1145
1146 let num_connections = self.connections.len();
1147
1148 tracing::debug!(
1149 "Re-configuring {} established connection{}",
1150 num_connections,
1151 if num_connections > 1 { "s" } else { "" }
1152 );
1153
1154 self.queued_events
1155 .extend(
1156 self.connections
1157 .iter()
1158 .map(|(conn_id, peer_id)| ToSwarm::NotifyHandler {
1159 peer_id: *peer_id,
1160 handler: NotifyHandler::One(*conn_id),
1161 event: HandlerIn::ReconfigureMode {
1162 new_mode: self.mode,
1163 },
1164 }),
1165 );
1166 }
1167
1168 fn determine_mode_from_external_addresses(&mut self) {
1169 let old_mode = self.mode;
1170
1171 self.mode = match (self.external_addresses.as_slice(), self.mode) {
1172 ([], Mode::Server) => {
1173 tracing::debug!("Switching to client-mode because we no longer have any confirmed external addresses");
1174
1175 Mode::Client
1176 }
1177 ([], Mode::Client) => {
1178 // Previously client-mode, now also client-mode because no external addresses.
1179
1180 Mode::Client
1181 }
1182 (confirmed_external_addresses, Mode::Client) => {
1183 if tracing::enabled!(Level::DEBUG) {
1184 let confirmed_external_addresses =
1185 to_comma_separated_list(confirmed_external_addresses);
1186
1187 tracing::debug!("Switching to server-mode assuming that one of [{confirmed_external_addresses}] is externally reachable");
1188 }
1189
1190 Mode::Server
1191 }
1192 (confirmed_external_addresses, Mode::Server) => {
1193 debug_assert!(
1194 !confirmed_external_addresses.is_empty(),
1195 "Previous match arm handled empty list"
1196 );
1197
1198 // Previously, server-mode, now also server-mode because > 1 external address.
1199 // Don't log anything to avoid spam.
1200 Mode::Server
1201 }
1202 };
1203
1204 self.reconfigure_mode();
1205
1206 if old_mode != self.mode {
1207 self.queued_events
1208 .push_back(ToSwarm::GenerateEvent(Event::ModeChanged {
1209 new_mode: self.mode,
1210 }));
1211 }
1212 }
1213
1214 /// Processes discovered peers from a successful request in an iterative `Query`.
1215 fn discovered<'a, I>(&'a mut self, query_id: &QueryId, source: &PeerId, peers: I)
1216 where
1217 I: Iterator<Item = &'a KadPeer> + Clone,
1218 {
1219 let local_id = self.kbuckets.local_key().preimage();
1220 let others_iter = peers.filter(|p| &p.node_id != local_id);
1221 if let Some(query) = self.queries.get_mut(query_id) {
1222 tracing::trace!(peer=%source, query=?query_id, "Request to peer in query succeeded");
1223 for peer in others_iter.clone() {
1224 tracing::trace!(
1225 ?peer,
1226 %source,
1227 query=?query_id,
1228 "Peer reported by source in query"
1229 );
1230 let addrs = peer.multiaddrs.iter().cloned().collect();
1231 query.peers.addresses.insert(peer.node_id, addrs);
1232 }
1233 query.on_success(source, others_iter.cloned().map(|kp| kp.node_id))
1234 }
1235 }
1236
1237 /// Collects all peers who are known to be providers of the value for a given `Multihash`.
1238 fn provider_peers(&mut self, key: &record::Key, source: &PeerId) -> Vec<KadPeer> {
1239 self.store
1240 .providers(key)
1241 .into_iter()
1242 .filter_map(|p| {
1243 if p.is_expired(Instant::now()) {
1244 self.store.remove_provider(key, &p.provider);
1245 return None;
1246 }
1247
1248 if &p.provider != source {
1249 let node_id = p.provider;
1250 let multiaddrs = p.addresses;
1251 let connection_ty = if self.connected_peers.contains(&node_id) {
1252 ConnectionType::Connected
1253 } else {
1254 ConnectionType::NotConnected
1255 };
1256 if multiaddrs.is_empty() {
1257 // The provider is either the local node and we fill in
1258 // the local addresses on demand, or it is a legacy
1259 // provider record without addresses, in which case we
1260 // try to find addresses in the routing table, as was
1261 // done before provider records were stored along with
1262 // their addresses.
1263 if &node_id == self.kbuckets.local_key().preimage() {
1264 Some(
1265 self.listen_addresses
1266 .iter()
1267 .chain(self.external_addresses.iter())
1268 .cloned()
1269 .collect::<Vec<_>>(),
1270 )
1271 } else {
1272 let key = kbucket::Key::from(node_id);
1273 self.kbuckets
1274 .entry(&key)
1275 .as_mut()
1276 .and_then(|e| e.view())
1277 .map(|e| e.node.value.clone().into_vec())
1278 }
1279 } else {
1280 Some(multiaddrs)
1281 }
1282 .map(|multiaddrs| KadPeer {
1283 node_id,
1284 multiaddrs,
1285 connection_ty,
1286 })
1287 } else {
1288 None
1289 }
1290 })
1291 .take(self.queries.config().replication_factor.get())
1292 .collect()
1293 }
1294
1295 /// Starts an iterative `ADD_PROVIDER` query for the given key.
1296 fn start_add_provider(&mut self, key: record::Key, context: AddProviderContext) {
1297 let info = QueryInfo::AddProvider {
1298 context,
1299 key: key.clone(),
1300 phase: AddProviderPhase::GetClosestPeers,
1301 };
1302 let target = kbucket::Key::new(key);
1303 let peers = self.kbuckets.closest_keys(&target);
1304 self.queries.add_iter_closest(target.clone(), peers, info);
1305 }
1306
1307 /// Starts an iterative `PUT_VALUE` query for the given record.
1308 fn start_put_record(&mut self, record: Record, quorum: Quorum, context: PutRecordContext) {
1309 let quorum = quorum.eval(self.queries.config().replication_factor);
1310 let target = kbucket::Key::new(record.key.clone());
1311 let peers = self.kbuckets.closest_keys(&target);
1312 let info = QueryInfo::PutRecord {
1313 record,
1314 quorum,
1315 context,
1316 phase: PutRecordPhase::GetClosestPeers,
1317 };
1318 self.queries.add_iter_closest(target.clone(), peers, info);
1319 }
1320
1321 /// Updates the routing table with a new connection status and address of a peer.
1322 fn connection_updated(
1323 &mut self,
1324 peer: PeerId,
1325 address: Option<Multiaddr>,
1326 new_status: NodeStatus,
1327 ) {
1328 let key = kbucket::Key::from(peer);
1329 match self.kbuckets.entry(&key) {
1330 Some(kbucket::Entry::Present(mut entry, old_status)) => {
1331 if old_status != new_status {
1332 entry.update(new_status)
1333 }
1334 if let Some(address) = address {
1335 if entry.value().insert(address) {
1336 self.queued_events.push_back(ToSwarm::GenerateEvent(
1337 Event::RoutingUpdated {
1338 peer,
1339 is_new_peer: false,
1340 addresses: entry.value().clone(),
1341 old_peer: None,
1342 bucket_range: self
1343 .kbuckets
1344 .bucket(&key)
1345 .map(|b| b.range())
1346 .expect("Not kbucket::Entry::SelfEntry."),
1347 },
1348 ))
1349 }
1350 }
1351 }
1352
1353 Some(kbucket::Entry::Pending(mut entry, old_status)) => {
1354 if let Some(address) = address {
1355 entry.value().insert(address);
1356 }
1357 if old_status != new_status {
1358 entry.update(new_status);
1359 }
1360 }
1361
1362 Some(kbucket::Entry::Absent(entry)) => {
1363 // Only connected nodes with a known address are newly inserted.
1364 if new_status != NodeStatus::Connected {
1365 return;
1366 }
1367 match (address, self.kbucket_inserts) {
1368 (None, _) => {
1369 self.queued_events
1370 .push_back(ToSwarm::GenerateEvent(Event::UnroutablePeer { peer }));
1371 }
1372 (Some(a), BucketInserts::Manual) => {
1373 self.queued_events
1374 .push_back(ToSwarm::GenerateEvent(Event::RoutablePeer {
1375 peer,
1376 address: a,
1377 }));
1378 }
1379 (Some(a), BucketInserts::OnConnected) => {
1380 let addresses = Addresses::new(a);
1381 match entry.insert(addresses.clone(), new_status) {
1382 kbucket::InsertResult::Inserted => {
1383 self.bootstrap_on_low_peers();
1384
1385 let event = Event::RoutingUpdated {
1386 peer,
1387 is_new_peer: true,
1388 addresses,
1389 old_peer: None,
1390 bucket_range: self
1391 .kbuckets
1392 .bucket(&key)
1393 .map(|b| b.range())
1394 .expect("Not kbucket::Entry::SelfEntry."),
1395 };
1396 self.queued_events.push_back(ToSwarm::GenerateEvent(event));
1397 }
1398 kbucket::InsertResult::Full => {
1399 tracing::debug!(
1400 %peer,
1401 "Bucket full. Peer not added to routing table"
1402 );
1403 let address = addresses.first().clone();
1404 self.queued_events.push_back(ToSwarm::GenerateEvent(
1405 Event::RoutablePeer { peer, address },
1406 ));
1407 }
1408 kbucket::InsertResult::Pending { disconnected } => {
1409 let address = addresses.first().clone();
1410 self.queued_events.push_back(ToSwarm::GenerateEvent(
1411 Event::PendingRoutablePeer { peer, address },
1412 ));
1413
1414 // `disconnected` might already be in the process of re-connecting.
1415 // In other words `disconnected` might have already re-connected but
1416 // is not yet confirmed to support the Kademlia protocol via
1417 // [`HandlerEvent::ProtocolConfirmed`].
1418 //
1419 // Only try dialing peer if not currently connected.
1420 if !self.connected_peers.contains(disconnected.preimage()) {
1421 self.queued_events.push_back(ToSwarm::Dial {
1422 opts: DialOpts::peer_id(disconnected.into_preimage())
1423 .build(),
1424 })
1425 }
1426 }
1427 }
1428 }
1429 }
1430 }
1431 _ => {}
1432 }
1433 }
1434
1435 /// A new peer has been inserted in the routing table but we check if the routing
1436 /// table is currently small (less that `K_VALUE` peers are present) and only
1437 /// trigger a bootstrap in that case
1438 fn bootstrap_on_low_peers(&mut self) {
1439 if self
1440 .kbuckets()
1441 .map(|kbucket| kbucket.num_entries())
1442 .sum::<usize>()
1443 < K_VALUE.get()
1444 {
1445 self.bootstrap_status.trigger();
1446 }
1447 }
1448
1449 /// Handles a finished (i.e. successful) query.
1450 fn query_finished(&mut self, q: Query) -> Option<Event> {
1451 let query_id = q.id();
1452 tracing::trace!(query=?query_id, "Query finished");
1453 match q.info {
1454 QueryInfo::Bootstrap {
1455 peer,
1456 remaining,
1457 mut step,
1458 } => {
1459 let local_key = *self.kbuckets.local_key();
1460 let mut remaining = remaining.unwrap_or_else(|| {
1461 debug_assert_eq!(&peer, local_key.preimage());
1462 // The lookup for the local key finished. To complete the bootstrap process,
1463 // a bucket refresh should be performed for every bucket farther away than
1464 // the first non-empty bucket (which are most likely no more than the last
1465 // few, i.e. farthest, buckets).
1466 self.kbuckets
1467 .iter()
1468 .skip_while(|b| b.is_empty())
1469 .skip(1) // Skip the bucket with the closest neighbour.
1470 .map(|b| {
1471 // Try to find a key that falls into the bucket. While such keys can
1472 // be generated fully deterministically, the current libp2p kademlia
1473 // wire protocol requires transmission of the preimages of the actual
1474 // keys in the DHT keyspace, hence for now this is just a "best effort"
1475 // to find a key that hashes into a specific bucket. The probabilities
1476 // of finding a key in the bucket `b` with as most 16 trials are as
1477 // follows:
1478 //
1479 // Pr(bucket-255) = 1 - (1/2)^16 ~= 1
1480 // Pr(bucket-254) = 1 - (3/4)^16 ~= 1
1481 // Pr(bucket-253) = 1 - (7/8)^16 ~= 0.88
1482 // Pr(bucket-252) = 1 - (15/16)^16 ~= 0.64
1483 // ...
1484 let mut target = kbucket::Key::from(PeerId::random());
1485 for _ in 0..16 {
1486 let d = local_key.distance(&target);
1487 if b.contains(&d) {
1488 break;
1489 }
1490 target = kbucket::Key::from(PeerId::random());
1491 }
1492 target
1493 })
1494 .collect::<Vec<_>>()
1495 .into_iter()
1496 });
1497
1498 let num_remaining = remaining.len() as u32;
1499
1500 if let Some(target) = remaining.next() {
1501 let info = QueryInfo::Bootstrap {
1502 peer: *target.preimage(),
1503 remaining: Some(remaining),
1504 step: step.next(),
1505 };
1506 let peers = self.kbuckets.closest_keys(&target);
1507 self.queries
1508 .continue_iter_closest(query_id, target, peers, info);
1509 } else {
1510 step.last = true;
1511 self.bootstrap_status.on_finish();
1512 };
1513
1514 Some(Event::OutboundQueryProgressed {
1515 id: query_id,
1516 stats: q.stats,
1517 result: QueryResult::Bootstrap(Ok(BootstrapOk {
1518 peer,
1519 num_remaining,
1520 })),
1521 step,
1522 })
1523 }
1524
1525 QueryInfo::GetClosestPeers { key, mut step, .. } => {
1526 step.last = true;
1527
1528 Some(Event::OutboundQueryProgressed {
1529 id: query_id,
1530 stats: q.stats,
1531 result: QueryResult::GetClosestPeers(Ok(GetClosestPeersOk {
1532 key,
1533 peers: q.peers.into_peerinfos_iter().collect(),
1534 })),
1535 step,
1536 })
1537 }
1538
1539 QueryInfo::GetProviders { mut step, .. } => {
1540 step.last = true;
1541
1542 Some(Event::OutboundQueryProgressed {
1543 id: query_id,
1544 stats: q.stats,
1545 result: QueryResult::GetProviders(Ok(
1546 GetProvidersOk::FinishedWithNoAdditionalRecord {
1547 closest_peers: q.peers.into_peerids_iter().collect(),
1548 },
1549 )),
1550 step,
1551 })
1552 }
1553
1554 QueryInfo::AddProvider {
1555 context,
1556 key,
1557 phase: AddProviderPhase::GetClosestPeers,
1558 } => {
1559 let provider_id = self.local_peer_id;
1560 let external_addresses = self.external_addresses.iter().cloned().collect();
1561 let info = QueryInfo::AddProvider {
1562 context,
1563 key,
1564 phase: AddProviderPhase::AddProvider {
1565 provider_id,
1566 external_addresses,
1567 get_closest_peers_stats: q.stats,
1568 },
1569 };
1570 self.queries
1571 .continue_fixed(query_id, q.peers.into_peerids_iter(), info);
1572 None
1573 }
1574
1575 QueryInfo::AddProvider {
1576 context,
1577 key,
1578 phase:
1579 AddProviderPhase::AddProvider {
1580 get_closest_peers_stats,
1581 ..
1582 },
1583 } => match context {
1584 AddProviderContext::Publish => Some(Event::OutboundQueryProgressed {
1585 id: query_id,
1586 stats: get_closest_peers_stats.merge(q.stats),
1587 result: QueryResult::StartProviding(Ok(AddProviderOk { key })),
1588 step: ProgressStep::first_and_last(),
1589 }),
1590 AddProviderContext::Republish => Some(Event::OutboundQueryProgressed {
1591 id: query_id,
1592 stats: get_closest_peers_stats.merge(q.stats),
1593 result: QueryResult::RepublishProvider(Ok(AddProviderOk { key })),
1594 step: ProgressStep::first_and_last(),
1595 }),
1596 },
1597
1598 QueryInfo::GetRecord {
1599 key,
1600 mut step,
1601 found_a_record,
1602 cache_candidates,
1603 } => {
1604 step.last = true;
1605
1606 let results = if found_a_record {
1607 Ok(GetRecordOk::FinishedWithNoAdditionalRecord { cache_candidates })
1608 } else {
1609 Err(GetRecordError::NotFound {
1610 key,
1611 closest_peers: q.peers.into_peerids_iter().collect(),
1612 })
1613 };
1614 Some(Event::OutboundQueryProgressed {
1615 id: query_id,
1616 stats: q.stats,
1617 result: QueryResult::GetRecord(results),
1618 step,
1619 })
1620 }
1621
1622 QueryInfo::PutRecord {
1623 context,
1624 record,
1625 quorum,
1626 phase: PutRecordPhase::GetClosestPeers,
1627 } => {
1628 let info = QueryInfo::PutRecord {
1629 context,
1630 record,
1631 quorum,
1632 phase: PutRecordPhase::PutRecord {
1633 success: vec![],
1634 get_closest_peers_stats: q.stats,
1635 },
1636 };
1637 self.queries
1638 .continue_fixed(query_id, q.peers.into_peerids_iter(), info);
1639 None
1640 }
1641
1642 QueryInfo::PutRecord {
1643 context,
1644 record,
1645 quorum,
1646 phase:
1647 PutRecordPhase::PutRecord {
1648 success,
1649 get_closest_peers_stats,
1650 },
1651 } => {
1652 let mk_result = |key: record::Key| {
1653 if success.len() >= quorum.get() {
1654 Ok(PutRecordOk { key })
1655 } else {
1656 Err(PutRecordError::QuorumFailed {
1657 key,
1658 quorum,
1659 success,
1660 })
1661 }
1662 };
1663 match context {
1664 PutRecordContext::Publish | PutRecordContext::Custom => {
1665 Some(Event::OutboundQueryProgressed {
1666 id: query_id,
1667 stats: get_closest_peers_stats.merge(q.stats),
1668 result: QueryResult::PutRecord(mk_result(record.key)),
1669 step: ProgressStep::first_and_last(),
1670 })
1671 }
1672 PutRecordContext::Republish => Some(Event::OutboundQueryProgressed {
1673 id: query_id,
1674 stats: get_closest_peers_stats.merge(q.stats),
1675 result: QueryResult::RepublishRecord(mk_result(record.key)),
1676 step: ProgressStep::first_and_last(),
1677 }),
1678 PutRecordContext::Replicate => {
1679 tracing::debug!(record=?record.key, "Record replicated");
1680 None
1681 }
1682 }
1683 }
1684 }
1685 }
1686
1687 /// Handles a query that timed out.
1688 fn query_timeout(&mut self, query: Query) -> Option<Event> {
1689 let query_id = query.id();
1690 tracing::trace!(query=?query_id, "Query timed out");
1691 match query.info {
1692 QueryInfo::Bootstrap {
1693 peer,
1694 mut remaining,
1695 mut step,
1696 } => {
1697 let num_remaining = remaining.as_ref().map(|r| r.len().saturating_sub(1) as u32);
1698
1699 // Continue with the next bootstrap query if `remaining` is not empty.
1700 if let Some((target, remaining)) =
1701 remaining.take().and_then(|mut r| Some((r.next()?, r)))
1702 {
1703 let info = QueryInfo::Bootstrap {
1704 peer: target.into_preimage(),
1705 remaining: Some(remaining),
1706 step: step.next(),
1707 };
1708 let peers = self.kbuckets.closest_keys(&target);
1709 self.queries
1710 .continue_iter_closest(query_id, target, peers, info);
1711 } else {
1712 step.last = true;
1713 self.bootstrap_status.on_finish();
1714 }
1715
1716 Some(Event::OutboundQueryProgressed {
1717 id: query_id,
1718 stats: query.stats,
1719 result: QueryResult::Bootstrap(Err(BootstrapError::Timeout {
1720 peer,
1721 num_remaining,
1722 })),
1723 step,
1724 })
1725 }
1726
1727 QueryInfo::AddProvider { context, key, .. } => Some(match context {
1728 AddProviderContext::Publish => Event::OutboundQueryProgressed {
1729 id: query_id,
1730 stats: query.stats,
1731 result: QueryResult::StartProviding(Err(AddProviderError::Timeout { key })),
1732 step: ProgressStep::first_and_last(),
1733 },
1734 AddProviderContext::Republish => Event::OutboundQueryProgressed {
1735 id: query_id,
1736 stats: query.stats,
1737 result: QueryResult::RepublishProvider(Err(AddProviderError::Timeout { key })),
1738 step: ProgressStep::first_and_last(),
1739 },
1740 }),
1741
1742 QueryInfo::GetClosestPeers { key, mut step, .. } => {
1743 step.last = true;
1744 Some(Event::OutboundQueryProgressed {
1745 id: query_id,
1746 stats: query.stats,
1747 result: QueryResult::GetClosestPeers(Err(GetClosestPeersError::Timeout {
1748 key,
1749 peers: query.peers.into_peerinfos_iter().collect(),
1750 })),
1751 step,
1752 })
1753 }
1754
1755 QueryInfo::PutRecord {
1756 record,
1757 quorum,
1758 context,
1759 phase,
1760 } => {
1761 let err = Err(PutRecordError::Timeout {
1762 key: record.key,
1763 quorum,
1764 success: match phase {
1765 PutRecordPhase::GetClosestPeers => vec![],
1766 PutRecordPhase::PutRecord { ref success, .. } => success.clone(),
1767 },
1768 });
1769 match context {
1770 PutRecordContext::Publish | PutRecordContext::Custom => {
1771 Some(Event::OutboundQueryProgressed {
1772 id: query_id,
1773 stats: query.stats,
1774 result: QueryResult::PutRecord(err),
1775 step: ProgressStep::first_and_last(),
1776 })
1777 }
1778 PutRecordContext::Republish => Some(Event::OutboundQueryProgressed {
1779 id: query_id,
1780 stats: query.stats,
1781 result: QueryResult::RepublishRecord(err),
1782 step: ProgressStep::first_and_last(),
1783 }),
1784 PutRecordContext::Replicate => match phase {
1785 PutRecordPhase::GetClosestPeers => {
1786 tracing::warn!(
1787 "Locating closest peers for replication failed: {:?}",
1788 err
1789 );
1790 None
1791 }
1792 PutRecordPhase::PutRecord { .. } => {
1793 tracing::debug!("Replicating record failed: {:?}", err);
1794 None
1795 }
1796 },
1797 }
1798 }
1799
1800 QueryInfo::GetRecord { key, mut step, .. } => {
1801 step.last = true;
1802
1803 Some(Event::OutboundQueryProgressed {
1804 id: query_id,
1805 stats: query.stats,
1806 result: QueryResult::GetRecord(Err(GetRecordError::Timeout { key })),
1807 step,
1808 })
1809 }
1810
1811 QueryInfo::GetProviders { key, mut step, .. } => {
1812 step.last = true;
1813
1814 Some(Event::OutboundQueryProgressed {
1815 id: query_id,
1816 stats: query.stats,
1817 result: QueryResult::GetProviders(Err(GetProvidersError::Timeout {
1818 key,
1819 closest_peers: query.peers.into_peerids_iter().collect(),
1820 })),
1821 step,
1822 })
1823 }
1824 }
1825 }
1826
1827 /// Processes a record received from a peer.
1828 fn record_received(
1829 &mut self,
1830 source: PeerId,
1831 connection: ConnectionId,
1832 request_id: RequestId,
1833 mut record: Record,
1834 ) {
1835 if record.publisher.as_ref() == Some(self.kbuckets.local_key().preimage()) {
1836 // If the (alleged) publisher is the local node, do nothing. The record of
1837 // the original publisher should never change as a result of replication
1838 // and the publisher is always assumed to have the "right" value.
1839 self.queued_events.push_back(ToSwarm::NotifyHandler {
1840 peer_id: source,
1841 handler: NotifyHandler::One(connection),
1842 event: HandlerIn::PutRecordRes {
1843 key: record.key,
1844 value: record.value,
1845 request_id,
1846 },
1847 });
1848 return;
1849 }
1850
1851 let now = Instant::now();
1852
1853 // Calculate the expiration exponentially inversely proportional to the
1854 // number of nodes between the local node and the closest node to the key
1855 // (beyond the replication factor). This ensures avoiding over-caching
1856 // outside of the k closest nodes to a key.
1857 let target = kbucket::Key::new(record.key.clone());
1858 let num_between = self.kbuckets.count_nodes_between(&target);
1859 let k = self.queries.config().replication_factor.get();
1860 let num_beyond_k = (usize::max(k, num_between) - k) as u32;
1861 let expiration = self
1862 .record_ttl
1863 .map(|ttl| now + exp_decrease(ttl, num_beyond_k));
1864 // The smaller TTL prevails. Only if neither TTL is set is the record
1865 // stored "forever".
1866 record.expires = record.expires.or(expiration).min(expiration);
1867
1868 if let Some(job) = self.put_record_job.as_mut() {
1869 // Ignore the record in the next run of the replication
1870 // job, since we can assume the sender replicated the
1871 // record to the k closest peers. Effectively, only
1872 // one of the k closest peers performs a replication
1873 // in the configured interval, assuming a shared interval.
1874 job.skip(record.key.clone())
1875 }
1876
1877 // While records received from a publisher, as well as records that do
1878 // not exist locally should always (attempted to) be stored, there is a
1879 // choice here w.r.t. the handling of replicated records whose keys refer
1880 // to records that exist locally: The value and / or the publisher may
1881 // either be overridden or left unchanged. At the moment and in the
1882 // absence of a decisive argument for another option, both are always
1883 // overridden as it avoids having to load the existing record in the
1884 // first place.
1885
1886 if !record.is_expired(now) {
1887 // The record is cloned because of the weird libp2p protocol
1888 // requirement to send back the value in the response, although this
1889 // is a waste of resources.
1890 match self.record_filtering {
1891 StoreInserts::Unfiltered => match self.store.put(record.clone()) {
1892 Ok(()) => {
1893 tracing::debug!(
1894 record=?record.key,
1895 "Record stored: {} bytes",
1896 record.value.len()
1897 );
1898 self.queued_events.push_back(ToSwarm::GenerateEvent(
1899 Event::InboundRequest {
1900 request: InboundRequest::PutRecord {
1901 source,
1902 connection,
1903 record: None,
1904 },
1905 },
1906 ));
1907 }
1908 Err(e) => {
1909 tracing::info!("Record not stored: {:?}", e);
1910 self.queued_events.push_back(ToSwarm::NotifyHandler {
1911 peer_id: source,
1912 handler: NotifyHandler::One(connection),
1913 event: HandlerIn::Reset(request_id),
1914 });
1915
1916 return;
1917 }
1918 },
1919 StoreInserts::FilterBoth => {
1920 self.queued_events
1921 .push_back(ToSwarm::GenerateEvent(Event::InboundRequest {
1922 request: InboundRequest::PutRecord {
1923 source,
1924 connection,
1925 record: Some(record.clone()),
1926 },
1927 }));
1928 }
1929 }
1930 }
1931
1932 // The remote receives a [`HandlerIn::PutRecordRes`] even in the
1933 // case where the record is discarded due to being expired. Given that
1934 // the remote sent the local node a [`HandlerEvent::PutRecord`]
1935 // request, the remote perceives the local node as one node among the k
1936 // closest nodes to the target. In addition returning
1937 // [`HandlerIn::PutRecordRes`] does not reveal any internal
1938 // information to a possibly malicious remote node.
1939 self.queued_events.push_back(ToSwarm::NotifyHandler {
1940 peer_id: source,
1941 handler: NotifyHandler::One(connection),
1942 event: HandlerIn::PutRecordRes {
1943 key: record.key,
1944 value: record.value,
1945 request_id,
1946 },
1947 })
1948 }
1949
1950 /// Processes a provider record received from a peer.
1951 fn provider_received(&mut self, key: record::Key, provider: KadPeer) {
1952 if &provider.node_id != self.kbuckets.local_key().preimage() {
1953 let record = ProviderRecord {
1954 key,
1955 provider: provider.node_id,
1956 expires: self.provider_record_ttl.map(|ttl| Instant::now() + ttl),
1957 addresses: provider.multiaddrs,
1958 };
1959 match self.record_filtering {
1960 StoreInserts::Unfiltered => {
1961 if let Err(e) = self.store.add_provider(record) {
1962 tracing::info!("Provider record not stored: {:?}", e);
1963 return;
1964 }
1965
1966 self.queued_events
1967 .push_back(ToSwarm::GenerateEvent(Event::InboundRequest {
1968 request: InboundRequest::AddProvider { record: None },
1969 }));
1970 }
1971 StoreInserts::FilterBoth => {
1972 self.queued_events
1973 .push_back(ToSwarm::GenerateEvent(Event::InboundRequest {
1974 request: InboundRequest::AddProvider {
1975 record: Some(record),
1976 },
1977 }));
1978 }
1979 }
1980 }
1981 }
1982
1983 fn address_failed(&mut self, peer_id: PeerId, address: &Multiaddr) {
1984 let key = kbucket::Key::from(peer_id);
1985
1986 if let Some(addrs) = self.kbuckets.entry(&key).as_mut().and_then(|e| e.value()) {
1987 // TODO: Ideally, the address should only be removed if the error can
1988 // be classified as "permanent" but since `err` is currently a borrowed
1989 // trait object without a `'static` bound, even downcasting for inspection
1990 // of the error is not possible (and also not truly desirable or ergonomic).
1991 // The error passed in should rather be a dedicated enum.
1992 if addrs.remove(address).is_ok() {
1993 tracing::debug!(
1994 peer=%peer_id,
1995 %address,
1996 "Address removed from peer due to error."
1997 );
1998 } else {
1999 // Despite apparently having no reachable address (any longer),
2000 // the peer is kept in the routing table with the last address to avoid
2001 // (temporary) loss of network connectivity to "flush" the routing
2002 // table. Once in, a peer is only removed from the routing table
2003 // if it is the least recently connected peer, currently disconnected
2004 // and is unreachable in the context of another peer pending insertion
2005 // into the same bucket. This is handled transparently by the
2006 // `KBucketsTable` and takes effect through `KBucketsTable::take_applied_pending`
2007 // within `Behaviour::poll`.
2008 tracing::debug!(
2009 peer=%peer_id,
2010 %address,
2011 "Last remaining address of peer is unreachable."
2012 );
2013 }
2014 }
2015
2016 for query in self.queries.iter_mut() {
2017 if let Some(addrs) = query.peers.addresses.get_mut(&peer_id) {
2018 addrs.retain(|a| a != address);
2019 }
2020 }
2021 }
2022
2023 fn on_connection_established(
2024 &mut self,
2025 ConnectionEstablished {
2026 peer_id,
2027 failed_addresses,
2028 other_established,
2029 ..
2030 }: ConnectionEstablished,
2031 ) {
2032 for addr in failed_addresses {
2033 self.address_failed(peer_id, addr);
2034 }
2035
2036 // Peer's first connection.
2037 if other_established == 0 {
2038 self.connected_peers.insert(peer_id);
2039 }
2040 }
2041
2042 fn on_address_change(
2043 &mut self,
2044 AddressChange {
2045 peer_id: peer,
2046 old,
2047 new,
2048 ..
2049 }: AddressChange,
2050 ) {
2051 let (old, new) = (old.get_remote_address(), new.get_remote_address());
2052
2053 // Update routing table.
2054 if let Some(addrs) = self
2055 .kbuckets
2056 .entry(&kbucket::Key::from(peer))
2057 .as_mut()
2058 .and_then(|e| e.value())
2059 {
2060 if addrs.replace(old, new) {
2061 tracing::debug!(
2062 %peer,
2063 old_address=%old,
2064 new_address=%new,
2065 "Old address replaced with new address for peer."
2066 );
2067 } else {
2068 tracing::debug!(
2069 %peer,
2070 old_address=%old,
2071 new_address=%new,
2072 "Old address not replaced with new address for peer as old address wasn't present.",
2073 );
2074 }
2075 } else {
2076 tracing::debug!(
2077 %peer,
2078 old_address=%old,
2079 new_address=%new,
2080 "Old address not replaced with new address for peer as peer is not present in the \
2081 routing table."
2082 );
2083 }
2084
2085 // Update query address cache.
2086 //
2087 // Given two connected nodes: local node A and remote node B. Say node B
2088 // is not in node A's routing table. Additionally node B is part of the
2089 // `Query::addresses` list of an ongoing query on node A. Say Node
2090 // B triggers an address change and then disconnects. Later on the
2091 // earlier mentioned query on node A would like to connect to node B.
2092 // Without replacing the address in the `Query::addresses` set node
2093 // A would attempt to dial the old and not the new address.
2094 //
2095 // While upholding correctness, iterating through all discovered
2096 // addresses of a peer in all currently ongoing queries might have a
2097 // large performance impact. If so, the code below might be worth
2098 // revisiting.
2099 for query in self.queries.iter_mut() {
2100 if let Some(addrs) = query.peers.addresses.get_mut(&peer) {
2101 for addr in addrs.iter_mut() {
2102 if addr == old {
2103 *addr = new.clone();
2104 }
2105 }
2106 }
2107 }
2108 }
2109
2110 fn on_dial_failure(&mut self, DialFailure { peer_id, error, .. }: DialFailure) {
2111 let Some(peer_id) = peer_id else { return };
2112
2113 match error {
2114 DialError::LocalPeerId { .. }
2115 | DialError::WrongPeerId { .. }
2116 | DialError::Aborted
2117 | DialError::Denied { .. }
2118 | DialError::Transport(_)
2119 | DialError::NoAddresses => {
2120 if let DialError::Transport(addresses) = error {
2121 for (addr, _) in addresses {
2122 self.address_failed(peer_id, addr)
2123 }
2124 }
2125
2126 for query in self.queries.iter_mut() {
2127 query.on_failure(&peer_id);
2128 }
2129 }
2130 DialError::DialPeerConditionFalse(
2131 dial_opts::PeerCondition::Disconnected
2132 | dial_opts::PeerCondition::NotDialing
2133 | dial_opts::PeerCondition::DisconnectedAndNotDialing,
2134 ) => {
2135 // We might (still) be connected, or about to be connected, thus do not report the
2136 // failure to the queries.
2137 }
2138 DialError::DialPeerConditionFalse(dial_opts::PeerCondition::Always) => {
2139 unreachable!("DialPeerCondition::Always can not trigger DialPeerConditionFalse.");
2140 }
2141 }
2142 }
2143
2144 fn on_connection_closed(
2145 &mut self,
2146 ConnectionClosed {
2147 peer_id,
2148 remaining_established,
2149 connection_id,
2150 ..
2151 }: ConnectionClosed,
2152 ) {
2153 self.connections.remove(&connection_id);
2154
2155 if remaining_established == 0 {
2156 for query in self.queries.iter_mut() {
2157 query.on_failure(&peer_id);
2158 }
2159 self.connection_updated(peer_id, None, NodeStatus::Disconnected);
2160 self.connected_peers.remove(&peer_id);
2161 }
2162 }
2163
2164 /// Preloads a new [`Handler`] with requests that are waiting
2165 /// to be sent to the newly connected peer.
2166 fn preload_new_handler(
2167 &mut self,
2168 handler: &mut Handler,
2169 connection_id: ConnectionId,
2170 peer: PeerId,
2171 ) {
2172 self.connections.insert(connection_id, peer);
2173 // Queue events for sending pending RPCs to the connected peer.
2174 // There can be only one pending RPC for a particular peer and query per definition.
2175 for (_peer_id, event) in self.queries.iter_mut().filter_map(|q| {
2176 q.pending_rpcs
2177 .iter()
2178 .position(|(p, _)| p == &peer)
2179 .map(|p| q.pending_rpcs.remove(p))
2180 }) {
2181 handler.on_behaviour_event(event)
2182 }
2183 }
2184}
2185
2186/// Exponentially decrease the given duration (base 2).
2187fn exp_decrease(ttl: Duration, exp: u32) -> Duration {
2188 Duration::from_secs(ttl.as_secs().checked_shr(exp).unwrap_or(0))
2189}
2190
2191impl<TStore> NetworkBehaviour for Behaviour<TStore>
2192where
2193 TStore: RecordStore + Send + 'static,
2194{
2195 type ConnectionHandler = Handler;
2196 type ToSwarm = Event;
2197
2198 fn handle_established_inbound_connection(
2199 &mut self,
2200 connection_id: ConnectionId,
2201 peer: PeerId,
2202 local_addr: &Multiaddr,
2203 remote_addr: &Multiaddr,
2204 ) -> Result<THandler<Self>, ConnectionDenied> {
2205 let connected_point = ConnectedPoint::Listener {
2206 local_addr: local_addr.clone(),
2207 send_back_addr: remote_addr.clone(),
2208 };
2209
2210 let mut handler = Handler::new(
2211 self.protocol_config.clone(),
2212 connected_point,
2213 peer,
2214 self.mode,
2215 );
2216 self.preload_new_handler(&mut handler, connection_id, peer);
2217
2218 Ok(handler)
2219 }
2220
2221 fn handle_established_outbound_connection(
2222 &mut self,
2223 connection_id: ConnectionId,
2224 peer: PeerId,
2225 addr: &Multiaddr,
2226 role_override: Endpoint,
2227 port_use: PortUse,
2228 ) -> Result<THandler<Self>, ConnectionDenied> {
2229 let connected_point = ConnectedPoint::Dialer {
2230 address: addr.clone(),
2231 role_override,
2232 port_use,
2233 };
2234
2235 let mut handler = Handler::new(
2236 self.protocol_config.clone(),
2237 connected_point,
2238 peer,
2239 self.mode,
2240 );
2241 self.preload_new_handler(&mut handler, connection_id, peer);
2242
2243 Ok(handler)
2244 }
2245
2246 fn handle_pending_outbound_connection(
2247 &mut self,
2248 _connection_id: ConnectionId,
2249 maybe_peer: Option<PeerId>,
2250 _addresses: &[Multiaddr],
2251 _effective_role: Endpoint,
2252 ) -> Result<Vec<Multiaddr>, ConnectionDenied> {
2253 let Some(peer_id) = maybe_peer else {
2254 return Ok(vec![]);
2255 };
2256
2257 // We should order addresses from decreasing likelihood of connectivity, so start with
2258 // the addresses of that peer in the k-buckets.
2259 let key = kbucket::Key::from(peer_id);
2260 let mut peer_addrs =
2261 if let Some(kbucket::Entry::Present(mut entry, _)) = self.kbuckets.entry(&key) {
2262 let addrs = entry.value().iter().cloned().collect::<Vec<_>>();
2263 debug_assert!(!addrs.is_empty(), "Empty peer addresses in routing table.");
2264 addrs
2265 } else {
2266 Vec::new()
2267 };
2268
2269 // We add to that a temporary list of addresses from the ongoing queries.
2270 for query in self.queries.iter() {
2271 if let Some(addrs) = query.peers.addresses.get(&peer_id) {
2272 peer_addrs.extend(addrs.iter().cloned())
2273 }
2274 }
2275
2276 Ok(peer_addrs)
2277 }
2278
2279 fn on_connection_handler_event(
2280 &mut self,
2281 source: PeerId,
2282 connection: ConnectionId,
2283 event: THandlerOutEvent<Self>,
2284 ) {
2285 match event {
2286 HandlerEvent::ProtocolConfirmed { endpoint } => {
2287 debug_assert!(self.connected_peers.contains(&source));
2288 // The remote's address can only be put into the routing table,
2289 // and thus shared with other nodes, if the local node is the dialer,
2290 // since the remote address on an inbound connection may be specific
2291 // to that connection (e.g. typically the TCP port numbers).
2292 let address = match endpoint {
2293 ConnectedPoint::Dialer { address, .. } => Some(address),
2294 ConnectedPoint::Listener { .. } => None,
2295 };
2296
2297 self.connection_updated(source, address, NodeStatus::Connected);
2298 }
2299
2300 HandlerEvent::ProtocolNotSupported { endpoint } => {
2301 let address = match endpoint {
2302 ConnectedPoint::Dialer { address, .. } => Some(address),
2303 ConnectedPoint::Listener { .. } => None,
2304 };
2305 self.connection_updated(source, address, NodeStatus::Disconnected);
2306 }
2307
2308 HandlerEvent::FindNodeReq { key, request_id } => {
2309 let closer_peers = self
2310 .find_closest_local_peers(&kbucket::Key::new(key), &source)
2311 .collect::<Vec<_>>();
2312
2313 self.queued_events
2314 .push_back(ToSwarm::GenerateEvent(Event::InboundRequest {
2315 request: InboundRequest::FindNode {
2316 num_closer_peers: closer_peers.len(),
2317 },
2318 }));
2319
2320 self.queued_events.push_back(ToSwarm::NotifyHandler {
2321 peer_id: source,
2322 handler: NotifyHandler::One(connection),
2323 event: HandlerIn::FindNodeRes {
2324 closer_peers,
2325 request_id,
2326 },
2327 });
2328 }
2329
2330 HandlerEvent::FindNodeRes {
2331 closer_peers,
2332 query_id,
2333 } => {
2334 self.discovered(&query_id, &source, closer_peers.iter());
2335 }
2336
2337 HandlerEvent::GetProvidersReq { key, request_id } => {
2338 let provider_peers = self.provider_peers(&key, &source);
2339 let closer_peers = self
2340 .find_closest_local_peers(&kbucket::Key::new(key), &source)
2341 .collect::<Vec<_>>();
2342
2343 self.queued_events
2344 .push_back(ToSwarm::GenerateEvent(Event::InboundRequest {
2345 request: InboundRequest::GetProvider {
2346 num_closer_peers: closer_peers.len(),
2347 num_provider_peers: provider_peers.len(),
2348 },
2349 }));
2350
2351 self.queued_events.push_back(ToSwarm::NotifyHandler {
2352 peer_id: source,
2353 handler: NotifyHandler::One(connection),
2354 event: HandlerIn::GetProvidersRes {
2355 closer_peers,
2356 provider_peers,
2357 request_id,
2358 },
2359 });
2360 }
2361
2362 HandlerEvent::GetProvidersRes {
2363 closer_peers,
2364 provider_peers,
2365 query_id,
2366 } => {
2367 let peers = closer_peers.iter().chain(provider_peers.iter());
2368 self.discovered(&query_id, &source, peers);
2369 if let Some(query) = self.queries.get_mut(&query_id) {
2370 let stats = query.stats().clone();
2371 if let QueryInfo::GetProviders {
2372 ref key,
2373 ref mut providers_found,
2374 ref mut step,
2375 ..
2376 } = query.info
2377 {
2378 *providers_found += provider_peers.len();
2379 let providers = provider_peers.iter().map(|p| p.node_id).collect();
2380
2381 self.queued_events.push_back(ToSwarm::GenerateEvent(
2382 Event::OutboundQueryProgressed {
2383 id: query_id,
2384 result: QueryResult::GetProviders(Ok(
2385 GetProvidersOk::FoundProviders {
2386 key: key.clone(),
2387 providers,
2388 },
2389 )),
2390 step: step.clone(),
2391 stats,
2392 },
2393 ));
2394 *step = step.next();
2395 }
2396 }
2397 }
2398 HandlerEvent::QueryError { query_id, error } => {
2399 tracing::debug!(
2400 peer=%source,
2401 query=?query_id,
2402 "Request to peer in query failed with {:?}",
2403 error
2404 );
2405 // If the query to which the error relates is still active,
2406 // signal the failure w.r.t. `source`.
2407 if let Some(query) = self.queries.get_mut(&query_id) {
2408 query.on_failure(&source)
2409 }
2410 }
2411
2412 HandlerEvent::AddProvider { key, provider } => {
2413 // Only accept a provider record from a legitimate peer.
2414 if provider.node_id != source {
2415 return;
2416 }
2417
2418 self.provider_received(key, provider);
2419 }
2420
2421 HandlerEvent::GetRecord { key, request_id } => {
2422 // Lookup the record locally.
2423 let record = match self.store.get(&key) {
2424 Some(record) => {
2425 if record.is_expired(Instant::now()) {
2426 self.store.remove(&key);
2427 None
2428 } else {
2429 Some(record.into_owned())
2430 }
2431 }
2432 None => None,
2433 };
2434
2435 let closer_peers = self
2436 .find_closest_local_peers(&kbucket::Key::new(key), &source)
2437 .collect::<Vec<_>>();
2438
2439 self.queued_events
2440 .push_back(ToSwarm::GenerateEvent(Event::InboundRequest {
2441 request: InboundRequest::GetRecord {
2442 num_closer_peers: closer_peers.len(),
2443 present_locally: record.is_some(),
2444 },
2445 }));
2446
2447 self.queued_events.push_back(ToSwarm::NotifyHandler {
2448 peer_id: source,
2449 handler: NotifyHandler::One(connection),
2450 event: HandlerIn::GetRecordRes {
2451 record,
2452 closer_peers,
2453 request_id,
2454 },
2455 });
2456 }
2457
2458 HandlerEvent::GetRecordRes {
2459 record,
2460 closer_peers,
2461 query_id,
2462 } => {
2463 if let Some(query) = self.queries.get_mut(&query_id) {
2464 let stats = query.stats().clone();
2465 if let QueryInfo::GetRecord {
2466 key,
2467 ref mut step,
2468 ref mut found_a_record,
2469 cache_candidates,
2470 } = &mut query.info
2471 {
2472 if let Some(record) = record {
2473 *found_a_record = true;
2474 let record = PeerRecord {
2475 peer: Some(source),
2476 record,
2477 };
2478
2479 self.queued_events.push_back(ToSwarm::GenerateEvent(
2480 Event::OutboundQueryProgressed {
2481 id: query_id,
2482 result: QueryResult::GetRecord(Ok(GetRecordOk::FoundRecord(
2483 record,
2484 ))),
2485 step: step.clone(),
2486 stats,
2487 },
2488 ));
2489
2490 *step = step.next();
2491 } else {
2492 tracing::trace!(record=?key, %source, "Record not found at source");
2493 if let Caching::Enabled { max_peers } = self.caching {
2494 let source_key = kbucket::Key::from(source);
2495 let target_key = kbucket::Key::from(key.clone());
2496 let distance = source_key.distance(&target_key);
2497 cache_candidates.insert(distance, source);
2498 if cache_candidates.len() > max_peers as usize {
2499 cache_candidates.pop_last();
2500 }
2501 }
2502 }
2503 }
2504 }
2505
2506 self.discovered(&query_id, &source, closer_peers.iter());
2507 }
2508
2509 HandlerEvent::PutRecord { record, request_id } => {
2510 self.record_received(source, connection, request_id, record);
2511 }
2512
2513 HandlerEvent::PutRecordRes { query_id, .. } => {
2514 if let Some(query) = self.queries.get_mut(&query_id) {
2515 query.on_success(&source, vec![]);
2516 if let QueryInfo::PutRecord {
2517 phase: PutRecordPhase::PutRecord { success, .. },
2518 quorum,
2519 ..
2520 } = &mut query.info
2521 {
2522 success.push(source);
2523
2524 let quorum = quorum.get();
2525 if success.len() >= quorum {
2526 let peers = success.clone();
2527 let finished = query.try_finish(peers.iter());
2528 if !finished {
2529 tracing::debug!(
2530 peer=%source,
2531 query=?query_id,
2532 "PutRecord query reached quorum ({}/{}) with response \
2533 from peer but could not yet finish.",
2534 peers.len(),
2535 quorum,
2536 );
2537 }
2538 }
2539 }
2540 }
2541 }
2542 };
2543 }
2544
2545 #[tracing::instrument(level = "trace", name = "NetworkBehaviour::poll", skip(self, cx))]
2546 fn poll(
2547 &mut self,
2548 cx: &mut Context<'_>,
2549 ) -> Poll<ToSwarm<Self::ToSwarm, THandlerInEvent<Self>>> {
2550 let now = Instant::now();
2551
2552 // Calculate the available capacity for queries triggered by background jobs.
2553 let mut jobs_query_capacity = JOBS_MAX_QUERIES.saturating_sub(self.queries.size());
2554
2555 // Run the periodic provider announcement job.
2556 if let Some(mut job) = self.add_provider_job.take() {
2557 let num = usize::min(JOBS_MAX_NEW_QUERIES, jobs_query_capacity);
2558 for i in 0..num {
2559 if let Poll::Ready(r) = job.poll(cx, &mut self.store, now) {
2560 self.start_add_provider(r.key, AddProviderContext::Republish)
2561 } else {
2562 jobs_query_capacity -= i;
2563 break;
2564 }
2565 }
2566 self.add_provider_job = Some(job);
2567 }
2568
2569 // Run the periodic record replication / publication job.
2570 if let Some(mut job) = self.put_record_job.take() {
2571 let num = usize::min(JOBS_MAX_NEW_QUERIES, jobs_query_capacity);
2572 for _ in 0..num {
2573 if let Poll::Ready(r) = job.poll(cx, &mut self.store, now) {
2574 let context =
2575 if r.publisher.as_ref() == Some(self.kbuckets.local_key().preimage()) {
2576 PutRecordContext::Republish
2577 } else {
2578 PutRecordContext::Replicate
2579 };
2580 self.start_put_record(r, Quorum::All, context)
2581 } else {
2582 break;
2583 }
2584 }
2585 self.put_record_job = Some(job);
2586 }
2587
2588 // Poll bootstrap periodically and automatically.
2589 if let Poll::Ready(()) = self.bootstrap_status.poll_next_bootstrap(cx) {
2590 if let Err(e) = self.bootstrap() {
2591 tracing::warn!("Failed to trigger bootstrap: {e}");
2592 }
2593 }
2594
2595 loop {
2596 // Drain queued events first.
2597 if let Some(event) = self.queued_events.pop_front() {
2598 return Poll::Ready(event);
2599 }
2600
2601 // Drain applied pending entries from the routing table.
2602 if let Some(entry) = self.kbuckets.take_applied_pending() {
2603 let kbucket::Node { key, value } = entry.inserted;
2604 let peer_id = key.into_preimage();
2605 self.queued_events
2606 .push_back(ToSwarm::NewExternalAddrOfPeer {
2607 peer_id,
2608 address: value.first().clone(),
2609 });
2610 let event = Event::RoutingUpdated {
2611 bucket_range: self
2612 .kbuckets
2613 .bucket(&key)
2614 .map(|b| b.range())
2615 .expect("Self to never be applied from pending."),
2616 peer: peer_id,
2617 is_new_peer: true,
2618 addresses: value,
2619 old_peer: entry.evicted.map(|n| n.key.into_preimage()),
2620 };
2621 return Poll::Ready(ToSwarm::GenerateEvent(event));
2622 }
2623
2624 // Look for a finished query.
2625 loop {
2626 match self.queries.poll(now) {
2627 QueryPoolState::Finished(q) => {
2628 if let Some(event) = self.query_finished(q) {
2629 return Poll::Ready(ToSwarm::GenerateEvent(event));
2630 }
2631 }
2632 QueryPoolState::Timeout(q) => {
2633 if let Some(event) = self.query_timeout(q) {
2634 return Poll::Ready(ToSwarm::GenerateEvent(event));
2635 }
2636 }
2637 QueryPoolState::Waiting(Some((query, peer_id))) => {
2638 let event = query.info.to_request(query.id());
2639 // TODO: AddProvider requests yield no response, so the query completes
2640 // as soon as all requests have been sent. However, the handler should
2641 // better emit an event when the request has been sent (and report
2642 // an error if sending fails), instead of immediately reporting
2643 // "success" somewhat prematurely here.
2644 if let QueryInfo::AddProvider {
2645 phase: AddProviderPhase::AddProvider { .. },
2646 ..
2647 } = &query.info
2648 {
2649 query.on_success(&peer_id, vec![])
2650 }
2651
2652 if self.connected_peers.contains(&peer_id) {
2653 self.queued_events.push_back(ToSwarm::NotifyHandler {
2654 peer_id,
2655 event,
2656 handler: NotifyHandler::Any,
2657 });
2658 } else if &peer_id != self.kbuckets.local_key().preimage() {
2659 query.pending_rpcs.push((peer_id, event));
2660 self.queued_events.push_back(ToSwarm::Dial {
2661 opts: DialOpts::peer_id(peer_id).build(),
2662 });
2663 }
2664 }
2665 QueryPoolState::Waiting(None) | QueryPoolState::Idle => break,
2666 }
2667 }
2668
2669 // No immediate event was produced as a result of a finished query.
2670 // If no new events have been queued either, signal `NotReady` to
2671 // be polled again later.
2672 if self.queued_events.is_empty() {
2673 self.no_events_waker = Some(cx.waker().clone());
2674
2675 return Poll::Pending;
2676 }
2677 }
2678 }
2679
2680 fn on_swarm_event(&mut self, event: FromSwarm) {
2681 self.listen_addresses.on_swarm_event(&event);
2682 let external_addresses_changed = self.external_addresses.on_swarm_event(&event);
2683
2684 if self.auto_mode && external_addresses_changed {
2685 self.determine_mode_from_external_addresses();
2686 }
2687
2688 match event {
2689 FromSwarm::ConnectionEstablished(connection_established) => {
2690 self.on_connection_established(connection_established)
2691 }
2692 FromSwarm::ConnectionClosed(connection_closed) => {
2693 self.on_connection_closed(connection_closed)
2694 }
2695 FromSwarm::DialFailure(dial_failure) => self.on_dial_failure(dial_failure),
2696 FromSwarm::AddressChange(address_change) => self.on_address_change(address_change),
2697 FromSwarm::NewListenAddr(_) if self.connected_peers.is_empty() => {
2698 // A new listen addr was just discovered and we have no connected peers,
2699 // it can mean that our network interfaces were not up but they are now
2700 // so it might be a good idea to trigger a bootstrap.
2701 self.bootstrap_status.trigger();
2702 }
2703 _ => {}
2704 }
2705 }
2706}
2707
2708/// Peer Info combines a Peer ID with a set of multiaddrs that the peer is listening on.
2709#[derive(Debug, Clone, PartialEq, Eq)]
2710pub struct PeerInfo {
2711 pub peer_id: PeerId,
2712 pub addrs: Vec<Multiaddr>,
2713}
2714
2715/// A quorum w.r.t. the configured replication factor specifies the minimum
2716/// number of distinct nodes that must be successfully contacted in order
2717/// for a query to succeed.
2718#[derive(Debug, Copy, Clone, PartialEq, Eq)]
2719pub enum Quorum {
2720 One,
2721 Majority,
2722 All,
2723 N(NonZeroUsize),
2724}
2725
2726impl Quorum {
2727 /// Evaluate the quorum w.r.t a given total (number of peers).
2728 fn eval(&self, total: NonZeroUsize) -> NonZeroUsize {
2729 match self {
2730 Quorum::One => NonZeroUsize::new(1).expect("1 != 0"),
2731 Quorum::Majority => NonZeroUsize::new(total.get() / 2 + 1).expect("n + 1 != 0"),
2732 Quorum::All => total,
2733 Quorum::N(n) => NonZeroUsize::min(total, *n),
2734 }
2735 }
2736}
2737
2738/// A record either received by the given peer or retrieved from the local
2739/// record store.
2740#[derive(Debug, Clone, PartialEq, Eq)]
2741pub struct PeerRecord {
2742 /// The peer from whom the record was received. `None` if the record was
2743 /// retrieved from local storage.
2744 pub peer: Option<PeerId>,
2745 pub record: Record,
2746}
2747
2748//////////////////////////////////////////////////////////////////////////////
2749// Events
2750
2751/// The events produced by the `Kademlia` behaviour.
2752///
2753/// See [`NetworkBehaviour::poll`].
2754#[derive(Debug, Clone)]
2755#[allow(clippy::large_enum_variant)]
2756pub enum Event {
2757 /// An inbound request has been received and handled.
2758 // Note on the difference between 'request' and 'query': A request is a
2759 // single request-response style exchange with a single remote peer. A query
2760 // is made of multiple requests across multiple remote peers.
2761 InboundRequest { request: InboundRequest },
2762
2763 /// An outbound query has made progress.
2764 OutboundQueryProgressed {
2765 /// The ID of the query that finished.
2766 id: QueryId,
2767 /// The intermediate result of the query.
2768 result: QueryResult,
2769 /// Execution statistics from the query.
2770 stats: QueryStats,
2771 /// Indicates which event this is, if therer are multiple responses for a single query.
2772 step: ProgressStep,
2773 },
2774
2775 /// The routing table has been updated with a new peer and / or
2776 /// address, thereby possibly evicting another peer.
2777 RoutingUpdated {
2778 /// The ID of the peer that was added or updated.
2779 peer: PeerId,
2780 /// Whether this is a new peer and was thus just added to the routing
2781 /// table, or whether it is an existing peer who's addresses changed.
2782 is_new_peer: bool,
2783 /// The full list of known addresses of `peer`.
2784 addresses: Addresses,
2785 /// Returns the minimum inclusive and maximum inclusive distance for
2786 /// the bucket of the peer.
2787 bucket_range: (Distance, Distance),
2788 /// The ID of the peer that was evicted from the routing table to make
2789 /// room for the new peer, if any.
2790 old_peer: Option<PeerId>,
2791 },
2792
2793 /// A peer has connected for whom no listen address is known.
2794 ///
2795 /// If the peer is to be added to the routing table, a known
2796 /// listen address for the peer must be provided via [`Behaviour::add_address`].
2797 UnroutablePeer { peer: PeerId },
2798
2799 /// A connection to a peer has been established for whom a listen address
2800 /// is known but the peer has not been added to the routing table either
2801 /// because [`BucketInserts::Manual`] is configured or because
2802 /// the corresponding bucket is full.
2803 ///
2804 /// If the peer is to be included in the routing table, it must
2805 /// must be explicitly added via [`Behaviour::add_address`], possibly after
2806 /// removing another peer.
2807 ///
2808 /// See [`Behaviour::kbucket`] for insight into the contents of
2809 /// the k-bucket of `peer`.
2810 RoutablePeer { peer: PeerId, address: Multiaddr },
2811
2812 /// A connection to a peer has been established for whom a listen address
2813 /// is known but the peer is only pending insertion into the routing table
2814 /// if the least-recently disconnected peer is unresponsive, i.e. the peer
2815 /// may not make it into the routing table.
2816 ///
2817 /// If the peer is to be unconditionally included in the routing table,
2818 /// it should be explicitly added via [`Behaviour::add_address`] after
2819 /// removing another peer.
2820 ///
2821 /// See [`Behaviour::kbucket`] for insight into the contents of
2822 /// the k-bucket of `peer`.
2823 PendingRoutablePeer { peer: PeerId, address: Multiaddr },
2824
2825 /// This peer's mode has been updated automatically.
2826 ///
2827 /// This happens in response to an external
2828 /// address being added or removed.
2829 ModeChanged { new_mode: Mode },
2830}
2831
2832/// Information about progress events.
2833#[derive(Debug, Clone)]
2834pub struct ProgressStep {
2835 /// The index into the event
2836 pub count: NonZeroUsize,
2837 /// Is this the final event?
2838 pub last: bool,
2839}
2840
2841impl ProgressStep {
2842 fn first() -> Self {
2843 Self {
2844 count: NonZeroUsize::new(1).expect("1 to be greater than 0."),
2845 last: false,
2846 }
2847 }
2848
2849 fn first_and_last() -> Self {
2850 let mut first = ProgressStep::first();
2851 first.last = true;
2852 first
2853 }
2854
2855 fn next(&self) -> Self {
2856 assert!(!self.last);
2857 let count = NonZeroUsize::new(self.count.get() + 1).expect("Adding 1 not to result in 0.");
2858
2859 Self { count, last: false }
2860 }
2861}
2862
2863/// Information about a received and handled inbound request.
2864#[derive(Debug, Clone)]
2865pub enum InboundRequest {
2866 /// Request for the list of nodes whose IDs are the closest to `key`.
2867 FindNode { num_closer_peers: usize },
2868 /// Same as `FindNode`, but should also return the entries of the local
2869 /// providers list for this key.
2870 GetProvider {
2871 num_closer_peers: usize,
2872 num_provider_peers: usize,
2873 },
2874 /// A peer sent an add provider request.
2875 /// If filtering [`StoreInserts::FilterBoth`] is enabled, the [`ProviderRecord`] is
2876 /// included.
2877 ///
2878 /// See [`StoreInserts`] and [`Config::set_record_filtering`] for details..
2879 AddProvider { record: Option<ProviderRecord> },
2880 /// Request to retrieve a record.
2881 GetRecord {
2882 num_closer_peers: usize,
2883 present_locally: bool,
2884 },
2885 /// A peer sent a put record request.
2886 /// If filtering [`StoreInserts::FilterBoth`] is enabled, the [`Record`] is included.
2887 ///
2888 /// See [`StoreInserts`] and [`Config::set_record_filtering`].
2889 PutRecord {
2890 source: PeerId,
2891 connection: ConnectionId,
2892 record: Option<Record>,
2893 },
2894}
2895
2896/// The results of Kademlia queries.
2897#[derive(Debug, Clone)]
2898pub enum QueryResult {
2899 /// The result of [`Behaviour::bootstrap`].
2900 Bootstrap(BootstrapResult),
2901
2902 /// The result of [`Behaviour::get_closest_peers`].
2903 GetClosestPeers(GetClosestPeersResult),
2904
2905 /// The result of [`Behaviour::get_providers`].
2906 GetProviders(GetProvidersResult),
2907
2908 /// The result of [`Behaviour::start_providing`].
2909 StartProviding(AddProviderResult),
2910
2911 /// The result of a (automatic) republishing of a provider record.
2912 RepublishProvider(AddProviderResult),
2913
2914 /// The result of [`Behaviour::get_record`].
2915 GetRecord(GetRecordResult),
2916
2917 /// The result of [`Behaviour::put_record`].
2918 PutRecord(PutRecordResult),
2919
2920 /// The result of a (automatic) republishing of a (value-)record.
2921 RepublishRecord(PutRecordResult),
2922}
2923
2924/// The result of [`Behaviour::get_record`].
2925pub type GetRecordResult = Result<GetRecordOk, GetRecordError>;
2926
2927/// The successful result of [`Behaviour::get_record`].
2928#[derive(Debug, Clone)]
2929#[allow(clippy::large_enum_variant)]
2930pub enum GetRecordOk {
2931 FoundRecord(PeerRecord),
2932 FinishedWithNoAdditionalRecord {
2933 /// If caching is enabled, these are the peers closest
2934 /// _to the record key_ (not the local node) that were queried but
2935 /// did not return the record, sorted by distance to the record key
2936 /// from closest to farthest. How many of these are tracked is configured
2937 /// by [`Config::set_caching`].
2938 ///
2939 /// Writing back the cache at these peers is a manual operation.
2940 /// ie. you may wish to use these candidates with [`Behaviour::put_record_to`]
2941 /// after selecting one of the returned records.
2942 cache_candidates: BTreeMap<kbucket::Distance, PeerId>,
2943 },
2944}
2945
2946/// The error result of [`Behaviour::get_record`].
2947#[derive(Debug, Clone, Error)]
2948pub enum GetRecordError {
2949 #[error("the record was not found")]
2950 NotFound {
2951 key: record::Key,
2952 closest_peers: Vec<PeerId>,
2953 },
2954 #[error("the quorum failed; needed {quorum} peers")]
2955 QuorumFailed {
2956 key: record::Key,
2957 records: Vec<PeerRecord>,
2958 quorum: NonZeroUsize,
2959 },
2960 #[error("the request timed out")]
2961 Timeout { key: record::Key },
2962}
2963
2964impl GetRecordError {
2965 /// Gets the key of the record for which the operation failed.
2966 pub fn key(&self) -> &record::Key {
2967 match self {
2968 GetRecordError::QuorumFailed { key, .. } => key,
2969 GetRecordError::Timeout { key, .. } => key,
2970 GetRecordError::NotFound { key, .. } => key,
2971 }
2972 }
2973
2974 /// Extracts the key of the record for which the operation failed,
2975 /// consuming the error.
2976 pub fn into_key(self) -> record::Key {
2977 match self {
2978 GetRecordError::QuorumFailed { key, .. } => key,
2979 GetRecordError::Timeout { key, .. } => key,
2980 GetRecordError::NotFound { key, .. } => key,
2981 }
2982 }
2983}
2984
2985/// The result of [`Behaviour::put_record`].
2986pub type PutRecordResult = Result<PutRecordOk, PutRecordError>;
2987
2988/// The successful result of [`Behaviour::put_record`].
2989#[derive(Debug, Clone)]
2990pub struct PutRecordOk {
2991 pub key: record::Key,
2992}
2993
2994/// The error result of [`Behaviour::put_record`].
2995#[derive(Debug, Clone, Error)]
2996pub enum PutRecordError {
2997 #[error("the quorum failed; needed {quorum} peers")]
2998 QuorumFailed {
2999 key: record::Key,
3000 /// [`PeerId`]s of the peers the record was successfully stored on.
3001 success: Vec<PeerId>,
3002 quorum: NonZeroUsize,
3003 },
3004 #[error("the request timed out")]
3005 Timeout {
3006 key: record::Key,
3007 /// [`PeerId`]s of the peers the record was successfully stored on.
3008 success: Vec<PeerId>,
3009 quorum: NonZeroUsize,
3010 },
3011}
3012
3013impl PutRecordError {
3014 /// Gets the key of the record for which the operation failed.
3015 pub fn key(&self) -> &record::Key {
3016 match self {
3017 PutRecordError::QuorumFailed { key, .. } => key,
3018 PutRecordError::Timeout { key, .. } => key,
3019 }
3020 }
3021
3022 /// Extracts the key of the record for which the operation failed,
3023 /// consuming the error.
3024 pub fn into_key(self) -> record::Key {
3025 match self {
3026 PutRecordError::QuorumFailed { key, .. } => key,
3027 PutRecordError::Timeout { key, .. } => key,
3028 }
3029 }
3030}
3031
3032/// The result of [`Behaviour::bootstrap`].
3033pub type BootstrapResult = Result<BootstrapOk, BootstrapError>;
3034
3035/// The successful result of [`Behaviour::bootstrap`].
3036#[derive(Debug, Clone)]
3037pub struct BootstrapOk {
3038 pub peer: PeerId,
3039 pub num_remaining: u32,
3040}
3041
3042/// The error result of [`Behaviour::bootstrap`].
3043#[derive(Debug, Clone, Error)]
3044pub enum BootstrapError {
3045 #[error("the request timed out")]
3046 Timeout {
3047 peer: PeerId,
3048 num_remaining: Option<u32>,
3049 },
3050}
3051
3052/// The result of [`Behaviour::get_closest_peers`].
3053pub type GetClosestPeersResult = Result<GetClosestPeersOk, GetClosestPeersError>;
3054
3055/// The successful result of [`Behaviour::get_closest_peers`].
3056#[derive(Debug, Clone)]
3057pub struct GetClosestPeersOk {
3058 pub key: Vec<u8>,
3059 pub peers: Vec<PeerInfo>,
3060}
3061
3062/// The error result of [`Behaviour::get_closest_peers`].
3063#[derive(Debug, Clone, Error)]
3064pub enum GetClosestPeersError {
3065 #[error("the request timed out")]
3066 Timeout { key: Vec<u8>, peers: Vec<PeerInfo> },
3067}
3068
3069impl GetClosestPeersError {
3070 /// Gets the key for which the operation failed.
3071 pub fn key(&self) -> &Vec<u8> {
3072 match self {
3073 GetClosestPeersError::Timeout { key, .. } => key,
3074 }
3075 }
3076
3077 /// Extracts the key for which the operation failed,
3078 /// consuming the error.
3079 pub fn into_key(self) -> Vec<u8> {
3080 match self {
3081 GetClosestPeersError::Timeout { key, .. } => key,
3082 }
3083 }
3084}
3085
3086/// The result of [`Behaviour::get_providers`].
3087pub type GetProvidersResult = Result<GetProvidersOk, GetProvidersError>;
3088
3089/// The successful result of [`Behaviour::get_providers`].
3090#[derive(Debug, Clone)]
3091pub enum GetProvidersOk {
3092 FoundProviders {
3093 key: record::Key,
3094 /// The new set of providers discovered.
3095 providers: HashSet<PeerId>,
3096 },
3097 FinishedWithNoAdditionalRecord {
3098 closest_peers: Vec<PeerId>,
3099 },
3100}
3101
3102/// The error result of [`Behaviour::get_providers`].
3103#[derive(Debug, Clone, Error)]
3104pub enum GetProvidersError {
3105 #[error("the request timed out")]
3106 Timeout {
3107 key: record::Key,
3108 closest_peers: Vec<PeerId>,
3109 },
3110}
3111
3112impl GetProvidersError {
3113 /// Gets the key for which the operation failed.
3114 pub fn key(&self) -> &record::Key {
3115 match self {
3116 GetProvidersError::Timeout { key, .. } => key,
3117 }
3118 }
3119
3120 /// Extracts the key for which the operation failed,
3121 /// consuming the error.
3122 pub fn into_key(self) -> record::Key {
3123 match self {
3124 GetProvidersError::Timeout { key, .. } => key,
3125 }
3126 }
3127}
3128
3129/// The result of publishing a provider record.
3130pub type AddProviderResult = Result<AddProviderOk, AddProviderError>;
3131
3132/// The successful result of publishing a provider record.
3133#[derive(Debug, Clone)]
3134pub struct AddProviderOk {
3135 pub key: record::Key,
3136}
3137
3138/// The possible errors when publishing a provider record.
3139#[derive(Debug, Clone, Error)]
3140pub enum AddProviderError {
3141 #[error("the request timed out")]
3142 Timeout { key: record::Key },
3143}
3144
3145impl AddProviderError {
3146 /// Gets the key for which the operation failed.
3147 pub fn key(&self) -> &record::Key {
3148 match self {
3149 AddProviderError::Timeout { key, .. } => key,
3150 }
3151 }
3152
3153 /// Extracts the key for which the operation failed,
3154 pub fn into_key(self) -> record::Key {
3155 match self {
3156 AddProviderError::Timeout { key, .. } => key,
3157 }
3158 }
3159}
3160
3161impl From<kbucket::EntryView<kbucket::Key<PeerId>, Addresses>> for KadPeer {
3162 fn from(e: kbucket::EntryView<kbucket::Key<PeerId>, Addresses>) -> KadPeer {
3163 KadPeer {
3164 node_id: e.node.key.into_preimage(),
3165 multiaddrs: e.node.value.into_vec(),
3166 connection_ty: match e.status {
3167 NodeStatus::Connected => ConnectionType::Connected,
3168 NodeStatus::Disconnected => ConnectionType::NotConnected,
3169 },
3170 }
3171 }
3172}
3173
3174/// The context of a [`QueryInfo::AddProvider`] query.
3175#[derive(Debug, Copy, Clone, PartialEq, Eq)]
3176pub enum AddProviderContext {
3177 /// The context is a [`Behaviour::start_providing`] operation.
3178 Publish,
3179 /// The context is periodic republishing of provider announcements
3180 /// initiated earlier via [`Behaviour::start_providing`].
3181 Republish,
3182}
3183
3184/// The context of a [`QueryInfo::PutRecord`] query.
3185#[derive(Debug, Copy, Clone, PartialEq, Eq)]
3186pub enum PutRecordContext {
3187 /// The context is a [`Behaviour::put_record`] operation.
3188 Publish,
3189 /// The context is periodic republishing of records stored
3190 /// earlier via [`Behaviour::put_record`].
3191 Republish,
3192 /// The context is periodic replication (i.e. without extending
3193 /// the record TTL) of stored records received earlier from another peer.
3194 Replicate,
3195 /// The context is a custom store operation targeting specific
3196 /// peers initiated by [`Behaviour::put_record_to`].
3197 Custom,
3198}
3199
3200/// Information about a running query.
3201#[derive(Debug, Clone)]
3202pub enum QueryInfo {
3203 /// A query initiated by [`Behaviour::bootstrap`].
3204 Bootstrap {
3205 /// The targeted peer ID.
3206 peer: PeerId,
3207 /// The remaining random peer IDs to query, one per
3208 /// bucket that still needs refreshing.
3209 ///
3210 /// This is `None` if the initial self-lookup has not
3211 /// yet completed and `Some` with an exhausted iterator
3212 /// if bootstrapping is complete.
3213 remaining: Option<vec::IntoIter<kbucket::Key<PeerId>>>,
3214 step: ProgressStep,
3215 },
3216
3217 /// A (repeated) query initiated by [`Behaviour::get_closest_peers`].
3218 GetClosestPeers {
3219 /// The key being queried (the preimage).
3220 key: Vec<u8>,
3221 /// Current index of events.
3222 step: ProgressStep,
3223 /// Specifies expected number of responding peers
3224 num_results: NonZeroUsize,
3225 },
3226
3227 /// A (repeated) query initiated by [`Behaviour::get_providers`].
3228 GetProviders {
3229 /// The key for which to search for providers.
3230 key: record::Key,
3231 /// The number of providers found so far.
3232 providers_found: usize,
3233 /// Current index of events.
3234 step: ProgressStep,
3235 },
3236
3237 /// A (repeated) query initiated by [`Behaviour::start_providing`].
3238 AddProvider {
3239 /// The record key.
3240 key: record::Key,
3241 /// The current phase of the query.
3242 phase: AddProviderPhase,
3243 /// The execution context of the query.
3244 context: AddProviderContext,
3245 },
3246
3247 /// A (repeated) query initiated by [`Behaviour::put_record`].
3248 PutRecord {
3249 record: Record,
3250 /// The expected quorum of responses w.r.t. the replication factor.
3251 quorum: NonZeroUsize,
3252 /// The current phase of the query.
3253 phase: PutRecordPhase,
3254 /// The execution context of the query.
3255 context: PutRecordContext,
3256 },
3257
3258 /// A (repeated) query initiated by [`Behaviour::get_record`].
3259 GetRecord {
3260 /// The key to look for.
3261 key: record::Key,
3262 /// Current index of events.
3263 step: ProgressStep,
3264 /// Did we find at least one record?
3265 found_a_record: bool,
3266 /// The peers closest to the `key` that were queried but did not return a record,
3267 /// i.e. the peers that are candidates for caching the record.
3268 cache_candidates: BTreeMap<kbucket::Distance, PeerId>,
3269 },
3270}
3271
3272impl QueryInfo {
3273 /// Creates an event for a handler to issue an outgoing request in the
3274 /// context of a query.
3275 fn to_request(&self, query_id: QueryId) -> HandlerIn {
3276 match &self {
3277 QueryInfo::Bootstrap { peer, .. } => HandlerIn::FindNodeReq {
3278 key: peer.to_bytes(),
3279 query_id,
3280 },
3281 QueryInfo::GetClosestPeers { key, .. } => HandlerIn::FindNodeReq {
3282 key: key.clone(),
3283 query_id,
3284 },
3285 QueryInfo::GetProviders { key, .. } => HandlerIn::GetProvidersReq {
3286 key: key.clone(),
3287 query_id,
3288 },
3289 QueryInfo::AddProvider { key, phase, .. } => match phase {
3290 AddProviderPhase::GetClosestPeers => HandlerIn::FindNodeReq {
3291 key: key.to_vec(),
3292 query_id,
3293 },
3294 AddProviderPhase::AddProvider {
3295 provider_id,
3296 external_addresses,
3297 ..
3298 } => HandlerIn::AddProvider {
3299 key: key.clone(),
3300 provider: crate::protocol::KadPeer {
3301 node_id: *provider_id,
3302 multiaddrs: external_addresses.clone(),
3303 connection_ty: crate::protocol::ConnectionType::Connected,
3304 },
3305 query_id,
3306 },
3307 },
3308 QueryInfo::GetRecord { key, .. } => HandlerIn::GetRecord {
3309 key: key.clone(),
3310 query_id,
3311 },
3312 QueryInfo::PutRecord { record, phase, .. } => match phase {
3313 PutRecordPhase::GetClosestPeers => HandlerIn::FindNodeReq {
3314 key: record.key.to_vec(),
3315 query_id,
3316 },
3317 PutRecordPhase::PutRecord { .. } => HandlerIn::PutRecord {
3318 record: record.clone(),
3319 query_id,
3320 },
3321 },
3322 }
3323 }
3324}
3325
3326/// The phases of a [`QueryInfo::AddProvider`] query.
3327#[derive(Debug, Clone)]
3328pub enum AddProviderPhase {
3329 /// The query is searching for the closest nodes to the record key.
3330 GetClosestPeers,
3331
3332 /// The query advertises the local node as a provider for the key to
3333 /// the closest nodes to the key.
3334 AddProvider {
3335 /// The local peer ID that is advertised as a provider.
3336 provider_id: PeerId,
3337 /// The external addresses of the provider being advertised.
3338 external_addresses: Vec<Multiaddr>,
3339 /// Query statistics from the finished `GetClosestPeers` phase.
3340 get_closest_peers_stats: QueryStats,
3341 },
3342}
3343
3344/// The phases of a [`QueryInfo::PutRecord`] query.
3345#[derive(Debug, Clone, PartialEq, Eq)]
3346pub enum PutRecordPhase {
3347 /// The query is searching for the closest nodes to the record key.
3348 GetClosestPeers,
3349
3350 /// The query is replicating the record to the closest nodes to the key.
3351 PutRecord {
3352 /// A list of peers the given record has been successfully replicated to.
3353 success: Vec<PeerId>,
3354 /// Query statistics from the finished `GetClosestPeers` phase.
3355 get_closest_peers_stats: QueryStats,
3356 },
3357}
3358
3359/// A mutable reference to a running query.
3360pub struct QueryMut<'a> {
3361 query: &'a mut Query,
3362}
3363
3364impl QueryMut<'_> {
3365 pub fn id(&self) -> QueryId {
3366 self.query.id()
3367 }
3368
3369 /// Gets information about the type and state of the query.
3370 pub fn info(&self) -> &QueryInfo {
3371 &self.query.info
3372 }
3373
3374 /// Gets execution statistics about the query.
3375 ///
3376 /// For a multi-phase query such as `put_record`, these are the
3377 /// statistics of the current phase.
3378 pub fn stats(&self) -> &QueryStats {
3379 self.query.stats()
3380 }
3381
3382 /// Finishes the query asap, without waiting for the
3383 /// regular termination conditions.
3384 pub fn finish(&mut self) {
3385 self.query.finish()
3386 }
3387}
3388
3389/// An immutable reference to a running query.
3390pub struct QueryRef<'a> {
3391 query: &'a Query,
3392}
3393
3394impl QueryRef<'_> {
3395 pub fn id(&self) -> QueryId {
3396 self.query.id()
3397 }
3398
3399 /// Gets information about the type and state of the query.
3400 pub fn info(&self) -> &QueryInfo {
3401 &self.query.info
3402 }
3403
3404 /// Gets execution statistics about the query.
3405 ///
3406 /// For a multi-phase query such as `put_record`, these are the
3407 /// statistics of the current phase.
3408 pub fn stats(&self) -> &QueryStats {
3409 self.query.stats()
3410 }
3411}
3412
3413/// An operation failed to due no known peers in the routing table.
3414#[derive(Debug, Clone)]
3415pub struct NoKnownPeers();
3416
3417impl fmt::Display for NoKnownPeers {
3418 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
3419 write!(f, "No known peers.")
3420 }
3421}
3422
3423impl std::error::Error for NoKnownPeers {}
3424
3425/// The possible outcomes of [`Behaviour::add_address`].
3426#[derive(Debug, Clone, Copy, PartialEq, Eq)]
3427pub enum RoutingUpdate {
3428 /// The given peer and address has been added to the routing
3429 /// table.
3430 Success,
3431 /// The peer and address is pending insertion into
3432 /// the routing table, if a disconnected peer fails
3433 /// to respond. If the given peer and address ends up
3434 /// in the routing table, [`Event::RoutingUpdated`]
3435 /// is eventually emitted.
3436 Pending,
3437 /// The routing table update failed, either because the
3438 /// corresponding bucket for the peer is full and the
3439 /// pending slot(s) are occupied, or because the given
3440 /// peer ID is deemed invalid (e.g. refers to the local
3441 /// peer ID).
3442 Failed,
3443}
3444
3445#[derive(PartialEq, Copy, Clone, Debug)]
3446pub enum Mode {
3447 Client,
3448 Server,
3449}
3450
3451impl fmt::Display for Mode {
3452 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
3453 match self {
3454 Mode::Client => write!(f, "client"),
3455 Mode::Server => write!(f, "server"),
3456 }
3457 }
3458}
3459
3460fn to_comma_separated_list<T>(confirmed_external_addresses: &[T]) -> String
3461where
3462 T: ToString,
3463{
3464 confirmed_external_addresses
3465 .iter()
3466 .map(|addr| addr.to_string())
3467 .collect::<Vec<_>>()
3468 .join(", ")
3469}