mirror of
https://github.com/multica-ai/multica.git
synced 2026-07-05 13:29:44 +02:00
Compare commits
3 Commits
feat/runti
...
fix/daemon
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
b57fe97251 | ||
|
|
a14949fce6 | ||
|
|
743e269488 |
@@ -1,9 +1,11 @@
|
||||
package handler
|
||||
|
||||
import (
|
||||
"context"
|
||||
"crypto/sha256"
|
||||
"encoding/hex"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"log/slog"
|
||||
"net/http"
|
||||
@@ -483,25 +485,55 @@ type DaemonHeartbeatRequest struct {
|
||||
RuntimeID string `json:"runtime_id"`
|
||||
}
|
||||
|
||||
// heartbeatHasPendingTimeout bounds the cheap HasPending probe on the
|
||||
// heartbeat hot path. Probes are read-only (ZCARD in Redis) so a timeout is
|
||||
// ack-safe: the worst case is "we didn't find out if anything was queued this
|
||||
// tick" and the next heartbeat (default 15s later) will try again.
|
||||
//
|
||||
// PopPending is deliberately NOT bounded this way — its Redis implementation
|
||||
// runs a Lua claim script whose ZREM + SET-running side effects cannot be
|
||||
// cleanly un-run from the client side if the context expires mid-script. We
|
||||
// therefore only invoke PopPending after HasPending confirms there is work
|
||||
// to claim, so we never start a claim we might have to abort.
|
||||
const heartbeatHasPendingTimeout = 1 * time.Second
|
||||
|
||||
func (h *Handler) DaemonHeartbeat(w http.ResponseWriter, r *http.Request) {
|
||||
start := time.Now()
|
||||
var (
|
||||
outcome = "unauth"
|
||||
runtimeID string
|
||||
authMs, updateMs, probeSkillsMs, popSkillsMs, probeImportMs, popImportMs int64
|
||||
probeSkillsTimedOut, probeImportTimedOut bool
|
||||
)
|
||||
defer func() {
|
||||
logHeartbeatEndpointSlow(runtimeID, outcome, start, authMs, updateMs, probeSkillsMs, popSkillsMs, probeImportMs, popImportMs, probeSkillsTimedOut, probeImportTimedOut)
|
||||
}()
|
||||
|
||||
var req DaemonHeartbeatRequest
|
||||
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
|
||||
outcome = "bad_body"
|
||||
writeError(w, http.StatusBadRequest, "invalid request body")
|
||||
return
|
||||
}
|
||||
|
||||
if req.RuntimeID == "" {
|
||||
outcome = "missing_runtime_id"
|
||||
writeError(w, http.StatusBadRequest, "runtime_id is required")
|
||||
return
|
||||
}
|
||||
runtimeID = req.RuntimeID
|
||||
|
||||
// Verify the caller owns this runtime's workspace.
|
||||
if _, ok := h.requireDaemonRuntimeAccess(w, r, req.RuntimeID); !ok {
|
||||
return
|
||||
}
|
||||
authMs = time.Since(start).Milliseconds()
|
||||
|
||||
updateStart := time.Now()
|
||||
_, err := h.Queries.UpdateAgentRuntimeHeartbeat(r.Context(), parseUUID(req.RuntimeID))
|
||||
updateMs = time.Since(updateStart).Milliseconds()
|
||||
if err != nil {
|
||||
outcome = "error_update"
|
||||
writeError(w, http.StatusInternalServerError, "heartbeat failed")
|
||||
return
|
||||
}
|
||||
@@ -523,27 +555,90 @@ func (h *Handler) DaemonHeartbeat(w http.ResponseWriter, r *http.Request) {
|
||||
resp["pending_model_list"] = map[string]string{"id": pending.ID}
|
||||
}
|
||||
|
||||
// Check for pending local-skill list requests for this runtime.
|
||||
if pending, err := h.LocalSkillListStore.PopPending(r.Context(), req.RuntimeID); err != nil {
|
||||
slog.Warn("local skill list PopPending failed", "error", err, "runtime_id", req.RuntimeID)
|
||||
} else if pending != nil {
|
||||
resp["pending_local_skills"] = map[string]string{"id": pending.ID}
|
||||
}
|
||||
|
||||
// Check for pending local-skill import requests for this runtime.
|
||||
if pending, err := h.LocalSkillImportStore.PopPending(r.Context(), req.RuntimeID); err != nil {
|
||||
slog.Warn("local skill import PopPending failed", "error", err, "runtime_id", req.RuntimeID)
|
||||
} else if pending != nil {
|
||||
payload := map[string]string{
|
||||
"id": pending.ID,
|
||||
"skill_key": pending.SkillKey,
|
||||
// Probe then claim the local-skill list queue. The probe is bounded so a
|
||||
// slow shared store cannot stall the heartbeat on empty-queue ticks; the
|
||||
// claim runs unbounded (it inherits only r.Context()) because its Lua
|
||||
// side effects cannot be safely aborted mid-script.
|
||||
probeSkillsStart := time.Now()
|
||||
probeSkillsCtx, cancelProbeSkills := context.WithTimeout(r.Context(), heartbeatHasPendingTimeout)
|
||||
hasSkills, probeErr := h.LocalSkillListStore.HasPending(probeSkillsCtx, req.RuntimeID)
|
||||
cancelProbeSkills()
|
||||
probeSkillsMs = time.Since(probeSkillsStart).Milliseconds()
|
||||
switch {
|
||||
case probeErr == nil && hasSkills:
|
||||
popStart := time.Now()
|
||||
pendingSkills, popErr := h.LocalSkillListStore.PopPending(r.Context(), req.RuntimeID)
|
||||
popSkillsMs = time.Since(popStart).Milliseconds()
|
||||
if popErr != nil {
|
||||
slog.Warn("local skill list PopPending failed", "error", popErr, "runtime_id", req.RuntimeID)
|
||||
} else if pendingSkills != nil {
|
||||
resp["pending_local_skills"] = map[string]string{"id": pendingSkills.ID}
|
||||
}
|
||||
case probeErr != nil:
|
||||
if errors.Is(probeErr, context.DeadlineExceeded) || errors.Is(probeErr, context.Canceled) {
|
||||
probeSkillsTimedOut = true
|
||||
slog.Warn("local skill list HasPending timed out", "runtime_id", req.RuntimeID, "elapsed_ms", probeSkillsMs)
|
||||
} else {
|
||||
slog.Warn("local skill list HasPending failed", "error", probeErr, "runtime_id", req.RuntimeID)
|
||||
}
|
||||
resp["pending_local_skill_import"] = payload
|
||||
}
|
||||
|
||||
// Same probe-then-claim pattern for the import queue.
|
||||
probeImportStart := time.Now()
|
||||
probeImportCtx, cancelProbeImport := context.WithTimeout(r.Context(), heartbeatHasPendingTimeout)
|
||||
hasImport, probeErr := h.LocalSkillImportStore.HasPending(probeImportCtx, req.RuntimeID)
|
||||
cancelProbeImport()
|
||||
probeImportMs = time.Since(probeImportStart).Milliseconds()
|
||||
switch {
|
||||
case probeErr == nil && hasImport:
|
||||
popStart := time.Now()
|
||||
pendingImport, popErr := h.LocalSkillImportStore.PopPending(r.Context(), req.RuntimeID)
|
||||
popImportMs = time.Since(popStart).Milliseconds()
|
||||
if popErr != nil {
|
||||
slog.Warn("local skill import PopPending failed", "error", popErr, "runtime_id", req.RuntimeID)
|
||||
} else if pendingImport != nil {
|
||||
resp["pending_local_skill_import"] = map[string]string{
|
||||
"id": pendingImport.ID,
|
||||
"skill_key": pendingImport.SkillKey,
|
||||
}
|
||||
}
|
||||
case probeErr != nil:
|
||||
if errors.Is(probeErr, context.DeadlineExceeded) || errors.Is(probeErr, context.Canceled) {
|
||||
probeImportTimedOut = true
|
||||
slog.Warn("local skill import HasPending timed out", "runtime_id", req.RuntimeID, "elapsed_ms", probeImportMs)
|
||||
} else {
|
||||
slog.Warn("local skill import HasPending failed", "error", probeErr, "runtime_id", req.RuntimeID)
|
||||
}
|
||||
}
|
||||
|
||||
outcome = "ok"
|
||||
writeJSON(w, http.StatusOK, resp)
|
||||
}
|
||||
|
||||
// logHeartbeatEndpointSlow emits one structured log when /api/daemon/heartbeat
|
||||
// exceeds 500ms, splitting auth / update / probe / pop phases for both queues
|
||||
// so the prod tail can be attributed without flooding logs at normal rates.
|
||||
// Mirrors logClaimEndpointSlow for consistency.
|
||||
func logHeartbeatEndpointSlow(runtimeID, outcome string, start time.Time, authMs, updateMs, probeSkillsMs, popSkillsMs, probeImportMs, popImportMs int64, probeSkillsTimedOut, probeImportTimedOut bool) {
|
||||
totalMs := time.Since(start).Milliseconds()
|
||||
if totalMs < 500 && !probeSkillsTimedOut && !probeImportTimedOut {
|
||||
return
|
||||
}
|
||||
slog.Info("heartbeat_endpoint slow",
|
||||
"runtime_id", runtimeID,
|
||||
"outcome", outcome,
|
||||
"total_ms", totalMs,
|
||||
"auth_ms", authMs,
|
||||
"update_ms", updateMs,
|
||||
"probe_skills_ms", probeSkillsMs,
|
||||
"pop_skills_ms", popSkillsMs,
|
||||
"probe_import_ms", probeImportMs,
|
||||
"pop_import_ms", popImportMs,
|
||||
"probe_skills_timed_out", probeSkillsTimedOut,
|
||||
"probe_import_timed_out", probeImportTimedOut,
|
||||
)
|
||||
}
|
||||
|
||||
// logClaimEndpointSlow emits one structured log when the /tasks/claim endpoint
|
||||
// exceeds 500ms, splitting auth / claim / response-build phases so the prod
|
||||
// tail can be diagnosed without flooding logs at normal poll rates.
|
||||
|
||||
@@ -8,11 +8,55 @@ import (
|
||||
"net/http/httptest"
|
||||
"strings"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/go-chi/chi/v5"
|
||||
"github.com/multica-ai/multica/server/internal/middleware"
|
||||
)
|
||||
|
||||
// slowProbeLocalSkillListStore wraps a LocalSkillListStore but blocks inside
|
||||
// HasPending until the provided context is cancelled. PopPending delegates
|
||||
// to the underlying store. Used to verify that a stalled probe cannot wedge
|
||||
// the heartbeat — the bound context must cut it short — while the ack-safe
|
||||
// PopPending path is never reached because HasPending returns an error, not
|
||||
// true.
|
||||
type slowProbeLocalSkillListStore struct{ LocalSkillListStore }
|
||||
|
||||
func (s slowProbeLocalSkillListStore) HasPending(ctx context.Context, _ string) (bool, error) {
|
||||
<-ctx.Done()
|
||||
return false, ctx.Err()
|
||||
}
|
||||
|
||||
type slowProbeLocalSkillImportStore struct{ LocalSkillImportStore }
|
||||
|
||||
func (s slowProbeLocalSkillImportStore) HasPending(ctx context.Context, _ string) (bool, error) {
|
||||
<-ctx.Done()
|
||||
return false, ctx.Err()
|
||||
}
|
||||
|
||||
// popRecordingLocalSkillListStore counts PopPending calls so a test can assert
|
||||
// that the handler never reaches the ack-unsafe side-effecting claim path
|
||||
// when HasPending reports an empty queue.
|
||||
type popRecordingLocalSkillListStore struct {
|
||||
LocalSkillListStore
|
||||
popCalls int
|
||||
}
|
||||
|
||||
func (s *popRecordingLocalSkillListStore) PopPending(ctx context.Context, runtimeID string) (*RuntimeLocalSkillListRequest, error) {
|
||||
s.popCalls++
|
||||
return s.LocalSkillListStore.PopPending(ctx, runtimeID)
|
||||
}
|
||||
|
||||
type popRecordingLocalSkillImportStore struct {
|
||||
LocalSkillImportStore
|
||||
popCalls int
|
||||
}
|
||||
|
||||
func (s *popRecordingLocalSkillImportStore) PopPending(ctx context.Context, runtimeID string) (*RuntimeLocalSkillImportRequest, error) {
|
||||
s.popCalls++
|
||||
return s.LocalSkillImportStore.PopPending(ctx, runtimeID)
|
||||
}
|
||||
|
||||
func setHandlerTestWorkspaceRepos(t *testing.T, repos []map[string]string) {
|
||||
t.Helper()
|
||||
data, err := json.Marshal(repos)
|
||||
@@ -138,6 +182,84 @@ func TestDaemonHeartbeat_WithDaemonToken_CrossWorkspace(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
// TestDaemonHeartbeat_SlowProbeDoesNotWedge pins the invariant that a stalled
|
||||
// HasPending probe cannot wedge the heartbeat endpoint past the per-probe
|
||||
// timeout. The probe is the only bounded call; PopPending is ack-safe-
|
||||
// critical and is intentionally left unbounded. Without the probe bound the
|
||||
// heartbeat would hang on a slow shared store.
|
||||
func TestDaemonHeartbeat_SlowProbeDoesNotWedge(t *testing.T) {
|
||||
if testHandler == nil {
|
||||
t.Skip("database not available")
|
||||
}
|
||||
|
||||
runtimeID := createRuntimeLocalSkillTestRuntime(t, testUserID)
|
||||
|
||||
origList := testHandler.LocalSkillListStore
|
||||
origImport := testHandler.LocalSkillImportStore
|
||||
testHandler.LocalSkillListStore = slowProbeLocalSkillListStore{origList}
|
||||
testHandler.LocalSkillImportStore = slowProbeLocalSkillImportStore{origImport}
|
||||
t.Cleanup(func() {
|
||||
testHandler.LocalSkillListStore = origList
|
||||
testHandler.LocalSkillImportStore = origImport
|
||||
})
|
||||
|
||||
w := httptest.NewRecorder()
|
||||
req := newDaemonTokenRequest(http.MethodPost, "/api/daemon/heartbeat", map[string]any{
|
||||
"runtime_id": runtimeID,
|
||||
}, testWorkspaceID, "runtime-local-skills-daemon")
|
||||
|
||||
start := time.Now()
|
||||
testHandler.DaemonHeartbeat(w, req)
|
||||
elapsed := time.Since(start)
|
||||
|
||||
if w.Code != http.StatusOK {
|
||||
t.Fatalf("DaemonHeartbeat with slow probes: expected 200, got %d: %s", w.Code, w.Body.String())
|
||||
}
|
||||
// Two bounded probes at 1s each + a small fixed slack.
|
||||
if elapsed > 3*time.Second {
|
||||
t.Fatalf("DaemonHeartbeat took %s; expected fast return despite slow probes", elapsed)
|
||||
}
|
||||
}
|
||||
|
||||
// TestDaemonHeartbeat_EmptyQueueSkipsPopPending pins the ack-safety property:
|
||||
// when HasPending reports no work, the heartbeat must NOT invoke PopPending,
|
||||
// because PopPending's Redis implementation has non-atomic side effects that
|
||||
// a client-side cancel cannot cleanly un-run (see GH #1637 review).
|
||||
func TestDaemonHeartbeat_EmptyQueueSkipsPopPending(t *testing.T) {
|
||||
if testHandler == nil {
|
||||
t.Skip("database not available")
|
||||
}
|
||||
|
||||
runtimeID := createRuntimeLocalSkillTestRuntime(t, testUserID)
|
||||
|
||||
origList := testHandler.LocalSkillListStore
|
||||
origImport := testHandler.LocalSkillImportStore
|
||||
listSpy := &popRecordingLocalSkillListStore{LocalSkillListStore: origList}
|
||||
importSpy := &popRecordingLocalSkillImportStore{LocalSkillImportStore: origImport}
|
||||
testHandler.LocalSkillListStore = listSpy
|
||||
testHandler.LocalSkillImportStore = importSpy
|
||||
t.Cleanup(func() {
|
||||
testHandler.LocalSkillListStore = origList
|
||||
testHandler.LocalSkillImportStore = origImport
|
||||
})
|
||||
|
||||
w := httptest.NewRecorder()
|
||||
req := newDaemonTokenRequest(http.MethodPost, "/api/daemon/heartbeat", map[string]any{
|
||||
"runtime_id": runtimeID,
|
||||
}, testWorkspaceID, "runtime-local-skills-daemon")
|
||||
|
||||
testHandler.DaemonHeartbeat(w, req)
|
||||
if w.Code != http.StatusOK {
|
||||
t.Fatalf("DaemonHeartbeat: expected 200, got %d: %s", w.Code, w.Body.String())
|
||||
}
|
||||
if listSpy.popCalls != 0 {
|
||||
t.Fatalf("expected 0 PopPending calls on empty list queue, got %d", listSpy.popCalls)
|
||||
}
|
||||
if importSpy.popCalls != 0 {
|
||||
t.Fatalf("expected 0 PopPending calls on empty import queue, got %d", importSpy.popCalls)
|
||||
}
|
||||
}
|
||||
|
||||
func TestGetTaskStatus_WithDaemonToken_CrossWorkspace(t *testing.T) {
|
||||
if testHandler == nil {
|
||||
t.Skip("database not available")
|
||||
@@ -713,7 +835,9 @@ func TestDaemonRegister_MergesLegacyDaemonIDRuntime(t *testing.T) {
|
||||
`, legacyAgentID, legacyIssueID, legacyRuntimeID).Scan(&legacyTaskID); err != nil {
|
||||
t.Fatalf("seed legacy task: %v", err)
|
||||
}
|
||||
t.Cleanup(func() { testPool.Exec(context.Background(), `DELETE FROM agent_task_queue WHERE id = $1`, legacyTaskID) })
|
||||
t.Cleanup(func() {
|
||||
testPool.Exec(context.Background(), `DELETE FROM agent_task_queue WHERE id = $1`, legacyTaskID)
|
||||
})
|
||||
|
||||
// Register under the new stable UUID, declaring the prior hostname-derived
|
||||
// id as legacy. The handler should merge the legacy row into the new one.
|
||||
@@ -795,8 +919,8 @@ func TestDaemonRegister_MergesLegacyDaemonIDRuntime_ReverseDotLocal(t *testing.T
|
||||
}
|
||||
|
||||
ctx := context.Background()
|
||||
const legacyDaemonID = "ReverseDotLocalHost" // stored without .local
|
||||
const emittedLegacyID = "ReverseDotLocalHost.local" // daemon now reports with .local
|
||||
const legacyDaemonID = "ReverseDotLocalHost" // stored without .local
|
||||
const emittedLegacyID = "ReverseDotLocalHost.local" // daemon now reports with .local
|
||||
const newDaemonID = "0192a7b0-0011-7ee9-9c21-30a5bcf86aa2"
|
||||
|
||||
var legacyRuntimeID string
|
||||
@@ -853,8 +977,8 @@ func TestDaemonRegister_MergesLegacyDaemonIDRuntime_CaseDrift(t *testing.T) {
|
||||
}
|
||||
|
||||
ctx := context.Background()
|
||||
const storedDaemonID = "Jiayuans-MacBook-Pro.local" // DB has original mixed case
|
||||
const emittedLegacyID = "jiayuans-macbook-pro.local" // Daemon now reports lowercased
|
||||
const storedDaemonID = "Jiayuans-MacBook-Pro.local" // DB has original mixed case
|
||||
const emittedLegacyID = "jiayuans-macbook-pro.local" // Daemon now reports lowercased
|
||||
const newDaemonID = "0192a7b0-0022-7ee9-9c21-30a5bcf86aa3"
|
||||
|
||||
var legacyRuntimeID string
|
||||
|
||||
@@ -37,6 +37,11 @@ const (
|
||||
type LocalSkillListStore interface {
|
||||
Create(ctx context.Context, runtimeID string) (*RuntimeLocalSkillListRequest, error)
|
||||
Get(ctx context.Context, id string) (*RuntimeLocalSkillListRequest, error)
|
||||
// HasPending is a cheap read-only probe that reports whether the runtime
|
||||
// has at least one pending request. Callers on the hot path (e.g. the
|
||||
// heartbeat handler) use it to gate the side-effecting PopPending so they
|
||||
// never start a claim they might have to abort.
|
||||
HasPending(ctx context.Context, runtimeID string) (bool, error)
|
||||
PopPending(ctx context.Context, runtimeID string) (*RuntimeLocalSkillListRequest, error)
|
||||
Complete(ctx context.Context, id string, skills []RuntimeLocalSkillSummary, supported bool) error
|
||||
Fail(ctx context.Context, id string, errMsg string) error
|
||||
@@ -48,6 +53,7 @@ type LocalSkillListStore interface {
|
||||
type LocalSkillImportStore interface {
|
||||
Create(ctx context.Context, runtimeID, creatorID, skillKey string, name, description *string) (*RuntimeLocalSkillImportRequest, error)
|
||||
Get(ctx context.Context, id string) (*RuntimeLocalSkillImportRequest, error)
|
||||
HasPending(ctx context.Context, runtimeID string) (bool, error)
|
||||
PopPending(ctx context.Context, runtimeID string) (*RuntimeLocalSkillImportRequest, error)
|
||||
Complete(ctx context.Context, id string, skill SkillResponse) error
|
||||
Fail(ctx context.Context, id string, errMsg string) error
|
||||
@@ -106,30 +112,30 @@ type RuntimeLocalSkillSummary struct {
|
||||
}
|
||||
|
||||
type RuntimeLocalSkillListRequest struct {
|
||||
ID string `json:"id"`
|
||||
RuntimeID string `json:"runtime_id"`
|
||||
Status RuntimeLocalSkillRequestStatus `json:"status"`
|
||||
Skills []RuntimeLocalSkillSummary `json:"skills,omitempty"`
|
||||
Supported bool `json:"supported"`
|
||||
Error string `json:"error,omitempty"`
|
||||
CreatedAt time.Time `json:"created_at"`
|
||||
UpdatedAt time.Time `json:"updated_at"`
|
||||
RunStartedAt *time.Time `json:"-"`
|
||||
ID string `json:"id"`
|
||||
RuntimeID string `json:"runtime_id"`
|
||||
Status RuntimeLocalSkillRequestStatus `json:"status"`
|
||||
Skills []RuntimeLocalSkillSummary `json:"skills,omitempty"`
|
||||
Supported bool `json:"supported"`
|
||||
Error string `json:"error,omitempty"`
|
||||
CreatedAt time.Time `json:"created_at"`
|
||||
UpdatedAt time.Time `json:"updated_at"`
|
||||
RunStartedAt *time.Time `json:"-"`
|
||||
}
|
||||
|
||||
type RuntimeLocalSkillImportRequest struct {
|
||||
ID string `json:"id"`
|
||||
RuntimeID string `json:"runtime_id"`
|
||||
SkillKey string `json:"skill_key"`
|
||||
Name *string `json:"name,omitempty"`
|
||||
Description *string `json:"description,omitempty"`
|
||||
Status RuntimeLocalSkillRequestStatus `json:"status"`
|
||||
Skill *SkillResponse `json:"skill,omitempty"`
|
||||
Error string `json:"error,omitempty"`
|
||||
CreatedAt time.Time `json:"created_at"`
|
||||
UpdatedAt time.Time `json:"updated_at"`
|
||||
CreatorID string `json:"-"`
|
||||
RunStartedAt *time.Time `json:"-"`
|
||||
ID string `json:"id"`
|
||||
RuntimeID string `json:"runtime_id"`
|
||||
SkillKey string `json:"skill_key"`
|
||||
Name *string `json:"name,omitempty"`
|
||||
Description *string `json:"description,omitempty"`
|
||||
Status RuntimeLocalSkillRequestStatus `json:"status"`
|
||||
Skill *SkillResponse `json:"skill,omitempty"`
|
||||
Error string `json:"error,omitempty"`
|
||||
CreatedAt time.Time `json:"created_at"`
|
||||
UpdatedAt time.Time `json:"updated_at"`
|
||||
CreatorID string `json:"-"`
|
||||
RunStartedAt *time.Time `json:"-"`
|
||||
}
|
||||
|
||||
// InMemoryLocalSkillListStore is the single-node implementation — good enough
|
||||
@@ -179,6 +185,20 @@ func (s *InMemoryLocalSkillListStore) Get(_ context.Context, id string) (*Runtim
|
||||
return req, nil
|
||||
}
|
||||
|
||||
func (s *InMemoryLocalSkillListStore) HasPending(_ context.Context, runtimeID string) (bool, error) {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
|
||||
now := time.Now()
|
||||
for _, req := range s.requests {
|
||||
applyLocalSkillListTimeout(req, now)
|
||||
if req.RuntimeID == runtimeID && req.Status == RuntimeLocalSkillPending {
|
||||
return true, nil
|
||||
}
|
||||
}
|
||||
return false, nil
|
||||
}
|
||||
|
||||
func (s *InMemoryLocalSkillListStore) PopPending(_ context.Context, runtimeID string) (*RuntimeLocalSkillListRequest, error) {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
@@ -275,6 +295,20 @@ func (s *InMemoryLocalSkillImportStore) Get(_ context.Context, id string) (*Runt
|
||||
return req, nil
|
||||
}
|
||||
|
||||
func (s *InMemoryLocalSkillImportStore) HasPending(_ context.Context, runtimeID string) (bool, error) {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
|
||||
now := time.Now()
|
||||
for _, req := range s.requests {
|
||||
applyLocalSkillImportTimeout(req, now)
|
||||
if req.RuntimeID == runtimeID && req.Status == RuntimeLocalSkillPending {
|
||||
return true, nil
|
||||
}
|
||||
}
|
||||
return false, nil
|
||||
}
|
||||
|
||||
func (s *InMemoryLocalSkillImportStore) PopPending(_ context.Context, runtimeID string) (*RuntimeLocalSkillImportRequest, error) {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
|
||||
@@ -40,9 +40,10 @@ const (
|
||||
)
|
||||
|
||||
// claimPendingScript atomically claims a pending request:
|
||||
// KEYS[1] = pending zset ARGV[1] = request id to claim
|
||||
// KEYS[2] = record key ARGV[2] = new record JSON (status=running)
|
||||
// ARGV[3] = record TTL in seconds
|
||||
//
|
||||
// KEYS[1] = pending zset ARGV[1] = request id to claim
|
||||
// KEYS[2] = record key ARGV[2] = new record JSON (status=running)
|
||||
// ARGV[3] = record TTL in seconds
|
||||
//
|
||||
// Returns 1 when this caller won the claim (zset entry removed, record
|
||||
// updated), 0 when the entry was already gone (another node won).
|
||||
@@ -57,7 +58,7 @@ redis.call('SET', KEYS[2], ARGV[2], 'EX', tonumber(ARGV[3]))
|
||||
return 1
|
||||
`)
|
||||
|
||||
func localSkillListKey(id string) string { return localSkillListKeyPrefix + id }
|
||||
func localSkillListKey(id string) string { return localSkillListKeyPrefix + id }
|
||||
func localSkillListPendingKey(runtimeID string) string {
|
||||
return localSkillListPendingPrefix + runtimeID
|
||||
}
|
||||
@@ -150,6 +151,18 @@ func (s *RedisLocalSkillListStore) persistListRequest(ctx context.Context, req *
|
||||
return nil
|
||||
}
|
||||
|
||||
// HasPending is a cheap read-only probe (ZCARD) used by hot paths to decide
|
||||
// whether to invoke the side-effecting PopPending. It does NOT sweep
|
||||
// expired / already-claimed entries — a spurious "true" is fine because the
|
||||
// follow-up PopPending still handles the race correctly.
|
||||
func (s *RedisLocalSkillListStore) HasPending(ctx context.Context, runtimeID string) (bool, error) {
|
||||
cnt, err := s.rdb.ZCard(ctx, localSkillListPendingKey(runtimeID)).Result()
|
||||
if err != nil {
|
||||
return false, fmt.Errorf("zcard pending: %w", err)
|
||||
}
|
||||
return cnt > 0, nil
|
||||
}
|
||||
|
||||
func (s *RedisLocalSkillListStore) PopPending(ctx context.Context, runtimeID string) (*RuntimeLocalSkillListRequest, error) {
|
||||
pendingKey := localSkillListPendingKey(runtimeID)
|
||||
|
||||
@@ -352,6 +365,16 @@ func (s *RedisLocalSkillImportStore) unmarshalImport(raw []byte) (*RuntimeLocalS
|
||||
return env.Public, nil
|
||||
}
|
||||
|
||||
// HasPending mirrors RedisLocalSkillListStore.HasPending — cheap ZCARD probe
|
||||
// for hot-path gating.
|
||||
func (s *RedisLocalSkillImportStore) HasPending(ctx context.Context, runtimeID string) (bool, error) {
|
||||
cnt, err := s.rdb.ZCard(ctx, localSkillImportPendingKey(runtimeID)).Result()
|
||||
if err != nil {
|
||||
return false, fmt.Errorf("zcard pending: %w", err)
|
||||
}
|
||||
return cnt > 0, nil
|
||||
}
|
||||
|
||||
func (s *RedisLocalSkillImportStore) PopPending(ctx context.Context, runtimeID string) (*RuntimeLocalSkillImportRequest, error) {
|
||||
pendingKey := localSkillImportPendingKey(runtimeID)
|
||||
|
||||
|
||||
@@ -57,6 +57,20 @@ type ModelEntry struct {
|
||||
Default bool `json:"default,omitempty"`
|
||||
}
|
||||
|
||||
const (
|
||||
// modelListPendingTimeout bounds how long a pending request can sit in
|
||||
// the store before the UI is told "daemon didn't pick this up".
|
||||
modelListPendingTimeout = 30 * time.Second
|
||||
// modelListRunningTimeout bounds how long a claimed (running) request
|
||||
// can stay claimed before the UI is told "daemon picked this up but
|
||||
// never reported a result". This matters when the heartbeat response
|
||||
// carrying `pending_model_list` is lost in transit (e.g. HTTP client
|
||||
// timeout after PopPending already mutated store state): without this
|
||||
// transition the UI would keep polling a record that is stuck in
|
||||
// `running` until the 2-minute memory GC sweeps it.
|
||||
modelListRunningTimeout = 60 * time.Second
|
||||
)
|
||||
|
||||
// ModelListStore is a thread-safe in-memory store. Entries expire after 2 min
|
||||
// to bound memory use; the UI polls /requests/:id until status is terminal.
|
||||
type ModelListStore struct {
|
||||
@@ -101,14 +115,33 @@ func (s *ModelListStore) Get(id string) *ModelListRequest {
|
||||
if !ok {
|
||||
return nil
|
||||
}
|
||||
if req.Status == ModelListPending && time.Since(req.CreatedAt) > 30*time.Second {
|
||||
req.Status = ModelListTimeout
|
||||
req.Error = "daemon did not respond within 30 seconds"
|
||||
req.UpdatedAt = time.Now()
|
||||
}
|
||||
applyModelListTimeout(req, time.Now())
|
||||
return req
|
||||
}
|
||||
|
||||
// applyModelListTimeout transitions a request to ModelListTimeout when it has
|
||||
// been stuck in a non-terminal state past its threshold. The pending threshold
|
||||
// catches "daemon never picked this up"; the running threshold catches
|
||||
// "daemon picked it up but the result report was lost" — previously the only
|
||||
// escape from running was the 2-minute memory GC, which exceeded the UI's
|
||||
// polling window and surfaced as a silent discovery failure.
|
||||
func applyModelListTimeout(req *ModelListRequest, now time.Time) {
|
||||
switch req.Status {
|
||||
case ModelListPending:
|
||||
if now.Sub(req.CreatedAt) > modelListPendingTimeout {
|
||||
req.Status = ModelListTimeout
|
||||
req.Error = "daemon did not respond within 30 seconds"
|
||||
req.UpdatedAt = now
|
||||
}
|
||||
case ModelListRunning:
|
||||
if now.Sub(req.UpdatedAt) > modelListRunningTimeout {
|
||||
req.Status = ModelListTimeout
|
||||
req.Error = "daemon did not finish within 60 seconds"
|
||||
req.UpdatedAt = now
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// PopPending returns and marks-running the oldest pending request for a runtime.
|
||||
func (s *ModelListStore) PopPending(runtimeID string) *ModelListRequest {
|
||||
s.mu.Lock()
|
||||
|
||||
@@ -6,8 +6,42 @@ import (
|
||||
"net/http"
|
||||
"net/http/httptest"
|
||||
"testing"
|
||||
"time"
|
||||
)
|
||||
|
||||
// TestModelListStore_RunningRequestTimesOut pins the escape hatch for
|
||||
// requests that were claimed (PopPending → Running) but whose result was
|
||||
// never reported — usually because the heartbeat response carrying the
|
||||
// `pending_model_list` field was lost in transit. Before this, the only
|
||||
// way out of Running was the 2-minute memory GC, which exceeded the UI
|
||||
// polling window and surfaced as a silent "discovery failed" (MUL-1397).
|
||||
func TestModelListStore_RunningRequestTimesOut(t *testing.T) {
|
||||
store := NewModelListStore()
|
||||
req := store.Create("runtime-xyz")
|
||||
claimed := store.PopPending("runtime-xyz")
|
||||
if claimed == nil {
|
||||
t.Fatal("expected PopPending to claim the pending request")
|
||||
}
|
||||
if claimed.Status != ModelListRunning {
|
||||
t.Fatalf("expected Running after PopPending, got %s", claimed.Status)
|
||||
}
|
||||
|
||||
// Age the running record past the threshold without the daemon ever
|
||||
// reporting a result. Get() must flip it to Timeout so the UI can
|
||||
// terminate polling instead of waiting for the 2-minute GC.
|
||||
claimed.UpdatedAt = time.Now().Add(-(modelListRunningTimeout + time.Second))
|
||||
got := store.Get(req.ID)
|
||||
if got == nil {
|
||||
t.Fatal("expected stored request")
|
||||
}
|
||||
if got.Status != ModelListTimeout {
|
||||
t.Fatalf("expected Timeout after running threshold, got %s", got.Status)
|
||||
}
|
||||
if got.Error == "" {
|
||||
t.Fatal("expected timeout error message")
|
||||
}
|
||||
}
|
||||
|
||||
// TestReportModelListResult_PreservesDefault guards the daemon → server
|
||||
// → UI wire format for the model-discovery result. The `default` bool
|
||||
// on each ModelEntry lights up the UI's "default" badge; if it gets
|
||||
|
||||
Reference in New Issue
Block a user