Compare commits

...

1 Commits

Author SHA1 Message Date
Naiyuan Qing
e1f1eeaf7f fix(transcript): live-update issue/agent transcript dialog from shared cache
The transcript dialog opened from a running task's row showed only a
one-shot snapshot taken at open time: TranscriptButton fetched once via
api.listTaskMessages and cached it locally, never subscribing to the
shared ["task-messages", taskId] cache that the WS task:message stream
already seeds. New tool calls / thinking / text never appeared until the
task finished or the page reloaded.

Add a live-cache mode to the shared TranscriptButton: when isLive and the
parent provides no items and the task id is a persisted UUID, render from
the shared task-messages cache so the open dialog grows in real time. On
open (and again on the running→terminal transition) force a backfill via
api.listTaskMessages and merge it into the cache by seq — taskMessagesOptions
is staleTime:Infinity, so a plain subscription never heals a WS reconnect
gap. The cache observer is read-only (enabled:false) so React Query never
blind-replaces the cache; only the WS handler and the seq-merged backfill
write it. The subscription mounts only while the dialog is open, so closed
live rows add no baseline requests; terminal tasks keep the lazy one-shot
fetch.

Covers issue execution-log and agent activity. Autopilot issue-less
run_only live log is out of scope: the backend doesn't broadcast
task:message for tasks with no issue/chat session, so there's nothing to
subscribe to — backend broadcast unchanged.

Extract mergeTaskMessagesBySeq into packages/core/chat/queries.ts and route
both the realtime task:message handler and the new backfill through it, so
there is one seq-merge semantics for that cache instead of two.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
Co-authored-by: multica-agent <github@multica.ai>
2026-06-23 15:14:31 +08:00
5 changed files with 395 additions and 40 deletions

View File

