// libp2p_stream/behaviour.rs
use core::fmt;
use std::{
    sync::{Arc, Mutex},
    task::{Context, Poll},
};

use futures::{channel::mpsc, StreamExt};
use libp2p_core::{transport::PortUse, Endpoint, Multiaddr};
use libp2p_identity::PeerId;
use libp2p_swarm::{
    self as swarm, dial_opts::DialOpts, ConnectionDenied, ConnectionId, FromSwarm,
    NetworkBehaviour, THandler, THandlerInEvent, THandlerOutEvent, ToSwarm,
};
use swarm::{
    behaviour::ConnectionEstablished, dial_opts::PeerCondition, ConnectionClosed, DialError,
    DialFailure,
};

use crate::{handler::Handler, shared::Shared, Control};

21pub struct Behaviour {
23 shared: Arc<Mutex<Shared>>,
24 dial_receiver: mpsc::Receiver<PeerId>,
25}
26
27impl Default for Behaviour {
28 fn default() -> Self {
29 Self::new()
30 }
31}
32
33impl Behaviour {
34 pub fn new() -> Self {
35 let (dial_sender, dial_receiver) = mpsc::channel(0);
36
37 Self {
38 shared: Arc::new(Mutex::new(Shared::new(dial_sender))),
39 dial_receiver,
40 }
41 }
42
43 pub fn new_control(&self) -> Control {
45 Control::new(self.shared.clone())
46 }
47}
48
/// Error value stating that the protocol is already registered.
#[derive(Debug)]
pub struct AlreadyRegistered;

impl fmt::Display for AlreadyRegistered {
    /// Writes the fixed, human-readable error message.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // No arguments to interpolate, so write the literal directly.
        f.write_str("The protocol is already registered")
    }
}

impl std::error::Error for AlreadyRegistered {}

61impl NetworkBehaviour for Behaviour {
62 type ConnectionHandler = Handler;
63 type ToSwarm = ();
64
65 fn handle_established_inbound_connection(
66 &mut self,
67 connection_id: ConnectionId,
68 peer: PeerId,
69 _: &Multiaddr,
70 _: &Multiaddr,
71 ) -> Result<THandler<Self>, ConnectionDenied> {
72 Ok(Handler::new(
73 peer,
74 self.shared.clone(),
75 Shared::lock(&self.shared).receiver(peer, connection_id),
76 ))
77 }
78
79 fn handle_established_outbound_connection(
80 &mut self,
81 connection_id: ConnectionId,
82 peer: PeerId,
83 _: &Multiaddr,
84 _: Endpoint,
85 _: PortUse,
86 ) -> Result<THandler<Self>, ConnectionDenied> {
87 Ok(Handler::new(
88 peer,
89 self.shared.clone(),
90 Shared::lock(&self.shared).receiver(peer, connection_id),
91 ))
92 }
93
94 fn on_swarm_event(&mut self, event: FromSwarm) {
95 match event {
96 FromSwarm::ConnectionEstablished(ConnectionEstablished {
97 peer_id,
98 connection_id,
99 ..
100 }) => Shared::lock(&self.shared).on_connection_established(connection_id, peer_id),
101 FromSwarm::ConnectionClosed(ConnectionClosed { connection_id, .. }) => {
102 Shared::lock(&self.shared).on_connection_closed(connection_id)
103 }
104 FromSwarm::DialFailure(DialFailure {
105 peer_id: Some(peer_id),
106 error:
107 error @ (DialError::Transport(_)
108 | DialError::Denied { .. }
109 | DialError::NoAddresses
110 | DialError::WrongPeerId { .. }),
111 ..
112 }) => {
113 let reason = error.to_string(); Shared::lock(&self.shared).on_dial_failure(peer_id, reason)
116 }
117 _ => {}
118 }
119 }
120
121 fn on_connection_handler_event(
122 &mut self,
123 _peer_id: PeerId,
124 _connection_id: ConnectionId,
125 event: THandlerOutEvent<Self>,
126 ) {
127 libp2p_core::util::unreachable(event);
128 }
129
130 fn poll(
131 &mut self,
132 cx: &mut Context<'_>,
133 ) -> Poll<ToSwarm<Self::ToSwarm, THandlerInEvent<Self>>> {
134 if let Poll::Ready(Some(peer)) = self.dial_receiver.poll_next_unpin(cx) {
135 return Poll::Ready(ToSwarm::Dial {
136 opts: DialOpts::peer_id(peer)
137 .condition(PeerCondition::DisconnectedAndNotDialing)
138 .build(),
139 });
140 }
141
142 Poll::Pending
143 }
144}