Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions .env.example
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,13 @@ OPENHUMAN_TEMPERATURE=0.7
# [optional] Operator override for the inference HTTP connection-establishment
# timeout, in seconds (default 10, range 1-300). Leave unset to use the default.
# OPENHUMAN_INFERENCE_CONNECT_TIMEOUT_SECS=
# [optional] Per-chunk streaming inactivity watchdog, in seconds (default 90,
# range 1-3600). Aborts a stalled SSE response when NO token arrives for this
# long and retries, so an upstream that flushes 200 then goes silent can't hang
# a turn until OPENHUMAN_INFERENCE_TIMEOUT_SECS (which is often raised for long
# research turns). The window resets on every token, so valid long responses are
# never cut. Leave unset to use the default.
# OPENHUMAN_INFERENCE_STREAM_IDLE_TIMEOUT_SECS=
# [optional] Headless update restart contract: self_replace | supervisor
# OPENHUMAN_AUTO_UPDATE_RESTART_STRATEGY=self_replace
# [optional] Allow bearer-authenticated RPC callers to invoke update.apply/update.run
Expand Down
15 changes: 15 additions & 0 deletions src/openhuman/inference/provider/compatible.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,11 @@ pub struct OpenAiCompatibleProvider {
/// Used for profile-aware context window resolution and diagnostics.
pub(crate) local_provider_kind:
Option<crate::openhuman::inference::local::profile::LocalProviderKind>,
/// Per-chunk stream inactivity window for the native streaming path
/// (`stream_native_chat`, #4269). Defaults to
/// [`compatible_timeout::stream_idle_timeout`]; tests inject a small value
/// via [`OpenAiCompatibleProvider::with_stream_idle_timeout`].
stream_idle_timeout: std::time::Duration,
}

/// How the provider expects the API key to be sent.
Expand Down Expand Up @@ -210,6 +215,7 @@ impl OpenAiCompatibleProvider {
vision: true,
ollama_num_ctx: None,
local_provider_kind: None,
stream_idle_timeout: compatible_timeout::stream_idle_timeout(),
}
}

Expand All @@ -222,6 +228,15 @@ impl OpenAiCompatibleProvider {
self
}

/// Test-only: shrink the stream inactivity watchdog window (#4269) so
/// stalled-stream and wedged-consumer behaviour can be exercised without
/// waiting the production default (90s).
#[cfg(test)]
pub(crate) fn with_stream_idle_timeout(mut self, window: std::time::Duration) -> Self {
self.stream_idle_timeout = window;
self
}

/// Toggle whether this provider advertises OpenAI-compatible vision input.
/// Cloud providers default to enabled; local OpenAI-compatible shims use
/// this to stay fail-closed for text-only local models.
Expand Down
125 changes: 100 additions & 25 deletions src/openhuman/inference/provider/compatible_stream_native.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use crate::openhuman::inference::provider::traits::ChatResponse as ProviderChatResponse;
use crate::openhuman::inference::provider::ProviderDelta;

