use std::{
collections::HashMap,
iter,
task::{Context, Poll},
time::Duration,
};
use futures::{
future::{BoxFuture, FutureExt},
stream::{FuturesUnordered, StreamExt},
};
use libp2p_core::{transport::PortUse, Endpoint, Multiaddr, PeerRecord};
use libp2p_identity::{Keypair, PeerId, SigningError};
use libp2p_request_response::{OutboundRequestId, ProtocolSupport};
use libp2p_swarm::{
ConnectionDenied, ConnectionId, ExternalAddresses, FromSwarm, NetworkBehaviour, THandler,
THandlerInEvent, THandlerOutEvent, ToSwarm,
};
use crate::codec::{
Cookie, ErrorCode, Message, Message::*, Namespace, NewRegistration, Registration, Ttl,
};
/// Client-side rendezvous behaviour.
///
/// Wraps an outbound-only request-response behaviour and keeps the bookkeeping needed to
/// translate its responses and failures into [`Event`]s.
pub struct Behaviour {
/// Underlying request-response behaviour that carries all rendezvous messages.
inner: libp2p_request_response::Behaviour<crate::codec::Codec>,
/// Key used to sign the `PeerRecord` sent along with every registration.
keypair: Keypair,
/// In-flight `Register` requests, keyed by request id, with the rendezvous node and
/// namespace they were sent for.
waiting_for_register: HashMap<OutboundRequestId, (PeerId, Namespace)>,
/// In-flight `Discover` requests, keyed by request id, with the rendezvous node and the
/// optional namespace filter they were sent with.
waiting_for_discovery: HashMap<OutboundRequestId, (PeerId, Option<Namespace>)>,
/// Addresses of every peer discovered so far, keyed by `(peer, namespace)`.
///
/// These are handed back from `handle_pending_outbound_connection` as dial candidates.
discovered_peers: HashMap<(PeerId, Namespace), Vec<Multiaddr>>,
/// Registrations confirmed by a rendezvous node, keyed by `(rendezvous node, namespace)`,
/// with the TTL from the response. They are re-sent when the external addresses change.
registered_namespaces: HashMap<(PeerId, Namespace), Ttl>,
/// One timer per discovered registration; when it fires, the matching entry is removed
/// from `discovered_peers` so that map does not grow without bound.
expiring_registrations: FuturesUnordered<BoxFuture<'static, (PeerId, Namespace)>>,
/// Our own external addresses, kept up to date from swarm events.
external_addresses: ExternalAddresses,
}
impl Behaviour {
    /// Creates a new rendezvous client behaviour.
    ///
    /// `keypair` is used to sign the peer record sent with every registration.
    pub fn new(keypair: Keypair) -> Self {
        Self {
            inner: libp2p_request_response::Behaviour::with_codec(
                crate::codec::Codec::default(),
                iter::once((crate::PROTOCOL_IDENT, ProtocolSupport::Outbound)),
                libp2p_request_response::Config::default(),
            ),
            keypair,
            waiting_for_register: Default::default(),
            waiting_for_discovery: Default::default(),
            discovered_peers: Default::default(),
            registered_namespaces: Default::default(),
            // Seeded with a future that never completes; presumably this keeps the set
            // non-empty so polling it never reports the stream as finished.
            expiring_registrations: FuturesUnordered::from_iter(vec![
                futures::future::pending().boxed()
            ]),
            external_addresses: Default::default(),
        }
    }

