mirror of
https://github.com/multica-ai/multica.git
synced 2026-06-27 17:47:43 +02:00
Compare commits
2 Commits
agent/lamb
...
agent/j/a2
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
45a379a36f | ||
|
|
d175a9cbb2 |
@@ -77,8 +77,9 @@ multica issue rerun <issue-id>
|
||||
|
||||
Behavior:
|
||||
|
||||
- Targets the issue's **current agent assignee** — not whoever ran the most recent task. If the assignee changed since the last run, rerun follows the current assignment. To rerun a specific agent that is no longer the assignee, reassign the issue first, then rerun.
|
||||
- **Cancels** the assignee's queued or running task on this issue (if any). Tasks owned by other agents on the same issue (e.g. parallel @-mention runs) are left alone.
|
||||
- By default, targets the issue's **current agent assignee** — useful when you want the rerun to follow the current assignment regardless of who ran the prior task.
|
||||
- The execution-log retry button on a specific row sends that row's task ID alongside, so the rerun targets **the agent that ran that exact task** — not the current assignee. This makes per-row retry meaningful for squad workers, parallel @-mention agents, or rows whose agent has since been displaced by a reassignment.
|
||||
- **Cancels** the target agent's queued or running task on this issue (if any). Tasks owned by other agents on the same issue (e.g. parallel @-mention runs) are left alone.
|
||||
- Creates a **brand-new** task — attempt count resets to 1, even if the original task hit the attempt ceiling.
|
||||
- Starts a **fresh agent session** — the prior session ID is **not** inherited. A manual rerun means you've judged the previous output bad, so resuming the same conversation would replay the same poisoned state. (Automatic retry, by contrast, does inherit the session — that path is for infrastructure failures, not bad output.)
|
||||
|
||||
@@ -89,7 +90,7 @@ Comparison:
|
||||
| Trigger | System, based on failure reason | You, manually |
|
||||
| Ceiling | 2 attempts | No limit |
|
||||
| Applicable sources | Issues, chat | Issues with an agent assignee |
|
||||
| Agent picked | Same agent as the failed task | Issue's current assignee |
|
||||
| Agent picked | Same agent as the failed task | Source task's agent (UI per-row retry) or issue's current assignee (CLI / no task_id) |
|
||||
| Session inheritance | Yes (resumes prior session) | No (fresh session) |
|
||||
|
||||
## How a failed task affects issue status
|
||||
|
||||
@@ -77,8 +77,9 @@ multica issue rerun <issue-id>
|
||||
|
||||
行为:
|
||||
|
||||
- 跑的是 issue **当前的智能体分配人**——不是上一次跑过的 agent。如果分配人在上次运行后改了,rerun 会跟着新的分配人走。要重跑一个已经不再是分配人的智能体,先把 issue 改派回它,再 rerun。
|
||||
- **取消**该分配人在这条 issue 上 queued / running 的任务(如果有)。同 issue 上其它 agent 的任务(例如 @-mention 触发的并行任务)不会被一起取消。
|
||||
- 默认跑的是 issue **当前的智能体分配人**——适用于希望 rerun 跟随当前分配人的场景。
|
||||
- 执行日志里某一行的 retry 按钮会把这一行的 task ID 一并发出,rerun 会**针对那一行原本的 agent**,而不是当前分配人。这让 squad worker、并行的 @-mention agent、或者已经被新分配人替代的旧任务行的 retry 按钮都能符合直觉地工作。
|
||||
- **取消**目标 agent 在这条 issue 上 queued / running 的任务(如果有)。同 issue 上其它 agent 的任务(例如 @-mention 触发的并行任务)不会被一起取消。
|
||||
- 创建一个**全新**的执行任务——尝试次数重置为 1,即使原任务已达最大尝试。
|
||||
- 启动**全新的智能体会话**——**不**继承之前的会话 ID。手动重跑意味着你已经判定上一次的产出不行,再继续之前的对话只会重放被污染的上下文。(自动重试则相反,会继承会话——那条路径处理的是基础设施层面的失败,不是产出不好。)
|
||||
|
||||
@@ -89,7 +90,7 @@ multica issue rerun <issue-id>
|
||||
| 触发 | 系统基于失败原因自动执行 | 你主动发起 |
|
||||
| 上限 | 2 次 | 无上限 |
|
||||
| 适用来源 | issue、聊天 | 有智能体分配人的 issue |
|
||||
| 跑哪个 agent | 失败任务原本的 agent | issue 当前的分配人 |
|
||||
| 跑哪个 agent | 失败任务原本的 agent | UI 单行 retry:那一行任务的 agent;CLI / 不带 task_id:issue 当前的分配人 |
|
||||
| 会话继承 | 是(接着上次会话) | 否(全新会话) |
|
||||
|
||||
## 失败的任务对 issue 状态有什么影响
|
||||
|
||||
@@ -1023,9 +1023,10 @@ export class ApiClient {
|
||||
});
|
||||
}
|
||||
|
||||
async rerunIssue(issueId: string): Promise<AgentTask> {
|
||||
async rerunIssue(issueId: string, taskId?: string): Promise<AgentTask> {
|
||||
return this.fetch(`/api/issues/${issueId}/rerun`, {
|
||||
method: "POST",
|
||||
body: JSON.stringify(taskId ? { task_id: taskId } : {}),
|
||||
});
|
||||
}
|
||||
|
||||
|
||||
@@ -347,17 +347,18 @@ function PastRow({ task, issueId }: { task: AgentTask; issueId: string }) {
|
||||
? failureReasonLabel[task.failure_reason as TaskFailureReason]
|
||||
: null;
|
||||
|
||||
// Retry only makes sense for terminal-but-not-success rows. The rerun
|
||||
// endpoint creates a fresh task on the issue's current agent assignee
|
||||
// (not necessarily this row's agent) — clicking retry on a row whose
|
||||
// agent has since been reassigned will rerun under the new assignee.
|
||||
// Retry only makes sense for terminal-but-not-success rows. Passing
|
||||
// task.id targets this specific row's agent — without it, the rerun
|
||||
// endpoint would fall back to the issue's current assignee and the
|
||||
// wrong agent would fire on rows whose agent has since been displaced
|
||||
// (e.g. reassignment, squad worker, or a one-off @-mention agent).
|
||||
const canRetry = task.status === "failed" || task.status === "cancelled";
|
||||
|
||||
const handleRetry = async () => {
|
||||
if (retrying) return;
|
||||
setRetrying(true);
|
||||
try {
|
||||
await api.rerunIssue(issueId);
|
||||
await api.rerunIssue(issueId, task.id);
|
||||
} catch (e) {
|
||||
toast.error(e instanceof Error ? e.message : t(($) => $.execution_log.retry_failed));
|
||||
} finally {
|
||||
|
||||
@@ -10,6 +10,7 @@ import (
|
||||
"github.com/multica-ai/multica/server/internal/realtime"
|
||||
"github.com/multica-ai/multica/server/internal/service"
|
||||
db "github.com/multica-ai/multica/server/pkg/db/generated"
|
||||
"github.com/multica-ai/multica/server/internal/util"
|
||||
)
|
||||
|
||||
// setupRerunTestFixture creates an issue assigned to the integration test
|
||||
@@ -31,9 +32,13 @@ func setupRerunTestFixture(t *testing.T) (string, string, string) {
|
||||
}
|
||||
|
||||
var issueID string
|
||||
// Pick the next per-workspace number to avoid colliding with the
|
||||
// uq_issue_workspace_number unique constraint when multiple fixtures
|
||||
// coexist in the same test (e.g. TestRerunIssueRejectsCrossIssueTask).
|
||||
if err := testPool.QueryRow(ctx, `
|
||||
INSERT INTO issue (workspace_id, title, status, priority, creator_type, creator_id, assignee_type, assignee_id)
|
||||
SELECT $1, 'Rerun test issue', 'todo', 'none', 'member', m.user_id, 'agent', $2
|
||||
INSERT INTO issue (workspace_id, title, status, priority, creator_type, creator_id, assignee_type, assignee_id, number)
|
||||
SELECT $1, 'Rerun test issue', 'todo', 'none', 'member', m.user_id, 'agent', $2,
|
||||
(SELECT COALESCE(MAX(number), 0) + 1 FROM issue WHERE workspace_id = $1)
|
||||
FROM member m WHERE m.workspace_id = $1 LIMIT 1
|
||||
RETURNING id
|
||||
`, testWorkspaceID, agentID).Scan(&issueID); err != nil {
|
||||
@@ -274,7 +279,7 @@ func TestRerunIssueSetsForceFreshSession(t *testing.T) {
|
||||
bus := events.New()
|
||||
taskService := service.NewTaskService(queries, nil, hub, bus)
|
||||
|
||||
task, err := taskService.RerunIssue(ctx, pgtype.UUID{Bytes: parseUUIDBytes(issueID), Valid: true}, pgtype.UUID{})
|
||||
task, err := taskService.RerunIssue(ctx, pgtype.UUID{Bytes: parseUUIDBytes(issueID), Valid: true}, pgtype.UUID{}, pgtype.UUID{})
|
||||
if err != nil {
|
||||
t.Fatalf("RerunIssue failed: %v", err)
|
||||
}
|
||||
@@ -286,6 +291,218 @@ func TestRerunIssueSetsForceFreshSession(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
// TestRerunIssueTargetsSourceTaskAgent asserts that when a source task ID is
|
||||
// supplied (the execution-log retry-button path), the rerun targets the agent
|
||||
// that ran that specific past task — not the issue's current assignee.
|
||||
// Without this, clicking retry on a row whose agent has since been displaced
|
||||
// (squad worker, @-mention agent, or a prior assignee) re-fires the new
|
||||
// assignee instead, which is the MUL-2457 bug.
|
||||
func TestRerunIssueTargetsSourceTaskAgent(t *testing.T) {
|
||||
if testPool == nil {
|
||||
t.Skip("no database connection")
|
||||
}
|
||||
|
||||
issueID, primaryAgentID, runtimeID := setupRerunTestFixture(t)
|
||||
t.Cleanup(func() { cleanupRerunFixture(t, issueID) })
|
||||
|
||||
ctx := context.Background()
|
||||
|
||||
// Create a second agent in the same workspace + runtime so we can stand
|
||||
// in as a "row whose agent is no longer the issue assignee" — e.g. a
|
||||
// squad worker or an @-mentioned agent. The issue's assignee is still
|
||||
// the primary agent; the rerun must target this secondary one because
|
||||
// that's whose task row the user clicked.
|
||||
var secondaryAgentID string
|
||||
if err := testPool.QueryRow(ctx, `
|
||||
INSERT INTO agent (
|
||||
workspace_id, name, description, runtime_mode, runtime_config,
|
||||
runtime_id, visibility, max_concurrent_tasks, owner_id
|
||||
)
|
||||
SELECT a.workspace_id, 'Rerun Secondary Agent', '', 'cloud', '{}'::jsonb,
|
||||
a.runtime_id, 'workspace', 1, a.owner_id
|
||||
FROM agent a WHERE a.id = $1
|
||||
RETURNING id
|
||||
`, primaryAgentID).Scan(&secondaryAgentID); err != nil {
|
||||
t.Fatalf("create secondary agent: %v", err)
|
||||
}
|
||||
t.Cleanup(func() {
|
||||
testPool.Exec(ctx, `DELETE FROM agent_task_queue WHERE agent_id = $1`, secondaryAgentID)
|
||||
testPool.Exec(ctx, `DELETE FROM agent WHERE id = $1`, secondaryAgentID)
|
||||
})
|
||||
|
||||
// Insert a failed past task on this issue under the secondary agent —
|
||||
// the row the user is about to click retry on.
|
||||
var sourceTaskID string
|
||||
if err := testPool.QueryRow(ctx, `
|
||||
INSERT INTO agent_task_queue (agent_id, runtime_id, issue_id, status, priority,
|
||||
started_at, completed_at, failure_reason)
|
||||
VALUES ($1, $2, $3, 'failed', 0,
|
||||
now() - interval '1 minute', now() - interval '30 seconds', 'agent_error')
|
||||
RETURNING id
|
||||
`, secondaryAgentID, runtimeID, issueID).Scan(&sourceTaskID); err != nil {
|
||||
t.Fatalf("insert source task: %v", err)
|
||||
}
|
||||
|
||||
queries := db.New(testPool)
|
||||
hub := realtime.NewHub()
|
||||
go hub.Run()
|
||||
bus := events.New()
|
||||
taskService := service.NewTaskService(queries, nil, hub, bus)
|
||||
|
||||
task, err := taskService.RerunIssue(
|
||||
ctx,
|
||||
pgtype.UUID{Bytes: parseUUIDBytes(issueID), Valid: true},
|
||||
pgtype.UUID{Bytes: parseUUIDBytes(sourceTaskID), Valid: true},
|
||||
pgtype.UUID{},
|
||||
)
|
||||
if err != nil {
|
||||
t.Fatalf("RerunIssue failed: %v", err)
|
||||
}
|
||||
if task == nil {
|
||||
t.Fatal("RerunIssue returned nil task")
|
||||
}
|
||||
|
||||
gotAgent := util.UUIDToString(task.AgentID)
|
||||
if gotAgent != secondaryAgentID {
|
||||
t.Fatalf("rerun targeted wrong agent: got %s, want %s (issue assignee is %s — must not be picked)",
|
||||
gotAgent, secondaryAgentID, primaryAgentID)
|
||||
}
|
||||
if !task.ForceFreshSession {
|
||||
t.Fatal("expected per-row rerun to also set force_fresh_session=true")
|
||||
}
|
||||
}
|
||||
|
||||
// TestRerunIssueRejectsCrossIssueTask asserts a source task whose IssueID
|
||||
// does not match the rerun target is rejected — both as defense-in-depth
|
||||
// against malicious requests and because picking up an unrelated task's
|
||||
// agent would silently misroute the rerun.
|
||||
func TestRerunIssueRejectsCrossIssueTask(t *testing.T) {
|
||||
if testPool == nil {
|
||||
t.Skip("no database connection")
|
||||
}
|
||||
|
||||
issueAID, agentID, runtimeID := setupRerunTestFixture(t)
|
||||
t.Cleanup(func() { cleanupRerunFixture(t, issueAID) })
|
||||
|
||||
ctx := context.Background()
|
||||
|
||||
// Second issue in the same workspace, with a task that does NOT belong
|
||||
// to issue A. The handler must reject this. Take the next available
|
||||
// per-workspace number so the uq_issue_workspace_number constraint
|
||||
// (both issues default to number=0 otherwise) doesn't fire before the
|
||||
// rerun assertion can.
|
||||
var issueBID string
|
||||
if err := testPool.QueryRow(ctx, `
|
||||
INSERT INTO issue (workspace_id, title, status, priority, creator_type, creator_id, assignee_type, assignee_id, number)
|
||||
SELECT $1, 'Rerun cross-issue test', 'todo', 'none', 'member', m.user_id, 'agent', $2,
|
||||
(SELECT COALESCE(MAX(number), 0) + 1 FROM issue WHERE workspace_id = $1)
|
||||
FROM member m WHERE m.workspace_id = $1 LIMIT 1
|
||||
RETURNING id
|
||||
`, testWorkspaceID, agentID).Scan(&issueBID); err != nil {
|
||||
t.Fatalf("create second issue: %v", err)
|
||||
}
|
||||
t.Cleanup(func() { cleanupRerunFixture(t, issueBID) })
|
||||
|
||||
var crossTaskID string
|
||||
if err := testPool.QueryRow(ctx, `
|
||||
INSERT INTO agent_task_queue (agent_id, runtime_id, issue_id, status, priority,
|
||||
started_at, completed_at, failure_reason)
|
||||
VALUES ($1, $2, $3, 'failed', 0,
|
||||
now() - interval '1 minute', now() - interval '30 seconds', 'agent_error')
|
||||
RETURNING id
|
||||
`, agentID, runtimeID, issueBID).Scan(&crossTaskID); err != nil {
|
||||
t.Fatalf("insert cross task: %v", err)
|
||||
}
|
||||
|
||||
queries := db.New(testPool)
|
||||
hub := realtime.NewHub()
|
||||
go hub.Run()
|
||||
bus := events.New()
|
||||
taskService := service.NewTaskService(queries, nil, hub, bus)
|
||||
|
||||
_, err := taskService.RerunIssue(
|
||||
ctx,
|
||||
pgtype.UUID{Bytes: parseUUIDBytes(issueAID), Valid: true},
|
||||
pgtype.UUID{Bytes: parseUUIDBytes(crossTaskID), Valid: true},
|
||||
pgtype.UUID{},
|
||||
)
|
||||
if err == nil {
|
||||
t.Fatal("expected RerunIssue to reject a source task from a different issue")
|
||||
}
|
||||
}
|
||||
|
||||
// TestRerunIssueInheritsTriggerCommentFromSourceTask locks the trigger
|
||||
// provenance contract: a per-row rerun of a comment- or mention-triggered
|
||||
// task must carry the original trigger_comment_id through to the new task.
|
||||
// Otherwise the daemon's buildCommentPrompt path (which keys on
|
||||
// TriggerCommentID) is skipped and the rerun degrades into a generic
|
||||
// issue run that has lost the original comment context — see MUL-2457
|
||||
// review feedback.
|
||||
func TestRerunIssueInheritsTriggerCommentFromSourceTask(t *testing.T) {
|
||||
if testPool == nil {
|
||||
t.Skip("no database connection")
|
||||
}
|
||||
|
||||
issueID, agentID, runtimeID := setupRerunTestFixture(t)
|
||||
t.Cleanup(func() { cleanupRerunFixture(t, issueID) })
|
||||
|
||||
ctx := context.Background()
|
||||
|
||||
// Create a comment to stand in as the original mention / reply trigger.
|
||||
var triggerCommentID string
|
||||
if err := testPool.QueryRow(ctx, `
|
||||
INSERT INTO comment (issue_id, workspace_id, author_type, author_id, content, type)
|
||||
SELECT $1, $2, 'member', m.user_id, 'please retry this', 'comment'
|
||||
FROM member m WHERE m.workspace_id = $2 LIMIT 1
|
||||
RETURNING id
|
||||
`, issueID, testWorkspaceID).Scan(&triggerCommentID); err != nil {
|
||||
t.Fatalf("insert trigger comment: %v", err)
|
||||
}
|
||||
t.Cleanup(func() {
|
||||
testPool.Exec(ctx, `DELETE FROM comment WHERE id = $1`, triggerCommentID)
|
||||
})
|
||||
|
||||
// Source task carries the trigger_comment_id — this is the row whose
|
||||
// retry button the user clicks in the execution log.
|
||||
var sourceTaskID string
|
||||
if err := testPool.QueryRow(ctx, `
|
||||
INSERT INTO agent_task_queue (agent_id, runtime_id, issue_id, status, priority,
|
||||
started_at, completed_at, failure_reason,
|
||||
trigger_comment_id)
|
||||
VALUES ($1, $2, $3, 'failed', 0,
|
||||
now() - interval '1 minute', now() - interval '30 seconds', 'agent_error',
|
||||
$4)
|
||||
RETURNING id
|
||||
`, agentID, runtimeID, issueID, triggerCommentID).Scan(&sourceTaskID); err != nil {
|
||||
t.Fatalf("insert source task: %v", err)
|
||||
}
|
||||
|
||||
queries := db.New(testPool)
|
||||
hub := realtime.NewHub()
|
||||
go hub.Run()
|
||||
bus := events.New()
|
||||
taskService := service.NewTaskService(queries, nil, hub, bus)
|
||||
|
||||
task, err := taskService.RerunIssue(
|
||||
ctx,
|
||||
pgtype.UUID{Bytes: parseUUIDBytes(issueID), Valid: true},
|
||||
pgtype.UUID{Bytes: parseUUIDBytes(sourceTaskID), Valid: true},
|
||||
pgtype.UUID{},
|
||||
)
|
||||
if err != nil {
|
||||
t.Fatalf("RerunIssue failed: %v", err)
|
||||
}
|
||||
if task == nil {
|
||||
t.Fatal("RerunIssue returned nil task")
|
||||
}
|
||||
if !task.TriggerCommentID.Valid {
|
||||
t.Fatal("expected per-row rerun to inherit trigger_comment_id from source task, got NULL")
|
||||
}
|
||||
if got := util.UUIDToString(task.TriggerCommentID); got != triggerCommentID {
|
||||
t.Fatalf("trigger_comment_id mismatch: got %s, want %s", got, triggerCommentID)
|
||||
}
|
||||
}
|
||||
|
||||
// TestEnqueueTaskForIssueDoesNotForceFreshSession is the negative control
|
||||
// for the rerun flag: the normal enqueue path must leave the flag false so
|
||||
// auto-retry / comment-triggered tasks keep resuming the prior session
|
||||
|
||||
@@ -2,6 +2,7 @@ package handler
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"io"
|
||||
"log/slog"
|
||||
"net/http"
|
||||
|
||||
@@ -94,14 +95,28 @@ func (h *Handler) PinTaskSession(w http.ResponseWriter, r *http.Request) {
|
||||
w.WriteHeader(http.StatusNoContent)
|
||||
}
|
||||
|
||||
// RerunIssue manually re-enqueues the issue's current agent assignment as a
|
||||
// fresh task. Useful when an issue is stuck or the user wants to retry a
|
||||
// failed run. The new task is flagged force_fresh_session=true: the daemon
|
||||
// claim handler skips the (agent_id, issue_id) session-resume lookup so the
|
||||
// agent starts a clean session. A user clicking rerun has just judged the
|
||||
// prior output bad — replaying the same conversation would replay the same
|
||||
// poisoned state. (Automatic retry, by contrast, intentionally inherits the
|
||||
// session — that path handles infrastructure failures, not bad output.)
|
||||
// RerunIssueRequest is the optional body of POST /api/issues/{id}/rerun.
|
||||
// All fields are optional; an empty body keeps the legacy "rerun the issue's
|
||||
// current assignee" behaviour used by the CLI.
|
||||
type RerunIssueRequest struct {
|
||||
// TaskID identifies the execution-log row the user clicked retry on.
|
||||
// When set, the rerun targets the agent that ran that specific task
|
||||
// (and reuses its leader/worker role) rather than the issue's current
|
||||
// assignee — so clicking retry on row that belonged to a now-displaced
|
||||
// agent re-fires that same agent, not the new assignee.
|
||||
TaskID string `json:"task_id,omitempty"`
|
||||
}
|
||||
|
||||
// RerunIssue manually re-enqueues an agent run for the issue. By default it
|
||||
// targets the issue's current assignee (agent or squad leader); if the
|
||||
// request body carries task_id, the rerun targets the agent that ran that
|
||||
// specific past task instead. The new task is flagged force_fresh_session=true:
|
||||
// the daemon claim handler skips the (agent_id, issue_id) session-resume
|
||||
// lookup so the agent starts a clean session. A user clicking rerun has just
|
||||
// judged the prior output bad — replaying the same conversation would replay
|
||||
// the same poisoned state. (Automatic retry, by contrast, intentionally
|
||||
// inherits the session — that path handles infrastructure failures, not bad
|
||||
// output.)
|
||||
func (h *Handler) RerunIssue(w http.ResponseWriter, r *http.Request) {
|
||||
id := chi.URLParam(r, "id")
|
||||
issue, ok := h.loadIssueForUser(w, r, id)
|
||||
@@ -109,7 +124,26 @@ func (h *Handler) RerunIssue(w http.ResponseWriter, r *http.Request) {
|
||||
return
|
||||
}
|
||||
|
||||
task, err := h.TaskService.RerunIssue(r.Context(), issue.ID, pgtype.UUID{})
|
||||
// Body is optional. A zero-length body or `{}` keeps the legacy
|
||||
// assignee-driven rerun behaviour the CLI relies on.
|
||||
var req RerunIssueRequest
|
||||
if r.ContentLength != 0 {
|
||||
if err := json.NewDecoder(r.Body).Decode(&req); err != nil && err != io.EOF {
|
||||
writeError(w, http.StatusBadRequest, "invalid request body")
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
var sourceTaskID pgtype.UUID
|
||||
if req.TaskID != "" {
|
||||
parsed, ok := parseUUIDOrBadRequest(w, req.TaskID, "task_id")
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
sourceTaskID = parsed
|
||||
}
|
||||
|
||||
task, err := h.TaskService.RerunIssue(r.Context(), issue.ID, sourceTaskID, pgtype.UUID{})
|
||||
if err != nil {
|
||||
slog.Warn("issue rerun failed", "issue_id", id, "error", err)
|
||||
writeError(w, http.StatusBadRequest, err.Error())
|
||||
|
||||
@@ -441,7 +441,7 @@ func (s *TaskService) enqueueIssueTask(ctx context.Context, issue db.Issue, trig
|
||||
// Unlike EnqueueTaskForIssue, this takes an explicit agent ID rather than
|
||||
// deriving it from the issue assignee.
|
||||
func (s *TaskService) EnqueueTaskForMention(ctx context.Context, issue db.Issue, agentID pgtype.UUID, triggerCommentID pgtype.UUID) (db.AgentTaskQueue, error) {
|
||||
return s.enqueueMentionTask(ctx, issue, agentID, triggerCommentID, false)
|
||||
return s.enqueueMentionTask(ctx, issue, agentID, triggerCommentID, false, false)
|
||||
}
|
||||
|
||||
// EnqueueTaskForSquadLeader is the leader-role variant of EnqueueTaskForMention.
|
||||
@@ -451,10 +451,10 @@ func (s *TaskService) EnqueueTaskForMention(ctx context.Context, issue db.Issue,
|
||||
// as a worker (do not skip). This matters for agents that are simultaneously
|
||||
// the leader and a worker of the same squad — see migration 090.
|
||||
func (s *TaskService) EnqueueTaskForSquadLeader(ctx context.Context, issue db.Issue, leaderID pgtype.UUID, triggerCommentID pgtype.UUID) (db.AgentTaskQueue, error) {
|
||||
return s.enqueueMentionTask(ctx, issue, leaderID, triggerCommentID, true)
|
||||
return s.enqueueMentionTask(ctx, issue, leaderID, triggerCommentID, true, false)
|
||||
}
|
||||
|
||||
func (s *TaskService) enqueueMentionTask(ctx context.Context, issue db.Issue, agentID pgtype.UUID, triggerCommentID pgtype.UUID, isLeader bool) (db.AgentTaskQueue, error) {
|
||||
func (s *TaskService) enqueueMentionTask(ctx context.Context, issue db.Issue, agentID pgtype.UUID, triggerCommentID pgtype.UUID, isLeader bool, forceFreshSession bool) (db.AgentTaskQueue, error) {
|
||||
agent, err := s.Queries.GetAgent(ctx, agentID)
|
||||
if err != nil {
|
||||
slog.Error("mention task enqueue failed: agent not found", "issue_id", util.UUIDToString(issue.ID), "agent_id", util.UUIDToString(agentID), "error", err)
|
||||
@@ -470,13 +470,14 @@ func (s *TaskService) enqueueMentionTask(ctx context.Context, issue db.Issue, ag
|
||||
}
|
||||
|
||||
task, err := s.Queries.CreateAgentTask(ctx, db.CreateAgentTaskParams{
|
||||
AgentID: agentID,
|
||||
RuntimeID: agent.RuntimeID,
|
||||
IssueID: issue.ID,
|
||||
Priority: priorityToInt(issue.Priority),
|
||||
TriggerCommentID: triggerCommentID,
|
||||
TriggerSummary: s.buildCommentTriggerSummary(ctx, triggerCommentID),
|
||||
IsLeaderTask: pgtype.Bool{Bool: isLeader, Valid: isLeader},
|
||||
AgentID: agentID,
|
||||
RuntimeID: agent.RuntimeID,
|
||||
IssueID: issue.ID,
|
||||
Priority: priorityToInt(issue.Priority),
|
||||
TriggerCommentID: triggerCommentID,
|
||||
TriggerSummary: s.buildCommentTriggerSummary(ctx, triggerCommentID),
|
||||
IsLeaderTask: pgtype.Bool{Bool: isLeader, Valid: isLeader},
|
||||
ForceFreshSession: pgtype.Bool{Bool: forceFreshSession, Valid: forceFreshSession},
|
||||
})
|
||||
if err != nil {
|
||||
slog.Error("mention task enqueue failed", "issue_id", util.UUIDToString(issue.ID), "agent_id", util.UUIDToString(agentID), "error", err)
|
||||
@@ -1301,8 +1302,22 @@ func (s *TaskService) MaybeRetryFailedTask(ctx context.Context, parent db.AgentT
|
||||
return &child, nil
|
||||
}
|
||||
|
||||
// RerunIssue creates a fresh queued task for the agent currently assigned
|
||||
// to the issue. Used by the manual rerun endpoint.
|
||||
// RerunIssue creates a fresh queued task for an agent on the issue. Used by
|
||||
// the manual rerun endpoint.
|
||||
//
|
||||
// Target agent resolution:
|
||||
// - sourceTaskID Valid: rerun the agent that ran that task (and reuse its
|
||||
// leader/worker role). This is what the execution log retry button uses
|
||||
// so a per-row retry survives a subsequent assignee change and correctly
|
||||
// re-fires the squad worker or mention agent whose row was clicked. The
|
||||
// source task's trigger_comment_id is also inherited (when the caller
|
||||
// didn't pass one) so a per-row rerun of a comment- or mention-triggered
|
||||
// task stays comment-triggered — the daemon's buildCommentPrompt path
|
||||
// keys on TriggerCommentID, and losing it would degrade the rerun into
|
||||
// a generic issue run that no longer carries the original comment.
|
||||
// - sourceTaskID empty: fall back to the issue's current assignee (agent
|
||||
// or squad leader). This preserves the CLI / API contract for callers
|
||||
// that have an issue ID but no specific task to target.
|
||||
//
|
||||
// The new task is flagged force_fresh_session=true so the daemon starts a
|
||||
// clean agent session instead of resuming the prior (agent_id, issue_id)
|
||||
@@ -1312,29 +1327,54 @@ func (s *TaskService) MaybeRetryFailedTask(ctx context.Context, parent db.AgentT
|
||||
// MaybeRetryFailedTask → CreateRetryTask) does NOT take this path, so
|
||||
// MUL-1128's mid-flight resume contract is preserved.
|
||||
//
|
||||
// Only tasks belonging to the issue's current assignee are cancelled.
|
||||
// Only tasks belonging to the target agent on this issue are cancelled.
|
||||
// Tasks owned by other agents on the same issue (e.g. a parallel
|
||||
// @-mention agent) are left alone — rerun must not collateral-cancel
|
||||
// them.
|
||||
func (s *TaskService) RerunIssue(ctx context.Context, issueID pgtype.UUID, triggerCommentID pgtype.UUID) (*db.AgentTaskQueue, error) {
|
||||
func (s *TaskService) RerunIssue(ctx context.Context, issueID pgtype.UUID, sourceTaskID pgtype.UUID, triggerCommentID pgtype.UUID) (*db.AgentTaskQueue, error) {
|
||||
issue, err := s.Queries.GetIssue(ctx, issueID)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("load issue: %w", err)
|
||||
}
|
||||
|
||||
// Determine the target agent for the rerun.
|
||||
var agentID pgtype.UUID
|
||||
switch {
|
||||
case issue.AssigneeType.String == "agent" && issue.AssigneeID.Valid:
|
||||
agentID = issue.AssigneeID
|
||||
case issue.AssigneeType.String == "squad" && issue.AssigneeID.Valid:
|
||||
squad, err := s.Queries.GetSquad(ctx, issue.AssigneeID)
|
||||
var (
|
||||
agentID pgtype.UUID
|
||||
isLeader bool
|
||||
)
|
||||
if sourceTaskID.Valid {
|
||||
sourceTask, err := s.Queries.GetAgentTask(ctx, sourceTaskID)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("issue is assigned to a squad but squad not found")
|
||||
return nil, fmt.Errorf("load source task: %w", err)
|
||||
}
|
||||
if !sourceTask.IssueID.Valid || util.UUIDToString(sourceTask.IssueID) != util.UUIDToString(issueID) {
|
||||
return nil, fmt.Errorf("source task does not belong to this issue")
|
||||
}
|
||||
agentID = sourceTask.AgentID
|
||||
isLeader = sourceTask.IsLeaderTask
|
||||
// Inherit trigger provenance so a per-row rerun of a comment- or
|
||||
// mention-triggered task stays a comment-triggered task. Without
|
||||
// this the daemon's buildCommentPrompt path is skipped (it keys on
|
||||
// TriggerCommentID) and the rerun degrades into a generic issue
|
||||
// run that has lost the original comment context. Only override
|
||||
// when the caller didn't pass one explicitly.
|
||||
if !triggerCommentID.Valid && sourceTask.TriggerCommentID.Valid {
|
||||
triggerCommentID = sourceTask.TriggerCommentID
|
||||
}
|
||||
} else {
|
||||
switch {
|
||||
case issue.AssigneeType.String == "agent" && issue.AssigneeID.Valid:
|
||||
agentID = issue.AssigneeID
|
||||
case issue.AssigneeType.String == "squad" && issue.AssigneeID.Valid:
|
||||
squad, err := s.Queries.GetSquad(ctx, issue.AssigneeID)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("issue is assigned to a squad but squad not found")
|
||||
}
|
||||
agentID = squad.LeaderID
|
||||
isLeader = true
|
||||
default:
|
||||
return nil, fmt.Errorf("issue is not assigned to an agent or squad")
|
||||
}
|
||||
agentID = squad.LeaderID
|
||||
default:
|
||||
return nil, fmt.Errorf("issue is not assigned to an agent or squad")
|
||||
}
|
||||
|
||||
// Cancel only the target agent's active/queued tasks on this issue.
|
||||
@@ -1345,7 +1385,7 @@ func (s *TaskService) RerunIssue(ctx context.Context, issueID pgtype.UUID, trigg
|
||||
if err != nil {
|
||||
slog.Warn("rerun: cancel prior tasks failed",
|
||||
"issue_id", util.UUIDToString(issueID),
|
||||
"agent_id", util.UUIDToString(issue.AssigneeID),
|
||||
"agent_id", util.UUIDToString(agentID),
|
||||
"error", err,
|
||||
)
|
||||
}
|
||||
@@ -1355,7 +1395,7 @@ func (s *TaskService) RerunIssue(ctx context.Context, issueID pgtype.UUID, trigg
|
||||
s.broadcastTaskEvent(ctx, protocol.EventTaskCancelled, t)
|
||||
}
|
||||
|
||||
task, err := s.enqueueRerunTask(ctx, issue, agentID, triggerCommentID)
|
||||
task, err := s.enqueueRerunTask(ctx, issue, agentID, triggerCommentID, isLeader)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -1363,20 +1403,25 @@ func (s *TaskService) RerunIssue(ctx context.Context, issueID pgtype.UUID, trigg
|
||||
"task_id", util.UUIDToString(task.ID),
|
||||
"issue_id", util.UUIDToString(issueID),
|
||||
"agent_id", util.UUIDToString(agentID),
|
||||
"source_task_id", util.UUIDToString(sourceTaskID),
|
||||
"is_leader", isLeader,
|
||||
"cancelled_prior", len(cancelled),
|
||||
)
|
||||
return &task, nil
|
||||
}
|
||||
|
||||
// enqueueRerunTask enqueues a fresh task for the given agent on the issue.
|
||||
// For agent-assigned issues it uses enqueueIssueTask (which reads AssigneeID);
|
||||
// for squad-assigned issues the rerun targets the squad leader and is flagged
|
||||
// as a leader task so the self-trigger guard treats it correctly.
|
||||
func (s *TaskService) enqueueRerunTask(ctx context.Context, issue db.Issue, agentID pgtype.UUID, triggerCommentID pgtype.UUID) (db.AgentTaskQueue, error) {
|
||||
if issue.AssigneeType.String == "agent" {
|
||||
// When the target agent is the issue's single-agent assignee we use the
|
||||
// assignee-driven path (enqueueIssueTask) so the issue-assignee bookkeeping
|
||||
// stays in sync; otherwise (squad member, prior assignee that has since been
|
||||
// reassigned, mention agent) we use the mention path with the same
|
||||
// force_fresh_session=true contract.
|
||||
func (s *TaskService) enqueueRerunTask(ctx context.Context, issue db.Issue, agentID pgtype.UUID, triggerCommentID pgtype.UUID, isLeader bool) (db.AgentTaskQueue, error) {
|
||||
if issue.AssigneeType.String == "agent" && issue.AssigneeID.Valid &&
|
||||
util.UUIDToString(issue.AssigneeID) == util.UUIDToString(agentID) {
|
||||
return s.enqueueIssueTask(ctx, issue, triggerCommentID, true)
|
||||
}
|
||||
return s.EnqueueTaskForSquadLeader(ctx, issue, agentID, triggerCommentID)
|
||||
return s.enqueueMentionTask(ctx, issue, agentID, triggerCommentID, isLeader, true)
|
||||
}
|
||||
|
||||
// HandleFailedTasks runs the post-failure side effects for a batch of
|
||||
|
||||
Reference in New Issue
Block a user