From dc127f1da4ee8c5d6d676802bcfe261db43aa82f Mon Sep 17 00:00:00 2001 From: Jiang Bohan Date: Wed, 29 Apr 2026 18:50:22 +0800 Subject: [PATCH] feat(daemon): Redis empty-claim fast path for /tasks/claim polling Daemons poll /tasks/claim every 30s per runtime; the steady-state warm-empty case currently runs ListPendingTasksByRuntime against Postgres on every poll. This collapses that path: - New ListQueuedClaimCandidatesByRuntime query restricts to status = 'queued' (the old query also returned 'dispatched' rows that can never be reclaimed) and is backed by a partial index keyed on (runtime_id, priority DESC, created_at ASC). - New EmptyClaimCache caches the negative verdict in Redis with a 30s TTL. ClaimTaskForRuntime checks the cache before SELECT and populates it on confirmed-empty results. - notifyTaskAvailable now invalidates the runtime's empty key before kicking the daemon WS, so newly enqueued tasks become claimable immediately rather than waiting out the TTL. - AutopilotService.dispatchRunOnly now goes through TaskService.NotifyTaskEnqueued so run_only tasks get the same invalidate-then-wakeup contract as every other enqueue path. 
Co-authored-by: multica-agent --- server/cmd/server/router.go | 6 + server/internal/service/autopilot.go | 7 ++ server/internal/service/empty_claim_cache.go | 98 +++++++++++++++ .../service/empty_claim_cache_test.go | 116 ++++++++++++++++++ server/internal/service/task.go | 62 +++++++++- server/internal/service/task_notify_test.go | 101 +++++++++++++++ ..._task_queue_claim_candidate_index.down.sql | 1 + ...64_task_queue_claim_candidate_index.up.sql | 11 ++ server/pkg/db/generated/agent.sql.go | 59 +++++++++ server/pkg/db/queries/agent.sql | 13 ++ 10 files changed, 469 insertions(+), 5 deletions(-) create mode 100644 server/internal/service/empty_claim_cache.go create mode 100644 server/internal/service/empty_claim_cache_test.go create mode 100644 server/internal/service/task_notify_test.go create mode 100644 server/migrations/064_task_queue_claim_candidate_index.down.sql create mode 100644 server/migrations/064_task_queue_claim_candidate_index.up.sql diff --git a/server/cmd/server/router.go b/server/cmd/server/router.go index 346110012..05a2a7bf2 100644 --- a/server/cmd/server/router.go +++ b/server/cmd/server/router.go @@ -119,6 +119,12 @@ func NewRouterWithOptions(pool *pgxpool.Pool, hub *realtime.Hub, bus *events.Bus h.PATCache = patCache h.DaemonTokenCache = daemonTokenCache + // Empty-claim cache: lets the daemon poll path skip a Postgres + // scan when a recent check confirmed the runtime had no queued + // task. Returns nil when rdb is nil — TaskService treats that + // as "no cache, always hit DB" (existing behavior). + h.TaskService.EmptyClaim = service.NewEmptyClaimCache(rdb) + // Wire WS heartbeat after stores are finalized so the WS path uses the // same (possibly Redis-backed) stores as the HTTP path. 
daemonHub.SetHeartbeatHandler(h.HandleDaemonWSHeartbeat) diff --git a/server/internal/service/autopilot.go b/server/internal/service/autopilot.go index 64d470d45..9505a24b0 100644 --- a/server/internal/service/autopilot.go +++ b/server/internal/service/autopilot.go @@ -215,6 +215,13 @@ func (s *AutopilotService) dispatchRunOnly(ctx context.Context, ap db.Autopilot, *run = updatedRun } + // Drop the empty-claim cache and wake the daemon. dispatchRunOnly + // inserts the task row directly via Queries.CreateAutopilotTask + // (bypassing TaskService.Enqueue*), so without this the runtime + // would not get a wakeup and any cached "empty" verdict would + // stall the task until the TTL expired. + s.TaskSvc.NotifyTaskEnqueued(task) + slog.Info("autopilot dispatched (run_only)", "autopilot_id", util.UUIDToString(ap.ID), "task_id", util.UUIDToString(task.ID), diff --git a/server/internal/service/empty_claim_cache.go b/server/internal/service/empty_claim_cache.go new file mode 100644 index 000000000..e2a34423f --- /dev/null +++ b/server/internal/service/empty_claim_cache.go @@ -0,0 +1,98 @@ +package service + +import ( + "context" + "errors" + "log/slog" + "time" + + "github.com/redis/go-redis/v9" +) + +// emptyClaimCachePrefix namespaces empty-claim keys away from realtime +// (ws:*) and auth (mul:auth:*) keys. +const emptyClaimCachePrefix = "mul:claim:runtime:empty:" + +// EmptyClaimCacheTTL bounds how long a cached "no queued task" verdict +// stays believable. Choice tradeoff: too long means a missed +// invalidation delays claim until the TTL expires; too short means the +// fast path almost never triggers. 30s matches DefaultPollInterval so +// the worst-case staleness is one extra poll cycle — already the +// no-cache baseline — while still collapsing the steady-state warm +// empty path to a single Redis GET. 
+const EmptyClaimCacheTTL = 30 * time.Second + +// EmptyClaimCache caches "this runtime currently has no queued task" +// so the daemon's poll-based claim path can short-circuit before +// hitting Postgres. Only the negative result is cached; positive +// results always re-check the DB so concurrent claimers race fairly +// in `ClaimAgentTask`'s `FOR UPDATE SKIP LOCKED`. +// +// The cache is invalidated synchronously on every enqueue (see +// TaskService.notifyTaskAvailable). A nil *EmptyClaimCache is safe to +// use — every method becomes a no-op or reports a cache miss, so +// single-node dev / tests with no REDIS_URL degrade cleanly to direct +// DB lookups. +type EmptyClaimCache struct { + rdb *redis.Client +} + +// NewEmptyClaimCache returns a cache backed by rdb. Pass nil to +// disable caching: the result is then a nil *EmptyClaimCache, whose +// methods are safe to call and never hit Redis. +func NewEmptyClaimCache(rdb *redis.Client) *EmptyClaimCache { + if rdb == nil { + return nil + } + return &EmptyClaimCache{rdb: rdb} +} + +func emptyClaimCacheKey(runtimeID string) string { + return emptyClaimCachePrefix + runtimeID +} + +// IsEmpty reports whether a recent cached check confirmed the runtime +// had no queued task. Returns false on cache miss or any Redis error — +// a dead Redis must not stop legitimate claims. +func (c *EmptyClaimCache) IsEmpty(ctx context.Context, runtimeID string) bool { + if c == nil || runtimeID == "" { + return false + } + _, err := c.rdb.Get(ctx, emptyClaimCacheKey(runtimeID)).Result() + if err != nil { + if !errors.Is(err, redis.Nil) { + slog.Warn("empty_claim_cache: get failed; falling back to DB", "error", err) + } + return false + } + return true +} + +// MarkEmpty stores the empty verdict for the given runtime with the +// default TTL. Errors are logged and swallowed — a cache write +// failure is not a request failure. 
+func (c *EmptyClaimCache) MarkEmpty(ctx context.Context, runtimeID string) { + if c == nil || runtimeID == "" { + return + } + if err := c.rdb.Set(ctx, emptyClaimCacheKey(runtimeID), "1", EmptyClaimCacheTTL).Err(); err != nil { + slog.Warn("empty_claim_cache: set failed", "error", err) + } +} + +// Invalidate removes the empty verdict for the given runtime. Called +// from every enqueue path so a newly queued task is claimable +// immediately rather than waiting for the TTL to expire. +// +// Invalidation MUST run before the daemon WS wakeup is published — +// otherwise the wakeup arrives, the daemon claims, and the still-cached +// empty key returns null while the task sits queued for up to one full +// TTL window. +func (c *EmptyClaimCache) Invalidate(ctx context.Context, runtimeID string) { + if c == nil || runtimeID == "" { + return + } + if err := c.rdb.Del(ctx, emptyClaimCacheKey(runtimeID)).Err(); err != nil { + slog.Warn("empty_claim_cache: invalidate failed; entry will expire on TTL", "error", err) + } +} diff --git a/server/internal/service/empty_claim_cache_test.go b/server/internal/service/empty_claim_cache_test.go new file mode 100644 index 000000000..74d43ef50 --- /dev/null +++ b/server/internal/service/empty_claim_cache_test.go @@ -0,0 +1,116 @@ +package service + +import ( + "context" + "os" + "testing" + "time" + + "github.com/redis/go-redis/v9" +) + +// newRedisTestClient mirrors the helper in internal/auth: connect to +// REDIS_TEST_URL, flush, and skip when unset so `go test ./...` works +// on a stock laptop without a Redis instance running. 
+func newRedisTestClient(t *testing.T) *redis.Client { + t.Helper() + url := os.Getenv("REDIS_TEST_URL") + if url == "" { + t.Skip("REDIS_TEST_URL not set") + } + opts, err := redis.ParseURL(url) + if err != nil { + t.Fatalf("parse REDIS_TEST_URL: %v", err) + } + rdb := redis.NewClient(opts) + ctx := context.Background() + if err := rdb.Ping(ctx).Err(); err != nil { + t.Skipf("REDIS_TEST_URL unreachable: %v", err) + } + if err := rdb.FlushDB(ctx).Err(); err != nil { + t.Fatalf("flushdb: %v", err) + } + t.Cleanup(func() { + rdb.FlushDB(context.Background()) + rdb.Close() + }) + return rdb +} + +func TestEmptyClaimCache_NilSafe(t *testing.T) { + var c *EmptyClaimCache // nil + ctx := context.Background() + + if c.IsEmpty(ctx, "any-runtime") { + t.Fatal("nil cache must report not-empty (cache miss)") + } + c.MarkEmpty(ctx, "any-runtime") + c.Invalidate(ctx, "any-runtime") +} + +func TestNewEmptyClaimCache_NilRedisReturnsNil(t *testing.T) { + if c := NewEmptyClaimCache(nil); c != nil { + t.Fatalf("NewEmptyClaimCache(nil) must return nil, got %#v", c) + } +} + +func TestEmptyClaimCache_EmptyRuntimeIDIsNoOp(t *testing.T) { + rdb := newRedisTestClient(t) + c := NewEmptyClaimCache(rdb) + ctx := context.Background() + + c.MarkEmpty(ctx, "") + if c.IsEmpty(ctx, "") { + t.Fatal("empty runtime ID must not hit cache") + } + c.Invalidate(ctx, "") +} + +func TestEmptyClaimCache_MarkIsEmptyInvalidate(t *testing.T) { + rdb := newRedisTestClient(t) + c := NewEmptyClaimCache(rdb) + ctx := context.Background() + + if c.IsEmpty(ctx, "rt-1") { + t.Fatal("expected miss before mark") + } + c.MarkEmpty(ctx, "rt-1") + if !c.IsEmpty(ctx, "rt-1") { + t.Fatal("expected hit after mark") + } + c.Invalidate(ctx, "rt-1") + if c.IsEmpty(ctx, "rt-1") { + t.Fatal("expected miss after invalidate") + } +} + +func TestEmptyClaimCache_TTL(t *testing.T) { + rdb := newRedisTestClient(t) + c := NewEmptyClaimCache(rdb) + ctx := context.Background() + + c.MarkEmpty(ctx, "rt-ttl") + ttl, err := rdb.TTL(ctx, 
emptyClaimCacheKey("rt-ttl")).Result() + if err != nil { + t.Fatalf("TTL: %v", err) + } + if ttl <= 0 || ttl > EmptyClaimCacheTTL+time.Second { + t.Fatalf("unexpected TTL %v (want ~%v)", ttl, EmptyClaimCacheTTL) + } +} + +func TestEmptyClaimCache_RuntimeIsolation(t *testing.T) { + rdb := newRedisTestClient(t) + c := NewEmptyClaimCache(rdb) + ctx := context.Background() + + c.MarkEmpty(ctx, "rt-A") + if c.IsEmpty(ctx, "rt-B") { + t.Fatal("marking rt-A must not affect rt-B") + } + c.Invalidate(ctx, "rt-A") + c.MarkEmpty(ctx, "rt-B") + if c.IsEmpty(ctx, "rt-A") { + t.Fatal("marking rt-B must not affect rt-A") + } +} diff --git a/server/internal/service/task.go b/server/internal/service/task.go index b62cdcf73..dfd7e7ad3 100644 --- a/server/internal/service/task.go +++ b/server/internal/service/task.go @@ -28,6 +28,12 @@ type TaskService struct { Hub *realtime.Hub Bus *events.Bus Wakeup TaskWakeupNotifier + // EmptyClaim caches "this runtime has no queued task" so the daemon + // poll path can skip a Postgres scan on the steady-state empty case. + // Optional — a nil cache disables the fast path and every claim + // goes through the DB. Wired in router.go from the shared Redis + // client. + EmptyClaim *EmptyClaimCache } type TaskWakeupNotifier interface { @@ -451,6 +457,12 @@ func (s *TaskService) ClaimTask(ctx context.Context, agentID pgtype.UUID) (*db.A // ClaimTaskForRuntime claims the next runnable task for a runtime while // still respecting each agent's max_concurrent_tasks limit. +// +// Empty-claim fast path: when EmptyClaim is configured and a recent +// check verified the runtime had no queued tasks, returns immediately +// without touching Postgres. The cache is invalidated synchronously on +// every enqueue (notifyTaskAvailable), so a queued task becomes +// claimable on the next call rather than waiting for the TTL. 
func (s *TaskService) ClaimTaskForRuntime(ctx context.Context, runtimeID pgtype.UUID) (*db.AgentTaskQueue, error) { start := time.Now() var ( @@ -476,13 +488,29 @@ func (s *TaskService) ClaimTaskForRuntime(ctx context.Context, runtimeID pgtype. ) }() - t0 := start - tasks, err := s.Queries.ListPendingTasksByRuntime(ctx, runtimeID) + runtimeKey := util.UUIDToString(runtimeID) + if s.EmptyClaim.IsEmpty(ctx, runtimeKey) { + outcome = "empty_cache_hit" + return nil, nil + } + + t0 := time.Now() + tasks, err := s.Queries.ListQueuedClaimCandidatesByRuntime(ctx, runtimeID) listMs = time.Since(t0).Milliseconds() listCount = len(tasks) if err != nil { outcome = "error_list" - return nil, fmt.Errorf("list pending tasks: %w", err) + return nil, fmt.Errorf("list queued claim candidates: %w", err) + } + + if len(tasks) == 0 { + // Cache the empty verdict so subsequent polls skip the SELECT. + // NOTE: an enqueue that lands between the SELECT above and this + // Set is invalidated first and then re-marked empty here, so the + // new task can wait up to one TTL; the TTL bounds that staleness. + s.EmptyClaim.MarkEmpty(ctx, runtimeKey) + outcome = "empty_db" + return nil, nil } loopStart := time.Now() @@ -1163,11 +1191,35 @@ func priorityToInt(p string) int32 { } } +// NotifyTaskEnqueued is the cross-package shim for callers outside +// TaskService (e.g. AutopilotService.dispatchRunOnly) that insert a +// row into agent_task_queue directly. Invalidates the empty-claim +// cache and kicks the daemon WS so the new task is claimed without +// waiting for the next poll. +func (s *TaskService) NotifyTaskEnqueued(task db.AgentTaskQueue) { + s.notifyTaskAvailable(task) +} + +// notifyTaskAvailable runs after a task has been inserted: drops the +// runtime's empty-claim cache entry (so the next claim hits the DB +// and finds this row) then kicks the daemon WS so the daemon claims +// without waiting for its next poll. 
Order matters — see +// EmptyClaimCache.Invalidate for why invalidation must precede the +// wakeup. func (s *TaskService) notifyTaskAvailable(task db.AgentTaskQueue) { - if s.Wakeup == nil || !task.RuntimeID.Valid { + if !task.RuntimeID.Valid { return } - s.Wakeup.NotifyTaskAvailable(util.UUIDToString(task.RuntimeID), util.UUIDToString(task.ID)) + runtimeKey := util.UUIDToString(task.RuntimeID) + // Use a background context: the cache invalidate / wakeup must + // outlive the request that created the task, otherwise an early + // client disconnect could leave the empty key in place and stall + // the just-queued task until the TTL expires. + s.EmptyClaim.Invalidate(context.Background(), runtimeKey) + if s.Wakeup == nil { + return + } + s.Wakeup.NotifyTaskAvailable(runtimeKey, util.UUIDToString(task.ID)) } func (s *TaskService) broadcastTaskDispatch(ctx context.Context, task db.AgentTaskQueue) { diff --git a/server/internal/service/task_notify_test.go b/server/internal/service/task_notify_test.go new file mode 100644 index 000000000..d2f814ad7 --- /dev/null +++ b/server/internal/service/task_notify_test.go @@ -0,0 +1,101 @@ +package service + +import ( + "context" + "testing" + + "github.com/multica-ai/multica/server/internal/util" + db "github.com/multica-ai/multica/server/pkg/db/generated" +) + +// stubWakeup records every call so the test can assert that notify +// reaches the daemon hub and carries the right runtime / task IDs. 
+type stubWakeup struct { + calls []struct{ runtimeID, taskID string } +} + +func (s *stubWakeup) NotifyTaskAvailable(runtimeID, taskID string) { + s.calls = append(s.calls, struct{ runtimeID, taskID string }{runtimeID, taskID}) +} + +// TestNotifyTaskAvailable_InvalidatesEmptyClaim is the behavioural pin +// for the contract noted in the EmptyClaimCache docs: invalidation +// MUST run before the daemon WS wakeup, otherwise the wakeup arrives, +// the daemon claims, and the still-cached empty key returns null +// while the freshly queued task sits idle for up to one full TTL +// window. The test marks the runtime empty, fires +// notifyTaskAvailable, and asserts both that the cache entry is gone +// AND the wakeup hook saw the new task — proving every enqueue path +// (issue / mention / quick-create / chat / autopilot / retry) gets +// the same invalidate-then-notify behaviour for free. +func TestNotifyTaskAvailable_InvalidatesEmptyClaim(t *testing.T) { + rdb := newRedisTestClient(t) + cache := NewEmptyClaimCache(rdb) + wakeup := &stubWakeup{} + + svc := &TaskService{ + EmptyClaim: cache, + Wakeup: wakeup, + } + + runtimeID := testUUID(7) + taskID := testUUID(8) + runtimeKey := util.UUIDToString(runtimeID) + + ctx := context.Background() + cache.MarkEmpty(ctx, runtimeKey) + if !cache.IsEmpty(ctx, runtimeKey) { + t.Fatal("precondition: cache should report empty after MarkEmpty") + } + + svc.notifyTaskAvailable(db.AgentTaskQueue{ + ID: taskID, + RuntimeID: runtimeID, + }) + + if cache.IsEmpty(ctx, runtimeKey) { + t.Fatal("notifyTaskAvailable must invalidate the empty-claim cache entry") + } + if got := len(wakeup.calls); got != 1 { + t.Fatalf("expected 1 wakeup call, got %d", got) + } + if wakeup.calls[0].runtimeID != runtimeKey { + t.Fatalf("wakeup runtime mismatch: got %q want %q", wakeup.calls[0].runtimeID, runtimeKey) + } + if wakeup.calls[0].taskID != util.UUIDToString(taskID) { + t.Fatalf("wakeup task mismatch: got %q want %q", wakeup.calls[0].taskID, 
util.UUIDToString(taskID)) + } +} + +// TestNotifyTaskAvailable_InvalidWithoutRuntimeIsNoOp guards the +// no-RuntimeID early return — chat / quick-create / autopilot all set +// it on insert, but a buggy caller that forgot must not silently wipe +// every workspace's empty cache. The cache treats Invalidate("") as a +// no-op, but this test pins that the RuntimeID guard sits above the +// Invalidate call so a future refactor cannot drop the guard without +// test coverage. +func TestNotifyTaskAvailable_InvalidWithoutRuntimeIsNoOp(t *testing.T) { + rdb := newRedisTestClient(t) + cache := NewEmptyClaimCache(rdb) + wakeup := &stubWakeup{} + + svc := &TaskService{ + EmptyClaim: cache, + Wakeup: wakeup, + } + + ctx := context.Background() + cache.MarkEmpty(ctx, "rt-stays") + + svc.notifyTaskAvailable(db.AgentTaskQueue{ + // RuntimeID intentionally invalid (zero value, Valid=false). + ID: testUUID(9), + }) + + if !cache.IsEmpty(ctx, "rt-stays") { + t.Fatal("notifyTaskAvailable with invalid RuntimeID must not touch cache") + } + if got := len(wakeup.calls); got != 0 { + t.Fatalf("expected 0 wakeup calls when RuntimeID is invalid, got %d", got) + } +} diff --git a/server/migrations/064_task_queue_claim_candidate_index.down.sql b/server/migrations/064_task_queue_claim_candidate_index.down.sql new file mode 100644 index 000000000..d609736c2 --- /dev/null +++ b/server/migrations/064_task_queue_claim_candidate_index.down.sql @@ -0,0 +1 @@ +DROP INDEX CONCURRENTLY IF EXISTS idx_agent_task_queue_claim_candidates; diff --git a/server/migrations/064_task_queue_claim_candidate_index.up.sql b/server/migrations/064_task_queue_claim_candidate_index.up.sql new file mode 100644 index 000000000..1fe031123 --- /dev/null +++ b/server/migrations/064_task_queue_claim_candidate_index.up.sql @@ -0,0 +1,11 @@ +-- Partial index that backs ListQueuedClaimCandidatesByRuntime. 
Daemons poll +-- /tasks/claim every 30s per runtime; the filter "runtime_id = $1 AND +-- status = 'queued'" runs every poll and is the dominant cost on warm paths. +-- Restricting to status = 'queued' keeps the index tiny — terminal-state +-- rows (completed/failed/cancelled) accumulate forever in the table but are +-- excluded from the index, so it stays bounded by current queue depth. +-- ORDER BY priority DESC, created_at ASC mirrors the SELECT so the planner +-- can serve the query as an ordered index scan without an extra sort. +CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_agent_task_queue_claim_candidates + ON agent_task_queue (runtime_id, priority DESC, created_at ASC) + WHERE status = 'queued'; diff --git a/server/pkg/db/generated/agent.sql.go b/server/pkg/db/generated/agent.sql.go index 003e61477..8ea886f2c 100644 --- a/server/pkg/db/generated/agent.sql.go +++ b/server/pkg/db/generated/agent.sql.go @@ -1448,6 +1448,65 @@ func (q *Queries) ListPendingTasksByRuntime(ctx context.Context, runtimeID pgtyp return items, nil } +const listQueuedClaimCandidatesByRuntime = `-- name: ListQueuedClaimCandidatesByRuntime :many +SELECT id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, last_heartbeat_at, trigger_summary FROM agent_task_queue +WHERE runtime_id = $1 AND status = 'queued' +ORDER BY priority DESC, created_at ASC +` + +// Returns rows the runtime can attempt to claim. 
Status is restricted to +// 'queued' (in contrast to ListPendingTasksByRuntime which also includes +// 'dispatched') because dispatched rows are by definition already owned +// and cannot be re-claimed — including them in the candidate list pads +// the result with rows that always lose the per-(issue, agent) race in +// ClaimAgentTask, wasting CPU and a SELECT every poll cycle when the +// runtime is busy on a long-running task. Backed by the partial index +// idx_agent_task_queue_claim_candidates so the warm path is cheap. +func (q *Queries) ListQueuedClaimCandidatesByRuntime(ctx context.Context, runtimeID pgtype.UUID) ([]AgentTaskQueue, error) { + rows, err := q.db.Query(ctx, listQueuedClaimCandidatesByRuntime, runtimeID) + if err != nil { + return nil, err + } + defer rows.Close() + items := []AgentTaskQueue{} + for rows.Next() { + var i AgentTaskQueue + if err := rows.Scan( + &i.ID, + &i.AgentID, + &i.IssueID, + &i.Status, + &i.Priority, + &i.DispatchedAt, + &i.StartedAt, + &i.CompletedAt, + &i.Result, + &i.Error, + &i.CreatedAt, + &i.Context, + &i.RuntimeID, + &i.SessionID, + &i.WorkDir, + &i.TriggerCommentID, + &i.ChatSessionID, + &i.AutopilotRunID, + &i.Attempt, + &i.MaxAttempts, + &i.ParentTaskID, + &i.FailureReason, + &i.LastHeartbeatAt, + &i.TriggerSummary, + ); err != nil { + return nil, err + } + items = append(items, i) + } + if err := rows.Err(); err != nil { + return nil, err + } + return items, nil +} + const listTasksByIssue = `-- name: ListTasksByIssue :many SELECT id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, last_heartbeat_at, trigger_summary, force_fresh_session FROM agent_task_queue WHERE issue_id = $1 diff --git a/server/pkg/db/queries/agent.sql b/server/pkg/db/queries/agent.sql index 42b8b5c73..7bdd292d9 100644 --- 
a/server/pkg/db/queries/agent.sql +++ b/server/pkg/db/queries/agent.sql @@ -323,6 +323,19 @@ SELECT * FROM agent_task_queue WHERE runtime_id = $1 AND status IN ('queued', 'dispatched') ORDER BY priority DESC, created_at ASC; +-- name: ListQueuedClaimCandidatesByRuntime :many +-- Returns rows the runtime can attempt to claim. Status is restricted to +-- 'queued' (in contrast to ListPendingTasksByRuntime which also includes +-- 'dispatched') because dispatched rows are by definition already owned +-- and cannot be re-claimed — including them in the candidate list pads +-- the result with rows that always lose the per-(issue, agent) race in +-- ClaimAgentTask, wasting CPU and a SELECT every poll cycle when the +-- runtime is busy on a long-running task. Backed by the partial index +-- idx_agent_task_queue_claim_candidates so the warm path is cheap. +SELECT * FROM agent_task_queue +WHERE runtime_id = $1 AND status = 'queued' +ORDER BY priority DESC, created_at ASC; + -- name: ListActiveTasksByIssue :many SELECT * FROM agent_task_queue WHERE issue_id = $1 AND status IN ('dispatched', 'running')