diff --git a/server/internal/daemon/daemon.go b/server/internal/daemon/daemon.go index f23e9efd5..caea065b4 100644 --- a/server/internal/daemon/daemon.go +++ b/server/internal/daemon/daemon.go @@ -587,14 +587,14 @@ func (d *Daemon) handleLocalSkillList(ctx context.Context, rt Runtime, requestID skills, supported, err := listRuntimeLocalSkills(rt.Provider) if err != nil { - d.client.ReportLocalSkillListResult(ctx, rt.ID, requestID, map[string]any{ + d.reportLocalSkillListResult(ctx, rt, requestID, map[string]any{ "status": "failed", "error": err.Error(), }) return } - d.client.ReportLocalSkillListResult(ctx, rt.ID, requestID, map[string]any{ + d.reportLocalSkillListResult(ctx, rt, requestID, map[string]any{ "status": "completed", "skills": skills, "supported": supported, @@ -606,26 +606,104 @@ func (d *Daemon) handleLocalSkillImport(ctx context.Context, rt Runtime, pending skill, supported, err := loadRuntimeLocalSkillBundle(rt.Provider, pending.SkillKey) if err != nil { - d.client.ReportLocalSkillImportResult(ctx, rt.ID, pending.ID, map[string]any{ + d.reportLocalSkillImportResult(ctx, rt, pending.ID, map[string]any{ "status": "failed", "error": err.Error(), }) return } if !supported { - d.client.ReportLocalSkillImportResult(ctx, rt.ID, pending.ID, map[string]any{ + d.reportLocalSkillImportResult(ctx, rt, pending.ID, map[string]any{ "status": "failed", "error": fmt.Sprintf("provider %q does not expose runtime local skills", rt.Provider), }) return } - d.client.ReportLocalSkillImportResult(ctx, rt.ID, pending.ID, map[string]any{ + d.reportLocalSkillImportResult(ctx, rt, pending.ID, map[string]any{ "status": "completed", "skill": skill, }) } +// localSkillReportBackoffs defines the retry schedule for delivering a +// local-skill result to the server. First attempt runs immediately, then we +// back off. The sum (≈6.5s) stays well under the server-side running timeout +// (60s) so a report that eventually lands still updates the request instead +// of racing a timeout transition. +// +// Overridable for tests to avoid real sleeps. +var localSkillReportBackoffs = []time.Duration{0, 500 * time.Millisecond, 2 * time.Second, 4 * time.Second} + +// reportLocalSkillListResult delivers a list-report to the server with retry +// on transient failures. See reportLocalSkillResultWithRetry for semantics. +func (d *Daemon) reportLocalSkillListResult(ctx context.Context, rt Runtime, requestID string, payload map[string]any) { + d.reportLocalSkillResultWithRetry(ctx, "list", rt.ID, requestID, func(ctx context.Context) error { + return d.client.ReportLocalSkillListResult(ctx, rt.ID, requestID, payload) + }) +} + +// reportLocalSkillImportResult delivers an import-report to the server with +// retry on transient failures. +func (d *Daemon) reportLocalSkillImportResult(ctx context.Context, rt Runtime, requestID string, payload map[string]any) { + d.reportLocalSkillResultWithRetry(ctx, "import", rt.ID, requestID, func(ctx context.Context) error { + return d.client.ReportLocalSkillImportResult(ctx, rt.ID, requestID, payload) + }) +} + +// reportLocalSkillResultWithRetry retries `fn` on 5xx / network errors and +// stops on success, 4xx, or after exhausting localSkillReportBackoffs. +// +// Why this exists: the server persists the report through a Redis / DB +// write; on a transient store failure it now correctly returns 500 (see +// PR #1557). Without a client-side retry the daemon would fire once, +// swallow the error, and the pending request stays in "running" on the +// server until the 60s timeout — which is exactly the "daemon did not +// respond" failure mode the whole store refactor was meant to fix. 4xx is +// treated as permanent (request-not-found, cross-workspace token rejected, +// bad body) — retrying those just wastes heartbeat cycles. +func (d *Daemon) reportLocalSkillResultWithRetry(ctx context.Context, kind, runtimeID, requestID string, fn func(context.Context) error) { + var lastErr error + for attempt, wait := range localSkillReportBackoffs { + if wait > 0 { + select { + case <-ctx.Done(): + d.logger.Error("local skill report cancelled", + "kind", kind, "runtime_id", runtimeID, "request_id", requestID, + "attempt", attempt, "error", ctx.Err()) + return + case <-time.After(wait): + } + } + err := fn(ctx) + if err == nil { + if attempt > 0 { + d.logger.Info("local skill report succeeded after retry", + "kind", kind, "runtime_id", runtimeID, "request_id", requestID, + "attempt", attempt+1) + } + return + } + lastErr = err + + // 4xx is permanent (request expired, workspace mismatch, malformed + // body). No amount of retrying will make it succeed. + var reqErr *requestError + if errors.As(err, &reqErr) && reqErr.StatusCode >= 400 && reqErr.StatusCode < 500 { + d.logger.Error("local skill report rejected — not retrying", + "kind", kind, "runtime_id", runtimeID, "request_id", requestID, + "status", reqErr.StatusCode, "error", err) + return + } + + d.logger.Warn("local skill report failed — will retry", + "kind", kind, "runtime_id", runtimeID, "request_id", requestID, + "attempt", attempt+1, "error", err) + } + d.logger.Error("local skill report exhausted retries", + "kind", kind, "runtime_id", runtimeID, "request_id", requestID, "error", lastErr) +} + // handleUpdate performs the CLI update when triggered by the server via heartbeat. func (d *Daemon) handleUpdate(ctx context.Context, runtimeID string, update *PendingUpdate) { // Desktop-managed daemons share their CLI binary with the Electron app, diff --git a/server/internal/daemon/local_skill_report_test.go b/server/internal/daemon/local_skill_report_test.go new file mode 100644 index 000000000..8ffbafcfd --- /dev/null +++ b/server/internal/daemon/local_skill_report_test.go @@ -0,0 +1,171 @@ +package daemon + +import ( + "context" + "log/slog" + "net/http" + "net/http/httptest" + "strings" + "sync/atomic" + "testing" + "time" +) + +// withFastLocalSkillReportBackoffs swaps in zero-delay retries for the +// duration of a test so the suite doesn't pay real sleep latency. Restores +// the production schedule on cleanup. +func withFastLocalSkillReportBackoffs(t *testing.T) { + t.Helper() + prev := localSkillReportBackoffs + localSkillReportBackoffs = []time.Duration{0, 0, 0, 0} + t.Cleanup(func() { localSkillReportBackoffs = prev }) +} + +// localSkillReportDaemon wires a Daemon instance around an httptest.Server +// that records every inbound request and lets the test script status codes +// to return. That lets us exercise the retry path end-to-end against the +// real daemon.Client code, not a mock. +func localSkillReportDaemon(t *testing.T, handler http.HandlerFunc) (*Daemon, *int32) { + t.Helper() + var calls int32 + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + atomic.AddInt32(&calls, 1) + handler(w, r) + })) + t.Cleanup(srv.Close) + return &Daemon{ + client: NewClient(srv.URL), + logger: slog.Default(), + }, &calls +} + +func TestReportLocalSkillListResult_RetriesOn500AndEventuallySucceeds(t *testing.T) { + withFastLocalSkillReportBackoffs(t) + + var hits int32 + d, calls := localSkillReportDaemon(t, func(w http.ResponseWriter, _ *http.Request) { + // Fail twice with 500, then succeed. Matches the concrete failure + // mode the review is pinning: the server returns 500 while the + // store write is being retried on its end, and the daemon must + // hold on long enough to see it land. + n := atomic.AddInt32(&hits, 1) + if n <= 2 { + http.Error(w, "{}", http.StatusInternalServerError) + return + } + w.WriteHeader(http.StatusOK) + w.Write([]byte(`{"status":"ok"}`)) + }) + + d.reportLocalSkillListResult(context.Background(), Runtime{ID: "rt-1"}, "req-1", map[string]any{"status": "completed"}) + + if got := atomic.LoadInt32(calls); got != 3 { + t.Fatalf("expected 3 attempts (2 failures + 1 success), got %d", got) + } +} + +func TestReportLocalSkillListResult_DoesNotRetryOn4xx(t *testing.T) { + withFastLocalSkillReportBackoffs(t) + + d, calls := localSkillReportDaemon(t, func(w http.ResponseWriter, _ *http.Request) { + // 404 is permanent — the request expired, was cross-workspace, or + // the server never saw it. Retrying just wastes heartbeat cycles. + http.Error(w, `{"error":"request not found"}`, http.StatusNotFound) + }) + + d.reportLocalSkillListResult(context.Background(), Runtime{ID: "rt-1"}, "req-1", map[string]any{"status": "completed"}) + + if got := atomic.LoadInt32(calls); got != 1 { + t.Fatalf("expected exactly 1 attempt (4xx is terminal), got %d", got) + } +} + +func TestReportLocalSkillImportResult_RetriesOn500AndEventuallySucceeds(t *testing.T) { + withFastLocalSkillReportBackoffs(t) + + var hits int32 + d, calls := localSkillReportDaemon(t, func(w http.ResponseWriter, _ *http.Request) { + n := atomic.AddInt32(&hits, 1) + if n == 1 { + http.Error(w, "{}", http.StatusBadGateway) + return + } + w.WriteHeader(http.StatusOK) + w.Write([]byte(`{"status":"ok"}`)) + }) + + d.reportLocalSkillImportResult(context.Background(), Runtime{ID: "rt-1"}, "req-1", map[string]any{"status": "completed"}) + + if got := atomic.LoadInt32(calls); got != 2 { + t.Fatalf("expected 2 attempts, got %d", got) + } +} + +func TestReportLocalSkillResult_GivesUpAfterAllAttemptsFail(t *testing.T) { + withFastLocalSkillReportBackoffs(t) + + d, calls := localSkillReportDaemon(t, func(w http.ResponseWriter, _ *http.Request) { + http.Error(w, "{}", http.StatusInternalServerError) + }) + + d.reportLocalSkillListResult(context.Background(), Runtime{ID: "rt-1"}, "req-1", map[string]any{"status": "completed"}) + + // Each element in localSkillReportBackoffs is one attempt — a persistent + // outage should burn through every slot and then stop (logging Error). + if got := atomic.LoadInt32(calls); int(got) != len(localSkillReportBackoffs) { + t.Fatalf("expected %d attempts, got %d", len(localSkillReportBackoffs), got) + } +} + +func TestReportLocalSkillResult_AbortsOnContextCancel(t *testing.T) { + // Keep one real delay in the schedule so cancel lands mid-backoff. + prev := localSkillReportBackoffs + localSkillReportBackoffs = []time.Duration{0, 200 * time.Millisecond} + t.Cleanup(func() { localSkillReportBackoffs = prev }) + + d, calls := localSkillReportDaemon(t, func(w http.ResponseWriter, _ *http.Request) { + http.Error(w, "{}", http.StatusInternalServerError) + }) + + ctx, cancel := context.WithCancel(context.Background()) + go func() { + time.Sleep(30 * time.Millisecond) + cancel() + }() + d.reportLocalSkillListResult(ctx, Runtime{ID: "rt-1"}, "req-1", map[string]any{"status": "completed"}) + + // Exactly the first attempt should have hit the server; the cancel + // interrupts the sleep before the second attempt fires. + if got := atomic.LoadInt32(calls); got != 1 { + t.Fatalf("expected exactly 1 attempt before cancel, got %d", got) + } +} + +func TestReportLocalSkillResult_SendsCorrectPath(t *testing.T) { + withFastLocalSkillReportBackoffs(t) + + var listPath, importPath string + d, _ := localSkillReportDaemon(t, func(w http.ResponseWriter, r *http.Request) { + // Smoke: make sure we're hitting the right daemon-side endpoint. + // Protects against a future refactor silently pointing reports at + // the wrong URL. + if strings.Contains(r.URL.Path, "/import/") { + importPath = r.URL.Path + } else { + listPath = r.URL.Path + } + w.WriteHeader(http.StatusOK) + w.Write([]byte(`{"status":"ok"}`)) + }) + + ctx := context.Background() + d.reportLocalSkillListResult(ctx, Runtime{ID: "rt-a"}, "req-list", map[string]any{"status": "completed"}) + d.reportLocalSkillImportResult(ctx, Runtime{ID: "rt-a"}, "req-import", map[string]any{"status": "completed"}) + + if !strings.HasSuffix(listPath, "/api/daemon/runtimes/rt-a/local-skills/req-list/result") { + t.Fatalf("list path = %q", listPath) + } + if !strings.HasSuffix(importPath, "/api/daemon/runtimes/rt-a/local-skills/import/req-import/result") { + t.Fatalf("import path = %q", importPath) + } +}