libp2p_swarm/connection/pool/
task.rs1use std::{convert::Infallible, pin::Pin};
25
26use futures::{
27 channel::{mpsc, oneshot},
28 future::{poll_fn, Either, Future},
29 SinkExt, StreamExt,
30};
31use libp2p_core::muxing::StreamMuxerBox;
32
33use super::concurrent_dial::ConcurrentDial;
34use crate::{
35 connection::{
36 self, ConnectionError, ConnectionId, PendingInboundConnectionError,
37 PendingOutboundConnectionError,
38 },
39 transport::TransportError,
40 ConnectionHandler, Multiaddr, PeerId,
41};
42
43#[derive(Debug)]
45pub(crate) enum Command<T> {
46 NotifyHandler(T),
48 Close,
51}
52
53pub(crate) enum PendingConnectionEvent {
54 ConnectionEstablished {
55 id: ConnectionId,
56 output: (PeerId, StreamMuxerBox),
57 outgoing: Option<(Multiaddr, Vec<(Multiaddr, TransportError<std::io::Error>)>)>,
61 },
62 PendingFailed {
64 id: ConnectionId,
65 error: Either<PendingOutboundConnectionError, PendingInboundConnectionError>,
66 },
67}
68
69#[derive(Debug)]
70pub(crate) enum EstablishedConnectionEvent<ToBehaviour> {
71 AddressChange {
73 id: ConnectionId,
74 peer_id: PeerId,
75 new_address: Multiaddr,
76 },
77 Notify {
79 id: ConnectionId,
80 peer_id: PeerId,
81 event: ToBehaviour,
82 },
83 Closed {
88 id: ConnectionId,
89 peer_id: PeerId,
90 error: Option<ConnectionError>,
91 },
92}
93
94pub(crate) async fn new_for_pending_outgoing_connection(
95 connection_id: ConnectionId,
96 dial: ConcurrentDial,
97 abort_receiver: oneshot::Receiver<Infallible>,
98 mut events: mpsc::Sender<PendingConnectionEvent>,
99) {
100 match futures::future::select(abort_receiver, Box::pin(dial)).await {
101 Either::Left((Err(oneshot::Canceled), _)) => {
102 let _ = events
103 .send(PendingConnectionEvent::PendingFailed {
104 id: connection_id,
105 error: Either::Left(PendingOutboundConnectionError::Aborted),
106 })
107 .await;
108 }
109 Either::Left((Ok(v), _)) => libp2p_core::util::unreachable(v),
110 Either::Right((Ok((address, output, errors)), _)) => {
111 let _ = events
112 .send(PendingConnectionEvent::ConnectionEstablished {
113 id: connection_id,
114 output,
115 outgoing: Some((address, errors)),
116 })
117 .await;
118 }
119 Either::Right((Err(e), _)) => {
120 let _ = events
121 .send(PendingConnectionEvent::PendingFailed {
122 id: connection_id,
123 error: Either::Left(PendingOutboundConnectionError::Transport(e)),
124 })
125 .await;
126 }
127 }
128}
129
130pub(crate) async fn new_for_pending_incoming_connection<TFut>(
131 connection_id: ConnectionId,
132 future: TFut,
133 abort_receiver: oneshot::Receiver<Infallible>,
134 mut events: mpsc::Sender<PendingConnectionEvent>,
135) where
136 TFut: Future<Output = Result<(PeerId, StreamMuxerBox), std::io::Error>> + Send + 'static,
137{
138 match futures::future::select(abort_receiver, Box::pin(future)).await {
139 Either::Left((Err(oneshot::Canceled), _)) => {
140 let _ = events
141 .send(PendingConnectionEvent::PendingFailed {
142 id: connection_id,
143 error: Either::Right(PendingInboundConnectionError::Aborted),
144 })
145 .await;
146 }
147 Either::Left((Ok(v), _)) => libp2p_core::util::unreachable(v),
148 Either::Right((Ok(output), _)) => {
149 let _ = events
150 .send(PendingConnectionEvent::ConnectionEstablished {
151 id: connection_id,
152 output,
153 outgoing: None,
154 })
155 .await;
156 }
157 Either::Right((Err(e), _)) => {
158 let _ = events
159 .send(PendingConnectionEvent::PendingFailed {
160 id: connection_id,
161 error: Either::Right(PendingInboundConnectionError::Transport(
162 TransportError::Other(e),
163 )),
164 })
165 .await;
166 }
167 }
168}
169
170pub(crate) async fn new_for_established_connection<THandler>(
171 connection_id: ConnectionId,
172 peer_id: PeerId,
173 mut connection: crate::connection::Connection<THandler>,
174 mut command_receiver: mpsc::Receiver<Command<THandler::FromBehaviour>>,
175 mut events: mpsc::Sender<EstablishedConnectionEvent<THandler::ToBehaviour>>,
176) where
177 THandler: ConnectionHandler,
178{
179 loop {
180 match futures::future::select(
181 command_receiver.next(),
182 poll_fn(|cx| Pin::new(&mut connection).poll(cx)),
183 )
184 .await
185 {
186 Either::Left((Some(command), _)) => match command {
187 Command::NotifyHandler(event) => connection.on_behaviour_event(event),
188 Command::Close => {
189 command_receiver.close();
190 let (remaining_events, closing_muxer) = connection.close();
191
192 let _ = events
193 .send_all(&mut remaining_events.map(|event| {
194 Ok(EstablishedConnectionEvent::Notify {
195 id: connection_id,
196 event,
197 peer_id,
198 })
199 }))
200 .await;
201
202 let error = closing_muxer.await.err().map(ConnectionError::IO);
203
204 let _ = events
205 .send(EstablishedConnectionEvent::Closed {
206 id: connection_id,
207 peer_id,
208 error,
209 })
210 .await;
211 return;
212 }
213 },
214
215 Either::Left((None, _)) => return,
217
218 Either::Right((event, _)) => {
219 match event {
220 Ok(connection::Event::Handler(event)) => {
221 let _ = events
222 .send(EstablishedConnectionEvent::Notify {
223 id: connection_id,
224 peer_id,
225 event,
226 })
227 .await;
228 }
229 Ok(connection::Event::AddressChange(new_address)) => {
230 let _ = events
231 .send(EstablishedConnectionEvent::AddressChange {
232 id: connection_id,
233 peer_id,
234 new_address,
235 })
236 .await;
237 }
238 Err(error) => {
239 command_receiver.close();
240 let (remaining_events, _closing_muxer) = connection.close();
241
242 let _ = events
243 .send_all(&mut remaining_events.map(|event| {
244 Ok(EstablishedConnectionEvent::Notify {
245 id: connection_id,
246 event,
247 peer_id,
248 })
249 }))
250 .await;
251
252 let _ = events
254 .send(EstablishedConnectionEvent::Closed {
255 id: connection_id,
256 peer_id,
257 error: Some(error),
258 })
259 .await;
260 return;
261 }
262 }
263 }
264 }
265 }
266}