libp2p_rendezvous/
client.rs

1// Copyright 2021 COMIT Network.
2//
3// Permission is hereby granted, free of charge, to any person obtaining a
4// copy of this software and associated documentation files (the "Software"),
5// to deal in the Software without restriction, including without limitation
6// the rights to use, copy, modify, merge, publish, distribute, sublicense,
7// and/or sell copies of the Software, and to permit persons to whom the
8// Software is furnished to do so, subject to the following conditions:
9//
10// The above copyright notice and this permission notice shall be included in
11// all copies or substantial portions of the Software.
12//
13// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
14// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
18// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
19// DEALINGS IN THE SOFTWARE.
20
21use std::{
22    collections::{HashMap, VecDeque},
23    iter,
24    task::{Context, Poll},
25    time::Duration,
26};
27
28use futures::{
29    future::{BoxFuture, FutureExt},
30    stream::{FuturesUnordered, StreamExt},
31};
32use libp2p_core::{transport::PortUse, Endpoint, Multiaddr, PeerRecord};
33use libp2p_identity::{Keypair, PeerId, SigningError};
34use libp2p_request_response::{OutboundRequestId, ProtocolSupport};
35use libp2p_swarm::{
36    ConnectionDenied, ConnectionId, ExternalAddresses, FromSwarm, NetworkBehaviour, THandler,
37    THandlerInEvent, THandlerOutEvent, ToSwarm,
38};
39
40use crate::codec::{
41    Cookie, ErrorCode, Message, Message::*, Namespace, NewRegistration, Registration, Ttl,
42};
43
44pub struct Behaviour {
45    events: VecDeque<ToSwarm<<Self as NetworkBehaviour>::ToSwarm, THandlerInEvent<Self>>>,
46
47    inner: libp2p_request_response::Behaviour<crate::codec::Codec>,
48
49    keypair: Keypair,
50
51    waiting_for_register: HashMap<OutboundRequestId, (PeerId, Namespace)>,
52    waiting_for_discovery: HashMap<OutboundRequestId, (PeerId, Option<Namespace>)>,
53
54    /// Hold addresses of all peers that we have discovered so far.
55    ///
56    /// Storing these internally allows us to assist the [`libp2p_swarm::Swarm`] in dialing by
57    /// returning addresses from [`NetworkBehaviour::handle_pending_outbound_connection`].
58    discovered_peers: HashMap<PeerId, HashMap<Namespace, Vec<Multiaddr>>>,
59
60    registered_namespaces: HashMap<(PeerId, Namespace), Ttl>,
61
62    /// Tracks the expiry of registrations that we have discovered and stored in `discovered_peers`
63    /// otherwise we have a memory leak.
64    expiring_registrations: FuturesUnordered<BoxFuture<'static, (PeerId, Namespace)>>,
65
66    external_addresses: ExternalAddresses,
67}
68
69impl Behaviour {
70    /// Create a new instance of the rendezvous [`NetworkBehaviour`].
71    pub fn new(keypair: Keypair) -> Self {
72        Self {
73            events: Default::default(),
74            inner: libp2p_request_response::Behaviour::with_codec(
75                crate::codec::Codec::default(),
76                iter::once((crate::PROTOCOL_IDENT, ProtocolSupport::Outbound)),
77                libp2p_request_response::Config::default(),
78            ),
79            keypair,
80            waiting_for_register: Default::default(),
81            waiting_for_discovery: Default::default(),
82            discovered_peers: Default::default(),
83            registered_namespaces: Default::default(),
84            expiring_registrations: FuturesUnordered::from_iter(vec![
85                futures::future::pending().boxed()
86            ]),
87            external_addresses: Default::default(),
88        }
89    }
90
91    /// Register our external addresses in the given namespace with the given rendezvous peer.
92    ///
93    /// External addresses are either manually added via
94    /// [`libp2p_swarm::Swarm::add_external_address`] or reported by other [`NetworkBehaviour`]s
95    /// via [`ToSwarm::ExternalAddrConfirmed`].
96    pub fn register(
97        &mut self,
98        namespace: Namespace,
99        rendezvous_node: PeerId,
100        ttl: Option<Ttl>,
101    ) -> Result<(), RegisterError> {
102        let external_addresses = self.external_addresses.iter().cloned().collect::<Vec<_>>();
103        if external_addresses.is_empty() {
104            return Err(RegisterError::NoExternalAddresses);
105        }
106
107        let peer_record = PeerRecord::new(&self.keypair, external_addresses)?;
108        let req_id = self.inner.send_request(
109            &rendezvous_node,
110            Register(NewRegistration::new(namespace.clone(), peer_record, ttl)),
111        );
112        self.waiting_for_register
113            .insert(req_id, (rendezvous_node, namespace));
114
115        Ok(())
116    }
117
118    /// Unregister ourselves from the given namespace with the given rendezvous peer.
119    pub fn unregister(&mut self, namespace: Namespace, rendezvous_node: PeerId) {
120        self.registered_namespaces
121            .retain(|(rz_node, ns), _| rz_node.ne(&rendezvous_node) && ns.ne(&namespace));
122
123        self.inner
124            .send_request(&rendezvous_node, Unregister(namespace));
125    }
126
127    /// Discover other peers at a given rendezvous peer.
128    ///
129    /// If desired, the registrations can be filtered by a namespace.
130    /// If no namespace is given, peers from all namespaces will be returned.
131    /// A successfully discovery returns a cookie within [`Event::Discovered`].
132    /// Such a cookie can be used to only fetch the _delta_ of registrations since
133    /// the cookie was acquired.
134    pub fn discover(
135        &mut self,
136        namespace: Option<Namespace>,
137        cookie: Option<Cookie>,
138        limit: Option<u64>,
139        rendezvous_node: PeerId,
140    ) {
141        let req_id = self.inner.send_request(
142            &rendezvous_node,
143            Discover {
144                namespace: namespace.clone(),
145                cookie,
146                limit,
147            },
148        );
149
150        self.waiting_for_discovery
151            .insert(req_id, (rendezvous_node, namespace));
152    }
153}
154
155#[derive(Debug, thiserror::Error)]
156pub enum RegisterError {
157    #[error("We don't know about any externally reachable addresses of ours")]
158    NoExternalAddresses,
159    #[error("Failed to make a new PeerRecord")]
160    FailedToMakeRecord(#[from] SigningError),
161}
162
163#[derive(Debug)]
164#[allow(clippy::large_enum_variant)]
165pub enum Event {
166    /// We successfully discovered other nodes with using the contained rendezvous node.
167    Discovered {
168        rendezvous_node: PeerId,
169        registrations: Vec<Registration>,
170        cookie: Cookie,
171    },
172    /// We failed to discover other nodes on the contained rendezvous node.
173    DiscoverFailed {
174        rendezvous_node: PeerId,
175        namespace: Option<Namespace>,
176        error: ErrorCode,
177    },
178    /// We successfully registered with the contained rendezvous node.
179    Registered {
180        rendezvous_node: PeerId,
181        ttl: Ttl,
182        namespace: Namespace,
183    },
184    /// We failed to register with the contained rendezvous node.
185    RegisterFailed {
186        rendezvous_node: PeerId,
187        namespace: Namespace,
188        error: ErrorCode,
189    },
190    /// The connection details we learned from this node expired.
191    Expired { peer: PeerId },
192}
193
194impl NetworkBehaviour for Behaviour {
195    type ConnectionHandler = <libp2p_request_response::Behaviour<
196        crate::codec::Codec,
197    > as NetworkBehaviour>::ConnectionHandler;
198
199    type ToSwarm = Event;
200
201    fn handle_established_inbound_connection(
202        &mut self,
203        connection_id: ConnectionId,
204        peer: PeerId,
205        local_addr: &Multiaddr,
206        remote_addr: &Multiaddr,
207    ) -> Result<THandler<Self>, ConnectionDenied> {
208        self.inner.handle_established_inbound_connection(
209            connection_id,
210            peer,
211            local_addr,
212            remote_addr,
213        )
214    }
215
216    fn handle_established_outbound_connection(
217        &mut self,
218        connection_id: ConnectionId,
219        peer: PeerId,
220        addr: &Multiaddr,
221        role_override: Endpoint,
222        port_use: PortUse,
223    ) -> Result<THandler<Self>, ConnectionDenied> {
224        self.inner.handle_established_outbound_connection(
225            connection_id,
226            peer,
227            addr,
228            role_override,
229            port_use,
230        )
231    }
232
233    fn on_connection_handler_event(
234        &mut self,
235        peer_id: PeerId,
236        connection_id: ConnectionId,
237        event: THandlerOutEvent<Self>,
238    ) {
239        self.inner
240            .on_connection_handler_event(peer_id, connection_id, event);
241    }
242
243    fn on_swarm_event(&mut self, event: FromSwarm) {
244        let changed = self.external_addresses.on_swarm_event(&event);
245
246        self.inner.on_swarm_event(event);
247
248        if changed && self.external_addresses.iter().count() > 0 {
249            let registered = self.registered_namespaces.clone();
250            for ((rz_node, ns), ttl) in registered {
251                if let Err(e) = self.register(ns, rz_node, Some(ttl)) {
252                    tracing::warn!("refreshing registration failed: {e}")
253                }
254            }
255        }
256    }
257
258    #[tracing::instrument(level = "trace", name = "NetworkBehaviour::poll", skip(self, cx))]
259    fn poll(
260        &mut self,
261        cx: &mut Context<'_>,
262    ) -> Poll<ToSwarm<Self::ToSwarm, THandlerInEvent<Self>>> {
263        use libp2p_request_response as req_res;
264        loop {
265            if let Some(event) = self.events.pop_front() {
266                return Poll::Ready(event);
267            }
268
269            match self.inner.poll(cx) {
270                Poll::Ready(ToSwarm::GenerateEvent(req_res::Event::Message {
271                    message:
272                        req_res::Message::Response {
273                            request_id,
274                            response,
275                        },
276                    ..
277                })) => {
278                    if let Some(event) = self.handle_response(&request_id, response) {
279                        return Poll::Ready(ToSwarm::GenerateEvent(event));
280                    }
281
282                    continue; // not a request we care about
283                }
284                Poll::Ready(ToSwarm::GenerateEvent(req_res::Event::OutboundFailure {
285                    request_id,
286                    ..
287                })) => {
288                    if let Some(event) = self.event_for_outbound_failure(&request_id) {
289                        return Poll::Ready(ToSwarm::GenerateEvent(event));
290                    }
291
292                    continue; // not a request we care about
293                }
294                Poll::Ready(ToSwarm::GenerateEvent(
295                    req_res::Event::InboundFailure { .. }
296                    | req_res::Event::ResponseSent { .. }
297                    | req_res::Event::Message {
298                        message: req_res::Message::Request { .. },
299                        ..
300                    },
301                )) => {
302                    unreachable!("rendezvous clients never receive requests")
303                }
304                Poll::Ready(other) => {
305                    let new_to_swarm =
306                        other.map_out(|_| unreachable!("we manually map `GenerateEvent` variants"));
307
308                    return Poll::Ready(new_to_swarm);
309                }
310                Poll::Pending => {}
311            }
312
313            if let Poll::Ready(Some((peer, expired_registration))) =
314                self.expiring_registrations.poll_next_unpin(cx)
315            {
316                let Some(registrations) = self.discovered_peers.get_mut(&peer) else {
317                    continue;
318                };
319                registrations.remove(&expired_registration);
320                if registrations.is_empty() {
321                    self.discovered_peers.remove(&peer);
322                }
323                return Poll::Ready(ToSwarm::GenerateEvent(Event::Expired { peer }));
324            }
325
326            return Poll::Pending;
327        }
328    }
329
330    fn handle_pending_outbound_connection(
331        &mut self,
332        _connection_id: ConnectionId,
333        maybe_peer: Option<PeerId>,
334        _addresses: &[Multiaddr],
335        _effective_role: Endpoint,
336    ) -> Result<Vec<Multiaddr>, ConnectionDenied> {
337        let addrs = maybe_peer
338            .map(|peer| self.discovered_peer_addrs(&peer).cloned().collect())
339            .unwrap_or_default();
340        Ok(addrs)
341    }
342}
343
344impl Behaviour {
345    fn event_for_outbound_failure(&mut self, req_id: &OutboundRequestId) -> Option<Event> {
346        if let Some((rendezvous_node, namespace)) = self.waiting_for_register.remove(req_id) {
347            return Some(Event::RegisterFailed {
348                rendezvous_node,
349                namespace,
350                error: ErrorCode::Unavailable,
351            });
352        };
353
354        if let Some((rendezvous_node, namespace)) = self.waiting_for_discovery.remove(req_id) {
355            return Some(Event::DiscoverFailed {
356                rendezvous_node,
357                namespace,
358                error: ErrorCode::Unavailable,
359            });
360        };
361
362        None
363    }
364
365    fn handle_response(
366        &mut self,
367        request_id: &OutboundRequestId,
368        response: Message,
369    ) -> Option<Event> {
370        match response {
371            RegisterResponse(Ok(ttl)) => {
372                let (rendezvous_node, namespace) = self.waiting_for_register.remove(request_id)?;
373                self.registered_namespaces
374                    .insert((rendezvous_node, namespace.clone()), ttl);
375
376                Some(Event::Registered {
377                    rendezvous_node,
378                    ttl,
379                    namespace,
380                })
381            }
382            RegisterResponse(Err(error_code)) => {
383                let (rendezvous_node, namespace) = self.waiting_for_register.remove(request_id)?;
384                Some(Event::RegisterFailed {
385                    rendezvous_node,
386                    namespace,
387                    error: error_code,
388                })
389            }
390            DiscoverResponse(Ok((registrations, cookie))) => {
391                let (rendezvous_node, _ns) = self.waiting_for_discovery.remove(request_id)?;
392                registrations.iter().for_each(|registration| {
393                    let peer_id = registration.record.peer_id();
394                    let addresses = registration.record.addresses();
395                    let namespace = registration.namespace.clone();
396                    let ttl = registration.ttl;
397
398                    // Emit events for all newly discovered addresses.
399                    let new_addr_events = addresses
400                        .iter()
401                        .filter_map(|address| {
402                            if self.discovered_peer_addrs(&peer_id).any(|a| a == address) {
403                                return None;
404                            }
405                            Some(ToSwarm::NewExternalAddrOfPeer {
406                                peer_id,
407                                address: address.clone(),
408                            })
409                        })
410                        .collect::<Vec<_>>();
411                    self.events.extend(new_addr_events);
412
413                    // Update list of discovered peers.
414                    self.discovered_peers
415                        .entry(peer_id)
416                        .or_default()
417                        .insert(namespace.clone(), addresses.to_owned());
418
419                    // Push registration expiry future.
420                    self.expiring_registrations.push(
421                        async move {
422                            // if the timer errors we consider it expired
423                            futures_timer::Delay::new(Duration::from_secs(ttl)).await;
424                            (peer_id, namespace)
425                        }
426                        .boxed(),
427                    );
428                });
429
430                Some(Event::Discovered {
431                    rendezvous_node,
432                    registrations,
433                    cookie,
434                })
435            }
436            DiscoverResponse(Err(error_code)) => {
437                let (rendezvous_node, ns) = self.waiting_for_discovery.remove(request_id)?;
438                Some(Event::DiscoverFailed {
439                    rendezvous_node,
440                    namespace: ns,
441                    error: error_code,
442                })
443            }
444            _ => unreachable!("rendezvous clients never receive requests"),
445        }
446    }
447
448    fn discovered_peer_addrs(&self, peer: &PeerId) -> impl Iterator<Item = &Multiaddr> {
449        self.discovered_peers
450            .get(peer)
451            .map(|addrs| addrs.values().flatten())
452            .unwrap_or_default()
453    }
454}