Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions app/src/providers/ChatRuntimeProvider.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -899,6 +899,10 @@ const ChatRuntimeProvider = ({ children }: { children: React.ReactNode }) => {
elapsedMs: event.subagent?.elapsed_ms ?? updatedCalls[callIdx].elapsedMs,
outputChars: event.subagent?.output_chars ?? updatedCalls[callIdx].outputChars,
result: event.output ?? updatedCalls[callIdx].result,
// Carry the structured failure so the child row keeps its "why / next"
// copy live instead of losing it until a snapshot reload (#4459). A
// successful result clears any stale failure on the row.
failure: event.success ? undefined : parseToolFailure(event.failure),
Comment thread
senamakel marked this conversation as resolved.
};
const next = [...existing];
next[idx] = { ...entry, subagent: { ...entry.subagent, toolCalls: updatedCalls } };
Expand All @@ -912,6 +916,7 @@ const ChatRuntimeProvider = ({ children }: { children: React.ReactNode }) => {
elapsedMs: event.subagent?.elapsed_ms,
outputChars: event.subagent?.output_chars,
result: event.output,
failure: event.success ? undefined : parseToolFailure(event.failure),
})
);
},
Expand Down
68 changes: 68 additions & 0 deletions app/src/providers/__tests__/ChatRuntimeProvider.test.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -1205,6 +1205,74 @@ describe('ChatRuntimeProvider — dedupe, proactive resolution, mid-turn invaria
});
});

it('stores the structured failure on a failed subagent tool call (#4459)', () => {
  const handlers = renderProvider();
  const tid = 'tsa-fail';

  // Fields shared by the child tool-call event and its matching result event.
  const childCall = {
    thread_id: tid,
    request_id: 'r1',
    round: 1,
    tool_name: 'shell',
    skill_id: 'sub-1',
    tool_call_id: 'cc-1',
  };
  const childIdentity = { agent_id: 'researcher', task_id: 'sub-1', child_iteration: 1 };

  // Structured failure payload delivered with the failed result.
  const declined = {
    class: 'denied',
    category: 'user_declined',
    cause_plain: 'You declined this action.',
    next_action: 'Ask again if you change your mind.',
    recoverable: false,
  };

  // Phase 1: spawn the sub-agent row, then start one child tool call on it.
  act(() => {
    handlers.onSubagentSpawned?.({
      thread_id: tid,
      request_id: 'r1',
      tool_name: 'researcher',
      skill_id: 'sub-1',
      message: 'spawned',
      round: 1,
      subagent: { mode: 'typed', dedicated_thread: false, prompt_chars: 42 },
    });
    handlers.onSubagentToolCall?.({ ...childCall, subagent: { ...childIdentity } });
  });

  // Phase 2: the child call comes back as a failure carrying the payload.
  act(() => {
    handlers.onSubagentToolResult?.({
      ...childCall,
      success: false,
      failure: declined,
      subagent: { ...childIdentity, elapsed_ms: 5 },
    });
  });

  const [row] = store.getState().chatRuntime.toolTimelineByThread[tid] ?? [];

  // Fallback `toolCalls` list: the row is marked as errored...
  const failedCall = row?.subagent?.toolCalls[0];
  expect(failedCall).toMatchObject({ callId: 'cc-1', status: 'error' });
  // ...and the parsed failure is available immediately, without waiting for
  // a snapshot reload (#4459).
  expect(failedCall?.failure).toMatchObject({
    class: 'denied',
    category: 'user_declined',
    recoverable: false,
    causePlain: 'You declined this action.',
    nextAction: 'Ask again if you change your mind.',
  });

  // The live UI renders from `subagent.transcript`, so the matching transcript
  // tool item has to carry the failure as well.
  const transcriptEntry = row?.subagent?.transcript?.find(
    i => i.kind === 'tool' && i.callId === 'cc-1'
  );
  expect(transcriptEntry).toMatchObject({
    status: 'error',
    failure: { class: 'denied', category: 'user_declined' },
  });
});

