Compare commits

...

1 Commits

Author SHA1 Message Date
Jiayuan
16f0e49c44 fix(runtimes): persist model-list requests in PostgreSQL to fix multi-instance 404 (#1958)
The model-list discovery flow (POST create → poll GET) used an in-memory
map, so in multi-instance deployments the poll could land on a different
pod and return 404. Move the store to a database table so all pods share
the same state.

Changes:
- Add migration 068: model_list_request table with partial index
- Add sqlc queries with FOR UPDATE SKIP LOCKED for atomic PopPending
- Introduce ModelListStorer interface; DBModelListStore for production,
  InMemoryModelListStore for unit tests
- Update model-dropdown and model-picker to show "discovery failed"
  instead of misleading "No models available" on error

Co-authored-by: multica-agent <github@multica.ai>
2026-05-01 09:19:21 +02:00
12 changed files with 379 additions and 36 deletions

View File

@@ -163,7 +163,9 @@ export function ModelPicker({
{!modelsQuery.isLoading && filtered.length === 0 && !canCreate && (
<p className="px-3 py-3 text-center text-xs text-muted-foreground">
No models available
{modelsQuery.isError
? "Could not discover models. Type a model ID or try again."
: "No models available"}
</p>
)}

View File

@@ -201,7 +201,9 @@ export function ModelDropdown({
Object.keys(filtered).length === 0 &&
!canCreate && (
<div className="px-3 py-6 text-center text-sm text-muted-foreground">
No models available.
{modelsQuery.isError
? "Could not discover models. Type a model ID manually or try again later."
: "No models available."}
</div>
)}

View File

@@ -675,7 +675,7 @@ func (h *Handler) processHeartbeat(ctx context.Context, rt db.AgentRuntime) (*pr
}
}
if pending := h.ModelListStore.PopPending(runtimeID); pending != nil {
if pending, _ := h.ModelListStore.PopPending(ctx, runtimeID); pending != nil {
ack.PendingModelList = &protocol.DaemonHeartbeatPendingModelList{ID: pending.ID}
}

View File

@@ -60,7 +60,7 @@ type Handler struct {
AutopilotService *service.AutopilotService
EmailService *service.EmailService
UpdateStore *UpdateStore
ModelListStore *ModelListStore
ModelListStore ModelListStorer
LocalSkillListStore LocalSkillListStore
LocalSkillImportStore LocalSkillImportStore
Storage storage.Storage
@@ -98,7 +98,7 @@ func New(queries *db.Queries, txStarter txStarter, hub *realtime.Hub, bus *event
AutopilotService: service.NewAutopilotService(queries, txStarter, bus, taskSvc),
EmailService: emailService,
UpdateStore: NewUpdateStore(),
ModelListStore: NewModelListStore(),
ModelListStore: NewDBModelListStore(queries),
LocalSkillListStore: NewInMemoryLocalSkillListStore(),
LocalSkillImportStore: NewInMemoryLocalSkillImportStore(),
Storage: store,

View File

@@ -1,6 +1,7 @@
package handler
import (
"context"
"encoding/json"
"log/slog"
"net/http"
@@ -8,10 +9,11 @@ import (
"time"
"github.com/go-chi/chi/v5"
db "github.com/multica-ai/multica/server/pkg/db/generated"
)
// ---------------------------------------------------------------------------
// In-memory model-list request store
// Model-list request store
// ---------------------------------------------------------------------------
//
// The server cannot call the daemon directly (the daemon is behind the user's
@@ -71,18 +73,33 @@ const (
modelListRunningTimeout = 60 * time.Second
)
// ModelListStore is a thread-safe in-memory store. Entries expire after 2 min
// to bound memory use; the UI polls /requests/:id until status is terminal.
type ModelListStore struct {
// ModelListStorer abstracts the model-list request store so handlers work
// identically whether backed by an in-memory map (single-instance / tests)
// or a shared database (multi-instance production).
type ModelListStorer interface {
// Create registers a new pending request for runtimeID and returns it.
Create(ctx context.Context, runtimeID string) (*ModelListRequest, error)
// Get looks up a request by ID. The implementations in this file return
// (nil, nil) when the request is not found, and run applyModelListTimeout
// on the request before returning it.
Get(ctx context.Context, id string) (*ModelListRequest, error)
// PopPending claims the oldest pending request for runtimeID by marking it
// running. A nil request means there was nothing to claim.
PopPending(ctx context.Context, runtimeID string) (*ModelListRequest, error)
// Complete records the result for request id: the discovered models and
// the supported flag.
Complete(ctx context.Context, id string, models []ModelEntry, supported bool) error
// Fail records errMsg as the error of request id.
Fail(ctx context.Context, id string, errMsg string) error
}
// ---------------------------------------------------------------------------
// In-memory implementation (tests / single-instance fallback)
// ---------------------------------------------------------------------------
// InMemoryModelListStore is a thread-safe in-memory store. Entries expire
// after 2 min to bound memory use.
type InMemoryModelListStore struct {
mu sync.Mutex
requests map[string]*ModelListRequest
}
func NewModelListStore() *ModelListStore {
return &ModelListStore{requests: make(map[string]*ModelListRequest)}
func NewInMemoryModelListStore() *InMemoryModelListStore {
return &InMemoryModelListStore{requests: make(map[string]*ModelListRequest)}
}
func (s *ModelListStore) Create(runtimeID string) *ModelListRequest {
func (s *InMemoryModelListStore) Create(_ context.Context, runtimeID string) (*ModelListRequest, error) {
s.mu.Lock()
defer s.mu.Unlock()
@@ -97,26 +114,24 @@ func (s *ModelListStore) Create(runtimeID string) *ModelListRequest {
ID: randomID(),
RuntimeID: runtimeID,
Status: ModelListPending,
// Default to true; the daemon overrides this in the report
// for providers that don't support per-agent model selection.
Supported: true,
CreatedAt: time.Now(),
UpdatedAt: time.Now(),
}
s.requests[req.ID] = req
return req
return req, nil
}
func (s *ModelListStore) Get(id string) *ModelListRequest {
func (s *InMemoryModelListStore) Get(_ context.Context, id string) (*ModelListRequest, error) {
s.mu.Lock()
defer s.mu.Unlock()
req, ok := s.requests[id]
if !ok {
return nil
return nil, nil
}
applyModelListTimeout(req, time.Now())
return req
return req, nil
}
// applyModelListTimeout transitions a request to ModelListTimeout when it has
@@ -142,8 +157,7 @@ func applyModelListTimeout(req *ModelListRequest, now time.Time) {
}
}
// PopPending returns and marks-running the oldest pending request for a runtime.
func (s *ModelListStore) PopPending(runtimeID string) *ModelListRequest {
func (s *InMemoryModelListStore) PopPending(_ context.Context, runtimeID string) (*ModelListRequest, error) {
s.mu.Lock()
defer s.mu.Unlock()
@@ -159,10 +173,10 @@ func (s *ModelListStore) PopPending(runtimeID string) *ModelListRequest {
oldest.Status = ModelListRunning
oldest.UpdatedAt = time.Now()
}
return oldest
return oldest, nil
}
func (s *ModelListStore) Complete(id string, models []ModelEntry, supported bool) {
func (s *InMemoryModelListStore) Complete(_ context.Context, id string, models []ModelEntry, supported bool) error {
s.mu.Lock()
defer s.mu.Unlock()
@@ -172,9 +186,10 @@ func (s *ModelListStore) Complete(id string, models []ModelEntry, supported bool
req.Supported = supported
req.UpdatedAt = time.Now()
}
return nil
}
func (s *ModelListStore) Fail(id string, errMsg string) {
func (s *InMemoryModelListStore) Fail(_ context.Context, id string, errMsg string) error {
s.mu.Lock()
defer s.mu.Unlock()
@@ -183,6 +198,99 @@ func (s *ModelListStore) Fail(id string, errMsg string) {
req.Error = errMsg
req.UpdatedAt = time.Now()
}
return nil
}
// ---------------------------------------------------------------------------
// Database-backed implementation (multi-instance production)
// ---------------------------------------------------------------------------
// DBModelListStore is the ModelListStorer implementation backed by
// PostgreSQL. Keeping requests in a shared table lets the POST (create) and
// GET (poll) calls of one discovery flow be served by different server
// instances; with the previous in-memory map the poll could return 404 when
// it landed on another instance (#1958).
type DBModelListStore struct {
// q is the sqlc-generated query set every store operation goes through.
q *db.Queries
}
// NewDBModelListStore builds a DBModelListStore that issues all of its
// queries through q.
func NewDBModelListStore(q *db.Queries) *DBModelListStore {
	store := new(DBModelListStore)
	store.q = q
	return store
}
// Create inserts a new pending model-list request for runtimeID and returns
// it converted to the handler's ModelListRequest type.
//
// Before inserting, it opportunistically deletes stale rows via
// DeleteStaleModelListRequests. That cleanup is best-effort: a failure is
// logged and never blocks the create. An insert failure is returned to the
// caller unchanged.
func (s *DBModelListStore) Create(ctx context.Context, runtimeID string) (*ModelListRequest, error) {
	if err := s.q.DeleteStaleModelListRequests(ctx); err != nil {
		// Best-effort GC: make the failure visible in logs but keep going.
		slog.Warn("model list stale request cleanup failed", "error", err)
	}

	row, err := s.q.CreateModelListRequest(ctx, db.CreateModelListRequestParams{
		ID:        randomID(),
		RuntimeID: parseUUID(runtimeID),
	})
	if err != nil {
		return nil, err
	}
	return dbRowToModelListRequest(row), nil
}
// Get loads the request with the given id and applies the timeout rules of
// applyModelListTimeout to it. When that changes the status, the timeout is
// written back to the database so other instances observe it too.
//
// It returns (nil, nil) when the lookup fails. The error is always nil:
// lookup failures of any kind are reported as "not found", and a failed
// write-back does not prevent the in-memory result from being returned.
// Both situations are logged so they remain diagnosable.
func (s *DBModelListStore) Get(ctx context.Context, id string) (*ModelListRequest, error) {
	row, err := s.q.GetModelListRequest(ctx, id)
	if err != nil {
		// NOTE(review): this maps every error, not only "no rows", to
		// not-found, matching the in-memory semantics. Logged at debug level
		// so a database failure behind a 404 can still be traced.
		slog.Debug("model list request lookup failed", "error", err, "request_id", id)
		return nil, nil
	}
	req := dbRowToModelListRequest(row)

	// Apply timeout transitions in Go, then persist if the status changed.
	previous := req.Status
	applyModelListTimeout(req, time.Now())
	if req.Status == previous {
		return req, nil
	}

	persistErr := s.q.TimeoutModelListRequest(ctx, db.TimeoutModelListRequestParams{
		ID:    req.ID,
		Error: req.Error,
	})
	if persistErr != nil {
		// The caller still gets the timed-out view; the next Get recomputes
		// and retries the write.
		slog.Warn("model list timeout persist failed", "error", persistErr, "request_id", req.ID)
	}
	return req, nil
}
// PopPending claims the oldest pending request of runtimeID through
// PopPendingModelListRequest and returns it. When the query returns an error
// (which includes the case of no pending row) the result is (nil, nil), so
// callers only need to check the request for nil.
func (s *DBModelListStore) PopPending(ctx context.Context, runtimeID string) (*ModelListRequest, error) {
	claimed, err := s.q.PopPendingModelListRequest(ctx, parseUUID(runtimeID))
	if err == nil {
		return dbRowToModelListRequest(claimed), nil
	}
	// Any query error is treated as "nothing pending".
	return nil, nil
}
// Complete marks request id as completed, storing models (JSON-encoded) and
// the supported flag. It returns the JSON encoding error, if any, otherwise
// the result of the CompleteModelListRequest query.
func (s *DBModelListStore) Complete(ctx context.Context, id string, models []ModelEntry, supported bool) error {
	encoded, marshalErr := json.Marshal(models)
	if marshalErr != nil {
		return marshalErr
	}

	params := db.CompleteModelListRequestParams{
		ID:        id,
		Models:    encoded,
		Supported: supported,
	}
	return s.q.CompleteModelListRequest(ctx, params)
}
// Fail marks request id as failed with errMsg as its error text and returns
// the result of the FailModelListRequest query.
func (s *DBModelListStore) Fail(ctx context.Context, id string, errMsg string) error {
	params := db.FailModelListRequestParams{ID: id, Error: errMsg}
	return s.q.FailModelListRequest(ctx, params)
}
// dbRowToModelListRequest converts a sqlc-generated DB row into the handler's
// ModelListRequest type used on the wire.
//
// The models column is decoded only when it is non-empty. A payload that
// cannot be decoded does not fail the conversion, but it is logged so that a
// corrupt row is distinguishable from a request that simply has no models.
func dbRowToModelListRequest(row db.ModelListRequest) *ModelListRequest {
	req := &ModelListRequest{
		ID:        row.ID,
		RuntimeID: uuidToString(row.RuntimeID),
		Status:    ModelListStatus(row.Status),
		Supported: row.Supported,
		Error:     row.Error,
		CreatedAt: row.CreatedAt.Time,
		UpdatedAt: row.UpdatedAt.Time,
	}
	if len(row.Models) == 0 {
		return req
	}
	if err := json.Unmarshal(row.Models, &req.Models); err != nil {
		slog.Warn("model list request has undecodable models payload", "error", err, "request_id", row.ID)
	}
	return req
}
// ---------------------------------------------------------------------------
@@ -211,7 +319,12 @@ func (h *Handler) InitiateListModels(w http.ResponseWriter, r *http.Request) {
return
}
req := h.ModelListStore.Create(uuidToString(rt.ID))
req, err := h.ModelListStore.Create(r.Context(), uuidToString(rt.ID))
if err != nil {
slog.Error("model list create failed", "error", err, "runtime_id", runtimeID)
writeError(w, http.StatusInternalServerError, "failed to initiate model discovery")
return
}
writeJSON(w, http.StatusOK, req)
}
@@ -219,7 +332,7 @@ func (h *Handler) InitiateListModels(w http.ResponseWriter, r *http.Request) {
func (h *Handler) GetModelListRequest(w http.ResponseWriter, r *http.Request) {
requestID := chi.URLParam(r, "requestId")
req := h.ModelListStore.Get(requestID)
req, _ := h.ModelListStore.Get(r.Context(), requestID)
if req == nil {
writeError(w, http.StatusNotFound, "request not found")
return
@@ -255,9 +368,9 @@ func (h *Handler) ReportModelListResult(w http.ResponseWriter, r *http.Request)
if body.Supported != nil {
supported = *body.Supported
}
h.ModelListStore.Complete(requestID, body.Models, supported)
h.ModelListStore.Complete(r.Context(), requestID, body.Models, supported)
} else {
h.ModelListStore.Fail(requestID, body.Error)
h.ModelListStore.Fail(r.Context(), requestID, body.Error)
}
slog.Debug("model list report", "runtime_id", runtimeID, "request_id", requestID, "status", body.Status, "count", len(body.Models))

View File

@@ -2,6 +2,7 @@ package handler
import (
"bytes"
"context"
"encoding/json"
"net/http"
"net/http/httptest"
@@ -16,9 +17,10 @@ import (
// way out of Running was the 2-minute memory GC, which exceeded the UI
// polling window and surfaced as a silent "discovery failed" (MUL-1397).
func TestModelListStore_RunningRequestTimesOut(t *testing.T) {
store := NewModelListStore()
req := store.Create("runtime-xyz")
claimed := store.PopPending("runtime-xyz")
store := NewInMemoryModelListStore()
ctx := context.Background()
req, _ := store.Create(ctx, "runtime-xyz")
claimed, _ := store.PopPending(ctx, "runtime-xyz")
if claimed == nil {
t.Fatal("expected PopPending to claim the pending request")
}
@@ -30,7 +32,7 @@ func TestModelListStore_RunningRequestTimesOut(t *testing.T) {
// reporting a result. Get() must flip it to Timeout so the UI can
// terminate polling instead of waiting for the 2-minute GC.
claimed.UpdatedAt = time.Now().Add(-(modelListRunningTimeout + time.Second))
got := store.Get(req.ID)
got, _ := store.Get(ctx, req.ID)
if got == nil {
t.Fatal("expected stored request")
}
@@ -48,8 +50,9 @@ func TestModelListStore_RunningRequestTimesOut(t *testing.T) {
// dropped here (e.g. by going through a map[string]string), the badge
// silently disappears.
func TestReportModelListResult_PreservesDefault(t *testing.T) {
store := NewModelListStore()
req := store.Create("runtime-xyz")
store := NewInMemoryModelListStore()
ctx := context.Background()
req, _ := store.Create(ctx, "runtime-xyz")
// Report a completed result with one default entry and one not.
body := map[string]any{
@@ -72,9 +75,9 @@ func TestReportModelListResult_PreservesDefault(t *testing.T) {
if err := json.Unmarshal(raw, &parsed); err != nil {
t.Fatalf("unmarshal report body: %v", err)
}
store.Complete(req.ID, parsed.Models, true)
store.Complete(ctx, req.ID, parsed.Models, true)
got := store.Get(req.ID)
got, _ := store.Get(ctx, req.ID)
if got == nil {
t.Fatal("expected stored result")
}

View File

@@ -0,0 +1 @@
-- Remove the model_list_request table; IF EXISTS keeps the statement from
-- failing when the table is already gone.
DROP TABLE IF EXISTS model_list_request;

View File

@@ -0,0 +1,19 @@
-- Persistent model-list request store. Replaces the previous in-memory
-- ModelListStore so that the POST (create) and GET (poll) endpoints work
-- correctly across multiple server instances.
CREATE TABLE model_list_request (
-- Request identifier supplied by the application on insert.
id TEXT PRIMARY KEY,
-- Owning runtime; requests are removed together with their runtime.
runtime_id UUID NOT NULL REFERENCES agent_runtime(id) ON DELETE CASCADE,
-- Lifecycle state. The queries for this table write 'pending', 'running',
-- 'completed', 'failed' and 'timeout'.
status TEXT NOT NULL DEFAULT 'pending',
-- Discovery result as JSON; NULL until a result has been stored.
models JSONB,
supported BOOLEAN NOT NULL DEFAULT true,
-- Error text; empty string when no error has been recorded.
error TEXT NOT NULL DEFAULT '',
created_at TIMESTAMPTZ NOT NULL DEFAULT now(),
updated_at TIMESTAMPTZ NOT NULL DEFAULT now()
);
-- PopPending needs to find the oldest pending row for a given runtime fast.
-- Partial index: only rows with status = 'pending' are indexed, keyed by
-- (runtime_id, created_at) to match that lookup's filter and sort order.
CREATE INDEX idx_model_list_request_runtime_pending
ON model_list_request (runtime_id, created_at)
WHERE status = 'pending';

View File

@@ -1449,7 +1449,7 @@ func (q *Queries) ListPendingTasksByRuntime(ctx context.Context, runtimeID pgtyp
}
const listQueuedClaimCandidatesByRuntime = `-- name: ListQueuedClaimCandidatesByRuntime :many
SELECT id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, last_heartbeat_at, trigger_summary FROM agent_task_queue
SELECT id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, last_heartbeat_at, trigger_summary, force_fresh_session FROM agent_task_queue
WHERE runtime_id = $1 AND status = 'queued'
ORDER BY priority DESC, created_at ASC
`
@@ -1496,6 +1496,7 @@ func (q *Queries) ListQueuedClaimCandidatesByRuntime(ctx context.Context, runtim
&i.FailureReason,
&i.LastHeartbeatAt,
&i.TriggerSummary,
&i.ForceFreshSession,
); err != nil {
return nil, err
}

View File

@@ -0,0 +1,150 @@
// Code generated by sqlc. DO NOT EDIT.
// versions:
// sqlc v1.30.0
// source: model_list_request.sql
package db
import (
"context"
"github.com/jackc/pgx/v5/pgtype"
)
const completeModelListRequest = `-- name: CompleteModelListRequest :exec
UPDATE model_list_request
SET status = 'completed', models = $2, supported = $3, updated_at = now()
WHERE id = $1
`
// CompleteModelListRequestParams carries the arguments of
// CompleteModelListRequest ($1 = ID, $2 = Models, $3 = Supported).
type CompleteModelListRequestParams struct {
ID string `json:"id"`
Models []byte `json:"models"`
Supported bool `json:"supported"`
}
// CompleteModelListRequest sets the row with the given ID to status
// 'completed', storing the models payload and supported flag and refreshing
// updated_at. The command tag is discarded, so the caller cannot tell from
// the return value whether any row matched.
func (q *Queries) CompleteModelListRequest(ctx context.Context, arg CompleteModelListRequestParams) error {
_, err := q.db.Exec(ctx, completeModelListRequest, arg.ID, arg.Models, arg.Supported)
return err
}
const createModelListRequest = `-- name: CreateModelListRequest :one
INSERT INTO model_list_request (id, runtime_id, status, supported, created_at, updated_at)
VALUES ($1, $2, 'pending', true, now(), now())
RETURNING id, runtime_id, status, models, supported, error, created_at, updated_at
`
// CreateModelListRequestParams carries the arguments of
// CreateModelListRequest ($1 = ID, $2 = RuntimeID).
type CreateModelListRequestParams struct {
ID string `json:"id"`
RuntimeID pgtype.UUID `json:"runtime_id"`
}
// CreateModelListRequest inserts a new row with status 'pending' and
// supported = true and returns the inserted row. On a scan error the returned
// struct may be only partially populated and must not be used.
func (q *Queries) CreateModelListRequest(ctx context.Context, arg CreateModelListRequestParams) (ModelListRequest, error) {
row := q.db.QueryRow(ctx, createModelListRequest, arg.ID, arg.RuntimeID)
var i ModelListRequest
err := row.Scan(
&i.ID,
&i.RuntimeID,
&i.Status,
&i.Models,
&i.Supported,
&i.Error,
&i.CreatedAt,
&i.UpdatedAt,
)
return i, err
}
const deleteStaleModelListRequests = `-- name: DeleteStaleModelListRequests :exec
DELETE FROM model_list_request
WHERE created_at < now() - interval '2 minutes'
`
// DeleteStaleModelListRequests deletes every request created more than two
// minutes ago, regardless of its status. The number of deleted rows is not
// reported.
func (q *Queries) DeleteStaleModelListRequests(ctx context.Context) error {
_, err := q.db.Exec(ctx, deleteStaleModelListRequests)
return err
}
const failModelListRequest = `-- name: FailModelListRequest :exec
UPDATE model_list_request
SET status = 'failed', error = $2, updated_at = now()
WHERE id = $1
`
// FailModelListRequestParams carries the arguments of FailModelListRequest
// ($1 = ID, $2 = Error).
type FailModelListRequestParams struct {
ID string `json:"id"`
Error string `json:"error"`
}
// FailModelListRequest sets the row with the given ID to status 'failed',
// stores the error text and refreshes updated_at. The command tag is
// discarded, so the caller cannot tell from the return value whether any row
// matched.
func (q *Queries) FailModelListRequest(ctx context.Context, arg FailModelListRequestParams) error {
_, err := q.db.Exec(ctx, failModelListRequest, arg.ID, arg.Error)
return err
}
const getModelListRequest = `-- name: GetModelListRequest :one
SELECT id, runtime_id, status, models, supported, error, created_at, updated_at FROM model_list_request
WHERE id = $1
`
// GetModelListRequest fetches the row with the given id. The error from
// Scan is returned as-is; on error the returned struct may be only partially
// populated and must not be used.
func (q *Queries) GetModelListRequest(ctx context.Context, id string) (ModelListRequest, error) {
row := q.db.QueryRow(ctx, getModelListRequest, id)
var i ModelListRequest
err := row.Scan(
&i.ID,
&i.RuntimeID,
&i.Status,
&i.Models,
&i.Supported,
&i.Error,
&i.CreatedAt,
&i.UpdatedAt,
)
return i, err
}
const popPendingModelListRequest = `-- name: PopPendingModelListRequest :one
UPDATE model_list_request
SET status = 'running', updated_at = now()
WHERE id = (
SELECT mlr.id FROM model_list_request mlr
WHERE mlr.runtime_id = $1 AND mlr.status = 'pending'
ORDER BY mlr.created_at ASC
LIMIT 1
FOR UPDATE SKIP LOCKED
)
RETURNING id, runtime_id, status, models, supported, error, created_at, updated_at
`
// Atomically claim the oldest pending request for a runtime.
// FOR UPDATE SKIP LOCKED ensures correctness across concurrent server instances.
//
// The claimed row is returned with status 'running'. When the subquery
// selects no row, nothing is updated and Scan returns an error, which this
// method passes through.
func (q *Queries) PopPendingModelListRequest(ctx context.Context, runtimeID pgtype.UUID) (ModelListRequest, error) {
row := q.db.QueryRow(ctx, popPendingModelListRequest, runtimeID)
var i ModelListRequest
err := row.Scan(
&i.ID,
&i.RuntimeID,
&i.Status,
&i.Models,
&i.Supported,
&i.Error,
&i.CreatedAt,
&i.UpdatedAt,
)
return i, err
}
const timeoutModelListRequest = `-- name: TimeoutModelListRequest :exec
UPDATE model_list_request
SET status = 'timeout', error = $2, updated_at = now()
WHERE id = $1
`
// TimeoutModelListRequestParams carries the arguments of
// TimeoutModelListRequest ($1 = ID, $2 = Error).
type TimeoutModelListRequestParams struct {
ID string `json:"id"`
Error string `json:"error"`
}
// TimeoutModelListRequest runs the timeoutModelListRequest statement, which
// marks a request as 'timeout' with the given error text and refreshes
// updated_at. The command tag is discarded, so the caller cannot tell from
// the return value whether any row was changed.
func (q *Queries) TimeoutModelListRequest(ctx context.Context, arg TimeoutModelListRequestParams) error {
_, err := q.db.Exec(ctx, timeoutModelListRequest, arg.ID, arg.Error)
return err
}

View File

@@ -322,6 +322,17 @@ type Member struct {
CreatedAt pgtype.Timestamptz `json:"created_at"`
}
// ModelListRequest is the row type the model_list_request queries scan into.
type ModelListRequest struct {
ID string `json:"id"`
RuntimeID pgtype.UUID `json:"runtime_id"`
// Status holds the raw status text as stored in the table.
Status string `json:"status"`
// Models is the raw models column; it stays empty until a result has been
// stored for the request.
Models []byte `json:"models"`
Supported bool `json:"supported"`
Error string `json:"error"`
CreatedAt pgtype.Timestamptz `json:"created_at"`
UpdatedAt pgtype.Timestamptz `json:"updated_at"`
}
type NotificationPreference struct {
ID pgtype.UUID `json:"id"`
WorkspaceID pgtype.UUID `json:"workspace_id"`

View File

@@ -0,0 +1,41 @@
-- name: CreateModelListRequest :one
-- Insert a new request in the 'pending' state with supported = true and
-- return the stored row.
INSERT INTO model_list_request (id, runtime_id, status, supported, created_at, updated_at)
VALUES ($1, $2, 'pending', true, now(), now())
RETURNING *;
-- name: GetModelListRequest :one
-- Fetch a single request by its primary key.
SELECT * FROM model_list_request
WHERE id = $1;
-- name: PopPendingModelListRequest :one
-- Atomically claim the oldest pending request for a runtime.
-- FOR UPDATE SKIP LOCKED ensures correctness across concurrent server instances.
-- The claimed row is switched to 'running' and returned; no row is returned
-- when the runtime has no claimable pending request.
UPDATE model_list_request
SET status = 'running', updated_at = now()
WHERE id = (
SELECT mlr.id FROM model_list_request mlr
WHERE mlr.runtime_id = $1 AND mlr.status = 'pending'
ORDER BY mlr.created_at ASC
LIMIT 1
FOR UPDATE SKIP LOCKED
)
RETURNING *;
-- name: CompleteModelListRequest :exec
-- Store a discovery result: mark the request 'completed' and save the models
-- payload and supported flag.
-- NOTE(review): matches on id only, whatever the current status is; confirm
-- that overwriting an already terminal request is intended.
UPDATE model_list_request
SET status = 'completed', models = $2, supported = $3, updated_at = now()
WHERE id = $1;
-- name: FailModelListRequest :exec
-- Mark the request 'failed' and record the error text.
-- NOTE(review): matches on id only, whatever the current status is; confirm
-- that overwriting an already terminal request is intended.
UPDATE model_list_request
SET status = 'failed', error = $2, updated_at = now()
WHERE id = $1;
-- name: TimeoutModelListRequest :exec
-- Mark a request as timed out and record the error text.
-- The status guard restricts the transition to requests that are still in
-- flight, so a result stored concurrently (completed / failed) is never
-- overwritten by a timeout computed from an older read of the row.
UPDATE model_list_request
SET status = 'timeout', error = $2, updated_at = now()
WHERE id = $1
AND status IN ('pending', 'running');
-- name: DeleteStaleModelListRequests :exec
-- Garbage-collect requests created more than two minutes ago, regardless of
-- their status.
DELETE FROM model_list_request
WHERE created_at < now() - interval '2 minutes';