Files
multica/server/internal/daemon/client.go
Multica Eve f59cb2f494 MUL-3834: harden daemon websocket reconnect (#4699)
* MUL-3834 harden daemon websocket reconnect

Co-authored-by: multica-agent <github@multica.ai>

* MUL-3834 stabilize daemon websocket liveness tests

Co-authored-by: multica-agent <github@multica.ai>

---------

Co-authored-by: Eve <eve@multica-ai.local>
Co-authored-by: multica-agent <github@multica.ai>
2026-06-29 16:46:57 +08:00

740 lines
27 KiB
Go

package daemon
import (
"bytes"
"context"
"encoding/json"
"errors"
"fmt"
"io"
"net/http"
"runtime"
"strings"
"time"
"github.com/multica-ai/multica/server/pkg/protocol"
)
// requestError describes an HTTP exchange that the server answered with an
// error status. postJSON/getJSON return it (as *requestError) for any response
// whose status code is 400 or above, so callers can branch on the status code
// and body via errors.As.
type requestError struct {
	Method     string // HTTP method of the failed request
	Path       string // request path as passed by the caller (without the base URL)
	StatusCode int    // status code the server answered with
	Body       string // response body text captured by the helper that built the error
}

// Error renders the failure as "<method> <path> returned <status>: <body>".
func (e *requestError) Error() string {
	const layout = "%s %s returned %d: %s"
	return fmt.Sprintf(layout, e.Method, e.Path, e.StatusCode, e.Body)
}
// isWorkspaceNotFoundError reports whether err carries a *requestError whose
// status is 404 and whose body mentions "workspace not found" (matched
// case-insensitively). Any other error, including nil, yields false.
func isWorkspaceNotFoundError(err error) bool {
	var failure *requestError
	if errors.As(err, &failure) && failure.StatusCode == http.StatusNotFound {
		body := strings.ToLower(failure.Body)
		return strings.Contains(body, "workspace not found")
	}
	return false
}
// isTaskNotFoundError reports whether err carries a *requestError whose status
// is 404 and whose body mentions "task not found" (matched
// case-insensitively). The daemon relies on this to notice that a task was
// deleted server-side (issue removed, agent reassigned, ...) while the local
// agent was still running, so it can interrupt the agent instead of letting it
// keep emitting tool calls against a dead task.
func isTaskNotFoundError(err error) bool {
	var failure *requestError
	if errors.As(err, &failure) && failure.StatusCode == http.StatusNotFound {
		body := strings.ToLower(failure.Body)
		return strings.Contains(body, "task not found")
	}
	return false
}
// isUnauthorizedError reports whether err carries a *requestError with status
// 401. The token-renewal loop uses it to surface a clear "re-login required"
// message rather than treating the failure as a generic transport-level retry.
func isUnauthorizedError(err error) bool {
	var failure *requestError
	return errors.As(err, &failure) && failure.StatusCode == http.StatusUnauthorized
}
// isRuntimeNotFoundError reports whether err carries a *requestError whose
// status is 404 and whose body mentions "runtime not found" (matched
// case-insensitively). The daemon uses this to detect that the runtime row was
// deleted server-side (UI Delete, 7-day offline GC) while it was still
// heartbeating against the dead UUID, so it can prune the stale runtime from
// local state and re-register instead of looping on the dead ID forever.
//
// Server-side, this body is paired with pgx.ErrNoRows specifically (other DB
// errors return 500), so a transient DB hiccup cannot make the daemon
// self-cleanup.
func isRuntimeNotFoundError(err error) bool {
	var failure *requestError
	if errors.As(err, &failure) && failure.StatusCode == http.StatusNotFound {
		body := strings.ToLower(failure.Body)
		return strings.Contains(body, "runtime not found")
	}
	return false
}
// Client handles HTTP communication with the Multica server daemon API.
type Client struct {
// baseURL is prepended verbatim to every request path.
baseURL string
// token, when non-empty, is sent as "Authorization: Bearer <token>".
token string
// client issues the control-plane requests; NewClient gives it a fixed 30s
// timeout.
client *http.Client
// bundleClient downloads skill bundles. Unlike client it carries no fixed
// Timeout: bundles can be large and slow on jittery links, so the caller
// supplies a per-request, size-scaled deadline via context instead of
// being capped by the 30s control-plane timeout that fits heartbeat /
// claim but not a multi-megabyte body read. (GitHub #4505)
bundleClient *http.Client
// Identity values sent on requests as X-Client-Platform / X-Client-Version /
// X-Client-OS. NewClient fills in platform and os; version is recorded by
// SetVersion. Empty values are simply omitted.
platform string
version string
os string
}
// NewClient builds a daemon API client for baseURL. The control-plane client
// gets a 30-second timeout and its own clone of the default transport; the
// bundle client is left without a fixed timeout. The identity platform is set
// to "daemon" and the OS is derived from runtime.GOOS.
func NewClient(baseURL string) *Client {
	controlPlane := &http.Client{
		Timeout:   30 * time.Second,
		Transport: cloneDefaultTransport(),
	}
	c := &Client{
		platform: "daemon",
		os:       normalizeGOOS(runtime.GOOS),
	}
	c.baseURL = baseURL
	c.client = controlPlane
	c.bundleClient = &http.Client{}
	return c
}
// cloneDefaultTransport returns a private copy of http.DefaultTransport when it
// is an *http.Transport, so the caller gets its own connection pool. If the
// default transport has been replaced by some other RoundTripper, that value is
// returned as-is because it cannot be cloned.
func cloneDefaultTransport() http.RoundTripper {
	base, isTransport := http.DefaultTransport.(*http.Transport)
	if !isTransport {
		return http.DefaultTransport
	}
	return base.Clone()
}
// CloseIdleConnections drops the pooled control-plane HTTP connections. The
// daemon calls it after repeated heartbeat transport failures so that a stale
// keep-alive socket left over from a server restart cannot delay recovery
// indefinitely. It is a no-op on a nil Client or a Client without an HTTP
// client.
func (c *Client) CloseIdleConnections() {
	if c != nil && c.client != nil {
		c.client.CloseIdleConnections()
	}
}
// normalizeGOOS maps Go's runtime.GOOS values to the protocol vocabulary used
// by X-Client-OS / client_os ("macos" / "windows" / "linux"). Only "darwin"
// needs translating; "windows" and "linux" already match, and any other value
// is passed through unchanged.
func normalizeGOOS(goos string) string {
	if goos == "darwin" {
		return "macos"
	}
	return goos
}
// SetVersion stores v as the daemon's CLI version. While non-empty it is sent
// on requests as the X-Client-Version header. Daemon.Run calls this once the
// config has been loaded.
func (c *Client) SetVersion(v string) {
	c.version = v
}
// setIdentityHeaders stamps req with the client's identity. X-Client-Platform,
// X-Client-Version and X-Client-OS are each set only when the corresponding
// value is non-empty; X-Client-Capabilities is always set to the skill-bundles
// capability constant.
func (c *Client) setIdentityHeaders(req *http.Request) {
	identity := [...]struct{ header, value string }{
		{"X-Client-Platform", c.platform},
		{"X-Client-Version", c.version},
		{"X-Client-OS", c.os},
	}
	for _, field := range identity {
		if field.value == "" {
			continue
		}
		req.Header.Set(field.header, field.value)
	}
	req.Header.Set("X-Client-Capabilities", protocol.DaemonCapabilitySkillBundlesV1)
}
// SetToken stores the auth token. While non-empty it is sent as a Bearer
// Authorization header on requests made by this client.
func (c *Client) SetToken(token string) {
	c.token = token
}
// Token reports the auth token currently held by the client (empty when none
// has been set).
func (c *Client) Token() string {
	current := c.token
	return current
}
// ClaimTask POSTs an empty JSON object to the runtime's task-claim endpoint
// and returns whatever the response carries under its "task" key.
// NOTE(review): presumably the task is nil when nothing is queued for the
// runtime — confirm against the server handler.
func (c *Client) ClaimTask(ctx context.Context, runtimeID string) (*Task, error) {
	endpoint := fmt.Sprintf("/api/daemon/runtimes/%s/tasks/claim", runtimeID)
	var payload struct {
		Task *Task `json:"task"`
	}
	err := c.postJSON(ctx, endpoint, map[string]any{}, &payload)
	if err != nil {
		return nil, err
	}
	return payload.Task, nil
}
// ResolveSkillBundle downloads exactly one skill bundle, identified by ref,
// for the given runtime and task. The request runs on bundleClient (no fixed
// timeout), so the deadline comes entirely from ctx — which the daemon scales
// to the bundle's size — and transient transport blips are retried per
// skillBundleResolveRetrySchedule within whatever budget ctx leaves.
//
// Resolving one skill per request, rather than the agent's whole bundle set in
// one atomic body read, lets each download fit its own deadline and be cached
// independently, so a slow link makes incremental progress instead of failing
// the entire set on every dispatch. (GitHub #4505)
//
// An error is returned when the response does not contain exactly one bundle.
func (c *Client) ResolveSkillBundle(ctx context.Context, runtimeID, taskID string, ref SkillRefData) (SkillData, error) {
	endpoint := fmt.Sprintf("/api/daemon/runtimes/%s/tasks/%s/skill-bundles/resolve", runtimeID, taskID)
	request := map[string]any{
		"skills": []SkillRefData{ref},
	}
	var reply struct {
		Bundles []SkillData `json:"bundles"`
	}
	err := c.postJSONViaWithRetry(ctx, c.bundleClient, endpoint, request, &reply, skillBundleResolveRetrySchedule)
	if err != nil {
		return SkillData{}, err
	}
	if got := len(reply.Bundles); got != 1 {
		return SkillData{}, fmt.Errorf("resolve skill bundle: expected 1 bundle, got %d", got)
	}
	return reply.Bundles[0], nil
}
// ExtendTaskPrepareLease POSTs an empty JSON object to the task's
// prepare-lease endpoint under the given runtime and ignores the response
// body.
// NOTE(review): presumably this renews a server-side lease held while the
// daemon prepares the task — confirm against the handler.
func (c *Client) ExtendTaskPrepareLease(ctx context.Context, runtimeID, taskID string) error {
	endpoint := fmt.Sprintf("/api/daemon/runtimes/%s/tasks/%s/prepare-lease", runtimeID, taskID)
	return c.postJSON(ctx, endpoint, map[string]any{}, nil)
}
// StartTask POSTs an empty JSON object to the task's start endpoint and
// ignores the response body.
func (c *Client) StartTask(ctx context.Context, taskID string) error {
	endpoint := fmt.Sprintf("/api/daemon/tasks/%s/start", taskID)
	return c.postJSON(ctx, endpoint, map[string]any{}, nil)
}
// MarkTaskWaitingLocalDirectory parks a freshly-dispatched task in the
// waiting_local_directory state on the server. The daemon calls it after
// claiming a task whose project carries a local_directory resource while the
// path mutex is still held by another in-flight task.
//
// reason is a short human-readable hint (e.g. "<path>") that the UI shows next
// to the status. From the daemon's point of view the call is idempotent: once
// the row is already waiting_local_directory, a repeat call is rejected with a
// 400 (the underlying SQL filters on status='dispatched'), which the daemon
// swallows before proceeding to wait.
func (c *Client) MarkTaskWaitingLocalDirectory(ctx context.Context, taskID, reason string) error {
	endpoint := fmt.Sprintf("/api/daemon/tasks/%s/wait-local-directory", taskID)
	payload := map[string]any{"reason": reason}
	return c.postJSON(ctx, endpoint, payload, nil)
}
// ReportProgress POSTs a progress update for the task: a free-form summary
// plus the step and total counters, sent under the JSON keys "summary",
// "step" and "total". The response body is ignored.
func (c *Client) ReportProgress(ctx context.Context, taskID, summary string, step, total int) error {
	endpoint := fmt.Sprintf("/api/daemon/tasks/%s/progress", taskID)
	payload := map[string]any{
		"summary": summary,
		"step":    step,
		"total":   total,
	}
	return c.postJSON(ctx, endpoint, payload, nil)
}
// TaskMessageData represents a single agent execution message for batch
// reporting (see ReportTaskMessages). Seq and Type are always encoded; the
// remaining fields are dropped from the JSON when empty (omitempty).
type TaskMessageData struct {
Seq int `json:"seq"`
Type string `json:"type"`
Tool string `json:"tool,omitempty"`
Content string `json:"content,omitempty"`
Input map[string]any `json:"input,omitempty"`
Output string `json:"output,omitempty"`
}
// ReportTaskMessages POSTs a batch of agent execution messages for the task
// under the JSON key "messages". The response body is ignored.
func (c *Client) ReportTaskMessages(ctx context.Context, taskID string, messages []TaskMessageData) error {
	endpoint := fmt.Sprintf("/api/daemon/tasks/%s/messages", taskID)
	payload := map[string]any{"messages": messages}
	return c.postJSON(ctx, endpoint, payload, nil)
}
// CompleteTask reports the task as completed. "output" is always sent;
// "branch_name", "session_id" and "work_dir" are included only when non-empty.
// The call is retried on transient errors per defaultTerminalRetrySchedule.
func (c *Client) CompleteTask(ctx context.Context, taskID, output, branchName, sessionID, workDir string) error {
	payload := map[string]any{"output": output}
	optional := [...]struct{ key, value string }{
		{"branch_name", branchName},
		{"session_id", sessionID},
		{"work_dir", workDir},
	}
	for _, field := range optional {
		if field.value != "" {
			payload[field.key] = field.value
		}
	}
	endpoint := fmt.Sprintf("/api/daemon/tasks/%s/complete", taskID)
	return c.postJSONWithRetry(ctx, endpoint, payload, nil, defaultTerminalRetrySchedule)
}
// ReportTaskUsage POSTs the usage entries for the task under the JSON key
// "usage". With no entries it returns nil without contacting the server.
func (c *Client) ReportTaskUsage(ctx context.Context, taskID string, usage []TaskUsageEntry) error {
	if len(usage) == 0 {
		// Nothing to report; skip the round trip.
		return nil
	}
	endpoint := fmt.Sprintf("/api/daemon/tasks/%s/usage", taskID)
	payload := map[string]any{"usage": usage}
	return c.postJSON(ctx, endpoint, payload, nil)
}
// FailTask reports the task as failed. "error" is always sent; "session_id",
// "work_dir" and "failure_reason" are included only when non-empty. The call
// is retried on transient errors per defaultTerminalRetrySchedule.
func (c *Client) FailTask(ctx context.Context, taskID, errMsg, sessionID, workDir, failureReason string) error {
	payload := map[string]any{"error": errMsg}
	optional := [...]struct{ key, value string }{
		{"session_id", sessionID},
		{"work_dir", workDir},
		{"failure_reason", failureReason},
	}
	for _, field := range optional {
		if field.value != "" {
			payload[field.key] = field.value
		}
	}
	endpoint := fmt.Sprintf("/api/daemon/tasks/%s/fail", taskID)
	return c.postJSONWithRetry(ctx, endpoint, payload, nil, defaultTerminalRetrySchedule)
}
// PinTaskSession persists the agent's session_id and work_dir on the task row
// mid-flight so that a daemon crash does not lose the resume pointer. Only the
// non-empty values are sent; when both are empty the call returns nil without
// contacting the server.
func (c *Client) PinTaskSession(ctx context.Context, taskID, sessionID, workDir string) error {
	payload := make(map[string]any, 2)
	if sessionID != "" {
		payload["session_id"] = sessionID
	}
	if workDir != "" {
		payload["work_dir"] = workDir
	}
	if len(payload) == 0 {
		return nil
	}
	endpoint := fmt.Sprintf("/api/daemon/tasks/%s/session", taskID)
	return c.postJSON(ctx, endpoint, payload, nil)
}
// RecoverOrphans asks the server to fail any dispatched/running tasks that the
// previous daemon process for this runtime left behind. The server auto-retries
// the tasks that are eligible.
func (c *Client) RecoverOrphans(ctx context.Context, runtimeID string) error {
	endpoint := fmt.Sprintf("/api/daemon/runtimes/%s/recover-orphans", runtimeID)
	return c.postJSON(ctx, endpoint, map[string]any{}, nil)
}
// GetTaskStatus fetches the task's current status string (the "status" field
// of the response). The daemon polls it while a task executes to pick up
// terminal/interruption signals: cancelled, failed, completed, or a 404
// task-not-found. On error the returned status is empty.
func (c *Client) GetTaskStatus(ctx context.Context, taskID string) (string, error) {
	endpoint := fmt.Sprintf("/api/daemon/tasks/%s/status", taskID)
	var payload struct {
		Status string `json:"status"`
	}
	err := c.getJSON(ctx, endpoint, &payload)
	if err != nil {
		return "", err
	}
	return payload.Status, nil
}
// HeartbeatResponse, PendingUpdate, etc. alias the wire types declared in the
// protocol package so the HTTP and WS heartbeat paths share a single type and
// a single decoder shape. They are true aliases (`=`), not new named types or
// wrappers, which keeps existing call sites unchanged.
type (
HeartbeatResponse = protocol.DaemonHeartbeatAckPayload
PendingUpdate = protocol.DaemonHeartbeatPendingUpdate
PendingModelList = protocol.DaemonHeartbeatPendingModelList
PendingLocalSkills = protocol.DaemonHeartbeatPendingLocalSkills
PendingLocalSkillImport = protocol.DaemonHeartbeatPendingLocalSkillImport
)
// SendHeartbeat POSTs a heartbeat for runtimeID, always advertising
// "supports_batch_import": true, and returns the decoded acknowledgement. On
// error the returned response is nil.
func (c *Client) SendHeartbeat(ctx context.Context, runtimeID string) (*HeartbeatResponse, error) {
	request := map[string]any{
		"runtime_id":            runtimeID,
		"supports_batch_import": true,
	}
	ack := new(HeartbeatResponse)
	if err := c.postJSON(ctx, "/api/daemon/heartbeat", request, ack); err != nil {
		return nil, err
	}
	return ack, nil
}
// ReportUpdateResult POSTs the outcome of a CLI update (identified by
// updateID) for the given runtime. result is sent as the JSON body as-is and
// the response body is ignored.
func (c *Client) ReportUpdateResult(ctx context.Context, runtimeID, updateID string, result map[string]any) error {
	endpoint := fmt.Sprintf("/api/daemon/runtimes/%s/update/%s/result", runtimeID, updateID)
	return c.postJSON(ctx, endpoint, result, nil)
}
// ReportModelListResult POSTs the outcome of a model-discovery request
// (identified by requestID) for the given runtime. result is sent as the JSON
// body as-is and the response body is ignored.
func (c *Client) ReportModelListResult(ctx context.Context, runtimeID, requestID string, result map[string]any) error {
	endpoint := fmt.Sprintf("/api/daemon/runtimes/%s/models/%s/result", runtimeID, requestID)
	return c.postJSON(ctx, endpoint, result, nil)
}
// ReportLocalSkillListResult POSTs the runtime-local-skill inventory produced
// for requestID back to the server. result is sent as the JSON body as-is and
// the response body is ignored.
func (c *Client) ReportLocalSkillListResult(ctx context.Context, runtimeID, requestID string, result map[string]any) error {
	endpoint := fmt.Sprintf("/api/daemon/runtimes/%s/local-skills/%s/result", runtimeID, requestID)
	return c.postJSON(ctx, endpoint, result, nil)
}
// ReportLocalSkillImportResult POSTs a runtime-local-skill bundle produced for
// requestID back to the server. result is sent as the JSON body as-is and the
// response body is ignored.
func (c *Client) ReportLocalSkillImportResult(ctx context.Context, runtimeID, requestID string, result map[string]any) error {
	endpoint := fmt.Sprintf("/api/daemon/runtimes/%s/local-skills/import/%s/result", runtimeID, requestID)
	return c.postJSON(ctx, endpoint, result, nil)
}
// WorkspaceInfo holds minimal workspace metadata returned by the API: the
// workspace's "id" and "name" JSON fields. ListWorkspaces decodes a list of
// these.
type WorkspaceInfo struct {
ID string `json:"id"`
Name string `json:"name"`
}
// RenewTokenResponse mirrors handler.RenewPATResponse — kept loose (string +
// bool) because the daemon never parses the timestamp itself; it just logs it
// for operator visibility. It is the decoded body of the RenewToken call.
type RenewTokenResponse struct {
// ExpiresAt is kept as the raw string sent by the server; it is not parsed
// into a time.Time here.
ExpiresAt string `json:"expires_at"`
// Renewed is decoded from the "renewed" field.
// NOTE(review): presumably true only when the server actually extended the
// token on this call — confirm against the handler.
Renewed bool `json:"renewed"`
}
// RenewToken asks the server to extend the daemon's current PAT in place when
// it falls inside the server-side renewal window. The server is authoritative
// on the threshold — the daemon has no local knowledge of the token's
// expires_at — so calling this on any cadence is safe; an unnecessary call
// costs only one round trip and one cheap SELECT. On error the returned
// response is nil.
func (c *Client) RenewToken(ctx context.Context) (*RenewTokenResponse, error) {
	renewal := new(RenewTokenResponse)
	if err := c.postJSON(ctx, "/api/tokens/current/renew", map[string]any{}, renewal); err != nil {
		return nil, err
	}
	return renewal, nil
}
// ListWorkspaces fetches every workspace the authenticated user belongs to.
// On error the returned slice is nil.
func (c *Client) ListWorkspaces(ctx context.Context) ([]WorkspaceInfo, error) {
	var found []WorkspaceInfo
	err := c.getJSON(ctx, "/api/workspaces", &found)
	if err != nil {
		return nil, err
	}
	return found, nil
}
// IssueGCStatus holds the minimal issue info returned by the GC check
// endpoint: the issue's "status" string and its "updated_at" timestamp.
type IssueGCStatus struct {
Status string `json:"status"`
UpdatedAt time.Time `json:"updated_at"`
}
// GetIssueGCCheck fetches the status and updated_at of an issue so the caller
// can make GC decisions. On error the returned status is nil.
func (c *Client) GetIssueGCCheck(ctx context.Context, issueID string) (*IssueGCStatus, error) {
	endpoint := fmt.Sprintf("/api/daemon/issues/%s/gc-check", issueID)
	status := new(IssueGCStatus)
	if err := c.getJSON(ctx, endpoint, status); err != nil {
		return nil, err
	}
	return status, nil
}
// ChatSessionGCStatus mirrors IssueGCStatus for chat sessions: it carries the
// same "status" and "updated_at" JSON fields.
type ChatSessionGCStatus struct {
Status string `json:"status"`
UpdatedAt time.Time `json:"updated_at"`
}
// GetChatSessionGCCheck fetches the status of a chat session so the caller can
// make GC decisions. A 404 from this endpoint means the session row was
// hard-deleted (the user explicitly removed it); the caller treats that as an
// immediate-clean signal. On error the returned status is nil.
func (c *Client) GetChatSessionGCCheck(ctx context.Context, sessionID string) (*ChatSessionGCStatus, error) {
	endpoint := fmt.Sprintf("/api/daemon/chat-sessions/%s/gc-check", sessionID)
	status := new(ChatSessionGCStatus)
	if err := c.getJSON(ctx, endpoint, status); err != nil {
		return nil, err
	}
	return status, nil
}
// AutopilotRunGCStatus carries the status of an autopilot run. CompletedAt
// is the run's terminal timestamp (zero for non-terminal runs). The GC loop
// reclaims a terminal run's never-reused workdir as soon as it sees the
// terminal status, so it no longer gates on CompletedAt; the field is kept for
// the API response contract and diagnostics.
type AutopilotRunGCStatus struct {
// Status is decoded from the "status" field of the gc-check response.
Status string `json:"status"`
// CompletedAt is decoded from "completed_at"; see the type comment for why
// it is retained.
CompletedAt time.Time `json:"completed_at"`
}
// GetAutopilotRunGCCheck fetches the status of an autopilot run so the caller
// can make GC decisions. On error the returned status is nil.
func (c *Client) GetAutopilotRunGCCheck(ctx context.Context, runID string) (*AutopilotRunGCStatus, error) {
	endpoint := fmt.Sprintf("/api/daemon/autopilot-runs/%s/gc-check", runID)
	status := new(AutopilotRunGCStatus)
	if err := c.getJSON(ctx, endpoint, status); err != nil {
		return nil, err
	}
	return status, nil
}
// TaskGCStatus carries the agent_task_queue status for quick-create cleanup.
// Quick-create tasks have no separate parent record, so GC keys directly on
// the task itself. Both fields are decoded from the task gc-check response
// ("status" and "completed_at").
type TaskGCStatus struct {
Status string `json:"status"`
CompletedAt time.Time `json:"completed_at"`
}
// GetTaskGCCheck fetches the status of an agent task so the caller can make GC
// decisions. On error the returned status is nil.
func (c *Client) GetTaskGCCheck(ctx context.Context, taskID string) (*TaskGCStatus, error) {
	endpoint := fmt.Sprintf("/api/daemon/tasks/%s/gc-check", taskID)
	status := new(TaskGCStatus)
	if err := c.getJSON(ctx, endpoint, status); err != nil {
		return nil, err
	}
	return status, nil
}
// Deregister POSTs the given runtime IDs to the deregister endpoint under the
// JSON key "runtime_ids". The response body is ignored.
func (c *Client) Deregister(ctx context.Context, runtimeIDs []string) error {
	payload := map[string]any{"runtime_ids": runtimeIDs}
	return c.postJSON(ctx, "/api/daemon/deregister", payload, nil)
}
// RegisterResponse holds the server's response to a daemon registration
// (see Register).
type RegisterResponse struct {
Runtimes []Runtime `json:"runtimes"`
Repos []RepoData `json:"repos"`
ReposVersion string `json:"repos_version"`
// Settings is kept as raw JSON; this type does not interpret it.
Settings json.RawMessage `json:"settings,omitempty"`
}
// Register POSTs req as the JSON body to the daemon registration endpoint and
// returns the decoded response. On error the returned response is nil.
func (c *Client) Register(ctx context.Context, req map[string]any) (*RegisterResponse, error) {
	registered := new(RegisterResponse)
	if err := c.postJSON(ctx, "/api/daemon/register", req, registered); err != nil {
		return nil, err
	}
	return registered, nil
}
// WorkspaceReposResponse is the decoded body returned by GetWorkspaceRepos for
// a single workspace.
type WorkspaceReposResponse struct {
WorkspaceID string `json:"workspace_id"`
Repos []RepoData `json:"repos"`
ReposVersion string `json:"repos_version"`
// Settings is kept as raw JSON; this type does not interpret it.
Settings json.RawMessage `json:"settings,omitempty"`
}
// GetWorkspaceRepos fetches the repos payload for the given workspace from the
// daemon API. On error the returned response is nil.
func (c *Client) GetWorkspaceRepos(ctx context.Context, workspaceID string) (*WorkspaceReposResponse, error) {
	endpoint := fmt.Sprintf("/api/daemon/workspaces/%s/repos", workspaceID)
	repos := new(WorkspaceReposResponse)
	if err := c.getJSON(ctx, endpoint, repos); err != nil {
		return nil, err
	}
	return repos, nil
}
// RuntimeProfile mirrors the server's workspace custom runtime profile
// (MUL-3284). protocol_family is the provider used for task routing (it
// selects the agent backend), while command_name is the actual executable
// the daemon resolves on PATH and launches. fixed_args are launch arguments
// every agent on this runtime inherits.
type RuntimeProfile struct {
ID string `json:"id"`
WorkspaceID string `json:"workspace_id"`
DisplayName string `json:"display_name"`
ProtocolFamily string `json:"protocol_family"`
CommandName string `json:"command_name"`
// Description is a pointer, so a JSON null (or an absent key) decodes to nil
// rather than to an empty string.
Description *string `json:"description"`
FixedArgs []string `json:"fixed_args"`
Visibility string `json:"visibility"`
Enabled bool `json:"enabled"`
}
// RuntimeProfilesResponse is the body of
// GET /api/daemon/workspaces/{workspaceID}/runtime-profiles. The server only
// returns enabled profiles for the workspace. GetRuntimeProfiles decodes into
// this type.
type RuntimeProfilesResponse struct {
WorkspaceID string `json:"workspace_id"`
RuntimeProfiles []RuntimeProfile `json:"runtime_profiles"`
}
// GetRuntimeProfiles fetches the workspace's enabled custom runtime profiles,
// mirroring GetWorkspaceRepos. Callers must treat the call as best-effort: an
// older server without the profiles route answers 404, which the daemon
// swallows before carrying on with built-in runtimes only. On error the
// returned response is nil.
func (c *Client) GetRuntimeProfiles(ctx context.Context, workspaceID string) (*RuntimeProfilesResponse, error) {
	endpoint := fmt.Sprintf("/api/daemon/workspaces/%s/runtime-profiles", workspaceID)
	profiles := new(RuntimeProfilesResponse)
	if err := c.getJSON(ctx, endpoint, profiles); err != nil {
		return nil, err
	}
	return profiles, nil
}
// defaultTerminalRetrySchedule is the backoff used by postJSONWithRetry for
// terminal task callbacks (CompleteTask / FailTask). N entries → N+1 attempts
// in the worst case (one immediate + N retries). Five backoffs totalling
// 124s (4+8+16+32+64) is wide enough to ride out the short upstream blips
// we've seen (MUL-2780) without leaving the task stuck if the outage outlives
// the window.
//
// Entry i is the pause taken after attempt i fails and before attempt i+1
// starts; each entry doubles the previous one.
var defaultTerminalRetrySchedule = []time.Duration{
4 * time.Second,
8 * time.Second,
16 * time.Second,
32 * time.Second,
64 * time.Second,
}
// skillBundleResolveRetrySchedule rides out brief transport blips on a single
// bundle download (it is the schedule ResolveSkillBundle passes to
// postJSONViaWithRetry). Kept short on purpose: the real budget is the
// size-scaled context deadline the daemon sets per skill, and a skill that
// still fails is retried on the next dispatch once its siblings are cached.
// N entries → N+1 attempts, i.e. at most three attempts here.
// (GitHub #4505)
var skillBundleResolveRetrySchedule = []time.Duration{
500 * time.Millisecond,
2 * time.Second,
}
// retrySleep pauses for d between retry attempts, returning nil once the
// pause has elapsed or ctx.Err() if ctx finishes first. It lives in a package
// variable so that tests can substitute an instant sleep without having to
// rewrite the caller's schedule.
var retrySleep = func(ctx context.Context, d time.Duration) error {
	wait := time.NewTimer(d)
	defer wait.Stop()

	select {
	case <-ctx.Done():
		return ctx.Err()
	case <-wait.C:
		return nil
	}
}
// isTransientError reports whether err looks like a hiccup that a retry is
// likely to cure:
//
//   - any error that is not a *requestError, i.e. connection / TLS / I/O
//     failures at the transport layer (including client timeouts that surface
//     as context.DeadlineExceeded inside http.Client.Do);
//   - 5xx server responses;
//   - the 408 and 429 rate-limit-style 4xx responses.
//
// Every other 4xx is treated as permanent — retrying a 400 (bad body) or a
// 404 (task not found) only burns time. A nil error is not transient.
//
// The caller is responsible for separately bailing on parent-context
// cancellation: this predicate cannot tell "the daemon is shutting down" from
// "the HTTP client timed out a single attempt", because both arrive here as
// context errors wrapped by net/http.
func isTransientError(err error) bool {
	if err == nil {
		return false
	}
	var failure *requestError
	if !errors.As(err, &failure) {
		// No HTTP status attached: treat as a transport-level failure.
		return true
	}
	switch {
	case failure.StatusCode >= 500:
		return true
	case failure.StatusCode == http.StatusRequestTimeout,
		failure.StatusCode == http.StatusTooManyRequests:
		return true
	default:
		return false
	}
}
// postJSONWithRetry posts a JSON body over the control-plane client with
// bounded exponential backoff. It is meant for "must reach the server"
// terminal callbacks (CompleteTask / FailTask): transient errors (per
// isTransientError) are retried, while permanent 4xx responses stop the loop
// immediately so the schedule is not burned on requests the server has already
// rejected.
//
// schedule holds the sleeps between attempts; with N entries the helper makes
// at most N+1 attempts (one initial + N retries). The error returned is the
// one from the last attempt, so callers can still run it through
// isTransientError to decide whether to fall back to a different terminal call
// (e.g. complete → fail on permanent error only).
//
// The server-side CompleteTask / FailTask treat "already terminal" as an
// idempotent success (see service/task.go), so a duplicate replay caused by a
// retry is safe even when the server's earlier response was lost in transit.
func (c *Client) postJSONWithRetry(ctx context.Context, path string, reqBody any, respBody any, schedule []time.Duration) error {
	controlPlane := c.client
	return c.postJSONViaWithRetry(ctx, controlPlane, path, reqBody, respBody, schedule)
}
// postJSONViaWithRetry is postJSONWithRetry over an explicit http.Client, so
// large-body endpoints can run on bundleClient (deadline from ctx) while the
// control plane keeps its fixed 30s client.
//
// Each pass first checks ctx: if it is already done, the error of the previous
// attempt is returned when there was one, otherwise ctx's own error. A failed
// attempt ends the loop when the error is permanent, when the schedule is
// exhausted, or when the backoff sleep is interrupted; in all three cases the
// attempt's error (not the sleep's) is what the caller sees.
func (c *Client) postJSONViaWithRetry(ctx context.Context, httpClient *http.Client, path string, reqBody any, respBody any, schedule []time.Duration) error {
	var lastErr error
	attempt := 0
	for {
		if ctxErr := ctx.Err(); ctxErr != nil {
			if lastErr == nil {
				return ctxErr
			}
			return lastErr
		}

		lastErr = c.postJSONVia(ctx, httpClient, path, reqBody, respBody)
		switch {
		case lastErr == nil:
			return nil
		case !isTransientError(lastErr):
			// Permanent failure: retrying cannot help.
			return lastErr
		case attempt >= len(schedule):
			// Out of backoff slots.
			return lastErr
		}

		if retrySleep(ctx, schedule[attempt]) != nil {
			return lastErr
		}
		attempt++
	}
}
// postJSON sends a single JSON POST over the control-plane client (c.client).
// It is postJSONVia with the HTTP client fixed; see there for the
// request/response handling.
func (c *Client) postJSON(ctx context.Context, path string, reqBody any, respBody any) error {
	controlPlane := c.client
	return c.postJSONVia(ctx, controlPlane, path, reqBody, respBody)
}
// postJSONVia is postJSON over an explicit http.Client. Callers pick the client
// to control the timeout regime: c.client (fixed 30s) for control-plane calls,
// c.bundleClient (deadline from ctx) for large skill-bundle downloads.
//
// reqBody, when non-nil, is JSON-encoded and sent as the request body. The
// bearer token (if set) and the identity headers are attached to the request.
//
// A response with status >= 400 is returned as a *requestError carrying up to
// 4096 bytes of the trimmed response body. Otherwise the body is decoded into
// respBody, or discarded when respBody is nil. Encode and decode failures are
// wrapped (with %w) so the message names the endpoint involved; transport
// errors from the HTTP client are returned as-is.
func (c *Client) postJSONVia(ctx context.Context, httpClient *http.Client, path string, reqBody any, respBody any) error {
	// drainLimit bounds how much unread body is consumed before closing.
	const drainLimit = 4096

	var body io.Reader
	if reqBody != nil {
		data, err := json.Marshal(reqBody)
		if err != nil {
			return fmt.Errorf("encode POST %s request body: %w", path, err)
		}
		body = bytes.NewReader(data)
	}

	req, err := http.NewRequestWithContext(ctx, http.MethodPost, c.baseURL+path, body)
	if err != nil {
		return err
	}
	req.Header.Set("Content-Type", "application/json")
	if c.token != "" {
		req.Header.Set("Authorization", "Bearer "+c.token)
	}
	c.setIdentityHeaders(req)

	resp, err := httpClient.Do(req)
	if err != nil {
		return err
	}
	defer func() {
		// json.Decoder stops after the first value and the error path reads a
		// capped prefix, so the body may not have reached EOF. Consume a
		// bounded remainder so the transport can reuse the keep-alive
		// connection. Best-effort: failures here must not mask the result.
		_, _ = io.Copy(io.Discard, io.LimitReader(resp.Body, drainLimit))
		_ = resp.Body.Close()
	}()

	if resp.StatusCode >= 400 {
		// Best-effort capture of the error body; a read failure just yields a
		// shorter (possibly empty) Body.
		data, _ := io.ReadAll(io.LimitReader(resp.Body, 4096))
		return &requestError{Method: http.MethodPost, Path: path, StatusCode: resp.StatusCode, Body: strings.TrimSpace(string(data))}
	}
	if respBody == nil {
		// The caller does not want the payload; drain it fully.
		_, _ = io.Copy(io.Discard, resp.Body)
		return nil
	}
	if err := json.NewDecoder(resp.Body).Decode(respBody); err != nil {
		return fmt.Errorf("decode POST %s response: %w", path, err)
	}
	return nil
}
// getJSON issues a GET for path over the control-plane client (c.client),
// attaching the bearer token (if set) and the identity headers.
//
// A response with status >= 400 is returned as a *requestError carrying up to
// 4096 bytes of the trimmed response body. Otherwise the body is decoded into
// respBody, or discarded when respBody is nil. Decode failures are wrapped
// (with %w) so the message names the endpoint involved; transport errors from
// the HTTP client are returned as-is.
func (c *Client) getJSON(ctx context.Context, path string, respBody any) error {
	// drainLimit bounds how much unread body is consumed before closing.
	const drainLimit = 4096

	req, err := http.NewRequestWithContext(ctx, http.MethodGet, c.baseURL+path, nil)
	if err != nil {
		return err
	}
	if c.token != "" {
		req.Header.Set("Authorization", "Bearer "+c.token)
	}
	c.setIdentityHeaders(req)

	resp, err := c.client.Do(req)
	if err != nil {
		return err
	}
	defer func() {
		// json.Decoder stops after the first value and the error path reads a
		// capped prefix, so the body may not have reached EOF. Consume a
		// bounded remainder so the transport can reuse the keep-alive
		// connection. Best-effort: failures here must not mask the result.
		_, _ = io.Copy(io.Discard, io.LimitReader(resp.Body, drainLimit))
		_ = resp.Body.Close()
	}()

	if resp.StatusCode >= 400 {
		// Best-effort capture of the error body; a read failure just yields a
		// shorter (possibly empty) Body.
		data, _ := io.ReadAll(io.LimitReader(resp.Body, 4096))
		return &requestError{Method: http.MethodGet, Path: path, StatusCode: resp.StatusCode, Body: strings.TrimSpace(string(data))}
	}
	if respBody == nil {
		// The caller does not want the payload; drain it fully.
		_, _ = io.Copy(io.Discard, resp.Body)
		return nil
	}
	if err := json.NewDecoder(resp.Body).Decode(respBody); err != nil {
		return fmt.Errorf("decode GET %s response: %w", path, err)
	}
	return nil
}