libp2p_relay/protocol/
inbound_stop.rs1use std::io;
22
23use asynchronous_codec::{Framed, FramedParts};
24use bytes::Bytes;
25use futures::prelude::*;
26use libp2p_identity::PeerId;
27use libp2p_swarm::Stream;
28use thiserror::Error;
29
30use crate::{
31 proto,
32 protocol::{self, MAX_MESSAGE_SIZE},
33};
34
35pub(crate) async fn handle_open_circuit(io: Stream) -> Result<Circuit, Error> {
36 let mut substream = Framed::new(io, quick_protobuf_codec::Codec::new(MAX_MESSAGE_SIZE));
37
38 let proto::StopMessage {
39 type_pb,
40 peer,
41 limit,
42 status: _,
43 } = substream
44 .next()
45 .await
46 .ok_or(Error::Io(io::ErrorKind::UnexpectedEof.into()))??;
47
48 match type_pb {
49 proto::StopMessageType::CONNECT => {
50 let src_peer_id = PeerId::from_bytes(&peer.ok_or(ProtocolViolation::MissingPeer)?.id)
51 .map_err(|_| ProtocolViolation::ParsePeerId)?;
52 Ok(Circuit {
53 substream,
54 src_peer_id,
55 limit: limit.map(Into::into),
56 })
57 }
58 proto::StopMessageType::STATUS => {
59 Err(Error::Protocol(ProtocolViolation::UnexpectedTypeStatus))
60 }
61 }
62}
63
64#[derive(Debug, Error)]
65pub(crate) enum Error {
66 #[error("Protocol error")]
67 Protocol(#[from] ProtocolViolation),
68 #[error("IO error")]
69 Io(#[from] io::Error),
70}
71
72impl From<quick_protobuf_codec::Error> for Error {
73 fn from(error: quick_protobuf_codec::Error) -> Self {
74 Self::Protocol(ProtocolViolation::Codec(error))
75 }
76}
77
78#[derive(Debug, Error)]
79pub(crate) enum ProtocolViolation {
80 #[error(transparent)]
81 Codec(#[from] quick_protobuf_codec::Error),
82 #[error("Failed to parse peer id.")]
83 ParsePeerId,
84 #[error("Expected 'peer' field to be set.")]
85 MissingPeer,
86 #[error("Unexpected message type 'status'")]
87 UnexpectedTypeStatus,
88}
89
90pub(crate) struct Circuit {
91 substream: Framed<Stream, quick_protobuf_codec::Codec<proto::StopMessage>>,
92 src_peer_id: PeerId,
93 limit: Option<protocol::Limit>,
94}
95
96impl Circuit {
97 pub(crate) fn src_peer_id(&self) -> PeerId {
98 self.src_peer_id
99 }
100
101 pub(crate) fn limit(&self) -> Option<protocol::Limit> {
102 self.limit
103 }
104
105 pub(crate) async fn accept(mut self) -> Result<(Stream, Bytes), Error> {
106 let msg = proto::StopMessage {
107 type_pb: proto::StopMessageType::STATUS,
108 peer: None,
109 limit: None,
110 status: Some(proto::Status::OK),
111 };
112
113 self.send(msg).await?;
114
115 let FramedParts {
116 io,
117 read_buffer,
118 write_buffer,
119 ..
120 } = self.substream.into_parts();
121 assert!(
122 write_buffer.is_empty(),
123 "Expect a flushed Framed to have an empty write buffer."
124 );
125
126 Ok((io, read_buffer.freeze()))
127 }
128
129 pub(crate) async fn deny(mut self, status: proto::Status) -> Result<(), Error> {
130 let msg = proto::StopMessage {
131 type_pb: proto::StopMessageType::STATUS,
132 peer: None,
133 limit: None,
134 status: Some(status),
135 };
136
137 self.send(msg).await?;
138
139 Ok(())
140 }
141
142 async fn send(&mut self, msg: proto::StopMessage) -> Result<(), Error> {
143 self.substream.send(msg).await?;
144 self.substream.flush().await?;
145
146 Ok(())
147 }
148}