mirror of
https://github.com/multica-ai/multica.git
synced 2026-06-29 18:39:17 +02:00
Compare commits
2 Commits
agent/lamb
...
agent/j/98
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
19d53052e9 | ||
|
|
b42bf71788 |
231
packages/views/issues/components/agent-live-card.test.tsx
Normal file
231
packages/views/issues/components/agent-live-card.test.tsx
Normal file
@@ -0,0 +1,231 @@
|
||||
import { useEffect } from "react";
|
||||
import { describe, it, expect, vi, beforeEach } from "vitest";
|
||||
import { act, render, screen, waitFor } from "@testing-library/react";
|
||||
import { I18nProvider } from "@multica/core/i18n/react";
|
||||
import type { AgentTask } from "@multica/core/types/agent";
|
||||
import enCommon from "../../locales/en/common.json";
|
||||
import enIssues from "../../locales/en/issues.json";
|
||||
|
||||
const TEST_RESOURCES = { en: { common: enCommon, issues: enIssues } };
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Mocks
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
// Capture WS event handlers so the test can drive them directly. The card
|
||||
// subscribes to task:queued, task:dispatch, task:completed, task:failed,
|
||||
// task:cancelled, and task:message via useWSEvent. We mirror the real
|
||||
// hook's useEffect-based subscription so stale subscriptions clean up
|
||||
// across re-renders (otherwise every render would stack a duplicate
|
||||
// handler and one event would fan out into many reconcile calls).
|
||||
type EventHandler = (payload: unknown) => void;
|
||||
const wsHandlers = vi.hoisted(() => new Map<string, Set<EventHandler>>());
|
||||
const wsReconnectCallbacks = vi.hoisted(() => new Set<() => void>());
|
||||
|
||||
vi.mock("@multica/core/realtime", () => ({
|
||||
useWSEvent: (event: string, handler: EventHandler) => {
|
||||
useEffect(() => {
|
||||
const set = wsHandlers.get(event) ?? new Set<EventHandler>();
|
||||
set.add(handler);
|
||||
wsHandlers.set(event, set);
|
||||
return () => {
|
||||
set.delete(handler);
|
||||
};
|
||||
}, [event, handler]);
|
||||
},
|
||||
useWSReconnect: (cb: () => void) => {
|
||||
useEffect(() => {
|
||||
wsReconnectCallbacks.add(cb);
|
||||
return () => {
|
||||
wsReconnectCallbacks.delete(cb);
|
||||
};
|
||||
}, [cb]);
|
||||
},
|
||||
}));
|
||||
|
||||
vi.mock("@multica/core/workspace/hooks", () => ({
|
||||
useActorName: () => ({
|
||||
getActorName: (_: string, id: string) => (id ? `Agent ${id}` : "Agent"),
|
||||
}),
|
||||
}));
|
||||
|
||||
vi.mock("../../common/actor-avatar", () => ({
|
||||
ActorAvatar: ({ actorId }: { actorId: string }) => (
|
||||
<span data-testid="actor-avatar">{actorId}</span>
|
||||
),
|
||||
}));
|
||||
|
||||
vi.mock("../../common/task-transcript", async () => {
|
||||
const buildTimeline = vi.fn().mockReturnValue([]);
|
||||
return {
|
||||
TranscriptButton: () => <button data-testid="transcript-button">transcript</button>,
|
||||
buildTimeline,
|
||||
};
|
||||
});
|
||||
|
||||
const mockApi = vi.hoisted(() => ({
|
||||
getActiveTasksForIssue: vi.fn(),
|
||||
listTaskMessages: vi.fn(),
|
||||
cancelTask: vi.fn(),
|
||||
}));
|
||||
|
||||
vi.mock("@multica/core/api", () => ({
|
||||
api: mockApi,
|
||||
}));
|
||||
|
||||
vi.mock("sonner", () => ({
|
||||
toast: { error: vi.fn(), success: vi.fn() },
|
||||
}));
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Helpers
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
import { AgentLiveCard } from "./agent-live-card";
|
||||
|
||||
function makeTask(id: string): AgentTask {
|
||||
return {
|
||||
id,
|
||||
agent_id: "agent-1",
|
||||
runtime_id: "rt-1",
|
||||
issue_id: "issue-1",
|
||||
status: "running",
|
||||
priority: 0,
|
||||
dispatched_at: "2026-01-01T00:00:00Z",
|
||||
started_at: "2026-01-01T00:00:00Z",
|
||||
completed_at: null,
|
||||
result: null,
|
||||
error: null,
|
||||
created_at: "2026-01-01T00:00:00Z",
|
||||
};
|
||||
}
|
||||
|
||||
interface Deferred<T> {
|
||||
promise: Promise<T>;
|
||||
resolve: (value: T) => void;
|
||||
}
|
||||
|
||||
function deferred<T>(): Deferred<T> {
|
||||
let resolveFn!: (value: T) => void;
|
||||
const promise = new Promise<T>((res) => {
|
||||
resolveFn = res;
|
||||
});
|
||||
return { promise, resolve: resolveFn };
|
||||
}
|
||||
|
||||
function fireEvent(event: string, payload: unknown) {
|
||||
const handlers = wsHandlers.get(event) ?? [];
|
||||
for (const h of handlers) h(payload);
|
||||
}
|
||||
|
||||
function renderCard(issueId = "issue-1") {
|
||||
return render(
|
||||
<I18nProvider locale="en" resources={TEST_RESOURCES}>
|
||||
<AgentLiveCard issueId={issueId} />
|
||||
</I18nProvider>,
|
||||
);
|
||||
}
|
||||
|
||||
beforeEach(() => {
|
||||
wsHandlers.clear();
|
||||
wsReconnectCallbacks.clear();
|
||||
mockApi.getActiveTasksForIssue.mockReset();
|
||||
mockApi.listTaskMessages.mockReset();
|
||||
mockApi.listTaskMessages.mockResolvedValue([]);
|
||||
mockApi.cancelTask.mockReset();
|
||||
});
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Tests
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
describe("AgentLiveCard reconcile race", () => {
|
||||
it("does not re-add a banner when an older active-task response resolves after a newer empty one", async () => {
|
||||
const mountFetch = deferred<{ tasks: AgentTask[] }>();
|
||||
const queuedFetch = deferred<{ tasks: AgentTask[] }>();
|
||||
const completedFetch = deferred<{ tasks: AgentTask[] }>();
|
||||
|
||||
// The component issues three reconciles in this test:
|
||||
// 1. mount
|
||||
// 2. task:queued
|
||||
// 3. task:completed (after optimistic delete)
|
||||
// We control the order they resolve to reproduce the GPT-Boy race.
|
||||
mockApi.getActiveTasksForIssue
|
||||
.mockReturnValueOnce(mountFetch.promise)
|
||||
.mockReturnValueOnce(queuedFetch.promise)
|
||||
.mockReturnValueOnce(completedFetch.promise);
|
||||
|
||||
renderCard();
|
||||
|
||||
// Mount call resolves with empty — no banner yet.
|
||||
await act(async () => {
|
||||
mountFetch.resolve({ tasks: [] });
|
||||
});
|
||||
expect(screen.queryByText(/is working/)).toBeNull();
|
||||
|
||||
// task:queued fires; reconcile A is now in flight (queuedFetch).
|
||||
act(() => {
|
||||
fireEvent("task:queued", { issue_id: "issue-1", task_id: "task-1" });
|
||||
});
|
||||
|
||||
// task:completed fires; handler optimistically deletes (no-op since
|
||||
// the banner isn't rendered yet) then issues reconcile B (completedFetch).
|
||||
act(() => {
|
||||
fireEvent("task:completed", { issue_id: "issue-1", task_id: "task-1" });
|
||||
});
|
||||
|
||||
// Reconcile B resolves first with empty list — server truth says no
|
||||
// active tasks. State is empty.
|
||||
await act(async () => {
|
||||
completedFetch.resolve({ tasks: [] });
|
||||
});
|
||||
expect(screen.queryByText(/is working/)).toBeNull();
|
||||
|
||||
// Reconcile A (older, slow) resolves last with a stale snapshot that
|
||||
// still includes the task. With the generation guard, this response
|
||||
// must be dropped. Without the guard, the banner would re-appear.
|
||||
await act(async () => {
|
||||
queuedFetch.resolve({ tasks: [makeTask("task-1")] });
|
||||
});
|
||||
|
||||
// The banner must NOT come back.
|
||||
expect(screen.queryByText(/is working/)).toBeNull();
|
||||
expect(mockApi.getActiveTasksForIssue).toHaveBeenCalledTimes(3);
|
||||
});
|
||||
|
||||
it("WS reconnect refetch removes a stale banner whose end event was lost", async () => {
|
||||
const mountFetch = deferred<{ tasks: AgentTask[] }>();
|
||||
const reconnectFetch = deferred<{ tasks: AgentTask[] }>();
|
||||
|
||||
mockApi.getActiveTasksForIssue
|
||||
.mockReturnValueOnce(mountFetch.promise)
|
||||
.mockReturnValueOnce(reconnectFetch.promise);
|
||||
|
||||
renderCard();
|
||||
|
||||
// Mount sees the task as active — banner shows.
|
||||
await act(async () => {
|
||||
mountFetch.resolve({ tasks: [makeTask("task-1")] });
|
||||
});
|
||||
await waitFor(() => {
|
||||
expect(screen.getByText(/is working/)).toBeTruthy();
|
||||
});
|
||||
|
||||
// Simulate the WS dropping task:completed and then reconnecting.
|
||||
// The reconnect callback runs reconcile, which fetches and finds the
|
||||
// task is no longer active.
|
||||
expect(wsReconnectCallbacks.size).toBeGreaterThan(0);
|
||||
act(() => {
|
||||
for (const cb of wsReconnectCallbacks) cb();
|
||||
});
|
||||
|
||||
await act(async () => {
|
||||
reconnectFetch.resolve({ tasks: [] });
|
||||
});
|
||||
|
||||
// The banner self-heals.
|
||||
await waitFor(() => {
|
||||
expect(screen.queryByText(/is working/)).toBeNull();
|
||||
});
|
||||
});
|
||||
});
|
||||
@@ -3,7 +3,7 @@
|
||||
import { useState, useEffect, useCallback, useRef } from "react";
|
||||
import { Bot, Loader2, Square } from "lucide-react";
|
||||
import { api } from "@multica/core/api";
|
||||
import { useWSEvent } from "@multica/core/realtime";
|
||||
import { useWSEvent, useWSReconnect } from "@multica/core/realtime";
|
||||
import type { TaskMessagePayload } from "@multica/core/types/events";
|
||||
import type { AgentTask } from "@multica/core/types/agent";
|
||||
import { toast } from "sonner";
|
||||
@@ -54,52 +54,97 @@ export function AgentLiveCard({ issueId }: AgentLiveCardProps) {
|
||||
const { getActorName } = useActorName();
|
||||
const [taskStates, setTaskStates] = useState<Map<string, TaskState>>(new Map());
|
||||
const seenSeqs = useRef(new Set<string>());
|
||||
|
||||
// Fetch active tasks on mount
|
||||
const hydratedTaskIds = useRef(new Set<string>());
|
||||
const mountedRef = useRef(true);
|
||||
// Monotonic counter — each reconcile() call captures its issued seq and
|
||||
// only applies its response if it's still the latest issued. This stops
|
||||
// a slow getActiveTasksForIssue response from clobbering newer truth
|
||||
// (e.g. a stale "task is active" payload re-adding a banner that a
|
||||
// newer "tasks: []" response just cleared).
|
||||
const reconcileSeq = useRef(0);
|
||||
useEffect(() => {
|
||||
let cancelled = false;
|
||||
api.getActiveTasksForIssue(issueId).then(({ tasks }) => {
|
||||
if (cancelled || tasks.length === 0) return;
|
||||
mountedRef.current = true;
|
||||
return () => { mountedRef.current = false; };
|
||||
}, []);
|
||||
|
||||
// Reconcile local state to server truth. Replaces taskStates with the
|
||||
// server's active set: tasks no longer active are dropped (this is what
|
||||
// self-heals a stale "is working" banner when a task:completed/failed/
|
||||
// cancelled event was lost during a WS reconnect window), and tasks
|
||||
// still active keep their accumulated TimelineItems so the live
|
||||
// TranscriptButton doesn't lose history. New tasks get a one-shot
|
||||
// listTaskMessages hydration to backfill any messages that landed
|
||||
// before the WS subscription saw them.
|
||||
const reconcile = useCallback(() => {
|
||||
const mySeq = ++reconcileSeq.current;
|
||||
api.getActiveTasksForIssue(issueId).then(({ tasks }) => {
|
||||
if (!mountedRef.current) return;
|
||||
// A newer reconcile was issued after this one — drop this response
|
||||
// unconditionally and let the latest request win, regardless of
|
||||
// resolution order. Without this guard, a slow A then a fast B can
|
||||
// resolve in B-then-A order and A re-adds tasks B already cleared.
|
||||
if (mySeq !== reconcileSeq.current) return;
|
||||
const activeIds = new Set(tasks.map((t) => t.id));
|
||||
|
||||
// Show cards immediately with empty timeline
|
||||
setTaskStates((prev) => {
|
||||
const next = new Map(prev);
|
||||
const next = new Map<string, TaskState>();
|
||||
for (const task of tasks) {
|
||||
if (!next.has(task.id)) {
|
||||
next.set(task.id, { task, items: [] });
|
||||
}
|
||||
const existing = prev.get(task.id);
|
||||
next.set(task.id, existing
|
||||
? { task, items: existing.items }
|
||||
: { task, items: [] });
|
||||
}
|
||||
return next;
|
||||
});
|
||||
|
||||
// Load messages per task in the background — these feed the live
|
||||
// TranscriptButton, not an inline timeline (timeline UI moved to
|
||||
// the right panel).
|
||||
// Drop bookkeeping for tasks that vanished, so a future re-dispatch
|
||||
// of the same id (very rare, but possible) re-hydrates cleanly.
|
||||
for (const key of Array.from(seenSeqs.current)) {
|
||||
const taskId = key.slice(0, key.indexOf(":"));
|
||||
if (!activeIds.has(taskId)) seenSeqs.current.delete(key);
|
||||
}
|
||||
for (const id of Array.from(hydratedTaskIds.current)) {
|
||||
if (!activeIds.has(id)) hydratedTaskIds.current.delete(id);
|
||||
}
|
||||
|
||||
// Hydrate messages for tasks we haven't fetched yet. Per-task guard
|
||||
// prevents duplicate fetches when reconcile fires repeatedly (mount
|
||||
// + reconnect + queued/dispatch can stack within a single tick).
|
||||
for (const task of tasks) {
|
||||
if (hydratedTaskIds.current.has(task.id)) continue;
|
||||
hydratedTaskIds.current.add(task.id);
|
||||
api.listTaskMessages(task.id).then((msgs) => {
|
||||
if (cancelled) return;
|
||||
if (!mountedRef.current) return;
|
||||
const timeline = buildTimeline(msgs);
|
||||
for (const m of msgs) seenSeqs.current.add(`${m.task_id}:${m.seq}`);
|
||||
setTaskStates((prev) => {
|
||||
const next = new Map(prev);
|
||||
const existing = next.get(task.id);
|
||||
if (existing) {
|
||||
const loadedSeqs = new Set(timeline.map((i) => i.seq));
|
||||
const wsOnly = existing.items.filter((i) => !loadedSeqs.has(i.seq));
|
||||
const merged = [...timeline, ...wsOnly].sort((a, b) => a.seq - b.seq);
|
||||
next.set(task.id, { task: existing.task, items: merged });
|
||||
} else {
|
||||
next.set(task.id, { task, items: timeline });
|
||||
}
|
||||
if (!existing) return prev;
|
||||
const loadedSeqs = new Set(timeline.map((i) => i.seq));
|
||||
const wsOnly = existing.items.filter((i) => !loadedSeqs.has(i.seq));
|
||||
const merged = [...timeline, ...wsOnly].sort((a, b) => a.seq - b.seq);
|
||||
next.set(task.id, { task: existing.task, items: merged });
|
||||
return next;
|
||||
});
|
||||
}).catch(console.error);
|
||||
}).catch((e) => {
|
||||
hydratedTaskIds.current.delete(task.id);
|
||||
console.error(e);
|
||||
});
|
||||
}
|
||||
}).catch(console.error);
|
||||
|
||||
return () => { cancelled = true; };
|
||||
}, [issueId]);
|
||||
|
||||
// Initial fetch on mount / issueId change.
|
||||
useEffect(() => {
|
||||
reconcile();
|
||||
}, [reconcile]);
|
||||
|
||||
// WS reconnect — anything that happened while we were offline (most
|
||||
// notably task:completed / task:failed / task:cancelled) won't replay,
|
||||
// so re-pull the truth and let reconcile drop any stale banners.
|
||||
useWSReconnect(reconcile);
|
||||
|
||||
// Real-time messages — route by task_id and dedupe by seq.
|
||||
useWSEvent(
|
||||
"task:message",
|
||||
@@ -131,18 +176,21 @@ export function AgentLiveCard({ issueId }: AgentLiveCardProps) {
|
||||
}, [issueId]),
|
||||
);
|
||||
|
||||
// Task end — drop the banner. The right-panel ExecutionLogSection
|
||||
// will pick the same task back up under "Past runs" via its own WS
|
||||
// invalidate path.
|
||||
// Task end — optimistically drop the banner for snappy UX, then
|
||||
// reconcile to also clean up sibling tasks whose own end events may
|
||||
// have been missed (e.g. a sequence of tasks all ending during a WS
|
||||
// reconnect window will only replay this one event when we resubscribe).
|
||||
const handleTaskEnd = useCallback((payload: unknown) => {
|
||||
const p = payload as { task_id: string; issue_id: string };
|
||||
if (p.issue_id !== issueId) return;
|
||||
setTaskStates((prev) => {
|
||||
if (!prev.has(p.task_id)) return prev;
|
||||
const next = new Map(prev);
|
||||
next.delete(p.task_id);
|
||||
return next;
|
||||
});
|
||||
}, [issueId]);
|
||||
reconcile();
|
||||
}, [issueId, reconcile]);
|
||||
|
||||
useWSEvent("task:completed", handleTaskEnd);
|
||||
useWSEvent("task:failed", handleTaskEnd);
|
||||
@@ -152,23 +200,13 @@ export function AgentLiveCard({ issueId }: AgentLiveCardProps) {
|
||||
// to both events matters because retry creates a queued child without
|
||||
// emitting task:dispatch (only the daemon's claim does), so listening
|
||||
// to dispatch alone leaves the banner stale during the queued window.
|
||||
// The handler is idempotent (only inserts unseen task IDs), so it's
|
||||
// safe to fire once per event even when both arrive in quick succession.
|
||||
// reconcile is idempotent (per-task hydration guard) and also drops
|
||||
// stale tasks, so it's safe to fire once per event.
|
||||
const handleTaskActive = useCallback((payload: unknown) => {
|
||||
const p = payload as { issue_id?: string };
|
||||
if (p.issue_id && p.issue_id !== issueId) return;
|
||||
api.getActiveTasksForIssue(issueId).then(({ tasks }) => {
|
||||
setTaskStates((prev) => {
|
||||
const next = new Map(prev);
|
||||
for (const task of tasks) {
|
||||
if (!next.has(task.id)) {
|
||||
next.set(task.id, { task, items: [] });
|
||||
}
|
||||
}
|
||||
return next;
|
||||
});
|
||||
}).catch(console.error);
|
||||
}, [issueId]);
|
||||
reconcile();
|
||||
}, [issueId, reconcile]);
|
||||
|
||||
useWSEvent("task:queued", handleTaskActive);
|
||||
useWSEvent("task:dispatch", handleTaskActive);
|
||||
|
||||
Reference in New Issue
Block a user