MUL-2256 fix(realtime): invalidate workspace queries on WSClient instance change (#2665)

* fix(realtime): invalidate workspace queries on WSClient instance change

When switching workspaces, the old WSClient is torn down and a new one
is created. Events emitted during the transition are lost because
onReconnect only fires for reconnections within the same instance.

Add an effect that tracks the WSClient instance via useRef and, on
detecting a non-initial new instance, invalidates all workspace-scoped
queries (same set as onReconnect). The first assignment is skipped to
avoid redundant refetches on initial mount.

Closes multica-ai/multica#2562

Co-authored-by: multica-agent <github@multica.ai>

* refactor(realtime): extract shared invalidation helper + add ws instance test

- Extract invalidateWorkspaceScopedQueries() to deduplicate the
  invalidation key list shared by onReconnect and ws-instance-change effects
- Add hook test covering: first ws skip, null gap no-op, new instance
  invalidates exactly once, same instance no re-invalidation

Addresses review nits from PR #2665.

Co-authored-by: multica-agent <github@multica.ai>

---------

Co-authored-by: multica-agent <github@multica.ai>
This commit is contained in:
LinYushen
2026-05-15 13:37:48 +08:00
committed by GitHub
parent 35e9a7f0f6
commit 7c8cf929d1
3 changed files with 165 additions and 15 deletions

View File

@@ -108,8 +108,11 @@
},
"devDependencies": {
"@multica/tsconfig": "workspace:*",
"@testing-library/react": "catalog:",
"@types/react": "catalog:",
"jsdom": "catalog:",
"react": "catalog:",
"react-dom": "catalog:",
"typescript": "catalog:",
"vitest": "catalog:"
}

View File

@@ -0,0 +1,120 @@
/**
* @vitest-environment jsdom
*/
import { QueryClient, QueryClientProvider } from "@tanstack/react-query";
import { renderHook } from "@testing-library/react";
import type { ReactNode } from "react";
import { describe, expect, it, vi, beforeEach } from "vitest";
import type { WSClient } from "../api/ws-client";
import { useRealtimeSync, type RealtimeSyncStores } from "./use-realtime-sync";
vi.mock("../platform/workspace-storage", () => ({
getCurrentWsId: () => "ws-1",
getCurrentSlug: () => "test-ws",
}));
vi.mock("../paths", () => ({
useHasOnboarded: () => true,
resolvePostAuthDestination: () => "/",
}));
function createMockWs(): WSClient {
return {
on: vi.fn(() => () => {}),
onAny: vi.fn(() => () => {}),
onReconnect: vi.fn(() => () => {}),
} as unknown as WSClient;
}
function createStores(): RealtimeSyncStores {
return {
authStore: Object.assign(() => ({}), {
getState: () => ({ user: { id: "u1" } }),
subscribe: () => () => {},
setState: () => {},
destroy: () => {},
}),
} as unknown as RealtimeSyncStores;
}
function createWrapper(qc: QueryClient) {
return ({ children }: { children: ReactNode }) => (
<QueryClientProvider client={qc}>{children}</QueryClientProvider>
);
}
describe("useRealtimeSync — ws instance change", () => {
let qc: QueryClient;
let stores: RealtimeSyncStores;
let invalidateSpy: ReturnType<typeof vi.spyOn>;
beforeEach(() => {
qc = new QueryClient({ defaultOptions: { queries: { retry: false } } });
stores = createStores();
invalidateSpy = vi.spyOn(qc, "invalidateQueries");
});
it("skips invalidation on first non-null ws instance", () => {
const ws = createMockWs();
renderHook(() => useRealtimeSync(ws, stores), {
wrapper: createWrapper(qc),
});
// The main effect calls invalidateQueries for its own setup, but the
// ws-instance-change effect should NOT have fired invalidation.
// The only invalidateQueries calls should come from the main effect's
// event handlers, not from the instance-change effect.
// We verify by checking that no call was made with workspaceKeys.list()
// pattern from the instance-change path (it logs a specific message).
// Simpler: count calls — first mount with a ws should not trigger the
// workspace-scoped bulk invalidation.
expect(invalidateSpy).not.toHaveBeenCalled();
});
it("does not invalidate when ws goes from instance to null", () => {
const ws1 = createMockWs();
const { rerender } = renderHook(
({ ws }) => useRealtimeSync(ws, stores),
{ initialProps: { ws: ws1 as WSClient | null }, wrapper: createWrapper(qc) },
);
invalidateSpy.mockClear();
rerender({ ws: null });
expect(invalidateSpy).not.toHaveBeenCalled();
});
it("invalidates exactly once when a new ws instance appears after null gap", () => {
const ws1 = createMockWs();
const { rerender } = renderHook(
({ ws }) => useRealtimeSync(ws, stores),
{ initialProps: { ws: ws1 as WSClient | null }, wrapper: createWrapper(qc) },
);
// Simulate workspace switch: ws -> null -> new ws
invalidateSpy.mockClear();
rerender({ ws: null });
expect(invalidateSpy).not.toHaveBeenCalled();
const ws2 = createMockWs();
rerender({ ws: ws2 });
// Should have called invalidateQueries for all workspace-scoped keys
// (11 workspace-scoped + 1 workspaceKeys.list() = 12 calls)
expect(invalidateSpy).toHaveBeenCalledTimes(12);
});
it("does not re-invalidate when rerendered with the same ws instance", () => {
const ws1 = createMockWs();
const { rerender } = renderHook(
({ ws }) => useRealtimeSync(ws, stores),
{ initialProps: { ws: ws1 as WSClient | null }, wrapper: createWrapper(qc) },
);
invalidateSpy.mockClear();
// Rerender with same instance
rerender({ ws: ws1 });
expect(invalidateSpy).not.toHaveBeenCalled();
});
});

