diff --git a/packages/rs-dash-async/src/registry.rs b/packages/rs-dash-async/src/registry.rs index 01795a5e73..cb562736ea 100644 --- a/packages/rs-dash-async/src/registry.rs +++ b/packages/rs-dash-async/src/registry.rs @@ -108,13 +108,21 @@ pub struct ShutdownReport { pub per_worker: BTreeMap, /// Number of parked orphans still alive at the reap deadline. pub detached: usize, + /// Terminal status of the orphan reap pass: `Ok` if every parked + /// orphan joined cleanly, `Detached` if any survived past the + /// grace, `Panicked` / `Stopped` / `Error` if an orphan finished + /// non-cleanly. Preserved so an orphan that ran to completion but + /// e.g. panicked still fails [`all_clean`](Self::all_clean), even + /// when `detached == 0`. + pub orphan_status: WorkerStatus, } impl ShutdownReport { - /// `true` only when every per-worker status is clean and no orphan - /// survived the reap. + /// `true` only when every per-worker status is clean and the orphan + /// reap is clean (no survivors past the grace and no non-clean + /// orphan classification). pub fn all_clean(&self) -> bool { - self.detached == 0 && self.per_worker.values().all(WorkerStatus::is_clean) + self.orphan_status.is_clean() && self.per_worker.values().all(WorkerStatus::is_clean) } } @@ -705,11 +713,14 @@ impl ThreadRegistry { } } - // Account for parked orphans last. - let (_status, detached) = self.reap_orphans_impl(self.reap_backstop).await; + // Account for parked orphans last. Preserve the terminal status + // alongside the survivor count so an orphan that finished but + // classified non-clean (panic, abort) still fails `all_clean()`. + let (orphan_status, detached) = self.reap_orphans_impl(self.reap_backstop).await; ShutdownReport { per_worker, detached, + orphan_status, } } @@ -1092,6 +1103,42 @@ mod tests { assert!(!reg.any_alive()); } + /// Orphan-status preservation: a parked orphan that finishes but + /// classifies non-clean (panic) must surface its status on the + /// `ShutdownReport`, and `all_clean()` must report `false` even though + /// no orphan survived the reap (`detached == 0`). Guards against the + /// pre-fix bug where `shutdown()` discarded the orphan reap status. + #[tokio::test(flavor = "multi_thread", worker_threads = 4)] + async fn shutdown_preserves_non_clean_orphan_status_when_no_survivors() { + let reg = ThreadRegistry::<&str>::new(); + // Park an orphan thread that finishes immediately by panicking. + let panicker = std::thread::spawn(|| panic!("deliberate orphan panic")); + // Wait for the panicker to actually finish, so the reap sees it + // already-finished and classifies it without needing the grace. + while !panicker.is_finished() { + std::thread::sleep(Duration::from_millis(5)); + } + reg.park_orphan_for_test("orphan_panic", panicker); + + let report = tokio::time::timeout(Duration::from_secs(2), reg.shutdown()) + .await + .expect("shutdown must complete within bound"); + assert_eq!( + report.detached, 0, + "panicked orphan finished, so no survivor" + ); + match &report.orphan_status { + WorkerStatus::Panicked(msg) => { + assert!(msg.contains("deliberate orphan panic"), "got {msg}"); + } + other => panic!("expected Panicked orphan_status, got {other:?}"), + } + assert!( + !report.all_clean(), + "all_clean() must fail on a non-clean orphan status even at detached==0" + ); + } + // ----- Group 3: registry unit suite ------------------------------- /// TC-003 — a slow prior-generation thread's epilogue must NOT clear a diff --git a/packages/rs-platform-wallet-ffi/src/manager.rs b/packages/rs-platform-wallet-ffi/src/manager.rs index 986103ab47..9272d36df0 100644 --- a/packages/rs-platform-wallet-ffi/src/manager.rs +++ b/packages/rs-platform-wallet-ffi/src/manager.rs @@ -360,9 +360,12 @@ pub unsafe extern "C" fn platform_wallet_manager_destroy( // left alive to fire a callback against freed memory. // `shutdown()` is idempotent, so this is safe even if the host // already stopped some sync managers before calling destroy. - // It now joins the coordinator OS threads and returns their - // per-thread exit status; the C ABI exposes none of that, so we - // just log it (a panicked loop is worth surfacing) and drop it. + // It joins the coordinator OS threads and returns their per- + // thread exit status; a clean shutdown is logged and we return + // ok(), but a non-clean status (a coordinator that did not + // exit cleanly — panicked, timed out, or left a still-alive + // detached thread) is surfaced as `ErrorShutdownIncomplete` so + // the host knows NOT to free its callback context yet. let status = runtime().block_on(manager.shutdown()); if !status.all_clean() { tracing::warn!( diff --git a/packages/rs-platform-wallet/src/changeset/core_bridge.rs b/packages/rs-platform-wallet/src/changeset/core_bridge.rs index 927cf8d000..840f373dc4 100644 --- a/packages/rs-platform-wallet/src/changeset/core_bridge.rs +++ b/packages/rs-platform-wallet/src/changeset/core_bridge.rs @@ -33,7 +33,8 @@ use key_wallet::managed_account::transaction_record::{OutputRole, TransactionRec use key_wallet::transaction_checking::TransactionContext; use key_wallet::Utxo; use key_wallet_manager::{WalletEvent, WalletId, WalletManager}; -use tokio::sync::broadcast::error::RecvError; +use tokio::sync::broadcast::error::{RecvError, TryRecvError}; +use tokio::sync::broadcast::Receiver; use tokio::sync::RwLock; use tokio_util::sync::CancellationToken; @@ -70,29 +71,7 @@ pub async fn wallet_event_adapter_loop

( recv = receiver.recv() => { match recv { Ok(event) => { - let wallet_id = event.wallet_id(); - // For events that need to consult per-wallet - // state (today only `TransactionInstantLocked`, - // which checks finality before recording the IS - // lock), grab a brief read lock on the manager. - let core = build_core_changeset(&wallet_manager, &event).await; - if core.is_empty_no_records() { - // SyncHeightAdvanced for an unknown wallet, - // empty BlockProcessed, etc. — nothing to - // persist. Skip the round-trip. - continue; - } - let cs = PlatformWalletChangeSet { - core: Some(core), - ..PlatformWalletChangeSet::default() - }; - if let Err(e) = persister.store(wallet_id, cs) { - tracing::warn!( - wallet_id = %hex::encode(wallet_id), - error = %e, - "Persister rejected core changeset; state will be re-emitted on next sync round" - ); - } + dispatch_event(&wallet_manager, persister.as_ref(), event).await; } Err(RecvError::Closed) if cancel.is_cancelled() => break, Err(RecvError::Closed) => { @@ -107,12 +86,82 @@ pub async fn wallet_event_adapter_loop

( } } } - _ = cancel.cancelled() => break, + _ = cancel.cancelled() => { + // Drain anything already queued in the receiver before + // exit. Without this, events that the broadcast had + // already delivered (but the select hadn't yet polled) + // are dropped on cancellation — losing persistence work + // the upstream already committed to. Same dispatch / + // error handling as the live arm. + drain_pending_events(&mut receiver, &wallet_manager, persister.as_ref()).await; + break; + } } } tracing::debug!("wallet-event adapter task exiting"); } +/// Drain every event already buffered in `receiver` synchronously, +/// dispatching each via [`dispatch_event`]. Used by the cancellation +/// arm of the adapter loop so events the broadcast delivered before +/// teardown are not dropped on exit. Lagged batches are logged and +/// skipped (matching the live-loop policy); a closed channel ends +/// the drain. +async fn drain_pending_events

( + receiver: &mut Receiver, + wallet_manager: &Arc>>, + persister: &P, +) where + P: PlatformWalletPersistence + 'static, +{ + loop { + match receiver.try_recv() { + Ok(event) => dispatch_event(wallet_manager, persister, event).await, + Err(TryRecvError::Empty) | Err(TryRecvError::Closed) => break, + Err(TryRecvError::Lagged(n)) => { + tracing::warn!( + missed = n, + "wallet-event adapter lagged on broadcast channel during cancellation drain; some events were dropped" + ); + } + } + } +} + +/// Project a single [`WalletEvent`] into its [`CoreChangeSet`] and +/// forward to the persister. Extracted so the live-recv path and the +/// cancellation-drain path apply identical handling. +async fn dispatch_event

( + wallet_manager: &Arc>>, + persister: &P, + event: WalletEvent, +) where + P: PlatformWalletPersistence + 'static, +{ + let wallet_id = event.wallet_id(); + // For events that need to consult per-wallet state (today only + // `TransactionInstantLocked`, which checks finality before recording + // the IS lock), grab a brief read lock on the manager. + let core = build_core_changeset(wallet_manager, &event).await; + if core.is_empty_no_records() { + // SyncHeightAdvanced for an unknown wallet, empty + // BlockProcessed, etc. — nothing to persist. Skip the + // round-trip. + return; + } + let cs = PlatformWalletChangeSet { + core: Some(core), + ..PlatformWalletChangeSet::default() + }; + if let Err(e) = persister.store(wallet_id, cs) { + tracing::warn!( + wallet_id = %hex::encode(wallet_id), + error = %e, + "Persister rejected core changeset; state will be re-emitted on next sync round" + ); + } +} + /// Project an upstream [`WalletEvent`] into a [`CoreChangeSet`] suitable /// for atomic persistence. async fn build_core_changeset( @@ -353,3 +402,131 @@ impl CoreChangeSet { && self.addresses_derived.is_empty() } } + +#[cfg(test)] +mod tests { + use super::*; + + use std::sync::atomic::{AtomicUsize, Ordering as AO}; + use tokio::sync::broadcast; + + use crate::changeset::{ClientStartState, PersistenceError, PlatformWalletChangeSet}; + + struct CountingPersister(AtomicUsize); + + impl PlatformWalletPersistence for CountingPersister { + fn store( + &self, + _wallet_id: WalletId, + _changeset: PlatformWalletChangeSet, + ) -> Result<(), PersistenceError> { + self.0.fetch_add(1, AO::SeqCst); + Ok(()) + } + fn flush(&self, _wallet_id: WalletId) -> Result<(), PersistenceError> { + Ok(()) + } + fn load(&self) -> Result { + Ok(ClientStartState::default()) + } + } + + /// `drain_pending_events` must dispatch every event already queued + /// in the receiver before returning. Guards against the + /// cancellation-arm bug where events the broadcast had delivered but + /// the `select!` hadn't yet polled were silently dropped at exit. + #[tokio::test] + async fn drain_pending_events_persists_queued_events() { + let wallet_manager = Arc::new(RwLock::new(WalletManager::new(dashcore::Network::Testnet))); + let persister = CountingPersister(AtomicUsize::new(0)); + + let (tx, mut rx) = broadcast::channel::(16); + let wallet_id: WalletId = [1u8; 32]; + // Queue three SyncHeightAdvanced events without polling rx; + // each maps to a non-empty CoreChangeSet (synced_height = Some). + for h in 100..103 { + tx.send(WalletEvent::SyncHeightAdvanced { + wallet_id, + height: h, + }) + .unwrap(); + } + + drain_pending_events(&mut rx, &wallet_manager, &persister).await; + assert_eq!( + persister.0.load(AO::SeqCst), + 3, + "every queued event must reach the persister before the drain returns" + ); + } + + /// Sanity check: an empty receiver returns immediately, no stores. + #[tokio::test] + async fn drain_pending_events_is_noop_on_empty_receiver() { + let wallet_manager = Arc::new(RwLock::new(WalletManager::new(dashcore::Network::Testnet))); + let persister = CountingPersister(AtomicUsize::new(0)); + let (_tx, mut rx) = broadcast::channel::(4); + drain_pending_events(&mut rx, &wallet_manager, &persister).await; + assert_eq!(persister.0.load(AO::SeqCst), 0); + } + + /// End-to-end: events queued in the broadcast receiver at the moment + /// `cancel` fires must be dispatched before the adapter loop exits. + /// Cancels first, then pushes events through the WalletManager's + /// broadcast sender — the loop's `select!` is already biased toward + /// the cancel arm by then, so without the drain path every event + /// here would be silently dropped on exit. + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] + async fn adapter_loop_drains_queued_events_on_cancel() { + let wallet_manager = Arc::new(RwLock::new(WalletManager::new(dashcore::Network::Testnet))); + let persister = Arc::new(CountingPersister(AtomicUsize::new(0))); + let cancel = CancellationToken::new(); + + let wm_for_task = Arc::clone(&wallet_manager); + let persister_for_task = Arc::clone(&persister); + let cancel_for_task = cancel.clone(); + let handle = tokio::spawn(async move { + wallet_event_adapter_loop(wm_for_task, persister_for_task, cancel_for_task).await; + }); + + // Wait for the loop to subscribe (it does so before the first + // recv()). A short poll is enough — the subscribe is sync inside + // the task. + for _ in 0..50 { + if wallet_manager.read().await.event_sender().receiver_count() > 0 { + break; + } + tokio::time::sleep(std::time::Duration::from_millis(5)).await; + } + assert!( + wallet_manager.read().await.event_sender().receiver_count() > 0, + "adapter loop should have subscribed by now" + ); + + // Cancel BEFORE sending. The next time the adapter polls, the + // cancel arm wins (the events end up sitting in the broadcast + // queue), so the drain path is what carries them through. The + // sends happen synchronously into the broadcast buffer. + cancel.cancel(); + let sender = wallet_manager.read().await.event_sender().clone(); + let wallet_id: WalletId = [7u8; 32]; + for h in 200..205 { + sender + .send(WalletEvent::SyncHeightAdvanced { + wallet_id, + height: h, + }) + .unwrap(); + } + + let _ = tokio::time::timeout(std::time::Duration::from_secs(2), handle) + .await + .expect("adapter must exit promptly on cancel"); + + assert_eq!( + persister.0.load(AO::SeqCst), + 5, + "drain path must dispatch every queued event before loop exit" + ); + } +} diff --git a/packages/rs-platform-wallet/src/manager/coordinator_lifecycle.rs b/packages/rs-platform-wallet/src/manager/coordinator_lifecycle.rs index 87e20fa6e5..42596ae20f 100644 --- a/packages/rs-platform-wallet/src/manager/coordinator_lifecycle.rs +++ b/packages/rs-platform-wallet/src/manager/coordinator_lifecycle.rs @@ -136,7 +136,12 @@ impl CoordinatorLifecycle { self.quiescing.store(false, Ordering::Release); } - /// Cancel-only stop: signal the loop and return immediately. + /// Cancel-only stop: signal the loop and return immediately. Does + /// NOT wait for an in-flight pass to settle; the loop's outer + /// `select!` is `biased` toward the cancel arm, so an in-flight pass + /// may be dropped at its next `.await` (and any post-await side + /// effects in that pass do not run). Use [`quiesce`](Self::quiesce) + /// for a real "fully drained" barrier. pub(crate) fn stop(&self) { self.registry.cancel(self.worker); } diff --git a/packages/rs-platform-wallet/src/manager/identity_sync.rs b/packages/rs-platform-wallet/src/manager/identity_sync.rs index 165e4f4530..0ad3df7aca 100644 --- a/packages/rs-platform-wallet/src/manager/identity_sync.rs +++ b/packages/rs-platform-wallet/src/manager/identity_sync.rs @@ -422,12 +422,20 @@ where /// Stop the background sync loop. No-op if not running. /// - /// **Cancel-only**: requests cancellation and returns immediately. A - /// pass already inside `sync_now` keeps running to completion, - /// including its `persister.store(...)` fan-out. For a real "nothing - /// is running and nothing more will be persisted" barrier — required - /// by manager shutdown so the host can free the persister context — - /// use [`quiesce`](Self::quiesce). + /// **Cancel-only**: requests cancellation and returns immediately. + /// The signal is delivered, but `stop()` does not wait for any + /// in-flight pass to settle. Because [`start`](Self::start)'s outer + /// `select!` polls the cancel arm `biased` first, an in-flight + /// `sync_now()` future may be dropped at its next `.await` (the SDK + /// fetch or persister round-trip), in which case its persister fan-out + /// stops at the await point reached. It may equally have already + /// completed before the signal arrived. Either way, there is no + /// post-condition that all persister stores have finished, and a + /// fresh pass started after this returns is still possible — for a + /// real "nothing is running and nothing more will be persisted" + /// barrier — required by manager shutdown so the host can free the + /// persister context — use [`quiesce`](Self::quiesce) (or the + /// manager-wide [`shutdown`](super::PlatformWalletManager::shutdown)). pub fn stop(&self) { self.lifecycle.stop(); } diff --git a/packages/rs-platform-wallet/src/manager/mod.rs b/packages/rs-platform-wallet/src/manager/mod.rs index fd22ec6d17..00b89675a1 100644 --- a/packages/rs-platform-wallet/src/manager/mod.rs +++ b/packages/rs-platform-wallet/src/manager/mod.rs @@ -207,19 +207,22 @@ pub struct CoordinatorExitStatus { pub shielded_sync: Option, /// Wallet-event adapter (a `tokio` task, not an OS thread). pub event_adapter: CoordinatorThreadStatus, - /// Aggregate status of any coordinator OS threads that an earlier + /// Terminal status of any coordinator OS threads that an earlier /// tight `stop()`→`start()` reap had to detach past its 1 s /// wedge-backstop and park in the shared [`ThreadRegistry`]'s orphan /// list. /// - /// [`Ok`](CoordinatorThreadStatus::Ok) when none were detached (or - /// every detached thread has since joined cleanly); + /// [`Ok`](CoordinatorThreadStatus::Ok) when none were parked (or + /// every parked thread joined cleanly); /// [`Detached`](CoordinatorThreadStatus::Detached) when at least one - /// is still alive at the shutdown deadline. This is what keeps - /// [`all_clean`](Self::all_clean) honest for the wedge case the rest - /// of the teardown can't see — without it a detached-but-still-live - /// thread would let the host free a callback context the thread may - /// still touch (a residual use-after-free). + /// is still alive at the shutdown deadline; any other non-clean + /// variant (e.g. [`Panicked`](CoordinatorThreadStatus::Panicked)) + /// when a parked thread ran to completion but classified non-clean. + /// This is what keeps [`all_clean`](Self::all_clean) honest for the + /// wedge case the rest of the teardown can't see — without it a + /// detached-but-still-live or panicked parked thread would let the + /// host free a callback context the thread may still touch (a + /// residual use-after-free). pub detached_threads: CoordinatorThreadStatus, } @@ -240,10 +243,12 @@ impl CoordinatorExitStatus { /// Build the FFI-stable exit status from the registry's weight-ordered /// [`ShutdownReport`]. A worker absent from the report never ran, so it - /// maps to [`NotRunning`](CoordinatorThreadStatus::NotRunning); a - /// non-zero orphan-survivor count surfaces as - /// [`Detached`](CoordinatorThreadStatus::Detached), keeping - /// [`all_clean`](Self::all_clean) honest for a still-live wedged thread. + /// maps to [`NotRunning`](CoordinatorThreadStatus::NotRunning); the + /// orphan reap's terminal status surfaces verbatim as + /// [`detached_threads`](Self::detached_threads), keeping + /// [`all_clean`](Self::all_clean) honest for both a still-live wedged + /// thread (`Detached`) and an orphan that ran to completion but + /// classified non-clean (`Panicked` / `Stopped`). pub(crate) fn from_report(report: ShutdownReport) -> Self { let worker = |key: WalletWorker| -> CoordinatorThreadStatus { report @@ -261,11 +266,7 @@ impl CoordinatorExitStatus { #[cfg(not(feature = "shielded"))] shielded_sync: None, event_adapter: worker(WalletWorker::EventAdapter), - detached_threads: if report.detached > 0 { - CoordinatorThreadStatus::Detached - } else { - CoordinatorThreadStatus::Ok - }, + detached_threads: report.orphan_status.into(), } } } @@ -512,16 +513,17 @@ impl PlatformWalletManager

{ /// [`crate::error::PlatformWalletError::ShieldedShutdownIncomplete`]; or /// - the coordinator's store reset itself fails. /// - /// **Host-serialization precondition**: the caller must not invoke - /// `shielded_sync_start` for this manager concurrently with `clear`. A - /// concurrent direct `sync_now`/`sync_wallet` is held off — the quiescing - /// gate is raised *continuously* for the whole clear (from before the - /// drain, across the liveness check, through the wipe), so such a pass - /// observes the gate and bails with no lapse. The one remaining residual - /// is a full `shielded_sync_start` racing `clear`: a restart spawns a - /// fresh loop and reopens the gate, so it could re-persist into the wiped - /// store. The wallet UI drives these from one place; that ordering is the - /// host's contract until the registry grows a per-key clearing latch. + /// Internally exclusive with [`ShieldedSyncManager::start`]: a + /// clear/start latch is raised for the whole quiesce → liveness-check + /// → `coord.clear()` span. A `start()` whose latch load lands AFTER + /// the latch was raised becomes a no-op + warn log (the common race + /// the fix targets); a `start()` whose load beat the raise spawns + /// its loop, but the subsequent `quiesce()` then cancels and joins + /// it, and the `status.is_clean()` gate refuses the wipe if that + /// drain was non-clean. Either way the store is not written behind + /// our back. Direct `sync_now` / `sync_wallet` is gated by the same + /// registry-level `quiescing` flag, also held across the wipe. The + /// host should restart shielded sync after Clear returns. #[cfg(feature = "shielded")] pub async fn clear_shielded(&self) -> Result<(), crate::error::PlatformWalletError> { self.clear_shielded_inner(std::time::Duration::from_secs(SHUTDOWN_JOIN_TIMEOUT_SECS)) @@ -536,6 +538,13 @@ impl PlatformWalletManager

{ &self, drain_timeout: std::time::Duration, ) -> Result<(), crate::error::PlatformWalletError> { + // Raise the clear/start latch FIRST, before quiesce, so a `start()` + // racing this call is refused — without this, a freshly-spawned loop + // would reopen the `quiescing` gate we are about to hold and slip its + // first pass into the wiped store. The guard lowers the latch on + // return (every exit path). + let _start_latch = self.shielded_sync_manager.hold_clearing_latch(); + // Raise and HOLD the shielded quiescing gate for the WHOLE clear, // BEFORE quiescing — so the "no new pass" barrier never lapses // between the drain, the liveness check, and the store wipe: a direct @@ -1059,6 +1068,15 @@ mod tests { "expected ShieldedShutdownIncomplete, got {err:?}" ); + // The orphan-error early return path must still have released the + // clearing latch — otherwise a follow-up start() (after the host + // recovers from the error) would be silently refused forever. + assert!( + !manager.shielded_sync_manager.is_clearing(), + "clear_shielded must drop the clearing latch on every exit path, \ + including the orphan-alive early return" + ); + // Release + reap the orphan; the shielded-scoped gate now clears and // a retry succeeds (no shielded store configured → clear is a no-op). release_tx.send(()).unwrap(); @@ -1068,6 +1086,10 @@ mod tests { .clear_shielded() .await .expect("clear_shielded must succeed once the orphan is reaped"); + assert!( + !manager.shielded_sync_manager.is_clearing(), + "clear_shielded must drop the clearing latch after a successful clear too" + ); } /// SEC-001: `clear_shielded` must BOUND its in-flight-pass drain so a @@ -1123,6 +1145,52 @@ mod tests { pass_task.await.expect("pass task joined"); } + /// `clear_shielded` and `ShieldedSyncManager::start` must be + /// mutually exclusive inside the wallet — relying only on the host + /// to serialize them lets a `start` racing `clear` reopen the + /// registry's `quiescing` gate and slip its first pass into the + /// store we're about to wipe. + /// + /// Direct surface check: while the clearing latch is held, `start()` + /// is refused (no-op + warn log), and once the latch drops a fresh + /// `start()` succeeds. End-to-end the latch is held across the + /// whole quiesce → liveness → clear span by `clear_shielded`, so a + /// concurrent `start()` is similarly refused throughout. + #[cfg(feature = "shielded")] + #[tokio::test(flavor = "multi_thread", worker_threads = 4)] + async fn shielded_start_is_refused_while_clearing_latch_held() { + let manager = make_manager(); + let shielded = Arc::clone(&manager.shielded_sync_manager); + + // No loop, no latch → start spawns the loop normally. + Arc::clone(&shielded).start(); + assert!( + shielded.is_running(), + "baseline: start() without the latch must succeed" + ); + // Tear down so the next start has a clean slot. + assert!(shielded.quiesce().await.is_clean()); + + // Raise the latch as `clear_shielded` does; `start()` is refused. + let latch = shielded.hold_clearing_latch(); + assert!(shielded.is_clearing()); + Arc::clone(&shielded).start(); + assert!( + !shielded.is_running(), + "start() must be a no-op while the clear/start latch is held" + ); + + // Drop the latch; a fresh start succeeds. + drop(latch); + assert!(!shielded.is_clearing()); + Arc::clone(&shielded).start(); + assert!( + shielded.is_running(), + "start() must succeed once the clear/start latch drops" + ); + assert!(shielded.quiesce().await.is_clean()); + } + /// TC-015 (R5): `from_report` maps the registry's [`ShutdownReport`] /// onto the FFI-stable `CoordinatorExitStatus` with identical field / /// variant shape and `all_clean()` semantics. The full `WorkerStatus` @@ -1142,6 +1210,7 @@ mod tests { let status = CoordinatorExitStatus::from_report(ShutdownReport { per_worker: per, detached: 0, + orphan_status: WorkerStatus::Ok, }); assert_eq!(status.platform_address_sync, CoordinatorThreadStatus::Ok); assert_eq!(status.identity_sync, CoordinatorThreadStatus::Ok); @@ -1158,6 +1227,7 @@ mod tests { let status = CoordinatorExitStatus::from_report(ShutdownReport { per_worker: BTreeMap::new(), detached: 1, + orphan_status: WorkerStatus::Detached, }); assert_eq!(status.detached_threads, CoordinatorThreadStatus::Detached); assert_eq!( @@ -1171,10 +1241,25 @@ mod tests { let status = CoordinatorExitStatus::from_report(ShutdownReport { per_worker: per, detached: 0, + orphan_status: WorkerStatus::Ok, }); assert_eq!(status.identity_sync, CoordinatorThreadStatus::Timeout); assert!(!status.all_clean()); + // A finished-but-panicked orphan (detached==0, orphan_status non-clean) + // surfaces verbatim and fails all_clean — guards against the + // pre-fix bug where the orphan's status was discarded entirely. + let status = CoordinatorExitStatus::from_report(ShutdownReport { + per_worker: BTreeMap::new(), + detached: 0, + orphan_status: WorkerStatus::Panicked("orphan boom".into()), + }); + assert_eq!( + status.detached_threads, + CoordinatorThreadStatus::Panicked("orphan boom".into()) + ); + assert!(!status.all_clean()); + // Full variant mapping table. assert_eq!( CoordinatorThreadStatus::from(WorkerStatus::Stopped(Some("x".into()))), diff --git a/packages/rs-platform-wallet/src/manager/platform_address_sync.rs b/packages/rs-platform-wallet/src/manager/platform_address_sync.rs index 5cb15b048e..16cfb2c3a4 100644 --- a/packages/rs-platform-wallet/src/manager/platform_address_sync.rs +++ b/packages/rs-platform-wallet/src/manager/platform_address_sync.rs @@ -221,13 +221,21 @@ impl PlatformAddressSyncManager { /// Stop the background sync loop. No-op if not running. /// - /// **Cancel-only**: requests cancellation and returns immediately. A - /// pass already inside `sync_now` keeps running to completion, - /// including its `on_platform_address_sync_completed` host-callback - /// dispatch. For a real "nothing is running and nothing more will - /// fire a host callback" barrier — required by manager shutdown so - /// the host can free the event-handler context — use - /// [`quiesce`](Self::quiesce). + /// **Cancel-only**: requests cancellation and returns immediately. + /// The signal is delivered, but `stop()` does not wait for any + /// in-flight pass to settle. Because [`start`](Self::start)'s outer + /// `select!` polls the cancel arm `biased` first, an in-flight + /// `sync_now()` future may be dropped at its next `.await` (the SDK + /// fetch or the `on_platform_address_sync_completed` dispatch), in + /// which case the host callback for that pass may or may not have + /// fired. It may equally have already completed before the signal + /// arrived. Either way, there is no post-condition that no further + /// host callback fires, and a fresh pass started after this returns + /// is still possible — for a real "nothing is running and nothing + /// more will fire a host callback" barrier — required by manager + /// shutdown so the host can free the event-handler context — use + /// [`quiesce`](Self::quiesce) (or the manager-wide + /// [`shutdown`](super::PlatformWalletManager::shutdown)). pub fn stop(&self) { self.lifecycle.stop(); } diff --git a/packages/rs-platform-wallet/src/manager/shielded_sync.rs b/packages/rs-platform-wallet/src/manager/shielded_sync.rs index f949b48dd1..1b7a88eaca 100644 --- a/packages/rs-platform-wallet/src/manager/shielded_sync.rs +++ b/packages/rs-platform-wallet/src/manager/shielded_sync.rs @@ -26,6 +26,7 @@ //! [`configure_shielded`]: crate::manager::PlatformWalletManager::configure_shielded use std::collections::BTreeMap; +use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; use dash_async::{AtomicFlagGuard, ThreadRegistry}; @@ -147,6 +148,18 @@ pub struct ShieldedSyncManager { /// half gives Clear / stop a real "no more host-visible mutations" /// barrier that cancel-only [`stop`](Self::stop) does not provide. lifecycle: CoordinatorLifecycle, + /// Internal clear/start exclusion latch. [`clear_shielded`] raises + /// it for the whole quiesce → liveness-check → `coord.clear()` span; + /// [`start`](Self::start) checks it before spawning the loop and + /// bails as a no-op when raised. Without this, the host's + /// serialization is the only guard, but the registry-level + /// `quiescing` gate that Clear holds is reopened by any concurrent + /// `start()` (which calls `lifecycle.reopen_quiescing_gate()`), + /// letting the freshly-spawned loop's first pass land in the store + /// Clear is about to wipe. + /// + /// [`clear_shielded`]: super::PlatformWalletManager::clear_shielded + clearing: Arc, } impl ShieldedSyncManager { @@ -163,9 +176,28 @@ impl ShieldedSyncManager { WalletWorker::ShieldedSync, DEFAULT_SYNC_INTERVAL_SECS, ), + clearing: Arc::new(AtomicBool::new(false)), } } + /// Raise the clear/start exclusion latch and hold it raised until the + /// returned guard drops, so [`start`](Self::start) called concurrently + /// is refused (no-op + warn log). Called by + /// [`clear_shielded`](super::PlatformWalletManager::clear_shielded) + /// across the whole quiesce → liveness-check → `coord.clear()` span. + pub(super) fn hold_clearing_latch(&self) -> AtomicFlagGuard<'_> { + self.clearing.store(true, Ordering::Release); + AtomicFlagGuard::new(&self.clearing) + } + + /// Whether [`clear_shielded`](super::PlatformWalletManager::clear_shielded) + /// is currently in its critical section. Visible to tests that want to + /// assert `start()` was refused. + #[cfg(test)] + pub(super) fn is_clearing(&self) -> bool { + self.clearing.load(Ordering::Acquire) + } + /// Set the polling interval. Clamped to a minimum of 1s. /// /// The running loop picks this up on its next sleep. @@ -201,7 +233,21 @@ impl ShieldedSyncManager { /// the underlying `dash-sdk` shielded-sync future is `!Send` (the /// GRPC client state isn't `Send + Sync`). Same trade-off as /// [`PlatformAddressSyncManager::start`](super::platform_address_sync::PlatformAddressSyncManager::start). + /// + /// Refused (no-op + warn log) while + /// [`clear_shielded`](super::PlatformWalletManager::clear_shielded) + /// holds the clear/start latch; otherwise reopening the registry- + /// level `quiescing` gate here would let this (re)start's first pass + /// write into the store Clear is about to wipe. The host should + /// restart shielded sync after Clear returns. pub fn start(self: Arc) { + if self.clearing.load(Ordering::Acquire) { + tracing::warn!( + "shielded sync start() refused — clear_shielded is in progress; \ + the host should restart shielded sync once clear returns" + ); + return; + } // Reopen the quiescing gate so this (re)start's passes can run. self.lifecycle.reopen_quiescing_gate(); @@ -242,11 +288,20 @@ impl ShieldedSyncManager { /// Stop the background sync loop. No-op if not running. /// /// **Cancel-only**: this requests cancellation and returns - /// immediately. A pass already inside `sync_now` / - /// `coordinator.sync()` keeps running to completion (including its - /// persister-callback fan-out). For a real "nothing is running and - /// nothing more will be persisted" barrier — required by Clear, - /// unregister, and rebind — use [`quiesce`](Self::quiesce). + /// immediately. The signal is delivered, but `stop()` does not wait + /// for any in-flight pass to settle. Because [`start`](Self::start)'s + /// outer `select!` polls the cancel arm `biased` first, an in-flight + /// `sync_now()` / `coordinator.sync()` future may be dropped at its + /// next `.await` (the SDK fetch, the per-note trial-decryption, the + /// tree commit, the persister fan-out), in which case any side + /// effects past the dropped await do not run. It may equally have + /// already completed before the signal arrived. Either way, there is + /// no post-condition that all persistence has finished, and a fresh + /// pass started after this returns is still possible — for a real + /// "nothing is running and nothing more will be persisted" barrier — + /// required by Clear, unregister, and rebind — use + /// [`quiesce`](Self::quiesce) (or the manager-wide + /// [`shutdown`](super::PlatformWalletManager::shutdown)). pub fn stop(&self) { self.lifecycle.stop(); }