fix(task): rerun starts a fresh session, skip poisoned resume (#1928)

* fix(task): rerun starts a fresh session, skip poisoned resume

When a task ended in a known agent fallback ("I reached the iteration
limit and couldn't generate a summary.", "Put your final update inside
the content string. Keep it concise.") the (agent_id, issue_id) resume
lookup would still pick that session, so a manual rerun inherited the
poisoned state and reproduced the same bad output.

Two complementary guards:

1. Daemon classifies poisoned terminal output and routes it through the
   blocked path with failure_reason set ('iteration_limit' /
   'agent_fallback_message'). GetLastTaskSession excludes failed tasks
   with those reasons, so even comment-triggered tasks no longer resume
   them. Tasks that failed mid-flight (timeout, runtime_recovery, etc.)
   are still resumable, preserving MUL-1128's auto-retry contract.

2. Manual rerun marks the new task force_fresh_session=true. The daemon
   claim handler skips the resume lookup entirely when the flag is set,
   capturing the user-intent signal that "the prior output was bad" even
   when poisoned classification misses a future fallback wording.

Auto-retry of orphaned mid-flight failures (MaybeRetryFailedTask →
CreateRetryTask) does not take this path, so it keeps resuming.

Tests: classifyPoisonedOutput unit test; integration tests assert the
SQL filter excludes poisoned classifiers, RerunIssue flips the flag,
and the normal enqueue path leaves it false.

Co-authored-by: multica-agent <github@multica.ai>

* fix(daemon): cap poisoned-output matcher to short trimmed text

GPT-Boy review on MUL-1630: the previous strings.Contains match would
classify any output that quoted the marker substring — including a
review/analysis that simply discussed the marker itself. Real fallback
messages are short single-sentence affairs, so cap the candidate at
~one paragraph and trim whitespace before matching. Adds regression
tests covering a long quoting review and a marker buried in a long
real conclusion; both must stay classified as completed.

Co-authored-by: multica-agent <github@multica.ai>

* fix(migrations): rename 065 force_fresh_session → 066 to clear collision

main introduced 065_project_resources after this branch was cut, so
both files shared the 065_ prefix. The readiness check
(server/cmd/server/health.go → migrations.LatestVersion) takes the
last entry by lexical order, which is 065_project_resources, leaving
this branch's 065_force_fresh_session unguarded — a deploy that
applied project_resources but not force_fresh_session would still
report ready, and the next enqueue / rerun / claim would crash on
"column force_fresh_session does not exist".

Renaming to 066_force_fresh_session puts it strictly after
project_resources so readiness blocks until it's applied.

Co-authored-by: multica-agent <github@multica.ai>

---------

Co-authored-by: multica-agent <github@multica.ai>
This commit is contained in:
Bohan Jiang
2026-04-30 14:17:53 +08:00
committed by GitHub
parent 44608713bb
commit b1345685a3
15 changed files with 534 additions and 85 deletions

View File

@@ -0,0 +1,193 @@
package main
import (
"context"
"testing"
"github.com/jackc/pgx/v5/pgtype"
"github.com/multica-ai/multica/server/internal/events"
"github.com/multica-ai/multica/server/internal/realtime"
"github.com/multica-ai/multica/server/internal/service"
db "github.com/multica-ai/multica/server/pkg/db/generated"
)
// setupRerunTestFixture prepares the rows the rerun tests operate on. It
// picks one agent (and its runtime) from a workspace the integration test
// user is a member of, inserts a 'todo' issue in testWorkspaceID assigned
// to that agent, and returns (issueID, agentID, runtimeID). A failure in
// either query aborts the calling test.
func setupRerunTestFixture(t *testing.T) (string, string, string) {
	t.Helper()

	const findAgentSQL = `
SELECT a.id, a.runtime_id FROM agent a
JOIN member m ON m.workspace_id = a.workspace_id
JOIN "user" u ON u.id = m.user_id
WHERE u.email = $1
LIMIT 1
`
	const insertIssueSQL = `
INSERT INTO issue (workspace_id, title, status, priority, creator_type, creator_id, assignee_type, assignee_id)
SELECT $1, 'Rerun test issue', 'todo', 'none', 'member', m.user_id, 'agent', $2
FROM member m WHERE m.workspace_id = $1 LIMIT 1
RETURNING id
`

	bg := context.Background()
	var agent, rt, issue string

	// Resolve the agent first: the issue insert needs its id as assignee.
	agentRow := testPool.QueryRow(bg, findAgentSQL, integrationTestEmail)
	if err := agentRow.Scan(&agent, &rt); err != nil {
		t.Fatalf("failed to find test agent: %v", err)
	}

	// The issue's creator is any member of the test workspace.
	issueRow := testPool.QueryRow(bg, insertIssueSQL, testWorkspaceID, agent)
	if err := issueRow.Scan(&issue); err != nil {
		t.Fatalf("failed to create test issue: %v", err)
	}

	return issue, agent, rt
}
// cleanupRerunFixture deletes the agent_task_queue rows that reference
// issueID and then the issue row itself. Cleanup is best-effort: a failed
// DELETE is reported through t.Logf instead of failing the test, so leaked
// fixture rows are visible in the test output without masking the test's
// own result.
func cleanupRerunFixture(t *testing.T, issueID string) {
	t.Helper()
	ctx := context.Background()

	// Task rows go first, then the issue they point at.
	if _, err := testPool.Exec(ctx, `DELETE FROM agent_task_queue WHERE issue_id = $1`, issueID); err != nil {
		t.Logf("cleanup: delete agent_task_queue rows for issue %s: %v", issueID, err)
	}
	if _, err := testPool.Exec(ctx, `DELETE FROM issue WHERE id = $1`, issueID); err != nil {
		t.Logf("cleanup: delete issue %s: %v", issueID, err)
	}
}
// TestGetLastTaskSessionExcludesPoisonedFailures checks the SQL-level half
// of the rerun-poisoned-session fix. Two failed tasks are stored for the
// same (agent_id, issue_id): an older one that failed with 'timeout' and a
// newer one that failed with the poisoned classifier 'iteration_limit'.
// The resume lookup must skip the newer poisoned row and return the older
// session; otherwise a rerun would inherit the poisoned session and replay
// the same bad output.
func TestGetLastTaskSessionExcludesPoisonedFailures(t *testing.T) {
	if testPool == nil {
		t.Skip("no database connection")
	}
	issueID, agentID, runtimeID := setupRerunTestFixture(t)
	t.Cleanup(func() { cleanupRerunFixture(t, issueID) })

	ctx := context.Background()

	// Older row: a 'timeout' failure, which is not a poisoned classifier
	// and is therefore expected to stay eligible for resume.
	_, err := testPool.Exec(ctx, `
INSERT INTO agent_task_queue (agent_id, runtime_id, issue_id, status, priority, started_at, completed_at, session_id, work_dir, failure_reason)
VALUES ($1, $2, $3, 'failed', 0, now() - interval '2 minutes', now() - interval '2 minutes', 'HEALTHY-SESSION', '/tmp/healthy', 'timeout')
`, agentID, runtimeID, issueID)
	if err != nil {
		t.Fatalf("insert healthy failed task: %v", err)
	}

	// Newer row: the poisoned task is the *most recent* one, so without
	// the filter the lookup would hand back its session_id.
	_, err = testPool.Exec(ctx, `
INSERT INTO agent_task_queue (agent_id, runtime_id, issue_id, status, priority, started_at, completed_at, session_id, work_dir, failure_reason)
VALUES ($1, $2, $3, 'failed', 0, now() - interval '1 minute', now() - interval '1 minute', 'POISONED-SESSION', '/tmp/poisoned', 'iteration_limit')
`, agentID, runtimeID, issueID)
	if err != nil {
		t.Fatalf("insert poisoned failed task: %v", err)
	}

	lookup := db.GetLastTaskSessionParams{
		AgentID: pgtype.UUID{Bytes: parseUUIDBytes(agentID), Valid: true},
		IssueID: pgtype.UUID{Bytes: parseUUIDBytes(issueID), Valid: true},
	}
	prior, err := db.New(testPool).GetLastTaskSession(ctx, lookup)
	if err != nil {
		t.Fatalf("GetLastTaskSession failed: %v", err)
	}

	// The poisoned case is checked before the generic mismatch so the
	// failure message names the regression directly.
	switch session := prior.SessionID; {
	case !session.Valid:
		t.Fatal("expected to fall back to the healthy failed session, got no session")
	case session.String == "POISONED-SESSION":
		t.Fatal("rerun would inherit poisoned session — filter is not active")
	case session.String != "HEALTHY-SESSION":
		t.Fatalf("expected HEALTHY-SESSION, got %q", session.String)
	}
}
// TestGetLastTaskSessionFallbackPoisonedClassifier exercises the second
// poisoned classifier, 'agent_fallback_message'. The only task stored for
// the (agent_id, issue_id) pair is a failed one carrying that reason, so
// the resume lookup must not yield a valid session. Covering each
// classifier separately means adding a third one cannot silently break
// this rule.
func TestGetLastTaskSessionFallbackPoisonedClassifier(t *testing.T) {
	if testPool == nil {
		t.Skip("no database connection")
	}
	issueID, agentID, runtimeID := setupRerunTestFixture(t)
	t.Cleanup(func() { cleanupRerunFixture(t, issueID) })

	ctx := context.Background()

	_, insertErr := testPool.Exec(ctx, `
INSERT INTO agent_task_queue (agent_id, runtime_id, issue_id, status, priority, started_at, completed_at, session_id, work_dir, failure_reason)
VALUES ($1, $2, $3, 'failed', 0, now() - interval '5 seconds', now() - interval '5 seconds', 'POISONED-FALLBACK', '/tmp/poisoned', 'agent_fallback_message')
`, agentID, runtimeID, issueID)
	if insertErr != nil {
		t.Fatalf("insert poisoned failed task: %v", insertErr)
	}

	lookup := db.GetLastTaskSessionParams{
		AgentID: pgtype.UUID{Bytes: parseUUIDBytes(agentID), Valid: true},
		IssueID: pgtype.UUID{Bytes: parseUUIDBytes(issueID), Valid: true},
	}
	prior, err := db.New(testPool).GetLastTaskSession(ctx, lookup)
	if err != nil {
		// An error from the lookup is accepted as "nothing to resume".
		// NOTE(review): this also lets an unrelated query error pass —
		// consider asserting the specific no-rows error here.
		return
	}
	if prior.SessionID.Valid {
		t.Fatalf("expected no resumable session, got %q", prior.SessionID.String)
	}
}
// TestRerunIssueSetsForceFreshSession covers the call-site half of the
// fix: a task created through the manual rerun flow must come back with
// ForceFreshSession set, which is the flag the daemon claim handler checks
// before doing the resume lookup. With the flag set, a manual rerun never
// resumes, even if the SQL filter were to miss a poisoned classifier.
func TestRerunIssueSetsForceFreshSession(t *testing.T) {
	if testPool == nil {
		t.Skip("no database connection")
	}
	issueID, _, _ := setupRerunTestFixture(t)
	t.Cleanup(func() { cleanupRerunFixture(t, issueID) })

	hub := realtime.NewHub()
	go hub.Run()
	svc := service.NewTaskService(db.New(testPool), nil, hub, events.New())

	target := pgtype.UUID{Bytes: parseUUIDBytes(issueID), Valid: true}
	var noTriggerComment pgtype.UUID // zero value: rerun without a trigger comment

	task, err := svc.RerunIssue(context.Background(), target, noTriggerComment)
	switch {
	case err != nil:
		t.Fatalf("RerunIssue failed: %v", err)
	case task == nil:
		t.Fatal("RerunIssue returned nil task")
	case !task.ForceFreshSession:
		t.Fatal("expected manual rerun to set force_fresh_session=true")
	}
}
// TestEnqueueTaskForIssueDoesNotForceFreshSession is the negative control
// for the rerun flag. A task created through the regular enqueue path must
// leave ForceFreshSession false, so auto-retry and comment-triggered tasks
// keep resuming the prior session (MUL-1128 contract).
func TestEnqueueTaskForIssueDoesNotForceFreshSession(t *testing.T) {
	if testPool == nil {
		t.Skip("no database connection")
	}
	issueID, _, _ := setupRerunTestFixture(t)
	t.Cleanup(func() { cleanupRerunFixture(t, issueID) })

	ctx := context.Background()
	store := db.New(testPool)

	hub := realtime.NewHub()
	go hub.Run()
	svc := service.NewTaskService(store, nil, hub, events.New())

	// EnqueueTaskForIssue takes the full issue row, so load it back first.
	issueKey := pgtype.UUID{Bytes: parseUUIDBytes(issueID), Valid: true}
	issue, err := store.GetIssue(ctx, issueKey)
	if err != nil {
		t.Fatalf("load issue: %v", err)
	}

	task, err := svc.EnqueueTaskForIssue(ctx, issue)
	if err != nil {
		t.Fatalf("EnqueueTaskForIssue failed: %v", err)
	}
	if task.ForceFreshSession {
		t.Fatal("expected normal enqueue to leave force_fresh_session=false")
	}
}

View File

@@ -1393,6 +1393,27 @@ func (d *Daemon) runTask(ctx context.Context, task Task, provider string, slot i
Usage: usageEntries,
}, nil
}
// Detect "poisoned" terminal output: the agent didn't reach a real
// conclusion but emitted a known fallback marker (iteration limit,
// fallback meta message). Route through the blocked path with a
// specific failure_reason so the server can exclude this session
// from the (agent_id, issue_id) resume lookup — otherwise a manual
// rerun would inherit the same poisoned session and reproduce the
// same bad output.
if reason, ok := classifyPoisonedOutput(result.Output); ok {
taskLog.Warn("agent finished with poisoned fallback output, classifying as blocked",
"failure_reason", reason,
)
return TaskResult{
Status: "blocked",
Comment: result.Output,
SessionID: result.SessionID,
WorkDir: env.WorkDir,
EnvRoot: env.RootDir,
Usage: usageEntries,
FailureReason: reason,
}, nil
}
return TaskResult{
Status: "completed",
Comment: result.Output,

View File

@@ -0,0 +1,57 @@
package daemon
import "strings"
// Failure reasons recorded for tasks that produced output, but whose
// output is only a known agent fallback marker: the agent gave up and
// emitted a meta message rather than a real result. They are declared
// here so the server-side GetLastTaskSession query can filter such tasks
// out, letting a rerun start from a fresh agent session instead of
// resuming the same poisoned conversation.
const (
	FailureReasonIterationLimit   = "iteration_limit"
	FailureReasonAgentFallbackMsg = "agent_fallback_message"
)

// poisonedOutputMaxLen is the largest trimmed output, measured in bytes,
// that may still be classified as a poisoned fallback. Genuine fallback
// messages are a single short sentence; a long output that mentions a
// marker is almost certainly a real conclusion (for example a code-review
// reply quoting these strings, as happened in MUL-1630). The cap is
// deliberately biased towards NOT classifying: a missed poisoned task can
// be retried by the user, whereas a false positive turns a successful
// task into a failure plus a system comment.
const poisonedOutputMaxLen = 320

// poisonedMarkers pairs a lowercase substring fingerprint of a known
// fallback terminal message with the failure_reason to record for it.
// Matching is substring-based against lowercased output, which makes it
// case-insensitive; the length cap keeps long outputs that merely quote a
// marker from being misclassified.
var poisonedMarkers = []struct {
	Substring string
	Reason    string
}{
	{Substring: "i reached the iteration limit", Reason: FailureReasonIterationLimit},
	{Substring: "put your final update inside the content string", Reason: FailureReasonAgentFallbackMsg},
}

// classifyPoisonedOutput decides whether output is a known agent fallback
// terminal message. On a match it returns the failure_reason to persist on
// the task row and true; otherwise it returns "" and false.
//
// Output is whitespace-trimmed first. Empty output, and output longer than
// poisonedOutputMaxLen after trimming, is never classified: a real
// fallback is the agent's only utterance for the turn, so anything beyond
// roughly one paragraph counts as a real result even if it contains a
// marker. Markers are tried in declaration order and the first hit wins.
func classifyPoisonedOutput(output string) (string, bool) {
	candidate := strings.TrimSpace(output)
	if n := len(candidate); n == 0 || n > poisonedOutputMaxLen {
		return "", false
	}

	// Markers are stored lowercase, so fold the candidate once up front.
	candidate = strings.ToLower(candidate)
	for i := range poisonedMarkers {
		if marker := poisonedMarkers[i]; strings.Contains(candidate, marker.Substring) {
			return marker.Reason, true
		}
	}
	return "", false
}

View File

@@ -0,0 +1,86 @@
package daemon
import (
"strings"
"testing"
)
// TestClassifyPoisonedOutput is a table-driven check of
// classifyPoisonedOutput. It covers the canonical fallback messages,
// case-insensitive matching, whitespace trimming, ordinary outputs that
// must stay unclassified, and the length cap (both the exact boundary and
// long outputs that merely quote a marker).
func TestClassifyPoisonedOutput(t *testing.T) {
	// capMarker is padded out to build outputs right at the length cap.
	const capMarker = "I reached the iteration limit. "

	cases := []struct {
		name       string
		output     string
		wantOK     bool
		wantReason string
	}{
		{
			name:       "iteration limit canonical",
			output:     "I reached the iteration limit and couldn't generate a summary.",
			wantOK:     true,
			wantReason: FailureReasonIterationLimit,
		},
		{
			name:       "iteration limit case insensitive",
			output:     "I REACHED THE ITERATION LIMIT and stopped",
			wantOK:     true,
			wantReason: FailureReasonIterationLimit,
		},
		{
			name:       "fallback meta message",
			output:     "Put your final update inside the content string. Keep it concise.",
			wantOK:     true,
			wantReason: FailureReasonAgentFallbackMsg,
		},
		{
			name:   "real conclusion is not poisoned",
			output: "Fixed the bug in auth.go and pushed PR #42.",
			wantOK: false,
		},
		{
			name:   "empty output",
			output: "",
			wantOK: false,
		},
		{
			name:   "whitespace-only output",
			output: " \n\t ",
			wantOK: false,
		},
		{
			// The matcher trims before matching; leading/trailing
			// whitespace around a fallback must not hide it.
			name:       "surrounding whitespace is trimmed before matching",
			output:     "\n\t  I reached the iteration limit and couldn't generate a summary.  \n",
			wantOK:     true,
			wantReason: FailureReasonIterationLimit,
		},
		{
			name:   "mentions iteration but not the marker",
			output: "Each iteration of the loop processes one record.",
			wantOK: false,
		},
		{
			// Boundary: the cap is inclusive, so an output of exactly
			// poisonedOutputMaxLen bytes is still a candidate.
			name:       "output exactly at the length cap is classified",
			output:     capMarker + strings.Repeat("x", poisonedOutputMaxLen-len(capMarker)),
			wantOK:     true,
			wantReason: FailureReasonIterationLimit,
		},
		{
			name:   "output one byte over the length cap is not classified",
			output: capMarker + strings.Repeat("x", poisonedOutputMaxLen-len(capMarker)+1),
			wantOK: false,
		},
		{
			// Regression guard for the GPT-Boy review on MUL-1630:
			// a real review/analysis that quotes both markers must not
			// be misclassified. Without the length cap, this entire
			// PR's review thread would tank as a poisoned failure.
			name: "long review quoting both markers is not poisoned",
			output: `Review for the rerun fix.
Detection markers under consideration:
- "I reached the iteration limit and couldn't generate a summary."
- "Put your final update inside the content string. Keep it concise."
The implementation looks correct: the daemon classifies these as
fallback output, persists a dedicated failure_reason, and the SQL
filter excludes them from the resume lookup. Auto-retry of mid-flight
orphans still keeps the resume contract because CreateRetryTask does
not set force_fresh_session. Approving with a follow-up note about
the matcher being too permissive on long outputs.`,
			wantOK: false,
		},
		{
			name:   "marker buried inside a long agent conclusion",
			output: strings.Repeat("All checks passed and the bug is fixed. ", 10) + "i reached the iteration limit while debugging earlier.",
			wantOK: false,
		},
	}
	for _, tc := range cases {
		t.Run(tc.name, func(t *testing.T) {
			reason, ok := classifyPoisonedOutput(tc.output)
			if ok != tc.wantOK {
				t.Fatalf("classifyPoisonedOutput(%q) ok=%v, want %v", tc.output, ok, tc.wantOK)
			}
			// The reason is only meaningful on a positive match.
			if ok && reason != tc.wantReason {
				t.Fatalf("classifyPoisonedOutput(%q) reason=%q, want %q", tc.output, reason, tc.wantReason)
			}
		})
	}
}

View File

@@ -105,6 +105,6 @@ type TaskResult struct {
SessionID string `json:"session_id,omitempty"` // Claude session ID for future resumption
WorkDir string `json:"work_dir,omitempty"` // working directory used during execution
EnvRoot string `json:"-"` // env root dir for writing GC metadata (not sent to server)
FailureReason string `json:"-"` // internal server failure classification
FailureReason string `json:"-"` // classifier forwarded to FailTask on the blocked path; empty falls back to 'agent_error'
Usage []TaskUsageEntry `json:"usage,omitempty"` // per-model token usage
}

View File

@@ -944,13 +944,20 @@ func (h *Handler) ClaimTaskByRuntime(w http.ResponseWriter, r *http.Request) {
// Look up the prior session for this (agent, issue) pair so the daemon
// can resume the Claude Code conversation context.
if prior, err := h.Queries.GetLastTaskSession(r.Context(), db.GetLastTaskSessionParams{
AgentID: task.AgentID,
IssueID: task.IssueID,
}); err == nil && prior.SessionID.Valid {
resp.PriorSessionID = prior.SessionID.String
if prior.WorkDir.Valid {
resp.PriorWorkDir = prior.WorkDir.String
//
// Skip the lookup when the task was flagged as a manual rerun: the
// user just judged the prior output bad, so the daemon must start a
// fresh agent session instead of resuming the same conversation that
// produced that output.
if !task.ForceFreshSession {
if prior, err := h.Queries.GetLastTaskSession(r.Context(), db.GetLastTaskSessionParams{
AgentID: task.AgentID,
IssueID: task.IssueID,
}); err == nil && prior.SessionID.Valid {
resp.PriorSessionID = prior.SessionID.String
if prior.WorkDir.Valid {
resp.PriorWorkDir = prior.WorkDir.String
}
}
}
}

View File

@@ -96,6 +96,19 @@ func NewTaskService(q *db.Queries, tx TxStarter, hub *realtime.Hub, bus *events.
// No context snapshot is stored — the agent fetches all data it needs at
// runtime via the multica CLI.
func (s *TaskService) EnqueueTaskForIssue(ctx context.Context, issue db.Issue, triggerCommentID ...pgtype.UUID) (db.AgentTaskQueue, error) {
	// The trigger comment is optional; only the first variadic value is
	// used, and the zero UUID is passed on when none is given.
	var trigger pgtype.UUID
	if len(triggerCommentID) != 0 {
		trigger = triggerCommentID[0]
	}

	// The regular enqueue path never forces a fresh session.
	const forceFreshSession = false
	return s.enqueueIssueTask(ctx, issue, trigger, forceFreshSession)
}
// enqueueIssueTask is the shared implementation behind EnqueueTaskForIssue
// and the manual rerun path. forceFreshSession=true marks the task so the
// daemon claim handler skips the (agent_id, issue_id) resume lookup — the
// user already judged the prior output bad, a fresh agent session is the
// expected behavior.
func (s *TaskService) enqueueIssueTask(ctx context.Context, issue db.Issue, triggerCommentID pgtype.UUID, forceFreshSession bool) (db.AgentTaskQueue, error) {
if !issue.AssigneeID.Valid {
slog.Error("task enqueue failed", "issue_id", util.UUIDToString(issue.ID), "error", "issue has no assignee")
return db.AgentTaskQueue{}, fmt.Errorf("issue has no assignee")
@@ -115,25 +128,26 @@ func (s *TaskService) EnqueueTaskForIssue(ctx context.Context, issue db.Issue, t
return db.AgentTaskQueue{}, fmt.Errorf("agent has no runtime")
}
var commentID pgtype.UUID
if len(triggerCommentID) > 0 {
commentID = triggerCommentID[0]
}
task, err := s.Queries.CreateAgentTask(ctx, db.CreateAgentTaskParams{
AgentID: issue.AssigneeID,
RuntimeID: agent.RuntimeID,
IssueID: issue.ID,
Priority: priorityToInt(issue.Priority),
TriggerCommentID: commentID,
TriggerSummary: s.buildCommentTriggerSummary(ctx, commentID),
AgentID: issue.AssigneeID,
RuntimeID: agent.RuntimeID,
IssueID: issue.ID,
Priority: priorityToInt(issue.Priority),
TriggerCommentID: triggerCommentID,
TriggerSummary: s.buildCommentTriggerSummary(ctx, triggerCommentID),
ForceFreshSession: pgtype.Bool{Bool: forceFreshSession, Valid: forceFreshSession},
})
if err != nil {
slog.Error("task enqueue failed", "issue_id", util.UUIDToString(issue.ID), "error", err)
return db.AgentTaskQueue{}, fmt.Errorf("create task: %w", err)
}
slog.Info("task enqueued", "task_id", util.UUIDToString(task.ID), "issue_id", util.UUIDToString(issue.ID), "agent_id", util.UUIDToString(issue.AssigneeID))
slog.Info("task enqueued",
"task_id", util.UUIDToString(task.ID),
"issue_id", util.UUIDToString(issue.ID),
"agent_id", util.UUIDToString(issue.AssigneeID),
"force_fresh_session", forceFreshSession,
)
// Order matters: broadcast first, notify daemon second. notifyTaskAvailable
// kicks an in-process channel that the daemon picks up over HTTP and
// claims; the claim path then emits its own task:dispatch. Doing the
@@ -874,9 +888,15 @@ func (s *TaskService) MaybeRetryFailedTask(ctx context.Context, parent db.AgentT
}
// RerunIssue creates a fresh queued task for the agent currently assigned
// to the issue. Used by the manual rerun endpoint. Carries the most recent
// session_id/work_dir on the issue (across any status) so the new run
// resumes from where the prior one left off when the backend supports it.
// to the issue. Used by the manual rerun endpoint.
//
// The new task is flagged force_fresh_session=true so the daemon starts a
// clean agent session instead of resuming the prior (agent_id, issue_id)
// session. A user clicking rerun has just judged the prior output bad —
// resuming the same conversation would replay the same poisoned state.
// Auto-retry of an orphaned mid-flight failure (HandleFailedTasks →
// MaybeRetryFailedTask → CreateRetryTask) does NOT take this path, so
// MUL-1128's mid-flight resume contract is preserved.
//
// Only tasks belonging to the issue's current assignee are cancelled.
// Tasks owned by other agents on the same issue (e.g. a parallel
@@ -909,7 +929,7 @@ func (s *TaskService) RerunIssue(ctx context.Context, issueID pgtype.UUID, trigg
s.broadcastTaskEvent(ctx, protocol.EventTaskCancelled, t)
}
task, err := s.EnqueueTaskForIssue(ctx, issue, triggerCommentID)
task, err := s.enqueueIssueTask(ctx, issue, triggerCommentID, true)
if err != nil {
return nil, err
}

View File

@@ -0,0 +1 @@
-- Rollback for the force_fresh_session column on agent_task_queue.
-- IF EXISTS makes the rollback a no-op when the column is already gone,
-- so it can be re-run safely.
ALTER TABLE agent_task_queue DROP COLUMN IF EXISTS force_fresh_session;

View File

@@ -0,0 +1,8 @@
-- Per-task signal that the manual rerun flow uses to short-circuit the
-- (agent_id, issue_id) session resume lookup. Set when a user clicks
-- rerun: the user just judged the prior output bad, so the daemon must
-- start a fresh agent session instead of resuming the same conversation
-- that produced the bad result. Auto-retry of an orphaned mid-flight
-- failure leaves this FALSE so MUL-1128's resume contract is preserved.
--
-- IF NOT EXISTS keeps this safe on databases that already applied the
-- same change under the migration's earlier file name: the column is
-- left as-is instead of the migration failing on a duplicate column.
ALTER TABLE agent_task_queue
    ADD COLUMN IF NOT EXISTS force_fresh_session BOOLEAN NOT NULL DEFAULT FALSE;

View File

@@ -55,7 +55,7 @@ const cancelAgentTask = `-- name: CancelAgentTask :one
UPDATE agent_task_queue
SET status = 'cancelled', completed_at = now()
WHERE id = $1 AND status IN ('queued', 'dispatched', 'running')
RETURNING id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, last_heartbeat_at, trigger_summary
RETURNING id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, last_heartbeat_at, trigger_summary, force_fresh_session
`
func (q *Queries) CancelAgentTask(ctx context.Context, id pgtype.UUID) (AgentTaskQueue, error) {
@@ -86,6 +86,7 @@ func (q *Queries) CancelAgentTask(ctx context.Context, id pgtype.UUID) (AgentTas
&i.FailureReason,
&i.LastHeartbeatAt,
&i.TriggerSummary,
&i.ForceFreshSession,
)
return i, err
}
@@ -94,7 +95,7 @@ const cancelAgentTasksByAgent = `-- name: CancelAgentTasksByAgent :many
UPDATE agent_task_queue
SET status = 'cancelled', completed_at = now()
WHERE agent_id = $1 AND status IN ('queued', 'dispatched', 'running')
RETURNING id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, last_heartbeat_at, trigger_summary
RETURNING id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, last_heartbeat_at, trigger_summary, force_fresh_session
`
// Bulk-cancel every active (queued/dispatched/running) task for an agent.
@@ -136,6 +137,7 @@ func (q *Queries) CancelAgentTasksByAgent(ctx context.Context, agentID pgtype.UU
&i.FailureReason,
&i.LastHeartbeatAt,
&i.TriggerSummary,
&i.ForceFreshSession,
); err != nil {
return nil, err
}
@@ -151,7 +153,7 @@ const cancelAgentTasksByIssue = `-- name: CancelAgentTasksByIssue :many
UPDATE agent_task_queue
SET status = 'cancelled', completed_at = now()
WHERE issue_id = $1 AND status IN ('queued', 'dispatched', 'running')
RETURNING id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, last_heartbeat_at, trigger_summary
RETURNING id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, last_heartbeat_at, trigger_summary, force_fresh_session
`
// Cancels every active task on the issue and returns the affected rows so the
@@ -193,6 +195,7 @@ func (q *Queries) CancelAgentTasksByIssue(ctx context.Context, issueID pgtype.UU
&i.FailureReason,
&i.LastHeartbeatAt,
&i.TriggerSummary,
&i.ForceFreshSession,
); err != nil {
return nil, err
}
@@ -208,7 +211,7 @@ const cancelAgentTasksByIssueAndAgent = `-- name: CancelAgentTasksByIssueAndAgen
UPDATE agent_task_queue
SET status = 'cancelled', completed_at = now()
WHERE issue_id = $1 AND agent_id = $2 AND status IN ('queued', 'dispatched', 'running')
RETURNING id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, last_heartbeat_at, trigger_summary
RETURNING id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, last_heartbeat_at, trigger_summary, force_fresh_session
`
type CancelAgentTasksByIssueAndAgentParams struct {
@@ -254,6 +257,7 @@ func (q *Queries) CancelAgentTasksByIssueAndAgent(ctx context.Context, arg Cance
&i.FailureReason,
&i.LastHeartbeatAt,
&i.TriggerSummary,
&i.ForceFreshSession,
); err != nil {
return nil, err
}
@@ -269,7 +273,7 @@ const cancelAgentTasksByTriggerComment = `-- name: CancelAgentTasksByTriggerComm
UPDATE agent_task_queue
SET status = 'cancelled', completed_at = now()
WHERE trigger_comment_id = $1 AND status IN ('queued', 'dispatched', 'running')
RETURNING id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, last_heartbeat_at, trigger_summary
RETURNING id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, last_heartbeat_at, trigger_summary, force_fresh_session
`
// Cancels active tasks whose trigger is the given comment. Called when a
@@ -311,6 +315,7 @@ func (q *Queries) CancelAgentTasksByTriggerComment(ctx context.Context, triggerC
&i.FailureReason,
&i.LastHeartbeatAt,
&i.TriggerSummary,
&i.ForceFreshSession,
); err != nil {
return nil, err
}
@@ -349,7 +354,7 @@ WHERE id = (
LIMIT 1
FOR UPDATE SKIP LOCKED
)
RETURNING id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, last_heartbeat_at, trigger_summary
RETURNING id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, last_heartbeat_at, trigger_summary, force_fresh_session
`
// Claims the next queued task for an agent, enforcing per-(issue, agent) serialization:
@@ -389,6 +394,7 @@ func (q *Queries) ClaimAgentTask(ctx context.Context, agentID pgtype.UUID) (Agen
&i.FailureReason,
&i.LastHeartbeatAt,
&i.TriggerSummary,
&i.ForceFreshSession,
)
return i, err
}
@@ -432,7 +438,7 @@ const completeAgentTask = `-- name: CompleteAgentTask :one
UPDATE agent_task_queue
SET status = 'completed', completed_at = now(), result = $2, session_id = $3, work_dir = $4
WHERE id = $1 AND status = 'running'
RETURNING id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, last_heartbeat_at, trigger_summary
RETURNING id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, last_heartbeat_at, trigger_summary, force_fresh_session
`
type CompleteAgentTaskParams struct {
@@ -475,6 +481,7 @@ func (q *Queries) CompleteAgentTask(ctx context.Context, arg CompleteAgentTaskPa
&i.FailureReason,
&i.LastHeartbeatAt,
&i.TriggerSummary,
&i.ForceFreshSession,
)
return i, err
}
@@ -564,18 +571,26 @@ func (q *Queries) CreateAgent(ctx context.Context, arg CreateAgentParams) (Agent
}
const createAgentTask = `-- name: CreateAgentTask :one
INSERT INTO agent_task_queue (agent_id, runtime_id, issue_id, status, priority, trigger_comment_id, trigger_summary)
VALUES ($1, $2, $3, 'queued', $4, $5, $6)
RETURNING id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, last_heartbeat_at, trigger_summary
INSERT INTO agent_task_queue (
agent_id, runtime_id, issue_id, status, priority, trigger_comment_id,
trigger_summary, force_fresh_session
)
VALUES (
$1, $2, $3, 'queued', $4, $5,
$6,
COALESCE($7::boolean, FALSE)
)
RETURNING id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, last_heartbeat_at, trigger_summary, force_fresh_session
`
type CreateAgentTaskParams struct {
AgentID pgtype.UUID `json:"agent_id"`
RuntimeID pgtype.UUID `json:"runtime_id"`
IssueID pgtype.UUID `json:"issue_id"`
Priority int32 `json:"priority"`
TriggerCommentID pgtype.UUID `json:"trigger_comment_id"`
TriggerSummary pgtype.Text `json:"trigger_summary"`
AgentID pgtype.UUID `json:"agent_id"`
RuntimeID pgtype.UUID `json:"runtime_id"`
IssueID pgtype.UUID `json:"issue_id"`
Priority int32 `json:"priority"`
TriggerCommentID pgtype.UUID `json:"trigger_comment_id"`
TriggerSummary pgtype.Text `json:"trigger_summary"`
ForceFreshSession pgtype.Bool `json:"force_fresh_session"`
}
func (q *Queries) CreateAgentTask(ctx context.Context, arg CreateAgentTaskParams) (AgentTaskQueue, error) {
@@ -586,6 +601,7 @@ func (q *Queries) CreateAgentTask(ctx context.Context, arg CreateAgentTaskParams
arg.Priority,
arg.TriggerCommentID,
arg.TriggerSummary,
arg.ForceFreshSession,
)
var i AgentTaskQueue
err := row.Scan(
@@ -613,6 +629,7 @@ func (q *Queries) CreateAgentTask(ctx context.Context, arg CreateAgentTaskParams
&i.FailureReason,
&i.LastHeartbeatAt,
&i.TriggerSummary,
&i.ForceFreshSession,
)
return i, err
}
@@ -620,7 +637,7 @@ func (q *Queries) CreateAgentTask(ctx context.Context, arg CreateAgentTaskParams
const createQuickCreateTask = `-- name: CreateQuickCreateTask :one
INSERT INTO agent_task_queue (agent_id, runtime_id, issue_id, status, priority, context)
VALUES ($1, $2, NULL, 'queued', $3, $4)
RETURNING id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, last_heartbeat_at, trigger_summary
RETURNING id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, last_heartbeat_at, trigger_summary, force_fresh_session
`
type CreateQuickCreateTaskParams struct {
@@ -666,6 +683,7 @@ func (q *Queries) CreateQuickCreateTask(ctx context.Context, arg CreateQuickCrea
&i.FailureReason,
&i.LastHeartbeatAt,
&i.TriggerSummary,
&i.ForceFreshSession,
)
return i, err
}
@@ -684,7 +702,7 @@ SELECT
p.attempt + 1, p.max_attempts, p.id
FROM agent_task_queue p
WHERE p.id = $1
RETURNING id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, last_heartbeat_at, trigger_summary
RETURNING id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, last_heartbeat_at, trigger_summary, force_fresh_session
`
// Clones a parent task into a fresh queued attempt. Carries forward the
@@ -719,6 +737,7 @@ func (q *Queries) CreateRetryTask(ctx context.Context, id pgtype.UUID) (AgentTas
&i.FailureReason,
&i.LastHeartbeatAt,
&i.TriggerSummary,
&i.ForceFreshSession,
)
return i, err
}
@@ -732,7 +751,7 @@ SET status = 'failed',
session_id = COALESCE($4, session_id),
work_dir = COALESCE($5, work_dir)
WHERE id = $1 AND status IN ('dispatched', 'running')
RETURNING id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, last_heartbeat_at, trigger_summary
RETURNING id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, last_heartbeat_at, trigger_summary, force_fresh_session
`
type FailAgentTaskParams struct {
@@ -786,6 +805,7 @@ func (q *Queries) FailAgentTask(ctx context.Context, arg FailAgentTaskParams) (A
&i.FailureReason,
&i.LastHeartbeatAt,
&i.TriggerSummary,
&i.ForceFreshSession,
)
return i, err
}
@@ -796,7 +816,7 @@ SET status = 'failed', completed_at = now(), error = 'task timed out',
failure_reason = 'timeout'
WHERE (status = 'dispatched' AND dispatched_at < now() - make_interval(secs => $1::double precision))
OR (status = 'running' AND started_at < now() - make_interval(secs => $2::double precision))
RETURNING id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, last_heartbeat_at, trigger_summary
RETURNING id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, last_heartbeat_at, trigger_summary, force_fresh_session
`
type FailStaleTasksParams struct {
@@ -841,6 +861,7 @@ func (q *Queries) FailStaleTasks(ctx context.Context, arg FailStaleTasksParams)
&i.FailureReason,
&i.LastHeartbeatAt,
&i.TriggerSummary,
&i.ForceFreshSession,
); err != nil {
return nil, err
}
@@ -926,7 +947,7 @@ func (q *Queries) GetAgentInWorkspace(ctx context.Context, arg GetAgentInWorkspa
}
const getAgentTask = `-- name: GetAgentTask :one
SELECT id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, last_heartbeat_at, trigger_summary FROM agent_task_queue
SELECT id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, last_heartbeat_at, trigger_summary, force_fresh_session FROM agent_task_queue
WHERE id = $1
`
@@ -958,6 +979,7 @@ func (q *Queries) GetAgentTask(ctx context.Context, id pgtype.UUID) (AgentTaskQu
&i.FailureReason,
&i.LastHeartbeatAt,
&i.TriggerSummary,
&i.ForceFreshSession,
)
return i, err
}
@@ -965,7 +987,10 @@ func (q *Queries) GetAgentTask(ctx context.Context, id pgtype.UUID) (AgentTaskQu
const getLastTaskSession = `-- name: GetLastTaskSession :one
SELECT session_id, work_dir FROM agent_task_queue
WHERE agent_id = $1 AND issue_id = $2
AND status IN ('completed', 'failed')
AND (
status = 'completed'
OR (status = 'failed' AND COALESCE(failure_reason, '') NOT IN ('iteration_limit', 'agent_fallback_message'))
)
AND session_id IS NOT NULL
ORDER BY COALESCE(completed_at, started_at, dispatched_at, created_at) DESC
LIMIT 1
@@ -989,6 +1014,11 @@ type GetLastTaskSessionRow struct {
// UpdateAgentTaskSession. Without this, an auto-retry / manual rerun of a
// mid-run failure would silently start a fresh conversation and lose the
// in-flight context — exactly what MUL-1128's B branch is meant to fix.
//
// Tasks that ended in a known "poisoned" terminal state are excluded so
// a rerun does not inherit the bad session. The daemon classifies these
// failures (iteration_limit, agent_fallback_message) when it detects the
// agent emitted a fallback marker instead of a real result.
func (q *Queries) GetLastTaskSession(ctx context.Context, arg GetLastTaskSessionParams) (GetLastTaskSessionRow, error) {
row := q.db.QueryRow(ctx, getLastTaskSession, arg.AgentID, arg.IssueID)
var i GetLastTaskSessionRow
@@ -1167,7 +1197,7 @@ func (q *Queries) LinkTaskToIssue(ctx context.Context, arg LinkTaskToIssueParams
}
const listActiveTasksByIssue = `-- name: ListActiveTasksByIssue :many
SELECT id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, last_heartbeat_at, trigger_summary FROM agent_task_queue
SELECT id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, last_heartbeat_at, trigger_summary, force_fresh_session FROM agent_task_queue
WHERE issue_id = $1 AND status IN ('dispatched', 'running')
ORDER BY created_at DESC
`
@@ -1206,6 +1236,7 @@ func (q *Queries) ListActiveTasksByIssue(ctx context.Context, issueID pgtype.UUI
&i.FailureReason,
&i.LastHeartbeatAt,
&i.TriggerSummary,
&i.ForceFreshSession,
); err != nil {
return nil, err
}
@@ -1218,7 +1249,7 @@ func (q *Queries) ListActiveTasksByIssue(ctx context.Context, issueID pgtype.UUI
}
const listAgentTasks = `-- name: ListAgentTasks :many
SELECT id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, last_heartbeat_at, trigger_summary FROM agent_task_queue
SELECT id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, last_heartbeat_at, trigger_summary, force_fresh_session FROM agent_task_queue
WHERE agent_id = $1
ORDER BY created_at DESC
`
@@ -1257,6 +1288,7 @@ func (q *Queries) ListAgentTasks(ctx context.Context, agentID pgtype.UUID) ([]Ag
&i.FailureReason,
&i.LastHeartbeatAt,
&i.TriggerSummary,
&i.ForceFreshSession,
); err != nil {
return nil, err
}
@@ -1365,7 +1397,7 @@ func (q *Queries) ListAllAgents(ctx context.Context, workspaceID pgtype.UUID) ([
}
const listPendingTasksByRuntime = `-- name: ListPendingTasksByRuntime :many
SELECT id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, last_heartbeat_at, trigger_summary FROM agent_task_queue
SELECT id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, last_heartbeat_at, trigger_summary, force_fresh_session FROM agent_task_queue
WHERE runtime_id = $1 AND status IN ('queued', 'dispatched')
ORDER BY priority DESC, created_at ASC
`
@@ -1404,6 +1436,7 @@ func (q *Queries) ListPendingTasksByRuntime(ctx context.Context, runtimeID pgtyp
&i.FailureReason,
&i.LastHeartbeatAt,
&i.TriggerSummary,
&i.ForceFreshSession,
); err != nil {
return nil, err
}
@@ -1416,7 +1449,7 @@ func (q *Queries) ListPendingTasksByRuntime(ctx context.Context, runtimeID pgtyp
}
const listTasksByIssue = `-- name: ListTasksByIssue :many
SELECT id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, last_heartbeat_at, trigger_summary FROM agent_task_queue
SELECT id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, last_heartbeat_at, trigger_summary, force_fresh_session FROM agent_task_queue
WHERE issue_id = $1
ORDER BY created_at DESC
`
@@ -1455,6 +1488,7 @@ func (q *Queries) ListTasksByIssue(ctx context.Context, issueID pgtype.UUID) ([]
&i.FailureReason,
&i.LastHeartbeatAt,
&i.TriggerSummary,
&i.ForceFreshSession,
); err != nil {
return nil, err
}
@@ -1467,15 +1501,15 @@ func (q *Queries) ListTasksByIssue(ctx context.Context, issueID pgtype.UUID) ([]
}
const listWorkspaceAgentTaskSnapshot = `-- name: ListWorkspaceAgentTaskSnapshot :many
SELECT atq.id, atq.agent_id, atq.issue_id, atq.status, atq.priority, atq.dispatched_at, atq.started_at, atq.completed_at, atq.result, atq.error, atq.created_at, atq.context, atq.runtime_id, atq.session_id, atq.work_dir, atq.trigger_comment_id, atq.chat_session_id, atq.autopilot_run_id, atq.attempt, atq.max_attempts, atq.parent_task_id, atq.failure_reason, atq.last_heartbeat_at, atq.trigger_summary FROM agent_task_queue atq
SELECT atq.id, atq.agent_id, atq.issue_id, atq.status, atq.priority, atq.dispatched_at, atq.started_at, atq.completed_at, atq.result, atq.error, atq.created_at, atq.context, atq.runtime_id, atq.session_id, atq.work_dir, atq.trigger_comment_id, atq.chat_session_id, atq.autopilot_run_id, atq.attempt, atq.max_attempts, atq.parent_task_id, atq.failure_reason, atq.last_heartbeat_at, atq.trigger_summary, atq.force_fresh_session FROM agent_task_queue atq
JOIN agent a ON a.id = atq.agent_id
WHERE a.workspace_id = $1
AND atq.status IN ('queued', 'dispatched', 'running')
UNION ALL
SELECT t.id, t.agent_id, t.issue_id, t.status, t.priority, t.dispatched_at, t.started_at, t.completed_at, t.result, t.error, t.created_at, t.context, t.runtime_id, t.session_id, t.work_dir, t.trigger_comment_id, t.chat_session_id, t.autopilot_run_id, t.attempt, t.max_attempts, t.parent_task_id, t.failure_reason, t.last_heartbeat_at, t.trigger_summary FROM (
SELECT DISTINCT ON (atq.agent_id) atq.id, atq.agent_id, atq.issue_id, atq.status, atq.priority, atq.dispatched_at, atq.started_at, atq.completed_at, atq.result, atq.error, atq.created_at, atq.context, atq.runtime_id, atq.session_id, atq.work_dir, atq.trigger_comment_id, atq.chat_session_id, atq.autopilot_run_id, atq.attempt, atq.max_attempts, atq.parent_task_id, atq.failure_reason, atq.last_heartbeat_at, atq.trigger_summary
SELECT t.id, t.agent_id, t.issue_id, t.status, t.priority, t.dispatched_at, t.started_at, t.completed_at, t.result, t.error, t.created_at, t.context, t.runtime_id, t.session_id, t.work_dir, t.trigger_comment_id, t.chat_session_id, t.autopilot_run_id, t.attempt, t.max_attempts, t.parent_task_id, t.failure_reason, t.last_heartbeat_at, t.trigger_summary, t.force_fresh_session FROM (
SELECT DISTINCT ON (atq.agent_id) atq.id, atq.agent_id, atq.issue_id, atq.status, atq.priority, atq.dispatched_at, atq.started_at, atq.completed_at, atq.result, atq.error, atq.created_at, atq.context, atq.runtime_id, atq.session_id, atq.work_dir, atq.trigger_comment_id, atq.chat_session_id, atq.autopilot_run_id, atq.attempt, atq.max_attempts, atq.parent_task_id, atq.failure_reason, atq.last_heartbeat_at, atq.trigger_summary, atq.force_fresh_session
FROM agent_task_queue atq
JOIN agent a ON a.id = atq.agent_id
WHERE a.workspace_id = $1
@@ -1536,6 +1570,7 @@ func (q *Queries) ListWorkspaceAgentTaskSnapshot(ctx context.Context, workspaceI
&i.FailureReason,
&i.LastHeartbeatAt,
&i.TriggerSummary,
&i.ForceFreshSession,
); err != nil {
return nil, err
}
@@ -1554,7 +1589,7 @@ SET status = 'failed',
error = 'daemon restarted while task was in flight',
failure_reason = 'runtime_recovery'
WHERE runtime_id = $1 AND status IN ('dispatched', 'running')
RETURNING id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, last_heartbeat_at, trigger_summary
RETURNING id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, last_heartbeat_at, trigger_summary, force_fresh_session
`
// Called by the daemon at startup. Atomically fails any dispatched/running
@@ -1595,6 +1630,7 @@ func (q *Queries) RecoverOrphanedTasksForRuntime(ctx context.Context, runtimeID
&i.FailureReason,
&i.LastHeartbeatAt,
&i.TriggerSummary,
&i.ForceFreshSession,
); err != nil {
return nil, err
}
@@ -1685,7 +1721,7 @@ const startAgentTask = `-- name: StartAgentTask :one
UPDATE agent_task_queue
SET status = 'running', started_at = now()
WHERE id = $1 AND status = 'dispatched'
RETURNING id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, last_heartbeat_at, trigger_summary
RETURNING id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, last_heartbeat_at, trigger_summary, force_fresh_session
`
func (q *Queries) StartAgentTask(ctx context.Context, id pgtype.UUID) (AgentTaskQueue, error) {
@@ -1716,6 +1752,7 @@ func (q *Queries) StartAgentTask(ctx context.Context, id pgtype.UUID) (AgentTask
&i.FailureReason,
&i.LastHeartbeatAt,
&i.TriggerSummary,
&i.ForceFreshSession,
)
return i, err
}

View File

@@ -204,7 +204,7 @@ const createAutopilotTask = `-- name: CreateAutopilotTask :one
INSERT INTO agent_task_queue (agent_id, runtime_id, issue_id, status, priority, autopilot_run_id, trigger_summary)
VALUES ($1, $2, NULL, 'queued', $3, $4, $5)
RETURNING id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, last_heartbeat_at, trigger_summary
RETURNING id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, last_heartbeat_at, trigger_summary, force_fresh_session
`
type CreateAutopilotTaskParams struct {
@@ -252,6 +252,7 @@ func (q *Queries) CreateAutopilotTask(ctx context.Context, arg CreateAutopilotTa
&i.FailureReason,
&i.LastHeartbeatAt,
&i.TriggerSummary,
&i.ForceFreshSession,
)
return i, err
}

View File

@@ -99,7 +99,7 @@ func (q *Queries) CreateChatSession(ctx context.Context, arg CreateChatSessionPa
const createChatTask = `-- name: CreateChatTask :one
INSERT INTO agent_task_queue (agent_id, runtime_id, issue_id, status, priority, chat_session_id)
VALUES ($1, $2, NULL, 'queued', $3, $4)
RETURNING id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, last_heartbeat_at, trigger_summary
RETURNING id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, last_heartbeat_at, trigger_summary, force_fresh_session
`
type CreateChatTaskParams struct {
@@ -142,6 +142,7 @@ func (q *Queries) CreateChatTask(ctx context.Context, arg CreateChatTaskParams)
&i.FailureReason,
&i.LastHeartbeatAt,
&i.TriggerSummary,
&i.ForceFreshSession,
)
return i, err
}

View File

@@ -67,30 +67,31 @@ type AgentSkill struct {
}
// AgentTaskQueue holds the columns of one agent_task_queue row. The fields
// are declared in the same order as the column lists that the task queries
// select or return (id, agent_id, issue_id, ..., trigger_summary,
// force_fresh_session).
type AgentTaskQueue struct {
ID pgtype.UUID `json:"id"`
AgentID pgtype.UUID `json:"agent_id"`
IssueID pgtype.UUID `json:"issue_id"`
Status string `json:"status"`
Priority int32 `json:"priority"`
DispatchedAt pgtype.Timestamptz `json:"dispatched_at"`
StartedAt pgtype.Timestamptz `json:"started_at"`
CompletedAt pgtype.Timestamptz `json:"completed_at"`
Result []byte `json:"result"`
Error pgtype.Text `json:"error"`
CreatedAt pgtype.Timestamptz `json:"created_at"`
Context []byte `json:"context"`
RuntimeID pgtype.UUID `json:"runtime_id"`
SessionID pgtype.Text `json:"session_id"`
WorkDir pgtype.Text `json:"work_dir"`
TriggerCommentID pgtype.UUID `json:"trigger_comment_id"`
ChatSessionID pgtype.UUID `json:"chat_session_id"`
AutopilotRunID pgtype.UUID `json:"autopilot_run_id"`
Attempt int32 `json:"attempt"`
MaxAttempts int32 `json:"max_attempts"`
ParentTaskID pgtype.UUID `json:"parent_task_id"`
FailureReason pgtype.Text `json:"failure_reason"`
LastHeartbeatAt pgtype.Timestamptz `json:"last_heartbeat_at"`
TriggerSummary pgtype.Text `json:"trigger_summary"`
ID pgtype.UUID `json:"id"`
AgentID pgtype.UUID `json:"agent_id"`
IssueID pgtype.UUID `json:"issue_id"`
Status string `json:"status"`
Priority int32 `json:"priority"`
DispatchedAt pgtype.Timestamptz `json:"dispatched_at"`
StartedAt pgtype.Timestamptz `json:"started_at"`
CompletedAt pgtype.Timestamptz `json:"completed_at"`
Result []byte `json:"result"`
Error pgtype.Text `json:"error"`
CreatedAt pgtype.Timestamptz `json:"created_at"`
Context []byte `json:"context"`
RuntimeID pgtype.UUID `json:"runtime_id"`
SessionID pgtype.Text `json:"session_id"`
WorkDir pgtype.Text `json:"work_dir"`
TriggerCommentID pgtype.UUID `json:"trigger_comment_id"`
ChatSessionID pgtype.UUID `json:"chat_session_id"`
AutopilotRunID pgtype.UUID `json:"autopilot_run_id"`
Attempt int32 `json:"attempt"`
MaxAttempts int32 `json:"max_attempts"`
ParentTaskID pgtype.UUID `json:"parent_task_id"`
FailureReason pgtype.Text `json:"failure_reason"`
LastHeartbeatAt pgtype.Timestamptz `json:"last_heartbeat_at"`
TriggerSummary pgtype.Text `json:"trigger_summary"`
// ForceFreshSession mirrors the force_fresh_session column. It is a plain
// bool here, whereas CreateAgentTaskParams accepts a nullable pgtype.Bool
// that the INSERT coalesces to FALSE. Per the change description, a manual
// rerun sets it so the daemon skips the prior-session resume lookup.
ForceFreshSession bool `json:"force_fresh_session"`
}
type Attachment struct {

View File

@@ -84,7 +84,7 @@ WHERE status IN ('dispatched', 'running')
AND runtime_id IN (
SELECT id FROM agent_runtime WHERE status = 'offline'
)
RETURNING id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, last_heartbeat_at, trigger_summary
RETURNING id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, last_heartbeat_at, trigger_summary, force_fresh_session
`
// Marks dispatched/running tasks as failed when their runtime is offline.
@@ -123,6 +123,7 @@ func (q *Queries) FailTasksForOfflineRuntimes(ctx context.Context) ([]AgentTaskQ
&i.FailureReason,
&i.LastHeartbeatAt,
&i.TriggerSummary,
&i.ForceFreshSession,
); err != nil {
return nil, err
}

View File

@@ -65,8 +65,15 @@ WHERE agent_id = $1
ORDER BY created_at DESC;
-- name: CreateAgentTask :one
-- Inserts a task row for the given agent, runtime and issue with status
-- 'queued' and returns the inserted row. trigger_comment_id, trigger_summary
-- and force_fresh_session are nullable arguments (sqlc.narg); a NULL
-- force_fresh_session is stored as FALSE via COALESCE.
INSERT INTO agent_task_queue (agent_id, runtime_id, issue_id, status, priority, trigger_comment_id, trigger_summary)
VALUES ($1, $2, $3, 'queued', $4, sqlc.narg(trigger_comment_id), sqlc.narg(trigger_summary))
INSERT INTO agent_task_queue (
agent_id, runtime_id, issue_id, status, priority, trigger_comment_id,
trigger_summary, force_fresh_session
)
VALUES (
$1, $2, $3, 'queued', $4, sqlc.narg(trigger_comment_id),
sqlc.narg(trigger_summary),
COALESCE(sqlc.narg('force_fresh_session')::boolean, FALSE)
)
RETURNING *;
-- name: CreateQuickCreateTask :one
@@ -213,9 +220,17 @@ RETURNING *;
-- UpdateAgentTaskSession. Without this, an auto-retry / manual rerun of a
-- mid-run failure would silently start a fresh conversation and lose the
-- in-flight context — exactly what MUL-1128's B branch is meant to fix.
--
-- Tasks that ended in a known "poisoned" terminal state are excluded so
-- a rerun does not inherit the bad session. The daemon classifies these
-- failures (iteration_limit, agent_fallback_message) when it detects the
-- agent emitted a fallback marker instead of a real result.
SELECT session_id, work_dir FROM agent_task_queue
WHERE agent_id = $1 AND issue_id = $2
AND status IN ('completed', 'failed')
AND (
status = 'completed'
OR (status = 'failed' AND COALESCE(failure_reason, '') NOT IN ('iteration_limit', 'agent_fallback_message'))
)
AND session_id IS NOT NULL
ORDER BY COALESCE(completed_at, started_at, dispatched_at, created_at) DESC
LIMIT 1;