    /// Registers our external addresses under `namespace` at `rendezvous_node`.
    ///
    /// The outcome is reported later as [`Event::Registered`] or [`Event::RegisterFailed`].
    ///
    /// # Errors
    ///
    /// * [`RegisterError::NoExternalAddresses`] if no external address is currently known.
    /// * [`RegisterError::FailedToMakeRecord`] if signing the peer record fails.
    pub fn register(
        &mut self,
        namespace: Namespace,
        rendezvous_node: PeerId,
        ttl: Option<Ttl>,
    ) -> Result<(), RegisterError> {
        let external_addresses = self.external_addresses.iter().cloned().collect::<Vec<_>>();
        if external_addresses.is_empty() {
            return Err(RegisterError::NoExternalAddresses);
        }

        let peer_record = PeerRecord::new(&self.keypair, external_addresses)?;
        let req_id = self.inner.send_request(
            &rendezvous_node,
            Register(NewRegistration::new(namespace.clone(), peer_record, ttl)),
        );
        // Remember what this request was for so the response can be turned into an event.
        self.waiting_for_register
            .insert(req_id, (rendezvous_node, namespace));

        Ok(())
    }

    /// Unregisters us from `namespace` at `rendezvous_node`.
    ///
    /// Only the exact `(rendezvous_node, namespace)` registration is forgotten locally;
    /// registrations in other namespaces or at other rendezvous nodes keep being refreshed.
    /// The request id of the `Unregister` message is not tracked, so no event is emitted
    /// for its outcome.
    pub fn unregister(&mut self, namespace: Namespace, rendezvous_node: PeerId) {
        // Keep every entry except the one matching BOTH the node and the namespace.
        self.registered_namespaces
            .retain(|(rz_node, ns), _| !(*rz_node == rendezvous_node && *ns == namespace));

        self.inner
            .send_request(&rendezvous_node, Unregister(namespace));
    }

    /// Asks `rendezvous_node` for registered peers.
    ///
    /// `namespace`, `cookie` and `limit` are forwarded unchanged in the `Discover` request.
    /// The outcome is reported later as [`Event::Discovered`] or [`Event::DiscoverFailed`].
    pub fn discover(
        &mut self,
        namespace: Option<Namespace>,
        cookie: Option<Cookie>,
        limit: Option<u64>,
        rendezvous_node: PeerId,
    ) {
        let req_id = self.inner.send_request(
            &rendezvous_node,
            Discover {
                namespace: namespace.clone(),
                cookie,
                limit,
            },
        );
        // Remember what this request was for so the response can be turned into an event.
        self.waiting_for_discovery
            .insert(req_id, (rendezvous_node, namespace));
    }
}
/// Reasons why `Behaviour::register` can fail before any request is sent.
#[derive(Debug, thiserror::Error)]
pub enum RegisterError {
/// No external address is known, so there is nothing to register.
#[error("We don't know about any externally reachable addresses of ours")]
NoExternalAddresses,
/// Signing the peer record with our keypair failed.
#[error("Failed to make a new PeerRecord")]
FailedToMakeRecord(#[from] SigningError),
}
/// Events emitted by the rendezvous client [`Behaviour`].
#[derive(Debug)]
#[allow(clippy::large_enum_variant)]
pub enum Event {
/// A discover request to `rendezvous_node` succeeded.
Discovered {
rendezvous_node: PeerId,
registrations: Vec<Registration>,
cookie: Cookie,
},
/// A discover request to `rendezvous_node` failed, either because the node answered
/// with an error code or because the outbound request itself failed.
DiscoverFailed {
rendezvous_node: PeerId,
namespace: Option<Namespace>,
error: ErrorCode,
},
/// `rendezvous_node` accepted our registration in `namespace` with the given `ttl`.
Registered {
rendezvous_node: PeerId,
ttl: Ttl,
namespace: Namespace,
},
/// A register request to `rendezvous_node` failed, either because the node answered
/// with an error code or because the outbound request itself failed.
RegisterFailed {
rendezvous_node: PeerId,
namespace: Namespace,
error: ErrorCode,
},
/// The timer of a discovered registration of `peer` fired and its stored addresses
/// were dropped.
Expired { peer: PeerId },
}
impl NetworkBehaviour for Behaviour {
    /// The handler of the wrapped request-response behaviour is reused unchanged.
    type ConnectionHandler = <libp2p_request_response::Behaviour<
        crate::codec::Codec,
    > as NetworkBehaviour>::ConnectionHandler;
    type ToSwarm = Event;

