Compare commits

...

2 Commits

Author SHA1 Message Date
Jiang Bohan
778906a5ee fix(daemon): skip idle watchdog while a tool call is in flight
A legitimate long-running tool call (npm install, docker build, test
suite) can sit silent between tool_use and tool_result for many minutes.
Without this gate, the watchdog would yank the agent mid-build.

Track unmatched tool_use messages in an atomic counter and only let the
watchdog fire when the counter is zero. Each tool_result decrements the
counter but never below zero, so a stray result with no matching use can't
leave it negative and let the watchdog fire during the next tool call.

Adds two regression tests:
  - DoesNotFireDuringInFlightToolCall: tool_use -> silence past
    window -> tool_result -> completed (must NOT fire)
  - FiresAfterToolResultIfBackendStaysSilent: tool_use -> tool_result
    -> silence past window (MUST fire — backend really is stuck)

Co-authored-by: multica-agent <github@multica.ai>
2026-05-15 18:57:41 +08:00
Jiang Bohan
11cafdc911 feat(daemon): force-stop hung agent runs via idle watchdog (MUL-2281)
A backend whose subprocess hangs on a stuck child process (e.g. claude
blocked on `docker ps` against a frozen dockerd) keeps the daemon's run
record at status="running" until the full DefaultAgentTimeout (2 h)
expires, because cmd.Wait() never returns and Session.Result is never
written. MUL-2225 spent 17+ minutes in this state in the wild.

Add a per-task idle watchdog around executeAndDrain:

- Wrap the caller's ctx so a single cancel propagates to the agent
  subprocess (via the ctx passed to backend.Execute) AND the drain loop.
- Stamp lastActivityAt every time the drain loop receives a message.
- Tick at window/2; when idle_for >= window AND session.Messages buffer
  is empty, set a fired flag and call cancel.
- Tag the resulting Result.Status as "idle_watchdog" so runTask routes
  it through a dedicated failure_reason instead of "agent_error".

Default window is 5 min, configurable via MULTICA_AGENT_IDLE_WATCHDOG;
set to 0 to disable. Tests cover the activity-then-silence case, the
zero-message case, the disabled case, and the happy path.

Co-authored-by: multica-agent <github@multica.ai>
2026-05-15 18:37:09 +08:00
3 changed files with 404 additions and 2 deletions

View File

