mirror of
https://github.com/multica-ai/multica.git
synced 2026-06-17 03:38:32 +02:00
fix(agent): drain claude stdout while writing prompt to stdin (#3490)
The claude backend wrote the full prompt to the child's stdin and closed it before starting the stdout reader goroutine. With --verbose --output-format stream-json the CLI emits a startup banner before reading its first stdin frame; with no reader draining stdout, the child blocks on its stdout write, never reads stdin, and our stdin Write blocks until the per-task context fires. The field symptom is tasks failing exactly at the 2 h per-task timeout with "write |1: The pipe has been ended." Move writeClaudeInput into its own goroutine so the prompt write and the stdout drain proceed concurrently. Guard stdin close with sync.Once (it can now be called from both the writer goroutine and, previously, the result handler). Join the write result at cmd.Wait() and surface a write failure as a "failed" status only when no result event arrived and no session was established, so a genuine startup death still reports the stderr tail. Add a regression test that re-execs the test binary as a fake claude which bursts 256 KiB to stdout before reading stdin, with a 128 KiB prompt pushed at stdin — both past any plausible OS pipe buffer — so a regression hangs until the test deadline instead of passing. Co-authored-by: Claude Opus 4.8 <noreply@anthropic.com>
This commit is contained in:
@@ -4,13 +4,13 @@ import (
|
||||
"bufio"
|
||||
"context"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"log/slog"
|
||||
"os"
|
||||
"os/exec"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
)
|
||||
|
||||
@@ -78,12 +78,8 @@ func (b *claudeBackend) Execute(ctx context.Context, prompt string, opts ExecOpt
|
||||
cancel()
|
||||
return nil, fmt.Errorf("claude stdin pipe: %w", err)
|
||||
}
|
||||
closeStdin := func() {
|
||||
if stdin != nil {
|
||||
_ = stdin.Close()
|
||||
stdin = nil
|
||||
}
|
||||
}
|
||||
var closeStdinOnce sync.Once
|
||||
closeStdin := func() { closeStdinOnce.Do(func() { _ = stdin.Close() }) }
|
||||
// Capture stderr into both the daemon log (as before) and a bounded tail
|
||||
// buffer so we can include the last few KB in Result.Error when claude
|
||||
// exits unexpectedly. Without the tail, an exit-code-only failure looks
|
||||
@@ -97,19 +93,6 @@ func (b *claudeBackend) Execute(ctx context.Context, prompt string, opts ExecOpt
|
||||
cancel()
|
||||
return nil, fmt.Errorf("start claude: %w", err)
|
||||
}
|
||||
if err := writeClaudeInput(stdin, prompt); err != nil {
|
||||
// claude almost certainly died during startup (broken pipe). The
|
||||
// real reason is sitting in stderrBuf — surface it the same way the
|
||||
// post-handshake error path does, otherwise the daemon log is the
|
||||
// only place that knows whether it was a V8 abort, a missing native
|
||||
// module, or anything else. cmd.Wait() flushes os/exec's stderr
|
||||
// copy goroutine, so stderrBuf.Tail() is safe to read.
|
||||
closeStdin()
|
||||
cancel()
|
||||
_ = cmd.Wait()
|
||||
return nil, errors.New(withAgentStderr(fmt.Sprintf("write claude input: %v", err), "claude", stderrBuf.Tail()))
|
||||
}
|
||||
closeStdin()
|
||||
|
||||
b.cfg.Logger.Info("claude started", "pid", cmd.Process.Pid, "cwd", opts.Cwd, "model", opts.Model)
|
||||
|
||||
@@ -119,6 +102,21 @@ func (b *claudeBackend) Execute(ctx context.Context, prompt string, opts ExecOpt
|
||||
msgCh := make(chan Message, 256)
|
||||
resCh := make(chan Result, 1)
|
||||
|
||||
// writeClaudeInput runs in its own goroutine so it cannot deadlock
|
||||
// against the stdout reader. With --verbose --output-format stream-json
|
||||
// the CLI emits a startup banner before reading its first stdin frame;
|
||||
// if nothing is draining stdout while we write the prompt, claude blocks
|
||||
// writing stdout, never reads stdin, and our Write blocks until runCtx
|
||||
// fires. The field symptom is "write |1: The pipe has been ended."
|
||||
// surfacing exactly at the per-task timeout when the kill invalidates
|
||||
// the still-blocked pipe.
|
||||
writeDone := make(chan error, 1)
|
||||
go func() {
|
||||
err := writeClaudeInput(stdin, prompt)
|
||||
closeStdin()
|
||||
writeDone <- err
|
||||
}()
|
||||
|
||||
go func() {
|
||||
defer cancel()
|
||||
defer close(msgCh)
|
||||
@@ -165,7 +163,6 @@ func (b *claudeBackend) Execute(ctx context.Context, prompt string, opts ExecOpt
|
||||
}
|
||||
trySend(msgCh, Message{Type: MessageStatus, Status: "running", SessionID: sessionID})
|
||||
case "result":
|
||||
closeStdin()
|
||||
sessionID = msg.SessionID
|
||||
if msg.ResultText != "" {
|
||||
output.Reset()
|
||||
@@ -192,14 +189,25 @@ func (b *claudeBackend) Execute(ctx context.Context, prompt string, opts ExecOpt
|
||||
// Wait for process exit
|
||||
exitErr := cmd.Wait()
|
||||
duration := time.Since(startTime)
|
||||
// writeDone is buffered (cap 1) and the writer always sends — by the
|
||||
// time cmd has exited, the prompt write has either succeeded, hit a
|
||||
// broken pipe, or been unblocked by the kill that ended cmd.
|
||||
writeErr := <-writeDone
|
||||
|
||||
if runCtx.Err() == context.DeadlineExceeded {
|
||||
switch {
|
||||
case runCtx.Err() == context.DeadlineExceeded:
|
||||
finalStatus = "timeout"
|
||||
finalError = fmt.Sprintf("claude timed out after %s", timeout)
|
||||
} else if runCtx.Err() == context.Canceled {
|
||||
case runCtx.Err() == context.Canceled:
|
||||
finalStatus = "aborted"
|
||||
finalError = "execution cancelled"
|
||||
} else if exitErr != nil && finalStatus == "completed" {
|
||||
case writeErr != nil && finalStatus == "completed" && sessionID == "":
|
||||
// No result event landed and the prompt write failed — claude
|
||||
// died before reading the prompt. Surface the write error; the
|
||||
// stderr tail attached below carries the real reason.
|
||||
finalStatus = "failed"
|
||||
finalError = fmt.Sprintf("write claude input: %v", writeErr)
|
||||
case exitErr != nil && finalStatus == "completed":
|
||||
finalStatus = "failed"
|
||||
finalError = fmt.Sprintf("claude exited with error: %v", exitErr)
|
||||
}
|
||||
|
||||
113
server/pkg/agent/claude_deadlock_test.go
Normal file
113
server/pkg/agent/claude_deadlock_test.go
Normal file
@@ -0,0 +1,113 @@
|
||||
package agent
|
||||
|
||||
import (
|
||||
"bufio"
|
||||
"context"
|
||||
"fmt"
|
||||
"io"
|
||||
"log/slog"
|
||||
"os"
|
||||
"strings"
|
||||
"testing"
|
||||
"time"
|
||||
)
|
||||
|
||||
// TestMain intercepts when the test binary is re-executed as a fake
|
||||
// child process by the agent backend. The fake's behavior is selected via
|
||||
// CLAUDE_FAKE_MODE; absent that env var, this is a normal `go test` run.
|
||||
func TestMain(m *testing.M) {
|
||||
switch mode := os.Getenv("CLAUDE_FAKE_MODE"); mode {
|
||||
case "":
|
||||
os.Exit(m.Run())
|
||||
case "startup_stdout_burst":
|
||||
runFakeClaudeStartupStdoutBurst()
|
||||
os.Exit(0)
|
||||
default:
|
||||
fmt.Fprintf(os.Stderr, "unknown CLAUDE_FAKE_MODE: %q\n", mode)
|
||||
os.Exit(2)
|
||||
}
|
||||
}
|
||||
|
||||
// runFakeClaudeStartupStdoutBurst is the fake child used to reproduce the
// stdio deadlock. It writes 256 stream-json log lines of just over 1 KiB
// each (more than 256 KiB in total) to stdout BEFORE reading a single byte
// from stdin, then drains stdin to EOF and finally emits one stream-json
// result frame.
//
// If the parent writes the prompt to stdin before anything is reading this
// process's stdout, this process blocks on its stdout write and the parent
// blocks on its stdin write, so neither side makes progress until the
// parent kills the child.
//
// Every I/O step exits with its own non-zero code on failure so the step
// that broke can be identified from the exit status:
//
//	11 - buffering a log line failed
//	12 - flushing the log burst failed
//	13 - draining stdin failed
//	14 - writing the final result frame failed
func runFakeClaudeStartupStdoutBurst() {
	// The payload is identical for every line, so format it once.
	logLine := fmt.Sprintf(`{"type":"log","log":{"level":"info","message":"%s"}}`+"\n", strings.Repeat("x", 1020))

	bw := bufio.NewWriter(os.Stdout)
	for i := 0; i < 256; i++ {
		if _, err := bw.WriteString(logLine); err != nil {
			os.Exit(11)
		}
	}
	if err := bw.Flush(); err != nil {
		os.Exit(12)
	}

	// Only now consume the prompt; io.Copy returns once the parent closes
	// its end of stdin.
	if _, err := io.Copy(io.Discard, os.Stdin); err != nil {
		os.Exit(13)
	}

	// A failed result write must not look like a clean exit: without this
	// frame the parent never sees a result event.
	if _, err := fmt.Println(`{"type":"result","subtype":"success","is_error":false,"session_id":"sess-deadlock","result":"done"}`); err != nil {
		os.Exit(14)
	}
}
|
||||
|
||||
// TestClaudeExecuteDoesNotDeadlockOnStartupStdoutBurst verifies that the
|
||||
// claude backend drains stdout concurrently with writing the prompt to
|
||||
// stdin. The buggy path serialises the two: writeClaudeInput runs before
|
||||
// the reader goroutine starts, so a child that emits startup output
|
||||
// before its first stdin read deadlocks both directions. Field evidence
|
||||
// in the daemon log shows tasks failing exactly at the 2 h per-task
|
||||
// timeout with "write |1: The pipe has been ended.", produced when
|
||||
// runCtx fires, the child is killed, and the blocked stdin Write
|
||||
// finally unwinds.
|
||||
//
|
||||
// The fake child writes 256 KiB to stdout then 128 KiB of prompt is
|
||||
// pushed at stdin — both well past any plausible OS pipe buffer
|
||||
// (Linux ~64 KiB, Windows 4-64 KiB) — so a regression here hangs until
|
||||
// the test deadline rather than passing slowly.
|
||||
func TestClaudeExecuteDoesNotDeadlockOnStartupStdoutBurst(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
self, err := os.Executable()
|
||||
if err != nil {
|
||||
t.Fatalf("os.Executable: %v", err)
|
||||
}
|
||||
|
||||
backend, err := New("claude", Config{
|
||||
ExecutablePath: self,
|
||||
Env: map[string]string{"CLAUDE_FAKE_MODE": "startup_stdout_burst"},
|
||||
Logger: slog.Default(),
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatalf("new claude backend: %v", err)
|
||||
}
|
||||
|
||||
// 128 KiB prompt forces writeClaudeInput to block until the child
|
||||
// drains stdin, which the buggy code cannot reach because the reader
|
||||
// goroutine hasn't started yet.
|
||||
prompt := strings.Repeat("p", 128*1024)
|
||||
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
|
||||
defer cancel()
|
||||
|
||||
session, err := backend.Execute(ctx, prompt, ExecOptions{Timeout: 20 * time.Second})
|
||||
if err != nil {
|
||||
t.Fatalf("execute returned error: %v", err)
|
||||
}
|
||||
go func() {
|
||||
for range session.Messages {
|
||||
}
|
||||
}()
|
||||
|
||||
select {
|
||||
case result, ok := <-session.Result:
|
||||
if !ok {
|
||||
t.Fatal("result channel closed without a value")
|
||||
}
|
||||
if result.Status != "completed" {
|
||||
t.Fatalf("expected status=completed, got %q (error=%q)", result.Status, result.Error)
|
||||
}
|
||||
case <-time.After(15 * time.Second):
|
||||
t.Fatal("timeout waiting for result — claude backend is deadlocked on writeClaudeInput because stdout is not being drained concurrently")
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user