mirror of
https://github.com/multica-ai/multica.git
synced 2026-07-05 13:29:44 +02:00
fix(skills): atomic Redis claim + surface store write failures (PR #1557 review)
Two real gaps GPT-Boy flagged:
1. RedisLocalSkill{List,Import}Store.PopPending was doing ZREM then SET as
two separate round-trips. If the SET failed for any reason — transient
Redis error, context cancellation, pod getting SIGKILL'd mid-call — the
request was already gone from the pending zset but the stored record
still said "pending", and no subsequent PopPending would re-dispatch
it. Exactly the "request disappears" class of bug this PR is supposed
to kill.
Fix: push the claim into a Lua script so Redis runs ZREM + SET as one
atomic unit. If ZREM returns 0 (another node won the race), SET is
skipped and the caller retries.
2. ReportLocalSkill{List,Import}Result handlers were logging Complete/Fail
store failures at Warn and still returning 200 OK. That made the
daemon think the report landed when it hadn't, leaving the request
stuck in "running" until the server-side timeout and — worse for the
import flow — leaving the just-created Skill row orphaned in Postgres
so every retry collided with the unique-name constraint.
Fix: escalate to Error + return 500 so the daemon (and monitoring) can
see the write failed. For the import flow, Complete failure after the
Skill row is already committed also triggers a best-effort DeleteSkill
so a daemon retry lands on a clean slate instead of hitting
"a skill with this name already exists" forever.
Tests
- New TestRedisLocalSkillListStore_PopPendingAtomicClaim asserts the
happy-path invariant: after one PopPending the record is "running"
AND a second PopPending returns nothing. Deliberately does NOT poke
Redis internals directly so the test survives any future key-layout
refactor.
- Existing cross-instance / concurrent / timeout / per-runtime tests
continue to pass against the Lua-based claim path (verified locally
against a scratch redis-server; 8/8 Redis tests green).
This commit is contained in:
@@ -524,11 +524,19 @@ func (h *Handler) ReportLocalSkillListResult(w http.ResponseWriter, r *http.Requ
|
||||
supported = *body.Supported
|
||||
}
|
||||
if err := h.LocalSkillListStore.Complete(r.Context(), requestID, body.Skills, supported); err != nil {
|
||||
slog.Warn("local skills Complete failed", "error", err, "request_id", requestID)
|
||||
// Surface the store failure as 5xx so the daemon can retry instead
|
||||
// of swallowing the report (leaves the request stuck in running
|
||||
// until the server-side timeout, which is exactly the "looks OK but
|
||||
// nothing happens" class of bug we're trying to avoid).
|
||||
slog.Error("local skills Complete failed", "error", err, "request_id", requestID)
|
||||
writeError(w, http.StatusInternalServerError, "failed to persist completion")
|
||||
return
|
||||
}
|
||||
} else {
|
||||
if err := h.LocalSkillListStore.Fail(r.Context(), requestID, body.Error); err != nil {
|
||||
slog.Warn("local skills Fail failed", "error", err, "request_id", requestID)
|
||||
slog.Error("local skills Fail failed", "error", err, "request_id", requestID)
|
||||
writeError(w, http.StatusInternalServerError, "failed to persist failure")
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
@@ -571,14 +579,18 @@ func (h *Handler) ReportLocalSkillImportResult(w http.ResponseWriter, r *http.Re
|
||||
|
||||
if body.Status != "completed" {
|
||||
if err := h.LocalSkillImportStore.Fail(r.Context(), requestID, body.Error); err != nil {
|
||||
slog.Warn("local skill import Fail failed", "error", err, "request_id", requestID)
|
||||
slog.Error("local skill import Fail failed", "error", err, "request_id", requestID)
|
||||
writeError(w, http.StatusInternalServerError, "failed to persist failure")
|
||||
return
|
||||
}
|
||||
writeJSON(w, http.StatusOK, map[string]string{"status": "ok"})
|
||||
return
|
||||
}
|
||||
if body.Skill == nil {
|
||||
if err := h.LocalSkillImportStore.Fail(r.Context(), requestID, "daemon returned an empty skill bundle"); err != nil {
|
||||
slog.Warn("local skill import Fail failed", "error", err, "request_id", requestID)
|
||||
slog.Error("local skill import Fail failed", "error", err, "request_id", requestID)
|
||||
writeError(w, http.StatusInternalServerError, "failed to persist failure")
|
||||
return
|
||||
}
|
||||
writeJSON(w, http.StatusOK, map[string]string{"status": "ok"})
|
||||
return
|
||||
@@ -623,14 +635,27 @@ func (h *Handler) ReportLocalSkillImportResult(w http.ResponseWriter, r *http.Re
|
||||
failMsg = "a skill with this name already exists"
|
||||
}
|
||||
if ferr := h.LocalSkillImportStore.Fail(r.Context(), requestID, failMsg); ferr != nil {
|
||||
slog.Warn("local skill import Fail failed", "error", ferr, "request_id", requestID)
|
||||
slog.Error("local skill import Fail failed", "error", ferr, "request_id", requestID)
|
||||
writeError(w, http.StatusInternalServerError, "failed to persist failure")
|
||||
return
|
||||
}
|
||||
writeJSON(w, http.StatusOK, map[string]string{"status": "ok"})
|
||||
return
|
||||
}
|
||||
|
||||
if err := h.LocalSkillImportStore.Complete(r.Context(), requestID, resp.SkillResponse); err != nil {
|
||||
slog.Warn("local skill import Complete failed", "error", err, "request_id", requestID)
|
||||
// We already wrote the Skill to Postgres. If the store-side Complete
|
||||
// fails we can't leave that Skill orphaned: the daemon will retry on
|
||||
// 5xx and re-create it, which blows up on the unique-name constraint
|
||||
// and looks to the user like "import keeps failing". Roll back our
|
||||
// side-effects so the retry lands on a clean slate.
|
||||
slog.Error("local skill import Complete failed — rolling back created skill",
|
||||
"error", err, "request_id", requestID, "skill_id", resp.ID)
|
||||
if delErr := h.Queries.DeleteSkill(r.Context(), parseUUID(resp.ID)); delErr != nil {
|
||||
slog.Warn("orphan skill rollback failed", "error", delErr, "skill_id", resp.ID)
|
||||
}
|
||||
writeError(w, http.StatusInternalServerError, "failed to persist import completion")
|
||||
return
|
||||
}
|
||||
h.publish(protocol.EventSkillCreated, uuidToString(rt.WorkspaceID), "member", req.CreatorID, map[string]any{"skill": resp})
|
||||
slog.Debug("runtime local skill imported", "runtime_id", runtimeID, "request_id", requestID, "skill_id", resp.ID)
|
||||
|
||||
@@ -20,23 +20,43 @@ import (
|
||||
//
|
||||
// PopPending is the critical multi-node primitive. It MUST atomically:
|
||||
// 1. pick the oldest pending request id for this runtime
|
||||
// 2. read and update its record to "running"
|
||||
// 2. claim it (remove from the pending zset) AND transition its record to
|
||||
// "running" in a single step — otherwise a crash / transient Redis error
|
||||
// between the two writes strands the request (no longer pending, record
|
||||
// still says pending; no node will ever re-dispatch it).
|
||||
//
|
||||
// Redis WATCH/MULTI/EXEC gives us that without a Lua script: the transaction
|
||||
// aborts if another node modifies either key between WATCH and EXEC, so we
|
||||
// retry. In practice two nodes almost never race for the same runtime (each
|
||||
// runtime only has one daemon heartbeating), and the few ZPOPMIN fallbacks
|
||||
// below catch the rest.
|
||||
// Doing this as two round-trips is racy; we use a Lua script so Redis runs
|
||||
// ZREM + SET atomically server-side. If ZREM returns 0 (another node already
|
||||
// claimed it), the SET is skipped. This is the fix for the PR-1557 review
|
||||
// finding about the "request disappears under Redis hiccups" path.
|
||||
|
||||
const (
|
||||
// Namespaced so we don't collide with the realtime relay's ws:* keys.
|
||||
localSkillListKeyPrefix = "mul:local_skill:list:"
|
||||
localSkillListPendingPrefix = "mul:local_skill:list:pending:"
|
||||
localSkillImportKeyPrefix = "mul:local_skill:import:"
|
||||
localSkillImportPendingPrefix = "mul:local_skill:import:pending:"
|
||||
localSkillRedisPopMaxRetries = 5
|
||||
localSkillListKeyPrefix = "mul:local_skill:list:"
|
||||
localSkillListPendingPrefix = "mul:local_skill:list:pending:"
|
||||
localSkillImportKeyPrefix = "mul:local_skill:import:"
|
||||
localSkillImportPendingPrefix = "mul:local_skill:import:pending:"
|
||||
localSkillRedisPopMaxRetries = 5
|
||||
)
|
||||
|
||||
// claimPendingScript atomically claims a pending request:
|
||||
// KEYS[1] = pending zset ARGV[1] = request id to claim
|
||||
// KEYS[2] = record key ARGV[2] = new record JSON (status=running)
|
||||
// ARGV[3] = record TTL in seconds
|
||||
//
|
||||
// Returns 1 when this caller won the claim (zset entry removed, record
|
||||
// updated), 0 when the entry was already gone (another node won).
|
||||
// Either the ZREM and the SET both happen or neither does — Redis executes
|
||||
// a Lua script as a single atomic unit.
|
||||
var claimPendingScript = redis.NewScript(`
|
||||
local removed = redis.call('ZREM', KEYS[1], ARGV[1])
|
||||
if removed == 0 then
|
||||
return 0
|
||||
end
|
||||
redis.call('SET', KEYS[2], ARGV[2], 'EX', tonumber(ARGV[3]))
|
||||
return 1
|
||||
`)
|
||||
|
||||
func localSkillListKey(id string) string { return localSkillListKeyPrefix + id }
|
||||
func localSkillListPendingKey(runtimeID string) string {
|
||||
return localSkillListPendingPrefix + runtimeID
|
||||
@@ -160,25 +180,28 @@ func (s *RedisLocalSkillListStore) PopPending(ctx context.Context, runtimeID str
|
||||
continue
|
||||
}
|
||||
|
||||
// Race guard: remove the id from the pending zset atomically-ish. Only
|
||||
// the node whose ZRem returns 1 "wins" the claim; losers retry. Note
|
||||
// we deliberately ZRem BEFORE overwriting the record so two winners
|
||||
// never see Status=Pending in the persisted JSON at the same time.
|
||||
removed, err := s.rdb.ZRem(ctx, pendingKey, id).Result()
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("zrem pending: %w", err)
|
||||
}
|
||||
if removed == 0 {
|
||||
// Someone else popped it first.
|
||||
continue
|
||||
}
|
||||
|
||||
now := time.Now()
|
||||
req.Status = RuntimeLocalSkillRunning
|
||||
req.RunStartedAt = &now
|
||||
req.UpdatedAt = now
|
||||
if err := s.persistListRequest(ctx, req); err != nil {
|
||||
return nil, err
|
||||
data, err := json.Marshal(req)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("marshal list request: %w", err)
|
||||
}
|
||||
|
||||
result, err := claimPendingScript.Run(
|
||||
ctx, s.rdb,
|
||||
[]string{pendingKey, localSkillListKey(id)},
|
||||
id, data, int(runtimeLocalSkillStoreRetention.Seconds()),
|
||||
).Int64()
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("claim pending: %w", err)
|
||||
}
|
||||
if result == 0 {
|
||||
// Another node won the race. The record still says pending and is
|
||||
// owned by the winner; we just retry to pick up whatever else is
|
||||
// queued (or nothing).
|
||||
continue
|
||||
}
|
||||
return req, nil
|
||||
}
|
||||
@@ -355,21 +378,26 @@ func (s *RedisLocalSkillImportStore) PopPending(ctx context.Context, runtimeID s
|
||||
continue
|
||||
}
|
||||
|
||||
removed, err := s.rdb.ZRem(ctx, pendingKey, id).Result()
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("zrem pending: %w", err)
|
||||
}
|
||||
if removed == 0 {
|
||||
continue
|
||||
}
|
||||
|
||||
now := time.Now()
|
||||
req.Status = RuntimeLocalSkillRunning
|
||||
req.RunStartedAt = &now
|
||||
req.UpdatedAt = now
|
||||
if err := s.persistImportRequest(ctx, req); err != nil {
|
||||
data, err := s.marshalImport(req)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
result, err := claimPendingScript.Run(
|
||||
ctx, s.rdb,
|
||||
[]string{pendingKey, localSkillImportKey(id)},
|
||||
id, data, int(runtimeLocalSkillStoreRetention.Seconds()),
|
||||
).Int64()
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("claim pending: %w", err)
|
||||
}
|
||||
if result == 0 {
|
||||
continue
|
||||
}
|
||||
return req, nil
|
||||
}
|
||||
return nil, nil
|
||||
|
||||
@@ -310,6 +310,52 @@ func TestRedisLocalSkillListStore_PerRuntimeIsolation(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
// TestRedisLocalSkillListStore_PopPendingAtomicClaim pins the PR-1557 review
|
||||
// fix: the claim (ZREM pending + persist running record) MUST land as one
|
||||
// atomic unit. If the old two-step ordering came back ("ZRem first, SET
|
||||
// second") a transient error between the two would strand the request — not
|
||||
// in pending, still serialised as "pending" on disk, never re-dispatched.
|
||||
//
|
||||
// We verify the happy-path invariant end-to-end: after one PopPending the
|
||||
// record is in "running" state AND a second PopPending on the same runtime
|
||||
// returns nothing (i.e. the pending zset no longer references the id).
|
||||
func TestRedisLocalSkillListStore_PopPendingAtomicClaim(t *testing.T) {
|
||||
rdb := newRedisTestClient(t)
|
||||
ctx := context.Background()
|
||||
store := NewRedisLocalSkillListStore(rdb)
|
||||
|
||||
req, err := store.Create(ctx, "runtime-atomic")
|
||||
if err != nil {
|
||||
t.Fatalf("create: %v", err)
|
||||
}
|
||||
|
||||
popped, err := store.PopPending(ctx, "runtime-atomic")
|
||||
if err != nil {
|
||||
t.Fatalf("pop: %v", err)
|
||||
}
|
||||
if popped == nil || popped.ID != req.ID {
|
||||
t.Fatalf("pop returned wrong request: %+v", popped)
|
||||
}
|
||||
|
||||
got, err := store.Get(ctx, req.ID)
|
||||
if err != nil {
|
||||
t.Fatalf("get after pop: %v", err)
|
||||
}
|
||||
if got.Status != RuntimeLocalSkillRunning {
|
||||
t.Fatalf("record status = %s, want running", got.Status)
|
||||
}
|
||||
|
||||
// The pending queue must no longer reference the claimed id — exposed
|
||||
// via PopPending rather than poking the zset directly.
|
||||
again, err := store.PopPending(ctx, "runtime-atomic")
|
||||
if err != nil {
|
||||
t.Fatalf("second pop: %v", err)
|
||||
}
|
||||
if again != nil {
|
||||
t.Fatalf("second pop should be empty, got %+v", again)
|
||||
}
|
||||
}
|
||||
|
||||
// Compile-time assertions: the Redis stores MUST satisfy the interfaces so
|
||||
// NewRouter's assignment stays type-safe.
|
||||
var (
|
||||
|
||||
Reference in New Issue
Block a user