mirror of
https://github.com/multica-ai/multica.git
synced 2026-06-28 18:09:14 +02:00
Compare commits
3 Commits
agent/lamb
...
agent/j/a7
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
6b1661880f | ||
|
|
26a9b31c3c | ||
|
|
dc127f1da4 |
@@ -119,6 +119,12 @@ func NewRouterWithOptions(pool *pgxpool.Pool, hub *realtime.Hub, bus *events.Bus
|
||||
h.PATCache = patCache
|
||||
h.DaemonTokenCache = daemonTokenCache
|
||||
|
||||
// Empty-claim cache: lets the daemon poll path skip a Postgres
|
||||
// scan when a recent check confirmed the runtime had no queued
|
||||
// task. Returns nil when rdb is nil — TaskService treats that
|
||||
// as "no cache, always hit DB" (existing behavior).
|
||||
h.TaskService.EmptyClaim = service.NewEmptyClaimCache(rdb)
|
||||
|
||||
// Wire WS heartbeat after stores are finalized so the WS path uses the
|
||||
// same (possibly Redis-backed) stores as the HTTP path.
|
||||
daemonHub.SetHeartbeatHandler(h.HandleDaemonWSHeartbeat)
|
||||
|
||||
@@ -215,6 +215,13 @@ func (s *AutopilotService) dispatchRunOnly(ctx context.Context, ap db.Autopilot,
|
||||
*run = updatedRun
|
||||
}
|
||||
|
||||
// Drop the empty-claim cache and wake the daemon. dispatchRunOnly
|
||||
// inserts the task row directly via Queries.CreateAutopilotTask
|
||||
// (bypassing TaskService.Enqueue*), so without this the runtime
|
||||
// would not get a wakeup and any cached "empty" verdict would
|
||||
// stall the task until the TTL expired.
|
||||
s.TaskSvc.NotifyTaskEnqueued(task)
|
||||
|
||||
slog.Info("autopilot dispatched (run_only)",
|
||||
"autopilot_id", util.UUIDToString(ap.ID),
|
||||
"task_id", util.UUIDToString(task.ID),
|
||||
|
||||
198
server/internal/service/empty_claim_cache.go
Normal file
198
server/internal/service/empty_claim_cache.go
Normal file
@@ -0,0 +1,198 @@
|
||||
package service
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"log/slog"
|
||||
"strconv"
|
||||
"time"
|
||||
|
||||
"github.com/redis/go-redis/v9"
|
||||
)
|
||||
|
||||
// emptyClaimCacheKey holds a "no queued task" verdict tagged with the
|
||||
// per-runtime version it was observed under. emptyClaimVersionKey is
|
||||
// the per-runtime monotonic counter that any enqueue path bumps. The
|
||||
// verdict is trusted only when its value equals the current version,
|
||||
// which closes the race where a slow claim writes an empty verdict
|
||||
// AFTER an enqueue has already invalidated it:
|
||||
//
|
||||
// T1 claim: v0 := GET version
|
||||
// SELECT ... -> empty
|
||||
// (slow, e.g. GC pause)
|
||||
// T2 enqueue: INSERT row
|
||||
// INCR version (-> v1)
|
||||
// wakeup
|
||||
// T1 claim: SET empty = v0
|
||||
// T3 claim: v1' := GET version (== v1)
|
||||
// GET empty (== v0) -> v0 != v1, treat as miss -> SELECT
|
||||
//
|
||||
// Without the version tag T3 would have hit the stale empty key and
|
||||
// the just-queued task would sit idle until the empty key's TTL
|
||||
// expired. With it, the only window left is one extra DB SELECT per
|
||||
// runtime per concurrent enqueue, never a stalled task.
|
||||
const (
|
||||
emptyClaimCachePrefix = "mul:claim:runtime:empty:"
|
||||
emptyClaimVersionPrefix = "mul:claim:runtime:version:"
|
||||
)
|
||||
|
||||
// EmptyClaimCacheTTL bounds how long a cached "no queued task" verdict
|
||||
// stays believable. Choice tradeoff: too long means a missed
|
||||
// invalidation delays claim until the TTL expires; too short means the
|
||||
// fast path almost never triggers. 30s matches DefaultPollInterval so
|
||||
// the worst-case staleness is one extra poll cycle — already the
|
||||
// no-cache baseline — while still collapsing the steady-state warm
|
||||
// empty path to a single Redis GET pair.
|
||||
const EmptyClaimCacheTTL = 30 * time.Second
|
||||
|
||||
// emptyClaimVersionTTL keeps the version counter alive long enough that
|
||||
// a rarely-polled runtime doesn't reset to 0 between an enqueue's
|
||||
// INCR and the next claim's GET (which would let a stale tagged
|
||||
// empty key suddenly look valid again). Sliding TTL is renewed on
|
||||
// every Bump and every Get.
|
||||
const emptyClaimVersionTTL = 24 * time.Hour
|
||||
|
||||
// emptyClaimRedisTimeout caps every Redis call from this cache. Enqueue
|
||||
// paths use a background context so the cache outlives the request,
|
||||
// but a wedged Redis must not stall enqueue indefinitely — bound the
|
||||
// blast radius and degrade to "no cache" instead.
|
||||
const emptyClaimRedisTimeout = 250 * time.Millisecond
|
||||
|
||||
// EmptyClaimCache caches "this runtime currently has no queued task"
|
||||
// so the daemon's poll-based claim path can short-circuit before
|
||||
// hitting Postgres. Only the negative result is cached; positive
|
||||
// results always re-check the DB so concurrent claimers race fairly
|
||||
// in `ClaimAgentTask`'s `FOR UPDATE SKIP LOCKED`.
|
||||
//
|
||||
// The cache is invalidated synchronously on every enqueue (see
|
||||
// TaskService.notifyTaskAvailable). A nil *EmptyClaimCache is safe to
|
||||
// use — every method becomes a no-op or reports a cache miss, so
|
||||
// single-node dev / tests with no REDIS_URL degrade cleanly to direct
|
||||
// DB lookups.
|
||||
type EmptyClaimCache struct {
|
||||
rdb *redis.Client
|
||||
}
|
||||
|
||||
// NewEmptyClaimCache returns a cache backed by rdb. Pass nil to
|
||||
// disable caching; the returned *EmptyClaimCache is safe to call but
|
||||
// never hits Redis.
|
||||
func NewEmptyClaimCache(rdb *redis.Client) *EmptyClaimCache {
|
||||
if rdb == nil {
|
||||
return nil
|
||||
}
|
||||
return &EmptyClaimCache{rdb: rdb}
|
||||
}
|
||||
|
||||
func emptyClaimKey(runtimeID string) string { return emptyClaimCachePrefix + runtimeID }
|
||||
func emptyClaimVersion(runtimeID string) string { return emptyClaimVersionPrefix + runtimeID }
|
||||
|
||||
func (c *EmptyClaimCache) bounded(ctx context.Context) (context.Context, context.CancelFunc) {
|
||||
return context.WithTimeout(ctx, emptyClaimRedisTimeout)
|
||||
}
|
||||
|
||||
// CurrentVersion returns the runtime's current invalidation version.
|
||||
// Callers MUST read this BEFORE the DB SELECT they are about to cache,
|
||||
// then pass it back to MarkEmpty so a concurrent Bump invalidates the
|
||||
// would-be cache write. Returns 0 (treated as "unknown") on cache miss
|
||||
// or any Redis error — the caller falls through to the DB path.
|
||||
//
|
||||
// The version key is read with a short Expire refresh so that a long
|
||||
// idle runtime does not let the counter expire and reset to 0 between
|
||||
// an enqueue's Bump and the next claim's MarkEmpty.
|
||||
func (c *EmptyClaimCache) CurrentVersion(ctx context.Context, runtimeID string) int64 {
|
||||
if c == nil || runtimeID == "" {
|
||||
return 0
|
||||
}
|
||||
bctx, cancel := c.bounded(ctx)
|
||||
defer cancel()
|
||||
v, err := c.rdb.Get(bctx, emptyClaimVersion(runtimeID)).Int64()
|
||||
if err != nil {
|
||||
if !errors.Is(err, redis.Nil) {
|
||||
slog.Warn("empty_claim_cache: version get failed; falling back to DB", "error", err)
|
||||
}
|
||||
return 0
|
||||
}
|
||||
// Refresh TTL so the counter doesn't expire and reset on a low-
|
||||
// traffic runtime. Errors here are best-effort.
|
||||
c.rdb.Expire(bctx, emptyClaimVersion(runtimeID), emptyClaimVersionTTL)
|
||||
return v
|
||||
}
|
||||
|
||||
// IsEmpty returns true only when (a) an empty verdict is cached AND
|
||||
// (b) it carries the runtime's current version. A stale verdict
|
||||
// written before a concurrent Bump returns false so the caller falls
|
||||
// through to the DB.
|
||||
func (c *EmptyClaimCache) IsEmpty(ctx context.Context, runtimeID string) bool {
|
||||
if c == nil || runtimeID == "" {
|
||||
return false
|
||||
}
|
||||
bctx, cancel := c.bounded(ctx)
|
||||
defer cancel()
|
||||
// MGET returns []interface{} of either the value (string) or nil.
|
||||
vals, err := c.rdb.MGet(bctx, emptyClaimKey(runtimeID), emptyClaimVersion(runtimeID)).Result()
|
||||
if err != nil {
|
||||
slog.Warn("empty_claim_cache: mget failed; falling back to DB", "error", err)
|
||||
return false
|
||||
}
|
||||
if len(vals) != 2 || vals[0] == nil {
|
||||
return false
|
||||
}
|
||||
emptyVer, ok := vals[0].(string)
|
||||
if !ok {
|
||||
return false
|
||||
}
|
||||
// A missing version key means "no enqueue has ever bumped this
|
||||
// runtime", which is logically version 0 — i.e. the same value
|
||||
// CurrentVersion returns on miss. A MarkEmpty written with v=0
|
||||
// must match here, otherwise the fast path would never trigger
|
||||
// for fresh runtimes.
|
||||
curVer := "0"
|
||||
if vals[1] != nil {
|
||||
if s, ok := vals[1].(string); ok {
|
||||
curVer = s
|
||||
}
|
||||
}
|
||||
return emptyVer == curVer
|
||||
}
|
||||
|
||||
// MarkEmpty stores the empty verdict tagged with observedVersion. The
|
||||
// verdict is later trusted only if observedVersion still equals the
|
||||
// current version (see IsEmpty). Pass the value returned by
|
||||
// CurrentVersion BEFORE the SELECT that confirmed the runtime was
|
||||
// empty; a concurrent Bump between the two will make the next reader
|
||||
// reject this entry, forcing a fresh DB check.
|
||||
//
|
||||
// Errors are logged and swallowed — a cache write failure is not a
|
||||
// request failure.
|
||||
func (c *EmptyClaimCache) MarkEmpty(ctx context.Context, runtimeID string, observedVersion int64) {
|
||||
if c == nil || runtimeID == "" {
|
||||
return
|
||||
}
|
||||
bctx, cancel := c.bounded(ctx)
|
||||
defer cancel()
|
||||
if err := c.rdb.Set(bctx, emptyClaimKey(runtimeID), strconv.FormatInt(observedVersion, 10), EmptyClaimCacheTTL).Err(); err != nil {
|
||||
slog.Warn("empty_claim_cache: set failed", "error", err)
|
||||
}
|
||||
}
|
||||
|
||||
// Bump increments the runtime's invalidation version. Called from
|
||||
// every enqueue path BEFORE the daemon WS wakeup so any verdict
|
||||
// written under the previous version is rejected on the next read,
|
||||
// without needing a separate DEL on the empty key.
|
||||
//
|
||||
// Errors are logged and swallowed — a Redis hiccup must not stop a
|
||||
// legitimate enqueue. The empty key still expires on its own TTL so
|
||||
// the worst-case stall is bounded.
|
||||
func (c *EmptyClaimCache) Bump(ctx context.Context, runtimeID string) {
|
||||
if c == nil || runtimeID == "" {
|
||||
return
|
||||
}
|
||||
bctx, cancel := c.bounded(ctx)
|
||||
defer cancel()
|
||||
pipe := c.rdb.Pipeline()
|
||||
pipe.Incr(bctx, emptyClaimVersion(runtimeID))
|
||||
pipe.Expire(bctx, emptyClaimVersion(runtimeID), emptyClaimVersionTTL)
|
||||
if _, err := pipe.Exec(bctx); err != nil {
|
||||
slog.Warn("empty_claim_cache: bump failed; entry will expire on TTL", "error", err)
|
||||
}
|
||||
}
|
||||
162
server/internal/service/empty_claim_cache_test.go
Normal file
162
server/internal/service/empty_claim_cache_test.go
Normal file
@@ -0,0 +1,162 @@
|
||||
package service
|
||||
|
||||
import (
|
||||
"context"
|
||||
"os"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/redis/go-redis/v9"
|
||||
)
|
||||
|
||||
// newRedisTestClient mirrors the helper in internal/auth: connect to
|
||||
// REDIS_TEST_URL, flush, and skip when unset so `go test ./...` works
|
||||
// on a stock laptop without a Redis instance running.
|
||||
func newRedisTestClient(t *testing.T) *redis.Client {
|
||||
t.Helper()
|
||||
url := os.Getenv("REDIS_TEST_URL")
|
||||
if url == "" {
|
||||
t.Skip("REDIS_TEST_URL not set")
|
||||
}
|
||||
opts, err := redis.ParseURL(url)
|
||||
if err != nil {
|
||||
t.Fatalf("parse REDIS_TEST_URL: %v", err)
|
||||
}
|
||||
rdb := redis.NewClient(opts)
|
||||
ctx := context.Background()
|
||||
if err := rdb.Ping(ctx).Err(); err != nil {
|
||||
t.Skipf("REDIS_TEST_URL unreachable: %v", err)
|
||||
}
|
||||
if err := rdb.FlushDB(ctx).Err(); err != nil {
|
||||
t.Fatalf("flushdb: %v", err)
|
||||
}
|
||||
t.Cleanup(func() {
|
||||
rdb.FlushDB(context.Background())
|
||||
rdb.Close()
|
||||
})
|
||||
return rdb
|
||||
}
|
||||
|
||||
func TestEmptyClaimCache_NilSafe(t *testing.T) {
|
||||
var c *EmptyClaimCache // nil
|
||||
ctx := context.Background()
|
||||
|
||||
if c.IsEmpty(ctx, "any-runtime") {
|
||||
t.Fatal("nil cache must report not-empty (cache miss)")
|
||||
}
|
||||
if v := c.CurrentVersion(ctx, "any-runtime"); v != 0 {
|
||||
t.Fatalf("nil cache CurrentVersion must be 0, got %d", v)
|
||||
}
|
||||
c.MarkEmpty(ctx, "any-runtime", 0)
|
||||
c.Bump(ctx, "any-runtime")
|
||||
}
|
||||
|
||||
func TestNewEmptyClaimCache_NilRedisReturnsNil(t *testing.T) {
|
||||
if c := NewEmptyClaimCache(nil); c != nil {
|
||||
t.Fatalf("NewEmptyClaimCache(nil) must return nil, got %#v", c)
|
||||
}
|
||||
}
|
||||
|
||||
func TestEmptyClaimCache_EmptyRuntimeIDIsNoOp(t *testing.T) {
|
||||
rdb := newRedisTestClient(t)
|
||||
c := NewEmptyClaimCache(rdb)
|
||||
ctx := context.Background()
|
||||
|
||||
c.MarkEmpty(ctx, "", 0)
|
||||
if c.IsEmpty(ctx, "") {
|
||||
t.Fatal("empty runtime ID must not hit cache")
|
||||
}
|
||||
c.Bump(ctx, "")
|
||||
}
|
||||
|
||||
func TestEmptyClaimCache_MarkAndIsEmptyVersionMatched(t *testing.T) {
|
||||
rdb := newRedisTestClient(t)
|
||||
c := NewEmptyClaimCache(rdb)
|
||||
ctx := context.Background()
|
||||
|
||||
if c.IsEmpty(ctx, "rt-1") {
|
||||
t.Fatal("expected miss before mark")
|
||||
}
|
||||
v0 := c.CurrentVersion(ctx, "rt-1")
|
||||
c.MarkEmpty(ctx, "rt-1", v0)
|
||||
if !c.IsEmpty(ctx, "rt-1") {
|
||||
t.Fatal("expected hit when MarkEmpty version matches current")
|
||||
}
|
||||
}
|
||||
|
||||
// TestEmptyClaimCache_BumpInvalidatesPriorMark is the core race-fix
|
||||
// pin: an empty verdict written under v0 must be rejected once Bump
|
||||
// advances the version to v1, even though the empty key itself still
|
||||
// has TTL remaining.
|
||||
func TestEmptyClaimCache_BumpInvalidatesPriorMark(t *testing.T) {
|
||||
rdb := newRedisTestClient(t)
|
||||
c := NewEmptyClaimCache(rdb)
|
||||
ctx := context.Background()
|
||||
|
||||
v0 := c.CurrentVersion(ctx, "rt-bump")
|
||||
c.MarkEmpty(ctx, "rt-bump", v0)
|
||||
if !c.IsEmpty(ctx, "rt-bump") {
|
||||
t.Fatal("precondition: empty verdict tagged with current version should hit")
|
||||
}
|
||||
|
||||
c.Bump(ctx, "rt-bump")
|
||||
if c.IsEmpty(ctx, "rt-bump") {
|
||||
t.Fatal("Bump must invalidate the prior empty verdict")
|
||||
}
|
||||
}
|
||||
|
||||
// TestEmptyClaimCache_StaleMarkRejected pins the GPT-Boy race: a slow
|
||||
// claim reads version v0, the SELECT sees no rows, an enqueue Bumps
|
||||
// to v1, then the slow claim writes MarkEmpty(v0). The next reader
|
||||
// must NOT trust this verdict.
|
||||
func TestEmptyClaimCache_StaleMarkRejected(t *testing.T) {
|
||||
rdb := newRedisTestClient(t)
|
||||
c := NewEmptyClaimCache(rdb)
|
||||
ctx := context.Background()
|
||||
|
||||
// Slow claim samples version BEFORE select.
|
||||
v0 := c.CurrentVersion(ctx, "rt-race")
|
||||
|
||||
// Concurrent enqueue happens.
|
||||
c.Bump(ctx, "rt-race")
|
||||
|
||||
// Slow claim writes its empty verdict tagged with the stale v0.
|
||||
c.MarkEmpty(ctx, "rt-race", v0)
|
||||
|
||||
if c.IsEmpty(ctx, "rt-race") {
|
||||
t.Fatal("MarkEmpty written under a pre-Bump version must be rejected on read")
|
||||
}
|
||||
}
|
||||
|
||||
func TestEmptyClaimCache_TTL(t *testing.T) {
|
||||
rdb := newRedisTestClient(t)
|
||||
c := NewEmptyClaimCache(rdb)
|
||||
ctx := context.Background()
|
||||
|
||||
c.MarkEmpty(ctx, "rt-ttl", 0)
|
||||
ttl, err := rdb.TTL(ctx, emptyClaimKey("rt-ttl")).Result()
|
||||
if err != nil {
|
||||
t.Fatalf("TTL: %v", err)
|
||||
}
|
||||
if ttl <= 0 || ttl > EmptyClaimCacheTTL+time.Second {
|
||||
t.Fatalf("unexpected empty-key TTL %v (want ~%v)", ttl, EmptyClaimCacheTTL)
|
||||
}
|
||||
}
|
||||
|
||||
func TestEmptyClaimCache_RuntimeIsolation(t *testing.T) {
|
||||
rdb := newRedisTestClient(t)
|
||||
c := NewEmptyClaimCache(rdb)
|
||||
ctx := context.Background()
|
||||
|
||||
vA := c.CurrentVersion(ctx, "rt-A")
|
||||
c.MarkEmpty(ctx, "rt-A", vA)
|
||||
if c.IsEmpty(ctx, "rt-B") {
|
||||
t.Fatal("marking rt-A must not affect rt-B")
|
||||
}
|
||||
c.Bump(ctx, "rt-A")
|
||||
vB := c.CurrentVersion(ctx, "rt-B")
|
||||
c.MarkEmpty(ctx, "rt-B", vB)
|
||||
if c.IsEmpty(ctx, "rt-A") {
|
||||
t.Fatal("marking rt-B must not affect rt-A")
|
||||
}
|
||||
}
|
||||
@@ -28,6 +28,12 @@ type TaskService struct {
|
||||
Hub *realtime.Hub
|
||||
Bus *events.Bus
|
||||
Wakeup TaskWakeupNotifier
|
||||
// EmptyClaim caches "this runtime has no queued task" so the daemon
|
||||
// poll path can skip a Postgres scan on the steady-state empty case.
|
||||
// Optional — a nil cache disables the fast path and every claim
|
||||
// goes through the DB. Wired in router.go from the shared Redis
|
||||
// client.
|
||||
EmptyClaim *EmptyClaimCache
|
||||
}
|
||||
|
||||
type TaskWakeupNotifier interface {
|
||||
@@ -451,6 +457,12 @@ func (s *TaskService) ClaimTask(ctx context.Context, agentID pgtype.UUID) (*db.A
|
||||
|
||||
// ClaimTaskForRuntime claims the next runnable task for a runtime while
|
||||
// still respecting each agent's max_concurrent_tasks limit.
|
||||
//
|
||||
// Empty-claim fast path: when EmptyClaim is configured and a recent
|
||||
// check verified the runtime had no queued tasks, returns immediately
|
||||
// without touching Postgres. The cache is invalidated synchronously on
|
||||
// every enqueue (notifyTaskAvailable), so a queued task becomes
|
||||
// claimable on the next call rather than waiting for the TTL.
|
||||
func (s *TaskService) ClaimTaskForRuntime(ctx context.Context, runtimeID pgtype.UUID) (*db.AgentTaskQueue, error) {
|
||||
start := time.Now()
|
||||
var (
|
||||
@@ -476,13 +488,33 @@ func (s *TaskService) ClaimTaskForRuntime(ctx context.Context, runtimeID pgtype.
|
||||
)
|
||||
}()
|
||||
|
||||
t0 := start
|
||||
tasks, err := s.Queries.ListPendingTasksByRuntime(ctx, runtimeID)
|
||||
runtimeKey := util.UUIDToString(runtimeID)
|
||||
if s.EmptyClaim.IsEmpty(ctx, runtimeKey) {
|
||||
outcome = "empty_cache_hit"
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
// Sample the invalidation version BEFORE the SELECT. If a
|
||||
// concurrent enqueue Bumps between this read and the post-SELECT
|
||||
// MarkEmpty, the next IsEmpty will see the empty key tagged with
|
||||
// a stale version and reject it — closing the race that would
|
||||
// otherwise stall the just-queued task until the empty key's TTL
|
||||
// expired.
|
||||
preSelectVersion := s.EmptyClaim.CurrentVersion(ctx, runtimeKey)
|
||||
|
||||
t0 := time.Now()
|
||||
tasks, err := s.Queries.ListQueuedClaimCandidatesByRuntime(ctx, runtimeID)
|
||||
listMs = time.Since(t0).Milliseconds()
|
||||
listCount = len(tasks)
|
||||
if err != nil {
|
||||
outcome = "error_list"
|
||||
return nil, fmt.Errorf("list pending tasks: %w", err)
|
||||
return nil, fmt.Errorf("list queued claim candidates: %w", err)
|
||||
}
|
||||
|
||||
if len(tasks) == 0 {
|
||||
s.EmptyClaim.MarkEmpty(ctx, runtimeKey, preSelectVersion)
|
||||
outcome = "empty_db"
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
loopStart := time.Now()
|
||||
@@ -1163,11 +1195,38 @@ func priorityToInt(p string) int32 {
|
||||
}
|
||||
}
|
||||
|
||||
// NotifyTaskEnqueued is the cross-package shim for callers outside
|
||||
// TaskService (e.g. AutopilotService.dispatchRunOnly) that insert a
|
||||
// row into agent_task_queue directly. Invalidates the empty-claim
|
||||
// cache and kicks the daemon WS so the new task is claimed without
|
||||
// waiting for the next poll.
|
||||
func (s *TaskService) NotifyTaskEnqueued(task db.AgentTaskQueue) {
|
||||
s.notifyTaskAvailable(task)
|
||||
}
|
||||
|
||||
// notifyTaskAvailable runs after a task has been inserted: bumps the
|
||||
// runtime's invalidation version so any in-flight claim that is about
|
||||
// to write an "empty" verdict will have it rejected on read, then
|
||||
// kicks the daemon WS so the daemon claims without waiting for its
|
||||
// next poll. Order matters — Bump must happen before the wakeup,
|
||||
// otherwise the wakeup-driven claim could read the still-current
|
||||
// empty verdict and return null.
|
||||
func (s *TaskService) notifyTaskAvailable(task db.AgentTaskQueue) {
|
||||
if s.Wakeup == nil || !task.RuntimeID.Valid {
|
||||
if !task.RuntimeID.Valid {
|
||||
return
|
||||
}
|
||||
s.Wakeup.NotifyTaskAvailable(util.UUIDToString(task.RuntimeID), util.UUIDToString(task.ID))
|
||||
runtimeKey := util.UUIDToString(task.RuntimeID)
|
||||
// Use a background context: the cache bump / wakeup must outlive
|
||||
// the request that created the task, otherwise an early client
|
||||
// disconnect could leave the empty verdict in place and stall the
|
||||
// just-queued task until the TTL expires. The cache itself bounds
|
||||
// every Redis call with a short timeout so a wedged Redis cannot
|
||||
// block enqueue.
|
||||
s.EmptyClaim.Bump(context.Background(), runtimeKey)
|
||||
if s.Wakeup == nil {
|
||||
return
|
||||
}
|
||||
s.Wakeup.NotifyTaskAvailable(runtimeKey, util.UUIDToString(task.ID))
|
||||
}
|
||||
|
||||
func (s *TaskService) broadcastTaskDispatch(ctx context.Context, task db.AgentTaskQueue) {
|
||||
|
||||
103
server/internal/service/task_notify_test.go
Normal file
103
server/internal/service/task_notify_test.go
Normal file
@@ -0,0 +1,103 @@
|
||||
package service
|
||||
|
||||
import (
|
||||
"context"
|
||||
"testing"
|
||||
|
||||
"github.com/multica-ai/multica/server/internal/util"
|
||||
db "github.com/multica-ai/multica/server/pkg/db/generated"
|
||||
)
|
||||
|
||||
// stubWakeup records every call so the test can assert that notify
|
||||
// reaches the daemon hub and carries the right runtime / task IDs.
|
||||
type stubWakeup struct {
|
||||
calls []struct{ runtimeID, taskID string }
|
||||
}
|
||||
|
||||
func (s *stubWakeup) NotifyTaskAvailable(runtimeID, taskID string) {
|
||||
s.calls = append(s.calls, struct{ runtimeID, taskID string }{runtimeID, taskID})
|
||||
}
|
||||
|
||||
// TestNotifyTaskAvailable_BumpsBeforeWakeup pins the contract noted in
|
||||
// the EmptyClaimCache docs: the version Bump MUST run before the
|
||||
// daemon WS wakeup, otherwise the wakeup-driven claim could read a
|
||||
// still-current empty verdict and return null while the freshly
|
||||
// queued task sits idle. The test (1) marks the runtime empty under
|
||||
// the current version, (2) fires notifyTaskAvailable, then (3)
|
||||
// asserts the prior verdict is rejected AND the wakeup hook saw the
|
||||
// new task — proving every enqueue path (issue / mention /
|
||||
// quick-create / chat / autopilot / retry) gets the same
|
||||
// bump-then-notify behaviour for free.
|
||||
func TestNotifyTaskAvailable_BumpsBeforeWakeup(t *testing.T) {
|
||||
rdb := newRedisTestClient(t)
|
||||
cache := NewEmptyClaimCache(rdb)
|
||||
wakeup := &stubWakeup{}
|
||||
|
||||
svc := &TaskService{
|
||||
EmptyClaim: cache,
|
||||
Wakeup: wakeup,
|
||||
}
|
||||
|
||||
runtimeID := testUUID(7)
|
||||
taskID := testUUID(8)
|
||||
runtimeKey := util.UUIDToString(runtimeID)
|
||||
|
||||
ctx := context.Background()
|
||||
v0 := cache.CurrentVersion(ctx, runtimeKey)
|
||||
cache.MarkEmpty(ctx, runtimeKey, v0)
|
||||
if !cache.IsEmpty(ctx, runtimeKey) {
|
||||
t.Fatal("precondition: cache should report empty after MarkEmpty under current version")
|
||||
}
|
||||
|
||||
svc.notifyTaskAvailable(db.AgentTaskQueue{
|
||||
ID: taskID,
|
||||
RuntimeID: runtimeID,
|
||||
})
|
||||
|
||||
if cache.IsEmpty(ctx, runtimeKey) {
|
||||
t.Fatal("notifyTaskAvailable must Bump the version so the prior empty verdict is rejected")
|
||||
}
|
||||
if got := len(wakeup.calls); got != 1 {
|
||||
t.Fatalf("expected 1 wakeup call, got %d", got)
|
||||
}
|
||||
if wakeup.calls[0].runtimeID != runtimeKey {
|
||||
t.Fatalf("wakeup runtime mismatch: got %q want %q", wakeup.calls[0].runtimeID, runtimeKey)
|
||||
}
|
||||
if wakeup.calls[0].taskID != util.UUIDToString(taskID) {
|
||||
t.Fatalf("wakeup task mismatch: got %q want %q", wakeup.calls[0].taskID, util.UUIDToString(taskID))
|
||||
}
|
||||
}
|
||||
|
||||
// TestNotifyTaskAvailable_InvalidWithoutRuntimeIsNoOp guards the
|
||||
// no-RuntimeID early return — chat / quick-create / autopilot all set
|
||||
// it on insert, but a buggy caller that forgot must not silently bump
|
||||
// every workspace's version. The cache treats Bump("") as a no-op,
|
||||
// but this test pins that the RuntimeID guard sits above the Bump
|
||||
// call so a future refactor cannot drop the guard without test
|
||||
// coverage.
|
||||
func TestNotifyTaskAvailable_InvalidWithoutRuntimeIsNoOp(t *testing.T) {
|
||||
rdb := newRedisTestClient(t)
|
||||
cache := NewEmptyClaimCache(rdb)
|
||||
wakeup := &stubWakeup{}
|
||||
|
||||
svc := &TaskService{
|
||||
EmptyClaim: cache,
|
||||
Wakeup: wakeup,
|
||||
}
|
||||
|
||||
ctx := context.Background()
|
||||
v0 := cache.CurrentVersion(ctx, "rt-stays")
|
||||
cache.MarkEmpty(ctx, "rt-stays", v0)
|
||||
|
||||
svc.notifyTaskAvailable(db.AgentTaskQueue{
|
||||
// RuntimeID intentionally invalid (zero value, Valid=false).
|
||||
ID: testUUID(9),
|
||||
})
|
||||
|
||||
if !cache.IsEmpty(ctx, "rt-stays") {
|
||||
t.Fatal("notifyTaskAvailable with invalid RuntimeID must not touch cache")
|
||||
}
|
||||
if got := len(wakeup.calls); got != 0 {
|
||||
t.Fatalf("expected 0 wakeup calls when RuntimeID is invalid, got %d", got)
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1 @@
|
||||
DROP INDEX CONCURRENTLY IF EXISTS idx_agent_task_queue_claim_candidates;
|
||||
@@ -0,0 +1,11 @@
|
||||
-- Partial index that backs ListQueuedClaimCandidatesByRuntime. Daemons poll
|
||||
-- /tasks/claim every 30s per runtime; the filter "runtime_id = $1 AND
|
||||
-- status = 'queued'" runs every poll and is the dominant cost on warm paths.
|
||||
-- Restricting to status = 'queued' keeps the index tiny — terminal-state
|
||||
-- rows (completed/failed/cancelled) accumulate forever in the table but are
|
||||
-- excluded from the index, so it stays bounded by current queue depth.
|
||||
-- ORDER BY priority DESC, created_at ASC mirrors the SELECT so the planner
|
||||
-- can serve the query as an index-only scan without an extra sort.
|
||||
CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_agent_task_queue_claim_candidates
|
||||
ON agent_task_queue (runtime_id, priority DESC, created_at ASC)
|
||||
WHERE status = 'queued';
|
||||
@@ -1448,6 +1448,65 @@ func (q *Queries) ListPendingTasksByRuntime(ctx context.Context, runtimeID pgtyp
|
||||
return items, nil
|
||||
}
|
||||
|
||||
const listQueuedClaimCandidatesByRuntime = `-- name: ListQueuedClaimCandidatesByRuntime :many
|
||||
SELECT id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, last_heartbeat_at, trigger_summary FROM agent_task_queue
|
||||
WHERE runtime_id = $1 AND status = 'queued'
|
||||
ORDER BY priority DESC, created_at ASC
|
||||
`
|
||||
|
||||
// Returns rows the runtime can attempt to claim. Status is restricted to
|
||||
// 'queued' (in contrast to ListPendingTasksByRuntime which also includes
|
||||
// 'dispatched') because dispatched rows are by definition already owned
|
||||
// and cannot be re-claimed — including them in the candidate list pads
|
||||
// the result with rows that always lose the per-(issue, agent) race in
|
||||
// ClaimAgentTask, wasting CPU and a SELECT every poll cycle when the
|
||||
// runtime is busy on a long-running task. Backed by the partial index
|
||||
// idx_agent_task_queue_claim_candidates so the warm path is cheap.
|
||||
func (q *Queries) ListQueuedClaimCandidatesByRuntime(ctx context.Context, runtimeID pgtype.UUID) ([]AgentTaskQueue, error) {
|
||||
rows, err := q.db.Query(ctx, listQueuedClaimCandidatesByRuntime, runtimeID)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer rows.Close()
|
||||
items := []AgentTaskQueue{}
|
||||
for rows.Next() {
|
||||
var i AgentTaskQueue
|
||||
if err := rows.Scan(
|
||||
&i.ID,
|
||||
&i.AgentID,
|
||||
&i.IssueID,
|
||||
&i.Status,
|
||||
&i.Priority,
|
||||
&i.DispatchedAt,
|
||||
&i.StartedAt,
|
||||
&i.CompletedAt,
|
||||
&i.Result,
|
||||
&i.Error,
|
||||
&i.CreatedAt,
|
||||
&i.Context,
|
||||
&i.RuntimeID,
|
||||
&i.SessionID,
|
||||
&i.WorkDir,
|
||||
&i.TriggerCommentID,
|
||||
&i.ChatSessionID,
|
||||
&i.AutopilotRunID,
|
||||
&i.Attempt,
|
||||
&i.MaxAttempts,
|
||||
&i.ParentTaskID,
|
||||
&i.FailureReason,
|
||||
&i.LastHeartbeatAt,
|
||||
&i.TriggerSummary,
|
||||
); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
items = append(items, i)
|
||||
}
|
||||
if err := rows.Err(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return items, nil
|
||||
}
|
||||
|
||||
const listTasksByIssue = `-- name: ListTasksByIssue :many
|
||||
SELECT id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, last_heartbeat_at, trigger_summary, force_fresh_session FROM agent_task_queue
|
||||
WHERE issue_id = $1
|
||||
|
||||
@@ -323,6 +323,19 @@ SELECT * FROM agent_task_queue
|
||||
WHERE runtime_id = $1 AND status IN ('queued', 'dispatched')
|
||||
ORDER BY priority DESC, created_at ASC;
|
||||
|
||||
-- name: ListQueuedClaimCandidatesByRuntime :many
|
||||
-- Returns rows the runtime can attempt to claim. Status is restricted to
|
||||
-- 'queued' (in contrast to ListPendingTasksByRuntime which also includes
|
||||
-- 'dispatched') because dispatched rows are by definition already owned
|
||||
-- and cannot be re-claimed — including them in the candidate list pads
|
||||
-- the result with rows that always lose the per-(issue, agent) race in
|
||||
-- ClaimAgentTask, wasting CPU and a SELECT every poll cycle when the
|
||||
-- runtime is busy on a long-running task. Backed by the partial index
|
||||
-- idx_agent_task_queue_claim_candidates so the warm path is cheap.
|
||||
SELECT * FROM agent_task_queue
|
||||
WHERE runtime_id = $1 AND status = 'queued'
|
||||
ORDER BY priority DESC, created_at ASC;
|
||||
|
||||
-- name: ListActiveTasksByIssue :many
|
||||
SELECT * FROM agent_task_queue
|
||||
WHERE issue_id = $1 AND status IN ('dispatched', 'running')
|
||||
|
||||
Reference in New Issue
Block a user