mirror of
https://github.com/multica-ai/multica.git
synced 2026-06-27 09:30:00 +02:00
Compare commits
2 Commits
codex/agen
...
agent/j/80
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
3466bbc196 | ||
|
|
93d62cc79e |
14
.github/workflows/ci.yml
vendored
14
.github/workflows/ci.yml
vendored
@@ -48,8 +48,22 @@ jobs:
|
||||
--health-interval 5s
|
||||
--health-timeout 5s
|
||||
--health-retries 20
|
||||
redis:
|
||||
image: redis:7-alpine
|
||||
ports:
|
||||
- 6379:6379
|
||||
options: >-
|
||||
--health-cmd "redis-cli ping"
|
||||
--health-interval 5s
|
||||
--health-timeout 5s
|
||||
--health-retries 10
|
||||
env:
|
||||
DATABASE_URL: postgres://multica:multica@localhost:5432/multica?sslmode=disable
|
||||
# Wires up the RedisLocalSkill*_test.go suite. Distinct from REDIS_URL
|
||||
# (which would flip the server binary itself onto the Redis-backed
|
||||
# realtime relay + request stores); the tests talk to this Redis
|
||||
# directly so they run alongside the Postgres-backed suite.
|
||||
REDIS_TEST_URL: redis://localhost:6379/1
|
||||
steps:
|
||||
- name: Checkout
|
||||
uses: actions/checkout@v6
|
||||
|
||||
@@ -71,7 +71,7 @@ func TestMain(m *testing.M) {
|
||||
|
||||
bus := events.New()
|
||||
registerListeners(bus, hub)
|
||||
router := NewRouter(pool, hub, bus, analytics.NoopClient{})
|
||||
router := NewRouter(pool, hub, bus, analytics.NoopClient{}, nil)
|
||||
testServer = httptest.NewServer(router)
|
||||
|
||||
// Generate a JWT token directly for the test user
|
||||
|
||||
@@ -62,15 +62,18 @@ func main() {
|
||||
// MUL-1138: when REDIS_URL is set, route fanout through a Redis relay so
|
||||
// multiple API nodes can deliver each other's events. Without it the hub
|
||||
// is the sole broadcaster and the server stays single-node (legacy).
|
||||
// The same client is also used for cross-node request stores (e.g. runtime
|
||||
// local-skill pending requests) so every node sees the same pending set.
|
||||
relayCtx, relayCancel := context.WithCancel(context.Background())
|
||||
defer relayCancel()
|
||||
var broadcaster realtime.Broadcaster = hub
|
||||
var rdb *redis.Client
|
||||
if redisURL := os.Getenv("REDIS_URL"); redisURL != "" {
|
||||
opts, err := redis.ParseURL(redisURL)
|
||||
if err != nil {
|
||||
slog.Error("invalid REDIS_URL — falling back to in-memory hub", "error", err)
|
||||
} else {
|
||||
rdb := redis.NewClient(opts)
|
||||
rdb = redis.NewClient(opts)
|
||||
relay := realtime.NewRedisRelay(hub, rdb)
|
||||
relay.Start(relayCtx)
|
||||
broadcaster = relay
|
||||
@@ -93,7 +96,7 @@ func main() {
|
||||
registerActivityListeners(bus, queries)
|
||||
registerNotificationListeners(bus, queries)
|
||||
|
||||
r := NewRouter(pool, hub, bus, analyticsClient)
|
||||
r := NewRouter(pool, hub, bus, analyticsClient, rdb)
|
||||
|
||||
srv := &http.Server{
|
||||
Addr: ":" + port,
|
||||
|
||||
@@ -12,6 +12,7 @@ import (
|
||||
"github.com/go-chi/cors"
|
||||
"github.com/jackc/pgx/v5/pgtype"
|
||||
"github.com/jackc/pgx/v5/pgxpool"
|
||||
"github.com/redis/go-redis/v9"
|
||||
|
||||
"github.com/multica-ai/multica/server/internal/analytics"
|
||||
"github.com/multica-ai/multica/server/internal/auth"
|
||||
@@ -55,7 +56,11 @@ func allowedOrigins() []string {
|
||||
}
|
||||
|
||||
// NewRouter creates the fully-configured Chi router with all middleware and routes.
|
||||
func NewRouter(pool *pgxpool.Pool, hub *realtime.Hub, bus *events.Bus, analyticsClient analytics.Client) chi.Router {
|
||||
// rdb is optional: when non-nil the runtime local-skill request stores are
|
||||
// swapped for Redis-backed implementations so multiple API nodes share the
|
||||
// same pending queue (required for multi-node prod). A nil rdb keeps the
|
||||
// default in-memory stores which are fine for single-node dev and tests.
|
||||
func NewRouter(pool *pgxpool.Pool, hub *realtime.Hub, bus *events.Bus, analyticsClient analytics.Client, rdb *redis.Client) chi.Router {
|
||||
queries := db.New(pool)
|
||||
emailSvc := service.NewEmailService()
|
||||
|
||||
@@ -79,6 +84,10 @@ func NewRouter(pool *pgxpool.Pool, hub *realtime.Hub, bus *events.Bus, analytics
|
||||
AllowedEmailDomains: splitAndTrim(os.Getenv("ALLOWED_EMAIL_DOMAINS")),
|
||||
}
|
||||
h := handler.New(queries, pool, hub, bus, emailSvc, store, cfSigner, analyticsClient, signupConfig)
|
||||
if rdb != nil {
|
||||
h.LocalSkillListStore = handler.NewRedisLocalSkillListStore(rdb)
|
||||
h.LocalSkillImportStore = handler.NewRedisLocalSkillImportStore(rdb)
|
||||
}
|
||||
|
||||
r := chi.NewRouter()
|
||||
|
||||
|
||||
@@ -529,12 +529,16 @@ func (h *Handler) DaemonHeartbeat(w http.ResponseWriter, r *http.Request) {
|
||||
}
|
||||
|
||||
// Check for pending local-skill list requests for this runtime.
|
||||
if pending := h.LocalSkillListStore.PopPending(req.RuntimeID); pending != nil {
|
||||
if pending, err := h.LocalSkillListStore.PopPending(r.Context(), req.RuntimeID); err != nil {
|
||||
slog.Warn("local skill list PopPending failed", "error", err, "runtime_id", req.RuntimeID)
|
||||
} else if pending != nil {
|
||||
resp["pending_local_skills"] = map[string]string{"id": pending.ID}
|
||||
}
|
||||
|
||||
// Check for pending local-skill import requests for this runtime.
|
||||
if pending := h.LocalSkillImportStore.PopPending(req.RuntimeID); pending != nil {
|
||||
if pending, err := h.LocalSkillImportStore.PopPending(r.Context(), req.RuntimeID); err != nil {
|
||||
slog.Warn("local skill import PopPending failed", "error", err, "runtime_id", req.RuntimeID)
|
||||
} else if pending != nil {
|
||||
payload := map[string]string{
|
||||
"id": pending.ID,
|
||||
"skill_key": pending.SkillKey,
|
||||
|
||||
@@ -50,8 +50,8 @@ type Handler struct {
|
||||
PingStore *PingStore
|
||||
UpdateStore *UpdateStore
|
||||
ModelListStore *ModelListStore
|
||||
LocalSkillListStore *RuntimeLocalSkillListStore
|
||||
LocalSkillImportStore *RuntimeLocalSkillImportStore
|
||||
LocalSkillListStore LocalSkillListStore
|
||||
LocalSkillImportStore LocalSkillImportStore
|
||||
Storage storage.Storage
|
||||
CFSigner *auth.CloudFrontSigner
|
||||
Analytics analytics.Client
|
||||
@@ -81,8 +81,8 @@ func New(queries *db.Queries, txStarter txStarter, hub *realtime.Hub, bus *event
|
||||
PingStore: NewPingStore(),
|
||||
UpdateStore: NewUpdateStore(),
|
||||
ModelListStore: NewModelListStore(),
|
||||
LocalSkillListStore: NewRuntimeLocalSkillListStore(),
|
||||
LocalSkillImportStore: NewRuntimeLocalSkillImportStore(),
|
||||
LocalSkillListStore: NewInMemoryLocalSkillListStore(),
|
||||
LocalSkillImportStore: NewInMemoryLocalSkillImportStore(),
|
||||
Storage: store,
|
||||
CFSigner: cfSigner,
|
||||
Analytics: analyticsClient,
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
package handler
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"log/slog"
|
||||
"net/http"
|
||||
@@ -28,6 +29,73 @@ const (
|
||||
runtimeLocalSkillStoreRetention = 2 * time.Minute
|
||||
)
|
||||
|
||||
// LocalSkillListStore tracks pending / running / completed runtime-local-skill
|
||||
// inventory requests. The server MUST stay stateless — any state that needs to
|
||||
// outlive a single request has to live in shared storage so multi-node deploys
|
||||
// can have POST, heartbeat and poll land on different nodes and still agree
|
||||
// on the request's state.
|
||||
type LocalSkillListStore interface {
|
||||
Create(ctx context.Context, runtimeID string) (*RuntimeLocalSkillListRequest, error)
|
||||
Get(ctx context.Context, id string) (*RuntimeLocalSkillListRequest, error)
|
||||
PopPending(ctx context.Context, runtimeID string) (*RuntimeLocalSkillListRequest, error)
|
||||
Complete(ctx context.Context, id string, skills []RuntimeLocalSkillSummary, supported bool) error
|
||||
Fail(ctx context.Context, id string, errMsg string) error
|
||||
}
|
||||
|
||||
// LocalSkillImportStore is the same contract as LocalSkillListStore but for
|
||||
// runtime-local-skill import requests. Kept as a separate interface because the
|
||||
// Create signature carries import-specific fields (skill_key, optional rename).
|
||||
type LocalSkillImportStore interface {
|
||||
Create(ctx context.Context, runtimeID, creatorID, skillKey string, name, description *string) (*RuntimeLocalSkillImportRequest, error)
|
||||
Get(ctx context.Context, id string) (*RuntimeLocalSkillImportRequest, error)
|
||||
PopPending(ctx context.Context, runtimeID string) (*RuntimeLocalSkillImportRequest, error)
|
||||
Complete(ctx context.Context, id string, skill SkillResponse) error
|
||||
Fail(ctx context.Context, id string, errMsg string) error
|
||||
}
|
||||
|
||||
// applyLocalSkillListTimeout transitions a request into the timeout terminal
|
||||
// state if it has been pending / running past the configured thresholds.
|
||||
// Returns true when the record was modified so callers can persist the change.
|
||||
func applyLocalSkillListTimeout(req *RuntimeLocalSkillListRequest, now time.Time) bool {
|
||||
switch req.Status {
|
||||
case RuntimeLocalSkillPending:
|
||||
if now.Sub(req.CreatedAt) > runtimeLocalSkillPendingTimeout {
|
||||
req.Status = RuntimeLocalSkillTimeout
|
||||
req.Error = "daemon did not respond within 30 seconds"
|
||||
req.UpdatedAt = now
|
||||
return true
|
||||
}
|
||||
case RuntimeLocalSkillRunning:
|
||||
if req.RunStartedAt != nil && now.Sub(*req.RunStartedAt) > runtimeLocalSkillRunningTimeout {
|
||||
req.Status = RuntimeLocalSkillTimeout
|
||||
req.Error = "daemon did not finish within 60 seconds"
|
||||
req.UpdatedAt = now
|
||||
return true
|
||||
}
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
func applyLocalSkillImportTimeout(req *RuntimeLocalSkillImportRequest, now time.Time) bool {
|
||||
switch req.Status {
|
||||
case RuntimeLocalSkillPending:
|
||||
if now.Sub(req.CreatedAt) > runtimeLocalSkillPendingTimeout {
|
||||
req.Status = RuntimeLocalSkillTimeout
|
||||
req.Error = "daemon did not respond within 30 seconds"
|
||||
req.UpdatedAt = now
|
||||
return true
|
||||
}
|
||||
case RuntimeLocalSkillRunning:
|
||||
if req.RunStartedAt != nil && now.Sub(*req.RunStartedAt) > runtimeLocalSkillRunningTimeout {
|
||||
req.Status = RuntimeLocalSkillTimeout
|
||||
req.Error = "daemon did not finish within 60 seconds"
|
||||
req.UpdatedAt = now
|
||||
return true
|
||||
}
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
type RuntimeLocalSkillSummary struct {
|
||||
Key string `json:"key"`
|
||||
Name string `json:"name"`
|
||||
@@ -64,16 +132,20 @@ type RuntimeLocalSkillImportRequest struct {
|
||||
RunStartedAt *time.Time `json:"-"`
|
||||
}
|
||||
|
||||
type RuntimeLocalSkillListStore struct {
|
||||
// InMemoryLocalSkillListStore is the single-node implementation — good enough
|
||||
// for local dev and the in-process test suite. Production (multi-node) must
|
||||
// use RedisLocalSkillListStore so every API node agrees on the same pending
|
||||
// set.
|
||||
type InMemoryLocalSkillListStore struct {
|
||||
mu sync.Mutex
|
||||
requests map[string]*RuntimeLocalSkillListRequest
|
||||
}
|
||||
|
||||
func NewRuntimeLocalSkillListStore() *RuntimeLocalSkillListStore {
|
||||
return &RuntimeLocalSkillListStore{requests: make(map[string]*RuntimeLocalSkillListRequest)}
|
||||
func NewInMemoryLocalSkillListStore() *InMemoryLocalSkillListStore {
|
||||
return &InMemoryLocalSkillListStore{requests: make(map[string]*RuntimeLocalSkillListRequest)}
|
||||
}
|
||||
|
||||
func (s *RuntimeLocalSkillListStore) Create(runtimeID string) *RuntimeLocalSkillListRequest {
|
||||
func (s *InMemoryLocalSkillListStore) Create(_ context.Context, runtimeID string) (*RuntimeLocalSkillListRequest, error) {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
|
||||
@@ -92,29 +164,29 @@ func (s *RuntimeLocalSkillListStore) Create(runtimeID string) *RuntimeLocalSkill
|
||||
UpdatedAt: time.Now(),
|
||||
}
|
||||
s.requests[req.ID] = req
|
||||
return req
|
||||
return req, nil
|
||||
}
|
||||
|
||||
func (s *RuntimeLocalSkillListStore) Get(id string) *RuntimeLocalSkillListRequest {
|
||||
func (s *InMemoryLocalSkillListStore) Get(_ context.Context, id string) (*RuntimeLocalSkillListRequest, error) {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
|
||||
req, ok := s.requests[id]
|
||||
if !ok {
|
||||
return nil
|
||||
return nil, nil
|
||||
}
|
||||
s.applyTimeout(req, time.Now())
|
||||
return req
|
||||
applyLocalSkillListTimeout(req, time.Now())
|
||||
return req, nil
|
||||
}
|
||||
|
||||
func (s *RuntimeLocalSkillListStore) PopPending(runtimeID string) *RuntimeLocalSkillListRequest {
|
||||
func (s *InMemoryLocalSkillListStore) PopPending(_ context.Context, runtimeID string) (*RuntimeLocalSkillListRequest, error) {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
|
||||
var oldest *RuntimeLocalSkillListRequest
|
||||
now := time.Now()
|
||||
for _, req := range s.requests {
|
||||
s.applyTimeout(req, now)
|
||||
applyLocalSkillListTimeout(req, now)
|
||||
if req.RuntimeID == runtimeID && req.Status == RuntimeLocalSkillPending {
|
||||
if oldest == nil || req.CreatedAt.Before(oldest.CreatedAt) {
|
||||
oldest = req
|
||||
@@ -127,10 +199,10 @@ func (s *RuntimeLocalSkillListStore) PopPending(runtimeID string) *RuntimeLocalS
|
||||
oldest.RunStartedAt = &startedAt
|
||||
oldest.UpdatedAt = now
|
||||
}
|
||||
return oldest
|
||||
return oldest, nil
|
||||
}
|
||||
|
||||
func (s *RuntimeLocalSkillListStore) Complete(id string, skills []RuntimeLocalSkillSummary, supported bool) {
|
||||
func (s *InMemoryLocalSkillListStore) Complete(_ context.Context, id string, skills []RuntimeLocalSkillSummary, supported bool) error {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
|
||||
@@ -140,9 +212,10 @@ func (s *RuntimeLocalSkillListStore) Complete(id string, skills []RuntimeLocalSk
|
||||
req.Supported = supported
|
||||
req.UpdatedAt = time.Now()
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *RuntimeLocalSkillListStore) Fail(id string, errMsg string) {
|
||||
func (s *InMemoryLocalSkillListStore) Fail(_ context.Context, id string, errMsg string) error {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
|
||||
@@ -151,35 +224,21 @@ func (s *RuntimeLocalSkillListStore) Fail(id string, errMsg string) {
|
||||
req.Error = errMsg
|
||||
req.UpdatedAt = time.Now()
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *RuntimeLocalSkillListStore) applyTimeout(req *RuntimeLocalSkillListRequest, now time.Time) {
|
||||
switch req.Status {
|
||||
case RuntimeLocalSkillPending:
|
||||
if now.Sub(req.CreatedAt) > runtimeLocalSkillPendingTimeout {
|
||||
req.Status = RuntimeLocalSkillTimeout
|
||||
req.Error = "daemon did not respond within 30 seconds"
|
||||
req.UpdatedAt = now
|
||||
}
|
||||
case RuntimeLocalSkillRunning:
|
||||
if req.RunStartedAt != nil && now.Sub(*req.RunStartedAt) > runtimeLocalSkillRunningTimeout {
|
||||
req.Status = RuntimeLocalSkillTimeout
|
||||
req.Error = "daemon did not finish within 60 seconds"
|
||||
req.UpdatedAt = now
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
type RuntimeLocalSkillImportStore struct {
|
||||
// InMemoryLocalSkillImportStore mirrors InMemoryLocalSkillListStore for import
|
||||
// requests. Same single-node vs. multi-node caveat.
|
||||
type InMemoryLocalSkillImportStore struct {
|
||||
mu sync.Mutex
|
||||
requests map[string]*RuntimeLocalSkillImportRequest
|
||||
}
|
||||
|
||||
func NewRuntimeLocalSkillImportStore() *RuntimeLocalSkillImportStore {
|
||||
return &RuntimeLocalSkillImportStore{requests: make(map[string]*RuntimeLocalSkillImportRequest)}
|
||||
func NewInMemoryLocalSkillImportStore() *InMemoryLocalSkillImportStore {
|
||||
return &InMemoryLocalSkillImportStore{requests: make(map[string]*RuntimeLocalSkillImportRequest)}
|
||||
}
|
||||
|
||||
func (s *RuntimeLocalSkillImportStore) Create(runtimeID, creatorID, skillKey string, name, description *string) *RuntimeLocalSkillImportRequest {
|
||||
func (s *InMemoryLocalSkillImportStore) Create(_ context.Context, runtimeID, creatorID, skillKey string, name, description *string) (*RuntimeLocalSkillImportRequest, error) {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
|
||||
@@ -201,29 +260,29 @@ func (s *RuntimeLocalSkillImportStore) Create(runtimeID, creatorID, skillKey str
|
||||
CreatorID: creatorID,
|
||||
}
|
||||
s.requests[req.ID] = req
|
||||
return req
|
||||
return req, nil
|
||||
}
|
||||
|
||||
func (s *RuntimeLocalSkillImportStore) Get(id string) *RuntimeLocalSkillImportRequest {
|
||||
func (s *InMemoryLocalSkillImportStore) Get(_ context.Context, id string) (*RuntimeLocalSkillImportRequest, error) {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
|
||||
req, ok := s.requests[id]
|
||||
if !ok {
|
||||
return nil
|
||||
return nil, nil
|
||||
}
|
||||
s.applyTimeout(req, time.Now())
|
||||
return req
|
||||
applyLocalSkillImportTimeout(req, time.Now())
|
||||
return req, nil
|
||||
}
|
||||
|
||||
func (s *RuntimeLocalSkillImportStore) PopPending(runtimeID string) *RuntimeLocalSkillImportRequest {
|
||||
func (s *InMemoryLocalSkillImportStore) PopPending(_ context.Context, runtimeID string) (*RuntimeLocalSkillImportRequest, error) {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
|
||||
var oldest *RuntimeLocalSkillImportRequest
|
||||
now := time.Now()
|
||||
for _, req := range s.requests {
|
||||
s.applyTimeout(req, now)
|
||||
applyLocalSkillImportTimeout(req, now)
|
||||
if req.RuntimeID == runtimeID && req.Status == RuntimeLocalSkillPending {
|
||||
if oldest == nil || req.CreatedAt.Before(oldest.CreatedAt) {
|
||||
oldest = req
|
||||
@@ -236,10 +295,10 @@ func (s *RuntimeLocalSkillImportStore) PopPending(runtimeID string) *RuntimeLoca
|
||||
oldest.RunStartedAt = &startedAt
|
||||
oldest.UpdatedAt = now
|
||||
}
|
||||
return oldest
|
||||
return oldest, nil
|
||||
}
|
||||
|
||||
func (s *RuntimeLocalSkillImportStore) Complete(id string, skill SkillResponse) {
|
||||
func (s *InMemoryLocalSkillImportStore) Complete(_ context.Context, id string, skill SkillResponse) error {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
|
||||
@@ -248,9 +307,10 @@ func (s *RuntimeLocalSkillImportStore) Complete(id string, skill SkillResponse)
|
||||
req.Skill = &skill
|
||||
req.UpdatedAt = time.Now()
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *RuntimeLocalSkillImportStore) Fail(id string, errMsg string) {
|
||||
func (s *InMemoryLocalSkillImportStore) Fail(_ context.Context, id string, errMsg string) error {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
|
||||
@@ -259,23 +319,7 @@ func (s *RuntimeLocalSkillImportStore) Fail(id string, errMsg string) {
|
||||
req.Error = errMsg
|
||||
req.UpdatedAt = time.Now()
|
||||
}
|
||||
}
|
||||
|
||||
func (s *RuntimeLocalSkillImportStore) applyTimeout(req *RuntimeLocalSkillImportRequest, now time.Time) {
|
||||
switch req.Status {
|
||||
case RuntimeLocalSkillPending:
|
||||
if now.Sub(req.CreatedAt) > runtimeLocalSkillPendingTimeout {
|
||||
req.Status = RuntimeLocalSkillTimeout
|
||||
req.Error = "daemon did not respond within 30 seconds"
|
||||
req.UpdatedAt = now
|
||||
}
|
||||
case RuntimeLocalSkillRunning:
|
||||
if req.RunStartedAt != nil && now.Sub(*req.RunStartedAt) > runtimeLocalSkillRunningTimeout {
|
||||
req.Status = RuntimeLocalSkillTimeout
|
||||
req.Error = "daemon did not finish within 60 seconds"
|
||||
req.UpdatedAt = now
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
type CreateRuntimeLocalSkillImportRequest struct {
|
||||
@@ -352,7 +396,11 @@ func (h *Handler) InitiateListLocalSkills(w http.ResponseWriter, r *http.Request
|
||||
return
|
||||
}
|
||||
|
||||
req := h.LocalSkillListStore.Create(runtimeID)
|
||||
req, err := h.LocalSkillListStore.Create(r.Context(), runtimeID)
|
||||
if err != nil {
|
||||
writeError(w, http.StatusInternalServerError, "failed to enqueue local skills request: "+err.Error())
|
||||
return
|
||||
}
|
||||
writeJSON(w, http.StatusOK, req)
|
||||
}
|
||||
|
||||
@@ -363,7 +411,11 @@ func (h *Handler) GetLocalSkillListRequest(w http.ResponseWriter, r *http.Reques
|
||||
}
|
||||
|
||||
requestID := chi.URLParam(r, "requestId")
|
||||
req := h.LocalSkillListStore.Get(requestID)
|
||||
req, err := h.LocalSkillListStore.Get(r.Context(), requestID)
|
||||
if err != nil {
|
||||
writeError(w, http.StatusInternalServerError, "failed to load request: "+err.Error())
|
||||
return
|
||||
}
|
||||
if req == nil || req.RuntimeID != runtimeID {
|
||||
writeError(w, http.StatusNotFound, "request not found")
|
||||
return
|
||||
@@ -398,13 +450,18 @@ func (h *Handler) InitiateImportLocalSkill(w http.ResponseWriter, r *http.Reques
|
||||
return
|
||||
}
|
||||
|
||||
importReq := h.LocalSkillImportStore.Create(
|
||||
importReq, err := h.LocalSkillImportStore.Create(
|
||||
r.Context(),
|
||||
runtimeID,
|
||||
creatorID,
|
||||
strings.TrimSpace(req.SkillKey),
|
||||
cleanOptionalString(req.Name),
|
||||
cleanOptionalString(req.Description),
|
||||
)
|
||||
if err != nil {
|
||||
writeError(w, http.StatusInternalServerError, "failed to enqueue local skill import: "+err.Error())
|
||||
return
|
||||
}
|
||||
writeJSON(w, http.StatusOK, importReq)
|
||||
}
|
||||
|
||||
@@ -415,7 +472,11 @@ func (h *Handler) GetLocalSkillImportRequest(w http.ResponseWriter, r *http.Requ
|
||||
}
|
||||
|
||||
requestID := chi.URLParam(r, "requestId")
|
||||
req := h.LocalSkillImportStore.Get(requestID)
|
||||
req, err := h.LocalSkillImportStore.Get(r.Context(), requestID)
|
||||
if err != nil {
|
||||
writeError(w, http.StatusInternalServerError, "failed to load request: "+err.Error())
|
||||
return
|
||||
}
|
||||
if req == nil || req.RuntimeID != runtimeID {
|
||||
writeError(w, http.StatusNotFound, "request not found")
|
||||
return
|
||||
@@ -431,7 +492,11 @@ func (h *Handler) ReportLocalSkillListResult(w http.ResponseWriter, r *http.Requ
|
||||
}
|
||||
|
||||
requestID := chi.URLParam(r, "requestId")
|
||||
req := h.LocalSkillListStore.Get(requestID)
|
||||
req, err := h.LocalSkillListStore.Get(r.Context(), requestID)
|
||||
if err != nil {
|
||||
writeError(w, http.StatusInternalServerError, "failed to load request: "+err.Error())
|
||||
return
|
||||
}
|
||||
if req == nil || req.RuntimeID != runtimeID {
|
||||
writeError(w, http.StatusNotFound, "request not found")
|
||||
return
|
||||
@@ -458,9 +523,21 @@ func (h *Handler) ReportLocalSkillListResult(w http.ResponseWriter, r *http.Requ
|
||||
if body.Supported != nil {
|
||||
supported = *body.Supported
|
||||
}
|
||||
h.LocalSkillListStore.Complete(requestID, body.Skills, supported)
|
||||
if err := h.LocalSkillListStore.Complete(r.Context(), requestID, body.Skills, supported); err != nil {
|
||||
// Surface the store failure as 5xx so the daemon can retry instead
|
||||
// of swallowing the report (leaves the request stuck in running
|
||||
// until the server-side timeout, which is exactly the "looks OK but
|
||||
// nothing happens" class of bug we're trying to avoid).
|
||||
slog.Error("local skills Complete failed", "error", err, "request_id", requestID)
|
||||
writeError(w, http.StatusInternalServerError, "failed to persist completion")
|
||||
return
|
||||
}
|
||||
} else {
|
||||
h.LocalSkillListStore.Fail(requestID, body.Error)
|
||||
if err := h.LocalSkillListStore.Fail(r.Context(), requestID, body.Error); err != nil {
|
||||
slog.Error("local skills Fail failed", "error", err, "request_id", requestID)
|
||||
writeError(w, http.StatusInternalServerError, "failed to persist failure")
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
slog.Debug("runtime local skills report", "runtime_id", runtimeID, "request_id", requestID, "status", body.Status, "count", len(body.Skills))
|
||||
@@ -475,7 +552,11 @@ func (h *Handler) ReportLocalSkillImportResult(w http.ResponseWriter, r *http.Re
|
||||
}
|
||||
|
||||
requestID := chi.URLParam(r, "requestId")
|
||||
req := h.LocalSkillImportStore.Get(requestID)
|
||||
req, err := h.LocalSkillImportStore.Get(r.Context(), requestID)
|
||||
if err != nil {
|
||||
writeError(w, http.StatusInternalServerError, "failed to load request: "+err.Error())
|
||||
return
|
||||
}
|
||||
if req == nil || req.RuntimeID != runtimeID {
|
||||
writeError(w, http.StatusNotFound, "request not found")
|
||||
return
|
||||
@@ -497,12 +578,20 @@ func (h *Handler) ReportLocalSkillImportResult(w http.ResponseWriter, r *http.Re
|
||||
}
|
||||
|
||||
if body.Status != "completed" {
|
||||
h.LocalSkillImportStore.Fail(requestID, body.Error)
|
||||
if err := h.LocalSkillImportStore.Fail(r.Context(), requestID, body.Error); err != nil {
|
||||
slog.Error("local skill import Fail failed", "error", err, "request_id", requestID)
|
||||
writeError(w, http.StatusInternalServerError, "failed to persist failure")
|
||||
return
|
||||
}
|
||||
writeJSON(w, http.StatusOK, map[string]string{"status": "ok"})
|
||||
return
|
||||
}
|
||||
if body.Skill == nil {
|
||||
h.LocalSkillImportStore.Fail(requestID, "daemon returned an empty skill bundle")
|
||||
if err := h.LocalSkillImportStore.Fail(r.Context(), requestID, "daemon returned an empty skill bundle"); err != nil {
|
||||
slog.Error("local skill import Fail failed", "error", err, "request_id", requestID)
|
||||
writeError(w, http.StatusInternalServerError, "failed to persist failure")
|
||||
return
|
||||
}
|
||||
writeJSON(w, http.StatusOK, map[string]string{"status": "ok"})
|
||||
return
|
||||
}
|
||||
@@ -541,16 +630,33 @@ func (h *Handler) ReportLocalSkillImportResult(w http.ResponseWriter, r *http.Re
|
||||
Files: files,
|
||||
})
|
||||
if err != nil {
|
||||
failMsg := err.Error()
|
||||
if isUniqueViolation(err) {
|
||||
h.LocalSkillImportStore.Fail(requestID, "a skill with this name already exists")
|
||||
} else {
|
||||
h.LocalSkillImportStore.Fail(requestID, err.Error())
|
||||
failMsg = "a skill with this name already exists"
|
||||
}
|
||||
if ferr := h.LocalSkillImportStore.Fail(r.Context(), requestID, failMsg); ferr != nil {
|
||||
slog.Error("local skill import Fail failed", "error", ferr, "request_id", requestID)
|
||||
writeError(w, http.StatusInternalServerError, "failed to persist failure")
|
||||
return
|
||||
}
|
||||
writeJSON(w, http.StatusOK, map[string]string{"status": "ok"})
|
||||
return
|
||||
}
|
||||
|
||||
h.LocalSkillImportStore.Complete(requestID, resp.SkillResponse)
|
||||
if err := h.LocalSkillImportStore.Complete(r.Context(), requestID, resp.SkillResponse); err != nil {
|
||||
// We already wrote the Skill to Postgres. If the store-side Complete
|
||||
// fails we can't leave that Skill orphaned: the daemon will retry on
|
||||
// 5xx and re-create it, which blows up on the unique-name constraint
|
||||
// and looks to the user like "import keeps failing". Roll back our
|
||||
// side-effects so the retry lands on a clean slate.
|
||||
slog.Error("local skill import Complete failed — rolling back created skill",
|
||||
"error", err, "request_id", requestID, "skill_id", resp.ID)
|
||||
if delErr := h.Queries.DeleteSkill(r.Context(), parseUUID(resp.ID)); delErr != nil {
|
||||
slog.Warn("orphan skill rollback failed", "error", delErr, "skill_id", resp.ID)
|
||||
}
|
||||
writeError(w, http.StatusInternalServerError, "failed to persist import completion")
|
||||
return
|
||||
}
|
||||
h.publish(protocol.EventSkillCreated, uuidToString(rt.WorkspaceID), "member", req.CreatorID, map[string]any{"skill": resp})
|
||||
slog.Debug("runtime local skill imported", "runtime_id", runtimeID, "request_id", requestID, "skill_id", resp.ID)
|
||||
writeJSON(w, http.StatusOK, map[string]string{"status": "ok"})
|
||||
|
||||
432
server/internal/handler/runtime_local_skills_redis_store.go
Normal file
432
server/internal/handler/runtime_local_skills_redis_store.go
Normal file
@@ -0,0 +1,432 @@
|
||||
package handler
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"github.com/redis/go-redis/v9"
|
||||
)
|
||||
|
||||
// Redis-backed implementations of LocalSkillListStore / LocalSkillImportStore.
|
||||
//
|
||||
// Storage layout (for both list and import flows, differing only in key prefix):
|
||||
//
|
||||
// <prefix>:<request_id> → JSON-encoded request, TTL = retention
|
||||
// <prefix>:pending:<runtime_id> → ZSET { member = request_id, score = created_at UnixNano }
|
||||
// TTL = retention, refreshed on Create
|
||||
//
|
||||
// PopPending is the critical multi-node primitive. It MUST atomically:
|
||||
// 1. pick the oldest pending request id for this runtime
|
||||
// 2. claim it (remove from the pending zset) AND transition its record to
|
||||
// "running" in a single step — otherwise a crash / transient Redis error
|
||||
// between the two writes strands the request (no longer pending, record
|
||||
// still says pending; no node will ever re-dispatch it).
|
||||
//
|
||||
// Doing this as two round-trips is racy; we use a Lua script so Redis runs
|
||||
// ZREM + SET atomically server-side. If ZREM returns 0 (another node already
|
||||
// claimed it), the SET is skipped. This is the fix for the PR-1557 review
|
||||
// finding about the "request disappears under Redis hiccups" path.
|
||||
|
||||
const (
|
||||
// Namespaced so we don't collide with the realtime relay's ws:* keys.
|
||||
localSkillListKeyPrefix = "mul:local_skill:list:"
|
||||
localSkillListPendingPrefix = "mul:local_skill:list:pending:"
|
||||
localSkillImportKeyPrefix = "mul:local_skill:import:"
|
||||
localSkillImportPendingPrefix = "mul:local_skill:import:pending:"
|
||||
localSkillRedisPopMaxRetries = 5
|
||||
)
|
||||
|
||||
// claimPendingScript atomically claims a pending request:
|
||||
// KEYS[1] = pending zset ARGV[1] = request id to claim
|
||||
// KEYS[2] = record key ARGV[2] = new record JSON (status=running)
|
||||
// ARGV[3] = record TTL in seconds
|
||||
//
|
||||
// Returns 1 when this caller won the claim (zset entry removed, record
|
||||
// updated), 0 when the entry was already gone (another node won).
|
||||
// Either the ZREM and the SET both happen or neither does — Redis executes
|
||||
// a Lua script as a single atomic unit.
|
||||
var claimPendingScript = redis.NewScript(`
|
||||
local removed = redis.call('ZREM', KEYS[1], ARGV[1])
|
||||
if removed == 0 then
|
||||
return 0
|
||||
end
|
||||
redis.call('SET', KEYS[2], ARGV[2], 'EX', tonumber(ARGV[3]))
|
||||
return 1
|
||||
`)
|
||||
|
||||
func localSkillListKey(id string) string { return localSkillListKeyPrefix + id }
|
||||
func localSkillListPendingKey(runtimeID string) string {
|
||||
return localSkillListPendingPrefix + runtimeID
|
||||
}
|
||||
func localSkillImportKey(id string) string { return localSkillImportKeyPrefix + id }
|
||||
func localSkillImportPendingKey(runtimeID string) string {
|
||||
return localSkillImportPendingPrefix + runtimeID
|
||||
}
|
||||
|
||||
// RedisLocalSkillListStore stores pending / running / completed list requests
|
||||
// in Redis so every API node agrees on the same state.
|
||||
type RedisLocalSkillListStore struct {
|
||||
rdb *redis.Client
|
||||
}
|
||||
|
||||
func NewRedisLocalSkillListStore(rdb *redis.Client) *RedisLocalSkillListStore {
|
||||
return &RedisLocalSkillListStore{rdb: rdb}
|
||||
}
|
||||
|
||||
func (s *RedisLocalSkillListStore) Create(ctx context.Context, runtimeID string) (*RuntimeLocalSkillListRequest, error) {
|
||||
now := time.Now()
|
||||
req := &RuntimeLocalSkillListRequest{
|
||||
ID: randomID(),
|
||||
RuntimeID: runtimeID,
|
||||
Status: RuntimeLocalSkillPending,
|
||||
Supported: true,
|
||||
CreatedAt: now,
|
||||
UpdatedAt: now,
|
||||
}
|
||||
data, err := json.Marshal(req)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("marshal list request: %w", err)
|
||||
}
|
||||
|
||||
pipe := s.rdb.TxPipeline()
|
||||
pipe.Set(ctx, localSkillListKey(req.ID), data, runtimeLocalSkillStoreRetention)
|
||||
pipe.ZAdd(ctx, localSkillListPendingKey(runtimeID), redis.Z{
|
||||
Score: float64(now.UnixNano()),
|
||||
Member: req.ID,
|
||||
})
|
||||
// Keep the pending ZSET alive a bit longer than the individual request
|
||||
// so stale members still in the zset can be swept lazily on PopPending
|
||||
// without blocking the create path on deletion.
|
||||
pipe.Expire(ctx, localSkillListPendingKey(runtimeID), runtimeLocalSkillStoreRetention*2)
|
||||
if _, err := pipe.Exec(ctx); err != nil {
|
||||
return nil, fmt.Errorf("persist list request: %w", err)
|
||||
}
|
||||
return req, nil
|
||||
}
|
||||
|
||||
func (s *RedisLocalSkillListStore) Get(ctx context.Context, id string) (*RuntimeLocalSkillListRequest, error) {
|
||||
return s.loadListRequest(ctx, id)
|
||||
}
|
||||
|
||||
// loadListRequest fetches a single record, applies timeout transitions if the
|
||||
// stored state has aged past the threshold, and persists the transition when
|
||||
// applicable so sibling nodes observe the same terminal state.
|
||||
func (s *RedisLocalSkillListStore) loadListRequest(ctx context.Context, id string) (*RuntimeLocalSkillListRequest, error) {
|
||||
raw, err := s.rdb.Get(ctx, localSkillListKey(id)).Bytes()
|
||||
if errors.Is(err, redis.Nil) {
|
||||
return nil, nil
|
||||
}
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("get list request: %w", err)
|
||||
}
|
||||
var req RuntimeLocalSkillListRequest
|
||||
if err := json.Unmarshal(raw, &req); err != nil {
|
||||
return nil, fmt.Errorf("decode list request: %w", err)
|
||||
}
|
||||
if applyLocalSkillListTimeout(&req, time.Now()) {
|
||||
// Persist the timeout so subsequent Get / PopPending on any node see
|
||||
// the terminal state. Also drop the id from the pending zset —
|
||||
// PopPending would do this itself, but doing it here keeps the set
|
||||
// clean even for readers that never call PopPending.
|
||||
if err := s.persistListRequest(ctx, &req); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
s.rdb.ZRem(ctx, localSkillListPendingKey(req.RuntimeID), req.ID)
|
||||
}
|
||||
return &req, nil
|
||||
}
|
||||
|
||||
func (s *RedisLocalSkillListStore) persistListRequest(ctx context.Context, req *RuntimeLocalSkillListRequest) error {
|
||||
data, err := json.Marshal(req)
|
||||
if err != nil {
|
||||
return fmt.Errorf("marshal list request: %w", err)
|
||||
}
|
||||
if err := s.rdb.Set(ctx, localSkillListKey(req.ID), data, runtimeLocalSkillStoreRetention).Err(); err != nil {
|
||||
return fmt.Errorf("persist list request: %w", err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *RedisLocalSkillListStore) PopPending(ctx context.Context, runtimeID string) (*RuntimeLocalSkillListRequest, error) {
|
||||
pendingKey := localSkillListPendingKey(runtimeID)
|
||||
|
||||
for attempt := 0; attempt < localSkillRedisPopMaxRetries; attempt++ {
|
||||
ids, err := s.rdb.ZRange(ctx, pendingKey, 0, 0).Result()
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("zrange pending: %w", err)
|
||||
}
|
||||
if len(ids) == 0 {
|
||||
return nil, nil
|
||||
}
|
||||
id := ids[0]
|
||||
|
||||
req, err := s.loadListRequest(ctx, id)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if req == nil {
|
||||
// Record expired but the zset still references it — drop and retry.
|
||||
s.rdb.ZRem(ctx, pendingKey, id)
|
||||
continue
|
||||
}
|
||||
if req.Status != RuntimeLocalSkillPending {
|
||||
// Either the timeout fired inside loadListRequest or another node
|
||||
// already picked it up. Either way, unlink from the pending set
|
||||
// and move on to the next one.
|
||||
s.rdb.ZRem(ctx, pendingKey, id)
|
||||
continue
|
||||
}
|
||||
|
||||
now := time.Now()
|
||||
req.Status = RuntimeLocalSkillRunning
|
||||
req.RunStartedAt = &now
|
||||
req.UpdatedAt = now
|
||||
data, err := json.Marshal(req)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("marshal list request: %w", err)
|
||||
}
|
||||
|
||||
result, err := claimPendingScript.Run(
|
||||
ctx, s.rdb,
|
||||
[]string{pendingKey, localSkillListKey(id)},
|
||||
id, data, int(runtimeLocalSkillStoreRetention.Seconds()),
|
||||
).Int64()
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("claim pending: %w", err)
|
||||
}
|
||||
if result == 0 {
|
||||
// Another node won the race. The record still says pending and is
|
||||
// owned by the winner; we just retry to pick up whatever else is
|
||||
// queued (or nothing).
|
||||
continue
|
||||
}
|
||||
return req, nil
|
||||
}
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
func (s *RedisLocalSkillListStore) Complete(ctx context.Context, id string, skills []RuntimeLocalSkillSummary, supported bool) error {
|
||||
req, err := s.loadListRequest(ctx, id)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if req == nil {
|
||||
return nil
|
||||
}
|
||||
req.Status = RuntimeLocalSkillCompleted
|
||||
req.Skills = skills
|
||||
req.Supported = supported
|
||||
req.UpdatedAt = time.Now()
|
||||
return s.persistListRequest(ctx, req)
|
||||
}
|
||||
|
||||
func (s *RedisLocalSkillListStore) Fail(ctx context.Context, id string, errMsg string) error {
|
||||
req, err := s.loadListRequest(ctx, id)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if req == nil {
|
||||
return nil
|
||||
}
|
||||
req.Status = RuntimeLocalSkillFailed
|
||||
req.Error = errMsg
|
||||
req.UpdatedAt = time.Now()
|
||||
return s.persistListRequest(ctx, req)
|
||||
}
|
||||
|
||||
// RedisLocalSkillImportStore mirrors RedisLocalSkillListStore for import
|
||||
// requests. Kept as a separate type (rather than a generic) because the
|
||||
// request shape carries import-specific fields (skill_key, optional rename,
|
||||
// creator id) and Go generics don't buy us much for two concrete impls.
|
||||
type RedisLocalSkillImportStore struct {
|
||||
rdb *redis.Client
|
||||
}
|
||||
|
||||
func NewRedisLocalSkillImportStore(rdb *redis.Client) *RedisLocalSkillImportStore {
|
||||
return &RedisLocalSkillImportStore{rdb: rdb}
|
||||
}
|
||||
|
||||
func (s *RedisLocalSkillImportStore) Create(ctx context.Context, runtimeID, creatorID, skillKey string, name, description *string) (*RuntimeLocalSkillImportRequest, error) {
|
||||
now := time.Now()
|
||||
req := &RuntimeLocalSkillImportRequest{
|
||||
ID: randomID(),
|
||||
RuntimeID: runtimeID,
|
||||
SkillKey: skillKey,
|
||||
Name: name,
|
||||
Description: description,
|
||||
Status: RuntimeLocalSkillPending,
|
||||
CreatedAt: now,
|
||||
UpdatedAt: now,
|
||||
CreatorID: creatorID,
|
||||
}
|
||||
data, err := s.marshalImport(req)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
pipe := s.rdb.TxPipeline()
|
||||
pipe.Set(ctx, localSkillImportKey(req.ID), data, runtimeLocalSkillStoreRetention)
|
||||
pipe.ZAdd(ctx, localSkillImportPendingKey(runtimeID), redis.Z{
|
||||
Score: float64(now.UnixNano()),
|
||||
Member: req.ID,
|
||||
})
|
||||
pipe.Expire(ctx, localSkillImportPendingKey(runtimeID), runtimeLocalSkillStoreRetention*2)
|
||||
if _, err := pipe.Exec(ctx); err != nil {
|
||||
return nil, fmt.Errorf("persist import request: %w", err)
|
||||
}
|
||||
return req, nil
|
||||
}
|
||||
|
||||
func (s *RedisLocalSkillImportStore) Get(ctx context.Context, id string) (*RuntimeLocalSkillImportRequest, error) {
|
||||
return s.loadImportRequest(ctx, id)
|
||||
}
|
||||
|
||||
func (s *RedisLocalSkillImportStore) loadImportRequest(ctx context.Context, id string) (*RuntimeLocalSkillImportRequest, error) {
|
||||
raw, err := s.rdb.Get(ctx, localSkillImportKey(id)).Bytes()
|
||||
if errors.Is(err, redis.Nil) {
|
||||
return nil, nil
|
||||
}
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("get import request: %w", err)
|
||||
}
|
||||
req, err := s.unmarshalImport(raw)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if applyLocalSkillImportTimeout(req, time.Now()) {
|
||||
if err := s.persistImportRequest(ctx, req); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
s.rdb.ZRem(ctx, localSkillImportPendingKey(req.RuntimeID), req.ID)
|
||||
}
|
||||
return req, nil
|
||||
}
|
||||
|
||||
func (s *RedisLocalSkillImportStore) persistImportRequest(ctx context.Context, req *RuntimeLocalSkillImportRequest) error {
|
||||
data, err := s.marshalImport(req)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if err := s.rdb.Set(ctx, localSkillImportKey(req.ID), data, runtimeLocalSkillStoreRetention).Err(); err != nil {
|
||||
return fmt.Errorf("persist import request: %w", err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// The RuntimeLocalSkillImportRequest type marks CreatorID / RunStartedAt as
|
||||
// `json:"-"` so those fields survive HTTP responses without leaking state.
|
||||
// For Redis persistence we need those fields, so we wrap in an internal
|
||||
// envelope that re-promotes them.
|
||||
type redisImportEnvelope struct {
|
||||
Public *RuntimeLocalSkillImportRequest `json:"r"`
|
||||
CreatorID string `json:"c"`
|
||||
RunStartedAt *time.Time `json:"s"`
|
||||
}
|
||||
|
||||
func (s *RedisLocalSkillImportStore) marshalImport(req *RuntimeLocalSkillImportRequest) ([]byte, error) {
|
||||
env := redisImportEnvelope{
|
||||
Public: req,
|
||||
CreatorID: req.CreatorID,
|
||||
RunStartedAt: req.RunStartedAt,
|
||||
}
|
||||
data, err := json.Marshal(env)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("marshal import request: %w", err)
|
||||
}
|
||||
return data, nil
|
||||
}
|
||||
|
||||
func (s *RedisLocalSkillImportStore) unmarshalImport(raw []byte) (*RuntimeLocalSkillImportRequest, error) {
|
||||
var env redisImportEnvelope
|
||||
if err := json.Unmarshal(raw, &env); err != nil {
|
||||
return nil, fmt.Errorf("decode import request: %w", err)
|
||||
}
|
||||
if env.Public == nil {
|
||||
return nil, fmt.Errorf("decode import request: missing payload")
|
||||
}
|
||||
env.Public.CreatorID = env.CreatorID
|
||||
env.Public.RunStartedAt = env.RunStartedAt
|
||||
return env.Public, nil
|
||||
}
|
||||
|
||||
func (s *RedisLocalSkillImportStore) PopPending(ctx context.Context, runtimeID string) (*RuntimeLocalSkillImportRequest, error) {
|
||||
pendingKey := localSkillImportPendingKey(runtimeID)
|
||||
|
||||
for attempt := 0; attempt < localSkillRedisPopMaxRetries; attempt++ {
|
||||
ids, err := s.rdb.ZRange(ctx, pendingKey, 0, 0).Result()
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("zrange pending: %w", err)
|
||||
}
|
||||
if len(ids) == 0 {
|
||||
return nil, nil
|
||||
}
|
||||
id := ids[0]
|
||||
|
||||
req, err := s.loadImportRequest(ctx, id)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if req == nil {
|
||||
s.rdb.ZRem(ctx, pendingKey, id)
|
||||
continue
|
||||
}
|
||||
if req.Status != RuntimeLocalSkillPending {
|
||||
s.rdb.ZRem(ctx, pendingKey, id)
|
||||
continue
|
||||
}
|
||||
|
||||
now := time.Now()
|
||||
req.Status = RuntimeLocalSkillRunning
|
||||
req.RunStartedAt = &now
|
||||
req.UpdatedAt = now
|
||||
data, err := s.marshalImport(req)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
result, err := claimPendingScript.Run(
|
||||
ctx, s.rdb,
|
||||
[]string{pendingKey, localSkillImportKey(id)},
|
||||
id, data, int(runtimeLocalSkillStoreRetention.Seconds()),
|
||||
).Int64()
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("claim pending: %w", err)
|
||||
}
|
||||
if result == 0 {
|
||||
continue
|
||||
}
|
||||
return req, nil
|
||||
}
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
func (s *RedisLocalSkillImportStore) Complete(ctx context.Context, id string, skill SkillResponse) error {
|
||||
req, err := s.loadImportRequest(ctx, id)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if req == nil {
|
||||
return nil
|
||||
}
|
||||
req.Status = RuntimeLocalSkillCompleted
|
||||
req.Skill = &skill
|
||||
req.UpdatedAt = time.Now()
|
||||
return s.persistImportRequest(ctx, req)
|
||||
}
|
||||
|
||||
func (s *RedisLocalSkillImportStore) Fail(ctx context.Context, id string, errMsg string) error {
|
||||
req, err := s.loadImportRequest(ctx, id)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if req == nil {
|
||||
return nil
|
||||
}
|
||||
req.Status = RuntimeLocalSkillFailed
|
||||
req.Error = errMsg
|
||||
req.UpdatedAt = time.Now()
|
||||
return s.persistImportRequest(ctx, req)
|
||||
}
|
||||
366
server/internal/handler/runtime_local_skills_redis_store_test.go
Normal file
366
server/internal/handler/runtime_local_skills_redis_store_test.go
Normal file
@@ -0,0 +1,366 @@
|
||||
package handler
|
||||
|
||||
import (
|
||||
"context"
|
||||
"os"
|
||||
"sync"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/redis/go-redis/v9"
|
||||
)
|
||||
|
||||
// newRedisTestClient connects to the Redis instance indicated by REDIS_TEST_URL
|
||||
// and flushes it so each test starts from a clean slate. The helper skips the
|
||||
// calling test if the env var is unset — matches the DATABASE_URL gating in
|
||||
// the rest of the suite so `go test ./...` still works on a stock laptop
|
||||
// without a running Redis.
|
||||
func newRedisTestClient(t *testing.T) *redis.Client {
|
||||
t.Helper()
|
||||
url := os.Getenv("REDIS_TEST_URL")
|
||||
if url == "" {
|
||||
t.Skip("REDIS_TEST_URL not set")
|
||||
}
|
||||
opts, err := redis.ParseURL(url)
|
||||
if err != nil {
|
||||
t.Fatalf("parse REDIS_TEST_URL: %v", err)
|
||||
}
|
||||
rdb := redis.NewClient(opts)
|
||||
ctx := context.Background()
|
||||
if err := rdb.Ping(ctx).Err(); err != nil {
|
||||
t.Skipf("REDIS_TEST_URL unreachable: %v", err)
|
||||
}
|
||||
if err := rdb.FlushDB(ctx).Err(); err != nil {
|
||||
t.Fatalf("flushdb: %v", err)
|
||||
}
|
||||
t.Cleanup(func() {
|
||||
rdb.FlushDB(context.Background())
|
||||
rdb.Close()
|
||||
})
|
||||
return rdb
|
||||
}
|
||||
|
||||
func TestRedisLocalSkillListStore_CreateGetComplete(t *testing.T) {
|
||||
rdb := newRedisTestClient(t)
|
||||
ctx := context.Background()
|
||||
store := NewRedisLocalSkillListStore(rdb)
|
||||
|
||||
req, err := store.Create(ctx, "runtime-1")
|
||||
if err != nil {
|
||||
t.Fatalf("create: %v", err)
|
||||
}
|
||||
if req.Status != RuntimeLocalSkillPending {
|
||||
t.Fatalf("initial status = %s", req.Status)
|
||||
}
|
||||
|
||||
got, err := store.Get(ctx, req.ID)
|
||||
if err != nil {
|
||||
t.Fatalf("get: %v", err)
|
||||
}
|
||||
if got == nil || got.ID != req.ID {
|
||||
t.Fatalf("round trip lost id: got=%v", got)
|
||||
}
|
||||
|
||||
skills := []RuntimeLocalSkillSummary{
|
||||
{
|
||||
Key: "review-helper",
|
||||
Name: "Review Helper",
|
||||
Description: "Review PRs",
|
||||
SourcePath: "~/.claude/skills/review-helper",
|
||||
Provider: "claude",
|
||||
FileCount: 2,
|
||||
},
|
||||
}
|
||||
if err := store.Complete(ctx, req.ID, skills, true); err != nil {
|
||||
t.Fatalf("complete: %v", err)
|
||||
}
|
||||
|
||||
got, err = store.Get(ctx, req.ID)
|
||||
if err != nil {
|
||||
t.Fatalf("get after complete: %v", err)
|
||||
}
|
||||
if got.Status != RuntimeLocalSkillCompleted {
|
||||
t.Fatalf("status after complete = %s", got.Status)
|
||||
}
|
||||
if len(got.Skills) != 1 || got.Skills[0].Key != "review-helper" {
|
||||
t.Fatalf("skills not persisted: %+v", got.Skills)
|
||||
}
|
||||
}
|
||||
|
||||
// TestRedisLocalSkillListStore_PopPendingAcrossInstances is the regression
|
||||
// test for the exact bug this change fixes: two distinct *store* instances
|
||||
// (i.e. two API nodes) share one Redis, one creates a pending request, the
|
||||
// other PopPending-s it. Before the Redis-backed store this returned nil and
|
||||
// the request timed out.
|
||||
func TestRedisLocalSkillListStore_PopPendingAcrossInstances(t *testing.T) {
|
||||
rdb := newRedisTestClient(t)
|
||||
ctx := context.Background()
|
||||
|
||||
nodeA := NewRedisLocalSkillListStore(rdb)
|
||||
nodeB := NewRedisLocalSkillListStore(rdb)
|
||||
|
||||
req, err := nodeA.Create(ctx, "runtime-cross")
|
||||
if err != nil {
|
||||
t.Fatalf("node A create: %v", err)
|
||||
}
|
||||
|
||||
popped, err := nodeB.PopPending(ctx, "runtime-cross")
|
||||
if err != nil {
|
||||
t.Fatalf("node B pop: %v", err)
|
||||
}
|
||||
if popped == nil {
|
||||
t.Fatal("node B did not see node A's pending request")
|
||||
}
|
||||
if popped.ID != req.ID {
|
||||
t.Fatalf("popped id = %s, want %s", popped.ID, req.ID)
|
||||
}
|
||||
if popped.Status != RuntimeLocalSkillRunning {
|
||||
t.Fatalf("popped status = %s, want running", popped.Status)
|
||||
}
|
||||
if popped.RunStartedAt == nil {
|
||||
t.Fatal("run_started_at not set after pop")
|
||||
}
|
||||
|
||||
// A third pop must see nothing (claim was atomic).
|
||||
again, err := nodeB.PopPending(ctx, "runtime-cross")
|
||||
if err != nil {
|
||||
t.Fatalf("node B second pop: %v", err)
|
||||
}
|
||||
if again != nil {
|
||||
t.Fatalf("expected no more pending, got %+v", again)
|
||||
}
|
||||
}
|
||||
|
||||
// TestRedisLocalSkillListStore_PopPendingConcurrent asserts the ZREM-wins race
|
||||
// guard: N concurrent PopPending calls against a single pending request
|
||||
// return exactly one winner.
|
||||
func TestRedisLocalSkillListStore_PopPendingConcurrent(t *testing.T) {
|
||||
rdb := newRedisTestClient(t)
|
||||
ctx := context.Background()
|
||||
store := NewRedisLocalSkillListStore(rdb)
|
||||
|
||||
req, err := store.Create(ctx, "runtime-race")
|
||||
if err != nil {
|
||||
t.Fatalf("create: %v", err)
|
||||
}
|
||||
|
||||
const N = 8
|
||||
var wg sync.WaitGroup
|
||||
results := make(chan *RuntimeLocalSkillListRequest, N)
|
||||
errs := make(chan error, N)
|
||||
for i := 0; i < N; i++ {
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
popped, err := store.PopPending(ctx, "runtime-race")
|
||||
if err != nil {
|
||||
errs <- err
|
||||
return
|
||||
}
|
||||
results <- popped
|
||||
}()
|
||||
}
|
||||
wg.Wait()
|
||||
close(results)
|
||||
close(errs)
|
||||
|
||||
for err := range errs {
|
||||
t.Fatalf("concurrent pop error: %v", err)
|
||||
}
|
||||
|
||||
winners := 0
|
||||
for popped := range results {
|
||||
if popped != nil {
|
||||
winners++
|
||||
if popped.ID != req.ID {
|
||||
t.Fatalf("winner popped wrong id: %s", popped.ID)
|
||||
}
|
||||
}
|
||||
}
|
||||
if winners != 1 {
|
||||
t.Fatalf("expected exactly one winner, got %d", winners)
|
||||
}
|
||||
}
|
||||
|
||||
func TestRedisLocalSkillListStore_PendingTimeout(t *testing.T) {
|
||||
rdb := newRedisTestClient(t)
|
||||
ctx := context.Background()
|
||||
store := NewRedisLocalSkillListStore(rdb)
|
||||
|
||||
req, err := store.Create(ctx, "runtime-timeout")
|
||||
if err != nil {
|
||||
t.Fatalf("create: %v", err)
|
||||
}
|
||||
|
||||
// Rewind CreatedAt so the pending threshold is blown — simulates 31s of
|
||||
// daemon silence without actually blocking the test that long.
|
||||
req.CreatedAt = time.Now().Add(-runtimeLocalSkillPendingTimeout - time.Second)
|
||||
if err := store.persistListRequest(ctx, req); err != nil {
|
||||
t.Fatalf("persist rewound: %v", err)
|
||||
}
|
||||
|
||||
got, err := store.Get(ctx, req.ID)
|
||||
if err != nil {
|
||||
t.Fatalf("get: %v", err)
|
||||
}
|
||||
if got.Status != RuntimeLocalSkillTimeout {
|
||||
t.Fatalf("status = %s, want timeout", got.Status)
|
||||
}
|
||||
|
||||
// A subsequent PopPending must NOT return a timed-out request.
|
||||
popped, err := store.PopPending(ctx, "runtime-timeout")
|
||||
if err != nil {
|
||||
t.Fatalf("pop after timeout: %v", err)
|
||||
}
|
||||
if popped != nil {
|
||||
t.Fatalf("expected no pending after timeout, got %+v", popped)
|
||||
}
|
||||
}
|
||||
|
||||
func TestRedisLocalSkillImportStore_PreservesCreatorID(t *testing.T) {
|
||||
rdb := newRedisTestClient(t)
|
||||
ctx := context.Background()
|
||||
store := NewRedisLocalSkillImportStore(rdb)
|
||||
|
||||
name := "Review Helper"
|
||||
desc := "Desc"
|
||||
req, err := store.Create(ctx, "runtime-1", "user-42", "review-helper", &name, &desc)
|
||||
if err != nil {
|
||||
t.Fatalf("create: %v", err)
|
||||
}
|
||||
if req.CreatorID != "user-42" {
|
||||
t.Fatalf("creator id lost on create")
|
||||
}
|
||||
|
||||
got, err := store.Get(ctx, req.ID)
|
||||
if err != nil {
|
||||
t.Fatalf("get: %v", err)
|
||||
}
|
||||
// CreatorID is `json:"-"` on the public struct — verify the Redis envelope
|
||||
// restores it, otherwise ReportLocalSkillImportResult can't attribute the
|
||||
// created Skill to anyone.
|
||||
if got.CreatorID != "user-42" {
|
||||
t.Fatalf("creator id lost round trip: %q", got.CreatorID)
|
||||
}
|
||||
if got.Name == nil || *got.Name != name {
|
||||
t.Fatalf("name lost: %v", got.Name)
|
||||
}
|
||||
if got.Description == nil || *got.Description != desc {
|
||||
t.Fatalf("description lost: %v", got.Description)
|
||||
}
|
||||
}
|
||||
|
||||
func TestRedisLocalSkillImportStore_PopPendingAcrossInstances(t *testing.T) {
|
||||
rdb := newRedisTestClient(t)
|
||||
ctx := context.Background()
|
||||
|
||||
nodeA := NewRedisLocalSkillImportStore(rdb)
|
||||
nodeB := NewRedisLocalSkillImportStore(rdb)
|
||||
|
||||
req, err := nodeA.Create(ctx, "runtime-import", "user-1", "review-helper", nil, nil)
|
||||
if err != nil {
|
||||
t.Fatalf("create: %v", err)
|
||||
}
|
||||
|
||||
popped, err := nodeB.PopPending(ctx, "runtime-import")
|
||||
if err != nil {
|
||||
t.Fatalf("pop: %v", err)
|
||||
}
|
||||
if popped == nil || popped.ID != req.ID {
|
||||
t.Fatalf("cross-node pop failed: got %+v", popped)
|
||||
}
|
||||
if popped.Status != RuntimeLocalSkillRunning {
|
||||
t.Fatalf("popped status = %s", popped.Status)
|
||||
}
|
||||
if popped.SkillKey != "review-helper" {
|
||||
t.Fatalf("skill_key lost: %q", popped.SkillKey)
|
||||
}
|
||||
}
|
||||
|
||||
// Smoke test: make sure the runtime-local-skill store keys don't collide
|
||||
// across runtimes — PopPending for runtime A must not see B's pending.
|
||||
func TestRedisLocalSkillListStore_PerRuntimeIsolation(t *testing.T) {
|
||||
rdb := newRedisTestClient(t)
|
||||
ctx := context.Background()
|
||||
store := NewRedisLocalSkillListStore(rdb)
|
||||
|
||||
if _, err := store.Create(ctx, "runtime-A"); err != nil {
|
||||
t.Fatalf("create A: %v", err)
|
||||
}
|
||||
reqB, err := store.Create(ctx, "runtime-B")
|
||||
if err != nil {
|
||||
t.Fatalf("create B: %v", err)
|
||||
}
|
||||
|
||||
popped, err := store.PopPending(ctx, "runtime-B")
|
||||
if err != nil {
|
||||
t.Fatalf("pop B: %v", err)
|
||||
}
|
||||
if popped == nil || popped.ID != reqB.ID {
|
||||
t.Fatalf("pop returned wrong request: %+v", popped)
|
||||
}
|
||||
|
||||
// A's request is still pending.
|
||||
ids, err := rdb.ZRange(ctx, localSkillListPendingKey("runtime-A"), 0, -1).Result()
|
||||
if err != nil {
|
||||
t.Fatalf("zrange A: %v", err)
|
||||
}
|
||||
if len(ids) != 1 {
|
||||
t.Fatalf("expected 1 pending for A after pop(B), got %d: %v", len(ids), ids)
|
||||
}
|
||||
}
|
||||
|
||||
// TestRedisLocalSkillListStore_PopPendingAtomicClaim pins the PR-1557 review
|
||||
// fix: the claim (ZREM pending + persist running record) MUST land as one
|
||||
// atomic unit. If the old two-step ordering came back ("ZRem first, SET
|
||||
// second") a transient error between the two would strand the request — not
|
||||
// in pending, still serialised as "pending" on disk, never re-dispatched.
|
||||
//
|
||||
// We verify the happy-path invariant end-to-end: after one PopPending the
|
||||
// record is in "running" state AND a second PopPending on the same runtime
|
||||
// returns nothing (i.e. the pending zset no longer references the id).
|
||||
func TestRedisLocalSkillListStore_PopPendingAtomicClaim(t *testing.T) {
|
||||
rdb := newRedisTestClient(t)
|
||||
ctx := context.Background()
|
||||
store := NewRedisLocalSkillListStore(rdb)
|
||||
|
||||
req, err := store.Create(ctx, "runtime-atomic")
|
||||
if err != nil {
|
||||
t.Fatalf("create: %v", err)
|
||||
}
|
||||
|
||||
popped, err := store.PopPending(ctx, "runtime-atomic")
|
||||
if err != nil {
|
||||
t.Fatalf("pop: %v", err)
|
||||
}
|
||||
if popped == nil || popped.ID != req.ID {
|
||||
t.Fatalf("pop returned wrong request: %+v", popped)
|
||||
}
|
||||
|
||||
got, err := store.Get(ctx, req.ID)
|
||||
if err != nil {
|
||||
t.Fatalf("get after pop: %v", err)
|
||||
}
|
||||
if got.Status != RuntimeLocalSkillRunning {
|
||||
t.Fatalf("record status = %s, want running", got.Status)
|
||||
}
|
||||
|
||||
// The pending queue must no longer reference the claimed id — exposed
|
||||
// via PopPending rather than poking the zset directly.
|
||||
again, err := store.PopPending(ctx, "runtime-atomic")
|
||||
if err != nil {
|
||||
t.Fatalf("second pop: %v", err)
|
||||
}
|
||||
if again != nil {
|
||||
t.Fatalf("second pop should be empty, got %+v", again)
|
||||
}
|
||||
}
|
||||
|
||||
// Compile-time assertions: the Redis stores MUST satisfy the interfaces so
|
||||
// NewRouter's assignment stays type-safe.
|
||||
var (
|
||||
_ LocalSkillListStore = (*RedisLocalSkillListStore)(nil)
|
||||
_ LocalSkillImportStore = (*RedisLocalSkillImportStore)(nil)
|
||||
_ LocalSkillListStore = (*InMemoryLocalSkillListStore)(nil)
|
||||
_ LocalSkillImportStore = (*InMemoryLocalSkillImportStore)(nil)
|
||||
)
|
||||
@@ -106,9 +106,13 @@ func countSkillFiles(t *testing.T, skillID string) int {
|
||||
return count
|
||||
}
|
||||
|
||||
func TestRuntimeLocalSkillListStore_PreservesSummaries(t *testing.T) {
|
||||
store := NewRuntimeLocalSkillListStore()
|
||||
req := store.Create("runtime-xyz")
|
||||
func TestInMemoryLocalSkillListStore_PreservesSummaries(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
store := NewInMemoryLocalSkillListStore()
|
||||
req, err := store.Create(ctx, "runtime-xyz")
|
||||
if err != nil {
|
||||
t.Fatalf("create: %v", err)
|
||||
}
|
||||
|
||||
body := map[string]any{
|
||||
"status": "completed",
|
||||
@@ -133,8 +137,13 @@ func TestRuntimeLocalSkillListStore_PreservesSummaries(t *testing.T) {
|
||||
t.Fatalf("unmarshal report body: %v", err)
|
||||
}
|
||||
|
||||
store.Complete(req.ID, parsed.Skills, true)
|
||||
got := store.Get(req.ID)
|
||||
if err := store.Complete(ctx, req.ID, parsed.Skills, true); err != nil {
|
||||
t.Fatalf("complete: %v", err)
|
||||
}
|
||||
got, err := store.Get(ctx, req.ID)
|
||||
if err != nil {
|
||||
t.Fatalf("get: %v", err)
|
||||
}
|
||||
if got == nil {
|
||||
t.Fatal("expected stored result")
|
||||
}
|
||||
@@ -149,14 +158,21 @@ func TestRuntimeLocalSkillListStore_PreservesSummaries(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestRuntimeLocalSkillListStore_TimesOutRunningRequests(t *testing.T) {
|
||||
store := NewRuntimeLocalSkillListStore()
|
||||
req := store.Create("runtime-xyz")
|
||||
func TestInMemoryLocalSkillListStore_TimesOutRunningRequests(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
store := NewInMemoryLocalSkillListStore()
|
||||
req, err := store.Create(ctx, "runtime-xyz")
|
||||
if err != nil {
|
||||
t.Fatalf("create: %v", err)
|
||||
}
|
||||
req.Status = RuntimeLocalSkillRunning
|
||||
startedAt := time.Now().Add(-61 * time.Second)
|
||||
req.RunStartedAt = &startedAt
|
||||
|
||||
got := store.Get(req.ID)
|
||||
got, err := store.Get(ctx, req.ID)
|
||||
if err != nil {
|
||||
t.Fatalf("get: %v", err)
|
||||
}
|
||||
if got == nil {
|
||||
t.Fatal("expected stored request")
|
||||
}
|
||||
@@ -168,14 +184,21 @@ func TestRuntimeLocalSkillListStore_TimesOutRunningRequests(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestRuntimeLocalSkillImportStore_TimesOutRunningRequests(t *testing.T) {
|
||||
store := NewRuntimeLocalSkillImportStore()
|
||||
req := store.Create("runtime-xyz", "user-1", "review-helper", nil, nil)
|
||||
func TestInMemoryLocalSkillImportStore_TimesOutRunningRequests(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
store := NewInMemoryLocalSkillImportStore()
|
||||
req, err := store.Create(ctx, "runtime-xyz", "user-1", "review-helper", nil, nil)
|
||||
if err != nil {
|
||||
t.Fatalf("create: %v", err)
|
||||
}
|
||||
req.Status = RuntimeLocalSkillRunning
|
||||
startedAt := time.Now().Add(-61 * time.Second)
|
||||
req.RunStartedAt = &startedAt
|
||||
|
||||
got := store.Get(req.ID)
|
||||
got, err := store.Get(ctx, req.ID)
|
||||
if err != nil {
|
||||
t.Fatalf("get: %v", err)
|
||||
}
|
||||
if got == nil {
|
||||
t.Fatal("expected stored request")
|
||||
}
|
||||
@@ -214,7 +237,10 @@ func TestGetLocalSkillImportRequest_RequiresRuntimeOwner(t *testing.T) {
|
||||
|
||||
runtimeID := createRuntimeLocalSkillTestRuntime(t, testUserID)
|
||||
adminUserID := createRuntimeLocalSkillTestMember(t, "admin")
|
||||
importReq := testHandler.LocalSkillImportStore.Create(runtimeID, testUserID, "review-helper", nil, nil)
|
||||
importReq, err := testHandler.LocalSkillImportStore.Create(context.Background(), runtimeID, testUserID, "review-helper", nil, nil)
|
||||
if err != nil {
|
||||
t.Fatalf("create import request: %v", err)
|
||||
}
|
||||
|
||||
w := httptest.NewRecorder()
|
||||
req := withURLParams(
|
||||
@@ -349,18 +375,26 @@ func TestReportLocalSkillImportResult_IgnoresTimedOutRequests(t *testing.T) {
|
||||
}
|
||||
|
||||
runtimeID := createRuntimeLocalSkillTestRuntime(t, testUserID)
|
||||
importReq := testHandler.LocalSkillImportStore.Create(
|
||||
ctx := context.Background()
|
||||
importReq, err := testHandler.LocalSkillImportStore.Create(
|
||||
ctx,
|
||||
runtimeID,
|
||||
testUserID,
|
||||
"review-helper",
|
||||
cleanOptionalString(ptr("Timed Out Import")),
|
||||
cleanOptionalString(ptr("Should not be created")),
|
||||
)
|
||||
if err != nil {
|
||||
t.Fatalf("create import request: %v", err)
|
||||
}
|
||||
importReq.Status = RuntimeLocalSkillRunning
|
||||
startedAt := time.Now().Add(-61 * time.Second)
|
||||
importReq.RunStartedAt = &startedAt
|
||||
|
||||
timedOut := testHandler.LocalSkillImportStore.Get(importReq.ID)
|
||||
timedOut, err := testHandler.LocalSkillImportStore.Get(ctx, importReq.ID)
|
||||
if err != nil {
|
||||
t.Fatalf("get import request: %v", err)
|
||||
}
|
||||
if timedOut == nil || timedOut.Status != RuntimeLocalSkillTimeout {
|
||||
t.Fatalf("expected timed out request, got %#v", timedOut)
|
||||
}
|
||||
@@ -399,7 +433,10 @@ func TestReportLocalSkillImportResult_RejectsCrossWorkspaceDaemonToken(t *testin
|
||||
}
|
||||
|
||||
runtimeID := createRuntimeLocalSkillTestRuntime(t, testUserID)
|
||||
importReq := testHandler.LocalSkillImportStore.Create(runtimeID, testUserID, "review-helper", nil, nil)
|
||||
importReq, err := testHandler.LocalSkillImportStore.Create(context.Background(), runtimeID, testUserID, "review-helper", nil, nil)
|
||||
if err != nil {
|
||||
t.Fatalf("create import request: %v", err)
|
||||
}
|
||||
|
||||
w := httptest.NewRecorder()
|
||||
reportReq := withURLParams(
|
||||
|
||||
Reference in New Issue
Block a user