Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions app/src/providers/ChatRuntimeProvider.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -899,6 +899,10 @@ const ChatRuntimeProvider = ({ children }: { children: React.ReactNode }) => {
elapsedMs: event.subagent?.elapsed_ms ?? updatedCalls[callIdx].elapsedMs,
outputChars: event.subagent?.output_chars ?? updatedCalls[callIdx].outputChars,
result: event.output ?? updatedCalls[callIdx].result,
// Carry the structured failure so the child row keeps its "why / next"
// copy live instead of losing it until a snapshot reload (#4459). A
// successful result clears any stale failure on the row.
failure: event.success ? undefined : parseToolFailure(event.failure),
Comment thread
senamakel marked this conversation as resolved.
};
const next = [...existing];
next[idx] = { ...entry, subagent: { ...entry.subagent, toolCalls: updatedCalls } };
Expand Down
7 changes: 7 additions & 0 deletions app/src/services/chatService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -427,6 +427,13 @@ export interface ChatSubagentToolResultEvent {
* `subagent.elapsed_ms`.
*/
output?: string;
/**
* Optional structured failure explanation for a FAILED child tool call
* (#4459), present only when `success` is false. Parsed via
* `parseToolFailure` and stored on the child row so the "why / next" copy
* survives live (previously it was dropped until a snapshot reload).
*/
failure?: unknown;
subagent?: SubagentProgressDetail;
}

