mirror of
https://github.com/multica-ai/multica.git
synced 2026-06-17 03:38:32 +02:00
Fix Claude control request handling (#3827)
Co-authored-by: Eve <eve@multica-ai.local> Co-authored-by: multica-agent <github@multica.ai>
This commit is contained in:
@@ -107,10 +107,18 @@ func (b *claudeBackend) Execute(ctx context.Context, prompt string, opts ExecOpt
|
||||
// fires. The field symptom is "write |1: The pipe has been ended."
|
||||
// surfacing exactly at the per-task timeout when the kill invalidates
|
||||
// the still-blocked pipe.
|
||||
//
|
||||
// Keep stdin open after the initial user message. Claude's stream-json
|
||||
// protocol can emit control_request events mid-run and expects matching
|
||||
// control_response frames on the same input stream; closing stdin here
|
||||
// leaves the child stuck waiting for a response until its own fallback
|
||||
// timeout.
|
||||
writeDone := make(chan error, 1)
|
||||
go func() {
|
||||
err := writeClaudeInput(stdin, prompt)
|
||||
closeStdin()
|
||||
if err != nil {
|
||||
closeStdin()
|
||||
}
|
||||
writeDone <- err
|
||||
}()
|
||||
|
||||
@@ -132,6 +140,7 @@ func (b *claudeBackend) Execute(ctx context.Context, prompt string, opts ExecOpt
|
||||
// Close stdin and stdout when the context is cancelled so scanner.Scan() unblocks.
|
||||
go func() {
|
||||
<-runCtx.Done()
|
||||
closeStdin()
|
||||
_ = stdout.Close()
|
||||
}()
|
||||
|
||||
@@ -172,6 +181,7 @@ func (b *claudeBackend) Execute(ctx context.Context, prompt string, opts ExecOpt
|
||||
finalStatus = "failed"
|
||||
finalError = msg.ResultText
|
||||
}
|
||||
closeStdin()
|
||||
case "log":
|
||||
if msg.Log != nil {
|
||||
trySend(msgCh, Message{
|
||||
@@ -180,9 +190,13 @@ func (b *claudeBackend) Execute(ctx context.Context, prompt string, opts ExecOpt
|
||||
Content: msg.Log.Message,
|
||||
})
|
||||
}
|
||||
case "control_request":
|
||||
b.handleControlRequest(msg, stdin)
|
||||
}
|
||||
}
|
||||
|
||||
closeStdin()
|
||||
|
||||
// Wait for process exit
|
||||
exitErr := cmd.Wait()
|
||||
duration := time.Since(startTime)
|
||||
|
||||
@@ -3,8 +3,8 @@ package agent
|
||||
import (
|
||||
"bufio"
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io"
|
||||
"log/slog"
|
||||
"os"
|
||||
"strings"
|
||||
@@ -22,6 +22,9 @@ func TestMain(m *testing.M) {
|
||||
case "startup_stdout_burst":
|
||||
runFakeClaudeStartupStdoutBurst()
|
||||
os.Exit(0)
|
||||
case "control_request":
|
||||
runFakeClaudeControlRequest()
|
||||
os.Exit(0)
|
||||
default:
|
||||
fmt.Fprintf(os.Stderr, "unknown CLAUDE_FAKE_MODE: %q\n", mode)
|
||||
os.Exit(2)
|
||||
@@ -29,11 +32,11 @@ func TestMain(m *testing.M) {
|
||||
}
|
||||
|
||||
// runFakeClaudeStartupStdoutBurst writes ~256 KiB to stdout BEFORE
|
||||
// reading any byte from stdin, then drains stdin and emits a stream-json
|
||||
// result. Reproduces the stdio deadlock: if the daemon writes the prompt
|
||||
// to stdin before a stdout reader is running, the child blocks writing
|
||||
// stdout and the daemon blocks writing stdin — neither side can progress
|
||||
// until the per-task context times out and the child is killed.
|
||||
// reading any byte from stdin, then reads the first stdin frame and emits a
|
||||
// stream-json result. Reproduces the stdio deadlock: if the daemon writes
|
||||
// the prompt to stdin before a stdout reader is running, the child blocks
|
||||
// writing stdout and the daemon blocks writing stdin — neither side can
|
||||
// progress until the per-task context times out and the child is killed.
|
||||
func runFakeClaudeStartupStdoutBurst() {
|
||||
line := strings.Repeat("x", 1020)
|
||||
bw := bufio.NewWriter(os.Stdout)
|
||||
@@ -45,12 +48,44 @@ func runFakeClaudeStartupStdoutBurst() {
|
||||
if err := bw.Flush(); err != nil {
|
||||
os.Exit(12)
|
||||
}
|
||||
if _, err := io.Copy(io.Discard, os.Stdin); err != nil {
|
||||
if _, err := bufio.NewReader(os.Stdin).ReadString('\n'); err != nil {
|
||||
os.Exit(13)
|
||||
}
|
||||
fmt.Println(`{"type":"result","subtype":"success","is_error":false,"session_id":"sess-deadlock","result":"done"}`)
|
||||
}
|
||||
|
||||
// runFakeClaudeControlRequest plays the child side of a stream-json
// control_request round trip. It:
//
//  1. reads one newline-terminated frame from stdin (the initial prompt),
//  2. prints a system frame and a control_request frame carrying
//     request_id "req-42" to stdout,
//  3. reads a second newline-terminated frame from stdin and requires it to
//     be a control_response whose response.request_id is "req-42",
//  4. prints an assistant frame followed by a success result frame.
//
// Every failure writes a diagnostic to stderr and exits with a distinct
// status: 21 (prompt read failed), 22 (response read failed), 23 (response
// is not valid JSON), 24 (wrong frame type or request id).
func runFakeClaudeControlRequest() {
	stdin := bufio.NewReader(os.Stdin)

	// The prompt content is irrelevant here; only its arrival matters.
	if _, err := stdin.ReadString('\n'); err != nil {
		fmt.Fprintf(os.Stderr, "read prompt: %v\n", err)
		os.Exit(21)
	}

	fmt.Println(`{"type":"system","session_id":"sess-control"}`)
	fmt.Println(`{"type":"control_request","request_id":"req-42","request":{"subtype":"tool_use","tool_name":"Bash","input":{"command":"pwd"}}}`)

	// This read blocks until the parent answers on the same stdin stream; a
	// closed stdin surfaces here as a read error and exit status 22.
	line, err := stdin.ReadString('\n')
	if err != nil {
		fmt.Fprintf(os.Stderr, "read control response: %v\n", err)
		os.Exit(22)
	}

	// Trim once and reuse the result for both decoding and diagnostics, so
	// the trailing newline kept by ReadString never leaks into stderr.
	frame := strings.TrimSpace(line)

	var resp struct {
		Type     string `json:"type"`
		Response struct {
			RequestID string `json:"request_id"`
		} `json:"response"`
	}
	if err := json.Unmarshal([]byte(frame), &resp); err != nil {
		fmt.Fprintf(os.Stderr, "decode control response: %v\n", err)
		os.Exit(23)
	}
	if resp.Type != "control_response" || resp.Response.RequestID != "req-42" {
		// %q keeps the offending frame on one line and makes stray
		// whitespace or an empty frame visible.
		fmt.Fprintf(os.Stderr, "unexpected control response: %q\n", frame)
		os.Exit(24)
	}

	fmt.Println(`{"type":"assistant","message":{"role":"assistant","content":[{"type":"text","text":"approved"}]}}`)
	fmt.Println(`{"type":"result","subtype":"success","is_error":false,"session_id":"sess-control","result":"done after control"}`)
}
|
||||
|
||||
// TestClaudeExecuteDoesNotDeadlockOnStartupStdoutBurst verifies that the
|
||||
// claude backend drains stdout concurrently with writing the prompt to
|
||||
// stdin. The buggy path serialises the two: writeClaudeInput runs before
|
||||
@@ -111,3 +146,51 @@ func TestClaudeExecuteDoesNotDeadlockOnStartupStdoutBurst(t *testing.T) {
|
||||
t.Fatal("timeout waiting for result — claude backend is deadlocked on writeClaudeInput because stdout is not being drained concurrently")
|
||||
}
|
||||
}
|
||||
|
||||
func TestClaudeExecuteRespondsToControlRequest(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
self, err := os.Executable()
|
||||
if err != nil {
|
||||
t.Fatalf("os.Executable: %v", err)
|
||||
}
|
||||
|
||||
backend, err := New("claude", Config{
|
||||
ExecutablePath: self,
|
||||
Env: map[string]string{"CLAUDE_FAKE_MODE": "control_request"},
|
||||
Logger: slog.Default(),
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatalf("new claude backend: %v", err)
|
||||
}
|
||||
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
|
||||
defer cancel()
|
||||
|
||||
session, err := backend.Execute(ctx, "run a command", ExecOptions{Timeout: 8 * time.Second})
|
||||
if err != nil {
|
||||
t.Fatalf("execute returned error: %v", err)
|
||||
}
|
||||
go func() {
|
||||
for range session.Messages {
|
||||
}
|
||||
}()
|
||||
|
||||
select {
|
||||
case result, ok := <-session.Result:
|
||||
if !ok {
|
||||
t.Fatal("result channel closed without a value")
|
||||
}
|
||||
if result.Status != "completed" {
|
||||
t.Fatalf("expected status=completed, got %q (error=%q)", result.Status, result.Error)
|
||||
}
|
||||
if result.Output != "done after control" {
|
||||
t.Fatalf("expected result output from fake claude, got %q", result.Output)
|
||||
}
|
||||
if result.SessionID != "sess-control" {
|
||||
t.Fatalf("expected session id sess-control, got %q", result.SessionID)
|
||||
}
|
||||
case <-time.After(5 * time.Second):
|
||||
t.Fatal("timeout waiting for result — claude backend did not answer control_request")
|
||||
}
|
||||
}
|
||||
|
||||
@@ -625,14 +625,15 @@ func TestClaudeExecuteSurfacesStderrWhenChildExitsEarly(t *testing.T) {
|
||||
t.Skip("shell-script fixture is POSIX-only")
|
||||
}
|
||||
|
||||
// Fake claude binary: drains stdin so writeClaudeInput succeeds, writes a
|
||||
// canonical V8-abort line to stderr, then exits non-zero before emitting
|
||||
// any stream-json to stdout. This is the exact failure mode that motivated
|
||||
// PR #1674 — without sampling stderrBuf.Tail() after cmd.Wait() returns,
|
||||
// Result.Error would be a useless "exit status 3".
|
||||
// Fake claude binary: reads the initial stdin frame so writeClaudeInput
|
||||
// succeeds, writes a canonical V8-abort line to stderr, then exits
|
||||
// non-zero before emitting any stream-json to stdout. This is the exact
|
||||
// failure mode that motivated PR #1674 — without sampling stderrBuf.Tail()
|
||||
// after cmd.Wait() returns, Result.Error would be a useless
|
||||
// "exit status 3".
|
||||
fakePath := filepath.Join(t.TempDir(), "claude")
|
||||
script := "#!/bin/sh\n" +
|
||||
"cat >/dev/null\n" +
|
||||
"IFS= read -r _\n" +
|
||||
"echo \"FATAL ERROR: V8 abort: assertion failed\" >&2\n" +
|
||||
"exit 3\n"
|
||||
writeTestExecutable(t, fakePath, []byte(script))
|
||||
@@ -684,7 +685,7 @@ func TestClaudeExecuteRecordsResultModelUsage(t *testing.T) {
|
||||
|
||||
fakePath := filepath.Join(t.TempDir(), "claude")
|
||||
script := "#!/bin/sh\n" +
|
||||
"cat >/dev/null\n" +
|
||||
"IFS= read -r _\n" +
|
||||
"printf '%s\\n' '{\"type\":\"system\",\"session_id\":\"sess-result-usage\"}'\n" +
|
||||
"printf '%s\\n' '{\"type\":\"result\",\"subtype\":\"success\",\"is_error\":false,\"session_id\":\"sess-result-usage\",\"result\":\"done\",\"modelUsage\":{\"zhipu/coding-plan\":{\"inputTokens\":123,\"outputTokens\":45,\"cacheReadInputTokens\":7,\"cacheCreationInputTokens\":11,\"costUSD\":0.01}}}'\n"
|
||||
writeTestExecutable(t, fakePath, []byte(script))
|
||||
|
||||
Reference in New Issue
Block a user