mirror of
https://github.com/multica-ai/multica.git
synced 2026-06-25 00:19:29 +02:00
Compare commits
2 Commits
v0.3.28
...
agent/j/63
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
d596f4d883 | ||
|
|
6bddcd2c34 |
@@ -55,6 +55,26 @@ func isTaskNotFoundError(err error) bool {
|
||||
return strings.Contains(strings.ToLower(reqErr.Body), "task not found")
|
||||
}
|
||||
|
||||
// isRuntimeNotFoundError returns true if the error is a 404 with "runtime not
|
||||
// found" body. The daemon uses this to detect that the runtime row was deleted
|
||||
// server-side (UI Delete, 7-day offline GC) while the daemon was still
|
||||
// heartbeating against the dead UUID, so it can prune the stale runtime from
|
||||
// its local state and re-register instead of looping on the dead ID forever.
|
||||
//
|
||||
// Server-side, this body is paired with pgx.ErrNoRows specifically (other DB
|
||||
// errors return 500), so a transient DB hiccup cannot make the daemon
|
||||
// self-cleanup.
|
||||
func isRuntimeNotFoundError(err error) bool {
|
||||
var reqErr *requestError
|
||||
if !errors.As(err, &reqErr) {
|
||||
return false
|
||||
}
|
||||
if reqErr.StatusCode != http.StatusNotFound {
|
||||
return false
|
||||
}
|
||||
return strings.Contains(strings.ToLower(reqErr.Body), "runtime not found")
|
||||
}
|
||||
|
||||
// Client handles HTTP communication with the Multica server daemon API.
|
||||
type Client struct {
|
||||
baseURL string
|
||||
|
||||
@@ -30,6 +30,13 @@ var (
|
||||
isBrewInstall = cli.IsBrewInstall
|
||||
getBrewPrefix = cli.GetBrewPrefix
|
||||
matchKnownBrewPrefix = cli.MatchKnownBrewPrefix
|
||||
|
||||
// detectAgentVersion / checkAgentMinVersion are indirections over the
|
||||
// real agent helpers so tests can run the registration path without
|
||||
// shelling out to a real CLI. Mirrors the pattern used for the brew
|
||||
// helpers above.
|
||||
detectAgentVersion = agent.DetectVersion
|
||||
checkAgentMinVersion = agent.CheckMinVersion
|
||||
)
|
||||
|
||||
// workspaceState tracks registered runtimes for a single workspace.
|
||||
@@ -75,7 +82,16 @@ type Daemon struct {
|
||||
wsHBMu sync.RWMutex // guards wsHBLastAck
|
||||
wsHBLastAck map[string]time.Time // runtime_id -> last successful WS heartbeat ack timestamp
|
||||
|
||||
// runtimeGoneMu guards runtimeGoneInflight and reregisterNextAttempt. The
|
||||
// state lets heartbeat / poller / WS-ack handlers converge on a single
|
||||
// recovery path when they each detect that a runtime row was deleted
|
||||
// server-side without three of them stampeding registerRuntimesForWorkspace.
|
||||
runtimeGoneMu sync.Mutex
|
||||
runtimeGoneInflight map[string]struct{} // runtime_id -> currently recovering
|
||||
reregisterNextAttempt map[string]time.Time // workspace_id -> earliest time the next re-register attempt may run
|
||||
|
||||
cancelFunc context.CancelFunc // set by Run(); called by triggerRestart
|
||||
rootCtx context.Context // set by Run(); used by long-running recoveries that must survive per-runtime ctx cancellation
|
||||
restartBinary string // non-empty after a successful update; path to the new binary
|
||||
updating atomic.Bool // prevents concurrent update attempts
|
||||
activeTasks atomic.Int64 // number of tasks currently in handleTask; exposed via /health
|
||||
@@ -99,16 +115,18 @@ func New(cfg Config, logger *slog.Logger) *Daemon {
|
||||
// server can split logs/metrics by client version (parallel to the CLI).
|
||||
client.SetVersion(cfg.CLIVersion)
|
||||
return &Daemon{
|
||||
cfg: cfg,
|
||||
client: client,
|
||||
repoCache: repocache.New(cacheRoot, logger),
|
||||
logger: logger,
|
||||
workspaces: make(map[string]*workspaceState),
|
||||
runtimeIndex: make(map[string]Runtime),
|
||||
runtimeSet: newRuntimeSetWatcher(),
|
||||
agentVersions: make(map[string]string),
|
||||
wsHBLastAck: make(map[string]time.Time),
|
||||
activeEnvRoots: make(map[string]int),
|
||||
cfg: cfg,
|
||||
client: client,
|
||||
repoCache: repocache.New(cacheRoot, logger),
|
||||
logger: logger,
|
||||
workspaces: make(map[string]*workspaceState),
|
||||
runtimeIndex: make(map[string]Runtime),
|
||||
runtimeSet: newRuntimeSetWatcher(),
|
||||
agentVersions: make(map[string]string),
|
||||
wsHBLastAck: make(map[string]time.Time),
|
||||
activeEnvRoots: make(map[string]int),
|
||||
runtimeGoneInflight: make(map[string]struct{}),
|
||||
reregisterNextAttempt: make(map[string]time.Time),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -132,6 +150,247 @@ func (d *Daemon) notifyRuntimeSetChanged() {
|
||||
d.runtimeSet.notify()
|
||||
}
|
||||
|
||||
// Timing knobs for re-registering a workspace after a runtime_not_found
// response.
const (
	// reregisterCoalesceWindow caps how often the daemon re-registers a
	// workspace after detecting a runtime_not_found response. Many stale
	// runtime IDs may be reported within seconds of each other (one delete
	// clears all of a daemon's runtimes), and a single re-register call
	// replaces every runtime in the workspace, so concurrent recoveries must
	// collapse to one API call.
	reregisterCoalesceWindow = 30 * time.Second

	// reregisterFailureBackoff is how long the daemon waits after a failed
	// re-register before the next attempt may run. It keeps heartbeat ticks
	// (~15s) from turning a server-side log flood into a re-register flood
	// while re-registration itself is failing (workspace removed, server
	// unreachable, ...).
	reregisterFailureBackoff = time.Minute
)
|
||||
|
||||
// handleRuntimeGone is the single recovery entry point shared by the HTTP
|
||||
// heartbeat path, the runtime poller, and the WebSocket runtime_gone ack
|
||||
// handler. All three may notice the same stale runtime within a few ms of
|
||||
// each other, so this function:
|
||||
//
|
||||
// - keys an in-flight set on runtimeID to drop concurrent calls for the same
|
||||
// ID after the first one is already cleaning up; and
|
||||
// - keys a per-workspace next-attempt timestamp on workspaceID so that
|
||||
// concurrent recoveries triggered by the SAME initial event coalesce to a
|
||||
// single registerRuntimesForWorkspace call. The slot is cleared on success
|
||||
// so a later distinct runtime deletion in the same workspace can trigger
|
||||
// its own recovery without waiting for the coalesce window to expire.
|
||||
//
|
||||
// On failure of the underlying re-register, the next-attempt timestamp is
|
||||
// extended by reregisterFailureBackoff so we don't replace a server-side log
|
||||
// flood with a daemon-side register flood. workspaceSyncLoop will retry
|
||||
// independently every DefaultWorkspaceSyncInterval as a safety net.
|
||||
//
|
||||
// The recovery HTTP call uses the daemon root context, not the caller's. The
|
||||
// heartbeat path's per-runtime ctx is cancelled by notifyRuntimeSetChanged the
|
||||
// moment we prune the dead UUID, and if we forwarded that ctx the in-flight
|
||||
// register would self-cancel mid-flight.
|
||||
func (d *Daemon) handleRuntimeGone(runtimeID string) {
|
||||
if runtimeID == "" {
|
||||
return
|
||||
}
|
||||
|
||||
// Stampede control per runtime ID.
|
||||
d.runtimeGoneMu.Lock()
|
||||
if _, inflight := d.runtimeGoneInflight[runtimeID]; inflight {
|
||||
d.runtimeGoneMu.Unlock()
|
||||
return
|
||||
}
|
||||
d.runtimeGoneInflight[runtimeID] = struct{}{}
|
||||
d.runtimeGoneMu.Unlock()
|
||||
defer func() {
|
||||
d.runtimeGoneMu.Lock()
|
||||
delete(d.runtimeGoneInflight, runtimeID)
|
||||
d.runtimeGoneMu.Unlock()
|
||||
}()
|
||||
|
||||
workspaceID, removed := d.removeStaleRuntime(runtimeID)
|
||||
if !removed {
|
||||
// Already gone from local state — a parallel recovery already
|
||||
// cleaned this up, or workspaceSyncLoop pruned the whole workspace.
|
||||
return
|
||||
}
|
||||
|
||||
d.logger.Info("runtime deleted server-side; pruned from local state",
|
||||
"runtime_id", runtimeID, "workspace_id", workspaceID)
|
||||
d.notifyRuntimeSetChanged()
|
||||
|
||||
// Per-workspace coalescing: claim the slot atomically. The first caller
|
||||
// past this check is the only one that will run
|
||||
// registerRuntimesForWorkspace while the coalesce window is open. We
|
||||
// clear the slot on success so a separate later deletion in the same
|
||||
// workspace is NOT suppressed; the inflight set above is what keeps two
|
||||
// callers from racing the same recovery.
|
||||
now := time.Now()
|
||||
d.runtimeGoneMu.Lock()
|
||||
if next, ok := d.reregisterNextAttempt[workspaceID]; ok && now.Before(next) {
|
||||
d.runtimeGoneMu.Unlock()
|
||||
d.logger.Debug("skip re-register: coalescing with recent attempt",
|
||||
"workspace_id", workspaceID)
|
||||
return
|
||||
}
|
||||
d.reregisterNextAttempt[workspaceID] = now.Add(reregisterCoalesceWindow)
|
||||
d.runtimeGoneMu.Unlock()
|
||||
|
||||
if err := d.reregisterWorkspaceAfterRuntimeGone(d.recoveryContext(), workspaceID); err != nil {
|
||||
d.runtimeGoneMu.Lock()
|
||||
d.reregisterNextAttempt[workspaceID] = time.Now().Add(reregisterFailureBackoff)
|
||||
d.runtimeGoneMu.Unlock()
|
||||
// Logged at Warn (not Error) because workspaceSyncLoop retries
|
||||
// independently every DefaultWorkspaceSyncInterval, so a transient
|
||||
// failure here is not a stuck state — just an extra wait.
|
||||
d.logger.Warn("re-register after runtime gone failed",
|
||||
"workspace_id", workspaceID, "error", err)
|
||||
return
|
||||
}
|
||||
// Success: clear the coalesce slot so a future distinct runtime deletion
|
||||
// in this workspace can trigger its own recovery immediately. The
|
||||
// inflight set on runtimeID still prevents same-event stampedes.
|
||||
d.runtimeGoneMu.Lock()
|
||||
delete(d.reregisterNextAttempt, workspaceID)
|
||||
d.runtimeGoneMu.Unlock()
|
||||
}
|
||||
|
||||
// recoveryContext returns the daemon root context for long-running recovery
|
||||
// HTTP calls (re-register, recover-orphans) that must survive the heartbeat
|
||||
// loop tearing down a per-runtime context. Falls back to Background when the
|
||||
// daemon was not started via Run(), e.g. unit-test fixtures.
|
||||
func (d *Daemon) recoveryContext() context.Context {
|
||||
if d.rootCtx != nil {
|
||||
return d.rootCtx
|
||||
}
|
||||
return context.Background()
|
||||
}
|
||||
|
||||
// removeStaleRuntime drops a runtime ID from its owning workspace's runtimeIDs
|
||||
// list, the daemon-level runtimeIndex, and the WS heartbeat freshness map.
|
||||
// Returns the workspace ID and true if the runtime was tracked, "" and false
|
||||
// otherwise.
|
||||
//
|
||||
// Callers must NOT replace workspaceState pointers — only mutate fields in
|
||||
// place — because ensureRepoReady holds workspaceState.repoRefreshMu through
|
||||
// long repo-sync calls. See syncWorkspacesFromAPI for the same invariant.
|
||||
func (d *Daemon) removeStaleRuntime(runtimeID string) (string, bool) {
|
||||
d.mu.Lock()
|
||||
var workspaceID string
|
||||
for wsID, ws := range d.workspaces {
|
||||
found := false
|
||||
filtered := ws.runtimeIDs[:0:0]
|
||||
for _, rid := range ws.runtimeIDs {
|
||||
if rid == runtimeID {
|
||||
found = true
|
||||
continue
|
||||
}
|
||||
filtered = append(filtered, rid)
|
||||
}
|
||||
if found {
|
||||
ws.runtimeIDs = filtered
|
||||
workspaceID = wsID
|
||||
break
|
||||
}
|
||||
}
|
||||
if workspaceID == "" {
|
||||
d.mu.Unlock()
|
||||
return "", false
|
||||
}
|
||||
delete(d.runtimeIndex, runtimeID)
|
||||
d.mu.Unlock()
|
||||
|
||||
d.wsHBMu.Lock()
|
||||
delete(d.wsHBLastAck, runtimeID)
|
||||
d.wsHBMu.Unlock()
|
||||
|
||||
return workspaceID, true
|
||||
}
|
||||
|
||||
// workspaceNeedsRuntimeRecovery reports whether a tracked workspace currently
|
||||
// has zero runtime IDs — the state reached when handleRuntimeGone pruned every
|
||||
// runtime and its inline re-register failed. workspaceSyncLoop calls this on
|
||||
// each tick so the workspace can recover without waiting for an external
|
||||
// trigger.
|
||||
func (d *Daemon) workspaceNeedsRuntimeRecovery(workspaceID string) bool {
|
||||
d.mu.Lock()
|
||||
defer d.mu.Unlock()
|
||||
ws, ok := d.workspaces[workspaceID]
|
||||
if !ok {
|
||||
return false
|
||||
}
|
||||
return len(ws.runtimeIDs) == 0
|
||||
}
|
||||
|
||||
// reregisterWorkspaceAfterRuntimeGone calls registerRuntimesForWorkspace and
|
||||
// updates the existing workspaceState in place. The register response is
|
||||
// authoritative for this workspace's runtime set — every configured provider
|
||||
// is included, with UpsertAgentRuntime returning the same row ID for surviving
|
||||
// providers and a fresh ID for any that were deleted server-side. Replacing
|
||||
// (rather than appending) is required: a partial recovery, where only one
|
||||
// runtime in a multi-provider workspace was deleted, would otherwise produce
|
||||
// duplicates for every provider that wasn't deleted.
|
||||
//
|
||||
// The workspaceState pointer is NEVER replaced (see syncWorkspacesFromAPI's
|
||||
// invariant about repoRefreshMu). Only fields are mutated.
|
||||
func (d *Daemon) reregisterWorkspaceAfterRuntimeGone(ctx context.Context, workspaceID string) error {
|
||||
resp, err := d.registerRuntimesForWorkspace(ctx, workspaceID)
|
||||
if err != nil {
|
||||
return fmt.Errorf("register runtimes: %w", err)
|
||||
}
|
||||
|
||||
newIDs := make([]string, 0, len(resp.Runtimes))
|
||||
newIDSet := make(map[string]struct{}, len(resp.Runtimes))
|
||||
for _, rt := range resp.Runtimes {
|
||||
newIDs = append(newIDs, rt.ID)
|
||||
newIDSet[rt.ID] = struct{}{}
|
||||
}
|
||||
|
||||
d.mu.Lock()
|
||||
ws, ok := d.workspaces[workspaceID]
|
||||
if !ok {
|
||||
d.mu.Unlock()
|
||||
return fmt.Errorf("workspace %s no longer tracked", workspaceID)
|
||||
}
|
||||
// Drop runtimeIndex entries for prior runtime IDs that the server did not
|
||||
// return — typically there are none for upsert-on-existing-provider, but
|
||||
// a daemon config change (provider removed) would leak entries otherwise.
|
||||
for _, oldID := range ws.runtimeIDs {
|
||||
if _, kept := newIDSet[oldID]; !kept {
|
||||
delete(d.runtimeIndex, oldID)
|
||||
}
|
||||
}
|
||||
for _, rt := range resp.Runtimes {
|
||||
d.runtimeIndex[rt.ID] = rt
|
||||
}
|
||||
// Response is authoritative — replace, do not append. Replacing also
|
||||
// catches the rare case where UpsertAgentRuntime returns a different ID
|
||||
// for a surviving provider (e.g. schema change); the daemon converges on
|
||||
// what the server says without leaving stale heartbeat goroutines.
|
||||
ws.runtimeIDs = newIDs
|
||||
if resp.ReposVersion != "" {
|
||||
ws.reposVersion = resp.ReposVersion
|
||||
ws.allowedRepoURLs = repoAllowlist(resp.Repos)
|
||||
}
|
||||
if len(resp.Settings) > 0 {
|
||||
ws.settings = resp.Settings
|
||||
}
|
||||
d.mu.Unlock()
|
||||
|
||||
for _, rid := range newIDs {
|
||||
d.logger.Info("re-registered runtime after server-side deletion",
|
||||
"workspace_id", workspaceID, "runtime_id", rid)
|
||||
}
|
||||
d.notifyRuntimeSetChanged()
|
||||
|
||||
// Tell the server about any tasks the previous (now-deleted) runtime
|
||||
// was working on, mirroring the registration path's recover-orphans call.
|
||||
for _, rid := range newIDs {
|
||||
if err := d.client.RecoverOrphans(ctx, rid); err != nil {
|
||||
d.logger.Warn("recover-orphans after re-register failed",
|
||||
"runtime_id", rid, "error", err)
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// runtimeSetWatcher is a tiny pub/sub for runtime-set changes. It exists
|
||||
// because more than one supervisor (taskWakeupLoop, heartbeatLoop, pollLoop)
|
||||
// needs to react to runtime-set changes; a single buffered channel would
|
||||
@@ -226,6 +485,7 @@ func (d *Daemon) Run(ctx context.Context) error {
|
||||
// Wrap context so handleUpdate can cancel the daemon for restart.
|
||||
ctx, cancel := context.WithCancel(ctx)
|
||||
d.cancelFunc = cancel
|
||||
d.rootCtx = ctx
|
||||
|
||||
// Bind health port early to detect another running daemon.
|
||||
healthLn, err := d.listenHealth()
|
||||
@@ -338,12 +598,12 @@ func (d *Daemon) findRuntime(id string) *Runtime {
|
||||
func (d *Daemon) registerRuntimesForWorkspace(ctx context.Context, workspaceID string) (*RegisterResponse, error) {
|
||||
var runtimes []map[string]string
|
||||
for name, entry := range d.cfg.Agents {
|
||||
version, err := agent.DetectVersion(ctx, entry.Path)
|
||||
version, err := detectAgentVersion(ctx, entry.Path)
|
||||
if err != nil {
|
||||
d.logger.Warn("skip registering runtime", "name", name, "error", err)
|
||||
continue
|
||||
}
|
||||
if err := agent.CheckMinVersion(name, version); err != nil {
|
||||
if err := checkAgentMinVersion(name, version); err != nil {
|
||||
d.logger.Warn("skip registering runtime: version too old", "name", name, "version", version, "error", err)
|
||||
continue
|
||||
}
|
||||
@@ -695,7 +955,21 @@ func (d *Daemon) syncWorkspacesFromAPI(ctx context.Context) error {
|
||||
var removed int
|
||||
for id, name := range apiIDs {
|
||||
if currentIDs[id] {
|
||||
continue // important: never replace existing workspaceState; ensureRepoReady holds ws.repoRefreshMu from the original pointer
|
||||
// Already tracked: only intervene if the workspace lost all of
|
||||
// its runtimes (most commonly because handleRuntimeGone pruned
|
||||
// them and its inline re-register failed). The pointer is not
|
||||
// replaced here either — ensureRepoReady holds repoRefreshMu
|
||||
// from the original pointer.
|
||||
if !d.workspaceNeedsRuntimeRecovery(id) {
|
||||
continue
|
||||
}
|
||||
d.logger.Info("workspace has no runtimes; retrying registration", "workspace_id", id, "name", name)
|
||||
if err := d.reregisterWorkspaceAfterRuntimeGone(ctx, id); err != nil {
|
||||
d.logger.Warn("retry register failed", "workspace_id", id, "error", err)
|
||||
continue
|
||||
}
|
||||
registered++
|
||||
continue
|
||||
}
|
||||
resp, err := d.registerRuntimesForWorkspace(ctx, id)
|
||||
if err != nil {
|
||||
@@ -850,10 +1124,26 @@ func (d *Daemon) runHeartbeatTick(ctx context.Context, rid string) {
|
||||
resp, err := d.client.SendHeartbeat(ctx, rid)
|
||||
if err != nil {
|
||||
if ctx.Err() == nil {
|
||||
if isRuntimeNotFoundError(err) {
|
||||
// Server says this runtime is gone — recover instead of
|
||||
// looping on the dead UUID. handleRuntimeGone coalesces
|
||||
// concurrent callers and runs the recovery HTTP call under
|
||||
// the daemon root context so notifyRuntimeSetChanged
|
||||
// tearing down this heartbeat goroutine cannot abort it.
|
||||
go d.handleRuntimeGone(rid)
|
||||
return
|
||||
}
|
||||
d.logger.Warn("heartbeat failed", "runtime_id", rid, "error", err)
|
||||
}
|
||||
return
|
||||
}
|
||||
if resp != nil && resp.RuntimeGone {
|
||||
// The WS path returns a successful ack with RuntimeGone=true for the
|
||||
// same scenario; treat it the same way here in case HTTP starts
|
||||
// surfacing this signal too.
|
||||
go d.handleRuntimeGone(rid)
|
||||
return
|
||||
}
|
||||
d.handleHeartbeatActions(ctx, rid, resp)
|
||||
}
|
||||
|
||||
@@ -1370,6 +1660,14 @@ func (d *Daemon) runRuntimePoller(
|
||||
if err != nil {
|
||||
sem <- slot
|
||||
if pollerCtx.Err() == nil {
|
||||
if isRuntimeNotFoundError(err) {
|
||||
// Server says this runtime is gone — recover and exit
|
||||
// the poller; the runtime-set watcher will tear this
|
||||
// goroutine down via pollerCtx once the workspace is
|
||||
// re-registered with a new runtime ID.
|
||||
go d.handleRuntimeGone(rid)
|
||||
return
|
||||
}
|
||||
d.logger.Warn("claim task failed", "runtime_id", rid, "error", err)
|
||||
}
|
||||
if err := sleepWithContextOrWakeup(pollerCtx, d.cfg.PollInterval, wakeup); err != nil {
|
||||
|
||||
@@ -479,6 +479,87 @@ func TestIsTaskNotFoundError(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestIsRuntimeNotFoundError(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
cases := []struct {
|
||||
name string
|
||||
err error
|
||||
want bool
|
||||
}{
|
||||
{
|
||||
name: "404 with runtime not found body from heartbeat",
|
||||
err: &requestError{
|
||||
Method: http.MethodPost,
|
||||
Path: "/api/daemon/heartbeat",
|
||||
StatusCode: http.StatusNotFound,
|
||||
Body: `{"error":"runtime not found"}`,
|
||||
},
|
||||
want: true,
|
||||
},
|
||||
{
|
||||
name: "404 with runtime not found body from claim",
|
||||
err: &requestError{
|
||||
Method: http.MethodPost,
|
||||
Path: "/api/daemon/runtimes/abc/tasks/claim",
|
||||
StatusCode: http.StatusNotFound,
|
||||
Body: `{"error":"runtime not found"}`,
|
||||
},
|
||||
want: true,
|
||||
},
|
||||
{
|
||||
name: "mixed-case body still matches",
|
||||
err: &requestError{
|
||||
StatusCode: http.StatusNotFound,
|
||||
Body: `{"error":"Runtime Not Found"}`,
|
||||
},
|
||||
want: true,
|
||||
},
|
||||
{
|
||||
name: "500 with same body must NOT be treated as runtime-not-found",
|
||||
err: &requestError{
|
||||
StatusCode: http.StatusInternalServerError,
|
||||
Body: `{"error":"runtime not found"}`,
|
||||
},
|
||||
want: false,
|
||||
},
|
||||
{
|
||||
name: "404 with task-not-found body is not runtime-not-found",
|
||||
err: &requestError{
|
||||
StatusCode: http.StatusNotFound,
|
||||
Body: `{"error":"task not found"}`,
|
||||
},
|
||||
want: false,
|
||||
},
|
||||
{
|
||||
name: "404 with workspace-not-found body is not runtime-not-found",
|
||||
err: &requestError{
|
||||
StatusCode: http.StatusNotFound,
|
||||
Body: `{"error":"workspace not found"}`,
|
||||
},
|
||||
want: false,
|
||||
},
|
||||
{
|
||||
name: "non-requestError",
|
||||
err: errors.New("network down"),
|
||||
want: false,
|
||||
},
|
||||
{
|
||||
name: "nil",
|
||||
err: nil,
|
||||
want: false,
|
||||
},
|
||||
}
|
||||
for _, tc := range cases {
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
t.Parallel()
|
||||
if got := isRuntimeNotFoundError(tc.err); got != tc.want {
|
||||
t.Fatalf("isRuntimeNotFoundError(%v) = %v, want %v", tc.err, got, tc.want)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestShouldInterruptAgent(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
|
||||
652
server/internal/daemon/runtime_gone_test.go
Normal file
652
server/internal/daemon/runtime_gone_test.go
Normal file
@@ -0,0 +1,652 @@
|
||||
package daemon
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"log/slog"
|
||||
"net/http"
|
||||
"net/http/httptest"
|
||||
"strings"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"testing"
|
||||
"time"
|
||||
)
|
||||
|
||||
// freshDaemon builds a Daemon with every map field the production New() seeds
|
||||
// so callers can exercise handleRuntimeGone without going through Run.
|
||||
func freshDaemon(serverURL string) *Daemon {
|
||||
return &Daemon{
|
||||
client: NewClient(serverURL),
|
||||
logger: slog.New(slog.NewTextHandler(testNopWriter{}, &slog.HandlerOptions{Level: slog.LevelWarn})),
|
||||
workspaces: make(map[string]*workspaceState),
|
||||
runtimeIndex: make(map[string]Runtime),
|
||||
runtimeSet: newRuntimeSetWatcher(),
|
||||
agentVersions: make(map[string]string),
|
||||
wsHBLastAck: make(map[string]time.Time),
|
||||
activeEnvRoots: make(map[string]int),
|
||||
runtimeGoneInflight: make(map[string]struct{}),
|
||||
reregisterNextAttempt: make(map[string]time.Time),
|
||||
}
|
||||
}
|
||||
|
||||
// testNopWriter is a write sink that throws its input away, used so test
// loggers don't spam stderr.
type testNopWriter struct{}

// Write discards buf and reports every byte as written with a nil error.
func (testNopWriter) Write(buf []byte) (int, error) {
	return len(buf), nil
}
|
||||
|
||||
// stubAgentVersion swaps out the agent version probes that registerRuntimesForWorkspace
|
||||
// would normally shell out for, and restores the production hooks on cleanup.
|
||||
// Returns a no-op cleanup so callers can use t.Cleanup directly.
|
||||
func stubAgentVersion(t *testing.T) func() {
|
||||
t.Helper()
|
||||
origDetect := detectAgentVersion
|
||||
origCheck := checkAgentMinVersion
|
||||
detectAgentVersion = func(_ context.Context, _ string) (string, error) {
|
||||
return "9.9.9", nil
|
||||
}
|
||||
checkAgentMinVersion = func(_, _ string) error { return nil }
|
||||
return func() {
|
||||
detectAgentVersion = origDetect
|
||||
checkAgentMinVersion = origCheck
|
||||
}
|
||||
}
|
||||
|
||||
func TestRemoveStaleRuntime_PrunesAllLocalState(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
d := freshDaemon("")
|
||||
ws := &workspaceState{
|
||||
workspaceID: "ws-1",
|
||||
runtimeIDs: []string{"rt-1", "rt-2", "rt-3"},
|
||||
}
|
||||
d.workspaces["ws-1"] = ws
|
||||
d.runtimeIndex["rt-1"] = Runtime{ID: "rt-1"}
|
||||
d.runtimeIndex["rt-2"] = Runtime{ID: "rt-2"}
|
||||
d.runtimeIndex["rt-3"] = Runtime{ID: "rt-3"}
|
||||
d.wsHBLastAck["rt-2"] = time.Now()
|
||||
|
||||
workspaceID, removed := d.removeStaleRuntime("rt-2")
|
||||
if !removed {
|
||||
t.Fatalf("removeStaleRuntime: removed=false, want true")
|
||||
}
|
||||
if workspaceID != "ws-1" {
|
||||
t.Fatalf("workspaceID = %q, want ws-1", workspaceID)
|
||||
}
|
||||
if got := ws.runtimeIDs; len(got) != 2 || got[0] != "rt-1" || got[1] != "rt-3" {
|
||||
t.Fatalf("runtimeIDs = %v, want [rt-1 rt-3]", got)
|
||||
}
|
||||
if _, ok := d.runtimeIndex["rt-2"]; ok {
|
||||
t.Fatalf("runtimeIndex still contains rt-2")
|
||||
}
|
||||
if _, ok := d.wsHBLastAck["rt-2"]; ok {
|
||||
t.Fatalf("wsHBLastAck still contains rt-2")
|
||||
}
|
||||
}
|
||||
|
||||
func TestRemoveStaleRuntime_UnknownRuntimeIsNoop(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
d := freshDaemon("")
|
||||
d.workspaces["ws-1"] = &workspaceState{workspaceID: "ws-1", runtimeIDs: []string{"rt-1"}}
|
||||
d.runtimeIndex["rt-1"] = Runtime{ID: "rt-1"}
|
||||
|
||||
workspaceID, removed := d.removeStaleRuntime("rt-unknown")
|
||||
if removed {
|
||||
t.Fatalf("removeStaleRuntime: removed=true for unknown id, want false")
|
||||
}
|
||||
if workspaceID != "" {
|
||||
t.Fatalf("workspaceID = %q for unknown id, want empty", workspaceID)
|
||||
}
|
||||
if got := d.workspaces["ws-1"].runtimeIDs; len(got) != 1 {
|
||||
t.Fatalf("unrelated workspace runtimeIDs mutated: %v", got)
|
||||
}
|
||||
}
|
||||
|
||||
func TestRemoveStaleRuntime_PreservesWorkspaceStatePointer(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
// The Daemon contract is that workspaceState pointers must NEVER be
|
||||
// replaced — only fields mutated — because ensureRepoReady holds a long
|
||||
// repoRefreshMu through repo syncs. Regressing this turns concurrent
|
||||
// repo refreshes into a deadlock against the wrong mutex copy. Guard it
|
||||
// here so the invariant is observable in tests.
|
||||
d := freshDaemon("")
|
||||
original := &workspaceState{workspaceID: "ws-1", runtimeIDs: []string{"rt-1"}}
|
||||
d.workspaces["ws-1"] = original
|
||||
d.runtimeIndex["rt-1"] = Runtime{ID: "rt-1"}
|
||||
|
||||
d.removeStaleRuntime("rt-1")
|
||||
|
||||
if d.workspaces["ws-1"] != original {
|
||||
t.Fatalf("workspaceState pointer was replaced; ensureRepoReady's mutex assumption broken")
|
||||
}
|
||||
}
|
||||
|
||||
// handleRuntimeGoneFixture wires up a Daemon against a fake server that
|
||||
// answers register/recover-orphans. registerCount is incremented exactly
|
||||
// once per /api/daemon/register call so tests can assert on coalescing.
|
||||
type handleRuntimeGoneFixture struct {
|
||||
daemon *Daemon
|
||||
server *httptest.Server
|
||||
registerCount *atomic.Int64
|
||||
}
|
||||
|
||||
func newHandleRuntimeGoneFixture(t *testing.T) *handleRuntimeGoneFixture {
|
||||
t.Helper()
|
||||
|
||||
var registerCount atomic.Int64
|
||||
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
switch {
|
||||
case r.URL.Path == "/api/daemon/register":
|
||||
registerCount.Add(1)
|
||||
// Each register call returns the same fresh runtime ID so
|
||||
// downstream assertions can observe it.
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
_ = json.NewEncoder(w).Encode(RegisterResponse{
|
||||
Runtimes: []Runtime{{ID: "rt-new", Name: "Claude", Provider: "claude", Status: "online"}},
|
||||
Repos: []RepoData{},
|
||||
})
|
||||
case strings.HasSuffix(r.URL.Path, "/recover-orphans"):
|
||||
w.WriteHeader(http.StatusOK)
|
||||
default:
|
||||
w.WriteHeader(http.StatusOK)
|
||||
}
|
||||
}))
|
||||
t.Cleanup(srv.Close)
|
||||
|
||||
d := freshDaemon(srv.URL)
|
||||
// Attach a single configured agent so registerRuntimesForWorkspace would
|
||||
// produce a non-empty request body. The fake server ignores the body,
|
||||
// but the registerRuntimesForWorkspace pre-flight (DetectVersion) would
|
||||
// otherwise reject the call.
|
||||
d.cfg.Agents = map[string]AgentEntry{"claude": {Path: "/usr/bin/true"}}
|
||||
// Replace the agent version probe so the test doesn't shell out.
|
||||
t.Cleanup(stubAgentVersion(t))
|
||||
return &handleRuntimeGoneFixture{daemon: d, server: srv, registerCount: ®isterCount}
|
||||
}
|
||||
|
||||
func TestHandleRuntimeGone_PrunesAndReregisters(t *testing.T) {
|
||||
// Not t.Parallel: stubAgentVersion mutates package-level vars used by
|
||||
// registerRuntimesForWorkspace. Other Parallel tests in this file that
|
||||
// don't exercise registration are still parallel-safe.
|
||||
fx := newHandleRuntimeGoneFixture(t)
|
||||
d := fx.daemon
|
||||
d.workspaces["ws-1"] = &workspaceState{workspaceID: "ws-1", runtimeIDs: []string{"rt-old"}}
|
||||
d.runtimeIndex["rt-old"] = Runtime{ID: "rt-old"}
|
||||
d.wsHBLastAck["rt-old"] = time.Now()
|
||||
|
||||
d.handleRuntimeGone("rt-old")
|
||||
|
||||
if got := d.runtimeIndex["rt-old"]; got.ID != "" {
|
||||
t.Fatalf("rt-old still present in runtimeIndex: %+v", got)
|
||||
}
|
||||
if _, ok := d.runtimeIndex["rt-new"]; !ok {
|
||||
t.Fatalf("rt-new not added to runtimeIndex after re-register")
|
||||
}
|
||||
if got := d.workspaces["ws-1"].runtimeIDs; len(got) != 1 || got[0] != "rt-new" {
|
||||
t.Fatalf("workspace runtimeIDs after recovery = %v, want [rt-new]", got)
|
||||
}
|
||||
if _, ok := d.wsHBLastAck["rt-old"]; ok {
|
||||
t.Fatalf("wsHBLastAck not cleared for rt-old")
|
||||
}
|
||||
if got := fx.registerCount.Load(); got != 1 {
|
||||
t.Fatalf("register endpoint called %d times, want 1", got)
|
||||
}
|
||||
}
|
||||
|
||||
func TestHandleRuntimeGone_CoalescesConcurrentCallers(t *testing.T) {
|
||||
// Not t.Parallel — stubAgentVersion via newHandleRuntimeGoneFixture.
|
||||
// Three goroutines (heartbeat, poller, WS) may each detect the same
|
||||
// stale runtime within the same beat. Exactly one re-register must
|
||||
// reach the server.
|
||||
fx := newHandleRuntimeGoneFixture(t)
|
||||
d := fx.daemon
|
||||
d.workspaces["ws-1"] = &workspaceState{
|
||||
workspaceID: "ws-1",
|
||||
runtimeIDs: []string{"rt-a", "rt-b", "rt-c"},
|
||||
}
|
||||
d.runtimeIndex["rt-a"] = Runtime{ID: "rt-a"}
|
||||
d.runtimeIndex["rt-b"] = Runtime{ID: "rt-b"}
|
||||
d.runtimeIndex["rt-c"] = Runtime{ID: "rt-c"}
|
||||
|
||||
var wg sync.WaitGroup
|
||||
for _, rid := range []string{"rt-a", "rt-b", "rt-c"} {
|
||||
wg.Add(1)
|
||||
go func(id string) {
|
||||
defer wg.Done()
|
||||
d.handleRuntimeGone(id)
|
||||
}(rid)
|
||||
}
|
||||
wg.Wait()
|
||||
|
||||
if got := fx.registerCount.Load(); got != 1 {
|
||||
t.Fatalf("register endpoint called %d times under stampede, want 1", got)
|
||||
}
|
||||
if got := d.workspaces["ws-1"].runtimeIDs; len(got) != 1 || got[0] != "rt-new" {
|
||||
t.Fatalf("workspace runtimeIDs after stampede = %v, want [rt-new]", got)
|
||||
}
|
||||
}
|
||||
|
||||
func TestHandleRuntimeGone_BackoffOnFailure(t *testing.T) {
|
||||
// Not t.Parallel — stubAgentVersion.
|
||||
// Failure path: the register endpoint returns 500 — exactly one attempt
|
||||
// should make the round trip; subsequent immediate calls must be
|
||||
// short-circuited by the failure backoff. This is the "don't replace
|
||||
// log spam with register spam" guarantee.
|
||||
var registerCount atomic.Int64
|
||||
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
if r.URL.Path == "/api/daemon/register" {
|
||||
registerCount.Add(1)
|
||||
w.WriteHeader(http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
w.WriteHeader(http.StatusOK)
|
||||
}))
|
||||
t.Cleanup(srv.Close)
|
||||
|
||||
d := freshDaemon(srv.URL)
|
||||
d.cfg.Agents = map[string]AgentEntry{"claude": {Path: "/usr/bin/true"}}
|
||||
t.Cleanup(stubAgentVersion(t))
|
||||
|
||||
d.workspaces["ws-1"] = &workspaceState{workspaceID: "ws-1", runtimeIDs: []string{"rt-1", "rt-2"}}
|
||||
d.runtimeIndex["rt-1"] = Runtime{ID: "rt-1"}
|
||||
d.runtimeIndex["rt-2"] = Runtime{ID: "rt-2"}
|
||||
|
||||
d.handleRuntimeGone("rt-1")
|
||||
d.handleRuntimeGone("rt-2")
|
||||
|
||||
if got := registerCount.Load(); got != 1 {
|
||||
t.Fatalf("register endpoint called %d times on failure path, want 1 (second call should be coalesced)", got)
|
||||
}
|
||||
// Local state pruning still happened for both, even though re-register
|
||||
// failed: the workspace is now empty, which workspaceSyncLoop will
|
||||
// retry on the next tick.
|
||||
if got := d.workspaces["ws-1"].runtimeIDs; len(got) != 0 {
|
||||
t.Fatalf("workspace runtimeIDs after failed recovery = %v, want []", got)
|
||||
}
|
||||
}
|
||||
|
||||
func TestHandleWSHeartbeatAck_RuntimeGoneTriggersRecovery(t *testing.T) {
|
||||
// The WS path's twin of an HTTP 404 "runtime not found". When the server
|
||||
// flags a runtime as gone, the daemon must NOT record a freshness mark
|
||||
// — doing so would tell the HTTP heartbeat to skip its tick and let the
|
||||
// daemon keep believing the runtime is alive.
|
||||
fx := newHandleRuntimeGoneFixture(t)
|
||||
d := fx.daemon
|
||||
d.workspaces["ws-1"] = &workspaceState{workspaceID: "ws-1", runtimeIDs: []string{"rt-old"}}
|
||||
d.runtimeIndex["rt-old"] = Runtime{ID: "rt-old"}
|
||||
d.wsHBLastAck["rt-old"] = time.Now()
|
||||
|
||||
d.handleWSHeartbeatAck(context.Background(), &HeartbeatResponse{
|
||||
RuntimeID: "rt-old",
|
||||
Status: "runtime_gone",
|
||||
RuntimeGone: true,
|
||||
})
|
||||
|
||||
// handleRuntimeGone is fired asynchronously via `go`; spin briefly.
|
||||
deadline := time.Now().Add(2 * time.Second)
|
||||
for time.Now().Before(deadline) {
|
||||
d.mu.Lock()
|
||||
_, stillOld := d.runtimeIndex["rt-old"]
|
||||
_, gotNew := d.runtimeIndex["rt-new"]
|
||||
d.mu.Unlock()
|
||||
if !stillOld && gotNew {
|
||||
break
|
||||
}
|
||||
time.Sleep(10 * time.Millisecond)
|
||||
}
|
||||
|
||||
if _, stillOld := d.runtimeIndex["rt-old"]; stillOld {
|
||||
t.Fatalf("rt-old not pruned after RuntimeGone ack")
|
||||
}
|
||||
if _, ok := d.wsHBLastAck["rt-old"]; ok {
|
||||
t.Fatalf("WS freshness mark not cleared for gone runtime — HTTP heartbeat would skip its tick")
|
||||
}
|
||||
}
|
||||
|
||||
func TestHandleWSHeartbeatAck_NormalAckRecordsFreshness(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
d := freshDaemon("")
|
||||
d.handleWSHeartbeatAck(context.Background(), &HeartbeatResponse{
|
||||
RuntimeID: "rt-1",
|
||||
Status: "ok",
|
||||
})
|
||||
if !d.wsHeartbeatRecentlyAcked("rt-1") {
|
||||
t.Fatalf("normal ack should record WS freshness for rt-1")
|
||||
}
|
||||
}
|
||||
|
||||
func TestHandleWSHeartbeatAck_EmptyAckIgnored(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
d := freshDaemon("")
|
||||
d.handleWSHeartbeatAck(context.Background(), nil)
|
||||
d.handleWSHeartbeatAck(context.Background(), &HeartbeatResponse{RuntimeID: ""})
|
||||
// Should not panic, should not record any state.
|
||||
if len(d.wsHBLastAck) != 0 {
|
||||
t.Fatalf("empty ack recorded state: %v", d.wsHBLastAck)
|
||||
}
|
||||
}
|
||||
|
||||
func TestWorkspaceNeedsRuntimeRecovery(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
d := freshDaemon("")
|
||||
d.workspaces["ws-empty"] = &workspaceState{workspaceID: "ws-empty"}
|
||||
d.workspaces["ws-full"] = &workspaceState{workspaceID: "ws-full", runtimeIDs: []string{"rt-1"}}
|
||||
|
||||
if !d.workspaceNeedsRuntimeRecovery("ws-empty") {
|
||||
t.Fatalf("ws-empty should need recovery")
|
||||
}
|
||||
if d.workspaceNeedsRuntimeRecovery("ws-full") {
|
||||
t.Fatalf("ws-full should NOT need recovery")
|
||||
}
|
||||
if d.workspaceNeedsRuntimeRecovery("ws-unknown") {
|
||||
t.Fatalf("untracked workspace should NOT need recovery")
|
||||
}
|
||||
}
|
||||
|
||||
// multiProviderRegisterFixture mirrors handleRuntimeGoneFixture but speaks the
|
||||
// upsert semantics of UpsertAgentRuntime: surviving providers keep their
|
||||
// runtime IDs across re-registers, deleted ones get a fresh ID. The fake
|
||||
// server is the source of truth and rewrites its own knowledge of which
|
||||
// providers are alive each time a runtime is deleted.
|
||||
//
|
||||
// markDeleted(rid) emulates a UI Delete by removing the row server-side and
|
||||
// returning a brand-new ID for that provider on the next register call.
|
||||
type multiProviderRegisterFixture struct {
|
||||
daemon *Daemon
|
||||
server *httptest.Server
|
||||
registerCount *atomic.Int64
|
||||
mu sync.Mutex
|
||||
// providerToID maps provider -> current server-side runtime ID. The fake
|
||||
// register handler reads/mutates this so the test reflects realistic
|
||||
// upsert behavior.
|
||||
providerToID map[string]string
|
||||
idCounter int
|
||||
}
|
||||
|
||||
func newMultiProviderRegisterFixture(t *testing.T, providers map[string]string) *multiProviderRegisterFixture {
|
||||
t.Helper()
|
||||
|
||||
fx := &multiProviderRegisterFixture{
|
||||
providerToID: make(map[string]string, len(providers)),
|
||||
}
|
||||
for p, id := range providers {
|
||||
fx.providerToID[p] = id
|
||||
}
|
||||
|
||||
var registerCount atomic.Int64
|
||||
fx.registerCount = ®isterCount
|
||||
|
||||
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
switch {
|
||||
case r.URL.Path == "/api/daemon/register":
|
||||
registerCount.Add(1)
|
||||
fx.mu.Lock()
|
||||
runtimes := make([]Runtime, 0, len(fx.providerToID))
|
||||
for provider, id := range fx.providerToID {
|
||||
if id == "" {
|
||||
// Provider was marked deleted; mint a fresh ID
|
||||
// (the UpsertAgentRuntime INSERT branch).
|
||||
fx.idCounter++
|
||||
id = fmt.Sprintf("%s-new-%d", provider, fx.idCounter)
|
||||
fx.providerToID[provider] = id
|
||||
}
|
||||
runtimes = append(runtimes, Runtime{
|
||||
ID: id, Name: provider, Provider: provider, Status: "online",
|
||||
})
|
||||
}
|
||||
fx.mu.Unlock()
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
_ = json.NewEncoder(w).Encode(RegisterResponse{
|
||||
Runtimes: runtimes,
|
||||
Repos: []RepoData{},
|
||||
})
|
||||
case strings.HasSuffix(r.URL.Path, "/recover-orphans"):
|
||||
w.WriteHeader(http.StatusOK)
|
||||
default:
|
||||
w.WriteHeader(http.StatusOK)
|
||||
}
|
||||
}))
|
||||
t.Cleanup(srv.Close)
|
||||
|
||||
d := freshDaemon(srv.URL)
|
||||
d.cfg.Agents = make(map[string]AgentEntry, len(providers))
|
||||
for p := range providers {
|
||||
d.cfg.Agents[p] = AgentEntry{Path: "/usr/bin/true"}
|
||||
}
|
||||
t.Cleanup(stubAgentVersion(t))
|
||||
fx.daemon = d
|
||||
fx.server = srv
|
||||
return fx
|
||||
}
|
||||
|
||||
// markDeleted simulates server-side runtime deletion: the next register call
|
||||
// will mint a new ID for this provider, matching the UI Delete + re-register
|
||||
// path's UpsertAgentRuntime INSERT branch.
|
||||
func (fx *multiProviderRegisterFixture) markDeleted(provider string) {
|
||||
fx.mu.Lock()
|
||||
defer fx.mu.Unlock()
|
||||
fx.providerToID[provider] = ""
|
||||
}
|
||||
|
||||
func TestHandleRuntimeGone_PartialWorkspaceRecoveryKeepsSibling(t *testing.T) {
|
||||
// Workspace has two providers, only one runtime is deleted. The siblings
|
||||
// must NOT end up duplicated in workspaceState.runtimeIDs — that would
|
||||
// leak through allRuntimeIDs(), deregister(), and re-recovery state.
|
||||
// This is the regression test for Finding #3 (register response is
|
||||
// authoritative for the workspace's runtime set, not an append).
|
||||
fx := newMultiProviderRegisterFixture(t, map[string]string{
|
||||
"claude": "rt-claude-1",
|
||||
"codex": "rt-codex-1",
|
||||
})
|
||||
d := fx.daemon
|
||||
d.workspaces["ws-1"] = &workspaceState{
|
||||
workspaceID: "ws-1",
|
||||
runtimeIDs: []string{"rt-claude-1", "rt-codex-1"},
|
||||
}
|
||||
d.runtimeIndex["rt-claude-1"] = Runtime{ID: "rt-claude-1", Provider: "claude"}
|
||||
d.runtimeIndex["rt-codex-1"] = Runtime{ID: "rt-codex-1", Provider: "codex"}
|
||||
|
||||
// Only the claude runtime gets deleted server-side.
|
||||
fx.markDeleted("claude")
|
||||
d.handleRuntimeGone("rt-claude-1")
|
||||
|
||||
got := append([]string(nil), d.workspaces["ws-1"].runtimeIDs...)
|
||||
if len(got) != 2 {
|
||||
t.Fatalf("workspace runtimeIDs has %d entries after partial recovery; want 2; got %v", len(got), got)
|
||||
}
|
||||
// Set comparison: must contain rt-codex-1 (surviving) and a freshly
|
||||
// minted claude id, with NO duplicates.
|
||||
seen := make(map[string]int, len(got))
|
||||
for _, id := range got {
|
||||
seen[id]++
|
||||
}
|
||||
for id, count := range seen {
|
||||
if count != 1 {
|
||||
t.Fatalf("duplicate runtime id %q (count=%d) after partial recovery: %v", id, count, got)
|
||||
}
|
||||
}
|
||||
if _, ok := seen["rt-codex-1"]; !ok {
|
||||
t.Fatalf("surviving codex runtime missing from workspace state after recovery: %v", got)
|
||||
}
|
||||
if _, ok := seen["rt-claude-1"]; ok {
|
||||
t.Fatalf("deleted claude runtime should not be in workspace state: %v", got)
|
||||
}
|
||||
// And the runtimeIndex must reflect the same: codex kept, claude-1 dropped.
|
||||
if _, ok := d.runtimeIndex["rt-claude-1"]; ok {
|
||||
t.Fatalf("rt-claude-1 still in runtimeIndex after deletion")
|
||||
}
|
||||
if _, ok := d.runtimeIndex["rt-codex-1"]; !ok {
|
||||
t.Fatalf("rt-codex-1 dropped from runtimeIndex during partial recovery")
|
||||
}
|
||||
}
|
||||
|
||||
func TestHandleRuntimeGone_DistinctDeletionsWithinCoalesceWindowBothRecover(t *testing.T) {
|
||||
// Two sequential, distinct runtime deletions in the same workspace fired
|
||||
// within the 30s coalesce window. Each deletion must trigger its own
|
||||
// re-register: success on call #1 must NOT suppress call #2. Regression
|
||||
// for Finding #2 (success-case clear of reregisterNextAttempt).
|
||||
fx := newMultiProviderRegisterFixture(t, map[string]string{
|
||||
"claude": "rt-claude-1",
|
||||
"codex": "rt-codex-1",
|
||||
})
|
||||
d := fx.daemon
|
||||
d.workspaces["ws-1"] = &workspaceState{
|
||||
workspaceID: "ws-1",
|
||||
runtimeIDs: []string{"rt-claude-1", "rt-codex-1"},
|
||||
}
|
||||
d.runtimeIndex["rt-claude-1"] = Runtime{ID: "rt-claude-1", Provider: "claude"}
|
||||
d.runtimeIndex["rt-codex-1"] = Runtime{ID: "rt-codex-1", Provider: "codex"}
|
||||
|
||||
// Sequential, NOT concurrent: the first call fully completes before the
|
||||
// second starts, so the in-flight set never collides.
|
||||
fx.markDeleted("claude")
|
||||
d.handleRuntimeGone("rt-claude-1")
|
||||
|
||||
if got := fx.registerCount.Load(); got != 1 {
|
||||
t.Fatalf("after first deletion: register called %d times, want 1", got)
|
||||
}
|
||||
// Inspect the new claude id the fake assigned, so we can detect that
|
||||
// the second recovery actually ran register again.
|
||||
fx.mu.Lock()
|
||||
claudeIDAfterFirst := fx.providerToID["claude"]
|
||||
fx.mu.Unlock()
|
||||
|
||||
// Now delete codex within the coalesce window (effectively t<1s after
|
||||
// the first recovery), simulating a user deleting a second runtime
|
||||
// shortly after the first.
|
||||
fx.markDeleted("codex")
|
||||
d.handleRuntimeGone("rt-codex-1")
|
||||
|
||||
if got := fx.registerCount.Load(); got != 2 {
|
||||
t.Fatalf("after second distinct deletion: register called %d times, want 2 (coalesce window must clear on success)", got)
|
||||
}
|
||||
got := append([]string(nil), d.workspaces["ws-1"].runtimeIDs...)
|
||||
if len(got) != 2 {
|
||||
t.Fatalf("workspace runtimeIDs after both recoveries = %v, want 2 entries", got)
|
||||
}
|
||||
seen := make(map[string]int, len(got))
|
||||
for _, id := range got {
|
||||
seen[id]++
|
||||
}
|
||||
for id, count := range seen {
|
||||
if count != 1 {
|
||||
t.Fatalf("duplicate runtime id %q after sequential recoveries: %v", id, got)
|
||||
}
|
||||
}
|
||||
if _, ok := seen[claudeIDAfterFirst]; !ok {
|
||||
t.Fatalf("claude id from first recovery missing after second deletion of codex: have %v, expected to keep %q", got, claudeIDAfterFirst)
|
||||
}
|
||||
}
|
||||
|
||||
func TestHandleRuntimeGone_RecoveryContextSurvivesCallerCancellation(t *testing.T) {
|
||||
// Regression for Finding #1: handleRuntimeGone must not use the per-
|
||||
// runtime heartbeat ctx for the register HTTP call. notifyRuntimeSetChanged
|
||||
// tears that ctx down as soon as we prune the dead runtime, so forwarding
|
||||
// it would self-cancel the in-flight register.
|
||||
//
|
||||
// We assert by inspecting the register handler's request context: it
|
||||
// must not be Done when the daemon's rootCtx is alive, regardless of what
|
||||
// upstream contexts (heartbeat, poller, WS) are doing.
|
||||
var observedCancelled atomic.Bool
|
||||
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
if r.URL.Path == "/api/daemon/register" {
|
||||
// Inspect the inbound request ctx. If handleRuntimeGone had
|
||||
// forwarded a cancelled caller ctx, this would be Done.
|
||||
select {
|
||||
case <-r.Context().Done():
|
||||
observedCancelled.Store(true)
|
||||
default:
|
||||
}
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
_ = json.NewEncoder(w).Encode(RegisterResponse{
|
||||
Runtimes: []Runtime{{ID: "rt-new", Name: "claude", Provider: "claude", Status: "online"}},
|
||||
})
|
||||
return
|
||||
}
|
||||
w.WriteHeader(http.StatusOK)
|
||||
}))
|
||||
t.Cleanup(srv.Close)
|
||||
|
||||
d := freshDaemon(srv.URL)
|
||||
d.cfg.Agents = map[string]AgentEntry{"claude": {Path: "/usr/bin/true"}}
|
||||
t.Cleanup(stubAgentVersion(t))
|
||||
|
||||
// rootCtx is what handleRuntimeGone uses for recovery. We keep it alive.
|
||||
rootCtx, rootCancel := context.WithCancel(context.Background())
|
||||
defer rootCancel()
|
||||
d.rootCtx = rootCtx
|
||||
|
||||
d.workspaces["ws-1"] = &workspaceState{workspaceID: "ws-1", runtimeIDs: []string{"rt-old"}}
|
||||
d.runtimeIndex["rt-old"] = Runtime{ID: "rt-old"}
|
||||
|
||||
d.handleRuntimeGone("rt-old")
|
||||
|
||||
if observedCancelled.Load() {
|
||||
t.Fatalf("register HTTP call ran with a cancelled context — recovery would self-cancel under runtime-set churn")
|
||||
}
|
||||
if got := d.workspaces["ws-1"].runtimeIDs; len(got) != 1 || got[0] != "rt-new" {
|
||||
t.Fatalf("workspace runtimeIDs after recovery = %v, want [rt-new]", got)
|
||||
}
|
||||
}
|
||||
|
||||
func TestHandleRuntimeGone_RecoveryContextStopsOnDaemonShutdown(t *testing.T) {
|
||||
// Companion to RecoveryContextSurvivesCallerCancellation: when the daemon
|
||||
// IS shutting down, recovery must abort promptly instead of holding the
|
||||
// HTTP call open until its 30s client timeout. We bound the server
|
||||
// handler with a short safety timeout so test cleanup never hangs on a
|
||||
// stuck connection — the assertion is on the daemon-side return time,
|
||||
// not on server-side context propagation.
|
||||
registerEntered := make(chan struct{}, 1)
|
||||
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
if r.URL.Path == "/api/daemon/register" {
|
||||
select {
|
||||
case registerEntered <- struct{}{}:
|
||||
default:
|
||||
}
|
||||
select {
|
||||
case <-r.Context().Done():
|
||||
case <-time.After(2 * time.Second):
|
||||
}
|
||||
return
|
||||
}
|
||||
w.WriteHeader(http.StatusOK)
|
||||
}))
|
||||
t.Cleanup(srv.Close)
|
||||
|
||||
d := freshDaemon(srv.URL)
|
||||
d.cfg.Agents = map[string]AgentEntry{"claude": {Path: "/usr/bin/true"}}
|
||||
t.Cleanup(stubAgentVersion(t))
|
||||
|
||||
rootCtx, rootCancel := context.WithCancel(context.Background())
|
||||
t.Cleanup(rootCancel)
|
||||
d.rootCtx = rootCtx
|
||||
|
||||
d.workspaces["ws-1"] = &workspaceState{workspaceID: "ws-1", runtimeIDs: []string{"rt-old"}}
|
||||
d.runtimeIndex["rt-old"] = Runtime{ID: "rt-old"}
|
||||
|
||||
done := make(chan struct{})
|
||||
go func() {
|
||||
d.handleRuntimeGone("rt-old")
|
||||
close(done)
|
||||
}()
|
||||
|
||||
select {
|
||||
case <-registerEntered:
|
||||
case <-time.After(2 * time.Second):
|
||||
t.Fatalf("register endpoint was never reached")
|
||||
}
|
||||
|
||||
rootCancel()
|
||||
|
||||
select {
|
||||
case <-done:
|
||||
case <-time.After(3 * time.Second):
|
||||
t.Fatalf("handleRuntimeGone did not abort after daemon root context cancellation")
|
||||
}
|
||||
}
|
||||
@@ -231,6 +231,30 @@ func marshalRaw(v any) json.RawMessage {
|
||||
return data
|
||||
}
|
||||
|
||||
// handleWSHeartbeatAck dispatches one heartbeat_ack received over the WS
|
||||
// task-wakeup connection. Extracted from readTaskWakeupMessages so tests can
|
||||
// exercise the branching logic without a real WebSocket.
|
||||
//
|
||||
// A RuntimeGone=true ack is the WebSocket twin of an HTTP 404 "runtime not
|
||||
// found": it tells the daemon the runtime row was deleted server-side. We
|
||||
// route it through the same self-heal entry point as the HTTP path and do
|
||||
// NOT record a heartbeat freshness mark — pretending the runtime is alive
|
||||
// would let HTTP keep skipping its own heartbeat against the dead UUID.
|
||||
//
|
||||
// handleRuntimeGone uses the daemon root context for its register call, so
|
||||
// this function can safely pass any caller context here.
|
||||
func (d *Daemon) handleWSHeartbeatAck(ctx context.Context, ack *HeartbeatResponse) {
|
||||
if ack == nil || ack.RuntimeID == "" {
|
||||
return
|
||||
}
|
||||
if ack.RuntimeGone {
|
||||
go d.handleRuntimeGone(ack.RuntimeID)
|
||||
return
|
||||
}
|
||||
d.recordWSHeartbeatAck(ack.RuntimeID)
|
||||
d.handleHeartbeatActions(ctx, ack.RuntimeID, ack)
|
||||
}
|
||||
|
||||
func (d *Daemon) readTaskWakeupMessages(conn *websocket.Conn, taskWakeups chan<- struct{}) error {
|
||||
conn.SetReadLimit(64 * 1024)
|
||||
for {
|
||||
@@ -262,11 +286,7 @@ func (d *Daemon) readTaskWakeupMessages(conn *websocket.Conn, taskWakeups chan<-
|
||||
d.logger.Debug("ws heartbeat ack invalid payload", "error", err)
|
||||
continue
|
||||
}
|
||||
if ack.RuntimeID == "" {
|
||||
continue
|
||||
}
|
||||
d.recordWSHeartbeatAck(ack.RuntimeID)
|
||||
d.handleHeartbeatActions(context.Background(), ack.RuntimeID, &ack)
|
||||
d.handleWSHeartbeatAck(context.Background(), &ack)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -54,6 +54,11 @@ func (h *Handler) requireDaemonWorkspaceAccess(w http.ResponseWriter, r *http.Re
|
||||
}
|
||||
|
||||
// requireDaemonRuntimeAccess looks up a runtime and verifies the caller owns its workspace.
|
||||
//
|
||||
// Only pgx.ErrNoRows is treated as a real "runtime gone" 404 — the daemon uses
|
||||
// that response to drop the stale runtime from its in-memory map and re-register,
|
||||
// so collapsing transient DB errors into the same 404 would force the daemon to
|
||||
// self-cleanup on a hiccup. Other DB errors become 500.
|
||||
func (h *Handler) requireDaemonRuntimeAccess(w http.ResponseWriter, r *http.Request, runtimeID string) (db.AgentRuntime, bool) {
|
||||
runtimeUUID, ok := parseUUIDOrBadRequest(w, runtimeID, "runtime_id")
|
||||
if !ok {
|
||||
@@ -61,7 +66,12 @@ func (h *Handler) requireDaemonRuntimeAccess(w http.ResponseWriter, r *http.Requ
|
||||
}
|
||||
rt, err := h.Queries.GetAgentRuntime(r.Context(), runtimeUUID)
|
||||
if err != nil {
|
||||
writeError(w, http.StatusNotFound, "runtime not found")
|
||||
if isNotFound(err) {
|
||||
writeError(w, http.StatusNotFound, "runtime not found")
|
||||
return db.AgentRuntime{}, false
|
||||
}
|
||||
slog.Warn("get agent runtime failed", "runtime_id", runtimeID, "error", err)
|
||||
writeError(w, http.StatusInternalServerError, "failed to load runtime")
|
||||
return db.AgentRuntime{}, false
|
||||
}
|
||||
if !h.requireDaemonWorkspaceAccess(w, r, uuidToString(rt.WorkspaceID)) {
|
||||
@@ -672,8 +682,18 @@ func (h *Handler) DaemonHeartbeat(w http.ResponseWriter, r *http.Request) {
|
||||
rt, lookupErr := h.Queries.GetAgentRuntime(r.Context(), runtimeUUID)
|
||||
runtimeLookupMs = time.Since(lookupStart).Milliseconds()
|
||||
if lookupErr != nil {
|
||||
outcome = "runtime_not_found"
|
||||
writeError(w, http.StatusNotFound, "runtime not found")
|
||||
// Only pgx.ErrNoRows means the runtime row is gone. Daemon reads this
|
||||
// 404 as a signal to drop the stale runtime locally; treating a
|
||||
// transient DB error the same way would force daemons to self-cleanup
|
||||
// on a hiccup.
|
||||
if isNotFound(lookupErr) {
|
||||
outcome = "runtime_not_found"
|
||||
writeError(w, http.StatusNotFound, "runtime not found")
|
||||
return
|
||||
}
|
||||
outcome = "runtime_lookup_error"
|
||||
slog.Warn("get agent runtime failed", "runtime_id", req.RuntimeID, "error", lookupErr)
|
||||
writeError(w, http.StatusInternalServerError, "failed to load runtime")
|
||||
return
|
||||
}
|
||||
wsCheckStart := time.Now()
|
||||
@@ -729,6 +749,13 @@ func (h *Handler) DaemonHeartbeat(w http.ResponseWriter, r *http.Request) {
|
||||
// Workspace authorization is re-checked on every heartbeat instead of trusted
|
||||
// from the upgrade-time check because runtime ownership can change (e.g. a
|
||||
// runtime is reassigned to another workspace mid-connection).
|
||||
//
|
||||
// When the runtime row is missing (pgx.ErrNoRows), the function returns a
|
||||
// successful ack with Status=HeartbeatStatusRuntimeGone and RuntimeGone=true
|
||||
// instead of an error. That keeps the hub from logging every beat at Warn,
|
||||
// and tells the daemon to drop the stale runtime and re-register. Other DB
|
||||
// errors still propagate as errors so they keep their existing Warn logging
|
||||
// and the daemon does not mistake a hiccup for a deletion.
|
||||
func (h *Handler) HandleDaemonWSHeartbeat(ctx context.Context, identity daemonws.ClientIdentity, runtimeID string) (*protocol.DaemonHeartbeatAckPayload, error) {
|
||||
runtimeUUID, err := util.ParseUUID(runtimeID)
|
||||
if err != nil {
|
||||
@@ -736,7 +763,14 @@ func (h *Handler) HandleDaemonWSHeartbeat(ctx context.Context, identity daemonws
|
||||
}
|
||||
rt, err := h.Queries.GetAgentRuntime(ctx, runtimeUUID)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("runtime not found: %w", err)
|
||||
if isNotFound(err) {
|
||||
return &protocol.DaemonHeartbeatAckPayload{
|
||||
RuntimeID: runtimeID,
|
||||
Status: protocol.HeartbeatStatusRuntimeGone,
|
||||
RuntimeGone: true,
|
||||
}, nil
|
||||
}
|
||||
return nil, fmt.Errorf("get agent runtime: %w", err)
|
||||
}
|
||||
if identity.WorkspaceID != "" && identity.WorkspaceID != uuidToString(rt.WorkspaceID) {
|
||||
return nil, fmt.Errorf("runtime not in connection workspace")
|
||||
|
||||
@@ -12,9 +12,12 @@ import (
|
||||
"time"
|
||||
|
||||
"github.com/go-chi/chi/v5"
|
||||
"github.com/google/uuid"
|
||||
"github.com/jackc/pgx/v5"
|
||||
"github.com/multica-ai/multica/server/internal/daemonws"
|
||||
"github.com/multica-ai/multica/server/internal/middleware"
|
||||
db "github.com/multica-ai/multica/server/pkg/db/generated"
|
||||
"github.com/multica-ai/multica/server/pkg/protocol"
|
||||
)
|
||||
|
||||
// slowProbeLocalSkillListStore wraps a LocalSkillListStore but blocks inside
|
||||
@@ -185,6 +188,63 @@ func TestDaemonHeartbeat_WithDaemonToken_CrossWorkspace(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
// TestHandleDaemonWSHeartbeat_RuntimeGoneReturnsAckNotError pins the fix for
|
||||
// issue #2391: when GetAgentRuntime returns pgx.ErrNoRows (runtime row was
|
||||
// deleted server-side), the WS handler must return a successful ack with
|
||||
// RuntimeGone=true rather than an error. Returning an error makes the WS hub
|
||||
// log every beat at Warn — the flood the issue is about.
|
||||
func TestHandleDaemonWSHeartbeat_RuntimeGoneReturnsAckNotError(t *testing.T) {
|
||||
if testHandler == nil {
|
||||
t.Skip("database not available")
|
||||
}
|
||||
|
||||
// A well-formed UUID that does NOT exist in agent_runtime. The handler
|
||||
// must turn the resulting pgx.ErrNoRows into a RuntimeGone ack.
|
||||
missingRuntime := uuid.New().String()
|
||||
ack, err := testHandler.HandleDaemonWSHeartbeat(context.Background(),
|
||||
daemonws.ClientIdentity{WorkspaceID: testWorkspaceID},
|
||||
missingRuntime)
|
||||
if err != nil {
|
||||
t.Fatalf("HandleDaemonWSHeartbeat: unexpected error %v", err)
|
||||
}
|
||||
if ack == nil {
|
||||
t.Fatal("HandleDaemonWSHeartbeat: nil ack for missing runtime")
|
||||
}
|
||||
if !ack.RuntimeGone {
|
||||
t.Fatalf("ack.RuntimeGone = false, want true")
|
||||
}
|
||||
if ack.Status != protocol.HeartbeatStatusRuntimeGone {
|
||||
t.Fatalf("ack.Status = %q, want %q", ack.Status, protocol.HeartbeatStatusRuntimeGone)
|
||||
}
|
||||
if ack.RuntimeID != missingRuntime {
|
||||
t.Fatalf("ack.RuntimeID = %q, want %q", ack.RuntimeID, missingRuntime)
|
||||
}
|
||||
}
|
||||
|
||||
// TestDaemonHeartbeat_HTTPRuntimeGoneReturns404 pins the HTTP-path mirror:
|
||||
// pgx.ErrNoRows on the runtime lookup is the only DB error mapped to 404.
|
||||
// Anything else (transient pool issue, schema mismatch, ...) must surface
|
||||
// as 500 so the daemon does not mistake a hiccup for a deletion.
|
||||
func TestDaemonHeartbeat_HTTPRuntimeGoneReturns404(t *testing.T) {
|
||||
if testHandler == nil {
|
||||
t.Skip("database not available")
|
||||
}
|
||||
|
||||
missingRuntime := uuid.New().String()
|
||||
w := httptest.NewRecorder()
|
||||
req := newDaemonTokenRequest(http.MethodPost, "/api/daemon/heartbeat", map[string]any{
|
||||
"runtime_id": missingRuntime,
|
||||
}, testWorkspaceID, "test-daemon")
|
||||
testHandler.DaemonHeartbeat(w, req)
|
||||
|
||||
if w.Code != http.StatusNotFound {
|
||||
t.Fatalf("expected 404 for missing runtime, got %d: %s", w.Code, w.Body.String())
|
||||
}
|
||||
if !strings.Contains(strings.ToLower(w.Body.String()), "runtime not found") {
|
||||
t.Fatalf("expected 'runtime not found' body, got %s", w.Body.String())
|
||||
}
|
||||
}
|
||||
|
||||
// TestDaemonHeartbeat_SlowProbeDoesNotWedge pins the invariant that a stalled
|
||||
// HasPending probe cannot wedge the heartbeat endpoint past the per-probe
|
||||
// timeout. The probe is the only bounded call; PopPending is ack-safe-
|
||||
|
||||
@@ -1,13 +1,56 @@
|
||||
package middleware
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"log/slog"
|
||||
"net/http"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
chimw "github.com/go-chi/chi/v5/middleware"
|
||||
)
|
||||
|
||||
// boundedBuffer is a write sink that keeps at most cap bytes and silently
// discards everything written after that. RequestLogger uses it to mirror just
// enough of a response body to classify the response, so that a large body
// cannot inflate logger memory.
type boundedBuffer struct {
	buf bytes.Buffer // retained prefix of everything written
	cap int          // maximum number of bytes retained in buf
}

// Write stores as much of p as still fits under the cap and drops the rest.
// It always reports len(p) bytes written and a nil error, so a writer teeing
// into the buffer never sees a short write once the cap is reached.
func (b *boundedBuffer) Write(p []byte) (int, error) {
	written := len(p)
	if room := b.cap - b.buf.Len(); room > 0 {
		if written > room {
			p = p[:room]
		}
		b.buf.Write(p)
	}
	return written, nil
}

// Bytes returns the captured prefix.
func (b *boundedBuffer) Bytes() []byte { return b.buf.Bytes() }
|
||||
|
||||
// softNotFoundBodyCaptureLimit is the maximum number of body bytes the
|
||||
// request logger inspects to decide whether a 404 is an expected stale-state
|
||||
// signal (runtime/task deleted server-side). The JSON error envelope is small
|
||||
// — 256 bytes is enough to see the "error" field — and the cap means an
|
||||
// unbounded handler body cannot blow up logger memory.
|
||||
const softNotFoundBodyCaptureLimit = 256
|
||||
|
||||
// softNotFoundMarkers are 404 response bodies the daemon API returns routinely as
|
||||
// part of normal lifecycle events: a runtime deleted from the UI, a task GC'd
|
||||
// after an issue was removed, etc. Logging these at Warn turned production
|
||||
// stderr into a flood whenever a runtime was deleted (see issue #2391). They
|
||||
// stay machine-recognizable at Info, while genuine 4xx (wrong path, bad
|
||||
// auth, real bugs) keep Warn.
|
||||
var softNotFoundMarkers = []string{
|
||||
"runtime not found",
|
||||
"task not found",
|
||||
}
|
||||
|
||||
// RequestLogger is a structured HTTP request logger using slog.
|
||||
// It replaces Chi's built-in chimw.Logger with colored, structured output.
|
||||
func RequestLogger(next http.Handler) http.Handler {
|
||||
@@ -21,6 +64,12 @@ func RequestLogger(next http.Handler) http.Handler {
|
||||
start := time.Now()
|
||||
ww := chimw.NewWrapResponseWriter(w, r.ProtoMajor)
|
||||
|
||||
// Capture a small body prefix so 404s can be classified by content.
|
||||
// chimw.WrapResponseWriter exposes Tee for exactly this — the body
|
||||
// keeps flowing to the client; we mirror up to N bytes for inspection.
|
||||
bodyPrefix := &boundedBuffer{cap: softNotFoundBodyCaptureLimit}
|
||||
ww.Tee(bodyPrefix)
|
||||
|
||||
next.ServeHTTP(ww, r)
|
||||
|
||||
duration := time.Since(start)
|
||||
@@ -53,6 +102,12 @@ func RequestLogger(next http.Handler) http.Handler {
|
||||
switch {
|
||||
case status >= 500:
|
||||
slog.Error("http request", attrs...)
|
||||
case status == http.StatusNotFound && isSoftNotFound(bodyPrefix.Bytes()):
|
||||
// Lifecycle 404 — runtime/task was deleted server-side. The daemon
|
||||
// catches this exact body and triggers its own self-heal, so it is
|
||||
// neither noise nor a bug; logging at Info keeps the signal in
|
||||
// structured logs without flooding the warn channel.
|
||||
slog.Info("http request", attrs...)
|
||||
case status >= 400:
|
||||
slog.Warn("http request", attrs...)
|
||||
default:
|
||||
@@ -60,3 +115,18 @@ func RequestLogger(next http.Handler) http.Handler {
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
// isSoftNotFound reports whether the captured response body matches one of
|
||||
// the expected stale-state 404 signals listed in softNotFoundMarkers.
|
||||
func isSoftNotFound(body []byte) bool {
|
||||
if len(body) == 0 {
|
||||
return false
|
||||
}
|
||||
lower := strings.ToLower(string(body))
|
||||
for _, marker := range softNotFoundMarkers {
|
||||
if strings.Contains(lower, marker) {
|
||||
return true
|
||||
}
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
160
server/internal/middleware/request_logger_test.go
Normal file
160
server/internal/middleware/request_logger_test.go
Normal file
@@ -0,0 +1,160 @@
|
||||
package middleware
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"log/slog"
|
||||
"net/http"
|
||||
"net/http/httptest"
|
||||
"strings"
|
||||
"sync"
|
||||
"testing"
|
||||
)
|
||||
|
||||
// defaultLoggerMu serializes tests that replace the process-wide slog default
// logger. withCapturedLogs holds it from the swap until the test's cleanup
// runs; tests in this file intentionally do not use t.Parallel, since they
// would otherwise race on slog.Default.
var defaultLoggerMu sync.Mutex

// withCapturedLogs replaces the default slog logger with a Debug-level text
// logger that writes into a fresh buffer, and returns that buffer so the test
// can inspect what RequestLogger emitted. The previous default logger is
// restored, and defaultLoggerMu released, when the test finishes.
func withCapturedLogs(t *testing.T) *bytes.Buffer {
	t.Helper()
	defaultLoggerMu.Lock()

	var (
		captured = new(bytes.Buffer)
		previous = slog.Default()
		opts     = &slog.HandlerOptions{Level: slog.LevelDebug}
	)
	slog.SetDefault(slog.New(slog.NewTextHandler(captured, opts)))
	t.Cleanup(func() {
		slog.SetDefault(previous)
		defaultLoggerMu.Unlock()
	})
	return captured
}
|
||||
|
||||
func runRequestLogger(t *testing.T, status int, body string) *bytes.Buffer {
|
||||
t.Helper()
|
||||
logs := withCapturedLogs(t)
|
||||
handler := RequestLogger(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {
|
||||
w.WriteHeader(status)
|
||||
_, _ = w.Write([]byte(body))
|
||||
}))
|
||||
req := httptest.NewRequest(http.MethodPost, "/api/daemon/heartbeat", nil).
|
||||
WithContext(context.Background())
|
||||
handler.ServeHTTP(httptest.NewRecorder(), req)
|
||||
return logs
|
||||
}
|
||||
|
||||
// requireLogLevel fails the test unless the captured output contains a
// "level=<want>" entry, and also fails it if any "level=<x>" entry for a
// level in disallowed is present. The check is a plain substring search over
// the whole buffer, so it does not distinguish between individual log lines.
func requireLogLevel(t *testing.T, logs *bytes.Buffer, want string, disallowed ...string) {
	t.Helper()

	text := logs.String()
	hasLevel := func(level string) bool {
		return strings.Contains(text, "level="+level)
	}

	if !hasLevel(want) {
		t.Fatalf("expected level=%s in logs, got:\n%s", want, text)
	}
	for i := range disallowed {
		if hasLevel(disallowed[i]) {
			t.Fatalf("did not expect level=%s in logs, got:\n%s", disallowed[i], text)
		}
	}
}
|
||||
|
||||
func TestRequestLogger_RuntimeNotFound404DowngradesToInfo(t *testing.T) {
|
||||
// The whole reason this middleware change exists: a flood of WRN lines
|
||||
// after a runtime is deleted (issue #2391). The daemon catches the same
|
||||
// body and self-heals, so the line is signal-not-noise.
|
||||
logs := runRequestLogger(t, http.StatusNotFound, `{"error":"runtime not found"}`)
|
||||
requireLogLevel(t, logs, "INFO", "WARN", "ERROR")
|
||||
}
|
||||
|
||||
func TestRequestLogger_TaskNotFound404DowngradesToInfo(t *testing.T) {
|
||||
logs := runRequestLogger(t, http.StatusNotFound, `{"error":"task not found"}`)
|
||||
requireLogLevel(t, logs, "INFO", "WARN", "ERROR")
|
||||
}
|
||||
|
||||
func TestRequestLogger_GenericNotFound404KeepsWarn(t *testing.T) {
|
||||
// A 404 with an unfamiliar body is still a real 404 — most likely a
|
||||
// daemon hitting a wrong path, which is what Warn is for. We do NOT
|
||||
// want to downgrade these blindly.
|
||||
logs := runRequestLogger(t, http.StatusNotFound, `{"error":"not found"}`)
|
||||
requireLogLevel(t, logs, "WARN", "INFO", "ERROR")
|
||||
}
|
||||
|
||||
func TestRequestLogger_400StaysWarn(t *testing.T) {
|
||||
logs := runRequestLogger(t, http.StatusBadRequest, `{"error":"bad input"}`)
|
||||
requireLogLevel(t, logs, "WARN", "INFO", "ERROR")
|
||||
}
|
||||
|
||||
func TestRequestLogger_500StaysError(t *testing.T) {
|
||||
logs := runRequestLogger(t, http.StatusInternalServerError, `{"error":"boom"}`)
|
||||
requireLogLevel(t, logs, "ERROR", "WARN", "INFO")
|
||||
}
|
||||
|
||||
func TestRequestLogger_200StaysInfo(t *testing.T) {
|
||||
logs := runRequestLogger(t, http.StatusOK, `{"ok":true}`)
|
||||
requireLogLevel(t, logs, "INFO", "WARN", "ERROR")
|
||||
}
|
||||
|
||||
func TestRequestLogger_HealthEndpointIsSkipped(t *testing.T) {
|
||||
logs := withCapturedLogs(t)
|
||||
handler := RequestLogger(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {
|
||||
w.WriteHeader(http.StatusOK)
|
||||
}))
|
||||
req := httptest.NewRequest(http.MethodGet, "/health", nil)
|
||||
handler.ServeHTTP(httptest.NewRecorder(), req)
|
||||
if logs.Len() != 0 {
|
||||
t.Fatalf("/health should not be logged, got:\n%s", logs.String())
|
||||
}
|
||||
}
|
||||
|
||||
func TestRequestLogger_BodyStillReachesClient(t *testing.T) {
|
||||
// The body capture is implemented via Tee, which must mirror writes
|
||||
// rather than swallow them. Regress-protect: assert the response writer
|
||||
// still gets the full body.
|
||||
rec := httptest.NewRecorder()
|
||||
handler := RequestLogger(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {
|
||||
w.WriteHeader(http.StatusNotFound)
|
||||
_, _ = w.Write([]byte(`{"error":"runtime not found"}`))
|
||||
}))
|
||||
_ = withCapturedLogs(t)
|
||||
handler.ServeHTTP(rec, httptest.NewRequest(http.MethodPost, "/api/daemon/heartbeat", nil))
|
||||
if got := rec.Body.String(); got != `{"error":"runtime not found"}` {
|
||||
t.Fatalf("response body lost or mutated: got %q", got)
|
||||
}
|
||||
if rec.Code != http.StatusNotFound {
|
||||
t.Fatalf("status = %d, want 404", rec.Code)
|
||||
}
|
||||
}
|
||||
|
||||
func TestRequestLogger_LargeBodyBeyondCaptureLimit(t *testing.T) {
|
||||
// If the soft-404 marker only appears beyond the capture limit we
|
||||
// intentionally keep Warn — capturing arbitrary-size bodies is the
|
||||
// memory blowup we are guarding against. This test pins that
|
||||
// trade-off.
|
||||
prefix := strings.Repeat("x", softNotFoundBodyCaptureLimit+8)
|
||||
logs := runRequestLogger(t, http.StatusNotFound, prefix+`{"error":"runtime not found"}`)
|
||||
requireLogLevel(t, logs, "WARN", "INFO", "ERROR")
|
||||
}
|
||||
|
||||
func TestIsSoftNotFound(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
cases := []struct {
|
||||
body string
|
||||
want bool
|
||||
}{
|
||||
{`{"error":"runtime not found"}`, true},
|
||||
{`{"error":"task not found"}`, true},
|
||||
{`{"error":"Runtime Not Found"}`, true},
|
||||
{`{"error":"not found"}`, false},
|
||||
{`{"error":"workspace not found"}`, false},
|
||||
{"", false},
|
||||
}
|
||||
for _, tc := range cases {
|
||||
if got := isSoftNotFound([]byte(tc.body)); got != tc.want {
|
||||
t.Errorf("isSoftNotFound(%q) = %v, want %v", tc.body, got, tc.want)
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -104,15 +104,28 @@ type DaemonHeartbeatRequestPayload struct {
|
||||
|
||||
// DaemonHeartbeatAckPayload is the server's reply to DaemonHeartbeatRequestPayload.
// JSON shape mirrors the HTTP heartbeat response so daemon code can decode either.
//
// RuntimeGone is the WebSocket replacement for the HTTP 404 "runtime not found"
// response. When the server discovers the runtime row was deleted (UI delete,
// 7-day offline GC), it sends back an ack with Status=HeartbeatStatusRuntimeGone
// and RuntimeGone=true rather than tearing down the connection with an error.
// The daemon reads this signal, prunes the stale runtime from its local state
// and re-registers; without it the dead UUID would keep heartbeating until the
// daemon process restarts.
type DaemonHeartbeatAckPayload struct {
	// RuntimeID and Status are always serialized (no omitempty on either tag).
	RuntimeID string `json:"runtime_id"`
	Status    string `json:"status"`

	// RuntimeGone is left out of the JSON when false, so acks for live
	// runtimes keep the shape they had before this field existed.
	RuntimeGone bool `json:"runtime_gone,omitempty"`

	// Optional pending-action payloads; each is left out of the JSON when nil.
	// NOTE(review): presumably each one asks the daemon to perform the named
	// action for this runtime — confirm against the individual payload types.
	PendingUpdate           *DaemonHeartbeatPendingUpdate           `json:"pending_update,omitempty"`
	PendingModelList        *DaemonHeartbeatPendingModelList        `json:"pending_model_list,omitempty"`
	PendingLocalSkills      *DaemonHeartbeatPendingLocalSkills      `json:"pending_local_skills,omitempty"`
	PendingLocalSkillImport *DaemonHeartbeatPendingLocalSkillImport `json:"pending_local_skill_import,omitempty"`
}
|
||||
|
||||
// HeartbeatStatusRuntimeGone is the ack Status used when the runtime row no
// longer exists server-side. Companion to DaemonHeartbeatAckPayload.RuntimeGone:
// per that type's documentation the server sets both on the same ack.
const HeartbeatStatusRuntimeGone = "runtime_gone"
|
||||
|
||||
// DaemonHeartbeatPendingUpdate describes a CLI-update action the daemon
|
||||
// should run for the runtime.
|
||||
type DaemonHeartbeatPendingUpdate struct {
|
||||
|
||||
Reference in New Issue
Block a user