Files
multica/server/internal/daemon/daemon_test.go
NanamiKite 2e34016f1f fix(daemon): interrupt local agent on server-side terminal task states (#3878)
shouldInterruptAgent now treats every terminal task status (completed/failed/cancelled, via isAgentTaskTerminal) plus a 404 task-not-found as an interruption signal, so the daemon stops a local agent once the backend has finalized the task — e.g. the runtime offline sweeper flipping running -> failed during a disconnect/reconnect. Previously only `cancelled`/404 interrupted, so the agent ran to completion and its CompleteTask call failed against a non-running row, wasting compute and adding log noise.

Closes #3877
2026-06-08 14:00:30 +08:00

2160 lines
70 KiB
Go
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
package daemon
import (
"context"
"encoding/json"
"errors"
"io"
"log/slog"
"net/http"
"net/http/httptest"
"os"
"os/exec"
"path/filepath"
"runtime"
"strings"
"sync"
"sync/atomic"
"testing"
"time"
"github.com/multica-ai/multica/server/internal/daemon/repocache"
"github.com/multica-ai/multica/server/pkg/agent"
)
// createDaemonTestRepo runs `git init` in a fresh per-test temporary
// directory, records a single empty commit named "initial" there, and
// returns the directory path. Any git failure aborts the test.
func createDaemonTestRepo(t *testing.T) string {
	t.Helper()

	repoDir := t.TempDir()

	// runGit executes one git command with a fixed author/committer identity
	// supplied through the environment.
	runGit := func(args ...string) {
		gitCmd := exec.Command("git", args...)
		gitCmd.Env = append(os.Environ(),
			"GIT_AUTHOR_NAME=test", "GIT_AUTHOR_EMAIL=test@test.com",
			"GIT_COMMITTER_NAME=test", "GIT_COMMITTER_EMAIL=test@test.com",
		)
		output, err := gitCmd.CombinedOutput()
		if err != nil {
			t.Fatalf("git setup failed: %s: %v", output, err)
		}
	}

	runGit("init", repoDir)
	runGit("-C", repoDir, "commit", "--allow-empty", "-m", "initial")
	return repoDir
}
// TestNormalizeServerBaseURL checks that a ws:// URL carrying a /ws path is
// normalized to the bare http:// base URL.
func TestNormalizeServerBaseURL(t *testing.T) {
	t.Parallel()

	const (
		input    = "ws://localhost:8080/ws"
		expected = "http://localhost:8080"
	)

	normalized, err := NormalizeServerBaseURL(input)
	switch {
	case err != nil:
		t.Fatalf("NormalizeServerBaseURL returned error: %v", err)
	case normalized != expected:
		t.Fatalf("expected http://localhost:8080, got %s", normalized)
	}
}
// TestTriggerRestart_BrewLinuxCellarDeleted stubs the brew hooks so the
// install is reported as a brew install with a linuxbrew-style prefix, then
// expects triggerRestart to select <prefix>/bin/multica rather than the
// versioned Cellar binary path.
//
// The test mutates package-level hooks, so it is not marked parallel.
func TestTriggerRestart_BrewLinuxCellarDeleted(t *testing.T) {
	savedIsBrew, savedGetPrefix := isBrewInstall, getBrewPrefix
	t.Cleanup(func() {
		isBrewInstall, getBrewPrefix = savedIsBrew, savedGetPrefix
	})

	brewPrefix := filepath.Join(t.TempDir(), "home", "linuxbrew", ".linuxbrew")
	staleCellarBinary := filepath.Join(brewPrefix, "Cellar", "multica", "0.2.9", "bin", "multica")

	isBrewInstall = func() bool { return true }
	getBrewPrefix = func() string { return brewPrefix }

	daemon := &Daemon{logger: slog.New(slog.NewTextHandler(io.Discard, nil))}
	daemon.triggerRestart()

	expected := filepath.Join(brewPrefix, "bin", "multica")
	if actual := daemon.RestartBinary(); actual != expected {
		t.Fatalf("restart binary = %q, want %q", actual, expected)
	}
	if actual := daemon.RestartBinary(); actual == staleCellarBinary {
		t.Fatalf("restart binary used deleted Cellar path %q", actual)
	}
}
// TestTriggerRestart_BrewPrefixUnavailable_FallsBackToKnownPrefix covers the
// case where getBrewPrefix yields an empty string but matchKnownBrewPrefix
// recognises the executable's location: triggerRestart must then target
// <known prefix>/bin/multica.
//
// The test mutates package-level hooks, so it is not marked parallel.
func TestTriggerRestart_BrewPrefixUnavailable_FallsBackToKnownPrefix(t *testing.T) {
	savedIsBrew, savedGetPrefix, savedMatchKnown := isBrewInstall, getBrewPrefix, matchKnownBrewPrefix
	t.Cleanup(func() {
		isBrewInstall, getBrewPrefix, matchKnownBrewPrefix = savedIsBrew, savedGetPrefix, savedMatchKnown
	})

	const knownPrefix = "/home/linuxbrew/.linuxbrew"

	isBrewInstall = func() bool { return true }
	getBrewPrefix = func() string { return "" }
	matchKnownBrewPrefix = func(string) string { return knownPrefix }

	daemon := &Daemon{logger: slog.New(slog.NewTextHandler(io.Discard, nil))}
	daemon.triggerRestart()

	expected := filepath.Join(knownPrefix, "bin", "multica")
	if actual := daemon.RestartBinary(); actual != expected {
		t.Fatalf("restart binary = %q, want %q", actual, expected)
	}
}
// TestTriggerRestart_BrewPrefixUnavailable_NoKnownPrefix_KeepsExecutable
// covers the case where neither getBrewPrefix nor matchKnownBrewPrefix yields
// a prefix: triggerRestart must leave the restart binary equal to the current
// executable instead of fabricating a <prefix>/bin/multica path.
//
// The test mutates package-level hooks, so it is not marked parallel.
func TestTriggerRestart_BrewPrefixUnavailable_NoKnownPrefix_KeepsExecutable(t *testing.T) {
	savedIsBrew, savedGetPrefix, savedMatchKnown := isBrewInstall, getBrewPrefix, matchKnownBrewPrefix
	t.Cleanup(func() {
		isBrewInstall, getBrewPrefix, matchKnownBrewPrefix = savedIsBrew, savedGetPrefix, savedMatchKnown
	})

	isBrewInstall = func() bool { return true }
	getBrewPrefix = func() string { return "" }
	matchKnownBrewPrefix = func(string) string { return "" }

	daemon := &Daemon{logger: slog.New(slog.NewTextHandler(io.Discard, nil))}
	daemon.triggerRestart()

	currentExe, err := os.Executable()
	if err != nil {
		t.Fatalf("os.Executable: %v", err)
	}
	if actual := daemon.RestartBinary(); actual != currentExe {
		t.Fatalf("restart binary = %q, want unchanged executable %q", actual, currentExe)
	}
}
// TestNewTaskSlotSemaphoreReturnsStableSlotIndexes verifies that a semaphore
// created with four slots hands out each index in [0, 3] exactly once, is
// empty afterwards, and yields a released index again on the next acquire.
func TestNewTaskSlotSemaphoreReturnsStableSlotIndexes(t *testing.T) {
	t.Parallel()

	const capacity = 4
	sem := newTaskSlotSemaphore(capacity)

	// tryAcquire is a non-blocking receive from the semaphore.
	tryAcquire := func() (int, bool) {
		select {
		case slot := <-sem:
			return slot, true
		default:
			return 0, false
		}
	}

	taken := make(map[int]bool)
	for i := 0; i < capacity; i++ {
		slot, ok := tryAcquire()
		if !ok {
			t.Fatalf("expected slot %d to be available", i)
		}
		if slot < 0 || slot >= capacity {
			t.Fatalf("slot out of range: %d", slot)
		}
		if taken[slot] {
			t.Fatalf("duplicate slot: %d", slot)
		}
		taken[slot] = true
	}

	// All slots are checked out, so another acquire must fail.
	if slot, ok := tryAcquire(); ok {
		t.Fatalf("expected semaphore to be empty, got slot %d", slot)
	}

	// Release slot 2 and expect to get exactly that index back.
	sem <- 2
	slot, ok := tryAcquire()
	if !ok {
		t.Fatal("expected released slot to be available")
	}
	if slot != 2 {
		t.Fatalf("expected released slot 2, got %d", slot)
	}
}
// TestProviderNeedsInlineSystemPrompt pins, per provider name, whether
// providerNeedsInlineSystemPrompt reports that the system prompt must be
// inlined.
func TestProviderNeedsInlineSystemPrompt(t *testing.T) {
	t.Parallel()

	type expectation struct {
		provider string
		inline   bool
	}
	table := []expectation{
		{"openclaw", true},
		// Hermes ACP starts in the task cwd and reads AGENTS.md / .agent_context
		// itself. Inlining the full runtime brief would duplicate that context
		// and can trip upstream provider safety filters on harmless tasks.
		{"hermes", false},
		{"kiro", true},
		{"kimi", true},
		{"codex", false},
		{"claude", false},
	}

	for i := range table {
		entry := table[i]
		t.Run(entry.provider, func(t *testing.T) {
			t.Parallel()
			got := providerNeedsInlineSystemPrompt(entry.provider)
			if got != entry.inline {
				t.Fatalf("providerNeedsInlineSystemPrompt(%q) = %v, want %v", entry.provider, got, entry.inline)
			}
		})
	}
}
// TestComposeOpenclawIncludeRoots is a regression test for the include-roots
// composition: the daemon must grant OpenClaw permission to follow the
// wrapper's $include link from envRoot into the user's active config dir,
// while keeping any roots the user already configured in their shell env so
// their own cross-directory layouts keep working.
func TestComposeOpenclawIncludeRoots(t *testing.T) {
	t.Parallel()

	// joinRoots glues path-list entries with the platform list separator.
	listSep := string(os.PathListSeparator)
	joinRoots := func(entries ...string) string { return strings.Join(entries, listSep) }

	const grantedDir = "/home/alice/.openclaw"

	type scenario struct {
		name    string
		add     string
		user    string
		want    string
		wantSet bool
	}
	scenarios := []scenario{
		// Fresh install: the preparer emits no $include, so the daemon must
		// leave OPENCLAW_INCLUDE_ROOTS untouched.
		{name: "fresh_install_no_root_to_grant", add: "", user: "/some/user/dir", wantSet: false},
		// No pre-existing user value: the output is just the granted dir.
		{name: "no_user_value", add: grantedDir, user: "", want: grantedDir, wantSet: true},
		// Pre-existing user roots: the granted dir is prepended and the
		// user's entries are preserved verbatim.
		{
			name:    "preserves_user_value",
			add:     grantedDir,
			user:    joinRoots("/etc/openclaw", "/opt/openclaw/shared"),
			want:    joinRoots(grantedDir, "/etc/openclaw", "/opt/openclaw/shared"),
			wantSet: true,
		},
		// The user already lists the granted dir: the result must be deduped
		// rather than carry a redundant entry that would trip OpenClaw
		// confused-deputy heuristics.
		{
			name:    "dedupes_when_user_already_grants_same_dir",
			add:     grantedDir,
			user:    joinRoots(grantedDir, "/etc/openclaw"),
			want:    joinRoots(grantedDir, "/etc/openclaw"),
			wantSet: true,
		},
		// Stray empty segments from a malformed user env are dropped.
		{
			name:    "skips_empty_segments_in_user_value",
			add:     grantedDir,
			user:    joinRoots("", "/etc/openclaw", ""),
			want:    joinRoots(grantedDir, "/etc/openclaw"),
			wantSet: true,
		},
	}

	for i := range scenarios {
		sc := scenarios[i]
		t.Run(sc.name, func(t *testing.T) {
			t.Parallel()
			composed, isSet := composeOpenclawIncludeRoots(sc.add, sc.user)
			if isSet != sc.wantSet {
				t.Fatalf("ok = %v, want %v (got = %q)", isSet, sc.wantSet, composed)
			}
			if composed != sc.want {
				t.Errorf("got = %q, want %q", composed, sc.want)
			}
		})
	}
}
func TestBuildPromptContainsIssueID(t *testing.T) {
t.Parallel()
issueID := "a1b2c3d4-e5f6-7890-abcd-ef1234567890"
prompt := BuildPrompt(Task{
IssueID: issueID,
Agent: &AgentData{
Name: "Local Codex",
Skills: []SkillData{
{Name: "Concise", Content: "Be concise."},
},
},
}, "claude")
// Prompt should contain the issue ID and CLI hint.
for _, want := range []string{
issueID,
"multica issue get",
} {
if !strings.Contains(prompt, want) {
t.Fatalf("prompt missing %q", want)
}
}
// Skills should NOT be inlined in the prompt (they're in runtime config).
for _, absent := range []string{"## Agent Skills", "Be concise."} {
if strings.Contains(prompt, absent) {
t.Fatalf("prompt should NOT contain %q (skills are in runtime config)", absent)
}
}
}
func TestBuildPromptNoIssueDetails(t *testing.T) {
t.Parallel()
prompt := BuildPrompt(Task{
IssueID: "test-id",
Agent: &AgentData{Name: "Test"},
}, "claude")
// Prompt should not contain issue title/description (agent fetches via CLI).
for _, absent := range []string{"**Issue:**", "**Summary:**"} {
if strings.Contains(prompt, absent) {
t.Fatalf("prompt should NOT contain %q — agent fetches details via CLI", absent)
}
}
}
func TestBuildPromptAutopilotRunOnly(t *testing.T) {
t.Parallel()
prompt := BuildPrompt(Task{
AutopilotRunID: "run-1",
AutopilotID: "autopilot-1",
AutopilotTitle: "Daily dependency check",
AutopilotDescription: "Check dependencies and report outdated packages.",
AutopilotSource: "manual",
}, "claude")
for _, want := range []string{
"run-only mode",
"Autopilot run ID: run-1",
"Daily dependency check",
"Check dependencies and report outdated packages.",
"multica autopilot get autopilot-1 --output json",
"Do not run `multica issue get`",
} {
if !strings.Contains(prompt, want) {
t.Fatalf("autopilot prompt missing %q\n---\n%s", want, prompt)
}
}
if strings.Contains(prompt, "Your assigned issue ID is:") {
t.Fatalf("autopilot prompt should not use issue assignment template\n---\n%s", prompt)
}
}
func TestBuildPromptCommentTriggered(t *testing.T) {
t.Parallel()
issueID := "a1b2c3d4-e5f6-7890-abcd-ef1234567890"
commentID := "c1c2c3c4-d5d6-7890-abcd-ef1234567890"
commentContent := "请把报告翻译成英文"
prompt := BuildPrompt(Task{
IssueID: issueID,
TriggerCommentID: commentID,
TriggerCommentContent: commentContent,
Agent: &AgentData{Name: "Test"},
}, "claude")
// Prompt should contain the comment content, the trigger comment id, and
// the full reply command with --parent. Re-emitting --parent on every turn
// is what prevents resumed sessions from reusing the previous turn's
// --parent UUID.
for _, want := range []string{
issueID,
commentContent,
"Focus on THIS comment",
commentID,
"multica issue comment add " + issueID + " --parent " + commentID,
"do NOT reuse --parent values from previous turns",
// Silence-as-valid-exit for agent-to-agent loops depends on the
// reply command being framed conditionally rather than as a hard
// requirement. Guard the phrasing so the conflict with the new
// workflow (MUL-1323) doesn't come back.
"If you decide to reply",
} {
if !strings.Contains(prompt, want) {
t.Fatalf("prompt missing %q\n---\n%s", want, prompt)
}
}
// Should still contain CLI hint for fetching issue context.
if !strings.Contains(prompt, "multica issue get") {
t.Fatal("prompt missing CLI hint for issue context")
}
}
// TestBuildPromptCommentTriggeredByAgent covers the agent-to-agent mention
// loop signal injected into the per-turn prompt (MUL-1323 / GH#1576). When
// the triggering comment was posted by another agent, the prompt must name
// the author, warn against sign-off @mentions, and point at silence as a
// valid exit.
func TestBuildPromptCommentTriggeredByAgent(t *testing.T) {
t.Parallel()
prompt := BuildPrompt(Task{
IssueID: "issue-1",
TriggerCommentID: "comment-1",
TriggerCommentContent: "thanks, looks good!",
TriggerAuthorType: "agent",
TriggerAuthorName: "Atlas",
Agent: &AgentData{Name: "Test"},
}, "claude")
for _, want := range []string{
"Another agent (Atlas)",
"do not @mention the other agent as a sign-off",
"Silence is the preferred way",
} {
if !strings.Contains(prompt, want) {
t.Fatalf("prompt missing %q\n---\n%s", want, prompt)
}
}
}
// TestBuildPromptCommentTriggeredByMember guards against the agent-loop warning
// leaking into human-authored triggers — a human asking a question should not
// be pre-discouraged from getting a reply.
func TestBuildPromptCommentTriggeredByMember(t *testing.T) {
t.Parallel()
prompt := BuildPrompt(Task{
IssueID: "issue-1",
TriggerCommentID: "comment-1",
TriggerCommentContent: "can you translate this?",
TriggerAuthorType: "member",
TriggerAuthorName: "Alice",
Agent: &AgentData{Name: "Test"},
}, "claude")
if !strings.Contains(prompt, "A user just left a new comment") {
t.Fatalf("member-triggered prompt should label the author as a user\n---\n%s", prompt)
}
if strings.Contains(prompt, "Another agent") {
t.Fatalf("member-triggered prompt should not claim the author was another agent")
}
// Must NOT use the old "You MUST respond" language — that conflicts with
// the agent-to-agent silence-as-valid-exit workflow. Even on human-authored
// triggers, the reply command is framed conditionally for a single
// consistent rule across turn types.
if strings.Contains(prompt, "MUST respond") {
t.Fatalf("prompt should not contain unconditional \"MUST respond\" language\n---\n%s", prompt)
}
if !strings.Contains(prompt, "If you decide to reply") {
t.Fatalf("prompt should frame the reply command conditionally\n---\n%s", prompt)
}
}
func TestBuildPromptCommentTriggeredNoContent(t *testing.T) {
t.Parallel()
// When TriggerCommentID is set but content is empty (e.g. fetch failed),
// it should still use the comment prompt path.
prompt := BuildPrompt(Task{
IssueID: "test-id",
TriggerCommentID: "comment-id",
Agent: &AgentData{Name: "Test"},
}, "claude")
if !strings.Contains(prompt, "multica issue get") {
t.Fatal("prompt missing CLI hint")
}
}
// TestBuildPromptSquadLeaderNoActionProhibition verifies the MUL-2168 fix:
// when a squad leader is triggered by another agent's comment, the per-turn
// prompt explicitly forbids posting a comment whose only purpose is to
// announce no_action or "exiting silently". An agent whose instructions lack
// the squad protocol section must not receive that rule.
func TestBuildPromptSquadLeaderNoActionProhibition(t *testing.T) {
	t.Parallel()

	// promptFor builds the same agent-authored comment trigger for an agent
	// with the given name and instructions.
	promptFor := func(agentName, instructions string) string {
		return BuildPrompt(Task{
			IssueID:               "issue-1",
			TriggerCommentID:      "comment-1",
			TriggerCommentContent: "Progress update: tests passing.",
			TriggerAuthorType:     "agent",
			TriggerAuthorName:     "Worker",
			Agent: &AgentData{
				Name:         agentName,
				Instructions: instructions,
			},
		}, "claude")
	}

	leaderPrompt := promptFor("Leader", "You lead the team.\n\n## Squad Operating Protocol\n\nYou are the LEADER.")
	required := []string{
		"Squad leader no_action rule",
		"DO NOT post any comment",
		"multica squad activity",
	}
	for _, needle := range required {
		if strings.Contains(leaderPrompt, needle) {
			continue
		}
		t.Fatalf("squad leader prompt missing %q\n---\n%s", needle, leaderPrompt)
	}

	// A regular agent must not get the squad leader rule.
	regularPrompt := promptFor("Regular", "You are a regular agent.")
	if strings.Contains(regularPrompt, "Squad leader no_action rule") {
		t.Fatalf("non-squad-leader prompt should NOT contain squad leader rule\n---\n%s", regularPrompt)
	}
}
// TestIsWorkspaceNotFoundError checks that a 404 with a "workspace not found"
// body is recognised, while the same body on a 500 is not.
func TestIsWorkspaceNotFoundError(t *testing.T) {
	t.Parallel()

	const body = `{"error":"workspace not found"}`

	notFound := &requestError{
		Method:     http.MethodPost,
		Path:       "/api/daemon/register",
		StatusCode: http.StatusNotFound,
		Body:       body,
	}
	if recognised := isWorkspaceNotFoundError(notFound); !recognised {
		t.Fatal("expected workspace not found error to be recognized")
	}

	serverError := &requestError{StatusCode: http.StatusInternalServerError, Body: body}
	if recognised := isWorkspaceNotFoundError(serverError); recognised {
		t.Fatal("did not expect 500 to be treated as workspace not found")
	}
}
// TestIsTaskNotFoundError pins which errors isTaskNotFoundError accepts: only
// a requestError with status 404 whose body says "task not found", matched
// regardless of letter case.
func TestIsTaskNotFoundError(t *testing.T) {
	t.Parallel()

	// bare builds a requestError carrying only a status code and body.
	bare := func(status int, body string) error {
		return &requestError{StatusCode: status, Body: body}
	}

	type scenario struct {
		name string
		err  error
		want bool
	}
	scenarios := []scenario{
		{
			name: "404 with task not found body",
			err: &requestError{
				Method:     http.MethodPost,
				Path:       "/api/daemon/tasks/abc/messages",
				StatusCode: http.StatusNotFound,
				Body:       `{"error":"task not found"}`,
			},
			want: true,
		},
		{
			name: "404 with mixed-case body still matches",
			err:  bare(http.StatusNotFound, `{"error":"Task Not Found"}`),
			want: true,
		},
		{
			name: "500 with same body is not task-not-found",
			err:  bare(http.StatusInternalServerError, `{"error":"task not found"}`),
			want: false,
		},
		{
			name: "404 with workspace-not-found body is not task-not-found",
			err:  bare(http.StatusNotFound, `{"error":"workspace not found"}`),
			want: false,
		},
		{name: "non-requestError", err: errors.New("network down"), want: false},
		{name: "nil", err: nil, want: false},
	}

	for i := range scenarios {
		sc := scenarios[i]
		t.Run(sc.name, func(t *testing.T) {
			t.Parallel()
			got := isTaskNotFoundError(sc.err)
			if got != sc.want {
				t.Fatalf("isTaskNotFoundError(%v) = %v, want %v", sc.err, got, sc.want)
			}
		})
	}
}
// TestIsRuntimeNotFoundError pins which errors isRuntimeNotFoundError
// accepts: only a requestError with status 404 whose body says "runtime not
// found", matched regardless of letter case.
func TestIsRuntimeNotFoundError(t *testing.T) {
	t.Parallel()

	// bare builds a requestError carrying only a status code and body.
	bare := func(status int, body string) error {
		return &requestError{StatusCode: status, Body: body}
	}
	// posted builds a 404 "runtime not found" reply to a POST on path.
	posted := func(path string) error {
		return &requestError{
			Method:     http.MethodPost,
			Path:       path,
			StatusCode: http.StatusNotFound,
			Body:       `{"error":"runtime not found"}`,
		}
	}

	type scenario struct {
		name string
		err  error
		want bool
	}
	scenarios := []scenario{
		{
			name: "404 with runtime not found body from heartbeat",
			err:  posted("/api/daemon/heartbeat"),
			want: true,
		},
		{
			name: "404 with runtime not found body from claim",
			err:  posted("/api/daemon/runtimes/abc/tasks/claim"),
			want: true,
		},
		{
			name: "mixed-case body still matches",
			err:  bare(http.StatusNotFound, `{"error":"Runtime Not Found"}`),
			want: true,
		},
		{
			name: "500 with same body must NOT be treated as runtime-not-found",
			err:  bare(http.StatusInternalServerError, `{"error":"runtime not found"}`),
			want: false,
		},
		{
			name: "404 with task-not-found body is not runtime-not-found",
			err:  bare(http.StatusNotFound, `{"error":"task not found"}`),
			want: false,
		},
		{
			name: "404 with workspace-not-found body is not runtime-not-found",
			err:  bare(http.StatusNotFound, `{"error":"workspace not found"}`),
			want: false,
		},
		{name: "non-requestError", err: errors.New("network down"), want: false},
		{name: "nil", err: nil, want: false},
	}

	for i := range scenarios {
		sc := scenarios[i]
		t.Run(sc.name, func(t *testing.T) {
			t.Parallel()
			got := isRuntimeNotFoundError(sc.err)
			if got != sc.want {
				t.Fatalf("isRuntimeNotFoundError(%v) = %v, want %v", sc.err, got, sc.want)
			}
		})
	}
}
// TestShouldInterruptAgent pins the interruption decision for each
// (status, error) pair: cancelled/failed/completed statuses and a 404
// task-not-found error interrupt the agent; non-terminal statuses, a 502
// error, and the absence of any information do not.
func TestShouldInterruptAgent(t *testing.T) {
	t.Parallel()

	taskGone := &requestError{
		StatusCode: http.StatusNotFound,
		Body:       `{"error":"task not found"}`,
	}
	badGateway := &requestError{
		StatusCode: http.StatusBadGateway,
		Body:       `<html>...</html>`,
	}

	type scenario struct {
		name   string
		status string
		err    error
		want   bool
	}
	scenarios := []scenario{
		// Signals that must interrupt the agent.
		{name: "status cancelled", status: "cancelled", want: true},
		{name: "status failed (offline sweeper)", status: "failed", want: true},
		{name: "status completed (finished elsewhere)", status: "completed", want: true},
		{name: "task deleted (404)", err: taskGone, want: true},
		// Signals that must leave the agent running.
		{name: "running normally", status: "running", want: false},
		{name: "waiting_local_directory keeps running", status: "waiting_local_directory", want: false},
		{name: "dispatched keeps running", status: "dispatched", want: false},
		{name: "transient 5xx is not a cancel signal", err: badGateway, want: false},
		{name: "no information yet", want: false},
	}

	for i := range scenarios {
		sc := scenarios[i]
		t.Run(sc.name, func(t *testing.T) {
			t.Parallel()
			got := shouldInterruptAgent(sc.status, sc.err)
			if got != sc.want {
				t.Fatalf("shouldInterruptAgent(%q, %v) = %v, want %v", sc.status, sc.err, got, sc.want)
			}
		})
	}
}
// TestWatchTaskCancellation_TaskDeleted reproduces the zombie-task bug. When
// the server deletes a task mid-run (issue removed, agent reassigned, etc.),
// the status endpoint starts answering 404. Before the fix the daemon kept
// polling and never interrupted the agent, which could keep emitting tool
// calls against a dead task for minutes.
//
// With the fix, watchTaskCancellation must signal on its channel within a few
// poll intervals so the caller can cancel the agent context.
func TestWatchTaskCancellation_TaskDeleted(t *testing.T) {
	t.Parallel()

	// Status polls get a JSON 404 "task not found"; other paths get the
	// stock 404 response.
	taskGone := func(w http.ResponseWriter, r *http.Request) {
		if strings.HasSuffix(r.URL.Path, "/status") {
			w.Header().Set("Content-Type", "application/json")
			w.WriteHeader(http.StatusNotFound)
			_, _ = w.Write([]byte(`{"error":"task not found"}`))
			return
		}
		http.NotFound(w, r)
	}
	srv := httptest.NewServer(http.HandlerFunc(taskGone))
	t.Cleanup(srv.Close)

	daemon := &Daemon{client: NewClient(srv.URL), logger: slog.Default()}
	ctx, stop := context.WithCancel(context.Background())
	t.Cleanup(stop)

	signalled := daemon.watchTaskCancellation(ctx, "task-deleted", 10*time.Millisecond, slog.Default())

	deadline := time.NewTimer(2 * time.Second)
	defer deadline.Stop()
	select {
	case <-signalled:
		// Expected: the watcher saw the 404 and signalled cancellation.
	case <-deadline.C:
		t.Fatal("watchTaskCancellation did not signal cancellation when task was deleted (404)")
	}
}
// TestWatchTaskCancellation_StatusCancelled guards the pre-existing path
// alongside the 404 one: when the status endpoint reports "cancelled", the
// watcher must signal cancellation.
func TestWatchTaskCancellation_StatusCancelled(t *testing.T) {
	t.Parallel()

	// Status polls report a cancelled task; other paths get the stock 404.
	reportCancelled := func(w http.ResponseWriter, r *http.Request) {
		if strings.HasSuffix(r.URL.Path, "/status") {
			w.Header().Set("Content-Type", "application/json")
			_, _ = w.Write([]byte(`{"status":"cancelled"}`))
			return
		}
		http.NotFound(w, r)
	}
	srv := httptest.NewServer(http.HandlerFunc(reportCancelled))
	t.Cleanup(srv.Close)

	daemon := &Daemon{client: NewClient(srv.URL), logger: slog.Default()}
	ctx, stop := context.WithCancel(context.Background())
	t.Cleanup(stop)

	signalled := daemon.watchTaskCancellation(ctx, "task-cancelled", 10*time.Millisecond, slog.Default())

	deadline := time.NewTimer(2 * time.Second)
	defer deadline.Stop()
	select {
	case <-signalled:
	case <-deadline.C:
		t.Fatal("watchTaskCancellation did not signal cancellation when status=cancelled")
	}
}
// TestWatchTaskCancellation_RunningTaskNotInterrupted ensures the watcher
// stays silent while the server keeps reporting status "running", and that it
// keeps polling during that time (at least 5 requests within 150ms at a 10ms
// interval).
func TestWatchTaskCancellation_RunningTaskNotInterrupted(t *testing.T) {
	t.Parallel()

	// Every request is counted and answered with a running status.
	var polls atomic.Int32
	alwaysRunning := func(w http.ResponseWriter, r *http.Request) {
		polls.Add(1)
		w.Header().Set("Content-Type", "application/json")
		_, _ = w.Write([]byte(`{"status":"running"}`))
	}
	srv := httptest.NewServer(http.HandlerFunc(alwaysRunning))
	t.Cleanup(srv.Close)

	daemon := &Daemon{client: NewClient(srv.URL), logger: slog.Default()}
	ctx, stop := context.WithCancel(context.Background())
	t.Cleanup(stop)

	signalled := daemon.watchTaskCancellation(ctx, "task-running", 10*time.Millisecond, slog.Default())

	observation := time.NewTimer(150 * time.Millisecond)
	defer observation.Stop()
	select {
	case <-observation.C:
		// Expected: no cancellation signal during the observation window.
	case <-signalled:
		t.Fatal("watchTaskCancellation should not signal cancellation while task is running")
	}

	if polls.Load() < 5 {
		t.Fatalf("expected the watcher to poll at least 5 times in 150ms, got %d", polls.Load())
	}
}
// TestMergeUsage checks that mergeUsage sums token counters per model key,
// carries over models present on only one side, and tolerates a nil map on
// either side.
func TestMergeUsage(t *testing.T) {
	t.Parallel()

	first := map[string]agent.TokenUsage{
		"model-a": {InputTokens: 10, OutputTokens: 5},
	}
	second := map[string]agent.TokenUsage{
		"model-a": {InputTokens: 20, OutputTokens: 10, CacheReadTokens: 3},
		"model-b": {InputTokens: 100},
	}

	combined := mergeUsage(first, second)

	usageA := combined["model-a"]
	if usageA.InputTokens != 30 || usageA.OutputTokens != 15 || usageA.CacheReadTokens != 3 {
		t.Fatalf("model-a: expected {30,15,3,0}, got %+v", usageA)
	}
	usageB := combined["model-b"]
	if usageB.InputTokens != 100 {
		t.Fatalf("model-b: expected InputTokens=100, got %+v", usageB)
	}

	// With a nil side, the result must have as many entries as the other side.
	if onlySecond := mergeUsage(nil, second); len(onlySecond) != 2 {
		t.Fatal("mergeUsage(nil, b) should return b")
	}
	if onlyFirst := mergeUsage(first, nil); len(onlyFirst) != 1 {
		t.Fatal("mergeUsage(a, nil) should return a")
	}
}
// fakeBackend is a test double for agent.Backend that replays preconfigured
// outcomes. The n-th Execute call (0-based) returns errors[n] when that entry
// exists and is non-nil, and otherwise a session that delivers results[n].
//
// idx is atomic, but calls is appended without synchronization, so a single
// fakeBackend must not be driven by concurrent Execute calls.
type fakeBackend struct {
	calls   []agent.ExecOptions // options seen by each Execute call, in call order
	results []agent.Result      // scripted results, indexed by call number
	errors  []error             // optional scripted errors, indexed by call number
	idx     atomic.Int32        // number of Execute calls made so far
}

// Execute records opts and replays the scripted outcome for this call.
//
// It returns the scripted error for the call if one is set. Otherwise it
// returns a session whose Messages channel is already closed and whose Result
// channel holds the scripted result. If the script has neither an error nor a
// result for this call, Execute returns an error instead of panicking with an
// index-out-of-range, so a misconfigured test fails with a readable message.
func (b *fakeBackend) Execute(_ context.Context, _ string, opts agent.ExecOptions) (*agent.Session, error) {
	i := int(b.idx.Add(1)) - 1
	b.calls = append(b.calls, opts)
	if i < len(b.errors) && b.errors[i] != nil {
		return nil, b.errors[i]
	}
	if i >= len(b.results) {
		return nil, errors.New("fakeBackend: Execute called more times than results were configured")
	}

	msgCh := make(chan agent.Message)
	close(msgCh) // no streamed messages: readers see end-of-stream immediately

	// Buffer of one lets the result be queued before anyone is receiving.
	resCh := make(chan agent.Result, 1)
	resCh <- b.results[i]
	return &agent.Session{Messages: msgCh, Result: resCh}, nil
}
// newTestDaemon returns a Daemon whose client points at a throwaway HTTP
// server that answers every request with 200 OK. The server is closed when
// the test finishes.
func newTestDaemon(t *testing.T) *Daemon {
	t.Helper()

	alwaysOK := func(w http.ResponseWriter, _ *http.Request) {
		w.WriteHeader(http.StatusOK)
	}
	srv := httptest.NewServer(http.HandlerFunc(alwaysOK))
	t.Cleanup(srv.Close)

	daemon := new(Daemon)
	daemon.client = NewClient(srv.URL)
	daemon.logger = slog.Default()
	return daemon
}
// newRepoReadyTestDaemon returns a Daemon backed by an HTTP test server that
// runs handler, with a repo cache rooted in a per-test temporary directory
// and empty workspace/runtime maps.
func newRepoReadyTestDaemon(t *testing.T, handler http.HandlerFunc) *Daemon {
	t.Helper()

	srv := httptest.NewServer(handler)
	t.Cleanup(srv.Close)

	daemon := new(Daemon)
	daemon.client = NewClient(srv.URL)
	daemon.repoCache = repocache.New(t.TempDir(), slog.Default())
	daemon.logger = slog.Default()
	daemon.workspaces = make(map[string]*workspaceState)
	daemon.runtimeIndex = make(map[string]Runtime)

	// Background syncs (started by registerTaskRepos) must be drained before
	// the t.TempDir cache root is removed. Otherwise an in-flight clone/fetch
	// races the deletion and the test fails with a misleading "directory not
	// empty" cleanup error. Cleanups run last-registered-first, so registering
	// this after t.TempDir makes it run before the directory removal.
	t.Cleanup(daemon.waitBackgroundSyncs)
	return daemon
}
// TestExecuteAndDrain_ResumeFailureFallback walks through the
// resume-then-retry flow: a first attempt with a stale ResumeSessionID fails
// without establishing a session, a second attempt without it completes, and
// the token usage of both attempts is merged.
func TestExecuteAndDrain_ResumeFailureFallback(t *testing.T) {
	t.Parallel()

	var (
		d       = newTestDaemon(t)
		ctx     = context.Background()
		taskLog = slog.Default()
	)

	failedResume := agent.Result{
		Status: "failed",
		Error:  "session not found",
		Usage:  map[string]agent.TokenUsage{"m1": {InputTokens: 5}},
	}
	freshRun := agent.Result{
		Status:    "completed",
		Output:    "done",
		SessionID: "new-sess",
		Usage:     map[string]agent.TokenUsage{"m1": {InputTokens: 10, OutputTokens: 20}},
	}
	backend := &fakeBackend{results: []agent.Result{failedResume, freshRun}}

	// Attempt 1: the resume fails and the result carries no SessionID.
	opts := agent.ExecOptions{ResumeSessionID: "stale-id"}
	result, _, err := d.executeAndDrain(ctx, backend, "prompt", opts, taskLog, "task-1")
	if err != nil {
		t.Fatalf("first call error: %v", err)
	}
	if result.Status != "failed" || result.SessionID != "" {
		t.Fatalf("expected failed result with empty SessionID, got %+v", result)
	}

	// Attempt 2: mirror runTask's retry logic by dropping the resume ID.
	if result.Status == "failed" && result.SessionID == "" {
		usageSoFar := result.Usage
		opts.ResumeSessionID = ""
		second, _, secondErr := d.executeAndDrain(ctx, backend, "prompt", opts, taskLog, "task-1")
		if secondErr != nil {
			t.Fatalf("retry error: %v", secondErr)
		}
		result = second
		result.Usage = mergeUsage(usageSoFar, result.Usage)
	}

	if result.Status != "completed" || result.Output != "done" {
		t.Fatalf("expected completed result, got %+v", result)
	}
	if result.SessionID != "new-sess" {
		t.Fatalf("expected new-sess, got %s", result.SessionID)
	}

	// Usage from both attempts must be summed.
	merged := result.Usage["m1"]
	if merged.InputTokens != 15 || merged.OutputTokens != 20 {
		t.Fatalf("expected merged usage {15,20}, got %+v", merged)
	}

	// The retry must have been issued without a ResumeSessionID.
	if backend.calls[1].ResumeSessionID != "" {
		t.Fatal("retry should not have ResumeSessionID")
	}
}
// TestExecuteAndDrain_NoRetryWhenSessionEstablished checks that a failed
// result which still carries a SessionID does not satisfy the retry condition
// and that the backend was invoked exactly once.
func TestExecuteAndDrain_NoRetryWhenSessionEstablished(t *testing.T) {
	t.Parallel()

	d := newTestDaemon(t)
	backend := &fakeBackend{
		results: []agent.Result{
			{Status: "failed", Error: "model error", SessionID: "valid-sess"},
		},
	}

	opts := agent.ExecOptions{ResumeSessionID: "some-id"}
	result, _, err := d.executeAndDrain(context.Background(), backend, "p", opts, slog.Default(), "t")
	if err != nil {
		t.Fatal(err)
	}

	// A non-empty SessionID means the session was established, so the retry
	// condition must be false.
	if result.Status == "failed" && result.SessionID == "" {
		t.Fatal("should not retry when SessionID is present")
	}
	if callCount := int(backend.idx.Load()); callCount != 1 {
		t.Fatalf("expected 1 call, got %d", backend.idx.Load())
	}
}
// TestExecuteAndDrain_CodexInactivityReportsToolResultTranscript drives
// executeAndDrain against a fake `codex` executable that completes one
// command-execution item and then goes silent. It expects the run to end with
// status "timeout", exactly one tool use to be counted, and both the tool_use
// (seq 1) and tool_result (seq 2) messages to reach the server's task-messages
// endpoint.
func TestExecuteAndDrain_CodexInactivityReportsToolResultTranscript(t *testing.T) {
if runtime.GOOS == "windows" {
t.Skip("shell-script fixture is POSIX-only")
}
// Fake codex: a shell script that reads request lines from stdin and echoes
// canned JSON-RPC replies (ids 1-3), then turn/started, item/started and
// item/completed notifications for one commandExecution item, and finally
// sleeps for 5 seconds without producing further output.
fakePath := filepath.Join(t.TempDir(), "codex")
script := "#!/bin/sh\n" +
`read line` + "\n" +
`echo '{"jsonrpc":"2.0","id":1,"result":{}}'` + "\n" +
`read line` + "\n" +
`read line` + "\n" +
`echo '{"jsonrpc":"2.0","id":2,"result":{"thread":{"id":"thr-drain"}}}'` + "\n" +
`read line` + "\n" +
`echo '{"jsonrpc":"2.0","id":3,"result":{}}'` + "\n" +
`echo '{"jsonrpc":"2.0","method":"turn/started","params":{"threadId":"thr-drain","turn":{"id":"turn-drain"}}}'` + "\n" +
`echo '{"jsonrpc":"2.0","method":"item/started","params":{"threadId":"thr-drain","item":{"type":"commandExecution","id":"cmd-1","command":"git status"}}}'` + "\n" +
`echo '{"jsonrpc":"2.0","method":"item/completed","params":{"threadId":"thr-drain","item":{"type":"commandExecution","id":"cmd-1","aggregatedOutput":"clean"}}}'` + "\n" +
`sleep 5` + "\n"
if err := os.WriteFile(fakePath, []byte(script), 0o755); err != nil {
t.Fatalf("write fake codex: %v", err)
}
// Explicit chmod after the write — presumably so the executable bits are
// set even when the process umask masks them out of WriteFile's mode.
if err := os.Chmod(fakePath, 0o755); err != nil {
t.Fatalf("chmod fake codex: %v", err)
}
// reported accumulates every message posted to the task-messages endpoint;
// mu guards it because the HTTP handler runs on server goroutines.
var mu sync.Mutex
var reported []TaskMessageData
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
// Only the messages endpoint for task "task-stale" is served.
if r.URL.Path != "/api/daemon/tasks/task-stale/messages" {
http.NotFound(w, r)
return
}
var body struct {
Messages []TaskMessageData `json:"messages"`
}
if err := json.NewDecoder(r.Body).Decode(&body); err != nil {
t.Errorf("decode task messages: %v", err)
http.Error(w, "bad request", http.StatusBadRequest)
return
}
mu.Lock()
reported = append(reported, body.Messages...)
mu.Unlock()
w.WriteHeader(http.StatusOK)
}))
t.Cleanup(srv.Close)
backend, err := agent.New("codex", agent.Config{ExecutablePath: fakePath, Logger: slog.Default()})
if err != nil {
t.Fatalf("new codex backend: %v", err)
}
d := &Daemon{client: NewClient(srv.URL), logger: slog.Default()}
// The 100ms semantic-inactivity timeout is far shorter than both the 5s
// overall timeout and the script's 5s sleep.
result, tools, err := d.executeAndDrain(context.Background(), backend, "prompt", agent.ExecOptions{
Timeout: 5 * time.Second,
SemanticInactivityTimeout: 100 * time.Millisecond,
}, slog.Default(), "task-stale")
if err != nil {
t.Fatalf("executeAndDrain: %v", err)
}
if result.Status != "timeout" {
t.Fatalf("expected timeout, got status=%q error=%q", result.Status, result.Error)
}
if tools != 1 {
t.Fatalf("expected one tool use, got %d", tools)
}
// Poll for up to 2 seconds until both transcript messages have been
// reported, rather than assuming they arrived before executeAndDrain
// returned.
deadline := time.Now().Add(2 * time.Second)
for {
mu.Lock()
var gotToolUse, gotToolResult bool
for _, msg := range reported {
if msg.Seq == 1 && msg.Type == "tool_use" && msg.Tool == "exec_command" {
gotToolUse = true
}
if msg.Seq == 2 && msg.Type == "tool_result" && msg.Tool == "exec_command" && msg.Output == "clean" {
gotToolResult = true
}
}
mu.Unlock()
if gotToolUse && gotToolResult {
return
}
if time.Now().After(deadline) {
// Hold the lock while reported is formatted into the failure message;
// the deferred unlock runs as Fatalf terminates the test goroutine.
mu.Lock()
defer mu.Unlock()
t.Fatalf("expected tool_use seq=1 and tool_result seq=2 in transcript, got %+v", reported)
}
time.Sleep(10 * time.Millisecond)
}
}
// blockingBackend hands out sessions that never produce a result: the
// Messages channel is closed up front and nothing is ever sent on Result.
// executeAndDrain can therefore only leave via the drainCtx.Done() path.
type blockingBackend struct{}

