1pub(crate) mod handler;
24pub(crate) mod rate_limiter;
25use std::{
26 collections::{hash_map, HashMap, HashSet, VecDeque},
27 num::NonZeroU32,
28 ops::Add,
29 task::{Context, Poll},
30 time::Duration,
31};
32
33use either::Either;
34use libp2p_core::{multiaddr::Protocol, transport::PortUse, ConnectedPoint, Endpoint, Multiaddr};
35use libp2p_identity::PeerId;
36use libp2p_swarm::{
37 behaviour::{ConnectionClosed, FromSwarm},
38 dummy, ConnectionDenied, ConnectionId, ExternalAddresses, NetworkBehaviour, NotifyHandler,
39 THandler, THandlerInEvent, THandlerOutEvent, ToSwarm,
40};
41use web_time::Instant;
42
43use crate::{
44 behaviour::handler::Handler,
45 multiaddr_ext::MultiaddrExt,
46 proto,
47 protocol::{inbound_hop, outbound_stop},
48};
49
50pub struct Config {
56 pub max_reservations: usize,
57 pub max_reservations_per_peer: usize,
58 pub reservation_duration: Duration,
59 pub reservation_rate_limiters: Vec<Box<dyn rate_limiter::RateLimiter>>,
60
61 pub max_circuits: usize,
62 pub max_circuits_per_peer: usize,
63 pub max_circuit_duration: Duration,
64 pub max_circuit_bytes: u64,
65 pub circuit_src_rate_limiters: Vec<Box<dyn rate_limiter::RateLimiter>>,
66}
67
68impl Config {
69 pub fn reservation_rate_per_peer(mut self, limit: NonZeroU32, interval: Duration) -> Self {
70 self.reservation_rate_limiters
71 .push(rate_limiter::new_per_peer(
72 rate_limiter::GenericRateLimiterConfig { limit, interval },
73 ));
74 self
75 }
76
77 pub fn circuit_src_per_peer(mut self, limit: NonZeroU32, interval: Duration) -> Self {
78 self.circuit_src_rate_limiters
79 .push(rate_limiter::new_per_peer(
80 rate_limiter::GenericRateLimiterConfig { limit, interval },
81 ));
82 self
83 }
84
85 pub fn reservation_rate_per_ip(mut self, limit: NonZeroU32, interval: Duration) -> Self {
86 self.reservation_rate_limiters
87 .push(rate_limiter::new_per_ip(
88 rate_limiter::GenericRateLimiterConfig { limit, interval },
89 ));
90 self
91 }
92
93 pub fn circuit_src_per_ip(mut self, limit: NonZeroU32, interval: Duration) -> Self {
94 self.circuit_src_rate_limiters
95 .push(rate_limiter::new_per_ip(
96 rate_limiter::GenericRateLimiterConfig { limit, interval },
97 ));
98 self
99 }
100}
101
102impl std::fmt::Debug for Config {
103 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
104 f.debug_struct("Config")
105 .field("max_reservations", &self.max_reservations)
106 .field("max_reservations_per_peer", &self.max_reservations_per_peer)
107 .field("reservation_duration", &self.reservation_duration)
108 .field(
109 "reservation_rate_limiters",
110 &format!("[{} rate limiters]", self.reservation_rate_limiters.len()),
111 )
112 .field("max_circuits", &self.max_circuits)
113 .field("max_circuits_per_peer", &self.max_circuits_per_peer)
114 .field("max_circuit_duration", &self.max_circuit_duration)
115 .field("max_circuit_bytes", &self.max_circuit_bytes)
116 .field(
117 "circuit_src_rate_limiters",
118 &format!("[{} rate limiters]", self.circuit_src_rate_limiters.len()),
119 )
120 .finish()
121 }
122}
123
124impl Default for Config {
125 fn default() -> Self {
126 let reservation_rate_limiters = vec![
127 rate_limiter::new_per_peer(rate_limiter::GenericRateLimiterConfig {
130 limit: NonZeroU32::new(30).expect("30 > 0"),
131 interval: Duration::from_secs(60 * 2),
132 }),
133 rate_limiter::new_per_ip(rate_limiter::GenericRateLimiterConfig {
136 limit: NonZeroU32::new(60).expect("60 > 0"),
137 interval: Duration::from_secs(60),
138 }),
139 ];
140
141 let circuit_src_rate_limiters = vec![
142 rate_limiter::new_per_peer(rate_limiter::GenericRateLimiterConfig {
144 limit: NonZeroU32::new(30).expect("30 > 0"),
145 interval: Duration::from_secs(60 * 2),
146 }),
147 rate_limiter::new_per_ip(rate_limiter::GenericRateLimiterConfig {
149 limit: NonZeroU32::new(60).expect("60 > 0"),
150 interval: Duration::from_secs(60),
151 }),
152 ];
153
154 Config {
155 max_reservations: 128,
156 max_reservations_per_peer: 4,
157 reservation_duration: Duration::from_secs(60 * 60),
158 reservation_rate_limiters,
159
160 max_circuits: 16,
161 max_circuits_per_peer: 4,
162 max_circuit_duration: Duration::from_secs(2 * 60),
163 max_circuit_bytes: 1 << 17, circuit_src_rate_limiters,
165 }
166 }
167}
168
169#[derive(Debug)]
171pub enum Event {
172 ReservationReqAccepted {
174 src_peer_id: PeerId,
175 renewed: bool,
177 },
178 #[deprecated(
180 note = "Will be removed in favor of logging them internally, see <https://github.com/libp2p/rust-libp2p/issues/4757> for details."
181 )]
182 ReservationReqAcceptFailed {
183 src_peer_id: PeerId,
184 error: inbound_hop::Error,
185 },
186 ReservationReqDenied {
188 src_peer_id: PeerId,
189 status: StatusCode,
190 },
191 #[deprecated(
193 note = "Will be removed in favor of logging them internally, see <https://github.com/libp2p/rust-libp2p/issues/4757> for details."
194 )]
195 ReservationReqDenyFailed {
196 src_peer_id: PeerId,
197 error: inbound_hop::Error,
198 },
199 ReservationClosed { src_peer_id: PeerId },
201 ReservationTimedOut { src_peer_id: PeerId },
203 CircuitReqDenied {
205 src_peer_id: PeerId,
206 dst_peer_id: PeerId,
207 status: StatusCode,
208 },
209 #[deprecated(
211 note = "Will be removed in favor of logging them internally, see <https://github.com/libp2p/rust-libp2p/issues/4757> for details."
212 )]
213 CircuitReqDenyFailed {
214 src_peer_id: PeerId,
215 dst_peer_id: PeerId,
216 error: inbound_hop::Error,
217 },
218 CircuitReqAccepted {
220 src_peer_id: PeerId,
221 dst_peer_id: PeerId,
222 },
223 #[deprecated(
225 note = "Will be removed in favor of logging them internally, see <https://github.com/libp2p/rust-libp2p/issues/4757> for details."
226 )]
227 CircuitReqOutboundConnectFailed {
228 src_peer_id: PeerId,
229 dst_peer_id: PeerId,
230 error: outbound_stop::Error,
231 },
232 #[deprecated(
234 note = "Will be removed in favor of logging them internally, see <https://github.com/libp2p/rust-libp2p/issues/4757> for details."
235 )]
236 CircuitReqAcceptFailed {
237 src_peer_id: PeerId,
238 dst_peer_id: PeerId,
239 error: inbound_hop::Error,
240 },
241 CircuitClosed {
243 src_peer_id: PeerId,
244 dst_peer_id: PeerId,
245 error: Option<std::io::Error>,
246 },
247}
248
249pub struct Behaviour {
252 config: Config,
253
254 local_peer_id: PeerId,
255
256 reservations: HashMap<PeerId, HashSet<ConnectionId>>,
257 circuits: CircuitsTracker,
258
259 queued_actions: VecDeque<ToSwarm<Event, THandlerInEvent<Self>>>,
261
262 external_addresses: ExternalAddresses,
263}
264
265impl Behaviour {
266 pub fn new(local_peer_id: PeerId, config: Config) -> Self {
267 Self {
268 config,
269 local_peer_id,
270 reservations: Default::default(),
271 circuits: Default::default(),
272 queued_actions: Default::default(),
273 external_addresses: Default::default(),
274 }
275 }
276
277 fn on_connection_closed(
278 &mut self,
279 ConnectionClosed {
280 peer_id,
281 connection_id,
282 ..
283 }: ConnectionClosed,
284 ) {
285 if let hash_map::Entry::Occupied(mut peer) = self.reservations.entry(peer_id) {
286 if peer.get_mut().remove(&connection_id) {
287 self.queued_actions
288 .push_back(ToSwarm::GenerateEvent(Event::ReservationClosed {
289 src_peer_id: peer_id,
290 }));
291 }
292 if peer.get().is_empty() {
293 peer.remove();
294 }
295 }
296
297 for circuit in self
298 .circuits
299 .remove_by_connection(peer_id, connection_id)
300 .iter()
301 .filter(|c| matches!(c.status, CircuitStatus::Accepted))
303 {
304 self.queued_actions
305 .push_back(ToSwarm::GenerateEvent(Event::CircuitClosed {
306 src_peer_id: circuit.src_peer_id,
307 dst_peer_id: circuit.dst_peer_id,
308 error: Some(std::io::ErrorKind::ConnectionAborted.into()),
309 }));
310 }
311 }
312}
313
314impl NetworkBehaviour for Behaviour {
315 type ConnectionHandler = Either<Handler, dummy::ConnectionHandler>;
316 type ToSwarm = Event;
317
318 fn handle_established_inbound_connection(
319 &mut self,
320 _: ConnectionId,
321 _: PeerId,
322 local_addr: &Multiaddr,
323 remote_addr: &Multiaddr,
324 ) -> Result<THandler<Self>, ConnectionDenied> {
325 if local_addr.is_relayed() {
326 return Ok(Either::Right(dummy::ConnectionHandler));
328 }
329
330 Ok(Either::Left(Handler::new(
331 handler::Config {
332 reservation_duration: self.config.reservation_duration,
333 max_circuit_duration: self.config.max_circuit_duration,
334 max_circuit_bytes: self.config.max_circuit_bytes,
335 },
336 ConnectedPoint::Listener {
337 local_addr: local_addr.clone(),
338 send_back_addr: remote_addr.clone(),
339 },
340 )))
341 }
342
343 fn handle_established_outbound_connection(
344 &mut self,
345 _: ConnectionId,
346 _: PeerId,
347 addr: &Multiaddr,
348 role_override: Endpoint,
349 port_use: PortUse,
350 ) -> Result<THandler<Self>, ConnectionDenied> {
351 if addr.is_relayed() {
352 return Ok(Either::Right(dummy::ConnectionHandler));
354 }
355
356 Ok(Either::Left(Handler::new(
357 handler::Config {
358 reservation_duration: self.config.reservation_duration,
359 max_circuit_duration: self.config.max_circuit_duration,
360 max_circuit_bytes: self.config.max_circuit_bytes,
361 },
362 ConnectedPoint::Dialer {
363 address: addr.clone(),
364 role_override,
365 port_use,
366 },
367 )))
368 }
369
370 fn on_swarm_event(&mut self, event: FromSwarm) {
371 self.external_addresses.on_swarm_event(&event);
372
373 if let FromSwarm::ConnectionClosed(connection_closed) = event {
374 self.on_connection_closed(connection_closed)
375 }
376 }
377
378 fn on_connection_handler_event(
379 &mut self,
380 event_source: PeerId,
381 connection: ConnectionId,
382 event: THandlerOutEvent<Self>,
383 ) {
384 let event = match event {
385 Either::Left(e) => e,
386 Either::Right(v) => libp2p_core::util::unreachable(v),
387 };
388
389 match event {
390 handler::Event::ReservationReqReceived {
391 inbound_reservation_req,
392 endpoint,
393 renewed,
394 } => {
395 let now = Instant::now();
396
397 assert!(
398 !endpoint.is_relayed(),
399 "`dummy::ConnectionHandler` handles relayed connections. It \
400 denies all inbound substreams."
401 );
402
403 let action = if
404 (!renewed
407 && self
408 .reservations
409 .get(&event_source)
410 .map(|cs| cs.len())
411 .unwrap_or(0)
412 > self.config.max_reservations_per_peer)
413 || self
415 .reservations
416 .values()
417 .map(|cs| cs.len())
418 .sum::<usize>()
419 >= self.config.max_reservations
420 || !self
422 .config
423 .reservation_rate_limiters
424 .iter_mut()
425 .all(|limiter| {
426 limiter.try_next(event_source, endpoint.get_remote_address(), now)
427 }) {
428 ToSwarm::NotifyHandler {
429 handler: NotifyHandler::One(connection),
430 peer_id: event_source,
431 event: Either::Left(handler::In::DenyReservationReq {
432 inbound_reservation_req,
433 status: proto::Status::RESOURCE_LIMIT_EXCEEDED,
434 }),
435 }
436 } else {
437 self.reservations
439 .entry(event_source)
440 .or_default()
441 .insert(connection);
442
443 ToSwarm::NotifyHandler {
444 handler: NotifyHandler::One(connection),
445 peer_id: event_source,
446 event: Either::Left(handler::In::AcceptReservationReq {
447 inbound_reservation_req,
448 addrs: self
449 .external_addresses
450 .iter()
451 .cloned()
452 .filter_map(|a| match a.iter().last()? {
454 Protocol::P2p(_) => Some(a),
455 _ => Some(a.with(Protocol::P2p(self.local_peer_id))),
456 })
457 .collect(),
458 }),
459 }
460 };
461
462 self.queued_actions.push_back(action);
463 }
464 handler::Event::ReservationReqAccepted { renewed } => {
465 self.reservations
468 .entry(event_source)
469 .or_default()
470 .insert(connection);
471
472 self.queued_actions.push_back(ToSwarm::GenerateEvent(
473 Event::ReservationReqAccepted {
474 src_peer_id: event_source,
475 renewed,
476 },
477 ));
478 }
479 handler::Event::ReservationReqAcceptFailed { error } => {
480 #[allow(deprecated)]
481 self.queued_actions.push_back(ToSwarm::GenerateEvent(
482 Event::ReservationReqAcceptFailed {
483 src_peer_id: event_source,
484 error,
485 },
486 ));
487 }
488 handler::Event::ReservationReqDenied { status } => {
489 self.queued_actions.push_back(ToSwarm::GenerateEvent(
490 Event::ReservationReqDenied {
491 src_peer_id: event_source,
492 status: status.into(),
493 },
494 ));
495 }
496 handler::Event::ReservationReqDenyFailed { error } => {
497 #[allow(deprecated)]
498 self.queued_actions.push_back(ToSwarm::GenerateEvent(
499 Event::ReservationReqDenyFailed {
500 src_peer_id: event_source,
501 error,
502 },
503 ));
504 }
505 handler::Event::ReservationTimedOut {} => {
506 match self.reservations.entry(event_source) {
507 hash_map::Entry::Occupied(mut peer) => {
508 peer.get_mut().remove(&connection);
509 if peer.get().is_empty() {
510 peer.remove();
511 }
512 }
513 hash_map::Entry::Vacant(_) => {
514 unreachable!(
515 "Expect to track timed out reservation with peer {:?} on connection {:?}",
516 event_source,
517 connection,
518 );
519 }
520 }
521
522 self.queued_actions
523 .push_back(ToSwarm::GenerateEvent(Event::ReservationTimedOut {
524 src_peer_id: event_source,
525 }));
526 }
527 handler::Event::CircuitReqReceived {
528 inbound_circuit_req,
529 endpoint,
530 } => {
531 let now = Instant::now();
532
533 assert!(
534 !endpoint.is_relayed(),
535 "`dummy::ConnectionHandler` handles relayed connections. It \
536 denies all inbound substreams."
537 );
538
539 let action = if self.circuits.num_circuits_of_peer(event_source)
540 > self.config.max_circuits_per_peer
541 || self.circuits.len() >= self.config.max_circuits
542 || !self
543 .config
544 .circuit_src_rate_limiters
545 .iter_mut()
546 .all(|limiter| {
547 limiter.try_next(event_source, endpoint.get_remote_address(), now)
548 }) {
549 ToSwarm::NotifyHandler {
551 handler: NotifyHandler::One(connection),
552 peer_id: event_source,
553 event: Either::Left(handler::In::DenyCircuitReq {
554 circuit_id: None,
555 inbound_circuit_req,
556 status: proto::Status::RESOURCE_LIMIT_EXCEEDED,
557 }),
558 }
559 } else if let Some(dst_conn) = self
560 .reservations
561 .get(&inbound_circuit_req.dst())
562 .and_then(|cs| cs.iter().next())
563 {
564 let circuit_id = self.circuits.insert(Circuit {
566 status: CircuitStatus::Accepting,
567 src_peer_id: event_source,
568 src_connection_id: connection,
569 dst_peer_id: inbound_circuit_req.dst(),
570 dst_connection_id: *dst_conn,
571 });
572
573 ToSwarm::NotifyHandler {
574 handler: NotifyHandler::One(*dst_conn),
575 peer_id: event_source,
576 event: Either::Left(handler::In::NegotiateOutboundConnect {
577 circuit_id,
578 inbound_circuit_req,
579 src_peer_id: event_source,
580 src_connection_id: connection,
581 }),
582 }
583 } else {
584 ToSwarm::NotifyHandler {
586 handler: NotifyHandler::One(connection),
587 peer_id: event_source,
588 event: Either::Left(handler::In::DenyCircuitReq {
589 circuit_id: None,
590 inbound_circuit_req,
591 status: proto::Status::NO_RESERVATION,
592 }),
593 }
594 };
595 self.queued_actions.push_back(action);
596 }
597 handler::Event::CircuitReqDenied {
598 circuit_id,
599 dst_peer_id,
600 status,
601 } => {
602 if let Some(circuit_id) = circuit_id {
603 self.circuits.remove(circuit_id);
604 }
605
606 self.queued_actions
607 .push_back(ToSwarm::GenerateEvent(Event::CircuitReqDenied {
608 src_peer_id: event_source,
609 dst_peer_id,
610 status: status.into(),
611 }));
612 }
613 handler::Event::CircuitReqDenyFailed {
614 circuit_id,
615 dst_peer_id,
616 error,
617 } => {
618 if let Some(circuit_id) = circuit_id {
619 self.circuits.remove(circuit_id);
620 }
621
622 #[allow(deprecated)]
623 self.queued_actions.push_back(ToSwarm::GenerateEvent(
624 Event::CircuitReqDenyFailed {
625 src_peer_id: event_source,
626 dst_peer_id,
627 error,
628 },
629 ));
630 }
631 handler::Event::OutboundConnectNegotiated {
632 circuit_id,
633 src_peer_id,
634 src_connection_id,
635 inbound_circuit_req,
636 dst_stream,
637 dst_pending_data,
638 } => {
639 self.queued_actions.push_back(ToSwarm::NotifyHandler {
640 handler: NotifyHandler::One(src_connection_id),
641 peer_id: src_peer_id,
642 event: Either::Left(handler::In::AcceptAndDriveCircuit {
643 circuit_id,
644 dst_peer_id: event_source,
645 inbound_circuit_req,
646 dst_stream,
647 dst_pending_data,
648 }),
649 });
650 }
651 handler::Event::OutboundConnectNegotiationFailed {
652 circuit_id,
653 src_peer_id,
654 src_connection_id,
655 inbound_circuit_req,
656 status,
657 error,
658 } => {
659 self.queued_actions.push_back(ToSwarm::NotifyHandler {
660 handler: NotifyHandler::One(src_connection_id),
661 peer_id: src_peer_id,
662 event: Either::Left(handler::In::DenyCircuitReq {
663 circuit_id: Some(circuit_id),
664 inbound_circuit_req,
665 status,
666 }),
667 });
668 #[allow(deprecated)]
669 self.queued_actions.push_back(ToSwarm::GenerateEvent(
670 Event::CircuitReqOutboundConnectFailed {
671 src_peer_id,
672 dst_peer_id: event_source,
673 error,
674 },
675 ));
676 }
677 handler::Event::CircuitReqAccepted {
678 dst_peer_id,
679 circuit_id,
680 } => {
681 self.circuits.accepted(circuit_id);
682 self.queued_actions
683 .push_back(ToSwarm::GenerateEvent(Event::CircuitReqAccepted {
684 src_peer_id: event_source,
685 dst_peer_id,
686 }));
687 }
688 handler::Event::CircuitReqAcceptFailed {
689 dst_peer_id,
690 circuit_id,
691 error,
692 } => {
693 self.circuits.remove(circuit_id);
694 #[allow(deprecated)]
695 self.queued_actions.push_back(ToSwarm::GenerateEvent(
696 Event::CircuitReqAcceptFailed {
697 src_peer_id: event_source,
698 dst_peer_id,
699 error,
700 },
701 ));
702 }
703 handler::Event::CircuitClosed {
704 dst_peer_id,
705 circuit_id,
706 error,
707 } => {
708 self.circuits.remove(circuit_id);
709
710 self.queued_actions
711 .push_back(ToSwarm::GenerateEvent(Event::CircuitClosed {
712 src_peer_id: event_source,
713 dst_peer_id,
714 error,
715 }));
716 }
717 }
718 }
719
720 #[tracing::instrument(level = "trace", name = "NetworkBehaviour::poll", skip(self))]
721 fn poll(&mut self, _: &mut Context<'_>) -> Poll<ToSwarm<Self::ToSwarm, THandlerInEvent<Self>>> {
722 if let Some(to_swarm) = self.queued_actions.pop_front() {
723 return Poll::Ready(to_swarm);
724 }
725
726 Poll::Pending
727 }
728}
729
730#[derive(Default)]
731struct CircuitsTracker {
732 next_id: CircuitId,
733 circuits: HashMap<CircuitId, Circuit>,
734}
735
736impl CircuitsTracker {
737 fn len(&self) -> usize {
738 self.circuits.len()
739 }
740
741 fn insert(&mut self, circuit: Circuit) -> CircuitId {
742 let id = self.next_id;
743 self.next_id = self.next_id + 1;
744
745 self.circuits.insert(id, circuit);
746
747 id
748 }
749
750 fn accepted(&mut self, circuit_id: CircuitId) {
751 if let Some(c) = self.circuits.get_mut(&circuit_id) {
752 c.status = CircuitStatus::Accepted;
753 };
754 }
755
756 fn remove(&mut self, circuit_id: CircuitId) -> Option<Circuit> {
757 self.circuits.remove(&circuit_id)
758 }
759
760 fn remove_by_connection(
761 &mut self,
762 peer_id: PeerId,
763 connection_id: ConnectionId,
764 ) -> Vec<Circuit> {
765 let mut removed = vec![];
766
767 self.circuits.retain(|_circuit_id, circuit| {
768 let is_src =
769 circuit.src_peer_id == peer_id && circuit.src_connection_id == connection_id;
770 let is_dst =
771 circuit.dst_peer_id == peer_id && circuit.dst_connection_id == connection_id;
772
773 if is_src || is_dst {
774 removed.push(circuit.clone());
775 false
777 } else {
778 true
780 }
781 });
782
783 removed
784 }
785
786 fn num_circuits_of_peer(&self, peer: PeerId) -> usize {
787 self.circuits
788 .iter()
789 .filter(|(_, c)| c.src_peer_id == peer || c.dst_peer_id == peer)
790 .count()
791 }
792}
793
794#[derive(Clone)]
795struct Circuit {
796 src_peer_id: PeerId,
797 src_connection_id: ConnectionId,
798 dst_peer_id: PeerId,
799 dst_connection_id: ConnectionId,
800 status: CircuitStatus,
801}
802
803#[derive(Clone)]
804enum CircuitStatus {
805 Accepting,
806 Accepted,
807}
808
809#[derive(Default, Clone, Copy, Debug, Hash, Eq, PartialEq)]
810pub struct CircuitId(u64);
811
812impl Add<u64> for CircuitId {
813 type Output = CircuitId;
814
815 fn add(self, rhs: u64) -> Self {
816 CircuitId(self.0 + rhs)
817 }
818}
819
820#[derive(Debug)]
822pub enum StatusCode {
823 OK,
824 ReservationRefused,
825 ResourceLimitExceeded,
826 PermissionDenied,
827 ConnectionFailed,
828 NoReservation,
829 MalformedMessage,
830 UnexpectedMessage,
831}
832
833impl From<proto::Status> for StatusCode {
834 fn from(other: proto::Status) -> Self {
835 match other {
836 proto::Status::OK => Self::OK,
837 proto::Status::RESERVATION_REFUSED => Self::ReservationRefused,
838 proto::Status::RESOURCE_LIMIT_EXCEEDED => Self::ResourceLimitExceeded,
839 proto::Status::PERMISSION_DENIED => Self::PermissionDenied,
840 proto::Status::CONNECTION_FAILED => Self::ConnectionFailed,
841 proto::Status::NO_RESERVATION => Self::NoReservation,
842 proto::Status::MALFORMED_MESSAGE => Self::MalformedMessage,
843 proto::Status::UNEXPECTED_MESSAGE => Self::UnexpectedMessage,
844 }
845 }
846}