2 Commits

Author SHA1 Message Date
Jiang Bohan
3466bbc196 fix(skills): atomic Redis claim + surface store write failures (PR #1557 review)
Two real gaps GPT-Boy flagged:

1. RedisLocalSkill{List,Import}Store.PopPending was doing ZREM then SET as
   two separate round-trips. If the SET failed for any reason — transient
   Redis error, context cancellation, pod getting SIGKILL'd mid-call — the
   request was already gone from the pending zset but the stored record
   still said "pending", and no subsequent PopPending would re-dispatch
   it. Exactly the "request disappears" class of bug this PR is supposed
   to kill.

   Fix: push the claim into a Lua script so Redis runs ZREM + SET as one
   atomic unit. If ZREM returns 0 (another node won the race), SET is
   skipped and the caller retries.

2. ReportLocalSkill{List,Import}Result handlers were logging Complete/Fail
   store failures at Warn and still returning 200 OK. That made the
   daemon think the report landed when it hadn't, leaving the request
   stuck in "running" until the server-side timeout and — worse for the
   import flow — leaving the just-created Skill row orphaned in Postgres
   so every retry collided with the unique-name constraint.

   Fix: escalate to Error + return 500 so the daemon (and monitoring) can
   see the write failed. For the import flow, Complete failure after the
   Skill row is already committed also triggers a best-effort DeleteSkill
   so a daemon retry lands on a clean slate instead of hitting
   "a skill with this name already exists" forever.

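The first gap can be reproduced without Redis. The sketch below is a minimal in-memory model, not the store's actual code: `fakeRedis`, `claimTwoStep`, `claimAtomic` and `stranded` are hypothetical names, and a mutex stands in for the atomicity Redis gives a Lua script.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// fakeRedis is a hypothetical stand-in for the two keys a claim touches:
// the pending set and the stored record's status.
type fakeRedis struct {
	mu      sync.Mutex
	pending map[string]bool
	status  map[string]string
}

func newFakeRedis(id string) *fakeRedis {
	return &fakeRedis{
		pending: map[string]bool{id: true},
		status:  map[string]string{id: "pending"},
	}
}

var errTransient = errors.New("transient write failure")

// claimTwoStep models ZREM followed by a separate SET. When failSet is true
// the second write is lost after the first one already took effect.
func (r *fakeRedis) claimTwoStep(id string, failSet bool) error {
	r.mu.Lock()
	delete(r.pending, id) // step 1: ZREM
	r.mu.Unlock()
	if failSet {
		return errTransient // step 2 (SET) never happens
	}
	r.mu.Lock()
	r.status[id] = "running"
	r.mu.Unlock()
	return nil
}

// claimAtomic models the script: both writes happen under one lock, or
// neither does. It returns false when the id was already claimed.
func (r *fakeRedis) claimAtomic(id string) bool {
	r.mu.Lock()
	defer r.mu.Unlock()
	if !r.pending[id] {
		return false
	}
	delete(r.pending, id)
	r.status[id] = "running"
	return true
}

// stranded reports whether id left the pending set while its record still
// says "pending", which is the state no later pop can recover from.
func (r *fakeRedis) stranded(id string) bool {
	r.mu.Lock()
	defer r.mu.Unlock()
	return !r.pending[id] && r.status[id] == "pending"
}

func main() {
	a := newFakeRedis("req-1")
	_ = a.claimTwoStep("req-1", true)
	fmt.Println("two-step stranded:", a.stranded("req-1"))

	b := newFakeRedis("req-1")
	won := b.claimAtomic("req-1")
	fmt.Println("atomic won:", won, "stranded:", b.stranded("req-1"))
}
```

The two-step path can leave the request stranded. The atomic path either claims it fully or leaves it untouched for the next caller.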
Tests
- New TestRedisLocalSkillListStore_PopPendingAtomicClaim asserts the
  happy-path invariant: after one PopPending the record is "running"
  AND a second PopPending returns nothing. Deliberately does NOT poke
  Redis internals directly so the test survives any future key-layout
  refactor.
- Existing cross-instance / concurrent / timeout / per-runtime tests
  continue to pass against the Lua-based claim path (verified locally
  against a scratch redis-server; 8/8 Redis tests green).
2026-04-23 16:57:55 +08:00
Jiang Bohan
93d62cc79e fix(skills): shared-state runtime local-skill stores (MUL-1288)
Fixes the bug Bohan surfaced on MUL-1288: behind prod's multi-node API the
runtime-local-skill list/import flow would intermittently time out or 404.
Root cause: LocalSkillListStore and LocalSkillImportStore were per-process
sync.Mutex+map, so when the frontend POST, the daemon heartbeat and the
frontend GET landed on different API instances, each saw a different
pending set. Confirmed against production daemon logs — the failed
request_id never showed up in the daemon's "runtime local skills
requested" log, even though other requests around the same window worked.

Per Yushen's guidance (server must stay stateless; state lives in
storage), migrate both stores to Redis so every node agrees on the same
pending set.
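
The root cause is easy to show in isolation. This is an illustrative sketch only: `nodeStore` is a hypothetical, trimmed model of the old per-process mutex-plus-map store, with two instances standing in for two API nodes.

```go
package main

import (
	"fmt"
	"sync"
)

// nodeStore models the old per-process store: a mutex-guarded map that only
// the owning API instance can see.
type nodeStore struct {
	mu      sync.Mutex
	pending map[string]string // request id -> runtime id
}

func newNodeStore() *nodeStore {
	return &nodeStore{pending: map[string]string{}}
}

func (s *nodeStore) create(id, runtimeID string) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.pending[id] = runtimeID
}

// popPending returns one pending request id for runtimeID, or "" when this
// node has none.
func (s *nodeStore) popPending(runtimeID string) string {
	s.mu.Lock()
	defer s.mu.Unlock()
	for id, rt := range s.pending {
		if rt == runtimeID {
			delete(s.pending, id)
			return id
		}
	}
	return ""
}

func main() {
	nodeA, nodeB := newNodeStore(), newNodeStore()

	// The frontend POST lands on node A.
	nodeA.create("req-1", "runtime-1")

	// The daemon heartbeat lands on node B, which never saw the request.
	fmt.Printf("node B pops %q\n", nodeB.popPending("runtime-1"))
	fmt.Printf("node A pops %q\n", nodeA.popPending("runtime-1"))
}
```

Node B returns nothing for a request that node A is holding, which matches the symptom in the daemon logs: the request_id never reaches the daemon.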

What changed
- LocalSkillListStore / LocalSkillImportStore are now interfaces. Methods
  take context.Context and return error.
- InMemoryLocalSkill{List,Import}Store — renamed from the existing types,
  kept as the default for single-node dev and the in-process test suite.
- RedisLocalSkill{List,Import}Store — new. Keyed on
  mul:local_skill:{list,import}:<id> (JSON record, TTL = retention), with
  a per-runtime ZSET mul:local_skill:{list,import}:pending:<runtime_id>
  (score = created_at UnixNano) providing cross-node ordering. PopPending
  wins the claim via ZREM == 1, so concurrent pops from different nodes
  never return the same request twice.
- NewRouter gets an optional *redis.Client; when non-nil it swaps in the
  Redis-backed stores. main.go hoists the existing Redis client (already
  used by the realtime relay) so both subsystems share one client.
- Handler fields flip to interface types; handler.New still constructs
  in-memory stores by default.
- Daemon heartbeat's PopPending call sites thread r.Context() through so
  Redis operations inherit request cancellation. Errors warn instead of
  poisoning the heartbeat response.
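
The "ZREM == 1 wins the claim" rule can be sketched without a Redis server. In this model `pendingSet` and `raceForClaim` are hypothetical names, and `remove` only imitates ZREM's return value (1 when the member was removed, 0 when it was already gone).

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// pendingSet is a hypothetical in-memory model of the per-runtime pending ZSET.
type pendingSet struct {
	mu      sync.Mutex
	members map[string]struct{}
}

// remove imitates ZREM: it returns 1 if id was present and is now removed,
// and 0 if id was not in the set.
func (p *pendingSet) remove(id string) int {
	p.mu.Lock()
	defer p.mu.Unlock()
	if _, ok := p.members[id]; !ok {
		return 0
	}
	delete(p.members, id)
	return 1
}

// raceForClaim starts n goroutines that all try to claim id and returns how
// many of them saw remove return 1.
func raceForClaim(p *pendingSet, id string, n int) int {
	var winners atomic.Int64
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			if p.remove(id) == 1 {
				winners.Add(1)
			}
		}()
	}
	wg.Wait()
	return int(winners.Load())
}

func main() {
	p := &pendingSet{members: map[string]struct{}{"req-1": {}}}
	fmt.Println("winners:", raceForClaim(p, "req-1", 16))
}
```

Because removal is a single atomic operation, only one caller can observe the transition from present to absent, so only one node dispatches the request.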

Tests
- Existing in-memory tests updated for the new signatures (ctx + error).
- New runtime_local_skills_redis_store_test.go covers:
  - Create/Get/Complete round trip preserves skills payload
  - PopPending across two *store instances sharing one rdb (the exact
    regression: node A creates, node B pops)
  - N concurrent PopPending on one record => exactly one winner
  - Pending-timeout threshold transitions the record and removes the zset
    member so a later PopPending doesn't return a timed-out request
  - Import store round-trips CreatorID (which is json:"-" on the public
    struct — needs a Redis envelope so ReportLocalSkillImportResult can
    still attribute the created Skill)
  - Per-runtime isolation — a PopPending for runtime B does not disturb
    A's pending zset
- Tests skip gracefully if REDIS_TEST_URL is unset; CI now spins up a
  redis:7-alpine service and exports the URL so the suite actually runs
  there.
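
The CreatorID bullet is worth a concrete illustration. The sketch below assumes a trimmed `importRequest` struct and a hypothetical `storedImportRequest` envelope; the real field set and envelope shape are not shown in this diff.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// importRequest is a trimmed, hypothetical version of the public struct.
// CreatorID is hidden from API responses with json:"-".
type importRequest struct {
	ID        string `json:"id"`
	SkillKey  string `json:"skill_key"`
	CreatorID string `json:"-"`
}

// storedImportRequest is an assumed storage envelope that carries the hidden
// field alongside the public payload.
type storedImportRequest struct {
	Request   importRequest `json:"request"`
	CreatorID string        `json:"creator_id"`
}

// roundTripPlain marshals the public struct directly, which drops CreatorID.
func roundTripPlain(req importRequest) (importRequest, error) {
	data, err := json.Marshal(req)
	if err != nil {
		return importRequest{}, err
	}
	var out importRequest
	err = json.Unmarshal(data, &out)
	return out, err
}

func encodeForStore(req importRequest) ([]byte, error) {
	return json.Marshal(storedImportRequest{Request: req, CreatorID: req.CreatorID})
}

func decodeFromStore(data []byte) (importRequest, error) {
	var env storedImportRequest
	if err := json.Unmarshal(data, &env); err != nil {
		return importRequest{}, err
	}
	req := env.Request
	req.CreatorID = env.CreatorID // restore the field json:"-" skipped
	return req, nil
}

func main() {
	orig := importRequest{ID: "req-1", SkillKey: "demo", CreatorID: "user-42"}

	plain, _ := roundTripPlain(orig)
	fmt.Printf("plain round trip creator: %q\n", plain.CreatorID)

	data, err := encodeForStore(orig)
	if err != nil {
		panic(err)
	}
	back, err := decodeFromStore(data)
	if err != nil {
		panic(err)
	}
	fmt.Printf("envelope round trip creator: %q\n", back.CreatorID)
}
```

Serializing the public struct as-is would silently lose the creator, so the result handler could no longer attribute the created Skill.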

Out of scope
PingStore / UpdateStore / ModelListStore have the same shape and the
same latent bug (they just fire rarely enough to have gone unnoticed).
Migrating them to Redis is a follow-up — MUL-1288 is specifically the
local-skills break Bohan is blocked on.
2026-04-23 16:06:24 +08:00
10 changed files with 1074 additions and 103 deletions

View File

@@ -48,8 +48,22 @@ jobs:
--health-interval 5s
--health-timeout 5s
--health-retries 20
+redis:
+image: redis:7-alpine
+ports:
+- 6379:6379
+options: >-
+--health-cmd "redis-cli ping"
+--health-interval 5s
+--health-timeout 5s
+--health-retries 10
env:
DATABASE_URL: postgres://multica:multica@localhost:5432/multica?sslmode=disable
+# Wires up the RedisLocalSkill*_test.go suite. Distinct from REDIS_URL
+# (which would flip the server binary itself onto the Redis-backed
+# realtime relay + request stores); the tests talk to this Redis
+# directly so they run alongside the Postgres-backed suite.
+REDIS_TEST_URL: redis://localhost:6379/1
steps:
- name: Checkout
uses: actions/checkout@v6
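
On the test side, the gate on REDIS_TEST_URL might look like the sketch below. `redisTestURL` is a hypothetical helper, and the lookup function is injected only to keep the example deterministic; the suite's actual helper is not part of this diff.

```go
package main

import (
	"fmt"
	"os"
)

// redisTestURL returns the URL to test against and whether the Redis-backed
// tests should run at all. lookup has the same shape as os.LookupEnv.
func redisTestURL(lookup func(string) (string, bool)) (string, bool) {
	url, ok := lookup("REDIS_TEST_URL")
	if !ok || url == "" {
		return "", false
	}
	return url, true
}

func main() {
	if url, ok := redisTestURL(os.LookupEnv); ok {
		fmt.Println("running Redis tests against", url)
	} else {
		fmt.Println("REDIS_TEST_URL unset, skipping Redis tests")
	}
}
```

Inside a real test function the false branch would typically call `t.Skip` so the suite reports a skip rather than a failure.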

View File

@@ -71,7 +71,7 @@ func TestMain(m *testing.M) {
bus := events.New()
registerListeners(bus, hub)
-router := NewRouter(pool, hub, bus, analytics.NoopClient{})
+router := NewRouter(pool, hub, bus, analytics.NoopClient{}, nil)
testServer = httptest.NewServer(router)
// Generate a JWT token directly for the test user

View File

@@ -62,15 +62,18 @@ func main() {
// MUL-1138: when REDIS_URL is set, route fanout through a Redis relay so
// multiple API nodes can deliver each other's events. Without it the hub
// is the sole broadcaster and the server stays single-node (legacy).
+// The same client is also used for cross-node request stores (e.g. runtime
+// local-skill pending requests) so every node sees the same pending set.
relayCtx, relayCancel := context.WithCancel(context.Background())
defer relayCancel()
var broadcaster realtime.Broadcaster = hub
+var rdb *redis.Client
if redisURL := os.Getenv("REDIS_URL"); redisURL != "" {
opts, err := redis.ParseURL(redisURL)
if err != nil {
slog.Error("invalid REDIS_URL — falling back to in-memory hub", "error", err)
} else {
-rdb := redis.NewClient(opts)
+rdb = redis.NewClient(opts)
relay := realtime.NewRedisRelay(hub, rdb)
relay.Start(relayCtx)
broadcaster = relay
@@ -93,7 +96,7 @@ func main() {
registerActivityListeners(bus, queries)
registerNotificationListeners(bus, queries)
-r := NewRouter(pool, hub, bus, analyticsClient)
+r := NewRouter(pool, hub, bus, analyticsClient, rdb)
srv := &http.Server{
Addr: ":" + port,

View File

@@ -12,6 +12,7 @@ import (
"github.com/go-chi/cors"
"github.com/jackc/pgx/v5/pgtype"
"github.com/jackc/pgx/v5/pgxpool"
+"github.com/redis/go-redis/v9"
"github.com/multica-ai/multica/server/internal/analytics"
"github.com/multica-ai/multica/server/internal/auth"
@@ -55,7 +56,11 @@ func allowedOrigins() []string {
}
// NewRouter creates the fully-configured Chi router with all middleware and routes.
-func NewRouter(pool *pgxpool.Pool, hub *realtime.Hub, bus *events.Bus, analyticsClient analytics.Client) chi.Router {
+// rdb is optional: when non-nil the runtime local-skill request stores are
+// swapped for Redis-backed implementations so multiple API nodes share the
+// same pending queue (required for multi-node prod). A nil rdb keeps the
+// default in-memory stores which are fine for single-node dev and tests.
+func NewRouter(pool *pgxpool.Pool, hub *realtime.Hub, bus *events.Bus, analyticsClient analytics.Client, rdb *redis.Client) chi.Router {
queries := db.New(pool)
emailSvc := service.NewEmailService()
@@ -79,6 +84,10 @@ func NewRouter(pool *pgxpool.Pool, hub *realtime.Hub, bus *events.Bus, analytics
AllowedEmailDomains: splitAndTrim(os.Getenv("ALLOWED_EMAIL_DOMAINS")),
}
h := handler.New(queries, pool, hub, bus, emailSvc, store, cfSigner, analyticsClient, signupConfig)
+if rdb != nil {
+h.LocalSkillListStore = handler.NewRedisLocalSkillListStore(rdb)
+h.LocalSkillImportStore = handler.NewRedisLocalSkillImportStore(rdb)
+}
r := chi.NewRouter()
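
The wiring pattern in this hunk (construct defaults, then override when the optional dependency is present) can be sketched in isolation. All types here are hypothetical stand-ins: `redisClient` replaces `*redis.Client` and `skillStore` replaces the two store interfaces.

```go
package main

import "fmt"

// skillStore is a hypothetical, reduced store interface.
type skillStore interface{ Name() string }

type inMemoryStore struct{}

func (inMemoryStore) Name() string { return "in-memory" }

// redisClient is a stand-in for the real Redis client type.
type redisClient struct{ addr string }

type redisStore struct{ rdb *redisClient }

func (redisStore) Name() string { return "redis" }

type handler struct{ store skillStore }

// newHandler always starts with the in-memory default, mirroring how
// handler.New behaves in the diff.
func newHandler() *handler { return &handler{store: inMemoryStore{}} }

// wire swaps in the Redis-backed store only when a client was provided.
func wire(rdb *redisClient) *handler {
	h := newHandler()
	if rdb != nil {
		h.store = redisStore{rdb: rdb}
	}
	return h
}

func main() {
	fmt.Println("no client:  ", wire(nil).store.Name())
	fmt.Println("with client:", wire(&redisClient{addr: "localhost:6379"}).store.Name())
}
```

Keeping the default inside the constructor means existing callers and tests keep working unchanged when they pass nil.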

View File

@@ -529,12 +529,16 @@ func (h *Handler) DaemonHeartbeat(w http.ResponseWriter, r *http.Request) {
}
// Check for pending local-skill list requests for this runtime.
-if pending := h.LocalSkillListStore.PopPending(req.RuntimeID); pending != nil {
+if pending, err := h.LocalSkillListStore.PopPending(r.Context(), req.RuntimeID); err != nil {
+slog.Warn("local skill list PopPending failed", "error", err, "runtime_id", req.RuntimeID)
+} else if pending != nil {
resp["pending_local_skills"] = map[string]string{"id": pending.ID}
}
// Check for pending local-skill import requests for this runtime.
-if pending := h.LocalSkillImportStore.PopPending(req.RuntimeID); pending != nil {
+if pending, err := h.LocalSkillImportStore.PopPending(r.Context(), req.RuntimeID); err != nil {
+slog.Warn("local skill import PopPending failed", "error", err, "runtime_id", req.RuntimeID)
+} else if pending != nil {
payload := map[string]string{
"id": pending.ID,
"skill_key": pending.SkillKey,

View File
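The heartbeat hunk relies on an `if` with an initializer whose variables stay in scope for the `else if` branch, so a store error is logged and the heartbeat response still goes out. A reduced sketch, where `buildHeartbeat`, `pendingRequest` and `errStoreDown` are hypothetical names and the warning is printed rather than sent to slog:

```go
package main

import (
	"errors"
	"fmt"
)

type pendingRequest struct{ ID string }

var errStoreDown = errors.New("store down")

// buildHeartbeat always returns a response. A failing pop is reported as a
// warning and simply leaves the pending key out.
func buildHeartbeat(pop func() (*pendingRequest, error)) map[string]any {
	resp := map[string]any{"status": "ok"}
	if pending, err := pop(); err != nil {
		fmt.Println("warn: PopPending failed:", err)
	} else if pending != nil {
		// pending and err are still in scope in this branch.
		resp["pending_local_skills"] = map[string]string{"id": pending.ID}
	}
	return resp
}

func main() {
	failing := func() (*pendingRequest, error) { return nil, errStoreDown }
	found := func() (*pendingRequest, error) { return &pendingRequest{ID: "req-1"}, nil }

	fmt.Println(len(buildHeartbeat(failing)), "key(s) when the store fails")
	fmt.Println(len(buildHeartbeat(found)), "key(s) when a request is pending")
}
```

The trade-off is that a pending request is delayed until the next heartbeat when the store is briefly unavailable, rather than failing the heartbeat itself.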

@@ -50,8 +50,8 @@ type Handler struct {
PingStore *PingStore
UpdateStore *UpdateStore
ModelListStore *ModelListStore
-LocalSkillListStore *RuntimeLocalSkillListStore
-LocalSkillImportStore *RuntimeLocalSkillImportStore
+LocalSkillListStore LocalSkillListStore
+LocalSkillImportStore LocalSkillImportStore
Storage storage.Storage
CFSigner *auth.CloudFrontSigner
Analytics analytics.Client
@@ -81,8 +81,8 @@ func New(queries *db.Queries, txStarter txStarter, hub *realtime.Hub, bus *event
PingStore: NewPingStore(),
UpdateStore: NewUpdateStore(),
ModelListStore: NewModelListStore(),
-LocalSkillListStore: NewRuntimeLocalSkillListStore(),
-LocalSkillImportStore: NewRuntimeLocalSkillImportStore(),
+LocalSkillListStore: NewInMemoryLocalSkillListStore(),
+LocalSkillImportStore: NewInMemoryLocalSkillImportStore(),
Storage: store,
CFSigner: cfSigner,
Analytics: analyticsClient,

View File

@@ -1,6 +1,7 @@
package handler
import (
"context"
"encoding/json"
"log/slog"
"net/http"
@@ -28,6 +29,73 @@ const (
runtimeLocalSkillStoreRetention = 2 * time.Minute
)
// LocalSkillListStore tracks pending / running / completed runtime-local-skill
// inventory requests. The server MUST stay stateless — any state that needs to
// outlive a single request has to live in shared storage so multi-node deploys
// can have POST, heartbeat and poll land on different nodes and still agree
// on the request's state.
type LocalSkillListStore interface {
Create(ctx context.Context, runtimeID string) (*RuntimeLocalSkillListRequest, error)
Get(ctx context.Context, id string) (*RuntimeLocalSkillListRequest, error)
PopPending(ctx context.Context, runtimeID string) (*RuntimeLocalSkillListRequest, error)
Complete(ctx context.Context, id string, skills []RuntimeLocalSkillSummary, supported bool) error
Fail(ctx context.Context, id string, errMsg string) error
}
// LocalSkillImportStore is the same contract as LocalSkillListStore but for
// runtime-local-skill import requests. Kept as a separate interface because the
// Create signature carries import-specific fields (skill_key, optional rename).
type LocalSkillImportStore interface {
Create(ctx context.Context, runtimeID, creatorID, skillKey string, name, description *string) (*RuntimeLocalSkillImportRequest, error)
Get(ctx context.Context, id string) (*RuntimeLocalSkillImportRequest, error)
PopPending(ctx context.Context, runtimeID string) (*RuntimeLocalSkillImportRequest, error)
Complete(ctx context.Context, id string, skill SkillResponse) error
Fail(ctx context.Context, id string, errMsg string) error
}
// applyLocalSkillListTimeout transitions a request into the timeout terminal
// state if it has been pending / running past the configured thresholds.
// Returns true when the record was modified so callers can persist the change.
func applyLocalSkillListTimeout(req *RuntimeLocalSkillListRequest, now time.Time) bool {
switch req.Status {
case RuntimeLocalSkillPending:
if now.Sub(req.CreatedAt) > runtimeLocalSkillPendingTimeout {
req.Status = RuntimeLocalSkillTimeout
req.Error = "daemon did not respond within 30 seconds"
req.UpdatedAt = now
return true
}
case RuntimeLocalSkillRunning:
if req.RunStartedAt != nil && now.Sub(*req.RunStartedAt) > runtimeLocalSkillRunningTimeout {
req.Status = RuntimeLocalSkillTimeout
req.Error = "daemon did not finish within 60 seconds"
req.UpdatedAt = now
return true
}
}
return false
}
func applyLocalSkillImportTimeout(req *RuntimeLocalSkillImportRequest, now time.Time) bool {
switch req.Status {
case RuntimeLocalSkillPending:
if now.Sub(req.CreatedAt) > runtimeLocalSkillPendingTimeout {
req.Status = RuntimeLocalSkillTimeout
req.Error = "daemon did not respond within 30 seconds"
req.UpdatedAt = now
return true
}
case RuntimeLocalSkillRunning:
if req.RunStartedAt != nil && now.Sub(*req.RunStartedAt) > runtimeLocalSkillRunningTimeout {
req.Status = RuntimeLocalSkillTimeout
req.Error = "daemon did not finish within 60 seconds"
req.UpdatedAt = now
return true
}
}
return false
}
type RuntimeLocalSkillSummary struct {
Key string `json:"key"`
Name string `json:"name"`
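
The extracted timeout helpers take `now` as a parameter and report whether they modified the record, which makes them easy to exercise with fixed timestamps. The sketch below mirrors their shape under stated assumptions: the 30s and 60s thresholds are inferred from the error strings, the status values are guessed as plain strings, and `request`, `at`, `newPending` and `newRunning` are hypothetical.

```go
package main

import (
	"fmt"
	"time"
)

const (
	pendingTimeout = 30 * time.Second // assumed from "within 30 seconds"
	runningTimeout = 60 * time.Second // assumed from "within 60 seconds"
)

type request struct {
	Status       string
	Error        string
	CreatedAt    time.Time
	RunStartedAt *time.Time
}

// applyTimeout moves req to the timeout state when it has aged out and
// returns true only if it changed the record.
func applyTimeout(req *request, now time.Time) bool {
	switch req.Status {
	case "pending":
		if now.Sub(req.CreatedAt) > pendingTimeout {
			req.Status = "timeout"
			req.Error = "daemon did not respond within 30 seconds"
			return true
		}
	case "running":
		if req.RunStartedAt != nil && now.Sub(*req.RunStartedAt) > runningTimeout {
			req.Status = "timeout"
			req.Error = "daemon did not finish within 60 seconds"
			return true
		}
	}
	return false
}

// at returns a fixed base time plus sec seconds, so examples are deterministic.
func at(sec int) time.Time {
	base := time.Date(2026, 4, 23, 8, 0, 0, 0, time.UTC)
	return base.Add(time.Duration(sec) * time.Second)
}

func newPending() *request { return &request{Status: "pending", CreatedAt: at(0)} }

func newRunning(startSec int) *request {
	started := at(startSec)
	return &request{Status: "running", CreatedAt: at(0), RunStartedAt: &started}
}

func main() {
	req := newPending()
	fmt.Println(applyTimeout(req, at(29)), req.Status)
	fmt.Println(applyTimeout(req, at(31)), req.Status)
}
```

The boolean return is what lets a shared-storage implementation write back only when a transition actually happened.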
@@ -64,16 +132,20 @@ type RuntimeLocalSkillImportRequest struct {
RunStartedAt *time.Time `json:"-"`
}
type RuntimeLocalSkillListStore struct {
// InMemoryLocalSkillListStore is the single-node implementation — good enough
// for local dev and the in-process test suite. Production (multi-node) must
// use RedisLocalSkillListStore so every API node agrees on the same pending
// set.
type InMemoryLocalSkillListStore struct {
mu sync.Mutex
requests map[string]*RuntimeLocalSkillListRequest
}
func NewRuntimeLocalSkillListStore() *RuntimeLocalSkillListStore {
return &RuntimeLocalSkillListStore{requests: make(map[string]*RuntimeLocalSkillListRequest)}
func NewInMemoryLocalSkillListStore() *InMemoryLocalSkillListStore {
return &InMemoryLocalSkillListStore{requests: make(map[string]*RuntimeLocalSkillListRequest)}
}
func (s *RuntimeLocalSkillListStore) Create(runtimeID string) *RuntimeLocalSkillListRequest {
func (s *InMemoryLocalSkillListStore) Create(_ context.Context, runtimeID string) (*RuntimeLocalSkillListRequest, error) {
s.mu.Lock()
defer s.mu.Unlock()
@@ -92,29 +164,29 @@ func (s *RuntimeLocalSkillListStore) Create(runtimeID string) *RuntimeLocalSkill
UpdatedAt: time.Now(),
}
s.requests[req.ID] = req
return req
return req, nil
}
func (s *RuntimeLocalSkillListStore) Get(id string) *RuntimeLocalSkillListRequest {
func (s *InMemoryLocalSkillListStore) Get(_ context.Context, id string) (*RuntimeLocalSkillListRequest, error) {
s.mu.Lock()
defer s.mu.Unlock()
req, ok := s.requests[id]
if !ok {
return nil
return nil, nil
}
s.applyTimeout(req, time.Now())
return req
applyLocalSkillListTimeout(req, time.Now())
return req, nil
}
func (s *RuntimeLocalSkillListStore) PopPending(runtimeID string) *RuntimeLocalSkillListRequest {
func (s *InMemoryLocalSkillListStore) PopPending(_ context.Context, runtimeID string) (*RuntimeLocalSkillListRequest, error) {
s.mu.Lock()
defer s.mu.Unlock()
var oldest *RuntimeLocalSkillListRequest
now := time.Now()
for _, req := range s.requests {
s.applyTimeout(req, now)
applyLocalSkillListTimeout(req, now)
if req.RuntimeID == runtimeID && req.Status == RuntimeLocalSkillPending {
if oldest == nil || req.CreatedAt.Before(oldest.CreatedAt) {
oldest = req
@@ -127,10 +199,10 @@ func (s *RuntimeLocalSkillListStore) PopPending(runtimeID string) *RuntimeLocalS
oldest.RunStartedAt = &startedAt
oldest.UpdatedAt = now
}
return oldest
return oldest, nil
}
func (s *RuntimeLocalSkillListStore) Complete(id string, skills []RuntimeLocalSkillSummary, supported bool) {
func (s *InMemoryLocalSkillListStore) Complete(_ context.Context, id string, skills []RuntimeLocalSkillSummary, supported bool) error {
s.mu.Lock()
defer s.mu.Unlock()
@@ -140,9 +212,10 @@ func (s *RuntimeLocalSkillListStore) Complete(id string, skills []RuntimeLocalSk
req.Supported = supported
req.UpdatedAt = time.Now()
}
return nil
}
func (s *RuntimeLocalSkillListStore) Fail(id string, errMsg string) {
func (s *InMemoryLocalSkillListStore) Fail(_ context.Context, id string, errMsg string) error {
s.mu.Lock()
defer s.mu.Unlock()
@@ -151,35 +224,21 @@ func (s *RuntimeLocalSkillListStore) Fail(id string, errMsg string) {
req.Error = errMsg
req.UpdatedAt = time.Now()
}
return nil
}
func (s *RuntimeLocalSkillListStore) applyTimeout(req *RuntimeLocalSkillListRequest, now time.Time) {
switch req.Status {
case RuntimeLocalSkillPending:
if now.Sub(req.CreatedAt) > runtimeLocalSkillPendingTimeout {
req.Status = RuntimeLocalSkillTimeout
req.Error = "daemon did not respond within 30 seconds"
req.UpdatedAt = now
}
case RuntimeLocalSkillRunning:
if req.RunStartedAt != nil && now.Sub(*req.RunStartedAt) > runtimeLocalSkillRunningTimeout {
req.Status = RuntimeLocalSkillTimeout
req.Error = "daemon did not finish within 60 seconds"
req.UpdatedAt = now
}
}
}
type RuntimeLocalSkillImportStore struct {
// InMemoryLocalSkillImportStore mirrors InMemoryLocalSkillListStore for import
// requests. Same single-node vs. multi-node caveat.
type InMemoryLocalSkillImportStore struct {
mu sync.Mutex
requests map[string]*RuntimeLocalSkillImportRequest
}
func NewRuntimeLocalSkillImportStore() *RuntimeLocalSkillImportStore {
return &RuntimeLocalSkillImportStore{requests: make(map[string]*RuntimeLocalSkillImportRequest)}
func NewInMemoryLocalSkillImportStore() *InMemoryLocalSkillImportStore {
return &InMemoryLocalSkillImportStore{requests: make(map[string]*RuntimeLocalSkillImportRequest)}
}
func (s *RuntimeLocalSkillImportStore) Create(runtimeID, creatorID, skillKey string, name, description *string) *RuntimeLocalSkillImportRequest {
func (s *InMemoryLocalSkillImportStore) Create(_ context.Context, runtimeID, creatorID, skillKey string, name, description *string) (*RuntimeLocalSkillImportRequest, error) {
s.mu.Lock()
defer s.mu.Unlock()
@@ -201,29 +260,29 @@ func (s *RuntimeLocalSkillImportStore) Create(runtimeID, creatorID, skillKey str
CreatorID: creatorID,
}
s.requests[req.ID] = req
return req
return req, nil
}
func (s *RuntimeLocalSkillImportStore) Get(id string) *RuntimeLocalSkillImportRequest {
func (s *InMemoryLocalSkillImportStore) Get(_ context.Context, id string) (*RuntimeLocalSkillImportRequest, error) {
s.mu.Lock()
defer s.mu.Unlock()
req, ok := s.requests[id]
if !ok {
return nil
return nil, nil
}
s.applyTimeout(req, time.Now())
return req
applyLocalSkillImportTimeout(req, time.Now())
return req, nil
}
func (s *RuntimeLocalSkillImportStore) PopPending(runtimeID string) *RuntimeLocalSkillImportRequest {
func (s *InMemoryLocalSkillImportStore) PopPending(_ context.Context, runtimeID string) (*RuntimeLocalSkillImportRequest, error) {
s.mu.Lock()
defer s.mu.Unlock()
var oldest *RuntimeLocalSkillImportRequest
now := time.Now()
for _, req := range s.requests {
s.applyTimeout(req, now)
applyLocalSkillImportTimeout(req, now)
if req.RuntimeID == runtimeID && req.Status == RuntimeLocalSkillPending {
if oldest == nil || req.CreatedAt.Before(oldest.CreatedAt) {
oldest = req
@@ -236,10 +295,10 @@ func (s *RuntimeLocalSkillImportStore) PopPending(runtimeID string) *RuntimeLoca
oldest.RunStartedAt = &startedAt
oldest.UpdatedAt = now
}
return oldest
return oldest, nil
}
func (s *RuntimeLocalSkillImportStore) Complete(id string, skill SkillResponse) {
func (s *InMemoryLocalSkillImportStore) Complete(_ context.Context, id string, skill SkillResponse) error {
s.mu.Lock()
defer s.mu.Unlock()
@@ -248,9 +307,10 @@ func (s *RuntimeLocalSkillImportStore) Complete(id string, skill SkillResponse)
req.Skill = &skill
req.UpdatedAt = time.Now()
}
return nil
}
func (s *RuntimeLocalSkillImportStore) Fail(id string, errMsg string) {
func (s *InMemoryLocalSkillImportStore) Fail(_ context.Context, id string, errMsg string) error {
s.mu.Lock()
defer s.mu.Unlock()
@@ -259,23 +319,7 @@ func (s *RuntimeLocalSkillImportStore) Fail(id string, errMsg string) {
req.Error = errMsg
req.UpdatedAt = time.Now()
}
}
func (s *RuntimeLocalSkillImportStore) applyTimeout(req *RuntimeLocalSkillImportRequest, now time.Time) {
switch req.Status {
case RuntimeLocalSkillPending:
if now.Sub(req.CreatedAt) > runtimeLocalSkillPendingTimeout {
req.Status = RuntimeLocalSkillTimeout
req.Error = "daemon did not respond within 30 seconds"
req.UpdatedAt = now
}
case RuntimeLocalSkillRunning:
if req.RunStartedAt != nil && now.Sub(*req.RunStartedAt) > runtimeLocalSkillRunningTimeout {
req.Status = RuntimeLocalSkillTimeout
req.Error = "daemon did not finish within 60 seconds"
req.UpdatedAt = now
}
}
return nil
}
type CreateRuntimeLocalSkillImportRequest struct {
@@ -352,7 +396,11 @@ func (h *Handler) InitiateListLocalSkills(w http.ResponseWriter, r *http.Request
return
}
req := h.LocalSkillListStore.Create(runtimeID)
req, err := h.LocalSkillListStore.Create(r.Context(), runtimeID)
if err != nil {
writeError(w, http.StatusInternalServerError, "failed to enqueue local skills request: "+err.Error())
return
}
writeJSON(w, http.StatusOK, req)
}
@@ -363,7 +411,11 @@ func (h *Handler) GetLocalSkillListRequest(w http.ResponseWriter, r *http.Reques
}
requestID := chi.URLParam(r, "requestId")
req := h.LocalSkillListStore.Get(requestID)
req, err := h.LocalSkillListStore.Get(r.Context(), requestID)
if err != nil {
writeError(w, http.StatusInternalServerError, "failed to load request: "+err.Error())
return
}
if req == nil || req.RuntimeID != runtimeID {
writeError(w, http.StatusNotFound, "request not found")
return
@@ -398,13 +450,18 @@ func (h *Handler) InitiateImportLocalSkill(w http.ResponseWriter, r *http.Reques
return
}
importReq := h.LocalSkillImportStore.Create(
importReq, err := h.LocalSkillImportStore.Create(
r.Context(),
runtimeID,
creatorID,
strings.TrimSpace(req.SkillKey),
cleanOptionalString(req.Name),
cleanOptionalString(req.Description),
)
if err != nil {
writeError(w, http.StatusInternalServerError, "failed to enqueue local skill import: "+err.Error())
return
}
writeJSON(w, http.StatusOK, importReq)
}
@@ -415,7 +472,11 @@ func (h *Handler) GetLocalSkillImportRequest(w http.ResponseWriter, r *http.Requ
}
requestID := chi.URLParam(r, "requestId")
req := h.LocalSkillImportStore.Get(requestID)
req, err := h.LocalSkillImportStore.Get(r.Context(), requestID)
if err != nil {
writeError(w, http.StatusInternalServerError, "failed to load request: "+err.Error())
return
}
if req == nil || req.RuntimeID != runtimeID {
writeError(w, http.StatusNotFound, "request not found")
return
@@ -431,7 +492,11 @@ func (h *Handler) ReportLocalSkillListResult(w http.ResponseWriter, r *http.Requ
}
requestID := chi.URLParam(r, "requestId")
req := h.LocalSkillListStore.Get(requestID)
req, err := h.LocalSkillListStore.Get(r.Context(), requestID)
if err != nil {
writeError(w, http.StatusInternalServerError, "failed to load request: "+err.Error())
return
}
if req == nil || req.RuntimeID != runtimeID {
writeError(w, http.StatusNotFound, "request not found")
return
@@ -458,9 +523,21 @@ func (h *Handler) ReportLocalSkillListResult(w http.ResponseWriter, r *http.Requ
if body.Supported != nil {
supported = *body.Supported
}
h.LocalSkillListStore.Complete(requestID, body.Skills, supported)
if err := h.LocalSkillListStore.Complete(r.Context(), requestID, body.Skills, supported); err != nil {
// Surface the store failure as 5xx so the daemon can retry instead
// of swallowing the report (leaves the request stuck in running
// until the server-side timeout, which is exactly the "looks OK but
// nothing happens" class of bug we're trying to avoid).
slog.Error("local skills Complete failed", "error", err, "request_id", requestID)
writeError(w, http.StatusInternalServerError, "failed to persist completion")
return
}
} else {
h.LocalSkillListStore.Fail(requestID, body.Error)
if err := h.LocalSkillListStore.Fail(r.Context(), requestID, body.Error); err != nil {
slog.Error("local skills Fail failed", "error", err, "request_id", requestID)
writeError(w, http.StatusInternalServerError, "failed to persist failure")
return
}
}
slog.Debug("runtime local skills report", "runtime_id", runtimeID, "request_id", requestID, "status", body.Status, "count", len(body.Skills))
@@ -475,7 +552,11 @@ func (h *Handler) ReportLocalSkillImportResult(w http.ResponseWriter, r *http.Re
}
requestID := chi.URLParam(r, "requestId")
req := h.LocalSkillImportStore.Get(requestID)
req, err := h.LocalSkillImportStore.Get(r.Context(), requestID)
if err != nil {
writeError(w, http.StatusInternalServerError, "failed to load request: "+err.Error())
return
}
if req == nil || req.RuntimeID != runtimeID {
writeError(w, http.StatusNotFound, "request not found")
return
@@ -497,12 +578,20 @@ func (h *Handler) ReportLocalSkillImportResult(w http.ResponseWriter, r *http.Re
}
if body.Status != "completed" {
h.LocalSkillImportStore.Fail(requestID, body.Error)
if err := h.LocalSkillImportStore.Fail(r.Context(), requestID, body.Error); err != nil {
slog.Error("local skill import Fail failed", "error", err, "request_id", requestID)
writeError(w, http.StatusInternalServerError, "failed to persist failure")
return
}
writeJSON(w, http.StatusOK, map[string]string{"status": "ok"})
return
}
if body.Skill == nil {
h.LocalSkillImportStore.Fail(requestID, "daemon returned an empty skill bundle")
if err := h.LocalSkillImportStore.Fail(r.Context(), requestID, "daemon returned an empty skill bundle"); err != nil {
slog.Error("local skill import Fail failed", "error", err, "request_id", requestID)
writeError(w, http.StatusInternalServerError, "failed to persist failure")
return
}
writeJSON(w, http.StatusOK, map[string]string{"status": "ok"})
return
}
@@ -541,16 +630,33 @@ func (h *Handler) ReportLocalSkillImportResult(w http.ResponseWriter, r *http.Re
Files: files,
})
if err != nil {
failMsg := err.Error()
if isUniqueViolation(err) {
h.LocalSkillImportStore.Fail(requestID, "a skill with this name already exists")
} else {
h.LocalSkillImportStore.Fail(requestID, err.Error())
failMsg = "a skill with this name already exists"
}
if ferr := h.LocalSkillImportStore.Fail(r.Context(), requestID, failMsg); ferr != nil {
slog.Error("local skill import Fail failed", "error", ferr, "request_id", requestID)
writeError(w, http.StatusInternalServerError, "failed to persist failure")
return
}
writeJSON(w, http.StatusOK, map[string]string{"status": "ok"})
return
}
h.LocalSkillImportStore.Complete(requestID, resp.SkillResponse)
if err := h.LocalSkillImportStore.Complete(r.Context(), requestID, resp.SkillResponse); err != nil {
// We already wrote the Skill to Postgres. If the store-side Complete
// fails we can't leave that Skill orphaned: the daemon will retry on
// 5xx and re-create it, which blows up on the unique-name constraint
// and looks to the user like "import keeps failing". Roll back our
// side-effects so the retry lands on a clean slate.
slog.Error("local skill import Complete failed — rolling back created skill",
"error", err, "request_id", requestID, "skill_id", resp.ID)
if delErr := h.Queries.DeleteSkill(r.Context(), parseUUID(resp.ID)); delErr != nil {
slog.Warn("orphan skill rollback failed", "error", delErr, "skill_id", resp.ID)
}
writeError(w, http.StatusInternalServerError, "failed to persist import completion")
return
}
h.publish(protocol.EventSkillCreated, uuidToString(rt.WorkspaceID), "member", req.CreatorID, map[string]any{"skill": resp})
slog.Debug("runtime local skill imported", "runtime_id", runtimeID, "request_id", requestID, "skill_id", resp.ID)
writeJSON(w, http.StatusOK, map[string]string{"status": "ok"})
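
The rollback added to ReportLocalSkillImportResult is a compensating delete. The sketch below models why it matters for the daemon's retry; `skillTable` and `reportImport` are hypothetical, the table only imitates a unique-name constraint, and the `rollback` flag exists purely to contrast the two behaviours.

```go
package main

import (
	"errors"
	"fmt"
	"net/http"
)

var (
	errNameTaken  = errors.New("a skill with this name already exists")
	errStoreWrite = errors.New("store write failed")
)

// skillTable imitates a table with a unique constraint on name.
type skillTable struct{ names map[string]bool }

func newSkillTable() *skillTable { return &skillTable{names: map[string]bool{}} }

func (t *skillTable) create(name string) error {
	if t.names[name] {
		return errNameTaken
	}
	t.names[name] = true
	return nil
}

func (t *skillTable) remove(name string) { delete(t.names, name) }

func (t *skillTable) has(name string) bool { return t.names[name] }

// reportImport models one daemon report. complete stands in for the store's
// Complete call; rollback toggles the compensating delete.
func reportImport(t *skillTable, name string, complete func() error, rollback bool) (int, error) {
	if err := t.create(name); err != nil {
		// Recorded as a failed import; the report itself was accepted.
		return http.StatusOK, err
	}
	if err := complete(); err != nil {
		if rollback {
			t.remove(name) // undo the row so a retry starts clean
		}
		return http.StatusInternalServerError, err
	}
	return http.StatusOK, nil
}

func failing() error    { return errStoreWrite }
func succeeding() error { return nil }

func main() {
	for _, rollback := range []bool{false, true} {
		t := newSkillTable()
		reportImport(t, "demo", failing, rollback) // first report: Complete fails
		_, err := reportImport(t, "demo", succeeding, rollback)
		fmt.Printf("rollback=%v retry error: %v\n", rollback, err)
	}
}
```

Without the delete, every retry collides with the row left behind by the first attempt. Note that the real rollback is best-effort, so a failed delete can still leave an orphan.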

View File

@@ -0,0 +1,432 @@
package handler
import (
"context"
"encoding/json"
"errors"
"fmt"
"time"
"github.com/redis/go-redis/v9"
)
// Redis-backed implementations of LocalSkillListStore / LocalSkillImportStore.
//
// Storage layout (for both list and import flows, differing only in key prefix):
//
// <prefix>:<request_id> → JSON-encoded request, TTL = retention
// <prefix>:pending:<runtime_id> → ZSET { member = request_id, score = created_at UnixNano }
// TTL = retention, refreshed on Create
//
// PopPending is the critical multi-node primitive. It MUST atomically:
// 1. pick the oldest pending request id for this runtime
// 2. claim it (remove from the pending zset) AND transition its record to
// "running" in a single step — otherwise a crash / transient Redis error
// between the two writes strands the request (no longer pending, record
// still says pending; no node will ever re-dispatch it).
//
// Doing this as two round-trips is racy; we use a Lua script so Redis runs
// ZREM + SET atomically server-side. If ZREM returns 0 (another node already
// claimed it), the SET is skipped. This is the fix for the PR-1557 review
// finding about the "request disappears under Redis hiccups" path.
const (
// Namespaced so we don't collide with the realtime relay's ws:* keys.
localSkillListKeyPrefix = "mul:local_skill:list:"
localSkillListPendingPrefix = "mul:local_skill:list:pending:"
localSkillImportKeyPrefix = "mul:local_skill:import:"
localSkillImportPendingPrefix = "mul:local_skill:import:pending:"
localSkillRedisPopMaxRetries = 5
)
// claimPendingScript atomically claims a pending request:
// KEYS[1] = pending zset ARGV[1] = request id to claim
// KEYS[2] = record key ARGV[2] = new record JSON (status=running)
// ARGV[3] = record TTL in seconds
//
// Returns 1 when this caller won the claim (zset entry removed, record
// updated), 0 when the entry was already gone (another node won).
// Either the ZREM and the SET both happen or neither does — Redis executes
// a Lua script as a single atomic unit.
var claimPendingScript = redis.NewScript(`
local removed = redis.call('ZREM', KEYS[1], ARGV[1])
if removed == 0 then
return 0
end
redis.call('SET', KEYS[2], ARGV[2], 'EX', tonumber(ARGV[3]))
return 1
`)
func localSkillListKey(id string) string { return localSkillListKeyPrefix + id }
func localSkillListPendingKey(runtimeID string) string {
return localSkillListPendingPrefix + runtimeID
}
func localSkillImportKey(id string) string { return localSkillImportKeyPrefix + id }
func localSkillImportPendingKey(runtimeID string) string {
return localSkillImportPendingPrefix + runtimeID
}
// RedisLocalSkillListStore stores pending / running / completed list requests
// in Redis so every API node agrees on the same state.
type RedisLocalSkillListStore struct {
rdb *redis.Client
}
func NewRedisLocalSkillListStore(rdb *redis.Client) *RedisLocalSkillListStore {
return &RedisLocalSkillListStore{rdb: rdb}
}
func (s *RedisLocalSkillListStore) Create(ctx context.Context, runtimeID string) (*RuntimeLocalSkillListRequest, error) {
now := time.Now()
req := &RuntimeLocalSkillListRequest{
ID: randomID(),
RuntimeID: runtimeID,
Status: RuntimeLocalSkillPending,
Supported: true,
CreatedAt: now,
UpdatedAt: now,
}
data, err := json.Marshal(req)
if err != nil {
return nil, fmt.Errorf("marshal list request: %w", err)
}
pipe := s.rdb.TxPipeline()
pipe.Set(ctx, localSkillListKey(req.ID), data, runtimeLocalSkillStoreRetention)
pipe.ZAdd(ctx, localSkillListPendingKey(runtimeID), redis.Z{
Score: float64(now.UnixNano()),
Member: req.ID,
})
// Keep the pending ZSET alive a bit longer than the individual request
// so stale members still in the zset can be swept lazily on PopPending
// without blocking the create path on deletion.
pipe.Expire(ctx, localSkillListPendingKey(runtimeID), runtimeLocalSkillStoreRetention*2)
if _, err := pipe.Exec(ctx); err != nil {
return nil, fmt.Errorf("persist list request: %w", err)
}
return req, nil
}
func (s *RedisLocalSkillListStore) Get(ctx context.Context, id string) (*RuntimeLocalSkillListRequest, error) {
return s.loadListRequest(ctx, id)
}
// loadListRequest fetches a single record, applies timeout transitions if the
// stored state has aged past the threshold, and persists the transition when
// applicable so sibling nodes observe the same terminal state.
func (s *RedisLocalSkillListStore) loadListRequest(ctx context.Context, id string) (*RuntimeLocalSkillListRequest, error) {
raw, err := s.rdb.Get(ctx, localSkillListKey(id)).Bytes()
if errors.Is(err, redis.Nil) {
return nil, nil
}
if err != nil {
return nil, fmt.Errorf("get list request: %w", err)
}
var req RuntimeLocalSkillListRequest
if err := json.Unmarshal(raw, &req); err != nil {
return nil, fmt.Errorf("decode list request: %w", err)
}
if applyLocalSkillListTimeout(&req, time.Now()) {
// Persist the timeout so subsequent Get / PopPending on any node see
// the terminal state. Also drop the id from the pending zset —
// PopPending would do this itself, but doing it here keeps the set
// clean even for readers that never call PopPending.
if err := s.persistListRequest(ctx, &req); err != nil {
return nil, err
}
s.rdb.ZRem(ctx, localSkillListPendingKey(req.RuntimeID), req.ID)
}
return &req, nil
}
func (s *RedisLocalSkillListStore) persistListRequest(ctx context.Context, req *RuntimeLocalSkillListRequest) error {
data, err := json.Marshal(req)
if err != nil {
return fmt.Errorf("marshal list request: %w", err)
}
if err := s.rdb.Set(ctx, localSkillListKey(req.ID), data, runtimeLocalSkillStoreRetention).Err(); err != nil {
return fmt.Errorf("persist list request: %w", err)
}
return nil
}
func (s *RedisLocalSkillListStore) PopPending(ctx context.Context, runtimeID string) (*RuntimeLocalSkillListRequest, error) {
pendingKey := localSkillListPendingKey(runtimeID)
for attempt := 0; attempt < localSkillRedisPopMaxRetries; attempt++ {
ids, err := s.rdb.ZRange(ctx, pendingKey, 0, 0).Result()
if err != nil {
return nil, fmt.Errorf("zrange pending: %w", err)
}
if len(ids) == 0 {
return nil, nil
}
id := ids[0]
req, err := s.loadListRequest(ctx, id)
if err != nil {
return nil, err
}
if req == nil {
// Record expired but the zset still references it — drop and retry.
s.rdb.ZRem(ctx, pendingKey, id)
continue
}
if req.Status != RuntimeLocalSkillPending {
// Either the timeout fired inside loadListRequest or another node
// already picked it up. Either way, unlink from the pending set
// and move on to the next one.
s.rdb.ZRem(ctx, pendingKey, id)
continue
}
now := time.Now()
req.Status = RuntimeLocalSkillRunning
req.RunStartedAt = &now
req.UpdatedAt = now
data, err := json.Marshal(req)
if err != nil {
return nil, fmt.Errorf("marshal list request: %w", err)
}
result, err := claimPendingScript.Run(
ctx, s.rdb,
[]string{pendingKey, localSkillListKey(id)},
id, data, int(runtimeLocalSkillStoreRetention.Seconds()),
).Int64()
if err != nil {
return nil, fmt.Errorf("claim pending: %w", err)
}
if result == 0 {
// The entry was already gone from the zset: another node claimed it
// (and wrote the running record) or swept it as stale. Retry to pick
// up whatever else is queued (or nothing).
continue
}
return req, nil
}
return nil, nil
}
func (s *RedisLocalSkillListStore) Complete(ctx context.Context, id string, skills []RuntimeLocalSkillSummary, supported bool) error {
req, err := s.loadListRequest(ctx, id)
if err != nil {
return err
}
if req == nil {
return nil
}
req.Status = RuntimeLocalSkillCompleted
req.Skills = skills
req.Supported = supported
req.UpdatedAt = time.Now()
return s.persistListRequest(ctx, req)
}
func (s *RedisLocalSkillListStore) Fail(ctx context.Context, id string, errMsg string) error {
req, err := s.loadListRequest(ctx, id)
if err != nil {
return err
}
if req == nil {
return nil
}
req.Status = RuntimeLocalSkillFailed
req.Error = errMsg
req.UpdatedAt = time.Now()
return s.persistListRequest(ctx, req)
}
// RedisLocalSkillImportStore mirrors RedisLocalSkillListStore for import
// requests. Kept as a separate type (rather than a generic) because the
// request shape carries import-specific fields (skill_key, optional rename,
// creator id) and Go generics don't buy us much for two concrete impls.
type RedisLocalSkillImportStore struct {
rdb *redis.Client
}
func NewRedisLocalSkillImportStore(rdb *redis.Client) *RedisLocalSkillImportStore {
return &RedisLocalSkillImportStore{rdb: rdb}
}
func (s *RedisLocalSkillImportStore) Create(ctx context.Context, runtimeID, creatorID, skillKey string, name, description *string) (*RuntimeLocalSkillImportRequest, error) {
now := time.Now()
req := &RuntimeLocalSkillImportRequest{
ID: randomID(),
RuntimeID: runtimeID,
SkillKey: skillKey,
Name: name,
Description: description,
Status: RuntimeLocalSkillPending,
CreatedAt: now,
UpdatedAt: now,
CreatorID: creatorID,
}
data, err := s.marshalImport(req)
if err != nil {
return nil, err
}
pipe := s.rdb.TxPipeline()
pipe.Set(ctx, localSkillImportKey(req.ID), data, runtimeLocalSkillStoreRetention)
pipe.ZAdd(ctx, localSkillImportPendingKey(runtimeID), redis.Z{
Score: float64(now.UnixNano()),
Member: req.ID,
})
pipe.Expire(ctx, localSkillImportPendingKey(runtimeID), runtimeLocalSkillStoreRetention*2)
if _, err := pipe.Exec(ctx); err != nil {
return nil, fmt.Errorf("persist import request: %w", err)
}
return req, nil
}
func (s *RedisLocalSkillImportStore) Get(ctx context.Context, id string) (*RuntimeLocalSkillImportRequest, error) {
return s.loadImportRequest(ctx, id)
}
func (s *RedisLocalSkillImportStore) loadImportRequest(ctx context.Context, id string) (*RuntimeLocalSkillImportRequest, error) {
raw, err := s.rdb.Get(ctx, localSkillImportKey(id)).Bytes()
if errors.Is(err, redis.Nil) {
return nil, nil
}
if err != nil {
return nil, fmt.Errorf("get import request: %w", err)
}
req, err := s.unmarshalImport(raw)
if err != nil {
return nil, err
}
if applyLocalSkillImportTimeout(req, time.Now()) {
if err := s.persistImportRequest(ctx, req); err != nil {
return nil, err
}
s.rdb.ZRem(ctx, localSkillImportPendingKey(req.RuntimeID), req.ID)
}
return req, nil
}
func (s *RedisLocalSkillImportStore) persistImportRequest(ctx context.Context, req *RuntimeLocalSkillImportRequest) error {
data, err := s.marshalImport(req)
if err != nil {
return err
}
if err := s.rdb.Set(ctx, localSkillImportKey(req.ID), data, runtimeLocalSkillStoreRetention).Err(); err != nil {
return fmt.Errorf("persist import request: %w", err)
}
return nil
}
// The RuntimeLocalSkillImportRequest type marks CreatorID / RunStartedAt as
// `json:"-"` so those fields are omitted from HTTP responses and internal
// state does not leak. Redis persistence still needs them, so we wrap the
// request in an internal envelope that re-promotes them.
type redisImportEnvelope struct {
Public *RuntimeLocalSkillImportRequest `json:"r"`
CreatorID string `json:"c"`
RunStartedAt *time.Time `json:"s"`
}
func (s *RedisLocalSkillImportStore) marshalImport(req *RuntimeLocalSkillImportRequest) ([]byte, error) {
env := redisImportEnvelope{
Public: req,
CreatorID: req.CreatorID,
RunStartedAt: req.RunStartedAt,
}
data, err := json.Marshal(env)
if err != nil {
return nil, fmt.Errorf("marshal import request: %w", err)
}
return data, nil
}
func (s *RedisLocalSkillImportStore) unmarshalImport(raw []byte) (*RuntimeLocalSkillImportRequest, error) {
var env redisImportEnvelope
if err := json.Unmarshal(raw, &env); err != nil {
return nil, fmt.Errorf("decode import request: %w", err)
}
if env.Public == nil {
return nil, fmt.Errorf("decode import request: missing payload")
}
env.Public.CreatorID = env.CreatorID
env.Public.RunStartedAt = env.RunStartedAt
return env.Public, nil
}
func (s *RedisLocalSkillImportStore) PopPending(ctx context.Context, runtimeID string) (*RuntimeLocalSkillImportRequest, error) {
pendingKey := localSkillImportPendingKey(runtimeID)
for attempt := 0; attempt < localSkillRedisPopMaxRetries; attempt++ {
ids, err := s.rdb.ZRange(ctx, pendingKey, 0, 0).Result()
if err != nil {
return nil, fmt.Errorf("zrange pending: %w", err)
}
if len(ids) == 0 {
return nil, nil
}
id := ids[0]
req, err := s.loadImportRequest(ctx, id)
if err != nil {
return nil, err
}
if req == nil {
s.rdb.ZRem(ctx, pendingKey, id)
continue
}
if req.Status != RuntimeLocalSkillPending {
s.rdb.ZRem(ctx, pendingKey, id)
continue
}
now := time.Now()
req.Status = RuntimeLocalSkillRunning
req.RunStartedAt = &now
req.UpdatedAt = now
data, err := s.marshalImport(req)
if err != nil {
return nil, err
}
result, err := claimPendingScript.Run(
ctx, s.rdb,
[]string{pendingKey, localSkillImportKey(id)},
id, data, int(runtimeLocalSkillStoreRetention.Seconds()),
).Int64()
if err != nil {
return nil, fmt.Errorf("claim pending: %w", err)
}
if result == 0 {
continue
}
return req, nil
}
return nil, nil
}
func (s *RedisLocalSkillImportStore) Complete(ctx context.Context, id string, skill SkillResponse) error {
req, err := s.loadImportRequest(ctx, id)
if err != nil {
return err
}
if req == nil {
return nil
}
req.Status = RuntimeLocalSkillCompleted
req.Skill = &skill
req.UpdatedAt = time.Now()
return s.persistImportRequest(ctx, req)
}
func (s *RedisLocalSkillImportStore) Fail(ctx context.Context, id string, errMsg string) error {
req, err := s.loadImportRequest(ctx, id)
if err != nil {
return err
}
if req == nil {
return nil
}
req.Status = RuntimeLocalSkillFailed
req.Error = errMsg
req.UpdatedAt = time.Now()
return s.persistImportRequest(ctx, req)
}


@@ -0,0 +1,366 @@
package handler
import (
"context"
"os"
"sync"
"testing"
"time"
"github.com/redis/go-redis/v9"
)
// newRedisTestClient connects to the Redis instance indicated by REDIS_TEST_URL
// and flushes it so each test starts from a clean slate. The helper skips the
// calling test if the env var is unset — matches the DATABASE_URL gating in
// the rest of the suite so `go test ./...` still works on a stock laptop
// without a running Redis.
func newRedisTestClient(t *testing.T) *redis.Client {
t.Helper()
url := os.Getenv("REDIS_TEST_URL")
if url == "" {
t.Skip("REDIS_TEST_URL not set")
}
opts, err := redis.ParseURL(url)
if err != nil {
t.Fatalf("parse REDIS_TEST_URL: %v", err)
}
rdb := redis.NewClient(opts)
ctx := context.Background()
if err := rdb.Ping(ctx).Err(); err != nil {
t.Skipf("REDIS_TEST_URL unreachable: %v", err)
}
if err := rdb.FlushDB(ctx).Err(); err != nil {
t.Fatalf("flushdb: %v", err)
}
t.Cleanup(func() {
rdb.FlushDB(context.Background())
rdb.Close()
})
return rdb
}
func TestRedisLocalSkillListStore_CreateGetComplete(t *testing.T) {
rdb := newRedisTestClient(t)
ctx := context.Background()
store := NewRedisLocalSkillListStore(rdb)
req, err := store.Create(ctx, "runtime-1")
if err != nil {
t.Fatalf("create: %v", err)
}
if req.Status != RuntimeLocalSkillPending {
t.Fatalf("initial status = %s", req.Status)
}
got, err := store.Get(ctx, req.ID)
if err != nil {
t.Fatalf("get: %v", err)
}
if got == nil || got.ID != req.ID {
t.Fatalf("round trip lost id: got=%v", got)
}
skills := []RuntimeLocalSkillSummary{
{
Key: "review-helper",
Name: "Review Helper",
Description: "Review PRs",
SourcePath: "~/.claude/skills/review-helper",
Provider: "claude",
FileCount: 2,
},
}
if err := store.Complete(ctx, req.ID, skills, true); err != nil {
t.Fatalf("complete: %v", err)
}
got, err = store.Get(ctx, req.ID)
if err != nil {
t.Fatalf("get after complete: %v", err)
}
if got.Status != RuntimeLocalSkillCompleted {
t.Fatalf("status after complete = %s", got.Status)
}
if len(got.Skills) != 1 || got.Skills[0].Key != "review-helper" {
t.Fatalf("skills not persisted: %+v", got.Skills)
}
}
// TestRedisLocalSkillListStore_PopPendingAcrossInstances is the regression
// test for the exact bug this change fixes: two distinct *store* instances
// (i.e. two API nodes) share one Redis, one creates a pending request, the
// other PopPending-s it. Before the Redis-backed store this returned nil and
// the request timed out.
func TestRedisLocalSkillListStore_PopPendingAcrossInstances(t *testing.T) {
rdb := newRedisTestClient(t)
ctx := context.Background()
nodeA := NewRedisLocalSkillListStore(rdb)
nodeB := NewRedisLocalSkillListStore(rdb)
req, err := nodeA.Create(ctx, "runtime-cross")
if err != nil {
t.Fatalf("node A create: %v", err)
}
popped, err := nodeB.PopPending(ctx, "runtime-cross")
if err != nil {
t.Fatalf("node B pop: %v", err)
}
if popped == nil {
t.Fatal("node B did not see node A's pending request")
}
if popped.ID != req.ID {
t.Fatalf("popped id = %s, want %s", popped.ID, req.ID)
}
if popped.Status != RuntimeLocalSkillRunning {
t.Fatalf("popped status = %s, want running", popped.Status)
}
if popped.RunStartedAt == nil {
t.Fatal("run_started_at not set after pop")
}
// A second pop must see nothing (claim was atomic).
again, err := nodeB.PopPending(ctx, "runtime-cross")
if err != nil {
t.Fatalf("node B second pop: %v", err)
}
if again != nil {
t.Fatalf("expected no more pending, got %+v", again)
}
}
// TestRedisLocalSkillListStore_PopPendingConcurrent asserts the ZREM-wins race
// guard: N concurrent PopPending calls against a single pending request
// return exactly one winner.
func TestRedisLocalSkillListStore_PopPendingConcurrent(t *testing.T) {
rdb := newRedisTestClient(t)
ctx := context.Background()
store := NewRedisLocalSkillListStore(rdb)
req, err := store.Create(ctx, "runtime-race")
if err != nil {
t.Fatalf("create: %v", err)
}
const N = 8
var wg sync.WaitGroup
results := make(chan *RuntimeLocalSkillListRequest, N)
errs := make(chan error, N)
for i := 0; i < N; i++ {
wg.Add(1)
go func() {
defer wg.Done()
popped, err := store.PopPending(ctx, "runtime-race")
if err != nil {
errs <- err
return
}
results <- popped
}()
}
wg.Wait()
close(results)
close(errs)
for err := range errs {
t.Fatalf("concurrent pop error: %v", err)
}
winners := 0
for popped := range results {
if popped != nil {
winners++
if popped.ID != req.ID {
t.Fatalf("winner popped wrong id: %s", popped.ID)
}
}
}
if winners != 1 {
t.Fatalf("expected exactly one winner, got %d", winners)
}
}
func TestRedisLocalSkillListStore_PendingTimeout(t *testing.T) {
rdb := newRedisTestClient(t)
ctx := context.Background()
store := NewRedisLocalSkillListStore(rdb)
req, err := store.Create(ctx, "runtime-timeout")
if err != nil {
t.Fatalf("create: %v", err)
}
// Rewind CreatedAt so the pending threshold is blown — simulates 31s of
// daemon silence without actually blocking the test that long.
req.CreatedAt = time.Now().Add(-runtimeLocalSkillPendingTimeout - time.Second)
if err := store.persistListRequest(ctx, req); err != nil {
t.Fatalf("persist rewound: %v", err)
}
got, err := store.Get(ctx, req.ID)
if err != nil {
t.Fatalf("get: %v", err)
}
if got.Status != RuntimeLocalSkillTimeout {
t.Fatalf("status = %s, want timeout", got.Status)
}
// A subsequent PopPending must NOT return a timed-out request.
popped, err := store.PopPending(ctx, "runtime-timeout")
if err != nil {
t.Fatalf("pop after timeout: %v", err)
}
if popped != nil {
t.Fatalf("expected no pending after timeout, got %+v", popped)
}
}
func TestRedisLocalSkillImportStore_PreservesCreatorID(t *testing.T) {
rdb := newRedisTestClient(t)
ctx := context.Background()
store := NewRedisLocalSkillImportStore(rdb)
name := "Review Helper"
desc := "Desc"
req, err := store.Create(ctx, "runtime-1", "user-42", "review-helper", &name, &desc)
if err != nil {
t.Fatalf("create: %v", err)
}
if req.CreatorID != "user-42" {
t.Fatalf("creator id lost on create")
}
got, err := store.Get(ctx, req.ID)
if err != nil {
t.Fatalf("get: %v", err)
}
// CreatorID is `json:"-"` on the public struct — verify the Redis envelope
// restores it, otherwise ReportLocalSkillImportResult can't attribute the
// created Skill to anyone.
if got.CreatorID != "user-42" {
t.Fatalf("creator id lost round trip: %q", got.CreatorID)
}
if got.Name == nil || *got.Name != name {
t.Fatalf("name lost: %v", got.Name)
}
if got.Description == nil || *got.Description != desc {
t.Fatalf("description lost: %v", got.Description)
}
}
func TestRedisLocalSkillImportStore_PopPendingAcrossInstances(t *testing.T) {
rdb := newRedisTestClient(t)
ctx := context.Background()
nodeA := NewRedisLocalSkillImportStore(rdb)
nodeB := NewRedisLocalSkillImportStore(rdb)
req, err := nodeA.Create(ctx, "runtime-import", "user-1", "review-helper", nil, nil)
if err != nil {
t.Fatalf("create: %v", err)
}
popped, err := nodeB.PopPending(ctx, "runtime-import")
if err != nil {
t.Fatalf("pop: %v", err)
}
if popped == nil || popped.ID != req.ID {
t.Fatalf("cross-node pop failed: got %+v", popped)
}
if popped.Status != RuntimeLocalSkillRunning {
t.Fatalf("popped status = %s", popped.Status)
}
if popped.SkillKey != "review-helper" {
t.Fatalf("skill_key lost: %q", popped.SkillKey)
}
}
// Smoke test: make sure the runtime-local-skill store keys don't collide
// across runtimes — PopPending for runtime A must not see B's pending.
func TestRedisLocalSkillListStore_PerRuntimeIsolation(t *testing.T) {
rdb := newRedisTestClient(t)
ctx := context.Background()
store := NewRedisLocalSkillListStore(rdb)
if _, err := store.Create(ctx, "runtime-A"); err != nil {
t.Fatalf("create A: %v", err)
}
reqB, err := store.Create(ctx, "runtime-B")
if err != nil {
t.Fatalf("create B: %v", err)
}
popped, err := store.PopPending(ctx, "runtime-B")
if err != nil {
t.Fatalf("pop B: %v", err)
}
if popped == nil || popped.ID != reqB.ID {
t.Fatalf("pop returned wrong request: %+v", popped)
}
// A's request is still pending.
ids, err := rdb.ZRange(ctx, localSkillListPendingKey("runtime-A"), 0, -1).Result()
if err != nil {
t.Fatalf("zrange A: %v", err)
}
if len(ids) != 1 {
t.Fatalf("expected 1 pending for A after pop(B), got %d: %v", len(ids), ids)
}
}
// TestRedisLocalSkillListStore_PopPendingAtomicClaim pins the PR-1557 review
// fix: the claim (ZREM pending + persist running record) MUST land as one
// atomic unit. If the old two-step ordering came back ("ZRem first, SET
// second") a transient error between the two would strand the request: no
// longer in pending, still stored as "pending" in Redis, never re-dispatched.
//
// We verify the happy-path invariant end-to-end: after one PopPending the
// record is in "running" state AND a second PopPending on the same runtime
// returns nothing (i.e. the pending zset no longer references the id).
func TestRedisLocalSkillListStore_PopPendingAtomicClaim(t *testing.T) {
rdb := newRedisTestClient(t)
ctx := context.Background()
store := NewRedisLocalSkillListStore(rdb)
req, err := store.Create(ctx, "runtime-atomic")
if err != nil {
t.Fatalf("create: %v", err)
}
popped, err := store.PopPending(ctx, "runtime-atomic")
if err != nil {
t.Fatalf("pop: %v", err)
}
if popped == nil || popped.ID != req.ID {
t.Fatalf("pop returned wrong request: %+v", popped)
}
got, err := store.Get(ctx, req.ID)
if err != nil {
t.Fatalf("get after pop: %v", err)
}
if got.Status != RuntimeLocalSkillRunning {
t.Fatalf("record status = %s, want running", got.Status)
}
// The pending queue must no longer reference the claimed id — exposed
// via PopPending rather than poking the zset directly.
again, err := store.PopPending(ctx, "runtime-atomic")
if err != nil {
t.Fatalf("second pop: %v", err)
}
if again != nil {
t.Fatalf("second pop should be empty, got %+v", again)
}
}
// Compile-time assertions: the Redis stores MUST satisfy the interfaces so
// NewRouter's assignment stays type-safe.
var (
_ LocalSkillListStore = (*RedisLocalSkillListStore)(nil)
_ LocalSkillImportStore = (*RedisLocalSkillImportStore)(nil)
_ LocalSkillListStore = (*InMemoryLocalSkillListStore)(nil)
_ LocalSkillImportStore = (*InMemoryLocalSkillImportStore)(nil)
)
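
The single-winner property that the concurrent and atomic-claim tests above check against a live Redis can also be illustrated with an in-memory model. This is a sketch under stated assumptions, not the store's implementation: `claimStore`, `claim`, and `countWinners` are hypothetical names, and a mutex stands in for Redis running the Lua script without interleaving other commands.

```go
package main

import (
	"fmt"
	"sync"
)

// claimStore is a hypothetical in-memory stand-in for the two Redis keys:
// pending models the zset, records models the per-request record.
type claimStore struct {
	mu      sync.Mutex
	pending map[string]bool
	records map[string]string
}

// claim follows the script's logic: remove the id from pending and, only
// if it was present, overwrite the record. Holding the mutex for both
// steps models the script executing as one uninterrupted unit.
func (c *claimStore) claim(id, record string) bool {
	c.mu.Lock()
	defer c.mu.Unlock()
	if !c.pending[id] {
		return false // already claimed or swept by someone else
	}
	delete(c.pending, id)
	c.records[id] = record
	return true
}

// countWinners races n goroutines for one pending id and reports how many
// of them saw claim return true.
func countWinners(n int) int {
	store := &claimStore{
		pending: map[string]bool{"req-1": true},
		records: map[string]string{"req-1": "pending"},
	}
	var wg sync.WaitGroup
	var mu sync.Mutex
	winners := 0
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			if store.claim("req-1", "running") {
				mu.Lock()
				winners++
				mu.Unlock()
			}
		}()
	}
	wg.Wait()
	return winners
}

func main() {
	fmt.Println("winners:", countWinners(8)) // winners: 1
}
```

If the removal and the record update were done under separate lock acquisitions, a failure between the two would leave the id unclaimed but the record still marked pending, which is the stranded state the Lua script is meant to rule out.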


@@ -106,9 +106,13 @@ func countSkillFiles(t *testing.T, skillID string) int {
return count
}
func TestRuntimeLocalSkillListStore_PreservesSummaries(t *testing.T) {
store := NewRuntimeLocalSkillListStore()
req := store.Create("runtime-xyz")
func TestInMemoryLocalSkillListStore_PreservesSummaries(t *testing.T) {
ctx := context.Background()
store := NewInMemoryLocalSkillListStore()
req, err := store.Create(ctx, "runtime-xyz")
if err != nil {
t.Fatalf("create: %v", err)
}
body := map[string]any{
"status": "completed",
@@ -133,8 +137,13 @@ func TestRuntimeLocalSkillListStore_PreservesSummaries(t *testing.T) {
t.Fatalf("unmarshal report body: %v", err)
}
store.Complete(req.ID, parsed.Skills, true)
got := store.Get(req.ID)
if err := store.Complete(ctx, req.ID, parsed.Skills, true); err != nil {
t.Fatalf("complete: %v", err)
}
got, err := store.Get(ctx, req.ID)
if err != nil {
t.Fatalf("get: %v", err)
}
if got == nil {
t.Fatal("expected stored result")
}
@@ -149,14 +158,21 @@ func TestRuntimeLocalSkillListStore_PreservesSummaries(t *testing.T) {
}
}
func TestRuntimeLocalSkillListStore_TimesOutRunningRequests(t *testing.T) {
store := NewRuntimeLocalSkillListStore()
req := store.Create("runtime-xyz")
func TestInMemoryLocalSkillListStore_TimesOutRunningRequests(t *testing.T) {
ctx := context.Background()
store := NewInMemoryLocalSkillListStore()
req, err := store.Create(ctx, "runtime-xyz")
if err != nil {
t.Fatalf("create: %v", err)
}
req.Status = RuntimeLocalSkillRunning
startedAt := time.Now().Add(-61 * time.Second)
req.RunStartedAt = &startedAt
got := store.Get(req.ID)
got, err := store.Get(ctx, req.ID)
if err != nil {
t.Fatalf("get: %v", err)
}
if got == nil {
t.Fatal("expected stored request")
}
@@ -168,14 +184,21 @@ func TestRuntimeLocalSkillListStore_TimesOutRunningRequests(t *testing.T) {
}
}
func TestRuntimeLocalSkillImportStore_TimesOutRunningRequests(t *testing.T) {
store := NewRuntimeLocalSkillImportStore()
req := store.Create("runtime-xyz", "user-1", "review-helper", nil, nil)
func TestInMemoryLocalSkillImportStore_TimesOutRunningRequests(t *testing.T) {
ctx := context.Background()
store := NewInMemoryLocalSkillImportStore()
req, err := store.Create(ctx, "runtime-xyz", "user-1", "review-helper", nil, nil)
if err != nil {
t.Fatalf("create: %v", err)
}
req.Status = RuntimeLocalSkillRunning
startedAt := time.Now().Add(-61 * time.Second)
req.RunStartedAt = &startedAt
got := store.Get(req.ID)
got, err := store.Get(ctx, req.ID)
if err != nil {
t.Fatalf("get: %v", err)
}
if got == nil {
t.Fatal("expected stored request")
}
@@ -214,7 +237,10 @@ func TestGetLocalSkillImportRequest_RequiresRuntimeOwner(t *testing.T) {
runtimeID := createRuntimeLocalSkillTestRuntime(t, testUserID)
adminUserID := createRuntimeLocalSkillTestMember(t, "admin")
importReq := testHandler.LocalSkillImportStore.Create(runtimeID, testUserID, "review-helper", nil, nil)
importReq, err := testHandler.LocalSkillImportStore.Create(context.Background(), runtimeID, testUserID, "review-helper", nil, nil)
if err != nil {
t.Fatalf("create import request: %v", err)
}
w := httptest.NewRecorder()
req := withURLParams(
@@ -349,18 +375,26 @@ func TestReportLocalSkillImportResult_IgnoresTimedOutRequests(t *testing.T) {
}
runtimeID := createRuntimeLocalSkillTestRuntime(t, testUserID)
importReq := testHandler.LocalSkillImportStore.Create(
ctx := context.Background()
importReq, err := testHandler.LocalSkillImportStore.Create(
ctx,
runtimeID,
testUserID,
"review-helper",
cleanOptionalString(ptr("Timed Out Import")),
cleanOptionalString(ptr("Should not be created")),
)
if err != nil {
t.Fatalf("create import request: %v", err)
}
importReq.Status = RuntimeLocalSkillRunning
startedAt := time.Now().Add(-61 * time.Second)
importReq.RunStartedAt = &startedAt
timedOut := testHandler.LocalSkillImportStore.Get(importReq.ID)
timedOut, err := testHandler.LocalSkillImportStore.Get(ctx, importReq.ID)
if err != nil {
t.Fatalf("get import request: %v", err)
}
if timedOut == nil || timedOut.Status != RuntimeLocalSkillTimeout {
t.Fatalf("expected timed out request, got %#v", timedOut)
}
@@ -399,7 +433,10 @@ func TestReportLocalSkillImportResult_RejectsCrossWorkspaceDaemonToken(t *testin
}
runtimeID := createRuntimeLocalSkillTestRuntime(t, testUserID)
importReq := testHandler.LocalSkillImportStore.Create(runtimeID, testUserID, "review-helper", nil, nil)
importReq, err := testHandler.LocalSkillImportStore.Create(context.Background(), runtimeID, testUserID, "review-helper", nil, nil)
if err != nil {
t.Fatalf("create import request: %v", err)
}
w := httptest.NewRecorder()
reportReq := withURLParams(