// Execute ignores its arguments and returns such a never-completing session.
func (blockingBackend) Execute(_ context.Context, _ string, _ agent.ExecOptions) (*agent.Session, error) {
	messages := make(chan agent.Message)
	neverResolved := make(chan agent.Result)
	close(messages)

	session := &agent.Session{Messages: messages, Result: neverResolved}
	return session, nil
}
// TestExecuteAndDrain_ContextCancelled_ReportsCancelled runs executeAndDrain
// with a parent context that is cancelled before the call and expects a nil
// error together with a result whose Status is "cancelled".
func TestExecuteAndDrain_ContextCancelled_ReportsCancelled(t *testing.T) {
	t.Parallel()

	d := newTestDaemon(t)

	// Cancel up front so the parent context is already done when the run starts.
	ctx, cancel := context.WithCancel(context.Background())
	cancel()

	res, _, err := d.executeAndDrain(ctx, blockingBackend{}, "p", agent.ExecOptions{}, slog.Default(), "t")
	switch {
	case err != nil:
		t.Fatalf("unexpected error: %v", err)
	case res.Status != "cancelled":
		t.Fatalf("expected status=cancelled when parent ctx is cancelled, got %q (err=%q)", res.Status, res.Error)
	}
}
// idleWatchdogBackend reproduces the MUL-2225 hang: optionally emit a single
// message to register activity, then stay silent forever. With a short
// AgentIdleWatchdog the watchdog is expected to fire and short-circuit
// executeAndDrain. With no wall-clock cap (opts.Timeout = 0) the drain loop
// imposes no deadline of its own, so the idle watchdog is the only thing that
// ends this otherwise-forever-silent run.
type idleWatchdogBackend struct {
	// emitOne: true queues one message before the backend goes silent;
	// false means nothing is ever emitted.
	emitOne bool
}

