diff --git a/CLI_AND_DAEMON.md b/CLI_AND_DAEMON.md index 2144d3cbf..c8c43fb72 100644 --- a/CLI_AND_DAEMON.md +++ b/CLI_AND_DAEMON.md @@ -166,6 +166,7 @@ Daemon behavior is configured via flags or environment variables: | Poll interval | `--poll-interval` | `MULTICA_DAEMON_POLL_INTERVAL` | `3s` | | Heartbeat interval | `--heartbeat-interval` | `MULTICA_DAEMON_HEARTBEAT_INTERVAL` | `15s` | | Agent timeout | `--agent-timeout` | `MULTICA_AGENT_TIMEOUT` | `2h` | +| Codex semantic inactivity timeout | `--codex-semantic-inactivity-timeout` | `MULTICA_CODEX_SEMANTIC_INACTIVITY_TIMEOUT` | `10m` | | Max concurrent tasks | `--max-concurrent-tasks` | `MULTICA_DAEMON_MAX_CONCURRENT_TASKS` | `20` | | Daemon ID | `--daemon-id` | `MULTICA_DAEMON_ID` | hostname | | Device name | `--device-name` | `MULTICA_DAEMON_DEVICE_NAME` | hostname | diff --git a/server/cmd/multica/cmd_daemon.go b/server/cmd/multica/cmd_daemon.go index fc5fbd82a..604bb200a 100644 --- a/server/cmd/multica/cmd_daemon.go +++ b/server/cmd/multica/cmd_daemon.go @@ -65,6 +65,7 @@ func init() { f.Duration("poll-interval", 0, "Task poll interval (env: MULTICA_DAEMON_POLL_INTERVAL)") f.Duration("heartbeat-interval", 0, "Heartbeat interval (env: MULTICA_DAEMON_HEARTBEAT_INTERVAL)") f.Duration("agent-timeout", 0, "Per-task timeout (env: MULTICA_AGENT_TIMEOUT)") + f.Duration("codex-semantic-inactivity-timeout", 0, "Codex semantic inactivity timeout (env: MULTICA_CODEX_SEMANTIC_INACTIVITY_TIMEOUT)") f.Int("max-concurrent-tasks", 0, "Max tasks running in parallel (env: MULTICA_DAEMON_MAX_CONCURRENT_TASKS)") daemonLogsCmd.Flags().BoolP("follow", "f", false, "Follow log output") @@ -81,6 +82,7 @@ func init() { rf.Duration("poll-interval", 0, "Task poll interval (env: MULTICA_DAEMON_POLL_INTERVAL)") rf.Duration("heartbeat-interval", 0, "Heartbeat interval (env: MULTICA_DAEMON_HEARTBEAT_INTERVAL)") rf.Duration("agent-timeout", 0, "Per-task timeout (env: MULTICA_AGENT_TIMEOUT)") + rf.Duration("codex-semantic-inactivity-timeout", 0, "Codex semantic inactivity timeout (env: MULTICA_CODEX_SEMANTIC_INACTIVITY_TIMEOUT)") rf.Int("max-concurrent-tasks", 0, "Max tasks running in parallel (env: MULTICA_DAEMON_MAX_CONCURRENT_TASKS)") daemonCmd.AddCommand(daemonStartCmd) @@ -259,6 +261,9 @@ func buildDaemonStartArgs(cmd *cobra.Command) []string { if d, _ := cmd.Flags().GetDuration("agent-timeout"); d > 0 { args = append(args, "--agent-timeout", d.String()) } + if d, _ := cmd.Flags().GetDuration("codex-semantic-inactivity-timeout"); d > 0 { + args = append(args, "--codex-semantic-inactivity-timeout", d.String()) + } if n, _ := cmd.Flags().GetInt("max-concurrent-tasks"); n > 0 { args = append(args, "--max-concurrent-tasks", strconv.Itoa(n)) } @@ -300,6 +305,9 @@ func runDaemonForeground(cmd *cobra.Command) error { if d, _ := cmd.Flags().GetDuration("agent-timeout"); d > 0 { overrides.AgentTimeout = d } + if d, _ := cmd.Flags().GetDuration("codex-semantic-inactivity-timeout"); d > 0 { + overrides.CodexSemanticInactivityTimeout = d + } if n, _ := cmd.Flags().GetInt("max-concurrent-tasks"); n > 0 { overrides.MaxConcurrentTasks = n } diff --git a/server/internal/daemon/config.go b/server/internal/daemon/config.go index fbce1698b..e52ebf386 100644 --- a/server/internal/daemon/config.go +++ b/server/internal/daemon/config.go @@ -11,57 +11,60 @@ import ( ) const ( - DefaultServerURL = "ws://localhost:8080/ws" - DefaultPollInterval = 3 * time.Second - DefaultHeartbeatInterval = 15 * time.Second - DefaultAgentTimeout = 2 * time.Hour - DefaultRuntimeName = "Local Agent" - DefaultWorkspaceSyncInterval = 30 * time.Second - DefaultHealthPort = 19514 - DefaultMaxConcurrentTasks = 20 - DefaultGCInterval = 1 * time.Hour - DefaultGCTTL = 24 * time.Hour // 1 day — AI-coding issues rarely stay open long - DefaultGCOrphanTTL = 72 * time.Hour // 3 days — orphans with no meta (crashes, pre-GC leftovers) + DefaultServerURL = "ws://localhost:8080/ws" + DefaultPollInterval = 3 * time.Second + DefaultHeartbeatInterval = 15 * time.Second + DefaultAgentTimeout = 2 * time.Hour + DefaultCodexSemanticInactivityTimeout = 10 * time.Minute + DefaultRuntimeName = "Local Agent" + DefaultWorkspaceSyncInterval = 30 * time.Second + DefaultHealthPort = 19514 + DefaultMaxConcurrentTasks = 20 + DefaultGCInterval = 1 * time.Hour + DefaultGCTTL = 24 * time.Hour // 1 day — AI-coding issues rarely stay open long + DefaultGCOrphanTTL = 72 * time.Hour // 3 days — orphans with no meta (crashes, pre-GC leftovers) ) // Config holds all daemon configuration. type Config struct { - ServerBaseURL string - DaemonID string - LegacyDaemonIDs []string // historical daemon_ids this machine may have registered under; reported at register time so the server can merge old runtime rows - DeviceName string - RuntimeName string - CLIVersion string // multica CLI version (e.g. "0.1.13") - LaunchedBy string // "desktop" when spawned by the Electron app, empty for standalone - Profile string // profile name (empty = default) - Agents map[string]AgentEntry // keyed by provider: claude, codex, copilot, opencode, openclaw, hermes, gemini, pi, cursor, kimi - WorkspacesRoot string // base path for execution envs (default: ~/multica_workspaces) - KeepEnvAfterTask bool // preserve env after task for debugging - HealthPort int // local HTTP port for health checks (default: 19514) - MaxConcurrentTasks int // max tasks running in parallel (default: 20) - GCEnabled bool // enable periodic workspace garbage collection (default: true) - GCInterval time.Duration // how often the GC loop runs (default: 1h) - GCTTL time.Duration // clean dirs whose issue is done/canceled and updated_at < now()-TTL (default: 24h) - GCOrphanTTL time.Duration // clean orphan dirs with no meta older than this (default: 72h). Dirs whose issue returned 404 are cleaned immediately. - PollInterval time.Duration - HeartbeatInterval time.Duration - AgentTimeout time.Duration + ServerBaseURL string + DaemonID string + LegacyDaemonIDs []string // historical daemon_ids this machine may have registered under; reported at register time so the server can merge old runtime rows + DeviceName string + RuntimeName string + CLIVersion string // multica CLI version (e.g. "0.1.13") + LaunchedBy string // "desktop" when spawned by the Electron app, empty for standalone + Profile string // profile name (empty = default) + Agents map[string]AgentEntry // keyed by provider: claude, codex, copilot, opencode, openclaw, hermes, gemini, pi, cursor, kimi + WorkspacesRoot string // base path for execution envs (default: ~/multica_workspaces) + KeepEnvAfterTask bool // preserve env after task for debugging + HealthPort int // local HTTP port for health checks (default: 19514) + MaxConcurrentTasks int // max tasks running in parallel (default: 20) + GCEnabled bool // enable periodic workspace garbage collection (default: true) + GCInterval time.Duration // how often the GC loop runs (default: 1h) + GCTTL time.Duration // clean dirs whose issue is done/canceled and updated_at < now()-TTL (default: 24h) + GCOrphanTTL time.Duration // clean orphan dirs with no meta older than this (default: 72h). Dirs whose issue returned 404 are cleaned immediately. + PollInterval time.Duration + HeartbeatInterval time.Duration + AgentTimeout time.Duration + CodexSemanticInactivityTimeout time.Duration } // Overrides allows CLI flags to override environment variables and defaults. // Zero values are ignored and the env/default value is used instead. type Overrides struct { - ServerURL string - WorkspacesRoot string - PollInterval time.Duration - HeartbeatInterval time.Duration - AgentTimeout time.Duration - MaxConcurrentTasks int - DaemonID string - DeviceName string - RuntimeName string - Profile string // profile name (empty = default) - HealthPort int // health check port (0 = use default) + ServerURL string + WorkspacesRoot string + PollInterval time.Duration + HeartbeatInterval time.Duration + AgentTimeout time.Duration + CodexSemanticInactivityTimeout time.Duration + MaxConcurrentTasks int + DaemonID string + DeviceName string + RuntimeName string + Profile string // profile name (empty = default) + HealthPort int // health check port (0 = use default) } // LoadConfig builds the daemon configuration from environment variables @@ -184,6 +187,14 @@ func LoadConfig(overrides Overrides) (Config, error) { agentTimeout = overrides.AgentTimeout } + codexSemanticInactivityTimeout, err := durationFromEnv("MULTICA_CODEX_SEMANTIC_INACTIVITY_TIMEOUT", DefaultCodexSemanticInactivityTimeout) + if err != nil { + return Config{}, err + } + if overrides.CodexSemanticInactivityTimeout > 0 { + codexSemanticInactivityTimeout = overrides.CodexSemanticInactivityTimeout + } + maxConcurrentTasks, err := intFromEnv("MULTICA_DAEMON_MAX_CONCURRENT_TASKS", DefaultMaxConcurrentTasks) if err != nil { return Config{}, err @@ -289,24 +300,25 @@ func LoadConfig(overrides Overrides) (Config, error) { } return Config{ - ServerBaseURL: serverBaseURL, - DaemonID: daemonID, - LegacyDaemonIDs: legacyDaemonIDs, - DeviceName: deviceName, - RuntimeName: runtimeName, - Profile: profile, - Agents: agents, - WorkspacesRoot: workspacesRoot, - KeepEnvAfterTask: keepEnv, - GCEnabled: gcEnabled, - GCInterval: gcInterval, - GCTTL: gcTTL, - GCOrphanTTL: gcOrphanTTL, - HealthPort: healthPort, - MaxConcurrentTasks: maxConcurrentTasks, - PollInterval: pollInterval, - HeartbeatInterval: heartbeatInterval, - AgentTimeout: agentTimeout, + ServerBaseURL: serverBaseURL, + DaemonID: daemonID, + LegacyDaemonIDs: legacyDaemonIDs, + DeviceName: deviceName, + RuntimeName: runtimeName, + Profile: profile, + Agents: agents, + WorkspacesRoot: workspacesRoot, + KeepEnvAfterTask: keepEnv, + GCEnabled: gcEnabled, + GCInterval: gcInterval, + GCTTL: gcTTL, + GCOrphanTTL: gcOrphanTTL, + HealthPort: healthPort, + MaxConcurrentTasks: maxConcurrentTasks, + PollInterval: pollInterval, + HeartbeatInterval: heartbeatInterval, + AgentTimeout: agentTimeout, + CodexSemanticInactivityTimeout: codexSemanticInactivityTimeout, }, nil } diff --git a/server/internal/daemon/daemon.go b/server/internal/daemon/daemon.go index c076e81d9..0d5460a4c 100644 --- a/server/internal/daemon/daemon.go +++ b/server/internal/daemon/daemon.go @@ -984,7 +984,11 @@ func (d *Daemon) handleTask(ctx context.Context, task Task) { // have built a real session before getting stuck (rate-limit, tool // error, etc.) and we want the next chat turn to resume there // rather than start over and "forget" the conversation. - if err := d.client.FailTask(ctx, task.ID, result.Comment, result.SessionID, result.WorkDir, "agent_error"); err != nil { + failureReason := result.FailureReason + if failureReason == "" { + failureReason = "agent_error" + } + if err := d.client.FailTask(ctx, task.ID, result.Comment, result.SessionID, result.WorkDir, failureReason); err != nil { taskLog.Error("report blocked task failed", "error", err) } default: @@ -1174,12 +1178,13 @@ func (d *Daemon) runTask(ctx context.Context, task Task, provider string, taskLo model = entry.Model } execOpts := agent.ExecOptions{ - Cwd: env.WorkDir, - Model: model, - Timeout: d.cfg.AgentTimeout, - ResumeSessionID: task.PriorSessionID, - CustomArgs: customArgs, - McpConfig: mcpConfig, + Cwd: env.WorkDir, + Model: model, + Timeout: d.cfg.AgentTimeout, + SemanticInactivityTimeout: d.cfg.CodexSemanticInactivityTimeout, + ResumeSessionID: task.PriorSessionID, + CustomArgs: customArgs, + McpConfig: mcpConfig, } // openclaw loads its bootstrap files (AGENTS.md, SOUL.md, ...) from its own // workspace dir rather than the task workdir, so the AGENTS.md written by @@ -1264,13 +1269,18 @@ func (d *Daemon) runTask(ctx context.Context, task Task, provider string, taskLo // in sync even when the agent times out after building a session. // We mark as "blocked" (not a hard error return) so handleTask // goes through the FailTask path that forwards session info. + comment := result.Error + if comment == "" { + comment = fmt.Sprintf("%s timed out after %s", provider, d.cfg.AgentTimeout) + } return TaskResult{ - Status: "blocked", - Comment: fmt.Sprintf("%s timed out after %s", provider, d.cfg.AgentTimeout), - SessionID: result.SessionID, - WorkDir: env.WorkDir, - EnvRoot: env.RootDir, - Usage: usageEntries, + Status: "blocked", + Comment: comment, + SessionID: result.SessionID, + WorkDir: env.WorkDir, + EnvRoot: env.RootDir, + FailureReason: "timeout", + Usage: usageEntries, }, nil case "cancelled": // Server cancelled the task (e.g. issue reassignment, user cancel). @@ -1363,6 +1373,8 @@ func (d *Daemon) executeAndDrain(ctx context.Context, backend agent.Backend, pro sendCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second) if err := d.client.ReportTaskMessages(sendCtx, taskID, toSend); err != nil { taskLog.Debug("failed to report task messages", "error", err) + } else { + taskLog.Debug("reported task messages", "count", len(toSend), "last_seq", toSend[len(toSend)-1].Seq) } cancel() } @@ -1436,6 +1448,7 @@ func (d *Daemon) executeAndDrain(ctx context.Context, backend agent.Backend, pro toolName = callIDToTool[msg.CallID] mu.Unlock() } + taskLog.Info("tool_result observed", "seq", s, "tool", toolName, "call_id", msg.CallID) mu.Lock() batch = append(batch, TaskMessageData{ Seq: int(s), diff --git a/server/internal/daemon/daemon_test.go b/server/internal/daemon/daemon_test.go index 9e8ce1093..2495fdd85 100644 --- a/server/internal/daemon/daemon_test.go +++ b/server/internal/daemon/daemon_test.go @@ -10,10 +10,12 @@ import ( "os" "os/exec" "path/filepath" + "runtime" "strings" "sync" "sync/atomic" "testing" + "time" "github.com/multica-ai/multica/server/internal/daemon/repocache" "github.com/multica-ai/multica/server/pkg/agent" @@ -421,6 +423,97 @@ func TestExecuteAndDrain_NoRetryWhenSessionEstablished(t *testing.T) { } } +func TestExecuteAndDrain_CodexInactivityReportsToolResultTranscript(t *testing.T) { + if runtime.GOOS == "windows" { + t.Skip("shell-script fixture is POSIX-only") + } + + fakePath := filepath.Join(t.TempDir(), "codex") + script := "#!/bin/sh\n" + + `read line` + "\n" + + `echo '{"jsonrpc":"2.0","id":1,"result":{}}'` + "\n" + + `read line` + "\n" + + `read line` + "\n" + + `echo '{"jsonrpc":"2.0","id":2,"result":{"thread":{"id":"thr-drain"}}}'` + "\n" + + `read line` + "\n" + + `echo '{"jsonrpc":"2.0","id":3,"result":{}}'` + "\n" + + `echo '{"jsonrpc":"2.0","method":"turn/started","params":{"threadId":"thr-drain","turn":{"id":"turn-drain"}}}'` + "\n" + + `echo '{"jsonrpc":"2.0","method":"item/started","params":{"threadId":"thr-drain","item":{"type":"commandExecution","id":"cmd-1","command":"git status"}}}'` + "\n" + + `echo '{"jsonrpc":"2.0","method":"item/completed","params":{"threadId":"thr-drain","item":{"type":"commandExecution","id":"cmd-1","aggregatedOutput":"clean"}}}'` + "\n" + + `sleep 5` + "\n" + if err := os.WriteFile(fakePath, []byte(script), 0o755); err != nil { + t.Fatalf("write fake codex: %v", err) + } + if err := os.Chmod(fakePath, 0o755); err != nil { + t.Fatalf("chmod fake codex: %v", err) + } + + var mu sync.Mutex + var reported []TaskMessageData + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if r.URL.Path != "/api/daemon/tasks/task-stale/messages" { + http.NotFound(w, r) + return + } + var body struct { + Messages []TaskMessageData `json:"messages"` + } + if err := json.NewDecoder(r.Body).Decode(&body); err != nil { + t.Errorf("decode task messages: %v", err) + http.Error(w, "bad request", http.StatusBadRequest) + return + } + mu.Lock() + reported = append(reported, body.Messages...) + mu.Unlock() + w.WriteHeader(http.StatusOK) + })) + t.Cleanup(srv.Close) + + backend, err := agent.New("codex", agent.Config{ExecutablePath: fakePath, Logger: slog.Default()}) + if err != nil { + t.Fatalf("new codex backend: %v", err) + } + d := &Daemon{client: NewClient(srv.URL), logger: slog.Default()} + result, tools, err := d.executeAndDrain(context.Background(), backend, "prompt", agent.ExecOptions{ + Timeout: 5 * time.Second, + SemanticInactivityTimeout: 100 * time.Millisecond, + }, slog.Default(), "task-stale") + if err != nil { + t.Fatalf("executeAndDrain: %v", err) + } + if result.Status != "timeout" { + t.Fatalf("expected timeout, got status=%q error=%q", result.Status, result.Error) + } + if tools != 1 { + t.Fatalf("expected one tool use, got %d", tools) + } + + deadline := time.Now().Add(2 * time.Second) + for { + mu.Lock() + var gotToolUse, gotToolResult bool + for _, msg := range reported { + if msg.Seq == 1 && msg.Type == "tool_use" && msg.Tool == "exec_command" { + gotToolUse = true + } + if msg.Seq == 2 && msg.Type == "tool_result" && msg.Tool == "exec_command" && msg.Output == "clean" { + gotToolResult = true + } + } + mu.Unlock() + if gotToolUse && gotToolResult { + return + } + if time.Now().After(deadline) { + mu.Lock() + defer mu.Unlock() + t.Fatalf("expected tool_use seq=1 and tool_result seq=2 in transcript, got %+v", reported) + } + time.Sleep(10 * time.Millisecond) + } +} + // blockingBackend returns a Session whose Result channel is never written to, // so executeAndDrain can only exit via the drainCtx.Done() path. type blockingBackend struct{} diff --git a/server/internal/daemon/types.go b/server/internal/daemon/types.go index dba4a9f0e..2c64e9cae 100644 --- a/server/internal/daemon/types.go +++ b/server/internal/daemon/types.go @@ -85,12 +85,13 @@ type TaskUsageEntry struct { // TaskResult is the outcome of executing a task. type TaskResult struct { - Status string `json:"status"` - Comment string `json:"comment"` - BranchName string `json:"branch_name,omitempty"` - EnvType string `json:"env_type,omitempty"` - SessionID string `json:"session_id,omitempty"` // Claude session ID for future resumption - WorkDir string `json:"work_dir,omitempty"` // working directory used during execution - EnvRoot string `json:"-"` // env root dir for writing GC metadata (not sent to server) - Usage []TaskUsageEntry `json:"usage,omitempty"` // per-model token usage + Status string `json:"status"` + Comment string `json:"comment"` + BranchName string `json:"branch_name,omitempty"` + EnvType string `json:"env_type,omitempty"` + SessionID string `json:"session_id,omitempty"` // Claude session ID for future resumption + WorkDir string `json:"work_dir,omitempty"` // working directory used during execution + EnvRoot string `json:"-"` // env root dir for writing GC metadata (not sent to server) + FailureReason string `json:"-"` // internal server failure classification + Usage []TaskUsageEntry `json:"usage,omitempty"` // per-model token usage } diff --git a/server/pkg/agent/agent.go b/server/pkg/agent/agent.go index 842e616f2..4c4256c12 100644 --- a/server/pkg/agent/agent.go +++ b/server/pkg/agent/agent.go @@ -22,14 +22,15 @@ type Backend interface { // ExecOptions configures a single execution. type ExecOptions struct { - Cwd string - Model string - SystemPrompt string - MaxTurns int - Timeout time.Duration - ResumeSessionID string // if non-empty, resume a previous agent session - CustomArgs []string // additional CLI arguments appended to the agent command - McpConfig json.RawMessage // if non-nil, MCP server config to pass via --mcp-config + Cwd string + Model string + SystemPrompt string + MaxTurns int + Timeout time.Duration + SemanticInactivityTimeout time.Duration + ResumeSessionID string // if non-empty, resume a previous agent session + CustomArgs []string // additional CLI arguments appended to the agent command + McpConfig json.RawMessage // if non-nil, MCP server config to pass via --mcp-config } // Session represents a running agent execution. diff --git a/server/pkg/agent/codex.go b/server/pkg/agent/codex.go index d430bd89e..55165b101 100644 --- a/server/pkg/agent/codex.go +++ b/server/pkg/agent/codex.go @@ -25,7 +25,10 @@ var codexBlockedArgs = map[string]blockedArgMode{ // user supplied a custom_args flag that the `app-server` subcommand // rejects). Kept as its own constant so bumping codex independently of // other agents stays easy if codex starts shipping longer failure traces. -const codexStderrTailBytes = 2048 +const ( + codexStderrTailBytes = 2048 + defaultCodexSemanticInactivityTimeout = 10 * time.Minute +) // codexBackend implements Backend by spawning `codex app-server --listen stdio://` // and communicating via JSON-RPC 2.0 over stdin/stdout. @@ -46,6 +49,10 @@ func (b *codexBackend) Execute(ctx context.Context, prompt string, opts ExecOpti if timeout == 0 { timeout = 20 * time.Minute } + semanticInactivityTimeout := opts.SemanticInactivityTimeout + if semanticInactivityTimeout == 0 { + semanticInactivityTimeout = defaultCodexSemanticInactivityTimeout + } runCtx, cancel := context.WithTimeout(ctx, timeout) codexArgs := append([]string{"app-server", "--listen", "stdio://"}, filterCustomArgs(opts.CustomArgs, codexBlockedArgs, b.cfg.Logger)...) @@ -79,6 +86,7 @@ func (b *codexBackend) Execute(ctx context.Context, prompt string, opts ExecOpti msgCh := make(chan Message, 256) resCh := make(chan Result, 1) + semanticActivityCh := make(chan string, 256) var outputMu sync.Mutex var output strings.Builder @@ -93,12 +101,18 @@ func (b *codexBackend) Execute(ctx context.Context, prompt string, opts ExecOpti pending: make(map[int]*pendingRPC), notificationProtocol: "unknown", onMessage: func(msg Message) { + logCodexAgentMessage(b.cfg.Logger, msg) if msg.Type == MessageText { outputMu.Lock() output.WriteString(msg.Content) outputMu.Unlock() } trySend(msgCh, msg) + trySendString(semanticActivityCh, describeCodexSemanticActivity(msg)) + }, + onSemanticActivity: func(description string) { + b.cfg.Logger.Debug("codex semantic activity observed", "activity", description) + trySendString(semanticActivityCh, description) }, onTurnDone: func(aborted bool) { select { @@ -207,26 +221,51 @@ func (b *codexBackend) Execute(ctx context.Context, prompt string, opts ExecOpti return } - // Wait for turn completion or context cancellation - select { - case aborted := <-turnDone: - switch { - case aborted: - finalStatus = "aborted" - finalError = "turn was aborted" - default: - if errMsg := c.getTurnError(); errMsg != "" { - finalStatus = "failed" - finalError = errMsg + lastSemanticActivity := time.Now() + lastSemanticActivityDescription := "turn/start" + semanticTimer := time.NewTimer(semanticInactivityTimeout) + defer semanticTimer.Stop() + + waitingForTurn := true + for waitingForTurn { + select { + case aborted := <-turnDone: + waitingForTurn = false + switch { + case aborted: + finalStatus = "aborted" + finalError = "turn was aborted" + default: + if errMsg := c.getTurnError(); errMsg != "" { + finalStatus = "failed" + finalError = errMsg + } } - } - case <-runCtx.Done(): - if runCtx.Err() == context.DeadlineExceeded { + case activity := <-semanticActivityCh: + lastSemanticActivity = time.Now() + lastSemanticActivityDescription = activity + resetTimer(semanticTimer, semanticInactivityTimeout) + case <-semanticTimer.C: + waitingForTurn = false finalStatus = "timeout" - finalError = fmt.Sprintf("codex timed out after %s", timeout) - } else { - finalStatus = "aborted" - finalError = "execution cancelled" + finalError = fmt.Sprintf("codex semantic inactivity timeout after %s without agent progress (last activity: %s)", semanticInactivityTimeout, lastSemanticActivityDescription) + b.cfg.Logger.Warn("codex semantic inactivity timeout", + "pid", cmd.Process.Pid, + "thread_id", threadID, + "turn_id", c.turnID, + "timeout", semanticInactivityTimeout.String(), + "last_activity", lastSemanticActivityDescription, + "idle_for", time.Since(lastSemanticActivity).Round(time.Millisecond).String(), + ) + case <-runCtx.Done(): + waitingForTurn = false + if runCtx.Err() == context.DeadlineExceeded { + finalStatus = "timeout" + finalError = fmt.Sprintf("codex timed out after %s", timeout) + } else { + finalStatus = "aborted" + finalError = "execution cancelled" + } } } @@ -337,18 +376,68 @@ func (c *codexClient) startOrResumeThread(ctx context.Context, opts ExecOptions, return threadID, false, nil } +func resetTimer(timer *time.Timer, d time.Duration) { + if !timer.Stop() { + select { + case <-timer.C: + default: + } + } + timer.Reset(d) +} + +func trySendString(ch chan<- string, value string) { + select { + case ch <- value: + default: + } +} + +func logCodexAgentMessage(logger *slog.Logger, msg Message) { + if logger == nil { + return + } + attrs := []any{ + "type", string(msg.Type), + "tool", msg.Tool, + "call_id", msg.CallID, + "status", msg.Status, + "content_len", len(msg.Content), + "output_len", len(msg.Output), + } + logger.Info("codex agent message received", attrs...) + if msg.Type == MessageToolResult { + logger.Info("codex tool_result observed", "tool", msg.Tool, "call_id", msg.CallID, "output_len", len(msg.Output)) + } +} + +func describeCodexSemanticActivity(msg Message) string { + switch msg.Type { + case MessageToolUse, MessageToolResult: + if msg.Tool != "" { + return fmt.Sprintf("%s:%s", msg.Type, msg.Tool) + } + case MessageStatus: + if msg.Status != "" { + return fmt.Sprintf("%s:%s", msg.Type, msg.Status) + } + } + return string(msg.Type) +} + // ── codexClient: JSON-RPC 2.0 transport ── type codexClient struct { - cfg Config - stdin interface{ Write([]byte) (int, error) } - mu sync.Mutex - nextID int - pending map[int]*pendingRPC - threadID string - turnID string - onMessage func(Message) - onTurnDone func(aborted bool) + cfg Config + stdin interface{ Write([]byte) (int, error) } + mu sync.Mutex + nextID int + pending map[int]*pendingRPC + threadID string + turnID string + onMessage func(Message) + onSemanticActivity func(description string) + onTurnDone func(aborted bool) notificationProtocol string // "unknown", "legacy", "raw" turnStarted bool @@ -416,6 +505,13 @@ func (c *codexClient) request(ctx context.Context, method string, params any) (j c.mu.Unlock() return nil, fmt.Errorf("write %s: %w", method, err) } + if method == "turn/start" { + threadID := "" + if paramMap, ok := params.(map[string]any); ok { + threadID, _ = paramMap["threadId"].(string) + } + c.cfg.Logger.Info("codex turn/start sent", "request_id", id, "thread_id", threadID) + } select { case res := <-pr.ch: @@ -666,6 +762,8 @@ func (c *codexClient) handleRawNotification(method string, params map[string]any case "turn/completed": turnID := extractNestedString(params, "turn", "id") status := extractNestedString(params, "turn", "status") + threadID, _ := params["threadId"].(string) + c.cfg.Logger.Info("codex turn/completed received", "thread_id", threadID, "turn_id", turnID, "status", status) aborted := status == "cancelled" || status == "canceled" || status == "aborted" || status == "interrupted" @@ -730,13 +828,15 @@ func (c *codexClient) handleRawNotification(method string, params map[string]any } func (c *codexClient) handleItemNotification(method string, params map[string]any) { - item, ok := params["item"].(map[string]any) - if !ok { - return - } - + item, _ := params["item"].(map[string]any) itemType, _ := item["type"].(string) itemID, _ := item["id"].(string) + if isCodexItemProgressActivity(method) && c.onSemanticActivity != nil { + c.onSemanticActivity(describeCodexItemProgressActivity(method, itemType, itemID)) + } + if item == nil { + return + } switch { case method == "item/started" && itemType == "commandExecution": @@ -793,6 +893,28 @@ func (c *codexClient) handleItemNotification(method string, params map[string]an } } +func isCodexItemProgressActivity(method string) bool { + switch method { + case "item/agentMessage/delta", + "item/commandExecution/outputDelta", + "item/fileChange/outputDelta", + "item/mcpToolCall/progress": + return true + default: + return false + } +} + +func describeCodexItemProgressActivity(method, itemType, itemID string) string { + if itemType == "" { + itemType = "unknown" + } + if itemID == "" { + return fmt.Sprintf("%s:%s", method, itemType) + } + return fmt.Sprintf("%s:%s:%s", method, itemType, itemID) +} + // extractUsageFromMap extracts token usage from a map that may contain // "usage", "token_usage", or "tokens" fields. Handles various Codex formats. func (c *codexClient) extractUsageFromMap(data map[string]any) { diff --git a/server/pkg/agent/codex_test.go b/server/pkg/agent/codex_test.go index 686889df9..9ba6a2551 100644 --- a/server/pkg/agent/codex_test.go +++ b/server/pkg/agent/codex_test.go @@ -1011,6 +1011,175 @@ func TestCodexExecuteSurfacesStderrWhenChildExitsEarly(t *testing.T) { } } +func TestCodexExecuteTimesOutWhenTurnStopsAfterToolResult(t *testing.T) { + t.Parallel() + if runtime.GOOS == "windows" { + t.Skip("shell-script fixture is POSIX-only") + } + + fakePath := writeFakeCodexAppServer(t, ""+ + `read line`+"\n"+ + `echo '{"jsonrpc":"2.0","id":1,"result":{}}'`+"\n"+ + `read line`+"\n"+ + `read line`+"\n"+ + `echo '{"jsonrpc":"2.0","id":2,"result":{"thread":{"id":"thr-stale"}}}'`+"\n"+ + `read line`+"\n"+ + `echo '{"jsonrpc":"2.0","id":3,"result":{}}'`+"\n"+ + `echo '{"jsonrpc":"2.0","method":"turn/started","params":{"threadId":"thr-stale","turn":{"id":"turn-stale"}}}'`+"\n"+ + `echo '{"jsonrpc":"2.0","method":"item/started","params":{"threadId":"thr-stale","item":{"type":"commandExecution","id":"cmd-1","command":"git status"}}}'`+"\n"+ + `echo '{"jsonrpc":"2.0","method":"item/completed","params":{"threadId":"thr-stale","item":{"type":"commandExecution","id":"cmd-1","aggregatedOutput":"clean"}}}'`+"\n"+ + `sleep 5`+"\n") + + result := executeFakeCodex(t, fakePath, ExecOptions{ + Timeout: 5 * time.Second, + SemanticInactivityTimeout: 100 * time.Millisecond, + }) + if result.Status != "timeout" { + t.Fatalf("expected timeout, got status=%q error=%q", result.Status, result.Error) + } + if !strings.Contains(result.Error, "semantic inactivity") { + t.Fatalf("expected semantic inactivity error, got %q", result.Error) + } + if result.SessionID != "thr-stale" { + t.Fatalf("expected session id to be preserved, got %q", result.SessionID) + } +} + +func TestCodexExecuteSemanticInactivityAllowsContinuousMessages(t *testing.T) { + t.Parallel() + if runtime.GOOS == "windows" { + t.Skip("shell-script fixture is POSIX-only") + } + + fakePath := writeFakeCodexAppServer(t, ""+ + `read line`+"\n"+ + `echo '{"jsonrpc":"2.0","id":1,"result":{}}'`+"\n"+ + `read line`+"\n"+ + `read line`+"\n"+ + `echo '{"jsonrpc":"2.0","id":2,"result":{"thread":{"id":"thr-progress"}}}'`+"\n"+ + `read line`+"\n"+ + `echo '{"jsonrpc":"2.0","id":3,"result":{}}'`+"\n"+ + `echo '{"jsonrpc":"2.0","method":"turn/started","params":{"threadId":"thr-progress","turn":{"id":"turn-progress"}}}'`+"\n"+ + `sleep 0.05`+"\n"+ + `echo '{"jsonrpc":"2.0","method":"item/completed","params":{"threadId":"thr-progress","item":{"type":"agentMessage","id":"msg-1","text":"still working"}}}'`+"\n"+ + `sleep 0.05`+"\n"+ + `echo '{"jsonrpc":"2.0","method":"item/completed","params":{"threadId":"thr-progress","item":{"type":"commandExecution","id":"cmd-1","aggregatedOutput":"ok"}}}'`+"\n"+ + `sleep 0.05`+"\n"+ + `echo '{"jsonrpc":"2.0","method":"turn/completed","params":{"threadId":"thr-progress","turn":{"id":"turn-progress","status":"completed"}}}'`+"\n") + + result := executeFakeCodex(t, fakePath, ExecOptions{ + Timeout: 5 * time.Second, + SemanticInactivityTimeout: 90 * time.Millisecond, + }) + if result.Status != "completed" { + t.Fatalf("expected completed, got status=%q error=%q", result.Status, result.Error) + } + if !strings.Contains(result.Output, "still working") { + t.Fatalf("expected streamed text in output, got %q", result.Output) + } +} + +func TestCodexExecuteSemanticInactivityAllowsContinuousDeltaProgress(t *testing.T) { + t.Parallel() + if runtime.GOOS == "windows" { + t.Skip("shell-script fixture is POSIX-only") + } + + fakePath := writeFakeCodexAppServer(t, ""+ + `read line`+"\n"+ + `echo '{"jsonrpc":"2.0","id":1,"result":{}}'`+"\n"+ + `read line`+"\n"+ + `read line`+"\n"+ + `echo '{"jsonrpc":"2.0","id":2,"result":{"thread":{"id":"thr-delta"}}}'`+"\n"+ + `read line`+"\n"+ + `echo '{"jsonrpc":"2.0","id":3,"result":{}}'`+"\n"+ + `echo '{"jsonrpc":"2.0","method":"turn/started","params":{"threadId":"thr-delta","turn":{"id":"turn-delta"}}}'`+"\n"+ + `sleep 0.05`+"\n"+ + `echo '{"jsonrpc":"2.0","method":"item/commandExecution/outputDelta","params":{"threadId":"thr-delta","item":{"type":"commandExecution","id":"cmd-1"},"delta":"line 1\n"}}'`+"\n"+ + `sleep 0.05`+"\n"+ + `echo '{"jsonrpc":"2.0","method":"item/agentMessage/delta","params":{"threadId":"thr-delta","item":{"type":"agentMessage","id":"msg-1"},"delta":"thinking"}}'`+"\n"+ + `sleep 0.05`+"\n"+ + `echo '{"jsonrpc":"2.0","method":"item/fileChange/outputDelta","params":{"threadId":"thr-delta","item":{"type":"fileChange","id":"patch-1"},"delta":"patched"}}'`+"\n"+ + `sleep 0.05`+"\n"+ + `echo '{"jsonrpc":"2.0","method":"item/mcpToolCall/progress","params":{"threadId":"thr-delta","item":{"type":"mcpToolCall","id":"mcp-1"},"progress":{"message":"still running"}}}'`+"\n"+ + `sleep 0.05`+"\n"+ + `echo '{"jsonrpc":"2.0","method":"turn/completed","params":{"threadId":"thr-delta","turn":{"id":"turn-delta","status":"completed"}}}'`+"\n") + + result := executeFakeCodex(t, fakePath, ExecOptions{ + Timeout: 5 * time.Second, + SemanticInactivityTimeout: 150 * time.Millisecond, + }) + if result.Status != "completed" { + t.Fatalf("expected completed, got status=%q error=%q", result.Status, result.Error) + } +} + +func TestCodexExecuteSemanticInactivityDoesNotAffectNormalTurnCompletion(t *testing.T) { + t.Parallel() + if runtime.GOOS == "windows" { + t.Skip("shell-script fixture is POSIX-only") + } + + fakePath := writeFakeCodexAppServer(t, ""+ + `read line`+"\n"+ + `echo '{"jsonrpc":"2.0","id":1,"result":{}}'`+"\n"+ + `read line`+"\n"+ + `read line`+"\n"+ + `echo '{"jsonrpc":"2.0","id":2,"result":{"thread":{"id":"thr-normal"}}}'`+"\n"+ + `read line`+"\n"+ + `echo '{"jsonrpc":"2.0","id":3,"result":{}}'`+"\n"+ + `echo '{"jsonrpc":"2.0","method":"turn/started","params":{"threadId":"thr-normal","turn":{"id":"turn-normal"}}}'`+"\n"+ + `echo '{"jsonrpc":"2.0","method":"item/completed","params":{"threadId":"thr-normal","item":{"type":"agentMessage","id":"msg-1","text":"Done"}}}'`+"\n"+ + `echo '{"jsonrpc":"2.0","method":"turn/completed","params":{"threadId":"thr-normal","turn":{"id":"turn-normal","status":"completed"}}}'`+"\n") + + result := executeFakeCodex(t, fakePath, ExecOptions{ + Timeout: 5 * time.Second, + SemanticInactivityTimeout: 100 * time.Millisecond, + }) + if result.Status != "completed" { + t.Fatalf("expected completed, got status=%q error=%q", result.Status, result.Error) + } + if result.Output != "Done" { + t.Fatalf("expected output Done, got %q", result.Output) + } +} + +func writeFakeCodexAppServer(t *testing.T, body string) string { + t.Helper() + fakePath := filepath.Join(t.TempDir(), "codex") + script := "#!/bin/sh\n" + body + writeTestExecutable(t, fakePath, []byte(script)) + return fakePath +} + +func executeFakeCodex(t *testing.T, fakePath string, opts ExecOptions) Result { + t.Helper() + backend, err := New("codex", Config{ExecutablePath: fakePath, Logger: slog.Default()}) + if err != nil { + t.Fatalf("new codex backend: %v", err) + } + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + session, err := backend.Execute(ctx, "prompt", opts) + if err != nil { + t.Fatalf("execute: %v", err) + } + go func() { + for range session.Messages { + } + }() + select { + case result, ok := <-session.Result: + if !ok { + t.Fatal("result channel closed without a value") + } + return result + case <-time.After(10 * time.Second): + t.Fatal("timeout waiting for result") + return Result{} + } +} + func TestWithAgentStderrAppendsHint(t *testing.T) { t.Parallel()