Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 52 additions & 5 deletions packages/rs-dash-async/src/registry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,13 +108,21 @@ pub struct ShutdownReport<K: RegistryKey> {
pub per_worker: BTreeMap<K, WorkerStatus>,
/// Number of parked orphans still alive at the reap deadline.
pub detached: usize,
/// Terminal status of the orphan reap pass: `Ok` if every parked
/// orphan joined cleanly, `Detached` if any survived past the
/// grace, `Panicked` / `Stopped` / `Error` if an orphan finished
/// non-cleanly. Preserved so an orphan that ran to completion but
/// e.g. panicked still fails [`all_clean`](Self::all_clean), even
/// when `detached == 0`.
pub orphan_status: WorkerStatus,
}

impl<K: RegistryKey> ShutdownReport<K> {
    /// Reports whether the whole shutdown finished cleanly.
    ///
    /// Returns `true` only when every per-worker status is clean and the
    /// orphan reap is clean (no survivors past the grace and no non-clean
    /// orphan classification). The check goes through `orphan_status`
    /// rather than the bare `detached` count, so an orphan that ran to
    /// completion but e.g. panicked still fails this check even when
    /// `detached == 0`.
    pub fn all_clean(&self) -> bool {
        self.orphan_status.is_clean() && self.per_worker.values().all(WorkerStatus::is_clean)
    }
}

Expand Down Expand Up @@ -705,11 +713,14 @@ impl<K: RegistryKey> ThreadRegistry<K> {
}
}

// Account for parked orphans last.
let (_status, detached) = self.reap_orphans_impl(self.reap_backstop).await;
// Account for parked orphans last. Preserve the terminal status
// alongside the survivor count so an orphan that finished but
// classified non-clean (panic, abort) still fails `all_clean()`.
let (orphan_status, detached) = self.reap_orphans_impl(self.reap_backstop).await;
ShutdownReport {
per_worker,
detached,
orphan_status,
}
}

Expand Down Expand Up @@ -1092,6 +1103,42 @@ mod tests {
assert!(!reg.any_alive());
}

/// Orphan-status preservation: a parked orphan that finishes but
/// classifies non-clean (panic) must surface its status on the
/// `ShutdownReport`, and `all_clean()` must report `false` even though
/// no orphan survived the reap (`detached == 0`). Guards against the
/// pre-fix bug where `shutdown()` discarded the orphan reap status.
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn shutdown_preserves_non_clean_orphan_status_when_no_survivors() {
    let reg = ThreadRegistry::<&str>::new();
    // Park an orphan thread that finishes immediately by panicking.
    let panicker = std::thread::spawn(|| panic!("deliberate orphan panic"));
    // Wait for the panicker to actually finish, so the reap sees it
    // already-finished and classifies it without needing the grace.
    // Use the async sleep so the poll does not block a runtime worker
    // thread from inside an `async fn`.
    while !panicker.is_finished() {
        tokio::time::sleep(Duration::from_millis(5)).await;
    }
    reg.park_orphan_for_test("orphan_panic", panicker);

    let report = tokio::time::timeout(Duration::from_secs(2), reg.shutdown())
        .await
        .expect("shutdown must complete within bound");
    assert_eq!(
        report.detached, 0,
        "panicked orphan finished, so no survivor"
    );
    match &report.orphan_status {
        WorkerStatus::Panicked(msg) => {
            assert!(msg.contains("deliberate orphan panic"), "got {msg}");
        }
        other => panic!("expected Panicked orphan_status, got {other:?}"),
    }
    assert!(
        !report.all_clean(),
        "all_clean() must fail on a non-clean orphan status even at detached==0"
    );
}

// ----- Group 3: registry unit suite -------------------------------