// Execute returns a Session whose Messages channel is deliberately left open
// and whose Result channel is never written, modelling a backend whose
// subprocess is hung and will never complete on its own.
func (b idleWatchdogBackend) Execute(_ context.Context, _ string, _ agent.ExecOptions) (*agent.Session, error) {
	// Capacity 1 lets the optional message be queued without a reader.
	messages := make(chan agent.Message, 1)
	if b.emitOne {
		messages <- agent.Message{Type: agent.MessageText, Content: "hello"}
	}

	return &agent.Session{
		Messages: messages,
		Result:   make(chan agent.Result),
	}, nil
}
// TestExecuteAndDrain_IdleWatchdog_FiresOnInactivity drives a backend that
// emits one message and then hangs, and expects the 50 ms idle watchdog to
// end the run with status "idle_watchdog" within a bounded time.
func TestExecuteAndDrain_IdleWatchdog_FiresOnInactivity(t *testing.T) {
	t.Parallel()

	d := newTestDaemon(t)
	d.cfg.AgentIdleWatchdog = 50 * time.Millisecond

	ctx, cancel := context.WithCancel(context.Background())
	t.Cleanup(cancel)

	began := time.Now()
	res, _, err := d.executeAndDrain(ctx, idleWatchdogBackend{emitOne: true}, "p", agent.ExecOptions{}, slog.Default(), "t-idle")
	if err != nil {
		t.Fatalf("unexpected error: %v", err)
	}
	if got := res.Status; got != "idle_watchdog" {
		t.Fatalf("expected status=idle_watchdog, got %q (err=%q)", got, res.Error)
	}
	if msg := res.Error; !strings.Contains(msg, "idle watchdog") {
		t.Fatalf("expected error to mention idle watchdog, got %q", msg)
	}

	// The watchdog is expected to fire within a few ticks (interval = window/2
	// with no floor for sub-minute windows). Allowing 5x the window is generous
	// and keeps the test from racing on slow CI.
	if took := time.Since(began); took > 5*d.cfg.AgentIdleWatchdog {
		t.Fatalf("watchdog took too long to fire: %s (window=%s)", took, d.cfg.AgentIdleWatchdog)
	}
}
// TestExecuteAndDrain_IdleWatchdog_FiresWhenNoMessageEverArrives covers a
// backend that hangs before sending any message at all and expects the run to
// still end with status "idle_watchdog".
func TestExecuteAndDrain_IdleWatchdog_FiresWhenNoMessageEverArrives(t *testing.T) {
	t.Parallel()

	d := newTestDaemon(t)
	d.cfg.AgentIdleWatchdog = 50 * time.Millisecond

	ctx, cancel := context.WithCancel(context.Background())
	t.Cleanup(cancel)

	// emitOne=false: the backend never emits. lastActivityAt is initialised at
	// executeAndDrain entry, so the same window applies even with zero traffic.
	silent := idleWatchdogBackend{emitOne: false}
	res, _, err := d.executeAndDrain(ctx, silent, "p", agent.ExecOptions{}, slog.Default(), "t-idle-zero")
	switch {
	case err != nil:
		t.Fatalf("unexpected error: %v", err)
	case res.Status != "idle_watchdog":
		t.Fatalf("expected status=idle_watchdog when backend never emits, got %q (err=%q)", res.Status, res.Error)
	}
}
// TestExecuteAndDrain_IdleWatchdog_DisabledWhenZero verifies that a zero
// AgentIdleWatchdog disables the watchdog: a backend that emits one message
// and then hangs must end as "cancelled" (because the parent context is
// cancelled), never as "idle_watchdog".
func TestExecuteAndDrain_IdleWatchdog_DisabledWhenZero(t *testing.T) {
	t.Parallel()
	d := newTestDaemon(t)
	// Default zero value — watchdog disabled. Without a parent cancel the
	// idleWatchdogBackend would otherwise hang the test, so we cancel after a
	// short delay to confirm the run does NOT terminate as idle_watchdog.
	d.cfg.AgentIdleWatchdog = 0

	ctx, cancel := context.WithCancel(context.Background())
	// Release the context on every exit path, matching the sibling tests.
	t.Cleanup(cancel)
	timer := time.AfterFunc(80*time.Millisecond, cancel)
	// Stop the timer if the run returns before it fires so it does not
	// outlive the test.
	t.Cleanup(func() { timer.Stop() })

	result, _, err := d.executeAndDrain(ctx, idleWatchdogBackend{emitOne: true}, "p", agent.ExecOptions{}, slog.Default(), "t-idle-off")
	if err != nil {
		t.Fatalf("unexpected error: %v", err)
	}
	if result.Status == "idle_watchdog" {
		t.Fatalf("watchdog should not fire when AgentIdleWatchdog=0, got status=%q", result.Status)
	}
	if result.Status != "cancelled" {
		t.Fatalf("expected status=cancelled (parent ctx fired), got %q", result.Status)
	}
}
// TestExecuteAndDrain_IdleWatchdog_HappyPathDoesNotFire checks that an enabled
// idle watchdog leaves a normally completing run untouched: the status and the
// output must come back exactly as the backend produced them.
func TestExecuteAndDrain_IdleWatchdog_HappyPathDoesNotFire(t *testing.T) {
	t.Parallel()

	d := newTestDaemon(t)
	d.cfg.AgentIdleWatchdog = 200 * time.Millisecond

	// fakeBackend completes immediately with a normal result, well inside the
	// idle window, so the watchdog must not alter the disposition.
	backend := &fakeBackend{
		results: []agent.Result{{Status: "completed", Output: "done"}},
	}

	res, _, err := d.executeAndDrain(context.Background(), backend, "p", agent.ExecOptions{}, slog.Default(), "t-idle-happy")
	switch {
	case err != nil:
		t.Fatalf("unexpected error: %v", err)
	case res.Status != "completed":
		t.Fatalf("expected status=completed on happy path, got %q (err=%q)", res.Status, res.Error)
	case res.Output != "done":
		t.Fatalf("expected output preserved, got %q", res.Output)
	}
}
// longToolCallBackend models a legitimate long-running tool call (e.g.
// `npm install`, `docker build`, a full test suite). It emits a tool_use,
// stays silent for longer than the idle window while the tool "runs", then
// emits the tool_result and completes. This is the false-positive case the
// watchdog must NOT misfire on: an in-flight tool call is forward progress,
// not a hang.
type longToolCallBackend struct {
	// toolSilence is how long to stay silent between tool_use and tool_result.
	toolSilence time.Duration
}

