1pub(crate) mod handler;
24pub(crate) mod transport;
25
26use std::{
27 collections::{hash_map, HashMap, VecDeque},
28 convert::Infallible,
29 io::{Error, ErrorKind, IoSlice},
30 pin::Pin,
31 task::{Context, Poll},
32};
33
34use bytes::Bytes;
35use either::Either;
36use futures::{
37 channel::mpsc::Receiver,
38 future::{BoxFuture, FutureExt},
39 io::{AsyncRead, AsyncWrite},
40 ready,
41 stream::StreamExt,
42};
43use libp2p_core::{multiaddr::Protocol, transport::PortUse, Endpoint, Multiaddr};
44use libp2p_identity::PeerId;
45use libp2p_swarm::{
46 behaviour::{ConnectionClosed, ConnectionEstablished, FromSwarm},
47 dial_opts::DialOpts,
48 dummy, ConnectionDenied, ConnectionHandler, ConnectionId, DialFailure, NetworkBehaviour,
49 NotifyHandler, Stream, THandler, THandlerInEvent, THandlerOutEvent, ToSwarm,
50};
51use transport::Transport;
52
53use crate::{
54 multiaddr_ext::MultiaddrExt,
55 priv_client::handler::Handler,
56 protocol::{self, inbound_stop},
57};
58
/// Events that the relay client `Behaviour` reports to the swarm.
///
/// Each variant is built in `on_connection_handler_event` from the handler event of the
/// same name.
#[derive(Debug)]
pub enum Event {
    /// A reservation request was accepted.
    ReservationReqAccepted {
        /// Peer whose connection handler reported the acceptance.
        relay_peer_id: PeerId,
        /// Forwarded unchanged from the handler event; presumably `true` when an existing
        /// reservation was renewed rather than newly created (confirm against the handler).
        renewal: bool,
        /// Limit forwarded unchanged from the handler event, if any.
        limit: Option<protocol::Limit>,
    },
    /// An outbound circuit was established.
    OutboundCircuitEstablished {
        /// Peer whose connection handler reported the circuit.
        relay_peer_id: PeerId,
        /// Limit forwarded unchanged from the handler event, if any.
        limit: Option<protocol::Limit>,
    },
    /// An inbound circuit was established.
    InboundCircuitEstablished {
        /// Source peer id as reported by the handler event.
        src_peer_id: PeerId,
        /// Limit forwarded unchanged from the handler event, if any.
        limit: Option<protocol::Limit>,
    },
}
79
/// Tracks whether a requested reservation address has been announced as confirmed.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
enum ReservationStatus {
    /// The reservation was requested; no non-renewal acceptance has been processed for it.
    Pending,
    /// A non-renewal acceptance was processed and the address was reported through
    /// `ToSwarm::ExternalAddrConfirmed`.
    Confirmed,
}
85
/// Relay client behaviour. It is created together with its `Transport` by `new` and
/// serves the listen and dial requests that the transport sends over a channel.
pub struct Behaviour {
    /// Local peer id. Passed to every `Handler` and appended as the last `/p2p` component
    /// of reservation addresses.
    local_peer_id: PeerId,

    /// Receiving end of the channel over which the `Transport` sends its requests.
    from_transport: Receiver<transport::TransportToBehaviourMsg>,
    /// Ids of the currently established non-relayed connections, grouped by remote peer.
    directly_connected_peers: HashMap<PeerId, Vec<ConnectionId>>,

    /// Relayed address requested over a connection, together with whether it has been
    /// confirmed yet. Keyed by the id of the connection the reservation is made on.
    reservation_addresses: HashMap<ConnectionId, (Multiaddr, ReservationStatus)>,

    /// Actions waiting to be returned from `poll`, oldest first.
    queued_actions: VecDeque<ToSwarm<Event, Either<handler::In, Infallible>>>,

