stream_example/
main.rs

1use std::{io, time::Duration};
2
3use anyhow::{Context, Result};
4use futures::{AsyncReadExt, AsyncWriteExt, StreamExt};
5use libp2p::{multiaddr::Protocol, Multiaddr, PeerId, Stream, StreamProtocol};
6use libp2p_stream as stream;
7use rand::RngCore;
8use tracing::level_filters::LevelFilter;
9use tracing_subscriber::EnvFilter;
10
11const ECHO_PROTOCOL: StreamProtocol = StreamProtocol::new("/echo");
12
13#[tokio::main]
14async fn main() -> Result<()> {
15    tracing_subscriber::fmt()
16        .with_env_filter(
17            EnvFilter::builder()
18                .with_default_directive(LevelFilter::INFO.into())
19                .from_env()?,
20        )
21        .init();
22
23    let maybe_address = std::env::args()
24        .nth(1)
25        .map(|arg| arg.parse::<Multiaddr>())
26        .transpose()
27        .context("Failed to parse argument as `Multiaddr`")?;
28
29    let mut swarm = libp2p::SwarmBuilder::with_new_identity()
30        .with_tokio()
31        .with_quic()
32        .with_behaviour(|_| stream::Behaviour::new())?
33        .with_swarm_config(|c| c.with_idle_connection_timeout(Duration::from_secs(10)))
34        .build();
35
36    swarm.listen_on("/ip4/127.0.0.1/udp/0/quic-v1".parse()?)?;
37
38    let mut incoming_streams = swarm
39        .behaviour()
40        .new_control()
41        .accept(ECHO_PROTOCOL)
42        .unwrap();
43
44    // Deal with incoming streams.
45    // Spawning a dedicated task is just one way of doing this.
46    // libp2p doesn't care how you handle incoming streams but you _must_ handle them somehow.
47    // To mitigate DoS attacks, libp2p will internally drop incoming streams if your application
48    // cannot keep up processing them.
49    tokio::spawn(async move {
50        // This loop handles incoming streams _sequentially_ but that doesn't have to be the case.
51        // You can also spawn a dedicated task per stream if you want to.
52        // Be aware that this breaks backpressure though as spawning new tasks is equivalent to an
53        // unbounded buffer. Each task needs memory meaning an aggressive remote peer may
54        // force you OOM this way.
55
56        while let Some((peer, stream)) = incoming_streams.next().await {
57            match echo(stream).await {
58                Ok(n) => {
59                    tracing::info!(%peer, "Echoed {n} bytes!");
60                }
61                Err(e) => {
62                    tracing::warn!(%peer, "Echo failed: {e}");
63                    continue;
64                }
65            };
66        }
67    });
68
69    // In this demo application, the dialing peer initiates the protocol.
70    if let Some(address) = maybe_address {
71        let Some(Protocol::P2p(peer_id)) = address.iter().last() else {
72            anyhow::bail!("Provided address does not end in `/p2p`");
73        };
74
75        swarm.dial(address)?;
76
77        tokio::spawn(connection_handler(peer_id, swarm.behaviour().new_control()));
78    }
79
80    // Poll the swarm to make progress.
81    loop {
82        let event = swarm.next().await.expect("never terminates");
83
84        match event {
85            libp2p::swarm::SwarmEvent::NewListenAddr { address, .. } => {
86                let listen_address = address.with_p2p(*swarm.local_peer_id()).unwrap();
87                tracing::info!(%listen_address);
88            }
89            event => tracing::trace!(?event),
90        }
91    }
92}
93
94/// A very simple, `async fn`-based connection handler for our custom echo protocol.
95async fn connection_handler(peer: PeerId, mut control: stream::Control) {
96    loop {
97        tokio::time::sleep(Duration::from_secs(1)).await; // Wait a second between echos.
98
99        let stream = match control.open_stream(peer, ECHO_PROTOCOL).await {
100            Ok(stream) => stream,
101            Err(error @ stream::OpenStreamError::UnsupportedProtocol(_)) => {
102                tracing::info!(%peer, %error);
103                return;
104            }
105            Err(error) => {
106                // Other errors may be temporary.
107                // In production, something like an exponential backoff / circuit-breaker may be
108                // more appropriate.
109                tracing::debug!(%peer, %error);
110                continue;
111            }
112        };
113
114        if let Err(e) = send(stream).await {
115            tracing::warn!(%peer, "Echo protocol failed: {e}");
116            continue;
117        }
118
119        tracing::info!(%peer, "Echo complete!")
120    }
121}
122
123async fn echo(mut stream: Stream) -> io::Result<usize> {
124    let mut total = 0;
125
126    let mut buf = [0u8; 100];
127
128    loop {
129        let read = stream.read(&mut buf).await?;
130        if read == 0 {
131            return Ok(total);
132        }
133
134        total += read;
135        stream.write_all(&buf[..read]).await?;
136    }
137}
138
139async fn send(mut stream: Stream) -> io::Result<()> {
140    let num_bytes = rand::random::<usize>() % 1000;
141
142    let mut bytes = vec![0; num_bytes];
143    rand::thread_rng().fill_bytes(&mut bytes);
144
145    stream.write_all(&bytes).await?;
146
147    let mut buf = vec![0; num_bytes];
148    stream.read_exact(&mut buf).await?;
149
150    if bytes != buf {
151        return Err(io::Error::new(io::ErrorKind::Other, "incorrect echo"));
152    }
153
154    stream.close().await?;
155
156    Ok(())
157}