// Execute queues the tool_use message synchronously and starts a goroutine
// that either finishes the tool call after toolSilence or, if ctx is cancelled
// first, reports an "aborted" result. Both channels are closed on either path.
func (b longToolCallBackend) Execute(ctx context.Context, _ string, _ agent.ExecOptions) (*agent.Session, error) {
	// Buffered so the goroutine below never blocks on a slow reader.
	messages := make(chan agent.Message, 4)
	results := make(chan agent.Result, 1)

	messages <- agent.Message{
		Type:   agent.MessageToolUse,
		Tool:   "Bash",
		CallID: "call-1",
		Input:  map[string]any{"cmd": "npm install"},
	}

	go func() {
		select {
		case <-ctx.Done():
			// Watchdog cancelled us — propagate so the caller sees aborted.
			results <- agent.Result{Status: "aborted", Error: ctx.Err().Error()}
			close(messages)
			close(results)

		case <-time.After(b.toolSilence):
			// The tool finished: deliver its result and a closing text message.
			messages <- agent.Message{
				Type:   agent.MessageToolResult,
				Tool:   "Bash",
				CallID: "call-1",
				Output: "installed 142 packages",
			}
			messages <- agent.Message{Type: agent.MessageText, Content: "done"}
			close(messages)

			results <- agent.Result{Status: "completed", Output: "done"}
			close(results)
		}
	}()

	return &agent.Session{Messages: messages, Result: results}, nil
}
// TestExecuteAndDrain_IdleWatchdog_DoesNotFireDuringInFlightToolCall runs a
// backend whose tool call stays silent for several idle windows and expects
// the run to complete normally rather than being cut short as idle_watchdog.
func TestExecuteAndDrain_IdleWatchdog_DoesNotFireDuringInFlightToolCall(t *testing.T) {
	t.Parallel()

	d := newTestDaemon(t)
	// 50 ms window; the tool stays silent for ~4x the window. Without the
	// in-flight-tool gate the watchdog would fire and the run would come back
	// as idle_watchdog. With the gate it must complete normally.
	d.cfg.AgentIdleWatchdog = 50 * time.Millisecond

	backend := longToolCallBackend{toolSilence: 200 * time.Millisecond}
	res, _, err := d.executeAndDrain(context.Background(), backend, "p", agent.ExecOptions{}, slog.Default(), "t-long-tool")
	if err != nil {
		t.Fatalf("unexpected error: %v", err)
	}

	switch res.Status {
	case "completed":
		// Expected outcome.
	case "idle_watchdog":
		t.Fatalf("watchdog must not fire while a tool_use is in flight, got status=%q (err=%q)", res.Status, res.Error)
	default:
		t.Fatalf("expected status=completed, got %q (err=%q)", res.Status, res.Error)
	}
}
// stuckInFlightToolBackend models a hung tool: it emits a tool_use and then
// goes silent forever. The matching tool_result never arrives, so
// inFlightTools stays at 1 (e.g. a child process that never returns). With no
// wall-clock cap (the MUL-3064 default), AgentToolWatchdog is the only thing
// that ends such a run.
type stuckInFlightToolBackend struct{}

