Compare commits

...

2 Commits

Author SHA1 Message Date
Jiang Bohan
d596f4d883 fix(daemon): address review blockers in runtime self-heal path
Three concrete bugs surfaced in GPT-Boy's review of PR #2404. All fail
without these changes against new regression tests in this commit.

1. Recovery used the per-runtime heartbeat ctx for the register HTTP call.
   handleRuntimeGone prunes the dead runtime and calls notifyRuntimeSetChanged
   before the register completes, which lets heartbeatLoop cancel that very
   ctx — the in-flight register would self-cancel.

   Fix: store the daemon root ctx on the Daemon struct in Run(), expose it
   via recoveryContext(), and drop the ctx parameter from handleRuntimeGone
   entirely so no caller can accidentally pass a per-runtime ctx again. The
   poller path already used parentCtx; this brings the heartbeat and WS-ack
   paths in line.

2. reregisterNextAttempt[workspaceID] was kept set for the full 30s coalesce
   window even after a successful re-register. A user deleting a second
   distinct runtime in the same workspace within that window would see
   handleRuntimeGone prune it locally and then skip the register, losing
   that provider until daemon restart.

   Fix: clear the per-workspace slot on success. The per-runtime in-flight
   set still prevents same-event stampedes; the slot is only for coalescing
   concurrent callers triggered by the same initial event. Failure backoff
   is preserved on the failed path.

3. The register response was appended to the surviving runtimeIDs, but
   DaemonRegister returns every configured provider — including the
   unchanged ones. UpsertAgentRuntime keys on (workspace_id, daemon_id,
   provider), so a partial recovery on [rt-claude, rt-codex] returned
   [rt-claude-new, rt-codex] and produced [rt-codex, rt-claude-new,
   rt-codex] with the old append logic. Duplicates leaked through
   allRuntimeIDs(), the WS URL, deregister(), and future recovery state.

   Fix: treat the response as authoritative for the workspace's runtime
   set — replace ws.runtimeIDs with the response IDs, and diff
   runtimeIndex so any old IDs the response did not return are dropped.
   The workspaceState pointer is still not replaced; only fields are
   mutated, so the ensureRepoReady / repoRefreshMu invariant is preserved.

Tests:
- TestHandleRuntimeGone_PartialWorkspaceRecoveryKeepsSibling — workspace
  [rt-claude-1, rt-codex-1], only claude deleted, server upsert behavior
  simulated; ws.runtimeIDs must be exactly the response set, no duplicates.
- TestHandleRuntimeGone_DistinctDeletionsWithinCoalesceWindowBothRecover —
  delete claude (1 register), then within the coalesce window delete codex
  (must produce a 2nd register); both providers present in final state.
- TestHandleRuntimeGone_RecoveryContextSurvivesCallerCancellation — register
  handler asserts r.Context() is not Done at request time, even though no
  caller-provided ctx is forwarded.
- TestHandleRuntimeGone_RecoveryContextStopsOnDaemonShutdown — bounded
  server handler; verifies handleRuntimeGone returns promptly after
  rootCancel, not after the 30s HTTP client timeout.

go test -race ./internal/daemon ./internal/middleware ./internal/daemonws
green.

Co-authored-by: multica-agent <github@multica.ai>
2026-05-11 15:57:21 +08:00
Jiang Bohan
6bddcd2c34 fix(daemon): self-heal when a runtime is deleted server-side
Closes the warn-log flood reported in #2391. After a runtime was deleted
via the UI (or reclaimed by the 7-day offline GC), the server kept Warning
about every heartbeat / claim / WS frame against the dead UUID at ~3 lines
per 15 s per stale runtime — indefinitely, until someone restarted the
daemon process.

Three coordinated changes:

1. Server distinguishes pgx.ErrNoRows from other DB errors. Both the HTTP
   heartbeat path and HandleDaemonWSHeartbeat (plus requireDaemonRuntimeAccess)
   now only map a missing-row to 404; transient pool / schema errors return
   500. Without this the daemon would mistake a DB hiccup for a deletion
   and prune its own runtime state.

2. WebSocket heartbeat now returns a success ack with RuntimeGone=true and
   Status=runtime_gone instead of an error when the runtime is missing. The
   hub no longer logs a Warn per beat for the (now expected) state. New
   field on DaemonHeartbeatAckPayload; older daemons just ignore it and fall
   back to the existing HTTP-side 404 self-heal path.

3. Daemon self-heals. isRuntimeNotFoundError joins the existing 404-body
   detectors and a unified handleRuntimeGone:
     - Coalesces concurrent recoveries via per-runtime in-flight set, so
       heartbeat / poller / WS-ack discovering the same stale UUID together
       don't stampede registerRuntimesForWorkspace.
     - Per-workspace next-attempt timestamp coalesces sibling stale runtimes
       in the same workspace; on registration failure the window stretches
       to 60 s so we never replace a log flood with a register flood.
     - Prunes the runtime from workspaceState.runtimeIDs, Daemon.runtimeIndex
       and the WS heartbeat freshness map.
     - Re-registers in place — the workspaceState pointer is NEVER replaced,
       preserving the repoRefreshMu invariant that syncWorkspacesFromAPI
       already documents.
     - workspaceSyncLoop now retries empty workspaces every 30 s, so a
       transient register failure still recovers without daemon restart.

Logger side: RequestLogger inspects the response body via WrapResponseWriter.Tee
(bounded to 256 bytes) and downgrades 404s with "runtime not found" /
"task not found" bodies to Info. Genuine 404s (wrong path, real bugs)
keep Warn; 5xx stays Error.

Tests cover: isRuntimeNotFoundError table-driven recognition, removeStaleRuntime
pointer-stability and full state pruning, handleRuntimeGone concurrent-caller
coalescing, failure backoff, RuntimeGone WS ack triggering recovery, the
server-side HandleDaemonWSHeartbeat returning the new ack on pgx.ErrNoRows
(plus HTTP 404 mirror), and the logger downgrade for matching bodies while
keeping Warn on generic 404s.

Co-authored-by: multica-agent <github@multica.ai>
2026-05-11 15:27:46 +08:00
10 changed files with 1430 additions and 22 deletions

View File

