rzv_discover/
rzv-discover.rs1use std::{error::Error, time::Duration};
22
23use futures::StreamExt;
24use libp2p::{
25 multiaddr::Protocol,
26 noise, ping, rendezvous,
27 swarm::{NetworkBehaviour, SwarmEvent},
28 tcp, yamux, Multiaddr,
29};
30use tracing_subscriber::EnvFilter;
31
32const NAMESPACE: &str = "rendezvous";
33
34#[tokio::main]
35async fn main() -> Result<(), Box<dyn Error>> {
36 let _ = tracing_subscriber::fmt()
37 .with_env_filter(EnvFilter::from_default_env())
38 .try_init();
39
40 let rendezvous_point_address = "/ip4/127.0.0.1/tcp/62649".parse::<Multiaddr>().unwrap();
41 let rendezvous_point = "12D3KooWDpJ7As7BWAwRMfu1VU2WCqNjvq387JEYKDBj4kx6nXTN"
42 .parse()
43 .unwrap();
44
45 let mut swarm = libp2p::SwarmBuilder::with_new_identity()
46 .with_tokio()
47 .with_tcp(
48 tcp::Config::default(),
49 noise::Config::new,
50 yamux::Config::default,
51 )?
52 .with_behaviour(|key| MyBehaviour {
53 rendezvous: rendezvous::client::Behaviour::new(key.clone()),
54 ping: ping::Behaviour::new(ping::Config::new().with_interval(Duration::from_secs(1))),
55 })?
56 .build();
57
58 swarm.dial(rendezvous_point_address.clone()).unwrap();
59
60 let mut discover_tick = tokio::time::interval(Duration::from_secs(30));
61 let mut cookie = None;
62
63 loop {
64 tokio::select! {
65 event = swarm.select_next_some() => match event {
66 SwarmEvent::ConnectionEstablished { peer_id, .. } if peer_id == rendezvous_point => {
67 tracing::info!(
68 "Connected to rendezvous point, discovering nodes in '{}' namespace ...",
69 NAMESPACE
70 );
71
72 swarm.behaviour_mut().rendezvous.discover(
73 Some(rendezvous::Namespace::new(NAMESPACE.to_string()).unwrap()),
74 None,
75 None,
76 rendezvous_point,
77 );
78 }
79 SwarmEvent::Behaviour(MyBehaviourEvent::Rendezvous(rendezvous::client::Event::Discovered {
80 registrations,
81 cookie: new_cookie,
82 ..
83 })) => {
84 cookie.replace(new_cookie);
85
86 for registration in registrations {
87 for address in registration.record.addresses() {
88 let peer = registration.record.peer_id();
89 tracing::info!(%peer, %address, "Discovered peer");
90
91 let p2p_suffix = Protocol::P2p(peer);
92 let address_with_p2p =
93 if !address.ends_with(&Multiaddr::empty().with(p2p_suffix.clone())) {
94 address.clone().with(p2p_suffix)
95 } else {
96 address.clone()
97 };
98
99 swarm.dial(address_with_p2p).unwrap();
100 }
101 }
102 }
103 SwarmEvent::Behaviour(MyBehaviourEvent::Ping(ping::Event {
104 peer,
105 result: Ok(rtt),
106 ..
107 })) if peer != rendezvous_point => {
108 tracing::info!(%peer, "Ping is {}ms", rtt.as_millis())
109 }
110 other => {
111 tracing::debug!("Unhandled {:?}", other);
112 }
113 },
114 _ = discover_tick.tick(), if cookie.is_some() =>
115 swarm.behaviour_mut().rendezvous.discover(
116 Some(rendezvous::Namespace::new(NAMESPACE.to_string()).unwrap()),
117 cookie.clone(),
118 None,
119 rendezvous_point
120 )
121 }
122 }
123}
124
125#[derive(NetworkBehaviour)]
126struct MyBehaviour {
127 rendezvous: rendezvous::client::Behaviour,
128 ping: ping::Behaviour,
129}