Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
247 changes: 104 additions & 143 deletions src/openhuman/tinyagents/middleware.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1052,88 +1052,66 @@ impl Middleware<()> for CostBudgetMiddleware {
}
}

/// Consecutive **any**-failure no-progress backstop: different commands all
/// failing means the goal is unreachable here. Matches the legacy
/// `NO_PROGRESS_FAILURE_THRESHOLD`.
const NO_PROGRESS_FAILURE_THRESHOLD: usize = 6;
/// Consecutive **identical** hard-policy-rejection repeats before halting — a
/// blocked call re-issued unchanged can never succeed. Legacy
/// `HARD_REJECT_REPEAT_THRESHOLD`.
const HARD_REJECT_REPEAT_THRESHOLD: usize = 2;

/// `after_tool`: stop the run when tool calls keep failing with no progress
/// (issue #4249). The legacy tool loop's progress guard surfaced a root-cause
/// halt summary — a security/approval denial re-issued unchanged, an identical
/// error retried, or *different* commands all failing — instead of burning the
/// whole iteration budget and ending on a generic cap error. The tinyagents path
/// kept only the model/tool call caps, so this reinstates the guard as a graph
/// middleware. Three halt conditions, checked per failure (any success resets
/// every counter — progress was made):
/// `after_tool`: drive the no-progress escalation ladder (issues #4249, #4089).
/// The legacy tool loop's progress guard surfaced a root-cause halt summary — a
/// security/approval denial re-issued unchanged, an identical error retried, or
/// *different* commands all failing — instead of burning the whole iteration
/// budget and ending on a generic cap error. The tinyagents path kept only the
/// model/tool call caps, so this reinstates the guard as a graph middleware.
///
/// 1. **Hard policy rejection** (`[policy-blocked]`) repeated `HARD_REJECT_REPEAT_THRESHOLD`
/// times with an identical signature — "blocked by the security policy … re-issued".
/// 2. **Identical** error signature repeated `identical_threshold` times —
/// "retried N times with identical arguments".
/// 3. **Any** failure `NO_PROGRESS_FAILURE_THRESHOLD` times in a row (even with
/// varied errors) — "N tool calls in a row failed".
/// The detection + escalation live in the reusable
/// [`NoProgressTracker`](super::no_progress::NoProgressTracker); this middleware
/// is the thin driver that feeds each tool outcome in and translates the
/// returned [`NoProgress`](super::no_progress::NoProgress) verdict onto the
/// shared steering handle:
///
/// On trip it records a root-cause summary into the shared [`HaltSummarySlot`]
/// (the turn overrides its final text with it) and pauses the run via the shared
/// steering handle (same mechanism as the stop-hook / cap pausers).
/// - [`NoProgress::Nudge`] — a repeated identical failure below the retry cap:
/// inject a structured "no progress since step X" corrective as a user turn so
/// the model changes strategy on its next step (#4089). The loop continues.
/// - [`NoProgress::Halt`] — same-strategy retries exhausted (or the any-failure
/// backstop tripped): record the root-cause summary into the shared
/// [`HaltSummarySlot`] (the turn overrides its final text with it) and pause
/// the run via the steering handle (same mechanism as the stop-hook / cap
/// pausers).
pub struct RepeatedToolFailureMiddleware {
handle: SteeringHandle,
identical_threshold: usize,
halt_summary: super::HaltSummarySlot,
state: std::sync::Mutex<FailureState>,
tracker: super::no_progress::NoProgressTracker,
/// call_id → argument fingerprint, captured in `before_tool` (the tool result
/// carries no arguments). Folded into the identical-repeat signature so the
/// "identical arguments" halt only trips on the *same* args — two different
/// "identical arguments" ladder only trips on the *same* args — two different
/// argument sets that happen to share a first error line don't count as a
/// repeat and can't pre-empt the generic no-progress backstop.
arg_sigs: std::sync::Mutex<std::collections::HashMap<String, String>>,
}

#[derive(Default)]
struct FailureState {
last_sig: Option<String>,
same_count: usize,
consecutive: usize,
}

impl RepeatedToolFailureMiddleware {
/// Build the breaker. `identical_threshold` (the identical-signature retry
/// ceiling) is clamped to at least 2 — a single failure is never a loop.
/// Build the breaker. `identical_threshold` is the same-strategy retry cap
/// (identical tool + args + error), clamped by the tracker so a nudge always
/// precedes the halt.
pub fn new(
handle: SteeringHandle,
identical_threshold: usize,
halt_summary: super::HaltSummarySlot,
) -> Self {
Self {
handle,
identical_threshold: identical_threshold.max(2),
halt_summary,
state: std::sync::Mutex::new(FailureState::default()),
tracker: super::no_progress::NoProgressTracker::new(identical_threshold),
arg_sigs: std::sync::Mutex::new(std::collections::HashMap::new()),
}
}
}