@@ -55,6 +55,26 @@ func isTaskNotFoundError(err error) bool {
return strings.Contains(strings.ToLower(reqErr.Body), "task not found")
}
// isRuntimeNotFoundError returns true if the error is a 404 with "runtime not
// found" body. The daemon uses this to detect that the runtime row was deleted
// server-side (UI Delete, 7-day offline GC) while the daemon was still
// heartbeating against the dead UUID, so it can prune the stale runtime from
// its local state and re-register instead of looping on the dead ID forever.
//
// Server-side, this body is paired with pgx.ErrNoRows specifically (other DB
// errors return 500), so a transient DB hiccup cannot make the daemon
// self-cleanup.
func isRuntimeNotFoundError(err error) bool {
var reqErr *requestError
if !errors.As(err, &reqErr) {
return false
}
if reqErr.StatusCode != http.StatusNotFound {
return false
}
return strings.Contains(strings.ToLower(reqErr.Body), "runtime not found")
}
// Client handles HTTP communication with the Multica server daemon API.
type Client struct {
baseURL string

View File

@@ -30,6 +30,13 @@ var (
isBrewInstall = cli.IsBrewInstall
getBrewPrefix = cli.GetBrewPrefix
matchKnownBrewPrefix = cli.MatchKnownBrewPrefix
// detectAgentVersion / checkAgentMinVersion are indirections over the
// real agent helpers so tests can run the registration path without
// shelling out to a real CLI. Mirrors the pattern used for the brew
// helpers above.
detectAgentVersion = agent.DetectVersion
checkAgentMinVersion = agent.CheckMinVersion
)
// workspaceState tracks registered runtimes for a single workspace.
@@ -75,7 +82,16 @@ type Daemon struct {
wsHBMu sync.RWMutex // guards wsHBLastAck
wsHBLastAck map[string]time.Time // runtime_id -> last successful WS heartbeat ack timestamp
// runtimeGoneMu guards runtimeGoneInflight and reregisterNextAttempt. The
// state lets heartbeat / poller / WS-ack handlers converge on a single
// recovery path when they each detect that a runtime row was deleted
// server-side without three of them stampeding registerRuntimesForWorkspace.
runtimeGoneMu sync.Mutex
runtimeGoneInflight map[string]struct{} // runtime_id -> currently recovering
reregisterNextAttempt map[string]time.Time // workspace_id -> earliest time the next re-register attempt may run
cancelFunc context.CancelFunc // set by Run(); called by triggerRestart
rootCtx context.Context // set by Run(); used by long-running recoveries that must survive per-runtime ctx cancellation
restartBinary string // non-empty after a successful update; path to the new binary
updating atomic.Bool // prevents concurrent update attempts
activeTasks atomic.Int64 // number of tasks currently in handleTask; exposed via /health
@@ -99,16 +115,18 @@ func New(cfg Config, logger *slog.Logger) *Daemon {
// server can split logs/metrics by client version (parallel to the CLI).
client.SetVersion(cfg.CLIVersion)
return &Daemon{
cfg: cfg,
client: client,
repoCache: repocache.New(cacheRoot, logger),
logger: logger,
workspaces: make(map[string]*workspaceState),
runtimeIndex: make(map[string]Runtime),
runtimeSet: newRuntimeSetWatcher(),
agentVersions: make(map[string]string),
wsHBLastAck: make(map[string]time.Time),
activeEnvRoots: make(map[string]int),
cfg: cfg,
client: client,
repoCache: repocache.New(cacheRoot, logger),
logger: logger,
workspaces: make(map[string]*workspaceState),
runtimeIndex: make(map[string]Runtime),
runtimeSet: newRuntimeSetWatcher(),
agentVersions: make(map[string]string),
wsHBLastAck: make(map[string]time.Time),
activeEnvRoots: make(map[string]int),
runtimeGoneInflight: make(map[string]struct{}),
reregisterNextAttempt: make(map[string]time.Time),
}
}
@@ -132,6 +150,247 @@ func (d *Daemon) notifyRuntimeSetChanged() {
d.runtimeSet.notify()
}
// reregisterCoalesceWindow caps how often the daemon re-registers a workspace
// after detecting a runtime_not_found response. Many stale runtime IDs may be
// reported within seconds of each other (one delete clears all of a daemon's
// runtimes), and a single re-register call replaces every runtime in the
// workspace, so concurrent recoveries must collapse to one API call.
const reregisterCoalesceWindow = 30 * time.Second
// reregisterFailureBackoff is the additional wait inserted before the next
// re-register attempt when the previous one failed. This prevents heartbeat
// ticks (~15s) from converting a server-side log flood into a re-register
// flood when re-registration itself is failing (workspace removed, server
// unreachable, ...).
const reregisterFailureBackoff = 60 * time.Second
// handleRuntimeGone is the single recovery entry point shared by the HTTP
// heartbeat path, the runtime poller, and the WebSocket runtime_gone ack
// handler. All three may notice the same stale runtime within a few ms of
// each other, so this function:
//
// - keys an in-flight set on runtimeID to drop concurrent calls for the same
// ID after the first one is already cleaning up; and
// - keys a per-workspace next-attempt timestamp on workspaceID so that
// concurrent recoveries triggered by the SAME initial event coalesce to a
// single registerRuntimesForWorkspace call. The slot is cleared on success
// so a later distinct runtime deletion in the same workspace can trigger
// its own recovery without waiting for the coalesce window to expire.
//
// On failure of the underlying re-register, the next-attempt timestamp is
// extended by reregisterFailureBackoff so we don't replace a server-side log
// flood with a daemon-side register flood. workspaceSyncLoop will retry
// independently every DefaultWorkspaceSyncInterval as a safety net.
//
// The recovery HTTP call uses the daemon root context, not the caller's. The
// heartbeat path's per-runtime ctx is cancelled by notifyRuntimeSetChanged the
// moment we prune the dead UUID, and if we forwarded that ctx the in-flight
// register would self-cancel mid-flight.
func (d *Daemon) handleRuntimeGone(runtimeID string) {
if runtimeID == "" {
return
}
// Stampede control per runtime ID.
d.runtimeGoneMu.Lock()
if _, inflight := d.runtimeGoneInflight[runtimeID]; inflight {
d.runtimeGoneMu.Unlock()
return
}
d.runtimeGoneInflight[runtimeID] = struct{}{}
d.runtimeGoneMu.Unlock()
defer func() {
d.runtimeGoneMu.Lock()
delete(d.runtimeGoneInflight, runtimeID)
d.runtimeGoneMu.Unlock()
}()
workspaceID, removed := d.removeStaleRuntime(runtimeID)
if !removed {
// Already gone from local state — a parallel recovery already
// cleaned this up, or workspaceSyncLoop pruned the whole workspace.
return
}
d.logger.Info("runtime deleted server-side; pruned from local state",
"runtime_id", runtimeID, "workspace_id", workspaceID)
d.notifyRuntimeSetChanged()
// Per-workspace coalescing: claim the slot atomically. The first caller
// past this check is the only one that will run
// registerRuntimesForWorkspace while the coalesce window is open. We
// clear the slot on success so a separate later deletion in the same
// workspace is NOT suppressed; the inflight set above is what keeps two
// callers from racing the same recovery.
now := time.Now()
d.runtimeGoneMu.Lock()
if next, ok := d.reregisterNextAttempt[workspaceID]; ok && now.Before(next) {
d.runtimeGoneMu.Unlock()
d.logger.Debug("skip re-register: coalescing with recent attempt",
"workspace_id", workspaceID)
return
}
d.reregisterNextAttempt[workspaceID] = now.Add(reregisterCoalesceWindow)
d.runtimeGoneMu.Unlock()
if err := d.reregisterWorkspaceAfterRuntimeGone(d.recoveryContext(), workspaceID); err != nil {
d.runtimeGoneMu.Lock()
d.reregisterNextAttempt[workspaceID] = time.Now().Add(reregisterFailureBackoff)
d.runtimeGoneMu.Unlock()
// Logged at Warn (not Error) because workspaceSyncLoop retries
// independently every DefaultWorkspaceSyncInterval, so a transient
// failure here is not a stuck state — just an extra wait.
d.logger.Warn("re-register after runtime gone failed",
"workspace_id", workspaceID, "error", err)
return
}
// Success: clear the coalesce slot so a future distinct runtime deletion
// in this workspace can trigger its own recovery immediately. The
// inflight set on runtimeID still prevents same-event stampedes.
d.runtimeGoneMu.Lock()
delete(d.reregisterNextAttempt, workspaceID)
d.runtimeGoneMu.Unlock()
}
// recoveryContext returns the daemon root context for long-running recovery
// HTTP calls (re-register, recover-orphans) that must survive the heartbeat
// loop tearing down a per-runtime context. Falls back to Background when the
// daemon was not started via Run(), e.g. unit-test fixtures.
func (d *Daemon) recoveryContext() context.Context {
if d.rootCtx != nil {
return d.rootCtx
}
return context.Background()
}
// removeStaleRuntime drops a runtime ID from its owning workspace's runtimeIDs
// list, the daemon-level runtimeIndex, and the WS heartbeat freshness map.
// Returns the workspace ID and true if the runtime was tracked, "" and false
// otherwise.
//
// Callers must NOT replace workspaceState pointers — only mutate fields in
// place — because ensureRepoReady holds workspaceState.repoRefreshMu through
// long repo-sync calls. See syncWorkspacesFromAPI for the same invariant.
func (d *Daemon) removeStaleRuntime(runtimeID string) (string, bool) {
d.mu.Lock()
var workspaceID string
for wsID, ws := range d.workspaces {
found := false
filtered := ws.runtimeIDs[:0:0]
for _, rid := range ws.runtimeIDs {
if rid == runtimeID {
found = true
continue
}
filtered = append(filtered, rid)
}
if found {
ws.runtimeIDs = filtered
workspaceID = wsID
break
}
}
if workspaceID == "" {
d.mu.Unlock()
return "", false
}
delete(d.runtimeIndex, runtimeID)
d.mu.Unlock()
d.wsHBMu.Lock()
delete(d.wsHBLastAck, runtimeID)
d.wsHBMu.Unlock()
return workspaceID, true
}
// workspaceNeedsRuntimeRecovery reports whether a tracked workspace currently
// has zero runtime IDs — the state reached when handleRuntimeGone pruned every
// runtime and its inline re-register failed. workspaceSyncLoop calls this on
// each tick so the workspace can recover without waiting for an external
// trigger.
func (d *Daemon) workspaceNeedsRuntimeRecovery(workspaceID string) bool {
d.mu.Lock()
defer d.mu.Unlock()
ws, ok := d.workspaces[workspaceID]
if !ok {
return false
}
return len(ws.runtimeIDs) == 0
}
// reregisterWorkspaceAfterRuntimeGone calls registerRuntimesForWorkspace and
// updates the existing workspaceState in place. The register response is
// authoritative for this workspace's runtime set — every configured provider
// is included, with UpsertAgentRuntime returning the same row ID for surviving
// providers and a fresh ID for any that were deleted server-side. Replacing
// (rather than appending) is required: a partial recovery, where only one
// runtime in a multi-provider workspace was deleted, would otherwise produce
// duplicates for every provider that wasn't deleted.
//
// The workspaceState pointer is NEVER replaced (see syncWorkspacesFromAPI's
// invariant about repoRefreshMu). Only fields are mutated.
func (d *Daemon) reregisterWorkspaceAfterRuntimeGone(ctx context.Context, workspaceID string) error {
resp, err := d.registerRuntimesForWorkspace(ctx, workspaceID)
if err != nil {
return fmt.Errorf("register runtimes: %w", err)
}
newIDs := make([]string, 0, len(resp.Runtimes))
newIDSet := make(map[string]struct{}, len(resp.Runtimes))
for _, rt := range resp.Runtimes {
newIDs = append(newIDs, rt.ID)
newIDSet[rt.ID] = struct{}{}
}
d.mu.Lock()
ws, ok := d.workspaces[workspaceID]
if !ok {
d.mu.Unlock()
return fmt.Errorf("workspace %s no longer tracked", workspaceID)
}
// Drop runtimeIndex entries for prior runtime IDs that the server did not
// return — typically there are none for upsert-on-existing-provider, but
// a daemon config change (provider removed) would leak entries otherwise.
for _, oldID := range ws.runtimeIDs {
if _, kept := newIDSet[oldID]; !kept {
delete(d.runtimeIndex, oldID)
}
}
for _, rt := range resp.Runtimes {
d.runtimeIndex[rt.ID] = rt
}
// Response is authoritative — replace, do not append. Replacing also
// catches the rare case where UpsertAgentRuntime returns a different ID
// for a surviving provider (e.g. schema change); the daemon converges on
// what the server says without leaving stale heartbeat goroutines.
ws.runtimeIDs = newIDs
if resp.ReposVersion != "" {
ws.reposVersion = resp.ReposVersion
ws.allowedRepoURLs = repoAllowlist(resp.Repos)
}
if len(resp.Settings) > 0 {
ws.settings = resp.Settings
}
d.mu.Unlock()
for _, rid := range newIDs {
d.logger.Info("re-registered runtime after server-side deletion",
"workspace_id", workspaceID, "runtime_id", rid)
}
d.notifyRuntimeSetChanged()
// Tell the server about any tasks the previous (now-deleted) runtime
// was working on, mirroring the registration path's recover-orphans call.
for _, rid := range newIDs {
if err := d.client.RecoverOrphans(ctx, rid); err != nil {
d.logger.Warn("recover-orphans after re-register failed",
"runtime_id", rid, "error", err)
}
}
return nil
}
// runtimeSetWatcher is a tiny pub/sub for runtime-set changes. It exists
// because more than one supervisor (taskWakeupLoop, heartbeatLoop, pollLoop)
// needs to react to runtime-set changes; a single buffered channel would
@@ -226,6 +485,7 @@ func (d *Daemon) Run(ctx context.Context) error {
// Wrap context so handleUpdate can cancel the daemon for restart.
ctx, cancel := context.WithCancel(ctx)
d.cancelFunc = cancel
d.rootCtx = ctx
// Bind health port early to detect another running daemon.
healthLn, err := d.listenHealth()
@@ -338,12 +598,12 @@ func (d *Daemon) findRuntime(id string) *Runtime {
func (d *Daemon) registerRuntimesForWorkspace(ctx context.Context, workspaceID string) (*RegisterResponse, error) {
var runtimes []map[string]string
for name, entry := range d.cfg.Agents {
version, err := agent.DetectVersion(ctx, entry.Path)
version, err := detectAgentVersion(ctx, entry.Path)
if err != nil {
d.logger.Warn("skip registering runtime", "name", name, "error", err)
continue
}
if err := agent.CheckMinVersion(name, version); err != nil {
if err := checkAgentMinVersion(name, version); err != nil {
d.logger.Warn("skip registering runtime: version too old", "name", name, "version", version, "error", err)
continue
}
@@ -695,7 +955,21 @@ func (d *Daemon) syncWorkspacesFromAPI(ctx context.Context) error {
var removed int
for id, name := range apiIDs {
if currentIDs[id] {
continue // important: never replace existing workspaceState; ensureRepoReady holds ws.repoRefreshMu from the original pointer
// Already tracked: only intervene if the workspace lost all of
// its runtimes (most commonly because handleRuntimeGone pruned
// them and its inline re-register failed). The pointer is not
// replaced here either — ensureRepoReady holds repoRefreshMu
// from the original pointer.
if !d.workspaceNeedsRuntimeRecovery(id) {
continue
}
d.logger.Info("workspace has no runtimes; retrying registration", "workspace_id", id, "name", name)
if err := d.reregisterWorkspaceAfterRuntimeGone(ctx, id); err != nil {
d.logger.Warn("retry register failed", "workspace_id", id, "error", err)
continue
}
registered++
continue
}
resp, err := d.registerRuntimesForWorkspace(ctx, id)
if err != nil {
@@ -850,10 +1124,26 @@ func (d *Daemon) runHeartbeatTick(ctx context.Context, rid string) {
resp, err := d.client.SendHeartbeat(ctx, rid)
if err != nil {
if ctx.Err() == nil {
if isRuntimeNotFoundError(err) {
// Server says this runtime is gone — recover instead of
// looping on the dead UUID. handleRuntimeGone coalesces
// concurrent callers and runs the recovery HTTP call under
// the daemon root context so notifyRuntimeSetChanged
// tearing down this heartbeat goroutine cannot abort it.
go d.handleRuntimeGone(rid)
return
}
d.logger.Warn("heartbeat failed", "runtime_id", rid, "error", err)
}
return
}
if resp != nil && resp.RuntimeGone {
// The WS path returns a successful ack with RuntimeGone=true for the
// same scenario; treat it the same way here in case HTTP starts
// surfacing this signal too.
go d.handleRuntimeGone(rid)
return
}
d.handleHeartbeatActions(ctx, rid, resp)
}
@@ -1370,6 +1660,14 @@ func (d *Daemon) runRuntimePoller(
if err != nil {
sem <- slot
if pollerCtx.Err() == nil {
if isRuntimeNotFoundError(err) {
// Server says this runtime is gone — recover and exit
// the poller; the runtime-set watcher will tear this
// goroutine down via pollerCtx once the workspace is
// re-registered with a new runtime ID.
go d.handleRuntimeGone(rid)
return
}
d.logger.Warn("claim task failed", "runtime_id", rid, "error", err)
}
if err := sleepWithContextOrWakeup(pollerCtx, d.cfg.PollInterval, wakeup); err != nil {

View File

@@ -479,6 +479,87 @@ func TestIsTaskNotFoundError(t *testing.T) {
}
}
func TestIsRuntimeNotFoundError(t *testing.T) {
t.Parallel()
cases := []struct {
name string
err error
want bool
}{
{
name: "404 with runtime not found body from heartbeat",
err: &requestError{
Method: http.MethodPost,
Path: "/api/daemon/heartbeat",
StatusCode: http.StatusNotFound,
Body: `{"error":"runtime not found"}`,
},
want: true,
},
{
name: "404 with runtime not found body from claim",
err: &requestError{
Method: http.MethodPost,
Path: "/api/daemon/runtimes/abc/tasks/claim",
StatusCode: http.StatusNotFound,
Body: `{"error":"runtime not found"}`,
},
want: true,
},
{
name: "mixed-case body still matches",
err: &requestError{
StatusCode: http.StatusNotFound,
Body: `{"error":"Runtime Not Found"}`,
},
want: true,
},
{
name: "500 with same body must NOT be treated as runtime-not-found",
err: &requestError{
StatusCode: http.StatusInternalServerError,
Body: `{"error":"runtime not found"}`,
},
want: false,
},
{
name: "404 with task-not-found body is not runtime-not-found",
err: &requestError{
StatusCode: http.StatusNotFound,
Body: `{"error":"task not found"}`,
},
want: false,
},
{
name: "404 with workspace-not-found body is not runtime-not-found",
err: &requestError{
StatusCode: http.StatusNotFound,
Body: `{"error":"workspace not found"}`,
},
want: false,
},
{
name: "non-requestError",
err: errors.New("network down"),
want: false,
},
{
name: "nil",
err: nil,
want: false,
},
}
for _, tc := range cases {
t.Run(tc.name, func(t *testing.T) {
t.Parallel()
if got := isRuntimeNotFoundError(tc.err); got != tc.want {
t.Fatalf("isRuntimeNotFoundError(%v) = %v, want %v", tc.err, got, tc.want)
}
})
}
}
func TestShouldInterruptAgent(t *testing.T) {
t.Parallel()

View File

@@ -0,0 +1,652 @@
package daemon
import (
"context"
"encoding/json"
"fmt"
"log/slog"
"net/http"
"net/http/httptest"
"strings"
"sync"
"sync/atomic"
"testing"
"time"
)
// freshDaemon builds a Daemon with every map field the production New() seeds
// so callers can exercise handleRuntimeGone without going through Run.
func freshDaemon(serverURL string) *Daemon {
return &Daemon{
client: NewClient(serverURL),
logger: slog.New(slog.NewTextHandler(testNopWriter{}, &slog.HandlerOptions{Level: slog.LevelWarn})),
workspaces: make(map[string]*workspaceState),
runtimeIndex: make(map[string]Runtime),
runtimeSet: newRuntimeSetWatcher(),
agentVersions: make(map[string]string),
wsHBLastAck: make(map[string]time.Time),
activeEnvRoots: make(map[string]int),
runtimeGoneInflight: make(map[string]struct{}),
reregisterNextAttempt: make(map[string]time.Time),
}
}
// testNopWriter discards log output so tests don't spam stderr.
type testNopWriter struct{}
func (testNopWriter) Write(p []byte) (int, error) { return len(p), nil }
// stubAgentVersion swaps out the agent version probes that registerRuntimesForWorkspace
// would normally shell out for, and restores the production hooks on cleanup.
// Returns a no-op cleanup so callers can use t.Cleanup directly.
func stubAgentVersion(t *testing.T) func() {
t.Helper()
origDetect := detectAgentVersion
origCheck := checkAgentMinVersion
detectAgentVersion = func(_ context.Context, _ string) (string, error) {
return "9.9.9", nil
}
checkAgentMinVersion = func(_, _ string) error { return nil }
return func() {
detectAgentVersion = origDetect
checkAgentMinVersion = origCheck
}
}
func TestRemoveStaleRuntime_PrunesAllLocalState(t *testing.T) {
t.Parallel()
d := freshDaemon("")
ws := &workspaceState{
workspaceID: "ws-1",
runtimeIDs: []string{"rt-1", "rt-2", "rt-3"},
}
d.workspaces["ws-1"] = ws
d.runtimeIndex["rt-1"] = Runtime{ID: "rt-1"}
d.runtimeIndex["rt-2"] = Runtime{ID: "rt-2"}
d.runtimeIndex["rt-3"] = Runtime{ID: "rt-3"}
d.wsHBLastAck["rt-2"] = time.Now()
workspaceID, removed := d.removeStaleRuntime("rt-2")
if !removed {
t.Fatalf("removeStaleRuntime: removed=false, want true")
}
if workspaceID != "ws-1" {
t.Fatalf("workspaceID = %q, want ws-1", workspaceID)
}
if got := ws.runtimeIDs; len(got) != 2 || got[0] != "rt-1" || got[1] != "rt-3" {
t.Fatalf("runtimeIDs = %v, want [rt-1 rt-3]", got)
}
if _, ok := d.runtimeIndex["rt-2"]; ok {
t.Fatalf("runtimeIndex still contains rt-2")
}
if _, ok := d.wsHBLastAck["rt-2"]; ok {
t.Fatalf("wsHBLastAck still contains rt-2")
}
}
func TestRemoveStaleRuntime_UnknownRuntimeIsNoop(t *testing.T) {
t.Parallel()
d := freshDaemon("")
d.workspaces["ws-1"] = &workspaceState{workspaceID: "ws-1", runtimeIDs: []string{"rt-1"}}
d.runtimeIndex["rt-1"] = Runtime{ID: "rt-1"}
workspaceID, removed := d.removeStaleRuntime("rt-unknown")
if removed {
t.Fatalf("removeStaleRuntime: removed=true for unknown id, want false")
}
if workspaceID != "" {
t.Fatalf("workspaceID = %q for unknown id, want empty", workspaceID)
}
if got := d.workspaces["ws-1"].runtimeIDs; len(got) != 1 {
t.Fatalf("unrelated workspace runtimeIDs mutated: %v", got)
}
}
func TestRemoveStaleRuntime_PreservesWorkspaceStatePointer(t *testing.T) {
t.Parallel()
// The Daemon contract is that workspaceState pointers must NEVER be
// replaced — only fields mutated — because ensureRepoReady holds a long
// repoRefreshMu through repo syncs. Regressing this turns concurrent
// repo refreshes into a deadlock against the wrong mutex copy. Guard it
// here so the invariant is observable in tests.
d := freshDaemon("")
original := &workspaceState{workspaceID: "ws-1", runtimeIDs: []string{"rt-1"}}
d.workspaces["ws-1"] = original
d.runtimeIndex["rt-1"] = Runtime{ID: "rt-1"}
d.removeStaleRuntime("rt-1")
if d.workspaces["ws-1"] != original {
t.Fatalf("workspaceState pointer was replaced; ensureRepoReady's mutex assumption broken")
}
}
// handleRuntimeGoneFixture wires up a Daemon against a fake server that
// answers register/recover-orphans. registerCount is incremented exactly
// once per /api/daemon/register call so tests can assert on coalescing.
type handleRuntimeGoneFixture struct {
daemon *Daemon
server *httptest.Server
registerCount *atomic.Int64
}
func newHandleRuntimeGoneFixture(t *testing.T) *handleRuntimeGoneFixture {
t.Helper()
var registerCount atomic.Int64
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
switch {
case r.URL.Path == "/api/daemon/register":
registerCount.Add(1)
// Each register call returns the same fresh runtime ID so
// downstream assertions can observe it.
w.Header().Set("Content-Type", "application/json")
_ = json.NewEncoder(w).Encode(RegisterResponse{
Runtimes: []Runtime{{ID: "rt-new", Name: "Claude", Provider: "claude", Status: "online"}},
Repos: []RepoData{},
})
case strings.HasSuffix(r.URL.Path, "/recover-orphans"):
w.WriteHeader(http.StatusOK)
default:
w.WriteHeader(http.StatusOK)
}
}))
t.Cleanup(srv.Close)
d := freshDaemon(srv.URL)
// Attach a single configured agent so registerRuntimesForWorkspace would
// produce a non-empty request body. The fake server ignores the body,
// but the registerRuntimesForWorkspace pre-flight (DetectVersion) would
// otherwise reject the call.
d.cfg.Agents = map[string]AgentEntry{"claude": {Path: "/usr/bin/true"}}
// Replace the agent version probe so the test doesn't shell out.
t.Cleanup(stubAgentVersion(t))
return &handleRuntimeGoneFixture{daemon: d, server: srv, registerCount: &registerCount}
}
func TestHandleRuntimeGone_PrunesAndReregisters(t *testing.T) {
// Not t.Parallel: stubAgentVersion mutates package-level vars used by
// registerRuntimesForWorkspace. Other Parallel tests in this file that
// don't exercise registration are still parallel-safe.
fx := newHandleRuntimeGoneFixture(t)
d := fx.daemon
d.workspaces["ws-1"] = &workspaceState{workspaceID: "ws-1", runtimeIDs: []string{"rt-old"}}
d.runtimeIndex["rt-old"] = Runtime{ID: "rt-old"}
d.wsHBLastAck["rt-old"] = time.Now()
d.handleRuntimeGone("rt-old")
if got := d.runtimeIndex["rt-old"]; got.ID != "" {
t.Fatalf("rt-old still present in runtimeIndex: %+v", got)
}
if _, ok := d.runtimeIndex["rt-new"]; !ok {
t.Fatalf("rt-new not added to runtimeIndex after re-register")
}
if got := d.workspaces["ws-1"].runtimeIDs; len(got) != 1 || got[0] != "rt-new" {
t.Fatalf("workspace runtimeIDs after recovery = %v, want [rt-new]", got)
}
if _, ok := d.wsHBLastAck["rt-old"]; ok {
t.Fatalf("wsHBLastAck not cleared for rt-old")
}
if got := fx.registerCount.Load(); got != 1 {
t.Fatalf("register endpoint called %d times, want 1", got)
}
}
func TestHandleRuntimeGone_CoalescesConcurrentCallers(t *testing.T) {
// Not t.Parallel — stubAgentVersion via newHandleRuntimeGoneFixture.
// Three goroutines (heartbeat, poller, WS) may each detect the same
// stale runtime within the same beat. Exactly one re-register must
// reach the server.
fx := newHandleRuntimeGoneFixture(t)
d := fx.daemon
d.workspaces["ws-1"] = &workspaceState{
workspaceID: "ws-1",
runtimeIDs: []string{"rt-a", "rt-b", "rt-c"},
}
d.runtimeIndex["rt-a"] = Runtime{ID: "rt-a"}
d.runtimeIndex["rt-b"] = Runtime{ID: "rt-b"}
d.runtimeIndex["rt-c"] = Runtime{ID: "rt-c"}
var wg sync.WaitGroup
for _, rid := range []string{"rt-a", "rt-b", "rt-c"} {
wg.Add(1)
go func(id string) {
defer wg.Done()
d.handleRuntimeGone(id)
}(rid)
}
wg.Wait()
if got := fx.registerCount.Load(); got != 1 {
t.Fatalf("register endpoint called %d times under stampede, want 1", got)
}
if got := d.workspaces["ws-1"].runtimeIDs; len(got) != 1 || got[0] != "rt-new" {
t.Fatalf("workspace runtimeIDs after stampede = %v, want [rt-new]", got)
}
}
func TestHandleRuntimeGone_BackoffOnFailure(t *testing.T) {
// Not t.Parallel — stubAgentVersion.
// Failure path: the register endpoint returns 500 — exactly one attempt
// should make the round trip; subsequent immediate calls must be
// short-circuited by the failure backoff. This is the "don't replace
// log spam with register spam" guarantee.
var registerCount atomic.Int64
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if r.URL.Path == "/api/daemon/register" {
registerCount.Add(1)
w.WriteHeader(http.StatusInternalServerError)
return
}
w.WriteHeader(http.StatusOK)
}))
t.Cleanup(srv.Close)
d := freshDaemon(srv.URL)
d.cfg.Agents = map[string]AgentEntry{"claude": {Path: "/usr/bin/true"}}
t.Cleanup(stubAgentVersion(t))
d.workspaces["ws-1"] = &workspaceState{workspaceID: "ws-1", runtimeIDs: []string{"rt-1", "rt-2"}}
d.runtimeIndex["rt-1"] = Runtime{ID: "rt-1"}
d.runtimeIndex["rt-2"] = Runtime{ID: "rt-2"}
d.handleRuntimeGone("rt-1")
d.handleRuntimeGone("rt-2")
if got := registerCount.Load(); got != 1 {
t.Fatalf("register endpoint called %d times on failure path, want 1 (second call should be coalesced)", got)
}
// Local state pruning still happened for both, even though re-register
// failed: the workspace is now empty, which workspaceSyncLoop will
// retry on the next tick.
if got := d.workspaces["ws-1"].runtimeIDs; len(got) != 0 {
t.Fatalf("workspace runtimeIDs after failed recovery = %v, want []", got)
}
}
func TestHandleWSHeartbeatAck_RuntimeGoneTriggersRecovery(t *testing.T) {
// The WS path's twin of an HTTP 404 "runtime not found". When the server
// flags a runtime as gone, the daemon must NOT record a freshness mark
// — doing so would tell the HTTP heartbeat to skip its tick and let the
// daemon keep believing the runtime is alive.
fx := newHandleRuntimeGoneFixture(t)
d := fx.daemon
d.workspaces["ws-1"] = &workspaceState{workspaceID: "ws-1", runtimeIDs: []string{"rt-old"}}
d.runtimeIndex["rt-old"] = Runtime{ID: "rt-old"}
d.wsHBLastAck["rt-old"] = time.Now()
d.handleWSHeartbeatAck(context.Background(), &HeartbeatResponse{
RuntimeID: "rt-old",
Status: "runtime_gone",
RuntimeGone: true,
})
// handleRuntimeGone is fired asynchronously via `go`; spin briefly.
deadline := time.Now().Add(2 * time.Second)
for time.Now().Before(deadline) {
d.mu.Lock()
_, stillOld := d.runtimeIndex["rt-old"]
_, gotNew := d.runtimeIndex["rt-new"]
d.mu.Unlock()
if !stillOld && gotNew {
break
}
time.Sleep(10 * time.Millisecond)
}
if _, stillOld := d.runtimeIndex["rt-old"]; stillOld {
t.Fatalf("rt-old not pruned after RuntimeGone ack")
}
if _, ok := d.wsHBLastAck["rt-old"]; ok {
t.Fatalf("WS freshness mark not cleared for gone runtime — HTTP heartbeat would skip its tick")
}
}
func TestHandleWSHeartbeatAck_NormalAckRecordsFreshness(t *testing.T) {
t.Parallel()
d := freshDaemon("")
d.handleWSHeartbeatAck(context.Background(), &HeartbeatResponse{
RuntimeID: "rt-1",
Status: "ok",
})
if !d.wsHeartbeatRecentlyAcked("rt-1") {
t.Fatalf("normal ack should record WS freshness for rt-1")
}
}
func TestHandleWSHeartbeatAck_EmptyAckIgnored(t *testing.T) {
t.Parallel()
d := freshDaemon("")
d.handleWSHeartbeatAck(context.Background(), nil)
d.handleWSHeartbeatAck(context.Background(), &HeartbeatResponse{RuntimeID: ""})
// Should not panic, should not record any state.
if len(d.wsHBLastAck) != 0 {
t.Fatalf("empty ack recorded state: %v", d.wsHBLastAck)
}
}
func TestWorkspaceNeedsRuntimeRecovery(t *testing.T) {
t.Parallel()
d := freshDaemon("")
d.workspaces["ws-empty"] = &workspaceState{workspaceID: "ws-empty"}
d.workspaces["ws-full"] = &workspaceState{workspaceID: "ws-full", runtimeIDs: []string{"rt-1"}}
if !d.workspaceNeedsRuntimeRecovery("ws-empty") {
t.Fatalf("ws-empty should need recovery")
}
if d.workspaceNeedsRuntimeRecovery("ws-full") {
t.Fatalf("ws-full should NOT need recovery")
}
if d.workspaceNeedsRuntimeRecovery("ws-unknown") {
t.Fatalf("untracked workspace should NOT need recovery")
}
}
// multiProviderRegisterFixture mirrors handleRuntimeGoneFixture but speaks the
// upsert semantics of UpsertAgentRuntime: surviving providers keep their
// runtime IDs across re-registers, deleted ones get a fresh ID. The fake
// server is the source of truth and rewrites its own knowledge of which
// providers are alive each time a runtime is deleted.
//
// markDeleted(rid) emulates a UI Delete by removing the row server-side and
// returning a brand-new ID for that provider on the next register call.
type multiProviderRegisterFixture struct {
daemon *Daemon
server *httptest.Server
registerCount *atomic.Int64
mu sync.Mutex
// providerToID maps provider -> current server-side runtime ID. The fake
// register handler reads/mutates this so the test reflects realistic
// upsert behavior.
providerToID map[string]string
idCounter int
}
func newMultiProviderRegisterFixture(t *testing.T, providers map[string]string) *multiProviderRegisterFixture {
t.Helper()
fx := &multiProviderRegisterFixture{
providerToID: make(map[string]string, len(providers)),
}
for p, id := range providers {
fx.providerToID[p] = id
}
var registerCount atomic.Int64
fx.registerCount = &registerCount
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
switch {
case r.URL.Path == "/api/daemon/register":
registerCount.Add(1)
fx.mu.Lock()
runtimes := make([]Runtime, 0, len(fx.providerToID))
for provider, id := range fx.providerToID {
if id == "" {
// Provider was marked deleted; mint a fresh ID
// (the UpsertAgentRuntime INSERT branch).
fx.idCounter++
id = fmt.Sprintf("%s-new-%d", provider, fx.idCounter)
fx.providerToID[provider] = id
}
runtimes = append(runtimes, Runtime{
ID: id, Name: provider, Provider: provider, Status: "online",
})
}
fx.mu.Unlock()
w.Header().Set("Content-Type", "application/json")
_ = json.NewEncoder(w).Encode(RegisterResponse{
Runtimes: runtimes,
Repos: []RepoData{},
})
case strings.HasSuffix(r.URL.Path, "/recover-orphans"):
w.WriteHeader(http.StatusOK)
default:
w.WriteHeader(http.StatusOK)
}
}))
t.Cleanup(srv.Close)
d := freshDaemon(srv.URL)
d.cfg.Agents = make(map[string]AgentEntry, len(providers))
for p := range providers {
d.cfg.Agents[p] = AgentEntry{Path: "/usr/bin/true"}
}
t.Cleanup(stubAgentVersion(t))
fx.daemon = d
fx.server = srv
return fx
}
// markDeleted simulates server-side runtime deletion: the next register call
// will mint a new ID for this provider, matching the UI Delete + re-register
// path's UpsertAgentRuntime INSERT branch.
func (fx *multiProviderRegisterFixture) markDeleted(provider string) {
fx.mu.Lock()
defer fx.mu.Unlock()
fx.providerToID[provider] = ""
}
func TestHandleRuntimeGone_PartialWorkspaceRecoveryKeepsSibling(t *testing.T) {
// Workspace has two providers, only one runtime is deleted. The siblings
// must NOT end up duplicated in workspaceState.runtimeIDs — that would
// leak through allRuntimeIDs(), deregister(), and re-recovery state.
// This is the regression test for Finding #3 (register response is
// authoritative for the workspace's runtime set, not an append).
fx := newMultiProviderRegisterFixture(t, map[string]string{
"claude": "rt-claude-1",
"codex": "rt-codex-1",
})
d := fx.daemon
d.workspaces["ws-1"] = &workspaceState{
workspaceID: "ws-1",
runtimeIDs: []string{"rt-claude-1", "rt-codex-1"},
}
d.runtimeIndex["rt-claude-1"] = Runtime{ID: "rt-claude-1", Provider: "claude"}
d.runtimeIndex["rt-codex-1"] = Runtime{ID: "rt-codex-1", Provider: "codex"}
// Only the claude runtime gets deleted server-side.
fx.markDeleted("claude")
d.handleRuntimeGone("rt-claude-1")
got := append([]string(nil), d.workspaces["ws-1"].runtimeIDs...)
if len(got) != 2 {
t.Fatalf("workspace runtimeIDs has %d entries after partial recovery; want 2; got %v", len(got), got)
}
// Set comparison: must contain rt-codex-1 (surviving) and a freshly
// minted claude id, with NO duplicates.
seen := make(map[string]int, len(got))
for _, id := range got {
seen[id]++
}
for id, count := range seen {
if count != 1 {
t.Fatalf("duplicate runtime id %q (count=%d) after partial recovery: %v", id, count, got)
}
}
if _, ok := seen["rt-codex-1"]; !ok {
t.Fatalf("surviving codex runtime missing from workspace state after recovery: %v", got)
}
if _, ok := seen["rt-claude-1"]; ok {
t.Fatalf("deleted claude runtime should not be in workspace state: %v", got)
}
// And the runtimeIndex must reflect the same: codex kept, claude-1 dropped.
if _, ok := d.runtimeIndex["rt-claude-1"]; ok {
t.Fatalf("rt-claude-1 still in runtimeIndex after deletion")
}
if _, ok := d.runtimeIndex["rt-codex-1"]; !ok {
t.Fatalf("rt-codex-1 dropped from runtimeIndex during partial recovery")
}
}
func TestHandleRuntimeGone_DistinctDeletionsWithinCoalesceWindowBothRecover(t *testing.T) {
// Two sequential, distinct runtime deletions in the same workspace fired
// within the 30s coalesce window. Each deletion must trigger its own
// re-register: success on call #1 must NOT suppress call #2. Regression
// for Finding #2 (success-case clear of reregisterNextAttempt).
fx := newMultiProviderRegisterFixture(t, map[string]string{
"claude": "rt-claude-1",
"codex": "rt-codex-1",
})
d := fx.daemon
d.workspaces["ws-1"] = &workspaceState{
workspaceID: "ws-1",
runtimeIDs: []string{"rt-claude-1", "rt-codex-1"},
}
d.runtimeIndex["rt-claude-1"] = Runtime{ID: "rt-claude-1", Provider: "claude"}
d.runtimeIndex["rt-codex-1"] = Runtime{ID: "rt-codex-1", Provider: "codex"}
// Sequential, NOT concurrent: the first call fully completes before the
// second starts, so the in-flight set never collides.
fx.markDeleted("claude")
d.handleRuntimeGone("rt-claude-1")
if got := fx.registerCount.Load(); got != 1 {
t.Fatalf("after first deletion: register called %d times, want 1", got)
}
// Inspect the new claude id the fake assigned, so we can detect that
// the second recovery actually ran register again.
fx.mu.Lock()
claudeIDAfterFirst := fx.providerToID["claude"]
fx.mu.Unlock()
// Now delete codex within the coalesce window (effectively t<1s after
// the first recovery), simulating a user deleting a second runtime
// shortly after the first.
fx.markDeleted("codex")
d.handleRuntimeGone("rt-codex-1")
if got := fx.registerCount.Load(); got != 2 {
t.Fatalf("after second distinct deletion: register called %d times, want 2 (coalesce window must clear on success)", got)
}
got := append([]string(nil), d.workspaces["ws-1"].runtimeIDs...)
if len(got) != 2 {
t.Fatalf("workspace runtimeIDs after both recoveries = %v, want 2 entries", got)
}
seen := make(map[string]int, len(got))
for _, id := range got {
seen[id]++
}
for id, count := range seen {
if count != 1 {
t.Fatalf("duplicate runtime id %q after sequential recoveries: %v", id, got)
}
}
if _, ok := seen[claudeIDAfterFirst]; !ok {
t.Fatalf("claude id from first recovery missing after second deletion of codex: have %v, expected to keep %q", got, claudeIDAfterFirst)
}
}
func TestHandleRuntimeGone_RecoveryContextSurvivesCallerCancellation(t *testing.T) {
// Regression for Finding #1: handleRuntimeGone must not use the per-
// runtime heartbeat ctx for the register HTTP call. notifyRuntimeSetChanged
// tears that ctx down as soon as we prune the dead runtime, so forwarding
// it would self-cancel the in-flight register.
//
// We assert by inspecting the register handler's request context: it
// must not be Done when the daemon's rootCtx is alive, regardless of what
// upstream contexts (heartbeat, poller, WS) are doing.
var observedCancelled atomic.Bool
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if r.URL.Path == "/api/daemon/register" {
// Inspect the inbound request ctx. If handleRuntimeGone had
// forwarded a cancelled caller ctx, this would be Done.
select {
case <-r.Context().Done():
observedCancelled.Store(true)
default:
}
w.Header().Set("Content-Type", "application/json")
_ = json.NewEncoder(w).Encode(RegisterResponse{
Runtimes: []Runtime{{ID: "rt-new", Name: "claude", Provider: "claude", Status: "online"}},
})
return
}
w.WriteHeader(http.StatusOK)
}))
t.Cleanup(srv.Close)
d := freshDaemon(srv.URL)
d.cfg.Agents = map[string]AgentEntry{"claude": {Path: "/usr/bin/true"}}
t.Cleanup(stubAgentVersion(t))
// rootCtx is what handleRuntimeGone uses for recovery. We keep it alive.
rootCtx, rootCancel := context.WithCancel(context.Background())
defer rootCancel()
d.rootCtx = rootCtx
d.workspaces["ws-1"] = &workspaceState{workspaceID: "ws-1", runtimeIDs: []string{"rt-old"}}
d.runtimeIndex["rt-old"] = Runtime{ID: "rt-old"}
d.handleRuntimeGone("rt-old")
if observedCancelled.Load() {
t.Fatalf("register HTTP call ran with a cancelled context — recovery would self-cancel under runtime-set churn")
}
if got := d.workspaces["ws-1"].runtimeIDs; len(got) != 1 || got[0] != "rt-new" {
t.Fatalf("workspace runtimeIDs after recovery = %v, want [rt-new]", got)
}
}
func TestHandleRuntimeGone_RecoveryContextStopsOnDaemonShutdown(t *testing.T) {
// Companion to RecoveryContextSurvivesCallerCancellation: when the daemon
// IS shutting down, recovery must abort promptly instead of holding the
// HTTP call open until its 30s client timeout. We bound the server
// handler with a short safety timeout so test cleanup never hangs on a
// stuck connection — the assertion is on the daemon-side return time,
// not on server-side context propagation.
registerEntered := make(chan struct{}, 1)
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if r.URL.Path == "/api/daemon/register" {
select {
case registerEntered <- struct{}{}:
default:
}
select {
case <-r.Context().Done():
case <-time.After(2 * time.Second):
}
return
}
w.WriteHeader(http.StatusOK)
}))
t.Cleanup(srv.Close)
d := freshDaemon(srv.URL)
d.cfg.Agents = map[string]AgentEntry{"claude": {Path: "/usr/bin/true"}}
t.Cleanup(stubAgentVersion(t))
rootCtx, rootCancel := context.WithCancel(context.Background())
t.Cleanup(rootCancel)
d.rootCtx = rootCtx
d.workspaces["ws-1"] = &workspaceState{workspaceID: "ws-1", runtimeIDs: []string{"rt-old"}}
d.runtimeIndex["rt-old"] = Runtime{ID: "rt-old"}
done := make(chan struct{})
go func() {
d.handleRuntimeGone("rt-old")
close(done)
}()
select {
case <-registerEntered:
case <-time.After(2 * time.Second):
t.Fatalf("register endpoint was never reached")
}
rootCancel()
select {
case <-done:
case <-time.After(3 * time.Second):
t.Fatalf("handleRuntimeGone did not abort after daemon root context cancellation")
}
}

