Files
multica/server/internal/handler/runtime_models_redis_store_test.go
Bohan Jiang ba5b7db78e fix(server): persist ModelListStore across replicas via Redis (#2022)
* fix(server): persist ModelListStore across replicas via Redis

The model picker uses a pending-request pattern: the frontend POSTs to
create a request, the daemon pops it on its next heartbeat, runs
agent.ListModels locally, and reports back. Until now the store was a
plain in-memory map per Handler instance.

That works for self-hosted single-instance deploys but fails in any
multi-replica environment (Multica Cloud). Each replica has its own
map, so:

  POST /runtimes/:id/models               → request stored in replica A
  GET  /runtimes/:id/models/<requestId>   → polls land on B/C → 404
  daemon heartbeat                        → only A sees PendingModelList
  POST .../<requestId>/result             → daemon's report has to land on A

Success probability ~1/N². The visible symptom is "No models available"
in the picker for every provider, even those (Claude/Codex) whose
catalog is statically populated end-to-end.

Same shape of bug, same Redis-backed fix as multica-ai/multica#1557 did
for LocalSkillListStore / LocalSkillImportStore. Reuse the operational
playbook (namespaced keys, ZSET-backed pending queue, atomic
ZREM+SET-running via the shared Lua script) so we don't introduce a
second concurrency model for the same primitive.

Changes:
- Convert ModelListStore from struct to interface with context-aware
  methods. Add HasPending for cheap heartbeat-side probing.
- InMemoryModelListStore — single-node fallback, used when REDIS_URL
  is unset (self-hosted dev / tests).
- RedisModelListStore — multi-node implementation using the same key
  layout and Lua atomic claim as RedisLocalSkillListStore.
- Use RunStartedAt (not UpdatedAt) as the running-timeout reference
  point, matching the local-skill stores so subsequent UpdatedAt
  bumps don't reset the running clock.
- Heartbeat now uses the probe-then-pop pattern for the model queue
  (matching local-skills) so a slow Redis can't stall every connected
  daemon. Extends heartbeatMetrics + slow-log with probe_model_ms /
  pop_model_ms / probe_model_timed_out for parity.
- Wire the Redis backend in NewRouterWithOptions when rdb != nil.
- Tests for both backends. Redis tests gate on REDIS_TEST_URL so
  laptop runs without Redis still pass; CI provides it.

Co-authored-by: multica-agent <github@multica.ai>

* fix(server): persist RunStartedAt + retry model report on transient failures

Two follow-ups from PR #2022 review:

1. RedisModelListStore was dropping ModelListRequest.RunStartedAt on
   persistence — the field is tagged json:"-" so it doesn't leak into
   the HTTP response, which made plain json.Marshal(req) silently
   discard it. Across-node readers saw RunStartedAt=nil and
   applyModelListTimeout's running branch became a no-op, so the 60s
   running-timeout escape hatch never fired. CI's
   TestRedisModelListStore_RunningTimeout was failing on this exact
   case. Fix mirrors RedisLocalSkillImportStore's envelope pattern —
   wrap in an internal struct that re-promotes the field. HTTP shape
   stays clean. Adds a no-Redis unit test that pins the round trip.

2. Daemon's handleModelList called d.client.ReportModelListResult
   directly and swallowed any 5xx, leaving the pending request
   stranded in "running" until its 60s server-side timeout — exactly
   the failure mode the multi-node store fix was meant to eliminate.
   Generalize the existing local-skill retry helper into
   reportRuntimeResultWithRetry (kind: model_list / local_skill_list /
   local_skill_import) and wire handleModelList through a new
   reportModelListResult helper. Renames the test-overridable
   var localSkillReportBackoffs → runtimeReportBackoffs to match.

Co-authored-by: multica-agent <github@multica.ai>

---------

Co-authored-by: multica-agent <github@multica.ai>
2026-05-03 11:13:34 +08:00

301 lines
8.8 KiB
Go

package handler
import (
"context"
"sync"
"testing"
"time"
)
// Reuses the newRedisTestClient helper from
// runtime_local_skills_redis_store_test.go: same Redis instance, same gating
// on REDIS_TEST_URL, same FlushDB-per-test isolation.
// TestRedisModelListStore_EnvelopePersistsRunStartedAt is a pure marshal/
// unmarshal round-trip — no Redis required. Pins the regression that the
// `json:"-"` tag on ModelListRequest.RunStartedAt was silently dropping the
// field on persistence, which broke the running-timeout escape hatch
// across nodes (CI failure for TestRedisModelListStore_RunningTimeout
// before this fix).
func TestRedisModelListStore_EnvelopePersistsRunStartedAt(t *testing.T) {
store := &RedisModelListStore{}
now := time.Now().UTC().Truncate(time.Microsecond) // JSON loses sub-µs precision
req := &ModelListRequest{
ID: "id-1",
RuntimeID: "rt-1",
Status: ModelListRunning,
Supported: true,
CreatedAt: now.Add(-time.Second),
UpdatedAt: now,
RunStartedAt: &now,
}
data, err := store.marshalRequest(req)
if err != nil {
t.Fatalf("marshal: %v", err)
}
got, err := store.unmarshalRequest(data)
if err != nil {
t.Fatalf("unmarshal: %v", err)
}
if got.RunStartedAt == nil {
t.Fatal("RunStartedAt lost on round trip — running timeout would never fire across nodes")
}
if !got.RunStartedAt.Equal(now) {
t.Errorf("RunStartedAt drifted: got %s, want %s", got.RunStartedAt, now)
}
if got.Status != ModelListRunning {
t.Errorf("Status lost: got %s", got.Status)
}
if got.ID != "id-1" || got.RuntimeID != "rt-1" {
t.Errorf("identifiers lost: %+v", got)
}
}
func TestRedisModelListStore_CreateGetComplete(t *testing.T) {
rdb := newRedisTestClient(t)
ctx := context.Background()
store := NewRedisModelListStore(rdb)
req, err := store.Create(ctx, "runtime-1")
if err != nil {
t.Fatalf("create: %v", err)
}
if req.Status != ModelListPending {
t.Fatalf("initial status = %s", req.Status)
}
got, err := store.Get(ctx, req.ID)
if err != nil {
t.Fatalf("get: %v", err)
}
if got == nil || got.ID != req.ID {
t.Fatalf("round trip lost id: got=%v", got)
}
models := []ModelEntry{
{ID: "claude-sonnet-4-6", Label: "Claude Sonnet 4.6", Provider: "anthropic", Default: true},
{ID: "claude-opus-4-7", Label: "Claude Opus 4.7", Provider: "anthropic"},
}
if err := store.Complete(ctx, req.ID, models, true); err != nil {
t.Fatalf("complete: %v", err)
}
got, err = store.Get(ctx, req.ID)
if err != nil {
t.Fatalf("get after complete: %v", err)
}
if got.Status != ModelListCompleted {
t.Fatalf("status after complete = %s", got.Status)
}
if len(got.Models) != 2 {
t.Fatalf("models not persisted: %+v", got.Models)
}
if !got.Models[0].Default {
t.Fatalf("default flag lost on round trip: %+v", got.Models[0])
}
if !got.Supported {
t.Fatalf("supported flag lost on round trip")
}
}
// TestRedisModelListStore_PopPendingAcrossInstances is the regression test
// for the exact bug this PR fixes: two API replicas share one Redis, one
// receives the POST that creates the request, the other receives the daemon
// heartbeat that PopPending-s it. Before this change the in-memory store made
// node B see nothing, the request timed out, and the picker showed
// "No models available" forever.
func TestRedisModelListStore_PopPendingAcrossInstances(t *testing.T) {
rdb := newRedisTestClient(t)
ctx := context.Background()
nodeA := NewRedisModelListStore(rdb)
nodeB := NewRedisModelListStore(rdb)
req, err := nodeA.Create(ctx, "runtime-cross")
if err != nil {
t.Fatalf("node A create: %v", err)
}
popped, err := nodeB.PopPending(ctx, "runtime-cross")
if err != nil {
t.Fatalf("node B pop: %v", err)
}
if popped == nil {
t.Fatal("node B did not see node A's pending request")
}
if popped.ID != req.ID {
t.Fatalf("popped id = %s, want %s", popped.ID, req.ID)
}
if popped.Status != ModelListRunning {
t.Fatalf("popped status = %s, want running", popped.Status)
}
if popped.RunStartedAt == nil {
t.Fatal("run_started_at not set after pop")
}
// A third pop must see nothing (claim was atomic).
again, err := nodeB.PopPending(ctx, "runtime-cross")
if err != nil {
t.Fatalf("node B second pop: %v", err)
}
if again != nil {
t.Fatalf("expected no more pending, got %+v", again)
}
}
// TestRedisModelListStore_PopPendingConcurrent asserts the ZREM-wins race
// guard: N concurrent PopPending calls against a single pending request
// return exactly one winner.
func TestRedisModelListStore_PopPendingConcurrent(t *testing.T) {
rdb := newRedisTestClient(t)
ctx := context.Background()
store := NewRedisModelListStore(rdb)
req, err := store.Create(ctx, "runtime-race")
if err != nil {
t.Fatalf("create: %v", err)
}
const N = 8
var wg sync.WaitGroup
results := make(chan *ModelListRequest, N)
errs := make(chan error, N)
for i := 0; i < N; i++ {
wg.Add(1)
go func() {
defer wg.Done()
popped, err := store.PopPending(ctx, "runtime-race")
if err != nil {
errs <- err
return
}
results <- popped
}()
}
wg.Wait()
close(results)
close(errs)
for err := range errs {
t.Fatalf("concurrent pop error: %v", err)
}
winners := 0
for popped := range results {
if popped != nil {
winners++
if popped.ID != req.ID {
t.Fatalf("winner popped wrong id: %s", popped.ID)
}
}
}
if winners != 1 {
t.Fatalf("expected exactly one winner, got %d", winners)
}
}
// TestRedisModelListStore_PendingTimeout pins the lazy timeout sweep — a
// pending request whose CreatedAt has aged past the 30s threshold MUST
// transition to Timeout on the next Get and be evicted from the pending
// zset so a subsequent PopPending doesn't re-claim it.
func TestRedisModelListStore_PendingTimeout(t *testing.T) {
rdb := newRedisTestClient(t)
ctx := context.Background()
store := NewRedisModelListStore(rdb)
req, err := store.Create(ctx, "runtime-timeout")
if err != nil {
t.Fatalf("create: %v", err)
}
// Rewind CreatedAt so the pending threshold is blown — simulates 31s of
// daemon silence without actually blocking the test that long.
req.CreatedAt = time.Now().Add(-modelListPendingTimeout - time.Second)
if err := store.persistRequest(ctx, req); err != nil {
t.Fatalf("persist rewound: %v", err)
}
got, err := store.Get(ctx, req.ID)
if err != nil {
t.Fatalf("get: %v", err)
}
if got.Status != ModelListTimeout {
t.Fatalf("status = %s, want timeout", got.Status)
}
// A subsequent PopPending must NOT return a timed-out request.
popped, err := store.PopPending(ctx, "runtime-timeout")
if err != nil {
t.Fatalf("pop after timeout: %v", err)
}
if popped != nil {
t.Fatalf("expected no pending after timeout, got %+v", popped)
}
}
// TestRedisModelListStore_RunningTimeout pins the second escape hatch — a
// claimed request whose RunStartedAt has aged past the 60s threshold MUST
// flip to Timeout so the UI's polling loop terminates instead of waiting
// for the retention sweep.
func TestRedisModelListStore_RunningTimeout(t *testing.T) {
rdb := newRedisTestClient(t)
ctx := context.Background()
store := NewRedisModelListStore(rdb)
req, err := store.Create(ctx, "runtime-running-timeout")
if err != nil {
t.Fatalf("create: %v", err)
}
popped, err := store.PopPending(ctx, "runtime-running-timeout")
if err != nil {
t.Fatalf("pop: %v", err)
}
if popped == nil || popped.Status != ModelListRunning {
t.Fatalf("expected running, got %+v", popped)
}
// Rewind RunStartedAt past the running threshold.
aged := time.Now().Add(-modelListRunningTimeout - time.Second)
popped.RunStartedAt = &aged
if err := store.persistRequest(ctx, popped); err != nil {
t.Fatalf("persist rewound: %v", err)
}
got, err := store.Get(ctx, req.ID)
if err != nil {
t.Fatalf("get: %v", err)
}
if got.Status != ModelListTimeout {
t.Fatalf("status = %s, want timeout", got.Status)
}
}
// TestRedisModelListStore_HasPending pins the cheap probe used by the
// heartbeat hot path so a slow Redis can't stall every connected daemon.
func TestRedisModelListStore_HasPending(t *testing.T) {
rdb := newRedisTestClient(t)
ctx := context.Background()
store := NewRedisModelListStore(rdb)
if has, err := store.HasPending(ctx, "rt-empty"); err != nil || has {
t.Fatalf("empty store should not report pending: has=%v err=%v", has, err)
}
if _, err := store.Create(ctx, "rt-1"); err != nil {
t.Fatalf("create: %v", err)
}
if has, err := store.HasPending(ctx, "rt-1"); err != nil || !has {
t.Fatalf("expected pending=true after Create: has=%v err=%v", has, err)
}
if has, err := store.HasPending(ctx, "rt-other"); err != nil || has {
t.Fatalf("expected pending=false for unrelated runtime: has=%v err=%v", has, err)
}
if _, err := store.PopPending(ctx, "rt-1"); err != nil {
t.Fatalf("pop: %v", err)
}
if has, err := store.HasPending(ctx, "rt-1"); err != nil || has {
t.Fatalf("expected pending=false after PopPending: has=%v err=%v", has, err)
}
}