/// TC-003 — a slow prior-generation thread's epilogue must NOT clear a
Expand Down
9 changes: 6 additions & 3 deletions packages/rs-platform-wallet-ffi/src/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -360,9 +360,12 @@ pub unsafe extern "C" fn platform_wallet_manager_destroy(
// left alive to fire a callback against freed memory.
// `shutdown()` is idempotent, so this is safe even if the host
// already stopped some sync managers before calling destroy.
// It now joins the coordinator OS threads and returns their
// per-thread exit status; the C ABI exposes none of that, so we
// just log it (a panicked loop is worth surfacing) and drop it.
// It joins the coordinator OS threads and returns their per-
// thread exit status; a clean shutdown is logged and we return
// ok(), but a non-clean status (a coordinator that did not
// exit cleanly — panicked, timed out, or left a still-alive
// detached thread) is surfaced as `ErrorShutdownIncomplete` so
// the host knows NOT to free its callback context yet.
let status = runtime().block_on(manager.shutdown());
if !status.all_clean() {
tracing::warn!(
Expand Down
227 changes: 202 additions & 25 deletions packages/rs-platform-wallet/src/changeset/core_bridge.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,8 @@ use key_wallet::managed_account::transaction_record::{OutputRole, TransactionRec
use key_wallet::transaction_checking::TransactionContext;
use key_wallet::Utxo;
use key_wallet_manager::{WalletEvent, WalletId, WalletManager};
use tokio::sync::broadcast::error::RecvError;
use tokio::sync::broadcast::error::{RecvError, TryRecvError};
use tokio::sync::broadcast::Receiver;
use tokio::sync::RwLock;
use tokio_util::sync::CancellationToken;

Expand Down Expand Up @@ -70,29 +71,7 @@ pub async fn wallet_event_adapter_loop<P>(
recv = receiver.recv() => {
match recv {
Ok(event) => {
let wallet_id = event.wallet_id();
// For events that need to consult per-wallet
// state (today only `TransactionInstantLocked`,
// which checks finality before recording the IS
// lock), grab a brief read lock on the manager.
let core = build_core_changeset(&wallet_manager, &event).await;
if core.is_empty_no_records() {
// SyncHeightAdvanced for an unknown wallet,
// empty BlockProcessed, etc. — nothing to
// persist. Skip the round-trip.
continue;
}
let cs = PlatformWalletChangeSet {
core: Some(core),
..PlatformWalletChangeSet::default()
};
if let Err(e) = persister.store(wallet_id, cs) {
tracing::warn!(
wallet_id = %hex::encode(wallet_id),
error = %e,
"Persister rejected core changeset; state will be re-emitted on next sync round"
);
}
dispatch_event(&wallet_manager, persister.as_ref(), event).await;
}
Err(RecvError::Closed) if cancel.is_cancelled() => break,
Err(RecvError::Closed) => {
Expand All @@ -107,12 +86,82 @@ pub async fn wallet_event_adapter_loop<P>(
}
}
}
_ = cancel.cancelled() => break,
_ = cancel.cancelled() => {
// Drain anything already queued in the receiver before
// exit. Without this, events that the broadcast had
// already delivered (but the select hadn't yet polled)
// are dropped on cancellation — losing persistence work
// the upstream already committed to. Same dispatch /
// error handling as the live arm.
drain_pending_events(&mut receiver, &wallet_manager, persister.as_ref()).await;
break;
}
}
}
tracing::debug!("wallet-event adapter task exiting");
}

/// Flush the backlog of `receiver` without waiting for new events.
///
/// Calls `try_recv` repeatedly and hands each buffered event to
/// [`dispatch_event`]. The cancellation arm of the adapter loop uses this
/// so that events queued before teardown still reach the persister.
///
/// * A `Lagged` result is logged with the number of missed events and the
///   drain carries on (the same policy as the live loop).
/// * An `Empty` or `Closed` result ends the drain.
async fn drain_pending_events<P>(
    receiver: &mut Receiver<WalletEvent>,
    wallet_manager: &Arc<RwLock<WalletManager<PlatformWalletInfo>>>,
    persister: &P,
) where
    P: PlatformWalletPersistence + 'static,
{
    loop {
        let queued = match receiver.try_recv() {
            Ok(queued) => queued,
            Err(TryRecvError::Lagged(skipped)) => {
                tracing::warn!(
                    missed = skipped,
                    "wallet-event adapter lagged on broadcast channel during cancellation drain; some events were dropped"
                );
                continue;
            }
            // Nothing left to read, or no sender remains: the drain is done.
            Err(TryRecvError::Empty | TryRecvError::Closed) => return,
        };
        dispatch_event(wallet_manager, persister, queued).await;
    }
}

/// Turn one [`WalletEvent`] into a [`CoreChangeSet`] and hand it to the
/// persister.
///
/// Shared by the live-recv path and the cancellation-drain path so both
/// treat events identically. An event whose changeset is empty is
/// skipped without touching the persister; a persister rejection is
/// logged as a warning and otherwise ignored.
async fn dispatch_event<P>(
    wallet_manager: &Arc<RwLock<WalletManager<PlatformWalletInfo>>>,
    persister: &P,
    event: WalletEvent,
) where
    P: PlatformWalletPersistence + 'static,
{
    let wallet_id = event.wallet_id();

    // Some events must consult per-wallet state (today only
    // `TransactionInstantLocked`, which checks finality before recording
    // the IS lock); the builder takes a brief read lock on the manager.
    let core = build_core_changeset(wallet_manager, &event).await;

    // SyncHeightAdvanced for an unknown wallet, empty BlockProcessed,
    // etc. — nothing to persist, so skip the round-trip.
    if core.is_empty_no_records() {
        return;
    }

    let changeset = PlatformWalletChangeSet {
        core: Some(core),
        ..Default::default()
    };
    let outcome = persister.store(wallet_id, changeset);
    if let Err(rejection) = outcome {
        tracing::warn!(
            wallet_id = %hex::encode(wallet_id),
            error = %rejection,
            "Persister rejected core changeset; state will be re-emitted on next sync round"
        );
    }
}

/// Project an upstream [`WalletEvent`] into a [`CoreChangeSet`] suitable
/// for atomic persistence.
async fn build_core_changeset(
Expand Down Expand Up @@ -353,3 +402,131 @@ impl CoreChangeSet {
&& self.addresses_derived.is_empty()
}
}

#[cfg(test)]
mod tests {
    use super::*;

    use std::sync::atomic::{AtomicUsize, Ordering as AO};
    use tokio::sync::broadcast;

    use crate::changeset::{ClientStartState, PersistenceError, PlatformWalletChangeSet};

    /// Test persister that only counts how many times `store` was called.
    struct CountingPersister(AtomicUsize);

    impl PlatformWalletPersistence for CountingPersister {
        fn store(
            &self,
            _wallet_id: WalletId,
            _changeset: PlatformWalletChangeSet,
        ) -> Result<(), PersistenceError> {
            self.0.fetch_add(1, AO::SeqCst);
            Ok(())
        }
        fn flush(&self, _wallet_id: WalletId) -> Result<(), PersistenceError> {
            Ok(())
        }
        fn load(&self) -> Result<ClientStartState, PersistenceError> {
            Ok(ClientStartState::default())
        }
    }

    /// `drain_pending_events` must dispatch every event already queued
    /// in the receiver before returning. Guards against the
    /// cancellation-arm bug where events the broadcast had delivered but
    /// the `select!` hadn't yet polled were silently dropped at exit.
    #[tokio::test]
    async fn drain_pending_events_persists_queued_events() {
        let wallet_manager = Arc::new(RwLock::new(WalletManager::new(dashcore::Network::Testnet)));
        let persister = CountingPersister(AtomicUsize::new(0));

        let (tx, mut rx) = broadcast::channel::<WalletEvent>(16);
        let wallet_id: WalletId = [1u8; 32];
        // Queue three SyncHeightAdvanced events without polling rx;
        // each maps to a non-empty CoreChangeSet (synced_height = Some).
        for h in 100..103 {
            tx.send(WalletEvent::SyncHeightAdvanced {
                wallet_id,
                height: h,
            })
            .unwrap();
        }

        drain_pending_events(&mut rx, &wallet_manager, &persister).await;
        assert_eq!(
            persister.0.load(AO::SeqCst),
            3,
            "every queued event must reach the persister before the drain returns"
        );
    }

    /// Sanity check: an empty receiver returns immediately, no stores.
    #[tokio::test]
    async fn drain_pending_events_is_noop_on_empty_receiver() {
        let wallet_manager = Arc::new(RwLock::new(WalletManager::new(dashcore::Network::Testnet)));
        let persister = CountingPersister(AtomicUsize::new(0));
        let (_tx, mut rx) = broadcast::channel::<WalletEvent>(4);
        drain_pending_events(&mut rx, &wallet_manager, &persister).await;
        assert_eq!(persister.0.load(AO::SeqCst), 0);
    }

    /// End-to-end: events sitting in the broadcast receiver at the moment
    /// `cancel` fires must be dispatched before the adapter loop exits.
    ///
    /// Runs on the default current-thread test runtime on purpose. The
    /// adapter task can then only make progress while this test is
    /// suspended at an `.await`, so "cancel, then enqueue" below is
    /// atomic from the adapter's point of view. On a multi-thread runtime
    /// the adapter could observe the cancel, drain an empty queue and
    /// drop its receiver before the sends happened, making the test
    /// flaky. When the adapter is next polled, both the recv arm and the
    /// cancel arm are ready; whichever arm `select!` picks, all five
    /// events must have been persisted by the time the loop exits.
    #[tokio::test]
    async fn adapter_loop_drains_queued_events_on_cancel() {
        let wallet_manager = Arc::new(RwLock::new(WalletManager::new(dashcore::Network::Testnet)));
        let persister = Arc::new(CountingPersister(AtomicUsize::new(0)));
        let cancel = CancellationToken::new();

        let wm_for_task = Arc::clone(&wallet_manager);
        let persister_for_task = Arc::clone(&persister);
        let cancel_for_task = cancel.clone();
        let handle = tokio::spawn(async move {
            wallet_event_adapter_loop(wm_for_task, persister_for_task, cancel_for_task).await;
        });

        // Wait for the loop to subscribe (it does so before the first
        // recv()). Each sleep yields to the runtime so the adapter task
        // gets a chance to run.
        for _ in 0..50 {
            if wallet_manager.read().await.event_sender().receiver_count() > 0 {
                break;
            }
            tokio::time::sleep(std::time::Duration::from_millis(5)).await;
        }
        assert!(
            wallet_manager.read().await.event_sender().receiver_count() > 0,
            "adapter loop should have subscribed by now"
        );

        // Clone the sender BEFORE cancelling so that no `.await` sits
        // between the cancel and the sends.
        let sender = wallet_manager.read().await.event_sender().clone();
        let wallet_id: WalletId = [7u8; 32];

        // Cancel, then enqueue synchronously into the broadcast buffer.
        // The adapter cannot run until the next `.await` below, so every
        // event is already queued when it observes the cancellation.
        cancel.cancel();
        for h in 200..205 {
            sender
                .send(WalletEvent::SyncHeightAdvanced {
                    wallet_id,
                    height: h,
                })
                .expect("adapter's receiver must still be subscribed");
        }

        tokio::time::timeout(std::time::Duration::from_secs(2), handle)
            .await
            .expect("adapter must exit promptly on cancel")
            .expect("adapter task must not panic");

        assert_eq!(
            persister.0.load(AO::SeqCst),
            5,
            "drain path must dispatch every queued event before loop exit"
        );
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,12 @@ impl CoordinatorLifecycle {
self.quiescing.store(false, Ordering::Release);
}

/// Cancel-only stop: signal the loop and return immediately. Does
/// NOT wait for an in-flight pass to settle; the loop's outer
/// `select!` is `biased` toward the cancel arm, so an in-flight pass
/// may be dropped at its next `.await` (and any post-await side
/// effects in that pass do not run). Use [`quiesce`](Self::quiesce)
/// for a real "fully drained" barrier.
pub(crate) fn stop(&self) {
self.registry.cancel(self.worker);
}
Expand Down
20 changes: 14 additions & 6 deletions packages/rs-platform-wallet/src/manager/identity_sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -422,12 +422,20 @@ where

/// Stop the background sync loop. No-op if not running.
///
/// **Cancel-only**: requests cancellation and returns immediately.
/// The signal is delivered, but `stop()` does not wait for any
/// in-flight pass to settle. Because [`start`](Self::start)'s outer
/// `select!` polls the cancel arm `biased` first, an in-flight
/// `sync_now()` future may be dropped at its next `.await` (the SDK
/// fetch or persister round-trip), in which case its persister fan-out
/// stops at the await point reached. It may equally have already
/// completed before the signal arrived. Either way, there is no
/// post-condition that all persister stores have finished, and a
/// fresh pass started after this returns is still possible.
///
/// For a real "nothing is running and nothing more will be persisted"
/// barrier (required by manager shutdown so the host can free the
/// persister context), use [`quiesce`](Self::quiesce) or the
/// manager-wide [`shutdown`](super::PlatformWalletManager::shutdown).
pub fn stop(&self) {
self.lifecycle.stop();
}
Expand Down
Loading
Loading