diff --git a/server/cmd/server/autopilot_listeners_test.go b/server/cmd/server/autopilot_listeners_test.go index 8fb81e93b..6db6999c6 100644 --- a/server/cmd/server/autopilot_listeners_test.go +++ b/server/cmd/server/autopilot_listeners_test.go @@ -120,3 +120,93 @@ func TestAutopilotRunOnlyTaskTerminalEventsUpdateRun(t *testing.T) { }) } } + +// TestAutopilotDispatchSkipsWhenRuntimeOffline locks in the MUL-1899 +// admission gate: when the assignee agent's runtime is not online we must +// record a `skipped` autopilot_run with a failure_reason and NOT enqueue an +// agent_task_queue row. This is the fix for "活跃 schedule 持续给离线 local +// agent 入队". +func TestAutopilotDispatchSkipsWhenRuntimeOffline(t *testing.T) { + ctx := context.Background() + queries := db.New(testPool) + bus := events.New() + taskSvc := service.NewTaskService(queries, testPool, nil, bus) + autopilotSvc := service.NewAutopilotService(queries, testPool, bus, taskSvc) + + // Spin up a dedicated runtime + agent so we can flip the runtime to + // offline without affecting the shared fixture used by other tests. 
+ var runtimeID, agentID string + if err := testPool.QueryRow(ctx, ` + INSERT INTO agent_runtime ( + workspace_id, daemon_id, name, runtime_mode, provider, status, device_info, metadata, last_seen_at + ) + VALUES ($1, NULL, 'Offline runtime', 'local', 'mul1899_offline_runtime', 'offline', '{}'::jsonb, '{}'::jsonb, now()) + RETURNING id::text + `, parseUUID(testWorkspaceID)).Scan(&runtimeID); err != nil { + t.Fatalf("create offline runtime: %v", err) + } + t.Cleanup(func() { + _, _ = testPool.Exec(context.Background(), `DELETE FROM agent_runtime WHERE id = $1`, runtimeID) + }) + + if err := testPool.QueryRow(ctx, ` + INSERT INTO agent ( + workspace_id, name, description, runtime_mode, runtime_config, + runtime_id, visibility, max_concurrent_tasks, owner_id + ) + VALUES ($1, 'mul1899-offline-agent', '', 'local', '{}'::jsonb, $2, 'workspace', 1, $3) + RETURNING id::text + `, parseUUID(testWorkspaceID), runtimeID, parseUUID(testUserID)).Scan(&agentID); err != nil { + t.Fatalf("create offline agent: %v", err) + } + t.Cleanup(func() { + _, _ = testPool.Exec(context.Background(), `DELETE FROM agent WHERE id = $1`, agentID) + }) + + ap, err := queries.CreateAutopilot(ctx, db.CreateAutopilotParams{ + WorkspaceID: parseUUID(testWorkspaceID), + Title: "Offline-runtime autopilot", + Description: pgtype.Text{String: "MUL-1899 admission test", Valid: true}, + AssigneeID: parseUUID(agentID), + Status: "active", + ExecutionMode: "run_only", + IssueTitleTemplate: pgtype.Text{}, + CreatedByType: "member", + CreatedByID: parseUUID(testUserID), + }) + if err != nil { + t.Fatalf("CreateAutopilot: %v", err) + } + t.Cleanup(func() { + _, _ = testPool.Exec(context.Background(), `DELETE FROM autopilot WHERE id = $1`, ap.ID) + }) + + run, err := autopilotSvc.DispatchAutopilot(ctx, ap, pgtype.UUID{}, "schedule", nil) + if err != nil { + t.Fatalf("DispatchAutopilot: %v", err) + } + if run == nil { + t.Fatal("expected a run, got nil") + } + if run.Status != "skipped" { + t.Fatalf("expected run 
status 'skipped', got %q", run.Status) + } + if !run.FailureReason.Valid || !strings.Contains(run.FailureReason.String, "offline") { + t.Fatalf("expected failure reason mentioning 'offline', got %+v", run.FailureReason) + } + if run.TaskID.Valid { + t.Fatalf("expected no task to be enqueued, got task_id %v", run.TaskID) + } + + // Defensive: confirm at the DB layer that nothing landed on the queue. + var taskCount int + if err := testPool.QueryRow(ctx, + `SELECT count(*) FROM agent_task_queue WHERE agent_id = $1`, + agentID, + ).Scan(&taskCount); err != nil { + t.Fatalf("count tasks: %v", err) + } + if taskCount != 0 { + t.Fatalf("expected 0 queued tasks for offline-runtime agent, got %d", taskCount) + } +} diff --git a/server/cmd/server/runtime_sweeper.go b/server/cmd/server/runtime_sweeper.go index 908cfe62d..0c7939cbc 100644 --- a/server/cmd/server/runtime_sweeper.go +++ b/server/cmd/server/runtime_sweeper.go @@ -38,6 +38,24 @@ const ( // runningTimeoutSeconds fails tasks stuck in 'running' beyond this. // The default agent timeout is 2h, so 2.5h gives a generous buffer. runningTimeoutSeconds = 9000.0 + // queuedTTLSeconds expires tasks that have been sitting in 'queued' + // for longer than this without ever being claimed. This is the cleanup + // arm of the MUL-1899 backlog fix: even with the dispatch-time + // admission gate that blocks new enqueues against offline runtimes, + // tasks already on the queue when a runtime drops off (or that lost + // the race against a runtime that went offline mid-tick) need a + // time-bounded exit. 2 hours is conservatively above any reasonable + // "queued behind a long-running task" window for an online runtime + // (default agent timeout is 2h, sweeper interval is 30s) so we don't + // expire legitimately-pending work, while still draining the historical + // 87k autopilot backlog within ~24h once enabled. 
+ queuedTTLSeconds = 2 * 3600.0 + // queuedExpireBatchSize caps how many queued rows a single sweeper tick + // transitions to failed. Keeps the sweep transaction short even when + // the historical backlog is large (~89k at MUL-1899 baseline). At 30s + // ticks and 500 rows/tick we drain up to 60k rows/hour — plenty + // of headroom for the documented backlog without monopolising DB CPU. + queuedExpireBatchSize = 500 ) // runRuntimeSweeper periodically marks runtimes as offline if their @@ -62,6 +80,7 @@ func runRuntimeSweeper(ctx context.Context, queries *db.Queries, liveness handle case <-ticker.C: sweepStaleRuntimes(ctx, queries, liveness, taskSvc, bus) sweepStaleTasks(ctx, queries, taskSvc, bus) + sweepExpiredQueuedTasks(ctx, queries, taskSvc) gcRuntimes(ctx, queries, bus) } } @@ -237,6 +256,29 @@ func sweepStaleTasks(ctx context.Context, queries *db.Queries, taskSvc *service. taskSvc.HandleFailedTasks(ctx, failedTasks) } +// sweepExpiredQueuedTasks fails tasks that have been sitting in 'queued' for +// longer than the TTL. Companion to the dispatch-time admission gate added +// in MUL-1899: that gate prevents new doomed enqueues; this sweep drains the +// historical backlog and catches the race where a runtime goes offline AFTER +// a task is already queued. Capped to queuedExpireBatchSize per tick so a +// big backlog can't monopolise the DB. 
+func sweepExpiredQueuedTasks(ctx context.Context, queries *db.Queries, taskSvc *service.TaskService) { + failedTasks, err := queries.ExpireStaleQueuedTasks(ctx, db.ExpireStaleQueuedTasksParams{ + TtlSecs: queuedTTLSeconds, + MaxPerTick: queuedExpireBatchSize, + }) + if err != nil { + slog.Warn("task sweeper: failed to expire stale queued tasks", "error", err) + return + } + if len(failedTasks) == 0 { + return + } + + slog.Info("task sweeper: expired stale queued tasks", "count", len(failedTasks)) + taskSvc.HandleFailedTasks(ctx, failedTasks) +} + // broadcastFailedTasks is preserved as a thin shim for the integration tests // in this package. New call sites should use TaskService.HandleFailedTasks // directly so the side effects (event broadcast, agent reconcile, issue diff --git a/server/cmd/server/runtime_sweeper_test.go b/server/cmd/server/runtime_sweeper_test.go index 6d7cd9307..45d8cfa2a 100644 --- a/server/cmd/server/runtime_sweeper_test.go +++ b/server/cmd/server/runtime_sweeper_test.go @@ -505,6 +505,190 @@ func TestSweepDoesNotResetIssueAlreadyInReview(t *testing.T) { } } +// TestExpireStaleQueuedTasks verifies the MUL-1899 queued-TTL sweeper: +// tasks that have been sitting in 'queued' beyond the TTL are transitioned +// to 'failed' with failure_reason='queued_expired', while fresh queued tasks +// are left alone and the per-tick batch limit is respected. 
+func TestExpireStaleQueuedTasks(t *testing.T) { + if testPool == nil { + t.Skip("no database connection") + } + + ctx := context.Background() + + // Find the integration test agent + var agentID, runtimeID string + if err := testPool.QueryRow(ctx, ` + SELECT a.id, a.runtime_id FROM agent a + JOIN member m ON m.workspace_id = a.workspace_id + JOIN "user" u ON u.id = m.user_id + WHERE u.email = $1 + LIMIT 1 + `, integrationTestEmail).Scan(&agentID, &runtimeID); err != nil { + t.Fatalf("failed to find test agent: %v", err) + } + + // One ancient queued task (should expire) and one fresh queued task (should not). + // Constraint: idx_one_pending_task_per_issue_agent → use distinct issues. + mkIssue := func(label string) string { + var issueID string + if err := testPool.QueryRow(ctx, ` + WITH bumped AS ( + UPDATE workspace SET issue_counter = issue_counter + 1 + WHERE id = $1 RETURNING issue_counter + ) + INSERT INTO issue (workspace_id, title, status, priority, creator_type, creator_id, assignee_type, assignee_id, number) + SELECT $1, $3, 'todo', 'none', 'member', m.user_id, 'agent', $2, (SELECT issue_counter FROM bumped) + FROM member m WHERE m.workspace_id = $1 LIMIT 1 + RETURNING id + `, testWorkspaceID, agentID, label).Scan(&issueID); err != nil { + t.Fatalf("failed to create %s issue: %v", label, err) + } + return issueID + } + oldIssueID := mkIssue("Queued TTL test (old)") + freshIssueID := mkIssue("Queued TTL test (fresh)") + t.Cleanup(func() { + testPool.Exec(ctx, `DELETE FROM agent_task_queue WHERE issue_id IN ($1, $2)`, oldIssueID, freshIssueID) + testPool.Exec(ctx, `DELETE FROM issue WHERE id IN ($1, $2)`, oldIssueID, freshIssueID) + }) + + var oldTaskID, freshTaskID string + if err := testPool.QueryRow(ctx, ` + INSERT INTO agent_task_queue (agent_id, runtime_id, issue_id, status, priority, created_at) + VALUES ($1, $2, $3, 'queued', 0, now() - interval '5 hours') + RETURNING id + `, agentID, runtimeID, oldIssueID).Scan(&oldTaskID); err != nil { + 
t.Fatalf("failed to insert old queued task: %v", err) + } + if err := testPool.QueryRow(ctx, ` + INSERT INTO agent_task_queue (agent_id, runtime_id, issue_id, status, priority, created_at) + VALUES ($1, $2, $3, 'queued', 0, now()) + RETURNING id + `, agentID, runtimeID, freshIssueID).Scan(&freshTaskID); err != nil { + t.Fatalf("failed to insert fresh queued task: %v", err) + } + + queries := db.New(testPool) + failed, err := queries.ExpireStaleQueuedTasks(ctx, db.ExpireStaleQueuedTasksParams{ + TtlSecs: 3600.0, // 1h TTL — old task is 5h, fresh task is 0s + MaxPerTick: 100, + }) + if err != nil { + t.Fatalf("ExpireStaleQueuedTasks failed: %v", err) + } + if len(failed) != 1 { + t.Fatalf("expected exactly 1 expired task, got %d", len(failed)) + } + if failed[0].ID.Bytes != parseUUIDBytes(oldTaskID) { + t.Fatalf("expired the wrong task: got %x", failed[0].ID.Bytes) + } + + // DB assertions: old → failed/queued_expired, fresh → still queued. + var oldStatus, oldReason, oldErr string + if err := testPool.QueryRow(ctx, ` + SELECT status, COALESCE(failure_reason, ''), COALESCE(error, '') + FROM agent_task_queue WHERE id = $1 + `, oldTaskID).Scan(&oldStatus, &oldReason, &oldErr); err != nil { + t.Fatalf("failed to read old task: %v", err) + } + if oldStatus != "failed" { + t.Fatalf("old task: expected status=failed, got %q", oldStatus) + } + if oldReason != "queued_expired" { + t.Fatalf("old task: expected failure_reason=queued_expired, got %q", oldReason) + } + if !strings.Contains(oldErr, "expired in queue") { + t.Fatalf("old task: expected error to mention expiry, got %q", oldErr) + } + + var freshStatus string + if err := testPool.QueryRow(ctx, ` + SELECT status FROM agent_task_queue WHERE id = $1 + `, freshTaskID).Scan(&freshStatus); err != nil { + t.Fatalf("failed to read fresh task: %v", err) + } + if freshStatus != "queued" { + t.Fatalf("fresh task: expected status=queued, got %q", freshStatus) + } +} + +// TestExpireStaleQueuedTasksRespectsBatchLimit verifies the 
per-tick cap so +// that a large historical backlog cannot monopolise a single sweep. +func TestExpireStaleQueuedTasksRespectsBatchLimit(t *testing.T) { + if testPool == nil { + t.Skip("no database connection") + } + + ctx := context.Background() + + var agentID, runtimeID string + if err := testPool.QueryRow(ctx, ` + SELECT a.id, a.runtime_id FROM agent a + JOIN member m ON m.workspace_id = a.workspace_id + JOIN "user" u ON u.id = m.user_id + WHERE u.email = $1 + LIMIT 1 + `, integrationTestEmail).Scan(&agentID, &runtimeID); err != nil { + t.Fatalf("failed to find test agent: %v", err) + } + + // Create 5 issues, each with one stale queued task — necessary because of the + // idx_one_pending_task_per_issue_agent unique constraint. + var issueIDs []string + t.Cleanup(func() { + for _, id := range issueIDs { + testPool.Exec(ctx, `DELETE FROM agent_task_queue WHERE issue_id = $1`, id) + testPool.Exec(ctx, `DELETE FROM issue WHERE id = $1`, id) + } + }) + for i := 0; i < 5; i++ { + var issueID string + if err := testPool.QueryRow(ctx, ` + WITH bumped AS ( + UPDATE workspace SET issue_counter = issue_counter + 1 + WHERE id = $1 RETURNING issue_counter + ) + INSERT INTO issue (workspace_id, title, status, priority, creator_type, creator_id, assignee_type, assignee_id, number) + SELECT $1, 'Queued TTL batch test', 'todo', 'none', 'member', m.user_id, 'agent', $2, (SELECT issue_counter FROM bumped) + FROM member m WHERE m.workspace_id = $1 LIMIT 1 + RETURNING id + `, testWorkspaceID, agentID).Scan(&issueID); err != nil { + t.Fatalf("failed to create issue %d: %v", i, err) + } + issueIDs = append(issueIDs, issueID) + if _, err := testPool.Exec(ctx, ` + INSERT INTO agent_task_queue (agent_id, runtime_id, issue_id, status, priority, created_at) + VALUES ($1, $2, $3, 'queued', 0, now() - interval '5 hours') + `, agentID, runtimeID, issueID); err != nil { + t.Fatalf("failed to insert backlog task %d: %v", i, err) + } + } + + queries := db.New(testPool) + failed, err := 
queries.ExpireStaleQueuedTasks(ctx, db.ExpireStaleQueuedTasksParams{ + TtlSecs: 3600.0, + MaxPerTick: 2, // cap below the backlog + }) + if err != nil { + t.Fatalf("ExpireStaleQueuedTasks failed: %v", err) + } + if len(failed) != 2 { + t.Fatalf("expected batch cap of 2, got %d", len(failed)) + } + + var remaining int + if err := testPool.QueryRow(ctx, ` + SELECT COUNT(*) FROM agent_task_queue + WHERE issue_id = ANY($1::uuid[]) AND status = 'queued' + `, issueIDs).Scan(&remaining); err != nil { + t.Fatalf("failed to count remaining queued: %v", err) + } + if remaining != 3 { + t.Fatalf("expected 3 queued tasks remaining after batched sweep, got %d", remaining) + } +} + // parseUUIDBytes converts a UUID string to the 16-byte array used by pgtype.UUID. func parseUUIDBytes(s string) [16]byte { s = strings.ReplaceAll(s, "-", "") diff --git a/server/internal/service/autopilot.go b/server/internal/service/autopilot.go index 7f531b2c8..c92949b1c 100644 --- a/server/internal/service/autopilot.go +++ b/server/internal/service/autopilot.go @@ -35,6 +35,12 @@ func NewAutopilotService(q *db.Queries, tx TxStarter, bus *events.Bus, taskSvc * // DispatchAutopilot is the core execution entry point. // It creates a run and either creates an issue or enqueues a direct agent task // depending on execution_mode. +// +// Before any work is queued we run an admission check against the assignee +// agent's runtime: if it is not online, we record a `skipped` run with a +// failure_reason and return without enqueueing. This is the "触发时准入" gate +// from MUL-1899 — without it a paused laptop / offline daemon causes scheduled +// autopilots to pile thousands of doomed tasks onto agent_task_queue. 
func (s *AutopilotService) DispatchAutopilot( ctx context.Context, autopilot db.Autopilot, @@ -42,6 +48,10 @@ func (s *AutopilotService) DispatchAutopilot( source string, payload []byte, ) (*db.AutopilotRun, error) { + if reason, skip := s.shouldSkipDispatch(ctx, autopilot); skip { + return s.recordSkippedRun(ctx, autopilot, triggerID, source, payload, reason) + } + // Determine initial status based on execution mode. initialStatus := "issue_created" if autopilot.ExecutionMode == "run_only" { @@ -335,6 +345,96 @@ func (s *AutopilotService) failRun(ctx context.Context, runID pgtype.UUID, reaso } } +// shouldSkipDispatch is the pre-flight admission check from MUL-1899. +// Returns (reason, true) when dispatching now would only enqueue a doomed +// task — the autopilot has no assignee, or the agent is archived, has no +// runtime bound, or its runtime is not online. Returns ("", false) otherwise. +// +// Errors loading the agent / runtime are logged but treated as "do not skip" +// so a transient DB hiccup never silently swallows a scheduled run. 
+func (s *AutopilotService) shouldSkipDispatch(ctx context.Context, ap db.Autopilot) (string, bool) { + if !ap.AssigneeID.Valid { + return "autopilot has no assignee", true + } + agent, err := s.Queries.GetAgent(ctx, ap.AssigneeID) + if err != nil { + slog.Warn("autopilot admission: failed to load assignee agent", + "autopilot_id", util.UUIDToString(ap.ID), + "agent_id", util.UUIDToString(ap.AssigneeID), + "error", err, + ) + return "", false + } + if agent.ArchivedAt.Valid { + return "assignee agent is archived", true + } + if !agent.RuntimeID.Valid { + return "assignee agent has no runtime bound", true + } + rt, err := s.Queries.GetAgentRuntime(ctx, agent.RuntimeID) + if err != nil { + slog.Warn("autopilot admission: failed to load runtime", + "autopilot_id", util.UUIDToString(ap.ID), + "runtime_id", util.UUIDToString(agent.RuntimeID), + "error", err, + ) + return "", false + } + if rt.Status != "online" { + return "agent runtime is " + rt.Status + " at dispatch time", true + } + return "", false +} + +// recordSkippedRun persists a `skipped` autopilot_run with the given reason +// and emits the same WS / analytics signals that a normal terminal transition +// would. Returns the run + nil error so callers (scheduler tick, manual +// trigger handler) treat this as a successful — but no-op — dispatch. 
+func (s *AutopilotService) recordSkippedRun( + ctx context.Context, + autopilot db.Autopilot, + triggerID pgtype.UUID, + source string, + payload []byte, + reason string, +) (*db.AutopilotRun, error) { + run, err := s.Queries.CreateAutopilotRun(ctx, db.CreateAutopilotRunParams{ + AutopilotID: autopilot.ID, + TriggerID: triggerID, + Source: source, + Status: "skipped", + TriggerPayload: payload, + }) + if err != nil { + return nil, fmt.Errorf("create skipped run: %w", err) + } + + updated, err := s.Queries.UpdateAutopilotRunSkipped(ctx, db.UpdateAutopilotRunSkippedParams{ + ID: run.ID, + FailureReason: pgtype.Text{String: reason, Valid: true}, + }) + if err == nil { + run = updated + } else { + slog.Warn("failed to set skip reason on autopilot run", + "run_id", util.UUIDToString(run.ID), "error", err) + } + + slog.Info("autopilot dispatch skipped", + "autopilot_id", util.UUIDToString(autopilot.ID), + "run_id", util.UUIDToString(run.ID), + "source", source, + "reason", reason, + ) + + // Bump last_run_at so scheduler advancement and "last seen" UI both + // reflect that we did evaluate the trigger this tick. + s.Queries.UpdateAutopilotLastRunAt(ctx, autopilot.ID) + + s.publishRunDone(util.UUIDToString(autopilot.WorkspaceID), run, "skipped") + return &run, nil +} + func (s *AutopilotService) publishRunDone(workspaceID string, run db.AutopilotRun, status string) { s.Bus.Publish(events.Event{ Type: protocol.EventAutopilotRunDone, diff --git a/server/migrations/079_autopilot_run_skipped_status.down.sql b/server/migrations/079_autopilot_run_skipped_status.down.sql new file mode 100644 index 000000000..4e6ebfb5d --- /dev/null +++ b/server/migrations/079_autopilot_run_skipped_status.down.sql @@ -0,0 +1,11 @@ +-- Migrate any 'skipped' rows to 'failed' before tightening the constraint +-- (mirrors what 043 did for the original removal). 
+UPDATE autopilot_run +SET status = 'failed', + completed_at = COALESCE(completed_at, now()), + failure_reason = COALESCE(failure_reason, 'migrated from skipped status') +WHERE status = 'skipped'; + +ALTER TABLE autopilot_run DROP CONSTRAINT IF EXISTS autopilot_run_status_check; +ALTER TABLE autopilot_run ADD CONSTRAINT autopilot_run_status_check + CHECK (status IN ('issue_created', 'running', 'completed', 'failed')); diff --git a/server/migrations/079_autopilot_run_skipped_status.up.sql b/server/migrations/079_autopilot_run_skipped_status.up.sql new file mode 100644 index 000000000..b01890003 --- /dev/null +++ b/server/migrations/079_autopilot_run_skipped_status.up.sql @@ -0,0 +1,16 @@ +-- MUL-1899: re-introduce the 'skipped' terminal status for autopilot_run. +-- Migration 043 removed 'skipped' along with the broken concurrency_policy +-- feature, but the offline-runtime admission gate added in this PR needs a +-- non-failure terminal status to record dispatches that were intentionally +-- declined (e.g. assignee runtime is offline). Reusing 'failed' would +-- pollute the failure-rate signal that drives the auto-pause monitor. +ALTER TABLE autopilot_run DROP CONSTRAINT IF EXISTS autopilot_run_status_check; +ALTER TABLE autopilot_run ADD CONSTRAINT autopilot_run_status_check + CHECK (status IN ('issue_created', 'running', 'completed', 'failed', 'skipped')); + +-- Partial index on status for in-flight runs is unchanged: 'skipped' is +-- terminal so the existing index (issue_created/running) still matches. +-- +-- The companion partial index for the queued-task TTL sweeper lives in +-- migration 080 — it must be created CONCURRENTLY (hot table) and therefore +-- cannot share a multi-statement file with the constraint change above. 
diff --git a/server/migrations/080_agent_task_queue_queued_index.down.sql b/server/migrations/080_agent_task_queue_queued_index.down.sql new file mode 100644 index 000000000..c68d015dc --- /dev/null +++ b/server/migrations/080_agent_task_queue_queued_index.down.sql @@ -0,0 +1 @@ +DROP INDEX CONCURRENTLY IF EXISTS idx_agent_task_queue_queued_created_at; diff --git a/server/migrations/080_agent_task_queue_queued_index.up.sql b/server/migrations/080_agent_task_queue_queued_index.up.sql new file mode 100644 index 000000000..02d2dce0e --- /dev/null +++ b/server/migrations/080_agent_task_queue_queued_index.up.sql @@ -0,0 +1,18 @@ +-- Partial index that backs the queued-task TTL sweeper added in MUL-1899 +-- (sweepExpiredQueuedTasks in cmd/server/runtime_sweeper.go). The sweeper +-- runs every 30s and looks up the oldest queued tasks with: +-- WHERE status = 'queued' AND created_at < now() - interval '...' +-- ORDER BY created_at ASC LIMIT 500 +-- Without a queued-only partial index on created_at this devolves into a +-- full scan once historical terminal rows accumulate (MUL-1899 baseline: +-- ~89k+ rows). The partial index stays tiny because only in-flight rows +-- live in 'queued'. +-- +-- CONCURRENTLY because agent_task_queue is hot — a plain CREATE INDEX would +-- take a SHARE lock and block writes on the dispatch path during build. +-- Matches the pattern in 035/067/074/075/078; 068 documents that the +-- migration runner cannot mix CONCURRENTLY with other statements in the +-- same file, so this lives in its own single-statement migration. 
+CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_agent_task_queue_queued_created_at + ON agent_task_queue (created_at) + WHERE status = 'queued'; diff --git a/server/pkg/db/generated/agent.sql.go b/server/pkg/db/generated/agent.sql.go index 9625e6b8f..c44f30b91 100644 --- a/server/pkg/db/generated/agent.sql.go +++ b/server/pkg/db/generated/agent.sql.go @@ -789,6 +789,100 @@ func (q *Queries) CreateRetryTask(ctx context.Context, id pgtype.UUID) (AgentTas return i, err } +const expireStaleQueuedTasks = `-- name: ExpireStaleQueuedTasks :many +WITH victims AS ( + SELECT id FROM agent_task_queue + WHERE status = 'queued' + AND created_at < now() - make_interval(secs => $1::double precision) + ORDER BY created_at ASC + LIMIT $2::int + FOR UPDATE SKIP LOCKED +) +UPDATE agent_task_queue t +SET status = 'failed', + completed_at = now(), + error = 'task expired in queue', + failure_reason = 'queued_expired' +FROM victims v +WHERE t.id = v.id + AND t.status = 'queued' + AND t.created_at < now() - make_interval(secs => $1::double precision) +RETURNING t.id, t.agent_id, t.issue_id, t.status, t.priority, t.dispatched_at, t.started_at, t.completed_at, t.result, t.error, t.created_at, t.context, t.runtime_id, t.session_id, t.work_dir, t.trigger_comment_id, t.chat_session_id, t.autopilot_run_id, t.attempt, t.max_attempts, t.parent_task_id, t.failure_reason, t.trigger_summary, t.force_fresh_session +` + +type ExpireStaleQueuedTasksParams struct { + TtlSecs float64 `json:"ttl_secs"` + MaxPerTick int32 `json:"max_per_tick"` +} + +// Fails tasks that have been sitting in 'queued' for longer than the TTL. 
+// This is the cleanup arm of the MUL-1899 "queued backlog" fix: even with the +// new dispatch-time admission gate that refuses to enqueue when the runtime +// is offline, we still need to drain the historical 87k+ doomed rows and +// handle edge cases where a runtime goes offline AFTER a task is already +// queued (the admission check protects new enqueues, not in-flight queue +// depth). +// +// Concurrency safety: the daemon's claim path may race with this sweeper to +// transition the same row out of 'queued'. We protect against that two +// ways: +// 1. The CTE selects victims with FOR UPDATE SKIP LOCKED so a row that is +// currently being claimed (or otherwise locked) is skipped — no lock +// contention with the dispatch path, and we won't queue up behind it. +// 2. The outer UPDATE re-checks status='queued' AND the TTL predicate at +// apply time. If a daemon claimed the row between selection and update +// (e.g. lock released after the claim transaction commits), the row is +// already 'dispatched'/'running' and the WHERE clause filters it out +// so we cannot clobber an in-flight task. +// +// Capped via LIMIT inside the CTE so a single sweep tick cannot monopolise +// the DB when the backlog is large — the sweeper drains the rest on +// subsequent ticks. 
+func (q *Queries) ExpireStaleQueuedTasks(ctx context.Context, arg ExpireStaleQueuedTasksParams) ([]AgentTaskQueue, error) { + rows, err := q.db.Query(ctx, expireStaleQueuedTasks, arg.TtlSecs, arg.MaxPerTick) + if err != nil { + return nil, err + } + defer rows.Close() + items := []AgentTaskQueue{} + for rows.Next() { + var i AgentTaskQueue + if err := rows.Scan( + &i.ID, + &i.AgentID, + &i.IssueID, + &i.Status, + &i.Priority, + &i.DispatchedAt, + &i.StartedAt, + &i.CompletedAt, + &i.Result, + &i.Error, + &i.CreatedAt, + &i.Context, + &i.RuntimeID, + &i.SessionID, + &i.WorkDir, + &i.TriggerCommentID, + &i.ChatSessionID, + &i.AutopilotRunID, + &i.Attempt, + &i.MaxAttempts, + &i.ParentTaskID, + &i.FailureReason, + &i.TriggerSummary, + &i.ForceFreshSession, + ); err != nil { + return nil, err + } + items = append(items, i) + } + if err := rows.Err(); err != nil { + return nil, err + } + return items, nil +} + const failAgentTask = `-- name: FailAgentTask :one UPDATE agent_task_queue SET status = 'failed', diff --git a/server/pkg/db/generated/autopilot.sql.go b/server/pkg/db/generated/autopilot.sql.go index 439be8454..8be3aeaae 100644 --- a/server/pkg/db/generated/autopilot.sql.go +++ b/server/pkg/db/generated/autopilot.sql.go @@ -689,7 +689,7 @@ const selectAutopilotsExceedingFailureThreshold = `-- name: SelectAutopilotsExce WITH stats AS ( SELECT autopilot_id, - count(*) FILTER (WHERE status IN ('completed', 'failed', 'skipped')) AS total, + count(*) FILTER (WHERE status IN ('completed', 'failed')) AS total, count(*) FILTER (WHERE status = 'failed') AS failed FROM autopilot_run WHERE created_at >= $3::timestamptz @@ -728,8 +728,13 @@ type SelectAutopilotsExceedingFailureThresholdRow struct { // Failure-rate auto-pause // ===================== // Find active autopilots whose recent run failure rate exceeds the threshold. -// Counts only terminal runs (completed | failed | skipped); pending, -// issue_created and running are excluded so in-flight work isn't penalised. 
+// Counts only "real" terminal runs (completed | failed). 'skipped' is +// excluded from BOTH numerator and denominator: an admission-skipped run +// (e.g. assignee runtime offline at dispatch time, MUL-1899) is neither a +// success nor a failure, so it must not dilute the failure ratio (which +// would let a 100%-failing autopilot mask itself behind a wall of skips) +// nor inflate it. issue_created/running are still excluded so in-flight +// work isn't penalised. // Used by the failure monitor to auto-pause sustained-failure autopilots // (the canonical example from MUL-1336 was an autopilot scheduled every 5 min // that 100% failed for days, burning ~1.5k useless tasks per week). @@ -988,6 +993,45 @@ func (q *Queries) UpdateAutopilotRunRunning(ctx context.Context, arg UpdateAutop return i, err } +const updateAutopilotRunSkipped = `-- name: UpdateAutopilotRunSkipped :one +UPDATE autopilot_run +SET status = 'skipped', completed_at = now(), failure_reason = $2 +WHERE id = $1 +RETURNING id, autopilot_id, trigger_id, source, status, issue_id, task_id, triggered_at, completed_at, failure_reason, trigger_payload, result, created_at +` + +type UpdateAutopilotRunSkippedParams struct { + ID pgtype.UUID `json:"id"` + FailureReason pgtype.Text `json:"failure_reason"` +} + +// Marks an autopilot_run as skipped without enqueueing any task. Used by the +// pre-flight admission check when the assignee agent's runtime is offline: +// creating an issue / task in that state would just pile a doomed job onto +// agent_task_queue (the canonical "持续给离线 local agent 入队" symptom from +// MUL-1899). Recording the skip + reason gives the UI / failure monitor / ops +// a paper trail without polluting the failure ratio. 
+func (q *Queries) UpdateAutopilotRunSkipped(ctx context.Context, arg UpdateAutopilotRunSkippedParams) (AutopilotRun, error) { + row := q.db.QueryRow(ctx, updateAutopilotRunSkipped, arg.ID, arg.FailureReason) + var i AutopilotRun + err := row.Scan( + &i.ID, + &i.AutopilotID, + &i.TriggerID, + &i.Source, + &i.Status, + &i.IssueID, + &i.TaskID, + &i.TriggeredAt, + &i.CompletedAt, + &i.FailureReason, + &i.TriggerPayload, + &i.Result, + &i.CreatedAt, + ) + return i, err +} + const updateAutopilotTrigger = `-- name: UpdateAutopilotTrigger :one UPDATE autopilot_trigger SET enabled = COALESCE($2::boolean, enabled), diff --git a/server/pkg/db/queries/agent.sql b/server/pkg/db/queries/agent.sql index b6ea988ad..32f3b1410 100644 --- a/server/pkg/db/queries/agent.sql +++ b/server/pkg/db/queries/agent.sql @@ -320,6 +320,48 @@ WHERE (status = 'dispatched' AND dispatched_at < now() - make_interval(secs => @ OR (status = 'running' AND started_at < now() - make_interval(secs => @running_timeout_secs::double precision)) RETURNING *; +-- name: ExpireStaleQueuedTasks :many +-- Fails tasks that have been sitting in 'queued' for longer than the TTL. +-- This is the cleanup arm of the MUL-1899 "queued backlog" fix: even with the +-- new dispatch-time admission gate that refuses to enqueue when the runtime +-- is offline, we still need to drain the historical 87k+ doomed rows and +-- handle edge cases where a runtime goes offline AFTER a task is already +-- queued (the admission check protects new enqueues, not in-flight queue +-- depth). +-- +-- Concurrency safety: the daemon's claim path may race with this sweeper to +-- transition the same row out of 'queued'. We protect against that two +-- ways: +-- 1. The CTE selects victims with FOR UPDATE SKIP LOCKED so a row that is +-- currently being claimed (or otherwise locked) is skipped — no lock +-- contention with the dispatch path, and we won't queue up behind it. +-- 2. 
The outer UPDATE re-checks status='queued' AND the TTL predicate at +-- apply time. If a daemon claimed the row between selection and update +-- (e.g. lock released after the claim transaction commits), the row is +-- already 'dispatched'/'running' and the WHERE clause filters it out +-- so we cannot clobber an in-flight task. +-- Capped via LIMIT inside the CTE so a single sweep tick cannot monopolise +-- the DB when the backlog is large — the sweeper drains the rest on +-- subsequent ticks. +WITH victims AS ( + SELECT id FROM agent_task_queue + WHERE status = 'queued' + AND created_at < now() - make_interval(secs => @ttl_secs::double precision) + ORDER BY created_at ASC + LIMIT @max_per_tick::int + FOR UPDATE SKIP LOCKED +) +UPDATE agent_task_queue t +SET status = 'failed', + completed_at = now(), + error = 'task expired in queue', + failure_reason = 'queued_expired' +FROM victims v +WHERE t.id = v.id + AND t.status = 'queued' + AND t.created_at < now() - make_interval(secs => @ttl_secs::double precision) +RETURNING t.*; + -- name: CancelAgentTask :one UPDATE agent_task_queue SET status = 'cancelled', completed_at = now() diff --git a/server/pkg/db/queries/autopilot.sql b/server/pkg/db/queries/autopilot.sql index bd7623a7a..4685d98ed 100644 --- a/server/pkg/db/queries/autopilot.sql +++ b/server/pkg/db/queries/autopilot.sql @@ -134,6 +134,18 @@ SET status = 'failed', completed_at = now(), failure_reason = $2 WHERE id = $1 RETURNING *; +-- name: UpdateAutopilotRunSkipped :one +-- Marks an autopilot_run as skipped without enqueueing any task. Used by the +-- pre-flight admission check when the assignee agent's runtime is offline: +-- creating an issue / task in that state would just pile a doomed job onto +-- agent_task_queue (the canonical "持续给离线 local agent 入队" symptom from +-- MUL-1899). Recording the skip + reason gives the UI / failure monitor / ops +-- a paper trail without polluting the failure ratio. 
+UPDATE autopilot_run +SET status = 'skipped', completed_at = now(), failure_reason = $2 +WHERE id = $1 +RETURNING *; + -- ===================== -- Scheduler Queries -- ===================== @@ -201,14 +213,19 @@ WHERE t.kind = 'schedule' -- name: SelectAutopilotsExceedingFailureThreshold :many -- Find active autopilots whose recent run failure rate exceeds the threshold. --- Counts only terminal runs (completed | failed | skipped); pending, --- issue_created and running are excluded so in-flight work isn't penalised. +-- Counts only "real" terminal runs (completed | failed). 'skipped' is +-- excluded from BOTH numerator and denominator: an admission-skipped run +-- (e.g. assignee runtime offline at dispatch time, MUL-1899) is neither a +-- success nor a failure, so it must not dilute the failure ratio (which +-- would let a 100%-failing autopilot mask itself behind a wall of skips) +-- nor inflate it. issue_created/running are still excluded so in-flight +-- work isn't penalised. -- Used by the failure monitor to auto-pause sustained-failure autopilots -- (the canonical example from MUL-1336 was an autopilot scheduled every 5 min -- that 100% failed for days, burning ~1.5k useless tasks per week). WITH stats AS ( SELECT autopilot_id, - count(*) FILTER (WHERE status IN ('completed', 'failed', 'skipped')) AS total, + count(*) FILTER (WHERE status IN ('completed', 'failed')) AS total, count(*) FILTER (WHERE status = 'failed') AS failed FROM autopilot_run WHERE created_at >= sqlc.arg('since')::timestamptz