Files
multica/server/internal/handler/runtime_update.go
2026-05-06 13:00:11 +08:00

339 lines
9.7 KiB
Go

package handler
import (
	"context"
	"encoding/json"
	"errors"
	"log/slog"
	"net/http"
	"sync"
	"time"

	"github.com/go-chi/chi/v5"
)
// ---------------------------------------------------------------------------
// CLI update request store
// ---------------------------------------------------------------------------
// UpdateStatus is the lifecycle state of a CLI update request, serialized as
// a lowercase string in the "status" JSON field.
type UpdateStatus string

// Lifecycle states. A request starts pending, moves to running once the
// daemon claims it, and ends in exactly one of the three terminal states.
const (
	// UpdatePending: created by the frontend, not yet claimed by the daemon.
	UpdatePending UpdateStatus = "pending"
	// UpdateRunning: claimed by the daemon and executing.
	UpdateRunning UpdateStatus = "running"
	// UpdateCompleted: the daemon reported success (terminal).
	UpdateCompleted UpdateStatus = "completed"
	// UpdateFailed: the daemon reported an error (terminal).
	UpdateFailed UpdateStatus = "failed"
	// UpdateTimeout: a pending or running deadline elapsed (terminal).
	UpdateTimeout UpdateStatus = "timeout"
)
// UpdateRequest represents a CLI update request at any point in its
// lifecycle (pending, running, or one of the terminal states). It is written
// directly to HTTP responses as JSON, so field order and tags define the wire
// format.
type UpdateRequest struct {
	ID            string       `json:"id"`             // opaque update ID, used as the store key
	RuntimeID     string       `json:"runtime_id"`     // runtime the update targets
	Status        UpdateStatus `json:"status"`         // current lifecycle state
	TargetVersion string       `json:"target_version"` // CLI version requested by the frontend
	Output        string       `json:"output,omitempty"` // daemon-reported output on completion
	Error         string       `json:"error,omitempty"`  // failure or timeout reason
	CreatedAt     time.Time    `json:"created_at"`
	UpdatedAt     time.Time    `json:"updated_at"` // time of the last status transition
	// RunStartedAt is set when the daemon claims the request (pending ->
	// running) and anchors the running timeout. It is never serialized.
	RunStartedAt *time.Time `json:"-"`
}
// Timing limits for the update lifecycle. The human-readable timeout messages
// in applyUpdateTimeout quote the first two values in seconds; keep them in
// sync.
const (
	// updatePendingTimeout is how long a request may stay pending, measured
	// from CreatedAt, before it is marked timed out (120s).
	updatePendingTimeout = 2 * time.Minute
	// updateRunningTimeout is how long a request may stay running, measured
	// from RunStartedAt, before it is marked timed out (150s).
	updateRunningTimeout = 2*time.Minute + 30*time.Second
	// updateStoreRetention is how long the in-memory store keeps a request,
	// measured from CreatedAt, before purging it.
	updateStoreRetention = 5 * time.Minute
)
// UpdateStore persists CLI update requests so that the frontend (create and
// poll), the daemon heartbeat (claim) and the daemon report (finish) all see
// the same lifecycle.
type UpdateStore interface {
	// Create registers a new pending request for runtimeID targeting
	// targetVersion.
	Create(ctx context.Context, runtimeID, targetVersion string) (*UpdateRequest, error)
	// Get loads a request by ID. Handlers treat a nil request as "not found".
	Get(ctx context.Context, id string) (*UpdateRequest, error)
	// HasPending reports whether runtimeID has a request awaiting pickup.
	HasPending(ctx context.Context, runtimeID string) (bool, error)
	// PopPending claims a pending request for runtimeID and marks it running.
	PopPending(ctx context.Context, runtimeID string) (*UpdateRequest, error)
	// Complete records a successful update with the daemon's output.
	Complete(ctx context.Context, id, output string) error
	// Fail records a failed update with the daemon's error message.
	Fail(ctx context.Context, id, errMsg string) error
}
// updateRequestTerminal reports whether status is a final state (completed,
// failed, or timeout) from which no further transitions are expected.
func updateRequestTerminal(status UpdateStatus) bool {
	switch status {
	case UpdateCompleted, UpdateFailed, UpdateTimeout:
		return true
	default:
		return false
	}
}
// applyUpdateTimeout lazily expires req if its current phase has outlived
// its deadline as of now.
//
// A pending request expires updatePendingTimeout after CreatedAt. A running
// request expires updateRunningTimeout after RunStartedAt, and only when
// RunStartedAt is set. On expiry req is mutated in place (Status, Error,
// UpdatedAt) and true is returned. Otherwise req is untouched and false is
// returned. Callers must hold whatever lock guards req.
func applyUpdateTimeout(req *UpdateRequest, now time.Time) bool {
	var reason string
	switch {
	case req.Status == UpdatePending && now.Sub(req.CreatedAt) > updatePendingTimeout:
		reason = "daemon did not respond within 120 seconds"
	case req.Status == UpdateRunning && req.RunStartedAt != nil && now.Sub(*req.RunStartedAt) > updateRunningTimeout:
		reason = "update did not complete within 150 seconds"
	default:
		return false
	}
	req.Status = UpdateTimeout
	req.Error = reason
	req.UpdatedAt = now
	return true
}
// InMemoryUpdateStore is the single-node implementation. Multi-node deploys
// must use RedisUpdateStore so Web POST, daemon heartbeat, daemon report, and
// UI polling agree on the same request lifecycle.
//
// Every method takes mu for its whole duration; requests is only touched
// while mu is held.
type InMemoryUpdateStore struct {
	mu       sync.Mutex
	requests map[string]*UpdateRequest // keyed by update ID
}