it('ignores subagent_tool_call events that arrive before subagent_spawned', () => {
const listeners = renderProvider();
const threadId = 'tsa-orphan';
Expand Down
7 changes: 7 additions & 0 deletions app/src/services/chatService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -427,6 +427,13 @@ export interface ChatSubagentToolResultEvent {
* `subagent.elapsed_ms`.
*/
output?: string;
/**
* Optional structured failure explanation for a FAILED child tool call
* (#4459), present only when `success` is false. Parsed via
* `parseToolFailure` and stored on the child row so the "why / next" copy
* survives live (previously it was dropped until a snapshot reload).
*/
failure?: unknown;
subagent?: SubagentProgressDetail;
}

Expand Down
12 changes: 11 additions & 1 deletion app/src/store/chatRuntimeSlice.ts
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,10 @@ export type SubagentTranscriptItem =
displayName?: string;
/** Server-computed contextual detail (path / recipient / query). */
detail?: string;
/** Plain-language failure explanation for a FAILED child tool call
* (#4459) — kept on the transcript item so the rendered live path (not
* just the fallback `toolCalls` list) shows the why/next copy. */
failure?: ToolFailureExplanation;
};

/** One child tool call performed by a running sub-agent. */
Expand Down Expand Up @@ -730,6 +734,7 @@ function subagentTranscriptItemFromPersisted(
outputChars: item.outputChars,
displayName: item.displayName,
detail: item.detail,
failure: item.failure,
};
}
return { kind: item.kind, iteration: item.iteration, text: item.text };
Expand Down Expand Up @@ -1109,16 +1114,21 @@ const chatRuntimeSlice = createSlice({
elapsedMs?: number;
outputChars?: number;
result?: string;
failure?: ToolFailureExplanation;
}>
) => {
const { threadId, rowId, callId, success, elapsedMs, outputChars, result } = action.payload;
const { threadId, rowId, callId, success, elapsedMs, outputChars, result, failure } =
action.payload;
const entry = state.toolTimelineByThread[threadId]?.find(e => e.id === rowId);
const item = entry?.subagent?.transcript?.find(i => i.kind === 'tool' && i.callId === callId);
if (!item || item.kind !== 'tool') return;
item.status = success ? 'success' : 'error';
if (elapsedMs != null) item.elapsedMs = elapsedMs;
if (outputChars != null) item.outputChars = outputChars;
if (result != null) item.result = result;
// Carry the structured why/next onto the rendered transcript item; a
// successful result clears any stale failure (#4459).
item.failure = success ? undefined : failure;

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Render subagent failures instead of only storing them

When a sub-agent child tool fails in the normal transcript-present path, this only stores failure on the transcript item, but the renderers for those items (ToolTimelineBlock inline rows and SubagentDrawer rows) still display only the tool name/status/detail/timing and never read the new field. In that scenario the UI still shows just “Failed” and the why/next copy remains invisible, so this plumbing needs to be connected to the subagent row renderers as well.

Useful? React with 👍 / 👎.

},
setTaskBoardForThread: (
state,
Expand Down
3 changes: 3 additions & 0 deletions app/src/types/turnState.ts
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,9 @@ export type PersistedSubagentTranscriptItem =
outputChars?: number;
displayName?: string;
detail?: string;
/** Plain-language failure explanation for a FAILED child tool call
* (#4459); mirrors {@link PersistedSubagentToolCall.failure}. */
failure?: PersistedToolFailure;
};

