multica/server/internal/handler/webhook_delivery_test.go
Bohan Jiang 2323b72710 feat(autopilots): webhook delivery layer + idempotency/signature/replay (MUL-2334) [PR1] (#2774)
* feat(autopilots): webhook delivery layer + idempotency / signature / replay (MUL-2334)

Splits "inbound webhook receipt" from "autopilot run creation" so we can
record duplicate attempts, signature outcomes, and ignored/skipped
deliveries — and replay a delivery on demand. v1 ingress wrote straight
into autopilot_run.trigger_payload, which collapsed the two concerns and
left run_only autopilots vulnerable to provider retry storms.

Backend only (PR1). UI Deliveries tab follows in PR2.

Schema (migration 093):
  - autopilot_trigger.provider: 'generic' | 'github' (default 'generic').
  - autopilot_trigger.signing_secret: nullable plaintext (HMAC needs it
    cleartext; mirrors how webhook_token is stored).
  - webhook_delivery: one row per inbound POST. Carries raw_body,
    selected_headers, dedupe_key/source, signature_status,
    autopilot_run_id, replayed_from_delivery_id, response_status / body.
  - Partial unique index on (trigger_id, dedupe_key) excludes NULL and
    'rejected' rows, so a wrong-secret 401 does NOT permanently block a
    future retry with the same X-GitHub-Delivery once the operator fixes
    the secret.
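
The effect of the partial unique index can be modelled in a few lines of Go. This is only an in-memory sketch, not the migration itself: the `delivery` struct, the `table` type, and the `indexed` helper are hypothetical names, and an empty `DedupeKey` stands in for SQL NULL. The predicate below also excludes `failed`, which the review fix later in this message adds alongside `rejected`.

```go
package main

import (
	"errors"
	"fmt"
)

// delivery is a hypothetical, trimmed-down stand-in for a webhook_delivery row.
// An empty DedupeKey models SQL NULL.
type delivery struct {
	TriggerID string
	DedupeKey string
	Status    string
}

var errDuplicate = errors.New("unique violation on (trigger_id, dedupe_key)")

// indexed mirrors the partial-index predicate: rows without a dedupe key and
// rows in 'rejected' or 'failed' state are not part of the unique index.
func indexed(d delivery) bool {
	return d.DedupeKey != "" && d.Status != "rejected" && d.Status != "failed"
}

// table is an in-memory model of the delivery table.
type table struct{ rows []delivery }

// insert appends d unless another indexed row already holds the same
// (TriggerID, DedupeKey) pair.
func (t *table) insert(d delivery) error {
	if indexed(d) {
		for _, r := range t.rows {
			if indexed(r) && r.TriggerID == d.TriggerID && r.DedupeKey == d.DedupeKey {
				return errDuplicate
			}
		}
	}
	t.rows = append(t.rows, d)
	return nil
}

func main() {
	var tbl table
	first := delivery{TriggerID: "t1", DedupeKey: "abc-123", Status: "queued"}
	fmt.Println(tbl.insert(first))  // first attempt is stored
	fmt.Println(tbl.insert(first))  // provider retry collides with the live row
	tbl.rows[0].Status = "rejected" // wrong secret: the row leaves the index
	fmt.Println(tbl.insert(first))  // retry after the secret is fixed is stored
}
```

Keeping the exclusion in the index predicate, rather than deleting rejected rows, means the audit trail of bad attempts survives while the dedupe slot is freed.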

Ingress flow (autopilot_webhook.go), persist-first + sync dispatch:
  1. IP rate limit -> 2. token lookup -> 3. token rate limit ->
  4. read raw body -> 5. autopilot/workspace cross-check ->
  6. normalize JSON (400 without persistence on parse failure) ->
  7. compute dedupe key + signature status ->
  8. INSERT delivery (status=queued). On (trigger_id, dedupe_key)
     unique-violation: bump attempt_count on existing row and return
     the original delivery_id + autopilot_run_id with 200 ->
  9. invalid/missing signature: UPDATE -> rejected, return 401 with
     delivery_id (no dispatch, not replayable) ->
 10. trigger disabled / autopilot paused/archived: UPDATE -> ignored,
     return 200 ->
 11. DispatchAutopilot synchronously, UPDATE -> dispatched/skipped/failed
     with autopilot_run_id and the response body we returned ->
 12. TouchAutopilotTriggerFiredAt and return 200.
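
Steps 9 and 10 above amount to a small gating decision taken after the delivery row is persisted. A minimal sketch, assuming a hypothetical `gateDelivery` helper and `gate` struct (neither name comes from the handler); the status strings and HTTP codes are the ones listed in the flow:

```go
package main

import (
	"fmt"
	"net/http"
)

// gate is a hypothetical result type for the pre-dispatch checks.
type gate struct {
	Status     string // delivery status to persist; "queued" means keep going
	HTTPStatus int    // 0 when the response is only decided after dispatch
	Dispatch   bool   // whether the handler should go on to dispatch a run
}

// gateDelivery applies step 9 (signature) before step 10 (trigger/autopilot
// state), matching the order in the flow above.
func gateDelivery(signatureStatus string, triggerEnabled bool, autopilotStatus string) gate {
	if signatureStatus == "invalid" || signatureStatus == "missing" {
		return gate{Status: "rejected", HTTPStatus: http.StatusUnauthorized}
	}
	if !triggerEnabled || autopilotStatus == "paused" || autopilotStatus == "archived" {
		return gate{Status: "ignored", HTTPStatus: http.StatusOK}
	}
	return gate{Status: "queued", Dispatch: true}
}

func main() {
	fmt.Printf("%+v\n", gateDelivery("invalid", true, "active"))
	fmt.Printf("%+v\n", gateDelivery("not_required", true, "paused"))
	fmt.Printf("%+v\n", gateDelivery("valid", true, "active"))
}
```

Checking the signature first means an unauthenticated caller learns nothing about whether the autopilot is paused or archived.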

No new long-running worker. A stale 'queued' row only happens if the
process dies between INSERT and UPDATE; that's a follow-up sweeper, not
this PR.

Authenticated API:
  - GET    /api/autopilots/{id}/deliveries (slim list)
  - GET    /api/autopilots/{id}/deliveries/{deliveryId} (with raw_body)
  - POST   /api/autopilots/{id}/deliveries/{deliveryId}/replay -> creates
    a new delivery row (replayed_from_delivery_id set), dispatches a
    new run, never collapses onto the original via dedupe.
  - PUT    /api/autopilots/{id}/triggers/{triggerId}/signing-secret
    Write-only; trigger response surfaces has_signing_secret +
    signing_secret_hint (last 4 chars), never the secret itself.
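
The write-only contract for the signing secret can be illustrated as follows. This is a sketch under assumptions: `secretView` and `describeSecret` are hypothetical names, and only the two JSON field names are taken from the description above.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// secretView is a hypothetical projection of the stored secret onto the two
// fields the trigger response is described as exposing.
type secretView struct {
	HasSigningSecret  bool   `json:"has_signing_secret"`
	SigningSecretHint string `json:"signing_secret_hint,omitempty"`
}

// describeSecret never returns the secret itself, only whether one is set and
// its last four bytes as a recognition hint.
func describeSecret(secret string) secretView {
	if secret == "" {
		return secretView{}
	}
	hint := secret
	if len(hint) > 4 {
		hint = hint[len(hint)-4:]
	}
	return secretView{HasSigningSecret: true, SigningSecretHint: hint}
}

func main() {
	out, err := json.Marshal(describeSecret("this-is-a-test-secret-32-chars-x"))
	if err != nil {
		panic(err)
	}
	fmt.Println(string(out))
}
```

The hint is sliced by bytes, which matches how the test in this file builds its expected value.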

Signature verification reuses the GitHub-compatible
X-Hub-Signature-256: sha256=<hex(hmac(body, secret))> scheme; the
HMAC helper is constant-time. Invalid/missing signatures still count
against per-IP and per-token rate limits.
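
A minimal sketch of the verification scheme, using only the Go standard library. The function names `sign` and `verifySignature` are hypothetical and do not come from the handler; the four returned states are the signature_status values the tests assert on. `hmac.Equal` provides the constant-time comparison.

```go
package main

import (
	"crypto/hmac"
	"crypto/sha256"
	"encoding/hex"
	"fmt"
	"strings"
)

const sigPrefix = "sha256="

// sign builds the header value "sha256=<hex(HMAC-SHA256(secret, body))>".
func sign(secret string, body []byte) string {
	mac := hmac.New(sha256.New, []byte(secret))
	mac.Write(body)
	return sigPrefix + hex.EncodeToString(mac.Sum(nil))
}

// verifySignature classifies an X-Hub-Signature-256 header value as
// "not_required", "missing", "invalid", or "valid".
func verifySignature(secret, header string, body []byte) string {
	if secret == "" {
		return "not_required"
	}
	if header == "" {
		return "missing"
	}
	if !strings.HasPrefix(header, sigPrefix) {
		return "invalid"
	}
	got, err := hex.DecodeString(header[len(sigPrefix):])
	if err != nil {
		return "invalid"
	}
	mac := hmac.New(sha256.New, []byte(secret))
	mac.Write(body)
	// hmac.Equal compares in constant time to avoid leaking timing information.
	if !hmac.Equal(got, mac.Sum(nil)) {
		return "invalid"
	}
	return "valid"
}

func main() {
	body := []byte(`{"hello":"world"}`)
	secret := "this-is-a-test-secret-32-chars-x"
	fmt.Println(verifySignature(secret, sign(secret, body), body))
	fmt.Println(verifySignature(secret, "sha256=deadbeef", body))
	fmt.Println(verifySignature(secret, "", body))
	fmt.Println(verifySignature("", "", body))
}
```

The signature must be computed over the raw request bytes, not a re-serialised form, because any change in whitespace or key order changes the digest.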

autopilot_run.trigger_payload is intentionally preserved — delivery
records the HTTP receipt; run records the normalized envelope handed
to the agent. They are two different views.

Tests (Postgres-backed):
  - delivery persistence on accept
  - dedupe via Idempotency-Key and X-GitHub-Delivery; run_only retry
    storm pin (3 retries -> 1 run)
  - invalid signature: 401 + rejected row + no run linkage
  - missing signature when secret configured: 401 + 'missing' state
  - valid signature dispatches
  - signing secret never echoed in trigger responses; hint shows last 4
  - min-length and clear-by-empty for signing secret PUT
  - replay creates a NEW delivery + new run; rejected deliveries cannot
    be replayed
  - list omits raw_body; detail includes it; cross-autopilot ID returns
    404 (workspace isolation defense in depth)
  - provider validation: unknown -> 400, github -> 201 round-trips
  - bad-signature stream still counts against per-token rate limit

Co-authored-by: multica-agent <github@multica.ai>

* fix(autopilots): address PR review on webhook delivery layer (MUL-2334)

- Exclude `failed` from the (trigger_id, dedupe_key) partial unique index
  alongside `rejected`, so a transient ingress failure does not strand the
  provider's stable X-GitHub-Delivery / Idempotency-Key retry. Update the
  dedupe lookup to prefer non-terminal rows under the same predicate.
- Tighten delivery status enum: drop `skipped` from the CHECK constraint
  and from the handler. A run that was admission-skipped (e.g. runtime
  offline) is now recorded as delivery=`dispatched` linked to the
  skipped run, with the response payload carrying status=`skipped`.
  Source of truth for skipped-ness is autopilot_run.status, not the
  delivery row — keeps the Deliveries UI enum unambiguous.
- On dispatch error, link the (possibly non-nil) autopilot_run returned
  by DispatchAutopilot to the failed delivery so Deliveries UI can
  navigate to the run row for debugging.
- Slim list projection: ListWebhookDeliveriesByAutopilot no longer pulls
  raw_body / selected_headers / response_body — a 100-row page × 256 KiB
  would otherwise round-trip ~25 MiB from Postgres per Deliveries reload.
  Detail endpoint continues to return the full row.
- Fix backend CI: TestGetDelivery_ReturnsFullPayload now decodes the
  response and asserts on the parsed raw_body instead of substring-
  matching against an escaped JSON string; raise the test-suite default
  webhook rate limits in TestMain so the shared 192.0.2.1 IP bucket
  doesn't fill across the suite and leak 429s into unrelated tests.
- Add regression coverage for the dedupe-after-failure path.
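
The ~25 MiB figure in the slim-list bullet follows from simple arithmetic. The sketch below assumes every row on a 100-row page carries a raw_body at the 256 KiB size quoted above; `pageBytes` is a hypothetical helper name.

```go
package main

import "fmt"

const (
	kib = 1024
	mib = 1024 * kib
)

// pageBytes returns the worst-case payload size of one list page when every
// row carries a body of maxBodyBytes.
func pageBytes(rows, maxBodyBytes int) int {
	return rows * maxBodyBytes
}

func main() {
	total := pageBytes(100, 256*kib)
	fmt.Printf("%d bytes = %d MiB\n", total, total/mib) // 26214400 bytes = 25 MiB
}
```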

cd server && go test ./... is green locally.

Co-authored-by: multica-agent <github@multica.ai>

2026-05-18 14:59:40 +08:00


package handler
import (
"bytes"
"context"
"crypto/hmac"
"crypto/sha256"
"encoding/hex"
"encoding/json"
"fmt"
"net/http"
"net/http/httptest"
"testing"
"github.com/jackc/pgx/v5/pgtype"
db "github.com/multica-ai/multica/server/pkg/db/generated"
)
// ── Setup helpers ───────────────────────────────────────────────────────────
const testSigningSecret = "this-is-a-test-secret-32-chars-x"
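// setSigningSecretViaHandler sets the trigger's signing secret through the
// PUT handler and fails the test on any non-200 response.
func setSigningSecretViaHandler(t *testing.T, apID, triggerID, secret string) {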
t.Helper()
w := httptest.NewRecorder()
req := newRequest("PUT", fmt.Sprintf("/api/autopilots/%s/triggers/%s/signing-secret", apID, triggerID), map[string]any{
"signing_secret": secret,
})
req = withURLParams(req, "id", apID, "triggerId", triggerID)
testHandler.SetAutopilotTriggerSigningSecret(w, req)
if w.Code != http.StatusOK {
t.Fatalf("set signing secret: %d body=%s", w.Code, w.Body.String())
}
}
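// setTriggerProvider updates the trigger's provider column directly in the
// database, bypassing the HTTP handlers.
func setTriggerProvider(t *testing.T, triggerID, provider string) {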
t.Helper()
if _, err := testPool.Exec(context.Background(),
`UPDATE autopilot_trigger SET provider = $1 WHERE id = $2`, provider, triggerID); err != nil {
t.Fatalf("set provider: %v", err)
}
}
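// signBody returns the X-Hub-Signature-256 header value for body:
// "sha256=" followed by hex(HMAC-SHA256(secret, body)).
func signBody(secret string, body []byte) string {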
mac := hmac.New(sha256.New, []byte(secret))
mac.Write(body)
return "sha256=" + hex.EncodeToString(mac.Sum(nil))
}
// listDeliveries calls ListAutopilotDeliveries and decodes the body.
func listDeliveries(t *testing.T, apID string) []map[string]any {
t.Helper()
w := httptest.NewRecorder()
req := newRequest("GET", "/api/autopilots/"+apID+"/deliveries", nil)
req = withURLParam(req, "id", apID)
testHandler.ListAutopilotDeliveries(w, req)
if w.Code != http.StatusOK {
t.Fatalf("list deliveries: %d body=%s", w.Code, w.Body.String())
}
var resp struct {
Deliveries []map[string]any `json:"deliveries"`
}
if err := json.Unmarshal(w.Body.Bytes(), &resp); err != nil {
t.Fatalf("decode: %v", err)
}
return resp.Deliveries
}
// ── Tests ───────────────────────────────────────────────────────────────────
func TestWebhookHandler_PersistsDeliveryOnAccept(t *testing.T) {
agentID := createWebhookTestAgent(t, "DeliveryPersist Agent")
apID := createWebhookTestAutopilot(t, agentID, "active", "run_only")
trig := createWebhookTriggerViaHandler(t, apID)
w := postWebhook(t, *trig.WebhookToken, map[string]any{"hello": "world"}, nil)
if w.Code != http.StatusOK {
t.Fatalf("status: %d body=%s", w.Code, w.Body.String())
}
var resp map[string]any
json.Unmarshal(w.Body.Bytes(), &resp)
if resp["delivery_id"] == nil {
t.Fatal("response should include delivery_id")
}
if resp["status"] != "accepted" {
t.Fatalf("status: %v", resp["status"])
}
deliveries := listDeliveries(t, apID)
if len(deliveries) != 1 {
t.Fatalf("expected 1 delivery, got %d", len(deliveries))
}
d := deliveries[0]
if d["status"] != "dispatched" {
t.Fatalf("delivery status: %v", d["status"])
}
if d["autopilot_run_id"] == nil {
t.Fatal("delivery should link to run")
}
if d["signature_status"] != "not_required" {
t.Fatalf("expected signature_status=not_required, got %v", d["signature_status"])
}
}
func TestWebhookHandler_DedupeViaIdempotencyKey(t *testing.T) {
agentID := createWebhookTestAgent(t, "DeliveryIdem Agent")
apID := createWebhookTestAutopilot(t, agentID, "active", "run_only")
trig := createWebhookTriggerViaHandler(t, apID)
body := map[string]any{"event": "demo.x", "eventPayload": map[string]any{"k": "v"}}
headers := map[string]string{"Idempotency-Key": "demo-key-1"}
w1 := postWebhook(t, *trig.WebhookToken, body, headers)
if w1.Code != http.StatusOK {
t.Fatalf("first: %d body=%s", w1.Code, w1.Body.String())
}
var r1 map[string]any
json.Unmarshal(w1.Body.Bytes(), &r1)
if r1["status"] != "accepted" {
t.Fatalf("first status: %v", r1["status"])
}
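// Checked assertions: a missing field fails the test instead of panicking.
firstDeliveryID, _ := r1["delivery_id"].(string)
firstRunID, _ := r1["run_id"].(string)
if firstDeliveryID == "" || firstRunID == "" {
t.Fatalf("first response missing delivery_id/run_id: %s", w1.Body.String())
}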
// Second identical delivery should be a duplicate.
w2 := postWebhook(t, *trig.WebhookToken, body, headers)
if w2.Code != http.StatusOK {
t.Fatalf("second: %d body=%s", w2.Code, w2.Body.String())
}
var r2 map[string]any
json.Unmarshal(w2.Body.Bytes(), &r2)
if r2["status"] != "duplicate" {
t.Fatalf("expected duplicate, got %v body=%s", r2["status"], w2.Body.String())
}
if r2["delivery_id"] != firstDeliveryID {
t.Fatalf("duplicate delivery_id mismatch: %v != %v", r2["delivery_id"], firstDeliveryID)
}
if r2["run_id"] != firstRunID {
t.Fatalf("duplicate run_id mismatch: %v != %v", r2["run_id"], firstRunID)
}
// Only one delivery should exist; attempt_count must be 2.
deliveries := listDeliveries(t, apID)
if len(deliveries) != 1 {
t.Fatalf("expected 1 delivery (dedupe), got %d", len(deliveries))
}
if n, ok := deliveries[0]["attempt_count"].(float64); !ok || n != 2 {
t.Fatalf("attempt_count: %v", deliveries[0]["attempt_count"])
}
}
func TestWebhookHandler_DedupeViaGitHubDelivery(t *testing.T) {
agentID := createWebhookTestAgent(t, "DeliveryGH Agent")
apID := createWebhookTestAutopilot(t, agentID, "active", "run_only")
trig := createWebhookTriggerViaHandler(t, apID)
setTriggerProvider(t, trig.ID, "github")
body := map[string]any{"action": "opened"}
headers := map[string]string{
"X-GitHub-Event": "pull_request",
"X-GitHub-Delivery": "abc-123",
}
w1 := postWebhook(t, *trig.WebhookToken, body, headers)
if w1.Code != http.StatusOK {
t.Fatalf("first: %d", w1.Code)
}
var r1 map[string]any
json.Unmarshal(w1.Body.Bytes(), &r1)
if r1["status"] != "accepted" {
t.Fatalf("first status: %v", r1["status"])
}
w2 := postWebhook(t, *trig.WebhookToken, body, headers)
if w2.Code != http.StatusOK {
t.Fatalf("second: %d body=%s", w2.Code, w2.Body.String())
}
var r2 map[string]any
json.Unmarshal(w2.Body.Bytes(), &r2)
if r2["status"] != "duplicate" {
t.Fatalf("expected duplicate, got %v", r2["status"])
}
deliveries := listDeliveries(t, apID)
if len(deliveries) != 1 {
t.Fatalf("expected 1 delivery, got %d", len(deliveries))
}
if deliveries[0]["dedupe_source"] != "x-github-delivery" {
t.Fatalf("dedupe_source: %v", deliveries[0]["dedupe_source"])
}
}
func TestWebhookHandler_InvalidSignatureReturns401AndPersistsRejected(t *testing.T) {
agentID := createWebhookTestAgent(t, "SigInvalid Agent")
apID := createWebhookTestAutopilot(t, agentID, "active", "run_only")
trig := createWebhookTriggerViaHandler(t, apID)
setSigningSecretViaHandler(t, apID, trig.ID, testSigningSecret)
body := map[string]any{"hello": "world"}
w := postWebhook(t, *trig.WebhookToken, body, map[string]string{
"X-Hub-Signature-256": "sha256=deadbeef",
})
if w.Code != http.StatusUnauthorized {
t.Fatalf("expected 401, got %d body=%s", w.Code, w.Body.String())
}
var resp map[string]any
json.Unmarshal(w.Body.Bytes(), &resp)
if resp["status"] != "rejected" {
t.Fatalf("status: %v", resp["status"])
}
if resp["delivery_id"] == nil {
t.Fatal("delivery_id should be present on rejected response")
}
deliveries := listDeliveries(t, apID)
if len(deliveries) != 1 {
t.Fatalf("expected 1 delivery, got %d", len(deliveries))
}
if deliveries[0]["status"] != "rejected" {
t.Fatalf("expected rejected, got %v", deliveries[0]["status"])
}
if deliveries[0]["signature_status"] != "invalid" {
t.Fatalf("expected signature_status=invalid, got %v", deliveries[0]["signature_status"])
}
if deliveries[0]["autopilot_run_id"] != nil {
t.Fatal("rejected delivery must not link to a run")
}
}
func TestWebhookHandler_MissingSignatureReturns401WhenSecretSet(t *testing.T) {
agentID := createWebhookTestAgent(t, "SigMissing Agent")
apID := createWebhookTestAutopilot(t, agentID, "active", "run_only")
trig := createWebhookTriggerViaHandler(t, apID)
setSigningSecretViaHandler(t, apID, trig.ID, testSigningSecret)
w := postWebhook(t, *trig.WebhookToken, map[string]any{"hello": "world"}, nil)
if w.Code != http.StatusUnauthorized {
t.Fatalf("expected 401, got %d body=%s", w.Code, w.Body.String())
}
var resp map[string]any
json.Unmarshal(w.Body.Bytes(), &resp)
if resp["reason"] != "missing_signature" {
t.Fatalf("reason: %v", resp["reason"])
}
deliveries := listDeliveries(t, apID)
if len(deliveries) != 1 || deliveries[0]["signature_status"] != "missing" {
t.Fatalf("delivery missing-signature state: %#v", deliveries)
}
}
func TestWebhookHandler_ValidSignatureDispatches(t *testing.T) {
agentID := createWebhookTestAgent(t, "SigValid Agent")
apID := createWebhookTestAutopilot(t, agentID, "active", "run_only")
trig := createWebhookTriggerViaHandler(t, apID)
setSigningSecretViaHandler(t, apID, trig.ID, testSigningSecret)
bodyBytes := []byte(`{"hello":"world"}`)
sig := signBody(testSigningSecret, bodyBytes)
w := postWebhook(t, *trig.WebhookToken, bodyBytes, map[string]string{
"X-Hub-Signature-256": sig,
})
if w.Code != http.StatusOK {
t.Fatalf("expected 200, got %d body=%s", w.Code, w.Body.String())
}
var resp map[string]any
json.Unmarshal(w.Body.Bytes(), &resp)
if resp["status"] != "accepted" {
t.Fatalf("status: %v", resp["status"])
}
deliveries := listDeliveries(t, apID)
if len(deliveries) != 1 {
t.Fatalf("expected 1 delivery, got %d", len(deliveries))
}
if deliveries[0]["signature_status"] != "valid" {
t.Fatalf("signature_status: %v", deliveries[0]["signature_status"])
}
}
func TestSigningSecretNotEchoedInTriggerResponse(t *testing.T) {
agentID := createWebhookTestAgent(t, "SigEcho Agent")
apID := createWebhookTestAutopilot(t, agentID, "active", "run_only")
trig := createWebhookTriggerViaHandler(t, apID)
setSigningSecretViaHandler(t, apID, trig.ID, testSigningSecret)
// GET the autopilot — trigger response embedded.
w := httptest.NewRecorder()
req := newRequest("GET", "/api/autopilots/"+apID, nil)
req = withURLParam(req, "id", apID)
testHandler.GetAutopilot(w, req)
if w.Code != http.StatusOK {
t.Fatalf("get autopilot: %d", w.Code)
}
if bytes.Contains(w.Body.Bytes(), []byte(testSigningSecret)) {
t.Fatalf("signing secret leaked in trigger response: %s", w.Body.String())
}
if !bytes.Contains(w.Body.Bytes(), []byte(`"has_signing_secret":true`)) {
t.Fatalf("has_signing_secret should be true: %s", w.Body.String())
}
if !bytes.Contains(w.Body.Bytes(), []byte(`"signing_secret_hint":"`+testSigningSecret[len(testSigningSecret)-4:]+`"`)) {
t.Fatalf("hint should be last 4 chars: %s", w.Body.String())
}
}
func TestSigningSecret_MinLengthEnforced(t *testing.T) {
agentID := createWebhookTestAgent(t, "SigMinLen Agent")
apID := createWebhookTestAutopilot(t, agentID, "active", "run_only")
trig := createWebhookTriggerViaHandler(t, apID)
w := httptest.NewRecorder()
req := newRequest("PUT", "/api/autopilots/"+apID+"/triggers/"+trig.ID+"/signing-secret", map[string]any{
"signing_secret": "short",
})
req = withURLParams(req, "id", apID, "triggerId", trig.ID)
testHandler.SetAutopilotTriggerSigningSecret(w, req)
if w.Code != http.StatusBadRequest {
t.Fatalf("expected 400 for short secret, got %d body=%s", w.Code, w.Body.String())
}
}
func TestSigningSecret_EmptyClearsSecret(t *testing.T) {
agentID := createWebhookTestAgent(t, "SigClear Agent")
apID := createWebhookTestAutopilot(t, agentID, "active", "run_only")
trig := createWebhookTriggerViaHandler(t, apID)
setSigningSecretViaHandler(t, apID, trig.ID, testSigningSecret)
// Now clear with empty string.
w := httptest.NewRecorder()
req := newRequest("PUT", "/api/autopilots/"+apID+"/triggers/"+trig.ID+"/signing-secret", map[string]any{
"signing_secret": "",
})
req = withURLParams(req, "id", apID, "triggerId", trig.ID)
testHandler.SetAutopilotTriggerSigningSecret(w, req)
if w.Code != http.StatusOK {
t.Fatalf("clear secret: %d body=%s", w.Code, w.Body.String())
}
// Unsigned request should now go through (back to not_required).
post := postWebhook(t, *trig.WebhookToken, map[string]any{"x": 1}, nil)
if post.Code != http.StatusOK {
t.Fatalf("post after clear: %d body=%s", post.Code, post.Body.String())
}
}
func TestReplay_CreatesNewDeliveryAndDispatchesRun(t *testing.T) {
agentID := createWebhookTestAgent(t, "Replay Agent")
apID := createWebhookTestAutopilot(t, agentID, "active", "run_only")
trig := createWebhookTriggerViaHandler(t, apID)
// Original delivery (with dedupe key) → accepted + dispatched.
w := postWebhook(t, *trig.WebhookToken, map[string]any{"hello": "world"}, map[string]string{
"Idempotency-Key": "replay-original",
})
if w.Code != http.StatusOK {
t.Fatalf("original: %d body=%s", w.Code, w.Body.String())
}
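var orig map[string]any
if err := json.Unmarshal(w.Body.Bytes(), &orig); err != nil {
t.Fatalf("decode original: %v body=%s", err, w.Body.String())
}
originalID, _ := orig["delivery_id"].(string)
originalRunID, _ := orig["run_id"].(string)
if originalID == "" || originalRunID == "" {
t.Fatalf("original response missing delivery_id/run_id: %s", w.Body.String())
}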
// Replay the original.
wr := httptest.NewRecorder()
req := newRequest("POST", fmt.Sprintf("/api/autopilots/%s/deliveries/%s/replay", apID, originalID), nil)
req = withURLParams(req, "id", apID, "deliveryId", originalID)
testHandler.ReplayAutopilotDelivery(wr, req)
if wr.Code != http.StatusCreated {
t.Fatalf("replay: %d body=%s", wr.Code, wr.Body.String())
}
var replay map[string]any
json.Unmarshal(wr.Body.Bytes(), &replay)
if replay["id"] == originalID {
t.Fatal("replay should create a NEW delivery, not return the original")
}
if replay["replayed_from_delivery_id"] != originalID {
t.Fatalf("replayed_from_delivery_id: %v", replay["replayed_from_delivery_id"])
}
if replay["autopilot_run_id"] == nil {
t.Fatal("replay should dispatch a run")
}
if replay["autopilot_run_id"] == originalRunID {
t.Fatal("replay should produce a NEW run, not reuse the original")
}
deliveries := listDeliveries(t, apID)
if len(deliveries) != 2 {
t.Fatalf("expected 2 deliveries (original + replay), got %d", len(deliveries))
}
}
func TestReplay_RejectsInvalidSignatureDelivery(t *testing.T) {
agentID := createWebhookTestAgent(t, "ReplayReject Agent")
apID := createWebhookTestAutopilot(t, agentID, "active", "run_only")
trig := createWebhookTriggerViaHandler(t, apID)
setSigningSecretViaHandler(t, apID, trig.ID, testSigningSecret)
// Send an invalid-signature request → rejected delivery created.
w := postWebhook(t, *trig.WebhookToken, map[string]any{"x": 1}, map[string]string{
"X-Hub-Signature-256": "sha256=baadf00d",
})
if w.Code != http.StatusUnauthorized {
t.Fatalf("setup: expected 401, got %d", w.Code)
}
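var rej map[string]any
if err := json.Unmarshal(w.Body.Bytes(), &rej); err != nil {
t.Fatalf("decode rejected response: %v body=%s", err, w.Body.String())
}
rejectedID, ok := rej["delivery_id"].(string)
if !ok {
t.Fatalf("rejected response missing delivery_id: %s", w.Body.String())
}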
// Replay the rejected delivery → 400.
wr := httptest.NewRecorder()
req := newRequest("POST", fmt.Sprintf("/api/autopilots/%s/deliveries/%s/replay", apID, rejectedID), nil)
req = withURLParams(req, "id", apID, "deliveryId", rejectedID)
testHandler.ReplayAutopilotDelivery(wr, req)
if wr.Code != http.StatusBadRequest {
t.Fatalf("replay of rejected: expected 400, got %d body=%s", wr.Code, wr.Body.String())
}
}
func TestGetDelivery_ReturnsFullPayload(t *testing.T) {
agentID := createWebhookTestAgent(t, "DeliveryDetail Agent")
apID := createWebhookTestAutopilot(t, agentID, "active", "run_only")
trig := createWebhookTriggerViaHandler(t, apID)
w := postWebhook(t, *trig.WebhookToken, map[string]any{"event": "demo", "eventPayload": map[string]any{"answer": 42}}, nil)
if w.Code != http.StatusOK {
t.Fatalf("seed: %d", w.Code)
}
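var seed map[string]any
if err := json.Unmarshal(w.Body.Bytes(), &seed); err != nil {
t.Fatalf("decode seed: %v body=%s", err, w.Body.String())
}
deliveryID, _ := seed["delivery_id"].(string)
if deliveryID == "" {
t.Fatalf("seed response missing delivery_id: %s", w.Body.String())
}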
// List response should NOT include raw_body / selected_headers.
wList := httptest.NewRecorder()
reqList := newRequest("GET", "/api/autopilots/"+apID+"/deliveries", nil)
reqList = withURLParam(reqList, "id", apID)
testHandler.ListAutopilotDeliveries(wList, reqList)
if bytes.Contains(wList.Body.Bytes(), []byte(`"raw_body"`)) {
t.Fatalf("list response should not include raw_body, body=%s", wList.Body.String())
}
// Detail response SHOULD include raw_body and selected_headers.
wDetail := httptest.NewRecorder()
reqDetail := newRequest("GET", "/api/autopilots/"+apID+"/deliveries/"+deliveryID, nil)
reqDetail = withURLParams(reqDetail, "id", apID, "deliveryId", deliveryID)
testHandler.GetAutopilotDelivery(wDetail, reqDetail)
if wDetail.Code != http.StatusOK {
t.Fatalf("detail: %d body=%s", wDetail.Code, wDetail.Body.String())
}
// raw_body is serialised as a JSON string (escaped); decode the response
// and assert against the decoded payload so we don't rely on a brittle
// substring search against the escaped form.
var detail WebhookDeliveryResponse
if err := json.Unmarshal(wDetail.Body.Bytes(), &detail); err != nil {
t.Fatalf("decode detail: %v body=%s", err, wDetail.Body.String())
}
if detail.RawBody == nil {
t.Fatalf("detail should include raw_body: %s", wDetail.Body.String())
}
var raw map[string]any
if err := json.Unmarshal([]byte(*detail.RawBody), &raw); err != nil {
t.Fatalf("raw_body should be valid JSON: %v body=%q", err, *detail.RawBody)
}
payload, ok := raw["eventPayload"].(map[string]any)
if !ok {
t.Fatalf("eventPayload missing or wrong type in raw_body: %#v", raw)
}
if v, ok := payload["answer"].(float64); !ok || v != 42 {
t.Fatalf("raw_body eventPayload.answer should be 42, got %#v", payload["answer"])
}
}
func TestGetDelivery_CrossAutopilotReturns404(t *testing.T) {
// A delivery_id from one autopilot must not be readable via another
// autopilot's URL — defense in depth even though both rows are in the
// same workspace.
agentID := createWebhookTestAgent(t, "CrossAP Agent")
apA := createWebhookTestAutopilot(t, agentID, "active", "run_only")
apB := createWebhookTestAutopilot(t, agentID, "active", "run_only")
trig := createWebhookTriggerViaHandler(t, apA)
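w := postWebhook(t, *trig.WebhookToken, map[string]any{"x": 1}, nil)
var seed map[string]any
if err := json.Unmarshal(w.Body.Bytes(), &seed); err != nil {
t.Fatalf("decode seed: %v body=%s", err, w.Body.String())
}
deliveryID, ok := seed["delivery_id"].(string)
if !ok {
t.Fatalf("seed response missing delivery_id: status=%d body=%s", w.Code, w.Body.String())
}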
// Try reading via the OTHER autopilot's URL.
wDetail := httptest.NewRecorder()
reqDetail := newRequest("GET", "/api/autopilots/"+apB+"/deliveries/"+deliveryID, nil)
reqDetail = withURLParams(reqDetail, "id", apB, "deliveryId", deliveryID)
testHandler.GetAutopilotDelivery(wDetail, reqDetail)
if wDetail.Code != http.StatusNotFound {
t.Fatalf("cross-autopilot GET: expected 404, got %d", wDetail.Code)
}
}
func TestCreateAutopilotTrigger_RejectsUnknownProvider(t *testing.T) {
agentID := createWebhookTestAgent(t, "ProviderInvalid Agent")
apID := createWebhookTestAutopilot(t, agentID, "active", "run_only")
w := httptest.NewRecorder()
req := newRequest("POST", "/api/autopilots/"+apID+"/triggers", map[string]any{
"kind": "webhook",
"provider": "stripe",
})
req = withURLParam(req, "id", apID)
testHandler.CreateAutopilotTrigger(w, req)
if w.Code != http.StatusBadRequest {
t.Fatalf("expected 400 for unknown provider, got %d body=%s", w.Code, w.Body.String())
}
}
func TestCreateAutopilotTrigger_AcceptsGitHubProvider(t *testing.T) {
agentID := createWebhookTestAgent(t, "ProviderGH Agent")
apID := createWebhookTestAutopilot(t, agentID, "active", "run_only")
w := httptest.NewRecorder()
req := newRequest("POST", "/api/autopilots/"+apID+"/triggers", map[string]any{
"kind": "webhook",
"provider": "github",
})
req = withURLParam(req, "id", apID)
testHandler.CreateAutopilotTrigger(w, req)
if w.Code != http.StatusCreated {
t.Fatalf("expected 201, got %d body=%s", w.Code, w.Body.String())
}
var resp AutopilotTriggerResponse
json.Unmarshal(w.Body.Bytes(), &resp)
if resp.Provider == nil || *resp.Provider != "github" {
t.Fatalf("provider: %v", resp.Provider)
}
}
// run_only autopilots have no issue-title duplicate guard, so dedupe via
// the delivery layer is the only thing keeping a retried provider event
// from re-running the agent. This regression test pins that path
// explicitly — it's the largest concrete win over the v1 ingress flow.
func TestWebhookHandler_RunOnlyDedupeOnGitHubDelivery(t *testing.T) {
agentID := createWebhookTestAgent(t, "RunOnlyDedupe Agent")
apID := createWebhookTestAutopilot(t, agentID, "active", "run_only")
trig := createWebhookTriggerViaHandler(t, apID)
setTriggerProvider(t, trig.ID, "github")
headers := map[string]string{
"X-GitHub-Event": "pull_request",
"X-GitHub-Delivery": "pin-redelivery",
}
body := map[string]any{"action": "opened"}
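// Simulate a provider retry storm: the same delivery ID arrives three times.
// Both the first attempt and the duplicates are expected to return 200.
for i := 0; i < 3; i++ {
if w := postWebhook(t, *trig.WebhookToken, body, headers); w.Code != http.StatusOK {
t.Fatalf("attempt %d: %d body=%s", i+1, w.Code, w.Body.String())
}
}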
// Count autopilot_run rows linked to this trigger.
rows, err := testHandler.Queries.ListAutopilotRuns(context.Background(), db.ListAutopilotRunsParams{
AutopilotID: parseUUID(apID),
Limit: 50,
Offset: 0,
})
if err != nil {
t.Fatalf("list runs: %v", err)
}
runCount := 0
for _, r := range rows {
if r.TriggerID.Valid && uuidToString(r.TriggerID) == trig.ID {
runCount++
}
}
if runCount != 1 {
t.Fatalf("expected exactly 1 run from 3 retried deliveries, got %d", runCount)
}
}
func TestWebhookHandler_InvalidSignatureCountsAgainstRateLimit(t *testing.T) {
// A stream of bad-signature attempts must not let an attacker bypass
// per-token rate limiting; the limiter increment happens before
// signature check.
agentID := createWebhookTestAgent(t, "SigRateLimit Agent")
apID := createWebhookTestAutopilot(t, agentID, "active", "run_only")
trig := createWebhookTriggerViaHandler(t, apID)
setSigningSecretViaHandler(t, apID, trig.ID, testSigningSecret)
prev := testHandler.WebhookRateLimiter
testHandler.WebhookRateLimiter = NewMemoryWebhookRateLimiter(WebhookRateLimit{Limit: 2, Window: 60_000_000_000})
t.Cleanup(func() { testHandler.WebhookRateLimiter = prev })
bad := map[string]string{"X-Hub-Signature-256": "sha256=baad"}
for i := 0; i < 2; i++ {
w := postWebhook(t, *trig.WebhookToken, map[string]any{"i": i}, bad)
if w.Code != http.StatusUnauthorized {
t.Fatalf("request %d: expected 401, got %d", i, w.Code)
}
}
w := postWebhook(t, *trig.WebhookToken, map[string]any{"i": "third"}, bad)
if w.Code != http.StatusTooManyRequests {
t.Fatalf("third request expected 429 (rate-limited despite bad sig), got %d", w.Code)
}
}
func TestWebhookHandler_IgnoredPathStillPersistsDelivery(t *testing.T) {
// An ignored delivery (paused autopilot) must still leave a row so the
// operator can see "yes the request arrived, here's why we did nothing".
agentID := createWebhookTestAgent(t, "IgnoredPersist Agent")
apID := createWebhookTestAutopilot(t, agentID, "paused", "run_only")
trig := createWebhookTriggerViaHandler(t, apID)
w := postWebhook(t, *trig.WebhookToken, map[string]any{"x": 1}, nil)
if w.Code != http.StatusOK {
t.Fatalf("expected 200, got %d", w.Code)
}
deliveries := listDeliveries(t, apID)
if len(deliveries) != 1 {
t.Fatalf("expected 1 delivery on paused autopilot, got %d", len(deliveries))
}
if deliveries[0]["status"] != "ignored" {
t.Fatalf("status: %v", deliveries[0]["status"])
}
}
// A `failed` delivery (e.g. transient dispatch error) must NOT permanently
// dedupe-block the provider's retry of the same event. GitHub keeps
// `X-GitHub-Delivery` stable across retries; if the unique index trapped
// the `failed` row, the second attempt would come back as `duplicate` and
// the event would be lost.
//
// The handler-level failure path is hard to force in tests (most reasons
// route through the admission check and produce a skipped run instead),
// so we exercise the partial unique index directly: insert a `failed`
// row, then a fresh `dispatched` row with the same dedupe_key — the
// index excludes both `rejected` and `failed`, so both INSERTs must
// succeed.
func TestWebhookDelivery_FailedRowDoesNotBlockDedupe(t *testing.T) {
ctx := context.Background()
agentID := createWebhookTestAgent(t, "FailedRetry Agent")
apID := createWebhookTestAutopilot(t, agentID, "active", "run_only")
trig := createWebhookTriggerViaHandler(t, apID)
first, err := testHandler.Queries.CreateWebhookDelivery(ctx, db.CreateWebhookDeliveryParams{
WorkspaceID: parseUUID(testWorkspaceID),
AutopilotID: parseUUID(apID),
TriggerID: parseUUID(trig.ID),
Provider: "github",
Event: "github.pull_request",
SignatureStatus: "not_required",
Status: "failed",
SelectedHeaders: []byte("{}"),
DedupeKey: pgtype.Text{String: "retry-key", Valid: true},
DedupeSource: pgtype.Text{String: "x-github-delivery", Valid: true},
})
if err != nil {
t.Fatalf("insert failed row: %v", err)
}
// Same dedupe_key, status=dispatched. Must succeed: the partial unique
// index excludes both `rejected` and `failed`, so the prior `failed`
// row does not consume the slot.
second, err := testHandler.Queries.CreateWebhookDelivery(ctx, db.CreateWebhookDeliveryParams{
WorkspaceID: parseUUID(testWorkspaceID),
AutopilotID: parseUUID(apID),
TriggerID: parseUUID(trig.ID),
Provider: "github",
Event: "github.pull_request",
SignatureStatus: "not_required",
Status: "dispatched",
SelectedHeaders: []byte("{}"),
DedupeKey: pgtype.Text{String: "retry-key", Valid: true},
DedupeSource: pgtype.Text{String: "x-github-delivery", Valid: true},
})
if err != nil {
t.Fatalf("retry insert blocked by stale failed row: %v", err)
}
if uuidToString(second.ID) == uuidToString(first.ID) {
t.Fatal("retry should produce a fresh row, not reuse the failed one")
}
// And the dedupe lookup MUST prefer the non-terminal (dispatched) row,
// not the stale `failed` one, so a third attempt collapses onto the
// successful delivery rather than the failure.
got, err := testHandler.Queries.GetWebhookDeliveryByTriggerAndDedupe(ctx,
db.GetWebhookDeliveryByTriggerAndDedupeParams{
TriggerID: parseUUID(trig.ID),
DedupeKey: pgtype.Text{String: "retry-key", Valid: true},
})
if err != nil {
t.Fatalf("lookup: %v", err)
}
if got.Status != "dispatched" {
t.Fatalf("lookup should prefer non-terminal row, got status=%q (id=%s)",
got.Status, uuidToString(got.ID))
}
}
// Confirm a column-level write: the sqlc params for narg('signing_secret')
// must accept NULL so the column can be cleared, not only non-NULL strings.
func TestSetSigningSecretParams_NullableWrite(t *testing.T) {
agentID := createWebhookTestAgent(t, "SigSqlcNull Agent")
apID := createWebhookTestAutopilot(t, agentID, "active", "run_only")
trig := createWebhookTriggerViaHandler(t, apID)
if _, err := testHandler.Queries.SetAutopilotTriggerSigningSecret(context.Background(),
db.SetAutopilotTriggerSigningSecretParams{
ID: parseUUID(trig.ID),
SigningSecret: pgtype.Text{}, // explicit NULL
}); err != nil {
t.Fatalf("sqlc NULL write: %v", err)
}
}