Compare commits

...

2 Commits

Author SHA1 Message Date
Jiang Bohan
19d53052e9 fix(agent-live-card): guard reconcile against out-of-order responses
reconcile() previously had no request-ordering protection, so a slow
getActiveTasksForIssue response could land after a newer one and clobber
the fresher state. Race scenario: task:queued fires reconcile A (response
includes T but is delayed); task:completed fires next, optimistically
removes T, and triggers reconcile B; B resolves empty and clears the
banner; A finally resolves with the stale snapshot and re-adds T —
permanent stale "is working" banner with no further events to clear it.

Add a monotonic reconcileSeq ref. Each call captures its issued seq;
the response only applies if mySeq === reconcileSeq.current (i.e. no
newer call was issued after this one). Drop the response otherwise.

Add a regression test covering the deferred-promise case plus a
companion test for the WS reconnect self-heal path.

Co-authored-by: multica-agent <github@multica.ai>
2026-05-06 18:06:42 +08:00
Jiang Bohan
b42bf71788 fix(agent-live-card): self-heal stale "is working" banner via reconcile
The banner relied on receiving task:completed/failed/cancelled to clear
itself. When a WS reconnect dropped one of those events the banner stayed
forever and the elapsed timer kept ticking.

Replace the additive update paths (mount + queued/dispatch) with a single
reconcile() that refetches /active-task and replaces the local task set
with the server's truth, preserving accumulated TimelineItems for tasks
still active. Wire it to:

- mount / issueId change
- WS reconnect (useWSReconnect)
- task:queued / task:dispatch
- task:completed / task:failed / task:cancelled (after the optimistic
  delete, so a missed sibling end-event also clears)

Per-task hydration guard (hydratedTaskIds) keeps the messages backfill
one-shot when reconcile fires repeatedly within a tick.

Co-authored-by: multica-agent <github@multica.ai>
2026-05-06 17:57:22 +08:00
2 changed files with 313 additions and 44 deletions

View File

