mirror of
https://github.com/multica-ai/multica.git
synced 2026-06-27 17:47:43 +02:00
Compare commits
1 Commits
agent/lamb
...
agent/j/8b
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
55aa4180cf |
@@ -217,7 +217,7 @@ func (c *Client) CompleteTask(ctx context.Context, taskID, output, branchName, s
|
||||
if workDir != "" {
|
||||
body["work_dir"] = workDir
|
||||
}
|
||||
return c.postJSON(ctx, fmt.Sprintf("/api/daemon/tasks/%s/complete", taskID), body, nil)
|
||||
return c.postJSONWithRetry(ctx, fmt.Sprintf("/api/daemon/tasks/%s/complete", taskID), body, nil, defaultTerminalRetrySchedule)
|
||||
}
|
||||
|
||||
func (c *Client) ReportTaskUsage(ctx context.Context, taskID string, usage []TaskUsageEntry) error {
|
||||
@@ -240,7 +240,7 @@ func (c *Client) FailTask(ctx context.Context, taskID, errMsg, sessionID, workDi
|
||||
if failureReason != "" {
|
||||
body["failure_reason"] = failureReason
|
||||
}
|
||||
return c.postJSON(ctx, fmt.Sprintf("/api/daemon/tasks/%s/fail", taskID), body, nil)
|
||||
return c.postJSONWithRetry(ctx, fmt.Sprintf("/api/daemon/tasks/%s/fail", taskID), body, nil, defaultTerminalRetrySchedule)
|
||||
}
|
||||
|
||||
// PinTaskSession persists the agent's session_id and work_dir on the task
|
||||
@@ -461,6 +461,103 @@ func (c *Client) GetWorkspaceRepos(ctx context.Context, workspaceID string) (*Wo
|
||||
return &resp, nil
|
||||
}
|
||||
|
||||
// defaultTerminalRetrySchedule lists the waits postJSONWithRetry inserts
// between attempts of a terminal task callback (CompleteTask / FailTask).
// A schedule of N entries allows up to N+1 attempts: the immediate first
// try plus one retry after each wait. The five waits below add up to 124s,
// which covers the short upstream blips observed in MUL-2780 while still
// giving up eventually if the outage lasts longer than the window.
var defaultTerminalRetrySchedule = []time.Duration{
	4 * time.Second,  // before retry 1 (4s elapsed)
	8 * time.Second,  // before retry 2 (12s elapsed)
	16 * time.Second, // before retry 3 (28s elapsed)
	32 * time.Second, // before retry 4 (60s elapsed)
	64 * time.Second, // before retry 5 (124s elapsed)
}
|
||||
|
||||
// retrySleep blocks for d between retry attempts, or returns early with the
// context's error if ctx is done first. It is a package-level variable
// rather than a plain function so tests can substitute an instant sleep
// without having to rewrite the caller's schedule.
var retrySleep = func(ctx context.Context, d time.Duration) error {
	wait := time.NewTimer(d)
	defer wait.Stop()

	select {
	case <-ctx.Done():
		return ctx.Err()
	case <-wait.C:
		return nil
	}
}
|
||||
|
||||
// isTransientError reports whether err looks like a hiccup that's likely to
|
||||
// resolve on retry: connection / TLS / I/O errors at the transport layer
|
||||
// (including client timeouts surfacing as context.DeadlineExceeded inside
|
||||
// http.Client.Do), 5xx server responses, and 408/429 rate-limit-style 4xx
|
||||
// codes. Other 4xx codes are treated as permanent — retrying a 400 (bad
|
||||
// body) or 404 (task not found) only burns time.
|
||||
//
|
||||
// The caller is responsible for separately bailing on parent-context
|
||||
// cancellation; this predicate cannot distinguish "the daemon is shutting
|
||||
// down" from "the HTTP client timed out a single attempt" because both
|
||||
// reach here as context errors wrapped by net/http.
|
||||
func isTransientError(err error) bool {
|
||||
if err == nil {
|
||||
return false
|
||||
}
|
||||
var reqErr *requestError
|
||||
if errors.As(err, &reqErr) {
|
||||
if reqErr.StatusCode >= 500 {
|
||||
return true
|
||||
}
|
||||
if reqErr.StatusCode == http.StatusRequestTimeout || reqErr.StatusCode == http.StatusTooManyRequests {
|
||||
return true
|
||||
}
|
||||
return false
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
||||
// postJSONWithRetry posts a JSON body with bounded exponential backoff,
|
||||
// intended for "must reach the server" terminal callbacks (CompleteTask /
|
||||
// FailTask). It retries transient errors per isTransientError and stops
|
||||
// immediately on permanent 4xx responses so we don't burn the schedule on
|
||||
// requests the server has already rejected.
|
||||
//
|
||||
// schedule controls the sleeps between attempts. With N entries the helper
|
||||
// performs N+1 attempts in the worst case (one initial + N retries). The
|
||||
// returned error is the last response from the server, so callers can still
|
||||
// inspect it with isTransientError to decide whether to fall back to a
|
||||
// different terminal call (e.g. complete → fail on permanent error only).
|
||||
//
|
||||
// The server-side CompleteTask / FailTask treat "already terminal" as an
|
||||
// idempotent success (see service/task.go), so a duplicate replay from a
|
||||
// retry is safe even if the server's prior response was lost in transit.
|
||||
func (c *Client) postJSONWithRetry(ctx context.Context, path string, reqBody any, respBody any, schedule []time.Duration) error {
|
||||
var lastErr error
|
||||
for attempt := 0; ; attempt++ {
|
||||
if err := ctx.Err(); err != nil {
|
||||
if lastErr != nil {
|
||||
return lastErr
|
||||
}
|
||||
return err
|
||||
}
|
||||
err := c.postJSON(ctx, path, reqBody, respBody)
|
||||
if err == nil {
|
||||
return nil
|
||||
}
|
||||
lastErr = err
|
||||
if !isTransientError(err) {
|
||||
return err
|
||||
}
|
||||
if attempt >= len(schedule) {
|
||||
return err
|
||||
}
|
||||
if sleepErr := retrySleep(ctx, schedule[attempt]); sleepErr != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (c *Client) postJSON(ctx context.Context, path string, reqBody any, respBody any) error {
|
||||
var body io.Reader
|
||||
if reqBody != nil {
|
||||
|
||||
@@ -3,10 +3,13 @@ package daemon
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"net/http"
|
||||
"net/http/httptest"
|
||||
"runtime"
|
||||
"sync/atomic"
|
||||
"testing"
|
||||
"time"
|
||||
)
|
||||
|
||||
func TestClient_IdentityHeaders_PostJSON(t *testing.T) {
|
||||
@@ -82,6 +85,161 @@ func TestClient_VersionOmittedWhenUnset(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
// noSleepRetry replaces retrySleep with an immediate no-op so tests don't
|
||||
// actually wait the 4s/8s/16s/... backoffs. Returns a restore func.
|
||||
func noSleepRetry(t *testing.T) func() {
|
||||
t.Helper()
|
||||
prev := retrySleep
|
||||
retrySleep = func(ctx context.Context, _ time.Duration) error {
|
||||
if err := ctx.Err(); err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
return func() { retrySleep = prev }
|
||||
}
|
||||
|
||||
func TestIsTransientError(t *testing.T) {
|
||||
cases := []struct {
|
||||
name string
|
||||
err error
|
||||
want bool
|
||||
}{
|
||||
{"nil is not transient", nil, false},
|
||||
{"5xx is transient", &requestError{StatusCode: http.StatusBadGateway}, true},
|
||||
{"503 is transient", &requestError{StatusCode: http.StatusServiceUnavailable}, true},
|
||||
{"408 is transient", &requestError{StatusCode: http.StatusRequestTimeout}, true},
|
||||
{"429 is transient", &requestError{StatusCode: http.StatusTooManyRequests}, true},
|
||||
{"400 is permanent", &requestError{StatusCode: http.StatusBadRequest}, false},
|
||||
{"401 is permanent", &requestError{StatusCode: http.StatusUnauthorized}, false},
|
||||
{"404 is permanent", &requestError{StatusCode: http.StatusNotFound}, false},
|
||||
{"transport-level error is transient", errors.New("connection reset by peer"), true},
|
||||
}
|
||||
for _, tc := range cases {
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
if got := isTransientError(tc.err); got != tc.want {
|
||||
t.Fatalf("isTransientError(%v) = %v, want %v", tc.err, got, tc.want)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestPostJSONWithRetry_TransientThenSuccess(t *testing.T) {
|
||||
defer noSleepRetry(t)()
|
||||
|
||||
var calls atomic.Int32
|
||||
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {
|
||||
n := calls.Add(1)
|
||||
if n < 3 {
|
||||
w.WriteHeader(http.StatusBadGateway)
|
||||
return
|
||||
}
|
||||
w.WriteHeader(http.StatusOK)
|
||||
}))
|
||||
defer srv.Close()
|
||||
|
||||
c := NewClient(srv.URL)
|
||||
schedule := []time.Duration{time.Nanosecond, time.Nanosecond, time.Nanosecond}
|
||||
if err := c.postJSONWithRetry(context.Background(), "/x", map[string]any{}, nil, schedule); err != nil {
|
||||
t.Fatalf("postJSONWithRetry: %v", err)
|
||||
}
|
||||
if got := calls.Load(); got != 3 {
|
||||
t.Fatalf("expected 3 attempts (2 transient + 1 success), got %d", got)
|
||||
}
|
||||
}
|
||||
|
||||
func TestPostJSONWithRetry_TransientExhausts(t *testing.T) {
|
||||
defer noSleepRetry(t)()
|
||||
|
||||
var calls atomic.Int32
|
||||
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {
|
||||
calls.Add(1)
|
||||
w.WriteHeader(http.StatusBadGateway)
|
||||
}))
|
||||
defer srv.Close()
|
||||
|
||||
c := NewClient(srv.URL)
|
||||
schedule := []time.Duration{time.Nanosecond, time.Nanosecond}
|
||||
err := c.postJSONWithRetry(context.Background(), "/x", map[string]any{}, nil, schedule)
|
||||
if err == nil {
|
||||
t.Fatal("expected error after schedule exhausted, got nil")
|
||||
}
|
||||
if !isTransientError(err) {
|
||||
t.Fatalf("expected transient error, got %v", err)
|
||||
}
|
||||
if got := calls.Load(); got != int32(len(schedule)+1) {
|
||||
t.Fatalf("expected %d attempts (initial + %d retries), got %d", len(schedule)+1, len(schedule), got)
|
||||
}
|
||||
}
|
||||
|
||||
func TestPostJSONWithRetry_PermanentBailsImmediately(t *testing.T) {
|
||||
defer noSleepRetry(t)()
|
||||
|
||||
var calls atomic.Int32
|
||||
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {
|
||||
calls.Add(1)
|
||||
w.WriteHeader(http.StatusBadRequest)
|
||||
}))
|
||||
defer srv.Close()
|
||||
|
||||
c := NewClient(srv.URL)
|
||||
schedule := []time.Duration{time.Nanosecond, time.Nanosecond, time.Nanosecond}
|
||||
err := c.postJSONWithRetry(context.Background(), "/x", map[string]any{}, nil, schedule)
|
||||
if err == nil {
|
||||
t.Fatal("expected error, got nil")
|
||||
}
|
||||
if got := calls.Load(); got != 1 {
|
||||
t.Fatalf("expected exactly 1 attempt on permanent error, got %d", got)
|
||||
}
|
||||
}
|
||||
|
||||
func TestPostJSONWithRetry_CtxCancelStopsRetries(t *testing.T) {
|
||||
// Use the real sleeper here so we can observe a cancel preempting it.
|
||||
var calls atomic.Int32
|
||||
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {
|
||||
calls.Add(1)
|
||||
w.WriteHeader(http.StatusBadGateway)
|
||||
}))
|
||||
defer srv.Close()
|
||||
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
go func() {
|
||||
// Cancel quickly so the first sleep is aborted long before its 1s.
|
||||
time.Sleep(50 * time.Millisecond)
|
||||
cancel()
|
||||
}()
|
||||
|
||||
c := NewClient(srv.URL)
|
||||
schedule := []time.Duration{time.Second, time.Second, time.Second}
|
||||
start := time.Now()
|
||||
err := c.postJSONWithRetry(ctx, "/x", map[string]any{}, nil, schedule)
|
||||
elapsed := time.Since(start)
|
||||
if err == nil {
|
||||
t.Fatal("expected error after ctx cancel, got nil")
|
||||
}
|
||||
if elapsed > 750*time.Millisecond {
|
||||
t.Fatalf("expected ctx cancel to short-circuit retry, took %s", elapsed)
|
||||
}
|
||||
if got := calls.Load(); got != 1 {
|
||||
t.Fatalf("expected exactly 1 attempt before cancel, got %d", got)
|
||||
}
|
||||
}
|
||||
|
||||
func TestDefaultTerminalRetrySchedule_MatchesAgreedPlan(t *testing.T) {
|
||||
// MUL-2780 settled on a 5-step exponential backoff (4s, 8s, 16s, 32s, 64s).
|
||||
// Pin it so a future "tidy this up" refactor can't silently flatten or
|
||||
// shorten the recovery window without explicit discussion.
|
||||
want := []time.Duration{4 * time.Second, 8 * time.Second, 16 * time.Second, 32 * time.Second, 64 * time.Second}
|
||||
if len(defaultTerminalRetrySchedule) != len(want) {
|
||||
t.Fatalf("schedule length: got %d, want %d", len(defaultTerminalRetrySchedule), len(want))
|
||||
}
|
||||
for i, d := range want {
|
||||
if defaultTerminalRetrySchedule[i] != d {
|
||||
t.Errorf("schedule[%d]: got %s, want %s", i, defaultTerminalRetrySchedule[i], d)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestNormalizeGOOS(t *testing.T) {
|
||||
cases := map[string]string{
|
||||
"darwin": "macos",
|
||||
|
||||
@@ -2367,11 +2367,28 @@ func (d *Daemon) reportTaskResult(ctx context.Context, taskID string, result Tas
|
||||
switch result.Status {
|
||||
case "completed":
|
||||
taskLog.Info("task completed", "status", result.Status)
|
||||
if err := d.client.CompleteTask(ctx, taskID, result.Comment, result.BranchName, result.SessionID, result.WorkDir); err != nil {
|
||||
taskLog.Error("complete task failed, falling back to fail", "error", err)
|
||||
if failErr := d.client.FailTask(ctx, taskID, fmt.Sprintf("complete task failed: %s", err.Error()), result.SessionID, result.WorkDir, "agent_error"); failErr != nil {
|
||||
taskLog.Error("fail task fallback also failed", "error", failErr)
|
||||
}
|
||||
err := d.client.CompleteTask(ctx, taskID, result.Comment, result.BranchName, result.SessionID, result.WorkDir)
|
||||
if err == nil {
|
||||
return
|
||||
}
|
||||
// CompleteTask retries transient errors internally. A transient
|
||||
// error reaching us here means the schedule was exhausted while
|
||||
// the upstream was still 5xx / unreachable. Converting that into
|
||||
// a fail would lose the agent's actual result and surface a
|
||||
// misleading red badge in the UI — leave the task in running
|
||||
// instead so a future fix (server-side stuck-task reaper, or a
|
||||
// daemon-side persistent pending queue) can recover it. Only
|
||||
// permanent server-side rejections (4xx other than 408/429)
|
||||
// warrant the legacy fallback, because at that point the server
|
||||
// has already refused this task and the only useful UI signal
|
||||
// left is a concrete failure.
|
||||
if isTransientError(err) {
|
||||
taskLog.Error("complete task failed after retries; leaving task in running rather than falling back to fail", "error", err)
|
||||
return
|
||||
}
|
||||
taskLog.Error("complete task rejected by server, falling back to fail", "error", err)
|
||||
if failErr := d.client.FailTask(ctx, taskID, fmt.Sprintf("complete task failed: %s", err.Error()), result.SessionID, result.WorkDir, "agent_error"); failErr != nil {
|
||||
taskLog.Error("fail task fallback also failed", "error", failErr)
|
||||
}
|
||||
default:
|
||||
failureReason := result.FailureReason
|
||||
|
||||
@@ -1781,6 +1781,123 @@ func TestReportTaskResult_NonCompletedHitsFailEndpoint(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
// Regression test for the MUL-2780 incident: a short 502 burst on the
|
||||
// /complete callback used to (a) drop the task at the first failure and
|
||||
// (b) wrongly fall back to /fail, surfacing a successful run as red.
|
||||
// With the retry helper in place, a transient 502 followed by a 200 must
|
||||
// resolve via /complete without ever touching /fail.
|
||||
func TestReportTaskResult_RetriesTransientCompleteThenSucceeds(t *testing.T) {
|
||||
defer noSleepRetry(t)()
|
||||
|
||||
var completeCalls, failCalls atomic.Int32
|
||||
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
|
||||
switch {
|
||||
case strings.HasSuffix(req.URL.Path, "/complete"):
|
||||
n := completeCalls.Add(1)
|
||||
if n == 1 {
|
||||
w.WriteHeader(http.StatusBadGateway)
|
||||
return
|
||||
}
|
||||
w.WriteHeader(http.StatusOK)
|
||||
case strings.HasSuffix(req.URL.Path, "/fail"):
|
||||
failCalls.Add(1)
|
||||
w.WriteHeader(http.StatusOK)
|
||||
default:
|
||||
w.WriteHeader(http.StatusOK)
|
||||
}
|
||||
}))
|
||||
t.Cleanup(srv.Close)
|
||||
|
||||
d := &Daemon{client: NewClient(srv.URL), logger: slog.Default()}
|
||||
d.reportTaskResult(context.Background(), "task-retry", TaskResult{
|
||||
Status: "completed",
|
||||
Comment: "ok",
|
||||
}, slog.Default())
|
||||
|
||||
if got := completeCalls.Load(); got != 2 {
|
||||
t.Fatalf("expected 2 complete attempts (one 502, one 200), got %d", got)
|
||||
}
|
||||
if got := failCalls.Load(); got != 0 {
|
||||
t.Fatalf("transient 502 must not fall back to /fail (would lose successful result), got %d /fail calls", got)
|
||||
}
|
||||
}
|
||||
|
||||
// Pins the new "don't downgrade success to failure on transient errors"
|
||||
// rule: when /complete is 502 across the entire retry schedule, we must
|
||||
// NOT fall through to /fail — that would surface a real success as a
|
||||
// failure in the UI. The task is left in running for a future recovery
|
||||
// path to pick up.
|
||||
func TestReportTaskResult_TransientCompleteExhaustedDoesNotFallback(t *testing.T) {
|
||||
defer noSleepRetry(t)()
|
||||
|
||||
prevSchedule := defaultTerminalRetrySchedule
|
||||
defaultTerminalRetrySchedule = []time.Duration{time.Nanosecond, time.Nanosecond}
|
||||
t.Cleanup(func() { defaultTerminalRetrySchedule = prevSchedule })
|
||||
|
||||
var completeCalls, failCalls atomic.Int32
|
||||
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
|
||||
switch {
|
||||
case strings.HasSuffix(req.URL.Path, "/complete"):
|
||||
completeCalls.Add(1)
|
||||
w.WriteHeader(http.StatusBadGateway)
|
||||
case strings.HasSuffix(req.URL.Path, "/fail"):
|
||||
failCalls.Add(1)
|
||||
w.WriteHeader(http.StatusOK)
|
||||
default:
|
||||
w.WriteHeader(http.StatusOK)
|
||||
}
|
||||
}))
|
||||
t.Cleanup(srv.Close)
|
||||
|
||||
d := &Daemon{client: NewClient(srv.URL), logger: slog.Default()}
|
||||
d.reportTaskResult(context.Background(), "task-stuck", TaskResult{
|
||||
Status: "completed",
|
||||
Comment: "ok",
|
||||
}, slog.Default())
|
||||
|
||||
if got := completeCalls.Load(); got != int32(len(defaultTerminalRetrySchedule)+1) {
|
||||
t.Fatalf("expected %d complete attempts, got %d", len(defaultTerminalRetrySchedule)+1, got)
|
||||
}
|
||||
if got := failCalls.Load(); got != 0 {
|
||||
t.Fatalf("exhausted transient retries must NOT fall back to /fail; got %d /fail calls", got)
|
||||
}
|
||||
}
|
||||
|
||||
// On permanent 4xx from /complete (e.g. 400 bad body, 404 task not found)
|
||||
// the helper bails immediately and the daemon falls back to /fail so the
|
||||
// UI shows a concrete failure rather than a perpetually-running task.
|
||||
func TestReportTaskResult_PermanentCompleteFallsBackToFail(t *testing.T) {
|
||||
defer noSleepRetry(t)()
|
||||
|
||||
var completeCalls, failCalls atomic.Int32
|
||||
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
|
||||
switch {
|
||||
case strings.HasSuffix(req.URL.Path, "/complete"):
|
||||
completeCalls.Add(1)
|
||||
w.WriteHeader(http.StatusBadRequest)
|
||||
case strings.HasSuffix(req.URL.Path, "/fail"):
|
||||
failCalls.Add(1)
|
||||
w.WriteHeader(http.StatusOK)
|
||||
default:
|
||||
w.WriteHeader(http.StatusOK)
|
||||
}
|
||||
}))
|
||||
t.Cleanup(srv.Close)
|
||||
|
||||
d := &Daemon{client: NewClient(srv.URL), logger: slog.Default()}
|
||||
d.reportTaskResult(context.Background(), "task-bad", TaskResult{
|
||||
Status: "completed",
|
||||
Comment: "ok",
|
||||
}, slog.Default())
|
||||
|
||||
if got := completeCalls.Load(); got != 1 {
|
||||
t.Fatalf("permanent 400 should not retry, got %d complete attempts", got)
|
||||
}
|
||||
if got := failCalls.Load(); got != 1 {
|
||||
t.Fatalf("permanent /complete should fall back to /fail exactly once, got %d", got)
|
||||
}
|
||||
}
|
||||
|
||||
// TestHandleTask_ReportsUsageBeforeCancel verifies that ReportTaskUsage is called
|
||||
// even when the server marks the task as cancelled during the post-run status
|
||||
// check. Regression test for the ordering bug where the cancel check ran before
|
||||
|
||||
Reference in New Issue
Block a user