libp2p_relay/protocol/
inbound_stop.rs

1// Copyright 2021 Protocol Labs.
2//
3// Permission is hereby granted, free of charge, to any person obtaining a
4// copy of this software and associated documentation files (the "Software"),
5// to deal in the Software without restriction, including without limitation
6// the rights to use, copy, modify, merge, publish, distribute, sublicense,
7// and/or sell copies of the Software, and to permit persons to whom the
8// Software is furnished to do so, subject to the following conditions:
9//
10// The above copyright notice and this permission notice shall be included in
11// all copies or substantial portions of the Software.
12//
13// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
14// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
18// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
19// DEALINGS IN THE SOFTWARE.
20
21use std::io;
22
23use asynchronous_codec::{Framed, FramedParts};
24use bytes::Bytes;
25use futures::prelude::*;
26use libp2p_identity::PeerId;
27use libp2p_swarm::Stream;
28use thiserror::Error;
29
30use crate::{
31    proto,
32    protocol::{self, MAX_MESSAGE_SIZE},
33};
34
35pub(crate) async fn handle_open_circuit(io: Stream) -> Result<Circuit, Error> {
36    let mut substream = Framed::new(io, quick_protobuf_codec::Codec::new(MAX_MESSAGE_SIZE));
37
38    let proto::StopMessage {
39        type_pb,
40        peer,
41        limit,
42        status: _,
43    } = substream
44        .next()
45        .await
46        .ok_or(Error::Io(io::ErrorKind::UnexpectedEof.into()))??;
47
48    match type_pb {
49        proto::StopMessageType::CONNECT => {
50            let src_peer_id = PeerId::from_bytes(&peer.ok_or(ProtocolViolation::MissingPeer)?.id)
51                .map_err(|_| ProtocolViolation::ParsePeerId)?;
52            Ok(Circuit {
53                substream,
54                src_peer_id,
55                limit: limit.map(Into::into),
56            })
57        }
58        proto::StopMessageType::STATUS => {
59            Err(Error::Protocol(ProtocolViolation::UnexpectedTypeStatus))
60        }
61    }
62}
63
64#[derive(Debug, Error)]
65pub(crate) enum Error {
66    #[error("Protocol error")]
67    Protocol(#[from] ProtocolViolation),
68    #[error("IO error")]
69    Io(#[from] io::Error),
70}
71
72impl From<quick_protobuf_codec::Error> for Error {
73    fn from(error: quick_protobuf_codec::Error) -> Self {
74        Self::Protocol(ProtocolViolation::Codec(error))
75    }
76}
77
78#[derive(Debug, Error)]
79pub(crate) enum ProtocolViolation {
80    #[error(transparent)]
81    Codec(#[from] quick_protobuf_codec::Error),
82    #[error("Failed to parse peer id.")]
83    ParsePeerId,
84    #[error("Expected 'peer' field to be set.")]
85    MissingPeer,
86    #[error("Unexpected message type 'status'")]
87    UnexpectedTypeStatus,
88}
89
90pub(crate) struct Circuit {
91    substream: Framed<Stream, quick_protobuf_codec::Codec<proto::StopMessage>>,
92    src_peer_id: PeerId,
93    limit: Option<protocol::Limit>,
94}
95
96impl Circuit {
97    pub(crate) fn src_peer_id(&self) -> PeerId {
98        self.src_peer_id
99    }
100
101    pub(crate) fn limit(&self) -> Option<protocol::Limit> {
102        self.limit
103    }
104
105    pub(crate) async fn accept(mut self) -> Result<(Stream, Bytes), Error> {
106        let msg = proto::StopMessage {
107            type_pb: proto::StopMessageType::STATUS,
108            peer: None,
109            limit: None,
110            status: Some(proto::Status::OK),
111        };
112
113        self.send(msg).await?;
114
115        let FramedParts {
116            io,
117            read_buffer,
118            write_buffer,
119            ..
120        } = self.substream.into_parts();
121        assert!(
122            write_buffer.is_empty(),
123            "Expect a flushed Framed to have an empty write buffer."
124        );
125
126        Ok((io, read_buffer.freeze()))
127    }
128
129    pub(crate) async fn deny(mut self, status: proto::Status) -> Result<(), Error> {
130        let msg = proto::StopMessage {
131            type_pb: proto::StopMessageType::STATUS,
132            peer: None,
133            limit: None,
134            status: Some(status),
135        };
136
137        self.send(msg).await?;
138
139        Ok(())
140    }
141
142    async fn send(&mut self, msg: proto::StopMessage) -> Result<(), Error> {
143        self.substream.send(msg).await?;
144        self.substream.flush().await?;
145
146        Ok(())
147    }
148}