Files
multica/server/pkg/db/generated/webhook_delivery.sql.go
Bohan Jiang 2323b72710 feat(autopilots): webhook delivery layer + idempotency/signature/replay (MUL-2334) [PR1] (#2774)
* feat(autopilots): webhook delivery layer + idempotency / signature / replay (MUL-2334)

Splits "inbound webhook receipt" from "autopilot run creation" so we can
record duplicate attempts, signature outcomes, and ignored/skipped
deliveries — and replay a delivery on demand. v1 ingress wrote straight
into autopilot_run.trigger_payload, which collapsed the two concerns and
left run_only autopilots vulnerable to provider retry storms.

Backend only (PR1). UI Deliveries tab follows in PR2.

Schema (migration 093):
  - autopilot_trigger.provider: 'generic' | 'github' (default 'generic').
  - autopilot_trigger.signing_secret: nullable plaintext (HMAC needs it
    cleartext; mirrors how webhook_token is stored).
  - webhook_delivery: one row per inbound POST. Carries raw_body,
    selected_headers, dedupe_key/source, signature_status,
    autopilot_run_id, replayed_from_delivery_id, response_status / body.
  - Partial unique index on (trigger_id, dedupe_key) excludes NULL and
    'rejected' rows, so a wrong-secret 401 does NOT permanently block a
    future retry with the same X-GitHub-Delivery once the operator fixes
    the secret.

Ingress flow (autopilot_webhook.go), persist-first + sync dispatch:
  1. IP rate limit -> 2. token lookup -> 3. token rate limit ->
  4. read raw body -> 5. autopilot/workspace cross-check ->
  6. normalize JSON (400 without persistence on parse failure) ->
  7. compute dedupe key + signature status ->
  8. INSERT delivery (status=queued). On (trigger_id, dedupe_key)
     unique-violation: bump attempt_count on existing row and return
     the original delivery_id + autopilot_run_id with 200 ->
  9. invalid/missing signature: UPDATE -> rejected, return 401 with
     delivery_id (no dispatch, not replayable) ->
 10. trigger disabled / autopilot paused/archived: UPDATE -> ignored,
     return 200 ->
 11. DispatchAutopilot synchronously, UPDATE -> dispatched/skipped/failed
     with autopilot_run_id and the response body we returned ->
 12. TouchAutopilotTriggerFiredAt and return 200.

No new long-running worker. A stale 'queued' row only happens if the
process dies between INSERT and UPDATE; that's a follow-up sweeper, not
this PR.

Authenticated API:
  - GET    /api/autopilots/{id}/deliveries (slim list)
  - GET    /api/autopilots/{id}/deliveries/{deliveryId} (with raw_body)
  - POST   /api/autopilots/{id}/deliveries/{deliveryId}/replay -> creates
    a new delivery row (replayed_from_delivery_id set), dispatches a
    new run, never collapses onto the original via dedupe.
  - PUT    /api/autopilots/{id}/triggers/{triggerId}/signing-secret
    Write-only; trigger response surfaces has_signing_secret +
    signing_secret_hint (last 4 chars), never the secret itself.

Signature verification reuses the GitHub-compatible
X-Hub-Signature-256: sha256=<hex(hmac(body, secret))> scheme; the
HMAC helper is constant-time. Invalid/missing signatures still count
against per-IP and per-token rate limits.

autopilot_run.trigger_payload is intentionally preserved — delivery
records the HTTP receipt; run records the normalized envelope handed
to the agent. They are two different views.

Tests (Postgres-backed):
  - delivery persistence on accept
  - dedupe via Idempotency-Key and X-GitHub-Delivery; run_only retry
    storm pin (3 retries -> 1 run)
  - invalid signature: 401 + rejected row + no run linkage
  - missing signature when secret configured: 401 + 'missing' state
  - valid signature dispatches
  - signing secret never echoed in trigger responses; hint shows last 4
  - min-length and clear-by-empty for signing secret PUT
  - replay creates a NEW delivery + new run; rejected deliveries cannot
    be replayed
  - list omits raw_body; detail includes it; cross-autopilot ID returns
    404 (workspace isolation defense in depth)
  - provider validation: unknown -> 400, github -> 201 round-trips
  - bad-signature stream still counts against per-token rate limit

Co-authored-by: multica-agent <github@multica.ai>

* fix(autopilots): address PR review on webhook delivery layer (MUL-2334)

- Exclude `failed` from the (trigger_id, dedupe_key) partial unique index
  alongside `rejected`, so a transient ingress failure does not strand the
  provider's stable X-GitHub-Delivery / Idempotency-Key retry. Update the
  dedupe lookup to prefer non-terminal rows under the same predicate.
- Tighten delivery status enum: drop `skipped` from the CHECK constraint
  and from the handler. A run that was admission-skipped (e.g. runtime
  offline) is now recorded as delivery=`dispatched` linked to the
  skipped run, with the response payload carrying status=`skipped`.
  Source of truth for skipped-ness is autopilot_run.status, not the
  delivery row — keeps the Deliveries UI enum unambiguous.
- On dispatch error, link the (possibly non-nil) autopilot_run returned
  by DispatchAutopilot to the failed delivery so Deliveries UI can
  navigate to the run row for debugging.
- Slim list projection: ListWebhookDeliveriesByAutopilot no longer pulls
  raw_body / selected_headers / response_body — a 100-row page × 256 KiB
  would otherwise round-trip ~25 MiB from Postgres per Deliveries reload.
  Detail endpoint continues to return the full row.
- Fix backend CI: TestGetDelivery_ReturnsFullPayload now decodes the
  response and asserts on the parsed raw_body instead of substring-
  matching against an escaped JSON string; raise the test-suite default
  webhook rate limits in TestMain so the shared 192.0.2.1 IP bucket
  doesn't fill across the suite and leak 429s into unrelated tests.
- Add regression coverage for the dedupe-after-failure path.

cd server && go test ./... is green locally.

Co-authored-by: multica-agent <github@multica.ai>

---------

Co-authored-by: multica-agent <github@multica.ai>
2026-05-18 14:59:40 +08:00

474 lines
15 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
// Code generated by sqlc. DO NOT EDIT.
// versions:
// sqlc v1.30.0
// source: webhook_delivery.sql
package db
import (
"context"
"github.com/jackc/pgx/v5/pgtype"
)
const bumpWebhookDeliveryAttempt = `-- name: BumpWebhookDeliveryAttempt :one
UPDATE webhook_delivery
SET attempt_count = attempt_count + 1,
last_attempt_at = now()
WHERE id = $1
RETURNING id, workspace_id, autopilot_id, trigger_id, provider, event, dedupe_key, dedupe_source, signature_status, status, attempt_count, selected_headers, content_type, raw_body, response_status, response_body, autopilot_run_id, replayed_from_delivery_id, error, received_at, last_attempt_at, created_at
`
// On duplicate detection, bump attempt_count and refresh last_attempt_at on
// the existing delivery so the UI / operator can see retry pressure without
// creating a new row per attempt.
func (q *Queries) BumpWebhookDeliveryAttempt(ctx context.Context, id pgtype.UUID) (WebhookDelivery, error) {
row := q.db.QueryRow(ctx, bumpWebhookDeliveryAttempt, id)
var i WebhookDelivery
err := row.Scan(
&i.ID,
&i.WorkspaceID,
&i.AutopilotID,
&i.TriggerID,
&i.Provider,
&i.Event,
&i.DedupeKey,
&i.DedupeSource,
&i.SignatureStatus,
&i.Status,
&i.AttemptCount,
&i.SelectedHeaders,
&i.ContentType,
&i.RawBody,
&i.ResponseStatus,
&i.ResponseBody,
&i.AutopilotRunID,
&i.ReplayedFromDeliveryID,
&i.Error,
&i.ReceivedAt,
&i.LastAttemptAt,
&i.CreatedAt,
)
return i, err
}
const createWebhookDelivery = `-- name: CreateWebhookDelivery :one
INSERT INTO webhook_delivery (
workspace_id, autopilot_id, trigger_id, provider, event,
dedupe_key, dedupe_source, signature_status, status,
selected_headers, content_type, raw_body,
replayed_from_delivery_id
) VALUES (
$1, $2, $3, $4, $5,
$9, $10, $6, $7,
$8, $11, $12,
$13
) RETURNING id, workspace_id, autopilot_id, trigger_id, provider, event, dedupe_key, dedupe_source, signature_status, status, attempt_count, selected_headers, content_type, raw_body, response_status, response_body, autopilot_run_id, replayed_from_delivery_id, error, received_at, last_attempt_at, created_at
`
type CreateWebhookDeliveryParams struct {
WorkspaceID pgtype.UUID `json:"workspace_id"`
AutopilotID pgtype.UUID `json:"autopilot_id"`
TriggerID pgtype.UUID `json:"trigger_id"`
Provider string `json:"provider"`
Event string `json:"event"`
SignatureStatus string `json:"signature_status"`
Status string `json:"status"`
SelectedHeaders []byte `json:"selected_headers"`
DedupeKey pgtype.Text `json:"dedupe_key"`
DedupeSource pgtype.Text `json:"dedupe_source"`
ContentType pgtype.Text `json:"content_type"`
RawBody []byte `json:"raw_body"`
ReplayedFromDeliveryID pgtype.UUID `json:"replayed_from_delivery_id"`
}
// =====================
// Webhook Delivery
// =====================
// Inserts a delivery row. On dedupe-key collision the unique partial index
// (trigger_id, dedupe_key) raises 23505 and the handler treats it as
// "duplicate" rather than an error.
func (q *Queries) CreateWebhookDelivery(ctx context.Context, arg CreateWebhookDeliveryParams) (WebhookDelivery, error) {
row := q.db.QueryRow(ctx, createWebhookDelivery,
arg.WorkspaceID,
arg.AutopilotID,
arg.TriggerID,
arg.Provider,
arg.Event,
arg.SignatureStatus,
arg.Status,
arg.SelectedHeaders,
arg.DedupeKey,
arg.DedupeSource,
arg.ContentType,
arg.RawBody,
arg.ReplayedFromDeliveryID,
)
var i WebhookDelivery
err := row.Scan(
&i.ID,
&i.WorkspaceID,
&i.AutopilotID,
&i.TriggerID,
&i.Provider,
&i.Event,
&i.DedupeKey,
&i.DedupeSource,
&i.SignatureStatus,
&i.Status,
&i.AttemptCount,
&i.SelectedHeaders,
&i.ContentType,
&i.RawBody,
&i.ResponseStatus,
&i.ResponseBody,
&i.AutopilotRunID,
&i.ReplayedFromDeliveryID,
&i.Error,
&i.ReceivedAt,
&i.LastAttemptAt,
&i.CreatedAt,
)
return i, err
}
const getWebhookDelivery = `-- name: GetWebhookDelivery :one
SELECT id, workspace_id, autopilot_id, trigger_id, provider, event, dedupe_key, dedupe_source, signature_status, status, attempt_count, selected_headers, content_type, raw_body, response_status, response_body, autopilot_run_id, replayed_from_delivery_id, error, received_at, last_attempt_at, created_at FROM webhook_delivery
WHERE id = $1
`
func (q *Queries) GetWebhookDelivery(ctx context.Context, id pgtype.UUID) (WebhookDelivery, error) {
row := q.db.QueryRow(ctx, getWebhookDelivery, id)
var i WebhookDelivery
err := row.Scan(
&i.ID,
&i.WorkspaceID,
&i.AutopilotID,
&i.TriggerID,
&i.Provider,
&i.Event,
&i.DedupeKey,
&i.DedupeSource,
&i.SignatureStatus,
&i.Status,
&i.AttemptCount,
&i.SelectedHeaders,
&i.ContentType,
&i.RawBody,
&i.ResponseStatus,
&i.ResponseBody,
&i.AutopilotRunID,
&i.ReplayedFromDeliveryID,
&i.Error,
&i.ReceivedAt,
&i.LastAttemptAt,
&i.CreatedAt,
)
return i, err
}
const getWebhookDeliveryByTriggerAndDedupe = `-- name: GetWebhookDeliveryByTriggerAndDedupe :one
SELECT id, workspace_id, autopilot_id, trigger_id, provider, event, dedupe_key, dedupe_source, signature_status, status, attempt_count, selected_headers, content_type, raw_body, response_status, response_body, autopilot_run_id, replayed_from_delivery_id, error, received_at, last_attempt_at, created_at FROM webhook_delivery
WHERE trigger_id = $1
AND dedupe_key = $2
ORDER BY (status IN ('rejected', 'failed')), created_at DESC
LIMIT 1
`
type GetWebhookDeliveryByTriggerAndDedupeParams struct {
TriggerID pgtype.UUID `json:"trigger_id"`
DedupeKey pgtype.Text `json:"dedupe_key"`
}
// Looks up the existing delivery for a (trigger, dedupe_key) pair so that
// duplicate requests return the original delivery_id / autopilot_run_id.
// The partial unique index excludes terminal-but-not-successful statuses
// (`rejected`, `failed`), so multiple such rows can coexist for the same
// key. Prefer non-terminal rows in the lookup: without the ORDER BY we
// could return a stale rejection / failure even after the operator fixed
// the cause and a fresh dispatch succeeded.
func (q *Queries) GetWebhookDeliveryByTriggerAndDedupe(ctx context.Context, arg GetWebhookDeliveryByTriggerAndDedupeParams) (WebhookDelivery, error) {
row := q.db.QueryRow(ctx, getWebhookDeliveryByTriggerAndDedupe, arg.TriggerID, arg.DedupeKey)
var i WebhookDelivery
err := row.Scan(
&i.ID,
&i.WorkspaceID,
&i.AutopilotID,
&i.TriggerID,
&i.Provider,
&i.Event,
&i.DedupeKey,
&i.DedupeSource,
&i.SignatureStatus,
&i.Status,
&i.AttemptCount,
&i.SelectedHeaders,
&i.ContentType,
&i.RawBody,
&i.ResponseStatus,
&i.ResponseBody,
&i.AutopilotRunID,
&i.ReplayedFromDeliveryID,
&i.Error,
&i.ReceivedAt,
&i.LastAttemptAt,
&i.CreatedAt,
)
return i, err
}
const getWebhookDeliveryInWorkspace = `-- name: GetWebhookDeliveryInWorkspace :one
SELECT id, workspace_id, autopilot_id, trigger_id, provider, event, dedupe_key, dedupe_source, signature_status, status, attempt_count, selected_headers, content_type, raw_body, response_status, response_body, autopilot_run_id, replayed_from_delivery_id, error, received_at, last_attempt_at, created_at FROM webhook_delivery
WHERE id = $1 AND workspace_id = $2
`
type GetWebhookDeliveryInWorkspaceParams struct {
ID pgtype.UUID `json:"id"`
WorkspaceID pgtype.UUID `json:"workspace_id"`
}
// Workspace-scoped read for authenticated detail / replay endpoints.
func (q *Queries) GetWebhookDeliveryInWorkspace(ctx context.Context, arg GetWebhookDeliveryInWorkspaceParams) (WebhookDelivery, error) {
row := q.db.QueryRow(ctx, getWebhookDeliveryInWorkspace, arg.ID, arg.WorkspaceID)
var i WebhookDelivery
err := row.Scan(
&i.ID,
&i.WorkspaceID,
&i.AutopilotID,
&i.TriggerID,
&i.Provider,
&i.Event,
&i.DedupeKey,
&i.DedupeSource,
&i.SignatureStatus,
&i.Status,
&i.AttemptCount,
&i.SelectedHeaders,
&i.ContentType,
&i.RawBody,
&i.ResponseStatus,
&i.ResponseBody,
&i.AutopilotRunID,
&i.ReplayedFromDeliveryID,
&i.Error,
&i.ReceivedAt,
&i.LastAttemptAt,
&i.CreatedAt,
)
return i, err
}
const listWebhookDeliveriesByAutopilot = `-- name: ListWebhookDeliveriesByAutopilot :many
SELECT
d.id, d.workspace_id, d.autopilot_id, d.trigger_id, d.provider, d.event,
d.dedupe_key, d.dedupe_source, d.signature_status, d.status,
d.attempt_count, d.content_type, d.response_status,
d.autopilot_run_id, d.replayed_from_delivery_id, d.error,
d.received_at, d.last_attempt_at, d.created_at
FROM webhook_delivery d
JOIN autopilot a ON a.id = d.autopilot_id
WHERE d.autopilot_id = $1
AND a.workspace_id = $2
ORDER BY d.created_at DESC
LIMIT $3 OFFSET $4
`
type ListWebhookDeliveriesByAutopilotParams struct {
AutopilotID pgtype.UUID `json:"autopilot_id"`
WorkspaceID pgtype.UUID `json:"workspace_id"`
Limit int32 `json:"limit"`
Offset int32 `json:"offset"`
}
type ListWebhookDeliveriesByAutopilotRow struct {
ID pgtype.UUID `json:"id"`
WorkspaceID pgtype.UUID `json:"workspace_id"`
AutopilotID pgtype.UUID `json:"autopilot_id"`
TriggerID pgtype.UUID `json:"trigger_id"`
Provider string `json:"provider"`
Event string `json:"event"`
DedupeKey pgtype.Text `json:"dedupe_key"`
DedupeSource pgtype.Text `json:"dedupe_source"`
SignatureStatus string `json:"signature_status"`
Status string `json:"status"`
AttemptCount int32 `json:"attempt_count"`
ContentType pgtype.Text `json:"content_type"`
ResponseStatus pgtype.Int4 `json:"response_status"`
AutopilotRunID pgtype.UUID `json:"autopilot_run_id"`
ReplayedFromDeliveryID pgtype.UUID `json:"replayed_from_delivery_id"`
Error pgtype.Text `json:"error"`
ReceivedAt pgtype.Timestamptz `json:"received_at"`
LastAttemptAt pgtype.Timestamptz `json:"last_attempt_at"`
CreatedAt pgtype.Timestamptz `json:"created_at"`
}
// Workspace-scoped via the join so a runId from another workspace cannot
// leak. Newest first, paged by limit/offset.
//
// Projection: large columns (`raw_body`, `selected_headers`, `response_body`)
// are deliberately excluded. A 100-row page × 256 KiB raw_body would be
// 25 MiB of bytes pulled from Postgres just to be dropped in the JSON
// encoder — Deliveries tab would hit that on every reload. Detail views
// fetch the full row via GetWebhookDelivery / GetWebhookDeliveryInWorkspace.
func (q *Queries) ListWebhookDeliveriesByAutopilot(ctx context.Context, arg ListWebhookDeliveriesByAutopilotParams) ([]ListWebhookDeliveriesByAutopilotRow, error) {
rows, err := q.db.Query(ctx, listWebhookDeliveriesByAutopilot,
arg.AutopilotID,
arg.WorkspaceID,
arg.Limit,
arg.Offset,
)
if err != nil {
return nil, err
}
defer rows.Close()
items := []ListWebhookDeliveriesByAutopilotRow{}
for rows.Next() {
var i ListWebhookDeliveriesByAutopilotRow
if err := rows.Scan(
&i.ID,
&i.WorkspaceID,
&i.AutopilotID,
&i.TriggerID,
&i.Provider,
&i.Event,
&i.DedupeKey,
&i.DedupeSource,
&i.SignatureStatus,
&i.Status,
&i.AttemptCount,
&i.ContentType,
&i.ResponseStatus,
&i.AutopilotRunID,
&i.ReplayedFromDeliveryID,
&i.Error,
&i.ReceivedAt,
&i.LastAttemptAt,
&i.CreatedAt,
); err != nil {
return nil, err
}
items = append(items, i)
}
if err := rows.Err(); err != nil {
return nil, err
}
return items, nil
}
const updateWebhookDeliveryDispatched = `-- name: UpdateWebhookDeliveryDispatched :one
UPDATE webhook_delivery
SET status = $2,
autopilot_run_id = $3,
response_status = $4,
response_body = $5,
last_attempt_at = now()
WHERE id = $1
RETURNING id, workspace_id, autopilot_id, trigger_id, provider, event, dedupe_key, dedupe_source, signature_status, status, attempt_count, selected_headers, content_type, raw_body, response_status, response_body, autopilot_run_id, replayed_from_delivery_id, error, received_at, last_attempt_at, created_at
`
type UpdateWebhookDeliveryDispatchedParams struct {
ID pgtype.UUID `json:"id"`
Status string `json:"status"`
AutopilotRunID pgtype.UUID `json:"autopilot_run_id"`
ResponseStatus pgtype.Int4 `json:"response_status"`
ResponseBody pgtype.Text `json:"response_body"`
}
// Finalises a delivery that successfully created (or skipped to) an
// autopilot_run. response_status is the HTTP status we returned, recorded
// alongside so the operator can correlate logs.
func (q *Queries) UpdateWebhookDeliveryDispatched(ctx context.Context, arg UpdateWebhookDeliveryDispatchedParams) (WebhookDelivery, error) {
row := q.db.QueryRow(ctx, updateWebhookDeliveryDispatched,
arg.ID,
arg.Status,
arg.AutopilotRunID,
arg.ResponseStatus,
arg.ResponseBody,
)
var i WebhookDelivery
err := row.Scan(
&i.ID,
&i.WorkspaceID,
&i.AutopilotID,
&i.TriggerID,
&i.Provider,
&i.Event,
&i.DedupeKey,
&i.DedupeSource,
&i.SignatureStatus,
&i.Status,
&i.AttemptCount,
&i.SelectedHeaders,
&i.ContentType,
&i.RawBody,
&i.ResponseStatus,
&i.ResponseBody,
&i.AutopilotRunID,
&i.ReplayedFromDeliveryID,
&i.Error,
&i.ReceivedAt,
&i.LastAttemptAt,
&i.CreatedAt,
)
return i, err
}
const updateWebhookDeliveryTerminal = `-- name: UpdateWebhookDeliveryTerminal :one
UPDATE webhook_delivery
SET status = $2,
error = $3,
response_status = $4,
response_body = $5,
last_attempt_at = now()
WHERE id = $1
RETURNING id, workspace_id, autopilot_id, trigger_id, provider, event, dedupe_key, dedupe_source, signature_status, status, attempt_count, selected_headers, content_type, raw_body, response_status, response_body, autopilot_run_id, replayed_from_delivery_id, error, received_at, last_attempt_at, created_at
`
type UpdateWebhookDeliveryTerminalParams struct {
ID pgtype.UUID `json:"id"`
Status string `json:"status"`
Error pgtype.Text `json:"error"`
ResponseStatus pgtype.Int4 `json:"response_status"`
ResponseBody pgtype.Text `json:"response_body"`
}
// Finalises a delivery without an autopilot_run link — rejected, ignored,
// failed. Separate query so callers can't accidentally drop the run_id when
// they only meant to set status/error.
func (q *Queries) UpdateWebhookDeliveryTerminal(ctx context.Context, arg UpdateWebhookDeliveryTerminalParams) (WebhookDelivery, error) {
row := q.db.QueryRow(ctx, updateWebhookDeliveryTerminal,
arg.ID,
arg.Status,
arg.Error,
arg.ResponseStatus,
arg.ResponseBody,
)
var i WebhookDelivery
err := row.Scan(
&i.ID,
&i.WorkspaceID,
&i.AutopilotID,
&i.TriggerID,
&i.Provider,
&i.Event,
&i.DedupeKey,
&i.DedupeSource,
&i.SignatureStatus,
&i.Status,
&i.AttemptCount,
&i.SelectedHeaders,
&i.ContentType,
&i.RawBody,
&i.ResponseStatus,
&i.ResponseBody,
&i.AutopilotRunID,
&i.ReplayedFromDeliveryID,
&i.Error,
&i.ReceivedAt,
&i.LastAttemptAt,
&i.CreatedAt,
)
return i, err
}