feat(daemon): Redis empty-claim fast path for /tasks/claim polling

Daemons poll /tasks/claim every 30s per runtime; the steady-state
warm-empty case currently runs ListPendingTasksByRuntime against
Postgres on every poll. This collapses that path:

- New ListQueuedClaimCandidatesByRuntime query restricts to status =
  'queued' (the old query also returned 'dispatched' rows that can
  never be reclaimed) and is backed by a partial index keyed on
  (runtime_id, priority DESC, created_at ASC).
- New EmptyClaimCache caches the negative verdict in Redis with a
  30s TTL. ClaimTaskForRuntime checks the cache before SELECT and
  populates it on confirmed-empty results.
- notifyTaskAvailable now invalidates the runtime's empty key before
  kicking the daemon WS, so newly enqueued tasks become claimable
  immediately rather than waiting out the TTL.
- AutopilotService.dispatchRunOnly now goes through
  TaskService.NotifyTaskEnqueued so run_only tasks get the same
  invalidate-then-wakeup contract as every other enqueue path.

Co-authored-by: multica-agent <github@multica.ai>
This commit is contained in:
Jiang Bohan
2026-04-29 18:50:22 +08:00
parent da5dbc6224
commit dc127f1da4
10 changed files with 469 additions and 5 deletions

View File

