1use std::task::{Context, Poll};
22
23use either::Either;
24use futures::future;
25use libp2p_core::{transport::PortUse, upgrade::DeniedUpgrade, Endpoint, Multiaddr};
26use libp2p_identity::PeerId;
27
28use crate::{
29 behaviour::FromSwarm,
30 connection::ConnectionId,
31 handler::{
32 AddressChange, ConnectionEvent, ConnectionHandler, ConnectionHandlerEvent,
33 DialUpgradeError, FullyNegotiatedInbound, FullyNegotiatedOutbound, ListenUpgradeError,
34 SubstreamProtocol,
35 },
36 upgrade::SendWrapper,
37 ConnectionDenied, NetworkBehaviour, THandler, THandlerInEvent, THandlerOutEvent, ToSwarm,
38};
39
/// Wrapper around an optional behaviour.
///
/// The wrapper is "enabled" while it holds `Some(behaviour)` and "disabled"
/// while it holds `None`.
pub struct Toggle<TBehaviour> {
    /// The wrapped behaviour; `None` means the toggle is disabled.
    inner: Option<TBehaviour>,
}

impl<TBehaviour> Toggle<TBehaviour> {
    /// Returns `true` if a behaviour is present (enabled) and `false` otherwise.
    pub fn is_enabled(&self) -> bool {
        self.inner.is_some()
    }

    /// Returns a shared reference to the wrapped behaviour, or `None` if disabled.
    pub fn as_ref(&self) -> Option<&TBehaviour> {
        self.inner.as_ref()
    }

