rzv_discover/
rzv-discover.rs

1// Copyright 2021 COMIT Network.
2//
3// Permission is hereby granted, free of charge, to any person obtaining a
4// copy of this software and associated documentation files (the "Software"),
5// to deal in the Software without restriction, including without limitation
6// the rights to use, copy, modify, merge, publish, distribute, sublicense,
7// and/or sell copies of the Software, and to permit persons to whom the
8// Software is furnished to do so, subject to the following conditions:
9//
10// The above copyright notice and this permission notice shall be included in
11// all copies or substantial portions of the Software.
12//
13// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
14// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
18// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
19// DEALINGS IN THE SOFTWARE.
20
21use std::{error::Error, time::Duration};
22
23use futures::StreamExt;
24use libp2p::{
25    multiaddr::Protocol,
26    noise, ping, rendezvous,
27    swarm::{NetworkBehaviour, SwarmEvent},
28    tcp, yamux, Multiaddr,
29};
30use tracing_subscriber::EnvFilter;
31
32const NAMESPACE: &str = "rendezvous";
33
34#[tokio::main]
35async fn main() -> Result<(), Box<dyn Error>> {
36    let _ = tracing_subscriber::fmt()
37        .with_env_filter(EnvFilter::from_default_env())
38        .try_init();
39
40    let rendezvous_point_address = "/ip4/127.0.0.1/tcp/62649".parse::<Multiaddr>().unwrap();
41    let rendezvous_point = "12D3KooWDpJ7As7BWAwRMfu1VU2WCqNjvq387JEYKDBj4kx6nXTN"
42        .parse()
43        .unwrap();
44
45    let mut swarm = libp2p::SwarmBuilder::with_new_identity()
46        .with_tokio()
47        .with_tcp(
48            tcp::Config::default(),
49            noise::Config::new,
50            yamux::Config::default,
51        )?
52        .with_behaviour(|key| MyBehaviour {
53            rendezvous: rendezvous::client::Behaviour::new(key.clone()),
54            ping: ping::Behaviour::new(ping::Config::new().with_interval(Duration::from_secs(1))),
55        })?
56        .build();
57
58    swarm.dial(rendezvous_point_address.clone()).unwrap();
59
60    let mut discover_tick = tokio::time::interval(Duration::from_secs(30));
61    let mut cookie = None;
62
63    loop {
64        tokio::select! {
65                event = swarm.select_next_some() => match event {
66                    SwarmEvent::ConnectionEstablished { peer_id, .. } if peer_id == rendezvous_point => {
67                        tracing::info!(
68                            "Connected to rendezvous point, discovering nodes in '{}' namespace ...",
69                            NAMESPACE
70                        );
71
72                        swarm.behaviour_mut().rendezvous.discover(
73                            Some(rendezvous::Namespace::new(NAMESPACE.to_string()).unwrap()),
74                            None,
75                            None,
76                            rendezvous_point,
77                        );
78                    }
79                    SwarmEvent::Behaviour(MyBehaviourEvent::Rendezvous(rendezvous::client::Event::Discovered {
80                        registrations,
81                        cookie: new_cookie,
82                        ..
83                    })) => {
84                        cookie.replace(new_cookie);
85
86                        for registration in registrations {
87                            for address in registration.record.addresses() {
88                                let peer = registration.record.peer_id();
89                                tracing::info!(%peer, %address, "Discovered peer");
90
91                                let p2p_suffix = Protocol::P2p(peer);
92                                let address_with_p2p =
93                                    if !address.ends_with(&Multiaddr::empty().with(p2p_suffix.clone())) {
94                                        address.clone().with(p2p_suffix)
95                                    } else {
96                                        address.clone()
97                                    };
98
99                                swarm.dial(address_with_p2p).unwrap();
100                            }
101                        }
102                    }
103                    SwarmEvent::Behaviour(MyBehaviourEvent::Ping(ping::Event {
104                        peer,
105                        result: Ok(rtt),
106                        ..
107                    })) if peer != rendezvous_point => {
108                        tracing::info!(%peer, "Ping is {}ms", rtt.as_millis())
109                    }
110                    other => {
111                        tracing::debug!("Unhandled {:?}", other);
112                    }
113            },
114            _ = discover_tick.tick(), if cookie.is_some() =>
115                swarm.behaviour_mut().rendezvous.discover(
116                    Some(rendezvous::Namespace::new(NAMESPACE.to_string()).unwrap()),
117                    cookie.clone(),
118                    None,
119                    rendezvous_point
120                    )
121        }
122    }
123}
124
125#[derive(NetworkBehaviour)]
126struct MyBehaviour {
127    rendezvous: rendezvous::client::Behaviour,
128    ping: ping::Behaviour,
129}