mirror of
https://github.com/multica-ai/multica.git
synced 2026-07-05 13:29:44 +02:00
fix(task): rerun starts a fresh session, skip poisoned resume (#1928)
* fix(task): rerun starts a fresh session, skip poisoned resume
When a task ended in a known agent fallback ("I reached the iteration
limit and couldn't generate a summary.", "Put your final update inside
the content string. Keep it concise.") the (agent_id, issue_id) resume
lookup would still pick that session, so a manual rerun inherited the
poisoned state and reproduced the same bad output.
Two complementary guards:
1. Daemon classifies poisoned terminal output and routes it through the
blocked path with failure_reason set ('iteration_limit' /
'agent_fallback_message'). GetLastTaskSession excludes failed tasks
with those reasons, so even comment-triggered tasks no longer resume
them. Tasks that failed mid-flight (timeout, runtime_recovery, etc.)
are still resumable, preserving MUL-1128's auto-retry contract.
2. Manual rerun marks the new task force_fresh_session=true. The daemon
claim handler skips the resume lookup entirely when the flag is set,
capturing the user-intent signal that "the prior output was bad" even
when poisoned classification misses a future fallback wording.
Auto-retry of orphaned mid-flight failures (MaybeRetryFailedTask →
CreateRetryTask) does not take this path, so it keeps resuming.
Tests: classifyPoisonedOutput unit test; integration tests assert the
SQL filter excludes poisoned classifiers, RerunIssue flips the flag,
and the normal enqueue path leaves it false.
Co-authored-by: multica-agent <github@multica.ai>
* fix(daemon): cap poisoned-output matcher to short trimmed text
GPT-Boy review on MUL-1630: the previous strings.Contains match would
classify any output that quoted the marker substring — including a
review/analysis that simply discussed the marker itself. Real fallback
messages are short single-sentence affairs, so cap the candidate at
~one paragraph and trim whitespace before matching. Adds regression
tests covering a long quoting review and a marker buried in a long
real conclusion; both must stay classified as completed.
Co-authored-by: multica-agent <github@multica.ai>
* fix(migrations): rename 065 force_fresh_session → 066 to clear collision
main introduced 065_project_resources after this branch was cut, so
both files shared the 065_ prefix. The readiness check
(server/cmd/server/health.go → migrations.LatestVersion) takes the
last entry by lexical order, which is 065_project_resources, leaving
this branch's 065_force_fresh_session unguarded — a deploy that
applied project_resources but not force_fresh_session would still
report ready, and the next enqueue / rerun / claim would crash on
"column force_fresh_session does not exist".
Renaming to 066_force_fresh_session puts it strictly after
project_resources so readiness blocks until it's applied.
Co-authored-by: multica-agent <github@multica.ai>
---------
Co-authored-by: multica-agent <github@multica.ai>
This commit is contained in:
193
server/cmd/server/rerun_session_test.go
Normal file
193
server/cmd/server/rerun_session_test.go
Normal file
@@ -0,0 +1,193 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"context"
|
||||
"testing"
|
||||
|
||||
"github.com/jackc/pgx/v5/pgtype"
|
||||
|
||||
"github.com/multica-ai/multica/server/internal/events"
|
||||
"github.com/multica-ai/multica/server/internal/realtime"
|
||||
"github.com/multica-ai/multica/server/internal/service"
|
||||
db "github.com/multica-ai/multica/server/pkg/db/generated"
|
||||
)
|
||||
|
||||
// setupRerunTestFixture creates an issue assigned to the integration test
|
||||
// agent and returns (issueID, agentID, runtimeID).
|
||||
func setupRerunTestFixture(t *testing.T) (string, string, string) {
|
||||
t.Helper()
|
||||
ctx := context.Background()
|
||||
|
||||
var agentID, runtimeID string
|
||||
if err := testPool.QueryRow(ctx, `
|
||||
SELECT a.id, a.runtime_id FROM agent a
|
||||
JOIN member m ON m.workspace_id = a.workspace_id
|
||||
JOIN "user" u ON u.id = m.user_id
|
||||
WHERE u.email = $1
|
||||
LIMIT 1
|
||||
`, integrationTestEmail).Scan(&agentID, &runtimeID); err != nil {
|
||||
t.Fatalf("failed to find test agent: %v", err)
|
||||
}
|
||||
|
||||
var issueID string
|
||||
if err := testPool.QueryRow(ctx, `
|
||||
INSERT INTO issue (workspace_id, title, status, priority, creator_type, creator_id, assignee_type, assignee_id)
|
||||
SELECT $1, 'Rerun test issue', 'todo', 'none', 'member', m.user_id, 'agent', $2
|
||||
FROM member m WHERE m.workspace_id = $1 LIMIT 1
|
||||
RETURNING id
|
||||
`, testWorkspaceID, agentID).Scan(&issueID); err != nil {
|
||||
t.Fatalf("failed to create test issue: %v", err)
|
||||
}
|
||||
|
||||
return issueID, agentID, runtimeID
|
||||
}
|
||||
|
||||
func cleanupRerunFixture(t *testing.T, issueID string) {
|
||||
t.Helper()
|
||||
ctx := context.Background()
|
||||
testPool.Exec(ctx, `DELETE FROM agent_task_queue WHERE issue_id = $1`, issueID)
|
||||
testPool.Exec(ctx, `DELETE FROM issue WHERE id = $1`, issueID)
|
||||
}
|
||||
|
||||
// TestGetLastTaskSessionExcludesPoisonedFailures asserts that the
|
||||
// (agent_id, issue_id) resume lookup skips failed tasks whose
|
||||
// failure_reason classifies them as poisoned terminal output. This is the
|
||||
// SQL-level half of the rerun-poisoned-session fix: without the filter, a
|
||||
// rerun would inherit the same session and replay the same bad output.
|
||||
func TestGetLastTaskSessionExcludesPoisonedFailures(t *testing.T) {
|
||||
if testPool == nil {
|
||||
t.Skip("no database connection")
|
||||
}
|
||||
|
||||
issueID, agentID, runtimeID := setupRerunTestFixture(t)
|
||||
t.Cleanup(func() { cleanupRerunFixture(t, issueID) })
|
||||
|
||||
ctx := context.Background()
|
||||
|
||||
// Insert an older failed task with a poisoned classifier and a session_id.
|
||||
// The poisoned task is the *most recent* one, so without the filter the
|
||||
// resume lookup would return its session_id.
|
||||
if _, err := testPool.Exec(ctx, `
|
||||
INSERT INTO agent_task_queue (agent_id, runtime_id, issue_id, status, priority, started_at, completed_at, session_id, work_dir, failure_reason)
|
||||
VALUES ($1, $2, $3, 'failed', 0, now() - interval '2 minutes', now() - interval '2 minutes', 'HEALTHY-SESSION', '/tmp/healthy', 'timeout')
|
||||
`, agentID, runtimeID, issueID); err != nil {
|
||||
t.Fatalf("insert healthy failed task: %v", err)
|
||||
}
|
||||
|
||||
if _, err := testPool.Exec(ctx, `
|
||||
INSERT INTO agent_task_queue (agent_id, runtime_id, issue_id, status, priority, started_at, completed_at, session_id, work_dir, failure_reason)
|
||||
VALUES ($1, $2, $3, 'failed', 0, now() - interval '1 minute', now() - interval '1 minute', 'POISONED-SESSION', '/tmp/poisoned', 'iteration_limit')
|
||||
`, agentID, runtimeID, issueID); err != nil {
|
||||
t.Fatalf("insert poisoned failed task: %v", err)
|
||||
}
|
||||
|
||||
queries := db.New(testPool)
|
||||
prior, err := queries.GetLastTaskSession(ctx, db.GetLastTaskSessionParams{
|
||||
AgentID: pgtype.UUID{Bytes: parseUUIDBytes(agentID), Valid: true},
|
||||
IssueID: pgtype.UUID{Bytes: parseUUIDBytes(issueID), Valid: true},
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatalf("GetLastTaskSession failed: %v", err)
|
||||
}
|
||||
if !prior.SessionID.Valid {
|
||||
t.Fatal("expected to fall back to the healthy failed session, got no session")
|
||||
}
|
||||
if prior.SessionID.String == "POISONED-SESSION" {
|
||||
t.Fatal("rerun would inherit poisoned session — filter is not active")
|
||||
}
|
||||
if prior.SessionID.String != "HEALTHY-SESSION" {
|
||||
t.Fatalf("expected HEALTHY-SESSION, got %q", prior.SessionID.String)
|
||||
}
|
||||
}
|
||||
|
||||
// TestGetLastTaskSessionFallbackPoisonedClassifier covers the second
|
||||
// poisoned classifier so adding a third doesn't silently break this rule.
|
||||
func TestGetLastTaskSessionFallbackPoisonedClassifier(t *testing.T) {
|
||||
if testPool == nil {
|
||||
t.Skip("no database connection")
|
||||
}
|
||||
|
||||
issueID, agentID, runtimeID := setupRerunTestFixture(t)
|
||||
t.Cleanup(func() { cleanupRerunFixture(t, issueID) })
|
||||
|
||||
ctx := context.Background()
|
||||
|
||||
if _, err := testPool.Exec(ctx, `
|
||||
INSERT INTO agent_task_queue (agent_id, runtime_id, issue_id, status, priority, started_at, completed_at, session_id, work_dir, failure_reason)
|
||||
VALUES ($1, $2, $3, 'failed', 0, now() - interval '5 seconds', now() - interval '5 seconds', 'POISONED-FALLBACK', '/tmp/poisoned', 'agent_fallback_message')
|
||||
`, agentID, runtimeID, issueID); err != nil {
|
||||
t.Fatalf("insert poisoned failed task: %v", err)
|
||||
}
|
||||
|
||||
queries := db.New(testPool)
|
||||
prior, err := queries.GetLastTaskSession(ctx, db.GetLastTaskSessionParams{
|
||||
AgentID: pgtype.UUID{Bytes: parseUUIDBytes(agentID), Valid: true},
|
||||
IssueID: pgtype.UUID{Bytes: parseUUIDBytes(issueID), Valid: true},
|
||||
})
|
||||
if err == nil && prior.SessionID.Valid {
|
||||
t.Fatalf("expected no resumable session, got %q", prior.SessionID.String)
|
||||
}
|
||||
}
|
||||
|
||||
// TestRerunIssueSetsForceFreshSession asserts the manual rerun flow flags
|
||||
// the new task so the daemon claim handler skips the resume lookup. This
|
||||
// is the call-site half of the fix: even if the SQL filter ever misses a
|
||||
// poisoned classifier, manual rerun never resumes.
|
||||
func TestRerunIssueSetsForceFreshSession(t *testing.T) {
|
||||
if testPool == nil {
|
||||
t.Skip("no database connection")
|
||||
}
|
||||
|
||||
issueID, _, _ := setupRerunTestFixture(t)
|
||||
t.Cleanup(func() { cleanupRerunFixture(t, issueID) })
|
||||
|
||||
ctx := context.Background()
|
||||
queries := db.New(testPool)
|
||||
hub := realtime.NewHub()
|
||||
go hub.Run()
|
||||
bus := events.New()
|
||||
taskService := service.NewTaskService(queries, nil, hub, bus)
|
||||
|
||||
task, err := taskService.RerunIssue(ctx, pgtype.UUID{Bytes: parseUUIDBytes(issueID), Valid: true}, pgtype.UUID{})
|
||||
if err != nil {
|
||||
t.Fatalf("RerunIssue failed: %v", err)
|
||||
}
|
||||
if task == nil {
|
||||
t.Fatal("RerunIssue returned nil task")
|
||||
}
|
||||
if !task.ForceFreshSession {
|
||||
t.Fatal("expected manual rerun to set force_fresh_session=true")
|
||||
}
|
||||
}
|
||||
|
||||
// TestEnqueueTaskForIssueDoesNotForceFreshSession is the negative control
|
||||
// for the rerun flag: the normal enqueue path must leave the flag false so
|
||||
// auto-retry / comment-triggered tasks keep resuming the prior session
|
||||
// (MUL-1128 contract).
|
||||
func TestEnqueueTaskForIssueDoesNotForceFreshSession(t *testing.T) {
|
||||
if testPool == nil {
|
||||
t.Skip("no database connection")
|
||||
}
|
||||
|
||||
issueID, _, _ := setupRerunTestFixture(t)
|
||||
t.Cleanup(func() { cleanupRerunFixture(t, issueID) })
|
||||
|
||||
ctx := context.Background()
|
||||
queries := db.New(testPool)
|
||||
hub := realtime.NewHub()
|
||||
go hub.Run()
|
||||
bus := events.New()
|
||||
taskService := service.NewTaskService(queries, nil, hub, bus)
|
||||
|
||||
issue, err := queries.GetIssue(ctx, pgtype.UUID{Bytes: parseUUIDBytes(issueID), Valid: true})
|
||||
if err != nil {
|
||||
t.Fatalf("load issue: %v", err)
|
||||
}
|
||||
task, err := taskService.EnqueueTaskForIssue(ctx, issue)
|
||||
if err != nil {
|
||||
t.Fatalf("EnqueueTaskForIssue failed: %v", err)
|
||||
}
|
||||
if task.ForceFreshSession {
|
||||
t.Fatal("expected normal enqueue to leave force_fresh_session=false")
|
||||
}
|
||||
}
|
||||
@@ -1393,6 +1393,27 @@ func (d *Daemon) runTask(ctx context.Context, task Task, provider string, slot i
|
||||
Usage: usageEntries,
|
||||
}, nil
|
||||
}
|
||||
// Detect "poisoned" terminal output: the agent didn't reach a real
|
||||
// conclusion but emitted a known fallback marker (iteration limit,
|
||||
// fallback meta message). Route through the blocked path with a
|
||||
// specific failure_reason so the server can exclude this session
|
||||
// from the (agent_id, issue_id) resume lookup — otherwise a manual
|
||||
// rerun would inherit the same poisoned session and reproduce the
|
||||
// same bad output.
|
||||
if reason, ok := classifyPoisonedOutput(result.Output); ok {
|
||||
taskLog.Warn("agent finished with poisoned fallback output, classifying as blocked",
|
||||
"failure_reason", reason,
|
||||
)
|
||||
return TaskResult{
|
||||
Status: "blocked",
|
||||
Comment: result.Output,
|
||||
SessionID: result.SessionID,
|
||||
WorkDir: env.WorkDir,
|
||||
EnvRoot: env.RootDir,
|
||||
Usage: usageEntries,
|
||||
FailureReason: reason,
|
||||
}, nil
|
||||
}
|
||||
return TaskResult{
|
||||
Status: "completed",
|
||||
Comment: result.Output,
|
||||
|
||||
57
server/internal/daemon/poisoned.go
Normal file
57
server/internal/daemon/poisoned.go
Normal file
@@ -0,0 +1,57 @@
|
||||
package daemon
|
||||
|
||||
import "strings"
|
||||
|
||||
// FailureReason values for tasks that "completed" with output but the
|
||||
// output is actually a known agent fallback marker — i.e. the agent gave
|
||||
// up and emitted a meta message instead of a real result. Listed here so
|
||||
// the server-side query GetLastTaskSession can filter them out and a
|
||||
// rerun starts from a fresh agent session instead of resuming the same
|
||||
// poisoned conversation.
|
||||
const (
|
||||
FailureReasonIterationLimit = "iteration_limit"
|
||||
FailureReasonAgentFallbackMsg = "agent_fallback_message"
|
||||
)
|
||||
|
||||
// poisonedOutputMaxLen caps how long an output can be and still be
|
||||
// classified as a poisoned fallback. Real fallback messages are short,
|
||||
// one-sentence affairs; a long output that happens to mention a marker
|
||||
// is almost certainly a real conclusion (e.g. a code-review reply
|
||||
// quoting these strings, like the one currently quoting them in
|
||||
// MUL-1630). The cap intentionally errs on the side of NOT classifying
|
||||
// — a missed poisoned task gets retried by user action, but a
|
||||
// false-positive turns a successful task into a failure and a system
|
||||
// comment.
|
||||
const poisonedOutputMaxLen = 320
|
||||
|
||||
// poisonedMarkers maps a substring fingerprint of a known agent fallback
|
||||
// terminal message to its failure_reason classifier. Match is case-
|
||||
// insensitive and substring-based; the cap above prevents long outputs
|
||||
// that quote a marker from being misclassified.
|
||||
var poisonedMarkers = []struct {
|
||||
Substring string
|
||||
Reason string
|
||||
}{
|
||||
{"i reached the iteration limit", FailureReasonIterationLimit},
|
||||
{"put your final update inside the content string", FailureReasonAgentFallbackMsg},
|
||||
}
|
||||
|
||||
// classifyPoisonedOutput reports whether output matches a known agent
|
||||
// fallback terminal message and, if so, returns the failure_reason that
|
||||
// should be persisted on the task row. Long outputs are never
|
||||
// classified: a real fallback is the agent's only utterance for the
|
||||
// turn, so anything beyond ~one paragraph is treated as a real result
|
||||
// even if it contains a marker substring.
|
||||
func classifyPoisonedOutput(output string) (string, bool) {
|
||||
trimmed := strings.TrimSpace(output)
|
||||
if trimmed == "" || len(trimmed) > poisonedOutputMaxLen {
|
||||
return "", false
|
||||
}
|
||||
lowered := strings.ToLower(trimmed)
|
||||
for _, m := range poisonedMarkers {
|
||||
if strings.Contains(lowered, m.Substring) {
|
||||
return m.Reason, true
|
||||
}
|
||||
}
|
||||
return "", false
|
||||
}
|
||||
86
server/internal/daemon/poisoned_test.go
Normal file
86
server/internal/daemon/poisoned_test.go
Normal file
@@ -0,0 +1,86 @@
|
||||
package daemon
|
||||
|
||||
import (
|
||||
"strings"
|
||||
"testing"
|
||||
)
|
||||
|
||||
func TestClassifyPoisonedOutput(t *testing.T) {
|
||||
cases := []struct {
|
||||
name string
|
||||
output string
|
||||
wantOK bool
|
||||
wantReason string
|
||||
}{
|
||||
{
|
||||
name: "iteration limit canonical",
|
||||
output: "I reached the iteration limit and couldn't generate a summary.",
|
||||
wantOK: true,
|
||||
wantReason: FailureReasonIterationLimit,
|
||||
},
|
||||
{
|
||||
name: "iteration limit case insensitive",
|
||||
output: "I REACHED THE ITERATION LIMIT and stopped",
|
||||
wantOK: true,
|
||||
wantReason: FailureReasonIterationLimit,
|
||||
},
|
||||
{
|
||||
name: "fallback meta message",
|
||||
output: "Put your final update inside the content string. Keep it concise.",
|
||||
wantOK: true,
|
||||
wantReason: FailureReasonAgentFallbackMsg,
|
||||
},
|
||||
{
|
||||
name: "real conclusion is not poisoned",
|
||||
output: "Fixed the bug in auth.go and pushed PR #42.",
|
||||
wantOK: false,
|
||||
},
|
||||
{
|
||||
name: "empty output",
|
||||
output: "",
|
||||
wantOK: false,
|
||||
},
|
||||
{
|
||||
name: "mentions iteration but not the marker",
|
||||
output: "Each iteration of the loop processes one record.",
|
||||
wantOK: false,
|
||||
},
|
||||
{
|
||||
// Regression guard for the GPT-Boy review on MUL-1630:
|
||||
// a real review/analysis that quotes both markers must not
|
||||
// be misclassified. Without the length cap, this entire
|
||||
// PR's review thread would tank as a poisoned failure.
|
||||
name: "long review quoting both markers is not poisoned",
|
||||
output: `Review for the rerun fix.
|
||||
|
||||
Detection markers under consideration:
|
||||
- "I reached the iteration limit and couldn't generate a summary."
|
||||
- "Put your final update inside the content string. Keep it concise."
|
||||
|
||||
The implementation looks correct: the daemon classifies these as
|
||||
fallback output, persists a dedicated failure_reason, and the SQL
|
||||
filter excludes them from the resume lookup. Auto-retry of mid-flight
|
||||
orphans still keeps the resume contract because CreateRetryTask does
|
||||
not set force_fresh_session. Approving with a follow-up note about
|
||||
the matcher being too permissive on long outputs.`,
|
||||
wantOK: false,
|
||||
},
|
||||
{
|
||||
name: "marker buried inside a long agent conclusion",
|
||||
output: strings.Repeat("All checks passed and the bug is fixed. ", 10) + "i reached the iteration limit while debugging earlier.",
|
||||
wantOK: false,
|
||||
},
|
||||
}
|
||||
|
||||
for _, tc := range cases {
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
reason, ok := classifyPoisonedOutput(tc.output)
|
||||
if ok != tc.wantOK {
|
||||
t.Fatalf("classifyPoisonedOutput(%q) ok=%v, want %v", tc.output, ok, tc.wantOK)
|
||||
}
|
||||
if ok && reason != tc.wantReason {
|
||||
t.Fatalf("classifyPoisonedOutput(%q) reason=%q, want %q", tc.output, reason, tc.wantReason)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
@@ -105,6 +105,6 @@ type TaskResult struct {
|
||||
SessionID string `json:"session_id,omitempty"` // Claude session ID for future resumption
|
||||
WorkDir string `json:"work_dir,omitempty"` // working directory used during execution
|
||||
EnvRoot string `json:"-"` // env root dir for writing GC metadata (not sent to server)
|
||||
FailureReason string `json:"-"` // internal server failure classification
|
||||
FailureReason string `json:"-"` // classifier forwarded to FailTask on the blocked path; empty falls back to 'agent_error'
|
||||
Usage []TaskUsageEntry `json:"usage,omitempty"` // per-model token usage
|
||||
}
|
||||
|
||||
@@ -944,13 +944,20 @@ func (h *Handler) ClaimTaskByRuntime(w http.ResponseWriter, r *http.Request) {
|
||||
|
||||
// Look up the prior session for this (agent, issue) pair so the daemon
|
||||
// can resume the Claude Code conversation context.
|
||||
if prior, err := h.Queries.GetLastTaskSession(r.Context(), db.GetLastTaskSessionParams{
|
||||
AgentID: task.AgentID,
|
||||
IssueID: task.IssueID,
|
||||
}); err == nil && prior.SessionID.Valid {
|
||||
resp.PriorSessionID = prior.SessionID.String
|
||||
if prior.WorkDir.Valid {
|
||||
resp.PriorWorkDir = prior.WorkDir.String
|
||||
//
|
||||
// Skip the lookup when the task was flagged as a manual rerun: the
|
||||
// user just judged the prior output bad, so the daemon must start a
|
||||
// fresh agent session instead of resuming the same conversation that
|
||||
// produced that output.
|
||||
if !task.ForceFreshSession {
|
||||
if prior, err := h.Queries.GetLastTaskSession(r.Context(), db.GetLastTaskSessionParams{
|
||||
AgentID: task.AgentID,
|
||||
IssueID: task.IssueID,
|
||||
}); err == nil && prior.SessionID.Valid {
|
||||
resp.PriorSessionID = prior.SessionID.String
|
||||
if prior.WorkDir.Valid {
|
||||
resp.PriorWorkDir = prior.WorkDir.String
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -96,6 +96,19 @@ func NewTaskService(q *db.Queries, tx TxStarter, hub *realtime.Hub, bus *events.
|
||||
// No context snapshot is stored — the agent fetches all data it needs at
|
||||
// runtime via the multica CLI.
|
||||
func (s *TaskService) EnqueueTaskForIssue(ctx context.Context, issue db.Issue, triggerCommentID ...pgtype.UUID) (db.AgentTaskQueue, error) {
|
||||
var commentID pgtype.UUID
|
||||
if len(triggerCommentID) > 0 {
|
||||
commentID = triggerCommentID[0]
|
||||
}
|
||||
return s.enqueueIssueTask(ctx, issue, commentID, false)
|
||||
}
|
||||
|
||||
// enqueueIssueTask is the shared implementation behind EnqueueTaskForIssue
|
||||
// and the manual rerun path. forceFreshSession=true marks the task so the
|
||||
// daemon claim handler skips the (agent_id, issue_id) resume lookup — the
|
||||
// user already judged the prior output bad, a fresh agent session is the
|
||||
// expected behavior.
|
||||
func (s *TaskService) enqueueIssueTask(ctx context.Context, issue db.Issue, triggerCommentID pgtype.UUID, forceFreshSession bool) (db.AgentTaskQueue, error) {
|
||||
if !issue.AssigneeID.Valid {
|
||||
slog.Error("task enqueue failed", "issue_id", util.UUIDToString(issue.ID), "error", "issue has no assignee")
|
||||
return db.AgentTaskQueue{}, fmt.Errorf("issue has no assignee")
|
||||
@@ -115,25 +128,26 @@ func (s *TaskService) EnqueueTaskForIssue(ctx context.Context, issue db.Issue, t
|
||||
return db.AgentTaskQueue{}, fmt.Errorf("agent has no runtime")
|
||||
}
|
||||
|
||||
var commentID pgtype.UUID
|
||||
if len(triggerCommentID) > 0 {
|
||||
commentID = triggerCommentID[0]
|
||||
}
|
||||
|
||||
task, err := s.Queries.CreateAgentTask(ctx, db.CreateAgentTaskParams{
|
||||
AgentID: issue.AssigneeID,
|
||||
RuntimeID: agent.RuntimeID,
|
||||
IssueID: issue.ID,
|
||||
Priority: priorityToInt(issue.Priority),
|
||||
TriggerCommentID: commentID,
|
||||
TriggerSummary: s.buildCommentTriggerSummary(ctx, commentID),
|
||||
AgentID: issue.AssigneeID,
|
||||
RuntimeID: agent.RuntimeID,
|
||||
IssueID: issue.ID,
|
||||
Priority: priorityToInt(issue.Priority),
|
||||
TriggerCommentID: triggerCommentID,
|
||||
TriggerSummary: s.buildCommentTriggerSummary(ctx, triggerCommentID),
|
||||
ForceFreshSession: pgtype.Bool{Bool: forceFreshSession, Valid: forceFreshSession},
|
||||
})
|
||||
if err != nil {
|
||||
slog.Error("task enqueue failed", "issue_id", util.UUIDToString(issue.ID), "error", err)
|
||||
return db.AgentTaskQueue{}, fmt.Errorf("create task: %w", err)
|
||||
}
|
||||
|
||||
slog.Info("task enqueued", "task_id", util.UUIDToString(task.ID), "issue_id", util.UUIDToString(issue.ID), "agent_id", util.UUIDToString(issue.AssigneeID))
|
||||
slog.Info("task enqueued",
|
||||
"task_id", util.UUIDToString(task.ID),
|
||||
"issue_id", util.UUIDToString(issue.ID),
|
||||
"agent_id", util.UUIDToString(issue.AssigneeID),
|
||||
"force_fresh_session", forceFreshSession,
|
||||
)
|
||||
// Order matters: broadcast first, notify daemon second. notifyTaskAvailable
|
||||
// kicks an in-process channel that the daemon picks up over HTTP and
|
||||
// claims; the claim path then emits its own task:dispatch. Doing the
|
||||
@@ -874,9 +888,15 @@ func (s *TaskService) MaybeRetryFailedTask(ctx context.Context, parent db.AgentT
|
||||
}
|
||||
|
||||
// RerunIssue creates a fresh queued task for the agent currently assigned
|
||||
// to the issue. Used by the manual rerun endpoint. Carries the most recent
|
||||
// session_id/work_dir on the issue (across any status) so the new run
|
||||
// resumes from where the prior one left off when the backend supports it.
|
||||
// to the issue. Used by the manual rerun endpoint.
|
||||
//
|
||||
// The new task is flagged force_fresh_session=true so the daemon starts a
|
||||
// clean agent session instead of resuming the prior (agent_id, issue_id)
|
||||
// session. A user clicking rerun has just judged the prior output bad —
|
||||
// resuming the same conversation would replay the same poisoned state.
|
||||
// Auto-retry of an orphaned mid-flight failure (HandleFailedTasks →
|
||||
// MaybeRetryFailedTask → CreateRetryTask) does NOT take this path, so
|
||||
// MUL-1128's mid-flight resume contract is preserved.
|
||||
//
|
||||
// Only tasks belonging to the issue's current assignee are cancelled.
|
||||
// Tasks owned by other agents on the same issue (e.g. a parallel
|
||||
@@ -909,7 +929,7 @@ func (s *TaskService) RerunIssue(ctx context.Context, issueID pgtype.UUID, trigg
|
||||
s.broadcastTaskEvent(ctx, protocol.EventTaskCancelled, t)
|
||||
}
|
||||
|
||||
task, err := s.EnqueueTaskForIssue(ctx, issue, triggerCommentID)
|
||||
task, err := s.enqueueIssueTask(ctx, issue, triggerCommentID, true)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
1
server/migrations/066_force_fresh_session.down.sql
Normal file
1
server/migrations/066_force_fresh_session.down.sql
Normal file
@@ -0,0 +1 @@
|
||||
ALTER TABLE agent_task_queue DROP COLUMN force_fresh_session;
|
||||
8
server/migrations/066_force_fresh_session.up.sql
Normal file
8
server/migrations/066_force_fresh_session.up.sql
Normal file
@@ -0,0 +1,8 @@
|
||||
-- Per-task signal that the manual rerun flow uses to short-circuit the
|
||||
-- (agent_id, issue_id) session resume lookup. Set when a user clicks
|
||||
-- rerun: the user just judged the prior output bad, so the daemon must
|
||||
-- start a fresh agent session instead of resuming the same conversation
|
||||
-- that produced the bad result. Auto-retry of an orphaned mid-flight
|
||||
-- failure leaves this FALSE so MUL-1128's resume contract is preserved.
|
||||
ALTER TABLE agent_task_queue
|
||||
ADD COLUMN force_fresh_session BOOLEAN NOT NULL DEFAULT FALSE;
|
||||
@@ -55,7 +55,7 @@ const cancelAgentTask = `-- name: CancelAgentTask :one
|
||||
UPDATE agent_task_queue
|
||||
SET status = 'cancelled', completed_at = now()
|
||||
WHERE id = $1 AND status IN ('queued', 'dispatched', 'running')
|
||||
RETURNING id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, last_heartbeat_at, trigger_summary
|
||||
RETURNING id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, last_heartbeat_at, trigger_summary, force_fresh_session
|
||||
`
|
||||
|
||||
func (q *Queries) CancelAgentTask(ctx context.Context, id pgtype.UUID) (AgentTaskQueue, error) {
|
||||
@@ -86,6 +86,7 @@ func (q *Queries) CancelAgentTask(ctx context.Context, id pgtype.UUID) (AgentTas
|
||||
&i.FailureReason,
|
||||
&i.LastHeartbeatAt,
|
||||
&i.TriggerSummary,
|
||||
&i.ForceFreshSession,
|
||||
)
|
||||
return i, err
|
||||
}
|
||||
@@ -94,7 +95,7 @@ const cancelAgentTasksByAgent = `-- name: CancelAgentTasksByAgent :many
|
||||
UPDATE agent_task_queue
|
||||
SET status = 'cancelled', completed_at = now()
|
||||
WHERE agent_id = $1 AND status IN ('queued', 'dispatched', 'running')
|
||||
RETURNING id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, last_heartbeat_at, trigger_summary
|
||||
RETURNING id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, last_heartbeat_at, trigger_summary, force_fresh_session
|
||||
`
|
||||
|
||||
// Bulk-cancel every active (queued/dispatched/running) task for an agent.
|
||||
@@ -136,6 +137,7 @@ func (q *Queries) CancelAgentTasksByAgent(ctx context.Context, agentID pgtype.UU
|
||||
&i.FailureReason,
|
||||
&i.LastHeartbeatAt,
|
||||
&i.TriggerSummary,
|
||||
&i.ForceFreshSession,
|
||||
); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -151,7 +153,7 @@ const cancelAgentTasksByIssue = `-- name: CancelAgentTasksByIssue :many
|
||||
UPDATE agent_task_queue
|
||||
SET status = 'cancelled', completed_at = now()
|
||||
WHERE issue_id = $1 AND status IN ('queued', 'dispatched', 'running')
|
||||
RETURNING id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, last_heartbeat_at, trigger_summary
|
||||
RETURNING id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, last_heartbeat_at, trigger_summary, force_fresh_session
|
||||
`
|
||||
|
||||
// Cancels every active task on the issue and returns the affected rows so the
|
||||
@@ -193,6 +195,7 @@ func (q *Queries) CancelAgentTasksByIssue(ctx context.Context, issueID pgtype.UU
|
||||
&i.FailureReason,
|
||||
&i.LastHeartbeatAt,
|
||||
&i.TriggerSummary,
|
||||
&i.ForceFreshSession,
|
||||
); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -208,7 +211,7 @@ const cancelAgentTasksByIssueAndAgent = `-- name: CancelAgentTasksByIssueAndAgen
|
||||
UPDATE agent_task_queue
|
||||
SET status = 'cancelled', completed_at = now()
|
||||
WHERE issue_id = $1 AND agent_id = $2 AND status IN ('queued', 'dispatched', 'running')
|
||||
RETURNING id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, last_heartbeat_at, trigger_summary
|
||||
RETURNING id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, last_heartbeat_at, trigger_summary, force_fresh_session
|
||||
`
|
||||
|
||||
type CancelAgentTasksByIssueAndAgentParams struct {
|
||||
@@ -254,6 +257,7 @@ func (q *Queries) CancelAgentTasksByIssueAndAgent(ctx context.Context, arg Cance
|
||||
&i.FailureReason,
|
||||
&i.LastHeartbeatAt,
|
||||
&i.TriggerSummary,
|
||||
&i.ForceFreshSession,
|
||||
); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -269,7 +273,7 @@ const cancelAgentTasksByTriggerComment = `-- name: CancelAgentTasksByTriggerComm
|
||||
UPDATE agent_task_queue
|
||||
SET status = 'cancelled', completed_at = now()
|
||||
WHERE trigger_comment_id = $1 AND status IN ('queued', 'dispatched', 'running')
|
||||
RETURNING id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, last_heartbeat_at, trigger_summary
|
||||
RETURNING id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, last_heartbeat_at, trigger_summary, force_fresh_session
|
||||
`
|
||||
|
||||
// Cancels active tasks whose trigger is the given comment. Called when a
|
||||
@@ -311,6 +315,7 @@ func (q *Queries) CancelAgentTasksByTriggerComment(ctx context.Context, triggerC
|
||||
&i.FailureReason,
|
||||
&i.LastHeartbeatAt,
|
||||
&i.TriggerSummary,
|
||||
&i.ForceFreshSession,
|
||||
); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -349,7 +354,7 @@ WHERE id = (
|
||||
LIMIT 1
|
||||
FOR UPDATE SKIP LOCKED
|
||||
)
|
||||
RETURNING id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, last_heartbeat_at, trigger_summary
|
||||
RETURNING id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, last_heartbeat_at, trigger_summary, force_fresh_session
|
||||
`
|
||||
|
||||
// Claims the next queued task for an agent, enforcing per-(issue, agent) serialization:
|
||||
@@ -389,6 +394,7 @@ func (q *Queries) ClaimAgentTask(ctx context.Context, agentID pgtype.UUID) (Agen
|
||||
&i.FailureReason,
|
||||
&i.LastHeartbeatAt,
|
||||
&i.TriggerSummary,
|
||||
&i.ForceFreshSession,
|
||||
)
|
||||
return i, err
|
||||
}
|
||||
@@ -432,7 +438,7 @@ const completeAgentTask = `-- name: CompleteAgentTask :one
|
||||
UPDATE agent_task_queue
|
||||
SET status = 'completed', completed_at = now(), result = $2, session_id = $3, work_dir = $4
|
||||
WHERE id = $1 AND status = 'running'
|
||||
RETURNING id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, last_heartbeat_at, trigger_summary
|
||||
RETURNING id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, last_heartbeat_at, trigger_summary, force_fresh_session
|
||||
`
|
||||
|
||||
type CompleteAgentTaskParams struct {
|
||||
@@ -475,6 +481,7 @@ func (q *Queries) CompleteAgentTask(ctx context.Context, arg CompleteAgentTaskPa
|
||||
&i.FailureReason,
|
||||
&i.LastHeartbeatAt,
|
||||
&i.TriggerSummary,
|
||||
&i.ForceFreshSession,
|
||||
)
|
||||
return i, err
|
||||
}
|
||||
@@ -564,18 +571,26 @@ func (q *Queries) CreateAgent(ctx context.Context, arg CreateAgentParams) (Agent
|
||||
}
|
||||
|
||||
const createAgentTask = `-- name: CreateAgentTask :one
|
||||
INSERT INTO agent_task_queue (agent_id, runtime_id, issue_id, status, priority, trigger_comment_id, trigger_summary)
|
||||
VALUES ($1, $2, $3, 'queued', $4, $5, $6)
|
||||
RETURNING id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, last_heartbeat_at, trigger_summary
|
||||
INSERT INTO agent_task_queue (
|
||||
agent_id, runtime_id, issue_id, status, priority, trigger_comment_id,
|
||||
trigger_summary, force_fresh_session
|
||||
)
|
||||
VALUES (
|
||||
$1, $2, $3, 'queued', $4, $5,
|
||||
$6,
|
||||
COALESCE($7::boolean, FALSE)
|
||||
)
|
||||
RETURNING id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, last_heartbeat_at, trigger_summary, force_fresh_session
|
||||
`
|
||||
|
||||
type CreateAgentTaskParams struct {
|
||||
AgentID pgtype.UUID `json:"agent_id"`
|
||||
RuntimeID pgtype.UUID `json:"runtime_id"`
|
||||
IssueID pgtype.UUID `json:"issue_id"`
|
||||
Priority int32 `json:"priority"`
|
||||
TriggerCommentID pgtype.UUID `json:"trigger_comment_id"`
|
||||
TriggerSummary pgtype.Text `json:"trigger_summary"`
|
||||
AgentID pgtype.UUID `json:"agent_id"`
|
||||
RuntimeID pgtype.UUID `json:"runtime_id"`
|
||||
IssueID pgtype.UUID `json:"issue_id"`
|
||||
Priority int32 `json:"priority"`
|
||||
TriggerCommentID pgtype.UUID `json:"trigger_comment_id"`
|
||||
TriggerSummary pgtype.Text `json:"trigger_summary"`
|
||||
ForceFreshSession pgtype.Bool `json:"force_fresh_session"`
|
||||
}
|
||||
|
||||
func (q *Queries) CreateAgentTask(ctx context.Context, arg CreateAgentTaskParams) (AgentTaskQueue, error) {
|
||||
@@ -586,6 +601,7 @@ func (q *Queries) CreateAgentTask(ctx context.Context, arg CreateAgentTaskParams
|
||||
arg.Priority,
|
||||
arg.TriggerCommentID,
|
||||
arg.TriggerSummary,
|
||||
arg.ForceFreshSession,
|
||||
)
|
||||
var i AgentTaskQueue
|
||||
err := row.Scan(
|
||||
@@ -613,6 +629,7 @@ func (q *Queries) CreateAgentTask(ctx context.Context, arg CreateAgentTaskParams
|
||||
&i.FailureReason,
|
||||
&i.LastHeartbeatAt,
|
||||
&i.TriggerSummary,
|
||||
&i.ForceFreshSession,
|
||||
)
|
||||
return i, err
|
||||
}
|
||||
@@ -620,7 +637,7 @@ func (q *Queries) CreateAgentTask(ctx context.Context, arg CreateAgentTaskParams
|
||||
const createQuickCreateTask = `-- name: CreateQuickCreateTask :one
|
||||
INSERT INTO agent_task_queue (agent_id, runtime_id, issue_id, status, priority, context)
|
||||
VALUES ($1, $2, NULL, 'queued', $3, $4)
|
||||
RETURNING id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, last_heartbeat_at, trigger_summary
|
||||
RETURNING id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, last_heartbeat_at, trigger_summary, force_fresh_session
|
||||
`
|
||||
|
||||
type CreateQuickCreateTaskParams struct {
|
||||
@@ -666,6 +683,7 @@ func (q *Queries) CreateQuickCreateTask(ctx context.Context, arg CreateQuickCrea
|
||||
&i.FailureReason,
|
||||
&i.LastHeartbeatAt,
|
||||
&i.TriggerSummary,
|
||||
&i.ForceFreshSession,
|
||||
)
|
||||
return i, err
|
||||
}
|
||||
@@ -684,7 +702,7 @@ SELECT
|
||||
p.attempt + 1, p.max_attempts, p.id
|
||||
FROM agent_task_queue p
|
||||
WHERE p.id = $1
|
||||
RETURNING id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, last_heartbeat_at, trigger_summary
|
||||
RETURNING id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, last_heartbeat_at, trigger_summary, force_fresh_session
|
||||
`
|
||||
|
||||
// Clones a parent task into a fresh queued attempt. Carries forward the
|
||||
@@ -719,6 +737,7 @@ func (q *Queries) CreateRetryTask(ctx context.Context, id pgtype.UUID) (AgentTas
|
||||
&i.FailureReason,
|
||||
&i.LastHeartbeatAt,
|
||||
&i.TriggerSummary,
|
||||
&i.ForceFreshSession,
|
||||
)
|
||||
return i, err
|
||||
}
|
||||
@@ -732,7 +751,7 @@ SET status = 'failed',
|
||||
session_id = COALESCE($4, session_id),
|
||||
work_dir = COALESCE($5, work_dir)
|
||||
WHERE id = $1 AND status IN ('dispatched', 'running')
|
||||
RETURNING id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, last_heartbeat_at, trigger_summary
|
||||
RETURNING id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, last_heartbeat_at, trigger_summary, force_fresh_session
|
||||
`
|
||||
|
||||
type FailAgentTaskParams struct {
|
||||
@@ -786,6 +805,7 @@ func (q *Queries) FailAgentTask(ctx context.Context, arg FailAgentTaskParams) (A
|
||||
&i.FailureReason,
|
||||
&i.LastHeartbeatAt,
|
||||
&i.TriggerSummary,
|
||||
&i.ForceFreshSession,
|
||||
)
|
||||
return i, err
|
||||
}
|
||||
@@ -796,7 +816,7 @@ SET status = 'failed', completed_at = now(), error = 'task timed out',
|
||||
failure_reason = 'timeout'
|
||||
WHERE (status = 'dispatched' AND dispatched_at < now() - make_interval(secs => $1::double precision))
|
||||
OR (status = 'running' AND started_at < now() - make_interval(secs => $2::double precision))
|
||||
RETURNING id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, last_heartbeat_at, trigger_summary
|
||||
RETURNING id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, last_heartbeat_at, trigger_summary, force_fresh_session
|
||||
`
|
||||
|
||||
type FailStaleTasksParams struct {
|
||||
@@ -841,6 +861,7 @@ func (q *Queries) FailStaleTasks(ctx context.Context, arg FailStaleTasksParams)
|
||||
&i.FailureReason,
|
||||
&i.LastHeartbeatAt,
|
||||
&i.TriggerSummary,
|
||||
&i.ForceFreshSession,
|
||||
); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -926,7 +947,7 @@ func (q *Queries) GetAgentInWorkspace(ctx context.Context, arg GetAgentInWorkspa
|
||||
}
|
||||
|
||||
const getAgentTask = `-- name: GetAgentTask :one
|
||||
SELECT id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, last_heartbeat_at, trigger_summary FROM agent_task_queue
|
||||
SELECT id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, last_heartbeat_at, trigger_summary, force_fresh_session FROM agent_task_queue
|
||||
WHERE id = $1
|
||||
`
|
||||
|
||||
@@ -958,6 +979,7 @@ func (q *Queries) GetAgentTask(ctx context.Context, id pgtype.UUID) (AgentTaskQu
|
||||
&i.FailureReason,
|
||||
&i.LastHeartbeatAt,
|
||||
&i.TriggerSummary,
|
||||
&i.ForceFreshSession,
|
||||
)
|
||||
return i, err
|
||||
}
|
||||
@@ -965,7 +987,10 @@ func (q *Queries) GetAgentTask(ctx context.Context, id pgtype.UUID) (AgentTaskQu
|
||||
const getLastTaskSession = `-- name: GetLastTaskSession :one
|
||||
SELECT session_id, work_dir FROM agent_task_queue
|
||||
WHERE agent_id = $1 AND issue_id = $2
|
||||
AND status IN ('completed', 'failed')
|
||||
AND (
|
||||
status = 'completed'
|
||||
OR (status = 'failed' AND COALESCE(failure_reason, '') NOT IN ('iteration_limit', 'agent_fallback_message'))
|
||||
)
|
||||
AND session_id IS NOT NULL
|
||||
ORDER BY COALESCE(completed_at, started_at, dispatched_at, created_at) DESC
|
||||
LIMIT 1
|
||||
@@ -989,6 +1014,11 @@ type GetLastTaskSessionRow struct {
|
||||
// UpdateAgentTaskSession. Without this, an auto-retry / manual rerun of a
|
||||
// mid-run failure would silently start a fresh conversation and lose the
|
||||
// in-flight context — exactly what MUL-1128's B branch is meant to fix.
|
||||
//
|
||||
// Tasks that ended in a known "poisoned" terminal state are excluded so
|
||||
// a rerun does not inherit the bad session. The daemon classifies these
|
||||
// failures (iteration_limit, agent_fallback_message) when it detects the
|
||||
// agent emitted a fallback marker instead of a real result.
|
||||
func (q *Queries) GetLastTaskSession(ctx context.Context, arg GetLastTaskSessionParams) (GetLastTaskSessionRow, error) {
|
||||
row := q.db.QueryRow(ctx, getLastTaskSession, arg.AgentID, arg.IssueID)
|
||||
var i GetLastTaskSessionRow
|
||||
@@ -1167,7 +1197,7 @@ func (q *Queries) LinkTaskToIssue(ctx context.Context, arg LinkTaskToIssueParams
|
||||
}
|
||||
|
||||
const listActiveTasksByIssue = `-- name: ListActiveTasksByIssue :many
|
||||
SELECT id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, last_heartbeat_at, trigger_summary FROM agent_task_queue
|
||||
SELECT id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, last_heartbeat_at, trigger_summary, force_fresh_session FROM agent_task_queue
|
||||
WHERE issue_id = $1 AND status IN ('dispatched', 'running')
|
||||
ORDER BY created_at DESC
|
||||
`
|
||||
@@ -1206,6 +1236,7 @@ func (q *Queries) ListActiveTasksByIssue(ctx context.Context, issueID pgtype.UUI
|
||||
&i.FailureReason,
|
||||
&i.LastHeartbeatAt,
|
||||
&i.TriggerSummary,
|
||||
&i.ForceFreshSession,
|
||||
); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -1218,7 +1249,7 @@ func (q *Queries) ListActiveTasksByIssue(ctx context.Context, issueID pgtype.UUI
|
||||
}
|
||||
|
||||
const listAgentTasks = `-- name: ListAgentTasks :many
|
||||
SELECT id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, last_heartbeat_at, trigger_summary FROM agent_task_queue
|
||||
SELECT id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, last_heartbeat_at, trigger_summary, force_fresh_session FROM agent_task_queue
|
||||
WHERE agent_id = $1
|
||||
ORDER BY created_at DESC
|
||||
`
|
||||
@@ -1257,6 +1288,7 @@ func (q *Queries) ListAgentTasks(ctx context.Context, agentID pgtype.UUID) ([]Ag
|
||||
&i.FailureReason,
|
||||
&i.LastHeartbeatAt,
|
||||
&i.TriggerSummary,
|
||||
&i.ForceFreshSession,
|
||||
); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -1365,7 +1397,7 @@ func (q *Queries) ListAllAgents(ctx context.Context, workspaceID pgtype.UUID) ([
|
||||
}
|
||||
|
||||
const listPendingTasksByRuntime = `-- name: ListPendingTasksByRuntime :many
|
||||
SELECT id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, last_heartbeat_at, trigger_summary FROM agent_task_queue
|
||||
SELECT id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, last_heartbeat_at, trigger_summary, force_fresh_session FROM agent_task_queue
|
||||
WHERE runtime_id = $1 AND status IN ('queued', 'dispatched')
|
||||
ORDER BY priority DESC, created_at ASC
|
||||
`
|
||||
@@ -1404,6 +1436,7 @@ func (q *Queries) ListPendingTasksByRuntime(ctx context.Context, runtimeID pgtyp
|
||||
&i.FailureReason,
|
||||
&i.LastHeartbeatAt,
|
||||
&i.TriggerSummary,
|
||||
&i.ForceFreshSession,
|
||||
); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -1416,7 +1449,7 @@ func (q *Queries) ListPendingTasksByRuntime(ctx context.Context, runtimeID pgtyp
|
||||
}
|
||||
|
||||
const listTasksByIssue = `-- name: ListTasksByIssue :many
|
||||
SELECT id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, last_heartbeat_at, trigger_summary FROM agent_task_queue
|
||||
SELECT id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, last_heartbeat_at, trigger_summary, force_fresh_session FROM agent_task_queue
|
||||
WHERE issue_id = $1
|
||||
ORDER BY created_at DESC
|
||||
`
|
||||
@@ -1455,6 +1488,7 @@ func (q *Queries) ListTasksByIssue(ctx context.Context, issueID pgtype.UUID) ([]
|
||||
&i.FailureReason,
|
||||
&i.LastHeartbeatAt,
|
||||
&i.TriggerSummary,
|
||||
&i.ForceFreshSession,
|
||||
); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -1467,15 +1501,15 @@ func (q *Queries) ListTasksByIssue(ctx context.Context, issueID pgtype.UUID) ([]
|
||||
}
|
||||
|
||||
const listWorkspaceAgentTaskSnapshot = `-- name: ListWorkspaceAgentTaskSnapshot :many
|
||||
SELECT atq.id, atq.agent_id, atq.issue_id, atq.status, atq.priority, atq.dispatched_at, atq.started_at, atq.completed_at, atq.result, atq.error, atq.created_at, atq.context, atq.runtime_id, atq.session_id, atq.work_dir, atq.trigger_comment_id, atq.chat_session_id, atq.autopilot_run_id, atq.attempt, atq.max_attempts, atq.parent_task_id, atq.failure_reason, atq.last_heartbeat_at, atq.trigger_summary FROM agent_task_queue atq
|
||||
SELECT atq.id, atq.agent_id, atq.issue_id, atq.status, atq.priority, atq.dispatched_at, atq.started_at, atq.completed_at, atq.result, atq.error, atq.created_at, atq.context, atq.runtime_id, atq.session_id, atq.work_dir, atq.trigger_comment_id, atq.chat_session_id, atq.autopilot_run_id, atq.attempt, atq.max_attempts, atq.parent_task_id, atq.failure_reason, atq.last_heartbeat_at, atq.trigger_summary, atq.force_fresh_session FROM agent_task_queue atq
|
||||
JOIN agent a ON a.id = atq.agent_id
|
||||
WHERE a.workspace_id = $1
|
||||
AND atq.status IN ('queued', 'dispatched', 'running')
|
||||
|
||||
UNION ALL
|
||||
|
||||
SELECT t.id, t.agent_id, t.issue_id, t.status, t.priority, t.dispatched_at, t.started_at, t.completed_at, t.result, t.error, t.created_at, t.context, t.runtime_id, t.session_id, t.work_dir, t.trigger_comment_id, t.chat_session_id, t.autopilot_run_id, t.attempt, t.max_attempts, t.parent_task_id, t.failure_reason, t.last_heartbeat_at, t.trigger_summary FROM (
|
||||
SELECT DISTINCT ON (atq.agent_id) atq.id, atq.agent_id, atq.issue_id, atq.status, atq.priority, atq.dispatched_at, atq.started_at, atq.completed_at, atq.result, atq.error, atq.created_at, atq.context, atq.runtime_id, atq.session_id, atq.work_dir, atq.trigger_comment_id, atq.chat_session_id, atq.autopilot_run_id, atq.attempt, atq.max_attempts, atq.parent_task_id, atq.failure_reason, atq.last_heartbeat_at, atq.trigger_summary
|
||||
SELECT t.id, t.agent_id, t.issue_id, t.status, t.priority, t.dispatched_at, t.started_at, t.completed_at, t.result, t.error, t.created_at, t.context, t.runtime_id, t.session_id, t.work_dir, t.trigger_comment_id, t.chat_session_id, t.autopilot_run_id, t.attempt, t.max_attempts, t.parent_task_id, t.failure_reason, t.last_heartbeat_at, t.trigger_summary, t.force_fresh_session FROM (
|
||||
SELECT DISTINCT ON (atq.agent_id) atq.id, atq.agent_id, atq.issue_id, atq.status, atq.priority, atq.dispatched_at, atq.started_at, atq.completed_at, atq.result, atq.error, atq.created_at, atq.context, atq.runtime_id, atq.session_id, atq.work_dir, atq.trigger_comment_id, atq.chat_session_id, atq.autopilot_run_id, atq.attempt, atq.max_attempts, atq.parent_task_id, atq.failure_reason, atq.last_heartbeat_at, atq.trigger_summary, atq.force_fresh_session
|
||||
FROM agent_task_queue atq
|
||||
JOIN agent a ON a.id = atq.agent_id
|
||||
WHERE a.workspace_id = $1
|
||||
@@ -1536,6 +1570,7 @@ func (q *Queries) ListWorkspaceAgentTaskSnapshot(ctx context.Context, workspaceI
|
||||
&i.FailureReason,
|
||||
&i.LastHeartbeatAt,
|
||||
&i.TriggerSummary,
|
||||
&i.ForceFreshSession,
|
||||
); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -1554,7 +1589,7 @@ SET status = 'failed',
|
||||
error = 'daemon restarted while task was in flight',
|
||||
failure_reason = 'runtime_recovery'
|
||||
WHERE runtime_id = $1 AND status IN ('dispatched', 'running')
|
||||
RETURNING id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, last_heartbeat_at, trigger_summary
|
||||
RETURNING id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, last_heartbeat_at, trigger_summary, force_fresh_session
|
||||
`
|
||||
|
||||
// Called by the daemon at startup. Atomically fails any dispatched/running
|
||||
@@ -1595,6 +1630,7 @@ func (q *Queries) RecoverOrphanedTasksForRuntime(ctx context.Context, runtimeID
|
||||
&i.FailureReason,
|
||||
&i.LastHeartbeatAt,
|
||||
&i.TriggerSummary,
|
||||
&i.ForceFreshSession,
|
||||
); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -1685,7 +1721,7 @@ const startAgentTask = `-- name: StartAgentTask :one
|
||||
UPDATE agent_task_queue
|
||||
SET status = 'running', started_at = now()
|
||||
WHERE id = $1 AND status = 'dispatched'
|
||||
RETURNING id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, last_heartbeat_at, trigger_summary
|
||||
RETURNING id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, last_heartbeat_at, trigger_summary, force_fresh_session
|
||||
`
|
||||
|
||||
func (q *Queries) StartAgentTask(ctx context.Context, id pgtype.UUID) (AgentTaskQueue, error) {
|
||||
@@ -1716,6 +1752,7 @@ func (q *Queries) StartAgentTask(ctx context.Context, id pgtype.UUID) (AgentTask
|
||||
&i.FailureReason,
|
||||
&i.LastHeartbeatAt,
|
||||
&i.TriggerSummary,
|
||||
&i.ForceFreshSession,
|
||||
)
|
||||
return i, err
|
||||
}
|
||||
|
||||
@@ -204,7 +204,7 @@ const createAutopilotTask = `-- name: CreateAutopilotTask :one
|
||||
|
||||
INSERT INTO agent_task_queue (agent_id, runtime_id, issue_id, status, priority, autopilot_run_id, trigger_summary)
|
||||
VALUES ($1, $2, NULL, 'queued', $3, $4, $5)
|
||||
RETURNING id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, last_heartbeat_at, trigger_summary
|
||||
RETURNING id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, last_heartbeat_at, trigger_summary, force_fresh_session
|
||||
`
|
||||
|
||||
type CreateAutopilotTaskParams struct {
|
||||
@@ -252,6 +252,7 @@ func (q *Queries) CreateAutopilotTask(ctx context.Context, arg CreateAutopilotTa
|
||||
&i.FailureReason,
|
||||
&i.LastHeartbeatAt,
|
||||
&i.TriggerSummary,
|
||||
&i.ForceFreshSession,
|
||||
)
|
||||
return i, err
|
||||
}
|
||||
|
||||
@@ -99,7 +99,7 @@ func (q *Queries) CreateChatSession(ctx context.Context, arg CreateChatSessionPa
|
||||
const createChatTask = `-- name: CreateChatTask :one
|
||||
INSERT INTO agent_task_queue (agent_id, runtime_id, issue_id, status, priority, chat_session_id)
|
||||
VALUES ($1, $2, NULL, 'queued', $3, $4)
|
||||
RETURNING id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, last_heartbeat_at, trigger_summary
|
||||
RETURNING id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, last_heartbeat_at, trigger_summary, force_fresh_session
|
||||
`
|
||||
|
||||
type CreateChatTaskParams struct {
|
||||
@@ -142,6 +142,7 @@ func (q *Queries) CreateChatTask(ctx context.Context, arg CreateChatTaskParams)
|
||||
&i.FailureReason,
|
||||
&i.LastHeartbeatAt,
|
||||
&i.TriggerSummary,
|
||||
&i.ForceFreshSession,
|
||||
)
|
||||
return i, err
|
||||
}
|
||||
|
||||
@@ -67,30 +67,31 @@ type AgentSkill struct {
|
||||
}
|
||||
|
||||
type AgentTaskQueue struct {
|
||||
ID pgtype.UUID `json:"id"`
|
||||
AgentID pgtype.UUID `json:"agent_id"`
|
||||
IssueID pgtype.UUID `json:"issue_id"`
|
||||
Status string `json:"status"`
|
||||
Priority int32 `json:"priority"`
|
||||
DispatchedAt pgtype.Timestamptz `json:"dispatched_at"`
|
||||
StartedAt pgtype.Timestamptz `json:"started_at"`
|
||||
CompletedAt pgtype.Timestamptz `json:"completed_at"`
|
||||
Result []byte `json:"result"`
|
||||
Error pgtype.Text `json:"error"`
|
||||
CreatedAt pgtype.Timestamptz `json:"created_at"`
|
||||
Context []byte `json:"context"`
|
||||
RuntimeID pgtype.UUID `json:"runtime_id"`
|
||||
SessionID pgtype.Text `json:"session_id"`
|
||||
WorkDir pgtype.Text `json:"work_dir"`
|
||||
TriggerCommentID pgtype.UUID `json:"trigger_comment_id"`
|
||||
ChatSessionID pgtype.UUID `json:"chat_session_id"`
|
||||
AutopilotRunID pgtype.UUID `json:"autopilot_run_id"`
|
||||
Attempt int32 `json:"attempt"`
|
||||
MaxAttempts int32 `json:"max_attempts"`
|
||||
ParentTaskID pgtype.UUID `json:"parent_task_id"`
|
||||
FailureReason pgtype.Text `json:"failure_reason"`
|
||||
LastHeartbeatAt pgtype.Timestamptz `json:"last_heartbeat_at"`
|
||||
TriggerSummary pgtype.Text `json:"trigger_summary"`
|
||||
ID pgtype.UUID `json:"id"`
|
||||
AgentID pgtype.UUID `json:"agent_id"`
|
||||
IssueID pgtype.UUID `json:"issue_id"`
|
||||
Status string `json:"status"`
|
||||
Priority int32 `json:"priority"`
|
||||
DispatchedAt pgtype.Timestamptz `json:"dispatched_at"`
|
||||
StartedAt pgtype.Timestamptz `json:"started_at"`
|
||||
CompletedAt pgtype.Timestamptz `json:"completed_at"`
|
||||
Result []byte `json:"result"`
|
||||
Error pgtype.Text `json:"error"`
|
||||
CreatedAt pgtype.Timestamptz `json:"created_at"`
|
||||
Context []byte `json:"context"`
|
||||
RuntimeID pgtype.UUID `json:"runtime_id"`
|
||||
SessionID pgtype.Text `json:"session_id"`
|
||||
WorkDir pgtype.Text `json:"work_dir"`
|
||||
TriggerCommentID pgtype.UUID `json:"trigger_comment_id"`
|
||||
ChatSessionID pgtype.UUID `json:"chat_session_id"`
|
||||
AutopilotRunID pgtype.UUID `json:"autopilot_run_id"`
|
||||
Attempt int32 `json:"attempt"`
|
||||
MaxAttempts int32 `json:"max_attempts"`
|
||||
ParentTaskID pgtype.UUID `json:"parent_task_id"`
|
||||
FailureReason pgtype.Text `json:"failure_reason"`
|
||||
LastHeartbeatAt pgtype.Timestamptz `json:"last_heartbeat_at"`
|
||||
TriggerSummary pgtype.Text `json:"trigger_summary"`
|
||||
ForceFreshSession bool `json:"force_fresh_session"`
|
||||
}
|
||||
|
||||
type Attachment struct {
|
||||
|
||||
@@ -84,7 +84,7 @@ WHERE status IN ('dispatched', 'running')
|
||||
AND runtime_id IN (
|
||||
SELECT id FROM agent_runtime WHERE status = 'offline'
|
||||
)
|
||||
RETURNING id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, last_heartbeat_at, trigger_summary
|
||||
RETURNING id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, last_heartbeat_at, trigger_summary, force_fresh_session
|
||||
`
|
||||
|
||||
// Marks dispatched/running tasks as failed when their runtime is offline.
|
||||
@@ -123,6 +123,7 @@ func (q *Queries) FailTasksForOfflineRuntimes(ctx context.Context) ([]AgentTaskQ
|
||||
&i.FailureReason,
|
||||
&i.LastHeartbeatAt,
|
||||
&i.TriggerSummary,
|
||||
&i.ForceFreshSession,
|
||||
); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
@@ -65,8 +65,15 @@ WHERE agent_id = $1
|
||||
ORDER BY created_at DESC;
|
||||
|
||||
-- name: CreateAgentTask :one
|
||||
INSERT INTO agent_task_queue (agent_id, runtime_id, issue_id, status, priority, trigger_comment_id, trigger_summary)
|
||||
VALUES ($1, $2, $3, 'queued', $4, sqlc.narg(trigger_comment_id), sqlc.narg(trigger_summary))
|
||||
INSERT INTO agent_task_queue (
|
||||
agent_id, runtime_id, issue_id, status, priority, trigger_comment_id,
|
||||
trigger_summary, force_fresh_session
|
||||
)
|
||||
VALUES (
|
||||
$1, $2, $3, 'queued', $4, sqlc.narg(trigger_comment_id),
|
||||
sqlc.narg(trigger_summary),
|
||||
COALESCE(sqlc.narg('force_fresh_session')::boolean, FALSE)
|
||||
)
|
||||
RETURNING *;
|
||||
|
||||
-- name: CreateQuickCreateTask :one
|
||||
@@ -213,9 +220,17 @@ RETURNING *;
|
||||
-- UpdateAgentTaskSession. Without this, an auto-retry / manual rerun of a
|
||||
-- mid-run failure would silently start a fresh conversation and lose the
|
||||
-- in-flight context — exactly what MUL-1128's B branch is meant to fix.
|
||||
--
|
||||
-- Tasks that ended in a known "poisoned" terminal state are excluded so
|
||||
-- a rerun does not inherit the bad session. The daemon classifies these
|
||||
-- failures (iteration_limit, agent_fallback_message) when it detects the
|
||||
-- agent emitted a fallback marker instead of a real result.
|
||||
SELECT session_id, work_dir FROM agent_task_queue
|
||||
WHERE agent_id = $1 AND issue_id = $2
|
||||
AND status IN ('completed', 'failed')
|
||||
AND (
|
||||
status = 'completed'
|
||||
OR (status = 'failed' AND COALESCE(failure_reason, '') NOT IN ('iteration_limit', 'agent_fallback_message'))
|
||||
)
|
||||
AND session_id IS NOT NULL
|
||||
ORDER BY COALESCE(completed_at, started_at, dispatched_at, created_at) DESC
|
||||
LIMIT 1;
|
||||
|
||||
Reference in New Issue
Block a user