use super::compatible_dump::dump_response_if_enabled;
use super::compatible_repeat::{StreamRepeatDetector, STREAM_REPEAT_THRESHOLD};
Expand Down Expand Up @@ -259,7 +260,37 @@ impl OpenAiCompatibleProvider {
let mut sse_chunks_parsed: usize = 0;
let mut body_bytes_received: usize = 0;

'stream: while let Some(item) = bytes_stream.next().await {
// #4269: bound each SSE read with a per-chunk inactivity watchdog. The
// window RESETS on every received chunk, so a legitimately long response
// that keeps emitting tokens is never cut — only a stream that goes
// silent for the whole window (an upstream that flushed 200 then stalled
// / half-closed the body) trips it. Without this the read parks on
// `next().await` until the blunt whole-request timeout fires — which
// operators are told to raise up to 3600s for long research turns
// (#3856) — presenting as the indefinite RESPONSE-phase hang in #4269.
// The bail classifies as retryable, so `ReliableProvider` replays it.
let idle_window = self.stream_idle_timeout;
'stream: loop {
let item = match tokio::time::timeout(idle_window, bytes_stream.next()).await {
Ok(Some(item)) => item,
Ok(None) => break 'stream,
Err(_elapsed) => {
let idle_secs = idle_window.as_secs();
log::warn!(
"[stream] {} watchdog fired — no SSE data for {}s (elapsed_ms={} sse_chunks={} body_bytes={}); aborting stalled stream for retry",
self.name,
idle_secs,
stream_started_at.elapsed().as_millis(),
sse_chunks_parsed,
body_bytes_received,
);
anyhow::bail!(
"{} streaming watchdog: no response data for {}s — aborting stalled stream for retry",
self.name,
idle_secs,
);
}
};
Comment thread
coderabbitai[bot] marked this conversation as resolved.
let bytes = item?;
body_bytes_received += bytes.len();
buffer.push_str(&String::from_utf8_lossy(&bytes));
Expand Down Expand Up @@ -295,7 +326,13 @@ impl OpenAiCompatibleProvider {
};
let data = data.trim();
if data == "[DONE]" {
continue;
// `[DONE]` is the terminal SSE sentinel — the response is
// complete. Stop reading immediately rather than looping
// back to another watchdog-armed read: a provider that
// sends `[DONE]` but lingers the socket would otherwise
// trip the inactivity watchdog and fail a finished
// response as a retryable stall (#4269 watchdog review).
break 'stream;
}

let chunk: StreamChunkResponse = match serde_json::from_str(data) {
Expand Down Expand Up @@ -325,11 +362,13 @@ impl OpenAiCompatibleProvider {
if let Some(content) = choice.delta.content.as_ref() {
if !content.is_empty() {
text_accum.push_str(content);
let _ = delta_tx
.send(crate::openhuman::inference::provider::ProviderDelta::TextDelta {
self.forward_delta(
delta_tx,
ProviderDelta::TextDelta {
delta: content.clone(),
})
.await;
},
)
.await?;
if repeat_detector.observe(content) {
log::warn!(
"[stream] {} degenerate repetition detected (≥{} identical lines) — aborting generation, truncating (text_chars={})",
Expand All @@ -345,13 +384,13 @@ impl OpenAiCompatibleProvider {
if let Some(reasoning) = choice.delta.reasoning_content.as_ref() {
if !reasoning.is_empty() {
thinking_accum.push_str(reasoning);
let _ = delta_tx
.send(
crate::openhuman::inference::provider::ProviderDelta::ThinkingDelta {
delta: reasoning.clone(),
},
)
.await;
self.forward_delta(
delta_tx,
ProviderDelta::ThinkingDelta {
delta: reasoning.clone(),
},
)
.await?;
}
}
// Tool-call fragments.
Expand Down Expand Up @@ -442,12 +481,14 @@ impl OpenAiCompatibleProvider {
id,
name,
);
let _ = delta_tx
.send(crate::openhuman::inference::provider::ProviderDelta::ToolCallStart {
self.forward_delta(
delta_tx,
ProviderDelta::ToolCallStart {
call_id: id.clone(),
tool_name: name.clone(),
})
.await;
},
)
.await?;
entry.emitted_start = true;
if !entry.arguments.is_empty() {
log::debug!(
Expand All @@ -457,25 +498,29 @@ impl OpenAiCompatibleProvider {
entry.arguments.len(),
);
let buffered = entry.arguments.clone();
let _ = delta_tx
.send(crate::openhuman::inference::provider::ProviderDelta::ToolCallArgsDelta {
self.forward_delta(
delta_tx,
ProviderDelta::ToolCallArgsDelta {
call_id: id.clone(),
delta: buffered,
})
.await;
},
)
.await?;
entry.emitted_chars = entry.arguments.len();
}
}
} else if entry.arguments.len() > entry.emitted_chars {
if let Some(ref id) = entry.id {
let fresh =
entry.arguments[entry.emitted_chars..].to_string();
let _ = delta_tx
.send(crate::openhuman::inference::provider::ProviderDelta::ToolCallArgsDelta {
self.forward_delta(
delta_tx,
ProviderDelta::ToolCallArgsDelta {
call_id: id.clone(),
delta: fresh,
})
.await;
},
)
.await?;
entry.emitted_chars = entry.arguments.len();
}
}
Expand Down Expand Up @@ -624,6 +669,36 @@ impl OpenAiCompatibleProvider {

Self::parse_native_response(api_resp, &self.name)
}

/// Forward a streamed [`ProviderDelta`] to the caller under the same
/// inactivity watchdog as the SSE read (#4269). A dropped receiver is a
/// benign stop (the consumer is gone — `Ok`, and the loop keeps aggregating
/// as before); a consumer that backpressures for the whole idle window is a
/// wedge (e.g. a stalled UI progress bridge), which trips the watchdog so a
/// turn can't hang on a full delta channel. The bail classifies as retryable.
async fn forward_delta(
&self,
delta_tx: &tokio::sync::mpsc::Sender<ProviderDelta>,
delta: ProviderDelta,
) -> anyhow::Result<()> {
match tokio::time::timeout(self.stream_idle_timeout, delta_tx.send(delta)).await {
Ok(Ok(())) => Ok(()),
Ok(Err(_)) => Ok(()), // receiver dropped — consumer gone, not a stall
Err(_elapsed) => {
let idle_secs = self.stream_idle_timeout.as_secs();
log::warn!(
"[stream] {} watchdog fired — delta channel backpressured for {}s; aborting stalled stream for retry",
self.name,
idle_secs,
);
anyhow::bail!(
"{} streaming watchdog: delta channel stalled for {}s — aborting stalled stream for retry",
self.name,
idle_secs,
)
}
}
}
}

/// Extract the `data:` payload of an SSE `event: error` frame, or `None` when
Expand Down
Loading
Loading