// NewInMemoryUpdateStore returns an empty store ready for use.
func NewInMemoryUpdateStore() *InMemoryUpdateStore {
	store := new(InMemoryUpdateStore)
	store.requests = map[string]*UpdateRequest{}
	return store
}
// Create registers a new pending update request for runtimeID.
//
// First it purges requests older than updateStoreRetention. It then rejects
// the call with errUpdateInProgress if the runtime still has a live pending or
// running request. Timeouts are applied before that check: they are otherwise
// evaluated only lazily (by Get, HasPending and PopPending), so without this
// step a request that silently timed out would block retries until the
// retention purge.
//
// The returned request is a snapshot copy. The stored value keeps changing
// under s.mu, and handing out the internal pointer would let callers read it
// concurrently with those writes.
func (s *InMemoryUpdateStore) Create(_ context.Context, runtimeID, targetVersion string) (*UpdateRequest, error) {
	s.mu.Lock()
	defer s.mu.Unlock()

	now := time.Now()

	// Clean up old requests. Deleting during a map range is safe in Go.
	for id, req := range s.requests {
		if now.Sub(req.CreatedAt) > updateStoreRetention {
			delete(s.requests, id)
		}
	}

	// Reject if there is already a live pending or running update for this
	// runtime, after expiring anything whose deadline has passed.
	for _, req := range s.requests {
		applyUpdateTimeout(req, now)
		if req.RuntimeID == runtimeID && (req.Status == UpdatePending || req.Status == UpdateRunning) {
			return nil, errUpdateInProgress
		}
	}

	req := &UpdateRequest{
		ID:            randomID(),
		RuntimeID:     runtimeID,
		Status:        UpdatePending,
		TargetVersion: targetVersion,
		CreatedAt:     now,
		UpdatedAt:     now,
	}
	s.requests[req.ID] = req

	snapshot := *req
	return &snapshot, nil
}
// updateError is a minimal string-backed error used for update-store
// conflicts.
type updateError struct {
	msg string
}

// Error implements the error interface by returning the stored message.
func (ue *updateError) Error() string {
	return ue.msg
}