// Execute queues a single tool_use message and returns a Session whose
// Messages channel stays open and whose Result channel is never written.
func (stuckInFlightToolBackend) Execute(_ context.Context, _ string, _ agent.ExecOptions) (*agent.Session, error) {
	messages := make(chan agent.Message, 2)
	messages <- agent.Message{Type: agent.MessageToolUse, Tool: "Bash", CallID: "c1"}

	// Deliberately no tool_result, no close of messages, no result.
	return &agent.Session{
		Messages: messages,
		Result:   make(chan agent.Result),
	}, nil
}
// TestExecuteAndDrain_IdleWatchdog_FiresOnStuckInFlightTool expects a run
// whose tool call never returns to be ended with status "idle_watchdog" in
// under two seconds.
func TestExecuteAndDrain_IdleWatchdog_FiresOnStuckInFlightTool(t *testing.T) {
	t.Parallel()

	d := newTestDaemon(t)
	// The normal idle window would be skipped while a tool is in flight; the
	// AgentToolWatchdog budget is what must fire here.
	d.cfg.AgentIdleWatchdog = 50 * time.Millisecond
	d.cfg.AgentToolWatchdog = 50 * time.Millisecond

	ctx, cancel := context.WithCancel(context.Background())
	t.Cleanup(cancel)

	began := time.Now()
	res, _, err := d.executeAndDrain(ctx, stuckInFlightToolBackend{}, "p", agent.ExecOptions{}, slog.Default(), "t-stuck-tool")
	if err != nil {
		t.Fatalf("unexpected error: %v", err)
	}
	if got := res.Status; got != "idle_watchdog" {
		t.Fatalf("expected status=idle_watchdog for a hung in-flight tool, got %q (err=%q)", got, res.Error)
	}
	if took := time.Since(began); took > 2*time.Second {
		t.Fatalf("tool watchdog took too long to fire: %s (window=%s)", took, d.cfg.AgentToolWatchdog)
	}
}
// tailIdleAfterToolBackend exercises the boundary case where a tool call
// completes and THEN the backend goes silent without ever finishing. Once the
// tool_result lands, the in-flight count returns to zero and lastActivityAt is
// fresh; the watchdog should fire exactly one window later, not earlier.
type tailIdleAfterToolBackend struct{}

