libp2p_request_response/
json.rs

1// Copyright 2023 Protocol Labs
2//
3// Permission is hereby granted, free of charge, to any person obtaining a
4// copy of this software and associated documentation files (the "Software"),
5// to deal in the Software without restriction, including without limitation
6// the rights to use, copy, modify, merge, publish, distribute, sublicense,
7// and/or sell copies of the Software, and to permit persons to whom the
8// Software is furnished to do so, subject to the following conditions:
9//
10// The above copyright notice and this permission notice shall be included in
11// all copies or substantial portions of the Software.
12//
13// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
14// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
18// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
19// DEALINGS IN THE SOFTWARE.
20
21/// A request-response behaviour using [`serde_json`] for serializing and deserializing the
22/// messages.
23///
24/// # Default Size Limits
25///
26/// The codec uses the following default size limits:
27/// - Maximum request size: 1,048,576 bytes (1 MiB)
28/// - Maximum response size: 10,485,760 bytes (10 MiB)
29///
30/// These limits can be customized with [`codec::Codec::set_request_size_maximum`]
31/// and [`codec::Codec::set_response_size_maximum`].
32///
33/// # Example
34///
35/// ```
36/// # use libp2p_request_response::{json, ProtocolSupport, self as request_response};
37/// # use libp2p_swarm::{StreamProtocol};
38/// #[derive(Debug, serde::Serialize, serde::Deserialize)]
39/// struct GreetRequest {
40///     name: String,
41/// }
42///
43/// #[derive(Debug, serde::Serialize, serde::Deserialize)]
44/// struct GreetResponse {
45///     message: String,
46/// }
47///
48/// let behaviour = json::Behaviour::<GreetRequest, GreetResponse>::new(
49///     [(
50///         StreamProtocol::new("/my-json-protocol"),
51///         ProtocolSupport::Full,
52///     )],
53///     request_response::Config::default(),
54/// );
55/// ```
56pub type Behaviour<Req, Resp> = crate::Behaviour<codec::Codec<Req, Resp>>;
57
58pub mod codec {
59    use std::{io, marker::PhantomData};
60
61    use async_trait::async_trait;
62    use futures::prelude::*;
63    use libp2p_swarm::StreamProtocol;
64    use serde::{de::DeserializeOwned, Serialize};
65
66    pub struct Codec<Req, Resp> {
67        /// Max request size in bytes
68        request_size_maximum: u64,
69        /// Max response size in bytes
70        response_size_maximum: u64,
71        phantom: PhantomData<(Req, Resp)>,
72    }
73
74    impl<Req, Resp> Default for Codec<Req, Resp> {
75        fn default() -> Self {
76            Codec {
77                request_size_maximum: 1024 * 1024,
78                response_size_maximum: 10 * 1024 * 1024,
79                phantom: PhantomData,
80            }
81        }
82    }
83
84    impl<Req, Resp> Clone for Codec<Req, Resp> {
85        fn clone(&self) -> Self {
86            Self {
87                request_size_maximum: self.request_size_maximum,
88                response_size_maximum: self.response_size_maximum,
89                phantom: self.phantom,
90            }
91        }
92    }
93
94    impl<Req, Resp> Codec<Req, Resp> {
95        /// Sets the limit for request size in bytes.
96        pub fn set_request_size_maximum(mut self, request_size_maximum: u64) -> Self {
97            self.request_size_maximum = request_size_maximum;
98            self
99        }
100
101        /// Sets the limit for response size in bytes.
102        pub fn set_response_size_maximum(mut self, response_size_maximum: u64) -> Self {
103            self.response_size_maximum = response_size_maximum;
104            self
105        }
106    }
107
108    #[async_trait]
109    impl<Req, Resp> crate::Codec for Codec<Req, Resp>
110    where
111        Req: Send + Serialize + DeserializeOwned,
112        Resp: Send + Serialize + DeserializeOwned,
113    {
114        type Protocol = StreamProtocol;
115        type Request = Req;
116        type Response = Resp;
117
118        async fn read_request<T>(&mut self, _: &Self::Protocol, io: &mut T) -> io::Result<Req>
119        where
120            T: AsyncRead + Unpin + Send,
121        {
122            let mut vec = Vec::new();
123
124            io.take(self.request_size_maximum)
125                .read_to_end(&mut vec)
126                .await?;
127
128            Ok(serde_json::from_slice(vec.as_slice())?)
129        }
130
131        async fn read_response<T>(&mut self, _: &Self::Protocol, io: &mut T) -> io::Result<Resp>
132        where
133            T: AsyncRead + Unpin + Send,
134        {
135            let mut vec = Vec::new();
136
137            io.take(self.response_size_maximum)
138                .read_to_end(&mut vec)
139                .await?;
140
141            Ok(serde_json::from_slice(vec.as_slice())?)
142        }
143
144        async fn write_request<T>(
145            &mut self,
146            _: &Self::Protocol,
147            io: &mut T,
148            req: Self::Request,
149        ) -> io::Result<()>
150        where
151            T: AsyncWrite + Unpin + Send,
152        {
153            let data = serde_json::to_vec(&req)?;
154
155            io.write_all(data.as_ref()).await?;
156
157            Ok(())
158        }
159
160        async fn write_response<T>(
161            &mut self,
162            _: &Self::Protocol,
163            io: &mut T,
164            resp: Self::Response,
165        ) -> io::Result<()>
166        where
167            T: AsyncWrite + Unpin + Send,
168        {
169            let data = serde_json::to_vec(&resp)?;
170
171            io.write_all(data.as_ref()).await?;
172
173            Ok(())
174        }
175    }
176}
177
178#[cfg(test)]
179mod tests {
180    use futures::AsyncWriteExt;
181    use futures_ringbuf::Endpoint;
182    use libp2p_swarm::StreamProtocol;
183    use serde::{Deserialize, Serialize};
184
185    use crate::Codec;
186
187    #[async_std::test]
188    async fn test_codec() {
189        let expected_request = TestRequest {
190            payload: "test_payload".to_string(),
191        };
192        let expected_response = TestResponse {
193            payload: "test_payload".to_string(),
194        };
195        let protocol = StreamProtocol::new("/test_json/1");
196        let mut codec: super::codec::Codec<TestRequest, TestResponse> =
197            super::codec::Codec::default();
198
199        let (mut a, mut b) = Endpoint::pair(124, 124);
200        codec
201            .write_request(&protocol, &mut a, expected_request.clone())
202            .await
203            .expect("Should write request");
204        a.close().await.unwrap();
205
206        let actual_request = codec
207            .read_request(&protocol, &mut b)
208            .await
209            .expect("Should read request");
210        b.close().await.unwrap();
211
212        assert_eq!(actual_request, expected_request);
213
214        let (mut a, mut b) = Endpoint::pair(124, 124);
215        codec
216            .write_response(&protocol, &mut a, expected_response.clone())
217            .await
218            .expect("Should write response");
219        a.close().await.unwrap();
220
221        let actual_response = codec
222            .read_response(&protocol, &mut b)
223            .await
224            .expect("Should read response");
225        b.close().await.unwrap();
226
227        assert_eq!(actual_response, expected_response);
228    }
229
230    #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
231    struct TestRequest {
232        payload: String,
233    }
234
235    #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
236    struct TestResponse {
237        payload: String,
238    }
239}