View File

@@ -231,6 +231,30 @@ func marshalRaw(v any) json.RawMessage {
return data
}
// handleWSHeartbeatAck dispatches one heartbeat_ack received over the WS
// task-wakeup connection. Extracted from readTaskWakeupMessages so tests can
// exercise the branching logic without a real WebSocket.
//
// A RuntimeGone=true ack is the WebSocket twin of an HTTP 404 "runtime not
// found": it tells the daemon the runtime row was deleted server-side. We
// route it through the same self-heal entry point as the HTTP path and do
// NOT record a heartbeat freshness mark — pretending the runtime is alive
// would let HTTP keep skipping its own heartbeat against the dead UUID.
//
// handleRuntimeGone uses the daemon root context for its register call, so
// this function can safely pass any caller context here.
func (d *Daemon) handleWSHeartbeatAck(ctx context.Context, ack *HeartbeatResponse) {
if ack == nil || ack.RuntimeID == "" {
return
}
if ack.RuntimeGone {
go d.handleRuntimeGone(ack.RuntimeID)
return
}
d.recordWSHeartbeatAck(ack.RuntimeID)
d.handleHeartbeatActions(ctx, ack.RuntimeID, ack)
}
func (d *Daemon) readTaskWakeupMessages(conn *websocket.Conn, taskWakeups chan<- struct{}) error {
conn.SetReadLimit(64 * 1024)
for {
@@ -262,11 +286,7 @@ func (d *Daemon) readTaskWakeupMessages(conn *websocket.Conn, taskWakeups chan<-
d.logger.Debug("ws heartbeat ack invalid payload", "error", err)
continue
}
if ack.RuntimeID == "" {
continue
}
d.recordWSHeartbeatAck(ack.RuntimeID)
d.handleHeartbeatActions(context.Background(), ack.RuntimeID, &ack)
d.handleWSHeartbeatAck(context.Background(), &ack)
}
}
}

