libp2p_swarm/connection/pool/
task.rs

1// Copyright 2021 Protocol Labs.
2// Copyright 2018 Parity Technologies (UK) Ltd.
3//
4// Permission is hereby granted, free of charge, to any person obtaining a
5// copy of this software and associated documentation files (the "Software"),
6// to deal in the Software without restriction, including without limitation
7// the rights to use, copy, modify, merge, publish, distribute, sublicense,
8// and/or sell copies of the Software, and to permit persons to whom the
9// Software is furnished to do so, subject to the following conditions:
10//
11// The above copyright notice and this permission notice shall be included in
12// all copies or substantial portions of the Software.
13//
14// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
15// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
16// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
17// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
18// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
19// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
20// DEALINGS IN THE SOFTWARE.
21
22//! Async functions driving pending and established connections in the form of a task.
23
24use std::{convert::Infallible, pin::Pin};
25
26use futures::{
27    channel::{mpsc, oneshot},
28    future::{poll_fn, Either, Future},
29    SinkExt, StreamExt,
30};
31use libp2p_core::muxing::StreamMuxerBox;
32
33use super::concurrent_dial::ConcurrentDial;
34use crate::{
35    connection::{
36        self, ConnectionError, ConnectionId, PendingInboundConnectionError,
37        PendingOutboundConnectionError,
38    },
39    transport::TransportError,
40    ConnectionHandler, Multiaddr, PeerId,
41};
42
43/// Commands that can be sent to a task driving an established connection.
44#[derive(Debug)]
45pub(crate) enum Command<T> {
46    /// Notify the connection handler of an event.
47    NotifyHandler(T),
48    /// Gracefully close the connection (active close) before
49    /// terminating the task.
50    Close,
51}
52
53pub(crate) enum PendingConnectionEvent {
54    ConnectionEstablished {
55        id: ConnectionId,
56        output: (PeerId, StreamMuxerBox),
57        /// [`Some`] when the new connection is an outgoing connection.
58        /// Addresses are dialed in parallel. Contains the addresses and errors
59        /// of dial attempts that failed before the one successful dial.
60        outgoing: Option<(Multiaddr, Vec<(Multiaddr, TransportError<std::io::Error>)>)>,
61    },
62    /// A pending connection failed.
63    PendingFailed {
64        id: ConnectionId,
65        error: Either<PendingOutboundConnectionError, PendingInboundConnectionError>,
66    },
67}
68
69#[derive(Debug)]
70pub(crate) enum EstablishedConnectionEvent<ToBehaviour> {
71    /// A node we are connected to has changed its address.
72    AddressChange {
73        id: ConnectionId,
74        peer_id: PeerId,
75        new_address: Multiaddr,
76    },
77    /// Notify the manager of an event from the connection.
78    Notify {
79        id: ConnectionId,
80        peer_id: PeerId,
81        event: ToBehaviour,
82    },
83    /// A connection closed, possibly due to an error.
84    ///
85    /// If `error` is `None`, the connection has completed
86    /// an active orderly close.
87    Closed {
88        id: ConnectionId,
89        peer_id: PeerId,
90        error: Option<ConnectionError>,
91    },
92}
93
94pub(crate) async fn new_for_pending_outgoing_connection(
95    connection_id: ConnectionId,
96    dial: ConcurrentDial,
97    abort_receiver: oneshot::Receiver<Infallible>,
98    mut events: mpsc::Sender<PendingConnectionEvent>,
99) {
100    match futures::future::select(abort_receiver, Box::pin(dial)).await {
101        Either::Left((Err(oneshot::Canceled), _)) => {
102            let _ = events
103                .send(PendingConnectionEvent::PendingFailed {
104                    id: connection_id,
105                    error: Either::Left(PendingOutboundConnectionError::Aborted),
106                })
107                .await;
108        }
109        Either::Left((Ok(v), _)) => libp2p_core::util::unreachable(v),
110        Either::Right((Ok((address, output, errors)), _)) => {
111            let _ = events
112                .send(PendingConnectionEvent::ConnectionEstablished {
113                    id: connection_id,
114                    output,
115                    outgoing: Some((address, errors)),
116                })
117                .await;
118        }
119        Either::Right((Err(e), _)) => {
120            let _ = events
121                .send(PendingConnectionEvent::PendingFailed {
122                    id: connection_id,
123                    error: Either::Left(PendingOutboundConnectionError::Transport(e)),
124                })
125                .await;
126        }
127    }
128}
129
130pub(crate) async fn new_for_pending_incoming_connection<TFut>(
131    connection_id: ConnectionId,
132    future: TFut,
133    abort_receiver: oneshot::Receiver<Infallible>,
134    mut events: mpsc::Sender<PendingConnectionEvent>,
135) where
136    TFut: Future<Output = Result<(PeerId, StreamMuxerBox), std::io::Error>> + Send + 'static,
137{
138    match futures::future::select(abort_receiver, Box::pin(future)).await {
139        Either::Left((Err(oneshot::Canceled), _)) => {
140            let _ = events
141                .send(PendingConnectionEvent::PendingFailed {
142                    id: connection_id,
143                    error: Either::Right(PendingInboundConnectionError::Aborted),
144                })
145                .await;
146        }
147        Either::Left((Ok(v), _)) => libp2p_core::util::unreachable(v),
148        Either::Right((Ok(output), _)) => {
149            let _ = events
150                .send(PendingConnectionEvent::ConnectionEstablished {
151                    id: connection_id,
152                    output,
153                    outgoing: None,
154                })
155                .await;
156        }
157        Either::Right((Err(e), _)) => {
158            let _ = events
159                .send(PendingConnectionEvent::PendingFailed {
160                    id: connection_id,
161                    error: Either::Right(PendingInboundConnectionError::Transport(
162                        TransportError::Other(e),
163                    )),
164                })
165                .await;
166        }
167    }
168}
169
170pub(crate) async fn new_for_established_connection<THandler>(
171    connection_id: ConnectionId,
172    peer_id: PeerId,
173    mut connection: crate::connection::Connection<THandler>,
174    mut command_receiver: mpsc::Receiver<Command<THandler::FromBehaviour>>,
175    mut events: mpsc::Sender<EstablishedConnectionEvent<THandler::ToBehaviour>>,
176) where
177    THandler: ConnectionHandler,
178{
179    loop {
180        match futures::future::select(
181            command_receiver.next(),
182            poll_fn(|cx| Pin::new(&mut connection).poll(cx)),
183        )
184        .await
185        {
186            Either::Left((Some(command), _)) => match command {
187                Command::NotifyHandler(event) => connection.on_behaviour_event(event),
188                Command::Close => {
189                    command_receiver.close();
190                    let (remaining_events, closing_muxer) = connection.close();
191
192                    let _ = events
193                        .send_all(&mut remaining_events.map(|event| {
194                            Ok(EstablishedConnectionEvent::Notify {
195                                id: connection_id,
196                                event,
197                                peer_id,
198                            })
199                        }))
200                        .await;
201
202                    let error = closing_muxer.await.err().map(ConnectionError::IO);
203
204                    let _ = events
205                        .send(EstablishedConnectionEvent::Closed {
206                            id: connection_id,
207                            peer_id,
208                            error,
209                        })
210                        .await;
211                    return;
212                }
213            },
214
215            // The manager has disappeared; abort.
216            Either::Left((None, _)) => return,
217
218            Either::Right((event, _)) => {
219                match event {
220                    Ok(connection::Event::Handler(event)) => {
221                        let _ = events
222                            .send(EstablishedConnectionEvent::Notify {
223                                id: connection_id,
224                                peer_id,
225                                event,
226                            })
227                            .await;
228                    }
229                    Ok(connection::Event::AddressChange(new_address)) => {
230                        let _ = events
231                            .send(EstablishedConnectionEvent::AddressChange {
232                                id: connection_id,
233                                peer_id,
234                                new_address,
235                            })
236                            .await;
237                    }
238                    Err(error) => {
239                        command_receiver.close();
240                        let (remaining_events, _closing_muxer) = connection.close();
241
242                        let _ = events
243                            .send_all(&mut remaining_events.map(|event| {
244                                Ok(EstablishedConnectionEvent::Notify {
245                                    id: connection_id,
246                                    event,
247                                    peer_id,
248                                })
249                            }))
250                            .await;
251
252                        // Terminate the task with the error, dropping the connection.
253                        let _ = events
254                            .send(EstablishedConnectionEvent::Closed {
255                                id: connection_id,
256                                peer_id,
257                                error: Some(error),
258                            })
259                            .await;
260                        return;
261                    }
262                }
263            }
264        }
265    }
266}