diff --git a/app/src/pages/Conversations.tsx b/app/src/pages/Conversations.tsx index 7794451970..a2eb9627cd 100644 --- a/app/src/pages/Conversations.tsx +++ b/app/src/pages/Conversations.tsx @@ -1697,6 +1697,18 @@ const Conversations = ({ const shouldRenderTimelineBeforeLatestAgentMessage = selectedThreadToolTimeline.length > 0 && !isSending && Boolean(latestVisibleAgentMessage); + // Live agent activity that must stay visible even before the thread's + // message history has loaded: an in-flight turn, recorded tool steps, a + // processing transcript, or streamed prose. Without this, switching to a + // thread mid-turn rendered a blank pane (the message list is gated on + // `hasVisibleMessages`) until `loadThreadMessages` resolved — tool calls and + // streaming output silently invisible despite landing in Redux. + const hasLiveAgentActivity = + isSending || + selectedThreadToolTimeline.length > 0 || + selectedThreadProcessing.length > 0 || + Boolean(selectedStreamingAssistant); + // Anchor the "Agentic task insights" panel right after the latest turn's user // message — processing happens *before* the answer, so it reads above the // result (for both the live streaming preview and the settled agent bubbles). @@ -1815,7 +1827,12 @@ const Conversations = ({ // centered composer; the moment the first message lands, hasVisibleMessages // flips true and this collapses back to the normal conversation layout. const isNewWindow = - !isSidebar && !isLoadingMessages && !messagesError && !hasVisibleMessages && !hasTaskBoard; + !isSidebar && + !isLoadingMessages && + !messagesError && + !hasVisibleMessages && + !hasTaskBoard && + !hasLiveAgentActivity; // Track the floating composer footer's height so the message list can reserve // matching bottom padding. In the page variant the footer is absolutely @@ -2140,7 +2157,7 @@ const Conversations = ({ {t('common.reload')} - ) : hasVisibleMessages || hasTaskBoard ? ( + ) : hasVisibleMessages || hasTaskBoard || hasLiveAgentActivity ? (
entry.subagent); // For a scoped *non*-sub-agent step, the detail (args / output) to show. const scopedDetail = scopedEntry - ? (formatTimelineEntry(scopedEntry).detail ?? scopedEntry.argsBuffer) + ? (normalizeScopedBody(scopedEntry.result) ?? + normalizeScopedBody(formatTimelineEntry(scopedEntry).detail) ?? + normalizeScopedBody(scopedEntry.argsBuffer)) : undefined; return ( diff --git a/app/src/pages/conversations/components/ToolTimelineBlock.tsx b/app/src/pages/conversations/components/ToolTimelineBlock.tsx index 8ed00fa8ad..e655f7f2d1 100644 --- a/app/src/pages/conversations/components/ToolTimelineBlock.tsx +++ b/app/src/pages/conversations/components/ToolTimelineBlock.tsx @@ -447,11 +447,12 @@ export function ToolTimelineBlock({ normalizeToolBody(formatted.detail) ?? normalizeToolBody(entry.argsBuffer); const workerRef = parseWorkerThreadRef(formatted.detail ?? entry.detail); const subagent = entry.subagent; + const resultContent = normalizeToolBody(entry.result); // A subagent row should always render the expandable details so // its live activity is visible — even when there is no prompt // detail to show. Mirrors the rule that a non-subagent row only - // expands when it has detail content. - const expandable = detailContent != null || subagent != null; + // expands when it has detail content (or a result to show). + const expandable = detailContent != null || subagent != null || resultContent != null; const isLatestRunning = latestRunningEntryId != null && latestRunningEntryId === entry.id; const shouldAutoExpand = expandAllRows || isLatestRunning; const nameTone = agentNameTone(entry.status); @@ -470,19 +471,28 @@ export function ToolTimelineBlock({ // opens the full-run panel scoped to this step. A collapsed row // is backgrounded, so it never pulses — only the single active // (expanded) step blinks. Strip `animate-pulse` from the tone. - +
+ + {resultContent ? ( +
+                      {resultContent}
+                    
+ ) : null} +
) : expandable ? (
@@ -512,6 +522,16 @@ export function ToolTimelineBlock({ {detailContent} ) : null} + {resultContent ? ( + // What the tool returned (size-capped upstream). Scrolls + // inside its own box so a long result never floods the + // timeline. +
+                      {resultContent}
+                    
+ ) : null} {subagent ? ( { expect(screen.queryByText('whole-run narration')).toBeNull(); }); + it('prefers scoped tool result output over captured args/detail', () => { + const scoped: ToolTimelineEntry = { + id: 'tool-result-only', + name: 'run_code', + round: 1, + status: 'success', + argsBuffer: '{"command":"pnpm test"}', + result: 'exit 0\nAll checks passed.', + }; + renderPanel( + {}} /> + ); + expect(screen.getByText('Run Code')).toBeInTheDocument(); + expect(screen.getByText(/All checks passed/)).toBeInTheDocument(); + expect(screen.queryByText(/pnpm test/)).toBeNull(); + }); + it('renders no source rows when no web tools were used', () => { renderPanel( { expect(container.querySelector('[data-testid="agent-task-insights"]')).toBeNull(); }); + it('renders the tool result output inside the expanded row', () => { + const entries: ToolTimelineEntry[] = [ + { + id: 'd', + name: 'web_search', + round: 1, + status: 'success', + argsBuffer: '{"query":"f1"}', + result: 'Top result: https://openhuman.dev', + }, + ]; + renderInStore(); + const output = screen.getByTestId('tool-result-output'); + expect(output.textContent).toContain('Top result: https://openhuman.dev'); + }); + + it('makes a row expandable on a result alone and omits the block without one', () => { + const entries: ToolTimelineEntry[] = [ + // No argsBuffer / detail / subagent — the result is the only body. + { id: 'a', name: 'run_code', round: 1, status: 'success', result: 'exit 0' }, + { id: 'b', name: 'run_code', round: 2, status: 'success' }, + ]; + renderInStore(); + const outputs = screen.getAllByTestId('tool-result-output'); + expect(outputs).toHaveLength(1); + expect(outputs[0].textContent).toBe('exit 0'); + }); + it('renders the parent live response inside the panel under a Response heading', () => { const entries: ToolTimelineEntry[] = [ { id: 'r', name: 'web_search', round: 1, status: 'running', argsBuffer: '{"query":"f1"}' }, @@ -464,7 +492,14 @@ describe('ToolTimelineBlock — worker thread ref status propagation', () => { describe('ToolTimelineBlock — compact chat mode (onViewDetails)', () => { const entries: ToolTimelineEntry[] = [ // A finished step. - { id: 'tl-1', name: 'agent_prepare_context', round: 1, status: 'success', detail: 'fetch X' }, + { + id: 'tl-1', + name: 'agent_prepare_context', + round: 1, + status: 'success', + detail: 'fetch X', + result: 'Prepared context from 3 sources.', + }, // The currently-running sub-agent (latest running). { id: 'sa-1', @@ -492,6 +527,9 @@ describe('ToolTimelineBlock — compact chat mode (onViewDetails)', () => { // (its activity is visible) — and shows no "View details" link itself. const activity = screen.getByTestId('subagent-activity'); expect(activity.textContent).toContain('pondering'); + expect(screen.getByTestId('tool-result-output').textContent).toContain( + 'Prepared context from 3 sources.' + ); // Clicking the finished step's link opens the full-run panel. fireEvent.click(links[0]); diff --git a/app/src/providers/ChatRuntimeProvider.tsx b/app/src/providers/ChatRuntimeProvider.tsx index 11485af929..439b8eecaf 100644 --- a/app/src/providers/ChatRuntimeProvider.tsx +++ b/app/src/providers/ChatRuntimeProvider.tsx @@ -644,6 +644,12 @@ const ChatRuntimeProvider = ({ children }: { children: React.ReactNode }) => { const nextEntries = [...existing]; let changed = false; + // The core forwards the (size-capped) tool result text on `output`; + // keep it on the row so the timeline can show what the tool + // returned. Older cores sent a metadata stub here — accept only + // non-empty payloads so a stub-less row stays `undefined`. + const result = event.output && event.output.length > 0 ? event.output : undefined; + if (event.tool_call_id) { const idx = nextEntries.findIndex(entry => entry.id === event.tool_call_id); if (idx >= 0) { @@ -651,6 +657,7 @@ const ChatRuntimeProvider = ({ children }: { children: React.ReactNode }) => { ...nextEntries[idx], status: event.success ? 'success' : 'error', failure, + result, }; changed = true; } @@ -664,7 +671,12 @@ const ChatRuntimeProvider = ({ children }: { children: React.ReactNode }) => { entry.name === event.tool_name && entry.round === event.round ) { - nextEntries[i] = { ...entry, status: event.success ? 'success' : 'error', failure }; + nextEntries[i] = { + ...entry, + status: event.success ? 'success' : 'error', + failure, + result, + }; changed = true; break; } diff --git a/app/src/providers/__tests__/ChatRuntimeProvider.test.tsx b/app/src/providers/__tests__/ChatRuntimeProvider.test.tsx index cf6eb84ff8..a5ec9e92d7 100644 --- a/app/src/providers/__tests__/ChatRuntimeProvider.test.tsx +++ b/app/src/providers/__tests__/ChatRuntimeProvider.test.tsx @@ -163,6 +163,66 @@ describe('ChatRuntimeProvider — dedupe, proactive resolution, mid-turn invaria expect(timeline[0]?.status).toBe('running'); }); + it('attaches the tool_result output to the timeline row as its result', () => { + const listeners = renderProvider(); + + act(() => { + listeners.onToolCall?.({ + thread_id: 't-res', + request_id: 'r1', + round: 0, + tool_name: 'web_search', + skill_id: 'web_channel', + args: {}, + tool_call_id: 'call-res', + }); + listeners.onToolResult?.({ + thread_id: 't-res', + request_id: 'r1', + round: 0, + tool_name: 'web_search', + skill_id: 'web_channel', + output: 'Top hit: openhuman.dev', + success: true, + tool_call_id: 'call-res', + }); + }); + + const row = store.getState().chatRuntime.toolTimelineByThread['t-res']?.[0]; + expect(row?.status).toBe('success'); + expect(row?.result).toBe('Top hit: openhuman.dev'); + }); + + it('leaves result unset when the tool_result carries no output text', () => { + const listeners = renderProvider(); + + act(() => { + listeners.onToolCall?.({ + thread_id: 't-res2', + request_id: 'r1', + round: 0, + tool_name: 'web_search', + skill_id: 'web_channel', + args: {}, + tool_call_id: 'call-res2', + }); + listeners.onToolResult?.({ + thread_id: 't-res2', + request_id: 'r1', + round: 0, + tool_name: 'web_search', + skill_id: 'web_channel', + output: '', + success: true, + tool_call_id: 'call-res2', + }); + }); + + const row = store.getState().chatRuntime.toolTimelineByThread['t-res2']?.[0]; + expect(row?.status).toBe('success'); + expect(row?.result).toBeUndefined(); + }); + it('collapses a spawn_subagent tool-call row into the subagent row', () => { const listeners = renderProvider(); diff --git a/app/src/services/__tests__/chatService.artifactEvents.test.ts b/app/src/services/__tests__/chatService.artifactEvents.test.ts index b14fb9a79b..60b36bbdee 100644 --- a/app/src/services/__tests__/chatService.artifactEvents.test.ts +++ b/app/src/services/__tests__/chatService.artifactEvents.test.ts @@ -4,7 +4,9 @@ import { aiRegenerate, subscribeChatEvents } from '../chatService'; import { callCoreRpc } from '../coreRpcClient'; import { socketService } from '../socketService'; -vi.mock('../socketService', () => ({ socketService: { getSocket: vi.fn() } })); +vi.mock('../socketService', () => ({ + socketService: { getSocket: vi.fn(), on: vi.fn(), off: vi.fn() }, +})); vi.mock('../coreRpcClient', () => ({ callCoreRpc: vi.fn() })); type Handler = (...args: unknown[]) => void; @@ -31,6 +33,12 @@ function createMockSocket() { return { id: 'socket-1', on, off, emit }; } +function bindMockSocket(socket: ReturnType) { + vi.mocked(socketService.getSocket).mockReturnValue(socket as never); + vi.mocked(socketService.on).mockImplementation((event, cb) => socket.on(event, cb as Handler)); + vi.mocked(socketService.off).mockImplementation((event, cb) => socket.off(event, cb as Handler)); +} + describe('chatService — artifact_ready / artifact_failed handlers (#2779)', () => { beforeEach(() => { vi.clearAllMocks(); @@ -38,7 +46,7 @@ describe('chatService — artifact_ready / artifact_failed handlers (#2779)', () it('subscribes to artifact events under canonical snake_case names', () => { const socket = createMockSocket(); - vi.mocked(socketService.getSocket).mockReturnValue(socket as never); + bindMockSocket(socket); subscribeChatEvents({ onArtifactReady: () => {}, onArtifactFailed: () => {} }); @@ -48,7 +56,7 @@ describe('chatService — artifact_ready / artifact_failed handlers (#2779)', () it('flattens the wire envelope into a typed ArtifactReadyEvent', () => { const socket = createMockSocket(); - vi.mocked(socketService.getSocket).mockReturnValue(socket as never); + bindMockSocket(socket); const onArtifactReady = vi.fn(); subscribeChatEvents({ onArtifactReady }); @@ -81,7 +89,7 @@ describe('chatService — artifact_ready / artifact_failed handlers (#2779)', () it('drops an artifact_ready payload missing required fields', () => { const socket = createMockSocket(); - vi.mocked(socketService.getSocket).mockReturnValue(socket as never); + bindMockSocket(socket); const onArtifactReady = vi.fn(); subscribeChatEvents({ onArtifactReady }); @@ -109,7 +117,7 @@ describe('chatService — artifact_ready / artifact_failed handlers (#2779)', () it('rejects artifact_ready with an unknown kind (not in the allowlist)', () => { const socket = createMockSocket(); - vi.mocked(socketService.getSocket).mockReturnValue(socket as never); + bindMockSocket(socket); const onArtifactReady = vi.fn(); subscribeChatEvents({ onArtifactReady }); @@ -131,7 +139,7 @@ describe('chatService — artifact_ready / artifact_failed handlers (#2779)', () it('accepts every allowlisted kind for artifact_ready', () => { const socket = createMockSocket(); - vi.mocked(socketService.getSocket).mockReturnValue(socket as never); + bindMockSocket(socket); const onArtifactReady = vi.fn(); subscribeChatEvents({ onArtifactReady }); @@ -161,7 +169,7 @@ describe('chatService — artifact_ready / artifact_failed handlers (#2779)', () it('flattens artifact_failed into a typed ArtifactFailedEvent', () => { const socket = createMockSocket(); - vi.mocked(socketService.getSocket).mockReturnValue(socket as never); + bindMockSocket(socket); const onArtifactFailed = vi.fn(); subscribeChatEvents({ onArtifactFailed }); @@ -192,7 +200,7 @@ describe('chatService — artifact_ready / artifact_failed handlers (#2779)', () it('drops an artifact_failed payload missing required fields', () => { const socket = createMockSocket(); - vi.mocked(socketService.getSocket).mockReturnValue(socket as never); + bindMockSocket(socket); const onArtifactFailed = vi.fn(); subscribeChatEvents({ onArtifactFailed }); @@ -218,7 +226,7 @@ describe('chatService — artifact_ready / artifact_failed handlers (#2779)', () it('rejects artifact_failed with an unknown kind', () => { const socket = createMockSocket(); - vi.mocked(socketService.getSocket).mockReturnValue(socket as never); + bindMockSocket(socket); const onArtifactFailed = vi.fn(); subscribeChatEvents({ onArtifactFailed }); @@ -234,7 +242,7 @@ describe('chatService — artifact_ready / artifact_failed handlers (#2779)', () it('preserves the full error string on the dispatched event even when huge', () => { // The handler caps only the LOG line, not the dispatched payload. const socket = createMockSocket(); - vi.mocked(socketService.getSocket).mockReturnValue(socket as never); + bindMockSocket(socket); const onArtifactFailed = vi.fn(); subscribeChatEvents({ onArtifactFailed }); @@ -259,7 +267,7 @@ describe('chatService — artifact_ready / artifact_failed handlers (#2779)', () it('removes both artifact handlers on cleanup', () => { const socket = createMockSocket(); - vi.mocked(socketService.getSocket).mockReturnValue(socket as never); + bindMockSocket(socket); const cleanup = subscribeChatEvents({ onArtifactReady: () => {}, onArtifactFailed: () => {} }); cleanup(); @@ -276,7 +284,7 @@ describe('chatService — artifact_pending handler (#3162)', () => { it('subscribes to artifact_pending under its canonical snake_case name', () => { const socket = createMockSocket(); - vi.mocked(socketService.getSocket).mockReturnValue(socket as never); + bindMockSocket(socket); subscribeChatEvents({ onArtifactPending: () => {} }); @@ -286,7 +294,7 @@ describe('chatService — artifact_pending handler (#3162)', () => { it('flattens the wire envelope into a typed ArtifactPendingEvent', () => { const socket = createMockSocket(); - vi.mocked(socketService.getSocket).mockReturnValue(socket as never); + bindMockSocket(socket); const onArtifactPending = vi.fn(); subscribeChatEvents({ onArtifactPending }); @@ -317,7 +325,7 @@ describe('chatService — artifact_pending handler (#3162)', () => { it('drops a pending payload missing required args', () => { const socket = createMockSocket(); - vi.mocked(socketService.getSocket).mockReturnValue(socket as never); + bindMockSocket(socket); const onArtifactPending = vi.fn(); subscribeChatEvents({ onArtifactPending }); diff --git a/app/src/services/__tests__/chatService.subagentDelta.test.ts b/app/src/services/__tests__/chatService.subagentDelta.test.ts index c94aca5c5c..a5add3655b 100644 --- a/app/src/services/__tests__/chatService.subagentDelta.test.ts +++ b/app/src/services/__tests__/chatService.subagentDelta.test.ts @@ -17,7 +17,7 @@ function fakeSocket() { on: vi.fn((event: string, cb: (payload: unknown) => void) => { handlers.set(event, cb); }), - off: vi.fn((event: string) => { + off: vi.fn((event: string, _cb?: (payload: unknown) => void) => { handlers.delete(event); }), emit: (event: string, payload: unknown) => handlers.get(event)?.(payload), @@ -33,6 +33,12 @@ describe('subscribeChatEvents — subagent delta events', () => { vi.spyOn(socketService, 'getSocket').mockReturnValue( socket as unknown as ReturnType ); + vi.spyOn(socketService, 'on').mockImplementation((event, cb) => + socket.on(event, cb as (payload: unknown) => void) + ); + vi.spyOn(socketService, 'off').mockImplementation((event, cb) => + socket.off(event, cb as (payload: unknown) => void) + ); }); afterEach(() => { @@ -78,10 +84,12 @@ describe('subscribeChatEvents — subagent delta events', () => { expect(socket.has('subagent_thinking_delta')).toBe(false); }); - it('returns a no-op unsubscribe when there is no socket', () => { + it('does not require a raw socket when the socketService wrapper handles subscription', () => { vi.spyOn(socketService, 'getSocket').mockReturnValue( null as unknown as ReturnType ); + vi.spyOn(socketService, 'on').mockImplementation(() => {}); + vi.spyOn(socketService, 'off').mockImplementation(() => {}); expect(() => subscribeChatEvents({ onSubagentTextDelta: vi.fn() })()).not.toThrow(); }); }); diff --git a/app/src/services/__tests__/chatService.test.ts b/app/src/services/__tests__/chatService.test.ts index 93c30cc771..4ab2041217 100644 --- a/app/src/services/__tests__/chatService.test.ts +++ b/app/src/services/__tests__/chatService.test.ts @@ -5,7 +5,9 @@ import { socketService } from '../socketService'; const mockCallCoreRpc = vi.fn(); -vi.mock('../socketService', () => ({ socketService: { getSocket: vi.fn() } })); +vi.mock('../socketService', () => ({ + socketService: { getSocket: vi.fn(), on: vi.fn(), off: vi.fn() }, +})); vi.mock('../coreRpcClient', () => ({ callCoreRpc: (...args: unknown[]) => mockCallCoreRpc(...args), })); @@ -35,6 +37,12 @@ function createMockSocket() { return { id: 'socket-1', on, off, emit }; } +function bindMockSocket(socket: ReturnType) { + vi.mocked(socketService.getSocket).mockReturnValue(socket as never); + vi.mocked(socketService.on).mockImplementation((event, cb) => socket.on(event, cb as Handler)); + vi.mocked(socketService.off).mockImplementation((event, cb) => socket.off(event, cb as Handler)); +} + describe('chatService.subscribeChatEvents', () => { beforeEach(() => { vi.clearAllMocks(); @@ -43,7 +51,7 @@ describe('chatService.subscribeChatEvents', () => { it('subscribes to canonical snake_case chat events only', () => { const socket = createMockSocket(); - vi.mocked(socketService.getSocket).mockReturnValue(socket as never); + bindMockSocket(socket); subscribeChatEvents({ onToolCall: () => {}, @@ -70,7 +78,7 @@ describe('chatService.subscribeChatEvents', () => { it('forwards inference_heartbeat through onInferenceHeartbeat (#4270)', () => { const socket = createMockSocket(); - vi.mocked(socketService.getSocket).mockReturnValue(socket as never); + bindMockSocket(socket); const onInferenceHeartbeat = vi.fn(); subscribeChatEvents({ onInferenceHeartbeat }); @@ -86,7 +94,7 @@ describe('chatService.subscribeChatEvents', () => { it('does not process alias events when only canonical subscriptions are active', () => { const socket = createMockSocket(); - vi.mocked(socketService.getSocket).mockReturnValue(socket as never); + bindMockSocket(socket); const onDone = vi.fn(); subscribeChatEvents({ onDone }); @@ -105,7 +113,7 @@ describe('chatService.subscribeChatEvents', () => { // refactor renames a socket event. it('subscribes and forwards live subagent events under canonical names', () => { const socket = createMockSocket(); - vi.mocked(socketService.getSocket).mockReturnValue(socket as never); + bindMockSocket(socket); const onSubagentSpawned = vi.fn(); const onSubagentDone = vi.fn(); @@ -192,7 +200,7 @@ describe('chatService.subscribeChatEvents', () => { it('removes all handlers on cleanup', () => { const socket = createMockSocket(); - vi.mocked(socketService.getSocket).mockReturnValue(socket as never); + bindMockSocket(socket); const cleanup = subscribeChatEvents({ onToolCall: () => {}, onDone: () => {} }); cleanup(); @@ -203,7 +211,7 @@ describe('chatService.subscribeChatEvents', () => { it('subscribes and forwards task board updates', () => { const socket = createMockSocket(); - vi.mocked(socketService.getSocket).mockReturnValue(socket as never); + bindMockSocket(socket); const onTaskBoardUpdated = vi.fn(); subscribeChatEvents({ onTaskBoardUpdated }); @@ -224,7 +232,7 @@ describe('chatService.subscribeChatEvents', () => { it('drops malformed artifact_ready payloads without crashing', () => { const socket = createMockSocket(); - vi.mocked(socketService.getSocket).mockReturnValue(socket as never); + bindMockSocket(socket); const onArtifactReady = vi.fn(); const onArtifactFailed = vi.fn(); @@ -331,7 +339,7 @@ describe('chatService.subscribeChatEvents', () => { // `onArtifactFailed` listener is wired but never fired. it('forwards a well-formed artifact_failed payload through onArtifactFailed', () => { const socket = createMockSocket(); - vi.mocked(socketService.getSocket).mockReturnValue(socket as never); + bindMockSocket(socket); const onArtifactFailed = vi.fn(); subscribeChatEvents({ onArtifactFailed }); @@ -367,7 +375,7 @@ describe('chatService.subscribeChatEvents', () => { // fires for the failed path. it('drops artifact_ready / artifact_failed envelopes with non-string thread_id', () => { const socket = createMockSocket(); - vi.mocked(socketService.getSocket).mockReturnValue(socket as never); + bindMockSocket(socket); const onArtifactReady = vi.fn(); const onArtifactFailed = vi.fn(); @@ -392,7 +400,7 @@ describe('chatService.subscribeChatEvents', () => { it('sends chat payload with consistent optional RPC params', async () => { const socket = createMockSocket(); - vi.mocked(socketService.getSocket).mockReturnValue(socket as never); + bindMockSocket(socket); await chatSend({ threadId: 'thread-1', message: 'hello' }); @@ -410,7 +418,7 @@ describe('chatService.subscribeChatEvents', () => { it('forwards speak_reply, source, session_id when provided', async () => { const socket = createMockSocket(); - vi.mocked(socketService.getSocket).mockReturnValue(socket as never); + bindMockSocket(socket); await chatSend({ threadId: 'thread-1', @@ -435,7 +443,7 @@ describe('chatService.subscribeChatEvents', () => { it('does not include the new fields when omitted', async () => { const socket = createMockSocket(); - vi.mocked(socketService.getSocket).mockReturnValue(socket as never); + bindMockSocket(socket); await chatSend({ threadId: 'thread-1', message: 'hi' }); const params = mockCallCoreRpc.mock.calls[0][0].params; diff --git a/app/src/services/chatService.ts b/app/src/services/chatService.ts index 2cdf1e02ec..f3e43c7a1f 100644 --- a/app/src/services/chatService.ts +++ b/app/src/services/chatService.ts @@ -547,8 +547,16 @@ export interface ChatEventListeners { } export function subscribeChatEvents(listeners: ChatEventListeners): () => void { - const socket = socketService.getSocket(); - if (!socket) return () => {}; + // Register through the socketService wrapper (not the raw socket instance) + // so chat listeners get the same lifecycle guarantees as every other + // subscription: queued while the socket is still connecting (the raw-socket + // path silently no-opped when `getSocket()` was null, dropping the whole + // chat event stream until the next re-subscribe) and re-attached when the + // service flushes pending listeners on (re)connect. + const socket = { + on: (event: string, cb: (...args: unknown[]) => void) => socketService.on(event, cb), + off: (event: string, cb: (...args: unknown[]) => void) => socketService.off(event, cb), + }; const handlers: Array<[string, (...args: unknown[]) => void]> = []; // Canonical convention for web-channel events is snake_case. diff --git a/app/src/store/chatRuntimeSlice.test.ts b/app/src/store/chatRuntimeSlice.test.ts index ab34750881..b41ff78290 100644 --- a/app/src/store/chatRuntimeSlice.test.ts +++ b/app/src/store/chatRuntimeSlice.test.ts @@ -4,15 +4,18 @@ import { describe, expect, it } from 'vitest'; import type { AgentRun, AgentRunStatus, PersistedTurnState } from '../types/turnState'; import chatRuntimeReducer, { appendProcessingProse, + beginInferenceTurn, clearAllChatRuntime, clearQueueStatusForThread, clearRuntimeForThread, hydrateRuntimeFromRunLedger, hydrateRuntimeFromSnapshot, hydrateThreadUsage, + markInferenceTurnStreaming, type QueueStatus, recordChatTurnUsage, resetSessionTokenUsage, + setPendingApprovalForThread, setQueueStatusForThread, setToolTimelineForThread, } from './chatRuntimeSlice'; @@ -400,6 +403,37 @@ describe('chatRuntimeSlice queue status', () => { expect(byId['subagent:sub-completed']).toBe('success'); }); + it('does not duplicate a live subagent row whose taskId is already on screen', () => { + const store = makeStore(); + // Live row created by the socket path — a different entry id scheme than + // the ledger's `subagent:`, but the same delegation taskId. + store.dispatch( + setToolTimelineForThread({ + threadId: 't-dup', + entries: [ + { + id: 't-dup:subagent:run-1:spawn_subagent', + name: 'subagent:tinyplace_agent', + round: 1, + status: 'running', + subagent: { taskId: 'run-1', agentId: 'tinyplace_agent', toolCalls: [] }, + }, + ], + }) + ); + store.dispatch( + hydrateRuntimeFromRunLedger({ + threadId: 't-dup', + runs: [makeRun('run-1', 'running'), makeRun('run-2', 'completed')], + }) + ); + const timeline = store.getState().chatRuntime.toolTimelineByThread['t-dup']; + // run-1 is already represented by the live row; only run-2 is added. + expect(timeline).toHaveLength(2); + expect(timeline.filter(e => e.subagent?.taskId === 'run-1')).toHaveLength(1); + expect(timeline.some(e => e.id === 'subagent:run-2')).toBe(true); + }); + it('settles the parent row but preserves an awaiting_user subagent on interrupt', () => { const store = makeStore(); store.dispatch( @@ -579,3 +613,123 @@ describe('hydrateRuntimeFromSnapshot — sub-agent prose persistence', () => { expect(text.kind === 'text' ? text.text : undefined).toBe('here is the summary'); }); }); + +describe('hydrateRuntimeFromSnapshot — live-driver guard', () => { + function makeStreamingSnapshot(threadId: string): PersistedTurnState { + return { + threadId, + requestId: 'req-stale', + lifecycle: 'streaming', + iteration: 1, + maxIterations: 10, + streamingText: '', + thinking: '', + // Flush-boundary snapshot: one lonely row, behind the live state. + toolTimeline: [{ id: 'c-old', name: 'web_search', round: 1, status: 'running' }], + taskBoard: { + threadId, + cards: [ + { + id: 'card-1', + title: 'Do the thing', + status: 'in_progress', + order: 0, + updatedAt: '2026-06-23T00:00:00Z', + }, + ], + updatedAt: '2026-06-23T00:00:00Z', + }, + startedAt: '2026-06-23T00:00:00Z', + updatedAt: '2026-06-23T00:00:00Z', + }; + } + + it('does not clobber a thread with a live in-flight turn (tab-switch case)', () => { + const store = makeStore(); + // Live driver: the user sent, events are streaming into Redux. + store.dispatch(beginInferenceTurn({ threadId: 't-live' })); + store.dispatch(markInferenceTurnStreaming({ threadId: 't-live' })); + store.dispatch( + setToolTimelineForThread({ + threadId: 't-live', + entries: [ + { id: 'c1', name: 'web_search', round: 1, status: 'success', result: 'found 3 hits' }, + { id: 'c2', name: 'read_file', round: 2, status: 'running' }, + ], + }) + ); + store.dispatch( + setPendingApprovalForThread({ + threadId: 't-live', + approval: { requestId: 'req-live', toolName: 'shell', message: 'Run `ls`?' }, + }) + ); + + // A thread-switch hydration lands with a stale flush-boundary snapshot. + store.dispatch(hydrateRuntimeFromSnapshot({ snapshot: makeStreamingSnapshot('t-live') })); + + const state = store.getState().chatRuntime; + // The richer live timeline (2 rows, with a result) survives untouched… + expect(state.toolTimelineByThread['t-live'].map(e => e.id)).toEqual(['c1', 'c2']); + expect(state.toolTimelineByThread['t-live'][0].result).toBe('found 3 hits'); + // …the pending approval card is not wiped mid-turn… + expect(state.pendingApprovalByThread['t-live']?.requestId).toBe('req-live'); + // …and the lifecycle stays live. + expect(state.inferenceTurnLifecycleByThread['t-live']).toBe('streaming'); + // The task board (monotonic, cheap) is still applied. + expect(state.taskBoardByThread['t-live']?.cards[0]?.id).toBe('card-1'); + }); + + it('applies the snapshot when there is no live driver (cold boot / new window)', () => { + const store = makeStore(); + store.dispatch(hydrateRuntimeFromSnapshot({ snapshot: makeStreamingSnapshot('t-cold') })); + const state = store.getState().chatRuntime; + expect(state.toolTimelineByThread['t-cold'].map(e => e.id)).toEqual(['c-old']); + expect(state.inferenceTurnLifecycleByThread['t-cold']).toBe('streaming'); + }); +}); + +describe('hydrateRuntimeFromSnapshot — persisted tool result output', () => { + it('maps the persisted output onto parent and sub-agent rows as result', () => { + const store = makeStore(); + const snapshot: PersistedTurnState = { + threadId: 't-out', + requestId: 'req-1', + lifecycle: 'completed', + iteration: 2, + maxIterations: 10, + streamingText: '', + thinking: '', + toolTimeline: [ + { + id: 'c1', + name: 'web_search', + round: 1, + status: 'success', + output: 'top hit: openhuman.dev', + }, + { + id: 'subagent:task-z', + name: 'subagent:researcher', + round: 2, + status: 'success', + subagent: { + taskId: 'task-z', + agentId: 'researcher', + toolCalls: [ + { callId: 'cc1', toolName: 'read_file', status: 'success', output: 'file body' }, + ], + }, + }, + ], + startedAt: '2026-06-23T00:00:00Z', + updatedAt: '2026-06-23T00:00:00Z', + }; + + store.dispatch(hydrateRuntimeFromSnapshot({ snapshot })); + + const timeline = store.getState().chatRuntime.toolTimelineByThread['t-out']; + expect(timeline[0].result).toBe('top hit: openhuman.dev'); + expect(timeline[1].subagent?.toolCalls[0]?.result).toBe('file body'); + }); +}); diff --git a/app/src/store/chatRuntimeSlice.ts b/app/src/store/chatRuntimeSlice.ts index de2b31874e..8624409f10 100644 --- a/app/src/store/chatRuntimeSlice.ts +++ b/app/src/store/chatRuntimeSlice.ts @@ -259,6 +259,14 @@ export interface ToolTimelineEntry { * persisted round-trip so a reloaded failed turn keeps its explanation. */ failure?: ToolFailureExplanation; + /** + * The tool's actual (size-capped) result text, set on completion from the + * `tool_result` socket event's `output` and carried through the persisted + * turn-state round-trip. Mirrors {@link SubagentToolCallEntry.result} so the + * timeline can show what a main-agent tool returned. Absent while running + * and on rows from cores that predate output forwarding. + */ + result?: string; } export interface StreamingAssistantState { @@ -686,6 +694,9 @@ function subagentToolCallFromPersisted(call: PersistedSubagentToolCall): Subagen detail: call.detail, // Carry the persisted failure explanation across the round-trip (#4459). failure: parseToolFailure(call.failure), + // Carry the persisted (capped) result text so a rehydrated child row can + // still show what the tool returned. + result: call.output, }; } @@ -787,6 +798,8 @@ function toolTimelineFromPersisted(entry: PersistedToolTimelineEntry): ToolTimel // Carry a persisted failure explanation across the round-trip (#4254). The // shared parser tolerates both camelCase (persisted) and snake_case (wire). failure: parseToolFailure(entry.failure), + // Persisted (capped) tool result text, when the core recorded one. + result: entry.output, }; } @@ -1473,6 +1486,23 @@ const chatRuntimeSlice = createSlice({ const { snapshot } = action.payload; const threadId = snapshot.threadId; + // A live socket driver is feeding this thread right now (the provider is + // mounted globally, so events keep dispatching even while the user is on + // another tab/route). The snapshot was written at the last flush boundary + // and is at best equal to — usually behind — the in-memory state, so + // applying it would wipe streamed prose, tool results, and any pending + // approval card mid-turn. Take only the task board (monotonic, cheap) and + // leave the volatile state to the live event stream. Rehydration is a + // fallback for when there is no live driver (cold boot, new window, + // interrupted turn), not an overwrite of one. + const liveLifecycle = state.inferenceTurnLifecycleByThread[threadId]; + if (liveLifecycle === 'started' || liveLifecycle === 'streaming') { + if (snapshot.taskBoard) { + state.taskBoardByThread[threadId] = snapshot.taskBoard; + } + return; + } + // `completed` is a settled turn, not an in-flight lifecycle — drop any // stale in-flight marker rather than store it (the in-flight enum only // covers started/streaming/interrupted). @@ -1554,9 +1584,16 @@ const chatRuntimeSlice = createSlice({ const { threadId, runs } = action.payload; const existing = state.toolTimelineByThread[threadId] ?? []; const byId = new Map(existing.map(entry => [entry.id, entry])); + // Live rows key their id differently from ledger rows + // (`:subagent::` vs `subagent:`), so also + // dedupe on the sub-agent taskId — otherwise a ledger hydrate during a + // live turn would add a second row for a delegation already on screen. + const liveTaskIds = new Set( + existing.map(entry => entry.subagent?.taskId).filter(Boolean) as string[] + ); for (const run of runs) { const entry = timelineEntryFromRun(run); - if (!entry || byId.has(entry.id)) continue; + if (!entry || byId.has(entry.id) || liveTaskIds.has(run.id)) continue; byId.set(entry.id, entry); } state.toolTimelineByThread[threadId] = Array.from(byId.values()); diff --git a/app/src/types/turnState.ts b/app/src/types/turnState.ts index 0a21da9d33..69c81d34a1 100644 --- a/app/src/types/turnState.ts +++ b/app/src/types/turnState.ts @@ -67,6 +67,9 @@ export interface PersistedSubagentToolCall { * Mirrors the parent {@link PersistedToolTimelineEntry.failure}; absent on * successful rows and on snapshots written before this field. */ failure?: PersistedToolFailure; + /** Size-capped child tool result text. Absent while running and on + * snapshots written before this field. */ + output?: string; } /** @@ -150,6 +153,9 @@ export interface PersistedToolTimelineEntry { sourceToolName?: string; subagent?: PersistedSubagentActivity; failure?: PersistedToolFailure; + /** Size-capped tool result text. Absent while running and on snapshots + * written before this field. */ + output?: string; } export interface PersistedTurnState { diff --git a/docs/README.de.md b/docs/README.de.md index 563cd966c2..ccafae844f 100644 --- a/docs/README.de.md +++ b/docs/README.de.md @@ -156,7 +156,7 @@ Gespeicherte Workflows sind dauerhaft und trigger-gesteuert: sie feuern auf Zeit ## Beitragen aus dem Quellcode -Neu hier? Beginne mit [`CONTRIBUTING.md`](../CONTRIBUTING.md) für den Fork-/PR-Workflow und die lokalen Prüfbefehle, oder nutze den Copy-Paste-Prompt für KI-Coding-Agenten in [`CONTRIBUTING-BEGINNERS.md`](../CONTRIBUTING-BEGINNERS.md#optional-let-an-ai-coding-agent-guide-you). Der kurze Weg: +Neu hier? Beginne mit [`CONTRIBUTING.md`](../CONTRIBUTING.md) für den Fork-/PR-Workflow und die lokalen Prüfbefehle, oder nutze den Copy-Paste-Prompt für KI-Coding-Agenten in [`CONTRIBUTING-BEGINNERS.md`](../CONTRIBUTING-BEGINNERS.md#optional--let-an-ai-coding-agent-guide-you). Der kurze Weg: 1. Installiere Git, Node.js 24+, pnpm 10.10.0, Rust 1.93.0 (`rustfmt` + `clippy`), CMake, Ninja, ripgrep sowie die plattformspezifischen Desktop-Build-Voraussetzungen. 2. Forke und klone das Repo, führe dann `git submodule update --init --recursive` aus, bevor du `pnpm install` startest, damit die mitgelieferten Tauri/CEF-Quellen vorhanden sind. diff --git a/docs/README.ja-JP.md b/docs/README.ja-JP.md index c90ed36864..78ecafb5cf 100644 --- a/docs/README.ja-JP.md +++ b/docs/README.ja-JP.md @@ -156,7 +156,7 @@ n8n と Zapier に強くインスパイアされた[ワークフロー](https:// ## ソースからのコントリビュート -新しいコントリビューターの方は、まず [`CONTRIBUTING.md`](../CONTRIBUTING.md) で fork/PR ワークフローとローカル検証コマンドを確認するか、[`CONTRIBUTING-BEGINNERS.md`](../CONTRIBUTING-BEGINNERS.md#optional-let-an-ai-coding-agent-guide-you) のコピー&ペーストできる AI エージェント向けプロンプトを使ってください。最短経路は以下のとおりです: +新しいコントリビューターの方は、まず [`CONTRIBUTING.md`](../CONTRIBUTING.md) で fork/PR ワークフローとローカル検証コマンドを確認するか、[`CONTRIBUTING-BEGINNERS.md`](../CONTRIBUTING-BEGINNERS.md#optional--let-an-ai-coding-agent-guide-you) のコピー&ペーストできる AI エージェント向けプロンプトを使ってください。最短経路は以下のとおりです: 1. Git、Node.js 24+、pnpm 10.10.0、Rust 1.93.0(`rustfmt` + `clippy`)、CMake、Ninja、ripgrep、プラットフォーム向けデスクトップビルドの前提条件をインストールします。 2. リポジトリを fork してクローンし、`pnpm install` の前に `git submodule update --init --recursive` を実行して、ベンダー化された Tauri/CEF のソースを取得します。 diff --git a/docs/README.ko.md b/docs/README.ko.md index 69b9546faa..f4c5657ab7 100644 --- a/docs/README.ko.md +++ b/docs/README.ko.md @@ -156,7 +156,7 @@ n8n과 Zapier에서 깊은 영감을 받은 [워크플로우](https://tinyhumans ## 소스에서 기여하기 -새로운 기여자인가요? 포크/PR 워크플로우 및 로컬 검증 명령에 대해서는 [`CONTRIBUTING.md`](../CONTRIBUTING.md)에서 시작하거나, [`CONTRIBUTING-BEGINNERS.md`](../CONTRIBUTING-BEGINNERS.md#optional-let-an-ai-coding-agent-guide-you)의 복사-붙여넣기 AI 에이전트 프롬프트를 사용하세요. 빠른 경로는 다음과 같습니다. +새로운 기여자인가요? 포크/PR 워크플로우 및 로컬 검증 명령에 대해서는 [`CONTRIBUTING.md`](../CONTRIBUTING.md)에서 시작하거나, [`CONTRIBUTING-BEGINNERS.md`](../CONTRIBUTING-BEGINNERS.md#optional--let-an-ai-coding-agent-guide-you)의 복사-붙여넣기 AI 에이전트 프롬프트를 사용하세요. 빠른 경로는 다음과 같습니다. 1. Git, Node.js 24+, pnpm 10.10.0, Rust 1.93.0(`rustfmt` + `clippy`), CMake, Ninja, ripgrep 및 플랫폼 데스크톱 빌드 필수 구성 요소를 설치합니다. 2. 저장소를 포크하고 클론한 다음, `pnpm install` 전에 `git submodule update --init --recursive`를 실행하여 벤더링된 Tauri/CEF 소스가 존재하는지 확인합니다. diff --git a/docs/README.ur-pk.md b/docs/README.ur-pk.md index 47beb59e08..be00f30fdd 100644 --- a/docs/README.ur-pk.md +++ b/docs/README.ur-pk.md @@ -202,7 +202,7 @@ n8n اور Zapier سے گہرے متاثر، [ورک فلوز](https://tinyhuman ## سورس سے تعاون -نیا تعاون کنندہ؟ fork/PR ورک فلو اور مقامی تصدیقی کمانڈز کے لیے [`CONTRIBUTING.md`](../CONTRIBUTING.md) سے شروع کریں، یا [`CONTRIBUTING-BEGINNERS.md`](../CONTRIBUTING-BEGINNERS.md#optional-let-an-ai-coding-agent-guide-you) میں موجود کاپی پیسٹ AI-ایجنٹ پرامپٹ استعمال کریں۔ مختصر راستہ: +نیا تعاون کنندہ؟ fork/PR ورک فلو اور مقامی تصدیقی کمانڈز کے لیے [`CONTRIBUTING.md`](../CONTRIBUTING.md) سے شروع کریں، یا [`CONTRIBUTING-BEGINNERS.md`](../CONTRIBUTING-BEGINNERS.md#optional--let-an-ai-coding-agent-guide-you) میں موجود کاپی پیسٹ AI-ایجنٹ پرامپٹ استعمال کریں۔ مختصر راستہ: 1. Git، Node.js 24+، pnpm 10.10.0، Rust 1.93.0 (`rustfmt` + `clippy`)، CMake، Ninja، ripgrep، اور پلیٹ فارم ڈیسک ٹاپ بلڈ کی ضروریات انسٹال کریں۔ 2. ریپو کو fork اور کلون کریں، پھر `pnpm install` سے پہلے `git submodule update --init --recursive` چلائیں تاکہ وینڈرڈ Tauri/CEF سورس موجود ہوں۔ diff --git a/docs/README.zh-CN.md b/docs/README.zh-CN.md index 8d50464922..8ec203f115 100644 --- a/docs/README.zh-CN.md +++ b/docs/README.zh-CN.md @@ -156,7 +156,7 @@ OpenHuman 跳过了等待期。连接你的账户,让[自动拉取](https://ti ## 从源码贡献 -新贡献者?从 [`CONTRIBUTING.md`](../CONTRIBUTING.md) 了解 fork/PR 工作流和本地验证命令,或使用 [`CONTRIBUTING-BEGINNERS.md`](../CONTRIBUTING-BEGINNERS.md#optional-let-an-ai-coding-agent-guide-you) 中可直接复制粘贴的 AI 智能体提示词。快速路径: +新贡献者?从 [`CONTRIBUTING.md`](../CONTRIBUTING.md) 了解 fork/PR 工作流和本地验证命令,或使用 [`CONTRIBUTING-BEGINNERS.md`](../CONTRIBUTING-BEGINNERS.md#optional--let-an-ai-coding-agent-guide-you) 中可直接复制粘贴的 AI 智能体提示词。快速路径: 1. 安装 Git、Node.js 24+、pnpm 10.10.0、Rust 1.93.0(`rustfmt` + `clippy`)、CMake、Ninja、ripgrep,以及各平台桌面构建的前置依赖。 2. Fork 并克隆仓库,然后运行 `git submodule update --init --recursive` 之后再执行 `pnpm install`,确保内置的 Tauri/CEF 源码就位。 diff --git a/src/openhuman/channels/providers/web/progress_bridge.rs b/src/openhuman/channels/providers/web/progress_bridge.rs index 296a472c95..5587116a91 100644 --- a/src/openhuman/channels/providers/web/progress_bridge.rs +++ b/src/openhuman/channels/providers/web/progress_bridge.rs @@ -470,6 +470,7 @@ pub(crate) fn spawn_progress_bridge( tool_name, success, output_chars, + output, elapsed_ms, iteration, failure, @@ -500,10 +501,11 @@ pub(crate) fn spawn_progress_bridge( request_id: request_id.clone(), tool_name: Some(tool_name), skill_id: Some("web_channel".to_string()), - output: Some( - json!({"output_chars": output_chars, "elapsed_ms": elapsed_ms}) - .to_string(), - ), + // Forward the real tool result (size-capped) so the UI + // can render tool output — mirrors the subagent + // `subagent_tool_result` path. Frontends that only + // need size/timing read the ledger telemetry instead. + output: Some(cap_wire_output(output)), success: Some(success), round: Some(iteration), tool_call_id: Some(call_id), @@ -1303,6 +1305,70 @@ mod tests { assert!(capped.len() <= MAX_WIRE_SUBAGENT_OUTPUT); } + /// The `tool_result` wire event must carry the tool's real (capped) output + /// so the UI can render what the tool returned — not the legacy + /// `{"output_chars", "elapsed_ms"}` metadata stub (which broke both the + /// timeline result view and the `propose_workflow` proposal parser). + #[tokio::test] + async fn tool_call_completed_forwards_real_output_on_tool_result() { + let tmp = tempfile::TempDir::new().unwrap(); + let config = crate::openhuman::config::Config { + workspace_dir: tmp.path().join("workspace"), + action_dir: tmp.path().join("workspace"), + config_path: tmp.path().join("config.toml"), + ..Default::default() + }; + let store = TurnStateStore::new(tmp.path().join("turn_states")); + let (tx, rx) = tokio::sync::mpsc::channel(16); + let mut bus = super::super::event_bus::subscribe_web_channel_events(); + spawn_progress_bridge( + rx, + "client-out".into(), + "thread-out".into(), + "req-out".into(), + store, + ChatRequestMetadata::default(), + config, + ); + + tx.send( + crate::openhuman::agent::progress::AgentProgress::ToolCallCompleted { + call_id: "call-1".into(), + tool_name: "web_search".into(), + success: true, + output_chars: 12, + output: "real payload".into(), + arguments: None, + elapsed_ms: 42, + iteration: 1, + failure: None, + }, + ) + .await + .expect("send progress"); + + // The bus is process-global — skip unrelated events from concurrent + // tests and wait (bounded) for our thread's tool_result. + let event = tokio::time::timeout(std::time::Duration::from_secs(5), async { + loop { + match bus.recv().await { + Ok(ev) if ev.thread_id == "thread-out" && ev.event == "tool_result" => { + return ev; + } + Ok(_) => continue, + Err(tokio::sync::broadcast::error::RecvError::Lagged(_)) => continue, + Err(err) => panic!("bus closed: {err}"), + } + } + }) + .await + .expect("tool_result within timeout"); + + assert_eq!(event.output.as_deref(), Some("real payload")); + assert_eq!(event.success, Some(true)); + assert_eq!(event.tool_call_id.as_deref(), Some("call-1")); + } + #[test] fn worktree_detail_collapses_empty_changed_files_to_none() { // Non-isolated / clean worker: empty list → `None` so the renderer diff --git a/src/openhuman/threads/turn_state/mirror.rs b/src/openhuman/threads/turn_state/mirror.rs index b1eb0d7dd0..b36faa121a 100644 --- a/src/openhuman/threads/turn_state/mirror.rs +++ b/src/openhuman/threads/turn_state/mirror.rs @@ -24,6 +24,39 @@ use super::types::{ const MIRROR_LOG_PREFIX: &str = "[threads:turn_state:mirror]"; +/// Upper bound on the tool result text persisted per timeline row. The +/// snapshot file is rewritten in full at every tool boundary, so this is +/// deliberately tighter than the 256 KiB live-socket cap — it bounds the +/// per-flush rewrite while still giving the rehydrated "View processing" +/// panel a meaningful result preview. +const MAX_PERSISTED_TOOL_OUTPUT: usize = 64 * 1024; + +/// Bytes reserved within the cap for the truncation marker so the final +/// persisted payload (content + marker) never exceeds +/// [`MAX_PERSISTED_TOOL_OUTPUT`]. +const TRUNCATION_MARKER_BUDGET: usize = 80; + +/// Cap `output` for snapshot persistence, slicing on a char boundary and +/// appending a truncation marker when content was dropped. Returns `None` +/// for empty output (payload capture off) so the field serializes away. +fn cap_persisted_output(output: &str) -> Option { + if output.is_empty() { + return None; + } + if output.len() <= MAX_PERSISTED_TOOL_OUTPUT { + return Some(output.to_string()); + } + let mut end = MAX_PERSISTED_TOOL_OUTPUT.saturating_sub(TRUNCATION_MARKER_BUDGET); + while end > 0 && !output.is_char_boundary(end) { + end -= 1; + } + let omitted = output.len() - end; + Some(format!( + "{}\n…[truncated {omitted} bytes of tool output]", + &output[..end] + )) +} + /// In-process cursor that keeps the authoritative [`TurnState`] in sync /// with the agent loop and writes it through to a [`TurnStateStore`]. pub struct TurnStateMirror { @@ -131,6 +164,7 @@ impl TurnStateMirror { source_tool_name: None, subagent: None, failure: None, + output: None, }); } self.flush(); @@ -140,6 +174,7 @@ impl TurnStateMirror { call_id, success, failure, + output, .. } => { if let Some(entry) = self @@ -158,6 +193,10 @@ impl TurnStateMirror { // survives a thread switch / cold boot (#4459). Clear it on // a (re-)success so a retried row doesn't keep stale copy. entry.failure = failure.as_ref().map(PersistedToolFailure::from); + // Persist the (capped) result text so the rehydrated + // timeline can show what the tool returned, matching the + // live `tool_result` socket payload. + entry.output = cap_persisted_output(output); } if self.state.active_tool.is_some() { self.state.active_tool = None; @@ -202,6 +241,7 @@ impl TurnStateMirror { transcript: Vec::new(), }), failure: None, + output: None, }); self.flush(); true @@ -279,6 +319,7 @@ impl TurnStateMirror { display_name: display_label.clone(), detail: display_detail.clone(), failure: None, + output: None, }); // Mirror the call into the ordered transcript so the // rehydrated thoughts interleave it at the right spot. @@ -304,6 +345,7 @@ impl TurnStateMirror { task_id, call_id, success, + output, output_chars, elapsed_ms, failure, @@ -329,6 +371,7 @@ impl TurnStateMirror { // Carry the child failure so a failed sub-agent row // keeps its explanation across a round-trip (#4459). call.failure = persisted_failure; + call.output = cap_persisted_output(output); } // Keep the transcript's Tool item in lockstep so the // rehydrated row shows the terminal status + timing. @@ -415,6 +458,7 @@ impl TurnStateMirror { source_tool_name: None, subagent: None, failure: None, + output: None, }); } false diff --git a/src/openhuman/threads/turn_state/mirror_tests.rs b/src/openhuman/threads/turn_state/mirror_tests.rs index 04f72814ea..2ca9ab0c97 100644 --- a/src/openhuman/threads/turn_state/mirror_tests.rs +++ b/src/openhuman/threads/turn_state/mirror_tests.rs @@ -137,6 +137,61 @@ fn tool_call_start_and_complete_track_timeline() { let s = m.snapshot(); assert_eq!(s.tool_timeline[0].status, ToolTimelineStatus::Success); assert!(s.active_tool.is_none()); + // Empty output (payload capture off) serializes away — no `Some("")`. + assert!(s.tool_timeline[0].output.is_none()); +} + +#[test] +fn tool_call_completed_persists_capped_output() { + let (_d, mut m) = fresh("t"); + m.observe(&AgentProgress::ToolCallStarted { + call_id: "tc-1".into(), + tool_name: "shell".into(), + arguments: serde_json::json!({}), + iteration: 1, + display_label: None, + display_detail: None, + }); + m.observe(&AgentProgress::ToolCallCompleted { + call_id: "tc-1".into(), + tool_name: "shell".into(), + success: true, + output_chars: 11, + output: "hello world".into(), + arguments: None, + elapsed_ms: 50, + iteration: 1, + failure: None, + }); + let s = m.snapshot(); + assert_eq!(s.tool_timeline[0].output.as_deref(), Some("hello world")); + + // Oversized output is truncated on a char boundary with a marker so the + // per-flush snapshot rewrite stays bounded. + m.observe(&AgentProgress::ToolCallStarted { + call_id: "tc-2".into(), + tool_name: "shell".into(), + arguments: serde_json::json!({}), + iteration: 2, + display_label: None, + display_detail: None, + }); + let big = "é".repeat(80 * 1024); // 2 bytes per char > 64 KiB cap + m.observe(&AgentProgress::ToolCallCompleted { + call_id: "tc-2".into(), + tool_name: "shell".into(), + success: true, + output_chars: big.chars().count(), + output: big, + arguments: None, + elapsed_ms: 50, + iteration: 2, + failure: None, + }); + let s = m.snapshot(); + let persisted = s.tool_timeline[1].output.as_deref().unwrap(); + assert!(persisted.len() <= 64 * 1024); + assert!(persisted.contains("truncated")); } #[test] @@ -415,7 +470,7 @@ fn subagent_transcript_persists_interleaved_prose_and_tools() { tool_name: "search".into(), success: true, output_chars: 5, - output: String::new(), + output: "3 hits".into(), arguments: None, elapsed_ms: 12, iteration: 1, @@ -444,6 +499,9 @@ fn subagent_transcript_persists_interleaved_prose_and_tools() { } other => panic!("expected tool, got {other:?}"), } + // The child tool's (capped) result text is persisted on the call so a + // rehydrated drawer can show what the tool returned. + assert_eq!(activity.tool_calls[0].output.as_deref(), Some("3 hits")); match &activity.transcript[2] { SubagentTranscriptItem::Text { text, .. } => assert_eq!(text, "Found it."), other => panic!("expected narration, got {other:?}"), diff --git a/src/openhuman/threads/turn_state/store_tests.rs b/src/openhuman/threads/turn_state/store_tests.rs index c4e8475fa7..b726cc5fb9 100644 --- a/src/openhuman/threads/turn_state/store_tests.rs +++ b/src/openhuman/threads/turn_state/store_tests.rs @@ -29,6 +29,7 @@ fn put_then_get_roundtrips_state() { source_tool_name: None, subagent: None, failure: None, + output: None, }); store.put(&state).expect("put"); diff --git a/src/openhuman/threads/turn_state/types.rs b/src/openhuman/threads/turn_state/types.rs index 9b2367a34b..27834cd5ce 100644 --- a/src/openhuman/threads/turn_state/types.rs +++ b/src/openhuman/threads/turn_state/types.rs @@ -119,6 +119,12 @@ pub struct ToolTimelineEntry { /// success and on legacy snapshots. #[serde(default, skip_serializing_if = "Option::is_none")] pub failure: Option, + /// Size-capped tool result text, persisted so the "View processing" + /// panel can show what a tool returned after a thread switch / cold + /// boot — the live socket forwards the same capped payload on + /// `tool_result`. `None` while running and on legacy snapshots. + #[serde(default, skip_serializing_if = "Option::is_none")] + pub output: Option, } /// Live sub-agent activity nested under a `subagent:*` timeline row. @@ -188,6 +194,11 @@ pub struct SubagentToolCall { /// success and on legacy snapshots. #[serde(default, skip_serializing_if = "Option::is_none")] pub failure: Option, + /// Size-capped child tool result text, persisted for the same reason as + /// [`ToolTimelineEntry::output`]. `None` while running and on legacy + /// snapshots. + #[serde(default, skip_serializing_if = "Option::is_none")] + pub output: Option, } /// One ordered item in a sub-agent's processing transcript — its streamed diff --git a/src/openhuman/tinyagents/observability.rs b/src/openhuman/tinyagents/observability.rs index 8ea9c9168c..955b93cf18 100644 --- a/src/openhuman/tinyagents/observability.rs +++ b/src/openhuman/tinyagents/observability.rs @@ -1042,7 +1042,18 @@ impl EventListener for OpenhumanEventBridge { "[fallback] SDK failed over to a cross-route fallback model" ); } - _ => {} + other => { + // Not projected into `AgentProgress` (run lifecycle, sub-agent + // boundaries — reconstructed from the orchestration tools' + // manual emits — middleware, workspace, memory, limits). Trace + // the kind so a dropped-event hypothesis is checkable from + // logs instead of reading this match. + tracing::trace!( + model = %self.model, + kind = ?std::mem::discriminant(other), + "[tinyagents:bridge] event observed but not forwarded to UI progress" + ); + } } } } @@ -1094,7 +1105,6 @@ mod tests { sink.emit(AgentEvent::ToolCompleted { call_id: "c1".into(), tool_name: "echo".to_string(), - started_at_ms: None, input: None, output: None, }); diff --git a/tests/memory_raw_coverage_e2e.rs b/tests/memory_raw_coverage_e2e.rs index bfbda1bc1a..5ca4c65b1d 100644 --- a/tests/memory_raw_coverage_e2e.rs +++ b/tests/memory_raw_coverage_e2e.rs @@ -613,11 +613,13 @@ fn threads_turn_state_store_skips_corrupt_entries_and_marks_interrupted() { elapsed_ms: Some(20), output_chars: Some(64), display_name: None, + output: None, detail: None, failure: None, }], transcript: vec![], }), + output: None, }); let second = TurnState::started("thread-b", "req-b", 2, "2026-05-29T12:01:00Z"); store.put(&first).expect("put first"); diff --git a/tests/memory_threads_raw_coverage_e2e.rs b/tests/memory_threads_raw_coverage_e2e.rs index 1fccf4c14b..6b65ddfcd2 100644 --- a/tests/memory_threads_raw_coverage_e2e.rs +++ b/tests/memory_threads_raw_coverage_e2e.rs @@ -1156,6 +1156,7 @@ fn thread_title_error_and_turn_state_helpers_cover_wire_shapes() { detail: Some("2 results".into()), source_tool_name: Some("memory.search".into()), subagent: None, + output: None, }); let wire = serde_json::to_value(GetTurnStateResponse { turn_state: Some(state.clone()), @@ -3482,11 +3483,13 @@ fn turn_state_store_persists_lists_marks_and_clears_snapshots() { elapsed_ms: Some(100), output_chars: Some(10), display_name: None, + output: None, detail: None, failure: None, }], transcript: vec![], }), + output: None, }); let second = TurnState::started("thread/b", "request-2", 2, "2026-05-29T12:01:00Z");