@@ -119,6 +119,12 @@ func NewRouterWithOptions(pool *pgxpool.Pool, hub *realtime.Hub, bus *events.Bus
h.PATCache = patCache
h.DaemonTokenCache = daemonTokenCache
// Empty-claim cache: lets the daemon poll path skip a Postgres
// scan when a recent check confirmed the runtime had no queued
// task. Returns nil when rdb is nil — TaskService treats that
// as "no cache, always hit DB" (existing behavior).
h.TaskService.EmptyClaim = service.NewEmptyClaimCache(rdb)
// Wire WS heartbeat after stores are finalized so the WS path uses the
// same (possibly Redis-backed) stores as the HTTP path.
daemonHub.SetHeartbeatHandler(h.HandleDaemonWSHeartbeat)

View File

@@ -215,6 +215,13 @@ func (s *AutopilotService) dispatchRunOnly(ctx context.Context, ap db.Autopilot,
*run = updatedRun
}
// Drop the empty-claim cache and wake the daemon. dispatchRunOnly
// inserts the task row directly via Queries.CreateAutopilotTask
// (bypassing TaskService.Enqueue*), so without this the runtime
// would not get a wakeup and any cached "empty" verdict would
// stall the task until the TTL expired.
s.TaskSvc.NotifyTaskEnqueued(task)
slog.Info("autopilot dispatched (run_only)",
"autopilot_id", util.UUIDToString(ap.ID),
"task_id", util.UUIDToString(task.ID),

View File

@@ -0,0 +1,98 @@
package service
import (
"context"
"errors"
"log/slog"
"time"
"github.com/redis/go-redis/v9"
)
// emptyClaimCachePrefix namespaces empty-claim keys away from realtime
// (ws:*) and auth (mul:auth:*) keys.
const emptyClaimCachePrefix = "mul:claim:runtime:empty:"
// EmptyClaimCacheTTL bounds how long a cached "no queued task" verdict
// stays believable. Choice tradeoff: too long means a missed
// invalidation delays claim until the TTL expires; too short means the
// fast path almost never triggers. 30s matches DefaultPollInterval so
// the worst-case staleness is one extra poll cycle — already the
// no-cache baseline — while still collapsing the steady-state warm
// empty path to a single Redis GET.
const EmptyClaimCacheTTL = 30 * time.Second
// EmptyClaimCache caches "this runtime currently has no queued task"
// so the daemon's poll-based claim path can short-circuit before
// hitting Postgres. Only the negative result is cached; positive
// results always re-check the DB so concurrent claimers race fairly
// in `ClaimAgentTask`'s `FOR UPDATE SKIP LOCKED`.
//
// The cache is invalidated synchronously on every enqueue (see
// TaskService.notifyTaskAvailable). A nil *EmptyClaimCache is safe to
// use — every method becomes a no-op or reports a cache miss, so
// single-node dev / tests with no REDIS_URL degrade cleanly to direct
// DB lookups.
type EmptyClaimCache struct {
rdb *redis.Client
}
// NewEmptyClaimCache returns a cache backed by rdb. Pass nil to
// disable caching; the returned *EmptyClaimCache is safe to call but
// never hits Redis.
func NewEmptyClaimCache(rdb *redis.Client) *EmptyClaimCache {
if rdb == nil {
return nil
}
return &EmptyClaimCache{rdb: rdb}
}
func emptyClaimCacheKey(runtimeID string) string {
return emptyClaimCachePrefix + runtimeID
}
// IsEmpty reports whether a recent cached check confirmed the runtime
// had no queued task. Returns false on cache miss or any Redis error —
// a dead Redis must not stop legitimate claims.
func (c *EmptyClaimCache) IsEmpty(ctx context.Context, runtimeID string) bool {
if c == nil || runtimeID == "" {
return false
}
_, err := c.rdb.Get(ctx, emptyClaimCacheKey(runtimeID)).Result()
if err != nil {
if !errors.Is(err, redis.Nil) {
slog.Warn("empty_claim_cache: get failed; falling back to DB", "error", err)
}
return false
}
return true
}
// MarkEmpty stores the empty verdict for the given runtime with the
// default TTL. Errors are logged and swallowed — a cache write
// failure is not a request failure.
func (c *EmptyClaimCache) MarkEmpty(ctx context.Context, runtimeID string) {
if c == nil || runtimeID == "" {
return
}
if err := c.rdb.Set(ctx, emptyClaimCacheKey(runtimeID), "1", EmptyClaimCacheTTL).Err(); err != nil {
slog.Warn("empty_claim_cache: set failed", "error", err)
}
}
// Invalidate removes the empty verdict for the given runtime. Called
// from every enqueue path so a newly queued task is claimable
// immediately rather than waiting for the TTL to expire.
//
// Invalidation MUST run before the daemon WS wakeup is published —
// otherwise the wakeup arrives, the daemon claims, and the still-cached
// empty key returns null while the task sits queued for up to one full
// TTL window.
func (c *EmptyClaimCache) Invalidate(ctx context.Context, runtimeID string) {
if c == nil || runtimeID == "" {
return
}
if err := c.rdb.Del(ctx, emptyClaimCacheKey(runtimeID)).Err(); err != nil {
slog.Warn("empty_claim_cache: invalidate failed; entry will expire on TTL", "error", err)
}
}

View File

@@ -0,0 +1,116 @@
package service
import (
"context"
"os"
"testing"
"time"
"github.com/redis/go-redis/v9"
)
// newRedisTestClient mirrors the helper in internal/auth: connect to
// REDIS_TEST_URL, flush, and skip when unset so `go test ./...` works
// on a stock laptop without a Redis instance running.
func newRedisTestClient(t *testing.T) *redis.Client {
t.Helper()
url := os.Getenv("REDIS_TEST_URL")
if url == "" {
t.Skip("REDIS_TEST_URL not set")
}
opts, err := redis.ParseURL(url)
if err != nil {
t.Fatalf("parse REDIS_TEST_URL: %v", err)
}
rdb := redis.NewClient(opts)
ctx := context.Background()
if err := rdb.Ping(ctx).Err(); err != nil {
t.Skipf("REDIS_TEST_URL unreachable: %v", err)
}
if err := rdb.FlushDB(ctx).Err(); err != nil {
t.Fatalf("flushdb: %v", err)
}
t.Cleanup(func() {
rdb.FlushDB(context.Background())
rdb.Close()
})
return rdb
}
func TestEmptyClaimCache_NilSafe(t *testing.T) {
var c *EmptyClaimCache // nil
ctx := context.Background()
if c.IsEmpty(ctx, "any-runtime") {
t.Fatal("nil cache must report not-empty (cache miss)")
}
c.MarkEmpty(ctx, "any-runtime")
c.Invalidate(ctx, "any-runtime")
}
func TestNewEmptyClaimCache_NilRedisReturnsNil(t *testing.T) {
if c := NewEmptyClaimCache(nil); c != nil {
t.Fatalf("NewEmptyClaimCache(nil) must return nil, got %#v", c)
}
}
func TestEmptyClaimCache_EmptyRuntimeIDIsNoOp(t *testing.T) {
rdb := newRedisTestClient(t)
c := NewEmptyClaimCache(rdb)
ctx := context.Background()
c.MarkEmpty(ctx, "")
if c.IsEmpty(ctx, "") {
t.Fatal("empty runtime ID must not hit cache")
}
c.Invalidate(ctx, "")
}
func TestEmptyClaimCache_MarkIsEmptyInvalidate(t *testing.T) {
rdb := newRedisTestClient(t)
c := NewEmptyClaimCache(rdb)
ctx := context.Background()
if c.IsEmpty(ctx, "rt-1") {
t.Fatal("expected miss before mark")
}
c.MarkEmpty(ctx, "rt-1")
if !c.IsEmpty(ctx, "rt-1") {
t.Fatal("expected hit after mark")
}
c.Invalidate(ctx, "rt-1")
if c.IsEmpty(ctx, "rt-1") {
t.Fatal("expected miss after invalidate")
}
}
func TestEmptyClaimCache_TTL(t *testing.T) {
rdb := newRedisTestClient(t)
c := NewEmptyClaimCache(rdb)
ctx := context.Background()
c.MarkEmpty(ctx, "rt-ttl")
ttl, err := rdb.TTL(ctx, emptyClaimCacheKey("rt-ttl")).Result()
if err != nil {
t.Fatalf("TTL: %v", err)
}
if ttl <= 0 || ttl > EmptyClaimCacheTTL+time.Second {
t.Fatalf("unexpected TTL %v (want ~%v)", ttl, EmptyClaimCacheTTL)
}
}
func TestEmptyClaimCache_RuntimeIsolation(t *testing.T) {
rdb := newRedisTestClient(t)
c := NewEmptyClaimCache(rdb)
ctx := context.Background()
c.MarkEmpty(ctx, "rt-A")
if c.IsEmpty(ctx, "rt-B") {
t.Fatal("marking rt-A must not affect rt-B")
}
c.Invalidate(ctx, "rt-A")
c.MarkEmpty(ctx, "rt-B")
if c.IsEmpty(ctx, "rt-A") {
t.Fatal("marking rt-B must not affect rt-A")
}
}

View File

@@ -28,6 +28,12 @@ type TaskService struct {
Hub *realtime.Hub
Bus *events.Bus
Wakeup TaskWakeupNotifier
// EmptyClaim caches "this runtime has no queued task" so the daemon
// poll path can skip a Postgres scan on the steady-state empty case.
// Optional — a nil cache disables the fast path and every claim
// goes through the DB. Wired in router.go from the shared Redis
// client.
EmptyClaim *EmptyClaimCache
}
type TaskWakeupNotifier interface {
@@ -451,6 +457,12 @@ func (s *TaskService) ClaimTask(ctx context.Context, agentID pgtype.UUID) (*db.A
// ClaimTaskForRuntime claims the next runnable task for a runtime while
// still respecting each agent's max_concurrent_tasks limit.
//
// Empty-claim fast path: when EmptyClaim is configured and a recent
// check verified the runtime had no queued tasks, returns immediately
// without touching Postgres. The cache is invalidated synchronously on
// every enqueue (notifyTaskAvailable), so a queued task becomes
// claimable on the next call rather than waiting for the TTL.
func (s *TaskService) ClaimTaskForRuntime(ctx context.Context, runtimeID pgtype.UUID) (*db.AgentTaskQueue, error) {
start := time.Now()
var (
@@ -476,13 +488,29 @@ func (s *TaskService) ClaimTaskForRuntime(ctx context.Context, runtimeID pgtype.
)
}()
t0 := start
tasks, err := s.Queries.ListPendingTasksByRuntime(ctx, runtimeID)
runtimeKey := util.UUIDToString(runtimeID)
if s.EmptyClaim.IsEmpty(ctx, runtimeKey) {
outcome = "empty_cache_hit"
return nil, nil
}
t0 := time.Now()
tasks, err := s.Queries.ListQueuedClaimCandidatesByRuntime(ctx, runtimeID)
listMs = time.Since(t0).Milliseconds()
listCount = len(tasks)
if err != nil {
outcome = "error_list"
return nil, fmt.Errorf("list pending tasks: %w", err)
return nil, fmt.Errorf("list queued claim candidates: %w", err)
}
if len(tasks) == 0 {
// Cache the empty verdict so subsequent polls skip the SELECT.
// Set MUST happen before any wakeup-driven enqueue could
// overlap; that's enforced by ordering inside the enqueue
// path (notifyTaskAvailable invalidates before notifying).
s.EmptyClaim.MarkEmpty(ctx, runtimeKey)
outcome = "empty_db"
return nil, nil
}
loopStart := time.Now()
@@ -1163,11 +1191,35 @@ func priorityToInt(p string) int32 {
}
}
// NotifyTaskEnqueued is the cross-package shim for callers outside
// TaskService (e.g. AutopilotService.dispatchRunOnly) that insert a
// row into agent_task_queue directly. Invalidates the empty-claim
// cache and kicks the daemon WS so the new task is claimed without
// waiting for the next poll.
func (s *TaskService) NotifyTaskEnqueued(task db.AgentTaskQueue) {
s.notifyTaskAvailable(task)
}
// notifyTaskAvailable runs after a task has been inserted: drops the
// runtime's empty-claim cache entry (so the next claim hits the DB
// and finds this row) then kicks the daemon WS so the daemon claims
// without waiting for its next poll. Order matters — see
// EmptyClaimCache.Invalidate for why invalidation must precede the
// wakeup.
func (s *TaskService) notifyTaskAvailable(task db.AgentTaskQueue) {
if s.Wakeup == nil || !task.RuntimeID.Valid {
if !task.RuntimeID.Valid {
return
}
s.Wakeup.NotifyTaskAvailable(util.UUIDToString(task.RuntimeID), util.UUIDToString(task.ID))
runtimeKey := util.UUIDToString(task.RuntimeID)
// Use a background context: the cache invalidate / wakeup must
// outlive the request that created the task, otherwise an early
// client disconnect could leave the empty key in place and stall
// the just-queued task until the TTL expires.
s.EmptyClaim.Invalidate(context.Background(), runtimeKey)
if s.Wakeup == nil {
return
}
s.Wakeup.NotifyTaskAvailable(runtimeKey, util.UUIDToString(task.ID))
}
func (s *TaskService) broadcastTaskDispatch(ctx context.Context, task db.AgentTaskQueue) {

View File

@@ -0,0 +1,101 @@
package service
import (
"context"
"testing"
"github.com/multica-ai/multica/server/internal/util"
db "github.com/multica-ai/multica/server/pkg/db/generated"
)
// stubWakeup records every call so the test can assert that notify
// reaches the daemon hub and carries the right runtime / task IDs.
type stubWakeup struct {
calls []struct{ runtimeID, taskID string }
}
func (s *stubWakeup) NotifyTaskAvailable(runtimeID, taskID string) {
s.calls = append(s.calls, struct{ runtimeID, taskID string }{runtimeID, taskID})
}
// TestNotifyTaskAvailable_InvalidatesEmptyClaim is the behavioural pin
// for the contract noted in the EmptyClaimCache docs: invalidation
// MUST run before the daemon WS wakeup, otherwise the wakeup arrives,
// the daemon claims, and the still-cached empty key returns null
// while the freshly queued task sits idle for up to one full TTL
// window. The test marks the runtime empty, fires
// notifyTaskAvailable, and asserts both that the cache entry is gone
// AND the wakeup hook saw the new task — proving every enqueue path
// (issue / mention / quick-create / chat / autopilot / retry) gets
// the same invalidate-then-notify behaviour for free.
func TestNotifyTaskAvailable_InvalidatesEmptyClaim(t *testing.T) {
rdb := newRedisTestClient(t)
cache := NewEmptyClaimCache(rdb)
wakeup := &stubWakeup{}
svc := &TaskService{
EmptyClaim: cache,
Wakeup: wakeup,
}
runtimeID := testUUID(7)
taskID := testUUID(8)
runtimeKey := util.UUIDToString(runtimeID)
ctx := context.Background()
cache.MarkEmpty(ctx, runtimeKey)
if !cache.IsEmpty(ctx, runtimeKey) {
t.Fatal("precondition: cache should report empty after MarkEmpty")
}
svc.notifyTaskAvailable(db.AgentTaskQueue{
ID: taskID,
RuntimeID: runtimeID,
})
if cache.IsEmpty(ctx, runtimeKey) {
t.Fatal("notifyTaskAvailable must invalidate the empty-claim cache entry")
}
if got := len(wakeup.calls); got != 1 {
t.Fatalf("expected 1 wakeup call, got %d", got)
}
if wakeup.calls[0].runtimeID != runtimeKey {
t.Fatalf("wakeup runtime mismatch: got %q want %q", wakeup.calls[0].runtimeID, runtimeKey)
}
if wakeup.calls[0].taskID != util.UUIDToString(taskID) {
t.Fatalf("wakeup task mismatch: got %q want %q", wakeup.calls[0].taskID, util.UUIDToString(taskID))
}
}
// TestNotifyTaskAvailable_InvalidWithoutRuntimeIsNoOp guards the
// no-RuntimeID early return — chat / quick-create / autopilot all set
// it on insert, but a buggy caller that forgot must not silently wipe
// every workspace's empty cache. The cache treats Invalidate("") as a
// no-op, but this test pins that the RuntimeID guard sits above the
// Invalidate call so a future refactor cannot drop the guard without
// test coverage.
func TestNotifyTaskAvailable_InvalidWithoutRuntimeIsNoOp(t *testing.T) {
rdb := newRedisTestClient(t)
cache := NewEmptyClaimCache(rdb)
wakeup := &stubWakeup{}
svc := &TaskService{
EmptyClaim: cache,
Wakeup: wakeup,
}
ctx := context.Background()
cache.MarkEmpty(ctx, "rt-stays")
svc.notifyTaskAvailable(db.AgentTaskQueue{
// RuntimeID intentionally invalid (zero value, Valid=false).
ID: testUUID(9),
})
if !cache.IsEmpty(ctx, "rt-stays") {
t.Fatal("notifyTaskAvailable with invalid RuntimeID must not touch cache")
}
if got := len(wakeup.calls); got != 0 {
t.Fatalf("expected 0 wakeup calls when RuntimeID is invalid, got %d", got)
}
}

View File

@@ -0,0 +1 @@
DROP INDEX CONCURRENTLY IF EXISTS idx_agent_task_queue_claim_candidates;

View File

@@ -0,0 +1,11 @@
-- Partial index that backs ListQueuedClaimCandidatesByRuntime. Daemons poll
-- /tasks/claim every 30s per runtime; the filter "runtime_id = $1 AND
-- status = 'queued'" runs every poll and is the dominant cost on warm paths.
-- Restricting to status = 'queued' keeps the index tiny — terminal-state
-- rows (completed/failed/cancelled) accumulate forever in the table but are
-- excluded from the index, so it stays bounded by current queue depth.
-- ORDER BY priority DESC, created_at ASC mirrors the SELECT so the planner
-- can serve the query as an index-only scan without an extra sort.
CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_agent_task_queue_claim_candidates
ON agent_task_queue (runtime_id, priority DESC, created_at ASC)
WHERE status = 'queued';

View File

@@ -1448,6 +1448,65 @@ func (q *Queries) ListPendingTasksByRuntime(ctx context.Context, runtimeID pgtyp
return items, nil
}
const listQueuedClaimCandidatesByRuntime = `-- name: ListQueuedClaimCandidatesByRuntime :many
SELECT id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, last_heartbeat_at, trigger_summary FROM agent_task_queue
WHERE runtime_id = $1 AND status = 'queued'
ORDER BY priority DESC, created_at ASC
`
// Returns rows the runtime can attempt to claim. Status is restricted to
// 'queued' (in contrast to ListPendingTasksByRuntime which also includes
// 'dispatched') because dispatched rows are by definition already owned
// and cannot be re-claimed — including them in the candidate list pads
// the result with rows that always lose the per-(issue, agent) race in
// ClaimAgentTask, wasting CPU and a SELECT every poll cycle when the
// runtime is busy on a long-running task. Backed by the partial index
// idx_agent_task_queue_claim_candidates so the warm path is cheap.
func (q *Queries) ListQueuedClaimCandidatesByRuntime(ctx context.Context, runtimeID pgtype.UUID) ([]AgentTaskQueue, error) {
rows, err := q.db.Query(ctx, listQueuedClaimCandidatesByRuntime, runtimeID)
if err != nil {
return nil, err
}
defer rows.Close()
items := []AgentTaskQueue{}
for rows.Next() {
var i AgentTaskQueue
if err := rows.Scan(
&i.ID,
&i.AgentID,
&i.IssueID,
&i.Status,
&i.Priority,
&i.DispatchedAt,
&i.StartedAt,
&i.CompletedAt,
&i.Result,
&i.Error,
&i.CreatedAt,
&i.Context,
&i.RuntimeID,
&i.SessionID,
&i.WorkDir,
&i.TriggerCommentID,
&i.ChatSessionID,
&i.AutopilotRunID,
&i.Attempt,
&i.MaxAttempts,
&i.ParentTaskID,
&i.FailureReason,
&i.LastHeartbeatAt,
&i.TriggerSummary,
); err != nil {
return nil, err
}
items = append(items, i)
}
if err := rows.Err(); err != nil {
return nil, err
}
return items, nil
}
const listTasksByIssue = `-- name: ListTasksByIssue :many
SELECT id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, last_heartbeat_at, trigger_summary, force_fresh_session FROM agent_task_queue
WHERE issue_id = $1

View File

@@ -323,6 +323,19 @@ SELECT * FROM agent_task_queue
WHERE runtime_id = $1 AND status IN ('queued', 'dispatched')
ORDER BY priority DESC, created_at ASC;
-- name: ListQueuedClaimCandidatesByRuntime :many
-- Returns rows the runtime can attempt to claim. Status is restricted to
-- 'queued' (in contrast to ListPendingTasksByRuntime which also includes
-- 'dispatched') because dispatched rows are by definition already owned
-- and cannot be re-claimed — including them in the candidate list pads
-- the result with rows that always lose the per-(issue, agent) race in
-- ClaimAgentTask, wasting CPU and a SELECT every poll cycle when the
-- runtime is busy on a long-running task. Backed by the partial index
-- idx_agent_task_queue_claim_candidates so the warm path is cheap.
SELECT * FROM agent_task_queue
WHERE runtime_id = $1 AND status = 'queued'
ORDER BY priority DESC, created_at ASC;
-- name: ListActiveTasksByIssue :many
SELECT * FROM agent_task_queue
WHERE issue_id = $1 AND status IN ('dispatched', 'running')