1use std::{
22 collections::{
23 hash_map::{DefaultHasher, HashMap},
24 VecDeque,
25 },
26 iter,
27 task::{Context, Poll},
28};
29
30use bytes::Bytes;
31use cuckoofilter::{CuckooError, CuckooFilter};
32use fnv::FnvHashSet;
33use libp2p_core::{transport::PortUse, Endpoint, Multiaddr};
34use libp2p_identity::PeerId;
35use libp2p_swarm::{
36 behaviour::{ConnectionClosed, ConnectionEstablished, FromSwarm},
37 dial_opts::DialOpts,
38 CloseConnection, ConnectionDenied, ConnectionId, NetworkBehaviour, NotifyHandler,
39 OneShotHandler, THandler, THandlerInEvent, THandlerOutEvent, ToSwarm,
40};
41use smallvec::SmallVec;
42
43use crate::{
44 protocol::{
45 FloodsubMessage, FloodsubProtocol, FloodsubRpc, FloodsubSubscription,
46 FloodsubSubscriptionAction,
47 },
48 topic::Topic,
49 Config,
50};
51
52#[deprecated = "Use `Behaviour` instead."]
53pub type Floodsub = Behaviour;
54
55pub struct Behaviour {
57 events: VecDeque<ToSwarm<Event, FloodsubRpc>>,
59
60 config: Config,
61
62 target_peers: FnvHashSet<PeerId>,
64
65 connected_peers: HashMap<PeerId, SmallVec<[Topic; 8]>>,
69
70 subscribed_topics: SmallVec<[Topic; 16]>,
73
74 received: CuckooFilter<DefaultHasher>,
77}
78
79impl Behaviour {
80 pub fn new(local_peer_id: PeerId) -> Self {
82 Self::from_config(Config::new(local_peer_id))
83 }
84
85 pub fn from_config(config: Config) -> Self {
87 Behaviour {
88 events: VecDeque::new(),
89 config,
90 target_peers: FnvHashSet::default(),
91 connected_peers: HashMap::new(),
92 subscribed_topics: SmallVec::new(),
93 received: CuckooFilter::new(),
94 }
95 }
96
97 #[inline]
99 pub fn add_node_to_partial_view(&mut self, peer_id: PeerId) {
100 if self.connected_peers.contains_key(&peer_id) {
102 for topic in self.subscribed_topics.iter().cloned() {
103 self.events.push_back(ToSwarm::NotifyHandler {
104 peer_id,
105 handler: NotifyHandler::Any,
106 event: FloodsubRpc {
107 messages: Vec::new(),
108 subscriptions: vec![FloodsubSubscription {
109 topic,
110 action: FloodsubSubscriptionAction::Subscribe,
111 }],
112 },
113 });
114 }
115 }
116
117 if self.target_peers.insert(peer_id) {
118 self.events.push_back(ToSwarm::Dial {
119 opts: DialOpts::peer_id(peer_id).build(),
120 });
121 }
122 }
123
124 #[inline]
126 pub fn remove_node_from_partial_view(&mut self, peer_id: &PeerId) {
127 self.target_peers.remove(peer_id);
128 }
129
130 pub fn subscribe(&mut self, topic: Topic) -> bool {
134 if self.subscribed_topics.iter().any(|t| t.id() == topic.id()) {
135 return false;
136 }
137
138 for peer in self.connected_peers.keys() {
139 self.events.push_back(ToSwarm::NotifyHandler {
140 peer_id: *peer,
141 handler: NotifyHandler::Any,
142 event: FloodsubRpc {
143 messages: Vec::new(),
144 subscriptions: vec![FloodsubSubscription {
145 topic: topic.clone(),
146 action: FloodsubSubscriptionAction::Subscribe,
147 }],
148 },
149 });
150 }
151
152 self.subscribed_topics.push(topic);
153 true
154 }
155
156 pub fn unsubscribe(&mut self, topic: Topic) -> bool {
162 let Some(pos) = self.subscribed_topics.iter().position(|t| *t == topic) else {
163 return false;
164 };
165
166 self.subscribed_topics.remove(pos);
167
168 for peer in self.connected_peers.keys() {
169 self.events.push_back(ToSwarm::NotifyHandler {
170 peer_id: *peer,
171 handler: NotifyHandler::Any,
172 event: FloodsubRpc {
173 messages: Vec::new(),
174 subscriptions: vec![FloodsubSubscription {
175 topic: topic.clone(),
176 action: FloodsubSubscriptionAction::Unsubscribe,
177 }],
178 },
179 });
180 }
181
182 true
183 }
184
185 pub fn publish(&mut self, topic: impl Into<Topic>, data: impl Into<Bytes>) {
187 self.publish_many(iter::once(topic), data)
188 }
189
190 pub fn publish_any(&mut self, topic: impl Into<Topic>, data: impl Into<Bytes>) {
192 self.publish_many_any(iter::once(topic), data)
193 }
194
195 pub fn publish_many(
200 &mut self,
201 topic: impl IntoIterator<Item = impl Into<Topic>>,
202 data: impl Into<Bytes>,
203 ) {
204 self.publish_many_inner(topic, data, true)
205 }
206
207 pub fn publish_many_any(
210 &mut self,
211 topic: impl IntoIterator<Item = impl Into<Topic>>,
212 data: impl Into<Bytes>,
213 ) {
214 self.publish_many_inner(topic, data, false)
215 }
216
217 fn publish_many_inner(
218 &mut self,
219 topic: impl IntoIterator<Item = impl Into<Topic>>,
220 data: impl Into<Bytes>,
221 check_self_subscriptions: bool,
222 ) {
223 let message = FloodsubMessage {
224 source: self.config.local_peer_id,
225 data: data.into(),
226 sequence_number: rand::random::<[u8; 20]>().to_vec(),
230 topics: topic.into_iter().map(Into::into).collect(),
231 };
232
233 let self_subscribed = self
234 .subscribed_topics
235 .iter()
236 .any(|t| message.topics.iter().any(|u| t == u));
237 if self_subscribed {
238 if let Err(e @ CuckooError::NotEnoughSpace) = self.received.add(&message) {
239 tracing::warn!(
240 "Message was added to 'received' Cuckoofilter but some \
241 other message was removed as a consequence: {}",
242 e,
243 );
244 }
245 if self.config.subscribe_local_messages {
246 self.events
247 .push_back(ToSwarm::GenerateEvent(Event::Message(message.clone())));
248 }
249 }
250 if check_self_subscriptions && !self_subscribed {
253 return;
254 }
255
256 for (peer_id, sub_topic) in self.connected_peers.iter() {
258 if !self.target_peers.contains(peer_id) {
260 continue;
261 }
262
263 if !sub_topic
265 .iter()
266 .any(|t| message.topics.iter().any(|u| t == u))
267 {
268 continue;
269 }
270
271 self.events.push_back(ToSwarm::NotifyHandler {
272 peer_id: *peer_id,
273 handler: NotifyHandler::Any,
274 event: FloodsubRpc {
275 subscriptions: Vec::new(),
276 messages: vec![message.clone()],
277 },
278 });
279 }
280 }
281
282 fn on_connection_established(
283 &mut self,
284 ConnectionEstablished {
285 peer_id,
286 other_established,
287 ..
288 }: ConnectionEstablished,
289 ) {
290 if other_established > 0 {
291 return;
293 }
294
295 if self.target_peers.contains(&peer_id) {
297 for topic in self.subscribed_topics.iter().cloned() {
298 self.events.push_back(ToSwarm::NotifyHandler {
299 peer_id,
300 handler: NotifyHandler::Any,
301 event: FloodsubRpc {
302 messages: Vec::new(),
303 subscriptions: vec![FloodsubSubscription {
304 topic,
305 action: FloodsubSubscriptionAction::Subscribe,
306 }],
307 },
308 });
309 }
310 }
311
312 self.connected_peers.insert(peer_id, SmallVec::new());
313 }
314
315 fn on_connection_closed(
316 &mut self,
317 ConnectionClosed {
318 peer_id,
319 remaining_established,
320 ..
321 }: ConnectionClosed,
322 ) {
323 if remaining_established > 0 {
324 return;
326 }
327
328 let was_in = self.connected_peers.remove(&peer_id);
329 debug_assert!(was_in.is_some());
330
331 if self.target_peers.contains(&peer_id) {
334 self.events.push_back(ToSwarm::Dial {
335 opts: DialOpts::peer_id(peer_id).build(),
336 });
337 }
338 }
339}
340
341impl NetworkBehaviour for Behaviour {
342 type ConnectionHandler = OneShotHandler<FloodsubProtocol, FloodsubRpc, InnerMessage>;
343 type ToSwarm = Event;
344
345 fn handle_established_inbound_connection(
346 &mut self,
347 _: ConnectionId,
348 _: PeerId,
349 _: &Multiaddr,
350 _: &Multiaddr,
351 ) -> Result<THandler<Self>, ConnectionDenied> {
352 Ok(Default::default())
353 }
354
355 fn handle_established_outbound_connection(
356 &mut self,
357 _: ConnectionId,
358 _: PeerId,
359 _: &Multiaddr,
360 _: Endpoint,
361 _: PortUse,
362 ) -> Result<THandler<Self>, ConnectionDenied> {
363 Ok(Default::default())
364 }
365
366 fn on_connection_handler_event(
367 &mut self,
368 propagation_source: PeerId,
369 connection_id: ConnectionId,
370 event: THandlerOutEvent<Self>,
371 ) {
372 let event = match event {
374 Ok(InnerMessage::Rx(event)) => event,
375 Ok(InnerMessage::Sent) => return,
376 Err(e) => {
377 tracing::debug!("Failed to send floodsub message: {e}");
378 self.events.push_back(ToSwarm::CloseConnection {
379 peer_id: propagation_source,
380 connection: CloseConnection::One(connection_id),
381 });
382 return;
383 }
384 };
385
386 for subscription in event.subscriptions {
388 let remote_peer_topics = self.connected_peers
389 .get_mut(&propagation_source)
390 .expect("connected_peers is kept in sync with the peers we are connected to; we are guaranteed to only receive events from connected peers; QED");
391 match subscription.action {
392 FloodsubSubscriptionAction::Subscribe => {
393 if !remote_peer_topics.contains(&subscription.topic) {
394 remote_peer_topics.push(subscription.topic.clone());
395 }
396 self.events
397 .push_back(ToSwarm::GenerateEvent(Event::Subscribed {
398 peer_id: propagation_source,
399 topic: subscription.topic,
400 }));
401 }
402 FloodsubSubscriptionAction::Unsubscribe => {
403 if let Some(pos) = remote_peer_topics
404 .iter()
405 .position(|t| t == &subscription.topic)
406 {
407 remote_peer_topics.remove(pos);
408 }
409 self.events
410 .push_back(ToSwarm::GenerateEvent(Event::Unsubscribed {
411 peer_id: propagation_source,
412 topic: subscription.topic,
413 }));
414 }
415 }
416 }
417
418 let mut rpcs_to_dispatch: Vec<(PeerId, FloodsubRpc)> = Vec::new();
420
421 for message in event.messages {
422 match self.received.test_and_add(&message) {
425 Ok(true) => {} Ok(false) => continue, Err(e @ CuckooError::NotEnoughSpace) => {
428 tracing::warn!(
430 "Message was added to 'received' Cuckoofilter but some \
431 other message was removed as a consequence: {}",
432 e,
433 );
434 }
435 }
436
437 if self
439 .subscribed_topics
440 .iter()
441 .any(|t| message.topics.iter().any(|u| t == u))
442 {
443 let event = Event::Message(message.clone());
444 self.events.push_back(ToSwarm::GenerateEvent(event));
445 }
446
447 for (peer_id, subscr_topics) in self.connected_peers.iter() {
449 if peer_id == &propagation_source {
450 continue;
451 }
452
453 if !self.target_peers.contains(peer_id) {
455 continue;
456 }
457
458 if !subscr_topics
460 .iter()
461 .any(|t| message.topics.iter().any(|u| t == u))
462 {
463 continue;
464 }
465
466 if let Some(pos) = rpcs_to_dispatch.iter().position(|(p, _)| p == peer_id) {
467 rpcs_to_dispatch[pos].1.messages.push(message.clone());
468 } else {
469 rpcs_to_dispatch.push((
470 *peer_id,
471 FloodsubRpc {
472 subscriptions: Vec::new(),
473 messages: vec![message.clone()],
474 },
475 ));
476 }
477 }
478 }
479
480 for (peer_id, rpc) in rpcs_to_dispatch {
481 self.events.push_back(ToSwarm::NotifyHandler {
482 peer_id,
483 handler: NotifyHandler::Any,
484 event: rpc,
485 });
486 }
487 }
488
489 #[tracing::instrument(level = "trace", name = "NetworkBehaviour::poll", skip(self))]
490 fn poll(&mut self, _: &mut Context<'_>) -> Poll<ToSwarm<Self::ToSwarm, THandlerInEvent<Self>>> {
491 if let Some(event) = self.events.pop_front() {
492 return Poll::Ready(event);
493 }
494
495 Poll::Pending
496 }
497
498 fn on_swarm_event(&mut self, event: FromSwarm) {
499 match event {
500 FromSwarm::ConnectionEstablished(connection_established) => {
501 self.on_connection_established(connection_established)
502 }
503 FromSwarm::ConnectionClosed(connection_closed) => {
504 self.on_connection_closed(connection_closed)
505 }
506 _ => {}
507 }
508 }
509}
510
511#[derive(Debug)]
513pub enum InnerMessage {
514 Rx(FloodsubRpc),
516 Sent,
518}
519
520impl From<FloodsubRpc> for InnerMessage {
521 #[inline]
522 fn from(rpc: FloodsubRpc) -> InnerMessage {
523 InnerMessage::Rx(rpc)
524 }
525}
526
527impl From<()> for InnerMessage {
528 #[inline]
529 fn from(_: ()) -> InnerMessage {
530 InnerMessage::Sent
531 }
532}
533
534#[deprecated = "Use `Event` instead."]
535pub type FloodsubEvent = Event;
536
537#[derive(Debug)]
539pub enum Event {
540 Message(FloodsubMessage),
542
543 Subscribed {
545 peer_id: PeerId,
547 topic: Topic,
549 },
550
551 Unsubscribed {
553 peer_id: PeerId,
555 topic: Topic,
557 },
558}