mirror of
https://github.com/multica-ai/multica.git
synced 2026-06-17 03:38:32 +02:00
339 lines
9.7 KiB
Go
339 lines
9.7 KiB
Go
package handler
|
|
|
|
import (
	"context"
	"encoding/json"
	"errors"
	"log/slog"
	"net/http"
	"sync"
	"time"

	"github.com/go-chi/chi/v5"
)
|
|
|
|
// ---------------------------------------------------------------------------
|
|
// CLI update request store
|
|
// ---------------------------------------------------------------------------
|
|
|
|
// UpdateStatus is the lifecycle state of a CLI update request. Its string
// value is what gets serialized in the request's "status" JSON field.
type UpdateStatus string

// Lifecycle states of an update request. Completed, failed and timeout are the
// terminal states (see updateRequestTerminal).
const (
	// UpdatePending is the state a request is created in.
	UpdatePending UpdateStatus = "pending"
	// UpdateRunning is the state a request enters once it has been popped
	// for execution.
	UpdateRunning UpdateStatus = "running"
	// UpdateCompleted marks a request whose update was reported as successful.
	UpdateCompleted UpdateStatus = "completed"
	// UpdateFailed marks a request whose update was reported as failed.
	UpdateFailed UpdateStatus = "failed"
	// UpdateTimeout marks a request that exceeded its pending or running
	// deadline.
	UpdateTimeout UpdateStatus = "timeout"
)
|
|
|
|
// UpdateRequest represents a pending or completed CLI update request.
|
|
type UpdateRequest struct {
|
|
ID string `json:"id"`
|
|
RuntimeID string `json:"runtime_id"`
|
|
Status UpdateStatus `json:"status"`
|
|
TargetVersion string `json:"target_version"`
|
|
Output string `json:"output,omitempty"`
|
|
Error string `json:"error,omitempty"`
|
|
CreatedAt time.Time `json:"created_at"`
|
|
UpdatedAt time.Time `json:"updated_at"`
|
|
RunStartedAt *time.Time `json:"-"`
|
|
}
|
|
|
|
// Deadlines and retention for update requests.
const (
	// updatePendingTimeout is how long a request may stay pending, measured
	// from CreatedAt, before applyUpdateTimeout marks it as timed out.
	updatePendingTimeout = 120 * time.Second
	// updateRunningTimeout is how long a request may stay running, measured
	// from RunStartedAt, before applyUpdateTimeout marks it as timed out.
	updateRunningTimeout = 150 * time.Second
	// updateStoreRetention is how long after creation the in-memory store
	// keeps a request before evicting it.
	updateStoreRetention = 5 * time.Minute
)
|
|
|
|
type UpdateStore interface {
|
|
Create(ctx context.Context, runtimeID, targetVersion string) (*UpdateRequest, error)
|
|
Get(ctx context.Context, id string) (*UpdateRequest, error)
|
|
HasPending(ctx context.Context, runtimeID string) (bool, error)
|
|
PopPending(ctx context.Context, runtimeID string) (*UpdateRequest, error)
|
|
Complete(ctx context.Context, id string, output string) error
|
|
Fail(ctx context.Context, id string, errMsg string) error
|
|
}
|
|
|
|
func updateRequestTerminal(status UpdateStatus) bool {
|
|
return status == UpdateCompleted || status == UpdateFailed || status == UpdateTimeout
|
|
}
|
|
|
|
func applyUpdateTimeout(req *UpdateRequest, now time.Time) bool {
|
|
switch req.Status {
|
|
case UpdatePending:
|
|
if now.Sub(req.CreatedAt) > updatePendingTimeout {
|
|
req.Status = UpdateTimeout
|
|
req.Error = "daemon did not respond within 120 seconds"
|
|
req.UpdatedAt = now
|
|
return true
|
|
}
|
|
case UpdateRunning:
|
|
if req.RunStartedAt != nil && now.Sub(*req.RunStartedAt) > updateRunningTimeout {
|
|
req.Status = UpdateTimeout
|
|
req.Error = "update did not complete within 150 seconds"
|
|
req.UpdatedAt = now
|
|
return true
|
|
}
|
|
}
|
|
return false
|
|
}
|
|
|
|
// InMemoryUpdateStore is the single-node implementation. Multi-node deploys
|
|
// must use RedisUpdateStore so Web POST, daemon heartbeat, daemon report, and
|
|
// UI polling agree on the same request lifecycle.
|
|
type InMemoryUpdateStore struct {
|
|
mu sync.Mutex
|
|
requests map[string]*UpdateRequest // keyed by update ID
|
|
}
|
|
|
|
func NewInMemoryUpdateStore() *InMemoryUpdateStore {
|
|
return &InMemoryUpdateStore{
|
|
requests: make(map[string]*UpdateRequest),
|
|
}
|
|
}
|
|
|
|
func (s *InMemoryUpdateStore) Create(_ context.Context, runtimeID, targetVersion string) (*UpdateRequest, error) {
|
|
s.mu.Lock()
|
|
defer s.mu.Unlock()
|
|
|
|
// Clean up old requests.
|
|
for id, req := range s.requests {
|
|
if time.Since(req.CreatedAt) > updateStoreRetention {
|
|
delete(s.requests, id)
|
|
}
|
|
}
|
|
|
|
// Reject if there is already a pending or running update for this runtime.
|
|
for _, req := range s.requests {
|
|
if req.RuntimeID == runtimeID && (req.Status == UpdatePending || req.Status == UpdateRunning) {
|
|
return nil, errUpdateInProgress
|
|
}
|
|
}
|
|
|
|
req := &UpdateRequest{
|
|
ID: randomID(),
|
|
RuntimeID: runtimeID,
|
|
Status: UpdatePending,
|
|
TargetVersion: targetVersion,
|
|
CreatedAt: time.Now(),
|
|
UpdatedAt: time.Now(),
|
|
}
|
|
s.requests[req.ID] = req
|
|
return req, nil
|
|
}
|
|
|
|
// updateError is the error type for update-store failures that carry a
// message meant to be shown to the caller.
type updateError struct {
	msg string
}

