mirror of
https://github.com/multica-ai/multica.git
synced 2026-07-05 21:39:54 +02:00
* fix(daemon): prevent duplicate runtime registration on profile switch The daemon_id included a profile name suffix (e.g. "hostname-staging"), so switching profiles created a new daemon_id that bypassed the UPSERT dedup constraint, leaving orphaned runtime records in the database. Three changes: - Remove profile suffix from daemon_id — use stable hostname only. The unique constraint (workspace_id, daemon_id, provider) already prevents collisions within the same workspace. - Auto-migrate agents from old offline runtimes to the newly registered runtime during DaemonRegister (same workspace/provider/owner). - Add TTL-based GC in the runtime sweeper to delete offline runtimes with no active agents after 7 days. Closes MUL-695 * fix(daemon): address code review issues on PR #906 1. Move gcRuntimes() to the main sweep loop — previously it was inside sweepStaleRuntimes() after an early return, so it only ran when new runtimes were marked stale. Now it runs every sweep cycle independently. 2. Fix DeleteStaleOfflineRuntimes to exclude runtimes with ANY agent reference (not just active ones). The FK agent.runtime_id is ON DELETE RESTRICT, so archived agents also block deletion. 3. Scope MigrateAgentsToRuntime to the same machine by matching daemon_id LIKE '<current_daemon_id>-%'. This prevents cross-machine agent migration when the same user has multiple devices.
383 lines
12 KiB
Go
383 lines
12 KiB
Go
package handler
|
|
|
|
import (
|
|
"encoding/json"
|
|
"log/slog"
|
|
"net/http"
|
|
"strconv"
|
|
"time"
|
|
|
|
"github.com/go-chi/chi/v5"
|
|
"github.com/jackc/pgx/v5/pgtype"
|
|
db "github.com/multica-ai/multica/server/pkg/db/generated"
|
|
"github.com/multica-ai/multica/server/pkg/protocol"
|
|
)
|
|
|
|
type AgentRuntimeResponse struct {
|
|
ID string `json:"id"`
|
|
WorkspaceID string `json:"workspace_id"`
|
|
DaemonID *string `json:"daemon_id"`
|
|
Name string `json:"name"`
|
|
RuntimeMode string `json:"runtime_mode"`
|
|
Provider string `json:"provider"`
|
|
Status string `json:"status"`
|
|
DeviceInfo string `json:"device_info"`
|
|
Metadata any `json:"metadata"`
|
|
OwnerID *string `json:"owner_id"`
|
|
LastSeenAt *string `json:"last_seen_at"`
|
|
CreatedAt string `json:"created_at"`
|
|
UpdatedAt string `json:"updated_at"`
|
|
}
|
|
|
|
func runtimeToResponse(rt db.AgentRuntime) AgentRuntimeResponse {
|
|
var metadata any
|
|
if rt.Metadata != nil {
|
|
json.Unmarshal(rt.Metadata, &metadata)
|
|
}
|
|
if metadata == nil {
|
|
metadata = map[string]any{}
|
|
}
|
|
|
|
return AgentRuntimeResponse{
|
|
ID: uuidToString(rt.ID),
|
|
WorkspaceID: uuidToString(rt.WorkspaceID),
|
|
DaemonID: textToPtr(rt.DaemonID),
|
|
Name: rt.Name,
|
|
RuntimeMode: rt.RuntimeMode,
|
|
Provider: rt.Provider,
|
|
Status: rt.Status,
|
|
DeviceInfo: rt.DeviceInfo,
|
|
Metadata: metadata,
|
|
OwnerID: uuidToPtr(rt.OwnerID),
|
|
LastSeenAt: timestampToPtr(rt.LastSeenAt),
|
|
CreatedAt: timestampToString(rt.CreatedAt),
|
|
UpdatedAt: timestampToString(rt.UpdatedAt),
|
|
}
|
|
}
|
|
|
|
// ---------------------------------------------------------------------------
|
|
// Runtime Usage
|
|
// ---------------------------------------------------------------------------
|
|
|
|
type RuntimeUsageEntry struct {
|
|
Date string `json:"date"`
|
|
Provider string `json:"provider"`
|
|
Model string `json:"model"`
|
|
InputTokens int64 `json:"input_tokens"`
|
|
OutputTokens int64 `json:"output_tokens"`
|
|
CacheReadTokens int64 `json:"cache_read_tokens"`
|
|
CacheWriteTokens int64 `json:"cache_write_tokens"`
|
|
}
|
|
|
|
type RuntimeUsageResponse struct {
|
|
RuntimeID string `json:"runtime_id"`
|
|
Date string `json:"date"`
|
|
Provider string `json:"provider"`
|
|
Model string `json:"model"`
|
|
InputTokens int64 `json:"input_tokens"`
|
|
OutputTokens int64 `json:"output_tokens"`
|
|
CacheReadTokens int64 `json:"cache_read_tokens"`
|
|
CacheWriteTokens int64 `json:"cache_write_tokens"`
|
|
}
|
|
|
|
// ReportRuntimeUsage receives usage data from the daemon.
|
|
func (h *Handler) ReportRuntimeUsage(w http.ResponseWriter, r *http.Request) {
|
|
runtimeID := chi.URLParam(r, "runtimeId")
|
|
if runtimeID == "" {
|
|
writeError(w, http.StatusBadRequest, "runtimeId is required")
|
|
return
|
|
}
|
|
|
|
// Verify the caller owns this runtime's workspace.
|
|
if _, ok := h.requireDaemonRuntimeAccess(w, r, runtimeID); !ok {
|
|
return
|
|
}
|
|
|
|
var req struct {
|
|
Entries []RuntimeUsageEntry `json:"entries"`
|
|
}
|
|
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
|
|
writeError(w, http.StatusBadRequest, "invalid request body")
|
|
return
|
|
}
|
|
|
|
for _, entry := range req.Entries {
|
|
date, err := time.Parse("2006-01-02", entry.Date)
|
|
if err != nil {
|
|
continue
|
|
}
|
|
h.Queries.UpsertRuntimeUsage(r.Context(), db.UpsertRuntimeUsageParams{
|
|
RuntimeID: parseUUID(runtimeID),
|
|
Date: pgtype.Date{Time: date, Valid: true},
|
|
Provider: entry.Provider,
|
|
Model: entry.Model,
|
|
InputTokens: entry.InputTokens,
|
|
OutputTokens: entry.OutputTokens,
|
|
CacheReadTokens: entry.CacheReadTokens,
|
|
CacheWriteTokens: entry.CacheWriteTokens,
|
|
})
|
|
}
|
|
|
|
writeJSON(w, http.StatusOK, map[string]string{"status": "ok"})
|
|
}
|
|
|
|
// GetRuntimeUsage returns usage data for a runtime (protected route).
|
|
func (h *Handler) GetRuntimeUsage(w http.ResponseWriter, r *http.Request) {
|
|
runtimeID := chi.URLParam(r, "runtimeId")
|
|
|
|
rt, err := h.Queries.GetAgentRuntime(r.Context(), parseUUID(runtimeID))
|
|
if err != nil {
|
|
writeError(w, http.StatusNotFound, "runtime not found")
|
|
return
|
|
}
|
|
|
|
if _, ok := h.requireWorkspaceMember(w, r, uuidToString(rt.WorkspaceID), "runtime not found"); !ok {
|
|
return
|
|
}
|
|
|
|
days := 90
|
|
if d := r.URL.Query().Get("days"); d != "" {
|
|
if parsed, err := strconv.Atoi(d); err == nil && parsed > 0 && parsed <= 365 {
|
|
days = parsed
|
|
}
|
|
}
|
|
since := pgtype.Date{Time: time.Now().AddDate(0, 0, -days), Valid: true}
|
|
|
|
rows, err := h.Queries.ListRuntimeUsage(r.Context(), db.ListRuntimeUsageParams{
|
|
RuntimeID: parseUUID(runtimeID),
|
|
Date: since,
|
|
})
|
|
if err != nil {
|
|
writeError(w, http.StatusInternalServerError, "failed to list usage")
|
|
return
|
|
}
|
|
|
|
resp := make([]RuntimeUsageResponse, len(rows))
|
|
for i, row := range rows {
|
|
resp[i] = RuntimeUsageResponse{
|
|
RuntimeID: runtimeID,
|
|
Date: row.Date.Time.Format("2006-01-02"),
|
|
Provider: row.Provider,
|
|
Model: row.Model,
|
|
InputTokens: row.InputTokens,
|
|
OutputTokens: row.OutputTokens,
|
|
CacheReadTokens: row.CacheReadTokens,
|
|
CacheWriteTokens: row.CacheWriteTokens,
|
|
}
|
|
}
|
|
|
|
writeJSON(w, http.StatusOK, resp)
|
|
}
|
|
|
|
// GetRuntimeTaskActivity returns hourly task activity distribution for a runtime.
|
|
func (h *Handler) GetRuntimeTaskActivity(w http.ResponseWriter, r *http.Request) {
|
|
runtimeID := chi.URLParam(r, "runtimeId")
|
|
|
|
rt, err := h.Queries.GetAgentRuntime(r.Context(), parseUUID(runtimeID))
|
|
if err != nil {
|
|
writeError(w, http.StatusNotFound, "runtime not found")
|
|
return
|
|
}
|
|
|
|
if _, ok := h.requireWorkspaceMember(w, r, uuidToString(rt.WorkspaceID), "runtime not found"); !ok {
|
|
return
|
|
}
|
|
|
|
rows, err := h.Queries.GetRuntimeTaskHourlyActivity(r.Context(), parseUUID(runtimeID))
|
|
if err != nil {
|
|
writeError(w, http.StatusInternalServerError, "failed to get task activity")
|
|
return
|
|
}
|
|
|
|
type HourlyActivity struct {
|
|
Hour int `json:"hour"`
|
|
Count int `json:"count"`
|
|
}
|
|
|
|
resp := make([]HourlyActivity, len(rows))
|
|
for i, row := range rows {
|
|
resp[i] = HourlyActivity{Hour: int(row.Hour), Count: int(row.Count)}
|
|
}
|
|
|
|
writeJSON(w, http.StatusOK, resp)
|
|
}
|
|
|
|
// GetWorkspaceUsageByDay returns daily token usage aggregated by model for the workspace.
|
|
func (h *Handler) GetWorkspaceUsageByDay(w http.ResponseWriter, r *http.Request) {
|
|
workspaceID := resolveWorkspaceID(r)
|
|
since := parseSinceParam(r, 30)
|
|
|
|
rows, err := h.Queries.GetWorkspaceUsageByDay(r.Context(), db.GetWorkspaceUsageByDayParams{
|
|
WorkspaceID: parseUUID(workspaceID),
|
|
Since: since,
|
|
})
|
|
if err != nil {
|
|
writeError(w, http.StatusInternalServerError, "failed to get usage")
|
|
return
|
|
}
|
|
|
|
type DailyUsageRow struct {
|
|
Date string `json:"date"`
|
|
Model string `json:"model"`
|
|
TotalInputTokens int64 `json:"total_input_tokens"`
|
|
TotalOutputTokens int64 `json:"total_output_tokens"`
|
|
TotalCacheReadTokens int64 `json:"total_cache_read_tokens"`
|
|
TotalCacheWriteTokens int64 `json:"total_cache_write_tokens"`
|
|
TaskCount int32 `json:"task_count"`
|
|
}
|
|
|
|
resp := make([]DailyUsageRow, len(rows))
|
|
for i, row := range rows {
|
|
resp[i] = DailyUsageRow{
|
|
Date: row.Date.Time.Format("2006-01-02"),
|
|
Model: row.Model,
|
|
TotalInputTokens: row.TotalInputTokens,
|
|
TotalOutputTokens: row.TotalOutputTokens,
|
|
TotalCacheReadTokens: row.TotalCacheReadTokens,
|
|
TotalCacheWriteTokens: row.TotalCacheWriteTokens,
|
|
TaskCount: row.TaskCount,
|
|
}
|
|
}
|
|
|
|
writeJSON(w, http.StatusOK, resp)
|
|
}
|
|
|
|
// GetWorkspaceUsageSummary returns total token usage aggregated by model for the workspace.
|
|
func (h *Handler) GetWorkspaceUsageSummary(w http.ResponseWriter, r *http.Request) {
|
|
workspaceID := resolveWorkspaceID(r)
|
|
since := parseSinceParam(r, 30)
|
|
|
|
rows, err := h.Queries.GetWorkspaceUsageSummary(r.Context(), db.GetWorkspaceUsageSummaryParams{
|
|
WorkspaceID: parseUUID(workspaceID),
|
|
Since: since,
|
|
})
|
|
if err != nil {
|
|
writeError(w, http.StatusInternalServerError, "failed to get usage summary")
|
|
return
|
|
}
|
|
|
|
type UsageSummaryRow struct {
|
|
Model string `json:"model"`
|
|
TotalInputTokens int64 `json:"total_input_tokens"`
|
|
TotalOutputTokens int64 `json:"total_output_tokens"`
|
|
TotalCacheReadTokens int64 `json:"total_cache_read_tokens"`
|
|
TotalCacheWriteTokens int64 `json:"total_cache_write_tokens"`
|
|
TaskCount int32 `json:"task_count"`
|
|
}
|
|
|
|
resp := make([]UsageSummaryRow, len(rows))
|
|
for i, row := range rows {
|
|
resp[i] = UsageSummaryRow{
|
|
Model: row.Model,
|
|
TotalInputTokens: row.TotalInputTokens,
|
|
TotalOutputTokens: row.TotalOutputTokens,
|
|
TotalCacheReadTokens: row.TotalCacheReadTokens,
|
|
TotalCacheWriteTokens: row.TotalCacheWriteTokens,
|
|
TaskCount: row.TaskCount,
|
|
}
|
|
}
|
|
|
|
writeJSON(w, http.StatusOK, resp)
|
|
}
|
|
|
|
// parseSinceParam parses the "days" query parameter and returns a timestamptz.
|
|
func parseSinceParam(r *http.Request, defaultDays int) pgtype.Timestamptz {
|
|
days := defaultDays
|
|
if d := r.URL.Query().Get("days"); d != "" {
|
|
if parsed, err := strconv.Atoi(d); err == nil && parsed > 0 && parsed <= 365 {
|
|
days = parsed
|
|
}
|
|
}
|
|
t := time.Now().AddDate(0, 0, -days)
|
|
return pgtype.Timestamptz{Time: t, Valid: true}
|
|
}
|
|
|
|
func (h *Handler) ListAgentRuntimes(w http.ResponseWriter, r *http.Request) {
|
|
workspaceID := resolveWorkspaceID(r)
|
|
|
|
var runtimes []db.AgentRuntime
|
|
var err error
|
|
|
|
if ownerFilter := r.URL.Query().Get("owner"); ownerFilter == "me" {
|
|
userID, ok := requireUserID(w, r)
|
|
if !ok {
|
|
return
|
|
}
|
|
runtimes, err = h.Queries.ListAgentRuntimesByOwner(r.Context(), db.ListAgentRuntimesByOwnerParams{
|
|
WorkspaceID: parseUUID(workspaceID),
|
|
OwnerID: parseUUID(userID),
|
|
})
|
|
} else {
|
|
runtimes, err = h.Queries.ListAgentRuntimes(r.Context(), parseUUID(workspaceID))
|
|
}
|
|
|
|
if err != nil {
|
|
writeError(w, http.StatusInternalServerError, "failed to list runtimes")
|
|
return
|
|
}
|
|
|
|
resp := make([]AgentRuntimeResponse, len(runtimes))
|
|
for i, rt := range runtimes {
|
|
resp[i] = runtimeToResponse(rt)
|
|
}
|
|
|
|
writeJSON(w, http.StatusOK, resp)
|
|
}
|
|
|
|
// DeleteAgentRuntime deletes a runtime after permission and dependency checks.
|
|
func (h *Handler) DeleteAgentRuntime(w http.ResponseWriter, r *http.Request) {
|
|
runtimeID := chi.URLParam(r, "runtimeId")
|
|
|
|
rt, err := h.Queries.GetAgentRuntime(r.Context(), parseUUID(runtimeID))
|
|
if err != nil {
|
|
writeError(w, http.StatusNotFound, "runtime not found")
|
|
return
|
|
}
|
|
|
|
wsID := uuidToString(rt.WorkspaceID)
|
|
member, ok := h.requireWorkspaceMember(w, r, wsID, "runtime not found")
|
|
if !ok {
|
|
return
|
|
}
|
|
|
|
// Permission: owner/admin can delete any runtime; members can only delete their own.
|
|
userID := uuidToString(member.UserID)
|
|
isAdmin := roleAllowed(member.Role, "owner", "admin")
|
|
isOwner := rt.OwnerID.Valid && uuidToString(rt.OwnerID) == userID
|
|
if !isAdmin && !isOwner {
|
|
writeError(w, http.StatusForbidden, "you can only delete your own runtimes")
|
|
return
|
|
}
|
|
|
|
// Check if any active (non-archived) agents are bound to this runtime.
|
|
activeCount, err := h.Queries.CountActiveAgentsByRuntime(r.Context(), rt.ID)
|
|
if err != nil {
|
|
writeError(w, http.StatusInternalServerError, "failed to check runtime dependencies")
|
|
return
|
|
}
|
|
if activeCount > 0 {
|
|
writeError(w, http.StatusConflict, "cannot delete runtime: it has active agents bound to it. Archive or reassign the agents first.")
|
|
return
|
|
}
|
|
|
|
// Remove archived agents so the FK constraint (ON DELETE RESTRICT) won't block deletion.
|
|
if err := h.Queries.DeleteArchivedAgentsByRuntime(r.Context(), rt.ID); err != nil {
|
|
writeError(w, http.StatusInternalServerError, "failed to clean up archived agents")
|
|
return
|
|
}
|
|
|
|
if err := h.Queries.DeleteAgentRuntime(r.Context(), rt.ID); err != nil {
|
|
writeError(w, http.StatusInternalServerError, "failed to delete runtime")
|
|
return
|
|
}
|
|
|
|
slog.Info("runtime deleted", "runtime_id", runtimeID, "deleted_by", userID)
|
|
|
|
// Notify frontend to refresh runtime list.
|
|
h.publish(protocol.EventDaemonRegister, wsID, "member", userID, map[string]any{
|
|
"action": "delete",
|
|
})
|
|
|
|
writeJSON(w, http.StatusOK, map[string]string{"status": "ok"})
|
|
}
|