    /// Forwards to the wrapped request-response behaviour.
    fn handle_established_inbound_connection(
        &mut self,
        connection_id: ConnectionId,
        peer: PeerId,
        local_addr: &Multiaddr,
        remote_addr: &Multiaddr,
    ) -> Result<THandler<Self>, ConnectionDenied> {
        let inner = &mut self.inner;
        inner.handle_established_inbound_connection(connection_id, peer, local_addr, remote_addr)
    }

    /// Forwards to the wrapped request-response behaviour.
    fn handle_established_outbound_connection(
        &mut self,
        connection_id: ConnectionId,
        peer: PeerId,
        addr: &Multiaddr,
        role_override: Endpoint,
        port_use: PortUse,
    ) -> Result<THandler<Self>, ConnectionDenied> {
        let inner = &mut self.inner;
        inner.handle_established_outbound_connection(
            connection_id,
            peer,
            addr,
            role_override,
            port_use,
        )
    }

    /// Forwards handler events to the wrapped request-response behaviour.
    fn on_connection_handler_event(
        &mut self,
        peer_id: PeerId,
        connection_id: ConnectionId,
        event: THandlerOutEvent<Self>,
    ) {
        let inner = &mut self.inner;
        inner.on_connection_handler_event(peer_id, connection_id, event);
    }

    /// Updates the external-address set, forwards the event to the wrapped behaviour and,
    /// if the address set changed and is non-empty, re-sends every confirmed registration.
    fn on_swarm_event(&mut self, event: FromSwarm) {
        let addresses_changed = self.external_addresses.on_swarm_event(&event);
        self.inner.on_swarm_event(event);

        // Nothing to refresh unless the set changed and there is something to advertise.
        if !addresses_changed || self.external_addresses.iter().next().is_none() {
            return;
        }

        // Iterate over a snapshot: `register` needs `&mut self`.
        for ((rz_node, ns), ttl) in self.registered_namespaces.clone() {
            if let Err(e) = self.register(ns, rz_node, Some(ttl)) {
                tracing::warn!("refreshing registration failed: {e}")
            }
        }
    }

    /// Drives the wrapped behaviour and the expiry timers.
    ///
    /// Responses and outbound failures are translated into [`Event`]s; those that do not
    /// belong to a tracked request are skipped. Every non-event `ToSwarm` value is passed
    /// through unchanged. Once the wrapped behaviour is pending, one expired discovery (if
    /// any) is removed from `discovered_peers` and reported as [`Event::Expired`].
    #[tracing::instrument(level = "trace", name = "NetworkBehaviour::poll", skip(self, cx))]
    fn poll(
        &mut self,
        cx: &mut Context<'_>,
    ) -> Poll<ToSwarm<Self::ToSwarm, THandlerInEvent<Self>>> {
        use libp2p_request_response as req_res;

        // Drain the wrapped behaviour until it has nothing more to report.
        while let Poll::Ready(to_swarm) = self.inner.poll(cx) {
            let translated = match to_swarm {
                ToSwarm::GenerateEvent(req_res::Event::Message {
                    message:
                        req_res::Message::Response {
                            request_id,
                            response,
                        },
                    ..
                }) => self.handle_response(&request_id, response),
                ToSwarm::GenerateEvent(req_res::Event::OutboundFailure {
                    request_id, ..
                }) => self.event_for_outbound_failure(&request_id),
                ToSwarm::GenerateEvent(
                    req_res::Event::InboundFailure { .. }
                    | req_res::Event::ResponseSent { .. }
                    | req_res::Event::Message {
                        message: req_res::Message::Request { .. },
                        ..
                    },
                ) => {
                    unreachable!("rendezvous clients never receive requests")
                }
                other => {
                    return Poll::Ready(
                        other.map_out(|_| unreachable!("we manually map `GenerateEvent` variants")),
                    );
                }
            };

            // `None` means the request id was not one we track; poll the inner behaviour again.
            if let Some(event) = translated {
                return Poll::Ready(ToSwarm::GenerateEvent(event));
            }
        }

        match self.expiring_registrations.poll_next_unpin(cx) {
            Poll::Ready(Some(expired)) => {
                self.discovered_peers.remove(&expired);
                let (peer, _) = expired;
                Poll::Ready(ToSwarm::GenerateEvent(Event::Expired { peer }))
            }
            Poll::Ready(None) | Poll::Pending => Poll::Pending,
        }
    }