// Error implements the error interface by returning the stored message.
func (e *updateError) Error() string {
	return e.msg
}

// errUpdateInProgress is returned by Create when the runtime already has a
// pending or running update.
var errUpdateInProgress = &updateError{
	msg: "an update is already in progress for this runtime",
}
|
|
|
|
func (s *InMemoryUpdateStore) Get(_ context.Context, id string) (*UpdateRequest, error) {
|
|
s.mu.Lock()
|
|
defer s.mu.Unlock()
|
|
|
|
req, ok := s.requests[id]
|
|
if !ok {
|
|
return nil, nil
|
|
}
|
|
applyUpdateTimeout(req, time.Now())
|
|
return req, nil
|
|
}
|
|
|
|
func (s *InMemoryUpdateStore) HasPending(_ context.Context, runtimeID string) (bool, error) {
|
|
s.mu.Lock()
|
|
defer s.mu.Unlock()
|
|
|
|
now := time.Now()
|
|
for _, req := range s.requests {
|
|
applyUpdateTimeout(req, now)
|
|
if req.RuntimeID == runtimeID && req.Status == UpdatePending {
|
|
return true, nil
|
|
}
|
|
}
|
|
return false, nil
|
|
}
|
|
|
|
// PopPending returns and marks as running the pending update for a runtime.
|
|
func (s *InMemoryUpdateStore) PopPending(_ context.Context, runtimeID string) (*UpdateRequest, error) {
|
|
s.mu.Lock()
|
|
defer s.mu.Unlock()
|
|
|
|
var oldest *UpdateRequest
|
|
now := time.Now()
|
|
for _, req := range s.requests {
|
|
applyUpdateTimeout(req, now)
|
|
if req.RuntimeID == runtimeID && req.Status == UpdatePending {
|
|
if oldest == nil || req.CreatedAt.Before(oldest.CreatedAt) {
|
|
oldest = req
|
|
}
|
|
}
|
|
}
|
|
if oldest != nil {
|
|
oldest.Status = UpdateRunning
|
|
startedAt := now
|
|
oldest.RunStartedAt = &startedAt
|
|
oldest.UpdatedAt = now
|
|
}
|
|
return oldest, nil
|
|
}
|
|
|
|
func (s *InMemoryUpdateStore) Complete(_ context.Context, id string, output string) error {
|
|
s.mu.Lock()
|
|
defer s.mu.Unlock()
|
|
|
|
if req, ok := s.requests[id]; ok {
|
|
req.Status = UpdateCompleted
|
|
req.Output = output
|
|
req.UpdatedAt = time.Now()
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (s *InMemoryUpdateStore) Fail(_ context.Context, id string, errMsg string) error {
|
|
s.mu.Lock()
|
|
defer s.mu.Unlock()
|
|
|
|
if req, ok := s.requests[id]; ok {
|
|
req.Status = UpdateFailed
|
|
req.Error = errMsg
|
|
req.UpdatedAt = time.Now()
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// ---------------------------------------------------------------------------
|
|
// Handlers
|
|
// ---------------------------------------------------------------------------
|
|
|
|
// InitiateUpdate creates a new CLI update request (protected route, called by frontend).
|
|
func (h *Handler) InitiateUpdate(w http.ResponseWriter, r *http.Request) {
|
|
runtimeID := chi.URLParam(r, "runtimeId")
|
|
runtimeUUID, ok := parseUUIDOrBadRequest(w, runtimeID, "runtime_id")
|
|
if !ok {
|
|
return
|
|
}
|
|
|
|
rt, err := h.Queries.GetAgentRuntime(r.Context(), runtimeUUID)
|
|
if err != nil {
|
|
writeError(w, http.StatusNotFound, "runtime not found")
|
|
return
|
|
}
|
|
|
|
if _, ok := h.requireWorkspaceMember(w, r, uuidToString(rt.WorkspaceID), "runtime not found"); !ok {
|
|
return
|
|
}
|
|
|
|
var req struct {
|
|
TargetVersion string `json:"target_version"`
|
|
}
|
|
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
|
|
writeError(w, http.StatusBadRequest, "invalid request body")
|
|
return
|
|
}
|
|
if req.TargetVersion == "" {
|
|
writeError(w, http.StatusBadRequest, "target_version is required")
|
|
return
|
|
}
|
|
|
|
update, err := h.UpdateStore.Create(r.Context(), uuidToString(rt.ID), req.TargetVersion)
|
|
if err != nil {
|
|
writeError(w, http.StatusConflict, err.Error())
|
|
return
|
|
}
|
|
|
|
writeJSON(w, http.StatusOK, update)
|
|
}
|
|
|
|
// GetUpdate returns the status of an update request (protected route, called by frontend).
|
|
func (h *Handler) GetUpdate(w http.ResponseWriter, r *http.Request) {
|
|
runtimeID := chi.URLParam(r, "runtimeId")
|
|
runtimeUUID, ok := parseUUIDOrBadRequest(w, runtimeID, "runtime_id")
|
|
if !ok {
|
|
return
|
|
}
|
|
|
|
rt, err := h.Queries.GetAgentRuntime(r.Context(), runtimeUUID)
|
|
if err != nil {
|
|
writeError(w, http.StatusNotFound, "runtime not found")
|
|
return
|
|
}
|
|
if _, ok := h.requireWorkspaceMember(w, r, uuidToString(rt.WorkspaceID), "runtime not found"); !ok {
|
|
return
|
|
}
|
|
|
|
updateID := chi.URLParam(r, "updateId")
|
|
|
|
update, err := h.UpdateStore.Get(r.Context(), updateID)
|
|
if err != nil {
|
|
writeError(w, http.StatusInternalServerError, "failed to load update: "+err.Error())
|
|
return
|
|
}
|
|
if update == nil || update.RuntimeID != uuidToString(rt.ID) {
|
|
writeError(w, http.StatusNotFound, "update not found")
|
|
return
|
|
}
|
|
|
|
writeJSON(w, http.StatusOK, update)
|
|
}
|
|
|
|
// ReportUpdateResult receives the update result from the daemon.
|
|
func (h *Handler) ReportUpdateResult(w http.ResponseWriter, r *http.Request) {
|
|
runtimeID := chi.URLParam(r, "runtimeId")
|
|
|
|
// Verify the caller owns this runtime's workspace.
|
|
if _, ok := h.requireDaemonRuntimeAccess(w, r, runtimeID); !ok {
|
|
return
|
|
}
|
|
|
|
updateID := chi.URLParam(r, "updateId")
|
|
|
|
existing, err := h.UpdateStore.Get(r.Context(), updateID)
|
|
if err != nil {
|
|
writeError(w, http.StatusInternalServerError, "failed to load update: "+err.Error())
|
|
return
|
|
}
|
|
if existing == nil || existing.RuntimeID != runtimeID {
|
|
writeError(w, http.StatusNotFound, "update not found")
|
|
return
|
|
}
|
|
if updateRequestTerminal(existing.Status) {
|
|
slog.Debug("ignoring stale update report", "runtime_id", runtimeID, "update_id", updateID, "status", existing.Status)
|
|
writeJSON(w, http.StatusOK, map[string]string{"status": "ok"})
|
|
return
|
|
}
|
|
|
|
var req struct {
|
|
Status string `json:"status"` // "running", "completed", or "failed"
|
|
Output string `json:"output"`
|
|
Error string `json:"error"`
|
|
}
|
|
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
|
|
writeError(w, http.StatusBadRequest, "invalid request body")
|
|
return
|
|
}
|
|
|
|
switch req.Status {
|
|
case "completed":
|
|
if err := h.UpdateStore.Complete(r.Context(), updateID, req.Output); err != nil {
|
|
slog.Error("UpdateStore Complete failed", "error", err, "update_id", updateID)
|
|
writeError(w, http.StatusInternalServerError, "failed to persist completion")
|
|
return
|
|
}
|
|
case "failed":
|
|
if err := h.UpdateStore.Fail(r.Context(), updateID, req.Error); err != nil {
|
|
slog.Error("UpdateStore Fail failed", "error", err, "update_id", updateID)
|
|
writeError(w, http.StatusInternalServerError, "failed to persist failure")
|
|
return
|
|
}
|
|
case "running":
|
|
// No-op: status is already "running" from PopPending. This call is
|
|
// just a progress signal from the daemon to confirm it received the
|
|
// update command and is executing it.
|
|
default:
|
|
writeError(w, http.StatusBadRequest, "invalid status: "+req.Status)
|
|
return
|
|
}
|
|
|
|
writeJSON(w, http.StatusOK, map[string]string{"status": "ok"})
|
|
}
|