// errUpdateInProgress is returned by InMemoryUpdateStore.Create when the
// runtime already has a pending or running update request.
var errUpdateInProgress = &updateError{msg: "an update is already in progress for this runtime"}
// Get returns the request with the given ID, or (nil, nil) if no such
// request is stored (unknown or already purged).
//
// Any elapsed timeout is applied to the stored request before it is read. The
// result is a snapshot copy taken under s.mu. Callers such as the HTTP
// handlers serialize it after the lock is released, and returning the internal
// pointer would race with concurrent PopPending/Complete/Fail writes.
func (s *InMemoryUpdateStore) Get(_ context.Context, id string) (*UpdateRequest, error) {
	s.mu.Lock()
	defer s.mu.Unlock()

	req, ok := s.requests[id]
	if !ok {
		return nil, nil
	}
	applyUpdateTimeout(req, time.Now())

	snapshot := *req
	return &snapshot, nil
}
// HasPending reports whether runtimeID has a request that is still pending.
// While scanning, it expires every request it visits whose deadline has
// passed, so an overdue pending request is never reported as pending.
func (s *InMemoryUpdateStore) HasPending(_ context.Context, runtimeID string) (bool, error) {
	s.mu.Lock()
	defer s.mu.Unlock()

	checkedAt := time.Now()
	pending := false
	for _, candidate := range s.requests {
		applyUpdateTimeout(candidate, checkedAt)
		if candidate.RuntimeID == runtimeID && candidate.Status == UpdatePending {
			pending = true
			break
		}
	}
	return pending, nil
}
// PopPending claims the oldest pending request (by CreatedAt) for runtimeID,
// marks it running, and stamps RunStartedAt so the running timeout can be
// enforced. It returns (nil, nil) when nothing is pending.
//
// Every request is checked for an elapsed timeout first, so an overdue
// pending request is never handed to the daemon. The result is a snapshot
// copy taken under s.mu, so callers can read it without racing later store
// transitions.
func (s *InMemoryUpdateStore) PopPending(_ context.Context, runtimeID string) (*UpdateRequest, error) {
	s.mu.Lock()
	defer s.mu.Unlock()

	var oldest *UpdateRequest
	now := time.Now()
	for _, req := range s.requests {
		applyUpdateTimeout(req, now)
		if req.RuntimeID == runtimeID && req.Status == UpdatePending {
			if oldest == nil || req.CreatedAt.Before(oldest.CreatedAt) {
				oldest = req
			}
		}
	}
	if oldest == nil {
		return nil, nil
	}

	oldest.Status = UpdateRunning
	startedAt := now
	oldest.RunStartedAt = &startedAt
	oldest.UpdatedAt = now

	snapshot := *oldest
	return &snapshot, nil
}
// Complete marks request id as completed and records the daemon's output.
// An unknown id is silently ignored; the method always returns nil.
func (s *InMemoryUpdateStore) Complete(_ context.Context, id string, output string) error {
	s.mu.Lock()
	defer s.mu.Unlock()

	target, found := s.requests[id]
	if !found {
		return nil
	}
	target.Status = UpdateCompleted
	target.Output = output
	target.UpdatedAt = time.Now()
	return nil
}
// Fail marks request id as failed and records errMsg as its error.
// An unknown id is silently ignored; the method always returns nil.
func (s *InMemoryUpdateStore) Fail(_ context.Context, id string, errMsg string) error {
	s.mu.Lock()
	defer s.mu.Unlock()

	target, found := s.requests[id]
	if !found {
		return nil
	}
	target.Status = UpdateFailed
	target.Error = errMsg
	target.UpdatedAt = time.Now()
	return nil
}
// ---------------------------------------------------------------------------
// Handlers
// ---------------------------------------------------------------------------
// InitiateUpdate creates a new CLI update request (protected route, called by frontend).
//
// The caller must be a member of the runtime's workspace, and the JSON body
// must carry a non-empty target_version.
//
// Store errors are split by kind:
//   - a conflict (*updateError, e.g. errUpdateInProgress) returns 409 with its
//     user-facing message;
//   - any other store failure is logged and returns a generic 500, so backend
//     outages are neither misreported as conflicts nor leak internal error text.
func (h *Handler) InitiateUpdate(w http.ResponseWriter, r *http.Request) {
	runtimeID := chi.URLParam(r, "runtimeId")
	runtimeUUID, ok := parseUUIDOrBadRequest(w, runtimeID, "runtime_id")
	if !ok {
		return
	}
	rt, err := h.Queries.GetAgentRuntime(r.Context(), runtimeUUID)
	if err != nil {
		writeError(w, http.StatusNotFound, "runtime not found")
		return
	}
	if _, ok := h.requireWorkspaceMember(w, r, uuidToString(rt.WorkspaceID), "runtime not found"); !ok {
		return
	}

	var req struct {
		TargetVersion string `json:"target_version"`
	}
	if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
		writeError(w, http.StatusBadRequest, "invalid request body")
		return
	}
	if req.TargetVersion == "" {
		writeError(w, http.StatusBadRequest, "target_version is required")
		return
	}

	storeRuntimeID := uuidToString(rt.ID)
	update, err := h.UpdateStore.Create(r.Context(), storeRuntimeID, req.TargetVersion)
	if err != nil {
		// NOTE(review): assumes every UpdateStore implementation reports
		// conflicts as *updateError (possibly wrapped).
		var conflict *updateError
		if errors.As(err, &conflict) {
			writeError(w, http.StatusConflict, err.Error())
			return
		}
		slog.Error("UpdateStore Create failed", "error", err, "runtime_id", storeRuntimeID)
		writeError(w, http.StatusInternalServerError, "failed to create update")
		return
	}
	writeJSON(w, http.StatusOK, update)
}
// GetUpdate returns the status of an update request (protected route, called by frontend).
//
// The caller must be a member of the runtime's workspace. An update that
// does not exist, or that belongs to a different runtime, is reported with
// the same 404 so IDs cannot be probed across runtimes.
func (h *Handler) GetUpdate(w http.ResponseWriter, r *http.Request) {
	rtUUID, valid := parseUUIDOrBadRequest(w, chi.URLParam(r, "runtimeId"), "runtime_id")
	if !valid {
		return
	}

	rtRow, lookupErr := h.Queries.GetAgentRuntime(r.Context(), rtUUID)
	if lookupErr != nil {
		writeError(w, http.StatusNotFound, "runtime not found")
		return
	}
	if _, member := h.requireWorkspaceMember(w, r, uuidToString(rtRow.WorkspaceID), "runtime not found"); !member {
		return
	}

	update, loadErr := h.UpdateStore.Get(r.Context(), chi.URLParam(r, "updateId"))
	if loadErr != nil {
		writeError(w, http.StatusInternalServerError, "failed to load update: "+loadErr.Error())
		return
	}

	belongsHere := update != nil && update.RuntimeID == uuidToString(rtRow.ID)
	if !belongsHere {
		writeError(w, http.StatusNotFound, "update not found")
		return
	}
	writeJSON(w, http.StatusOK, update)
}
// ReportUpdateResult receives the update result from the daemon.
//
// The caller must pass requireDaemonRuntimeAccess for the runtime in the URL,
// and the update must belong to that runtime (404 otherwise).
//
// Reports for a request that is already terminal (completed, failed, timeout)
// are acknowledged with 200 and otherwise ignored, so late or duplicate
// reports cannot overwrite a final state.
//
// The body's "status" must be one of:
//   - "completed": persisted via Complete;
//   - "failed": persisted via Fail;
//   - "running": a progress ping that changes nothing.
//
// Any other status returns 400.
func (h *Handler) ReportUpdateResult(w http.ResponseWriter, r *http.Request) {
	runtimeID := chi.URLParam(r, "runtimeId")
	if _, allowed := h.requireDaemonRuntimeAccess(w, r, runtimeID); !allowed {
		return
	}

	updateID := chi.URLParam(r, "updateId")
	current, loadErr := h.UpdateStore.Get(r.Context(), updateID)
	if loadErr != nil {
		writeError(w, http.StatusInternalServerError, "failed to load update: "+loadErr.Error())
		return
	}
	if current == nil || current.RuntimeID != runtimeID {
		writeError(w, http.StatusNotFound, "update not found")
		return
	}

	ack := map[string]string{"status": "ok"}
	if updateRequestTerminal(current.Status) {
		// Late or duplicate report: acknowledge it without touching stored state.
		slog.Debug("ignoring stale update report", "runtime_id", runtimeID, "update_id", updateID, "status", current.Status)
		writeJSON(w, http.StatusOK, ack)
		return
	}

	var report struct {
		Status string `json:"status"` // "running", "completed", or "failed"
		Output string `json:"output"`
		Error  string `json:"error"`
	}
	if decodeErr := json.NewDecoder(r.Body).Decode(&report); decodeErr != nil {
		writeError(w, http.StatusBadRequest, "invalid request body")
		return
	}

	switch report.Status {
	case "running":
		// Progress ping only: the status was already set to running when the
		// daemon claimed the request via PopPending.
	case "completed":
		if storeErr := h.UpdateStore.Complete(r.Context(), updateID, report.Output); storeErr != nil {
			slog.Error("UpdateStore Complete failed", "error", storeErr, "update_id", updateID)
			writeError(w, http.StatusInternalServerError, "failed to persist completion")
			return
		}
	case "failed":
		if storeErr := h.UpdateStore.Fail(r.Context(), updateID, report.Error); storeErr != nil {
			slog.Error("UpdateStore Fail failed", "error", storeErr, "update_id", updateID)
			writeError(w, http.StatusInternalServerError, "failed to persist failure")
			return
		}
	default:
		writeError(w, http.StatusBadRequest, "invalid status: "+report.Status)
		return
	}
	writeJSON(w, http.StatusOK, ack)
}