mirror of
https://github.com/multica-ai/multica.git
synced 2026-06-25 16:39:33 +02:00
Compare commits
7 Commits
agent/lamb
...
v0.2.21
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
32d61d018e | ||
|
|
51bc5a818f | ||
|
|
2dddfaa196 | ||
|
|
cbe7f2c886 | ||
|
|
1d1dedbf6e | ||
|
|
298ed75b1d | ||
|
|
47b5e38dc6 |
@@ -174,6 +174,22 @@ Daemon behavior is configured via flags or environment variables:
|
||||
| Device name | `--device-name` | `MULTICA_DAEMON_DEVICE_NAME` | hostname |
|
||||
| Runtime name | `--runtime-name` | `MULTICA_AGENT_RUNTIME_NAME` | `Local Agent` |
|
||||
| Workspaces root | — | `MULTICA_WORKSPACES_ROOT` | `~/multica_workspaces` |
|
||||
| GC enabled | — | `MULTICA_GC_ENABLED` | `true` (set `false`/`0` to disable) |
|
||||
| GC scan interval | — | `MULTICA_GC_INTERVAL` | `1h` |
|
||||
| GC TTL (done/cancelled issues) | — | `MULTICA_GC_TTL` | `24h` |
|
||||
| GC orphan TTL (no `.gc_meta.json`) | — | `MULTICA_GC_ORPHAN_TTL` | `72h` |
|
||||
| GC artifact TTL (open issues) | — | `MULTICA_GC_ARTIFACT_TTL` | `12h` (set `0` to disable) |
|
||||
| GC artifact patterns | — | `MULTICA_GC_ARTIFACT_PATTERNS` | `node_modules,.next,.turbo` |
|
||||
|
||||
#### Workspace garbage collection
|
||||
|
||||
The daemon periodically scans `MULTICA_WORKSPACES_ROOT` and reclaims disk space in three modes:
|
||||
|
||||
- **Full task cleanup** — when an issue's status is `done` or `cancelled` and has been idle for `MULTICA_GC_TTL`, the entire task directory is removed.
|
||||
- **Orphan cleanup** — task directories with no `.gc_meta.json` (e.g. left over from a daemon crash) are removed once they exceed `MULTICA_GC_ORPHAN_TTL`.
|
||||
- **Artifact-only cleanup** — when a task has been completed for at least `MULTICA_GC_ARTIFACT_TTL` but the issue is still open, regenerable build outputs whose directory basename matches `MULTICA_GC_ARTIFACT_PATTERNS` are removed; the rest of the workdir (source, `.git`, `output/`, `logs/`, `.gc_meta.json`) is preserved so the agent can resume the same workdir on the next task.
|
||||
|
||||
Patterns are basename-only — entries containing `/` or `\` are silently dropped — and `.git` subtrees are never descended into. The default list (`node_modules`, `.next`, `.turbo`) is intentionally narrow; extend it per deployment if your repos consistently produce other regenerable directories (for example, `MULTICA_GC_ARTIFACT_PATTERNS=node_modules,.next,.turbo,target,__pycache__`). To disable artifact cleanup entirely, set `MULTICA_GC_ARTIFACT_TTL=0`.
|
||||
|
||||
Agent-specific overrides:
|
||||
|
||||
|
||||
@@ -110,22 +110,58 @@ function AppContent() {
|
||||
: undefined;
|
||||
useDaemonIPCBridge(activeWsId);
|
||||
|
||||
// Workspace presence wins over onboarding state: a user invited into an
|
||||
// existing workspace must enter that workspace, not be trapped in the
|
||||
// onboarding overlay just because their personal `onboarded_at` is null.
|
||||
// Onboarding is only the right destination when the account has zero
|
||||
// workspaces AND has never onboarded.
|
||||
// Pre-workspace overlay routing for desktop. Mirrors the web entry-point
|
||||
// judgment in callback / login:
|
||||
// un-onboarded:
|
||||
// pending invites on email → /invitations overlay
|
||||
// no invites → /onboarding overlay
|
||||
// already onboarded:
|
||||
// zero workspaces → /workspaces/new overlay
|
||||
// ≥1 workspaces → no overlay, fall through to dashboard
|
||||
//
|
||||
// The "un-onboarded but in workspace" state is now physically impossible
|
||||
// because backend transactions atomically set onboarded_at when a user
|
||||
// joins the `member` table. Anyone with workspaces is by definition
|
||||
// onboarded.
|
||||
useEffect(() => {
|
||||
if (!user || !workspaceListFetched) return;
|
||||
if (!user || !workspaceListFetched) return undefined;
|
||||
const { overlay, open } = useWindowOverlayStore.getState();
|
||||
if (overlay) return;
|
||||
if (wsCount > 0) return;
|
||||
if (overlay) return undefined;
|
||||
if (wsCount > 0) return undefined;
|
||||
if (!hasOnboarded) {
|
||||
open({ type: "onboarding" });
|
||||
} else {
|
||||
open({ type: "new-workspace" });
|
||||
// Look up pending invitations by email. Network blip is non-fatal —
|
||||
// fall through to onboarding so the user isn't stuck on a blank
|
||||
// window. The sidebar's pending-invitations dropdown will surface
|
||||
// missed invites later once they're onboarded.
|
||||
let cancelled = false;
|
||||
void api
|
||||
.listMyInvitations()
|
||||
.then((invites) => {
|
||||
if (cancelled) return;
|
||||
const { overlay: latestOverlay, open: latestOpen } =
|
||||
useWindowOverlayStore.getState();
|
||||
if (latestOverlay) return;
|
||||
if (invites.length > 0) {
|
||||
qc.setQueryData(workspaceKeys.myInvitations(), invites);
|
||||
latestOpen({ type: "invitations" });
|
||||
} else {
|
||||
latestOpen({ type: "onboarding" });
|
||||
}
|
||||
})
|
||||
.catch(() => {
|
||||
if (cancelled) return;
|
||||
const { overlay: latestOverlay, open: latestOpen } =
|
||||
useWindowOverlayStore.getState();
|
||||
if (latestOverlay) return;
|
||||
latestOpen({ type: "onboarding" });
|
||||
});
|
||||
return () => {
|
||||
cancelled = true;
|
||||
};
|
||||
}
|
||||
}, [user, workspaceListFetched, wsCount, workspaces, hasOnboarded]);
|
||||
open({ type: "new-workspace" });
|
||||
return undefined;
|
||||
}, [user, workspaceListFetched, wsCount, workspaces, hasOnboarded, qc]);
|
||||
|
||||
// Validate persisted tab state against the current user's workspace list,
|
||||
// and pick an active workspace if none is set. Runs in useLayoutEffect
|
||||
|
||||
@@ -65,5 +65,7 @@ function overlayPath(overlay: WindowOverlay): string {
|
||||
return "/onboarding";
|
||||
case "invite":
|
||||
return `/invite/${overlay.invitationId}`;
|
||||
case "invitations":
|
||||
return "/invitations";
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
import { useQuery } from "@tanstack/react-query";
|
||||
import { NewWorkspacePage } from "@multica/views/workspace/new-workspace-page";
|
||||
import { InvitePage } from "@multica/views/invite";
|
||||
import { InvitationsPage } from "@multica/views/invitations";
|
||||
import { OnboardingFlow } from "@multica/views/onboarding";
|
||||
import { useNavigation } from "@multica/views/navigation";
|
||||
import { paths } from "@multica/core/paths";
|
||||
@@ -58,6 +59,7 @@ function WindowOverlayInner() {
|
||||
onBack={onBack}
|
||||
/>
|
||||
)}
|
||||
{overlay.type === "invitations" && <InvitationsPage />}
|
||||
{overlay.type === "onboarding" && (
|
||||
<OnboardingFlow
|
||||
onComplete={(ws) => {
|
||||
|
||||
@@ -61,6 +61,13 @@ function tryRouteToOverlay(path: string, router?: DataRouter): boolean {
|
||||
}
|
||||
return true;
|
||||
}
|
||||
if (path === "/invitations") {
|
||||
overlay.open({ type: "invitations" });
|
||||
if (router && router.state.location.pathname !== "/") {
|
||||
router.navigate("/", { replace: true });
|
||||
}
|
||||
return true;
|
||||
}
|
||||
if (path.startsWith("/invite/")) {
|
||||
let id = "";
|
||||
try {
|
||||
|
||||
@@ -15,6 +15,7 @@ import { create } from "zustand";
|
||||
export type WindowOverlay =
|
||||
| { type: "new-workspace" }
|
||||
| { type: "invite"; invitationId: string }
|
||||
| { type: "invitations" }
|
||||
| { type: "onboarding" };
|
||||
|
||||
interface WindowOverlayStore {
|
||||
|
||||
28
apps/web/app/(auth)/invitations/page.tsx
Normal file
28
apps/web/app/(auth)/invitations/page.tsx
Normal file
@@ -0,0 +1,28 @@
|
||||
"use client";
|
||||
|
||||
import { useEffect } from "react";
|
||||
import { useRouter } from "next/navigation";
|
||||
import { useAuthStore } from "@multica/core/auth";
|
||||
import { paths } from "@multica/core/paths";
|
||||
import { InvitationsPage } from "@multica/views/invitations";
|
||||
|
||||
export default function InvitationsRoutePage() {
|
||||
const router = useRouter();
|
||||
const user = useAuthStore((s) => s.user);
|
||||
const isLoading = useAuthStore((s) => s.isLoading);
|
||||
|
||||
// Unauthenticated users have nowhere meaningful to land here — kick them
|
||||
// through login and bring them back. The login page will eventually run
|
||||
// its own listMyInvitations() check and route them here again.
|
||||
useEffect(() => {
|
||||
if (!isLoading && !user) {
|
||||
router.replace(
|
||||
`${paths.login()}?next=${encodeURIComponent(paths.invitations())}`,
|
||||
);
|
||||
}
|
||||
}, [isLoading, user, router]);
|
||||
|
||||
if (isLoading || !user) return null;
|
||||
|
||||
return <InvitationsPage />;
|
||||
}
|
||||
@@ -2,7 +2,7 @@
|
||||
|
||||
import { Suspense, useEffect, useState } from "react";
|
||||
import { useSearchParams, useRouter } from "next/navigation";
|
||||
import { useQueryClient } from "@tanstack/react-query";
|
||||
import { useQueryClient, type QueryClient } from "@tanstack/react-query";
|
||||
import { sanitizeNextUrl, useAuthStore } from "@multica/core/auth";
|
||||
import { useConfigStore } from "@multica/core/config";
|
||||
import { workspaceKeys } from "@multica/core/workspace/queries";
|
||||
@@ -27,6 +27,32 @@ import { setLoggedInCookie } from "@/features/auth/auth-cookie";
|
||||
import Link from "next/link";
|
||||
import { LoginPage, validateCliCallback } from "@multica/views/auth";
|
||||
|
||||
/**
|
||||
* Pick where a logged-in user with no explicit `?next=` should land.
|
||||
* Un-onboarded users with pending invitations on their email get routed to
|
||||
* the batch /invitations page; everyone else falls through to the standard
|
||||
* resolver. A network blip on listMyInvitations is non-fatal — we fall
|
||||
* through rather than trap the user on an error screen.
|
||||
*/
|
||||
async function resolveLoggedInDestination(
|
||||
qc: QueryClient,
|
||||
hasOnboarded: boolean,
|
||||
workspaces: Workspace[],
|
||||
): Promise<string> {
|
||||
if (!hasOnboarded) {
|
||||
try {
|
||||
const invites = await api.listMyInvitations();
|
||||
if (invites.length > 0) {
|
||||
qc.setQueryData(workspaceKeys.myInvitations(), invites);
|
||||
return paths.invitations();
|
||||
}
|
||||
} catch {
|
||||
// fall through
|
||||
}
|
||||
}
|
||||
return resolvePostAuthDestination(workspaces, hasOnboarded);
|
||||
}
|
||||
|
||||
function LoginPageContent() {
|
||||
const router = useRouter();
|
||||
const qc = useQueryClient();
|
||||
@@ -77,10 +103,12 @@ function LoginPageContent() {
|
||||
return;
|
||||
}
|
||||
const list = qc.getQueryData<Workspace[]>(workspaceKeys.list()) ?? [];
|
||||
router.replace(resolvePostAuthDestination(list, hasOnboarded));
|
||||
void resolveLoggedInDestination(qc, hasOnboarded, list).then((dest) =>
|
||||
router.replace(dest),
|
||||
);
|
||||
}, [isLoading, user, router, nextUrl, cliCallbackRaw, isDesktopHandoff, hasOnboarded, qc]);
|
||||
|
||||
const handleSuccess = () => {
|
||||
const handleSuccess = async () => {
|
||||
// Read the latest user snapshot directly — the closure's `hasOnboarded`
|
||||
// was captured before login completed and would be stale here.
|
||||
const currentUser = useAuthStore.getState().user;
|
||||
@@ -90,7 +118,8 @@ function LoginPageContent() {
|
||||
return;
|
||||
}
|
||||
const list = qc.getQueryData<Workspace[]>(workspaceKeys.list()) ?? [];
|
||||
router.push(resolvePostAuthDestination(list, onboarded));
|
||||
const dest = await resolveLoggedInDestination(qc, onboarded, list);
|
||||
router.push(dest);
|
||||
};
|
||||
|
||||
// Build Google OAuth state: encode platform + next URL so the callback
|
||||
|
||||
@@ -34,7 +34,6 @@ export default function OnboardingPage() {
|
||||
...workspaceListOptions(),
|
||||
enabled: !!user,
|
||||
});
|
||||
const hasWorkspaces = workspaces.length > 0;
|
||||
|
||||
useEffect(() => {
|
||||
if (isLoading || !user) {
|
||||
@@ -42,15 +41,19 @@ export default function OnboardingPage() {
|
||||
return;
|
||||
}
|
||||
if (!workspacesFetched) return;
|
||||
// Bounce out if onboarding doesn't apply: either already onboarded, or
|
||||
// the user already has a workspace (e.g. arrived via invitation) — we
|
||||
// never trap an in-workspace user on the onboarding screen.
|
||||
if (hasOnboarded || hasWorkspaces) {
|
||||
// Bounce out only when onboarding genuinely doesn't apply: the user is
|
||||
// already onboarded. We deliberately don't bounce on `workspaces.length`
|
||||
// here — Step 3 of the flow creates a workspace mid-onboarding, and a
|
||||
// hasWorkspaces bounce here would kick the user out before Steps 4–5
|
||||
// (runtime / agent / first issue) can run. The new entry-point
|
||||
// judgment in callback / login handles "where should this user go on
|
||||
// login" so OnboardingPage no longer needs to second-guess it.
|
||||
if (hasOnboarded) {
|
||||
router.replace(resolvePostAuthDestination(workspaces, hasOnboarded));
|
||||
}
|
||||
}, [isLoading, user, hasOnboarded, workspacesFetched, workspaces, hasWorkspaces, router]);
|
||||
}, [isLoading, user, hasOnboarded, workspacesFetched, workspaces, router]);
|
||||
|
||||
if (isLoading || !user || hasOnboarded || hasWorkspaces) return null;
|
||||
if (isLoading || !user || hasOnboarded) return null;
|
||||
|
||||
// Layout: page owns its own scroll (root layout sets `body {
|
||||
// overflow: hidden }` for the app-shell convention). OnboardingFlow
|
||||
|
||||
@@ -2,13 +2,21 @@ import { describe, it, expect, vi, beforeEach } from "vitest";
|
||||
import { render, waitFor } from "@testing-library/react";
|
||||
import { paths } from "@multica/core/paths";
|
||||
|
||||
const { mockPush, mockSearchParams, mockLoginWithGoogle, mockListWorkspaces } =
|
||||
vi.hoisted(() => ({
|
||||
mockPush: vi.fn(),
|
||||
mockSearchParams: new URLSearchParams(),
|
||||
mockLoginWithGoogle: vi.fn(),
|
||||
mockListWorkspaces: vi.fn(),
|
||||
}));
|
||||
const {
|
||||
mockPush,
|
||||
mockSearchParams,
|
||||
mockLoginWithGoogle,
|
||||
mockListWorkspaces,
|
||||
mockListMyInvitations,
|
||||
mockSetQueryData,
|
||||
} = vi.hoisted(() => ({
|
||||
mockPush: vi.fn(),
|
||||
mockSearchParams: new URLSearchParams(),
|
||||
mockLoginWithGoogle: vi.fn(),
|
||||
mockListWorkspaces: vi.fn(),
|
||||
mockListMyInvitations: vi.fn(),
|
||||
mockSetQueryData: vi.fn(),
|
||||
}));
|
||||
|
||||
const makeUser = (overrides: Partial<{ onboarded_at: string | null }> = {}) => ({
|
||||
id: "user-1",
|
||||
@@ -28,7 +36,7 @@ vi.mock("next/navigation", () => ({
|
||||
}));
|
||||
|
||||
vi.mock("@tanstack/react-query", () => ({
|
||||
useQueryClient: () => ({ setQueryData: vi.fn() }),
|
||||
useQueryClient: () => ({ setQueryData: mockSetQueryData }),
|
||||
}));
|
||||
|
||||
// Preserve the real sanitizeNextUrl so the "drop unsafe ?next=" behavior is
|
||||
@@ -46,12 +54,16 @@ vi.mock("@multica/core/auth", async () => {
|
||||
});
|
||||
|
||||
vi.mock("@multica/core/workspace/queries", () => ({
|
||||
workspaceKeys: { list: () => ["workspaces"] },
|
||||
workspaceKeys: {
|
||||
list: () => ["workspaces"],
|
||||
myInvitations: () => ["invitations", "mine"],
|
||||
},
|
||||
}));
|
||||
|
||||
vi.mock("@multica/core/api", () => ({
|
||||
api: {
|
||||
listWorkspaces: mockListWorkspaces,
|
||||
listMyInvitations: mockListMyInvitations,
|
||||
googleLogin: vi.fn(),
|
||||
},
|
||||
}));
|
||||
@@ -69,6 +81,7 @@ describe("CallbackPage", () => {
|
||||
mockSearchParams.set("code", "test-code");
|
||||
mockLoginWithGoogle.mockResolvedValue(makeUser());
|
||||
mockListWorkspaces.mockResolvedValue([]);
|
||||
mockListMyInvitations.mockResolvedValue([]);
|
||||
});
|
||||
|
||||
it("unonboarded user honors a safe next= (e.g. /invite/{id}) so invitees aren't trapped", async () => {
|
||||
@@ -78,16 +91,39 @@ describe("CallbackPage", () => {
|
||||
expect(mockPush).toHaveBeenCalledWith("/invite/abc123");
|
||||
});
|
||||
expect(mockPush).not.toHaveBeenCalledWith(paths.onboarding());
|
||||
// nextUrl is a fast path — listMyInvitations should not be queried.
|
||||
expect(mockListMyInvitations).not.toHaveBeenCalled();
|
||||
});
|
||||
|
||||
it("unonboarded user with no next= and zero workspaces lands on /onboarding", async () => {
|
||||
it("unonboarded user with no next= and no pending invitations lands on /onboarding", async () => {
|
||||
render(<CallbackPage />);
|
||||
await waitFor(() => {
|
||||
expect(mockPush).toHaveBeenCalledWith(paths.onboarding());
|
||||
});
|
||||
expect(mockListMyInvitations).toHaveBeenCalled();
|
||||
});
|
||||
|
||||
it("unonboarded user with existing workspace lands in that workspace, not /onboarding", async () => {
|
||||
it("unonboarded user with pending invitations lands on /invitations", async () => {
|
||||
mockListMyInvitations.mockResolvedValue([
|
||||
{
|
||||
id: "inv-1",
|
||||
workspace_id: "ws-1",
|
||||
workspace_name: "Acme",
|
||||
role: "member",
|
||||
status: "pending",
|
||||
},
|
||||
]);
|
||||
render(<CallbackPage />);
|
||||
await waitFor(() => {
|
||||
expect(mockPush).toHaveBeenCalledWith(paths.invitations());
|
||||
});
|
||||
expect(mockPush).not.toHaveBeenCalledWith(paths.onboarding());
|
||||
});
|
||||
|
||||
it("onboarded user with workspace lands in that workspace", async () => {
|
||||
mockLoginWithGoogle.mockResolvedValue(
|
||||
makeUser({ onboarded_at: "2026-01-01T00:00:00Z" }),
|
||||
);
|
||||
mockListWorkspaces.mockResolvedValue([
|
||||
{
|
||||
id: "ws-1",
|
||||
@@ -106,7 +142,9 @@ describe("CallbackPage", () => {
|
||||
await waitFor(() => {
|
||||
expect(mockPush).toHaveBeenCalledWith(paths.workspace("acme").issues());
|
||||
});
|
||||
expect(mockPush).not.toHaveBeenCalledWith(paths.onboarding());
|
||||
// Already-onboarded users skip the listMyInvitations check; new invites
|
||||
// surface in the sidebar instead of the wall.
|
||||
expect(mockListMyInvitations).not.toHaveBeenCalled();
|
||||
});
|
||||
|
||||
it("onboarded user ignores unsafe next= targets and lands on the default destination", async () => {
|
||||
@@ -135,4 +173,12 @@ describe("CallbackPage", () => {
|
||||
expect(mockPush).toHaveBeenCalledWith("/invite/abc123");
|
||||
});
|
||||
});
|
||||
|
||||
it("falls through to /onboarding when listMyInvitations errors", async () => {
|
||||
mockListMyInvitations.mockRejectedValue(new Error("network"));
|
||||
render(<CallbackPage />);
|
||||
await waitFor(() => {
|
||||
expect(mockPush).toHaveBeenCalledWith(paths.onboarding());
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
@@ -66,13 +66,42 @@ function CallbackContent() {
|
||||
const wsList = await api.listWorkspaces();
|
||||
qc.setQueryData(workspaceKeys.list(), wsList);
|
||||
const onboarded = loggedInUser.onboarded_at != null;
|
||||
// Workspace presence beats onboarding state: an invitee with zero
|
||||
// `onboarded_at` but a real workspace must land in that workspace,
|
||||
// not in the new-workspace wizard. A `next=` (e.g. /invite/<id>)
|
||||
// always wins so invite acceptance flows survive auth round-trips.
|
||||
router.push(
|
||||
nextUrl || resolvePostAuthDestination(wsList, onboarded),
|
||||
);
|
||||
|
||||
// 1. nextUrl wins: a `next=/invite/<id>` always survives the OAuth
|
||||
// round-trip — the user clicked a specific link and we should
|
||||
// honor exactly that destination.
|
||||
if (nextUrl) {
|
||||
router.push(nextUrl);
|
||||
return;
|
||||
}
|
||||
|
||||
// 2. Un-onboarded users may have pending invitations on their
|
||||
// email even when no `next=` was carried (came from a fresh
|
||||
// login on app.multica.ai instead of clicking the email link,
|
||||
// or `state` was lost across the round-trip). Look them up by
|
||||
// email and route to the batch /invitations page if any.
|
||||
// Already-onboarded users skip this lookup — their new invites
|
||||
// surface in the sidebar dropdown, not as a forced wall.
|
||||
if (!onboarded) {
|
||||
try {
|
||||
const invites = await api.listMyInvitations();
|
||||
if (invites.length > 0) {
|
||||
qc.setQueryData(workspaceKeys.myInvitations(), invites);
|
||||
router.push(paths.invitations());
|
||||
return;
|
||||
}
|
||||
} catch {
|
||||
// Network blip on the invite lookup is non-fatal — fall through
|
||||
// to the normal post-auth destination so the user isn't stuck
|
||||
// on a blank callback screen. Worst case they land on
|
||||
// /onboarding and the sidebar will surface invites later.
|
||||
}
|
||||
}
|
||||
|
||||
// 3. Default: hand off to the resolver (onboarding for first-timers,
|
||||
// first workspace for returning users, /workspaces/new for
|
||||
// onboarded users with zero workspaces).
|
||||
router.push(resolvePostAuthDestination(wsList, onboarded));
|
||||
})
|
||||
.catch((err) => {
|
||||
setError(err instanceof Error ? err.message : "Login failed");
|
||||
|
||||
@@ -283,6 +283,29 @@ export function createEnDict(allowSignup: boolean): LandingDict {
|
||||
fixes: "Bug Fixes",
|
||||
},
|
||||
entries: [
|
||||
{
|
||||
version: "0.2.21",
|
||||
date: "2026-04-30",
|
||||
title: "Quick Capture Overhaul, Mermaid Diagrams & Typed Project Resources",
|
||||
changes: [],
|
||||
features: [
|
||||
"Quick Capture replaces the old New Issue dialog — continuous-create mode, file uploads, and automatic enrichment from pasted URLs",
|
||||
"Mermaid diagrams render inline in markdown, with a fullscreen lightbox for complex graphs",
|
||||
"Projects can bind their own repo, separate from the workspace default",
|
||||
"Permission-aware UI across agents, comments, runtimes, and skills — actions you can't take are no longer offered",
|
||||
],
|
||||
improvements: [
|
||||
"Daemon `/tasks/claim` polling uses a Redis empty-claim fast-path, dropping idle DB load and reclaiming disk on long-open issues",
|
||||
"Multica Agent commits include a `Co-authored-by` trailer for proper Git attribution",
|
||||
"Desktop blocks Cmd+R / Ctrl+R / F5 from reloading the app and shows the real version in dev and Updates settings",
|
||||
],
|
||||
fixes: [
|
||||
"Quick Create no longer invents requirements beyond user input, and subscribes the requester to the issue it creates",
|
||||
"Inbox jumps straight to the targeted comment, and auto-archives when the issue is marked Done from the detail page",
|
||||
"Task rerun starts a fresh session and skips poisoned resume state",
|
||||
"Invitees land on their workspace after sign-in instead of being forced through `/onboarding`",
|
||||
],
|
||||
},
|
||||
{
|
||||
version: "0.2.20",
|
||||
date: "2026-04-29",
|
||||
|
||||
@@ -283,6 +283,29 @@ export function createZhDict(allowSignup: boolean): LandingDict {
|
||||
fixes: "问题修复",
|
||||
},
|
||||
entries: [
|
||||
{
|
||||
version: "0.2.21",
|
||||
date: "2026-04-30",
|
||||
title: "Quick Capture 全面升级、Mermaid 图表与 Typed Project Resources",
|
||||
changes: [],
|
||||
features: [
|
||||
"Quick Capture 取代旧的 New Issue 弹窗 —— 支持连续创建、文件上传,并能根据粘贴的 URL 自动丰富标题与描述",
|
||||
"Markdown 内联渲染 Mermaid 图表,复杂图支持全屏 lightbox",
|
||||
"Project 支持单独绑定 repo,无需依赖 workspace 默认配置",
|
||||
"Agent / 评论 / Runtime / Skill 全面接入权限感知 UI,没有权限的操作不再展示",
|
||||
],
|
||||
improvements: [
|
||||
"Daemon `/tasks/claim` 轮询走 Redis 空认领 fast-path,空闲态 DB 压力下降,长期 open 的 Issue 自动回收磁盘",
|
||||
"Multica Agent 的 Git 提交自动追加 `Co-authored-by` trailer,归属更清晰",
|
||||
"Desktop 拦截 Cmd+R / Ctrl+R / F5 防止意外刷新,开发模式与 Updates 设置中均展示真实版本号",
|
||||
],
|
||||
fixes: [
|
||||
"Quick Create 不再凭空脑补需求,并自动把发起人订阅到 Issue",
|
||||
"Inbox 点击通知后立即跳到目标评论;从 Issue 详情页 Mark as Done 时自动归档",
|
||||
"Task rerun 启动全新 session,跳过被污染的 resume 状态",
|
||||
"受邀成员登录后路由到所在 workspace,不再强制带去 `/onboarding`",
|
||||
],
|
||||
},
|
||||
{
|
||||
version: "0.2.20",
|
||||
date: "2026-04-29",
|
||||
|
||||
@@ -16,7 +16,8 @@ export type OnboardingCompletionPath =
|
||||
| "full" // Reached Step 5 (first_issue) with a runtime connected
|
||||
| "runtime_skipped" // Step 3 skipped (no runtime) but still completed
|
||||
| "cloud_waitlist" // Submitted the cloud waitlist form and skipped Step 3
|
||||
| "skip_existing"; // "I've done this before" from Welcome
|
||||
| "skip_existing" // "I've done this before" from Welcome
|
||||
| "invite_accept"; // Accepted at least one invite from /invitations
|
||||
|
||||
export type TeamSize = "solo" | "team" | "other";
|
||||
|
||||
|
||||
@@ -43,6 +43,7 @@ export const paths = {
|
||||
login: () => "/login",
|
||||
newWorkspace: () => "/workspaces/new",
|
||||
invite: (id: string) => `/invite/${encode(id)}`,
|
||||
invitations: () => "/invitations",
|
||||
onboarding: () => "/onboarding",
|
||||
authCallback: () => "/auth/callback",
|
||||
root: () => "/",
|
||||
@@ -54,7 +55,7 @@ export type WorkspacePaths = ReturnType<typeof workspaceScoped>;
|
||||
// A path is global if it equals or begins with any of these.
|
||||
// Note: `/workspaces/` (trailing slash) is the prefix — `workspaces` is reserved,
|
||||
// so any path starting with `/workspaces/...` is system-owned, not user-owned.
|
||||
const GLOBAL_PREFIXES = ["/login", "/workspaces/", "/invite/", "/onboarding", "/auth/", "/logout", "/signup"];
|
||||
const GLOBAL_PREFIXES = ["/login", "/workspaces/", "/invite/", "/invitations", "/onboarding", "/auth/", "/logout", "/signup"];
|
||||
|
||||
export function isGlobalPath(path: string): boolean {
|
||||
return GLOBAL_PREFIXES.some((p) => path === p || path.startsWith(p));
|
||||
|
||||
@@ -20,6 +20,7 @@ export const RESERVED_SLUGS = new Set([
|
||||
"oauth",
|
||||
"callback",
|
||||
"invite",
|
||||
"invitations",
|
||||
"verify",
|
||||
"reset",
|
||||
"password",
|
||||
|
||||
@@ -19,24 +19,26 @@ function makeWs(slug: string): Workspace {
|
||||
}
|
||||
|
||||
describe("resolvePostAuthDestination", () => {
|
||||
it("has workspace → /<first.slug>/issues regardless of onboarded state", () => {
|
||||
it("!onboarded → /onboarding regardless of workspace count", () => {
|
||||
// Un-onboarded users are routed back to the onboarding flow. The
|
||||
// "un-onboarded but in workspace" state is now physically impossible
|
||||
// (backend invariant + migration 065 backfill), but the resolver still
|
||||
// does the right thing if it ever appears: send the user to onboarding
|
||||
// rather than dropping them into a workspace with `onboarded_at` null.
|
||||
expect(resolvePostAuthDestination([], false)).toBe(paths.onboarding());
|
||||
expect(resolvePostAuthDestination([makeWs("acme")], false)).toBe(
|
||||
paths.onboarding(),
|
||||
);
|
||||
});
|
||||
|
||||
it("onboarded + has workspace → /<first.slug>/issues", () => {
|
||||
const ws = [makeWs("acme"), makeWs("beta")];
|
||||
expect(resolvePostAuthDestination(ws, true)).toBe(
|
||||
paths.workspace("acme").issues(),
|
||||
);
|
||||
expect(resolvePostAuthDestination(ws, false)).toBe(
|
||||
paths.workspace("acme").issues(),
|
||||
);
|
||||
expect(resolvePostAuthDestination([makeWs("acme")], false)).toBe(
|
||||
paths.workspace("acme").issues(),
|
||||
);
|
||||
});
|
||||
|
||||
it("zero workspaces + !onboarded → /onboarding", () => {
|
||||
expect(resolvePostAuthDestination([], false)).toBe(paths.onboarding());
|
||||
});
|
||||
|
||||
it("zero workspaces + onboarded → /workspaces/new", () => {
|
||||
it("onboarded + zero workspaces → /workspaces/new", () => {
|
||||
expect(resolvePostAuthDestination([], true)).toBe(paths.newWorkspace());
|
||||
});
|
||||
});
|
||||
|
||||
@@ -4,23 +4,34 @@ import { paths } from "./paths";
|
||||
|
||||
/**
|
||||
* Priority:
|
||||
* has workspace → /<first.slug>/issues
|
||||
* zero workspaces && !hasOnboarded → /onboarding
|
||||
* zero workspaces && hasOnboarded → /workspaces/new
|
||||
* !hasOnboarded → /onboarding
|
||||
* hasOnboarded && has workspace → /<first.slug>/issues
|
||||
* hasOnboarded && zero workspaces → /workspaces/new
|
||||
*
|
||||
* Workspace presence wins over onboarding state: a user invited into an
|
||||
* existing workspace must NOT be bounced into the new-workspace wizard
|
||||
* just because their personal `onboarded_at` is still null.
|
||||
* `onboarded_at` is the single source of truth for whether the user has
|
||||
* passed first-contact. Backend transactions (CreateWorkspace,
|
||||
* AcceptInvitation) atomically set this field whenever a user joins a
|
||||
* `member` row, so "has workspace but !onboarded" is now a
|
||||
* physically impossible state — see migration 065 for the existing-data
|
||||
* backfill that closed the door retroactively.
|
||||
*
|
||||
* Callers that need invitation-aware routing (callback / login) handle the
|
||||
* "un-onboarded with pending invites" branch themselves before calling
|
||||
* this resolver — this resolver only deals with the post-invite-check
|
||||
* destination.
|
||||
*/
|
||||
export function resolvePostAuthDestination(
|
||||
workspaces: Workspace[],
|
||||
hasOnboarded: boolean,
|
||||
): string {
|
||||
if (!hasOnboarded) {
|
||||
return paths.onboarding();
|
||||
}
|
||||
const first = workspaces[0];
|
||||
if (first) {
|
||||
return paths.workspace(first.slug).issues();
|
||||
}
|
||||
return hasOnboarded ? paths.newWorkspace() : paths.onboarding();
|
||||
return paths.newWorkspace();
|
||||
}
|
||||
|
||||
/**
|
||||
|
||||
1
packages/views/invitations/index.ts
Normal file
1
packages/views/invitations/index.ts
Normal file
@@ -0,0 +1 @@
|
||||
export { InvitationsPage } from "./invitations-page";
|
||||
170
packages/views/invitations/invitations-page.test.tsx
Normal file
170
packages/views/invitations/invitations-page.test.tsx
Normal file
@@ -0,0 +1,170 @@
|
||||
import { describe, expect, it, vi, beforeEach } from "vitest";
|
||||
import { render, screen, waitFor, fireEvent } from "@testing-library/react";
|
||||
import {
|
||||
QueryClient,
|
||||
QueryClientProvider,
|
||||
} from "@tanstack/react-query";
|
||||
|
||||
const {
|
||||
navigate,
|
||||
logout,
|
||||
refreshMe,
|
||||
acceptInvitation,
|
||||
markOnboardingComplete,
|
||||
listMyInvitations,
|
||||
listWorkspaces,
|
||||
} = vi.hoisted(() => ({
|
||||
navigate: vi.fn(),
|
||||
logout: vi.fn(),
|
||||
refreshMe: vi.fn(),
|
||||
acceptInvitation: vi.fn(),
|
||||
markOnboardingComplete: vi.fn(),
|
||||
listMyInvitations: vi.fn(),
|
||||
listWorkspaces: vi.fn(),
|
||||
}));
|
||||
|
||||
vi.mock("../navigation", () => ({
|
||||
useNavigation: () => ({ push: navigate, replace: navigate }),
|
||||
}));
|
||||
|
||||
vi.mock("../auth", () => ({
|
||||
useLogout: () => logout,
|
||||
}));
|
||||
|
||||
vi.mock("../platform", () => ({
|
||||
DragStrip: () => null,
|
||||
}));
|
||||
|
||||
vi.mock("@multica/core/auth", () => ({
|
||||
useAuthStore: Object.assign(
|
||||
(selector?: (s: unknown) => unknown) => {
|
||||
const state = { refreshMe };
|
||||
return selector ? selector(state) : state;
|
||||
},
|
||||
{
|
||||
getState: () => ({ refreshMe }),
|
||||
},
|
||||
),
|
||||
}));
|
||||
|
||||
vi.mock("@multica/core/api", () => ({
|
||||
api: {
|
||||
acceptInvitation,
|
||||
markOnboardingComplete,
|
||||
listMyInvitations,
|
||||
listWorkspaces,
|
||||
},
|
||||
}));
|
||||
|
||||
import { InvitationsPage } from "./invitations-page";
|
||||
|
||||
function renderWithClient(client: QueryClient = new QueryClient()) {
|
||||
return render(
|
||||
<QueryClientProvider client={client}>
|
||||
<InvitationsPage />
|
||||
</QueryClientProvider>,
|
||||
);
|
||||
}
|
||||
|
||||
const mkInvite = (id: string, wsId: string, wsName: string) => ({
|
||||
id,
|
||||
workspace_id: wsId,
|
||||
inviter_id: "u-2",
|
||||
invitee_email: "x@example.com",
|
||||
invitee_user_id: null,
|
||||
role: "member" as const,
|
||||
status: "pending" as const,
|
||||
created_at: "",
|
||||
updated_at: "",
|
||||
expires_at: "",
|
||||
workspace_name: wsName,
|
||||
inviter_name: "Alice",
|
||||
});
|
||||
|
||||
const mkWs = (id: string, slug: string) => ({
|
||||
id,
|
||||
name: slug,
|
||||
slug,
|
||||
description: null,
|
||||
context: null,
|
||||
settings: {},
|
||||
repos: [],
|
||||
issue_prefix: slug.toUpperCase(),
|
||||
created_at: "",
|
||||
updated_at: "",
|
||||
});
|
||||
|
||||
describe("InvitationsPage", () => {
|
||||
beforeEach(() => {
|
||||
navigate.mockReset();
|
||||
logout.mockReset();
|
||||
refreshMe.mockReset();
|
||||
acceptInvitation.mockReset();
|
||||
markOnboardingComplete.mockReset();
|
||||
listMyInvitations.mockReset();
|
||||
listWorkspaces.mockReset();
|
||||
refreshMe.mockResolvedValue(undefined);
|
||||
acceptInvitation.mockResolvedValue({});
|
||||
markOnboardingComplete.mockResolvedValue({});
|
||||
});
|
||||
|
||||
it("renders pending invitations with workspace names", async () => {
|
||||
listMyInvitations.mockResolvedValue([
|
||||
mkInvite("inv-1", "ws-1", "Acme"),
|
||||
mkInvite("inv-2", "ws-2", "Beta Corp"),
|
||||
]);
|
||||
renderWithClient();
|
||||
await waitFor(() => {
|
||||
expect(screen.getByText("Acme")).toBeInTheDocument();
|
||||
expect(screen.getByText("Beta Corp")).toBeInTheDocument();
|
||||
});
|
||||
});
|
||||
|
||||
it("with no selections, submitting routes to /onboarding", async () => {
|
||||
listMyInvitations.mockResolvedValue([mkInvite("inv-1", "ws-1", "Acme")]);
|
||||
renderWithClient();
|
||||
await waitFor(() => screen.getByText("Acme"));
|
||||
fireEvent.click(screen.getByRole("button", { name: /skip/i }));
|
||||
expect(navigate).toHaveBeenCalledWith("/onboarding");
|
||||
// Empty submit doesn't accept anything or touch onboarding state.
|
||||
expect(acceptInvitation).not.toHaveBeenCalled();
|
||||
expect(markOnboardingComplete).not.toHaveBeenCalled();
|
||||
});
|
||||
|
||||
it("accepts selected invitations, marks onboarded, navigates to first ws", async () => {
|
||||
listMyInvitations.mockResolvedValue([
|
||||
mkInvite("inv-1", "ws-1", "Acme"),
|
||||
mkInvite("inv-2", "ws-2", "Beta"),
|
||||
]);
|
||||
listWorkspaces.mockResolvedValue([mkWs("ws-1", "acme"), mkWs("ws-2", "beta")]);
|
||||
renderWithClient();
|
||||
|
||||
await waitFor(() => screen.getByText("Acme"));
|
||||
// Select Acme via its label/checkbox row.
|
||||
fireEvent.click(screen.getByText("Acme"));
|
||||
|
||||
fireEvent.click(screen.getByRole("button", { name: /join 1 workspace/i }));
|
||||
|
||||
await waitFor(() => {
|
||||
expect(acceptInvitation).toHaveBeenCalledWith("inv-1");
|
||||
expect(markOnboardingComplete).toHaveBeenCalledWith({
|
||||
completion_path: "invite_accept",
|
||||
});
|
||||
expect(refreshMe).toHaveBeenCalled();
|
||||
expect(navigate).toHaveBeenCalledWith("/acme/issues");
|
||||
});
|
||||
});
|
||||
|
||||
it("empty list falls through to onboarding via Continue button", async () => {
|
||||
listMyInvitations.mockResolvedValue([]);
|
||||
renderWithClient();
|
||||
|
||||
await waitFor(() =>
|
||||
screen.getByRole("button", { name: /continue to setup/i }),
|
||||
);
|
||||
fireEvent.click(
|
||||
screen.getByRole("button", { name: /continue to setup/i }),
|
||||
);
|
||||
expect(navigate).toHaveBeenCalledWith("/onboarding");
|
||||
});
|
||||
});
|
||||
280
packages/views/invitations/invitations-page.tsx
Normal file
280
packages/views/invitations/invitations-page.tsx
Normal file
@@ -0,0 +1,280 @@
|
||||
"use client";
|
||||
|
||||
import { useState, type ReactNode } from "react";
|
||||
import { useQuery, useQueryClient } from "@tanstack/react-query";
|
||||
import { api } from "@multica/core/api";
|
||||
import { useAuthStore } from "@multica/core/auth";
|
||||
import {
|
||||
myInvitationListOptions,
|
||||
workspaceKeys,
|
||||
workspaceListOptions,
|
||||
} from "@multica/core/workspace/queries";
|
||||
import { paths } from "@multica/core/paths";
|
||||
import type { Invitation } from "@multica/core/types";
|
||||
import { useNavigation } from "../navigation";
|
||||
import { useLogout } from "../auth";
|
||||
import { DragStrip } from "../platform";
|
||||
import { Button } from "@multica/ui/components/ui/button";
|
||||
import { Card, CardContent } from "@multica/ui/components/ui/card";
|
||||
import { Checkbox } from "@multica/ui/components/ui/checkbox";
|
||||
import { Skeleton } from "@multica/ui/components/ui/skeleton";
|
||||
import { LogOut, Mail, Users } from "lucide-react";
|
||||
|
||||
/**
|
||||
* Batch invitation handling page for first-contact users who land here
|
||||
* because callback / login detected pending invitations on their email.
|
||||
*
|
||||
* Design:
|
||||
* - This route is only reachable for un-onboarded users (the entry-point
|
||||
* judgment in callback/login routes already-onboarded users straight
|
||||
* into their workspace; new invites for those users surface in the
|
||||
* sidebar's pending-invitations dropdown instead).
|
||||
* - The user picks zero or more invitations to accept. "Submit" then:
|
||||
* • zero selected → continue to /onboarding
|
||||
* • ≥1 selected → accept each, mark onboarding complete, navigate
|
||||
* into the first accepted workspace.
|
||||
* - Unselected invitations are intentionally left as `pending` in the DB.
|
||||
* The user can later decline them from the sidebar; we don't auto-decline
|
||||
* here because closing/refreshing this page should not be a destructive
|
||||
* action.
|
||||
*/
|
||||
export function InvitationsPage() {
|
||||
const { push } = useNavigation();
|
||||
const qc = useQueryClient();
|
||||
const [selected, setSelected] = useState<Set<string>>(new Set());
|
||||
const [submitting, setSubmitting] = useState(false);
|
||||
const [error, setError] = useState<string | null>(null);
|
||||
|
||||
const {
|
||||
data: invitations,
|
||||
isLoading,
|
||||
error: fetchError,
|
||||
refetch,
|
||||
} = useQuery(myInvitationListOptions());
|
||||
|
||||
const toggle = (id: string) => {
|
||||
setSelected((prev) => {
|
||||
const next = new Set(prev);
|
||||
if (next.has(id)) next.delete(id);
|
||||
else next.add(id);
|
||||
return next;
|
||||
});
|
||||
};
|
||||
|
||||
const handleSubmit = async () => {
|
||||
setError(null);
|
||||
|
||||
// Zero selected: hand off to onboarding. Pending invites stay pending and
|
||||
// can be picked up later from the sidebar.
|
||||
if (selected.size === 0) {
|
||||
push(paths.onboarding());
|
||||
return;
|
||||
}
|
||||
|
||||
setSubmitting(true);
|
||||
const acceptedIds: string[] = [];
|
||||
try {
|
||||
for (const id of selected) {
|
||||
await api.acceptInvitation(id);
|
||||
acceptedIds.push(id);
|
||||
}
|
||||
|
||||
// markOnboardingComplete is a frontend-side belt to the backend braces:
|
||||
// each AcceptInvitation transaction already sets onboarded_at via
|
||||
// MarkUserOnboarded, but calling this from the client makes sure the
|
||||
// returned `User` is freshly written and gives refreshMe something
|
||||
// canonical to read.
|
||||
await api.markOnboardingComplete({ completion_path: "invite_accept" });
|
||||
await useAuthStore.getState().refreshMe();
|
||||
|
||||
qc.invalidateQueries({ queryKey: workspaceKeys.myInvitations() });
|
||||
const wsList = await qc.fetchQuery({
|
||||
...workspaceListOptions(),
|
||||
staleTime: 0,
|
||||
});
|
||||
|
||||
const firstAcceptedInvite = invitations?.find(
|
||||
(inv) => inv.id === acceptedIds[0],
|
||||
);
|
||||
const targetWs = firstAcceptedInvite
|
||||
? wsList.find((w) => w.id === firstAcceptedInvite.workspace_id)
|
||||
: undefined;
|
||||
|
||||
// If we can't resolve the just-accepted workspace by id (shouldn't
|
||||
// happen — the backend just inserted the membership and we just
|
||||
// refetched), fall back to the resolver. Don't blindly route to
|
||||
// wsList[0]: that could teleport the user into an unrelated old
|
||||
// workspace they happen to also belong to.
|
||||
push(
|
||||
targetWs ? paths.workspace(targetWs.slug).issues() : paths.newWorkspace(),
|
||||
);
|
||||
} catch (e) {
|
||||
setError(
|
||||
e instanceof Error
|
||||
? e.message
|
||||
: "Failed to process invitations. Please try again.",
|
||||
);
|
||||
// Partial success: any accepts that landed before the failure ALREADY
|
||||
// set onboarded_at on the backend (the AcceptInvitation transaction
|
||||
// is atomic per invite). Refresh local user + workspace state so the
|
||||
// sidebar reflects the partial accept and the user isn't stuck with a
|
||||
// stale `onboarded_at == null` view. The next submit is safe — the
|
||||
// server returns 4xx on re-accept and the catch path will surface that.
|
||||
if (acceptedIds.length > 0) {
|
||||
await useAuthStore.getState().refreshMe().catch(() => {});
|
||||
qc.invalidateQueries({ queryKey: workspaceKeys.list() });
|
||||
}
|
||||
qc.invalidateQueries({ queryKey: workspaceKeys.myInvitations() });
|
||||
refetch();
|
||||
} finally {
|
||||
setSubmitting(false);
|
||||
}
|
||||
};
|
||||
|
||||
if (isLoading) {
|
||||
return (
|
||||
<InvitationsShell>
|
||||
<Card className="w-full max-w-lg">
|
||||
<CardContent className="flex flex-col gap-4 py-12">
|
||||
<Skeleton className="h-6 w-48" />
|
||||
<Skeleton className="h-4 w-72" />
|
||||
<Skeleton className="h-16 w-full" />
|
||||
<Skeleton className="h-16 w-full" />
|
||||
</CardContent>
|
||||
</Card>
|
||||
</InvitationsShell>
|
||||
);
|
||||
}
|
||||
|
||||
// Empty / error: send the user on to onboarding so they're never stuck.
|
||||
// Genuine fetch failure is rare; treating it as "no invites" is safer than
|
||||
// trapping the user on an error screen they can't act on.
|
||||
if (fetchError || !invitations || invitations.length === 0) {
|
||||
return (
|
||||
<InvitationsShell>
|
||||
<Card className="w-full max-w-md">
|
||||
<CardContent className="flex flex-col items-center gap-4 py-12">
|
||||
<div className="flex h-12 w-12 items-center justify-center rounded-full bg-muted">
|
||||
<Mail className="h-6 w-6 text-muted-foreground" />
|
||||
</div>
|
||||
<h2 className="text-lg font-semibold">No pending invitations</h2>
|
||||
<p className="text-sm text-muted-foreground text-center">
|
||||
Continue to set up your own workspace.
|
||||
</p>
|
||||
<Button onClick={() => push(paths.onboarding())}>
|
||||
Continue to setup
|
||||
</Button>
|
||||
</CardContent>
|
||||
</Card>
|
||||
</InvitationsShell>
|
||||
);
|
||||
}
|
||||
|
||||
const submitLabel =
|
||||
selected.size === 0
|
||||
? "Skip and set up my own workspace"
|
||||
: selected.size === 1
|
||||
? "Join 1 workspace"
|
||||
: `Join ${selected.size} workspaces`;
|
||||
|
||||
return (
|
||||
<InvitationsShell>
|
||||
<Card className="w-full max-w-lg">
|
||||
<CardContent className="flex flex-col gap-6 py-10">
|
||||
<div className="flex flex-col items-center gap-3 text-center">
|
||||
<div className="flex h-12 w-12 items-center justify-center rounded-full bg-primary/10">
|
||||
<Users className="h-6 w-6 text-primary" />
|
||||
</div>
|
||||
<div className="space-y-1">
|
||||
<h2 className="text-xl font-semibold">
|
||||
You've been invited
|
||||
</h2>
|
||||
<p className="text-sm text-muted-foreground">
|
||||
Pick the workspaces you want to join. You can always handle the
|
||||
rest later from the sidebar.
|
||||
</p>
|
||||
</div>
|
||||
</div>
|
||||
|
||||
<ul className="flex flex-col gap-2">
|
||||
{invitations.map((inv) => (
|
||||
<InvitationRow
|
||||
key={inv.id}
|
||||
invitation={inv}
|
||||
checked={selected.has(inv.id)}
|
||||
onToggle={() => toggle(inv.id)}
|
||||
/>
|
||||
))}
|
||||
</ul>
|
||||
|
||||
<Button
|
||||
className="w-full"
|
||||
onClick={handleSubmit}
|
||||
disabled={submitting}
|
||||
>
|
||||
{submitting ? "Joining..." : submitLabel}
|
||||
</Button>
|
||||
|
||||
{error && (
|
||||
<p className="text-sm text-destructive text-center">{error}</p>
|
||||
)}
|
||||
</CardContent>
|
||||
</Card>
|
||||
</InvitationsShell>
|
||||
);
|
||||
}
|
||||
|
||||
function InvitationRow({
|
||||
invitation,
|
||||
checked,
|
||||
onToggle,
|
||||
}: {
|
||||
invitation: Invitation;
|
||||
checked: boolean;
|
||||
onToggle: () => void;
|
||||
}) {
|
||||
const inviter = invitation.inviter_name || invitation.inviter_email || "Someone";
|
||||
return (
|
||||
<li>
|
||||
<label
|
||||
className="flex cursor-pointer items-start gap-3 rounded-md border border-border bg-card p-4 hover:bg-accent/40"
|
||||
>
|
||||
<Checkbox
|
||||
checked={checked}
|
||||
onCheckedChange={onToggle}
|
||||
className="mt-1"
|
||||
/>
|
||||
<div className="flex-1 min-w-0 space-y-1">
|
||||
<div className="font-medium truncate">
|
||||
{invitation.workspace_name ?? "Workspace"}
|
||||
</div>
|
||||
<div className="text-xs text-muted-foreground truncate">
|
||||
{inviter} invited you as{" "}
|
||||
{invitation.role === "admin" ? "an admin" : "a member"}
|
||||
</div>
|
||||
</div>
|
||||
</label>
|
||||
</li>
|
||||
);
|
||||
}
|
||||
|
||||
function InvitationsShell({ children }: { children: ReactNode }) {
|
||||
const logout = useLogout();
|
||||
return (
|
||||
<div className="relative flex min-h-svh flex-col bg-background">
|
||||
<DragStrip />
|
||||
<Button
|
||||
variant="ghost"
|
||||
size="sm"
|
||||
className="absolute top-16 right-12 text-muted-foreground hover:text-destructive"
|
||||
onClick={logout}
|
||||
>
|
||||
<LogOut />
|
||||
Log out
|
||||
</Button>
|
||||
<div className="flex flex-1 flex-col items-center justify-center px-6 pb-12">
|
||||
{children}
|
||||
</div>
|
||||
</div>
|
||||
);
|
||||
}
|
||||
@@ -3,6 +3,7 @@
|
||||
import { useState, type ReactNode } from "react";
|
||||
import { useQuery, useQueryClient } from "@tanstack/react-query";
|
||||
import { api } from "@multica/core/api";
|
||||
import { useAuthStore } from "@multica/core/auth";
|
||||
import {
|
||||
workspaceKeys,
|
||||
workspaceListOptions,
|
||||
@@ -62,6 +63,12 @@ export function InvitePage({ invitationId, onBack }: InvitePageProps) {
|
||||
setError(null);
|
||||
try {
|
||||
await api.acceptInvitation(invitationId);
|
||||
// Belt to the backend's braces: AcceptInvitation already sets
|
||||
// onboarded_at inside the same transaction, but explicitly calling
|
||||
// markOnboardingComplete + refreshMe here keeps local user state in
|
||||
// sync immediately so downstream guards don't see stale `null`.
|
||||
await api.markOnboardingComplete({ completion_path: "invite_accept" });
|
||||
await useAuthStore.getState().refreshMe();
|
||||
setDone("accepted");
|
||||
// Fetch the refreshed workspace list so we know the joined workspace's slug.
|
||||
const nextList = await qc.fetchQuery({
|
||||
|
||||
@@ -532,7 +532,7 @@ export function IssueDetail({ issueId, onDelete, onDone, defaultSidebarOpen = tr
|
||||
</span>
|
||||
</div>
|
||||
<div className="flex items-center gap-1 shrink-0">
|
||||
{issue.status !== "done" && issue.status !== "cancelled" && (
|
||||
{onDone && issue.status !== "done" && issue.status !== "cancelled" && (
|
||||
<Tooltip>
|
||||
<TooltipTrigger
|
||||
render={
|
||||
|
||||
@@ -21,14 +21,15 @@ import { useNavigation } from "../navigation";
|
||||
* - Not logged in → /login
|
||||
* - Logged in but workspace list not yet loaded → wait (don't bounce prematurely)
|
||||
* - Logged in but URL slug doesn't resolve to any workspace →
|
||||
* `resolvePostAuthDestination(list, hasOnboarded)` — first workspace if any,
|
||||
* onboarding for first-timers, /workspaces/new for returning users who
|
||||
* deleted out.
|
||||
* `resolvePostAuthDestination(list, hasOnboarded)`:
|
||||
* • un-onboarded → /onboarding
|
||||
* • onboarded with workspaces → first workspace
|
||||
* • onboarded with zero workspaces → /workspaces/new
|
||||
*
|
||||
* Onboarding is NOT a separate gate: a user invited into a workspace can have
|
||||
* `onboarded_at == null` yet legitimately belong inside a workspace, and must
|
||||
* not be bounced to the new-workspace wizard. The onboarding redirect only
|
||||
* fires from the resolver when the user has zero workspaces.
|
||||
* The "un-onboarded but in workspace" state is now physically impossible:
|
||||
* CreateWorkspace and AcceptInvitation both atomically set `onboarded_at`
|
||||
* inside the same transaction that inserts the `member` row.
|
||||
* Existing dirty rows from PR #1868 are cleaned by migration 065.
|
||||
*
|
||||
* We read the workspace list query state directly (rather than relying on
|
||||
* useCurrentWorkspace's null return) so we can distinguish "list loading"
|
||||
|
||||
@@ -3,7 +3,7 @@
|
||||
import { useState } from "react";
|
||||
import { Loader2 } from "lucide-react";
|
||||
import { toast } from "sonner";
|
||||
import { useQueryClient } from "@tanstack/react-query";
|
||||
import { useQuery, useQueryClient } from "@tanstack/react-query";
|
||||
import { api } from "@multica/core/api";
|
||||
import { useAuthStore } from "@multica/core/auth";
|
||||
import { useNavigation } from "@multica/views/navigation";
|
||||
@@ -12,7 +12,10 @@ import type { QuestionnaireAnswers } from "@multica/core/onboarding";
|
||||
import { pinKeys } from "@multica/core/pins";
|
||||
import { projectKeys } from "@multica/core/projects";
|
||||
import { issueKeys } from "@multica/core/issues/queries";
|
||||
import { workspaceKeys } from "@multica/core/workspace/queries";
|
||||
import {
|
||||
memberListOptions,
|
||||
workspaceKeys,
|
||||
} from "@multica/core/workspace/queries";
|
||||
import { Button } from "@multica/ui/components/ui/button";
|
||||
import {
|
||||
Dialog,
|
||||
@@ -50,11 +53,26 @@ export function StarterContentPrompt() {
|
||||
null,
|
||||
);
|
||||
|
||||
// Member-list fetch is the proxy we use to detect "did this user CREATE
|
||||
// this workspace, or were they invited into it?" An invitee is by definition
|
||||
// not the only member (the inviter is also there); a fresh self-created
|
||||
// workspace has exactly one member — the creator. `starter_content_state`
|
||||
// is a user-level field and can't represent (user, workspace) state directly,
|
||||
// so we layer this membership check on top until that field is migrated to
|
||||
// the `member` table. See follow-up issue: starter_content_state per-workspace.
|
||||
const { data: members = [] } = useQuery({
|
||||
...memberListOptions(workspace?.id ?? ""),
|
||||
enabled: !!workspace?.id,
|
||||
});
|
||||
const isSoloMember =
|
||||
members.length === 1 && members[0]?.user_id === user?.id;
|
||||
|
||||
const shouldShow =
|
||||
!!user &&
|
||||
!!workspace &&
|
||||
user.onboarded_at != null &&
|
||||
user.starter_content_state == null;
|
||||
user.starter_content_state == null &&
|
||||
isSoloMember;
|
||||
|
||||
if (!shouldShow || !workspace || !user) return null;
|
||||
|
||||
|
||||
@@ -38,6 +38,7 @@
|
||||
"./chat": "./chat/index.ts",
|
||||
"./settings": "./settings/index.ts",
|
||||
"./invite": "./invite/index.ts",
|
||||
"./invitations": "./invitations/index.ts",
|
||||
"./onboarding": "./onboarding/index.ts",
|
||||
"./platform": "./platform/index.ts"
|
||||
},
|
||||
|
||||
@@ -119,6 +119,12 @@ func NewRouterWithOptions(pool *pgxpool.Pool, hub *realtime.Hub, bus *events.Bus
|
||||
h.PATCache = patCache
|
||||
h.DaemonTokenCache = daemonTokenCache
|
||||
|
||||
// Empty-claim cache: lets the daemon poll path skip a Postgres
|
||||
// scan when a recent check confirmed the runtime had no queued
|
||||
// task. Returns nil when rdb is nil — TaskService treats that
|
||||
// as "no cache, always hit DB" (existing behavior).
|
||||
h.TaskService.EmptyClaim = service.NewEmptyClaimCache(rdb)
|
||||
|
||||
// Wire WS heartbeat after stores are finalized so the WS path uses the
|
||||
// same (possibly Redis-backed) stores as the HTTP path.
|
||||
daemonHub.SetHeartbeatHandler(h.HandleDaemonWSHeartbeat)
|
||||
|
||||
@@ -24,6 +24,7 @@ const (
|
||||
OnboardingPathRuntimeSkipped = "runtime_skipped" // completed without connecting a runtime
|
||||
OnboardingPathCloudWaitlist = "cloud_waitlist" // completed via cloud waitlist soft exit
|
||||
OnboardingPathSkipExisting = "skip_existing" // "I've done this before" from welcome
|
||||
OnboardingPathInviteAccept = "invite_accept" // accepted at least one invitation from /invitations
|
||||
OnboardingPathUnknown = "unknown" // fallback when the server can't derive the path
|
||||
)
|
||||
|
||||
|
||||
@@ -25,8 +25,17 @@ const (
|
||||
DefaultGCInterval = 1 * time.Hour
|
||||
DefaultGCTTL = 24 * time.Hour // 1 day — AI-coding issues rarely stay open long
|
||||
DefaultGCOrphanTTL = 72 * time.Hour // 3 days — orphans with no meta (crashes, pre-GC leftovers)
|
||||
DefaultGCArtifactTTL = 12 * time.Hour // 12h — drop regenerable artifacts on completed but still-open issues
|
||||
)
|
||||
|
||||
// DefaultGCArtifactPatterns lists basename matches that the GC loop treats as
|
||||
// regenerable build artifacts. Kept conservative: only directories that are
|
||||
// always cheap to recreate (`pnpm install`, `next build`, `turbo build`). Things
|
||||
// like `dist/`, `build/`, `.cache/` or `.venv/` may legitimately hold source or
|
||||
// release output in some repos and are NOT included by default — set
|
||||
// MULTICA_GC_ARTIFACT_PATTERNS to extend the list per deployment.
|
||||
var DefaultGCArtifactPatterns = []string{"node_modules", ".next", ".turbo"}
|
||||
|
||||
// Config holds all daemon configuration.
|
||||
type Config struct {
|
||||
ServerBaseURL string
|
||||
@@ -44,8 +53,10 @@ type Config struct {
|
||||
MaxConcurrentTasks int // max tasks running in parallel (default: 20)
|
||||
GCEnabled bool // enable periodic workspace garbage collection (default: true)
|
||||
GCInterval time.Duration // how often the GC loop runs (default: 1h)
|
||||
GCTTL time.Duration // clean dirs whose issue is done/canceled and updated_at < now()-TTL (default: 24h)
|
||||
GCOrphanTTL time.Duration // clean orphan dirs with no meta older than this (default: 72h). Dirs whose issue returned 404 are cleaned immediately.
|
||||
GCTTL time.Duration // clean dirs whose issue is done/cancelled and updated_at < now()-TTL (default: 24h)
|
||||
GCOrphanTTL time.Duration // clean orphan dirs with no meta, or dirs whose issue gc-check returns 404, once they exceed this age (default: 72h). The 404 path uses the same TTL — a scoped-down token can't instantly wipe live workspaces.
|
||||
GCArtifactTTL time.Duration // when a task has been completed for at least this long but its issue is still open, drop regenerable artifacts (default: 12h, set 0 to disable)
|
||||
GCArtifactPatterns []string // basename patterns whose subtrees are removed during artifact cleanup (default: node_modules, .next, .turbo)
|
||||
PollInterval time.Duration
|
||||
HeartbeatInterval time.Duration
|
||||
AgentTimeout time.Duration
|
||||
@@ -318,6 +329,11 @@ func LoadConfig(overrides Overrides) (Config, error) {
|
||||
if err != nil {
|
||||
return Config{}, err
|
||||
}
|
||||
gcArtifactTTL, err := durationFromEnv("MULTICA_GC_ARTIFACT_TTL", DefaultGCArtifactTTL)
|
||||
if err != nil {
|
||||
return Config{}, err
|
||||
}
|
||||
gcArtifactPatterns := patternsFromEnv("MULTICA_GC_ARTIFACT_PATTERNS", DefaultGCArtifactPatterns)
|
||||
|
||||
return Config{
|
||||
ServerBaseURL: serverBaseURL,
|
||||
@@ -333,6 +349,8 @@ func LoadConfig(overrides Overrides) (Config, error) {
|
||||
GCInterval: gcInterval,
|
||||
GCTTL: gcTTL,
|
||||
GCOrphanTTL: gcOrphanTTL,
|
||||
GCArtifactTTL: gcArtifactTTL,
|
||||
GCArtifactPatterns: gcArtifactPatterns,
|
||||
HealthPort: healthPort,
|
||||
MaxConcurrentTasks: maxConcurrentTasks,
|
||||
PollInterval: pollInterval,
|
||||
@@ -368,6 +386,29 @@ func NormalizeServerBaseURL(raw string) (string, error) {
|
||||
return strings.TrimRight(u.String(), "/"), nil
|
||||
}
|
||||
|
||||
// patternsFromEnv reads a comma-separated list from env. Patterns containing
|
||||
// path separators are silently dropped — the GC artifact cleanup only matches
|
||||
// directory basenames, never paths, so a pattern like "foo/bar" is meaningless
|
||||
// and accepting it would just be a footgun.
|
||||
func patternsFromEnv(name string, defaults []string) []string {
|
||||
raw := strings.TrimSpace(os.Getenv(name))
|
||||
if raw == "" {
|
||||
out := make([]string, len(defaults))
|
||||
copy(out, defaults)
|
||||
return out
|
||||
}
|
||||
parts := strings.Split(raw, ",")
|
||||
out := make([]string, 0, len(parts))
|
||||
for _, p := range parts {
|
||||
p = strings.TrimSpace(p)
|
||||
if p == "" || strings.ContainsAny(p, "/\\") {
|
||||
continue
|
||||
}
|
||||
out = append(out, p)
|
||||
}
|
||||
return out
|
||||
}
|
||||
|
||||
func shellArgsFromEnv(name string) ([]string, error) {
|
||||
raw := strings.TrimSpace(os.Getenv(name))
|
||||
if raw == "" {
|
||||
|
||||
29
server/internal/daemon/config_test.go
Normal file
29
server/internal/daemon/config_test.go
Normal file
@@ -0,0 +1,29 @@
|
||||
package daemon
|
||||
|
||||
import (
|
||||
"reflect"
|
||||
"testing"
|
||||
)
|
||||
|
||||
func TestPatternsFromEnv_DefaultsWhenUnset(t *testing.T) {
|
||||
t.Setenv("MULTICA_GC_ARTIFACT_PATTERNS", "")
|
||||
defaults := []string{"node_modules", ".next", ".turbo"}
|
||||
got := patternsFromEnv("MULTICA_GC_ARTIFACT_PATTERNS", defaults)
|
||||
if !reflect.DeepEqual(got, defaults) {
|
||||
t.Fatalf("expected defaults %v, got %v", defaults, got)
|
||||
}
|
||||
// Ensure callers get a copy, not a shared backing array.
|
||||
got[0] = "mutated"
|
||||
if defaults[0] == "mutated" {
|
||||
t.Fatal("patternsFromEnv must not return a slice aliased with defaults")
|
||||
}
|
||||
}
|
||||
|
||||
func TestPatternsFromEnv_DropsSeparatorBearingEntries(t *testing.T) {
|
||||
t.Setenv("MULTICA_GC_ARTIFACT_PATTERNS", "node_modules, .next ,foo/bar, ../etc, ,target")
|
||||
got := patternsFromEnv("MULTICA_GC_ARTIFACT_PATTERNS", nil)
|
||||
want := []string{"node_modules", ".next", "target"}
|
||||
if !reflect.DeepEqual(got, want) {
|
||||
t.Fatalf("expected %v, got %v", want, got)
|
||||
}
|
||||
}
|
||||
@@ -66,6 +66,9 @@ type Daemon struct {
|
||||
restartBinary string // non-empty after a successful update; path to the new binary
|
||||
updating atomic.Bool // prevents concurrent update attempts
|
||||
activeTasks atomic.Int64 // number of tasks currently in handleTask; exposed via /health
|
||||
|
||||
activeEnvRootsMu sync.Mutex
|
||||
activeEnvRoots map[string]int // env root path -> reference count (handles reuse paths marked twice)
|
||||
}
|
||||
|
||||
// New creates a new Daemon instance.
|
||||
@@ -81,10 +84,11 @@ func New(cfg Config, logger *slog.Logger) *Daemon {
|
||||
repoCache: repocache.New(cacheRoot, logger),
|
||||
logger: logger,
|
||||
workspaces: make(map[string]*workspaceState),
|
||||
runtimeIndex: make(map[string]Runtime),
|
||||
runtimeSetCh: make(chan struct{}, 1),
|
||||
agentVersions: make(map[string]string),
|
||||
wsHBLastAck: make(map[string]time.Time),
|
||||
runtimeIndex: make(map[string]Runtime),
|
||||
runtimeSetCh: make(chan struct{}, 1),
|
||||
agentVersions: make(map[string]string),
|
||||
wsHBLastAck: make(map[string]time.Time),
|
||||
activeEnvRoots: make(map[string]int),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1251,6 +1255,21 @@ func (d *Daemon) runTask(ctx context.Context, task Task, provider string, slot i
|
||||
QuickCreatePrompt: task.QuickCreatePrompt,
|
||||
}
|
||||
|
||||
// Mark candidate env roots as active before any env work so the GC loop
|
||||
// can't reclaim artifacts inside them mid-execution. We mark both the
|
||||
// predicted root for a fresh Prepare and the prior root for Reuse — they
|
||||
// usually differ (Reuse keeps the original task's directory).
|
||||
predictedRoot := execenv.PredictRootDir(d.cfg.WorkspacesRoot, task.WorkspaceID, task.ID)
|
||||
d.markActiveEnvRoot(predictedRoot)
|
||||
defer d.unmarkActiveEnvRoot(predictedRoot)
|
||||
if task.PriorWorkDir != "" {
|
||||
priorRoot := filepath.Dir(task.PriorWorkDir)
|
||||
if priorRoot != predictedRoot {
|
||||
d.markActiveEnvRoot(priorRoot)
|
||||
defer d.unmarkActiveEnvRoot(priorRoot)
|
||||
}
|
||||
}
|
||||
|
||||
// Try to reuse the workdir from a previous task on the same (agent, issue) pair.
|
||||
var env *execenv.Environment
|
||||
codexVersion := d.agentVersion("codex")
|
||||
@@ -1272,6 +1291,12 @@ func (d *Daemon) runTask(ctx context.Context, task Task, provider string, slot i
|
||||
return TaskResult{}, fmt.Errorf("prepare execution environment: %w", err)
|
||||
}
|
||||
}
|
||||
// Belt-and-suspenders: also mark whatever root we ended up with, in case
|
||||
// future changes diverge from PredictRootDir.
|
||||
if env.RootDir != predictedRoot && env.RootDir != "" {
|
||||
d.markActiveEnvRoot(env.RootDir)
|
||||
defer d.unmarkActiveEnvRoot(env.RootDir)
|
||||
}
|
||||
|
||||
// Inject runtime-specific config (meta skill) so the agent discovers .agent_context/.
|
||||
if err := execenv.InjectRuntimeConfig(env.WorkDir, provider, taskCtx); err != nil {
|
||||
@@ -1798,6 +1823,38 @@ func convertProjectResourcesForEnv(resources []ProjectResourceData) []execenv.Pr
|
||||
return result
|
||||
}
|
||||
|
||||
// markActiveEnvRoot records that a task is currently using the given env root,
|
||||
// so the GC loop won't reclaim its artifacts mid-execution. Calls are
|
||||
// reference-counted so a reuse path marked twice (predicted + prior) only
|
||||
// becomes inactive after both unmark calls.
|
||||
func (d *Daemon) markActiveEnvRoot(envRoot string) {
|
||||
if envRoot == "" {
|
||||
return
|
||||
}
|
||||
d.activeEnvRootsMu.Lock()
|
||||
defer d.activeEnvRootsMu.Unlock()
|
||||
d.activeEnvRoots[envRoot]++
|
||||
}
|
||||
|
||||
func (d *Daemon) unmarkActiveEnvRoot(envRoot string) {
|
||||
if envRoot == "" {
|
||||
return
|
||||
}
|
||||
d.activeEnvRootsMu.Lock()
|
||||
defer d.activeEnvRootsMu.Unlock()
|
||||
if d.activeEnvRoots[envRoot] <= 1 {
|
||||
delete(d.activeEnvRoots, envRoot)
|
||||
return
|
||||
}
|
||||
d.activeEnvRoots[envRoot]--
|
||||
}
|
||||
|
||||
func (d *Daemon) isActiveEnvRoot(envRoot string) bool {
|
||||
d.activeEnvRootsMu.Lock()
|
||||
defer d.activeEnvRootsMu.Unlock()
|
||||
return d.activeEnvRoots[envRoot] > 0
|
||||
}
|
||||
|
||||
// shortID returns the first 8 characters of an ID for readable logs.
|
||||
func shortID(id string) string {
|
||||
if len(id) <= 8 {
|
||||
|
||||
@@ -87,6 +87,16 @@ type Environment struct {
|
||||
logger *slog.Logger // for cleanup logging
|
||||
}
|
||||
|
||||
// PredictRootDir returns the env root path that Prepare would create for the
|
||||
// given task, without performing any I/O. Callers use this to claim ownership
|
||||
// of the directory (e.g. against the GC loop) before Prepare/Reuse runs.
|
||||
func PredictRootDir(workspacesRoot, workspaceID, taskID string) string {
|
||||
if workspacesRoot == "" || workspaceID == "" || taskID == "" {
|
||||
return ""
|
||||
}
|
||||
return filepath.Join(workspacesRoot, workspaceID, shortID(taskID))
|
||||
}
|
||||
|
||||
// Prepare creates an isolated execution environment for a task.
|
||||
// The workdir starts empty (no repo checkouts). The agent checks out repos
|
||||
// on demand via `multica repo checkout <url>`.
|
||||
|
||||
@@ -31,6 +31,24 @@ func TestShortID(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestPredictRootDir(t *testing.T) {
|
||||
t.Parallel()
|
||||
got := PredictRootDir("/root", "ws-uuid", "a1b2c3d4-e5f6-7890-abcd-ef1234567890")
|
||||
want := filepath.Join("/root", "ws-uuid", "a1b2c3d4")
|
||||
if got != want {
|
||||
t.Errorf("PredictRootDir = %q, want %q", got, want)
|
||||
}
|
||||
if got := PredictRootDir("", "ws", "task"); got != "" {
|
||||
t.Errorf("expected empty when workspaces root missing, got %q", got)
|
||||
}
|
||||
if got := PredictRootDir("/r", "", "task"); got != "" {
|
||||
t.Errorf("expected empty when workspace ID missing, got %q", got)
|
||||
}
|
||||
if got := PredictRootDir("/r", "ws", ""); got != "" {
|
||||
t.Errorf("expected empty when task ID missing, got %q", got)
|
||||
}
|
||||
}
|
||||
|
||||
func TestSanitizeName(t *testing.T) {
|
||||
t.Parallel()
|
||||
tests := []struct {
|
||||
|
||||
@@ -14,13 +14,19 @@ import (
|
||||
)
|
||||
|
||||
// gcLoop periodically scans local workspace directories and removes those
|
||||
// whose issue is done/canceled and hasn't been updated within the configured TTL.
|
||||
// whose issue is done/cancelled and hasn't been updated within the configured TTL.
|
||||
func (d *Daemon) gcLoop(ctx context.Context) {
|
||||
if !d.cfg.GCEnabled {
|
||||
d.logger.Info("gc: disabled")
|
||||
return
|
||||
}
|
||||
d.logger.Info("gc: started", "interval", d.cfg.GCInterval, "ttl", d.cfg.GCTTL, "orphan_ttl", d.cfg.GCOrphanTTL)
|
||||
d.logger.Info("gc: started",
|
||||
"interval", d.cfg.GCInterval,
|
||||
"ttl", d.cfg.GCTTL,
|
||||
"orphan_ttl", d.cfg.GCOrphanTTL,
|
||||
"artifact_ttl", d.cfg.GCArtifactTTL,
|
||||
"artifact_patterns", d.cfg.GCArtifactPatterns,
|
||||
)
|
||||
|
||||
// Run once at startup after a short delay (let the daemon finish initializing).
|
||||
if err := sleepWithContext(ctx, 30*time.Second); err != nil {
|
||||
@@ -41,6 +47,17 @@ func (d *Daemon) gcLoop(ctx context.Context) {
|
||||
}
|
||||
}
|
||||
|
||||
// gcStats accumulates byte counts and per-pattern hit counts for one GC cycle.
|
||||
type gcStats struct {
|
||||
cleaned int // whole task dirs removed (issue done/cancelled)
|
||||
orphaned int // whole task dirs removed (no meta / unreachable issue)
|
||||
skipped int // task dirs left untouched
|
||||
artifactDirs int // task dirs that had at least one artifact reclaimed
|
||||
artifactRemoved int // count of removed artifact subdirs
|
||||
bytesReclaimed int64 // total bytes freed in this cycle
|
||||
byPattern map[string]int // basename -> reclaim count, for visibility
|
||||
}
|
||||
|
||||
// runGC performs a single GC scan across all workspace directories.
|
||||
func (d *Daemon) runGC(ctx context.Context) {
|
||||
root := d.cfg.WorkspacesRoot
|
||||
@@ -53,34 +70,40 @@ func (d *Daemon) runGC(ctx context.Context) {
|
||||
return
|
||||
}
|
||||
|
||||
var cleaned, skipped, orphaned int
|
||||
stats := &gcStats{byPattern: map[string]int{}}
|
||||
for _, wsEntry := range entries {
|
||||
if !wsEntry.IsDir() || wsEntry.Name() == ".repos" {
|
||||
continue
|
||||
}
|
||||
wsDir := filepath.Join(root, wsEntry.Name())
|
||||
c, s, o := d.gcWorkspace(ctx, wsDir)
|
||||
cleaned += c
|
||||
skipped += s
|
||||
orphaned += o
|
||||
d.gcWorkspace(ctx, wsDir, stats)
|
||||
}
|
||||
|
||||
// Prune stale worktree references from all bare repo caches.
|
||||
d.pruneRepoWorktrees(root)
|
||||
|
||||
if cleaned > 0 || orphaned > 0 {
|
||||
d.logger.Info("gc: cycle complete", "cleaned", cleaned, "orphaned", orphaned, "skipped", skipped)
|
||||
if stats.cleaned > 0 || stats.orphaned > 0 || stats.artifactDirs > 0 {
|
||||
d.logger.Info("gc: cycle complete",
|
||||
"cleaned", stats.cleaned,
|
||||
"orphaned", stats.orphaned,
|
||||
"skipped", stats.skipped,
|
||||
"artifact_dirs", stats.artifactDirs,
|
||||
"artifact_removed", stats.artifactRemoved,
|
||||
"bytes_reclaimed", stats.bytesReclaimed,
|
||||
"by_pattern", stats.byPattern,
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
// gcWorkspace scans task directories inside a single workspace directory.
|
||||
func (d *Daemon) gcWorkspace(ctx context.Context, wsDir string) (cleaned, skipped, orphaned int) {
|
||||
func (d *Daemon) gcWorkspace(ctx context.Context, wsDir string, stats *gcStats) {
|
||||
taskEntries, err := os.ReadDir(wsDir)
|
||||
if err != nil {
|
||||
d.logger.Warn("gc: read workspace dir failed", "dir", wsDir, "error", err)
|
||||
return
|
||||
}
|
||||
|
||||
cleanedHere := 0
|
||||
for _, entry := range taskEntries {
|
||||
if ctx.Err() != nil {
|
||||
return
|
||||
@@ -92,36 +115,62 @@ func (d *Daemon) gcWorkspace(ctx context.Context, wsDir string) (cleaned, skippe
|
||||
action := d.shouldCleanTaskDir(ctx, taskDir)
|
||||
switch action {
|
||||
case gcActionClean:
|
||||
bytes := dirSize(taskDir)
|
||||
d.cleanTaskDir(taskDir)
|
||||
cleaned++
|
||||
stats.cleaned++
|
||||
stats.bytesReclaimed += bytes
|
||||
cleanedHere++
|
||||
case gcActionOrphan:
|
||||
bytes := dirSize(taskDir)
|
||||
d.cleanTaskDir(taskDir)
|
||||
orphaned++
|
||||
stats.orphaned++
|
||||
stats.bytesReclaimed += bytes
|
||||
cleanedHere++
|
||||
case gcActionCleanArtifacts:
|
||||
removed, bytes, perPattern := d.cleanTaskArtifacts(taskDir, d.cfg.GCArtifactPatterns)
|
||||
if removed > 0 {
|
||||
stats.artifactDirs++
|
||||
stats.artifactRemoved += removed
|
||||
stats.bytesReclaimed += bytes
|
||||
for k, v := range perPattern {
|
||||
stats.byPattern[k] += v
|
||||
}
|
||||
}
|
||||
stats.skipped++ // task dir itself preserved
|
||||
default:
|
||||
skipped++
|
||||
stats.skipped++
|
||||
}
|
||||
}
|
||||
|
||||
// Remove the workspace directory itself if it's now empty.
|
||||
if cleaned+orphaned > 0 {
|
||||
if cleanedHere > 0 {
|
||||
remaining, _ := os.ReadDir(wsDir)
|
||||
if len(remaining) == 0 {
|
||||
os.Remove(wsDir)
|
||||
}
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
type gcAction int
|
||||
|
||||
const (
|
||||
gcActionSkip gcAction = iota
|
||||
gcActionClean // issue is done/canceled and stale
|
||||
gcActionOrphan // no meta or unknown issue and dir is old
|
||||
gcActionSkip gcAction = iota
|
||||
gcActionClean // issue is done/cancelled and stale
|
||||
gcActionOrphan // no meta or unknown issue and dir is old
|
||||
gcActionCleanArtifacts // task completed long enough ago; drop regenerable artifacts only
|
||||
)
|
||||
|
||||
// shouldCleanTaskDir decides whether a task directory should be removed.
|
||||
func (d *Daemon) shouldCleanTaskDir(ctx context.Context, taskDir string) gcAction {
|
||||
// A task currently running on this env root must never be reclaimed —
|
||||
// not even on the done/cancelled or orphan-404 paths. A new comment on
|
||||
// an already-done issue can dispatch a follow-up task that reuses the
|
||||
// prior workdir without bumping the issue's updated_at, so the regular
|
||||
// TTL check alone wouldn't notice the resumed activity.
|
||||
if d.isActiveEnvRoot(taskDir) {
|
||||
return gcActionSkip
|
||||
}
|
||||
|
||||
meta, err := execenv.ReadGCMeta(taskDir)
|
||||
if err != nil {
|
||||
// No .gc_meta.json — check mtime for orphan cleanup.
|
||||
@@ -158,7 +207,7 @@ func (d *Daemon) shouldCleanTaskDir(ctx context.Context, taskDir string) gcActio
|
||||
return gcActionSkip
|
||||
}
|
||||
|
||||
if (status.Status == "done" || status.Status == "canceled") &&
|
||||
if (status.Status == "done" || status.Status == "cancelled") &&
|
||||
time.Since(status.UpdatedAt) > d.cfg.GCTTL {
|
||||
d.logger.Info("gc: eligible for cleanup",
|
||||
"dir", filepath.Base(taskDir),
|
||||
@@ -169,6 +218,22 @@ func (d *Daemon) shouldCleanTaskDir(ctx context.Context, taskDir string) gcActio
|
||||
return gcActionClean
|
||||
}
|
||||
|
||||
// Artifact-only cleanup: issue is still open but the task itself completed
|
||||
// long enough ago that its build artifacts are unlikely to be reused.
|
||||
// Active-root protection is handled by the early return above; skip here
|
||||
// only when artifact GC is disabled or the meta has no completed_at
|
||||
// (defensive — that means the task crashed before WriteGCMeta).
|
||||
if d.cfg.GCArtifactTTL > 0 && len(d.cfg.GCArtifactPatterns) > 0 &&
|
||||
!meta.CompletedAt.IsZero() && time.Since(meta.CompletedAt) > d.cfg.GCArtifactTTL {
|
||||
d.logger.Info("gc: eligible for artifact cleanup",
|
||||
"dir", filepath.Base(taskDir),
|
||||
"issue", meta.IssueID,
|
||||
"status", status.Status,
|
||||
"completed_at", meta.CompletedAt.Format(time.RFC3339),
|
||||
)
|
||||
return gcActionCleanArtifacts
|
||||
}
|
||||
|
||||
return gcActionSkip
|
||||
}
|
||||
|
||||
@@ -181,6 +246,114 @@ func (d *Daemon) cleanTaskDir(taskDir string) {
|
||||
}
|
||||
}
|
||||
|
||||
// cleanTaskArtifacts walks taskDir and deletes every directory whose basename
|
||||
// matches one of patterns. Returns (removedCount, bytesReclaimed, perPattern).
|
||||
//
|
||||
// Safety contract:
|
||||
// - patterns are basename-only; entries with a path separator are dropped.
|
||||
// - .git subtrees are never descended into, so the agent's git history stays
|
||||
// intact even if a pattern would otherwise match.
|
||||
// - symlinks are skipped entirely — neither the link nor its target is
|
||||
// touched, so a malicious or stale link can't redirect the GC outside the
|
||||
// workdir.
|
||||
// - every removal target is verified to live inside taskDir, so a tampered
|
||||
// .gc_meta.json can't trick the daemon into deleting outside its sandbox.
|
||||
func (d *Daemon) cleanTaskArtifacts(taskDir string, patterns []string) (removed int, bytes int64, perPattern map[string]int) {
|
||||
perPattern = map[string]int{}
|
||||
if taskDir == "" || len(patterns) == 0 {
|
||||
return
|
||||
}
|
||||
patternSet := make(map[string]struct{}, len(patterns))
|
||||
for _, p := range patterns {
|
||||
p = strings.TrimSpace(p)
|
||||
if p == "" || strings.ContainsAny(p, "/\\") {
|
||||
continue
|
||||
}
|
||||
patternSet[p] = struct{}{}
|
||||
}
|
||||
if len(patternSet) == 0 {
|
||||
return
|
||||
}
|
||||
|
||||
absRoot, err := filepath.Abs(taskDir)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
walkErr := filepath.WalkDir(absRoot, func(path string, entry os.DirEntry, err error) error {
|
||||
if err != nil {
|
||||
return nil // best-effort — keep walking
|
||||
}
|
||||
if path == absRoot {
|
||||
return nil
|
||||
}
|
||||
if !entry.IsDir() {
|
||||
return nil
|
||||
}
|
||||
// Never descend into .git — preserves agent commits even if a pattern
|
||||
// like "objects" would otherwise match.
|
||||
if entry.Name() == ".git" {
|
||||
return filepath.SkipDir
|
||||
}
|
||||
// Refuse to follow symlinked directories. WalkDir reports them as type
|
||||
// Dir on some platforms; lstat to be sure.
|
||||
info, statErr := os.Lstat(path)
|
||||
if statErr != nil {
|
||||
return nil
|
||||
}
|
||||
if info.Mode()&os.ModeSymlink != 0 {
|
||||
return filepath.SkipDir
|
||||
}
|
||||
if _, ok := patternSet[entry.Name()]; !ok {
|
||||
return nil
|
||||
}
|
||||
// Containment check: target must remain inside taskDir.
|
||||
rel, relErr := filepath.Rel(absRoot, path)
|
||||
if relErr != nil || rel == "" || rel == "." || strings.HasPrefix(rel, "..") {
|
||||
return filepath.SkipDir
|
||||
}
|
||||
size := dirSize(path)
|
||||
if rmErr := os.RemoveAll(path); rmErr != nil {
|
||||
d.logger.Warn("gc: artifact remove failed", "path", path, "error", rmErr)
|
||||
return filepath.SkipDir
|
||||
}
|
||||
removed++
|
||||
bytes += size
|
||||
perPattern[entry.Name()]++
|
||||
d.logger.Info("gc: artifact removed", "path", path, "bytes", size)
|
||||
// Don't descend into the now-deleted subtree.
|
||||
return filepath.SkipDir
|
||||
})
|
||||
if walkErr != nil {
|
||||
d.logger.Warn("gc: artifact walk failed", "dir", taskDir, "error", walkErr)
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// dirSize returns the total size of all regular files under root, in bytes.
|
||||
// Non-fatal: errors during the walk are ignored so callers can report a
|
||||
// best-effort byte count without aborting the whole GC cycle.
|
||||
func dirSize(root string) int64 {
|
||||
var total int64
|
||||
_ = filepath.WalkDir(root, func(_ string, entry os.DirEntry, err error) error {
|
||||
if err != nil {
|
||||
return nil
|
||||
}
|
||||
if entry.IsDir() {
|
||||
return nil
|
||||
}
|
||||
info, infoErr := entry.Info()
|
||||
if infoErr != nil {
|
||||
return nil
|
||||
}
|
||||
if info.Mode().IsRegular() {
|
||||
total += info.Size()
|
||||
}
|
||||
return nil
|
||||
})
|
||||
return total
|
||||
}
|
||||
|
||||
const gitCmdTimeout = 30 * time.Second
|
||||
|
||||
// pruneRepoWorktrees runs `git worktree prune` on all bare repos in the cache.
|
||||
|
||||
@@ -23,11 +23,13 @@ func newGCTestDaemon(t *testing.T, handler http.Handler) *Daemon {
|
||||
|
||||
root := t.TempDir()
|
||||
cfg := Config{
|
||||
WorkspacesRoot: root,
|
||||
GCEnabled: true,
|
||||
GCInterval: 1 * time.Hour,
|
||||
GCTTL: 5 * 24 * time.Hour,
|
||||
GCOrphanTTL: 30 * 24 * time.Hour,
|
||||
WorkspacesRoot: root,
|
||||
GCEnabled: true,
|
||||
GCInterval: 1 * time.Hour,
|
||||
GCTTL: 5 * 24 * time.Hour,
|
||||
GCOrphanTTL: 30 * 24 * time.Hour,
|
||||
GCArtifactTTL: 12 * time.Hour,
|
||||
GCArtifactPatterns: []string{"node_modules", ".next", ".turbo"},
|
||||
}
|
||||
d := New(cfg, slog.Default())
|
||||
d.client = NewClient(srv.URL)
|
||||
@@ -77,7 +79,7 @@ func TestShouldCleanTaskDir_DoneIssueOverTTL(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestShouldCleanTaskDir_CanceledIssueOverTTL(t *testing.T) {
|
||||
func TestShouldCleanTaskDir_CancelledIssueOverTTL(t *testing.T) {
|
||||
t.Parallel()
|
||||
issueID := "22222222-2222-2222-2222-222222222222"
|
||||
|
||||
@@ -85,7 +87,7 @@ func TestShouldCleanTaskDir_CanceledIssueOverTTL(t *testing.T) {
|
||||
mux.HandleFunc(fmt.Sprintf("/api/daemon/issues/%s/gc-check", issueID), func(w http.ResponseWriter, r *http.Request) {
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
json.NewEncoder(w).Encode(map[string]any{
|
||||
"status": "canceled",
|
||||
"status": "cancelled",
|
||||
"updated_at": time.Now().Add(-6 * 24 * time.Hour),
|
||||
})
|
||||
})
|
||||
@@ -292,13 +294,338 @@ func TestGcWorkspace_CleansEmptyWorkspaceDir(t *testing.T) {
|
||||
CompletedAt: time.Now(),
|
||||
})
|
||||
|
||||
d.gcWorkspace(context.Background(), wsDir)
|
||||
d.gcWorkspace(context.Background(), wsDir, &gcStats{byPattern: map[string]int{}})
|
||||
|
||||
if _, err := os.Stat(wsDir); !os.IsNotExist(err) {
|
||||
t.Fatal("empty workspace dir should be removed after all tasks cleaned")
|
||||
}
|
||||
}
|
||||
|
||||
func TestShouldCleanTaskDir_OpenIssueArtifactCleanup(t *testing.T) {
|
||||
t.Parallel()
|
||||
issueID := "88888888-8888-8888-8888-888888888888"
|
||||
|
||||
mux := http.NewServeMux()
|
||||
mux.HandleFunc(fmt.Sprintf("/api/daemon/issues/%s/gc-check", issueID), func(w http.ResponseWriter, r *http.Request) {
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
json.NewEncoder(w).Encode(map[string]any{
|
||||
"status": "in_progress",
|
||||
"updated_at": time.Now(),
|
||||
})
|
||||
})
|
||||
|
||||
d := newGCTestDaemon(t, mux)
|
||||
taskDir := createTaskDir(t, d.cfg.WorkspacesRoot, "ws1", "open-task", &execenv.GCMeta{
|
||||
IssueID: issueID,
|
||||
WorkspaceID: "ws1",
|
||||
CompletedAt: time.Now().Add(-24 * time.Hour),
|
||||
})
|
||||
|
||||
action := d.shouldCleanTaskDir(context.Background(), taskDir)
|
||||
if action != gcActionCleanArtifacts {
|
||||
t.Fatalf("expected gcActionCleanArtifacts for old completed task on open issue, got %d", action)
|
||||
}
|
||||
}
|
||||
|
||||
func TestShouldCleanTaskDir_OpenIssueRecentTaskSkipped(t *testing.T) {
|
||||
t.Parallel()
|
||||
issueID := "88888888-8888-8888-8888-888888888889"
|
||||
|
||||
mux := http.NewServeMux()
|
||||
mux.HandleFunc(fmt.Sprintf("/api/daemon/issues/%s/gc-check", issueID), func(w http.ResponseWriter, r *http.Request) {
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
json.NewEncoder(w).Encode(map[string]any{
|
||||
"status": "in_progress",
|
||||
"updated_at": time.Now(),
|
||||
})
|
||||
})
|
||||
|
||||
d := newGCTestDaemon(t, mux)
|
||||
taskDir := createTaskDir(t, d.cfg.WorkspacesRoot, "ws1", "fresh-task", &execenv.GCMeta{
|
||||
IssueID: issueID,
|
||||
WorkspaceID: "ws1",
|
||||
CompletedAt: time.Now().Add(-1 * time.Minute),
|
||||
})
|
||||
|
||||
if action := d.shouldCleanTaskDir(context.Background(), taskDir); action != gcActionSkip {
|
||||
t.Fatalf("expected gcActionSkip for fresh completed_at, got %d", action)
|
||||
}
|
||||
}
|
||||
|
||||
func TestShouldCleanTaskDir_ActiveEnvRootSkipsArtifactCleanup(t *testing.T) {
|
||||
t.Parallel()
|
||||
issueID := "88888888-8888-8888-8888-88888888888a"
|
||||
|
||||
mux := http.NewServeMux()
|
||||
mux.HandleFunc(fmt.Sprintf("/api/daemon/issues/%s/gc-check", issueID), func(w http.ResponseWriter, r *http.Request) {
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
json.NewEncoder(w).Encode(map[string]any{
|
||||
"status": "in_progress",
|
||||
"updated_at": time.Now(),
|
||||
})
|
||||
})
|
||||
|
||||
d := newGCTestDaemon(t, mux)
|
||||
taskDir := createTaskDir(t, d.cfg.WorkspacesRoot, "ws1", "active-task", &execenv.GCMeta{
|
||||
IssueID: issueID,
|
||||
WorkspaceID: "ws1",
|
||||
CompletedAt: time.Now().Add(-24 * time.Hour),
|
||||
})
|
||||
|
||||
d.markActiveEnvRoot(taskDir)
|
||||
defer d.unmarkActiveEnvRoot(taskDir)
|
||||
|
||||
if action := d.shouldCleanTaskDir(context.Background(), taskDir); action != gcActionSkip {
|
||||
t.Fatalf("expected gcActionSkip while task is active, got %d", action)
|
||||
}
|
||||
}
|
||||
|
||||
func TestShouldCleanTaskDir_ActiveEnvRootSkipsFullCleanup(t *testing.T) {
|
||||
t.Parallel()
|
||||
issueID := "99999999-9999-9999-9999-999999999999"
|
||||
|
||||
mux := http.NewServeMux()
|
||||
mux.HandleFunc(fmt.Sprintf("/api/daemon/issues/%s/gc-check", issueID), func(w http.ResponseWriter, r *http.Request) {
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
// Done long enough ago to satisfy GCTTL — this would normally return
|
||||
// gcActionClean. But the env root is in use (e.g. follow-up comment
|
||||
// dispatched a task that reuses the prior workdir), and CreateComment
|
||||
// does not bump issue.updated_at. Active-root guard must override.
|
||||
json.NewEncoder(w).Encode(map[string]any{
|
||||
"status": "done",
|
||||
"updated_at": time.Now().Add(-30 * 24 * time.Hour),
|
||||
})
|
||||
})
|
||||
|
||||
d := newGCTestDaemon(t, mux)
|
||||
taskDir := createTaskDir(t, d.cfg.WorkspacesRoot, "ws1", "active-done", &execenv.GCMeta{
|
||||
IssueID: issueID,
|
||||
WorkspaceID: "ws1",
|
||||
CompletedAt: time.Now().Add(-30 * 24 * time.Hour),
|
||||
})
|
||||
|
||||
d.markActiveEnvRoot(taskDir)
|
||||
defer d.unmarkActiveEnvRoot(taskDir)
|
||||
|
||||
if action := d.shouldCleanTaskDir(context.Background(), taskDir); action != gcActionSkip {
|
||||
t.Fatalf("expected gcActionSkip on active env root with done+stale issue, got %d", action)
|
||||
}
|
||||
}
|
||||
|
||||
func TestShouldCleanTaskDir_ActiveEnvRootSkipsOrphan404(t *testing.T) {
|
||||
t.Parallel()
|
||||
issueID := "99999999-9999-9999-9999-99999999999a"
|
||||
|
||||
mux := http.NewServeMux()
|
||||
mux.HandleFunc(fmt.Sprintf("/api/daemon/issues/%s/gc-check", issueID), func(w http.ResponseWriter, r *http.Request) {
|
||||
w.WriteHeader(http.StatusNotFound)
|
||||
w.Write([]byte(`{"error":"not found"}`))
|
||||
})
|
||||
|
||||
d := newGCTestDaemon(t, mux)
|
||||
d.cfg.GCOrphanTTL = 0 // would normally make this an immediate orphan delete
|
||||
taskDir := createTaskDir(t, d.cfg.WorkspacesRoot, "ws1", "active-404", &execenv.GCMeta{
|
||||
IssueID: issueID,
|
||||
WorkspaceID: "ws1",
|
||||
CompletedAt: time.Now(),
|
||||
})
|
||||
|
||||
d.markActiveEnvRoot(taskDir)
|
||||
defer d.unmarkActiveEnvRoot(taskDir)
|
||||
|
||||
if action := d.shouldCleanTaskDir(context.Background(), taskDir); action != gcActionSkip {
|
||||
t.Fatalf("expected gcActionSkip on active env root with 404 issue, got %d", action)
|
||||
}
|
||||
}
|
||||
|
||||
func TestShouldCleanTaskDir_ActiveEnvRootSkipsNoMetaOrphan(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
d := newGCTestDaemon(t, http.NewServeMux())
|
||||
d.cfg.GCOrphanTTL = 0
|
||||
taskDir := createTaskDir(t, d.cfg.WorkspacesRoot, "ws1", "active-no-meta", nil)
|
||||
|
||||
d.markActiveEnvRoot(taskDir)
|
||||
defer d.unmarkActiveEnvRoot(taskDir)
|
||||
|
||||
if action := d.shouldCleanTaskDir(context.Background(), taskDir); action != gcActionSkip {
|
||||
t.Fatalf("expected gcActionSkip on active env root with no-meta orphan, got %d", action)
|
||||
}
|
||||
}
|
||||
|
||||
func TestShouldCleanTaskDir_ArtifactTTLDisabled(t *testing.T) {
|
||||
t.Parallel()
|
||||
issueID := "88888888-8888-8888-8888-88888888888b"
|
||||
|
||||
mux := http.NewServeMux()
|
||||
mux.HandleFunc(fmt.Sprintf("/api/daemon/issues/%s/gc-check", issueID), func(w http.ResponseWriter, r *http.Request) {
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
json.NewEncoder(w).Encode(map[string]any{
|
||||
"status": "in_progress",
|
||||
"updated_at": time.Now(),
|
||||
})
|
||||
})
|
||||
|
||||
d := newGCTestDaemon(t, mux)
|
||||
d.cfg.GCArtifactTTL = 0
|
||||
taskDir := createTaskDir(t, d.cfg.WorkspacesRoot, "ws1", "no-artifact-gc", &execenv.GCMeta{
|
||||
IssueID: issueID,
|
||||
WorkspaceID: "ws1",
|
||||
CompletedAt: time.Now().Add(-100 * 24 * time.Hour),
|
||||
})
|
||||
|
||||
if action := d.shouldCleanTaskDir(context.Background(), taskDir); action != gcActionSkip {
|
||||
t.Fatalf("expected gcActionSkip when artifact GC disabled, got %d", action)
|
||||
}
|
||||
}
|
||||
|
||||
func TestCleanTaskArtifacts_RemovesOnlyMatchedDirs(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
d := newGCTestDaemon(t, http.NewServeMux())
|
||||
taskDir := t.TempDir()
|
||||
|
||||
// Create a synthetic project layout.
|
||||
mustMkdir := func(rel string) string {
|
||||
p := filepath.Join(taskDir, rel)
|
||||
if err := os.MkdirAll(p, 0o755); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
return p
|
||||
}
|
||||
mustWrite := func(rel string, content string) {
|
||||
p := filepath.Join(taskDir, rel)
|
||||
if err := os.MkdirAll(filepath.Dir(p), 0o755); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if err := os.WriteFile(p, []byte(content), 0o644); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
}
|
||||
|
||||
mustMkdir("workdir/repo/src")
|
||||
mustWrite("workdir/repo/src/index.ts", "console.log('hi')")
|
||||
mustMkdir("workdir/repo/.git/objects")
|
||||
mustWrite("workdir/repo/.git/objects/pack", "binary")
|
||||
mustMkdir("workdir/repo/node_modules/lodash")
|
||||
mustWrite("workdir/repo/node_modules/lodash/index.js", "module.exports = {}")
|
||||
mustMkdir("workdir/repo/.next/cache")
|
||||
mustWrite("workdir/repo/.next/cache/page.html", "<html></html>")
|
||||
mustMkdir("workdir/repo/.turbo")
|
||||
mustWrite("workdir/repo/.turbo/log", "trace")
|
||||
mustMkdir("workdir/repo/dist") // not in default patterns — must be preserved
|
||||
mustWrite("workdir/repo/dist/main.js", "compiled")
|
||||
mustWrite(".gc_meta.json", `{"issue_id":"x"}`)
|
||||
mustMkdir("output")
|
||||
mustWrite("output/result.txt", "done")
|
||||
|
||||
removed, bytes, perPattern := d.cleanTaskArtifacts(taskDir, []string{"node_modules", ".next", ".turbo"})
|
||||
|
||||
if removed != 3 {
|
||||
t.Fatalf("expected 3 artifact dirs removed, got %d", removed)
|
||||
}
|
||||
if bytes <= 0 {
|
||||
t.Fatalf("expected non-zero bytes reclaimed, got %d", bytes)
|
||||
}
|
||||
if perPattern["node_modules"] != 1 || perPattern[".next"] != 1 || perPattern[".turbo"] != 1 {
|
||||
t.Fatalf("unexpected per-pattern counts: %+v", perPattern)
|
||||
}
|
||||
|
||||
// Verify protected paths are intact.
|
||||
for _, rel := range []string{
|
||||
"workdir/repo/src/index.ts",
|
||||
"workdir/repo/.git/objects/pack",
|
||||
"workdir/repo/dist/main.js",
|
||||
"output/result.txt",
|
||||
".gc_meta.json",
|
||||
} {
|
||||
if _, err := os.Stat(filepath.Join(taskDir, rel)); err != nil {
|
||||
t.Errorf("expected %s to be preserved, got %v", rel, err)
|
||||
}
|
||||
}
|
||||
|
||||
// Verify removed paths are gone.
|
||||
for _, rel := range []string{
|
||||
"workdir/repo/node_modules",
|
||||
"workdir/repo/.next",
|
||||
"workdir/repo/.turbo",
|
||||
} {
|
||||
if _, err := os.Stat(filepath.Join(taskDir, rel)); !os.IsNotExist(err) {
|
||||
t.Errorf("expected %s to be removed, stat err=%v", rel, err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestCleanTaskArtifacts_RejectsPatternsWithSeparators(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
d := newGCTestDaemon(t, http.NewServeMux())
|
||||
taskDir := t.TempDir()
|
||||
if err := os.MkdirAll(filepath.Join(taskDir, "workdir", "node_modules"), 0o755); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
removed, _, _ := d.cleanTaskArtifacts(taskDir, []string{"workdir/node_modules", "../etc"})
|
||||
if removed != 0 {
|
||||
t.Fatalf("expected 0 removals from separator-bearing patterns, got %d", removed)
|
||||
}
|
||||
if _, err := os.Stat(filepath.Join(taskDir, "workdir", "node_modules")); err != nil {
|
||||
t.Fatalf("dir should still exist, got %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestCleanTaskArtifacts_DoesNotFollowSymlinks(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
d := newGCTestDaemon(t, http.NewServeMux())
|
||||
taskDir := t.TempDir()
|
||||
outside := t.TempDir()
|
||||
keepFile := filepath.Join(outside, "keep.txt")
|
||||
if err := os.WriteFile(keepFile, []byte("safe"), 0o644); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
if err := os.MkdirAll(filepath.Join(taskDir, "workdir"), 0o755); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
linkPath := filepath.Join(taskDir, "workdir", "node_modules")
|
||||
if err := os.Symlink(outside, linkPath); err != nil {
|
||||
t.Skipf("symlink not supported: %v", err)
|
||||
}
|
||||
|
||||
removed, _, _ := d.cleanTaskArtifacts(taskDir, []string{"node_modules"})
|
||||
if removed != 0 {
|
||||
t.Fatalf("expected 0 removals (symlinked node_modules), got %d", removed)
|
||||
}
|
||||
if _, err := os.Stat(keepFile); err != nil {
|
||||
t.Fatalf("symlinked target was deleted: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestActiveEnvRootRefcount(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
d := newGCTestDaemon(t, http.NewServeMux())
|
||||
root := "/tmp/fake/env"
|
||||
|
||||
if d.isActiveEnvRoot(root) {
|
||||
t.Fatal("expected inactive before mark")
|
||||
}
|
||||
d.markActiveEnvRoot(root)
|
||||
d.markActiveEnvRoot(root) // second mark from reuse path
|
||||
if !d.isActiveEnvRoot(root) {
|
||||
t.Fatal("expected active after mark")
|
||||
}
|
||||
d.unmarkActiveEnvRoot(root)
|
||||
if !d.isActiveEnvRoot(root) {
|
||||
t.Fatal("expected still active after one unmark")
|
||||
}
|
||||
d.unmarkActiveEnvRoot(root)
|
||||
if d.isActiveEnvRoot(root) {
|
||||
t.Fatal("expected inactive after both unmarks")
|
||||
}
|
||||
}
|
||||
|
||||
func TestIsBareRepo(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
|
||||
@@ -414,6 +414,16 @@ func (h *Handler) AcceptInvitation(w http.ResponseWriter, r *http.Request) {
|
||||
return
|
||||
}
|
||||
|
||||
// Accepting an invite is the physical event that "completes" onboarding for an
|
||||
// invitee — atomic with CreateMember so the invariant
|
||||
// "member row exists ↔ onboarded_at != null" cannot be violated.
|
||||
// COALESCE in MarkUserOnboarded keeps this idempotent for users joining
|
||||
// additional workspaces after their first.
|
||||
if _, err := qtx.MarkUserOnboarded(r.Context(), user.ID); err != nil {
|
||||
writeError(w, http.StatusInternalServerError, "failed to mark user onboarded")
|
||||
return
|
||||
}
|
||||
|
||||
if err := tx.Commit(r.Context()); err != nil {
|
||||
writeError(w, http.StatusInternalServerError, "failed to accept invitation")
|
||||
return
|
||||
|
||||
@@ -1693,6 +1693,32 @@ func (h *Handler) BatchUpdateIssues(w http.ResponseWriter, r *http.Request) {
|
||||
json.Unmarshal(raw, &rawUpdates)
|
||||
}
|
||||
|
||||
// Short-circuit when no mutation field is present in `updates`. Without
|
||||
// this, the loop below runs N no-op UPDATEs (every if-guard skips, every
|
||||
// COALESCE preserves the existing value) and reports `{"updated": N}` —
|
||||
// the response cheerfully claims success while nothing changed. Most
|
||||
// real-world cases that hit this path are caller mistakes (status placed
|
||||
// at the top level, "update" misspelled as singular). Telling the truth
|
||||
// here — `{"updated": 0}` — keeps the wire shape stable while making the
|
||||
// count match reality. See multica-ai/multica#1660.
|
||||
hasMutation := req.Updates.Title != nil ||
|
||||
req.Updates.Description != nil ||
|
||||
req.Updates.Status != nil ||
|
||||
req.Updates.Priority != nil ||
|
||||
req.Updates.Position != nil
|
||||
if !hasMutation {
|
||||
for _, k := range []string{"assignee_type", "assignee_id", "due_date", "parent_issue_id", "project_id"} {
|
||||
if _, ok := rawUpdates[k]; ok {
|
||||
hasMutation = true
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
if !hasMutation {
|
||||
writeJSON(w, http.StatusOK, map[string]any{"updated": 0})
|
||||
return
|
||||
}
|
||||
|
||||
workspaceID := h.resolveWorkspaceID(r)
|
||||
wsUUID, ok := parseUUIDOrBadRequest(w, workspaceID, "workspace_id")
|
||||
if !ok {
|
||||
|
||||
149
server/internal/handler/issue_batch_test.go
Normal file
149
server/internal/handler/issue_batch_test.go
Normal file
@@ -0,0 +1,149 @@
|
||||
package handler
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"net/http"
|
||||
"net/http/httptest"
|
||||
"testing"
|
||||
)
|
||||
|
||||
// TestBatchUpdateNoMutationReturnsZero — regression for #1660.
|
||||
//
|
||||
// When the request payload has valid issue_ids but the "updates" field
|
||||
// is empty, missing, or doesn't decode any known mutation field, the
|
||||
// handler used to walk every issue, run a no-op UPDATE, and increment
|
||||
// `updated` for each one — returning {"updated": N} despite changing
|
||||
// nothing. Reporters saw 200 + a positive count and assumed the call
|
||||
// worked, then chased a phantom persistence bug.
|
||||
//
|
||||
// The fix is "tell the truth": when no mutation field is present, return
|
||||
// {"updated": 0} immediately so the count matches reality.
|
||||
func TestBatchUpdateNoMutationReturnsZero(t *testing.T) {
|
||||
// Two fresh issues so we can also assert no fields actually changed.
|
||||
a := createTestIssue(t, "BU-no-mut A", "todo", "low")
|
||||
b := createTestIssue(t, "BU-no-mut B", "todo", "low")
|
||||
t.Cleanup(func() { deleteTestIssue(t, a) })
|
||||
t.Cleanup(func() { deleteTestIssue(t, b) })
|
||||
|
||||
cases := []struct {
|
||||
desc string
|
||||
body map[string]any
|
||||
}{
|
||||
{
|
||||
desc: "updates_missing",
|
||||
// Most common reporter pattern: status at top level.
|
||||
body: map[string]any{"issue_ids": []string{a, b}, "status": "in_progress"},
|
||||
},
|
||||
{
|
||||
desc: "updates_empty_object",
|
||||
body: map[string]any{"issue_ids": []string{a, b}, "updates": map[string]any{}},
|
||||
},
|
||||
{
|
||||
desc: "updates_misnamed",
|
||||
// Singular "update" instead of plural "updates".
|
||||
body: map[string]any{"issue_ids": []string{a, b}, "update": map[string]any{"status": "done"}},
|
||||
},
|
||||
{
|
||||
desc: "updates_unknown_field_only",
|
||||
// Payload IS nested correctly, but every key inside `updates` is
|
||||
// unknown to the handler — same class of caller mistake as the
|
||||
// shapes above. hasMutation must stay false; behavior is already
|
||||
// correct, this case locks it in against future regressions.
|
||||
body: map[string]any{"issue_ids": []string{a, b}, "updates": map[string]any{"foo": "bar"}},
|
||||
},
|
||||
}
|
||||
for _, tc := range cases {
|
||||
t.Run(tc.desc, func(t *testing.T) {
|
||||
w := httptest.NewRecorder()
|
||||
req := newRequest("POST", "/api/issues/batch-update", tc.body)
|
||||
testHandler.BatchUpdateIssues(w, req)
|
||||
if w.Code != http.StatusOK {
|
||||
t.Fatalf("expected 200, got %d: %s", w.Code, w.Body.String())
|
||||
}
|
||||
var resp struct {
|
||||
Updated int `json:"updated"`
|
||||
}
|
||||
json.NewDecoder(w.Body).Decode(&resp)
|
||||
if resp.Updated != 0 {
|
||||
t.Errorf("expected updated=0 when no mutation field present, got %d", resp.Updated)
|
||||
}
|
||||
|
||||
// Belt and braces: confirm the issues weren't touched.
|
||||
for _, id := range []string{a, b} {
|
||||
gw := httptest.NewRecorder()
|
||||
gr := newRequest("GET", "/api/issues/"+id, nil)
|
||||
gr = withURLParam(gr, "id", id)
|
||||
testHandler.GetIssue(gw, gr)
|
||||
var got IssueResponse
|
||||
json.NewDecoder(gw.Body).Decode(&got)
|
||||
if got.Status != "todo" {
|
||||
t.Errorf("issue %s: status changed to %q despite no-mutation request", id, got.Status)
|
||||
}
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
// TestBatchUpdateValidUpdatesPersistAndCount — positive case to lock in
|
||||
// happy-path behavior alongside the regression test above.
|
||||
func TestBatchUpdateValidUpdatesPersistAndCount(t *testing.T) {
|
||||
a := createTestIssue(t, "BU-ok A", "todo", "low")
|
||||
b := createTestIssue(t, "BU-ok B", "todo", "low")
|
||||
t.Cleanup(func() { deleteTestIssue(t, a) })
|
||||
t.Cleanup(func() { deleteTestIssue(t, b) })
|
||||
|
||||
w := httptest.NewRecorder()
|
||||
req := newRequest("POST", "/api/issues/batch-update", map[string]any{
|
||||
"issue_ids": []string{a, b},
|
||||
"updates": map[string]any{"status": "in_progress"},
|
||||
})
|
||||
testHandler.BatchUpdateIssues(w, req)
|
||||
if w.Code != http.StatusOK {
|
||||
t.Fatalf("expected 200, got %d: %s", w.Code, w.Body.String())
|
||||
}
|
||||
var resp struct {
|
||||
Updated int `json:"updated"`
|
||||
}
|
||||
json.NewDecoder(w.Body).Decode(&resp)
|
||||
if resp.Updated != 2 {
|
||||
t.Errorf("expected updated=2, got %d", resp.Updated)
|
||||
}
|
||||
for _, id := range []string{a, b} {
|
||||
gw := httptest.NewRecorder()
|
||||
gr := newRequest("GET", "/api/issues/"+id, nil)
|
||||
gr = withURLParam(gr, "id", id)
|
||||
testHandler.GetIssue(gw, gr)
|
||||
var got IssueResponse
|
||||
json.NewDecoder(gw.Body).Decode(&got)
|
||||
if got.Status != "in_progress" {
|
||||
t.Errorf("issue %s: expected status=in_progress, got %q", id, got.Status)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// createTestIssue is a small helper to keep the table-driven cases clean.
|
||||
// Returns the new issue's id; caller is responsible for cleanup.
|
||||
func createTestIssue(t *testing.T, title, status, priority string) string {
|
||||
t.Helper()
|
||||
w := httptest.NewRecorder()
|
||||
req := newRequest("POST", "/api/issues?workspace_id="+testWorkspaceID, map[string]any{
|
||||
"title": title,
|
||||
"status": status,
|
||||
"priority": priority,
|
||||
})
|
||||
testHandler.CreateIssue(w, req)
|
||||
if w.Code != http.StatusCreated {
|
||||
t.Fatalf("CreateIssue %q: expected 201, got %d: %s", title, w.Code, w.Body.String())
|
||||
}
|
||||
var issue IssueResponse
|
||||
json.NewDecoder(w.Body).Decode(&issue)
|
||||
return issue.ID
|
||||
}
|
||||
|
||||
func deleteTestIssue(t *testing.T, id string) {
|
||||
t.Helper()
|
||||
w := httptest.NewRecorder()
|
||||
req := newRequest("DELETE", "/api/issues/"+id, nil)
|
||||
req = withURLParam(req, "id", id)
|
||||
testHandler.DeleteIssue(w, req)
|
||||
}
|
||||
@@ -52,6 +52,7 @@ var validCompletionPaths = map[string]struct{}{
|
||||
analytics.OnboardingPathRuntimeSkipped: {},
|
||||
analytics.OnboardingPathCloudWaitlist: {},
|
||||
analytics.OnboardingPathSkipExisting: {},
|
||||
analytics.OnboardingPathInviteAccept: {},
|
||||
}
|
||||
|
||||
// CompleteOnboarding marks the authenticated user as having completed
|
||||
|
||||
@@ -201,6 +201,14 @@ func (h *Handler) CreateWorkspace(w http.ResponseWriter, r *http.Request) {
|
||||
return
|
||||
}
|
||||
|
||||
// Becoming a workspace member is the physical event that "completes" onboarding —
|
||||
// keep this atomic with CreateMember so `member` and `onboarded_at`
|
||||
// can never disagree. COALESCE in MarkUserOnboarded keeps it idempotent.
|
||||
if _, err := qtx.MarkUserOnboarded(r.Context(), parseUUID(userID)); err != nil {
|
||||
writeError(w, http.StatusInternalServerError, "failed to mark user onboarded")
|
||||
return
|
||||
}
|
||||
|
||||
if err := tx.Commit(r.Context()); err != nil {
|
||||
writeError(w, http.StatusInternalServerError, "failed to create workspace")
|
||||
return
|
||||
|
||||
@@ -12,19 +12,20 @@ package handler
|
||||
// (`/new-workspace`, `/create-team`) collide with common user workspace names.
|
||||
var reservedSlugs = map[string]bool{
|
||||
// Auth flow
|
||||
"login": true,
|
||||
"logout": true,
|
||||
"signin": true,
|
||||
"signout": true,
|
||||
"signup": true,
|
||||
"auth": true,
|
||||
"oauth": true,
|
||||
"callback": true,
|
||||
"invite": true,
|
||||
"verify": true,
|
||||
"reset": true,
|
||||
"password": true,
|
||||
"onboarding": true, // historical, kept reserved post-removal
|
||||
"login": true,
|
||||
"logout": true,
|
||||
"signin": true,
|
||||
"signout": true,
|
||||
"signup": true,
|
||||
"auth": true,
|
||||
"oauth": true,
|
||||
"callback": true,
|
||||
"invite": true,
|
||||
"invitations": true,
|
||||
"verify": true,
|
||||
"reset": true,
|
||||
"password": true,
|
||||
"onboarding": true, // historical, kept reserved post-removal
|
||||
|
||||
// Platform / marketing routes (current + likely-future)
|
||||
"api": true,
|
||||
|
||||
@@ -215,6 +215,13 @@ func (s *AutopilotService) dispatchRunOnly(ctx context.Context, ap db.Autopilot,
|
||||
*run = updatedRun
|
||||
}
|
||||
|
||||
// Drop the empty-claim cache and wake the daemon. dispatchRunOnly
|
||||
// inserts the task row directly via Queries.CreateAutopilotTask
|
||||
// (bypassing TaskService.Enqueue*), so without this the runtime
|
||||
// would not get a wakeup and any cached "empty" verdict would
|
||||
// stall the task until the TTL expired.
|
||||
s.TaskSvc.NotifyTaskEnqueued(task)
|
||||
|
||||
slog.Info("autopilot dispatched (run_only)",
|
||||
"autopilot_id", util.UUIDToString(ap.ID),
|
||||
"task_id", util.UUIDToString(task.ID),
|
||||
|
||||
198
server/internal/service/empty_claim_cache.go
Normal file
198
server/internal/service/empty_claim_cache.go
Normal file
@@ -0,0 +1,198 @@
|
||||
package service
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"log/slog"
|
||||
"strconv"
|
||||
"time"
|
||||
|
||||
"github.com/redis/go-redis/v9"
|
||||
)
|
||||
|
||||
// emptyClaimCacheKey holds a "no queued task" verdict tagged with the
|
||||
// per-runtime version it was observed under. emptyClaimVersionKey is
|
||||
// the per-runtime monotonic counter that any enqueue path bumps. The
|
||||
// verdict is trusted only when its value equals the current version,
|
||||
// which closes the race where a slow claim writes an empty verdict
|
||||
// AFTER an enqueue has already invalidated it:
|
||||
//
|
||||
// T1 claim: v0 := GET version
|
||||
// SELECT ... -> empty
|
||||
// (slow, e.g. GC pause)
|
||||
// T2 enqueue: INSERT row
|
||||
// INCR version (-> v1)
|
||||
// wakeup
|
||||
// T1 claim: SET empty = v0
|
||||
// T3 claim: v1' := GET version (== v1)
|
||||
// GET empty (== v0) -> v0 != v1, treat as miss -> SELECT
|
||||
//
|
||||
// Without the version tag T3 would have hit the stale empty key and
|
||||
// the just-queued task would sit idle until the empty key's TTL
|
||||
// expired. With it, the only window left is one extra DB SELECT per
|
||||
// runtime per concurrent enqueue, never a stalled task.
|
||||
const (
|
||||
emptyClaimCachePrefix = "mul:claim:runtime:empty:"
|
||||
emptyClaimVersionPrefix = "mul:claim:runtime:version:"
|
||||
)
|
||||
|
||||
// EmptyClaimCacheTTL bounds how long a cached "no queued task" verdict
|
||||
// stays believable. Choice tradeoff: too long means a missed
|
||||
// invalidation delays claim until the TTL expires; too short means the
|
||||
// fast path almost never triggers. 30s matches DefaultPollInterval so
|
||||
// the worst-case staleness is one extra poll cycle — already the
|
||||
// no-cache baseline — while still collapsing the steady-state warm
|
||||
// empty path to a single Redis GET pair.
|
||||
const EmptyClaimCacheTTL = 30 * time.Second
|
||||
|
||||
// emptyClaimVersionTTL keeps the version counter alive long enough that
|
||||
// a rarely-polled runtime doesn't reset to 0 between an enqueue's
|
||||
// INCR and the next claim's GET (which would let a stale tagged
|
||||
// empty key suddenly look valid again). Sliding TTL is renewed on
|
||||
// every Bump and every Get.
|
||||
const emptyClaimVersionTTL = 24 * time.Hour
|
||||
|
||||
// emptyClaimRedisTimeout caps every Redis call from this cache. Enqueue
|
||||
// paths use a background context so the cache outlives the request,
|
||||
// but a wedged Redis must not stall enqueue indefinitely — bound the
|
||||
// blast radius and degrade to "no cache" instead.
|
||||
const emptyClaimRedisTimeout = 250 * time.Millisecond
|
||||
|
||||
// EmptyClaimCache caches "this runtime currently has no queued task"
|
||||
// so the daemon's poll-based claim path can short-circuit before
|
||||
// hitting Postgres. Only the negative result is cached; positive
|
||||
// results always re-check the DB so concurrent claimers race fairly
|
||||
// in `ClaimAgentTask`'s `FOR UPDATE SKIP LOCKED`.
|
||||
//
|
||||
// The cache is invalidated synchronously on every enqueue (see
|
||||
// TaskService.notifyTaskAvailable). A nil *EmptyClaimCache is safe to
|
||||
// use — every method becomes a no-op or reports a cache miss, so
|
||||
// single-node dev / tests with no REDIS_URL degrade cleanly to direct
|
||||
// DB lookups.
|
||||
type EmptyClaimCache struct {
|
||||
rdb *redis.Client
|
||||
}
|
||||
|
||||
// NewEmptyClaimCache returns a cache backed by rdb. Pass nil to
|
||||
// disable caching; the returned *EmptyClaimCache is safe to call but
|
||||
// never hits Redis.
|
||||
func NewEmptyClaimCache(rdb *redis.Client) *EmptyClaimCache {
|
||||
if rdb == nil {
|
||||
return nil
|
||||
}
|
||||
return &EmptyClaimCache{rdb: rdb}
|
||||
}
|
||||
|
||||
func emptyClaimKey(runtimeID string) string { return emptyClaimCachePrefix + runtimeID }
|
||||
func emptyClaimVersion(runtimeID string) string { return emptyClaimVersionPrefix + runtimeID }
|
||||
|
||||
func (c *EmptyClaimCache) bounded(ctx context.Context) (context.Context, context.CancelFunc) {
|
||||
return context.WithTimeout(ctx, emptyClaimRedisTimeout)
|
||||
}
|
||||
|
||||
// CurrentVersion returns the runtime's current invalidation version.
|
||||
// Callers MUST read this BEFORE the DB SELECT they are about to cache,
|
||||
// then pass it back to MarkEmpty so a concurrent Bump invalidates the
|
||||
// would-be cache write. Returns 0 (treated as "unknown") on cache miss
|
||||
// or any Redis error — the caller falls through to the DB path.
|
||||
//
|
||||
// The version key is read with a short Expire refresh so that a long
|
||||
// idle runtime does not let the counter expire and reset to 0 between
|
||||
// an enqueue's Bump and the next claim's MarkEmpty.
|
||||
func (c *EmptyClaimCache) CurrentVersion(ctx context.Context, runtimeID string) int64 {
|
||||
if c == nil || runtimeID == "" {
|
||||
return 0
|
||||
}
|
||||
bctx, cancel := c.bounded(ctx)
|
||||
defer cancel()
|
||||
v, err := c.rdb.Get(bctx, emptyClaimVersion(runtimeID)).Int64()
|
||||
if err != nil {
|
||||
if !errors.Is(err, redis.Nil) {
|
||||
slog.Warn("empty_claim_cache: version get failed; falling back to DB", "error", err)
|
||||
}
|
||||
return 0
|
||||
}
|
||||
// Refresh TTL so the counter doesn't expire and reset on a low-
|
||||
// traffic runtime. Errors here are best-effort.
|
||||
c.rdb.Expire(bctx, emptyClaimVersion(runtimeID), emptyClaimVersionTTL)
|
||||
return v
|
||||
}
|
||||
|
||||
// IsEmpty returns true only when (a) an empty verdict is cached AND
|
||||
// (b) it carries the runtime's current version. A stale verdict
|
||||
// written before a concurrent Bump returns false so the caller falls
|
||||
// through to the DB.
|
||||
func (c *EmptyClaimCache) IsEmpty(ctx context.Context, runtimeID string) bool {
|
||||
if c == nil || runtimeID == "" {
|
||||
return false
|
||||
}
|
||||
bctx, cancel := c.bounded(ctx)
|
||||
defer cancel()
|
||||
// MGET returns []interface{} of either the value (string) or nil.
|
||||
vals, err := c.rdb.MGet(bctx, emptyClaimKey(runtimeID), emptyClaimVersion(runtimeID)).Result()
|
||||
if err != nil {
|
||||
slog.Warn("empty_claim_cache: mget failed; falling back to DB", "error", err)
|
||||
return false
|
||||
}
|
||||
if len(vals) != 2 || vals[0] == nil {
|
||||
return false
|
||||
}
|
||||
emptyVer, ok := vals[0].(string)
|
||||
if !ok {
|
||||
return false
|
||||
}
|
||||
// A missing version key means "no enqueue has ever bumped this
|
||||
// runtime", which is logically version 0 — i.e. the same value
|
||||
// CurrentVersion returns on miss. A MarkEmpty written with v=0
|
||||
// must match here, otherwise the fast path would never trigger
|
||||
// for fresh runtimes.
|
||||
curVer := "0"
|
||||
if vals[1] != nil {
|
||||
if s, ok := vals[1].(string); ok {
|
||||
curVer = s
|
||||
}
|
||||
}
|
||||
return emptyVer == curVer
|
||||
}
|
||||
|
||||
// MarkEmpty stores the empty verdict tagged with observedVersion. The
|
||||
// verdict is later trusted only if observedVersion still equals the
|
||||
// current version (see IsEmpty). Pass the value returned by
|
||||
// CurrentVersion BEFORE the SELECT that confirmed the runtime was
|
||||
// empty; a concurrent Bump between the two will make the next reader
|
||||
// reject this entry, forcing a fresh DB check.
|
||||
//
|
||||
// Errors are logged and swallowed — a cache write failure is not a
|
||||
// request failure.
|
||||
func (c *EmptyClaimCache) MarkEmpty(ctx context.Context, runtimeID string, observedVersion int64) {
|
||||
if c == nil || runtimeID == "" {
|
||||
return
|
||||
}
|
||||
bctx, cancel := c.bounded(ctx)
|
||||
defer cancel()
|
||||
if err := c.rdb.Set(bctx, emptyClaimKey(runtimeID), strconv.FormatInt(observedVersion, 10), EmptyClaimCacheTTL).Err(); err != nil {
|
||||
slog.Warn("empty_claim_cache: set failed", "error", err)
|
||||
}
|
||||
}
|
||||
|
||||
// Bump increments the runtime's invalidation version. Called from
|
||||
// every enqueue path BEFORE the daemon WS wakeup so any verdict
|
||||
// written under the previous version is rejected on the next read,
|
||||
// without needing a separate DEL on the empty key.
|
||||
//
|
||||
// Errors are logged and swallowed — a Redis hiccup must not stop a
|
||||
// legitimate enqueue. The empty key still expires on its own TTL so
|
||||
// the worst-case stall is bounded.
|
||||
func (c *EmptyClaimCache) Bump(ctx context.Context, runtimeID string) {
|
||||
if c == nil || runtimeID == "" {
|
||||
return
|
||||
}
|
||||
bctx, cancel := c.bounded(ctx)
|
||||
defer cancel()
|
||||
pipe := c.rdb.Pipeline()
|
||||
pipe.Incr(bctx, emptyClaimVersion(runtimeID))
|
||||
pipe.Expire(bctx, emptyClaimVersion(runtimeID), emptyClaimVersionTTL)
|
||||
if _, err := pipe.Exec(bctx); err != nil {
|
||||
slog.Warn("empty_claim_cache: bump failed; entry will expire on TTL", "error", err)
|
||||
}
|
||||
}
|
||||
162
server/internal/service/empty_claim_cache_test.go
Normal file
162
server/internal/service/empty_claim_cache_test.go
Normal file
@@ -0,0 +1,162 @@
|
||||
package service
|
||||
|
||||
import (
|
||||
"context"
|
||||
"os"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/redis/go-redis/v9"
|
||||
)
|
||||
|
||||
// newRedisTestClient mirrors the helper in internal/auth: connect to
|
||||
// REDIS_TEST_URL, flush, and skip when unset so `go test ./...` works
|
||||
// on a stock laptop without a Redis instance running.
|
||||
func newRedisTestClient(t *testing.T) *redis.Client {
|
||||
t.Helper()
|
||||
url := os.Getenv("REDIS_TEST_URL")
|
||||
if url == "" {
|
||||
t.Skip("REDIS_TEST_URL not set")
|
||||
}
|
||||
opts, err := redis.ParseURL(url)
|
||||
if err != nil {
|
||||
t.Fatalf("parse REDIS_TEST_URL: %v", err)
|
||||
}
|
||||
rdb := redis.NewClient(opts)
|
||||
ctx := context.Background()
|
||||
if err := rdb.Ping(ctx).Err(); err != nil {
|
||||
t.Skipf("REDIS_TEST_URL unreachable: %v", err)
|
||||
}
|
||||
if err := rdb.FlushDB(ctx).Err(); err != nil {
|
||||
t.Fatalf("flushdb: %v", err)
|
||||
}
|
||||
t.Cleanup(func() {
|
||||
rdb.FlushDB(context.Background())
|
||||
rdb.Close()
|
||||
})
|
||||
return rdb
|
||||
}
|
||||
|
||||
func TestEmptyClaimCache_NilSafe(t *testing.T) {
|
||||
var c *EmptyClaimCache // nil
|
||||
ctx := context.Background()
|
||||
|
||||
if c.IsEmpty(ctx, "any-runtime") {
|
||||
t.Fatal("nil cache must report not-empty (cache miss)")
|
||||
}
|
||||
if v := c.CurrentVersion(ctx, "any-runtime"); v != 0 {
|
||||
t.Fatalf("nil cache CurrentVersion must be 0, got %d", v)
|
||||
}
|
||||
c.MarkEmpty(ctx, "any-runtime", 0)
|
||||
c.Bump(ctx, "any-runtime")
|
||||
}
|
||||
|
||||
func TestNewEmptyClaimCache_NilRedisReturnsNil(t *testing.T) {
|
||||
if c := NewEmptyClaimCache(nil); c != nil {
|
||||
t.Fatalf("NewEmptyClaimCache(nil) must return nil, got %#v", c)
|
||||
}
|
||||
}
|
||||
|
||||
func TestEmptyClaimCache_EmptyRuntimeIDIsNoOp(t *testing.T) {
|
||||
rdb := newRedisTestClient(t)
|
||||
c := NewEmptyClaimCache(rdb)
|
||||
ctx := context.Background()
|
||||
|
||||
c.MarkEmpty(ctx, "", 0)
|
||||
if c.IsEmpty(ctx, "") {
|
||||
t.Fatal("empty runtime ID must not hit cache")
|
||||
}
|
||||
c.Bump(ctx, "")
|
||||
}
|
||||
|
||||
func TestEmptyClaimCache_MarkAndIsEmptyVersionMatched(t *testing.T) {
|
||||
rdb := newRedisTestClient(t)
|
||||
c := NewEmptyClaimCache(rdb)
|
||||
ctx := context.Background()
|
||||
|
||||
if c.IsEmpty(ctx, "rt-1") {
|
||||
t.Fatal("expected miss before mark")
|
||||
}
|
||||
v0 := c.CurrentVersion(ctx, "rt-1")
|
||||
c.MarkEmpty(ctx, "rt-1", v0)
|
||||
if !c.IsEmpty(ctx, "rt-1") {
|
||||
t.Fatal("expected hit when MarkEmpty version matches current")
|
||||
}
|
||||
}
|
||||
|
||||
// TestEmptyClaimCache_BumpInvalidatesPriorMark is the core race-fix
|
||||
// pin: an empty verdict written under v0 must be rejected once Bump
|
||||
// advances the version to v1, even though the empty key itself still
|
||||
// has TTL remaining.
|
||||
func TestEmptyClaimCache_BumpInvalidatesPriorMark(t *testing.T) {
|
||||
rdb := newRedisTestClient(t)
|
||||
c := NewEmptyClaimCache(rdb)
|
||||
ctx := context.Background()
|
||||
|
||||
v0 := c.CurrentVersion(ctx, "rt-bump")
|
||||
c.MarkEmpty(ctx, "rt-bump", v0)
|
||||
if !c.IsEmpty(ctx, "rt-bump") {
|
||||
t.Fatal("precondition: empty verdict tagged with current version should hit")
|
||||
}
|
||||
|
||||
c.Bump(ctx, "rt-bump")
|
||||
if c.IsEmpty(ctx, "rt-bump") {
|
||||
t.Fatal("Bump must invalidate the prior empty verdict")
|
||||
}
|
||||
}
|
||||
|
||||
// TestEmptyClaimCache_StaleMarkRejected pins the GPT-Boy race: a slow
|
||||
// claim reads version v0, the SELECT sees no rows, an enqueue Bumps
|
||||
// to v1, then the slow claim writes MarkEmpty(v0). The next reader
|
||||
// must NOT trust this verdict.
|
||||
func TestEmptyClaimCache_StaleMarkRejected(t *testing.T) {
|
||||
rdb := newRedisTestClient(t)
|
||||
c := NewEmptyClaimCache(rdb)
|
||||
ctx := context.Background()
|
||||
|
||||
// Slow claim samples version BEFORE select.
|
||||
v0 := c.CurrentVersion(ctx, "rt-race")
|
||||
|
||||
// Concurrent enqueue happens.
|
||||
c.Bump(ctx, "rt-race")
|
||||
|
||||
// Slow claim writes its empty verdict tagged with the stale v0.
|
||||
c.MarkEmpty(ctx, "rt-race", v0)
|
||||
|
||||
if c.IsEmpty(ctx, "rt-race") {
|
||||
t.Fatal("MarkEmpty written under a pre-Bump version must be rejected on read")
|
||||
}
|
||||
}
|
||||
|
||||
func TestEmptyClaimCache_TTL(t *testing.T) {
|
||||
rdb := newRedisTestClient(t)
|
||||
c := NewEmptyClaimCache(rdb)
|
||||
ctx := context.Background()
|
||||
|
||||
c.MarkEmpty(ctx, "rt-ttl", 0)
|
||||
ttl, err := rdb.TTL(ctx, emptyClaimKey("rt-ttl")).Result()
|
||||
if err != nil {
|
||||
t.Fatalf("TTL: %v", err)
|
||||
}
|
||||
if ttl <= 0 || ttl > EmptyClaimCacheTTL+time.Second {
|
||||
t.Fatalf("unexpected empty-key TTL %v (want ~%v)", ttl, EmptyClaimCacheTTL)
|
||||
}
|
||||
}
|
||||
|
||||
func TestEmptyClaimCache_RuntimeIsolation(t *testing.T) {
|
||||
rdb := newRedisTestClient(t)
|
||||
c := NewEmptyClaimCache(rdb)
|
||||
ctx := context.Background()
|
||||
|
||||
vA := c.CurrentVersion(ctx, "rt-A")
|
||||
c.MarkEmpty(ctx, "rt-A", vA)
|
||||
if c.IsEmpty(ctx, "rt-B") {
|
||||
t.Fatal("marking rt-A must not affect rt-B")
|
||||
}
|
||||
c.Bump(ctx, "rt-A")
|
||||
vB := c.CurrentVersion(ctx, "rt-B")
|
||||
c.MarkEmpty(ctx, "rt-B", vB)
|
||||
if c.IsEmpty(ctx, "rt-A") {
|
||||
t.Fatal("marking rt-B must not affect rt-A")
|
||||
}
|
||||
}
|
||||
@@ -28,6 +28,12 @@ type TaskService struct {
|
||||
Hub *realtime.Hub
|
||||
Bus *events.Bus
|
||||
Wakeup TaskWakeupNotifier
|
||||
// EmptyClaim caches "this runtime has no queued task" so the daemon
|
||||
// poll path can skip a Postgres scan on the steady-state empty case.
|
||||
// Optional — a nil cache disables the fast path and every claim
|
||||
// goes through the DB. Wired in router.go from the shared Redis
|
||||
// client.
|
||||
EmptyClaim *EmptyClaimCache
|
||||
}
|
||||
|
||||
type TaskWakeupNotifier interface {
|
||||
@@ -451,6 +457,12 @@ func (s *TaskService) ClaimTask(ctx context.Context, agentID pgtype.UUID) (*db.A
|
||||
|
||||
// ClaimTaskForRuntime claims the next runnable task for a runtime while
|
||||
// still respecting each agent's max_concurrent_tasks limit.
|
||||
//
|
||||
// Empty-claim fast path: when EmptyClaim is configured and a recent
|
||||
// check verified the runtime had no queued tasks, returns immediately
|
||||
// without touching Postgres. The cache is invalidated synchronously on
|
||||
// every enqueue (notifyTaskAvailable), so a queued task becomes
|
||||
// claimable on the next call rather than waiting for the TTL.
|
||||
func (s *TaskService) ClaimTaskForRuntime(ctx context.Context, runtimeID pgtype.UUID) (*db.AgentTaskQueue, error) {
|
||||
start := time.Now()
|
||||
var (
|
||||
@@ -476,13 +488,33 @@ func (s *TaskService) ClaimTaskForRuntime(ctx context.Context, runtimeID pgtype.
|
||||
)
|
||||
}()
|
||||
|
||||
t0 := start
|
||||
tasks, err := s.Queries.ListPendingTasksByRuntime(ctx, runtimeID)
|
||||
runtimeKey := util.UUIDToString(runtimeID)
|
||||
if s.EmptyClaim.IsEmpty(ctx, runtimeKey) {
|
||||
outcome = "empty_cache_hit"
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
// Sample the invalidation version BEFORE the SELECT. If a
|
||||
// concurrent enqueue Bumps between this read and the post-SELECT
|
||||
// MarkEmpty, the next IsEmpty will see the empty key tagged with
|
||||
// a stale version and reject it — closing the race that would
|
||||
// otherwise stall the just-queued task until the empty key's TTL
|
||||
// expired.
|
||||
preSelectVersion := s.EmptyClaim.CurrentVersion(ctx, runtimeKey)
|
||||
|
||||
t0 := time.Now()
|
||||
tasks, err := s.Queries.ListQueuedClaimCandidatesByRuntime(ctx, runtimeID)
|
||||
listMs = time.Since(t0).Milliseconds()
|
||||
listCount = len(tasks)
|
||||
if err != nil {
|
||||
outcome = "error_list"
|
||||
return nil, fmt.Errorf("list pending tasks: %w", err)
|
||||
return nil, fmt.Errorf("list queued claim candidates: %w", err)
|
||||
}
|
||||
|
||||
if len(tasks) == 0 {
|
||||
s.EmptyClaim.MarkEmpty(ctx, runtimeKey, preSelectVersion)
|
||||
outcome = "empty_db"
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
loopStart := time.Now()
|
||||
@@ -1163,11 +1195,38 @@ func priorityToInt(p string) int32 {
|
||||
}
|
||||
}
|
||||
|
||||
// NotifyTaskEnqueued is the cross-package shim for callers outside
|
||||
// TaskService (e.g. AutopilotService.dispatchRunOnly) that insert a
|
||||
// row into agent_task_queue directly. Invalidates the empty-claim
|
||||
// cache and kicks the daemon WS so the new task is claimed without
|
||||
// waiting for the next poll.
|
||||
func (s *TaskService) NotifyTaskEnqueued(task db.AgentTaskQueue) {
|
||||
s.notifyTaskAvailable(task)
|
||||
}
|
||||
|
||||
// notifyTaskAvailable runs after a task has been inserted: bumps the
|
||||
// runtime's invalidation version so any in-flight claim that is about
|
||||
// to write an "empty" verdict will have it rejected on read, then
|
||||
// kicks the daemon WS so the daemon claims without waiting for its
|
||||
// next poll. Order matters — Bump must happen before the wakeup,
|
||||
// otherwise the wakeup-driven claim could read the still-current
|
||||
// empty verdict and return null.
|
||||
func (s *TaskService) notifyTaskAvailable(task db.AgentTaskQueue) {
|
||||
if s.Wakeup == nil || !task.RuntimeID.Valid {
|
||||
if !task.RuntimeID.Valid {
|
||||
return
|
||||
}
|
||||
s.Wakeup.NotifyTaskAvailable(util.UUIDToString(task.RuntimeID), util.UUIDToString(task.ID))
|
||||
runtimeKey := util.UUIDToString(task.RuntimeID)
|
||||
// Use a background context: the cache bump / wakeup must outlive
|
||||
// the request that created the task, otherwise an early client
|
||||
// disconnect could leave the empty verdict in place and stall the
|
||||
// just-queued task until the TTL expires. The cache itself bounds
|
||||
// every Redis call with a short timeout so a wedged Redis cannot
|
||||
// block enqueue.
|
||||
s.EmptyClaim.Bump(context.Background(), runtimeKey)
|
||||
if s.Wakeup == nil {
|
||||
return
|
||||
}
|
||||
s.Wakeup.NotifyTaskAvailable(runtimeKey, util.UUIDToString(task.ID))
|
||||
}
|
||||
|
||||
func (s *TaskService) broadcastTaskDispatch(ctx context.Context, task db.AgentTaskQueue) {
|
||||
|
||||
103
server/internal/service/task_notify_test.go
Normal file
103
server/internal/service/task_notify_test.go
Normal file
@@ -0,0 +1,103 @@
|
||||
package service
|
||||
|
||||
import (
|
||||
"context"
|
||||
"testing"
|
||||
|
||||
"github.com/multica-ai/multica/server/internal/util"
|
||||
db "github.com/multica-ai/multica/server/pkg/db/generated"
|
||||
)
|
||||
|
||||
// stubWakeup records every call so the test can assert that notify
|
||||
// reaches the daemon hub and carries the right runtime / task IDs.
|
||||
type stubWakeup struct {
|
||||
calls []struct{ runtimeID, taskID string }
|
||||
}
|
||||
|
||||
func (s *stubWakeup) NotifyTaskAvailable(runtimeID, taskID string) {
|
||||
s.calls = append(s.calls, struct{ runtimeID, taskID string }{runtimeID, taskID})
|
||||
}
|
||||
|
||||
// TestNotifyTaskAvailable_BumpsBeforeWakeup pins the contract noted in
|
||||
// the EmptyClaimCache docs: the version Bump MUST run before the
|
||||
// daemon WS wakeup, otherwise the wakeup-driven claim could read a
|
||||
// still-current empty verdict and return null while the freshly
|
||||
// queued task sits idle. The test (1) marks the runtime empty under
|
||||
// the current version, (2) fires notifyTaskAvailable, then (3)
|
||||
// asserts the prior verdict is rejected AND the wakeup hook saw the
|
||||
// new task — proving every enqueue path (issue / mention /
|
||||
// quick-create / chat / autopilot / retry) gets the same
|
||||
// bump-then-notify behaviour for free.
|
||||
func TestNotifyTaskAvailable_BumpsBeforeWakeup(t *testing.T) {
|
||||
rdb := newRedisTestClient(t)
|
||||
cache := NewEmptyClaimCache(rdb)
|
||||
wakeup := &stubWakeup{}
|
||||
|
||||
svc := &TaskService{
|
||||
EmptyClaim: cache,
|
||||
Wakeup: wakeup,
|
||||
}
|
||||
|
||||
runtimeID := testUUID(7)
|
||||
taskID := testUUID(8)
|
||||
runtimeKey := util.UUIDToString(runtimeID)
|
||||
|
||||
ctx := context.Background()
|
||||
v0 := cache.CurrentVersion(ctx, runtimeKey)
|
||||
cache.MarkEmpty(ctx, runtimeKey, v0)
|
||||
if !cache.IsEmpty(ctx, runtimeKey) {
|
||||
t.Fatal("precondition: cache should report empty after MarkEmpty under current version")
|
||||
}
|
||||
|
||||
svc.notifyTaskAvailable(db.AgentTaskQueue{
|
||||
ID: taskID,
|
||||
RuntimeID: runtimeID,
|
||||
})
|
||||
|
||||
if cache.IsEmpty(ctx, runtimeKey) {
|
||||
t.Fatal("notifyTaskAvailable must Bump the version so the prior empty verdict is rejected")
|
||||
}
|
||||
if got := len(wakeup.calls); got != 1 {
|
||||
t.Fatalf("expected 1 wakeup call, got %d", got)
|
||||
}
|
||||
if wakeup.calls[0].runtimeID != runtimeKey {
|
||||
t.Fatalf("wakeup runtime mismatch: got %q want %q", wakeup.calls[0].runtimeID, runtimeKey)
|
||||
}
|
||||
if wakeup.calls[0].taskID != util.UUIDToString(taskID) {
|
||||
t.Fatalf("wakeup task mismatch: got %q want %q", wakeup.calls[0].taskID, util.UUIDToString(taskID))
|
||||
}
|
||||
}
|
||||
|
||||
// TestNotifyTaskAvailable_InvalidWithoutRuntimeIsNoOp guards the
|
||||
// no-RuntimeID early return — chat / quick-create / autopilot all set
|
||||
// it on insert, but a buggy caller that forgot must not silently bump
|
||||
// every workspace's version. The cache treats Bump("") as a no-op,
|
||||
// but this test pins that the RuntimeID guard sits above the Bump
|
||||
// call so a future refactor cannot drop the guard without test
|
||||
// coverage.
|
||||
func TestNotifyTaskAvailable_InvalidWithoutRuntimeIsNoOp(t *testing.T) {
|
||||
rdb := newRedisTestClient(t)
|
||||
cache := NewEmptyClaimCache(rdb)
|
||||
wakeup := &stubWakeup{}
|
||||
|
||||
svc := &TaskService{
|
||||
EmptyClaim: cache,
|
||||
Wakeup: wakeup,
|
||||
}
|
||||
|
||||
ctx := context.Background()
|
||||
v0 := cache.CurrentVersion(ctx, "rt-stays")
|
||||
cache.MarkEmpty(ctx, "rt-stays", v0)
|
||||
|
||||
svc.notifyTaskAvailable(db.AgentTaskQueue{
|
||||
// RuntimeID intentionally invalid (zero value, Valid=false).
|
||||
ID: testUUID(9),
|
||||
})
|
||||
|
||||
if !cache.IsEmpty(ctx, "rt-stays") {
|
||||
t.Fatal("notifyTaskAvailable with invalid RuntimeID must not touch cache")
|
||||
}
|
||||
if got := len(wakeup.calls); got != 0 {
|
||||
t.Fatalf("expected 0 wakeup calls when RuntimeID is invalid, got %d", got)
|
||||
}
|
||||
}
|
||||
4
server/migrations/065_backfill_onboarded_at.down.sql
Normal file
4
server/migrations/065_backfill_onboarded_at.down.sql
Normal file
@@ -0,0 +1,4 @@
|
||||
-- Backfill is intentionally irreversible: setting onboarded_at back to NULL
|
||||
-- would re-introduce the dirty state we just cleaned up. This down migration
|
||||
-- is a no-op so the migration can be ratcheted forward only.
|
||||
SELECT 1;
|
||||
13
server/migrations/065_backfill_onboarded_at.up.sql
Normal file
13
server/migrations/065_backfill_onboarded_at.up.sql
Normal file
@@ -0,0 +1,13 @@
|
||||
-- Backfill onboarded_at for users who already belong to a workspace.
|
||||
-- PR #1868 (since reverted) routed users by hasWorkspace instead of onboarded_at,
|
||||
-- producing a population of users with workspace memberships but
|
||||
-- onboarded_at == NULL. After the new design enforces
|
||||
-- "member row exists ↔ onboarded_at != null" via backend transactions,
|
||||
-- this one-shot backfill cleans existing dirty rows.
|
||||
--
|
||||
-- Uses created_at as the timestamp because these users were de facto onboarded
|
||||
-- when their account was first created — backfilling with now() would distort
|
||||
-- onboarding-funnel analytics. COALESCE keeps it idempotent.
|
||||
UPDATE "user"
|
||||
SET onboarded_at = COALESCE(onboarded_at, created_at)
|
||||
WHERE id IN (SELECT DISTINCT user_id FROM member);
|
||||
@@ -0,0 +1 @@
|
||||
DROP INDEX CONCURRENTLY IF EXISTS idx_agent_task_queue_claim_candidates;
|
||||
@@ -0,0 +1,11 @@
|
||||
-- Partial index that backs ListQueuedClaimCandidatesByRuntime. Daemons poll
|
||||
-- /tasks/claim every 30s per runtime; the filter "runtime_id = $1 AND
|
||||
-- status = 'queued'" runs every poll and is the dominant cost on warm paths.
|
||||
-- Restricting to status = 'queued' keeps the index tiny — terminal-state
|
||||
-- rows (completed/failed/cancelled) accumulate forever in the table but are
|
||||
-- excluded from the index, so it stays bounded by current queue depth.
|
||||
-- ORDER BY priority DESC, created_at ASC mirrors the SELECT so the planner
|
||||
-- can serve the query as an index-only scan without an extra sort.
|
||||
CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_agent_task_queue_claim_candidates
|
||||
ON agent_task_queue (runtime_id, priority DESC, created_at ASC)
|
||||
WHERE status = 'queued';
|
||||
@@ -1448,6 +1448,65 @@ func (q *Queries) ListPendingTasksByRuntime(ctx context.Context, runtimeID pgtyp
|
||||
return items, nil
|
||||
}
|
||||
|
||||
const listQueuedClaimCandidatesByRuntime = `-- name: ListQueuedClaimCandidatesByRuntime :many
|
||||
SELECT id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, last_heartbeat_at, trigger_summary FROM agent_task_queue
|
||||
WHERE runtime_id = $1 AND status = 'queued'
|
||||
ORDER BY priority DESC, created_at ASC
|
||||
`
|
||||
|
||||
// Returns rows the runtime can attempt to claim. Status is restricted to
|
||||
// 'queued' (in contrast to ListPendingTasksByRuntime which also includes
|
||||
// 'dispatched') because dispatched rows are by definition already owned
|
||||
// and cannot be re-claimed — including them in the candidate list pads
|
||||
// the result with rows that always lose the per-(issue, agent) race in
|
||||
// ClaimAgentTask, wasting CPU and a SELECT every poll cycle when the
|
||||
// runtime is busy on a long-running task. Backed by the partial index
|
||||
// idx_agent_task_queue_claim_candidates so the warm path is cheap.
|
||||
func (q *Queries) ListQueuedClaimCandidatesByRuntime(ctx context.Context, runtimeID pgtype.UUID) ([]AgentTaskQueue, error) {
|
||||
rows, err := q.db.Query(ctx, listQueuedClaimCandidatesByRuntime, runtimeID)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer rows.Close()
|
||||
items := []AgentTaskQueue{}
|
||||
for rows.Next() {
|
||||
var i AgentTaskQueue
|
||||
if err := rows.Scan(
|
||||
&i.ID,
|
||||
&i.AgentID,
|
||||
&i.IssueID,
|
||||
&i.Status,
|
||||
&i.Priority,
|
||||
&i.DispatchedAt,
|
||||
&i.StartedAt,
|
||||
&i.CompletedAt,
|
||||
&i.Result,
|
||||
&i.Error,
|
||||
&i.CreatedAt,
|
||||
&i.Context,
|
||||
&i.RuntimeID,
|
||||
&i.SessionID,
|
||||
&i.WorkDir,
|
||||
&i.TriggerCommentID,
|
||||
&i.ChatSessionID,
|
||||
&i.AutopilotRunID,
|
||||
&i.Attempt,
|
||||
&i.MaxAttempts,
|
||||
&i.ParentTaskID,
|
||||
&i.FailureReason,
|
||||
&i.LastHeartbeatAt,
|
||||
&i.TriggerSummary,
|
||||
); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
items = append(items, i)
|
||||
}
|
||||
if err := rows.Err(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return items, nil
|
||||
}
|
||||
|
||||
const listTasksByIssue = `-- name: ListTasksByIssue :many
|
||||
SELECT id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, last_heartbeat_at, trigger_summary, force_fresh_session FROM agent_task_queue
|
||||
WHERE issue_id = $1
|
||||
|
||||
@@ -323,6 +323,19 @@ SELECT * FROM agent_task_queue
|
||||
WHERE runtime_id = $1 AND status IN ('queued', 'dispatched')
|
||||
ORDER BY priority DESC, created_at ASC;
|
||||
|
||||
-- name: ListQueuedClaimCandidatesByRuntime :many
|
||||
-- Returns rows the runtime can attempt to claim. Status is restricted to
|
||||
-- 'queued' (in contrast to ListPendingTasksByRuntime which also includes
|
||||
-- 'dispatched') because dispatched rows are by definition already owned
|
||||
-- and cannot be re-claimed — including them in the candidate list pads
|
||||
-- the result with rows that always lose the per-(issue, agent) race in
|
||||
-- ClaimAgentTask, wasting CPU and a SELECT every poll cycle when the
|
||||
-- runtime is busy on a long-running task. Backed by the partial index
|
||||
-- idx_agent_task_queue_claim_candidates so the warm path is cheap.
|
||||
SELECT * FROM agent_task_queue
|
||||
WHERE runtime_id = $1 AND status = 'queued'
|
||||
ORDER BY priority DESC, created_at ASC;
|
||||
|
||||
-- name: ListActiveTasksByIssue :many
|
||||
SELECT * FROM agent_task_queue
|
||||
WHERE issue_id = $1 AND status IN ('dispatched', 'running')
|
||||
|
||||
Reference in New Issue
Block a user