libp2p_relay/protocol/
outbound_stop.rs1use std::{io, time::Duration};
22
23use asynchronous_codec::{Framed, FramedParts};
24use bytes::Bytes;
25use futures::prelude::*;
26use libp2p_identity::PeerId;
27use libp2p_swarm::Stream;
28use thiserror::Error;
29
30use crate::{proto, protocol::MAX_MESSAGE_SIZE, STOP_PROTOCOL_NAME};
31
32#[derive(Debug, Error)]
33pub enum Error {
34 #[error("Remote reported resource limit exceeded.")]
35 ResourceLimitExceeded,
36 #[error("Remote reported permission denied.")]
37 PermissionDenied,
38 #[error("Remote does not support the `{STOP_PROTOCOL_NAME}` protocol")]
39 Unsupported,
40 #[error("IO error")]
41 Io(#[source] io::Error),
42 #[error("Protocol error")]
43 Protocol(#[from] ProtocolViolation),
44}
45
46impl Error {
47 pub(crate) fn to_status(&self) -> proto::Status {
48 match self {
49 Error::ResourceLimitExceeded => proto::Status::RESOURCE_LIMIT_EXCEEDED,
50 Error::PermissionDenied => proto::Status::PERMISSION_DENIED,
51 Error::Unsupported => proto::Status::CONNECTION_FAILED,
52 Error::Io(_) => proto::Status::CONNECTION_FAILED,
53 Error::Protocol(
54 ProtocolViolation::UnexpectedStatus(_) | ProtocolViolation::UnexpectedTypeConnect,
55 ) => proto::Status::UNEXPECTED_MESSAGE,
56 Error::Protocol(_) => proto::Status::MALFORMED_MESSAGE,
57 }
58 }
59}
60
61#[derive(Debug, Error)]
63pub enum ProtocolViolation {
64 #[error(transparent)]
65 Codec(#[from] quick_protobuf_codec::Error),
66 #[error("Expected 'status' field to be set.")]
67 MissingStatusField,
68 #[error("Unexpected message type 'connect'")]
69 UnexpectedTypeConnect,
70 #[error("Unexpected message status '{0:?}'")]
71 UnexpectedStatus(proto::Status),
72}
73
74impl From<quick_protobuf_codec::Error> for Error {
75 fn from(e: quick_protobuf_codec::Error) -> Self {
76 Error::Protocol(ProtocolViolation::Codec(e))
77 }
78}
79
80pub(crate) async fn connect(
82 io: Stream,
83 src_peer_id: PeerId,
84 max_duration: Duration,
85 max_bytes: u64,
86) -> Result<Circuit, Error> {
87 let msg = proto::StopMessage {
88 type_pb: proto::StopMessageType::CONNECT,
89 peer: Some(proto::Peer {
90 id: src_peer_id.to_bytes(),
91 addrs: vec![],
92 }),
93 limit: Some(proto::Limit {
94 duration: Some(
95 max_duration
96 .as_secs()
97 .try_into()
98 .expect("`max_circuit_duration` not to exceed `u32::MAX`."),
99 ),
100 data: Some(max_bytes),
101 }),
102 status: None,
103 };
104
105 let mut substream = Framed::new(io, quick_protobuf_codec::Codec::new(MAX_MESSAGE_SIZE));
106
107 substream.send(msg).await?;
108
109 let proto::StopMessage {
110 type_pb,
111 peer: _,
112 limit: _,
113 status,
114 } = substream
115 .next()
116 .await
117 .ok_or(Error::Io(io::ErrorKind::UnexpectedEof.into()))??;
118
119 match type_pb {
120 proto::StopMessageType::CONNECT => {
121 return Err(Error::Protocol(ProtocolViolation::UnexpectedTypeConnect))
122 }
123 proto::StopMessageType::STATUS => {}
124 }
125
126 match status {
127 Some(proto::Status::OK) => {}
128 Some(proto::Status::RESOURCE_LIMIT_EXCEEDED) => return Err(Error::ResourceLimitExceeded),
129 Some(proto::Status::PERMISSION_DENIED) => return Err(Error::PermissionDenied),
130 Some(s) => return Err(Error::Protocol(ProtocolViolation::UnexpectedStatus(s))),
131 None => return Err(Error::Protocol(ProtocolViolation::MissingStatusField)),
132 }
133
134 let FramedParts {
135 io,
136 read_buffer,
137 write_buffer,
138 ..
139 } = substream.into_parts();
140 assert!(
141 write_buffer.is_empty(),
142 "Expect a flushed Framed to have an empty write buffer."
143 );
144
145 Ok(Circuit {
146 dst_stream: io,
147 dst_pending_data: read_buffer.freeze(),
148 })
149}
150
151pub(crate) struct Circuit {
152 pub(crate) dst_stream: Stream,
153 pub(crate) dst_pending_data: Bytes,
154}