Compare commits

...

3 Commits

Author SHA1 Message Date
Jiang Bohan
b57fe97251 fix(server/model-list): add running-timeout so lost heartbeats don't wedge UI
ModelListStore.Get previously only transitioned Pending → Timeout after
30s; a request that had already been claimed (PopPending → Running) but
whose result never came back (because the heartbeat response carrying
`pending_model_list` was lost, e.g. due to the /heartbeat slow path this
PR already addresses) stayed in Running until the 2-minute memory GC.
The UI's polling window is 30s, so the user saw a silent "discovery
failed" even though every individual piece worked — surfaced as
MUL-1397 alongside #1637.

Add a 60s running threshold that mirrors the existing pending one, and
fold both into a single applyModelListTimeout helper so the two
transitions live in one place. A regression test ages a Running record
past the threshold and asserts Get() flips it to Timeout with a useful
error message.
2026-04-25 01:55:25 +08:00
Jiang Bohan
a14949fce6 fix(server/heartbeat): split probe/claim to make the bound ack-safe
Addresses GPT-Boy's blocking review on PR #1644: wrapping the whole
LocalSkill*Store.PopPending in context.WithTimeout is ack-unsafe. The
Redis implementation runs a Lua script that atomically does ZREM +
SET running. If the Go deadline fires after Redis has already executed
the script but before the client reads the response, the handler
silently drops the payload even though the claim succeeded — the
pending request is then stranded in the running state until the
running-timeout sweep (and never delivered to the daemon).

Fix by splitting the two concerns:

- New LocalSkill*Store.HasPending(ctx, runtimeID) — cheap read-only
  probe (ZCARD in Redis, scan-through-map in memory). Safe to bound
  with a context timeout because it has no side effects.
- Heartbeat handler now probes with a 1s timeout, and only when
  HasPending returns true does it invoke the unbounded PopPending.
  Empty-queue ticks (the vast majority) are fast-bounded; claim
  ticks run to completion without a race with the handler deadline.
- Slow-log fields renamed to reflect the new phases:
  probe_skills_ms / pop_skills_ms / probe_import_ms / pop_import_ms
  plus probe_*_timed_out flags.

Tests updated accordingly:
- TestDaemonHeartbeat_SlowProbeDoesNotWedge — stalled HasPending
  returns within the probe bound.
- TestDaemonHeartbeat_EmptyQueueSkipsPopPending — PopPending is
  never invoked when HasPending reports an empty queue, pinning
  the ack-safety invariant GPT-Boy flagged.
