Skip to content
Open
Show file tree
Hide file tree
Changes from 4 commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
f3354f6
feat(platform-wallet)!: shutdown() joins coordinator threads and retu…
lklimek Jun 22, 2026
261178e
fix(platform-wallet): RAII-guard is_syncing so a coordinator panic ca…
lklimek Jun 22, 2026
42d734d
refactor(rs-dash-async): add AtomicFlagGuard RAII helper
lklimek Jun 23, 2026
6e78b77
fix(platform-wallet): refine CoordinatorThreadStatus variants + tight…
lklimek Jun 23, 2026
5f80450
test(rs-dash-async): assert AtomicFlagGuard contract + add #[must_use]
lklimek Jun 23, 2026
6b2cd39
fix(platform-wallet): make coordinator passes cancellable + converge …
lklimek Jun 23, 2026
13a22dd
fix(platform-wallet): bound clear_shielded + tidy shutdown docs/logging
lklimek Jun 23, 2026
93b8954
fix(platform-wallet-ffi): timeout-bound the shielded sync stop bridge
lklimek Jun 23, 2026
747f5f0
Merge branch 'v3.1-dev' into feat/platform-wallet-shutdown-join
lklimek Jun 23, 2026
2bd9501
fix(platform-wallet)!: close residual coordinator-thread UAF on shutdown
lklimek Jun 23, 2026
7c975ed
fix(platform-wallet)!: surface non-clean shielded drain on clear/stop
lklimek Jun 23, 2026
5f63c95
fix(platform-wallet): reap prior coordinator thread outside backgroun…
lklimek Jun 23, 2026
2b068ba
fix(platform-wallet): close shielded epilogue TOCTOU + pin restart reap
lklimek Jun 24, 2026
5017ba1
fix(swift-sdk): retain wallet callback context on incomplete shutdown
lklimek Jun 24, 2026
b491773
test(platform-wallet): bound cleanup quiesce in restart-reap regressi…
lklimek Jun 24, 2026
76c8bee
fix(platform-wallet): track detached coordinator threads so shutdown(…
lklimek Jun 24, 2026
3cca1cf
perf(platform-wallet): drain coordinators concurrently in shutdown() …
lklimek Jun 24, 2026
8c52811
feat(dash-async): add shared ThreadRegistry worker-lifecycle engine
lklimek Jun 24, 2026
ac9a51a
feat(dash-async): key-scope parked orphans for any_alive_for()
lklimek Jun 24, 2026
d20aed0
refactor(platform-wallet): migrate sync coordinators onto shared Thre…
lklimek Jun 24, 2026
d190f29
test(dash-async): anchor DrainHook compile_fail doctest to E0277 + no…
lklimek Jun 24, 2026
3e81fc1
fix(dash-async,platform-wallet): harden ThreadRegistry lifecycle + do…
lklimek Jun 24, 2026
911f99f
refactor(platform-wallet): extract CoordinatorLifecycle to dedup the …
lklimek Jun 24, 2026
22647a7
fix(platform-wallet): raise quiescing gate in CoordinatorLifecycle::q…
lklimek Jun 25, 2026
7f3aeb5
fix(dash-async): park a restarted worker's prior under the slot lock …
lklimek Jun 25, 2026
41791c0
fix(platform-wallet-ffi): gate shielded_sync_stop success on orphan l…
lklimek Jun 25, 2026
4b099a9
fix(platform-wallet): bound clear_shielded's drain and hold its quies…
lklimek Jun 25, 2026
7be68c5
refactor(dash-async): full spawn-failure rollback + drop stale doc hi…
lklimek Jun 25, 2026
3821389
docs(swift-sdk): broaden deinit comment for shielded_sync_stop's orph…
lklimek Jun 25, 2026
748c4f8
fix(platform-wallet): make the quiescing<->is_syncing handshake self-…
lklimek Jun 25, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

22 changes: 22 additions & 0 deletions packages/rs-dash-async/src/atomic.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
use std::sync::atomic::{AtomicBool, Ordering};

/// RAII guard that clears an [`AtomicBool`] flag to `false` on drop.
///
/// Callers set the flag to `true` before constructing the guard (typically
/// via a `compare_exchange`); the guard resets it on every exit path,
/// including panics, so a panicked holder can never leave the flag wedged.
pub struct AtomicFlagGuard<'a>(&'a AtomicBool);
Comment thread
Claudius-Maginificent marked this conversation as resolved.