export interface PersistedSubagentActivity {
Expand Down
10 changes: 7 additions & 3 deletions src/openhuman/tinyagents/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1204,9 +1204,13 @@ fn assemble_turn_harness(
// the retained provider handle (the other clone was consumed into the
// primary `ProviderModel`); `build_route_models` clones it per route and
// skips the turn's own model so we don't shadow the default.
for route in
routes::build_route_models(&summary_provider, temperature, model, max_output_tokens)
{
for route in routes::build_route_models(
&summary_provider,
temperature,
model,
max_output_tokens,
&provider_usage_carry,
) {
let routes::RouteModel {
name,
model: route_model,
Expand Down
68 changes: 62 additions & 6 deletions src/openhuman/tinyagents/observability.rs
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,22 @@ pub(crate) struct OpenhumanEventBridge {
/// is recorded exactly once. See [`OpenhumanEventBridge::record_usage`].
recorded_iterations: Mutex<std::collections::HashSet<u32>>,
state: Mutex<BridgeState>,
/// Ordered overflow buffer for progress events that hit backpressure
/// (channel `Full`). Once ANY event spills here, `draining` stays set and
/// every subsequent event queues here too — a single spawned forwarder
/// drains them to the channel in FIFO order — so a later fast-path
/// `try_send` can never jump ahead of an earlier spilled event and scramble
/// start/completed ordering (which would leave a tool row stuck `running`
/// when a `ToolCallCompleted` overtakes its `ToolCallStarted`) (#4466).
overflow: Arc<Mutex<OverflowState>>,
}

/// Backpressure overflow state guarded by a single mutex so the "are we
/// draining?" decision and the queue mutation stay atomic together.
#[derive(Default)]
struct OverflowState {
/// Progress events waiting to be forwarded. Producers append with
/// `push_back` and the forwarder task removes with `pop_front`, so events
/// leave in the order they arrived; the forwarder clears the queue when a
/// `send` fails.
queue: std::collections::VecDeque<AgentProgress>,
/// `true` while a forwarder task is draining `queue`. While set, new events
/// are queued instead of being sent directly; the forwarder resets it to
/// `false` once the queue is empty or a `send` fails.
draining: bool,
}

impl OpenhumanEventBridge {
Expand Down Expand Up @@ -221,6 +237,7 @@ impl OpenhumanEventBridge {
usage_carry,
recorded_iterations: Mutex::new(std::collections::HashSet::new()),
state: Mutex::new(BridgeState::default()),
overflow: Arc::default(),
})
}

Expand Down Expand Up @@ -267,17 +284,51 @@ impl OpenhumanEventBridge {
let Some(tx) = &self.on_progress else {
return;
};
// Hold the overflow lock across the ordering decision so "are we
// draining?" and the queue mutation are atomic (try_send is
// non-blocking, so holding a std mutex across it is fine).
let mut ov = self.overflow.lock().unwrap_or_else(|p| p.into_inner());
if ov.draining {
// Already spilling: queue in order; the single forwarder delivers it.
ov.queue.push_back(progress);
return;
}
match tx.try_send(progress) {
Ok(()) => {}
Err(TrySendError::Closed(_)) => {}
Err(TrySendError::Full(progress)) => {
// Backpressure, not capacity loss: hand the delta to an awaited
// `send()` on a spawned task rather than dropping it. Guard on a
// live runtime so a non-async construction path can't panic.
// Backpressure, not capacity loss. Enter ordered-drain mode:
// queue this event and spawn ONE forwarder that awaits `send()`
// per event in FIFO order. `draining` stays set (so every later
// event also queues here) until the buffer fully drains — that is
// what stops a later `try_send` from overtaking a spilled earlier
// event and scrambling start/completed ordering.
if let Ok(handle) = tokio::runtime::Handle::try_current() {
ov.queue.push_back(progress);
ov.draining = true;
let overflow = Arc::clone(&self.overflow);
let tx = tx.clone();
drop(ov);
handle.spawn(async move {
let _ = tx.send(progress).await;
loop {
let next = {
let mut ov = overflow.lock().unwrap_or_else(|p| p.into_inner());
match ov.queue.pop_front() {
Some(item) => item,
None => {
ov.draining = false;
break;
}
}
};
if tx.send(next).await.is_err() {
// Receiver gone: stop draining, discard the rest.
let mut ov = overflow.lock().unwrap_or_else(|p| p.into_inner());
ov.queue.clear();
ov.draining = false;
break;
}
}
});
} else {
tracing::debug!(
Expand Down Expand Up @@ -445,8 +496,13 @@ impl OpenhumanEventBridge {
input_tokens: usage.input_tokens,
output_tokens: usage.output_tokens,
cached_input_tokens: usage.cache_read_tokens,
cache_creation_tokens: usage.cache_creation_tokens,
reasoning_tokens: usage.reasoning_tokens,
// Use the carried-or-crate breakdown (computed above): for
// providers where cache-creation/reasoning tokens ride only on
// the carried UsageInfo, the crate `usage.*` fields are 0, which
// would leave per-call progress/Langfuse telemetry at zero even
// though the cost tracker got the right values (#4467).
cache_creation_tokens,
reasoning_tokens,
cost_usd: call_cost,
});
self.send(AgentProgress::TurnCostUpdated {
Expand Down
8 changes: 7 additions & 1 deletion src/openhuman/tinyagents/routes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,11 @@ pub(super) fn build_route_models(
temperature: f64,
skip_model: &str,
max_output_tokens: Option<u32>,
// Shared provider-usage carry (#4467): fallback route models must drain the
// SAME carry the bridge reads, or a successful fallback call never feeds its
// backend-charged USD / context-window / cache-creation-reasoning breakdown
// to the cost surfaces (it would fall back to a catalogue estimate).
usage_carry: &super::observability::ProviderUsageCarry,
) -> Vec<RouteModel> {
let mut out = Vec::new();
for &tier in WORKLOAD_ROUTE_TIERS {
Expand All @@ -108,7 +113,8 @@ pub(super) fn build_route_models(
}
let mut model = ProviderModel::new(provider.clone(), tier, temperature)
.with_vision(vision)
.with_reasoning(reasoning);
.with_reasoning(reasoning)
.with_usage_carry(usage_carry.clone());
if let Some(cap) = max_output_tokens {
model = model.with_max_tokens(cap);
}
Expand Down
36 changes: 35 additions & 1 deletion src/openhuman/tools/impl/filesystem/update_memory_md.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,36 @@ fn workspace_write_lock(workspace_dir: &Path) -> Arc<tokio::sync::Mutex<()>> {
)
}

/// Acquire an **inter-process** advisory write lock for `workspace_dir` (#4458).
///
/// [`WORKSPACE_WRITE_LOCKS`] only serializes writers inside this OS process, but
/// cron launches work via separate `tokio::process::Command` subprocesses that
/// don't share that mutex — so two cron runs could still clobber the same
/// `MEMORY.md` mid read-modify-write. This takes an `fs2` exclusive `flock` on a
/// sentinel `.memory-write.lock` file in the workspace; the returned `File`
/// holds the lock until it is dropped (end of the write). `flock` acquisition
/// blocks, so it runs on a blocking thread.
async fn acquire_cross_process_write_lock(workspace_dir: &Path) -> anyhow::Result<std::fs::File> {
let lock_path = workspace_dir.join(".memory-write.lock");
tokio::task::spawn_blocking(move || {
use fs2::FileExt;
if let Some(parent) = lock_path.parent() {
let _ = std::fs::create_dir_all(parent);
}
let file = std::fs::OpenOptions::new()
.create(true)
.write(true)
.truncate(false)
.open(&lock_path)
Comment on lines +64 to +68

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Reject symlinked workspace lock files

If a workspace already contains .memory-write.lock as a symlink (including a dangling symlink), this new OpenOptions call follows it with create(true).write(true) before any containment check. That lets a project-controlled path make update_memory_md create/open/lock a file outside the workspace, bypassing the symlink hardening applied to MEMORY.md/SKILL.md; the lock file should be opened with no-follow semantics or prevalidated as a regular file inside the canonical workspace.

Useful? React with 👍 / 👎.

.map_err(|e| anyhow::anyhow!("open workspace lock file {lock_path:?}: {e}"))?;
file.lock_exclusive()
.map_err(|e| anyhow::anyhow!("acquire workspace write flock: {e}"))?;
Ok::<std::fs::File, anyhow::Error>(file)
})
.await
.map_err(|e| anyhow::anyhow!("workspace lock task join failed: {e}"))?
}

/// Atomically replace `path`'s contents with `content`.
///
/// Writes to a sibling temp file in the same directory (so the rename stays on
Expand Down Expand Up @@ -218,9 +248,13 @@ impl Tool for UpdateMemoryMdTool {
// write so no interleaving append can be lost.
let lock = workspace_write_lock(&workspace_dir);
let _guard = lock.lock().await;
// Also take a cross-process advisory lock so cron subprocesses (which
// don't share the in-process mutex above) can't clobber the same file
// mid-RMW. Held across read + atomic write; released on drop.
let _file_lock = acquire_cross_process_write_lock(&workspace_dir).await?;
tracing::debug!(
workspace = %workspace_dir.display(),
"[update_memory_md] acquired per-workspace write lock"
"[update_memory_md] acquired per-workspace write lock (in-process + cross-process flock)"
);

match action {
Expand Down
Loading