multica/server/internal/handler/webhook_delivery.go
Bohan Jiang 2323b72710 feat(autopilots): webhook delivery layer + idempotency/signature/replay (MUL-2334) [PR1] (#2774)
* feat(autopilots): webhook delivery layer + idempotency / signature / replay (MUL-2334)

Splits "inbound webhook receipt" from "autopilot run creation" so we can
record duplicate attempts, signature outcomes, and ignored/skipped
deliveries — and replay a delivery on demand. v1 ingress wrote straight
into autopilot_run.trigger_payload, which collapsed the two concerns and
left run_only autopilots vulnerable to provider retry storms.

Backend only (PR1). UI Deliveries tab follows in PR2.

Schema (migration 093):
  - autopilot_trigger.provider: 'generic' | 'github' (default 'generic').
  - autopilot_trigger.signing_secret: nullable plaintext (HMAC needs it
    cleartext; mirrors how webhook_token is stored).
  - webhook_delivery: one row per inbound POST. Carries raw_body,
    selected_headers, dedupe_key/source, signature_status,
    autopilot_run_id, replayed_from_delivery_id, response_status / body.
  - Partial unique index on (trigger_id, dedupe_key) excludes NULL and
    'rejected' rows, so a wrong-secret 401 does NOT permanently block a
    future retry with the same X-GitHub-Delivery once the operator fixes
    the secret.
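The partial-index predicate is what lets a corrected retry through. The following is a minimal in-memory model of that predicate, not the migration itself. The type and function names (`deliveryRow`, `participates`, `conflicts`) are hypothetical, and an empty string stands in for SQL NULL. It also excludes `failed`, which the follow-up review fix further down adds alongside `rejected`.

```go
package main

import "fmt"

// deliveryRow is a hypothetical stand-in for the columns the index covers.
type deliveryRow struct {
	TriggerID string
	DedupeKey string // "" models a NULL dedupe_key
	Status    string
}

// excludedFromDedupe mirrors the index's WHERE clause: 'rejected' (migration
// 093) and 'failed' (review follow-up) rows do not occupy the unique slot.
var excludedFromDedupe = map[string]bool{"rejected": true, "failed": true}

// participates reports whether a row is covered by the partial index.
func participates(r deliveryRow) bool {
	return r.DedupeKey != "" && !excludedFromDedupe[r.Status]
}

// conflicts reports whether inserting next would violate the partial unique
// index on (trigger_id, dedupe_key), given the rows already stored.
func conflicts(existing []deliveryRow, next deliveryRow) bool {
	if !participates(next) {
		return false
	}
	for _, r := range existing {
		if participates(r) && r.TriggerID == next.TriggerID && r.DedupeKey == next.DedupeKey {
			return true
		}
	}
	return false
}

func main() {
	// A wrong-secret 401 left a 'rejected' row behind.
	rows := []deliveryRow{{TriggerID: "t1", DedupeKey: "gh-123", Status: "rejected"}}
	retry := deliveryRow{TriggerID: "t1", DedupeKey: "gh-123", Status: "queued"}
	fmt.Println(conflicts(rows, retry)) // false: the rejected row does not block the retry

	rows = append(rows, retry)
	fmt.Println(conflicts(rows, retry)) // true: a second copy collides with the live row
}
```

Because a new delivery is always inserted as `queued`, it always participates; only the status an existing row ends up in decides whether it keeps blocking later attempts with the same key.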

Ingress flow (autopilot_webhook.go), persist-first + sync dispatch:
  1. IP rate limit -> 2. token lookup -> 3. token rate limit ->
  4. read raw body -> 5. autopilot/workspace cross-check ->
  6. normalize JSON (400 without persistence on parse failure) ->
  7. compute dedupe key + signature status ->
  8. INSERT delivery (status=queued). On (trigger_id, dedupe_key)
     unique-violation: bump attempt_count on existing row and return
     the original delivery_id + autopilot_run_id with 200 ->
  9. invalid/missing signature: UPDATE -> rejected, return 401 with
     delivery_id (no dispatch, not replayable) ->
 10. trigger disabled / autopilot paused/archived: UPDATE -> ignored,
     return 200 ->
 11. DispatchAutopilot synchronously, UPDATE -> dispatched/skipped/failed
     with autopilot_run_id and the response body we returned ->
 12. TouchAutopilotTriggerFiredAt and return 200.

No new long-running worker. A stale 'queued' row only happens if the
process dies between INSERT and UPDATE; that's a follow-up sweeper, not
this PR.

Authenticated API:
  - GET    /api/autopilots/{id}/deliveries (slim list)
  - GET    /api/autopilots/{id}/deliveries/{deliveryId} (with raw_body)
  - POST   /api/autopilots/{id}/deliveries/{deliveryId}/replay -> creates
    a new delivery row (replayed_from_delivery_id set), dispatches a
    new run, never collapses onto the original via dedupe.
  - PUT    /api/autopilots/{id}/triggers/{triggerId}/signing-secret
    Write-only; trigger response surfaces has_signing_secret +
    signing_secret_hint (last 4 chars), never the secret itself.

Signature verification reuses the GitHub-compatible
X-Hub-Signature-256: sha256=<hex(HMAC-SHA256(key=secret, msg=body))>
scheme; the HMAC comparison helper is constant-time. Invalid or missing
signatures still count against the per-IP and per-token rate limits.
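A minimal sketch of that scheme using only the standard library: compute HMAC-SHA256 over the raw body keyed by the secret, hex-encode it behind a `sha256=` prefix, and compare with `hmac.Equal`, which runs in constant time. The function names are hypothetical, and the status strings `valid` and `not_required` are assumed spellings of the states the PR describes (`invalid` and `missing` appear verbatim above).

```go
package main

import (
	"crypto/hmac"
	"crypto/sha256"
	"encoding/hex"
	"fmt"
	"strings"
)

// signHubSignature256 produces the header value a sender would attach.
func signHubSignature256(body []byte, secret string) string {
	mac := hmac.New(sha256.New, []byte(secret))
	mac.Write(body)
	return "sha256=" + hex.EncodeToString(mac.Sum(nil))
}

// signatureStatus classifies an X-Hub-Signature-256 value for a raw body.
func signatureStatus(body []byte, secret, header string) string {
	if secret == "" {
		return "not_required" // no secret configured on the trigger
	}
	if header == "" {
		return "missing"
	}
	const prefix = "sha256="
	if !strings.HasPrefix(header, prefix) {
		return "invalid"
	}
	got, err := hex.DecodeString(strings.TrimPrefix(header, prefix))
	if err != nil {
		return "invalid"
	}
	mac := hmac.New(sha256.New, []byte(secret))
	mac.Write(body) // must be the exact bytes received, before JSON normalization
	if !hmac.Equal(got, mac.Sum(nil)) {
		return "invalid"
	}
	return "valid"
}

func main() {
	body := []byte(`{"action":"opened"}`)
	secret := "example-secret"
	fmt.Println(signatureStatus(body, secret, signHubSignature256(body, secret)))         // valid
	fmt.Println(signatureStatus(body, secret, signHubSignature256(body, "wrong-secret"))) // invalid
	fmt.Println(signatureStatus(body, secret, ""))                                        // missing
}
```

Verifying against the raw body rather than a re-encoded JSON value is what makes step 4 (read raw body) precede step 6 (normalize JSON) in the ingress flow.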

autopilot_run.trigger_payload is intentionally preserved — delivery
records the HTTP receipt; run records the normalized envelope handed
to the agent. They are two different views.

Tests (Postgres-backed):
  - delivery persistence on accept
  - dedupe via Idempotency-Key and X-GitHub-Delivery; run_only retry
    storm pin (3 retries -> 1 run)
  - invalid signature: 401 + rejected row + no run linkage
  - missing signature when secret configured: 401 + 'missing' state
  - valid signature dispatches
  - signing secret never echoed in trigger responses; hint shows last 4
  - min-length and clear-by-empty for signing secret PUT
  - replay creates a NEW delivery + new run; rejected deliveries cannot
    be replayed
  - list omits raw_body; detail includes it; cross-autopilot ID returns
    404 (workspace isolation defense in depth)
  - provider validation: unknown -> 400, github -> 201 round-trips
  - bad-signature stream still counts against per-token rate limit

Co-authored-by: multica-agent <github@multica.ai>

* fix(autopilots): address PR review on webhook delivery layer (MUL-2334)

- Exclude `failed` from the (trigger_id, dedupe_key) partial unique index
  alongside `rejected`, so a transient ingress failure does not strand the
  provider's stable X-GitHub-Delivery / Idempotency-Key retry. Update the
  dedupe lookup to prefer non-terminal rows under the same predicate.
- Tighten delivery status enum: drop `skipped` from the CHECK constraint
  and from the handler. A run that was admission-skipped (e.g. runtime
  offline) is now recorded as delivery=`dispatched` linked to the
  skipped run, with the response payload carrying status=`skipped`.
  Source of truth for skipped-ness is autopilot_run.status, not the
  delivery row — keeps the Deliveries UI enum unambiguous.
- On dispatch error, link the (possibly non-nil) autopilot_run returned
  by DispatchAutopilot to the failed delivery so Deliveries UI can
  navigate to the run row for debugging.
- Slim list projection: ListWebhookDeliveriesByAutopilot no longer pulls
  raw_body / selected_headers / response_body — a 100-row page × 256 KiB
  would otherwise round-trip ~25 MiB from Postgres per Deliveries reload.
  Detail endpoint continues to return the full row.
- Fix backend CI: TestGetDelivery_ReturnsFullPayload now decodes the
  response and asserts on the parsed raw_body instead of substring-
  matching against an escaped JSON string; raise the test-suite default
  webhook rate limits in TestMain so the shared 192.0.2.1 IP bucket
  doesn't fill across the suite and leak 429s into unrelated tests.
- Add regression coverage for the dedupe-after-failure path.

cd server && go test ./... is green locally.

Co-authored-by: multica-agent <github@multica.ai>

---------

Co-authored-by: multica-agent <github@multica.ai>
2026-05-18 14:59:40 +08:00

package handler

import (
	"encoding/json"
	"errors"
	"log/slog"
	"net/http"
	"strconv"

	"github.com/go-chi/chi/v5"
	"github.com/jackc/pgx/v5"
	"github.com/jackc/pgx/v5/pgtype"

	db "github.com/multica-ai/multica/server/pkg/db/generated"
)

// ── Response types ──────────────────────────────────────────────────────────

// WebhookDeliveryResponse is the authenticated-API view of webhook_delivery.
// The list endpoint returns these without `SelectedHeaders`, `RawBody`, or
// `ResponseBody` populated; the detail endpoint includes all three for
// debugging. We never echo the signing secret or token through this surface.
type WebhookDeliveryResponse struct {
	ID                     string  `json:"id"`
	WorkspaceID            string  `json:"workspace_id"`
	AutopilotID            string  `json:"autopilot_id"`
	TriggerID              string  `json:"trigger_id"`
	Provider               string  `json:"provider"`
	Event                  string  `json:"event"`
	DedupeKey              *string `json:"dedupe_key"`
	DedupeSource           *string `json:"dedupe_source"`
	SignatureStatus        string  `json:"signature_status"`
	Status                 string  `json:"status"`
	AttemptCount           int32   `json:"attempt_count"`
	ContentType            *string `json:"content_type"`
	ResponseStatus         *int32  `json:"response_status"`
	AutopilotRunID         *string `json:"autopilot_run_id"`
	ReplayedFromDeliveryID *string `json:"replayed_from_delivery_id"`
	Error                  *string `json:"error"`
	ReceivedAt             string  `json:"received_at"`
	LastAttemptAt          string  `json:"last_attempt_at"`
	CreatedAt              string  `json:"created_at"`

	// Detail-only fields. List responses leave these nil/empty so a page
	// of N deliveries never serialises ~N × 256 KiB of raw bodies. Detail
	// requests opt in by hitting GET /deliveries/{deliveryId}.
	SelectedHeaders json.RawMessage `json:"selected_headers,omitempty"`
	RawBody         *string         `json:"raw_body,omitempty"`
	ResponseBody    *string         `json:"response_body,omitempty"`
}

// slimDeliveryToResponse maps the projected list row (no raw_body /
// selected_headers / response_body) into the wire response shape.
func slimDeliveryToResponse(d db.ListWebhookDeliveriesByAutopilotRow) WebhookDeliveryResponse {
	resp := WebhookDeliveryResponse{
		ID:              uuidToString(d.ID),
		WorkspaceID:     uuidToString(d.WorkspaceID),
		AutopilotID:     uuidToString(d.AutopilotID),
		TriggerID:       uuidToString(d.TriggerID),
		Provider:        d.Provider,
		Event:           d.Event,
		DedupeKey:       textToPtr(d.DedupeKey),
		DedupeSource:    textToPtr(d.DedupeSource),
		SignatureStatus: d.SignatureStatus,
		Status:          d.Status,
		AttemptCount:    d.AttemptCount,
		ContentType:     textToPtr(d.ContentType),
		ReceivedAt:      timestampToString(d.ReceivedAt),
		LastAttemptAt:   timestampToString(d.LastAttemptAt),
		CreatedAt:       timestampToString(d.CreatedAt),
	}
	if d.ResponseStatus.Valid {
		v := d.ResponseStatus.Int32
		resp.ResponseStatus = &v
	}
	if d.AutopilotRunID.Valid {
		v := uuidToString(d.AutopilotRunID)
		resp.AutopilotRunID = &v
	}
	if d.ReplayedFromDeliveryID.Valid {
		v := uuidToString(d.ReplayedFromDeliveryID)
		resp.ReplayedFromDeliveryID = &v
	}
	if d.Error.Valid {
		v := d.Error.String
		resp.Error = &v
	}
	return resp
}

// deliveryToResponse maps a full webhook_delivery row into the wire shape.
// When detail is true it also populates SelectedHeaders, RawBody, and
// ResponseBody; otherwise those fields stay empty and are omitted from JSON.
func deliveryToResponse(d db.WebhookDelivery, detail bool) WebhookDeliveryResponse {
	resp := WebhookDeliveryResponse{
		ID:              uuidToString(d.ID),
		WorkspaceID:     uuidToString(d.WorkspaceID),
		AutopilotID:     uuidToString(d.AutopilotID),
		TriggerID:       uuidToString(d.TriggerID),
		Provider:        d.Provider,
		Event:           d.Event,
		DedupeKey:       textToPtr(d.DedupeKey),
		DedupeSource:    textToPtr(d.DedupeSource),
		SignatureStatus: d.SignatureStatus,
		Status:          d.Status,
		AttemptCount:    d.AttemptCount,
		ContentType:     textToPtr(d.ContentType),
		ReceivedAt:      timestampToString(d.ReceivedAt),
		LastAttemptAt:   timestampToString(d.LastAttemptAt),
		CreatedAt:       timestampToString(d.CreatedAt),
	}
	if d.ResponseStatus.Valid {
		v := d.ResponseStatus.Int32
		resp.ResponseStatus = &v
	}
	if d.AutopilotRunID.Valid {
		v := uuidToString(d.AutopilotRunID)
		resp.AutopilotRunID = &v
	}
	if d.ReplayedFromDeliveryID.Valid {
		v := uuidToString(d.ReplayedFromDeliveryID)
		resp.ReplayedFromDeliveryID = &v
	}
	if d.Error.Valid {
		v := d.Error.String
		resp.Error = &v
	}
	if detail {
		if len(d.SelectedHeaders) > 0 {
			resp.SelectedHeaders = json.RawMessage(d.SelectedHeaders)
		}
		if len(d.RawBody) > 0 {
			s := string(d.RawBody)
			resp.RawBody = &s
		}
		if d.ResponseBody.Valid {
			v := d.ResponseBody.String
			resp.ResponseBody = &v
		}
	}
	return resp
}

// ── Handlers ────────────────────────────────────────────────────────────────

// ListAutopilotDeliveries returns recent deliveries for an autopilot. It uses
// a slim projection: selected_headers, raw_body, and response_body are
// omitted to keep list responses small. Use GetAutopilotDelivery for the
// full payload.
func (h *Handler) ListAutopilotDeliveries(w http.ResponseWriter, r *http.Request) {
	autopilotID := chi.URLParam(r, "id")
	workspaceID := h.resolveWorkspaceID(r)
	autopilot, ok := h.loadAutopilotInWorkspace(w, r, autopilotID, workspaceID)
	if !ok {
		return
	}

	// Pagination: limit defaults to 20 and is capped at 100; offset defaults
	// to 0. Clamp before converting to int32 so an oversized query value
	// cannot wrap into a negative LIMIT/OFFSET that Postgres would reject.
	limit := int32(20)
	offset := int32(0)
	if l := r.URL.Query().Get("limit"); l != "" {
		if v, err := strconv.Atoi(l); err == nil && v > 0 {
			if v > 100 {
				v = 100
			}
			limit = int32(v)
		}
	}
	if o := r.URL.Query().Get("offset"); o != "" {
		// bitSize 32 rejects values outside int32; those fall back to 0.
		if v, err := strconv.ParseInt(o, 10, 32); err == nil && v >= 0 {
			offset = int32(v)
		}
	}

	rows, err := h.Queries.ListWebhookDeliveriesByAutopilot(r.Context(), db.ListWebhookDeliveriesByAutopilotParams{
		AutopilotID: autopilot.ID,
		WorkspaceID: autopilot.WorkspaceID,
		Limit:       limit,
		Offset:      offset,
	})
	if err != nil {
		slog.Error("list deliveries failed", "error", err, "autopilot_id", autopilotID)
		writeError(w, http.StatusInternalServerError, "failed to list deliveries")
		return
	}
	resp := make([]WebhookDeliveryResponse, len(rows))
	for i, row := range rows {
		resp[i] = slimDeliveryToResponse(row)
	}
	// "total" is the number of rows in this page, not the overall count.
	writeJSON(w, http.StatusOK, map[string]any{"deliveries": resp, "total": len(resp)})
}

// GetAutopilotDelivery returns one delivery in full, including the raw body
// and headers subset. It is workspace-scoped via the autopilot lookup, and
// the delivery is then re-checked to belong to that autopilot, so a guessed
// delivery ID from another autopilot or workspace cannot leak data.
func (h *Handler) GetAutopilotDelivery(w http.ResponseWriter, r *http.Request) {
	autopilotID := chi.URLParam(r, "id")
	deliveryID := chi.URLParam(r, "deliveryId")
	workspaceID := h.resolveWorkspaceID(r)
	autopilot, ok := h.loadAutopilotInWorkspace(w, r, autopilotID, workspaceID)
	if !ok {
		return
	}
	delivery, ok := h.loadDeliveryForAutopilot(w, r, autopilot, deliveryID)
	if !ok {
		return
	}
	writeJSON(w, http.StatusOK, deliveryToResponse(delivery, true))
}

// ReplayAutopilotDelivery creates a NEW delivery row from a prior one and
// dispatches the autopilot synchronously. The new row carries
// `replayed_from_delivery_id` so the operator can correlate the two. Replay
// is rejected for deliveries that originally failed signature verification:
// re-running an attack payload against the autopilot would defeat the
// original rejection.
//
// Replays bypass per-trigger dedupe by inserting with a NULL dedupe_key.
// Reusing the original key would silently collapse the replay onto the prior
// delivery (the partial unique index would fire). This is intentional: a
// replay explicitly means "run this again".
func (h *Handler) ReplayAutopilotDelivery(w http.ResponseWriter, r *http.Request) {
	autopilotID := chi.URLParam(r, "id")
	deliveryID := chi.URLParam(r, "deliveryId")
	workspaceID := h.resolveWorkspaceID(r)
	autopilot, ok := h.loadAutopilotInWorkspace(w, r, autopilotID, workspaceID)
	if !ok {
		return
	}
	original, ok := h.loadDeliveryForAutopilot(w, r, autopilot, deliveryID)
	if !ok {
		return
	}

	// Guard rails: never replay a rejected or bad-signature delivery, and
	// only replay into an active autopilot through an enabled trigger.
	if original.Status == deliveryStatusRejected || original.SignatureStatus == sigStatusInvalid {
		writeError(w, http.StatusBadRequest, "cannot replay a delivery that failed signature verification")
		return
	}
	if len(original.RawBody) == 0 {
		writeError(w, http.StatusBadRequest, "original delivery has no raw body to replay")
		return
	}
	if autopilot.Status != "active" {
		writeError(w, http.StatusBadRequest, "autopilot is not active")
		return
	}
	trigRow, err := h.Queries.GetAutopilotTrigger(r.Context(), original.TriggerID)
	if err != nil {
		// Only a missing row is a 404; other errors are server-side failures.
		if errors.Is(err, pgx.ErrNoRows) {
			writeError(w, http.StatusNotFound, "trigger not found")
			return
		}
		slog.Error("replay: load trigger failed",
			"error", err,
			"trigger_id", uuidToString(original.TriggerID),
		)
		writeError(w, http.StatusInternalServerError, "failed to load trigger")
		return
	}
	if !trigRow.Enabled {
		writeError(w, http.StatusBadRequest, "trigger is disabled")
		return
	}

	// Build the envelope from the stored raw body using the original headers
	// subset for event inference. SelectedHeaders is small and JSON-shaped,
	// so we decode it back into a header map to reuse the same normalize path.
	headers := headersFromSelected(original.SelectedHeaders)
	envelope, err := normalizeWebhookPayload(original.RawBody, headers)
	if err != nil {
		writeError(w, http.StatusBadRequest, "stored body no longer parses: "+err.Error())
		return
	}
	envelopeBytes, err := json.Marshal(envelope)
	if err != nil {
		writeError(w, http.StatusInternalServerError, "failed to encode envelope")
		return
	}
	contentType := ""
	if original.ContentType.Valid {
		contentType = original.ContentType.String
	}

	// DedupeKey is left unset (NULL) so the replay never collides with the
	// original row under the partial unique index.
	replay, err := h.Queries.CreateWebhookDelivery(r.Context(), db.CreateWebhookDeliveryParams{
		WorkspaceID:            autopilot.WorkspaceID,
		AutopilotID:            autopilot.ID,
		TriggerID:              original.TriggerID,
		Provider:               original.Provider,
		Event:                  envelope.Event,
		SignatureStatus:        sigStatusNotRequired,
		Status:                 deliveryStatusQueued,
		SelectedHeaders:        original.SelectedHeaders,
		ContentType:            pgtype.Text{String: contentType, Valid: contentType != ""},
		RawBody:                original.RawBody,
		ReplayedFromDeliveryID: original.ID,
	})
	if err != nil {
		slog.Error("replay: insert delivery failed",
			"error", err,
			"original_delivery_id", uuidToString(original.ID),
		)
		writeError(w, http.StatusInternalServerError, "failed to create replay delivery")
		return
	}

	run, dispatchErr := h.AutopilotService.DispatchAutopilot(
		r.Context(),
		autopilot,
		trigRow.ID,
		"webhook",
		envelopeBytes,
	)
	if dispatchErr != nil {
		respBody := map[string]any{"error": "failed to dispatch autopilot"}
		// DispatchAutopilot may return a non-nil run alongside an error
		// (see HandleAutopilotWebhook for the same rationale). Link the
		// run on the failed delivery so the Deliveries UI can surface it.
		if run != nil {
			h.finaliseDeliveryWithRun(r, replay.ID, deliveryStatusFailed, run.ID, http.StatusInternalServerError, respBody)
		} else {
			h.finaliseDeliveryTerminal(r, replay.ID, deliveryStatusFailed, http.StatusInternalServerError, respBody, dispatchErr.Error())
		}
		writeError(w, http.StatusInternalServerError, dispatchErr.Error())
		return
	}
	if err := h.Queries.TouchAutopilotTriggerFiredAt(r.Context(), trigRow.ID); err != nil {
		slog.Warn("replay: failed to touch last_fired_at", "trigger_id", uuidToString(trigRow.ID), "error", err)
	}

	// The delivery is always `dispatched` once a run is produced, even when
	// the run itself was skipped (e.g. runtime offline). See the comment
	// in HandleAutopilotWebhook for the rationale.
	respBody := map[string]any{
		"status":                    "accepted",
		"delivery_id":               uuidToString(replay.ID),
		"run_id":                    uuidToString(run.ID),
		"autopilot_id":              uuidToString(autopilot.ID),
		"trigger_id":                uuidToString(trigRow.ID),
		"replayed_from_delivery_id": uuidToString(original.ID),
	}
	if run.Status == "skipped" {
		respBody["status"] = "skipped"
		if run.FailureReason.Valid {
			respBody["reason"] = run.FailureReason.String
		}
	}
	h.finaliseDeliveryWithRun(r, replay.ID, deliveryStatusDispatched, run.ID, http.StatusCreated, respBody)

	// Prefer returning the finalised row; fall back to the raw response
	// body if the re-read fails.
	final, err := h.Queries.GetWebhookDelivery(r.Context(), replay.ID)
	if err != nil {
		writeJSON(w, http.StatusCreated, respBody)
		return
	}
	writeJSON(w, http.StatusCreated, deliveryToResponse(final, true))
}

// loadDeliveryForAutopilot returns the delivery row when it exists in the
// same workspace AND belongs to the given autopilot. Cross-autopilot and
// cross-workspace IDs are both reported as 404, as defense in depth against
// ID guessing.
func (h *Handler) loadDeliveryForAutopilot(w http.ResponseWriter, r *http.Request, autopilot db.Autopilot, deliveryID string) (db.WebhookDelivery, bool) {
	deliveryUUID, ok := parseUUIDOrBadRequest(w, deliveryID, "delivery id")
	if !ok {
		return db.WebhookDelivery{}, false
	}
	delivery, err := h.Queries.GetWebhookDeliveryInWorkspace(r.Context(), db.GetWebhookDeliveryInWorkspaceParams{
		ID:          deliveryUUID,
		WorkspaceID: autopilot.WorkspaceID,
	})
	if err != nil {
		if errors.Is(err, pgx.ErrNoRows) {
			writeError(w, http.StatusNotFound, "delivery not found")
			return db.WebhookDelivery{}, false
		}
		slog.Error("load delivery failed", "error", err, "delivery_id", deliveryID)
		writeError(w, http.StatusInternalServerError, "failed to load delivery")
		return db.WebhookDelivery{}, false
	}
	if uuidToString(delivery.AutopilotID) != uuidToString(autopilot.ID) {
		writeError(w, http.StatusNotFound, "delivery not found")
		return db.WebhookDelivery{}, false
	}
	return delivery, true
}

// headersFromSelected decodes the small headers-subset blob back into an
// http.Header. Only the replay path uses it. Fields we did not capture at
// ingress time are simply absent, which matches what would have happened if
// the original request had not sent them either.
func headersFromSelected(raw []byte) http.Header {
	out := http.Header{}
	if len(raw) == 0 {
		return out
	}
	var m map[string]any
	if err := json.Unmarshal(raw, &m); err != nil {
		return out
	}
	// Allowlist of stored (lowercase) keys. http.Header.Set canonicalizes
	// keys via textproto.CanonicalMIMEHeaderKey ("X-GitHub-Event" is stored
	// as "X-Github-Event"), and Get canonicalizes its argument the same way,
	// so the mixed-case values here only select which keys are kept.
	canonical := map[string]string{
		"user-agent":        "User-Agent",
		"x-github-event":    "X-GitHub-Event",
		"x-github-delivery": "X-GitHub-Delivery",
		"x-gitlab-event":    "X-Gitlab-Event",
		"x-event-type":      "X-Event-Type",
		"idempotency-key":   "Idempotency-Key",
	}
	for k, v := range m {
		s, ok := v.(string)
		if !ok {
			continue
		}
		header := canonical[k]
		if header == "" {
			continue
		}
		out.Set(header, s)
	}
	return out
}
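Because `http.Header.Set` and `http.Header.Get` both canonicalize keys, the `canonical` map in `headersFromSelected` effectively acts as an allowlist. The standalone sketch below reproduces that reconstruction with a plain allowlist to show the behavior: non-string values and unlisted keys are dropped, and lookups succeed regardless of casing. `rebuildReplayHeaders` and `allowedReplayHeaders` are hypothetical names, and the assumption that keys are stored lowercase is inferred from the map's keys above.

```go
package main

import (
	"encoding/json"
	"fmt"
	"net/http"
)

// allowedReplayHeaders mirrors the keys headersFromSelected recognises,
// assuming the ingress path stores them lowercase.
var allowedReplayHeaders = map[string]bool{
	"user-agent": true, "x-github-event": true, "x-github-delivery": true,
	"x-gitlab-event": true, "x-event-type": true, "idempotency-key": true,
}

// rebuildReplayHeaders turns a stored JSON headers subset into an http.Header.
func rebuildReplayHeaders(raw []byte) http.Header {
	out := http.Header{}
	var m map[string]any
	if err := json.Unmarshal(raw, &m); err != nil {
		return out // empty or malformed blob: no headers
	}
	for k, v := range m {
		s, ok := v.(string)
		if !ok || !allowedReplayHeaders[k] {
			continue // drop non-string values and unlisted headers
		}
		out.Set(k, s) // Set canonicalizes: "x-github-event" -> "X-Github-Event"
	}
	return out
}

func main() {
	h := rebuildReplayHeaders([]byte(`{"x-github-event":"push","x-signature":"drop-me","idempotency-key":42}`))
	fmt.Println(h.Get("X-GitHub-Event")) // push
	fmt.Println(len(h))                  // 1
}
```

Keeping the replay headers to this allowlist means signature headers are never carried into the replayed request, which is consistent with the replay row being recorded as not requiring a signature.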