@@ -20,6 +20,16 @@ const (
DefaultHeartbeatInterval = 15 * time.Second
DefaultAgentTimeout = 2 * time.Hour
DefaultCodexSemanticInactivityTimeout = 10 * time.Minute
// DefaultAgentIdleWatchdog is the per-task safety net that force-stops a
// run when the backend has emitted no message for this long AND its
// message queue is empty. Backends like Claude Code can hang indefinitely
// on a stuck child process (e.g. `docker ps` against a frozen dockerd),
// in which case `cmd.Wait()` never returns and the task sits at "running"
// for its full DefaultAgentTimeout (2 h). 5 min is conservative enough to
// avoid false positives during long tool calls but tight enough to keep
// stuck runs out of the operator's hair. Set MULTICA_AGENT_IDLE_WATCHDOG=0
// to disable.
DefaultAgentIdleWatchdog = 5 * time.Minute
DefaultRuntimeName = "Local Agent"
DefaultWorkspaceSyncInterval = 30 * time.Second
DefaultHealthPort = 19514
@@ -66,6 +76,7 @@ type Config struct {
HeartbeatInterval time.Duration
AgentTimeout time.Duration
CodexSemanticInactivityTimeout time.Duration
AgentIdleWatchdog time.Duration // force-stop a run when the backend goes silent this long with an empty queue (0 = disabled)
ClaudeArgs []string
CodexArgs []string
}
@@ -240,6 +251,14 @@ func LoadConfig(overrides Overrides) (Config, error) {
codexSemanticInactivityTimeout = overrides.CodexSemanticInactivityTimeout
}
// MULTICA_AGENT_IDLE_WATCHDOG=0 disables the per-task idle watchdog. We
// route 0 through durationFromEnv so the operator can opt out without
// patching the binary; any positive duration overrides the 5-minute default.
agentIdleWatchdog, err := durationFromEnv("MULTICA_AGENT_IDLE_WATCHDOG", DefaultAgentIdleWatchdog)
if err != nil {
return Config{}, err
}
maxConcurrentTasks, err := intFromEnv("MULTICA_DAEMON_MAX_CONCURRENT_TASKS", DefaultMaxConcurrentTasks)
if err != nil {
return Config{}, err
@@ -374,6 +393,7 @@ func LoadConfig(overrides Overrides) (Config, error) {
HeartbeatInterval: heartbeatInterval,
AgentTimeout: agentTimeout,
CodexSemanticInactivityTimeout: codexSemanticInactivityTimeout,
AgentIdleWatchdog: agentIdleWatchdog,
ClaudeArgs: claudeArgs,
CodexArgs: codexArgs,
}, nil

View File

@@ -2495,6 +2495,25 @@ func (d *Daemon) runTask(ctx context.Context, task Task, provider string, slot i
FailureReason: "timeout",
Usage: usageEntries,
}, nil
case "idle_watchdog":
// The idle watchdog force-stopped the run because the backend
// went silent (e.g. claude blocked on a tool call against a
// frozen child process). Route through the blocked path with a
// dedicated failure_reason so the run leaves "running" state and
// operators can tell idle-stop apart from a real timeout.
comment := result.Error
if comment == "" {
comment = idleWatchdogReason(d.cfg.AgentIdleWatchdog)
}
return TaskResult{
Status: "blocked",
Comment: comment,
SessionID: result.SessionID,
WorkDir: env.WorkDir,
EnvRoot: env.RootDir,
FailureReason: "idle_watchdog",
Usage: usageEntries,
}, nil
case "cancelled":
// Server cancelled the task (e.g. issue reassignment, user cancel).
// handleTask's cancelledByPoll branch already discards this result,
@@ -2547,7 +2566,15 @@ func (d *Daemon) runTask(ctx context.Context, task Task, provider string, slot i
// executeAndDrain runs a backend, drains its message stream (forwarding to the
// server), and waits for the final result.
func (d *Daemon) executeAndDrain(ctx context.Context, backend agent.Backend, prompt string, opts agent.ExecOptions, taskLog *slog.Logger, taskID string) (agent.Result, int32, error) {
session, err := backend.Execute(ctx, prompt, opts)
// Wrap the caller's ctx so the idle watchdog (below) can interrupt both
// the agent subprocess (via the ctx passed to backend.Execute) AND the
// drain loop with a single cancel. Without this layer the backend would
// stay tied to the parent ctx and our cancellation could only abort
// drain, leaving the subprocess running.
agentCtx, agentCancel := context.WithCancel(ctx)
defer agentCancel()
session, err := backend.Execute(agentCtx, prompt, opts)
if err != nil {
return agent.Result{}, 0, err
}
@@ -2560,10 +2587,29 @@ func (d *Daemon) executeAndDrain(ctx context.Context, backend agent.Backend, pro
if opts.Timeout == 0 {
drainTimeout = 21 * time.Minute
}
drainCtx, drainCancel := context.WithTimeout(ctx, drainTimeout)
drainCtx, drainCancel := context.WithTimeout(agentCtx, drainTimeout)
defer drainCancel()
var toolCount atomic.Int32
// lastActivityAt records (as unix nanos) when the drain loop most
// recently received a message from the backend. The idle watchdog
// reads this to decide whether the agent has gone silent for too long.
// Initialise to the start so a backend that never emits a single
// message also trips the watchdog.
var lastActivityAt atomic.Int64
lastActivityAt.Store(time.Now().UnixNano())
// inFlightTools counts tool_use messages that haven't yet been paired
// with a matching tool_result. A non-zero count means the agent is
// legitimately waiting on a tool (e.g. `npm install`, `docker build`)
// that may run far longer than the idle window without emitting any
// message — so the watchdog must not interpret that silence as a hang.
var inFlightTools atomic.Int32
var idleWatchdogFired atomic.Bool
idleWindow := d.cfg.AgentIdleWatchdog
if idleWindow > 0 {
go d.runIdleWatchdog(agentCtx, idleWindow, &lastActivityAt, &inFlightTools, &idleWatchdogFired, agentCancel, session.Messages, taskLog, taskID)
}
go func() {
var seq atomic.Int32
var mu sync.Mutex
@@ -2629,6 +2675,12 @@ func (d *Daemon) executeAndDrain(ctx context.Context, backend agent.Backend, pro
if !ok {
goto drainDone
}
// Stamp activity as soon as a message lands. The idle
// watchdog reads this to decide whether the backend has
// gone silent — stamping before processing makes sure a
// slow downstream call (mu.Lock contention, batch resize)
// can't be misattributed to backend silence.
lastActivityAt.Store(time.Now().UnixNano())
switch msg.Type {
case agent.MessageStatus:
// Persist the session/work_dir as soon as the backend
@@ -2648,6 +2700,7 @@ func (d *Daemon) executeAndDrain(ctx context.Context, backend agent.Backend, pro
}
case agent.MessageToolUse:
n := toolCount.Add(1)
inFlightTools.Add(1)
taskLog.Info(fmt.Sprintf("tool #%d: %s", n, msg.Tool))
if msg.CallID != "" {
mu.Lock()
@@ -2664,6 +2717,20 @@ func (d *Daemon) executeAndDrain(ctx context.Context, backend agent.Backend, pro
})
mu.Unlock()
case agent.MessageToolResult:
// Decrement only when the count would stay >= 0. A stray
// tool_result with no matching tool_use (backend bug or
// reconnect mid-stream) shouldn't push the counter
// negative — that would re-arm the watchdog one tool_use
// too early on the next call.
for {
cur := inFlightTools.Load()
if cur <= 0 {
break
}
if inFlightTools.CompareAndSwap(cur, cur-1) {
break
}
}
s := seq.Add(1)
output := msg.Output
if len(output) > 8192 {
@@ -2719,8 +2786,29 @@ func (d *Daemon) executeAndDrain(ctx context.Context, backend agent.Backend, pro
select {
case result := <-session.Result:
if idleWatchdogFired.Load() {
// The backend's wait goroutine (e.g. claude.go) translates the
// SIGKILL we delivered via agentCancel into Status="aborted".
// Re-tag it as "idle_watchdog" so runTask routes the
// disposition through a dedicated failure_reason, not the
// generic "agent_error" bucket the aborted path falls into.
result.Status = "idle_watchdog"
if result.Error == "" {
result.Error = idleWatchdogReason(idleWindow)
}
}
return result, toolCount.Load(), nil
case <-drainCtx.Done():
// Idle watchdog cancels via agentCancel(), which propagates here as
// context.Canceled. Check this BEFORE the generic cancelled/timeout
// classifiers so a watchdog-induced stop isn't misreported as
// "task cancelled by server".
if idleWatchdogFired.Load() {
return agent.Result{
Status: "idle_watchdog",
Error: idleWatchdogReason(idleWindow),
}, toolCount.Load(), nil
}
// Distinguish external cancellation (e.g. server-initiated cancel
// because the issue was reassigned, or the user invoked CancelTask)
// from genuine drain-deadline timeouts. context.Canceled means the
@@ -2739,6 +2827,76 @@ func (d *Daemon) executeAndDrain(ctx context.Context, backend agent.Backend, pro
}
}
// idleWatchdogReason formats the human-facing explanation surfaced on
// idle_watchdog dispositions. Centralised so the result-arrival branch and the
// drain-timeout branch in executeAndDrain emit identical wording.
func idleWatchdogReason(window time.Duration) string {
return fmt.Sprintf("agent produced no new messages for %s and message queue was empty; force-stopped by idle watchdog", window)
}
// runIdleWatchdog polls until agentCtx is cancelled or the backend has been
// idle for at least window, whichever comes first. When it fires it stores
// true in fired and only then calls cancel, so any code woken by the
// cancellation can already observe the flag. In executeAndDrain, cancel is the
// CancelFunc of the ctx handed to backend.Execute, from which drainCtx is also
// derived, so a single call stops both the agent and the drain wait.
//
// On each tick the watchdog fires only when ALL of the following hold:
//
//  1. inFlightTools == 0 — no tool_use is still waiting for its tool_result.
//     A non-zero count means a real tool (e.g. `npm install`,
//     `docker build`) is legitimately running; long tool calls produce no
//     messages between use and result, so firing then would yank the agent
//     mid-build.
//  2. The time since lastActivityAt (unix nanoseconds, stamped by the drain
//     loop on every received message) is at least window.
//  3. The messages buffer is empty — a buffered-but-undrained message means
//     the drain loop is behind, not that the backend is silent.
//
// The tick interval is window/2, falling back to window when halving rounds
// down to zero. There is no minimum interval: tests rely on tiny windows such
// as 50 ms firing within a few ticks. A non-positive window is treated as
// "watchdog disabled" and the function returns immediately.
func (d *Daemon) runIdleWatchdog(agentCtx context.Context, window time.Duration, lastActivityAt *atomic.Int64, inFlightTools *atomic.Int32, fired *atomic.Bool, cancel context.CancelFunc, messages <-chan agent.Message, taskLog *slog.Logger, taskID string) {
	// time.NewTicker panics on a non-positive interval. The caller already
	// skips the watchdog for window <= 0; guard here as well so the function
	// is safe on its own.
	if window <= 0 {
		return
	}
	// Integer halving only reaches zero for a 1 ns window; fall back to the
	// window itself so the ticker interval stays positive.
	interval := window / 2
	if interval <= 0 {
		interval = window
	}
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	for {
		select {
		case <-agentCtx.Done():
			return
		case <-ticker.C:
			// In-flight tool call: the agent has emitted tool_use and
			// the corresponding tool_result hasn't landed yet. A long
			// build/install/test can sit here silently for many minutes
			// — that is forward progress, not a hang.
			if inFlightTools.Load() > 0 {
				continue
			}
			// NOTE(review): the stamp is stored as unix nanos, so the
			// reconstructed time carries no monotonic reading and
			// idleFor is a wall-clock difference; a system clock jump
			// can skew it.
			last := time.Unix(0, lastActivityAt.Load())
			idleFor := time.Since(last)
			if idleFor < window {
				continue
			}
			// A buffered-but-undrained message means the drain loop is
			// behind, not the backend. Wait one more tick rather than
			// killing a backend that is still producing output.
			if len(messages) > 0 {
				continue
			}
			taskLog.Warn("idle watchdog firing: no agent activity, force-stopping run",
				"task", shortID(taskID),
				"idle_for", idleFor.Round(time.Second).String(),
				"threshold", window.String(),
			)
			// Publish the flag before cancelling so whoever wakes up on
			// the cancelled context can tell the watchdog caused it.
			fired.Store(true)
			cancel()
			return
		}
	}
}
func mergeUsage(a, b map[string]agent.TokenUsage) map[string]agent.TokenUsage {
if len(a) == 0 {
return b

View File

@@ -1094,6 +1094,230 @@ func TestExecuteAndDrain_ContextCancelled_ReportsCancelled(t *testing.T) {
}
}
// idleWatchdogBackend is a test double for a backend that hangs (the MUL-2225
// scenario): it may emit a single message and then stays silent forever. With
// a short AgentIdleWatchdog configured, executeAndDrain is expected to be cut
// short by the watchdog instead of waiting out the full drain timeout.
type idleWatchdogBackend struct {
	// emitOne selects the variant: true queues one message before going
	// silent, false never emits anything at all.
	emitOne bool
}

// Execute returns a session whose Messages channel is never closed and whose
// Result channel never receives a value, modelling a subprocess that will not
// complete on its own. The context, prompt and options are ignored.
func (b idleWatchdogBackend) Execute(_ context.Context, _ string, _ agent.ExecOptions) (*agent.Session, error) {
	messages := make(chan agent.Message, 1)
	if b.emitOne {
		// Capacity 1 lets this send complete without a reader.
		messages <- agent.Message{Type: agent.MessageText, Content: "hello"}
	}
	session := &agent.Session{
		Messages: messages,
		Result:   make(chan agent.Result),
	}
	return session, nil
}
// TestExecuteAndDrain_IdleWatchdog_FiresOnInactivity checks that a backend
// which emits one message and then stalls is stopped by the idle watchdog:
// the result must carry status "idle_watchdog", an error mentioning the
// watchdog, and arrive within a bounded time.
func TestExecuteAndDrain_IdleWatchdog_FiresOnInactivity(t *testing.T) {
	t.Parallel()

	const window = 50 * time.Millisecond
	d := newTestDaemon(t)
	d.cfg.AgentIdleWatchdog = window

	ctx, cancel := context.WithCancel(context.Background())
	t.Cleanup(cancel)

	backend := idleWatchdogBackend{emitOne: true}
	began := time.Now()
	res, _, err := d.executeAndDrain(ctx, backend, "p", agent.ExecOptions{}, slog.Default(), "t-idle")
	switch {
	case err != nil:
		t.Fatalf("unexpected error: %v", err)
	case res.Status != "idle_watchdog":
		t.Fatalf("expected status=idle_watchdog, got %q (err=%q)", res.Status, res.Error)
	case !strings.Contains(res.Error, "idle watchdog"):
		t.Fatalf("expected error to mention idle watchdog, got %q", res.Error)
	}

	// With a sub-minute window the watchdog ticks every window/2, so it
	// should fire within a few ticks. Allowing 5× the window leaves slack
	// for slow CI machines.
	if took := time.Since(began); took > 5*window {
		t.Fatalf("watchdog took too long to fire: %s (window=%s)", took, window)
	}
}
// TestExecuteAndDrain_IdleWatchdog_FiresWhenNoMessageEverArrives checks that
// the watchdog also stops a backend that hangs before emitting any message.
func TestExecuteAndDrain_IdleWatchdog_FiresWhenNoMessageEverArrives(t *testing.T) {
	t.Parallel()

	d := newTestDaemon(t)
	d.cfg.AgentIdleWatchdog = 50 * time.Millisecond

	ctx, cancel := context.WithCancel(context.Background())
	t.Cleanup(cancel)

	// emitOne=false: the backend never sends anything. executeAndDrain
	// seeds lastActivityAt when it starts, so the idle window still applies
	// with zero traffic.
	silent := idleWatchdogBackend{emitOne: false}
	res, _, err := d.executeAndDrain(ctx, silent, "p", agent.ExecOptions{}, slog.Default(), "t-idle-zero")
	if err != nil {
		t.Fatalf("unexpected error: %v", err)
	}
	if got := res.Status; got != "idle_watchdog" {
		t.Fatalf("expected status=idle_watchdog when backend never emits, got %q (err=%q)", got, res.Error)
	}
}
// TestExecuteAndDrain_IdleWatchdog_DisabledWhenZero checks that a zero
// AgentIdleWatchdog turns the watchdog off: a silent backend must end as
// "cancelled" (from the parent context), never as "idle_watchdog".
func TestExecuteAndDrain_IdleWatchdog_DisabledWhenZero(t *testing.T) {
	t.Parallel()

	d := newTestDaemon(t)
	// Zero disables the watchdog.
	d.cfg.AgentIdleWatchdog = 0

	// The silent backend never finishes on its own, so cancel the parent
	// context after a short delay; otherwise the call would block the test.
	ctx, cancel := context.WithCancel(context.Background())
	time.AfterFunc(80*time.Millisecond, cancel)

	res, _, err := d.executeAndDrain(ctx, idleWatchdogBackend{emitOne: true}, "p", agent.ExecOptions{}, slog.Default(), "t-idle-off")
	if err != nil {
		t.Fatalf("unexpected error: %v", err)
	}
	switch res.Status {
	case "cancelled":
		// Expected: the parent context ended the run.
	case "idle_watchdog":
		t.Fatalf("watchdog should not fire when AgentIdleWatchdog=0, got status=%q", res.Status)
	default:
		t.Fatalf("expected status=cancelled (parent ctx fired), got %q", res.Status)
	}
}
// TestExecuteAndDrain_IdleWatchdog_HappyPathDoesNotFire checks that an
// enabled watchdog leaves a normally completing run untouched: status and
// output must come back exactly as the backend reported them.
func TestExecuteAndDrain_IdleWatchdog_HappyPathDoesNotFire(t *testing.T) {
	t.Parallel()

	d := newTestDaemon(t)
	d.cfg.AgentIdleWatchdog = 200 * time.Millisecond

	// The fake backend is configured with a single completed result; the
	// run is expected to finish well inside the idle window.
	backend := &fakeBackend{
		results: []agent.Result{{Status: "completed", Output: "done"}},
	}

	res, _, err := d.executeAndDrain(context.Background(), backend, "p", agent.ExecOptions{}, slog.Default(), "t-idle-happy")
	if err != nil {
		t.Fatalf("unexpected error: %v", err)
	}
	if got := res.Status; got != "completed" {
		t.Fatalf("expected status=completed on happy path, got %q (err=%q)", got, res.Error)
	}
	if got := res.Output; got != "done" {
		t.Fatalf("expected output preserved, got %q", got)
	}
}
// longToolCallBackend is a test double for a legitimate long-running tool
// call (e.g. `npm install`, `docker build`, a full test suite). It emits a
// tool_use, stays silent for toolSilence, then emits the tool_result and
// completes. The watchdog must not fire on this pattern: an in-flight tool
// call is forward progress, not a hang.
type longToolCallBackend struct {
	// toolSilence is the quiet period between tool_use and tool_result.
	toolSilence time.Duration
}

// Execute queues the tool_use immediately and finishes the exchange from a
// goroutine. If ctx is cancelled during the quiet period the session ends
// with an "aborted" result; otherwise it delivers the tool_result, a final
// text message, and a "completed" result. Both channels are closed on either
// path.
func (b longToolCallBackend) Execute(ctx context.Context, _ string, _ agent.ExecOptions) (*agent.Session, error) {
	messages := make(chan agent.Message, 4)
	results := make(chan agent.Result, 1)

	toolUse := agent.Message{
		Type:   agent.MessageToolUse,
		Tool:   "Bash",
		CallID: "call-1",
		Input:  map[string]any{"cmd": "npm install"},
	}
	messages <- toolUse

	go func() {
		// Runs last on both paths, after the result has been sent and
		// the message channel has been closed.
		defer close(results)

		select {
		case <-ctx.Done():
			// Cancelled while the "tool" was running — report aborted
			// so the caller sees the cancellation.
			results <- agent.Result{Status: "aborted", Error: ctx.Err().Error()}
			close(messages)
			return
		case <-time.After(b.toolSilence):
		}

		toolResult := agent.Message{
			Type:   agent.MessageToolResult,
			Tool:   "Bash",
			CallID: "call-1",
			Output: "installed 142 packages",
		}
		messages <- toolResult
		messages <- agent.Message{Type: agent.MessageText, Content: "done"}
		close(messages)
		results <- agent.Result{Status: "completed", Output: "done"}
	}()

	return &agent.Session{Messages: messages, Result: results}, nil
}
// TestExecuteAndDrain_IdleWatchdog_DoesNotFireDuringInFlightToolCall checks
// that silence between a tool_use and its tool_result does not trip the
// watchdog, even when that silence lasts several idle windows.
func TestExecuteAndDrain_IdleWatchdog_DoesNotFireDuringInFlightToolCall(t *testing.T) {
	t.Parallel()

	// The tool stays silent for roughly 4× the idle window. Without the
	// in-flight-tool gate the run would come back as idle_watchdog; with
	// it, the run must complete normally.
	const (
		window      = 50 * time.Millisecond
		toolRuntime = 200 * time.Millisecond
	)
	d := newTestDaemon(t)
	d.cfg.AgentIdleWatchdog = window

	backend := longToolCallBackend{toolSilence: toolRuntime}
	res, _, err := d.executeAndDrain(context.Background(), backend, "p", agent.ExecOptions{}, slog.Default(), "t-long-tool")
	if err != nil {
		t.Fatalf("unexpected error: %v", err)
	}
	switch res.Status {
	case "completed":
		// Expected: the tool call finished and the run completed.
	case "idle_watchdog":
		t.Fatalf("watchdog must not fire while a tool_use is in flight, got status=%q (err=%q)", res.Status, res.Error)
	default:
		t.Fatalf("expected status=completed, got %q (err=%q)", res.Status, res.Error)
	}
}
// tailIdleAfterToolBackend is a test double for the boundary case where a
// tool call completes and the backend then goes silent without ever
// finishing. Once the tool_result has been drained no tool is in flight any
// more, so the watchdog is expected to fire after the idle window elapses.
type tailIdleAfterToolBackend struct{}

// Execute pre-queues a matched tool_use/tool_result pair, then returns a
// session whose Messages channel stays open and whose Result channel never
// receives a value.
func (tailIdleAfterToolBackend) Execute(_ context.Context, _ string, _ agent.ExecOptions) (*agent.Session, error) {
	messages := make(chan agent.Message, 4)
	queued := []agent.Message{
		{Type: agent.MessageToolUse, Tool: "Bash", CallID: "c1"},
		{Type: agent.MessageToolResult, Tool: "Bash", CallID: "c1", Output: "ok"},
	}
	for _, m := range queued {
		messages <- m
	}
	return &agent.Session{
		Messages: messages,
		Result:   make(chan agent.Result),
	}, nil
}
// TestExecuteAndDrain_IdleWatchdog_FiresAfterToolResultIfBackendStaysSilent
// checks that a completed tool call does not keep the watchdog disarmed: when
// the backend goes silent after the tool_result, the run must end as
// "idle_watchdog".
func TestExecuteAndDrain_IdleWatchdog_FiresAfterToolResultIfBackendStaysSilent(t *testing.T) {
	t.Parallel()

	d := newTestDaemon(t)
	d.cfg.AgentIdleWatchdog = 50 * time.Millisecond

	ctx, cancel := context.WithCancel(context.Background())
	t.Cleanup(cancel)

	res, _, err := d.executeAndDrain(ctx, tailIdleAfterToolBackend{}, "p", agent.ExecOptions{}, slog.Default(), "t-tail-idle")
	if err != nil {
		t.Fatalf("unexpected error: %v", err)
	}
	if got := res.Status; got != "idle_watchdog" {
		t.Fatalf("expected status=idle_watchdog after tool_result with no further activity, got %q (err=%q)", got, res.Error)
	}
}
func TestEnsureRepoReadyFastPathDoesNotRefresh(t *testing.T) {
t.Parallel()