diff --git a/misc/metrics/CHANGELOG.md b/misc/metrics/CHANGELOG.md index 8538c4b4300..083ab5b448b 100644 --- a/misc/metrics/CHANGELOG.md +++ b/misc/metrics/CHANGELOG.md @@ -6,6 +6,9 @@ - Fix panic in swarm metrics when `ConnectionClosed` events are received for connections that were established before metrics collection started. See [PR 6158](https://github.com/libp2p/rust-libp2p/pull/6158). +- Add `StatusChanged` as a relay metric. + See [PR 6154](https://github.com/libp2p/rust-libp2p/pull/6154). + ## 0.17.0 - Update `prometheus-client` to `0.23.0`. diff --git a/misc/metrics/src/relay.rs b/misc/metrics/src/relay.rs index c6b3827743c..c5ee6ec1a65 100644 --- a/misc/metrics/src/relay.rs +++ b/misc/metrics/src/relay.rs @@ -62,6 +62,7 @@ enum EventType { CircuitReqAccepted, CircuitReqAcceptFailed, CircuitClosed, + StatusChanged, } impl From<&libp2p_relay::Event> for EventType { @@ -90,6 +91,7 @@ impl From<&libp2p_relay::Event> for EventType { #[allow(deprecated)] libp2p_relay::Event::CircuitReqAcceptFailed { .. } => EventType::CircuitReqAcceptFailed, libp2p_relay::Event::CircuitClosed { .. } => EventType::CircuitClosed, + libp2p_relay::Event::StatusChanged { .. } => EventType::StatusChanged, } } } diff --git a/protocols/relay/CHANGELOG.md b/protocols/relay/CHANGELOG.md index 10cbf45c7db..2b792309416 100644 --- a/protocols/relay/CHANGELOG.md +++ b/protocols/relay/CHANGELOG.md @@ -2,9 +2,12 @@ - Raise MSRV to 1.88.0. See [PR 6273](https://github.com/libp2p/rust-libp2p/pull/6273). +- Automatically configure HOP protocol advertisement based on external addresses, with the ability to override this + functionality using `Behaviour::set_status` to explicitly set `Status::{Enable,Disable}` to enable or disable + protocol advertisement. + See [PR 6154](https://github.com/libp2p/rust-libp2p/pull/6154). ## 0.21.1 - - reduce allocations by replacing `get_or_insert` with `get_or_insert_with` See [PR 6136](https://github.com/libp2p/rust-libp2p/pull/6136) diff --git a/protocols/relay/src/behaviour.rs b/protocols/relay/src/behaviour.rs index 2e395c3dac6..e4b101f1179 100644 --- a/protocols/relay/src/behaviour.rs +++ b/protocols/relay/src/behaviour.rs @@ -23,10 +23,10 @@ pub(crate) mod handler; pub(crate) mod rate_limiter; use std::{ - collections::{HashMap, HashSet, VecDeque, hash_map}, + collections::{HashMap, VecDeque, hash_map}, num::NonZeroU32, ops::Add, - task::{Context, Poll}, + task::{Context, Poll, Waker}, time::Duration, }; @@ -34,10 +34,9 @@ use either::Either; use libp2p_core::{ConnectedPoint, Endpoint, Multiaddr, multiaddr::Protocol, transport::PortUse}; use libp2p_identity::PeerId; use libp2p_swarm::{ - ConnectionDenied, ConnectionId, ExternalAddresses, NetworkBehaviour, NotifyHandler, THandler, - THandlerInEvent, THandlerOutEvent, ToSwarm, - behaviour::{ConnectionClosed, FromSwarm}, - dummy, + ConnectionClosed, ConnectionDenied, ConnectionId, ExternalAddresses, FromSwarm, + NetworkBehaviour, NotifyHandler, THandler, THandlerInEvent, THandlerOutEvent, ToSwarm, + derive_prelude::ConnectionEstablished, dummy, }; use web_time::Instant; @@ -245,6 +244,11 @@ pub enum Event { dst_peer_id: PeerId, error: Option, }, + /// Own [`Status`] changed. + /// + /// This is triggered based on if the external address + /// has been added or removed. + StatusChanged { status: Status }, } /// [`NetworkBehaviour`] implementation of the relay server @@ -254,27 +258,166 @@ pub struct Behaviour { local_peer_id: PeerId, - reservations: HashMap>, + connections: HashMap>, circuits: CircuitsTracker, /// Queue of actions to return when polled. queued_actions: VecDeque>>, external_addresses: ExternalAddresses, + + /// Advertisement status. + status: Status, + + /// Indication of whether the status should automatically change based on the external + /// addresses. + /// + /// By default is true unless [`Status`] is explicitly set in [`Behaviour::set_status`]. + auto_status_change: bool, + + waker: Option, +} +/// Own relay server advertisement status. +#[derive(PartialEq, Copy, Clone, Debug)] +pub enum Status { + /// Enables advertisement of the HOP protocol. + /// This allows other nodes to use us a relay server. + Enable, + + /// Disables advertisement of the HOP protocol. + /// New nodes won't be able to use us a relay server anymore. + /// Existing circuits are kept alive until they time out. + Disable, +} + +#[derive(Debug, PartialEq, Eq)] +enum Reservation { + Active, + None, +} + +impl Reservation { + pub(crate) fn is_active(&self) -> bool { + *self == Reservation::Active + } } impl Behaviour { + /// Create a new relay server behavior. + /// + /// By default, the node with determine its advertisement [`Status`] automatically + /// based on info about its own external addresses. + /// Alternatively, the advertisement status can be fixed with [`Behaviour::set_status`]. pub fn new(local_peer_id: PeerId, config: Config) -> Self { Self { config, local_peer_id, - reservations: Default::default(), + connections: Default::default(), circuits: Default::default(), queued_actions: Default::default(), external_addresses: Default::default(), + status: Status::Disable, + auto_status_change: true, + waker: None, + } + } + + /// Sets the advertisement status of the local node as relay server. + /// + /// If the `status` is set to `None`, the node will configure its status automatically + /// depending on whether it has any external addresses or not. + pub fn set_status(&mut self, status: Option) { + match status { + Some(status) => { + self.auto_status_change = false; + if self.status != status { + self.status = status; + self.reconfigure_relay_status(); + } + } + None => { + self.auto_status_change = true; + self.determine_relay_status_from_external_address(); + } + } + } + + fn reconfigure_relay_status(&mut self) { + if self.connections.is_empty() { + return; + } + + for (peer_id, connections) in self.connections.iter() { + self.queued_actions + .extend(connections.keys().map(|id| ToSwarm::NotifyHandler { + peer_id: *peer_id, + handler: NotifyHandler::One(*id), + event: Either::Left(handler::In::SetStatus { + status: self.status, + }), + })); + } + + if let Some(waker) = self.waker.take() { + waker.wake(); + } + } + + fn determine_relay_status_from_external_address(&mut self) { + let old = self.status; + + self.status = match (self.external_addresses.as_slice(), self.status) { + ([], Status::Enable) => { + tracing::debug!( + "disabling protocol advertisement because we no longer have any confirmed external addresses" + ); + Status::Disable + } + ([], Status::Disable) => { + // Previously disabled because of no external addresses. + Status::Disable + } + (confirmed_external_addresses, Status::Disable) => { + debug_assert!( + !confirmed_external_addresses.is_empty(), + "Previous match arm handled empty list" + ); + tracing::debug!("advertising protocol because we are now externally reachable"); + Status::Enable + } + (confirmed_external_addresses, Status::Enable) => { + debug_assert!( + !confirmed_external_addresses.is_empty(), + "Previous match arm handled empty list" + ); + + Status::Enable + } + }; + + if self.status != old { + self.reconfigure_relay_status(); + self.queued_actions + .push_back(ToSwarm::GenerateEvent(Event::StatusChanged { + status: self.status, + })); } } + fn on_connection_established( + &mut self, + ConnectionEstablished { + peer_id, + connection_id, + .. + }: ConnectionEstablished, + ) { + self.connections + .entry(peer_id) + .or_default() + .insert(connection_id, Reservation::None); + } + fn on_connection_closed( &mut self, ConnectionClosed { @@ -283,8 +426,8 @@ impl Behaviour { .. }: ConnectionClosed, ) { - if let hash_map::Entry::Occupied(mut peer) = self.reservations.entry(peer_id) { - if peer.get_mut().remove(&connection_id) { + if let hash_map::Entry::Occupied(mut peer) = self.connections.entry(peer_id) { + if peer.get_mut().remove(&connection_id).is_some() { self.queued_actions .push_back(ToSwarm::GenerateEvent(Event::ReservationClosed { src_peer_id: peer_id, @@ -338,6 +481,7 @@ impl NetworkBehaviour for Behaviour { local_addr: local_addr.clone(), send_back_addr: remote_addr.clone(), }, + self.status, ))) } @@ -365,14 +509,25 @@ impl NetworkBehaviour for Behaviour { role_override, port_use, }, + self.status, ))) } fn on_swarm_event(&mut self, event: FromSwarm) { - self.external_addresses.on_swarm_event(&event); + let changed = self.external_addresses.on_swarm_event(&event); + + if self.auto_status_change && changed { + self.determine_relay_status_from_external_address(); + } - if let FromSwarm::ConnectionClosed(connection_closed) = event { - self.on_connection_closed(connection_closed) + match event { + FromSwarm::ConnectionEstablished(connection_established) => { + self.on_connection_established(connection_established) + } + FromSwarm::ConnectionClosed(connection_closed) => { + self.on_connection_closed(connection_closed) + } + _ => {} } } @@ -406,16 +561,16 @@ impl NetworkBehaviour for Behaviour { // `max_reservations_per_peer`. (!renewed && self - .reservations + .connections .get(&event_source) - .map(|cs| cs.len()) + .map(|cs| cs.values().filter(|status| status.is_active()).count()) .unwrap_or(0) > self.config.max_reservations_per_peer) // Deny if it exceeds `max_reservations`. || self - .reservations + .connections .values() - .map(|cs| cs.len()) + .map(|cs| cs.values().filter(|status| status.is_active()).count()) .sum::() >= self.config.max_reservations // Deny if it exceeds the allowed rate of reservations. @@ -436,10 +591,10 @@ impl NetworkBehaviour for Behaviour { } } else { // Accept reservation. - self.reservations + self.connections .entry(event_source) .or_default() - .insert(connection); + .insert(connection, Reservation::Active); ToSwarm::NotifyHandler { handler: NotifyHandler::One(connection), @@ -465,10 +620,10 @@ impl NetworkBehaviour for Behaviour { handler::Event::ReservationReqAccepted { renewed } => { // Ensure local eventual consistent reservation state matches handler (source of // truth). - self.reservations - .entry(event_source) - .or_default() - .insert(connection); + self.connections + .get_mut(&event_source) + .expect("valid connection") + .insert(connection, Reservation::Active); self.queued_actions.push_back(ToSwarm::GenerateEvent( Event::ReservationReqAccepted { @@ -504,7 +659,7 @@ impl NetworkBehaviour for Behaviour { )); } handler::Event::ReservationTimedOut {} => { - match self.reservations.entry(event_source) { + match self.connections.entry(event_source) { hash_map::Entry::Occupied(mut peer) => { peer.get_mut().remove(&connection); if peer.get().is_empty() { @@ -556,11 +711,13 @@ impl NetworkBehaviour for Behaviour { status: proto::Status::RESOURCE_LIMIT_EXCEEDED, }), } - } else if let Some(dst_conn) = self - .reservations + } else if let Some((dst_conn, status)) = self + .connections .get(&inbound_circuit_req.dst()) .and_then(|cs| cs.iter().next()) { + assert_eq!(*status, Reservation::Active); + // Accept circuit request if reservation present. let circuit_id = self.circuits.insert(Circuit { status: CircuitStatus::Accepting, @@ -718,11 +875,16 @@ impl NetworkBehaviour for Behaviour { } #[tracing::instrument(level = "trace", name = "NetworkBehaviour::poll", skip(self))] - fn poll(&mut self, _: &mut Context<'_>) -> Poll>> { + fn poll( + &mut self, + cx: &mut Context<'_>, + ) -> Poll>> { if let Some(to_swarm) = self.queued_actions.pop_front() { return Poll::Ready(to_swarm); } + self.waker = Some(cx.waker().clone()); + Poll::Pending } } diff --git a/protocols/relay/src/behaviour/handler.rs b/protocols/relay/src/behaviour/handler.rs index 7d99af3d52c..8b509fbb7f1 100644 --- a/protocols/relay/src/behaviour/handler.rs +++ b/protocols/relay/src/behaviour/handler.rs @@ -33,7 +33,10 @@ use futures::{ stream::{FuturesUnordered, StreamExt}, }; use futures_timer::Delay; -use libp2p_core::{ConnectedPoint, Multiaddr, upgrade::ReadyUpgrade}; +use libp2p_core::{ + ConnectedPoint, Multiaddr, + upgrade::{DeniedUpgrade, ReadyUpgrade}, +}; use libp2p_identity::PeerId; use libp2p_swarm::{ ConnectionHandler, ConnectionHandlerEvent, ConnectionId, Stream, StreamProtocol, @@ -44,7 +47,7 @@ use web_time::Instant; use crate::{ HOP_PROTOCOL_NAME, STOP_PROTOCOL_NAME, - behaviour::CircuitId, + behaviour::{self, CircuitId}, copy_future::CopyFuture, proto, protocol::{inbound_hop, outbound_stop}, @@ -87,6 +90,9 @@ pub enum In { dst_stream: Stream, dst_pending_data: Bytes, }, + SetStatus { + status: behaviour::Status, + }, } impl fmt::Debug for In { @@ -137,6 +143,10 @@ impl fmt::Debug for In { .field("circuit_id", circuit_id) .field("dst_peer_id", dst_peer_id) .finish(), + In::SetStatus { status } => f + .debug_struct("In::SetStatus") + .field("status", status) + .finish(), } } } @@ -385,10 +395,12 @@ pub struct Handler { CircuitId, Result, >, + + status: behaviour::Status, } impl Handler { - pub fn new(config: Config, endpoint: ConnectedPoint) -> Handler { + pub fn new(config: Config, endpoint: ConnectedPoint, status: behaviour::Status) -> Handler { Handler { inbound_workers: futures_bounded::FuturesSet::new( move || futures_bounded::Delay::tokio(STREAM_TIMEOUT), @@ -409,6 +421,7 @@ impl Handler { active_reservation: Default::default(), pending_connect_requests: Default::default(), active_connect_requests: Default::default(), + status, } } @@ -496,13 +509,18 @@ type Futures = FuturesUnordered>; impl ConnectionHandler for Handler { type FromBehaviour = In; type ToBehaviour = Event; - type InboundProtocol = ReadyUpgrade; + type InboundProtocol = Either, DeniedUpgrade>; type InboundOpenInfo = (); type OutboundProtocol = ReadyUpgrade; type OutboundOpenInfo = (); fn listen_protocol(&self) -> SubstreamProtocol { - SubstreamProtocol::new(ReadyUpgrade::new(HOP_PROTOCOL_NAME), ()) + match self.status { + behaviour::Status::Enable => { + SubstreamProtocol::new(Either::Left(ReadyUpgrade::new(HOP_PROTOCOL_NAME)), ()) + } + behaviour::Status::Disable => SubstreamProtocol::new(Either::Right(DeniedUpgrade), ()), + } } fn on_behaviour_event(&mut self, event: Self::FromBehaviour) { @@ -594,6 +612,7 @@ impl ConnectionHandler for Handler { .boxed(), ); } + In::SetStatus { status } => self.status = status, } } @@ -890,7 +909,7 @@ impl ConnectionHandler for Handler { ) { match event { ConnectionEvent::FullyNegotiatedInbound(FullyNegotiatedInbound { - protocol: stream, + protocol: futures::future::Either::Left(stream), .. }) => { self.on_fully_negotiated_inbound(stream); diff --git a/protocols/relay/src/lib.rs b/protocols/relay/src/lib.rs index b3c07e1390e..f53d80f4848 100644 --- a/protocols/relay/src/lib.rs +++ b/protocols/relay/src/lib.rs @@ -39,7 +39,9 @@ mod proto { }; } -pub use behaviour::{Behaviour, CircuitId, Config, Event, StatusCode, rate_limiter::RateLimiter}; +pub use behaviour::{ + Behaviour, CircuitId, Config, Event, Status, StatusCode, rate_limiter::RateLimiter, +}; pub use protocol::{HOP_PROTOCOL_NAME, STOP_PROTOCOL_NAME}; /// Types related to the relay protocol inbound. diff --git a/protocols/relay/tests/lib.rs b/protocols/relay/tests/lib.rs index de3087b2903..94994203964 100644 --- a/protocols/relay/tests/lib.rs +++ b/protocols/relay/tests/lib.rs @@ -442,6 +442,147 @@ async fn reuse_connection() { .await; } +#[tokio::test] +async fn relay_auto_enables_on_external_address() { + let _ = tracing_subscriber::fmt() + .with_env_filter(EnvFilter::from_default_env()) + .try_init(); + + let relay_addr = Multiaddr::empty().with(Protocol::Memory(rand::random::())); + let mut relay = build_relay(); + + relay.listen_on(relay_addr.clone()).unwrap(); + + relay.add_external_address(relay_addr.clone()); + + let status = loop { + match relay.select_next_some().await { + SwarmEvent::Behaviour(RelayEvent::Relay(relay::Event::StatusChanged { status })) => { + break status; + } + SwarmEvent::NewListenAddr { .. } => {} + e => panic!("{e:?}"), + } + }; + + assert_eq!(status, relay::Status::Enable); +} + +#[tokio::test] +async fn relay_auto_disables_on_external_address_removal() { + let _ = tracing_subscriber::fmt() + .with_env_filter(EnvFilter::from_default_env()) + .try_init(); + + let relay_addr = Multiaddr::empty().with(Protocol::Memory(rand::random::())); + let mut relay = build_relay(); + + relay.listen_on(relay_addr.clone()).unwrap(); + relay.add_external_address(relay_addr.clone()); + + // Wait for auto-enable. + loop { + match relay.select_next_some().await { + SwarmEvent::Behaviour(RelayEvent::Relay(relay::Event::StatusChanged { + status: relay::Status::Enable, + })) => break, + SwarmEvent::NewListenAddr { .. } => {} + e => panic!("{e:?}"), + } + } + + relay.remove_external_address(&relay_addr); + + let status = loop { + match relay.select_next_some().await { + SwarmEvent::Behaviour(RelayEvent::Relay(relay::Event::StatusChanged { status })) => { + break status; + } + SwarmEvent::Behaviour(RelayEvent::Ping(_)) => {} + e => panic!("{e:?}"), + } + }; + + assert_eq!(status, relay::Status::Disable); +} + +#[tokio::test] +async fn relay_manual_status_overrides_auto() { + let _ = tracing_subscriber::fmt() + .with_env_filter(EnvFilter::from_default_env()) + .try_init(); + + let relay_addr = Multiaddr::empty().with(Protocol::Memory(rand::random::())); + let mut relay = build_relay(); + + relay.listen_on(relay_addr.clone()).unwrap(); + + relay + .behaviour_mut() + .relay + .set_status(Some(relay::Status::Disable)); + + match relay.select_next_some().await { + SwarmEvent::NewListenAddr { .. } => {} + e => panic!("{e:?}"), + } + + relay.add_external_address(relay_addr.clone()); + + relay.behaviour_mut().relay.set_status(None); + + let status = match relay.select_next_some().await { + SwarmEvent::Behaviour(RelayEvent::Relay(relay::Event::StatusChanged { status })) => status, + e => panic!("{e:?}"), + }; + + assert_eq!(status, relay::Status::Enable); +} + +#[tokio::test] +async fn disabled_relay_rejects_reservation() { + let _ = tracing_subscriber::fmt() + .with_env_filter(EnvFilter::from_default_env()) + .try_init(); + + let relay_addr = Multiaddr::empty().with(Protocol::Memory(rand::random::())); + let mut relay = build_relay(); + let relay_peer_id = *relay.local_peer_id(); + + relay.listen_on(relay_addr.clone()).unwrap(); + + relay + .behaviour_mut() + .relay + .set_status(Some(relay::Status::Disable)); + + tokio::spawn(async move { + relay.collect::>().await; + }); + + let mut client = build_client(); + let client_addr = relay_addr + .with(Protocol::P2p(relay_peer_id)) + .with(Protocol::P2pCircuit); + + let listener = client.listen_on(client_addr).unwrap(); + + assert!(wait_for_dial(&mut client, relay_peer_id).await); + + let error = client + .wait(|e| match e { + SwarmEvent::ListenerClosed { + listener_id, + reason: Err(e), + .. + } if listener_id == listener => Some(e), + _ => None, + }) + .await; + + assert!(error.source().is_some()); +} + fn build_relay() -> Swarm { build_relay_with_config(relay::Config { reservation_duration: Duration::from_secs(2),