1use std::{
22 collections::{HashMap, VecDeque},
23 iter,
24 task::{Context, Poll},
25 time::Duration,
26};
27
28use futures::{
29 future::{BoxFuture, FutureExt},
30 stream::{FuturesUnordered, StreamExt},
31};
32use libp2p_core::{transport::PortUse, Endpoint, Multiaddr, PeerRecord};
33use libp2p_identity::{Keypair, PeerId, SigningError};
34use libp2p_request_response::{OutboundRequestId, ProtocolSupport};
35use libp2p_swarm::{
36 ConnectionDenied, ConnectionId, ExternalAddresses, FromSwarm, NetworkBehaviour, THandler,
37 THandlerInEvent, THandlerOutEvent, ToSwarm,
38};
39
40use crate::codec::{
41 Cookie, ErrorCode, Message, Message::*, Namespace, NewRegistration, Registration, Ttl,
42};
43
/// Client-side rendezvous behaviour.
///
/// Wraps a request-response behaviour and keeps the bookkeeping needed to
/// turn its responses into rendezvous [`Event`]s.
pub struct Behaviour {
    /// Events queued for the swarm; `poll` drains this queue before doing
    /// anything else.
    events: VecDeque<ToSwarm<<Self as NetworkBehaviour>::ToSwarm, THandlerInEvent<Self>>>,

    /// The request-response behaviour that carries the rendezvous messages.
    inner: libp2p_request_response::Behaviour<crate::codec::Codec>,

    /// Key used to sign the `PeerRecord` that is sent when registering.
    keypair: Keypair,

    /// In-flight register requests: request id -> (rendezvous node, namespace).
    waiting_for_register: HashMap<OutboundRequestId, (PeerId, Namespace)>,
    /// In-flight discover requests: request id -> (rendezvous node, requested namespace).
    waiting_for_discovery: HashMap<OutboundRequestId, (PeerId, Option<Namespace>)>,

    /// Addresses of the peers discovered so far, keyed by (peer, namespace).
    ///
    /// These are handed back from `handle_pending_outbound_connection` as
    /// additional addresses when the peer is dialed.
    discovered_peers: HashMap<(PeerId, Namespace), Vec<Multiaddr>>,

    /// Registrations acknowledged by a rendezvous node, keyed by
    /// (rendezvous node, namespace), together with the TTL from the response.
    /// They are re-sent when our external addresses change.
    registered_namespaces: HashMap<(PeerId, Namespace), Ttl>,

    /// One timer per discovered registration. Each resolves to the
    /// `discovered_peers` key that has to be dropped once its TTL has elapsed.
    expiring_registrations: FuturesUnordered<BoxFuture<'static, (PeerId, Namespace)>>,

