libp2p_request_response/
json.rs1pub type Behaviour<Req, Resp> = crate::Behaviour<codec::Codec<Req, Resp>>;
57
58pub mod codec {
59 use std::{io, marker::PhantomData};
60
61 use async_trait::async_trait;
62 use futures::prelude::*;
63 use libp2p_swarm::StreamProtocol;
64 use serde::{de::DeserializeOwned, Serialize};
65
/// Codec state for JSON-encoded request/response messages.
///
/// `Req` and `Resp` are only tracked at the type level (via [`PhantomData`]); the struct itself
/// stores nothing but the two size limits.
pub struct Codec<Req, Resp> {
    /// Byte limit applied to incoming requests.
    request_size_maximum: u64,
    /// Byte limit applied to incoming responses.
    response_size_maximum: u64,
    /// Ties the codec to its message types without storing a value of either.
    phantom: PhantomData<(Req, Resp)>,
}

// Implemented by hand so that no `Default` bound is placed on `Req` / `Resp`.
impl<Req, Resp> Default for Codec<Req, Resp> {
    /// Creates a codec limited to 1 MiB per request and 10 MiB per response.
    fn default() -> Self {
        const MIB: u64 = 1024 * 1024;

        Self {
            request_size_maximum: MIB,
            response_size_maximum: 10 * MIB,
            phantom: PhantomData,
        }
    }
}

// Implemented by hand so that no `Clone` bound is placed on `Req` / `Resp`.
impl<Req, Resp> Clone for Codec<Req, Resp> {
    /// Returns a codec carrying the same two size limits.
    fn clone(&self) -> Self {
        // Every field is `Copy`, so the values can be copied straight out of the borrow.
        let &Self {
            request_size_maximum,
            response_size_maximum,
            phantom,
        } = self;

        Self {
            request_size_maximum,
            response_size_maximum,
            phantom,
        }
    }
}

impl<Req, Resp> Codec<Req, Resp> {
    /// Consumes the codec and returns it with the request size limit (in bytes) replaced.
    pub fn set_request_size_maximum(self, request_size_maximum: u64) -> Self {
        Self {
            request_size_maximum,
            ..self
        }
    }

    /// Consumes the codec and returns it with the response size limit (in bytes) replaced.
    pub fn set_response_size_maximum(self, response_size_maximum: u64) -> Self {
        Self {
            response_size_maximum,
            ..self
        }
    }
}
107
108 #[async_trait]
109 impl<Req, Resp> crate::Codec for Codec<Req, Resp>
110 where
111 Req: Send + Serialize + DeserializeOwned,
112 Resp: Send + Serialize + DeserializeOwned,
113 {
114 type Protocol = StreamProtocol;
115 type Request = Req;
116 type Response = Resp;
117
118 async fn read_request<T>(&mut self, _: &Self::Protocol, io: &mut T) -> io::Result<Req>
119 where
120 T: AsyncRead + Unpin + Send,
121 {
122 let mut vec = Vec::new();
123
124 io.take(self.request_size_maximum)
125 .read_to_end(&mut vec)
126 .await?;
127
128 Ok(serde_json::from_slice(vec.as_slice())?)
129 }
130
131 async fn read_response<T>(&mut self, _: &Self::Protocol, io: &mut T) -> io::Result<Resp>
132 where
133 T: AsyncRead + Unpin + Send,
134 {
135 let mut vec = Vec::new();
136
137 io.take(self.response_size_maximum)
138 .read_to_end(&mut vec)
139 .await?;
140
141 Ok(serde_json::from_slice(vec.as_slice())?)
142 }
143
144 async fn write_request<T>(
145 &mut self,
146 _: &Self::Protocol,
147 io: &mut T,
148 req: Self::Request,
149 ) -> io::Result<()>
150 where
151 T: AsyncWrite + Unpin + Send,
152 {
153 let data = serde_json::to_vec(&req)?;
154
155 io.write_all(data.as_ref()).await?;
156
157 Ok(())
158 }
159
160 async fn write_response<T>(
161 &mut self,
162 _: &Self::Protocol,
163 io: &mut T,
164 resp: Self::Response,
165 ) -> io::Result<()>
166 where
167 T: AsyncWrite + Unpin + Send,
168 {
169 let data = serde_json::to_vec(&resp)?;
170
171 io.write_all(data.as_ref()).await?;
172
173 Ok(())
174 }
175 }
176}
177
#[cfg(test)]
mod tests {
    use futures::AsyncWriteExt;
    use futures_ringbuf::Endpoint;
    use libp2p_swarm::StreamProtocol;
    use serde::{Deserialize, Serialize};

    use crate::Codec;

    /// Request fixture carrying a single string field.
    #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
    struct TestRequest {
        payload: String,
    }

    /// Response fixture carrying a single string field.
    #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
    struct TestResponse {
        payload: String,
    }

    /// Writes a request and then a response through an in-memory endpoint pair and checks that
    /// each one is read back equal to what was written.
    #[async_std::test]
    async fn test_codec() {
        let protocol = StreamProtocol::new("/test_json/1");
        let mut codec = super::codec::Codec::<TestRequest, TestResponse>::default();

        // Request leg: write, close the writer so the reader sees EOF, then read back.
        let expected_request = TestRequest {
            payload: String::from("test_payload"),
        };
        let (mut writer, mut reader) = Endpoint::pair(124, 124);
        codec
            .write_request(&protocol, &mut writer, expected_request.clone())
            .await
            .expect("Should write request");
        writer.close().await.unwrap();

        let actual_request = codec
            .read_request(&protocol, &mut reader)
            .await
            .expect("Should read request");
        reader.close().await.unwrap();

        assert_eq!(actual_request, expected_request);

        // Response leg: same procedure on a fresh endpoint pair.
        let expected_response = TestResponse {
            payload: String::from("test_payload"),
        };
        let (mut writer, mut reader) = Endpoint::pair(124, 124);
        codec
            .write_response(&protocol, &mut writer, expected_response.clone())
            .await
            .expect("Should write response");
        writer.close().await.unwrap();

        let actual_response = codec
            .read_response(&protocol, &mut reader)
            .await
            .expect("Should read response");
        reader.close().await.unwrap();

        assert_eq!(actual_response, expected_response);
    }
}