Expand Down
10 changes: 7 additions & 3 deletions src/openhuman/tinyagents/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1204,9 +1204,13 @@ fn assemble_turn_harness(
// the retained provider handle (the other clone was consumed into the
// primary `ProviderModel`); `build_route_models` clones it per route and
// skips the turn's own model so we don't shadow the default.
for route in
routes::build_route_models(&summary_provider, temperature, model, max_output_tokens)
{
for route in routes::build_route_models(
&summary_provider,
temperature,
model,
max_output_tokens,
&provider_usage_carry,
) {
let routes::RouteModel {
name,
model: route_model,
Expand Down
70 changes: 64 additions & 6 deletions src/openhuman/tinyagents/observability.rs
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,22 @@ pub(crate) struct OpenhumanEventBridge {
/// is recorded exactly once. See [`OpenhumanEventBridge::record_usage`].
recorded_iterations: Mutex<std::collections::HashSet<u32>>,
state: Mutex<BridgeState>,
/// Ordered overflow buffer for progress events that hit backpressure
/// (channel `Full`). Once ANY event spills here, `draining` stays set and
/// every subsequent event queues here too — a single spawned forwarder
/// drains them to the channel in FIFO order — so a later fast-path
/// `try_send` can never jump ahead of an earlier spilled event and scramble
/// start/completed ordering (which would leave a tool row stuck `running`
/// when a `ToolCallCompleted` overtakes its `ToolCallStarted`) (#4466).
overflow: Arc<Mutex<OverflowState>>,
}

/// Backpressure overflow state guarded by a single mutex so the "are we
/// draining?" decision and the queue mutation stay atomic together.
///
/// Both fields are only ever read or written while the owning mutex is held:
/// the sender checks `draining` and appends to `queue` under one lock
/// acquisition, and the spawned forwarder pops from `queue` and clears
/// `draining` under one lock acquisition.
#[derive(Default)]
struct OverflowState {
/// Progress events waiting to be delivered, oldest first. Producers append
/// with `push_back`; the forwarder task removes with `pop_front`, which is
/// what preserves FIFO delivery order.
queue: std::collections::VecDeque<AgentProgress>,
/// `true` from the first event that hits a full channel until the forwarder
/// finds the queue empty (or the receiver is gone and the queue is
/// cleared). While set, every new event is queued instead of being sent
/// directly.
draining: bool,
}

impl OpenhumanEventBridge {
Expand Down Expand Up @@ -221,6 +237,7 @@ impl OpenhumanEventBridge {
usage_carry,
recorded_iterations: Mutex::new(std::collections::HashSet::new()),
state: Mutex::new(BridgeState::default()),
overflow: Arc::default(),
})
}

Expand Down Expand Up @@ -267,17 +284,53 @@ impl OpenhumanEventBridge {
let Some(tx) = &self.on_progress else {
return;
};
// Hold the overflow lock across the ordering decision so "are we
// draining?" and the queue mutation are atomic (try_send is
// non-blocking, so holding a std mutex across it is fine).
let mut ov = self.overflow.lock().unwrap_or_else(|p| p.into_inner());
if ov.draining {
// Already spilling: queue in order; the single forwarder delivers it.
ov.queue.push_back(progress);
return;
}
match tx.try_send(progress) {
Ok(()) => {}
Err(TrySendError::Closed(_)) => {}
Err(TrySendError::Full(progress)) => {
// Backpressure, not capacity loss: hand the delta to an awaited
// `send()` on a spawned task rather than dropping it. Guard on a
// live runtime so a non-async construction path can't panic.
// Backpressure, not capacity loss. Enter ordered-drain mode:
// queue this event and spawn ONE forwarder that awaits `send()`
// per event in FIFO order. `draining` stays set (so every later
// event also queues here) until the buffer fully drains — that is
// what stops a later `try_send` from overtaking a spilled earlier
// event and scrambling start/completed ordering.
if let Ok(handle) = tokio::runtime::Handle::try_current() {
ov.queue.push_back(progress);
ov.draining = true;
let overflow = Arc::clone(&self.overflow);
let tx = tx.clone();
drop(ov);
handle.spawn(async move {
let _ = tx.send(progress).await;
loop {
let next = {
let mut ov =
overflow.lock().unwrap_or_else(|p| p.into_inner());
match ov.queue.pop_front() {
Some(item) => item,
None => {
ov.draining = false;
break;
}
}
};
if tx.send(next).await.is_err() {
// Receiver gone: stop draining, discard the rest.
let mut ov =
overflow.lock().unwrap_or_else(|p| p.into_inner());
ov.queue.clear();
ov.draining = false;
break;
}
}
});
} else {
tracing::debug!(
Expand Down Expand Up @@ -445,8 +498,13 @@ impl OpenhumanEventBridge {
input_tokens: usage.input_tokens,
output_tokens: usage.output_tokens,
cached_input_tokens: usage.cache_read_tokens,
cache_creation_tokens: usage.cache_creation_tokens,
reasoning_tokens: usage.reasoning_tokens,
// Use the carried-or-crate breakdown (computed above): for
// providers where cache-creation/reasoning tokens ride only on
// the carried UsageInfo, the crate `usage.*` fields are 0, which
// would leave per-call progress/Langfuse telemetry at zero even
// though the cost tracker got the right values (#4467).
cache_creation_tokens,
reasoning_tokens,
cost_usd: call_cost,
});
self.send(AgentProgress::TurnCostUpdated {
Expand Down
8 changes: 7 additions & 1 deletion src/openhuman/tinyagents/routes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,11 @@ pub(super) fn build_route_models(
temperature: f64,
skip_model: &str,
max_output_tokens: Option<u32>,
// Shared provider-usage carry (#4467): fallback route models must drain the
// SAME carry the bridge reads, or a successful fallback call never feeds its
// backend-charged USD / context-window / cache-creation-reasoning breakdown
// to the cost surfaces (it would fall back to a catalogue estimate).
usage_carry: &super::observability::ProviderUsageCarry,
) -> Vec<RouteModel> {
let mut out = Vec::new();
for &tier in WORKLOAD_ROUTE_TIERS {
Expand All @@ -108,7 +113,8 @@ pub(super) fn build_route_models(
}
let mut model = ProviderModel::new(provider.clone(), tier, temperature)
.with_vision(vision)
.with_reasoning(reasoning);
.with_reasoning(reasoning)
.with_usage_carry(usage_carry.clone());
if let Some(cap) = max_output_tokens {
model = model.with_max_tokens(cap);
}
Expand Down
36 changes: 35 additions & 1 deletion src/openhuman/tools/impl/filesystem/update_memory_md.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,36 @@ fn workspace_write_lock(workspace_dir: &Path) -> Arc<tokio::sync::Mutex<()>> {
)
}

/// Acquire an **inter-process** advisory write lock for `workspace_dir` (#4458).
///
/// [`WORKSPACE_WRITE_LOCKS`] only serializes writers inside this OS process, but
/// cron launches work via separate `tokio::process::Command` subprocesses that
/// don't share that mutex — so two cron runs could still clobber the same
/// `MEMORY.md` mid read-modify-write. This takes an `fs2` exclusive `flock` on a
/// sentinel `.memory-write.lock` file in the workspace; the returned `File`
/// holds the lock until it is dropped (end of the write). `flock` acquisition
/// blocks, so it runs on a blocking thread.
async fn acquire_cross_process_write_lock(workspace_dir: &Path) -> anyhow::Result<std::fs::File> {
let lock_path = workspace_dir.join(".memory-write.lock");
tokio::task::spawn_blocking(move || {
use fs2::FileExt;
if let Some(parent) = lock_path.parent() {
let _ = std::fs::create_dir_all(parent);
}
let file = std::fs::OpenOptions::new()
.create(true)
.write(true)
.truncate(false)
.open(&lock_path)
Comment on lines +64 to +68

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Reject symlinked workspace lock files

If a workspace already contains `.memory-write.lock` as a symlink (including a dangling symlink), this new `OpenOptions` call follows it with `create(true).write(true)` before any containment check. That lets a project-controlled path make `update_memory_md` create/open/lock a file outside the workspace, bypassing the symlink hardening applied to `MEMORY.md`/`SKILL.md`; the lock file should be opened with no-follow semantics or prevalidated as a regular file inside the canonical workspace.

Useful? React with 👍 / 👎.

.map_err(|e| anyhow::anyhow!("open workspace lock file {lock_path:?}: {e}"))?;
file.lock_exclusive()
.map_err(|e| anyhow::anyhow!("acquire workspace write flock: {e}"))?;
Ok::<std::fs::File, anyhow::Error>(file)
})
.await
.map_err(|e| anyhow::anyhow!("workspace lock task join failed: {e}"))?
}

/// Atomically replace `path`'s contents with `content`.
///
/// Writes to a sibling temp file in the same directory (so the rename stays on
Expand Down Expand Up @@ -218,9 +248,13 @@ impl Tool for UpdateMemoryMdTool {
// write so no interleaving append can be lost.
let lock = workspace_write_lock(&workspace_dir);
let _guard = lock.lock().await;
// Also take a cross-process advisory lock so cron subprocesses (which
// don't share the in-process mutex above) can't clobber the same file
// mid-RMW. Held across read + atomic write; released on drop.
let _file_lock = acquire_cross_process_write_lock(&workspace_dir).await?;
tracing::debug!(
workspace = %workspace_dir.display(),
"[update_memory_md] acquired per-workspace write lock"
"[update_memory_md] acquired per-workspace write lock (in-process + cross-process flock)"
);

match action {
Expand Down
Loading