Mirror of https://github.com/multica-ai/multica.git (synced 2026-06-17 03:38:32 +02:00)
* feat(autopilots): webhook delivery layer + idempotency / signature / replay (MUL-2334)
Splits "inbound webhook receipt" from "autopilot run creation" so we can
record duplicate attempts, signature outcomes, and ignored/skipped
deliveries — and replay a delivery on demand. v1 ingress wrote straight
into autopilot_run.trigger_payload, which collapsed the two concerns and
left run_only autopilots vulnerable to provider retry storms.
Backend only (PR1). UI Deliveries tab follows in PR2.
Schema (migration 093):
- autopilot_trigger.provider: 'generic' | 'github' (default 'generic').
- autopilot_trigger.signing_secret: nullable plaintext (HMAC needs it in
cleartext; mirrors how webhook_token is stored).
- webhook_delivery: one row per inbound POST. Carries raw_body,
selected_headers, dedupe_key/source, signature_status,
autopilot_run_id, replayed_from_delivery_id, response_status / body.
- Partial unique index on (trigger_id, dedupe_key) excludes NULL and
'rejected' rows, so a wrong-secret 401 does NOT permanently block a
future retry with the same X-GitHub-Delivery once the operator fixes
the secret.
Ingress flow (autopilot_webhook.go), persist-first + sync dispatch:
1. IP rate limit -> 2. token lookup -> 3. token rate limit ->
4. read raw body -> 5. autopilot/workspace cross-check ->
6. normalize JSON (400 without persistence on parse failure) ->
7. compute dedupe key + signature status ->
8. INSERT delivery (status=queued). On (trigger_id, dedupe_key)
unique-violation: bump attempt_count on existing row and return
the original delivery_id + autopilot_run_id with 200 ->
9. invalid/missing signature: UPDATE -> rejected, return 401 with
delivery_id (no dispatch, not replayable) ->
10. trigger disabled / autopilot paused/archived: UPDATE -> ignored,
return 200 ->
11. DispatchAutopilot synchronously, UPDATE -> dispatched/skipped/failed
with autopilot_run_id and the response body we returned ->
12. TouchAutopilotTriggerFiredAt and return 200.
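Steps 9 and 10 are pure decisions over state already computed by step 8, so they can be sketched as one function. The `gateInput` struct and the signature-status labels (`valid`, `invalid`, `missing`, `not_required`) are assumptions modelled on the states named in this message; the real handler also persists each outcome with an UPDATE.

```go
package main

import (
	"fmt"
	"net/http"
)

// gateInput is a hypothetical bundle of the state known after step 8.
type gateInput struct {
	SignatureStatus string // assumed labels: "valid", "invalid", "missing", "not_required"
	TriggerEnabled  bool
	AutopilotStatus string // "active", "paused", "archived"
}

// preDispatch returns the delivery status and HTTP code for the early-exit
// paths, or dispatch=true when the request should continue to step 11.
// The signature check runs first, so a bad signature on a paused autopilot
// is recorded as rejected (401), not ignored.
func preDispatch(in gateInput) (status string, code int, dispatch bool) {
	if in.SignatureStatus == "invalid" || in.SignatureStatus == "missing" {
		return "rejected", http.StatusUnauthorized, false
	}
	if !in.TriggerEnabled || in.AutopilotStatus != "active" {
		return "ignored", http.StatusOK, false
	}
	return "queued", 0, true // stays queued until DispatchAutopilot returns
}

func main() {
	fmt.Println(preDispatch(gateInput{SignatureStatus: "missing", TriggerEnabled: true, AutopilotStatus: "active"}))
	fmt.Println(preDispatch(gateInput{SignatureStatus: "valid", TriggerEnabled: true, AutopilotStatus: "paused"}))
	fmt.Println(preDispatch(gateInput{SignatureStatus: "not_required", TriggerEnabled: true, AutopilotStatus: "active"}))
}
```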
No new long-running worker. A stale 'queued' row only happens if the
process dies between INSERT and UPDATE; that's a follow-up sweeper, not
this PR.
Authenticated API:
- GET /api/autopilots/{id}/deliveries (slim list)
- GET /api/autopilots/{id}/deliveries/{deliveryId} (with raw_body)
- POST /api/autopilots/{id}/deliveries/{deliveryId}/replay -> creates
a new delivery row (replayed_from_delivery_id set), dispatches a
new run, never collapses onto the original via dedupe.
- PUT /api/autopilots/{id}/triggers/{triggerId}/signing-secret
Write-only; trigger response surfaces has_signing_secret +
signing_secret_hint (last 4 chars), never the secret itself.
Signature verification reuses the GitHub-compatible
X-Hub-Signature-256: sha256=<hex(hmac(body, secret))> scheme; the
HMAC helper is constant-time. Invalid/missing signatures still count
against per-IP and per-token rate limits.
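The GitHub-compatible check above needs only the standard library. A minimal sketch: `verifySignature256`, `sign`, and the status strings returned are illustrative names, not necessarily the handler's, and `strings.CutPrefix` requires Go 1.20 or newer.

```go
package main

import (
	"crypto/hmac"
	"crypto/sha256"
	"encoding/hex"
	"fmt"
	"strings"
)

// verifySignature256 checks an X-Hub-Signature-256 value of the form
// "sha256=<hex(HMAC-SHA256(body, secret))>". An empty header is "missing";
// a malformed value or a mismatch is "invalid".
func verifySignature256(body []byte, secret, header string) string {
	if header == "" {
		return "missing"
	}
	hexSig, ok := strings.CutPrefix(header, "sha256=")
	if !ok {
		return "invalid"
	}
	got, err := hex.DecodeString(hexSig)
	if err != nil {
		return "invalid"
	}
	mac := hmac.New(sha256.New, []byte(secret))
	mac.Write(body)
	// hmac.Equal compares in constant time, so response timing does not
	// reveal how many leading bytes of a forged signature were correct.
	if !hmac.Equal(got, mac.Sum(nil)) {
		return "invalid"
	}
	return "valid"
}

// sign computes the header value a sender such as GitHub would attach.
func sign(body []byte, secret string) string {
	mac := hmac.New(sha256.New, []byte(secret))
	mac.Write(body)
	return "sha256=" + hex.EncodeToString(mac.Sum(nil))
}

func main() {
	body := []byte(`{"action":"opened"}`)
	fmt.Println(verifySignature256(body, "s3cret", sign(body, "s3cret"))) // valid
	fmt.Println(verifySignature256(body, "s3cret", sign(body, "wrong")))  // invalid
}
```

The HMAC is computed over the raw body bytes as received, which is one reason step 4 reads the raw body before any JSON normalisation.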
autopilot_run.trigger_payload is intentionally preserved — delivery
records the HTTP receipt; run records the normalized envelope handed
to the agent. They are two different views.
Tests (Postgres-backed):
- delivery persistence on accept
- dedupe via Idempotency-Key and X-GitHub-Delivery; run_only retry
storm pin (3 retries -> 1 run)
- invalid signature: 401 + rejected row + no run linkage
- missing signature when secret configured: 401 + 'missing' state
- valid signature dispatches
- signing secret never echoed in trigger responses; hint shows last 4
- min-length and clear-by-empty for signing secret PUT
- replay creates a NEW delivery + new run; rejected deliveries cannot
be replayed
- list omits raw_body; detail includes it; cross-autopilot ID returns
404 (workspace isolation defense in depth)
- provider validation: unknown -> 400, github -> 201 round-trips
- bad-signature stream still counts against per-token rate limit
Co-authored-by: multica-agent <github@multica.ai>
* fix(autopilots): address PR review on webhook delivery layer (MUL-2334)
- Exclude `failed` from the (trigger_id, dedupe_key) partial unique index
alongside `rejected`, so a transient ingress failure does not strand the
provider's stable X-GitHub-Delivery / Idempotency-Key retry. Update the
dedupe lookup to prefer non-terminal rows under the same predicate.
- Tighten delivery status enum: drop `skipped` from the CHECK constraint
and from the handler. A run that was admission-skipped (e.g. runtime
offline) is now recorded as delivery=`dispatched` linked to the
skipped run, with the response payload carrying status=`skipped`.
Source of truth for skipped-ness is autopilot_run.status, not the
delivery row — keeps the Deliveries UI enum unambiguous.
- On dispatch error, link the (possibly non-nil) autopilot_run returned
by DispatchAutopilot to the failed delivery so Deliveries UI can
navigate to the run row for debugging.
- Slim list projection: ListWebhookDeliveriesByAutopilot no longer pulls
raw_body / selected_headers / response_body — a 100-row page × 256 KiB
would otherwise round-trip ~25 MiB from Postgres per Deliveries reload.
Detail endpoint continues to return the full row.
- Fix backend CI: TestGetDelivery_ReturnsFullPayload now decodes the
response and asserts on the parsed raw_body instead of substring-
matching against an escaped JSON string; raise the test-suite default
webhook rate limits in TestMain so the shared 192.0.2.1 IP bucket
doesn't fill across the suite and leak 429s into unrelated tests.
- Add regression coverage for the dedupe-after-failure path.
cd server && go test ./... is green locally.
Co-authored-by: multica-agent <github@multica.ai>
---------
Co-authored-by: multica-agent <github@multica.ai>
413 lines
14 KiB
Go
package handler

import (
	"encoding/json"
	"errors"
	"log/slog"
	"net/http"
	"strconv"

	"github.com/go-chi/chi/v5"
	"github.com/jackc/pgx/v5"
	"github.com/jackc/pgx/v5/pgtype"

	db "github.com/multica-ai/multica/server/pkg/db/generated"
)

// ── Response types ──────────────────────────────────────────────────────────

// WebhookDeliveryResponse is the authenticated-API view of webhook_delivery.
// The list endpoint returns these without `RawBody`, `SelectedHeaders`, or
// `ResponseBody` populated; the detail endpoint includes all three for
// debugging. We never echo the signing secret or token through this surface.
type WebhookDeliveryResponse struct {
	ID                     string  `json:"id"`
	WorkspaceID            string  `json:"workspace_id"`
	AutopilotID            string  `json:"autopilot_id"`
	TriggerID              string  `json:"trigger_id"`
	Provider               string  `json:"provider"`
	Event                  string  `json:"event"`
	DedupeKey              *string `json:"dedupe_key"`
	DedupeSource           *string `json:"dedupe_source"`
	SignatureStatus        string  `json:"signature_status"`
	Status                 string  `json:"status"`
	AttemptCount           int32   `json:"attempt_count"`
	ContentType            *string `json:"content_type"`
	ResponseStatus         *int32  `json:"response_status"`
	AutopilotRunID         *string `json:"autopilot_run_id"`
	ReplayedFromDeliveryID *string `json:"replayed_from_delivery_id"`
	Error                  *string `json:"error"`
	ReceivedAt             string  `json:"received_at"`
	LastAttemptAt          string  `json:"last_attempt_at"`
	CreatedAt              string  `json:"created_at"`

	// Detail-only fields. List responses leave these nil/empty so a page
	// of N deliveries never serialises ~N × 256 KiB of raw bodies. Detail
	// requests opt in by hitting GET /deliveries/{deliveryId}.
	SelectedHeaders json.RawMessage `json:"selected_headers,omitempty"`
	RawBody         *string         `json:"raw_body,omitempty"`
	ResponseBody    *string         `json:"response_body,omitempty"`
}

// slimDeliveryToResponse maps the projected list row (no raw_body /
// selected_headers / response_body) into the wire response shape.
func slimDeliveryToResponse(d db.ListWebhookDeliveriesByAutopilotRow) WebhookDeliveryResponse {
	resp := WebhookDeliveryResponse{
		ID:              uuidToString(d.ID),
		WorkspaceID:     uuidToString(d.WorkspaceID),
		AutopilotID:     uuidToString(d.AutopilotID),
		TriggerID:       uuidToString(d.TriggerID),
		Provider:        d.Provider,
		Event:           d.Event,
		DedupeKey:       textToPtr(d.DedupeKey),
		DedupeSource:    textToPtr(d.DedupeSource),
		SignatureStatus: d.SignatureStatus,
		Status:          d.Status,
		AttemptCount:    d.AttemptCount,
		ContentType:     textToPtr(d.ContentType),
		ReceivedAt:      timestampToString(d.ReceivedAt),
		LastAttemptAt:   timestampToString(d.LastAttemptAt),
		CreatedAt:       timestampToString(d.CreatedAt),
	}
	if d.ResponseStatus.Valid {
		v := d.ResponseStatus.Int32
		resp.ResponseStatus = &v
	}
	if d.AutopilotRunID.Valid {
		v := uuidToString(d.AutopilotRunID)
		resp.AutopilotRunID = &v
	}
	if d.ReplayedFromDeliveryID.Valid {
		v := uuidToString(d.ReplayedFromDeliveryID)
		resp.ReplayedFromDeliveryID = &v
	}
	if d.Error.Valid {
		v := d.Error.String
		resp.Error = &v
	}
	return resp
}

// deliveryToResponse maps a full webhook_delivery row into the wire shape.
// With detail=true it also fills the detail-only fields (selected_headers,
// raw_body, response_body).
func deliveryToResponse(d db.WebhookDelivery, detail bool) WebhookDeliveryResponse {
	resp := WebhookDeliveryResponse{
		ID:              uuidToString(d.ID),
		WorkspaceID:     uuidToString(d.WorkspaceID),
		AutopilotID:     uuidToString(d.AutopilotID),
		TriggerID:       uuidToString(d.TriggerID),
		Provider:        d.Provider,
		Event:           d.Event,
		DedupeKey:       textToPtr(d.DedupeKey),
		DedupeSource:    textToPtr(d.DedupeSource),
		SignatureStatus: d.SignatureStatus,
		Status:          d.Status,
		AttemptCount:    d.AttemptCount,
		ContentType:     textToPtr(d.ContentType),
		ReceivedAt:      timestampToString(d.ReceivedAt),
		LastAttemptAt:   timestampToString(d.LastAttemptAt),
		CreatedAt:       timestampToString(d.CreatedAt),
	}
	if d.ResponseStatus.Valid {
		v := d.ResponseStatus.Int32
		resp.ResponseStatus = &v
	}
	if d.AutopilotRunID.Valid {
		v := uuidToString(d.AutopilotRunID)
		resp.AutopilotRunID = &v
	}
	if d.ReplayedFromDeliveryID.Valid {
		v := uuidToString(d.ReplayedFromDeliveryID)
		resp.ReplayedFromDeliveryID = &v
	}
	if d.Error.Valid {
		v := d.Error.String
		resp.Error = &v
	}
	if detail {
		if len(d.SelectedHeaders) > 0 {
			resp.SelectedHeaders = json.RawMessage(d.SelectedHeaders)
		}
		if len(d.RawBody) > 0 {
			s := string(d.RawBody)
			resp.RawBody = &s
		}
		if d.ResponseBody.Valid {
			v := d.ResponseBody.String
			resp.ResponseBody = &v
		}
	}
	return resp
}

// ── Handlers ────────────────────────────────────────────────────────────────

// ListAutopilotDeliveries returns recent deliveries for an autopilot. Slim
// projection: selected_headers / raw_body / response_body are omitted to
// keep list responses small. Use GetAutopilotDelivery for the full payload.
func (h *Handler) ListAutopilotDeliveries(w http.ResponseWriter, r *http.Request) {
	autopilotID := chi.URLParam(r, "id")
	workspaceID := h.resolveWorkspaceID(r)

	autopilot, ok := h.loadAutopilotInWorkspace(w, r, autopilotID, workspaceID)
	if !ok {
		return
	}

	// Defaults: 20 rows from the start, capped at 100 per page. ParseInt with
	// bitSize 32 rejects values that do not fit in int32, so an oversized
	// ?limit= or ?offset= falls back to the default instead of wrapping to a
	// negative number that Postgres would refuse.
	limit := int32(20)
	offset := int32(0)
	if l := r.URL.Query().Get("limit"); l != "" {
		if v, err := strconv.ParseInt(l, 10, 32); err == nil && v > 0 {
			limit = int32(v)
		}
	}
	if limit > 100 {
		limit = 100
	}
	if o := r.URL.Query().Get("offset"); o != "" {
		if v, err := strconv.ParseInt(o, 10, 32); err == nil && v >= 0 {
			offset = int32(v)
		}
	}

	rows, err := h.Queries.ListWebhookDeliveriesByAutopilot(r.Context(), db.ListWebhookDeliveriesByAutopilotParams{
		AutopilotID: autopilot.ID,
		WorkspaceID: autopilot.WorkspaceID,
		Limit:       limit,
		Offset:      offset,
	})
	if err != nil {
		slog.Error("list deliveries failed", "error", err, "autopilot_id", autopilotID)
		writeError(w, http.StatusInternalServerError, "failed to list deliveries")
		return
	}

	resp := make([]WebhookDeliveryResponse, len(rows))
	for i, row := range rows {
		resp[i] = slimDeliveryToResponse(row)
	}
	// "total" is the number of rows in this page, not the autopilot's overall
	// delivery count.
	writeJSON(w, http.StatusOK, map[string]any{"deliveries": resp, "total": len(resp)})
}

// GetAutopilotDelivery returns one delivery in full, including the raw body
// and headers subset. Workspace-scoped via the autopilot lookup; the
// delivery is then re-checked to belong to that autopilot so a guessed
// delivery id from another autopilot or workspace cannot leak data.
func (h *Handler) GetAutopilotDelivery(w http.ResponseWriter, r *http.Request) {
	autopilotID := chi.URLParam(r, "id")
	deliveryID := chi.URLParam(r, "deliveryId")
	workspaceID := h.resolveWorkspaceID(r)

	autopilot, ok := h.loadAutopilotInWorkspace(w, r, autopilotID, workspaceID)
	if !ok {
		return
	}
	delivery, ok := h.loadDeliveryForAutopilot(w, r, autopilot, deliveryID)
	if !ok {
		return
	}
	writeJSON(w, http.StatusOK, deliveryToResponse(delivery, true))
}

// ReplayAutopilotDelivery creates a NEW delivery row from a prior one and
// dispatches the autopilot synchronously. The new row carries
// `replayed_from_delivery_id` so the operator can correlate. Replay is
// rejected for deliveries that originally failed signature verification:
// re-running an attack payload against the autopilot would defeat the
// purpose of rejecting it in the first place.
//
// Replays bypass per-trigger dedupe by inserting with a NULL dedupe_key.
// Reusing the original key would silently collapse the replay onto the prior
// delivery (the partial unique index would fire). This is intended: a replay
// explicitly means "run this again".
func (h *Handler) ReplayAutopilotDelivery(w http.ResponseWriter, r *http.Request) {
	autopilotID := chi.URLParam(r, "id")
	deliveryID := chi.URLParam(r, "deliveryId")
	workspaceID := h.resolveWorkspaceID(r)

	autopilot, ok := h.loadAutopilotInWorkspace(w, r, autopilotID, workspaceID)
	if !ok {
		return
	}
	original, ok := h.loadDeliveryForAutopilot(w, r, autopilot, deliveryID)
	if !ok {
		return
	}
	if original.Status == deliveryStatusRejected || original.SignatureStatus == sigStatusInvalid {
		writeError(w, http.StatusBadRequest, "cannot replay a delivery that failed signature verification")
		return
	}
	if len(original.RawBody) == 0 {
		writeError(w, http.StatusBadRequest, "original delivery has no raw body to replay")
		return
	}

	if autopilot.Status != "active" {
		writeError(w, http.StatusBadRequest, "autopilot is not active")
		return
	}

	// Only a missing trigger is a 404; any other lookup error is a server fault.
	trigRow, err := h.Queries.GetAutopilotTrigger(r.Context(), original.TriggerID)
	if err != nil {
		if errors.Is(err, pgx.ErrNoRows) {
			writeError(w, http.StatusNotFound, "trigger not found")
			return
		}
		slog.Error("replay: load trigger failed", "error", err, "trigger_id", uuidToString(original.TriggerID))
		writeError(w, http.StatusInternalServerError, "failed to load trigger")
		return
	}
	if !trigRow.Enabled {
		writeError(w, http.StatusBadRequest, "trigger is disabled")
		return
	}

	// Build the envelope from the stored raw body using the original headers
	// subset for event inference. SelectedHeaders is small and JSON-shaped, so
	// we decode it back into a header map to reuse the same normalize path.
	headers := headersFromSelected(original.SelectedHeaders)
	envelope, err := normalizeWebhookPayload(original.RawBody, headers)
	if err != nil {
		writeError(w, http.StatusBadRequest, "stored body no longer parses: "+err.Error())
		return
	}
	envelopeBytes, err := json.Marshal(envelope)
	if err != nil {
		writeError(w, http.StatusInternalServerError, "failed to encode envelope")
		return
	}

	contentType := ""
	if original.ContentType.Valid {
		contentType = original.ContentType.String
	}
	replay, err := h.Queries.CreateWebhookDelivery(r.Context(), db.CreateWebhookDeliveryParams{
		WorkspaceID:            autopilot.WorkspaceID,
		AutopilotID:            autopilot.ID,
		TriggerID:              original.TriggerID,
		Provider:               original.Provider,
		Event:                  envelope.Event,
		SignatureStatus:        sigStatusNotRequired,
		Status:                 deliveryStatusQueued,
		SelectedHeaders:        original.SelectedHeaders,
		ContentType:            pgtype.Text{String: contentType, Valid: contentType != ""},
		RawBody:                original.RawBody,
		ReplayedFromDeliveryID: original.ID,
	})
	if err != nil {
		slog.Error("replay: insert delivery failed",
			"error", err,
			"original_delivery_id", uuidToString(original.ID),
		)
		writeError(w, http.StatusInternalServerError, "failed to create replay delivery")
		return
	}

	run, dispatchErr := h.AutopilotService.DispatchAutopilot(
		r.Context(),
		autopilot,
		trigRow.ID,
		"webhook",
		envelopeBytes,
	)
	if dispatchErr != nil {
		respBody := map[string]any{"error": "failed to dispatch autopilot"}
		// DispatchAutopilot may return a non-nil run alongside an error
		// (see HandleAutopilotWebhook for the same rationale). Link the
		// run on the failed delivery so the Deliveries UI can surface it.
		if run != nil {
			h.finaliseDeliveryWithRun(r, replay.ID, deliveryStatusFailed, run.ID, http.StatusInternalServerError, respBody)
		} else {
			h.finaliseDeliveryTerminal(r, replay.ID, deliveryStatusFailed, http.StatusInternalServerError, respBody, dispatchErr.Error())
		}
		writeError(w, http.StatusInternalServerError, dispatchErr.Error())
		return
	}

	if err := h.Queries.TouchAutopilotTriggerFiredAt(r.Context(), trigRow.ID); err != nil {
		slog.Warn("replay: failed to touch last_fired_at", "trigger_id", uuidToString(trigRow.ID), "error", err)
	}

	// Delivery is always `dispatched` once a run is produced, even when the
	// run itself was skipped (e.g. runtime offline). See the comment in
	// HandleAutopilotWebhook for the rationale.
	respBody := map[string]any{
		"status":                    "accepted",
		"delivery_id":               uuidToString(replay.ID),
		"run_id":                    uuidToString(run.ID),
		"autopilot_id":              uuidToString(autopilot.ID),
		"trigger_id":                uuidToString(trigRow.ID),
		"replayed_from_delivery_id": uuidToString(original.ID),
	}
	if run.Status == "skipped" {
		respBody["status"] = "skipped"
		if run.FailureReason.Valid {
			respBody["reason"] = run.FailureReason.String
		}
	}
	h.finaliseDeliveryWithRun(r, replay.ID, deliveryStatusDispatched, run.ID, http.StatusCreated, respBody)

	// Prefer returning the finalised row; fall back to the bare response body
	// if the re-read fails.
	final, err := h.Queries.GetWebhookDelivery(r.Context(), replay.ID)
	if err != nil {
		writeJSON(w, http.StatusCreated, respBody)
		return
	}
	writeJSON(w, http.StatusCreated, deliveryToResponse(final, true))
}

// loadDeliveryForAutopilot returns the delivery row when it exists in the
// same workspace AND belongs to the given autopilot. Cross-autopilot and
// cross-workspace IDs are both returned as 404, as defense in depth against
// ID guessing.
func (h *Handler) loadDeliveryForAutopilot(w http.ResponseWriter, r *http.Request, autopilot db.Autopilot, deliveryID string) (db.WebhookDelivery, bool) {
	deliveryUUID, ok := parseUUIDOrBadRequest(w, deliveryID, "delivery id")
	if !ok {
		return db.WebhookDelivery{}, false
	}
	delivery, err := h.Queries.GetWebhookDeliveryInWorkspace(r.Context(), db.GetWebhookDeliveryInWorkspaceParams{
		ID:          deliveryUUID,
		WorkspaceID: autopilot.WorkspaceID,
	})
	if err != nil {
		if errors.Is(err, pgx.ErrNoRows) {
			writeError(w, http.StatusNotFound, "delivery not found")
			return db.WebhookDelivery{}, false
		}
		writeError(w, http.StatusInternalServerError, "failed to load delivery")
		return db.WebhookDelivery{}, false
	}
	if uuidToString(delivery.AutopilotID) != uuidToString(autopilot.ID) {
		writeError(w, http.StatusNotFound, "delivery not found")
		return db.WebhookDelivery{}, false
	}
	return delivery, true
}

// headersFromSelected decodes the small headers-subset blob back into an
// http.Header. Only used by the replay path. Fields we did not capture at
// ingress time are simply absent, which matches what would have happened if
// the original request had not sent them either.
func headersFromSelected(raw []byte) http.Header {
	out := http.Header{}
	if len(raw) == 0 {
		return out
	}
	var m map[string]any
	if err := json.Unmarshal(raw, &m); err != nil {
		return out
	}
	// canonical doubles as an allowlist: keys not listed here are dropped.
	// http.Header.Set re-canonicalises the name (for example
	// "X-Github-Event"), and Header.Get does the same, so lookups still match.
	canonical := map[string]string{
		"user-agent":        "User-Agent",
		"x-github-event":    "X-GitHub-Event",
		"x-github-delivery": "X-GitHub-Delivery",
		"x-gitlab-event":    "X-Gitlab-Event",
		"x-event-type":      "X-Event-Type",
		"idempotency-key":   "Idempotency-Key",
	}
	for k, v := range m {
		s, ok := v.(string)
		if !ok {
			continue
		}
		header := canonical[k]
		if header == "" {
			continue
		}
		out.Set(header, s)
	}
	return out
}