Compare commits

...

1 Commits

Author SHA1 Message Date
J
55aa4180cf fix(daemon): retry terminal task callbacks on transient errors (MUL-2780)
CompleteTask / FailTask used to be fire-once. A 1-second upstream 502
burst would drop the call, then the immediate fail-fallback also 502'd,
leaving the task stuck in `running` forever and showing the agent as
"still working" in the UI.

Add a bounded retry around the two terminal callbacks: 4s, 8s, 16s,
32s, 64s backoff schedule (5 retries, ~124s ceiling), retrying only
on transient errors (5xx, 408, 429, transport-level) and bailing
immediately on permanent 4xx. Also fix a latent bug where a transient
complete failure would silently downgrade a successful run to a fail:
the fallback now triggers only on permanent errors. Server-side
CompleteTask / FailTask are already idempotent on "already terminal",
so replays from a retry are safe even if the prior 502'd response was
actually persisted.

Co-authored-by: multica-agent <github@multica.ai>
2026-05-28 16:47:41 +08:00
4 changed files with 396 additions and 7 deletions

View File

@@ -217,7 +217,7 @@ func (c *Client) CompleteTask(ctx context.Context, taskID, output, branchName, s
if workDir != "" {
body["work_dir"] = workDir
}
return c.postJSON(ctx, fmt.Sprintf("/api/daemon/tasks/%s/complete", taskID), body, nil)
return c.postJSONWithRetry(ctx, fmt.Sprintf("/api/daemon/tasks/%s/complete", taskID), body, nil, defaultTerminalRetrySchedule)
}
func (c *Client) ReportTaskUsage(ctx context.Context, taskID string, usage []TaskUsageEntry) error {
@@ -240,7 +240,7 @@ func (c *Client) FailTask(ctx context.Context, taskID, errMsg, sessionID, workDi
if failureReason != "" {
body["failure_reason"] = failureReason
}
return c.postJSON(ctx, fmt.Sprintf("/api/daemon/tasks/%s/fail", taskID), body, nil)
return c.postJSONWithRetry(ctx, fmt.Sprintf("/api/daemon/tasks/%s/fail", taskID), body, nil, defaultTerminalRetrySchedule)
}
// PinTaskSession persists the agent's session_id and work_dir on the task
@@ -461,6 +461,103 @@ func (c *Client) GetWorkspaceRepos(ctx context.Context, workspaceID string) (*Wo
return &resp, nil
}
// defaultTerminalRetrySchedule lists the waits postJSONWithRetry inserts
// between attempts when delivering a terminal task callback (CompleteTask /
// FailTask). A schedule of N waits yields at most N+1 attempts: the initial
// call plus one retry per entry. The five waits add up to 124s, which covers
// the brief upstream blips behind MUL-2780 while still giving up in bounded
// time when an outage lasts longer than the window.
var defaultTerminalRetrySchedule = []time.Duration{
	4 * time.Second,
	8 * time.Second,
	16 * time.Second,
	32 * time.Second,
	64 * time.Second,
}
// retrySleep blocks for d between retry attempts, or until ctx is done,
// whichever happens first. It returns nil when the full wait elapsed and
// ctx.Err() when the context ended the wait early. It is a package-level
// variable so tests can substitute an instant sleep and leave the caller's
// schedule untouched.
var retrySleep = func(ctx context.Context, d time.Duration) error {
	wait := time.NewTimer(d)
	defer wait.Stop()

	select {
	case <-ctx.Done():
		return ctx.Err()
	case <-wait.C:
		return nil
	}
}
// isTransientError reports whether err is the kind of failure a retry is
// likely to clear.
//
// Transient: any error that does not carry a *requestError (connection, TLS
// and I/O failures at the transport layer, including per-attempt client
// timeouts), any response with status >= 500, and the 408 / 429 statuses.
//
// Permanent: nil, and every other status carried by a *requestError. Retrying
// a 400 (bad body) or a 404 (task not found) only burns time.
//
// Parent-context cancellation is the caller's job to detect separately. From
// here "the daemon is shutting down" and "one attempt timed out" look the
// same, because both arrive as context errors wrapped by net/http.
func isTransientError(err error) bool {
	if err == nil {
		return false
	}

	var httpErr *requestError
	if !errors.As(err, &httpErr) {
		// No HTTP status attached: treat as a transport-level failure.
		return true
	}

	switch code := httpErr.StatusCode; {
	case code >= 500:
		return true
	case code == http.StatusRequestTimeout, code == http.StatusTooManyRequests:
		return true
	default:
		return false
	}
}
// postJSONWithRetry delivers a JSON POST with a bounded number of retries.
// It exists for terminal callbacks (CompleteTask / FailTask) that must reach
// the server.
//
// An attempt is retried only when isTransientError says the failure is
// transient. A permanent failure is returned at once so the schedule is not
// spent on a request the server has already rejected.
//
// schedule holds the wait before each retry: N entries allow at most N+1
// attempts (the initial one plus N retries). A nil or empty schedule means a
// single attempt.
//
// The error returned is the one from the most recent attempt, including when
// ctx ends during a wait, so callers can pass it to isTransientError to decide
// on a fallback (e.g. complete → fail only on a permanent error). ctx.Err()
// itself is returned only when ctx was already done before the first attempt.
//
// Replays are safe: server-side CompleteTask / FailTask treat "already
// terminal" as an idempotent success (see service/task.go), so a duplicate
// caused by a lost response does no harm.
func (c *Client) postJSONWithRetry(ctx context.Context, path string, reqBody any, respBody any, schedule []time.Duration) error {
	var lastErr error
	for i := 0; ; i++ {
		if ctxErr := ctx.Err(); ctxErr != nil {
			if lastErr == nil {
				return ctxErr
			}
			// Prefer the real request failure over the bare context error.
			return lastErr
		}

		lastErr = c.postJSON(ctx, path, reqBody, respBody)
		switch {
		case lastErr == nil:
			return nil
		case !isTransientError(lastErr), i >= len(schedule):
			// Permanent rejection, or no retries left.
			return lastErr
		}

		if retrySleep(ctx, schedule[i]) != nil {
			// The wait was cut short by ctx; report the request failure.
			return lastErr
		}
	}
}
func (c *Client) postJSON(ctx context.Context, path string, reqBody any, respBody any) error {
var body io.Reader
if reqBody != nil {

View File

@@ -3,10 +3,13 @@ package daemon
import (
"context"
"encoding/json"
"errors"
"net/http"
"net/http/httptest"
"runtime"
"sync/atomic"
"testing"
"time"
)
func TestClient_IdentityHeaders_PostJSON(t *testing.T) {
@@ -82,6 +85,161 @@ func TestClient_VersionOmittedWhenUnset(t *testing.T) {
}
}
// noSleepRetry swaps retrySleep for a stub that returns immediately, so tests
// never sit through the real 4s/8s/16s/... backoffs. The stub still reports
// ctx.Err() when the context is already done. The returned func puts the
// previous sleeper back; callers typically defer it.
func noSleepRetry(t *testing.T) func() {
	t.Helper()

	original := retrySleep
	retrySleep = func(ctx context.Context, _ time.Duration) error {
		return ctx.Err()
	}
	return func() { retrySleep = original }
}
// TestIsTransientError pins the transient/permanent classification used by
// the retry helper: nil and ordinary 4xx are permanent; 5xx, 408, 429 and
// errors without an HTTP status are transient. It also covers errors that
// wrap a *requestError, since isTransientError unwraps with errors.As and a
// wrapped status must be classified the same as a bare one.
func TestIsTransientError(t *testing.T) {
	cases := []struct {
		name string
		err  error
		want bool
	}{
		{"nil is not transient", nil, false},
		{"500 boundary is transient", &requestError{StatusCode: http.StatusInternalServerError}, true},
		{"5xx is transient", &requestError{StatusCode: http.StatusBadGateway}, true},
		{"503 is transient", &requestError{StatusCode: http.StatusServiceUnavailable}, true},
		{"408 is transient", &requestError{StatusCode: http.StatusRequestTimeout}, true},
		{"429 is transient", &requestError{StatusCode: http.StatusTooManyRequests}, true},
		{"400 is permanent", &requestError{StatusCode: http.StatusBadRequest}, false},
		{"401 is permanent", &requestError{StatusCode: http.StatusUnauthorized}, false},
		{"404 is permanent", &requestError{StatusCode: http.StatusNotFound}, false},
		{"transport-level error is transient", errors.New("connection reset by peer"), true},
		// Wrapped errors must be classified by the status they carry.
		{"wrapped 502 stays transient", errors.Join(&requestError{StatusCode: http.StatusBadGateway}), true},
		{"wrapped 404 stays permanent", errors.Join(&requestError{StatusCode: http.StatusNotFound}), false},
	}
	for _, tc := range cases {
		t.Run(tc.name, func(t *testing.T) {
			if got := isTransientError(tc.err); got != tc.want {
				t.Fatalf("isTransientError(%v) = %v, want %v", tc.err, got, tc.want)
			}
		})
	}
}
// TestPostJSONWithRetry_TransientThenSuccess checks that two 502 responses
// followed by a 200 resolve to a nil error after exactly three attempts.
func TestPostJSONWithRetry_TransientThenSuccess(t *testing.T) {
	restore := noSleepRetry(t)
	defer restore()

	var hits atomic.Int32
	handler := func(w http.ResponseWriter, _ *http.Request) {
		// The first two requests fail; the third succeeds.
		status := http.StatusOK
		if hits.Add(1) < 3 {
			status = http.StatusBadGateway
		}
		w.WriteHeader(status)
	}
	srv := httptest.NewServer(http.HandlerFunc(handler))
	defer srv.Close()

	client := NewClient(srv.URL)
	backoff := []time.Duration{time.Nanosecond, time.Nanosecond, time.Nanosecond}
	err := client.postJSONWithRetry(context.Background(), "/x", map[string]any{}, nil, backoff)
	if err != nil {
		t.Fatalf("postJSONWithRetry: %v", err)
	}
	if got := hits.Load(); got != 3 {
		t.Fatalf("expected 3 attempts (2 transient + 1 success), got %d", got)
	}
}
// TestPostJSONWithRetry_TransientExhausts checks that a server which always
// answers 502 is tried len(schedule)+1 times and that the error handed back
// is still classified as transient.
func TestPostJSONWithRetry_TransientExhausts(t *testing.T) {
	restore := noSleepRetry(t)
	defer restore()

	var hits atomic.Int32
	alwaysBadGateway := func(w http.ResponseWriter, _ *http.Request) {
		hits.Add(1)
		w.WriteHeader(http.StatusBadGateway)
	}
	srv := httptest.NewServer(http.HandlerFunc(alwaysBadGateway))
	defer srv.Close()

	client := NewClient(srv.URL)
	schedule := []time.Duration{time.Nanosecond, time.Nanosecond}
	err := client.postJSONWithRetry(context.Background(), "/x", map[string]any{}, nil, schedule)

	switch {
	case err == nil:
		t.Fatal("expected error after schedule exhausted, got nil")
	case !isTransientError(err):
		t.Fatalf("expected transient error, got %v", err)
	}
	if got := hits.Load(); got != int32(len(schedule)+1) {
		t.Fatalf("expected %d attempts (initial + %d retries), got %d", len(schedule)+1, len(schedule), got)
	}
}
// TestPostJSONWithRetry_PermanentBailsImmediately checks that a 400 response
// ends the call after a single attempt, and that the returned error is
// classified as permanent. The second check matters because callers use
// isTransientError on the result to decide whether to fall back from
// complete to fail.
func TestPostJSONWithRetry_PermanentBailsImmediately(t *testing.T) {
	defer noSleepRetry(t)()

	var calls atomic.Int32
	srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {
		calls.Add(1)
		w.WriteHeader(http.StatusBadRequest)
	}))
	defer srv.Close()

	c := NewClient(srv.URL)
	schedule := []time.Duration{time.Nanosecond, time.Nanosecond, time.Nanosecond}
	err := c.postJSONWithRetry(context.Background(), "/x", map[string]any{}, nil, schedule)
	if err == nil {
		t.Fatal("expected error, got nil")
	}
	if isTransientError(err) {
		t.Fatalf("expected permanent (non-transient) error, got %v", err)
	}
	if got := calls.Load(); got != 1 {
		t.Fatalf("expected exactly 1 attempt on permanent error, got %d", got)
	}
}
// TestPostJSONWithRetry_CtxCancelStopsRetries checks that cancelling the
// context during the first backoff wait ends the call promptly with an error
// and without a second attempt. It deliberately keeps the real retrySleep so
// the cancellation is seen interrupting an actual timer.
func TestPostJSONWithRetry_CtxCancelStopsRetries(t *testing.T) {
	var hits atomic.Int32
	srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {
		hits.Add(1)
		w.WriteHeader(http.StatusBadGateway)
	}))
	defer srv.Close()

	ctx, cancel := context.WithCancel(context.Background())
	// Cancel quickly so the first sleep is aborted long before its 1s.
	time.AfterFunc(50*time.Millisecond, cancel)

	client := NewClient(srv.URL)
	schedule := []time.Duration{time.Second, time.Second, time.Second}

	began := time.Now()
	err := client.postJSONWithRetry(ctx, "/x", map[string]any{}, nil, schedule)
	took := time.Since(began)

	if err == nil {
		t.Fatal("expected error after ctx cancel, got nil")
	}
	if took > 750*time.Millisecond {
		t.Fatalf("expected ctx cancel to short-circuit retry, took %s", took)
	}
	if got := hits.Load(); got != 1 {
		t.Fatalf("expected exactly 1 attempt before cancel, got %d", got)
	}
}
// TestDefaultTerminalRetrySchedule_MatchesAgreedPlan pins the production
// backoff to the 5-step exponential schedule agreed in MUL-2780 (4s, 8s, 16s,
// 32s, 64s), so a later cleanup cannot shorten or flatten the recovery window
// without someone noticing.
func TestDefaultTerminalRetrySchedule_MatchesAgreedPlan(t *testing.T) {
	expected := []time.Duration{
		4 * time.Second,
		8 * time.Second,
		16 * time.Second,
		32 * time.Second,
		64 * time.Second,
	}

	if got := len(defaultTerminalRetrySchedule); got != len(expected) {
		t.Fatalf("schedule length: got %d, want %d", got, len(expected))
	}
	for i := range expected {
		if got := defaultTerminalRetrySchedule[i]; got != expected[i] {
			t.Errorf("schedule[%d]: got %s, want %s", i, got, expected[i])
		}
	}
}
func TestNormalizeGOOS(t *testing.T) {
cases := map[string]string{
"darwin": "macos",

View File

@@ -2367,11 +2367,28 @@ func (d *Daemon) reportTaskResult(ctx context.Context, taskID string, result Tas
switch result.Status {
case "completed":
taskLog.Info("task completed", "status", result.Status)
if err := d.client.CompleteTask(ctx, taskID, result.Comment, result.BranchName, result.SessionID, result.WorkDir); err != nil {
taskLog.Error("complete task failed, falling back to fail", "error", err)
if failErr := d.client.FailTask(ctx, taskID, fmt.Sprintf("complete task failed: %s", err.Error()), result.SessionID, result.WorkDir, "agent_error"); failErr != nil {
taskLog.Error("fail task fallback also failed", "error", failErr)
}
err := d.client.CompleteTask(ctx, taskID, result.Comment, result.BranchName, result.SessionID, result.WorkDir)
if err == nil {
return
}
// CompleteTask retries transient errors internally. A transient
// error reaching us here means the schedule was exhausted while
// the upstream was still 5xx / unreachable. Converting that into
// a fail would lose the agent's actual result and surface a
// misleading red badge in the UI — leave the task in running
// instead so a future fix (server-side stuck-task reaper, or a
// daemon-side persistent pending queue) can recover it. Only
// permanent server-side rejections (4xx other than 408/429)
// warrant the legacy fallback, because at that point the server
// has already refused this task and the only useful UI signal
// left is a concrete failure.
if isTransientError(err) {
taskLog.Error("complete task failed after retries; leaving task in running rather than falling back to fail", "error", err)
return
}
taskLog.Error("complete task rejected by server, falling back to fail", "error", err)
if failErr := d.client.FailTask(ctx, taskID, fmt.Sprintf("complete task failed: %s", err.Error()), result.SessionID, result.WorkDir, "agent_error"); failErr != nil {
taskLog.Error("fail task fallback also failed", "error", failErr)
}
default:
failureReason := result.FailureReason

View File

@@ -1781,6 +1781,123 @@ func TestReportTaskResult_NonCompletedHitsFailEndpoint(t *testing.T) {
}
}
// TestReportTaskResult_RetriesTransientCompleteThenSucceeds is the regression
// test for the MUL-2780 incident. A brief 502 burst on /complete used to
// abandon the callback at the first failure and then fall back to /fail,
// painting a successful run red. With retries in place, one 502 followed by
// a 200 must settle through /complete alone, with /fail never called.
func TestReportTaskResult_RetriesTransientCompleteThenSucceeds(t *testing.T) {
	restore := noSleepRetry(t)
	defer restore()

	var completeCalls, failCalls atomic.Int32
	srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
		status := http.StatusOK
		if strings.HasSuffix(req.URL.Path, "/complete") {
			// Only the very first /complete request is rejected.
			if completeCalls.Add(1) == 1 {
				status = http.StatusBadGateway
			}
		} else if strings.HasSuffix(req.URL.Path, "/fail") {
			failCalls.Add(1)
		}
		w.WriteHeader(status)
	}))
	t.Cleanup(srv.Close)

	d := &Daemon{client: NewClient(srv.URL), logger: slog.Default()}
	result := TaskResult{
		Status:  "completed",
		Comment: "ok",
	}
	d.reportTaskResult(context.Background(), "task-retry", result, slog.Default())

	if got := completeCalls.Load(); got != 2 {
		t.Fatalf("expected 2 complete attempts (one 502, one 200), got %d", got)
	}
	if got := failCalls.Load(); got != 0 {
		t.Fatalf("transient 502 must not fall back to /fail (would lose successful result), got %d /fail calls", got)
	}
}
// TestReportTaskResult_TransientCompleteExhaustedDoesNotFallback pins the
// "never downgrade a success on transient errors" rule. When /complete
// answers 502 for every attempt in the schedule, the daemon must not go on to
// call /fail, because that would show a genuine success as a failure in the
// UI. The task stays in running for a later recovery path to pick up.
func TestReportTaskResult_TransientCompleteExhaustedDoesNotFallback(t *testing.T) {
	restore := noSleepRetry(t)
	defer restore()

	// Shrink the production schedule so the test makes only three attempts.
	savedSchedule := defaultTerminalRetrySchedule
	defaultTerminalRetrySchedule = []time.Duration{time.Nanosecond, time.Nanosecond}
	t.Cleanup(func() { defaultTerminalRetrySchedule = savedSchedule })

	var completeCalls, failCalls atomic.Int32
	srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
		status := http.StatusOK
		if strings.HasSuffix(req.URL.Path, "/complete") {
			completeCalls.Add(1)
			status = http.StatusBadGateway
		} else if strings.HasSuffix(req.URL.Path, "/fail") {
			failCalls.Add(1)
		}
		w.WriteHeader(status)
	}))
	t.Cleanup(srv.Close)

	d := &Daemon{client: NewClient(srv.URL), logger: slog.Default()}
	result := TaskResult{
		Status:  "completed",
		Comment: "ok",
	}
	d.reportTaskResult(context.Background(), "task-stuck", result, slog.Default())

	wantAttempts := len(defaultTerminalRetrySchedule) + 1
	if got := completeCalls.Load(); got != int32(wantAttempts) {
		t.Fatalf("expected %d complete attempts, got %d", wantAttempts, got)
	}
	if got := failCalls.Load(); got != 0 {
		t.Fatalf("exhausted transient retries must NOT fall back to /fail; got %d /fail calls", got)
	}
}
// TestReportTaskResult_PermanentCompleteFallsBackToFail covers the permanent
// rejection path. When /complete answers 400, the retry helper stops after
// one attempt and the daemon reports the task through /fail exactly once, so
// the UI shows a concrete failure rather than a task that runs forever.
func TestReportTaskResult_PermanentCompleteFallsBackToFail(t *testing.T) {
	restore := noSleepRetry(t)
	defer restore()

	var completeCalls, failCalls atomic.Int32
	srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
		status := http.StatusOK
		if strings.HasSuffix(req.URL.Path, "/complete") {
			completeCalls.Add(1)
			status = http.StatusBadRequest
		} else if strings.HasSuffix(req.URL.Path, "/fail") {
			failCalls.Add(1)
		}
		w.WriteHeader(status)
	}))
	t.Cleanup(srv.Close)

	d := &Daemon{client: NewClient(srv.URL), logger: slog.Default()}
	result := TaskResult{
		Status:  "completed",
		Comment: "ok",
	}
	d.reportTaskResult(context.Background(), "task-bad", result, slog.Default())

	if got := completeCalls.Load(); got != 1 {
		t.Fatalf("permanent 400 should not retry, got %d complete attempts", got)
	}
	if got := failCalls.Load(); got != 1 {
		t.Fatalf("permanent /complete should fall back to /fail exactly once, got %d", got)
	}
}
// TestHandleTask_ReportsUsageBeforeCancel verifies that ReportTaskUsage is called
// even when the server marks the task as cancelled during the post-run status
// check. Regression test for the ordering bug where the cancel check ran before