@@ -0,0 +1,231 @@
import { useEffect } from "react";
import { describe, it, expect, vi, beforeEach } from "vitest";
import { act, render, screen, waitFor } from "@testing-library/react";
import { I18nProvider } from "@multica/core/i18n/react";
import type { AgentTask } from "@multica/core/types/agent";
import enCommon from "../../locales/en/common.json";
import enIssues from "../../locales/en/issues.json";
const TEST_RESOURCES = { en: { common: enCommon, issues: enIssues } };
// ---------------------------------------------------------------------------
// Mocks
// ---------------------------------------------------------------------------
// Capture WS event handlers so the test can drive them directly. The card
// subscribes to task:queued, task:dispatch, task:completed, task:failed,
// task:cancelled, and task:message via useWSEvent. We mirror the real
// hook's useEffect-based subscription so stale subscriptions clean up
// across re-renders (otherwise every render would stack a duplicate
// handler and one event would fan out into many reconcile calls).
type EventHandler = (payload: unknown) => void;
const wsHandlers = vi.hoisted(() => new Map<string, Set<EventHandler>>());
const wsReconnectCallbacks = vi.hoisted(() => new Set<() => void>());
vi.mock("@multica/core/realtime", () => ({
useWSEvent: (event: string, handler: EventHandler) => {
useEffect(() => {
const set = wsHandlers.get(event) ?? new Set<EventHandler>();
set.add(handler);
wsHandlers.set(event, set);
return () => {
set.delete(handler);
};
}, [event, handler]);
},
useWSReconnect: (cb: () => void) => {
useEffect(() => {
wsReconnectCallbacks.add(cb);
return () => {
wsReconnectCallbacks.delete(cb);
};
}, [cb]);
},
}));
vi.mock("@multica/core/workspace/hooks", () => ({
useActorName: () => ({
getActorName: (_: string, id: string) => (id ? `Agent ${id}` : "Agent"),
}),
}));
vi.mock("../../common/actor-avatar", () => ({
ActorAvatar: ({ actorId }: { actorId: string }) => (
<span data-testid="actor-avatar">{actorId}</span>
),
}));
vi.mock("../../common/task-transcript", async () => {
const buildTimeline = vi.fn().mockReturnValue([]);
return {
TranscriptButton: () => <button data-testid="transcript-button">transcript</button>,
buildTimeline,
};
});
const mockApi = vi.hoisted(() => ({
getActiveTasksForIssue: vi.fn(),
listTaskMessages: vi.fn(),
cancelTask: vi.fn(),
}));
vi.mock("@multica/core/api", () => ({
api: mockApi,
}));
vi.mock("sonner", () => ({
toast: { error: vi.fn(), success: vi.fn() },
}));
// ---------------------------------------------------------------------------
// Helpers
// ---------------------------------------------------------------------------
import { AgentLiveCard } from "./agent-live-card";
function makeTask(id: string): AgentTask {
return {
id,
agent_id: "agent-1",
runtime_id: "rt-1",
issue_id: "issue-1",
status: "running",
priority: 0,
dispatched_at: "2026-01-01T00:00:00Z",
started_at: "2026-01-01T00:00:00Z",
completed_at: null,
result: null,
error: null,
created_at: "2026-01-01T00:00:00Z",
};
}
interface Deferred<T> {
promise: Promise<T>;
resolve: (value: T) => void;
}
function deferred<T>(): Deferred<T> {
let resolveFn!: (value: T) => void;
const promise = new Promise<T>((res) => {
resolveFn = res;
});
return { promise, resolve: resolveFn };
}
function fireEvent(event: string, payload: unknown) {
const handlers = wsHandlers.get(event) ?? [];
for (const h of handlers) h(payload);
}
function renderCard(issueId = "issue-1") {
return render(
<I18nProvider locale="en" resources={TEST_RESOURCES}>
<AgentLiveCard issueId={issueId} />
</I18nProvider>,
);
}
beforeEach(() => {
wsHandlers.clear();
wsReconnectCallbacks.clear();
mockApi.getActiveTasksForIssue.mockReset();
mockApi.listTaskMessages.mockReset();
mockApi.listTaskMessages.mockResolvedValue([]);
mockApi.cancelTask.mockReset();
});
// ---------------------------------------------------------------------------
// Tests
// ---------------------------------------------------------------------------
describe("AgentLiveCard reconcile race", () => {
it("does not re-add a banner when an older active-task response resolves after a newer empty one", async () => {
const mountFetch = deferred<{ tasks: AgentTask[] }>();
const queuedFetch = deferred<{ tasks: AgentTask[] }>();
const completedFetch = deferred<{ tasks: AgentTask[] }>();
// The component issues three reconciles in this test:
// 1. mount
// 2. task:queued
// 3. task:completed (after optimistic delete)
// We control the order they resolve to reproduce the GPT-Boy race.
mockApi.getActiveTasksForIssue
.mockReturnValueOnce(mountFetch.promise)
.mockReturnValueOnce(queuedFetch.promise)
.mockReturnValueOnce(completedFetch.promise);
renderCard();
// Mount call resolves with empty — no banner yet.
await act(async () => {
mountFetch.resolve({ tasks: [] });
});
expect(screen.queryByText(/is working/)).toBeNull();
// task:queued fires; reconcile A is now in flight (queuedFetch).
act(() => {
fireEvent("task:queued", { issue_id: "issue-1", task_id: "task-1" });
});
// task:completed fires; handler optimistically deletes (no-op since
// the banner isn't rendered yet) then issues reconcile B (completedFetch).
act(() => {
fireEvent("task:completed", { issue_id: "issue-1", task_id: "task-1" });
});
// Reconcile B resolves first with empty list — server truth says no
// active tasks. State is empty.
await act(async () => {
completedFetch.resolve({ tasks: [] });
});
expect(screen.queryByText(/is working/)).toBeNull();
// Reconcile A (older, slow) resolves last with a stale snapshot that
// still includes the task. With the generation guard, this response
// must be dropped. Without the guard, the banner would re-appear.
await act(async () => {
queuedFetch.resolve({ tasks: [makeTask("task-1")] });
});
// The banner must NOT come back.
expect(screen.queryByText(/is working/)).toBeNull();
expect(mockApi.getActiveTasksForIssue).toHaveBeenCalledTimes(3);
});
it("WS reconnect refetch removes a stale banner whose end event was lost", async () => {
const mountFetch = deferred<{ tasks: AgentTask[] }>();
const reconnectFetch = deferred<{ tasks: AgentTask[] }>();
mockApi.getActiveTasksForIssue
.mockReturnValueOnce(mountFetch.promise)
.mockReturnValueOnce(reconnectFetch.promise);
renderCard();
// Mount sees the task as active — banner shows.
await act(async () => {
mountFetch.resolve({ tasks: [makeTask("task-1")] });
});
await waitFor(() => {
expect(screen.getByText(/is working/)).toBeTruthy();
});
// Simulate the WS dropping task:completed and then reconnecting.
// The reconnect callback runs reconcile, which fetches and finds the
// task is no longer active.
expect(wsReconnectCallbacks.size).toBeGreaterThan(0);
act(() => {
for (const cb of wsReconnectCallbacks) cb();
});
await act(async () => {
reconnectFetch.resolve({ tasks: [] });
});
// The banner self-heals.
await waitFor(() => {
expect(screen.queryByText(/is working/)).toBeNull();
});
});
});

