Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions skip_test_cases.txt
Original file line number Diff line number Diff line change
Expand Up @@ -11,5 +11,6 @@ dylink_tests/deterministic/double_fork_dlopen.c
dylink_tests/deterministic/fork_dlopen.c
dylink_tests/deterministic/rdynamic_main.c
dylink_tests/deterministic/rdynamic_lib.c
dylink_tests/deterministic/thread_dlopen_concurrent.c
process_tests/deterministic/fork_max_cages.c
ci/deterministic/ci_intentional_failure_tmp.c
93 changes: 93 additions & 0 deletions src/cage/src/signal/signal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use sysdefs::logging::lind_debug_panic;
const EPOCH_NORMAL: u64 = 0;
const EPOCH_SIGNAL: u64 = 0xc0ffee;
const EPOCH_KILLED: u64 = 0xdead;
pub const EPOCH_DLOPEN: u64 = 0xd10ad;

// switch the epoch of the main thread of the cage to "signal" state
// thread safety: this function could possibly be invoked by multiple threads of the same cage
Expand Down Expand Up @@ -139,6 +140,98 @@ pub fn epoch_kill_all(cageid: u64, caller_tid: i32) {
}
}

// trigger EPOCH_DLOPEN on all other threads of the cage (fire-and-forget)
// Only writes EPOCH_DLOPEN if the current epoch is EPOCH_NORMAL — if already
// non-NORMAL, the callback will fire anyway and handle all pending work.
// Also sends SIGUSR2 to interrupt threads blocked in host syscalls.
pub fn epoch_dlopen_trigger_others(cageid: u64, caller_tid: i32) {
#[cfg(feature = "disable_signals")]
return;

#[cfg(not(feature = "disable_signals"))]
{
let cage = match get_cage(cageid) {
Some(c) => c,
None => return,
};

for entry in cage.epoch_handler.iter() {
if entry.key() == &caller_tid {
continue;
}
let epoch_handler = entry.value();
let epoch = epoch_handler.load(Ordering::Acquire);
if epoch.is_null() {
continue;
}
// SAFETY: see comment at `signal_epoch_trigger`
unsafe {
// Only overwrite EPOCH_NORMAL. If already EPOCH_SIGNAL or EPOCH_KILLED,
// the callback will fire and handle all pending work anyway.
if *epoch == EPOCH_NORMAL {
*epoch = EPOCH_DLOPEN;
}
}
}

// Best-effort: interrupt threads blocked in host syscalls so they
// replay the dlopen promptly.
let my_tid = unsafe { libc::syscall(libc::SYS_gettid) };
for entry in cage.os_tid_map.iter() {
let os_tid = *entry.value();
if os_tid != my_tid {
unsafe {
libc::syscall(libc::SYS_tkill, os_tid as i32, libc::SIGUSR2);
}
}
}
}
}

// reset the epoch of a specific thread back to EPOCH_NORMAL
// Used after dlopen replay completes on a non-main thread.
pub fn epoch_thread_reset(cageid: u64, thread_id: i32) {
#[cfg(feature = "disable_signals")]
return;

#[cfg(not(feature = "disable_signals"))]
{
let cage = match get_cage(cageid) {
Some(c) => c,
None => return,
};

let epoch_handler = match cage.epoch_handler.get(&thread_id) {
Some(h) => h,
None => return,
};
let epoch = epoch_handler.load(Ordering::Acquire);
if epoch.is_null() {
return;
}
// SAFETY: see comment at `signal_epoch_trigger`
unsafe {
*epoch = EPOCH_NORMAL;
}
}
}

// returns true if the cage has more than one thread registered
// Used to skip the cross-thread sync path in single-threaded cages.
pub fn has_other_threads(cageid: u64, caller_tid: i32) -> bool {
#[cfg(feature = "disable_signals")]
return false;

#[cfg(not(feature = "disable_signals"))]
{
let cage = match get_cage(cageid) {
Some(c) => c,
None => return false,
};
cage.epoch_handler.len() > 1 && cage.epoch_handler.contains_key(&caller_tid)
}
}