View File

@@ -107,6 +107,29 @@ export function applyChatDoneToCache(
qc.invalidateQueries({ queryKey: chatKeys.pendingTask(sessionId) });
}
/**
* Invalidates all workspace-scoped queries. Used after reconnect and when a
* new WSClient instance is detected (workspace switch) to recover events
* missed while disconnected.
*/
function invalidateWorkspaceScopedQueries(qc: QueryClient): void {
const wsId = getCurrentWsId();
if (wsId) {
qc.invalidateQueries({ queryKey: issueKeys.all(wsId) });
qc.invalidateQueries({ queryKey: inboxKeys.all(wsId) });
qc.invalidateQueries({ queryKey: workspaceKeys.agents(wsId) });
qc.invalidateQueries({ queryKey: workspaceKeys.members(wsId) });
qc.invalidateQueries({ queryKey: workspaceKeys.skills(wsId) });
qc.invalidateQueries({ queryKey: projectKeys.all(wsId) });
qc.invalidateQueries({ queryKey: runtimeKeys.all(wsId) });
qc.invalidateQueries({ queryKey: autopilotKeys.all(wsId) });
qc.invalidateQueries({ queryKey: agentTaskSnapshotKeys.all(wsId) });
qc.invalidateQueries({ queryKey: agentActivityKeys.all(wsId) });
qc.invalidateQueries({ queryKey: agentRunCountsKeys.all(wsId) });
}
qc.invalidateQueries({ queryKey: workspaceKeys.list() });
}
export interface RealtimeSyncStores {
authStore: UseBoundStore<StoreApi<AuthState>>;
}
@@ -833,21 +856,7 @@ export function useRealtimeSync(
const unsub = ws.onReconnect(async () => {
logger.info("reconnected, refetching all data");
try {
const wsId = getCurrentWsId();
if (wsId) {
qc.invalidateQueries({ queryKey: issueKeys.all(wsId) });
qc.invalidateQueries({ queryKey: inboxKeys.all(wsId) });
qc.invalidateQueries({ queryKey: workspaceKeys.agents(wsId) });
qc.invalidateQueries({ queryKey: workspaceKeys.members(wsId) });
qc.invalidateQueries({ queryKey: workspaceKeys.skills(wsId) });
qc.invalidateQueries({ queryKey: projectKeys.all(wsId) });
qc.invalidateQueries({ queryKey: runtimeKeys.all(wsId) });
qc.invalidateQueries({ queryKey: autopilotKeys.all(wsId) });
qc.invalidateQueries({ queryKey: agentTaskSnapshotKeys.all(wsId) });
qc.invalidateQueries({ queryKey: agentActivityKeys.all(wsId) });
qc.invalidateQueries({ queryKey: agentRunCountsKeys.all(wsId) });
}
qc.invalidateQueries({ queryKey: workspaceKeys.list() });
invalidateWorkspaceScopedQueries(qc);
} catch (e) {
logger.error("reconnect refetch failed", e);
}
@@ -855,4 +864,22 @@ export function useRealtimeSync(
return unsub;
}, [ws, qc]);
// New WSClient instance (workspace switch) -> invalidate workspace-scoped
// queries to recover events missed while the previous instance was torn down.
// Skips the initial assignment to avoid a redundant refetch on first mount.
const wsInstanceRef = useRef<WSClient | null>(null);
useEffect(() => {
if (!ws) return;
if (wsInstanceRef.current === null) {
// First non-null instance — store and skip invalidation.
wsInstanceRef.current = ws;
return;
}
if (wsInstanceRef.current === ws) return;
wsInstanceRef.current = ws;
logger.info("new WSClient instance detected, invalidating workspace queries");
invalidateWorkspaceScopedQueries(qc);
}, [ws, qc]);
}