mirror of
https://github.com/multica-ai/multica.git
synced 2026-06-28 10:02:36 +02:00
Compare commits
2 Commits
agent/lamb
...
agent/j/76
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
47c4c6a25f | ||
|
|
72a7ca7199 |
@@ -239,6 +239,9 @@ func NewRouterWithOptions(pool *pgxpool.Pool, hub *realtime.Hub, bus *events.Bus
|
||||
r.Get("/tasks/{taskId}/messages", h.ListTaskMessages)
|
||||
|
||||
r.Get("/issues/{issueId}/gc-check", h.GetIssueGCCheck)
|
||||
r.Get("/chat-sessions/{sessionId}/gc-check", h.GetChatSessionGCCheck)
|
||||
r.Get("/autopilot-runs/{runId}/gc-check", h.GetAutopilotRunGCCheck)
|
||||
r.Get("/tasks/{taskId}/gc-check", h.GetTaskGCCheck)
|
||||
|
||||
r.Post("/runtimes/{runtimeId}/recover-orphans", h.RecoverOrphanedTasks)
|
||||
r.Post("/tasks/{taskId}/session", h.PinTaskSession)
|
||||
|
||||
@@ -303,6 +303,59 @@ func (c *Client) GetIssueGCCheck(ctx context.Context, issueID string) (*IssueGCS
|
||||
return &resp, nil
|
||||
}
|
||||
|
||||
// ChatSessionGCStatus mirrors IssueGCStatus for chat sessions.
|
||||
type ChatSessionGCStatus struct {
|
||||
Status string `json:"status"`
|
||||
UpdatedAt time.Time `json:"updated_at"`
|
||||
}
|
||||
|
||||
// GetChatSessionGCCheck returns the status of a chat session for GC decisions.
|
||||
// A 404 from this endpoint indicates the session row was hard-deleted (the
|
||||
// user explicitly removed it), which the caller treats as an immediate-clean
|
||||
// signal.
|
||||
func (c *Client) GetChatSessionGCCheck(ctx context.Context, sessionID string) (*ChatSessionGCStatus, error) {
|
||||
var resp ChatSessionGCStatus
|
||||
if err := c.getJSON(ctx, fmt.Sprintf("/api/daemon/chat-sessions/%s/gc-check", sessionID), &resp); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &resp, nil
|
||||
}
|
||||
|
||||
// AutopilotRunGCStatus carries the status of an autopilot run. CompletedAt
|
||||
// is the run's terminal timestamp (zero for non-terminal runs); the GC loop
|
||||
// uses it as the TTL anchor instead of UpdatedAt because autopilot_run rows
|
||||
// have no updated_at column.
|
||||
type AutopilotRunGCStatus struct {
|
||||
Status string `json:"status"`
|
||||
CompletedAt time.Time `json:"completed_at"`
|
||||
}
|
||||
|
||||
// GetAutopilotRunGCCheck returns the status of an autopilot run for GC decisions.
|
||||
func (c *Client) GetAutopilotRunGCCheck(ctx context.Context, runID string) (*AutopilotRunGCStatus, error) {
|
||||
var resp AutopilotRunGCStatus
|
||||
if err := c.getJSON(ctx, fmt.Sprintf("/api/daemon/autopilot-runs/%s/gc-check", runID), &resp); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &resp, nil
|
||||
}
|
||||
|
||||
// TaskGCStatus carries the agent_task_queue status for quick-create cleanup.
|
||||
// Quick-create tasks have no separate parent record, so GC keys directly on
|
||||
// the task itself.
|
||||
type TaskGCStatus struct {
|
||||
Status string `json:"status"`
|
||||
CompletedAt time.Time `json:"completed_at"`
|
||||
}
|
||||
|
||||
// GetTaskGCCheck returns the status of an agent task for GC decisions.
|
||||
func (c *Client) GetTaskGCCheck(ctx context.Context, taskID string) (*TaskGCStatus, error) {
|
||||
var resp TaskGCStatus
|
||||
if err := c.getJSON(ctx, fmt.Sprintf("/api/daemon/tasks/%s/gc-check", taskID), &resp); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &resp, nil
|
||||
}
|
||||
|
||||
func (c *Client) Deregister(ctx context.Context, runtimeIDs []string) error {
|
||||
return c.postJSON(ctx, "/api/daemon/deregister", map[string]any{
|
||||
"runtime_ids": runtimeIDs,
|
||||
|
||||
@@ -1519,15 +1519,51 @@ func (d *Daemon) handleTask(ctx context.Context, task Task, slot int) {
|
||||
}
|
||||
|
||||
// Write GC metadata after the task finishes so the periodic GC loop
|
||||
// can look up the issue later. Written last so that a mid-task crash
|
||||
// leaves the directory as an orphan (cleaned up by GCOrphanTTL).
|
||||
// can look up the parent record (issue / chat session / autopilot run /
|
||||
// task itself for quick-create) later. Written last so that a mid-task
|
||||
// crash leaves the directory as an orphan (cleaned up by GCOrphanTTL).
|
||||
if result.EnvRoot != "" {
|
||||
if err := execenv.WriteGCMeta(result.EnvRoot, task.IssueID, task.WorkspaceID, taskLog); err != nil {
|
||||
taskLog.Warn("write gc meta failed (non-fatal)", "error", err)
|
||||
if meta, ok := gcMetaForTask(task); ok {
|
||||
if err := execenv.WriteGCMeta(result.EnvRoot, meta, taskLog); err != nil {
|
||||
taskLog.Warn("write gc meta failed (non-fatal)", "error", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// gcMetaForTask classifies a finished task and produces a GCMeta of the right
|
||||
// kind. The discriminator order matters: a task carrying both an issue_id
|
||||
// and a chat_session_id (theoretical, not produced today) should be treated
|
||||
// as a chat task because the chat session is the longer-lived parent record.
|
||||
//
|
||||
// Returns ok=false when the task has no recognizable parent (e.g. an
|
||||
// internal task with no IDs at all). The caller skips writing a meta file
|
||||
// in that case so the directory falls back to mtime-based orphan cleanup.
|
||||
func gcMetaForTask(task Task) (execenv.GCMeta, bool) {
|
||||
meta := execenv.GCMeta{WorkspaceID: task.WorkspaceID}
|
||||
switch {
|
||||
case task.ChatSessionID != "":
|
||||
meta.Kind = execenv.GCKindChat
|
||||
meta.ChatSessionID = task.ChatSessionID
|
||||
case task.AutopilotRunID != "":
|
||||
meta.Kind = execenv.GCKindAutopilotRun
|
||||
meta.AutopilotRunID = task.AutopilotRunID
|
||||
case task.IssueID != "":
|
||||
meta.Kind = execenv.GCKindIssue
|
||||
meta.IssueID = task.IssueID
|
||||
case task.QuickCreatePrompt != "":
|
||||
// Quick-create tasks reach WriteGCMeta before the server runs
|
||||
// LinkTaskToIssue, so IssueID is always empty here. Persist the
|
||||
// task ID instead and let the GC loop ask the server for terminal
|
||||
// state via the task gc-check endpoint.
|
||||
meta.Kind = execenv.GCKindQuickCreate
|
||||
meta.TaskID = task.ID
|
||||
default:
|
||||
return execenv.GCMeta{}, false
|
||||
}
|
||||
return meta, true
|
||||
}
|
||||
|
||||
func (d *Daemon) runTask(ctx context.Context, task Task, provider string, slot int, taskLog *slog.Logger) (TaskResult, error) {
|
||||
// Refuse to spawn an agent without a workspace. An empty workspace_id
|
||||
// here would make MULTICA_WORKSPACE_ID empty in the agent env, and the
|
||||
|
||||
@@ -203,30 +203,52 @@ func writeCodexWorkspaceSkills(codexHome string, skills []SkillContextForEnv) er
|
||||
return writeSkillFiles(filepath.Join(codexHome, "skills"), skills)
|
||||
}
|
||||
|
||||
// GCMetaKind identifies which kind of parent record a task workdir belongs to.
|
||||
// The GC loop dispatches its decision tree on this value so chat / autopilot /
|
||||
// quick-create tasks are no longer forced through the issue-centric path.
|
||||
type GCMetaKind string
|
||||
|
||||
const (
|
||||
GCKindIssue GCMetaKind = "issue"
|
||||
GCKindChat GCMetaKind = "chat"
|
||||
GCKindAutopilotRun GCMetaKind = "autopilot_run"
|
||||
GCKindQuickCreate GCMetaKind = "quick_create"
|
||||
)
|
||||
|
||||
// GCMeta is persisted to .gc_meta.json inside the env root so the GC loop
|
||||
// can determine which issue this directory belongs to.
|
||||
// can decide whether the directory is reclaimable. It is a discriminated
|
||||
// union keyed on Kind: only the ID field matching Kind is meaningful.
|
||||
//
|
||||
// Older meta files (pre-v2) lack the Kind field; readers must default empty
|
||||
// Kind to GCKindIssue for backward compatibility — only IssueID was written
|
||||
// before, and only issue-centric tasks ever produced a meta file.
|
||||
type GCMeta struct {
|
||||
IssueID string `json:"issue_id"`
|
||||
WorkspaceID string `json:"workspace_id"`
|
||||
CompletedAt time.Time `json:"completed_at"`
|
||||
Kind GCMetaKind `json:"kind,omitempty"`
|
||||
IssueID string `json:"issue_id,omitempty"`
|
||||
ChatSessionID string `json:"chat_session_id,omitempty"`
|
||||
AutopilotRunID string `json:"autopilot_run_id,omitempty"`
|
||||
TaskID string `json:"task_id,omitempty"`
|
||||
WorkspaceID string `json:"workspace_id"`
|
||||
CompletedAt time.Time `json:"completed_at"`
|
||||
}
|
||||
|
||||
const gcMetaFile = ".gc_meta.json"
|
||||
|
||||
// WriteGCMeta writes GC metadata into the given directory.
|
||||
func WriteGCMeta(envRoot, issueID, workspaceID string, logger *slog.Logger) error {
|
||||
if issueID == "" {
|
||||
logger.Warn("execenv: skipping .gc_meta.json write: issue_id is empty", "envRoot", envRoot, "workspaceID", workspaceID)
|
||||
return nil
|
||||
}
|
||||
// WriteGCMeta writes GC metadata into the given directory. The caller is
|
||||
// responsible for choosing Kind and populating the matching ID field;
|
||||
// CompletedAt is stamped here so callers don't have to think about clocks.
|
||||
func WriteGCMeta(envRoot string, meta GCMeta, logger *slog.Logger) error {
|
||||
if envRoot == "" {
|
||||
return nil
|
||||
}
|
||||
meta := GCMeta{
|
||||
IssueID: issueID,
|
||||
WorkspaceID: workspaceID,
|
||||
CompletedAt: time.Now().UTC(),
|
||||
if meta.Kind == "" {
|
||||
// Defensive: a task that doesn't fit any known kind would write a
|
||||
// meta file the GC loop can't dispatch on. Skip silently — the
|
||||
// directory falls back to the orphan-by-mtime path.
|
||||
logger.Debug("execenv: skipping .gc_meta.json write: kind is empty", "envRoot", envRoot)
|
||||
return nil
|
||||
}
|
||||
meta.CompletedAt = time.Now().UTC()
|
||||
data, err := json.Marshal(meta)
|
||||
if err != nil {
|
||||
return fmt.Errorf("marshal gc meta: %w", err)
|
||||
@@ -234,7 +256,9 @@ func WriteGCMeta(envRoot, issueID, workspaceID string, logger *slog.Logger) erro
|
||||
return os.WriteFile(filepath.Join(envRoot, gcMetaFile), data, 0o644)
|
||||
}
|
||||
|
||||
// ReadGCMeta reads GC metadata from a task directory root.
|
||||
// ReadGCMeta reads GC metadata from a task directory root. Pre-v2 meta files
|
||||
// (no kind field) are normalized to GCKindIssue so the legacy issue path
|
||||
// keeps working without a migration.
|
||||
func ReadGCMeta(envRoot string) (*GCMeta, error) {
|
||||
data, err := os.ReadFile(filepath.Join(envRoot, gcMetaFile))
|
||||
if err != nil {
|
||||
@@ -244,6 +268,9 @@ func ReadGCMeta(envRoot string) (*GCMeta, error) {
|
||||
if err := json.Unmarshal(data, &meta); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if meta.Kind == "" {
|
||||
meta.Kind = GCKindIssue
|
||||
}
|
||||
return &meta, nil
|
||||
}
|
||||
|
||||
|
||||
@@ -1966,7 +1966,11 @@ func TestWriteReadGCMeta(t *testing.T) {
|
||||
issueID := "a1b2c3d4-e5f6-7890-abcd-ef1234567890"
|
||||
wsID := "ws-test-001"
|
||||
|
||||
if err := WriteGCMeta(dir, issueID, wsID, discardLogger()); err != nil {
|
||||
if err := WriteGCMeta(dir, GCMeta{
|
||||
Kind: GCKindIssue,
|
||||
IssueID: issueID,
|
||||
WorkspaceID: wsID,
|
||||
}, discardLogger()); err != nil {
|
||||
t.Fatalf("WriteGCMeta: %v", err)
|
||||
}
|
||||
|
||||
@@ -1975,6 +1979,9 @@ func TestWriteReadGCMeta(t *testing.T) {
|
||||
t.Fatalf("ReadGCMeta: %v", err)
|
||||
}
|
||||
|
||||
if meta.Kind != GCKindIssue {
|
||||
t.Errorf("Kind = %q, want %q", meta.Kind, GCKindIssue)
|
||||
}
|
||||
if meta.IssueID != issueID {
|
||||
t.Errorf("IssueID = %q, want %q", meta.IssueID, issueID)
|
||||
}
|
||||
@@ -1988,23 +1995,77 @@ func TestWriteReadGCMeta(t *testing.T) {
|
||||
|
||||
func TestWriteGCMeta_EmptyRoot(t *testing.T) {
|
||||
t.Parallel()
|
||||
if err := WriteGCMeta("", "issue", "ws", discardLogger()); err != nil {
|
||||
if err := WriteGCMeta("", GCMeta{Kind: GCKindIssue, IssueID: "x", WorkspaceID: "ws"}, discardLogger()); err != nil {
|
||||
t.Fatalf("expected nil for empty root, got %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestWriteGCMeta_EmptyIssueID(t *testing.T) {
|
||||
func TestWriteGCMeta_EmptyKind(t *testing.T) {
|
||||
t.Parallel()
|
||||
dir := t.TempDir()
|
||||
|
||||
if err := WriteGCMeta(dir, "", "ws", discardLogger()); err != nil {
|
||||
t.Fatalf("expected nil for empty issue ID, got %v", err)
|
||||
if err := WriteGCMeta(dir, GCMeta{WorkspaceID: "ws"}, discardLogger()); err != nil {
|
||||
t.Fatalf("expected nil for empty kind, got %v", err)
|
||||
}
|
||||
if _, err := os.Stat(filepath.Join(dir, gcMetaFile)); !os.IsNotExist(err) {
|
||||
t.Fatalf("expected gc meta file to be absent, got err=%v", err)
|
||||
}
|
||||
}
|
||||
|
||||
// Pre-v2 meta files lacked the kind field. ReadGCMeta must default an empty
|
||||
// kind to GCKindIssue so the existing on-disk meta files keep flowing
|
||||
// through the issue path.
|
||||
func TestReadGCMeta_LegacyFileDefaultsToIssueKind(t *testing.T) {
|
||||
t.Parallel()
|
||||
dir := t.TempDir()
|
||||
legacy := []byte(`{"issue_id":"a1b2c3d4-e5f6-7890-abcd-ef1234567890","workspace_id":"ws","completed_at":"2025-01-01T00:00:00Z"}`)
|
||||
if err := os.WriteFile(filepath.Join(dir, gcMetaFile), legacy, 0o644); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
meta, err := ReadGCMeta(dir)
|
||||
if err != nil {
|
||||
t.Fatalf("ReadGCMeta: %v", err)
|
||||
}
|
||||
if meta.Kind != GCKindIssue {
|
||||
t.Fatalf("legacy kind: want %q, got %q", GCKindIssue, meta.Kind)
|
||||
}
|
||||
if meta.IssueID != "a1b2c3d4-e5f6-7890-abcd-ef1234567890" {
|
||||
t.Fatalf("legacy issue_id: got %q", meta.IssueID)
|
||||
}
|
||||
}
|
||||
|
||||
// New v2 meta files for chat / autopilot / quick-create round-trip without
|
||||
// being misclassified as the issue kind.
|
||||
func TestWriteReadGCMeta_KindRoundTrip(t *testing.T) {
|
||||
t.Parallel()
|
||||
cases := []struct {
|
||||
name string
|
||||
meta GCMeta
|
||||
want GCMetaKind
|
||||
}{
|
||||
{"chat", GCMeta{Kind: GCKindChat, ChatSessionID: "cs-1", WorkspaceID: "ws"}, GCKindChat},
|
||||
{"autopilot_run", GCMeta{Kind: GCKindAutopilotRun, AutopilotRunID: "ar-1", WorkspaceID: "ws"}, GCKindAutopilotRun},
|
||||
{"quick_create", GCMeta{Kind: GCKindQuickCreate, TaskID: "t-1", WorkspaceID: "ws"}, GCKindQuickCreate},
|
||||
}
|
||||
for _, tc := range cases {
|
||||
tc := tc
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
t.Parallel()
|
||||
dir := t.TempDir()
|
||||
if err := WriteGCMeta(dir, tc.meta, discardLogger()); err != nil {
|
||||
t.Fatalf("WriteGCMeta: %v", err)
|
||||
}
|
||||
got, err := ReadGCMeta(dir)
|
||||
if err != nil {
|
||||
t.Fatalf("ReadGCMeta: %v", err)
|
||||
}
|
||||
if got.Kind != tc.want {
|
||||
t.Fatalf("Kind: want %q, got %q", tc.want, got.Kind)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestReadGCMeta_NoFile(t *testing.T) {
|
||||
t.Parallel()
|
||||
dir := t.TempDir()
|
||||
|
||||
@@ -161,6 +161,8 @@ const (
|
||||
)
|
||||
|
||||
// shouldCleanTaskDir decides whether a task directory should be removed.
|
||||
// Dispatches on meta.Kind so chat / autopilot / quick-create tasks each
|
||||
// follow the parent record that actually governs their lifecycle.
|
||||
func (d *Daemon) shouldCleanTaskDir(ctx context.Context, taskDir string) gcAction {
|
||||
// A task currently running on this env root must never be reclaimed —
|
||||
// not even on the done/cancelled or orphan-404 paths. A new comment on
|
||||
@@ -173,37 +175,59 @@ func (d *Daemon) shouldCleanTaskDir(ctx context.Context, taskDir string) gcActio
|
||||
|
||||
meta, err := execenv.ReadGCMeta(taskDir)
|
||||
if err != nil {
|
||||
// No .gc_meta.json — check mtime for orphan cleanup.
|
||||
info, statErr := os.Stat(taskDir)
|
||||
if statErr != nil {
|
||||
return gcActionSkip
|
||||
}
|
||||
if time.Since(info.ModTime()) > d.cfg.GCOrphanTTL {
|
||||
d.logger.Info("gc: orphan directory (no meta)", "dir", taskDir, "age", time.Since(info.ModTime()).Round(time.Hour))
|
||||
return gcActionOrphan
|
||||
}
|
||||
return gcActionSkip
|
||||
return d.orphanByMTime(taskDir, "no meta")
|
||||
}
|
||||
|
||||
switch meta.Kind {
|
||||
case execenv.GCKindIssue:
|
||||
return d.gcDecisionIssue(ctx, taskDir, meta)
|
||||
case execenv.GCKindChat:
|
||||
return d.gcDecisionChat(ctx, taskDir, meta)
|
||||
case execenv.GCKindAutopilotRun:
|
||||
return d.gcDecisionAutopilotRun(ctx, taskDir, meta)
|
||||
case execenv.GCKindQuickCreate:
|
||||
return d.gcDecisionQuickCreate(ctx, taskDir, meta)
|
||||
default:
|
||||
// Unknown kind: fall back to mtime-based orphan cleanup so a future
|
||||
// daemon writing a kind we don't recognize doesn't get insta-wiped.
|
||||
return d.orphanByMTime(taskDir, "unknown kind")
|
||||
}
|
||||
}
|
||||
|
||||
// orphanByMTime returns gcActionOrphan if the directory is older than
|
||||
// GCOrphanTTL, gcActionSkip otherwise. Centralizes the "we have no parent
|
||||
// record signal so just look at the disk" fallback used by every kind.
|
||||
func (d *Daemon) orphanByMTime(taskDir, reason string) gcAction {
|
||||
info, err := os.Stat(taskDir)
|
||||
if err != nil {
|
||||
return gcActionSkip
|
||||
}
|
||||
if time.Since(info.ModTime()) > d.cfg.GCOrphanTTL {
|
||||
d.logger.Info("gc: orphan directory", "dir", taskDir, "reason", reason, "age", time.Since(info.ModTime()).Round(time.Hour))
|
||||
return gcActionOrphan
|
||||
}
|
||||
return gcActionSkip
|
||||
}
|
||||
|
||||
// isAccessNotFound detects the 404 returned by gc-check endpoints. The same
|
||||
// status covers "row deleted" and "daemon token can't see this workspace"
|
||||
// (the requireDaemonWorkspaceAccess anti-enumeration shape), so callers
|
||||
// can't tell the two apart from the response alone.
|
||||
func isAccessNotFound(err error) bool {
|
||||
var reqErr *requestError
|
||||
return errors.As(err, &reqErr) && reqErr.StatusCode == http.StatusNotFound
|
||||
}
|
||||
|
||||
func (d *Daemon) gcDecisionIssue(ctx context.Context, taskDir string, meta *execenv.GCMeta) gcAction {
|
||||
status, err := d.client.GetIssueGCCheck(ctx, meta.IssueID)
|
||||
if err != nil {
|
||||
var reqErr *requestError
|
||||
if errors.As(err, &reqErr) && reqErr.StatusCode == http.StatusNotFound {
|
||||
// 404 is ambiguous: the server returns it for both "issue deleted"
|
||||
// and "daemon token has no access to the workspace" (anti-enumeration,
|
||||
// see requireDaemonWorkspaceAccess). Fall back to the mtime-gated
|
||||
// orphan cleanup so a scoped-down token can't instantly wipe dirs
|
||||
// whose issues are still live.
|
||||
info, statErr := os.Stat(taskDir)
|
||||
if statErr != nil {
|
||||
return gcActionSkip
|
||||
}
|
||||
if time.Since(info.ModTime()) > d.cfg.GCOrphanTTL {
|
||||
d.logger.Info("gc: orphan directory (issue not accessible)", "dir", taskDir, "issue", meta.IssueID)
|
||||
return gcActionOrphan
|
||||
}
|
||||
if isAccessNotFound(err) {
|
||||
// 404 is ambiguous: server returns it for both "issue deleted"
|
||||
// and "daemon token has no access to the workspace". Fall back
|
||||
// to the mtime-gated orphan cleanup so a scoped-down token
|
||||
// can't instantly wipe dirs whose issues are still live.
|
||||
return d.orphanByMTime(taskDir, "issue not accessible")
|
||||
}
|
||||
// API error (network, auth, etc.) — skip and retry next cycle.
|
||||
return gcActionSkip
|
||||
}
|
||||
|
||||
@@ -211,6 +235,7 @@ func (d *Daemon) shouldCleanTaskDir(ctx context.Context, taskDir string) gcActio
|
||||
time.Since(status.UpdatedAt) > d.cfg.GCTTL {
|
||||
d.logger.Info("gc: eligible for cleanup",
|
||||
"dir", filepath.Base(taskDir),
|
||||
"kind", "issue",
|
||||
"issue", meta.IssueID,
|
||||
"status", status.Status,
|
||||
"updated_at", status.UpdatedAt.Format(time.RFC3339),
|
||||
@@ -218,15 +243,11 @@ func (d *Daemon) shouldCleanTaskDir(ctx context.Context, taskDir string) gcActio
|
||||
return gcActionClean
|
||||
}
|
||||
|
||||
// Artifact-only cleanup: issue is still open but the task itself completed
|
||||
// long enough ago that its build artifacts are unlikely to be reused.
|
||||
// Active-root protection is handled by the early return above; skip here
|
||||
// only when artifact GC is disabled or the meta has no completed_at
|
||||
// (defensive — that means the task crashed before WriteGCMeta).
|
||||
if d.cfg.GCArtifactTTL > 0 && len(d.cfg.GCArtifactPatterns) > 0 &&
|
||||
!meta.CompletedAt.IsZero() && time.Since(meta.CompletedAt) > d.cfg.GCArtifactTTL {
|
||||
d.logger.Info("gc: eligible for artifact cleanup",
|
||||
"dir", filepath.Base(taskDir),
|
||||
"kind", "issue",
|
||||
"issue", meta.IssueID,
|
||||
"status", status.Status,
|
||||
"completed_at", meta.CompletedAt.Format(time.RFC3339),
|
||||
@@ -237,6 +258,145 @@ func (d *Daemon) shouldCleanTaskDir(ctx context.Context, taskDir string) gcActio
|
||||
return gcActionSkip
|
||||
}
|
||||
|
||||
func (d *Daemon) gcDecisionChat(ctx context.Context, taskDir string, meta *execenv.GCMeta) gcAction {
|
||||
status, err := d.client.GetChatSessionGCCheck(ctx, meta.ChatSessionID)
|
||||
if err != nil {
|
||||
if isAccessNotFound(err) {
|
||||
// 404 means the chat_session row is gone — DeleteChatSession is
|
||||
// a real DELETE, so a hard delete propagates here as soon as
|
||||
// the user clicks the button. This is the strongest reclaim
|
||||
// signal we get and it's exactly acceptance criterion #3:
|
||||
// reclaim within one GC cycle (≤ GCInterval), not 72h.
|
||||
//
|
||||
// We don't gate on mtime: every chat_session_id in a meta file
|
||||
// was written by this daemon under its current token, so there
|
||||
// is no cross-workspace probe to defend against.
|
||||
d.logger.Info("gc: eligible for cleanup",
|
||||
"dir", filepath.Base(taskDir),
|
||||
"kind", "chat",
|
||||
"chat_session", meta.ChatSessionID,
|
||||
"reason", "session not accessible (hard-deleted)",
|
||||
)
|
||||
return gcActionClean
|
||||
}
|
||||
return gcActionSkip
|
||||
}
|
||||
|
||||
switch status.Status {
|
||||
case "active":
|
||||
// An active chat session must never be reclaimed by mtime — that
|
||||
// would silently kill a user's idle session and break "PriorWorkDir"
|
||||
// resume on their next message. This is the explicit short-circuit
|
||||
// the issue body called out as verifyable behavior #2.
|
||||
return gcActionSkip
|
||||
case "archived":
|
||||
if time.Since(status.UpdatedAt) > d.cfg.GCTTL {
|
||||
d.logger.Info("gc: eligible for cleanup",
|
||||
"dir", filepath.Base(taskDir),
|
||||
"kind", "chat",
|
||||
"chat_session", meta.ChatSessionID,
|
||||
"status", status.Status,
|
||||
"updated_at", status.UpdatedAt.Format(time.RFC3339),
|
||||
)
|
||||
return gcActionClean
|
||||
}
|
||||
}
|
||||
return gcActionSkip
|
||||
}
|
||||
|
||||
func (d *Daemon) gcDecisionAutopilotRun(ctx context.Context, taskDir string, meta *execenv.GCMeta) gcAction {
|
||||
status, err := d.client.GetAutopilotRunGCCheck(ctx, meta.AutopilotRunID)
|
||||
if err != nil {
|
||||
if isAccessNotFound(err) {
|
||||
return d.orphanByMTime(taskDir, "autopilot run not accessible")
|
||||
}
|
||||
return gcActionSkip
|
||||
}
|
||||
|
||||
// Terminal states per the autopilot_run CHECK constraint:
|
||||
// completed, failed, skipped — the run finished its own work.
|
||||
// issue_created — the run produced an issue task that owns
|
||||
// its own workdir; this run's workdir is
|
||||
// dead weight from here on.
|
||||
// Non-terminal: pending, running. Skip until they reach a terminal state
|
||||
// rather than trying to bound them by mtime — long autopilots are real.
|
||||
if isAutopilotRunTerminal(status.Status) {
|
||||
anchor := status.CompletedAt
|
||||
if anchor.IsZero() {
|
||||
// Defensive: terminal status without completed_at means the
|
||||
// run finished but the column wasn't stamped (older code path).
|
||||
// Fall back to the meta's CompletedAt so we still GC eventually.
|
||||
anchor = meta.CompletedAt
|
||||
}
|
||||
if !anchor.IsZero() && time.Since(anchor) > d.cfg.GCTTL {
|
||||
d.logger.Info("gc: eligible for cleanup",
|
||||
"dir", filepath.Base(taskDir),
|
||||
"kind", "autopilot_run",
|
||||
"autopilot_run", meta.AutopilotRunID,
|
||||
"status", status.Status,
|
||||
"completed_at", anchor.Format(time.RFC3339),
|
||||
)
|
||||
return gcActionClean
|
||||
}
|
||||
}
|
||||
return gcActionSkip
|
||||
}
|
||||
|
||||
// isAutopilotRunTerminal mirrors the run.status CHECK in
|
||||
// migrations/042_autopilot.up.sql. Non-terminal states are pending/running;
|
||||
// every other value the schema allows is a final resting state from the
|
||||
// daemon's POV (the run is no longer producing work in this workdir).
|
||||
func isAutopilotRunTerminal(status string) bool {
|
||||
switch status {
|
||||
case "completed", "failed", "skipped", "issue_created":
|
||||
return true
|
||||
default:
|
||||
return false
|
||||
}
|
||||
}
|
||||
|
||||
func (d *Daemon) gcDecisionQuickCreate(ctx context.Context, taskDir string, meta *execenv.GCMeta) gcAction {
|
||||
status, err := d.client.GetTaskGCCheck(ctx, meta.TaskID)
|
||||
if err != nil {
|
||||
if isAccessNotFound(err) {
|
||||
// Task row was hard-deleted, or token can't see it. Either way,
|
||||
// fall back to mtime-gated orphan to stay safe across scoped
|
||||
// tokens — same reasoning as the issue path.
|
||||
return d.orphanByMTime(taskDir, "task not accessible")
|
||||
}
|
||||
return gcActionSkip
|
||||
}
|
||||
|
||||
// Quick-create workdirs are not reused by the issue task that
|
||||
// LinkTaskToIssue eventually attaches — that issue gets its own
|
||||
// envRoot. So as soon as the quick-create task itself reaches a
|
||||
// terminal state we can reclaim the directory immediately, without
|
||||
// waiting for GCTTL. If the user wants to revisit, the linked issue
|
||||
// has the agent's output already.
|
||||
if isAgentTaskTerminal(status.Status) {
|
||||
d.logger.Info("gc: eligible for cleanup",
|
||||
"dir", filepath.Base(taskDir),
|
||||
"kind", "quick_create",
|
||||
"task", meta.TaskID,
|
||||
"status", status.Status,
|
||||
)
|
||||
return gcActionClean
|
||||
}
|
||||
return gcActionSkip
|
||||
}
|
||||
|
||||
// isAgentTaskTerminal reports whether a value of agent_task_queue.status
|
||||
// represents a final state. Mirrors the status enum used across the
|
||||
// task service — see service/task.go for the canonical list.
|
||||
func isAgentTaskTerminal(status string) bool {
|
||||
switch status {
|
||||
case "completed", "failed", "cancelled":
|
||||
return true
|
||||
default:
|
||||
return false
|
||||
}
|
||||
}
|
||||
|
||||
// cleanTaskDir removes a task directory and logs the result.
|
||||
func (d *Daemon) cleanTaskDir(taskDir string) {
|
||||
if err := os.RemoveAll(taskDir); err != nil {
|
||||
|
||||
@@ -653,3 +653,319 @@ func TestIsBareRepo(t *testing.T) {
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
// TestShouldCleanTaskDir_KindDispatch covers the four GCMeta kinds across
|
||||
// active / terminal / 404 / non-terminal axes. Each entry stands up a mock
|
||||
// server returning the expected payload (or 404) and asserts the action.
|
||||
func TestShouldCleanTaskDir_KindDispatch(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
const (
|
||||
issueID = "aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaa01"
|
||||
chatID = "bbbbbbbb-bbbb-bbbb-bbbb-bbbbbbbbbb01"
|
||||
runID = "cccccccc-cccc-cccc-cccc-cccccccccc01"
|
||||
quickTask = "dddddddd-dddd-dddd-dddd-dddddddddd01"
|
||||
legacyMeta = "eeeeeeee-eeee-eeee-eeee-eeeeeeeeee01"
|
||||
)
|
||||
|
||||
now := time.Now()
|
||||
overTTL := now.Add(-10 * 24 * time.Hour)
|
||||
withinTTL := now.Add(-1 * time.Hour)
|
||||
|
||||
type serverResp struct {
|
||||
// Path to register on the mux. Empty entries are skipped (used for
|
||||
// 404 cases where the mux returns the default not-found handler).
|
||||
path string
|
||||
status int
|
||||
body map[string]any
|
||||
}
|
||||
|
||||
cases := []struct {
|
||||
name string
|
||||
meta *execenv.GCMeta
|
||||
servers []serverResp
|
||||
want gcAction
|
||||
}{
|
||||
// ---- chat ---------------------------------------------------------
|
||||
{
|
||||
name: "chat active session — never reclaimed",
|
||||
meta: &execenv.GCMeta{Kind: execenv.GCKindChat, ChatSessionID: chatID, WorkspaceID: "ws"},
|
||||
servers: []serverResp{{
|
||||
path: "/api/daemon/chat-sessions/" + chatID + "/gc-check",
|
||||
body: map[string]any{"status": "active", "updated_at": overTTL},
|
||||
}},
|
||||
want: gcActionSkip,
|
||||
},
|
||||
{
|
||||
name: "chat archived over TTL — clean",
|
||||
meta: &execenv.GCMeta{Kind: execenv.GCKindChat, ChatSessionID: chatID, WorkspaceID: "ws"},
|
||||
servers: []serverResp{{
|
||||
path: "/api/daemon/chat-sessions/" + chatID + "/gc-check",
|
||||
body: map[string]any{"status": "archived", "updated_at": overTTL},
|
||||
}},
|
||||
want: gcActionClean,
|
||||
},
|
||||
{
|
||||
name: "chat archived within TTL — skip",
|
||||
meta: &execenv.GCMeta{Kind: execenv.GCKindChat, ChatSessionID: chatID, WorkspaceID: "ws"},
|
||||
servers: []serverResp{{
|
||||
path: "/api/daemon/chat-sessions/" + chatID + "/gc-check",
|
||||
body: map[string]any{"status": "archived", "updated_at": withinTTL},
|
||||
}},
|
||||
want: gcActionSkip,
|
||||
},
|
||||
{
|
||||
name: "chat 404 — hard-deleted, clean immediately (no mtime gate)",
|
||||
meta: &execenv.GCMeta{Kind: execenv.GCKindChat, ChatSessionID: chatID, WorkspaceID: "ws"},
|
||||
servers: []serverResp{{
|
||||
path: "/api/daemon/chat-sessions/" + chatID + "/gc-check",
|
||||
status: http.StatusNotFound,
|
||||
}},
|
||||
want: gcActionClean,
|
||||
},
|
||||
|
||||
// ---- autopilot run -----------------------------------------------
|
||||
{
|
||||
name: "autopilot completed over TTL — clean",
|
||||
meta: &execenv.GCMeta{Kind: execenv.GCKindAutopilotRun, AutopilotRunID: runID, WorkspaceID: "ws"},
|
||||
servers: []serverResp{{
|
||||
path: "/api/daemon/autopilot-runs/" + runID + "/gc-check",
|
||||
body: map[string]any{"status": "completed", "completed_at": overTTL},
|
||||
}},
|
||||
want: gcActionClean,
|
||||
},
|
||||
{
|
||||
name: "autopilot issue_created counts as terminal",
|
||||
meta: &execenv.GCMeta{Kind: execenv.GCKindAutopilotRun, AutopilotRunID: runID, WorkspaceID: "ws"},
|
||||
servers: []serverResp{{
|
||||
path: "/api/daemon/autopilot-runs/" + runID + "/gc-check",
|
||||
body: map[string]any{"status": "issue_created", "completed_at": overTTL},
|
||||
}},
|
||||
want: gcActionClean,
|
||||
},
|
||||
{
|
||||
name: "autopilot running — skip",
|
||||
meta: &execenv.GCMeta{Kind: execenv.GCKindAutopilotRun, AutopilotRunID: runID, WorkspaceID: "ws"},
|
||||
servers: []serverResp{{
|
||||
path: "/api/daemon/autopilot-runs/" + runID + "/gc-check",
|
||||
body: map[string]any{"status": "running"},
|
||||
}},
|
||||
want: gcActionSkip,
|
||||
},
|
||||
{
|
||||
name: "autopilot completed within TTL — skip",
|
||||
meta: &execenv.GCMeta{Kind: execenv.GCKindAutopilotRun, AutopilotRunID: runID, WorkspaceID: "ws"},
|
||||
servers: []serverResp{{
|
||||
path: "/api/daemon/autopilot-runs/" + runID + "/gc-check",
|
||||
body: map[string]any{"status": "completed", "completed_at": withinTTL},
|
||||
}},
|
||||
want: gcActionSkip,
|
||||
},
|
||||
|
||||
// ---- quick-create -------------------------------------------------
|
||||
{
|
||||
name: "quick_create completed task — clean immediately",
|
||||
meta: &execenv.GCMeta{Kind: execenv.GCKindQuickCreate, TaskID: quickTask, WorkspaceID: "ws"},
|
||||
servers: []serverResp{{
|
||||
path: "/api/daemon/tasks/" + quickTask + "/gc-check",
|
||||
body: map[string]any{"status": "completed", "completed_at": withinTTL},
|
||||
}},
|
||||
want: gcActionClean,
|
||||
},
|
||||
{
|
||||
name: "quick_create cancelled — clean",
|
||||
meta: &execenv.GCMeta{Kind: execenv.GCKindQuickCreate, TaskID: quickTask, WorkspaceID: "ws"},
|
||||
servers: []serverResp{{
|
||||
path: "/api/daemon/tasks/" + quickTask + "/gc-check",
|
||||
body: map[string]any{"status": "cancelled"},
|
||||
}},
|
||||
want: gcActionClean,
|
||||
},
|
||||
{
|
||||
name: "quick_create still running — skip",
|
||||
meta: &execenv.GCMeta{Kind: execenv.GCKindQuickCreate, TaskID: quickTask, WorkspaceID: "ws"},
|
||||
servers: []serverResp{{
|
||||
path: "/api/daemon/tasks/" + quickTask + "/gc-check",
|
||||
body: map[string]any{"status": "running"},
|
||||
}},
|
||||
want: gcActionSkip,
|
||||
},
|
||||
|
||||
// ---- legacy meta (no kind) → issue path ---------------------------
|
||||
{
|
||||
name: "legacy meta with no kind defaults to issue path — done over TTL = clean",
|
||||
meta: &execenv.GCMeta{IssueID: legacyMeta, WorkspaceID: "ws"},
|
||||
servers: []serverResp{{
|
||||
path: "/api/daemon/issues/" + legacyMeta + "/gc-check",
|
||||
body: map[string]any{"status": "done", "updated_at": overTTL},
|
||||
}},
|
||||
want: gcActionClean,
|
||||
},
|
||||
}
|
||||
|
||||
for _, tc := range cases {
|
||||
tc := tc
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
t.Parallel()
|
||||
mux := http.NewServeMux()
|
||||
for _, s := range tc.servers {
|
||||
if s.path == "" {
|
||||
continue
|
||||
}
|
||||
resp := s
|
||||
mux.HandleFunc(resp.path, func(w http.ResponseWriter, r *http.Request) {
|
||||
if resp.status != 0 {
|
||||
w.WriteHeader(resp.status)
|
||||
return
|
||||
}
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
_ = json.NewEncoder(w).Encode(resp.body)
|
||||
})
|
||||
}
|
||||
d := newGCTestDaemon(t, mux)
|
||||
taskDir := createTaskDir(t, d.cfg.WorkspacesRoot, "ws", tc.name, tc.meta)
|
||||
got := d.shouldCleanTaskDir(context.Background(), taskDir)
|
||||
if got != tc.want {
|
||||
t.Fatalf("kind dispatch %q: want %d, got %d", tc.name, tc.want, got)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
// TestShouldCleanTaskDir_ChatHardDeletedFreshMtime locks acceptance #3:
|
||||
// when a user hard-deletes a chat session, the workdir must be reclaimed
|
||||
// on the next GC cycle (≤ GCInterval), not deferred to GCOrphanTTL. A
|
||||
// directory that was just created (mtime well within GCOrphanTTL) but
|
||||
// whose chat session now 404s must therefore return gcActionClean.
|
||||
func TestShouldCleanTaskDir_ChatHardDeletedFreshMtime(t *testing.T) {
|
||||
t.Parallel()
|
||||
chatID := "ffffffff-ffff-ffff-ffff-ffffffffff02"
|
||||
|
||||
mux := http.NewServeMux()
|
||||
mux.HandleFunc(fmt.Sprintf("/api/daemon/chat-sessions/%s/gc-check", chatID), func(w http.ResponseWriter, r *http.Request) {
|
||||
// Simulate hard-deleted session (DeleteChatSession ran).
|
||||
w.WriteHeader(http.StatusNotFound)
|
||||
})
|
||||
|
||||
d := newGCTestDaemon(t, mux)
|
||||
// Crank GCOrphanTTL up so the mtime path is unmistakably not in play —
|
||||
// the only way the directory gets reclaimed is the chat-404 fast path.
|
||||
d.cfg.GCOrphanTTL = 365 * 24 * time.Hour
|
||||
meta := &execenv.GCMeta{
|
||||
Kind: execenv.GCKindChat,
|
||||
ChatSessionID: chatID,
|
||||
WorkspaceID: "ws",
|
||||
CompletedAt: time.Now(),
|
||||
}
|
||||
taskDir := createTaskDir(t, d.cfg.WorkspacesRoot, "ws", "hard-deleted-chat", meta)
|
||||
// taskDir mtime is now-ish — well within any sane GCOrphanTTL.
|
||||
|
||||
if got := d.shouldCleanTaskDir(context.Background(), taskDir); got != gcActionClean {
|
||||
t.Fatalf("hard-deleted chat with fresh mtime must clean immediately, got %d", got)
|
||||
}
|
||||
}
|
||||
|
||||
// TestShouldCleanTaskDir_ChatActiveResistsOldMtime is the explicit acceptance
|
||||
// criterion #2: an active chat session whose workdir is older than
|
||||
// GCOrphanTTL must NOT be reclaimed. The only path to clean an active
|
||||
// session's workdir is for the user to archive or hard-delete the session.
|
||||
func TestShouldCleanTaskDir_ChatActiveResistsOldMtime(t *testing.T) {
|
||||
t.Parallel()
|
||||
chatID := "ffffffff-ffff-ffff-ffff-ffffffffff01"
|
||||
|
||||
mux := http.NewServeMux()
|
||||
mux.HandleFunc(fmt.Sprintf("/api/daemon/chat-sessions/%s/gc-check", chatID), func(w http.ResponseWriter, r *http.Request) {
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
_ = json.NewEncoder(w).Encode(map[string]any{
|
||||
"status": "active",
|
||||
"updated_at": time.Now().Add(-100 * 24 * time.Hour),
|
||||
})
|
||||
})
|
||||
|
||||
d := newGCTestDaemon(t, mux)
|
||||
d.cfg.GCOrphanTTL = 0 // every directory is "older than orphan TTL"
|
||||
meta := &execenv.GCMeta{
|
||||
Kind: execenv.GCKindChat,
|
||||
ChatSessionID: chatID,
|
||||
WorkspaceID: "ws",
|
||||
CompletedAt: time.Now().Add(-200 * 24 * time.Hour),
|
||||
}
|
||||
taskDir := createTaskDir(t, d.cfg.WorkspacesRoot, "ws", "active-chat", meta)
|
||||
if err := os.Chtimes(taskDir, time.Now().Add(-200*24*time.Hour), time.Now().Add(-200*24*time.Hour)); err != nil {
|
||||
t.Fatalf("chtimes: %v", err)
|
||||
}
|
||||
|
||||
if got := d.shouldCleanTaskDir(context.Background(), taskDir); got != gcActionSkip {
|
||||
t.Fatalf("active chat session must not be reclaimed even with stale mtime, got %d", got)
|
||||
}
|
||||
}
|
||||
|
||||
// TestGCMetaForTask covers the discriminator priority used by the daemon
|
||||
// when selecting which GCMetaKind to write at task completion.
|
||||
func TestGCMetaForTask(t *testing.T) {
|
||||
t.Parallel()
|
||||
cases := []struct {
|
||||
name string
|
||||
task Task
|
||||
want execenv.GCMetaKind
|
||||
idOK func(m execenv.GCMeta) bool
|
||||
}{
|
||||
{
|
||||
name: "chat task",
|
||||
task: Task{ID: "t1", WorkspaceID: "ws", ChatSessionID: "c1"},
|
||||
want: execenv.GCKindChat,
|
||||
idOK: func(m execenv.GCMeta) bool { return m.ChatSessionID == "c1" },
|
||||
},
|
||||
{
|
||||
name: "autopilot run task",
|
||||
task: Task{ID: "t2", WorkspaceID: "ws", AutopilotRunID: "r1"},
|
||||
want: execenv.GCKindAutopilotRun,
|
||||
idOK: func(m execenv.GCMeta) bool { return m.AutopilotRunID == "r1" },
|
||||
},
|
||||
{
|
||||
name: "issue task",
|
||||
task: Task{ID: "t3", WorkspaceID: "ws", IssueID: "i1"},
|
||||
want: execenv.GCKindIssue,
|
||||
idOK: func(m execenv.GCMeta) bool { return m.IssueID == "i1" },
|
||||
},
|
||||
{
|
||||
name: "quick-create task — issue_id always empty at WriteGCMeta time",
|
||||
task: Task{ID: "t4", WorkspaceID: "ws", QuickCreatePrompt: "do the thing"},
|
||||
want: execenv.GCKindQuickCreate,
|
||||
idOK: func(m execenv.GCMeta) bool { return m.TaskID == "t4" },
|
||||
},
|
||||
{
|
||||
name: "chat wins over issue when both set (defensive ordering)",
|
||||
task: Task{ID: "t5", WorkspaceID: "ws", IssueID: "i1", ChatSessionID: "c1"},
|
||||
want: execenv.GCKindChat,
|
||||
idOK: func(m execenv.GCMeta) bool { return m.ChatSessionID == "c1" && m.IssueID == "" },
|
||||
},
|
||||
}
|
||||
for _, tc := range cases {
|
||||
tc := tc
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
t.Parallel()
|
||||
meta, ok := gcMetaForTask(tc.task)
|
||||
if !ok {
|
||||
t.Fatalf("expected gcMetaForTask to recognize task, got ok=false")
|
||||
}
|
||||
if meta.Kind != tc.want {
|
||||
t.Fatalf("kind: want %q, got %q", tc.want, meta.Kind)
|
||||
}
|
||||
if !tc.idOK(meta) {
|
||||
t.Fatalf("ID field mismatch: %+v", meta)
|
||||
}
|
||||
if meta.WorkspaceID != "ws" {
|
||||
t.Fatalf("workspace_id: want %q, got %q", "ws", meta.WorkspaceID)
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
t.Run("unrecognized task — ok=false", func(t *testing.T) {
|
||||
t.Parallel()
|
||||
_, ok := gcMetaForTask(Task{ID: "tX", WorkspaceID: "ws"})
|
||||
if ok {
|
||||
t.Fatal("expected gcMetaForTask to return ok=false for task with no IDs")
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
@@ -1809,3 +1809,80 @@ func (h *Handler) GetIssueGCCheck(w http.ResponseWriter, r *http.Request) {
|
||||
"updated_at": issue.UpdatedAt.Time,
|
||||
})
|
||||
}
|
||||
|
||||
// GetChatSessionGCCheck returns the status and updated_at of a chat session
|
||||
// for the daemon GC loop. A 404 here means the session was hard-deleted
|
||||
// (DeleteChatSession in chat.go runs a real DELETE), which the daemon treats
|
||||
// as an immediate-clean signal — the user's explicit delete is the strongest
|
||||
// reclaim authorization we can get.
|
||||
//
|
||||
// Same anti-enumeration shape as GetIssueGCCheck: workspace mismatch returns
|
||||
// the same 404 so a scoped daemon token can't probe other workspaces.
|
||||
func (h *Handler) GetChatSessionGCCheck(w http.ResponseWriter, r *http.Request) {
|
||||
sessionID := chi.URLParam(r, "sessionId")
|
||||
sessionUUID, ok := parseUUIDOrBadRequest(w, sessionID, "session_id")
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
session, err := h.Queries.GetChatSession(r.Context(), sessionUUID)
|
||||
if err != nil {
|
||||
writeError(w, http.StatusNotFound, "chat session not found")
|
||||
return
|
||||
}
|
||||
if !h.requireDaemonWorkspaceAccess(w, r, uuidToString(session.WorkspaceID)) {
|
||||
return
|
||||
}
|
||||
writeJSON(w, http.StatusOK, map[string]any{
|
||||
"status": session.Status,
|
||||
"updated_at": session.UpdatedAt.Time,
|
||||
})
|
||||
}
|
||||
|
||||
// GetAutopilotRunGCCheck returns the status and completed_at of an autopilot
|
||||
// run for the daemon GC loop. autopilot_run has no updated_at column; the
|
||||
// daemon uses completed_at as the TTL anchor for terminal runs, and treats
|
||||
// non-terminal status as a skip signal regardless of timestamp.
|
||||
//
|
||||
// Workspace ownership is resolved via the parent autopilot row.
|
||||
func (h *Handler) GetAutopilotRunGCCheck(w http.ResponseWriter, r *http.Request) {
|
||||
runID := chi.URLParam(r, "runId")
|
||||
runUUID, ok := parseUUIDOrBadRequest(w, runID, "run_id")
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
run, err := h.Queries.GetAutopilotRun(r.Context(), runUUID)
|
||||
if err != nil {
|
||||
writeError(w, http.StatusNotFound, "autopilot run not found")
|
||||
return
|
||||
}
|
||||
autopilot, err := h.Queries.GetAutopilot(r.Context(), run.AutopilotID)
|
||||
if err != nil {
|
||||
// Parent autopilot is gone — treat as not found rather than 500
|
||||
// so the daemon can fall through to its orphan-by-mtime path.
|
||||
writeError(w, http.StatusNotFound, "autopilot run not found")
|
||||
return
|
||||
}
|
||||
if !h.requireDaemonWorkspaceAccess(w, r, uuidToString(autopilot.WorkspaceID)) {
|
||||
return
|
||||
}
|
||||
writeJSON(w, http.StatusOK, map[string]any{
|
||||
"status": run.Status,
|
||||
"completed_at": run.CompletedAt.Time,
|
||||
})
|
||||
}
|
||||
|
||||
// GetTaskGCCheck returns the agent_task_queue status for quick-create cleanup.
|
||||
// Quick-create tasks have no parent record (no issue_id at WriteGCMeta time,
|
||||
// no chat session, no autopilot run) so the daemon keys GC directly on the
|
||||
// task row itself.
|
||||
func (h *Handler) GetTaskGCCheck(w http.ResponseWriter, r *http.Request) {
|
||||
taskID := chi.URLParam(r, "taskId")
|
||||
task, ok := h.requireDaemonTaskAccess(w, r, taskID)
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
writeJSON(w, http.StatusOK, map[string]any{
|
||||
"status": task.Status,
|
||||
"completed_at": task.CompletedAt.Time,
|
||||
})
|
||||
}
|
||||
|
||||
@@ -2285,3 +2285,217 @@ func TestClaimTask_ChatLegacyNullRuntimeFallsBackToTaskRow(t *testing.T) {
|
||||
t.Fatalf("legacy fallback: expected PriorWorkDir='/tmp/legacy-fallback-workdir', got %q", task.PriorWorkDir)
|
||||
}
|
||||
}
|
||||
|
||||
// TestGetChatSessionGCCheck verifies the chat session gc-check endpoint
|
||||
// matches the same anti-enumeration shape as GetIssueGCCheck: cross-workspace
|
||||
// daemon tokens get 404, same-workspace tokens get the live status.
|
||||
func TestGetChatSessionGCCheck(t *testing.T) {
|
||||
if testHandler == nil {
|
||||
t.Skip("database not available")
|
||||
}
|
||||
|
||||
ctx := context.Background()
|
||||
|
||||
var agentID string
|
||||
if err := testPool.QueryRow(ctx, `SELECT id FROM agent WHERE workspace_id = $1 LIMIT 1`, testWorkspaceID).Scan(&agentID); err != nil {
|
||||
t.Fatalf("setup: get agent: %v", err)
|
||||
}
|
||||
|
||||
var sessionID string
|
||||
if err := testPool.QueryRow(ctx, `
|
||||
INSERT INTO chat_session (workspace_id, agent_id, creator_id, title, status)
|
||||
VALUES ($1, $2, $3, 'gc-check fixture', 'active')
|
||||
RETURNING id
|
||||
`, testWorkspaceID, agentID, testUserID).Scan(&sessionID); err != nil {
|
||||
t.Fatalf("setup: create chat session: %v", err)
|
||||
}
|
||||
defer testPool.Exec(ctx, `DELETE FROM chat_session WHERE id = $1`, sessionID)
|
||||
|
||||
// Cross-workspace daemon token must 404 with no oracle.
|
||||
w := httptest.NewRecorder()
|
||||
req := newDaemonTokenRequest("GET", "/api/daemon/chat-sessions/"+sessionID+"/gc-check", nil,
|
||||
"00000000-0000-0000-0000-000000000000", "attacker-daemon")
|
||||
req = withURLParam(req, "sessionId", sessionID)
|
||||
testHandler.GetChatSessionGCCheck(w, req)
|
||||
if w.Code != http.StatusNotFound {
|
||||
t.Fatalf("cross-workspace token: expected 404, got %d: %s", w.Code, w.Body.String())
|
||||
}
|
||||
|
||||
// Same-workspace daemon token sees the live row.
|
||||
w = httptest.NewRecorder()
|
||||
req = newDaemonTokenRequest("GET", "/api/daemon/chat-sessions/"+sessionID+"/gc-check", nil,
|
||||
testWorkspaceID, "legit-daemon")
|
||||
req = withURLParam(req, "sessionId", sessionID)
|
||||
testHandler.GetChatSessionGCCheck(w, req)
|
||||
if w.Code != http.StatusOK {
|
||||
t.Fatalf("same-workspace token: expected 200, got %d: %s", w.Code, w.Body.String())
|
||||
}
|
||||
var resp struct {
|
||||
Status string `json:"status"`
|
||||
UpdatedAt string `json:"updated_at"`
|
||||
}
|
||||
if err := json.NewDecoder(w.Body).Decode(&resp); err != nil {
|
||||
t.Fatalf("decode: %v", err)
|
||||
}
|
||||
if resp.Status != "active" {
|
||||
t.Fatalf("expected status %q, got %q", "active", resp.Status)
|
||||
}
|
||||
if resp.UpdatedAt == "" {
|
||||
t.Fatal("expected updated_at to be set")
|
||||
}
|
||||
|
||||
// Hard-deleted session: 404 — exactly what the daemon needs to reclaim
|
||||
// the workdir on the next GC pass after a user runs DeleteChatSession.
|
||||
if _, err := testPool.Exec(ctx, `DELETE FROM chat_session WHERE id = $1`, sessionID); err != nil {
|
||||
t.Fatalf("delete chat session: %v", err)
|
||||
}
|
||||
w = httptest.NewRecorder()
|
||||
req = newDaemonTokenRequest("GET", "/api/daemon/chat-sessions/"+sessionID+"/gc-check", nil,
|
||||
testWorkspaceID, "legit-daemon")
|
||||
req = withURLParam(req, "sessionId", sessionID)
|
||||
testHandler.GetChatSessionGCCheck(w, req)
|
||||
if w.Code != http.StatusNotFound {
|
||||
t.Fatalf("hard-deleted session: expected 404, got %d: %s", w.Code, w.Body.String())
|
||||
}
|
||||
}
|
||||
|
||||
// TestGetAutopilotRunGCCheck verifies the autopilot-run gc-check endpoint:
|
||||
// 200 with status+completed_at on success, 404 on cross-workspace probe.
|
||||
func TestGetAutopilotRunGCCheck(t *testing.T) {
|
||||
if testHandler == nil {
|
||||
t.Skip("database not available")
|
||||
}
|
||||
|
||||
ctx := context.Background()
|
||||
|
||||
var agentID string
|
||||
if err := testPool.QueryRow(ctx, `SELECT id FROM agent WHERE workspace_id = $1 LIMIT 1`, testWorkspaceID).Scan(&agentID); err != nil {
|
||||
t.Fatalf("setup: get agent: %v", err)
|
||||
}
|
||||
|
||||
var autopilotID string
|
||||
if err := testPool.QueryRow(ctx, `
|
||||
INSERT INTO autopilot (
|
||||
workspace_id, title, assignee_id, execution_mode,
|
||||
created_by_type, created_by_id
|
||||
)
|
||||
VALUES ($1, 'gc-check autopilot', $2, 'run_only', 'member', $3)
|
||||
RETURNING id
|
||||
`, testWorkspaceID, agentID, testUserID).Scan(&autopilotID); err != nil {
|
||||
t.Fatalf("setup: create autopilot: %v", err)
|
||||
}
|
||||
defer testPool.Exec(ctx, `DELETE FROM autopilot WHERE id = $1`, autopilotID)
|
||||
|
||||
var runID string
|
||||
if err := testPool.QueryRow(ctx, `
|
||||
INSERT INTO autopilot_run (autopilot_id, source, status, completed_at)
|
||||
VALUES ($1, 'manual', 'completed', NOW() - INTERVAL '6 days')
|
||||
RETURNING id
|
||||
`, autopilotID).Scan(&runID); err != nil {
|
||||
t.Fatalf("setup: create autopilot_run: %v", err)
|
||||
}
|
||||
|
||||
// Cross-workspace probe.
|
||||
w := httptest.NewRecorder()
|
||||
req := newDaemonTokenRequest("GET", "/api/daemon/autopilot-runs/"+runID+"/gc-check", nil,
|
||||
"00000000-0000-0000-0000-000000000000", "attacker-daemon")
|
||||
req = withURLParam(req, "runId", runID)
|
||||
testHandler.GetAutopilotRunGCCheck(w, req)
|
||||
if w.Code != http.StatusNotFound {
|
||||
t.Fatalf("cross-workspace token: expected 404, got %d: %s", w.Code, w.Body.String())
|
||||
}
|
||||
|
||||
// Same-workspace probe.
|
||||
w = httptest.NewRecorder()
|
||||
req = newDaemonTokenRequest("GET", "/api/daemon/autopilot-runs/"+runID+"/gc-check", nil,
|
||||
testWorkspaceID, "legit-daemon")
|
||||
req = withURLParam(req, "runId", runID)
|
||||
testHandler.GetAutopilotRunGCCheck(w, req)
|
||||
if w.Code != http.StatusOK {
|
||||
t.Fatalf("same-workspace token: expected 200, got %d: %s", w.Code, w.Body.String())
|
||||
}
|
||||
var resp struct {
|
||||
Status string `json:"status"`
|
||||
CompletedAt string `json:"completed_at"`
|
||||
}
|
||||
if err := json.NewDecoder(w.Body).Decode(&resp); err != nil {
|
||||
t.Fatalf("decode: %v", err)
|
||||
}
|
||||
if resp.Status != "completed" {
|
||||
t.Fatalf("expected status %q, got %q", "completed", resp.Status)
|
||||
}
|
||||
if resp.CompletedAt == "" {
|
||||
t.Fatal("expected completed_at to be set for terminal run")
|
||||
}
|
||||
}
|
||||
|
||||
// TestGetTaskGCCheck verifies the task gc-check endpoint that quick-create
|
||||
// workdirs key on. Same anti-enumeration shape via requireDaemonTaskAccess.
|
||||
func TestGetTaskGCCheck(t *testing.T) {
|
||||
if testHandler == nil {
|
||||
t.Skip("database not available")
|
||||
}
|
||||
|
||||
ctx := context.Background()
|
||||
|
||||
var agentID, runtimeID string
|
||||
if err := testPool.QueryRow(ctx, `
|
||||
SELECT a.id, a.runtime_id FROM agent a WHERE a.workspace_id = $1 LIMIT 1
|
||||
`, testWorkspaceID).Scan(&agentID, &runtimeID); err != nil {
|
||||
t.Fatalf("setup: get agent: %v", err)
|
||||
}
|
||||
|
||||
// Quick-create-shaped task: no issue_id, no chat_session_id, no run id.
|
||||
// context.type is set so ResolveTaskWorkspaceID can recover workspace.
|
||||
quickContext, _ := json.Marshal(map[string]any{
|
||||
"type": "quick_create",
|
||||
"prompt": "fixture",
|
||||
"requester_id": testUserID,
|
||||
"workspace_id": testWorkspaceID,
|
||||
})
|
||||
|
||||
var taskID string
|
||||
if err := testPool.QueryRow(ctx, `
|
||||
INSERT INTO agent_task_queue (
|
||||
agent_id, runtime_id, status, priority, context, completed_at
|
||||
)
|
||||
VALUES ($1, $2, 'completed', 0, $3, NOW())
|
||||
RETURNING id
|
||||
`, agentID, runtimeID, quickContext).Scan(&taskID); err != nil {
|
||||
t.Fatalf("setup: create quick-create task: %v", err)
|
||||
}
|
||||
defer testPool.Exec(ctx, `DELETE FROM agent_task_queue WHERE id = $1`, taskID)
|
||||
|
||||
// Cross-workspace probe.
|
||||
w := httptest.NewRecorder()
|
||||
req := newDaemonTokenRequest("GET", "/api/daemon/tasks/"+taskID+"/gc-check", nil,
|
||||
"00000000-0000-0000-0000-000000000000", "attacker-daemon")
|
||||
req = withURLParam(req, "taskId", taskID)
|
||||
testHandler.GetTaskGCCheck(w, req)
|
||||
if w.Code != http.StatusNotFound {
|
||||
t.Fatalf("cross-workspace token: expected 404, got %d: %s", w.Code, w.Body.String())
|
||||
}
|
||||
|
||||
// Same-workspace probe — terminal task returns its status.
|
||||
w = httptest.NewRecorder()
|
||||
req = newDaemonTokenRequest("GET", "/api/daemon/tasks/"+taskID+"/gc-check", nil,
|
||||
testWorkspaceID, "legit-daemon")
|
||||
req = withURLParam(req, "taskId", taskID)
|
||||
testHandler.GetTaskGCCheck(w, req)
|
||||
if w.Code != http.StatusOK {
|
||||
t.Fatalf("same-workspace token: expected 200, got %d: %s", w.Code, w.Body.String())
|
||||
}
|
||||
var resp struct {
|
||||
Status string `json:"status"`
|
||||
CompletedAt string `json:"completed_at"`
|
||||
}
|
||||
if err := json.NewDecoder(w.Body).Decode(&resp); err != nil {
|
||||
t.Fatalf("decode: %v", err)
|
||||
}
|
||||
if resp.Status != "completed" {
|
||||
t.Fatalf("expected status %q, got %q", "completed", resp.Status)
|
||||
}
|
||||
if resp.CompletedAt == "" {
|
||||
t.Fatal("expected completed_at to be set for completed task")
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user