// Execute queues a matched tool_use / tool_result pair and returns a Session
// whose Messages channel stays open and whose Result channel is never written.
func (tailIdleAfterToolBackend) Execute(_ context.Context, _ string, _ agent.ExecOptions) (*agent.Session, error) {
	messages := make(chan agent.Message, 4)
	for _, m := range []agent.Message{
		{Type: agent.MessageToolUse, Tool: "Bash", CallID: "c1"},
		{Type: agent.MessageToolResult, Tool: "Bash", CallID: "c1", Output: "ok"},
	} {
		messages <- m
	}

	// Deliberately leave messages open and never produce a result.
	return &agent.Session{
		Messages: messages,
		Result:   make(chan agent.Result),
	}, nil
}
// TestExecuteAndDrain_IdleWatchdog_FiresAfterToolResultIfBackendStaysSilent
// expects a run that goes quiet after a completed tool call to be ended with
// status "idle_watchdog".
func TestExecuteAndDrain_IdleWatchdog_FiresAfterToolResultIfBackendStaysSilent(t *testing.T) {
	t.Parallel()

	d := newTestDaemon(t)
	d.cfg.AgentIdleWatchdog = 50 * time.Millisecond

	ctx, cancel := context.WithCancel(context.Background())
	t.Cleanup(cancel)

	res, _, err := d.executeAndDrain(ctx, tailIdleAfterToolBackend{}, "p", agent.ExecOptions{}, slog.Default(), "t-tail-idle")
	switch {
	case err != nil:
		t.Fatalf("unexpected error: %v", err)
	case res.Status != "idle_watchdog":
		t.Fatalf("expected status=idle_watchdog after tool_result with no further activity, got %q (err=%q)", res.Status, res.Error)
	}
}
// TestEnsureRepoReadyCachedRepoStillRefreshesSettings pins that
// ensureRepoReady refreshes `workspaceState.settings` on every checkout, even
// when the repo cache already holds the URL. The /repo/checkout handler reads
// `workspaceCoAuthoredByEnabled` right afterwards, and the 30s workspace sync
// tick is too slow to make a freshly-flipped GitHub toggle feel live.
// PR #2847 review by Emacs caught this fast-path regression; the test asserts
// that the cached-repo path still issues exactly one refresh.
func TestEnsureRepoReadyCachedRepoStillRefreshesSettings(t *testing.T) {
	t.Parallel()

	const reposPath = "/api/daemon/workspaces/ws-1/repos"
	repoURL := createDaemonTestRepo(t)

	var refreshes atomic.Int32
	d := newRepoReadyTestDaemon(t, func(w http.ResponseWriter, r *http.Request) {
		if r.Method == http.MethodGet && r.URL.Path == reposPath {
			refreshes.Add(1)
			json.NewEncoder(w).Encode(WorkspaceReposResponse{
				WorkspaceID:  "ws-1",
				Repos:        []RepoData{{URL: repoURL}},
				ReposVersion: "v2",
				Settings:     json.RawMessage(`{"github_enabled":false,"co_authored_by_enabled":true}`),
			})
			return
		}
		http.NotFound(w, r)
	})

	// Warm the repo cache so ensureRepoReady takes the cache-hit path.
	if err := d.repoCache.Sync("ws-1", []repocache.RepoInfo{{URL: repoURL}}); err != nil {
		t.Fatalf("seed repo cache: %v", err)
	}

	// The workspace starts with the master switch ON. The server above returns
	// the user's just-flipped OFF state — ensureRepoReady must pick that up
	// before the handler reads workspaceCoAuthoredByEnabled.
	initialSettings := json.RawMessage(`{"github_enabled":true,"co_authored_by_enabled":true}`)
	d.workspaces["ws-1"] = newWorkspaceState("ws-1", nil, "v1", []RepoData{{URL: repoURL}}, initialSettings)
	if !d.workspaceCoAuthoredByEnabled("ws-1") {
		t.Fatalf("precondition: expected co-author hook enabled before checkout")
	}

	if err := d.ensureRepoReady(context.Background(), "ws-1", repoURL); err != nil {
		t.Fatalf("ensureRepoReady: %v", err)
	}

	if n := refreshes.Load(); n != 1 {
		t.Fatalf("expected exactly 1 refresh call on cached repo, got %d", n)
	}
	if d.workspaceCoAuthoredByEnabled("ws-1") {
		t.Fatalf("expected co-author hook disabled after server-side toggle; daemon used stale workspaceState.settings via cache fast path")
	}
}
// TestEnsureRepoReadyTrimsURL checks that a repo URL padded with whitespace
// still resolves to the cached repo, and that the call issues exactly one
// workspace-repos refresh.
func TestEnsureRepoReadyTrimsURL(t *testing.T) {
	t.Parallel()

	const reposPath = "/api/daemon/workspaces/ws-1/repos"
	repoURL := createDaemonTestRepo(t)

	var refreshes atomic.Int32
	d := newRepoReadyTestDaemon(t, func(w http.ResponseWriter, r *http.Request) {
		if r.Method == http.MethodGet && r.URL.Path == reposPath {
			refreshes.Add(1)
			json.NewEncoder(w).Encode(WorkspaceReposResponse{
				WorkspaceID:  "ws-1",
				Repos:        []RepoData{{URL: repoURL}},
				ReposVersion: "v2",
			})
			return
		}
		http.NotFound(w, r)
	})

	// Warm the cache and register the workspace with the same repo.
	if err := d.repoCache.Sync("ws-1", []repocache.RepoInfo{{URL: repoURL}}); err != nil {
		t.Fatalf("seed repo cache: %v", err)
	}
	d.workspaces["ws-1"] = newWorkspaceState("ws-1", nil, "v1", []RepoData{{URL: repoURL}}, nil)

	// A URL with surrounding whitespace should still resolve to the cached repo.
	padded := "  " + repoURL + "  "
	if err := d.ensureRepoReady(context.Background(), "ws-1", padded); err != nil {
		t.Fatalf("ensureRepoReady with padded URL: %v", err)
	}

	// Even on a cache hit, settings are refreshed once so toggle flips feel live.
	if n := refreshes.Load(); n != 1 {
		t.Fatalf("expected 1 refresh call for trimmed URL, got %d", n)
	}
}
// TestEnsureRepoReadyRefreshesOnMiss starts from a workspace with no repos and
// an empty cache, and expects ensureRepoReady to refresh from the server
// exactly once and leave the repo cached.
func TestEnsureRepoReadyRefreshesOnMiss(t *testing.T) {
	t.Parallel()

	const reposPath = "/api/daemon/workspaces/ws-1/repos"
	repoURL := createDaemonTestRepo(t)

	var refreshes atomic.Int32
	d := newRepoReadyTestDaemon(t, func(w http.ResponseWriter, r *http.Request) {
		if r.Method == http.MethodGet && r.URL.Path == reposPath {
			refreshes.Add(1)
			json.NewEncoder(w).Encode(WorkspaceReposResponse{
				WorkspaceID:  "ws-1",
				Repos:        []RepoData{{URL: repoURL}},
				ReposVersion: "v2",
			})
			return
		}
		http.NotFound(w, r)
	})
	d.workspaces["ws-1"] = newWorkspaceState("ws-1", nil, "", nil, nil)

	if err := d.ensureRepoReady(context.Background(), "ws-1", repoURL); err != nil {
		t.Fatalf("ensureRepoReady: %v", err)
	}

	if n := refreshes.Load(); n != 1 {
		t.Fatalf("expected 1 refresh call, got %d", n)
	}
	if cached := d.repoCache.Lookup("ws-1", repoURL); cached == "" {
		t.Fatal("expected repo to be cached after refresh")
	}
}
// TestRegisterTaskReposAllowsProjectOnlyURL: a project github_repo URL that
// the workspace itself does not bind must still be allowed for
// `multica repo checkout` after registerTaskRepos runs. Without this, the
// project-repos-override-workspace-repos behavior would surface repos in the
// meta-skill that the agent then cannot actually clone.
func TestRegisterTaskReposAllowsProjectOnlyURL(t *testing.T) {
	t.Parallel()

	repoURL := createDaemonTestRepo(t)

	var refreshes atomic.Int32
	d := newRepoReadyTestDaemon(t, func(w http.ResponseWriter, r *http.Request) {
		refreshes.Add(1)
		// If the workspace endpoint is hit it returns an empty list — the
		// project-only URL must NOT depend on this for allowlist membership.
		json.NewEncoder(w).Encode(WorkspaceReposResponse{
			WorkspaceID:  "ws-1",
			Repos:        []RepoData{},
			ReposVersion: "v1",
		})
	})

	// The workspace has zero workspace-bound repos; the project resource
	// supplies the only repo URL the agent should be able to check out.
	d.workspaces["ws-1"] = newWorkspaceState("ws-1", nil, "", nil, nil)
	d.registerTaskRepos("ws-1", []RepoData{{URL: repoURL}})

	// The async clone goroutine in registerTaskRepos may not have finished;
	// poll briefly until the cache is populated so the test isn't racy.
	giveUpAt := time.Now().Add(5 * time.Second)
	for time.Now().Before(giveUpAt) && d.repoCache.Lookup("ws-1", repoURL) == "" {
		time.Sleep(20 * time.Millisecond)
	}
	if cached := d.repoCache.Lookup("ws-1", repoURL); cached == "" {
		t.Fatalf("expected repo to be cached after registerTaskRepos, but Lookup returned empty")
	}

	if allowed := d.workspaceRepoAllowed("ws-1", repoURL); !allowed {
		t.Fatal("expected project repo to pass workspaceRepoAllowed")
	}
	if err := d.ensureRepoReady(context.Background(), "ws-1", repoURL); err != nil {
		t.Fatalf("ensureRepoReady: %v", err)
	}

	// ensureRepoReady refreshes settings on every call (RFC MUL-2414 §4.8; PR
	// #2847 review by Emacs) so a freshly-flipped GitHub toggle takes effect
	// without waiting for the 30s sync tick. Exactly one refresh is expected —
	// the project-only URL still skips re-cloning because the cache is warm.
	if n := refreshes.Load(); n != 1 {
		t.Fatalf("expected 1 workspace-repos refresh (settings live-refresh on checkout), got %d", n)
	}
}
// TestRegisterTaskReposSurvivesWorkspaceRefresh confirms that a workspace
// refresh wiping allowedRepoURLs does not also wipe task-scoped URLs (project
// repos). Without the separate taskRepoURLs map a concurrent refresh would
// silently revoke project-only URLs and the next checkout would fail.
func TestRegisterTaskReposSurvivesWorkspaceRefresh(t *testing.T) {
	t.Parallel()
	sourceRepo := createDaemonTestRepo(t)
	d := newRepoReadyTestDaemon(t, func(w http.ResponseWriter, r *http.Request) {
		// The server reports no workspace-bound repos at all.
		json.NewEncoder(w).Encode(WorkspaceReposResponse{
			WorkspaceID:  "ws-1",
			Repos:        []RepoData{},
			ReposVersion: "v2",
		})
	})
	d.workspaces["ws-1"] = newWorkspaceState("ws-1", nil, "", nil, nil)
	d.registerTaskRepos("ws-1", []RepoData{{URL: sourceRepo}})

	// Wait for the registration to populate the cache.
	deadline := time.Now().Add(5 * time.Second)
	for time.Now().Before(deadline) && d.repoCache.Lookup("ws-1", sourceRepo) == "" {
		time.Sleep(20 * time.Millisecond)
	}
	// Fail loudly if the wait timed out: otherwise the refresh below would run
	// before registration finished and the test would no longer exercise the
	// scenario it is named after.
	if d.repoCache.Lookup("ws-1", sourceRepo) == "" {
		t.Fatal("expected repo to be cached after registerTaskRepos before refreshing the workspace")
	}

	if _, err := d.refreshWorkspaceRepos(context.Background(), "ws-1"); err != nil {
		t.Fatalf("refreshWorkspaceRepos: %v", err)
	}
	if !d.workspaceRepoAllowed("ws-1", sourceRepo) {
		t.Fatal("project repo URL was wiped by workspace refresh")
	}
}
func TestEnsureRepoReadyReturnsNotConfigured(t *testing.T) {
t.Parallel()
d := newRepoReadyTestDaemon(t, func(w http.ResponseWriter, r *http.Request) {
json.NewEncoder(w).Encode(WorkspaceReposResponse{
WorkspaceID: "ws-1",
Repos: []RepoData{},
ReposVersion: "v1",
})
})
d.workspaces["ws-1"] = newWorkspaceState("ws-1", nil, "", nil, nil)
err := d.ensureRepoReady(context.Background(), "ws-1", "git@example.com:team/api.git")
if !errors.Is(err, ErrRepoNotConfigured) {
t.Fatalf("expected ErrRepoNotConfigured, got %v", err)
}
}
// TestEnsureRepoReadyReportsSyncFailure configures a repo whose path does not
// exist and expects ensureRepoReady to return a "configured but not synced"
// error and to record a last-sync error for the workspace.
func TestEnsureRepoReadyReportsSyncFailure(t *testing.T) {
	t.Parallel()

	// A path inside a fresh temp dir that is never created.
	bogusRepo := filepath.Join(t.TempDir(), "missing-repo")

	d := newRepoReadyTestDaemon(t, func(w http.ResponseWriter, r *http.Request) {
		json.NewEncoder(w).Encode(WorkspaceReposResponse{
			WorkspaceID:  "ws-1",
			Repos:        []RepoData{{URL: bogusRepo}},
			ReposVersion: "v1",
		})
	})
	d.workspaces["ws-1"] = newWorkspaceState("ws-1", nil, "", nil, nil)

	err := d.ensureRepoReady(context.Background(), "ws-1", bogusRepo)
	isSyncFailure := err != nil && strings.Contains(err.Error(), "repo is configured but not synced:")
	if !isSyncFailure {
		t.Fatalf("expected sync failure error, got %v", err)
	}
	if recorded := d.workspaceLastRepoSyncErr("ws-1"); recorded == "" {
		t.Fatal("expected lastRepoSyncErr to be recorded")
	}
}
// TestEnsureRepoReadyConcurrentMissRefreshesOnce fires eight concurrent
// ensureRepoReady calls at a cold cache and expects all of them to succeed
// while the server sees exactly one refresh request.
func TestEnsureRepoReadyConcurrentMissRefreshesOnce(t *testing.T) {
	t.Parallel()

	const (
		reposPath   = "/api/daemon/workspaces/ws-1/repos"
		concurrency = 8
	)
	repoURL := createDaemonTestRepo(t)

	var refreshes atomic.Int32
	d := newRepoReadyTestDaemon(t, func(w http.ResponseWriter, r *http.Request) {
		if r.Method == http.MethodGet && r.URL.Path == reposPath {
			refreshes.Add(1)
			json.NewEncoder(w).Encode(WorkspaceReposResponse{
				WorkspaceID:  "ws-1",
				Repos:        []RepoData{{URL: repoURL}},
				ReposVersion: "v2",
			})
			return
		}
		http.NotFound(w, r)
	})
	d.workspaces["ws-1"] = newWorkspaceState("ws-1", nil, "", nil, nil)

	// Buffered to hold one outcome per caller, so no goroutine blocks on send.
	outcomes := make(chan error, concurrency)
	var wg sync.WaitGroup
	wg.Add(concurrency)
	for i := 0; i < concurrency; i++ {
		go func() {
			defer wg.Done()
			outcomes <- d.ensureRepoReady(context.Background(), "ws-1", repoURL)
		}()
	}
	wg.Wait()
	close(outcomes)

	for err := range outcomes {
		if err != nil {
			t.Fatalf("ensureRepoReady returned error: %v", err)
		}
	}

	// All 8 goroutines race on a cold miss; the per-workspace mutex must
	// serialize them so the server is only called once.
	if n := refreshes.Load(); n != 1 {
		t.Fatalf("expected exactly 1 refresh call, got %d", n)
	}
}
// TestShellArgsFromEnv checks that shellArgsFromEnv splits an environment
// variable shell-style, keeping a double-quoted phrase as one argument.
func TestShellArgsFromEnv(t *testing.T) {
	const envKey = "MULTICA_CLAUDE_ARGS"
	t.Setenv(envKey, `--max-turns 60 --append-system-prompt "multi word"`)

	expected := []string{"--max-turns", "60", "--append-system-prompt", "multi word"}

	actual, err := shellArgsFromEnv(envKey)
	if err != nil {
		t.Fatalf("shellArgsFromEnv: %v", err)
	}

	// Join on NUL so that an argument containing a space cannot be mistaken
	// for two separate arguments.
	const sep = "\x00"
	if strings.Join(actual, sep) != strings.Join(expected, sep) {
		t.Fatalf("got %#v, want %#v", actual, expected)
	}
}
// TestShellArgsFromEnvEmptyIsNil checks that a whitespace-only environment
// variable yields a nil slice and no error.
func TestShellArgsFromEnvEmptyIsNil(t *testing.T) {
	const envKey = "MULTICA_CODEX_ARGS"
	t.Setenv(envKey, "   ")

	args, err := shellArgsFromEnv(envKey)
	switch {
	case err != nil:
		t.Fatalf("shellArgsFromEnv: %v", err)
	case args != nil:
		t.Fatalf("expected nil for empty env, got %#v", args)
	}
}
// TestDefaultArgsForProvider checks that defaultArgsForProvider returns the
// configured argument list for "claude" and "codex", and nil for a provider
// with no configured defaults.
func TestDefaultArgsForProvider(t *testing.T) {
	cfg := Config{ClaudeArgs: []string{"--max-turns", "60"}, CodexArgs: []string{"--sandbox", "workspace-write"}}

	cases := []struct {
		provider string
		want     []string
	}{
		{provider: "claude", want: []string{"--max-turns", "60"}},
		{provider: "codex", want: []string{"--sandbox", "workspace-write"}},
	}
	for _, tc := range cases {
		got := defaultArgsForProvider(cfg, tc.provider)
		// Compare element-wise: check the length and join on NUL. Joining on a
		// space would treat {"--max-turns 60"} and {"--max-turns", "60"} as
		// equal and hide a broken argument split.
		if len(got) != len(tc.want) || strings.Join(got, "\x00") != strings.Join(tc.want, "\x00") {
			t.Fatalf("unexpected %s args: %#v", tc.provider, got)
		}
	}

	if got := defaultArgsForProvider(cfg, "gemini"); got != nil {
		t.Fatalf("expected nil for unsupported provider, got %#v", got)
	}
}
// reportTaskResultRecorder captures which terminal endpoint (.../complete or
// .../fail) reportTaskResult hits and the body it posts, so tests can assert
// the disposition (success vs fail) independently of the rest of handleTask.
// Only the most recent request is retained.
type reportTaskResultRecorder struct {
	mu      sync.Mutex     // guards the fields below
	path    string         // URL path of the last request
	method  string         // HTTP method of the last request
	payload map[string]any // decoded JSON body of the last request; nil when the body was empty
}

