mirror of
https://github.com/multica-ai/multica.git
synced 2026-06-28 18:09:14 +02:00
Compare commits
2 Commits
agent/lamb
...
fix/model-
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
807720f902 | ||
|
|
a5deb1e395 |
@@ -106,6 +106,7 @@ func NewRouterWithOptions(pool *pgxpool.Pool, hub *realtime.Hub, bus *events.Bus
|
||||
h.TaskService.Wakeup = opts.DaemonWakeup
|
||||
}
|
||||
if rdb != nil {
|
||||
h.ModelListStore = handler.NewRedisModelListStore(rdb)
|
||||
h.LocalSkillListStore = handler.NewRedisLocalSkillListStore(rdb)
|
||||
h.LocalSkillImportStore = handler.NewRedisLocalSkillImportStore(rdb)
|
||||
}
|
||||
|
||||
@@ -79,11 +79,11 @@ func New(cfg Config, logger *slog.Logger) *Daemon {
|
||||
// server can split logs/metrics by client version (parallel to the CLI).
|
||||
client.SetVersion(cfg.CLIVersion)
|
||||
return &Daemon{
|
||||
cfg: cfg,
|
||||
client: client,
|
||||
repoCache: repocache.New(cacheRoot, logger),
|
||||
logger: logger,
|
||||
workspaces: make(map[string]*workspaceState),
|
||||
cfg: cfg,
|
||||
client: client,
|
||||
repoCache: repocache.New(cacheRoot, logger),
|
||||
logger: logger,
|
||||
workspaces: make(map[string]*workspaceState),
|
||||
runtimeIndex: make(map[string]Runtime),
|
||||
runtimeSetCh: make(chan struct{}, 1),
|
||||
agentVersions: make(map[string]string),
|
||||
@@ -715,7 +715,7 @@ func (d *Daemon) handleModelList(ctx context.Context, rt Runtime, requestID stri
|
||||
|
||||
entry, ok := d.cfg.Agents[rt.Provider]
|
||||
if !ok {
|
||||
d.client.ReportModelListResult(ctx, rt.ID, requestID, map[string]any{
|
||||
d.reportModelListResult(ctx, rt, requestID, map[string]any{
|
||||
"status": "failed",
|
||||
"error": fmt.Sprintf("no agent configured for provider %q", rt.Provider),
|
||||
})
|
||||
@@ -724,7 +724,7 @@ func (d *Daemon) handleModelList(ctx context.Context, rt Runtime, requestID stri
|
||||
|
||||
models, err := agent.ListModels(ctx, rt.Provider, entry.Path)
|
||||
if err != nil {
|
||||
d.client.ReportModelListResult(ctx, rt.ID, requestID, map[string]any{
|
||||
d.reportModelListResult(ctx, rt, requestID, map[string]any{
|
||||
"status": "failed",
|
||||
"error": err.Error(),
|
||||
})
|
||||
@@ -749,7 +749,7 @@ func (d *Daemon) handleModelList(ctx context.Context, rt Runtime, requestID stri
|
||||
Default: m.Default,
|
||||
})
|
||||
}
|
||||
d.client.ReportModelListResult(ctx, rt.ID, requestID, map[string]any{
|
||||
d.reportModelListResult(ctx, rt, requestID, map[string]any{
|
||||
"status": "completed",
|
||||
"models": wire,
|
||||
"supported": agent.ModelSelectionSupported(rt.Provider),
|
||||
@@ -800,19 +800,20 @@ func (d *Daemon) handleLocalSkillImport(ctx context.Context, rt Runtime, pending
|
||||
})
|
||||
}
|
||||
|
||||
// localSkillReportBackoffs defines the retry schedule for delivering a
|
||||
// local-skill result to the server. First attempt runs immediately, then we
|
||||
// back off. The sum (≈6.5s) stays well under the server-side running timeout
|
||||
// (60s) so a report that eventually lands still updates the request instead
|
||||
// of racing a timeout transition.
|
||||
// runtimeReportBackoffs defines the retry schedule for delivering any
|
||||
// daemon→server async result (model list, local-skill list, local-skill
|
||||
// import). First attempt runs immediately, then we back off. The sum
|
||||
// (≈6.5s) stays well under the server-side running timeout (60s) so a
|
||||
// report that eventually lands still updates the request instead of
|
||||
// racing a timeout transition.
|
||||
//
|
||||
// Overridable for tests to avoid real sleeps.
|
||||
var localSkillReportBackoffs = []time.Duration{0, 500 * time.Millisecond, 2 * time.Second, 4 * time.Second}
|
||||
var runtimeReportBackoffs = []time.Duration{0, 500 * time.Millisecond, 2 * time.Second, 4 * time.Second}
|
||||
|
||||
// reportLocalSkillListResult delivers a list-report to the server with retry
|
||||
// on transient failures. See reportLocalSkillResultWithRetry for semantics.
|
||||
// on transient failures. See reportRuntimeResultWithRetry for semantics.
|
||||
func (d *Daemon) reportLocalSkillListResult(ctx context.Context, rt Runtime, requestID string, payload map[string]any) {
|
||||
d.reportLocalSkillResultWithRetry(ctx, "list", rt.ID, requestID, func(ctx context.Context) error {
|
||||
d.reportRuntimeResultWithRetry(ctx, "local_skill_list", rt.ID, requestID, func(ctx context.Context) error {
|
||||
return d.client.ReportLocalSkillListResult(ctx, rt.ID, requestID, payload)
|
||||
})
|
||||
}
|
||||
@@ -820,29 +821,39 @@ func (d *Daemon) reportLocalSkillListResult(ctx context.Context, rt Runtime, req
|
||||
// reportLocalSkillImportResult delivers an import-report to the server with
|
||||
// retry on transient failures.
|
||||
func (d *Daemon) reportLocalSkillImportResult(ctx context.Context, rt Runtime, requestID string, payload map[string]any) {
|
||||
d.reportLocalSkillResultWithRetry(ctx, "import", rt.ID, requestID, func(ctx context.Context) error {
|
||||
d.reportRuntimeResultWithRetry(ctx, "local_skill_import", rt.ID, requestID, func(ctx context.Context) error {
|
||||
return d.client.ReportLocalSkillImportResult(ctx, rt.ID, requestID, payload)
|
||||
})
|
||||
}
|
||||
|
||||
// reportLocalSkillResultWithRetry retries `fn` on 5xx / network errors and
|
||||
// stops on success, 4xx, or after exhausting localSkillReportBackoffs.
|
||||
// reportModelListResult delivers a model-list report to the server with retry
|
||||
// on transient failures. Without this the daemon used to fire once and
|
||||
// swallow any 5xx, leaving the request stranded in "running" on the server
|
||||
// until its 60s timeout — defeating the multi-node store fix.
|
||||
func (d *Daemon) reportModelListResult(ctx context.Context, rt Runtime, requestID string, payload map[string]any) {
|
||||
d.reportRuntimeResultWithRetry(ctx, "model_list", rt.ID, requestID, func(ctx context.Context) error {
|
||||
return d.client.ReportModelListResult(ctx, rt.ID, requestID, payload)
|
||||
})
|
||||
}
|
||||
|
||||
// reportRuntimeResultWithRetry retries `fn` on 5xx / network errors and
|
||||
// stops on success, 4xx, or after exhausting runtimeReportBackoffs.
|
||||
//
|
||||
// Why this exists: the server persists the report through a Redis / DB
|
||||
// write; on a transient store failure it now correctly returns 500 (see
|
||||
// PR #1557). Without a client-side retry the daemon would fire once,
|
||||
// swallow the error, and the pending request stays in "running" on the
|
||||
// server until the 60s timeout — which is exactly the "daemon did not
|
||||
// respond" failure mode the whole store refactor was meant to fix. 4xx is
|
||||
// treated as permanent (request-not-found, cross-workspace token rejected,
|
||||
// bad body) — retrying those just wastes heartbeat cycles.
|
||||
func (d *Daemon) reportLocalSkillResultWithRetry(ctx context.Context, kind, runtimeID, requestID string, fn func(context.Context) error) {
|
||||
// write; on a transient store failure it correctly returns 500. Without a
|
||||
// client-side retry the daemon would fire once, swallow the error, and the
|
||||
// pending request stays in "running" on the server until its timeout — which
|
||||
// is exactly the "daemon did not respond" failure mode the multi-node store
|
||||
// fix was meant to eliminate. 4xx is treated as permanent (request-not-found,
|
||||
// cross-workspace token rejected, bad body) — retrying those just wastes
|
||||
// heartbeat cycles.
|
||||
func (d *Daemon) reportRuntimeResultWithRetry(ctx context.Context, kind, runtimeID, requestID string, fn func(context.Context) error) {
|
||||
var lastErr error
|
||||
for attempt, wait := range localSkillReportBackoffs {
|
||||
for attempt, wait := range runtimeReportBackoffs {
|
||||
if wait > 0 {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
d.logger.Error("local skill report cancelled",
|
||||
d.logger.Error("runtime async report cancelled",
|
||||
"kind", kind, "runtime_id", runtimeID, "request_id", requestID,
|
||||
"attempt", attempt, "error", ctx.Err())
|
||||
return
|
||||
@@ -852,7 +863,7 @@ func (d *Daemon) reportLocalSkillResultWithRetry(ctx context.Context, kind, runt
|
||||
err := fn(ctx)
|
||||
if err == nil {
|
||||
if attempt > 0 {
|
||||
d.logger.Info("local skill report succeeded after retry",
|
||||
d.logger.Info("runtime async report succeeded after retry",
|
||||
"kind", kind, "runtime_id", runtimeID, "request_id", requestID,
|
||||
"attempt", attempt+1)
|
||||
}
|
||||
@@ -864,17 +875,17 @@ func (d *Daemon) reportLocalSkillResultWithRetry(ctx context.Context, kind, runt
|
||||
// body). No amount of retrying will make it succeed.
|
||||
var reqErr *requestError
|
||||
if errors.As(err, &reqErr) && reqErr.StatusCode >= 400 && reqErr.StatusCode < 500 {
|
||||
d.logger.Error("local skill report rejected — not retrying",
|
||||
d.logger.Error("runtime async report rejected — not retrying",
|
||||
"kind", kind, "runtime_id", runtimeID, "request_id", requestID,
|
||||
"status", reqErr.StatusCode, "error", err)
|
||||
return
|
||||
}
|
||||
|
||||
d.logger.Warn("local skill report failed — will retry",
|
||||
d.logger.Warn("runtime async report failed — will retry",
|
||||
"kind", kind, "runtime_id", runtimeID, "request_id", requestID,
|
||||
"attempt", attempt+1, "error", err)
|
||||
}
|
||||
d.logger.Error("local skill report exhausted retries",
|
||||
d.logger.Error("runtime async report exhausted retries",
|
||||
"kind", kind, "runtime_id", runtimeID, "request_id", requestID, "error", lastErr)
|
||||
}
|
||||
|
||||
|
||||
@@ -16,9 +16,9 @@ import (
|
||||
// the production schedule on cleanup.
|
||||
func withFastLocalSkillReportBackoffs(t *testing.T) {
|
||||
t.Helper()
|
||||
prev := localSkillReportBackoffs
|
||||
localSkillReportBackoffs = []time.Duration{0, 0, 0, 0}
|
||||
t.Cleanup(func() { localSkillReportBackoffs = prev })
|
||||
prev := runtimeReportBackoffs
|
||||
runtimeReportBackoffs = []time.Duration{0, 0, 0, 0}
|
||||
t.Cleanup(func() { runtimeReportBackoffs = prev })
|
||||
}
|
||||
|
||||
// localSkillReportDaemon wires a Daemon instance around an httptest.Server
|
||||
@@ -110,18 +110,18 @@ func TestReportLocalSkillResult_GivesUpAfterAllAttemptsFail(t *testing.T) {
|
||||
|
||||
d.reportLocalSkillListResult(context.Background(), Runtime{ID: "rt-1"}, "req-1", map[string]any{"status": "completed"})
|
||||
|
||||
// Each element in localSkillReportBackoffs is one attempt — a persistent
|
||||
// Each element in runtimeReportBackoffs is one attempt — a persistent
|
||||
// outage should burn through every slot and then stop (logging Error).
|
||||
if got := atomic.LoadInt32(calls); int(got) != len(localSkillReportBackoffs) {
|
||||
t.Fatalf("expected %d attempts, got %d", len(localSkillReportBackoffs), got)
|
||||
if got := atomic.LoadInt32(calls); int(got) != len(runtimeReportBackoffs) {
|
||||
t.Fatalf("expected %d attempts, got %d", len(runtimeReportBackoffs), got)
|
||||
}
|
||||
}
|
||||
|
||||
func TestReportLocalSkillResult_AbortsOnContextCancel(t *testing.T) {
|
||||
// Keep one real delay in the schedule so cancel lands mid-backoff.
|
||||
prev := localSkillReportBackoffs
|
||||
localSkillReportBackoffs = []time.Duration{0, 200 * time.Millisecond}
|
||||
t.Cleanup(func() { localSkillReportBackoffs = prev })
|
||||
prev := runtimeReportBackoffs
|
||||
runtimeReportBackoffs = []time.Duration{0, 200 * time.Millisecond}
|
||||
t.Cleanup(func() { runtimeReportBackoffs = prev })
|
||||
|
||||
d, calls := localSkillReportDaemon(t, func(w http.ResponseWriter, _ *http.Request) {
|
||||
http.Error(w, "{}", http.StatusInternalServerError)
|
||||
|
||||
74
server/internal/daemon/model_list_report_test.go
Normal file
74
server/internal/daemon/model_list_report_test.go
Normal file
@@ -0,0 +1,74 @@
|
||||
package daemon
|
||||
|
||||
import (
|
||||
"context"
|
||||
"net/http"
|
||||
"strings"
|
||||
"sync/atomic"
|
||||
"testing"
|
||||
)
|
||||
|
||||
// TestReportModelListResult_RetriesOn500AndEventuallySucceeds pins the
|
||||
// regression GPT-Boy flagged on PR #2022: handleModelList used to call
|
||||
// d.client.ReportModelListResult directly and swallow any 5xx, leaving the
|
||||
// pending request stranded in "running" until its 60s server-side timeout —
|
||||
// which is exactly the failure mode the multi-node store fix was meant to
|
||||
// eliminate. With the retry helper in place a transient store failure on
|
||||
// the server side gets re-tried until it lands.
|
||||
func TestReportModelListResult_RetriesOn500AndEventuallySucceeds(t *testing.T) {
|
||||
withFastLocalSkillReportBackoffs(t)
|
||||
|
||||
var hits int32
|
||||
d, calls := localSkillReportDaemon(t, func(w http.ResponseWriter, _ *http.Request) {
|
||||
n := atomic.AddInt32(&hits, 1)
|
||||
if n <= 2 {
|
||||
http.Error(w, "{}", http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
w.WriteHeader(http.StatusOK)
|
||||
w.Write([]byte(`{"status":"ok"}`))
|
||||
})
|
||||
|
||||
d.reportModelListResult(context.Background(), Runtime{ID: "rt-1"}, "req-1", map[string]any{"status": "completed"})
|
||||
|
||||
if got := atomic.LoadInt32(calls); got != 3 {
|
||||
t.Fatalf("expected 3 attempts (2 failures + 1 success), got %d", got)
|
||||
}
|
||||
}
|
||||
|
||||
// TestReportModelListResult_DoesNotRetryOn4xx pins that 4xx (e.g. the request
|
||||
// expired or was cross-workspace) is treated as terminal — retrying just
|
||||
// burns heartbeat cycles.
|
||||
func TestReportModelListResult_DoesNotRetryOn4xx(t *testing.T) {
|
||||
withFastLocalSkillReportBackoffs(t)
|
||||
|
||||
d, calls := localSkillReportDaemon(t, func(w http.ResponseWriter, _ *http.Request) {
|
||||
http.Error(w, `{"error":"request not found"}`, http.StatusNotFound)
|
||||
})
|
||||
|
||||
d.reportModelListResult(context.Background(), Runtime{ID: "rt-1"}, "req-1", map[string]any{"status": "completed"})
|
||||
|
||||
if got := atomic.LoadInt32(calls); got != 1 {
|
||||
t.Fatalf("expected exactly 1 attempt (4xx is terminal), got %d", got)
|
||||
}
|
||||
}
|
||||
|
||||
// TestReportModelListResult_SendsCorrectPath smoke-tests the URL the daemon
|
||||
// posts to, so a future client refactor doesn't silently aim reports at the
|
||||
// wrong endpoint.
|
||||
func TestReportModelListResult_SendsCorrectPath(t *testing.T) {
|
||||
withFastLocalSkillReportBackoffs(t)
|
||||
|
||||
var path string
|
||||
d, _ := localSkillReportDaemon(t, func(w http.ResponseWriter, r *http.Request) {
|
||||
path = r.URL.Path
|
||||
w.WriteHeader(http.StatusOK)
|
||||
w.Write([]byte(`{"status":"ok"}`))
|
||||
})
|
||||
|
||||
d.reportModelListResult(context.Background(), Runtime{ID: "rt-a"}, "req-1", map[string]any{"status": "completed"})
|
||||
|
||||
if !strings.HasSuffix(path, "/api/daemon/runtimes/rt-a/models/req-1/result") {
|
||||
t.Fatalf("model list report path = %q", path)
|
||||
}
|
||||
}
|
||||
@@ -527,14 +527,14 @@ func (h *Handler) DaemonHeartbeat(w http.ResponseWriter, r *http.Request) {
|
||||
start := time.Now()
|
||||
authPath := middleware.DaemonAuthPathFromContext(r.Context())
|
||||
var (
|
||||
outcome = "unauth"
|
||||
runtimeID string
|
||||
decodeMs, runtimeLookupMs, workspaceCheckMs int64
|
||||
authMs, updateMs, probeSkillsMs, popSkillsMs, probeImportMs, popImportMs int64
|
||||
probeSkillsTimedOut, probeImportTimedOut bool
|
||||
outcome = "unauth"
|
||||
runtimeID string
|
||||
decodeMs, runtimeLookupMs, workspaceCheckMs int64
|
||||
authMs, updateMs, probeModelMs, popModelMs, probeSkillsMs, popSkillsMs, probeImportMs, popImportMs int64
|
||||
probeModelTimedOut, probeSkillsTimedOut, probeImportTimedOut bool
|
||||
)
|
||||
defer func() {
|
||||
logHeartbeatEndpointSlow(runtimeID, outcome, authPath, start, decodeMs, runtimeLookupMs, workspaceCheckMs, authMs, updateMs, probeSkillsMs, popSkillsMs, probeImportMs, popImportMs, probeSkillsTimedOut, probeImportTimedOut)
|
||||
logHeartbeatEndpointSlow(runtimeID, outcome, authPath, start, decodeMs, runtimeLookupMs, workspaceCheckMs, authMs, updateMs, probeModelMs, popModelMs, probeSkillsMs, popSkillsMs, probeImportMs, popImportMs, probeModelTimedOut, probeSkillsTimedOut, probeImportTimedOut)
|
||||
}()
|
||||
|
||||
decodeStart := time.Now()
|
||||
@@ -584,10 +584,13 @@ func (h *Handler) DaemonHeartbeat(w http.ResponseWriter, r *http.Request) {
|
||||
|
||||
ack, m, err := h.processHeartbeat(r.Context(), rt)
|
||||
updateMs = m.UpdateMs
|
||||
probeModelMs = m.ProbeModelMs
|
||||
popModelMs = m.PopModelMs
|
||||
probeSkillsMs = m.ProbeSkillsMs
|
||||
popSkillsMs = m.PopSkillsMs
|
||||
probeImportMs = m.ProbeImportMs
|
||||
popImportMs = m.PopImportMs
|
||||
probeModelTimedOut = m.ProbeModelTimedOut
|
||||
probeSkillsTimedOut = m.ProbeSkillsTimedOut
|
||||
probeImportTimedOut = m.ProbeImportTimedOut
|
||||
if err != nil {
|
||||
@@ -642,8 +645,8 @@ func (h *Handler) HandleDaemonWSHeartbeat(ctx context.Context, identity daemonws
|
||||
// heartbeatMetrics carries per-stage timings out of processHeartbeat so the
|
||||
// HTTP slow-log can stay structured. The WS path discards them.
|
||||
type heartbeatMetrics struct {
|
||||
UpdateMs, ProbeSkillsMs, PopSkillsMs, ProbeImportMs, PopImportMs int64
|
||||
ProbeSkillsTimedOut, ProbeImportTimedOut bool
|
||||
UpdateMs, ProbeModelMs, PopModelMs, ProbeSkillsMs, PopSkillsMs, ProbeImportMs, PopImportMs int64
|
||||
ProbeModelTimedOut, ProbeSkillsTimedOut, ProbeImportTimedOut bool
|
||||
}
|
||||
|
||||
// processHeartbeat does the work shared by HTTP POST /api/daemon/heartbeat and
|
||||
@@ -675,8 +678,32 @@ func (h *Handler) processHeartbeat(ctx context.Context, rt db.AgentRuntime) (*pr
|
||||
}
|
||||
}
|
||||
|
||||
if pending := h.ModelListStore.PopPending(runtimeID); pending != nil {
|
||||
ack.PendingModelList = &protocol.DaemonHeartbeatPendingModelList{ID: pending.ID}
|
||||
// Probe then claim the model list queue. Same pattern as the local-skill
|
||||
// queues below — a slow shared store cannot stall the heartbeat on
|
||||
// empty-queue ticks, but the claim itself runs unbounded because its
|
||||
// Lua side effects cannot be safely aborted mid-script.
|
||||
probeModelStart := time.Now()
|
||||
probeModelCtx, cancelProbeModel := context.WithTimeout(ctx, heartbeatHasPendingTimeout)
|
||||
hasModel, probeModelErr := h.ModelListStore.HasPending(probeModelCtx, runtimeID)
|
||||
cancelProbeModel()
|
||||
m.ProbeModelMs = time.Since(probeModelStart).Milliseconds()
|
||||
switch {
|
||||
case probeModelErr == nil && hasModel:
|
||||
popStart := time.Now()
|
||||
pendingModel, popErr := h.ModelListStore.PopPending(ctx, runtimeID)
|
||||
m.PopModelMs = time.Since(popStart).Milliseconds()
|
||||
if popErr != nil {
|
||||
slog.Warn("model list PopPending failed", "error", popErr, "runtime_id", runtimeID)
|
||||
} else if pendingModel != nil {
|
||||
ack.PendingModelList = &protocol.DaemonHeartbeatPendingModelList{ID: pendingModel.ID}
|
||||
}
|
||||
case probeModelErr != nil:
|
||||
if errors.Is(probeModelErr, context.DeadlineExceeded) || errors.Is(probeModelErr, context.Canceled) {
|
||||
m.ProbeModelTimedOut = true
|
||||
slog.Warn("model list HasPending timed out", "runtime_id", runtimeID, "elapsed_ms", m.ProbeModelMs)
|
||||
} else {
|
||||
slog.Warn("model list HasPending failed", "error", probeModelErr, "runtime_id", runtimeID)
|
||||
}
|
||||
}
|
||||
|
||||
// Probe then claim the local-skill list queue. The probe is bounded so a
|
||||
@@ -743,9 +770,9 @@ func (h *Handler) processHeartbeat(ctx context.Context, rt db.AgentRuntime) (*pr
|
||||
// auth_ms is further decomposed into decode_ms, runtime_lookup_ms, and
|
||||
// workspace_check_ms; auth_path labels which token kind authenticated the
|
||||
// request ("daemon_token", "pat", or "jwt"). Mirrors logClaimEndpointSlow.
|
||||
func logHeartbeatEndpointSlow(runtimeID, outcome, authPath string, start time.Time, decodeMs, runtimeLookupMs, workspaceCheckMs, authMs, updateMs, probeSkillsMs, popSkillsMs, probeImportMs, popImportMs int64, probeSkillsTimedOut, probeImportTimedOut bool) {
|
||||
func logHeartbeatEndpointSlow(runtimeID, outcome, authPath string, start time.Time, decodeMs, runtimeLookupMs, workspaceCheckMs, authMs, updateMs, probeModelMs, popModelMs, probeSkillsMs, popSkillsMs, probeImportMs, popImportMs int64, probeModelTimedOut, probeSkillsTimedOut, probeImportTimedOut bool) {
|
||||
totalMs := time.Since(start).Milliseconds()
|
||||
if totalMs < 500 && !probeSkillsTimedOut && !probeImportTimedOut {
|
||||
if totalMs < 500 && !probeModelTimedOut && !probeSkillsTimedOut && !probeImportTimedOut {
|
||||
return
|
||||
}
|
||||
slog.Info("heartbeat_endpoint slow",
|
||||
@@ -758,10 +785,13 @@ func logHeartbeatEndpointSlow(runtimeID, outcome, authPath string, start time.Ti
|
||||
"runtime_lookup_ms", runtimeLookupMs,
|
||||
"workspace_check_ms", workspaceCheckMs,
|
||||
"update_ms", updateMs,
|
||||
"probe_model_ms", probeModelMs,
|
||||
"pop_model_ms", popModelMs,
|
||||
"probe_skills_ms", probeSkillsMs,
|
||||
"pop_skills_ms", popSkillsMs,
|
||||
"probe_import_ms", probeImportMs,
|
||||
"pop_import_ms", popImportMs,
|
||||
"probe_model_timed_out", probeModelTimedOut,
|
||||
"probe_skills_timed_out", probeSkillsTimedOut,
|
||||
"probe_import_timed_out", probeImportTimedOut,
|
||||
)
|
||||
|
||||
@@ -60,7 +60,7 @@ type Handler struct {
|
||||
AutopilotService *service.AutopilotService
|
||||
EmailService *service.EmailService
|
||||
UpdateStore *UpdateStore
|
||||
ModelListStore *ModelListStore
|
||||
ModelListStore ModelListStore
|
||||
LocalSkillListStore LocalSkillListStore
|
||||
LocalSkillImportStore LocalSkillImportStore
|
||||
Storage storage.Storage
|
||||
@@ -98,7 +98,7 @@ func New(queries *db.Queries, txStarter txStarter, hub *realtime.Hub, bus *event
|
||||
AutopilotService: service.NewAutopilotService(queries, txStarter, bus, taskSvc),
|
||||
EmailService: emailService,
|
||||
UpdateStore: NewUpdateStore(),
|
||||
ModelListStore: NewModelListStore(),
|
||||
ModelListStore: NewInMemoryModelListStore(),
|
||||
LocalSkillListStore: NewInMemoryLocalSkillListStore(),
|
||||
LocalSkillImportStore: NewInMemoryLocalSkillImportStore(),
|
||||
Storage: store,
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
package handler
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"log/slog"
|
||||
"net/http"
|
||||
@@ -11,13 +12,23 @@ import (
|
||||
)
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// In-memory model-list request store
|
||||
// Model list request store
|
||||
// ---------------------------------------------------------------------------
|
||||
//
|
||||
// The server cannot call the daemon directly (the daemon is behind the user's
|
||||
// NAT and only polls the server). So "list models for this runtime" uses a
|
||||
// pending-request pattern: server creates a pending request, daemon pops it
|
||||
// on the next heartbeat, executes locally, and reports the result back.
|
||||
// pending-request pattern: a frontend POST creates a pending request, the
|
||||
// daemon pops it on the next heartbeat, executes locally, and reports the
|
||||
// result back.
|
||||
//
|
||||
// The store is the cross-cutting state for that flow. It MUST stay coherent
|
||||
// across API replicas — POST, heartbeat and poll can each land on a different
|
||||
// node, and they all need to see the same request lifecycle. The single-node
|
||||
// in-memory implementation is fine for self-hosted dev; multi-node deploys
|
||||
// (Multica Cloud) MUST use the Redis-backed implementation, otherwise the
|
||||
// pending request is invisible to whichever replica receives the next call
|
||||
// and the picker shows "No models available" (regression: see issue
|
||||
// review on multica-ai/multica#2009).
|
||||
|
||||
// ModelListStatus represents the lifecycle of a model list request.
|
||||
type ModelListStatus string
|
||||
@@ -35,15 +46,20 @@ const (
|
||||
// selection entirely (currently: hermes). The UI uses this to
|
||||
// disable its dropdown rather than silently accepting a value the
|
||||
// backend will drop.
|
||||
//
|
||||
// RunStartedAt is set when PopPending claims the request. It is
|
||||
// `json:"-"` because it's a server-side bookkeeping field — the UI only
|
||||
// needs Status / UpdatedAt to drive the polling loop.
|
||||
type ModelListRequest struct {
|
||||
ID string `json:"id"`
|
||||
RuntimeID string `json:"runtime_id"`
|
||||
Status ModelListStatus `json:"status"`
|
||||
Models []ModelEntry `json:"models,omitempty"`
|
||||
Supported bool `json:"supported"`
|
||||
Error string `json:"error,omitempty"`
|
||||
CreatedAt time.Time `json:"created_at"`
|
||||
UpdatedAt time.Time `json:"updated_at"`
|
||||
ID string `json:"id"`
|
||||
RuntimeID string `json:"runtime_id"`
|
||||
Status ModelListStatus `json:"status"`
|
||||
Models []ModelEntry `json:"models,omitempty"`
|
||||
Supported bool `json:"supported"`
|
||||
Error string `json:"error,omitempty"`
|
||||
CreatedAt time.Time `json:"created_at"`
|
||||
UpdatedAt time.Time `json:"updated_at"`
|
||||
RunStartedAt *time.Time `json:"-"`
|
||||
}
|
||||
|
||||
// ModelEntry mirrors agent.Model for the wire. `Default` tags the
|
||||
@@ -67,32 +83,83 @@ const (
|
||||
// carrying `pending_model_list` is lost in transit (e.g. HTTP client
|
||||
// timeout after PopPending already mutated store state): without this
|
||||
// transition the UI would keep polling a record that is stuck in
|
||||
// `running` until the 2-minute memory GC sweeps it.
|
||||
// `running` until retention sweeps it.
|
||||
modelListRunningTimeout = 60 * time.Second
|
||||
// modelListStoreRetention bounds how long any stored request lives in
|
||||
// the backing store. The Redis backend uses it as a TTL; the in-memory
|
||||
// backend GCs on Create. The window is deliberately wider than the
|
||||
// running/pending timeouts so terminal records are still readable when
|
||||
// the UI's last poll arrives.
|
||||
modelListStoreRetention = 2 * time.Minute
|
||||
)
|
||||
|
||||
// ModelListStore is a thread-safe in-memory store. Entries expire after 2 min
|
||||
// to bound memory use; the UI polls /requests/:id until status is terminal.
|
||||
type ModelListStore struct {
|
||||
// ModelListStore is the contract every backend (in-memory single-node,
|
||||
// Redis multi-node) must satisfy. Methods take a context so the Redis
|
||||
// implementation can honour the heartbeat-side timeout that gates a
|
||||
// slow shared store from stalling the rest of the heartbeat.
|
||||
type ModelListStore interface {
|
||||
Create(ctx context.Context, runtimeID string) (*ModelListRequest, error)
|
||||
Get(ctx context.Context, id string) (*ModelListRequest, error)
|
||||
// HasPending is a cheap read-only probe used by the heartbeat hot path
|
||||
// to gate the side-effecting PopPending. A spurious "true" is fine —
|
||||
// PopPending handles "queue empty after probe" by returning nil.
|
||||
HasPending(ctx context.Context, runtimeID string) (bool, error)
|
||||
PopPending(ctx context.Context, runtimeID string) (*ModelListRequest, error)
|
||||
Complete(ctx context.Context, id string, models []ModelEntry, supported bool) error
|
||||
Fail(ctx context.Context, id string, errMsg string) error
|
||||
}
|
||||
|
||||
// applyModelListTimeout transitions a request to ModelListTimeout when it has
|
||||
// been stuck in a non-terminal state past its threshold. Returns true when
|
||||
// the record was modified so callers can persist the change. The pending
|
||||
// threshold catches "daemon never picked this up"; the running threshold
|
||||
// catches "daemon picked it up but the result report was lost" — without
|
||||
// the running escape, only retention sweep ends the polling loop.
|
||||
func applyModelListTimeout(req *ModelListRequest, now time.Time) bool {
|
||||
switch req.Status {
|
||||
case ModelListPending:
|
||||
if now.Sub(req.CreatedAt) > modelListPendingTimeout {
|
||||
req.Status = ModelListTimeout
|
||||
req.Error = "daemon did not respond within 30 seconds"
|
||||
req.UpdatedAt = now
|
||||
return true
|
||||
}
|
||||
case ModelListRunning:
|
||||
if req.RunStartedAt != nil && now.Sub(*req.RunStartedAt) > modelListRunningTimeout {
|
||||
req.Status = ModelListTimeout
|
||||
req.Error = "daemon did not finish within 60 seconds"
|
||||
req.UpdatedAt = now
|
||||
return true
|
||||
}
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
// InMemoryModelListStore is the single-node implementation. Adequate for
|
||||
// self-hosted dev and the test suite, but unsafe in multi-node deploys
|
||||
// (each replica gets its own map and the pending request is invisible to
|
||||
// every replica that didn't receive the POST).
|
||||
type InMemoryModelListStore struct {
|
||||
mu sync.Mutex
|
||||
requests map[string]*ModelListRequest
|
||||
}
|
||||
|
||||
func NewModelListStore() *ModelListStore {
|
||||
return &ModelListStore{requests: make(map[string]*ModelListRequest)}
|
||||
func NewInMemoryModelListStore() *InMemoryModelListStore {
|
||||
return &InMemoryModelListStore{requests: make(map[string]*ModelListRequest)}
|
||||
}
|
||||
|
||||
func (s *ModelListStore) Create(runtimeID string) *ModelListRequest {
|
||||
func (s *InMemoryModelListStore) Create(_ context.Context, runtimeID string) (*ModelListRequest, error) {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
|
||||
// Garbage-collect stale entries so the map can't grow unbounded.
|
||||
for id, req := range s.requests {
|
||||
if time.Since(req.CreatedAt) > 2*time.Minute {
|
||||
if time.Since(req.CreatedAt) > modelListStoreRetention {
|
||||
delete(s.requests, id)
|
||||
}
|
||||
}
|
||||
|
||||
now := time.Now()
|
||||
req := &ModelListRequest{
|
||||
ID: randomID(),
|
||||
RuntimeID: runtimeID,
|
||||
@@ -100,55 +167,47 @@ func (s *ModelListStore) Create(runtimeID string) *ModelListRequest {
|
||||
// Default to true; the daemon overrides this in the report
|
||||
// for providers that don't support per-agent model selection.
|
||||
Supported: true,
|
||||
CreatedAt: time.Now(),
|
||||
UpdatedAt: time.Now(),
|
||||
CreatedAt: now,
|
||||
UpdatedAt: now,
|
||||
}
|
||||
s.requests[req.ID] = req
|
||||
return req
|
||||
return req, nil
|
||||
}
|
||||
|
||||
func (s *ModelListStore) Get(id string) *ModelListRequest {
|
||||
func (s *InMemoryModelListStore) Get(_ context.Context, id string) (*ModelListRequest, error) {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
|
||||
req, ok := s.requests[id]
|
||||
if !ok {
|
||||
return nil
|
||||
return nil, nil
|
||||
}
|
||||
applyModelListTimeout(req, time.Now())
|
||||
return req
|
||||
return req, nil
|
||||
}
|
||||
|
||||
// applyModelListTimeout transitions a request to ModelListTimeout when it has
|
||||
// been stuck in a non-terminal state past its threshold. The pending threshold
|
||||
// catches "daemon never picked this up"; the running threshold catches
|
||||
// "daemon picked it up but the result report was lost" — previously the only
|
||||
// escape from running was the 2-minute memory GC, which exceeded the UI's
|
||||
// polling window and surfaced as a silent discovery failure.
|
||||
func applyModelListTimeout(req *ModelListRequest, now time.Time) {
|
||||
switch req.Status {
|
||||
case ModelListPending:
|
||||
if now.Sub(req.CreatedAt) > modelListPendingTimeout {
|
||||
req.Status = ModelListTimeout
|
||||
req.Error = "daemon did not respond within 30 seconds"
|
||||
req.UpdatedAt = now
|
||||
}
|
||||
case ModelListRunning:
|
||||
if now.Sub(req.UpdatedAt) > modelListRunningTimeout {
|
||||
req.Status = ModelListTimeout
|
||||
req.Error = "daemon did not finish within 60 seconds"
|
||||
req.UpdatedAt = now
|
||||
func (s *InMemoryModelListStore) HasPending(_ context.Context, runtimeID string) (bool, error) {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
|
||||
now := time.Now()
|
||||
for _, req := range s.requests {
|
||||
applyModelListTimeout(req, now)
|
||||
if req.RuntimeID == runtimeID && req.Status == ModelListPending {
|
||||
return true, nil
|
||||
}
|
||||
}
|
||||
return false, nil
|
||||
}
|
||||
|
||||
// PopPending returns and marks-running the oldest pending request for a runtime.
|
||||
func (s *ModelListStore) PopPending(runtimeID string) *ModelListRequest {
|
||||
func (s *InMemoryModelListStore) PopPending(_ context.Context, runtimeID string) (*ModelListRequest, error) {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
|
||||
var oldest *ModelListRequest
|
||||
now := time.Now()
|
||||
for _, req := range s.requests {
|
||||
applyModelListTimeout(req, now)
|
||||
if req.RuntimeID == runtimeID && req.Status == ModelListPending {
|
||||
if oldest == nil || req.CreatedAt.Before(oldest.CreatedAt) {
|
||||
oldest = req
|
||||
@@ -157,12 +216,14 @@ func (s *ModelListStore) PopPending(runtimeID string) *ModelListRequest {
|
||||
}
|
||||
if oldest != nil {
|
||||
oldest.Status = ModelListRunning
|
||||
oldest.UpdatedAt = time.Now()
|
||||
startedAt := now
|
||||
oldest.RunStartedAt = &startedAt
|
||||
oldest.UpdatedAt = now
|
||||
}
|
||||
return oldest
|
||||
return oldest, nil
|
||||
}
|
||||
|
||||
func (s *ModelListStore) Complete(id string, models []ModelEntry, supported bool) {
|
||||
func (s *InMemoryModelListStore) Complete(_ context.Context, id string, models []ModelEntry, supported bool) error {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
|
||||
@@ -172,9 +233,10 @@ func (s *ModelListStore) Complete(id string, models []ModelEntry, supported bool
|
||||
req.Supported = supported
|
||||
req.UpdatedAt = time.Now()
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *ModelListStore) Fail(id string, errMsg string) {
|
||||
func (s *InMemoryModelListStore) Fail(_ context.Context, id string, errMsg string) error {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
|
||||
@@ -183,6 +245,11 @@ func (s *ModelListStore) Fail(id string, errMsg string) {
|
||||
req.Error = errMsg
|
||||
req.UpdatedAt = time.Now()
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func modelListRequestTerminal(status ModelListStatus) bool {
|
||||
return status == ModelListCompleted || status == ModelListFailed || status == ModelListTimeout
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
@@ -211,7 +278,11 @@ func (h *Handler) InitiateListModels(w http.ResponseWriter, r *http.Request) {
|
||||
return
|
||||
}
|
||||
|
||||
req := h.ModelListStore.Create(uuidToString(rt.ID))
|
||||
req, err := h.ModelListStore.Create(r.Context(), uuidToString(rt.ID))
|
||||
if err != nil {
|
||||
writeError(w, http.StatusInternalServerError, "failed to enqueue model list request: "+err.Error())
|
||||
return
|
||||
}
|
||||
writeJSON(w, http.StatusOK, req)
|
||||
}
|
||||
|
||||
@@ -219,7 +290,11 @@ func (h *Handler) InitiateListModels(w http.ResponseWriter, r *http.Request) {
|
||||
func (h *Handler) GetModelListRequest(w http.ResponseWriter, r *http.Request) {
|
||||
requestID := chi.URLParam(r, "requestId")
|
||||
|
||||
req := h.ModelListStore.Get(requestID)
|
||||
req, err := h.ModelListStore.Get(r.Context(), requestID)
|
||||
if err != nil {
|
||||
writeError(w, http.StatusInternalServerError, "failed to load request: "+err.Error())
|
||||
return
|
||||
}
|
||||
if req == nil {
|
||||
writeError(w, http.StatusNotFound, "request not found")
|
||||
return
|
||||
@@ -237,6 +312,24 @@ func (h *Handler) ReportModelListResult(w http.ResponseWriter, r *http.Request)
|
||||
|
||||
requestID := chi.URLParam(r, "requestId")
|
||||
|
||||
// Fetch first so we can ignore stale reports for already-terminal
|
||||
// requests (e.g. the heartbeat response that triggered the daemon
|
||||
// run was a retry, and the original report already landed).
|
||||
existing, err := h.ModelListStore.Get(r.Context(), requestID)
|
||||
if err != nil {
|
||||
writeError(w, http.StatusInternalServerError, "failed to load request: "+err.Error())
|
||||
return
|
||||
}
|
||||
if existing == nil || existing.RuntimeID != runtimeID {
|
||||
writeError(w, http.StatusNotFound, "request not found")
|
||||
return
|
||||
}
|
||||
if modelListRequestTerminal(existing.Status) {
|
||||
slog.Debug("ignoring stale model list report", "runtime_id", runtimeID, "request_id", requestID, "status", existing.Status)
|
||||
writeJSON(w, http.StatusOK, map[string]string{"status": "ok"})
|
||||
return
|
||||
}
|
||||
|
||||
var body struct {
|
||||
Status string `json:"status"` // "completed" or "failed"
|
||||
Models []ModelEntry `json:"models"`
|
||||
@@ -255,9 +348,21 @@ func (h *Handler) ReportModelListResult(w http.ResponseWriter, r *http.Request)
|
||||
if body.Supported != nil {
|
||||
supported = *body.Supported
|
||||
}
|
||||
h.ModelListStore.Complete(requestID, body.Models, supported)
|
||||
if err := h.ModelListStore.Complete(r.Context(), requestID, body.Models, supported); err != nil {
|
||||
// Surface the store failure as 5xx so the daemon can retry instead
|
||||
// of swallowing the report (leaves the request stuck in running
|
||||
// until the server-side timeout, which is exactly the "looks OK
|
||||
// but nothing happens" class of bug we're trying to avoid).
|
||||
slog.Error("ModelListStore Complete failed", "error", err, "request_id", requestID)
|
||||
writeError(w, http.StatusInternalServerError, "failed to persist completion")
|
||||
return
|
||||
}
|
||||
} else {
|
||||
h.ModelListStore.Fail(requestID, body.Error)
|
||||
if err := h.ModelListStore.Fail(r.Context(), requestID, body.Error); err != nil {
|
||||
slog.Error("ModelListStore Fail failed", "error", err, "request_id", requestID)
|
||||
writeError(w, http.StatusInternalServerError, "failed to persist failure")
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
slog.Debug("model list report", "runtime_id", runtimeID, "request_id", requestID, "status", body.Status, "count", len(body.Models))
|
||||
|
||||
250
server/internal/handler/runtime_models_redis_store.go
Normal file
250
server/internal/handler/runtime_models_redis_store.go
Normal file
@@ -0,0 +1,250 @@
|
||||
package handler
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"github.com/redis/go-redis/v9"
|
||||
)
|
||||
|
||||
// Redis-backed implementation of ModelListStore. The wire layout matches
|
||||
// runtime_local_skills_redis_store.go (which solves the same multi-node
|
||||
// dispatch problem for skill lists/imports) so the operational story is
|
||||
// identical: namespaced keys, ZSET-backed pending queue, atomic claim via
|
||||
// the shared Lua script.
|
||||
//
|
||||
// Key layout:
|
||||
//
|
||||
// mul:model_list:req:<request_id> → JSON-encoded ModelListRequest, TTL = retention
|
||||
// mul:model_list:pending:<runtime_id> → ZSET { member = request_id, score = created_at UnixNano }
|
||||
// TTL = retention*2 (kept alive long enough for
|
||||
// lazy sweep on PopPending)
|
||||
//
|
||||
// PopPending uses claimPendingScript (defined in
|
||||
// runtime_local_skills_redis_store.go) to atomically ZREM the pending entry
|
||||
// and SET the record to "running" — splitting those two writes would strand
|
||||
// requests on a transient Redis hiccup between them.
|
||||
|
||||
// Redis key namespace for model list requests. Everything lives under
// mul:model_list:* so the key set doesn't collide with the realtime relay
// (ws:*) or the local-skill stores (mul:local_skill:*).
const (
	// modelListKeyPrefix prefixes the key holding a single request record.
	modelListKeyPrefix = "mul:model_list:req:"
	// modelListPendingPrefix prefixes the per-runtime pending queue key.
	modelListPendingPrefix = "mul:model_list:pending:"
	// modelListRedisPopMaxRetries caps the iterations of PopPending's claim loop.
	modelListRedisPopMaxRetries = 5
)

// modelListKey returns the Redis key of the record for request id.
func modelListKey(id string) string {
	return modelListKeyPrefix + id
}

// modelListPendingKey returns the Redis key of the pending queue for runtimeID.
func modelListPendingKey(runtimeID string) string {
	return modelListPendingPrefix + runtimeID
}
|
||||
|
||||
// RedisModelListStore stores model list requests in Redis so every API node
|
||||
// agrees on the same pending / running / terminal state.
|
||||
type RedisModelListStore struct {
|
||||
rdb *redis.Client
|
||||
}
|
||||
|
||||
func NewRedisModelListStore(rdb *redis.Client) *RedisModelListStore {
|
||||
return &RedisModelListStore{rdb: rdb}
|
||||
}
|
||||
|
||||
func (s *RedisModelListStore) Create(ctx context.Context, runtimeID string) (*ModelListRequest, error) {
|
||||
now := time.Now()
|
||||
req := &ModelListRequest{
|
||||
ID: randomID(),
|
||||
RuntimeID: runtimeID,
|
||||
Status: ModelListPending,
|
||||
Supported: true,
|
||||
CreatedAt: now,
|
||||
UpdatedAt: now,
|
||||
}
|
||||
data, err := s.marshalRequest(req)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
pipe := s.rdb.TxPipeline()
|
||||
pipe.Set(ctx, modelListKey(req.ID), data, modelListStoreRetention)
|
||||
pipe.ZAdd(ctx, modelListPendingKey(runtimeID), redis.Z{
|
||||
Score: float64(now.UnixNano()),
|
||||
Member: req.ID,
|
||||
})
|
||||
// Keep the pending zset alive past the per-record retention so stale
|
||||
// members can be lazily swept on PopPending.
|
||||
pipe.Expire(ctx, modelListPendingKey(runtimeID), modelListStoreRetention*2)
|
||||
if _, err := pipe.Exec(ctx); err != nil {
|
||||
return nil, fmt.Errorf("persist model list request: %w", err)
|
||||
}
|
||||
return req, nil
|
||||
}
|
||||
|
||||
func (s *RedisModelListStore) Get(ctx context.Context, id string) (*ModelListRequest, error) {
|
||||
return s.loadRequest(ctx, id)
|
||||
}
|
||||
|
||||
// loadRequest fetches a single record, applies timeout transitions if the
|
||||
// stored state has aged past the threshold, and persists the transition so
|
||||
// sibling nodes observe the same terminal state.
|
||||
func (s *RedisModelListStore) loadRequest(ctx context.Context, id string) (*ModelListRequest, error) {
|
||||
raw, err := s.rdb.Get(ctx, modelListKey(id)).Bytes()
|
||||
if errors.Is(err, redis.Nil) {
|
||||
return nil, nil
|
||||
}
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("get model list request: %w", err)
|
||||
}
|
||||
req, err := s.unmarshalRequest(raw)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if applyModelListTimeout(req, time.Now()) {
|
||||
if err := s.persistRequest(ctx, req); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
// Drop from pending zset on terminal transition. PopPending would
|
||||
// also do this, but doing it here keeps the set clean for readers
|
||||
// that never call PopPending.
|
||||
s.rdb.ZRem(ctx, modelListPendingKey(req.RuntimeID), req.ID)
|
||||
}
|
||||
return req, nil
|
||||
}
|
||||
|
||||
func (s *RedisModelListStore) persistRequest(ctx context.Context, req *ModelListRequest) error {
|
||||
data, err := s.marshalRequest(req)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if err := s.rdb.Set(ctx, modelListKey(req.ID), data, modelListStoreRetention).Err(); err != nil {
|
||||
return fmt.Errorf("persist model list request: %w", err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// ModelListRequest tags RunStartedAt as `json:"-"` so the server-side
|
||||
// bookkeeping field doesn't leak into the HTTP response (the UI only
|
||||
// needs Status / UpdatedAt to drive its polling loop). Redis persistence
|
||||
// has to keep that field, otherwise the running-timeout escape hatch
|
||||
// silently breaks across nodes — every reader sees RunStartedAt=nil and
|
||||
// applyModelListTimeout's running branch becomes a no-op. Wrap in an
|
||||
// internal envelope that re-promotes the field on the wire.
|
||||
type redisModelListEnvelope struct {
|
||||
Public *ModelListRequest `json:"r"`
|
||||
RunStartedAt *time.Time `json:"s,omitempty"`
|
||||
}
|
||||
|
||||
func (s *RedisModelListStore) marshalRequest(req *ModelListRequest) ([]byte, error) {
|
||||
env := redisModelListEnvelope{Public: req, RunStartedAt: req.RunStartedAt}
|
||||
data, err := json.Marshal(env)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("marshal model list request: %w", err)
|
||||
}
|
||||
return data, nil
|
||||
}
|
||||
|
||||
func (s *RedisModelListStore) unmarshalRequest(raw []byte) (*ModelListRequest, error) {
|
||||
var env redisModelListEnvelope
|
||||
if err := json.Unmarshal(raw, &env); err != nil {
|
||||
return nil, fmt.Errorf("decode model list request: %w", err)
|
||||
}
|
||||
if env.Public == nil {
|
||||
return nil, fmt.Errorf("decode model list request: missing payload")
|
||||
}
|
||||
env.Public.RunStartedAt = env.RunStartedAt
|
||||
return env.Public, nil
|
||||
}
|
||||
|
||||
// HasPending is a cheap read-only ZCARD probe used by the heartbeat hot path
|
||||
// to decide whether to invoke the side-effecting PopPending.
|
||||
func (s *RedisModelListStore) HasPending(ctx context.Context, runtimeID string) (bool, error) {
|
||||
cnt, err := s.rdb.ZCard(ctx, modelListPendingKey(runtimeID)).Result()
|
||||
if err != nil {
|
||||
return false, fmt.Errorf("zcard pending: %w", err)
|
||||
}
|
||||
return cnt > 0, nil
|
||||
}
|
||||
|
||||
func (s *RedisModelListStore) PopPending(ctx context.Context, runtimeID string) (*ModelListRequest, error) {
|
||||
pendingKey := modelListPendingKey(runtimeID)
|
||||
|
||||
for attempt := 0; attempt < modelListRedisPopMaxRetries; attempt++ {
|
||||
ids, err := s.rdb.ZRange(ctx, pendingKey, 0, 0).Result()
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("zrange pending: %w", err)
|
||||
}
|
||||
if len(ids) == 0 {
|
||||
return nil, nil
|
||||
}
|
||||
id := ids[0]
|
||||
|
||||
req, err := s.loadRequest(ctx, id)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if req == nil {
|
||||
// Record expired but the zset still references it — drop and retry.
|
||||
s.rdb.ZRem(ctx, pendingKey, id)
|
||||
continue
|
||||
}
|
||||
if req.Status != ModelListPending {
|
||||
// Either the timeout fired inside loadRequest or another node
|
||||
// already picked it up. Unlink from the pending set and retry.
|
||||
s.rdb.ZRem(ctx, pendingKey, id)
|
||||
continue
|
||||
}
|
||||
|
||||
now := time.Now()
|
||||
req.Status = ModelListRunning
|
||||
req.RunStartedAt = &now
|
||||
req.UpdatedAt = now
|
||||
data, err := s.marshalRequest(req)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
result, err := claimPendingScript.Run(
|
||||
ctx, s.rdb,
|
||||
[]string{pendingKey, modelListKey(id)},
|
||||
id, data, int(modelListStoreRetention.Seconds()),
|
||||
).Int64()
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("claim pending: %w", err)
|
||||
}
|
||||
if result == 0 {
|
||||
// Another node won the race. The record is owned by the winner;
|
||||
// retry to pick up whatever else is queued (or nothing).
|
||||
continue
|
||||
}
|
||||
return req, nil
|
||||
}
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
func (s *RedisModelListStore) Complete(ctx context.Context, id string, models []ModelEntry, supported bool) error {
|
||||
req, err := s.loadRequest(ctx, id)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if req == nil {
|
||||
return nil
|
||||
}
|
||||
req.Status = ModelListCompleted
|
||||
req.Models = models
|
||||
req.Supported = supported
|
||||
req.UpdatedAt = time.Now()
|
||||
return s.persistRequest(ctx, req)
|
||||
}
|
||||
|
||||
func (s *RedisModelListStore) Fail(ctx context.Context, id string, errMsg string) error {
|
||||
req, err := s.loadRequest(ctx, id)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if req == nil {
|
||||
return nil
|
||||
}
|
||||
req.Status = ModelListFailed
|
||||
req.Error = errMsg
|
||||
req.UpdatedAt = time.Now()
|
||||
return s.persistRequest(ctx, req)
|
||||
}
|
||||
300
server/internal/handler/runtime_models_redis_store_test.go
Normal file
300
server/internal/handler/runtime_models_redis_store_test.go
Normal file
@@ -0,0 +1,300 @@
|
||||
package handler
|
||||
|
||||
import (
|
||||
"context"
|
||||
"sync"
|
||||
"testing"
|
||||
"time"
|
||||
)
|
||||
|
||||
// Reuses the newRedisTestClient helper from
|
||||
// runtime_local_skills_redis_store_test.go: same Redis instance, same gating
|
||||
// on REDIS_TEST_URL, same FlushDB-per-test isolation.
|
||||
|
||||
// TestRedisModelListStore_EnvelopePersistsRunStartedAt is a pure marshal/
|
||||
// unmarshal round-trip — no Redis required. Pins the regression that the
|
||||
// `json:"-"` tag on ModelListRequest.RunStartedAt was silently dropping the
|
||||
// field on persistence, which broke the running-timeout escape hatch
|
||||
// across nodes (CI failure for TestRedisModelListStore_RunningTimeout
|
||||
// before this fix).
|
||||
func TestRedisModelListStore_EnvelopePersistsRunStartedAt(t *testing.T) {
|
||||
store := &RedisModelListStore{}
|
||||
now := time.Now().UTC().Truncate(time.Microsecond) // JSON loses sub-µs precision
|
||||
req := &ModelListRequest{
|
||||
ID: "id-1",
|
||||
RuntimeID: "rt-1",
|
||||
Status: ModelListRunning,
|
||||
Supported: true,
|
||||
CreatedAt: now.Add(-time.Second),
|
||||
UpdatedAt: now,
|
||||
RunStartedAt: &now,
|
||||
}
|
||||
data, err := store.marshalRequest(req)
|
||||
if err != nil {
|
||||
t.Fatalf("marshal: %v", err)
|
||||
}
|
||||
got, err := store.unmarshalRequest(data)
|
||||
if err != nil {
|
||||
t.Fatalf("unmarshal: %v", err)
|
||||
}
|
||||
if got.RunStartedAt == nil {
|
||||
t.Fatal("RunStartedAt lost on round trip — running timeout would never fire across nodes")
|
||||
}
|
||||
if !got.RunStartedAt.Equal(now) {
|
||||
t.Errorf("RunStartedAt drifted: got %s, want %s", got.RunStartedAt, now)
|
||||
}
|
||||
if got.Status != ModelListRunning {
|
||||
t.Errorf("Status lost: got %s", got.Status)
|
||||
}
|
||||
if got.ID != "id-1" || got.RuntimeID != "rt-1" {
|
||||
t.Errorf("identifiers lost: %+v", got)
|
||||
}
|
||||
}
|
||||
|
||||
func TestRedisModelListStore_CreateGetComplete(t *testing.T) {
|
||||
rdb := newRedisTestClient(t)
|
||||
ctx := context.Background()
|
||||
store := NewRedisModelListStore(rdb)
|
||||
|
||||
req, err := store.Create(ctx, "runtime-1")
|
||||
if err != nil {
|
||||
t.Fatalf("create: %v", err)
|
||||
}
|
||||
if req.Status != ModelListPending {
|
||||
t.Fatalf("initial status = %s", req.Status)
|
||||
}
|
||||
|
||||
got, err := store.Get(ctx, req.ID)
|
||||
if err != nil {
|
||||
t.Fatalf("get: %v", err)
|
||||
}
|
||||
if got == nil || got.ID != req.ID {
|
||||
t.Fatalf("round trip lost id: got=%v", got)
|
||||
}
|
||||
|
||||
models := []ModelEntry{
|
||||
{ID: "claude-sonnet-4-6", Label: "Claude Sonnet 4.6", Provider: "anthropic", Default: true},
|
||||
{ID: "claude-opus-4-7", Label: "Claude Opus 4.7", Provider: "anthropic"},
|
||||
}
|
||||
if err := store.Complete(ctx, req.ID, models, true); err != nil {
|
||||
t.Fatalf("complete: %v", err)
|
||||
}
|
||||
|
||||
got, err = store.Get(ctx, req.ID)
|
||||
if err != nil {
|
||||
t.Fatalf("get after complete: %v", err)
|
||||
}
|
||||
if got.Status != ModelListCompleted {
|
||||
t.Fatalf("status after complete = %s", got.Status)
|
||||
}
|
||||
if len(got.Models) != 2 {
|
||||
t.Fatalf("models not persisted: %+v", got.Models)
|
||||
}
|
||||
if !got.Models[0].Default {
|
||||
t.Fatalf("default flag lost on round trip: %+v", got.Models[0])
|
||||
}
|
||||
if !got.Supported {
|
||||
t.Fatalf("supported flag lost on round trip")
|
||||
}
|
||||
}
|
||||
|
||||
// TestRedisModelListStore_PopPendingAcrossInstances is the regression test
|
||||
// for the exact bug this PR fixes: two API replicas share one Redis, one
|
||||
// receives the POST that creates the request, the other receives the daemon
|
||||
// heartbeat that PopPending-s it. Before this change the in-memory store made
|
||||
// node B see nothing, the request timed out, and the picker showed
|
||||
// "No models available" forever.
|
||||
func TestRedisModelListStore_PopPendingAcrossInstances(t *testing.T) {
|
||||
rdb := newRedisTestClient(t)
|
||||
ctx := context.Background()
|
||||
|
||||
nodeA := NewRedisModelListStore(rdb)
|
||||
nodeB := NewRedisModelListStore(rdb)
|
||||
|
||||
req, err := nodeA.Create(ctx, "runtime-cross")
|
||||
if err != nil {
|
||||
t.Fatalf("node A create: %v", err)
|
||||
}
|
||||
|
||||
popped, err := nodeB.PopPending(ctx, "runtime-cross")
|
||||
if err != nil {
|
||||
t.Fatalf("node B pop: %v", err)
|
||||
}
|
||||
if popped == nil {
|
||||
t.Fatal("node B did not see node A's pending request")
|
||||
}
|
||||
if popped.ID != req.ID {
|
||||
t.Fatalf("popped id = %s, want %s", popped.ID, req.ID)
|
||||
}
|
||||
if popped.Status != ModelListRunning {
|
||||
t.Fatalf("popped status = %s, want running", popped.Status)
|
||||
}
|
||||
if popped.RunStartedAt == nil {
|
||||
t.Fatal("run_started_at not set after pop")
|
||||
}
|
||||
|
||||
// A third pop must see nothing (claim was atomic).
|
||||
again, err := nodeB.PopPending(ctx, "runtime-cross")
|
||||
if err != nil {
|
||||
t.Fatalf("node B second pop: %v", err)
|
||||
}
|
||||
if again != nil {
|
||||
t.Fatalf("expected no more pending, got %+v", again)
|
||||
}
|
||||
}
|
||||
|
||||
// TestRedisModelListStore_PopPendingConcurrent asserts the ZREM-wins race
|
||||
// guard: N concurrent PopPending calls against a single pending request
|
||||
// return exactly one winner.
|
||||
func TestRedisModelListStore_PopPendingConcurrent(t *testing.T) {
|
||||
rdb := newRedisTestClient(t)
|
||||
ctx := context.Background()
|
||||
store := NewRedisModelListStore(rdb)
|
||||
|
||||
req, err := store.Create(ctx, "runtime-race")
|
||||
if err != nil {
|
||||
t.Fatalf("create: %v", err)
|
||||
}
|
||||
|
||||
const N = 8
|
||||
var wg sync.WaitGroup
|
||||
results := make(chan *ModelListRequest, N)
|
||||
errs := make(chan error, N)
|
||||
for i := 0; i < N; i++ {
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
popped, err := store.PopPending(ctx, "runtime-race")
|
||||
if err != nil {
|
||||
errs <- err
|
||||
return
|
||||
}
|
||||
results <- popped
|
||||
}()
|
||||
}
|
||||
wg.Wait()
|
||||
close(results)
|
||||
close(errs)
|
||||
|
||||
for err := range errs {
|
||||
t.Fatalf("concurrent pop error: %v", err)
|
||||
}
|
||||
|
||||
winners := 0
|
||||
for popped := range results {
|
||||
if popped != nil {
|
||||
winners++
|
||||
if popped.ID != req.ID {
|
||||
t.Fatalf("winner popped wrong id: %s", popped.ID)
|
||||
}
|
||||
}
|
||||
}
|
||||
if winners != 1 {
|
||||
t.Fatalf("expected exactly one winner, got %d", winners)
|
||||
}
|
||||
}
|
||||
|
||||
// TestRedisModelListStore_PendingTimeout pins the lazy timeout sweep — a
|
||||
// pending request whose CreatedAt has aged past the 30s threshold MUST
|
||||
// transition to Timeout on the next Get and be evicted from the pending
|
||||
// zset so a subsequent PopPending doesn't re-claim it.
|
||||
func TestRedisModelListStore_PendingTimeout(t *testing.T) {
|
||||
rdb := newRedisTestClient(t)
|
||||
ctx := context.Background()
|
||||
store := NewRedisModelListStore(rdb)
|
||||
|
||||
req, err := store.Create(ctx, "runtime-timeout")
|
||||
if err != nil {
|
||||
t.Fatalf("create: %v", err)
|
||||
}
|
||||
|
||||
// Rewind CreatedAt so the pending threshold is blown — simulates 31s of
|
||||
// daemon silence without actually blocking the test that long.
|
||||
req.CreatedAt = time.Now().Add(-modelListPendingTimeout - time.Second)
|
||||
if err := store.persistRequest(ctx, req); err != nil {
|
||||
t.Fatalf("persist rewound: %v", err)
|
||||
}
|
||||
|
||||
got, err := store.Get(ctx, req.ID)
|
||||
if err != nil {
|
||||
t.Fatalf("get: %v", err)
|
||||
}
|
||||
if got.Status != ModelListTimeout {
|
||||
t.Fatalf("status = %s, want timeout", got.Status)
|
||||
}
|
||||
|
||||
// A subsequent PopPending must NOT return a timed-out request.
|
||||
popped, err := store.PopPending(ctx, "runtime-timeout")
|
||||
if err != nil {
|
||||
t.Fatalf("pop after timeout: %v", err)
|
||||
}
|
||||
if popped != nil {
|
||||
t.Fatalf("expected no pending after timeout, got %+v", popped)
|
||||
}
|
||||
}
|
||||
|
||||
// TestRedisModelListStore_RunningTimeout pins the second escape hatch — a
|
||||
// claimed request whose RunStartedAt has aged past the 60s threshold MUST
|
||||
// flip to Timeout so the UI's polling loop terminates instead of waiting
|
||||
// for the retention sweep.
|
||||
func TestRedisModelListStore_RunningTimeout(t *testing.T) {
|
||||
rdb := newRedisTestClient(t)
|
||||
ctx := context.Background()
|
||||
store := NewRedisModelListStore(rdb)
|
||||
|
||||
req, err := store.Create(ctx, "runtime-running-timeout")
|
||||
if err != nil {
|
||||
t.Fatalf("create: %v", err)
|
||||
}
|
||||
popped, err := store.PopPending(ctx, "runtime-running-timeout")
|
||||
if err != nil {
|
||||
t.Fatalf("pop: %v", err)
|
||||
}
|
||||
if popped == nil || popped.Status != ModelListRunning {
|
||||
t.Fatalf("expected running, got %+v", popped)
|
||||
}
|
||||
|
||||
// Rewind RunStartedAt past the running threshold.
|
||||
aged := time.Now().Add(-modelListRunningTimeout - time.Second)
|
||||
popped.RunStartedAt = &aged
|
||||
if err := store.persistRequest(ctx, popped); err != nil {
|
||||
t.Fatalf("persist rewound: %v", err)
|
||||
}
|
||||
|
||||
got, err := store.Get(ctx, req.ID)
|
||||
if err != nil {
|
||||
t.Fatalf("get: %v", err)
|
||||
}
|
||||
if got.Status != ModelListTimeout {
|
||||
t.Fatalf("status = %s, want timeout", got.Status)
|
||||
}
|
||||
}
|
||||
|
||||
// TestRedisModelListStore_HasPending pins the cheap probe used by the
|
||||
// heartbeat hot path so a slow Redis can't stall every connected daemon.
|
||||
func TestRedisModelListStore_HasPending(t *testing.T) {
|
||||
rdb := newRedisTestClient(t)
|
||||
ctx := context.Background()
|
||||
store := NewRedisModelListStore(rdb)
|
||||
|
||||
if has, err := store.HasPending(ctx, "rt-empty"); err != nil || has {
|
||||
t.Fatalf("empty store should not report pending: has=%v err=%v", has, err)
|
||||
}
|
||||
|
||||
if _, err := store.Create(ctx, "rt-1"); err != nil {
|
||||
t.Fatalf("create: %v", err)
|
||||
}
|
||||
if has, err := store.HasPending(ctx, "rt-1"); err != nil || !has {
|
||||
t.Fatalf("expected pending=true after Create: has=%v err=%v", has, err)
|
||||
}
|
||||
if has, err := store.HasPending(ctx, "rt-other"); err != nil || has {
|
||||
t.Fatalf("expected pending=false for unrelated runtime: has=%v err=%v", has, err)
|
||||
}
|
||||
|
||||
if _, err := store.PopPending(ctx, "rt-1"); err != nil {
|
||||
t.Fatalf("pop: %v", err)
|
||||
}
|
||||
if has, err := store.HasPending(ctx, "rt-1"); err != nil || has {
|
||||
t.Fatalf("expected pending=false after PopPending: has=%v err=%v", has, err)
|
||||
}
|
||||
}
|
||||
@@ -2,6 +2,7 @@ package handler
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"encoding/json"
|
||||
"net/http"
|
||||
"net/http/httptest"
|
||||
@@ -16,21 +17,35 @@ import (
|
||||
// way out of Running was the 2-minute memory GC, which exceeded the UI
|
||||
// polling window and surfaced as a silent "discovery failed" (MUL-1397).
|
||||
func TestModelListStore_RunningRequestTimesOut(t *testing.T) {
|
||||
store := NewModelListStore()
|
||||
req := store.Create("runtime-xyz")
|
||||
claimed := store.PopPending("runtime-xyz")
|
||||
ctx := context.Background()
|
||||
store := NewInMemoryModelListStore()
|
||||
req, err := store.Create(ctx, "runtime-xyz")
|
||||
if err != nil {
|
||||
t.Fatalf("create: %v", err)
|
||||
}
|
||||
claimed, err := store.PopPending(ctx, "runtime-xyz")
|
||||
if err != nil {
|
||||
t.Fatalf("pop: %v", err)
|
||||
}
|
||||
if claimed == nil {
|
||||
t.Fatal("expected PopPending to claim the pending request")
|
||||
}
|
||||
if claimed.Status != ModelListRunning {
|
||||
t.Fatalf("expected Running after PopPending, got %s", claimed.Status)
|
||||
}
|
||||
if claimed.RunStartedAt == nil {
|
||||
t.Fatal("expected RunStartedAt to be set on PopPending")
|
||||
}
|
||||
|
||||
// Age the running record past the threshold without the daemon ever
|
||||
// reporting a result. Get() must flip it to Timeout so the UI can
|
||||
// terminate polling instead of waiting for the 2-minute GC.
|
||||
claimed.UpdatedAt = time.Now().Add(-(modelListRunningTimeout + time.Second))
|
||||
got := store.Get(req.ID)
|
||||
// terminate polling instead of waiting for the retention sweep.
|
||||
aged := time.Now().Add(-(modelListRunningTimeout + time.Second))
|
||||
claimed.RunStartedAt = &aged
|
||||
got, err := store.Get(ctx, req.ID)
|
||||
if err != nil {
|
||||
t.Fatalf("get: %v", err)
|
||||
}
|
||||
if got == nil {
|
||||
t.Fatal("expected stored request")
|
||||
}
|
||||
@@ -48,8 +63,12 @@ func TestModelListStore_RunningRequestTimesOut(t *testing.T) {
|
||||
// dropped here (e.g. by going through a map[string]string), the badge
|
||||
// silently disappears.
|
||||
func TestReportModelListResult_PreservesDefault(t *testing.T) {
|
||||
store := NewModelListStore()
|
||||
req := store.Create("runtime-xyz")
|
||||
ctx := context.Background()
|
||||
store := NewInMemoryModelListStore()
|
||||
req, err := store.Create(ctx, "runtime-xyz")
|
||||
if err != nil {
|
||||
t.Fatalf("create: %v", err)
|
||||
}
|
||||
|
||||
// Report a completed result with one default entry and one not.
|
||||
body := map[string]any{
|
||||
@@ -72,9 +91,14 @@ func TestReportModelListResult_PreservesDefault(t *testing.T) {
|
||||
if err := json.Unmarshal(raw, &parsed); err != nil {
|
||||
t.Fatalf("unmarshal report body: %v", err)
|
||||
}
|
||||
store.Complete(req.ID, parsed.Models, true)
|
||||
if err := store.Complete(ctx, req.ID, parsed.Models, true); err != nil {
|
||||
t.Fatalf("complete: %v", err)
|
||||
}
|
||||
|
||||
got := store.Get(req.ID)
|
||||
got, err := store.Get(ctx, req.ID)
|
||||
if err != nil {
|
||||
t.Fatalf("get: %v", err)
|
||||
}
|
||||
if got == nil {
|
||||
t.Fatal("expected stored result")
|
||||
}
|
||||
@@ -120,3 +144,55 @@ func TestReportModelListResult_DecodesJSONBodyDefault(t *testing.T) {
|
||||
t.Errorf("default flag lost on model[0]: %+v", body.Models[0])
|
||||
}
|
||||
}
|
||||
|
||||
// TestInMemoryModelListStore_HasPending pins the cheap probe used by the
|
||||
// heartbeat hot path. Empty queue → false; pending request → true; after
|
||||
// PopPending claims the record → false again.
|
||||
func TestInMemoryModelListStore_HasPending(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
store := NewInMemoryModelListStore()
|
||||
|
||||
if has, err := store.HasPending(ctx, "rt-1"); err != nil || has {
|
||||
t.Fatalf("empty store should not report pending: has=%v err=%v", has, err)
|
||||
}
|
||||
|
||||
if _, err := store.Create(ctx, "rt-1"); err != nil {
|
||||
t.Fatalf("create: %v", err)
|
||||
}
|
||||
if has, err := store.HasPending(ctx, "rt-1"); err != nil || !has {
|
||||
t.Fatalf("expected pending=true after Create: has=%v err=%v", has, err)
|
||||
}
|
||||
// Other runtimes don't see this runtime's queue.
|
||||
if has, err := store.HasPending(ctx, "rt-2"); err != nil || has {
|
||||
t.Fatalf("expected pending=false for unrelated runtime: has=%v err=%v", has, err)
|
||||
}
|
||||
|
||||
if _, err := store.PopPending(ctx, "rt-1"); err != nil {
|
||||
t.Fatalf("pop: %v", err)
|
||||
}
|
||||
if has, err := store.HasPending(ctx, "rt-1"); err != nil || has {
|
||||
t.Fatalf("expected pending=false after PopPending: has=%v err=%v", has, err)
|
||||
}
|
||||
}
|
||||
|
||||
// TestInMemoryModelListStore_PopPendingPicksOldest documents the FIFO
|
||||
// ordering so a daemon that handles one request per heartbeat doesn't
|
||||
// starve early queue entries.
|
||||
func TestInMemoryModelListStore_PopPendingPicksOldest(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
store := NewInMemoryModelListStore()
|
||||
|
||||
first, _ := store.Create(ctx, "rt-1")
|
||||
// Force a measurable gap so the FIFO comparison isn't on equal
|
||||
// CreatedAt values (possible on platforms with coarse clocks).
|
||||
time.Sleep(2 * time.Millisecond)
|
||||
second, _ := store.Create(ctx, "rt-1")
|
||||
|
||||
got, err := store.PopPending(ctx, "rt-1")
|
||||
if err != nil {
|
||||
t.Fatalf("pop: %v", err)
|
||||
}
|
||||
if got == nil || got.ID != first.ID {
|
||||
t.Fatalf("expected first request, got %+v (second was %s)", got, second.ID)
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user