    /// Returns an exclusive reference to the wrapped behaviour, or `None` if disabled.
    pub fn as_mut(&mut self) -> Option<&mut TBehaviour> {
        self.inner.as_mut()
    }
}
63
64impl<TBehaviour> From<Option<TBehaviour>> for Toggle<TBehaviour> {
65 fn from(inner: Option<TBehaviour>) -> Self {
66 Toggle { inner }
67 }
68}
69
70impl<TBehaviour> NetworkBehaviour for Toggle<TBehaviour>
71where
72 TBehaviour: NetworkBehaviour,
73{
74 type ConnectionHandler = ToggleConnectionHandler<THandler<TBehaviour>>;
75 type ToSwarm = TBehaviour::ToSwarm;
76
77 fn handle_pending_inbound_connection(
78 &mut self,
79 connection_id: ConnectionId,
80 local_addr: &Multiaddr,
81 remote_addr: &Multiaddr,
82 ) -> Result<(), ConnectionDenied> {
83 let Some(inner) = self.inner.as_mut() else {
84 return Ok(());
85 };
86
87 inner.handle_pending_inbound_connection(connection_id, local_addr, remote_addr)?;
88
89 Ok(())
90 }
91
92 fn handle_established_inbound_connection(
93 &mut self,
94 connection_id: ConnectionId,
95 peer: PeerId,
96 local_addr: &Multiaddr,
97 remote_addr: &Multiaddr,
98 ) -> Result<THandler<Self>, ConnectionDenied> {
99 let Some(inner) = self.inner.as_mut() else {
100 return Ok(ToggleConnectionHandler { inner: None });
101 };
102
103 let handler = inner.handle_established_inbound_connection(
104 connection_id,
105 peer,
106 local_addr,
107 remote_addr,
108 )?;
109
110 Ok(ToggleConnectionHandler {
111 inner: Some(handler),
112 })
113 }
114
115 fn handle_pending_outbound_connection(
116 &mut self,
117 connection_id: ConnectionId,
118 maybe_peer: Option<PeerId>,
119 addresses: &[Multiaddr],
120 effective_role: Endpoint,
121 ) -> Result<Vec<Multiaddr>, ConnectionDenied> {
122 let Some(inner) = self.inner.as_mut() else {
123 return Ok(vec![]);
124 };
125
126 let addresses = inner.handle_pending_outbound_connection(
127 connection_id,
128 maybe_peer,
129 addresses,
130 effective_role,
131 )?;
132
133 Ok(addresses)
134 }
135
136 fn handle_established_outbound_connection(
137 &mut self,
138 connection_id: ConnectionId,
139 peer: PeerId,
140 addr: &Multiaddr,
141 role_override: Endpoint,
142 port_use: PortUse,
143 ) -> Result<THandler<Self>, ConnectionDenied> {
144 let Some(inner) = self.inner.as_mut() else {
145 return Ok(ToggleConnectionHandler { inner: None });
146 };
147
148 let handler = inner.handle_established_outbound_connection(
149 connection_id,
150 peer,
151 addr,
152 role_override,
153 port_use,
154 )?;
155
156 Ok(ToggleConnectionHandler {
157 inner: Some(handler),
158 })
159 }
160
161 fn on_swarm_event(&mut self, event: FromSwarm) {
162 if let Some(behaviour) = &mut self.inner {
163 behaviour.on_swarm_event(event);
164 }
165 }
166
167 fn on_connection_handler_event(
168 &mut self,
169 peer_id: PeerId,
170 connection_id: ConnectionId,
171 event: THandlerOutEvent<Self>,
172 ) {
173 if let Some(behaviour) = &mut self.inner {
174 behaviour.on_connection_handler_event(peer_id, connection_id, event)
175 }
176 }
177
178 fn poll(
179 &mut self,
180 cx: &mut Context<'_>,
181 ) -> Poll<ToSwarm<Self::ToSwarm, THandlerInEvent<Self>>> {
182 if let Some(inner) = self.inner.as_mut() {
183 inner.poll(cx)
184 } else {
185 Poll::Pending
186 }
187 }
188}
189
/// Connection handler counterpart of `Toggle`: wraps an optional inner handler.
pub struct ToggleConnectionHandler<TInner> {
    /// The wrapped handler; `None` means the handler is disabled.
    inner: Option<TInner>,
}
194
195impl<TInner> ToggleConnectionHandler<TInner>
196where
197 TInner: ConnectionHandler,
198{
199 fn on_fully_negotiated_inbound(
200 &mut self,
201 FullyNegotiatedInbound {
202 protocol: out,
203 info,
204 }: FullyNegotiatedInbound<
205 <Self as ConnectionHandler>::InboundProtocol,
206 <Self as ConnectionHandler>::InboundOpenInfo,
207 >,
208 ) {
209 let out = match out {
210 future::Either::Left(out) => out,
211 future::Either::Right(v) => libp2p_core::util::unreachable(v),
212 };
213
214 if let Either::Left(info) = info {
215 self.inner
216 .as_mut()
217 .expect("Can't receive an inbound substream if disabled; QED")
218 .on_connection_event(ConnectionEvent::FullyNegotiatedInbound(
219 FullyNegotiatedInbound {
220 protocol: out,
221 info,
222 },
223 ));
224 } else {
225 panic!("Unexpected Either::Right in enabled `on_fully_negotiated_inbound`.")
226 }
227 }
228 fn on_listen_upgrade_error(
229 &mut self,
230 ListenUpgradeError { info, error: err }: ListenUpgradeError<
231 <Self as ConnectionHandler>::InboundOpenInfo,
232 <Self as ConnectionHandler>::InboundProtocol,
233 >,
234 ) {
235 let (inner, info) = match (self.inner.as_mut(), info) {
236 (Some(inner), Either::Left(info)) => (inner, info),
237 (None, Either::Right(())) => return,
239 (Some(_), Either::Right(())) => panic!(
240 "Unexpected `Either::Right` inbound info through \
241 `on_listen_upgrade_error` in enabled state.",
242 ),
243 (None, Either::Left(_)) => panic!(
244 "Unexpected `Either::Left` inbound info through \
245 `on_listen_upgrade_error` in disabled state.",
246 ),
247 };
248
249 let err = match err {
250 Either::Left(e) => e,
251 Either::Right(v) => libp2p_core::util::unreachable(v),
252 };
253
254 inner.on_connection_event(ConnectionEvent::ListenUpgradeError(ListenUpgradeError {
255 info,
256 error: err,
257 }));
258 }
259}
260
261impl<TInner> ConnectionHandler for ToggleConnectionHandler<TInner>
262where
263 TInner: ConnectionHandler,
264{
265 type FromBehaviour = TInner::FromBehaviour;
266 type ToBehaviour = TInner::ToBehaviour;
267 type InboundProtocol = Either<SendWrapper<TInner::InboundProtocol>, SendWrapper<DeniedUpgrade>>;
268 type OutboundProtocol = TInner::OutboundProtocol;
269 type OutboundOpenInfo = TInner::OutboundOpenInfo;
270 type InboundOpenInfo = Either<TInner::InboundOpenInfo, ()>;
271
272 fn listen_protocol(&self) -> SubstreamProtocol<Self::InboundProtocol, Self::InboundOpenInfo> {
273 if let Some(inner) = self.inner.as_ref() {
274 inner
275 .listen_protocol()
276 .map_upgrade(|u| Either::Left(SendWrapper(u)))
277 .map_info(Either::Left)
278 } else {
279 SubstreamProtocol::new(Either::Right(SendWrapper(DeniedUpgrade)), Either::Right(()))
280 }
281 }
282
283 fn on_behaviour_event(&mut self, event: Self::FromBehaviour) {
284 self.inner
285 .as_mut()
286 .expect("Can't receive events if disabled; QED")
287 .on_behaviour_event(event)
288 }
289
290 fn connection_keep_alive(&self) -> bool {
291 self.inner
292 .as_ref()
293 .map(|h| h.connection_keep_alive())
294 .unwrap_or(false)
295 }
296
297 fn poll(
298 &mut self,
299 cx: &mut Context<'_>,
300 ) -> Poll<
301 ConnectionHandlerEvent<Self::OutboundProtocol, Self::OutboundOpenInfo, Self::ToBehaviour>,
302 > {
303 if let Some(inner) = self.inner.as_mut() {
304 inner.poll(cx)
305 } else {
306 Poll::Pending
307 }
308 }
309
310 fn on_connection_event(
311 &mut self,
312 event: ConnectionEvent<
313 Self::InboundProtocol,
314 Self::OutboundProtocol,
315 Self::InboundOpenInfo,
316 Self::OutboundOpenInfo,
317 >,
318 ) {
319 match event {
320 ConnectionEvent::FullyNegotiatedInbound(fully_negotiated_inbound) => {
321 self.on_fully_negotiated_inbound(fully_negotiated_inbound)
322 }
323 ConnectionEvent::FullyNegotiatedOutbound(FullyNegotiatedOutbound {
324 protocol: out,
325 info,
326 }) => self
327 .inner
328 .as_mut()
329 .expect("Can't receive an outbound substream if disabled; QED")
330 .on_connection_event(ConnectionEvent::FullyNegotiatedOutbound(
331 FullyNegotiatedOutbound {
332 protocol: out,
333 info,
334 },
335 )),
336 ConnectionEvent::AddressChange(address_change) => {
337 if let Some(inner) = self.inner.as_mut() {
338 inner.on_connection_event(ConnectionEvent::AddressChange(AddressChange {
339 new_address: address_change.new_address,
340 }));
341 }
342 }
343 ConnectionEvent::DialUpgradeError(DialUpgradeError { info, error: err }) => self
344 .inner
345 .as_mut()
346 .expect("Can't receive an outbound substream if disabled; QED")
347 .on_connection_event(ConnectionEvent::DialUpgradeError(DialUpgradeError {
348 info,
349 error: err,
350 })),
351 ConnectionEvent::ListenUpgradeError(listen_upgrade_error) => {
352 self.on_listen_upgrade_error(listen_upgrade_error)
353 }
354 ConnectionEvent::LocalProtocolsChange(change) => {
355 if let Some(inner) = self.inner.as_mut() {
356 inner.on_connection_event(ConnectionEvent::LocalProtocolsChange(change));
357 }
358 }
359 ConnectionEvent::RemoteProtocolsChange(change) => {
360 if let Some(inner) = self.inner.as_mut() {
361 inner.on_connection_event(ConnectionEvent::RemoteProtocolsChange(change));
362 }
363 }
364 }
365 }
366
367 fn poll_close(&mut self, cx: &mut Context<'_>) -> Poll<Option<Self::ToBehaviour>> {
368 let Some(inner) = self.inner.as_mut() else {
369 return Poll::Ready(None);
370 };
371
372 inner.poll_close(cx)
373 }
374}