libp2p_identify/
protocol.rs

1// Copyright 2018 Parity Technologies (UK) Ltd.
2//
3// Permission is hereby granted, free of charge, to any person obtaining a
4// copy of this software and associated documentation files (the "Software"),
5// to deal in the Software without restriction, including without limitation
6// the rights to use, copy, modify, merge, publish, distribute, sublicense,
7// and/or sell copies of the Software, and to permit persons to whom the
8// Software is furnished to do so, subject to the following conditions:
9//
10// The above copyright notice and this permission notice shall be included in
11// all copies or substantial portions of the Software.
12//
13// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
14// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
18// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
19// DEALINGS IN THE SOFTWARE.
20
21use std::io;
22
23use asynchronous_codec::{FramedRead, FramedWrite};
24use futures::prelude::*;
25use libp2p_core::{multiaddr, Multiaddr, PeerRecord, SignedEnvelope};
26use libp2p_identity as identity;
27use libp2p_identity::PublicKey;
28use libp2p_swarm::StreamProtocol;
29use thiserror::Error;
30
31use crate::proto;
32
33const MAX_MESSAGE_SIZE_BYTES: usize = 4096;
34
35pub const PROTOCOL_NAME: StreamProtocol = StreamProtocol::new("/ipfs/id/1.0.0");
36
37pub const PUSH_PROTOCOL_NAME: StreamProtocol = StreamProtocol::new("/ipfs/id/push/1.0.0");
38
39/// Identify information of a peer sent in protocol messages.
40#[derive(Debug, Clone)]
41pub struct Info {
42    /// The public key of the peer.
43    pub public_key: PublicKey,
44    /// Application-specific version of the protocol family used by the peer,
45    /// e.g. `ipfs/1.0.0` or `polkadot/1.0.0`.
46    pub protocol_version: String,
47    /// Name and version of the peer, similar to the `User-Agent` header in
48    /// the HTTP protocol.
49    pub agent_version: String,
50    /// The addresses that the peer is listening on.
51    pub listen_addrs: Vec<Multiaddr>,
52    /// The list of protocols supported by the peer, e.g. `/ipfs/ping/1.0.0`.
53    pub protocols: Vec<StreamProtocol>,
54    /// Address observed by or for the remote.
55    pub observed_addr: Multiaddr,
56    /// Verifiable addresses of the peer.
57    pub signed_peer_record: Option<SignedEnvelope>,
58}
59
60impl Info {
61    pub fn merge(&mut self, info: PushInfo) {
62        if let Some(public_key) = info.public_key {
63            self.public_key = public_key;
64        }
65        if let Some(protocol_version) = info.protocol_version {
66            self.protocol_version = protocol_version;
67        }
68        if let Some(agent_version) = info.agent_version {
69            self.agent_version = agent_version;
70        }
71        if !info.listen_addrs.is_empty() {
72            self.listen_addrs = info.listen_addrs;
73        }
74        if !info.protocols.is_empty() {
75            self.protocols = info.protocols;
76        }
77        if let Some(observed_addr) = info.observed_addr {
78            self.observed_addr = observed_addr;
79        }
80    }
81}
82
83/// Identify push information of a peer sent in protocol messages.
84/// Note that missing fields should be ignored, as peers may choose to send partial updates
85/// containing only the fields whose values have changed.
86#[derive(Debug, Clone)]
87pub struct PushInfo {
88    pub public_key: Option<PublicKey>,
89    pub protocol_version: Option<String>,
90    pub agent_version: Option<String>,
91    pub listen_addrs: Vec<Multiaddr>,
92    pub protocols: Vec<StreamProtocol>,
93    pub observed_addr: Option<Multiaddr>,
94}
95
96pub(crate) async fn send_identify<T>(io: T, info: Info) -> Result<Info, UpgradeError>
97where
98    T: AsyncWrite + Unpin,
99{
100    tracing::trace!("Sending: {:?}", info);
101
102    let listen_addrs = info.listen_addrs.iter().map(|addr| addr.to_vec()).collect();
103
104    let pubkey_bytes = info.public_key.encode_protobuf();
105
106    let message = proto::Identify {
107        agentVersion: Some(info.agent_version.clone()),
108        protocolVersion: Some(info.protocol_version.clone()),
109        publicKey: Some(pubkey_bytes),
110        listenAddrs: listen_addrs,
111        observedAddr: Some(info.observed_addr.to_vec()),
112        protocols: info.protocols.iter().map(|p| p.to_string()).collect(),
113        signedPeerRecord: info
114            .signed_peer_record
115            .clone()
116            .map(|r| r.into_protobuf_encoding()),
117    };
118
119    let mut framed_io = FramedWrite::new(
120        io,
121        quick_protobuf_codec::Codec::<proto::Identify>::new(MAX_MESSAGE_SIZE_BYTES),
122    );
123
124    framed_io.send(message).await?;
125    framed_io.close().await?;
126
127    Ok(info)
128}
129
130pub(crate) async fn recv_push<T>(socket: T) -> Result<PushInfo, UpgradeError>
131where
132    T: AsyncRead + AsyncWrite + Unpin,
133{
134    let info = recv(socket).await?.try_into()?;
135
136    tracing::trace!(?info, "Received");
137
138    Ok(info)
139}
140
141pub(crate) async fn recv_identify<T>(socket: T) -> Result<Info, UpgradeError>
142where
143    T: AsyncRead + AsyncWrite + Unpin,
144{
145    let info = recv(socket).await?.try_into()?;
146
147    tracing::trace!(?info, "Received");
148
149    Ok(info)
150}
151
152async fn recv<T>(socket: T) -> Result<proto::Identify, UpgradeError>
153where
154    T: AsyncRead + AsyncWrite + Unpin,
155{
156    // Even though we won't write to the stream anymore we don't close it here.
157    // The reason for this is that the `close` call on some transport's require the
158    // remote's ACK, but it could be that the remote already dropped the stream
159    // after finishing their write.
160
161    let info = FramedRead::new(
162        socket,
163        quick_protobuf_codec::Codec::<proto::Identify>::new(MAX_MESSAGE_SIZE_BYTES),
164    )
165    .next()
166    .await
167    .ok_or(UpgradeError::StreamClosed)??;
168
169    Ok(info)
170}
171
172fn parse_listen_addrs(listen_addrs: Vec<Vec<u8>>) -> Vec<Multiaddr> {
173    listen_addrs
174        .into_iter()
175        .filter_map(|bytes| match Multiaddr::try_from(bytes) {
176            Ok(a) => Some(a),
177            Err(e) => {
178                tracing::debug!("Unable to parse multiaddr: {e:?}");
179                None
180            }
181        })
182        .collect()
183}
184
185fn parse_protocols(protocols: Vec<String>) -> Vec<StreamProtocol> {
186    protocols
187        .into_iter()
188        .filter_map(|p| match StreamProtocol::try_from_owned(p) {
189            Ok(p) => Some(p),
190            Err(e) => {
191                tracing::debug!("Received invalid protocol from peer: {e}");
192                None
193            }
194        })
195        .collect()
196}
197
198fn parse_public_key(public_key: Option<Vec<u8>>) -> Option<PublicKey> {
199    public_key.and_then(|key| match PublicKey::try_decode_protobuf(&key) {
200        Ok(k) => Some(k),
201        Err(e) => {
202            tracing::debug!("Unable to decode public key: {e:?}");
203            None
204        }
205    })
206}
207
208fn parse_observed_addr(observed_addr: Option<Vec<u8>>) -> Option<Multiaddr> {
209    observed_addr.and_then(|bytes| match Multiaddr::try_from(bytes) {
210        Ok(a) => Some(a),
211        Err(e) => {
212            tracing::debug!("Unable to parse observed multiaddr: {e:?}");
213            None
214        }
215    })
216}
217
218impl TryFrom<proto::Identify> for Info {
219    type Error = UpgradeError;
220
221    fn try_from(msg: proto::Identify) -> Result<Self, Self::Error> {
222        let identify_public_key = {
223            match parse_public_key(msg.publicKey) {
224                Some(key) => key,
225                // This will always produce a DecodingError if the public key is missing.
226                None => PublicKey::try_decode_protobuf(Default::default())?,
227            }
228        };
229
230        // When signedPeerRecord contains valid addresses, ignore addresses in listenAddrs.
231        // When signedPeerRecord is invalid or signed by others, ignore the signedPeerRecord(set to
232        // `None`).
233        let (listen_addrs, signed_envelope) = msg
234            .signedPeerRecord
235            .and_then(|b| {
236                let envelope = SignedEnvelope::from_protobuf_encoding(b.as_ref()).ok()?;
237                let peer_record = PeerRecord::from_signed_envelope(envelope).ok()?;
238                (peer_record.peer_id() == identify_public_key.to_peer_id()).then_some((
239                    peer_record.addresses().to_vec(),
240                    Some(peer_record.into_signed_envelope()),
241                ))
242            })
243            .unwrap_or_else(|| (parse_listen_addrs(msg.listenAddrs), None));
244
245        let info = Info {
246            public_key: identify_public_key,
247            protocol_version: msg.protocolVersion.unwrap_or_default(),
248            agent_version: msg.agentVersion.unwrap_or_default(),
249            listen_addrs,
250            protocols: parse_protocols(msg.protocols),
251            observed_addr: parse_observed_addr(msg.observedAddr).unwrap_or(Multiaddr::empty()),
252            signed_peer_record: signed_envelope,
253        };
254
255        Ok(info)
256    }
257}
258
259impl TryFrom<proto::Identify> for PushInfo {
260    type Error = UpgradeError;
261
262    fn try_from(msg: proto::Identify) -> Result<Self, Self::Error> {
263        let info = PushInfo {
264            public_key: parse_public_key(msg.publicKey),
265            protocol_version: msg.protocolVersion,
266            agent_version: msg.agentVersion,
267            listen_addrs: parse_listen_addrs(msg.listenAddrs),
268            protocols: parse_protocols(msg.protocols),
269            observed_addr: parse_observed_addr(msg.observedAddr),
270        };
271
272        Ok(info)
273    }
274}
275
276#[derive(Debug, Error)]
277pub enum UpgradeError {
278    #[error(transparent)]
279    Codec(#[from] quick_protobuf_codec::Error),
280    #[error("I/O interaction failed")]
281    Io(#[from] io::Error),
282    #[error("Stream closed")]
283    StreamClosed,
284    #[error("Failed decoding multiaddr")]
285    Multiaddr(#[from] multiaddr::Error),
286    #[error("Failed decoding public key")]
287    PublicKey(#[from] identity::DecodingError),
288}
289
290#[cfg(test)]
291mod tests {
292    use std::str::FromStr;
293
294    use libp2p_core::PeerRecord;
295    use libp2p_identity as identity;
296    use quick_protobuf::{BytesReader, MessageRead, MessageWrite, Writer};
297
298    use super::*;
299
300    #[test]
301    fn skip_invalid_multiaddr() {
302        let valid_multiaddr: Multiaddr = "/ip6/2001:db8::/tcp/1234".parse().unwrap();
303        let valid_multiaddr_bytes = valid_multiaddr.to_vec();
304
305        let invalid_multiaddr = {
306            let a = vec![255; 8];
307            assert!(Multiaddr::try_from(a.clone()).is_err());
308            a
309        };
310
311        let payload = proto::Identify {
312            agentVersion: None,
313            listenAddrs: vec![valid_multiaddr_bytes, invalid_multiaddr],
314            observedAddr: None,
315            protocolVersion: None,
316            protocols: vec![],
317            publicKey: Some(
318                identity::Keypair::generate_ed25519()
319                    .public()
320                    .encode_protobuf(),
321            ),
322            signedPeerRecord: None,
323        };
324
325        let info = PushInfo::try_from(payload).expect("not to fail");
326
327        assert_eq!(info.listen_addrs, vec![valid_multiaddr])
328    }
329
330    #[test]
331    fn protobuf_roundtrip() {
332        // from go implementation of identify,
333        // see https://github.com/libp2p/go-libp2p/blob/2209ae05976df6a1cc2631c961f57549d109008c/p2p/protocol/identify/pb/identify.pb.go#L133
334        // signedPeerRecord field is a dummy one that can't be properly parsed into SignedEnvelope,
335        // but the wire format doesn't care.
336        let go_protobuf: [u8; 375] = [
337            0x0a, 0x27, 0x70, 0x32, 0x70, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x63, 0x6f, 0x6c,
338            0x2f, 0x69, 0x64, 0x65, 0x6e, 0x74, 0x69, 0x66, 0x79, 0x2f, 0x70, 0x62, 0x2f, 0x69,
339            0x64, 0x65, 0x6e, 0x74, 0x69, 0x66, 0x79, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12,
340            0x0b, 0x69, 0x64, 0x65, 0x6e, 0x74, 0x69, 0x66, 0x79, 0x2e, 0x70, 0x62, 0x22, 0x86,
341            0x02, 0x0a, 0x08, 0x49, 0x64, 0x65, 0x6e, 0x74, 0x69, 0x66, 0x79, 0x12, 0x28, 0x0a,
342            0x0f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x63, 0x6f, 0x6c, 0x56, 0x65, 0x72, 0x73, 0x69,
343            0x6f, 0x6e, 0x18, 0x05, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0f, 0x70, 0x72, 0x6f, 0x74,
344            0x6f, 0x63, 0x6f, 0x6c, 0x56, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x12, 0x22, 0x0a,
345            0x0c, 0x61, 0x67, 0x65, 0x6e, 0x74, 0x56, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x18,
346            0x06, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0c, 0x61, 0x67, 0x65, 0x6e, 0x74, 0x56, 0x65,
347            0x72, 0x73, 0x69, 0x6f, 0x6e, 0x12, 0x1c, 0x0a, 0x09, 0x70, 0x75, 0x62, 0x6c, 0x69,
348            0x63, 0x4b, 0x65, 0x79, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x09, 0x70, 0x75,
349            0x62, 0x6c, 0x69, 0x63, 0x4b, 0x65, 0x79, 0x12, 0x20, 0x0a, 0x0b, 0x6c, 0x69, 0x73,
350            0x74, 0x65, 0x6e, 0x41, 0x64, 0x64, 0x72, 0x73, 0x18, 0x02, 0x20, 0x03, 0x28, 0x0c,
351            0x52, 0x0b, 0x6c, 0x69, 0x73, 0x74, 0x65, 0x6e, 0x41, 0x64, 0x64, 0x72, 0x73, 0x12,
352            0x22, 0x0a, 0x0c, 0x6f, 0x62, 0x73, 0x65, 0x72, 0x76, 0x65, 0x64, 0x41, 0x64, 0x64,
353            0x72, 0x18, 0x04, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x0c, 0x6f, 0x62, 0x73, 0x65, 0x72,
354            0x76, 0x65, 0x64, 0x41, 0x64, 0x64, 0x72, 0x12, 0x1c, 0x0a, 0x09, 0x70, 0x72, 0x6f,
355            0x74, 0x6f, 0x63, 0x6f, 0x6c, 0x73, 0x18, 0x03, 0x20, 0x03, 0x28, 0x09, 0x52, 0x09,
356            0x70, 0x72, 0x6f, 0x74, 0x6f, 0x63, 0x6f, 0x6c, 0x73, 0x12, 0x2a, 0x0a, 0x10, 0x73,
357            0x69, 0x67, 0x6e, 0x65, 0x64, 0x50, 0x65, 0x65, 0x72, 0x52, 0x65, 0x63, 0x6f, 0x72,
358            0x64, 0x18, 0x08, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x10, 0x73, 0x69, 0x67, 0x6e, 0x65,
359            0x64, 0x50, 0x65, 0x65, 0x72, 0x52, 0x65, 0x63, 0x6f, 0x72, 0x64, 0x42, 0x36, 0x5a,
360            0x34, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x6c, 0x69,
361            0x62, 0x70, 0x32, 0x70, 0x2f, 0x67, 0x6f, 0x2d, 0x6c, 0x69, 0x62, 0x70, 0x32, 0x70,
362            0x2f, 0x70, 0x32, 0x70, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x63, 0x6f, 0x6c, 0x2f,
363            0x69, 0x64, 0x65, 0x6e, 0x74, 0x69, 0x66, 0x79, 0x2f, 0x70, 0x62,
364        ];
365        let mut buf = [0u8; 375];
366        let mut message =
367            proto::Identify::from_reader(&mut BytesReader::from_bytes(&go_protobuf), &go_protobuf)
368                .expect("read to succeed");
369
370        // The actual bytes they put in is "github.com/libp2p/go-libp2p/p2p/protocol/identify/pb".
371        // Starting with Z4 means it is zig-zag-encoded 4-byte varint of string, appended by
372        // protobuf.
373        assert_eq!(
374            String::from_utf8(
375                message
376                    .signedPeerRecord
377                    .clone()
378                    .expect("field to be present")
379            )
380            .expect("parse to succeed"),
381            "Z4github.com/libp2p/go-libp2p/p2p/protocol/identify/pb".to_string()
382        );
383        message
384            .write_message(&mut Writer::new(&mut buf[..]))
385            .expect("same length after roundtrip");
386        assert_eq!(go_protobuf, buf);
387
388        let identity = identity::Keypair::generate_ed25519();
389        let record = PeerRecord::new(
390            &identity,
391            vec![Multiaddr::from_str("/ip4/0.0.0.0").expect("parse to succeed")],
392        )
393        .expect("infallible siging using ed25519");
394        message
395            .signedPeerRecord
396            .replace(record.into_signed_envelope().into_protobuf_encoding());
397        let mut buf = Vec::new();
398        message
399            .write_message(&mut Writer::new(&mut buf))
400            .expect("write to succeed");
401        let parsed_message = proto::Identify::from_reader(&mut BytesReader::from_bytes(&buf), &buf)
402            .expect("read to succeed");
403        assert_eq!(message, parsed_message)
404    }
405}