From 3466bbc1961749d2d04a4bdbbf73e4ea8aab2a93 Mon Sep 17 00:00:00 2001 From: Jiang Bohan Date: Thu, 23 Apr 2026 16:57:55 +0800 Subject: [PATCH] fix(skills): atomic Redis claim + surface store write failures (PR #1557 review) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Two real gaps GPT-Boy flagged: 1. RedisLocalSkill{List,Import}Store.PopPending was doing ZREM then SET as two separate round-trips. If the SET failed for any reason — transient Redis error, context cancellation, pod getting SIGKILL'd mid-call — the request was already gone from the pending zset but the stored record still said "pending", and no subsequent PopPending would re-dispatch it. Exactly the "request disappears" class of bug this PR is supposed to kill. Fix: push the claim into a Lua script so Redis runs ZREM + SET as one atomic unit. If ZREM returns 0 (another node won the race), SET is skipped and the caller retries. 2. ReportLocalSkill{List,Import}Result handlers were logging Complete/Fail store failures at Warn and still returning 200 OK. That made the daemon think the report landed when it hadn't, leaving the request stuck in "running" until the server-side timeout and — worse for the import flow — leaving the just-created Skill row orphaned in Postgres so every retry collided with the unique-name constraint. Fix: escalate to Error + return 500 so the daemon (and monitoring) can see the write failed. For the import flow, Complete failure after the Skill row is already committed also triggers a best-effort DeleteSkill so a daemon retry lands on a clean slate instead of hitting "a skill with this name already exists" forever. Tests - New TestRedisLocalSkillListStore_PopPendingAtomicClaim asserts the happy-path invariant: after one PopPending the record is "running" AND a second PopPending returns nothing. Deliberately does NOT poke Redis internals directly so the test survives any future key-layout refactor. - Existing cross-instance / concurrent / timeout / per-runtime tests continue to pass against the Lua-based claim path (verified locally against a scratch redis-server; 8/8 Redis tests green). --- .../internal/handler/runtime_local_skills.go | 37 +++++-- .../runtime_local_skills_redis_store.go | 98 ++++++++++++------- .../runtime_local_skills_redis_store_test.go | 46 +++++++++ 3 files changed, 140 insertions(+), 41 deletions(-) diff --git a/server/internal/handler/runtime_local_skills.go b/server/internal/handler/runtime_local_skills.go index 86e4577a8..1928f2b2d 100644 --- a/server/internal/handler/runtime_local_skills.go +++ b/server/internal/handler/runtime_local_skills.go @@ -524,11 +524,19 @@ func (h *Handler) ReportLocalSkillListResult(w http.ResponseWriter, r *http.Requ supported = *body.Supported } if err := h.LocalSkillListStore.Complete(r.Context(), requestID, body.Skills, supported); err != nil { - slog.Warn("local skills Complete failed", "error", err, "request_id", requestID) + // Surface the store failure as 5xx so the daemon can retry instead + // of swallowing the report (leaves the request stuck in running + // until the server-side timeout, which is exactly the "looks OK but + // nothing happens" class of bug we're trying to avoid). + slog.Error("local skills Complete failed", "error", err, "request_id", requestID) + writeError(w, http.StatusInternalServerError, "failed to persist completion") + return } } else { if err := h.LocalSkillListStore.Fail(r.Context(), requestID, body.Error); err != nil { - slog.Warn("local skills Fail failed", "error", err, "request_id", requestID) + slog.Error("local skills Fail failed", "error", err, "request_id", requestID) + writeError(w, http.StatusInternalServerError, "failed to persist failure") + return } } @@ -571,14 +579,18 @@ func (h *Handler) ReportLocalSkillImportResult(w http.ResponseWriter, r *http.Re if body.Status != "completed" { if err := h.LocalSkillImportStore.Fail(r.Context(), requestID, body.Error); err != nil { - slog.Warn("local skill import Fail failed", "error", err, "request_id", requestID) + slog.Error("local skill import Fail failed", "error", err, "request_id", requestID) + writeError(w, http.StatusInternalServerError, "failed to persist failure") + return } writeJSON(w, http.StatusOK, map[string]string{"status": "ok"}) return } if body.Skill == nil { if err := h.LocalSkillImportStore.Fail(r.Context(), requestID, "daemon returned an empty skill bundle"); err != nil { - slog.Warn("local skill import Fail failed", "error", err, "request_id", requestID) + slog.Error("local skill import Fail failed", "error", err, "request_id", requestID) + writeError(w, http.StatusInternalServerError, "failed to persist failure") + return } writeJSON(w, http.StatusOK, map[string]string{"status": "ok"}) return @@ -623,14 +635,27 @@ func (h *Handler) ReportLocalSkillImportResult(w http.ResponseWriter, r *http.Re failMsg = "a skill with this name already exists" } if ferr := h.LocalSkillImportStore.Fail(r.Context(), requestID, failMsg); ferr != nil { - slog.Warn("local skill import Fail failed", "error", ferr, "request_id", requestID) + slog.Error("local skill import Fail failed", "error", ferr, "request_id", requestID) + writeError(w, http.StatusInternalServerError, "failed to persist failure") + return } writeJSON(w, http.StatusOK, map[string]string{"status": "ok"}) return } if err := h.LocalSkillImportStore.Complete(r.Context(), requestID, resp.SkillResponse); err != nil { - slog.Warn("local skill import Complete failed", "error", err, "request_id", requestID) + // We already wrote the Skill to Postgres. If the store-side Complete + // fails we can't leave that Skill orphaned: the daemon will retry on + // 5xx and re-create it, which blows up on the unique-name constraint + // and looks to the user like "import keeps failing". Roll back our + // side-effects so the retry lands on a clean slate. + slog.Error("local skill import Complete failed — rolling back created skill", + "error", err, "request_id", requestID, "skill_id", resp.ID) + if delErr := h.Queries.DeleteSkill(r.Context(), parseUUID(resp.ID)); delErr != nil { + slog.Warn("orphan skill rollback failed", "error", delErr, "skill_id", resp.ID) + } + writeError(w, http.StatusInternalServerError, "failed to persist import completion") + return } h.publish(protocol.EventSkillCreated, uuidToString(rt.WorkspaceID), "member", req.CreatorID, map[string]any{"skill": resp}) slog.Debug("runtime local skill imported", "runtime_id", runtimeID, "request_id", requestID, "skill_id", resp.ID) diff --git a/server/internal/handler/runtime_local_skills_redis_store.go b/server/internal/handler/runtime_local_skills_redis_store.go index 05b80dab4..c9f2fb497 100644 --- a/server/internal/handler/runtime_local_skills_redis_store.go +++ b/server/internal/handler/runtime_local_skills_redis_store.go @@ -20,23 +20,43 @@ import ( // // PopPending is the critical multi-node primitive. It MUST atomically: // 1. pick the oldest pending request id for this runtime -// 2. read and update its record to "running" +// 2. claim it (remove from the pending zset) AND transition its record to +// "running" in a single step — otherwise a crash / transient Redis error +// between the two writes strands the request (no longer pending, record +// still says pending; no node will ever re-dispatch it). // -// Redis WATCH/MULTI/EXEC gives us that without a Lua script: the transaction -// aborts if another node modifies either key between WATCH and EXEC, so we -// retry. In practice two nodes almost never race for the same runtime (each -// runtime only has one daemon heartbeating), and the few ZPOPMIN fallbacks -// below catch the rest. +// Doing this as two round-trips is racy; we use a Lua script so Redis runs +// ZREM + SET atomically server-side. If ZREM returns 0 (another node already +// claimed it), the SET is skipped. This is the fix for the PR-1557 review +// finding about the "request disappears under Redis hiccups" path. const ( // Namespaced so we don't collide with the realtime relay's ws:* keys. - localSkillListKeyPrefix = "mul:local_skill:list:" - localSkillListPendingPrefix = "mul:local_skill:list:pending:" - localSkillImportKeyPrefix = "mul:local_skill:import:" - localSkillImportPendingPrefix = "mul:local_skill:import:pending:" - localSkillRedisPopMaxRetries = 5 + localSkillListKeyPrefix = "mul:local_skill:list:" + localSkillListPendingPrefix = "mul:local_skill:list:pending:" + localSkillImportKeyPrefix = "mul:local_skill:import:" + localSkillImportPendingPrefix = "mul:local_skill:import:pending:" + localSkillRedisPopMaxRetries = 5 ) +// claimPendingScript atomically claims a pending request: +// KEYS[1] = pending zset ARGV[1] = request id to claim +// KEYS[2] = record key ARGV[2] = new record JSON (status=running) +// ARGV[3] = record TTL in seconds +// +// Returns 1 when this caller won the claim (zset entry removed, record +// updated), 0 when the entry was already gone (another node won). +// Either the ZREM and the SET both happen or neither does — Redis executes +// a Lua script as a single atomic unit. +var claimPendingScript = redis.NewScript(` +local removed = redis.call('ZREM', KEYS[1], ARGV[1]) +if removed == 0 then + return 0 +end +redis.call('SET', KEYS[2], ARGV[2], 'EX', tonumber(ARGV[3])) +return 1 +`) + func localSkillListKey(id string) string { return localSkillListKeyPrefix + id } func localSkillListPendingKey(runtimeID string) string { return localSkillListPendingPrefix + runtimeID @@ -160,25 +180,28 @@ func (s *RedisLocalSkillListStore) PopPending(ctx context.Context, runtimeID str continue } - // Race guard: remove the id from the pending zset atomically-ish. Only - // the node whose ZRem returns 1 "wins" the claim; losers retry. Note - // we deliberately ZRem BEFORE overwriting the record so two winners - // never see Status=Pending in the persisted JSON at the same time. - removed, err := s.rdb.ZRem(ctx, pendingKey, id).Result() - if err != nil { - return nil, fmt.Errorf("zrem pending: %w", err) - } - if removed == 0 { - // Someone else popped it first. - continue - } - now := time.Now() req.Status = RuntimeLocalSkillRunning req.RunStartedAt = &now req.UpdatedAt = now - if err := s.persistListRequest(ctx, req); err != nil { - return nil, err + data, err := json.Marshal(req) + if err != nil { + return nil, fmt.Errorf("marshal list request: %w", err) + } + + result, err := claimPendingScript.Run( + ctx, s.rdb, + []string{pendingKey, localSkillListKey(id)}, + id, data, int(runtimeLocalSkillStoreRetention.Seconds()), + ).Int64() + if err != nil { + return nil, fmt.Errorf("claim pending: %w", err) + } + if result == 0 { + // Another node won the race. The record still says pending and is + // owned by the winner; we just retry to pick up whatever else is + // queued (or nothing). + continue } return req, nil } @@ -355,21 +378,26 @@ func (s *RedisLocalSkillImportStore) PopPending(ctx context.Context, runtimeID s continue } - removed, err := s.rdb.ZRem(ctx, pendingKey, id).Result() - if err != nil { - return nil, fmt.Errorf("zrem pending: %w", err) - } - if removed == 0 { - continue - } - now := time.Now() req.Status = RuntimeLocalSkillRunning req.RunStartedAt = &now req.UpdatedAt = now - if err := s.persistImportRequest(ctx, req); err != nil { + data, err := s.marshalImport(req) + if err != nil { return nil, err } + + result, err := claimPendingScript.Run( + ctx, s.rdb, + []string{pendingKey, localSkillImportKey(id)}, + id, data, int(runtimeLocalSkillStoreRetention.Seconds()), + ).Int64() + if err != nil { + return nil, fmt.Errorf("claim pending: %w", err) + } + if result == 0 { + continue + } return req, nil } return nil, nil diff --git a/server/internal/handler/runtime_local_skills_redis_store_test.go b/server/internal/handler/runtime_local_skills_redis_store_test.go index 91b94ed8f..a7169ae1e 100644 --- a/server/internal/handler/runtime_local_skills_redis_store_test.go +++ b/server/internal/handler/runtime_local_skills_redis_store_test.go @@ -310,6 +310,52 @@ func TestRedisLocalSkillListStore_PerRuntimeIsolation(t *testing.T) { } } +// TestRedisLocalSkillListStore_PopPendingAtomicClaim pins the PR-1557 review +// fix: the claim (ZREM pending + persist running record) MUST land as one +// atomic unit. If the old two-step ordering came back ("ZRem first, SET +// second") a transient error between the two would strand the request — not +// in pending, still serialised as "pending" on disk, never re-dispatched. +// +// We verify the happy-path invariant end-to-end: after one PopPending the +// record is in "running" state AND a second PopPending on the same runtime +// returns nothing (i.e. the pending zset no longer references the id). +func TestRedisLocalSkillListStore_PopPendingAtomicClaim(t *testing.T) { + rdb := newRedisTestClient(t) + ctx := context.Background() + store := NewRedisLocalSkillListStore(rdb) + + req, err := store.Create(ctx, "runtime-atomic") + if err != nil { + t.Fatalf("create: %v", err) + } + + popped, err := store.PopPending(ctx, "runtime-atomic") + if err != nil { + t.Fatalf("pop: %v", err) + } + if popped == nil || popped.ID != req.ID { + t.Fatalf("pop returned wrong request: %+v", popped) + } + + got, err := store.Get(ctx, req.ID) + if err != nil { + t.Fatalf("get after pop: %v", err) + } + if got.Status != RuntimeLocalSkillRunning { + t.Fatalf("record status = %s, want running", got.Status) + } + + // The pending queue must no longer reference the claimed id — exposed + // via PopPending rather than poking the zset directly. + again, err := store.PopPending(ctx, "runtime-atomic") + if err != nil { + t.Fatalf("second pop: %v", err) + } + if again != nil { + t.Fatalf("second pop should be empty, got %+v", again) + } +} + // Compile-time assertions: the Redis stores MUST satisfy the interfaces so // NewRouter's assignment stays type-safe. var (