mirror of
https://github.com/multica-ai/multica.git
synced 2026-07-05 13:29:44 +02:00
fix(server): key reviewer-loop dedup on reviewed commit SHA (MUL-4003) (#4873)
The agent-task run-dedup keyed only on (issue_id, agent_id), so a completed/pending verdict for commit A was silently reused to satisfy a review request for a NEWER commit B pushed after A's run began — giving B zero review coverage (nearly shipped an unreviewed commit; sibling of the daemon disposition-loss bug in #4337). Fix (no migration — reuses the existing context JSONB column): - CreateAgentTask stamps the reviewed head_sha into the task's context. - HasPendingTaskForIssueAndAgent(+ExcludingTriggerComment) now key dedup on that head_sha: a pending task only dedups a request carrying the SAME head. If HEAD advanced (or the pending task predates the stamp), dedup MISSES and a fresh review enqueues. Empty head_sha (no linked PR) falls back to the previous (issue_id, agent_id) key, so non-PR issues keep coalescing unchanged. - head_sha resolves from the issue's linked PR via GetIssueReviewHeadSha (prefers open/draft, newest by pr_updated_at); ResolveIssueReviewSHA fails soft to '' so a github-table hiccup can never over-dedup a review out of existence. - Threaded through all six dedup trigger sites (comment @mention + edit preview, issue-status, squad-leader assign, child-done agent + squad). Issue-linked tasks never reach quick-create context parsing, so the key rides harmlessly alongside. Adds DB-backed regression tests pinning: advanced-head misses dedup, repush invalidates dedup, same-SHA still dedups, and no-linked-PR legacy fallback (verified non-vacuous against the pre-fix query). Co-authored-by: Multica Ops <multica-ops@tenanture.com>
This commit is contained in:
@@ -1727,16 +1727,21 @@ func (h *Handler) routeAssignedSquadLeaderFallback(ctx context.Context, issue db
|
||||
}
|
||||
|
||||
func (h *Handler) hasPendingTaskForIssueAndAgent(ctx context.Context, issueID, agentID pgtype.UUID, opts commentTriggerComputeOptions) (bool, error) {
|
||||
// Key dedup on the reviewed head so re-pushing to the PR mid-review
|
||||
// invalidates dedup and a fresh run enqueues against the new HEAD (TEN-356).
|
||||
headSha := h.TaskService.ResolveIssueReviewSHAParam(ctx, issueID)
|
||||
if opts.ExcludeTriggerCommentID.Valid {
|
||||
return h.Queries.HasPendingTaskForIssueAndAgentExcludingTriggerComment(ctx, db.HasPendingTaskForIssueAndAgentExcludingTriggerCommentParams{
|
||||
IssueID: issueID,
|
||||
AgentID: agentID,
|
||||
ExcludeTriggerCommentID: opts.ExcludeTriggerCommentID,
|
||||
HeadSha: headSha,
|
||||
})
|
||||
}
|
||||
return h.Queries.HasPendingTaskForIssueAndAgent(ctx, db.HasPendingTaskForIssueAndAgentParams{
|
||||
IssueID: issueID,
|
||||
AgentID: agentID,
|
||||
HeadSha: headSha,
|
||||
})
|
||||
}
|
||||
|
||||
|
||||
@@ -459,6 +459,8 @@ func (h *Handler) triggerChildDoneAgent(ctx context.Context, parent db.Issue, tr
|
||||
hasPending, err := h.Queries.HasPendingTaskForIssueAndAgent(ctx, db.HasPendingTaskForIssueAndAgentParams{
|
||||
IssueID: parent.ID,
|
||||
AgentID: parent.AssigneeID,
|
||||
// Key dedup on the reviewed head (TEN-356).
|
||||
HeadSha: h.TaskService.ResolveIssueReviewSHAParam(ctx, parent.ID),
|
||||
})
|
||||
if err != nil || hasPending {
|
||||
return
|
||||
@@ -506,6 +508,8 @@ func (h *Handler) triggerChildDoneSquad(ctx context.Context, parent db.Issue, tr
|
||||
hasPending, err := h.Queries.HasPendingTaskForIssueAndAgent(ctx, db.HasPendingTaskForIssueAndAgentParams{
|
||||
IssueID: parent.ID,
|
||||
AgentID: squad.LeaderID,
|
||||
// Key dedup on the reviewed head (TEN-356).
|
||||
HeadSha: h.TaskService.ResolveIssueReviewSHAParam(ctx, parent.ID),
|
||||
})
|
||||
if err != nil || hasPending {
|
||||
return
|
||||
|
||||
@@ -967,6 +967,8 @@ func (h *Handler) enqueueSquadLeaderTask(ctx context.Context, issue db.Issue, tr
|
||||
hasPending, err := h.Queries.HasPendingTaskForIssueAndAgent(ctx, db.HasPendingTaskForIssueAndAgentParams{
|
||||
IssueID: issue.ID,
|
||||
AgentID: squad.LeaderID,
|
||||
// Key dedup on the reviewed head (TEN-356).
|
||||
HeadSha: h.TaskService.ResolveIssueReviewSHAParam(ctx, issue.ID),
|
||||
})
|
||||
if err != nil || hasPending {
|
||||
return false
|
||||
|
||||
@@ -464,6 +464,8 @@ func (s *IssueService) enqueueSquadLeaderTask(ctx context.Context, issue db.Issu
|
||||
hasPending, err := s.Queries.HasPendingTaskForIssueAndAgent(ctx, db.HasPendingTaskForIssueAndAgentParams{
|
||||
IssueID: issue.ID,
|
||||
AgentID: squad.LeaderID,
|
||||
// Key dedup on the reviewed head (TEN-356).
|
||||
HeadSha: headShaText(s.TaskService.ResolveIssueReviewSHA(ctx, issue.ID)),
|
||||
})
|
||||
if err != nil || hasPending {
|
||||
return
|
||||
|
||||
@@ -165,6 +165,9 @@ func (s *IssueService) hasPendingRun(ctx context.Context, issueID, agentID pgtyp
|
||||
pending, err := s.Queries.HasPendingTaskForIssueAndAgent(ctx, db.HasPendingTaskForIssueAndAgentParams{
|
||||
IssueID: issueID,
|
||||
AgentID: agentID,
|
||||
// Key dedup on the reviewed head so a pending run against an old HEAD
|
||||
// does not shadow a request after HEAD advanced (TEN-356).
|
||||
HeadSha: headShaText(s.TaskService.ResolveIssueReviewSHA(ctx, issueID)),
|
||||
})
|
||||
if err != nil {
|
||||
return true
|
||||
|
||||
@@ -453,6 +453,46 @@ func (s *TaskService) EnqueueTaskForIssueWithHandoff(ctx context.Context, issue
|
||||
// daemon claim handler skips the (agent_id, issue_id) resume lookup — the
|
||||
// user already judged the prior output bad, a fresh agent session is the
|
||||
// expected behavior.
|
||||
// ResolveIssueReviewSHA returns the head SHA of the commit currently under
|
||||
// review for an issue (the head_sha of its most-relevant linked PR), or the
|
||||
// empty string when the issue has no linked PR. Callers thread this into both
|
||||
// the reviewer-loop dedup check and the enqueue path so a pending review task
|
||||
// pinned to an old head does not satisfy a request after HEAD advanced
|
||||
// (TEN-356). Empty string is the safe default: it makes dedup fall back to the
|
||||
// pre-TEN-356 (issue_id, agent_id) key and leaves the task's context NULL.
|
||||
//
|
||||
// The lookup fails soft — any DB error (including "no linked PR") returns "" so
|
||||
// a transient github-table hiccup can never over-dedup a review out of
|
||||
// existence; the worst case is the pre-TEN-356 coalescing behavior.
|
||||
func (s *TaskService) ResolveIssueReviewSHA(ctx context.Context, issueID pgtype.UUID) string {
|
||||
if !issueID.Valid {
|
||||
return ""
|
||||
}
|
||||
sha, err := s.Queries.GetIssueReviewHeadSha(ctx, issueID)
|
||||
if err != nil {
|
||||
if !errors.Is(err, pgx.ErrNoRows) {
|
||||
slog.Warn("resolve issue review sha failed",
|
||||
"issue_id", util.UUIDToString(issueID), "error", err)
|
||||
}
|
||||
return ""
|
||||
}
|
||||
return sha
|
||||
}
|
||||
|
||||
// headShaText wraps a resolved review SHA into the pgtype.Text the dedup/enqueue
|
||||
// queries expect. Empty SHA marshals to an invalid (NULL) Text so the queries
|
||||
// take their fall-back branch.
|
||||
func headShaText(sha string) pgtype.Text {
|
||||
return pgtype.Text{String: sha, Valid: sha != ""}
|
||||
}
|
||||
|
||||
// ResolveIssueReviewSHAParam is ResolveIssueReviewSHA wrapped as the pgtype.Text
|
||||
// the dedup queries take, so both service- and handler-package call sites can
|
||||
// key dedup on the reviewed head with a single call (TEN-356).
|
||||
func (s *TaskService) ResolveIssueReviewSHAParam(ctx context.Context, issueID pgtype.UUID) pgtype.Text {
|
||||
return headShaText(s.ResolveIssueReviewSHA(ctx, issueID))
|
||||
}
|
||||
|
||||
func (s *TaskService) enqueueIssueTask(ctx context.Context, issue db.Issue, triggerCommentID pgtype.UUID, forceFreshSession bool, handoffNote string) (db.AgentTaskQueue, error) {
|
||||
if !issue.AssigneeID.Valid {
|
||||
slog.Error("task enqueue failed", "issue_id", util.UUIDToString(issue.ID), "error", "issue has no assignee")
|
||||
@@ -482,6 +522,9 @@ func (s *TaskService) enqueueIssueTask(ctx context.Context, issue db.Issue, trig
|
||||
TriggerSummary: s.buildCommentTriggerSummary(ctx, triggerCommentID),
|
||||
ForceFreshSession: pgtype.Bool{Bool: forceFreshSession, Valid: forceFreshSession},
|
||||
HandoffNote: pgtype.Text{String: handoffNote, Valid: handoffNote != ""},
|
||||
// Stamp the reviewed head so dedup can distinguish this run's target
|
||||
// from a later request against a new HEAD (TEN-356).
|
||||
HeadSha: headShaText(s.ResolveIssueReviewSHA(ctx, issue.ID)),
|
||||
})
|
||||
if err != nil {
|
||||
slog.Error("task enqueue failed", "issue_id", util.UUIDToString(issue.ID), "error", err)
|
||||
@@ -566,6 +609,9 @@ func (s *TaskService) enqueueMentionTask(ctx context.Context, issue db.Issue, ag
|
||||
ForceFreshSession: pgtype.Bool{Bool: forceFreshSession, Valid: forceFreshSession},
|
||||
HandoffNote: pgtype.Text{String: handoffNote, Valid: handoffNote != ""},
|
||||
SquadID: squadID,
|
||||
// Stamp the reviewed head so dedup can distinguish this run's target
|
||||
// from a later request against a new HEAD (TEN-356).
|
||||
HeadSha: headShaText(s.ResolveIssueReviewSHA(ctx, issue.ID)),
|
||||
})
|
||||
if err != nil {
|
||||
slog.Error("mention task enqueue failed", "issue_id", util.UUIDToString(issue.ID), "agent_id", util.UUIDToString(agentID), "error", err)
|
||||
|
||||
283
server/internal/service/task_dedup_head_sha_test.go
Normal file
283
server/internal/service/task_dedup_head_sha_test.go
Normal file
@@ -0,0 +1,283 @@
|
||||
package service
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"os"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/jackc/pgx/v5/pgtype"
|
||||
"github.com/jackc/pgx/v5/pgxpool"
|
||||
"github.com/multica-ai/multica/server/internal/events"
|
||||
"github.com/multica-ai/multica/server/internal/util"
|
||||
db "github.com/multica-ai/multica/server/pkg/db/generated"
|
||||
)
|
||||
|
||||
// TEN-356 regression: the reviewer-loop dedup keyed only on (issue_id,
|
||||
// agent_id), so a completed/pending verdict for commit A was silently reused to
|
||||
// satisfy a review request for a NEWER commit B pushed after A's run began —
|
||||
// giving B zero review coverage. The fix stamps the reviewed head SHA into the
|
||||
// task's context JSONB and keys HasPendingTaskForIssueAndAgent on it. These
|
||||
// tests pin the three behaviors at the query layer, where the dedup decision
|
||||
// actually lives.
|
||||
|
||||
func newHeadShaDedupPool(t *testing.T) *pgxpool.Pool {
|
||||
t.Helper()
|
||||
|
||||
dbURL := os.Getenv("DATABASE_URL")
|
||||
if dbURL == "" {
|
||||
dbURL = "postgres://multica:multica@localhost:5432/multica?sslmode=disable"
|
||||
}
|
||||
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
|
||||
defer cancel()
|
||||
|
||||
pool, err := pgxpool.New(ctx, dbURL)
|
||||
if err != nil {
|
||||
t.Skipf("database unavailable: %v", err)
|
||||
}
|
||||
if err := pool.Ping(ctx); err != nil {
|
||||
pool.Close()
|
||||
t.Skipf("database unreachable: %v", err)
|
||||
}
|
||||
t.Cleanup(pool.Close)
|
||||
return pool
|
||||
}
|
||||
|
||||
// headShaDedupFixture builds a workspace / runtime / agent / issue, plus an
|
||||
// optional linked PR, and returns the identifiers the dedup tests need. When
|
||||
// prHeadSha is non-empty a github_pull_request row is created and linked to the
|
||||
// issue so ResolveIssueReviewSHA has a head to find.
|
||||
type headShaDedupFixture struct {
|
||||
agentID pgtype.UUID
|
||||
runtimeID pgtype.UUID
|
||||
issueID pgtype.UUID
|
||||
}
|
||||
|
||||
func createHeadShaDedupFixture(t *testing.T, ctx context.Context, pool *pgxpool.Pool, prHeadSha, prState string) headShaDedupFixture {
|
||||
t.Helper()
|
||||
|
||||
suffix := time.Now().UnixNano()
|
||||
email := fmt.Sprintf("head-sha-dedup-%d@multica.ai", suffix)
|
||||
slug := fmt.Sprintf("head-sha-dedup-%d", suffix)
|
||||
|
||||
var userID string
|
||||
if err := pool.QueryRow(ctx, `
|
||||
INSERT INTO "user" (name, email) VALUES ($1, $2) RETURNING id
|
||||
`, "Head SHA Dedup Test", email).Scan(&userID); err != nil {
|
||||
t.Fatalf("create user: %v", err)
|
||||
}
|
||||
|
||||
var workspaceID string
|
||||
if err := pool.QueryRow(ctx, `
|
||||
INSERT INTO workspace (name, slug, description, issue_prefix)
|
||||
VALUES ($1, $2, $3, $4) RETURNING id
|
||||
`, "Head SHA Dedup Test", slug, "temporary TEN-356 dedup test workspace", "HSD").Scan(&workspaceID); err != nil {
|
||||
t.Fatalf("create workspace: %v", err)
|
||||
}
|
||||
if _, err := pool.Exec(ctx, `
|
||||
INSERT INTO member (workspace_id, user_id, role) VALUES ($1, $2, 'owner')
|
||||
`, workspaceID, userID); err != nil {
|
||||
t.Fatalf("create member: %v", err)
|
||||
}
|
||||
|
||||
var runtimeID string
|
||||
if err := pool.QueryRow(ctx, `
|
||||
INSERT INTO agent_runtime (
|
||||
workspace_id, daemon_id, name, runtime_mode, provider,
|
||||
status, device_info, metadata, last_seen_at, visibility, owner_id
|
||||
)
|
||||
VALUES ($1, NULL, $2, 'cloud', 'head_sha_dedup_test', 'online', 'test runtime', '{}'::jsonb, now(), 'private', $3)
|
||||
RETURNING id
|
||||
`, workspaceID, "Head SHA Dedup Runtime", userID).Scan(&runtimeID); err != nil {
|
||||
t.Fatalf("create runtime: %v", err)
|
||||
}
|
||||
|
||||
var agentID string
|
||||
if err := pool.QueryRow(ctx, `
|
||||
INSERT INTO agent (
|
||||
workspace_id, name, description, runtime_mode, runtime_config,
|
||||
runtime_id, visibility, max_concurrent_tasks, owner_id
|
||||
)
|
||||
VALUES ($1, $2, '', 'cloud', '{}'::jsonb, $3, 'private', 1, $4)
|
||||
RETURNING id
|
||||
`, workspaceID, "Head SHA Dedup Agent", runtimeID, userID).Scan(&agentID); err != nil {
|
||||
t.Fatalf("create agent: %v", err)
|
||||
}
|
||||
|
||||
var issueID string
|
||||
if err := pool.QueryRow(ctx, `
|
||||
INSERT INTO issue (workspace_id, title, status, priority, creator_id, creator_type, number, position)
|
||||
VALUES ($1, $2, 'in_review', 'none', $3, 'member', $4, 0)
|
||||
RETURNING id
|
||||
`, workspaceID, "head sha dedup issue", userID, 970000+int(suffix%1000)).Scan(&issueID); err != nil {
|
||||
t.Fatalf("create issue: %v", err)
|
||||
}
|
||||
|
||||
if prHeadSha != "" {
|
||||
state := prState
|
||||
if state == "" {
|
||||
state = "open"
|
||||
}
|
||||
var prID string
|
||||
if err := pool.QueryRow(ctx, `
|
||||
INSERT INTO github_pull_request (
|
||||
workspace_id, installation_id, repo_owner, repo_name, pr_number,
|
||||
title, state, html_url, pr_created_at, pr_updated_at, head_sha
|
||||
)
|
||||
VALUES ($1, 1, 'multica-ai', 'multica', $2, 'review PR', $3,
|
||||
'https://example.test/pr', now(), now(), $4)
|
||||
RETURNING id
|
||||
`, workspaceID, 4000+int(suffix%1000), state, prHeadSha).Scan(&prID); err != nil {
|
||||
t.Fatalf("create pull request: %v", err)
|
||||
}
|
||||
if _, err := pool.Exec(ctx, `
|
||||
INSERT INTO issue_pull_request (issue_id, pull_request_id) VALUES ($1, $2)
|
||||
`, issueID, prID); err != nil {
|
||||
t.Fatalf("link pull request: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
t.Cleanup(func() {
|
||||
c := context.Background()
|
||||
pool.Exec(c, `DELETE FROM agent_task_queue WHERE agent_id = $1`, agentID)
|
||||
pool.Exec(c, `DELETE FROM issue_pull_request WHERE issue_id = $1`, issueID)
|
||||
pool.Exec(c, `DELETE FROM github_pull_request WHERE workspace_id = $1`, workspaceID)
|
||||
pool.Exec(c, `DELETE FROM issue WHERE id = $1`, issueID)
|
||||
pool.Exec(c, `DELETE FROM agent WHERE id = $1`, agentID)
|
||||
pool.Exec(c, `DELETE FROM agent_runtime WHERE id = $1`, runtimeID)
|
||||
pool.Exec(c, `DELETE FROM member WHERE workspace_id = $1`, workspaceID)
|
||||
pool.Exec(c, `DELETE FROM workspace WHERE id = $1`, workspaceID)
|
||||
pool.Exec(c, `DELETE FROM "user" WHERE id = $1`, userID)
|
||||
})
|
||||
|
||||
return headShaDedupFixture{
|
||||
agentID: util.MustParseUUID(agentID),
|
||||
runtimeID: util.MustParseUUID(runtimeID),
|
||||
issueID: util.MustParseUUID(issueID),
|
||||
}
|
||||
}
|
||||
|
||||
// enqueueReviewTask inserts a queued task carrying headSha in its context JSONB,
|
||||
// mirroring what CreateAgentTask does at enqueue time.
|
||||
func enqueueReviewTask(t *testing.T, ctx context.Context, q *db.Queries, fx headShaDedupFixture, headSha string) {
|
||||
t.Helper()
|
||||
if _, err := q.CreateAgentTask(ctx, db.CreateAgentTaskParams{
|
||||
AgentID: fx.agentID,
|
||||
RuntimeID: fx.runtimeID,
|
||||
IssueID: fx.issueID,
|
||||
Priority: 0,
|
||||
HeadSha: pgtype.Text{String: headSha, Valid: headSha != ""},
|
||||
}); err != nil {
|
||||
t.Fatalf("create review task (head_sha=%q): %v", headSha, err)
|
||||
}
|
||||
}
|
||||
|
||||
func hasPending(t *testing.T, ctx context.Context, q *db.Queries, fx headShaDedupFixture, headSha string) bool {
|
||||
t.Helper()
|
||||
got, err := q.HasPendingTaskForIssueAndAgent(ctx, db.HasPendingTaskForIssueAndAgentParams{
|
||||
IssueID: fx.issueID,
|
||||
AgentID: fx.agentID,
|
||||
HeadSha: pgtype.Text{String: headSha, Valid: headSha != ""},
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatalf("HasPendingTaskForIssueAndAgent(head_sha=%q): %v", headSha, err)
|
||||
}
|
||||
return got
|
||||
}
|
||||
|
||||
const (
|
||||
shaA = "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"
|
||||
shaB = "bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb"
|
||||
)
|
||||
|
||||
// Behavior 1: a pending task for SHA A does NOT satisfy a request when HEAD has
|
||||
// advanced to SHA B — dedup MISSES so a fresh review can enqueue against B.
|
||||
func TestHeadShaDedup_AdvancedHeadMissesDedup(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
pool := newHeadShaDedupPool(t)
|
||||
q := db.New(pool)
|
||||
fx := createHeadShaDedupFixture(t, ctx, pool, shaB, "open")
|
||||
|
||||
// A run began against A and is still pending.
|
||||
enqueueReviewTask(t, ctx, q, fx, shaA)
|
||||
|
||||
// A request for the new HEAD (B) must NOT dedup — the pending task is for A.
|
||||
if hasPending(t, ctx, q, fx, shaB) {
|
||||
t.Fatalf("dedup HIT for SHA B while only a SHA A task is pending — B would get zero review coverage (the TEN-356 bug)")
|
||||
}
|
||||
}
|
||||
|
||||
// Behavior 2: re-pushing to a branch mid-review invalidates dedup. The resolver
|
||||
// now returns the PR's new head, and a request for that head misses the old
|
||||
// task — the platform-level equivalent of "fresh run against new HEAD".
|
||||
func TestHeadShaDedup_RepushInvalidatesDedup(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
pool := newHeadShaDedupPool(t)
|
||||
q := db.New(pool)
|
||||
// PR head starts at A.
|
||||
fx := createHeadShaDedupFixture(t, ctx, pool, shaA, "open")
|
||||
|
||||
svc := NewTaskService(q, pool, nil, events.New())
|
||||
if got := svc.ResolveIssueReviewSHA(ctx, fx.issueID); got != shaA {
|
||||
t.Fatalf("ResolveIssueReviewSHA before repush = %q, want %q", got, shaA)
|
||||
}
|
||||
enqueueReviewTask(t, ctx, q, fx, shaA)
|
||||
if !hasPending(t, ctx, q, fx, shaA) {
|
||||
t.Fatalf("same-SHA request must dedup against the pending SHA A task")
|
||||
}
|
||||
|
||||
// Simulate a mid-review force-push: PR head advances to B.
|
||||
if _, err := pool.Exec(ctx, `
|
||||
UPDATE github_pull_request SET head_sha = $1, pr_updated_at = now()
|
||||
WHERE workspace_id = (SELECT workspace_id FROM issue WHERE id = $2)
|
||||
`, shaB, util.UUIDToString(fx.issueID)); err != nil {
|
||||
t.Fatalf("advance PR head: %v", err)
|
||||
}
|
||||
|
||||
// The resolver now reports B, and a dedup check for B misses the A task.
|
||||
if got := svc.ResolveIssueReviewSHA(ctx, fx.issueID); got != shaB {
|
||||
t.Fatalf("ResolveIssueReviewSHA after repush = %q, want %q", got, shaB)
|
||||
}
|
||||
if hasPending(t, ctx, q, fx, shaB) {
|
||||
t.Fatalf("dedup HIT for new HEAD B after repush — a fresh review would be suppressed")
|
||||
}
|
||||
}
|
||||
|
||||
// Behavior 3: same-SHA re-requests still dedup, so an unchanged HEAD does not
|
||||
// spawn wasteful duplicate reviewer runs.
|
||||
func TestHeadShaDedup_SameShaStillDedups(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
pool := newHeadShaDedupPool(t)
|
||||
q := db.New(pool)
|
||||
fx := createHeadShaDedupFixture(t, ctx, pool, shaA, "open")
|
||||
|
||||
enqueueReviewTask(t, ctx, q, fx, shaA)
|
||||
|
||||
if !hasPending(t, ctx, q, fx, shaA) {
|
||||
t.Fatalf("dedup MISS for the same SHA A — an unchanged HEAD should coalesce, not re-run")
|
||||
}
|
||||
}
|
||||
|
||||
// Behavior 4 (fall-back safety): an issue with no linked PR has no review SHA,
|
||||
// so dedup falls back to the pre-TEN-356 (issue_id, agent_id) key and keeps
|
||||
// coalescing exactly as before.
|
||||
func TestHeadShaDedup_NoLinkedPRFallsBackToLegacyKey(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
pool := newHeadShaDedupPool(t)
|
||||
q := db.New(pool)
|
||||
fx := createHeadShaDedupFixture(t, ctx, pool, "", "")
|
||||
|
||||
svc := NewTaskService(q, pool, nil, events.New())
|
||||
if got := svc.ResolveIssueReviewSHA(ctx, fx.issueID); got != "" {
|
||||
t.Fatalf("ResolveIssueReviewSHA with no linked PR = %q, want empty", got)
|
||||
}
|
||||
|
||||
// Enqueue with no head_sha (context NULL), then a legacy (empty-SHA) check
|
||||
// must still dedup.
|
||||
enqueueReviewTask(t, ctx, q, fx, "")
|
||||
if !hasPending(t, ctx, q, fx, "") {
|
||||
t.Fatalf("no-PR issue must dedup on (issue_id, agent_id) like pre-TEN-356")
|
||||
}
|
||||
}
|
||||
@@ -1029,7 +1029,7 @@ const createAgentTask = `-- name: CreateAgentTask :one
|
||||
INSERT INTO agent_task_queue (
|
||||
agent_id, runtime_id, issue_id, status, priority, trigger_comment_id,
|
||||
trigger_summary, force_fresh_session, is_leader_task, handoff_note,
|
||||
squad_id
|
||||
squad_id, context
|
||||
)
|
||||
VALUES (
|
||||
$1, $2, $3, 'queued', $4, $5,
|
||||
@@ -1037,7 +1037,12 @@ VALUES (
|
||||
COALESCE($7::boolean, FALSE),
|
||||
COALESCE($8::boolean, FALSE),
|
||||
$9,
|
||||
$10
|
||||
$10,
|
||||
CASE
|
||||
WHEN COALESCE($11::text, '') <> ''
|
||||
THEN jsonb_build_object('head_sha', $11::text)
|
||||
ELSE NULL
|
||||
END
|
||||
)
|
||||
RETURNING id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, trigger_summary, force_fresh_session, is_leader_task, wait_reason, initiator_user_id, handoff_note, prepare_lease_expires_at, squad_id, escalation_for_task_id, fire_at
|
||||
`
|
||||
@@ -1053,8 +1058,16 @@ type CreateAgentTaskParams struct {
|
||||
IsLeaderTask pgtype.Bool `json:"is_leader_task"`
|
||||
HandoffNote pgtype.Text `json:"handoff_note"`
|
||||
SquadID pgtype.UUID `json:"squad_id"`
|
||||
HeadSha pgtype.Text `json:"head_sha"`
|
||||
}
|
||||
|
||||
// head_sha stamps the commit under review into the task's context JSONB so the
|
||||
// reviewer-loop dedup (HasPendingTaskForIssueAndAgent) can tell a pending run
|
||||
// against an OLD head apart from a fresh request against a NEW head (TEN-356).
|
||||
// Empty/absent head_sha leaves context NULL, preserving pre-TEN-356 behavior for
|
||||
// issues with no linked PR. Issue-linked tasks never hit quick-create context
|
||||
// parsing (parseQuickCreateContext short-circuits on IssueID.Valid), so this
|
||||
// key rides harmlessly alongside.
|
||||
func (q *Queries) CreateAgentTask(ctx context.Context, arg CreateAgentTaskParams) (AgentTaskQueue, error) {
|
||||
row := q.db.QueryRow(ctx, createAgentTask,
|
||||
arg.AgentID,
|
||||
@@ -1067,6 +1080,7 @@ func (q *Queries) CreateAgentTask(ctx context.Context, arg CreateAgentTaskParams
|
||||
arg.IsLeaderTask,
|
||||
arg.HandoffNote,
|
||||
arg.SquadID,
|
||||
arg.HeadSha,
|
||||
)
|
||||
var i AgentTaskQueue
|
||||
err := row.Scan(
|
||||
@@ -2110,17 +2124,30 @@ func (q *Queries) HasPendingTaskForIssue(ctx context.Context, issueID pgtype.UUI
|
||||
const hasPendingTaskForIssueAndAgent = `-- name: HasPendingTaskForIssueAndAgent :one
|
||||
SELECT count(*) > 0 AS has_pending FROM agent_task_queue
|
||||
WHERE issue_id = $1 AND agent_id = $2 AND status IN ('queued', 'dispatched')
|
||||
AND (
|
||||
COALESCE($3::text, '') = ''
|
||||
OR context->>'head_sha' = $3::text
|
||||
)
|
||||
`
|
||||
|
||||
type HasPendingTaskForIssueAndAgentParams struct {
|
||||
IssueID pgtype.UUID `json:"issue_id"`
|
||||
AgentID pgtype.UUID `json:"agent_id"`
|
||||
HeadSha pgtype.Text `json:"head_sha"`
|
||||
}
|
||||
|
||||
// Returns true if a specific agent already has a queued or dispatched task
|
||||
// for the given issue. Used by @mention trigger dedup.
|
||||
//
|
||||
// head_sha keys the dedup on the commit under review (TEN-356): when a caller
|
||||
// passes a non-empty head_sha, a pending task only dedups if it was stamped
|
||||
// with the SAME head_sha at enqueue time. If HEAD advanced since the pending
|
||||
// task's run began (its context head_sha differs, or predates the stamp and is
|
||||
// NULL), the dedup MISSES and a fresh review enqueues against the new HEAD.
|
||||
// When head_sha is empty/NULL (issue has no linked PR) the check falls back to
|
||||
// the pre-TEN-356 (issue_id, agent_id) key so non-PR issues keep coalescing.
|
||||
func (q *Queries) HasPendingTaskForIssueAndAgent(ctx context.Context, arg HasPendingTaskForIssueAndAgentParams) (bool, error) {
|
||||
row := q.db.QueryRow(ctx, hasPendingTaskForIssueAndAgent, arg.IssueID, arg.AgentID)
|
||||
row := q.db.QueryRow(ctx, hasPendingTaskForIssueAndAgent, arg.IssueID, arg.AgentID, arg.HeadSha)
|
||||
var has_pending bool
|
||||
err := row.Scan(&has_pending)
|
||||
return has_pending, err
|
||||
@@ -2132,19 +2159,30 @@ WHERE issue_id = $1
|
||||
AND agent_id = $2
|
||||
AND status IN ('queued', 'dispatched')
|
||||
AND trigger_comment_id IS DISTINCT FROM $3::uuid
|
||||
AND (
|
||||
COALESCE($4::text, '') = ''
|
||||
OR context->>'head_sha' = $4::text
|
||||
)
|
||||
`
|
||||
|
||||
type HasPendingTaskForIssueAndAgentExcludingTriggerCommentParams struct {
|
||||
IssueID pgtype.UUID `json:"issue_id"`
|
||||
AgentID pgtype.UUID `json:"agent_id"`
|
||||
ExcludeTriggerCommentID pgtype.UUID `json:"exclude_trigger_comment_id"`
|
||||
HeadSha pgtype.Text `json:"head_sha"`
|
||||
}
|
||||
|
||||
// Same as HasPendingTaskForIssueAndAgent, but ignores tasks triggered by the
|
||||
// current comment being edited. Edit preview needs this because save cancels
|
||||
// that comment's old queued/dispatched tasks before re-computing triggers.
|
||||
// Carries the same head_sha dedup key as HasPendingTaskForIssueAndAgent (TEN-356).
|
||||
func (q *Queries) HasPendingTaskForIssueAndAgentExcludingTriggerComment(ctx context.Context, arg HasPendingTaskForIssueAndAgentExcludingTriggerCommentParams) (bool, error) {
|
||||
row := q.db.QueryRow(ctx, hasPendingTaskForIssueAndAgentExcludingTriggerComment, arg.IssueID, arg.AgentID, arg.ExcludeTriggerCommentID)
|
||||
row := q.db.QueryRow(ctx, hasPendingTaskForIssueAndAgentExcludingTriggerComment,
|
||||
arg.IssueID,
|
||||
arg.AgentID,
|
||||
arg.ExcludeTriggerCommentID,
|
||||
arg.HeadSha,
|
||||
)
|
||||
var has_pending bool
|
||||
err := row.Scan(&has_pending)
|
||||
return has_pending, err
|
||||
|
||||
@@ -275,6 +275,31 @@ func (q *Queries) GetIssuePullRequestCloseAggregate(ctx context.Context, issueID
|
||||
return i, err
|
||||
}
|
||||
|
||||
const getIssueReviewHeadSha = `-- name: GetIssueReviewHeadSha :one
|
||||
SELECT pr.head_sha
|
||||
FROM github_pull_request pr
|
||||
JOIN issue_pull_request ipr ON ipr.pull_request_id = pr.id
|
||||
WHERE ipr.issue_id = $1 AND pr.head_sha <> ''
|
||||
ORDER BY (pr.state IN ('open', 'draft')) DESC, pr.pr_updated_at DESC
|
||||
LIMIT 1
|
||||
`
|
||||
|
||||
// Returns the head SHA of the commit currently "under review" for an issue:
|
||||
// the most-recently-updated linked PR that still has an open/draft state and a
|
||||
// non-empty head_sha. Used by the reviewer-loop dedup (TEN-356) so a pending
|
||||
// review task pinned to an old head does not satisfy a request after HEAD
|
||||
// advanced. Prefers in-flight PRs (open/draft) over merged/closed ones so a
|
||||
// stale merged sibling can't shadow the live review target; falls back to the
|
||||
// newest linked PR with a head_sha when none are open. Returns no rows (empty
|
||||
// string) when the issue has no linked PR — callers treat that as "no SHA key"
|
||||
// and dedup on (issue_id, agent_id) alone, preserving pre-TEN-356 behavior.
|
||||
func (q *Queries) GetIssueReviewHeadSha(ctx context.Context, issueID pgtype.UUID) (string, error) {
|
||||
row := q.db.QueryRow(ctx, getIssueReviewHeadSha, issueID)
|
||||
var head_sha string
|
||||
err := row.Scan(&head_sha)
|
||||
return head_sha, err
|
||||
}
|
||||
|
||||
const getPendingGitHubInstallation = `-- name: GetPendingGitHubInstallation :one
|
||||
SELECT installation_id, account_login, account_type, account_avatar_url, received_at, updated_at FROM github_pending_installation WHERE installation_id = $1
|
||||
`
|
||||
|
||||
@@ -134,10 +134,17 @@ WHERE agent_id = $1
|
||||
ORDER BY created_at DESC;
|
||||
|
||||
-- name: CreateAgentTask :one
|
||||
-- head_sha stamps the commit under review into the task's context JSONB so the
|
||||
-- reviewer-loop dedup (HasPendingTaskForIssueAndAgent) can tell a pending run
|
||||
-- against an OLD head apart from a fresh request against a NEW head (TEN-356).
|
||||
-- Empty/absent head_sha leaves context NULL, preserving pre-TEN-356 behavior for
|
||||
-- issues with no linked PR. Issue-linked tasks never hit quick-create context
|
||||
-- parsing (parseQuickCreateContext short-circuits on IssueID.Valid), so this
|
||||
-- key rides harmlessly alongside.
|
||||
INSERT INTO agent_task_queue (
|
||||
agent_id, runtime_id, issue_id, status, priority, trigger_comment_id,
|
||||
trigger_summary, force_fresh_session, is_leader_task, handoff_note,
|
||||
squad_id
|
||||
squad_id, context
|
||||
)
|
||||
VALUES (
|
||||
$1, $2, $3, 'queued', $4, sqlc.narg(trigger_comment_id),
|
||||
@@ -145,7 +152,12 @@ VALUES (
|
||||
COALESCE(sqlc.narg('force_fresh_session')::boolean, FALSE),
|
||||
COALESCE(sqlc.narg('is_leader_task')::boolean, FALSE),
|
||||
sqlc.narg(handoff_note),
|
||||
sqlc.narg(squad_id)
|
||||
sqlc.narg(squad_id),
|
||||
CASE
|
||||
WHEN COALESCE(sqlc.narg('head_sha')::text, '') <> ''
|
||||
THEN jsonb_build_object('head_sha', sqlc.narg('head_sha')::text)
|
||||
ELSE NULL
|
||||
END
|
||||
)
|
||||
RETURNING *;
|
||||
|
||||
@@ -607,18 +619,35 @@ WHERE issue_id = $1 AND status IN ('queued', 'dispatched');
|
||||
-- name: HasPendingTaskForIssueAndAgent :one
|
||||
-- Returns true if a specific agent already has a queued or dispatched task
|
||||
-- for the given issue. Used by @mention trigger dedup.
|
||||
--
|
||||
-- head_sha keys the dedup on the commit under review (TEN-356): when a caller
|
||||
-- passes a non-empty head_sha, a pending task only dedups if it was stamped
|
||||
-- with the SAME head_sha at enqueue time. If HEAD advanced since the pending
|
||||
-- task's run began (its context head_sha differs, or predates the stamp and is
|
||||
-- NULL), the dedup MISSES and a fresh review enqueues against the new HEAD.
|
||||
-- When head_sha is empty/NULL (issue has no linked PR) the check falls back to
|
||||
-- the pre-TEN-356 (issue_id, agent_id) key so non-PR issues keep coalescing.
|
||||
SELECT count(*) > 0 AS has_pending FROM agent_task_queue
|
||||
WHERE issue_id = $1 AND agent_id = $2 AND status IN ('queued', 'dispatched');
|
||||
WHERE issue_id = $1 AND agent_id = $2 AND status IN ('queued', 'dispatched')
|
||||
AND (
|
||||
COALESCE(sqlc.narg('head_sha')::text, '') = ''
|
||||
OR context->>'head_sha' = sqlc.narg('head_sha')::text
|
||||
);
|
||||
|
||||
-- name: HasPendingTaskForIssueAndAgentExcludingTriggerComment :one
|
||||
-- Same as HasPendingTaskForIssueAndAgent, but ignores tasks triggered by the
|
||||
-- current comment being edited. Edit preview needs this because save cancels
|
||||
-- that comment's old queued/dispatched tasks before re-computing triggers.
|
||||
-- Carries the same head_sha dedup key as HasPendingTaskForIssueAndAgent (TEN-356).
|
||||
SELECT count(*) > 0 AS has_pending FROM agent_task_queue
|
||||
WHERE issue_id = @issue_id
|
||||
AND agent_id = @agent_id
|
||||
AND status IN ('queued', 'dispatched')
|
||||
AND trigger_comment_id IS DISTINCT FROM @exclude_trigger_comment_id::uuid;
|
||||
AND trigger_comment_id IS DISTINCT FROM @exclude_trigger_comment_id::uuid
|
||||
AND (
|
||||
COALESCE(sqlc.narg('head_sha')::text, '') = ''
|
||||
OR context->>'head_sha' = sqlc.narg('head_sha')::text
|
||||
);
|
||||
|
||||
-- name: GetLatestTaskIsLeaderForIssueAndAgent :one
|
||||
-- Returns the is_leader_task flag of the agent's most recent task on this
|
||||
|
||||
@@ -185,6 +185,23 @@ LEFT JOIN checks c ON c.pr_id = pr.id
|
||||
WHERE ipr.issue_id = sqlc.arg('issue_id')
|
||||
ORDER BY pr.pr_created_at DESC;
|
||||
|
||||
-- name: GetIssueReviewHeadSha :one
|
||||
-- Returns the head SHA of the commit currently "under review" for an issue:
|
||||
-- the most-recently-updated linked PR that still has an open/draft state and a
|
||||
-- non-empty head_sha. Used by the reviewer-loop dedup (TEN-356) so a pending
|
||||
-- review task pinned to an old head does not satisfy a request after HEAD
|
||||
-- advanced. Prefers in-flight PRs (open/draft) over merged/closed ones so a
|
||||
-- stale merged sibling can't shadow the live review target; falls back to the
|
||||
-- newest linked PR with a head_sha when none are open. Returns no rows (empty
|
||||
-- string) when the issue has no linked PR — callers treat that as "no SHA key"
|
||||
-- and dedup on (issue_id, agent_id) alone, preserving pre-TEN-356 behavior.
|
||||
SELECT pr.head_sha
|
||||
FROM github_pull_request pr
|
||||
JOIN issue_pull_request ipr ON ipr.pull_request_id = pr.id
|
||||
WHERE ipr.issue_id = $1 AND pr.head_sha <> ''
|
||||
ORDER BY (pr.state IN ('open', 'draft')) DESC, pr.pr_updated_at DESC
|
||||
LIMIT 1;
|
||||
|
||||
-- name: ListIssueIDsForPullRequest :many
|
||||
SELECT issue_id FROM issue_pull_request
|
||||
WHERE pull_request_id = $1;
|
||||
|
||||
Reference in New Issue
Block a user