diff --git a/server/internal/handler/runtime_local_skills.go b/server/internal/handler/runtime_local_skills.go index 86e4577a8..1928f2b2d 100644 --- a/server/internal/handler/runtime_local_skills.go +++ b/server/internal/handler/runtime_local_skills.go @@ -524,11 +524,19 @@ func (h *Handler) ReportLocalSkillListResult(w http.ResponseWriter, r *http.Requ supported = *body.Supported } if err := h.LocalSkillListStore.Complete(r.Context(), requestID, body.Skills, supported); err != nil { - slog.Warn("local skills Complete failed", "error", err, "request_id", requestID) + // Surface the store failure as 5xx so the daemon can retry instead + // of swallowing the report (leaves the request stuck in running + // until the server-side timeout, which is exactly the "looks OK but + // nothing happens" class of bug we're trying to avoid). + slog.Error("local skills Complete failed", "error", err, "request_id", requestID) + writeError(w, http.StatusInternalServerError, "failed to persist completion") + return } } else { if err := h.LocalSkillListStore.Fail(r.Context(), requestID, body.Error); err != nil { - slog.Warn("local skills Fail failed", "error", err, "request_id", requestID) + slog.Error("local skills Fail failed", "error", err, "request_id", requestID) + writeError(w, http.StatusInternalServerError, "failed to persist failure") + return } } @@ -571,14 +579,18 @@ func (h *Handler) ReportLocalSkillImportResult(w http.ResponseWriter, r *http.Re if body.Status != "completed" { if err := h.LocalSkillImportStore.Fail(r.Context(), requestID, body.Error); err != nil { - slog.Warn("local skill import Fail failed", "error", err, "request_id", requestID) + slog.Error("local skill import Fail failed", "error", err, "request_id", requestID) + writeError(w, http.StatusInternalServerError, "failed to persist failure") + return } writeJSON(w, http.StatusOK, map[string]string{"status": "ok"}) return } if body.Skill == nil { if err := h.LocalSkillImportStore.Fail(r.Context(), requestID, "daemon returned an empty skill bundle"); err != nil { - slog.Warn("local skill import Fail failed", "error", err, "request_id", requestID) + slog.Error("local skill import Fail failed", "error", err, "request_id", requestID) + writeError(w, http.StatusInternalServerError, "failed to persist failure") + return } writeJSON(w, http.StatusOK, map[string]string{"status": "ok"}) return @@ -623,14 +635,27 @@ func (h *Handler) ReportLocalSkillImportResult(w http.ResponseWriter, r *http.Re failMsg = "a skill with this name already exists" } if ferr := h.LocalSkillImportStore.Fail(r.Context(), requestID, failMsg); ferr != nil { - slog.Warn("local skill import Fail failed", "error", ferr, "request_id", requestID) + slog.Error("local skill import Fail failed", "error", ferr, "request_id", requestID) + writeError(w, http.StatusInternalServerError, "failed to persist failure") + return } writeJSON(w, http.StatusOK, map[string]string{"status": "ok"}) return } if err := h.LocalSkillImportStore.Complete(r.Context(), requestID, resp.SkillResponse); err != nil { - slog.Warn("local skill import Complete failed", "error", err, "request_id", requestID) + // We already wrote the Skill to Postgres. If the store-side Complete + // fails we can't leave that Skill orphaned: the daemon will retry on + // 5xx and re-create it, which blows up on the unique-name constraint + // and looks to the user like "import keeps failing". Roll back our + // side-effects so the retry lands on a clean slate. + slog.Error("local skill import Complete failed — rolling back created skill", + "error", err, "request_id", requestID, "skill_id", resp.ID) + if delErr := h.Queries.DeleteSkill(r.Context(), parseUUID(resp.ID)); delErr != nil { + slog.Warn("orphan skill rollback failed", "error", delErr, "skill_id", resp.ID) + } + writeError(w, http.StatusInternalServerError, "failed to persist import completion") + return } h.publish(protocol.EventSkillCreated, uuidToString(rt.WorkspaceID), "member", req.CreatorID, map[string]any{"skill": resp}) slog.Debug("runtime local skill imported", "runtime_id", runtimeID, "request_id", requestID, "skill_id", resp.ID) diff --git a/server/internal/handler/runtime_local_skills_redis_store.go b/server/internal/handler/runtime_local_skills_redis_store.go index 05b80dab4..c9f2fb497 100644 --- a/server/internal/handler/runtime_local_skills_redis_store.go +++ b/server/internal/handler/runtime_local_skills_redis_store.go @@ -20,23 +20,43 @@ import ( // // PopPending is the critical multi-node primitive. It MUST atomically: // 1. pick the oldest pending request id for this runtime -// 2. read and update its record to "running" +// 2. claim it (remove from the pending zset) AND transition its record to +// "running" in a single step — otherwise a crash / transient Redis error +// between the two writes strands the request (no longer pending, record +// still says pending; no node will ever re-dispatch it). // -// Redis WATCH/MULTI/EXEC gives us that without a Lua script: the transaction -// aborts if another node modifies either key between WATCH and EXEC, so we -// retry. In practice two nodes almost never race for the same runtime (each -// runtime only has one daemon heartbeating), and the few ZPOPMIN fallbacks -// below catch the rest. +// Doing this as two round-trips is racy; we use a Lua script so Redis runs +// ZREM + SET atomically server-side. If ZREM returns 0 (another node already +// claimed it), the SET is skipped. This is the fix for the PR-1557 review +// finding about the "request disappears under Redis hiccups" path. const ( // Namespaced so we don't collide with the realtime relay's ws:* keys. - localSkillListKeyPrefix = "mul:local_skill:list:" - localSkillListPendingPrefix = "mul:local_skill:list:pending:" - localSkillImportKeyPrefix = "mul:local_skill:import:" - localSkillImportPendingPrefix = "mul:local_skill:import:pending:" - localSkillRedisPopMaxRetries = 5 + localSkillListKeyPrefix = "mul:local_skill:list:" + localSkillListPendingPrefix = "mul:local_skill:list:pending:" + localSkillImportKeyPrefix = "mul:local_skill:import:" + localSkillImportPendingPrefix = "mul:local_skill:import:pending:" + localSkillRedisPopMaxRetries = 5 ) +// claimPendingScript atomically claims a pending request: +// KEYS[1] = pending zset ARGV[1] = request id to claim +// KEYS[2] = record key ARGV[2] = new record JSON (status=running) +// ARGV[3] = record TTL in seconds +// +// Returns 1 when this caller won the claim (zset entry removed, record +// updated), 0 when the entry was already gone (another node won). +// Either the ZREM and the SET both happen or neither does — Redis executes +// a Lua script as a single atomic unit. +var claimPendingScript = redis.NewScript(` +local removed = redis.call('ZREM', KEYS[1], ARGV[1]) +if removed == 0 then + return 0 +end +redis.call('SET', KEYS[2], ARGV[2], 'EX', tonumber(ARGV[3])) +return 1 +`) + func localSkillListKey(id string) string { return localSkillListKeyPrefix + id } func localSkillListPendingKey(runtimeID string) string { return localSkillListPendingPrefix + runtimeID @@ -160,25 +180,28 @@ func (s *RedisLocalSkillListStore) PopPending(ctx context.Context, runtimeID str continue } - // Race guard: remove the id from the pending zset atomically-ish. Only - // the node whose ZRem returns 1 "wins" the claim; losers retry. Note - // we deliberately ZRem BEFORE overwriting the record so two winners - // never see Status=Pending in the persisted JSON at the same time. - removed, err := s.rdb.ZRem(ctx, pendingKey, id).Result() - if err != nil { - return nil, fmt.Errorf("zrem pending: %w", err) - } - if removed == 0 { - // Someone else popped it first. - continue - } - now := time.Now() req.Status = RuntimeLocalSkillRunning req.RunStartedAt = &now req.UpdatedAt = now - if err := s.persistListRequest(ctx, req); err != nil { - return nil, err + data, err := json.Marshal(req) + if err != nil { + return nil, fmt.Errorf("marshal list request: %w", err) + } + + result, err := claimPendingScript.Run( + ctx, s.rdb, + []string{pendingKey, localSkillListKey(id)}, + id, data, int(runtimeLocalSkillStoreRetention.Seconds()), + ).Int64() + if err != nil { + return nil, fmt.Errorf("claim pending: %w", err) + } + if result == 0 { + // Another node won the race. The record still says pending and is + // owned by the winner; we just retry to pick up whatever else is + // queued (or nothing). + continue } return req, nil } @@ -355,21 +378,26 @@ func (s *RedisLocalSkillImportStore) PopPending(ctx context.Context, runtimeID s continue } - removed, err := s.rdb.ZRem(ctx, pendingKey, id).Result() - if err != nil { - return nil, fmt.Errorf("zrem pending: %w", err) - } - if removed == 0 { - continue - } - now := time.Now() req.Status = RuntimeLocalSkillRunning req.RunStartedAt = &now req.UpdatedAt = now - if err := s.persistImportRequest(ctx, req); err != nil { + data, err := s.marshalImport(req) + if err != nil { return nil, err } + + result, err := claimPendingScript.Run( + ctx, s.rdb, + []string{pendingKey, localSkillImportKey(id)}, + id, data, int(runtimeLocalSkillStoreRetention.Seconds()), + ).Int64() + if err != nil { + return nil, fmt.Errorf("claim pending: %w", err) + } + if result == 0 { + continue + } return req, nil } return nil, nil diff --git a/server/internal/handler/runtime_local_skills_redis_store_test.go b/server/internal/handler/runtime_local_skills_redis_store_test.go index 91b94ed8f..a7169ae1e 100644 --- a/server/internal/handler/runtime_local_skills_redis_store_test.go +++ b/server/internal/handler/runtime_local_skills_redis_store_test.go @@ -310,6 +310,52 @@ func TestRedisLocalSkillListStore_PerRuntimeIsolation(t *testing.T) { } } +// TestRedisLocalSkillListStore_PopPendingAtomicClaim pins the PR-1557 review +// fix: the claim (ZREM pending + persist running record) MUST land as one +// atomic unit. If the old two-step ordering came back ("ZRem first, SET +// second") a transient error between the two would strand the request — not +// in pending, still serialised as "pending" on disk, never re-dispatched. +// +// We verify the happy-path invariant end-to-end: after one PopPending the +// record is in "running" state AND a second PopPending on the same runtime +// returns nothing (i.e. the pending zset no longer references the id). +func TestRedisLocalSkillListStore_PopPendingAtomicClaim(t *testing.T) { + rdb := newRedisTestClient(t) + ctx := context.Background() + store := NewRedisLocalSkillListStore(rdb) + + req, err := store.Create(ctx, "runtime-atomic") + if err != nil { + t.Fatalf("create: %v", err) + } + + popped, err := store.PopPending(ctx, "runtime-atomic") + if err != nil { + t.Fatalf("pop: %v", err) + } + if popped == nil || popped.ID != req.ID { + t.Fatalf("pop returned wrong request: %+v", popped) + } + + got, err := store.Get(ctx, req.ID) + if err != nil { + t.Fatalf("get after pop: %v", err) + } + if got.Status != RuntimeLocalSkillRunning { + t.Fatalf("record status = %s, want running", got.Status) + } + + // The pending queue must no longer reference the claimed id — exposed + // via PopPending rather than poking the zset directly. + again, err := store.PopPending(ctx, "runtime-atomic") + if err != nil { + t.Fatalf("second pop: %v", err) + } + if again != nil { + t.Fatalf("second pop should be empty, got %+v", again) + } +} + // Compile-time assertions: the Redis stores MUST satisfy the interfaces so // NewRouter's assignment stays type-safe. var (