mirror of
https://github.com/multica-ai/multica.git
synced 2026-07-05 13:29:44 +02:00
* feat(daemon): Redis empty-claim fast path for /tasks/claim polling Daemons poll /tasks/claim every 30s per runtime; the steady-state warm-empty case currently runs ListPendingTasksByRuntime against Postgres on every poll. This collapses that path: - New ListQueuedClaimCandidatesByRuntime query restricts to status = 'queued' (the old query also returned 'dispatched' rows that can never be reclaimed) and is backed by a partial index keyed on (runtime_id, priority DESC, created_at ASC). - New EmptyClaimCache caches the negative verdict in Redis with a 30s TTL. ClaimTaskForRuntime checks the cache before SELECT and populates it on confirmed-empty results. - notifyTaskAvailable now invalidates the runtime's empty key before kicking the daemon WS, so newly enqueued tasks become claimable immediately rather than waiting out the TTL. - AutopilotService.dispatchRunOnly now goes through TaskService.NotifyTaskEnqueued so run_only tasks get the same invalidate-then-wakeup contract as every other enqueue path. Co-authored-by: multica-agent <github@multica.ai> * fix(daemon): close MarkEmpty/Bump race in empty-claim fast path GPT-Boy's review on PR #1860 caught a real concurrency bug. Under the prior implementation it was possible for a slow claim to write an empty verdict AFTER a concurrent enqueue had already invalidated it: T1 claim: SELECT -> empty T2 enqueue: INSERT row, DEL empty key (no-op, key not set yet), wakeup T1 claim: SET empty (writes a stale "empty" verdict) T3 wakeup: IsEmpty -> hit -> returns null The just-queued task would then sit idle until the empty key's TTL expired (up to 30s). Replace the DEL-based invalidation with a per-runtime version counter: - CurrentVersion(rt) is a Redis INCR counter at mul:claim:runtime:version:<rt> with a 24h sliding TTL. - Claim samples version BEFORE the SELECT and passes it to MarkEmpty, which stores the verdict's value as the observed-version string. 
- IsEmpty MGETs both keys and trusts the verdict only when the empty-key value equals the current version. - Enqueue Bumps the version (INCR + EXPIRE) before the wakeup, causing any verdict written under a prior version to be rejected on the next read. Also bound every Redis call from this cache with a 250ms timeout — notifyTaskAvailable uses a background context so a wedged Redis must not block enqueue. Tests against a real Redis (REDIS_TEST_URL) cover: - MarkEmpty + IsEmpty under matching version returns hit - Bump invalidates a prior empty verdict (race-fix pin) - A MarkEmpty written under a stale pre-Bump version is rejected - TTL clamping, per-runtime isolation, nil-cache safety - notifyTaskAvailable Bumps before the wakeup fires Co-authored-by: multica-agent <github@multica.ai> * chore(daemon): renumber claim-candidate index migration to 067 Slot 064 was taken on main by 064_notification_preference. The migration runner tracks per-version in schema_migrations and would silently skip the second 064_*, leaving the index uncreated. Rename to 067 (next free slot). Co-authored-by: multica-agent <github@multica.ai> --------- Co-authored-by: multica-agent <github@multica.ai>
104 lines
3.4 KiB
Go
104 lines
3.4 KiB
Go
package service
|
|
|
|
import (
|
|
"context"
|
|
"testing"
|
|
|
|
"github.com/multica-ai/multica/server/internal/util"
|
|
db "github.com/multica-ai/multica/server/pkg/db/generated"
|
|
)
|
|
|
|
// stubWakeup is a test double that is plugged into TaskService.Wakeup
// by the tests in this file. Each NotifyTaskAvailable invocation is
// appended to calls, so a test can check how many wakeups were sent
// and which runtime / task IDs they carried.
type stubWakeup struct {
	// calls holds one entry per NotifyTaskAvailable invocation, in
	// the order the invocations happened.
	calls []struct {
		runtimeID string
		taskID    string
	}
}
|
|
|
|
func (s *stubWakeup) NotifyTaskAvailable(runtimeID, taskID string) {
|
|
s.calls = append(s.calls, struct{ runtimeID, taskID string }{runtimeID, taskID})
|
|
}
|
|
|
|
// TestNotifyTaskAvailable_BumpsBeforeWakeup pins the contract noted in
|
|
// the EmptyClaimCache docs: the version Bump MUST run before the
|
|
// daemon WS wakeup, otherwise the wakeup-driven claim could read a
|
|
// still-current empty verdict and return null while the freshly
|
|
// queued task sits idle. The test (1) marks the runtime empty under
|
|
// the current version, (2) fires notifyTaskAvailable, then (3)
|
|
// asserts the prior verdict is rejected AND the wakeup hook saw the
|
|
// new task — proving every enqueue path (issue / mention /
|
|
// quick-create / chat / autopilot / retry) gets the same
|
|
// bump-then-notify behaviour for free.
|
|
func TestNotifyTaskAvailable_BumpsBeforeWakeup(t *testing.T) {
|
|
rdb := newRedisTestClient(t)
|
|
cache := NewEmptyClaimCache(rdb)
|
|
wakeup := &stubWakeup{}
|
|
|
|
svc := &TaskService{
|
|
EmptyClaim: cache,
|
|
Wakeup: wakeup,
|
|
}
|
|
|
|
runtimeID := testUUID(7)
|
|
taskID := testUUID(8)
|
|
runtimeKey := util.UUIDToString(runtimeID)
|
|
|
|
ctx := context.Background()
|
|
v0 := cache.CurrentVersion(ctx, runtimeKey)
|
|
cache.MarkEmpty(ctx, runtimeKey, v0)
|
|
if !cache.IsEmpty(ctx, runtimeKey) {
|
|
t.Fatal("precondition: cache should report empty after MarkEmpty under current version")
|
|
}
|
|
|
|
svc.notifyTaskAvailable(db.AgentTaskQueue{
|
|
ID: taskID,
|
|
RuntimeID: runtimeID,
|
|
})
|
|
|
|
if cache.IsEmpty(ctx, runtimeKey) {
|
|
t.Fatal("notifyTaskAvailable must Bump the version so the prior empty verdict is rejected")
|
|
}
|
|
if got := len(wakeup.calls); got != 1 {
|
|
t.Fatalf("expected 1 wakeup call, got %d", got)
|
|
}
|
|
if wakeup.calls[0].runtimeID != runtimeKey {
|
|
t.Fatalf("wakeup runtime mismatch: got %q want %q", wakeup.calls[0].runtimeID, runtimeKey)
|
|
}
|
|
if wakeup.calls[0].taskID != util.UUIDToString(taskID) {
|
|
t.Fatalf("wakeup task mismatch: got %q want %q", wakeup.calls[0].taskID, util.UUIDToString(taskID))
|
|
}
|
|
}
|
|
|
|
// TestNotifyTaskAvailable_InvalidWithoutRuntimeIsNoOp guards the
|
|
// no-RuntimeID early return — chat / quick-create / autopilot all set
|
|
// it on insert, but a buggy caller that forgot must not silently bump
|
|
// every workspace's version. The cache treats Bump("") as a no-op,
|
|
// but this test pins that the RuntimeID guard sits above the Bump
|
|
// call so a future refactor cannot drop the guard without test
|
|
// coverage.
|
|
func TestNotifyTaskAvailable_InvalidWithoutRuntimeIsNoOp(t *testing.T) {
|
|
rdb := newRedisTestClient(t)
|
|
cache := NewEmptyClaimCache(rdb)
|
|
wakeup := &stubWakeup{}
|
|
|
|
svc := &TaskService{
|
|
EmptyClaim: cache,
|
|
Wakeup: wakeup,
|
|
}
|
|
|
|
ctx := context.Background()
|
|
v0 := cache.CurrentVersion(ctx, "rt-stays")
|
|
cache.MarkEmpty(ctx, "rt-stays", v0)
|
|
|
|
svc.notifyTaskAvailable(db.AgentTaskQueue{
|
|
// RuntimeID intentionally invalid (zero value, Valid=false).
|
|
ID: testUUID(9),
|
|
})
|
|
|
|
if !cache.IsEmpty(ctx, "rt-stays") {
|
|
t.Fatal("notifyTaskAvailable with invalid RuntimeID must not touch cache")
|
|
}
|
|
if got := len(wakeup.calls); got != 0 {
|
|
t.Fatalf("expected 0 wakeup calls when RuntimeID is invalid, got %d", got)
|
|
}
|
|
}
|