package handler import ( "context" "encoding/json" "log/slog" "net/http" "sync" "time" "github.com/go-chi/chi/v5" ) // --------------------------------------------------------------------------- // CLI update request store // --------------------------------------------------------------------------- type UpdateStatus string const ( UpdatePending UpdateStatus = "pending" UpdateRunning UpdateStatus = "running" UpdateCompleted UpdateStatus = "completed" UpdateFailed UpdateStatus = "failed" UpdateTimeout UpdateStatus = "timeout" ) // UpdateRequest represents a pending or completed CLI update request. type UpdateRequest struct { ID string `json:"id"` RuntimeID string `json:"runtime_id"` Status UpdateStatus `json:"status"` TargetVersion string `json:"target_version"` Output string `json:"output,omitempty"` Error string `json:"error,omitempty"` CreatedAt time.Time `json:"created_at"` UpdatedAt time.Time `json:"updated_at"` RunStartedAt *time.Time `json:"-"` } const ( updatePendingTimeout = 120 * time.Second updateRunningTimeout = 150 * time.Second updateStoreRetention = 5 * time.Minute ) type UpdateStore interface { Create(ctx context.Context, runtimeID, targetVersion string) (*UpdateRequest, error) Get(ctx context.Context, id string) (*UpdateRequest, error) HasPending(ctx context.Context, runtimeID string) (bool, error) PopPending(ctx context.Context, runtimeID string) (*UpdateRequest, error) Complete(ctx context.Context, id string, output string) error Fail(ctx context.Context, id string, errMsg string) error } func updateRequestTerminal(status UpdateStatus) bool { return status == UpdateCompleted || status == UpdateFailed || status == UpdateTimeout } func applyUpdateTimeout(req *UpdateRequest, now time.Time) bool { switch req.Status { case UpdatePending: if now.Sub(req.CreatedAt) > updatePendingTimeout { req.Status = UpdateTimeout req.Error = "daemon did not respond within 120 seconds" req.UpdatedAt = now return true } case UpdateRunning: if req.RunStartedAt != nil && now.Sub(*req.RunStartedAt) > updateRunningTimeout { req.Status = UpdateTimeout req.Error = "update did not complete within 150 seconds" req.UpdatedAt = now return true } } return false } // InMemoryUpdateStore is the single-node implementation. Multi-node deploys // must use RedisUpdateStore so Web POST, daemon heartbeat, daemon report, and // UI polling agree on the same request lifecycle. type InMemoryUpdateStore struct { mu sync.Mutex requests map[string]*UpdateRequest // keyed by update ID } func NewInMemoryUpdateStore() *InMemoryUpdateStore { return &InMemoryUpdateStore{ requests: make(map[string]*UpdateRequest), } } func (s *InMemoryUpdateStore) Create(_ context.Context, runtimeID, targetVersion string) (*UpdateRequest, error) { s.mu.Lock() defer s.mu.Unlock() // Clean up old requests. for id, req := range s.requests { if time.Since(req.CreatedAt) > updateStoreRetention { delete(s.requests, id) } } // Reject if there is already a pending or running update for this runtime. for _, req := range s.requests { if req.RuntimeID == runtimeID && (req.Status == UpdatePending || req.Status == UpdateRunning) { return nil, errUpdateInProgress } } req := &UpdateRequest{ ID: randomID(), RuntimeID: runtimeID, Status: UpdatePending, TargetVersion: targetVersion, CreatedAt: time.Now(), UpdatedAt: time.Now(), } s.requests[req.ID] = req return req, nil } var errUpdateInProgress = &updateError{msg: "an update is already in progress for this runtime"} type updateError struct{ msg string } func (e *updateError) Error() string { return e.msg } func (s *InMemoryUpdateStore) Get(_ context.Context, id string) (*UpdateRequest, error) { s.mu.Lock() defer s.mu.Unlock() req, ok := s.requests[id] if !ok { return nil, nil } applyUpdateTimeout(req, time.Now()) return req, nil } func (s *InMemoryUpdateStore) HasPending(_ context.Context, runtimeID string) (bool, error) { s.mu.Lock() defer s.mu.Unlock() now := time.Now() for _, req := range s.requests { applyUpdateTimeout(req, now) if req.RuntimeID == runtimeID && req.Status == UpdatePending { return true, nil } } return false, nil } // PopPending returns and marks as running the pending update for a runtime. func (s *InMemoryUpdateStore) PopPending(_ context.Context, runtimeID string) (*UpdateRequest, error) { s.mu.Lock() defer s.mu.Unlock() var oldest *UpdateRequest now := time.Now() for _, req := range s.requests { applyUpdateTimeout(req, now) if req.RuntimeID == runtimeID && req.Status == UpdatePending { if oldest == nil || req.CreatedAt.Before(oldest.CreatedAt) { oldest = req } } } if oldest != nil { oldest.Status = UpdateRunning startedAt := now oldest.RunStartedAt = &startedAt oldest.UpdatedAt = now } return oldest, nil } func (s *InMemoryUpdateStore) Complete(_ context.Context, id string, output string) error { s.mu.Lock() defer s.mu.Unlock() if req, ok := s.requests[id]; ok { req.Status = UpdateCompleted req.Output = output req.UpdatedAt = time.Now() } return nil } func (s *InMemoryUpdateStore) Fail(_ context.Context, id string, errMsg string) error { s.mu.Lock() defer s.mu.Unlock() if req, ok := s.requests[id]; ok { req.Status = UpdateFailed req.Error = errMsg req.UpdatedAt = time.Now() } return nil } // --------------------------------------------------------------------------- // Handlers // --------------------------------------------------------------------------- // InitiateUpdate creates a new CLI update request (protected route, called by frontend). func (h *Handler) InitiateUpdate(w http.ResponseWriter, r *http.Request) { runtimeID := chi.URLParam(r, "runtimeId") runtimeUUID, ok := parseUUIDOrBadRequest(w, runtimeID, "runtime_id") if !ok { return } rt, err := h.Queries.GetAgentRuntime(r.Context(), runtimeUUID) if err != nil { writeError(w, http.StatusNotFound, "runtime not found") return } if _, ok := h.requireWorkspaceMember(w, r, uuidToString(rt.WorkspaceID), "runtime not found"); !ok { return } var req struct { TargetVersion string `json:"target_version"` } if err := json.NewDecoder(r.Body).Decode(&req); err != nil { writeError(w, http.StatusBadRequest, "invalid request body") return } if req.TargetVersion == "" { writeError(w, http.StatusBadRequest, "target_version is required") return } update, err := h.UpdateStore.Create(r.Context(), uuidToString(rt.ID), req.TargetVersion) if err != nil { writeError(w, http.StatusConflict, err.Error()) return } writeJSON(w, http.StatusOK, update) } // GetUpdate returns the status of an update request (protected route, called by frontend). func (h *Handler) GetUpdate(w http.ResponseWriter, r *http.Request) { runtimeID := chi.URLParam(r, "runtimeId") runtimeUUID, ok := parseUUIDOrBadRequest(w, runtimeID, "runtime_id") if !ok { return } rt, err := h.Queries.GetAgentRuntime(r.Context(), runtimeUUID) if err != nil { writeError(w, http.StatusNotFound, "runtime not found") return } if _, ok := h.requireWorkspaceMember(w, r, uuidToString(rt.WorkspaceID), "runtime not found"); !ok { return } updateID := chi.URLParam(r, "updateId") update, err := h.UpdateStore.Get(r.Context(), updateID) if err != nil { writeError(w, http.StatusInternalServerError, "failed to load update: "+err.Error()) return } if update == nil || update.RuntimeID != uuidToString(rt.ID) { writeError(w, http.StatusNotFound, "update not found") return } writeJSON(w, http.StatusOK, update) } // ReportUpdateResult receives the update result from the daemon. func (h *Handler) ReportUpdateResult(w http.ResponseWriter, r *http.Request) { runtimeID := chi.URLParam(r, "runtimeId") // Verify the caller owns this runtime's workspace. if _, ok := h.requireDaemonRuntimeAccess(w, r, runtimeID); !ok { return } updateID := chi.URLParam(r, "updateId") existing, err := h.UpdateStore.Get(r.Context(), updateID) if err != nil { writeError(w, http.StatusInternalServerError, "failed to load update: "+err.Error()) return } if existing == nil || existing.RuntimeID != runtimeID { writeError(w, http.StatusNotFound, "update not found") return } if updateRequestTerminal(existing.Status) { slog.Debug("ignoring stale update report", "runtime_id", runtimeID, "update_id", updateID, "status", existing.Status) writeJSON(w, http.StatusOK, map[string]string{"status": "ok"}) return } var req struct { Status string `json:"status"` // "running", "completed", or "failed" Output string `json:"output"` Error string `json:"error"` } if err := json.NewDecoder(r.Body).Decode(&req); err != nil { writeError(w, http.StatusBadRequest, "invalid request body") return } switch req.Status { case "completed": if err := h.UpdateStore.Complete(r.Context(), updateID, req.Output); err != nil { slog.Error("UpdateStore Complete failed", "error", err, "update_id", updateID) writeError(w, http.StatusInternalServerError, "failed to persist completion") return } case "failed": if err := h.UpdateStore.Fail(r.Context(), updateID, req.Error); err != nil { slog.Error("UpdateStore Fail failed", "error", err, "update_id", updateID) writeError(w, http.StatusInternalServerError, "failed to persist failure") return } case "running": // No-op: status is already "running" from PopPending. This call is // just a progress signal from the daemon to confirm it received the // update command and is executing it. default: writeError(w, http.StatusBadRequest, "invalid status: "+req.Status) return } writeJSON(w, http.StatusOK, map[string]string{"status": "ok"}) }