From b1345685a38cd1c99529ab657a911c30318adaad Mon Sep 17 00:00:00 2001 From: Bohan Jiang <52446949+Bohan-J@users.noreply.github.com> Date: Thu, 30 Apr 2026 14:17:53 +0800 Subject: [PATCH] fix(task): rerun starts a fresh session, skip poisoned resume (#1928) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * fix(task): rerun starts a fresh session, skip poisoned resume When a task ended in a known agent fallback ("I reached the iteration limit and couldn't generate a summary.", "Put your final update inside the content string. Keep it concise.") the (agent_id, issue_id) resume lookup would still pick that session, so a manual rerun inherited the poisoned state and reproduced the same bad output. Two complementary guards: 1. Daemon classifies poisoned terminal output and routes it through the blocked path with failure_reason set ('iteration_limit' / 'agent_fallback_message'). GetLastTaskSession excludes failed tasks with those reasons, so even comment-triggered tasks no longer resume them. Tasks that failed mid-flight (timeout, runtime_recovery, etc.) are still resumable, preserving MUL-1128's auto-retry contract. 2. Manual rerun marks the new task force_fresh_session=true. The daemon claim handler skips the resume lookup entirely when the flag is set, capturing the user-intent signal that "the prior output was bad" even when poisoned classification misses a future fallback wording. Auto-retry of orphaned mid-flight failures (MaybeRetryFailedTask → CreateRetryTask) does not take this path, so it keeps resuming. Tests: classifyPoisonedOutput unit test; integration tests assert the SQL filter excludes poisoned classifiers, RerunIssue flips the flag, and the normal enqueue path leaves it false. Co-authored-by: multica-agent * fix(daemon): cap poisoned-output matcher to short trimmed text GPT-Boy review on MUL-1630: the previous strings.Contains match would classify any output that quoted the marker substring — including a review/analysis that simply discussed the marker itself. Real fallback messages are short single-sentence affairs, so cap the candidate at ~one paragraph and trim whitespace before matching. Adds regression tests covering a long quoting review and a marker buried in a long real conclusion; both must stay classified as completed. Co-authored-by: multica-agent * fix(migrations): rename 065 force_fresh_session → 066 to clear collision main introduced 065_project_resources after this branch was cut, so both files shared the 065_ prefix. The readiness check (server/cmd/server/health.go → migrations.LatestVersion) takes the last entry by lexical order, which is 065_project_resources, leaving this branch's 065_force_fresh_session unguarded — a deploy that applied project_resources but not force_fresh_session would still report ready, and the next enqueue / rerun / claim would crash on "column force_fresh_session does not exist". Renaming to 066_force_fresh_session puts it strictly after project_resources so readiness blocks until it's applied. Co-authored-by: multica-agent --------- Co-authored-by: multica-agent --- server/cmd/server/rerun_session_test.go | 193 ++++++++++++++++++ server/internal/daemon/daemon.go | 21 ++ server/internal/daemon/poisoned.go | 57 ++++++ server/internal/daemon/poisoned_test.go | 86 ++++++++ server/internal/daemon/types.go | 2 +- server/internal/handler/daemon.go | 21 +- server/internal/service/task.go | 52 +++-- .../066_force_fresh_session.down.sql | 1 + .../migrations/066_force_fresh_session.up.sql | 8 + server/pkg/db/generated/agent.sql.go | 99 ++++++--- server/pkg/db/generated/autopilot.sql.go | 3 +- server/pkg/db/generated/chat.sql.go | 3 +- server/pkg/db/generated/models.go | 49 ++--- server/pkg/db/generated/runtime.sql.go | 3 +- server/pkg/db/queries/agent.sql | 21 +- 15 files changed, 534 insertions(+), 85 deletions(-) create mode 100644 server/cmd/server/rerun_session_test.go create mode 100644 server/internal/daemon/poisoned.go create mode 100644 server/internal/daemon/poisoned_test.go create mode 100644 server/migrations/066_force_fresh_session.down.sql create mode 100644 server/migrations/066_force_fresh_session.up.sql diff --git a/server/cmd/server/rerun_session_test.go b/server/cmd/server/rerun_session_test.go new file mode 100644 index 000000000..8a8075c2f --- /dev/null +++ b/server/cmd/server/rerun_session_test.go @@ -0,0 +1,193 @@ +package main + +import ( + "context" + "testing" + + "github.com/jackc/pgx/v5/pgtype" + + "github.com/multica-ai/multica/server/internal/events" + "github.com/multica-ai/multica/server/internal/realtime" + "github.com/multica-ai/multica/server/internal/service" + db "github.com/multica-ai/multica/server/pkg/db/generated" +) + +// setupRerunTestFixture creates an issue assigned to the integration test +// agent and returns (issueID, agentID, runtimeID). +func setupRerunTestFixture(t *testing.T) (string, string, string) { + t.Helper() + ctx := context.Background() + + var agentID, runtimeID string + if err := testPool.QueryRow(ctx, ` + SELECT a.id, a.runtime_id FROM agent a + JOIN member m ON m.workspace_id = a.workspace_id + JOIN "user" u ON u.id = m.user_id + WHERE u.email = $1 + LIMIT 1 + `, integrationTestEmail).Scan(&agentID, &runtimeID); err != nil { + t.Fatalf("failed to find test agent: %v", err) + } + + var issueID string + if err := testPool.QueryRow(ctx, ` + INSERT INTO issue (workspace_id, title, status, priority, creator_type, creator_id, assignee_type, assignee_id) + SELECT $1, 'Rerun test issue', 'todo', 'none', 'member', m.user_id, 'agent', $2 + FROM member m WHERE m.workspace_id = $1 LIMIT 1 + RETURNING id + `, testWorkspaceID, agentID).Scan(&issueID); err != nil { + t.Fatalf("failed to create test issue: %v", err) + } + + return issueID, agentID, runtimeID +} + +func cleanupRerunFixture(t *testing.T, issueID string) { + t.Helper() + ctx := context.Background() + testPool.Exec(ctx, `DELETE FROM agent_task_queue WHERE issue_id = $1`, issueID) + testPool.Exec(ctx, `DELETE FROM issue WHERE id = $1`, issueID) +} + +// TestGetLastTaskSessionExcludesPoisonedFailures asserts that the +// (agent_id, issue_id) resume lookup skips failed tasks whose +// failure_reason classifies them as poisoned terminal output. This is the +// SQL-level half of the rerun-poisoned-session fix: without the filter, a +// rerun would inherit the same session and replay the same bad output. +func TestGetLastTaskSessionExcludesPoisonedFailures(t *testing.T) { + if testPool == nil { + t.Skip("no database connection") + } + + issueID, agentID, runtimeID := setupRerunTestFixture(t) + t.Cleanup(func() { cleanupRerunFixture(t, issueID) }) + + ctx := context.Background() + + // Insert an older failed task with a poisoned classifier and a session_id. + // The poisoned task is the *most recent* one, so without the filter the + // resume lookup would return its session_id. + if _, err := testPool.Exec(ctx, ` + INSERT INTO agent_task_queue (agent_id, runtime_id, issue_id, status, priority, started_at, completed_at, session_id, work_dir, failure_reason) + VALUES ($1, $2, $3, 'failed', 0, now() - interval '2 minutes', now() - interval '2 minutes', 'HEALTHY-SESSION', '/tmp/healthy', 'timeout') + `, agentID, runtimeID, issueID); err != nil { + t.Fatalf("insert healthy failed task: %v", err) + } + + if _, err := testPool.Exec(ctx, ` + INSERT INTO agent_task_queue (agent_id, runtime_id, issue_id, status, priority, started_at, completed_at, session_id, work_dir, failure_reason) + VALUES ($1, $2, $3, 'failed', 0, now() - interval '1 minute', now() - interval '1 minute', 'POISONED-SESSION', '/tmp/poisoned', 'iteration_limit') + `, agentID, runtimeID, issueID); err != nil { + t.Fatalf("insert poisoned failed task: %v", err) + } + + queries := db.New(testPool) + prior, err := queries.GetLastTaskSession(ctx, db.GetLastTaskSessionParams{ + AgentID: pgtype.UUID{Bytes: parseUUIDBytes(agentID), Valid: true}, + IssueID: pgtype.UUID{Bytes: parseUUIDBytes(issueID), Valid: true}, + }) + if err != nil { + t.Fatalf("GetLastTaskSession failed: %v", err) + } + if !prior.SessionID.Valid { + t.Fatal("expected to fall back to the healthy failed session, got no session") + } + if prior.SessionID.String == "POISONED-SESSION" { + t.Fatal("rerun would inherit poisoned session — filter is not active") + } + if prior.SessionID.String != "HEALTHY-SESSION" { + t.Fatalf("expected HEALTHY-SESSION, got %q", prior.SessionID.String) + } +} + +// TestGetLastTaskSessionFallbackPoisonedClassifier covers the second +// poisoned classifier so adding a third doesn't silently break this rule. +func TestGetLastTaskSessionFallbackPoisonedClassifier(t *testing.T) { + if testPool == nil { + t.Skip("no database connection") + } + + issueID, agentID, runtimeID := setupRerunTestFixture(t) + t.Cleanup(func() { cleanupRerunFixture(t, issueID) }) + + ctx := context.Background() + + if _, err := testPool.Exec(ctx, ` + INSERT INTO agent_task_queue (agent_id, runtime_id, issue_id, status, priority, started_at, completed_at, session_id, work_dir, failure_reason) + VALUES ($1, $2, $3, 'failed', 0, now() - interval '5 seconds', now() - interval '5 seconds', 'POISONED-FALLBACK', '/tmp/poisoned', 'agent_fallback_message') + `, agentID, runtimeID, issueID); err != nil { + t.Fatalf("insert poisoned failed task: %v", err) + } + + queries := db.New(testPool) + prior, err := queries.GetLastTaskSession(ctx, db.GetLastTaskSessionParams{ + AgentID: pgtype.UUID{Bytes: parseUUIDBytes(agentID), Valid: true}, + IssueID: pgtype.UUID{Bytes: parseUUIDBytes(issueID), Valid: true}, + }) + if err == nil && prior.SessionID.Valid { + t.Fatalf("expected no resumable session, got %q", prior.SessionID.String) + } +} + +// TestRerunIssueSetsForceFreshSession asserts the manual rerun flow flags +// the new task so the daemon claim handler skips the resume lookup. This +// is the call-site half of the fix: even if the SQL filter ever misses a +// poisoned classifier, manual rerun never resumes. +func TestRerunIssueSetsForceFreshSession(t *testing.T) { + if testPool == nil { + t.Skip("no database connection") + } + + issueID, _, _ := setupRerunTestFixture(t) + t.Cleanup(func() { cleanupRerunFixture(t, issueID) }) + + ctx := context.Background() + queries := db.New(testPool) + hub := realtime.NewHub() + go hub.Run() + bus := events.New() + taskService := service.NewTaskService(queries, nil, hub, bus) + + task, err := taskService.RerunIssue(ctx, pgtype.UUID{Bytes: parseUUIDBytes(issueID), Valid: true}, pgtype.UUID{}) + if err != nil { + t.Fatalf("RerunIssue failed: %v", err) + } + if task == nil { + t.Fatal("RerunIssue returned nil task") + } + if !task.ForceFreshSession { + t.Fatal("expected manual rerun to set force_fresh_session=true") + } +} + +// TestEnqueueTaskForIssueDoesNotForceFreshSession is the negative control +// for the rerun flag: the normal enqueue path must leave the flag false so +// auto-retry / comment-triggered tasks keep resuming the prior session +// (MUL-1128 contract). +func TestEnqueueTaskForIssueDoesNotForceFreshSession(t *testing.T) { + if testPool == nil { + t.Skip("no database connection") + } + + issueID, _, _ := setupRerunTestFixture(t) + t.Cleanup(func() { cleanupRerunFixture(t, issueID) }) + + ctx := context.Background() + queries := db.New(testPool) + hub := realtime.NewHub() + go hub.Run() + bus := events.New() + taskService := service.NewTaskService(queries, nil, hub, bus) + + issue, err := queries.GetIssue(ctx, pgtype.UUID{Bytes: parseUUIDBytes(issueID), Valid: true}) + if err != nil { + t.Fatalf("load issue: %v", err) + } + task, err := taskService.EnqueueTaskForIssue(ctx, issue) + if err != nil { + t.Fatalf("EnqueueTaskForIssue failed: %v", err) + } + if task.ForceFreshSession { + t.Fatal("expected normal enqueue to leave force_fresh_session=false") + } +} diff --git a/server/internal/daemon/daemon.go b/server/internal/daemon/daemon.go index 43131a8d8..12c268556 100644 --- a/server/internal/daemon/daemon.go +++ b/server/internal/daemon/daemon.go @@ -1393,6 +1393,27 @@ func (d *Daemon) runTask(ctx context.Context, task Task, provider string, slot i Usage: usageEntries, }, nil } + // Detect "poisoned" terminal output: the agent didn't reach a real + // conclusion but emitted a known fallback marker (iteration limit, + // fallback meta message). Route through the blocked path with a + // specific failure_reason so the server can exclude this session + // from the (agent_id, issue_id) resume lookup — otherwise a manual + // rerun would inherit the same poisoned session and reproduce the + // same bad output. + if reason, ok := classifyPoisonedOutput(result.Output); ok { + taskLog.Warn("agent finished with poisoned fallback output, classifying as blocked", + "failure_reason", reason, + ) + return TaskResult{ + Status: "blocked", + Comment: result.Output, + SessionID: result.SessionID, + WorkDir: env.WorkDir, + EnvRoot: env.RootDir, + Usage: usageEntries, + FailureReason: reason, + }, nil + } return TaskResult{ Status: "completed", Comment: result.Output, diff --git a/server/internal/daemon/poisoned.go b/server/internal/daemon/poisoned.go new file mode 100644 index 000000000..0bf1b560e --- /dev/null +++ b/server/internal/daemon/poisoned.go @@ -0,0 +1,57 @@ +package daemon + +import "strings" + +// FailureReason values for tasks that "completed" with output but the +// output is actually a known agent fallback marker — i.e. the agent gave +// up and emitted a meta message instead of a real result. Listed here so +// the server-side query GetLastTaskSession can filter them out and a +// rerun starts from a fresh agent session instead of resuming the same +// poisoned conversation. +const ( + FailureReasonIterationLimit = "iteration_limit" + FailureReasonAgentFallbackMsg = "agent_fallback_message" +) + +// poisonedOutputMaxLen caps how long an output can be and still be +// classified as a poisoned fallback. Real fallback messages are short, +// one-sentence affairs; a long output that happens to mention a marker +// is almost certainly a real conclusion (e.g. a code-review reply +// quoting these strings, like the one currently quoting them in +// MUL-1630). The cap intentionally errs on the side of NOT classifying +// — a missed poisoned task gets retried by user action, but a +// false-positive turns a successful task into a failure and a system +// comment. +const poisonedOutputMaxLen = 320 + +// poisonedMarkers maps a substring fingerprint of a known agent fallback +// terminal message to its failure_reason classifier. Match is case- +// insensitive and substring-based; the cap above prevents long outputs +// that quote a marker from being misclassified. +var poisonedMarkers = []struct { + Substring string + Reason string +}{ + {"i reached the iteration limit", FailureReasonIterationLimit}, + {"put your final update inside the content string", FailureReasonAgentFallbackMsg}, +} + +// classifyPoisonedOutput reports whether output matches a known agent +// fallback terminal message and, if so, returns the failure_reason that +// should be persisted on the task row. Long outputs are never +// classified: a real fallback is the agent's only utterance for the +// turn, so anything beyond ~one paragraph is treated as a real result +// even if it contains a marker substring. +func classifyPoisonedOutput(output string) (string, bool) { + trimmed := strings.TrimSpace(output) + if trimmed == "" || len(trimmed) > poisonedOutputMaxLen { + return "", false + } + lowered := strings.ToLower(trimmed) + for _, m := range poisonedMarkers { + if strings.Contains(lowered, m.Substring) { + return m.Reason, true + } + } + return "", false +} diff --git a/server/internal/daemon/poisoned_test.go b/server/internal/daemon/poisoned_test.go new file mode 100644 index 000000000..b50dceee3 --- /dev/null +++ b/server/internal/daemon/poisoned_test.go @@ -0,0 +1,86 @@ +package daemon + +import ( + "strings" + "testing" +) + +func TestClassifyPoisonedOutput(t *testing.T) { + cases := []struct { + name string + output string + wantOK bool + wantReason string + }{ + { + name: "iteration limit canonical", + output: "I reached the iteration limit and couldn't generate a summary.", + wantOK: true, + wantReason: FailureReasonIterationLimit, + }, + { + name: "iteration limit case insensitive", + output: "I REACHED THE ITERATION LIMIT and stopped", + wantOK: true, + wantReason: FailureReasonIterationLimit, + }, + { + name: "fallback meta message", + output: "Put your final update inside the content string. Keep it concise.", + wantOK: true, + wantReason: FailureReasonAgentFallbackMsg, + }, + { + name: "real conclusion is not poisoned", + output: "Fixed the bug in auth.go and pushed PR #42.", + wantOK: false, + }, + { + name: "empty output", + output: "", + wantOK: false, + }, + { + name: "mentions iteration but not the marker", + output: "Each iteration of the loop processes one record.", + wantOK: false, + }, + { + // Regression guard for the GPT-Boy review on MUL-1630: + // a real review/analysis that quotes both markers must not + // be misclassified. Without the length cap, this entire + // PR's review thread would tank as a poisoned failure. + name: "long review quoting both markers is not poisoned", + output: `Review for the rerun fix. + +Detection markers under consideration: +- "I reached the iteration limit and couldn't generate a summary." +- "Put your final update inside the content string. Keep it concise." + +The implementation looks correct: the daemon classifies these as +fallback output, persists a dedicated failure_reason, and the SQL +filter excludes them from the resume lookup. Auto-retry of mid-flight +orphans still keeps the resume contract because CreateRetryTask does +not set force_fresh_session. Approving with a follow-up note about +the matcher being too permissive on long outputs.`, + wantOK: false, + }, + { + name: "marker buried inside a long agent conclusion", + output: strings.Repeat("All checks passed and the bug is fixed. ", 10) + "i reached the iteration limit while debugging earlier.", + wantOK: false, + }, + } + + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + reason, ok := classifyPoisonedOutput(tc.output) + if ok != tc.wantOK { + t.Fatalf("classifyPoisonedOutput(%q) ok=%v, want %v", tc.output, ok, tc.wantOK) + } + if ok && reason != tc.wantReason { + t.Fatalf("classifyPoisonedOutput(%q) reason=%q, want %q", tc.output, reason, tc.wantReason) + } + }) + } +} diff --git a/server/internal/daemon/types.go b/server/internal/daemon/types.go index ddcdc68ab..183e8cc61 100644 --- a/server/internal/daemon/types.go +++ b/server/internal/daemon/types.go @@ -105,6 +105,6 @@ type TaskResult struct { SessionID string `json:"session_id,omitempty"` // Claude session ID for future resumption WorkDir string `json:"work_dir,omitempty"` // working directory used during execution EnvRoot string `json:"-"` // env root dir for writing GC metadata (not sent to server) - FailureReason string `json:"-"` // internal server failure classification + FailureReason string `json:"-"` // classifier forwarded to FailTask on the blocked path; empty falls back to 'agent_error' Usage []TaskUsageEntry `json:"usage,omitempty"` // per-model token usage } diff --git a/server/internal/handler/daemon.go b/server/internal/handler/daemon.go index 6de31091f..8b6424965 100644 --- a/server/internal/handler/daemon.go +++ b/server/internal/handler/daemon.go @@ -944,13 +944,20 @@ func (h *Handler) ClaimTaskByRuntime(w http.ResponseWriter, r *http.Request) { // Look up the prior session for this (agent, issue) pair so the daemon // can resume the Claude Code conversation context. - if prior, err := h.Queries.GetLastTaskSession(r.Context(), db.GetLastTaskSessionParams{ - AgentID: task.AgentID, - IssueID: task.IssueID, - }); err == nil && prior.SessionID.Valid { - resp.PriorSessionID = prior.SessionID.String - if prior.WorkDir.Valid { - resp.PriorWorkDir = prior.WorkDir.String + // + // Skip the lookup when the task was flagged as a manual rerun: the + // user just judged the prior output bad, so the daemon must start a + // fresh agent session instead of resuming the same conversation that + // produced that output. + if !task.ForceFreshSession { + if prior, err := h.Queries.GetLastTaskSession(r.Context(), db.GetLastTaskSessionParams{ + AgentID: task.AgentID, + IssueID: task.IssueID, + }); err == nil && prior.SessionID.Valid { + resp.PriorSessionID = prior.SessionID.String + if prior.WorkDir.Valid { + resp.PriorWorkDir = prior.WorkDir.String + } } } } diff --git a/server/internal/service/task.go b/server/internal/service/task.go index ea17ff712..b62cdcf73 100644 --- a/server/internal/service/task.go +++ b/server/internal/service/task.go @@ -96,6 +96,19 @@ func NewTaskService(q *db.Queries, tx TxStarter, hub *realtime.Hub, bus *events. // No context snapshot is stored — the agent fetches all data it needs at // runtime via the multica CLI. func (s *TaskService) EnqueueTaskForIssue(ctx context.Context, issue db.Issue, triggerCommentID ...pgtype.UUID) (db.AgentTaskQueue, error) { + var commentID pgtype.UUID + if len(triggerCommentID) > 0 { + commentID = triggerCommentID[0] + } + return s.enqueueIssueTask(ctx, issue, commentID, false) +} + +// enqueueIssueTask is the shared implementation behind EnqueueTaskForIssue +// and the manual rerun path. forceFreshSession=true marks the task so the +// daemon claim handler skips the (agent_id, issue_id) resume lookup — the +// user already judged the prior output bad, a fresh agent session is the +// expected behavior. +func (s *TaskService) enqueueIssueTask(ctx context.Context, issue db.Issue, triggerCommentID pgtype.UUID, forceFreshSession bool) (db.AgentTaskQueue, error) { if !issue.AssigneeID.Valid { slog.Error("task enqueue failed", "issue_id", util.UUIDToString(issue.ID), "error", "issue has no assignee") return db.AgentTaskQueue{}, fmt.Errorf("issue has no assignee") @@ -115,25 +128,26 @@ func (s *TaskService) EnqueueTaskForIssue(ctx context.Context, issue db.Issue, t return db.AgentTaskQueue{}, fmt.Errorf("agent has no runtime") } - var commentID pgtype.UUID - if len(triggerCommentID) > 0 { - commentID = triggerCommentID[0] - } - task, err := s.Queries.CreateAgentTask(ctx, db.CreateAgentTaskParams{ - AgentID: issue.AssigneeID, - RuntimeID: agent.RuntimeID, - IssueID: issue.ID, - Priority: priorityToInt(issue.Priority), - TriggerCommentID: commentID, - TriggerSummary: s.buildCommentTriggerSummary(ctx, commentID), + AgentID: issue.AssigneeID, + RuntimeID: agent.RuntimeID, + IssueID: issue.ID, + Priority: priorityToInt(issue.Priority), + TriggerCommentID: triggerCommentID, + TriggerSummary: s.buildCommentTriggerSummary(ctx, triggerCommentID), + ForceFreshSession: pgtype.Bool{Bool: forceFreshSession, Valid: forceFreshSession}, }) if err != nil { slog.Error("task enqueue failed", "issue_id", util.UUIDToString(issue.ID), "error", err) return db.AgentTaskQueue{}, fmt.Errorf("create task: %w", err) } - slog.Info("task enqueued", "task_id", util.UUIDToString(task.ID), "issue_id", util.UUIDToString(issue.ID), "agent_id", util.UUIDToString(issue.AssigneeID)) + slog.Info("task enqueued", + "task_id", util.UUIDToString(task.ID), + "issue_id", util.UUIDToString(issue.ID), + "agent_id", util.UUIDToString(issue.AssigneeID), + "force_fresh_session", forceFreshSession, + ) // Order matters: broadcast first, notify daemon second. notifyTaskAvailable // kicks an in-process channel that the daemon picks up over HTTP and // claims; the claim path then emits its own task:dispatch. Doing the @@ -874,9 +888,15 @@ func (s *TaskService) MaybeRetryFailedTask(ctx context.Context, parent db.AgentT } // RerunIssue creates a fresh queued task for the agent currently assigned -// to the issue. Used by the manual rerun endpoint. Carries the most recent -// session_id/work_dir on the issue (across any status) so the new run -// resumes from where the prior one left off when the backend supports it. +// to the issue. Used by the manual rerun endpoint. +// +// The new task is flagged force_fresh_session=true so the daemon starts a +// clean agent session instead of resuming the prior (agent_id, issue_id) +// session. A user clicking rerun has just judged the prior output bad — +// resuming the same conversation would replay the same poisoned state. +// Auto-retry of an orphaned mid-flight failure (HandleFailedTasks → +// MaybeRetryFailedTask → CreateRetryTask) does NOT take this path, so +// MUL-1128's mid-flight resume contract is preserved. // // Only tasks belonging to the issue's current assignee are cancelled. // Tasks owned by other agents on the same issue (e.g. a parallel @@ -909,7 +929,7 @@ func (s *TaskService) RerunIssue(ctx context.Context, issueID pgtype.UUID, trigg s.broadcastTaskEvent(ctx, protocol.EventTaskCancelled, t) } - task, err := s.EnqueueTaskForIssue(ctx, issue, triggerCommentID) + task, err := s.enqueueIssueTask(ctx, issue, triggerCommentID, true) if err != nil { return nil, err } diff --git a/server/migrations/066_force_fresh_session.down.sql b/server/migrations/066_force_fresh_session.down.sql new file mode 100644 index 000000000..ef842bda9 --- /dev/null +++ b/server/migrations/066_force_fresh_session.down.sql @@ -0,0 +1 @@ +ALTER TABLE agent_task_queue DROP COLUMN force_fresh_session; diff --git a/server/migrations/066_force_fresh_session.up.sql b/server/migrations/066_force_fresh_session.up.sql new file mode 100644 index 000000000..6fa3d623c --- /dev/null +++ b/server/migrations/066_force_fresh_session.up.sql @@ -0,0 +1,8 @@ +-- Per-task signal that the manual rerun flow uses to short-circuit the +-- (agent_id, issue_id) session resume lookup. Set when a user clicks +-- rerun: the user just judged the prior output bad, so the daemon must +-- start a fresh agent session instead of resuming the same conversation +-- that produced the bad result. Auto-retry of an orphaned mid-flight +-- failure leaves this FALSE so MUL-1128's resume contract is preserved. +ALTER TABLE agent_task_queue + ADD COLUMN force_fresh_session BOOLEAN NOT NULL DEFAULT FALSE; diff --git a/server/pkg/db/generated/agent.sql.go b/server/pkg/db/generated/agent.sql.go index 1e4f07c6c..003e61477 100644 --- a/server/pkg/db/generated/agent.sql.go +++ b/server/pkg/db/generated/agent.sql.go @@ -55,7 +55,7 @@ const cancelAgentTask = `-- name: CancelAgentTask :one UPDATE agent_task_queue SET status = 'cancelled', completed_at = now() WHERE id = $1 AND status IN ('queued', 'dispatched', 'running') -RETURNING id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, last_heartbeat_at, trigger_summary +RETURNING id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, last_heartbeat_at, trigger_summary, force_fresh_session ` func (q *Queries) CancelAgentTask(ctx context.Context, id pgtype.UUID) (AgentTaskQueue, error) { @@ -86,6 +86,7 @@ func (q *Queries) CancelAgentTask(ctx context.Context, id pgtype.UUID) (AgentTas &i.FailureReason, &i.LastHeartbeatAt, &i.TriggerSummary, + &i.ForceFreshSession, ) return i, err } @@ -94,7 +95,7 @@ const cancelAgentTasksByAgent = `-- name: CancelAgentTasksByAgent :many UPDATE agent_task_queue SET status = 'cancelled', completed_at = now() WHERE agent_id = $1 AND status IN ('queued', 'dispatched', 'running') -RETURNING id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, last_heartbeat_at, trigger_summary +RETURNING id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, last_heartbeat_at, trigger_summary, force_fresh_session ` // Bulk-cancel every active (queued/dispatched/running) task for an agent. @@ -136,6 +137,7 @@ func (q *Queries) CancelAgentTasksByAgent(ctx context.Context, agentID pgtype.UU &i.FailureReason, &i.LastHeartbeatAt, &i.TriggerSummary, + &i.ForceFreshSession, ); err != nil { return nil, err } @@ -151,7 +153,7 @@ const cancelAgentTasksByIssue = `-- name: CancelAgentTasksByIssue :many UPDATE agent_task_queue SET status = 'cancelled', completed_at = now() WHERE issue_id = $1 AND status IN ('queued', 'dispatched', 'running') -RETURNING id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, last_heartbeat_at, trigger_summary +RETURNING id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, last_heartbeat_at, trigger_summary, force_fresh_session ` // Cancels every active task on the issue and returns the affected rows so the @@ -193,6 +195,7 @@ func (q *Queries) CancelAgentTasksByIssue(ctx context.Context, issueID pgtype.UU &i.FailureReason, &i.LastHeartbeatAt, &i.TriggerSummary, + &i.ForceFreshSession, ); err != nil { return nil, err } @@ -208,7 +211,7 @@ const cancelAgentTasksByIssueAndAgent = `-- name: CancelAgentTasksByIssueAndAgen UPDATE agent_task_queue SET status = 'cancelled', completed_at = now() WHERE issue_id = $1 AND agent_id = $2 AND status IN ('queued', 'dispatched', 'running') -RETURNING id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, last_heartbeat_at, trigger_summary +RETURNING id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, last_heartbeat_at, trigger_summary, force_fresh_session ` type CancelAgentTasksByIssueAndAgentParams struct { @@ -254,6 +257,7 @@ func (q *Queries) CancelAgentTasksByIssueAndAgent(ctx context.Context, arg Cance &i.FailureReason, &i.LastHeartbeatAt, &i.TriggerSummary, + &i.ForceFreshSession, ); err != nil { return nil, err } @@ -269,7 +273,7 @@ const cancelAgentTasksByTriggerComment = `-- name: CancelAgentTasksByTriggerComm UPDATE agent_task_queue SET status = 'cancelled', completed_at = now() WHERE trigger_comment_id = $1 AND status IN ('queued', 'dispatched', 'running') -RETURNING id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, last_heartbeat_at, trigger_summary +RETURNING id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, last_heartbeat_at, trigger_summary, force_fresh_session ` // Cancels active tasks whose trigger is the given comment. Called when a @@ -311,6 +315,7 @@ func (q *Queries) CancelAgentTasksByTriggerComment(ctx context.Context, triggerC &i.FailureReason, &i.LastHeartbeatAt, &i.TriggerSummary, + &i.ForceFreshSession, ); err != nil { return nil, err } @@ -349,7 +354,7 @@ WHERE id = ( LIMIT 1 FOR UPDATE SKIP LOCKED ) -RETURNING id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, last_heartbeat_at, trigger_summary +RETURNING id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, last_heartbeat_at, trigger_summary, force_fresh_session ` // Claims the next queued task for an agent, enforcing per-(issue, agent) serialization: @@ -389,6 +394,7 @@ func (q *Queries) ClaimAgentTask(ctx context.Context, agentID pgtype.UUID) (Agen &i.FailureReason, &i.LastHeartbeatAt, &i.TriggerSummary, + &i.ForceFreshSession, ) return i, err } @@ -432,7 +438,7 @@ const completeAgentTask = `-- name: CompleteAgentTask :one UPDATE agent_task_queue SET status = 'completed', completed_at = now(), result = $2, session_id = $3, work_dir = $4 WHERE id = $1 AND status = 'running' -RETURNING id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, last_heartbeat_at, trigger_summary +RETURNING id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, last_heartbeat_at, trigger_summary, force_fresh_session ` type CompleteAgentTaskParams struct { @@ -475,6 +481,7 @@ func (q *Queries) CompleteAgentTask(ctx context.Context, arg CompleteAgentTaskPa &i.FailureReason, &i.LastHeartbeatAt, &i.TriggerSummary, + &i.ForceFreshSession, ) return i, err } @@ -564,18 +571,26 @@ func (q *Queries) CreateAgent(ctx context.Context, arg CreateAgentParams) (Agent } const createAgentTask = `-- name: CreateAgentTask :one -INSERT INTO agent_task_queue (agent_id, runtime_id, issue_id, status, priority, trigger_comment_id, trigger_summary) -VALUES ($1, $2, $3, 'queued', $4, $5, $6) -RETURNING id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, last_heartbeat_at, trigger_summary +INSERT INTO agent_task_queue ( + agent_id, runtime_id, issue_id, status, priority, trigger_comment_id, + trigger_summary, force_fresh_session +) +VALUES ( + $1, $2, $3, 'queued', $4, $5, + $6, + COALESCE($7::boolean, FALSE) +) +RETURNING id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, last_heartbeat_at, trigger_summary, force_fresh_session ` type CreateAgentTaskParams struct { - AgentID pgtype.UUID `json:"agent_id"` - RuntimeID pgtype.UUID `json:"runtime_id"` - IssueID pgtype.UUID `json:"issue_id"` - Priority int32 `json:"priority"` - TriggerCommentID pgtype.UUID `json:"trigger_comment_id"` - TriggerSummary pgtype.Text `json:"trigger_summary"` + AgentID pgtype.UUID `json:"agent_id"` + RuntimeID pgtype.UUID `json:"runtime_id"` + IssueID pgtype.UUID `json:"issue_id"` + Priority int32 `json:"priority"` + TriggerCommentID pgtype.UUID `json:"trigger_comment_id"` + TriggerSummary pgtype.Text `json:"trigger_summary"` + ForceFreshSession pgtype.Bool `json:"force_fresh_session"` } func (q *Queries) CreateAgentTask(ctx context.Context, arg CreateAgentTaskParams) (AgentTaskQueue, error) { @@ -586,6 +601,7 @@ func (q *Queries) CreateAgentTask(ctx context.Context, arg CreateAgentTaskParams arg.Priority, arg.TriggerCommentID, arg.TriggerSummary, + arg.ForceFreshSession, ) var i AgentTaskQueue err := row.Scan( @@ -613,6 +629,7 @@ func (q *Queries) CreateAgentTask(ctx context.Context, arg CreateAgentTaskParams &i.FailureReason, &i.LastHeartbeatAt, &i.TriggerSummary, + &i.ForceFreshSession, ) return i, err } @@ -620,7 +637,7 @@ func (q *Queries) CreateAgentTask(ctx context.Context, arg CreateAgentTaskParams const createQuickCreateTask = `-- name: CreateQuickCreateTask :one INSERT INTO agent_task_queue (agent_id, runtime_id, issue_id, status, priority, context) VALUES ($1, $2, NULL, 'queued', $3, $4) -RETURNING id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, last_heartbeat_at, trigger_summary +RETURNING id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, last_heartbeat_at, trigger_summary, force_fresh_session ` type CreateQuickCreateTaskParams struct { @@ -666,6 +683,7 @@ func (q *Queries) CreateQuickCreateTask(ctx context.Context, arg CreateQuickCrea &i.FailureReason, &i.LastHeartbeatAt, &i.TriggerSummary, + &i.ForceFreshSession, ) return i, err } @@ -684,7 +702,7 @@ SELECT p.attempt + 1, p.max_attempts, p.id FROM agent_task_queue p WHERE p.id = $1 -RETURNING id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, last_heartbeat_at, trigger_summary +RETURNING id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, last_heartbeat_at, trigger_summary, force_fresh_session ` // Clones a parent task into a fresh queued attempt. Carries forward the @@ -719,6 +737,7 @@ func (q *Queries) CreateRetryTask(ctx context.Context, id pgtype.UUID) (AgentTas &i.FailureReason, &i.LastHeartbeatAt, &i.TriggerSummary, + &i.ForceFreshSession, ) return i, err } @@ -732,7 +751,7 @@ SET status = 'failed', session_id = COALESCE($4, session_id), work_dir = COALESCE($5, work_dir) WHERE id = $1 AND status IN ('dispatched', 'running') -RETURNING id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, last_heartbeat_at, trigger_summary +RETURNING id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, last_heartbeat_at, trigger_summary, force_fresh_session ` type FailAgentTaskParams struct { @@ -786,6 +805,7 @@ func (q *Queries) FailAgentTask(ctx context.Context, arg FailAgentTaskParams) (A &i.FailureReason, &i.LastHeartbeatAt, &i.TriggerSummary, + &i.ForceFreshSession, ) return i, err } @@ -796,7 +816,7 @@ SET status = 'failed', completed_at = now(), error = 'task timed out', failure_reason = 'timeout' WHERE (status = 'dispatched' AND dispatched_at < now() - make_interval(secs => $1::double precision)) OR (status = 'running' AND started_at < now() - make_interval(secs => $2::double precision)) -RETURNING id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, last_heartbeat_at, trigger_summary +RETURNING id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, last_heartbeat_at, trigger_summary, force_fresh_session ` type FailStaleTasksParams struct { @@ -841,6 +861,7 @@ func (q *Queries) FailStaleTasks(ctx context.Context, arg FailStaleTasksParams) &i.FailureReason, &i.LastHeartbeatAt, &i.TriggerSummary, + &i.ForceFreshSession, ); err != nil { return nil, err } @@ -926,7 +947,7 @@ func (q *Queries) GetAgentInWorkspace(ctx context.Context, arg GetAgentInWorkspa } const getAgentTask = `-- name: GetAgentTask :one -SELECT id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, last_heartbeat_at, trigger_summary FROM agent_task_queue +SELECT id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, last_heartbeat_at, trigger_summary, force_fresh_session FROM agent_task_queue WHERE id = $1 ` @@ -958,6 +979,7 @@ func (q *Queries) GetAgentTask(ctx context.Context, id pgtype.UUID) (AgentTaskQu &i.FailureReason, &i.LastHeartbeatAt, &i.TriggerSummary, + &i.ForceFreshSession, ) return i, err } @@ -965,7 +987,10 @@ func (q *Queries) GetAgentTask(ctx context.Context, id pgtype.UUID) (AgentTaskQu const getLastTaskSession = `-- name: GetLastTaskSession :one SELECT session_id, work_dir FROM agent_task_queue WHERE agent_id = $1 AND issue_id = $2 - AND status IN ('completed', 'failed') + AND ( + status = 'completed' + OR (status = 'failed' AND COALESCE(failure_reason, '') NOT IN ('iteration_limit', 'agent_fallback_message')) + ) AND session_id IS NOT NULL ORDER BY COALESCE(completed_at, started_at, dispatched_at, created_at) DESC LIMIT 1 @@ -989,6 +1014,11 @@ type GetLastTaskSessionRow struct { // UpdateAgentTaskSession. Without this, an auto-retry / manual rerun of a // mid-run failure would silently start a fresh conversation and lose the // in-flight context — exactly what MUL-1128's B branch is meant to fix. +// +// Tasks that ended in a known "poisoned" terminal state are excluded so +// a rerun does not inherit the bad session. The daemon classifies these +// failures (iteration_limit, agent_fallback_message) when it detects the +// agent emitted a fallback marker instead of a real result. func (q *Queries) GetLastTaskSession(ctx context.Context, arg GetLastTaskSessionParams) (GetLastTaskSessionRow, error) { row := q.db.QueryRow(ctx, getLastTaskSession, arg.AgentID, arg.IssueID) var i GetLastTaskSessionRow @@ -1167,7 +1197,7 @@ func (q *Queries) LinkTaskToIssue(ctx context.Context, arg LinkTaskToIssueParams } const listActiveTasksByIssue = `-- name: ListActiveTasksByIssue :many -SELECT id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, last_heartbeat_at, trigger_summary FROM agent_task_queue +SELECT id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, last_heartbeat_at, trigger_summary, force_fresh_session FROM agent_task_queue WHERE issue_id = $1 AND status IN ('dispatched', 'running') ORDER BY created_at DESC ` @@ -1206,6 +1236,7 @@ func (q *Queries) ListActiveTasksByIssue(ctx context.Context, issueID pgtype.UUI &i.FailureReason, &i.LastHeartbeatAt, &i.TriggerSummary, + &i.ForceFreshSession, ); err != nil { return nil, err } @@ -1218,7 +1249,7 @@ func (q *Queries) ListActiveTasksByIssue(ctx context.Context, issueID pgtype.UUI } const listAgentTasks = `-- name: ListAgentTasks :many -SELECT id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, last_heartbeat_at, trigger_summary FROM agent_task_queue +SELECT id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, last_heartbeat_at, trigger_summary, force_fresh_session FROM agent_task_queue WHERE agent_id = $1 ORDER BY created_at DESC ` @@ -1257,6 +1288,7 @@ func (q *Queries) ListAgentTasks(ctx context.Context, agentID pgtype.UUID) ([]Ag &i.FailureReason, &i.LastHeartbeatAt, &i.TriggerSummary, + &i.ForceFreshSession, ); err != nil { return nil, err } @@ -1365,7 +1397,7 @@ func (q *Queries) ListAllAgents(ctx context.Context, workspaceID pgtype.UUID) ([ } const listPendingTasksByRuntime = `-- name: ListPendingTasksByRuntime :many -SELECT id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, last_heartbeat_at, trigger_summary FROM agent_task_queue +SELECT id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, last_heartbeat_at, trigger_summary, force_fresh_session FROM agent_task_queue WHERE runtime_id = $1 AND status IN ('queued', 'dispatched') ORDER BY priority DESC, created_at ASC ` @@ -1404,6 +1436,7 @@ func (q *Queries) ListPendingTasksByRuntime(ctx context.Context, runtimeID pgtyp &i.FailureReason, &i.LastHeartbeatAt, &i.TriggerSummary, + &i.ForceFreshSession, ); err != nil { return nil, err } @@ -1416,7 +1449,7 @@ func (q *Queries) ListPendingTasksByRuntime(ctx context.Context, runtimeID pgtyp } const listTasksByIssue = `-- name: ListTasksByIssue :many -SELECT id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, last_heartbeat_at, trigger_summary FROM agent_task_queue +SELECT id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, last_heartbeat_at, trigger_summary, force_fresh_session FROM agent_task_queue WHERE issue_id = $1 ORDER BY created_at DESC ` @@ -1455,6 +1488,7 @@ func (q *Queries) ListTasksByIssue(ctx context.Context, issueID pgtype.UUID) ([] &i.FailureReason, &i.LastHeartbeatAt, &i.TriggerSummary, + &i.ForceFreshSession, ); err != nil { return nil, err } @@ -1467,15 +1501,15 @@ func (q *Queries) ListTasksByIssue(ctx context.Context, issueID pgtype.UUID) ([] } const listWorkspaceAgentTaskSnapshot = `-- name: ListWorkspaceAgentTaskSnapshot :many -SELECT atq.id, atq.agent_id, atq.issue_id, atq.status, atq.priority, atq.dispatched_at, atq.started_at, atq.completed_at, atq.result, atq.error, atq.created_at, atq.context, atq.runtime_id, atq.session_id, atq.work_dir, atq.trigger_comment_id, atq.chat_session_id, atq.autopilot_run_id, atq.attempt, atq.max_attempts, atq.parent_task_id, atq.failure_reason, atq.last_heartbeat_at, atq.trigger_summary FROM agent_task_queue atq +SELECT atq.id, atq.agent_id, atq.issue_id, atq.status, atq.priority, atq.dispatched_at, atq.started_at, atq.completed_at, atq.result, atq.error, atq.created_at, atq.context, atq.runtime_id, atq.session_id, atq.work_dir, atq.trigger_comment_id, atq.chat_session_id, atq.autopilot_run_id, atq.attempt, atq.max_attempts, atq.parent_task_id, atq.failure_reason, atq.last_heartbeat_at, atq.trigger_summary, atq.force_fresh_session FROM agent_task_queue atq JOIN agent a ON a.id = atq.agent_id WHERE a.workspace_id = $1 AND atq.status IN ('queued', 'dispatched', 'running') UNION ALL -SELECT t.id, t.agent_id, t.issue_id, t.status, t.priority, t.dispatched_at, t.started_at, t.completed_at, t.result, t.error, t.created_at, t.context, t.runtime_id, t.session_id, t.work_dir, t.trigger_comment_id, t.chat_session_id, t.autopilot_run_id, t.attempt, t.max_attempts, t.parent_task_id, t.failure_reason, t.last_heartbeat_at, t.trigger_summary FROM ( - SELECT DISTINCT ON (atq.agent_id) atq.id, atq.agent_id, atq.issue_id, atq.status, atq.priority, atq.dispatched_at, atq.started_at, atq.completed_at, atq.result, atq.error, atq.created_at, atq.context, atq.runtime_id, atq.session_id, atq.work_dir, atq.trigger_comment_id, atq.chat_session_id, atq.autopilot_run_id, atq.attempt, atq.max_attempts, atq.parent_task_id, atq.failure_reason, atq.last_heartbeat_at, atq.trigger_summary +SELECT t.id, t.agent_id, t.issue_id, t.status, t.priority, t.dispatched_at, t.started_at, t.completed_at, t.result, t.error, t.created_at, t.context, t.runtime_id, t.session_id, t.work_dir, t.trigger_comment_id, t.chat_session_id, t.autopilot_run_id, t.attempt, t.max_attempts, t.parent_task_id, t.failure_reason, t.last_heartbeat_at, t.trigger_summary, t.force_fresh_session FROM ( + SELECT DISTINCT ON (atq.agent_id) atq.id, atq.agent_id, atq.issue_id, atq.status, atq.priority, atq.dispatched_at, atq.started_at, atq.completed_at, atq.result, atq.error, atq.created_at, atq.context, atq.runtime_id, atq.session_id, atq.work_dir, atq.trigger_comment_id, atq.chat_session_id, atq.autopilot_run_id, atq.attempt, atq.max_attempts, atq.parent_task_id, atq.failure_reason, atq.last_heartbeat_at, atq.trigger_summary, atq.force_fresh_session FROM agent_task_queue atq JOIN agent a ON a.id = atq.agent_id WHERE a.workspace_id = $1 @@ -1536,6 +1570,7 @@ func (q *Queries) ListWorkspaceAgentTaskSnapshot(ctx context.Context, workspaceI &i.FailureReason, &i.LastHeartbeatAt, &i.TriggerSummary, + &i.ForceFreshSession, ); err != nil { return nil, err } @@ -1554,7 +1589,7 @@ SET status = 'failed', error = 'daemon restarted while task was in flight', failure_reason = 'runtime_recovery' WHERE runtime_id = $1 AND status IN ('dispatched', 'running') -RETURNING id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, last_heartbeat_at, trigger_summary +RETURNING id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, last_heartbeat_at, trigger_summary, force_fresh_session ` // Called by the daemon at startup. Atomically fails any dispatched/running @@ -1595,6 +1630,7 @@ func (q *Queries) RecoverOrphanedTasksForRuntime(ctx context.Context, runtimeID &i.FailureReason, &i.LastHeartbeatAt, &i.TriggerSummary, + &i.ForceFreshSession, ); err != nil { return nil, err } @@ -1685,7 +1721,7 @@ const startAgentTask = `-- name: StartAgentTask :one UPDATE agent_task_queue SET status = 'running', started_at = now() WHERE id = $1 AND status = 'dispatched' -RETURNING id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, last_heartbeat_at, trigger_summary +RETURNING id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, last_heartbeat_at, trigger_summary, force_fresh_session ` func (q *Queries) StartAgentTask(ctx context.Context, id pgtype.UUID) (AgentTaskQueue, error) { @@ -1716,6 +1752,7 @@ func (q *Queries) StartAgentTask(ctx context.Context, id pgtype.UUID) (AgentTask &i.FailureReason, &i.LastHeartbeatAt, &i.TriggerSummary, + &i.ForceFreshSession, ) return i, err } diff --git a/server/pkg/db/generated/autopilot.sql.go b/server/pkg/db/generated/autopilot.sql.go index 9f4709fc3..07c1c4fe0 100644 --- a/server/pkg/db/generated/autopilot.sql.go +++ b/server/pkg/db/generated/autopilot.sql.go @@ -204,7 +204,7 @@ const createAutopilotTask = `-- name: CreateAutopilotTask :one INSERT INTO agent_task_queue (agent_id, runtime_id, issue_id, status, priority, autopilot_run_id, trigger_summary) VALUES ($1, $2, NULL, 'queued', $3, $4, $5) -RETURNING id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, last_heartbeat_at, trigger_summary +RETURNING id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, last_heartbeat_at, trigger_summary, force_fresh_session ` type CreateAutopilotTaskParams struct { @@ -252,6 +252,7 @@ func (q *Queries) CreateAutopilotTask(ctx context.Context, arg CreateAutopilotTa &i.FailureReason, &i.LastHeartbeatAt, &i.TriggerSummary, + &i.ForceFreshSession, ) return i, err } diff --git a/server/pkg/db/generated/chat.sql.go b/server/pkg/db/generated/chat.sql.go index a0c0f08e5..f2f7797e6 100644 --- a/server/pkg/db/generated/chat.sql.go +++ b/server/pkg/db/generated/chat.sql.go @@ -99,7 +99,7 @@ func (q *Queries) CreateChatSession(ctx context.Context, arg CreateChatSessionPa const createChatTask = `-- name: CreateChatTask :one INSERT INTO agent_task_queue (agent_id, runtime_id, issue_id, status, priority, chat_session_id) VALUES ($1, $2, NULL, 'queued', $3, $4) -RETURNING id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, last_heartbeat_at, trigger_summary +RETURNING id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, last_heartbeat_at, trigger_summary, force_fresh_session ` type CreateChatTaskParams struct { @@ -142,6 +142,7 @@ func (q *Queries) CreateChatTask(ctx context.Context, arg CreateChatTaskParams) &i.FailureReason, &i.LastHeartbeatAt, &i.TriggerSummary, + &i.ForceFreshSession, ) return i, err } diff --git a/server/pkg/db/generated/models.go b/server/pkg/db/generated/models.go index 4145ca110..9776aa640 100644 --- a/server/pkg/db/generated/models.go +++ b/server/pkg/db/generated/models.go @@ -67,30 +67,31 @@ type AgentSkill struct { } type AgentTaskQueue struct { - ID pgtype.UUID `json:"id"` - AgentID pgtype.UUID `json:"agent_id"` - IssueID pgtype.UUID `json:"issue_id"` - Status string `json:"status"` - Priority int32 `json:"priority"` - DispatchedAt pgtype.Timestamptz `json:"dispatched_at"` - StartedAt pgtype.Timestamptz `json:"started_at"` - CompletedAt pgtype.Timestamptz `json:"completed_at"` - Result []byte `json:"result"` - Error pgtype.Text `json:"error"` - CreatedAt pgtype.Timestamptz `json:"created_at"` - Context []byte `json:"context"` - RuntimeID pgtype.UUID `json:"runtime_id"` - SessionID pgtype.Text `json:"session_id"` - WorkDir pgtype.Text `json:"work_dir"` - TriggerCommentID pgtype.UUID `json:"trigger_comment_id"` - ChatSessionID pgtype.UUID `json:"chat_session_id"` - AutopilotRunID pgtype.UUID `json:"autopilot_run_id"` - Attempt int32 `json:"attempt"` - MaxAttempts int32 `json:"max_attempts"` - ParentTaskID pgtype.UUID `json:"parent_task_id"` - FailureReason pgtype.Text `json:"failure_reason"` - LastHeartbeatAt pgtype.Timestamptz `json:"last_heartbeat_at"` - TriggerSummary pgtype.Text `json:"trigger_summary"` + ID pgtype.UUID `json:"id"` + AgentID pgtype.UUID `json:"agent_id"` + IssueID pgtype.UUID `json:"issue_id"` + Status string `json:"status"` + Priority int32 `json:"priority"` + DispatchedAt pgtype.Timestamptz `json:"dispatched_at"` + StartedAt pgtype.Timestamptz `json:"started_at"` + CompletedAt pgtype.Timestamptz `json:"completed_at"` + Result []byte `json:"result"` + Error pgtype.Text `json:"error"` + CreatedAt pgtype.Timestamptz `json:"created_at"` + Context []byte `json:"context"` + RuntimeID pgtype.UUID `json:"runtime_id"` + SessionID pgtype.Text `json:"session_id"` + WorkDir pgtype.Text `json:"work_dir"` + TriggerCommentID pgtype.UUID `json:"trigger_comment_id"` + ChatSessionID pgtype.UUID `json:"chat_session_id"` + AutopilotRunID pgtype.UUID `json:"autopilot_run_id"` + Attempt int32 `json:"attempt"` + MaxAttempts int32 `json:"max_attempts"` + ParentTaskID pgtype.UUID `json:"parent_task_id"` + FailureReason pgtype.Text `json:"failure_reason"` + LastHeartbeatAt pgtype.Timestamptz `json:"last_heartbeat_at"` + TriggerSummary pgtype.Text `json:"trigger_summary"` + ForceFreshSession bool `json:"force_fresh_session"` } type Attachment struct { diff --git a/server/pkg/db/generated/runtime.sql.go b/server/pkg/db/generated/runtime.sql.go index fcef693ab..60e72aa3c 100644 --- a/server/pkg/db/generated/runtime.sql.go +++ b/server/pkg/db/generated/runtime.sql.go @@ -84,7 +84,7 @@ WHERE status IN ('dispatched', 'running') AND runtime_id IN ( SELECT id FROM agent_runtime WHERE status = 'offline' ) -RETURNING id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, last_heartbeat_at, trigger_summary +RETURNING id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, last_heartbeat_at, trigger_summary, force_fresh_session ` // Marks dispatched/running tasks as failed when their runtime is offline. @@ -123,6 +123,7 @@ func (q *Queries) FailTasksForOfflineRuntimes(ctx context.Context) ([]AgentTaskQ &i.FailureReason, &i.LastHeartbeatAt, &i.TriggerSummary, + &i.ForceFreshSession, ); err != nil { return nil, err } diff --git a/server/pkg/db/queries/agent.sql b/server/pkg/db/queries/agent.sql index 88ee35ce1..42b8b5c73 100644 --- a/server/pkg/db/queries/agent.sql +++ b/server/pkg/db/queries/agent.sql @@ -65,8 +65,15 @@ WHERE agent_id = $1 ORDER BY created_at DESC; -- name: CreateAgentTask :one -INSERT INTO agent_task_queue (agent_id, runtime_id, issue_id, status, priority, trigger_comment_id, trigger_summary) -VALUES ($1, $2, $3, 'queued', $4, sqlc.narg(trigger_comment_id), sqlc.narg(trigger_summary)) +INSERT INTO agent_task_queue ( + agent_id, runtime_id, issue_id, status, priority, trigger_comment_id, + trigger_summary, force_fresh_session +) +VALUES ( + $1, $2, $3, 'queued', $4, sqlc.narg(trigger_comment_id), + sqlc.narg(trigger_summary), + COALESCE(sqlc.narg('force_fresh_session')::boolean, FALSE) +) RETURNING *; -- name: CreateQuickCreateTask :one @@ -213,9 +220,17 @@ RETURNING *; -- UpdateAgentTaskSession. Without this, an auto-retry / manual rerun of a -- mid-run failure would silently start a fresh conversation and lose the -- in-flight context — exactly what MUL-1128's B branch is meant to fix. +-- +-- Tasks that ended in a known "poisoned" terminal state are excluded so +-- a rerun does not inherit the bad session. The daemon classifies these +-- failures (iteration_limit, agent_fallback_message) when it detects the +-- agent emitted a fallback marker instead of a real result. SELECT session_id, work_dir FROM agent_task_queue WHERE agent_id = $1 AND issue_id = $2 - AND status IN ('completed', 'failed') + AND ( + status = 'completed' + OR (status = 'failed' AND COALESCE(failure_reason, '') NOT IN ('iteration_limit', 'agent_fallback_message')) + ) AND session_id IS NOT NULL ORDER BY COALESCE(completed_at, started_at, dispatched_at, created_at) DESC LIMIT 1;