mirror of
https://github.com/multica-ai/multica.git
synced 2026-06-17 11:48:42 +02:00
Compare commits
1 Commits
fix/email-
...
agent/lamb
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
16f0e49c44 |
@@ -163,7 +163,9 @@ export function ModelPicker({
|
||||
|
||||
{!modelsQuery.isLoading && filtered.length === 0 && !canCreate && (
|
||||
<p className="px-3 py-3 text-center text-xs text-muted-foreground">
|
||||
No models available
|
||||
{modelsQuery.isError
|
||||
? "Could not discover models. Type a model ID or try again."
|
||||
: "No models available"}
|
||||
</p>
|
||||
)}
|
||||
|
||||
|
||||
@@ -201,7 +201,9 @@ export function ModelDropdown({
|
||||
Object.keys(filtered).length === 0 &&
|
||||
!canCreate && (
|
||||
<div className="px-3 py-6 text-center text-sm text-muted-foreground">
|
||||
No models available.
|
||||
{modelsQuery.isError
|
||||
? "Could not discover models. Type a model ID manually or try again later."
|
||||
: "No models available."}
|
||||
</div>
|
||||
)}
|
||||
|
||||
|
||||
@@ -675,7 +675,7 @@ func (h *Handler) processHeartbeat(ctx context.Context, rt db.AgentRuntime) (*pr
|
||||
}
|
||||
}
|
||||
|
||||
if pending := h.ModelListStore.PopPending(runtimeID); pending != nil {
|
||||
if pending, _ := h.ModelListStore.PopPending(ctx, runtimeID); pending != nil {
|
||||
ack.PendingModelList = &protocol.DaemonHeartbeatPendingModelList{ID: pending.ID}
|
||||
}
|
||||
|
||||
|
||||
@@ -60,7 +60,7 @@ type Handler struct {
|
||||
AutopilotService *service.AutopilotService
|
||||
EmailService *service.EmailService
|
||||
UpdateStore *UpdateStore
|
||||
ModelListStore *ModelListStore
|
||||
ModelListStore ModelListStorer
|
||||
LocalSkillListStore LocalSkillListStore
|
||||
LocalSkillImportStore LocalSkillImportStore
|
||||
Storage storage.Storage
|
||||
@@ -98,7 +98,7 @@ func New(queries *db.Queries, txStarter txStarter, hub *realtime.Hub, bus *event
|
||||
AutopilotService: service.NewAutopilotService(queries, txStarter, bus, taskSvc),
|
||||
EmailService: emailService,
|
||||
UpdateStore: NewUpdateStore(),
|
||||
ModelListStore: NewModelListStore(),
|
||||
ModelListStore: NewDBModelListStore(queries),
|
||||
LocalSkillListStore: NewInMemoryLocalSkillListStore(),
|
||||
LocalSkillImportStore: NewInMemoryLocalSkillImportStore(),
|
||||
Storage: store,
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
package handler
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"log/slog"
|
||||
"net/http"
|
||||
@@ -8,10 +9,11 @@ import (
|
||||
"time"
|
||||
|
||||
"github.com/go-chi/chi/v5"
|
||||
db "github.com/multica-ai/multica/server/pkg/db/generated"
|
||||
)
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// In-memory model-list request store
|
||||
// Model-list request store
|
||||
// ---------------------------------------------------------------------------
|
||||
//
|
||||
// The server cannot call the daemon directly (the daemon is behind the user's
|
||||
@@ -71,18 +73,33 @@ const (
|
||||
modelListRunningTimeout = 60 * time.Second
|
||||
)
|
||||
|
||||
// ModelListStore is a thread-safe in-memory store. Entries expire after 2 min
|
||||
// to bound memory use; the UI polls /requests/:id until status is terminal.
|
||||
type ModelListStore struct {
|
||||
// ModelListStorer abstracts the model-list request store so handlers work
|
||||
// identically whether backed by an in-memory map (single-instance / tests)
|
||||
// or a shared database (multi-instance production).
|
||||
type ModelListStorer interface {
|
||||
Create(ctx context.Context, runtimeID string) (*ModelListRequest, error)
|
||||
Get(ctx context.Context, id string) (*ModelListRequest, error)
|
||||
PopPending(ctx context.Context, runtimeID string) (*ModelListRequest, error)
|
||||
Complete(ctx context.Context, id string, models []ModelEntry, supported bool) error
|
||||
Fail(ctx context.Context, id string, errMsg string) error
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// In-memory implementation (tests / single-instance fallback)
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
// InMemoryModelListStore is a thread-safe in-memory store. Entries expire
|
||||
// after 2 min to bound memory use.
|
||||
type InMemoryModelListStore struct {
|
||||
mu sync.Mutex
|
||||
requests map[string]*ModelListRequest
|
||||
}
|
||||
|
||||
func NewModelListStore() *ModelListStore {
|
||||
return &ModelListStore{requests: make(map[string]*ModelListRequest)}
|
||||
func NewInMemoryModelListStore() *InMemoryModelListStore {
|
||||
return &InMemoryModelListStore{requests: make(map[string]*ModelListRequest)}
|
||||
}
|
||||
|
||||
func (s *ModelListStore) Create(runtimeID string) *ModelListRequest {
|
||||
func (s *InMemoryModelListStore) Create(_ context.Context, runtimeID string) (*ModelListRequest, error) {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
|
||||
@@ -97,26 +114,24 @@ func (s *ModelListStore) Create(runtimeID string) *ModelListRequest {
|
||||
ID: randomID(),
|
||||
RuntimeID: runtimeID,
|
||||
Status: ModelListPending,
|
||||
// Default to true; the daemon overrides this in the report
|
||||
// for providers that don't support per-agent model selection.
|
||||
Supported: true,
|
||||
CreatedAt: time.Now(),
|
||||
UpdatedAt: time.Now(),
|
||||
}
|
||||
s.requests[req.ID] = req
|
||||
return req
|
||||
return req, nil
|
||||
}
|
||||
|
||||
func (s *ModelListStore) Get(id string) *ModelListRequest {
|
||||
func (s *InMemoryModelListStore) Get(_ context.Context, id string) (*ModelListRequest, error) {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
|
||||
req, ok := s.requests[id]
|
||||
if !ok {
|
||||
return nil
|
||||
return nil, nil
|
||||
}
|
||||
applyModelListTimeout(req, time.Now())
|
||||
return req
|
||||
return req, nil
|
||||
}
|
||||
|
||||
// applyModelListTimeout transitions a request to ModelListTimeout when it has
|
||||
@@ -142,8 +157,7 @@ func applyModelListTimeout(req *ModelListRequest, now time.Time) {
|
||||
}
|
||||
}
|
||||
|
||||
// PopPending returns and marks-running the oldest pending request for a runtime.
|
||||
func (s *ModelListStore) PopPending(runtimeID string) *ModelListRequest {
|
||||
func (s *InMemoryModelListStore) PopPending(_ context.Context, runtimeID string) (*ModelListRequest, error) {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
|
||||
@@ -159,10 +173,10 @@ func (s *ModelListStore) PopPending(runtimeID string) *ModelListRequest {
|
||||
oldest.Status = ModelListRunning
|
||||
oldest.UpdatedAt = time.Now()
|
||||
}
|
||||
return oldest
|
||||
return oldest, nil
|
||||
}
|
||||
|
||||
func (s *ModelListStore) Complete(id string, models []ModelEntry, supported bool) {
|
||||
func (s *InMemoryModelListStore) Complete(_ context.Context, id string, models []ModelEntry, supported bool) error {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
|
||||
@@ -172,9 +186,10 @@ func (s *ModelListStore) Complete(id string, models []ModelEntry, supported bool
|
||||
req.Supported = supported
|
||||
req.UpdatedAt = time.Now()
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *ModelListStore) Fail(id string, errMsg string) {
|
||||
func (s *InMemoryModelListStore) Fail(_ context.Context, id string, errMsg string) error {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
|
||||
@@ -183,6 +198,99 @@ func (s *ModelListStore) Fail(id string, errMsg string) {
|
||||
req.Error = errMsg
|
||||
req.UpdatedAt = time.Now()
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Database-backed implementation (multi-instance production)
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
// DBModelListStore persists model-list requests in PostgreSQL so that
|
||||
// POST (create) and GET (poll) work correctly across multiple server
|
||||
// instances — the root cause of #1958.
|
||||
type DBModelListStore struct {
|
||||
q *db.Queries
|
||||
}
|
||||
|
||||
func NewDBModelListStore(q *db.Queries) *DBModelListStore {
|
||||
return &DBModelListStore{q: q}
|
||||
}
|
||||
|
||||
func (s *DBModelListStore) Create(ctx context.Context, runtimeID string) (*ModelListRequest, error) {
|
||||
// Best-effort GC of stale rows; ignore errors.
|
||||
_ = s.q.DeleteStaleModelListRequests(ctx)
|
||||
|
||||
row, err := s.q.CreateModelListRequest(ctx, db.CreateModelListRequestParams{
|
||||
ID: randomID(),
|
||||
RuntimeID: parseUUID(runtimeID),
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return dbRowToModelListRequest(row), nil
|
||||
}
|
||||
|
||||
func (s *DBModelListStore) Get(ctx context.Context, id string) (*ModelListRequest, error) {
|
||||
row, err := s.q.GetModelListRequest(ctx, id)
|
||||
if err != nil {
|
||||
return nil, nil // not found → nil, matching in-memory semantics
|
||||
}
|
||||
req := dbRowToModelListRequest(row)
|
||||
// Apply timeout transitions in Go, then persist if status changed.
|
||||
oldStatus := req.Status
|
||||
applyModelListTimeout(req, time.Now())
|
||||
if req.Status != oldStatus {
|
||||
_ = s.q.TimeoutModelListRequest(ctx, db.TimeoutModelListRequestParams{
|
||||
ID: req.ID,
|
||||
Error: req.Error,
|
||||
})
|
||||
}
|
||||
return req, nil
|
||||
}
|
||||
|
||||
func (s *DBModelListStore) PopPending(ctx context.Context, runtimeID string) (*ModelListRequest, error) {
|
||||
row, err := s.q.PopPendingModelListRequest(ctx, parseUUID(runtimeID))
|
||||
if err != nil {
|
||||
return nil, nil // no pending row → nil
|
||||
}
|
||||
return dbRowToModelListRequest(row), nil
|
||||
}
|
||||
|
||||
func (s *DBModelListStore) Complete(ctx context.Context, id string, models []ModelEntry, supported bool) error {
|
||||
modelsJSON, err := json.Marshal(models)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return s.q.CompleteModelListRequest(ctx, db.CompleteModelListRequestParams{
|
||||
ID: id,
|
||||
Models: modelsJSON,
|
||||
Supported: supported,
|
||||
})
|
||||
}
|
||||
|
||||
func (s *DBModelListStore) Fail(ctx context.Context, id string, errMsg string) error {
|
||||
return s.q.FailModelListRequest(ctx, db.FailModelListRequestParams{
|
||||
ID: id,
|
||||
Error: errMsg,
|
||||
})
|
||||
}
|
||||
|
||||
// dbRowToModelListRequest converts a sqlc-generated DB row into the handler's
|
||||
// ModelListRequest type used on the wire.
|
||||
func dbRowToModelListRequest(row db.ModelListRequest) *ModelListRequest {
|
||||
req := &ModelListRequest{
|
||||
ID: row.ID,
|
||||
RuntimeID: uuidToString(row.RuntimeID),
|
||||
Status: ModelListStatus(row.Status),
|
||||
Supported: row.Supported,
|
||||
Error: row.Error,
|
||||
CreatedAt: row.CreatedAt.Time,
|
||||
UpdatedAt: row.UpdatedAt.Time,
|
||||
}
|
||||
if len(row.Models) > 0 {
|
||||
_ = json.Unmarshal(row.Models, &req.Models)
|
||||
}
|
||||
return req
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
@@ -211,7 +319,12 @@ func (h *Handler) InitiateListModels(w http.ResponseWriter, r *http.Request) {
|
||||
return
|
||||
}
|
||||
|
||||
req := h.ModelListStore.Create(uuidToString(rt.ID))
|
||||
req, err := h.ModelListStore.Create(r.Context(), uuidToString(rt.ID))
|
||||
if err != nil {
|
||||
slog.Error("model list create failed", "error", err, "runtime_id", runtimeID)
|
||||
writeError(w, http.StatusInternalServerError, "failed to initiate model discovery")
|
||||
return
|
||||
}
|
||||
writeJSON(w, http.StatusOK, req)
|
||||
}
|
||||
|
||||
@@ -219,7 +332,7 @@ func (h *Handler) InitiateListModels(w http.ResponseWriter, r *http.Request) {
|
||||
func (h *Handler) GetModelListRequest(w http.ResponseWriter, r *http.Request) {
|
||||
requestID := chi.URLParam(r, "requestId")
|
||||
|
||||
req := h.ModelListStore.Get(requestID)
|
||||
req, _ := h.ModelListStore.Get(r.Context(), requestID)
|
||||
if req == nil {
|
||||
writeError(w, http.StatusNotFound, "request not found")
|
||||
return
|
||||
@@ -255,9 +368,9 @@ func (h *Handler) ReportModelListResult(w http.ResponseWriter, r *http.Request)
|
||||
if body.Supported != nil {
|
||||
supported = *body.Supported
|
||||
}
|
||||
h.ModelListStore.Complete(requestID, body.Models, supported)
|
||||
h.ModelListStore.Complete(r.Context(), requestID, body.Models, supported)
|
||||
} else {
|
||||
h.ModelListStore.Fail(requestID, body.Error)
|
||||
h.ModelListStore.Fail(r.Context(), requestID, body.Error)
|
||||
}
|
||||
|
||||
slog.Debug("model list report", "runtime_id", runtimeID, "request_id", requestID, "status", body.Status, "count", len(body.Models))
|
||||
|
||||
@@ -2,6 +2,7 @@ package handler
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"encoding/json"
|
||||
"net/http"
|
||||
"net/http/httptest"
|
||||
@@ -16,9 +17,10 @@ import (
|
||||
// way out of Running was the 2-minute memory GC, which exceeded the UI
|
||||
// polling window and surfaced as a silent "discovery failed" (MUL-1397).
|
||||
func TestModelListStore_RunningRequestTimesOut(t *testing.T) {
|
||||
store := NewModelListStore()
|
||||
req := store.Create("runtime-xyz")
|
||||
claimed := store.PopPending("runtime-xyz")
|
||||
store := NewInMemoryModelListStore()
|
||||
ctx := context.Background()
|
||||
req, _ := store.Create(ctx, "runtime-xyz")
|
||||
claimed, _ := store.PopPending(ctx, "runtime-xyz")
|
||||
if claimed == nil {
|
||||
t.Fatal("expected PopPending to claim the pending request")
|
||||
}
|
||||
@@ -30,7 +32,7 @@ func TestModelListStore_RunningRequestTimesOut(t *testing.T) {
|
||||
// reporting a result. Get() must flip it to Timeout so the UI can
|
||||
// terminate polling instead of waiting for the 2-minute GC.
|
||||
claimed.UpdatedAt = time.Now().Add(-(modelListRunningTimeout + time.Second))
|
||||
got := store.Get(req.ID)
|
||||
got, _ := store.Get(ctx, req.ID)
|
||||
if got == nil {
|
||||
t.Fatal("expected stored request")
|
||||
}
|
||||
@@ -48,8 +50,9 @@ func TestModelListStore_RunningRequestTimesOut(t *testing.T) {
|
||||
// dropped here (e.g. by going through a map[string]string), the badge
|
||||
// silently disappears.
|
||||
func TestReportModelListResult_PreservesDefault(t *testing.T) {
|
||||
store := NewModelListStore()
|
||||
req := store.Create("runtime-xyz")
|
||||
store := NewInMemoryModelListStore()
|
||||
ctx := context.Background()
|
||||
req, _ := store.Create(ctx, "runtime-xyz")
|
||||
|
||||
// Report a completed result with one default entry and one not.
|
||||
body := map[string]any{
|
||||
@@ -72,9 +75,9 @@ func TestReportModelListResult_PreservesDefault(t *testing.T) {
|
||||
if err := json.Unmarshal(raw, &parsed); err != nil {
|
||||
t.Fatalf("unmarshal report body: %v", err)
|
||||
}
|
||||
store.Complete(req.ID, parsed.Models, true)
|
||||
store.Complete(ctx, req.ID, parsed.Models, true)
|
||||
|
||||
got := store.Get(req.ID)
|
||||
got, _ := store.Get(ctx, req.ID)
|
||||
if got == nil {
|
||||
t.Fatal("expected stored result")
|
||||
}
|
||||
|
||||
1
server/migrations/068_model_list_request.down.sql
Normal file
1
server/migrations/068_model_list_request.down.sql
Normal file
@@ -0,0 +1 @@
|
||||
DROP TABLE IF EXISTS model_list_request;
|
||||
19
server/migrations/068_model_list_request.up.sql
Normal file
19
server/migrations/068_model_list_request.up.sql
Normal file
@@ -0,0 +1,19 @@
|
||||
-- Persistent model-list request store. Replaces the previous in-memory
|
||||
-- ModelListStore so that the POST (create) and GET (poll) endpoints work
|
||||
-- correctly across multiple server instances.
|
||||
|
||||
CREATE TABLE model_list_request (
|
||||
id TEXT PRIMARY KEY,
|
||||
runtime_id UUID NOT NULL REFERENCES agent_runtime(id) ON DELETE CASCADE,
|
||||
status TEXT NOT NULL DEFAULT 'pending',
|
||||
models JSONB,
|
||||
supported BOOLEAN NOT NULL DEFAULT true,
|
||||
error TEXT NOT NULL DEFAULT '',
|
||||
created_at TIMESTAMPTZ NOT NULL DEFAULT now(),
|
||||
updated_at TIMESTAMPTZ NOT NULL DEFAULT now()
|
||||
);
|
||||
|
||||
-- PopPending needs to find the oldest pending row for a given runtime fast.
|
||||
CREATE INDEX idx_model_list_request_runtime_pending
|
||||
ON model_list_request (runtime_id, created_at)
|
||||
WHERE status = 'pending';
|
||||
@@ -1449,7 +1449,7 @@ func (q *Queries) ListPendingTasksByRuntime(ctx context.Context, runtimeID pgtyp
|
||||
}
|
||||
|
||||
const listQueuedClaimCandidatesByRuntime = `-- name: ListQueuedClaimCandidatesByRuntime :many
|
||||
SELECT id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, last_heartbeat_at, trigger_summary FROM agent_task_queue
|
||||
SELECT id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, last_heartbeat_at, trigger_summary, force_fresh_session FROM agent_task_queue
|
||||
WHERE runtime_id = $1 AND status = 'queued'
|
||||
ORDER BY priority DESC, created_at ASC
|
||||
`
|
||||
@@ -1496,6 +1496,7 @@ func (q *Queries) ListQueuedClaimCandidatesByRuntime(ctx context.Context, runtim
|
||||
&i.FailureReason,
|
||||
&i.LastHeartbeatAt,
|
||||
&i.TriggerSummary,
|
||||
&i.ForceFreshSession,
|
||||
); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
150
server/pkg/db/generated/model_list_request.sql.go
Normal file
150
server/pkg/db/generated/model_list_request.sql.go
Normal file
@@ -0,0 +1,150 @@
|
||||
// Code generated by sqlc. DO NOT EDIT.
|
||||
// versions:
|
||||
// sqlc v1.30.0
|
||||
// source: model_list_request.sql
|
||||
|
||||
package db
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/jackc/pgx/v5/pgtype"
|
||||
)
|
||||
|
||||
const completeModelListRequest = `-- name: CompleteModelListRequest :exec
|
||||
UPDATE model_list_request
|
||||
SET status = 'completed', models = $2, supported = $3, updated_at = now()
|
||||
WHERE id = $1
|
||||
`
|
||||
|
||||
type CompleteModelListRequestParams struct {
|
||||
ID string `json:"id"`
|
||||
Models []byte `json:"models"`
|
||||
Supported bool `json:"supported"`
|
||||
}
|
||||
|
||||
func (q *Queries) CompleteModelListRequest(ctx context.Context, arg CompleteModelListRequestParams) error {
|
||||
_, err := q.db.Exec(ctx, completeModelListRequest, arg.ID, arg.Models, arg.Supported)
|
||||
return err
|
||||
}
|
||||
|
||||
const createModelListRequest = `-- name: CreateModelListRequest :one
|
||||
INSERT INTO model_list_request (id, runtime_id, status, supported, created_at, updated_at)
|
||||
VALUES ($1, $2, 'pending', true, now(), now())
|
||||
RETURNING id, runtime_id, status, models, supported, error, created_at, updated_at
|
||||
`
|
||||
|
||||
type CreateModelListRequestParams struct {
|
||||
ID string `json:"id"`
|
||||
RuntimeID pgtype.UUID `json:"runtime_id"`
|
||||
}
|
||||
|
||||
func (q *Queries) CreateModelListRequest(ctx context.Context, arg CreateModelListRequestParams) (ModelListRequest, error) {
|
||||
row := q.db.QueryRow(ctx, createModelListRequest, arg.ID, arg.RuntimeID)
|
||||
var i ModelListRequest
|
||||
err := row.Scan(
|
||||
&i.ID,
|
||||
&i.RuntimeID,
|
||||
&i.Status,
|
||||
&i.Models,
|
||||
&i.Supported,
|
||||
&i.Error,
|
||||
&i.CreatedAt,
|
||||
&i.UpdatedAt,
|
||||
)
|
||||
return i, err
|
||||
}
|
||||
|
||||
const deleteStaleModelListRequests = `-- name: DeleteStaleModelListRequests :exec
|
||||
DELETE FROM model_list_request
|
||||
WHERE created_at < now() - interval '2 minutes'
|
||||
`
|
||||
|
||||
func (q *Queries) DeleteStaleModelListRequests(ctx context.Context) error {
|
||||
_, err := q.db.Exec(ctx, deleteStaleModelListRequests)
|
||||
return err
|
||||
}
|
||||
|
||||
const failModelListRequest = `-- name: FailModelListRequest :exec
|
||||
UPDATE model_list_request
|
||||
SET status = 'failed', error = $2, updated_at = now()
|
||||
WHERE id = $1
|
||||
`
|
||||
|
||||
type FailModelListRequestParams struct {
|
||||
ID string `json:"id"`
|
||||
Error string `json:"error"`
|
||||
}
|
||||
|
||||
func (q *Queries) FailModelListRequest(ctx context.Context, arg FailModelListRequestParams) error {
|
||||
_, err := q.db.Exec(ctx, failModelListRequest, arg.ID, arg.Error)
|
||||
return err
|
||||
}
|
||||
|
||||
const getModelListRequest = `-- name: GetModelListRequest :one
|
||||
SELECT id, runtime_id, status, models, supported, error, created_at, updated_at FROM model_list_request
|
||||
WHERE id = $1
|
||||
`
|
||||
|
||||
func (q *Queries) GetModelListRequest(ctx context.Context, id string) (ModelListRequest, error) {
|
||||
row := q.db.QueryRow(ctx, getModelListRequest, id)
|
||||
var i ModelListRequest
|
||||
err := row.Scan(
|
||||
&i.ID,
|
||||
&i.RuntimeID,
|
||||
&i.Status,
|
||||
&i.Models,
|
||||
&i.Supported,
|
||||
&i.Error,
|
||||
&i.CreatedAt,
|
||||
&i.UpdatedAt,
|
||||
)
|
||||
return i, err
|
||||
}
|
||||
|
||||
const popPendingModelListRequest = `-- name: PopPendingModelListRequest :one
|
||||
UPDATE model_list_request
|
||||
SET status = 'running', updated_at = now()
|
||||
WHERE id = (
|
||||
SELECT mlr.id FROM model_list_request mlr
|
||||
WHERE mlr.runtime_id = $1 AND mlr.status = 'pending'
|
||||
ORDER BY mlr.created_at ASC
|
||||
LIMIT 1
|
||||
FOR UPDATE SKIP LOCKED
|
||||
)
|
||||
RETURNING id, runtime_id, status, models, supported, error, created_at, updated_at
|
||||
`
|
||||
|
||||
// Atomically claim the oldest pending request for a runtime.
|
||||
// FOR UPDATE SKIP LOCKED ensures correctness across concurrent server instances.
|
||||
func (q *Queries) PopPendingModelListRequest(ctx context.Context, runtimeID pgtype.UUID) (ModelListRequest, error) {
|
||||
row := q.db.QueryRow(ctx, popPendingModelListRequest, runtimeID)
|
||||
var i ModelListRequest
|
||||
err := row.Scan(
|
||||
&i.ID,
|
||||
&i.RuntimeID,
|
||||
&i.Status,
|
||||
&i.Models,
|
||||
&i.Supported,
|
||||
&i.Error,
|
||||
&i.CreatedAt,
|
||||
&i.UpdatedAt,
|
||||
)
|
||||
return i, err
|
||||
}
|
||||
|
||||
const timeoutModelListRequest = `-- name: TimeoutModelListRequest :exec
|
||||
UPDATE model_list_request
|
||||
SET status = 'timeout', error = $2, updated_at = now()
|
||||
WHERE id = $1
|
||||
`
|
||||
|
||||
type TimeoutModelListRequestParams struct {
|
||||
ID string `json:"id"`
|
||||
Error string `json:"error"`
|
||||
}
|
||||
|
||||
func (q *Queries) TimeoutModelListRequest(ctx context.Context, arg TimeoutModelListRequestParams) error {
|
||||
_, err := q.db.Exec(ctx, timeoutModelListRequest, arg.ID, arg.Error)
|
||||
return err
|
||||
}
|
||||
@@ -322,6 +322,17 @@ type Member struct {
|
||||
CreatedAt pgtype.Timestamptz `json:"created_at"`
|
||||
}
|
||||
|
||||
type ModelListRequest struct {
|
||||
ID string `json:"id"`
|
||||
RuntimeID pgtype.UUID `json:"runtime_id"`
|
||||
Status string `json:"status"`
|
||||
Models []byte `json:"models"`
|
||||
Supported bool `json:"supported"`
|
||||
Error string `json:"error"`
|
||||
CreatedAt pgtype.Timestamptz `json:"created_at"`
|
||||
UpdatedAt pgtype.Timestamptz `json:"updated_at"`
|
||||
}
|
||||
|
||||
type NotificationPreference struct {
|
||||
ID pgtype.UUID `json:"id"`
|
||||
WorkspaceID pgtype.UUID `json:"workspace_id"`
|
||||
|
||||
41
server/pkg/db/queries/model_list_request.sql
Normal file
41
server/pkg/db/queries/model_list_request.sql
Normal file
@@ -0,0 +1,41 @@
|
||||
-- name: CreateModelListRequest :one
|
||||
INSERT INTO model_list_request (id, runtime_id, status, supported, created_at, updated_at)
|
||||
VALUES ($1, $2, 'pending', true, now(), now())
|
||||
RETURNING *;
|
||||
|
||||
-- name: GetModelListRequest :one
|
||||
SELECT * FROM model_list_request
|
||||
WHERE id = $1;
|
||||
|
||||
-- name: PopPendingModelListRequest :one
|
||||
-- Atomically claim the oldest pending request for a runtime.
|
||||
-- FOR UPDATE SKIP LOCKED ensures correctness across concurrent server instances.
|
||||
UPDATE model_list_request
|
||||
SET status = 'running', updated_at = now()
|
||||
WHERE id = (
|
||||
SELECT mlr.id FROM model_list_request mlr
|
||||
WHERE mlr.runtime_id = $1 AND mlr.status = 'pending'
|
||||
ORDER BY mlr.created_at ASC
|
||||
LIMIT 1
|
||||
FOR UPDATE SKIP LOCKED
|
||||
)
|
||||
RETURNING *;
|
||||
|
||||
-- name: CompleteModelListRequest :exec
|
||||
UPDATE model_list_request
|
||||
SET status = 'completed', models = $2, supported = $3, updated_at = now()
|
||||
WHERE id = $1;
|
||||
|
||||
-- name: FailModelListRequest :exec
|
||||
UPDATE model_list_request
|
||||
SET status = 'failed', error = $2, updated_at = now()
|
||||
WHERE id = $1;
|
||||
|
||||
-- name: TimeoutModelListRequest :exec
|
||||
UPDATE model_list_request
|
||||
SET status = 'timeout', error = $2, updated_at = now()
|
||||
WHERE id = $1;
|
||||
|
||||
-- name: DeleteStaleModelListRequests :exec
|
||||
DELETE FROM model_list_request
|
||||
WHERE created_at < now() - interval '2 minutes';
|
||||
Reference in New Issue
Block a user