diff --git a/Cargo.toml b/Cargo.toml index 06349a1..ce9e1d4 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -59,12 +59,12 @@ indexeddb = ["dep:idb", "serde"] redb = ["dep:redb", "dep:cbor4ii", "serde"] sqlite = ["dep:sqlx", "dep:cbor4ii", "serde"] -webrtc =["dep:libp2p-webrtc", "dep:libp2p-webrtc-websys"] +webrtc = ["dep:libp2p-webrtc", "dep:libp2p-webrtc-websys"] websocket = ["libp2p/websocket", "rcgen", "dep:pem", "libp2p/websocket-websys"] webtransport = ["libp2p/webtransport-websys"] [workspace] -members = ["examples/browser-webrtc", "examples/custom-behaviour-and-context", "examples/custom-transport", "examples/distributed-key-value-store", "examples/file-sharing-request-response", "examples/file-sharing-stream", "examples/floodsub", "examples/gossipsub", "examples/ipfs-kad", "examples/peer-store", "examples/relay", "examples/rendezvous", "examples/stream", "examples/upnp"] +members = ["examples/autorelay", "examples/browser-webrtc", "examples/custom-behaviour-and-context", "examples/custom-transport", "examples/distributed-key-value-store", "examples/file-sharing-request-response", "examples/file-sharing-stream", "examples/floodsub", "examples/gossipsub", "examples/ipfs-kad", "examples/peer-store", "examples/relay", "examples/rendezvous", "examples/stream", "examples/upnp"] [workspace.dependencies] async-rt = "0.1.8" @@ -170,7 +170,7 @@ getrandom = { workspace = true, features = ["js"] } getrandom_03 = { workspace = true, features = ["wasm_js"] } idb = { workspace = true, optional = true } js-sys = { workspace = true, optional = true } -libp2p = { features = ["macros", "serde", "wasm-bindgen"], workspace = true } +libp2p = { features = ["macros", "serde"], workspace = true } libp2p-webrtc-websys = { workspace = true, optional = true } parking_lot = { workspace = true, optional = true } send_wrapper = { workspace = true, features = ["futures"] } @@ -180,4 +180,4 @@ tokio = { default-features = false, features = ["sync", "macros"], workspace = t url = { workspace = true, optional = true } wasm-bindgen.workspace = true wasm-bindgen-futures.workspace = true -web-sys = { workspace = true, optional = true } +web-sys = { workspace = true, optional = true } \ No newline at end of file diff --git a/examples/autorelay/Cargo.toml b/examples/autorelay/Cargo.toml new file mode 100644 index 0000000..387430e --- /dev/null +++ b/examples/autorelay/Cargo.toml @@ -0,0 +1,11 @@ +[package] +name = "autorelay" +version = "0.1.0" +edition = "2024" + +[dependencies] +tokio.workspace = true +futures.workspace = true +connexa = { path = "../../", default-features = false, features = ["ed25519", "rsa", "tcp", "quic", "yamux", "noise", "relay", "ping", "identify", "testing", "kad", "dns"] } +clap = { version = "4.5.39", features = ["derive"] } +tracing-subscriber = { version = "0.3.19", features = ["env-filter"] } diff --git a/examples/autorelay/src/main.rs b/examples/autorelay/src/main.rs new file mode 100644 index 0000000..770f138 --- /dev/null +++ b/examples/autorelay/src/main.rs @@ -0,0 +1,55 @@ +use connexa::prelude::{DefaultConnexaBuilder, Multiaddr, PeerId}; + +pub const BOOTSTRAP_NODES: &[(&str, &str)] = &[ + ( + "/ip4/104.131.131.82/tcp/4001", + "QmaCpDMGvV2BGHeYERUEnRQAwe3N8SzbUtfsmvsqQLuvuJ", + ), + ( + "/dnsaddr/bootstrap.libp2p.io", + "QmNnooDu7bfjPFoTZYxMNLWUQJyrVwtbZg5gBMjTezGAJN", + ), + ( + "/dnsaddr/bootstrap.libp2p.io", + "QmQCU2EcMqAqQPR2i9bChDtGNJchTbq5TbXJJ16u19uLTa", + ), + ( + "/dnsaddr/bootstrap.libp2p.io", + "QmbLHAnMoJPWSCR5Zhtx6BHJX9KiKNN6tpvbUcqanj75Nb", + ), + ( + "/dnsaddr/bootstrap.libp2p.io", + "QmcZf59bWwK5XFi76CZX8cbJ4BhTzzA3gU1ZjYZcYW3dwt", + ), +]; + +#[tokio::main] +async fn main() -> std::io::Result<()> { + let connexa = DefaultConnexaBuilder::new_identity() + .enable_tcp() + .enable_quic() + .enable_dns() + .with_relay() + .with_autorelay() + .with_ping() + .with_identify() + .with_kademlia() + .build() + .await?; + + for (addr, peer_id) in BOOTSTRAP_NODES { + let peer_id: PeerId = peer_id.parse().expect("valid peer id"); + let addr: Multiaddr = addr.parse().expect("valid addr"); + connexa.dht().add_address(peer_id, addr).await?; + } + + tokio::time::sleep(std::time::Duration::from_secs(10)).await; + + let external_addrs = connexa.swarm().external_addresses().await?; + + for addr in external_addrs { + println!("- {}", addr); + } + + Ok(()) +} diff --git a/examples/upnp/src/main.rs b/examples/upnp/src/main.rs index d1f16ed..3f881a9 100644 --- a/examples/upnp/src/main.rs +++ b/examples/upnp/src/main.rs @@ -21,6 +21,7 @@ async fn main() -> std::io::Result<()> { UpnpEvent::NonRoutableGateway => println!("Gateway is not routable"), }, _ => {} + _ => {} }) .build() .await?; diff --git a/src/behaviour.rs b/src/behaviour.rs index 0e61aed..82000b1 100644 --- a/src/behaviour.rs +++ b/src/behaviour.rs @@ -1,3 +1,5 @@ +#[cfg(feature = "relay")] +pub mod autorelay; pub mod dummy; pub mod peer_store; #[cfg(feature = "request-response")] @@ -56,6 +58,9 @@ where #[cfg(feature = "relay")] pub relay_client: Toggle, + #[cfg(feature = "relay")] + pub autorelay: Toggle, + #[cfg(not(target_arch = "wasm32"))] #[cfg(feature = "upnp")] pub upnp: Toggle, @@ -270,6 +275,15 @@ where } false => (None, None.into()), }; + #[cfg(feature = "relay")] + let autorelay = protocols + .autorelay + .then(|| { + let config_fn = config.autorelay_config; + let config = config_fn(autorelay::Config::default()); + autorelay::Behaviour::new_with_config(config) + }) + .into(); #[cfg(not(feature = "relay"))] let transport = None::<()>; @@ -361,6 +375,8 @@ where relay, #[cfg(feature = "relay")] relay_client, + #[cfg(feature = "relay")] + autorelay, #[cfg(feature = "stream")] stream, #[cfg(not(target_arch = "wasm32"))] diff --git a/src/behaviour/autorelay.rs b/src/behaviour/autorelay.rs new file mode 100644 index 0000000..87f42c8 --- /dev/null +++ b/src/behaviour/autorelay.rs @@ -0,0 +1,2117 @@ +// TODO: Replace with builtin autorelay behaviour from libp2p. See https://github.com/libp2p/rust-libp2p/pull/6156 +use crate::behaviour::autorelay::handler::Out; +use crate::multiaddr_ext::MultiaddrExt; +use crate::prelude::swarm::derive_prelude::{ListenerId, PortUse}; +use crate::prelude::swarm::{ + ExternalAddresses, ListenOpts, NewListenAddr, NotifyHandler, + derive_prelude::{ + AddressChange, ConnectionClosed, ConnectionDenied, ConnectionEstablished, ConnectionId, + DialFailure, ExpiredListenAddr, FromSwarm, ListenerClosed, ListenerError, Multiaddr, + NetworkBehaviour, THandler, THandlerInEvent, THandlerOutEvent, ToSwarm, + }, + dial_opts::DialOpts, + dummy, +}; +use crate::prelude::transport::Endpoint; +use crate::prelude::{PeerId, Protocol}; +use either::Either; +use std::collections::BTreeMap; +use std::{ + collections::{HashMap, HashSet, VecDeque}, + num::NonZeroU8, + task::{Context, Poll, Waker}, + time::Duration, +}; +use web_time::{Instant, SystemTime}; + +mod handler; + +#[derive(Debug)] +pub struct Behaviour { + config: Config, + status: Status, + auto_status_change: bool, + external_addresses: ExternalAddresses, + events: VecDeque::ToSwarm, THandlerInEvent>>, + + connections: HashMap<(PeerId, ConnectionId), Connection>, + + reservations: HashMap, + + external_reservations: HashMap, + + // placeholder for reservations. + // TODO: Removed once https://github.com/libp2p/rust-libp2p/pull/3222 is published or backported. + reservation_addrs: HashMap>, + + static_relays: HashMap>, + + static_dial_cooldowns: HashMap, + + failure_counts: HashMap, + + previous_relays: VecDeque<(PeerId, Multiaddr, SystemTime)>, + + relays_available: bool, + + waker: Option, +} + +impl Default for Behaviour { + fn default() -> Self { + Self { + config: Config::default(), + status: Status::Enable, + auto_status_change: true, + external_addresses: ExternalAddresses::default(), + events: VecDeque::new(), + connections: HashMap::new(), + reservations: HashMap::new(), + external_reservations: HashMap::new(), + reservation_addrs: HashMap::new(), + static_relays: HashMap::new(), + static_dial_cooldowns: HashMap::new(), + failure_counts: HashMap::new(), + previous_relays: VecDeque::new(), + relays_available: false, + waker: None, + } + } +} + +#[derive(Default, Debug, Clone, Copy, PartialEq, Eq)] +pub enum Status { + #[default] + Enable, + Disable, +} + +#[derive(Debug)] +struct Connection { + address: Multiaddr, + relay_status: RelayStatus, +} + +impl Connection { + /// Mark relayed connection as not supported + pub(crate) fn disqualify_connection_if_relayed(&mut self) { + if self.address.is_relayed() { + self.relay_status = RelayStatus::NotSupported; + } + } +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +enum RelayStatus { + Supported { status: ReservationStatus }, + NotSupported, + Pending, +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +enum ReservationStatus { + Idle, + Pending { id: ListenerId }, + Active { id: ListenerId }, + Blacklisted, +} + +#[derive(Debug)] +pub struct Config { + max_reservations: NonZeroU8, + failure_cooldown: Duration, + failure_cooldown_max: Duration, + max_previous_relays: usize, + static_relays: HashMap>, +} + +impl Default for Config { + fn default() -> Self { + Self { + max_reservations: NonZeroU8::new(2).unwrap(), + failure_cooldown: Duration::from_secs(30), + failure_cooldown_max: Duration::from_secs(10 * 60), + max_previous_relays: 16, + static_relays: HashMap::new(), + } + } +} + +impl Config { + pub fn set_max_reservations(mut self, max_reservations: NonZeroU8) -> Self { + self.max_reservations = max_reservations; + self + } + + pub fn set_failure_cooldown(mut self, duration: Duration) -> Self { + self.failure_cooldown = duration; + self + } + + pub fn set_failure_cooldown_max(mut self, duration: Duration) -> Self { + self.failure_cooldown_max = duration; + self + } + + pub fn set_max_previous_relays(mut self, max: usize) -> Self { + self.max_previous_relays = max; + self + } + + pub fn add_static_relay(mut self, peer_id: PeerId, addresses: Vec) -> Self { + let entry = self.static_relays.entry(peer_id).or_default(); + for addr in addresses { + if !entry.contains(&addr) { + entry.push(addr); + } + } + self + } +} + +#[derive(Debug)] +#[non_exhaustive] +pub enum Event { + /// The status of the local node has changed. + StatusChanged { status: Status }, + /// No connected peer supports the HOP protocol. + NoRelaysAvailable, + /// At least one connected peer supports the HOP protocol. + RelaysAvailable, +} + +impl Behaviour { + pub fn new_with_config(mut config: Config) -> Self { + let initial_static_relays = std::mem::take(&mut config.static_relays); + let mut behaviour = Self { + config, + ..Default::default() + }; + for (peer_id, addresses) in initial_static_relays { + for address in addresses { + behaviour.add_static_relay(peer_id, address); + } + } + behaviour + } + + /// Sets the autorelay status. + pub fn set_status(&mut self, status: Option) { + match status { + Some(status) => { + self.auto_status_change = false; + if self.status != status { + self.status = status; + self.events + .push_back(ToSwarm::GenerateEvent(Event::StatusChanged { status })); + if status == Status::Enable { + self.meet_reservation_target(); + } + } + } + None => { + self.auto_status_change = true; + self.determine_status_from_external_addresses(); + } + } + + if let Some(waker) = self.waker.take() { + waker.wake(); + } + } + + /// Register a peer as a static relay. + /// + /// This will dial and establish a connection to the peer if it doesn't already have a direct + /// connection. + /// Note that peers that are through a relay cannot be used as a static peer + pub fn add_static_relay(&mut self, peer_id: PeerId, address: Multiaddr) -> bool { + if address.is_relayed() { + tracing::warn!(%peer_id, %address, "static relay address is relayed. ignoring."); + return false; + } + + let entry = self.static_relays.entry(peer_id).or_default(); + if entry.contains(&address) { + tracing::warn!(%peer_id, %address, "static relay address already exist"); + } else { + entry.push(address); + } + let combined = entry.clone(); + + if self.is_peer_idle(&peer_id) { + self.evict_for_static_peer(peer_id); + } + + if !self.queue_static_dial(peer_id, combined) { + self.meet_reservation_target(); + } + + if let Some(waker) = self.waker.take() { + waker.wake(); + } + + true + } + + /// Remove peer as a static relay. + /// This will not close any connections or terminate any existing reservation with the relay + pub fn remove_static_relay(&mut self, peer_id: &PeerId) -> bool { + self.static_dial_cooldowns.remove(peer_id); + self.static_relays.remove(peer_id).is_some() + } + + pub fn static_relays(&self) -> impl Iterator { + self.static_relays + .iter() + .map(|(peer, addrs)| (peer, addrs.as_slice())) + } + + pub fn previous_relays(&self) -> impl Iterator { + self.previous_relays + .iter() + .map(|(peer, addr, ts)| (peer, addr, ts)) + } + + fn static_dial_in_cooldown(&self, peer_id: &PeerId) -> bool { + self.static_dial_cooldowns + .get(peer_id) + .is_some_and(|deadline| *deadline > Instant::now()) + } + + fn queue_static_dial(&mut self, peer_id: PeerId, addresses: Vec) -> bool { + if addresses.is_empty() + || self.has_direct_connection(&peer_id) + || self.static_dial_in_cooldown(&peer_id) + { + return false; + } + let opts = DialOpts::peer_id(peer_id).addresses(addresses).build(); + self.events.push_back(ToSwarm::Dial { opts }); + true + } + + fn record_previous_relay(&mut self, peer_id: PeerId, address: Multiaddr) { + let max = self.config.max_previous_relays; + if max == 0 { + return; + } + self.previous_relays.retain(|(p, _, _)| *p != peer_id); + if self.previous_relays.len() >= max { + self.previous_relays.pop_front(); + } + self.previous_relays + .push_back((peer_id, address, SystemTime::now())); + } + + fn forget_previous_relay(&mut self, peer_id: &PeerId) { + self.previous_relays.retain(|(p, _, _)| p != peer_id); + } + + fn record_failure(&mut self, peer_id: PeerId) -> Duration { + let attempts = self.failure_counts.entry(peer_id).or_insert(0); + *attempts = attempts.saturating_add(1); + let exponent = attempts.saturating_sub(1).min(20); + let scale = 1u32 << exponent; + self.config + .failure_cooldown + .saturating_mul(scale) + .min(self.config.failure_cooldown_max) + } + + fn clear_failure(&mut self, peer_id: &PeerId) { + self.failure_counts.remove(peer_id); + } + + fn determine_status_from_external_addresses(&mut self) { + let has_public_addr = self + .external_addresses + .iter() + .any(|addr| !addr.is_relayed()); + + let new_status = match has_public_addr { + true => Status::Disable, + false => Status::Enable, + }; + if new_status != self.status { + self.status = new_status; + self.events + .push_back(ToSwarm::GenerateEvent(Event::StatusChanged { + status: new_status, + })); + match new_status { + Status::Enable => self.meet_reservation_target(), + Status::Disable => self.remove_all_reservations(), + } + } + } + + fn is_peer_idle(&self, peer_id: &PeerId) -> bool { + self.connections.iter().any(|((pid, _), info)| { + pid == peer_id + && info.relay_status + == RelayStatus::Supported { + status: ReservationStatus::Idle, + } + }) + } + + fn has_direct_connection(&self, peer_id: &PeerId) -> bool { + self.connections + .iter() + .any(|((pid, _), info)| pid == peer_id && !info.address.is_relayed()) + } + + fn evict_for_static_peer(&mut self, new_static: PeerId) { + let covered = self.covered_peers(); + if covered.contains(&new_static) { + return; + } + let max = self.config.max_reservations.get() as usize; + if covered.len() < max { + return; + } + + if let Some(listener_id) = self + .reservations + .iter() + .find(|(_, (peer_id, _))| !self.static_relays.contains_key(peer_id)) + .map(|(listener_id, _)| *listener_id) + { + self.events + .push_back(ToSwarm::RemoveListener { id: listener_id }); + } + } + + fn select_connection_for_reservation(&mut self, peer_id: PeerId, connection_id: ConnectionId) { + let info = self + .connections + .get_mut(&(peer_id, connection_id)) + .expect("connection is present"); + + if info.relay_status + != (RelayStatus::Supported { + status: ReservationStatus::Idle, + }) + { + return; + } + + let addr_with_peer_id = match info.address.clone().with_p2p(peer_id) { + Ok(addr) => addr, + Err(addr) => { + tracing::warn!(%addr, "address unexpectedly contains a different peer id than the connection; marking relay connection ineligible"); + info.relay_status = RelayStatus::NotSupported; + return; + } + }; + + let opts = ListenOpts::new(addr_with_peer_id.with(Protocol::P2pCircuit)); + let id = opts.listener_id(); + + info.relay_status = RelayStatus::Supported { + status: ReservationStatus::Pending { id }, + }; + self.reservations.insert(id, (peer_id, connection_id)); + self.events.push_back(ToSwarm::ListenOn { opts }); + } + + /// Removes all existing reservations. + pub fn remove_all_reservations(&mut self) { + let relay_listeners = self + .reservations + .iter() + .map(|(id, (peer_id, conn_id))| (*id, *peer_id, *conn_id)) + .collect::>(); + + for (listener_id, peer_id, connection_id) in relay_listeners { + let Some(connection) = self.connections.get_mut(&(peer_id, connection_id)) else { + continue; + }; + + if !matches!( + connection.relay_status, + RelayStatus::Supported { + status: ReservationStatus::Active { id } | ReservationStatus::Pending { id } + } if id == listener_id + ) { + continue; + } + + connection.relay_status = RelayStatus::Supported { + status: ReservationStatus::Idle, + }; + + self.events + .push_back(ToSwarm::RemoveListener { id: listener_id }); + } + } + + fn disable_reservation(&mut self, id: ListenerId, failed: bool) { + self.expire_reservation_addrs(id); + + if self.external_reservations.remove(&id).is_some() { + self.meet_reservation_target(); + return; + } + + let Some((peer_id, connection_id)) = self.reservations.remove(&id) else { + return; + }; + + let Some(address) = self + .connections + .get(&(peer_id, connection_id)) + .filter(|info| { + matches!( + info.relay_status, + RelayStatus::Supported { + status: ReservationStatus::Active { .. } + | ReservationStatus::Pending { .. } + } + ) + }) + .map(|info| info.address.clone()) + else { + self.meet_reservation_target(); + return; + }; + + let blacklist_duration = failed.then(|| self.record_failure(peer_id)); + + let connection = self + .connections + .get_mut(&(peer_id, connection_id)) + .expect("connection is tracked"); + match blacklist_duration { + Some(duration) => { + connection.relay_status = RelayStatus::Supported { + status: ReservationStatus::Blacklisted, + }; + self.events.push_back(ToSwarm::NotifyHandler { + peer_id, + handler: NotifyHandler::One(connection_id), + event: Either::Left(handler::In::Blacklist { duration }), + }); + } + None => { + connection.relay_status = RelayStatus::Supported { + status: ReservationStatus::Idle, + }; + } + } + + self.record_previous_relay(peer_id, address); + self.meet_reservation_target(); + } + + fn reconcile_reservation_addrs(&mut self) { + let confirmed: HashSet = self + .external_addresses + .iter() + .filter(|addr| addr.is_relayed()) + .cloned() + .collect(); + + for addrs in self.reservation_addrs.values_mut() { + addrs.retain(|addr| confirmed.contains(addr)); + } + self.reservation_addrs.retain(|_, addrs| !addrs.is_empty()); + + for addr in confirmed { + let Some(relay_peer) = addr.relay_peer_id() else { + continue; + }; + + let listeners = self + .reservations + .iter() + .filter(|(_, (peer_id, _))| *peer_id == relay_peer) + .map(|(id, _)| *id) + .chain( + self.external_reservations + .iter() + .filter(|(_, peer_id)| **peer_id == relay_peer) + .map(|(id, _)| *id), + ) + .collect::>(); + + for id in listeners { + self.reservation_addrs + .entry(id) + .or_default() + .insert(addr.clone()); + } + } + } + + fn expire_reservation_addrs(&mut self, id: ListenerId) { + let Some(addrs) = self.reservation_addrs.remove(&id) else { + return; + }; + + for addr in addrs { + let still_backed = self + .reservation_addrs + .values() + .any(|other| other.contains(&addr)); + if !still_backed { + self.events.push_back(ToSwarm::ExternalAddrExpired(addr)); + } + } + } + + fn covered_peers(&self) -> HashSet { + self.reservations + .values() + .map(|(peer_id, _)| *peer_id) + .chain(self.external_reservations.values().copied()) + .collect() + } + + /// Meet the reservation target by selecting connections to establish a reservation. + fn meet_reservation_target(&mut self) { + if self.status == Status::Disable { + return; + } + + let max = self.config.max_reservations.get() as usize; + let covered = self.covered_peers(); + let budget = max.saturating_sub(covered.len()); + if budget == 0 { + return; + } + + let mut static_candidates = BTreeMap::new(); + let mut candidates: BTreeMap<_, ConnectionId> = BTreeMap::new(); + for ((peer_id, connection_id), info) in self.connections.iter() { + if covered.contains(peer_id) { + continue; + } + if info.relay_status + != (RelayStatus::Supported { + status: ReservationStatus::Idle, + }) + { + continue; + } + let bucket = if self.static_relays.contains_key(peer_id) { + &mut static_candidates + } else { + &mut candidates + }; + bucket + .entry(*peer_id) + .and_modify(|existing| *existing = (*existing).min(*connection_id)) + .or_insert(*connection_id); + } + + let selected_candidates: Vec<(PeerId, ConnectionId)> = static_candidates + .into_iter() + .chain(candidates) + .take(budget) + .collect(); + + for (peer_id, connection_id) in selected_candidates { + self.select_connection_for_reservation(peer_id, connection_id); + } + + debug_assert!(self.covered_peers().len() <= max); + } + + fn update_relay_availability(&mut self) { + let has_hop_peer = self + .connections + .values() + .any(|info| matches!(info.relay_status, RelayStatus::Supported { .. })); + + match (has_hop_peer, self.relays_available) { + (true, false) => { + self.relays_available = true; + self.events + .push_back(ToSwarm::GenerateEvent(Event::RelaysAvailable)); + } + (false, true) => { + self.relays_available = false; + self.events + .push_back(ToSwarm::GenerateEvent(Event::NoRelaysAvailable)); + } + _ => {} + } + } +} + +impl NetworkBehaviour for Behaviour { + type ConnectionHandler = Either; + type ToSwarm = Event; + + fn handle_established_inbound_connection( + &mut self, + _connection_id: ConnectionId, + _peer: PeerId, + local_addr: &Multiaddr, + _remote_addr: &Multiaddr, + ) -> Result, ConnectionDenied> { + if local_addr.is_relayed() { + Ok(Either::Right(dummy::ConnectionHandler)) + } else { + Ok(Either::Left(handler::Handler::default())) + } + } + + fn handle_established_outbound_connection( + &mut self, + _connection_id: ConnectionId, + _peer: PeerId, + addr: &Multiaddr, + _role_override: Endpoint, + _port_use: PortUse, + ) -> Result, ConnectionDenied> { + if addr.is_relayed() { + Ok(Either::Right(dummy::ConnectionHandler)) + } else { + Ok(Either::Left(handler::Handler::default())) + } + } + + fn on_swarm_event(&mut self, event: FromSwarm) { + let change = self.external_addresses.on_swarm_event(&event); + + if self.auto_status_change && change { + self.determine_status_from_external_addresses(); + } + + if change { + self.reconcile_reservation_addrs(); + } + + match event { + FromSwarm::ConnectionEstablished(ConnectionEstablished { + peer_id, + endpoint, + connection_id, + .. + }) => { + let remote_addr = endpoint.get_remote_address().clone(); + + let mut connection = Connection { + address: remote_addr, + relay_status: RelayStatus::Pending, + }; + + connection.disqualify_connection_if_relayed(); + + self.connections + .insert((peer_id, connection_id), connection); + + if self.static_relays.contains_key(&peer_id) { + self.static_dial_cooldowns.remove(&peer_id); + } + } + FromSwarm::ConnectionClosed(ConnectionClosed { + peer_id, + connection_id, + .. + }) => { + let Some(connection) = self.connections.remove(&(peer_id, connection_id)) else { + return; + }; + + if !self.connections.keys().any(|(pid, _)| *pid == peer_id) { + self.clear_failure(&peer_id); + } + + let had_reservation = matches!( + connection.relay_status, + RelayStatus::Supported { + status: ReservationStatus::Active { .. } + | ReservationStatus::Pending { .. } + | ReservationStatus::Blacklisted + } + ); + + if let RelayStatus::Supported { + status: ReservationStatus::Active { id } | ReservationStatus::Pending { id }, + } = connection.relay_status + { + self.reservations.remove(&id); + self.meet_reservation_target(); + } + + if had_reservation { + self.record_previous_relay(peer_id, connection.address); + } + + if let Some(addresses) = self.static_relays.get(&peer_id).cloned() { + self.queue_static_dial(peer_id, addresses); + } + + self.update_relay_availability(); + } + FromSwarm::AddressChange(AddressChange { + peer_id, + connection_id, + old: _, + new, + }) => { + let Some(connection) = self.connections.get_mut(&(peer_id, connection_id)) else { + return; + }; + + let new_addr = new.get_remote_address(); + + connection.address = new_addr.clone(); + } + FromSwarm::NewListenAddr(NewListenAddr { listener_id, addr }) => { + if !addr.is_relayed() { + return; + } + + if let Some((peer_id, connection_id)) = self.reservations.get(&listener_id).copied() + { + let Some(connection) = self.connections.get_mut(&(peer_id, connection_id)) + else { + return; + }; + + if matches!( + connection.relay_status, + RelayStatus::Supported { + status: ReservationStatus::Pending { id } + } if id == listener_id + ) { + connection.relay_status = RelayStatus::Supported { + status: ReservationStatus::Active { id: listener_id }, + }; + self.forget_previous_relay(&peer_id); + self.clear_failure(&peer_id); + } + return; + } + + if let Some(relay_peer_id) = addr.relay_peer_id() { + self.external_reservations + .insert(listener_id, relay_peer_id); + self.reconcile_reservation_addrs(); + } + } + FromSwarm::ExpiredListenAddr(ExpiredListenAddr { listener_id, .. }) => { + self.disable_reservation(listener_id, false); + } + FromSwarm::ListenerError(ListenerError { listener_id, .. }) => { + self.disable_reservation(listener_id, true); + } + FromSwarm::ListenerClosed(ListenerClosed { + listener_id, + reason, + .. + }) => { + self.disable_reservation(listener_id, reason.is_err()); + } + FromSwarm::DialFailure(DialFailure { + peer_id: Some(peer_id), + error, + .. + }) if self.static_relays.contains_key(&peer_id) => { + tracing::warn!(%peer_id, %error, "dial to static relay failed"); + self.static_dial_cooldowns + .insert(peer_id, Instant::now() + self.config.failure_cooldown); + } + _ => {} + } + } + + fn on_connection_handler_event( + &mut self, + peer_id: PeerId, + connection_id: ConnectionId, + event: THandlerOutEvent, + ) { + let Either::Left(event) = event; + + let Some(connection) = self.connections.get_mut(&(peer_id, connection_id)) else { + return; + }; + + match event { + Out::Supported => { + if matches!( + connection.relay_status, + RelayStatus::Pending | RelayStatus::NotSupported + ) { + connection.relay_status = RelayStatus::Supported { + status: ReservationStatus::Idle, + }; + if self.static_relays.contains_key(&peer_id) { + self.evict_for_static_peer(peer_id); + } + self.meet_reservation_target(); + self.update_relay_availability(); + } + } + Out::Unsupported => { + let drop_listener = match connection.relay_status { + RelayStatus::Supported { + status: ReservationStatus::Pending { id } | ReservationStatus::Active { id }, + } => Some(id), + _ => None, + }; + let lost_address = drop_listener.map(|_| connection.address.clone()); + connection.relay_status = RelayStatus::NotSupported; + if let Some(id) = drop_listener { + self.expire_reservation_addrs(id); + self.reservations.remove(&id); + self.events.push_back(ToSwarm::RemoveListener { id }); + self.meet_reservation_target(); + } + if let Some(address) = lost_address { + self.record_previous_relay(peer_id, address); + } + self.update_relay_availability(); + } + Out::BlacklistExpired => { + if matches!( + connection.relay_status, + RelayStatus::Supported { + status: ReservationStatus::Blacklisted + } + ) { + connection.relay_status = RelayStatus::Supported { + status: ReservationStatus::Idle, + }; + self.meet_reservation_target(); + } + } + } + } + + fn poll( + &mut self, + cx: &mut Context<'_>, + ) -> Poll>> { + if let Some(event) = self.events.pop_front() { + return Poll::Ready(event); + } + + self.waker = Some(cx.waker().clone()); + + Poll::Pending + } +} + +#[cfg(test)] +mod tests { + use crate::behaviour::autorelay; + use futures::StreamExt; + use futures::{AsyncRead, AsyncWrite}; + use libp2p::core::muxing::StreamMuxerBox; + use libp2p::core::transport::{Boxed, MemoryTransport, OrTransport}; + use libp2p::core::upgrade; + use libp2p::multiaddr::Protocol; + use libp2p::swarm::{Config, ConnectionId, NetworkBehaviour, SwarmEvent}; + use libp2p::{Multiaddr, PeerId, Swarm, Transport, identify, identity, noise, relay, yamux}; + use std::collections::{HashMap, HashSet}; + use std::num::NonZeroU8; + use std::time::Duration; + + #[tokio::test] + async fn autorelay_respects_max_reservations() { + init_tracing(); + + let (relay_a_peer_id, relay_a_addr) = spawn_relay(); + let (relay_b_peer_id, relay_b_addr) = spawn_relay(); + + let mut client = build_client( + autorelay::Config::default().set_max_reservations(NonZeroU8::new(1).unwrap()), + ); + client.dial(relay_a_addr).unwrap(); + client.dial(relay_b_addr).unwrap(); + + let first = wait_until_some(&mut client, Duration::from_secs(20), |event| { + if let SwarmEvent::Behaviour(ClientEvent::RelayClient( + relay::client::Event::ReservationReqAccepted { + relay_peer_id, + renewal: false, + .. + }, + )) = event + { + Some(*relay_peer_id) + } else { + None + } + }) + .await; + assert!(first == relay_a_peer_id || first == relay_b_peer_id); + + let mut extra = 0usize; + let mut settle = futures_timer::Delay::new(Duration::from_secs(5)); + loop { + tokio::select! { + _ = &mut settle => break, + ev = client.select_next_some() => { + if matches!( + ev, + SwarmEvent::Behaviour(ClientEvent::RelayClient( + relay::client::Event::ReservationReqAccepted { renewal: false, .. } + )) + ) { + extra += 1; + } + } + } + } + + assert_eq!( + extra, 0, + "autorelay opened {extra} reservation(s) beyond max_reservations=1" + ); + } + + #[tokio::test] + async fn autorelay_with_two_reservations_among_five_relays() { + init_tracing(); + + let relay_addrs: Vec<(PeerId, Multiaddr)> = (0..5).map(|_| spawn_relay()).collect(); + let relay_peers: HashSet = relay_addrs.iter().map(|(p, _)| *p).collect(); + + let mut client = build_client( + autorelay::Config::default().set_max_reservations(NonZeroU8::new(2).unwrap()), + ); + for (_, addr) in &relay_addrs { + client.dial(addr.clone()).unwrap(); + } + + let mut direct_conns: HashMap = HashMap::new(); + let mut reservations: HashSet = HashSet::new(); + + let mut sleep = futures_timer::Delay::new(Duration::from_secs(30)); + loop { + tokio::select! { + _ = &mut sleep => panic!( + "timeout: got {} reservations, expected 2", + reservations.len() + ), + ev = client.select_next_some() => match ev { + SwarmEvent::ConnectionEstablished { + peer_id, connection_id, endpoint, .. + } if !endpoint.is_relayed() && relay_peers.contains(&peer_id) => { + direct_conns.insert(peer_id, connection_id); + } + SwarmEvent::Behaviour(ClientEvent::RelayClient( + relay::client::Event::ReservationReqAccepted { + relay_peer_id, + renewal: false, + .. + }, + )) => { + reservations.insert(relay_peer_id); + } + _ => {} + } + } + if reservations.len() == 2 { + break; + } + } + + let drop_peer = *reservations.iter().next().expect("two reservations held"); + let keep_peer = reservations + .iter() + .find(|p| **p != drop_peer) + .copied() + .expect("two reservations held"); + let drop_conn = *direct_conns + .get(&drop_peer) + .expect("direct connection observed"); + + assert!( + client.close_connection(drop_conn), + "should close the relay connection holding a reservation" + ); + + let mut sleep = futures_timer::Delay::new(Duration::from_secs(30)); + + loop { + tokio::select! { + _ = &mut sleep => panic!("timeout waiting for replacement reservation"), + ev = client.select_next_some() => { + if let SwarmEvent::Behaviour(ClientEvent::RelayClient( + relay::client::Event::ReservationReqAccepted { + relay_peer_id, + renewal: false, + .. + }, + )) = ev + && relay_peer_id != keep_peer + && relay_peer_id != drop_peer + { + return; + } + } + } + } + } + + #[tokio::test] + async fn autorelay_drops_reservations_when_public_address_appears() { + init_tracing(); + + let (_, relay_a_addr) = spawn_relay(); + let (_, relay_b_addr) = spawn_relay(); + + let mut client = build_client( + autorelay::Config::default().set_max_reservations(NonZeroU8::new(2).unwrap()), + ); + client.dial(relay_a_addr).unwrap(); + client.dial(relay_b_addr).unwrap(); + + let mut confirmed: HashSet = HashSet::new(); + let mut sleep = futures_timer::Delay::new(Duration::from_secs(30)); + + loop { + tokio::select! { + _ = &mut sleep => panic!( + "timeout: got {} confirmed external addresses, expected 2", + confirmed.len() + ), + ev = client.select_next_some() => { + if let SwarmEvent::ExternalAddrConfirmed { address } = ev + && address.iter().any(|p| p == Protocol::P2pCircuit) + { + confirmed.insert(address); + } + } + } + if confirmed.len() == 2 { + break; + } + } + + let public_addr = Multiaddr::empty().with(Protocol::Memory(rand::random::())); + client.add_external_address(public_addr); + + let mut expired: HashSet = HashSet::new(); + let mut sleep = futures_timer::Delay::new(Duration::from_secs(15)); + + loop { + tokio::select! { + _ = &mut sleep => panic!( + "timeout: only {}/{} relayed addresses expired", + expired.len(), + confirmed.len() + ), + ev = client.select_next_some() => { + if let SwarmEvent::ExternalAddrExpired { address } = ev + && confirmed.contains(&address) + { + expired.insert(address); + } + } + } + if expired == confirmed { + break; + } + } + } + + #[tokio::test] + async fn autorelay_expires_circuit_address_once_on_connection_close() { + init_tracing(); + + let (relay_peer, relay_addr) = spawn_relay(); + + let mut client = build_client(autorelay::Config::default()); + client.dial(relay_addr).unwrap(); + + let (conn, circuit) = wait_until_some(&mut client, Duration::from_secs(15), { + let mut direct: Option = None; + move |event| match event { + SwarmEvent::ConnectionEstablished { + peer_id, + connection_id, + endpoint, + .. + } if *peer_id == relay_peer && !endpoint.is_relayed() => { + direct = Some(*connection_id); + None + } + SwarmEvent::ExternalAddrConfirmed { address } + if address.iter().any(|p| p == Protocol::P2pCircuit) => + { + direct.map(|c| (c, address.clone())) + } + _ => None, + } + }) + .await; + + assert!(client.close_connection(conn)); + + let mut expired = 0usize; + let mut settle = futures_timer::Delay::new(Duration::from_secs(5)); + loop { + tokio::select! { + _ = &mut settle => break, + ev = client.select_next_some() => { + if let SwarmEvent::ExternalAddrExpired { address } = &ev + && *address == circuit + { + expired += 1; + } + } + } + } + + assert_eq!( + expired, 1, + "expected exactly one expiry of the circuit address, got {expired}" + ); + } + + #[tokio::test] + async fn autorelay_expires_user_circuit_listen_on_removal() { + init_tracing(); + + let (relay_peer, relay_addr) = spawn_relay(); + + let mut client = build_client(autorelay::Config::default()); + client + .behaviour_mut() + .autorelay + .set_status(Some(autorelay::Status::Disable)); + + let circuit_listen = relay_addr + .with(Protocol::P2p(relay_peer)) + .with(Protocol::P2pCircuit); + let listener_id = client.listen_on(circuit_listen).unwrap(); + + let circuit = wait_until_some(&mut client, Duration::from_secs(15), |event| { + if let SwarmEvent::ExternalAddrConfirmed { address } = event + && address.iter().any(|p| p == Protocol::P2pCircuit) + { + Some(address.clone()) + } else { + None + } + }) + .await; + + wait_until(&mut client, Duration::from_secs(10), |event| { + matches!( + event, + SwarmEvent::NewListenAddr { address, .. } if *address == circuit + ) + }) + .await; + + assert!(client.remove_listener(listener_id)); + + wait_until(&mut client, Duration::from_secs(10), |event| { + matches!( + event, + SwarmEvent::ExternalAddrExpired { address } if *address == circuit + ) + }) + .await; + } + + #[tokio::test] + async fn autorelay_blacklists_failing_relay_and_retries_after_cooldown() { + init_tracing(); + + let (_, relay_addr) = spawn_rejecting_relay(); + + let cooldown = Duration::from_secs(1); + let mut client = build_client( + autorelay::Config::default() + .set_max_reservations(NonZeroU8::new(1).unwrap()) + .set_failure_cooldown(cooldown), + ); + client.dial(relay_addr).unwrap(); + + let first_failure_at = + wait_for_listener_failure(&mut client, Duration::from_secs(10)).await; + + let early_retry = with_timeout( + wait_for_listener_failure(&mut client, cooldown * 5), + cooldown / 2, + ) + .await; + assert!( + early_retry.is_none(), + "autorelay retried during the cooldown window" + ); + + let second_failure_at = wait_for_listener_failure(&mut client, cooldown * 5).await; + let elapsed = second_failure_at.duration_since(first_failure_at); + assert!( + elapsed >= cooldown, + "retry should respect cooldown (elapsed {elapsed:?}, cooldown {cooldown:?})" + ); + } + + async fn wait_for_listener_failure( + client: &mut Swarm, + timeout: Duration, + ) -> std::time::Instant { + let mut sleep = futures_timer::Delay::new(timeout); + + loop { + tokio::select! { + _ = &mut sleep => panic!("timeout waiting for listener failure"), + ev = client.select_next_some() => { + if let SwarmEvent::ListenerClosed { reason: Err(_), .. } = ev { + return std::time::Instant::now(); + } + } + } + } + } + + #[tokio::test] + async fn autorelay_disabled_does_not_reserve() { + init_tracing(); + + let (_, relay_addr) = spawn_relay(); + + let mut client = build_client(autorelay::Config::default()); + client + .behaviour_mut() + .autorelay + .set_status(Some(autorelay::Status::Disable)); + client.dial(relay_addr).unwrap(); + + let observed = with_timeout( + wait_until(&mut client, Duration::from_secs(5), |event| { + matches!( + event, + SwarmEvent::Behaviour(ClientEvent::RelayClient( + relay::client::Event::ReservationReqAccepted { .. } + )) + ) + }), + Duration::from_secs(3), + ) + .await; + + assert!( + observed.is_none(), + "autorelay opened a reservation while disabled" + ); + } + + #[tokio::test] + async fn autorelay_re_enable_triggers_reservation() { + init_tracing(); + + let (_, relay_addr) = spawn_relay(); + + let mut client = build_client(autorelay::Config::default()); + client + .behaviour_mut() + .autorelay + .set_status(Some(autorelay::Status::Disable)); + client.dial(relay_addr).unwrap(); + + let mut sleep = futures_timer::Delay::new(Duration::from_secs(3)); + + loop { + tokio::select! { + _ = &mut sleep => break, + ev = client.select_next_some() => { + if matches!( + ev, + SwarmEvent::Behaviour(ClientEvent::RelayClient( + relay::client::Event::ReservationReqAccepted { .. } + )) + ) { + panic!("autorelay reserved while disabled"); + } + } + } + } + + client + .behaviour_mut() + .autorelay + .set_status(Some(autorelay::Status::Enable)); + + wait_until(&mut client, Duration::from_secs(10), |event| { + matches!( + event, + SwarmEvent::Behaviour(ClientEvent::RelayClient( + relay::client::Event::ReservationReqAccepted { .. } + )) + ) + }) + .await; + } + + #[tokio::test] + async fn autorelay_disable_preserves_active_reservation() { + init_tracing(); + + let (_, relay_addr) = spawn_relay(); + + let mut client = build_client(autorelay::Config::default()); + client.dial(relay_addr).unwrap(); + + wait_until(&mut client, Duration::from_secs(20), |event| { + matches!( + event, + SwarmEvent::Behaviour(ClientEvent::RelayClient( + relay::client::Event::ReservationReqAccepted { .. } + )) + ) + }) + .await; + + client + .behaviour_mut() + .autorelay + .set_status(Some(autorelay::Status::Disable)); + + let mut sleep = futures_timer::Delay::new(Duration::from_secs(3)); + + loop { + tokio::select! { + _ = &mut sleep => break, + ev = client.select_next_some() => { + if let SwarmEvent::ListenerClosed { reason: Err(_), .. } = ev { + panic!("disabling autorelay dropped an active reservation"); + } + if let SwarmEvent::ExternalAddrExpired { .. } = ev { + panic!("disabling autorelay expired an external address"); + } + } + } + } + } + + #[tokio::test] + async fn autorelay_prefers_static_relay() { + init_tracing(); + + let (opportunistic_peer, opportunistic_addr) = spawn_relay(); + let (static_peer, static_addr) = spawn_relay(); + + let mut client = build_client( + autorelay::Config::default().set_max_reservations(NonZeroU8::new(1).unwrap()), + ); + client + .behaviour_mut() + .autorelay + .set_status(Some(autorelay::Status::Disable)); + + client.dial(opportunistic_addr).unwrap(); + client + .behaviour_mut() + .autorelay + .add_static_relay(static_peer, static_addr); + + // Let both connections establish and identify exchanges complete. + let mut warmup = futures_timer::Delay::new(Duration::from_secs(3)); + loop { + tokio::select! { + _ = &mut warmup => break, + _ = client.select_next_some() => {} + } + } + + client + .behaviour_mut() + .autorelay + .set_status(Some(autorelay::Status::Enable)); + + let accepted_peer = wait_until_some(&mut client, Duration::from_secs(15), |event| { + if let SwarmEvent::Behaviour(ClientEvent::RelayClient( + relay::client::Event::ReservationReqAccepted { relay_peer_id, .. }, + )) = event + { + Some(*relay_peer_id) + } else { + None + } + }) + .await; + + assert_eq!( + accepted_peer, static_peer, + "autorelay should pick the static relay over the opportunistic one" + ); + assert_ne!(accepted_peer, opportunistic_peer); + } + + #[tokio::test] + async fn remove_static_relay_preserves_active_reservation() { + init_tracing(); + + let (relay_peer, relay_addr) = spawn_relay(); + + let mut client = build_client(autorelay::Config::default()); + client + .behaviour_mut() + .autorelay + .add_static_relay(relay_peer, relay_addr); + + wait_until(&mut client, Duration::from_secs(15), |event| { + matches!( + event, + SwarmEvent::Behaviour(ClientEvent::RelayClient( + relay::client::Event::ReservationReqAccepted { .. } + )) + ) + }) + .await; + + assert!( + client + .behaviour_mut() + .autorelay + .remove_static_relay(&relay_peer) + ); + + let mut sleep = futures_timer::Delay::new(Duration::from_secs(3)); + + loop { + tokio::select! { + _ = &mut sleep => break, + ev = client.select_next_some() => { + if let SwarmEvent::ListenerClosed { reason: Err(_), .. } = ev { + panic!("removing static relay dropped an active reservation"); + } + if let SwarmEvent::ExternalAddrExpired { .. } = ev { + panic!("removing static relay expired an external address"); + } + } + } + } + } + + #[tokio::test] + async fn static_relay_redials_after_connection_drop() { + init_tracing(); + + let (relay_peer, relay_addr) = spawn_relay(); + + let mut client = build_client(autorelay::Config::default()); + client + .behaviour_mut() + .autorelay + .add_static_relay(relay_peer, relay_addr); + + let conn_id = + wait_for_reservation_with_conn(&mut client, relay_peer, Duration::from_secs(15)).await; + + assert!(client.close_connection(conn_id)); + + wait_until(&mut client, Duration::from_secs(20), { + let mut redialed = false; + let mut reserved_again = false; + move |event| { + match event { + SwarmEvent::ConnectionEstablished { + peer_id, endpoint, .. + } if *peer_id == relay_peer && !endpoint.is_relayed() => { + redialed = true; + } + SwarmEvent::Behaviour(ClientEvent::RelayClient( + relay::client::Event::ReservationReqAccepted { relay_peer_id, .. }, + )) if *relay_peer_id == relay_peer => { + reserved_again = true; + } + _ => {} + } + redialed && reserved_again + } + }) + .await; + } + + async fn wait_until_some( + client: &mut Swarm, + timeout: Duration, + mut extract: F, + ) -> T + where + F: FnMut(&SwarmEvent) -> Option, + { + let mut sleep = futures_timer::Delay::new(timeout); + + loop { + tokio::select! { + _ = &mut sleep => panic!("timeout waiting on predicate"), + ev = client.select_next_some() => { + if let Some(value) = extract(&ev) { + return value; + } + } + } + } + } + + #[tokio::test] + async fn autorelay_emits_relay_available_after_recovery() { + init_tracing(); + + let (relay_peer, relay_addr) = spawn_relay(); + + let mut client = build_client(autorelay::Config::default()); + client.dial(relay_addr.clone()).unwrap(); + + let conn_id = + wait_for_reservation_with_conn(&mut client, relay_peer, Duration::from_secs(15)).await; + + assert!(client.close_connection(conn_id)); + + wait_until(&mut client, Duration::from_secs(10), |event| { + matches!( + event, + SwarmEvent::Behaviour(ClientEvent::Autorelay(autorelay::Event::NoRelaysAvailable)) + ) + }) + .await; + + client.dial(relay_addr).unwrap(); + + wait_until(&mut client, Duration::from_secs(15), |event| { + matches!( + event, + SwarmEvent::Behaviour(ClientEvent::Autorelay(autorelay::Event::RelaysAvailable)) + ) + }) + .await; + } + + #[tokio::test] + async fn autorelay_no_relays_available_is_edge_triggered() { + init_tracing(); + + let (relay_a_peer, relay_a_addr) = spawn_relay(); + let (relay_b_peer, relay_b_addr) = spawn_relay(); + + let mut client = build_client(autorelay::Config::default()); + client.dial(relay_a_addr).unwrap(); + client.dial(relay_b_addr).unwrap(); + + let mut conns: HashMap = HashMap::new(); + let mut reserved: HashSet = HashSet::new(); + let mut sleep = futures_timer::Delay::new(Duration::from_secs(20)); + + loop { + tokio::select! { + _ = &mut sleep => panic!("did not get both reservations in time"), + ev = client.select_next_some() => match ev { + SwarmEvent::ConnectionEstablished { + peer_id, connection_id, endpoint, .. + } if !endpoint.is_relayed() + && (peer_id == relay_a_peer || peer_id == relay_b_peer) => + { + conns.insert(peer_id, connection_id); + } + SwarmEvent::Behaviour(ClientEvent::RelayClient( + relay::client::Event::ReservationReqAccepted { relay_peer_id, .. } + )) if relay_peer_id == relay_a_peer || relay_peer_id == relay_b_peer => { + reserved.insert(relay_peer_id); + } + _ => {} + } + } + if reserved.len() == 2 { + break; + } + } + + let conn_a = *conns.get(&relay_a_peer).unwrap(); + let conn_b = *conns.get(&relay_b_peer).unwrap(); + + assert!(client.close_connection(conn_a)); + assert!(client.close_connection(conn_b)); + + let mut starved_count = 0usize; + let mut sleep = futures_timer::Delay::new(Duration::from_secs(5)); + + loop { + tokio::select! { + _ = &mut sleep => break, + ev = client.select_next_some() => { + if matches!( + ev, + SwarmEvent::Behaviour(ClientEvent::Autorelay( + autorelay::Event::NoRelaysAvailable + )) + ) { + starved_count += 1; + } + } + } + } + + assert_eq!( + starved_count, 1, + "NoRelaysAvailable should fire exactly once across multiple meet_reservation_target invocations" + ); + } + + #[tokio::test] + async fn autorelay_resumes_after_public_address_removed() { + init_tracing(); + + let (relay_peer, relay_addr) = spawn_relay(); + + let mut client = build_client(autorelay::Config::default()); + client.dial(relay_addr).unwrap(); + + wait_for_reservation_from(&mut client, relay_peer, Duration::from_secs(15)).await; + + let public_addr = memory_addr(); + client.add_external_address(public_addr.clone()); + + wait_until(&mut client, Duration::from_secs(10), |event| { + matches!(event, SwarmEvent::ExternalAddrExpired { .. }) + }) + .await; + + client.remove_external_address(&public_addr); + + wait_for_reservation_from(&mut client, relay_peer, Duration::from_secs(15)).await; + } + + #[tokio::test] + async fn autorelay_manual_enable_ignores_public_address() { + init_tracing(); + + let (relay_peer, relay_addr) = spawn_relay(); + + let mut client = build_client(autorelay::Config::default()); + client + .behaviour_mut() + .autorelay + .set_status(Some(autorelay::Status::Enable)); + client.dial(relay_addr).unwrap(); + + wait_for_reservation_from(&mut client, relay_peer, Duration::from_secs(15)).await; + + client.add_external_address(memory_addr()); + + let mut sleep = futures_timer::Delay::new(Duration::from_secs(3)); + + loop { + tokio::select! { + _ = &mut sleep => break, + ev = client.select_next_some() => { + if let SwarmEvent::ListenerClosed { reason: Err(_), .. } = ev { + panic!("manual-Enable autorelay dropped reservation after public addr appeared"); + } + if let SwarmEvent::ExternalAddrExpired { address } = &ev + && address.iter().any(|p| p == Protocol::P2pCircuit) + { + panic!("manual-Enable autorelay expired the relayed external address"); + } + if let SwarmEvent::Behaviour(ClientEvent::Autorelay( + autorelay::Event::StatusChanged { status: autorelay::Status::Disable }, + )) = ev + { + panic!("manual-Enable autorelay flipped to Disable on public addr"); + } + } + } + } + } + + #[tokio::test] + async fn autorelay_forgets_previous_relay_on_reacquire() { + init_tracing(); + + let (relay_peer, relay_addr) = spawn_relay(); + + let mut client = build_client(autorelay::Config::default()); + client.dial(relay_addr.clone()).unwrap(); + + let conn_id = + wait_for_reservation_with_conn(&mut client, relay_peer, Duration::from_secs(15)).await; + + assert!(client.close_connection(conn_id)); + + wait_until(&mut client, Duration::from_secs(10), |event| { + matches!( + event, + SwarmEvent::Behaviour(ClientEvent::Autorelay(autorelay::Event::NoRelaysAvailable)) + ) + }) + .await; + + assert!( + client + .behaviour() + .autorelay + .previous_relays() + .any(|(p, _, _)| *p == relay_peer), + "expected {relay_peer} in previous_relays after loss" + ); + + client.dial(relay_addr).unwrap(); + + wait_until(&mut client, Duration::from_secs(15), |event| { + matches!( + event, + SwarmEvent::NewListenAddr { address, .. } if address.iter().any(|p| p == Protocol::P2pCircuit) + ) + }) + .await; + + let previous: Vec = client + .behaviour() + .autorelay + .previous_relays() + .map(|(p, _, _)| *p) + .collect(); + assert!( + !previous.contains(&relay_peer), + "expected {relay_peer} to be removed from previous_relays after re-acquire, got {previous:?}" + ); + } + + #[tokio::test] + async fn autorelay_previous_relays_is_bounded() { + init_tracing(); + + let peers_and_addrs: Vec<(PeerId, Multiaddr)> = (0..3).map(|_| spawn_relay()).collect(); + + let mut client = build_client( + autorelay::Config::default() + .set_max_reservations(NonZeroU8::new(1).unwrap()) + .set_max_previous_relays(2), + ); + + for (peer, addr) in &peers_and_addrs { + client.dial(addr.clone()).unwrap(); + + let conn_id = + wait_for_reservation_with_conn(&mut client, *peer, Duration::from_secs(15)).await; + + assert!(client.close_connection(conn_id)); + + wait_until(&mut client, Duration::from_secs(10), |event| { + matches!( + event, + SwarmEvent::Behaviour(ClientEvent::Autorelay( + autorelay::Event::NoRelaysAvailable + )) + ) + }) + .await; + } + + let previous: Vec = client + .behaviour() + .autorelay + .previous_relays() + .map(|(p, _, _)| *p) + .collect(); + + assert_eq!( + previous.len(), + 2, + "expected previous_relays to be bounded to 2, got {previous:?}" + ); + assert!( + !previous.contains(&peers_and_addrs[0].0), + "oldest relay should have been evicted: {previous:?}" + ); + assert!(previous.contains(&peers_and_addrs[1].0)); + assert!(previous.contains(&peers_and_addrs[2].0)); + } + + #[tokio::test] + async fn autorelay_static_relay_dial_cooldown_after_failure() { + init_tracing(); + + let cooldown = Duration::from_secs(2); + let mut client = build_client(autorelay::Config::default().set_failure_cooldown(cooldown)); + + let unreachable_peer = PeerId::random(); + let unreachable_addr = memory_addr(); + + client + .behaviour_mut() + .autorelay + .add_static_relay(unreachable_peer, unreachable_addr.clone()); + + wait_until(&mut client, Duration::from_secs(5), |event| { + matches!( + event, + SwarmEvent::OutgoingConnectionError { peer_id: Some(p), .. } if *p == unreachable_peer + ) + }) + .await; + + let first_failure_at = std::time::Instant::now(); + + client + .behaviour_mut() + .autorelay + .add_static_relay(unreachable_peer, unreachable_addr.clone()); + + let mut redialed = false; + let mut watch = futures_timer::Delay::new(cooldown / 2); + loop { + tokio::select! { + _ = &mut watch => break, + ev = client.select_next_some() => { + if matches!( + ev, + SwarmEvent::OutgoingConnectionError { peer_id: Some(p), .. } if p == unreachable_peer + ) { + redialed = true; + break; + } + } + } + } + assert!(!redialed, "autorelay redialed within cooldown"); + + let remaining = cooldown + .checked_sub(first_failure_at.elapsed()) + .unwrap_or_default(); + if !remaining.is_zero() { + futures_timer::Delay::new(remaining + Duration::from_millis(200)).await; + } + + client + .behaviour_mut() + .autorelay + .add_static_relay(unreachable_peer, unreachable_addr); + + wait_until(&mut client, Duration::from_secs(5), |event| { + matches!( + event, + SwarmEvent::OutgoingConnectionError { peer_id: Some(p), .. } if *p == unreachable_peer + ) + }) + .await; + } + + #[tokio::test] + async fn autorelay_evicts_discovered_peers_for_static() { + init_tracing(); + + let (opp_a_peer, opp_a_addr) = spawn_relay(); + let (opp_b_peer, opp_b_addr) = spawn_relay(); + let (static_peer, static_addr) = spawn_relay(); + + let mut client = build_client( + autorelay::Config::default().set_max_reservations(NonZeroU8::new(1).unwrap()), + ); + + client.dial(opp_a_addr).unwrap(); + client.dial(opp_b_addr).unwrap(); + + wait_until_some(&mut client, Duration::from_secs(20), |event| { + if let SwarmEvent::Behaviour(ClientEvent::RelayClient( + relay::client::Event::ReservationReqAccepted { relay_peer_id, .. }, + )) = event + && (*relay_peer_id == opp_a_peer || *relay_peer_id == opp_b_peer) + { + Some(*relay_peer_id) + } else { + None + } + }) + .await; + + client + .behaviour_mut() + .autorelay + .add_static_relay(static_peer, static_addr); + + wait_for_reservation_from(&mut client, static_peer, Duration::from_secs(20)).await; + } + + async fn wait_until(client: &mut Swarm, timeout: Duration, mut predicate: F) + where + F: FnMut(&SwarmEvent) -> bool, + { + let mut sleep = futures_timer::Delay::new(timeout); + loop { + tokio::select! { + _ = &mut sleep => panic!("timeout waiting on predicate"), + ev = client.select_next_some() => { + if predicate(&ev) { + return; + } + } + } + } + } + + async fn with_timeout(future: F, timeout: Duration) -> Option { + use futures::future::Either; + let timer = futures_timer::Delay::new(timeout); + futures::pin_mut!(future); + match futures::future::select(future, timer).await { + Either::Left((output, _)) => Some(output), + Either::Right(_) => None, + } + } + + async fn wait_for_reservation_from( + client: &mut Swarm, + peer: PeerId, + timeout: Duration, + ) { + wait_until(client, timeout, |event| { + matches!( + event, + SwarmEvent::Behaviour(ClientEvent::RelayClient( + relay::client::Event::ReservationReqAccepted { relay_peer_id, .. } + )) if *relay_peer_id == peer + ) + }) + .await; + } + + async fn wait_for_reservation_with_conn( + client: &mut Swarm, + peer: PeerId, + timeout: Duration, + ) -> ConnectionId { + wait_until_some(client, timeout, { + let mut established: Option = None; + let mut reserved = false; + move |event| { + match event { + SwarmEvent::ConnectionEstablished { + peer_id, + connection_id, + endpoint, + .. + } if *peer_id == peer && !endpoint.is_relayed() => { + established = Some(*connection_id); + } + SwarmEvent::Behaviour(ClientEvent::RelayClient( + relay::client::Event::ReservationReqAccepted { relay_peer_id, .. }, + )) if *relay_peer_id == peer => { + reserved = true; + } + _ => {} + } + if reserved { established } else { None } + } + }) + .await + } + + fn init_tracing() { + // let _ = tracing_subscriber::fmt() + // .with_env_filter(EnvFilter::from_default_env()) + // .try_init(); + } + + fn memory_addr() -> Multiaddr { + Multiaddr::empty().with(Protocol::Memory(rand::random::())) + } + + fn spawn_relay() -> (PeerId, Multiaddr) { + spawn_relay_swarm(build_relay()) + } + + fn spawn_rejecting_relay() -> (PeerId, Multiaddr) { + spawn_relay_swarm(build_rejecting_relay()) + } + + fn spawn_relay_swarm(mut relay: Swarm) -> (PeerId, Multiaddr) { + let addr = memory_addr(); + let peer = *relay.local_peer_id(); + relay.listen_on(addr.clone()).unwrap(); + relay.add_external_address(addr.clone()); + tokio::spawn(relay.collect::>()); + (peer, addr) + } + + fn build_relay() -> Swarm { + build_relay_with_config(relay::Config { + reservation_duration: Duration::from_secs(60), + ..Default::default() + }) + } + + fn build_rejecting_relay() -> Swarm { + build_relay_with_config(relay::Config { + max_reservations: 0, + ..Default::default() + }) + } + + fn build_relay_with_config(config: relay::Config) -> Swarm { + let local_key = identity::Keypair::generate_ed25519(); + let local_peer_id = local_key.public().to_peer_id(); + let transport = upgrade_transport(MemoryTransport::default().boxed(), &local_key); + + Swarm::new( + transport, + Relay { + relay: relay::Behaviour::new(local_peer_id, config), + identify: identify::Behaviour::new(identify::Config::new( + "/autorelay-test/1.0.0".to_owned(), + local_key.public(), + )), + }, + local_peer_id, + Config::with_tokio_executor(), + ) + } + + fn build_client(autorelay_config: autorelay::Config) -> Swarm { + let local_key = identity::Keypair::generate_ed25519(); + let local_peer_id = local_key.public().to_peer_id(); + let (relay_transport, relay_client) = relay::client::new(local_peer_id); + + let transport = upgrade_transport( + OrTransport::new(relay_transport, MemoryTransport::default()).boxed(), + &local_key, + ); + + Swarm::new( + transport, + Client { + relay_client, + autorelay: autorelay::Behaviour::new_with_config(autorelay_config), + identify: identify::Behaviour::new(identify::Config::new( + "/autorelay-test/1.0.0".to_owned(), + local_key.public(), + )), + }, + local_peer_id, + Config::with_tokio_executor(), + ) + } + + fn upgrade_transport( + transport: Boxed, + identity: &identity::Keypair, + ) -> Boxed<(PeerId, StreamMuxerBox)> + where + StreamSink: AsyncRead + AsyncWrite + Send + Unpin + 'static, + { + transport + .upgrade(upgrade::Version::V1) + .authenticate(noise::Config::new(identity).unwrap()) + .multiplex(yamux::Config::default()) + .boxed() + } + + #[derive(NetworkBehaviour)] + struct Relay { + relay: relay::Behaviour, + identify: identify::Behaviour, + } + + #[derive(NetworkBehaviour)] + struct Client { + relay_client: relay::client::Behaviour, + autorelay: autorelay::Behaviour, + identify: identify::Behaviour, + } +} diff --git a/src/behaviour/autorelay/handler.rs b/src/behaviour/autorelay/handler.rs new file mode 100644 index 0000000..169e1ff --- /dev/null +++ b/src/behaviour/autorelay/handler.rs @@ -0,0 +1,129 @@ +use std::{ + collections::VecDeque, + task::{Context, Poll}, + time::Duration, +}; + +use crate::prelude::swarm::handler::ConnectionEvent; +use crate::prelude::swarm::{ + ConnectionHandler, ConnectionHandlerEvent, SubstreamProtocol, SupportedProtocols, +}; +use crate::prelude::transport::upgrade::DeniedUpgrade; +use futures::FutureExt; +use futures_timer::Delay; +use libp2p::relay::HOP_PROTOCOL_NAME; + +#[derive(Default, Debug)] +pub struct Handler { + events: VecDeque< + ConnectionHandlerEvent< + ::OutboundProtocol, + ::OutboundOpenInfo, + ::ToBehaviour, + >, + >, + + supported: bool, + + supported_protocol: SupportedProtocols, + + blacklist_timer: Option, +} + +#[derive(Debug, Copy, Clone)] +pub enum In { + Blacklist { duration: Duration }, +} + +#[derive(Debug, Copy, Clone)] +pub enum Out { + Supported, + Unsupported, + BlacklistExpired, +} + +#[allow(deprecated)] +impl ConnectionHandler for Handler { + type FromBehaviour = In; + type ToBehaviour = Out; + type InboundProtocol = DeniedUpgrade; + type OutboundProtocol = DeniedUpgrade; + type InboundOpenInfo = (); + type OutboundOpenInfo = (); + + fn listen_protocol(&self) -> SubstreamProtocol { + SubstreamProtocol::new(DeniedUpgrade, ()) + } + + fn connection_keep_alive(&self) -> bool { + false + } + + fn on_behaviour_event(&mut self, event: Self::FromBehaviour) { + match event { + In::Blacklist { duration } => { + self.blacklist_timer = Some(Delay::new(duration)); + } + } + } + + fn on_connection_event( + &mut self, + event: ConnectionEvent< + Self::InboundProtocol, + Self::OutboundProtocol, + Self::InboundOpenInfo, + Self::OutboundOpenInfo, + >, + ) { + if let ConnectionEvent::RemoteProtocolsChange(protocol) = event { + let change = self.supported_protocol.on_protocols_change(protocol); + if change { + let valid = self + .supported_protocol + .iter() + .any(|proto| HOP_PROTOCOL_NAME.eq(proto)); + + match (valid, self.supported) { + (true, false) => { + self.supported = true; + self.events + .push_back(ConnectionHandlerEvent::NotifyBehaviour(Out::Supported)); + } + (false, true) => { + self.supported = false; + self.blacklist_timer = None; + self.events + .push_back(ConnectionHandlerEvent::NotifyBehaviour(Out::Unsupported)); + } + (true, true) => {} + _ => {} + } + } + } + } + + fn poll( + &mut self, + cx: &mut Context<'_>, + ) -> Poll< + ConnectionHandlerEvent, + > { + if let Some(event) = self.events.pop_front() { + return Poll::Ready(event); + } + + if let Some(timer) = self.blacklist_timer.as_mut() + && timer.poll_unpin(cx).is_ready() + { + self.blacklist_timer = None; + if self.supported { + return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour( + Out::BlacklistExpired, + )); + } + } + + Poll::Pending + } +} diff --git a/src/builder.rs b/src/builder.rs index b106711..19331e7 100644 --- a/src/builder.rs +++ b/src/builder.rs @@ -127,6 +127,9 @@ pub(crate) struct Config { pub autonat_v2_client_config: Box AutonatV2ClientConfig>, #[cfg(feature = "relay")] pub relay_server_config: Box RelayServerConfig>, + #[cfg(feature = "relay")] + pub autorelay_config: + Box behaviour::autorelay::Config>, #[cfg(feature = "identify")] pub identify_config: (String, Box IdentifyConfig>), #[cfg(feature = "request-response")] @@ -159,6 +162,8 @@ impl Default for Config { autonat_v2_client_config: Box::new(|config| config), #[cfg(feature = "relay")] relay_server_config: Box::new(|config| config), + #[cfg(feature = "relay")] + autorelay_config: Box::new(|config| config), #[cfg(feature = "identify")] identify_config: (String::from("/ipfs/id"), Box::new(|config| config)), #[cfg(feature = "request-response")] @@ -183,6 +188,8 @@ pub(crate) struct Protocols { pub(crate) relay_client: bool, #[cfg(feature = "relay")] pub(crate) relay_server: bool, + #[cfg(feature = "relay")] + pub(crate) autorelay: bool, #[cfg(feature = "dcutr")] #[cfg(not(target_arch = "wasm32"))] pub(crate) dcutr: bool, @@ -444,6 +451,23 @@ where self } + /// Enable autorelay + #[cfg(feature = "relay")] + pub fn with_autorelay(self) -> Self { + self.with_autorelay_with_config(|config| config) + } + + /// Enable autorelay + #[cfg(feature = "relay")] + pub fn with_autorelay_with_config(mut self, f: F) -> Self + where + F: FnOnce(behaviour::autorelay::Config) -> behaviour::autorelay::Config + 'static, + { + self.config.autorelay_config = Box::new(f); + self.protocols.autorelay = true; + self + } + /// Enables DCuTR #[cfg(all(feature = "relay", feature = "dcutr"))] #[cfg(not(target_arch = "wasm32"))] diff --git a/src/handle.rs b/src/handle.rs index 359fed0..9d68d3d 100644 --- a/src/handle.rs +++ b/src/handle.rs @@ -8,6 +8,8 @@ pub(crate) mod floodsub; #[cfg(feature = "gossipsub")] pub(crate) mod gossipsub; mod peer_store; +#[cfg(feature = "relay")] +mod relay; #[cfg(feature = "rendezvous")] pub(crate) mod rendezvous; #[cfg(feature = "request-response")] @@ -28,6 +30,8 @@ use crate::handle::floodsub::ConnexaFloodsub; #[cfg(feature = "gossipsub")] use crate::handle::gossipsub::ConnexaGossipsub; use crate::handle::peer_store::ConnexaPeerstore; +#[cfg(feature = "relay")] +use crate::handle::relay::ConnexaRelay; #[cfg(feature = "rendezvous")] use crate::handle::rendezvous::ConnexaRendezvous; #[cfg(feature = "request-response")] @@ -137,6 +141,12 @@ where ConnexaRendezvous::new(self) } + /// Returns a handle for relay functions + #[cfg(feature = "relay")] + pub fn relay(&self) -> ConnexaRelay<'_, T, K> { + ConnexaRelay::new(self) + } + /// Returns a handle to manage peer whitelist functionality pub fn whitelist(&self) -> ConnexaWhitelist<'_, T, K> { ConnexaWhitelist::new(self) diff --git a/src/handle/relay.rs b/src/handle/relay.rs new file mode 100644 index 0000000..fd20792 --- /dev/null +++ b/src/handle/relay.rs @@ -0,0 +1,95 @@ +use crate::handle::Connexa; +use crate::prelude::{Multiaddr, PeerId}; +use crate::types::AutoRelayCommand; +use futures::channel::oneshot; +use std::io; + +pub struct ConnexaRelay<'a, T, K> { + connexa: &'a Connexa, +} + +impl<'a, T, K> ConnexaRelay<'a, T, K> +where + T: Send + Sync + 'static, +{ + pub(crate) fn new(connexa: &'a Connexa) -> Self { + Self { connexa } + } + + pub async fn add_static_relay(&self, peer_id: PeerId, addr: Multiaddr) -> io::Result { + let (tx, rx) = oneshot::channel(); + self.connexa + .to_task + .clone() + .send( + AutoRelayCommand::AddStaticRelay { + peer_id, + relay_addr: addr, + resp: tx, + } + .into(), + ) + .await?; + rx.await.map_err(io::Error::other)? + } + + pub async fn remove_static_relay(&self, peer_id: PeerId) -> io::Result { + let (tx, rx) = oneshot::channel(); + self.connexa + .to_task + .clone() + .send(AutoRelayCommand::RemoveStaticRelay { peer_id, resp: tx }.into()) + .await?; + rx.await.map_err(io::Error::other)? + } + + pub async fn list_static_relays(&self) -> io::Result)>> { + let (tx, rx) = oneshot::channel(); + self.connexa + .to_task + .clone() + .send(AutoRelayCommand::ListStaticRelays { resp: tx }.into()) + .await?; + rx.await.map_err(io::Error::other)? + } + + pub async fn get_static_relay(&self, peer_id: PeerId) -> io::Result> { + let (tx, rx) = oneshot::channel(); + self.connexa + .to_task + .clone() + .send(AutoRelayCommand::GetStaticRelay { peer_id, resp: tx }.into()) + .await?; + rx.await.map_err(io::Error::other)? + } + + pub async fn enable_auto_relay(&self) -> io::Result<()> { + let (tx, rx) = oneshot::channel(); + self.connexa + .to_task + .clone() + .send(AutoRelayCommand::EnableAutoRelay { resp: tx }.into()) + .await?; + rx.await.map_err(io::Error::other)? + } + + pub async fn disable_auto_relay(&self) -> io::Result<()> { + let (tx, rx) = oneshot::channel(); + self.connexa + .to_task + .clone() + .send(AutoRelayCommand::DisableAutoRelay { resp: tx }.into()) + .await?; + rx.await.map_err(io::Error::other)? + } + + pub async fn disable_relays(&self) -> io::Result<()> { + let (tx, rx) = oneshot::channel(); + self.connexa + .to_task + .clone() + .send(AutoRelayCommand::DisableRelays { resp: tx }.into()) + .await?; + rx.await.map_err(io::Error::other)? + } +} diff --git a/src/multiaddr_ext.rs b/src/multiaddr_ext.rs index b576f02..f371eb9 100644 --- a/src/multiaddr_ext.rs +++ b/src/multiaddr_ext.rs @@ -1,3 +1,4 @@ +use crate::prelude::PeerId; use libp2p::Multiaddr; use libp2p::multiaddr::Protocol; @@ -11,6 +12,7 @@ pub trait MultiaddrExt { fn is_private(&self) -> bool; fn is_unspecified(&self) -> bool; + fn relay_peer_id(&self) -> Option; } impl MultiaddrExt for Multiaddr { @@ -47,6 +49,18 @@ impl MultiaddrExt for Multiaddr { _ => false, }) } + + fn relay_peer_id(&self) -> Option { + let mut last_p2p = None; + for proto in self.iter() { + match proto { + Protocol::P2p(peer) => last_p2p = Some(peer), + Protocol::P2pCircuit => return last_p2p, + _ => {} + } + } + None + } } #[cfg(test)] diff --git a/src/task.rs b/src/task.rs index b285702..0e6e316 100644 --- a/src/task.rs +++ b/src/task.rs @@ -52,7 +52,7 @@ use futures::channel::{mpsc, oneshot}; use futures::future::BoxFuture; use futures::{FutureExt, StreamExt}; use futures_timer::Delay; -use indexmap::IndexMap; +use indexmap::{IndexMap, IndexSet}; #[cfg(feature = "gossipsub")] use libp2p::gossipsub::{MessageAcceptance, MessageId}; #[cfg(feature = "kad")] @@ -573,6 +573,10 @@ where Command::Autonat(autonat_command) => self.process_autonat_v1_command(autonat_command), #[cfg(feature = "kad")] Command::Dht(dht_command) => self.process_kademlia_command(dht_command), + #[cfg(feature = "relay")] + Command::AutoRelay(autorelay_command) => { + self.process_autorelay_commands(autorelay_command) + } #[cfg(feature = "stream")] Command::Stream(stream_command) => self.process_stream_command(stream_command), #[cfg(feature = "request-response")] diff --git a/src/task/ping.rs b/src/task/ping.rs index 3222b40..e915f83 100644 --- a/src/task/ping.rs +++ b/src/task/ping.rs @@ -20,6 +20,18 @@ where match result { Ok(duration) => { tracing::info!("ping to {} at {} took {:?}", peer, connection, duration); + + // #[cfg(feature = "relay")] + // if let Some(autorelay) = self + // .swarm + // .as_mut() + // .expect("swarm valid") + // .behaviour_mut() + // .autorelay + // .as_mut() + // { + // autorelay.set_peer_ping(peer, connection, duration); + // } } Err(e) => { // TODO: Possibly disconnect peer since if there is an error? diff --git a/src/task/relay.rs b/src/task/relay.rs index 219b797..fea0ae2 100644 --- a/src/task/relay.rs +++ b/src/task/relay.rs @@ -1,9 +1,14 @@ +use crate::behaviour::autorelay; use crate::behaviour::peer_store::store::Store; use crate::task::ConnexaTask; +use crate::types::AutoRelayCommand; use libp2p::relay::{Event as RelayServerEvent, client::Event as RelayClientEvent}; use libp2p::swarm::NetworkBehaviour; use std::fmt::Debug; +#[allow(dead_code)] +pub const RELAY_NAMESPACE: &[u8] = b"/libp2p/relay"; + impl ConnexaTask where X: Default + Send + 'static, @@ -11,6 +16,89 @@ where C::ToSwarm: Debug, S: Store, { + pub fn process_autorelay_commands(&mut self, command: AutoRelayCommand) { + let swarm = self.swarm.as_mut().expect("swarm is still valid"); + match command { + AutoRelayCommand::AddStaticRelay { + peer_id, + relay_addr, + resp, + } => { + let Some(autorelay) = swarm.behaviour_mut().autorelay.as_mut() else { + let _ = resp.send(Err(std::io::Error::other("autorelay is not enabled"))); + return; + }; + + let _ = resp.send(Ok(autorelay.add_static_relay(peer_id, relay_addr))); + } + AutoRelayCommand::RemoveStaticRelay { peer_id, resp } => { + let Some(autorelay) = swarm.behaviour_mut().autorelay.as_mut() else { + let _ = resp.send(Err(std::io::Error::other("autorelay is not enabled"))); + return; + }; + + let _ = resp.send(Ok(autorelay.remove_static_relay(&peer_id))); + } + AutoRelayCommand::ListStaticRelays { resp } => { + let Some(autorelay) = swarm.behaviour_mut().autorelay.as_mut() else { + let _ = resp.send(Err(std::io::Error::other("autorelay is not enabled"))); + return; + }; + + let list = autorelay + .static_relays() + .map(|(peer_id, addr)| (*peer_id, addr.to_vec())) + .collect::>(); + let _ = resp.send(Ok(list)); + } + AutoRelayCommand::GetStaticRelay { peer_id, resp } => { + let Some(autorelay) = swarm.behaviour_mut().autorelay.as_mut() else { + let _ = resp.send(Err(std::io::Error::other("autorelay is not enabled"))); + return; + }; + + let addr = autorelay + .static_relays() + .find(|(p, _)| **p == peer_id) + .map(|(_, addr)| addr.to_vec()) + .ok_or_else(|| { + std::io::Error::new(std::io::ErrorKind::NotFound, "static relay not found") + }); + let _ = resp.send(addr); + } + AutoRelayCommand::EnableAutoRelay { resp } => { + let Some(autorelay) = swarm.behaviour_mut().autorelay.as_mut() else { + let _ = resp.send(Err(std::io::Error::other("autorelay is not enabled"))); + return; + }; + + autorelay.set_status(Some(autorelay::Status::Enable)); + + let _ = resp.send(Ok(())); + } + AutoRelayCommand::DisableAutoRelay { resp } => { + let Some(autorelay) = swarm.behaviour_mut().autorelay.as_mut() else { + let _ = resp.send(Err(std::io::Error::other("autorelay is not enabled"))); + return; + }; + + autorelay.set_status(Some(autorelay::Status::Disable)); + + let _ = resp.send(Ok(())); + } + AutoRelayCommand::DisableRelays { resp } => { + let Some(autorelay) = swarm.behaviour_mut().autorelay.as_mut() else { + let _ = resp.send(Err(std::io::Error::other("autorelay is not enabled"))); + return; + }; + + autorelay.remove_all_reservations(); + + let _ = resp.send(Ok(())); + } + } + } + pub fn process_relay_client_event(&mut self, event: RelayClientEvent) { match event { RelayClientEvent::ReservationReqAccepted { diff --git a/src/types.rs b/src/types.rs index d9d868f..edfea3a 100644 --- a/src/types.rs +++ b/src/types.rs @@ -46,6 +46,8 @@ pub enum Command { Rendezvous(RendezvousCommand), #[cfg(feature = "autonat")] Autonat(AutonatCommand), + #[cfg(feature = "relay")] + AutoRelay(AutoRelayCommand), Whitelist(WhitelistCommand), Blacklist(BlacklistCommand), ConnectionLimits(ConnectionLimitsCommand), @@ -108,6 +110,13 @@ impl From for Command { } } +#[cfg(feature = "relay")] +impl From for Command { + fn from(cmd: AutoRelayCommand) -> Self { + Command::AutoRelay(cmd) + } +} + impl From for Command { fn from(cmd: WhitelistCommand) -> Self { Command::Whitelist(cmd) @@ -432,6 +441,36 @@ pub enum DHTCommand { }, } +#[cfg(feature = "relay")] +#[derive(Debug)] +pub enum AutoRelayCommand { + AddStaticRelay { + peer_id: PeerId, + relay_addr: Multiaddr, + resp: oneshot::Sender>, + }, + RemoveStaticRelay { + peer_id: PeerId, + resp: oneshot::Sender>, + }, + DisableRelays { + resp: oneshot::Sender>, + }, + ListStaticRelays { + resp: oneshot::Sender)>>>, + }, + GetStaticRelay { + peer_id: PeerId, + resp: oneshot::Sender>>, + }, + EnableAutoRelay { + resp: oneshot::Sender>, + }, + DisableAutoRelay { + resp: oneshot::Sender>, + }, +} + #[cfg(feature = "request-response")] type ResponseStream = BoxStream<'static, (PeerId, ConnexaResult)>; #[cfg(feature = "request-response")]