libp2p_rendezvous/
client.rs

1// Copyright 2021 COMIT Network.
2//
3// Permission is hereby granted, free of charge, to any person obtaining a
4// copy of this software and associated documentation files (the "Software"),
5// to deal in the Software without restriction, including without limitation
6// the rights to use, copy, modify, merge, publish, distribute, sublicense,
7// and/or sell copies of the Software, and to permit persons to whom the
8// Software is furnished to do so, subject to the following conditions:
9//
10// The above copyright notice and this permission notice shall be included in
11// all copies or substantial portions of the Software.
12//
13// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
14// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
18// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
19// DEALINGS IN THE SOFTWARE.
20
21use std::{
22    collections::{HashMap, VecDeque},
23    iter,
24    task::{Context, Poll},
25    time::Duration,
26};
27
28use futures::{
29    future::{BoxFuture, FutureExt},
30    stream::{FuturesUnordered, StreamExt},
31};
32use libp2p_core::{transport::PortUse, Endpoint, Multiaddr, PeerRecord};
33use libp2p_identity::{Keypair, PeerId, SigningError};
34use libp2p_request_response::{OutboundRequestId, ProtocolSupport};
35use libp2p_swarm::{
36    ConnectionDenied, ConnectionId, ExternalAddresses, FromSwarm, NetworkBehaviour, THandler,
37    THandlerInEvent, THandlerOutEvent, ToSwarm,
38};
39
40use crate::codec::{
41    Cookie, ErrorCode, Message, Message::*, Namespace, NewRegistration, Registration, Ttl,
42};
43
44pub struct Behaviour {
45    events: VecDeque<ToSwarm<<Self as NetworkBehaviour>::ToSwarm, THandlerInEvent<Self>>>,
46
47    inner: libp2p_request_response::Behaviour<crate::codec::Codec>,
48
49    keypair: Keypair,
50
51    waiting_for_register: HashMap<OutboundRequestId, (PeerId, Namespace)>,
52    waiting_for_discovery: HashMap<OutboundRequestId, (PeerId, Option<Namespace>)>,
53
54    /// Hold addresses of all peers that we have discovered so far.
55    ///
56    /// Storing these internally allows us to assist the [`libp2p_swarm::Swarm`] in dialing by
57    /// returning addresses from [`NetworkBehaviour::handle_pending_outbound_connection`].
58    discovered_peers: HashMap<(PeerId, Namespace), Vec<Multiaddr>>,
59
60    registered_namespaces: HashMap<(PeerId, Namespace), Ttl>,
61
62    /// Tracks the expiry of registrations that we have discovered and stored in `discovered_peers`
63    /// otherwise we have a memory leak.
64    expiring_registrations: FuturesUnordered<BoxFuture<'static, (PeerId, Namespace)>>,
65
66    external_addresses: ExternalAddresses,
67}
68
69impl Behaviour {
70    /// Create a new instance of the rendezvous [`NetworkBehaviour`].
71    pub fn new(keypair: Keypair) -> Self {
72        Self {
73            events: Default::default(),
74            inner: libp2p_request_response::Behaviour::with_codec(
75                crate::codec::Codec::default(),
76                iter::once((crate::PROTOCOL_IDENT, ProtocolSupport::Outbound)),
77                libp2p_request_response::Config::default(),
78            ),
79            keypair,
80            waiting_for_register: Default::default(),
81            waiting_for_discovery: Default::default(),
82            discovered_peers: Default::default(),
83            registered_namespaces: Default::default(),
84            expiring_registrations: FuturesUnordered::from_iter(vec![
85                futures::future::pending().boxed()
86            ]),
87            external_addresses: Default::default(),
88        }
89    }
90
91    /// Register our external addresses in the given namespace with the given rendezvous peer.
92    ///
93    /// External addresses are either manually added via
94    /// [`libp2p_swarm::Swarm::add_external_address`] or reported by other [`NetworkBehaviour`]s
95    /// via [`ToSwarm::ExternalAddrConfirmed`].
96    pub fn register(
97        &mut self,
98        namespace: Namespace,
99        rendezvous_node: PeerId,
100        ttl: Option<Ttl>,
101    ) -> Result<(), RegisterError> {
102        let external_addresses = self.external_addresses.iter().cloned().collect::<Vec<_>>();
103        if external_addresses.is_empty() {
104            return Err(RegisterError::NoExternalAddresses);
105        }
106
107        let peer_record = PeerRecord::new(&self.keypair, external_addresses)?;
108        let req_id = self.inner.send_request(
109            &rendezvous_node,
110            Register(NewRegistration::new(namespace.clone(), peer_record, ttl)),
111        );
112        self.waiting_for_register
113            .insert(req_id, (rendezvous_node, namespace));
114
115        Ok(())
116    }
117
118    /// Unregister ourselves from the given namespace with the given rendezvous peer.
119    pub fn unregister(&mut self, namespace: Namespace, rendezvous_node: PeerId) {
120        self.registered_namespaces
121            .retain(|(rz_node, ns), _| rz_node.ne(&rendezvous_node) && ns.ne(&namespace));
122
123        self.inner
124            .send_request(&rendezvous_node, Unregister(namespace));
125    }
126
127    /// Discover other peers at a given rendezvous peer.
128    ///
129    /// If desired, the registrations can be filtered by a namespace.
130    /// If no namespace is given, peers from all namespaces will be returned.
131    /// A successfully discovery returns a cookie within [`Event::Discovered`].
132    /// Such a cookie can be used to only fetch the _delta_ of registrations since
133    /// the cookie was acquired.
134    pub fn discover(
135        &mut self,
136        namespace: Option<Namespace>,
137        cookie: Option<Cookie>,
138        limit: Option<u64>,
139        rendezvous_node: PeerId,
140    ) {
141        let req_id = self.inner.send_request(
142            &rendezvous_node,
143            Discover {
144                namespace: namespace.clone(),
145                cookie,
146                limit,
147            },
148        );
149
150        self.waiting_for_discovery
151            .insert(req_id, (rendezvous_node, namespace));
152    }
153}
154
155#[derive(Debug, thiserror::Error)]
156pub enum RegisterError {
157    #[error("We don't know about any externally reachable addresses of ours")]
158    NoExternalAddresses,
159    #[error("Failed to make a new PeerRecord")]
160    FailedToMakeRecord(#[from] SigningError),
161}
162
163#[derive(Debug)]
164#[allow(clippy::large_enum_variant)]
165pub enum Event {
166    /// We successfully discovered other nodes with using the contained rendezvous node.
167    Discovered {
168        rendezvous_node: PeerId,
169        registrations: Vec<Registration>,
170        cookie: Cookie,
171    },
172    /// We failed to discover other nodes on the contained rendezvous node.
173    DiscoverFailed {
174        rendezvous_node: PeerId,
175        namespace: Option<Namespace>,
176        error: ErrorCode,
177    },
178    /// We successfully registered with the contained rendezvous node.
179    Registered {
180        rendezvous_node: PeerId,
181        ttl: Ttl,
182        namespace: Namespace,
183    },
184    /// We failed to register with the contained rendezvous node.
185    RegisterFailed {
186        rendezvous_node: PeerId,
187        namespace: Namespace,
188        error: ErrorCode,
189    },
190    /// The connection details we learned from this node expired.
191    Expired { peer: PeerId },
192}
193
194impl NetworkBehaviour for Behaviour {
195    type ConnectionHandler = <libp2p_request_response::Behaviour<
196        crate::codec::Codec,
197    > as NetworkBehaviour>::ConnectionHandler;
198
199    type ToSwarm = Event;
200
201    fn handle_established_inbound_connection(
202        &mut self,
203        connection_id: ConnectionId,
204        peer: PeerId,
205        local_addr: &Multiaddr,
206        remote_addr: &Multiaddr,
207    ) -> Result<THandler<Self>, ConnectionDenied> {
208        self.inner.handle_established_inbound_connection(
209            connection_id,
210            peer,
211            local_addr,
212            remote_addr,
213        )
214    }
215
216    fn handle_established_outbound_connection(
217        &mut self,
218        connection_id: ConnectionId,
219        peer: PeerId,
220        addr: &Multiaddr,
221        role_override: Endpoint,
222        port_use: PortUse,
223    ) -> Result<THandler<Self>, ConnectionDenied> {
224        self.inner.handle_established_outbound_connection(
225            connection_id,
226            peer,
227            addr,
228            role_override,
229            port_use,
230        )
231    }
232
233    fn on_connection_handler_event(
234        &mut self,
235        peer_id: PeerId,
236        connection_id: ConnectionId,
237        event: THandlerOutEvent<Self>,
238    ) {
239        self.inner
240            .on_connection_handler_event(peer_id, connection_id, event);
241    }
242
243    fn on_swarm_event(&mut self, event: FromSwarm) {
244        let changed = self.external_addresses.on_swarm_event(&event);
245
246        self.inner.on_swarm_event(event);
247
248        if changed && self.external_addresses.iter().count() > 0 {
249            let registered = self.registered_namespaces.clone();
250            for ((rz_node, ns), ttl) in registered {
251                if let Err(e) = self.register(ns, rz_node, Some(ttl)) {
252                    tracing::warn!("refreshing registration failed: {e}")
253                }
254            }
255        }
256    }
257
258    #[tracing::instrument(level = "trace", name = "NetworkBehaviour::poll", skip(self, cx))]
259    fn poll(
260        &mut self,
261        cx: &mut Context<'_>,
262    ) -> Poll<ToSwarm<Self::ToSwarm, THandlerInEvent<Self>>> {
263        use libp2p_request_response as req_res;
264        loop {
265            if let Some(event) = self.events.pop_front() {
266                return Poll::Ready(event);
267            }
268
269            match self.inner.poll(cx) {
270                Poll::Ready(ToSwarm::GenerateEvent(req_res::Event::Message {
271                    message:
272                        req_res::Message::Response {
273                            request_id,
274                            response,
275                        },
276                    ..
277                })) => {
278                    if let Some(event) = self.handle_response(&request_id, response) {
279                        return Poll::Ready(ToSwarm::GenerateEvent(event));
280                    }
281
282                    continue; // not a request we care about
283                }
284                Poll::Ready(ToSwarm::GenerateEvent(req_res::Event::OutboundFailure {
285                    request_id,
286                    ..
287                })) => {
288                    if let Some(event) = self.event_for_outbound_failure(&request_id) {
289                        return Poll::Ready(ToSwarm::GenerateEvent(event));
290                    }
291
292                    continue; // not a request we care about
293                }
294                Poll::Ready(ToSwarm::GenerateEvent(
295                    req_res::Event::InboundFailure { .. }
296                    | req_res::Event::ResponseSent { .. }
297                    | req_res::Event::Message {
298                        message: req_res::Message::Request { .. },
299                        ..
300                    },
301                )) => {
302                    unreachable!("rendezvous clients never receive requests")
303                }
304                Poll::Ready(other) => {
305                    let new_to_swarm =
306                        other.map_out(|_| unreachable!("we manually map `GenerateEvent` variants"));
307
308                    return Poll::Ready(new_to_swarm);
309                }
310                Poll::Pending => {}
311            }
312
313            if let Poll::Ready(Some(expired_registration)) =
314                self.expiring_registrations.poll_next_unpin(cx)
315            {
316                self.discovered_peers.remove(&expired_registration);
317                return Poll::Ready(ToSwarm::GenerateEvent(Event::Expired {
318                    peer: expired_registration.0,
319                }));
320            }
321
322            return Poll::Pending;
323        }
324    }
325
326    fn handle_pending_outbound_connection(
327        &mut self,
328        _connection_id: ConnectionId,
329        maybe_peer: Option<PeerId>,
330        _addresses: &[Multiaddr],
331        _effective_role: Endpoint,
332    ) -> Result<Vec<Multiaddr>, ConnectionDenied> {
333        let peer = match maybe_peer {
334            None => return Ok(vec![]),
335            Some(peer) => peer,
336        };
337
338        let addresses = self
339            .discovered_peers
340            .iter()
341            .filter_map(|((candidate, _), addresses)| (candidate == &peer).then_some(addresses))
342            .flatten()
343            .cloned()
344            .collect();
345
346        Ok(addresses)
347    }
348}
349
350impl Behaviour {
351    fn event_for_outbound_failure(&mut self, req_id: &OutboundRequestId) -> Option<Event> {
352        if let Some((rendezvous_node, namespace)) = self.waiting_for_register.remove(req_id) {
353            return Some(Event::RegisterFailed {
354                rendezvous_node,
355                namespace,
356                error: ErrorCode::Unavailable,
357            });
358        };
359
360        if let Some((rendezvous_node, namespace)) = self.waiting_for_discovery.remove(req_id) {
361            return Some(Event::DiscoverFailed {
362                rendezvous_node,
363                namespace,
364                error: ErrorCode::Unavailable,
365            });
366        };
367
368        None
369    }
370
371    fn handle_response(
372        &mut self,
373        request_id: &OutboundRequestId,
374        response: Message,
375    ) -> Option<Event> {
376        match response {
377            RegisterResponse(Ok(ttl)) => {
378                if let Some((rendezvous_node, namespace)) =
379                    self.waiting_for_register.remove(request_id)
380                {
381                    self.registered_namespaces
382                        .insert((rendezvous_node, namespace.clone()), ttl);
383
384                    return Some(Event::Registered {
385                        rendezvous_node,
386                        ttl,
387                        namespace,
388                    });
389                }
390
391                None
392            }
393            RegisterResponse(Err(error_code)) => {
394                if let Some((rendezvous_node, namespace)) =
395                    self.waiting_for_register.remove(request_id)
396                {
397                    return Some(Event::RegisterFailed {
398                        rendezvous_node,
399                        namespace,
400                        error: error_code,
401                    });
402                }
403
404                None
405            }
406            DiscoverResponse(Ok((registrations, cookie))) => {
407                if let Some((rendezvous_node, _ns)) = self.waiting_for_discovery.remove(request_id)
408                {
409                    self.events
410                        .extend(registrations.iter().flat_map(|registration| {
411                            let peer_id = registration.record.peer_id();
412                            registration
413                                .record
414                                .addresses()
415                                .iter()
416                                .filter(|addr| {
417                                    !self.discovered_peers.iter().any(
418                                        |((discovered_peer_id, _), addrs)| {
419                                            *discovered_peer_id == peer_id && addrs.contains(addr)
420                                        },
421                                    )
422                                })
423                                .map(|address| ToSwarm::NewExternalAddrOfPeer {
424                                    peer_id,
425                                    address: address.clone(),
426                                })
427                                .collect::<Vec<_>>()
428                        }));
429
430                    self.discovered_peers
431                        .extend(registrations.iter().map(|registration| {
432                            let peer_id = registration.record.peer_id();
433                            let namespace = registration.namespace.clone();
434
435                            let addresses = registration.record.addresses().to_vec();
436
437                            ((peer_id, namespace), addresses)
438                        }));
439
440                    self.expiring_registrations
441                        .extend(registrations.iter().cloned().map(|registration| {
442                            async move {
443                                // if the timer errors we consider it expired
444                                futures_timer::Delay::new(Duration::from_secs(registration.ttl))
445                                    .await;
446
447                                (registration.record.peer_id(), registration.namespace)
448                            }
449                            .boxed()
450                        }));
451
452                    return Some(Event::Discovered {
453                        rendezvous_node,
454                        registrations,
455                        cookie,
456                    });
457                }
458
459                None
460            }
461            DiscoverResponse(Err(error_code)) => {
462                if let Some((rendezvous_node, ns)) = self.waiting_for_discovery.remove(request_id) {
463                    return Some(Event::DiscoverFailed {
464                        rendezvous_node,
465                        namespace: ns,
466                        error: error_code,
467                    });
468                }
469
470                None
471            }
472            _ => unreachable!("rendezvous clients never receive requests"),
473        }
474    }
475}