Files
multica/server/internal/service/task.go
Tom Qiao 1c91c2a3b2 security(db): scope DELETE/UpdateIssueStatus by workspace_id (defense-in-depth) (#3027)
* fix(security): scope DELETE/UpdateIssueStatus by workspace_id

Add workspace_id to the WHERE clause of DeleteIssue, DeleteComment,
DeleteProject, DeleteSkill, DeleteChatSession, and UpdateIssueStatus
as SQL-layer defense-in-depth.

Handler loaders (loadIssueForUser / loadSkillForUser / etc.) already
enforce workspace membership today, so this is not patching a known
live vuln. But the tenant invariant is currently a handler-layer
guarantee — a future loader bypass or a new caller skipping the
loader would be silently catastrophic. Making workspace_id part of
the SQL identity collapses the trust surface to the schema itself:
forging a sibling-workspace UUID becomes ErrNoRows instead of a
cross-tenant write.

Reference: incident #1661 (util.ParseUUID silent zero UUID returning
204 on a DELETE that matched zero rows) — same class of failure,
prevented at a different layer.

Scope:
- 5 DELETE queries: issue, comment, project, skill, chat_session
- 1 simple UPDATE: UpdateIssueStatus (2 narg, no SET ordering risk)
- All callers updated (handlers, service, runtime sweeper fallback)

Multi-narg UPDATE queries (UpdateIssue, UpdateProject, UpdateSkill,
UpdateComment, UpdateChatSession*) are deferred to a follow-up to
keep this change reviewable: each needs its narg pinning shifted
and per-caller verification.

sqlc was regenerated by hand (no local sqlc toolchain); CI's
backend job is the authoritative compile check.

* test(security): add workspace_scope_guard regression test

Locks in the SQL-layer tenant guard added in this PR. For each of the 6
scoped queries (DeleteIssue, DeleteComment, DeleteProject, DeleteSkill,
DeleteChatSession, UpdateIssueStatus), creates the resource in workspace
A, invokes the query with a foreign workspace UUID, and asserts the row
is untouched (0 rows affected with no error for :exec; pgx.ErrNoRows for
:one). A future refactor that drops the workspace_id arg from any of
these queries will now fail loudly instead of silently regressing.

Includes a sanity sub-test that the in-workspace path still mutates, so
a buggy guard that returns no-op for every call would not pass.

Co-Authored-By: Claude Opus 4 <noreply@anthropic.com>

---------

Co-authored-by: Tom Qiao <tomqiaozc@users.noreply.github.com>
Co-authored-by: Claude Opus 4 <noreply@anthropic.com>
2026-05-22 14:39:47 +08:00

2228 lines
82 KiB
Go

package service
import (
"context"
"encoding/json"
"errors"
"fmt"
"log/slog"
"strconv"
"strings"
"sync"
"time"
"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgtype"
"github.com/multica-ai/multica/server/internal/analytics"
"github.com/multica-ai/multica/server/internal/events"
"github.com/multica-ai/multica/server/internal/mention"
"github.com/multica-ai/multica/server/internal/realtime"
"github.com/multica-ai/multica/server/internal/util"
db "github.com/multica-ai/multica/server/pkg/db/generated"
"github.com/multica-ai/multica/server/pkg/protocol"
"github.com/multica-ai/multica/server/pkg/redact"
)
// TaskService bundles the dependencies used by the task-queue operations in
// this file: enqueueing, claiming and cancelling agent tasks, plus analytics
// capture for those state transitions.
type TaskService struct {
Queries *db.Queries
TxStarter TxStarter
Hub *realtime.Hub
Bus *events.Bus
// Analytics is optional: captureTaskEvent drops every event when it is nil.
Analytics analytics.Client
// Wakeup is optional: NewTaskService leaves it nil unless a notifier is
// passed in.
Wakeup TaskWakeupNotifier
// EmptyClaim caches "this runtime has no queued task" so the daemon
// poll path can skip a Postgres scan on the steady-state empty case.
// Optional — a nil cache disables the fast path and every claim
// goes through the DB. Wired in router.go from the shared Redis
// client.
EmptyClaim *EmptyClaimCache
// analyticsContextMu guards analyticsContextCache and analyticsContextOrder.
analyticsContextMu sync.Mutex
// analyticsContextCache memoizes resolved analytics contexts, keyed by
// taskAnalyticsContextKey. It is allocated lazily on first store.
analyticsContextCache map[string]analytics.TaskContext
// analyticsContextOrder records cache keys in insertion order so the oldest
// entry can be evicted once taskAnalyticsContextCacheMax is exceeded.
analyticsContextOrder []string
}
// TaskWakeupNotifier is the hook stored in TaskService.Wakeup.
// NotifyTaskAvailable receives the runtime and task identifiers as strings.
// NOTE(review): presumably invoked when a task is enqueued for that runtime —
// the call site is not in this part of the file; confirm.
type TaskWakeupNotifier interface {
NotifyTaskAvailable(runtimeID, taskID string)
}
// triggerSummaryMaxLen caps the snapshot length so the row stays cheap to
// transmit (it ends up in every task list response). 200 is enough for a
// recognisable preview of a one-paragraph comment.
//
// The limit is counted in runes, not bytes: it is passed to
// truncateForSummary as maxRunes, and the trailing ellipsis added on
// truncation is not included in it.
const triggerSummaryMaxLen = 200
// truncateForSummary flattens s onto a single line and shortens it to at most
// maxRunes runes, appending a trailing `…` when anything was cut off.
//
// Newlines, carriage returns and tabs are each replaced by one space, then
// surrounding whitespace is stripped so a leading newline doesn't waste
// budget. Length is counted in runes (not bytes) so multibyte characters —
// Chinese / emoji — count as one each. The ellipsis itself is not counted
// against maxRunes.
//
// A negative maxRunes is treated as 0 rather than panicking on the slice
// expression below: the result is then "" for blank input and "…" otherwise.
func truncateForSummary(s string, maxRunes int) string {
	if maxRunes < 0 {
		maxRunes = 0
	}
	// strings.Builder + Grow avoids the O(N²) realloc cycle of `+=` in a
	// loop. For valid UTF-8 the flattened text is byte-for-byte as long as the
	// input (each replaced control character and its replacement space are
	// both one byte), so a single Grow(len(s)) is sufficient.
	var b strings.Builder
	b.Grow(len(s))
	for _, r := range s {
		switch r {
		case '\n', '\r', '\t':
			b.WriteByte(' ')
		default:
			b.WriteRune(r)
		}
	}
	rs := []rune(strings.TrimSpace(b.String()))
	if len(rs) <= maxRunes {
		return string(rs)
	}
	return string(rs[:maxRunes]) + "…"
}
const (
// taskAnalyticsContextCacheMax is the number of entries the per-service
// analytics-context cache may hold; inserting one more evicts the oldest
// key (see storeTaskAnalyticsContext).
taskAnalyticsContextCacheMax = 4096
// claimResponseRecoveryWindow must exceed daemon client.Timeout for
// /tasks/claim (30s) plus /tasks/{id}/start (30s) plus scheduling slack, so
// an in-flight StartTask cannot be reclaimed and double-dispatched.
claimResponseRecoveryWindow = 90 * time.Second
)
// buildCommentTriggerSummary loads the comment identified by commentID and
// returns its content, shortened to triggerSummaryMaxLen, for storage on the
// task row.
//
// The zero pgtype.Text (Valid == false, i.e. SQL NULL) is returned when
// commentID is not valid, when the comment cannot be loaded, or when the
// shortened content is empty — the front-end falls back to a structural label
// in that case.
func (s *TaskService) buildCommentTriggerSummary(ctx context.Context, commentID pgtype.UUID) pgtype.Text {
	var none pgtype.Text
	if !commentID.Valid {
		return none
	}
	comment, err := s.Queries.GetComment(ctx, commentID)
	if err != nil {
		// Best-effort: a missing comment simply leaves the column NULL.
		return none
	}
	if preview := truncateForSummary(comment.Content, triggerSummaryMaxLen); preview != "" {
		return pgtype.Text{String: preview, Valid: true}
	}
	return none
}
// NewTaskService wires a TaskService from its required dependencies.
//
// wakeups is an optional trailing argument: only the first notifier, if any,
// is used and stored in Wakeup; extra values are ignored. Analytics and
// EmptyClaim are left at their zero values for the caller to set.
func NewTaskService(q *db.Queries, tx TxStarter, hub *realtime.Hub, bus *events.Bus, wakeups ...TaskWakeupNotifier) *TaskService {
	svc := &TaskService{Queries: q, TxStarter: tx, Hub: hub, Bus: bus}
	if len(wakeups) > 0 {
		svc.Wakeup = wakeups[0]
	}
	return svc
}
// trivialDoneMarkers lists the bare "done"-style replies recognised by
// isTrivialDoneOutput. Entries must be lower-case and free of surrounding
// punctuation, because candidates are lower-cased and trimmed before the
// comparison.
var trivialDoneMarkers = []string{
	"done",
	"готово",
	"готова",
	"сделано",
	"完成",
	"完了",
}

