mirror of
https://github.com/multica-ai/multica.git
synced 2026-06-24 07:59:30 +02:00
Compare commits
2 Commits
agent/lamb
...
agent/j/57
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
778906a5ee | ||
|
|
11cafdc911 |
@@ -20,6 +20,16 @@ const (
|
||||
DefaultHeartbeatInterval = 15 * time.Second
|
||||
DefaultAgentTimeout = 2 * time.Hour
|
||||
DefaultCodexSemanticInactivityTimeout = 10 * time.Minute
|
||||
// DefaultAgentIdleWatchdog is the per-task safety net that force-stops a
|
||||
// run when the backend has emitted no message for this long AND its
|
||||
// message queue is empty. Backends like Claude Code can hang indefinitely
|
||||
// on a stuck child process (e.g. `docker ps` against a frozen dockerd),
|
||||
// in which case `cmd.Wait()` never returns and the task sits at "running"
|
||||
// for its full DefaultAgentTimeout (2 h). 5 min is conservative enough to
|
||||
// avoid false positives during long tool calls but tight enough to keep
|
||||
// stuck runs out of the operator's hair. Set MULTICA_AGENT_IDLE_WATCHDOG=0
|
||||
// to disable.
|
||||
DefaultAgentIdleWatchdog = 5 * time.Minute
|
||||
DefaultRuntimeName = "Local Agent"
|
||||
DefaultWorkspaceSyncInterval = 30 * time.Second
|
||||
DefaultHealthPort = 19514
|
||||
@@ -66,6 +76,7 @@ type Config struct {
|
||||
HeartbeatInterval time.Duration
|
||||
AgentTimeout time.Duration
|
||||
CodexSemanticInactivityTimeout time.Duration
|
||||
AgentIdleWatchdog time.Duration // force-stop a run when the backend goes silent this long with an empty queue (0 = disabled)
|
||||
ClaudeArgs []string
|
||||
CodexArgs []string
|
||||
}
|
||||
@@ -240,6 +251,14 @@ func LoadConfig(overrides Overrides) (Config, error) {
|
||||
codexSemanticInactivityTimeout = overrides.CodexSemanticInactivityTimeout
|
||||
}
|
||||
|
||||
// MULTICA_AGENT_IDLE_WATCHDOG=0 disables the per-task idle watchdog. We
|
||||
// route 0 through durationFromEnv so the operator can opt out without
|
||||
// patching the binary; any positive duration overrides the 5-minute default.
|
||||
agentIdleWatchdog, err := durationFromEnv("MULTICA_AGENT_IDLE_WATCHDOG", DefaultAgentIdleWatchdog)
|
||||
if err != nil {
|
||||
return Config{}, err
|
||||
}
|
||||
|
||||
maxConcurrentTasks, err := intFromEnv("MULTICA_DAEMON_MAX_CONCURRENT_TASKS", DefaultMaxConcurrentTasks)
|
||||
if err != nil {
|
||||
return Config{}, err
|
||||
@@ -374,6 +393,7 @@ func LoadConfig(overrides Overrides) (Config, error) {
|
||||
HeartbeatInterval: heartbeatInterval,
|
||||
AgentTimeout: agentTimeout,
|
||||
CodexSemanticInactivityTimeout: codexSemanticInactivityTimeout,
|
||||
AgentIdleWatchdog: agentIdleWatchdog,
|
||||
ClaudeArgs: claudeArgs,
|
||||
CodexArgs: codexArgs,
|
||||
}, nil
|
||||
|
||||
@@ -2495,6 +2495,25 @@ func (d *Daemon) runTask(ctx context.Context, task Task, provider string, slot i
|
||||
FailureReason: "timeout",
|
||||
Usage: usageEntries,
|
||||
}, nil
|
||||
case "idle_watchdog":
|
||||
// The idle watchdog force-stopped the run because the backend
|
||||
// went silent (e.g. claude blocked on a tool call against a
|
||||
// frozen child process). Route through the blocked path with a
|
||||
// dedicated failure_reason so the run leaves "running" state and
|
||||
// operators can tell idle-stop apart from a real timeout.
|
||||
comment := result.Error
|
||||
if comment == "" {
|
||||
comment = idleWatchdogReason(d.cfg.AgentIdleWatchdog)
|
||||
}
|
||||
return TaskResult{
|
||||
Status: "blocked",
|
||||
Comment: comment,
|
||||
SessionID: result.SessionID,
|
||||
WorkDir: env.WorkDir,
|
||||
EnvRoot: env.RootDir,
|
||||
FailureReason: "idle_watchdog",
|
||||
Usage: usageEntries,
|
||||
}, nil
|
||||
case "cancelled":
|
||||
// Server cancelled the task (e.g. issue reassignment, user cancel).
|
||||
// handleTask's cancelledByPoll branch already discards this result,
|
||||
@@ -2547,7 +2566,15 @@ func (d *Daemon) runTask(ctx context.Context, task Task, provider string, slot i
|
||||
// executeAndDrain runs a backend, drains its message stream (forwarding to the
|
||||
// server), and waits for the final result.
|
||||
func (d *Daemon) executeAndDrain(ctx context.Context, backend agent.Backend, prompt string, opts agent.ExecOptions, taskLog *slog.Logger, taskID string) (agent.Result, int32, error) {
|
||||
session, err := backend.Execute(ctx, prompt, opts)
|
||||
// Wrap the caller's ctx so the idle watchdog (below) can interrupt both
|
||||
// the agent subprocess (via the ctx passed to backend.Execute) AND the
|
||||
// drain loop with a single cancel. Without this layer the backend would
|
||||
// stay tied to the parent ctx and our cancellation could only abort
|
||||
// drain, leaving the subprocess running.
|
||||
agentCtx, agentCancel := context.WithCancel(ctx)
|
||||
defer agentCancel()
|
||||
|
||||
session, err := backend.Execute(agentCtx, prompt, opts)
|
||||
if err != nil {
|
||||
return agent.Result{}, 0, err
|
||||
}
|
||||
@@ -2560,10 +2587,29 @@ func (d *Daemon) executeAndDrain(ctx context.Context, backend agent.Backend, pro
|
||||
if opts.Timeout == 0 {
|
||||
drainTimeout = 21 * time.Minute
|
||||
}
|
||||
drainCtx, drainCancel := context.WithTimeout(ctx, drainTimeout)
|
||||
drainCtx, drainCancel := context.WithTimeout(agentCtx, drainTimeout)
|
||||
defer drainCancel()
|
||||
|
||||
var toolCount atomic.Int32
|
||||
// lastActivityAt records (as unix nanos) when the drain loop most
|
||||
// recently received a message from the backend. The idle watchdog
|
||||
// reads this to decide whether the agent has gone silent for too long.
|
||||
// Initialise to the start so a backend that never emits a single
|
||||
// message also trips the watchdog.
|
||||
var lastActivityAt atomic.Int64
|
||||
lastActivityAt.Store(time.Now().UnixNano())
|
||||
// inFlightTools counts tool_use messages that haven't yet been paired
|
||||
// with a matching tool_result. A non-zero count means the agent is
|
||||
// legitimately waiting on a tool (e.g. `npm install`, `docker build`)
|
||||
// that may run far longer than the idle window without emitting any
|
||||
// message — so the watchdog must not interpret that silence as a hang.
|
||||
var inFlightTools atomic.Int32
|
||||
var idleWatchdogFired atomic.Bool
|
||||
idleWindow := d.cfg.AgentIdleWatchdog
|
||||
if idleWindow > 0 {
|
||||
go d.runIdleWatchdog(agentCtx, idleWindow, &lastActivityAt, &inFlightTools, &idleWatchdogFired, agentCancel, session.Messages, taskLog, taskID)
|
||||
}
|
||||
|
||||
go func() {
|
||||
var seq atomic.Int32
|
||||
var mu sync.Mutex
|
||||
@@ -2629,6 +2675,12 @@ func (d *Daemon) executeAndDrain(ctx context.Context, backend agent.Backend, pro
|
||||
if !ok {
|
||||
goto drainDone
|
||||
}
|
||||
// Stamp activity as soon as a message lands. The idle
|
||||
// watchdog reads this to decide whether the backend has
|
||||
// gone silent — stamping before processing makes sure a
|
||||
// slow downstream call (mu.Lock contention, batch resize)
|
||||
// can't be misattributed to backend silence.
|
||||
lastActivityAt.Store(time.Now().UnixNano())
|
||||
switch msg.Type {
|
||||
case agent.MessageStatus:
|
||||
// Persist the session/work_dir as soon as the backend
|
||||
@@ -2648,6 +2700,7 @@ func (d *Daemon) executeAndDrain(ctx context.Context, backend agent.Backend, pro
|
||||
}
|
||||
case agent.MessageToolUse:
|
||||
n := toolCount.Add(1)
|
||||
inFlightTools.Add(1)
|
||||
taskLog.Info(fmt.Sprintf("tool #%d: %s", n, msg.Tool))
|
||||
if msg.CallID != "" {
|
||||
mu.Lock()
|
||||
@@ -2664,6 +2717,20 @@ func (d *Daemon) executeAndDrain(ctx context.Context, backend agent.Backend, pro
|
||||
})
|
||||
mu.Unlock()
|
||||
case agent.MessageToolResult:
|
||||
// Decrement only when the count would stay >= 0. A stray
|
||||
// tool_result with no matching tool_use (backend bug or
|
||||
// reconnect mid-stream) shouldn't push the counter
|
||||
// negative — that would re-arm the watchdog one tool_use
|
||||
// too early on the next call.
|
||||
for {
|
||||
cur := inFlightTools.Load()
|
||||
if cur <= 0 {
|
||||
break
|
||||
}
|
||||
if inFlightTools.CompareAndSwap(cur, cur-1) {
|
||||
break
|
||||
}
|
||||
}
|
||||
s := seq.Add(1)
|
||||
output := msg.Output
|
||||
if len(output) > 8192 {
|
||||
@@ -2719,8 +2786,29 @@ func (d *Daemon) executeAndDrain(ctx context.Context, backend agent.Backend, pro
|
||||
|
||||
select {
|
||||
case result := <-session.Result:
|
||||
if idleWatchdogFired.Load() {
|
||||
// The backend's wait goroutine (e.g. claude.go) translates the
|
||||
// SIGKILL we delivered via agentCancel into Status="aborted".
|
||||
// Re-tag it as "idle_watchdog" so runTask routes the
|
||||
// disposition through a dedicated failure_reason, not the
|
||||
// generic "agent_error" bucket the aborted path falls into.
|
||||
result.Status = "idle_watchdog"
|
||||
if result.Error == "" {
|
||||
result.Error = idleWatchdogReason(idleWindow)
|
||||
}
|
||||
}
|
||||
return result, toolCount.Load(), nil
|
||||
case <-drainCtx.Done():
|
||||
// Idle watchdog cancels via agentCancel(), which propagates here as
|
||||
// context.Canceled. Check this BEFORE the generic cancelled/timeout
|
||||
// classifiers so a watchdog-induced stop isn't misreported as
|
||||
// "task cancelled by server".
|
||||
if idleWatchdogFired.Load() {
|
||||
return agent.Result{
|
||||
Status: "idle_watchdog",
|
||||
Error: idleWatchdogReason(idleWindow),
|
||||
}, toolCount.Load(), nil
|
||||
}
|
||||
// Distinguish external cancellation (e.g. server-initiated cancel
|
||||
// because the issue was reassigned, or the user invoked CancelTask)
|
||||
// from genuine drain-deadline timeouts. context.Canceled means the
|
||||
@@ -2739,6 +2827,76 @@ func (d *Daemon) executeAndDrain(ctx context.Context, backend agent.Backend, pro
|
||||
}
|
||||
}
|
||||
|
||||
// idleWatchdogReason renders the operator-facing message attached to an
// idle_watchdog disposition. Every call site goes through this helper so the
// wording is the same no matter which code path reports the stop; window is
// the configured idle threshold and is printed in time.Duration's default
// string form (e.g. "5m0s").
func idleWatchdogReason(window time.Duration) string {
	const format = "agent produced no new messages for %s and message queue was empty; force-stopped by idle watchdog"
	return fmt.Sprintf(format, window)
}
|
||||
|
||||
// runIdleWatchdog ticks until either agentCtx is cancelled or the backend has
|
||||
// been silent for at least window with no in-flight tool call. On firing, it
|
||||
// sets fired and calls cancel, which propagates to the agent subprocess (via
|
||||
// the ctx passed to backend.Execute) and to drainCtx. The check requires:
|
||||
//
|
||||
// 1. inFlightTools == 0 — the backend has emitted a tool_use whose
|
||||
// matching tool_result hasn't arrived yet, meaning a real tool (e.g.
|
||||
// `npm install`, `docker build`) is legitimately running. Long tool
|
||||
// calls produce no messages between use and result; killing here would
|
||||
// yank the agent mid-build. AND
|
||||
// 2. time since lastActivityAt exceeds window — the drain loop is single
|
||||
// reader, so a stale stamp means no message has actually arrived; AND
|
||||
// 3. session.Messages buffer is empty — defensive against a hypothetical
|
||||
// drain stall where unprocessed messages would still imply progress.
|
||||
//
|
||||
// Tick interval is window/2 (floored at 30 s in production, but the floor only
|
||||
// kicks in for windows >= 1 min so tests can pass tiny windows like 50 ms and
|
||||
// see the watchdog fire within a few ticks).
|
||||
func (d *Daemon) runIdleWatchdog(agentCtx context.Context, window time.Duration, lastActivityAt *atomic.Int64, inFlightTools *atomic.Int32, fired *atomic.Bool, cancel context.CancelFunc, messages <-chan agent.Message, taskLog *slog.Logger, taskID string) {
|
||||
interval := window / 2
|
||||
if window >= time.Minute && interval < 30*time.Second {
|
||||
interval = 30 * time.Second
|
||||
}
|
||||
if interval <= 0 {
|
||||
interval = window
|
||||
}
|
||||
ticker := time.NewTicker(interval)
|
||||
defer ticker.Stop()
|
||||
for {
|
||||
select {
|
||||
case <-agentCtx.Done():
|
||||
return
|
||||
case <-ticker.C:
|
||||
// In-flight tool call: the agent has emitted tool_use and
|
||||
// the corresponding tool_result hasn't landed yet. A long
|
||||
// build/install/test can sit here silently for many minutes
|
||||
// — that is forward progress, not a hang.
|
||||
if inFlightTools.Load() > 0 {
|
||||
continue
|
||||
}
|
||||
last := time.Unix(0, lastActivityAt.Load())
|
||||
idleFor := time.Since(last)
|
||||
if idleFor < window {
|
||||
continue
|
||||
}
|
||||
// A buffered-but-undrained message means the drain loop is
|
||||
// behind, not the backend. Wait one more tick rather than
|
||||
// killing a backend that is still producing output.
|
||||
if len(messages) > 0 {
|
||||
continue
|
||||
}
|
||||
taskLog.Warn("idle watchdog firing: no agent activity, force-stopping run",
|
||||
"task", shortID(taskID),
|
||||
"idle_for", idleFor.Round(time.Second).String(),
|
||||
"threshold", window.String(),
|
||||
)
|
||||
fired.Store(true)
|
||||
cancel()
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func mergeUsage(a, b map[string]agent.TokenUsage) map[string]agent.TokenUsage {
|
||||
if len(a) == 0 {
|
||||
return b
|
||||
|
||||
@@ -1094,6 +1094,230 @@ func TestExecuteAndDrain_ContextCancelled_ReportsCancelled(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
// idleWatchdogBackend simulates the MUL-2225 hang: emit one message to mark
|
||||
// activity, then go silent forever. With a short AgentIdleWatchdog, the
|
||||
// watchdog should fire and short-circuit executeAndDrain instead of waiting
|
||||
// for the full drainTimeout (which is ~21 minutes by default).
|
||||
type idleWatchdogBackend struct {
|
||||
emitOne bool // when true, emit one message before going silent; when false, never emit anything
|
||||
}
|
||||
|
||||
func (b idleWatchdogBackend) Execute(_ context.Context, _ string, _ agent.ExecOptions) (*agent.Session, error) {
|
||||
msgCh := make(chan agent.Message, 1)
|
||||
resCh := make(chan agent.Result)
|
||||
if b.emitOne {
|
||||
msgCh <- agent.Message{Type: agent.MessageText, Content: "hello"}
|
||||
}
|
||||
// Deliberately do NOT close msgCh and never write to resCh — this models
|
||||
// a backend whose subprocess is hung and will never naturally complete.
|
||||
return &agent.Session{Messages: msgCh, Result: resCh}, nil
|
||||
}
|
||||
|
||||
func TestExecuteAndDrain_IdleWatchdog_FiresOnInactivity(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
d := newTestDaemon(t)
|
||||
d.cfg.AgentIdleWatchdog = 50 * time.Millisecond
|
||||
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
t.Cleanup(cancel)
|
||||
|
||||
start := time.Now()
|
||||
result, _, err := d.executeAndDrain(ctx, idleWatchdogBackend{emitOne: true}, "p", agent.ExecOptions{}, slog.Default(), "t-idle")
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error: %v", err)
|
||||
}
|
||||
if result.Status != "idle_watchdog" {
|
||||
t.Fatalf("expected status=idle_watchdog, got %q (err=%q)", result.Status, result.Error)
|
||||
}
|
||||
if !strings.Contains(result.Error, "idle watchdog") {
|
||||
t.Fatalf("expected error to mention idle watchdog, got %q", result.Error)
|
||||
}
|
||||
// The watchdog should fire within a few ticks (interval = window/2 with
|
||||
// no floor for sub-minute windows). 5× window is generous and keeps the
|
||||
// test from racing in slow CI.
|
||||
if elapsed := time.Since(start); elapsed > 5*d.cfg.AgentIdleWatchdog {
|
||||
t.Fatalf("watchdog took too long to fire: %s (window=%s)", elapsed, d.cfg.AgentIdleWatchdog)
|
||||
}
|
||||
}
|
||||
|
||||
func TestExecuteAndDrain_IdleWatchdog_FiresWhenNoMessageEverArrives(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
d := newTestDaemon(t)
|
||||
d.cfg.AgentIdleWatchdog = 50 * time.Millisecond
|
||||
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
t.Cleanup(cancel)
|
||||
|
||||
// emitOne=false models a backend that hangs before sending any message.
|
||||
// lastActivityAt is initialised at executeAndDrain entry, so the same
|
||||
// window applies even with zero traffic.
|
||||
result, _, err := d.executeAndDrain(ctx, idleWatchdogBackend{emitOne: false}, "p", agent.ExecOptions{}, slog.Default(), "t-idle-zero")
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error: %v", err)
|
||||
}
|
||||
if result.Status != "idle_watchdog" {
|
||||
t.Fatalf("expected status=idle_watchdog when backend never emits, got %q (err=%q)", result.Status, result.Error)
|
||||
}
|
||||
}
|
||||
|
||||
func TestExecuteAndDrain_IdleWatchdog_DisabledWhenZero(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
d := newTestDaemon(t)
|
||||
// Default zero value — watchdog disabled. Without a parent cancel the
|
||||
// blockingBackend would otherwise hang the test, so we cancel after a
|
||||
// short delay to confirm the run does NOT terminate as idle_watchdog.
|
||||
d.cfg.AgentIdleWatchdog = 0
|
||||
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
time.AfterFunc(80*time.Millisecond, cancel)
|
||||
|
||||
result, _, err := d.executeAndDrain(ctx, idleWatchdogBackend{emitOne: true}, "p", agent.ExecOptions{}, slog.Default(), "t-idle-off")
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error: %v", err)
|
||||
}
|
||||
if result.Status == "idle_watchdog" {
|
||||
t.Fatalf("watchdog should not fire when AgentIdleWatchdog=0, got status=%q", result.Status)
|
||||
}
|
||||
if result.Status != "cancelled" {
|
||||
t.Fatalf("expected status=cancelled (parent ctx fired), got %q", result.Status)
|
||||
}
|
||||
}
|
||||
|
||||
func TestExecuteAndDrain_IdleWatchdog_HappyPathDoesNotFire(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
d := newTestDaemon(t)
|
||||
d.cfg.AgentIdleWatchdog = 200 * time.Millisecond
|
||||
|
||||
// fakeBackend completes immediately with a normal result, well inside the
|
||||
// idle window. The watchdog must not corrupt the disposition.
|
||||
fb := &fakeBackend{
|
||||
results: []agent.Result{
|
||||
{Status: "completed", Output: "done"},
|
||||
},
|
||||
}
|
||||
|
||||
result, _, err := d.executeAndDrain(context.Background(), fb, "p", agent.ExecOptions{}, slog.Default(), "t-idle-happy")
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error: %v", err)
|
||||
}
|
||||
if result.Status != "completed" {
|
||||
t.Fatalf("expected status=completed on happy path, got %q (err=%q)", result.Status, result.Error)
|
||||
}
|
||||
if result.Output != "done" {
|
||||
t.Fatalf("expected output preserved, got %q", result.Output)
|
||||
}
|
||||
}
|
||||
|
||||
// longToolCallBackend simulates a legitimate long-running tool call (e.g.
|
||||
// `npm install`, `docker build`, full test suite). The backend emits a
|
||||
// tool_use, stays silent past the idle window while the tool runs, then emits
|
||||
// a tool_result and completes. This is the false-positive case the watchdog
|
||||
// must NOT misfire on: an in-flight tool call is forward progress, not a hang.
|
||||
type longToolCallBackend struct {
|
||||
toolSilence time.Duration // how long to stay silent between tool_use and tool_result
|
||||
}
|
||||
|
||||
func (b longToolCallBackend) Execute(ctx context.Context, _ string, _ agent.ExecOptions) (*agent.Session, error) {
|
||||
msgCh := make(chan agent.Message, 4)
|
||||
resCh := make(chan agent.Result, 1)
|
||||
|
||||
msgCh <- agent.Message{
|
||||
Type: agent.MessageToolUse,
|
||||
Tool: "Bash",
|
||||
CallID: "call-1",
|
||||
Input: map[string]any{"cmd": "npm install"},
|
||||
}
|
||||
|
||||
go func() {
|
||||
select {
|
||||
case <-time.After(b.toolSilence):
|
||||
case <-ctx.Done():
|
||||
// Watchdog cancelled us — propagate so the caller sees aborted.
|
||||
resCh <- agent.Result{Status: "aborted", Error: ctx.Err().Error()}
|
||||
close(msgCh)
|
||||
close(resCh)
|
||||
return
|
||||
}
|
||||
msgCh <- agent.Message{
|
||||
Type: agent.MessageToolResult,
|
||||
Tool: "Bash",
|
||||
CallID: "call-1",
|
||||
Output: "installed 142 packages",
|
||||
}
|
||||
msgCh <- agent.Message{Type: agent.MessageText, Content: "done"}
|
||||
close(msgCh)
|
||||
resCh <- agent.Result{Status: "completed", Output: "done"}
|
||||
close(resCh)
|
||||
}()
|
||||
|
||||
return &agent.Session{Messages: msgCh, Result: resCh}, nil
|
||||
}
|
||||
|
||||
func TestExecuteAndDrain_IdleWatchdog_DoesNotFireDuringInFlightToolCall(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
d := newTestDaemon(t)
|
||||
// 50 ms window; tool stays silent for ~4× the window. Without the
|
||||
// in-flight-tool gate, the watchdog would fire and the run would come
|
||||
// back as idle_watchdog. With the gate, it must complete normally.
|
||||
d.cfg.AgentIdleWatchdog = 50 * time.Millisecond
|
||||
|
||||
result, _, err := d.executeAndDrain(
|
||||
context.Background(),
|
||||
longToolCallBackend{toolSilence: 200 * time.Millisecond},
|
||||
"p",
|
||||
agent.ExecOptions{},
|
||||
slog.Default(),
|
||||
"t-long-tool",
|
||||
)
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error: %v", err)
|
||||
}
|
||||
if result.Status == "idle_watchdog" {
|
||||
t.Fatalf("watchdog must not fire while a tool_use is in flight, got status=%q (err=%q)", result.Status, result.Error)
|
||||
}
|
||||
if result.Status != "completed" {
|
||||
t.Fatalf("expected status=completed, got %q (err=%q)", result.Status, result.Error)
|
||||
}
|
||||
}
|
||||
|
||||
// tailIdleAfterToolBackend exercises the boundary case: a tool call completes,
|
||||
// and THEN the backend goes silent without ever finishing. After the
|
||||
// tool_result lands, in-flight count returns to zero and lastActivityAt is
|
||||
// fresh; the watchdog should fire exactly one window later, not earlier.
|
||||
type tailIdleAfterToolBackend struct{}
|
||||
|
||||
func (tailIdleAfterToolBackend) Execute(_ context.Context, _ string, _ agent.ExecOptions) (*agent.Session, error) {
|
||||
msgCh := make(chan agent.Message, 4)
|
||||
resCh := make(chan agent.Result)
|
||||
msgCh <- agent.Message{Type: agent.MessageToolUse, Tool: "Bash", CallID: "c1"}
|
||||
msgCh <- agent.Message{Type: agent.MessageToolResult, Tool: "Bash", CallID: "c1", Output: "ok"}
|
||||
// Deliberately leave msgCh open and never write to resCh.
|
||||
return &agent.Session{Messages: msgCh, Result: resCh}, nil
|
||||
}
|
||||
|
||||
func TestExecuteAndDrain_IdleWatchdog_FiresAfterToolResultIfBackendStaysSilent(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
d := newTestDaemon(t)
|
||||
d.cfg.AgentIdleWatchdog = 50 * time.Millisecond
|
||||
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
t.Cleanup(cancel)
|
||||
|
||||
result, _, err := d.executeAndDrain(ctx, tailIdleAfterToolBackend{}, "p", agent.ExecOptions{}, slog.Default(), "t-tail-idle")
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error: %v", err)
|
||||
}
|
||||
if result.Status != "idle_watchdog" {
|
||||
t.Fatalf("expected status=idle_watchdog after tool_result with no further activity, got %q (err=%q)", result.Status, result.Error)
|
||||
}
|
||||
}
|
||||
|
||||
func TestEnsureRepoReadyFastPathDoesNotRefresh(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
|
||||
Reference in New Issue
Block a user