Compare commits

...

1 Commits

Author SHA1 Message Date
Jiang Bohan
97d292d89b feat(daemon): stabilize daemon_id with persistent UUID + legacy consolidation
Replaces the hostname-derived daemon_id with a UUID written to
~/.multica/<profile>/daemon.id on first start and reread forever after,
so macOS .local suffix drift, system rename, or profile switch can no
longer produce a second agent_runtime row for the same physical daemon.

At registration the daemon reports any hostname-based identifiers it may
have used historically (hostname, hostname.local, hostname-<profile>,
hostname.local-<profile>) as legacy_daemon_ids. The server reparents
agents and tasks from matching stale rows onto the new UUID row, deletes
them, and records the last value in a new legacy_daemon_id column for
audit. Scoped by (workspace, provider, owner) so another user's
runtimes on a shared hostname are never touched.

Removes MigrateAgentsToRuntime and its LIKE hostname-% logic; the UUID
makes runtime rebuild impossible, so the brittle profile-prefix heuristic
is no longer needed.

Refs MUL-975.
2026-04-17 02:30:40 +08:00
13 changed files with 745 additions and 78 deletions

View File

@@ -265,12 +265,21 @@ func runDaemonForeground(cmd *cobra.Command) error {
serverURL = c.ServerURL
}
}
stateDir := daemonDirForProfile(profile)
if stateDir != "" {
// Ensure the state directory exists before LoadConfig tries to write
// daemon.id into it on first run.
if err := os.MkdirAll(stateDir, 0o755); err != nil {
return fmt.Errorf("create daemon state dir %s: %w", stateDir, err)
}
}
overrides := daemon.Overrides{
ServerURL: serverURL,
DaemonID: flagString(cmd, "daemon-id"),
DeviceName: flagString(cmd, "device-name"),
RuntimeName: flagString(cmd, "runtime-name"),
Profile: profile,
StateDir: stateDir,
HealthPort: healthPortForProfile(profile),
}
if d, _ := cmd.Flags().GetDuration("poll-interval"); d > 0 {

View File

@@ -11,10 +11,13 @@ require (
github.com/go-chi/chi/v5 v5.2.5
github.com/go-chi/cors v1.2.2
github.com/golang-jwt/jwt/v5 v5.3.1
github.com/google/uuid v1.6.0
github.com/gorilla/websocket v1.5.3
github.com/jackc/pgx/v5 v5.8.0
github.com/lmittmann/tint v1.1.3
github.com/microcosm-cc/bluemonday v1.0.27
github.com/resend/resend-go/v2 v2.28.0
github.com/robfig/cron/v3 v3.0.1
github.com/spf13/cobra v1.10.2
)
@@ -35,14 +38,11 @@ require (
github.com/aws/aws-sdk-go-v2/service/sts v1.41.10 // indirect
github.com/aws/smithy-go v1.24.2 // indirect
github.com/aymerick/douceur v0.2.0 // indirect
github.com/google/uuid v1.6.0 // indirect
github.com/gorilla/css v1.0.1 // indirect
github.com/inconshreveable/mousetrap v1.1.0 // indirect
github.com/jackc/pgpassfile v1.0.0 // indirect
github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 // indirect
github.com/jackc/puddle/v2 v2.2.2 // indirect
github.com/microcosm-cc/bluemonday v1.0.27 // indirect
github.com/robfig/cron/v3 v3.0.1 // indirect
github.com/spf13/pflag v1.0.9 // indirect
golang.org/x/net v0.26.0 // indirect
golang.org/x/sync v0.20.0 // indirect

View File

@@ -27,7 +27,8 @@ const (
// Config holds all daemon configuration.
type Config struct {
ServerBaseURL string
DaemonID string
DaemonID string // persistent UUID from <state_dir>/daemon.id
LegacyDaemonIDs []string // hostname-derived IDs reported to the server so it can consolidate stale runtime rows
DeviceName string
RuntimeName string
CLIVersion string // multica CLI version (e.g. "0.1.13")
@@ -60,6 +61,7 @@ type Overrides struct {
DeviceName string
RuntimeName string
Profile string // profile name (empty = default)
StateDir string // directory that holds daemon.id, daemon.pid, daemon.log (resolved by cmd_daemon)
HealthPort int // health check port (0 = use default)
}
@@ -187,15 +189,30 @@ func LoadConfig(overrides Overrides) (Config, error) {
// Profile
profile := overrides.Profile
// String overrides
daemonID := envOrDefault("MULTICA_DAEMON_ID", host)
if overrides.DaemonID != "" {
daemonID = overrides.DaemonID
// daemon_id: explicit override > env var > persistent UUID in state dir.
// The UUID is generated once on first run and reread forever after, so
// hostname drift (.local suffix, system rename, profile switch) can no
// longer produce a second agent_runtime row for the same physical daemon.
daemonID := strings.TrimSpace(overrides.DaemonID)
if daemonID == "" {
daemonID = strings.TrimSpace(os.Getenv("MULTICA_DAEMON_ID"))
}
legacyDaemonIDs := LegacyDaemonIDCandidates(host, profile)
if daemonID == "" {
if overrides.StateDir == "" {
return Config{}, fmt.Errorf("daemon_id not set and state directory is empty; pass StateDir or set MULTICA_DAEMON_ID")
}
id, _, err := LoadOrCreateDaemonID(overrides.StateDir)
if err != nil {
return Config{}, err
}
daemonID = id
} else {
// Explicit override path: the caller pins the daemon_id themselves, so
// legacy consolidation on the server would incorrectly target rows
// they don't own. Skip it.
legacyDaemonIDs = nil
}
// NOTE: daemon_id is intentionally stable (hostname or explicit override).
// The unique constraint (workspace_id, daemon_id, provider) already prevents
// collisions within the same workspace. Appending the profile name caused
// duplicate runtimes when users switched profiles.
deviceName := envOrDefault("MULTICA_DAEMON_DEVICE_NAME", host)
if overrides.DeviceName != "" {
@@ -258,6 +275,7 @@ func LoadConfig(overrides Overrides) (Config, error) {
return Config{
ServerBaseURL: serverBaseURL,
DaemonID: daemonID,
LegacyDaemonIDs: legacyDaemonIDs,
DeviceName: deviceName,
RuntimeName: runtimeName,
Profile: profile,

View File

@@ -211,6 +211,9 @@ func (d *Daemon) registerRuntimesForWorkspace(ctx context.Context, workspaceID s
"launched_by": d.cfg.LaunchedBy,
"runtimes": runtimes,
}
if len(d.cfg.LegacyDaemonIDs) > 0 {
req["legacy_daemon_ids"] = d.cfg.LegacyDaemonIDs
}
resp, err := d.client.Register(ctx, req)
if err != nil {

View File

@@ -0,0 +1,115 @@
package daemon
import (
"errors"
"fmt"
"os"
"path/filepath"
"strings"
"github.com/google/uuid"
)
// DaemonIDFileName is the per-profile file that stores the persistent daemon UUID.
const DaemonIDFileName = "daemon.id"
// LoadOrCreateDaemonID returns the persistent daemon identifier stored under
// stateDir. On first run the file does not exist; a fresh v4 UUID is generated,
// written atomically, and returned. On subsequent runs the existing value is
// read verbatim — so hostname drift (.local suffix, system rename, profile
// switch) can no longer produce a second agent_runtime row.
//
// stateDir is the profile-specific directory (e.g. ~/.multica or
// ~/.multica/profiles/<name>). The caller is expected to create it before
// invoking this function; an MkdirAll fallback is kept for defensive callers
// that forgot.
func LoadOrCreateDaemonID(stateDir string) (string, bool, error) {
if strings.TrimSpace(stateDir) == "" {
return "", false, errors.New("daemon identity: stateDir is empty")
}
path := filepath.Join(stateDir, DaemonIDFileName)
data, err := os.ReadFile(path)
if err == nil {
id := strings.TrimSpace(string(data))
if _, parseErr := uuid.Parse(id); parseErr != nil {
return "", false, fmt.Errorf("daemon identity: %s contains invalid UUID %q: %w", path, id, parseErr)
}
return id, false, nil
}
if !errors.Is(err, os.ErrNotExist) {
return "", false, fmt.Errorf("daemon identity: read %s: %w", path, err)
}
if err := os.MkdirAll(stateDir, 0o755); err != nil {
return "", false, fmt.Errorf("daemon identity: create %s: %w", stateDir, err)
}
newID := uuid.NewString()
tmp, err := os.CreateTemp(stateDir, ".daemon-id-*.tmp")
if err != nil {
return "", false, fmt.Errorf("daemon identity: create temp file: %w", err)
}
tmpPath := tmp.Name()
if _, err := tmp.WriteString(newID + "\n"); err != nil {
tmp.Close()
os.Remove(tmpPath)
return "", false, fmt.Errorf("daemon identity: write temp file: %w", err)
}
if err := tmp.Close(); err != nil {
os.Remove(tmpPath)
return "", false, fmt.Errorf("daemon identity: close temp file: %w", err)
}
if err := os.Chmod(tmpPath, 0o600); err != nil {
os.Remove(tmpPath)
return "", false, fmt.Errorf("daemon identity: chmod temp file: %w", err)
}
if err := os.Rename(tmpPath, path); err != nil {
os.Remove(tmpPath)
return "", false, fmt.Errorf("daemon identity: rename temp file: %w", err)
}
return newID, true, nil
}
// LegacyDaemonIDCandidates returns the set of daemon_id values this machine
// may have produced before UUID persistence landed. The server uses them to
// locate and merge stale agent_runtime rows so existing agents keep working
// without manual migration.
//
// The historical formats covered:
// - <hostname> (current, post-#1070)
// - <hostname>.local (pre-#1070 macOS bonjour suffix)
// - <hostname>-<profile> (pre-#906, profile-suffixed)
// - <hostname>.local-<profile> (the .local + profile suffix combo)
//
// Duplicates and empty strings are removed. Order is preserved so the caller
// sees the most-likely-current form first.
func LegacyDaemonIDCandidates(hostname, profile string) []string {
hostname = strings.TrimSpace(hostname)
if hostname == "" {
return nil
}
stripped := strings.TrimSuffix(hostname, ".local")
raw := []string{hostname, stripped}
if profile != "" {
raw = append(raw, hostname+"-"+profile, stripped+"-"+profile)
}
seen := make(map[string]struct{}, len(raw))
out := make([]string, 0, len(raw))
for _, v := range raw {
v = strings.TrimSpace(v)
if v == "" {
continue
}
if _, dup := seen[v]; dup {
continue
}
seen[v] = struct{}{}
out = append(out, v)
}
return out
}

View File

@@ -0,0 +1,143 @@
package daemon
import (
"os"
"path/filepath"
"testing"
"github.com/google/uuid"
)
func TestLoadOrCreateDaemonID_CreatesAndReads(t *testing.T) {
dir := t.TempDir()
id, created, err := LoadOrCreateDaemonID(dir)
if err != nil {
t.Fatalf("first call: %v", err)
}
if !created {
t.Fatalf("expected created=true on fresh directory")
}
if _, err := uuid.Parse(id); err != nil {
t.Fatalf("first call returned invalid UUID %q: %v", id, err)
}
id2, created, err := LoadOrCreateDaemonID(dir)
if err != nil {
t.Fatalf("second call: %v", err)
}
if created {
t.Fatalf("expected created=false when daemon.id exists")
}
if id != id2 {
t.Fatalf("expected stable id across calls: got %q then %q", id, id2)
}
path := filepath.Join(dir, DaemonIDFileName)
data, err := os.ReadFile(path)
if err != nil {
t.Fatalf("read file: %v", err)
}
if string(data) == "" {
t.Fatalf("daemon.id is empty")
}
info, err := os.Stat(path)
if err != nil {
t.Fatalf("stat: %v", err)
}
if info.Mode().Perm() != 0o600 {
t.Fatalf("expected 0600 perms, got %v", info.Mode().Perm())
}
}
func TestLoadOrCreateDaemonID_RejectsCorruptFile(t *testing.T) {
dir := t.TempDir()
path := filepath.Join(dir, DaemonIDFileName)
if err := os.WriteFile(path, []byte("not-a-uuid"), 0o600); err != nil {
t.Fatalf("setup: %v", err)
}
_, _, err := LoadOrCreateDaemonID(dir)
if err == nil {
t.Fatalf("expected error on non-UUID contents, got nil")
}
}
func TestLoadOrCreateDaemonID_EmptyDir(t *testing.T) {
if _, _, err := LoadOrCreateDaemonID(""); err == nil {
t.Fatalf("expected error for empty stateDir")
}
}
func TestLegacyDaemonIDCandidates(t *testing.T) {
cases := []struct {
name string
hostname string
profile string
want []string
}{
{
name: "plain hostname, default profile",
hostname: "MacBook-Pro",
profile: "",
want: []string{"MacBook-Pro"},
},
{
name: ".local hostname, default profile",
hostname: "Jiayuans-MacBook-Pro.local",
profile: "",
want: []string{"Jiayuans-MacBook-Pro.local", "Jiayuans-MacBook-Pro"},
},
{
name: "plain hostname, named profile",
hostname: "MacBook-Air",
profile: "staging",
want: []string{"MacBook-Air", "MacBook-Air-staging"},
},
{
name: ".local hostname, named profile",
hostname: "Jiayuans-MacBook-Pro.local",
profile: "staging",
want: []string{
"Jiayuans-MacBook-Pro.local",
"Jiayuans-MacBook-Pro",
"Jiayuans-MacBook-Pro.local-staging",
"Jiayuans-MacBook-Pro-staging",
},
},
{
name: "empty hostname returns nil",
hostname: "",
profile: "staging",
want: nil,
},
{
name: "whitespace-only hostname",
hostname: " ",
profile: "",
want: nil,
},
}
for _, tc := range cases {
t.Run(tc.name, func(t *testing.T) {
got := LegacyDaemonIDCandidates(tc.hostname, tc.profile)
if !equalStringSlice(got, tc.want) {
t.Fatalf("LegacyDaemonIDCandidates(%q, %q) = %v, want %v", tc.hostname, tc.profile, got, tc.want)
}
})
}
}
func equalStringSlice(a, b []string) bool {
if len(a) != len(b) {
return false
}
for i := range a {
if a[i] != b[i] {
return false
}
}
return true
}

View File

@@ -118,10 +118,16 @@ func (h *Handler) resolveTaskWorkspaceID(r *http.Request, task db.AgentTaskQueue
type DaemonRegisterRequest struct {
WorkspaceID string `json:"workspace_id"`
DaemonID string `json:"daemon_id"`
DeviceName string `json:"device_name"`
CLIVersion string `json:"cli_version"` // multica CLI version
LaunchedBy string `json:"launched_by"` // "desktop" when spawned by the Electron app
Runtimes []struct {
// LegacyDaemonIDs carries any hostname-derived identifiers the daemon
// may have reported in earlier versions (before daemon_id became a
// persistent UUID). The server consolidates any matching stale rows
// into the current registration so agent FKs stay valid across the
// upgrade. Optional.
LegacyDaemonIDs []string `json:"legacy_daemon_ids"`
DeviceName string `json:"device_name"`
CLIVersion string `json:"cli_version"` // multica CLI version
LaunchedBy string `json:"launched_by"` // "desktop" when spawned by the Electron app
Runtimes []struct {
Name string `json:"name"`
Type string `json:"type"`
Version string `json:"version"` // agent CLI version (claude/codex)
@@ -287,26 +293,7 @@ func (h *Handler) DaemonRegister(w http.ResponseWriter, r *http.Request) {
return
}
// Migrate agents from old offline runtimes on the same machine to the
// newly registered runtime. Uses the runtime's owner_id (preserved via
// COALESCE on upsert) so migration works with both PAT and daemon tokens.
// Scoped by daemon_id prefix so that only old profile-suffixed runtimes
// (e.g. "hostname-staging") from this machine are affected.
effectiveOwnerID := registered.OwnerID
if effectiveOwnerID.Valid {
migrated, err := h.Queries.MigrateAgentsToRuntime(r.Context(), db.MigrateAgentsToRuntimeParams{
NewRuntimeID: registered.ID,
WorkspaceID: parseUUID(req.WorkspaceID),
Provider: provider,
OwnerID: effectiveOwnerID,
DaemonIDPrefix: strToText(req.DaemonID),
})
if err != nil {
slog.Warn("failed to migrate agents to new runtime", "runtime_id", uuidToString(registered.ID), "error", err)
} else if migrated > 0 {
slog.Info("migrated agents to new runtime", "runtime_id", uuidToString(registered.ID), "provider", provider, "migrated_count", migrated)
}
}
h.consolidateLegacyRuntimes(r, registered, req.WorkspaceID, provider, req.LegacyDaemonIDs)
resp = append(resp, runtimeToResponse(registered))
}
@@ -340,6 +327,103 @@ func (h *Handler) GetDaemonWorkspaceRepos(w http.ResponseWriter, r *http.Request
writeJSON(w, http.StatusOK, workspaceReposResponse(workspaceID, ws.Repos))
}
// consolidateLegacyRuntimes merges any hostname-derived agent_runtime rows
// this machine previously produced into the just-upserted UUID row. For each
// legacy candidate (hostname, hostname.local, hostname-<profile>, etc.) we
// reparent agents and tasks, delete the stale row, and record the last value
// as legacy_daemon_id for audit. Scoped by (workspace, provider, owner) so a
// different user's rows on a machine sharing the same hostname are untouched.
//
// Errors are logged but never fail registration: consolidation is a best-effort
// cleanup and should not block a healthy daemon from coming online.
func (h *Handler) consolidateLegacyRuntimes(r *http.Request, registered db.AgentRuntime, workspaceID, provider string, legacyDaemonIDs []string) {
if len(legacyDaemonIDs) == 0 {
return
}
// Owner is required to scope the migration; without it we'd risk touching
// rows belonging to other users. Daemon-token flows rely on COALESCE in
// UpsertAgentRuntime to preserve an existing owner on re-register.
if !registered.OwnerID.Valid {
return
}
workspaceUUID := parseUUID(workspaceID)
var lastMatched string
var totalAgents, totalTasks, totalDeleted int64
for _, legacyID := range legacyDaemonIDs {
legacyID = strings.TrimSpace(legacyID)
if legacyID == "" {
continue
}
// Skip self-match: if the daemon reported its own new daemon_id as a
// legacy candidate we'd wipe the row we just created.
if legacyID == strings.TrimSpace(registered.DaemonID.String) {
continue
}
agents, err := h.Queries.MigrateAgentsFromLegacyDaemon(r.Context(), db.MigrateAgentsFromLegacyDaemonParams{
NewRuntimeID: registered.ID,
WorkspaceID: workspaceUUID,
Provider: provider,
OwnerID: registered.OwnerID,
LegacyDaemonID: strToText(legacyID),
})
if err != nil {
slog.Warn("legacy runtime consolidation: migrate agents failed", "runtime_id", uuidToString(registered.ID), "legacy_daemon_id", legacyID, "error", err)
continue
}
tasks, err := h.Queries.MigrateTasksFromLegacyDaemon(r.Context(), db.MigrateTasksFromLegacyDaemonParams{
NewRuntimeID: registered.ID,
WorkspaceID: workspaceUUID,
Provider: provider,
OwnerID: registered.OwnerID,
LegacyDaemonID: strToText(legacyID),
})
if err != nil {
slog.Warn("legacy runtime consolidation: migrate tasks failed", "runtime_id", uuidToString(registered.ID), "legacy_daemon_id", legacyID, "error", err)
continue
}
deleted, err := h.Queries.DeleteLegacyRuntime(r.Context(), db.DeleteLegacyRuntimeParams{
WorkspaceID: workspaceUUID,
Provider: provider,
OwnerID: registered.OwnerID,
NewRuntimeID: registered.ID,
LegacyDaemonID: strToText(legacyID),
})
if err != nil {
slog.Warn("legacy runtime consolidation: delete legacy row failed", "runtime_id", uuidToString(registered.ID), "legacy_daemon_id", legacyID, "error", err)
continue
}
if deleted > 0 || agents > 0 || tasks > 0 {
lastMatched = legacyID
totalAgents += agents
totalTasks += tasks
totalDeleted += deleted
}
}
if lastMatched != "" {
if err := h.Queries.SetRuntimeLegacyDaemonID(r.Context(), db.SetRuntimeLegacyDaemonIDParams{
ID: registered.ID,
LegacyDaemonID: strToText(lastMatched),
}); err != nil {
slog.Warn("legacy runtime consolidation: record legacy_daemon_id failed", "runtime_id", uuidToString(registered.ID), "error", err)
}
slog.Info("legacy runtime consolidated",
"runtime_id", uuidToString(registered.ID),
"provider", provider,
"legacy_daemon_id", lastMatched,
"agents_reparented", totalAgents,
"tasks_reparented", totalTasks,
"rows_deleted", totalDeleted,
)
}
}
// DaemonDeregister marks runtimes as offline when the daemon shuts down.
func (h *Handler) DaemonDeregister(w http.ResponseWriter, r *http.Request) {
var req struct {

View File

@@ -79,6 +79,153 @@ func TestDaemonRegister_WithDaemonToken(t *testing.T) {
testPool.Exec(context.Background(), `DELETE FROM agent_runtime WHERE id = $1`, runtimeID)
}
// TestDaemonRegister_ConsolidatesLegacyRuntime reproduces the duplicate-
// runtime scenario from MUL-975: an agent_runtime row already exists for a
// hostname-derived daemon_id (e.g. "host.local"), with an agent and task
// pointing at it. When the upgraded daemon registers with a persistent UUID
// daemon_id and reports the old hostname as a legacy candidate, the stale row
// must be deleted and its agents + tasks reparented to the new UUID row.
func TestDaemonRegister_ConsolidatesLegacyRuntime(t *testing.T) {
if testHandler == nil {
t.Skip("database not available")
}
ctx := context.Background()
legacyDaemonID := "Jiayuans-MacBook-Pro.local"
provider := "claude"
// Seed a legacy runtime row owned by the test user.
var legacyRuntimeID string
if err := testPool.QueryRow(ctx, `
INSERT INTO agent_runtime (
workspace_id, daemon_id, name, runtime_mode, provider, status,
device_info, metadata, owner_id, last_seen_at
)
VALUES ($1, $2, 'Legacy Claude Runtime', 'local', $3, 'offline',
'', '{}'::jsonb, $4, now())
RETURNING id
`, testWorkspaceID, legacyDaemonID, provider, testUserID).Scan(&legacyRuntimeID); err != nil {
t.Fatalf("seed legacy runtime: %v", err)
}
t.Cleanup(func() {
testPool.Exec(context.Background(), `DELETE FROM agent_runtime WHERE id = $1`, legacyRuntimeID)
})
var legacyAgentID string
if err := testPool.QueryRow(ctx, `
INSERT INTO agent (
workspace_id, name, description, runtime_mode, runtime_config,
runtime_id, visibility, max_concurrent_tasks, owner_id
)
VALUES ($1, 'Legacy Agent', '', 'local', '{}'::jsonb, $2, 'workspace', 1, $3)
RETURNING id
`, testWorkspaceID, legacyRuntimeID, testUserID).Scan(&legacyAgentID); err != nil {
t.Fatalf("seed legacy agent: %v", err)
}
t.Cleanup(func() {
testPool.Exec(context.Background(), `DELETE FROM agent WHERE id = $1`, legacyAgentID)
})
var legacyIssueID string
if err := testPool.QueryRow(ctx, `
INSERT INTO issue (workspace_id, title, status, priority, creator_id, creator_type)
VALUES ($1, 'consolidation-test', 'todo', 'medium', $2, 'member')
RETURNING id
`, testWorkspaceID, testUserID).Scan(&legacyIssueID); err != nil {
t.Fatalf("seed issue: %v", err)
}
t.Cleanup(func() {
testPool.Exec(context.Background(), `DELETE FROM issue WHERE id = $1`, legacyIssueID)
})
var legacyTaskID string
if err := testPool.QueryRow(ctx, `
INSERT INTO agent_task_queue (agent_id, runtime_id, issue_id, status, priority)
VALUES ($1, $2, $3, 'queued', 0)
RETURNING id
`, legacyAgentID, legacyRuntimeID, legacyIssueID).Scan(&legacyTaskID); err != nil {
t.Fatalf("seed task: %v", err)
}
t.Cleanup(func() {
testPool.Exec(context.Background(), `DELETE FROM agent_task_queue WHERE id = $1`, legacyTaskID)
})
// Register with a new UUID daemon_id + the legacy hostname reported as a
// legacy candidate. PAT auth (newRequest) so the upsert picks up the user
// as owner_id.
newDaemonID := "4d1b2b26-7f9b-4f50-9ea8-8e1f1ca88888"
w := httptest.NewRecorder()
req := newRequest("POST", "/api/daemon/register", map[string]any{
"workspace_id": testWorkspaceID,
"daemon_id": newDaemonID,
"device_name": "Jiayuan-MacBook-Pro",
"legacy_daemon_ids": []string{legacyDaemonID, "Jiayuans-MacBook-Pro"},
"runtimes": []map[string]any{
{"name": "Claude", "type": provider, "version": "1.0.0", "status": "online"},
},
})
testHandler.DaemonRegister(w, req)
if w.Code != http.StatusOK {
t.Fatalf("DaemonRegister: expected 200, got %d: %s", w.Code, w.Body.String())
}
var resp struct {
Runtimes []map[string]any `json:"runtimes"`
}
if err := json.NewDecoder(w.Body).Decode(&resp); err != nil {
t.Fatalf("decode response: %v", err)
}
if len(resp.Runtimes) != 1 {
t.Fatalf("expected 1 runtime in response, got %d", len(resp.Runtimes))
}
newRuntimeID := resp.Runtimes[0]["id"].(string)
if newRuntimeID == legacyRuntimeID {
t.Fatalf("expected new runtime id to differ from legacy id %s", legacyRuntimeID)
}
t.Cleanup(func() {
testPool.Exec(context.Background(), `DELETE FROM agent_runtime WHERE id = $1`, newRuntimeID)
})
// Legacy row must be gone.
var legacyCount int
if err := testPool.QueryRow(ctx, `SELECT count(*) FROM agent_runtime WHERE id = $1`, legacyRuntimeID).Scan(&legacyCount); err != nil {
t.Fatalf("count legacy runtime: %v", err)
}
if legacyCount != 0 {
t.Fatalf("expected legacy runtime row to be deleted, still present")
}
// Agent must now point at the new runtime row.
var agentRuntimeID string
if err := testPool.QueryRow(ctx, `SELECT runtime_id FROM agent WHERE id = $1`, legacyAgentID).Scan(&agentRuntimeID); err != nil {
t.Fatalf("read agent.runtime_id: %v", err)
}
if agentRuntimeID != newRuntimeID {
t.Fatalf("expected agent.runtime_id=%s after consolidation, got %s", newRuntimeID, agentRuntimeID)
}
// Queued task must also have been reparented (not cascade-deleted).
var taskRuntimeID, taskStatus string
if err := testPool.QueryRow(ctx, `SELECT runtime_id, status FROM agent_task_queue WHERE id = $1`, legacyTaskID).Scan(&taskRuntimeID, &taskStatus); err != nil {
t.Fatalf("read task: %v", err)
}
if taskRuntimeID != newRuntimeID {
t.Fatalf("expected task.runtime_id=%s after consolidation, got %s", newRuntimeID, taskRuntimeID)
}
if taskStatus != "queued" {
t.Fatalf("expected task status unchanged (queued), got %q", taskStatus)
}
// legacy_daemon_id breadcrumb should be set on the new row.
var breadcrumb string
if err := testPool.QueryRow(ctx, `SELECT COALESCE(legacy_daemon_id, '') FROM agent_runtime WHERE id = $1`, newRuntimeID).Scan(&breadcrumb); err != nil {
t.Fatalf("read legacy_daemon_id: %v", err)
}
if breadcrumb == "" {
t.Fatalf("expected legacy_daemon_id breadcrumb on new row, got empty")
}
}
func TestDaemonRegister_WithDaemonToken_WorkspaceMismatch(t *testing.T) {
if testHandler == nil {
t.Skip("database not available")

View File

@@ -0,0 +1,2 @@
DROP INDEX IF EXISTS idx_agent_runtime_legacy_daemon_id;
ALTER TABLE agent_runtime DROP COLUMN IF EXISTS legacy_daemon_id;

View File

@@ -0,0 +1,24 @@
-- Stabilize daemon_id as a persistent UUID.
--
-- Before this change daemon_id was derived from os.Hostname(), so changes
-- like the macOS `.local` suffix appearing/disappearing, the user renaming
-- their machine, or switching profiles produced a fresh agent_runtime row
-- every time, stranding agents on the stale one.
--
-- From this migration forward the daemon generates a UUID on first start,
-- writes it to ~/.multica/<profile>/daemon.id, and re-uses it forever. The
-- server-side DaemonRegister flow consolidates any pre-existing rows that
-- match the historic hostname-based daemon_id candidates (hostname,
-- hostname.local, hostname-<profile>, hostname.local-<profile>) into the
-- new UUID row, so agents keep pointing to the same runtime id across the
-- upgrade with no manual intervention.
--
-- legacy_daemon_id is a best-effort breadcrumb — we record whatever old
-- daemon_id value we rewrote last, which makes the consolidation auditable.
-- It also gives us a cheap index for locating rows that were migrated.
ALTER TABLE agent_runtime
ADD COLUMN legacy_daemon_id TEXT;
CREATE INDEX idx_agent_runtime_legacy_daemon_id
ON agent_runtime(workspace_id, provider, legacy_daemon_id)
WHERE legacy_daemon_id IS NOT NULL;

View File

@@ -42,19 +42,20 @@ type Agent struct {
}
type AgentRuntime struct {
ID pgtype.UUID `json:"id"`
WorkspaceID pgtype.UUID `json:"workspace_id"`
DaemonID pgtype.Text `json:"daemon_id"`
Name string `json:"name"`
RuntimeMode string `json:"runtime_mode"`
Provider string `json:"provider"`
Status string `json:"status"`
DeviceInfo string `json:"device_info"`
Metadata []byte `json:"metadata"`
LastSeenAt pgtype.Timestamptz `json:"last_seen_at"`
CreatedAt pgtype.Timestamptz `json:"created_at"`
UpdatedAt pgtype.Timestamptz `json:"updated_at"`
OwnerID pgtype.UUID `json:"owner_id"`
ID pgtype.UUID `json:"id"`
WorkspaceID pgtype.UUID `json:"workspace_id"`
DaemonID pgtype.Text `json:"daemon_id"`
Name string `json:"name"`
RuntimeMode string `json:"runtime_mode"`
Provider string `json:"provider"`
Status string `json:"status"`
DeviceInfo string `json:"device_info"`
Metadata []byte `json:"metadata"`
LastSeenAt pgtype.Timestamptz `json:"last_seen_at"`
CreatedAt pgtype.Timestamptz `json:"created_at"`
UpdatedAt pgtype.Timestamptz `json:"updated_at"`
OwnerID pgtype.UUID `json:"owner_id"`
LegacyDaemonID pgtype.Text `json:"legacy_daemon_id"`
}
type AgentSkill struct {

View File

@@ -40,6 +40,40 @@ func (q *Queries) DeleteArchivedAgentsByRuntime(ctx context.Context, runtimeID p
return err
}
const deleteLegacyRuntime = `-- name: DeleteLegacyRuntime :execrows
DELETE FROM agent_runtime
WHERE workspace_id = $1
AND provider = $2
AND owner_id = $3
AND id != $4
AND daemon_id = $5
`
type DeleteLegacyRuntimeParams struct {
WorkspaceID pgtype.UUID `json:"workspace_id"`
Provider string `json:"provider"`
OwnerID pgtype.UUID `json:"owner_id"`
NewRuntimeID pgtype.UUID `json:"new_runtime_id"`
LegacyDaemonID pgtype.Text `json:"legacy_daemon_id"`
}
// Removes the stale hostname-derived runtime row once its agents and tasks
// have been reparented. legacy_daemon_id on the new row captures the last
// removed value as a breadcrumb.
func (q *Queries) DeleteLegacyRuntime(ctx context.Context, arg DeleteLegacyRuntimeParams) (int64, error) {
result, err := q.db.Exec(ctx, deleteLegacyRuntime,
arg.WorkspaceID,
arg.Provider,
arg.OwnerID,
arg.NewRuntimeID,
arg.LegacyDaemonID,
)
if err != nil {
return 0, err
}
return result.RowsAffected(), nil
}
const deleteStaleOfflineRuntimes = `-- name: DeleteStaleOfflineRuntimes :many
DELETE FROM agent_runtime
WHERE status = 'offline'
@@ -115,7 +149,7 @@ func (q *Queries) FailTasksForOfflineRuntimes(ctx context.Context) ([]FailTasksF
}
const getAgentRuntime = `-- name: GetAgentRuntime :one
SELECT id, workspace_id, daemon_id, name, runtime_mode, provider, status, device_info, metadata, last_seen_at, created_at, updated_at, owner_id FROM agent_runtime
SELECT id, workspace_id, daemon_id, name, runtime_mode, provider, status, device_info, metadata, last_seen_at, created_at, updated_at, owner_id, legacy_daemon_id FROM agent_runtime
WHERE id = $1
`
@@ -136,12 +170,13 @@ func (q *Queries) GetAgentRuntime(ctx context.Context, id pgtype.UUID) (AgentRun
&i.CreatedAt,
&i.UpdatedAt,
&i.OwnerID,
&i.LegacyDaemonID,
)
return i, err
}
const getAgentRuntimeForWorkspace = `-- name: GetAgentRuntimeForWorkspace :one
SELECT id, workspace_id, daemon_id, name, runtime_mode, provider, status, device_info, metadata, last_seen_at, created_at, updated_at, owner_id FROM agent_runtime
SELECT id, workspace_id, daemon_id, name, runtime_mode, provider, status, device_info, metadata, last_seen_at, created_at, updated_at, owner_id, legacy_daemon_id FROM agent_runtime
WHERE id = $1 AND workspace_id = $2
`
@@ -167,12 +202,13 @@ func (q *Queries) GetAgentRuntimeForWorkspace(ctx context.Context, arg GetAgentR
&i.CreatedAt,
&i.UpdatedAt,
&i.OwnerID,
&i.LegacyDaemonID,
)
return i, err
}
const listAgentRuntimes = `-- name: ListAgentRuntimes :many
SELECT id, workspace_id, daemon_id, name, runtime_mode, provider, status, device_info, metadata, last_seen_at, created_at, updated_at, owner_id FROM agent_runtime
SELECT id, workspace_id, daemon_id, name, runtime_mode, provider, status, device_info, metadata, last_seen_at, created_at, updated_at, owner_id, legacy_daemon_id FROM agent_runtime
WHERE workspace_id = $1
ORDER BY created_at ASC
`
@@ -200,6 +236,7 @@ func (q *Queries) ListAgentRuntimes(ctx context.Context, workspaceID pgtype.UUID
&i.CreatedAt,
&i.UpdatedAt,
&i.OwnerID,
&i.LegacyDaemonID,
); err != nil {
return nil, err
}
@@ -212,7 +249,7 @@ func (q *Queries) ListAgentRuntimes(ctx context.Context, workspaceID pgtype.UUID
}
const listAgentRuntimesByOwner = `-- name: ListAgentRuntimesByOwner :many
SELECT id, workspace_id, daemon_id, name, runtime_mode, provider, status, device_info, metadata, last_seen_at, created_at, updated_at, owner_id FROM agent_runtime
SELECT id, workspace_id, daemon_id, name, runtime_mode, provider, status, device_info, metadata, last_seen_at, created_at, updated_at, owner_id, legacy_daemon_id FROM agent_runtime
WHERE workspace_id = $1 AND owner_id = $2
ORDER BY created_at ASC
`
@@ -245,6 +282,7 @@ func (q *Queries) ListAgentRuntimesByOwner(ctx context.Context, arg ListAgentRun
&i.CreatedAt,
&i.UpdatedAt,
&i.OwnerID,
&i.LegacyDaemonID,
); err != nil {
return nil, err
}
@@ -289,7 +327,7 @@ func (q *Queries) MarkStaleRuntimesOffline(ctx context.Context, staleSeconds flo
return items, nil
}
const migrateAgentsToRuntime = `-- name: MigrateAgentsToRuntime :execrows
const migrateAgentsFromLegacyDaemon = `-- name: MigrateAgentsFromLegacyDaemon :execrows
UPDATE agent
SET runtime_id = $1
WHERE runtime_id IN (
@@ -298,32 +336,68 @@ WHERE runtime_id IN (
AND ar.provider = $3
AND ar.owner_id = $4
AND ar.id != $1
AND ar.status = 'offline'
AND ar.daemon_id LIKE $5 || '-%'
AND ar.daemon_id = $5
)
`
type MigrateAgentsToRuntimeParams struct {
type MigrateAgentsFromLegacyDaemonParams struct {
NewRuntimeID pgtype.UUID `json:"new_runtime_id"`
WorkspaceID pgtype.UUID `json:"workspace_id"`
Provider string `json:"provider"`
OwnerID pgtype.UUID `json:"owner_id"`
DaemonIDPrefix pgtype.Text `json:"daemon_id_prefix"`
LegacyDaemonID pgtype.Text `json:"legacy_daemon_id"`
}
// Migrates agents from stale offline runtimes to the newly registered runtime.
// Only migrates from runtimes that match the same workspace, provider, owner,
// AND whose daemon_id starts with the current daemon_id followed by '-'.
// This scopes migration to old profile-suffixed runtimes from the same machine
// (e.g. "MacBook-staging" matches daemon_id_prefix "MacBook") without touching
// runtimes from other machines belonging to the same user.
func (q *Queries) MigrateAgentsToRuntime(ctx context.Context, arg MigrateAgentsToRuntimeParams) (int64, error) {
result, err := q.db.Exec(ctx, migrateAgentsToRuntime,
// Reparents agents from the legacy (hostname-derived) runtime row to the
// newly registered UUID row. Scoped to a single (workspace, provider,
// owner) triple so we never touch another user's runtimes even if they
// share a hostname on the same machine. Called once per legacy daemon_id
// candidate reported by the daemon at registration time.
func (q *Queries) MigrateAgentsFromLegacyDaemon(ctx context.Context, arg MigrateAgentsFromLegacyDaemonParams) (int64, error) {
result, err := q.db.Exec(ctx, migrateAgentsFromLegacyDaemon,
arg.NewRuntimeID,
arg.WorkspaceID,
arg.Provider,
arg.OwnerID,
arg.DaemonIDPrefix,
arg.LegacyDaemonID,
)
if err != nil {
return 0, err
}
return result.RowsAffected(), nil
}
const migrateTasksFromLegacyDaemon = `-- name: MigrateTasksFromLegacyDaemon :execrows
UPDATE agent_task_queue
SET runtime_id = $1
WHERE runtime_id IN (
SELECT ar.id FROM agent_runtime ar
WHERE ar.workspace_id = $2
AND ar.provider = $3
AND ar.owner_id = $4
AND ar.id != $1
AND ar.daemon_id = $5
)
`
type MigrateTasksFromLegacyDaemonParams struct {
NewRuntimeID pgtype.UUID `json:"new_runtime_id"`
WorkspaceID pgtype.UUID `json:"workspace_id"`
Provider string `json:"provider"`
OwnerID pgtype.UUID `json:"owner_id"`
LegacyDaemonID pgtype.Text `json:"legacy_daemon_id"`
}
// Same scoping as MigrateAgentsFromLegacyDaemon. Must run before the DELETE
// below because agent_task_queue.runtime_id is ON DELETE CASCADE; deleting
// the legacy row first would silently drop in-flight tasks.
func (q *Queries) MigrateTasksFromLegacyDaemon(ctx context.Context, arg MigrateTasksFromLegacyDaemonParams) (int64, error) {
result, err := q.db.Exec(ctx, migrateTasksFromLegacyDaemon,
arg.NewRuntimeID,
arg.WorkspaceID,
arg.Provider,
arg.OwnerID,
arg.LegacyDaemonID,
)
if err != nil {
return 0, err
@@ -342,11 +416,27 @@ func (q *Queries) SetAgentRuntimeOffline(ctx context.Context, id pgtype.UUID) er
return err
}
const setRuntimeLegacyDaemonID = `-- name: SetRuntimeLegacyDaemonID :exec
UPDATE agent_runtime
SET legacy_daemon_id = $1
WHERE id = $2
`
type SetRuntimeLegacyDaemonIDParams struct {
LegacyDaemonID pgtype.Text `json:"legacy_daemon_id"`
ID pgtype.UUID `json:"id"`
}
func (q *Queries) SetRuntimeLegacyDaemonID(ctx context.Context, arg SetRuntimeLegacyDaemonIDParams) error {
_, err := q.db.Exec(ctx, setRuntimeLegacyDaemonID, arg.LegacyDaemonID, arg.ID)
return err
}
const updateAgentRuntimeHeartbeat = `-- name: UpdateAgentRuntimeHeartbeat :one
UPDATE agent_runtime
SET status = 'online', last_seen_at = now(), updated_at = now()
WHERE id = $1
RETURNING id, workspace_id, daemon_id, name, runtime_mode, provider, status, device_info, metadata, last_seen_at, created_at, updated_at, owner_id
RETURNING id, workspace_id, daemon_id, name, runtime_mode, provider, status, device_info, metadata, last_seen_at, created_at, updated_at, owner_id, legacy_daemon_id
`
func (q *Queries) UpdateAgentRuntimeHeartbeat(ctx context.Context, id pgtype.UUID) (AgentRuntime, error) {
@@ -366,6 +456,7 @@ func (q *Queries) UpdateAgentRuntimeHeartbeat(ctx context.Context, id pgtype.UUI
&i.CreatedAt,
&i.UpdatedAt,
&i.OwnerID,
&i.LegacyDaemonID,
)
return i, err
}
@@ -393,7 +484,7 @@ DO UPDATE SET
owner_id = COALESCE(EXCLUDED.owner_id, agent_runtime.owner_id),
last_seen_at = now(),
updated_at = now()
RETURNING id, workspace_id, daemon_id, name, runtime_mode, provider, status, device_info, metadata, last_seen_at, created_at, updated_at, owner_id
RETURNING id, workspace_id, daemon_id, name, runtime_mode, provider, status, device_info, metadata, last_seen_at, created_at, updated_at, owner_id, legacy_daemon_id
`
type UpsertAgentRuntimeParams struct {
@@ -435,6 +526,7 @@ func (q *Queries) UpsertAgentRuntime(ctx context.Context, arg UpsertAgentRuntime
&i.CreatedAt,
&i.UpdatedAt,
&i.OwnerID,
&i.LegacyDaemonID,
)
return i, err
}

View File

@@ -79,13 +79,12 @@ SELECT count(*) FROM agent WHERE runtime_id = $1 AND archived_at IS NULL;
-- name: DeleteArchivedAgentsByRuntime :exec
DELETE FROM agent WHERE runtime_id = $1 AND archived_at IS NOT NULL;
-- name: MigrateAgentsToRuntime :execrows
-- Migrates agents from stale offline runtimes to the newly registered runtime.
-- Only migrates from runtimes that match the same workspace, provider, owner,
-- AND whose daemon_id starts with the current daemon_id followed by '-'.
-- This scopes migration to old profile-suffixed runtimes from the same machine
-- (e.g. "MacBook-staging" matches daemon_id_prefix "MacBook") without touching
-- runtimes from other machines belonging to the same user.
-- name: MigrateAgentsFromLegacyDaemon :execrows
-- Reparents agents from the legacy (hostname-derived) runtime row to the
-- newly registered UUID row. Scoped to a single (workspace, provider,
-- owner) triple so we never touch another user's runtimes even if they
-- share a hostname on the same machine. Called once per legacy daemon_id
-- candidate reported by the daemon at registration time.
UPDATE agent
SET runtime_id = @new_runtime_id
WHERE runtime_id IN (
@@ -94,10 +93,40 @@ WHERE runtime_id IN (
AND ar.provider = @provider
AND ar.owner_id = @owner_id
AND ar.id != @new_runtime_id
AND ar.status = 'offline'
AND ar.daemon_id LIKE @daemon_id_prefix || '-%'
AND ar.daemon_id = @legacy_daemon_id
);
-- name: MigrateTasksFromLegacyDaemon :execrows
-- Same scoping as MigrateAgentsFromLegacyDaemon. Must run before the DELETE
-- below because agent_task_queue.runtime_id is ON DELETE CASCADE; deleting
-- the legacy row first would silently drop in-flight tasks.
UPDATE agent_task_queue
SET runtime_id = @new_runtime_id
WHERE runtime_id IN (
SELECT ar.id FROM agent_runtime ar
WHERE ar.workspace_id = @workspace_id
AND ar.provider = @provider
AND ar.owner_id = @owner_id
AND ar.id != @new_runtime_id
AND ar.daemon_id = @legacy_daemon_id
);
-- name: DeleteLegacyRuntime :execrows
-- Removes the stale hostname-derived runtime row once its agents and tasks
-- have been reparented. legacy_daemon_id on the new row captures the last
-- removed value as a breadcrumb.
DELETE FROM agent_runtime
WHERE workspace_id = @workspace_id
AND provider = @provider
AND owner_id = @owner_id
AND id != @new_runtime_id
AND daemon_id = @legacy_daemon_id;
-- name: SetRuntimeLegacyDaemonID :exec
UPDATE agent_runtime
SET legacy_daemon_id = @legacy_daemon_id
WHERE id = @id;
-- name: DeleteStaleOfflineRuntimes :many
-- Deletes runtimes that have been offline for longer than the TTL and have
-- no agents bound (active or archived). The FK constraint on agent.runtime_id