feat(autopilot): skip dispatch when assignee runtime is offline (MUL-1899) (#2311)

* feat(autopilot): skip dispatch when assignee runtime is offline (MUL-1899)

Prevents scheduled autopilots from accumulating doomed tasks against
offline / archived / unbound agents. Before this change, a paused laptop
or crashed daemon would let a 5-minute-cron autopilot pile up thousands
of queued agent_task_queue rows that no runtime would ever drain — this
is the dominant source of the 89k stuck-task backlog flagged in MUL-1899.

DispatchAutopilot now performs a pre-flight admission check on the
assignee agent's runtime status. If the runtime is not 'online' (or the
agent is archived / has no runtime bound / has no assignee), the run is
recorded as 'skipped' with a failure_reason and no task is enqueued.
Skipped runs still emit autopilot:run.done so the UI / activity feed
reflect that the trigger fired and was evaluated.
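
The admission decision described above can be sketched as a pure function.
This is a minimal illustration, not the code in this PR: the `Agent` and
`Runtime` types and the `skipReason` name are assumptions, and only the
reason strings mirror the ones used by `shouldSkipDispatch` in the diff.

```go
package main

import "fmt"

// Runtime and Agent are simplified, hypothetical stand-ins for the DB rows.
type Runtime struct {
	Status string // e.g. "online" or "offline"
}

type Agent struct {
	Archived bool
	Runtime  *Runtime // nil when no runtime is bound
}

// skipReason returns a non-empty reason and true when dispatching would
// only enqueue a task that no runtime can pick up.
func skipReason(assignee *Agent) (string, bool) {
	switch {
	case assignee == nil:
		return "autopilot has no assignee", true
	case assignee.Archived:
		return "assignee agent is archived", true
	case assignee.Runtime == nil:
		return "assignee agent has no runtime bound", true
	case assignee.Runtime.Status != "online":
		return "agent runtime is " + assignee.Runtime.Status + " at dispatch time", true
	}
	return "", false
}

func main() {
	laptopClosed := &Agent{Runtime: &Runtime{Status: "offline"}}
	reason, skip := skipReason(laptopClosed)
	fmt.Println(skip, reason)

	healthy := &Agent{Runtime: &Runtime{Status: "online"}}
	reason, skip = skipReason(healthy)
	fmt.Println(skip, reason)
}
```

Keeping the check free of side effects makes it easy to table-test; the
real gate additionally has to decide what to do when loading the agent or
runtime fails, which this sketch leaves out.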

Skipped runs are deliberately NOT counted toward the failure-ratio
auto-pause: a user who closes their laptop overnight should not have
their autopilot paused. Sustained server-side failures keep their
existing pause path via the failure monitor.
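
To make the intent concrete, here is a small sketch of a failure ratio that
ignores skipped runs. It assumes the ratio is failed / (completed + failed);
the `failureRatio` helper and its `ok` return value are hypothetical and are
not taken from the failure monitor's actual query.

```go
package main

import "fmt"

// failureRatio returns failed / (completed + failed) over the given run
// statuses. Skipped and in-flight runs are ignored entirely. ok is false
// when there are no completed or failed runs to judge (an assumption of
// this sketch, to avoid dividing by zero).
func failureRatio(statuses []string) (ratio float64, ok bool) {
	var total, failed int
	for _, s := range statuses {
		switch s {
		case "failed":
			failed++
			total++
		case "completed":
			total++
		}
	}
	if total == 0 {
		return 0, false
	}
	return float64(failed) / float64(total), true
}

func main() {
	// Two real failures followed by a night of admission skips.
	runs := []string{"failed", "failed", "skipped", "skipped", "skipped", "skipped", "skipped", "skipped"}
	ratio, ok := failureRatio(runs)
	fmt.Println(ratio, ok)
}
```

If skipped runs were counted in the denominator, the same history would
score 2/8 = 0.25 instead of 1.0.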

Tests: added an integration test that creates an offline runtime and
asserts DispatchAutopilot records a skipped run with no task enqueued.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
Co-authored-by: multica-agent <github@multica.ai>

* feat(scheduler): expire stale queued tasks via TTL sweeper (MUL-1899)

Companion to the dispatch-time admission gate added in this PR. The
admission gate prevents *new* tasks from being enqueued against an
offline runtime, but it does not drain the historical backlog
(~89k stuck queued rows observed at MUL-1899 baseline) and does not
help when a runtime goes offline *after* a task has already been
queued. This adds a passive TTL sweeper:

- New SQL query `ExpireStaleQueuedTasks` transitions queued tasks
  older than the TTL to status='failed' with
  failure_reason='queued_expired' and a clear error message.
- Sweep is capped per tick (`queuedExpireBatchSize`, default 500) via
  a CTE+LIMIT so that draining a large backlog cannot monopolise the
  DB on a single tick. At 30s ticks that caps the drain rate at 60k
  rows/hour (500 rows × 120 ticks).
- Wired into the existing 30s `runRuntimeSweeper` loop alongside
  `sweepStaleTasks` and reuses `taskSvc.HandleFailedTasks` so the
  expired tasks broadcast `task:failed` events, reconcile agent
  status, and roll back any in-progress issues — same lifecycle as
  any other failed task.
- Default TTL = 2h. Conservatively above any reasonable
  "queued behind a long-running task" window (default agent timeout
  is 2h, sweeper runs every 30s) so legitimate work isn't expired.
- Integration tests cover the happy path (stale → expired, fresh →
  left alone, correct status/reason/error) and the per-tick batch cap.
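
The selection rule in the list above (queued, older than the TTL, oldest
first, capped per tick) can be mimicked in memory. This is a sketch under
assumptions, not the SQL in this PR: `Task`, `expireStale` and `demo` are
hypothetical names, and a slice stands in for `agent_task_queue`.

```go
package main

import (
	"fmt"
	"sort"
	"time"
)

// Task is a hypothetical, trimmed-down queue row.
type Task struct {
	ID        int
	Status    string
	CreatedAt time.Time
}

// expireStale marks at most maxPerTick queued tasks created before
// now-ttl as failed, oldest first, and returns the IDs it expired.
func expireStale(tasks []Task, now time.Time, ttl time.Duration, maxPerTick int) []int {
	cutoff := now.Add(-ttl)
	var victims []int // indexes into tasks
	for i, t := range tasks {
		if t.Status == "queued" && t.CreatedAt.Before(cutoff) {
			victims = append(victims, i)
		}
	}
	sort.Slice(victims, func(a, b int) bool {
		return tasks[victims[a]].CreatedAt.Before(tasks[victims[b]].CreatedAt)
	})
	if len(victims) > maxPerTick {
		victims = victims[:maxPerTick]
	}
	expired := make([]int, 0, len(victims))
	for _, i := range victims {
		tasks[i].Status = "failed"
		expired = append(expired, tasks[i].ID)
	}
	return expired
}

// demo runs two consecutive ticks over a small backlog with a cap of 2.
func demo() (first, second []int) {
	now := time.Date(2026, 5, 9, 12, 0, 0, 0, time.UTC)
	tasks := []Task{
		{ID: 1, Status: "queued", CreatedAt: now.Add(-5 * time.Hour)},
		{ID: 2, Status: "queued", CreatedAt: now.Add(-3 * time.Hour)},
		{ID: 3, Status: "queued", CreatedAt: now.Add(-4 * time.Hour)},
		{ID: 4, Status: "queued", CreatedAt: now}, // fresh, must survive
	}
	first = expireStale(tasks, now, 2*time.Hour, 2)
	second = expireStale(tasks, now, 2*time.Hour, 2)
	return first, second
}

func main() {
	first, second := demo()
	fmt.Println(first)  // [1 3]
	fmt.Println(second) // [2]
}
```

The first tick takes the two oldest stale tasks and the second tick picks up
the remainder, which is how a capped sweep drains a backlog over several
ticks without one large transaction.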

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
Co-authored-by: multica-agent <github@multica.ai>

* fix(autopilot): address review blockers from PR #2311 (MUL-1899)

GPT-Boy review of the offline-runtime + queued-TTL PR flagged four
blockers; this commit addresses them all.

1. Restore the 'skipped' autopilot_run status in the DB constraint.
   Migration 043 had removed 'skipped' along with the now-defunct
   concurrency_policy feature, so the new admission gate's INSERT of
   status='skipped' violated `autopilot_run_status_check` and broke
   `TestAutopilotDispatchSkipsWhenRuntimeOffline` in CI. New
   migration 079 re-adds 'skipped' to the CHECK list. The down
   migration migrates skipped → failed before re-tightening,
   mirroring what 043 did for the original removal.

2. Make `ExpireStaleQueuedTasks` race-safe.
   The CTE-then-UPDATE pattern could clobber a task that the daemon
   claimed between victim selection and the outer update. Two
   guards added:
     - `FOR UPDATE SKIP LOCKED` in the CTE so we never wait on a
       row that's currently being claimed (and never block the
       claim path either).
     - The outer UPDATE now re-checks `t.status = 'queued'` AND the
       TTL predicate so even if a row's lock is released after a
       successful claim, we cannot transition a now-dispatched/
       running task to 'failed'.

3. Add a partial index for the queued-TTL sweeper.
   `idx_agent_task_queue_queued_created_at` on `created_at WHERE
   status = 'queued'` — keeps the 30s sweep query (status=queued
   AND created_at < ... ORDER BY created_at LIMIT 500) cheap even
   when historical terminal rows accumulate (~89k+ at MUL-1899
   baseline). The partial predicate keeps the index tiny because
   only in-flight rows live in 'queued'.

4. Fix the failure-monitor denominator.
   `SelectAutopilotsExceedingFailureThreshold` had been counting
   'skipped' toward total runs, which would have diluted the failure
   ratio: a 100%-failing autopilot could mask itself behind a wall
   of admission skips. With 'skipped' restored as a real status,
   the auto-pause monitor must explicitly exclude it from BOTH
   numerator and denominator — admission skips are neither a
   success nor a failure.
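
The two guards from item 2 can be illustrated with an in-memory analogue.
This is only a rough model: a `sync.Mutex` stands in for a Postgres row
lock, `TryLock` (Go 1.18+) for `FOR UPDATE SKIP LOCKED`, and the status
re-check for the extra predicate on the outer UPDATE. The `row` type and
`tryExpire` function are hypothetical.

```go
package main

import (
	"fmt"
	"sync"
)

// row is a hypothetical stand-in for one agent_task_queue row.
type row struct {
	mu     sync.Mutex
	status string
}

// tryExpire mimics the sweeper for a single row. It skips the row when
// someone else holds the lock, and only fails it if it is still queued
// once the lock is acquired.
func tryExpire(r *row) bool {
	if !r.mu.TryLock() {
		return false // row is being claimed: skip instead of waiting
	}
	defer r.mu.Unlock()
	if r.status != "queued" {
		return false // claimed in the meantime: leave it alone
	}
	r.status = "failed"
	return true
}

func main() {
	r := &row{status: "queued"}

	r.mu.Lock()               // a daemon is in the middle of claiming the row
	fmt.Println(tryExpire(r)) // false: lock held, row skipped

	r.status = "dispatched"
	r.mu.Unlock()             // claim committed
	fmt.Println(tryExpire(r)) // false: no longer queued
	fmt.Println(r.status)     // dispatched

	stale := &row{status: "queued"}
	fmt.Println(tryExpire(stale)) // true: untouched stale row is failed
}
```

Either guard alone leaves a gap: skipping locked rows does not help once the
claim has committed, and re-checking the status alone could still make the
sweeper wait behind a claim in progress.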

Verified: `go test ./cmd/server/... ./internal/service/...` passes
(including TestAutopilotDispatchSkipsWhenRuntimeOffline,
TestExpireStaleQueuedTasks,
TestExpireStaleQueuedTasksRespectsBatchLimit).
`go build ./... && go vet ./...` clean.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
Co-authored-by: multica-agent <github@multica.ai>

* fix(migrations): split queued-task TTL index into concurrent migration

Per PR #2311 review: agent_task_queue is a hot table, so building the
new partial index with plain CREATE INDEX inside migration 079 would
hold a SHARE lock on the queue and block writes (and so dispatch)
during deploy.

The migration runner does not allow CONCURRENTLY to share a file with
other statements (documented in 068), so split the index into its own
single-statement file 080 — matching the existing pattern in 035 /
067 / 074 / 075 / 078. Migration 079 keeps the autopilot_run
constraint change.
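
A rule like this can be checked mechanically before a migration is merged.
The following is a hypothetical lint sketch, not part of the migration
runner: `statements` and `checkConcurrently` are invented names, and the
splitter is deliberately naive (it ignores string literals, dollar quoting
and block comments).

```go
package main

import (
	"fmt"
	"strings"
)

// statements splits SQL on ';' after dropping '--' line comments.
func statements(sql string) []string {
	var kept []string
	for _, line := range strings.Split(sql, "\n") {
		if i := strings.Index(line, "--"); i >= 0 {
			line = line[:i]
		}
		kept = append(kept, line)
	}
	var out []string
	for _, s := range strings.Split(strings.Join(kept, "\n"), ";") {
		if s = strings.TrimSpace(s); s != "" {
			out = append(out, s)
		}
	}
	return out
}

// checkConcurrently returns an error when a file mixes a CONCURRENTLY
// statement with any other statement.
func checkConcurrently(sql string) error {
	stmts := statements(sql)
	for _, s := range stmts {
		if strings.Contains(strings.ToUpper(s), "CONCURRENTLY") && len(stmts) > 1 {
			return fmt.Errorf("CONCURRENTLY must be the only statement in its file, found %d statements", len(stmts))
		}
	}
	return nil
}

func main() {
	alone := "-- built CONCURRENTLY because the table is hot\n" +
		"CREATE INDEX CONCURRENTLY IF NOT EXISTS idx ON t (c) WHERE s = 'queued';"
	mixed := "ALTER TABLE t ADD COLUMN c int;\nCREATE INDEX CONCURRENTLY idx ON t (c);"
	fmt.Println(checkConcurrently(alone))
	fmt.Println(checkConcurrently(mixed))
}
```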

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
Co-authored-by: multica-agent <github@multica.ai>

---------

Co-authored-by: Eve <eve@multica-ai.local>
Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
Co-authored-by: multica-agent <github@multica.ai>
Multica Eve
2026-05-09 15:07:57 +08:00
committed by GitHub
parent 6d9ebb0fdd
commit a2dd80d4f6
12 changed files with 665 additions and 6 deletions

@@ -120,3 +120,93 @@ func TestAutopilotRunOnlyTaskTerminalEventsUpdateRun(t *testing.T) {
})
}
}
// TestAutopilotDispatchSkipsWhenRuntimeOffline locks in the MUL-1899
// admission gate: when the assignee agent's runtime is not online we must
// record a `skipped` autopilot_run with a failure_reason and NOT enqueue an
// agent_task_queue row. This is the fix for "active schedules keep enqueueing
// tasks for offline local agents".
func TestAutopilotDispatchSkipsWhenRuntimeOffline(t *testing.T) {
ctx := context.Background()
queries := db.New(testPool)
bus := events.New()
taskSvc := service.NewTaskService(queries, testPool, nil, bus)
autopilotSvc := service.NewAutopilotService(queries, testPool, bus, taskSvc)
// Spin up a dedicated runtime + agent so we can flip the runtime to
// offline without affecting the shared fixture used by other tests.
var runtimeID, agentID string
if err := testPool.QueryRow(ctx, `
INSERT INTO agent_runtime (
workspace_id, daemon_id, name, runtime_mode, provider, status, device_info, metadata, last_seen_at
)
VALUES ($1, NULL, 'Offline runtime', 'local', 'mul1899_offline_runtime', 'offline', '{}'::jsonb, '{}'::jsonb, now())
RETURNING id::text
`, parseUUID(testWorkspaceID)).Scan(&runtimeID); err != nil {
t.Fatalf("create offline runtime: %v", err)
}
t.Cleanup(func() {
_, _ = testPool.Exec(context.Background(), `DELETE FROM agent_runtime WHERE id = $1`, runtimeID)
})
if err := testPool.QueryRow(ctx, `
INSERT INTO agent (
workspace_id, name, description, runtime_mode, runtime_config,
runtime_id, visibility, max_concurrent_tasks, owner_id
)
VALUES ($1, 'mul1899-offline-agent', '', 'local', '{}'::jsonb, $2, 'workspace', 1, $3)
RETURNING id::text
`, parseUUID(testWorkspaceID), runtimeID, parseUUID(testUserID)).Scan(&agentID); err != nil {
t.Fatalf("create offline agent: %v", err)
}
t.Cleanup(func() {
_, _ = testPool.Exec(context.Background(), `DELETE FROM agent WHERE id = $1`, agentID)
})
ap, err := queries.CreateAutopilot(ctx, db.CreateAutopilotParams{
WorkspaceID: parseUUID(testWorkspaceID),
Title: "Offline-runtime autopilot",
Description: pgtype.Text{String: "MUL-1899 admission test", Valid: true},
AssigneeID: parseUUID(agentID),
Status: "active",
ExecutionMode: "run_only",
IssueTitleTemplate: pgtype.Text{},
CreatedByType: "member",
CreatedByID: parseUUID(testUserID),
})
if err != nil {
t.Fatalf("CreateAutopilot: %v", err)
}
t.Cleanup(func() {
_, _ = testPool.Exec(context.Background(), `DELETE FROM autopilot WHERE id = $1`, ap.ID)
})
run, err := autopilotSvc.DispatchAutopilot(ctx, ap, pgtype.UUID{}, "schedule", nil)
if err != nil {
t.Fatalf("DispatchAutopilot: %v", err)
}
if run == nil {
t.Fatal("expected a run, got nil")
}
if run.Status != "skipped" {
t.Fatalf("expected run status 'skipped', got %q", run.Status)
}
if !run.FailureReason.Valid || !strings.Contains(run.FailureReason.String, "offline") {
t.Fatalf("expected failure reason mentioning 'offline', got %+v", run.FailureReason)
}
if run.TaskID.Valid {
t.Fatalf("expected no task to be enqueued, got task_id %v", run.TaskID)
}
// Defensive: confirm at the DB layer that nothing landed on the queue.
var taskCount int
if err := testPool.QueryRow(ctx,
`SELECT count(*) FROM agent_task_queue WHERE agent_id = $1`,
agentID,
).Scan(&taskCount); err != nil {
t.Fatalf("count tasks: %v", err)
}
if taskCount != 0 {
t.Fatalf("expected 0 queued tasks for offline-runtime agent, got %d", taskCount)
}
}

@@ -38,6 +38,24 @@ const (
// runningTimeoutSeconds fails tasks stuck in 'running' beyond this.
// The default agent timeout is 2h, so 2.5h gives a generous buffer.
runningTimeoutSeconds = 9000.0
// queuedTTLSeconds expires tasks that have been sitting in 'queued'
// for longer than this without ever being claimed. This is the cleanup
// arm of the MUL-1899 backlog fix: even with the dispatch-time
// admission gate that blocks new enqueues against offline runtimes,
// tasks already on the queue when a runtime drops off (or that lost
// the race against a runtime that went offline mid-tick) need a
// time-bounded exit. 2 hours is conservatively above any reasonable
// "queued behind a long-running task" window for an online runtime
// (default agent timeout is 2h, sweeper interval is 30s) so we don't
// expire legitimately-pending work, while still draining the historical
// 87k autopilot backlog within ~24h once enabled.
queuedTTLSeconds = 2 * 3600.0
// queuedExpireBatchSize caps how many queued rows a single sweeper tick
// transitions to failed. Keeps the sweep transaction short even when
// the historical backlog is large (~89k at MUL-1899 baseline). At 30s
// ticks and 500 rows/tick we drain 60k rows/hour worst case — plenty
// of headroom for the documented backlog without monopolising DB CPU.
queuedExpireBatchSize = 500
)
// runRuntimeSweeper periodically marks runtimes as offline if their
@@ -62,6 +80,7 @@ func runRuntimeSweeper(ctx context.Context, queries *db.Queries, liveness handle
case <-ticker.C:
sweepStaleRuntimes(ctx, queries, liveness, taskSvc, bus)
sweepStaleTasks(ctx, queries, taskSvc, bus)
sweepExpiredQueuedTasks(ctx, queries, taskSvc)
gcRuntimes(ctx, queries, bus)
}
}
@@ -237,6 +256,29 @@ func sweepStaleTasks(ctx context.Context, queries *db.Queries, taskSvc *service.
taskSvc.HandleFailedTasks(ctx, failedTasks)
}
// sweepExpiredQueuedTasks fails tasks that have been sitting in 'queued' for
// longer than the TTL. Companion to the dispatch-time admission gate added
// in MUL-1899: that gate prevents new doomed enqueues; this gate drains the
// historical backlog and catches the race where a runtime goes offline AFTER
// a task is already queued. Capped to queuedExpireBatchSize per tick so a
// big backlog can't monopolise the DB.
func sweepExpiredQueuedTasks(ctx context.Context, queries *db.Queries, taskSvc *service.TaskService) {
failedTasks, err := queries.ExpireStaleQueuedTasks(ctx, db.ExpireStaleQueuedTasksParams{
TtlSecs: queuedTTLSeconds,
MaxPerTick: queuedExpireBatchSize,
})
if err != nil {
slog.Warn("task sweeper: failed to expire stale queued tasks", "error", err)
return
}
if len(failedTasks) == 0 {
return
}
slog.Info("task sweeper: expired stale queued tasks", "count", len(failedTasks))
taskSvc.HandleFailedTasks(ctx, failedTasks)
}
// broadcastFailedTasks is preserved as a thin shim for the integration tests
// in this package. New call sites should use TaskService.HandleFailedTasks
// directly so the side effects (event broadcast, agent reconcile, issue

@@ -505,6 +505,190 @@ func TestSweepDoesNotResetIssueAlreadyInReview(t *testing.T) {
}
}
// TestExpireStaleQueuedTasks verifies the MUL-1899 queued-TTL sweeper:
// tasks that have been sitting in 'queued' beyond the TTL are transitioned
// to 'failed' with failure_reason='queued_expired', while fresh queued tasks
// are left alone and the per-tick batch limit is respected.
func TestExpireStaleQueuedTasks(t *testing.T) {
if testPool == nil {
t.Skip("no database connection")
}
ctx := context.Background()
// Find the integration test agent
var agentID, runtimeID string
if err := testPool.QueryRow(ctx, `
SELECT a.id, a.runtime_id FROM agent a
JOIN member m ON m.workspace_id = a.workspace_id
JOIN "user" u ON u.id = m.user_id
WHERE u.email = $1
LIMIT 1
`, integrationTestEmail).Scan(&agentID, &runtimeID); err != nil {
t.Fatalf("failed to find test agent: %v", err)
}
// One ancient queued task (should expire) and one fresh queued task (should not).
// Constraint: idx_one_pending_task_per_issue_agent → use distinct issues.
mkIssue := func(label string) string {
var issueID string
if err := testPool.QueryRow(ctx, `
WITH bumped AS (
UPDATE workspace SET issue_counter = issue_counter + 1
WHERE id = $1 RETURNING issue_counter
)
INSERT INTO issue (workspace_id, title, status, priority, creator_type, creator_id, assignee_type, assignee_id, number)
SELECT $1, $3, 'todo', 'none', 'member', m.user_id, 'agent', $2, (SELECT issue_counter FROM bumped)
FROM member m WHERE m.workspace_id = $1 LIMIT 1
RETURNING id
`, testWorkspaceID, agentID, label).Scan(&issueID); err != nil {
t.Fatalf("failed to create %s issue: %v", label, err)
}
return issueID
}
oldIssueID := mkIssue("Queued TTL test (old)")
freshIssueID := mkIssue("Queued TTL test (fresh)")
t.Cleanup(func() {
testPool.Exec(ctx, `DELETE FROM agent_task_queue WHERE issue_id IN ($1, $2)`, oldIssueID, freshIssueID)
testPool.Exec(ctx, `DELETE FROM issue WHERE id IN ($1, $2)`, oldIssueID, freshIssueID)
})
var oldTaskID, freshTaskID string
if err := testPool.QueryRow(ctx, `
INSERT INTO agent_task_queue (agent_id, runtime_id, issue_id, status, priority, created_at)
VALUES ($1, $2, $3, 'queued', 0, now() - interval '5 hours')
RETURNING id
`, agentID, runtimeID, oldIssueID).Scan(&oldTaskID); err != nil {
t.Fatalf("failed to insert old queued task: %v", err)
}
if err := testPool.QueryRow(ctx, `
INSERT INTO agent_task_queue (agent_id, runtime_id, issue_id, status, priority, created_at)
VALUES ($1, $2, $3, 'queued', 0, now())
RETURNING id
`, agentID, runtimeID, freshIssueID).Scan(&freshTaskID); err != nil {
t.Fatalf("failed to insert fresh queued task: %v", err)
}
queries := db.New(testPool)
failed, err := queries.ExpireStaleQueuedTasks(ctx, db.ExpireStaleQueuedTasksParams{
TtlSecs: 3600.0, // 1h TTL — old task is 5h, fresh task is 0s
MaxPerTick: 100,
})
if err != nil {
t.Fatalf("ExpireStaleQueuedTasks failed: %v", err)
}
if len(failed) != 1 {
t.Fatalf("expected exactly 1 expired task, got %d", len(failed))
}
if failed[0].ID.Bytes != parseUUIDBytes(oldTaskID) {
t.Fatalf("expired the wrong task: got %x", failed[0].ID.Bytes)
}
// DB assertions: old → failed/queued_expired, fresh → still queued.
var oldStatus, oldReason, oldErr string
if err := testPool.QueryRow(ctx, `
SELECT status, COALESCE(failure_reason, ''), COALESCE(error, '')
FROM agent_task_queue WHERE id = $1
`, oldTaskID).Scan(&oldStatus, &oldReason, &oldErr); err != nil {
t.Fatalf("failed to read old task: %v", err)
}
if oldStatus != "failed" {
t.Fatalf("old task: expected status=failed, got %q", oldStatus)
}
if oldReason != "queued_expired" {
t.Fatalf("old task: expected failure_reason=queued_expired, got %q", oldReason)
}
if !strings.Contains(oldErr, "expired in queue") {
t.Fatalf("old task: expected error to mention expiry, got %q", oldErr)
}
var freshStatus string
if err := testPool.QueryRow(ctx, `
SELECT status FROM agent_task_queue WHERE id = $1
`, freshTaskID).Scan(&freshStatus); err != nil {
t.Fatalf("failed to read fresh task: %v", err)
}
if freshStatus != "queued" {
t.Fatalf("fresh task: expected status=queued, got %q", freshStatus)
}
}
// TestExpireStaleQueuedTasksRespectsBatchLimit verifies the per-tick cap so
// that a large historical backlog cannot monopolise a single sweep.
func TestExpireStaleQueuedTasksRespectsBatchLimit(t *testing.T) {
if testPool == nil {
t.Skip("no database connection")
}
ctx := context.Background()
var agentID, runtimeID string
if err := testPool.QueryRow(ctx, `
SELECT a.id, a.runtime_id FROM agent a
JOIN member m ON m.workspace_id = a.workspace_id
JOIN "user" u ON u.id = m.user_id
WHERE u.email = $1
LIMIT 1
`, integrationTestEmail).Scan(&agentID, &runtimeID); err != nil {
t.Fatalf("failed to find test agent: %v", err)
}
// Create 5 issues, each with one stale queued task — necessary because of the
// idx_one_pending_task_per_issue_agent unique constraint.
var issueIDs []string
t.Cleanup(func() {
for _, id := range issueIDs {
testPool.Exec(ctx, `DELETE FROM agent_task_queue WHERE issue_id = $1`, id)
testPool.Exec(ctx, `DELETE FROM issue WHERE id = $1`, id)
}
})
for i := 0; i < 5; i++ {
var issueID string
if err := testPool.QueryRow(ctx, `
WITH bumped AS (
UPDATE workspace SET issue_counter = issue_counter + 1
WHERE id = $1 RETURNING issue_counter
)
INSERT INTO issue (workspace_id, title, status, priority, creator_type, creator_id, assignee_type, assignee_id, number)
SELECT $1, 'Queued TTL batch test', 'todo', 'none', 'member', m.user_id, 'agent', $2, (SELECT issue_counter FROM bumped)
FROM member m WHERE m.workspace_id = $1 LIMIT 1
RETURNING id
`, testWorkspaceID, agentID).Scan(&issueID); err != nil {
t.Fatalf("failed to create issue %d: %v", i, err)
}
issueIDs = append(issueIDs, issueID)
if _, err := testPool.Exec(ctx, `
INSERT INTO agent_task_queue (agent_id, runtime_id, issue_id, status, priority, created_at)
VALUES ($1, $2, $3, 'queued', 0, now() - interval '5 hours')
`, agentID, runtimeID, issueID); err != nil {
t.Fatalf("failed to insert backlog task %d: %v", i, err)
}
}
queries := db.New(testPool)
failed, err := queries.ExpireStaleQueuedTasks(ctx, db.ExpireStaleQueuedTasksParams{
TtlSecs: 3600.0,
MaxPerTick: 2, // cap below the backlog
})
if err != nil {
t.Fatalf("ExpireStaleQueuedTasks failed: %v", err)
}
if len(failed) != 2 {
t.Fatalf("expected batch cap of 2, got %d", len(failed))
}
var remaining int
if err := testPool.QueryRow(ctx, `
SELECT COUNT(*) FROM agent_task_queue
WHERE issue_id = ANY($1::uuid[]) AND status = 'queued'
`, issueIDs).Scan(&remaining); err != nil {
t.Fatalf("failed to count remaining queued: %v", err)
}
if remaining != 3 {
t.Fatalf("expected 3 queued tasks remaining after batched sweep, got %d", remaining)
}
}
// parseUUIDBytes converts a UUID string to the 16-byte array used by pgtype.UUID.
func parseUUIDBytes(s string) [16]byte {
s = strings.ReplaceAll(s, "-", "")

@@ -35,6 +35,12 @@ func NewAutopilotService(q *db.Queries, tx TxStarter, bus *events.Bus, taskSvc *
// DispatchAutopilot is the core execution entry point.
// It creates a run and either creates an issue or enqueues a direct agent task
// depending on execution_mode.
//
// Before any work is queued we run an admission check against the assignee
// agent's runtime: if it is not online, we record a `skipped` run with a
// failure_reason and return without enqueueing. This is the "trigger-time admission" gate
// from MUL-1899 — without it a paused laptop / offline daemon causes scheduled
// autopilots to pile thousands of doomed tasks onto agent_task_queue.
func (s *AutopilotService) DispatchAutopilot(
ctx context.Context,
autopilot db.Autopilot,
@@ -42,6 +48,10 @@ func (s *AutopilotService) DispatchAutopilot(
source string,
payload []byte,
) (*db.AutopilotRun, error) {
if reason, skip := s.shouldSkipDispatch(ctx, autopilot); skip {
return s.recordSkippedRun(ctx, autopilot, triggerID, source, payload, reason)
}
// Determine initial status based on execution mode.
initialStatus := "issue_created"
if autopilot.ExecutionMode == "run_only" {
@@ -335,6 +345,96 @@ func (s *AutopilotService) failRun(ctx context.Context, runID pgtype.UUID, reaso
}
}
// shouldSkipDispatch is the pre-flight admission check from MUL-1899.
// Returns (reason, true) when dispatching now would only enqueue a doomed
// task — i.e. the assignee agent is gone, archived, has no runtime bound, or
// its runtime is not currently online. Returns ("", false) on the happy path.
//
// Errors loading the agent / runtime are logged but treated as "do not skip"
// so a transient DB hiccup never silently swallows a scheduled run.
func (s *AutopilotService) shouldSkipDispatch(ctx context.Context, ap db.Autopilot) (string, bool) {
if !ap.AssigneeID.Valid {
return "autopilot has no assignee", true
}
agent, err := s.Queries.GetAgent(ctx, ap.AssigneeID)
if err != nil {
slog.Warn("autopilot admission: failed to load assignee agent",
"autopilot_id", util.UUIDToString(ap.ID),
"agent_id", util.UUIDToString(ap.AssigneeID),
"error", err,
)
return "", false
}
if agent.ArchivedAt.Valid {
return "assignee agent is archived", true
}
if !agent.RuntimeID.Valid {
return "assignee agent has no runtime bound", true
}
rt, err := s.Queries.GetAgentRuntime(ctx, agent.RuntimeID)
if err != nil {
slog.Warn("autopilot admission: failed to load runtime",
"autopilot_id", util.UUIDToString(ap.ID),
"runtime_id", util.UUIDToString(agent.RuntimeID),
"error", err,
)
return "", false
}
if rt.Status != "online" {
return "agent runtime is " + rt.Status + " at dispatch time", true
}
return "", false
}
// recordSkippedRun persists a `skipped` autopilot_run with the given reason
// and emits the same WS / analytics signals that a normal terminal transition
// would. Returns the run + nil error so callers (scheduler tick, manual
// trigger handler) treat this as a successful — but no-op — dispatch.
func (s *AutopilotService) recordSkippedRun(
ctx context.Context,
autopilot db.Autopilot,
triggerID pgtype.UUID,
source string,
payload []byte,
reason string,
) (*db.AutopilotRun, error) {
run, err := s.Queries.CreateAutopilotRun(ctx, db.CreateAutopilotRunParams{
AutopilotID: autopilot.ID,
TriggerID: triggerID,
Source: source,
Status: "skipped",
TriggerPayload: payload,
})
if err != nil {
return nil, fmt.Errorf("create skipped run: %w", err)
}
updated, err := s.Queries.UpdateAutopilotRunSkipped(ctx, db.UpdateAutopilotRunSkippedParams{
ID: run.ID,
FailureReason: pgtype.Text{String: reason, Valid: true},
})
if err == nil {
run = updated
} else {
slog.Warn("failed to set skip reason on autopilot run",
"run_id", util.UUIDToString(run.ID), "error", err)
}
slog.Info("autopilot dispatch skipped",
"autopilot_id", util.UUIDToString(autopilot.ID),
"run_id", util.UUIDToString(run.ID),
"source", source,
"reason", reason,
)
// Bump last_run_at so scheduler advancement and "last seen" UI both
// reflect that we did evaluate the trigger this tick.
s.Queries.UpdateAutopilotLastRunAt(ctx, autopilot.ID)
s.publishRunDone(util.UUIDToString(autopilot.WorkspaceID), run, "skipped")
return &run, nil
}
func (s *AutopilotService) publishRunDone(workspaceID string, run db.AutopilotRun, status string) {
s.Bus.Publish(events.Event{
Type: protocol.EventAutopilotRunDone,

@@ -0,0 +1,11 @@
-- Migrate any 'skipped' rows to 'failed' before tightening the constraint
-- (mirrors what 043 did for the original removal).
UPDATE autopilot_run
SET status = 'failed',
completed_at = COALESCE(completed_at, now()),
failure_reason = COALESCE(failure_reason, 'migrated from skipped status')
WHERE status = 'skipped';
ALTER TABLE autopilot_run DROP CONSTRAINT IF EXISTS autopilot_run_status_check;
ALTER TABLE autopilot_run ADD CONSTRAINT autopilot_run_status_check
CHECK (status IN ('issue_created', 'running', 'completed', 'failed'));

@@ -0,0 +1,16 @@
-- MUL-1899: re-introduce the 'skipped' terminal status for autopilot_run.
-- Migration 043 removed 'skipped' along with the broken concurrency_policy
-- feature, but the offline-runtime admission gate added in this PR needs a
-- non-failure terminal status to record dispatches that were intentionally
-- declined (e.g. assignee runtime is offline). Reusing 'failed' would
-- pollute the failure-rate signal that drives the auto-pause monitor.
ALTER TABLE autopilot_run DROP CONSTRAINT IF EXISTS autopilot_run_status_check;
ALTER TABLE autopilot_run ADD CONSTRAINT autopilot_run_status_check
CHECK (status IN ('issue_created', 'running', 'completed', 'failed', 'skipped'));
-- Partial index on status for in-flight runs is unchanged: 'skipped' is
-- terminal so the existing index (issue_created/running) still matches.
--
-- The companion partial index for the queued-task TTL sweeper lives in
-- migration 080 — it must be created CONCURRENTLY (hot table) and therefore
-- cannot share a multi-statement file with the constraint change above.

@@ -0,0 +1 @@
DROP INDEX CONCURRENTLY IF EXISTS idx_agent_task_queue_queued_created_at;

@@ -0,0 +1,18 @@
-- Partial index that backs the queued-task TTL sweeper added in MUL-1899
-- (sweepExpiredQueuedTasks in cmd/server/runtime_sweeper.go). The sweeper
-- runs every 30s and looks up the oldest queued tasks with:
-- WHERE status = 'queued' AND created_at < now() - interval '...'
-- ORDER BY created_at ASC LIMIT 500
-- Without a queued-only partial index on created_at this devolves into a
-- full scan once historical terminal rows accumulate (MUL-1899 baseline:
-- ~89k+ rows). The partial index stays tiny because only in-flight rows
-- live in 'queued'.
--
-- CONCURRENTLY because agent_task_queue is hot: a plain CREATE INDEX would
-- take a SHARE lock that blocks writes, stalling the dispatch path during build.
-- Matches the pattern in 035/067/074/075/078; 068 documents that the
-- migration runner cannot mix CONCURRENTLY with other statements in the
-- same file, so this lives in its own single-statement migration.
CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_agent_task_queue_queued_created_at
ON agent_task_queue (created_at)
WHERE status = 'queued';

@@ -789,6 +789,100 @@ func (q *Queries) CreateRetryTask(ctx context.Context, id pgtype.UUID) (AgentTas
return i, err
}
const expireStaleQueuedTasks = `-- name: ExpireStaleQueuedTasks :many
WITH victims AS (
SELECT id FROM agent_task_queue
WHERE status = 'queued'
AND created_at < now() - make_interval(secs => $1::double precision)
ORDER BY created_at ASC
LIMIT $2::int
FOR UPDATE SKIP LOCKED
)
UPDATE agent_task_queue t
SET status = 'failed',
completed_at = now(),
error = 'task expired in queue',
failure_reason = 'queued_expired'
FROM victims v
WHERE t.id = v.id
AND t.status = 'queued'
AND t.created_at < now() - make_interval(secs => $1::double precision)
RETURNING t.id, t.agent_id, t.issue_id, t.status, t.priority, t.dispatched_at, t.started_at, t.completed_at, t.result, t.error, t.created_at, t.context, t.runtime_id, t.session_id, t.work_dir, t.trigger_comment_id, t.chat_session_id, t.autopilot_run_id, t.attempt, t.max_attempts, t.parent_task_id, t.failure_reason, t.trigger_summary, t.force_fresh_session
`
type ExpireStaleQueuedTasksParams struct {
TtlSecs float64 `json:"ttl_secs"`
MaxPerTick int32 `json:"max_per_tick"`
}
// Fails tasks that have been sitting in 'queued' for longer than the TTL.
// This is the cleanup arm of the MUL-1899 "queued backlog" fix: even with the
// new dispatch-time admission gate that refuses to enqueue when the runtime
// is offline, we still need to drain the historical 87k+ doomed rows and
// handle edge cases where a runtime goes offline AFTER a task is already
// queued (the admission check protects new enqueues, not in-flight queue
// depth).
//
// Concurrency safety: the daemon's claim path may race with this sweeper to
// transition the same row out of 'queued'. We protect against that two
// ways:
// 1. The CTE selects victims with FOR UPDATE SKIP LOCKED so a row that is
// currently being claimed (or otherwise locked) is skipped — no lock
// contention with the dispatch path, and we won't queue up behind it.
// 2. The outer UPDATE re-checks status='queued' AND the TTL predicate at
// apply time. If a daemon claimed the row between selection and update
// (e.g. lock released after the claim transaction commits), the row is
// already 'dispatched'/'running' and the WHERE clause filters it out
// so we cannot clobber an in-flight task.
//
// Capped via LIMIT inside the CTE so a single sweep tick cannot monopolise
// the DB when the backlog is large — the sweeper drains the rest on
// subsequent ticks.
func (q *Queries) ExpireStaleQueuedTasks(ctx context.Context, arg ExpireStaleQueuedTasksParams) ([]AgentTaskQueue, error) {
rows, err := q.db.Query(ctx, expireStaleQueuedTasks, arg.TtlSecs, arg.MaxPerTick)
if err != nil {
return nil, err
}
defer rows.Close()
items := []AgentTaskQueue{}
for rows.Next() {
var i AgentTaskQueue
if err := rows.Scan(
&i.ID,
&i.AgentID,
&i.IssueID,
&i.Status,
&i.Priority,
&i.DispatchedAt,
&i.StartedAt,
&i.CompletedAt,
&i.Result,
&i.Error,
&i.CreatedAt,
&i.Context,
&i.RuntimeID,
&i.SessionID,
&i.WorkDir,
&i.TriggerCommentID,
&i.ChatSessionID,
&i.AutopilotRunID,
&i.Attempt,
&i.MaxAttempts,
&i.ParentTaskID,
&i.FailureReason,
&i.TriggerSummary,
&i.ForceFreshSession,
); err != nil {
return nil, err
}
items = append(items, i)
}
if err := rows.Err(); err != nil {
return nil, err
}
return items, nil
}
const failAgentTask = `-- name: FailAgentTask :one
UPDATE agent_task_queue
SET status = 'failed',

@@ -689,7 +689,7 @@ const selectAutopilotsExceedingFailureThreshold = `-- name: SelectAutopilotsExce
WITH stats AS (
SELECT autopilot_id,
-count(*) FILTER (WHERE status IN ('completed', 'failed', 'skipped')) AS total,
+count(*) FILTER (WHERE status IN ('completed', 'failed')) AS total,
count(*) FILTER (WHERE status = 'failed') AS failed
FROM autopilot_run
WHERE created_at >= $3::timestamptz
@@ -728,8 +728,13 @@ type SelectAutopilotsExceedingFailureThresholdRow struct {
// Failure-rate auto-pause
// =====================
// Find active autopilots whose recent run failure rate exceeds the threshold.
-// Counts only terminal runs (completed | failed | skipped); pending,
-// issue_created and running are excluded so in-flight work isn't penalised.
+// Counts only "real" terminal runs (completed | failed). 'skipped' is
+// excluded from BOTH numerator and denominator: an admission-skipped run
+// (e.g. assignee runtime offline at dispatch time, MUL-1899) is neither a
+// success nor a failure, so it must not dilute the failure ratio (which
+// would let a 100%-failing autopilot mask itself behind a wall of skips)
+// nor inflate it. issue_created/running are still excluded so in-flight
+// work isn't penalised.
// Used by the failure monitor to auto-pause sustained-failure autopilots
// (the canonical example from MUL-1336 was an autopilot scheduled every 5 min
// that 100% failed for days, burning ~1.5k useless tasks per week).
@@ -988,6 +993,45 @@ func (q *Queries) UpdateAutopilotRunRunning(ctx context.Context, arg UpdateAutop
return i, err
}
const updateAutopilotRunSkipped = `-- name: UpdateAutopilotRunSkipped :one
UPDATE autopilot_run
SET status = 'skipped', completed_at = now(), failure_reason = $2
WHERE id = $1
RETURNING id, autopilot_id, trigger_id, source, status, issue_id, task_id, triggered_at, completed_at, failure_reason, trigger_payload, result, created_at
`
type UpdateAutopilotRunSkippedParams struct {
ID pgtype.UUID `json:"id"`
FailureReason pgtype.Text `json:"failure_reason"`
}
// Marks an autopilot_run as skipped without enqueueing any task. Used by the
// pre-flight admission check when the assignee agent's runtime is offline:
// creating an issue / task in that state would just pile a doomed job onto
// agent_task_queue (the canonical "keeps enqueueing to an offline local agent" symptom from
// MUL-1899). Recording the skip + reason gives the UI / failure monitor / ops
// a paper trail without polluting the failure ratio.
func (q *Queries) UpdateAutopilotRunSkipped(ctx context.Context, arg UpdateAutopilotRunSkippedParams) (AutopilotRun, error) {
row := q.db.QueryRow(ctx, updateAutopilotRunSkipped, arg.ID, arg.FailureReason)
var i AutopilotRun
err := row.Scan(
&i.ID,
&i.AutopilotID,
&i.TriggerID,
&i.Source,
&i.Status,
&i.IssueID,
&i.TaskID,
&i.TriggeredAt,
&i.CompletedAt,
&i.FailureReason,
&i.TriggerPayload,
&i.Result,
&i.CreatedAt,
)
return i, err
}
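For orientation, the pre-flight admission decision that feeds `FailureReason` might look roughly like the sketch below. This is an assumption-laden illustration rather than the PR's actual `DispatchAutopilot` code: the `Agent` struct, `admissionSkipReason`, and the reason strings are all hypothetical. Only the four skip conditions (no assignee, archived agent, no runtime bound, runtime not 'online') come from the commit message.

```go
package main

// Agent is a hypothetical, trimmed-down view of the assignee agent.
type Agent struct {
	Archived      bool
	RuntimeID     string // empty means no runtime is bound
	RuntimeStatus string // e.g. "online" or "offline"
}

// admissionSkipReason returns a non-empty reason when the run should be
// recorded as skipped, and "" when dispatch may proceed. The reason
// strings are illustrative placeholders, not the values used in the PR.
func admissionSkipReason(a *Agent) string {
	switch {
	case a == nil:
		return "no_assignee"
	case a.Archived:
		return "agent_archived"
	case a.RuntimeID == "":
		return "no_runtime_bound"
	case a.RuntimeStatus != "online":
		return "runtime_offline"
	default:
		return ""
	}
}
```

A caller would presumably pass a non-empty reason into `UpdateAutopilotRunSkippedParams.FailureReason` and return without enqueueing a task. How the real dispatch path wires this together is not shown in this diff.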
const updateAutopilotTrigger = `-- name: UpdateAutopilotTrigger :one
UPDATE autopilot_trigger SET
enabled = COALESCE($2::boolean, enabled),


@@ -320,6 +320,48 @@ WHERE (status = 'dispatched' AND dispatched_at < now() - make_interval(secs => @
OR (status = 'running' AND started_at < now() - make_interval(secs => @running_timeout_secs::double precision))
RETURNING *;
-- name: ExpireStaleQueuedTasks :many
-- Fails tasks that have been sitting in 'queued' for longer than the TTL.
-- This is the cleanup arm of the MUL-1899 "queued backlog" fix: even with the
-- new dispatch-time admission gate that refuses to enqueue when the runtime
-- is offline, we still need to drain the historical 87k+ doomed rows and
-- handle edge cases where a runtime goes offline AFTER a task is already
-- queued (the admission check protects new enqueues, not in-flight queue
-- depth).
--
-- Concurrency safety: the daemon's claim path may race with this sweeper to
-- transition the same row out of 'queued'. We protect against that in two
-- ways:
-- 1. The CTE selects victims with FOR UPDATE SKIP LOCKED so a row that is
-- currently being claimed (or otherwise locked) is skipped — no lock
-- contention with the dispatch path, and we won't queue up behind it.
-- 2. The outer UPDATE re-checks status='queued' AND the TTL predicate at
-- apply time. If a daemon claimed the row between selection and update
-- (e.g. lock released after the claim transaction commits), the row is
-- already 'dispatched'/'running' and the WHERE clause filters it out
-- so we cannot clobber an in-flight task.
-- Capped via LIMIT inside the CTE so a single sweep tick cannot monopolise
-- the DB when the backlog is large — the sweeper drains the rest on
-- subsequent ticks.
WITH victims AS (
SELECT id FROM agent_task_queue
WHERE status = 'queued'
AND created_at < now() - make_interval(secs => @ttl_secs::double precision)
ORDER BY created_at ASC
LIMIT @max_per_tick::int
FOR UPDATE SKIP LOCKED
)
UPDATE agent_task_queue t
SET status = 'failed',
completed_at = now(),
error = 'task expired in queue',
failure_reason = 'queued_expired'
FROM victims v
WHERE t.id = v.id
AND t.status = 'queued'
AND t.created_at < now() - make_interval(secs => @ttl_secs::double precision)
RETURNING t.*;
-- name: CancelAgentTask :one
UPDATE agent_task_queue
SET status = 'cancelled', completed_at = now()


@@ -134,6 +134,18 @@ SET status = 'failed', completed_at = now(), failure_reason = $2
WHERE id = $1
RETURNING *;
-- name: UpdateAutopilotRunSkipped :one
-- Marks an autopilot_run as skipped without enqueueing any task. Used by the
-- pre-flight admission check when the assignee agent's runtime is offline:
-- creating an issue / task in that state would just pile a doomed job onto
-- agent_task_queue (the canonical "keeps enqueueing to an offline local agent" symptom from
-- MUL-1899). Recording the skip + reason gives the UI / failure monitor / ops
-- a paper trail without polluting the failure ratio.
UPDATE autopilot_run
SET status = 'skipped', completed_at = now(), failure_reason = $2
WHERE id = $1
RETURNING *;
-- =====================
-- Scheduler Queries
-- =====================
@@ -201,14 +213,19 @@ WHERE t.kind = 'schedule'
-- name: SelectAutopilotsExceedingFailureThreshold :many
-- Find active autopilots whose recent run failure rate exceeds the threshold.
--- Counts only terminal runs (completed | failed | skipped); pending,
--- issue_created and running are excluded so in-flight work isn't penalised.
+-- Counts only "real" terminal runs (completed | failed). 'skipped' is
+-- excluded from BOTH numerator and denominator: an admission-skipped run
+-- (e.g. assignee runtime offline at dispatch time, MUL-1899) is neither a
+-- success nor a failure, so it must not dilute the failure ratio (which
+-- would let a 100%-failing autopilot mask itself behind a wall of skips)
+-- nor inflate it. issue_created/running are still excluded so in-flight
+-- work isn't penalised.
-- Used by the failure monitor to auto-pause sustained-failure autopilots
-- (the canonical example from MUL-1336 was an autopilot scheduled every 5 min
-- that 100% failed for days, burning ~1.5k useless tasks per week).
WITH stats AS (
SELECT autopilot_id,
-count(*) FILTER (WHERE status IN ('completed', 'failed', 'skipped')) AS total,
+count(*) FILTER (WHERE status IN ('completed', 'failed')) AS total,
count(*) FILTER (WHERE status = 'failed') AS failed
FROM autopilot_run
WHERE created_at >= sqlc.arg('since')::timestamptz