/// A stable, bounded fingerprint of a tool call's arguments for the identical-
/// repeat signature (hashed so a huge payload doesn't bloat the map/comparison).
fn args_fingerprint(arguments: &serde_json::Value) -> String {
pub(super) fn args_fingerprint(arguments: &serde_json::Value) -> String {
use std::hash::{Hash, Hasher};
let mut hasher = std::collections::hash_map::DefaultHasher::new();
arguments.to_string().hash(&mut hasher);
format!("{:x}", hasher.finish())
}

/// Trim a tool error for inclusion in a halt summary (keep it bounded but retain
/// the deterministic leading detail the model/user needs).
fn truncate_for_halt(text: &str) -> String {
const MAX: usize = 600;
crate::openhuman::util::truncate_with_ellipsis(text, MAX)
}

#[async_trait]
impl Middleware<()> for RepeatedToolFailureMiddleware {
fn name(&self) -> &str {
Expand All @@ -1156,102 +1134,67 @@ impl Middleware<()> for RepeatedToolFailureMiddleware {

async fn after_tool(
&self,
_ctx: &mut RunContext<()>,
ctx: &mut RunContext<()>,
_state: &(),
result: &mut TaToolResult,
) -> TaResult<()> {
let mut state = self.state.lock().unwrap();
let arg_fp = self
.arg_sigs
.lock()
.ok()
.and_then(|mut sigs| sigs.remove(&result.call_id))
.unwrap_or_default();
let Some(err) = result.error.as_deref() else {
// Success → progress was made; reset every counter.
*state = FailureState::default();
return Ok(());
};

// Signature: tool name + argument fingerprint + first error line (the
// deterministic parts; a huge payload tail must not dominate the
// identical-repeat comparison). Including the args means the "identical
// arguments" halt only fires when the args truly repeat.
let err_line = err.lines().next().unwrap_or(err);
let sig = format!("{}\u{1f}{arg_fp}\u{1f}{err_line}", result.name);
// The unknown-tool recovery is a failure the model can correct (it got the
// "unknown tool" feedback), so it must NOT feed the generic *any*-failure
// no-progress counter — else a turn that recovers from one bad tool name
// and then legitimately exhausts its iteration budget would trip the
// backstop instead of hitting the cap. It still feeds the *identical*-repeat
// counter, so re-issuing the SAME unavailable tool halts with a root cause.
if result.name != UNKNOWN_TOOL_SENTINEL {
state.consecutive += 1;
}
let same_count = match &state.last_sig {
Some(prev) if *prev == sig => {
state.same_count += 1;
state.same_count
}
_ => {
state.last_sig = Some(sig);
state.same_count = 1;
1
}
};

// A hard policy rejection is marked in the tool output; it can never
// succeed when re-issued unchanged, so it trips faster.
let is_hard_reject = result
// succeed when re-issued unchanged, so the ladder trips it faster.
let hard_reject = result
.content
.contains(crate::openhuman::security::POLICY_BLOCKED_MARKER)
|| err.contains(crate::openhuman::security::POLICY_BLOCKED_MARKER);

let summary = if is_hard_reject && same_count >= HARD_REJECT_REPEAT_THRESHOLD {
Some(format!(
"Stopping: the `{}` call is blocked by the security policy and was re-issued with \
identical arguments — it can never succeed this way. Reason:\n{}\n\nDo not repeat \
this call; use an allowed alternative or report that it can't be done here.",
result.name,
truncate_for_halt(err),
))
} else if same_count >= self.identical_threshold {
Some(format!(
"Stopping: the `{}` call was retried {same_count} times with identical arguments \
and kept failing — repeating it will not help. Last error:\n{}\n\nThis looks \
unrecoverable in the current environment. Report this back instead of retrying.",
result.name,
truncate_for_halt(err),
))
} else if state.consecutive >= NO_PROGRESS_FAILURE_THRESHOLD {
Some(format!(
"Stopping: {} tool calls in a row failed with no progress. Last error (from \
`{}`):\n{}\n\nDifferent commands are all failing — the goal looks unreachable in \
this environment. Report this back instead of retrying.",
state.consecutive,
result.name,
truncate_for_halt(err),
))
} else {
None
|| result
.error
.as_deref()
.is_some_and(|e| e.contains(crate::openhuman::security::POLICY_BLOCKED_MARKER));

let attempt = super::no_progress::ToolAttempt {
tool: &result.name,
arg_fingerprint: &arg_fp,
error: result.error.as_deref(),
hard_reject,
recoverable_miss: result.name == UNKNOWN_TOOL_SENTINEL,
};
// The model-call count doubles as the loop "step" for the no-progress
// wording ("no progress since step X").
let step = ctx.limits.model_calls();

if let Some(summary) = summary {
tracing::warn!(
tool = %result.name,
consecutive = state.consecutive,
same_count,
is_hard_reject,
"[tinyagents::mw] repeated tool failure — halting run so the root cause surfaces"
);
if let Ok(mut slot) = self.halt_summary.lock() {
*slot = Some(summary);
match self.tracker.record(step, &attempt) {
super::no_progress::NoProgress::Continue => {}
super::no_progress::NoProgress::Nudge(signal) => {
tracing::info!(
tool = %result.name,
step,
"[tinyagents::mw] no progress — nudging the model to change strategy"
);
// Feed the corrective back into the loop as a user turn; the
// harness applies it at the next steering checkpoint (before the
// next model call), so the model sees it before it acts again.
self.handle
.send(SteeringCommand::InjectMessage(TaMessage::user(signal)));
Comment thread
M3gA-Mind marked this conversation as resolved.
Outdated
}
super::no_progress::NoProgress::Halt(summary) => {
tracing::warn!(
tool = %result.name,
step,
hard_reject,
"[tinyagents::mw] repeated tool failure — halting run so the root cause surfaces"
);
if let Ok(mut slot) = self.halt_summary.lock() {
*slot = Some(summary);
}
// Pause at the top of the next iteration (before the next model
// call), matching the stop-hook / cap pause path. The tracker
// already reset its state on the halt.
self.handle.send(SteeringCommand::Pause);
}
// Pause at the top of the next iteration (before the next model call),
// matching the stop-hook / cap pause path. Reset so a resumed run does
// not immediately re-pause on the same latched state.
self.handle.send(SteeringCommand::Pause);
*state = FailureState::default();
}
Ok(())
}
Expand Down Expand Up @@ -1596,50 +1539,67 @@ mod tests {
r
}

/// Kinds of the steering commands currently queued on `handle`, drained.
fn drained_kinds(handle: &SteeringHandle) -> Vec<tinyagents::harness::steering::SteeringCommandKind> {
handle.drain().iter().map(|c| c.kind()).collect()
}

#[tokio::test]
async fn repeated_tool_failure_pauses_only_after_the_threshold() {
async fn repeated_identical_failure_nudges_then_pauses() {
use tinyagents::harness::steering::SteeringCommandKind;
let handle = SteeringHandle::allow_all();
let mw = RepeatedToolFailureMiddleware::new(
handle.clone(),
3,
std::sync::Arc::new(std::sync::Mutex::new(None)),
);
// Two identical failures: below the threshold, no pause.
for _ in 0..2 {
let mut r = failing_result("flaky", "boom");
mw.after_tool(&mut ctx(), &(), &mut r).await.unwrap();
}
assert_eq!(handle.pending(), 0, "no pause before the threshold");
// Third identical failure trips the breaker.
// First identical failure: nothing yet.
let mut r = failing_result("flaky", "boom");
mw.after_tool(&mut ctx(), &(), &mut r).await.unwrap();
assert!(
handle.pending() >= 1,
assert_eq!(drained_kinds(&handle), Vec::new(), "no signal on the first failure");
// Second: a "no progress" nudge feeds back into the loop — not a pause.
let mut r = failing_result("flaky", "boom");
mw.after_tool(&mut ctx(), &(), &mut r).await.unwrap();
assert_eq!(
drained_kinds(&handle),
vec![SteeringCommandKind::InjectMessage],
"the second identical failure should nudge, not pause"
);
// Third: same-strategy retries exhausted → pause the run.
let mut r = failing_result("flaky", "boom");
mw.after_tool(&mut ctx(), &(), &mut r).await.unwrap();
assert_eq!(
drained_kinds(&handle),
vec![SteeringCommandKind::Pause],
"the third identical failure should pause the run"
);
}

#[tokio::test]
async fn repeated_tool_failure_resets_on_a_success() {
use tinyagents::harness::steering::SteeringCommandKind;
let handle = SteeringHandle::allow_all();
let mw = RepeatedToolFailureMiddleware::new(
handle.clone(),
3,
std::sync::Arc::new(std::sync::Mutex::new(None)),
);
// Two failures, then a success clears the counter.
// Two failures (the second nudges), then a success clears the counter.
for _ in 0..2 {
let mut r = failing_result("t", "boom");
mw.after_tool(&mut ctx(), &(), &mut r).await.unwrap();
}
let mut ok = tool_result("t", "fine"); // error = None
mw.after_tool(&mut ctx(), &(), &mut ok).await.unwrap();
// Two more failures — still below the threshold because the counter reset.
// Two more failures — the counter reset, so no Pause fires (only a nudge).
for _ in 0..2 {
let mut r = failing_result("t", "boom");
mw.after_tool(&mut ctx(), &(), &mut r).await.unwrap();
}
assert_eq!(handle.pending(), 0, "a success should reset the breaker");
assert!(
!drained_kinds(&handle).contains(&SteeringCommandKind::Pause),
"a success should reset the breaker so it never halts"
);
}

#[tokio::test]
Expand All @@ -1651,15 +1611,16 @@ mod tests {
std::sync::Arc::new(std::sync::Mutex::new(None)),
);
// Three *different* errors never trip the breaker — only an identical,
// deterministic failure loop does.
// deterministic failure loop does (and the varied-failure nudge only
// fires at four in a row).
for err in ["e1", "e2", "e3"] {
let mut r = failing_result("t", err);
mw.after_tool(&mut ctx(), &(), &mut r).await.unwrap();
}
assert_eq!(
handle.pending(),
0,
"distinct errors must not trip the breaker"
"distinct errors must not trip the breaker this early"
);
}

Expand Down
Loading
Loading