1use std::io;
22
23use asynchronous_codec::{FramedRead, FramedWrite};
24use futures::prelude::*;
25use libp2p_core::{multiaddr, Multiaddr, PeerRecord, SignedEnvelope};
26use libp2p_identity as identity;
27use libp2p_identity::PublicKey;
28use libp2p_swarm::StreamProtocol;
29use thiserror::Error;
30
31use crate::proto;
32
/// Size limit, in bytes, handed to the protobuf codec for a single identify message.
///
/// Both the sending and the receiving code paths in this file construct their codec
/// with this value.
const MAX_MESSAGE_SIZE_BYTES: usize = 4096;
34
/// Stream protocol name of the identify protocol: `/ipfs/id/1.0.0`.
pub const PROTOCOL_NAME: StreamProtocol = StreamProtocol::new("/ipfs/id/1.0.0");
36
/// Stream protocol name of the identify push protocol: `/ipfs/id/push/1.0.0`.
pub const PUSH_PROTOCOL_NAME: StreamProtocol = StreamProtocol::new("/ipfs/id/push/1.0.0");
38
/// Identify information about a peer, as carried by a complete identify message.
///
/// `send_identify` serializes a value of this type and `recv_identify` produces one
/// from a received `proto::Identify` message.
#[derive(Debug, Clone)]
pub struct Info {
    /// Public key of the peer (wire field `publicKey`).
    ///
    /// Converting a received message fails when no decodable key is present.
    pub public_key: PublicKey,
    /// Protocol version string (wire field `protocolVersion`).
    ///
    /// Empty when the received message did not carry one.
    pub protocol_version: String,
    /// Agent version string (wire field `agentVersion`).
    ///
    /// Empty when the received message did not carry one.
    pub agent_version: String,
    /// Listen addresses (wire field `listenAddrs`).
    ///
    /// When a received message carries a signed peer record that matches `public_key`,
    /// the addresses of that record are used instead of the `listenAddrs` field.
    pub listen_addrs: Vec<Multiaddr>,
    /// Protocol names (wire field `protocols`); entries that are not valid
    /// `StreamProtocol`s are dropped when a message is received.
    pub protocols: Vec<StreamProtocol>,
    /// Observed address (wire field `observedAddr`).
    ///
    /// Set to the empty multiaddr when the received field is missing or unparsable.
    // NOTE(review): presumably the address the sender observed for the receiver;
    // confirm against the identify specification.
    pub observed_addr: Multiaddr,
    /// Signed peer record envelope (wire field `signedPeerRecord`).
    ///
    /// On receipt this is `Some` only if the envelope decodes into a peer record whose
    /// peer id equals the peer id derived from `public_key`; otherwise it is `None`.
    pub signed_peer_record: Option<SignedEnvelope>,
}
59
60impl Info {
61 pub fn merge(&mut self, info: PushInfo) {
62 if let Some(public_key) = info.public_key {
63 self.public_key = public_key;
64 }
65 if let Some(protocol_version) = info.protocol_version {
66 self.protocol_version = protocol_version;
67 }
68 if let Some(agent_version) = info.agent_version {
69 self.agent_version = agent_version;
70 }
71 if !info.listen_addrs.is_empty() {
72 self.listen_addrs = info.listen_addrs;
73 }
74 if !info.protocols.is_empty() {
75 self.protocols = info.protocols;
76 }
77 if let Some(observed_addr) = info.observed_addr {
78 self.observed_addr = observed_addr;
79 }
80 }
81}
82
/// Partial identify information, as produced by `recv_push`.
///
/// Unlike [`Info`], every scalar field is optional. [`Info::merge`] applies a value of
/// this type on top of an existing `Info`: `None` fields and empty lists leave the
/// existing values unchanged.
#[derive(Debug, Clone)]
pub struct PushInfo {
    /// Public key (wire field `publicKey`); `None` if absent or not decodable.
    pub public_key: Option<PublicKey>,
    /// Protocol version string (wire field `protocolVersion`), if present.
    pub protocol_version: Option<String>,
    /// Agent version string (wire field `agentVersion`), if present.
    pub agent_version: Option<String>,
    /// Listen addresses (wire field `listenAddrs`); unparsable entries are dropped.
    pub listen_addrs: Vec<Multiaddr>,
    /// Protocol names (wire field `protocols`); invalid entries are dropped.
    pub protocols: Vec<StreamProtocol>,
    /// Observed address (wire field `observedAddr`); `None` if absent or unparsable.
    pub observed_addr: Option<Multiaddr>,
}
95
96pub(crate) async fn send_identify<T>(io: T, info: Info) -> Result<Info, UpgradeError>
97where
98 T: AsyncWrite + Unpin,
99{
100 tracing::trace!("Sending: {:?}", info);
101
102 let listen_addrs = info.listen_addrs.iter().map(|addr| addr.to_vec()).collect();
103
104 let pubkey_bytes = info.public_key.encode_protobuf();
105
106 let message = proto::Identify {
107 agentVersion: Some(info.agent_version.clone()),
108 protocolVersion: Some(info.protocol_version.clone()),
109 publicKey: Some(pubkey_bytes),
110 listenAddrs: listen_addrs,
111 observedAddr: Some(info.observed_addr.to_vec()),
112 protocols: info.protocols.iter().map(|p| p.to_string()).collect(),
113 signedPeerRecord: info
114 .signed_peer_record
115 .clone()
116 .map(|r| r.into_protobuf_encoding()),
117 };
118
119 let mut framed_io = FramedWrite::new(
120 io,
121 quick_protobuf_codec::Codec::<proto::Identify>::new(MAX_MESSAGE_SIZE_BYTES),
122 );
123
124 framed_io.send(message).await?;
125 framed_io.close().await?;
126
127 Ok(info)
128}
129
130pub(crate) async fn recv_push<T>(socket: T) -> Result<PushInfo, UpgradeError>
131where
132 T: AsyncRead + AsyncWrite + Unpin,
133{
134 let info = recv(socket).await?.try_into()?;
135
136 tracing::trace!(?info, "Received");
137
138 Ok(info)
139}
140
141pub(crate) async fn recv_identify<T>(socket: T) -> Result<Info, UpgradeError>
142where
143 T: AsyncRead + AsyncWrite + Unpin,
144{
145 let info = recv(socket).await?.try_into()?;
146
147 tracing::trace!(?info, "Received");
148
149 Ok(info)
150}
151
152async fn recv<T>(socket: T) -> Result<proto::Identify, UpgradeError>
153where
154 T: AsyncRead + AsyncWrite + Unpin,
155{
156 let info = FramedRead::new(
162 socket,
163 quick_protobuf_codec::Codec::<proto::Identify>::new(MAX_MESSAGE_SIZE_BYTES),
164 )
165 .next()
166 .await
167 .ok_or(UpgradeError::StreamClosed)??;
168
169 Ok(info)
170}
171
172fn parse_listen_addrs(listen_addrs: Vec<Vec<u8>>) -> Vec<Multiaddr> {
173 listen_addrs
174 .into_iter()
175 .filter_map(|bytes| match Multiaddr::try_from(bytes) {
176 Ok(a) => Some(a),
177 Err(e) => {
178 tracing::debug!("Unable to parse multiaddr: {e:?}");
179 None
180 }
181 })
182 .collect()
183}
184
185fn parse_protocols(protocols: Vec<String>) -> Vec<StreamProtocol> {
186 protocols
187 .into_iter()
188 .filter_map(|p| match StreamProtocol::try_from_owned(p) {
189 Ok(p) => Some(p),
190 Err(e) => {
191 tracing::debug!("Received invalid protocol from peer: {e}");
192 None
193 }
194 })
195 .collect()
196}
197
198fn parse_public_key(public_key: Option<Vec<u8>>) -> Option<PublicKey> {
199 public_key.and_then(|key| match PublicKey::try_decode_protobuf(&key) {
200 Ok(k) => Some(k),
201 Err(e) => {
202 tracing::debug!("Unable to decode public key: {e:?}");
203 None
204 }
205 })
206}
207
208fn parse_observed_addr(observed_addr: Option<Vec<u8>>) -> Option<Multiaddr> {
209 observed_addr.and_then(|bytes| match Multiaddr::try_from(bytes) {
210 Ok(a) => Some(a),
211 Err(e) => {
212 tracing::debug!("Unable to parse observed multiaddr: {e:?}");
213 None
214 }
215 })
216}
217
218impl TryFrom<proto::Identify> for Info {
219 type Error = UpgradeError;
220
221 fn try_from(msg: proto::Identify) -> Result<Self, Self::Error> {
222 let identify_public_key = {
223 match parse_public_key(msg.publicKey) {
224 Some(key) => key,
225 None => PublicKey::try_decode_protobuf(Default::default())?,
227 }
228 };
229
230 let (listen_addrs, signed_envelope) = msg
234 .signedPeerRecord
235 .and_then(|b| {
236 let envelope = SignedEnvelope::from_protobuf_encoding(b.as_ref()).ok()?;
237 let peer_record = PeerRecord::from_signed_envelope(envelope).ok()?;
238 (peer_record.peer_id() == identify_public_key.to_peer_id()).then_some((
239 peer_record.addresses().to_vec(),
240 Some(peer_record.into_signed_envelope()),
241 ))
242 })
243 .unwrap_or_else(|| (parse_listen_addrs(msg.listenAddrs), None));
244
245 let info = Info {
246 public_key: identify_public_key,
247 protocol_version: msg.protocolVersion.unwrap_or_default(),
248 agent_version: msg.agentVersion.unwrap_or_default(),
249 listen_addrs,
250 protocols: parse_protocols(msg.protocols),
251 observed_addr: parse_observed_addr(msg.observedAddr).unwrap_or(Multiaddr::empty()),
252 signed_peer_record: signed_envelope,
253 };
254
255 Ok(info)
256 }
257}
258
259impl TryFrom<proto::Identify> for PushInfo {
260 type Error = UpgradeError;
261
262 fn try_from(msg: proto::Identify) -> Result<Self, Self::Error> {
263 let info = PushInfo {
264 public_key: parse_public_key(msg.publicKey),
265 protocol_version: msg.protocolVersion,
266 agent_version: msg.agentVersion,
267 listen_addrs: parse_listen_addrs(msg.listenAddrs),
268 protocols: parse_protocols(msg.protocols),
269 observed_addr: parse_observed_addr(msg.observedAddr),
270 };
271
272 Ok(info)
273 }
274}
275
/// Errors that can occur while exchanging or decoding identify messages.
#[derive(Debug, Error)]
pub enum UpgradeError {
    /// The protobuf codec reported an error; its message is forwarded unchanged.
    #[error(transparent)]
    Codec(#[from] quick_protobuf_codec::Error),
    /// An I/O error occurred.
    #[error("I/O interaction failed")]
    Io(#[from] io::Error),
    /// The stream ended before an identify message could be read.
    #[error("Stream closed")]
    StreamClosed,
    /// A multiaddr could not be decoded.
    #[error("Failed decoding multiaddr")]
    Multiaddr(#[from] multiaddr::Error),
    /// A public key could not be decoded, e.g. because a received identify message
    /// did not contain a usable one.
    #[error("Failed decoding public key")]
    PublicKey(#[from] identity::DecodingError),
}
289
#[cfg(test)]
mod tests {
    use std::str::FromStr;

    use libp2p_core::PeerRecord;
    use libp2p_identity as identity;
    use quick_protobuf::{BytesReader, MessageRead, MessageWrite, Writer};

    use super::*;

    /// A listen address that fails to parse must be dropped without failing the
    /// conversion or affecting the valid addresses next to it.
    #[test]
    fn skip_invalid_multiaddr() {
        let valid_multiaddr: Multiaddr = "/ip6/2001:db8::/tcp/1234".parse().unwrap();
        let valid_multiaddr_bytes = valid_multiaddr.to_vec();

        // Guard the fixture itself: these bytes really are rejected by the parser.
        let invalid_multiaddr = {
            let a = vec![255; 8];
            assert!(Multiaddr::try_from(a.clone()).is_err());
            a
        };

        let payload = proto::Identify {
            agentVersion: None,
            listenAddrs: vec![valid_multiaddr_bytes, invalid_multiaddr],
            observedAddr: None,
            protocolVersion: None,
            protocols: vec![],
            publicKey: Some(
                identity::Keypair::generate_ed25519()
                    .public()
                    .encode_protobuf(),
            ),
            signedPeerRecord: None,
        };

        let info = PushInfo::try_from(payload).expect("not to fail");

        assert_eq!(info.listen_addrs, vec![valid_multiaddr])
    }

    /// Decoding a fixed byte fixture and re-encoding it must reproduce the same
    /// bytes, and a message carrying a freshly signed peer record must survive an
    /// encode/decode cycle unchanged.
    #[test]
    fn protobuf_roundtrip() {
        // Fixed protobuf-encoded fixture.
        // NOTE(review): the name suggests the bytes originate from the Go
        // implementation; the provenance is not visible in this file.
        let go_protobuf: [u8; 375] = [
            0x0a, 0x27, 0x70, 0x32, 0x70, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x63, 0x6f, 0x6c,
            0x2f, 0x69, 0x64, 0x65, 0x6e, 0x74, 0x69, 0x66, 0x79, 0x2f, 0x70, 0x62, 0x2f, 0x69,
            0x64, 0x65, 0x6e, 0x74, 0x69, 0x66, 0x79, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12,
            0x0b, 0x69, 0x64, 0x65, 0x6e, 0x74, 0x69, 0x66, 0x79, 0x2e, 0x70, 0x62, 0x22, 0x86,
            0x02, 0x0a, 0x08, 0x49, 0x64, 0x65, 0x6e, 0x74, 0x69, 0x66, 0x79, 0x12, 0x28, 0x0a,
            0x0f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x63, 0x6f, 0x6c, 0x56, 0x65, 0x72, 0x73, 0x69,
            0x6f, 0x6e, 0x18, 0x05, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0f, 0x70, 0x72, 0x6f, 0x74,
            0x6f, 0x63, 0x6f, 0x6c, 0x56, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x12, 0x22, 0x0a,
            0x0c, 0x61, 0x67, 0x65, 0x6e, 0x74, 0x56, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x18,
            0x06, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0c, 0x61, 0x67, 0x65, 0x6e, 0x74, 0x56, 0x65,
            0x72, 0x73, 0x69, 0x6f, 0x6e, 0x12, 0x1c, 0x0a, 0x09, 0x70, 0x75, 0x62, 0x6c, 0x69,
            0x63, 0x4b, 0x65, 0x79, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x09, 0x70, 0x75,
            0x62, 0x6c, 0x69, 0x63, 0x4b, 0x65, 0x79, 0x12, 0x20, 0x0a, 0x0b, 0x6c, 0x69, 0x73,
            0x74, 0x65, 0x6e, 0x41, 0x64, 0x64, 0x72, 0x73, 0x18, 0x02, 0x20, 0x03, 0x28, 0x0c,
            0x52, 0x0b, 0x6c, 0x69, 0x73, 0x74, 0x65, 0x6e, 0x41, 0x64, 0x64, 0x72, 0x73, 0x12,
            0x22, 0x0a, 0x0c, 0x6f, 0x62, 0x73, 0x65, 0x72, 0x76, 0x65, 0x64, 0x41, 0x64, 0x64,
            0x72, 0x18, 0x04, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x0c, 0x6f, 0x62, 0x73, 0x65, 0x72,
            0x76, 0x65, 0x64, 0x41, 0x64, 0x64, 0x72, 0x12, 0x1c, 0x0a, 0x09, 0x70, 0x72, 0x6f,
            0x74, 0x6f, 0x63, 0x6f, 0x6c, 0x73, 0x18, 0x03, 0x20, 0x03, 0x28, 0x09, 0x52, 0x09,
            0x70, 0x72, 0x6f, 0x74, 0x6f, 0x63, 0x6f, 0x6c, 0x73, 0x12, 0x2a, 0x0a, 0x10, 0x73,
            0x69, 0x67, 0x6e, 0x65, 0x64, 0x50, 0x65, 0x65, 0x72, 0x52, 0x65, 0x63, 0x6f, 0x72,
            0x64, 0x18, 0x08, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x10, 0x73, 0x69, 0x67, 0x6e, 0x65,
            0x64, 0x50, 0x65, 0x65, 0x72, 0x52, 0x65, 0x63, 0x6f, 0x72, 0x64, 0x42, 0x36, 0x5a,
            0x34, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x6c, 0x69,
            0x62, 0x70, 0x32, 0x70, 0x2f, 0x67, 0x6f, 0x2d, 0x6c, 0x69, 0x62, 0x70, 0x32, 0x70,
            0x2f, 0x70, 0x32, 0x70, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x63, 0x6f, 0x6c, 0x2f,
            0x69, 0x64, 0x65, 0x6e, 0x74, 0x69, 0x66, 0x79, 0x2f, 0x70, 0x62,
        ];
        let mut buf = [0u8; 375];
        let mut message =
            proto::Identify::from_reader(&mut BytesReader::from_bytes(&go_protobuf), &go_protobuf)
                .expect("read to succeed");

        // In this fixture the `signedPeerRecord` field holds plain UTF-8 text rather
        // than an encoded envelope; at the protobuf level the message still decodes.
        assert_eq!(
            String::from_utf8(
                message
                    .signedPeerRecord
                    .clone()
                    .expect("field to be present")
            )
            .expect("parse to succeed"),
            "Z4github.com/libp2p/go-libp2p/p2p/protocol/identify/pb".to_string()
        );

        // Re-encoding into a buffer of the original size must give identical bytes.
        message
            .write_message(&mut Writer::new(&mut buf[..]))
            .expect("same length after roundtrip");
        assert_eq!(go_protobuf, buf);

        // Swap in a genuine signed peer record and check a second roundtrip.
        let identity = identity::Keypair::generate_ed25519();
        let record = PeerRecord::new(
            &identity,
            vec![Multiaddr::from_str("/ip4/0.0.0.0").expect("parse to succeed")],
        )
        .expect("infallible signing using ed25519");
        message
            .signedPeerRecord
            .replace(record.into_signed_envelope().into_protobuf_encoding());
        let mut buf = Vec::new();
        message
            .write_message(&mut Writer::new(&mut buf))
            .expect("write to succeed");
        let parsed_message = proto::Identify::from_reader(&mut BytesReader::from_bytes(&buf), &buf)
            .expect("read to succeed");
        assert_eq!(message, parsed_message)
    }
}