Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 49 additions & 7 deletions src/openhuman/channels/bus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -245,11 +245,16 @@ impl EventHandler for ChannelInboundSubscriber {
}
}
_ = edit_timer.tick() => {
if streaming_state.thinking_dirty && !streaming_state.thinking_edit_disabled {
flush_thinking_message(channel, &mut streaming_state).await;
}
if streaming_state.dirty && !streaming_state.edit_disabled {
flush_streaming_edit(channel, &mut streaming_state).await;
// Progressive draft/thinking bubbles require edit+delete
// support; skip them on channels that lack it (Discord) so
// they don't leave un-cleanable placeholder messages.
if channel_supports_progressive_ui(channel) {
if streaming_state.thinking_dirty && !streaming_state.thinking_edit_disabled {
flush_thinking_message(channel, &mut streaming_state).await;
}
if streaming_state.dirty && !streaming_state.edit_disabled {
flush_streaming_edit(channel, &mut streaming_state).await;
}
}
}
_ = typing_timer.tick() => {
Expand All @@ -258,7 +263,9 @@ impl EventHandler for ChannelInboundSubscriber {
}
}
_ = filler_timer.tick() => {
if !streaming_state.filler_disabled {
// Fillers ("💭 Still working on it…") are ephemeral and
// deleted on finalize — only post them where cleanup works.
if channel_supports_progressive_ui(channel) && !streaming_state.filler_disabled {
send_filler_message(channel, &mut streaming_state).await;
}
}
Expand Down Expand Up @@ -297,6 +304,25 @@ const MAX_TYPING_FAILURES: u32 = 2;
/// on finalization alongside the ephemeral thinking bubble.
const FILLER_INTERVAL: tokio::time::Duration = tokio::time::Duration::from_secs(13);

/// Whether a channel supports the progressive-UI placeholders — the
/// evolving draft bubble, the rotating "💭" fillers, and the ephemeral
/// "thinking" bubble. All three rely on the backend supporting **both**
/// message *edit* and *delete*: edit keeps a single bubble evolving in
/// place, delete removes it once the final reply lands. Telegram supports
/// both. Discord's adapter supports **neither** (edits 404, delete is a
/// hard `Delete not supported` stub), so every placeholder becomes a
/// permanent, un-editable, un-deletable message — the channel fills with
/// "💭 Still working on it…" bubbles. For such channels we suppress the
/// placeholders entirely and rely on the typing indicator plus the final
/// atomic reply, so the user sees a clean single answer.
fn channel_supports_progressive_ui(channel: &str) -> bool {
// Inbound channels arrive provider-prefixed from the socket layer
// (e.g. `discord:<guild>`, `tg:<chat>`), so compare the provider prefix,
// not the whole id — mirroring `channel_is_telegram`.
let provider = channel.split(':').next().unwrap_or(channel);
provider != "discord"
}

/// Maximum consecutive filler-send failures before we stop trying.
/// Same rationale as the thinking/typing latches.
const MAX_FILLER_FAILURES: u32 = 2;
Expand Down Expand Up @@ -1042,7 +1068,23 @@ fn channel_is_telegram(channel: &str) -> bool {

#[cfg(test)]
mod inbound_thread_id_tests {
use super::{derive_inbound_client_id, derive_inbound_thread_id};
use super::{
channel_supports_progressive_ui, derive_inbound_client_id, derive_inbound_thread_id,
};

#[test]
fn discord_suppresses_progressive_ui_other_channels_keep_it() {
// Discord's adapter supports neither edit nor delete, so evolving
// drafts / "💭" fillers / thinking bubbles would become permanent
// un-cleanable spam — suppress them there. Channels with edit+delete
// (Telegram) keep the progressive UI.
assert!(!channel_supports_progressive_ui("discord"));
// Inbound channels arrive provider-prefixed — the prefix must still match.
assert!(!channel_supports_progressive_ui("discord:guild-1"));
assert!(channel_supports_progressive_ui("telegram"));
assert!(channel_supports_progressive_ui("tg"));
assert!(channel_supports_progressive_ui("tg:12345"));
}

#[test]
fn socket_inbound_client_id_keys_per_sender() {
Expand Down
17 changes: 16 additions & 1 deletion src/openhuman/channels/providers/telegram/channel_core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
use super::channel_types::{
TelegramChannel, TelegramUpdateWindow, TELEGRAM_RECENT_UPDATE_CACHE_SIZE,
};
use super::text::TELEGRAM_BIND_COMMAND;
use super::text::{TELEGRAM_BIND_COMMAND, TELEGRAM_START_COMMAND};
use crate::openhuman::config::{Config, StreamMode};
use crate::openhuman::security::pairing::PairingGuard;
use anyhow::Context;
Expand Down Expand Up @@ -140,6 +140,21 @@ impl TelegramChannel {
parts.next().map(str::trim).filter(|code| !code.is_empty())
}

/// Whether `text` is the standard Telegram `/start` bot-onboarding command
/// (optionally addressed as `/start@botname`, with or without a payload).
///
/// On the self-bot-token path this is the operator's explicit "I'm setting up
/// my bot" signal: the first `/start` while pairing is still pending pairs the
/// sender (see `handle_unauthorized_message`), matching the "first sender after
/// /start" behaviour sanctioned by openhuman#4381.
pub(crate) fn is_start_command(text: &str) -> bool {
let Some(command) = text.split_whitespace().next() else {
return false;
};
let base_command = command.split('@').next().unwrap_or(command);
base_command == TELEGRAM_START_COMMAND
}

pub(crate) fn track_update_id(&self, update_id: i64) -> bool {
let mut window = self.recent_updates.lock();
if window.recent_lookup.contains(&update_id) {
Expand Down
196 changes: 140 additions & 56 deletions src/openhuman/channels/providers/telegram/channel_recv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -204,12 +204,16 @@ impl TelegramChannel {
/// Legitimate first-run pairing (`allowed_users=[]` at construction) always sets
/// `pairing = Some(...)` so it is never suppressed here.
pub(crate) fn is_race_condition_instance(&self) -> bool {
let runtime_empty = self
.allowed_users
self.allowlist_is_empty() && self.pairing.is_none()
}

/// Whether the runtime allowlist currently has no entries. A poisoned lock is
/// treated as non-empty (fail-closed) so we never widen access on a lock error.
pub(crate) fn allowlist_is_empty(&self) -> bool {
self.allowed_users
.read()
.map(|users| users.is_empty())
.unwrap_or(false);
runtime_empty && self.pairing.is_none()
.unwrap_or(false)
}

/// Build the de-bounce key for approval prompts: `"{chat_id}:{sender}"`.
Expand Down Expand Up @@ -308,55 +312,78 @@ impl TelegramChannel {
return;
}

// ── First-run onboarding: `/start` pairs the operator ────────────────────
// On the self-bot-token path a blank allowlist arms `pairing = Some(..)` (a
// fresh bot is world-reachable by @username, so we must not allow-all like
// Discord). The one-time bind code, however, is only printed to core stdout
// and is invisible to a desktop operator — leaving the gate un-openable and
// every message stuck on the approval prompt (openhuman#4381).
//
// The operator's first `/start` is their explicit "I'm setting up my bot"
// signal. While pairing is still pending we treat that sender as the owner,
// add them to the allowlist, and let their subsequent messages reach the
// agent — matching the "first sender after /start" behaviour the issue
// sanctions. The guard is tight: `pairing.is_some()` excludes an
// explicitly-configured allowlist, and `allowlist_is_empty()` restricts
// onboarding to the genuine first sender — once the operator is bound the
// list is non-empty, so a later stranger's `/start` falls through to the
// normal approval prompt instead of being auto-approved.
if self.pairing.is_some()
&& self.allowlist_is_empty()
&& text.map(Self::is_start_command).unwrap_or(false)
{
match Self::bindable_identity(&normalized_username, normalized_sender_id.as_deref()) {
Some(identity) => {
tracing::info!(
chat_id,
identity,
"[telegram][approval] /start onboarding: pairing first sender as operator"
);
self.approve_and_persist_sender(&identity, &chat_id).await;
Comment thread
YellowSnnowmann marked this conversation as resolved.
// Finish the one-time pairing flow: the operator is bound
// via /start rather than /bind <code>, so consume the code
// here too — otherwise the stdout code stays live and a
// later sender who obtains it could still /bind themselves.
if let Some(pairing) = self.pairing.as_ref() {
pairing.invalidate_code();
}
}
None => {
let _ = self
.send(&SendMessage::new(
"❌ Could not identify your Telegram account from /start. Ensure your account has a username or stable user ID, then try again.",
&chat_id,
))
.await;
}
}
return;
}

if let Some(code) = text.and_then(Self::extract_bind_code) {
if let Some(pairing) = self.pairing.as_ref() {
match pairing.try_pair(code).await {
Ok(Some(_token)) => {
let bind_identity = normalized_sender_id.clone().or_else(|| {
if normalized_username.is_empty() || normalized_username == "unknown" {
None
} else {
Some(normalized_username.clone())
match Self::bindable_identity(
&normalized_username,
normalized_sender_id.as_deref(),
) {
Some(identity) => {
tracing::info!(
chat_id,
identity,
"[telegram][approval] paired via bind code and allowlisted identity"
);
self.approve_and_persist_sender(&identity, &chat_id).await;
}
});

if let Some(identity) = bind_identity {
self.add_allowed_identity_runtime(&identity);
match self.persist_allowed_identity(&identity).await {
Ok(()) => {
let _ = self
.send(&SendMessage::new(
"✅ Telegram account bound successfully. You can talk to OpenHuman now.",
&chat_id,
))
.await;
tracing::info!(
chat_id,
identity,
"[telegram][approval] paired and allowlisted identity"
);
}
Err(e) => {
tracing::error!(
chat_id,
error = %e,
"[telegram][approval] failed to persist allowlist after bind"
);
let _ = self
.send(&SendMessage::new(
"⚠️ Bound for this runtime, but failed to persist config. Access may be lost after restart; check config file permissions.",
&chat_id,
))
.await;
}
None => {
let _ = self
.send(&SendMessage::new(
"❌ Could not identify your Telegram account. Ensure your account has a username or stable user ID, then retry.",
&chat_id,
))
.await;
}
} else {
let _ = self
.send(&SendMessage::new(
"❌ Could not identify your Telegram account. Ensure your account has a username or stable user ID, then retry.",
&chat_id,
))
.await;
}
}
Ok(None) => {
Expand Down Expand Up @@ -411,28 +438,85 @@ impl TelegramChannel {
Allowlist Telegram username (without '@') or numeric user ID."
);

let _ = self
.send(&SendMessage::new(
"🔐 This bot requires operator approval.\n\nAsk the operator to approve the pairing in the web UI, then send your message again.".to_string(),
&chat_id,
))
.await;

// Copy depends on whether first-run pairing is armed. In pairing mode the
// operator unlocks the bot by sending `/start` (or `/bind <code>` if they
// have the code from the app); there is no "approve in the web UI" action for
// the self-bot-token path, so we must not point the user at one (openhuman#4381).
if self.pairing_code_active() {
tracing::debug!(
chat_id,
sender = sender_key,
"[telegram][approval] pairing code active — sending /bind hint"
"[telegram][approval] pairing pending — sending /start onboarding prompt"
);
let _ = self
.send(&SendMessage::new(
"ℹ️ If operator provides a one-time pairing code, you can also run `/bind <code>`.",
"🔐 This bot isn't set up yet.\n\nIf you're the operator, send /start to finish connecting your bot. \
Otherwise ask the operator to add your Telegram username (without '@') or numeric user ID to the bot's Allowed Users, then message again.\n\n\
If the operator gave you a one-time pairing code, run `/bind <code>`.".to_string(),
&chat_id,
))
.await;
} else {
let _ = self
.send(&SendMessage::new(
"🔐 This bot requires operator approval.\n\nAsk the operator to add your Telegram username (without '@') or numeric user ID to the bot's Allowed Users, then send your message again.".to_string(),
&chat_id,
))
.await;
}
}

/// Resolve a stable identity to allowlist for a sender: prefer the numeric user
/// ID (immutable), fall back to a real username. Returns `None` when the sender
/// has neither (`normalized_username` empty or the `"unknown"` sentinel and no id).
pub(crate) fn bindable_identity(
normalized_username: &str,
normalized_sender_id: Option<&str>,
) -> Option<String> {
if let Some(id) = normalized_sender_id.filter(|id| !id.is_empty()) {
return Some(id.to_string());
}
if normalized_username.is_empty() || normalized_username == "unknown" {
return None;
}
Some(normalized_username.to_string())
}

/// Add `identity` to the allowlist (runtime + persisted config) and acknowledge
/// to the chat. Shared by the `/start` onboarding and `/bind <code>` paths so
/// both stay in lock-step on persistence and messaging.
pub(crate) async fn approve_and_persist_sender(&self, identity: &str, chat_id: &str) {
self.add_allowed_identity_runtime(identity);
match self.persist_allowed_identity(identity).await {
Ok(()) => {
let _ = self
.send(&SendMessage::new(
"✅ You're all set — OpenHuman is connected. Send me a message and I'll take it from here.",
chat_id,
))
.await;
tracing::info!(
chat_id,
identity,
"[telegram][approval] allowlisted identity (runtime + persisted)"
);
}
Err(e) => {
tracing::error!(
chat_id,
error = %e,
"[telegram][approval] failed to persist allowlist after approval"
);
let _ = self
.send(&SendMessage::new(
"⚠️ Connected for now, but I couldn't save it — access may be lost after a restart. Check the config file permissions.",
chat_id,
))
.await;
}
}
}

pub(crate) fn is_supported_unauthorized_message(message: &serde_json::Value) -> bool {
message
.get("text")
Expand Down
Loading
Loading