From 60bae6262298dddd001135fb3c624cd33bc65a62 Mon Sep 17 00:00:00 2001 From: Bohan Jiang <52446949+Bohan-J@users.noreply.github.com> Date: Mon, 18 May 2026 15:14:45 +0800 Subject: [PATCH] feat(codex): add per-exec_command watchdog to escape dropped function_call_output (MUL-2337) (#2779) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * feat(codex): add per-exec_command watchdog to escape dropped function_call_output (MUL-2337) Codex app-server can drop the second function_call_output when two exec_command calls fan out in the same turn and both async-yield through the yield_time_ms boundary (observed 2026-05-18, MUL-2334 — Trump Agent wedged for 6+ min with no semantic activity events to drive any existing timer). The model then waits forever for the missing output; only the 10-minute semantic inactivity timeout would eventually rescue the run. Add a per-call watchdog in the codex client that tracks open exec_command / commandExecution items by call_id and fails the turn quickly (default 2 min, configurable via ExecOptions.ExecCommandStuckTimeout) when one stays open without progress. outputDelta events reset the per-call progress timestamp so long-running streaming commands aren't flagged. This is a daemon-side mitigation only — codex itself still has the upstream race, but the daemon no longer burns the full inactivity budget before the run is marked failed and a new run can recover. Co-authored-by: multica-agent * feat(codex): track legacy exec_command_output_delta in watchdog (MUL-2337) Mirrors the raw v2 item/commandExecution/outputDelta refresh on the legacy codex/event protocol so a long-running streaming exec doesn't get falsely flagged as stuck after begin + 2 min. Co-authored-by: multica-agent --------- Co-authored-by: multica-agent --- server/pkg/agent/agent.go | 16 +++- server/pkg/agent/codex.go | 152 +++++++++++++++++++++++++++++++ server/pkg/agent/codex_test.go | 161 +++++++++++++++++++++++++++++++++ 3 files changed, 325 insertions(+), 4 deletions(-) diff --git a/server/pkg/agent/agent.go b/server/pkg/agent/agent.go index bdfd088a8..4c4ea86d2 100644 --- a/server/pkg/agent/agent.go +++ b/server/pkg/agent/agent.go @@ -31,10 +31,18 @@ type ExecOptions struct { MaxTurns int Timeout time.Duration SemanticInactivityTimeout time.Duration - ResumeSessionID string // if non-empty, resume a previous agent session - ExtraArgs []string // daemon-wide default CLI arguments appended before CustomArgs; currently read by claude and codex backends only - CustomArgs []string // per-agent CLI arguments appended after ExtraArgs - McpConfig json.RawMessage // if non-nil, MCP server config to pass via --mcp-config + // ExecCommandStuckTimeout bounds how long a single exec_command (or v2 + // commandExecution item) may stay open without a matching end / progress + // event before the turn is declared stuck. Currently honoured by the codex + // backend, which has a known bug where two parallel exec_command calls + // racing through the yield_time_ms boundary can drop the second + // function_call_output, leaving the model waiting forever. Zero means use + // the backend default. + ExecCommandStuckTimeout time.Duration + ResumeSessionID string // if non-empty, resume a previous agent session + ExtraArgs []string // daemon-wide default CLI arguments appended before CustomArgs; currently read by claude and codex backends only + CustomArgs []string // per-agent CLI arguments appended after ExtraArgs + McpConfig json.RawMessage // if non-nil, MCP server config to pass via --mcp-config } // Session represents a running agent execution. diff --git a/server/pkg/agent/codex.go b/server/pkg/agent/codex.go index 72d7a5691..32c09bf59 100644 --- a/server/pkg/agent/codex.go +++ b/server/pkg/agent/codex.go @@ -28,6 +28,21 @@ var codexBlockedArgs = map[string]blockedArgMode{ const ( codexStderrTailBytes = 2048 defaultCodexSemanticInactivityTimeout = 10 * time.Minute + + // defaultCodexExecStuckTimeout bounds how long a single exec_command may + // stay open without a matching end or output-delta event before we + // declare the turn stuck. Codex itself can drop the second + // function_call_output when two exec_command calls fan out in the same + // turn and both async-yield through the yield_time_ms boundary (observed + // 2026-05-18, MUL-2334). When that happens the model waits forever for + // the missing output; this watchdog ends the turn so the next run can + // recover instead of burning the overall semantic inactivity budget. + defaultCodexExecStuckTimeout = 2 * time.Minute + + // codexExecStuckCheckInterval is how often the lifecycle loop polls for + // stuck exec calls. Kept short so the watchdog fires close to the + // threshold rather than up to one full interval late. + codexExecStuckCheckInterval = 5 * time.Second ) // codexBackend implements Backend by spawning `codex app-server --listen stdio://` @@ -233,6 +248,23 @@ func (b *codexBackend) Execute(ctx context.Context, prompt string, opts ExecOpti semanticTimer := time.NewTimer(semanticInactivityTimeout) defer semanticTimer.Stop() + execStuckTimeout := opts.ExecCommandStuckTimeout + if execStuckTimeout == 0 { + execStuckTimeout = defaultCodexExecStuckTimeout + } + // Scale the polling cadence so tests with short thresholds don't have + // to wait up to 5 s for the first watchdog tick. Production thresholds + // stay on the codexExecStuckCheckInterval (5 s) baseline. + checkInterval := codexExecStuckCheckInterval + if execStuckTimeout < 2*checkInterval { + checkInterval = execStuckTimeout / 4 + if checkInterval < 10*time.Millisecond { + checkInterval = 10 * time.Millisecond + } + } + execStuckTicker := time.NewTicker(checkInterval) + defer execStuckTicker.Stop() + waitingForTurn := true for waitingForTurn { select { @@ -264,6 +296,24 @@ func (b *codexBackend) Execute(ctx context.Context, prompt string, opts ExecOpti "last_activity", lastSemanticActivityDescription, "idle_for", time.Since(lastSemanticActivity).Round(time.Millisecond).String(), ) + case <-execStuckTicker.C: + callID, info, ok := c.findStuckExec(execStuckTimeout) + if !ok { + continue + } + waitingForTurn = false + finalStatus = "timeout" + finalError = fmt.Sprintf("codex exec_command stuck for %s without progress (call_id=%s command=%s); likely a dropped function_call_output from app-server", time.Since(info.LastProgressAt).Round(time.Second), callID, truncate(info.Command, 120)) + b.cfg.Logger.Warn("codex exec_command stuck", + "pid", cmd.Process.Pid, + "thread_id", threadID, + "turn_id", c.turnID, + "call_id", callID, + "command", truncate(info.Command, 200), + "started_at", info.StartedAt.Format(time.RFC3339), + "stuck_for", time.Since(info.LastProgressAt).Round(time.Millisecond).String(), + "threshold", execStuckTimeout.String(), + ) case <-runCtx.Done(): waitingForTurn = false if runCtx.Err() == context.DeadlineExceeded { @@ -383,6 +433,21 @@ func (c *codexClient) startOrResumeThread(ctx context.Context, opts ExecOptions, return threadID, false, nil } +// truncate returns s shortened to at most n runes, suffixed with "…" when +// truncation actually happened. Used for embedding potentially large +// command strings into log fields and error messages without unbounded +// growth. +func truncate(s string, n int) string { + if n <= 0 { + return "" + } + runes := []rune(s) + if len(runes) <= n { + return s + } + return string(runes[:n]) + "…" +} + func resetTimer(timer *time.Timer, d time.Duration) { if !timer.Stop() { select { @@ -455,6 +520,80 @@ type codexClient struct { turnErrorMu sync.Mutex turnError string // captured from turn/completed status=failed or terminal error notifications + + // openExec tracks exec_command / commandExecution calls that have a + // begin event but no end event yet. The lifecycle goroutine polls + // findStuckExec to detect calls stuck past the watchdog threshold. + // outputDelta resets the LastProgressAt timestamp so long-running + // commands that keep streaming output are not falsely flagged. + execMu sync.Mutex + openExec map[string]openExecInfo +} + +type openExecInfo struct { + Command string + StartedAt time.Time + LastProgressAt time.Time +} + +func (c *codexClient) recordExecBegin(callID, command string) { + if callID == "" { + return + } + c.execMu.Lock() + defer c.execMu.Unlock() + if c.openExec == nil { + c.openExec = map[string]openExecInfo{} + } + now := time.Now() + c.openExec[callID] = openExecInfo{ + Command: command, + StartedAt: now, + LastProgressAt: now, + } +} + +func (c *codexClient) recordExecProgress(callID string) { + if callID == "" { + return + } + c.execMu.Lock() + defer c.execMu.Unlock() + if info, ok := c.openExec[callID]; ok { + info.LastProgressAt = time.Now() + c.openExec[callID] = info + } +} + +func (c *codexClient) recordExecEnd(callID string) { + if callID == "" { + return + } + c.execMu.Lock() + defer c.execMu.Unlock() + delete(c.openExec, callID) +} + +// findStuckExec returns the oldest exec call whose last progress event is +// older than threshold, or ok=false if nothing is stuck. "Oldest" is by +// LastProgressAt so the most-overdue call is surfaced first. +func (c *codexClient) findStuckExec(threshold time.Duration) (callID string, info openExecInfo, ok bool) { + c.execMu.Lock() + defer c.execMu.Unlock() + cutoff := time.Now().Add(-threshold) + var oldest time.Time + for id, ei := range c.openExec { + if ei.LastProgressAt.After(cutoff) { + continue + } + if !ok || ei.LastProgressAt.Before(oldest) { + callID = id + info = ei + oldest = ei.LastProgressAt + ok = true + } + } + return } func (c *codexClient) setTurnError(msg string) { @@ -709,6 +848,7 @@ func (c *codexClient) handleEvent(msg map[string]any) { case "exec_command_begin": callID, _ := msg["call_id"].(string) command, _ := msg["command"].(string) + c.recordExecBegin(callID, command) if c.onMessage != nil { c.onMessage(Message{ Type: MessageToolUse, @@ -717,9 +857,16 @@ func (c *codexClient) handleEvent(msg map[string]any) { Input: map[string]any{"command": command}, }) } + case "exec_command_output_delta": + // Legacy peer of item/commandExecution/outputDelta. Refresh the + // watchdog so long-running streaming commands don't get flagged as + // stuck. We don't emit a Message here — raw v2 doesn't either. + callID, _ := msg["call_id"].(string) + c.recordExecProgress(callID) case "exec_command_end": callID, _ := msg["call_id"].(string) output, _ := msg["output"].(string) + c.recordExecEnd(callID) if c.onMessage != nil { c.onMessage(Message{ Type: MessageToolResult, @@ -865,6 +1012,7 @@ func (c *codexClient) handleItemNotification(method string, params map[string]an switch { case method == "item/started" && itemType == "commandExecution": command, _ := item["command"].(string) + c.recordExecBegin(itemID, command) if c.onMessage != nil { c.onMessage(Message{ Type: MessageToolUse, @@ -874,8 +1022,12 @@ func (c *codexClient) handleItemNotification(method string, params map[string]an }) } + case method == "item/commandExecution/outputDelta": + c.recordExecProgress(itemID) + case method == "item/completed" && itemType == "commandExecution": output, _ := item["aggregatedOutput"].(string) + c.recordExecEnd(itemID) if c.onMessage != nil { c.onMessage(Message{ Type: MessageToolResult, diff --git a/server/pkg/agent/codex_test.go b/server/pkg/agent/codex_test.go index 7e6d88a63..d8217262a 100644 --- a/server/pkg/agent/codex_test.go +++ b/server/pkg/agent/codex_test.go @@ -295,6 +295,50 @@ func TestCodexLegacyEventExecCommand(t *testing.T) { } } +// TestCodexLegacyEventOutputDeltaResetsExecProgress confirms that the +// legacy exec_command_output_delta event refreshes openExec[callID]. +// LastProgressAt so the watchdog won't flag a long-running streaming +// command as stuck. The raw v2 path covers item/commandExecution/outputDelta +// — this is the legacy peer. +func TestCodexLegacyEventOutputDeltaResetsExecProgress(t *testing.T) { + t.Parallel() + + c, _, _ := newTestCodexClient(t) + + c.handleLine(`{"jsonrpc":"2.0","method":"codex/event","params":{"msg":{"type":"exec_command_begin","call_id":"c1","command":"long task"}}}`) + + c.execMu.Lock() + startInfo, ok := c.openExec["c1"] + c.execMu.Unlock() + if !ok { + t.Fatal("expected openExec entry after exec_command_begin") + } + + // Wait long enough that a delta-driven refresh is observable. + time.Sleep(20 * time.Millisecond) + + c.handleLine(`{"jsonrpc":"2.0","method":"codex/event","params":{"msg":{"type":"exec_command_output_delta","call_id":"c1","stream":"stdout","chunk":"tick"}}}`) + + c.execMu.Lock() + progressedInfo, ok := c.openExec["c1"] + c.execMu.Unlock() + if !ok { + t.Fatal("openExec entry should still be present after output_delta") + } + if !progressedInfo.LastProgressAt.After(startInfo.LastProgressAt) { + t.Fatalf("expected LastProgressAt to advance after output_delta, got start=%s progressed=%s", startInfo.LastProgressAt, progressedInfo.LastProgressAt) + } + + c.handleLine(`{"jsonrpc":"2.0","method":"codex/event","params":{"msg":{"type":"exec_command_end","call_id":"c1","output":"done"}}}`) + + c.execMu.Lock() + _, stillOpen := c.openExec["c1"] + c.execMu.Unlock() + if stillOpen { + t.Fatal("openExec entry should be cleared after exec_command_end") + } +} + func TestCodexLegacyEventTaskComplete(t *testing.T) { t.Parallel() @@ -1206,6 +1250,123 @@ func TestCodexExecuteSemanticInactivityDoesNotAffectNormalTurnCompletion(t *test } } +// TestCodexExecuteWatchdogFiresWhenExecCommandHangs reproduces the MUL-2337 +// scenario: codex emits two parallel commandExecution begin events but only +// one completes — the second function_call_output is lost upstream. The +// per-call exec watchdog must end the turn quickly rather than burning the +// full semantic inactivity budget. +func TestCodexExecuteWatchdogFiresWhenExecCommandHangs(t *testing.T) { + t.Parallel() + if runtime.GOOS == "windows" { + t.Skip("shell-script fixture is POSIX-only") + } + + fakePath := writeFakeCodexAppServer(t, ""+ + `read line`+"\n"+ + `echo '{"jsonrpc":"2.0","id":1,"result":{}}'`+"\n"+ + `read line`+"\n"+ + `read line`+"\n"+ + `echo '{"jsonrpc":"2.0","id":2,"result":{"thread":{"id":"thr-hang"}}}'`+"\n"+ + `read line`+"\n"+ + `echo '{"jsonrpc":"2.0","id":3,"result":{}}'`+"\n"+ + `echo '{"jsonrpc":"2.0","method":"turn/started","params":{"threadId":"thr-hang","turn":{"id":"turn-hang"}}}'`+"\n"+ + `echo '{"jsonrpc":"2.0","method":"item/started","params":{"threadId":"thr-hang","item":{"type":"commandExecution","id":"cmd-a","command":"multica issue get"}}}'`+"\n"+ + `echo '{"jsonrpc":"2.0","method":"item/started","params":{"threadId":"thr-hang","item":{"type":"commandExecution","id":"cmd-b","command":"multica issue comment list"}}}'`+"\n"+ + `echo '{"jsonrpc":"2.0","method":"item/completed","params":{"threadId":"thr-hang","item":{"type":"commandExecution","id":"cmd-a","aggregatedOutput":"ok"}}}'`+"\n"+ + `sleep 5`+"\n") + + result := executeFakeCodex(t, fakePath, ExecOptions{ + Timeout: 5 * time.Second, + SemanticInactivityTimeout: 3 * time.Second, + ExecCommandStuckTimeout: 150 * time.Millisecond, + }) + if result.Status != "timeout" { + t.Fatalf("expected timeout, got status=%q error=%q", result.Status, result.Error) + } + if !strings.Contains(result.Error, "exec_command stuck") { + t.Fatalf("expected exec_command stuck error, got %q", result.Error) + } + if !strings.Contains(result.Error, "cmd-b") { + t.Fatalf("expected stuck call_id cmd-b in error, got %q", result.Error) + } + if !strings.Contains(result.Error, "multica issue comment list") { + t.Fatalf("expected stuck command text in error, got %q", result.Error) + } +} + +// TestCodexExecuteWatchdogResetsOnOutputDelta covers a long-running exec +// that keeps streaming output. The watchdog should not flag it as stuck +// even when total elapsed exceeds the threshold. +func TestCodexExecuteWatchdogResetsOnOutputDelta(t *testing.T) { + t.Parallel() + if runtime.GOOS == "windows" { + t.Skip("shell-script fixture is POSIX-only") + } + + fakePath := writeFakeCodexAppServer(t, ""+ + `read line`+"\n"+ + `echo '{"jsonrpc":"2.0","id":1,"result":{}}'`+"\n"+ + `read line`+"\n"+ + `read line`+"\n"+ + `echo '{"jsonrpc":"2.0","id":2,"result":{"thread":{"id":"thr-stream"}}}'`+"\n"+ + `read line`+"\n"+ + `echo '{"jsonrpc":"2.0","id":3,"result":{}}'`+"\n"+ + `echo '{"jsonrpc":"2.0","method":"turn/started","params":{"threadId":"thr-stream","turn":{"id":"turn-stream"}}}'`+"\n"+ + `echo '{"jsonrpc":"2.0","method":"item/started","params":{"threadId":"thr-stream","item":{"type":"commandExecution","id":"cmd-stream","command":"long task"}}}'`+"\n"+ + `for i in 1 2 3 4 5 6; do sleep 0.08; echo '{"jsonrpc":"2.0","method":"item/commandExecution/outputDelta","params":{"threadId":"thr-stream","item":{"type":"commandExecution","id":"cmd-stream"},"delta":"tick"}}'; done`+"\n"+ + `echo '{"jsonrpc":"2.0","method":"item/completed","params":{"threadId":"thr-stream","item":{"type":"commandExecution","id":"cmd-stream","aggregatedOutput":"done"}}}'`+"\n"+ + `echo '{"jsonrpc":"2.0","method":"turn/completed","params":{"threadId":"thr-stream","turn":{"id":"turn-stream","status":"completed"}}}'`+"\n") + + result := executeFakeCodex(t, fakePath, ExecOptions{ + Timeout: 5 * time.Second, + SemanticInactivityTimeout: 3 * time.Second, + ExecCommandStuckTimeout: 200 * time.Millisecond, + }) + if result.Status != "completed" { + t.Fatalf("expected completed (outputDelta resets the watchdog), got status=%q error=%q", result.Status, result.Error) + } +} + +// TestCodexExecuteWatchdogIgnoresClosedExecs guards against a regression +// where the watchdog would still hold state for a completed exec call, +// causing false positives once the next quiet period exceeded the +// threshold. After item/completed the openExec map should be empty. +func TestCodexExecuteWatchdogIgnoresClosedExecs(t *testing.T) { + t.Parallel() + if runtime.GOOS == "windows" { + t.Skip("shell-script fixture is POSIX-only") + } + + fakePath := writeFakeCodexAppServer(t, ""+ + `read line`+"\n"+ + `echo '{"jsonrpc":"2.0","id":1,"result":{}}'`+"\n"+ + `read line`+"\n"+ + `read line`+"\n"+ + `echo '{"jsonrpc":"2.0","id":2,"result":{"thread":{"id":"thr-clean"}}}'`+"\n"+ + `read line`+"\n"+ + `echo '{"jsonrpc":"2.0","id":3,"result":{}}'`+"\n"+ + `echo '{"jsonrpc":"2.0","method":"turn/started","params":{"threadId":"thr-clean","turn":{"id":"turn-clean"}}}'`+"\n"+ + `echo '{"jsonrpc":"2.0","method":"item/started","params":{"threadId":"thr-clean","item":{"type":"commandExecution","id":"cmd-1","command":"git status"}}}'`+"\n"+ + `echo '{"jsonrpc":"2.0","method":"item/completed","params":{"threadId":"thr-clean","item":{"type":"commandExecution","id":"cmd-1","aggregatedOutput":"clean"}}}'`+"\n"+ + `sleep 5`+"\n") + + result := executeFakeCodex(t, fakePath, ExecOptions{ + Timeout: 5 * time.Second, + SemanticInactivityTimeout: 200 * time.Millisecond, + // Exec stuck threshold sits well above SemanticInactivityTimeout so + // the semantic timer always fires first when nothing is genuinely + // stuck. Keeping a healthy gap prevents test flakes when the reader + // goroutine is scheduled late under parallel test load. + ExecCommandStuckTimeout: 2 * time.Second, + }) + if result.Status != "timeout" { + t.Fatalf("expected timeout, got status=%q error=%q", result.Status, result.Error) + } + if !strings.Contains(result.Error, "semantic inactivity") { + t.Fatalf("expected semantic inactivity to fire (not exec stuck), got %q", result.Error) + } +} + func writeFakeCodexAppServer(t *testing.T, body string) string { t.Helper() fakePath := filepath.Join(t.TempDir(), "codex")