View File

@@ -3,7 +3,7 @@
import { useState, useEffect, useCallback, useRef } from "react";
import { Bot, Loader2, Square } from "lucide-react";
import { api } from "@multica/core/api";
import { useWSEvent } from "@multica/core/realtime";
import { useWSEvent, useWSReconnect } from "@multica/core/realtime";
import type { TaskMessagePayload } from "@multica/core/types/events";
import type { AgentTask } from "@multica/core/types/agent";
import { toast } from "sonner";
@@ -54,52 +54,97 @@ export function AgentLiveCard({ issueId }: AgentLiveCardProps) {
const { getActorName } = useActorName();
const [taskStates, setTaskStates] = useState<Map<string, TaskState>>(new Map());
const seenSeqs = useRef(new Set<string>());
// Fetch active tasks on mount
const hydratedTaskIds = useRef(new Set<string>());
const mountedRef = useRef(true);
// Monotonic counter — each reconcile() call captures its issued seq and
// only applies its response if it's still the latest issued. This stops
// a slow getActiveTasksForIssue response from clobbering newer truth
// (e.g. a stale "task is active" payload re-adding a banner that a
// newer "tasks: []" response just cleared).
const reconcileSeq = useRef(0);
useEffect(() => {
let cancelled = false;
api.getActiveTasksForIssue(issueId).then(({ tasks }) => {
if (cancelled || tasks.length === 0) return;
mountedRef.current = true;
return () => { mountedRef.current = false; };
}, []);
// Reconcile local state to server truth. Replaces taskStates with the
// server's active set: tasks no longer active are dropped (this is what
// self-heals a stale "is working" banner when a task:completed/failed/
// cancelled event was lost during a WS reconnect window), and tasks
// still active keep their accumulated TimelineItems so the live
// TranscriptButton doesn't lose history. New tasks get a one-shot
// listTaskMessages hydration to backfill any messages that landed
// before the WS subscription saw them.
const reconcile = useCallback(() => {
const mySeq = ++reconcileSeq.current;
api.getActiveTasksForIssue(issueId).then(({ tasks }) => {
if (!mountedRef.current) return;
// A newer reconcile was issued after this one — drop this response
// unconditionally and let the latest request win, regardless of
// resolution order. Without this guard, a slow A then a fast B can
// resolve in B-then-A order and A re-adds tasks B already cleared.
if (mySeq !== reconcileSeq.current) return;
const activeIds = new Set(tasks.map((t) => t.id));
// Show cards immediately with empty timeline
setTaskStates((prev) => {
const next = new Map(prev);
const next = new Map<string, TaskState>();
for (const task of tasks) {
if (!next.has(task.id)) {
next.set(task.id, { task, items: [] });
}
const existing = prev.get(task.id);
next.set(task.id, existing
? { task, items: existing.items }
: { task, items: [] });
}
return next;
});
// Load messages per task in the background — these feed the live
// TranscriptButton, not an inline timeline (timeline UI moved to
// the right panel).
// Drop bookkeeping for tasks that vanished, so a future re-dispatch
// of the same id (very rare, but possible) re-hydrates cleanly.
for (const key of Array.from(seenSeqs.current)) {
const taskId = key.slice(0, key.indexOf(":"));
if (!activeIds.has(taskId)) seenSeqs.current.delete(key);
}
for (const id of Array.from(hydratedTaskIds.current)) {
if (!activeIds.has(id)) hydratedTaskIds.current.delete(id);
}
// Hydrate messages for tasks we haven't fetched yet. Per-task guard
// prevents duplicate fetches when reconcile fires repeatedly (mount
// + reconnect + queued/dispatch can stack within a single tick).
for (const task of tasks) {
if (hydratedTaskIds.current.has(task.id)) continue;
hydratedTaskIds.current.add(task.id);
api.listTaskMessages(task.id).then((msgs) => {
if (cancelled) return;
if (!mountedRef.current) return;
const timeline = buildTimeline(msgs);
for (const m of msgs) seenSeqs.current.add(`${m.task_id}:${m.seq}`);
setTaskStates((prev) => {
const next = new Map(prev);
const existing = next.get(task.id);
if (existing) {
const loadedSeqs = new Set(timeline.map((i) => i.seq));
const wsOnly = existing.items.filter((i) => !loadedSeqs.has(i.seq));
const merged = [...timeline, ...wsOnly].sort((a, b) => a.seq - b.seq);
next.set(task.id, { task: existing.task, items: merged });
} else {
next.set(task.id, { task, items: timeline });
}
if (!existing) return prev;
const loadedSeqs = new Set(timeline.map((i) => i.seq));
const wsOnly = existing.items.filter((i) => !loadedSeqs.has(i.seq));
const merged = [...timeline, ...wsOnly].sort((a, b) => a.seq - b.seq);
next.set(task.id, { task: existing.task, items: merged });
return next;
});
}).catch(console.error);
}).catch((e) => {
hydratedTaskIds.current.delete(task.id);
console.error(e);
});
}
}).catch(console.error);
return () => { cancelled = true; };
}, [issueId]);
// Initial fetch on mount / issueId change.
useEffect(() => {
reconcile();
}, [reconcile]);
// WS reconnect — anything that happened while we were offline (most
// notably task:completed / task:failed / task:cancelled) won't replay,
// so re-pull the truth and let reconcile drop any stale banners.
useWSReconnect(reconcile);
// Real-time messages — route by task_id and dedupe by seq.
useWSEvent(
"task:message",
@@ -131,18 +176,21 @@ export function AgentLiveCard({ issueId }: AgentLiveCardProps) {
}, [issueId]),
);
// Task end — drop the banner. The right-panel ExecutionLogSection
// will pick the same task back up under "Past runs" via its own WS
// invalidate path.
// Task end — optimistically drop the banner for snappy UX, then
// reconcile to also clean up sibling tasks whose own end events may
// have been missed (e.g. a sequence of tasks all ending during a WS
// reconnect window will only replay this one event when we resubscribe).
const handleTaskEnd = useCallback((payload: unknown) => {
const p = payload as { task_id: string; issue_id: string };
if (p.issue_id !== issueId) return;
setTaskStates((prev) => {
if (!prev.has(p.task_id)) return prev;
const next = new Map(prev);
next.delete(p.task_id);
return next;
});
}, [issueId]);
reconcile();
}, [issueId, reconcile]);
useWSEvent("task:completed", handleTaskEnd);
useWSEvent("task:failed", handleTaskEnd);
@@ -152,23 +200,13 @@ export function AgentLiveCard({ issueId }: AgentLiveCardProps) {
// to both events matters because retry creates a queued child without
// emitting task:dispatch (only the daemon's claim does), so listening
// to dispatch alone leaves the banner stale during the queued window.
// The handler is idempotent (only inserts unseen task IDs), so it's
// safe to fire once per event even when both arrive in quick succession.
// reconcile is idempotent (per-task hydration guard) and also drops
// stale tasks, so it's safe to fire once per event.
const handleTaskActive = useCallback((payload: unknown) => {
const p = payload as { issue_id?: string };
if (p.issue_id && p.issue_id !== issueId) return;
api.getActiveTasksForIssue(issueId).then(({ tasks }) => {
setTaskStates((prev) => {
const next = new Map(prev);
for (const task of tasks) {
if (!next.has(task.id)) {
next.set(task.id, { task, items: [] });
}
}
return next;
});
}).catch(console.error);
}, [issueId]);
reconcile();
}, [issueId, reconcile]);
useWSEvent("task:queued", handleTaskActive);
useWSEvent("task:dispatch", handleTaskActive);