libp2p_relay/protocol/
outbound_stop.rs

1// Copyright 2021 Protocol Labs.
2//
3// Permission is hereby granted, free of charge, to any person obtaining a
4// copy of this software and associated documentation files (the "Software"),
5// to deal in the Software without restriction, including without limitation
6// the rights to use, copy, modify, merge, publish, distribute, sublicense,
7// and/or sell copies of the Software, and to permit persons to whom the
8// Software is furnished to do so, subject to the following conditions:
9//
10// The above copyright notice and this permission notice shall be included in
11// all copies or substantial portions of the Software.
12//
13// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
14// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
18// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
19// DEALINGS IN THE SOFTWARE.
20
21use std::{io, time::Duration};
22
23use asynchronous_codec::{Framed, FramedParts};
24use bytes::Bytes;
25use futures::prelude::*;
26use libp2p_identity::PeerId;
27use libp2p_swarm::Stream;
28use thiserror::Error;
29
30use crate::{proto, protocol::MAX_MESSAGE_SIZE, STOP_PROTOCOL_NAME};
31
32#[derive(Debug, Error)]
33pub enum Error {
34    #[error("Remote reported resource limit exceeded.")]
35    ResourceLimitExceeded,
36    #[error("Remote reported permission denied.")]
37    PermissionDenied,
38    #[error("Remote does not support the `{STOP_PROTOCOL_NAME}` protocol")]
39    Unsupported,
40    #[error("IO error")]
41    Io(#[source] io::Error),
42    #[error("Protocol error")]
43    Protocol(#[from] ProtocolViolation),
44}
45
46impl Error {
47    pub(crate) fn to_status(&self) -> proto::Status {
48        match self {
49            Error::ResourceLimitExceeded => proto::Status::RESOURCE_LIMIT_EXCEEDED,
50            Error::PermissionDenied => proto::Status::PERMISSION_DENIED,
51            Error::Unsupported => proto::Status::CONNECTION_FAILED,
52            Error::Io(_) => proto::Status::CONNECTION_FAILED,
53            Error::Protocol(
54                ProtocolViolation::UnexpectedStatus(_) | ProtocolViolation::UnexpectedTypeConnect,
55            ) => proto::Status::UNEXPECTED_MESSAGE,
56            Error::Protocol(_) => proto::Status::MALFORMED_MESSAGE,
57        }
58    }
59}
60
61/// Depicts all forms of protocol violations.
62#[derive(Debug, Error)]
63pub enum ProtocolViolation {
64    #[error(transparent)]
65    Codec(#[from] quick_protobuf_codec::Error),
66    #[error("Expected 'status' field to be set.")]
67    MissingStatusField,
68    #[error("Unexpected message type 'connect'")]
69    UnexpectedTypeConnect,
70    #[error("Unexpected message status '{0:?}'")]
71    UnexpectedStatus(proto::Status),
72}
73
74impl From<quick_protobuf_codec::Error> for Error {
75    fn from(e: quick_protobuf_codec::Error) -> Self {
76        Error::Protocol(ProtocolViolation::Codec(e))
77    }
78}
79
80/// Attempts to _connect_ to a peer via the given stream.
81pub(crate) async fn connect(
82    io: Stream,
83    src_peer_id: PeerId,
84    max_duration: Duration,
85    max_bytes: u64,
86) -> Result<Circuit, Error> {
87    let msg = proto::StopMessage {
88        type_pb: proto::StopMessageType::CONNECT,
89        peer: Some(proto::Peer {
90            id: src_peer_id.to_bytes(),
91            addrs: vec![],
92        }),
93        limit: Some(proto::Limit {
94            duration: Some(
95                max_duration
96                    .as_secs()
97                    .try_into()
98                    .expect("`max_circuit_duration` not to exceed `u32::MAX`."),
99            ),
100            data: Some(max_bytes),
101        }),
102        status: None,
103    };
104
105    let mut substream = Framed::new(io, quick_protobuf_codec::Codec::new(MAX_MESSAGE_SIZE));
106
107    substream.send(msg).await?;
108
109    let proto::StopMessage {
110        type_pb,
111        peer: _,
112        limit: _,
113        status,
114    } = substream
115        .next()
116        .await
117        .ok_or(Error::Io(io::ErrorKind::UnexpectedEof.into()))??;
118
119    match type_pb {
120        proto::StopMessageType::CONNECT => {
121            return Err(Error::Protocol(ProtocolViolation::UnexpectedTypeConnect))
122        }
123        proto::StopMessageType::STATUS => {}
124    }
125
126    match status {
127        Some(proto::Status::OK) => {}
128        Some(proto::Status::RESOURCE_LIMIT_EXCEEDED) => return Err(Error::ResourceLimitExceeded),
129        Some(proto::Status::PERMISSION_DENIED) => return Err(Error::PermissionDenied),
130        Some(s) => return Err(Error::Protocol(ProtocolViolation::UnexpectedStatus(s))),
131        None => return Err(Error::Protocol(ProtocolViolation::MissingStatusField)),
132    }
133
134    let FramedParts {
135        io,
136        read_buffer,
137        write_buffer,
138        ..
139    } = substream.into_parts();
140    assert!(
141        write_buffer.is_empty(),
142        "Expect a flushed Framed to have an empty write buffer."
143    );
144
145    Ok(Circuit {
146        dst_stream: io,
147        dst_pending_data: read_buffer.freeze(),
148    })
149}
150
151pub(crate) struct Circuit {
152    pub(crate) dst_stream: Stream,
153    pub(crate) dst_pending_data: Bytes,
154}