// get the current epoch state of the thread
// thread safety: this function will only be invoked by main thread of the cage
fn get_epoch_state(cageid: u64, thread_id: u64) -> u64 {
Expand Down
38 changes: 27 additions & 11 deletions src/lind-boot/src/lind_wasmtime/execute.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@ use crate::lind_wasmtime::host::{
};
use crate::{cli::CliOptions, lind_wasmtime::host::HostCtx, lind_wasmtime::trampoline::*};
use anyhow::{Context, Result, anyhow, bail};
use cage::signal::{lind_signal_init, signal_may_trigger};
use cage::signal::{
epoch_dlopen_trigger_others, has_other_threads, lind_signal_init, signal_may_trigger,
};
use cfg_if::cfg_if;
use std::os::unix::fs::MetadataExt;
use std::path::Path;
Expand Down Expand Up @@ -491,10 +493,6 @@ fn attach_api(
let linker = lind_ctx.linker.clone().unwrap();
let got_table = lind_ctx.got_table.clone().unwrap();

if lind_ctx.had_threads() {
lind_debug_panic("dlopen within threads is currently not supported!");
}

load_library_module(caller, linker, got_table, cageid, library_name, mode)
});
Some(dynamic_loader)
Expand Down Expand Up @@ -832,7 +830,7 @@ fn load_library_module(
// the library's function references can be relocated correctly.
//
// The GOT is used to patch symbol addresses/indices after instantiation.
let ret = match linker.module_with_caller(
let (ret, memory_base, symbol_map_clone) = match linker.module_with_caller(
&mut main_module,
cageid as u64,
library_name,
Expand All @@ -842,17 +840,35 @@ fn load_library_module(
symbol_map,
library_name.to_string(),
) {
Ok(handle) => handle as i32,
Err(e) => {
Ok((handle, memory_base, symbol_map_clone)) => {
(handle as i32, memory_base, symbol_map_clone)
}
Err(_) => {
#[cfg(feature = "debug-dylink")]
println!("failed to process library `{}`: {:?}", library_name, e);
-(DylinkErrorCode::EINTERNAL as i32) // consider as internal error for now
println!("failed to process library `{}`", library_name);
return -(DylinkErrorCode::EINTERNAL as i32);
}
};

// Release the GOT lock before notifying workers. Worker threads that receive
// EPOCH_DLOPEN will call handle_dlopen_replay, which also acquires got_arc.
// Dropping got_guard here ensures they can proceed without contention.
drop(got_guard);

let caller_tid = main_module.data().lind_fork_ctx.as_ref().unwrap().tid;
let lind_ctx = main_module.data_mut().lind_fork_ctx.as_mut().unwrap();
lind_ctx.attach_linker(linker);
lind_ctx.append_module(library_name.to_string(), lib_module);
lind_ctx.append_module(
library_name.to_string(),
lib_module,
memory_base,
symbol_map_clone,
);

// Fire-and-forget: notify other threads in this cage to replay the new library.
if has_other_threads(cageid as u64, caller_tid) {
epoch_dlopen_trigger_others(cageid as u64, caller_tid);
}

ret
}
Expand Down
103 changes: 88 additions & 15 deletions src/wasmtime/crates/lind-multi-process/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use sysdefs::logging::lind_debug_panic;
use sysdefs::{constants::sys_const, data::sys_struct};
use threei::{threei::make_syscall, threei_const};
use wasmtime_lind_3i::*;
use wasmtime_lind_utils::{LindCageManager, LindGOT};
use wasmtime_lind_utils::{LindCageManager, LindGOT, symbol_table::SymbolMap};

use std::ffi::CStr;
use std::os::raw::c_char;
Expand Down Expand Up @@ -73,7 +73,16 @@ pub struct LindCtx<T, U> {
// the module associated with the ctx
modules: Vec<(String, String, Module)>,

dlopen_modules: Vec<(String, String, Module)>,
// Dynamically loaded modules from dlopen(), shared across threads of the same cage.
// Per-process (fork): child gets its own Arc with a snapshot of the parent's list.
// Per-thread (pthread_create): all threads share the same Arc so that cross-thread
// dlopen visibility works via the epoch-based replay mechanism.
// Tuple: (module_name, path, module, memory_base, symbol_map)
dlopen_modules: Arc<Mutex<Vec<(String, String, Module, i32, SymbolMap)>>>,

// Per-thread cursor into dlopen_modules. Entries from this index onward have not
// yet been replayed into this thread's store.
dlopen_replay_index: usize,

// cage id
pub cageid: i32,
Expand Down Expand Up @@ -191,7 +200,8 @@ impl<T: Clone + Send + 'static + std::marker::Sync, U: Clone + Send + 'static +
linker: Some(linker),
got_table,
modules: modules.clone(),
dlopen_modules: vec![],
dlopen_modules: Arc::new(Mutex::new(vec![])),
dlopen_replay_index: 0,
cageid,
tid,
next_threadid,
Expand Down Expand Up @@ -219,12 +229,34 @@ impl<T: Clone + Send + 'static + std::marker::Sync, U: Clone + Send + 'static +
}
}

// Record a dynamically loaded module (from dlopen) into this cage's module
// list. During fork() and pthread_create(), every entry in dlopen_modules
// is re-instantiated into the child/thread store so that the child inherits
// access to all libraries the parent opened at runtime.
pub fn append_module(&mut self, path: String, module: Module) {
self.dlopen_modules.push(("env".to_string(), path, module));
// Record a dynamically loaded module (from dlopen) into this cage's shared module list.
// During fork() and pthread_create(), entries are replayed into child/thread stores.
pub fn append_module(
&mut self,
path: String,
module: Module,
memory_base: i32,
symbol_map: SymbolMap,
) {
let mut list = self.dlopen_modules.lock().unwrap();
list.push(("env".to_string(), path, module, memory_base, symbol_map));
}

// Returns true if this thread has pending dlopen entries to replay.
pub fn has_pending_dlopen_replay(&self) -> bool {
let list = self.dlopen_modules.lock().unwrap();
self.dlopen_replay_index < list.len()
}

// Snapshot the entries that have not yet been replayed into this thread's store.
pub fn pending_dlopen_entries(&self) -> Vec<(String, String, Module, i32, SymbolMap)> {
let list = self.dlopen_modules.lock().unwrap();
list[self.dlopen_replay_index..].to_vec()
}

// Advance the per-thread replay cursor after successful replay.
pub fn advance_dlopen_replay(&mut self, count: usize) {
self.dlopen_replay_index += count;
}

// The way multi-processing works depends on Asyncify from Binaryen. Asyncify marks the process into 3 states:
Expand Down Expand Up @@ -431,7 +463,10 @@ impl<T: Clone + Send + 'static + std::marker::Sync, U: Clone + Send + 'static +
let lind_manager = child_ctx.lind_manager.clone();
let module = main_module.clone();
let modules = child_ctx.modules.clone();
let dlopen_modules = child_ctx.dlopen_modules.clone();
let dlopen_modules = {
let list = child_ctx.dlopen_modules.lock().unwrap();
list.clone()
};

let mut child_got = if dylink_enabled {
let got = child_ctx.got_table.as_ref().unwrap();
Expand Down Expand Up @@ -598,7 +633,9 @@ impl<T: Clone + Send + 'static + std::marker::Sync, U: Clone + Send + 'static +
.instance_dylink(&mut store, "env", instance, vec!["signal_callback"])
.unwrap();

for (name, _path, module) in dlopen_modules.iter() {
for (name, _path, module, _memory_base, _symbol_map) in
dlopen_modules.iter()
{
// Read dylink metadata for this dlopen'd module.
// This contains the module's declared table/memory requirements.
let dylink_info = module.dylink_meminfo();
Expand Down Expand Up @@ -940,6 +977,19 @@ impl<T: Clone + Send + 'static + std::marker::Sync, U: Clone + Send + 'static +

let global_snapshots = caller.as_context_mut().get_global_snapshot();

// Snapshot dlopen_modules at the same point as global_snapshots so that
// memory_base_table (derived from global_snapshots) and dlopen_modules are
// consistent. Reading dlopen_modules inside the spawned thread closure is
// racy: the main thread can call dlopen between pthread_create_call and when
// the Rust thread actually runs, causing memory_base_table to miss the new
// library's entry.
let dlopen_modules_snapshot = {
let mut parent_host = caller.data_mut();
let parent_ctx = get_cx(&mut parent_host);
let list = parent_ctx.dlopen_modules.lock().unwrap();
list.clone()
};

// mark the start of unwind
let _res =
asyncify_start_unwind_func.call(&mut caller, parent_unwind_data_start_usr as i32);
Expand Down Expand Up @@ -1023,7 +1073,9 @@ impl<T: Clone + Send + 'static + std::marker::Sync, U: Clone + Send + 'static +

let module = main_module.clone();
let modules = child_ctx.modules.clone();
let dlopen_modules = child_ctx.dlopen_modules.clone();
// Use the snapshot taken before the asyncify unwind so that
// memory_base_table and dlopen_modules are consistent.
let dlopen_modules = dlopen_modules_snapshot;

let mut store = Store::new_with_inner(&engine, child_host, store_inner)
.expect("failed to create store");
Expand Down Expand Up @@ -1170,7 +1222,9 @@ impl<T: Clone + Send + 'static + std::marker::Sync, U: Clone + Send + 'static +
.instance_dylink(&mut store, "env", instance, vec!["signal_callback"])
.unwrap();

for (name, _path, module) in dlopen_modules.iter() {
for (name, _path, module, _memory_base, _symbol_map) in
dlopen_modules.iter()
{
let dylink_info = module.dylink_meminfo();
let dylink_info = dylink_info.as_ref().unwrap();
let table_start = child_table.size(&mut store) as i32;
Expand Down Expand Up @@ -1835,11 +1889,20 @@ impl<T: Clone + Send + 'static + std::marker::Sync, U: Clone + Send + 'static +
None
};

// Child process gets its own independent copy of the dlopen list.
// replay_index starts at 0 because the child re-instantiates all libraries
// from scratch via the normal fork_call path.
let dlopen_snapshot = {
let list = self.dlopen_modules.lock().unwrap();
list.clone()
};

let forked_ctx = Self {
linker: None, // Linker is explicitly set up by the caller
got_table: cloned_got, // use GOT with cloned cache, GOT entries will be constructed later
modules: self.modules.clone(),
dlopen_modules: self.dlopen_modules.clone(),
dlopen_modules: Arc::new(Mutex::new(dlopen_snapshot)),
dlopen_replay_index: 0,
cageid: 0, // cageid is managed by lind-common
tid: 1, // thread id starts from 1
next_threadid: Arc::new(AtomicU32::new(1)), // thread id starts from 1
Expand All @@ -1864,11 +1927,21 @@ impl<T: Clone + Send + 'static + std::marker::Sync, U: Clone + Send + 'static +
None
};

// New thread shares the same dlopen_modules Arc as the parent so it sees
// libraries opened after thread creation. replay_index starts at current
// length because the thread instantiation loop (pthread_create_call) will
// replay all existing entries; only future entries need epoch-based replay.
let current_len = {
let list = self.dlopen_modules.lock().unwrap();
list.len()
};

let forked_ctx = Self {
linker: None, // Linker is explicitly set up by the caller
got_table: cloned_got, // use GOT with cloned cache, GOT entries will be constructed later
modules: self.modules.clone(),
dlopen_modules: self.dlopen_modules.clone(),
dlopen_modules: self.dlopen_modules.clone(), // shared Arc
dlopen_replay_index: current_len, // already caught up via pthread_create_call
cageid: self.cageid,
tid: self.tid,
next_threadid: self.next_threadid.clone(),
Expand Down
Loading