feat(codex): add per-exec_command watchdog to escape dropped function_call_output (MUL-2337) (#2779)

* feat(codex): add per-exec_command watchdog to escape dropped function_call_output (MUL-2337)

Codex app-server can drop the second function_call_output when two
exec_command calls fan out in the same turn and both async-yield through
the yield_time_ms boundary (observed 2026-05-18, MUL-2334 — Trump Agent
wedged for 6+ min with no semantic activity events to drive any existing
timer). The model then waits forever for the missing output; only the
10-minute semantic inactivity timeout would eventually rescue the run.

Add a per-call watchdog in the codex client that tracks open
exec_command / commandExecution items by call_id and fails the turn
quickly (default 2 min, configurable via ExecOptions.ExecCommandStuckTimeout)
when one stays open without progress. outputDelta events reset the
per-call progress timestamp so long-running streaming commands aren't
flagged.

This is a daemon-side mitigation only — codex itself still has the
upstream race, but the daemon no longer burns the full inactivity budget
before the run is marked failed and a new run can recover.

Co-authored-by: multica-agent <github@multica.ai>

* feat(codex): track legacy exec_command_output_delta in watchdog (MUL-2337)

Mirrors the raw v2 item/commandExecution/outputDelta refresh on the legacy
codex/event protocol so a long-running streaming exec doesn't get falsely
flagged as stuck after begin + 2 min.

Co-authored-by: multica-agent <github@multica.ai>

---------

Co-authored-by: multica-agent <github@multica.ai>
This commit is contained in:
Bohan Jiang
2026-05-18 15:14:45 +08:00
committed by GitHub
parent c328c402d8
commit 60bae62622
3 changed files with 325 additions and 4 deletions

View File

@@ -31,10 +31,18 @@ type ExecOptions struct {
MaxTurns int
Timeout time.Duration
SemanticInactivityTimeout time.Duration
ResumeSessionID string // if non-empty, resume a previous agent session
ExtraArgs []string // daemon-wide default CLI arguments appended before CustomArgs; currently read by claude and codex backends only
CustomArgs []string // per-agent CLI arguments appended after ExtraArgs
McpConfig json.RawMessage // if non-nil, MCP server config to pass via --mcp-config
// ExecCommandStuckTimeout bounds how long a single exec_command (or v2
// commandExecution item) may stay open without a matching end / progress
// event before the turn is declared stuck. Currently honoured by the codex
// backend, which has a known bug where two parallel exec_command calls
// racing through the yield_time_ms boundary can drop the second
// function_call_output, leaving the model waiting forever. Zero means use
// the backend default.
ExecCommandStuckTimeout time.Duration
ResumeSessionID string // if non-empty, resume a previous agent session
ExtraArgs []string // daemon-wide default CLI arguments appended before CustomArgs; currently read by claude and codex backends only
CustomArgs []string // per-agent CLI arguments appended after ExtraArgs
McpConfig json.RawMessage // if non-nil, MCP server config to pass via --mcp-config
}
// Session represents a running agent execution.

View File

@@ -28,6 +28,21 @@ var codexBlockedArgs = map[string]blockedArgMode{
const (
	// codexStderrTailBytes is a byte budget of 2 KiB. Presumably it caps how
	// much trailing stderr is retained for diagnostics; the usage is not
	// visible in this hunk, so confirm against the caller.
	codexStderrTailBytes = 2048

	// defaultCodexSemanticInactivityTimeout is the overall semantic
	// inactivity budget for a codex turn. Presumably applied when
	// ExecOptions.SemanticInactivityTimeout is unset (confirm in Execute).
	defaultCodexSemanticInactivityTimeout = 10 * time.Minute

	// defaultCodexExecStuckTimeout is used when
	// ExecOptions.ExecCommandStuckTimeout is zero. It is the longest a single
	// exec_command may remain open with neither an end event nor an
	// output-delta event before the turn is declared stuck.
	//
	// Background: when two exec_command calls fan out in the same turn and
	// both async-yield through the yield_time_ms boundary, codex can drop the
	// second function_call_output (observed 2026-05-18, MUL-2334). The model
	// then waits forever for the missing output. Ending the turn here lets
	// the next run recover instead of burning the whole semantic inactivity
	// budget.
	defaultCodexExecStuckTimeout = 2 * time.Minute

	// codexExecStuckCheckInterval is the baseline cadence at which the
	// lifecycle loop polls for stuck exec calls. A short interval bounds how
	// far past the threshold the watchdog can fire: at most one interval.
	codexExecStuckCheckInterval = 5 * time.Second
)
// codexBackend implements Backend by spawning `codex app-server --listen stdio://`
@@ -233,6 +248,23 @@ func (b *codexBackend) Execute(ctx context.Context, prompt string, opts ExecOpti
semanticTimer := time.NewTimer(semanticInactivityTimeout)
defer semanticTimer.Stop()
execStuckTimeout := opts.ExecCommandStuckTimeout
if execStuckTimeout == 0 {
execStuckTimeout = defaultCodexExecStuckTimeout
}
// Scale the polling cadence so tests with short thresholds don't have
// to wait up to 5 s for the first watchdog tick. Thresholds of at least
// 2*codexExecStuckCheckInterval (10 s), which includes the 2 min default,
// stay on the codexExecStuckCheckInterval (5 s) baseline.
checkInterval := codexExecStuckCheckInterval
if execStuckTimeout < 2*checkInterval {
checkInterval = execStuckTimeout / 4
if checkInterval < 10*time.Millisecond {
checkInterval = 10 * time.Millisecond
}
}
execStuckTicker := time.NewTicker(checkInterval)
defer execStuckTicker.Stop()
waitingForTurn := true
for waitingForTurn {
select {
@@ -264,6 +296,24 @@ func (b *codexBackend) Execute(ctx context.Context, prompt string, opts ExecOpti
"last_activity", lastSemanticActivityDescription,
"idle_for", time.Since(lastSemanticActivity).Round(time.Millisecond).String(),
)
case <-execStuckTicker.C:
callID, info, ok := c.findStuckExec(execStuckTimeout)
if !ok {
continue
}
waitingForTurn = false
finalStatus = "timeout"
finalError = fmt.Sprintf("codex exec_command stuck for %s without progress (call_id=%s command=%s); likely a dropped function_call_output from app-server", time.Since(info.LastProgressAt).Round(time.Second), callID, truncate(info.Command, 120))
b.cfg.Logger.Warn("codex exec_command stuck",
"pid", cmd.Process.Pid,
"thread_id", threadID,
"turn_id", c.turnID,
"call_id", callID,
"command", truncate(info.Command, 200),
"started_at", info.StartedAt.Format(time.RFC3339),
"stuck_for", time.Since(info.LastProgressAt).Round(time.Millisecond).String(),
"threshold", execStuckTimeout.String(),
)
case <-runCtx.Done():
waitingForTurn = false
if runCtx.Err() == context.DeadlineExceeded {
@@ -383,6 +433,21 @@ func (c *codexClient) startOrResumeThread(ctx context.Context, opts ExecOptions,
return threadID, false, nil
}
// truncate caps s at n runes (not bytes). When s is longer than n runes the
// first n runes are kept and a single "…" is appended; otherwise s is
// returned unchanged. A non-positive n always yields the empty string.
//
// It exists so that potentially huge command strings can be embedded in log
// fields and error messages with a bounded size.
func truncate(s string, n int) string {
	switch {
	case n <= 0:
		return ""
	case len(s) <= n:
		// The byte length is an upper bound on the rune count, so s already
		// fits and no rune conversion is needed.
		return s
	}
	if runes := []rune(s); len(runes) > n {
		return string(runes[:n]) + "…"
	}
	return s
}
func resetTimer(timer *time.Timer, d time.Duration) {
if !timer.Stop() {
select {
@@ -455,6 +520,80 @@ type codexClient struct {
turnErrorMu sync.Mutex
turnError string // captured from turn/completed status=failed or terminal error notifications
// openExec tracks exec_command / commandExecution calls that have a
// begin event but no end event yet. The lifecycle goroutine polls
// findStuckExec to detect calls stuck past the watchdog threshold.
// outputDelta resets the LastProgressAt timestamp so long-running
// commands that keep streaming output are not falsely flagged.
execMu sync.Mutex
openExec map[string]openExecInfo
}
// openExecInfo is the watchdog's bookkeeping record for one exec call that
// has been seen to begin but not yet to end.
type openExecInfo struct {
	// Command is the command text reported with the begin event. It is only
	// used for diagnostics (log fields and the stuck-call error message).
	Command string
	// StartedAt is when the begin event was recorded. LastProgressAt starts
	// equal to StartedAt and is moved forward on each progress event; it is
	// the timestamp the stuck check compares against its threshold.
	StartedAt, LastProgressAt time.Time
}
// recordExecBegin starts watchdog tracking for the exec call identified by
// callID, remembering command for diagnostics. Both StartedAt and
// LastProgressAt are set to the current time. Calls with an empty callID are
// ignored, and a repeated begin for the same callID replaces the earlier
// entry. The openExec map is created lazily on first use.
func (c *codexClient) recordExecBegin(callID, command string) {
	if callID == "" {
		return
	}

	c.execMu.Lock()
	defer c.execMu.Unlock()

	if c.openExec == nil {
		c.openExec = make(map[string]openExecInfo)
	}
	began := time.Now()
	c.openExec[callID] = openExecInfo{Command: command, StartedAt: began, LastProgressAt: began}
}
// recordExecProgress marks the open exec call callID as having made progress
// just now by refreshing its LastProgressAt. Nothing happens when callID is
// empty or is not currently tracked, so a progress event never creates an
// entry on its own.
func (c *codexClient) recordExecProgress(callID string) {
	if callID == "" {
		return
	}

	c.execMu.Lock()
	defer c.execMu.Unlock()

	entry, tracked := c.openExec[callID]
	if !tracked {
		return
	}
	// Map values are copies, so the updated entry has to be stored back.
	entry.LastProgressAt = time.Now()
	c.openExec[callID] = entry
}
// recordExecEnd stops watchdog tracking for callID once its end event has
// arrived. An empty or unknown callID is a no-op.
func (c *codexClient) recordExecEnd(callID string) {
	if callID != "" {
		c.execMu.Lock()
		// delete is safe on a nil map and on a missing key.
		delete(c.openExec, callID)
		c.execMu.Unlock()
	}
}
// findStuckExec scans the open exec calls for ones whose LastProgressAt is
// not after now-threshold and reports the one with the earliest
// LastProgressAt, i.e. the most overdue call. ok is false, and the other
// results are zero values, when no call is stuck.
func (c *codexClient) findStuckExec(threshold time.Duration) (callID string, info openExecInfo, ok bool) {
	c.execMu.Lock()
	defer c.execMu.Unlock()

	deadline := time.Now().Add(-threshold)
	for id, candidate := range c.openExec {
		switch {
		case candidate.LastProgressAt.After(deadline):
			// Progressed within the threshold window: not stuck.
		case !ok || candidate.LastProgressAt.Before(info.LastProgressAt):
			// First stuck call seen, or more overdue than the current pick.
			callID, info, ok = id, candidate, true
		}
	}
	return callID, info, ok
}
func (c *codexClient) setTurnError(msg string) {
@@ -709,6 +848,7 @@ func (c *codexClient) handleEvent(msg map[string]any) {
case "exec_command_begin":
callID, _ := msg["call_id"].(string)
command, _ := msg["command"].(string)
c.recordExecBegin(callID, command)
if c.onMessage != nil {
c.onMessage(Message{
Type: MessageToolUse,
@@ -717,9 +857,16 @@ func (c *codexClient) handleEvent(msg map[string]any) {
Input: map[string]any{"command": command},
})
}
case "exec_command_output_delta":
// Legacy peer of item/commandExecution/outputDelta. Refresh the
// watchdog so long-running streaming commands don't get flagged as
// stuck. We don't emit a Message here — raw v2 doesn't either.
callID, _ := msg["call_id"].(string)
c.recordExecProgress(callID)
case "exec_command_end":
callID, _ := msg["call_id"].(string)
output, _ := msg["output"].(string)
c.recordExecEnd(callID)
if c.onMessage != nil {
c.onMessage(Message{
Type: MessageToolResult,
@@ -865,6 +1012,7 @@ func (c *codexClient) handleItemNotification(method string, params map[string]an
switch {
case method == "item/started" && itemType == "commandExecution":
command, _ := item["command"].(string)
c.recordExecBegin(itemID, command)
if c.onMessage != nil {
c.onMessage(Message{
Type: MessageToolUse,
@@ -874,8 +1022,12 @@ func (c *codexClient) handleItemNotification(method string, params map[string]an
})
}
case method == "item/commandExecution/outputDelta":
c.recordExecProgress(itemID)
case method == "item/completed" && itemType == "commandExecution":
output, _ := item["aggregatedOutput"].(string)
c.recordExecEnd(itemID)
if c.onMessage != nil {
c.onMessage(Message{
Type: MessageToolResult,

View File

@@ -295,6 +295,50 @@ func TestCodexLegacyEventExecCommand(t *testing.T) {
}
}
// TestCodexLegacyEventOutputDeltaResetsExecProgress drives the legacy
// codex/event protocol through begin -> output_delta -> end for one call and
// checks the watchdog bookkeeping at each step: begin creates the openExec
// entry, exec_command_output_delta advances its LastProgressAt (so a
// long-running streaming command is not flagged as stuck), and end removes
// the entry. This is the legacy counterpart of the raw v2
// item/commandExecution/outputDelta path.
func TestCodexLegacyEventOutputDeltaResetsExecProgress(t *testing.T) {
	t.Parallel()

	c, _, _ := newTestCodexClient(t)

	// lookup reads the watchdog entry for call c1 while holding execMu.
	lookup := func() (openExecInfo, bool) {
		c.execMu.Lock()
		defer c.execMu.Unlock()
		entry, present := c.openExec["c1"]
		return entry, present
	}

	c.handleLine(`{"jsonrpc":"2.0","method":"codex/event","params":{"msg":{"type":"exec_command_begin","call_id":"c1","command":"long task"}}}`)
	startInfo, ok := lookup()
	if !ok {
		t.Fatal("expected openExec entry after exec_command_begin")
	}

	// Leave a gap so the refreshed timestamp is strictly later than the
	// one recorded at begin.
	time.Sleep(20 * time.Millisecond)

	c.handleLine(`{"jsonrpc":"2.0","method":"codex/event","params":{"msg":{"type":"exec_command_output_delta","call_id":"c1","stream":"stdout","chunk":"tick"}}}`)
	progressedInfo, ok := lookup()
	if !ok {
		t.Fatal("openExec entry should still be present after output_delta")
	}
	if !progressedInfo.LastProgressAt.After(startInfo.LastProgressAt) {
		t.Fatalf("expected LastProgressAt to advance after output_delta, got start=%s progressed=%s", startInfo.LastProgressAt, progressedInfo.LastProgressAt)
	}

	c.handleLine(`{"jsonrpc":"2.0","method":"codex/event","params":{"msg":{"type":"exec_command_end","call_id":"c1","output":"done"}}}`)
	if _, stillOpen := lookup(); stillOpen {
		t.Fatal("openExec entry should be cleared after exec_command_end")
	}
}
func TestCodexLegacyEventTaskComplete(t *testing.T) {
t.Parallel()
@@ -1206,6 +1250,123 @@ func TestCodexExecuteSemanticInactivityDoesNotAffectNormalTurnCompletion(t *test
}
}
// TestCodexExecuteWatchdogFiresWhenExecCommandHangs reproduces the MUL-2337
// scenario. The fake app-server starts two parallel commandExecution items
// (cmd-a, cmd-b) but completes only cmd-a, as if the second
// function_call_output had been lost upstream, and then goes silent. The
// per-call exec watchdog (150 ms here) must end the turn well before the
// 3 s semantic inactivity budget, and the error must identify cmd-b.
func TestCodexExecuteWatchdogFiresWhenExecCommandHangs(t *testing.T) {
	t.Parallel()
	if runtime.GOOS == "windows" {
		t.Skip("shell-script fixture is POSIX-only")
	}

	// Each `read line` consumes one line from the client; the echoes are
	// the JSON-RPC responses (ids 1-3) followed by turn/item notifications.
	script := strings.Join([]string{
		`read line`,
		`echo '{"jsonrpc":"2.0","id":1,"result":{}}'`,
		`read line`,
		`read line`,
		`echo '{"jsonrpc":"2.0","id":2,"result":{"thread":{"id":"thr-hang"}}}'`,
		`read line`,
		`echo '{"jsonrpc":"2.0","id":3,"result":{}}'`,
		`echo '{"jsonrpc":"2.0","method":"turn/started","params":{"threadId":"thr-hang","turn":{"id":"turn-hang"}}}'`,
		`echo '{"jsonrpc":"2.0","method":"item/started","params":{"threadId":"thr-hang","item":{"type":"commandExecution","id":"cmd-a","command":"multica issue get"}}}'`,
		`echo '{"jsonrpc":"2.0","method":"item/started","params":{"threadId":"thr-hang","item":{"type":"commandExecution","id":"cmd-b","command":"multica issue comment list"}}}'`,
		`echo '{"jsonrpc":"2.0","method":"item/completed","params":{"threadId":"thr-hang","item":{"type":"commandExecution","id":"cmd-a","aggregatedOutput":"ok"}}}'`,
		`sleep 5`,
	}, "\n") + "\n"
	fakePath := writeFakeCodexAppServer(t, script)

	result := executeFakeCodex(t, fakePath, ExecOptions{
		Timeout:                   5 * time.Second,
		SemanticInactivityTimeout: 3 * time.Second,
		ExecCommandStuckTimeout:   150 * time.Millisecond,
	})

	if result.Status != "timeout" {
		t.Fatalf("expected timeout, got status=%q error=%q", result.Status, result.Error)
	}

	// The error must name the watchdog, the stuck call, and its command.
	checks := []struct{ needle, failFormat string }{
		{"exec_command stuck", "expected exec_command stuck error, got %q"},
		{"cmd-b", "expected stuck call_id cmd-b in error, got %q"},
		{"multica issue comment list", "expected stuck command text in error, got %q"},
	}
	for _, chk := range checks {
		if !strings.Contains(result.Error, chk.needle) {
			t.Fatalf(chk.failFormat, result.Error)
		}
	}
}
// TestCodexExecuteWatchdogResetsOnOutputDelta covers a long-running exec
// that keeps streaming output. The fixture emits six outputDelta
// notifications 80 ms apart, so the command stays open for longer than the
// 200 ms stuck threshold in total while never going 200 ms without
// progress. The turn must therefore complete normally.
func TestCodexExecuteWatchdogResetsOnOutputDelta(t *testing.T) {
	t.Parallel()
	if runtime.GOOS == "windows" {
		t.Skip("shell-script fixture is POSIX-only")
	}

	// Each `read line` consumes one line from the client; the echoes are
	// the JSON-RPC responses (ids 1-3) followed by turn/item notifications.
	script := strings.Join([]string{
		`read line`,
		`echo '{"jsonrpc":"2.0","id":1,"result":{}}'`,
		`read line`,
		`read line`,
		`echo '{"jsonrpc":"2.0","id":2,"result":{"thread":{"id":"thr-stream"}}}'`,
		`read line`,
		`echo '{"jsonrpc":"2.0","id":3,"result":{}}'`,
		`echo '{"jsonrpc":"2.0","method":"turn/started","params":{"threadId":"thr-stream","turn":{"id":"turn-stream"}}}'`,
		`echo '{"jsonrpc":"2.0","method":"item/started","params":{"threadId":"thr-stream","item":{"type":"commandExecution","id":"cmd-stream","command":"long task"}}}'`,
		`for i in 1 2 3 4 5 6; do sleep 0.08; echo '{"jsonrpc":"2.0","method":"item/commandExecution/outputDelta","params":{"threadId":"thr-stream","item":{"type":"commandExecution","id":"cmd-stream"},"delta":"tick"}}'; done`,
		`echo '{"jsonrpc":"2.0","method":"item/completed","params":{"threadId":"thr-stream","item":{"type":"commandExecution","id":"cmd-stream","aggregatedOutput":"done"}}}'`,
		`echo '{"jsonrpc":"2.0","method":"turn/completed","params":{"threadId":"thr-stream","turn":{"id":"turn-stream","status":"completed"}}}'`,
	}, "\n") + "\n"
	fakePath := writeFakeCodexAppServer(t, script)

	opts := ExecOptions{
		Timeout:                   5 * time.Second,
		SemanticInactivityTimeout: 3 * time.Second,
		ExecCommandStuckTimeout:   200 * time.Millisecond,
	}
	if result := executeFakeCodex(t, fakePath, opts); result.Status != "completed" {
		t.Fatalf("expected completed (outputDelta resets the watchdog), got status=%q error=%q", result.Status, result.Error)
	}
}
// TestCodexExecuteWatchdogIgnoresClosedExecs guards against a regression in
// which the watchdog keeps state for an exec call that has already
// completed and later reports it as stuck once a quiet period exceeds the
// threshold. The fixture starts and completes cmd-1 and then goes silent, so
// the only timer allowed to end the turn is the semantic inactivity one.
func TestCodexExecuteWatchdogIgnoresClosedExecs(t *testing.T) {
	t.Parallel()
	if runtime.GOOS == "windows" {
		t.Skip("shell-script fixture is POSIX-only")
	}

	// Each `read line` consumes one line from the client; the echoes are
	// the JSON-RPC responses (ids 1-3) followed by turn/item notifications.
	script := strings.Join([]string{
		`read line`,
		`echo '{"jsonrpc":"2.0","id":1,"result":{}}'`,
		`read line`,
		`read line`,
		`echo '{"jsonrpc":"2.0","id":2,"result":{"thread":{"id":"thr-clean"}}}'`,
		`read line`,
		`echo '{"jsonrpc":"2.0","id":3,"result":{}}'`,
		`echo '{"jsonrpc":"2.0","method":"turn/started","params":{"threadId":"thr-clean","turn":{"id":"turn-clean"}}}'`,
		`echo '{"jsonrpc":"2.0","method":"item/started","params":{"threadId":"thr-clean","item":{"type":"commandExecution","id":"cmd-1","command":"git status"}}}'`,
		`echo '{"jsonrpc":"2.0","method":"item/completed","params":{"threadId":"thr-clean","item":{"type":"commandExecution","id":"cmd-1","aggregatedOutput":"clean"}}}'`,
		`sleep 5`,
	}, "\n") + "\n"
	fakePath := writeFakeCodexAppServer(t, script)

	result := executeFakeCodex(t, fakePath, ExecOptions{
		Timeout:                   5 * time.Second,
		SemanticInactivityTimeout: 200 * time.Millisecond,
		// The exec stuck threshold is deliberately far above
		// SemanticInactivityTimeout: with nothing genuinely stuck the
		// semantic timer must win, and the wide gap avoids flakes when the
		// reader goroutine is scheduled late under parallel test load.
		ExecCommandStuckTimeout: 2 * time.Second,
	})

	switch {
	case result.Status != "timeout":
		t.Fatalf("expected timeout, got status=%q error=%q", result.Status, result.Error)
	case !strings.Contains(result.Error, "semantic inactivity"):
		t.Fatalf("expected semantic inactivity to fire (not exec stuck), got %q", result.Error)
	}
}
func writeFakeCodexAppServer(t *testing.T, body string) string {
t.Helper()
fakePath := filepath.Join(t.TempDir(), "codex")