fix(tasks): cancel autopilot run_only & quick_create tasks (MUL-2827) (#3615)

CancelTaskByUser (POST /api/tasks/{taskId}/cancel) keyed cancellation off
issue_id / chat_session_id alone, so any task whose only source link was
autopilot_run_id (run_only autopilots) or quick_create context fell into the
dead else branch and 404'd with "task not found" — even though the task was
visible (and showed a cancel X) on the agent Activity tab.

Enforce tenancy uniformly through the task's owning agent instead: agent_id is
NOT NULL on every task row (ON DELETE CASCADE), and agents are workspace-scoped,
so GetAgentTaskInWorkspace (task JOIN agent ON workspace) is a single tenant
guard that works regardless of which optional source FK is set — including
orphan tasks whose autopilot_run_id was SET NULL after the autopilot was
deleted. Privacy layers on top: chat tasks stay creator-only, and every other
task mirrors the agent Activity / snapshot private-agent visibility gate via
canAccessPrivateAgent so the id-only endpoint is never more permissive than the
surface that exposes the task.

Tests cover run_only (same-ws success, cross-ws 404 no-mutation), quick_create,
retry clones, issue-task regression, chat non-creator 403, and private-agent
plain-member 403.

Co-authored-by: J <j@multica.ai>
Co-authored-by: multica-agent <github@multica.ai>
This commit is contained in:
Bohan Jiang
2026-06-01 22:11:27 +08:00
committed by GitHub
parent 0f9d9d1494
commit 674be86add
4 changed files with 437 additions and 12 deletions

View File

@@ -0,0 +1,335 @@
package handler
import (
"context"
"net/http"
"net/http/httptest"
"testing"
)
// CancelTaskByUser (POST /api/tasks/{taskId}/cancel) used to key cancellation
// off issue_id / chat_session_id alone, which 404'd every task whose only
// source link was autopilot_run_id or quick_create context (MUL-2827). These
// tests pin the new behavior: tenancy flows through the task's owning agent,
// with chat-creator privacy and the private-agent visibility gate layered on.
// taskStatus reads a task's current status straight from the DB so reject
// paths can assert "no side effect before the access check".
func taskStatus(t *testing.T, taskID string) string {
t.Helper()
var status string
if err := testPool.QueryRow(context.Background(),
`SELECT status FROM agent_task_queue WHERE id = $1`, taskID,
).Scan(&status); err != nil {
t.Fatalf("read task status: %v", err)
}
return status
}
// createAutopilotRunOnlyTask seeds the autopilot -> autopilot_run -> task chain
// that AutopilotService.dispatchRunOnly produces: a queued task with issue_id
// and chat_session_id NULL, linked only by autopilot_run_id. The autopilot is
// created in the agent's own workspace so the fixture works for foreign agents
// too.
func createAutopilotRunOnlyTask(t *testing.T, agentID string) string {
t.Helper()
ctx := context.Background()
var workspaceID, runtimeID string
if err := testPool.QueryRow(ctx,
`SELECT workspace_id, runtime_id FROM agent WHERE id = $1`, agentID,
).Scan(&workspaceID, &runtimeID); err != nil {
t.Fatalf("load agent workspace/runtime: %v", err)
}
var autopilotID string
if err := testPool.QueryRow(ctx, `
INSERT INTO autopilot (workspace_id, title, assignee_id, execution_mode, created_by_type, created_by_id)
VALUES ($1, 'cancel-runonly-ap', $2, 'run_only', 'member', $3)
RETURNING id
`, workspaceID, agentID, testUserID).Scan(&autopilotID); err != nil {
t.Fatalf("create autopilot: %v", err)
}
t.Cleanup(func() { testPool.Exec(context.Background(), `DELETE FROM autopilot WHERE id = $1`, autopilotID) })
var runID string
if err := testPool.QueryRow(ctx, `
INSERT INTO autopilot_run (autopilot_id, source, status)
VALUES ($1, 'manual', 'running')
RETURNING id
`, autopilotID).Scan(&runID); err != nil {
t.Fatalf("create autopilot_run: %v", err)
}
var taskID string
if err := testPool.QueryRow(ctx, `
INSERT INTO agent_task_queue (agent_id, runtime_id, status, priority, autopilot_run_id)
VALUES ($1, $2, 'queued', 0, $3)
RETURNING id
`, agentID, runtimeID, runID).Scan(&taskID); err != nil {
t.Fatalf("create run_only task: %v", err)
}
t.Cleanup(func() { testPool.Exec(context.Background(), `DELETE FROM agent_task_queue WHERE id = $1`, taskID) })
return taskID
}
// createForeignWorkspaceAgent stands up an isolated workspace + runtime + agent
// and returns the agent ID, for cross-tenant cancel tests.
func createForeignWorkspaceAgent(t *testing.T) string {
t.Helper()
ctx := context.Background()
var workspaceID string
if err := testPool.QueryRow(ctx, `
INSERT INTO workspace (name, slug, description, issue_prefix)
VALUES ('Foreign Cancel WS', 'foreign-cancel-ws', 'cross-tenant cancel test', 'FCW')
RETURNING id
`).Scan(&workspaceID); err != nil {
t.Fatalf("create foreign workspace: %v", err)
}
t.Cleanup(func() { testPool.Exec(context.Background(), `DELETE FROM workspace WHERE id = $1`, workspaceID) })
var runtimeID string
if err := testPool.QueryRow(ctx, `
INSERT INTO agent_runtime (workspace_id, daemon_id, name, runtime_mode, provider, status, device_info, metadata, last_seen_at)
VALUES ($1, NULL, 'Foreign Cancel Runtime', 'cloud', 'foreign_runtime', 'online', 'Foreign runtime', '{}'::jsonb, now())
RETURNING id
`, workspaceID).Scan(&runtimeID); err != nil {
t.Fatalf("create foreign runtime: %v", err)
}
var agentID string
if err := testPool.QueryRow(ctx, `
INSERT INTO agent (workspace_id, name, description, runtime_mode, runtime_config, runtime_id, visibility, max_concurrent_tasks)
VALUES ($1, 'Foreign Cancel Agent', '', 'cloud', '{}'::jsonb, $2, 'workspace', 1)
RETURNING id
`, workspaceID, runtimeID).Scan(&agentID); err != nil {
t.Fatalf("create foreign agent: %v", err)
}
return agentID
}
// createWorkspaceMemberUser adds a plain (non-owner/admin) member to the test
// workspace and returns the user ID. The member row cascades when the user is
// deleted (member.user_id ON DELETE CASCADE).
func createWorkspaceMemberUser(t *testing.T, name, email string) string {
t.Helper()
ctx := context.Background()
var userID string
if err := testPool.QueryRow(ctx,
`INSERT INTO "user" (name, email) VALUES ($1, $2) RETURNING id`, name, email,
).Scan(&userID); err != nil {
t.Fatalf("create user %s: %v", email, err)
}
t.Cleanup(func() { testPool.Exec(context.Background(), `DELETE FROM "user" WHERE id = $1`, userID) })
if _, err := testPool.Exec(ctx,
`INSERT INTO member (workspace_id, user_id, role) VALUES ($1, $2, 'member')`, testWorkspaceID, userID,
); err != nil {
t.Fatalf("add member %s: %v", email, err)
}
return userID
}
func cancelTaskByUserRequest(t *testing.T, userID, taskID string) *http.Request {
t.Helper()
req := newRequestAs(userID, "POST", "/api/tasks/"+taskID+"/cancel", nil)
req = withURLParam(req, "taskId", taskID)
return withChatTestWorkspaceCtx(t, req)
}
// TestCancelTaskByUser_RunOnlyAutopilot_Succeeds is the core MUL-2827 fix: a
// run_only autopilot task (issue_id + chat_session_id NULL, only
// autopilot_run_id set) is cancellable by a member of its agent's workspace.
func TestCancelTaskByUser_RunOnlyAutopilot_Succeeds(t *testing.T) {
if testHandler == nil {
t.Skip("database not available")
}
agentID := createHandlerTestAgent(t, "CancelRunOnlyAgent", []byte("[]"))
taskID := createAutopilotRunOnlyTask(t, agentID)
w := httptest.NewRecorder()
testHandler.CancelTaskByUser(w, cancelTaskByUserRequest(t, testUserID, taskID))
if w.Code != http.StatusOK {
t.Fatalf("expected 200, got %d: %s", w.Code, w.Body.String())
}
if got := taskStatus(t, taskID); got != "cancelled" {
t.Fatalf("task not cancelled: status = %q", got)
}
}
// TestCancelTaskByUser_RunOnlyAutopilot_CrossWorkspace_Returns404 verifies the
// tenant guard: a member of workspace A cannot cancel a run_only task whose
// agent lives in workspace B, and the task is not mutated before the check.
func TestCancelTaskByUser_RunOnlyAutopilot_CrossWorkspace_Returns404(t *testing.T) {
if testHandler == nil {
t.Skip("database not available")
}
foreignAgentID := createForeignWorkspaceAgent(t)
taskID := createAutopilotRunOnlyTask(t, foreignAgentID)
w := httptest.NewRecorder()
testHandler.CancelTaskByUser(w, cancelTaskByUserRequest(t, testUserID, taskID))
if w.Code != http.StatusNotFound {
t.Fatalf("expected 404, got %d: %s", w.Code, w.Body.String())
}
if got := taskStatus(t, taskID); got != "queued" {
t.Fatalf("foreign task was mutated: status = %q", got)
}
}
// TestCancelTaskByUser_QuickCreate_Succeeds verifies a quick_create task — no
// issue yet, no chat session, only context JSONB — is cancellable during its
// active window (the pre-issue-creation phase, i.e. whenever the user clicks X).
func TestCancelTaskByUser_QuickCreate_Succeeds(t *testing.T) {
if testHandler == nil {
t.Skip("database not available")
}
agentID := createHandlerTestAgent(t, "CancelQuickCreateAgent", []byte("[]"))
var taskID string
if err := testPool.QueryRow(context.Background(), `
INSERT INTO agent_task_queue (agent_id, runtime_id, status, priority, issue_id, context)
VALUES ($1, (SELECT runtime_id FROM agent WHERE id = $1), 'running', 0, NULL,
'{"type":"quick_create","workspace_id":"ws","prompt":"do a thing"}'::jsonb)
RETURNING id
`, agentID).Scan(&taskID); err != nil {
t.Fatalf("create quick_create task: %v", err)
}
t.Cleanup(func() { testPool.Exec(context.Background(), `DELETE FROM agent_task_queue WHERE id = $1`, taskID) })
w := httptest.NewRecorder()
testHandler.CancelTaskByUser(w, cancelTaskByUserRequest(t, testUserID, taskID))
if w.Code != http.StatusOK {
t.Fatalf("expected 200, got %d: %s", w.Code, w.Body.String())
}
if got := taskStatus(t, taskID); got != "cancelled" {
t.Fatalf("task not cancelled: status = %q", got)
}
}
// TestCancelTaskByUser_RetryClone_Autopilot_Succeeds verifies a retry clone of
// an autopilot task — which copies parent_task_id + autopilot_run_id verbatim,
// inheriting the NULL issue/chat links — is still cancellable.
func TestCancelTaskByUser_RetryClone_Autopilot_Succeeds(t *testing.T) {
if testHandler == nil {
t.Skip("database not available")
}
agentID := createHandlerTestAgent(t, "CancelRetryCloneAgent", []byte("[]"))
parentID := createAutopilotRunOnlyTask(t, agentID)
var cloneID string
if err := testPool.QueryRow(context.Background(), `
INSERT INTO agent_task_queue (agent_id, runtime_id, status, priority, autopilot_run_id, parent_task_id, attempt)
SELECT agent_id, runtime_id, 'queued', priority, autopilot_run_id, id, 1
FROM agent_task_queue WHERE id = $1
RETURNING id
`, parentID).Scan(&cloneID); err != nil {
t.Fatalf("create retry clone: %v", err)
}
t.Cleanup(func() { testPool.Exec(context.Background(), `DELETE FROM agent_task_queue WHERE id = $1`, cloneID) })
w := httptest.NewRecorder()
testHandler.CancelTaskByUser(w, cancelTaskByUserRequest(t, testUserID, cloneID))
if w.Code != http.StatusOK {
t.Fatalf("expected 200, got %d: %s", w.Code, w.Body.String())
}
if got := taskStatus(t, cloneID); got != "cancelled" {
t.Fatalf("clone not cancelled: status = %q", got)
}
}
// TestCancelTaskByUser_IssueTask_Succeeds is a regression guard: issue-bound
// tasks (the original supported case) stay cancellable after the rewrite.
func TestCancelTaskByUser_IssueTask_Succeeds(t *testing.T) {
if testHandler == nil {
t.Skip("database not available")
}
agentID := createHandlerTestAgent(t, "CancelIssueTaskAgent", []byte("[]"))
var issueID, taskID string
if err := testPool.QueryRow(context.Background(), `
INSERT INTO issue (workspace_id, title, status, priority, creator_id, creator_type, number, position)
VALUES ($1, 'cancel-byid-issue', 'todo', 'medium', $2, 'member', 92001, 0)
RETURNING id
`, testWorkspaceID, testUserID).Scan(&issueID); err != nil {
t.Fatalf("create issue: %v", err)
}
t.Cleanup(func() { testPool.Exec(context.Background(), `DELETE FROM issue WHERE id = $1`, issueID) })
if err := testPool.QueryRow(context.Background(), `
INSERT INTO agent_task_queue (agent_id, runtime_id, status, priority, issue_id)
VALUES ($1, (SELECT runtime_id FROM agent WHERE id = $1), 'queued', 0, $2)
RETURNING id
`, agentID, issueID).Scan(&taskID); err != nil {
t.Fatalf("create issue task: %v", err)
}
t.Cleanup(func() { testPool.Exec(context.Background(), `DELETE FROM agent_task_queue WHERE id = $1`, taskID) })
w := httptest.NewRecorder()
testHandler.CancelTaskByUser(w, cancelTaskByUserRequest(t, testUserID, taskID))
if w.Code != http.StatusOK {
t.Fatalf("expected 200, got %d: %s", w.Code, w.Body.String())
}
if got := taskStatus(t, taskID); got != "cancelled" {
t.Fatalf("task not cancelled: status = %q", got)
}
}
// TestCancelTaskByUser_ChatTask_NonCreator_Returns403 preserves chat privacy:
// a workspace member who did not start the conversation cannot cancel its task.
func TestCancelTaskByUser_ChatTask_NonCreator_Returns403(t *testing.T) {
if testHandler == nil {
t.Skip("database not available")
}
agentID := createHandlerTestAgent(t, "CancelChatAgent", []byte("[]"))
sessionID := createHandlerTestChatSession(t, agentID) // creator = testUserID
otherUserID := createWorkspaceMemberUser(t, "Chat Bystander", "cancel-chat-bystander@multica.test")
var taskID string
if err := testPool.QueryRow(context.Background(), `
INSERT INTO agent_task_queue (agent_id, runtime_id, status, priority, issue_id, chat_session_id)
VALUES ($1, (SELECT runtime_id FROM agent WHERE id = $1), 'running', 0, NULL, $2)
RETURNING id
`, agentID, sessionID).Scan(&taskID); err != nil {
t.Fatalf("create chat task: %v", err)
}
t.Cleanup(func() { testPool.Exec(context.Background(), `DELETE FROM agent_task_queue WHERE id = $1`, taskID) })
w := httptest.NewRecorder()
testHandler.CancelTaskByUser(w, cancelTaskByUserRequest(t, otherUserID, taskID))
if w.Code != http.StatusForbidden {
t.Fatalf("expected 403, got %d: %s", w.Code, w.Body.String())
}
if got := taskStatus(t, taskID); got != "running" {
t.Fatalf("chat task was mutated: status = %q", got)
}
}
// TestCancelTaskByUser_PrivateAgent_PlainMember_Returns403 verifies the cancel
// endpoint mirrors the agent Activity / snapshot visibility gate: a plain
// member who cannot see a private agent's tasks cannot cancel them either.
func TestCancelTaskByUser_PrivateAgent_PlainMember_Returns403(t *testing.T) {
if testHandler == nil {
t.Skip("database not available")
}
agentID, _, memberID := privateAgentTestFixture(t)
taskID := createAutopilotRunOnlyTask(t, agentID)
w := httptest.NewRecorder()
testHandler.CancelTaskByUser(w, cancelTaskByUserRequest(t, memberID, taskID))
if w.Code != http.StatusForbidden {
t.Fatalf("expected 403, got %d: %s", w.Code, w.Body.String())
}
if got := taskStatus(t, taskID); got != "queued" {
t.Fatalf("task was mutated: status = %q", got)
}
}

View File

@@ -684,32 +684,57 @@ func (h *Handler) GetPendingChatTask(w http.ResponseWriter, r *http.Request) {
// Task cancellation (user-facing, with ownership check)
// ---------------------------------------------------------------------------
// CancelTaskByUser cancels a task after verifying the requesting user owns
// the associated chat session or issue within the current workspace.
// CancelTaskByUser cancels a task the caller is allowed to act on within the
// current workspace.
//
// Tenancy is enforced uniformly through the task's owning agent: every
// agent_task_queue row carries a NOT NULL agent_id (ON DELETE CASCADE, so the
// agent always exists), and agents are workspace-scoped. GetAgentTaskInWorkspace
// is therefore the single tenant guard that works regardless of which optional
// source FK (issue / chat_session / autopilot_run) is set — which is what makes
// run_only autopilot tasks and quick_create tasks (whose issue does not exist
// yet) cancellable at all. Keying cancellation off issue_id / chat_session_id
// alone is exactly what 404'd these tasks before (MUL-2827).
//
// On top of tenancy, two privacy models layer on:
// - a chat task is private to the member who started the conversation, so
// only that creator may cancel it;
// - every other task surfaces on the agent Activity tab and the workspace
// task snapshot, both of which hide private agents from members without
// access. Cancellation mirrors that gate via canAccessPrivateAgent so the
// id-only endpoint is never more permissive than the surface that exposes
// the task.
func (h *Handler) CancelTaskByUser(w http.ResponseWriter, r *http.Request) {
userID, ok := requireUserID(w, r)
if !ok {
return
}
workspaceID := ctxWorkspaceID(r.Context())
wsUUID, ok := parseUUIDOrBadRequest(w, workspaceID, "workspace id")
if !ok {
return
}
taskID := chi.URLParam(r, "taskId")
taskUUID, ok := parseUUIDOrBadRequest(w, taskID, "task id")
if !ok {
return
}
task, err := h.Queries.GetAgentTask(r.Context(), taskUUID)
task, err := h.Queries.GetAgentTaskInWorkspace(r.Context(), db.GetAgentTaskInWorkspaceParams{
ID: taskUUID,
WorkspaceID: wsUUID,
})
if err != nil {
writeError(w, http.StatusNotFound, "task not found")
return
}
// Verify ownership: for chat tasks, check workspace + creator;
// for issue tasks, verify the issue belongs to the current workspace.
if task.ChatSessionID.Valid {
// Chat privacy: only the member who opened the conversation may
// cancel its task, even though the workspace is shared.
cs, err := h.Queries.GetChatSessionInWorkspace(r.Context(), db.GetChatSessionInWorkspaceParams{
ID: task.ChatSessionID,
WorkspaceID: parseUUID(workspaceID),
WorkspaceID: wsUUID,
})
if err != nil {
writeError(w, http.StatusNotFound, "task not found")
@@ -719,15 +744,23 @@ func (h *Handler) CancelTaskByUser(w http.ResponseWriter, r *http.Request) {
writeError(w, http.StatusForbidden, "not your task")
return
}
} else if task.IssueID.Valid {
issue, err := h.Queries.GetIssue(r.Context(), task.IssueID)
if err != nil || uuidToString(issue.WorkspaceID) != workspaceID {
} else {
// Issue / autopilot / quick_create tasks are all visible on the
// agent Activity tab + workspace snapshot, which gate private
// agents. Mirror that gate here.
agent, err := h.Queries.GetAgentInWorkspace(r.Context(), db.GetAgentInWorkspaceParams{
ID: task.AgentID,
WorkspaceID: wsUUID,
})
if err != nil {
writeError(w, http.StatusNotFound, "task not found")
return
}
} else {
writeError(w, http.StatusNotFound, "task not found")
return
actorType, actorID := h.resolveActor(r, userID, workspaceID)
if !h.canAccessPrivateAgent(r.Context(), agent, actorType, actorID, workspaceID) {
writeError(w, http.StatusForbidden, "you do not have access to this agent")
return
}
}
cancelled, err := h.TaskService.CancelTask(r.Context(), taskUUID)

View File

@@ -1337,6 +1337,51 @@ func (q *Queries) GetAgentTask(ctx context.Context, id pgtype.UUID) (AgentTaskQu
return i, err
}
const getAgentTaskInWorkspace = `-- name: GetAgentTaskInWorkspace :one
SELECT atq.id, atq.agent_id, atq.issue_id, atq.status, atq.priority, atq.dispatched_at, atq.started_at, atq.completed_at, atq.result, atq.error, atq.created_at, atq.context, atq.runtime_id, atq.session_id, atq.work_dir, atq.trigger_comment_id, atq.chat_session_id, atq.autopilot_run_id, atq.attempt, atq.max_attempts, atq.parent_task_id, atq.failure_reason, atq.trigger_summary, atq.force_fresh_session, atq.is_leader_task, atq.wait_reason FROM agent_task_queue atq
JOIN agent a ON a.id = atq.agent_id
WHERE atq.id = $1 AND a.workspace_id = $2
`
type GetAgentTaskInWorkspaceParams struct {
ID pgtype.UUID `json:"id"`
WorkspaceID pgtype.UUID `json:"workspace_id"`
}
func (q *Queries) GetAgentTaskInWorkspace(ctx context.Context, arg GetAgentTaskInWorkspaceParams) (AgentTaskQueue, error) {
row := q.db.QueryRow(ctx, getAgentTaskInWorkspace, arg.ID, arg.WorkspaceID)
var i AgentTaskQueue
err := row.Scan(
&i.ID,
&i.AgentID,
&i.IssueID,
&i.Status,
&i.Priority,
&i.DispatchedAt,
&i.StartedAt,
&i.CompletedAt,
&i.Result,
&i.Error,
&i.CreatedAt,
&i.Context,
&i.RuntimeID,
&i.SessionID,
&i.WorkDir,
&i.TriggerCommentID,
&i.ChatSessionID,
&i.AutopilotRunID,
&i.Attempt,
&i.MaxAttempts,
&i.ParentTaskID,
&i.FailureReason,
&i.TriggerSummary,
&i.ForceFreshSession,
&i.IsLeaderTask,
&i.WaitReason,
)
return i, err
}
const getLastTaskSession = `-- name: GetLastTaskSession :one
SELECT session_id, work_dir, runtime_id FROM agent_task_queue
WHERE agent_id = $1 AND issue_id = $2

View File

@@ -251,6 +251,18 @@ RETURNING *;
SELECT * FROM agent_task_queue
WHERE id = $1;
-- name: GetAgentTaskInWorkspace :one
-- Loads a task only when its owning agent lives in the given workspace.
-- agent_id is NOT NULL on every task row (and ON DELETE CASCADE, so the agent
-- always exists), which makes this the universal tenant guard for
-- user-initiated cancellation — independent of which optional source FK
-- (issue / chat_session / autopilot_run) happens to be set. It is what lets
-- run_only autopilot tasks and quick_create tasks (whose issue does not exist
-- yet) be cancelled at all, instead of 404-ing on a missing source FK.
SELECT atq.* FROM agent_task_queue atq
JOIN agent a ON a.id = atq.agent_id
WHERE atq.id = $1 AND a.workspace_id = $2;
-- name: ClaimAgentTask :one
-- Claims the next queued task for an agent, enforcing per-(issue, agent) serialization:
-- a task is only claimable when no other task for the same issue AND same agent is