// handler returns an http.HandlerFunc that stores the request path, method and
// decoded JSON body on the recorder and answers 200. A body that cannot be
// read or decoded is reported through t.Errorf and answered with 400.
func (r *reportTaskResultRecorder) handler(t *testing.T) http.HandlerFunc {
	t.Helper()

	// reject flags the problem on the test and replies 400 with the error text.
	reject := func(w http.ResponseWriter, what string, err error) {
		t.Errorf("%s: %v", what, err)
		http.Error(w, err.Error(), http.StatusBadRequest)
	}

	return func(w http.ResponseWriter, req *http.Request) {
		raw, err := io.ReadAll(req.Body)
		if err != nil {
			reject(w, "read body", err)
			return
		}

		// An empty body is accepted and leaves the decoded payload nil.
		var decoded map[string]any
		if len(raw) != 0 {
			if err := json.Unmarshal(raw, &decoded); err != nil {
				reject(w, "decode body", err)
				return
			}
		}

		r.mu.Lock()
		r.path, r.method, r.payload = req.URL.Path, req.Method, decoded
		r.mu.Unlock()

		w.WriteHeader(http.StatusOK)
	}
}
// TestReportTaskResult_CompletedHitsCompleteEndpoint reports a "completed"
// TaskResult and expects the /complete endpoint to receive the comment as
// "output" along with the branch name and session id.
func TestReportTaskResult_CompletedHitsCompleteEndpoint(t *testing.T) {
	t.Parallel()

	rec := &reportTaskResultRecorder{}
	srv := httptest.NewServer(rec.handler(t))
	t.Cleanup(srv.Close)

	d := &Daemon{client: NewClient(srv.URL), logger: slog.Default()}
	outcome := TaskResult{
		Status:     "completed",
		Comment:    "all good",
		BranchName: "agent/foo",
		SessionID:  "ses-1",
		WorkDir:    "/tmp/foo",
	}
	d.reportTaskResult(context.Background(), "task-1", outcome, slog.Default())

	rec.mu.Lock()
	defer rec.mu.Unlock()

	if rec.path != "/api/daemon/tasks/task-1/complete" {
		t.Fatalf("expected /complete endpoint, got %s", rec.path)
	}

	// Each expected body field is checked on its own so that one mismatch does
	// not hide the others.
	expectations := []struct {
		key  string
		want string
	}{
		{key: "output", want: "all good"},
		{key: "branch_name", want: "agent/foo"},
		{key: "session_id", want: "ses-1"},
	}
	for _, e := range expectations {
		if rec.payload[e.key] != e.want {
			t.Errorf("%s: got %v", e.key, rec.payload[e.key])
		}
	}
}
// TestReportTaskResult_NonCompletedHitsFailEndpoint pins the GitHub
// multica#1952 fail-closed behaviour: a task whose agent run never produced a
// real result (blocked, cancelled, or any future status we forget to
// enumerate) MUST go through FailTask, so the UI never shows a green
// "Completed" badge for a run that didn't actually do anything (e.g. provider
// 429 / out-of-credit).
func TestReportTaskResult_NonCompletedHitsFailEndpoint(t *testing.T) {
	t.Parallel()

	type scenario struct {
		name              string
		status            string
		comment           string
		failureReasonIn   string
		wantFailureReason string
	}
	scenarios := []scenario{
		{
			name:              "blocked with explicit reason preserves it",
			status:            "blocked",
			comment:           "rate limit reached",
			failureReasonIn:   "iteration_limit",
			wantFailureReason: "iteration_limit",
		},
		{
			// MUL-2946: when the daemon doesn't supply a refined reason, the
			// comment text is run through taskfailure.Classify so the
			// failure_reason column lands in the canonical refined taxonomy
			// instead of the legacy "agent_error" coarse bucket.
			name:              "blocked without reason classifies comment as rate-limit",
			status:            "blocked",
			comment:           "rate limit reached",
			failureReasonIn:   "",
			wantFailureReason: "agent_error.provider_capacity_or_rate_limit",
		},
		{
			name:              "blocked without reason and unrecognized comment lands in agent_error.unknown",
			status:            "blocked",
			comment:           "the agent gave up for reasons we don't recognize",
			failureReasonIn:   "",
			wantFailureReason: "agent_error.unknown",
		},
		{
			name:              "cancelled defaults to cancelled reason regardless of comment",
			status:            "cancelled",
			comment:           "rate limit reached",
			failureReasonIn:   "",
			wantFailureReason: "cancelled",
		},
		{
			name:              "unknown status routes through classifier",
			status:            "weird_new_status",
			comment:           "rate limit reached",
			failureReasonIn:   "",
			wantFailureReason: "agent_error.provider_capacity_or_rate_limit",
		},
	}

	for _, sc := range scenarios {
		t.Run(sc.name, func(t *testing.T) {
			rec := &reportTaskResultRecorder{}
			srv := httptest.NewServer(rec.handler(t))
			t.Cleanup(srv.Close)

			d := &Daemon{client: NewClient(srv.URL), logger: slog.Default()}
			outcome := TaskResult{
				Status:        sc.status,
				Comment:       sc.comment,
				SessionID:     "ses-x",
				WorkDir:       "/tmp/x",
				FailureReason: sc.failureReasonIn,
			}
			d.reportTaskResult(context.Background(), "task-x", outcome, slog.Default())

			rec.mu.Lock()
			defer rec.mu.Unlock()

			if rec.path != "/api/daemon/tasks/task-x/fail" {
				t.Fatalf("expected /fail endpoint for status=%q, got %s", sc.status, rec.path)
			}
			if rec.payload["error"] != sc.comment {
				t.Errorf("error body: got %v", rec.payload["error"])
			}
			if reason := rec.payload["failure_reason"]; reason != sc.wantFailureReason {
				t.Errorf("failure_reason: got %v, want %q", reason, sc.wantFailureReason)
			}
			if rec.payload["session_id"] != "ses-x" {
				t.Errorf("session_id should be forwarded on failure paths so chat resume keeps working, got %v", rec.payload["session_id"])
			}
		})
	}
}
// TestReportTaskResult_RetriesTransientCompleteThenSucceeds is the regression
// test for the MUL-2780 incident: a short 502 burst on the /complete callback
// used to (a) drop the task at the first failure and (b) wrongly fall back to
// /fail, surfacing a successful run as red. With the retry helper in place, a
// transient 502 followed by a 200 must resolve via /complete without ever
// touching /fail.
func TestReportTaskResult_RetriesTransientCompleteThenSucceeds(t *testing.T) {
	defer noSleepRetry(t)()

	var completeHits, failHits atomic.Int32
	srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
		path := req.URL.Path
		if strings.HasSuffix(path, "/complete") {
			// Only the very first /complete attempt is answered with 502.
			if completeHits.Add(1) == 1 {
				w.WriteHeader(http.StatusBadGateway)
				return
			}
		} else if strings.HasSuffix(path, "/fail") {
			failHits.Add(1)
		}
		w.WriteHeader(http.StatusOK)
	}))
	t.Cleanup(srv.Close)

	d := &Daemon{client: NewClient(srv.URL), logger: slog.Default()}
	outcome := TaskResult{Status: "completed", Comment: "ok"}
	d.reportTaskResult(context.Background(), "task-retry", outcome, slog.Default())

	if n := completeHits.Load(); n != 2 {
		t.Fatalf("expected 2 complete attempts (one 502, one 200), got %d", n)
	}
	if n := failHits.Load(); n != 0 {
		t.Fatalf("transient 502 must not fall back to /fail (would lose successful result), got %d /fail calls", n)
	}
}
// TestReportTaskResult_TransientCompleteExhaustedDoesNotFallback pins the
// "don't downgrade success to failure on transient errors" rule: when
// /complete answers 502 across the entire retry schedule, we must NOT fall
// through to /fail — that would surface a real success as a failure in the UI.
// The task is left in running for a future recovery path to pick up.
func TestReportTaskResult_TransientCompleteExhaustedDoesNotFallback(t *testing.T) {
	defer noSleepRetry(t)()

	// Shrink the package-level retry schedule to two near-zero waits and put
	// the original back when the test ends.
	savedSchedule := defaultTerminalRetrySchedule
	defaultTerminalRetrySchedule = []time.Duration{time.Nanosecond, time.Nanosecond}
	t.Cleanup(func() { defaultTerminalRetrySchedule = savedSchedule })

	var completeHits, failHits atomic.Int32
	srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
		path := req.URL.Path
		if strings.HasSuffix(path, "/complete") {
			// Every /complete attempt fails with a transient 502.
			completeHits.Add(1)
			w.WriteHeader(http.StatusBadGateway)
			return
		}
		if strings.HasSuffix(path, "/fail") {
			failHits.Add(1)
		}
		w.WriteHeader(http.StatusOK)
	}))
	t.Cleanup(srv.Close)

	d := &Daemon{client: NewClient(srv.URL), logger: slog.Default()}
	outcome := TaskResult{Status: "completed", Comment: "ok"}
	d.reportTaskResult(context.Background(), "task-stuck", outcome, slog.Default())

	// One initial attempt plus one per entry in the retry schedule.
	if n := completeHits.Load(); n != int32(len(defaultTerminalRetrySchedule)+1) {
		t.Fatalf("expected %d complete attempts, got %d", len(defaultTerminalRetrySchedule)+1, n)
	}
	if n := failHits.Load(); n != 0 {
		t.Fatalf("exhausted transient retries must NOT fall back to /fail; got %d /fail calls", n)
	}
}
// TestReportTaskResult_PermanentCompleteFallsBackToFail: on a permanent 4xx
// from /complete (e.g. 400 bad body, 404 task not found) the helper bails
// immediately and the daemon falls back to /fail, so the UI shows a concrete
// failure rather than a perpetually-running task.
func TestReportTaskResult_PermanentCompleteFallsBackToFail(t *testing.T) {
	defer noSleepRetry(t)()

	var completeHits, failHits atomic.Int32
	srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
		path := req.URL.Path
		if strings.HasSuffix(path, "/complete") {
			// /complete is permanently rejected with 400.
			completeHits.Add(1)
			w.WriteHeader(http.StatusBadRequest)
			return
		}
		if strings.HasSuffix(path, "/fail") {
			failHits.Add(1)
		}
		w.WriteHeader(http.StatusOK)
	}))
	t.Cleanup(srv.Close)

	d := &Daemon{client: NewClient(srv.URL), logger: slog.Default()}
	outcome := TaskResult{Status: "completed", Comment: "ok"}
	d.reportTaskResult(context.Background(), "task-bad", outcome, slog.Default())

	if n := completeHits.Load(); n != 1 {
		t.Fatalf("permanent 400 should not retry, got %d complete attempts", n)
	}
	if n := failHits.Load(); n != 1 {
		t.Fatalf("permanent /complete should fall back to /fail exactly once, got %d", n)
	}
}
// TestHandleTask_ReportsUsageBeforeCancel verifies that ReportTaskUsage is
// called even when the server marks the task as cancelled during the post-run
// status check. It is a regression test for the ordering bug where the cancel
// check ran before usage was reported, silently discarding accumulated tokens.
func TestHandleTask_ReportsUsageBeforeCancel(t *testing.T) {
	t.Parallel()

	var (
		mu        sync.Mutex
		callOrder []string
	)
	recordCall := func(name string) {
		mu.Lock()
		defer mu.Unlock()
		callOrder = append(callOrder, name)
	}

	srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		path := r.URL.Path

		// The status endpoint is the only one that returns a body: it tells the
		// daemon that the task has been cancelled.
		if strings.HasSuffix(path, "/status") {
			recordCall("status")
			w.Header().Set("Content-Type", "application/json")
			w.WriteHeader(http.StatusOK)
			_, _ = w.Write([]byte(`{"status":"cancelled"}`))
			return
		}

		// Of the remaining endpoints only start and usage are recorded; every
		// request, recognised or not, is answered with 200.
		for _, name := range []string{"start", "usage"} {
			if strings.HasSuffix(path, "/"+name) {
				recordCall(name)
				break
			}
		}
		w.WriteHeader(http.StatusOK)
	}))
	t.Cleanup(srv.Close)

	d := &Daemon{
		client:       NewClient(srv.URL),
		logger:       slog.New(slog.NewTextHandler(io.Discard, nil)),
		workspaces:   make(map[string]*workspaceState),
		runtimeIndex: map[string]Runtime{"rt-1": {ID: "rt-1", Provider: "claude"}},
		// Effectively disable the poll-cancel path; the post-run status check
		// is what this test wants to exercise.
		cancelPollInterval: time.Hour,
	}

	// Inject a fake runner that returns a result carrying usage tokens,
	// bypassing real agent process execution.
	d.runner = taskRunnerFunc(func(_ context.Context, _ Task, _ string, _ int, _ *slog.Logger) (TaskResult, error) {
		usage := []TaskUsageEntry{
			{Provider: "anthropic", Model: "claude-opus-4-6", InputTokens: 100, OutputTokens: 50},
		}
		return TaskResult{Status: "completed", Usage: usage}, nil
	})

	d.handleTask(context.Background(), Task{
		ID:        "task-abc",
		RuntimeID: "rt-1",
		IssueID:   "issue-xyz",
		Agent:     &AgentData{Name: "test-agent"},
	}, 0)

	// Snapshot the recorded calls under the lock before inspecting them.
	mu.Lock()
	order := append([]string(nil), callOrder...)
	mu.Unlock()

	// usage must appear before status in the call order. Each index ends up at
	// the last occurrence of its endpoint.
	usageIdx, statusIdx := -1, -1
	for i, name := range order {
		if name == "usage" {
			usageIdx = i
		} else if name == "status" {
			statusIdx = i
		}
	}

	if usageIdx == -1 {
		t.Fatal("ReportTaskUsage was never called — usage is lost for cancelled tasks")
	}
	if statusIdx == -1 {
		t.Fatal("GetTaskStatus was never called")
	}
	if usageIdx > statusIdx {
		t.Fatalf("usage was reported AFTER status check (order: %v) — regression", order)
	}
}
// TestHandleTask_ReportsUsageWhenCancelledByPoll checks that handleTask still
// reports token usage when the task is cancelled mid-run by the status-polling
// goroutine. It is a regression guard for the cancelledByPoll early-return
// path, which previously returned before ReportTaskUsage was called.
func TestHandleTask_ReportsUsageWhenCancelledByPoll(t *testing.T) {
	t.Parallel()

	// calls is the ordered log of interesting requests seen by the fake server;
	// the handler runs on server goroutines, so appends are guarded by callsMu.
	var (
		callsMu sync.Mutex
		calls   []string
	)
	note := func(endpoint string) {
		callsMu.Lock()
		defer callsMu.Unlock()
		calls = append(calls, endpoint)
	}

	// statusCalls distinguishes the first /status request (the poll goroutine,
	// answered with "cancelled") from any later ones such as the post-run
	// status check (answered with "running").
	var statusCalls atomic.Int64

	srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		// Every endpoint answers 200; only /status carries a JSON body.
		var body []byte
		switch path := r.URL.Path; {
		case strings.HasSuffix(path, "/start"),
			strings.HasSuffix(path, "/progress"):
			// Accepted but deliberately not recorded.
		case strings.HasSuffix(path, "/usage"):
			note("usage")
		case strings.HasSuffix(path, "/status"):
			body = []byte(`{"status":"running"}`)
			if statusCalls.Add(1) == 1 {
				// The first request is the poll goroutine's: reporting
				// "cancelled" makes it trigger runCancel() and
				// close(cancelledByPoll).
				note("poll-status")
				body = []byte(`{"status":"cancelled"}`)
			}
			w.Header().Set("Content-Type", "application/json")
		}
		w.WriteHeader(http.StatusOK)
		if body != nil {
			_, _ = w.Write(body)
		}
	}))
	t.Cleanup(srv.Close)

	quietLogger := slog.New(slog.NewTextHandler(io.Discard, nil))
	d := &Daemon{
		client:       NewClient(srv.URL),
		logger:       quietLogger,
		workspaces:   make(map[string]*workspaceState),
		runtimeIndex: map[string]Runtime{"rt-1": {ID: "rt-1", Provider: "claude"}},
		// A short interval makes the poll fire almost immediately, keeping the
		// test fast.
		cancelPollInterval: 10 * time.Millisecond,
	}

	// The stub runner parks until its context is cancelled, standing in for a
	// real agent being interrupted, and then hands back usage tokens alongside
	// an "aborted" status.
	d.runner = taskRunnerFunc(func(runCtx context.Context, _ Task, _ string, _ int, _ *slog.Logger) (TaskResult, error) {
		<-runCtx.Done()
		usage := []TaskUsageEntry{
			{Provider: "anthropic", Model: "claude-opus-4-6", InputTokens: 200, OutputTokens: 80},
		}
		return TaskResult{Status: "aborted", Usage: usage}, nil
	})

	d.handleTask(context.Background(), Task{
		ID:        "task-poll",
		RuntimeID: "rt-1",
		IssueID:   "issue-poll",
		Agent:     &AgentData{Name: "test-agent"},
	}, 0)

	// Snapshot the log under the lock before inspecting it.
	callsMu.Lock()
	order := append([]string(nil), calls...)
	callsMu.Unlock()

	// lastIndexOf returns the position of the final occurrence of name in the
	// snapshot, or -1 when it never appears.
	lastIndexOf := func(name string) int {
		for i := len(order) - 1; i >= 0; i-- {
			if order[i] == name {
				return i
			}
		}
		return -1
	}
	pollStatusIdx := lastIndexOf("poll-status")
	usageIdx := lastIndexOf("usage")

	// Insist that the poll goroutine really fired. Without this the test could
	// be satisfied through the post-run GetTaskStatus check alone and never
	// touch the cancelledByPoll path, making it a vacuous regression guard.
	if pollStatusIdx == -1 {
		t.Fatalf("poll goroutine never fired (order: %v) — cancelledByPoll path not exercised", order)
	}
	if usageIdx == -1 {
		t.Fatalf("ReportTaskUsage was never called on poll-cancelled path (order: %v) — tokens lost", order)
	}
	// Expected sequence: poll fires, runCtx is cancelled, the runner unblocks,
	// usage is flushed. Usage ahead of poll-status would mean it was reported
	// before the runner was interrupted, which cannot happen while the runner
	// is blocked on runCtx.Done().
	if usageIdx < pollStatusIdx {
		t.Fatalf("usage reported before poll-status (order: %v) — poll-status must come first", order)
	}
}