mirror of
https://github.com/multica-ai/multica.git
synced 2026-07-05 13:29:44 +02:00
On cancellation/timeout the opencode backend closed the stdout read end immediately, leaving the child writing into a closed pipe. Every write then returns EPIPE and, per anomalyco/opencode#33653, can spin an orphaned process at 100% CPU — surfacing as high idle CPU after a cancelled task or daemon restart (MUL-3655). Cleanup now runs opencode in its own process group and, on cancel, drives a graceful group-wide SIGTERM → grace → SIGKILL, closing the stdout pipe only as a last-resort unblock once the tree has been signalled (SIGKILL is uncatchable, so no member can write again — no EPIPE window). The group signal also reaps tool subprocesses opencode spawned instead of orphaning them. WaitDelay remains the hard backstop. Adds unix tests covering the graceful path and the SIGTERM-ignored → SIGKILL escalation, asserting the whole process group is reaped and the run never deadlocks on the scanner. Windows behaviour is unchanged (no process groups). Co-authored-by: J <j@multica.ai> Co-authored-by: multica-agent <github@multica.ai>
This commit is contained in:
@@ -11,9 +11,26 @@ import (
|
||||
"path/filepath"
|
||||
"runtime"
|
||||
"strings"
|
||||
"sync/atomic"
|
||||
"syscall"
|
||||
"time"
|
||||
)
|
||||
|
||||
// opencodeTerminateGraceNanos optionally overrides, in nanoseconds, how long a
|
||||
// cancelled opencode process is given to exit after SIGTERM before it (and its
|
||||
// whole process group) is SIGKILLed. Zero means use the default. It is atomic
|
||||
// so tests can shorten the grace without racing the cancellation goroutine that
|
||||
// reads it. See the cancellation handler in Execute for why termination must
|
||||
// precede closing the stdout pipe (#4533).
|
||||
var opencodeTerminateGraceNanos atomic.Int64
|
||||
|
||||
func opencodeTerminateGrace() time.Duration {
|
||||
if n := opencodeTerminateGraceNanos.Load(); n > 0 {
|
||||
return time.Duration(n)
|
||||
}
|
||||
return 5 * time.Second
|
||||
}
|
||||
|
||||
// opencodeBlockedArgs are flags hardcoded by the daemon that must not be
|
||||
// overridden by user-configured custom_args.
|
||||
var opencodeBlockedArgs = map[string]blockedArgMode{
|
||||
@@ -82,6 +99,18 @@ func (b *opencodeBackend) Execute(ctx context.Context, prompt string, opts ExecO
|
||||
|
||||
cmd := exec.CommandContext(runCtx, execPath, args...)
|
||||
hideAgentWindow(cmd)
|
||||
// Run opencode in its own process group so cancellation can reach the
|
||||
// whole tree (opencode plus any tool subprocess it spawns), not just the
|
||||
// direct child — otherwise a cancelled or restarted run can orphan a
|
||||
// descendant that keeps spinning (#4533).
|
||||
configureProcessGroup(cmd)
|
||||
// Take over context cancellation. The default CommandContext behaviour
|
||||
// SIGKILLs only the leader the instant runCtx is done; we instead drive a
|
||||
// graceful, group-wide SIGTERM→SIGKILL from the cancellation goroutine
|
||||
// below and close the stdout read end only after the tree has been
|
||||
// signalled. Returning nil here keeps os/exec from racing us with its own
|
||||
// kill; WaitDelay remains the hard backstop.
|
||||
cmd.Cancel = func() error { return nil }
|
||||
b.cfg.Logger.Info("agent command", "exec", execPath, "args", args)
|
||||
cmd.WaitDelay = 10 * time.Second
|
||||
if opts.Cwd != "" {
|
||||
@@ -145,9 +174,34 @@ func (b *opencodeBackend) Execute(ctx context.Context, prompt string, opts ExecO
|
||||
msgCh := make(chan Message, 256)
|
||||
resCh := make(chan Result, 1)
|
||||
|
||||
// Close stdout when the context is cancelled so the scanner unblocks.
|
||||
// procDone closes once cmd.Wait() returns, letting the cancellation handler
|
||||
// skip a process that already exited and avoid signalling a dead pid.
|
||||
procDone := make(chan struct{})
|
||||
|
||||
// On cancellation / timeout, terminate opencode (and the tool subprocesses
|
||||
// it spawned) BEFORE unblocking the scanner. The previous implementation
|
||||
// closed the stdout read end immediately, which left opencode writing into
|
||||
// a closed pipe: every write returns EPIPE and, per anomalyco/opencode#33653,
|
||||
// can spin the orphaned process at 100% CPU. Instead we SIGTERM the whole
|
||||
// process group, give it a grace period to exit cleanly, then SIGKILL it.
|
||||
// SIGKILL is uncatchable, so once it is delivered no group member can run
|
||||
// (or write) again — only then is it safe to close the stdout read end as a
|
||||
// last-resort unblock for a scanner that a wedged descendant still keeps
|
||||
// open. WaitDelay is the final backstop (#4533).
|
||||
go func() {
|
||||
<-runCtx.Done()
|
||||
select {
|
||||
case <-procDone:
|
||||
return // finished on its own; nothing to terminate
|
||||
case <-runCtx.Done():
|
||||
}
|
||||
if cmd.Process != nil {
|
||||
signalProcessGroup(cmd.Process, syscall.SIGTERM)
|
||||
select {
|
||||
case <-procDone: // exited within the grace window
|
||||
case <-time.After(opencodeTerminateGrace()):
|
||||
signalProcessGroup(cmd.Process, syscall.SIGKILL)
|
||||
}
|
||||
}
|
||||
_ = stdout.Close()
|
||||
}()
|
||||
|
||||
@@ -159,8 +213,9 @@ func (b *opencodeBackend) Execute(ctx context.Context, prompt string, opts ExecO
|
||||
startTime := time.Now()
|
||||
scanResult := b.processEvents(stdout, msgCh)
|
||||
|
||||
// Wait for process exit.
|
||||
// Wait for process exit, then release the cancellation handler.
|
||||
exitErr := cmd.Wait()
|
||||
close(procDone)
|
||||
duration := time.Since(startTime)
|
||||
|
||||
if runCtx.Err() == context.DeadlineExceeded {
|
||||
|
||||
157
server/pkg/agent/opencode_cancel_unix_test.go
Normal file
157
server/pkg/agent/opencode_cancel_unix_test.go
Normal file
@@ -0,0 +1,157 @@
|
||||
//go:build unix
|
||||
|
||||
package agent
|
||||
|
||||
import (
|
||||
"context"
|
||||
"log/slog"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"strconv"
|
||||
"strings"
|
||||
"syscall"
|
||||
"testing"
|
||||
"time"
|
||||
)
|
||||
|
||||
// opencodeCancelFakeScript returns a POSIX-sh script that impersonates a
|
||||
// long-running `opencode`: it spawns a background grandchild, records both its
|
||||
// own (process-group-leader) pid and the grandchild pid, then streams stdout in
|
||||
// a tight loop forever. This is the shape that orphans and spins on EPIPE when
|
||||
// the daemon closes stdout while the process is still alive. When ignoreTerm is
|
||||
// true the whole group ignores SIGTERM, forcing the SIGKILL escalation path.
|
||||
func opencodeCancelFakeScript(ignoreTerm bool) string {
|
||||
trap := "trap 'exit 0' TERM\n"
|
||||
if ignoreTerm {
|
||||
trap = "trap '' TERM\n"
|
||||
}
|
||||
return "#!/bin/sh\n" + trap +
|
||||
`# Background grandchild so the test can assert the *whole* group is
|
||||
# terminated on cancellation, not just the direct child.
|
||||
( sleep 300 ) &
|
||||
child=$!
|
||||
if [ -n "$OPENCODE_PID_FILE" ]; then
|
||||
printf '%s %s\n' "$$" "$child" > "$OPENCODE_PID_FILE"
|
||||
fi
|
||||
printf '{"type":"step_start","timestamp":1,"sessionID":"ses_fake","part":{"type":"step-start"}}\n'
|
||||
while true; do
|
||||
printf '{"type":"text","timestamp":2,"sessionID":"ses_fake","part":{"type":"text","text":"tick"}}\n'
|
||||
sleep 0.1
|
||||
done
|
||||
`
|
||||
}
|
||||
|
||||
// TestOpencodeCancellationTerminatesProcessGroupGraceful verifies that
|
||||
// cancelling a run terminates a SIGTERM-respecting opencode and its whole
|
||||
// process group, returns an "aborted" result without hanging, and leaves no
|
||||
// orphaned descendant.
|
||||
func TestOpencodeCancellationTerminatesProcessGroupGraceful(t *testing.T) {
|
||||
runOpencodeCancellationTest(t, opencodeCancelFakeScript(false))
|
||||
}
|
||||
|
||||
// TestOpencodeCancellationEscalatesToSIGKILL verifies the worst case from
|
||||
// #4533: opencode (and its children) ignore SIGTERM and keep writing to stdout.
|
||||
// Cancellation must escalate to a group SIGKILL, still return promptly, and
|
||||
// still reap the whole group — without deadlocking on the stdout scanner or
|
||||
// closing the pipe under a live writer.
|
||||
func TestOpencodeCancellationEscalatesToSIGKILL(t *testing.T) {
|
||||
opencodeTerminateGraceNanos.Store(int64(300 * time.Millisecond))
|
||||
t.Cleanup(func() { opencodeTerminateGraceNanos.Store(0) })
|
||||
runOpencodeCancellationTest(t, opencodeCancelFakeScript(true))
|
||||
}
|
||||
|
||||
func runOpencodeCancellationTest(t *testing.T, script string) {
|
||||
t.Helper()
|
||||
|
||||
tempDir := t.TempDir()
|
||||
pidFile := filepath.Join(tempDir, "pids")
|
||||
fakePath := filepath.Join(tempDir, "opencode")
|
||||
writeTestExecutable(t, fakePath, []byte(script))
|
||||
|
||||
backend, err := New("opencode", Config{
|
||||
ExecutablePath: fakePath,
|
||||
Logger: slog.Default(),
|
||||
Env: map[string]string{"OPENCODE_PID_FILE": pidFile},
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatalf("new opencode backend: %v", err)
|
||||
}
|
||||
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
|
||||
session, err := backend.Execute(ctx, "prompt-ignored", ExecOptions{Cwd: tempDir})
|
||||
if err != nil {
|
||||
t.Fatalf("execute: %v", err)
|
||||
}
|
||||
|
||||
// Drain streamed messages so processEvents never blocks on a full channel.
|
||||
go func() {
|
||||
for range session.Messages {
|
||||
}
|
||||
}()
|
||||
|
||||
pids := waitForPids(t, pidFile)
|
||||
|
||||
cancel() // user cancels the task
|
||||
|
||||
select {
|
||||
case res := <-session.Result:
|
||||
if res.Status != "aborted" {
|
||||
t.Errorf("status = %q, want aborted", res.Status)
|
||||
}
|
||||
case <-time.After(10 * time.Second):
|
||||
t.Fatal("Execute did not return after cancellation (possible scanner deadlock or unkilled process)")
|
||||
}
|
||||
|
||||
// The leader and the grandchild must both be gone — cancellation reaped the
|
||||
// whole group, leaving no orphan spinning.
|
||||
for _, pid := range pids {
|
||||
waitProcessGone(t, pid)
|
||||
}
|
||||
}
|
||||
|
||||
// waitForPids polls pidFile until it contains the space-separated pids the fake
|
||||
// recorded, then returns them.
|
||||
func waitForPids(t *testing.T, pidFile string) []int {
|
||||
t.Helper()
|
||||
deadline := time.Now().Add(5 * time.Second)
|
||||
for time.Now().Before(deadline) {
|
||||
raw, err := os.ReadFile(pidFile)
|
||||
if err == nil {
|
||||
fields := strings.Fields(string(raw))
|
||||
if len(fields) >= 2 {
|
||||
pids := make([]int, 0, len(fields))
|
||||
ok := true
|
||||
for _, f := range fields {
|
||||
n, perr := strconv.Atoi(f)
|
||||
if perr != nil || n <= 0 {
|
||||
ok = false
|
||||
break
|
||||
}
|
||||
pids = append(pids, n)
|
||||
}
|
||||
if ok {
|
||||
return pids
|
||||
}
|
||||
}
|
||||
}
|
||||
time.Sleep(20 * time.Millisecond)
|
||||
}
|
||||
t.Fatalf("fake opencode never recorded its pids in %s", pidFile)
|
||||
return nil
|
||||
}
|
||||
|
||||
// waitProcessGone polls until signal 0 to pid reports the process no longer
|
||||
// exists (ESRCH), failing if it is still alive after the deadline.
|
||||
func waitProcessGone(t *testing.T, pid int) {
|
||||
t.Helper()
|
||||
deadline := time.Now().Add(5 * time.Second)
|
||||
for time.Now().Before(deadline) {
|
||||
if err := syscall.Kill(pid, 0); err == syscall.ESRCH {
|
||||
return
|
||||
}
|
||||
time.Sleep(20 * time.Millisecond)
|
||||
}
|
||||
t.Errorf("process %d still alive after cancellation — orphaned/leaked", pid)
|
||||
}
|
||||
@@ -2,7 +2,37 @@
|
||||
|
||||
package agent
|
||||
|
||||
import "os/exec"
|
||||
import (
|
||||
"os"
|
||||
"os/exec"
|
||||
"syscall"
|
||||
)
|
||||
|
||||
// hideAgentWindow is a no-op on non-Windows platforms.
|
||||
func hideAgentWindow(cmd *exec.Cmd) {}
|
||||
|
||||
// configureProcessGroup puts the child into its own process group (it becomes
|
||||
// the group leader, so the group id equals the child pid). This lets the
|
||||
// daemon signal the entire tree — the agent CLI plus any tool subprocess it
|
||||
// spawns — in one call, instead of killing only the direct child and leaking
|
||||
// grandchildren that keep running (and, for opencode, spinning on EPIPE) after
|
||||
// a task is cancelled or the daemon restarts. See signalProcessGroup.
|
||||
func configureProcessGroup(cmd *exec.Cmd) {
|
||||
if cmd.SysProcAttr == nil {
|
||||
cmd.SysProcAttr = &syscall.SysProcAttr{}
|
||||
}
|
||||
cmd.SysProcAttr.Setpgid = true
|
||||
}
|
||||
|
||||
// signalProcessGroup sends sig to the whole process group led by p (when the
|
||||
// command was started with configureProcessGroup), falling back to the single
|
||||
// process if the group send fails. Targeting the group (negative pid) reaches
|
||||
// the descendants the agent spawned, not just the leader.
|
||||
func signalProcessGroup(p *os.Process, sig syscall.Signal) {
|
||||
if p == nil {
|
||||
return
|
||||
}
|
||||
if err := syscall.Kill(-p.Pid, sig); err != nil {
|
||||
_ = p.Signal(sig)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -3,6 +3,7 @@
|
||||
package agent
|
||||
|
||||
import (
|
||||
"os"
|
||||
"os/exec"
|
||||
"syscall"
|
||||
)
|
||||
@@ -30,3 +31,19 @@ func hideAgentWindow(cmd *exec.Cmd) {
|
||||
cmd.SysProcAttr.HideWindow = true
|
||||
cmd.SysProcAttr.CreationFlags |= createNewConsole
|
||||
}
|
||||
|
||||
// configureProcessGroup is a no-op on Windows: there is no Setpgid/process-group
|
||||
// signalling. Descendant cleanup relies on the hidden console group set up by
|
||||
// hideAgentWindow plus exec.CommandContext / WaitDelay terminating the child.
|
||||
func configureProcessGroup(cmd *exec.Cmd) {}
|
||||
|
||||
// signalProcessGroup terminates the process on Windows. Windows has no
|
||||
// SIGTERM/SIGKILL distinction or process-group signalling, so the signal is
|
||||
// ignored and the process is killed directly (TerminateProcess via Kill). The
|
||||
// caller's grace window still applies before this is invoked with SIGKILL.
|
||||
func signalProcessGroup(p *os.Process, _ syscall.Signal) {
|
||||
if p == nil {
|
||||
return
|
||||
}
|
||||
_ = p.Kill()
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user