impl<'a> AtomicFlagGuard<'a> {
/// Wrap `flag`. Does **not** set it to `true` — the caller is
/// responsible for doing that before constructing the guard.
pub fn new(flag: &'a AtomicBool) -> Self {
Comment thread
Claudius-Maginificent marked this conversation as resolved.
Self(flag)
}
}

impl Drop for AtomicFlagGuard<'_> {
fn drop(&mut self) {
self.0.store(false, Ordering::Release);
Comment thread
Claudius-Maginificent marked this conversation as resolved.
}
}
4 changes: 4 additions & 0 deletions packages/rs-dash-async/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,11 @@
//!
//! Provides [`block_on`] -- a function that bridges async futures into sync code,
//! handling multiple tokio runtime flavors (no runtime, current-thread, multi-thread).
//!
//! Also provides [`AtomicFlagGuard`] — a RAII guard for panic-safe `AtomicBool` flag resets.

mod atomic;
mod block_on;

pub use atomic::AtomicFlagGuard;
pub use block_on::{block_on, AsyncError};
13 changes: 12 additions & 1 deletion packages/rs-platform-wallet-ffi/src/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -360,7 +360,18 @@ pub unsafe extern "C" fn platform_wallet_manager_destroy(
// left alive to fire a callback against freed memory.
// `shutdown()` is idempotent, so this is safe even if the host
// already stopped some sync managers before calling destroy.
runtime().block_on(manager.shutdown());
// It now joins the coordinator OS threads and returns their
// per-thread exit status; the C ABI exposes none of that, so we
// just log it (a panicked loop is worth surfacing) and drop it.
let status = runtime().block_on(manager.shutdown());
if !status.all_clean() {
tracing::warn!(
?status,
"platform wallet coordinator(s) did not exit cleanly"
);
} else {
tracing::debug!(?status, "platform wallet coordinators joined cleanly");
}
}
PlatformWalletFFIResult::ok()
Comment thread
Claudius-Maginificent marked this conversation as resolved.
}
Expand Down
1 change: 1 addition & 0 deletions packages/rs-platform-wallet/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ bimap = "0.6"
# Async runtime
tokio = { version = "1", features = ["sync", "rt", "time", "macros"] }
tokio-util = { version = "0.7.12" }
dash-async = { path = "../rs-dash-async" }

# Logging
tracing = "0.1"
Expand Down
53 changes: 41 additions & 12 deletions packages/rs-platform-wallet/src/manager/identity_sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ use std::sync::{
atomic::{AtomicBool, AtomicU64, Ordering},
Arc, Mutex as StdMutex,
};

use dash_async::AtomicFlagGuard;
use std::time::{Duration, SystemTime, UNIX_EPOCH};