    /// Our own external addresses, kept up to date from swarm events.
    external_addresses: ExternalAddresses,
}
68
69impl Behaviour {
70 pub fn new(keypair: Keypair) -> Self {
72 Self {
73 events: Default::default(),
74 inner: libp2p_request_response::Behaviour::with_codec(
75 crate::codec::Codec::default(),
76 iter::once((crate::PROTOCOL_IDENT, ProtocolSupport::Outbound)),
77 libp2p_request_response::Config::default(),
78 ),
79 keypair,
80 waiting_for_register: Default::default(),
81 waiting_for_discovery: Default::default(),
82 discovered_peers: Default::default(),
83 registered_namespaces: Default::default(),
84 expiring_registrations: FuturesUnordered::from_iter(vec![
85 futures::future::pending().boxed()
86 ]),
87 external_addresses: Default::default(),
88 }
89 }
90
91 pub fn register(
97 &mut self,
98 namespace: Namespace,
99 rendezvous_node: PeerId,
100 ttl: Option<Ttl>,
101 ) -> Result<(), RegisterError> {
102 let external_addresses = self.external_addresses.iter().cloned().collect::<Vec<_>>();
103 if external_addresses.is_empty() {
104 return Err(RegisterError::NoExternalAddresses);
105 }
106
107 let peer_record = PeerRecord::new(&self.keypair, external_addresses)?;
108 let req_id = self.inner.send_request(
109 &rendezvous_node,
110 Register(NewRegistration::new(namespace.clone(), peer_record, ttl)),
111 );
112 self.waiting_for_register
113 .insert(req_id, (rendezvous_node, namespace));
114
115 Ok(())
116 }
117
118 pub fn unregister(&mut self, namespace: Namespace, rendezvous_node: PeerId) {
120 self.registered_namespaces
121 .retain(|(rz_node, ns), _| rz_node.ne(&rendezvous_node) && ns.ne(&namespace));
122
123 self.inner
124 .send_request(&rendezvous_node, Unregister(namespace));
125 }
126
127 pub fn discover(
135 &mut self,
136 namespace: Option<Namespace>,
137 cookie: Option<Cookie>,
138 limit: Option<u64>,
139 rendezvous_node: PeerId,
140 ) {
141 let req_id = self.inner.send_request(
142 &rendezvous_node,
143 Discover {
144 namespace: namespace.clone(),
145 cookie,
146 limit,
147 },
148 );
149
150 self.waiting_for_discovery
151 .insert(req_id, (rendezvous_node, namespace));
152 }
153}
154
/// Reasons why `Behaviour::register` can fail before a request is sent.
#[derive(Debug, thiserror::Error)]
pub enum RegisterError {
    /// There is no known external address to put into the peer record.
    #[error("We don't know about any externally reachable addresses of ours")]
    NoExternalAddresses,
    /// Building the signed peer record failed.
    #[error("Failed to make a new PeerRecord")]
    FailedToMakeRecord(#[from] SigningError),
}
162
/// Events emitted by the rendezvous client behaviour.
#[derive(Debug)]
#[allow(clippy::large_enum_variant)]
pub enum Event {
    /// A discover request was answered successfully by `rendezvous_node`.
    Discovered {
        rendezvous_node: PeerId,
        registrations: Vec<Registration>,
        cookie: Cookie,
    },
    /// A discover request failed, either with an error response or because
    /// the outbound request itself failed.
    DiscoverFailed {
        rendezvous_node: PeerId,
        namespace: Option<Namespace>,
        error: ErrorCode,
    },
    /// A register request was accepted by `rendezvous_node`; `ttl` is the
    /// value carried in its response.
    Registered {
        rendezvous_node: PeerId,
        ttl: Ttl,
        namespace: Namespace,
    },
    /// A register request failed, either with an error response or because
    /// the outbound request itself failed.
    RegisterFailed {
        rendezvous_node: PeerId,
        namespace: Namespace,
        error: ErrorCode,
    },
    /// The TTL timer of a previously discovered registration of `peer` elapsed.
    Expired { peer: PeerId },
}
193
194impl NetworkBehaviour for Behaviour {
195 type ConnectionHandler = <libp2p_request_response::Behaviour<
196 crate::codec::Codec,
197 > as NetworkBehaviour>::ConnectionHandler;
198
199 type ToSwarm = Event;
200
201 fn handle_established_inbound_connection(
202 &mut self,
203 connection_id: ConnectionId,
204 peer: PeerId,
205 local_addr: &Multiaddr,
206 remote_addr: &Multiaddr,
207 ) -> Result<THandler<Self>, ConnectionDenied> {
208 self.inner.handle_established_inbound_connection(
209 connection_id,
210 peer,
211 local_addr,
212 remote_addr,
213 )
214 }
215
216 fn handle_established_outbound_connection(
217 &mut self,
218 connection_id: ConnectionId,
219 peer: PeerId,
220 addr: &Multiaddr,
221 role_override: Endpoint,
222 port_use: PortUse,
223 ) -> Result<THandler<Self>, ConnectionDenied> {
224 self.inner.handle_established_outbound_connection(
225 connection_id,
226 peer,
227 addr,
228 role_override,
229 port_use,
230 )
231 }
232
233 fn on_connection_handler_event(
234 &mut self,
235 peer_id: PeerId,
236 connection_id: ConnectionId,
237 event: THandlerOutEvent<Self>,
238 ) {
239 self.inner
240 .on_connection_handler_event(peer_id, connection_id, event);
241 }
242
243 fn on_swarm_event(&mut self, event: FromSwarm) {
244 let changed = self.external_addresses.on_swarm_event(&event);
245
246 self.inner.on_swarm_event(event);
247
248 if changed && self.external_addresses.iter().count() > 0 {
249 let registered = self.registered_namespaces.clone();
250 for ((rz_node, ns), ttl) in registered {
251 if let Err(e) = self.register(ns, rz_node, Some(ttl)) {
252 tracing::warn!("refreshing registration failed: {e}")
253 }
254 }
255 }
256 }
257
258 #[tracing::instrument(level = "trace", name = "NetworkBehaviour::poll", skip(self, cx))]
259 fn poll(
260 &mut self,
261 cx: &mut Context<'_>,
262 ) -> Poll<ToSwarm<Self::ToSwarm, THandlerInEvent<Self>>> {
263 use libp2p_request_response as req_res;
264 loop {
265 if let Some(event) = self.events.pop_front() {
266 return Poll::Ready(event);
267 }
268
269 match self.inner.poll(cx) {
270 Poll::Ready(ToSwarm::GenerateEvent(req_res::Event::Message {
271 message:
272 req_res::Message::Response {
273 request_id,
274 response,
275 },
276 ..
277 })) => {
278 if let Some(event) = self.handle_response(&request_id, response) {
279 return Poll::Ready(ToSwarm::GenerateEvent(event));
280 }
281
282 continue; }
284 Poll::Ready(ToSwarm::GenerateEvent(req_res::Event::OutboundFailure {
285 request_id,
286 ..
287 })) => {
288 if let Some(event) = self.event_for_outbound_failure(&request_id) {
289 return Poll::Ready(ToSwarm::GenerateEvent(event));
290 }
291
292 continue; }
294 Poll::Ready(ToSwarm::GenerateEvent(
295 req_res::Event::InboundFailure { .. }
296 | req_res::Event::ResponseSent { .. }
297 | req_res::Event::Message {
298 message: req_res::Message::Request { .. },
299 ..
300 },
301 )) => {
302 unreachable!("rendezvous clients never receive requests")
303 }
304 Poll::Ready(other) => {
305 let new_to_swarm =
306 other.map_out(|_| unreachable!("we manually map `GenerateEvent` variants"));
307
308 return Poll::Ready(new_to_swarm);
309 }
310 Poll::Pending => {}
311 }
312
313 if let Poll::Ready(Some(expired_registration)) =
314 self.expiring_registrations.poll_next_unpin(cx)
315 {
316 self.discovered_peers.remove(&expired_registration);
317 return Poll::Ready(ToSwarm::GenerateEvent(Event::Expired {
318 peer: expired_registration.0,
319 }));
320 }
321
322 return Poll::Pending;
323 }
324 }
325
326 fn handle_pending_outbound_connection(
327 &mut self,
328 _connection_id: ConnectionId,
329 maybe_peer: Option<PeerId>,
330 _addresses: &[Multiaddr],
331 _effective_role: Endpoint,
332 ) -> Result<Vec<Multiaddr>, ConnectionDenied> {
333 let peer = match maybe_peer {
334 None => return Ok(vec![]),
335 Some(peer) => peer,
336 };
337
338 let addresses = self
339 .discovered_peers
340 .iter()
341 .filter_map(|((candidate, _), addresses)| (candidate == &peer).then_some(addresses))
342 .flatten()
343 .cloned()
344 .collect();
345
346 Ok(addresses)
347 }
348}
349
350impl Behaviour {
351 fn event_for_outbound_failure(&mut self, req_id: &OutboundRequestId) -> Option<Event> {
352 if let Some((rendezvous_node, namespace)) = self.waiting_for_register.remove(req_id) {
353 return Some(Event::RegisterFailed {
354 rendezvous_node,
355 namespace,
356 error: ErrorCode::Unavailable,
357 });
358 };
359
360 if let Some((rendezvous_node, namespace)) = self.waiting_for_discovery.remove(req_id) {
361 return Some(Event::DiscoverFailed {
362 rendezvous_node,
363 namespace,
364 error: ErrorCode::Unavailable,
365 });
366 };
367
368 None
369 }
370
371 fn handle_response(
372 &mut self,
373 request_id: &OutboundRequestId,
374 response: Message,
375 ) -> Option<Event> {
376 match response {
377 RegisterResponse(Ok(ttl)) => {
378 if let Some((rendezvous_node, namespace)) =
379 self.waiting_for_register.remove(request_id)
380 {
381 self.registered_namespaces
382 .insert((rendezvous_node, namespace.clone()), ttl);
383
384 return Some(Event::Registered {
385 rendezvous_node,
386 ttl,
387 namespace,
388 });
389 }
390
391 None
392 }
393 RegisterResponse(Err(error_code)) => {
394 if let Some((rendezvous_node, namespace)) =
395 self.waiting_for_register.remove(request_id)
396 {
397 return Some(Event::RegisterFailed {
398 rendezvous_node,
399 namespace,
400 error: error_code,
401 });
402 }
403
404 None
405 }
406 DiscoverResponse(Ok((registrations, cookie))) => {
407 if let Some((rendezvous_node, _ns)) = self.waiting_for_discovery.remove(request_id)
408 {
409 self.events
410 .extend(registrations.iter().flat_map(|registration| {
411 let peer_id = registration.record.peer_id();
412 registration
413 .record
414 .addresses()
415 .iter()
416 .filter(|addr| {
417 !self.discovered_peers.iter().any(
418 |((discovered_peer_id, _), addrs)| {
419 *discovered_peer_id == peer_id && addrs.contains(addr)
420 },
421 )
422 })
423 .map(|address| ToSwarm::NewExternalAddrOfPeer {
424 peer_id,
425 address: address.clone(),
426 })
427 .collect::<Vec<_>>()
428 }));
429
430 self.discovered_peers
431 .extend(registrations.iter().map(|registration| {
432 let peer_id = registration.record.peer_id();
433 let namespace = registration.namespace.clone();
434
435 let addresses = registration.record.addresses().to_vec();
436
437 ((peer_id, namespace), addresses)
438 }));
439
440 self.expiring_registrations
441 .extend(registrations.iter().cloned().map(|registration| {
442 async move {
443 futures_timer::Delay::new(Duration::from_secs(registration.ttl))
445 .await;
446
447 (registration.record.peer_id(), registration.namespace)
448 }
449 .boxed()
450 }));
451
452 return Some(Event::Discovered {
453 rendezvous_node,
454 registrations,
455 cookie,
456 });
457 }
458
459 None
460 }
461 DiscoverResponse(Err(error_code)) => {
462 if let Some((rendezvous_node, ns)) = self.waiting_for_discovery.remove(request_id) {
463 return Some(Event::DiscoverFailed {
464 rendezvous_node,
465 namespace: ns,
466 error: error_code,
467 });
468 }
469
470 None
471 }
472 _ => unreachable!("rendezvous clients never receive requests"),
473 }
474 }
475}