Files
LinYushen cb68669c73 feat(composio): gate MCP apps behind feature flag (#4876)
* feat(composio): server-side connect flow + connections REST (Notion MVP) (MUL-3720) (#4608)

* feat(composio): server-side connect flow + connections REST (Notion MVP) (MUL-3720)

Compose the merged server/pkg/composio SDK into a user-facing connection
manager: signed-state connect handshake, local user_composio_connection
mirror, idempotent disconnect, and a per-user MCP session helper (not yet
wired into task dispatch).

- migration 127_user_composio_connection (no FK/cascade, per DB rules)
- sqlc queries: upsert (idempotent on user_id+connected_account_id), list
  active, owner-scoped get, mark revoked
- internal/integrations/composio: signed HMAC-SHA256 state, BeginConnect,
  CompleteCallback (idempotent upsert), ListConnections, Disconnect
  (upstream 404 = idempotent success), CreateMCPSession (no-op when empty,
  pins connected_accounts per toolkit), CallbackRedirect
- REST handlers under /api/integrations/composio (user-scoped, 503 when
  COMPOSIO_API_KEY unset): connect/init, callback (302), connections list,
  delete
- router wiring gated by COMPOSIO_API_KEY; COMPOSIO_AUTH_CONFIGS_JSON maps
  toolkit->auth_config (MVP: notion); state secret from COMPOSIO_STATE_SECRET
  or derived from JWT_SECRET; callback base from COMPOSIO_CALLBACK_BASE_URL
  or MULTICA_PUBLIC_URL
- tests: state (expire/tamper/wrong-secret), service (mapping, callback
  idempotency, non-success, disconnect owner/404 idempotency, MCP pin),
  handlers (httptest), redact regression for Bearer mcp_ tokens

MVP scope: Notion only; no task-dispatch overlay, sharing, or webhook
event handling (later stages).

Co-authored-by: multica-agent <github@multica.ai>

* fix(composio): bind callback account to user + idempotent revoked disconnect (MUL-3720)

Address PR 4608 review (CHANGES_REQUESTED):

- callback: verify connected_account_id with Composio before mirroring it.
  The signed state only proved user/toolkit/exp, so a valid state paired with
  a tampered connected_account_id would be written verbatim. CompleteCallback
  now calls ListConnectedAccounts and fails closed (ErrAccountVerification)
  unless the account belongs to the state's user (composio_user_id == multica
  user id) and was created under the toolkit's auth config. No row is written
  on mismatch / unknown account / upstream error.

- disconnect: short-circuit to a no-op when the local row is already revoked,
  before touching upstream. Previously a second DELETE re-hit Composio and a
  non-404 upstream error surfaced as a 502, breaking the 204-idempotent
  contract.

- CreateMCPSession: document the v1 single-active-connection-per-(user,toolkit)
  constraint and make duplicate selection deterministic (newest-wins, rows are
  connected_at DESC) instead of order-dependent map overwrite. Stage 3 owns the
  real single-account-enforcement vs multi-account-shape decision.

Tests: tampered/wrong-auth-config/unknown-account callback rejection, revoked-row
disconnect no-op (asserts upstream not re-hit). composio pkg 85% coverage; all
green.

Co-authored-by: multica-agent <github@multica.ai>

* feat(composio): list all toolkits + dynamic auth-config resolution (MUL-3720)

Yushen's follow-up to the Notion MVP: surface the full Composio toolkit
catalog, render it in Settings, and drop the static env mapping in favor of
dynamic auth-config discovery.

Config correctness (per Composio docs):
- Remove COMPOSIO_AUTH_CONFIGS_JSON entirely. The toolkit→auth_config mapping
  is now resolved at request time from the project's /auth_configs (cached,
  5-min TTL), so enabling a toolkit is a dashboard action, not a redeploy.
- Do NOT add COMPOSIO_PROJECT_ID. The project API key (x-api-key) authenticates
  to exactly one project; the project is resolved from the key. Only org-level
  endpoints use x-org-api-key, which this integration never calls.

Backend:
- SDK: server/pkg/composio/auth_configs.go — ListAuthConfigs (toolkit_slug,
  is_composio_managed, show_disabled, limit, cursor).
- service: dynamic resolver (authConfigMap cache; betterAuthConfig prefers a
  custom/white-label config over Composio-managed, newest wins); BeginConnect
  and CompleteCallback resolve via it; ListToolkits fetches the full catalog
  (paginated, capped) annotated with connectable = has an enabled auth config,
  connectable-first ordering.
- handler + route: GET /api/integrations/composio/toolkits (user-scoped, 503
  when COMPOSIO_API_KEY unset) returning slug/name/logo/category/connectable.

Frontend:
- core: ComposioToolkit/ComposioConnection types, api client methods, and
  composio query options (@multica/core/composio).
- views: Settings → Integrations now has a Composio section rendering every
  toolkit as a card with search. Connect is gated on `connectable`;
  non-connectable toolkits show a muted "not configured" hint instead of a
  dead button. Connected toolkits show a badge + Disconnect (with confirm).
- i18n: composio block added to en/zh-Hans/ja/ko settings.

Tests: SDK + service (dynamic resolution, custom-over-managed preference,
connectable flag, resolver-error soft-degrade) and handler toolkits endpoint;
composio pkg 85.7% coverage. go build/vet/gofmt clean; core+views typecheck,
core+views lint, and core tests (691) all green.

Co-authored-by: multica-agent <github@multica.ai>

* fix(composio): close cross-toolkit callback fail-open by signing auth_config_id into state (MUL-3720)

Re-review blocker: CompleteCallback resolved the toolkit's auth config at
callback time and ignored a resolve error/empty result, while
verifyAccountOwnership skipped the auth-config comparison when the expected
value was empty. A user could then pass another toolkit's connected_account_id
into this toolkit's callback — the owner check passed and it was written under
the wrong toolkit_slug/account binding.

Fix: the auth_config_id is already resolved in BeginConnect (before the state
is signed), so sign it into the state and compare it exactly at callback. No
re-resolve, no fail-open. verifyAccountOwnership now fails closed when the
expected auth config is empty (rejects instead of skipping) and requires an
exact match — closing the cross-toolkit binding gap.

Tests: state round-trips auth_config_id; BeginConnect signs it; callback
rejects wrong/cross-toolkit auth config and an empty (no-mapping) auth config
fails closed. composio pkg 85.2% coverage, all green.

Frontend (non-blocking): the Composio settings tab now surfaces an error when
the connections query fails instead of silently rendering everything as
unconnected.

Co-authored-by: multica-agent <github@multica.ai>

* fix(composio): hide Settings section entirely when integration unconfigured (MUL-3720)

Decision (option 2, hide-then-merge): don't show a card that leaks the internal
COMPOSIO_API_KEY env-var name to every end user. IntegrationsTab now gates the
whole Composio section (heading + body) on the toolkits query — a 503 means the
key is unset, so the section is withheld instead of rendering the not-configured
card. Admin-only setup guidance is a later, role-gated affordance.

Removed the notConfigured card (and now-unused ApiError import) from
ComposioTab; it only mounts when configured. views typecheck + lint clean.

Co-authored-by: multica-agent <github@multica.ai>

---------

Co-authored-by: multica-agent <github@multica.ai>

