Skip to content
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 14 additions & 3 deletions src/openhuman/app_state/ops.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,11 @@ static SNAPSHOT_REQ_COUNTER: AtomicU64 = AtomicU64::new(0);
struct CachedRuntimeSnapshot {
snapshot: RuntimeSnapshot,
fetched_at: Instant,
/// Config identity (`workspace_dir`) the snapshot was built for. The cache
/// holds one entry process-wide, so a snapshot built for one config must
/// never be served to another — otherwise a different user/workspace (or an
/// E2E test with an injected service mock) reads a stale, foreign runtime.
config_key: PathBuf,
}

#[derive(Debug, Clone)]
Expand Down Expand Up @@ -756,9 +761,14 @@ pub fn peek_cached_current_user_identity() -> Option<crate::openhuman::agent::pr
/// Return the cached runtime snapshot when it is still within
/// `RUNTIME_SNAPSHOT_TTL`, else `None`. Kept as a small helper so both the
/// fast-path read and the post-lock double-check share identical freshness logic.
fn fresh_cached_runtime_snapshot(req_id: u64) -> Option<RuntimeSnapshot> {
fn fresh_cached_runtime_snapshot(config: &Config, req_id: u64) -> Option<RuntimeSnapshot> {
let cache = RUNTIME_SNAPSHOT_CACHE.lock();
let entry = cache.as_ref()?;
// A snapshot built for a different config identity is a miss: rebuild against
// this config rather than serve another workspace's runtime.
if entry.config_key != config.workspace_dir {
return None;
}
let age = entry.fetched_at.elapsed();
if age < RUNTIME_SNAPSHOT_TTL {
debug!(
Expand All @@ -774,7 +784,7 @@ fn fresh_cached_runtime_snapshot(req_id: u64) -> Option<RuntimeSnapshot> {
async fn build_runtime_snapshot(config: &Config, req_id: u64) -> RuntimeSnapshot {
// Fast path: a fresh cached snapshot serves every poller without touching the
// sub-op fan-out.
if let Some(snapshot) = fresh_cached_runtime_snapshot(req_id) {
if let Some(snapshot) = fresh_cached_runtime_snapshot(config, req_id) {
return snapshot;
}

Expand All @@ -783,7 +793,7 @@ async fn build_runtime_snapshot(config: &Config, req_id: u64) -> RuntimeSnapshot
// double-check) and return it instead of launching a duplicate build —
// collapsing an N-way stampede into one build per TTL window.
let _rebuild_guard = RUNTIME_SNAPSHOT_REBUILD.lock().await;
if let Some(snapshot) = fresh_cached_runtime_snapshot(req_id) {
if let Some(snapshot) = fresh_cached_runtime_snapshot(config, req_id) {
debug!(
"{LOG_PREFIX} build_runtime_snapshot: coalesced onto concurrent rebuild req_id={req_id}"
);
Expand Down Expand Up @@ -912,6 +922,7 @@ async fn build_runtime_snapshot(config: &Config, req_id: u64) -> RuntimeSnapshot
*RUNTIME_SNAPSHOT_CACHE.lock() = Some(CachedRuntimeSnapshot {
snapshot: snapshot.clone(),
fetched_at: Instant::now(),
config_key: config.workspace_dir.clone(),
});

snapshot
Expand Down
42 changes: 39 additions & 3 deletions src/openhuman/app_state/ops_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,7 @@ fn runtime_snapshot_cache_hit_within_ttl() {
*RUNTIME_SNAPSHOT_CACHE.lock() = Some(CachedRuntimeSnapshot {
snapshot: dummy.clone(),
fetched_at: Instant::now(),
config_key: std::path::PathBuf::new(),
});

let cache = RUNTIME_SNAPSHOT_CACHE.lock();
Expand All @@ -229,6 +230,7 @@ fn runtime_snapshot_cache_miss_after_ttl() {
*RUNTIME_SNAPSHOT_CACHE.lock() = Some(CachedRuntimeSnapshot {
snapshot: build_dummy_runtime_snapshot(),
fetched_at: Instant::now() - (RUNTIME_SNAPSHOT_TTL + Duration::from_millis(100)),
config_key: std::path::PathBuf::new(),
});

let cache = RUNTIME_SNAPSHOT_CACHE.lock();
Expand All @@ -245,12 +247,14 @@ fn fresh_cached_runtime_snapshot_returns_entry_within_ttl() {
let _reset = SnapshotCacheResetGuard;

let dummy = build_dummy_runtime_snapshot();
let cfg = Config::default();
*RUNTIME_SNAPSHOT_CACHE.lock() = Some(CachedRuntimeSnapshot {
snapshot: dummy.clone(),
fetched_at: Instant::now(),
config_key: cfg.workspace_dir.clone(),
});

let served = fresh_cached_runtime_snapshot(1).expect("fresh entry should be served");
let served = fresh_cached_runtime_snapshot(&cfg, 1).expect("fresh entry should be served");
assert_eq!(served.autocomplete.phase, dummy.autocomplete.phase);
}

Expand All @@ -259,16 +263,48 @@ fn fresh_cached_runtime_snapshot_misses_when_stale_or_empty() {
let _cache_lock = APP_STATE_CACHE_TEST_LOCK.lock();
let _reset = SnapshotCacheResetGuard;

let cfg = Config::default();

// Empty cache → miss (forces the single-flight rebuild path).
*RUNTIME_SNAPSHOT_CACHE.lock() = None;
assert!(fresh_cached_runtime_snapshot(2).is_none());
assert!(fresh_cached_runtime_snapshot(&cfg, 2).is_none());

// Stale cache → miss, so the TTL bump can't silently keep serving old data.
*RUNTIME_SNAPSHOT_CACHE.lock() = Some(CachedRuntimeSnapshot {
snapshot: build_dummy_runtime_snapshot(),
fetched_at: Instant::now() - (RUNTIME_SNAPSHOT_TTL + Duration::from_millis(100)),
config_key: cfg.workspace_dir.clone(),
});
assert!(fresh_cached_runtime_snapshot(3).is_none());
assert!(fresh_cached_runtime_snapshot(&cfg, 3).is_none());
}

#[test]
fn fresh_cached_runtime_snapshot_misses_on_config_key_mismatch() {
let _cache_lock = APP_STATE_CACHE_TEST_LOCK.lock();
let _reset = SnapshotCacheResetGuard;

// A fresh entry cached for one workspace must never be served to another
// config — a second user, or an E2E harness with an injected service mock,
// has to rebuild against its own runtime instead of reading a foreign one.
let mut owner = Config::default();
owner.workspace_dir = std::path::PathBuf::from("/tmp/ws-owner");
let mut other = Config::default();
other.workspace_dir = std::path::PathBuf::from("/tmp/ws-other");

*RUNTIME_SNAPSHOT_CACHE.lock() = Some(CachedRuntimeSnapshot {
snapshot: build_dummy_runtime_snapshot(),
fetched_at: Instant::now(),
config_key: owner.workspace_dir.clone(),
});

assert!(
fresh_cached_runtime_snapshot(&owner, 4).is_some(),
"a config reads back its own fresh snapshot"
);
assert!(
fresh_cached_runtime_snapshot(&other, 5).is_none(),
"a foreign config misses instead of serving the wrong runtime"
);
}

#[test]
Expand Down
61 changes: 54 additions & 7 deletions src/openhuman/channels/bus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -245,11 +245,16 @@ impl EventHandler for ChannelInboundSubscriber {
}
}
_ = edit_timer.tick() => {
if streaming_state.thinking_dirty && !streaming_state.thinking_edit_disabled {
flush_thinking_message(channel, &mut streaming_state).await;
}
if streaming_state.dirty && !streaming_state.edit_disabled {
flush_streaming_edit(channel, &mut streaming_state).await;
// Progressive draft/thinking bubbles require edit+delete
// support; skip them on channels that lack it (Discord) so
// they don't leave un-cleanable placeholder messages.
if channel_supports_progressive_ui(channel) {
if streaming_state.thinking_dirty && !streaming_state.thinking_edit_disabled {
flush_thinking_message(channel, &mut streaming_state).await;
}
if streaming_state.dirty && !streaming_state.edit_disabled {
flush_streaming_edit(channel, &mut streaming_state).await;
}
}
}
_ = typing_timer.tick() => {
Expand All @@ -258,7 +263,9 @@ impl EventHandler for ChannelInboundSubscriber {
}
}
_ = filler_timer.tick() => {
if !streaming_state.filler_disabled {
// Fillers ("💭 Still working on it…") are ephemeral and
// deleted on finalize — only post them where cleanup works.
if channel_supports_progressive_ui(channel) && !streaming_state.filler_disabled {
send_filler_message(channel, &mut streaming_state).await;
}
}
Expand Down Expand Up @@ -297,6 +304,28 @@ const MAX_TYPING_FAILURES: u32 = 2;
/// on finalization alongside the ephemeral thinking bubble.
const FILLER_INTERVAL: tokio::time::Duration = tokio::time::Duration::from_secs(13);

/// Whether a channel supports the progressive-UI placeholders — the
/// evolving draft bubble, the rotating "💭" fillers, and the ephemeral
/// "thinking" bubble. All three rely on the backend supporting **both**
/// message *edit* and *delete*: edit keeps a single bubble evolving in
/// place, delete removes it once the final reply lands. Telegram supports
/// both. Discord's adapter supports **neither** (edits 404, delete is a
/// hard `Delete not supported` stub), so every placeholder becomes a
/// permanent, un-editable, un-deletable message — the channel fills with
/// "💭 Still working on it…" bubbles.
///
/// This is an **allowlist**, not a denylist: only channels confirmed to
/// support edit+delete opt in. A new/unknown adapter therefore fails *safe*
/// (placeholders suppressed) rather than silently re-introducing the spam bug
/// this gate was added to fix.
fn channel_supports_progressive_ui(channel: &str) -> bool {
// Inbound channels arrive provider-prefixed from the socket layer
// (e.g. `discord:<guild>`, `tg:<chat>`), so compare the provider prefix,
// not the whole id — mirroring `channel_is_telegram`.
let provider = channel.split(':').next().unwrap_or(channel);
matches!(provider, "telegram" | "tg")
}

/// Maximum consecutive filler-send failures before we stop trying.
/// Same rationale as the thinking/typing latches.
const MAX_FILLER_FAILURES: u32 = 2;
Expand Down Expand Up @@ -1042,7 +1071,25 @@ fn channel_is_telegram(channel: &str) -> bool {

#[cfg(test)]
mod inbound_thread_id_tests {
use super::{derive_inbound_client_id, derive_inbound_thread_id};
use super::{
channel_supports_progressive_ui, derive_inbound_client_id, derive_inbound_thread_id,
};

#[test]
fn progressive_ui_is_an_allowlist_failing_safe_for_unknown_channels() {
// Only edit+delete-capable providers opt in. Telegram supports both;
// everything else (Discord's stub delete / 404 edits, and any new or
// unknown adapter) is suppressed so the "💭" spam can't reappear.
assert!(channel_supports_progressive_ui("telegram"));
assert!(channel_supports_progressive_ui("tg"));
// Inbound channels arrive provider-prefixed — the prefix must still match.
assert!(channel_supports_progressive_ui("tg:12345"));
assert!(!channel_supports_progressive_ui("discord"));
assert!(!channel_supports_progressive_ui("discord:guild-1"));
// Unknown/new adapters fail safe (allowlist, not denylist).
assert!(!channel_supports_progressive_ui("slack"));
assert!(!channel_supports_progressive_ui("whatsapp:123"));
}

#[test]
fn socket_inbound_client_id_keys_per_sender() {
Expand Down
25 changes: 24 additions & 1 deletion src/openhuman/channels/providers/telegram/channel_core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
use super::channel_types::{
TelegramChannel, TelegramUpdateWindow, TELEGRAM_RECENT_UPDATE_CACHE_SIZE,
};
use super::text::TELEGRAM_BIND_COMMAND;
use super::text::{TELEGRAM_BIND_COMMAND, TELEGRAM_START_COMMAND};
use crate::openhuman::config::{Config, StreamMode};
use crate::openhuman::security::pairing::PairingGuard;
use anyhow::Context;
Expand Down Expand Up @@ -123,6 +123,14 @@ impl TelegramChannel {
format!("{}/bot{}/{method}", self.api_base, self.bot_token)
}

/// Point outbound Telegram API calls at `base` (test-only seam). Used to
/// aim `send()` at a dead local port so onboarding tests exercise the
/// decision logic without reaching api.telegram.org.
#[cfg(test)]
pub(crate) fn set_api_base_for_tests(&mut self, base: impl Into<String>) {
self.api_base = base.into();
}

pub(crate) fn pairing_code_active(&self) -> bool {
self.pairing
.as_ref()
Expand All @@ -140,6 +148,21 @@ impl TelegramChannel {
parts.next().map(str::trim).filter(|code| !code.is_empty())
}

/// Whether `text` is the standard Telegram `/start` bot-onboarding command
/// (optionally addressed as `/start@botname`, with or without a payload).
///
/// On the self-bot-token path this is the operator's explicit "I'm setting up
/// my bot" signal: the first `/start` while pairing is still pending pairs the
/// sender (see `handle_unauthorized_message`), matching the "first sender after
/// /start" behaviour sanctioned by openhuman#4381.
pub(crate) fn is_start_command(text: &str) -> bool {
let Some(command) = text.split_whitespace().next() else {
return false;
};
let base_command = command.split('@').next().unwrap_or(command);
base_command == TELEGRAM_START_COMMAND
}

pub(crate) fn track_update_id(&self, update_id: i64) -> bool {
let mut window = self.recent_updates.lock();
if window.recent_lookup.contains(&update_id) {
Expand Down
Loading
Loading