// isTrivialDoneOutput reports whether output consists of nothing but one of
// trivialDoneMarkers, ignoring letter case, surrounding whitespace and
// leading/trailing sentence punctuation.
//
// The punctuation cutset covers the ASCII period and exclamation mark, the
// fullwidth exclamation mark `！`, the CJK full stop `。`, the ellipsis `…`
// and spaces. It previously listed `!` twice and missed `！`, so a reply such
// as "完成！" was not recognised even though "完成。" was.
func isTrivialDoneOutput(output string) bool {
	normalized := strings.TrimSpace(strings.ToLower(output))
	normalized = strings.Trim(normalized, ".!！。… ")
	for _, marker := range trivialDoneMarkers {
		if normalized == marker {
			return true
		}
	}
	return false
}
// captureTaskQueued emits the AgentTaskQueued analytics event for task.
func (s *TaskService) captureTaskQueued(ctx context.Context, task db.AgentTaskQueue) {
	tc := s.taskAnalyticsContext(ctx, task)
	s.captureTaskEvent(ctx, analytics.AgentTaskQueued(tc))
}
// captureTaskDispatched emits the AgentTaskDispatched analytics event for task.
func (s *TaskService) captureTaskDispatched(ctx context.Context, task db.AgentTaskQueue) {
	tc := s.taskAnalyticsContext(ctx, task)
	s.captureTaskEvent(ctx, analytics.AgentTaskDispatched(tc))
}
// AnalyticsContextForTask is the exported entry point to the analytics
// context resolved (and cached) for task by taskAnalyticsContext.
func (s *TaskService) AnalyticsContextForTask(ctx context.Context, task db.AgentTaskQueue) analytics.TaskContext {
	tc := s.taskAnalyticsContext(ctx, task)
	return tc
}
// captureTaskStarted emits the AgentTaskStarted analytics event for task.
func (s *TaskService) captureTaskStarted(ctx context.Context, task db.AgentTaskQueue) {
	tc := s.taskAnalyticsContext(ctx, task)
	s.captureTaskEvent(ctx, analytics.AgentTaskStarted(tc))
}
// captureTaskCompleted emits the AgentTaskCompleted analytics event for task,
// including its duration as computed by taskDurationMS.
func (s *TaskService) captureTaskCompleted(ctx context.Context, task db.AgentTaskQueue) {
	tc := s.taskAnalyticsContext(ctx, task)
	durationMS := taskDurationMS(task)
	s.captureTaskEvent(ctx, analytics.AgentTaskCompleted(tc, durationMS))
}
// captureTaskFailed emits the AgentTaskFailed analytics event for task. The
// event carries the task duration, the failure reason, the coarse error type
// derived from that reason, and whether willRetryTask expects a retry.
func (s *TaskService) captureTaskFailed(ctx context.Context, task db.AgentTaskQueue) {
	reason := taskFailureReason(task)
	tc := s.taskAnalyticsContext(ctx, task)
	durationMS := taskDurationMS(task)
	errorType := taskErrorType(reason)
	willRetry := s.willRetryTask(task)
	s.captureTaskEvent(ctx, analytics.AgentTaskFailed(tc, durationMS, reason, errorType, willRetry))
}
// captureTaskCancelled emits the AgentTaskCancelled analytics event for task,
// including its duration as computed by taskDurationMS.
func (s *TaskService) captureTaskCancelled(ctx context.Context, task db.AgentTaskQueue) {
	tc := s.taskAnalyticsContext(ctx, task)
	durationMS := taskDurationMS(task)
	s.captureTaskEvent(ctx, analytics.AgentTaskCancelled(tc, durationMS))
}
// captureTaskEvent forwards event to the analytics client. It is a no-op when
// no client is configured or when the event has no workspace ID.
func (s *TaskService) captureTaskEvent(ctx context.Context, event analytics.Event) {
	if s.Analytics == nil || event.WorkspaceID == "" {
		return
	}
	s.Analytics.Capture(event)
}
// cachedTaskAnalyticsContext returns the analytics context previously stored
// for task, if any. The boolean is false when the task yields no cache key or
// when nothing has been cached for that key.
func (s *TaskService) cachedTaskAnalyticsContext(task db.AgentTaskQueue) (analytics.TaskContext, bool) {
	cacheKey := taskAnalyticsContextKey(task)
	if cacheKey == "" {
		return analytics.TaskContext{}, false
	}

	s.analyticsContextMu.Lock()
	defer s.analyticsContextMu.Unlock()

	// Indexing a nil map is safe and yields (zero value, false), so a cache
	// that has not been allocated yet needs no separate check.
	cached, found := s.analyticsContextCache[cacheKey]
	return cached, found
}
// storeTaskAnalyticsContext caches tc for task. Contexts without a workspace
// ID, and tasks that yield no cache key, are not cached.
//
// The cache is bounded: when a new key pushes the entry count past
// taskAnalyticsContextCacheMax, the oldest inserted key is evicted.
// Overwriting an existing key does not change its position in that order.
func (s *TaskService) storeTaskAnalyticsContext(task db.AgentTaskQueue, tc analytics.TaskContext) {
	if tc.WorkspaceID == "" {
		return
	}
	cacheKey := taskAnalyticsContextKey(task)
	if cacheKey == "" {
		return
	}

	s.analyticsContextMu.Lock()
	defer s.analyticsContextMu.Unlock()

	if s.analyticsContextCache == nil {
		s.analyticsContextCache = make(map[string]analytics.TaskContext)
	}
	if _, known := s.analyticsContextCache[cacheKey]; !known {
		order := append(s.analyticsContextOrder, cacheKey)
		if len(order) > taskAnalyticsContextCacheMax {
			// Drop the oldest entry from both the map and the order list.
			delete(s.analyticsContextCache, order[0])
			order = order[1:]
		}
		s.analyticsContextOrder = order
	}
	s.analyticsContextCache[cacheKey] = tc
}
// taskAnalyticsContextKey builds the cache key for task: the task ID followed
// by the runtime, issue, chat-session and autopilot-run IDs, joined with "|".
// It returns "" when the task ID renders as an empty string, which callers
// treat as "do not cache".
func taskAnalyticsContextKey(task db.AgentTaskQueue) string {
	id := util.UUIDToString(task.ID)
	if id == "" {
		return ""
	}
	return id +
		"|" + util.UUIDToString(task.RuntimeID) +
		"|" + util.UUIDToString(task.IssueID) +
		"|" + util.UUIDToString(task.ChatSessionID) +
		"|" + util.UUIDToString(task.AutopilotRunID)
}
// taskAnalyticsContext resolves the analytics context (workspace, user,
// source, runtime details) for task, consulting the in-memory cache first.
//
// Resolution is best-effort: every lookup error is ignored and simply leaves
// the corresponding fields unset. The lookups run in a fixed order and later
// ones overwrite earlier ones, so the statement order below is significant.
// The result is handed to storeTaskAnalyticsContext, which only caches
// contexts that ended up with a workspace ID.
func (s *TaskService) taskAnalyticsContext(ctx context.Context, task db.AgentTaskQueue) analytics.TaskContext {
if tc, ok := s.cachedTaskAnalyticsContext(task); ok {
return tc
}
// Baseline from the task row itself. Source defaults to manual and is
// upgraded to chat, then autopilot, when the matching link is present.
tc := analytics.TaskContext{
AgentID: util.UUIDToString(task.AgentID),
TaskID: util.UUIDToString(task.ID),
Source: analytics.SourceManual,
}
if task.IssueID.Valid {
tc.IssueID = util.UUIDToString(task.IssueID)
}
if task.ChatSessionID.Valid {
tc.ChatSessionID = util.UUIDToString(task.ChatSessionID)
tc.Source = analytics.SourceChat
}
if task.AutopilotRunID.Valid {
tc.AutopilotRunID = util.UUIDToString(task.AutopilotRunID)
tc.Source = analytics.SourceAutopilot
}
// The runtime supplies workspace, runtime mode and provider.
if task.RuntimeID.Valid {
if rt, err := s.Queries.GetAgentRuntime(ctx, task.RuntimeID); err == nil {
tc.WorkspaceID = util.UUIDToString(rt.WorkspaceID)
tc.RuntimeMode = rt.RuntimeMode
tc.Provider = rt.Provider
}
}
// Fall back to the agent only for the fields the runtime did not fill in.
if tc.WorkspaceID == "" || tc.RuntimeMode == "" {
if agent, err := s.Queries.GetAgent(ctx, task.AgentID); err == nil {
if tc.WorkspaceID == "" {
tc.WorkspaceID = util.UUIDToString(agent.WorkspaceID)
}
if tc.RuntimeMode == "" {
tc.RuntimeMode = agent.RuntimeMode
}
}
}
// Issue-linked task: the issue's workspace overrides the one found so far,
// and the user is the issue creator when that creator is a member.
if task.IssueID.Valid {
if issue, err := s.Queries.GetIssue(ctx, task.IssueID); err == nil {
tc.WorkspaceID = util.UUIDToString(issue.WorkspaceID)
if issue.CreatorType == "member" {
tc.UserID = util.UUIDToString(issue.CreatorID)
}
if issue.OriginType.Valid {
switch issue.OriginType.String {
case "autopilot":
// Issue created by an autopilot: attribute the task to the autopilot
// and, when a member created that autopilot, to that member.
tc.Source = analytics.SourceAutopilot
if ap, err := s.Queries.GetAutopilot(ctx, issue.OriginID); err == nil {
if ap.CreatedByType == "member" {
tc.UserID = util.UUIDToString(ap.CreatedByID)
}
}
case "quick_create":
tc.Source = analytics.SourceManual
}
}
}
}
// Chat-linked task: workspace and user come from the chat session.
if task.ChatSessionID.Valid {
if cs, err := s.Queries.GetChatSession(ctx, task.ChatSessionID); err == nil {
tc.WorkspaceID = util.UUIDToString(cs.WorkspaceID)
tc.UserID = util.UUIDToString(cs.CreatorID)
}
}
// Autopilot-run task: workspace (and member creator, if any) come from the
// autopilot that owns the run.
if task.AutopilotRunID.Valid {
if run, err := s.Queries.GetAutopilotRun(ctx, task.AutopilotRunID); err == nil {
if ap, err := s.Queries.GetAutopilot(ctx, run.AutopilotID); err == nil {
tc.WorkspaceID = util.UUIDToString(ap.WorkspaceID)
if ap.CreatedByType == "member" {
tc.UserID = util.UUIDToString(ap.CreatedByID)
}
}
}
}
// A quick-create context on the task has the final say on workspace, user
// and source.
if qc, ok := s.parseQuickCreateContext(task); ok {
tc.WorkspaceID = qc.WorkspaceID
tc.UserID = qc.RequesterID
tc.Source = analytics.SourceManual
}
s.storeTaskAnalyticsContext(task, tc)
return tc
}
// taskDurationMS returns how long task took, in milliseconds, measured up to
// CompletedAt. The starting point is StartedAt when set, otherwise
// DispatchedAt when set, otherwise CreatedAt.
//
// It returns 0 when the task has no completion time, when no valid starting
// point exists, or when the computed duration would be negative.
func taskDurationMS(task db.AgentTaskQueue) int64 {
	if !task.CompletedAt.Valid {
		return 0
	}

	begin := task.CreatedAt
	switch {
	case task.StartedAt.Valid:
		begin = task.StartedAt
	case task.DispatchedAt.Valid:
		begin = task.DispatchedAt
	}
	if !begin.Valid {
		return 0
	}

	if elapsed := task.CompletedAt.Time.Sub(begin.Time).Milliseconds(); elapsed > 0 {
		return elapsed
	}
	return 0
}
// taskFailureReason returns the failure reason recorded on task, or
// "agent_error" when none is set or the recorded reason is empty.
func taskFailureReason(task db.AgentTaskQueue) string {
	if reason := task.FailureReason; reason.Valid && reason.String != "" {
		return reason.String
	}
	return "agent_error"
}
// taskErrorType maps a failure reason onto the coarse error category reported
// to analytics: "runtime", "timeout", "agent_output" or "cancelled". Any
// reason not listed below falls into "agent_error".
func taskErrorType(reason string) string {
	category := "agent_error"
	switch reason {
	case "runtime_offline", "runtime_recovery":
		category = "runtime"
	case "timeout", "codex_semantic_inactivity":
		category = "timeout"
	case "iteration_limit", "agent_fallback_message":
		category = "agent_output"
	case "cancelled", "user_cancelled":
		category = "cancelled"
	}
	return category
}
// willRetryTask reports whether a failed task is expected to be retried.
//
// That requires all of: a failure reason listed in retryableReasons, attempts
// remaining (Attempt < MaxAttempts), no autopilot-run link, and a link to
// either an issue or a chat session.
func (s *TaskService) willRetryTask(task db.AgentTaskQueue) bool {
	switch {
	case !retryableReasons[taskFailureReason(task)]:
		return false
	case task.Attempt >= task.MaxAttempts:
		return false
	case task.AutopilotRunID.Valid:
		return false
	}
	return task.IssueID.Valid || task.ChatSessionID.Valid
}
// EnqueueTaskForIssue queues a task for the agent assigned to issue. No
// context snapshot is stored — the agent fetches all data it needs at runtime
// via the multica CLI.
//
// triggerCommentID is optional: only the first value, if any, is used. When it
// is omitted the task is created with the zero (invalid) comment ID.
func (s *TaskService) EnqueueTaskForIssue(ctx context.Context, issue db.Issue, triggerCommentID ...pgtype.UUID) (db.AgentTaskQueue, error) {
	var trigger pgtype.UUID
	if len(triggerCommentID) != 0 {
		trigger = triggerCommentID[0]
	}
	const forceFreshSession = false
	return s.enqueueIssueTask(ctx, issue, trigger, forceFreshSession)
}
// enqueueIssueTask is the shared implementation behind EnqueueTaskForIssue
// and the manual rerun path. It queues a task for the issue's assignee agent.
//
// forceFreshSession=true marks the task so the daemon claim handler skips the
// (agent_id, issue_id) resume lookup — the user already judged the prior
// output bad, so a fresh agent session is the expected behavior.
//
// An error is returned, and nothing is queued, when the issue has no
// assignee, the agent cannot be loaded, the agent is archived, the agent has
// no runtime, or the insert fails.
func (s *TaskService) enqueueIssueTask(ctx context.Context, issue db.Issue, triggerCommentID pgtype.UUID, forceFreshSession bool) (db.AgentTaskQueue, error) {
	var none db.AgentTaskQueue
	issueKey := util.UUIDToString(issue.ID)

	if !issue.AssigneeID.Valid {
		slog.Error("task enqueue failed", "issue_id", issueKey, "error", "issue has no assignee")
		return none, errors.New("issue has no assignee")
	}

	agent, err := s.Queries.GetAgent(ctx, issue.AssigneeID)
	if err != nil {
		slog.Error("task enqueue failed", "issue_id", issueKey, "error", err)
		return none, fmt.Errorf("load agent: %w", err)
	}
	switch {
	case agent.ArchivedAt.Valid:
		slog.Debug("task enqueue skipped: agent is archived", "issue_id", issueKey, "agent_id", util.UUIDToString(agent.ID))
		return none, errors.New("agent is archived")
	case !agent.RuntimeID.Valid:
		slog.Error("task enqueue failed", "issue_id", issueKey, "error", "agent has no runtime")
		return none, errors.New("agent has no runtime")
	}

	params := db.CreateAgentTaskParams{
		AgentID:           issue.AssigneeID,
		RuntimeID:         agent.RuntimeID,
		IssueID:           issue.ID,
		Priority:          priorityToInt(issue.Priority),
		TriggerCommentID:  triggerCommentID,
		TriggerSummary:    s.buildCommentTriggerSummary(ctx, triggerCommentID),
		ForceFreshSession: pgtype.Bool{Bool: forceFreshSession, Valid: forceFreshSession},
	}
	task, err := s.Queries.CreateAgentTask(ctx, params)
	if err != nil {
		slog.Error("task enqueue failed", "issue_id", issueKey, "error", err)
		return none, fmt.Errorf("create task: %w", err)
	}

	slog.Info("task enqueued",
		"task_id", util.UUIDToString(task.ID),
		"issue_id", issueKey,
		"agent_id", util.UUIDToString(issue.AssigneeID),
		"force_fresh_session", forceFreshSession,
	)

	// Order matters: broadcast first, wake the daemon second. The wake-up
	// leads to a claim, and the claim path emits its own task:dispatch event;
	// broadcasting "queued" afterwards would risk clients seeing the dispatch
	// before the queued event. Publishing in the desired observe-order makes
	// correctness independent of timing.
	s.broadcastTaskEvent(ctx, protocol.EventTaskQueued, task)
	s.NotifyTaskEnqueued(ctx, task)
	return task, nil
}
// EnqueueTaskForMention queues a task on issue for an agent that was
// mentioned. Unlike EnqueueTaskForIssue, the agent is given explicitly via
// agentID rather than derived from the issue assignee. The task is a regular
// (non-leader) task and does not force a fresh session.
func (s *TaskService) EnqueueTaskForMention(ctx context.Context, issue db.Issue, agentID pgtype.UUID, triggerCommentID pgtype.UUID) (db.AgentTaskQueue, error) {
	const (
		isLeader          = false
		forceFreshSession = false
	)
	return s.enqueueMentionTask(ctx, issue, agentID, triggerCommentID, isLeader, forceFreshSession)
}
// EnqueueTaskForSquadLeader is the leader-role variant of
// EnqueueTaskForMention: the task it creates carries is_leader_task=true.
//
// Downstream self-trigger guards use that flag to tell a comment posted while
// the agent was acting as the squad's leader (skip) from one posted while it
// was acting as a worker (do not skip). This matters for agents that are
// simultaneously the leader and a worker of the same squad — see migration
// 090.
func (s *TaskService) EnqueueTaskForSquadLeader(ctx context.Context, issue db.Issue, leaderID pgtype.UUID, triggerCommentID pgtype.UUID) (db.AgentTaskQueue, error) {
	const (
		isLeader          = true
		forceFreshSession = false
	)
	return s.enqueueMentionTask(ctx, issue, leaderID, triggerCommentID, isLeader, forceFreshSession)
}
// enqueueMentionTask queues a task on issue for the explicitly given agent.
// isLeader and forceFreshSession are stored on the task as IsLeaderTask and
// ForceFreshSession; each is written as a valid boolean only when true.
//
// An error is returned, and nothing is queued, when the agent cannot be
// loaded, is archived, has no runtime, or when the insert fails.
func (s *TaskService) enqueueMentionTask(ctx context.Context, issue db.Issue, agentID pgtype.UUID, triggerCommentID pgtype.UUID, isLeader bool, forceFreshSession bool) (db.AgentTaskQueue, error) {
	var none db.AgentTaskQueue
	issueKey := util.UUIDToString(issue.ID)
	agentKey := util.UUIDToString(agentID)

	agent, err := s.Queries.GetAgent(ctx, agentID)
	if err != nil {
		slog.Error("mention task enqueue failed: agent not found", "issue_id", issueKey, "agent_id", agentKey, "error", err)
		return none, fmt.Errorf("load agent: %w", err)
	}
	switch {
	case agent.ArchivedAt.Valid:
		slog.Debug("mention task enqueue skipped: agent is archived", "issue_id", issueKey, "agent_id", agentKey)
		return none, errors.New("agent is archived")
	case !agent.RuntimeID.Valid:
		slog.Error("mention task enqueue failed: agent has no runtime", "issue_id", issueKey, "agent_id", agentKey)
		return none, errors.New("agent has no runtime")
	}

	params := db.CreateAgentTaskParams{
		AgentID:           agentID,
		RuntimeID:         agent.RuntimeID,
		IssueID:           issue.ID,
		Priority:          priorityToInt(issue.Priority),
		TriggerCommentID:  triggerCommentID,
		TriggerSummary:    s.buildCommentTriggerSummary(ctx, triggerCommentID),
		IsLeaderTask:      pgtype.Bool{Bool: isLeader, Valid: isLeader},
		ForceFreshSession: pgtype.Bool{Bool: forceFreshSession, Valid: forceFreshSession},
	}
	task, err := s.Queries.CreateAgentTask(ctx, params)
	if err != nil {
		slog.Error("mention task enqueue failed", "issue_id", issueKey, "agent_id", agentKey, "error", err)
		return none, fmt.Errorf("create task: %w", err)
	}

	slog.Info("mention task enqueued", "task_id", util.UUIDToString(task.ID), "issue_id", issueKey, "agent_id", agentKey, "is_leader_task", isLeader)

	// Broadcast "queued" before waking the daemon so clients cannot observe
	// the dispatch event ahead of the queued one.
	s.broadcastTaskEvent(ctx, protocol.EventTaskQueued, task)
	s.NotifyTaskEnqueued(ctx, task)
	return task, nil
}
// QuickCreateContext is the JSON payload stored on a quick-create task's
// context column. The daemon detects this variant via Type == "quick_create"
// and switches to the quick-create prompt template; the completion path
// uses RequesterID + WorkspaceID to write the inbox notification.
//
// ProjectID is the optional project the user picked in the modal. When
// non-empty the daemon claim handler resolves the project's title +
// resources, and the prompt template instructs the agent to pass
// `--project <uuid>` so the new issue lands in that project.
//
// SquadID is non-empty when the user picked a squad (rather than an agent)
// in the modal. The task is still enqueued against the squad's leader
// agent (Queries.CreateQuickCreateTask is agent-scoped); SquadID is the
// hint the daemon claim handler uses to layer the squad-leader briefing
// onto the agent's Instructions, matching the behavior of issue-bound
// tasks assigned to the squad.
type QuickCreateContext struct {
// Type is set to QuickCreateContextType by EnqueueQuickCreateTask.
Type string `json:"type"`
// Prompt is the user's natural-language request, stored verbatim.
Prompt string `json:"prompt"`
// RequesterID and WorkspaceID are UUIDs rendered as strings.
RequesterID string `json:"requester_id"`
WorkspaceID string `json:"workspace_id"`
// ProjectID and SquadID are UUID strings; each is omitted from the JSON
// when empty.
ProjectID string `json:"project_id,omitempty"`
SquadID string `json:"squad_id,omitempty"`
}
// QuickCreateContextType marks a task as a quick-create job. It is the value
// EnqueueQuickCreateTask writes into QuickCreateContext.Type.
const QuickCreateContextType = "quick_create"
// EnqueueQuickCreateTask queues a task that has no issue / chat / autopilot
// link: the user's natural-language prompt is stored in the task's context
// JSON and the agent is expected to translate it into a `multica issue
// create` call.
//
// The agent is validated up front (it must load, not be archived, and have a
// runtime) so the API can reject the request instead of queueing a task no
// one will ever claim.
//
// projectID is optional (zero-valued pgtype.UUID when the user didn't pick
// one); the handler is responsible for validating that it belongs to the same
// workspace before passing it in.
//
// squadID is Valid when the user picked a squad as the actor. The handler has
// already resolved it to the squad's leader agent for agentID; the squad ID
// is stamped into the task context so the daemon claim handler can inject the
// squad-leader briefing on dispatch.
func (s *TaskService) EnqueueQuickCreateTask(ctx context.Context, workspaceID, requesterID pgtype.UUID, agentID, squadID pgtype.UUID, prompt string, projectID pgtype.UUID) (db.AgentTaskQueue, error) {
	var none db.AgentTaskQueue

	agent, err := s.Queries.GetAgent(ctx, agentID)
	switch {
	case err != nil:
		return none, fmt.Errorf("load agent: %w", err)
	case agent.ArchivedAt.Valid:
		return none, errors.New("agent is archived")
	case !agent.RuntimeID.Valid:
		return none, errors.New("agent has no runtime")
	}

	requesterKey := util.UUIDToString(requesterID)
	workspaceKey := util.UUIDToString(workspaceID)

	qc := QuickCreateContext{
		Type:        QuickCreateContextType,
		Prompt:      prompt,
		RequesterID: requesterKey,
		WorkspaceID: workspaceKey,
	}
	// Optional IDs stay empty (and are omitted from the JSON) when not set.
	if projectID.Valid {
		qc.ProjectID = util.UUIDToString(projectID)
	}
	if squadID.Valid {
		qc.SquadID = util.UUIDToString(squadID)
	}

	encoded, err := json.Marshal(qc)
	if err != nil {
		return none, fmt.Errorf("marshal quick-create context: %w", err)
	}

	task, err := s.Queries.CreateQuickCreateTask(ctx, db.CreateQuickCreateTaskParams{
		AgentID:   agentID,
		RuntimeID: agent.RuntimeID,
		Priority:  priorityToInt("high"),
		Context:   encoded,
	})
	if err != nil {
		return none, fmt.Errorf("create quick-create task: %w", err)
	}

	slog.Info("quick-create task enqueued",
		"task_id", util.UUIDToString(task.ID),
		"agent_id", util.UUIDToString(agentID),
		"squad_id", qc.SquadID,
		"requester_id", requesterKey,
		"workspace_id", workspaceKey,
		"project_id", qc.ProjectID,
	)

	// Wake the daemon so the task is claimed promptly instead of waiting for
	// the next 30 s poll cycle. Without this the user perceives "quick create
	// never triggered": the modal closes immediately while the task sits in
	// 'queued' until the next poll tick.
	s.NotifyTaskEnqueued(ctx, task)
	return task, nil
}
// EnqueueChatTask queues a task for the agent of a chat session. Unlike issue
// tasks, chat tasks carry no issue_id; they are created with a fixed priority
// of 2 (medium).
//
// An error is returned, and nothing is queued, when the agent cannot be
// loaded, is archived, has no runtime, or when the insert fails.
func (s *TaskService) EnqueueChatTask(ctx context.Context, chatSession db.ChatSession) (db.AgentTaskQueue, error) {
	var none db.AgentTaskQueue
	sessionKey := util.UUIDToString(chatSession.ID)

	agent, err := s.Queries.GetAgent(ctx, chatSession.AgentID)
	switch {
	case err != nil:
		slog.Error("chat task enqueue failed", "chat_session_id", sessionKey, "error", err)
		return none, fmt.Errorf("load agent: %w", err)
	case agent.ArchivedAt.Valid:
		return none, errors.New("agent is archived")
	case !agent.RuntimeID.Valid:
		return none, errors.New("agent has no runtime")
	}

	params := db.CreateChatTaskParams{
		AgentID:       chatSession.AgentID,
		RuntimeID:     agent.RuntimeID,
		Priority:      2, // medium priority for chat
		ChatSessionID: chatSession.ID,
	}
	task, err := s.Queries.CreateChatTask(ctx, params)
	if err != nil {
		slog.Error("chat task enqueue failed", "chat_session_id", sessionKey, "error", err)
		return none, fmt.Errorf("create chat task: %w", err)
	}

	slog.Info("chat task enqueued", "task_id", util.UUIDToString(task.ID), "chat_session_id", sessionKey, "agent_id", util.UUIDToString(chatSession.AgentID))

	// Broadcast "queued" before waking the daemon so clients cannot observe
	// the dispatch event ahead of the queued one.
	s.broadcastTaskEvent(ctx, protocol.EventTaskQueued, task)
	s.NotifyTaskEnqueued(ctx, task)
	return task, nil
}
// CancelTasksForIssue cancels every active task on the issue and then, for
// each cancelled row, records the analytics event, reconciles the owning
// agent's status and broadcasts task:cancelled so frontends clear their live
// cards.
//
// Before #1587 this path was "cancel rows and return" — issue-status flips
// (e.g. the user marks the issue `done` or `cancelled` while a task is still
// running) left the agent stuck at status="working" indefinitely, requiring a
// manual `multica agent update <id> --status idle` to unwedge. Matches the
// pattern already used by CancelTask and RerunIssue.
func (s *TaskService) CancelTasksForIssue(ctx context.Context, issueID pgtype.UUID) error {
	cancelled, err := s.Queries.CancelAgentTasksByIssue(ctx, issueID)
	if err != nil {
		return err
	}
	// The per-row follow-up (capture, reconcile, broadcast) is exactly what
	// BroadcastCancelledTasks performs.
	s.BroadcastCancelledTasks(ctx, cancelled)
	return nil
}
// CancelTasksForAgent cancels every active task belonging to an agent
// (queued + dispatched + running), broadcasts task:cancelled for each row and
// reconciles the agent's status. Used by the agent-level "Cancel all tasks"
// action — same shape as CancelTasksForIssue but scoped on agent_id.
//
// The cancelled rows are returned so callers can report counts / log them.
func (s *TaskService) CancelTasksForAgent(ctx context.Context, agentID pgtype.UUID) ([]db.AgentTaskQueue, error) {
	cancelled, err := s.Queries.CancelAgentTasksByAgent(ctx, agentID)
	if err != nil {
		return nil, err
	}
	for i := range cancelled {
		row := cancelled[i]
		s.captureTaskCancelled(ctx, row)
		s.broadcastTaskEvent(ctx, protocol.EventTaskCancelled, row)
	}
	// One reconcile after the loop is enough: every cancelled row belongs to
	// this same agent, and its status is derived from the remaining tasks.
	s.ReconcileAgentStatus(ctx, agentID)
	return cancelled, nil
}
// CancelTasksByTriggerComment cancels the active tasks whose trigger is the
// given comment, then records analytics, reconciles the agent and broadcasts
// task:cancelled for each cancelled row.
//
// Called from DeleteComment so an agent does not run with the now-deleted
// content already embedded in its prompt. It must be invoked BEFORE the
// comment row is deleted: the FK's ON DELETE SET NULL would otherwise nullify
// trigger_comment_id and the affected tasks could no longer be found.
func (s *TaskService) CancelTasksByTriggerComment(ctx context.Context, commentID pgtype.UUID) error {
	cancelled, err := s.Queries.CancelAgentTasksByTriggerComment(ctx, commentID)
	if err != nil {
		return err
	}
	// The per-row follow-up (capture, reconcile, broadcast) is exactly what
	// BroadcastCancelledTasks performs.
	s.BroadcastCancelledTasks(ctx, cancelled)
	return nil
}
// BroadcastCancelledTasks performs the follow-up for already-cancelled rows:
// for each one it records the analytics event, reconciles the owning agent's
// status and emits task:cancelled, in that order.
//
// Callers must invoke this AFTER committing the cancellation so subscribers
// don't observe a "cancelled" event for a row that the tx might still roll
// back.
func (s *TaskService) BroadcastCancelledTasks(ctx context.Context, cancelled []db.AgentTaskQueue) {
	for i := range cancelled {
		row := cancelled[i]
		s.captureTaskCancelled(ctx, row)
		s.ReconcileAgentStatus(ctx, row.AgentID)
		s.broadcastTaskEvent(ctx, protocol.EventTaskCancelled, row)
	}
}
// CaptureCancelledTasks records the cancelled analytics event for every row
// in cancelled. Unlike BroadcastCancelledTasks it neither reconciles agent
// status nor broadcasts anything.
func (s *TaskService) CaptureCancelledTasks(ctx context.Context, cancelled []db.AgentTaskQueue) {
	for i := range cancelled {
		s.captureTaskCancelled(ctx, cancelled[i])
	}
}
// CancelTask cancels a single task by ID and broadcasts a task:cancelled
// event so frontends can update immediately.
//
// When the cancel query matches no row (pgx.ErrNoRows) the task is re-read
// and returned as it currently is, without recording analytics, reconciling
// the agent or broadcasting. Any other failure — including a failed re-read —
// is returned wrapped as "cancel task: …".
func (s *TaskService) CancelTask(ctx context.Context, taskID pgtype.UUID) (*db.AgentTaskQueue, error) {
	task, err := s.Queries.CancelAgentTask(ctx, taskID)
	switch {
	case errors.Is(err, pgx.ErrNoRows):
		current, getErr := s.Queries.GetAgentTask(ctx, taskID)
		if getErr != nil {
			return nil, fmt.Errorf("cancel task: %w", getErr)
		}
		return &current, nil
	case err != nil:
		return nil, fmt.Errorf("cancel task: %w", err)
	}

	slog.Info("task cancelled", "task_id", util.UUIDToString(task.ID), "issue_id", util.UUIDToString(task.IssueID))
	s.captureTaskCancelled(ctx, task)
	// Bring the agent's status back in line with its remaining tasks.
	s.ReconcileAgentStatus(ctx, task.AgentID)
	// Emit task:cancelled so frontends clear the live card.
	s.broadcastTaskEvent(ctx, protocol.EventTaskCancelled, task)
	return &task, nil
}
// ClaimTask atomically claims the next queued task for an agent, respecting
// the agent's max_concurrent_tasks.
//
// It returns (nil, nil) when the agent is already at capacity or when no task
// is queued for it, and a wrapped error when loading the agent, counting its
// running tasks or the claim itself fails. On success the dispatch analytics
// event is recorded, the agent status is reconciled and task:dispatch is
// broadcast.
//
// Each step is timed; the timings and the final outcome label are passed to
// maybeLogClaimSlow when the function returns.
func (s *TaskService) ClaimTask(ctx context.Context, agentID pgtype.UUID) (*db.AgentTaskQueue, error) {
	began := time.Now()
	outcome := "unknown"
	var getAgentMs, countRunningMs, claimAgentMs, updateStatusMs, dispatchMs int64
	// Deferred as a closure so it sees the final outcome and timings rather
	// than the values at the time of the defer statement.
	defer func() {
		s.maybeLogClaimSlow(agentID, outcome, began, getAgentMs, countRunningMs, claimAgentMs, updateStatusMs, dispatchMs)
	}()

	stepStart := began
	agent, err := s.Queries.GetAgent(ctx, agentID)
	getAgentMs = time.Since(stepStart).Milliseconds()
	if err != nil {
		outcome = "error_get_agent"
		return nil, fmt.Errorf("agent not found: %w", err)
	}

	stepStart = time.Now()
	active, err := s.Queries.CountRunningTasks(ctx, agentID)
	countRunningMs = time.Since(stepStart).Milliseconds()
	if err != nil {
		outcome = "error_count_running"
		return nil, fmt.Errorf("count running tasks: %w", err)
	}
	if limit := int64(agent.MaxConcurrentTasks); active >= limit {
		slog.Debug("task claim: no capacity", "agent_id", util.UUIDToString(agentID), "running", active, "max", agent.MaxConcurrentTasks)
		outcome = "no_capacity"
		return nil, nil
	}

	stepStart = time.Now()
	task, err := s.Queries.ClaimAgentTask(ctx, agentID)
	claimAgentMs = time.Since(stepStart).Milliseconds()
	switch {
	case errors.Is(err, pgx.ErrNoRows):
		// Nothing queued for this agent — not an error.
		slog.Debug("task claim: no tasks available", "agent_id", util.UUIDToString(agentID))
		outcome = "no_tasks"
		return nil, nil
	case err != nil:
		outcome = "error_claim"
		return nil, fmt.Errorf("claim task: %w", err)
	}

	slog.Info("task claimed", "task_id", util.UUIDToString(task.ID), "agent_id", util.UUIDToString(agentID))
	s.captureTaskDispatched(ctx, task)

	// Refresh agent status from active tasks. This avoids a stale
	// unconditional "working" write racing after a just-cancelled claim.
	stepStart = time.Now()
	s.ReconcileAgentStatus(ctx, agentID)
	updateStatusMs = time.Since(stepStart).Milliseconds()

	// Broadcast task:dispatch. Workspace resolution inside this path can
	// re-query issue/chat_session/autopilot_run, so it can also be a real
	// contributor to claim latency — hence the separate timing.
	stepStart = time.Now()
	s.broadcastTaskDispatch(ctx, task)
	dispatchMs = time.Since(stepStart).Milliseconds()

	outcome = "claimed"
	return &task, nil
}
// ClaimTaskForRuntime claims the next runnable task for a runtime while
// still respecting each agent's max_concurrent_tasks limit (enforced inside
// ClaimTask, which this function calls per candidate agent).
//
// The call runs in three stages:
//  1. Ask the DB to reclaim a stale dispatched task for this runtime. If one
//     comes back it is returned as-is.
//  2. Consult the EmptyClaim cache; on a hit return (nil, nil).
//  3. List queued candidates and call ClaimTask once per distinct agent until
//     one yields a task whose RuntimeID equals runtimeID.
//
// Empty-claim fast path: when a recent check verified the runtime had no
// queued tasks, stage 3 (the candidate SELECT and claim loop) is skipped.
// The stale-dispatch reclaim query in stage 1 still runs on every call.
// The cache is invalidated synchronously on every enqueue
// (notifyTaskAvailable), so a queued task becomes claimable on the next call
// rather than waiting for the TTL.
//
// Returns (nil, nil) when nothing was claimed, and a non-nil error when the
// reclaim query, the candidate listing, or a ClaimTask call fails.
func (s *TaskService) ClaimTaskForRuntime(ctx context.Context, runtimeID pgtype.UUID) (*db.AgentTaskQueue, error) {
start := time.Now()
var (
outcome = "no_task"
listMs, loopMs int64
listCount, tried int
claimedFlag bool
)
// Slow-call diagnostics: emit one structured log line, but only when the
// whole call took 300ms or longer. The closure reads the variables above at
// return time, so every exit path reports its final outcome and timings.
defer func() {
totalMs := time.Since(start).Milliseconds()
if totalMs < 300 {
return
}
slog.Info("claim_for_runtime slow",
"runtime_id", util.UUIDToString(runtimeID),
"outcome", outcome,
"total_ms", totalMs,
"list_pending_ms", listMs,
"list_pending_count", listCount,
"agents_tried", tried,
"claim_loop_ms", loopMs,
"claimed", claimedFlag,
)
}()
runtimeKey := util.UUIDToString(runtimeID)
// Check this before EmptyClaim: a lost claim response moves the task out of
// `queued`, so the empty-queued cache cannot represent recoverability.
stale, err := s.Queries.ReclaimStaleDispatchedTaskForRuntime(ctx, db.ReclaimStaleDispatchedTaskForRuntimeParams{
RuntimeID: runtimeID,
ClaimRecoverySecs: claimResponseRecoveryWindow.Seconds(),
})
if err == nil {
outcome = "reclaimed_dispatched"
claimedFlag = true
slog.Info("stale dispatched task reclaimed",
"task_id", util.UUIDToString(stale.ID),
"runtime_id", runtimeKey,
"agent_id", util.UUIDToString(stale.AgentID),
)
return &stale, nil
}
// pgx.ErrNoRows just means there was nothing to reclaim; anything else is a
// real failure and aborts the claim.
if !errors.Is(err, pgx.ErrNoRows) {
outcome = "error_reclaim_dispatched"
return nil, fmt.Errorf("reclaim stale dispatched task: %w", err)
}
if s.EmptyClaim.IsEmpty(ctx, runtimeKey) {
outcome = "empty_cache_hit"
return nil, nil
}
// Sample the invalidation version BEFORE the SELECT. If a
// concurrent enqueue Bumps between this read and the post-SELECT
// MarkEmpty, the next IsEmpty will see the empty key tagged with
// a stale version and reject it — closing the race that would
// otherwise stall the just-queued task until the empty key's TTL
// expired.
preSelectVersion := s.EmptyClaim.CurrentVersion(ctx, runtimeKey)
t0 := time.Now()
tasks, err := s.Queries.ListQueuedClaimCandidatesByRuntime(ctx, runtimeID)
listMs = time.Since(t0).Milliseconds()
listCount = len(tasks)
if err != nil {
outcome = "error_list"
return nil, fmt.Errorf("list queued claim candidates: %w", err)
}
if len(tasks) == 0 {
s.EmptyClaim.MarkEmpty(ctx, runtimeKey, preSelectVersion)
outcome = "empty_db"
return nil, nil
}
loopStart := time.Now()
// ClaimTask is keyed by agent, not by candidate row, so each agent is
// attempted at most once even when it owns several candidates.
triedAgents := map[string]struct{}{}
var claimed *db.AgentTaskQueue
for _, candidate := range tasks {
agentKey := util.UUIDToString(candidate.AgentID)
if _, seen := triedAgents[agentKey]; seen {
continue
}
triedAgents[agentKey] = struct{}{}
tried++
task, err := s.ClaimTask(ctx, candidate.AgentID)
if err != nil {
loopMs = time.Since(loopStart).Milliseconds()
outcome = "error_claim"
return nil, err
}
// NOTE(review): when ClaimTask returns a task bound to a different runtime
// it is neither returned nor released here. Presumably the owning runtime
// recovers it through the stale-dispatch reclaim above — confirm.
if task != nil && task.RuntimeID == runtimeID {
claimed = task
break
}
}
loopMs = time.Since(loopStart).Milliseconds()
// outcome keeps its initial "no_task" value when no agent yielded a task.
if claimed != nil {
claimedFlag = true
outcome = "claimed"
}
return claimed, nil
}
// maybeLogClaimSlow writes a single structured "claim_task slow" record for a
// ClaimTask call whose wall-clock time, measured from start, reached 300ms.
// Faster calls log nothing, so normal poll traffic stays out of the logs while
// the production tail remains diagnosable.
//
// The per-stage millisecond arguments are reported verbatim so the slow stage
// can be singled out. It is meant to be invoked via defer so the measurement
// covers the whole call, including the post-claim ReconcileAgentStatus and
// broadcastTaskDispatch steps (both of which can hit the DB) and any error
// exit.
func (s *TaskService) maybeLogClaimSlow(agentID pgtype.UUID, outcome string, start time.Time, getAgentMs, countRunningMs, claimAgentMs, updateStatusMs, dispatchMs int64) {
	elapsedMs := time.Since(start).Milliseconds()
	if elapsedMs >= 300 {
		slog.Info("claim_task slow",
			"agent_id", util.UUIDToString(agentID),
			"outcome", outcome,
			"total_ms", elapsedMs,
			"get_agent_ms", getAgentMs,
			"count_running_ms", countRunningMs,
			"claim_agent_ms", claimAgentMs,
			"update_status_ms", updateStatusMs,
			"dispatch_ms", dispatchMs,
		)
	}
}
// StartTask moves a dispatched task into the running state and returns the
// updated row. The issue's own status is deliberately left alone; the agent
// manages it via the CLI.
//
// On success the transition is logged, captured via captureTaskStarted, and
// broadcast as a task-running event. A failed transition is returned wrapped
// as "start task: ...".
func (s *TaskService) StartTask(ctx context.Context, taskID pgtype.UUID) (*db.AgentTaskQueue, error) {
	started, startErr := s.Queries.StartAgentTask(ctx, taskID)
	if startErr != nil {
		return nil, fmt.Errorf("start task: %w", startErr)
	}

	slog.Info("task started",
		"task_id", util.UUIDToString(started.ID),
		"issue_id", util.UUIDToString(started.IssueID),
	)
	s.captureTaskStarted(ctx, started)

	// Push the dispatched → running transition to every connected workspace
	// WS client. Otherwise the workspace-wide `agentTaskSnapshot` query only
	// refreshes on its 30s staleTime, and any UI that tells "queued" apart
	// from "running" (e.g. the issue-card agent activity indicator) trails by
	// up to half a minute on the transition users care about most.
	s.broadcastTaskEvent(ctx, protocol.EventTaskRunning, started)

	return &started, nil
}
// CompleteTask marks a task as completed.
// Issue status is NOT changed here — the agent manages it via the CLI.
//
// For chat tasks, CompleteAgentTask and the chat_session resume-pointer
// update run in a single transaction. This closes a race where the next
// queued chat message could be claimed in the window between the task
// flipping to 'completed' and chat_session.session_id being refreshed,
// causing the new task to resume against a stale (or NULL) session.
//
// Parameters:
//   - result: raw bytes stored on the task; also decoded as a
//     protocol.TaskCompletedPayload to obtain the Output text used for the
//     fallback issue comment and the assistant chat message.
//   - sessionID, workDir: optional; an empty string is passed to the DB as an
//     invalid (NULL) pgtype.Text.
//
// Returns the completed task. If the completion query matches no rows but the
// task still exists, the existing row is returned with a nil error (the call
// is treated as idempotent). Any other failure is returned wrapped as
// "complete task: ...".
//
// After a successful completion the following best-effort side effects run in
// order: fallback issue comment, quick-create notification, chat reply +
// chat:done broadcast, agent status reconcile, task-completed broadcast.
// Failures in those steps are logged (or ignored) and never fail the call.
func (s *TaskService) CompleteTask(ctx context.Context, taskID pgtype.UUID, result []byte, sessionID, workDir string) (*db.AgentTaskQueue, error) {
var task db.AgentTaskQueue
if err := s.runInTx(ctx, func(qtx *db.Queries) error {
t, err := qtx.CompleteAgentTask(ctx, db.CompleteAgentTaskParams{
ID: taskID,
Result: result,
SessionID: pgtype.Text{String: sessionID, Valid: sessionID != ""},
WorkDir: pgtype.Text{String: workDir, Valid: workDir != ""},
})
if err != nil {
return err
}
task = t
if t.ChatSessionID.Valid {
// Pin the chat_session's runtime_id alongside the session_id so the
// next claim can apply the runtime-guard. Both fields move together:
// when there's no session_id to record, leave runtime_id untouched
// (NULL → COALESCE keeps the existing value).
var sessionRuntimeID pgtype.UUID
if sessionID != "" {
sessionRuntimeID = t.RuntimeID
}
// COALESCE in SQL guarantees empty inputs don't wipe the
// existing resume pointer; we still surface DB errors.
if err := qtx.UpdateChatSessionSession(ctx, db.UpdateChatSessionSessionParams{
ID: t.ChatSessionID,
SessionID: pgtype.Text{String: sessionID, Valid: sessionID != ""},
WorkDir: pgtype.Text{String: workDir, Valid: workDir != ""},
RuntimeID: sessionRuntimeID,
}); err != nil {
return fmt.Errorf("update chat session resume pointer: %w", err)
}
}
return nil
}); err != nil {
// When parallel agents race, a task may already be completed,
// cancelled, or failed by the time this call runs. The UPDATE
// … WHERE status = 'running' returns no rows in that case.
// Treat it as an idempotent success — same pattern as CancelTask.
//
// The lookup below runs on the regular Queries handle, outside the
// transaction that just failed, and doubles as a source of context for
// the warning logs.
if existing, lookupErr := s.Queries.GetAgentTask(ctx, taskID); lookupErr == nil {
if errors.Is(err, pgx.ErrNoRows) {
slog.Info("complete task: already finalized",
"task_id", util.UUIDToString(taskID),
"current_status", existing.Status,
"agent_id", util.UUIDToString(existing.AgentID),
)
return &existing, nil
}
slog.Warn("complete task failed",
"task_id", util.UUIDToString(taskID),
"current_status", existing.Status,
"issue_id", util.UUIDToString(existing.IssueID),
"chat_session_id", util.UUIDToString(existing.ChatSessionID),
"agent_id", util.UUIDToString(existing.AgentID),
"error", err,
)
} else {
slog.Warn("complete task failed: task not found",
"task_id", util.UUIDToString(taskID),
"lookup_error", lookupErr,
)
}
return nil, fmt.Errorf("complete task: %w", err)
}
slog.Info("task completed", "task_id", util.UUIDToString(task.ID), "issue_id", util.UUIDToString(task.IssueID))
s.captureTaskCompleted(ctx, task)
// Invariant: every completed issue task must have at least one agent
// comment on the issue, so the user always sees something when a run
// ends. If the agent posted a comment during execution (result, progress
// ping, or CLI reply), HasAgentCommentedSince returns true and we skip.
// Otherwise, synthesize one from the final output. For comment-triggered
// tasks, TriggerCommentID threads the fallback under the original comment;
// for assignment-triggered tasks it is NULL and the fallback is top-level.
// Chat tasks have no IssueID and are handled separately below.
if task.IssueID.Valid {
// A failed evaluation lookup is only logged; the returned flag is still
// used below.
suppressNoActionComment, err := HasSquadLeaderNoActionEvaluationForTask(ctx, s.Queries, task)
if err != nil {
slog.Warn("checking squad leader no_action evaluation failed",
"task_id", util.UUIDToString(task.ID),
"issue_id", util.UUIDToString(task.IssueID),
"agent_id", util.UUIDToString(task.AgentID),
"error", err,
)
}
// NOTE(review): the lookup error is discarded. On failure agentCommented
// is the zero value (false), so the fallback comment below may be created
// even though the agent already commented.
agentCommented, _ := s.Queries.HasAgentCommentedSince(ctx, db.HasAgentCommentedSinceParams{
IssueID: task.IssueID,
AuthorID: task.AgentID,
Since: task.StartedAt,
})
if !suppressNoActionComment && !agentCommented {
var payload protocol.TaskCompletedPayload
// A result that does not decode as TaskCompletedPayload, or that has an
// empty Output, produces no fallback comment and no log line.
if err := json.Unmarshal(result, &payload); err == nil {
if payload.Output != "" {
// Match the CLI's --content / --description behavior: agents that
// emit literal `\n` 4-char sequences (Python/JSON-style) get them
// decoded into real newlines before the comment hits the DB. See
// util.UnescapeBackslashEscapes for the exact contract.
body := util.UnescapeBackslashEscapes(payload.Output)
if task.TriggerCommentID.Valid && isTrivialDoneOutput(body) {
slog.Warn("suppressing trivial comment-trigger fallback output",
"task_id", util.UUIDToString(task.ID),
"issue_id", util.UUIDToString(task.IssueID),
"agent_id", util.UUIDToString(task.AgentID),
)
} else {
s.createAgentComment(ctx, task.IssueID, task.AgentID, redact.Text(body), "comment", task.TriggerCommentID)
}
}
}
}
}
// Quick-create tasks: locate the issue the agent just created and push
// an inbox confirmation to the requester. The agent has no issue / chat
// link, so the regular completion paths above don't apply. We find the
// new issue by querying for the most recent issue this agent created in
// the requester's workspace since the task started — more robust than
// parsing the agent's stdout for an identifier.
if qc, ok := s.parseQuickCreateContext(task); ok {
s.notifyQuickCreateCompleted(ctx, task, qc)
}
// For chat tasks, save assistant reply and broadcast chat:done. The
// resume pointer was already persisted inside the transaction above.
if task.ChatSessionID.Valid {
// assistantMsg stays nil when there is no output or the insert fails;
// broadcastChatDone is still called in that case.
var assistantMsg *db.ChatMessage
var payload protocol.TaskCompletedPayload
if err := json.Unmarshal(result, &payload); err == nil && payload.Output != "" {
// Same unescape as the issue-comment path above: literal `\n` from
// agent stdout becomes a real newline so the chat panel renders
// paragraph breaks instead of one wall of prose.
body := util.UnescapeBackslashEscapes(payload.Output)
row, err := s.Queries.CreateChatMessage(ctx, db.CreateChatMessageParams{
ChatSessionID: task.ChatSessionID,
Role: "assistant",
Content: redact.Text(body),
TaskID: task.ID,
ElapsedMs: computeChatElapsedMs(task),
})
if err != nil {
slog.Error("failed to save assistant chat message", "task_id", util.UUIDToString(task.ID), "error", err)
} else {
assistantMsg = &row
// Event-driven unread: stamp unread_since on the first unread
// assistant message. No-op if the session already has unread.
// If the user is actively viewing the session, the frontend's
// auto-mark-read effect will clear this within a tick.
if err := s.Queries.SetUnreadSinceIfNull(ctx, task.ChatSessionID); err != nil {
slog.Warn("failed to set unread_since", "chat_session_id", util.UUIDToString(task.ChatSessionID), "error", err)
}
}
}
s.broadcastChatDone(ctx, task, assistantMsg)
}
// Reconcile agent status
s.ReconcileAgentStatus(ctx, task.AgentID)
// Broadcast
s.broadcastTaskEvent(ctx, protocol.EventTaskCompleted, task)
return &task, nil
}
// FailTask marks a task as failed.
// Issue status is NOT changed here — the agent manages it via the CLI.
//
// sessionID/workDir are optional: when the agent established a real session
// before failing (e.g. crashed mid-conversation, was cancelled, or hit a
// tool error), the daemon should pass them so we can preserve the resume
// pointer on both the task row and the chat_session — otherwise the next
// chat turn would silently start a brand-new session and lose memory.
//
// failureReason is a coarse classifier consumed by the auto-retry path.
// Pass "" when unknown (treated as 'agent_error').
//
// Returns the failed task. If the failure query matches no rows but the task
// still exists, the existing row is returned with a nil error (the call is
// treated as idempotent). Any other failure is returned wrapped as
// "fail task: ...".
//
// After a successful transition the following best-effort side effects run in
// order: auto-retry attempt, system comment on the issue, assistant chat
// failure message, quick-create failure notification, agent status reconcile,
// task-failed broadcast. The comment, chat message and quick-create
// notification are skipped when a retry task was created.
func (s *TaskService) FailTask(ctx context.Context, taskID pgtype.UUID, errMsg, sessionID, workDir, failureReason string) (*db.AgentTaskQueue, error) {
var task db.AgentTaskQueue
if err := s.runInTx(ctx, func(qtx *db.Queries) error {
t, err := qtx.FailAgentTask(ctx, db.FailAgentTaskParams{
ID: taskID,
Error: pgtype.Text{String: errMsg, Valid: true},
FailureReason: pgtype.Text{String: failureReason, Valid: failureReason != ""},
SessionID: pgtype.Text{String: sessionID, Valid: sessionID != ""},
WorkDir: pgtype.Text{String: workDir, Valid: workDir != ""},
})
if err != nil {
return err
}
task = t
// Keep resume-unsafe sessions on the task row for observability, but
// do not promote them to the chat-level resume pointer.
if t.ChatSessionID.Valid && !resumeUnsafeFailureReason(failureReason) {
// Pin the chat_session's runtime_id alongside the session_id so the
// next claim can apply the runtime-guard. Both fields move together:
// when there's no session_id to record, leave runtime_id untouched
// (NULL → COALESCE keeps the existing value).
var sessionRuntimeID pgtype.UUID
if sessionID != "" {
sessionRuntimeID = t.RuntimeID
}
if err := qtx.UpdateChatSessionSession(ctx, db.UpdateChatSessionSessionParams{
ID: t.ChatSessionID,
SessionID: pgtype.Text{String: sessionID, Valid: sessionID != ""},
WorkDir: pgtype.Text{String: workDir, Valid: workDir != ""},
RuntimeID: sessionRuntimeID,
}); err != nil {
return fmt.Errorf("update chat session resume pointer: %w", err)
}
}
return nil
}); err != nil {
// Same idempotency handling as CompleteTask: a no-rows result on a task
// that still exists means it was already finalized, so the existing row
// is returned instead of an error. The lookup runs outside the failed
// transaction and also supplies context for the warning logs.
if existing, lookupErr := s.Queries.GetAgentTask(ctx, taskID); lookupErr == nil {
if errors.Is(err, pgx.ErrNoRows) {
slog.Info("fail task: already finalized",
"task_id", util.UUIDToString(taskID),
"current_status", existing.Status,
"agent_id", util.UUIDToString(existing.AgentID),
)
return &existing, nil
}
slog.Warn("fail task failed",
"task_id", util.UUIDToString(taskID),
"current_status", existing.Status,
"issue_id", util.UUIDToString(existing.IssueID),
"chat_session_id", util.UUIDToString(existing.ChatSessionID),
"agent_id", util.UUIDToString(existing.AgentID),
"error", err,
)
} else {
slog.Warn("fail task failed: task not found",
"task_id", util.UUIDToString(taskID),
"lookup_error", lookupErr,
)
}
return nil, fmt.Errorf("fail task: %w", err)
}
slog.Warn("task failed", "task_id", util.UUIDToString(task.ID), "issue_id", util.UUIDToString(task.IssueID), "error", errMsg, "failure_reason", failureReason)
s.captureTaskFailed(ctx, task)
// Auto-retry eligible failures (the reasons listed in retryableReasons).
// The helper itself enforces attempt < max_attempts and only triggers for
// issue/chat tasks. Its error is discarded here because the helper already
// logs a failed retry creation and returns a nil task in that case, which
// is handled below exactly like "no retry".
retried, _ := s.MaybeRetryFailedTask(ctx, task)
// Skip the per-failure system comment when we'll immediately retry —
// the new task will surface its own status to the user, and we don't
// want to spam the issue with "task timed out" messages on every
// daemon hiccup.
if errMsg != "" && task.IssueID.Valid && retried == nil {
s.createAgentComment(ctx, task.IssueID, task.AgentID, redact.Text(errMsg), "system", task.TriggerCommentID)
}
// Mirror the issue fallback for chat tasks: write an assistant
// chat_message tagged with the daemon-reported failure_reason so the
// conversation history shows what happened. Skip when auto-retry is
// pending (the new attempt will write its own outcome) — same guard as
// the issue path above.
//
// NOTE(review): unlike the issue path, this branch has no errMsg != ""
// guard, so an empty errMsg still produces a chat message — confirm that is
// intended.
if task.ChatSessionID.Valid && retried == nil {
if _, err := s.Queries.CreateChatMessage(ctx, db.CreateChatMessageParams{
ChatSessionID: task.ChatSessionID,
Role: "assistant",
Content: redact.Text(errMsg),
TaskID: pgtype.UUID{Bytes: task.ID.Bytes, Valid: true},
FailureReason: pgtype.Text{String: failureReason, Valid: failureReason != ""},
ElapsedMs: computeChatElapsedMs(task),
}); err != nil {
slog.Error("failed to save failure chat message",
"task_id", util.UUIDToString(task.ID),
"chat_session_id", util.UUIDToString(task.ChatSessionID),
"error", err)
} else if err := s.Queries.SetUnreadSinceIfNull(ctx, task.ChatSessionID); err != nil {
slog.Warn("failed to set unread_since on failure",
"chat_session_id", util.UUIDToString(task.ChatSessionID),
"error", err)
}
}
// Quick-create tasks: push a failure inbox notification to the
// requester so they can either retry or fall back to the advanced form
// without losing their original prompt. Skipped when an auto-retry is
// pending — the new attempt will write its own outcome.
if retried == nil {
if qc, ok := s.parseQuickCreateContext(task); ok {
s.notifyQuickCreateFailed(ctx, task, qc, errMsg)
}
}
// Reconcile agent status
s.ReconcileAgentStatus(ctx, task.AgentID)
// Broadcast
s.broadcastTaskEvent(ctx, protocol.EventTaskFailed, task)
return &task, nil
}
// retryableReasons is the allow-list of failure reasons the auto-retry path
// may act on. A reason that is absent from the map reads as false, so lookups
// need no existence check.
//
// Agent-side errors (compile failures, model rejections, etc.) are left out
// on purpose: they are real problems the user should see, not infrastructure
// flakiness.
var retryableReasons = map[string]bool{
	"codex_semantic_inactivity": true,
	"runtime_offline":           true,
	"runtime_recovery":          true,
	"timeout":                   true,
}
// resumeUnsafeFailureReason reports whether reason is one of the failure
// reasons after which the failed task's session must not be promoted to the
// chat-level resume pointer. Matching is exact and case-sensitive; any other
// value, including the empty string, yields false.
func resumeUnsafeFailureReason(reason string) bool {
	// Keep in sync with GetLastTaskSession / GetLastChatTaskSession and
	// CreateRetryTask's fresh-session CASE WHEN.
	unsafeReasons := [...]string{
		"iteration_limit",
		"agent_fallback_message",
		"api_invalid_request",
		"codex_semantic_inactivity",
	}
	for _, candidate := range unsafeReasons {
		if candidate == reason {
			return true
		}
	}
	return false
}
// MaybeRetryFailedTask enqueues a fresh attempt for a recently-failed task
// when the failure looks infrastructure-shaped (daemon crash, runtime went
// offline, dispatch/run timeout) and the task still has max_attempts budget
// left. The child task inherits the agent/runtime/issue/chat links and, for
// resume-safe failures, the parent's session_id/work_dir so the agent can
// resume the conversation when the backend supports it.
//
// It returns (nil, nil) without creating anything when any of these hold:
//   - parent is not in status "failed";
//   - parent's failure reason is not in retryableReasons;
//   - parent.Attempt has reached parent.MaxAttempts (logged);
//   - parent belongs to an autopilot run — the autopilot scheduler owns its
//     own re-run cadence and must not be double-fired;
//   - parent has neither an issue nor a chat session.
//
// If creating the retry row fails, the error is logged and returned as-is
// with a nil task. On success the new task is returned after a task-queued
// broadcast and an enqueue notification.
func (s *TaskService) MaybeRetryFailedTask(ctx context.Context, parent db.AgentTaskQueue) (*db.AgentTaskQueue, error) {
	// A NULL failure_reason reads as "", which is never retryable.
	var reason string
	if parent.FailureReason.Valid {
		reason = parent.FailureReason.String
	}
	if parent.Status != "failed" || !retryableReasons[reason] {
		return nil, nil
	}

	if parent.Attempt >= parent.MaxAttempts {
		slog.Info("task auto-retry skipped: budget exhausted",
			"task_id", util.UUIDToString(parent.ID),
			"attempt", parent.Attempt,
			"max_attempts", parent.MaxAttempts,
		)
		return nil, nil
	}

	// Autopilot has its own retry semantics; do not double-trigger. Tasks
	// linked to neither an issue nor a chat session are not retried either.
	linkedToIssueOrChat := parent.IssueID.Valid || parent.ChatSessionID.Valid
	if parent.AutopilotRunID.Valid || !linkedToIssueOrChat {
		return nil, nil
	}

	retry, createErr := s.Queries.CreateRetryTask(ctx, parent.ID)
	if createErr != nil {
		slog.Warn("task auto-retry failed",
			"parent_task_id", util.UUIDToString(parent.ID),
			"reason", reason,
			"error", createErr,
		)
		return nil, createErr
	}
	slog.Info("task auto-retry enqueued",
		"parent_task_id", util.UUIDToString(parent.ID),
		"child_task_id", util.UUIDToString(retry.ID),
		"reason", reason,
		"attempt", retry.Attempt,
		"max_attempts", retry.MaxAttempts,
	)

	// Retry creates a fresh queued row, same status transition (∅ → queued)
	// as EnqueueTaskFor*. Broadcast queued first, then notify the daemon —
	// see EnqueueTaskForIssue for ordering rationale.
	s.broadcastTaskEvent(ctx, protocol.EventTaskQueued, retry)
	s.NotifyTaskEnqueued(ctx, retry)

	return &retry, nil
}
// RerunIssue enqueues a brand-new task for an agent on the given issue. It
// backs the manual rerun endpoint.
//
// Which agent gets the rerun:
//   - sourceTaskID valid: the agent that ran that task, with the same
//     leader/worker role. The execution-log retry button relies on this so a
//     per-row retry survives a later assignee change and re-fires the squad
//     worker or mention agent whose row was clicked. The source task must
//     belong to issueID. When the caller passed no triggerCommentID, the
//     source task's trigger_comment_id is inherited so a rerun of a comment-
//     or mention-triggered task stays comment-triggered — the daemon's
//     buildCommentPrompt path keys on TriggerCommentID, and losing it would
//     degrade the rerun into a generic issue run without the original
//     comment.
//   - sourceTaskID invalid: the issue's current assignee (the agent itself,
//     or the squad's leader). This keeps the CLI / API contract for callers
//     that have an issue ID but no specific task to target. An issue with
//     neither kind of assignee is an error.
//
// The new task is flagged force_fresh_session=true so the daemon starts a
// clean agent session instead of resuming the prior (agent_id, issue_id)
// session: a user clicking rerun has just judged the prior output bad, and
// resuming would replay the same poisoned state. Auto-retry of an orphaned
// mid-flight failure (HandleFailedTasks → MaybeRetryFailedTask →
// CreateRetryTask) does NOT take this path, so MUL-1128's mid-flight resume
// contract is preserved.
//
// Before enqueueing, the target agent's own active/queued tasks on this
// issue are cancelled. Tasks owned by other agents on the same issue (e.g. a
// parallel @-mention agent) are left alone — rerun must not collateral-cancel
// them. A failure of that cancel query is logged and does not abort the
// rerun.
func (s *TaskService) RerunIssue(ctx context.Context, issueID pgtype.UUID, sourceTaskID pgtype.UUID, triggerCommentID pgtype.UUID) (*db.AgentTaskQueue, error) {
	issue, err := s.Queries.GetIssue(ctx, issueID)
	if err != nil {
		return nil, fmt.Errorf("load issue: %w", err)
	}

	// Work out which agent to rerun and whether it runs as squad leader.
	var targetAgent pgtype.UUID
	leaderRun := false
	if !sourceTaskID.Valid {
		// No source task: fall back to whoever the issue is assigned to now.
		if !issue.AssigneeID.Valid {
			return nil, fmt.Errorf("issue is not assigned to an agent or squad")
		}
		switch issue.AssigneeType.String {
		case "agent":
			targetAgent = issue.AssigneeID
		case "squad":
			squad, squadErr := s.Queries.GetSquad(ctx, issue.AssigneeID)
			if squadErr != nil {
				return nil, fmt.Errorf("issue is assigned to a squad but squad not found")
			}
			targetAgent = squad.LeaderID
			leaderRun = true
		default:
			return nil, fmt.Errorf("issue is not assigned to an agent or squad")
		}
	} else {
		src, srcErr := s.Queries.GetAgentTask(ctx, sourceTaskID)
		if srcErr != nil {
			return nil, fmt.Errorf("load source task: %w", srcErr)
		}
		if !src.IssueID.Valid || util.UUIDToString(src.IssueID) != util.UUIDToString(issueID) {
			return nil, fmt.Errorf("source task does not belong to this issue")
		}
		targetAgent = src.AgentID
		leaderRun = src.IsLeaderTask
		// Inherit trigger provenance so a per-row rerun of a comment- or
		// mention-triggered task stays a comment-triggered task. Without
		// this the daemon's buildCommentPrompt path is skipped (it keys on
		// TriggerCommentID) and the rerun degrades into a generic issue
		// run that has lost the original comment context. Only override
		// when the caller didn't pass one explicitly.
		if src.TriggerCommentID.Valid && !triggerCommentID.Valid {
			triggerCommentID = src.TriggerCommentID
		}
	}

	// Cancel only the target agent's active/queued tasks on this issue.
	cancelled, cancelErr := s.Queries.CancelAgentTasksByIssueAndAgent(ctx, db.CancelAgentTasksByIssueAndAgentParams{
		IssueID: issueID,
		AgentID: targetAgent,
	})
	if cancelErr != nil {
		slog.Warn("rerun: cancel prior tasks failed",
			"issue_id", util.UUIDToString(issueID),
			"agent_id", util.UUIDToString(targetAgent),
			"error", cancelErr,
		)
	}
	for _, prior := range cancelled {
		s.captureTaskCancelled(ctx, prior)
		s.ReconcileAgentStatus(ctx, prior.AgentID)
		s.broadcastTaskEvent(ctx, protocol.EventTaskCancelled, prior)
	}

	rerun, enqueueErr := s.enqueueRerunTask(ctx, issue, targetAgent, triggerCommentID, leaderRun)
	if enqueueErr != nil {
		return nil, enqueueErr
	}
	slog.Info("issue rerun enqueued",
		"task_id", util.UUIDToString(rerun.ID),
		"issue_id", util.UUIDToString(issueID),
		"agent_id", util.UUIDToString(targetAgent),
		"source_task_id", util.UUIDToString(sourceTaskID),
		"is_leader", leaderRun,
		"cancelled_prior", len(cancelled),
	)
	return &rerun, nil
}
// enqueueRerunTask enqueues a fresh task for agentID on the issue, picking
// one of two enqueue paths:
//
//   - enqueueIssueTask (assignee-driven), when the issue is assigned to a
//     single agent and that agent is agentID, so the issue-assignee
//     bookkeeping stays in sync;
//   - enqueueMentionTask otherwise (squad member, a prior assignee that has
//     since been reassigned, or a mention agent), forwarding isLeader.
//
// Both paths receive true as their final argument, the force_fresh_session
// contract used for reruns.
func (s *TaskService) enqueueRerunTask(ctx context.Context, issue db.Issue, agentID pgtype.UUID, triggerCommentID pgtype.UUID, isLeader bool) (db.AgentTaskQueue, error) {
	assignedToSingleAgent := issue.AssigneeType.String == "agent" && issue.AssigneeID.Valid
	if !assignedToSingleAgent || util.UUIDToString(issue.AssigneeID) != util.UUIDToString(agentID) {
		return s.enqueueMentionTask(ctx, issue, agentID, triggerCommentID, isLeader, true)
	}
	return s.enqueueIssueTask(ctx, issue, triggerCommentID, true)
}
// HandleFailedTasks runs the post-failure side effects for a batch of
// freshly-failed tasks: optional auto-retry, task:failed event broadcast,
// agent status reconciliation, and (when an issue has no remaining active
// task and isn't being retried) resetting the issue back to todo so the
// daemon can pick it up again.
//
// Callers that surface a task as failed — sweepers, recover-orphans — funnel
// through here so the same UI-consistency guarantees apply on every code
// path.
// NOTE(review): FailTask as written in this file performs its own retry,
// broadcast and reconcile inline and does not call this function — confirm
// which callers actually rely on it.
//
// Returns the number of tasks for which a retry task was enqueued. An empty
// batch returns 0 without doing anything. Individual failures (issue lookup,
// active-task check, status reset) are logged or skipped and never abort the
// batch.
func (s *TaskService) HandleFailedTasks(ctx context.Context, tasks []db.AgentTaskQueue) int {
if len(tasks) == 0 {
return 0
}
// affectedAgents de-duplicates agents so each is reconciled once after the
// loop. processedIssues ensures the stuck-issue check runs at most once per
// issue; retriedIssues marks issues that just received a retry task and so
// must not be reset to todo.
affectedAgents := make(map[string]pgtype.UUID)
processedIssues := make(map[string]bool)
retriedIssues := make(map[string]bool)
retried := 0
for _, t := range tasks {
// Auto-retry first so the issue stays in_progress rather than
// flapping todo → in_progress within a tick.
if child, _ := s.MaybeRetryFailedTask(ctx, t); child != nil {
retried++
if t.IssueID.Valid {
retriedIssues[util.UUIDToString(t.IssueID)] = true
}
}
// The event payload reports "agent_error" when the task carries no
// failure reason.
failureReason := "agent_error"
if t.FailureReason.Valid && t.FailureReason.String != "" {
failureReason = t.FailureReason.String
}
s.captureTaskFailed(ctx, t)
workspaceID := ""
if t.IssueID.Valid {
// A failed issue lookup is ignored here; the workspace is then resolved
// through ResolveTaskWorkspaceID below and no reset is attempted.
if issue, err := s.Queries.GetIssue(ctx, t.IssueID); err == nil {
workspaceID = util.UUIDToString(issue.WorkspaceID)
// Reset stuck in_progress issues only when no other active
// task exists for the issue and no retry was just enqueued.
issueKey := util.UUIDToString(t.IssueID)
if issue.Status == "in_progress" && !processedIssues[issueKey] && !retriedIssues[issueKey] {
processedIssues[issueKey] = true
hasActive, checkErr := s.Queries.HasActiveTaskForIssue(ctx, t.IssueID)
if checkErr != nil {
slog.Warn("handle failed tasks: active check failed",
"issue_id", issueKey,
"error", checkErr,
)
} else if !hasActive {
// The update is scoped by the issue's own workspace ID.
if _, updateErr := s.Queries.UpdateIssueStatus(ctx, db.UpdateIssueStatusParams{
ID: t.IssueID,
Status: "todo",
WorkspaceID: issue.WorkspaceID,
}); updateErr != nil {
slog.Warn("handle failed tasks: reset stuck issue failed",
"issue_id", issueKey,
"error", updateErr,
)
}
}
}
}
}
if workspaceID == "" {
workspaceID = s.ResolveTaskWorkspaceID(ctx, t)
}
// Without a workspace there is no audience for the event, so it is
// dropped; the agent is still reconciled below.
if workspaceID != "" {
s.Bus.Publish(events.Event{
Type: protocol.EventTaskFailed,
WorkspaceID: workspaceID,
ActorType: "system",
Payload: map[string]any{
"task_id": util.UUIDToString(t.ID),
"agent_id": util.UUIDToString(t.AgentID),
"issue_id": util.UUIDToString(t.IssueID),
"status": "failed",
"failure_reason": failureReason,
},
})
}
affectedAgents[util.UUIDToString(t.AgentID)] = t.AgentID
}
for _, agentID := range affectedAgents {
s.ReconcileAgentStatus(ctx, agentID)
}
return retried
}
// runInTx runs fn against a Queries handle bound to one DB transaction and
// commits when fn returns nil. If fn returns an error, that error is returned
// unchanged and the deferred rollback discards the transaction. A failure to
// begin the transaction is returned wrapped as "begin tx: ...".
//
// When TxStarter is nil (e.g. some tests construct TaskService directly), fn
// runs against the regular Queries handle without transactional guarantees.
func (s *TaskService) runInTx(ctx context.Context, fn func(*db.Queries) error) error {
	starter := s.TxStarter
	if starter == nil {
		return fn(s.Queries)
	}

	tx, beginErr := starter.Begin(ctx)
	if beginErr != nil {
		return fmt.Errorf("begin tx: %w", beginErr)
	}
	// Registered unconditionally, so it also runs after a successful commit;
	// its result is ignored.
	defer tx.Rollback(ctx)

	txQueries := s.Queries.WithTx(tx)
	if fnErr := fn(txQueries); fnErr != nil {
		return fnErr
	}
	return tx.Commit(ctx)
}
// ReportProgress publishes a task-progress event on the event bus for the
// given workspace. The event is attributed to the "system" actor with an
// empty actor ID and carries taskID, summary, step and total in its payload.
// Nothing is returned.
func (s *TaskService) ReportProgress(ctx context.Context, taskID string, workspaceID string, summary string, step, total int) {
	progress := protocol.TaskProgressPayload{
		TaskID:  taskID,
		Summary: summary,
		Step:    step,
		Total:   total,
	}
	s.Bus.Publish(events.Event{
		Type:        protocol.EventTaskProgress,
		WorkspaceID: workspaceID,
		ActorType:   "system",
		ActorID:     "",
		TaskID:      taskID,
		Payload:     progress,
	})
}
// ReconcileAgentStatus refreshes the agent's status from its current active
// task set and publishes the resulting agent-status event.
//
// The refresh is best-effort: the method has no error return, so a failed
// refresh is logged at warn level together with the agent ID and nothing is
// published. Without that log a failed reconcile would leave a stale agent
// status behind with no trace of why.
func (s *TaskService) ReconcileAgentStatus(ctx context.Context, agentID pgtype.UUID) {
	agent, err := s.Queries.RefreshAgentStatusFromTasks(ctx, agentID)
	if err != nil {
		slog.Warn("agent status reconcile failed",
			"agent_id", util.UUIDToString(agentID),
			"error", err,
		)
		return
	}
	slog.Debug("agent status reconciled", "agent_id", util.UUIDToString(agentID), "status", agent.Status)
	s.publishAgentStatus(agent)
}
// updateAgentStatus writes status onto the agent unconditionally and, when
// the write succeeds, publishes the updated agent as an agent-status event.
// A failed write is dropped silently: nothing is published and no error is
// returned.
func (s *TaskService) updateAgentStatus(ctx context.Context, agentID pgtype.UUID, status string) {
	params := db.UpdateAgentStatusParams{
		ID:     agentID,
		Status: status,
	}
	if updated, err := s.Queries.UpdateAgentStatus(ctx, params); err == nil {
		s.publishAgentStatus(updated)
	}
}
// publishAgentStatus emits an agent-status event on the bus, scoped to the
// agent's own workspace and attributed to the "system" actor. The payload is
// a map with a single "agent" key holding agentToMap(agent).
func (s *TaskService) publishAgentStatus(agent db.Agent) {
	statusEvent := events.Event{
		Type:        protocol.EventAgentStatus,
		WorkspaceID: util.UUIDToString(agent.WorkspaceID),
		ActorType:   "system",
		ActorID:     "",
		Payload:     map[string]any{"agent": agentToMap(agent)},
	}
	s.Bus.Publish(statusEvent)
}
// LoadAgentSkills loads an agent's skills, each with its supporting files,
// for task execution.
//
// It returns nil when the agent has no skills or when the skill listing
// fails. Loading is best-effort because the signature has no error return:
// a failed skill listing, or a failed file listing for one skill, is logged
// at warn level instead of being dropped silently. In the latter case the
// skill is still returned, just without files.
func (s *TaskService) LoadAgentSkills(ctx context.Context, agentID pgtype.UUID) []AgentSkillData {
	skills, err := s.Queries.ListAgentSkills(ctx, agentID)
	if err != nil {
		slog.Warn("load agent skills: list skills failed", "error", err)
		return nil
	}
	if len(skills) == 0 {
		return nil
	}

	result := make([]AgentSkillData, 0, len(skills))
	for _, sk := range skills {
		data := AgentSkillData{Name: sk.Name, Description: sk.Description, Content: sk.Content}

		files, filesErr := s.Queries.ListSkillFiles(ctx, sk.ID)
		if filesErr != nil {
			// Keep the skill usable; only its supporting files are missing.
			slog.Warn("load agent skills: list skill files failed",
				"skill", sk.Name,
				"error", filesErr,
			)
		}
		// Files stays nil when there is nothing to attach, exactly as before.
		if len(files) > 0 {
			data.Files = make([]AgentSkillFileData, 0, len(files))
			for _, f := range files {
				data.Files = append(data.Files, AgentSkillFileData{Path: f.Path, Content: f.Content})
			}
		}
		result = append(result, data)
	}
	return result
}
// AgentSkillData represents a skill for task execution responses.
// LoadAgentSkills builds one value per skill attached to an agent. The JSON
// tags define the wire format; description and files are omitted when empty.
type AgentSkillData struct {
// Name is the skill's name.
Name string `json:"name"`
// Description is the skill's description; left out of the JSON when empty.
Description string `json:"description,omitempty"`
// Content is the skill's main content.
Content string `json:"content"`
// Files holds the skill's supporting files; left out of the JSON when the
// skill has none.
Files []AgentSkillFileData `json:"files,omitempty"`
}
// AgentSkillFileData represents a supporting file within a skill.
// Both fields are always present in the JSON encoding.
type AgentSkillFileData struct {
// Path is the file's path as stored with the skill.
Path string `json:"path"`
// Content is the file's content.
Content string `json:"content"`
}
// computeChatElapsedMs measures, in milliseconds, how long the user waited
// for a chat task: from task creation (user hit send) to its terminal state
// (completed/failed). The value is stored on the assistant chat_message so
// the UI can render "Replied in 38s" / "Failed after 12s".
//
// created_at is used rather than started_at because users experience the
// total wait, including queue + dispatch, not just the daemon's run time.
//
// The result is invalid (Valid == false) when either timestamp is missing,
// and a negative difference is clamped to 0.
func computeChatElapsedMs(task db.AgentTaskQueue) pgtype.Int8 {
	var elapsed pgtype.Int8
	if task.CompletedAt.Valid && task.CreatedAt.Valid {
		elapsed.Valid = true
		elapsed.Int64 = task.CompletedAt.Time.Sub(task.CreatedAt.Time).Milliseconds()
		if elapsed.Int64 < 0 {
			elapsed.Int64 = 0
		}
	}
	return elapsed
}
// priorityToInt maps a priority name to its numeric weight: "low" is 1,
// "medium" is 2, "high" is 3 and "urgent" is 4. Any other value, including
// the empty string, maps to 0. Matching is exact and case-sensitive.
func priorityToInt(p string) int32 {
	// Ordered from lowest to highest; a name's weight is its position plus one.
	ranked := [...]string{"low", "medium", "high", "urgent"}
	for i, name := range ranked {
		if name == p {
			return int32(i + 1)
		}
	}
	return 0
}
// NotifyTaskEnqueued is the cross-package shim for callers outside
// TaskService (e.g. AutopilotService.dispatchRunOnly) that insert a
// row into agent_task_queue directly. Invalidates the empty-claim
// cache and kicks the daemon WS so the new task is claimed without
// waiting for the next poll.
//
// It first records the enqueue via captureTaskQueued and then calls
// notifyTaskAvailable. ctx is only passed to captureTaskQueued;
// notifyTaskAvailable takes no context of its own.
func (s *TaskService) NotifyTaskEnqueued(ctx context.Context, task db.AgentTaskQueue) {
s.captureTaskQueued(ctx, task)
s.notifyTaskAvailable(task)
}
// notifyTaskAvailable is the post-insert hook for a newly queued task. It
// does two things, in this order:
//
//  1. bumps the runtime's invalidation version, so any in-flight claim that
//     is about to write an "empty" verdict will have it rejected on read;
//  2. kicks the daemon WS (when a Wakeup is configured) so the daemon claims
//     without waiting for its next poll.
//
// The order matters: if the wakeup came first, the wakeup-driven claim could
// still read the current empty verdict and return null. Tasks without a valid
// RuntimeID are ignored entirely.
func (s *TaskService) notifyTaskAvailable(task db.AgentTaskQueue) {
	if !task.RuntimeID.Valid {
		return
	}
	runtimeKey := util.UUIDToString(task.RuntimeID)

	// Use a background context: the cache bump / wakeup must outlive
	// the request that created the task, otherwise an early client
	// disconnect could leave the empty verdict in place and stall the
	// just-queued task until the TTL expires. The cache itself bounds
	// every Redis call with a short timeout so a wedged Redis cannot
	// block enqueue.
	s.EmptyClaim.Bump(context.Background(), runtimeKey)

	if s.Wakeup != nil {
		s.Wakeup.NotifyTaskAvailable(runtimeKey, util.UUIDToString(task.ID))
	}
}
// broadcastTaskDispatch publishes a protocol.EventTaskDispatch event for task
// to the task's workspace.
//
// The payload starts from the task's context JSON (when it decodes to an
// object) and is then overlaid with task_id, runtime_id, issue_id, agent_id
// and, for chat tasks, chat_session_id — so these routing keys always win
// over same-named keys in the context.
//
// Nothing is published when the workspace cannot be resolved. A context blob
// that fails to decode is logged and the event is still sent without it.
func (s *TaskService) broadcastTaskDispatch(ctx context.Context, task db.AgentTaskQueue) {
	var payload map[string]any
	if task.Context != nil {
		if err := json.Unmarshal(task.Context, &payload); err != nil {
			// Best-effort: the routing keys below are what subscribers
			// need, so a bad context must not suppress the dispatch event.
			slog.Warn("task dispatch: context is not a JSON object, broadcasting without it",
				"task_id", util.UUIDToString(task.ID),
				"error", err,
			)
		}
	}
	if payload == nil {
		payload = map[string]any{}
	}
	payload["task_id"] = util.UUIDToString(task.ID)
	payload["runtime_id"] = util.UUIDToString(task.RuntimeID)
	payload["issue_id"] = util.UUIDToString(task.IssueID)
	payload["agent_id"] = util.UUIDToString(task.AgentID)
	// chat_session_id is the routing key the chat window uses to writethrough
	// `chatKeys.pendingTask` to status="running" the moment the daemon claims
	// the task. Without it the pill stays stuck at "Queued" until completion.
	if task.ChatSessionID.Valid {
		payload["chat_session_id"] = util.UUIDToString(task.ChatSessionID)
	}

	workspaceID := s.ResolveTaskWorkspaceID(ctx, task)
	if workspaceID == "" {
		return
	}
	s.Bus.Publish(events.Event{
		Type:        protocol.EventTaskDispatch,
		WorkspaceID: workspaceID,
		ActorType:   "system",
		ActorID:     "",
		Payload:     payload,
	})
}
// broadcastTaskEvent publishes a task lifecycle event of the given eventType
// to the task's workspace, with the system as actor. The payload carries
// task_id, agent_id, issue_id and status, plus chat_session_id for chat
// tasks. Nothing is published when the workspace cannot be resolved.
func (s *TaskService) broadcastTaskEvent(ctx context.Context, eventType string, task db.AgentTaskQueue) {
	wsID := s.ResolveTaskWorkspaceID(ctx, task)
	if wsID == "" {
		return
	}

	body := make(map[string]any, 5)
	body["task_id"] = util.UUIDToString(task.ID)
	body["agent_id"] = util.UUIDToString(task.AgentID)
	body["issue_id"] = util.UUIDToString(task.IssueID)
	body["status"] = task.Status
	if task.ChatSessionID.Valid {
		body["chat_session_id"] = util.UUIDToString(task.ChatSessionID)
	}

	evt := events.Event{
		Type:        eventType,
		WorkspaceID: wsID,
		ActorType:   "system",
		ActorID:     "",
		Payload:     body,
	}
	s.Bus.Publish(evt)
}
// ResolveTaskWorkspaceID works out which workspace a task belongs to by
// following whichever link the task carries, trying them in this order:
//
//   - issue tasks: the issue's workspace
//   - chat tasks: the chat session's workspace
//   - autopilot tasks: the workspace of the autopilot that owns the run
//   - quick-create tasks: the workspace recorded in the task's context JSON
//
// A link whose lookup fails is skipped and the next one is tried. The result
// is "" when nothing resolves, which callers treat as "not found".
func (s *TaskService) ResolveTaskWorkspaceID(ctx context.Context, task db.AgentTaskQueue) string {
	if task.IssueID.Valid {
		issue, err := s.Queries.GetIssue(ctx, task.IssueID)
		if err == nil {
			return util.UUIDToString(issue.WorkspaceID)
		}
	}

	if task.ChatSessionID.Valid {
		session, err := s.Queries.GetChatSession(ctx, task.ChatSessionID)
		if err == nil {
			return util.UUIDToString(session.WorkspaceID)
		}
	}

	if task.AutopilotRunID.Valid {
		run, runErr := s.Queries.GetAutopilotRun(ctx, task.AutopilotRunID)
		if runErr == nil {
			autopilot, apErr := s.Queries.GetAutopilot(ctx, run.AutopilotID)
			if apErr == nil {
				return util.UUIDToString(autopilot.WorkspaceID)
			}
		}
	}

	// A quick-create task has none of the links above; its workspace is kept
	// in the context JSONB. Before this fallback existed the function
	// returned "" for such tasks, which made requireDaemonTaskAccess answer
	// 404 on the daemon's /start, /progress, /complete and /fail calls and
	// silently dropped the task:dispatch / task:completed broadcasts — the
	// reason quick-create tasks looked stuck in "queued".
	quickCreate, isQuickCreate := s.parseQuickCreateContext(task)
	if !isQuickCreate {
		return ""
	}
	return quickCreate.WorkspaceID
}
// broadcastChatDone publishes protocol.EventChatDone for a chat task to the
// task's workspace, tagged with the task's chat session.
//
// The payload always carries the chat session ID and task ID. When msg is
// non-nil it also carries the message ID and content, the message's creation
// time (UTC, RFC 3339 with nanoseconds) if set, and the elapsed milliseconds
// if set. Nothing is published when the workspace cannot be resolved.
func (s *TaskService) broadcastChatDone(ctx context.Context, task db.AgentTaskQueue, msg *db.ChatMessage) {
	wsID := s.ResolveTaskWorkspaceID(ctx, task)
	if wsID == "" {
		return
	}

	sessionID := util.UUIDToString(task.ChatSessionID)
	done := protocol.ChatDonePayload{
		ChatSessionID: sessionID,
		TaskID:        util.UUIDToString(task.ID),
	}
	if msg != nil {
		done.MessageID = util.UUIDToString(msg.ID)
		done.Content = msg.Content
		if createdAt := msg.CreatedAt; createdAt.Valid {
			done.CreatedAt = createdAt.Time.UTC().Format(time.RFC3339Nano)
		}
		if elapsed := msg.ElapsedMs; elapsed.Valid {
			done.ElapsedMs = elapsed.Int64
		}
	}

	s.Bus.Publish(events.Event{
		Type:          protocol.EventChatDone,
		WorkspaceID:   wsID,
		ActorType:     "system",
		ActorID:       "",
		ChatSessionID: sessionID,
		Payload:       done,
	})
}
// broadcastIssueUpdated publishes protocol.EventIssueUpdated for issue to the
// issue's workspace, with the system as actor. The payload holds the issue
// rendered by issueToMap under the "issue" key, using the workspace's issue
// prefix for the identifier.
func (s *TaskService) broadcastIssueUpdated(issue db.Issue) {
	issueJSON := issueToMap(issue, s.getIssuePrefix(issue.WorkspaceID))
	evt := events.Event{
		Type:        protocol.EventIssueUpdated,
		WorkspaceID: util.UUIDToString(issue.WorkspaceID),
		ActorType:   "system",
		ActorID:     "",
		Payload:     map[string]any{"issue": issueJSON},
	}
	s.Bus.Publish(evt)
}
// getIssuePrefix returns the issue prefix configured on the given workspace,
// or "" when the workspace lookup fails. The lookup runs on a background
// context rather than a caller-supplied one.
func (s *TaskService) getIssuePrefix(workspaceID pgtype.UUID) string {
	if workspace, err := s.Queries.GetWorkspace(context.Background(), workspaceID); err == nil {
		return workspace.IssuePrefix
	}
	return ""
}
// createAgentComment posts a comment on an issue on behalf of an agent and
// broadcasts protocol.EventCommentCreated to the issue's workspace.
//
// Parameters:
//   - issueID: the issue to comment on.
//   - agentID: the authoring agent (recorded as author and event actor).
//   - content: comment body; an empty string is a no-op.
//   - commentType: stored as the comment's type.
//   - parentID: optional comment being replied to. If it is itself a reply,
//     the new comment is attached to that reply's parent so it lands in the
//     top-level thread.
//
// Bare issue identifiers in content are expanded into mention links before
// the comment is stored. After the comment is created, the thread root (if
// any) is passed to AutoUnresolveThreadOnReply.
//
// The function is best-effort and returns nothing: if the issue lookup or
// the insert fails, the failure is logged and no event is published.
func (s *TaskService) createAgentComment(ctx context.Context, issueID, agentID pgtype.UUID, content, commentType string, parentID pgtype.UUID) {
	if content == "" {
		return
	}
	// Look up issue to get workspace ID for mention expansion and broadcasting.
	issue, err := s.Queries.GetIssue(ctx, issueID)
	if err != nil {
		slog.Warn("agent comment: issue lookup failed, comment dropped",
			"issue_id", util.UUIDToString(issueID),
			"agent_id", util.UUIDToString(agentID),
			"error", err,
		)
		return
	}

	// Resolve thread root: if parentID points to a reply (has its own parent),
	// use that parent instead so the comment lands in the top-level thread.
	// rootComment captures the root row so we can auto-unresolve it after the
	// reply is committed (see AutoUnresolveThreadOnReply).
	var rootComment *db.Comment
	if parentID.Valid {
		if parent, err := s.Queries.GetComment(ctx, parentID); err == nil {
			if parent.ParentID.Valid {
				if root, err := s.Queries.GetComment(ctx, parent.ParentID); err == nil {
					rootComment = &root
					parentID = root.ID
				}
			} else {
				rootComment = &parent
			}
		}
	}

	// Expand bare issue identifiers (e.g. MUL-117) into mention links.
	content = mention.ExpandIssueIdentifiers(ctx, s.Queries, issue.WorkspaceID, content)
	comment, err := s.Queries.CreateComment(ctx, db.CreateCommentParams{
		IssueID:     issueID,
		WorkspaceID: issue.WorkspaceID,
		AuthorType:  "agent",
		AuthorID:    agentID,
		Content:     content,
		Type:        commentType,
		ParentID:    parentID,
	})
	if err != nil {
		slog.Warn("agent comment: create failed, comment dropped",
			"issue_id", util.UUIDToString(issueID),
			"agent_id", util.UUIDToString(agentID),
			"error", err,
		)
		return
	}

	s.Bus.Publish(events.Event{
		Type:        protocol.EventCommentCreated,
		WorkspaceID: util.UUIDToString(issue.WorkspaceID),
		ActorType:   "agent",
		ActorID:     util.UUIDToString(agentID),
		Payload: map[string]any{
			"comment": map[string]any{
				"id":          util.UUIDToString(comment.ID),
				"issue_id":    util.UUIDToString(comment.IssueID),
				"author_type": comment.AuthorType,
				"author_id":   util.UUIDToString(comment.AuthorID),
				"content":     comment.Content,
				"type":        comment.Type,
				"parent_id":   util.UUIDToPtr(comment.ParentID),
				// The trailing "Z" in this layout is a literal, not a zone
				// directive, so the time must be converted to UTC first or a
				// local wall-clock value would be mislabelled as UTC.
				"created_at": comment.CreatedAt.Time.UTC().Format("2006-01-02T15:04:05Z"),
			},
			"issue_title":  issue.Title,
			"issue_status": issue.Status,
		},
	})

	s.AutoUnresolveThreadOnReply(ctx, rootComment, util.UUIDToString(issue.WorkspaceID), "agent", util.UUIDToString(agentID))
}
// AutoUnresolveThreadOnReply reopens a resolved thread when a reply lands in
// it: it clears resolved_at on the thread root and broadcasts
// protocol.EventCommentUnresolved with the updated root comment.
//
// parent is the thread root; a nil parent or one that is not resolved makes
// this a no-op. workspaceID, actorType and actorID are copied onto the
// published event.
//
// Both the user-facing Handler.CreateComment path and the agent-facing
// TaskService.createAgentComment path go through here, so the
// resolved-then-replied state cannot drift between them (one of the bugs
// Emacs flagged on PR #2300). A failed update is only logged: the reply has
// already been committed, and the mismatch is recoverable on the next read.
func (s *TaskService) AutoUnresolveThreadOnReply(ctx context.Context, parent *db.Comment, workspaceID, actorType, actorID string) {
	if parent == nil {
		return
	}
	if !parent.ResolvedAt.Valid {
		return
	}

	reopened, err := s.Queries.UnresolveComment(ctx, parent.ID)
	if err != nil {
		slog.Warn("auto-unresolve on reply failed", "error", err, "comment_id", util.UUIDToString(parent.ID))
		return
	}

	commentJSON := map[string]any{
		"id":               util.UUIDToString(reopened.ID),
		"issue_id":         util.UUIDToString(reopened.IssueID),
		"author_type":      reopened.AuthorType,
		"author_id":        util.UUIDToString(reopened.AuthorID),
		"content":          reopened.Content,
		"type":             reopened.Type,
		"parent_id":        util.UUIDToPtr(reopened.ParentID),
		"created_at":       util.TimestampToString(reopened.CreatedAt),
		"updated_at":       util.TimestampToString(reopened.UpdatedAt),
		"resolved_at":      util.TimestampToPtr(reopened.ResolvedAt),
		"resolved_by_type": util.TextToPtr(reopened.ResolvedByType),
		"resolved_by_id":   util.UUIDToPtr(reopened.ResolvedByID),
	}
	s.Bus.Publish(events.Event{
		Type:        protocol.EventCommentUnresolved,
		WorkspaceID: workspaceID,
		ActorType:   actorType,
		ActorID:     actorID,
		Payload:     map[string]any{"comment": commentJSON},
	})
}
// issueToMap renders an issue as the map used in broadcast payloads. The
// "identifier" entry is issuePrefix, a dash, and the issue number (for
// example "MUL-117"); nullable columns are rendered through the util *ToPtr
// helpers.
func issueToMap(issue db.Issue, issuePrefix string) map[string]any {
	identifier := issuePrefix + "-" + strconv.Itoa(int(issue.Number))

	out := map[string]any{
		"id":           util.UUIDToString(issue.ID),
		"workspace_id": util.UUIDToString(issue.WorkspaceID),
		"number":       issue.Number,
		"identifier":   identifier,
		"title":        issue.Title,
		"description":  util.TextToPtr(issue.Description),
		"status":       issue.Status,
		"priority":     issue.Priority,
	}

	// People: assignee is optional, creator is always set.
	out["assignee_type"] = util.TextToPtr(issue.AssigneeType)
	out["assignee_id"] = util.UUIDToPtr(issue.AssigneeID)
	out["creator_type"] = issue.CreatorType
	out["creator_id"] = util.UUIDToString(issue.CreatorID)

	// Hierarchy and ordering.
	out["parent_issue_id"] = util.UUIDToPtr(issue.ParentIssueID)
	out["position"] = issue.Position

	// Dates.
	out["start_date"] = util.TimestampToPtr(issue.StartDate)
	out["due_date"] = util.TimestampToPtr(issue.DueDate)
	out["created_at"] = util.TimestampToString(issue.CreatedAt)
	out["updated_at"] = util.TimestampToString(issue.UpdatedAt)

	return out
}
// parseQuickCreateContext extracts the quick-create payload from a task's
// context JSONB. The boolean is true only when the context decodes and its
// type equals QuickCreateContextType; in every other case the zero
// QuickCreateContext and false are returned so callers can short-circuit.
//
// A task linked to an issue, chat session or autopilot run is never treated
// as quick-create, even if it happens to carry a context blob, so those are
// rejected before any decoding.
func (s *TaskService) parseQuickCreateContext(task db.AgentTaskQueue) (QuickCreateContext, bool) {
	var none QuickCreateContext

	linked := task.IssueID.Valid || task.ChatSessionID.Valid || task.AutopilotRunID.Valid
	if linked || len(task.Context) == 0 {
		return none, false
	}

	var parsed QuickCreateContext
	if err := json.Unmarshal(task.Context, &parsed); err != nil || parsed.Type != QuickCreateContextType {
		return none, false
	}
	return parsed, true
}
// notifyQuickCreateCompleted writes a success inbox notification to the
// requester pointing at the issue the agent just created. The issue is
// stamped with origin_type=quick_create + origin_id=<task_id> by the
// daemon-injected MULTICA_QUICK_CREATE_TASK_ID env var, so this lookup is
// deterministic — robust against the same agent creating other issues in
// parallel (e.g. assignment task running while max_concurrent_tasks > 1
// permits another quick-create alongside it).
//
// Parameters:
//   - task: the finished quick-create task; its ID is the origin_id used to
//     find the issue, and its AgentID is recorded as the actor.
//   - qc: the task's quick-create context, supplying the requester ID,
//     workspace ID and original prompt.
//
// Steps, in order:
//  1. Parse the requester and workspace IDs from qc; log and stop if either
//     is invalid.
//  2. Look up the issue by origin. On any lookup error, delegate to
//     notifyQuickCreateFailed and stop.
//  3. Link the task to the issue (best-effort, failure is logged).
//  4. Subscribe the requester to the issue (best-effort); on success publish
//     protocol.EventSubscriberAdded.
//  5. Create a "quick_create_done" inbox item for the requester and publish
//     it through publishQuickCreateInbox. An inbox write failure is logged
//     and nothing is published.
func (s *TaskService) notifyQuickCreateCompleted(ctx context.Context, task db.AgentTaskQueue, qc QuickCreateContext) {
requesterID, err := util.ParseUUID(qc.RequesterID)
if err != nil {
slog.Warn("quick-create completion: invalid requester id", "task_id", util.UUIDToString(task.ID), "error", err)
return
}
workspaceID, err := util.ParseUUID(qc.WorkspaceID)
if err != nil {
slog.Warn("quick-create completion: invalid workspace id", "task_id", util.UUIDToString(task.ID), "error", err)
return
}
issue, err := s.Queries.GetIssueByOrigin(ctx, db.GetIssueByOriginParams{
WorkspaceID: workspaceID,
OriginType: pgtype.Text{String: "quick_create", Valid: true},
OriginID: task.ID,
})
if err != nil {
// No issue created — agent ran to completion but the CLI call must
// have failed. Surface as a failure inbox so the user sees something.
// NOTE(review): every lookup error takes this path, not only
// "no rows" — a transient DB error is also reported as a failure.
slog.Warn("quick-create completion: no issue found, writing failure inbox",
"task_id", util.UUIDToString(task.ID),
"agent_id", util.UUIDToString(task.AgentID),
"workspace_id", qc.WorkspaceID,
)
s.notifyQuickCreateFailed(ctx, task, qc, "agent finished without creating an issue")
return
}
// Link the new issue back to this task so subsequent reads of the task
// (Activity tab, Recent work, etc.) render it as a normal issue task
// (kind = "direct") instead of staying on the "Creating issue" active-
// wording label. Best-effort: a write failure here doesn't block the
// inbox notification, which is the more important signal to the user.
if err := s.Queries.LinkTaskToIssue(ctx, db.LinkTaskToIssueParams{
ID: task.ID,
IssueID: issue.ID,
}); err != nil {
slog.Warn("quick-create completion: link task→issue failed",
"task_id", util.UUIDToString(task.ID),
"issue_id", util.UUIDToString(issue.ID),
"error", err,
)
}
// Subscribe the requester so they receive notifications for follow-up
// comments and updates. The DB row's creator_type/creator_id is the
// agent (it ran the CLI), but the human who triggered the quick-create
// is the semantic creator from a UX perspective — without this they
// only see the one-shot completion inbox and miss everything after.
// Best-effort: log on failure but don't block the inbox notification.
if err := s.Queries.AddIssueSubscriber(ctx, db.AddIssueSubscriberParams{
IssueID: issue.ID,
UserType: "member",
UserID: requesterID,
Reason: "creator",
}); err != nil {
slog.Warn("quick-create completion: subscribe requester failed",
"task_id", util.UUIDToString(task.ID),
"issue_id", util.UUIDToString(issue.ID),
"requester_id", qc.RequesterID,
"error", err,
)
} else {
// Only announce the subscription once the row was actually written.
s.Bus.Publish(events.Event{
Type: protocol.EventSubscriberAdded,
WorkspaceID: qc.WorkspaceID,
ActorType: "agent",
ActorID: util.UUIDToString(task.AgentID),
Payload: map[string]any{
"issue_id": util.UUIDToString(issue.ID),
"user_type": "member",
"user_id": qc.RequesterID,
"reason": "creator",
},
})
}
// Build the human-readable identifier (prefix-number) stored in the inbox
// item's details. getIssuePrefix returns "" on lookup failure, in which
// case the identifier starts with a bare dash.
prefix := s.getIssuePrefix(workspaceID)
identifier := fmt.Sprintf("%s-%d", prefix, issue.Number)
// The marshal error is discarded; the map holds only string values here.
details, _ := json.Marshal(map[string]any{
"task_id": util.UUIDToString(task.ID),
"agent_id": util.UUIDToString(task.AgentID),
"issue_id": util.UUIDToString(issue.ID),
"identifier": identifier,
"original_prompt": qc.Prompt,
})
item, err := s.Queries.CreateInboxItem(ctx, db.CreateInboxItemParams{
WorkspaceID: workspaceID,
RecipientType: "member",
RecipientID: requesterID,
Type: "quick_create_done",
Severity: "info",
IssueID: issue.ID,
Title: issue.Title,
Body: pgtype.Text{},
ActorType: pgtype.Text{String: "agent", Valid: true},
ActorID: task.AgentID,
Details: details,
})
if err != nil {
slog.Error("quick-create completion: inbox write failed", "task_id", util.UUIDToString(task.ID), "error", err)
return
}
s.publishQuickCreateInbox(item, qc.WorkspaceID, util.UUIDToString(task.AgentID), issue.Status)
}
// notifyQuickCreateFailed writes a failure inbox notification carrying the
// original prompt + agent ID so the frontend can render an "Edit as
// advanced form" entry that pre-fills the legacy create-issue modal
// without asking the user to retype.
//
// Parameters:
//   - task: the failed quick-create task; its AgentID is recorded as actor.
//   - qc: the task's quick-create context (requester, workspace, prompt).
//   - errMsg: the failure reason; an empty string is replaced by a generic
//     message. It is passed through redact.Text before being stored in the
//     item's body and details.
//
// The inbox item has type "quick_create_failed" and severity
// "action_required", with no linked issue. It is published through
// publishQuickCreateInbox. Invalid requester/workspace IDs and inbox write
// failures are logged and end the call without publishing.
func (s *TaskService) notifyQuickCreateFailed(ctx context.Context, task db.AgentTaskQueue, qc QuickCreateContext, errMsg string) {
	taskID := util.UUIDToString(task.ID)
	agentID := util.UUIDToString(task.AgentID)

	// Log invalid IDs the same way notifyQuickCreateCompleted does; without
	// this the user gets no inbox item and the server leaves no trace.
	requesterID, err := util.ParseUUID(qc.RequesterID)
	if err != nil {
		slog.Warn("quick-create failure: invalid requester id", "task_id", taskID, "error", err)
		return
	}
	workspaceID, err := util.ParseUUID(qc.WorkspaceID)
	if err != nil {
		slog.Warn("quick-create failure: invalid workspace id", "task_id", taskID, "error", err)
		return
	}

	if errMsg == "" {
		errMsg = "Quick create did not finish successfully"
	}
	// Redact once and reuse, so body and details always carry the same text.
	safeMsg := redact.Text(errMsg)

	// The marshal error is discarded: every value in this map is a string,
	// which encoding/json can always encode.
	details, _ := json.Marshal(map[string]any{
		"task_id":         taskID,
		"agent_id":        agentID,
		"original_prompt": qc.Prompt,
		"error":           safeMsg,
	})

	item, err := s.Queries.CreateInboxItem(ctx, db.CreateInboxItemParams{
		WorkspaceID:   workspaceID,
		RecipientType: "member",
		RecipientID:   requesterID,
		Type:          "quick_create_failed",
		Severity:      "action_required",
		IssueID:       pgtype.UUID{},
		Title:         "Quick create failed",
		Body:          pgtype.Text{String: safeMsg, Valid: true},
		ActorType:     pgtype.Text{String: "agent", Valid: true},
		ActorID:       task.AgentID,
		Details:       details,
	})
	if err != nil {
		slog.Error("quick-create failure: inbox write failed", "task_id", taskID, "error", err)
		return
	}
	s.publishQuickCreateInbox(item, qc.WorkspaceID, agentID, "")
}
// publishQuickCreateInbox pushes a freshly written inbox item over the event
// bus as protocol.EventInboxNew, so the requester's inbox list refreshes
// without a reload. The payload shape follows the one used by the other
// inbox listeners (notification_listeners.go).
//
// workspaceID routes the event, agentID is recorded as the acting agent, and
// issueStatus is passed through verbatim as "issue_status" (callers pass ""
// when there is no issue).
func (s *TaskService) publishQuickCreateInbox(item db.InboxItem, workspaceID, agentID, issueStatus string) {
	itemJSON := map[string]any{
		// Identity and routing.
		"id":             util.UUIDToString(item.ID),
		"workspace_id":   util.UUIDToString(item.WorkspaceID),
		"recipient_type": item.RecipientType,
		"recipient_id":   util.UUIDToString(item.RecipientID),
		// Content.
		"type":     item.Type,
		"severity": item.Severity,
		"issue_id": util.UUIDToPtr(item.IssueID),
		"title":    item.Title,
		"body":     util.TextToPtr(item.Body),
		// State.
		"read":       item.Read,
		"archived":   item.Archived,
		"created_at": util.TimestampToString(item.CreatedAt),
		// Origin.
		"actor_type":   util.TextToPtr(item.ActorType),
		"actor_id":     util.UUIDToPtr(item.ActorID),
		"details":      json.RawMessage(item.Details),
		"issue_status": issueStatus,
	}

	evt := events.Event{
		Type:        protocol.EventInboxNew,
		WorkspaceID: workspaceID,
		ActorType:   "agent",
		ActorID:     agentID,
		Payload:     map[string]any{"item": itemJSON},
	}
	s.Bus.Publish(evt)
}
// agentToMap renders an agent as the map used when broadcasting agent status
// updates. runtime_config is the decoded form of the agent's stored JSON (nil
// when absent or undecodable), and skills is always an empty list.
func agentToMap(a db.Agent) map[string]any {
	var runtimeConfig any
	if a.RuntimeConfig != nil {
		// A decode failure is deliberately ignored: runtimeConfig then stays
		// nil and is emitted as such.
		_ = json.Unmarshal(a.RuntimeConfig, &runtimeConfig)
	}

	out := map[string]any{
		"id":           util.UUIDToString(a.ID),
		"workspace_id": util.UUIDToString(a.WorkspaceID),
		"runtime_id":   util.UUIDToString(a.RuntimeID),
		"name":         a.Name,
		"description":  a.Description,
		"avatar_url":   util.TextToPtr(a.AvatarUrl),
	}

	// Runtime and scheduling settings.
	out["runtime_mode"] = a.RuntimeMode
	out["runtime_config"] = runtimeConfig
	out["visibility"] = a.Visibility
	out["status"] = a.Status
	out["max_concurrent_tasks"] = a.MaxConcurrentTasks
	out["owner_id"] = util.UUIDToPtr(a.OwnerID)
	out["skills"] = []any{}

	// Lifecycle timestamps.
	out["created_at"] = util.TimestampToString(a.CreatedAt)
	out["updated_at"] = util.TimestampToString(a.UpdatedAt)
	out["archived_at"] = util.TimestampToPtr(a.ArchivedAt)
	out["archived_by"] = util.UUIDToPtr(a.ArchivedBy)

	return out
}