use dpp::balances::credits::TokenAmount;
Expand Down Expand Up @@ -160,6 +162,11 @@ where
persister: Arc<P>,
/// Cancel token for the background loop, if running.
background_cancel: StdMutex<Option<CancellationToken>>,
/// Join handle for the background loop's OS thread, if running.
/// Taken and joined by [`quiesce`](Self::quiesce) so shutdown can
/// confirm the `!Send` loop fully exited before the host drops the
/// runtime.
background_join: StdMutex<Option<std::thread::JoinHandle<()>>>,
/// Monotonically increasing generation counter. Incremented each
/// time `start()` installs a new cancel token so the exiting
/// thread can tell whether its token is still current.
Expand Down Expand Up @@ -204,6 +211,7 @@ where
sdk,
persister,
background_cancel: StdMutex::new(None),
background_join: StdMutex::new(None),
background_generation: AtomicU64::new(0),
interval_secs: AtomicU64::new(DEFAULT_SYNC_INTERVAL_SECS),
is_syncing: AtomicBool::new(false),
Expand Down Expand Up @@ -395,18 +403,17 @@ where
/// The first pass runs immediately; subsequent passes fire every
/// [`interval`](Self::interval).
pub fn start(self: Arc<Self>) {
let mut guard = self.background_cancel.lock().expect("bg_cancel poisoned");
if guard.is_some() {
let mut cancel_guard = self.background_cancel.lock().expect("bg_cancel poisoned");
if cancel_guard.is_some() {
return;
}
let cancel = CancellationToken::new();
*guard = Some(cancel.clone());
*cancel_guard = Some(cancel.clone());
let my_gen = self.background_generation.fetch_add(1, Ordering::AcqRel) + 1;
drop(guard);

let handle = tokio::runtime::Handle::current();
let this = self;
std::thread::Builder::new()
let this = Arc::clone(&self);
let join = std::thread::Builder::new()
.name("identity-sync".into())
.spawn(move || {
handle.block_on(async move {
Expand Down Expand Up @@ -434,6 +441,11 @@ where
});
})
.expect("failed to spawn identity-sync thread");
// Store the join handle while still holding cancel_guard — a
// concurrent quiesce() must wait for this lock before calling
// stop(), so the handle is always stored before it can be taken.
*self.background_join.lock().expect("bg_join poisoned") = Some(join);
// cancel_guard drops here, releasing background_cancel.
}
Comment thread
Claudius-Maginificent marked this conversation as resolved.

/// Stop the background sync loop. No-op if not running.
Expand Down Expand Up @@ -473,13 +485,25 @@ where
/// so its falling edge (with the gate up) is a sound "fully drained"
/// signal. The gate is reopened before returning so a later
/// start/sync works normally.
pub async fn quiesce(&self) {
///
/// Finally **joins** the loop's OS thread (after the drain, so the
/// thread is on its way out) and returns its terminal status. Joining
/// while the runtime is still alive is what lets the manager promise
/// the `!Send` loop has stopped touching `tokio::time` before a
/// one-shot host drops the runtime.
pub async fn quiesce(&self) -> super::CoordinatorThreadStatus {
self.quiescing.store(true, Ordering::Release);
self.stop();
while self.is_syncing.load(Ordering::Acquire) {
tokio::time::sleep(Duration::from_millis(20)).await;
}
self.quiescing.store(false, Ordering::Release);
let handle = self
.background_join
.lock()
.expect("bg_join poisoned")
.take();
super::join_coordinator_thread(handle).await
}

/// Run one sync pass across every registered identity.
Expand All @@ -501,12 +525,17 @@ where
return;
}

// RAII guard: clears `is_syncing` on every exit path, including
// panics. Without this a panic inside the pass would leave
// `is_syncing=true` forever and wedge `quiesce()`'s drain loop.
let _is_syncing_guard = AtomicFlagGuard::new(&self.is_syncing);

// A `quiesce()` may have raised the gate between our CAS and
// here; if so, release the slot and bail without running a pass
// so the drain can complete and shutdown gets a true barrier
// (no further `persister.store(...)` after quiesce returns).
// here; if so, bail without running a pass so the drain can
// complete and shutdown gets a true barrier (no further
// `persister.store(...)` after quiesce returns).
// Guard clears `is_syncing` on return.
if self.quiescing.load(Ordering::Acquire) {
self.is_syncing.store(false, Ordering::Release);
return;
}

Expand All @@ -532,7 +561,7 @@ where
.map(|d| d.as_secs())
.unwrap_or(0);
self.last_sync_unix.store(now, Ordering::Release);
self.is_syncing.store(false, Ordering::Release);
// `_is_syncing_guard` drops here → `is_syncing = false`
}

/// Sync a single identity's watched tokens against Platform.
Expand Down
Loading
Loading