* feat(composio): Stage 2 frontend polish — callback toast, last_used & expired UI, e2e (MUL-3718) (#4688)

* feat(composio): callback toast + refresh, last_used & expired UI, e2e (MUL-3718)

Co-authored-by: multica-agent <github@multica.ai>

* fix(composio): real callback redirect route + StrictMode-safe toast dedup (MUL-3718 review)

Co-authored-by: multica-agent <github@multica.ai>

---------

Co-authored-by: multica-agent <github@multica.ai>

* fix(composio): callback endpoint should not require Multica auth (MUL-3843) (#4709)

* fix(composio): move OAuth callback out of the Auth group (MUL-3843)

Composio 302-redirects the browser to /api/integrations/composio/callback
at the end of the OAuth flow, but PR #4608 mounted it inside the cookie-auth
middleware group. When the session cookie is absent (expired session,
SameSite=Strict / Safari ITP, private window, self-hosted callback subdomain)
the Auth middleware returned a hard 401 and a JSON blob instead of the
settings redirect, breaking the flow.

Identity never came from the cookie anyway: it is carried by the HMAC-signed
state param that CompleteCallback verifies (signature, expiry, replay) and
cross-checked by verifyAccountOwnership; h.Composio == nil still 503s. So the
callback is registered alongside the other public OAuth/webhook routes; the
other four composio endpoints stay session-gated.

Refs MUL-3843, MUL-3715.

Co-authored-by: multica-agent <github@multica.ai>

* fix(composio): correct stale callback routing comments (MUL-3843)

The package header and ComposioCallback doc comments still described the
callback as sitting under the Auth middleware group. After the route was
moved out (this PR), update both to state it is a public route whose identity
comes from the signed state — addressing review nit from 张大彪.

Refs MUL-3843.

Co-authored-by: multica-agent <github@multica.ai>

---------

Co-authored-by: multica-agent <github@multica.ai>

* feat(composio): inject MCP overlay into agent runtime at task dispatch (MUL-3721) (#4704)

Stage 3 of the Composio epic. Wires the per-user Composio MCP session into
every agent task so the agent process sees the initiator's connected tools
without any prompt-time plumbing.

Server side
  - Migration 128 adds agent_task_queue.runtime_mcp_overlay JSONB plus a
    BEFORE-UPDATE trigger that wipes the column on any transition into a
    terminal status (completed / failed / cancelled). A trigger is the single
    source of truth — future queries that flip status cannot bypass it.
  - composio.Service.BuildTaskOverlay(userID) reuses CreateMCPSession and
    emits the Claude-style { mcpServers: { composio: { type: http, url,
    headers } } } shape the daemon's existing sidecar generators consume.
    Returns (nil, nil) on zero active connections so we never burn a
    Composio session for a user with nothing to call.
  - TaskService grows a Composio ComposioOverlayBuilder seam, wired in
    router.go after composiointeg.NewService succeeds. Five enqueue paths
    (issue / mention / quick-create / chat / auto-retry) attach the overlay
    after CreateAgentTask returns and before the daemon is notified — so
    every claim reads a settled row, with no second daemon hop. Best-effort:
    a builder failure logs and proceeds with no overlay.
  - resolveInitiatorFromTriggerComment derives the initiator user from the
    trigger comment when it was authored by a member. Agent-authored
    triggers are not treated as initiators (their connected-apps view is
    empty by construction).

Daemon side
  - handler/daemon.go claim path merges task.runtime_mcp_overlay onto
    agent.mcp_config via mergeMCPOverlay before populating
    TaskAgentData.McpConfig. Overlay wins on server-name collisions
    because it carries the live user-scoped session URL. Errors fall back
    to the agent config unchanged — a bad overlay must not surprise-disable
    saved MCP tools. The existing execenv sidecar generators (cursor /
    codex / openclaw / opencode / hermes-kiro) need no changes: they keep
    consuming the merged result through TaskAgentData.McpConfig.

Tests
  - 9 merge cases (mcp_overlay_test): both-nil short-circuit, agent-only
    pass-through, overlay-only canonicalization, two-side merge, name
    collision (overlay wins), top-level key preservation, malformed agent
    fallback, malformed overlay fallback, non-object server rejection.
  - 4 dispatch cases (composio): zero-connections returns nil without
    CreateSession, happy-path emits the right shape with the right user
    id, empty-URL defensive branch, SDK error surfacing.
  - 4 TaskService helper cases: nil Composio is a no-op (Queries-safe),
    invalid initiator does not call the builder, nil overlay skips the
    UPDATE, builder error swallowed without panic.
  - Migration 128 verified to roll up + down + up cleanly against the test
    database.

Out of scope (deferred): assignment-triggered enqueue paths with no
trigger comment get no overlay attached today (no initiator UUID flows
through enqueueIssueTask in that case). Retry paths recompute the overlay
fresh from the parent's initiator_user_id instead of inheriting the bearer
from the parent row, so a stale token can never resurface on a retry.

Co-authored-by: Eve <eve@multica.ai>
Co-authored-by: multica-agent <github@multica.ai>

* feat(composio): per-agent allowlist + originator-scoped MCP overlay (MUL-3869) (#4736)

* feat(composio): per-agent allowlist + originator-scoped MCP overlay (MUL-3869)

Stage 3.1 of the Composio epic (MUL-3721 parent). PR #4704 wired in the
runtime_mcp_overlay column and a per-task dispatch hook; this change
inverts the default from "all-on" to opt-in and locks the overlay to the
agent owner's own connected apps:

- Agents carry composio_toolkit_allowlist TEXT[]. NULL or [] => no MCP.
  Owner-only read/write; non-owner GET/PUT silently redacts/drops the
  field (same shape as mcp_config).
- agent_task_queue carries originator_user_id UUID. Set from the
  top-of-chain HUMAN at every enqueue path:
    * issue/mention comment by member  -> author_id
    * issue/mention comment by agent   -> inherit via comment.source_task_id
                                          -> parent task originator_user_id
    * quick-create                     -> requester_id
    * chat                             -> initiator_user_id
    * retry                            -> SQL-inherited from parent row
    * autopilot                        -> NULL (system-driven)
- BuildTaskOverlay (composio dispatch) now takes (ctx, originatorUserID,
  agent) and short-circuits on five gates: invalid originator,
  originator != agent.owner_id, empty allowlist, empty intersection of
  allowlist ∩ active connections, defensive empty session URL. Composio
  CreateSession is called with BOTH `toolkits.slugs` (the intersection)
  AND `connected_accounts` (the pinned account ids), narrowing the
  tool-router twice.
- The originator-vs-owner gate closes the agent-fanout privacy hole: any
  workspace member who can @-mention a public agent used to project the
  owner's connected apps into their run. Now the overlay only mounts
  when the human at the top of the chain IS the agent owner.

Tests:
- dispatch_test.go covers all 5 gates plus uppercase/whitespace slug
  normalisation.
- task_runtime_mcp_overlay_test.go covers the no-op gates of the new
  applyRuntimeMCPOverlay signature.
- agent_composio_allowlist_test.go (handler): owner roundtrip
  (list/empty/null), workspace-admin silent-drop, owner-only GET
  visibility, pure normaliseComposioToolkitAllowlist.
- resolve_originator_test.go (service, DB-backed): member-authored,
  agent-authored inherits via comment.source_task_id, invalid id.

Migration 129 up/down/up verified against docker postgres.

Co-authored-by: multica-agent <github@multica.ai>

* chore(composio): gofmt + regenerate sqlc with v1.31.1 (MUL-3869 review nits)

Co-authored-by: multica-agent <github@multica.ai>

---------

Co-authored-by: Eve <eve@multica-ai.local>
Co-authored-by: multica-agent <github@multica.ai>

* fix(composio): accept nested connected account auth config

* feat(views): creator-only MCP tab for per-agent Composio allowlist (MUL-3870) (#4743)

Stage 3.2 frontend on top of the Stage 3.1 backend (MUL-3869, 4708dba97).
Adds an agent-detail tab that lets the agent owner pick which of their own
active Composio connections this agent may mount as MCP servers, writing the
selection to agent.composio_toolkit_allowlist via the existing PUT /api/agents.

- core/types: composio_toolkit_allowlist (+ _redacted) on Agent; tri-state
  composio_toolkit_allowlist on UpdateAgentRequest (omit/no-change, null/clear,
  array/replace), matching the backend contract.
- core/agents: useUpdateAgentAllowlist - optimistic mutation hook (patches the
  cached workspace agent list, rolls back on error, invalidates on settle).
- views: AgentMcpTab renders the owner's active connections as checkboxes;
  empty state links to Settings -> Integrations; defensive redacted state.
- views: wired into AgentOverviewPane as tab "composio_mcp", labeled "MCP Apps"
  to disambiguate from the existing raw-JSON "MCP" (mcp_config) tab. The entry
  is gated to the creator (currentUserId === agent.owner_id), matching the
  backend's owner-only read/write of the allowlist.
- i18n: tabs.composio_mcp + tab_body.composio_mcp.* in en/ja/ko/zh-Hans.
- tests: agent-mcp-tab.test.tsx (gating, toggle->allowlist body, active-only,
  empty, redacted); e2e/agent-mcp.spec.ts (creator sees tab + PUT body,
  non-creator hidden) with Composio + agent endpoints mocked at the boundary.

Note: the product spec says "creator"; the schema has no creator_id - the
backend gate and redaction are keyed on owner_id, so the tab uses owner_id.

Co-authored-by: multica-agent <github@multica.ai>

* fix(composio): mount remote MCP for codex

* feat(agents): agent invocation permission system (MUL-3963) (#4844)

* feat(agents): agent invocation permission system (permission_mode + invocation targets)

MUL-3963: split who may INVOKE an agent out of the overloaded visibility
column into an explicit, extensible model on feature/composio-integration.

- DB: agent.permission_mode (private|public_to) + agent_invocation_target
  table (workspace/member/team targets) + lossless backfill from visibility
  (migration 130).
- canInvokeAgent: owner-only for private (NO admin bypass, NO A2A bypass);
  public_to honours the allow-list; A2A judged by the top-of-chain originator.
- All trigger paths rewired: issue assign, comment @agent/@squad, chat,
  quick-create, autopilot, squad leader, child-done.
- Agent API: permission_mode + invocation_targets on responses and
  create/update (owner-only writes); legacy visibility kept as a derived field
  so old clients never see a permission widening.
- Composio: BuildTaskOverlay now FOLLOWS invocation permission and uses the
  agent OWNER connection (removed the originator==owner gate); front-end warns
  when a shared agent enables Composio apps.
- CLI: --permission-mode / --public-to-workspace / --public-to-member (legacy
  --visibility still mapped).
- Frontend: AccessPicker (Private / workspace / specific people / team soon),
  permission rules mirror canInvokeAgent, Composio warning banner.
- Tests: migration backfill, admin cannot invoke others private, public_to
  workspace/member whitelist, A2A by originator, Composio overlay uses owner
  connection.

Co-authored-by: multica-agent <github@multica.ai>

* feat(agents): stackable, mixed public_to invocation targets (MUL-3963)

Follow-up on PR #4844: public_to now supports selecting MULTIPLE, MIXED
targets on one agent (e.g. Public to workspace + specific people + team),
with canInvokeAgent admitting on ANY matching target (OR).

- Frontend AccessPicker: reworked from a single exclusive kind into a
  stackable multi-select — an "Everyone in workspace" toggle, a member
  multi-select checklist, and a (disabled, v1) team placeholder can be
  combined freely. Emits the full union of selected targets; empty union
  collapses to Private. Existing team targets are preserved across saves.
  Added the access.public_group locale string (en/zh-Hans/ja/ko).
- Backend already supported this (agent_invocation_target is multi-row per
  agent; create/update take a target ARRAY and batch-replace the whole
  allow-list; canInvokeAgent OR-matches). Added tests to lock it in:
  mixed member+team targets, overlapping-member batch replace, and
  workspace+member stacking then narrowing.

Refs MUL-3963.

Co-authored-by: multica-agent <github@multica.ai>

* fix(agents): address review on invocation permission (MUL-3963)

张大彪 review on PR #4844 — three blockers + product ruling + nits:

1. Migration 130: drop the FK/cascade on agent_invocation_target
   (agent_id, created_by) per the Multica no-FK rule; relationships are now
   maintained in the app layer (matching MUL-3515 §4). Added
   DeleteAgentInvocationTargetsByArchivedRuntimeAgents and call it before
   DeleteArchivedAgentsByRuntime in all three runtime-delete paths
   (runtime.go x2, runtime_profile.go) so hard-deleting agents can't orphan
   target rows.
2. revokeAndRemoveMember: prune the leaving member's member-target grants
   (DeleteAgentInvocationTargetsByMember) in the same tx as the member-row
   delete, so a re-invited user can't reclaim a stale invocation grant.
3. Empty public_to is a phantom — parsePermissionInput now normalises a
   public_to with no resolvable targets to a single workspace target, so
   `--permission-mode public_to` alone (and any empty target array) means
   "public to workspace" instead of "shared but nobody can run it".

Product ruling: the system/no-human-originator → workspace-target path in
canInvokeAgent is a deliberate, documented exception (webhook/system/
workspace-wide automation); member/team targets still fail closed without a
resolved originator. Documented in code + locked with a test.

Nits: refreshed the stale "originator must be owner" comments — models.go
(via migration 130 COMMENT ON COLUMN + sqlc regen for composio_toolkit_allowlist
and originator_user_id) and agent-mcp-tab.tsx — to the owner-connection +
invocation-permission rules.

Tests: member remove/re-add regression, system workspace exception + member
fail-closed, empty public_to → workspace (plus the earlier mixed/overlap/
batch-replace suite). Migration 130 applied to the test DB; Go handler/service/
composio suites green; views typecheck clean.

Refs MUL-3963.

Co-authored-by: multica-agent <github@multica.ai>

* fix(agents): scope member invocation-target cleanup to one workspace (MUL-3963)

张大彪 3rd review — cross-workspace permission bug + comment nits:

- DeleteAgentInvocationTargetsByMember was a GLOBAL delete by user id, so
  removing a user from workspace A also wiped their member-target grants on
  agents in workspace B. Scoped it to a single workspace by joining through
  agent.workspace_id; revokeAndRemoveMember now passes (workspaceID, userID).
- Regression test TestRevokeMember_InvocationTargetCleanupIsWorkspaceScoped:
  same user allow-listed by agents in two workspaces; removal from one leaves
  the other workspace's target intact.
- Nits: refreshed the remaining stale "originator == agent.owner_id" /
  "owner-vs-originator" comments — CreateRetryTask (agent.sql, regenerated),
  and the AgentResponse allowlist doc + ListAgents/UpdateAgent redaction
  rationale in agent.go — to the owner-connection + invocation-permission rule.

Migration 130 applied to the test DB; Go handler/service/composio suites green;
go vet clean.

Refs MUL-3963.

Co-authored-by: multica-agent <github@multica.ai>

---------

Co-authored-by: multica-agent <github@multica.ai>

* fix(agents): agent access owner-only editable, read-only for others (MUL-3963) (#4853)

* fix(agents): make agent access owner-only editable, read-only for others (MUL-3963)

Interaction bug: a non-owner (incl. workspace admin) could open the AccessPicker
and set an agent public — the backend silently ignored it and the UI bounced
back to private. Access is owner-only, so non-owners must see a read-only state
and the backend must reject real changes explicitly.

Frontend:
- AccessPicker renders a static, non-interactive read-only state when the
  viewer is not the owner: the current access value + a lock affordance + a
  tooltip "Only the agent owner can change who can run this agent." No clickable
  trigger is rendered, so a non-owner can never open a control the backend would
  reject (the GitHub/Notion pattern for permission settings you can see but not
  edit). The editable multi-select picker is unchanged for the owner.
- agent-detail-inspector gates the picker on ownership specifically
  (currentUserId === agent.owner_id), NOT the general canEdit (which also admits
  admins, who may edit other fields but not access).
- New locale key access.owner_only_readonly (en/zh-Hans/ja/ko).

Backend:
- UpdateAgent now returns an explicit 403 when a non-owner submits a REAL
  permission change (permissionInputChangesAgent compares requested mode +
  target set against the persisted state); a no-op resubmit (admin PATCH-as-PUT
  echoing unchanged permission) is still tolerated so admin edits of other
  fields keep working. Replaces the previous silent-drop that caused the bounce.

Tests:
- access-picker.test.tsx: non-owner gets a non-interactive read-only display
  with the owner-only tooltip; owner gets an interactive picker; owner can pick
  a member and stack workspace + member.
- TestUpdateAgent_AccessChangeIsOwnerOnly: admin real change → 403; admin no-op
  resubmit → 200; admin editing other fields → 200; owner change → 200.

Incidental: fixed a pre-existing base typecheck break in
slash-command-suggestion.test.tsx (stray `signal` arg not in the suggestion
items type) that otherwise fails the whole @multica/views typecheck.

Refs MUL-3963.

Co-authored-by: multica-agent <github@multica.ai>

* fix(agents): compare legacy visibility, not expanded permission, for no-op detection (MUL-3963)

PR #4853 review: permissionInputChangesAgent expanded a legacy-only
visibility:"private" into a real private permission and compared it against the
agent's actual permission. A member-only public_to agent derives legacy
visibility "private", so an admin PATCH-as-PUT echoing visibility:"private"
while editing another field was misread as a public_to→private downgrade and
rejected with 403 — contradicting the "unchanged permission no-op is allowed"
contract.

Fix (per review): when a request carries ONLY legacy `visibility` (no
permission_mode / invocation_targets), derive the agent's CURRENT legacy
visibility from its real targets and compare the legacy string values. Equal =
no-op (allowed); a real legacy change (e.g. "workspace") still returns 403.
Requests that carry permission_mode / invocation_targets keep the precise
mode+target comparison.

Regression test TestUpdateAgent_LegacyVisibilityNoOpForMemberOnlyPublicTo:
member-only public_to agent — admin submitting visibility:"private" + a
non-permission field → 200 with targets unchanged; admin submitting
visibility:"workspace" → 403.

Go handler/composio suites green; migration 130 applied; go vet clean.

Refs MUL-3963.

Co-authored-by: multica-agent <github@multica.ai>

---------

Co-authored-by: multica-agent <github@multica.ai>

* feat(composio): brief agents on connected apps

* feat(composio): gate MCP apps behind feature flag

* fix(mobile): parse agent invocation permissions

* fix(tests): update agent fixtures for access fields

---------

Co-authored-by: multica-agent <github@multica.ai>
Co-authored-by: Multica Eve <eve@devv.ai>
Co-authored-by: Eve <eve@multica.ai>
Co-authored-by: Eve <eve@multica-ai.local>
2026-07-03 14:18:43 +08:00

3015 lines
116 KiB
Go

package service
import (
"context"
"encoding/json"
"errors"
"fmt"
"log/slog"
"strconv"
"strings"
"sync"
"time"
"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgtype"
"github.com/multica-ai/multica/server/internal/analytics"
"github.com/multica-ai/multica/server/internal/events"
"github.com/multica-ai/multica/server/internal/featureflags"
obsmetrics "github.com/multica-ai/multica/server/internal/metrics"
"github.com/multica-ai/multica/server/internal/realtime"
"github.com/multica-ai/multica/server/internal/runtimeapps"
"github.com/multica-ai/multica/server/internal/util"
db "github.com/multica-ai/multica/server/pkg/db/generated"
"github.com/multica-ai/multica/server/pkg/featureflag"
"github.com/multica-ai/multica/server/pkg/protocol"
"github.com/multica-ai/multica/server/pkg/redact"
"github.com/multica-ai/multica/server/pkg/skillbundle"
"github.com/multica-ai/multica/server/pkg/taskfailure"
)
type TaskService struct {
Queries *db.Queries
TxStarter TxStarter
Hub *realtime.Hub
Bus *events.Bus
Analytics analytics.Client
Metrics *obsmetrics.BusinessMetrics
Wakeup TaskWakeupNotifier
// FeatureFlags is the server-side toggle router. Nil is valid and returns
// each call site's default.
FeatureFlags *featureflag.Service
// EmptyClaim caches "this runtime has no queued task" so the daemon
// poll path can skip a Postgres scan on the steady-state empty case.
// Optional — a nil cache disables the fast path and every claim
// goes through the DB. Wired in router.go from the shared Redis
// client.
EmptyClaim *EmptyClaimCache
// Composio computes the per-task MCP overlay (Stage 3 of the Composio
// epic, MUL-3721) — the integration's "current user's connected apps
// → MCP session URL" hook called from each Enqueue* path. Optional: a
// nil ComposioOverlayBuilder turns the overlay step into a no-op so
// every Multica deployment that hasn't enabled Composio behaves
// exactly as before. Wired in router.go after composiointeg.NewService
// succeeds; the concrete type is *composio.Service.
Composio ComposioOverlayBuilder
analyticsContextMu sync.Mutex
analyticsContextCache map[string]analytics.TaskContext
analyticsContextOrder []string
}
// ComposioOverlayBuilder is the seam TaskService uses to build the per-task
// MCP overlay at enqueue time. Implemented by
// internal/integrations/composio.Service.BuildTaskOverlay; tests provide an
// inline fake so they don't have to spin a fake Composio SDK.
//
// Contract: a zero MCPOverlayResult means "no overlay for this run" — covers
// all gates the implementation enforces (no owner / empty allowlist / empty
// intersection with active connections / empty session URL). Any non-empty
// MCPOverlay is the exact value to store in agent_task_queue.runtime_mcp_overlay;
// ConnectedApps is non-secret metadata to store alongside it for daemon brief
// injection. A non-nil error is surfaced to the caller but treated as
// best-effort — failed overlay computation must not fail the enqueue.
//
// agent is passed by value so the builder can inspect OwnerID and
// ComposioToolkitAllowlist without re-querying the DB; every enqueue path
// already loaded the agent for runtime/archive checks, so passing it is
// free and avoids a second GetAgent round-trip in the hot path.
type ComposioOverlayBuilder interface {
BuildTaskOverlay(ctx context.Context, originatorUserID pgtype.UUID, agent db.Agent) (runtimeapps.MCPOverlayResult, error)
}
type TaskWakeupNotifier interface {
NotifyTaskAvailable(runtimeID, taskID string)
}
// triggerSummaryMaxLen caps the snapshot length so the row stays cheap to
// transmit (it ends up in every task list response). 200 is enough for a
// recognisable preview of a one-paragraph comment.
const triggerSummaryMaxLen = 200
// truncateForSummary returns s shortened to maxRunes, with a trailing
// `…` when truncated. Operates on runes (not bytes) so multibyte characters
// — Chinese / emoji — count as one each. Strips surrounding whitespace
// first so a leading newline doesn't waste budget.
func truncateForSummary(s string, maxRunes int) string {
// strings.Builder + Grow avoids the O(N²) realloc cycle of `+=` in
// a loop. Grow uses byte length, which is an upper bound for the
// rune-equivalent output (replacing \n/\r/\t with space is byte-equal
// for ASCII whitespace), so we never reallocate.
var b strings.Builder
b.Grow(len(s))
for _, r := range s {
switch r {
case '\n', '\r', '\t':
b.WriteByte(' ')
default:
b.WriteRune(r)
}
}
rs := []rune(strings.TrimSpace(b.String()))
if len(rs) <= maxRunes {
return string(rs)
}
return string(rs[:maxRunes]) + "…"
}
const (
taskAnalyticsContextCacheMax = 4096
// claimResponseRecoveryWindow must exceed daemon client.Timeout for
// /tasks/claim (30s) plus /tasks/{id}/start (30s) plus scheduling slack.
// Longer pre-start work is protected by prepareLeaseDuration instead of
// stretching this global crash-recovery window.
claimResponseRecoveryWindow = 90 * time.Second
prepareLeaseDuration = 45 * time.Second
)
// buildCommentTriggerSummary fetches the comment content and truncates
// it for storage on the task row. Returns an invalid pgtype.Text when
// the comment is missing (deleted / wrong workspace / etc) so the column
// stays NULL — front-end falls back to a structural label in that case.
func (s *TaskService) buildCommentTriggerSummary(ctx context.Context, commentID pgtype.UUID) pgtype.Text {
if !commentID.Valid {
return pgtype.Text{}
}
comment, err := s.Queries.GetComment(ctx, commentID)
if err != nil {
return pgtype.Text{}
}
summary := truncateForSummary(comment.Content, triggerSummaryMaxLen)
if summary == "" {
return pgtype.Text{}
}
return pgtype.Text{String: summary, Valid: true}
}
func NewTaskService(q *db.Queries, tx TxStarter, hub *realtime.Hub, bus *events.Bus, wakeups ...TaskWakeupNotifier) *TaskService {
var wakeup TaskWakeupNotifier
if len(wakeups) > 0 {
wakeup = wakeups[0]
}
return &TaskService{Queries: q, TxStarter: tx, Hub: hub, Bus: bus, Wakeup: wakeup}
}
var trivialDoneMarkers = []string{
"done",
"готово",
"готова",
"сделано",
"完成",
"完了",
}
func isTrivialDoneOutput(output string) bool {
normalized := strings.TrimSpace(strings.ToLower(output))
normalized = strings.Trim(normalized, ".!!。… ")
for _, marker := range trivialDoneMarkers {
if normalized == marker {
return true
}
}
return false
}
func (s *TaskService) captureTaskQueued(ctx context.Context, task db.AgentTaskQueue) {
if s.Metrics != nil {
source, runtimeMode, _ := s.taskMetricsContext(ctx, task)
s.Metrics.RecordTaskEnqueued(source, runtimeMode)
}
}
type runtimeMCPOverlayData struct {
Overlay json.RawMessage
ConnectedApps json.RawMessage
}
// buildRuntimeMCPOverlay computes the optional per-task Composio MCP overlay.
// Enqueue paths call this BEFORE inserting the queued row so the daemon cannot
// claim a task during the network round-trip to Composio and miss the overlay.
func (s *TaskService) buildRuntimeMCPOverlay(ctx context.Context, originatorUserID pgtype.UUID, agent db.Agent) runtimeMCPOverlayData {
if s == nil || s.Composio == nil {
return runtimeMCPOverlayData{}
}
if !featureflags.ComposioMCPAppsEnabled(ctx, s.FeatureFlags) {
return runtimeMCPOverlayData{}
}
result, err := s.Composio.BuildTaskOverlay(ctx, originatorUserID, agent)
if err != nil {
slog.Warn("runtime mcp overlay: BuildTaskOverlay failed; task will run without composio overlay",
"originator_user_id", util.UUIDToString(originatorUserID),
"agent_id", util.UUIDToString(agent.ID),
"error", err,
)
return runtimeMCPOverlayData{}
}
if len(result.MCPOverlay) == 0 {
slog.Debug("runtime mcp overlay: no composio overlay for task",
"originator_user_id", util.UUIDToString(originatorUserID),
"agent_id", util.UUIDToString(agent.ID),
)
return runtimeMCPOverlayData{}
}
data := runtimeMCPOverlayData{Overlay: result.MCPOverlay}
if len(result.ConnectedApps) > 0 {
raw, err := json.Marshal(result.ConnectedApps)
if err != nil {
slog.Warn("runtime mcp overlay: marshal connected app metadata failed",
"originator_user_id", util.UUIDToString(originatorUserID),
"agent_id", util.UUIDToString(agent.ID),
"error", err,
)
return data
}
data.ConnectedApps = raw
}
return data
}
// resolveOriginatorFromTriggerComment returns the top-of-chain HUMAN user
// id for a comment that triggered an Enqueue* path. The chain rules
// (MUL-3869):
//
// - trigger comment authored by a member → originator = author_id (that
// member IS the top-of-chain human).
// - trigger comment authored by an agent → read the parent task via
// comment.source_task_id and inherit its originator_user_id. This is
// the load-bearing case for agent fan-out: agent A @-mentions agent B,
// comment author is A, but we MUST surface the human who originally
// told A to run, not lose the originator at the first agent hop.
// - missing comment / unknown source task / NULL parent originator →
// invalid pgtype.UUID. BuildTaskOverlay treats that as "no overlay"
// (gate 1).
//
// A nil receiver / nil Queries falls through to invalid so unit-test
// setups that don't wire a DB stay safe.
func (s *TaskService) resolveOriginatorFromTriggerComment(ctx context.Context, commentID pgtype.UUID) pgtype.UUID {
if s == nil || s.Queries == nil {
return pgtype.UUID{}
}
if !commentID.Valid {
return pgtype.UUID{}
}
comment, err := s.Queries.GetComment(ctx, commentID)
if err != nil {
return pgtype.UUID{}
}
switch comment.AuthorType {
case "member":
return comment.AuthorID
case "agent":
// Inherit from the agent's own triggering task. comment.source_task_id
// is set by every agent comment-write path (see migration 120), so a
// NULL here means either the comment predates that migration or it
// was authored out-of-band — both fall through to "no overlay".
if !comment.SourceTaskID.Valid {
return pgtype.UUID{}
}
parent, err := s.Queries.GetAgentTask(ctx, comment.SourceTaskID)
if err != nil {
return pgtype.UUID{}
}
return parent.OriginatorUserID
default:
return pgtype.UUID{}
}
}
// resolveOriginatorForIssueTask returns the top-of-chain human for issue-backed
// dispatches. Comment-triggered runs keep the existing comment-chain semantics;
// direct issue assignment/creation falls back to the issue's member creator.
// Agent-created quick-create issues have an explicit origin link back to the
// quick-create task, so they can inherit that task's originator safely. Other
// agent/system origins, including autopilot, deliberately remain unattributed.
func (s *TaskService) resolveOriginatorForIssueTask(ctx context.Context, issue db.Issue, triggerCommentID pgtype.UUID) pgtype.UUID {
if triggerCommentID.Valid {
return s.resolveOriginatorFromTriggerComment(ctx, triggerCommentID)
}
if issue.CreatorType == "member" && issue.CreatorID.Valid {
return issue.CreatorID
}
if s == nil || s.Queries == nil || !issue.OriginType.Valid || !issue.OriginID.Valid {
return pgtype.UUID{}
}
switch issue.OriginType.String {
case "quick_create":
task, err := s.Queries.GetAgentTask(ctx, issue.OriginID)
if err != nil {
return pgtype.UUID{}
}
return task.OriginatorUserID
default:
return pgtype.UUID{}
}
}
func (s *TaskService) captureTaskDispatched(ctx context.Context, task db.AgentTaskQueue) {
if s.Metrics != nil {
source, runtimeMode, _ := s.taskMetricsContext(ctx, task)
s.Metrics.RecordTaskDispatched(util.UUIDToString(task.ID), source, runtimeMode, taskQueueWaitSeconds(task))
}
}
func (s *TaskService) AnalyticsContextForTask(ctx context.Context, task db.AgentTaskQueue) analytics.TaskContext {
return s.taskAnalyticsContext(ctx, task)
}
func (s *TaskService) captureTaskStarted(ctx context.Context, task db.AgentTaskQueue) {
if s.Metrics != nil {
source, runtimeMode, provider := s.taskMetricsContext(ctx, task)
s.Metrics.RecordTaskStarted(source, runtimeMode, provider)
}
}
func (s *TaskService) captureTaskCompleted(ctx context.Context, task db.AgentTaskQueue) {
if s.Metrics != nil {
source, runtimeMode, _ := s.taskMetricsContext(ctx, task)
s.Metrics.RecordTaskTerminal(util.UUIDToString(task.ID), source, runtimeMode, task.Status, taskRunSeconds(task), taskTotalSeconds(task), task.Attempt)
}
}
func (s *TaskService) captureTaskFailed(ctx context.Context, task db.AgentTaskQueue) {
failureReason := taskFailureReason(task)
if s.Metrics != nil {
source, runtimeMode, _ := s.taskMetricsContext(ctx, task)
s.Metrics.RecordTaskTerminal(util.UUIDToString(task.ID), source, runtimeMode, task.Status, taskRunSeconds(task), taskTotalSeconds(task), task.Attempt)
s.Metrics.RecordTaskFailed(source, runtimeMode, failureReason)
}
}
func (s *TaskService) captureTaskCancelled(ctx context.Context, task db.AgentTaskQueue) {
if s.Metrics != nil {
source, runtimeMode, _ := s.taskMetricsContext(ctx, task)
s.Metrics.RecordTaskTerminal(util.UUIDToString(task.ID), source, runtimeMode, task.Status, taskRunSeconds(task), taskTotalSeconds(task), task.Attempt)
}
// Revoke any mat_ task tokens minted for this task. Cancellation is
// a terminal transition, so the running agent process no longer
// needs to call back; eagerly deleting the token closes the
// window where a compromised process could keep authenticating
// against the API until the 24h expiry. Failure is non-fatal — the
// expiry / FK cascade are the durable guards. MUL-2600.
if err := s.Queries.DeleteTaskTokensByTask(ctx, task.ID); err != nil {
slog.Warn("cancel task: failed to revoke task tokens",
"task_id", util.UUIDToString(task.ID), "error", err)
}
}
func (s *TaskService) CaptureTaskUsage(ctx context.Context, task db.AgentTaskQueue, provider, model string, inputTokens, outputTokens, cacheReadTokens, cacheWriteTokens int64) {
if s.Metrics == nil {
return
}
source, runtimeMode, _ := s.taskMetricsContext(ctx, task)
s.Metrics.RecordLLMUsage(source, runtimeMode, provider, model, inputTokens, outputTokens, cacheReadTokens, cacheWriteTokens)
}
func (s *TaskService) CaptureQueuedExpiredTasks(ctx context.Context, tasks []db.AgentTaskQueue) {
if s.Metrics == nil {
return
}
for _, task := range tasks {
source, runtimeMode, _ := s.taskMetricsContext(ctx, task)
s.Metrics.RecordTaskQueuedExpired(source, runtimeMode)
}
}
func (s *TaskService) CaptureLeaseExpiredTasks(ctx context.Context, tasks []db.AgentTaskQueue) {
if s.Metrics == nil {
return
}
for _, task := range tasks {
source, _, _ := s.taskMetricsContext(ctx, task)
s.Metrics.RecordTaskLeaseExpired(source)
}
}
func (s *TaskService) cachedTaskAnalyticsContext(task db.AgentTaskQueue) (analytics.TaskContext, bool) {
key := taskAnalyticsContextKey(task)
if key == "" {
return analytics.TaskContext{}, false
}
s.analyticsContextMu.Lock()
defer s.analyticsContextMu.Unlock()
if s.analyticsContextCache == nil {
return analytics.TaskContext{}, false
}
tc, ok := s.analyticsContextCache[key]
return tc, ok
}
func (s *TaskService) storeTaskAnalyticsContext(task db.AgentTaskQueue, tc analytics.TaskContext) {
if tc.WorkspaceID == "" {
return
}
key := taskAnalyticsContextKey(task)
if key == "" {
return
}
s.analyticsContextMu.Lock()
defer s.analyticsContextMu.Unlock()
if s.analyticsContextCache == nil {
s.analyticsContextCache = make(map[string]analytics.TaskContext)
}
if _, ok := s.analyticsContextCache[key]; !ok {
s.analyticsContextOrder = append(s.analyticsContextOrder, key)
if len(s.analyticsContextOrder) > taskAnalyticsContextCacheMax {
oldest := s.analyticsContextOrder[0]
s.analyticsContextOrder = s.analyticsContextOrder[1:]
delete(s.analyticsContextCache, oldest)
}
}
s.analyticsContextCache[key] = tc
}
func taskAnalyticsContextKey(task db.AgentTaskQueue) string {
taskID := util.UUIDToString(task.ID)
if taskID == "" {
return ""
}
return strings.Join([]string{
taskID,
util.UUIDToString(task.RuntimeID),
util.UUIDToString(task.IssueID),
util.UUIDToString(task.ChatSessionID),
util.UUIDToString(task.AutopilotRunID),
}, "|")
}
func (s *TaskService) taskMetricsContext(ctx context.Context, task db.AgentTaskQueue) (source, runtimeMode, provider string) {
tc := s.taskAnalyticsContext(ctx, task)
source = "other"
switch {
case task.ChatSessionID.Valid:
source = "chat"
case task.IssueID.Valid:
if tc.Source == analytics.SourceAutopilot {
source = "autopilot_issue"
} else {
source = "issue"
}
case task.AutopilotRunID.Valid:
source = "autopilot"
default:
if _, ok := s.parseQuickCreateContext(task); ok {
source = "quick_create"
} else if tc.Source != "" {
source = tc.Source
}
}
return source, tc.RuntimeMode, tc.Provider
}
func (s *TaskService) taskAnalyticsContext(ctx context.Context, task db.AgentTaskQueue) analytics.TaskContext {
if tc, ok := s.cachedTaskAnalyticsContext(task); ok {
return tc
}
tc := analytics.TaskContext{
AgentID: util.UUIDToString(task.AgentID),
TaskID: util.UUIDToString(task.ID),
Source: analytics.SourceManual,
}
if task.IssueID.Valid {
tc.IssueID = util.UUIDToString(task.IssueID)
}
if task.ChatSessionID.Valid {
tc.ChatSessionID = util.UUIDToString(task.ChatSessionID)
tc.Source = analytics.SourceChat
}
if task.AutopilotRunID.Valid {
tc.AutopilotRunID = util.UUIDToString(task.AutopilotRunID)
tc.Source = analytics.SourceAutopilot
}
if task.RuntimeID.Valid {
if rt, err := s.Queries.GetAgentRuntime(ctx, task.RuntimeID); err == nil {
tc.WorkspaceID = util.UUIDToString(rt.WorkspaceID)
tc.RuntimeMode = rt.RuntimeMode
tc.Provider = rt.Provider
}
}
if tc.WorkspaceID == "" || tc.RuntimeMode == "" {
if agent, err := s.Queries.GetAgent(ctx, task.AgentID); err == nil {
if tc.WorkspaceID == "" {
tc.WorkspaceID = util.UUIDToString(agent.WorkspaceID)
}
if tc.RuntimeMode == "" {
tc.RuntimeMode = agent.RuntimeMode
}
}
}
if task.IssueID.Valid {
if issue, err := s.Queries.GetIssue(ctx, task.IssueID); err == nil {
tc.WorkspaceID = util.UUIDToString(issue.WorkspaceID)
if issue.CreatorType == "member" {
tc.UserID = util.UUIDToString(issue.CreatorID)
}
if issue.OriginType.Valid {
switch issue.OriginType.String {
case "autopilot":
tc.Source = analytics.SourceAutopilot
if ap, err := s.Queries.GetAutopilot(ctx, issue.OriginID); err == nil {
if ap.CreatedByType == "member" {
tc.UserID = util.UUIDToString(ap.CreatedByID)
}
}
case "quick_create":
tc.Source = analytics.SourceManual
}
}
}
}
if task.ChatSessionID.Valid {
if cs, err := s.Queries.GetChatSession(ctx, task.ChatSessionID); err == nil {
tc.WorkspaceID = util.UUIDToString(cs.WorkspaceID)
tc.UserID = util.UUIDToString(cs.CreatorID)
}
}
if task.AutopilotRunID.Valid {
if run, err := s.Queries.GetAutopilotRun(ctx, task.AutopilotRunID); err == nil {
if ap, err := s.Queries.GetAutopilot(ctx, run.AutopilotID); err == nil {
tc.WorkspaceID = util.UUIDToString(ap.WorkspaceID)
if ap.CreatedByType == "member" {
tc.UserID = util.UUIDToString(ap.CreatedByID)
}
}
}
}
if qc, ok := s.parseQuickCreateContext(task); ok {
tc.WorkspaceID = qc.WorkspaceID
tc.UserID = qc.RequesterID
tc.Source = analytics.SourceManual
}
s.storeTaskAnalyticsContext(task, tc)
return tc
}
func taskQueueWaitSeconds(task db.AgentTaskQueue) float64 {
return durationSeconds(task.CreatedAt, task.DispatchedAt)
}
func taskRunSeconds(task db.AgentTaskQueue) float64 {
return durationSeconds(task.StartedAt, task.CompletedAt)
}
func taskTotalSeconds(task db.AgentTaskQueue) float64 {
return durationSeconds(task.CreatedAt, task.CompletedAt)
}
func durationSeconds(start, end pgtype.Timestamptz) float64 {
if !start.Valid || !end.Valid {
return -1
}
seconds := end.Time.Sub(start.Time).Seconds()
if seconds < 0 {
return 0
}
return seconds
}
func taskFailureReason(task db.AgentTaskQueue) string {
if task.FailureReason.Valid && task.FailureReason.String != "" {
return task.FailureReason.String
}
return "agent_error"
}
func taskErrorType(reason string) string {
switch reason {
case "runtime_offline", "runtime_recovery":
return "runtime"
case "timeout", "codex_semantic_inactivity":
return "timeout"
case "iteration_limit", "agent_fallback_message":
return "agent_output"
case "cancelled", "user_cancelled":
return "cancelled"
default:
return "agent_error"
}
}
// EnqueueTaskForIssue creates a queued task for an agent-assigned issue.
// No context snapshot is stored — the agent fetches all data it needs at
// runtime via the multica CLI.
func (s *TaskService) EnqueueTaskForIssue(ctx context.Context, issue db.Issue, triggerCommentID ...pgtype.UUID) (db.AgentTaskQueue, error) {
var commentID pgtype.UUID
if len(triggerCommentID) > 0 {
commentID = triggerCommentID[0]
}
return s.enqueueIssueTask(ctx, issue, commentID, false, "")
}
// EnqueueTaskForIssueWithHandoff is the assign/promote variant that carries a
// handoff note into the run's opening context (MUL-3375). The note rides a
// dedicated task column; the daemon renders it via the assignment-handoff
// branch. Empty note behaves exactly like EnqueueTaskForIssue.
func (s *TaskService) EnqueueTaskForIssueWithHandoff(ctx context.Context, issue db.Issue, handoffNote string) (db.AgentTaskQueue, error) {
return s.enqueueIssueTask(ctx, issue, pgtype.UUID{}, false, handoffNote)
}
// enqueueIssueTask is the shared implementation behind EnqueueTaskForIssue
// and the manual rerun path. forceFreshSession=true marks the task so the
// daemon claim handler skips the (agent_id, issue_id) resume lookup — the
// user already judged the prior output bad, a fresh agent session is the
// expected behavior.
// ResolveIssueReviewSHA returns the head SHA of the commit currently under
// review for an issue (the head_sha of its most-relevant linked PR), or the
// empty string when the issue has no linked PR. Callers thread this into both
// the reviewer-loop dedup check and the enqueue path so a pending review task
// pinned to an old head does not satisfy a request after HEAD advanced
// (TEN-356). Empty string is the safe default: it makes dedup fall back to the
// pre-TEN-356 (issue_id, agent_id) key and leaves the task's context NULL.
//
// The lookup fails soft — any DB error (including "no linked PR") returns "" so
// a transient github-table hiccup can never over-dedup a review out of
// existence; the worst case is the pre-TEN-356 coalescing behavior.
func (s *TaskService) ResolveIssueReviewSHA(ctx context.Context, issueID pgtype.UUID) string {
if !issueID.Valid {
return ""
}
sha, err := s.Queries.GetIssueReviewHeadSha(ctx, issueID)
if err != nil {
if !errors.Is(err, pgx.ErrNoRows) {
slog.Warn("resolve issue review sha failed",
"issue_id", util.UUIDToString(issueID), "error", err)
}
return ""
}
return sha
}
// headShaText wraps a resolved review SHA into the pgtype.Text the dedup/enqueue
// queries expect. Empty SHA marshals to an invalid (NULL) Text so the queries
// take their fall-back branch.
func headShaText(sha string) pgtype.Text {
return pgtype.Text{String: sha, Valid: sha != ""}
}
// ResolveIssueReviewSHAParam is ResolveIssueReviewSHA wrapped as the pgtype.Text
// the dedup queries take, so both service- and handler-package call sites can
// key dedup on the reviewed head with a single call (TEN-356).
func (s *TaskService) ResolveIssueReviewSHAParam(ctx context.Context, issueID pgtype.UUID) pgtype.Text {
return headShaText(s.ResolveIssueReviewSHA(ctx, issueID))
}
func (s *TaskService) enqueueIssueTask(ctx context.Context, issue db.Issue, triggerCommentID pgtype.UUID, forceFreshSession bool, handoffNote string) (db.AgentTaskQueue, error) {
if !issue.AssigneeID.Valid {
slog.Error("task enqueue failed", "issue_id", util.UUIDToString(issue.ID), "error", "issue has no assignee")
return db.AgentTaskQueue{}, fmt.Errorf("issue has no assignee")
}
agent, err := s.Queries.GetAgent(ctx, issue.AssigneeID)
if err != nil {
slog.Error("task enqueue failed", "issue_id", util.UUIDToString(issue.ID), "error", err)
return db.AgentTaskQueue{}, fmt.Errorf("load agent: %w", err)
}
if agent.ArchivedAt.Valid {
slog.Debug("task enqueue skipped: agent is archived", "issue_id", util.UUIDToString(issue.ID), "agent_id", util.UUIDToString(agent.ID))
return db.AgentTaskQueue{}, fmt.Errorf("agent is archived")
}
if !agent.RuntimeID.Valid {
slog.Error("task enqueue failed", "issue_id", util.UUIDToString(issue.ID), "error", "agent has no runtime")
return db.AgentTaskQueue{}, fmt.Errorf("agent has no runtime")
}
originatorUserID := s.resolveOriginatorForIssueTask(ctx, issue, triggerCommentID)
runtimeMCPOverlay := s.buildRuntimeMCPOverlay(ctx, originatorUserID, agent)
task, err := s.Queries.CreateAgentTask(ctx, db.CreateAgentTaskParams{
AgentID: issue.AssigneeID,
RuntimeID: agent.RuntimeID,
IssueID: issue.ID,
Priority: priorityToInt(issue.Priority),
TriggerCommentID: triggerCommentID,
TriggerSummary: s.buildCommentTriggerSummary(ctx, triggerCommentID),
ForceFreshSession: pgtype.Bool{Bool: forceFreshSession, Valid: forceFreshSession},
HandoffNote: pgtype.Text{String: handoffNote, Valid: handoffNote != ""},
OriginatorUserID: originatorUserID,
RuntimeMcpOverlay: runtimeMCPOverlay.Overlay,
RuntimeConnectedApps: runtimeMCPOverlay.ConnectedApps,
// Stamp the reviewed head so dedup can distinguish this run's target
// from a later request against a new HEAD (TEN-356).
HeadSha: headShaText(s.ResolveIssueReviewSHA(ctx, issue.ID)),
})
if err != nil {
slog.Error("task enqueue failed", "issue_id", util.UUIDToString(issue.ID), "error", err)
return db.AgentTaskQueue{}, fmt.Errorf("create task: %w", err)
}
slog.Info("task enqueued",
"task_id", util.UUIDToString(task.ID),
"issue_id", util.UUIDToString(issue.ID),
"agent_id", util.UUIDToString(issue.AssigneeID),
"force_fresh_session", forceFreshSession,
)
// Order matters: broadcast first, notify daemon second. notifyTaskAvailable
// kicks an in-process channel that the daemon picks up over HTTP and
// claims; the claim path then emits its own task:dispatch. Doing the
// queued broadcast afterwards risks the dispatch event reaching clients
// before the queued one (rare but unsafe-by-construction). Publishing
// in the desired observe-order makes correctness independent of timing.
s.broadcastTaskEvent(ctx, protocol.EventTaskQueued, task)
s.NotifyTaskEnqueued(ctx, task)
return task, nil
}
// EnqueueTaskForMention creates a queued task for a mentioned agent on an issue.
// Unlike EnqueueTaskForIssue, this takes an explicit agent ID rather than
// deriving it from the issue assignee.
func (s *TaskService) EnqueueTaskForMention(ctx context.Context, issue db.Issue, agentID pgtype.UUID, triggerCommentID pgtype.UUID) (db.AgentTaskQueue, error) {
return s.enqueueMentionTask(ctx, issue, agentID, triggerCommentID, false, pgtype.UUID{}, false, "")
}
// EnqueueTaskForThreadParent creates a queued task for the agent who authored
// the direct parent comment a member replied to.
func (s *TaskService) EnqueueTaskForThreadParent(ctx context.Context, issue db.Issue, agentID pgtype.UUID, triggerCommentID pgtype.UUID) (db.AgentTaskQueue, error) {
return s.enqueueMentionTask(ctx, issue, agentID, triggerCommentID, false, pgtype.UUID{}, false, "")
}
// EnqueueTaskForSquadLeader is the leader-role variant of EnqueueTaskForMention.
// The resulting task carries is_leader_task=true so that downstream
// self-trigger guards can distinguish a comment posted while the agent was
// acting as the squad's leader (skip) from one posted while it was acting
// as a worker (do not skip). This matters for agents that are simultaneously
// the leader and a worker of the same squad — see migration 090.
//
// squadID is stamped onto the task's squad_id column so the daemon claim
// handler can locate the squad and inject its briefing regardless of how the
// leader task was triggered (comment @squad, issue assign, autopilot,
// sub-issue done callback). See migration 127.
func (s *TaskService) EnqueueTaskForSquadLeader(ctx context.Context, issue db.Issue, leaderID pgtype.UUID, squadID pgtype.UUID, triggerCommentID pgtype.UUID) (db.AgentTaskQueue, error) {
return s.enqueueMentionTask(ctx, issue, leaderID, triggerCommentID, true, squadID, false, "")
}
// EnqueueTaskForSquadLeaderWithHandoff is the assign/promote variant carrying a
// handoff note into the leader run's opening context (MUL-3375). Empty note
// behaves exactly like EnqueueTaskForSquadLeader.
func (s *TaskService) EnqueueTaskForSquadLeaderWithHandoff(ctx context.Context, issue db.Issue, leaderID pgtype.UUID, squadID pgtype.UUID, handoffNote string) (db.AgentTaskQueue, error) {
return s.enqueueMentionTask(ctx, issue, leaderID, pgtype.UUID{}, true, squadID, false, handoffNote)
}
func (s *TaskService) enqueueMentionTask(ctx context.Context, issue db.Issue, agentID pgtype.UUID, triggerCommentID pgtype.UUID, isLeader bool, squadID pgtype.UUID, forceFreshSession bool, handoffNote string) (db.AgentTaskQueue, error) {
agent, err := s.Queries.GetAgent(ctx, agentID)
if err != nil {
slog.Error("mention task enqueue failed: agent not found", "issue_id", util.UUIDToString(issue.ID), "agent_id", util.UUIDToString(agentID), "error", err)
return db.AgentTaskQueue{}, fmt.Errorf("load agent: %w", err)
}
if agent.ArchivedAt.Valid {
slog.Debug("mention task enqueue skipped: agent is archived", "issue_id", util.UUIDToString(issue.ID), "agent_id", util.UUIDToString(agentID))
return db.AgentTaskQueue{}, fmt.Errorf("agent is archived")
}
if !agent.RuntimeID.Valid {
slog.Error("mention task enqueue failed: agent has no runtime", "issue_id", util.UUIDToString(issue.ID), "agent_id", util.UUIDToString(agentID))
return db.AgentTaskQueue{}, fmt.Errorf("agent has no runtime")
}
originatorUserID := s.resolveOriginatorForIssueTask(ctx, issue, triggerCommentID)
runtimeMCPOverlay := s.buildRuntimeMCPOverlay(ctx, originatorUserID, agent)
task, err := s.Queries.CreateAgentTask(ctx, db.CreateAgentTaskParams{
AgentID: agentID,
RuntimeID: agent.RuntimeID,
IssueID: issue.ID,
Priority: priorityToInt(issue.Priority),
TriggerCommentID: triggerCommentID,
TriggerSummary: s.buildCommentTriggerSummary(ctx, triggerCommentID),
IsLeaderTask: pgtype.Bool{Bool: isLeader, Valid: isLeader},
ForceFreshSession: pgtype.Bool{Bool: forceFreshSession, Valid: forceFreshSession},
HandoffNote: pgtype.Text{String: handoffNote, Valid: handoffNote != ""},
SquadID: squadID,
OriginatorUserID: originatorUserID,
RuntimeMcpOverlay: runtimeMCPOverlay.Overlay,
RuntimeConnectedApps: runtimeMCPOverlay.ConnectedApps,
// Stamp the reviewed head so dedup can distinguish this run's target
// from a later request against a new HEAD (TEN-356).
HeadSha: headShaText(s.ResolveIssueReviewSHA(ctx, issue.ID)),
})
if err != nil {
slog.Error("mention task enqueue failed", "issue_id", util.UUIDToString(issue.ID), "agent_id", util.UUIDToString(agentID), "error", err)
return db.AgentTaskQueue{}, fmt.Errorf("create task: %w", err)
}
slog.Info("mention task enqueued", "task_id", util.UUIDToString(task.ID), "issue_id", util.UUIDToString(issue.ID), "agent_id", util.UUIDToString(agentID), "is_leader_task", isLeader)
// See EnqueueTaskForIssue for ordering rationale.
s.broadcastTaskEvent(ctx, protocol.EventTaskQueued, task)
s.NotifyTaskEnqueued(ctx, task)
return task, nil
}
// EnqueueDeferredAssigneeFallback creates an inert task that becomes claimable
// only after PromoteDueDeferredTasksForRuntime flips it from deferred to queued.
func (s *TaskService) EnqueueDeferredAssigneeFallback(ctx context.Context, issue db.Issue, agentID, squadID pgtype.UUID, escalationForTaskID pgtype.UUID, triggerCommentID pgtype.UUID, fireAt time.Time) (db.AgentTaskQueue, error) {
agent, err := s.Queries.GetAgent(ctx, agentID)
if err != nil {
slog.Error("deferred fallback enqueue failed: agent not found", "issue_id", util.UUIDToString(issue.ID), "agent_id", util.UUIDToString(agentID), "error", err)
return db.AgentTaskQueue{}, fmt.Errorf("load agent: %w", err)
}
if agent.ArchivedAt.Valid {
slog.Debug("deferred fallback enqueue skipped: agent is archived", "issue_id", util.UUIDToString(issue.ID), "agent_id", util.UUIDToString(agentID))
return db.AgentTaskQueue{}, fmt.Errorf("agent is archived")
}
if !agent.RuntimeID.Valid {
slog.Error("deferred fallback enqueue failed: agent has no runtime", "issue_id", util.UUIDToString(issue.ID), "agent_id", util.UUIDToString(agentID))
return db.AgentTaskQueue{}, fmt.Errorf("agent has no runtime")
}
isLeader := squadID.Valid
task, err := s.Queries.CreateDeferredAgentTask(ctx, db.CreateDeferredAgentTaskParams{
AgentID: agentID,
RuntimeID: agent.RuntimeID,
IssueID: issue.ID,
Priority: priorityToInt(issue.Priority),
TriggerCommentID: triggerCommentID,
TriggerSummary: s.buildCommentTriggerSummary(ctx, triggerCommentID),
IsLeaderTask: pgtype.Bool{Bool: isLeader, Valid: isLeader},
SquadID: squadID,
EscalationForTaskID: escalationForTaskID,
FireAt: pgtype.Timestamptz{Time: fireAt, Valid: true},
})
if err != nil {
slog.Error("deferred fallback enqueue failed", "issue_id", util.UUIDToString(issue.ID), "agent_id", util.UUIDToString(agentID), "error", err)
return db.AgentTaskQueue{}, fmt.Errorf("create deferred task: %w", err)
}
slog.Info("deferred fallback task enqueued",
"task_id", util.UUIDToString(task.ID),
"issue_id", util.UUIDToString(issue.ID),
"agent_id", util.UUIDToString(agentID),
"fire_at", fireAt.UTC().Format(time.RFC3339),
)
return task, nil
}
// QuickCreateContext is the JSON payload stored on a quick-create task's
// context column. The daemon detects this variant via Type == "quick_create"
// and switches to the quick-create prompt template; the completion path
// uses RequesterID + WorkspaceID to write the inbox notification.
//
// ProjectID is the optional project the user picked in the modal. When
// non-empty the daemon claim handler resolves the project's title +
// resources, and the prompt template instructs the agent to pass
// `--project <uuid>` so the new issue lands in that project.
//
// SquadID is non-empty when the user picked a squad (rather than an agent)
// in the modal. The task is still enqueued against the squad's leader
// agent (Queries.CreateQuickCreateTask is agent-scoped); SquadID is the
// hint the daemon claim handler uses to layer the squad-leader briefing
// onto the agent's Instructions, matching the behavior of issue-bound
// tasks assigned to the squad.
type QuickCreateContext struct {
Type string `json:"type"`
Prompt string `json:"prompt"`
RequesterID string `json:"requester_id"`
WorkspaceID string `json:"workspace_id"`
ProjectID string `json:"project_id,omitempty"`
SquadID string `json:"squad_id,omitempty"`
AttachmentIDs []string `json:"attachment_ids,omitempty"`
// ParentIssueID is the optional UUID of the parent issue the new issue
// should be filed under. Set when the user opens the modal from "Add
// sub issue" on an existing issue; the daemon claim handler resolves the
// parent's identifier and the prompt template instructs the agent to
// pass `--parent <uuid>` so the sub-issue relationship is preserved
// across the manual→agent mode flip.
ParentIssueID string `json:"parent_issue_id,omitempty"`
}
// QuickCreateContextType marks a task as a quick-create job.
const QuickCreateContextType = "quick_create"
// EnqueueQuickCreateTask creates a queued task that has no issue / chat /
// autopilot link — the user's natural-language prompt is stored in the
// task's context JSONB and the agent is expected to translate it into a
// `multica issue create` call. Pre-validates that the agent is reachable
// (not archived, has a runtime) so the API can reject up-front rather than
// queue a task no one will ever claim.
//
// projectID is optional (zero-valued pgtype.UUID when the user didn't pick
// one). The handler is responsible for validating it belongs to the same
// workspace before passing it in.
//
// squadID is non-empty (Valid) when the user picked a squad as the actor.
// The handler has already resolved it to the squad's leader agent for
// agentID; the squadID hint is stamped into the task context so the daemon
// claim handler can inject the squad-leader briefing on dispatch.
//
// parentIssueID is optional (zero-valued pgtype.UUID when the user didn't
// open the modal from "Add sub issue"). The handler is responsible for
// validating it belongs to the same workspace before passing it in.
func (s *TaskService) EnqueueQuickCreateTask(ctx context.Context, workspaceID, requesterID pgtype.UUID, agentID, squadID pgtype.UUID, prompt string, projectID, parentIssueID pgtype.UUID, attachmentIDs []pgtype.UUID) (db.AgentTaskQueue, error) {
agent, err := s.Queries.GetAgent(ctx, agentID)
if err != nil {
return db.AgentTaskQueue{}, fmt.Errorf("load agent: %w", err)
}
if agent.ArchivedAt.Valid {
return db.AgentTaskQueue{}, fmt.Errorf("agent is archived")
}
if !agent.RuntimeID.Valid {
return db.AgentTaskQueue{}, fmt.Errorf("agent has no runtime")
}
payload := QuickCreateContext{
Type: QuickCreateContextType,
Prompt: prompt,
RequesterID: util.UUIDToString(requesterID),
WorkspaceID: util.UUIDToString(workspaceID),
}
if projectID.Valid {
payload.ProjectID = util.UUIDToString(projectID)
}
if squadID.Valid {
payload.SquadID = util.UUIDToString(squadID)
}
if parentIssueID.Valid {
payload.ParentIssueID = util.UUIDToString(parentIssueID)
}
if len(attachmentIDs) > 0 {
payload.AttachmentIDs = make([]string, 0, len(attachmentIDs))
for _, id := range attachmentIDs {
if id.Valid {
payload.AttachmentIDs = append(payload.AttachmentIDs, util.UUIDToString(id))
}
}
}
contextJSON, err := json.Marshal(payload)
if err != nil {
return db.AgentTaskQueue{}, fmt.Errorf("marshal quick-create context: %w", err)
}
runtimeMCPOverlay := s.buildRuntimeMCPOverlay(ctx, requesterID, agent)
task, err := s.Queries.CreateQuickCreateTask(ctx, db.CreateQuickCreateTaskParams{
AgentID: agentID,
RuntimeID: agent.RuntimeID,
Priority: priorityToInt("high"),
Context: contextJSON,
OriginatorUserID: requesterID,
RuntimeMcpOverlay: runtimeMCPOverlay.Overlay,
RuntimeConnectedApps: runtimeMCPOverlay.ConnectedApps,
})
if err != nil {
return db.AgentTaskQueue{}, fmt.Errorf("create quick-create task: %w", err)
}
slog.Info("quick-create task enqueued",
"task_id", util.UUIDToString(task.ID),
"agent_id", util.UUIDToString(agentID),
"squad_id", payload.SquadID,
"requester_id", util.UUIDToString(requesterID),
"workspace_id", util.UUIDToString(workspaceID),
"project_id", payload.ProjectID,
"parent_issue_id", payload.ParentIssueID,
)
// Match every other Enqueue* path: kick the daemon WS so the task
// gets claimed promptly instead of waiting for the next 30 s poll
// cycle. Without this the user perceives "quick create never
// triggered" because the modal closes immediately and the task
// sits in 'queued' until the next sleepWithContextOrWakeup tick.
s.NotifyTaskEnqueued(ctx, task)
return task, nil
}
// ErrChatTaskAgentArchived signals that EnqueueChatTask refused to
// queue work because the destination agent has been archived. This
// is a productizable state — surface it to the user as "this agent
// has been archived" rather than retrying.
var ErrChatTaskAgentArchived = errors.New("chat task: agent archived")
// ErrChatTaskAgentNoRuntime signals that EnqueueChatTask refused to
// queue work because the agent has never been associated with a
// runtime (agent.runtime_id IS NULL). This is the "agent has no
// daemon configured" case — productizable as "agent offline".
//
// IMPORTANT: this is NOT the same as "the daemon is currently
// disconnected". When agent.runtime_id IS set, EnqueueChatTask
// enqueues the task and the daemon claims it on next online; that
// path returns a task row, not this error.
var ErrChatTaskAgentNoRuntime = errors.New("chat task: agent has no runtime")
// EnqueueChatTask creates a queued task for a chat session.
// Unlike issue tasks, chat tasks have no issue_id.
//
// Errors split into two layers:
//
// - Productizable rejections (agent archived, no runtime) return
// the sentinel errors above. Callers (e.g. the Lark dispatcher)
// can errors.Is them to decide a user-visible outcome.
//
// - Infrastructure failures (DB load / insert errors) are wrapped
// as ordinary errors. The caller should treat them as retryable
// or page-worthy, NOT as user-facing state.
//
// initiatorUserID is the user who actually sent the triggering message — the
// real requester behind this run. Callers pass it explicitly because
// chat_session.creator_id is not a reliable source: Lark group sessions set the
// creator to the installer, not the sender (see the lark dispatcher). Web chat
// passes the request user; the lark dispatcher passes the inbound sender of the
// latest message in the silence window. Stored on the task so the daemon brief
// can attribute the run to the right person. See MUL-2645.
//
// forceFreshSession applies only to the task created by this call. The daemon
// uses it to skip prior chat-session resume for this dispatch without clearing
// the chat session's stored resume pointer for future normal messages.
func (s *TaskService) EnqueueChatTask(ctx context.Context, chatSession db.ChatSession, initiatorUserID pgtype.UUID, forceFreshSession bool) (db.AgentTaskQueue, error) {
agent, err := s.Queries.GetAgent(ctx, chatSession.AgentID)
if err != nil {
slog.Error("chat task enqueue failed", "chat_session_id", util.UUIDToString(chatSession.ID), "error", err)
return db.AgentTaskQueue{}, fmt.Errorf("load agent: %w", err)
}
if agent.ArchivedAt.Valid {
return db.AgentTaskQueue{}, ErrChatTaskAgentArchived
}
if !agent.RuntimeID.Valid {
return db.AgentTaskQueue{}, ErrChatTaskAgentNoRuntime
}
runtimeMCPOverlay := s.buildRuntimeMCPOverlay(ctx, initiatorUserID, agent)
task, err := s.Queries.CreateChatTask(ctx, db.CreateChatTaskParams{
AgentID: chatSession.AgentID,
RuntimeID: agent.RuntimeID,
Priority: 2, // medium priority for chat
ChatSessionID: chatSession.ID,
InitiatorUserID: initiatorUserID,
OriginatorUserID: initiatorUserID,
ForceFreshSession: pgtype.Bool{
Bool: forceFreshSession,
Valid: true,
},
RuntimeMcpOverlay: runtimeMCPOverlay.Overlay,
RuntimeConnectedApps: runtimeMCPOverlay.ConnectedApps,
})
if err != nil {
slog.Error("chat task enqueue failed", "chat_session_id", util.UUIDToString(chatSession.ID), "error", err)
return db.AgentTaskQueue{}, fmt.Errorf("create chat task: %w", err)
}
slog.Info("chat task enqueued", "task_id", util.UUIDToString(task.ID), "chat_session_id", util.UUIDToString(chatSession.ID), "agent_id", util.UUIDToString(chatSession.AgentID))
// See EnqueueTaskForIssue for ordering rationale.
s.broadcastTaskEvent(ctx, protocol.EventTaskQueued, task)
s.NotifyTaskEnqueued(ctx, task)
return task, nil
}
// CancelTasksForIssue cancels every active task on the issue, reconciles each
// affected agent's status, and broadcasts task:cancelled events so frontends
// clear their live cards.
//
// Before #1587 this path was "cancel rows and return" — issue-status flips
// (e.g. user marks the issue `done` or `cancelled` while a task is still
// running) left the agent stuck at status="working" indefinitely, requiring a
// manual `multica agent update <id> --status idle` to unwedge. Matches the
// pattern already used by CancelTask and RerunIssue.
func (s *TaskService) CancelTasksForIssue(ctx context.Context, issueID pgtype.UUID) error {
cancelled, err := s.Queries.CancelAgentTasksByIssue(ctx, issueID)
if err != nil {
return err
}
for _, t := range cancelled {
s.captureTaskCancelled(ctx, t)
s.ReconcileAgentStatus(ctx, t.AgentID)
s.broadcastTaskEvent(ctx, protocol.EventTaskCancelled, t)
}
return nil
}
// CancelTasksForAgent cancels every active task belonging to an agent
// (queued + dispatched + running), reconciles the agent's status, and
// broadcasts task:cancelled events. Used by the agent-level "Cancel all
// tasks" action — same shape as CancelTasksForIssue but scoped on agent_id.
//
// Returns the cancelled rows so callers can report counts / log them.
func (s *TaskService) CancelTasksForAgent(ctx context.Context, agentID pgtype.UUID) ([]db.AgentTaskQueue, error) {
cancelled, err := s.Queries.CancelAgentTasksByAgent(ctx, agentID)
if err != nil {
return nil, err
}
for _, t := range cancelled {
s.captureTaskCancelled(ctx, t)
s.broadcastTaskEvent(ctx, protocol.EventTaskCancelled, t)
}
// Reconcile once after the loop — agent transitions from
// working→available based on remaining task counts, no need to call
// per row (the rows we just cancelled all belong to the same agent).
s.ReconcileAgentStatus(ctx, agentID)
return cancelled, nil
}
// CancelTasksByTriggerComment cancels active tasks whose trigger is the given
// comment. Called from DeleteComment so an agent does not run with the
// now-deleted content already embedded in its prompt. Must be invoked BEFORE
// the comment row is deleted because the FK ON DELETE SET NULL would
// otherwise nullify trigger_comment_id and we'd lose the ability to find
// the affected tasks.
func (s *TaskService) CancelTasksByTriggerComment(ctx context.Context, commentID pgtype.UUID) error {
cancelled, err := s.Queries.CancelAgentTasksByTriggerComment(ctx, commentID)
if err != nil {
return err
}
for _, t := range cancelled {
s.captureTaskCancelled(ctx, t)
s.ReconcileAgentStatus(ctx, t.AgentID)
s.broadcastTaskEvent(ctx, protocol.EventTaskCancelled, t)
}
return nil
}
// BroadcastCancelledTasks reconciles each affected agent's status and emits
// task:cancelled for every row. Callers must invoke this AFTER committing the
// cancellation so subscribers don't observe a "cancelled" event for a row
// that the tx might still roll back.
func (s *TaskService) BroadcastCancelledTasks(ctx context.Context, cancelled []db.AgentTaskQueue) {
for _, t := range cancelled {
s.captureTaskCancelled(ctx, t)
s.ReconcileAgentStatus(ctx, t.AgentID)
s.broadcastTaskEvent(ctx, protocol.EventTaskCancelled, t)
}
}
func (s *TaskService) CaptureCancelledTasks(ctx context.Context, cancelled []db.AgentTaskQueue) {
for _, t := range cancelled {
s.captureTaskCancelled(ctx, t)
}
}
type CancelledChatMessageResult struct {
ChatSessionID string
MessageID string
Content string
RestoreToInput bool
// Attachments are the rows detached from the deleted user message so they
// survive the ON DELETE CASCADE and can re-bind when the restored draft is
// re-sent.
Attachments []db.Attachment
}
type CancelTaskResult struct {
Task db.AgentTaskQueue
CancelledChatMessage *CancelledChatMessageResult
}
// CancelTask cancels a single task by ID. It broadcasts a task:cancelled event
// so frontends can update immediately.
func (s *TaskService) CancelTask(ctx context.Context, taskID pgtype.UUID) (*db.AgentTaskQueue, error) {
result, err := s.CancelTaskWithResult(ctx, taskID)
if err != nil {
return nil, err
}
return &result.Task, nil
}
// CancelTaskWithResult cancels a single task and returns any chat-specific
// cleanup result needed by user-facing callers.
func (s *TaskService) CancelTaskWithResult(ctx context.Context, taskID pgtype.UUID) (*CancelTaskResult, error) {
task, err := s.Queries.CancelAgentTask(ctx, taskID)
if errors.Is(err, pgx.ErrNoRows) {
existing, err := s.Queries.GetAgentTask(ctx, taskID)
if err != nil {
return nil, fmt.Errorf("cancel task: %w", err)
}
return &CancelTaskResult{Task: existing}, nil
}
if err != nil {
return nil, fmt.Errorf("cancel task: %w", err)
}
slog.Info("task cancelled", "task_id", util.UUIDToString(task.ID), "issue_id", util.UUIDToString(task.IssueID))
s.captureTaskCancelled(ctx, task)
cancelledChatMessage := s.finalizeCancelledChatMessage(ctx, task)
// Reconcile agent status
s.ReconcileAgentStatus(ctx, task.AgentID)
// Broadcast cancellation as a task:failed event so frontends clear the live card
s.broadcastTaskEvent(ctx, protocol.EventTaskCancelled, task)
return &CancelTaskResult{
Task: task,
CancelledChatMessage: cancelledChatMessage,
}, nil
}
func (s *TaskService) finalizeCancelledChatMessage(ctx context.Context, task db.AgentTaskQueue) *CancelledChatMessageResult {
if !task.ChatSessionID.Valid {
return nil
}
var cancelled *CancelledChatMessageResult
if err := s.runInTx(ctx, func(qtx *db.Queries) error {
messages, err := qtx.ListTaskMessages(ctx, task.ID)
if err != nil {
return fmt.Errorf("list cancelled chat task messages: %w", err)
}
if len(messages) == 0 {
// Detach attachments BEFORE deleting the user message — the
// attachment FK is ON DELETE CASCADE, so deleting first would
// destroy rows the restored draft needs to re-bind.
detached, err := qtx.DetachAttachmentsFromUserChatMessageByTask(ctx, task.ID)
if err != nil {
return fmt.Errorf("detach cancelled chat message attachments: %w", err)
}
deleted, err := qtx.DeleteUserChatMessageByTask(ctx, task.ID)
if errors.Is(err, pgx.ErrNoRows) {
return nil
}
if err != nil {
return fmt.Errorf("delete empty cancelled chat user message: %w", err)
}
cancelled = &CancelledChatMessageResult{
ChatSessionID: util.UUIDToString(deleted.ChatSessionID),
MessageID: util.UUIDToString(deleted.ID),
Content: deleted.Content,
RestoreToInput: true,
Attachments: detached,
}
return nil
}
if _, err := qtx.CreateChatMessage(ctx, db.CreateChatMessageParams{
ChatSessionID: task.ChatSessionID,
Role: "assistant",
Content: "Stopped.",
TaskID: task.ID,
ElapsedMs: computeChatElapsedMs(task),
}); err != nil {
return fmt.Errorf("create cancelled chat message: %w", err)
}
return nil
}); err != nil {
slog.Error("failed to finalize cancelled chat message",
"task_id", util.UUIDToString(task.ID),
"chat_session_id", util.UUIDToString(task.ChatSessionID),
"error", err,
)
return nil
}
return cancelled
}
// ClaimTask atomically claims the next queued task for an agent,
// respecting max_concurrent_tasks.
func (s *TaskService) ClaimTask(ctx context.Context, agentID pgtype.UUID) (*db.AgentTaskQueue, error) {
start := time.Now()
var (
outcome = "unknown"
getAgentMs, countRunningMs, claimAgentMs, updateStatusMs, dispatchMs int64
claimed *db.AgentTaskQueue
)
defer func() {
s.maybeLogClaimSlow(agentID, outcome, start, getAgentMs, countRunningMs, claimAgentMs, updateStatusMs, dispatchMs)
}()
err := s.runInTx(ctx, func(qtx *db.Queries) error {
t0 := time.Now()
agent, err := qtx.GetAgentForClaimUpdate(ctx, agentID)
getAgentMs = time.Since(t0).Milliseconds()
if err != nil {
outcome = "error_get_agent"
return fmt.Errorf("agent not found: %w", err)
}
t0 = time.Now()
running, err := qtx.CountRunningTasks(ctx, agentID)
countRunningMs = time.Since(t0).Milliseconds()
if err != nil {
outcome = "error_count_running"
return fmt.Errorf("count running tasks: %w", err)
}
if running >= int64(agent.MaxConcurrentTasks) {
slog.Debug("task claim: no capacity", "agent_id", util.UUIDToString(agentID), "running", running, "max", agent.MaxConcurrentTasks)
outcome = "no_capacity"
return nil
}
t0 = time.Now()
task, err := qtx.ClaimAgentTask(ctx, db.ClaimAgentTaskParams{
AgentID: agentID,
PrepareLeaseSecs: prepareLeaseDuration.Seconds(),
})
claimAgentMs = time.Since(t0).Milliseconds()
if err != nil {
if errors.Is(err, pgx.ErrNoRows) {
slog.Debug("task claim: no tasks available", "agent_id", util.UUIDToString(agentID))
outcome = "no_tasks"
return nil
}
outcome = "error_claim"
return fmt.Errorf("claim task: %w", err)
}
claimedTask := task
claimed = &claimedTask
return nil
})
if err != nil {
if outcome == "unknown" {
outcome = "error_transaction"
}
return nil, err
}
if claimed == nil {
return nil, nil
}
slog.Info("task claimed", "task_id", util.UUIDToString(claimed.ID), "agent_id", util.UUIDToString(agentID))
s.captureTaskDispatched(ctx, *claimed)
// Refresh agent status from active tasks. This avoids a stale unconditional
// working write racing after a just-cancelled claim.
t0 := time.Now()
s.ReconcileAgentStatus(ctx, agentID)
updateStatusMs = time.Since(t0).Milliseconds()
// Broadcast task:dispatch. ResolveTaskWorkspaceID inside this path can
// re-query issue/chat_session/autopilot_run, so it can also be a real
// contributor to claim latency.
t0 = time.Now()
s.broadcastTaskDispatch(ctx, *claimed)
dispatchMs = time.Since(t0).Milliseconds()
outcome = "claimed"
return claimed, nil
}
// ClaimTaskForRuntime claims the next runnable task for a runtime while
// still respecting each agent's max_concurrent_tasks limit.
//
// Empty-claim fast path: when EmptyClaim is configured and a recent
// check verified the runtime had no queued tasks, returns immediately
// without touching Postgres. The cache is invalidated synchronously on
// every enqueue (notifyTaskAvailable), so a queued task becomes
// claimable on the next call rather than waiting for the TTL.
func (s *TaskService) ClaimTaskForRuntime(ctx context.Context, runtimeID pgtype.UUID) (*db.AgentTaskQueue, error) {
start := time.Now()
var (
outcome = "no_task"
listMs, loopMs int64
listCount, tried int
claimedFlag bool
)
defer func() {
totalMs := time.Since(start).Milliseconds()
if totalMs < 300 {
return
}
slog.Info("claim_for_runtime slow",
"runtime_id", util.UUIDToString(runtimeID),
"outcome", outcome,
"total_ms", totalMs,
"list_pending_ms", listMs,
"list_pending_count", listCount,
"agents_tried", tried,
"claim_loop_ms", loopMs,
"claimed", claimedFlag,
)
}()
runtimeKey := util.UUIDToString(runtimeID)
if err := s.PromoteDueDeferredTasksForRuntime(ctx, runtimeID); err != nil {
outcome = "error_promote_deferred"
return nil, err
}
// Check this before EmptyClaim: a lost claim response moves the task out of
// `queued`, so the empty-queued cache cannot represent recoverability.
stale, err := s.Queries.ReclaimStaleDispatchedTaskForRuntime(ctx, db.ReclaimStaleDispatchedTaskForRuntimeParams{
RuntimeID: runtimeID,
ClaimRecoverySecs: claimResponseRecoveryWindow.Seconds(),
PrepareLeaseSecs: prepareLeaseDuration.Seconds(),
})
if err == nil {
outcome = "reclaimed_dispatched"
claimedFlag = true
slog.Info("stale dispatched task reclaimed",
"task_id", util.UUIDToString(stale.ID),
"runtime_id", runtimeKey,
"agent_id", util.UUIDToString(stale.AgentID),
)
return &stale, nil
}
if !errors.Is(err, pgx.ErrNoRows) {
outcome = "error_reclaim_dispatched"
return nil, fmt.Errorf("reclaim stale dispatched task: %w", err)
}
if s.EmptyClaim.IsEmpty(ctx, runtimeKey) {
outcome = "empty_cache_hit"
return nil, nil
}
// Sample the invalidation version BEFORE the SELECT. If a
// concurrent enqueue Bumps between this read and the post-SELECT
// MarkEmpty, the next IsEmpty will see the empty key tagged with
// a stale version and reject it — closing the race that would
// otherwise stall the just-queued task until the empty key's TTL
// expired.
preSelectVersion := s.EmptyClaim.CurrentVersion(ctx, runtimeKey)
t0 := time.Now()
tasks, err := s.Queries.ListQueuedClaimCandidatesByRuntime(ctx, runtimeID)
listMs = time.Since(t0).Milliseconds()
listCount = len(tasks)
if err != nil {
outcome = "error_list"
return nil, fmt.Errorf("list queued claim candidates: %w", err)
}
if len(tasks) == 0 {
s.EmptyClaim.MarkEmpty(ctx, runtimeKey, preSelectVersion)
outcome = "empty_db"
return nil, nil
}
loopStart := time.Now()
triedAgents := map[string]struct{}{}
var claimed *db.AgentTaskQueue
for _, candidate := range tasks {
agentKey := util.UUIDToString(candidate.AgentID)
if _, seen := triedAgents[agentKey]; seen {
continue
}
triedAgents[agentKey] = struct{}{}
tried++
task, err := s.ClaimTask(ctx, candidate.AgentID)
if err != nil {
loopMs = time.Since(loopStart).Milliseconds()
outcome = "error_claim"
return nil, err
}
if task != nil && task.RuntimeID == runtimeID {
claimed = task
break
}
}
loopMs = time.Since(loopStart).Milliseconds()
if claimed != nil {
claimedFlag = true
outcome = "claimed"
}
return claimed, nil
}
func (s *TaskService) PromoteDueDeferredTasksForRuntime(ctx context.Context, runtimeID pgtype.UUID) error {
tasks, err := s.Queries.PromoteDueDeferredTasksForRuntime(ctx, runtimeID)
if err != nil {
return fmt.Errorf("promote due deferred tasks: %w", err)
}
for _, task := range tasks {
slog.Info("deferred fallback task promoted",
"task_id", util.UUIDToString(task.ID),
"runtime_id", util.UUIDToString(runtimeID),
"agent_id", util.UUIDToString(task.AgentID),
)
s.broadcastTaskEvent(ctx, protocol.EventTaskQueued, task)
s.NotifyTaskEnqueued(ctx, task)
}
return nil
}
// maybeLogClaimSlow emits one structured log per ClaimTask call when its total
// latency exceeds 300ms, so the prod tail can be diagnosed without flooding
// logs at normal poll rates. Called via defer so it captures the full path
// including post-claim updateAgentStatus / broadcastTaskDispatch (both of
// which can hit the DB) and any error exit.
func (s *TaskService) maybeLogClaimSlow(agentID pgtype.UUID, outcome string, start time.Time, getAgentMs, countRunningMs, claimAgentMs, updateStatusMs, dispatchMs int64) {
totalMs := time.Since(start).Milliseconds()
if totalMs < 300 {
return
}
slog.Info("claim_task slow",
"agent_id", util.UUIDToString(agentID),
"outcome", outcome,
"total_ms", totalMs,
"get_agent_ms", getAgentMs,
"count_running_ms", countRunningMs,
"claim_agent_ms", claimAgentMs,
"update_status_ms", updateStatusMs,
"dispatch_ms", dispatchMs,
)
}
// StartTask transitions a dispatched task to running.
// Issue status is NOT changed here — the agent manages it via the CLI.
func (s *TaskService) StartTask(ctx context.Context, taskID pgtype.UUID) (*db.AgentTaskQueue, error) {
task, err := s.Queries.StartAgentTask(ctx, taskID)
if err != nil {
return nil, fmt.Errorf("start task: %w", err)
}
s.cancelDeferredEscalationsForTask(ctx, task.ID)
slog.Info("task started", "task_id", util.UUIDToString(task.ID), "issue_id", util.UUIDToString(task.IssueID))
s.captureTaskStarted(ctx, task)
// Tell every connected workspace WS client that this task transitioned
// (dispatched | waiting_local_directory) → running. Without this, the
// workspace-wide `agentTaskSnapshot` query only refreshes on the 30s
// staleTime, so any UI that distinguishes "queued" from "running" (e.g.
// the issue-card agent activity indicator) lags by up to half a minute
// on the transition users care about most.
s.broadcastTaskEvent(ctx, protocol.EventTaskRunning, task)
return &task, nil
}
func (s *TaskService) cancelDeferredEscalationsForTask(ctx context.Context, taskID pgtype.UUID) {
cancelled, err := s.Queries.CancelDeferredEscalationsForTask(ctx, taskID)
if err != nil {
slog.Warn("cancel deferred escalations for task failed", "task_id", util.UUIDToString(taskID), "error", err)
return
}
for _, task := range cancelled {
slog.Info("deferred fallback task cancelled",
"task_id", util.UUIDToString(task.ID),
"primary_task_id", util.UUIDToString(taskID),
"reason", "primary_acknowledged",
)
}
}
func (s *TaskService) CancelDeferredEscalationsForIssueAgent(ctx context.Context, issueID, agentID pgtype.UUID) {
cancelled, err := s.Queries.CancelDeferredEscalationsForIssueAgent(ctx, db.CancelDeferredEscalationsForIssueAgentParams{
IssueID: issueID,
AgentID: agentID,
})
if err != nil {
slog.Warn("cancel deferred escalations for issue agent failed",
"issue_id", util.UUIDToString(issueID),
"agent_id", util.UUIDToString(agentID),
"error", err)
return
}
for _, task := range cancelled {
slog.Info("deferred fallback task cancelled",
"task_id", util.UUIDToString(task.ID),
"issue_id", util.UUIDToString(issueID),
"agent_id", util.UUIDToString(agentID),
"reason", "agent_comment_acknowledged",
)
}
}
// ExtendTaskPrepareLease keeps a claimed-but-not-started task protected while
// the daemon resolves cached inputs and prepares the execution environment.
func (s *TaskService) ExtendTaskPrepareLease(ctx context.Context, taskID, runtimeID pgtype.UUID) (*db.AgentTaskQueue, error) {
task, err := s.Queries.ExtendAgentTaskPrepareLease(ctx, db.ExtendAgentTaskPrepareLeaseParams{
ID: taskID,
RuntimeID: runtimeID,
LeaseSecs: prepareLeaseDuration.Seconds(),
})
if err != nil {
return nil, fmt.Errorf("extend task prepare lease: %w", err)
}
return &task, nil
}
// MarkTaskWaitingLocalDirectory parks a dispatched task in the
// waiting_local_directory state while the daemon waits for another in-flight
// task to release the project_resource path lock. reason carries a short
// human-readable hint (typically the contested path) that the UI surfaces
// next to the status. Returns the updated row so the daemon can confirm the
// transition and so the broadcast carries the up-to-date snapshot.
func (s *TaskService) MarkTaskWaitingLocalDirectory(ctx context.Context, taskID pgtype.UUID, reason string) (*db.AgentTaskQueue, error) {
reason = strings.TrimSpace(reason)
task, err := s.Queries.MarkAgentTaskWaitingLocalDirectory(ctx, db.MarkAgentTaskWaitingLocalDirectoryParams{
ID: taskID,
WaitReason: pgtype.Text{String: reason, Valid: reason != ""},
PrepareLeaseSecs: prepareLeaseDuration.Seconds(),
})
if err != nil {
return nil, fmt.Errorf("mark task waiting_local_directory: %w", err)
}
slog.Info("task waiting_local_directory",
"task_id", util.UUIDToString(task.ID),
"issue_id", util.UUIDToString(task.IssueID),
"reason", reason,
)
s.broadcastTaskEvent(ctx, protocol.EventTaskWaitingLocalDirectory, task)
return &task, nil
}
// CompleteTask marks a task as completed.
// Issue status is NOT changed here — the agent manages it via the CLI.
//
// For chat tasks, CompleteAgentTask and the chat_session resume-pointer
// update run in a single transaction. This closes a race where the next
// queued chat message could be claimed in the window between the task
// flipping to 'completed' and chat_session.session_id being refreshed,
// causing the new task to resume against a stale (or NULL) session.
func (s *TaskService) CompleteTask(ctx context.Context, taskID pgtype.UUID, result []byte, sessionID, workDir string) (*db.AgentTaskQueue, error) {
var task db.AgentTaskQueue
if err := s.runInTx(ctx, func(qtx *db.Queries) error {
t, err := qtx.CompleteAgentTask(ctx, db.CompleteAgentTaskParams{
ID: taskID,
Result: result,
SessionID: pgtype.Text{String: sessionID, Valid: sessionID != ""},
WorkDir: pgtype.Text{String: workDir, Valid: workDir != ""},
})
if err != nil {
return err
}
task = t
if t.ChatSessionID.Valid {
// Pin the chat_session's runtime_id alongside the session_id so the
// next claim can apply the runtime-guard. Both fields move together:
// when there's no session_id to record, leave runtime_id untouched
// (NULL → COALESCE keeps the existing value).
var sessionRuntimeID pgtype.UUID
if sessionID != "" {
sessionRuntimeID = t.RuntimeID
}
// COALESCE in SQL guarantees empty inputs don't wipe the
// existing resume pointer; we still surface DB errors.
if err := qtx.UpdateChatSessionSession(ctx, db.UpdateChatSessionSessionParams{
ID: t.ChatSessionID,
SessionID: pgtype.Text{String: sessionID, Valid: sessionID != ""},
WorkDir: pgtype.Text{String: workDir, Valid: workDir != ""},
RuntimeID: sessionRuntimeID,
}); err != nil {
return fmt.Errorf("update chat session resume pointer: %w", err)
}
}
return nil
}); err != nil {
// When parallel agents race, a task may already be completed,
// cancelled, or failed by the time this call runs. The UPDATE
// … WHERE status = 'running' returns no rows in that case.
// Treat it as an idempotent success — same pattern as CancelTask.
if existing, lookupErr := s.Queries.GetAgentTask(ctx, taskID); lookupErr == nil {
if errors.Is(err, pgx.ErrNoRows) {
slog.Info("complete task: already finalized",
"task_id", util.UUIDToString(taskID),
"current_status", existing.Status,
"agent_id", util.UUIDToString(existing.AgentID),
)
return &existing, nil
}
slog.Warn("complete task failed",
"task_id", util.UUIDToString(taskID),
"current_status", existing.Status,
"issue_id", util.UUIDToString(existing.IssueID),
"chat_session_id", util.UUIDToString(existing.ChatSessionID),
"agent_id", util.UUIDToString(existing.AgentID),
"error", err,
)
} else {
slog.Warn("complete task failed: task not found",
"task_id", util.UUIDToString(taskID),
"lookup_error", lookupErr,
)
}
return nil, fmt.Errorf("complete task: %w", err)
}
slog.Info("task completed", "task_id", util.UUIDToString(task.ID), "issue_id", util.UUIDToString(task.IssueID))
s.captureTaskCompleted(ctx, task)
// Invariant: every completed issue task must have at least one agent
// comment on the issue, so the user always sees something when a run
// ends. If the agent posted a comment during execution (result, progress
// ping, or CLI reply), HasAgentCommentedSince returns true and we skip.
// Otherwise, synthesize one from the final output. For comment-triggered
// tasks, TriggerCommentID threads the fallback under the original comment;
// for assignment-triggered tasks it is NULL and the fallback is top-level.
// Chat tasks have no IssueID and are handled separately below.
if task.IssueID.Valid {
suppressNoActionComment, err := HasSquadLeaderNoActionEvaluationForTask(ctx, s.Queries, task)
if err != nil {
slog.Warn("checking squad leader no_action evaluation failed",
"task_id", util.UUIDToString(task.ID),
"issue_id", util.UUIDToString(task.IssueID),
"agent_id", util.UUIDToString(task.AgentID),
"error", err,
)
}
agentCommented, _ := s.Queries.HasAgentCommentedSince(ctx, db.HasAgentCommentedSinceParams{
IssueID: task.IssueID,
AuthorID: task.AgentID,
Since: task.StartedAt,
})
if !suppressNoActionComment && !agentCommented {
var payload protocol.TaskCompletedPayload
if err := json.Unmarshal(result, &payload); err == nil {
if payload.Output != "" {
// Match the CLI's --content / --description behavior: agents that
// emit literal `\n` 4-char sequences (Python/JSON-style) get them
// decoded into real newlines before the comment hits the DB. See
// util.UnescapeBackslashEscapes for the exact contract.
body := util.UnescapeBackslashEscapes(payload.Output)
if task.TriggerCommentID.Valid && isTrivialDoneOutput(body) {
slog.Warn("suppressing trivial comment-trigger fallback output",
"task_id", util.UUIDToString(task.ID),
"issue_id", util.UUIDToString(task.IssueID),
"agent_id", util.UUIDToString(task.AgentID),
)
} else {
s.createAgentComment(ctx, task.IssueID, task.AgentID, redact.Text(body), "comment", task.TriggerCommentID, pgtype.UUID{})
}
}
}
}
}
// Quick-create tasks: locate the issue the agent just created and push
// an inbox confirmation to the requester. The agent has no issue / chat
// link, so the regular completion paths above don't apply. We find the
// new issue by querying for the most recent issue this agent created in
// the requester's workspace since the task started — more robust than
// parsing the agent's stdout for an identifier.
if qc, ok := s.parseQuickCreateContext(task); ok {
s.notifyQuickCreateCompleted(ctx, task, qc)
}
// For chat tasks, save assistant reply and broadcast chat:done. The
// resume pointer was already persisted inside the transaction above.
if task.ChatSessionID.Valid {
var assistantMsg *db.ChatMessage
var payload protocol.TaskCompletedPayload
if err := json.Unmarshal(result, &payload); err == nil && payload.Output != "" {
// Same unescape as the issue-comment path above: literal `\n` from
// agent stdout becomes a real newline so the chat panel renders
// paragraph breaks instead of one wall of prose.
body := util.UnescapeBackslashEscapes(payload.Output)
row, err := s.Queries.CreateChatMessage(ctx, db.CreateChatMessageParams{
ChatSessionID: task.ChatSessionID,
Role: "assistant",
Content: redact.Text(body),
TaskID: task.ID,
ElapsedMs: computeChatElapsedMs(task),
})
if err != nil {
slog.Error("failed to save assistant chat message", "task_id", util.UUIDToString(task.ID), "error", err)
} else {
assistantMsg = &row
// Event-driven unread: stamp unread_since on the first unread
// assistant message. No-op if the session already has unread.
// If the user is actively viewing the session, the frontend's
// auto-mark-read effect will clear this within a tick.
if err := s.Queries.SetUnreadSinceIfNull(ctx, task.ChatSessionID); err != nil {
slog.Warn("failed to set unread_since", "chat_session_id", util.UUIDToString(task.ChatSessionID), "error", err)
}
}
}
s.broadcastChatDone(ctx, task, assistantMsg)
}
// Reconcile agent status
s.ReconcileAgentStatus(ctx, task.AgentID)
// Broadcast
s.broadcastTaskEvent(ctx, protocol.EventTaskCompleted, task)
return &task, nil
}
// FailTask marks a task as failed.
// Issue status is NOT changed here — the agent manages it via the CLI.
//
// sessionID/workDir are optional: when the agent established a real session
// before failing (e.g. crashed mid-conversation, was cancelled, or hit a
// tool error), the daemon should pass them so we can preserve the resume
// pointer on both the task row and the chat_session — otherwise the next
// chat turn would silently start a brand-new session and lose memory.
//
// failureReason is a coarse classifier consumed by the auto-retry path.
// Pass "" when unknown — the server runs the raw error text through
// taskfailure.Classify so the persisted failure_reason still lands in
// the canonical refined taxonomy rather than the legacy "agent_error"
// coarse bucket. Daemon callers that already produced a refined reason
// (via classifyPoisonedError, the timeout / runtime classifier, etc.)
// will have their value preserved untouched.
func (s *TaskService) FailTask(ctx context.Context, taskID pgtype.UUID, errMsg, sessionID, workDir, failureReason string) (*db.AgentTaskQueue, error) {
// MUL-2946: synthesise a refined reason from the error text whenever the
// caller didn't supply one. This is the last write-path guard against
// "agent_error" coarse rows ending up in agent_task_queue.failure_reason
// — every other path either provides a classified reason directly
// (sweepers writing 'queued_expired' / 'runtime_offline' / 'timeout'
// / 'runtime_recovery' via SQL) or runs the daemon's classifyPoisonedError
// + taskfailure.Classify chain.
if failureReason == "" {
failureReason = taskfailure.Classify(errMsg).String()
}
var task db.AgentTaskQueue
if err := s.runInTx(ctx, func(qtx *db.Queries) error {
t, err := qtx.FailAgentTask(ctx, db.FailAgentTaskParams{
ID: taskID,
Error: pgtype.Text{String: errMsg, Valid: true},
FailureReason: pgtype.Text{String: failureReason, Valid: failureReason != ""},
SessionID: pgtype.Text{String: sessionID, Valid: sessionID != ""},
WorkDir: pgtype.Text{String: workDir, Valid: workDir != ""},
})
if err != nil {
return err
}
task = t
// Keep resume-unsafe sessions on the task row for observability, but
// do not promote them to the chat-level resume pointer.
if t.ChatSessionID.Valid && !resumeUnsafeFailureReason(failureReason) {
// Pin the chat_session's runtime_id alongside the session_id so the
// next claim can apply the runtime-guard. Both fields move together:
// when there's no session_id to record, leave runtime_id untouched
// (NULL → COALESCE keeps the existing value).
var sessionRuntimeID pgtype.UUID
if sessionID != "" {
sessionRuntimeID = t.RuntimeID
}
if err := qtx.UpdateChatSessionSession(ctx, db.UpdateChatSessionSessionParams{
ID: t.ChatSessionID,
SessionID: pgtype.Text{String: sessionID, Valid: sessionID != ""},
WorkDir: pgtype.Text{String: workDir, Valid: workDir != ""},
RuntimeID: sessionRuntimeID,
}); err != nil {
return fmt.Errorf("update chat session resume pointer: %w", err)
}
}
return nil
}); err != nil {
if existing, lookupErr := s.Queries.GetAgentTask(ctx, taskID); lookupErr == nil {
if errors.Is(err, pgx.ErrNoRows) {
slog.Info("fail task: already finalized",
"task_id", util.UUIDToString(taskID),
"current_status", existing.Status,
"agent_id", util.UUIDToString(existing.AgentID),
)
return &existing, nil
}
slog.Warn("fail task failed",
"task_id", util.UUIDToString(taskID),
"current_status", existing.Status,
"issue_id", util.UUIDToString(existing.IssueID),
"chat_session_id", util.UUIDToString(existing.ChatSessionID),
"agent_id", util.UUIDToString(existing.AgentID),
"error", err,
)
} else {
slog.Warn("fail task failed: task not found",
"task_id", util.UUIDToString(taskID),
"lookup_error", lookupErr,
)
}
return nil, fmt.Errorf("fail task: %w", err)
}
slog.Warn("task failed", "task_id", util.UUIDToString(task.ID), "issue_id", util.UUIDToString(task.IssueID), "error", errMsg, "failure_reason", failureReason)
s.captureTaskFailed(ctx, task)
// Auto-retry eligible failures (orphan, timeout, runtime_offline,
// runtime_recovery). The helper itself enforces attempt < max_attempts
// and only triggers for issue/chat tasks.
retried, _ := s.MaybeRetryFailedTask(ctx, task)
// Skip the per-failure system comment when we'll immediately retry —
// the new task will surface its own status to the user, and we don't
// want to spam the issue with "task timed out" messages on every
// daemon hiccup.
if errMsg != "" && task.IssueID.Valid && retried == nil {
s.createAgentComment(ctx, task.IssueID, task.AgentID, redact.Text(errMsg), "system", task.TriggerCommentID, task.ID)
}
// Mirror the issue fallback for chat tasks: write an assistant
// chat_message tagged with the daemon-reported failure_reason so the
// conversation history shows what happened. Skip when auto-retry is
// pending (the new attempt will write its own outcome) — same guard as
// the issue path above.
if task.ChatSessionID.Valid && retried == nil {
if _, err := s.Queries.CreateChatMessage(ctx, db.CreateChatMessageParams{
ChatSessionID: task.ChatSessionID,
Role: "assistant",
Content: redact.Text(errMsg),
TaskID: pgtype.UUID{Bytes: task.ID.Bytes, Valid: true},
FailureReason: pgtype.Text{String: failureReason, Valid: failureReason != ""},
ElapsedMs: computeChatElapsedMs(task),
}); err != nil {
slog.Error("failed to save failure chat message",
"task_id", util.UUIDToString(task.ID),
"chat_session_id", util.UUIDToString(task.ChatSessionID),
"error", err)
} else if err := s.Queries.SetUnreadSinceIfNull(ctx, task.ChatSessionID); err != nil {
slog.Warn("failed to set unread_since on failure",
"chat_session_id", util.UUIDToString(task.ChatSessionID),
"error", err)
}
}
// Quick-create tasks: push a failure inbox notification to the
// requester so they can either retry or fall back to the advanced form
// without losing their original prompt. Skipped when an auto-retry is
// pending — the new attempt will write its own outcome.
if retried == nil {
if qc, ok := s.parseQuickCreateContext(task); ok {
s.notifyQuickCreateFailed(ctx, task, qc, errMsg)
}
}
// Reconcile agent status
s.ReconcileAgentStatus(ctx, task.AgentID)
// Broadcast
s.broadcastTaskEvent(ctx, protocol.EventTaskFailed, task)
return &task, nil
}
// retryableReasons enumerates failure reasons that the auto-retry path is
// allowed to act on. Agent-side errors (compile failures, model rejections,
// etc.) are intentionally excluded — those are real problems that the user
// should see, not infrastructure flakiness.
var retryableReasons = map[string]bool{
"runtime_offline": true,
"runtime_recovery": true,
"timeout": true,
"codex_semantic_inactivity": true,
}
func resumeUnsafeFailureReason(reason string) bool {
switch reason {
// Keep in sync with GetLastTaskSession / GetLastChatTaskSession and
// CreateRetryTask's fresh-session CASE WHEN.
case "iteration_limit", "agent_fallback_message", "api_invalid_request", "codex_semantic_inactivity":
return true
default:
return false
}
}
// MaybeRetryFailedTask spawns a fresh queued attempt for a recently-failed
// task when the failure was infrastructure-shaped (daemon crash, runtime
// went offline, dispatch/run timeout) and the task hasn't exhausted its
// max_attempts budget. The child task inherits agent/runtime/issue/chat
// links and, for resume-safe failures, the parent's session_id/work_dir so
// the agent can resume the conversation when the backend supports it. Returns
// the new task, or nil when no retry was created.
//
// Autopilot tasks are NOT auto-retried here; the autopilot scheduler owns
// its own re-run cadence and we don't want to double-fire it.
func (s *TaskService) MaybeRetryFailedTask(ctx context.Context, parent db.AgentTaskQueue) (*db.AgentTaskQueue, error) {
if parent.Status != "failed" {
return nil, nil
}
reason := ""
if parent.FailureReason.Valid {
reason = parent.FailureReason.String
}
if !retryableReasons[reason] {
return nil, nil
}
if parent.Attempt >= parent.MaxAttempts {
slog.Info("task auto-retry skipped: budget exhausted",
"task_id", util.UUIDToString(parent.ID),
"attempt", parent.Attempt,
"max_attempts", parent.MaxAttempts,
)
return nil, nil
}
if parent.AutopilotRunID.Valid {
// Autopilot has its own retry semantics; do not double-trigger.
return nil, nil
}
if !parent.IssueID.Valid && !parent.ChatSessionID.Valid {
return nil, nil
}
var runtimeMCPOverlay runtimeMCPOverlayData
agent, agentErr := s.Queries.GetAgent(ctx, parent.AgentID)
if agentErr != nil {
// Best-effort: failing to resolve the agent for the overlay is not
// retry-fatal. Log and continue — the daemon will reject the claim
// later if the agent is genuinely gone.
slog.Warn("task auto-retry: load agent for overlay failed",
"parent_task_id", util.UUIDToString(parent.ID),
"agent_id", util.UUIDToString(parent.AgentID),
"error", agentErr,
)
} else {
runtimeMCPOverlay = s.buildRuntimeMCPOverlay(ctx, parent.OriginatorUserID, agent)
}
child, err := s.Queries.CreateRetryTask(ctx, db.CreateRetryTaskParams{
ID: parent.ID,
RuntimeMcpOverlay: runtimeMCPOverlay.Overlay,
RuntimeConnectedApps: runtimeMCPOverlay.ConnectedApps,
})
if err != nil {
slog.Warn("task auto-retry failed",
"parent_task_id", util.UUIDToString(parent.ID),
"reason", reason,
"error", err,
)
return nil, err
}
slog.Info("task auto-retry enqueued",
"parent_task_id", util.UUIDToString(parent.ID),
"child_task_id", util.UUIDToString(child.ID),
"reason", reason,
"attempt", child.Attempt,
"max_attempts", child.MaxAttempts,
)
// Retry creates a fresh queued row, same status transition (∅ → queued)
// as EnqueueTaskFor*. Broadcast queued first, then notify the daemon —
// see EnqueueTaskForIssue for ordering rationale.
//
s.broadcastTaskEvent(ctx, protocol.EventTaskQueued, child)
s.NotifyTaskEnqueued(ctx, child)
return &child, nil
}
// RerunIssue creates a fresh queued task for an agent on the issue. Used by
// the manual rerun endpoint.
//
// Target agent resolution:
// - sourceTaskID Valid: rerun the agent that ran that task (and reuse its
// leader/worker role). This is what the execution log retry button uses
// so a per-row retry survives a subsequent assignee change and correctly
// re-fires the squad worker or mention agent whose row was clicked. The
// source task's trigger_comment_id is also inherited (when the caller
// didn't pass one) so a per-row rerun of a comment- or mention-triggered
// task stays comment-triggered — the daemon's buildCommentPrompt path
// keys on TriggerCommentID, and losing it would degrade the rerun into
// a generic issue run that no longer carries the original comment.
// - sourceTaskID empty: fall back to the issue's current assignee (agent
// or squad leader). This preserves the CLI / API contract for callers
// that have an issue ID but no specific task to target.
//
// The new task is flagged force_fresh_session=true so the daemon starts a
// clean agent session instead of resuming the prior (agent_id, issue_id)
// session. A user clicking rerun has just judged the prior output bad —
// resuming the same conversation would replay the same poisoned state.
// Auto-retry of an orphaned mid-flight failure (HandleFailedTasks →
// MaybeRetryFailedTask → CreateRetryTask) does NOT take this path, so
// MUL-1128's mid-flight resume contract is preserved.
//
// Only tasks belonging to the target agent on this issue are cancelled.
// Tasks owned by other agents on the same issue (e.g. a parallel
// @-mention agent) are left alone — rerun must not collateral-cancel
// them.
func (s *TaskService) RerunIssue(ctx context.Context, issueID pgtype.UUID, sourceTaskID pgtype.UUID, triggerCommentID pgtype.UUID) (*db.AgentTaskQueue, error) {
issue, err := s.Queries.GetIssue(ctx, issueID)
if err != nil {
return nil, fmt.Errorf("load issue: %w", err)
}
// Determine the target agent for the rerun.
var (
agentID pgtype.UUID
isLeader bool
squadID pgtype.UUID
)
if sourceTaskID.Valid {
sourceTask, err := s.Queries.GetAgentTask(ctx, sourceTaskID)
if err != nil {
return nil, fmt.Errorf("load source task: %w", err)
}
if !sourceTask.IssueID.Valid || util.UUIDToString(sourceTask.IssueID) != util.UUIDToString(issueID) {
return nil, fmt.Errorf("source task does not belong to this issue")
}
agentID = sourceTask.AgentID
isLeader = sourceTask.IsLeaderTask
// Carry the source task's squad provenance so a rerun of a leader
// task still injects the squad briefing at claim time (see migration
// 127 / daemon claim handler).
squadID = sourceTask.SquadID
// Inherit trigger provenance so a per-row rerun of a comment- or
// mention-triggered task stays a comment-triggered task. Without
// this the daemon's buildCommentPrompt path is skipped (it keys on
// TriggerCommentID) and the rerun degrades into a generic issue
// run that has lost the original comment context. Only override
// when the caller didn't pass one explicitly.
if !triggerCommentID.Valid && sourceTask.TriggerCommentID.Valid {
triggerCommentID = sourceTask.TriggerCommentID
}
} else {
switch {
case issue.AssigneeType.String == "agent" && issue.AssigneeID.Valid:
agentID = issue.AssigneeID
case issue.AssigneeType.String == "squad" && issue.AssigneeID.Valid:
squad, err := s.Queries.GetSquad(ctx, issue.AssigneeID)
if err != nil {
return nil, fmt.Errorf("issue is assigned to a squad but squad not found")
}
agentID = squad.LeaderID
isLeader = true
squadID = issue.AssigneeID
default:
return nil, fmt.Errorf("issue is not assigned to an agent or squad")
}
}
// Cancel only the target agent's active/queued tasks on this issue.
cancelled, err := s.Queries.CancelAgentTasksByIssueAndAgent(ctx, db.CancelAgentTasksByIssueAndAgentParams{
IssueID: issueID,
AgentID: agentID,
})
if err != nil {
slog.Warn("rerun: cancel prior tasks failed",
"issue_id", util.UUIDToString(issueID),
"agent_id", util.UUIDToString(agentID),
"error", err,
)
}
for _, t := range cancelled {
s.captureTaskCancelled(ctx, t)
s.ReconcileAgentStatus(ctx, t.AgentID)
s.broadcastTaskEvent(ctx, protocol.EventTaskCancelled, t)
}
task, err := s.enqueueRerunTask(ctx, issue, agentID, triggerCommentID, isLeader, squadID)
if err != nil {
return nil, err
}
slog.Info("issue rerun enqueued",
"task_id", util.UUIDToString(task.ID),
"issue_id", util.UUIDToString(issueID),
"agent_id", util.UUIDToString(agentID),
"source_task_id", util.UUIDToString(sourceTaskID),
"is_leader", isLeader,
"cancelled_prior", len(cancelled),
)
return &task, nil
}
// enqueueRerunTask enqueues a fresh task for the given agent on the issue.
// When the target agent is the issue's single-agent assignee we use the
// assignee-driven path (enqueueIssueTask) so the issue-assignee bookkeeping
// stays in sync; otherwise (squad member, prior assignee that has since been
// reassigned, mention agent) we use the mention path with the same
// force_fresh_session=true contract.
func (s *TaskService) enqueueRerunTask(ctx context.Context, issue db.Issue, agentID pgtype.UUID, triggerCommentID pgtype.UUID, isLeader bool, squadID pgtype.UUID) (db.AgentTaskQueue, error) {
if issue.AssigneeType.String == "agent" && issue.AssigneeID.Valid &&
util.UUIDToString(issue.AssigneeID) == util.UUIDToString(agentID) {
return s.enqueueIssueTask(ctx, issue, triggerCommentID, true, "")
}
return s.enqueueMentionTask(ctx, issue, agentID, triggerCommentID, isLeader, squadID, true, "")
}
// HandleFailedTasks runs the post-failure side effects for a batch of
// freshly-failed tasks: optional auto-retry, task:failed event broadcast,
// agent status reconciliation, and (when an issue has no remaining active
// task and isn't being retried) resetting the issue back to todo so the
// daemon can pick it up again.
//
// All callers that surface a task as failed — sweepers, FailTask,
// recover-orphans — funnel through here so the same UI-consistency
// guarantees apply on every code path.
func (s *TaskService) HandleFailedTasks(ctx context.Context, tasks []db.AgentTaskQueue) int {
if len(tasks) == 0 {
return 0
}
affectedAgents := make(map[string]pgtype.UUID)
processedIssues := make(map[string]bool)
retriedIssues := make(map[string]bool)
retried := 0
for _, t := range tasks {
// Auto-retry first so the issue stays in_progress rather than
// flapping todo → in_progress within a tick.
if child, _ := s.MaybeRetryFailedTask(ctx, t); child != nil {
retried++
if t.IssueID.Valid {
retriedIssues[util.UUIDToString(t.IssueID)] = true
}
}
failureReason := "agent_error"
if t.FailureReason.Valid && t.FailureReason.String != "" {
failureReason = t.FailureReason.String
}
s.captureTaskFailed(ctx, t)
workspaceID := ""
if t.IssueID.Valid {
if issue, err := s.Queries.GetIssue(ctx, t.IssueID); err == nil {
workspaceID = util.UUIDToString(issue.WorkspaceID)
// Reset stuck in_progress issues only when no other active
// task exists for the issue and no retry was just enqueued.
issueKey := util.UUIDToString(t.IssueID)
if issue.Status == "in_progress" && !processedIssues[issueKey] && !retriedIssues[issueKey] {
processedIssues[issueKey] = true
hasActive, checkErr := s.Queries.HasActiveTaskForIssue(ctx, t.IssueID)
if checkErr != nil {
slog.Warn("handle failed tasks: active check failed",
"issue_id", issueKey,
"error", checkErr,
)
} else if !hasActive {
updatedIssue, updateErr := s.Queries.UpdateIssueStatus(ctx, db.UpdateIssueStatusParams{
ID: t.IssueID,
Status: "todo",
WorkspaceID: issue.WorkspaceID,
})
if updateErr != nil {
slog.Warn("handle failed tasks: reset stuck issue failed",
"issue_id", issueKey,
"error", updateErr,
)
} else {
// This direct reset bypasses the HTTP UpdateIssue
// handler that normally emits issue:updated, so emit
// it here too. Without it the board / status-filter
// caches keep showing the issue as in_progress until
// the next write touches it (#4648 / MUL-3782).
s.broadcastIssueUpdated(updatedIssue, issue.Status)
}
}
}
}
}
if workspaceID == "" {
workspaceID = s.ResolveTaskWorkspaceID(ctx, t)
}
if workspaceID != "" {
s.Bus.Publish(events.Event{
Type: protocol.EventTaskFailed,
WorkspaceID: workspaceID,
ActorType: "system",
Payload: map[string]any{
"task_id": util.UUIDToString(t.ID),
"agent_id": util.UUIDToString(t.AgentID),
"issue_id": util.UUIDToString(t.IssueID),
"status": "failed",
"failure_reason": failureReason,
},
})
}
affectedAgents[util.UUIDToString(t.AgentID)] = t.AgentID
}
for _, agentID := range affectedAgents {
s.ReconcileAgentStatus(ctx, agentID)
}
return retried
}
// runInTx executes fn inside a single DB transaction. If TxStarter is nil
// (e.g. some tests construct TaskService directly), fn runs against the
// regular Queries handle without transactional guarantees.
func (s *TaskService) runInTx(ctx context.Context, fn func(*db.Queries) error) error {
if s.TxStarter == nil {
return fn(s.Queries)
}
tx, err := s.TxStarter.Begin(ctx)
if err != nil {
return fmt.Errorf("begin tx: %w", err)
}
defer tx.Rollback(ctx)
if err := fn(s.Queries.WithTx(tx)); err != nil {
return err
}
return tx.Commit(ctx)
}
// ReportProgress broadcasts a progress update via the event bus.
func (s *TaskService) ReportProgress(ctx context.Context, taskID string, workspaceID string, summary string, step, total int) {
s.Bus.Publish(events.Event{
Type: protocol.EventTaskProgress,
WorkspaceID: workspaceID,
ActorType: "system",
ActorID: "",
TaskID: taskID,
Payload: protocol.TaskProgressPayload{
TaskID: taskID,
Summary: summary,
Step: step,
Total: total,
},
})
}
// ReconcileAgentStatus refreshes agent status from the current active task set.
func (s *TaskService) ReconcileAgentStatus(ctx context.Context, agentID pgtype.UUID) {
agent, err := s.Queries.RefreshAgentStatusFromTasks(ctx, agentID)
if err != nil {
return
}
slog.Debug("agent status reconciled", "agent_id", util.UUIDToString(agentID), "status", agent.Status)
s.publishAgentStatus(agent)
}
func (s *TaskService) updateAgentStatus(ctx context.Context, agentID pgtype.UUID, status string) {
agent, err := s.Queries.UpdateAgentStatus(ctx, db.UpdateAgentStatusParams{
ID: agentID,
Status: status,
})
if err != nil {
return
}
s.publishAgentStatus(agent)
}
func (s *TaskService) publishAgentStatus(agent db.Agent) {
s.Bus.Publish(events.Event{
Type: protocol.EventAgentStatus,
WorkspaceID: util.UUIDToString(agent.WorkspaceID),
ActorType: "system",
ActorID: "",
Payload: map[string]any{"agent": agentToMap(agent)},
})
}
// LoadAgentSkills loads an agent's skills with their files for task execution.
func (s *TaskService) LoadAgentSkills(ctx context.Context, agentID pgtype.UUID) []AgentSkillData {
skills, err := s.Queries.ListAgentSkills(ctx, agentID)
if err != nil || len(skills) == 0 {
return nil
}
result := make([]AgentSkillData, 0, len(skills))
for _, sk := range skills {
data := AgentSkillData{
ID: util.UUIDToString(sk.ID),
Name: sk.Name,
Description: sk.Description,
Content: sk.Content,
}
files, _ := s.Queries.ListSkillFiles(ctx, sk.ID)
for _, f := range files {
data.Files = append(data.Files, AgentSkillFileData{Path: f.Path, Content: f.Content})
}
result = append(result, data)
}
return result
}
// LoadAgentSkillBundles returns every skill visible to an agent, including
// built-ins, with stable bundle hashes and lightweight refs for slim claims.
func (s *TaskService) LoadAgentSkillBundles(ctx context.Context, agentID pgtype.UUID) ([]AgentSkillData, []AgentSkillRefData) {
skills := s.LoadAgentSkills(ctx, agentID)
skills = append(skills, s.BuiltinSkills()...)
return BuildAgentSkillBundles(skills)
}
func BuildAgentSkillBundles(skills []AgentSkillData) ([]AgentSkillData, []AgentSkillRefData) {
bundles := make([]AgentSkillData, 0, len(skills))
refs := make([]AgentSkillRefData, 0, len(skills))
for _, skill := range skills {
source := skill.Source
id := skill.ID
if source == "" {
if id == "" {
source = skillbundle.SourceBuiltin
} else {
source = skillbundle.SourceWorkspace
}
}
if id == "" && source == skillbundle.SourceBuiltin {
id = "builtin:" + skill.Name
}
skill.Source = source
skill.ID = id
files := make([]skillbundle.File, 0, len(skill.Files))
for _, file := range skill.Files {
files = append(files, skillbundle.File{Path: file.Path, Content: file.Content})
}
manifest := skillbundle.BuildManifest(skillbundle.Skill{
ID: skill.ID,
Source: skill.Source,
Name: skill.Name,
Description: skill.Description,
Content: skill.Content,
Files: files,
})
skill.Hash = manifest.Hash
skill.SizeBytes = manifest.SizeBytes
fileRefsByPath := make(map[string]skillbundle.FileRef, len(manifest.Files))
for _, file := range manifest.Files {
fileRefsByPath[file.Path] = file
}
for i := range skill.Files {
if ref, ok := fileRefsByPath[skill.Files[i].Path]; ok {
skill.Files[i].SHA256 = ref.SHA256
skill.Files[i].SizeBytes = ref.SizeBytes
}
}
bundles = append(bundles, skill)
refFiles := make([]AgentSkillFileRefData, 0, len(manifest.Files))
for _, file := range manifest.Files {
refFiles = append(refFiles, AgentSkillFileRefData{
Path: file.Path,
SHA256: file.SHA256,
SizeBytes: file.SizeBytes,
})
}
refs = append(refs, AgentSkillRefData{
ID: skill.ID,
Source: skill.Source,
Name: skill.Name,
Description: skill.Description,
Hash: manifest.Hash,
SizeBytes: manifest.SizeBytes,
FileCount: manifest.FileCount,
Files: refFiles,
})
}
return bundles, refs
}
// AgentSkillData represents a skill for task execution responses.
type AgentSkillData struct {
ID string `json:"id"`
Source string `json:"source,omitempty"`
Name string `json:"name"`
Description string `json:"description,omitempty"`
Hash string `json:"hash,omitempty"`
SizeBytes int64 `json:"size_bytes,omitempty"`
Content string `json:"content"`
Files []AgentSkillFileData `json:"files,omitempty"`
}
// AgentSkillFileData represents a supporting file within a skill.
type AgentSkillFileData struct {
Path string `json:"path"`
Content string `json:"content"`
SHA256 string `json:"sha256,omitempty"`
SizeBytes int64 `json:"size_bytes,omitempty"`
}
type AgentSkillRefData struct {
ID string `json:"id"`
Source string `json:"source"`
Name string `json:"name"`
Description string `json:"description,omitempty"`
Hash string `json:"hash"`
SizeBytes int64 `json:"size_bytes"`
FileCount int `json:"file_count"`
Files []AgentSkillFileRefData `json:"files,omitempty"`
}
type AgentSkillFileRefData struct {
Path string `json:"path"`
SHA256 string `json:"sha256"`
SizeBytes int64 `json:"size_bytes"`
}
// computeChatElapsedMs returns the wall-clock duration from task creation
// (user hit send) to terminal state (completed/failed). Stored on the
// assistant chat_message so the UI can render "Replied in 38s" /
// "Failed after 12s". Uses created_at — not started_at — because users
// experience total wait time, including queue + dispatch, not just the
// daemon's actual run time.
func computeChatElapsedMs(task db.AgentTaskQueue) pgtype.Int8 {
if !task.CompletedAt.Valid || !task.CreatedAt.Valid {
return pgtype.Int8{}
}
ms := task.CompletedAt.Time.Sub(task.CreatedAt.Time).Milliseconds()
if ms < 0 {
ms = 0
}
return pgtype.Int8{Int64: ms, Valid: true}
}
func priorityToInt(p string) int32 {
switch p {
case "urgent":
return 4
case "high":
return 3
case "medium":
return 2
case "low":
return 1
default:
return 0
}
}
// NotifyTaskEnqueued is the cross-package shim for callers outside
// TaskService (e.g. AutopilotService.dispatchRunOnly) that insert a
// row into agent_task_queue directly. Invalidates the empty-claim
// cache and kicks the daemon WS so the new task is claimed without
// waiting for the next poll.
func (s *TaskService) NotifyTaskEnqueued(ctx context.Context, task db.AgentTaskQueue) {
s.captureTaskQueued(ctx, task)
s.notifyTaskAvailable(task)
}
// notifyTaskAvailable runs after a task has been inserted: bumps the
// runtime's invalidation version so any in-flight claim that is about
// to write an "empty" verdict will have it rejected on read, then
// kicks the daemon WS so the daemon claims without waiting for its
// next poll. Order matters — Bump must happen before the wakeup,
// otherwise the wakeup-driven claim could read the still-current
// empty verdict and return null.
func (s *TaskService) notifyTaskAvailable(task db.AgentTaskQueue) {
if !task.RuntimeID.Valid {
return
}
runtimeKey := util.UUIDToString(task.RuntimeID)
// Use a background context: the cache bump / wakeup must outlive
// the request that created the task, otherwise an early client
// disconnect could leave the empty verdict in place and stall the
// just-queued task until the TTL expires. The cache itself bounds
// every Redis call with a short timeout so a wedged Redis cannot
// block enqueue.
s.EmptyClaim.Bump(context.Background(), runtimeKey)
if s.Wakeup == nil {
return
}
s.Wakeup.NotifyTaskAvailable(runtimeKey, util.UUIDToString(task.ID))
}
func (s *TaskService) broadcastTaskDispatch(ctx context.Context, task db.AgentTaskQueue) {
var payload map[string]any
if task.Context != nil {
json.Unmarshal(task.Context, &payload)
}
if payload == nil {
payload = map[string]any{}
}
payload["task_id"] = util.UUIDToString(task.ID)
payload["runtime_id"] = util.UUIDToString(task.RuntimeID)
payload["issue_id"] = util.UUIDToString(task.IssueID)
payload["agent_id"] = util.UUIDToString(task.AgentID)
// chat_session_id is the routing key the chat window uses to writethrough
// `chatKeys.pendingTask` to status="running" the moment the daemon claims
// the task. Without it the pill stays stuck at "Queued" until completion.
if task.ChatSessionID.Valid {
payload["chat_session_id"] = util.UUIDToString(task.ChatSessionID)
}
workspaceID := s.ResolveTaskWorkspaceID(ctx, task)
if workspaceID == "" {
return
}
s.Bus.Publish(events.Event{
Type: protocol.EventTaskDispatch,
WorkspaceID: workspaceID,
ActorType: "system",
ActorID: "",
Payload: payload,
})
}
func (s *TaskService) broadcastTaskEvent(ctx context.Context, eventType string, task db.AgentTaskQueue) {
workspaceID := s.ResolveTaskWorkspaceID(ctx, task)
if workspaceID == "" {
return
}
payload := map[string]any{
"task_id": util.UUIDToString(task.ID),
"agent_id": util.UUIDToString(task.AgentID),
"issue_id": util.UUIDToString(task.IssueID),
"status": task.Status,
}
if task.ChatSessionID.Valid {
payload["chat_session_id"] = util.UUIDToString(task.ChatSessionID)
}
s.Bus.Publish(events.Event{
Type: eventType,
WorkspaceID: workspaceID,
ActorType: "system",
ActorID: "",
Payload: payload,
})
}
// ResolveTaskWorkspaceID determines the workspace ID for a task.
// For issue tasks, it comes from the issue. For chat tasks, from the chat session.
// For autopilot tasks, from the autopilot via its run.
// Returns "" when none of the links resolve — callers treat that as "not found".
func (s *TaskService) ResolveTaskWorkspaceID(ctx context.Context, task db.AgentTaskQueue) string {
if task.IssueID.Valid {
if issue, err := s.Queries.GetIssue(ctx, task.IssueID); err == nil {
return util.UUIDToString(issue.WorkspaceID)
}
}
if task.ChatSessionID.Valid {
if cs, err := s.Queries.GetChatSession(ctx, task.ChatSessionID); err == nil {
return util.UUIDToString(cs.WorkspaceID)
}
}
if task.AutopilotRunID.Valid {
if run, err := s.Queries.GetAutopilotRun(ctx, task.AutopilotRunID); err == nil {
if ap, err := s.Queries.GetAutopilot(ctx, run.AutopilotID); err == nil {
return util.UUIDToString(ap.WorkspaceID)
}
}
}
// Quick-create tasks have no issue / chat / autopilot link — workspace
// lives in the context JSONB. Returning "" here is what blocked
// requireDaemonTaskAccess (404 on /start, /progress, /complete, /fail
// for the daemon) and silently dropped task:dispatch / task:completed
// broadcasts, which is why quick-create tasks appeared stuck queued.
if qc, ok := s.parseQuickCreateContext(task); ok {
return qc.WorkspaceID
}
return ""
}
func (s *TaskService) broadcastChatDone(ctx context.Context, task db.AgentTaskQueue, msg *db.ChatMessage) {
workspaceID := s.ResolveTaskWorkspaceID(ctx, task)
if workspaceID == "" {
return
}
payload := protocol.ChatDonePayload{
ChatSessionID: util.UUIDToString(task.ChatSessionID),
TaskID: util.UUIDToString(task.ID),
}
if msg != nil {
payload.MessageID = util.UUIDToString(msg.ID)
payload.Content = msg.Content
if msg.CreatedAt.Valid {
payload.CreatedAt = msg.CreatedAt.Time.UTC().Format(time.RFC3339Nano)
}
if msg.ElapsedMs.Valid {
payload.ElapsedMs = msg.ElapsedMs.Int64
}
}
s.Bus.Publish(events.Event{
Type: protocol.EventChatDone,
WorkspaceID: workspaceID,
ActorType: "system",
ActorID: "",
ChatSessionID: util.UUIDToString(task.ChatSessionID),
Payload: payload,
})
}
// broadcastIssueUpdated publishes the issue:updated event the frontend's
// realtime reconcile (onIssueUpdated) relies on to move an issue between status
// columns / status filters and reconcile their bucket counts. prevStatus is the
// issue's status before the write so the client can gate that reconcile on
// status_changed.
//
// The `issue` payload is a map (issueToMap), which the workspace WS fanout
// (listeners.go SubscribeAll) marshals and broadcasts as-is — that is what
// drives the UI reconcile. Note this does NOT cover the full HTTP UpdateIssue
// side effects: the activity-log and inbox listeners type-assert `issue` to a
// handler.IssueResponse and skip a map, so a background status reset does not
// emit status-change activity / notifications. That is intentional for the
// realtime-staleness fix (#4648 / MUL-3782); folding those side effects in
// would mean unifying the payload type and is left as a follow-up.
func (s *TaskService) broadcastIssueUpdated(issue db.Issue, prevStatus string) {
prefix := s.getIssuePrefix(issue.WorkspaceID)
s.Bus.Publish(events.Event{
Type: protocol.EventIssueUpdated,
WorkspaceID: util.UUIDToString(issue.WorkspaceID),
ActorType: "system",
ActorID: "",
Payload: map[string]any{
"issue": issueToMap(issue, prefix),
"status_changed": prevStatus != issue.Status,
"prev_status": prevStatus,
},
})
}
func (s *TaskService) getIssuePrefix(workspaceID pgtype.UUID) string {
ws, err := s.Queries.GetWorkspace(context.Background(), workspaceID)
if err != nil {
return ""
}
return ws.IssuePrefix
}
func (s *TaskService) createAgentComment(ctx context.Context, issueID, agentID pgtype.UUID, content, commentType string, parentID, sourceTaskID pgtype.UUID) {
if content == "" {
return
}
// Look up issue to get workspace ID for mention expansion and broadcasting.
issue, err := s.Queries.GetIssue(ctx, issueID)
if err != nil {
return
}
// Resolve the thread root for thread-level side effects without overwriting
// parentID. The stored parent_id must remain the exact comment being replied
// to; recursive thread reads recover the root when needed.
var rootComment *db.Comment
if parentID.Valid {
if root, err := s.Queries.GetThreadRoot(ctx, db.GetThreadRootParams{
CommentID: parentID,
WorkspaceID: issue.WorkspaceID,
}); err == nil {
rootComment = &root
}
}
comment, err := s.Queries.CreateComment(ctx, db.CreateCommentParams{
IssueID: issueID,
WorkspaceID: issue.WorkspaceID,
AuthorType: "agent",
AuthorID: agentID,
Content: content,
Type: commentType,
ParentID: parentID,
SourceTaskID: sourceTaskID,
})
if err != nil {
return
}
s.CancelDeferredEscalationsForIssueAgent(ctx, issueID, agentID)
s.Bus.Publish(events.Event{
Type: protocol.EventCommentCreated,
WorkspaceID: util.UUIDToString(issue.WorkspaceID),
ActorType: "agent",
ActorID: util.UUIDToString(agentID),
Payload: map[string]any{
"comment": map[string]any{
"id": util.UUIDToString(comment.ID),
"issue_id": util.UUIDToString(comment.IssueID),
"author_type": comment.AuthorType,
"author_id": util.UUIDToString(comment.AuthorID),
"content": comment.Content,
"type": comment.Type,
"parent_id": util.UUIDToPtr(comment.ParentID),
"source_task_id": util.UUIDToPtr(comment.SourceTaskID),
"created_at": comment.CreatedAt.Time.Format("2006-01-02T15:04:05Z"),
},
"issue_title": issue.Title,
"issue_status": issue.Status,
},
})
s.AutoUnresolveThreadOnReply(ctx, rootComment, util.UUIDToString(issue.WorkspaceID), "agent", util.UUIDToString(agentID))
}
// AutoUnresolveThreadOnReply clears resolved_at on the thread root when a
// reply lands in a resolved thread, and broadcasts comment:unresolved. Shared
// between the user-facing Handler.CreateComment path and the agent-facing
// TaskService.createAgentComment path so the resolved-then-replied state can
// never desync (one of the bugs Emacs flagged on PR #2300). Errors are logged
// — the reply itself already committed, the desync is recoverable on next read.
func (s *TaskService) AutoUnresolveThreadOnReply(ctx context.Context, parent *db.Comment, workspaceID, actorType, actorID string) {
if parent == nil || !parent.ResolvedAt.Valid {
return
}
updated, err := s.Queries.UnresolveComment(ctx, parent.ID)
if err != nil {
slog.Warn("auto-unresolve on reply failed", "error", err, "comment_id", util.UUIDToString(parent.ID))
return
}
s.Bus.Publish(events.Event{
Type: protocol.EventCommentUnresolved,
WorkspaceID: workspaceID,
ActorType: actorType,
ActorID: actorID,
Payload: map[string]any{
"comment": map[string]any{
"id": util.UUIDToString(updated.ID),
"issue_id": util.UUIDToString(updated.IssueID),
"author_type": updated.AuthorType,
"author_id": util.UUIDToString(updated.AuthorID),
"content": updated.Content,
"type": updated.Type,
"parent_id": util.UUIDToPtr(updated.ParentID),
"created_at": util.TimestampToString(updated.CreatedAt),
"updated_at": util.TimestampToString(updated.UpdatedAt),
"resolved_at": util.TimestampToPtr(updated.ResolvedAt),
"resolved_by_type": util.TextToPtr(updated.ResolvedByType),
"resolved_by_id": util.UUIDToPtr(updated.ResolvedByID),
},
},
})
}
func issueToMap(issue db.Issue, issuePrefix string) map[string]any {
return map[string]any{
"id": util.UUIDToString(issue.ID),
"workspace_id": util.UUIDToString(issue.WorkspaceID),
"number": issue.Number,
"identifier": issuePrefix + "-" + strconv.Itoa(int(issue.Number)),
"title": issue.Title,
"description": util.TextToPtr(issue.Description),
"status": issue.Status,
"priority": issue.Priority,
"assignee_type": util.TextToPtr(issue.AssigneeType),
"assignee_id": util.UUIDToPtr(issue.AssigneeID),
"creator_type": issue.CreatorType,
"creator_id": util.UUIDToString(issue.CreatorID),
"parent_issue_id": util.UUIDToPtr(issue.ParentIssueID),
"position": issue.Position,
"start_date": util.DateToPtr(issue.StartDate),
"due_date": util.DateToPtr(issue.DueDate),
"created_at": util.TimestampToString(issue.CreatedAt),
"updated_at": util.TimestampToString(issue.UpdatedAt),
}
}
// parseQuickCreateContext returns the quick-create payload if the task's
// context JSONB contains type == "quick_create"; otherwise the bool is
// false so callers can short-circuit. Tasks linked to an issue / chat /
// autopilot are never quick-create even if they happen to carry a
// context blob, so those are filtered up front.
func (s *TaskService) parseQuickCreateContext(task db.AgentTaskQueue) (QuickCreateContext, bool) {
if task.IssueID.Valid || task.ChatSessionID.Valid || task.AutopilotRunID.Valid {
return QuickCreateContext{}, false
}
if len(task.Context) == 0 {
return QuickCreateContext{}, false
}
var qc QuickCreateContext
if err := json.Unmarshal(task.Context, &qc); err != nil {
return QuickCreateContext{}, false
}
if qc.Type != QuickCreateContextType {
return QuickCreateContext{}, false
}
return qc, true
}
// notifyQuickCreateCompleted writes a success inbox notification to the
// requester pointing at the issue the agent just created. The issue is
// stamped with origin_type=quick_create + origin_id=<task_id> by the
// daemon-injected MULTICA_QUICK_CREATE_TASK_ID env var, so this lookup is
// deterministic — robust against the same agent creating other issues in
// parallel (e.g. assignment task running while max_concurrent_tasks > 1
// permits another quick-create alongside it).
func (s *TaskService) notifyQuickCreateCompleted(ctx context.Context, task db.AgentTaskQueue, qc QuickCreateContext) {
requesterID, err := util.ParseUUID(qc.RequesterID)
if err != nil {
slog.Warn("quick-create completion: invalid requester id", "task_id", util.UUIDToString(task.ID), "error", err)
return
}
workspaceID, err := util.ParseUUID(qc.WorkspaceID)
if err != nil {
slog.Warn("quick-create completion: invalid workspace id", "task_id", util.UUIDToString(task.ID), "error", err)
return
}
issue, err := s.Queries.GetIssueByOrigin(ctx, db.GetIssueByOriginParams{
WorkspaceID: workspaceID,
OriginType: pgtype.Text{String: "quick_create", Valid: true},
OriginID: task.ID,
})
if err != nil {
// No issue created — agent ran to completion but the CLI call must
// have failed. Surface as a failure inbox so the user sees something.
slog.Warn("quick-create completion: no issue found, writing failure inbox",
"task_id", util.UUIDToString(task.ID),
"agent_id", util.UUIDToString(task.AgentID),
"workspace_id", qc.WorkspaceID,
)
s.notifyQuickCreateFailed(ctx, task, qc, "agent finished without creating an issue")
return
}
// Link the new issue back to this task so subsequent reads of the task
// (Activity tab, Recent work, etc.) render it as a normal issue task
// (kind = "direct") instead of staying on the "Creating issue" active-
// wording label. Best-effort: a write failure here doesn't block the
// inbox notification, which is the more important signal to the user.
if err := s.Queries.LinkTaskToIssue(ctx, db.LinkTaskToIssueParams{
ID: task.ID,
IssueID: issue.ID,
}); err != nil {
slog.Warn("quick-create completion: link task→issue failed",
"task_id", util.UUIDToString(task.ID),
"issue_id", util.UUIDToString(issue.ID),
"error", err,
)
}
// Subscribe the requester so they receive notifications for follow-up
// comments and updates. The DB row's creator_type/creator_id is the
// agent (it ran the CLI), but the human who triggered the quick-create
// is the semantic creator from a UX perspective — without this they
// only see the one-shot completion inbox and miss everything after.
// Best-effort: log on failure but don't block the inbox notification.
if err := s.Queries.AddIssueSubscriber(ctx, db.AddIssueSubscriberParams{
IssueID: issue.ID,
UserType: "member",
UserID: requesterID,
Reason: "creator",
}); err != nil {
slog.Warn("quick-create completion: subscribe requester failed",
"task_id", util.UUIDToString(task.ID),
"issue_id", util.UUIDToString(issue.ID),
"requester_id", qc.RequesterID,
"error", err,
)
} else {
s.Bus.Publish(events.Event{
Type: protocol.EventSubscriberAdded,
WorkspaceID: qc.WorkspaceID,
ActorType: "agent",
ActorID: util.UUIDToString(task.AgentID),
Payload: map[string]any{
"issue_id": util.UUIDToString(issue.ID),
"user_type": "member",
"user_id": qc.RequesterID,
"reason": "creator",
},
})
}
prefix := s.getIssuePrefix(workspaceID)
identifier := fmt.Sprintf("%s-%d", prefix, issue.Number)
details, _ := json.Marshal(map[string]any{
"task_id": util.UUIDToString(task.ID),
"agent_id": util.UUIDToString(task.AgentID),
"issue_id": util.UUIDToString(issue.ID),
"identifier": identifier,
"original_prompt": qc.Prompt,
})
item, err := s.Queries.CreateInboxItem(ctx, db.CreateInboxItemParams{
WorkspaceID: workspaceID,
RecipientType: "member",
RecipientID: requesterID,
Type: "quick_create_done",
Severity: "info",
IssueID: issue.ID,
Title: issue.Title,
Body: pgtype.Text{},
ActorType: pgtype.Text{String: "agent", Valid: true},
ActorID: task.AgentID,
Details: details,
})
if err != nil {
slog.Error("quick-create completion: inbox write failed", "task_id", util.UUIDToString(task.ID), "error", err)
return
}
s.publishQuickCreateInbox(item, qc.WorkspaceID, util.UUIDToString(task.AgentID), issue.Status)
}
// notifyQuickCreateFailed writes a failure inbox notification carrying the
// original prompt + agent ID so the frontend can render an "Edit as
// advanced form" entry that pre-fills the legacy create-issue modal
// without asking the user to retype.
func (s *TaskService) notifyQuickCreateFailed(ctx context.Context, task db.AgentTaskQueue, qc QuickCreateContext, errMsg string) {
requesterID, err := util.ParseUUID(qc.RequesterID)
if err != nil {
return
}
workspaceID, err := util.ParseUUID(qc.WorkspaceID)
if err != nil {
return
}
if errMsg == "" {
errMsg = "Quick create did not finish successfully"
}
details, _ := json.Marshal(map[string]any{
"task_id": util.UUIDToString(task.ID),
"agent_id": util.UUIDToString(task.AgentID),
"original_prompt": qc.Prompt,
"error": redact.Text(errMsg),
})
item, err := s.Queries.CreateInboxItem(ctx, db.CreateInboxItemParams{
WorkspaceID: workspaceID,
RecipientType: "member",
RecipientID: requesterID,
Type: "quick_create_failed",
Severity: "action_required",
IssueID: pgtype.UUID{},
Title: "Quick create failed",
Body: pgtype.Text{String: redact.Text(errMsg), Valid: true},
ActorType: pgtype.Text{String: "agent", Valid: true},
ActorID: task.AgentID,
Details: details,
})
if err != nil {
slog.Error("quick-create failure: inbox write failed", "task_id", util.UUIDToString(task.ID), "error", err)
return
}
s.publishQuickCreateInbox(item, qc.WorkspaceID, util.UUIDToString(task.AgentID), "")
}
// publishQuickCreateInbox emits the WS event so the requester's inbox list
// updates immediately. Mirrors the payload shape used by the other inbox
// listeners (notification_listeners.go).
func (s *TaskService) publishQuickCreateInbox(item db.InboxItem, workspaceID, agentID, issueStatus string) {
resp := map[string]any{
"id": util.UUIDToString(item.ID),
"workspace_id": util.UUIDToString(item.WorkspaceID),
"recipient_type": item.RecipientType,
"recipient_id": util.UUIDToString(item.RecipientID),
"type": item.Type,
"severity": item.Severity,
"issue_id": util.UUIDToPtr(item.IssueID),
"title": item.Title,
"body": util.TextToPtr(item.Body),
"read": item.Read,
"archived": item.Archived,
"created_at": util.TimestampToString(item.CreatedAt),
"actor_type": util.TextToPtr(item.ActorType),
"actor_id": util.UUIDToPtr(item.ActorID),
"details": json.RawMessage(item.Details),
"issue_status": issueStatus,
}
s.Bus.Publish(events.Event{
Type: protocol.EventInboxNew,
WorkspaceID: workspaceID,
ActorType: "agent",
ActorID: agentID,
Payload: map[string]any{"item": resp},
})
}
// agentToMap builds a simple map for broadcasting agent status updates.
func agentToMap(a db.Agent) map[string]any {
var rc any
if a.RuntimeConfig != nil {
json.Unmarshal(a.RuntimeConfig, &rc)
}
return map[string]any{
"id": util.UUIDToString(a.ID),
"workspace_id": util.UUIDToString(a.WorkspaceID),
"runtime_id": util.UUIDToString(a.RuntimeID),
"name": a.Name,
"description": a.Description,
"avatar_url": util.TextToPtr(a.AvatarUrl),
"runtime_mode": a.RuntimeMode,
"runtime_config": rc,
"visibility": a.Visibility,
"status": a.Status,
"max_concurrent_tasks": a.MaxConcurrentTasks,
"owner_id": util.UUIDToPtr(a.OwnerID),
"skills": []any{},
"created_at": util.TimestampToString(a.CreatedAt),
"updated_at": util.TimestampToString(a.UpdatedAt),
"archived_at": util.TimestampToPtr(a.ArchivedAt),
"archived_by": util.UUIDToPtr(a.ArchivedBy),
}
}