    /// Supplies every address stored in `discovered_peers` for the peer being dialed,
    /// across all namespaces. Returns an empty list when the peer id is unknown.
    fn handle_pending_outbound_connection(
        &mut self,
        _connection_id: ConnectionId,
        maybe_peer: Option<PeerId>,
        _addresses: &[Multiaddr],
        _effective_role: Endpoint,
    ) -> Result<Vec<Multiaddr>, ConnectionDenied> {
        let mut dial_candidates = Vec::new();

        if let Some(peer) = maybe_peer {
            for ((candidate, _), known) in &self.discovered_peers {
                if *candidate == peer {
                    dial_candidates.extend(known.iter().cloned());
                }
            }
        }

        Ok(dial_candidates)
    }
}
impl Behaviour {
fn event_for_outbound_failure(&mut self, req_id: &OutboundRequestId) -> Option<Event> {
if let Some((rendezvous_node, namespace)) = self.waiting_for_register.remove(req_id) {
return Some(Event::RegisterFailed {
rendezvous_node,
namespace,
error: ErrorCode::Unavailable,
});
};
if let Some((rendezvous_node, namespace)) = self.waiting_for_discovery.remove(req_id) {
return Some(Event::DiscoverFailed {
rendezvous_node,
namespace,
error: ErrorCode::Unavailable,
});
};
None
}
fn handle_response(
&mut self,
request_id: &OutboundRequestId,
response: Message,
) -> Option<Event> {
match response {
RegisterResponse(Ok(ttl)) => {
if let Some((rendezvous_node, namespace)) =
self.waiting_for_register.remove(request_id)
{
self.registered_namespaces
.insert((rendezvous_node, namespace.clone()), ttl);
return Some(Event::Registered {
rendezvous_node,
ttl,
namespace,
});
}
None
}
RegisterResponse(Err(error_code)) => {
if let Some((rendezvous_node, namespace)) =
self.waiting_for_register.remove(request_id)
{
return Some(Event::RegisterFailed {
rendezvous_node,
namespace,
error: error_code,
});
}
None
}
DiscoverResponse(Ok((registrations, cookie))) => {
if let Some((rendezvous_node, _ns)) = self.waiting_for_discovery.remove(request_id)
{
self.discovered_peers
.extend(registrations.iter().map(|registration| {
let peer_id = registration.record.peer_id();
let namespace = registration.namespace.clone();
let addresses = registration.record.addresses().to_vec();
((peer_id, namespace), addresses)
}));
self.expiring_registrations
.extend(registrations.iter().cloned().map(|registration| {
async move {
futures_timer::Delay::new(Duration::from_secs(registration.ttl))
.await;
(registration.record.peer_id(), registration.namespace)
}
.boxed()
}));
return Some(Event::Discovered {
rendezvous_node,
registrations,
cookie,
});
}
None
}
DiscoverResponse(Err(error_code)) => {
if let Some((rendezvous_node, ns)) = self.waiting_for_discovery.remove(request_id) {
return Some(Event::DiscoverFailed {
rendezvous_node,
namespace: ns,
error: error_code,
});
}
None
}
_ => unreachable!("rendezvous clients never receive requests"),
}
}
}