Files
multica/server/internal/handler/runtime.go
Jiayuan Zhang ff5f6ac2ee fix(daemon): prevent duplicate runtime registration on profile switch (#906)
* fix(daemon): prevent duplicate runtime registration on profile switch

The daemon_id included a profile name suffix (e.g. "hostname-staging"),
so switching profiles created a new daemon_id that bypassed the UPSERT
dedup constraint, leaving orphaned runtime records in the database.

Three changes:
- Remove profile suffix from daemon_id — use stable hostname only.
  The unique constraint (workspace_id, daemon_id, provider) already
  prevents collisions within the same workspace.
- Auto-migrate agents from old offline runtimes to the newly registered
  runtime during DaemonRegister (same workspace/provider/owner).
- Add TTL-based GC in the runtime sweeper to delete offline runtimes
  with no active agents after 7 days.

Closes MUL-695

* fix(daemon): address code review issues on PR #906

1. Move gcRuntimes() to the main sweep loop — previously it was inside
   sweepStaleRuntimes() after an early return, so it only ran when new
   runtimes were marked stale. Now it runs every sweep cycle independently.

2. Fix DeleteStaleOfflineRuntimes to exclude runtimes with ANY agent
   reference (not just active ones). The FK agent.runtime_id is ON DELETE
   RESTRICT, so archived agents also block deletion.

3. Scope MigrateAgentsToRuntime to the same machine by matching
   daemon_id LIKE '<current_daemon_id>-%'. This prevents cross-machine
   agent migration when the same user has multiple devices.
2026-04-14 01:52:34 +08:00

383 lines
12 KiB
Go

package handler
import (
"encoding/json"
"log/slog"
"net/http"
"strconv"
"time"
"github.com/go-chi/chi/v5"
"github.com/jackc/pgx/v5/pgtype"
db "github.com/multica-ai/multica/server/pkg/db/generated"
"github.com/multica-ai/multica/server/pkg/protocol"
)
// AgentRuntimeResponse is the JSON shape of an agent runtime as returned by
// ListAgentRuntimes. It is built from a db.AgentRuntime row by
// runtimeToResponse. Field order here is the order of keys in the JSON output.
type AgentRuntimeResponse struct {
// ID and WorkspaceID are UUIDs rendered as strings via uuidToString.
ID string `json:"id"`
WorkspaceID string `json:"workspace_id"`
// DaemonID is a pointer so it can be emitted as JSON null
// (presumably when the DB column is NULL — textToPtr decides; confirm).
DaemonID *string `json:"daemon_id"`
Name string `json:"name"`
RuntimeMode string `json:"runtime_mode"`
Provider string `json:"provider"`
Status string `json:"status"`
DeviceInfo string `json:"device_info"`
// Metadata is the runtime's stored metadata decoded as generic JSON.
// runtimeToResponse substitutes an empty object when the stored value is
// missing, JSON null, or cannot be decoded.
Metadata any `json:"metadata"`
// OwnerID and LastSeenAt are pointers so they can be emitted as JSON null
// (presumably for NULL columns — see uuidToPtr / timestampToPtr).
OwnerID *string `json:"owner_id"`
LastSeenAt *string `json:"last_seen_at"`
CreatedAt string `json:"created_at"`
UpdatedAt string `json:"updated_at"`
}
// runtimeToResponse converts a db.AgentRuntime row into its API representation.
//
// The row's raw JSON metadata is decoded into a generic value. When the
// metadata is absent, empty, JSON null, or not valid JSON, an empty object is
// returned instead, so clients always receive an object-like "metadata" field
// for those cases. Decode failures are logged rather than silently dropped, so
// corrupt rows are visible.
func runtimeToResponse(rt db.AgentRuntime) AgentRuntimeResponse {
	var metadata any
	// len() treats nil and empty byte slices alike; both would fail to decode.
	if len(rt.Metadata) > 0 {
		if err := json.Unmarshal(rt.Metadata, &metadata); err != nil {
			slog.Warn("invalid runtime metadata; returning empty object",
				"runtime_id", uuidToString(rt.ID), "error", err)
			metadata = nil
		}
	}
	if metadata == nil {
		metadata = map[string]any{}
	}
	return AgentRuntimeResponse{
		ID:          uuidToString(rt.ID),
		WorkspaceID: uuidToString(rt.WorkspaceID),
		DaemonID:    textToPtr(rt.DaemonID),
		Name:        rt.Name,
		RuntimeMode: rt.RuntimeMode,
		Provider:    rt.Provider,
		Status:      rt.Status,
		DeviceInfo:  rt.DeviceInfo,
		Metadata:    metadata,
		OwnerID:     uuidToPtr(rt.OwnerID),
		LastSeenAt:  timestampToPtr(rt.LastSeenAt),
		CreatedAt:   timestampToString(rt.CreatedAt),
		UpdatedAt:   timestampToString(rt.UpdatedAt),
	}
}
// ---------------------------------------------------------------------------
// Runtime Usage
// ---------------------------------------------------------------------------
// RuntimeUsageEntry is one usage record in the "entries" array that the daemon
// posts to ReportRuntimeUsage.
type RuntimeUsageEntry struct {
// Date must be formatted as YYYY-MM-DD; ReportRuntimeUsage skips entries
// whose date does not parse.
Date string `json:"date"`
Provider string `json:"provider"`
Model string `json:"model"`
// Token counters for this (date, provider, model) combination.
InputTokens int64 `json:"input_tokens"`
OutputTokens int64 `json:"output_tokens"`
CacheReadTokens int64 `json:"cache_read_tokens"`
CacheWriteTokens int64 `json:"cache_write_tokens"`
}
// RuntimeUsageResponse is one element of the array returned by GetRuntimeUsage.
type RuntimeUsageResponse struct {
// RuntimeID echoes the "runtimeId" URL parameter of the request.
RuntimeID string `json:"runtime_id"`
// Date is formatted as YYYY-MM-DD.
Date string `json:"date"`
Provider string `json:"provider"`
Model string `json:"model"`
InputTokens int64 `json:"input_tokens"`
OutputTokens int64 `json:"output_tokens"`
CacheReadTokens int64 `json:"cache_read_tokens"`
CacheWriteTokens int64 `json:"cache_write_tokens"`
}
// ReportRuntimeUsage receives per-day token usage from the daemon and upserts
// each entry for the runtime named by the "runtimeId" URL parameter.
//
// Entries whose date is not in YYYY-MM-DD form are skipped (and logged).
// If a database write fails, the handler stops and responds 500 instead of
// reporting success, so the daemon can tell the report was not stored.
//
// NOTE(review): a retry after a partial failure re-sends entries that were
// already written. That is only safe if UpsertRuntimeUsage overwrites rather
// than accumulates the counters — confirm against the SQL query.
func (h *Handler) ReportRuntimeUsage(w http.ResponseWriter, r *http.Request) {
	runtimeID := chi.URLParam(r, "runtimeId")
	if runtimeID == "" {
		writeError(w, http.StatusBadRequest, "runtimeId is required")
		return
	}
	// Verify the caller owns this runtime's workspace.
	if _, ok := h.requireDaemonRuntimeAccess(w, r, runtimeID); !ok {
		return
	}

	var req struct {
		Entries []RuntimeUsageEntry `json:"entries"`
	}
	if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
		writeError(w, http.StatusBadRequest, "invalid request body")
		return
	}

	// The runtime ID is the same for every entry; parse it once.
	rtID := parseUUID(runtimeID)
	for _, entry := range req.Entries {
		date, err := time.Parse("2006-01-02", entry.Date)
		if err != nil {
			slog.Warn("skipping runtime usage entry with invalid date",
				"runtime_id", runtimeID, "date", entry.Date, "error", err)
			continue
		}
		if err := h.Queries.UpsertRuntimeUsage(r.Context(), db.UpsertRuntimeUsageParams{
			RuntimeID:        rtID,
			Date:             pgtype.Date{Time: date, Valid: true},
			Provider:         entry.Provider,
			Model:            entry.Model,
			InputTokens:      entry.InputTokens,
			OutputTokens:     entry.OutputTokens,
			CacheReadTokens:  entry.CacheReadTokens,
			CacheWriteTokens: entry.CacheWriteTokens,
		}); err != nil {
			slog.Error("failed to upsert runtime usage",
				"runtime_id", runtimeID, "date", entry.Date,
				"provider", entry.Provider, "model", entry.Model, "error", err)
			writeError(w, http.StatusInternalServerError, "failed to record usage")
			return
		}
	}
	writeJSON(w, http.StatusOK, map[string]string{"status": "ok"})
}
// GetRuntimeUsage returns daily usage rows for a runtime (protected route).
//
// The optional "days" query parameter selects the lookback window. It is parsed
// by parseSinceParam, the same helper the workspace usage endpoints use, with a
// default of 90 days. Callers outside the runtime's workspace are rejected by
// requireWorkspaceMember with the same "runtime not found" message as a missing
// runtime.
func (h *Handler) GetRuntimeUsage(w http.ResponseWriter, r *http.Request) {
	runtimeID := chi.URLParam(r, "runtimeId")
	rt, err := h.Queries.GetAgentRuntime(r.Context(), parseUUID(runtimeID))
	if err != nil {
		writeError(w, http.StatusNotFound, "runtime not found")
		return
	}
	if _, ok := h.requireWorkspaceMember(w, r, uuidToString(rt.WorkspaceID), "runtime not found"); !ok {
		return
	}

	// Reuse the shared "days" parsing instead of duplicating its bounds here;
	// ListRuntimeUsage takes a pgtype.Date, so keep only the instant.
	since := parseSinceParam(r, 90)
	rows, err := h.Queries.ListRuntimeUsage(r.Context(), db.ListRuntimeUsageParams{
		RuntimeID: rt.ID,
		Date:      pgtype.Date{Time: since.Time, Valid: true},
	})
	if err != nil {
		writeError(w, http.StatusInternalServerError, "failed to list usage")
		return
	}

	// make (not a nil slice) so an empty result encodes as [] rather than null.
	resp := make([]RuntimeUsageResponse, len(rows))
	for i, row := range rows {
		resp[i] = RuntimeUsageResponse{
			RuntimeID:        runtimeID,
			Date:             row.Date.Time.Format("2006-01-02"),
			Provider:         row.Provider,
			Model:            row.Model,
			InputTokens:      row.InputTokens,
			OutputTokens:     row.OutputTokens,
			CacheReadTokens:  row.CacheReadTokens,
			CacheWriteTokens: row.CacheWriteTokens,
		}
	}
	writeJSON(w, http.StatusOK, resp)
}
// GetRuntimeTaskActivity responds with the per-hour task counts that
// GetRuntimeTaskHourlyActivity reports for one runtime.
//
// The workspace membership check is given the same "runtime not found" message
// as a missing runtime, presumably so non-members cannot probe for existence.
func (h *Handler) GetRuntimeTaskActivity(w http.ResponseWriter, r *http.Request) {
	type HourlyActivity struct {
		Hour  int `json:"hour"`
		Count int `json:"count"`
	}

	ctx := r.Context()
	id := parseUUID(chi.URLParam(r, "runtimeId"))

	rt, err := h.Queries.GetAgentRuntime(ctx, id)
	if err != nil {
		writeError(w, http.StatusNotFound, "runtime not found")
		return
	}
	if _, ok := h.requireWorkspaceMember(w, r, uuidToString(rt.WorkspaceID), "runtime not found"); !ok {
		return
	}

	buckets, err := h.Queries.GetRuntimeTaskHourlyActivity(ctx, id)
	if err != nil {
		writeError(w, http.StatusInternalServerError, "failed to get task activity")
		return
	}

	// Non-nil even when empty so the JSON body is [] rather than null.
	activity := make([]HourlyActivity, 0, len(buckets))
	for _, b := range buckets {
		activity = append(activity, HourlyActivity{Hour: int(b.Hour), Count: int(b.Count)})
	}
	writeJSON(w, http.StatusOK, activity)
}
// GetWorkspaceUsageByDay responds with token usage for the resolved workspace,
// one row per (day, model), covering the window chosen by the "days" query
// parameter (30 days when absent or invalid).
//
// NOTE(review): no membership check is visible here; presumably the route's
// middleware enforces workspace access — confirm.
func (h *Handler) GetWorkspaceUsageByDay(w http.ResponseWriter, r *http.Request) {
	type DailyUsageRow struct {
		Date                  string `json:"date"`
		Model                 string `json:"model"`
		TotalInputTokens      int64  `json:"total_input_tokens"`
		TotalOutputTokens     int64  `json:"total_output_tokens"`
		TotalCacheReadTokens  int64  `json:"total_cache_read_tokens"`
		TotalCacheWriteTokens int64  `json:"total_cache_write_tokens"`
		TaskCount             int32  `json:"task_count"`
	}

	params := db.GetWorkspaceUsageByDayParams{
		WorkspaceID: parseUUID(resolveWorkspaceID(r)),
		Since:       parseSinceParam(r, 30),
	}
	rows, err := h.Queries.GetWorkspaceUsageByDay(r.Context(), params)
	if err != nil {
		writeError(w, http.StatusInternalServerError, "failed to get usage")
		return
	}

	out := make([]DailyUsageRow, len(rows))
	for i := range rows {
		src := &rows[i]
		out[i].Date = src.Date.Time.Format("2006-01-02")
		out[i].Model = src.Model
		out[i].TotalInputTokens = src.TotalInputTokens
		out[i].TotalOutputTokens = src.TotalOutputTokens
		out[i].TotalCacheReadTokens = src.TotalCacheReadTokens
		out[i].TotalCacheWriteTokens = src.TotalCacheWriteTokens
		out[i].TaskCount = src.TaskCount
	}
	writeJSON(w, http.StatusOK, out)
}
// GetWorkspaceUsageSummary responds with total token usage for the resolved
// workspace, one row per model, over the window chosen by the "days" query
// parameter (30 days when absent or invalid).
//
// NOTE(review): as with the by-day endpoint, workspace access is presumably
// enforced by route middleware — confirm.
func (h *Handler) GetWorkspaceUsageSummary(w http.ResponseWriter, r *http.Request) {
	type UsageSummaryRow struct {
		Model                 string `json:"model"`
		TotalInputTokens      int64  `json:"total_input_tokens"`
		TotalOutputTokens     int64  `json:"total_output_tokens"`
		TotalCacheReadTokens  int64  `json:"total_cache_read_tokens"`
		TotalCacheWriteTokens int64  `json:"total_cache_write_tokens"`
		TaskCount             int32  `json:"task_count"`
	}

	params := db.GetWorkspaceUsageSummaryParams{
		WorkspaceID: parseUUID(resolveWorkspaceID(r)),
		Since:       parseSinceParam(r, 30),
	}
	rows, err := h.Queries.GetWorkspaceUsageSummary(r.Context(), params)
	if err != nil {
		writeError(w, http.StatusInternalServerError, "failed to get usage summary")
		return
	}

	summary := make([]UsageSummaryRow, len(rows))
	for i := range rows {
		src := &rows[i]
		summary[i] = UsageSummaryRow{
			Model:                 src.Model,
			TotalInputTokens:      src.TotalInputTokens,
			TotalOutputTokens:     src.TotalOutputTokens,
			TotalCacheReadTokens:  src.TotalCacheReadTokens,
			TotalCacheWriteTokens: src.TotalCacheWriteTokens,
			TaskCount:             src.TaskCount,
		}
	}
	writeJSON(w, http.StatusOK, summary)
}
// parseSinceParam reads the optional "days" query parameter and returns the
// instant that many days before now as a valid pgtype.Timestamptz.
//
// A missing, non-numeric, or out-of-range value (anything outside 1..365)
// falls back to defaultDays.
func parseSinceParam(r *http.Request, defaultDays int) pgtype.Timestamptz {
	lookback := defaultDays
	// strconv.Atoi rejects the empty string, so an absent parameter also
	// keeps the default.
	if n, err := strconv.Atoi(r.URL.Query().Get("days")); err == nil && n >= 1 && n <= 365 {
		lookback = n
	}
	return pgtype.Timestamptz{
		Valid: true,
		Time:  time.Now().AddDate(0, 0, -lookback),
	}
}
// ListAgentRuntimes responds with the runtimes of the resolved workspace.
// With the query parameter owner=me, only runtimes owned by the authenticated
// user are listed; any other value (or none) lists every runtime.
func (h *Handler) ListAgentRuntimes(w http.ResponseWriter, r *http.Request) {
	ctx := r.Context()
	workspaceID := resolveWorkspaceID(r)

	var (
		runtimes []db.AgentRuntime
		err      error
	)
	switch r.URL.Query().Get("owner") {
	case "me":
		userID, ok := requireUserID(w, r)
		if !ok {
			return
		}
		runtimes, err = h.Queries.ListAgentRuntimesByOwner(ctx, db.ListAgentRuntimesByOwnerParams{
			WorkspaceID: parseUUID(workspaceID),
			OwnerID:     parseUUID(userID),
		})
	default:
		runtimes, err = h.Queries.ListAgentRuntimes(ctx, parseUUID(workspaceID))
	}
	if err != nil {
		writeError(w, http.StatusInternalServerError, "failed to list runtimes")
		return
	}

	out := make([]AgentRuntimeResponse, 0, len(runtimes))
	for _, rt := range runtimes {
		out = append(out, runtimeToResponse(rt))
	}
	writeJSON(w, http.StatusOK, out)
}
// DeleteAgentRuntime removes a runtime from its workspace.
//
// Permissions: workspace owners and admins may delete any runtime; other
// members may delete only runtimes whose owner is themselves.
// Deletion is refused with 409 while any non-archived agent is bound to the
// runtime. Archived agents bound to it are deleted first, because the FK from
// agents to runtimes is ON DELETE RESTRICT and would otherwise block removal.
// On success a daemon-register event with action "delete" is published so
// the frontend refreshes its runtime list.
//
// NOTE(review): the dependency check, archived-agent cleanup, and runtime
// delete are separate statements, not one transaction. An agent bound or
// un-archived in between could make the final delete fail (500) after the
// archived agents are already gone — consider wrapping these in a tx.
func (h *Handler) DeleteAgentRuntime(w http.ResponseWriter, r *http.Request) {
	ctx := r.Context()
	runtimeID := chi.URLParam(r, "runtimeId")

	rt, err := h.Queries.GetAgentRuntime(ctx, parseUUID(runtimeID))
	if err != nil {
		writeError(w, http.StatusNotFound, "runtime not found")
		return
	}

	wsID := uuidToString(rt.WorkspaceID)
	member, ok := h.requireWorkspaceMember(w, r, wsID, "runtime not found")
	if !ok {
		return
	}

	// Authorization: the runtime's own owner, or a workspace owner/admin.
	userID := uuidToString(member.UserID)
	ownsRuntime := rt.OwnerID.Valid && uuidToString(rt.OwnerID) == userID
	if !ownsRuntime && !roleAllowed(member.Role, "owner", "admin") {
		writeError(w, http.StatusForbidden, "you can only delete your own runtimes")
		return
	}

	// Dependency guard: live (non-archived) agents must be archived or moved first.
	liveAgents, err := h.Queries.CountActiveAgentsByRuntime(ctx, rt.ID)
	if err != nil {
		writeError(w, http.StatusInternalServerError, "failed to check runtime dependencies")
		return
	}
	if liveAgents > 0 {
		writeError(w, http.StatusConflict, "cannot delete runtime: it has active agents bound to it. Archive or reassign the agents first.")
		return
	}

	// Archived agents still hold the RESTRICT foreign key; clear them out.
	if err := h.Queries.DeleteArchivedAgentsByRuntime(ctx, rt.ID); err != nil {
		writeError(w, http.StatusInternalServerError, "failed to clean up archived agents")
		return
	}
	if err := h.Queries.DeleteAgentRuntime(ctx, rt.ID); err != nil {
		writeError(w, http.StatusInternalServerError, "failed to delete runtime")
		return
	}

	slog.Info("runtime deleted", "runtime_id", runtimeID, "deleted_by", userID)
	h.publish(protocol.EventDaemonRegister, wsID, "member", userID, map[string]any{
		"action": "delete",
	})
	writeJSON(w, http.StatusOK, map[string]string{"status": "ok"})
}