    /// Handler commands parked until the connection dialed for them is established.
    pending_handler_commands: HashMap<ConnectionId, handler::In>,
}
107
108pub fn new(local_peer_id: PeerId) -> (Transport, Behaviour) {
110 let (transport, from_transport) = Transport::new();
111 let behaviour = Behaviour {
112 local_peer_id,
113 from_transport,
114 directly_connected_peers: Default::default(),
115 reservation_addresses: Default::default(),
116 queued_actions: Default::default(),
117 pending_handler_commands: Default::default(),
118 };
119 (transport, behaviour)
120}
121
122impl Behaviour {
123 fn on_connection_closed(
124 &mut self,
125 ConnectionClosed {
126 peer_id,
127 connection_id,
128 endpoint,
129 ..
130 }: ConnectionClosed,
131 ) {
132 if !endpoint.is_relayed() {
133 match self.directly_connected_peers.entry(peer_id) {
134 hash_map::Entry::Occupied(mut connections) => {
135 let position = connections
136 .get()
137 .iter()
138 .position(|c| c == &connection_id)
139 .expect("Connection to be known.");
140 connections.get_mut().remove(position);
141
142 if connections.get().is_empty() {
143 connections.remove();
144 }
145 }
146 hash_map::Entry::Vacant(_) => {
147 unreachable!("`on_connection_closed` for unconnected peer.")
148 }
149 };
150 if let Some((addr, ReservationStatus::Confirmed)) =
151 self.reservation_addresses.remove(&connection_id)
152 {
153 self.queued_actions
154 .push_back(ToSwarm::ExternalAddrExpired(addr));
155 }
156 }
157 }
158}
159
160impl NetworkBehaviour for Behaviour {
161 type ConnectionHandler = Either<Handler, dummy::ConnectionHandler>;
162 type ToSwarm = Event;
163
164 fn handle_established_inbound_connection(
165 &mut self,
166 connection_id: ConnectionId,
167 peer: PeerId,
168 local_addr: &Multiaddr,
169 remote_addr: &Multiaddr,
170 ) -> Result<THandler<Self>, ConnectionDenied> {
171 let pending_handler_command = self.pending_handler_commands.remove(&connection_id);
172
173 if local_addr.is_relayed() {
174 return Ok(Either::Right(dummy::ConnectionHandler));
175 }
176 let mut handler = Handler::new(self.local_peer_id, peer, remote_addr.clone());
177
178 if let Some(event) = pending_handler_command {
179 handler.on_behaviour_event(event)
180 }
181
182 Ok(Either::Left(handler))
183 }
184
185 fn handle_established_outbound_connection(
186 &mut self,
187 connection_id: ConnectionId,
188 peer: PeerId,
189 addr: &Multiaddr,
190 _: Endpoint,
191 _: PortUse,
192 ) -> Result<THandler<Self>, ConnectionDenied> {
193 let pending_handler_command = self.pending_handler_commands.remove(&connection_id);
194
195 if addr.is_relayed() {
196 return Ok(Either::Right(dummy::ConnectionHandler));
197 }
198
199 let mut handler = Handler::new(self.local_peer_id, peer, addr.clone());
200
201 if let Some(event) = pending_handler_command {
202 handler.on_behaviour_event(event)
203 }
204
205 Ok(Either::Left(handler))
206 }
207
208 fn on_swarm_event(&mut self, event: FromSwarm) {
209 match event {
210 FromSwarm::ConnectionEstablished(ConnectionEstablished {
211 peer_id,
212 connection_id,
213 endpoint,
214 ..
215 }) if !endpoint.is_relayed() => {
216 self.directly_connected_peers
217 .entry(peer_id)
218 .or_default()
219 .push(connection_id);
220 }
221 FromSwarm::ConnectionClosed(connection_closed) => {
222 self.on_connection_closed(connection_closed)
223 }
224 FromSwarm::DialFailure(DialFailure { connection_id, .. }) => {
225 self.reservation_addresses.remove(&connection_id);
226 self.pending_handler_commands.remove(&connection_id);
227 }
228 _ => {}
229 }
230 }
231
232 fn on_connection_handler_event(
233 &mut self,
234 event_source: PeerId,
235 connection: ConnectionId,
236 handler_event: THandlerOutEvent<Self>,
237 ) {
238 let handler_event = match handler_event {
239 Either::Left(e) => e,
240 Either::Right(v) => libp2p_core::util::unreachable(v),
241 };
242
243 let event = match handler_event {
244 handler::Event::ReservationReqAccepted { renewal, limit } => {
245 let (addr, status) = self
246 .reservation_addresses
247 .get_mut(&connection)
248 .expect("Relay connection exist");
249
250 if !renewal && *status == ReservationStatus::Pending {
251 *status = ReservationStatus::Confirmed;
252 self.queued_actions
253 .push_back(ToSwarm::ExternalAddrConfirmed(addr.clone()));
254 }
255
256 Event::ReservationReqAccepted {
257 relay_peer_id: event_source,
258 renewal,
259 limit,
260 }
261 }
262 handler::Event::OutboundCircuitEstablished { limit } => {
263 Event::OutboundCircuitEstablished {
264 relay_peer_id: event_source,
265 limit,
266 }
267 }
268 handler::Event::InboundCircuitEstablished { src_peer_id, limit } => {
269 Event::InboundCircuitEstablished { src_peer_id, limit }
270 }
271 };
272
273 self.queued_actions.push_back(ToSwarm::GenerateEvent(event));
274 }
275
276 #[tracing::instrument(level = "trace", name = "NetworkBehaviour::poll", skip(self, cx))]
277 fn poll(
278 &mut self,
279 cx: &mut Context<'_>,
280 ) -> Poll<ToSwarm<Self::ToSwarm, THandlerInEvent<Self>>> {
281 if let Some(action) = self.queued_actions.pop_front() {
282 return Poll::Ready(action);
283 }
284
285 let action = match ready!(self.from_transport.poll_next_unpin(cx)) {
286 Some(transport::TransportToBehaviourMsg::ListenReq {
287 relay_peer_id,
288 relay_addr,
289 to_listener,
290 }) => {
291 match self
292 .directly_connected_peers
293 .get(&relay_peer_id)
294 .and_then(|cs| cs.first())
295 {
296 Some(connection_id) => {
297 self.reservation_addresses.insert(
298 *connection_id,
299 (
300 relay_addr
301 .with(Protocol::P2p(relay_peer_id))
302 .with(Protocol::P2pCircuit)
303 .with(Protocol::P2p(self.local_peer_id)),
304 ReservationStatus::Pending,
305 ),
306 );
307
308 ToSwarm::NotifyHandler {
309 peer_id: relay_peer_id,
310 handler: NotifyHandler::One(*connection_id),
311 event: Either::Left(handler::In::Reserve { to_listener }),
312 }
313 }
314 None => {
315 let opts = DialOpts::peer_id(relay_peer_id)
316 .addresses(vec![relay_addr.clone()])
317 .extend_addresses_through_behaviour()
318 .build();
319 let relayed_connection_id = opts.connection_id();
320
321 self.reservation_addresses.insert(
322 relayed_connection_id,
323 (
324 relay_addr
325 .with(Protocol::P2p(relay_peer_id))
326 .with(Protocol::P2pCircuit)
327 .with(Protocol::P2p(self.local_peer_id)),
328 ReservationStatus::Pending,
329 ),
330 );
331
332 self.pending_handler_commands
333 .insert(relayed_connection_id, handler::In::Reserve { to_listener });
334 ToSwarm::Dial { opts }
335 }
336 }
337 }
338 Some(transport::TransportToBehaviourMsg::DialReq {
339 relay_addr,
340 relay_peer_id,
341 dst_peer_id,
342 send_back,
343 ..
344 }) => {
345 match self
346 .directly_connected_peers
347 .get(&relay_peer_id)
348 .and_then(|cs| cs.first())
349 {
350 Some(connection_id) => ToSwarm::NotifyHandler {
351 peer_id: relay_peer_id,
352 handler: NotifyHandler::One(*connection_id),
353 event: Either::Left(handler::In::EstablishCircuit {
354 to_dial: send_back,
355 dst_peer_id,
356 }),
357 },
358 None => {
359 let opts = DialOpts::peer_id(relay_peer_id)
360 .addresses(vec![relay_addr])
361 .extend_addresses_through_behaviour()
362 .build();
363 let connection_id = opts.connection_id();
364
365 self.pending_handler_commands.insert(
366 connection_id,
367 handler::In::EstablishCircuit {
368 to_dial: send_back,
369 dst_peer_id,
370 },
371 );
372
373 ToSwarm::Dial { opts }
374 }
375 }
376 }
377 None => unreachable!(
378 "`relay::Behaviour` polled after channel from \
379 `Transport` has been closed. Unreachable under \
380 the assumption that the `client::Behaviour` is never polled after \
381 `client::Transport` is dropped.",
382 ),
383 };
384
385 Poll::Ready(action)
386 }
387}
388
/// A connection whose `AsyncRead` and `AsyncWrite` implementations forward to the
/// substream held in its [`ConnectionState`].
pub struct Connection {
    /// Current stage of the connection; replaced once a pending inbound accept resolves.
    pub(crate) state: ConnectionState,
}
395
/// Stage of a `Connection`.
pub(crate) enum ConnectionState {
    /// An inbound circuit that is still being accepted.
    InboundAccepting {
        /// Future that resolves to the state to switch to, or to an I/O error.
        accept: BoxFuture<'static, Result<ConnectionState, Error>>,
    },
    /// The connection is ready for reads and writes.
    Operational {
        /// Bytes that `poll_read` hands out before anything is read from `substream`.
        read_buffer: Bytes,
        /// Stream that reads and writes are forwarded to.
        substream: Stream,
    },
}
405
// Declared explicitly: the I/O implementations for `Connection` mutate the state
// through `Pin<&mut Self>`, which requires the type to be `Unpin`.
impl Unpin for ConnectionState {}
407
408impl ConnectionState {
409 pub(crate) fn new_inbound(circuit: inbound_stop::Circuit) -> Self {
410 ConnectionState::InboundAccepting {
411 accept: async {
412 let (substream, read_buffer) = circuit
413 .accept()
414 .await
415 .map_err(|e| Error::new(ErrorKind::Other, e))?;
416 Ok(ConnectionState::Operational {
417 read_buffer,
418 substream,
419 })
420 }
421 .boxed(),
422 }
423 }
424
425 pub(crate) fn new_outbound(substream: Stream, read_buffer: Bytes) -> Self {
426 ConnectionState::Operational {
427 substream,
428 read_buffer,
429 }
430 }
431}
432
433impl AsyncWrite for Connection {
434 fn poll_write(
435 mut self: Pin<&mut Self>,
436 cx: &mut Context,
437 buf: &[u8],
438 ) -> Poll<Result<usize, Error>> {
439 loop {
440 match &mut self.state {
441 ConnectionState::InboundAccepting { accept } => {
442 *self = Connection {
443 state: ready!(accept.poll_unpin(cx))?,
444 };
445 }
446 ConnectionState::Operational { substream, .. } => {
447 return Pin::new(substream).poll_write(cx, buf);
448 }
449 }
450 }
451 }
452 fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Error>> {
453 loop {
454 match &mut self.state {
455 ConnectionState::InboundAccepting { accept } => {
456 *self = Connection {
457 state: ready!(accept.poll_unpin(cx))?,
458 };
459 }
460 ConnectionState::Operational { substream, .. } => {
461 return Pin::new(substream).poll_flush(cx);
462 }
463 }
464 }
465 }
466 fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Error>> {
467 loop {
468 match &mut self.state {
469 ConnectionState::InboundAccepting { accept } => {
470 *self = Connection {
471 state: ready!(accept.poll_unpin(cx))?,
472 };
473 }
474 ConnectionState::Operational { substream, .. } => {
475 return Pin::new(substream).poll_close(cx);
476 }
477 }
478 }
479 }
480
481 fn poll_write_vectored(
482 mut self: Pin<&mut Self>,
483 cx: &mut Context,
484 bufs: &[IoSlice],
485 ) -> Poll<Result<usize, Error>> {
486 loop {
487 match &mut self.state {
488 ConnectionState::InboundAccepting { accept } => {
489 *self = Connection {
490 state: ready!(accept.poll_unpin(cx))?,
491 };
492 }
493 ConnectionState::Operational { substream, .. } => {
494 return Pin::new(substream).poll_write_vectored(cx, bufs);
495 }
496 }
497 }
498 }
499}
500
501impl AsyncRead for Connection {
502 fn poll_read(
503 mut self: Pin<&mut Self>,
504 cx: &mut Context<'_>,
505 buf: &mut [u8],
506 ) -> Poll<Result<usize, Error>> {
507 loop {
508 match &mut self.state {
509 ConnectionState::InboundAccepting { accept } => {
510 *self = Connection {
511 state: ready!(accept.poll_unpin(cx))?,
512 };
513 }
514 ConnectionState::Operational {
515 read_buffer,
516 substream,
517 ..
518 } => {
519 if !read_buffer.is_empty() {
520 let n = std::cmp::min(read_buffer.len(), buf.len());
521 let data = read_buffer.split_to(n);
522 buf[0..n].copy_from_slice(&data[..]);
523 return Poll::Ready(Ok(n));
524 }
525
526 return Pin::new(substream).poll_read(cx, buf);
527 }
528 }
529 }
530 }
531}