libp2p_stream/
behaviour.rs

1use core::fmt;
2use std::{
3    sync::{Arc, Mutex},
4    task::{Context, Poll},
5};
6
7use futures::{channel::mpsc, StreamExt};
8use libp2p_core::{transport::PortUse, Endpoint, Multiaddr};
9use libp2p_identity::PeerId;
10use libp2p_swarm::{
11    self as swarm, dial_opts::DialOpts, ConnectionDenied, ConnectionId, FromSwarm,
12    NetworkBehaviour, THandler, THandlerInEvent, THandlerOutEvent, ToSwarm,
13};
14use swarm::{
15    behaviour::ConnectionEstablished, dial_opts::PeerCondition, ConnectionClosed, DialError,
16    DialFailure,
17};
18
19use crate::{handler::Handler, shared::Shared, Control};
20
21/// A generic behaviour for stream-oriented protocols.
22pub struct Behaviour {
23    shared: Arc<Mutex<Shared>>,
24    dial_receiver: mpsc::Receiver<PeerId>,
25}
26
27impl Default for Behaviour {
28    fn default() -> Self {
29        Self::new()
30    }
31}
32
33impl Behaviour {
34    pub fn new() -> Self {
35        let (dial_sender, dial_receiver) = mpsc::channel(0);
36
37        Self {
38            shared: Arc::new(Mutex::new(Shared::new(dial_sender))),
39            dial_receiver,
40        }
41    }
42
43    /// Obtain a new [`Control`].
44    pub fn new_control(&self) -> Control {
45        Control::new(self.shared.clone())
46    }
47}
48
/// Error signalling that the protocol is already registered.
#[derive(Debug)]
pub struct AlreadyRegistered;

impl fmt::Display for AlreadyRegistered {
    /// Writes the fixed, human-readable description of the error.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str("The protocol is already registered")
    }
}

// No underlying cause to expose, so the default `Error` methods suffice.
impl std::error::Error for AlreadyRegistered {}
60
61impl NetworkBehaviour for Behaviour {
62    type ConnectionHandler = Handler;
63    type ToSwarm = ();
64
65    fn handle_established_inbound_connection(
66        &mut self,
67        connection_id: ConnectionId,
68        peer: PeerId,
69        _: &Multiaddr,
70        _: &Multiaddr,
71    ) -> Result<THandler<Self>, ConnectionDenied> {
72        Ok(Handler::new(
73            peer,
74            self.shared.clone(),
75            Shared::lock(&self.shared).receiver(peer, connection_id),
76        ))
77    }
78
79    fn handle_established_outbound_connection(
80        &mut self,
81        connection_id: ConnectionId,
82        peer: PeerId,
83        _: &Multiaddr,
84        _: Endpoint,
85        _: PortUse,
86    ) -> Result<THandler<Self>, ConnectionDenied> {
87        Ok(Handler::new(
88            peer,
89            self.shared.clone(),
90            Shared::lock(&self.shared).receiver(peer, connection_id),
91        ))
92    }
93
94    fn on_swarm_event(&mut self, event: FromSwarm) {
95        match event {
96            FromSwarm::ConnectionEstablished(ConnectionEstablished {
97                peer_id,
98                connection_id,
99                ..
100            }) => Shared::lock(&self.shared).on_connection_established(connection_id, peer_id),
101            FromSwarm::ConnectionClosed(ConnectionClosed { connection_id, .. }) => {
102                Shared::lock(&self.shared).on_connection_closed(connection_id)
103            }
104            FromSwarm::DialFailure(DialFailure {
105                peer_id: Some(peer_id),
106                error:
107                    error @ (DialError::Transport(_)
108                    | DialError::Denied { .. }
109                    | DialError::NoAddresses
110                    | DialError::WrongPeerId { .. }),
111                ..
112            }) => {
113                let reason = error.to_string(); // We can only forward the string repr but it is better than nothing.
114
115                Shared::lock(&self.shared).on_dial_failure(peer_id, reason)
116            }
117            _ => {}
118        }
119    }
120
121    fn on_connection_handler_event(
122        &mut self,
123        _peer_id: PeerId,
124        _connection_id: ConnectionId,
125        event: THandlerOutEvent<Self>,
126    ) {
127        libp2p_core::util::unreachable(event);
128    }
129
130    fn poll(
131        &mut self,
132        cx: &mut Context<'_>,
133    ) -> Poll<ToSwarm<Self::ToSwarm, THandlerInEvent<Self>>> {
134        if let Poll::Ready(Some(peer)) = self.dial_receiver.poll_next_unpin(cx) {
135            return Poll::Ready(ToSwarm::Dial {
136                opts: DialOpts::peer_id(peer)
137                    .condition(PeerCondition::DisconnectedAndNotDialing)
138                    .build(),
139            });
140        }
141
142        Poll::Pending
143    }
144}