2026-04-25 01:42:04 +08:00
Jiang Bohan
743e269488 fix(server/heartbeat): bound LocalSkill PopPending calls + add slow-log
/api/daemon/heartbeat was observed in prod at 12-24s response time (GH
#1637), which is long enough to time out some daemon clients and is
flatly wrong for a "liveness ping" that should be sub-100ms. The two
LocalSkill PopPending calls in the handler hit the shared (Redis) store
and had no per-call bound, so a transient store stall propagated
directly into the request latency.

- Wrap each LocalSkill PopPending with a 1s context.WithTimeout. On
  timeout we log the skip and fall through without returning a pending
  request; the next heartbeat (default 15s later) re-reads the pending
  set, so pending requests are only delayed, never dropped.
- Add logHeartbeatEndpointSlow mirroring logClaimEndpointSlow, so any
  heartbeat >500ms or any PopPending timeout emits one structured log
  with per-phase timings (auth / update / pop_skills / pop_import).
  This replaces "heartbeat feels slow" with attributable data.
- Regression test asserts the handler returns 200 within a few seconds
  when both stores stall, proving the bound actually bounds.
2026-04-25 01:19:19 +08:00
6 changed files with 393 additions and 50 deletions

View File

@@ -1,9 +1,11 @@
package handler
import (
"context"
"crypto/sha256"
"encoding/hex"
"encoding/json"
"errors"
"fmt"
"log/slog"
"net/http"
@@ -483,25 +485,55 @@ type DaemonHeartbeatRequest struct {
RuntimeID string `json:"runtime_id"`
}
// heartbeatHasPendingTimeout bounds the cheap HasPending probe on the
// heartbeat hot path. Probes are read-only (ZCARD in Redis) so a timeout is
// ack-safe: the worst case is "we didn't find out if anything was queued this
// tick" and the next heartbeat (default 15s later) will try again.
//
// PopPending is deliberately NOT bounded this way — its Redis implementation
// runs a Lua claim script whose ZREM + SET-running side effects cannot be
// cleanly un-run from the client side if the context expires mid-script. We
// therefore only invoke PopPending after HasPending confirms there is work
// to claim, so we never start a claim we might have to abort.
const heartbeatHasPendingTimeout = 1 * time.Second
func (h *Handler) DaemonHeartbeat(w http.ResponseWriter, r *http.Request) {
start := time.Now()
var (
outcome = "unauth"
runtimeID string
authMs, updateMs, probeSkillsMs, popSkillsMs, probeImportMs, popImportMs int64
probeSkillsTimedOut, probeImportTimedOut bool
)
defer func() {
logHeartbeatEndpointSlow(runtimeID, outcome, start, authMs, updateMs, probeSkillsMs, popSkillsMs, probeImportMs, popImportMs, probeSkillsTimedOut, probeImportTimedOut)
}()
var req DaemonHeartbeatRequest
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
outcome = "bad_body"
writeError(w, http.StatusBadRequest, "invalid request body")
return
}
if req.RuntimeID == "" {
outcome = "missing_runtime_id"
writeError(w, http.StatusBadRequest, "runtime_id is required")
return
}
runtimeID = req.RuntimeID
// Verify the caller owns this runtime's workspace.
if _, ok := h.requireDaemonRuntimeAccess(w, r, req.RuntimeID); !ok {
return
}
authMs = time.Since(start).Milliseconds()
updateStart := time.Now()
_, err := h.Queries.UpdateAgentRuntimeHeartbeat(r.Context(), parseUUID(req.RuntimeID))
updateMs = time.Since(updateStart).Milliseconds()
if err != nil {
outcome = "error_update"
writeError(w, http.StatusInternalServerError, "heartbeat failed")
return
}
@@ -523,27 +555,90 @@ func (h *Handler) DaemonHeartbeat(w http.ResponseWriter, r *http.Request) {
resp["pending_model_list"] = map[string]string{"id": pending.ID}
}
// Check for pending local-skill list requests for this runtime.
if pending, err := h.LocalSkillListStore.PopPending(r.Context(), req.RuntimeID); err != nil {
slog.Warn("local skill list PopPending failed", "error", err, "runtime_id", req.RuntimeID)
} else if pending != nil {
resp["pending_local_skills"] = map[string]string{"id": pending.ID}
}
// Check for pending local-skill import requests for this runtime.
if pending, err := h.LocalSkillImportStore.PopPending(r.Context(), req.RuntimeID); err != nil {
slog.Warn("local skill import PopPending failed", "error", err, "runtime_id", req.RuntimeID)
} else if pending != nil {
payload := map[string]string{
"id": pending.ID,
"skill_key": pending.SkillKey,
// Probe then claim the local-skill list queue. The probe is bounded so a
// slow shared store cannot stall the heartbeat on empty-queue ticks; the
// claim runs unbounded (it inherits only r.Context()) because its Lua
// side effects cannot be safely aborted mid-script.
probeSkillsStart := time.Now()
probeSkillsCtx, cancelProbeSkills := context.WithTimeout(r.Context(), heartbeatHasPendingTimeout)
hasSkills, probeErr := h.LocalSkillListStore.HasPending(probeSkillsCtx, req.RuntimeID)
cancelProbeSkills()
probeSkillsMs = time.Since(probeSkillsStart).Milliseconds()
switch {
case probeErr == nil && hasSkills:
popStart := time.Now()
pendingSkills, popErr := h.LocalSkillListStore.PopPending(r.Context(), req.RuntimeID)
popSkillsMs = time.Since(popStart).Milliseconds()
if popErr != nil {
slog.Warn("local skill list PopPending failed", "error", popErr, "runtime_id", req.RuntimeID)
} else if pendingSkills != nil {
resp["pending_local_skills"] = map[string]string{"id": pendingSkills.ID}
}
case probeErr != nil:
if errors.Is(probeErr, context.DeadlineExceeded) || errors.Is(probeErr, context.Canceled) {
probeSkillsTimedOut = true
slog.Warn("local skill list HasPending timed out", "runtime_id", req.RuntimeID, "elapsed_ms", probeSkillsMs)
} else {
slog.Warn("local skill list HasPending failed", "error", probeErr, "runtime_id", req.RuntimeID)
}
resp["pending_local_skill_import"] = payload
}
// Same probe-then-claim pattern for the import queue.
probeImportStart := time.Now()
probeImportCtx, cancelProbeImport := context.WithTimeout(r.Context(), heartbeatHasPendingTimeout)
hasImport, probeErr := h.LocalSkillImportStore.HasPending(probeImportCtx, req.RuntimeID)
cancelProbeImport()
probeImportMs = time.Since(probeImportStart).Milliseconds()
switch {
case probeErr == nil && hasImport:
popStart := time.Now()
pendingImport, popErr := h.LocalSkillImportStore.PopPending(r.Context(), req.RuntimeID)
popImportMs = time.Since(popStart).Milliseconds()
if popErr != nil {
slog.Warn("local skill import PopPending failed", "error", popErr, "runtime_id", req.RuntimeID)
} else if pendingImport != nil {
resp["pending_local_skill_import"] = map[string]string{
"id": pendingImport.ID,
"skill_key": pendingImport.SkillKey,
}
}
case probeErr != nil:
if errors.Is(probeErr, context.DeadlineExceeded) || errors.Is(probeErr, context.Canceled) {
probeImportTimedOut = true
slog.Warn("local skill import HasPending timed out", "runtime_id", req.RuntimeID, "elapsed_ms", probeImportMs)
} else {
slog.Warn("local skill import HasPending failed", "error", probeErr, "runtime_id", req.RuntimeID)
}
}
outcome = "ok"
writeJSON(w, http.StatusOK, resp)
}
// logHeartbeatEndpointSlow emits one structured log when /api/daemon/heartbeat
// exceeds 500ms, splitting auth / update / probe / pop phases for both queues
// so the prod tail can be attributed without flooding logs at normal rates.
// Mirrors logClaimEndpointSlow for consistency.
func logHeartbeatEndpointSlow(runtimeID, outcome string, start time.Time, authMs, updateMs, probeSkillsMs, popSkillsMs, probeImportMs, popImportMs int64, probeSkillsTimedOut, probeImportTimedOut bool) {
totalMs := time.Since(start).Milliseconds()
if totalMs < 500 && !probeSkillsTimedOut && !probeImportTimedOut {
return
}
slog.Info("heartbeat_endpoint slow",
"runtime_id", runtimeID,
"outcome", outcome,
"total_ms", totalMs,
"auth_ms", authMs,
"update_ms", updateMs,
"probe_skills_ms", probeSkillsMs,
"pop_skills_ms", popSkillsMs,
"probe_import_ms", probeImportMs,
"pop_import_ms", popImportMs,
"probe_skills_timed_out", probeSkillsTimedOut,
"probe_import_timed_out", probeImportTimedOut,
)
}
// logClaimEndpointSlow emits one structured log when the /tasks/claim endpoint
// exceeds 500ms, splitting auth / claim / response-build phases so the prod
// tail can be diagnosed without flooding logs at normal poll rates.

View File

@@ -8,11 +8,55 @@ import (
"net/http/httptest"
"strings"
"testing"
"time"
"github.com/go-chi/chi/v5"
"github.com/multica-ai/multica/server/internal/middleware"
)
// slowProbeLocalSkillListStore wraps a LocalSkillListStore but blocks inside
// HasPending until the provided context is cancelled. PopPending delegates
// to the underlying store. Used to verify that a stalled probe cannot wedge
// the heartbeat — the bound context must cut it short — while the ack-safe
// PopPending path is never reached because HasPending returns an error, not
// true.
type slowProbeLocalSkillListStore struct{ LocalSkillListStore }
func (s slowProbeLocalSkillListStore) HasPending(ctx context.Context, _ string) (bool, error) {
<-ctx.Done()
return false, ctx.Err()
}
type slowProbeLocalSkillImportStore struct{ LocalSkillImportStore }
func (s slowProbeLocalSkillImportStore) HasPending(ctx context.Context, _ string) (bool, error) {
<-ctx.Done()
return false, ctx.Err()
}
// popRecordingLocalSkillListStore counts PopPending calls so a test can assert
// that the handler never reaches the ack-unsafe side-effecting claim path
// when HasPending reports an empty queue.
type popRecordingLocalSkillListStore struct {
LocalSkillListStore
popCalls int
}
func (s *popRecordingLocalSkillListStore) PopPending(ctx context.Context, runtimeID string) (*RuntimeLocalSkillListRequest, error) {
s.popCalls++
return s.LocalSkillListStore.PopPending(ctx, runtimeID)
}
type popRecordingLocalSkillImportStore struct {
LocalSkillImportStore
popCalls int
}
func (s *popRecordingLocalSkillImportStore) PopPending(ctx context.Context, runtimeID string) (*RuntimeLocalSkillImportRequest, error) {
s.popCalls++
return s.LocalSkillImportStore.PopPending(ctx, runtimeID)
}
func setHandlerTestWorkspaceRepos(t *testing.T, repos []map[string]string) {
t.Helper()
data, err := json.Marshal(repos)
@@ -138,6 +182,84 @@ func TestDaemonHeartbeat_WithDaemonToken_CrossWorkspace(t *testing.T) {
}
}
// TestDaemonHeartbeat_SlowProbeDoesNotWedge pins the invariant that a stalled
// HasPending probe cannot wedge the heartbeat endpoint past the per-probe
// timeout. The probe is the only bounded call; PopPending is ack-safe-
// critical and is intentionally left unbounded. Without the probe bound the
// heartbeat would hang on a slow shared store.
func TestDaemonHeartbeat_SlowProbeDoesNotWedge(t *testing.T) {
if testHandler == nil {
t.Skip("database not available")
}
runtimeID := createRuntimeLocalSkillTestRuntime(t, testUserID)
origList := testHandler.LocalSkillListStore
origImport := testHandler.LocalSkillImportStore
testHandler.LocalSkillListStore = slowProbeLocalSkillListStore{origList}
testHandler.LocalSkillImportStore = slowProbeLocalSkillImportStore{origImport}
t.Cleanup(func() {
testHandler.LocalSkillListStore = origList
testHandler.LocalSkillImportStore = origImport
})
w := httptest.NewRecorder()
req := newDaemonTokenRequest(http.MethodPost, "/api/daemon/heartbeat", map[string]any{
"runtime_id": runtimeID,
}, testWorkspaceID, "runtime-local-skills-daemon")
start := time.Now()
testHandler.DaemonHeartbeat(w, req)
elapsed := time.Since(start)
if w.Code != http.StatusOK {
t.Fatalf("DaemonHeartbeat with slow probes: expected 200, got %d: %s", w.Code, w.Body.String())
}
// Two bounded probes at 1s each + a small fixed slack.
if elapsed > 3*time.Second {
t.Fatalf("DaemonHeartbeat took %s; expected fast return despite slow probes", elapsed)
}
}
// TestDaemonHeartbeat_EmptyQueueSkipsPopPending pins the ack-safety property:
// when HasPending reports no work, the heartbeat must NOT invoke PopPending,
// because PopPending's Redis implementation has non-atomic side effects that
// a client-side cancel cannot cleanly un-run (see GH #1637 review).
func TestDaemonHeartbeat_EmptyQueueSkipsPopPending(t *testing.T) {
if testHandler == nil {
t.Skip("database not available")
}
runtimeID := createRuntimeLocalSkillTestRuntime(t, testUserID)
origList := testHandler.LocalSkillListStore
origImport := testHandler.LocalSkillImportStore
listSpy := &popRecordingLocalSkillListStore{LocalSkillListStore: origList}
importSpy := &popRecordingLocalSkillImportStore{LocalSkillImportStore: origImport}
testHandler.LocalSkillListStore = listSpy
testHandler.LocalSkillImportStore = importSpy
t.Cleanup(func() {
testHandler.LocalSkillListStore = origList
testHandler.LocalSkillImportStore = origImport
})
w := httptest.NewRecorder()
req := newDaemonTokenRequest(http.MethodPost, "/api/daemon/heartbeat", map[string]any{
"runtime_id": runtimeID,
}, testWorkspaceID, "runtime-local-skills-daemon")
testHandler.DaemonHeartbeat(w, req)
if w.Code != http.StatusOK {
t.Fatalf("DaemonHeartbeat: expected 200, got %d: %s", w.Code, w.Body.String())
}
if listSpy.popCalls != 0 {
t.Fatalf("expected 0 PopPending calls on empty list queue, got %d", listSpy.popCalls)
}
if importSpy.popCalls != 0 {
t.Fatalf("expected 0 PopPending calls on empty import queue, got %d", importSpy.popCalls)
}
}
func TestGetTaskStatus_WithDaemonToken_CrossWorkspace(t *testing.T) {
if testHandler == nil {
t.Skip("database not available")
@@ -713,7 +835,9 @@ func TestDaemonRegister_MergesLegacyDaemonIDRuntime(t *testing.T) {
`, legacyAgentID, legacyIssueID, legacyRuntimeID).Scan(&legacyTaskID); err != nil {
t.Fatalf("seed legacy task: %v", err)
}
t.Cleanup(func() { testPool.Exec(context.Background(), `DELETE FROM agent_task_queue WHERE id = $1`, legacyTaskID) })
t.Cleanup(func() {
testPool.Exec(context.Background(), `DELETE FROM agent_task_queue WHERE id = $1`, legacyTaskID)
})
// Register under the new stable UUID, declaring the prior hostname-derived
// id as legacy. The handler should merge the legacy row into the new one.
@@ -795,8 +919,8 @@ func TestDaemonRegister_MergesLegacyDaemonIDRuntime_ReverseDotLocal(t *testing.T
}
ctx := context.Background()
const legacyDaemonID = "ReverseDotLocalHost" // stored without .local
const emittedLegacyID = "ReverseDotLocalHost.local" // daemon now reports with .local
const legacyDaemonID = "ReverseDotLocalHost" // stored without .local
const emittedLegacyID = "ReverseDotLocalHost.local" // daemon now reports with .local
const newDaemonID = "0192a7b0-0011-7ee9-9c21-30a5bcf86aa2"
var legacyRuntimeID string
@@ -853,8 +977,8 @@ func TestDaemonRegister_MergesLegacyDaemonIDRuntime_CaseDrift(t *testing.T) {
}
ctx := context.Background()
const storedDaemonID = "Jiayuans-MacBook-Pro.local" // DB has original mixed case
const emittedLegacyID = "jiayuans-macbook-pro.local" // Daemon now reports lowercased
const storedDaemonID = "Jiayuans-MacBook-Pro.local" // DB has original mixed case
const emittedLegacyID = "jiayuans-macbook-pro.local" // Daemon now reports lowercased
const newDaemonID = "0192a7b0-0022-7ee9-9c21-30a5bcf86aa3"
var legacyRuntimeID string

View File

@@ -37,6 +37,11 @@ const (
type LocalSkillListStore interface {
Create(ctx context.Context, runtimeID string) (*RuntimeLocalSkillListRequest, error)
Get(ctx context.Context, id string) (*RuntimeLocalSkillListRequest, error)
// HasPending is a cheap read-only probe that reports whether the runtime
// has at least one pending request. Callers on the hot path (e.g. the
// heartbeat handler) use it to gate the side-effecting PopPending so they
// never start a claim they might have to abort.
HasPending(ctx context.Context, runtimeID string) (bool, error)
PopPending(ctx context.Context, runtimeID string) (*RuntimeLocalSkillListRequest, error)
Complete(ctx context.Context, id string, skills []RuntimeLocalSkillSummary, supported bool) error
Fail(ctx context.Context, id string, errMsg string) error
@@ -48,6 +53,7 @@ type LocalSkillListStore interface {
type LocalSkillImportStore interface {
Create(ctx context.Context, runtimeID, creatorID, skillKey string, name, description *string) (*RuntimeLocalSkillImportRequest, error)
Get(ctx context.Context, id string) (*RuntimeLocalSkillImportRequest, error)
HasPending(ctx context.Context, runtimeID string) (bool, error)
PopPending(ctx context.Context, runtimeID string) (*RuntimeLocalSkillImportRequest, error)
Complete(ctx context.Context, id string, skill SkillResponse) error
Fail(ctx context.Context, id string, errMsg string) error
@@ -106,30 +112,30 @@ type RuntimeLocalSkillSummary struct {
}
type RuntimeLocalSkillListRequest struct {
ID string `json:"id"`
RuntimeID string `json:"runtime_id"`
Status RuntimeLocalSkillRequestStatus `json:"status"`
Skills []RuntimeLocalSkillSummary `json:"skills,omitempty"`
Supported bool `json:"supported"`
Error string `json:"error,omitempty"`
CreatedAt time.Time `json:"created_at"`
UpdatedAt time.Time `json:"updated_at"`
RunStartedAt *time.Time `json:"-"`
ID string `json:"id"`
RuntimeID string `json:"runtime_id"`
Status RuntimeLocalSkillRequestStatus `json:"status"`
Skills []RuntimeLocalSkillSummary `json:"skills,omitempty"`
Supported bool `json:"supported"`
Error string `json:"error,omitempty"`
CreatedAt time.Time `json:"created_at"`
UpdatedAt time.Time `json:"updated_at"`
RunStartedAt *time.Time `json:"-"`
}
type RuntimeLocalSkillImportRequest struct {
ID string `json:"id"`
RuntimeID string `json:"runtime_id"`
SkillKey string `json:"skill_key"`
Name *string `json:"name,omitempty"`
Description *string `json:"description,omitempty"`
Status RuntimeLocalSkillRequestStatus `json:"status"`
Skill *SkillResponse `json:"skill,omitempty"`
Error string `json:"error,omitempty"`
CreatedAt time.Time `json:"created_at"`
UpdatedAt time.Time `json:"updated_at"`
CreatorID string `json:"-"`
RunStartedAt *time.Time `json:"-"`
ID string `json:"id"`
RuntimeID string `json:"runtime_id"`
SkillKey string `json:"skill_key"`
Name *string `json:"name,omitempty"`
Description *string `json:"description,omitempty"`
Status RuntimeLocalSkillRequestStatus `json:"status"`
Skill *SkillResponse `json:"skill,omitempty"`
Error string `json:"error,omitempty"`
CreatedAt time.Time `json:"created_at"`
UpdatedAt time.Time `json:"updated_at"`
CreatorID string `json:"-"`
RunStartedAt *time.Time `json:"-"`
}
// InMemoryLocalSkillListStore is the single-node implementation — good enough
@@ -179,6 +185,20 @@ func (s *InMemoryLocalSkillListStore) Get(_ context.Context, id string) (*Runtim
return req, nil
}
func (s *InMemoryLocalSkillListStore) HasPending(_ context.Context, runtimeID string) (bool, error) {
s.mu.Lock()
defer s.mu.Unlock()
now := time.Now()
for _, req := range s.requests {
applyLocalSkillListTimeout(req, now)
if req.RuntimeID == runtimeID && req.Status == RuntimeLocalSkillPending {
return true, nil
}
}
return false, nil
}
func (s *InMemoryLocalSkillListStore) PopPending(_ context.Context, runtimeID string) (*RuntimeLocalSkillListRequest, error) {
s.mu.Lock()
defer s.mu.Unlock()
@@ -275,6 +295,20 @@ func (s *InMemoryLocalSkillImportStore) Get(_ context.Context, id string) (*Runt
return req, nil
}
func (s *InMemoryLocalSkillImportStore) HasPending(_ context.Context, runtimeID string) (bool, error) {
s.mu.Lock()
defer s.mu.Unlock()
now := time.Now()
for _, req := range s.requests {
applyLocalSkillImportTimeout(req, now)
if req.RuntimeID == runtimeID && req.Status == RuntimeLocalSkillPending {
return true, nil
}
}
return false, nil
}
func (s *InMemoryLocalSkillImportStore) PopPending(_ context.Context, runtimeID string) (*RuntimeLocalSkillImportRequest, error) {
s.mu.Lock()
defer s.mu.Unlock()

View File

@@ -40,9 +40,10 @@ const (
)
// claimPendingScript atomically claims a pending request:
// KEYS[1] = pending zset ARGV[1] = request id to claim
// KEYS[2] = record key ARGV[2] = new record JSON (status=running)
// ARGV[3] = record TTL in seconds
//
// KEYS[1] = pending zset ARGV[1] = request id to claim
// KEYS[2] = record key ARGV[2] = new record JSON (status=running)
// ARGV[3] = record TTL in seconds
//
// Returns 1 when this caller won the claim (zset entry removed, record
// updated), 0 when the entry was already gone (another node won).
@@ -57,7 +58,7 @@ redis.call('SET', KEYS[2], ARGV[2], 'EX', tonumber(ARGV[3]))
return 1
`)
func localSkillListKey(id string) string { return localSkillListKeyPrefix + id }
func localSkillListKey(id string) string { return localSkillListKeyPrefix + id }
func localSkillListPendingKey(runtimeID string) string {
return localSkillListPendingPrefix + runtimeID
}
@@ -150,6 +151,18 @@ func (s *RedisLocalSkillListStore) persistListRequest(ctx context.Context, req *
return nil
}
// HasPending is a cheap read-only probe (ZCARD) used by hot paths to decide
// whether to invoke the side-effecting PopPending. It does NOT sweep
// expired / already-claimed entries — a spurious "true" is fine because the
// follow-up PopPending still handles the race correctly.
func (s *RedisLocalSkillListStore) HasPending(ctx context.Context, runtimeID string) (bool, error) {
cnt, err := s.rdb.ZCard(ctx, localSkillListPendingKey(runtimeID)).Result()
if err != nil {
return false, fmt.Errorf("zcard pending: %w", err)
}
return cnt > 0, nil
}
func (s *RedisLocalSkillListStore) PopPending(ctx context.Context, runtimeID string) (*RuntimeLocalSkillListRequest, error) {
pendingKey := localSkillListPendingKey(runtimeID)
@@ -352,6 +365,16 @@ func (s *RedisLocalSkillImportStore) unmarshalImport(raw []byte) (*RuntimeLocalS
return env.Public, nil
}
// HasPending mirrors RedisLocalSkillListStore.HasPending — cheap ZCARD probe
// for hot-path gating.
func (s *RedisLocalSkillImportStore) HasPending(ctx context.Context, runtimeID string) (bool, error) {
cnt, err := s.rdb.ZCard(ctx, localSkillImportPendingKey(runtimeID)).Result()
if err != nil {
return false, fmt.Errorf("zcard pending: %w", err)
}
return cnt > 0, nil
}
func (s *RedisLocalSkillImportStore) PopPending(ctx context.Context, runtimeID string) (*RuntimeLocalSkillImportRequest, error) {
pendingKey := localSkillImportPendingKey(runtimeID)

View File

@@ -57,6 +57,20 @@ type ModelEntry struct {
Default bool `json:"default,omitempty"`
}
const (
// modelListPendingTimeout bounds how long a pending request can sit in
// the store before the UI is told "daemon didn't pick this up".
modelListPendingTimeout = 30 * time.Second
// modelListRunningTimeout bounds how long a claimed (running) request
// can stay claimed before the UI is told "daemon picked this up but
// never reported a result". This matters when the heartbeat response
// carrying `pending_model_list` is lost in transit (e.g. HTTP client
// timeout after PopPending already mutated store state): without this
// transition the UI would keep polling a record that is stuck in
// `running` until the 2-minute memory GC sweeps it.
modelListRunningTimeout = 60 * time.Second
)
// ModelListStore is a thread-safe in-memory store. Entries expire after 2 min
// to bound memory use; the UI polls /requests/:id until status is terminal.
type ModelListStore struct {
@@ -101,14 +115,33 @@ func (s *ModelListStore) Get(id string) *ModelListRequest {
if !ok {
return nil
}
if req.Status == ModelListPending && time.Since(req.CreatedAt) > 30*time.Second {
req.Status = ModelListTimeout
req.Error = "daemon did not respond within 30 seconds"
req.UpdatedAt = time.Now()
}
applyModelListTimeout(req, time.Now())
return req
}
// applyModelListTimeout transitions a request to ModelListTimeout when it has
// been stuck in a non-terminal state past its threshold. The pending threshold
// catches "daemon never picked this up"; the running threshold catches
// "daemon picked it up but the result report was lost" — previously the only
// escape from running was the 2-minute memory GC, which exceeded the UI's
// polling window and surfaced as a silent discovery failure.
func applyModelListTimeout(req *ModelListRequest, now time.Time) {
switch req.Status {
case ModelListPending:
if now.Sub(req.CreatedAt) > modelListPendingTimeout {
req.Status = ModelListTimeout
req.Error = "daemon did not respond within 30 seconds"
req.UpdatedAt = now
}
case ModelListRunning:
if now.Sub(req.UpdatedAt) > modelListRunningTimeout {
req.Status = ModelListTimeout
req.Error = "daemon did not finish within 60 seconds"
req.UpdatedAt = now
}
}
}
// PopPending returns and marks-running the oldest pending request for a runtime.
func (s *ModelListStore) PopPending(runtimeID string) *ModelListRequest {
s.mu.Lock()

View File

@@ -6,8 +6,42 @@ import (
"net/http"
"net/http/httptest"
"testing"
"time"
)
// TestModelListStore_RunningRequestTimesOut pins the escape hatch for
// requests that were claimed (PopPending → Running) but whose result was
// never reported — usually because the heartbeat response carrying the
// `pending_model_list` field was lost in transit. Before this, the only
// way out of Running was the 2-minute memory GC, which exceeded the UI
// polling window and surfaced as a silent "discovery failed" (MUL-1397).
func TestModelListStore_RunningRequestTimesOut(t *testing.T) {
store := NewModelListStore()
req := store.Create("runtime-xyz")
claimed := store.PopPending("runtime-xyz")
if claimed == nil {
t.Fatal("expected PopPending to claim the pending request")
}
if claimed.Status != ModelListRunning {
t.Fatalf("expected Running after PopPending, got %s", claimed.Status)
}
// Age the running record past the threshold without the daemon ever
// reporting a result. Get() must flip it to Timeout so the UI can
// terminate polling instead of waiting for the 2-minute GC.
claimed.UpdatedAt = time.Now().Add(-(modelListRunningTimeout + time.Second))
got := store.Get(req.ID)
if got == nil {
t.Fatal("expected stored request")
}
if got.Status != ModelListTimeout {
t.Fatalf("expected Timeout after running threshold, got %s", got.Status)
}
if got.Error == "" {
t.Fatal("expected timeout error message")
}
}
// TestReportModelListResult_PreservesDefault guards the daemon → server
// → UI wire format for the model-discovery result. The `default` bool
// on each ModelEntry lights up the UI's "default" badge; if it gets