View File

@@ -54,6 +54,11 @@ func (h *Handler) requireDaemonWorkspaceAccess(w http.ResponseWriter, r *http.Re
}
// requireDaemonRuntimeAccess looks up a runtime and verifies the caller owns its workspace.
//
// Only pgx.ErrNoRows is treated as a real "runtime gone" 404 — the daemon uses
// that response to drop the stale runtime from its in-memory map and re-register,
// so collapsing transient DB errors into the same 404 would force the daemon to
// self-cleanup on a hiccup. Other DB errors become 500.
func (h *Handler) requireDaemonRuntimeAccess(w http.ResponseWriter, r *http.Request, runtimeID string) (db.AgentRuntime, bool) {
runtimeUUID, ok := parseUUIDOrBadRequest(w, runtimeID, "runtime_id")
if !ok {
@@ -61,7 +66,12 @@ func (h *Handler) requireDaemonRuntimeAccess(w http.ResponseWriter, r *http.Requ
}
rt, err := h.Queries.GetAgentRuntime(r.Context(), runtimeUUID)
if err != nil {
writeError(w, http.StatusNotFound, "runtime not found")
if isNotFound(err) {
writeError(w, http.StatusNotFound, "runtime not found")
return db.AgentRuntime{}, false
}
slog.Warn("get agent runtime failed", "runtime_id", runtimeID, "error", err)
writeError(w, http.StatusInternalServerError, "failed to load runtime")
return db.AgentRuntime{}, false
}
if !h.requireDaemonWorkspaceAccess(w, r, uuidToString(rt.WorkspaceID)) {
@@ -672,8 +682,18 @@ func (h *Handler) DaemonHeartbeat(w http.ResponseWriter, r *http.Request) {
rt, lookupErr := h.Queries.GetAgentRuntime(r.Context(), runtimeUUID)
runtimeLookupMs = time.Since(lookupStart).Milliseconds()
if lookupErr != nil {
outcome = "runtime_not_found"
writeError(w, http.StatusNotFound, "runtime not found")
// Only pgx.ErrNoRows means the runtime row is gone. Daemon reads this
// 404 as a signal to drop the stale runtime locally; treating a
// transient DB error the same way would force daemons to self-cleanup
// on a hiccup.
if isNotFound(lookupErr) {
outcome = "runtime_not_found"
writeError(w, http.StatusNotFound, "runtime not found")
return
}
outcome = "runtime_lookup_error"
slog.Warn("get agent runtime failed", "runtime_id", req.RuntimeID, "error", lookupErr)
writeError(w, http.StatusInternalServerError, "failed to load runtime")
return
}
wsCheckStart := time.Now()
@@ -729,6 +749,13 @@ func (h *Handler) DaemonHeartbeat(w http.ResponseWriter, r *http.Request) {
// Workspace authorization is re-checked on every heartbeat instead of trusted
// from the upgrade-time check because runtime ownership can change (e.g. a
// runtime is reassigned to another workspace mid-connection).
//
// When the runtime row is missing (pgx.ErrNoRows), the function returns a
// successful ack with Status=HeartbeatStatusRuntimeGone and RuntimeGone=true
// instead of an error. That keeps the hub from logging every beat at Warn,
// and tells the daemon to drop the stale runtime and re-register. Other DB
// errors still propagate as errors so they keep their existing Warn logging
// and the daemon does not mistake a hiccup for a deletion.
func (h *Handler) HandleDaemonWSHeartbeat(ctx context.Context, identity daemonws.ClientIdentity, runtimeID string) (*protocol.DaemonHeartbeatAckPayload, error) {
runtimeUUID, err := util.ParseUUID(runtimeID)
if err != nil {
@@ -736,7 +763,14 @@ func (h *Handler) HandleDaemonWSHeartbeat(ctx context.Context, identity daemonws
}
rt, err := h.Queries.GetAgentRuntime(ctx, runtimeUUID)
if err != nil {
return nil, fmt.Errorf("runtime not found: %w", err)
if isNotFound(err) {
return &protocol.DaemonHeartbeatAckPayload{
RuntimeID: runtimeID,
Status: protocol.HeartbeatStatusRuntimeGone,
RuntimeGone: true,
}, nil
}
return nil, fmt.Errorf("get agent runtime: %w", err)
}
if identity.WorkspaceID != "" && identity.WorkspaceID != uuidToString(rt.WorkspaceID) {
return nil, fmt.Errorf("runtime not in connection workspace")

View File

@@ -12,9 +12,12 @@ import (
"time"
"github.com/go-chi/chi/v5"
"github.com/google/uuid"
"github.com/jackc/pgx/v5"
"github.com/multica-ai/multica/server/internal/daemonws"
"github.com/multica-ai/multica/server/internal/middleware"
db "github.com/multica-ai/multica/server/pkg/db/generated"
"github.com/multica-ai/multica/server/pkg/protocol"
)
// slowProbeLocalSkillListStore wraps a LocalSkillListStore but blocks inside
@@ -185,6 +188,63 @@ func TestDaemonHeartbeat_WithDaemonToken_CrossWorkspace(t *testing.T) {
}
}
// TestHandleDaemonWSHeartbeat_RuntimeGoneReturnsAckNotError pins the fix for
// issue #2391: when GetAgentRuntime returns pgx.ErrNoRows (runtime row was
// deleted server-side), the WS handler must return a successful ack with
// RuntimeGone=true rather than an error. Returning an error makes the WS hub
// log every beat at Warn — the flood the issue is about.
func TestHandleDaemonWSHeartbeat_RuntimeGoneReturnsAckNotError(t *testing.T) {
if testHandler == nil {
t.Skip("database not available")
}
// A well-formed UUID that does NOT exist in agent_runtime. The handler
// must turn the resulting pgx.ErrNoRows into a RuntimeGone ack.
missingRuntime := uuid.New().String()
ack, err := testHandler.HandleDaemonWSHeartbeat(context.Background(),
daemonws.ClientIdentity{WorkspaceID: testWorkspaceID},
missingRuntime)
if err != nil {
t.Fatalf("HandleDaemonWSHeartbeat: unexpected error %v", err)
}
if ack == nil {
t.Fatal("HandleDaemonWSHeartbeat: nil ack for missing runtime")
}
if !ack.RuntimeGone {
t.Fatalf("ack.RuntimeGone = false, want true")
}
if ack.Status != protocol.HeartbeatStatusRuntimeGone {
t.Fatalf("ack.Status = %q, want %q", ack.Status, protocol.HeartbeatStatusRuntimeGone)
}
if ack.RuntimeID != missingRuntime {
t.Fatalf("ack.RuntimeID = %q, want %q", ack.RuntimeID, missingRuntime)
}
}
// TestDaemonHeartbeat_HTTPRuntimeGoneReturns404 pins the HTTP-path mirror:
// pgx.ErrNoRows on the runtime lookup is the only DB error mapped to 404.
// Anything else (transient pool issue, schema mismatch, ...) must surface
// as 500 so the daemon does not mistake a hiccup for a deletion.
func TestDaemonHeartbeat_HTTPRuntimeGoneReturns404(t *testing.T) {
if testHandler == nil {
t.Skip("database not available")
}
missingRuntime := uuid.New().String()
w := httptest.NewRecorder()
req := newDaemonTokenRequest(http.MethodPost, "/api/daemon/heartbeat", map[string]any{
"runtime_id": missingRuntime,
}, testWorkspaceID, "test-daemon")
testHandler.DaemonHeartbeat(w, req)
if w.Code != http.StatusNotFound {
t.Fatalf("expected 404 for missing runtime, got %d: %s", w.Code, w.Body.String())
}
if !strings.Contains(strings.ToLower(w.Body.String()), "runtime not found") {
t.Fatalf("expected 'runtime not found' body, got %s", w.Body.String())
}
}
// TestDaemonHeartbeat_SlowProbeDoesNotWedge pins the invariant that a stalled
// HasPending probe cannot wedge the heartbeat endpoint past the per-probe
// timeout. The probe is the only bounded call; PopPending is ack-safe-

View File

@@ -1,13 +1,56 @@
package middleware
import (
"bytes"
"log/slog"
"net/http"
"strings"
"time"
chimw "github.com/go-chi/chi/v5/middleware"
)
// boundedBuffer captures up to Cap bytes from a stream then silently drops the
// rest. Used by RequestLogger so a large response body cannot blow up logger
// memory while we mirror just enough bytes to classify the response.
type boundedBuffer struct {
buf bytes.Buffer
cap int
}
func (b *boundedBuffer) Write(p []byte) (int, error) {
remain := b.cap - b.buf.Len()
if remain <= 0 {
return len(p), nil
}
if len(p) > remain {
b.buf.Write(p[:remain])
return len(p), nil
}
b.buf.Write(p)
return len(p), nil
}
func (b *boundedBuffer) Bytes() []byte { return b.buf.Bytes() }
// softNotFoundBodyCaptureLimit is the maximum number of body bytes the
// request logger inspects to decide whether a 404 is an expected stale-state
// signal (runtime/task deleted server-side). The JSON error envelope is small
// — 256 bytes is enough to see the "error" field — and the cap means an
// unbounded handler body cannot blow up logger memory.
const softNotFoundBodyCaptureLimit = 256
// softNotFoundMarkers are 404 response bodies the daemon emits routinely as
// part of normal lifecycle events: a runtime deleted from the UI, a task GC'd
// after an issue was removed, etc. Logging these at Warn turned production
// stderr into a flood whenever a runtime was deleted (see issue #2391). They
// stay machine-recognizable at Info, while genuine 4xx (wrong path, bad
// auth, real bugs) keep Warn.
var softNotFoundMarkers = []string{
"runtime not found",
"task not found",
}
// RequestLogger is a structured HTTP request logger using slog.
// It replaces Chi's built-in chimw.Logger with colored, structured output.
func RequestLogger(next http.Handler) http.Handler {
@@ -21,6 +64,12 @@ func RequestLogger(next http.Handler) http.Handler {
start := time.Now()
ww := chimw.NewWrapResponseWriter(w, r.ProtoMajor)
// Capture a small body prefix so 404s can be classified by content.
// chimw.WrapResponseWriter exposes Tee for exactly this — the body
// keeps flowing to the client; we mirror up to N bytes for inspection.
bodyPrefix := &boundedBuffer{cap: softNotFoundBodyCaptureLimit}
ww.Tee(bodyPrefix)
next.ServeHTTP(ww, r)
duration := time.Since(start)
@@ -53,6 +102,12 @@ func RequestLogger(next http.Handler) http.Handler {
switch {
case status >= 500:
slog.Error("http request", attrs...)
case status == http.StatusNotFound && isSoftNotFound(bodyPrefix.Bytes()):
// Lifecycle 404 — runtime/task was deleted server-side. The daemon
// catches this exact body and triggers its own self-heal, so it is
// neither noise nor a bug; logging at Info keeps the signal in
// structured logs without flooding the warn channel.
slog.Info("http request", attrs...)
case status >= 400:
slog.Warn("http request", attrs...)
default:
@@ -60,3 +115,18 @@ func RequestLogger(next http.Handler) http.Handler {
}
})
}
// isSoftNotFound reports whether the captured response body matches one of
// the expected stale-state 404 signals listed in softNotFoundMarkers.
func isSoftNotFound(body []byte) bool {
if len(body) == 0 {
return false
}
lower := strings.ToLower(string(body))
for _, marker := range softNotFoundMarkers {
if strings.Contains(lower, marker) {
return true
}
}
return false
}

View File

@@ -0,0 +1,160 @@
package middleware
import (
"bytes"
"context"
"log/slog"
"net/http"
"net/http/httptest"
"strings"
"sync"
"testing"
)
// withCapturedLogs swaps the default slog logger for one that writes to buf,
// then restores it on cleanup. Returns the buffer so tests can inspect what
// RequestLogger emitted.
//
// Uses a shared mutex because t.Parallel tests would otherwise race on the
// global slog.Default — tests in this file intentionally do NOT run in
// parallel for that reason.
var defaultLoggerMu sync.Mutex
func withCapturedLogs(t *testing.T) *bytes.Buffer {
t.Helper()
defaultLoggerMu.Lock()
buf := &bytes.Buffer{}
orig := slog.Default()
slog.SetDefault(slog.New(slog.NewTextHandler(buf, &slog.HandlerOptions{Level: slog.LevelDebug})))
t.Cleanup(func() {
slog.SetDefault(orig)
defaultLoggerMu.Unlock()
})
return buf
}
func runRequestLogger(t *testing.T, status int, body string) *bytes.Buffer {
t.Helper()
logs := withCapturedLogs(t)
handler := RequestLogger(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {
w.WriteHeader(status)
_, _ = w.Write([]byte(body))
}))
req := httptest.NewRequest(http.MethodPost, "/api/daemon/heartbeat", nil).
WithContext(context.Background())
handler.ServeHTTP(httptest.NewRecorder(), req)
return logs
}
// requireLogLevel asserts that the captured output contains exactly the
// expected slog level prefix and not any of the disallowed ones.
func requireLogLevel(t *testing.T, logs *bytes.Buffer, want string, disallowed ...string) {
t.Helper()
out := logs.String()
if !strings.Contains(out, "level="+want) {
t.Fatalf("expected level=%s in logs, got:\n%s", want, out)
}
for _, dis := range disallowed {
if strings.Contains(out, "level="+dis) {
t.Fatalf("did not expect level=%s in logs, got:\n%s", dis, out)
}
}
}
func TestRequestLogger_RuntimeNotFound404DowngradesToInfo(t *testing.T) {
// The whole reason this middleware change exists: a flood of WRN lines
// after a runtime is deleted (issue #2391). The daemon catches the same
// body and self-heals, so the line is signal-not-noise.
logs := runRequestLogger(t, http.StatusNotFound, `{"error":"runtime not found"}`)
requireLogLevel(t, logs, "INFO", "WARN", "ERROR")
}
func TestRequestLogger_TaskNotFound404DowngradesToInfo(t *testing.T) {
logs := runRequestLogger(t, http.StatusNotFound, `{"error":"task not found"}`)
requireLogLevel(t, logs, "INFO", "WARN", "ERROR")
}
func TestRequestLogger_GenericNotFound404KeepsWarn(t *testing.T) {
// A 404 with an unfamiliar body is still a real 404 — most likely a
// daemon hitting a wrong path, which is what Warn is for. We do NOT
// want to downgrade these blindly.
logs := runRequestLogger(t, http.StatusNotFound, `{"error":"not found"}`)
requireLogLevel(t, logs, "WARN", "INFO", "ERROR")
}
func TestRequestLogger_400StaysWarn(t *testing.T) {
logs := runRequestLogger(t, http.StatusBadRequest, `{"error":"bad input"}`)
requireLogLevel(t, logs, "WARN", "INFO", "ERROR")
}
func TestRequestLogger_500StaysError(t *testing.T) {
logs := runRequestLogger(t, http.StatusInternalServerError, `{"error":"boom"}`)
requireLogLevel(t, logs, "ERROR", "WARN", "INFO")
}
func TestRequestLogger_200StaysInfo(t *testing.T) {
logs := runRequestLogger(t, http.StatusOK, `{"ok":true}`)
requireLogLevel(t, logs, "INFO", "WARN", "ERROR")
}
func TestRequestLogger_HealthEndpointIsSkipped(t *testing.T) {
logs := withCapturedLogs(t)
handler := RequestLogger(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {
w.WriteHeader(http.StatusOK)
}))
req := httptest.NewRequest(http.MethodGet, "/health", nil)
handler.ServeHTTP(httptest.NewRecorder(), req)
if logs.Len() != 0 {
t.Fatalf("/health should not be logged, got:\n%s", logs.String())
}
}
func TestRequestLogger_BodyStillReachesClient(t *testing.T) {
// The body capture is implemented via Tee, which must mirror writes
// rather than swallow them. Regress-protect: assert the response writer
// still gets the full body.
rec := httptest.NewRecorder()
handler := RequestLogger(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {
w.WriteHeader(http.StatusNotFound)
_, _ = w.Write([]byte(`{"error":"runtime not found"}`))
}))
_ = withCapturedLogs(t)
handler.ServeHTTP(rec, httptest.NewRequest(http.MethodPost, "/api/daemon/heartbeat", nil))
if got := rec.Body.String(); got != `{"error":"runtime not found"}` {
t.Fatalf("response body lost or mutated: got %q", got)
}
if rec.Code != http.StatusNotFound {
t.Fatalf("status = %d, want 404", rec.Code)
}
}
func TestRequestLogger_LargeBodyBeyondCaptureLimit(t *testing.T) {
// If the soft-404 marker only appears beyond the capture limit we
// intentionally keep Warn — capturing arbitrary-size bodies is the
// memory blowup we are guarding against. This test pins that
// trade-off.
prefix := strings.Repeat("x", softNotFoundBodyCaptureLimit+8)
logs := runRequestLogger(t, http.StatusNotFound, prefix+`{"error":"runtime not found"}`)
requireLogLevel(t, logs, "WARN", "INFO", "ERROR")
}
func TestIsSoftNotFound(t *testing.T) {
t.Parallel()
cases := []struct {
body string
want bool
}{
{`{"error":"runtime not found"}`, true},
{`{"error":"task not found"}`, true},
{`{"error":"Runtime Not Found"}`, true},
{`{"error":"not found"}`, false},
{`{"error":"workspace not found"}`, false},
{"", false},
}
for _, tc := range cases {
if got := isSoftNotFound([]byte(tc.body)); got != tc.want {
t.Errorf("isSoftNotFound(%q) = %v, want %v", tc.body, got, tc.want)
}
}
}

View File

@@ -104,15 +104,28 @@ type DaemonHeartbeatRequestPayload struct {
// DaemonHeartbeatAckPayload is the server's reply to DaemonHeartbeatRequestPayload.
// JSON shape mirrors the HTTP heartbeat response so daemon code can decode either.
//
// RuntimeGone is the WebSocket replacement for the HTTP 404 "runtime not found"
// response. When the server discovers the runtime row was deleted (UI delete,
// 7-day offline GC), it sends back an ack with Status=HeartbeatStatusRuntimeGone
// and RuntimeGone=true rather than tearing down the connection with an error.
// The daemon reads this signal, prunes the stale runtime from its local state
// and re-registers; without it the dead UUID would keep heartbeating until the
// daemon process restarts.
type DaemonHeartbeatAckPayload struct {
RuntimeID string `json:"runtime_id"`
Status string `json:"status"`
RuntimeGone bool `json:"runtime_gone,omitempty"`
PendingUpdate *DaemonHeartbeatPendingUpdate `json:"pending_update,omitempty"`
PendingModelList *DaemonHeartbeatPendingModelList `json:"pending_model_list,omitempty"`
PendingLocalSkills *DaemonHeartbeatPendingLocalSkills `json:"pending_local_skills,omitempty"`
PendingLocalSkillImport *DaemonHeartbeatPendingLocalSkillImport `json:"pending_local_skill_import,omitempty"`
}
// HeartbeatStatusRuntimeGone is the ack Status used when the runtime row no
// longer exists server-side. Companion to DaemonHeartbeatAckPayload.RuntimeGone.
const HeartbeatStatusRuntimeGone = "runtime_gone"
// DaemonHeartbeatPendingUpdate describes a CLI-update action the daemon
// should run for the runtime.
type DaemonHeartbeatPendingUpdate struct {