Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions protocols/relay/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
See [PR 6154](https://github.com/libp2p/rust-libp2p/pull/6154).
- Expire external address when a relay listener is closed without a replacement reservation.
See [PR 6285](https://github.com/libp2p/rust-libp2p/pull/6285).
- Reset reservation state on listener close.
See [PR 6461](https://github.com/libp2p/rust-libp2p/pull/6461).

## 0.21.1
- reduce allocations by replacing `get_or_insert` with `get_or_insert_with`
Expand Down
14 changes: 14 additions & 0 deletions protocols/relay/src/priv_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,20 @@ impl Behaviour {
self.queued_actions
.push_back(ToSwarm::ExternalAddrExpired(addr));
}

if let Some(peer_id) = self.peer_for_connection(connection_id) {
self.queued_actions.push_back(ToSwarm::NotifyHandler {
peer_id,
handler: NotifyHandler::One(connection_id),
event: Either::Left(handler::In::ResetReservation),
});
}
}

fn peer_for_connection(&self, connection_id: ConnectionId) -> Option<PeerId> {
self.directly_connected_peers
.iter()
.find_map(|(peer, connections)| connections.contains(&connection_id).then_some(*peer))
}
}

Expand Down
5 changes: 5 additions & 0 deletions protocols/relay/src/priv_client/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ pub enum In {
Reserve {
to_listener: mpsc::Sender<transport::ToListenerMsg>,
},
ResetReservation,
EstablishCircuit {
dst_peer_id: PeerId,
to_dial: oneshot::Sender<Result<priv_client::Connection, outbound_hop::ConnectError>>,
Expand All @@ -71,6 +72,7 @@ impl fmt::Debug for In {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
In::Reserve { to_listener: _ } => f.debug_struct("In::Reserve").finish(),
In::ResetReservation => f.debug_struct("In::ResetReservation").finish(),
In::EstablishCircuit {
dst_peer_id,
to_dial: _,
Expand Down Expand Up @@ -250,6 +252,9 @@ impl ConnectionHandler for Handler {
In::Reserve { to_listener } => {
self.make_new_reservation(to_listener);
}
In::ResetReservation => {
self.reservation = Reservation::None;
}
In::EstablishCircuit {
to_dial,
dst_peer_id,
Expand Down
64 changes: 64 additions & 0 deletions protocols/relay/tests/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -654,6 +654,70 @@ async fn disabled_relay_rejects_reservation() {
assert!(error.source().is_some());
}

#[tokio::test]
async fn reservation_after_listener_close_emits_fresh_acceptance() {
let _ = tracing_subscriber::fmt()
.with_env_filter(EnvFilter::from_default_env())
.try_init();

let relay_addr = Multiaddr::empty().with(Protocol::Memory(rand::random::<u64>()));
let mut relay = build_relay();
let relay_peer_id = *relay.local_peer_id();

relay.listen_on(relay_addr.clone()).unwrap();
relay.add_external_address(relay_addr.clone());
tokio::spawn(async move {
relay.collect::<Vec<_>>().await;
});

let mut client = build_client();
let client_peer_id = *client.local_peer_id();
let client_addr = relay_addr
.with(Protocol::P2p(relay_peer_id))
.with(Protocol::P2pCircuit);
let client_addr_with_peer_id = client_addr.clone().with(Protocol::P2p(client_peer_id));

let first_listener = client.listen_on(client_addr.clone()).unwrap();
assert!(wait_for_dial(&mut client, relay_peer_id).await);
wait_for_reservation(
&mut client,
client_addr_with_peer_id.clone(),
relay_peer_id,
false,
)
.await;

assert!(client.remove_listener(first_listener));

let mut first_listener_closed = false;
let mut first_addr_expired = false;
loop {
match client.select_next_some().await {
SwarmEvent::ListenerClosed { listener_id, .. } if listener_id == first_listener => {
first_listener_closed = true;
}
SwarmEvent::ExternalAddrExpired { address } if address == client_addr_with_peer_id => {
first_addr_expired = true;
}
SwarmEvent::Behaviour(ClientEvent::Ping(_)) => {}
_ => {}
}
if first_listener_closed && first_addr_expired {
break;
}
}

let _second_listener = client.listen_on(client_addr.clone()).unwrap();

wait_for_reservation(
&mut client,
client_addr_with_peer_id.clone(),
relay_peer_id,
false,
)
.await;
}

fn build_relay() -> Swarm<Relay> {
build_relay_with_config(relay::Config {
reservation_duration: Duration::from_secs(2),
Expand Down
Loading