1#![doc = include_str!("../README.md")]
22
23use std::{
24 collections::hash_map::DefaultHasher,
25 error::Error,
26 hash::{Hash, Hasher},
27 time::Duration,
28};
29
30use futures::stream::StreamExt;
31use libp2p::{
32 gossipsub, mdns, noise,
33 swarm::{NetworkBehaviour, SwarmEvent},
34 tcp, yamux,
35};
36use tokio::{io, io::AsyncBufReadExt, select};
37use tracing_subscriber::EnvFilter;
38
39#[derive(NetworkBehaviour)]
41struct MyBehaviour {
42 gossipsub: gossipsub::Behaviour,
43 mdns: mdns::tokio::Behaviour,
44}
45
46#[tokio::main]
47async fn main() -> Result<(), Box<dyn Error>> {
48 let _ = tracing_subscriber::fmt()
49 .with_env_filter(EnvFilter::from_default_env())
50 .try_init();
51
52 let mut swarm = libp2p::SwarmBuilder::with_new_identity()
53 .with_tokio()
54 .with_tcp(
55 tcp::Config::default(),
56 noise::Config::new,
57 yamux::Config::default,
58 )?
59 .with_quic()
60 .with_behaviour(|key| {
61 let message_id_fn = |message: &gossipsub::Message| {
63 let mut s = DefaultHasher::new();
64 message.data.hash(&mut s);
65 gossipsub::MessageId::from(s.finish().to_string())
66 };
67
68 let gossipsub_config = gossipsub::ConfigBuilder::default()
70 .heartbeat_interval(Duration::from_secs(10)) .validation_mode(gossipsub::ValidationMode::Strict) .message_id_fn(message_id_fn) .build()
75 .map_err(|msg| io::Error::new(io::ErrorKind::Other, msg))?; let gossipsub = gossipsub::Behaviour::new(
79 gossipsub::MessageAuthenticity::Signed(key.clone()),
80 gossipsub_config,
81 )?;
82
83 let mdns =
84 mdns::tokio::Behaviour::new(mdns::Config::default(), key.public().to_peer_id())?;
85 Ok(MyBehaviour { gossipsub, mdns })
86 })?
87 .build();
88
89 let topic = gossipsub::IdentTopic::new("test-net");
91 swarm.behaviour_mut().gossipsub.subscribe(&topic)?;
93
94 let mut stdin = io::BufReader::new(io::stdin()).lines();
96
97 swarm.listen_on("/ip4/0.0.0.0/udp/0/quic-v1".parse()?)?;
99 swarm.listen_on("/ip4/0.0.0.0/tcp/0".parse()?)?;
100
101 println!("Enter messages via STDIN and they will be sent to connected peers using Gossipsub");
102
103 loop {
105 select! {
106 Ok(Some(line)) = stdin.next_line() => {
107 if let Err(e) = swarm
108 .behaviour_mut().gossipsub
109 .publish(topic.clone(), line.as_bytes()) {
110 println!("Publish error: {e:?}");
111 }
112 }
113 event = swarm.select_next_some() => match event {
114 SwarmEvent::Behaviour(MyBehaviourEvent::Mdns(mdns::Event::Discovered(list))) => {
115 for (peer_id, _multiaddr) in list {
116 println!("mDNS discovered a new peer: {peer_id}");
117 swarm.behaviour_mut().gossipsub.add_explicit_peer(&peer_id);
118 }
119 },
120 SwarmEvent::Behaviour(MyBehaviourEvent::Mdns(mdns::Event::Expired(list))) => {
121 for (peer_id, _multiaddr) in list {
122 println!("mDNS discover peer has expired: {peer_id}");
123 swarm.behaviour_mut().gossipsub.remove_explicit_peer(&peer_id);
124 }
125 },
126 SwarmEvent::Behaviour(MyBehaviourEvent::Gossipsub(gossipsub::Event::Message {
127 propagation_source: peer_id,
128 message_id: id,
129 message,
130 })) => println!(
131 "Got message: '{}' with id: {id} from peer: {peer_id}",
132 String::from_utf8_lossy(&message.data),
133 ),
134 SwarmEvent::NewListenAddr { address, .. } => {
135 println!("Local node is listening on {address}");
136 }
137 _ => {}
138 }
139 }
140 }
141}