@@ -1,6 +1,19 @@
import { describe, expect, it } from "vitest";
import { isTaskMessageTaskId, taskMessagesOptions } from "./queries";
import type { TaskMessagePayload } from "../types/events";
import {
isTaskMessageTaskId,
mergeTaskMessagesBySeq,
taskMessagesOptions,
} from "./queries";
const msg = (seq: number): TaskMessagePayload => ({
task_id: "task-1",
issue_id: "issue-1",
seq,
type: "text",
content: `m${seq}`,
});
describe("taskMessagesOptions", () => {
it("fetches task messages for persisted UUID task ids", () => {
@@ -17,3 +30,32 @@ describe("taskMessagesOptions", () => {
expect(taskMessagesOptions(taskId).enabled).toBe(false);
});
});
describe("mergeTaskMessagesBySeq", () => {
it("backfills missing seqs and keeps the list seq-ordered", () => {
const existing = [msg(1), msg(3)];
const merged = mergeTaskMessagesBySeq(existing, [msg(2), msg(4)]);
expect(merged.map((m) => m.seq)).toEqual([1, 2, 3, 4]);
});
it("drops duplicate seqs and lets the existing entry win", () => {
const existing = [{ ...msg(1), content: "ws" }];
const merged = mergeTaskMessagesBySeq(existing, [
{ ...msg(1), content: "refetch" },
msg(2),
]);
expect(merged.map((m) => m.seq)).toEqual([1, 2]);
expect(merged.find((m) => m.seq === 1)?.content).toBe("ws");
});
it("preserves the array reference when nothing new arrives", () => {
const existing = [msg(1), msg(2)];
// Empty incoming and fully-duplicate incoming must both no-op so React
// Query observers don't re-render on replayed events.
expect(mergeTaskMessagesBySeq(existing, [])).toBe(existing);
expect(mergeTaskMessagesBySeq(existing, [msg(1), msg(2)])).toBe(existing);
});
});

View File

@@ -1,5 +1,6 @@
import { infiniteQueryOptions, queryOptions } from "@tanstack/react-query";
import { api } from "../api";
import type { TaskMessagePayload } from "../types/events";
// NOTE on workspace scoping:
// `wsId` is used only as part of queryKey for cache isolation per workspace.
@@ -95,6 +96,29 @@ export function taskMessagesOptions(taskId: string) {
});
}
/**
* Merge task-message batches into one seq-ordered, seq-deduplicated list for
* the shared `["task-messages", taskId]` cache. Existing entries win on
* conflict, and the original array reference is preserved when nothing new
* arrives so React Query observers don't re-render on duplicate events.
*
* Both the realtime `task:message` handler (a single payload) and the
* transcript backfill (a full refetch) write this cache. Routing both through
* one helper keeps a forced backfill from blind-replacing a seq the WebSocket
* already delivered — and keeps a late WS event from being lost to an
* in-flight backfill.
*/
export function mergeTaskMessagesBySeq(
existing: readonly TaskMessagePayload[],
incoming: readonly TaskMessagePayload[],
): TaskMessagePayload[] {
if (incoming.length === 0) return existing as TaskMessagePayload[];
const knownSeqs = new Set(existing.map((m) => m.seq));
const fresh = incoming.filter((m) => !knownSeqs.has(m.seq));
if (fresh.length === 0) return existing as TaskMessagePayload[];
return [...existing, ...fresh].sort((a, b) => a.seq - b.seq);
}
/**
* Aggregate of in-flight chat tasks for the current user in this workspace.
* Drives the FAB "running" indicator while the chat window is minimised —

View File

@@ -42,7 +42,7 @@ import {
type SystemNotificationPayload,
} from "../platform/system-notification";
import type { Workspace } from "../types/workspace";
import { chatKeys } from "../chat/queries";
import { chatKeys, mergeTaskMessagesBySeq } from "../chat/queries";
import { useChatStore } from "../chat";
import { resolvePostAuthDestination, useHasOnboarded } from "../paths";
import type {
@@ -824,11 +824,8 @@ export function useRealtimeSync(
const unsubTaskMessage = ws.on("task:message", (p) => {
const payload = p as TaskMessagePayload;
qc.setQueryData<TaskMessagePayload[]>(
["task-messages", payload.task_id],
(old = []) => {
if (old.some((m) => m.seq === payload.seq)) return old;
return [...old, payload].sort((a, b) => a.seq - b.seq);
},
chatKeys.taskMessages(payload.task_id),
(old = []) => mergeTaskMessagesBySeq(old, [payload]),
);
chatWsLogger.debug("task:message (global)", {
task_id: payload.task_id,

View File

@@ -1,8 +1,25 @@
// @vitest-environment jsdom
import { act, fireEvent, render, screen, waitFor } from "@testing-library/react";
import {
act,
fireEvent,
render,
screen,
waitFor,
type RenderResult,
} from "@testing-library/react";
import {
QueryClient,
QueryClientProvider,
} from "@tanstack/react-query";
import { api } from "@multica/core/api";
import {
chatKeys,
mergeTaskMessagesBySeq,
} from "@multica/core/chat/queries";
import type { AgentTask } from "@multica/core/types/agent";
import { describe, expect, it, vi } from "vitest";
import type { TaskMessagePayload } from "@multica/core/types/events";
import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
import { TranscriptButton } from "./transcript-button";
import type { TimelineItem } from "./build-timeline";
@@ -12,49 +29,90 @@ vi.mock("@multica/core/api", () => ({
},
}));
// Render the timeline items so tests can assert the dialog grows in place.
// `tool_use` / `tool_result` entries don't coalesce, so each message stays a
// distinct row — unlike adjacent text/thinking, which buildTimeline merges.
vi.mock("./agent-transcript-dialog", () => ({
AgentTranscriptDialog: ({
open,
onOpenChange,
items,
}: {
open: boolean;
onOpenChange: (open: boolean) => void;
items: TimelineItem[];
}) =>
open ? (
<div role="dialog">
<div role="dialog" data-testid="transcript-dialog">
<button type="button" onClick={() => onOpenChange(false)}>
Close
</button>
{items.map((item) => (
<div key={item.seq} data-testid="event" data-seq={item.seq} />
))}
</div>
) : null,
}));
const task: AgentTask = {
id: "task-1",
const LIVE_TASK_ID = "4a2e8d1c-7f9b-4e2a-9c1d-123456789abc";
const baseTask: AgentTask = {
id: LIVE_TASK_ID,
agent_id: "agent-1",
runtime_id: "",
issue_id: "issue-1",
status: "completed",
status: "running",
priority: 0,
dispatched_at: "2026-05-15T10:00:05.000Z",
started_at: "2026-05-15T10:00:06.000Z",
completed_at: "2026-05-15T10:00:10.000Z",
completed_at: null,
result: null,
error: null,
created_at: "2026-05-15T10:00:00.000Z",
};
const items: TimelineItem[] = [
{
seq: 1,
type: "text",
content: "hello world",
},
];
const msg = (seq: number, tool: string): TaskMessagePayload => ({
task_id: LIVE_TASK_ID,
issue_id: "issue-1",
seq,
type: "tool_use",
tool,
input: { i: String(seq) },
});
function newClient() {
return new QueryClient({
defaultOptions: { queries: { retry: false } },
});
}
function renderWith(qc: QueryClient, ui: React.ReactNode): RenderResult {
return render(<QueryClientProvider client={qc}>{ui}</QueryClientProvider>);
}
const listTaskMessages = vi.mocked(api.listTaskMessages);
beforeEach(() => {
listTaskMessages.mockReset();
listTaskMessages.mockResolvedValue([]);
});
afterEach(() => {
vi.clearAllMocks();
});
describe("TranscriptButton", () => {
it("closes the transcript dialog when desktop navigation starts", async () => {
render(<TranscriptButton task={task} agentName="Codex" items={items} />);
const items: TimelineItem[] = [{ seq: 1, type: "text", content: "hello" }];
const qc = newClient();
renderWith(
qc,
<TranscriptButton
task={{ ...baseTask, status: "completed" }}
agentName="Codex"
items={items}
/>,
);
fireEvent.click(screen.getByRole("button", { name: "View transcript" }));
expect(screen.getByRole("dialog")).toBeInTheDocument();
@@ -71,4 +129,120 @@ describe("TranscriptButton", () => {
expect(screen.queryByRole("dialog")).not.toBeInTheDocument();
});
});
it("live mode: the open dialog grows as the shared cache receives new messages", async () => {
const qc = newClient();
qc.setQueryData(chatKeys.taskMessages(LIVE_TASK_ID), [msg(1, "Bash")]);
listTaskMessages.mockResolvedValue([msg(1, "Bash")]);
renderWith(qc, <TranscriptButton task={baseTask} agentName="Codex" isLive />);
fireEvent.click(screen.getByRole("button", { name: "View transcript" }));
await waitFor(() =>
expect(screen.getAllByTestId("event")).toHaveLength(1),
);
// Simulate a WS `task:message` append into the shared cache.
act(() => {
qc.setQueryData<TaskMessagePayload[]>(
chatKeys.taskMessages(LIVE_TASK_ID),
(old = []) => mergeTaskMessagesBySeq(old, [msg(2, "Read")]),
);
});
await waitFor(() =>
expect(screen.getAllByTestId("event")).toHaveLength(2),
);
});
it("live mode: forces a backfill on open even when the cache already has data", async () => {
const qc = newClient();
qc.setQueryData(chatKeys.taskMessages(LIVE_TASK_ID), [
msg(1, "Bash"),
msg(2, "Read"),
]);
listTaskMessages.mockResolvedValue([msg(1, "Bash"), msg(2, "Read")]);
renderWith(qc, <TranscriptButton task={baseTask} agentName="Codex" isLive />);
fireEvent.click(screen.getByRole("button", { name: "View transcript" }));
await waitFor(() =>
expect(listTaskMessages).toHaveBeenCalledWith(LIVE_TASK_ID),
);
});
it("terminal mode: fetches once on open and does not subscribe to the cache", async () => {
const qc = newClient();
listTaskMessages.mockResolvedValue([msg(1, "Bash")]);
renderWith(
qc,
<TranscriptButton
task={{ ...baseTask, status: "completed", completed_at: "2026-05-15T10:00:10.000Z" }}
agentName="Codex"
/>,
);
fireEvent.click(screen.getByRole("button", { name: "View transcript" }));
await waitFor(() =>
expect(screen.getAllByTestId("event")).toHaveLength(1),
);
// A later cache write must NOT reach the terminal dialog: it renders a
// one-shot local snapshot, never an observer of the shared cache.
act(() => {
qc.setQueryData(chatKeys.taskMessages(LIVE_TASK_ID), [
msg(1, "Bash"),
msg(2, "Read"),
]);
});
expect(screen.getAllByTestId("event")).toHaveLength(1);
expect(listTaskMessages).toHaveBeenCalledTimes(1);
});
it("running→terminal: keeps the dialog populated and takes a final backfill", async () => {
const qc = newClient();
qc.setQueryData(chatKeys.taskMessages(LIVE_TASK_ID), [msg(1, "Bash")]);
listTaskMessages.mockResolvedValue([msg(1, "Bash")]);
const { rerender } = renderWith(
qc,
<TranscriptButton task={baseTask} agentName="Codex" isLive />,
);
fireEvent.click(screen.getByRole("button", { name: "View transcript" }));
await waitFor(() =>
expect(screen.getAllByTestId("event")).toHaveLength(1),
);
await waitFor(() => expect(listTaskMessages).toHaveBeenCalledTimes(1));
// Task finishes: parent flips isLive→false and the status to terminal.
rerender(
<QueryClientProvider client={qc}>
<TranscriptButton
task={{ ...baseTask, status: "completed", completed_at: "2026-05-15T10:00:10.000Z" }}
agentName="Codex"
isLive={false}
/>
</QueryClientProvider>,
);
// Dialog stays mounted (latched), and the terminal transition triggers a
// second authoritative backfill rather than blanking to local state.
expect(screen.getByTestId("transcript-dialog")).toBeInTheDocument();
await waitFor(() => expect(listTaskMessages).toHaveBeenCalledTimes(2));
// The final tail message still flows in through the shared cache.
act(() => {
qc.setQueryData<TaskMessagePayload[]>(
chatKeys.taskMessages(LIVE_TASK_ID),
(old = []) => mergeTaskMessagesBySeq(old, [msg(2, "Read")]),
);
});
await waitFor(() =>
expect(screen.getAllByTestId("event")).toHaveLength(2),
);
});
});

View File

@@ -1,7 +1,8 @@
"use client";
import { useCallback, useEffect, useState } from "react";
import { useCallback, useEffect, useMemo, useState } from "react";
import { Loader2, ScrollText } from "lucide-react";
import { useQuery, useQueryClient } from "@tanstack/react-query";
import { cn } from "@multica/ui/lib/utils";
import {
Tooltip,
@@ -9,7 +10,14 @@ import {
TooltipTrigger,
} from "@multica/ui/components/ui/tooltip";
import { api } from "@multica/core/api";
import {
chatKeys,
isTaskMessageTaskId,
mergeTaskMessagesBySeq,
taskMessagesOptions,
} from "@multica/core/chat/queries";
import type { AgentTask } from "@multica/core/types/agent";
import type { TaskMessagePayload } from "@multica/core/types/events";
import { AgentTranscriptDialog } from "./agent-transcript-dialog";
import { buildTimeline, type TimelineItem } from "./build-timeline";
@@ -18,9 +26,11 @@ interface TranscriptButtonProps {
agentName: string;
/**
* Pre-loaded timeline. When provided the button skips the fetch and opens
* the dialog immediately — used by the live card where `items` already
* accumulate via WS. Omit for terminal tasks; the button will fetch via
* `api.listTaskMessages` on the first click and cache the result.
* the dialog immediately — used by surfaces that already own an accumulating
* timeline. Omit for terminal tasks; the button will fetch via
* `api.listTaskMessages` on the first click and cache the result. Omit for
* live tasks too: the button then subscribes to the shared task-messages
* cache so the dialog keeps growing as new events arrive.
*/
items?: TimelineItem[];
isLive?: boolean;
@@ -38,6 +48,14 @@ interface TranscriptButtonProps {
* surface that lists agent tasks (issue activity card, agent detail
* activity tab). Owns its own dialog state and lazy-load — the parent
* just drops it in.
*
* Three data modes:
* - Provided items: parent owns the timeline, we just render it.
* - Live cache: `isLive` with no provided items and a persisted task id —
* subscribe to the shared `["task-messages", taskId]` cache (seeded by the
* WS `task:message` stream) so the open dialog keeps growing in real time,
* and force a seq-merged backfill on open to heal any WS reconnect gap.
* - Lazy: terminal tasks fetch once on first click and cache locally.
*/
export function TranscriptButton({
task,
@@ -52,14 +70,33 @@ export function TranscriptButton({
const [loading, setLoading] = useState(false);
const [loadedItems, setLoadedItems] = useState<TimelineItem[] | null>(null);
// Live mode: parent owns the timeline, we just render it.
// Lazy mode: we fetch once and cache.
// Live cache mode: the running task feeds the shared task-messages cache, so
// we render straight off that cache instead of a one-shot local snapshot.
const liveCacheMode =
isLive && providedItems === undefined && isTaskMessageTaskId(task.id);
// Latch the live path for the duration of an open session. The parent flips
// `isLive` to false the moment the task finishes; without the latch the
// dialog would drop to empty `loadedItems` mid-view. Staying on the cache
// path keeps every delivered seq on screen and lets the dialog take a final
// authoritative backfill on the running→terminal transition.
const [liveSession, setLiveSession] = useState(false);
useEffect(() => {
if (!open) setLiveSession(false);
}, [open]);
// Live mode renders from the cache; lazy/provided modes from local state.
const items = providedItems ?? loadedItems ?? [];
const handleClick = useCallback(
(e: React.MouseEvent) => {
e.preventDefault();
e.stopPropagation();
if (liveCacheMode) {
setLiveSession(true);
setOpen(true);
return;
}
if (providedItems !== undefined || loadedItems !== null) {
setOpen(true);
return;
@@ -78,7 +115,7 @@ export function TranscriptButton({
})
.finally(() => setLoading(false));
},
[providedItems, loadedItems, task.id],
[liveCacheMode, providedItems, loadedItems, task.id],
);
useEffect(() => {
@@ -116,17 +153,98 @@ export function TranscriptButton({
<TooltipContent>{title}</TooltipContent>
</Tooltip>
{open && (
<AgentTranscriptDialog
open={open}
onOpenChange={setOpen}
task={task}
items={items}
agentName={agentName}
isLive={isLive}
headerSlot={headerSlot}
/>
)}
{open &&
(liveSession ? (
<LiveTranscriptDialog
task={task}
agentName={agentName}
isLive={isLive}
onOpenChange={setOpen}
headerSlot={headerSlot}
/>
) : (
<AgentTranscriptDialog
open={open}
onOpenChange={setOpen}
task={task}
items={items}
agentName={agentName}
isLive={isLive}
headerSlot={headerSlot}
/>
))}
</>
);
}
interface LiveTranscriptDialogProps {
task: AgentTask;
agentName: string;
isLive: boolean;
onOpenChange: (open: boolean) => void;
headerSlot?: React.ReactNode;
}
/**
* Live transcript view backed by the shared task-messages cache. Mounted only
* while the dialog is open, so closed live rows hold no query subscription and
* don't widen the baseline request volume.
*
* The cache observer is read-only (`enabled: false`): the WS `task:message`
* handler is the live writer, and the backfill below is the only fetch here.
* Keeping React Query from issuing its own refetch is deliberate — its result
* would blind-replace the cache and could drop a seq that arrived mid-flight,
* whereas the backfill merges by seq.
*/
function LiveTranscriptDialog({
task,
agentName,
isLive,
onOpenChange,
headerSlot,
}: LiveTranscriptDialogProps) {
const queryClient = useQueryClient();
const { data } = useQuery({
...taskMessagesOptions(task.id),
enabled: false,
});
// Force a backfill on open, and again when the task reaches a terminal state.
// `taskMessagesOptions` is `staleTime: Infinity`, so a plain subscription
// never refetches — a WS reconnect gap (or the final tail of messages a
// completed issue task never re-broadcasts) would otherwise leave a hole.
// Merge by seq so the fetch and any concurrent WS append both survive.
useEffect(() => {
if (!isTaskMessageTaskId(task.id)) return;
let cancelled = false;
api
.listTaskMessages(task.id)
.then((msgs) => {
if (cancelled) return;
queryClient.setQueryData<TaskMessagePayload[]>(
chatKeys.taskMessages(task.id),
(old = []) => mergeTaskMessagesBySeq(old, msgs),
);
})
.catch((err) => {
console.error(err);
});
return () => {
cancelled = true;
};
}, [task.id, isLive, queryClient]);
const items = useMemo(() => buildTimeline(data ?? []), [data]);
return (
<AgentTranscriptDialog
open
onOpenChange={onOpenChange}
task={task}
items={items}
agentName={agentName}
isLive={isLive}
headerSlot={headerSlot}
/>
);
}