mirror of
https://github.com/multica-ai/multica.git
synced 2026-06-17 11:48:42 +02:00
fix(daemon): retry local-skill reports on transient server errors (#1561)
Review follow-up on PR #1557: the server-side change started returning 500 when the store write failed, but the daemon's handleLocalSkillList / handleLocalSkillImport were discarding the ReportLocalSkill*Result error return. Net effect was a silent drop — the daemon moved on, the request stayed in "running" on the server, and the user saw the same "daemon did not respond within 30 seconds" timeout the store refactor was supposed to kill. Fix: route both report calls through reportLocalSkillResultWithRetry, which retries on 5xx + network errors with 0 / 0.5s / 2s / 4s backoff (total ~6.5s, well inside the 60s server-side running timeout), stops on 4xx (request expired / cross-workspace rejection — retry won't help), bails on context cancel, and logs Error on exhaustion so ops has a footprint to grep for. Tests (server/internal/daemon/local_skill_report_test.go, 6 new cases): - 500 twice then success -> 3 attempts, second retry lands - 404 -> exactly 1 attempt (permanent, no retry) - import 502 then success -> 2 attempts - All-500 -> burns through all backoff slots then gives up with ERROR log - Context cancel mid-backoff -> exactly 1 attempt, cancellation logged - Smoke: report paths hit /api/daemon/runtimes/<rt>/local-skills{,import}/<req>/result localSkillReportBackoffs is var-assignable so tests can swap in zero-delay schedules without paying real sleep latency.
This commit is contained in:
@@ -587,14 +587,14 @@ func (d *Daemon) handleLocalSkillList(ctx context.Context, rt Runtime, requestID
|
||||
|
||||
skills, supported, err := listRuntimeLocalSkills(rt.Provider)
|
||||
if err != nil {
|
||||
d.client.ReportLocalSkillListResult(ctx, rt.ID, requestID, map[string]any{
|
||||
d.reportLocalSkillListResult(ctx, rt, requestID, map[string]any{
|
||||
"status": "failed",
|
||||
"error": err.Error(),
|
||||
})
|
||||
return
|
||||
}
|
||||
|
||||
d.client.ReportLocalSkillListResult(ctx, rt.ID, requestID, map[string]any{
|
||||
d.reportLocalSkillListResult(ctx, rt, requestID, map[string]any{
|
||||
"status": "completed",
|
||||
"skills": skills,
|
||||
"supported": supported,
|
||||
@@ -606,26 +606,104 @@ func (d *Daemon) handleLocalSkillImport(ctx context.Context, rt Runtime, pending
|
||||
|
||||
skill, supported, err := loadRuntimeLocalSkillBundle(rt.Provider, pending.SkillKey)
|
||||
if err != nil {
|
||||
d.client.ReportLocalSkillImportResult(ctx, rt.ID, pending.ID, map[string]any{
|
||||
d.reportLocalSkillImportResult(ctx, rt, pending.ID, map[string]any{
|
||||
"status": "failed",
|
||||
"error": err.Error(),
|
||||
})
|
||||
return
|
||||
}
|
||||
if !supported {
|
||||
d.client.ReportLocalSkillImportResult(ctx, rt.ID, pending.ID, map[string]any{
|
||||
d.reportLocalSkillImportResult(ctx, rt, pending.ID, map[string]any{
|
||||
"status": "failed",
|
||||
"error": fmt.Sprintf("provider %q does not expose runtime local skills", rt.Provider),
|
||||
})
|
||||
return
|
||||
}
|
||||
|
||||
d.client.ReportLocalSkillImportResult(ctx, rt.ID, pending.ID, map[string]any{
|
||||
d.reportLocalSkillImportResult(ctx, rt, pending.ID, map[string]any{
|
||||
"status": "completed",
|
||||
"skill": skill,
|
||||
})
|
||||
}
|
||||
|
||||
// localSkillReportBackoffs defines the retry schedule for delivering a
|
||||
// local-skill result to the server. First attempt runs immediately, then we
|
||||
// back off. The sum (≈6.5s) stays well under the server-side running timeout
|
||||
// (60s) so a report that eventually lands still updates the request instead
|
||||
// of racing a timeout transition.
|
||||
//
|
||||
// Overridable for tests to avoid real sleeps.
|
||||
var localSkillReportBackoffs = []time.Duration{0, 500 * time.Millisecond, 2 * time.Second, 4 * time.Second}
|
||||
|
||||
// reportLocalSkillListResult delivers a list-report to the server with retry
|
||||
// on transient failures. See reportLocalSkillResultWithRetry for semantics.
|
||||
func (d *Daemon) reportLocalSkillListResult(ctx context.Context, rt Runtime, requestID string, payload map[string]any) {
|
||||
d.reportLocalSkillResultWithRetry(ctx, "list", rt.ID, requestID, func(ctx context.Context) error {
|
||||
return d.client.ReportLocalSkillListResult(ctx, rt.ID, requestID, payload)
|
||||
})
|
||||
}
|
||||
|
||||
// reportLocalSkillImportResult delivers an import-report to the server with
|
||||
// retry on transient failures.
|
||||
func (d *Daemon) reportLocalSkillImportResult(ctx context.Context, rt Runtime, requestID string, payload map[string]any) {
|
||||
d.reportLocalSkillResultWithRetry(ctx, "import", rt.ID, requestID, func(ctx context.Context) error {
|
||||
return d.client.ReportLocalSkillImportResult(ctx, rt.ID, requestID, payload)
|
||||
})
|
||||
}
|
||||
|
||||
// reportLocalSkillResultWithRetry retries `fn` on 5xx / network errors and
|
||||
// stops on success, 4xx, or after exhausting localSkillReportBackoffs.
|
||||
//
|
||||
// Why this exists: the server persists the report through a Redis / DB
|
||||
// write; on a transient store failure it now correctly returns 500 (see
|
||||
// PR #1557). Without a client-side retry the daemon would fire once,
|
||||
// swallow the error, and the pending request stays in "running" on the
|
||||
// server until the 60s timeout — which is exactly the "daemon did not
|
||||
// respond" failure mode the whole store refactor was meant to fix. 4xx is
|
||||
// treated as permanent (request-not-found, cross-workspace token rejected,
|
||||
// bad body) — retrying those just wastes heartbeat cycles.
|
||||
func (d *Daemon) reportLocalSkillResultWithRetry(ctx context.Context, kind, runtimeID, requestID string, fn func(context.Context) error) {
|
||||
var lastErr error
|
||||
for attempt, wait := range localSkillReportBackoffs {
|
||||
if wait > 0 {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
d.logger.Error("local skill report cancelled",
|
||||
"kind", kind, "runtime_id", runtimeID, "request_id", requestID,
|
||||
"attempt", attempt, "error", ctx.Err())
|
||||
return
|
||||
case <-time.After(wait):
|
||||
}
|
||||
}
|
||||
err := fn(ctx)
|
||||
if err == nil {
|
||||
if attempt > 0 {
|
||||
d.logger.Info("local skill report succeeded after retry",
|
||||
"kind", kind, "runtime_id", runtimeID, "request_id", requestID,
|
||||
"attempt", attempt+1)
|
||||
}
|
||||
return
|
||||
}
|
||||
lastErr = err
|
||||
|
||||
// 4xx is permanent (request expired, workspace mismatch, malformed
|
||||
// body). No amount of retrying will make it succeed.
|
||||
var reqErr *requestError
|
||||
if errors.As(err, &reqErr) && reqErr.StatusCode >= 400 && reqErr.StatusCode < 500 {
|
||||
d.logger.Error("local skill report rejected — not retrying",
|
||||
"kind", kind, "runtime_id", runtimeID, "request_id", requestID,
|
||||
"status", reqErr.StatusCode, "error", err)
|
||||
return
|
||||
}
|
||||
|
||||
d.logger.Warn("local skill report failed — will retry",
|
||||
"kind", kind, "runtime_id", runtimeID, "request_id", requestID,
|
||||
"attempt", attempt+1, "error", err)
|
||||
}
|
||||
d.logger.Error("local skill report exhausted retries",
|
||||
"kind", kind, "runtime_id", runtimeID, "request_id", requestID, "error", lastErr)
|
||||
}
|
||||
|
||||
// handleUpdate performs the CLI update when triggered by the server via heartbeat.
|
||||
func (d *Daemon) handleUpdate(ctx context.Context, runtimeID string, update *PendingUpdate) {
|
||||
// Desktop-managed daemons share their CLI binary with the Electron app,
|
||||
|
||||
171
server/internal/daemon/local_skill_report_test.go
Normal file
171
server/internal/daemon/local_skill_report_test.go
Normal file
@@ -0,0 +1,171 @@
|
||||
package daemon
|
||||
|
||||
import (
|
||||
"context"
|
||||
"log/slog"
|
||||
"net/http"
|
||||
"net/http/httptest"
|
||||
"strings"
|
||||
"sync/atomic"
|
||||
"testing"
|
||||
"time"
|
||||
)
|
||||
|
||||
// withFastLocalSkillReportBackoffs swaps in zero-delay retries for the
|
||||
// duration of a test so the suite doesn't pay real sleep latency. Restores
|
||||
// the production schedule on cleanup.
|
||||
func withFastLocalSkillReportBackoffs(t *testing.T) {
|
||||
t.Helper()
|
||||
prev := localSkillReportBackoffs
|
||||
localSkillReportBackoffs = []time.Duration{0, 0, 0, 0}
|
||||
t.Cleanup(func() { localSkillReportBackoffs = prev })
|
||||
}
|
||||
|
||||
// localSkillReportDaemon wires a Daemon instance around an httptest.Server
|
||||
// that records every inbound request and lets the test script status codes
|
||||
// to return. That lets us exercise the retry path end-to-end against the
|
||||
// real daemon.Client code, not a mock.
|
||||
func localSkillReportDaemon(t *testing.T, handler http.HandlerFunc) (*Daemon, *int32) {
|
||||
t.Helper()
|
||||
var calls int32
|
||||
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
atomic.AddInt32(&calls, 1)
|
||||
handler(w, r)
|
||||
}))
|
||||
t.Cleanup(srv.Close)
|
||||
return &Daemon{
|
||||
client: NewClient(srv.URL),
|
||||
logger: slog.Default(),
|
||||
}, &calls
|
||||
}
|
||||
|
||||
func TestReportLocalSkillListResult_RetriesOn500AndEventuallySucceeds(t *testing.T) {
|
||||
withFastLocalSkillReportBackoffs(t)
|
||||
|
||||
var hits int32
|
||||
d, calls := localSkillReportDaemon(t, func(w http.ResponseWriter, _ *http.Request) {
|
||||
// Fail twice with 500, then succeed. Matches the concrete failure
|
||||
// mode the review is pinning: the server returns 500 while the
|
||||
// store write is being retried on its end, and the daemon must
|
||||
// hold on long enough to see it land.
|
||||
n := atomic.AddInt32(&hits, 1)
|
||||
if n <= 2 {
|
||||
http.Error(w, "{}", http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
w.WriteHeader(http.StatusOK)
|
||||
w.Write([]byte(`{"status":"ok"}`))
|
||||
})
|
||||
|
||||
d.reportLocalSkillListResult(context.Background(), Runtime{ID: "rt-1"}, "req-1", map[string]any{"status": "completed"})
|
||||
|
||||
if got := atomic.LoadInt32(calls); got != 3 {
|
||||
t.Fatalf("expected 3 attempts (2 failures + 1 success), got %d", got)
|
||||
}
|
||||
}
|
||||
|
||||
func TestReportLocalSkillListResult_DoesNotRetryOn4xx(t *testing.T) {
|
||||
withFastLocalSkillReportBackoffs(t)
|
||||
|
||||
d, calls := localSkillReportDaemon(t, func(w http.ResponseWriter, _ *http.Request) {
|
||||
// 404 is permanent — the request expired, was cross-workspace, or
|
||||
// the server never saw it. Retrying just wastes heartbeat cycles.
|
||||
http.Error(w, `{"error":"request not found"}`, http.StatusNotFound)
|
||||
})
|
||||
|
||||
d.reportLocalSkillListResult(context.Background(), Runtime{ID: "rt-1"}, "req-1", map[string]any{"status": "completed"})
|
||||
|
||||
if got := atomic.LoadInt32(calls); got != 1 {
|
||||
t.Fatalf("expected exactly 1 attempt (4xx is terminal), got %d", got)
|
||||
}
|
||||
}
|
||||
|
||||
func TestReportLocalSkillImportResult_RetriesOn500AndEventuallySucceeds(t *testing.T) {
|
||||
withFastLocalSkillReportBackoffs(t)
|
||||
|
||||
var hits int32
|
||||
d, calls := localSkillReportDaemon(t, func(w http.ResponseWriter, _ *http.Request) {
|
||||
n := atomic.AddInt32(&hits, 1)
|
||||
if n == 1 {
|
||||
http.Error(w, "{}", http.StatusBadGateway)
|
||||
return
|
||||
}
|
||||
w.WriteHeader(http.StatusOK)
|
||||
w.Write([]byte(`{"status":"ok"}`))
|
||||
})
|
||||
|
||||
d.reportLocalSkillImportResult(context.Background(), Runtime{ID: "rt-1"}, "req-1", map[string]any{"status": "completed"})
|
||||
|
||||
if got := atomic.LoadInt32(calls); got != 2 {
|
||||
t.Fatalf("expected 2 attempts, got %d", got)
|
||||
}
|
||||
}
|
||||
|
||||
func TestReportLocalSkillResult_GivesUpAfterAllAttemptsFail(t *testing.T) {
|
||||
withFastLocalSkillReportBackoffs(t)
|
||||
|
||||
d, calls := localSkillReportDaemon(t, func(w http.ResponseWriter, _ *http.Request) {
|
||||
http.Error(w, "{}", http.StatusInternalServerError)
|
||||
})
|
||||
|
||||
d.reportLocalSkillListResult(context.Background(), Runtime{ID: "rt-1"}, "req-1", map[string]any{"status": "completed"})
|
||||
|
||||
// Each element in localSkillReportBackoffs is one attempt — a persistent
|
||||
// outage should burn through every slot and then stop (logging Error).
|
||||
if got := atomic.LoadInt32(calls); int(got) != len(localSkillReportBackoffs) {
|
||||
t.Fatalf("expected %d attempts, got %d", len(localSkillReportBackoffs), got)
|
||||
}
|
||||
}
|
||||
|
||||
func TestReportLocalSkillResult_AbortsOnContextCancel(t *testing.T) {
|
||||
// Keep one real delay in the schedule so cancel lands mid-backoff.
|
||||
prev := localSkillReportBackoffs
|
||||
localSkillReportBackoffs = []time.Duration{0, 200 * time.Millisecond}
|
||||
t.Cleanup(func() { localSkillReportBackoffs = prev })
|
||||
|
||||
d, calls := localSkillReportDaemon(t, func(w http.ResponseWriter, _ *http.Request) {
|
||||
http.Error(w, "{}", http.StatusInternalServerError)
|
||||
})
|
||||
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
go func() {
|
||||
time.Sleep(30 * time.Millisecond)
|
||||
cancel()
|
||||
}()
|
||||
d.reportLocalSkillListResult(ctx, Runtime{ID: "rt-1"}, "req-1", map[string]any{"status": "completed"})
|
||||
|
||||
// Exactly the first attempt should have hit the server; the cancel
|
||||
// interrupts the sleep before the second attempt fires.
|
||||
if got := atomic.LoadInt32(calls); got != 1 {
|
||||
t.Fatalf("expected exactly 1 attempt before cancel, got %d", got)
|
||||
}
|
||||
}
|
||||
|
||||
func TestReportLocalSkillResult_SendsCorrectPath(t *testing.T) {
|
||||
withFastLocalSkillReportBackoffs(t)
|
||||
|
||||
var listPath, importPath string
|
||||
d, _ := localSkillReportDaemon(t, func(w http.ResponseWriter, r *http.Request) {
|
||||
// Smoke: make sure we're hitting the right daemon-side endpoint.
|
||||
// Protects against a future refactor silently pointing reports at
|
||||
// the wrong URL.
|
||||
if strings.Contains(r.URL.Path, "/import/") {
|
||||
importPath = r.URL.Path
|
||||
} else {
|
||||
listPath = r.URL.Path
|
||||
}
|
||||
w.WriteHeader(http.StatusOK)
|
||||
w.Write([]byte(`{"status":"ok"}`))
|
||||
})
|
||||
|
||||
ctx := context.Background()
|
||||
d.reportLocalSkillListResult(ctx, Runtime{ID: "rt-a"}, "req-list", map[string]any{"status": "completed"})
|
||||
d.reportLocalSkillImportResult(ctx, Runtime{ID: "rt-a"}, "req-import", map[string]any{"status": "completed"})
|
||||
|
||||
if !strings.HasSuffix(listPath, "/api/daemon/runtimes/rt-a/local-skills/req-list/result") {
|
||||
t.Fatalf("list path = %q", listPath)
|
||||
}
|
||||
if !strings.HasSuffix(importPath, "/api/daemon/runtimes/rt-a/local-skills/import/req-import/result") {
|
||||
t.Fatalf("import path = %q", importPath)
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user