Compare commits

...

2 Commits

Author SHA1 Message Date
Jiang Bohan
807720f902 fix(server): persist RunStartedAt + retry model report on transient failures
Two follow-ups from PR #2022 review:

1. RedisModelListStore was dropping ModelListRequest.RunStartedAt on
   persistence — the field is tagged json:"-" so it doesn't leak into
   the HTTP response, which made plain json.Marshal(req) silently
   discard it. Across-node readers saw RunStartedAt=nil and
   applyModelListTimeout's running branch became a no-op, so the 60s
   running-timeout escape hatch never fired. CI's
   TestRedisModelListStore_RunningTimeout was failing on this exact
   case. Fix mirrors RedisLocalSkillImportStore's envelope pattern —
   wrap in an internal struct that re-promotes the field. HTTP shape
   stays clean. Adds a no-Redis unit test that pins the round trip.

2. Daemon's handleModelList called d.client.ReportModelListResult
   directly and swallowed any 5xx, leaving the pending request
   stranded in "running" until its 60s server-side timeout — exactly
   the failure mode the multi-node store fix was meant to eliminate.
   Generalize the existing local-skill retry helper into
   reportRuntimeResultWithRetry (kind: model_list / local_skill_list /
   local_skill_import) and wire handleModelList through a new
   reportModelListResult helper. Renames the test-overridable
   var localSkillReportBackoffs → runtimeReportBackoffs to match.

Co-authored-by: multica-agent <github@multica.ai>
2026-05-03 11:10:38 +08:00
Jiang Bohan
a5deb1e395 fix(server): persist ModelListStore across replicas via Redis
The model picker uses a pending-request pattern: the frontend POSTs to
create a request, the daemon pops it on its next heartbeat, runs
agent.ListModels locally, and reports back. Until now the store was a
plain in-memory map per Handler instance.

That works for self-hosted single-instance deploys but fails in any
multi-replica environment (Multica Cloud). Each replica has its own
map, so:

  POST /runtimes/:id/models               → request stored in replica A
  GET  /runtimes/:id/models/<requestId>   → polls land on B/C → 404
  daemon heartbeat                        → only A sees PendingModelList
  POST .../<requestId>/result             → daemon's report has to land on A

Success probability ~1/N². The visible symptom is "No models available"
in the picker for every provider, even those (Claude/Codex) whose
catalog is statically populated end-to-end.

Same shape of bug, same Redis-backed fix as multica-ai/multica#1557 did
for LocalSkillListStore / LocalSkillImportStore. Reuse the operational
playbook (namespaced keys, ZSET-backed pending queue, atomic
ZREM+SET-running via the shared Lua script) so we don't introduce a
second concurrency model for the same primitive.

Changes:
- Convert ModelListStore from struct to interface with context-aware
  methods. Add HasPending for cheap heartbeat-side probing.
- InMemoryModelListStore — single-node fallback, used when REDIS_URL
  is unset (self-hosted dev / tests).
- RedisModelListStore — multi-node implementation using the same key
  layout and Lua atomic claim as RedisLocalSkillListStore.
- Use RunStartedAt (not UpdatedAt) as the running-timeout reference
  point, matching the local-skill stores so subsequent UpdatedAt
  bumps don't reset the running clock.
- Heartbeat now uses the probe-then-pop pattern for the model queue
  (matching local-skills) so a slow Redis can't stall every connected
  daemon. Extends heartbeatMetrics + slow-log with probe_model_ms /
  pop_model_ms / probe_model_timed_out for parity.
- Wire the Redis backend in NewRouterWithOptions when rdb != nil.
- Tests for both backends. Redis tests gate on REDIS_TEST_URL so
  laptop runs without Redis still pass; CI provides it.

Co-authored-by: multica-agent <github@multica.ai>
2026-05-03 10:58:35 +08:00
10 changed files with 967 additions and 120 deletions

View File

@@ -106,6 +106,7 @@ func NewRouterWithOptions(pool *pgxpool.Pool, hub *realtime.Hub, bus *events.Bus
h.TaskService.Wakeup = opts.DaemonWakeup
}
if rdb != nil {
h.ModelListStore = handler.NewRedisModelListStore(rdb)
h.LocalSkillListStore = handler.NewRedisLocalSkillListStore(rdb)
h.LocalSkillImportStore = handler.NewRedisLocalSkillImportStore(rdb)
}

View File

