diff --git a/server/pkg/agent/claude.go b/server/pkg/agent/claude.go index 033696a2b..016a452b9 100644 --- a/server/pkg/agent/claude.go +++ b/server/pkg/agent/claude.go @@ -107,10 +107,18 @@ func (b *claudeBackend) Execute(ctx context.Context, prompt string, opts ExecOpt // fires. The field symptom is "write |1: The pipe has been ended." // surfacing exactly at the per-task timeout when the kill invalidates // the still-blocked pipe. + // + // Keep stdin open after the initial user message. Claude's stream-json + // protocol can emit control_request events mid-run and expects matching + // control_response frames on the same input stream; closing stdin here + // leaves the child stuck waiting for a response until its own fallback + // timeout. writeDone := make(chan error, 1) go func() { err := writeClaudeInput(stdin, prompt) - closeStdin() + if err != nil { + closeStdin() + } writeDone <- err }() @@ -132,6 +140,7 @@ func (b *claudeBackend) Execute(ctx context.Context, prompt string, opts ExecOpt // Close stdout when the context is cancelled so scanner.Scan() unblocks. go func() { <-runCtx.Done() + closeStdin() _ = stdout.Close() }() @@ -172,6 +181,7 @@ func (b *claudeBackend) Execute(ctx context.Context, prompt string, opts ExecOpt finalStatus = "failed" finalError = msg.ResultText } + closeStdin() case "log": if msg.Log != nil { trySend(msgCh, Message{ @@ -180,9 +190,13 @@ func (b *claudeBackend) Execute(ctx context.Context, prompt string, opts ExecOpt Content: msg.Log.Message, }) } + case "control_request": + b.handleControlRequest(msg, stdin) } } + closeStdin() + // Wait for process exit exitErr := cmd.Wait() duration := time.Since(startTime) diff --git a/server/pkg/agent/claude_deadlock_test.go b/server/pkg/agent/claude_deadlock_test.go index 7b37d7f99..cb3242f22 100644 --- a/server/pkg/agent/claude_deadlock_test.go +++ b/server/pkg/agent/claude_deadlock_test.go @@ -3,8 +3,8 @@ package agent import ( "bufio" "context" + "encoding/json" "fmt" - "io" "log/slog" "os" "strings" @@ -22,6 +22,9 @@ func TestMain(m *testing.M) { case "startup_stdout_burst": runFakeClaudeStartupStdoutBurst() os.Exit(0) + case "control_request": + runFakeClaudeControlRequest() + os.Exit(0) default: fmt.Fprintf(os.Stderr, "unknown CLAUDE_FAKE_MODE: %q\n", mode) os.Exit(2) @@ -29,11 +32,11 @@ func TestMain(m *testing.M) { } // runFakeClaudeStartupStdoutBurst writes ~256 KiB to stdout BEFORE -// reading any byte from stdin, then drains stdin and emits a stream-json -// result. Reproduces the stdio deadlock: if the daemon writes the prompt -// to stdin before a stdout reader is running, the child blocks writing -// stdout and the daemon blocks writing stdin — neither side can progress -// until the per-task context times out and the child is killed. +// reading any byte from stdin, then reads the first stdin frame and emits a +// stream-json result. Reproduces the stdio deadlock: if the daemon writes +// the prompt to stdin before a stdout reader is running, the child blocks +// writing stdout and the daemon blocks writing stdin — neither side can +// progress until the per-task context times out and the child is killed. func runFakeClaudeStartupStdoutBurst() { line := strings.Repeat("x", 1020) bw := bufio.NewWriter(os.Stdout) @@ -45,12 +48,44 @@ func runFakeClaudeStartupStdoutBurst() { if err := bw.Flush(); err != nil { os.Exit(12) } - if _, err := io.Copy(io.Discard, os.Stdin); err != nil { + if _, err := bufio.NewReader(os.Stdin).ReadString('\n'); err != nil { os.Exit(13) } fmt.Println(`{"type":"result","subtype":"success","is_error":false,"session_id":"sess-deadlock","result":"done"}`) } +func runFakeClaudeControlRequest() { + reader := bufio.NewReader(os.Stdin) + if _, err := reader.ReadString('\n'); err != nil { + fmt.Fprintf(os.Stderr, "read prompt: %v\n", err) + os.Exit(21) + } + fmt.Println(`{"type":"system","session_id":"sess-control"}`) + fmt.Println(`{"type":"control_request","request_id":"req-42","request":{"subtype":"tool_use","tool_name":"Bash","input":{"command":"pwd"}}}`) + + line, err := reader.ReadString('\n') + if err != nil { + fmt.Fprintf(os.Stderr, "read control response: %v\n", err) + os.Exit(22) + } + var resp struct { + Type string `json:"type"` + Response struct { + RequestID string `json:"request_id"` + } `json:"response"` + } + if err := json.Unmarshal([]byte(strings.TrimSpace(line)), &resp); err != nil { + fmt.Fprintf(os.Stderr, "decode control response: %v\n", err) + os.Exit(23) + } + if resp.Type != "control_response" || resp.Response.RequestID != "req-42" { + fmt.Fprintf(os.Stderr, "unexpected control response: %s\n", line) + os.Exit(24) + } + fmt.Println(`{"type":"assistant","message":{"role":"assistant","content":[{"type":"text","text":"approved"}]}}`) + fmt.Println(`{"type":"result","subtype":"success","is_error":false,"session_id":"sess-control","result":"done after control"}`) +} + // TestClaudeExecuteDoesNotDeadlockOnStartupStdoutBurst verifies that the // claude backend drains stdout concurrently with writing the prompt to // stdin. The buggy path serialises the two: writeClaudeInput runs before @@ -111,3 +146,51 @@ func TestClaudeExecuteDoesNotDeadlockOnStartupStdoutBurst(t *testing.T) { t.Fatal("timeout waiting for result — claude backend is deadlocked on writeClaudeInput because stdout is not being drained concurrently") } } + +func TestClaudeExecuteRespondsToControlRequest(t *testing.T) { + t.Parallel() + + self, err := os.Executable() + if err != nil { + t.Fatalf("os.Executable: %v", err) + } + + backend, err := New("claude", Config{ + ExecutablePath: self, + Env: map[string]string{"CLAUDE_FAKE_MODE": "control_request"}, + Logger: slog.Default(), + }) + if err != nil { + t.Fatalf("new claude backend: %v", err) + } + + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + + session, err := backend.Execute(ctx, "run a command", ExecOptions{Timeout: 8 * time.Second}) + if err != nil { + t.Fatalf("execute returned error: %v", err) + } + go func() { + for range session.Messages { + } + }() + + select { + case result, ok := <-session.Result: + if !ok { + t.Fatal("result channel closed without a value") + } + if result.Status != "completed" { + t.Fatalf("expected status=completed, got %q (error=%q)", result.Status, result.Error) + } + if result.Output != "done after control" { + t.Fatalf("expected result output from fake claude, got %q", result.Output) + } + if result.SessionID != "sess-control" { + t.Fatalf("expected session id sess-control, got %q", result.SessionID) + } + case <-time.After(5 * time.Second): + t.Fatal("timeout waiting for result — claude backend did not answer control_request") + } +} diff --git a/server/pkg/agent/claude_test.go b/server/pkg/agent/claude_test.go index 86f2a41b5..1e6bb06c7 100644 --- a/server/pkg/agent/claude_test.go +++ b/server/pkg/agent/claude_test.go @@ -625,14 +625,15 @@ func TestClaudeExecuteSurfacesStderrWhenChildExitsEarly(t *testing.T) { t.Skip("shell-script fixture is POSIX-only") } - // Fake claude binary: drains stdin so writeClaudeInput succeeds, writes a - // canonical V8-abort line to stderr, then exits non-zero before emitting - // any stream-json to stdout. This is the exact failure mode that motivated - // PR #1674 — without sampling stderrBuf.Tail() after cmd.Wait() returns, - // Result.Error would be a useless "exit status 3". + // Fake claude binary: reads the initial stdin frame so writeClaudeInput + // succeeds, writes a canonical V8-abort line to stderr, then exits + // non-zero before emitting any stream-json to stdout. This is the exact + // failure mode that motivated PR #1674 — without sampling stderrBuf.Tail() + // after cmd.Wait() returns, Result.Error would be a useless + // "exit status 3". fakePath := filepath.Join(t.TempDir(), "claude") script := "#!/bin/sh\n" + - "cat >/dev/null\n" + + "IFS= read -r _\n" + "echo \"FATAL ERROR: V8 abort: assertion failed\" >&2\n" + "exit 3\n" writeTestExecutable(t, fakePath, []byte(script)) @@ -684,7 +685,7 @@ func TestClaudeExecuteRecordsResultModelUsage(t *testing.T) { fakePath := filepath.Join(t.TempDir(), "claude") script := "#!/bin/sh\n" + - "cat >/dev/null\n" + + "IFS= read -r _\n" + "printf '%s\\n' '{\"type\":\"system\",\"session_id\":\"sess-result-usage\"}'\n" + "printf '%s\\n' '{\"type\":\"result\",\"subtype\":\"success\",\"is_error\":false,\"session_id\":\"sess-result-usage\",\"result\":\"done\",\"modelUsage\":{\"zhipu/coding-plan\":{\"inputTokens\":123,\"outputTokens\":45,\"cacheReadInputTokens\":7,\"cacheCreationInputTokens\":11,\"costUSD\":0.01}}}'\n" writeTestExecutable(t, fakePath, []byte(script))