Files
multica/server/internal/service/empty_claim_cache_test.go
Bohan Jiang 2dddfaa196 feat(daemon): Redis empty-claim fast path for /tasks/claim polling (#1860)
* feat(daemon): Redis empty-claim fast path for /tasks/claim polling

Daemons poll /tasks/claim every 30s per runtime; the steady-state
warm-empty case currently runs ListPendingTasksByRuntime against
Postgres on every poll. This collapses that path:

- New ListQueuedClaimCandidatesByRuntime query restricts to status =
  'queued' (the old query also returned 'dispatched' rows that can
  never be reclaimed) and is backed by a partial index keyed on
  (runtime_id, priority DESC, created_at ASC).
- New EmptyClaimCache caches the negative verdict in Redis with a
  30s TTL. ClaimTaskForRuntime checks the cache before SELECT and
  populates it on confirmed-empty results.
- notifyTaskAvailable now invalidates the runtime's empty key before
  kicking the daemon WS, so newly enqueued tasks become claimable
  immediately rather than waiting out the TTL.
- AutopilotService.dispatchRunOnly now goes through
  TaskService.NotifyTaskEnqueued so run_only tasks get the same
  invalidate-then-wakeup contract as every other enqueue path.

Co-authored-by: multica-agent <github@multica.ai>

* fix(daemon): close MarkEmpty/Bump race in empty-claim fast path

GPT-Boy's review on PR #1860 caught a real concurrency bug. Under the
prior implementation it was possible for a slow claim to write an
empty verdict AFTER a concurrent enqueue had already invalidated it:

  T1 claim:   SELECT -> empty
  T2 enqueue: INSERT row, DEL empty key (no-op, key not set yet),
              wakeup
  T1 claim:   SET empty (writes a stale "empty" verdict)
  T3 wakeup:  IsEmpty -> hit -> returns null

The just-queued task would then sit idle until the empty key's TTL
expired (up to 30s).

Replace the DEL-based invalidation with a per-runtime version
counter:

- CurrentVersion(rt) is a Redis INCR counter at
  mul:claim:runtime:version:<rt> with a 24h sliding TTL.
- Claim samples version BEFORE the SELECT and passes it to MarkEmpty,
  which stores the verdict's value as the observed-version string.
- IsEmpty MGETs both keys and trusts the verdict only when the
  empty-key value equals the current version.
- Enqueue Bumps the version (INCR + EXPIRE) before the wakeup,
  causing any verdict written under a prior version to be rejected
  on the next read.

Also bound every Redis call from this cache with a 250ms timeout —
notifyTaskAvailable uses a background context so a wedged Redis
must not block enqueue.

Tests against a real Redis (REDIS_TEST_URL) cover:
- MarkEmpty + IsEmpty under matching version returns hit
- Bump invalidates a prior empty verdict (race-fix pin)
- A MarkEmpty written under a stale pre-Bump version is rejected
- TTL clamping, per-runtime isolation, nil-cache safety
- notifyTaskAvailable Bumps before the wakeup fires

Co-authored-by: multica-agent <github@multica.ai>

* chore(daemon): renumber claim-candidate index migration to 067

Slot 064 was taken on main by 064_notification_preference. The
migration runner tracks per-version in schema_migrations and would
silently skip the second 064_*, leaving the index uncreated.
Rename to 067 (next free slot).

Co-authored-by: multica-agent <github@multica.ai>

---------

Co-authored-by: multica-agent <github@multica.ai>
2026-04-30 15:50:05 +08:00

163 lines
4.3 KiB
Go

package service
import (
"context"
"os"
"testing"
"time"
"github.com/redis/go-redis/v9"
)
// newRedisTestClient mirrors the helper in internal/auth: connect to
// REDIS_TEST_URL, flush, and skip when unset so `go test ./...` works
// on a stock laptop without a Redis instance running.
func newRedisTestClient(t *testing.T) *redis.Client {
t.Helper()
url := os.Getenv("REDIS_TEST_URL")
if url == "" {
t.Skip("REDIS_TEST_URL not set")
}
opts, err := redis.ParseURL(url)
if err != nil {
t.Fatalf("parse REDIS_TEST_URL: %v", err)
}
rdb := redis.NewClient(opts)
ctx := context.Background()
if err := rdb.Ping(ctx).Err(); err != nil {
t.Skipf("REDIS_TEST_URL unreachable: %v", err)
}
if err := rdb.FlushDB(ctx).Err(); err != nil {
t.Fatalf("flushdb: %v", err)
}
t.Cleanup(func() {
rdb.FlushDB(context.Background())
rdb.Close()
})
return rdb
}
func TestEmptyClaimCache_NilSafe(t *testing.T) {
var c *EmptyClaimCache // nil
ctx := context.Background()
if c.IsEmpty(ctx, "any-runtime") {
t.Fatal("nil cache must report not-empty (cache miss)")
}
if v := c.CurrentVersion(ctx, "any-runtime"); v != 0 {
t.Fatalf("nil cache CurrentVersion must be 0, got %d", v)
}
c.MarkEmpty(ctx, "any-runtime", 0)
c.Bump(ctx, "any-runtime")
}
func TestNewEmptyClaimCache_NilRedisReturnsNil(t *testing.T) {
if c := NewEmptyClaimCache(nil); c != nil {
t.Fatalf("NewEmptyClaimCache(nil) must return nil, got %#v", c)
}
}
func TestEmptyClaimCache_EmptyRuntimeIDIsNoOp(t *testing.T) {
rdb := newRedisTestClient(t)
c := NewEmptyClaimCache(rdb)
ctx := context.Background()
c.MarkEmpty(ctx, "", 0)
if c.IsEmpty(ctx, "") {
t.Fatal("empty runtime ID must not hit cache")
}
c.Bump(ctx, "")
}
func TestEmptyClaimCache_MarkAndIsEmptyVersionMatched(t *testing.T) {
rdb := newRedisTestClient(t)
c := NewEmptyClaimCache(rdb)
ctx := context.Background()
if c.IsEmpty(ctx, "rt-1") {
t.Fatal("expected miss before mark")
}
v0 := c.CurrentVersion(ctx, "rt-1")
c.MarkEmpty(ctx, "rt-1", v0)
if !c.IsEmpty(ctx, "rt-1") {
t.Fatal("expected hit when MarkEmpty version matches current")
}
}
// TestEmptyClaimCache_BumpInvalidatesPriorMark is the core race-fix
// pin: an empty verdict written under v0 must be rejected once Bump
// advances the version to v1, even though the empty key itself still
// has TTL remaining.
func TestEmptyClaimCache_BumpInvalidatesPriorMark(t *testing.T) {
rdb := newRedisTestClient(t)
c := NewEmptyClaimCache(rdb)
ctx := context.Background()
v0 := c.CurrentVersion(ctx, "rt-bump")
c.MarkEmpty(ctx, "rt-bump", v0)
if !c.IsEmpty(ctx, "rt-bump") {
t.Fatal("precondition: empty verdict tagged with current version should hit")
}
c.Bump(ctx, "rt-bump")
if c.IsEmpty(ctx, "rt-bump") {
t.Fatal("Bump must invalidate the prior empty verdict")
}
}
// TestEmptyClaimCache_StaleMarkRejected pins the GPT-Boy race: a slow
// claim reads version v0, the SELECT sees no rows, an enqueue Bumps
// to v1, then the slow claim writes MarkEmpty(v0). The next reader
// must NOT trust this verdict.
func TestEmptyClaimCache_StaleMarkRejected(t *testing.T) {
rdb := newRedisTestClient(t)
c := NewEmptyClaimCache(rdb)
ctx := context.Background()
// Slow claim samples version BEFORE select.
v0 := c.CurrentVersion(ctx, "rt-race")
// Concurrent enqueue happens.
c.Bump(ctx, "rt-race")
// Slow claim writes its empty verdict tagged with the stale v0.
c.MarkEmpty(ctx, "rt-race", v0)
if c.IsEmpty(ctx, "rt-race") {
t.Fatal("MarkEmpty written under a pre-Bump version must be rejected on read")
}
}
func TestEmptyClaimCache_TTL(t *testing.T) {
rdb := newRedisTestClient(t)
c := NewEmptyClaimCache(rdb)
ctx := context.Background()
c.MarkEmpty(ctx, "rt-ttl", 0)
ttl, err := rdb.TTL(ctx, emptyClaimKey("rt-ttl")).Result()
if err != nil {
t.Fatalf("TTL: %v", err)
}
if ttl <= 0 || ttl > EmptyClaimCacheTTL+time.Second {
t.Fatalf("unexpected empty-key TTL %v (want ~%v)", ttl, EmptyClaimCacheTTL)
}
}
func TestEmptyClaimCache_RuntimeIsolation(t *testing.T) {
rdb := newRedisTestClient(t)
c := NewEmptyClaimCache(rdb)
ctx := context.Background()
vA := c.CurrentVersion(ctx, "rt-A")
c.MarkEmpty(ctx, "rt-A", vA)
if c.IsEmpty(ctx, "rt-B") {
t.Fatal("marking rt-A must not affect rt-B")
}
c.Bump(ctx, "rt-A")
vB := c.CurrentVersion(ctx, "rt-B")
c.MarkEmpty(ctx, "rt-B", vB)
if c.IsEmpty(ctx, "rt-A") {
t.Fatal("marking rt-B must not affect rt-A")
}
}