fix: timeout stalled Codex turns (#1730)

* fix: timeout stalled codex turns

* fix: count codex progress events as activity
This commit is contained in:
dyjxg4xygary
2026-04-27 06:23:31 -04:00
committed by GitHub
parent 4c81fbed2b
commit 6bd5bbad9c
9 changed files with 542 additions and 122 deletions

View File

@@ -166,6 +166,7 @@ Daemon behavior is configured via flags or environment variables:
| Poll interval | `--poll-interval` | `MULTICA_DAEMON_POLL_INTERVAL` | `3s` |
| Heartbeat interval | `--heartbeat-interval` | `MULTICA_DAEMON_HEARTBEAT_INTERVAL` | `15s` |
| Agent timeout | `--agent-timeout` | `MULTICA_AGENT_TIMEOUT` | `2h` |
| Codex semantic inactivity timeout | `--codex-semantic-inactivity-timeout` | `MULTICA_CODEX_SEMANTIC_INACTIVITY_TIMEOUT` | `10m` |
| Max concurrent tasks | `--max-concurrent-tasks` | `MULTICA_DAEMON_MAX_CONCURRENT_TASKS` | `20` |
| Daemon ID | `--daemon-id` | `MULTICA_DAEMON_ID` | hostname |
| Device name | `--device-name` | `MULTICA_DAEMON_DEVICE_NAME` | hostname |

View File

@@ -65,6 +65,7 @@ func init() {
f.Duration("poll-interval", 0, "Task poll interval (env: MULTICA_DAEMON_POLL_INTERVAL)")
f.Duration("heartbeat-interval", 0, "Heartbeat interval (env: MULTICA_DAEMON_HEARTBEAT_INTERVAL)")
f.Duration("agent-timeout", 0, "Per-task timeout (env: MULTICA_AGENT_TIMEOUT)")
f.Duration("codex-semantic-inactivity-timeout", 0, "Codex semantic inactivity timeout (env: MULTICA_CODEX_SEMANTIC_INACTIVITY_TIMEOUT)")
f.Int("max-concurrent-tasks", 0, "Max tasks running in parallel (env: MULTICA_DAEMON_MAX_CONCURRENT_TASKS)")
daemonLogsCmd.Flags().BoolP("follow", "f", false, "Follow log output")
@@ -81,6 +82,7 @@ func init() {
rf.Duration("poll-interval", 0, "Task poll interval (env: MULTICA_DAEMON_POLL_INTERVAL)")
rf.Duration("heartbeat-interval", 0, "Heartbeat interval (env: MULTICA_DAEMON_HEARTBEAT_INTERVAL)")
rf.Duration("agent-timeout", 0, "Per-task timeout (env: MULTICA_AGENT_TIMEOUT)")
rf.Duration("codex-semantic-inactivity-timeout", 0, "Codex semantic inactivity timeout (env: MULTICA_CODEX_SEMANTIC_INACTIVITY_TIMEOUT)")
rf.Int("max-concurrent-tasks", 0, "Max tasks running in parallel (env: MULTICA_DAEMON_MAX_CONCURRENT_TASKS)")
daemonCmd.AddCommand(daemonStartCmd)
@@ -259,6 +261,9 @@ func buildDaemonStartArgs(cmd *cobra.Command) []string {
if d, _ := cmd.Flags().GetDuration("agent-timeout"); d > 0 {
args = append(args, "--agent-timeout", d.String())
}
if d, _ := cmd.Flags().GetDuration("codex-semantic-inactivity-timeout"); d > 0 {
args = append(args, "--codex-semantic-inactivity-timeout", d.String())
}
if n, _ := cmd.Flags().GetInt("max-concurrent-tasks"); n > 0 {
args = append(args, "--max-concurrent-tasks", strconv.Itoa(n))
}
@@ -300,6 +305,9 @@ func runDaemonForeground(cmd *cobra.Command) error {
if d, _ := cmd.Flags().GetDuration("agent-timeout"); d > 0 {
overrides.AgentTimeout = d
}
if d, _ := cmd.Flags().GetDuration("codex-semantic-inactivity-timeout"); d > 0 {
overrides.CodexSemanticInactivityTimeout = d
}
if n, _ := cmd.Flags().GetInt("max-concurrent-tasks"); n > 0 {
overrides.MaxConcurrentTasks = n
}

View File

@@ -11,57 +11,60 @@ import (
)
const (
DefaultServerURL = "ws://localhost:8080/ws"
DefaultPollInterval = 3 * time.Second
DefaultHeartbeatInterval = 15 * time.Second
DefaultAgentTimeout = 2 * time.Hour
DefaultRuntimeName = "Local Agent"
DefaultWorkspaceSyncInterval = 30 * time.Second
DefaultHealthPort = 19514
DefaultMaxConcurrentTasks = 20
DefaultGCInterval = 1 * time.Hour
DefaultGCTTL = 24 * time.Hour // 1 day — AI-coding issues rarely stay open long
DefaultGCOrphanTTL = 72 * time.Hour // 3 days — orphans with no meta (crashes, pre-GC leftovers)
DefaultServerURL = "ws://localhost:8080/ws"
DefaultPollInterval = 3 * time.Second
DefaultHeartbeatInterval = 15 * time.Second
DefaultAgentTimeout = 2 * time.Hour
DefaultCodexSemanticInactivityTimeout = 10 * time.Minute
DefaultRuntimeName = "Local Agent"
DefaultWorkspaceSyncInterval = 30 * time.Second
DefaultHealthPort = 19514
DefaultMaxConcurrentTasks = 20
DefaultGCInterval = 1 * time.Hour
DefaultGCTTL = 24 * time.Hour // 1 day — AI-coding issues rarely stay open long
DefaultGCOrphanTTL = 72 * time.Hour // 3 days — orphans with no meta (crashes, pre-GC leftovers)
)
// Config holds all daemon configuration.
type Config struct {
ServerBaseURL string
DaemonID string
LegacyDaemonIDs []string // historical daemon_ids this machine may have registered under; reported at register time so the server can merge old runtime rows
DeviceName string
RuntimeName string
CLIVersion string // multica CLI version (e.g. "0.1.13")
LaunchedBy string // "desktop" when spawned by the Electron app, empty for standalone
Profile string // profile name (empty = default)
Agents map[string]AgentEntry // keyed by provider: claude, codex, copilot, opencode, openclaw, hermes, gemini, pi, cursor, kimi
WorkspacesRoot string // base path for execution envs (default: ~/multica_workspaces)
KeepEnvAfterTask bool // preserve env after task for debugging
HealthPort int // local HTTP port for health checks (default: 19514)
MaxConcurrentTasks int // max tasks running in parallel (default: 20)
GCEnabled bool // enable periodic workspace garbage collection (default: true)
GCInterval time.Duration // how often the GC loop runs (default: 1h)
GCTTL time.Duration // clean dirs whose issue is done/canceled and updated_at < now()-TTL (default: 24h)
GCOrphanTTL time.Duration // clean orphan dirs with no meta older than this (default: 72h). Dirs whose issue returned 404 are cleaned immediately.
PollInterval time.Duration
HeartbeatInterval time.Duration
AgentTimeout time.Duration
ServerBaseURL string
DaemonID string
LegacyDaemonIDs []string // historical daemon_ids this machine may have registered under; reported at register time so the server can merge old runtime rows
DeviceName string
RuntimeName string
CLIVersion string // multica CLI version (e.g. "0.1.13")
LaunchedBy string // "desktop" when spawned by the Electron app, empty for standalone
Profile string // profile name (empty = default)
Agents map[string]AgentEntry // keyed by provider: claude, codex, copilot, opencode, openclaw, hermes, gemini, pi, cursor, kimi
WorkspacesRoot string // base path for execution envs (default: ~/multica_workspaces)
KeepEnvAfterTask bool // preserve env after task for debugging
HealthPort int // local HTTP port for health checks (default: 19514)
MaxConcurrentTasks int // max tasks running in parallel (default: 20)
GCEnabled bool // enable periodic workspace garbage collection (default: true)
GCInterval time.Duration // how often the GC loop runs (default: 1h)
GCTTL time.Duration // clean dirs whose issue is done/canceled and updated_at < now()-TTL (default: 24h)
GCOrphanTTL time.Duration // clean orphan dirs with no meta older than this (default: 72h). Dirs whose issue returned 404 are cleaned immediately.
PollInterval time.Duration
HeartbeatInterval time.Duration
AgentTimeout time.Duration
CodexSemanticInactivityTimeout time.Duration
}
// Overrides allows CLI flags to override environment variables and defaults.
// Zero values are ignored and the env/default value is used instead.
type Overrides struct {
ServerURL string
WorkspacesRoot string
PollInterval time.Duration
HeartbeatInterval time.Duration
AgentTimeout time.Duration
MaxConcurrentTasks int
DaemonID string
DeviceName string
RuntimeName string
Profile string // profile name (empty = default)
HealthPort int // health check port (0 = use default)
ServerURL string
WorkspacesRoot string
PollInterval time.Duration
HeartbeatInterval time.Duration
AgentTimeout time.Duration
CodexSemanticInactivityTimeout time.Duration
MaxConcurrentTasks int
DaemonID string
DeviceName string
RuntimeName string
Profile string // profile name (empty = default)
HealthPort int // health check port (0 = use default)
}
// LoadConfig builds the daemon configuration from environment variables
@@ -184,6 +187,14 @@ func LoadConfig(overrides Overrides) (Config, error) {
agentTimeout = overrides.AgentTimeout
}
codexSemanticInactivityTimeout, err := durationFromEnv("MULTICA_CODEX_SEMANTIC_INACTIVITY_TIMEOUT", DefaultCodexSemanticInactivityTimeout)
if err != nil {
return Config{}, err
}
if overrides.CodexSemanticInactivityTimeout > 0 {
codexSemanticInactivityTimeout = overrides.CodexSemanticInactivityTimeout
}
maxConcurrentTasks, err := intFromEnv("MULTICA_DAEMON_MAX_CONCURRENT_TASKS", DefaultMaxConcurrentTasks)
if err != nil {
return Config{}, err
@@ -289,24 +300,25 @@ func LoadConfig(overrides Overrides) (Config, error) {
}
return Config{
ServerBaseURL: serverBaseURL,
DaemonID: daemonID,
LegacyDaemonIDs: legacyDaemonIDs,
DeviceName: deviceName,
RuntimeName: runtimeName,
Profile: profile,
Agents: agents,
WorkspacesRoot: workspacesRoot,
KeepEnvAfterTask: keepEnv,
GCEnabled: gcEnabled,
GCInterval: gcInterval,
GCTTL: gcTTL,
GCOrphanTTL: gcOrphanTTL,
HealthPort: healthPort,
MaxConcurrentTasks: maxConcurrentTasks,
PollInterval: pollInterval,
HeartbeatInterval: heartbeatInterval,
AgentTimeout: agentTimeout,
ServerBaseURL: serverBaseURL,
DaemonID: daemonID,
LegacyDaemonIDs: legacyDaemonIDs,
DeviceName: deviceName,
RuntimeName: runtimeName,
Profile: profile,
Agents: agents,
WorkspacesRoot: workspacesRoot,
KeepEnvAfterTask: keepEnv,
GCEnabled: gcEnabled,
GCInterval: gcInterval,
GCTTL: gcTTL,
GCOrphanTTL: gcOrphanTTL,
HealthPort: healthPort,
MaxConcurrentTasks: maxConcurrentTasks,
PollInterval: pollInterval,
HeartbeatInterval: heartbeatInterval,
AgentTimeout: agentTimeout,
CodexSemanticInactivityTimeout: codexSemanticInactivityTimeout,
}, nil
}

View File

@@ -984,7 +984,11 @@ func (d *Daemon) handleTask(ctx context.Context, task Task) {
// have built a real session before getting stuck (rate-limit, tool
// error, etc.) and we want the next chat turn to resume there
// rather than start over and "forget" the conversation.
if err := d.client.FailTask(ctx, task.ID, result.Comment, result.SessionID, result.WorkDir, "agent_error"); err != nil {
failureReason := result.FailureReason
if failureReason == "" {
failureReason = "agent_error"
}
if err := d.client.FailTask(ctx, task.ID, result.Comment, result.SessionID, result.WorkDir, failureReason); err != nil {
taskLog.Error("report blocked task failed", "error", err)
}
default:
@@ -1174,12 +1178,13 @@ func (d *Daemon) runTask(ctx context.Context, task Task, provider string, taskLo
model = entry.Model
}
execOpts := agent.ExecOptions{
Cwd: env.WorkDir,
Model: model,
Timeout: d.cfg.AgentTimeout,
ResumeSessionID: task.PriorSessionID,
CustomArgs: customArgs,
McpConfig: mcpConfig,
Cwd: env.WorkDir,
Model: model,
Timeout: d.cfg.AgentTimeout,
SemanticInactivityTimeout: d.cfg.CodexSemanticInactivityTimeout,
ResumeSessionID: task.PriorSessionID,
CustomArgs: customArgs,
McpConfig: mcpConfig,
}
// openclaw loads its bootstrap files (AGENTS.md, SOUL.md, ...) from its own
// workspace dir rather than the task workdir, so the AGENTS.md written by
@@ -1264,13 +1269,18 @@ func (d *Daemon) runTask(ctx context.Context, task Task, provider string, taskLo
// in sync even when the agent times out after building a session.
// We mark as "blocked" (not a hard error return) so handleTask
// goes through the FailTask path that forwards session info.
comment := result.Error
if comment == "" {
comment = fmt.Sprintf("%s timed out after %s", provider, d.cfg.AgentTimeout)
}
return TaskResult{
Status: "blocked",
Comment: fmt.Sprintf("%s timed out after %s", provider, d.cfg.AgentTimeout),
SessionID: result.SessionID,
WorkDir: env.WorkDir,
EnvRoot: env.RootDir,
Usage: usageEntries,
Status: "blocked",
Comment: comment,
SessionID: result.SessionID,
WorkDir: env.WorkDir,
EnvRoot: env.RootDir,
FailureReason: "timeout",
Usage: usageEntries,
}, nil
case "cancelled":
// Server cancelled the task (e.g. issue reassignment, user cancel).
@@ -1363,6 +1373,8 @@ func (d *Daemon) executeAndDrain(ctx context.Context, backend agent.Backend, pro
sendCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
if err := d.client.ReportTaskMessages(sendCtx, taskID, toSend); err != nil {
taskLog.Debug("failed to report task messages", "error", err)
} else {
taskLog.Debug("reported task messages", "count", len(toSend), "last_seq", toSend[len(toSend)-1].Seq)
}
cancel()
}
@@ -1436,6 +1448,7 @@ func (d *Daemon) executeAndDrain(ctx context.Context, backend agent.Backend, pro
toolName = callIDToTool[msg.CallID]
mu.Unlock()
}
taskLog.Info("tool_result observed", "seq", s, "tool", toolName, "call_id", msg.CallID)
mu.Lock()
batch = append(batch, TaskMessageData{
Seq: int(s),

View File

@@ -10,10 +10,12 @@ import (
"os"
"os/exec"
"path/filepath"
"runtime"
"strings"
"sync"
"sync/atomic"
"testing"
"time"
"github.com/multica-ai/multica/server/internal/daemon/repocache"
"github.com/multica-ai/multica/server/pkg/agent"
@@ -421,6 +423,97 @@ func TestExecuteAndDrain_NoRetryWhenSessionEstablished(t *testing.T) {
}
}
// TestExecuteAndDrain_CodexInactivityReportsToolResultTranscript runs
// executeAndDrain against a fake codex executable that reports one command
// execution (item/started followed by item/completed) and then goes silent.
// It checks that the run ends with status "timeout", that exactly one tool
// use is counted, and that both the tool_use and the tool_result messages are
// delivered to the task-messages HTTP endpoint.
func TestExecuteAndDrain_CodexInactivityReportsToolResultTranscript(t *testing.T) {
if runtime.GOOS == "windows" {
t.Skip("shell-script fixture is POSIX-only")
}
// Fake codex: replies to JSON-RPC requests with ids 1, 2 and 3 (each
// `read line` consumes one line of stdin first), announces the turn, emits a
// commandExecution start/completion pair whose output is "clean", and then
// sleeps for 5s without ever sending turn/completed.
fakePath := filepath.Join(t.TempDir(), "codex")
script := "#!/bin/sh\n" +
`read line` + "\n" +
`echo '{"jsonrpc":"2.0","id":1,"result":{}}'` + "\n" +
`read line` + "\n" +
`read line` + "\n" +
`echo '{"jsonrpc":"2.0","id":2,"result":{"thread":{"id":"thr-drain"}}}'` + "\n" +
`read line` + "\n" +
`echo '{"jsonrpc":"2.0","id":3,"result":{}}'` + "\n" +
`echo '{"jsonrpc":"2.0","method":"turn/started","params":{"threadId":"thr-drain","turn":{"id":"turn-drain"}}}'` + "\n" +
`echo '{"jsonrpc":"2.0","method":"item/started","params":{"threadId":"thr-drain","item":{"type":"commandExecution","id":"cmd-1","command":"git status"}}}'` + "\n" +
`echo '{"jsonrpc":"2.0","method":"item/completed","params":{"threadId":"thr-drain","item":{"type":"commandExecution","id":"cmd-1","aggregatedOutput":"clean"}}}'` + "\n" +
`sleep 5` + "\n"
if err := os.WriteFile(fakePath, []byte(script), 0o755); err != nil {
t.Fatalf("write fake codex: %v", err)
}
// NOTE(review): the explicit Chmod presumably guards against a restrictive
// umask stripping the execute bits requested from WriteFile — confirm.
if err := os.Chmod(fakePath, 0o755); err != nil {
t.Fatalf("chmod fake codex: %v", err)
}
// reported accumulates every message the daemon client posts; mu guards it
// because the HTTP handler runs on the test server's goroutines.
var mu sync.Mutex
var reported []TaskMessageData
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
// Only the task-messages endpoint for "task-stale" is served; anything
// else is a 404.
if r.URL.Path != "/api/daemon/tasks/task-stale/messages" {
http.NotFound(w, r)
return
}
var body struct {
Messages []TaskMessageData `json:"messages"`
}
if err := json.NewDecoder(r.Body).Decode(&body); err != nil {
t.Errorf("decode task messages: %v", err)
http.Error(w, "bad request", http.StatusBadRequest)
return
}
mu.Lock()
reported = append(reported, body.Messages...)
mu.Unlock()
w.WriteHeader(http.StatusOK)
}))
t.Cleanup(srv.Close)
backend, err := agent.New("codex", agent.Config{ExecutablePath: fakePath, Logger: slog.Default()})
if err != nil {
t.Fatalf("new codex backend: %v", err)
}
d := &Daemon{client: NewClient(srv.URL), logger: slog.Default()}
// The semantic inactivity timeout (100ms) is far shorter than both the
// overall Timeout and the script's trailing sleep (5s each).
result, tools, err := d.executeAndDrain(context.Background(), backend, "prompt", agent.ExecOptions{
Timeout: 5 * time.Second,
SemanticInactivityTimeout: 100 * time.Millisecond,
}, slog.Default(), "task-stale")
if err != nil {
t.Fatalf("executeAndDrain: %v", err)
}
if result.Status != "timeout" {
t.Fatalf("expected timeout, got status=%q error=%q", result.Status, result.Error)
}
if tools != 1 {
t.Fatalf("expected one tool use, got %d", tools)
}
// Poll the captured messages for up to 2s instead of asserting immediately.
// NOTE(review): presumably message reporting can lag behind the return of
// executeAndDrain — confirm against the flush logic.
deadline := time.Now().Add(2 * time.Second)
for {
mu.Lock()
var gotToolUse, gotToolResult bool
for _, msg := range reported {
if msg.Seq == 1 && msg.Type == "tool_use" && msg.Tool == "exec_command" {
gotToolUse = true
}
if msg.Seq == 2 && msg.Type == "tool_result" && msg.Tool == "exec_command" && msg.Output == "clean" {
gotToolResult = true
}
}
mu.Unlock()
if gotToolUse && gotToolResult {
return
}
if time.Now().After(deadline) {
// Hold the lock while formatting reported for the failure message.
mu.Lock()
defer mu.Unlock()
t.Fatalf("expected tool_use seq=1 and tool_result seq=2 in transcript, got %+v", reported)
}
time.Sleep(10 * time.Millisecond)
}
}
// blockingBackend returns a Session whose Result channel is never written to,
// so executeAndDrain can only exit via the drainCtx.Done() path.
type blockingBackend struct{}

View File

@@ -85,12 +85,13 @@ type TaskUsageEntry struct {
// TaskResult is the outcome of executing a task.
type TaskResult struct {
Status string `json:"status"`
Comment string `json:"comment"`
BranchName string `json:"branch_name,omitempty"`
EnvType string `json:"env_type,omitempty"`
SessionID string `json:"session_id,omitempty"` // Claude session ID for future resumption
WorkDir string `json:"work_dir,omitempty"` // working directory used during execution
EnvRoot string `json:"-"` // env root dir for writing GC metadata (not sent to server)
Usage []TaskUsageEntry `json:"usage,omitempty"` // per-model token usage
Status string `json:"status"`
Comment string `json:"comment"`
BranchName string `json:"branch_name,omitempty"`
EnvType string `json:"env_type,omitempty"`
SessionID string `json:"session_id,omitempty"` // Claude session ID for future resumption
WorkDir string `json:"work_dir,omitempty"` // working directory used during execution
EnvRoot string `json:"-"` // env root dir for writing GC metadata (not sent to server)
FailureReason string `json:"-"` // internal server failure classification
Usage []TaskUsageEntry `json:"usage,omitempty"` // per-model token usage
}

View File

@@ -22,14 +22,15 @@ type Backend interface {
// ExecOptions configures a single execution.
type ExecOptions struct {
Cwd string
Model string
SystemPrompt string
MaxTurns int
Timeout time.Duration
ResumeSessionID string // if non-empty, resume a previous agent session
CustomArgs []string // additional CLI arguments appended to the agent command
McpConfig json.RawMessage // if non-nil, MCP server config to pass via --mcp-config
Cwd string
Model string
SystemPrompt string
MaxTurns int
Timeout time.Duration
SemanticInactivityTimeout time.Duration
ResumeSessionID string // if non-empty, resume a previous agent session
CustomArgs []string // additional CLI arguments appended to the agent command
McpConfig json.RawMessage // if non-nil, MCP server config to pass via --mcp-config
}
// Session represents a running agent execution.

View File

@@ -25,7 +25,10 @@ var codexBlockedArgs = map[string]blockedArgMode{
// user supplied a custom_args flag that the `app-server` subcommand
// rejects). Kept as its own constant so bumping codex independently of
// other agents stays easy if codex starts shipping longer failure traces.
const codexStderrTailBytes = 2048
const (
codexStderrTailBytes = 2048
defaultCodexSemanticInactivityTimeout = 10 * time.Minute
)
// codexBackend implements Backend by spawning `codex app-server --listen stdio://`
// and communicating via JSON-RPC 2.0 over stdin/stdout.
@@ -46,6 +49,10 @@ func (b *codexBackend) Execute(ctx context.Context, prompt string, opts ExecOpti
if timeout == 0 {
timeout = 20 * time.Minute
}
semanticInactivityTimeout := opts.SemanticInactivityTimeout
if semanticInactivityTimeout == 0 {
semanticInactivityTimeout = defaultCodexSemanticInactivityTimeout
}
runCtx, cancel := context.WithTimeout(ctx, timeout)
codexArgs := append([]string{"app-server", "--listen", "stdio://"}, filterCustomArgs(opts.CustomArgs, codexBlockedArgs, b.cfg.Logger)...)
@@ -79,6 +86,7 @@ func (b *codexBackend) Execute(ctx context.Context, prompt string, opts ExecOpti
msgCh := make(chan Message, 256)
resCh := make(chan Result, 1)
semanticActivityCh := make(chan string, 256)
var outputMu sync.Mutex
var output strings.Builder
@@ -93,12 +101,18 @@ func (b *codexBackend) Execute(ctx context.Context, prompt string, opts ExecOpti
pending: make(map[int]*pendingRPC),
notificationProtocol: "unknown",
onMessage: func(msg Message) {
logCodexAgentMessage(b.cfg.Logger, msg)
if msg.Type == MessageText {
outputMu.Lock()
output.WriteString(msg.Content)
outputMu.Unlock()
}
trySend(msgCh, msg)
trySendString(semanticActivityCh, describeCodexSemanticActivity(msg))
},
onSemanticActivity: func(description string) {
b.cfg.Logger.Debug("codex semantic activity observed", "activity", description)
trySendString(semanticActivityCh, description)
},
onTurnDone: func(aborted bool) {
select {
@@ -207,26 +221,51 @@ func (b *codexBackend) Execute(ctx context.Context, prompt string, opts ExecOpti
return
}
// Wait for turn completion or context cancellation
select {
case aborted := <-turnDone:
switch {
case aborted:
finalStatus = "aborted"
finalError = "turn was aborted"
default:
if errMsg := c.getTurnError(); errMsg != "" {
finalStatus = "failed"
finalError = errMsg
lastSemanticActivity := time.Now()
lastSemanticActivityDescription := "turn/start"
semanticTimer := time.NewTimer(semanticInactivityTimeout)
defer semanticTimer.Stop()
waitingForTurn := true
for waitingForTurn {
select {
case aborted := <-turnDone:
waitingForTurn = false
switch {
case aborted:
finalStatus = "aborted"
finalError = "turn was aborted"
default:
if errMsg := c.getTurnError(); errMsg != "" {
finalStatus = "failed"
finalError = errMsg
}
}
}
case <-runCtx.Done():
if runCtx.Err() == context.DeadlineExceeded {
case activity := <-semanticActivityCh:
lastSemanticActivity = time.Now()
lastSemanticActivityDescription = activity
resetTimer(semanticTimer, semanticInactivityTimeout)
case <-semanticTimer.C:
waitingForTurn = false
finalStatus = "timeout"
finalError = fmt.Sprintf("codex timed out after %s", timeout)
} else {
finalStatus = "aborted"
finalError = "execution cancelled"
finalError = fmt.Sprintf("codex semantic inactivity timeout after %s without agent progress (last activity: %s)", semanticInactivityTimeout, lastSemanticActivityDescription)
b.cfg.Logger.Warn("codex semantic inactivity timeout",
"pid", cmd.Process.Pid,
"thread_id", threadID,
"turn_id", c.turnID,
"timeout", semanticInactivityTimeout.String(),
"last_activity", lastSemanticActivityDescription,
"idle_for", time.Since(lastSemanticActivity).Round(time.Millisecond).String(),
)
case <-runCtx.Done():
waitingForTurn = false
if runCtx.Err() == context.DeadlineExceeded {
finalStatus = "timeout"
finalError = fmt.Sprintf("codex timed out after %s", timeout)
} else {
finalStatus = "aborted"
finalError = "execution cancelled"
}
}
}
@@ -337,18 +376,68 @@ func (c *codexClient) startOrResumeThread(ctx context.Context, opts ExecOptions,
return threadID, false, nil
}
// resetTimer re-arms timer so that it fires d from now.
//
// The timer is stopped first. If Stop reports that the timer was no longer
// active, a tick may still be sitting in timer.C, so the channel is drained
// with a non-blocking receive before Reset; otherwise a later receive could
// observe that stale tick instead of the new deadline.
func resetTimer(timer *time.Timer, d time.Duration) {
	// Re-arm on every exit path, after the optional drain below.
	defer timer.Reset(d)

	if timer.Stop() {
		return
	}

	// Non-blocking: the tick may already have been consumed by the caller.
	select {
	case <-timer.C:
	default:
	}
}
// trySendString attempts to deliver value on ch without ever blocking.
// When the send cannot proceed immediately (for example the buffer is full
// or no receiver is ready), the value is silently dropped.
func trySendString(ch chan<- string, value string) {
	select {
	case ch <- value:
		// Delivered.
	default:
		// Send would block: drop the value instead of stalling the caller.
	}
}
// logCodexAgentMessage writes an Info-level summary of msg to logger: its
// type, tool, call ID, status, and the lengths of its content and output
// (the payloads themselves are not logged). Tool-result messages get one
// additional dedicated Info line. A nil logger makes this a no-op.
func logCodexAgentMessage(logger *slog.Logger, msg Message) {
	if logger == nil {
		return
	}

	logger.Info("codex agent message received",
		"type", string(msg.Type),
		"tool", msg.Tool,
		"call_id", msg.CallID,
		"status", msg.Status,
		"content_len", len(msg.Content),
		"output_len", len(msg.Output),
	)

	if msg.Type != MessageToolResult {
		return
	}
	logger.Info("codex tool_result observed",
		"tool", msg.Tool,
		"call_id", msg.CallID,
		"output_len", len(msg.Output),
	)
}
// describeCodexSemanticActivity returns a short label for msg.
//
// Tool-use and tool-result messages with a non-empty tool name yield
// "<type>:<tool>"; status messages with a non-empty status yield
// "<type>:<status>". Every other message yields just its type.
func describeCodexSemanticActivity(msg Message) string {
	kind := msg.Type

	isToolMessage := kind == MessageToolUse || kind == MessageToolResult
	if isToolMessage && msg.Tool != "" {
		return fmt.Sprintf("%s:%s", kind, msg.Tool)
	}
	if kind == MessageStatus && msg.Status != "" {
		return fmt.Sprintf("%s:%s", kind, msg.Status)
	}

	// No qualifying detail available: fall back to the bare message type.
	return string(kind)
}
// ── codexClient: JSON-RPC 2.0 transport ──
type codexClient struct {
cfg Config
stdin interface{ Write([]byte) (int, error) }
mu sync.Mutex
nextID int
pending map[int]*pendingRPC
threadID string
turnID string
onMessage func(Message)
onTurnDone func(aborted bool)
cfg Config
stdin interface{ Write([]byte) (int, error) }
mu sync.Mutex
nextID int
pending map[int]*pendingRPC
threadID string
turnID string
onMessage func(Message)
onSemanticActivity func(description string)
onTurnDone func(aborted bool)
notificationProtocol string // "unknown", "legacy", "raw"
turnStarted bool
@@ -416,6 +505,13 @@ func (c *codexClient) request(ctx context.Context, method string, params any) (j
c.mu.Unlock()
return nil, fmt.Errorf("write %s: %w", method, err)
}
if method == "turn/start" {
threadID := ""
if paramMap, ok := params.(map[string]any); ok {
threadID, _ = paramMap["threadId"].(string)
}
c.cfg.Logger.Info("codex turn/start sent", "request_id", id, "thread_id", threadID)
}
select {
case res := <-pr.ch:
@@ -666,6 +762,8 @@ func (c *codexClient) handleRawNotification(method string, params map[string]any
case "turn/completed":
turnID := extractNestedString(params, "turn", "id")
status := extractNestedString(params, "turn", "status")
threadID, _ := params["threadId"].(string)
c.cfg.Logger.Info("codex turn/completed received", "thread_id", threadID, "turn_id", turnID, "status", status)
aborted := status == "cancelled" || status == "canceled" ||
status == "aborted" || status == "interrupted"
@@ -730,13 +828,15 @@ func (c *codexClient) handleRawNotification(method string, params map[string]any
}
func (c *codexClient) handleItemNotification(method string, params map[string]any) {
item, ok := params["item"].(map[string]any)
if !ok {
return
}
item, _ := params["item"].(map[string]any)
itemType, _ := item["type"].(string)
itemID, _ := item["id"].(string)
if isCodexItemProgressActivity(method) && c.onSemanticActivity != nil {
c.onSemanticActivity(describeCodexItemProgressActivity(method, itemType, itemID))
}
if item == nil {
return
}
switch {
case method == "item/started" && itemType == "commandExecution":
@@ -793,6 +893,28 @@ func (c *codexClient) handleItemNotification(method string, params map[string]an
}
}
// isCodexItemProgressActivity reports whether method is one of the item
// delta/progress notification methods: agent message deltas, command
// execution output deltas, file change output deltas, or MCP tool call
// progress. Any other method name returns false.
func isCodexItemProgressActivity(method string) bool {
	switch {
	case method == "item/agentMessage/delta":
	case method == "item/commandExecution/outputDelta":
	case method == "item/fileChange/outputDelta":
	case method == "item/mcpToolCall/progress":
	default:
		return false
	}
	return true
}
// describeCodexItemProgressActivity builds a colon-separated label for an
// item progress notification: "<method>:<itemType>:<itemID>", or
// "<method>:<itemType>" when itemID is empty. An empty itemType is rendered
// as "unknown".
func describeCodexItemProgressActivity(method, itemType, itemID string) string {
	kind := itemType
	if kind == "" {
		kind = "unknown"
	}
	if itemID != "" {
		return fmt.Sprintf("%s:%s:%s", method, kind, itemID)
	}
	return fmt.Sprintf("%s:%s", method, kind)
}
// extractUsageFromMap extracts token usage from a map that may contain
// "usage", "token_usage", or "tokens" fields. Handles various Codex formats.
func (c *codexClient) extractUsageFromMap(data map[string]any) {

View File

@@ -1011,6 +1011,175 @@ func TestCodexExecuteSurfacesStderrWhenChildExitsEarly(t *testing.T) {
}
}
// TestCodexExecuteTimesOutWhenTurnStopsAfterToolResult verifies that a turn
// which reports a completed command execution and then goes silent ends with
// status "timeout" and a "semantic inactivity" error, and that the thread ID
// announced by the fake server is still returned as the session ID.
func TestCodexExecuteTimesOutWhenTurnStopsAfterToolResult(t *testing.T) {
t.Parallel()
if runtime.GOOS == "windows" {
t.Skip("shell-script fixture is POSIX-only")
}
// Fake app-server: replies to JSON-RPC requests with ids 1, 2 and 3 (each
// `read line` consumes one line of stdin first), announces the turn, emits
// one commandExecution start/completion pair, then sleeps for 5s without
// ever sending turn/completed.
fakePath := writeFakeCodexAppServer(t, ""+
`read line`+"\n"+
`echo '{"jsonrpc":"2.0","id":1,"result":{}}'`+"\n"+
`read line`+"\n"+
`read line`+"\n"+
`echo '{"jsonrpc":"2.0","id":2,"result":{"thread":{"id":"thr-stale"}}}'`+"\n"+
`read line`+"\n"+
`echo '{"jsonrpc":"2.0","id":3,"result":{}}'`+"\n"+
`echo '{"jsonrpc":"2.0","method":"turn/started","params":{"threadId":"thr-stale","turn":{"id":"turn-stale"}}}'`+"\n"+
`echo '{"jsonrpc":"2.0","method":"item/started","params":{"threadId":"thr-stale","item":{"type":"commandExecution","id":"cmd-1","command":"git status"}}}'`+"\n"+
`echo '{"jsonrpc":"2.0","method":"item/completed","params":{"threadId":"thr-stale","item":{"type":"commandExecution","id":"cmd-1","aggregatedOutput":"clean"}}}'`+"\n"+
`sleep 5`+"\n")
// The inactivity timeout (100ms) is much shorter than the overall Timeout
// (5s); the error-text assertion below pins which of the two fired.
result := executeFakeCodex(t, fakePath, ExecOptions{
Timeout: 5 * time.Second,
SemanticInactivityTimeout: 100 * time.Millisecond,
})
if result.Status != "timeout" {
t.Fatalf("expected timeout, got status=%q error=%q", result.Status, result.Error)
}
if !strings.Contains(result.Error, "semantic inactivity") {
t.Fatalf("expected semantic inactivity error, got %q", result.Error)
}
if result.SessionID != "thr-stale" {
t.Fatalf("expected session id to be preserved, got %q", result.SessionID)
}
}
// TestCodexExecuteSemanticInactivityAllowsContinuousMessages verifies that a
// turn which keeps emitting item/completed notifications is not cut off by
// the semantic inactivity timeout: the run must finish as "completed" and the
// agent message text must appear in the output.
func TestCodexExecuteSemanticInactivityAllowsContinuousMessages(t *testing.T) {
t.Parallel()
if runtime.GOOS == "windows" {
t.Skip("shell-script fixture is POSIX-only")
}
// Fake app-server: after the JSON-RPC handshake (ids 1-3) it sends an
// agentMessage completion, a commandExecution completion and finally
// turn/completed, each preceded by a 0.05s sleep.
fakePath := writeFakeCodexAppServer(t, ""+
`read line`+"\n"+
`echo '{"jsonrpc":"2.0","id":1,"result":{}}'`+"\n"+
`read line`+"\n"+
`read line`+"\n"+
`echo '{"jsonrpc":"2.0","id":2,"result":{"thread":{"id":"thr-progress"}}}'`+"\n"+
`read line`+"\n"+
`echo '{"jsonrpc":"2.0","id":3,"result":{}}'`+"\n"+
`echo '{"jsonrpc":"2.0","method":"turn/started","params":{"threadId":"thr-progress","turn":{"id":"turn-progress"}}}'`+"\n"+
`sleep 0.05`+"\n"+
`echo '{"jsonrpc":"2.0","method":"item/completed","params":{"threadId":"thr-progress","item":{"type":"agentMessage","id":"msg-1","text":"still working"}}}'`+"\n"+
`sleep 0.05`+"\n"+
`echo '{"jsonrpc":"2.0","method":"item/completed","params":{"threadId":"thr-progress","item":{"type":"commandExecution","id":"cmd-1","aggregatedOutput":"ok"}}}'`+"\n"+
`sleep 0.05`+"\n"+
`echo '{"jsonrpc":"2.0","method":"turn/completed","params":{"threadId":"thr-progress","turn":{"id":"turn-progress","status":"completed"}}}'`+"\n")
// Each gap (about 50ms) is shorter than the 90ms inactivity timeout, while
// the three gaps together (about 150ms) are longer, so the turn can only
// complete if every message restarts the inactivity window.
result := executeFakeCodex(t, fakePath, ExecOptions{
Timeout: 5 * time.Second,
SemanticInactivityTimeout: 90 * time.Millisecond,
})
if result.Status != "completed" {
t.Fatalf("expected completed, got status=%q error=%q", result.Status, result.Error)
}
if !strings.Contains(result.Output, "still working") {
t.Fatalf("expected streamed text in output, got %q", result.Output)
}
}
// TestCodexExecuteSemanticInactivityAllowsContinuousDeltaProgress verifies
// that delta/progress notifications alone (command output delta, agent
// message delta, file change output delta, MCP tool call progress) keep a
// turn alive: the run must finish as "completed" rather than time out.
func TestCodexExecuteSemanticInactivityAllowsContinuousDeltaProgress(t *testing.T) {
t.Parallel()
if runtime.GOOS == "windows" {
t.Skip("shell-script fixture is POSIX-only")
}
// Fake app-server: after the JSON-RPC handshake (ids 1-3) it emits four
// delta/progress notifications and then turn/completed, each preceded by a
// 0.05s sleep. No item/started or item/completed notification is sent.
fakePath := writeFakeCodexAppServer(t, ""+
`read line`+"\n"+
`echo '{"jsonrpc":"2.0","id":1,"result":{}}'`+"\n"+
`read line`+"\n"+
`read line`+"\n"+
`echo '{"jsonrpc":"2.0","id":2,"result":{"thread":{"id":"thr-delta"}}}'`+"\n"+
`read line`+"\n"+
`echo '{"jsonrpc":"2.0","id":3,"result":{}}'`+"\n"+
`echo '{"jsonrpc":"2.0","method":"turn/started","params":{"threadId":"thr-delta","turn":{"id":"turn-delta"}}}'`+"\n"+
`sleep 0.05`+"\n"+
`echo '{"jsonrpc":"2.0","method":"item/commandExecution/outputDelta","params":{"threadId":"thr-delta","item":{"type":"commandExecution","id":"cmd-1"},"delta":"line 1\n"}}'`+"\n"+
`sleep 0.05`+"\n"+
`echo '{"jsonrpc":"2.0","method":"item/agentMessage/delta","params":{"threadId":"thr-delta","item":{"type":"agentMessage","id":"msg-1"},"delta":"thinking"}}'`+"\n"+
`sleep 0.05`+"\n"+
`echo '{"jsonrpc":"2.0","method":"item/fileChange/outputDelta","params":{"threadId":"thr-delta","item":{"type":"fileChange","id":"patch-1"},"delta":"patched"}}'`+"\n"+
`sleep 0.05`+"\n"+
`echo '{"jsonrpc":"2.0","method":"item/mcpToolCall/progress","params":{"threadId":"thr-delta","item":{"type":"mcpToolCall","id":"mcp-1"},"progress":{"message":"still running"}}}'`+"\n"+
`sleep 0.05`+"\n"+
`echo '{"jsonrpc":"2.0","method":"turn/completed","params":{"threadId":"thr-delta","turn":{"id":"turn-delta","status":"completed"}}}'`+"\n")
// Five gaps of about 50ms add up to roughly 250ms, which exceeds the 150ms
// inactivity timeout; the turn can only complete if the delta/progress
// notifications restart the inactivity window.
result := executeFakeCodex(t, fakePath, ExecOptions{
Timeout: 5 * time.Second,
SemanticInactivityTimeout: 150 * time.Millisecond,
})
if result.Status != "completed" {
t.Fatalf("expected completed, got status=%q error=%q", result.Status, result.Error)
}
}
// TestCodexExecuteSemanticInactivityDoesNotAffectNormalTurnCompletion
// verifies that a turn which completes straight away still yields status
// "completed" and the exact output "Done" when a short semantic inactivity
// timeout is configured.
func TestCodexExecuteSemanticInactivityDoesNotAffectNormalTurnCompletion(t *testing.T) {
t.Parallel()
if runtime.GOOS == "windows" {
t.Skip("shell-script fixture is POSIX-only")
}
// Fake app-server: after the JSON-RPC handshake (ids 1-3) it sends
// turn/started, one agentMessage completion with text "Done" and
// turn/completed back to back, with no sleeps in between.
fakePath := writeFakeCodexAppServer(t, ""+
`read line`+"\n"+
`echo '{"jsonrpc":"2.0","id":1,"result":{}}'`+"\n"+
`read line`+"\n"+
`read line`+"\n"+
`echo '{"jsonrpc":"2.0","id":2,"result":{"thread":{"id":"thr-normal"}}}'`+"\n"+
`read line`+"\n"+
`echo '{"jsonrpc":"2.0","id":3,"result":{}}'`+"\n"+
`echo '{"jsonrpc":"2.0","method":"turn/started","params":{"threadId":"thr-normal","turn":{"id":"turn-normal"}}}'`+"\n"+
`echo '{"jsonrpc":"2.0","method":"item/completed","params":{"threadId":"thr-normal","item":{"type":"agentMessage","id":"msg-1","text":"Done"}}}'`+"\n"+
`echo '{"jsonrpc":"2.0","method":"turn/completed","params":{"threadId":"thr-normal","turn":{"id":"turn-normal","status":"completed"}}}'`+"\n")
result := executeFakeCodex(t, fakePath, ExecOptions{
Timeout: 5 * time.Second,
SemanticInactivityTimeout: 100 * time.Millisecond,
})
if result.Status != "completed" {
t.Fatalf("expected completed, got status=%q error=%q", result.Status, result.Error)
}
if result.Output != "Done" {
t.Fatalf("expected output Done, got %q", result.Output)
}
}
// writeFakeCodexAppServer writes a /bin/sh script named "codex" into a fresh
// per-test temporary directory and returns its path. body is appended
// verbatim after the shebang line.
func writeFakeCodexAppServer(t *testing.T, body string) string {
	t.Helper()

	const shebang = "#!/bin/sh\n"

	fakePath := filepath.Join(t.TempDir(), "codex")
	writeTestExecutable(t, fakePath, []byte(shebang+body))
	return fakePath
}
// executeFakeCodex runs the codex backend against the executable at fakePath
// with the prompt "prompt" and the given opts, and returns the Result the
// session delivers.
//
// Session messages are discarded by a background goroutine. The test fails
// if the backend cannot be created, Execute returns an error, the result
// channel is closed without a value, or no result arrives within 10 seconds.
func executeFakeCodex(t *testing.T, fakePath string, opts ExecOptions) Result {
	t.Helper()

	backend, err := New("codex", Config{ExecutablePath: fakePath, Logger: slog.Default()})
	if err != nil {
		t.Fatalf("new codex backend: %v", err)
	}

	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()

	session, err := backend.Execute(ctx, "prompt", opts)
	if err != nil {
		t.Fatalf("execute: %v", err)
	}

	// Drain messages; only the final result matters to callers.
	go func() {
		for range session.Messages {
		}
	}()

	watchdog := time.NewTimer(10 * time.Second)
	defer watchdog.Stop()

	select {
	case <-watchdog.C:
		t.Fatal("timeout waiting for result")
	case result, ok := <-session.Result:
		if ok {
			return result
		}
		t.Fatal("result channel closed without a value")
	}
	return Result{}
}
func TestWithAgentStderrAppendsHint(t *testing.T) {
t.Parallel()