@@ -79,11 +79,11 @@ func New(cfg Config, logger *slog.Logger) *Daemon {
// server can split logs/metrics by client version (parallel to the CLI).
client.SetVersion(cfg.CLIVersion)
return &Daemon{
cfg: cfg,
client: client,
repoCache: repocache.New(cacheRoot, logger),
logger: logger,
workspaces: make(map[string]*workspaceState),
cfg: cfg,
client: client,
repoCache: repocache.New(cacheRoot, logger),
logger: logger,
workspaces: make(map[string]*workspaceState),
runtimeIndex: make(map[string]Runtime),
runtimeSetCh: make(chan struct{}, 1),
agentVersions: make(map[string]string),
@@ -715,7 +715,7 @@ func (d *Daemon) handleModelList(ctx context.Context, rt Runtime, requestID stri
entry, ok := d.cfg.Agents[rt.Provider]
if !ok {
d.client.ReportModelListResult(ctx, rt.ID, requestID, map[string]any{
d.reportModelListResult(ctx, rt, requestID, map[string]any{
"status": "failed",
"error": fmt.Sprintf("no agent configured for provider %q", rt.Provider),
})
@@ -724,7 +724,7 @@ func (d *Daemon) handleModelList(ctx context.Context, rt Runtime, requestID stri
models, err := agent.ListModels(ctx, rt.Provider, entry.Path)
if err != nil {
d.client.ReportModelListResult(ctx, rt.ID, requestID, map[string]any{
d.reportModelListResult(ctx, rt, requestID, map[string]any{
"status": "failed",
"error": err.Error(),
})
@@ -749,7 +749,7 @@ func (d *Daemon) handleModelList(ctx context.Context, rt Runtime, requestID stri
Default: m.Default,
})
}
d.client.ReportModelListResult(ctx, rt.ID, requestID, map[string]any{
d.reportModelListResult(ctx, rt, requestID, map[string]any{
"status": "completed",
"models": wire,
"supported": agent.ModelSelectionSupported(rt.Provider),
@@ -800,19 +800,20 @@ func (d *Daemon) handleLocalSkillImport(ctx context.Context, rt Runtime, pending
})
}
// localSkillReportBackoffs defines the retry schedule for delivering a
// local-skill result to the server. First attempt runs immediately, then we
// back off. The sum (≈6.5s) stays well under the server-side running timeout
// (60s) so a report that eventually lands still updates the request instead
// of racing a timeout transition.
// runtimeReportBackoffs defines the retry schedule for delivering any
// daemon→server async result (model list, local-skill list, local-skill
// import). First attempt runs immediately, then we back off. The sum
// (≈6.5s) stays well under the server-side running timeout (60s) so a
// report that eventually lands still updates the request instead of
// racing a timeout transition.
//
// Overridable for tests to avoid real sleeps.
var localSkillReportBackoffs = []time.Duration{0, 500 * time.Millisecond, 2 * time.Second, 4 * time.Second}
var runtimeReportBackoffs = []time.Duration{0, 500 * time.Millisecond, 2 * time.Second, 4 * time.Second}
// reportLocalSkillListResult delivers a list-report to the server with retry
// on transient failures. See reportLocalSkillResultWithRetry for semantics.
// on transient failures. See reportRuntimeResultWithRetry for semantics.
func (d *Daemon) reportLocalSkillListResult(ctx context.Context, rt Runtime, requestID string, payload map[string]any) {
d.reportLocalSkillResultWithRetry(ctx, "list", rt.ID, requestID, func(ctx context.Context) error {
d.reportRuntimeResultWithRetry(ctx, "local_skill_list", rt.ID, requestID, func(ctx context.Context) error {
return d.client.ReportLocalSkillListResult(ctx, rt.ID, requestID, payload)
})
}
@@ -820,29 +821,39 @@ func (d *Daemon) reportLocalSkillListResult(ctx context.Context, rt Runtime, req
// reportLocalSkillImportResult delivers an import-report to the server with
// retry on transient failures.
func (d *Daemon) reportLocalSkillImportResult(ctx context.Context, rt Runtime, requestID string, payload map[string]any) {
d.reportLocalSkillResultWithRetry(ctx, "import", rt.ID, requestID, func(ctx context.Context) error {
d.reportRuntimeResultWithRetry(ctx, "local_skill_import", rt.ID, requestID, func(ctx context.Context) error {
return d.client.ReportLocalSkillImportResult(ctx, rt.ID, requestID, payload)
})
}
// reportLocalSkillResultWithRetry retries `fn` on 5xx / network errors and
// stops on success, 4xx, or after exhausting localSkillReportBackoffs.
// reportModelListResult delivers a model-list report to the server with retry
// on transient failures. Without this the daemon used to fire once and
// swallow any 5xx, leaving the request stranded in "running" on the server
// until its 60s timeout — defeating the multi-node store fix.
func (d *Daemon) reportModelListResult(ctx context.Context, rt Runtime, requestID string, payload map[string]any) {
d.reportRuntimeResultWithRetry(ctx, "model_list", rt.ID, requestID, func(ctx context.Context) error {
return d.client.ReportModelListResult(ctx, rt.ID, requestID, payload)
})
}
// reportRuntimeResultWithRetry retries `fn` on 5xx / network errors and
// stops on success, 4xx, or after exhausting runtimeReportBackoffs.
//
// Why this exists: the server persists the report through a Redis / DB
// write; on a transient store failure it now correctly returns 500 (see
// PR #1557). Without a client-side retry the daemon would fire once,
// swallow the error, and the pending request stays in "running" on the
// server until the 60s timeout — which is exactly the "daemon did not
// respond" failure mode the whole store refactor was meant to fix. 4xx is
// treated as permanent (request-not-found, cross-workspace token rejected,
// bad body) — retrying those just wastes heartbeat cycles.
func (d *Daemon) reportLocalSkillResultWithRetry(ctx context.Context, kind, runtimeID, requestID string, fn func(context.Context) error) {
// write; on a transient store failure it correctly returns 500. Without a
// client-side retry the daemon would fire once, swallow the error, and the
// pending request stays in "running" on the server until its timeout — which
// is exactly the "daemon did not respond" failure mode the multi-node store
// fix was meant to eliminate. 4xx is treated as permanent (request-not-found,
// cross-workspace token rejected, bad body) — retrying those just wastes
// heartbeat cycles.
func (d *Daemon) reportRuntimeResultWithRetry(ctx context.Context, kind, runtimeID, requestID string, fn func(context.Context) error) {
var lastErr error
for attempt, wait := range localSkillReportBackoffs {
for attempt, wait := range runtimeReportBackoffs {
if wait > 0 {
select {
case <-ctx.Done():
d.logger.Error("local skill report cancelled",
d.logger.Error("runtime async report cancelled",
"kind", kind, "runtime_id", runtimeID, "request_id", requestID,
"attempt", attempt, "error", ctx.Err())
return
@@ -852,7 +863,7 @@ func (d *Daemon) reportLocalSkillResultWithRetry(ctx context.Context, kind, runt
err := fn(ctx)
if err == nil {
if attempt > 0 {
d.logger.Info("local skill report succeeded after retry",
d.logger.Info("runtime async report succeeded after retry",
"kind", kind, "runtime_id", runtimeID, "request_id", requestID,
"attempt", attempt+1)
}
@@ -864,17 +875,17 @@ func (d *Daemon) reportLocalSkillResultWithRetry(ctx context.Context, kind, runt
// body). No amount of retrying will make it succeed.
var reqErr *requestError
if errors.As(err, &reqErr) && reqErr.StatusCode >= 400 && reqErr.StatusCode < 500 {
d.logger.Error("local skill report rejected — not retrying",
d.logger.Error("runtime async report rejected — not retrying",
"kind", kind, "runtime_id", runtimeID, "request_id", requestID,
"status", reqErr.StatusCode, "error", err)
return
}
d.logger.Warn("local skill report failed — will retry",
d.logger.Warn("runtime async report failed — will retry",
"kind", kind, "runtime_id", runtimeID, "request_id", requestID,
"attempt", attempt+1, "error", err)
}
d.logger.Error("local skill report exhausted retries",
d.logger.Error("runtime async report exhausted retries",
"kind", kind, "runtime_id", runtimeID, "request_id", requestID, "error", lastErr)
}

View File

@@ -16,9 +16,9 @@ import (
// the production schedule on cleanup.
func withFastLocalSkillReportBackoffs(t *testing.T) {
t.Helper()
prev := localSkillReportBackoffs
localSkillReportBackoffs = []time.Duration{0, 0, 0, 0}
t.Cleanup(func() { localSkillReportBackoffs = prev })
prev := runtimeReportBackoffs
runtimeReportBackoffs = []time.Duration{0, 0, 0, 0}
t.Cleanup(func() { runtimeReportBackoffs = prev })
}
// localSkillReportDaemon wires a Daemon instance around an httptest.Server
@@ -110,18 +110,18 @@ func TestReportLocalSkillResult_GivesUpAfterAllAttemptsFail(t *testing.T) {
d.reportLocalSkillListResult(context.Background(), Runtime{ID: "rt-1"}, "req-1", map[string]any{"status": "completed"})
// Each element in localSkillReportBackoffs is one attempt — a persistent
// Each element in runtimeReportBackoffs is one attempt — a persistent
// outage should burn through every slot and then stop (logging Error).
if got := atomic.LoadInt32(calls); int(got) != len(localSkillReportBackoffs) {
t.Fatalf("expected %d attempts, got %d", len(localSkillReportBackoffs), got)
if got := atomic.LoadInt32(calls); int(got) != len(runtimeReportBackoffs) {
t.Fatalf("expected %d attempts, got %d", len(runtimeReportBackoffs), got)
}
}
func TestReportLocalSkillResult_AbortsOnContextCancel(t *testing.T) {
// Keep one real delay in the schedule so cancel lands mid-backoff.
prev := localSkillReportBackoffs
localSkillReportBackoffs = []time.Duration{0, 200 * time.Millisecond}
t.Cleanup(func() { localSkillReportBackoffs = prev })
prev := runtimeReportBackoffs
runtimeReportBackoffs = []time.Duration{0, 200 * time.Millisecond}
t.Cleanup(func() { runtimeReportBackoffs = prev })
d, calls := localSkillReportDaemon(t, func(w http.ResponseWriter, _ *http.Request) {
http.Error(w, "{}", http.StatusInternalServerError)

View File

@@ -0,0 +1,74 @@
package daemon
import (
"context"
"net/http"
"strings"
"sync/atomic"
"testing"
)
// TestReportModelListResult_RetriesOn500AndEventuallySucceeds pins the
// regression GPT-Boy flagged on PR #2022: handleModelList used to call
// d.client.ReportModelListResult directly and swallow any 5xx, leaving the
// pending request stranded in "running" until its 60s server-side timeout —
// which is exactly the failure mode the multi-node store fix was meant to
// eliminate. With the retry helper in place a transient store failure on
// the server side gets re-tried until it lands.
func TestReportModelListResult_RetriesOn500AndEventuallySucceeds(t *testing.T) {
withFastLocalSkillReportBackoffs(t)
var hits int32
d, calls := localSkillReportDaemon(t, func(w http.ResponseWriter, _ *http.Request) {
n := atomic.AddInt32(&hits, 1)
if n <= 2 {
http.Error(w, "{}", http.StatusInternalServerError)
return
}
w.WriteHeader(http.StatusOK)
w.Write([]byte(`{"status":"ok"}`))
})
d.reportModelListResult(context.Background(), Runtime{ID: "rt-1"}, "req-1", map[string]any{"status": "completed"})
if got := atomic.LoadInt32(calls); got != 3 {
t.Fatalf("expected 3 attempts (2 failures + 1 success), got %d", got)
}
}
// TestReportModelListResult_DoesNotRetryOn4xx pins that 4xx (e.g. the request
// expired or was cross-workspace) is treated as terminal — retrying just
// burns heartbeat cycles.
func TestReportModelListResult_DoesNotRetryOn4xx(t *testing.T) {
withFastLocalSkillReportBackoffs(t)
d, calls := localSkillReportDaemon(t, func(w http.ResponseWriter, _ *http.Request) {
http.Error(w, `{"error":"request not found"}`, http.StatusNotFound)
})
d.reportModelListResult(context.Background(), Runtime{ID: "rt-1"}, "req-1", map[string]any{"status": "completed"})
if got := atomic.LoadInt32(calls); got != 1 {
t.Fatalf("expected exactly 1 attempt (4xx is terminal), got %d", got)
}
}
// TestReportModelListResult_SendsCorrectPath smoke-tests the URL the daemon
// posts to, so a future client refactor doesn't silently aim reports at the
// wrong endpoint.
func TestReportModelListResult_SendsCorrectPath(t *testing.T) {
withFastLocalSkillReportBackoffs(t)
var path string
d, _ := localSkillReportDaemon(t, func(w http.ResponseWriter, r *http.Request) {
path = r.URL.Path
w.WriteHeader(http.StatusOK)
w.Write([]byte(`{"status":"ok"}`))
})
d.reportModelListResult(context.Background(), Runtime{ID: "rt-a"}, "req-1", map[string]any{"status": "completed"})
if !strings.HasSuffix(path, "/api/daemon/runtimes/rt-a/models/req-1/result") {
t.Fatalf("model list report path = %q", path)
}
}

View File

@@ -527,14 +527,14 @@ func (h *Handler) DaemonHeartbeat(w http.ResponseWriter, r *http.Request) {
start := time.Now()
authPath := middleware.DaemonAuthPathFromContext(r.Context())
var (
outcome = "unauth"
runtimeID string
decodeMs, runtimeLookupMs, workspaceCheckMs int64
authMs, updateMs, probeSkillsMs, popSkillsMs, probeImportMs, popImportMs int64
probeSkillsTimedOut, probeImportTimedOut bool
outcome = "unauth"
runtimeID string
decodeMs, runtimeLookupMs, workspaceCheckMs int64
authMs, updateMs, probeModelMs, popModelMs, probeSkillsMs, popSkillsMs, probeImportMs, popImportMs int64
probeModelTimedOut, probeSkillsTimedOut, probeImportTimedOut bool
)
defer func() {
logHeartbeatEndpointSlow(runtimeID, outcome, authPath, start, decodeMs, runtimeLookupMs, workspaceCheckMs, authMs, updateMs, probeSkillsMs, popSkillsMs, probeImportMs, popImportMs, probeSkillsTimedOut, probeImportTimedOut)
logHeartbeatEndpointSlow(runtimeID, outcome, authPath, start, decodeMs, runtimeLookupMs, workspaceCheckMs, authMs, updateMs, probeModelMs, popModelMs, probeSkillsMs, popSkillsMs, probeImportMs, popImportMs, probeModelTimedOut, probeSkillsTimedOut, probeImportTimedOut)
}()
decodeStart := time.Now()
@@ -584,10 +584,13 @@ func (h *Handler) DaemonHeartbeat(w http.ResponseWriter, r *http.Request) {
ack, m, err := h.processHeartbeat(r.Context(), rt)
updateMs = m.UpdateMs
probeModelMs = m.ProbeModelMs
popModelMs = m.PopModelMs
probeSkillsMs = m.ProbeSkillsMs
popSkillsMs = m.PopSkillsMs
probeImportMs = m.ProbeImportMs
popImportMs = m.PopImportMs
probeModelTimedOut = m.ProbeModelTimedOut
probeSkillsTimedOut = m.ProbeSkillsTimedOut
probeImportTimedOut = m.ProbeImportTimedOut
if err != nil {
@@ -642,8 +645,8 @@ func (h *Handler) HandleDaemonWSHeartbeat(ctx context.Context, identity daemonws
// heartbeatMetrics carries per-stage timings out of processHeartbeat so the
// HTTP slow-log can stay structured. The WS path discards them.
type heartbeatMetrics struct {
UpdateMs, ProbeSkillsMs, PopSkillsMs, ProbeImportMs, PopImportMs int64
ProbeSkillsTimedOut, ProbeImportTimedOut bool
UpdateMs, ProbeModelMs, PopModelMs, ProbeSkillsMs, PopSkillsMs, ProbeImportMs, PopImportMs int64
ProbeModelTimedOut, ProbeSkillsTimedOut, ProbeImportTimedOut bool
}
// processHeartbeat does the work shared by HTTP POST /api/daemon/heartbeat and
@@ -675,8 +678,32 @@ func (h *Handler) processHeartbeat(ctx context.Context, rt db.AgentRuntime) (*pr
}
}
if pending := h.ModelListStore.PopPending(runtimeID); pending != nil {
ack.PendingModelList = &protocol.DaemonHeartbeatPendingModelList{ID: pending.ID}
// Probe then claim the model list queue. Same pattern as the local-skill
// queues below — a slow shared store cannot stall the heartbeat on
// empty-queue ticks, but the claim itself runs unbounded because its
// Lua side effects cannot be safely aborted mid-script.
probeModelStart := time.Now()
probeModelCtx, cancelProbeModel := context.WithTimeout(ctx, heartbeatHasPendingTimeout)
hasModel, probeModelErr := h.ModelListStore.HasPending(probeModelCtx, runtimeID)
cancelProbeModel()
m.ProbeModelMs = time.Since(probeModelStart).Milliseconds()
switch {
case probeModelErr == nil && hasModel:
popStart := time.Now()
pendingModel, popErr := h.ModelListStore.PopPending(ctx, runtimeID)
m.PopModelMs = time.Since(popStart).Milliseconds()
if popErr != nil {
slog.Warn("model list PopPending failed", "error", popErr, "runtime_id", runtimeID)
} else if pendingModel != nil {
ack.PendingModelList = &protocol.DaemonHeartbeatPendingModelList{ID: pendingModel.ID}
}
case probeModelErr != nil:
if errors.Is(probeModelErr, context.DeadlineExceeded) || errors.Is(probeModelErr, context.Canceled) {
m.ProbeModelTimedOut = true
slog.Warn("model list HasPending timed out", "runtime_id", runtimeID, "elapsed_ms", m.ProbeModelMs)
} else {
slog.Warn("model list HasPending failed", "error", probeModelErr, "runtime_id", runtimeID)
}
}
// Probe then claim the local-skill list queue. The probe is bounded so a
@@ -743,9 +770,9 @@ func (h *Handler) processHeartbeat(ctx context.Context, rt db.AgentRuntime) (*pr
// auth_ms is further decomposed into decode_ms, runtime_lookup_ms, and
// workspace_check_ms; auth_path labels which token kind authenticated the
// request ("daemon_token", "pat", or "jwt"). Mirrors logClaimEndpointSlow.
func logHeartbeatEndpointSlow(runtimeID, outcome, authPath string, start time.Time, decodeMs, runtimeLookupMs, workspaceCheckMs, authMs, updateMs, probeSkillsMs, popSkillsMs, probeImportMs, popImportMs int64, probeSkillsTimedOut, probeImportTimedOut bool) {
func logHeartbeatEndpointSlow(runtimeID, outcome, authPath string, start time.Time, decodeMs, runtimeLookupMs, workspaceCheckMs, authMs, updateMs, probeModelMs, popModelMs, probeSkillsMs, popSkillsMs, probeImportMs, popImportMs int64, probeModelTimedOut, probeSkillsTimedOut, probeImportTimedOut bool) {
totalMs := time.Since(start).Milliseconds()
if totalMs < 500 && !probeSkillsTimedOut && !probeImportTimedOut {
if totalMs < 500 && !probeModelTimedOut && !probeSkillsTimedOut && !probeImportTimedOut {
return
}
slog.Info("heartbeat_endpoint slow",
@@ -758,10 +785,13 @@ func logHeartbeatEndpointSlow(runtimeID, outcome, authPath string, start time.Ti
"runtime_lookup_ms", runtimeLookupMs,
"workspace_check_ms", workspaceCheckMs,
"update_ms", updateMs,
"probe_model_ms", probeModelMs,
"pop_model_ms", popModelMs,
"probe_skills_ms", probeSkillsMs,
"pop_skills_ms", popSkillsMs,
"probe_import_ms", probeImportMs,
"pop_import_ms", popImportMs,
"probe_model_timed_out", probeModelTimedOut,
"probe_skills_timed_out", probeSkillsTimedOut,
"probe_import_timed_out", probeImportTimedOut,
)

View File

@@ -60,7 +60,7 @@ type Handler struct {
AutopilotService *service.AutopilotService
EmailService *service.EmailService
UpdateStore *UpdateStore
ModelListStore *ModelListStore
ModelListStore ModelListStore
LocalSkillListStore LocalSkillListStore
LocalSkillImportStore LocalSkillImportStore
Storage storage.Storage
@@ -98,7 +98,7 @@ func New(queries *db.Queries, txStarter txStarter, hub *realtime.Hub, bus *event
AutopilotService: service.NewAutopilotService(queries, txStarter, bus, taskSvc),
EmailService: emailService,
UpdateStore: NewUpdateStore(),
ModelListStore: NewModelListStore(),
ModelListStore: NewInMemoryModelListStore(),
LocalSkillListStore: NewInMemoryLocalSkillListStore(),
LocalSkillImportStore: NewInMemoryLocalSkillImportStore(),
Storage: store,

View File

@@ -1,6 +1,7 @@
package handler
import (
"context"
"encoding/json"
"log/slog"
"net/http"
@@ -11,13 +12,23 @@ import (
)
// ---------------------------------------------------------------------------
// In-memory model-list request store
// Model list request store
// ---------------------------------------------------------------------------
//
// The server cannot call the daemon directly (the daemon is behind the user's
// NAT and only polls the server). So "list models for this runtime" uses a
// pending-request pattern: server creates a pending request, daemon pops it
// on the next heartbeat, executes locally, and reports the result back.
// pending-request pattern: a frontend POST creates a pending request, the
// daemon pops it on the next heartbeat, executes locally, and reports the
// result back.
//
// The store is the cross-cutting state for that flow. It MUST stay coherent
// across API replicas — POST, heartbeat and poll can each land on a different
// node, and they all need to see the same request lifecycle. The single-node
// in-memory implementation is fine for self-hosted dev; multi-node deploys
// (Multica Cloud) MUST use the Redis-backed implementation, otherwise the
// pending request is invisible to whichever replica receives the next call
// and the picker shows "No models available" (regression: see issue
// review on multica-ai/multica#2009).
// ModelListStatus represents the lifecycle of a model list request.
type ModelListStatus string
@@ -35,15 +46,20 @@ const (
// selection entirely (currently: hermes). The UI uses this to
// disable its dropdown rather than silently accepting a value the
// backend will drop.
//
// RunStartedAt is set when PopPending claims the request. It is
// `json:"-"` because it's a server-side bookkeeping field — the UI only
// needs Status / UpdatedAt to drive the polling loop.
type ModelListRequest struct {
ID string `json:"id"`
RuntimeID string `json:"runtime_id"`
Status ModelListStatus `json:"status"`
Models []ModelEntry `json:"models,omitempty"`
Supported bool `json:"supported"`
Error string `json:"error,omitempty"`
CreatedAt time.Time `json:"created_at"`
UpdatedAt time.Time `json:"updated_at"`
ID string `json:"id"`
RuntimeID string `json:"runtime_id"`
Status ModelListStatus `json:"status"`
Models []ModelEntry `json:"models,omitempty"`
Supported bool `json:"supported"`
Error string `json:"error,omitempty"`
CreatedAt time.Time `json:"created_at"`
UpdatedAt time.Time `json:"updated_at"`
RunStartedAt *time.Time `json:"-"`
}
// ModelEntry mirrors agent.Model for the wire. `Default` tags the
@@ -67,32 +83,83 @@ const (
// carrying `pending_model_list` is lost in transit (e.g. HTTP client
// timeout after PopPending already mutated store state): without this
// transition the UI would keep polling a record that is stuck in
// `running` until the 2-minute memory GC sweeps it.
// `running` until retention sweeps it.
modelListRunningTimeout = 60 * time.Second
// modelListStoreRetention bounds how long any stored request lives in
// the backing store. The Redis backend uses it as a TTL; the in-memory
// backend GCs on Create. The window is deliberately wider than the
// running/pending timeouts so terminal records are still readable when
// the UI's last poll arrives.
modelListStoreRetention = 2 * time.Minute
)
// ModelListStore is a thread-safe in-memory store. Entries expire after 2 min
// to bound memory use; the UI polls /requests/:id until status is terminal.
type ModelListStore struct {
// ModelListStore is the contract every backend (in-memory single-node,
// Redis multi-node) must satisfy. Methods take a context so the Redis
// implementation can honour the heartbeat-side timeout that gates a
// slow shared store from stalling the rest of the heartbeat.
type ModelListStore interface {
Create(ctx context.Context, runtimeID string) (*ModelListRequest, error)
Get(ctx context.Context, id string) (*ModelListRequest, error)
// HasPending is a cheap read-only probe used by the heartbeat hot path
// to gate the side-effecting PopPending. A spurious "true" is fine —
// PopPending handles "queue empty after probe" by returning nil.
HasPending(ctx context.Context, runtimeID string) (bool, error)
PopPending(ctx context.Context, runtimeID string) (*ModelListRequest, error)
Complete(ctx context.Context, id string, models []ModelEntry, supported bool) error
Fail(ctx context.Context, id string, errMsg string) error
}
// applyModelListTimeout transitions a request to ModelListTimeout when it has
// been stuck in a non-terminal state past its threshold. Returns true when
// the record was modified so callers can persist the change. The pending
// threshold catches "daemon never picked this up"; the running threshold
// catches "daemon picked it up but the result report was lost" — without
// the running escape, only retention sweep ends the polling loop.
func applyModelListTimeout(req *ModelListRequest, now time.Time) bool {
switch req.Status {
case ModelListPending:
if now.Sub(req.CreatedAt) > modelListPendingTimeout {
req.Status = ModelListTimeout
req.Error = "daemon did not respond within 30 seconds"
req.UpdatedAt = now
return true
}
case ModelListRunning:
if req.RunStartedAt != nil && now.Sub(*req.RunStartedAt) > modelListRunningTimeout {
req.Status = ModelListTimeout
req.Error = "daemon did not finish within 60 seconds"
req.UpdatedAt = now
return true
}
}
return false
}
// InMemoryModelListStore is the single-node implementation. Adequate for
// self-hosted dev and the test suite, but unsafe in multi-node deploys
// (each replica gets its own map and the pending request is invisible to
// every replica that didn't receive the POST).
type InMemoryModelListStore struct {
mu sync.Mutex
requests map[string]*ModelListRequest
}
func NewModelListStore() *ModelListStore {
return &ModelListStore{requests: make(map[string]*ModelListRequest)}
func NewInMemoryModelListStore() *InMemoryModelListStore {
return &InMemoryModelListStore{requests: make(map[string]*ModelListRequest)}
}
func (s *ModelListStore) Create(runtimeID string) *ModelListRequest {
func (s *InMemoryModelListStore) Create(_ context.Context, runtimeID string) (*ModelListRequest, error) {
s.mu.Lock()
defer s.mu.Unlock()
// Garbage-collect stale entries so the map can't grow unbounded.
for id, req := range s.requests {
if time.Since(req.CreatedAt) > 2*time.Minute {
if time.Since(req.CreatedAt) > modelListStoreRetention {
delete(s.requests, id)
}
}
now := time.Now()
req := &ModelListRequest{
ID: randomID(),
RuntimeID: runtimeID,
@@ -100,55 +167,47 @@ func (s *ModelListStore) Create(runtimeID string) *ModelListRequest {
// Default to true; the daemon overrides this in the report
// for providers that don't support per-agent model selection.
Supported: true,
CreatedAt: time.Now(),
UpdatedAt: time.Now(),
CreatedAt: now,
UpdatedAt: now,
}
s.requests[req.ID] = req
return req
return req, nil
}
func (s *ModelListStore) Get(id string) *ModelListRequest {
func (s *InMemoryModelListStore) Get(_ context.Context, id string) (*ModelListRequest, error) {
s.mu.Lock()
defer s.mu.Unlock()
req, ok := s.requests[id]
if !ok {
return nil
return nil, nil
}
applyModelListTimeout(req, time.Now())
return req
return req, nil
}
// applyModelListTimeout transitions a request to ModelListTimeout when it has
// been stuck in a non-terminal state past its threshold. The pending threshold
// catches "daemon never picked this up"; the running threshold catches
// "daemon picked it up but the result report was lost" — previously the only
// escape from running was the 2-minute memory GC, which exceeded the UI's
// polling window and surfaced as a silent discovery failure.
func applyModelListTimeout(req *ModelListRequest, now time.Time) {
switch req.Status {
case ModelListPending:
if now.Sub(req.CreatedAt) > modelListPendingTimeout {
req.Status = ModelListTimeout
req.Error = "daemon did not respond within 30 seconds"
req.UpdatedAt = now
}
case ModelListRunning:
if now.Sub(req.UpdatedAt) > modelListRunningTimeout {
req.Status = ModelListTimeout
req.Error = "daemon did not finish within 60 seconds"
req.UpdatedAt = now
func (s *InMemoryModelListStore) HasPending(_ context.Context, runtimeID string) (bool, error) {
s.mu.Lock()
defer s.mu.Unlock()
now := time.Now()
for _, req := range s.requests {
applyModelListTimeout(req, now)
if req.RuntimeID == runtimeID && req.Status == ModelListPending {
return true, nil
}
}
return false, nil
}
// PopPending returns and marks-running the oldest pending request for a runtime.
func (s *ModelListStore) PopPending(runtimeID string) *ModelListRequest {
func (s *InMemoryModelListStore) PopPending(_ context.Context, runtimeID string) (*ModelListRequest, error) {
s.mu.Lock()
defer s.mu.Unlock()
var oldest *ModelListRequest
now := time.Now()
for _, req := range s.requests {
applyModelListTimeout(req, now)
if req.RuntimeID == runtimeID && req.Status == ModelListPending {
if oldest == nil || req.CreatedAt.Before(oldest.CreatedAt) {
oldest = req
@@ -157,12 +216,14 @@ func (s *ModelListStore) PopPending(runtimeID string) *ModelListRequest {
}
if oldest != nil {
oldest.Status = ModelListRunning
oldest.UpdatedAt = time.Now()
startedAt := now
oldest.RunStartedAt = &startedAt
oldest.UpdatedAt = now
}
return oldest
return oldest, nil
}
func (s *ModelListStore) Complete(id string, models []ModelEntry, supported bool) {
func (s *InMemoryModelListStore) Complete(_ context.Context, id string, models []ModelEntry, supported bool) error {
s.mu.Lock()
defer s.mu.Unlock()
@@ -172,9 +233,10 @@ func (s *ModelListStore) Complete(id string, models []ModelEntry, supported bool
req.Supported = supported
req.UpdatedAt = time.Now()
}
return nil
}
func (s *ModelListStore) Fail(id string, errMsg string) {
func (s *InMemoryModelListStore) Fail(_ context.Context, id string, errMsg string) error {
s.mu.Lock()
defer s.mu.Unlock()
@@ -183,6 +245,11 @@ func (s *ModelListStore) Fail(id string, errMsg string) {
req.Error = errMsg
req.UpdatedAt = time.Now()
}
return nil
}
func modelListRequestTerminal(status ModelListStatus) bool {
return status == ModelListCompleted || status == ModelListFailed || status == ModelListTimeout
}
// ---------------------------------------------------------------------------
@@ -211,7 +278,11 @@ func (h *Handler) InitiateListModels(w http.ResponseWriter, r *http.Request) {
return
}
req := h.ModelListStore.Create(uuidToString(rt.ID))
req, err := h.ModelListStore.Create(r.Context(), uuidToString(rt.ID))
if err != nil {
writeError(w, http.StatusInternalServerError, "failed to enqueue model list request: "+err.Error())
return
}
writeJSON(w, http.StatusOK, req)
}
@@ -219,7 +290,11 @@ func (h *Handler) InitiateListModels(w http.ResponseWriter, r *http.Request) {
func (h *Handler) GetModelListRequest(w http.ResponseWriter, r *http.Request) {
requestID := chi.URLParam(r, "requestId")
req := h.ModelListStore.Get(requestID)
req, err := h.ModelListStore.Get(r.Context(), requestID)
if err != nil {
writeError(w, http.StatusInternalServerError, "failed to load request: "+err.Error())
return
}
if req == nil {
writeError(w, http.StatusNotFound, "request not found")
return
@@ -237,6 +312,24 @@ func (h *Handler) ReportModelListResult(w http.ResponseWriter, r *http.Request)
requestID := chi.URLParam(r, "requestId")
// Fetch first so we can ignore stale reports for already-terminal
// requests (e.g. the heartbeat response that triggered the daemon
// run was a retry, and the original report already landed).
existing, err := h.ModelListStore.Get(r.Context(), requestID)
if err != nil {
writeError(w, http.StatusInternalServerError, "failed to load request: "+err.Error())
return
}
if existing == nil || existing.RuntimeID != runtimeID {
writeError(w, http.StatusNotFound, "request not found")
return
}
if modelListRequestTerminal(existing.Status) {
slog.Debug("ignoring stale model list report", "runtime_id", runtimeID, "request_id", requestID, "status", existing.Status)
writeJSON(w, http.StatusOK, map[string]string{"status": "ok"})
return
}
var body struct {
Status string `json:"status"` // "completed" or "failed"
Models []ModelEntry `json:"models"`
@@ -255,9 +348,21 @@ func (h *Handler) ReportModelListResult(w http.ResponseWriter, r *http.Request)
if body.Supported != nil {
supported = *body.Supported
}
h.ModelListStore.Complete(requestID, body.Models, supported)
if err := h.ModelListStore.Complete(r.Context(), requestID, body.Models, supported); err != nil {
// Surface the store failure as 5xx so the daemon can retry instead
// of swallowing the report (leaves the request stuck in running
// until the server-side timeout, which is exactly the "looks OK
// but nothing happens" class of bug we're trying to avoid).
slog.Error("ModelListStore Complete failed", "error", err, "request_id", requestID)
writeError(w, http.StatusInternalServerError, "failed to persist completion")
return
}
} else {
h.ModelListStore.Fail(requestID, body.Error)
if err := h.ModelListStore.Fail(r.Context(), requestID, body.Error); err != nil {
slog.Error("ModelListStore Fail failed", "error", err, "request_id", requestID)
writeError(w, http.StatusInternalServerError, "failed to persist failure")
return
}
}
slog.Debug("model list report", "runtime_id", runtimeID, "request_id", requestID, "status", body.Status, "count", len(body.Models))

View File

@@ -0,0 +1,250 @@
package handler
import (
"context"
"encoding/json"
"errors"
"fmt"
"time"
"github.com/redis/go-redis/v9"
)
// Redis-backed implementation of ModelListStore. The wire layout matches
// runtime_local_skills_redis_store.go (which solves the same multi-node
// dispatch problem for skill lists/imports) so the operational story is
// identical: namespaced keys, ZSET-backed pending queue, atomic claim via
// the shared Lua script.
//
// Key layout:
//
// mul:model_list:req:<request_id> → JSON-encoded ModelListRequest, TTL = retention
// mul:model_list:pending:<runtime_id> → ZSET { member = request_id, score = created_at UnixNano }
// TTL = retention*2 (kept alive long enough for
// lazy sweep on PopPending)
//
// PopPending uses claimPendingScript (defined in
// runtime_local_skills_redis_store.go) to atomically ZREM the pending entry
// and SET the record to "running" — splitting those two writes would strand
// requests on a transient Redis hiccup between them.
const (
// Namespaced under mul:model_list:* so the key set doesn't collide with
// the realtime relay (ws:*) or the local-skill stores (mul:local_skill:*).
modelListKeyPrefix = "mul:model_list:req:"
modelListPendingPrefix = "mul:model_list:pending:"
modelListRedisPopMaxRetries = 5
)
func modelListKey(id string) string { return modelListKeyPrefix + id }
func modelListPendingKey(runtimeID string) string { return modelListPendingPrefix + runtimeID }
// RedisModelListStore stores model list requests in Redis so every API node
// agrees on the same pending / running / terminal state.
type RedisModelListStore struct {
rdb *redis.Client
}
func NewRedisModelListStore(rdb *redis.Client) *RedisModelListStore {
return &RedisModelListStore{rdb: rdb}
}
func (s *RedisModelListStore) Create(ctx context.Context, runtimeID string) (*ModelListRequest, error) {
now := time.Now()
req := &ModelListRequest{
ID: randomID(),
RuntimeID: runtimeID,
Status: ModelListPending,
Supported: true,
CreatedAt: now,
UpdatedAt: now,
}
data, err := s.marshalRequest(req)
if err != nil {
return nil, err
}
pipe := s.rdb.TxPipeline()
pipe.Set(ctx, modelListKey(req.ID), data, modelListStoreRetention)
pipe.ZAdd(ctx, modelListPendingKey(runtimeID), redis.Z{
Score: float64(now.UnixNano()),
Member: req.ID,
})
// Keep the pending zset alive past the per-record retention so stale
// members can be lazily swept on PopPending.
pipe.Expire(ctx, modelListPendingKey(runtimeID), modelListStoreRetention*2)
if _, err := pipe.Exec(ctx); err != nil {
return nil, fmt.Errorf("persist model list request: %w", err)
}
return req, nil
}
func (s *RedisModelListStore) Get(ctx context.Context, id string) (*ModelListRequest, error) {
return s.loadRequest(ctx, id)
}
// loadRequest fetches a single record, applies timeout transitions if the
// stored state has aged past the threshold, and persists the transition so
// sibling nodes observe the same terminal state.
func (s *RedisModelListStore) loadRequest(ctx context.Context, id string) (*ModelListRequest, error) {
raw, err := s.rdb.Get(ctx, modelListKey(id)).Bytes()
if errors.Is(err, redis.Nil) {
return nil, nil
}
if err != nil {
return nil, fmt.Errorf("get model list request: %w", err)
}
req, err := s.unmarshalRequest(raw)
if err != nil {
return nil, err
}
if applyModelListTimeout(req, time.Now()) {
if err := s.persistRequest(ctx, req); err != nil {
return nil, err
}
// Drop from pending zset on terminal transition. PopPending would
// also do this, but doing it here keeps the set clean for readers
// that never call PopPending.
s.rdb.ZRem(ctx, modelListPendingKey(req.RuntimeID), req.ID)
}
return req, nil
}
func (s *RedisModelListStore) persistRequest(ctx context.Context, req *ModelListRequest) error {
data, err := s.marshalRequest(req)
if err != nil {
return err
}
if err := s.rdb.Set(ctx, modelListKey(req.ID), data, modelListStoreRetention).Err(); err != nil {
return fmt.Errorf("persist model list request: %w", err)
}
return nil
}
// ModelListRequest tags RunStartedAt as `json:"-"` so the server-side
// bookkeeping field doesn't leak into the HTTP response (the UI only
// needs Status / UpdatedAt to drive its polling loop). Redis persistence
// has to keep that field, otherwise the running-timeout escape hatch
// silently breaks across nodes — every reader sees RunStartedAt=nil and
// applyModelListTimeout's running branch becomes a no-op. Wrap in an
// internal envelope that re-promotes the field on the wire.
type redisModelListEnvelope struct {
Public *ModelListRequest `json:"r"`
RunStartedAt *time.Time `json:"s,omitempty"`
}
func (s *RedisModelListStore) marshalRequest(req *ModelListRequest) ([]byte, error) {
env := redisModelListEnvelope{Public: req, RunStartedAt: req.RunStartedAt}
data, err := json.Marshal(env)
if err != nil {
return nil, fmt.Errorf("marshal model list request: %w", err)
}
return data, nil
}
func (s *RedisModelListStore) unmarshalRequest(raw []byte) (*ModelListRequest, error) {
var env redisModelListEnvelope
if err := json.Unmarshal(raw, &env); err != nil {
return nil, fmt.Errorf("decode model list request: %w", err)
}
if env.Public == nil {
return nil, fmt.Errorf("decode model list request: missing payload")
}
env.Public.RunStartedAt = env.RunStartedAt
return env.Public, nil
}
// HasPending is a cheap read-only ZCARD probe used by the heartbeat hot path
// to decide whether to invoke the side-effecting PopPending.
func (s *RedisModelListStore) HasPending(ctx context.Context, runtimeID string) (bool, error) {
cnt, err := s.rdb.ZCard(ctx, modelListPendingKey(runtimeID)).Result()
if err != nil {
return false, fmt.Errorf("zcard pending: %w", err)
}
return cnt > 0, nil
}
func (s *RedisModelListStore) PopPending(ctx context.Context, runtimeID string) (*ModelListRequest, error) {
pendingKey := modelListPendingKey(runtimeID)
for attempt := 0; attempt < modelListRedisPopMaxRetries; attempt++ {
ids, err := s.rdb.ZRange(ctx, pendingKey, 0, 0).Result()
if err != nil {
return nil, fmt.Errorf("zrange pending: %w", err)
}
if len(ids) == 0 {
return nil, nil
}
id := ids[0]
req, err := s.loadRequest(ctx, id)
if err != nil {
return nil, err
}
if req == nil {
// Record expired but the zset still references it — drop and retry.
s.rdb.ZRem(ctx, pendingKey, id)
continue
}
if req.Status != ModelListPending {
// Either the timeout fired inside loadRequest or another node
// already picked it up. Unlink from the pending set and retry.
s.rdb.ZRem(ctx, pendingKey, id)
continue
}
now := time.Now()
req.Status = ModelListRunning
req.RunStartedAt = &now
req.UpdatedAt = now
data, err := s.marshalRequest(req)
if err != nil {
return nil, err
}
result, err := claimPendingScript.Run(
ctx, s.rdb,
[]string{pendingKey, modelListKey(id)},
id, data, int(modelListStoreRetention.Seconds()),
).Int64()
if err != nil {
return nil, fmt.Errorf("claim pending: %w", err)
}
if result == 0 {
// Another node won the race. The record is owned by the winner;
// retry to pick up whatever else is queued (or nothing).
continue
}
return req, nil
}
return nil, nil
}
func (s *RedisModelListStore) Complete(ctx context.Context, id string, models []ModelEntry, supported bool) error {
req, err := s.loadRequest(ctx, id)
if err != nil {
return err
}
if req == nil {
return nil
}
req.Status = ModelListCompleted
req.Models = models
req.Supported = supported
req.UpdatedAt = time.Now()
return s.persistRequest(ctx, req)
}
func (s *RedisModelListStore) Fail(ctx context.Context, id string, errMsg string) error {
req, err := s.loadRequest(ctx, id)
if err != nil {
return err
}
if req == nil {
return nil
}
req.Status = ModelListFailed
req.Error = errMsg
req.UpdatedAt = time.Now()
return s.persistRequest(ctx, req)
}

View File

@@ -0,0 +1,300 @@
package handler
import (
"context"
"sync"
"testing"
"time"
)
// Reuses the newRedisTestClient helper from
// runtime_local_skills_redis_store_test.go: same Redis instance, same gating
// on REDIS_TEST_URL, same FlushDB-per-test isolation.
// TestRedisModelListStore_EnvelopePersistsRunStartedAt is a pure marshal/
// unmarshal round-trip — no Redis required. Pins the regression that the
// `json:"-"` tag on ModelListRequest.RunStartedAt was silently dropping the
// field on persistence, which broke the running-timeout escape hatch
// across nodes (CI failure for TestRedisModelListStore_RunningTimeout
// before this fix).
func TestRedisModelListStore_EnvelopePersistsRunStartedAt(t *testing.T) {
store := &RedisModelListStore{}
now := time.Now().UTC().Truncate(time.Microsecond) // JSON loses sub-µs precision
req := &ModelListRequest{
ID: "id-1",
RuntimeID: "rt-1",
Status: ModelListRunning,
Supported: true,
CreatedAt: now.Add(-time.Second),
UpdatedAt: now,
RunStartedAt: &now,
}
data, err := store.marshalRequest(req)
if err != nil {
t.Fatalf("marshal: %v", err)
}
got, err := store.unmarshalRequest(data)
if err != nil {
t.Fatalf("unmarshal: %v", err)
}
if got.RunStartedAt == nil {
t.Fatal("RunStartedAt lost on round trip — running timeout would never fire across nodes")
}
if !got.RunStartedAt.Equal(now) {
t.Errorf("RunStartedAt drifted: got %s, want %s", got.RunStartedAt, now)
}
if got.Status != ModelListRunning {
t.Errorf("Status lost: got %s", got.Status)
}
if got.ID != "id-1" || got.RuntimeID != "rt-1" {
t.Errorf("identifiers lost: %+v", got)
}
}
func TestRedisModelListStore_CreateGetComplete(t *testing.T) {
rdb := newRedisTestClient(t)
ctx := context.Background()
store := NewRedisModelListStore(rdb)
req, err := store.Create(ctx, "runtime-1")
if err != nil {
t.Fatalf("create: %v", err)
}
if req.Status != ModelListPending {
t.Fatalf("initial status = %s", req.Status)
}
got, err := store.Get(ctx, req.ID)
if err != nil {
t.Fatalf("get: %v", err)
}
if got == nil || got.ID != req.ID {
t.Fatalf("round trip lost id: got=%v", got)
}
models := []ModelEntry{
{ID: "claude-sonnet-4-6", Label: "Claude Sonnet 4.6", Provider: "anthropic", Default: true},
{ID: "claude-opus-4-7", Label: "Claude Opus 4.7", Provider: "anthropic"},
}
if err := store.Complete(ctx, req.ID, models, true); err != nil {
t.Fatalf("complete: %v", err)
}
got, err = store.Get(ctx, req.ID)
if err != nil {
t.Fatalf("get after complete: %v", err)
}
if got.Status != ModelListCompleted {
t.Fatalf("status after complete = %s", got.Status)
}
if len(got.Models) != 2 {
t.Fatalf("models not persisted: %+v", got.Models)
}
if !got.Models[0].Default {
t.Fatalf("default flag lost on round trip: %+v", got.Models[0])
}
if !got.Supported {
t.Fatalf("supported flag lost on round trip")
}
}
// TestRedisModelListStore_PopPendingAcrossInstances is the regression test
// for the exact bug this PR fixes: two API replicas share one Redis, one
// receives the POST that creates the request, the other receives the daemon
// heartbeat that PopPending-s it. Before this change the in-memory store made
// node B see nothing, the request timed out, and the picker showed
// "No models available" forever.
func TestRedisModelListStore_PopPendingAcrossInstances(t *testing.T) {
rdb := newRedisTestClient(t)
ctx := context.Background()
nodeA := NewRedisModelListStore(rdb)
nodeB := NewRedisModelListStore(rdb)
req, err := nodeA.Create(ctx, "runtime-cross")
if err != nil {
t.Fatalf("node A create: %v", err)
}
popped, err := nodeB.PopPending(ctx, "runtime-cross")
if err != nil {
t.Fatalf("node B pop: %v", err)
}
if popped == nil {
t.Fatal("node B did not see node A's pending request")
}
if popped.ID != req.ID {
t.Fatalf("popped id = %s, want %s", popped.ID, req.ID)
}
if popped.Status != ModelListRunning {
t.Fatalf("popped status = %s, want running", popped.Status)
}
if popped.RunStartedAt == nil {
t.Fatal("run_started_at not set after pop")
}
// A third pop must see nothing (claim was atomic).
again, err := nodeB.PopPending(ctx, "runtime-cross")
if err != nil {
t.Fatalf("node B second pop: %v", err)
}
if again != nil {
t.Fatalf("expected no more pending, got %+v", again)
}
}
// TestRedisModelListStore_PopPendingConcurrent asserts the ZREM-wins race
// guard: N concurrent PopPending calls against a single pending request
// return exactly one winner.
func TestRedisModelListStore_PopPendingConcurrent(t *testing.T) {
rdb := newRedisTestClient(t)
ctx := context.Background()
store := NewRedisModelListStore(rdb)
req, err := store.Create(ctx, "runtime-race")
if err != nil {
t.Fatalf("create: %v", err)
}
const N = 8
var wg sync.WaitGroup
results := make(chan *ModelListRequest, N)
errs := make(chan error, N)
for i := 0; i < N; i++ {
wg.Add(1)
go func() {
defer wg.Done()
popped, err := store.PopPending(ctx, "runtime-race")
if err != nil {
errs <- err
return
}
results <- popped
}()
}
wg.Wait()
close(results)
close(errs)
for err := range errs {
t.Fatalf("concurrent pop error: %v", err)
}
winners := 0
for popped := range results {
if popped != nil {
winners++
if popped.ID != req.ID {
t.Fatalf("winner popped wrong id: %s", popped.ID)
}
}
}
if winners != 1 {
t.Fatalf("expected exactly one winner, got %d", winners)
}
}
// TestRedisModelListStore_PendingTimeout pins the lazy timeout sweep — a
// pending request whose CreatedAt has aged past the 30s threshold MUST
// transition to Timeout on the next Get and be evicted from the pending
// zset so a subsequent PopPending doesn't re-claim it.
func TestRedisModelListStore_PendingTimeout(t *testing.T) {
rdb := newRedisTestClient(t)
ctx := context.Background()
store := NewRedisModelListStore(rdb)
req, err := store.Create(ctx, "runtime-timeout")
if err != nil {
t.Fatalf("create: %v", err)
}
// Rewind CreatedAt so the pending threshold is blown — simulates 31s of
// daemon silence without actually blocking the test that long.
req.CreatedAt = time.Now().Add(-modelListPendingTimeout - time.Second)
if err := store.persistRequest(ctx, req); err != nil {
t.Fatalf("persist rewound: %v", err)
}
got, err := store.Get(ctx, req.ID)
if err != nil {
t.Fatalf("get: %v", err)
}
if got.Status != ModelListTimeout {
t.Fatalf("status = %s, want timeout", got.Status)
}
// A subsequent PopPending must NOT return a timed-out request.
popped, err := store.PopPending(ctx, "runtime-timeout")
if err != nil {
t.Fatalf("pop after timeout: %v", err)
}
if popped != nil {
t.Fatalf("expected no pending after timeout, got %+v", popped)
}
}
// TestRedisModelListStore_RunningTimeout pins the second escape hatch — a
// claimed request whose RunStartedAt has aged past the 60s threshold MUST
// flip to Timeout so the UI's polling loop terminates instead of waiting
// for the retention sweep.
func TestRedisModelListStore_RunningTimeout(t *testing.T) {
rdb := newRedisTestClient(t)
ctx := context.Background()
store := NewRedisModelListStore(rdb)
req, err := store.Create(ctx, "runtime-running-timeout")
if err != nil {
t.Fatalf("create: %v", err)
}
popped, err := store.PopPending(ctx, "runtime-running-timeout")
if err != nil {
t.Fatalf("pop: %v", err)
}
if popped == nil || popped.Status != ModelListRunning {
t.Fatalf("expected running, got %+v", popped)
}
// Rewind RunStartedAt past the running threshold.
aged := time.Now().Add(-modelListRunningTimeout - time.Second)
popped.RunStartedAt = &aged
if err := store.persistRequest(ctx, popped); err != nil {
t.Fatalf("persist rewound: %v", err)
}
got, err := store.Get(ctx, req.ID)
if err != nil {
t.Fatalf("get: %v", err)
}
if got.Status != ModelListTimeout {
t.Fatalf("status = %s, want timeout", got.Status)
}
}
// TestRedisModelListStore_HasPending pins the cheap probe used by the
// heartbeat hot path so a slow Redis can't stall every connected daemon.
func TestRedisModelListStore_HasPending(t *testing.T) {
rdb := newRedisTestClient(t)
ctx := context.Background()
store := NewRedisModelListStore(rdb)
if has, err := store.HasPending(ctx, "rt-empty"); err != nil || has {
t.Fatalf("empty store should not report pending: has=%v err=%v", has, err)
}
if _, err := store.Create(ctx, "rt-1"); err != nil {
t.Fatalf("create: %v", err)
}
if has, err := store.HasPending(ctx, "rt-1"); err != nil || !has {
t.Fatalf("expected pending=true after Create: has=%v err=%v", has, err)
}
if has, err := store.HasPending(ctx, "rt-other"); err != nil || has {
t.Fatalf("expected pending=false for unrelated runtime: has=%v err=%v", has, err)
}
if _, err := store.PopPending(ctx, "rt-1"); err != nil {
t.Fatalf("pop: %v", err)
}
if has, err := store.HasPending(ctx, "rt-1"); err != nil || has {
t.Fatalf("expected pending=false after PopPending: has=%v err=%v", has, err)
}
}

View File

@@ -2,6 +2,7 @@ package handler
import (
"bytes"
"context"
"encoding/json"
"net/http"
"net/http/httptest"
@@ -16,21 +17,35 @@ import (
// way out of Running was the 2-minute memory GC, which exceeded the UI
// polling window and surfaced as a silent "discovery failed" (MUL-1397).
func TestModelListStore_RunningRequestTimesOut(t *testing.T) {
store := NewModelListStore()
req := store.Create("runtime-xyz")
claimed := store.PopPending("runtime-xyz")
ctx := context.Background()
store := NewInMemoryModelListStore()
req, err := store.Create(ctx, "runtime-xyz")
if err != nil {
t.Fatalf("create: %v", err)
}
claimed, err := store.PopPending(ctx, "runtime-xyz")
if err != nil {
t.Fatalf("pop: %v", err)
}
if claimed == nil {
t.Fatal("expected PopPending to claim the pending request")
}
if claimed.Status != ModelListRunning {
t.Fatalf("expected Running after PopPending, got %s", claimed.Status)
}
if claimed.RunStartedAt == nil {
t.Fatal("expected RunStartedAt to be set on PopPending")
}
// Age the running record past the threshold without the daemon ever
// reporting a result. Get() must flip it to Timeout so the UI can
// terminate polling instead of waiting for the 2-minute GC.
claimed.UpdatedAt = time.Now().Add(-(modelListRunningTimeout + time.Second))
got := store.Get(req.ID)
// terminate polling instead of waiting for the retention sweep.
aged := time.Now().Add(-(modelListRunningTimeout + time.Second))
claimed.RunStartedAt = &aged
got, err := store.Get(ctx, req.ID)
if err != nil {
t.Fatalf("get: %v", err)
}
if got == nil {
t.Fatal("expected stored request")
}
@@ -48,8 +63,12 @@ func TestModelListStore_RunningRequestTimesOut(t *testing.T) {
// dropped here (e.g. by going through a map[string]string), the badge
// silently disappears.
func TestReportModelListResult_PreservesDefault(t *testing.T) {
store := NewModelListStore()
req := store.Create("runtime-xyz")
ctx := context.Background()
store := NewInMemoryModelListStore()
req, err := store.Create(ctx, "runtime-xyz")
if err != nil {
t.Fatalf("create: %v", err)
}
// Report a completed result with one default entry and one not.
body := map[string]any{
@@ -72,9 +91,14 @@ func TestReportModelListResult_PreservesDefault(t *testing.T) {
if err := json.Unmarshal(raw, &parsed); err != nil {
t.Fatalf("unmarshal report body: %v", err)
}
store.Complete(req.ID, parsed.Models, true)
if err := store.Complete(ctx, req.ID, parsed.Models, true); err != nil {
t.Fatalf("complete: %v", err)
}
got := store.Get(req.ID)
got, err := store.Get(ctx, req.ID)
if err != nil {
t.Fatalf("get: %v", err)
}
if got == nil {
t.Fatal("expected stored result")
}
@@ -120,3 +144,55 @@ func TestReportModelListResult_DecodesJSONBodyDefault(t *testing.T) {
t.Errorf("default flag lost on model[0]: %+v", body.Models[0])
}
}
// TestInMemoryModelListStore_HasPending pins the cheap probe used by the
// heartbeat hot path. Empty queue → false; pending request → true; after
// PopPending claims the record → false again.
func TestInMemoryModelListStore_HasPending(t *testing.T) {
ctx := context.Background()
store := NewInMemoryModelListStore()
if has, err := store.HasPending(ctx, "rt-1"); err != nil || has {
t.Fatalf("empty store should not report pending: has=%v err=%v", has, err)
}
if _, err := store.Create(ctx, "rt-1"); err != nil {
t.Fatalf("create: %v", err)
}
if has, err := store.HasPending(ctx, "rt-1"); err != nil || !has {
t.Fatalf("expected pending=true after Create: has=%v err=%v", has, err)
}
// Other runtimes don't see this runtime's queue.
if has, err := store.HasPending(ctx, "rt-2"); err != nil || has {
t.Fatalf("expected pending=false for unrelated runtime: has=%v err=%v", has, err)
}
if _, err := store.PopPending(ctx, "rt-1"); err != nil {
t.Fatalf("pop: %v", err)
}
if has, err := store.HasPending(ctx, "rt-1"); err != nil || has {
t.Fatalf("expected pending=false after PopPending: has=%v err=%v", has, err)
}
}
// TestInMemoryModelListStore_PopPendingPicksOldest documents the FIFO
// ordering so a daemon that handles one request per heartbeat doesn't
// starve early queue entries.
func TestInMemoryModelListStore_PopPendingPicksOldest(t *testing.T) {
ctx := context.Background()
store := NewInMemoryModelListStore()
first, _ := store.Create(ctx, "rt-1")
// Force a measurable gap so the FIFO comparison isn't on equal
// CreatedAt values (possible on platforms with coarse clocks).
time.Sleep(2 * time.Millisecond)
second, _ := store.Create(ctx, "rt-1")
got, err := store.PopPending(ctx, "rt-1")
if err != nil {
t.Fatalf("pop: %v", err)
}
if got == nil || got.ID != first.ID {
t.Fatalf("expected first request, got %+v (second was %s)", got, second.ID)
}
}