mirror of
https://github.com/multica-ai/multica.git
synced 2026-06-27 17:47:43 +02:00
Compare commits
1 Commits
codex/agen
...
agent/j/30
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
97d292d89b |
@@ -265,12 +265,21 @@ func runDaemonForeground(cmd *cobra.Command) error {
|
||||
serverURL = c.ServerURL
|
||||
}
|
||||
}
|
||||
stateDir := daemonDirForProfile(profile)
|
||||
if stateDir != "" {
|
||||
// Ensure the state directory exists before LoadConfig tries to write
|
||||
// daemon.id into it on first run.
|
||||
if err := os.MkdirAll(stateDir, 0o755); err != nil {
|
||||
return fmt.Errorf("create daemon state dir %s: %w", stateDir, err)
|
||||
}
|
||||
}
|
||||
overrides := daemon.Overrides{
|
||||
ServerURL: serverURL,
|
||||
DaemonID: flagString(cmd, "daemon-id"),
|
||||
DeviceName: flagString(cmd, "device-name"),
|
||||
RuntimeName: flagString(cmd, "runtime-name"),
|
||||
Profile: profile,
|
||||
StateDir: stateDir,
|
||||
HealthPort: healthPortForProfile(profile),
|
||||
}
|
||||
if d, _ := cmd.Flags().GetDuration("poll-interval"); d > 0 {
|
||||
|
||||
@@ -11,10 +11,13 @@ require (
|
||||
github.com/go-chi/chi/v5 v5.2.5
|
||||
github.com/go-chi/cors v1.2.2
|
||||
github.com/golang-jwt/jwt/v5 v5.3.1
|
||||
github.com/google/uuid v1.6.0
|
||||
github.com/gorilla/websocket v1.5.3
|
||||
github.com/jackc/pgx/v5 v5.8.0
|
||||
github.com/lmittmann/tint v1.1.3
|
||||
github.com/microcosm-cc/bluemonday v1.0.27
|
||||
github.com/resend/resend-go/v2 v2.28.0
|
||||
github.com/robfig/cron/v3 v3.0.1
|
||||
github.com/spf13/cobra v1.10.2
|
||||
)
|
||||
|
||||
@@ -35,14 +38,11 @@ require (
|
||||
github.com/aws/aws-sdk-go-v2/service/sts v1.41.10 // indirect
|
||||
github.com/aws/smithy-go v1.24.2 // indirect
|
||||
github.com/aymerick/douceur v0.2.0 // indirect
|
||||
github.com/google/uuid v1.6.0 // indirect
|
||||
github.com/gorilla/css v1.0.1 // indirect
|
||||
github.com/inconshreveable/mousetrap v1.1.0 // indirect
|
||||
github.com/jackc/pgpassfile v1.0.0 // indirect
|
||||
github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 // indirect
|
||||
github.com/jackc/puddle/v2 v2.2.2 // indirect
|
||||
github.com/microcosm-cc/bluemonday v1.0.27 // indirect
|
||||
github.com/robfig/cron/v3 v3.0.1 // indirect
|
||||
github.com/spf13/pflag v1.0.9 // indirect
|
||||
golang.org/x/net v0.26.0 // indirect
|
||||
golang.org/x/sync v0.20.0 // indirect
|
||||
|
||||
@@ -27,7 +27,8 @@ const (
|
||||
// Config holds all daemon configuration.
|
||||
type Config struct {
|
||||
ServerBaseURL string
|
||||
DaemonID string
|
||||
DaemonID string // persistent UUID from <state_dir>/daemon.id
|
||||
LegacyDaemonIDs []string // hostname-derived IDs reported to the server so it can consolidate stale runtime rows
|
||||
DeviceName string
|
||||
RuntimeName string
|
||||
CLIVersion string // multica CLI version (e.g. "0.1.13")
|
||||
@@ -60,6 +61,7 @@ type Overrides struct {
|
||||
DeviceName string
|
||||
RuntimeName string
|
||||
Profile string // profile name (empty = default)
|
||||
StateDir string // directory that holds daemon.id, daemon.pid, daemon.log (resolved by cmd_daemon)
|
||||
HealthPort int // health check port (0 = use default)
|
||||
}
|
||||
|
||||
@@ -187,15 +189,30 @@ func LoadConfig(overrides Overrides) (Config, error) {
|
||||
// Profile
|
||||
profile := overrides.Profile
|
||||
|
||||
// String overrides
|
||||
daemonID := envOrDefault("MULTICA_DAEMON_ID", host)
|
||||
if overrides.DaemonID != "" {
|
||||
daemonID = overrides.DaemonID
|
||||
// daemon_id: explicit override > env var > persistent UUID in state dir.
|
||||
// The UUID is generated once on first run and reread forever after, so
|
||||
// hostname drift (.local suffix, system rename, profile switch) can no
|
||||
// longer produce a second agent_runtime row for the same physical daemon.
|
||||
daemonID := strings.TrimSpace(overrides.DaemonID)
|
||||
if daemonID == "" {
|
||||
daemonID = strings.TrimSpace(os.Getenv("MULTICA_DAEMON_ID"))
|
||||
}
|
||||
legacyDaemonIDs := LegacyDaemonIDCandidates(host, profile)
|
||||
if daemonID == "" {
|
||||
if overrides.StateDir == "" {
|
||||
return Config{}, fmt.Errorf("daemon_id not set and state directory is empty; pass StateDir or set MULTICA_DAEMON_ID")
|
||||
}
|
||||
id, _, err := LoadOrCreateDaemonID(overrides.StateDir)
|
||||
if err != nil {
|
||||
return Config{}, err
|
||||
}
|
||||
daemonID = id
|
||||
} else {
|
||||
// Explicit override path: the caller pins the daemon_id themselves, so
|
||||
// legacy consolidation on the server would incorrectly target rows
|
||||
// they don't own. Skip it.
|
||||
legacyDaemonIDs = nil
|
||||
}
|
||||
// NOTE: daemon_id is intentionally stable (hostname or explicit override).
|
||||
// The unique constraint (workspace_id, daemon_id, provider) already prevents
|
||||
// collisions within the same workspace. Appending the profile name caused
|
||||
// duplicate runtimes when users switched profiles.
|
||||
|
||||
deviceName := envOrDefault("MULTICA_DAEMON_DEVICE_NAME", host)
|
||||
if overrides.DeviceName != "" {
|
||||
@@ -258,6 +275,7 @@ func LoadConfig(overrides Overrides) (Config, error) {
|
||||
return Config{
|
||||
ServerBaseURL: serverBaseURL,
|
||||
DaemonID: daemonID,
|
||||
LegacyDaemonIDs: legacyDaemonIDs,
|
||||
DeviceName: deviceName,
|
||||
RuntimeName: runtimeName,
|
||||
Profile: profile,
|
||||
|
||||
@@ -211,6 +211,9 @@ func (d *Daemon) registerRuntimesForWorkspace(ctx context.Context, workspaceID s
|
||||
"launched_by": d.cfg.LaunchedBy,
|
||||
"runtimes": runtimes,
|
||||
}
|
||||
if len(d.cfg.LegacyDaemonIDs) > 0 {
|
||||
req["legacy_daemon_ids"] = d.cfg.LegacyDaemonIDs
|
||||
}
|
||||
|
||||
resp, err := d.client.Register(ctx, req)
|
||||
if err != nil {
|
||||
|
||||
115
server/internal/daemon/identity.go
Normal file
115
server/internal/daemon/identity.go
Normal file
@@ -0,0 +1,115 @@
|
||||
package daemon
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"strings"
|
||||
|
||||
"github.com/google/uuid"
|
||||
)
|
||||
|
||||
// DaemonIDFileName is the per-profile file that stores the persistent daemon UUID.
|
||||
const DaemonIDFileName = "daemon.id"
|
||||
|
||||
// LoadOrCreateDaemonID returns the persistent daemon identifier stored under
|
||||
// stateDir. On first run the file does not exist; a fresh v4 UUID is generated,
|
||||
// written atomically, and returned. On subsequent runs the existing value is
|
||||
// read verbatim — so hostname drift (.local suffix, system rename, profile
|
||||
// switch) can no longer produce a second agent_runtime row.
|
||||
//
|
||||
// stateDir is the profile-specific directory (e.g. ~/.multica or
|
||||
// ~/.multica/profiles/<name>). The caller is expected to create it before
|
||||
// invoking this function; an MkdirAll fallback is kept for defensive callers
|
||||
// that forgot.
|
||||
func LoadOrCreateDaemonID(stateDir string) (string, bool, error) {
|
||||
if strings.TrimSpace(stateDir) == "" {
|
||||
return "", false, errors.New("daemon identity: stateDir is empty")
|
||||
}
|
||||
|
||||
path := filepath.Join(stateDir, DaemonIDFileName)
|
||||
data, err := os.ReadFile(path)
|
||||
if err == nil {
|
||||
id := strings.TrimSpace(string(data))
|
||||
if _, parseErr := uuid.Parse(id); parseErr != nil {
|
||||
return "", false, fmt.Errorf("daemon identity: %s contains invalid UUID %q: %w", path, id, parseErr)
|
||||
}
|
||||
return id, false, nil
|
||||
}
|
||||
if !errors.Is(err, os.ErrNotExist) {
|
||||
return "", false, fmt.Errorf("daemon identity: read %s: %w", path, err)
|
||||
}
|
||||
|
||||
if err := os.MkdirAll(stateDir, 0o755); err != nil {
|
||||
return "", false, fmt.Errorf("daemon identity: create %s: %w", stateDir, err)
|
||||
}
|
||||
|
||||
newID := uuid.NewString()
|
||||
|
||||
tmp, err := os.CreateTemp(stateDir, ".daemon-id-*.tmp")
|
||||
if err != nil {
|
||||
return "", false, fmt.Errorf("daemon identity: create temp file: %w", err)
|
||||
}
|
||||
tmpPath := tmp.Name()
|
||||
if _, err := tmp.WriteString(newID + "\n"); err != nil {
|
||||
tmp.Close()
|
||||
os.Remove(tmpPath)
|
||||
return "", false, fmt.Errorf("daemon identity: write temp file: %w", err)
|
||||
}
|
||||
if err := tmp.Close(); err != nil {
|
||||
os.Remove(tmpPath)
|
||||
return "", false, fmt.Errorf("daemon identity: close temp file: %w", err)
|
||||
}
|
||||
if err := os.Chmod(tmpPath, 0o600); err != nil {
|
||||
os.Remove(tmpPath)
|
||||
return "", false, fmt.Errorf("daemon identity: chmod temp file: %w", err)
|
||||
}
|
||||
if err := os.Rename(tmpPath, path); err != nil {
|
||||
os.Remove(tmpPath)
|
||||
return "", false, fmt.Errorf("daemon identity: rename temp file: %w", err)
|
||||
}
|
||||
return newID, true, nil
|
||||
}
|
||||
|
||||
// LegacyDaemonIDCandidates returns the set of daemon_id values this machine
|
||||
// may have produced before UUID persistence landed. The server uses them to
|
||||
// locate and merge stale agent_runtime rows so existing agents keep working
|
||||
// without manual migration.
|
||||
//
|
||||
// The historical formats covered:
|
||||
// - <hostname> (current, post-#1070)
|
||||
// - <hostname>.local (pre-#1070 macOS bonjour suffix)
|
||||
// - <hostname>-<profile> (pre-#906, profile-suffixed)
|
||||
// - <hostname>.local-<profile> (the .local + profile suffix combo)
|
||||
//
|
||||
// Duplicates and empty strings are removed. Order is preserved so the caller
|
||||
// sees the most-likely-current form first.
|
||||
func LegacyDaemonIDCandidates(hostname, profile string) []string {
|
||||
hostname = strings.TrimSpace(hostname)
|
||||
if hostname == "" {
|
||||
return nil
|
||||
}
|
||||
|
||||
stripped := strings.TrimSuffix(hostname, ".local")
|
||||
|
||||
raw := []string{hostname, stripped}
|
||||
if profile != "" {
|
||||
raw = append(raw, hostname+"-"+profile, stripped+"-"+profile)
|
||||
}
|
||||
|
||||
seen := make(map[string]struct{}, len(raw))
|
||||
out := make([]string, 0, len(raw))
|
||||
for _, v := range raw {
|
||||
v = strings.TrimSpace(v)
|
||||
if v == "" {
|
||||
continue
|
||||
}
|
||||
if _, dup := seen[v]; dup {
|
||||
continue
|
||||
}
|
||||
seen[v] = struct{}{}
|
||||
out = append(out, v)
|
||||
}
|
||||
return out
|
||||
}
|
||||
143
server/internal/daemon/identity_test.go
Normal file
143
server/internal/daemon/identity_test.go
Normal file
@@ -0,0 +1,143 @@
|
||||
package daemon
|
||||
|
||||
import (
|
||||
"os"
|
||||
"path/filepath"
|
||||
"testing"
|
||||
|
||||
"github.com/google/uuid"
|
||||
)
|
||||
|
||||
func TestLoadOrCreateDaemonID_CreatesAndReads(t *testing.T) {
|
||||
dir := t.TempDir()
|
||||
|
||||
id, created, err := LoadOrCreateDaemonID(dir)
|
||||
if err != nil {
|
||||
t.Fatalf("first call: %v", err)
|
||||
}
|
||||
if !created {
|
||||
t.Fatalf("expected created=true on fresh directory")
|
||||
}
|
||||
if _, err := uuid.Parse(id); err != nil {
|
||||
t.Fatalf("first call returned invalid UUID %q: %v", id, err)
|
||||
}
|
||||
|
||||
id2, created, err := LoadOrCreateDaemonID(dir)
|
||||
if err != nil {
|
||||
t.Fatalf("second call: %v", err)
|
||||
}
|
||||
if created {
|
||||
t.Fatalf("expected created=false when daemon.id exists")
|
||||
}
|
||||
if id != id2 {
|
||||
t.Fatalf("expected stable id across calls: got %q then %q", id, id2)
|
||||
}
|
||||
|
||||
path := filepath.Join(dir, DaemonIDFileName)
|
||||
data, err := os.ReadFile(path)
|
||||
if err != nil {
|
||||
t.Fatalf("read file: %v", err)
|
||||
}
|
||||
if string(data) == "" {
|
||||
t.Fatalf("daemon.id is empty")
|
||||
}
|
||||
|
||||
info, err := os.Stat(path)
|
||||
if err != nil {
|
||||
t.Fatalf("stat: %v", err)
|
||||
}
|
||||
if info.Mode().Perm() != 0o600 {
|
||||
t.Fatalf("expected 0600 perms, got %v", info.Mode().Perm())
|
||||
}
|
||||
}
|
||||
|
||||
func TestLoadOrCreateDaemonID_RejectsCorruptFile(t *testing.T) {
|
||||
dir := t.TempDir()
|
||||
path := filepath.Join(dir, DaemonIDFileName)
|
||||
if err := os.WriteFile(path, []byte("not-a-uuid"), 0o600); err != nil {
|
||||
t.Fatalf("setup: %v", err)
|
||||
}
|
||||
|
||||
_, _, err := LoadOrCreateDaemonID(dir)
|
||||
if err == nil {
|
||||
t.Fatalf("expected error on non-UUID contents, got nil")
|
||||
}
|
||||
}
|
||||
|
||||
func TestLoadOrCreateDaemonID_EmptyDir(t *testing.T) {
|
||||
if _, _, err := LoadOrCreateDaemonID(""); err == nil {
|
||||
t.Fatalf("expected error for empty stateDir")
|
||||
}
|
||||
}
|
||||
|
||||
func TestLegacyDaemonIDCandidates(t *testing.T) {
|
||||
cases := []struct {
|
||||
name string
|
||||
hostname string
|
||||
profile string
|
||||
want []string
|
||||
}{
|
||||
{
|
||||
name: "plain hostname, default profile",
|
||||
hostname: "MacBook-Pro",
|
||||
profile: "",
|
||||
want: []string{"MacBook-Pro"},
|
||||
},
|
||||
{
|
||||
name: ".local hostname, default profile",
|
||||
hostname: "Jiayuans-MacBook-Pro.local",
|
||||
profile: "",
|
||||
want: []string{"Jiayuans-MacBook-Pro.local", "Jiayuans-MacBook-Pro"},
|
||||
},
|
||||
{
|
||||
name: "plain hostname, named profile",
|
||||
hostname: "MacBook-Air",
|
||||
profile: "staging",
|
||||
want: []string{"MacBook-Air", "MacBook-Air-staging"},
|
||||
},
|
||||
{
|
||||
name: ".local hostname, named profile",
|
||||
hostname: "Jiayuans-MacBook-Pro.local",
|
||||
profile: "staging",
|
||||
want: []string{
|
||||
"Jiayuans-MacBook-Pro.local",
|
||||
"Jiayuans-MacBook-Pro",
|
||||
"Jiayuans-MacBook-Pro.local-staging",
|
||||
"Jiayuans-MacBook-Pro-staging",
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "empty hostname returns nil",
|
||||
hostname: "",
|
||||
profile: "staging",
|
||||
want: nil,
|
||||
},
|
||||
{
|
||||
name: "whitespace-only hostname",
|
||||
hostname: " ",
|
||||
profile: "",
|
||||
want: nil,
|
||||
},
|
||||
}
|
||||
|
||||
for _, tc := range cases {
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
got := LegacyDaemonIDCandidates(tc.hostname, tc.profile)
|
||||
if !equalStringSlice(got, tc.want) {
|
||||
t.Fatalf("LegacyDaemonIDCandidates(%q, %q) = %v, want %v", tc.hostname, tc.profile, got, tc.want)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func equalStringSlice(a, b []string) bool {
|
||||
if len(a) != len(b) {
|
||||
return false
|
||||
}
|
||||
for i := range a {
|
||||
if a[i] != b[i] {
|
||||
return false
|
||||
}
|
||||
}
|
||||
return true
|
||||
}
|
||||
@@ -118,10 +118,16 @@ func (h *Handler) resolveTaskWorkspaceID(r *http.Request, task db.AgentTaskQueue
|
||||
type DaemonRegisterRequest struct {
|
||||
WorkspaceID string `json:"workspace_id"`
|
||||
DaemonID string `json:"daemon_id"`
|
||||
DeviceName string `json:"device_name"`
|
||||
CLIVersion string `json:"cli_version"` // multica CLI version
|
||||
LaunchedBy string `json:"launched_by"` // "desktop" when spawned by the Electron app
|
||||
Runtimes []struct {
|
||||
// LegacyDaemonIDs carries any hostname-derived identifiers the daemon
|
||||
// may have reported in earlier versions (before daemon_id became a
|
||||
// persistent UUID). The server consolidates any matching stale rows
|
||||
// into the current registration so agent FKs stay valid across the
|
||||
// upgrade. Optional.
|
||||
LegacyDaemonIDs []string `json:"legacy_daemon_ids"`
|
||||
DeviceName string `json:"device_name"`
|
||||
CLIVersion string `json:"cli_version"` // multica CLI version
|
||||
LaunchedBy string `json:"launched_by"` // "desktop" when spawned by the Electron app
|
||||
Runtimes []struct {
|
||||
Name string `json:"name"`
|
||||
Type string `json:"type"`
|
||||
Version string `json:"version"` // agent CLI version (claude/codex)
|
||||
@@ -287,26 +293,7 @@ func (h *Handler) DaemonRegister(w http.ResponseWriter, r *http.Request) {
|
||||
return
|
||||
}
|
||||
|
||||
// Migrate agents from old offline runtimes on the same machine to the
|
||||
// newly registered runtime. Uses the runtime's owner_id (preserved via
|
||||
// COALESCE on upsert) so migration works with both PAT and daemon tokens.
|
||||
// Scoped by daemon_id prefix so that only old profile-suffixed runtimes
|
||||
// (e.g. "hostname-staging") from this machine are affected.
|
||||
effectiveOwnerID := registered.OwnerID
|
||||
if effectiveOwnerID.Valid {
|
||||
migrated, err := h.Queries.MigrateAgentsToRuntime(r.Context(), db.MigrateAgentsToRuntimeParams{
|
||||
NewRuntimeID: registered.ID,
|
||||
WorkspaceID: parseUUID(req.WorkspaceID),
|
||||
Provider: provider,
|
||||
OwnerID: effectiveOwnerID,
|
||||
DaemonIDPrefix: strToText(req.DaemonID),
|
||||
})
|
||||
if err != nil {
|
||||
slog.Warn("failed to migrate agents to new runtime", "runtime_id", uuidToString(registered.ID), "error", err)
|
||||
} else if migrated > 0 {
|
||||
slog.Info("migrated agents to new runtime", "runtime_id", uuidToString(registered.ID), "provider", provider, "migrated_count", migrated)
|
||||
}
|
||||
}
|
||||
h.consolidateLegacyRuntimes(r, registered, req.WorkspaceID, provider, req.LegacyDaemonIDs)
|
||||
|
||||
resp = append(resp, runtimeToResponse(registered))
|
||||
}
|
||||
@@ -340,6 +327,103 @@ func (h *Handler) GetDaemonWorkspaceRepos(w http.ResponseWriter, r *http.Request
|
||||
writeJSON(w, http.StatusOK, workspaceReposResponse(workspaceID, ws.Repos))
|
||||
}
|
||||
|
||||
// consolidateLegacyRuntimes merges any hostname-derived agent_runtime rows
|
||||
// this machine previously produced into the just-upserted UUID row. For each
|
||||
// legacy candidate (hostname, hostname.local, hostname-<profile>, etc.) we
|
||||
// reparent agents and tasks, delete the stale row, and record the last value
|
||||
// as legacy_daemon_id for audit. Scoped by (workspace, provider, owner) so a
|
||||
// different user's rows on a machine sharing the same hostname are untouched.
|
||||
//
|
||||
// Errors are logged but never fail registration: consolidation is a best-effort
|
||||
// cleanup and should not block a healthy daemon from coming online.
|
||||
func (h *Handler) consolidateLegacyRuntimes(r *http.Request, registered db.AgentRuntime, workspaceID, provider string, legacyDaemonIDs []string) {
|
||||
if len(legacyDaemonIDs) == 0 {
|
||||
return
|
||||
}
|
||||
// Owner is required to scope the migration; without it we'd risk touching
|
||||
// rows belonging to other users. Daemon-token flows rely on COALESCE in
|
||||
// UpsertAgentRuntime to preserve an existing owner on re-register.
|
||||
if !registered.OwnerID.Valid {
|
||||
return
|
||||
}
|
||||
|
||||
workspaceUUID := parseUUID(workspaceID)
|
||||
var lastMatched string
|
||||
var totalAgents, totalTasks, totalDeleted int64
|
||||
|
||||
for _, legacyID := range legacyDaemonIDs {
|
||||
legacyID = strings.TrimSpace(legacyID)
|
||||
if legacyID == "" {
|
||||
continue
|
||||
}
|
||||
// Skip self-match: if the daemon reported its own new daemon_id as a
|
||||
// legacy candidate we'd wipe the row we just created.
|
||||
if legacyID == strings.TrimSpace(registered.DaemonID.String) {
|
||||
continue
|
||||
}
|
||||
|
||||
agents, err := h.Queries.MigrateAgentsFromLegacyDaemon(r.Context(), db.MigrateAgentsFromLegacyDaemonParams{
|
||||
NewRuntimeID: registered.ID,
|
||||
WorkspaceID: workspaceUUID,
|
||||
Provider: provider,
|
||||
OwnerID: registered.OwnerID,
|
||||
LegacyDaemonID: strToText(legacyID),
|
||||
})
|
||||
if err != nil {
|
||||
slog.Warn("legacy runtime consolidation: migrate agents failed", "runtime_id", uuidToString(registered.ID), "legacy_daemon_id", legacyID, "error", err)
|
||||
continue
|
||||
}
|
||||
|
||||
tasks, err := h.Queries.MigrateTasksFromLegacyDaemon(r.Context(), db.MigrateTasksFromLegacyDaemonParams{
|
||||
NewRuntimeID: registered.ID,
|
||||
WorkspaceID: workspaceUUID,
|
||||
Provider: provider,
|
||||
OwnerID: registered.OwnerID,
|
||||
LegacyDaemonID: strToText(legacyID),
|
||||
})
|
||||
if err != nil {
|
||||
slog.Warn("legacy runtime consolidation: migrate tasks failed", "runtime_id", uuidToString(registered.ID), "legacy_daemon_id", legacyID, "error", err)
|
||||
continue
|
||||
}
|
||||
|
||||
deleted, err := h.Queries.DeleteLegacyRuntime(r.Context(), db.DeleteLegacyRuntimeParams{
|
||||
WorkspaceID: workspaceUUID,
|
||||
Provider: provider,
|
||||
OwnerID: registered.OwnerID,
|
||||
NewRuntimeID: registered.ID,
|
||||
LegacyDaemonID: strToText(legacyID),
|
||||
})
|
||||
if err != nil {
|
||||
slog.Warn("legacy runtime consolidation: delete legacy row failed", "runtime_id", uuidToString(registered.ID), "legacy_daemon_id", legacyID, "error", err)
|
||||
continue
|
||||
}
|
||||
|
||||
if deleted > 0 || agents > 0 || tasks > 0 {
|
||||
lastMatched = legacyID
|
||||
totalAgents += agents
|
||||
totalTasks += tasks
|
||||
totalDeleted += deleted
|
||||
}
|
||||
}
|
||||
|
||||
if lastMatched != "" {
|
||||
if err := h.Queries.SetRuntimeLegacyDaemonID(r.Context(), db.SetRuntimeLegacyDaemonIDParams{
|
||||
ID: registered.ID,
|
||||
LegacyDaemonID: strToText(lastMatched),
|
||||
}); err != nil {
|
||||
slog.Warn("legacy runtime consolidation: record legacy_daemon_id failed", "runtime_id", uuidToString(registered.ID), "error", err)
|
||||
}
|
||||
slog.Info("legacy runtime consolidated",
|
||||
"runtime_id", uuidToString(registered.ID),
|
||||
"provider", provider,
|
||||
"legacy_daemon_id", lastMatched,
|
||||
"agents_reparented", totalAgents,
|
||||
"tasks_reparented", totalTasks,
|
||||
"rows_deleted", totalDeleted,
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
// DaemonDeregister marks runtimes as offline when the daemon shuts down.
|
||||
func (h *Handler) DaemonDeregister(w http.ResponseWriter, r *http.Request) {
|
||||
var req struct {
|
||||
|
||||
@@ -79,6 +79,153 @@ func TestDaemonRegister_WithDaemonToken(t *testing.T) {
|
||||
testPool.Exec(context.Background(), `DELETE FROM agent_runtime WHERE id = $1`, runtimeID)
|
||||
}
|
||||
|
||||
// TestDaemonRegister_ConsolidatesLegacyRuntime reproduces the duplicate-
|
||||
// runtime scenario from MUL-975: an agent_runtime row already exists for a
|
||||
// hostname-derived daemon_id (e.g. "host.local"), with an agent and task
|
||||
// pointing at it. When the upgraded daemon registers with a persistent UUID
|
||||
// daemon_id and reports the old hostname as a legacy candidate, the stale row
|
||||
// must be deleted and its agents + tasks reparented to the new UUID row.
|
||||
func TestDaemonRegister_ConsolidatesLegacyRuntime(t *testing.T) {
|
||||
if testHandler == nil {
|
||||
t.Skip("database not available")
|
||||
}
|
||||
ctx := context.Background()
|
||||
|
||||
legacyDaemonID := "Jiayuans-MacBook-Pro.local"
|
||||
provider := "claude"
|
||||
|
||||
// Seed a legacy runtime row owned by the test user.
|
||||
var legacyRuntimeID string
|
||||
if err := testPool.QueryRow(ctx, `
|
||||
INSERT INTO agent_runtime (
|
||||
workspace_id, daemon_id, name, runtime_mode, provider, status,
|
||||
device_info, metadata, owner_id, last_seen_at
|
||||
)
|
||||
VALUES ($1, $2, 'Legacy Claude Runtime', 'local', $3, 'offline',
|
||||
'', '{}'::jsonb, $4, now())
|
||||
RETURNING id
|
||||
`, testWorkspaceID, legacyDaemonID, provider, testUserID).Scan(&legacyRuntimeID); err != nil {
|
||||
t.Fatalf("seed legacy runtime: %v", err)
|
||||
}
|
||||
t.Cleanup(func() {
|
||||
testPool.Exec(context.Background(), `DELETE FROM agent_runtime WHERE id = $1`, legacyRuntimeID)
|
||||
})
|
||||
|
||||
var legacyAgentID string
|
||||
if err := testPool.QueryRow(ctx, `
|
||||
INSERT INTO agent (
|
||||
workspace_id, name, description, runtime_mode, runtime_config,
|
||||
runtime_id, visibility, max_concurrent_tasks, owner_id
|
||||
)
|
||||
VALUES ($1, 'Legacy Agent', '', 'local', '{}'::jsonb, $2, 'workspace', 1, $3)
|
||||
RETURNING id
|
||||
`, testWorkspaceID, legacyRuntimeID, testUserID).Scan(&legacyAgentID); err != nil {
|
||||
t.Fatalf("seed legacy agent: %v", err)
|
||||
}
|
||||
t.Cleanup(func() {
|
||||
testPool.Exec(context.Background(), `DELETE FROM agent WHERE id = $1`, legacyAgentID)
|
||||
})
|
||||
|
||||
var legacyIssueID string
|
||||
if err := testPool.QueryRow(ctx, `
|
||||
INSERT INTO issue (workspace_id, title, status, priority, creator_id, creator_type)
|
||||
VALUES ($1, 'consolidation-test', 'todo', 'medium', $2, 'member')
|
||||
RETURNING id
|
||||
`, testWorkspaceID, testUserID).Scan(&legacyIssueID); err != nil {
|
||||
t.Fatalf("seed issue: %v", err)
|
||||
}
|
||||
t.Cleanup(func() {
|
||||
testPool.Exec(context.Background(), `DELETE FROM issue WHERE id = $1`, legacyIssueID)
|
||||
})
|
||||
|
||||
var legacyTaskID string
|
||||
if err := testPool.QueryRow(ctx, `
|
||||
INSERT INTO agent_task_queue (agent_id, runtime_id, issue_id, status, priority)
|
||||
VALUES ($1, $2, $3, 'queued', 0)
|
||||
RETURNING id
|
||||
`, legacyAgentID, legacyRuntimeID, legacyIssueID).Scan(&legacyTaskID); err != nil {
|
||||
t.Fatalf("seed task: %v", err)
|
||||
}
|
||||
t.Cleanup(func() {
|
||||
testPool.Exec(context.Background(), `DELETE FROM agent_task_queue WHERE id = $1`, legacyTaskID)
|
||||
})
|
||||
|
||||
// Register with a new UUID daemon_id + the legacy hostname reported as a
|
||||
// legacy candidate. PAT auth (newRequest) so the upsert picks up the user
|
||||
// as owner_id.
|
||||
newDaemonID := "4d1b2b26-7f9b-4f50-9ea8-8e1f1ca88888"
|
||||
w := httptest.NewRecorder()
|
||||
req := newRequest("POST", "/api/daemon/register", map[string]any{
|
||||
"workspace_id": testWorkspaceID,
|
||||
"daemon_id": newDaemonID,
|
||||
"device_name": "Jiayuan-MacBook-Pro",
|
||||
"legacy_daemon_ids": []string{legacyDaemonID, "Jiayuans-MacBook-Pro"},
|
||||
"runtimes": []map[string]any{
|
||||
{"name": "Claude", "type": provider, "version": "1.0.0", "status": "online"},
|
||||
},
|
||||
})
|
||||
|
||||
testHandler.DaemonRegister(w, req)
|
||||
if w.Code != http.StatusOK {
|
||||
t.Fatalf("DaemonRegister: expected 200, got %d: %s", w.Code, w.Body.String())
|
||||
}
|
||||
var resp struct {
|
||||
Runtimes []map[string]any `json:"runtimes"`
|
||||
}
|
||||
if err := json.NewDecoder(w.Body).Decode(&resp); err != nil {
|
||||
t.Fatalf("decode response: %v", err)
|
||||
}
|
||||
if len(resp.Runtimes) != 1 {
|
||||
t.Fatalf("expected 1 runtime in response, got %d", len(resp.Runtimes))
|
||||
}
|
||||
newRuntimeID := resp.Runtimes[0]["id"].(string)
|
||||
if newRuntimeID == legacyRuntimeID {
|
||||
t.Fatalf("expected new runtime id to differ from legacy id %s", legacyRuntimeID)
|
||||
}
|
||||
t.Cleanup(func() {
|
||||
testPool.Exec(context.Background(), `DELETE FROM agent_runtime WHERE id = $1`, newRuntimeID)
|
||||
})
|
||||
|
||||
// Legacy row must be gone.
|
||||
var legacyCount int
|
||||
if err := testPool.QueryRow(ctx, `SELECT count(*) FROM agent_runtime WHERE id = $1`, legacyRuntimeID).Scan(&legacyCount); err != nil {
|
||||
t.Fatalf("count legacy runtime: %v", err)
|
||||
}
|
||||
if legacyCount != 0 {
|
||||
t.Fatalf("expected legacy runtime row to be deleted, still present")
|
||||
}
|
||||
|
||||
// Agent must now point at the new runtime row.
|
||||
var agentRuntimeID string
|
||||
if err := testPool.QueryRow(ctx, `SELECT runtime_id FROM agent WHERE id = $1`, legacyAgentID).Scan(&agentRuntimeID); err != nil {
|
||||
t.Fatalf("read agent.runtime_id: %v", err)
|
||||
}
|
||||
if agentRuntimeID != newRuntimeID {
|
||||
t.Fatalf("expected agent.runtime_id=%s after consolidation, got %s", newRuntimeID, agentRuntimeID)
|
||||
}
|
||||
|
||||
// Queued task must also have been reparented (not cascade-deleted).
|
||||
var taskRuntimeID, taskStatus string
|
||||
if err := testPool.QueryRow(ctx, `SELECT runtime_id, status FROM agent_task_queue WHERE id = $1`, legacyTaskID).Scan(&taskRuntimeID, &taskStatus); err != nil {
|
||||
t.Fatalf("read task: %v", err)
|
||||
}
|
||||
if taskRuntimeID != newRuntimeID {
|
||||
t.Fatalf("expected task.runtime_id=%s after consolidation, got %s", newRuntimeID, taskRuntimeID)
|
||||
}
|
||||
if taskStatus != "queued" {
|
||||
t.Fatalf("expected task status unchanged (queued), got %q", taskStatus)
|
||||
}
|
||||
|
||||
// legacy_daemon_id breadcrumb should be set on the new row.
|
||||
var breadcrumb string
|
||||
if err := testPool.QueryRow(ctx, `SELECT COALESCE(legacy_daemon_id, '') FROM agent_runtime WHERE id = $1`, newRuntimeID).Scan(&breadcrumb); err != nil {
|
||||
t.Fatalf("read legacy_daemon_id: %v", err)
|
||||
}
|
||||
if breadcrumb == "" {
|
||||
t.Fatalf("expected legacy_daemon_id breadcrumb on new row, got empty")
|
||||
}
|
||||
}
|
||||
|
||||
func TestDaemonRegister_WithDaemonToken_WorkspaceMismatch(t *testing.T) {
|
||||
if testHandler == nil {
|
||||
t.Skip("database not available")
|
||||
|
||||
2
server/migrations/048_runtime_daemon_uuid.down.sql
Normal file
2
server/migrations/048_runtime_daemon_uuid.down.sql
Normal file
@@ -0,0 +1,2 @@
|
||||
DROP INDEX IF EXISTS idx_agent_runtime_legacy_daemon_id;
|
||||
ALTER TABLE agent_runtime DROP COLUMN IF EXISTS legacy_daemon_id;
|
||||
24
server/migrations/048_runtime_daemon_uuid.up.sql
Normal file
24
server/migrations/048_runtime_daemon_uuid.up.sql
Normal file
@@ -0,0 +1,24 @@
|
||||
-- Stabilize daemon_id as a persistent UUID.
|
||||
--
|
||||
-- Before this change daemon_id was derived from os.Hostname(), so changes
|
||||
-- like the macOS `.local` suffix appearing/disappearing, the user renaming
|
||||
-- their machine, or switching profiles produced a fresh agent_runtime row
|
||||
-- every time, stranding agents on the stale one.
|
||||
--
|
||||
-- From this migration forward the daemon generates a UUID on first start,
|
||||
-- writes it to ~/.multica/<profile>/daemon.id, and re-uses it forever. The
|
||||
-- server-side DaemonRegister flow consolidates any pre-existing rows that
|
||||
-- match the historic hostname-based daemon_id candidates (hostname,
|
||||
-- hostname.local, hostname-<profile>, hostname.local-<profile>) into the
|
||||
-- new UUID row, so agents keep pointing to the same runtime id across the
|
||||
-- upgrade with no manual intervention.
|
||||
--
|
||||
-- legacy_daemon_id is a best-effort breadcrumb — we record whatever old
|
||||
-- daemon_id value we rewrote last, which makes the consolidation auditable.
|
||||
-- It also gives us a cheap index for locating rows that were migrated.
|
||||
ALTER TABLE agent_runtime
|
||||
ADD COLUMN legacy_daemon_id TEXT;
|
||||
|
||||
CREATE INDEX idx_agent_runtime_legacy_daemon_id
|
||||
ON agent_runtime(workspace_id, provider, legacy_daemon_id)
|
||||
WHERE legacy_daemon_id IS NOT NULL;
|
||||
@@ -42,19 +42,20 @@ type Agent struct {
|
||||
}
|
||||
|
||||
type AgentRuntime struct {
|
||||
ID pgtype.UUID `json:"id"`
|
||||
WorkspaceID pgtype.UUID `json:"workspace_id"`
|
||||
DaemonID pgtype.Text `json:"daemon_id"`
|
||||
Name string `json:"name"`
|
||||
RuntimeMode string `json:"runtime_mode"`
|
||||
Provider string `json:"provider"`
|
||||
Status string `json:"status"`
|
||||
DeviceInfo string `json:"device_info"`
|
||||
Metadata []byte `json:"metadata"`
|
||||
LastSeenAt pgtype.Timestamptz `json:"last_seen_at"`
|
||||
CreatedAt pgtype.Timestamptz `json:"created_at"`
|
||||
UpdatedAt pgtype.Timestamptz `json:"updated_at"`
|
||||
OwnerID pgtype.UUID `json:"owner_id"`
|
||||
ID pgtype.UUID `json:"id"`
|
||||
WorkspaceID pgtype.UUID `json:"workspace_id"`
|
||||
DaemonID pgtype.Text `json:"daemon_id"`
|
||||
Name string `json:"name"`
|
||||
RuntimeMode string `json:"runtime_mode"`
|
||||
Provider string `json:"provider"`
|
||||
Status string `json:"status"`
|
||||
DeviceInfo string `json:"device_info"`
|
||||
Metadata []byte `json:"metadata"`
|
||||
LastSeenAt pgtype.Timestamptz `json:"last_seen_at"`
|
||||
CreatedAt pgtype.Timestamptz `json:"created_at"`
|
||||
UpdatedAt pgtype.Timestamptz `json:"updated_at"`
|
||||
OwnerID pgtype.UUID `json:"owner_id"`
|
||||
LegacyDaemonID pgtype.Text `json:"legacy_daemon_id"`
|
||||
}
|
||||
|
||||
type AgentSkill struct {
|
||||
|
||||
@@ -40,6 +40,40 @@ func (q *Queries) DeleteArchivedAgentsByRuntime(ctx context.Context, runtimeID p
|
||||
return err
|
||||
}
|
||||
|
||||
const deleteLegacyRuntime = `-- name: DeleteLegacyRuntime :execrows
|
||||
DELETE FROM agent_runtime
|
||||
WHERE workspace_id = $1
|
||||
AND provider = $2
|
||||
AND owner_id = $3
|
||||
AND id != $4
|
||||
AND daemon_id = $5
|
||||
`
|
||||
|
||||
type DeleteLegacyRuntimeParams struct {
|
||||
WorkspaceID pgtype.UUID `json:"workspace_id"`
|
||||
Provider string `json:"provider"`
|
||||
OwnerID pgtype.UUID `json:"owner_id"`
|
||||
NewRuntimeID pgtype.UUID `json:"new_runtime_id"`
|
||||
LegacyDaemonID pgtype.Text `json:"legacy_daemon_id"`
|
||||
}
|
||||
|
||||
// Removes the stale hostname-derived runtime row once its agents and tasks
|
||||
// have been reparented. legacy_daemon_id on the new row captures the last
|
||||
// removed value as a breadcrumb.
|
||||
func (q *Queries) DeleteLegacyRuntime(ctx context.Context, arg DeleteLegacyRuntimeParams) (int64, error) {
|
||||
result, err := q.db.Exec(ctx, deleteLegacyRuntime,
|
||||
arg.WorkspaceID,
|
||||
arg.Provider,
|
||||
arg.OwnerID,
|
||||
arg.NewRuntimeID,
|
||||
arg.LegacyDaemonID,
|
||||
)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
return result.RowsAffected(), nil
|
||||
}
|
||||
|
||||
const deleteStaleOfflineRuntimes = `-- name: DeleteStaleOfflineRuntimes :many
|
||||
DELETE FROM agent_runtime
|
||||
WHERE status = 'offline'
|
||||
@@ -115,7 +149,7 @@ func (q *Queries) FailTasksForOfflineRuntimes(ctx context.Context) ([]FailTasksF
|
||||
}
|
||||
|
||||
const getAgentRuntime = `-- name: GetAgentRuntime :one
|
||||
SELECT id, workspace_id, daemon_id, name, runtime_mode, provider, status, device_info, metadata, last_seen_at, created_at, updated_at, owner_id FROM agent_runtime
|
||||
SELECT id, workspace_id, daemon_id, name, runtime_mode, provider, status, device_info, metadata, last_seen_at, created_at, updated_at, owner_id, legacy_daemon_id FROM agent_runtime
|
||||
WHERE id = $1
|
||||
`
|
||||
|
||||
@@ -136,12 +170,13 @@ func (q *Queries) GetAgentRuntime(ctx context.Context, id pgtype.UUID) (AgentRun
|
||||
&i.CreatedAt,
|
||||
&i.UpdatedAt,
|
||||
&i.OwnerID,
|
||||
&i.LegacyDaemonID,
|
||||
)
|
||||
return i, err
|
||||
}
|
||||
|
||||
const getAgentRuntimeForWorkspace = `-- name: GetAgentRuntimeForWorkspace :one
|
||||
SELECT id, workspace_id, daemon_id, name, runtime_mode, provider, status, device_info, metadata, last_seen_at, created_at, updated_at, owner_id FROM agent_runtime
|
||||
SELECT id, workspace_id, daemon_id, name, runtime_mode, provider, status, device_info, metadata, last_seen_at, created_at, updated_at, owner_id, legacy_daemon_id FROM agent_runtime
|
||||
WHERE id = $1 AND workspace_id = $2
|
||||
`
|
||||
|
||||
@@ -167,12 +202,13 @@ func (q *Queries) GetAgentRuntimeForWorkspace(ctx context.Context, arg GetAgentR
|
||||
&i.CreatedAt,
|
||||
&i.UpdatedAt,
|
||||
&i.OwnerID,
|
||||
&i.LegacyDaemonID,
|
||||
)
|
||||
return i, err
|
||||
}
|
||||
|
||||
const listAgentRuntimes = `-- name: ListAgentRuntimes :many
|
||||
SELECT id, workspace_id, daemon_id, name, runtime_mode, provider, status, device_info, metadata, last_seen_at, created_at, updated_at, owner_id FROM agent_runtime
|
||||
SELECT id, workspace_id, daemon_id, name, runtime_mode, provider, status, device_info, metadata, last_seen_at, created_at, updated_at, owner_id, legacy_daemon_id FROM agent_runtime
|
||||
WHERE workspace_id = $1
|
||||
ORDER BY created_at ASC
|
||||
`
|
||||
@@ -200,6 +236,7 @@ func (q *Queries) ListAgentRuntimes(ctx context.Context, workspaceID pgtype.UUID
|
||||
&i.CreatedAt,
|
||||
&i.UpdatedAt,
|
||||
&i.OwnerID,
|
||||
&i.LegacyDaemonID,
|
||||
); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -212,7 +249,7 @@ func (q *Queries) ListAgentRuntimes(ctx context.Context, workspaceID pgtype.UUID
|
||||
}
|
||||
|
||||
const listAgentRuntimesByOwner = `-- name: ListAgentRuntimesByOwner :many
|
||||
SELECT id, workspace_id, daemon_id, name, runtime_mode, provider, status, device_info, metadata, last_seen_at, created_at, updated_at, owner_id FROM agent_runtime
|
||||
SELECT id, workspace_id, daemon_id, name, runtime_mode, provider, status, device_info, metadata, last_seen_at, created_at, updated_at, owner_id, legacy_daemon_id FROM agent_runtime
|
||||
WHERE workspace_id = $1 AND owner_id = $2
|
||||
ORDER BY created_at ASC
|
||||
`
|
||||
@@ -245,6 +282,7 @@ func (q *Queries) ListAgentRuntimesByOwner(ctx context.Context, arg ListAgentRun
|
||||
&i.CreatedAt,
|
||||
&i.UpdatedAt,
|
||||
&i.OwnerID,
|
||||
&i.LegacyDaemonID,
|
||||
); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -289,7 +327,7 @@ func (q *Queries) MarkStaleRuntimesOffline(ctx context.Context, staleSeconds flo
|
||||
return items, nil
|
||||
}
|
||||
|
||||
const migrateAgentsToRuntime = `-- name: MigrateAgentsToRuntime :execrows
|
||||
const migrateAgentsFromLegacyDaemon = `-- name: MigrateAgentsFromLegacyDaemon :execrows
|
||||
UPDATE agent
|
||||
SET runtime_id = $1
|
||||
WHERE runtime_id IN (
|
||||
@@ -298,32 +336,68 @@ WHERE runtime_id IN (
|
||||
AND ar.provider = $3
|
||||
AND ar.owner_id = $4
|
||||
AND ar.id != $1
|
||||
AND ar.status = 'offline'
|
||||
AND ar.daemon_id LIKE $5 || '-%'
|
||||
AND ar.daemon_id = $5
|
||||
)
|
||||
`
|
||||
|
||||
type MigrateAgentsToRuntimeParams struct {
|
||||
type MigrateAgentsFromLegacyDaemonParams struct {
|
||||
NewRuntimeID pgtype.UUID `json:"new_runtime_id"`
|
||||
WorkspaceID pgtype.UUID `json:"workspace_id"`
|
||||
Provider string `json:"provider"`
|
||||
OwnerID pgtype.UUID `json:"owner_id"`
|
||||
DaemonIDPrefix pgtype.Text `json:"daemon_id_prefix"`
|
||||
LegacyDaemonID pgtype.Text `json:"legacy_daemon_id"`
|
||||
}
|
||||
|
||||
// Migrates agents from stale offline runtimes to the newly registered runtime.
|
||||
// Only migrates from runtimes that match the same workspace, provider, owner,
|
||||
// AND whose daemon_id starts with the current daemon_id followed by '-'.
|
||||
// This scopes migration to old profile-suffixed runtimes from the same machine
|
||||
// (e.g. "MacBook-staging" matches daemon_id_prefix "MacBook") without touching
|
||||
// runtimes from other machines belonging to the same user.
|
||||
func (q *Queries) MigrateAgentsToRuntime(ctx context.Context, arg MigrateAgentsToRuntimeParams) (int64, error) {
|
||||
result, err := q.db.Exec(ctx, migrateAgentsToRuntime,
|
||||
// Reparents agents from the legacy (hostname-derived) runtime row to the
|
||||
// newly registered UUID row. Scoped to a single (workspace, provider,
|
||||
// owner) triple so we never touch another user's runtimes even if they
|
||||
// share a hostname on the same machine. Called once per legacy daemon_id
|
||||
// candidate reported by the daemon at registration time.
|
||||
func (q *Queries) MigrateAgentsFromLegacyDaemon(ctx context.Context, arg MigrateAgentsFromLegacyDaemonParams) (int64, error) {
|
||||
result, err := q.db.Exec(ctx, migrateAgentsFromLegacyDaemon,
|
||||
arg.NewRuntimeID,
|
||||
arg.WorkspaceID,
|
||||
arg.Provider,
|
||||
arg.OwnerID,
|
||||
arg.DaemonIDPrefix,
|
||||
arg.LegacyDaemonID,
|
||||
)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
return result.RowsAffected(), nil
|
||||
}
|
||||
|
||||
const migrateTasksFromLegacyDaemon = `-- name: MigrateTasksFromLegacyDaemon :execrows
|
||||
UPDATE agent_task_queue
|
||||
SET runtime_id = $1
|
||||
WHERE runtime_id IN (
|
||||
SELECT ar.id FROM agent_runtime ar
|
||||
WHERE ar.workspace_id = $2
|
||||
AND ar.provider = $3
|
||||
AND ar.owner_id = $4
|
||||
AND ar.id != $1
|
||||
AND ar.daemon_id = $5
|
||||
)
|
||||
`
|
||||
|
||||
type MigrateTasksFromLegacyDaemonParams struct {
|
||||
NewRuntimeID pgtype.UUID `json:"new_runtime_id"`
|
||||
WorkspaceID pgtype.UUID `json:"workspace_id"`
|
||||
Provider string `json:"provider"`
|
||||
OwnerID pgtype.UUID `json:"owner_id"`
|
||||
LegacyDaemonID pgtype.Text `json:"legacy_daemon_id"`
|
||||
}
|
||||
|
||||
// Same scoping as MigrateAgentsFromLegacyDaemon. Must run before the DELETE
|
||||
// below because agent_task_queue.runtime_id is ON DELETE CASCADE; deleting
|
||||
// the legacy row first would silently drop in-flight tasks.
|
||||
func (q *Queries) MigrateTasksFromLegacyDaemon(ctx context.Context, arg MigrateTasksFromLegacyDaemonParams) (int64, error) {
|
||||
result, err := q.db.Exec(ctx, migrateTasksFromLegacyDaemon,
|
||||
arg.NewRuntimeID,
|
||||
arg.WorkspaceID,
|
||||
arg.Provider,
|
||||
arg.OwnerID,
|
||||
arg.LegacyDaemonID,
|
||||
)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
@@ -342,11 +416,27 @@ func (q *Queries) SetAgentRuntimeOffline(ctx context.Context, id pgtype.UUID) er
|
||||
return err
|
||||
}
|
||||
|
||||
const setRuntimeLegacyDaemonID = `-- name: SetRuntimeLegacyDaemonID :exec
|
||||
UPDATE agent_runtime
|
||||
SET legacy_daemon_id = $1
|
||||
WHERE id = $2
|
||||
`
|
||||
|
||||
type SetRuntimeLegacyDaemonIDParams struct {
|
||||
LegacyDaemonID pgtype.Text `json:"legacy_daemon_id"`
|
||||
ID pgtype.UUID `json:"id"`
|
||||
}
|
||||
|
||||
func (q *Queries) SetRuntimeLegacyDaemonID(ctx context.Context, arg SetRuntimeLegacyDaemonIDParams) error {
|
||||
_, err := q.db.Exec(ctx, setRuntimeLegacyDaemonID, arg.LegacyDaemonID, arg.ID)
|
||||
return err
|
||||
}
|
||||
|
||||
const updateAgentRuntimeHeartbeat = `-- name: UpdateAgentRuntimeHeartbeat :one
|
||||
UPDATE agent_runtime
|
||||
SET status = 'online', last_seen_at = now(), updated_at = now()
|
||||
WHERE id = $1
|
||||
RETURNING id, workspace_id, daemon_id, name, runtime_mode, provider, status, device_info, metadata, last_seen_at, created_at, updated_at, owner_id
|
||||
RETURNING id, workspace_id, daemon_id, name, runtime_mode, provider, status, device_info, metadata, last_seen_at, created_at, updated_at, owner_id, legacy_daemon_id
|
||||
`
|
||||
|
||||
func (q *Queries) UpdateAgentRuntimeHeartbeat(ctx context.Context, id pgtype.UUID) (AgentRuntime, error) {
|
||||
@@ -366,6 +456,7 @@ func (q *Queries) UpdateAgentRuntimeHeartbeat(ctx context.Context, id pgtype.UUI
|
||||
&i.CreatedAt,
|
||||
&i.UpdatedAt,
|
||||
&i.OwnerID,
|
||||
&i.LegacyDaemonID,
|
||||
)
|
||||
return i, err
|
||||
}
|
||||
@@ -393,7 +484,7 @@ DO UPDATE SET
|
||||
owner_id = COALESCE(EXCLUDED.owner_id, agent_runtime.owner_id),
|
||||
last_seen_at = now(),
|
||||
updated_at = now()
|
||||
RETURNING id, workspace_id, daemon_id, name, runtime_mode, provider, status, device_info, metadata, last_seen_at, created_at, updated_at, owner_id
|
||||
RETURNING id, workspace_id, daemon_id, name, runtime_mode, provider, status, device_info, metadata, last_seen_at, created_at, updated_at, owner_id, legacy_daemon_id
|
||||
`
|
||||
|
||||
type UpsertAgentRuntimeParams struct {
|
||||
@@ -435,6 +526,7 @@ func (q *Queries) UpsertAgentRuntime(ctx context.Context, arg UpsertAgentRuntime
|
||||
&i.CreatedAt,
|
||||
&i.UpdatedAt,
|
||||
&i.OwnerID,
|
||||
&i.LegacyDaemonID,
|
||||
)
|
||||
return i, err
|
||||
}
|
||||
|
||||
@@ -79,13 +79,12 @@ SELECT count(*) FROM agent WHERE runtime_id = $1 AND archived_at IS NULL;
|
||||
-- name: DeleteArchivedAgentsByRuntime :exec
|
||||
DELETE FROM agent WHERE runtime_id = $1 AND archived_at IS NOT NULL;
|
||||
|
||||
-- name: MigrateAgentsToRuntime :execrows
|
||||
-- Migrates agents from stale offline runtimes to the newly registered runtime.
|
||||
-- Only migrates from runtimes that match the same workspace, provider, owner,
|
||||
-- AND whose daemon_id starts with the current daemon_id followed by '-'.
|
||||
-- This scopes migration to old profile-suffixed runtimes from the same machine
|
||||
-- (e.g. "MacBook-staging" matches daemon_id_prefix "MacBook") without touching
|
||||
-- runtimes from other machines belonging to the same user.
|
||||
-- name: MigrateAgentsFromLegacyDaemon :execrows
|
||||
-- Reparents agents from the legacy (hostname-derived) runtime row to the
|
||||
-- newly registered UUID row. Scoped to a single (workspace, provider,
|
||||
-- owner) triple so we never touch another user's runtimes even if they
|
||||
-- share a hostname on the same machine. Called once per legacy daemon_id
|
||||
-- candidate reported by the daemon at registration time.
|
||||
UPDATE agent
|
||||
SET runtime_id = @new_runtime_id
|
||||
WHERE runtime_id IN (
|
||||
@@ -94,10 +93,40 @@ WHERE runtime_id IN (
|
||||
AND ar.provider = @provider
|
||||
AND ar.owner_id = @owner_id
|
||||
AND ar.id != @new_runtime_id
|
||||
AND ar.status = 'offline'
|
||||
AND ar.daemon_id LIKE @daemon_id_prefix || '-%'
|
||||
AND ar.daemon_id = @legacy_daemon_id
|
||||
);
|
||||
|
||||
-- name: MigrateTasksFromLegacyDaemon :execrows
|
||||
-- Same scoping as MigrateAgentsFromLegacyDaemon. Must run before the DELETE
|
||||
-- below because agent_task_queue.runtime_id is ON DELETE CASCADE; deleting
|
||||
-- the legacy row first would silently drop in-flight tasks.
|
||||
UPDATE agent_task_queue
|
||||
SET runtime_id = @new_runtime_id
|
||||
WHERE runtime_id IN (
|
||||
SELECT ar.id FROM agent_runtime ar
|
||||
WHERE ar.workspace_id = @workspace_id
|
||||
AND ar.provider = @provider
|
||||
AND ar.owner_id = @owner_id
|
||||
AND ar.id != @new_runtime_id
|
||||
AND ar.daemon_id = @legacy_daemon_id
|
||||
);
|
||||
|
||||
-- name: DeleteLegacyRuntime :execrows
|
||||
-- Removes the stale hostname-derived runtime row once its agents and tasks
|
||||
-- have been reparented. legacy_daemon_id on the new row captures the last
|
||||
-- removed value as a breadcrumb.
|
||||
DELETE FROM agent_runtime
|
||||
WHERE workspace_id = @workspace_id
|
||||
AND provider = @provider
|
||||
AND owner_id = @owner_id
|
||||
AND id != @new_runtime_id
|
||||
AND daemon_id = @legacy_daemon_id;
|
||||
|
||||
-- name: SetRuntimeLegacyDaemonID :exec
|
||||
UPDATE agent_runtime
|
||||
SET legacy_daemon_id = @legacy_daemon_id
|
||||
WHERE id = @id;
|
||||
|
||||
-- name: DeleteStaleOfflineRuntimes :many
|
||||
-- Deletes runtimes that have been offline for longer than the TTL and have
|
||||
-- no agents bound (active or archived). The FK constraint on agent.runtime_id
|
||||
|
||||
Reference in New Issue
Block a user