mirror of https://github.com/multica-ai/multica.git (synced 2026-07-05 13:29:44 +02:00)
* feat(autopilots): webhook delivery layer + idempotency / signature / replay (MUL-2334)
Splits "inbound webhook receipt" from "autopilot run creation" so we can
record duplicate attempts, signature outcomes, and ignored/skipped
deliveries — and replay a delivery on demand. v1 ingress wrote straight
into autopilot_run.trigger_payload, which collapsed the two concerns and
left run_only autopilots vulnerable to provider retry storms.
Backend only (PR1). UI Deliveries tab follows in PR2.
Schema (migration 093):
- autopilot_trigger.provider: 'generic' | 'github' (default 'generic').
- autopilot_trigger.signing_secret: nullable plaintext (HMAC needs it
cleartext; mirrors how webhook_token is stored).
- webhook_delivery: one row per inbound POST. Carries raw_body,
selected_headers, dedupe_key/source, signature_status,
autopilot_run_id, replayed_from_delivery_id, response_status / body.
- Partial unique index on (trigger_id, dedupe_key) excludes NULL and
'rejected' rows, so a wrong-secret 401 does NOT permanently block a
future retry with the same X-GitHub-Delivery once the operator fixes
the secret.
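Here is a rough in-memory Go model of which rows the partial unique index covers. It assumes the predicate is `dedupe_key IS NOT NULL AND status NOT IN ('rejected', 'failed')`; the `failed` exclusion comes from the follow-up review fix further down. `deliveryRow`, `participates` and `wouldConflict` are hypothetical names. In the migration, Postgres enforces the rule itself.

```go
package main

// deliveryRow is a hypothetical stand-in for a webhook_delivery row; only the
// columns the partial unique index looks at are modelled.
type deliveryRow struct {
	TriggerID string
	DedupeKey *string // nil models a NULL dedupe_key
	Status    string  // e.g. queued, dispatched, ignored, rejected, failed
}

// excludedStatuses mirrors the assumed index predicate: rows in these states
// do not reserve their (trigger_id, dedupe_key) pair.
var excludedStatuses = map[string]bool{"rejected": true, "failed": true}

// participates reports whether a row is covered by the partial unique index.
func participates(r deliveryRow) bool {
	return r.DedupeKey != nil && !excludedStatuses[r.Status]
}

// wouldConflict reports whether inserting candidate would raise a unique
// violation against existing rows.
func wouldConflict(existing []deliveryRow, candidate deliveryRow) bool {
	if !participates(candidate) {
		return false
	}
	for _, r := range existing {
		if participates(r) && r.TriggerID == candidate.TriggerID && *r.DedupeKey == *candidate.DedupeKey {
			return true
		}
	}
	return false
}
```

In this model, a 401 caused by a wrong secret leaves a `rejected` row that never blocks the provider's later retry. An accepted row with the same key does block it.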
Ingress flow (autopilot_webhook.go), persist-first + sync dispatch:
1. IP rate limit
2. token lookup
3. token rate limit
4. read raw body
5. autopilot/workspace cross-check
6. normalize JSON (400 without persistence on parse failure)
7. compute dedupe key + signature status
8. INSERT delivery (status=queued). On a (trigger_id, dedupe_key)
   unique violation: bump attempt_count on the existing row and return
   the original delivery_id + autopilot_run_id with 200.
9. invalid/missing signature: UPDATE -> rejected, return 401 with
   delivery_id (no dispatch, not replayable).
10. trigger disabled / autopilot paused/archived: UPDATE -> ignored,
    return 200.
11. DispatchAutopilot synchronously, UPDATE -> dispatched/skipped/failed
    with autopilot_run_id and the response body we returned.
12. TouchAutopilotTriggerFiredAt and return 200.
No new long-running worker. A stale 'queued' row only happens if the
process dies between INSERT and UPDATE; that's a follow-up sweeper, not
this PR.
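The order of steps 8 to 12 decides which outcome wins when several conditions hold at once. Below is a minimal Go sketch of that precedence, read literally from the numbered flow. `ingressState`, `decide` and the `"valid"` signature label are hypothetical. The sketch follows the later review fix, which records an admission-skipped run as `dispatched`. The flow does not state the HTTP status for the dispatch-failure branch, so the sketch returns 0 there.

```go
package main

// ingressState summarises what the handler knows once the delivery row
// insert has been attempted (steps 8 onward).
type ingressState struct {
	DuplicateOf     string // existing delivery_id on a unique violation, else ""
	SignatureStatus string // "missing" / "invalid" per the commit; "valid" assumed
	TriggerEnabled  bool
	AutopilotActive bool // false when the autopilot is paused or archived
	DispatchFailed  bool
}

// outcome is the delivery status written back plus the HTTP status returned.
// DeliveryStatus "" means no new row was finalised (duplicate path);
// HTTPStatus 0 means the flow does not specify it.
type outcome struct {
	DeliveryStatus string
	HTTPStatus     int
}

func decide(s ingressState) outcome {
	switch {
	case s.DuplicateOf != "":
		// Step 8: bump attempt_count on the original row and echo its IDs.
		return outcome{"", 200}
	case s.SignatureStatus == "invalid" || s.SignatureStatus == "missing":
		// Step 9: no dispatch, not replayable.
		return outcome{"rejected", 401}
	case !s.TriggerEnabled || !s.AutopilotActive:
		// Step 10.
		return outcome{"ignored", 200}
	case s.DispatchFailed:
		// Step 11, error branch.
		return outcome{"failed", 0}
	default:
		// Steps 11-12; an admission-skipped run also lands here.
		return outcome{"dispatched", 200}
	}
}
```

Read this way, a duplicate short-circuits at step 8, before the signature check. A rejection also takes precedence over the disabled/paused check.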
Authenticated API:
- GET /api/autopilots/{id}/deliveries (slim list)
- GET /api/autopilots/{id}/deliveries/{deliveryId} (with raw_body)
- POST /api/autopilots/{id}/deliveries/{deliveryId}/replay -> creates
a new delivery row (replayed_from_delivery_id set), dispatches a
new run, never collapses onto the original via dedupe.
- PUT /api/autopilots/{id}/triggers/{triggerId}/signing-secret -> write-only;
the trigger response surfaces has_signing_secret + signing_secret_hint
(last 4 chars), never the secret itself.
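One way to keep the secret write-only is to give the response type no field for it at all. The sketch below assumes a Go struct serialized with `encoding/json`. Only the `has_signing_secret` and `signing_secret_hint` names come from the commit message; `triggerResponse`, `newTriggerResponse` and `marshalTrigger` are hypothetical. An empty secret models the clear-by-empty PUT.

```go
package main

import "encoding/json"

// triggerResponse deliberately has no field for the secret itself, so it
// cannot be serialised by accident.
type triggerResponse struct {
	HasSigningSecret  bool    `json:"has_signing_secret"`
	SigningSecretHint *string `json:"signing_secret_hint,omitempty"`
}

// newTriggerResponse derives the public view from the stored secret.
func newTriggerResponse(secret string) triggerResponse {
	resp := triggerResponse{HasSigningSecret: secret != ""}
	r := []rune(secret)
	// Only expose a hint that is strictly shorter than the secret, so a very
	// short secret is never echoed in full.
	if len(r) > 4 {
		hint := string(r[len(r)-4:])
		resp.SigningSecretHint = &hint
	}
	return resp
}

// marshalTrigger renders the response as JSON.
func marshalTrigger(secret string) string {
	b, err := json.Marshal(newTriggerResponse(secret))
	if err != nil {
		panic(err) // not expected for this struct
	}
	return string(b)
}
```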
Signature verification reuses the GitHub-compatible
X-Hub-Signature-256: sha256=<hex(HMAC-SHA256(secret, body))> scheme; the
HMAC comparison is constant-time. Invalid/missing signatures still count
against per-IP and per-token rate limits.
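Below is a sketch of the X-Hub-Signature-256 check using Go's standard `crypto/hmac` package, whose `hmac.Equal` compares MACs in constant time. `signatureStatus` and `sign` are hypothetical helpers. `"invalid"` and `"missing"` mirror the states named in the commit message; `"valid"` and `"not_required"` are assumed labels.

```go
package main

import (
	"crypto/hmac"
	"crypto/sha256"
	"encoding/hex"
	"strings"
)

// signatureStatus classifies an X-Hub-Signature-256 header value against the
// trigger's signing secret.
func signatureStatus(body []byte, header, secret string) string {
	if secret == "" {
		return "not_required" // hypothetical label: no secret configured
	}
	if header == "" {
		return "missing"
	}
	if !strings.HasPrefix(header, "sha256=") {
		return "invalid"
	}
	got, err := hex.DecodeString(strings.TrimPrefix(header, "sha256="))
	if err != nil {
		return "invalid"
	}
	mac := hmac.New(sha256.New, []byte(secret))
	mac.Write(body)
	// hmac.Equal runs in constant time, so response timing does not reveal
	// how many leading bytes of the signature matched.
	if !hmac.Equal(mac.Sum(nil), got) {
		return "invalid"
	}
	return "valid"
}

// sign produces the header value a sender would attach.
func sign(body []byte, secret string) string {
	mac := hmac.New(sha256.New, []byte(secret))
	mac.Write(body)
	return "sha256=" + hex.EncodeToString(mac.Sum(nil))
}
```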
autopilot_run.trigger_payload is intentionally preserved — delivery
records the HTTP receipt; run records the normalized envelope handed
to the agent. They are two different views.
Tests (Postgres-backed):
- delivery persistence on accept
- dedupe via Idempotency-Key and X-GitHub-Delivery; run_only retry
storm pin (3 retries -> 1 run)
- invalid signature: 401 + rejected row + no run linkage
- missing signature when secret configured: 401 + 'missing' state
- valid signature dispatches
- signing secret never echoed in trigger responses; hint shows last 4
- min-length and clear-by-empty for signing secret PUT
- replay creates a NEW delivery + new run; rejected deliveries cannot
be replayed
- list omits raw_body; detail includes it; cross-autopilot ID returns
404 (workspace isolation defense in depth)
- provider validation: unknown -> 400, github -> 201 round-trips
- bad-signature stream still counts against per-token rate limit
Co-authored-by: multica-agent <github@multica.ai>
* fix(autopilots): address PR review on webhook delivery layer (MUL-2334)
- Exclude `failed` from the (trigger_id, dedupe_key) partial unique index
alongside `rejected`, so a transient ingress failure does not strand the
provider's stable X-GitHub-Delivery / Idempotency-Key retry. Update the
dedupe lookup to prefer non-terminal rows under the same predicate.
- Tighten delivery status enum: drop `skipped` from the CHECK constraint
and from the handler. A run that was admission-skipped (e.g. runtime
offline) is now recorded as delivery=`dispatched` linked to the
skipped run, with the response payload carrying status=`skipped`.
Source of truth for skipped-ness is autopilot_run.status, not the
delivery row — keeps the Deliveries UI enum unambiguous.
- On dispatch error, link the (possibly non-nil) autopilot_run returned
by DispatchAutopilot to the failed delivery so Deliveries UI can
navigate to the run row for debugging.
- Slim list projection: ListWebhookDeliveriesByAutopilot no longer pulls
raw_body / selected_headers / response_body — a 100-row page × 256 KiB
would otherwise round-trip ~25 MiB from Postgres per Deliveries reload.
Detail endpoint continues to return the full row.
- Fix backend CI: TestGetDelivery_ReturnsFullPayload now decodes the
response and asserts on the parsed raw_body instead of substring-
matching against an escaped JSON string; raise the test-suite default
webhook rate limits in TestMain so the shared 192.0.2.1 IP bucket
doesn't fill across the suite and leak 429s into unrelated tests.
- Add regression coverage for the dedupe-after-failure path.
cd server && go test ./... is green locally.
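The revised finalization rules reduce to a small mapping:

- admission-skipped runs become `dispatched` deliveries;
- a dispatch error still links any run that was created.

The Go sketch below assumes the dispatcher returns a possibly-nil run plus an error, and that the response payload echoes the run's status. `runResult`, `finalized`, `finalize` and `dispatchError` are hypothetical.

```go
package main

// dispatchError is a hypothetical error type for the sketch.
type dispatchError struct{ Reason string }

func (e dispatchError) Error() string { return e.Reason }

// runResult stands in for the run a dispatcher may return, even on error.
type runResult struct {
	RunID     string
	RunStatus string // autopilot_run.status, e.g. "skipped"
}

// finalized is what gets written back to the delivery row.
type finalized struct {
	DeliveryStatus string  // "dispatched" or "failed"
	AutopilotRunID *string // set whenever a run row exists
	ResponseStatus string  // assumed to echo the run's status
	Error          string
}

func finalize(run *runResult, dispatchErr error) finalized {
	var f finalized
	if run != nil {
		id := run.RunID
		f.AutopilotRunID = &id
		f.ResponseStatus = run.RunStatus
	}
	if dispatchErr != nil {
		// Keep the run link (if any) so the UI can navigate to it.
		f.DeliveryStatus = "failed"
		f.Error = dispatchErr.Error()
		return f
	}
	// Skipped-ness lives on autopilot_run.status, not on the delivery row.
	f.DeliveryStatus = "dispatched"
	return f
}
```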
Co-authored-by: multica-agent <github@multica.ai>
---------
Co-authored-by: multica-agent <github@multica.ai>
101 lines
3.9 KiB
SQL
-- =====================
-- Webhook Delivery
-- =====================

-- name: CreateWebhookDelivery :one
-- Inserts a delivery row. On dedupe-key collision the partial unique index
-- on (trigger_id, dedupe_key) raises 23505 (unique_violation) and the handler
-- treats it as "duplicate" rather than an error.
INSERT INTO webhook_delivery (
    workspace_id, autopilot_id, trigger_id, provider, event,
    dedupe_key, dedupe_source, signature_status, status,
    selected_headers, content_type, raw_body,
    replayed_from_delivery_id
) VALUES (
    $1, $2, $3, $4, $5,
    sqlc.narg('dedupe_key'), sqlc.narg('dedupe_source'), $6, $7,
    $8, sqlc.narg('content_type'), sqlc.narg('raw_body'),
    sqlc.narg('replayed_from_delivery_id')
) RETURNING *;

-- name: GetWebhookDelivery :one
SELECT * FROM webhook_delivery
WHERE id = $1;

-- name: GetWebhookDeliveryInWorkspace :one
-- Workspace-scoped read for authenticated detail / replay endpoints.
SELECT * FROM webhook_delivery
WHERE id = $1 AND workspace_id = $2;

-- name: GetWebhookDeliveryByTriggerAndDedupe :one
-- Looks up the existing delivery for a (trigger, dedupe_key) pair so that
-- duplicate requests return the original delivery_id / autopilot_run_id.
-- The partial unique index excludes terminal-but-not-successful statuses
-- (`rejected`, `failed`), so multiple such rows can coexist for the same
-- key. Prefer rows that are neither rejected nor failed: without the ORDER BY
-- we could return a stale rejection / failure even after the operator fixed
-- the cause and a fresh dispatch succeeded.
SELECT * FROM webhook_delivery
WHERE trigger_id = $1
  AND dedupe_key = $2
ORDER BY (status IN ('rejected', 'failed')), created_at DESC
LIMIT 1;

-- name: BumpWebhookDeliveryAttempt :one
-- On duplicate detection, bump attempt_count and refresh last_attempt_at on
-- the existing delivery so the UI / operator can see retry pressure without
-- creating a new row per attempt.
UPDATE webhook_delivery
SET attempt_count = attempt_count + 1,
    last_attempt_at = now()
WHERE id = $1
RETURNING *;

-- name: UpdateWebhookDeliveryDispatched :one
-- Finalises a delivery that produced an autopilot_run (including an
-- admission-skipped run) and links it. response_status is the HTTP status
-- we returned, recorded alongside so the operator can correlate logs.
UPDATE webhook_delivery
SET status = $2,
    autopilot_run_id = sqlc.narg('autopilot_run_id'),
    response_status = sqlc.narg('response_status'),
    response_body = sqlc.narg('response_body'),
    last_attempt_at = now()
WHERE id = $1
RETURNING *;

-- name: UpdateWebhookDeliveryTerminal :one
-- Finalises a delivery without an autopilot_run link (rejected, ignored,
-- failed). Separate query so callers can't accidentally drop the run_id when
-- they only meant to set status/error.
UPDATE webhook_delivery
SET status = $2,
    error = sqlc.narg('error'),
    response_status = sqlc.narg('response_status'),
    response_body = sqlc.narg('response_body'),
    last_attempt_at = now()
WHERE id = $1
RETURNING *;

-- name: ListWebhookDeliveriesByAutopilot :many
-- Workspace-scoped via the join so an autopilot_id from another workspace
-- cannot leak its deliveries. Newest first, paged by limit/offset.
--
-- Projection: large columns (`raw_body`, `selected_headers`, `response_body`)
-- are deliberately excluded. A 100-row page × 256 KiB raw_body would be
-- 25 MiB of bytes pulled from Postgres just to be dropped in the JSON
-- encoder, and the Deliveries tab would hit that on every reload. Detail
-- views fetch the full row via GetWebhookDelivery / GetWebhookDeliveryInWorkspace.
SELECT
    d.id, d.workspace_id, d.autopilot_id, d.trigger_id, d.provider, d.event,
    d.dedupe_key, d.dedupe_source, d.signature_status, d.status,
    d.attempt_count, d.content_type, d.response_status,
    d.autopilot_run_id, d.replayed_from_delivery_id, d.error,
    d.received_at, d.last_attempt_at, d.created_at
FROM webhook_delivery d
JOIN autopilot a ON a.id = d.autopilot_id
WHERE d.autopilot_id = $1
  AND a.workspace_id = $2
ORDER BY d.created_at DESC
LIMIT $3 OFFSET $4;
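The `CreateWebhookDelivery` comment says the handler treats SQLSTATE 23505 as a duplicate. Below is a dependency-free Go sketch of that path, which pairs the insert with `GetWebhookDeliveryByTriggerAndDedupe` and `BumpWebhookDeliveryAttempt`. `pgError` is a local stand-in for a driver error type with a `Code` field (pgx's `*pgconn.PgError` has one). The `store` interface, its simplified signatures and the in-memory `memStore` fake are hypothetical. The fake also assumes attempt_count starts at 1.

```go
package main

import (
	"errors"
	"strconv"
)

// pgError stands in for a driver error carrying a SQLSTATE code.
type pgError struct{ Code string }

func (e *pgError) Error() string { return "SQLSTATE " + e.Code }

const uniqueViolation = "23505"

// isUniqueViolation unwraps err looking for SQLSTATE 23505.
func isUniqueViolation(err error) bool {
	var pe *pgError
	return errors.As(err, &pe) && pe.Code == uniqueViolation
}

// store is a hypothetical, simplified view of the queries above.
type store interface {
	CreateWebhookDelivery(triggerID, dedupeKey string) (string, error)
	GetWebhookDeliveryByTriggerAndDedupe(triggerID, dedupeKey string) (deliveryID, runID string, err error)
	BumpWebhookDeliveryAttempt(deliveryID string) error
}

// receive inserts a delivery; on a dedupe collision it bumps the existing
// row and returns the original IDs instead of failing.
func receive(s store, triggerID, dedupeKey string) (deliveryID, runID string, duplicate bool, err error) {
	deliveryID, err = s.CreateWebhookDelivery(triggerID, dedupeKey)
	if err == nil {
		return deliveryID, "", false, nil
	}
	if !isUniqueViolation(err) {
		return "", "", false, err
	}
	deliveryID, runID, err = s.GetWebhookDeliveryByTriggerAndDedupe(triggerID, dedupeKey)
	if err != nil {
		return "", "", false, err
	}
	if err = s.BumpWebhookDeliveryAttempt(deliveryID); err != nil {
		return "", "", false, err
	}
	return deliveryID, runID, true, nil
}

// memStore is an in-memory fake: one row per (trigger, key), attempt_count
// starting at 1, and a run ID derived from the delivery ID for brevity.
type memStore struct {
	byKey    map[string]string
	attempts map[string]int
	next     int
}

func newMemStore() *memStore {
	return &memStore{byKey: map[string]string{}, attempts: map[string]int{}}
}

func (m *memStore) CreateWebhookDelivery(triggerID, dedupeKey string) (string, error) {
	k := triggerID + "|" + dedupeKey
	if _, exists := m.byKey[k]; exists {
		return "", &pgError{Code: uniqueViolation}
	}
	m.next++
	id := "d" + strconv.Itoa(m.next)
	m.byKey[k] = id
	m.attempts[id] = 1
	return id, nil
}

func (m *memStore) GetWebhookDeliveryByTriggerAndDedupe(triggerID, dedupeKey string) (string, string, error) {
	id, ok := m.byKey[triggerID+"|"+dedupeKey]
	if !ok {
		return "", "", errors.New("no rows")
	}
	return id, "run-" + id, nil
}

func (m *memStore) BumpWebhookDeliveryAttempt(deliveryID string) error {
	m.attempts[deliveryID]++
	return nil
}
```

With this fake, three POSTs carrying the same key yield one delivery with attempt_count 3. This is the shape of the run_only retry-storm test described in the commit message.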