diff --git a/.env.example b/.env.example index b8626e8475..79aa6a5238 100644 --- a/.env.example +++ b/.env.example @@ -111,6 +111,13 @@ OPENHUMAN_TEMPERATURE=0.7 # [optional] Operator override for the inference HTTP connection-establishment # timeout, in seconds (default 10, range 1-300). Leave unset to use the default. # OPENHUMAN_INFERENCE_CONNECT_TIMEOUT_SECS= +# [optional] Per-chunk streaming inactivity watchdog, in seconds (default 90, +# range 1-3600). Aborts a stalled SSE response when NO token arrives for this +# long and retries, so an upstream that flushes 200 then goes silent can't hang +# a turn until OPENHUMAN_INFERENCE_TIMEOUT_SECS (which is often raised for long +# research turns). The window resets on every token, so valid long responses are +# never cut. Leave unset to use the default. +# OPENHUMAN_INFERENCE_STREAM_IDLE_TIMEOUT_SECS= # [optional] Headless update restart contract: self_replace | supervisor # OPENHUMAN_AUTO_UPDATE_RESTART_STRATEGY=self_replace # [optional] Allow bearer-authenticated RPC callers to invoke update.apply/update.run diff --git a/src/openhuman/inference/provider/compatible.rs b/src/openhuman/inference/provider/compatible.rs index 34eb4d08b3..f902a0c903 100644 --- a/src/openhuman/inference/provider/compatible.rs +++ b/src/openhuman/inference/provider/compatible.rs @@ -102,6 +102,11 @@ pub struct OpenAiCompatibleProvider { /// Used for profile-aware context window resolution and diagnostics. pub(crate) local_provider_kind: Option, + /// Per-chunk stream inactivity window for the native streaming path + /// (`stream_native_chat`, #4269). Defaults to + /// [`compatible_timeout::stream_idle_timeout`]; tests inject a small value + /// via [`OpenAiCompatibleProvider::with_stream_idle_timeout`]. + stream_idle_timeout: std::time::Duration, } /// How the provider expects the API key to be sent. 
@@ -210,6 +215,7 @@ impl OpenAiCompatibleProvider { vision: true, ollama_num_ctx: None, local_provider_kind: None, + stream_idle_timeout: compatible_timeout::stream_idle_timeout(), } } @@ -222,6 +228,15 @@ impl OpenAiCompatibleProvider { self } + /// Test-only: shrink the stream inactivity watchdog window (#4269) so + /// stalled-stream and wedged-consumer behaviour can be exercised without + /// waiting the production default (90s). + #[cfg(test)] + pub(crate) fn with_stream_idle_timeout(mut self, window: std::time::Duration) -> Self { + self.stream_idle_timeout = window; + self + } + /// Toggle whether this provider advertises OpenAI-compatible vision input. /// Cloud providers default to enabled; local OpenAI-compatible shims use /// this to stay fail-closed for text-only local models. diff --git a/src/openhuman/inference/provider/compatible_stream_native.rs b/src/openhuman/inference/provider/compatible_stream_native.rs index ea4ef9d57f..99ec9b6dc8 100644 --- a/src/openhuman/inference/provider/compatible_stream_native.rs +++ b/src/openhuman/inference/provider/compatible_stream_native.rs @@ -1,4 +1,5 @@ use crate::openhuman::inference::provider::traits::ChatResponse as ProviderChatResponse; +use crate::openhuman::inference::provider::ProviderDelta; use super::compatible_dump::dump_response_if_enabled; use super::compatible_repeat::{StreamRepeatDetector, STREAM_REPEAT_THRESHOLD}; @@ -259,7 +260,37 @@ impl OpenAiCompatibleProvider { let mut sse_chunks_parsed: usize = 0; let mut body_bytes_received: usize = 0; - 'stream: while let Some(item) = bytes_stream.next().await { + // #4269: bound each SSE read with a per-chunk inactivity watchdog. The + // window RESETS on every received chunk, so a legitimately long response + // that keeps emitting tokens is never cut — only a stream that goes + // silent for the whole window (an upstream that flushed 200 then stalled + // / half-closed the body) trips it. 
Without this the read parks on + // `next().await` until the blunt whole-request timeout fires — which + // operators are told to raise up to 3600s for long research turns + // (#3856) — presenting as the indefinite RESPONSE-phase hang in #4269. + // The bail classifies as retryable, so `ReliableProvider` replays it. + let idle_window = self.stream_idle_timeout; + 'stream: loop { + let item = match tokio::time::timeout(idle_window, bytes_stream.next()).await { + Ok(Some(item)) => item, + Ok(None) => break 'stream, + Err(_elapsed) => { + let idle_secs = idle_window.as_secs(); + log::warn!( + "[stream] {} watchdog fired — no SSE data for {}s (elapsed_ms={} sse_chunks={} body_bytes={}); aborting stalled stream for retry", + self.name, + idle_secs, + stream_started_at.elapsed().as_millis(), + sse_chunks_parsed, + body_bytes_received, + ); + anyhow::bail!( + "{} streaming watchdog: no response data for {}s — aborting stalled stream for retry", + self.name, + idle_secs, + ); + } + }; let bytes = item?; body_bytes_received += bytes.len(); buffer.push_str(&String::from_utf8_lossy(&bytes)); @@ -295,7 +326,13 @@ impl OpenAiCompatibleProvider { }; let data = data.trim(); if data == "[DONE]" { - continue; + // `[DONE]` is the terminal SSE sentinel — the response is + // complete. Stop reading immediately rather than looping + // back to another watchdog-armed read: a provider that + // sends `[DONE]` but lingers the socket would otherwise + // trip the inactivity watchdog and fail a finished + // response as a retryable stall (#4269 watchdog review). 
+ break 'stream; } let chunk: StreamChunkResponse = match serde_json::from_str(data) { @@ -325,11 +362,13 @@ impl OpenAiCompatibleProvider { if let Some(content) = choice.delta.content.as_ref() { if !content.is_empty() { text_accum.push_str(content); - let _ = delta_tx - .send(crate::openhuman::inference::provider::ProviderDelta::TextDelta { + self.forward_delta( + delta_tx, + ProviderDelta::TextDelta { delta: content.clone(), - }) - .await; + }, + ) + .await?; if repeat_detector.observe(content) { log::warn!( "[stream] {} degenerate repetition detected (≥{} identical lines) — aborting generation, truncating (text_chars={})", @@ -345,13 +384,13 @@ impl OpenAiCompatibleProvider { if let Some(reasoning) = choice.delta.reasoning_content.as_ref() { if !reasoning.is_empty() { thinking_accum.push_str(reasoning); - let _ = delta_tx - .send( - crate::openhuman::inference::provider::ProviderDelta::ThinkingDelta { - delta: reasoning.clone(), - }, - ) - .await; + self.forward_delta( + delta_tx, + ProviderDelta::ThinkingDelta { + delta: reasoning.clone(), + }, + ) + .await?; } } // Tool-call fragments. 
@@ -442,12 +481,14 @@ impl OpenAiCompatibleProvider { id, name, ); - let _ = delta_tx - .send(crate::openhuman::inference::provider::ProviderDelta::ToolCallStart { + self.forward_delta( + delta_tx, + ProviderDelta::ToolCallStart { call_id: id.clone(), tool_name: name.clone(), - }) - .await; + }, + ) + .await?; entry.emitted_start = true; if !entry.arguments.is_empty() { log::debug!( @@ -457,12 +498,14 @@ impl OpenAiCompatibleProvider { entry.arguments.len(), ); let buffered = entry.arguments.clone(); - let _ = delta_tx - .send(crate::openhuman::inference::provider::ProviderDelta::ToolCallArgsDelta { + self.forward_delta( + delta_tx, + ProviderDelta::ToolCallArgsDelta { call_id: id.clone(), delta: buffered, - }) - .await; + }, + ) + .await?; entry.emitted_chars = entry.arguments.len(); } } @@ -470,12 +513,14 @@ impl OpenAiCompatibleProvider { if let Some(ref id) = entry.id { let fresh = entry.arguments[entry.emitted_chars..].to_string(); - let _ = delta_tx - .send(crate::openhuman::inference::provider::ProviderDelta::ToolCallArgsDelta { + self.forward_delta( + delta_tx, + ProviderDelta::ToolCallArgsDelta { call_id: id.clone(), delta: fresh, - }) - .await; + }, + ) + .await?; entry.emitted_chars = entry.arguments.len(); } } @@ -624,6 +669,36 @@ impl OpenAiCompatibleProvider { Self::parse_native_response(api_resp, &self.name) } + + /// Forward a streamed [`ProviderDelta`] to the caller under the same + /// inactivity watchdog as the SSE read (#4269). A dropped receiver is a + /// benign stop (the consumer is gone — `Ok`, and the loop keeps aggregating + /// as before); a consumer that backpressures for the whole idle window is a + /// wedge (e.g. a stalled UI progress bridge), which trips the watchdog so a + /// turn can't hang on a full delta channel. The bail classifies as retryable. 
+ async fn forward_delta( + &self, + delta_tx: &tokio::sync::mpsc::Sender<ProviderDelta>, + delta: ProviderDelta, + ) -> anyhow::Result<()> { + match tokio::time::timeout(self.stream_idle_timeout, delta_tx.send(delta)).await { + Ok(Ok(())) => Ok(()), + Ok(Err(_)) => Ok(()), // receiver dropped — consumer gone, not a stall + Err(_elapsed) => { + let idle_secs = self.stream_idle_timeout.as_secs(); + log::warn!( + "[stream] {} watchdog fired — delta channel backpressured for {}s; aborting stalled stream for retry", + self.name, + idle_secs, + ); + anyhow::bail!( + "{} streaming watchdog: delta channel stalled for {}s — aborting stalled stream for retry", + self.name, + idle_secs, + ) + } + } + } } /// Extract the `data:` payload of an SSE `event: error` frame, or `None` when diff --git a/src/openhuman/inference/provider/compatible_tests.rs b/src/openhuman/inference/provider/compatible_tests.rs index d66e34647c..43b9747caa 100644 --- a/src/openhuman/inference/provider/compatible_tests.rs +++ b/src/openhuman/inference/provider/compatible_tests.rs @@ -4631,3 +4631,221 @@ async fn codex_oauth_responses_preserves_concrete_model() { "a concrete pinned model must not be remapped" ); } + +// --------------------------------------------------------------------------- +// #4269: SSE inactivity watchdog on stream_native_chat +// --------------------------------------------------------------------------- + +/// Minimal streaming request for the watchdog tests. 
+fn watchdog_request() -> NativeChatRequest { + NativeChatRequest { + model: "watchdog-model".to_string(), + messages: vec![NativeMessage { + role: "user".to_string(), + content: Some("hi".into()), + tool_call_id: None, + tool_calls: None, + reasoning_content: None, + }], + temperature: Some(0.7), + stream: Some(true), + tools: None, + tool_choice: None, + thread_id: None, + stream_options: Some(super::compatible_types::OpenAiStreamOptions { + include_usage: true, + }), + options: None, + frequency_penalty: None, + max_tokens: None, + } +} + +/// Raw TCP server: writes a 200 SSE response head, plays `script` (each entry: +/// sleep `delay`, then write `bytes`), then either closes the socket +/// (`close_after`) or holds it open for `hold_open`. wiremock can't stall +/// mid-body, so this reproduces the #4269 upstream-stall shape deterministically. +async fn spawn_scripted_sse( + script: Vec<(std::time::Duration, &'static str)>, + close_after: bool, + hold_open: std::time::Duration, +) -> String { + use tokio::io::{AsyncReadExt, AsyncWriteExt}; + let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap(); + let addr = listener.local_addr().unwrap(); + tokio::spawn(async move { + if let Ok((mut sock, _)) = listener.accept().await { + // Best-effort drain of the request head so the client's send() completes. 
+ let mut buf = [0u8; 2048]; + let _ = sock.read(&mut buf).await; + let head = + "HTTP/1.1 200 OK\r\nContent-Type: text/event-stream\r\nConnection: close\r\n\r\n"; + if sock.write_all(head.as_bytes()).await.is_err() { + return; + } + let _ = sock.flush().await; + for (delay, bytes) in script { + tokio::time::sleep(delay).await; + if sock.write_all(bytes.as_bytes()).await.is_err() { + return; + } + let _ = sock.flush().await; + } + if close_after { + return; + } + tokio::time::sleep(hold_open).await; + } + }); + format!("http://{addr}") +} + +#[tokio::test] +async fn stream_watchdog_trips_on_stalled_read() { + // Upstream flushes 200 SSE headers then goes silent and holds the socket — + // the #4269 shape. With a short idle window the read watchdog must abort + // with a retryable error rather than parking on next().await until the + // whole-request timeout (which #3856 tells operators to raise up to 1h). + let url = spawn_scripted_sse(vec![], false, std::time::Duration::from_secs(30)).await; + let provider = OpenAiCompatibleProvider::new("stalltest", &url, None, AuthStyle::None) + .with_stream_idle_timeout(std::time::Duration::from_millis(400)); + let request = watchdog_request(); + let (delta_tx, _delta_rx) = tokio::sync::mpsc::channel(8); + let err = provider + .stream_native_chat(None, &request, &delta_tx, 0) + .await + .expect_err("a stalled stream must trip the read watchdog"); + let msg = err.to_string(); + assert!( + msg.contains("streaming watchdog") && msg.contains("no response data"), + "unexpected error: {msg}" + ); + // Must classify as retryable so ReliableProvider replays the turn. 
+ assert!( + !crate::openhuman::inference::provider::reliable::is_non_retryable(&err), + "watchdog stall must be retryable, got: {msg}" + ); +} + +#[tokio::test] +async fn stream_watchdog_resets_on_each_chunk() { + // Chunks arrive every 150ms — each well under the 500ms idle window — so the + // watchdog resets on every chunk and never fires: a legitimately long + // response that keeps emitting tokens is not cut (the #4269 no-regression + // guarantee). + let chunk = "data: {\"choices\":[{\"delta\":{\"content\":\"tok \"}}]}\n\n"; + let script = vec![ + (std::time::Duration::from_millis(150), chunk), + (std::time::Duration::from_millis(150), chunk), + (std::time::Duration::from_millis(150), chunk), + (std::time::Duration::from_millis(150), "data: [DONE]\n\n"), + ]; + let url = spawn_scripted_sse(script, true, std::time::Duration::from_secs(0)).await; + let provider = OpenAiCompatibleProvider::new("resettest", &url, None, AuthStyle::None) + .with_stream_idle_timeout(std::time::Duration::from_millis(500)); + let request = watchdog_request(); + let (delta_tx, _delta_rx) = tokio::sync::mpsc::channel(64); + let resp = provider + .stream_native_chat(None, &request, &delta_tx, 0) + .await + .expect("chunks within the idle window must not trip the watchdog"); + let text = resp.text.as_deref().unwrap_or_default(); + assert!( + text.contains("tok tok tok"), + "all three tokens must stream to completion (watchdog must not fire): {text:?}" + ); +} + +#[tokio::test] +async fn stream_watchdog_trips_on_wedged_delta_consumer() { + // The full SSE body arrives at once, but the delta channel is capacity-1 and + // never drained (receiver held, not polled). The send-side watchdog must + // trip rather than block the stream forever on a full delta channel — the + // backpressure-wedge path (#4269). 
+ let body = "data: {\"choices\":[{\"delta\":{\"content\":\"a\"}}]}\n\n\ + data: {\"choices\":[{\"delta\":{\"content\":\"b\"}}]}\n\n\ + data: {\"choices\":[{\"delta\":{\"content\":\"c\"}}]}\n\n\ + data: [DONE]\n\n"; + let mock_server = MockServer::start().await; + Mock::given(method("POST")) + .and(path("/chat/completions")) + .respond_with(ResponseTemplate::new(200).set_body_raw(body, "text/event-stream")) + .mount(&mock_server) + .await; + let provider = + OpenAiCompatibleProvider::new("wedgetest", &mock_server.uri(), None, AuthStyle::None) + .with_stream_idle_timeout(std::time::Duration::from_millis(300)); + let request = watchdog_request(); + // Capacity 1, receiver kept alive but never polled -> the second send wedges. + let (delta_tx, _delta_rx) = tokio::sync::mpsc::channel(1); + let err = provider + .stream_native_chat(None, &request, &delta_tx, 0) + .await + .expect_err("a wedged delta consumer must trip the send-side watchdog"); + let msg = err.to_string(); + assert!( + msg.contains("streaming watchdog") && msg.contains("delta channel"), + "unexpected error: {msg}" + ); + assert!( + !crate::openhuman::inference::provider::reliable::is_non_retryable(&err), + "watchdog wedge must be retryable, got: {msg}" + ); +} + +#[tokio::test] +async fn stream_watchdog_tolerates_dropped_delta_receiver() { + // A dropped receiver is benign, not a stall: forward_delta returns Ok and the + // loop keeps aggregating, so the response still completes with no watchdog error. 
+ let body = "data: {\"choices\":[{\"delta\":{\"content\":\"x\"}}]}\n\n\ + data: {\"choices\":[{\"delta\":{\"content\":\"y\"}}]}\n\n\ + data: [DONE]\n\n"; + let mock_server = MockServer::start().await; + Mock::given(method("POST")) + .and(path("/chat/completions")) + .respond_with(ResponseTemplate::new(200).set_body_raw(body, "text/event-stream")) + .mount(&mock_server) + .await; + let provider = + OpenAiCompatibleProvider::new("droptest", &mock_server.uri(), None, AuthStyle::None) + .with_stream_idle_timeout(std::time::Duration::from_millis(300)); + let request = watchdog_request(); + let (delta_tx, delta_rx) = tokio::sync::mpsc::channel(8); + drop(delta_rx); // consumer gone before streaming starts + let resp = provider + .stream_native_chat(None, &request, &delta_tx, 0) + .await + .expect("a dropped delta receiver must not fail the stream"); + assert_eq!(resp.text.as_deref(), Some("xy")); +} + +#[tokio::test] +async fn stream_stops_on_done_even_if_socket_lingers() { + // A provider that sends `[DONE]` then holds the socket open must finalize the + // (complete) response immediately — the watchdog must NOT re-arm and fail it + // as a stall. Regression for the watchdog + terminal-sentinel interaction + // (CodeRabbit review, PR #4393). With the pre-fix `continue`, this stream + // would trip the 300ms idle watchdog instead of completing. + let script = vec![ + ( + std::time::Duration::from_millis(0), + "data: {\"choices\":[{\"delta\":{\"content\":\"done-content\"}}]}\n\n", + ), + (std::time::Duration::from_millis(0), "data: [DONE]\n\n"), + ]; + // close_after = false: hold the socket open for 30s AFTER [DONE]. 
+ let url = spawn_scripted_sse(script, false, std::time::Duration::from_secs(30)).await; + let provider = OpenAiCompatibleProvider::new("donetest", &url, None, AuthStyle::None) + .with_stream_idle_timeout(std::time::Duration::from_millis(300)); + let request = watchdog_request(); + let (delta_tx, _delta_rx) = tokio::sync::mpsc::channel(8); + // Must return well under the 30s socket hold (and without a stall error). + let resp = tokio::time::timeout( + std::time::Duration::from_secs(5), + provider.stream_native_chat(None, &request, &delta_tx, 0), + ) + .await + .expect("must complete on [DONE], not wait for the socket to close") + .expect("[DONE] must finalize the response, not trip the watchdog"); + assert_eq!(resp.text.as_deref(), Some("done-content")); +} diff --git a/src/openhuman/inference/provider/compatible_timeout.rs b/src/openhuman/inference/provider/compatible_timeout.rs index d74102ebe9..869a34a987 100644 --- a/src/openhuman/inference/provider/compatible_timeout.rs +++ b/src/openhuman/inference/provider/compatible_timeout.rs @@ -6,8 +6,9 @@ //! resolved from environment variables, with the previous values as defaults so //! behaviour is unchanged unless an operator overrides them: //! -//! - `OPENHUMAN_INFERENCE_TIMEOUT_SECS` — whole-request timeout (default 120) -//! - `OPENHUMAN_INFERENCE_CONNECT_TIMEOUT_SECS` — connection-establishment timeout (default 10) +//! - `OPENHUMAN_INFERENCE_TIMEOUT_SECS` — whole-request timeout (default 120) +//! - `OPENHUMAN_INFERENCE_CONNECT_TIMEOUT_SECS` — connection-establishment timeout (default 10) +//! - `OPENHUMAN_INFERENCE_STREAM_IDLE_TIMEOUT_SECS` — per-chunk stream inactivity timeout (default 90, #4269) //! //! A missing, non-numeric, or out-of-range value falls back to the default //! 
(logged at debug level by [`resolve`]), so a typo can never disable the @@ -29,9 +30,20 @@ const MAX_REQUEST_TIMEOUT_SECS: u64 = 3600; /// Largest accepted connect timeout (5 minutes) — establishing a connection /// should never legitimately take longer. const MAX_CONNECT_TIMEOUT_SECS: u64 = 300; +/// Default per-chunk stream inactivity timeout in seconds (#4269). Sits +/// comfortably above normal inter-token gaps — including reasoning-model +/// thinking pauses, which still stream as `reasoning_content` deltas and so +/// reset the window — yet below the 120s whole-request default, so a stalled +/// RESPONSE phase is caught and retried rather than held to the request ceiling +/// (up to 1 hour). The window RESETS on every received chunk, so a legitimately +/// long answer that keeps emitting tokens is never cut. +const DEFAULT_STREAM_IDLE_TIMEOUT_SECS: u64 = 90; +/// Largest accepted stream-idle timeout (1 hour) — matches the request ceiling. +const MAX_STREAM_IDLE_TIMEOUT_SECS: u64 = 3600; const REQUEST_ENV_VAR: &str = "OPENHUMAN_INFERENCE_TIMEOUT_SECS"; const CONNECT_ENV_VAR: &str = "OPENHUMAN_INFERENCE_CONNECT_TIMEOUT_SECS"; +const STREAM_IDLE_ENV_VAR: &str = "OPENHUMAN_INFERENCE_STREAM_IDLE_TIMEOUT_SECS"; /// Parse a raw env-var value into a bounded timeout in seconds. /// @@ -90,6 +102,24 @@ pub(super) fn connect_timeout() -> Duration { }) } +/// Per-chunk inactivity timeout for streaming inference responses (#4269). +/// Override via `OPENHUMAN_INFERENCE_STREAM_IDLE_TIMEOUT_SECS` (default 90s, +/// range 1..=3600). The streaming read loop and each downstream delta send are +/// guarded by this window; it RESETS on every received chunk, so a legitimately +/// long response that keeps emitting tokens is never cut — only a genuine stall +/// (no bytes from upstream, or a wedged consumer) for the whole window trips it. +/// Resolved once per process and cached — see [`request_timeout`]. 
+pub(super) fn stream_idle_timeout() -> Duration { + static CACHED: OnceLock<Duration> = OnceLock::new(); + *CACHED.get_or_init(|| { + resolve( + STREAM_IDLE_ENV_VAR, + DEFAULT_STREAM_IDLE_TIMEOUT_SECS, + MAX_STREAM_IDLE_TIMEOUT_SECS, + ) + }) +} + #[cfg(test)] mod tests { use super::*; @@ -125,4 +155,30 @@ mod tests { assert_eq!(DEFAULT_REQUEST_TIMEOUT_SECS, 120); assert_eq!(DEFAULT_CONNECT_TIMEOUT_SECS, 10); } + + #[test] + fn stream_idle_default_fires_before_the_whole_request_timeout() { + // #4269: the watchdog must trip before the whole-request deadline so a + // stalled RESPONSE phase is retried, not held to the request ceiling. + assert_eq!(DEFAULT_STREAM_IDLE_TIMEOUT_SECS, 90); + assert!(DEFAULT_STREAM_IDLE_TIMEOUT_SECS < DEFAULT_REQUEST_TIMEOUT_SECS); + } + + #[test] + fn stream_idle_parse_respects_bounds() { + let (def, min, max) = (DEFAULT_STREAM_IDLE_TIMEOUT_SECS, MIN_TIMEOUT_SECS, 3600); + assert_eq!(parse_timeout_secs(None, def, min, max), def); + assert_eq!(parse_timeout_secs(Some("0"), def, min, max), def); // 0 would disable + assert_eq!(parse_timeout_secs(Some("5"), def, min, max), 5); + assert_eq!(parse_timeout_secs(Some("3600"), def, min, max), max); // ceiling + assert_eq!(parse_timeout_secs(Some("3601"), def, min, max), def); // above ceiling + } + + #[test] + fn stream_idle_getter_returns_a_sane_duration() { + // Unset (or set) in the env, the cached getter must resolve to a value + // inside the documented bounds — never 0 (which would disable the guard). + let d = stream_idle_timeout(); + assert!(d.as_secs() >= MIN_TIMEOUT_SECS && d.as_secs() <= MAX_STREAM_IDLE_TIMEOUT_SECS); + } }