Files
multica/server/internal/handler/daemon.go
LinYushen cb68669c73 feat(composio): gate MCP apps behind feature flag (#4876)
* feat(composio): server-side connect flow + connections REST (Notion MVP) (MUL-3720) (#4608)

* feat(composio): server-side connect flow + connections REST (Notion MVP) (MUL-3720)

Compose the merged server/pkg/composio SDK into a user-facing connection
manager: signed-state connect handshake, local user_composio_connection
mirror, idempotent disconnect, and a per-user MCP session helper (not yet
wired into task dispatch).

- migration 127_user_composio_connection (no FK/cascade, per DB rules)
- sqlc queries: upsert (idempotent on user_id+connected_account_id), list
  active, owner-scoped get, mark revoked
- internal/integrations/composio: signed HMAC-SHA256 state, BeginConnect,
  CompleteCallback (idempotent upsert), ListConnections, Disconnect
  (upstream 404 = idempotent success), CreateMCPSession (no-op when empty,
  pins connected_accounts per toolkit), CallbackRedirect
- REST handlers under /api/integrations/composio (user-scoped, 503 when
  COMPOSIO_API_KEY unset): connect/init, callback (302), connections list,
  delete
- router wiring gated by COMPOSIO_API_KEY; COMPOSIO_AUTH_CONFIGS_JSON maps
  toolkit->auth_config (MVP: notion); state secret from COMPOSIO_STATE_SECRET
  or derived from JWT_SECRET; callback base from COMPOSIO_CALLBACK_BASE_URL
  or MULTICA_PUBLIC_URL
- tests: state (expire/tamper/wrong-secret), service (mapping, callback
  idempotency, non-success, disconnect owner/404 idempotency, MCP pin),
  handlers (httptest), redact regression for Bearer mcp_ tokens

MVP scope: Notion only; no task-dispatch overlay, sharing, or webhook
event handling (later stages).

Co-authored-by: multica-agent <github@multica.ai>

* fix(composio): bind callback account to user + idempotent revoked disconnect (MUL-3720)

Address PR 4608 review (CHANGES_REQUESTED):

- callback: verify connected_account_id with Composio before mirroring it.
  The signed state only proved user/toolkit/exp, so a valid state paired with
  a tampered connected_account_id would be written verbatim. CompleteCallback
  now calls ListConnectedAccounts and fails closed (ErrAccountVerification)
  unless the account belongs to the state's user (composio_user_id == multica
  user id) and was created under the toolkit's auth config. No row is written
  on mismatch / unknown account / upstream error.

- disconnect: short-circuit to a no-op when the local row is already revoked,
  before touching upstream. Previously a second DELETE re-hit Composio and a
  non-404 upstream error surfaced as a 502, breaking the 204-idempotent
  contract.

- CreateMCPSession: document the v1 single-active-connection-per-(user,toolkit)
  constraint and make duplicate selection deterministic (newest-wins, rows are
  connected_at DESC) instead of order-dependent map overwrite. Stage 3 owns the
  real single-account-enforcement vs multi-account-shape decision.

Tests: tampered/wrong-auth-config/unknown-account callback rejection, revoked-row
disconnect no-op (asserts upstream not re-hit). composio pkg 85% coverage; all
green.

Co-authored-by: multica-agent <github@multica.ai>

* feat(composio): list all toolkits + dynamic auth-config resolution (MUL-3720)

Yushen's follow-up to the Notion MVP: surface the full Composio toolkit
catalog, render it in Settings, and drop the static env mapping in favor of
dynamic auth-config discovery.

Config correctness (per Composio docs):
- Remove COMPOSIO_AUTH_CONFIGS_JSON entirely. The toolkit→auth_config mapping
  is now resolved at request time from the project's /auth_configs (cached,
  5-min TTL), so enabling a toolkit is a dashboard action, not a redeploy.
- Do NOT add COMPOSIO_PROJECT_ID. The project API key (x-api-key) authenticates
  to exactly one project; the project is resolved from the key. Only org-level
  endpoints use x-org-api-key, which this integration never calls.

Backend:
- SDK: server/pkg/composio/auth_configs.go — ListAuthConfigs (toolkit_slug,
  is_composio_managed, show_disabled, limit, cursor).
- service: dynamic resolver (authConfigMap cache; betterAuthConfig prefers a
  custom/white-label config over Composio-managed, newest wins); BeginConnect
  and CompleteCallback resolve via it; ListToolkits fetches the full catalog
  (paginated, capped) annotated with connectable = has an enabled auth config,
  connectable-first ordering.
- handler + route: GET /api/integrations/composio/toolkits (user-scoped, 503
  when COMPOSIO_API_KEY unset) returning slug/name/logo/category/connectable.

Frontend:
- core: ComposioToolkit/ComposioConnection types, api client methods, and
  composio query options (@multica/core/composio).
- views: Settings → Integrations now has a Composio section rendering every
  toolkit as a card with search. Connect is gated on `connectable`;
  non-connectable toolkits show a muted "not configured" hint instead of a
  dead button. Connected toolkits show a badge + Disconnect (with confirm).
- i18n: composio block added to en/zh-Hans/ja/ko settings.

Tests: SDK + service (dynamic resolution, custom-over-managed preference,
connectable flag, resolver-error soft-degrade) and handler toolkits endpoint;
composio pkg 85.7% coverage. go build/vet/gofmt clean; core+views typecheck,
core+views lint, and core tests (691) all green.

Co-authored-by: multica-agent <github@multica.ai>

* fix(composio): close cross-toolkit callback fail-open by signing auth_config_id into state (MUL-3720)

Re-review blocker: CompleteCallback resolved the toolkit's auth config at
callback time and ignored a resolve error/empty result, while
verifyAccountOwnership skipped the auth-config comparison when the expected
value was empty. A user could then pass another toolkit's connected_account_id
into this toolkit's callback — the owner check passed and it was written under
the wrong toolkit_slug/account binding.

Fix: the auth_config_id is already resolved in BeginConnect (before the state
is signed), so sign it into the state and compare it exactly at callback. No
re-resolve, no fail-open. verifyAccountOwnership now fails closed when the
expected auth config is empty (rejects instead of skipping) and requires an
exact match — closing the cross-toolkit binding gap.

Tests: state round-trips auth_config_id; BeginConnect signs it; callback
rejects wrong/cross-toolkit auth config and an empty (no-mapping) auth config
fails closed. composio pkg 85.2% coverage, all green.

Frontend (non-blocking): the Composio settings tab now surfaces an error when
the connections query fails instead of silently rendering everything as
unconnected.

Co-authored-by: multica-agent <github@multica.ai>

* fix(composio): hide Settings section entirely when integration unconfigured (MUL-3720)

Decision (option 2, hide-then-merge): don't show a card that leaks the internal
COMPOSIO_API_KEY env-var name to every end user. IntegrationsTab now gates the
whole Composio section (heading + body) on the toolkits query — a 503 means the
key is unset, so the section is withheld instead of rendering the not-configured
card. Admin-only setup guidance is a later, role-gated affordance.

Removed the notConfigured card (and now-unused ApiError import) from
ComposioTab; it only mounts when configured. views typecheck + lint clean.

Co-authored-by: multica-agent <github@multica.ai>

---------

Co-authored-by: multica-agent <github@multica.ai>

* feat(composio): Stage 2 frontend polish — callback toast, last_used & expired UI, e2e (MUL-3718) (#4688)

* feat(composio): callback toast + refresh, last_used & expired UI, e2e (MUL-3718)

Co-authored-by: multica-agent <github@multica.ai>

* fix(composio): real callback redirect route + StrictMode-safe toast dedup (MUL-3718 review)

Co-authored-by: multica-agent <github@multica.ai>

---------

Co-authored-by: multica-agent <github@multica.ai>

* fix(composio): callback endpoint should not require Multica auth (MUL-3843) (#4709)

* fix(composio): move OAuth callback out of the Auth group (MUL-3843)

Composio 302-redirects the browser to /api/integrations/composio/callback
at the end of the OAuth flow, but PR #4608 mounted it inside the cookie-auth
middleware group. When the session cookie is absent (expired session,
SameSite=Strict / Safari ITP, private window, self-hosted callback subdomain)
the Auth middleware returned a hard 401 and a JSON blob instead of the
settings redirect, breaking the flow.

Identity never came from the cookie anyway: it is carried by the HMAC-signed
state param that CompleteCallback verifies (signature, expiry, replay) and
cross-checked by verifyAccountOwnership; h.Composio == nil still 503s. So the
callback is registered alongside the other public OAuth/webhook routes; the
other four composio endpoints stay session-gated.

Refs MUL-3843, MUL-3715.

Co-authored-by: multica-agent <github@multica.ai>

* fix(composio): correct stale callback routing comments (MUL-3843)

The package header and ComposioCallback doc comments still described the
callback as sitting under the Auth middleware group. After the route was
moved out (this PR), update both to state it is a public route whose identity
comes from the signed state — addressing review nit from 张大彪.

Refs MUL-3843.

Co-authored-by: multica-agent <github@multica.ai>

---------

Co-authored-by: multica-agent <github@multica.ai>

* feat(composio): inject MCP overlay into agent runtime at task dispatch (MUL-3721) (#4704)

Stage 3 of the Composio epic. Wires the per-user Composio MCP session into
every agent task so the agent process sees the initiator's connected tools
without any prompt-time plumbing.

Server side
  - Migration 128 adds agent_task_queue.runtime_mcp_overlay JSONB plus a
    BEFORE-UPDATE trigger that wipes the column on any transition into a
    terminal status (completed / failed / cancelled). A trigger is the single
    source of truth — future queries that flip status cannot bypass it.
  - composio.Service.BuildTaskOverlay(userID) reuses CreateMCPSession and
    emits the Claude-style { mcpServers: { composio: { type: http, url,
    headers } } } shape the daemon's existing sidecar generators consume.
    Returns (nil, nil) on zero active connections so we never burn a
    Composio session for a user with nothing to call.
  - TaskService grows a Composio ComposioOverlayBuilder seam, wired in
    router.go after composiointeg.NewService succeeds. Five enqueue paths
    (issue / mention / quick-create / chat / auto-retry) attach the overlay
    after CreateAgentTask returns and before the daemon is notified — so
    every claim reads a settled row, with no second daemon hop. Best-effort:
    a builder failure logs and proceeds with no overlay.
  - resolveInitiatorFromTriggerComment derives the initiator user from the
    trigger comment when it was authored by a member. Agent-authored
    triggers are not treated as initiators (their connected-apps view is
    empty by construction).

Daemon side
  - handler/daemon.go claim path merges task.runtime_mcp_overlay onto
    agent.mcp_config via mergeMCPOverlay before populating
    TaskAgentData.McpConfig. Overlay wins on server-name collisions
    because it carries the live user-scoped session URL. Errors fall back
    to the agent config unchanged — a bad overlay must not surprise-disable
    saved MCP tools. The existing execenv sidecar generators (cursor /
    codex / openclaw / opencode / hermes-kiro) need no changes: they keep
    consuming the merged result through TaskAgentData.McpConfig.

Tests
  - 9 merge cases (mcp_overlay_test): both-nil short-circuit, agent-only
    pass-through, overlay-only canonicalization, two-side merge, name
    collision (overlay wins), top-level key preservation, malformed agent
    fallback, malformed overlay fallback, non-object server rejection.
  - 4 dispatch cases (composio): zero-connections returns nil without
    CreateSession, happy-path emits the right shape with the right user
    id, empty-URL defensive branch, SDK error surfacing.
  - 4 TaskService helper cases: nil Composio is a no-op (Queries-safe),
    invalid initiator does not call the builder, nil overlay skips the
    UPDATE, builder error swallowed without panic.
  - Migration 128 verified to roll up + down + up cleanly against the test
    database.

Out of scope (deferred): assignment-triggered enqueue paths with no
trigger comment get no overlay attached today (no initiator UUID flows
through enqueueIssueTask in that case). Retry paths recompute the overlay
fresh from the parent's initiator_user_id instead of inheriting the bearer
from the parent row, so a stale token can never resurface on a retry.

Co-authored-by: Eve <eve@multica.ai>
Co-authored-by: multica-agent <github@multica.ai>

* feat(composio): per-agent allowlist + originator-scoped MCP overlay (MUL-3869) (#4736)

* feat(composio): per-agent allowlist + originator-scoped MCP overlay (MUL-3869)

Stage 3.1 of the Composio epic (MUL-3721 parent). PR #4704 wired in the
runtime_mcp_overlay column and a per-task dispatch hook; this change
inverts the default from "all-on" to opt-in and locks the overlay to the
agent owner's own connected apps:

- Agents carry composio_toolkit_allowlist TEXT[]. NULL or [] => no MCP.
  Owner-only read/write; non-owner GET/PUT silently redacts/drops the
  field (same shape as mcp_config).
- agent_task_queue carries originator_user_id UUID. Set from the
  top-of-chain HUMAN at every enqueue path:
    * issue/mention comment by member  -> author_id
    * issue/mention comment by agent   -> inherit via comment.source_task_id
                                          -> parent task originator_user_id
    * quick-create                     -> requester_id
    * chat                             -> initiator_user_id
    * retry                            -> SQL-inherited from parent row
    * autopilot                        -> NULL (system-driven)
- BuildTaskOverlay (composio dispatch) now takes (ctx, originatorUserID,
  agent) and short-circuits on five gates: invalid originator,
  originator != agent.owner_id, empty allowlist, empty intersection of
  allowlist ∩ active connections, defensive empty session URL. Composio
  CreateSession is called with BOTH `toolkits.slugs` (the intersection)
  AND `connected_accounts` (the pinned account ids), narrowing the
  tool-router twice.
- The originator-vs-owner gate closes the agent-fanout privacy hole: any
  workspace member who can @-mention a public agent used to project the
  owner's connected apps into their run. Now the overlay only mounts
  when the human at the top of the chain IS the agent owner.

Tests:
- dispatch_test.go covers all 5 gates plus uppercase/whitespace slug
  normalisation.
- task_runtime_mcp_overlay_test.go covers the no-op gates of the new
  applyRuntimeMCPOverlay signature.
- agent_composio_allowlist_test.go (handler): owner roundtrip
  (list/empty/null), workspace-admin silent-drop, owner-only GET
  visibility, pure normaliseComposioToolkitAllowlist.
- resolve_originator_test.go (service, DB-backed): member-authored,
  agent-authored inherits via comment.source_task_id, invalid id.

Migration 129 up/down/up verified against docker postgres.

Co-authored-by: multica-agent <github@multica.ai>

* chore(composio): gofmt + regenerate sqlc with v1.31.1 (MUL-3869 review nits)

Co-authored-by: multica-agent <github@multica.ai>

---------

Co-authored-by: Eve <eve@multica-ai.local>
Co-authored-by: multica-agent <github@multica.ai>

* fix(composio): accept nested connected account auth config

* feat(views): creator-only MCP tab for per-agent Composio allowlist (MUL-3870) (#4743)

Stage 3.2 frontend on top of the Stage 3.1 backend (MUL-3869, 4708dba97).
Adds an agent-detail tab that lets the agent owner pick which of their own
active Composio connections this agent may mount as MCP servers, writing the
selection to agent.composio_toolkit_allowlist via the existing PUT /api/agents.

- core/types: composio_toolkit_allowlist (+ _redacted) on Agent; tri-state
  composio_toolkit_allowlist on UpdateAgentRequest (omit/no-change, null/clear,
  array/replace), matching the backend contract.
- core/agents: useUpdateAgentAllowlist - optimistic mutation hook (patches the
  cached workspace agent list, rolls back on error, invalidates on settle).
- views: AgentMcpTab renders the owner's active connections as checkboxes;
  empty state links to Settings -> Integrations; defensive redacted state.
- views: wired into AgentOverviewPane as tab "composio_mcp", labeled "MCP Apps"
  to disambiguate from the existing raw-JSON "MCP" (mcp_config) tab. The entry
  is gated to the creator (currentUserId === agent.owner_id), matching the
  backend's owner-only read/write of the allowlist.
- i18n: tabs.composio_mcp + tab_body.composio_mcp.* in en/ja/ko/zh-Hans.
- tests: agent-mcp-tab.test.tsx (gating, toggle->allowlist body, active-only,
  empty, redacted); e2e/agent-mcp.spec.ts (creator sees tab + PUT body,
  non-creator hidden) with Composio + agent endpoints mocked at the boundary.

Note: the product spec says "creator"; the schema has no creator_id - the
backend gate and redaction are keyed on owner_id, so the tab uses owner_id.

Co-authored-by: multica-agent <github@multica.ai>

* fix(composio): mount remote MCP for codex

* feat(agents): agent invocation permission system (MUL-3963) (#4844)

* feat(agents): agent invocation permission system (permission_mode + invocation targets)

MUL-3963: split who may INVOKE an agent out of the overloaded visibility
column into an explicit, extensible model on feature/composio-integration.

- DB: agent.permission_mode (private|public_to) + agent_invocation_target
  table (workspace/member/team targets) + lossless backfill from visibility
  (migration 130).
- canInvokeAgent: owner-only for private (NO admin bypass, NO A2A bypass);
  public_to honours the allow-list; A2A judged by the top-of-chain originator.
- All trigger paths rewired: issue assign, comment @agent/@squad, chat,
  quick-create, autopilot, squad leader, child-done.
- Agent API: permission_mode + invocation_targets on responses and
  create/update (owner-only writes); legacy visibility kept as a derived field
  so old clients never see a permission widening.
- Composio: BuildTaskOverlay now FOLLOWS invocation permission and uses the
  agent OWNER connection (removed the originator==owner gate); front-end warns
  when a shared agent enables Composio apps.
- CLI: --permission-mode / --public-to-workspace / --public-to-member (legacy
  --visibility still mapped).
- Frontend: AccessPicker (Private / workspace / specific people / team soon),
  permission rules mirror canInvokeAgent, Composio warning banner.
- Tests: migration backfill, admin cannot invoke others private, public_to
  workspace/member whitelist, A2A by originator, Composio overlay uses owner
  connection.

Co-authored-by: multica-agent <github@multica.ai>

* feat(agents): stackable, mixed public_to invocation targets (MUL-3963)

Follow-up on PR #4844: public_to now supports selecting MULTIPLE, MIXED
targets on one agent (e.g. Public to workspace + specific people + team),
with canInvokeAgent admitting on ANY matching target (OR).

- Frontend AccessPicker: reworked from a single exclusive kind into a
  stackable multi-select — an "Everyone in workspace" toggle, a member
  multi-select checklist, and a (disabled, v1) team placeholder can be
  combined freely. Emits the full union of selected targets; empty union
  collapses to Private. Existing team targets are preserved across saves.
  Added the access.public_group locale string (en/zh-Hans/ja/ko).
- Backend already supported this (agent_invocation_target is multi-row per
  agent; create/update take a target ARRAY and batch-replace the whole
  allow-list; canInvokeAgent OR-matches). Added tests to lock it in:
  mixed member+team targets, overlapping-member batch replace, and
  workspace+member stacking then narrowing.

Refs MUL-3963.

Co-authored-by: multica-agent <github@multica.ai>

* fix(agents): address review on invocation permission (MUL-3963)

张大彪 review on PR #4844 — three blockers + product ruling + nits:

1. Migration 130: drop the FK/cascade on agent_invocation_target
   (agent_id, created_by) per the Multica no-FK rule; relationships are now
   maintained in the app layer (matching MUL-3515 §4). Added
   DeleteAgentInvocationTargetsByArchivedRuntimeAgents and call it before
   DeleteArchivedAgentsByRuntime in all three runtime-delete paths
   (runtime.go x2, runtime_profile.go) so hard-deleting agents can't orphan
   target rows.
2. revokeAndRemoveMember: prune the leaving member's member-target grants
   (DeleteAgentInvocationTargetsByMember) in the same tx as the member-row
   delete, so a re-invited user can't reclaim a stale invocation grant.
3. Empty public_to is a phantom — parsePermissionInput now normalises a
   public_to with no resolvable targets to a single workspace target, so
   `--permission-mode public_to` alone (and any empty target array) means
   "public to workspace" instead of "shared but nobody can run it".

Product ruling: the system/no-human-originator → workspace-target path in
canInvokeAgent is a deliberate, documented exception (webhook/system/
workspace-wide automation); member/team targets still fail closed without a
resolved originator. Documented in code + locked with a test.

Nits: refreshed the stale "originator must be owner" comments — models.go
(via migration 130 COMMENT ON COLUMN + sqlc regen for composio_toolkit_allowlist
and originator_user_id) and agent-mcp-tab.tsx — to the owner-connection +
invocation-permission rules.

Tests: member remove/re-add regression, system workspace exception + member
fail-closed, empty public_to → workspace (plus the earlier mixed/overlap/
batch-replace suite). Migration 130 applied to the test DB; Go handler/service/
composio suites green; views typecheck clean.

Refs MUL-3963.

Co-authored-by: multica-agent <github@multica.ai>

* fix(agents): scope member invocation-target cleanup to one workspace (MUL-3963)

张大彪 3rd review — cross-workspace permission bug + comment nits:

- DeleteAgentInvocationTargetsByMember was a GLOBAL delete by user id, so
  removing a user from workspace A also wiped their member-target grants on
  agents in workspace B. Scoped it to a single workspace by joining through
  agent.workspace_id; revokeAndRemoveMember now passes (workspaceID, userID).
- Regression test TestRevokeMember_InvocationTargetCleanupIsWorkspaceScoped:
  same user allow-listed by agents in two workspaces; removal from one leaves
  the other workspace's target intact.
- Nits: refreshed the remaining stale "originator == agent.owner_id" /
  "owner-vs-originator" comments — CreateRetryTask (agent.sql, regenerated),
  and the AgentResponse allowlist doc + ListAgents/UpdateAgent redaction
  rationale in agent.go — to the owner-connection + invocation-permission rule.

Migration 130 applied to the test DB; Go handler/service/composio suites green;
go vet clean.

Refs MUL-3963.

Co-authored-by: multica-agent <github@multica.ai>

---------

Co-authored-by: multica-agent <github@multica.ai>

* fix(agents): agent access owner-only editable, read-only for others (MUL-3963) (#4853)

* fix(agents): make agent access owner-only editable, read-only for others (MUL-3963)

Interaction bug: a non-owner (incl. workspace admin) could open the AccessPicker
and set an agent public — the backend silently ignored it and the UI bounced
back to private. Access is owner-only, so non-owners must see a read-only state
and the backend must reject real changes explicitly.

Frontend:
- AccessPicker renders a static, non-interactive read-only state when the
  viewer is not the owner: the current access value + a lock affordance + a
  tooltip "Only the agent owner can change who can run this agent." No clickable
  trigger is rendered, so a non-owner can never open a control the backend would
  reject (the GitHub/Notion pattern for permission settings you can see but not
  edit). The editable multi-select picker is unchanged for the owner.
- agent-detail-inspector gates the picker on ownership specifically
  (currentUserId === agent.owner_id), NOT the general canEdit (which also admits
  admins, who may edit other fields but not access).
- New locale key access.owner_only_readonly (en/zh-Hans/ja/ko).

Backend:
- UpdateAgent now returns an explicit 403 when a non-owner submits a REAL
  permission change (permissionInputChangesAgent compares requested mode +
  target set against the persisted state); a no-op resubmit (admin PATCH-as-PUT
  echoing unchanged permission) is still tolerated so admin edits of other
  fields keep working. Replaces the previous silent-drop that caused the bounce.

Tests:
- access-picker.test.tsx: non-owner gets a non-interactive read-only display
  with the owner-only tooltip; owner gets an interactive picker; owner can pick
  a member and stack workspace + member.
- TestUpdateAgent_AccessChangeIsOwnerOnly: admin real change → 403; admin no-op
  resubmit → 200; admin editing other fields → 200; owner change → 200.

Incidental: fixed a pre-existing base typecheck break in
slash-command-suggestion.test.tsx (stray `signal` arg not in the suggestion
items type) that otherwise fails the whole @multica/views typecheck.

Refs MUL-3963.

Co-authored-by: multica-agent <github@multica.ai>

* fix(agents): compare legacy visibility, not expanded permission, for no-op detection (MUL-3963)

PR #4853 review: permissionInputChangesAgent expanded a legacy-only
visibility:"private" into a real private permission and compared it against the
agent's actual permission. A member-only public_to agent derives legacy
visibility "private", so an admin PATCH-as-PUT echoing visibility:"private"
while editing another field was misread as a public_to→private downgrade and
rejected with 403 — contradicting the "unchanged permission no-op is allowed"
contract.

Fix (per review): when a request carries ONLY legacy `visibility` (no
permission_mode / invocation_targets), derive the agent's CURRENT legacy
visibility from its real targets and compare the legacy string values. Equal =
no-op (allowed); a real legacy change (e.g. "workspace") still returns 403.
Requests that carry permission_mode / invocation_targets keep the precise
mode+target comparison.

Regression test TestUpdateAgent_LegacyVisibilityNoOpForMemberOnlyPublicTo:
member-only public_to agent — admin submitting visibility:"private" + a
non-permission field → 200 with targets unchanged; admin submitting
visibility:"workspace" → 403.

Go handler/composio suites green; migration 130 applied; go vet clean.

Refs MUL-3963.

Co-authored-by: multica-agent <github@multica.ai>

---------

Co-authored-by: multica-agent <github@multica.ai>

* feat(composio): brief agents on connected apps

* feat(composio): gate MCP apps behind feature flag

* fix(mobile): parse agent invocation permissions

* fix(tests): update agent fixtures for access fields

---------

Co-authored-by: multica-agent <github@multica.ai>
Co-authored-by: Multica Eve <eve@devv.ai>
Co-authored-by: Eve <eve@multica.ai>
Co-authored-by: Eve <eve@multica-ai.local>
2026-07-03 14:18:43 +08:00

2864 lines
108 KiB
Go
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
package handler
import (
"bytes"
"context"
"crypto/sha256"
"encoding/hex"
"encoding/json"
"errors"
"fmt"
"log/slog"
"net/http"
"sort"
"strconv"
"strings"
"time"
"github.com/go-chi/chi/v5"
"github.com/jackc/pgx/v5/pgtype"
"github.com/multica-ai/multica/server/internal/analytics"
"github.com/multica-ai/multica/server/internal/auth"
"github.com/multica-ai/multica/server/internal/daemonws"
"github.com/multica-ai/multica/server/internal/integrations/slack"
obsmetrics "github.com/multica-ai/multica/server/internal/metrics"
"github.com/multica-ai/multica/server/internal/middleware"
"github.com/multica-ai/multica/server/internal/runtimeapps"
"github.com/multica-ai/multica/server/internal/service"
"github.com/multica-ai/multica/server/internal/util"
db "github.com/multica-ai/multica/server/pkg/db/generated"
"github.com/multica-ai/multica/server/pkg/protocol"
"github.com/multica-ai/multica/server/pkg/redact"
)
// ---------------------------------------------------------------------------
// Daemon workspace ownership helpers
// ---------------------------------------------------------------------------
// requireDaemonWorkspaceAccess verifies the caller has access to the given workspace.
// For daemon tokens (mdt_), compares the token's workspace ID directly.
// For PAT/JWT fallback, verifies user membership in the workspace.
func (h *Handler) requireDaemonWorkspaceAccess(w http.ResponseWriter, r *http.Request, workspaceID string) bool {
if workspaceID == "" {
writeError(w, http.StatusNotFound, "not found")
return false
}
// Daemon token: workspace must match.
if daemonWsID := middleware.DaemonWorkspaceIDFromContext(r.Context()); daemonWsID != "" {
if daemonWsID != workspaceID {
writeError(w, http.StatusNotFound, "not found")
return false
}
return true
}
// PAT/JWT fallback: check membership cache before hitting DB.
userID := requestUserID(r)
if userID != "" {
if h.MembershipCache.Get(r.Context(), userID, workspaceID) {
return true
}
}
_, ok := h.requireWorkspaceMember(w, r, workspaceID, "not found")
if ok && userID != "" {
h.MembershipCache.Set(r.Context(), userID, workspaceID)
}
return ok
}
// requireDaemonRuntimeAccess looks up a runtime and verifies the caller owns its workspace.
//
// Only pgx.ErrNoRows is treated as a real "runtime gone" 404 — the daemon uses
// that response to drop the stale runtime from its in-memory map and re-register,
// so collapsing transient DB errors into the same 404 would force the daemon to
// self-cleanup on a hiccup. Other DB errors become 500.
func (h *Handler) requireDaemonRuntimeAccess(w http.ResponseWriter, r *http.Request, runtimeID string) (db.AgentRuntime, bool) {
runtimeUUID, ok := parseUUIDOrBadRequest(w, runtimeID, "runtime_id")
if !ok {
return db.AgentRuntime{}, false
}
rt, err := h.Queries.GetAgentRuntime(r.Context(), runtimeUUID)
if err != nil {
if isNotFound(err) {
writeError(w, http.StatusNotFound, "runtime not found")
return db.AgentRuntime{}, false
}
slog.Warn("get agent runtime failed", "runtime_id", runtimeID, "error", err)
writeError(w, http.StatusInternalServerError, "failed to load runtime")
return db.AgentRuntime{}, false
}
if !h.requireDaemonWorkspaceAccess(w, r, uuidToString(rt.WorkspaceID)) {
return db.AgentRuntime{}, false
}
return rt, true
}
// requireDaemonTaskAccess looks up a task and verifies the caller owns its workspace.
func (h *Handler) requireDaemonTaskAccess(w http.ResponseWriter, r *http.Request, taskID string) (db.AgentTaskQueue, bool) {
task, _, ok := h.requireDaemonTaskAccessWithWorkspace(w, r, taskID)
return task, ok
}
// requireDaemonTaskAccessWithWorkspace is the workspace-aware variant of
// requireDaemonTaskAccess. It returns the resolved workspace ID alongside
// the task row so callers that need to forward workspace_id into
// taskToResponse (powering RelativeWorkDir) don't have to repeat the
// ResolveTaskWorkspaceID lookup. The two helpers share their entire
// implementation; the simpler one is preserved for ergonomic call sites
// that genuinely don't need workspace_id.
func (h *Handler) requireDaemonTaskAccessWithWorkspace(w http.ResponseWriter, r *http.Request, taskID string) (db.AgentTaskQueue, string, bool) {
taskUUID, ok := parseUUIDOrBadRequest(w, taskID, "task_id")
if !ok {
return db.AgentTaskQueue{}, "", false
}
task, err := h.Queries.GetAgentTask(r.Context(), taskUUID)
if err != nil {
// Only treat pgx.ErrNoRows as a real "task gone" signal — daemon
// uses this 404 to interrupt the running agent, so a transient DB
// error must not be reported as a deletion.
if isNotFound(err) {
writeError(w, http.StatusNotFound, "task not found")
return db.AgentTaskQueue{}, "", false
}
slog.Warn("get agent task failed", "task_id", taskID, "error", err)
writeError(w, http.StatusInternalServerError, "failed to load task")
return db.AgentTaskQueue{}, "", false
}
wsID := h.TaskService.ResolveTaskWorkspaceID(r.Context(), task)
if wsID == "" {
writeError(w, http.StatusNotFound, "task not found")
return db.AgentTaskQueue{}, "", false
}
if !h.requireDaemonWorkspaceAccess(w, r, wsID) {
return db.AgentTaskQueue{}, "", false
}
return task, wsID, true
}
// verifyDaemonWorkspaceAccess checks workspace access without writing an HTTP error.
// Used in loops where individual items may be skipped silently.
func (h *Handler) verifyDaemonWorkspaceAccess(r *http.Request, workspaceID string) bool {
if workspaceID == "" {
return false
}
if daemonWsID := middleware.DaemonWorkspaceIDFromContext(r.Context()); daemonWsID != "" {
return daemonWsID == workspaceID
}
userID := requestUserID(r)
if userID == "" {
return false
}
if h.MembershipCache.Get(r.Context(), userID, workspaceID) {
return true
}
_, err := h.getWorkspaceMember(r.Context(), userID, workspaceID)
if err != nil {
return false
}
h.MembershipCache.Set(r.Context(), userID, workspaceID)
return true
}
// ---------------------------------------------------------------------------
// Daemon Registration & Heartbeat
// ---------------------------------------------------------------------------
type DaemonRegisterRequest struct {
WorkspaceID string `json:"workspace_id"`
DaemonID string `json:"daemon_id"`
// LegacyDaemonIDs lists prior hostname-derived daemon_ids this machine
// may have registered under before switching to a persistent UUID. The
// handler merges any matching runtime rows into the new row so agents
// and tasks keep working without manual intervention.
LegacyDaemonIDs []string `json:"legacy_daemon_ids"`
DeviceName string `json:"device_name"`
CLIVersion string `json:"cli_version"` // multica CLI version
LaunchedBy string `json:"launched_by"` // "desktop" when spawned by the Electron app
Runtimes []struct {
Name string `json:"name"`
Type string `json:"type"`
Version string `json:"version"` // agent CLI version (claude/codex)
Status string `json:"status"`
// ProfileID, when non-empty, marks this as an instance of a custom
// runtime_profile (MUL-3284). Empty = built-in runtime (legacy path).
// Type carries the protocol family for both built-in and custom rows
// so task routing (agent.New) is unchanged.
ProfileID string `json:"profile_id"`
} `json:"runtimes"`
FailedProfiles []struct {
ProfileID string `json:"profile_id"`
CommandName string `json:"command_name"`
Reason string `json:"reason"`
} `json:"failed_profiles"`
}
type daemonWorkspaceReposResponse struct {
WorkspaceID string `json:"workspace_id"`
Repos []RepoData `json:"repos"`
ReposVersion string `json:"repos_version"`
Settings json.RawMessage `json:"settings,omitempty"`
}
func normalizeWorkspaceRepos(repos []RepoData) []RepoData {
if len(repos) == 0 {
return []RepoData{}
}
normalized := make([]RepoData, 0, len(repos))
seen := make(map[string]struct{}, len(repos))
for _, repo := range repos {
url := strings.TrimSpace(repo.URL)
if url == "" {
continue
}
if _, exists := seen[url]; exists {
continue
}
seen[url] = struct{}{}
normalized = append(normalized, RepoData{URL: url, Description: repo.Description})
}
return normalized
}
func workspaceReposVersion(repos []RepoData) string {
urls := make([]string, 0, len(repos))
for _, repo := range repos {
if repo.URL == "" {
continue
}
urls = append(urls, repo.URL)
}
sort.Strings(urls)
sum := sha256.Sum256([]byte(strings.Join(urls, "\n")))
return hex.EncodeToString(sum[:])
}
func parseWorkspaceRepos(raw []byte) []RepoData {
if len(raw) == 0 {
return []RepoData{}
}
var repos []RepoData
if err := json.Unmarshal(raw, &repos); err != nil {
return []RepoData{}
}
return normalizeWorkspaceRepos(repos)
}
func workspaceReposResponse(workspaceID string, raw []byte, settingsRaw []byte) daemonWorkspaceReposResponse {
repos := parseWorkspaceRepos(raw)
resp := daemonWorkspaceReposResponse{
WorkspaceID: workspaceID,
Repos: repos,
ReposVersion: workspaceReposVersion(repos),
}
if len(settingsRaw) > 0 {
resp.Settings = json.RawMessage(settingsRaw)
}
return resp
}
// normalizeProvider canonicalizes a provider string for storage: trimmed and
// lowercased so client-side pricing lookups tolerate case drift. Returns "" for
// a blank input.
func normalizeProvider(s string) string {
return strings.ToLower(strings.TrimSpace(s))
}
func (h *Handler) DaemonRegister(w http.ResponseWriter, r *http.Request) {
var req DaemonRegisterRequest
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
writeError(w, http.StatusBadRequest, "invalid request body")
return
}
req.WorkspaceID = strings.TrimSpace(req.WorkspaceID)
req.DaemonID = strings.TrimSpace(req.DaemonID)
req.DeviceName = strings.TrimSpace(req.DeviceName)
if req.DaemonID == "" {
writeError(w, http.StatusBadRequest, "daemon_id is required")
return
}
if req.WorkspaceID == "" {
writeError(w, http.StatusBadRequest, "workspace_id is required")
return
}
if len(req.Runtimes) == 0 && len(req.FailedProfiles) == 0 {
writeError(w, http.StatusBadRequest, "at least one runtime or failed profile is required")
return
}
wsUUID, ok := parseUUIDOrBadRequest(w, req.WorkspaceID, "workspace_id")
if !ok {
return
}
req.WorkspaceID = uuidToString(wsUUID)
// Verify workspace access and resolve owner.
// Daemon tokens (mdt_) prove workspace access directly; OwnerID will be zero
// (the SQL COALESCE preserves any existing owner on upsert).
// PAT/JWT tokens require a membership check and set OwnerID from the member.
var ownerID pgtype.UUID
if daemonWsID := middleware.DaemonWorkspaceIDFromContext(r.Context()); daemonWsID != "" {
if daemonWsID != req.WorkspaceID {
writeError(w, http.StatusNotFound, "workspace not found")
return
}
// ownerID stays zero — COALESCE keeps the existing owner on upsert.
} else {
member, ok := h.requireWorkspaceMember(w, r, req.WorkspaceID, "workspace not found")
if !ok {
return
}
ownerID = member.UserID
}
ws, err := h.Queries.GetWorkspace(r.Context(), wsUUID)
if err != nil {
writeError(w, http.StatusNotFound, "workspace not found")
return
}
resp := make([]AgentRuntimeResponse, 0, len(req.Runtimes))
for _, runtime := range req.Runtimes {
provider := normalizeProvider(runtime.Type)
if provider == "" {
provider = "unknown"
}
name := strings.TrimSpace(runtime.Name)
if name == "" {
name = provider
if req.DeviceName != "" {
name = fmt.Sprintf("%s (%s)", provider, req.DeviceName)
}
}
deviceInfo := strings.TrimSpace(req.DeviceName)
if runtime.Version != "" && deviceInfo != "" {
deviceInfo = fmt.Sprintf("%s · %s", deviceInfo, runtime.Version)
} else if runtime.Version != "" {
deviceInfo = runtime.Version
}
status := "online"
if runtime.Status == "offline" {
status = "offline"
}
metadata, _ := json.Marshal(map[string]any{
"version": runtime.Version,
"cli_version": req.CLIVersion,
"launched_by": req.LaunchedBy,
})
var registered db.AgentRuntime
var inserted bool
isCustom := strings.TrimSpace(runtime.ProfileID) != ""
if isCustom {
profileUUID, pok := parseUUIDOrBadRequest(w, strings.TrimSpace(runtime.ProfileID), "profile_id")
if !pok {
return
}
// The profile must exist in this workspace and be enabled. Trust
// the profile's stored protocol_family over the daemon-sent type so
// the provider used for task routing cannot drift from the profile.
profile, perr := h.Queries.GetRuntimeProfileForWorkspace(r.Context(), db.GetRuntimeProfileForWorkspaceParams{
ID: profileUUID,
WorkspaceID: wsUUID,
})
if perr != nil {
writeError(w, http.StatusBadRequest, "unknown runtime profile: "+runtime.ProfileID)
return
}
if !profile.Enabled {
writeError(w, http.StatusConflict, "runtime profile is disabled: "+runtime.ProfileID)
return
}
provider = profile.ProtocolFamily
prow, err := h.Queries.UpsertAgentRuntimeWithProfile(r.Context(), db.UpsertAgentRuntimeWithProfileParams{
WorkspaceID: wsUUID,
DaemonID: strToText(req.DaemonID),
Name: name,
RuntimeMode: "local",
Provider: provider,
Status: status,
DeviceInfo: deviceInfo,
Metadata: metadata,
OwnerID: ownerID,
ProfileID: profileUUID,
})
if err != nil {
obsmetrics.RecordEvent(h.Analytics, h.Metrics, analytics.RuntimeFailed(
uuidToString(ownerID),
req.WorkspaceID,
req.DaemonID,
provider,
"registration_failed",
"db_error",
true,
))
writeError(w, http.StatusInternalServerError, "failed to register runtime: "+err.Error())
return
}
inserted = prow.Inserted
registered = db.AgentRuntime{
ID: prow.ID,
WorkspaceID: prow.WorkspaceID,
DaemonID: prow.DaemonID,
Name: prow.Name,
RuntimeMode: prow.RuntimeMode,
Provider: prow.Provider,
Status: prow.Status,
DeviceInfo: prow.DeviceInfo,
Metadata: prow.Metadata,
LastSeenAt: prow.LastSeenAt,
CreatedAt: prow.CreatedAt,
UpdatedAt: prow.UpdatedAt,
OwnerID: prow.OwnerID,
LegacyDaemonID: prow.LegacyDaemonID,
Visibility: prow.Visibility,
ProfileID: prow.ProfileID,
}
} else {
row, err := h.Queries.UpsertAgentRuntime(r.Context(), db.UpsertAgentRuntimeParams{
WorkspaceID: wsUUID,
DaemonID: strToText(req.DaemonID),
Name: name,
RuntimeMode: "local",
Provider: provider,
Status: status,
DeviceInfo: deviceInfo,
Metadata: metadata,
OwnerID: ownerID,
})
if err != nil {
obsmetrics.RecordEvent(h.Analytics, h.Metrics, analytics.RuntimeFailed(
uuidToString(ownerID),
req.WorkspaceID,
req.DaemonID,
provider,
"registration_failed",
"db_error",
true,
))
writeError(w, http.StatusInternalServerError, "failed to register runtime: "+err.Error())
return
}
inserted = row.Inserted
registered = db.AgentRuntime{
ID: row.ID,
WorkspaceID: row.WorkspaceID,
DaemonID: row.DaemonID,
Name: row.Name,
RuntimeMode: row.RuntimeMode,
Provider: row.Provider,
Status: row.Status,
DeviceInfo: row.DeviceInfo,
Metadata: row.Metadata,
LastSeenAt: row.LastSeenAt,
CreatedAt: row.CreatedAt,
UpdatedAt: row.UpdatedAt,
OwnerID: row.OwnerID,
LegacyDaemonID: row.LegacyDaemonID,
Visibility: row.Visibility,
ProfileID: row.ProfileID,
}
}
// Inserted is false for normal daemon reconnects/upserts, so
// runtime_ready is a first-ready-per-runtime-row signal.
if inserted {
obsmetrics.RecordEvent(h.Analytics, h.Metrics, analytics.RuntimeRegistered(
uuidToString(ownerID),
req.WorkspaceID,
uuidToString(registered.ID),
req.DaemonID,
provider,
runtime.Version,
req.CLIVersion,
))
if registered.Status == "online" {
obsmetrics.RecordEvent(h.Analytics, h.Metrics, analytics.RuntimeReady(
uuidToString(ownerID),
req.WorkspaceID,
uuidToString(registered.ID),
req.DaemonID,
provider,
0,
))
}
}
// Seamless migration from the previous hostname-derived identity. The
// daemon sends every legacy daemon_id it may have registered under
// (e.g. "host.local", "host", "host-staging"); for each match we
// reassign agents + tasks onto the new UUID-keyed row, then delete
// the stale row so there's only ever one runtime per machine.
//
// Only built-in runtimes participate: legacy rows predate custom
// profiles, so a profile-keyed instance never has a hostname-derived
// ancestor to merge, and mergeLegacyRuntimes scopes by provider alone
// (no profile_id), which could otherwise fold a built-in row into a
// custom one of the same provider.
if !isCustom {
h.mergeLegacyRuntimes(r, registered, provider, req.LegacyDaemonIDs)
}
resp = append(resp, runtimeToResponse(registered))
}
for _, failed := range req.FailedProfiles {
profileID := strings.TrimSpace(failed.ProfileID)
if profileID == "" {
continue
}
profileUUID, pok := parseUUIDOrBadRequest(w, profileID, "profile_id")
if !pok {
return
}
profile, perr := h.Queries.GetRuntimeProfileForWorkspace(r.Context(), db.GetRuntimeProfileForWorkspaceParams{
ID: profileUUID,
WorkspaceID: wsUUID,
})
if perr != nil || !profile.Enabled {
continue
}
name := profile.DisplayName
if req.DeviceName != "" {
name = fmt.Sprintf("%s (%s)", name, req.DeviceName)
}
deviceInfo := strings.TrimSpace(req.DeviceName)
reason := strings.TrimSpace(failed.Reason)
if reason == "" {
reason = "custom runtime command could not be resolved"
}
commandName := strings.TrimSpace(failed.CommandName)
if commandName == "" {
commandName = profile.CommandName
}
metadata, _ := json.Marshal(map[string]any{
"version": "",
"cli_version": req.CLIVersion,
"launched_by": req.LaunchedBy,
"runtime_profile_registration_error": true,
"runtime_profile_failure_reason": reason,
"command_name": commandName,
})
if _, err := h.Queries.UpsertAgentRuntimeWithProfile(r.Context(), db.UpsertAgentRuntimeWithProfileParams{
WorkspaceID: wsUUID,
DaemonID: strToText(req.DaemonID),
Name: name,
RuntimeMode: "local",
Provider: profile.ProtocolFamily,
Status: "offline",
DeviceInfo: deviceInfo,
Metadata: metadata,
OwnerID: ownerID,
ProfileID: profileUUID,
}); err != nil {
slog.Warn("failed to record runtime profile registration failure",
"workspace_id", req.WorkspaceID, "daemon_id", req.DaemonID,
"profile_id", profileID, "error", err)
}
}
slog.Info("daemon registered", "workspace_id", req.WorkspaceID, "daemon_id", req.DaemonID, "runtimes_count", len(resp))
h.publish(protocol.EventDaemonRegister, req.WorkspaceID, "system", "", map[string]any{
"runtimes": resp,
})
repoResp := workspaceReposResponse(req.WorkspaceID, ws.Repos, ws.Settings)
writeJSON(w, http.StatusOK, map[string]any{
"runtimes": resp,
"repos": repoResp.Repos,
"repos_version": repoResp.ReposVersion,
"settings": repoResp.Settings,
})
}
// mergeLegacyRuntimes folds every runtime row keyed on a prior hostname-derived
// daemon_id into the newly registered UUID-keyed row. For each legacy id the
// lookup is case-insensitive and returns *all* matching rows — case-only drift
// may have already minted duplicates historically (e.g. `Foo.local` AND
// `foo.local` coexisting), and we need to consolidate every one of them, not
// just the first. Per match we reassign agents and tasks, record the legacy
// id on the new row for audit, then delete the stale row.
//
// Scoping by (workspace_id, provider) is sufficient since provider is single-
// runtime-per-daemon; `unique (workspace_id, daemon_id, provider)` prevents
// any two *exact* matches but the `LOWER(...)` comparison crosses that bound
// precisely when case-duplicate rows exist — which is the bug we're fixing.
// We also dedupe across legacy ids so overlapping candidates (e.g. `foo` and
// `foo.local` both resolving to the same stored row) don't double-process.
func (h *Handler) mergeLegacyRuntimes(r *http.Request, registered db.AgentRuntime, provider string, legacyIDs []string) {
newID := uuidToString(registered.ID)
merged := make(map[string]struct{})
for _, legacyID := range legacyIDs {
legacyID = strings.TrimSpace(legacyID)
if legacyID == "" {
continue
}
matches, err := h.Queries.FindLegacyRuntimesByDaemonID(r.Context(), db.FindLegacyRuntimesByDaemonIDParams{
WorkspaceID: registered.WorkspaceID,
Provider: provider,
DaemonID: legacyID,
})
if err != nil {
slog.Warn("legacy runtime merge: lookup failed", "legacy_daemon_id", legacyID, "error", err)
continue
}
for _, old := range matches {
oldID := uuidToString(old.ID)
if oldID == newID {
continue
}
if _, seen := merged[oldID]; seen {
continue
}
merged[oldID] = struct{}{}
agents, err := h.Queries.ReassignAgentsToRuntime(r.Context(), db.ReassignAgentsToRuntimeParams{
NewRuntimeID: registered.ID,
OldRuntimeID: old.ID,
})
if err != nil {
slog.Warn("legacy runtime merge: reassign agents failed", "legacy_daemon_id", legacyID, "old_runtime_id", oldID, "new_runtime_id", newID, "error", err)
continue
}
tasks, err := h.Queries.ReassignTasksToRuntime(r.Context(), db.ReassignTasksToRuntimeParams{
NewRuntimeID: registered.ID,
OldRuntimeID: old.ID,
})
if err != nil {
slog.Warn("legacy runtime merge: reassign tasks failed", "legacy_daemon_id", legacyID, "old_runtime_id", oldID, "new_runtime_id", newID, "error", err)
continue
}
if err := h.Queries.RecordRuntimeLegacyDaemonID(r.Context(), db.RecordRuntimeLegacyDaemonIDParams{
ID: registered.ID,
LegacyDaemonID: strToText(legacyID),
}); err != nil {
slog.Warn("legacy runtime merge: record legacy daemon_id failed", "legacy_daemon_id", legacyID, "error", err)
}
if err := h.Queries.DeleteAgentRuntime(r.Context(), old.ID); err != nil {
slog.Warn("legacy runtime merge: delete old runtime failed", "old_runtime_id", oldID, "error", err)
continue
}
slog.Info("legacy runtime merged",
"legacy_daemon_id", legacyID,
"old_runtime_id", oldID,
"new_runtime_id", newID,
"provider", provider,
"agents_reassigned", agents,
"tasks_reassigned", tasks,
)
}
}
}
func (h *Handler) GetDaemonWorkspaceRepos(w http.ResponseWriter, r *http.Request) {
workspaceID := strings.TrimSpace(chi.URLParam(r, "workspaceId"))
if !h.requireDaemonWorkspaceAccess(w, r, workspaceID) {
return
}
ws, err := h.Queries.GetWorkspace(r.Context(), parseUUID(workspaceID))
if err != nil {
writeError(w, http.StatusNotFound, "workspace not found")
return
}
writeJSON(w, http.StatusOK, workspaceReposResponse(workspaceID, ws.Repos, ws.Settings))
}
// DaemonDeregister marks runtimes as offline when the daemon shuts down.
func (h *Handler) DaemonDeregister(w http.ResponseWriter, r *http.Request) {
var req struct {
RuntimeIDs []string `json:"runtime_ids"`
}
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
writeError(w, http.StatusBadRequest, "invalid request body")
return
}
if len(req.RuntimeIDs) == 0 {
writeError(w, http.StatusBadRequest, "runtime_ids is required")
return
}
runtimeUUIDs, ok := parseUUIDSliceOrBadRequest(w, req.RuntimeIDs, "runtime_ids")
if !ok {
return
}
// Track affected workspaces for WS notifications.
affectedWorkspaces := make(map[string]bool)
for i, rid := range req.RuntimeIDs {
// Look up the runtime and verify ownership.
rt, err := h.Queries.GetAgentRuntime(r.Context(), runtimeUUIDs[i])
if err != nil {
slog.Warn("deregister: runtime not found", "runtime_id", rid, "error", err)
continue
}
wsID := uuidToString(rt.WorkspaceID)
if !h.verifyDaemonWorkspaceAccess(r, wsID) {
slog.Warn("deregister: workspace mismatch", "runtime_id", rid)
continue
}
if err := h.Queries.SetAgentRuntimeOffline(r.Context(), rt.ID); err != nil {
slog.Warn("deregister: failed to set offline", "runtime_id", rid, "error", err)
continue
}
obsmetrics.RecordEvent(h.Analytics, h.Metrics, analytics.RuntimeOffline(
uuidToString(rt.OwnerID),
wsID,
uuidToString(rt.ID),
rt.DaemonID.String,
rt.Provider,
))
affectedWorkspaces[wsID] = true
}
// Notify frontend clients so they re-fetch runtime list.
for wsID := range affectedWorkspaces {
h.publish(protocol.EventDaemonRegister, wsID, "system", "", map[string]any{
"action": "deregister",
})
}
slog.Info("daemon deregistered", "runtime_ids", req.RuntimeIDs)
writeJSON(w, http.StatusOK, map[string]string{"status": "ok"})
}
type DaemonHeartbeatRequest struct {
RuntimeID string `json:"runtime_id"`
SupportsBatchImport bool `json:"supports_batch_import,omitempty"`
}
// heartbeatHasPendingTimeout bounds the cheap HasPending probe on the
// heartbeat hot path. Probes are read-only (ZCARD in Redis) so a timeout is
// ack-safe: the worst case is "we didn't find out if anything was queued this
// tick" and the next heartbeat (default 15s later) will try again.
//
// PopPending is deliberately NOT bounded this way — its Redis implementation
// runs a Lua claim script whose ZREM + SET-running side effects cannot be
// cleanly un-run from the client side if the context expires mid-script. We
// therefore only invoke PopPending after HasPending confirms there is work
// to claim, so we never start a claim we might have to abort.
const heartbeatHasPendingTimeout = 1 * time.Second
// maxLocalSkillImportBatch is how many pending import requests the heartbeat
// handler pops per cycle. Higher values let the daemon process more imports
// in parallel but increase per-heartbeat latency.
//
// Timeout invariant: IMPORT_CONCURRENCY (views/.../runtime-local-skill-import-panel.tsx)
// × heartbeat period (~15s) must stay within runtimeLocalSkillPendingTimeout
// (runtime_local_skills.go), and IMPORT_POLL_TIMEOUT_MS (core/runtimes/local-skills.ts)
// must exceed pendingTimeout + runningTimeout.
const maxLocalSkillImportBatch = 10
// runtimeLivenessTTL is how long a Redis liveness record stays valid before
// expiring. The daemon refreshes it every heartbeat (~15s), so this just
// needs to be a few heartbeats long — the value (90s) tolerates ~6 missed
// beats before Redis declares the runtime dead.
//
// It is intentionally shorter than the sweeper's stale threshold (150s in
// cmd/server/runtime_sweeper.go). That ordering is safe and desirable:
// Redis can declare a runtime dead before the DB stale window opens, and
// the sweeper will simply ignore it until the DB column also crosses the
// threshold. The unsafe direction would be the opposite (Redis claiming
// "alive" past the DB stale window, masking a truly dead runtime when the
// sweeper consults Redis as the source of truth) — that cannot happen here.
const runtimeLivenessTTL = 90 * time.Second
// runtimeHeartbeatDBFlushInterval is the maximum staleness we tolerate on
// agent_runtime.last_seen_at while Redis is the active liveness source. When
// last_seen_at gets older than this, the heartbeat path schedules a DB write
// so (a) the UI's "last seen" display stays bounded and (b) the sweeper's
// DB-only fallback path (used when an IsAliveBatch call to Redis errors) does
// not false-positive on alive-but-Redis-only runtimes.
//
// Load-bearing invariant: this must be strictly less than the sweeper's
// stale threshold (150s in cmd/server/runtime_sweeper.go) MINUS one daemon
// heartbeat cycle (~15s) MINUS the BatchedHeartbeatScheduler tick interval
// (~30s). Worst-case DB age for an alive runtime is therefore bounded by
// flush + heartbeat + batchTick = 60 + 15 + 30 = 105s, leaving a 45s buffer
// below the 150s stale window. If you tune any of these constants, recompute
// the chain and keep at least a one-tick buffer.
//
// We intentionally keep the per-runtime flush throttle at 60s (rather than
// pushing it higher) so a crashed runtime is detected within ~150s instead
// of ~10 minutes. The bulk of the DB-pressure win comes from batched
// coalescing in HeartbeatScheduler — at 70 online runtimes that collapses
// ~17 single-row UPDATE/s into ~0.03 bulk UPDATE/s (one per batch tick),
// independent of how the per-runtime throttle is tuned.
const runtimeHeartbeatDBFlushInterval = 60 * time.Second
func (h *Handler) DaemonHeartbeat(w http.ResponseWriter, r *http.Request) {
start := time.Now()
authPath := middleware.DaemonAuthPathFromContext(r.Context())
var (
outcome = "unauth"
runtimeID string
decodeMs, runtimeLookupMs, workspaceCheckMs int64
authMs, updateMs, probeModelMs, popModelMs, probeSkillsMs, popSkillsMs, probeImportMs, popImportMs int64
probeModelTimedOut, probeSkillsTimedOut, probeImportTimedOut bool
)
defer func() {
logHeartbeatEndpointSlow(runtimeID, outcome, authPath, start, decodeMs, runtimeLookupMs, workspaceCheckMs, authMs, updateMs, probeModelMs, popModelMs, probeSkillsMs, popSkillsMs, probeImportMs, popImportMs, probeModelTimedOut, probeSkillsTimedOut, probeImportTimedOut)
}()
decodeStart := time.Now()
var req DaemonHeartbeatRequest
decodeErr := json.NewDecoder(r.Body).Decode(&req)
decodeMs = time.Since(decodeStart).Milliseconds()
if decodeErr != nil {
outcome = "bad_body"
writeError(w, http.StatusBadRequest, "invalid request body")
return
}
if req.RuntimeID == "" {
outcome = "missing_runtime_id"
writeError(w, http.StatusBadRequest, "runtime_id is required")
return
}
runtimeID = req.RuntimeID
// Inlined and instrumented version of requireDaemonRuntimeAccess so we
// can attribute the runtime-lookup and workspace-check sub-stages
// independently in slow-logs. Together with the auth_path label set by
// DaemonAuth middleware, this lets us tell whether prod heartbeat tail
// latency is in pgx pool acquisition (runtime_lookup_ms), in the PAT
// fallback workspace-membership query (workspace_check_ms), or upstream.
runtimeUUID, ok := parseUUIDOrBadRequest(w, req.RuntimeID, "runtime_id")
if !ok {
outcome = "bad_runtime_id"
return
}
lookupStart := time.Now()
rt, lookupErr := h.Queries.GetAgentRuntime(r.Context(), runtimeUUID)
runtimeLookupMs = time.Since(lookupStart).Milliseconds()
if lookupErr != nil {
// Only pgx.ErrNoRows means the runtime row is gone. Daemon reads this
// 404 as a signal to drop the stale runtime locally; treating a
// transient DB error the same way would force daemons to self-cleanup
// on a hiccup.
if isNotFound(lookupErr) {
outcome = "runtime_not_found"
writeError(w, http.StatusNotFound, "runtime not found")
return
}
outcome = "runtime_lookup_error"
slog.Warn("get agent runtime failed", "runtime_id", req.RuntimeID, "error", lookupErr)
writeError(w, http.StatusInternalServerError, "failed to load runtime")
return
}
wsCheckStart := time.Now()
wsOK := h.requireDaemonWorkspaceAccess(w, r, uuidToString(rt.WorkspaceID))
workspaceCheckMs = time.Since(wsCheckStart).Milliseconds()
if !wsOK {
outcome = "workspace_denied"
return
}
authMs = time.Since(start).Milliseconds()
ack, m, err := h.processHeartbeat(r.Context(), rt, req.SupportsBatchImport)
updateMs = m.UpdateMs
probeModelMs = m.ProbeModelMs
popModelMs = m.PopModelMs
probeSkillsMs = m.ProbeSkillsMs
popSkillsMs = m.PopSkillsMs
probeImportMs = m.ProbeImportMs
popImportMs = m.PopImportMs
probeModelTimedOut = m.ProbeModelTimedOut
probeSkillsTimedOut = m.ProbeSkillsTimedOut
probeImportTimedOut = m.ProbeImportTimedOut
if err != nil {
outcome = "error_update"
writeError(w, http.StatusInternalServerError, "heartbeat failed")
return
}
outcome = "ok"
// Preserve the existing HTTP response shape: the runtime_id field is new
// in the WS path and would be redundant noise on the HTTP path where the
// caller already knows which runtime it asked about.
resp := map[string]any{"status": ack.Status}
if ack.PendingUpdate != nil {
resp["pending_update"] = ack.PendingUpdate
}
if ack.PendingModelList != nil {
resp["pending_model_list"] = ack.PendingModelList
}
if ack.PendingLocalSkills != nil {
resp["pending_local_skills"] = ack.PendingLocalSkills
}
if ack.PendingLocalSkillImport != nil {
resp["pending_local_skill_import"] = ack.PendingLocalSkillImport
}
if len(ack.PendingLocalSkillImports) > 0 {
resp["pending_local_skill_imports"] = ack.PendingLocalSkillImports
}
if ack.FeatureFlags != nil {
resp["feature_flags"] = ack.FeatureFlags
}
writeJSON(w, http.StatusOK, resp)
}
// HandleDaemonWSHeartbeat is the daemonws.HeartbeatHandler entry point: it
// resolves the runtime, verifies the connection's workspace owns it, and
// returns the ack payload. It is the WebSocket-side mirror of DaemonHeartbeat.
//
// Workspace authorization is re-checked on every heartbeat instead of trusted
// from the upgrade-time check because runtime ownership can change (e.g. a
// runtime is reassigned to another workspace mid-connection).
//
// When the runtime row is missing (pgx.ErrNoRows), the function returns a
// successful ack with Status=HeartbeatStatusRuntimeGone and RuntimeGone=true
// instead of an error. That keeps the hub from logging every beat at Warn,
// and tells the daemon to drop the stale runtime and re-register. Other DB
// errors still propagate as errors so they keep their existing Warn logging
// and the daemon does not mistake a hiccup for a deletion.
func (h *Handler) HandleDaemonWSHeartbeat(ctx context.Context, identity daemonws.ClientIdentity, runtimeID string, supportsBatchImport bool) (*protocol.DaemonHeartbeatAckPayload, error) {
runtimeUUID, err := util.ParseUUID(runtimeID)
if err != nil {
return nil, fmt.Errorf("invalid runtime_id: %w", err)
}
rt, err := h.Queries.GetAgentRuntime(ctx, runtimeUUID)
if err != nil {
if isNotFound(err) {
return &protocol.DaemonHeartbeatAckPayload{
RuntimeID: runtimeID,
Status: protocol.HeartbeatStatusRuntimeGone,
RuntimeGone: true,
}, nil
}
return nil, fmt.Errorf("get agent runtime: %w", err)
}
if !identity.AllowsWorkspace(uuidToString(rt.WorkspaceID)) {
return nil, fmt.Errorf("runtime not in connection workspace")
}
ack, _, err := h.processHeartbeat(ctx, rt, supportsBatchImport)
return ack, err
}
// recordHeartbeat marks the runtime as alive. When LivenessStore is available
// (Redis configured and reachable) it writes a TTL'd liveness key and skips
// the DB row write on most beats — the DB is only updated on the
// offline→online transition or once per runtimeHeartbeatDBFlushInterval to
// keep last_seen_at fresh enough for the UI and the DB-fallback sweeper.
//
// When LivenessStore is unavailable (no Redis configured) or any Touch call
// errors, recordHeartbeat falls back to writing the DB on every beat — that
// is the original behavior and keeps the sweeper's DB-only path correct.
//
// The actual DB write is delegated to h.HeartbeatScheduler so production can
// coalesce many runtimes' bumps into one bulk UPDATE per tick. See
// heartbeat_scheduler.go for the two implementations.
func (h *Handler) recordHeartbeat(ctx context.Context, rt db.AgentRuntime) error {
now := time.Now()
// Decide whether the DB row needs a write *before* touching Redis, so a
// Touch failure can simply force needDBWrite=true without re-evaluating
// the structural reasons.
needDBWrite := !h.LivenessStore.Available() ||
rt.Status != "online" ||
!rt.LastSeenAt.Valid ||
now.Sub(rt.LastSeenAt.Time) >= runtimeHeartbeatDBFlushInterval
if h.LivenessStore.Available() {
if err := h.LivenessStore.Touch(ctx, uuidToString(rt.ID), runtimeLivenessTTL); err != nil {
// Redis hiccup: degrade transparently to the DB-only path for
// this beat. The sweeper falls back to its DB threshold the
// same way when IsAliveBatch fails, so end-to-end correctness
// is preserved.
slog.Warn("liveness touch failed; falling back to DB heartbeat",
"runtime_id", uuidToString(rt.ID), "error", err)
needDBWrite = true
}
}
if !needDBWrite {
return nil
}
// Either bumps last_seen_at on an already-online row (Touch + race
// fallback) or flips status from offline to online. The scheduler
// chooses sync vs batched per case; see HeartbeatScheduler doc.
return h.HeartbeatScheduler.Schedule(ctx, rt)
}
// heartbeatMetrics carries per-stage timings out of processHeartbeat so the
// HTTP slow-log can stay structured. The WS path discards them.
type heartbeatMetrics struct {
UpdateMs, ProbeModelMs, PopModelMs, ProbeSkillsMs, PopSkillsMs, ProbeImportMs, PopImportMs int64
ProbeModelTimedOut, ProbeSkillsTimedOut, ProbeImportTimedOut bool
}
// processHeartbeat does the work shared by HTTP POST /api/daemon/heartbeat and
// the WebSocket daemon:heartbeat path: records liveness and pulls any pending
// actions queued for the runtime. Auth and request decoding live in the
// caller because they differ between transports.
func (h *Handler) processHeartbeat(ctx context.Context, rt db.AgentRuntime, supportsBatchImport bool) (*protocol.DaemonHeartbeatAckPayload, heartbeatMetrics, error) {
var m heartbeatMetrics
runtimeID := uuidToString(rt.ID)
updateStart := time.Now()
if err := h.recordHeartbeat(ctx, rt); err != nil {
m.UpdateMs = time.Since(updateStart).Milliseconds()
return nil, m, err
}
m.UpdateMs = time.Since(updateStart).Milliseconds()
slog.Debug("daemon heartbeat", "runtime_id", runtimeID)
ack := &protocol.DaemonHeartbeatAckPayload{
RuntimeID: runtimeID,
Status: "ok",
}
if h.DaemonFeatureFlags != nil {
ack.FeatureFlags = h.DaemonFeatureFlags.EvaluateForRuntime(ctx, rt)
}
probeUpdateCtx, cancelProbeUpdate := context.WithTimeout(ctx, heartbeatHasPendingTimeout)
hasUpdate, probeUpdateErr := h.UpdateStore.HasPending(probeUpdateCtx, runtimeID)
cancelProbeUpdate()
switch {
case probeUpdateErr == nil && hasUpdate:
pending, popUpdateErr := h.UpdateStore.PopPending(ctx, runtimeID)
if popUpdateErr != nil {
slog.Warn("update PopPending failed", "error", popUpdateErr, "runtime_id", runtimeID)
} else if pending != nil {
ack.PendingUpdate = &protocol.DaemonHeartbeatPendingUpdate{
ID: pending.ID,
TargetVersion: pending.TargetVersion,
}
}
case probeUpdateErr != nil:
if errors.Is(probeUpdateErr, context.DeadlineExceeded) || errors.Is(probeUpdateErr, context.Canceled) {
slog.Warn("update HasPending timed out", "runtime_id", runtimeID)
} else {
slog.Warn("update HasPending failed", "error", probeUpdateErr, "runtime_id", runtimeID)
}
}
// Probe then claim the model list queue. Same pattern as the local-skill
// queues below — a slow shared store cannot stall the heartbeat on
// empty-queue ticks, but the claim itself runs unbounded because its
// Lua side effects cannot be safely aborted mid-script.
probeModelStart := time.Now()
probeModelCtx, cancelProbeModel := context.WithTimeout(ctx, heartbeatHasPendingTimeout)
hasModel, probeModelErr := h.ModelListStore.HasPending(probeModelCtx, runtimeID)
cancelProbeModel()
m.ProbeModelMs = time.Since(probeModelStart).Milliseconds()
switch {
case probeModelErr == nil && hasModel:
popStart := time.Now()
pendingModel, popErr := h.ModelListStore.PopPending(ctx, runtimeID)
m.PopModelMs = time.Since(popStart).Milliseconds()
if popErr != nil {
slog.Warn("model list PopPending failed", "error", popErr, "runtime_id", runtimeID)
} else if pendingModel != nil {
ack.PendingModelList = &protocol.DaemonHeartbeatPendingModelList{ID: pendingModel.ID}
}
case probeModelErr != nil:
if errors.Is(probeModelErr, context.DeadlineExceeded) || errors.Is(probeModelErr, context.Canceled) {
m.ProbeModelTimedOut = true
slog.Warn("model list HasPending timed out", "runtime_id", runtimeID, "elapsed_ms", m.ProbeModelMs)
} else {
slog.Warn("model list HasPending failed", "error", probeModelErr, "runtime_id", runtimeID)
}
}
// Probe then claim the local-skill list queue. The probe is bounded so a
// slow shared store cannot stall the heartbeat on empty-queue ticks; the
// claim runs unbounded (it inherits only ctx) because its Lua side
// effects cannot be safely aborted mid-script.
probeSkillsStart := time.Now()
probeSkillsCtx, cancelProbeSkills := context.WithTimeout(ctx, heartbeatHasPendingTimeout)
hasSkills, probeErr := h.LocalSkillListStore.HasPending(probeSkillsCtx, runtimeID)
cancelProbeSkills()
m.ProbeSkillsMs = time.Since(probeSkillsStart).Milliseconds()
switch {
case probeErr == nil && hasSkills:
popStart := time.Now()
pendingSkills, popErr := h.LocalSkillListStore.PopPending(ctx, runtimeID)
m.PopSkillsMs = time.Since(popStart).Milliseconds()
if popErr != nil {
slog.Warn("local skill list PopPending failed", "error", popErr, "runtime_id", runtimeID)
} else if pendingSkills != nil {
ack.PendingLocalSkills = &protocol.DaemonHeartbeatPendingLocalSkills{ID: pendingSkills.ID}
}
case probeErr != nil:
if errors.Is(probeErr, context.DeadlineExceeded) || errors.Is(probeErr, context.Canceled) {
m.ProbeSkillsTimedOut = true
slog.Warn("local skill list HasPending timed out", "runtime_id", runtimeID, "elapsed_ms", m.ProbeSkillsMs)
} else {
slog.Warn("local skill list HasPending failed", "error", probeErr, "runtime_id", runtimeID)
}
}
probeImportStart := time.Now()
probeImportCtx, cancelProbeImport := context.WithTimeout(ctx, heartbeatHasPendingTimeout)
hasImport, probeErr := h.LocalSkillImportStore.HasPending(probeImportCtx, runtimeID)
cancelProbeImport()
m.ProbeImportMs = time.Since(probeImportStart).Milliseconds()
switch {
case probeErr == nil && hasImport:
popStart := time.Now()
if supportsBatchImport {
pendingImports, popErr := h.LocalSkillImportStore.PopPendingBatch(ctx, runtimeID, maxLocalSkillImportBatch)
m.PopImportMs = time.Since(popStart).Milliseconds()
if popErr != nil {
slog.Warn("local skill import PopPendingBatch failed", "error", popErr, "runtime_id", runtimeID, "claimed", len(pendingImports))
}
// Always dispatch whatever was claimed — even on partial
// failure the claimed requests have already transitioned to
// running in the store. Dropping them here would leave them
// stranded until the running timeout.
if len(pendingImports) > 0 {
// Backwards compat: singular field carries the first item so
// old daemons that don't know the plural field still get one.
ack.PendingLocalSkillImport = &protocol.DaemonHeartbeatPendingLocalSkillImport{
ID: pendingImports[0].ID,
SkillKey: pendingImports[0].SkillKey,
}
batch := make([]protocol.DaemonHeartbeatPendingLocalSkillImport, 0, len(pendingImports))
for _, p := range pendingImports {
batch = append(batch, protocol.DaemonHeartbeatPendingLocalSkillImport{
ID: p.ID,
SkillKey: p.SkillKey,
})
}
ack.PendingLocalSkillImports = batch
}
} else {
pendingImport, popErr := h.LocalSkillImportStore.PopPending(ctx, runtimeID)
m.PopImportMs = time.Since(popStart).Milliseconds()
if popErr != nil {
slog.Warn("local skill import PopPending failed", "error", popErr, "runtime_id", runtimeID)
} else if pendingImport != nil {
ack.PendingLocalSkillImport = &protocol.DaemonHeartbeatPendingLocalSkillImport{
ID: pendingImport.ID,
SkillKey: pendingImport.SkillKey,
}
}
}
case probeErr != nil:
if errors.Is(probeErr, context.DeadlineExceeded) || errors.Is(probeErr, context.Canceled) {
m.ProbeImportTimedOut = true
slog.Warn("local skill import HasPending timed out", "runtime_id", runtimeID, "elapsed_ms", m.ProbeImportMs)
} else {
slog.Warn("local skill import HasPending failed", "error", probeErr, "runtime_id", runtimeID)
}
}
return ack, m, nil
}
// logHeartbeatEndpointSlow emits one structured log when /api/daemon/heartbeat
// exceeds 500ms, splitting auth / update / probe / pop phases for both queues
// so the prod tail can be attributed without flooding logs at normal rates.
// auth_ms is further decomposed into decode_ms, runtime_lookup_ms, and
// workspace_check_ms; auth_path labels which token kind authenticated the
// request ("daemon_token", "pat", or "jwt"). Mirrors logClaimEndpointSlow.
func logHeartbeatEndpointSlow(runtimeID, outcome, authPath string, start time.Time, decodeMs, runtimeLookupMs, workspaceCheckMs, authMs, updateMs, probeModelMs, popModelMs, probeSkillsMs, popSkillsMs, probeImportMs, popImportMs int64, probeModelTimedOut, probeSkillsTimedOut, probeImportTimedOut bool) {
totalMs := time.Since(start).Milliseconds()
if totalMs < 500 && !probeModelTimedOut && !probeSkillsTimedOut && !probeImportTimedOut {
return
}
slog.Info("heartbeat_endpoint slow",
"runtime_id", runtimeID,
"outcome", outcome,
"auth_path", authPath,
"total_ms", totalMs,
"auth_ms", authMs,
"decode_ms", decodeMs,
"runtime_lookup_ms", runtimeLookupMs,
"workspace_check_ms", workspaceCheckMs,
"update_ms", updateMs,
"probe_model_ms", probeModelMs,
"pop_model_ms", popModelMs,
"probe_skills_ms", probeSkillsMs,
"pop_skills_ms", popSkillsMs,
"probe_import_ms", probeImportMs,
"pop_import_ms", popImportMs,
"probe_model_timed_out", probeModelTimedOut,
"probe_skills_timed_out", probeSkillsTimedOut,
"probe_import_timed_out", probeImportTimedOut,
)
}
// logClaimEndpointSlow emits one structured log when the /tasks/claim endpoint
// exceeds 500ms, splitting auth / claim / response-build phases so the prod
// tail can be diagnosed without flooding logs at normal poll rates.
func logClaimEndpointSlow(runtimeID, outcome string, start time.Time, authMs, claimMs, buildMs int64, payloadBytes, agentSkillCount, builtinSkillCount, skillPayloadBytes int) {
totalMs := time.Since(start).Milliseconds()
if totalMs < 500 {
return
}
slog.Info("claim_endpoint slow",
"runtime_id", runtimeID,
"outcome", outcome,
"total_ms", totalMs,
"auth_ms", authMs,
"claim_ms", claimMs,
"build_ms", buildMs,
"payload_bytes", payloadBytes,
"agent_skill_count", agentSkillCount,
"builtin_skill_count", builtinSkillCount,
"skill_payload_bytes", skillPayloadBytes,
)
}
func requestHasDaemonCapability(r *http.Request, capability string) bool {
for _, part := range strings.Split(r.Header.Get("X-Client-Capabilities"), ",") {
if strings.TrimSpace(part) == capability {
return true
}
}
return false
}
func parseRuntimeConnectedAppsForClaim(raw []byte, taskID pgtype.UUID) []runtimeapps.ConnectedApp {
raw = bytes.TrimSpace(raw)
if len(raw) == 0 || bytes.Equal(raw, []byte("null")) {
return nil
}
var apps []runtimeapps.ConnectedApp
if err := json.Unmarshal(raw, &apps); err != nil {
slog.Warn("daemon claim: unmarshal runtime_connected_apps failed",
"task_id", uuidToString(taskID),
"error", err,
)
return nil
}
return apps
}
// ClaimTaskByRuntime atomically claims the next queued task for a runtime.
// The response includes the agent's name and skills, fetched fresh from the DB.
func (h *Handler) ClaimTaskByRuntime(w http.ResponseWriter, r *http.Request) {
runtimeID := chi.URLParam(r, "runtimeId")
start := time.Now()
var (
outcome = "unauth"
authMs, claimMs, buildMs int64
payloadBytes int
agentSkillCount int
builtinSkillCount int
skillPayloadBytes int
buildStart time.Time
)
defer func() {
// Emit at function exit so error / unauth paths also carry timing.
// build_ms is computed from buildStart only when we entered the
// response-build phase (otherwise stays 0).
if !buildStart.IsZero() {
buildMs = time.Since(buildStart).Milliseconds()
}
logClaimEndpointSlow(runtimeID, outcome, start, authMs, claimMs, buildMs, payloadBytes, agentSkillCount, builtinSkillCount, skillPayloadBytes)
}()
// Verify the caller owns this runtime's workspace. The runtime's
// workspace_id is the authoritative value a claimed task must match
// below — a task whose resolved workspace doesn't equal this runtime's
// workspace is rejected even if it was enqueued against this
// runtime_id (defense-in-depth against upstream routing bugs).
runtime, ok := h.requireDaemonRuntimeAccess(w, r, runtimeID)
if !ok {
return
}
runtimeWorkspaceID := uuidToString(runtime.WorkspaceID)
authMs = time.Since(start).Milliseconds()
claimStart := time.Now()
task, err := h.TaskService.ClaimTaskForRuntime(r.Context(), parseUUID(runtimeID))
claimMs = time.Since(claimStart).Milliseconds()
if err != nil {
outcome = "error_claim"
writeError(w, http.StatusInternalServerError, "failed to claim task: "+err.Error())
return
}
if task == nil {
slog.Debug("no task to claim", "runtime_id", runtimeID)
payloadBytes, _ = writeMeasuredJSON(w, http.StatusOK, map[string]any{"task": nil})
outcome = "no_task"
return
}
outcome = "claimed"
buildStart = time.Now()
// Build response with fresh agent data (name + skills + custom_env + custom_args).
resp := taskToResponse(*task, runtimeWorkspaceID)
composioMCPEnabled := h.composioMCPAppsEnabled(r.Context())
if composioMCPEnabled {
resp.ConnectedApps = parseRuntimeConnectedAppsForClaim(task.RuntimeConnectedApps, task.ID)
}
if agent, err := h.Queries.GetAgent(r.Context(), task.AgentID); err == nil {
useSkillRefs := requestHasDaemonCapability(r, protocol.DaemonCapabilitySkillBundlesV1)
var customEnv map[string]string
if agent.CustomEnv != nil {
if err := json.Unmarshal(agent.CustomEnv, &customEnv); err != nil {
slog.Warn("failed to unmarshal agent custom_env", "agent_id", uuidToString(agent.ID), "error", err)
}
}
var customArgs []string
if agent.CustomArgs != nil {
if err := json.Unmarshal(agent.CustomArgs, &customArgs); err != nil {
slog.Warn("failed to unmarshal agent custom_args", "agent_id", uuidToString(agent.ID), "error", err)
}
}
var mcpConfig json.RawMessage
if agent.McpConfig != nil {
mcpConfig = json.RawMessage(agent.McpConfig)
}
// Layer the per-task overlay (set at enqueue from the initiator
// user's active integrations — currently Composio) on top of the
// agent's saved mcp_config. Overlay wins on server-name collisions
// because it carries the live user-scoped session URL. Errors are
// logged but never fail the claim: a broken overlay must not prevent
// the agent from running with its base config.
if composioMCPEnabled && len(task.RuntimeMcpOverlay) > 0 {
if merged, err := mergeMCPOverlay(mcpConfig, json.RawMessage(task.RuntimeMcpOverlay)); err != nil {
slog.Warn("daemon claim: merge runtime_mcp_overlay failed; falling back to agent mcp_config", "task_id", uuidToString(task.ID), "error", err)
} else {
mcpConfig = merged
}
}
// runtime_config is stored as JSONB and may legitimately be the
// empty object `{}` for agents that haven't opted into any
// provider-specific tuning. Forward only non-empty payloads so the
// daemon's per-provider decoders treat absent-or-empty identically.
var runtimeConfig json.RawMessage
if rc := bytes.TrimSpace(agent.RuntimeConfig); len(rc) > 0 && !bytes.Equal(rc, []byte("{}")) && !bytes.Equal(rc, []byte("null")) {
runtimeConfig = json.RawMessage(agent.RuntimeConfig)
}
resp.Agent = &TaskAgentData{
ID: uuidToString(agent.ID),
Name: agent.Name,
Instructions: agent.Instructions,
CustomEnv: customEnv,
CustomArgs: customArgs,
McpConfig: mcpConfig,
Model: agent.Model.String,
ThinkingLevel: agent.ThinkingLevel.String,
RuntimeConfig: runtimeConfig,
}
if useSkillRefs {
_, skillRefs := h.TaskService.LoadAgentSkillBundles(r.Context(), task.AgentID)
agentSkillCount = len(skillRefs)
resp.Agent.SkillRefs = skillRefs
} else {
skills := h.TaskService.LoadAgentSkills(r.Context(), task.AgentID)
agentSkillCount = len(skills)
builtinSkills := h.TaskService.BuiltinSkills()
builtinSkillCount = len(builtinSkills)
skills = append(skills, builtinSkills...)
resp.Agent.Skills = skills
}
}
// Resolve the runtime owner's profile description so the daemon can
// inject "## Requesting User" into the brief. Empty fields short-circuit
// the heading entirely on the daemon side; cloud / system runtimes with
// no owner stay anonymous. Failure here must not block claim — the agent
// can still run without the user-context section.
if runtime.OwnerID.Valid {
if owner, err := h.Queries.GetUser(r.Context(), runtime.OwnerID); err == nil {
resp.RequestingUserName = owner.Name
resp.RequestingUserProfileDescription = owner.ProfileDescription
} else {
slog.Debug("failed to load runtime owner for brief injection",
"runtime_id", runtimeID,
"owner_id", uuidToString(runtime.OwnerID),
"error", err,
)
}
}
// Stored task initiator: chat tasks persist the real message sender at
// enqueue time (web: request user; Lark: inbound sender — NOT the chat
// session creator, which for Lark groups is the installer). When set, it is
// the authoritative initiator for this run; resolve the live name/email so
// the daemon can render `## Task Initiator`. Comment-triggered tasks instead
// resolve their initiator from the triggering comment's author below; the
// two paths are mutually exclusive (a task is either chat or issue-bound).
// See MUL-2645.
if task.InitiatorUserID.Valid {
resp.InitiatorType = "member"
resp.InitiatorID = uuidToString(task.InitiatorUserID)
if u, err := h.Queries.GetUser(r.Context(), task.InitiatorUserID); err == nil {
resp.InitiatorName = u.Name
resp.InitiatorEmail = u.Email
}
}
// Include workspace ID and repos so the daemon can set up worktrees.
//
// Repo precedence: project-bound github_repo resources override workspace
// repos when present. Mixing both would just confuse the agent — if a
// project explicitly attached its repos, those are the authoritative set
// for issues inside that project. When the project has no github_repo
// resources (or no project at all), we fall back to the workspace repos.
if task.IssueID.Valid {
if issue, err := h.Queries.GetIssue(r.Context(), task.IssueID); err == nil {
resp.WorkspaceID = uuidToString(issue.WorkspaceID)
resp.ThreadName = issue.Title
// Squad-leader briefing injection: keyed off the task being a
// leader-task (is_leader_task) carrying a squad_id — NOT off the
// issue being assigned to a squad. The task flag is stamped at
// enqueue time and is true for every ISSUE-BOUND path that routes
// work to a squad leader: direct assign-to-squad, comment
// @squad-mention (even when the issue itself is assigned to a
// plain agent — the MUL-3724 case), sub-issue done callback,
// autopilot squad-assignee, and retry-clone inheritance. The old
// issue.AssigneeType=="squad" gate missed the comment-mention
// path, so the leader booted with zero squad context and
// degraded into doing the work itself instead of orchestrating.
//
// NOTE: quick-create tasks do NOT reach this block — they have a
// NULL issue_id (so the enclosing `task.IssueID.Valid` is false)
// and do NOT carry is_leader_task / squad_id columns. They route
// their squad through the task CONTEXT JSON (QuickCreateContext.
// SquadID) and get their briefing from the separate quick-create
// branch further below (search `qc.SquadID`). Do not "unify" the
// two by deleting that branch: it also sets resp.SquadID /
// resp.SquadName so the new issue defaults to the squad assignee,
// and there is no issue row to hang this column-based path on.
//
// We resolve the squad directly from task.SquadID rather than
// reverse-looking-up "which squad is this agent the leader of",
// which is ambiguous when one agent leads multiple squads. The
// uuidToString(squad.LeaderID) == resp.Agent.ID re-check is kept
// as a defensive gate: if the squad's leader was swapped after the
// task was enqueued, we never feed a stale briefing to a
// non-leader. It also doubles as the dangling-squad_id guard: a
// squad hard-deleted after enqueue makes GetSquadInWorkspace
// return no row (err != nil) — we skip injection silently, which
// is exactly the same observable result as "condition not
// matched". Claim still succeeds; no stale briefing is emitted.
// (No FK on squad_id — see migration 127.) We append (not replace)
// so per-agent instructions stay authoritative; the squad briefing
// stacks on top as task-specific squad context.
if resp.Agent != nil && task.IsLeaderTask && task.SquadID.Valid {
if squad, err := h.Queries.GetSquadInWorkspace(r.Context(), db.GetSquadInWorkspaceParams{
ID: task.SquadID,
WorkspaceID: issue.WorkspaceID,
}); err == nil && uuidToString(squad.LeaderID) == resp.Agent.ID {
briefing := buildSquadLeaderBriefing(r.Context(), h.Queries, squad)
if strings.TrimSpace(resp.Agent.Instructions) == "" {
resp.Agent.Instructions = briefing
} else {
resp.Agent.Instructions = resp.Agent.Instructions + "\n\n" + briefing
}
slog.Debug("injected squad leader briefing",
"squad_id", uuidToString(squad.ID),
"squad_name", squad.Name,
"leader_agent_id", resp.Agent.ID,
)
}
}
var projectRepos []RepoData
if issue.ProjectID.Valid {
resp.ProjectID = uuidToString(issue.ProjectID)
if proj, err := h.Queries.GetProject(r.Context(), issue.ProjectID); err == nil {
resp.ProjectTitle = proj.Title
resp.ProjectDescription = proj.Description.String
}
if rows := h.listProjectResourcesForProject(r.Context(), issue.ProjectID); len(rows) > 0 {
out := make([]ProjectResourceData, 0, len(rows))
for _, row := range rows {
label := ""
if row.Label.Valid {
label = row.Label.String
}
ref := json.RawMessage(row.ResourceRef)
if len(ref) == 0 {
ref = json.RawMessage("{}")
}
out = append(out, ProjectResourceData{
ID: uuidToString(row.ID),
ResourceType: row.ResourceType,
ResourceRef: ref,
Label: label,
})
// Lift github_repo resources into the daemon's repo list
// so `multica repo checkout` and the meta-skill render
// them as the issue's repos.
if row.ResourceType == "github_repo" {
var payload struct {
URL string `json:"url"`
Ref string `json:"ref,omitempty"`
}
if json.Unmarshal(row.ResourceRef, &payload) == nil && payload.URL != "" {
projectRepos = append(projectRepos, RepoData{URL: payload.URL, Ref: strings.TrimSpace(payload.Ref)})
}
}
}
resp.ProjectResources = out
}
}
if len(projectRepos) > 0 {
resp.Repos = projectRepos
} else if ws, err := h.Queries.GetWorkspace(r.Context(), issue.WorkspaceID); err == nil && ws.Repos != nil {
var repos []RepoData
if json.Unmarshal(ws.Repos, &repos) == nil && len(repos) > 0 {
resp.Repos = repos
}
}
}
// Fetch the triggering comment content so the daemon can embed it
// directly in the agent prompt (prevents the agent from ignoring comments
// when stale output files exist in a reused workdir). Also surface the
// comment author's kind and display name so the agent knows whether it
// was triggered by a human or by another agent — a signal used by the
// harness instructions to avoid mention loops between agents.
if task.TriggerCommentID.Valid {
if comment, err := h.Queries.GetComment(r.Context(), task.TriggerCommentID); err == nil {
resp.TriggerCommentContent = comment.Content
resp.TriggerThreadID = uuidToString(comment.ID)
if comment.ParentID.Valid {
resp.TriggerThreadID = uuidToString(comment.ParentID)
}
resp.TriggerAuthorType = comment.AuthorType
// The triggering comment's author is the task initiator — the
// real requester behind this run. Surface it (type + id + name,
// plus email for members) so a workspace-visible agent can
// attribute the request to the right person instead of to the
// runtime owner. Same lookups as the display name above; we just
// also capture the id and email. See MUL-2645.
resp.InitiatorType = comment.AuthorType
if comment.AuthorID.Valid {
resp.InitiatorID = uuidToString(comment.AuthorID)
}
switch comment.AuthorType {
case "agent":
if comment.AuthorID.Valid {
if a, err := h.Queries.GetAgent(r.Context(), comment.AuthorID); err == nil {
resp.TriggerAuthorName = a.Name
resp.InitiatorName = a.Name
}
}
case "member":
// For member-authored comments, AuthorID is a user UUID
// (see handler.resolveActor) — look up the user's display name.
if comment.AuthorID.Valid {
if u, err := h.Queries.GetUser(r.Context(), comment.AuthorID); err == nil {
resp.TriggerAuthorName = u.Name
resp.InitiatorName = u.Name
resp.InitiatorEmail = u.Email
}
}
}
// Count comments that arrived issue-wide since this agent's last
// run, so the daemon can tell it the full catch-up volume up front
// (the prompt then steers it to read the triggering thread first).
// Anchor = the prior task's started_at (never completed_at: a long
// run would miss comments posted while it ran). Cold start (no prior
// task) → no anchor → no hint. Excludes the agent's own comments and
// the triggering comment itself because that body is already
// injected into the prompt. Best-effort: any DB error or zero count
// leaves the hint suppressed.
if startedAt, err := h.Queries.GetLastTaskStartedAtForIssueAndAgent(r.Context(), db.GetLastTaskStartedAtForIssueAndAgentParams{
AgentID: task.AgentID,
IssueID: comment.IssueID,
}); err == nil && startedAt.Valid {
if cnt, err := h.Queries.CountNewCommentsSince(r.Context(), db.CountNewCommentsSinceParams{
AnchorID: task.TriggerCommentID,
IssueID: comment.IssueID,
WorkspaceID: comment.WorkspaceID,
Since: startedAt,
AuthorID: task.AgentID,
}); err == nil && cnt > 0 {
resp.NewCommentCount = int(cnt)
resp.NewCommentsSince = startedAt.Time.UTC().Format(time.RFC3339)
}
}
}
}
// Look up the prior session for this (agent, issue) pair so the daemon
// can resume the Claude Code conversation context.
//
// Skip all prior state when the task was flagged as a manual rerun:
// the user just judged the prior output bad, so the daemon must start a
// fresh agent session in a fresh workdir instead of resuming anything
// from the same conversation that produced that output.
if !task.ForceFreshSession {
if prior, err := h.Queries.GetLastTaskSession(r.Context(), db.GetLastTaskSessionParams{
AgentID: task.AgentID,
IssueID: task.IssueID,
}); err == nil && prior.SessionID.Valid {
// Resume the prior session when it ran on the same runtime —
// including comment-triggered follow-ups, so the agent keeps the
// issue's conversation context across turns. The "Focus on THIS
// comment" guard in prompt.go defends against inheriting the prior
// turn's "Done." marker, and GetLastTaskSession already excludes
// poisoned sessions.
if prior.RuntimeID == task.RuntimeID {
resp.PriorSessionID = prior.SessionID.String
}
if prior.WorkDir.Valid {
resp.PriorWorkDir = prior.WorkDir.String
}
}
}
}
// Chat task: populate workspace/session info from the chat_session table.
if task.ChatSessionID.Valid {
if cs, err := h.Queries.GetChatSession(r.Context(), task.ChatSessionID); err == nil {
resp.WorkspaceID = uuidToString(cs.WorkspaceID)
resp.ChatSessionID = uuidToString(cs.ID)
resp.ThreadName = cs.Title
// Flag a channel-backed session so the daemon makes the agent aware
// it is operating inside Slack — read this conversation's history
// from the channel via `multica chat history` / `multica chat thread`,
// not from Multica (MUL-3871). Empty for a web-only chat session.
// ChatInThread tells the agent which command to start with: the
// latest trigger was a thread reply iff its reply-target thread
// (last_thread_id) differs from its own message id (a top-level
// @mention records its own ts as both).
if binding, berr := h.Queries.GetChannelChatSessionBindingBySession(r.Context(), db.GetChannelChatSessionBindingBySessionParams{
ChatSessionID: cs.ID,
ChannelType: string(slack.TypeSlack),
}); berr == nil {
resp.ChatChannelType = string(slack.TypeSlack)
resp.ChatInThread = binding.LastThreadID.Valid && binding.LastThreadID.String != "" &&
binding.LastThreadID.String != binding.LastMessageID.String
}
if ws, err := h.Queries.GetWorkspace(r.Context(), cs.WorkspaceID); err == nil && ws.Repos != nil {
var repos []RepoData
if json.Unmarshal(ws.Repos, &repos) == nil && len(repos) > 0 {
resp.Repos = repos
}
}
if !task.ForceFreshSession {
// Resume chat sessions only when the stored pointer was produced
// by the same runtime as the claiming task. When the chat_session
// pointer is missing (legacy NULL runtime_id), stale (last task
// failed before reporting completion), or runtime-mismatched, fall
// back to the most recent task row that recorded a session_id —
// otherwise a single failed turn would silently drop the entire
// conversation memory on the next message. The fallback also
// requires runtime to match.
if cs.SessionID.Valid && cs.RuntimeID.Valid && cs.RuntimeID == task.RuntimeID {
resp.PriorSessionID = cs.SessionID.String
}
if cs.WorkDir.Valid {
resp.PriorWorkDir = cs.WorkDir.String
}
if prior, err := h.Queries.GetLastChatTaskSession(r.Context(), cs.ID); err == nil && prior.SessionID.Valid {
if resp.PriorSessionID == "" && prior.RuntimeID == task.RuntimeID {
resp.PriorSessionID = prior.SessionID.String
}
if prior.WorkDir.Valid && resp.PriorWorkDir == "" {
resp.PriorWorkDir = prior.WorkDir.String
}
}
}
// Build the chat prompt from EVERY user message that has arrived
// since the agent's last reply — not just the most recent one. A
// short-window debounce (MUL-2968) can land several user messages
// before a single run fires; the agent resumes its prior session
// and only learns of new input through resp.ChatMessage, so
// delivering just the latest message would silently drop the
// earlier ones (e.g. "看上海天气" then "还有青岛" → only Qingdao
// answered). The unanswered set is the trailing run of user
// messages after the last assistant message (every completed or
// failed run writes an assistant row, so that anchor advances each
// turn). Attachments are collected from each included message so
// the agent can `multica attachment download <id>` — the markdown
// URL alone is signed and 30-min expiring on the private CDN.
if msgs, err := h.Queries.ListChatMessages(r.Context(), cs.ID); err == nil && len(msgs) > 0 {
unanswered := trailingUserMessages(msgs)
parts := make([]string, 0, len(unanswered))
for _, m := range unanswered {
if strings.TrimSpace(m.Content) != "" {
parts = append(parts, m.Content)
}
if atts, attErr := h.Queries.ListAttachmentsByChatMessage(r.Context(), db.ListAttachmentsByChatMessageParams{
ChatMessageID: m.ID,
WorkspaceID: parseUUID(resp.WorkspaceID),
}); attErr == nil && len(atts) > 0 {
for _, a := range atts {
resp.ChatMessageAttachments = append(resp.ChatMessageAttachments, ChatAttachmentMeta{
ID: uuidToString(a.ID),
Filename: a.Filename,
ContentType: a.ContentType,
})
}
}
}
resp.ChatMessage = strings.Join(parts, "\n\n")
if strings.TrimSpace(resp.ThreadName) == "" {
resp.ThreadName = resp.ChatMessage
}
}
}
}
// Autopilot run_only task: resolve workspace from autopilot_run →
// autopilot, and include the autopilot instructions because there is no
// issue for the agent to fetch.
if task.AutopilotRunID.Valid {
if run, err := h.Queries.GetAutopilotRun(r.Context(), task.AutopilotRunID); err == nil {
resp.AutopilotID = uuidToString(run.AutopilotID)
resp.AutopilotSource = run.Source
if run.TriggerPayload != nil {
resp.AutopilotTriggerPayload = json.RawMessage(run.TriggerPayload)
}
if ap, err := h.Queries.GetAutopilot(r.Context(), run.AutopilotID); err == nil {
resp.AutopilotTitle = ap.Title
resp.ThreadName = ap.Title
if ap.Description.Valid {
resp.AutopilotDescription = ap.Description.String
}
if resp.WorkspaceID == "" {
resp.WorkspaceID = uuidToString(ap.WorkspaceID)
}
if len(resp.Repos) == 0 {
if ws, err := h.Queries.GetWorkspace(r.Context(), ap.WorkspaceID); err == nil && ws.Repos != nil {
var repos []RepoData
if json.Unmarshal(ws.Repos, &repos) == nil && len(repos) > 0 {
resp.Repos = repos
}
}
}
}
}
}
// Handoff note (MUL-3375) is populated by taskToResponse (the shared mapper
// resp came from above), so the daemon's prompt + issue_context.md render the
// assignment-handoff branch. Empty for all other task kinds.
// Quick-create task: no issue / chat / autopilot link — workspace and
// prompt come from the task's context JSONB. Resolve workspace from
// there so the isolation check below has something to compare.
hasQuickCreate := false
if task.Context != nil && !task.IssueID.Valid && !task.ChatSessionID.Valid && !task.AutopilotRunID.Valid {
var qc service.QuickCreateContext
if json.Unmarshal(task.Context, &qc) == nil && qc.Type == service.QuickCreateContextType {
hasQuickCreate = true
resp.QuickCreatePrompt = qc.Prompt
resp.QuickCreateAttachmentIDs = append([]string(nil), qc.AttachmentIDs...)
resp.ThreadName = qc.Prompt
resp.WorkspaceID = qc.WorkspaceID
// When the user picked a project in the modal, surface its title
// and resources to the daemon so the agent has the same context
// it would for an issue-bound task: the prompt template can name
// the project, and `multica repo checkout` sees the project's
// github_repo resources instead of the workspace fallback.
var projectRepos []RepoData
if qc.ProjectID != "" {
projectUUID, err := util.ParseUUID(qc.ProjectID)
if err == nil {
resp.ProjectID = qc.ProjectID
if proj, err := h.Queries.GetProject(r.Context(), projectUUID); err == nil {
resp.ProjectTitle = proj.Title
resp.ProjectDescription = proj.Description.String
}
if rows := h.listProjectResourcesForProject(r.Context(), projectUUID); len(rows) > 0 {
out := make([]ProjectResourceData, 0, len(rows))
for _, row := range rows {
label := ""
if row.Label.Valid {
label = row.Label.String
}
ref := json.RawMessage(row.ResourceRef)
if len(ref) == 0 {
ref = json.RawMessage("{}")
}
out = append(out, ProjectResourceData{
ID: uuidToString(row.ID),
ResourceType: row.ResourceType,
ResourceRef: ref,
Label: label,
})
if row.ResourceType == "github_repo" {
var payload struct {
URL string `json:"url"`
Ref string `json:"ref,omitempty"`
}
if json.Unmarshal(row.ResourceRef, &payload) == nil && payload.URL != "" {
projectRepos = append(projectRepos, RepoData{URL: payload.URL, Ref: strings.TrimSpace(payload.Ref)})
}
}
}
resp.ProjectResources = out
}
}
}
if len(projectRepos) > 0 {
resp.Repos = projectRepos
} else if ws, err := h.Queries.GetWorkspace(r.Context(), parseUUID(qc.WorkspaceID)); err == nil && ws.Repos != nil {
var repos []RepoData
if json.Unmarshal(ws.Repos, &repos) == nil && len(repos) > 0 {
resp.Repos = repos
}
}
// Parent-issue resolution for quick-create tasks opened from
// "Add sub issue". The handler already verified workspace
// membership at submit time; here we re-fetch to pull the
// human-readable identifier (e.g. MUL-123) the agent will
// reference in the prompt. If the parent was deleted between
// submit and claim we surface the UUID anyway — the agent
// still passes `--parent <uuid>` and the server-side create
// will fail loud, which is a better outcome than silently
// dropping the sub-issue intent.
if qc.ParentIssueID != "" {
resp.ParentIssueID = qc.ParentIssueID
if parentUUID, err := util.ParseUUID(qc.ParentIssueID); err == nil {
if wsUUID, wsErr := util.ParseUUID(qc.WorkspaceID); wsErr == nil {
parent, perr := h.Queries.GetIssueInWorkspace(r.Context(), db.GetIssueInWorkspaceParams{
ID: parentUUID,
WorkspaceID: wsUUID,
})
if perr == nil && parent.ID.Valid {
if ws, werr := h.Queries.GetWorkspace(r.Context(), wsUUID); werr == nil {
resp.ParentIssueIdentifier = ws.IssuePrefix + "-" + strconv.Itoa(int(parent.Number))
}
}
}
}
}
// Squad-leader briefing injection for quick-create tasks. When
// the user picked a squad in the modal, the task runs on the
// squad's leader agent (resolved by the handler). Surface the
// same Operating Protocol + Roster + user Instructions that
// issue-bound squad tasks see, so the leader can decide to
// delegate before opening the issue.
if resp.Agent != nil && qc.SquadID != "" {
wsUUID, wsErr := util.ParseUUID(qc.WorkspaceID)
squadUUID, sqErr := util.ParseUUID(qc.SquadID)
if wsErr == nil && sqErr == nil {
if squad, err := h.Queries.GetSquadInWorkspace(r.Context(), db.GetSquadInWorkspaceParams{
ID: squadUUID,
WorkspaceID: wsUUID,
}); err == nil && uuidToString(squad.LeaderID) == resp.Agent.ID {
briefing := buildSquadLeaderBriefing(r.Context(), h.Queries, squad)
if strings.TrimSpace(resp.Agent.Instructions) == "" {
resp.Agent.Instructions = briefing
} else {
resp.Agent.Instructions = resp.Agent.Instructions + "\n\n" + briefing
}
// Surface the squad identity to the daemon so the
// quick-create prompt defaults the new issue's
// assignee to the squad, not the leader agent.
resp.SquadID = uuidToString(squad.ID)
resp.SquadName = squad.Name
slog.Debug("injected squad leader briefing for quick-create",
"squad_id", uuidToString(squad.ID),
"squad_name", squad.Name,
"leader_agent_id", resp.Agent.ID,
)
}
}
}
}
}
// Workspace isolation check: the daemon uses this response's workspace_id
// as the only authority for MULTICA_WORKSPACE_ID in the agent env. An
// empty value would make the CLI silently fall back to the user-global
// config and talk to whatever workspace the user happened to last
// configure; a value that doesn't match the runtime's workspace means
// upstream routed a foreign-workspace task here. Both cases must hard-
// fail AND cancel the just-dispatched task so the queue / agent status
// don't sit stuck until the stale-task sweeper fires minutes later.
if resp.WorkspaceID == "" || resp.WorkspaceID != runtimeWorkspaceID {
outcome = "error_workspace"
slog.Error("task claim: workspace isolation check failed, cancelling task",
"task_id", uuidToString(task.ID),
"runtime_id", runtimeID,
"runtime_workspace", runtimeWorkspaceID,
"resolved_workspace", resp.WorkspaceID,
"has_issue", task.IssueID.Valid,
"has_chat", task.ChatSessionID.Valid,
"has_autopilot_run", task.AutopilotRunID.Valid,
"has_quick_create", hasQuickCreate,
)
if _, cerr := h.TaskService.CancelTask(r.Context(), task.ID); cerr != nil {
slog.Error("task claim: cancel after workspace check failed",
"task_id", uuidToString(task.ID), "error", cerr)
}
writeError(w, http.StatusInternalServerError, "task workspace isolation check failed")
return
}
// Workspace-level Context (workspace.context DB column) — the per-workspace
// system prompt that workspace owners set in Settings → General. Inject it
// into the brief regardless of task kind (issue / chat / autopilot /
// quick-create) so every agent running in the workspace sees the same
// shared context. Empty string when the owner hasn't set one; the daemon
// skips rendering the heading in that case.
if ws, err := h.Queries.GetWorkspace(r.Context(), parseUUID(resp.WorkspaceID)); err == nil {
if ws.Context.Valid {
resp.WorkspaceContext = ws.Context.String
}
} else {
slog.Warn("task claim: failed to load workspace for context injection",
"task_id", uuidToString(task.ID),
"workspace_id", resp.WorkspaceID,
"error", err,
)
}
// Mint a task-scoped `mat_` token bound to (agent, task, workspace,
// owner). The daemon will inject this as MULTICA_TOKEN into the agent
// process instead of its own credential, so any API call the agent
// makes — even one that strips X-Agent-ID / X-Task-ID headers — is
// recognized server-side as actor=agent, closing the lateral-movement
// path on owner-only endpoints (e.g. `/api/agents/{id}/env`). Runtime
// owner is required because task tokens are still bound to an owning user;
// without one, fail the claim explicitly instead of letting the daemon
// fall back to a member/owner credential. MUL-3292.
// Token expires after the queue/runtime upper bound (24h) so it survives
// long-running tasks but cannot outlive a forgotten one.
if !runtime.OwnerID.Valid {
outcome = "error_token"
slog.Error("task claim: runtime owner missing; cancelling task to avoid unscoped agent credentials",
"task_id", uuidToString(task.ID),
"runtime_id", runtimeID,
"workspace_id", runtimeWorkspaceID,
)
if _, cerr := h.TaskService.CancelTask(r.Context(), task.ID); cerr != nil {
slog.Error("task claim: cancel after missing runtime owner failed",
"task_id", uuidToString(task.ID), "error", cerr)
}
writeError(w, http.StatusInternalServerError, "runtime owner required to mint task token")
return
}
tokenStr, terr := auth.GenerateAgentTaskToken()
if terr != nil {
outcome = "error_token"
slog.Error("task claim: failed to generate agent task token",
"task_id", uuidToString(task.ID), "error", terr)
writeError(w, http.StatusInternalServerError, "failed to mint task token")
return
}
if _, terr := h.Queries.CreateTaskToken(r.Context(), db.CreateTaskTokenParams{
TokenHash: auth.HashToken(tokenStr),
TaskID: task.ID,
AgentID: task.AgentID,
WorkspaceID: parseUUID(resp.WorkspaceID),
UserID: runtime.OwnerID,
ExpiresAt: pgtype.Timestamptz{Time: time.Now().Add(24 * time.Hour), Valid: true},
}); terr != nil {
outcome = "error_token"
slog.Error("task claim: failed to persist agent task token",
"task_id", uuidToString(task.ID), "error", terr)
writeError(w, http.StatusInternalServerError, "failed to persist task token")
return
}
resp.AuthToken = tokenStr
slog.Info("task claimed by runtime", "task_id", uuidToString(task.ID), "runtime_id", runtimeID, "agent_id", uuidToString(task.AgentID), "prior_session", resp.PriorSessionID)
if resp.Agent != nil && len(resp.Agent.Skills) > 0 {
if skillPayload, err := json.Marshal(resp.Agent.Skills); err == nil {
skillPayloadBytes = len(skillPayload)
}
} else if resp.Agent != nil && len(resp.Agent.SkillRefs) > 0 {
if skillPayload, err := json.Marshal(resp.Agent.SkillRefs); err == nil {
skillPayloadBytes = len(skillPayload)
}
}
payloadBytes, _ = writeMeasuredJSON(w, http.StatusOK, map[string]any{"task": resp})
}
type resolveSkillBundlesRequest struct {
Skills []resolveSkillBundleRef `json:"skills"`
}
type resolveSkillBundleRef struct {
ID string `json:"id"`
Source string `json:"source"`
Hash string `json:"hash"`
}
// ResolveTaskSkillBundles returns full skill content for refs from a slim
// claim. The daemon calls this after claim and before execenv.Prepare so
// runtimes still see complete local skill files at startup.
//
// If a requested hash no longer matches the agent's current skill bundle, the
// endpoint returns the current bundle and hash. Stage 1 does not snapshot skill
// content at claim time; the daemon validates the returned bundle before
// writing it to cache and materializing it.
func (h *Handler) ResolveTaskSkillBundles(w http.ResponseWriter, r *http.Request) {
runtimeID := chi.URLParam(r, "runtimeId")
taskID := chi.URLParam(r, "taskId")
runtime, ok := h.requireDaemonRuntimeAccess(w, r, runtimeID)
if !ok {
return
}
task, taskWorkspaceID, ok := h.requireDaemonTaskAccessWithWorkspace(w, r, taskID)
if !ok {
return
}
if taskWorkspaceID != uuidToString(runtime.WorkspaceID) || uuidToString(task.RuntimeID) != runtimeID {
writeError(w, http.StatusNotFound, "task not found")
return
}
if task.Status != "dispatched" && task.Status != "waiting_local_directory" {
writeError(w, http.StatusConflict, "task is not preparing")
return
}
var req resolveSkillBundlesRequest
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
writeError(w, http.StatusBadRequest, "invalid request body")
return
}
if len(req.Skills) == 0 {
writeJSON(w, http.StatusOK, map[string]any{"bundles": []service.AgentSkillData{}})
return
}
bundles, _ := h.TaskService.LoadAgentSkillBundles(r.Context(), task.AgentID)
allowed := make(map[string]service.AgentSkillData, len(bundles))
for _, bundle := range bundles {
allowed[bundle.Source+"\x00"+bundle.ID] = bundle
}
resolved := make([]service.AgentSkillData, 0, len(req.Skills))
for _, ref := range req.Skills {
if ref.ID == "" || ref.Source == "" || ref.Hash == "" {
writeError(w, http.StatusBadRequest, "invalid skill ref")
return
}
bundle, ok := allowed[ref.Source+"\x00"+ref.ID]
if !ok {
writeError(w, http.StatusNotFound, "skill bundle not found")
return
}
resolved = append(resolved, bundle)
}
writeJSON(w, http.StatusOK, map[string]any{"bundles": resolved})
}
// trailingUserMessages returns the run of user messages after the last
// assistant message in a chronologically-ordered chat history — the set the
// agent has NOT yet replied to. The agent resumes its prior session and only
// learns of new input through the claim response's chat_message, so a single
// run that covers a debounced burst (MUL-2968) must deliver every one of
// these, not just the latest. Every completed or failed run writes an
// assistant row, so the anchor advances one turn at a time; the result is the
// whole slice on the first turn and exactly the new message(s) thereafter.
func trailingUserMessages(msgs []db.ChatMessage) []db.ChatMessage {
start := 0
for i := len(msgs) - 1; i >= 0; i-- {
if msgs[i].Role != "user" {
start = i + 1
break
}
}
return msgs[start:]
}
// ListPendingTasksByRuntime returns queued/dispatched tasks for a runtime.
func (h *Handler) ListPendingTasksByRuntime(w http.ResponseWriter, r *http.Request) {
runtimeID := chi.URLParam(r, "runtimeId")
// Verify the caller owns this runtime's workspace.
runtime, ok := h.requireDaemonRuntimeAccess(w, r, runtimeID)
if !ok {
return
}
workspaceID := uuidToString(runtime.WorkspaceID)
tasks, err := h.Queries.ListPendingTasksByRuntime(r.Context(), parseUUID(runtimeID))
if err != nil {
writeError(w, http.StatusInternalServerError, "failed to list pending tasks")
return
}
resp := make([]AgentTaskResponse, len(tasks))
for i, t := range tasks {
resp[i] = taskToResponse(t, workspaceID)
}
writeJSON(w, http.StatusOK, resp)
}
// ---------------------------------------------------------------------------
// Task Lifecycle (called by daemon)
// ---------------------------------------------------------------------------
// ExtendTaskPrepareLease keeps a dispatched task protected while the daemon is
// resolving startup inputs and preparing the execution environment.
func (h *Handler) ExtendTaskPrepareLease(w http.ResponseWriter, r *http.Request) {
runtimeID := chi.URLParam(r, "runtimeId")
taskID := chi.URLParam(r, "taskId")
runtime, ok := h.requireDaemonRuntimeAccess(w, r, runtimeID)
if !ok {
return
}
task, taskWorkspaceID, ok := h.requireDaemonTaskAccessWithWorkspace(w, r, taskID)
if !ok {
return
}
if taskWorkspaceID != uuidToString(runtime.WorkspaceID) || uuidToString(task.RuntimeID) != runtimeID {
writeError(w, http.StatusNotFound, "task not found")
return
}
updated, err := h.TaskService.ExtendTaskPrepareLease(r.Context(), parseUUID(taskID), parseUUID(runtimeID))
if err != nil {
slog.Warn("extend task prepare lease failed", "task_id", taskID, "runtime_id", runtimeID, "error", err)
writeError(w, http.StatusBadRequest, err.Error())
return
}
writeJSON(w, http.StatusOK, taskToResponse(*updated, taskWorkspaceID))
}
// StartTask marks a dispatched task as running.
func (h *Handler) StartTask(w http.ResponseWriter, r *http.Request) {
taskID := chi.URLParam(r, "taskId")
// Verify the caller owns this task's workspace.
_, workspaceID, ok := h.requireDaemonTaskAccessWithWorkspace(w, r, taskID)
if !ok {
return
}
task, err := h.TaskService.StartTask(r.Context(), parseUUID(taskID))
if err != nil {
slog.Warn("start task failed", "task_id", taskID, "error", err)
writeError(w, http.StatusBadRequest, err.Error())
return
}
slog.Info("task started", "task_id", taskID, "agent_id", uuidToString(task.AgentID))
writeJSON(w, http.StatusOK, taskToResponse(*task, workspaceID))
}
// TaskWaitLocalDirectoryRequest is the body the daemon POSTs when it parks
// a freshly-dispatched task on a busy local_directory path.
type TaskWaitLocalDirectoryRequest struct {
// Reason is a short hint surfaced by the UI alongside the status —
// typically "<path>" or "<path> (holder: <task short id>)". Small
// enough to fit on the issue card. Empty is accepted; the column is
// nullable on the server.
Reason string `json:"reason"`
}
// MarkTaskWaitingLocalDirectory transitions a dispatched task to
// waiting_local_directory. Called by the daemon when, after claiming a task
// whose project carries a local_directory resource, it discovers another
// in-flight task already holds the path's mutex.
func (h *Handler) MarkTaskWaitingLocalDirectory(w http.ResponseWriter, r *http.Request) {
taskID := chi.URLParam(r, "taskId")
_, workspaceID, ok := h.requireDaemonTaskAccessWithWorkspace(w, r, taskID)
if !ok {
return
}
var req TaskWaitLocalDirectoryRequest
if r.ContentLength != 0 {
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
writeError(w, http.StatusBadRequest, "invalid request body")
return
}
}
task, err := h.TaskService.MarkTaskWaitingLocalDirectory(r.Context(), parseUUID(taskID), req.Reason)
if err != nil {
slog.Warn("mark task waiting_local_directory failed", "task_id", taskID, "error", err)
writeError(w, http.StatusBadRequest, err.Error())
return
}
writeJSON(w, http.StatusOK, taskToResponse(*task, workspaceID))
}
// ReportTaskProgress broadcasts a progress update.
type TaskProgressRequest struct {
Summary string `json:"summary"`
Step int `json:"step"`
Total int `json:"total"`
}
func (h *Handler) ReportTaskProgress(w http.ResponseWriter, r *http.Request) {
taskID := chi.URLParam(r, "taskId")
var req TaskProgressRequest
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
writeError(w, http.StatusBadRequest, "invalid request body")
return
}
// Verify ownership and resolve workspace ID.
task, ok := h.requireDaemonTaskAccess(w, r, taskID)
if !ok {
return
}
workspaceID := ""
if task.IssueID.Valid {
if issue, err := h.Queries.GetIssue(r.Context(), task.IssueID); err == nil {
workspaceID = uuidToString(issue.WorkspaceID)
}
}
h.TaskService.ReportProgress(r.Context(), taskID, workspaceID, req.Summary, req.Step, req.Total)
writeJSON(w, http.StatusOK, map[string]string{"status": "ok"})
}
// CompleteTask marks a running task as completed.
type TaskCompleteRequest struct {
PRURL string `json:"pr_url"`
Output string `json:"output"`
SessionID string `json:"session_id"` // Claude session ID for future resumption
WorkDir string `json:"work_dir"` // working directory used during execution
}
func (h *Handler) CompleteTask(w http.ResponseWriter, r *http.Request) {
taskID := chi.URLParam(r, "taskId")
// Verify the caller owns this task's workspace.
_, workspaceID, ok := h.requireDaemonTaskAccessWithWorkspace(w, r, taskID)
if !ok {
return
}
var req TaskCompleteRequest
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
writeError(w, http.StatusBadRequest, "invalid request body")
return
}
result, _ := json.Marshal(req)
task, err := h.TaskService.CompleteTask(r.Context(), parseUUID(taskID), result, req.SessionID, req.WorkDir)
if err != nil {
slog.Warn("complete task failed", "task_id", taskID, "error", err)
writeError(w, http.StatusBadRequest, err.Error())
return
}
h.emitIssueExecutedOnFirstCompletion(r, task)
// Best-effort revoke of any agent task token minted at claim time.
// The token would naturally expire at the 24h watermark and is also
// cascaded on agent_task deletion, but eagerly deleting it on
// completion shrinks the window where a compromised agent process
// can keep making API calls after its task finishes. Failure here is
// non-fatal; the expiry / cascade are the durable guards.
if err := h.Queries.DeleteTaskTokensByTask(r.Context(), task.ID); err != nil {
slog.Warn("complete task: failed to revoke task tokens", "task_id", uuidToString(task.ID), "error", err)
}
slog.Info("task completed", "task_id", taskID, "agent_id", uuidToString(task.AgentID))
writeJSON(w, http.StatusOK, taskToResponse(*task, workspaceID))
}
// emitIssueExecutedOnFirstCompletion atomically flips issue.first_executed_at
// and fires the issue_executed analytics event iff this is the first task on
// the issue to reach terminal done. Retries / re-assignments / comment-
// triggered follow-ups hit the WHERE first_executed_at IS NULL clause and
// no-op, so the funnel counts unique issues, not tasks.
func (h *Handler) emitIssueExecutedOnFirstCompletion(r *http.Request, task *db.AgentTaskQueue) {
if task == nil {
return
}
marked, err := h.Queries.MarkIssueFirstExecuted(r.Context(), task.IssueID)
if err != nil {
if !isNotFound(err) {
slog.Warn("analytics: mark issue first-executed failed", "issue_id", uuidToString(task.IssueID), "error", err)
}
return
}
var durationMS int64
if task.StartedAt.Valid && task.CompletedAt.Valid {
durationMS = task.CompletedAt.Time.Sub(task.StartedAt.Time).Milliseconds()
}
taskContext := h.TaskService.AnalyticsContextForTask(r.Context(), *task)
// distinct_id prefers the human creator so agent-driven events flow into
// the issue-author's person profile (same place signup and
// workspace_created land). Agent-created issues keep the agent id with a
// prefix so PostHog doesn't merge them into a user by accident.
distinct := uuidToString(marked.CreatorID)
if marked.CreatorType == "agent" {
distinct = "agent:" + distinct
}
obsmetrics.RecordEvent(h.Analytics, h.Metrics, analytics.IssueExecuted(
distinct,
uuidToString(marked.WorkspaceID),
uuidToString(marked.ID),
uuidToString(task.ID),
uuidToString(task.AgentID),
taskContext.Source,
taskContext.RuntimeMode,
taskContext.Provider,
durationMS,
))
}
// ReportTaskUsage stores per-task token usage. Called independently of
// complete/fail so usage is captured even when tasks fail or are blocked.
type TaskUsagePayload struct {
Provider string `json:"provider"`
Model string `json:"model"`
InputTokens int64 `json:"input_tokens"`
OutputTokens int64 `json:"output_tokens"`
CacheReadTokens int64 `json:"cache_read_tokens"`
CacheWriteTokens int64 `json:"cache_write_tokens"`
}
func (h *Handler) ReportTaskUsage(w http.ResponseWriter, r *http.Request) {
taskID := chi.URLParam(r, "taskId")
// Verify the caller owns this task's workspace.
task, ok := h.requireDaemonTaskAccess(w, r, taskID)
if !ok {
return
}
var req struct {
Usage []TaskUsagePayload `json:"usage"`
}
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
writeError(w, http.StatusBadRequest, "invalid request body")
return
}
// Provider is lowercased on write so client-side pricing lookups tolerate
// case drift. An empty provider (an older daemon that omits the field) is
// stamped from the task's runtime, so generic model ids like `auto` still
// resolve to a provider instead of landing as '' and pricing $0.
var runtimeProvider string
runtimeProviderLoaded := false
for _, u := range req.Usage {
provider := normalizeProvider(u.Provider)
if provider == "" {
if !runtimeProviderLoaded {
if rt, err := h.Queries.GetAgentRuntime(r.Context(), task.RuntimeID); err == nil {
runtimeProvider = normalizeProvider(rt.Provider)
} else {
slog.Warn("load runtime provider for usage backfill failed",
"task_id", taskID, "runtime_id", uuidToString(task.RuntimeID), "error", err)
}
runtimeProviderLoaded = true
}
provider = runtimeProvider
}
if err := h.Queries.UpsertTaskUsage(r.Context(), db.UpsertTaskUsageParams{
TaskID: parseUUID(taskID),
Provider: provider,
Model: u.Model,
InputTokens: u.InputTokens,
OutputTokens: u.OutputTokens,
CacheReadTokens: u.CacheReadTokens,
CacheWriteTokens: u.CacheWriteTokens,
}); err != nil {
slog.Warn("upsert task usage failed", "task_id", taskID, "model", u.Model, "error", err)
continue
}
h.TaskService.CaptureTaskUsage(r.Context(), task, provider, u.Model, u.InputTokens, u.OutputTokens, u.CacheReadTokens, u.CacheWriteTokens)
// Surface prompt-cache effectiveness per run so cache hit rates are
// observable in logs, not just queryable from runtime_usage. The ratio
// is cached input over total input-side tokens; a persistently low
// value flags a prompt prefix that is not being reused across runs
// (e.g. volatile values poisoning the cacheable prefix). MUL-3887.
if totalInput := u.InputTokens + u.CacheReadTokens + u.CacheWriteTokens; totalInput > 0 {
slog.Info("task prompt-cache usage",
"task_id", taskID,
"provider", provider,
"model", u.Model,
"input_tokens", u.InputTokens,
"output_tokens", u.OutputTokens,
"cache_read_tokens", u.CacheReadTokens,
"cache_write_tokens", u.CacheWriteTokens,
"cache_read_ratio", float64(u.CacheReadTokens)/float64(totalInput),
)
}
}
writeJSON(w, http.StatusOK, map[string]string{"status": "ok"})
}
// GetTaskStatus returns the current status of a task.
// Used by the daemon to detect terminal/interruption signals (cancelled,
// failed, completed) while a task is executing mid-flight.
func (h *Handler) GetTaskStatus(w http.ResponseWriter, r *http.Request) {
taskID := chi.URLParam(r, "taskId")
// Verify the caller owns this task's workspace.
task, ok := h.requireDaemonTaskAccess(w, r, taskID)
if !ok {
return
}
writeJSON(w, http.StatusOK, map[string]string{"status": task.Status})
}
// FailTask marks a running task as failed.
type TaskFailRequest struct {
Error string `json:"error"`
SessionID string `json:"session_id,omitempty"`
WorkDir string `json:"work_dir,omitempty"`
FailureReason string `json:"failure_reason,omitempty"`
}
func (h *Handler) FailTask(w http.ResponseWriter, r *http.Request) {
taskID := chi.URLParam(r, "taskId")
// Verify the caller owns this task's workspace.
_, workspaceID, ok := h.requireDaemonTaskAccessWithWorkspace(w, r, taskID)
if !ok {
return
}
var req TaskFailRequest
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
writeError(w, http.StatusBadRequest, "invalid request body")
return
}
task, err := h.TaskService.FailTask(r.Context(), parseUUID(taskID), req.Error, req.SessionID, req.WorkDir, req.FailureReason)
if err != nil {
slog.Warn("fail task failed", "task_id", taskID, "error", err)
writeError(w, http.StatusBadRequest, err.Error())
return
}
// Best-effort revoke of the mat_ task token minted at claim. Same
// rationale as CompleteTask — eager deletion shrinks the post-
// terminal window. The 24h expiry / cascade are the durable guards.
if err := h.Queries.DeleteTaskTokensByTask(r.Context(), task.ID); err != nil {
slog.Warn("fail task: failed to revoke task tokens", "task_id", uuidToString(task.ID), "error", err)
}
slog.Info("task failed", "task_id", taskID, "agent_id", uuidToString(task.AgentID), "task_error", req.Error, "failure_reason", req.FailureReason)
writeJSON(w, http.StatusOK, taskToResponse(*task, workspaceID))
}
// ---------------------------------------------------------------------------
// Task Messages (live agent output)
// ---------------------------------------------------------------------------
type TaskMessageRequest struct {
Seq int `json:"seq"`
Type string `json:"type"`
Tool string `json:"tool,omitempty"`
Content string `json:"content,omitempty"`
Input map[string]any `json:"input,omitempty"`
Output string `json:"output,omitempty"`
}
type TaskMessageBatchRequest struct {
Messages []TaskMessageRequest `json:"messages"`
}
// ReportTaskMessages receives a batch of agent execution messages from the daemon.
func (h *Handler) ReportTaskMessages(w http.ResponseWriter, r *http.Request) {
taskID := chi.URLParam(r, "taskId")
var req TaskMessageBatchRequest
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
writeError(w, http.StatusBadRequest, "invalid request body")
return
}
if len(req.Messages) == 0 {
writeJSON(w, http.StatusOK, map[string]string{"status": "ok"})
return
}
// Verify the caller owns this task's workspace.
task, ok := h.requireDaemonTaskAccess(w, r, taskID)
if !ok {
return
}
workspaceID := ""
if task.IssueID.Valid {
if issue, err := h.Queries.GetIssue(r.Context(), task.IssueID); err == nil {
workspaceID = uuidToString(issue.WorkspaceID)
}
}
if workspaceID == "" && task.ChatSessionID.Valid {
if cs, err := h.Queries.GetChatSession(r.Context(), task.ChatSessionID); err == nil {
workspaceID = uuidToString(cs.WorkspaceID)
}
}
for _, msg := range req.Messages {
// Redact sensitive information before persisting or broadcasting.
msg.Content = redact.Text(msg.Content)
msg.Output = redact.Text(msg.Output)
msg.Input = redact.InputMap(msg.Input)
var inputJSON []byte
if msg.Input != nil {
inputJSON, _ = json.Marshal(msg.Input)
}
created, createErr := h.Queries.CreateTaskMessage(r.Context(), db.CreateTaskMessageParams{
TaskID: parseUUID(taskID),
Seq: int32(msg.Seq),
Type: msg.Type,
Tool: pgtype.Text{String: msg.Tool, Valid: msg.Tool != ""},
Content: pgtype.Text{String: msg.Content, Valid: msg.Content != ""},
Input: inputJSON,
Output: pgtype.Text{String: msg.Output, Valid: msg.Output != ""},
})
if createErr != nil {
slog.Error("failed to create task message", "task_id", taskID, "seq", msg.Seq, "error", createErr)
writeError(w, http.StatusInternalServerError, "failed to persist task message")
return
}
if workspaceID != "" {
h.publishTask(protocol.EventTaskMessage, workspaceID, "system", "", taskID,
taskMessageToPayload(created, taskID, uuidToString(task.IssueID)))
}
}
writeJSON(w, http.StatusOK, map[string]string{"status": "ok"})
}
func taskMessageToPayload(m db.TaskMessage, taskID, issueID string) protocol.TaskMessagePayload {
var input map[string]any
if m.Input != nil {
json.Unmarshal(m.Input, &input)
}
createdAt := ""
if m.CreatedAt.Valid {
createdAt = m.CreatedAt.Time.UTC().Format(time.RFC3339Nano)
}
return protocol.TaskMessagePayload{
TaskID: taskID,
IssueID: issueID,
Seq: int(m.Seq),
Type: m.Type,
Tool: m.Tool.String,
Content: m.Content.String,
Input: input,
Output: m.Output.String,
CreatedAt: createdAt,
}
}
// ListTaskMessages returns the persisted messages for a task (for catch-up after reconnect).
func (h *Handler) ListTaskMessages(w http.ResponseWriter, r *http.Request) {
taskID := chi.URLParam(r, "taskId")
// Verify the caller owns this task's workspace.
task, ok := h.requireDaemonTaskAccess(w, r, taskID)
if !ok {
return
}
var (
messages []db.TaskMessage
err error
)
if sinceStr := r.URL.Query().Get("since"); sinceStr != "" {
sinceSeq, parseErr := strconv.Atoi(sinceStr)
if parseErr != nil {
writeError(w, http.StatusBadRequest, "invalid since parameter")
return
}
messages, err = h.Queries.ListTaskMessagesSince(r.Context(), db.ListTaskMessagesSinceParams{
TaskID: parseUUID(taskID),
Seq: int32(sinceSeq),
})
} else {
messages, err = h.Queries.ListTaskMessages(r.Context(), parseUUID(taskID))
}
if err != nil {
writeError(w, http.StatusInternalServerError, "failed to list task messages")
return
}
issueID := uuidToString(task.IssueID)
resp := make([]protocol.TaskMessagePayload, len(messages))
for i, m := range messages {
resp[i] = taskMessageToPayload(m, taskID, issueID)
}
writeJSON(w, http.StatusOK, resp)
}
// GetActiveTaskForIssue returns all currently active tasks for an issue.
// Returns { tasks: [...] } array (may be empty).
func (h *Handler) GetActiveTaskForIssue(w http.ResponseWriter, r *http.Request) {
issueID := chi.URLParam(r, "id")
issue, ok := h.loadIssueForUser(w, r, issueID)
if !ok {
return
}
tasks, err := h.Queries.ListActiveTasksByIssue(r.Context(), issue.ID)
if err != nil {
tasks = nil
}
workspaceID := uuidToString(issue.WorkspaceID)
resp := make([]AgentTaskResponse, len(tasks))
for i, t := range tasks {
resp[i] = taskToResponse(t, workspaceID)
}
writeJSON(w, http.StatusOK, map[string]any{"tasks": resp})
}
// CancelTask cancels a running or queued task by ID.
// Verifies both that the URL-parameter issue belongs to the caller's workspace
// and that the task belongs to that same issue — a task UUID from a different
// issue (in any workspace) must not be cancellable through this route.
func (h *Handler) CancelTask(w http.ResponseWriter, r *http.Request) {
issueID := chi.URLParam(r, "id")
issue, ok := h.loadIssueForUser(w, r, issueID)
if !ok {
return
}
taskID := chi.URLParam(r, "taskId")
existing, err := h.Queries.GetAgentTask(r.Context(), parseUUID(taskID))
if err != nil || uuidToString(existing.IssueID) != uuidToString(issue.ID) {
writeError(w, http.StatusNotFound, "task not found")
return
}
task, err := h.TaskService.CancelTask(r.Context(), existing.ID)
if err != nil {
slog.Warn("cancel task failed", "task_id", taskID, "error", err)
writeError(w, http.StatusBadRequest, err.Error())
return
}
slog.Info("task cancelled by user", "task_id", taskID, "issue_id", uuidToString(task.IssueID))
writeJSON(w, http.StatusOK, taskToResponse(*task, uuidToString(issue.WorkspaceID)))
}
// ListTasksByIssue returns all tasks (any status) for an issue — used for execution history.
func (h *Handler) ListTasksByIssue(w http.ResponseWriter, r *http.Request) {
issueID := chi.URLParam(r, "id")
issue, ok := h.loadIssueForUser(w, r, issueID)
if !ok {
return
}
tasks, err := h.Queries.ListTasksByIssue(r.Context(), issue.ID)
if err != nil {
writeError(w, http.StatusInternalServerError, "failed to list tasks")
return
}
workspaceID := uuidToString(issue.WorkspaceID)
resp := make([]AgentTaskResponse, len(tasks))
for i, t := range tasks {
resp[i] = taskToResponse(t, workspaceID)
}
writeJSON(w, http.StatusOK, resp)
}
// ListTaskMessagesByUser returns task messages for a task.
// Used by the frontend under regular user auth (not daemon auth).
// Verifies the task belongs to the caller's workspace.
func (h *Handler) ListTaskMessagesByUser(w http.ResponseWriter, r *http.Request) {
taskID := chi.URLParam(r, "taskId")
taskUUID, ok := parseUUIDOrBadRequest(w, taskID, "task_id")
if !ok {
return
}
task, err := h.Queries.GetAgentTask(r.Context(), taskUUID)
if err != nil {
writeError(w, http.StatusNotFound, "task not found")
return
}
// Verify the task belongs to the caller's workspace.
wsID := h.TaskService.ResolveTaskWorkspaceID(r.Context(), task)
if wsID == "" || wsID != middleware.WorkspaceIDFromContext(r.Context()) {
writeError(w, http.StatusNotFound, "task not found")
return
}
var (
messages []db.TaskMessage
queryErr error
)
if sinceStr := r.URL.Query().Get("since"); sinceStr != "" {
sinceSeq, parseErr := strconv.Atoi(sinceStr)
if parseErr != nil {
writeError(w, http.StatusBadRequest, "invalid since parameter")
return
}
messages, queryErr = h.Queries.ListTaskMessagesSince(r.Context(), db.ListTaskMessagesSinceParams{
TaskID: taskUUID,
Seq: int32(sinceSeq),
})
} else {
messages, queryErr = h.Queries.ListTaskMessages(r.Context(), taskUUID)
}
if queryErr != nil {
writeError(w, http.StatusInternalServerError, "failed to list task messages")
return
}
issueID := uuidToString(task.IssueID)
resp := make([]protocol.TaskMessagePayload, len(messages))
for i, m := range messages {
resp[i] = taskMessageToPayload(m, taskID, issueID)
}
writeJSON(w, http.StatusOK, resp)
}
// GetIssueUsage returns aggregated token usage for all tasks belonging to an issue.
func (h *Handler) GetIssueUsage(w http.ResponseWriter, r *http.Request) {
issueID := chi.URLParam(r, "id")
issue, ok := h.loadIssueForUser(w, r, issueID)
if !ok {
return
}
row, err := h.Queries.GetIssueUsageSummary(r.Context(), issue.ID)
if err != nil {
writeError(w, http.StatusInternalServerError, "failed to get issue usage")
return
}
writeJSON(w, http.StatusOK, map[string]any{
"total_input_tokens": row.TotalInputTokens,
"total_output_tokens": row.TotalOutputTokens,
"total_cache_read_tokens": row.TotalCacheReadTokens,
"total_cache_write_tokens": row.TotalCacheWriteTokens,
"task_count": row.TaskCount,
})
}
// GetIssueGCCheck returns minimal issue info needed by the daemon GC loop.
// Gated on workspace access so a daemon token scoped to workspace A cannot
// read issue metadata from workspace B via UUID enumeration.
func (h *Handler) GetIssueGCCheck(w http.ResponseWriter, r *http.Request) {
issueID := chi.URLParam(r, "issueId")
issueUUID, ok := parseUUIDOrBadRequest(w, issueID, "issue_id")
if !ok {
return
}
issue, err := h.Queries.GetIssue(r.Context(), issueUUID)
if err != nil {
writeError(w, http.StatusNotFound, "issue not found")
return
}
if !h.requireDaemonWorkspaceAccess(w, r, uuidToString(issue.WorkspaceID)) {
return
}
writeJSON(w, http.StatusOK, map[string]any{
"status": issue.Status,
"updated_at": issue.UpdatedAt.Time,
})
}
// GetChatSessionGCCheck returns the status and updated_at of a chat session
// for the daemon GC loop. A 404 here means the session was hard-deleted
// (DeleteChatSession in chat.go runs a real DELETE), which the daemon treats
// as an immediate-clean signal — the user's explicit delete is the strongest
// reclaim authorization we can get.
//
// Same anti-enumeration shape as GetIssueGCCheck: workspace mismatch returns
// the same 404 so a scoped daemon token can't probe other workspaces.
func (h *Handler) GetChatSessionGCCheck(w http.ResponseWriter, r *http.Request) {
sessionID := chi.URLParam(r, "sessionId")
sessionUUID, ok := parseUUIDOrBadRequest(w, sessionID, "session_id")
if !ok {
return
}
session, err := h.Queries.GetChatSession(r.Context(), sessionUUID)
if err != nil {
writeError(w, http.StatusNotFound, "chat session not found")
return
}
if !h.requireDaemonWorkspaceAccess(w, r, uuidToString(session.WorkspaceID)) {
return
}
writeJSON(w, http.StatusOK, map[string]any{
"status": session.Status,
"updated_at": session.UpdatedAt.Time,
})
}
// GetAutopilotRunGCCheck returns the status and completed_at of an autopilot
// run for the daemon GC loop. The daemon decides purely on terminal status:
// an autopilot run's workdir is never reused, so a terminal run is reclaimed on
// sight while non-terminal status is a skip signal — completed_at is returned
// for the API contract and diagnostics, not as a TTL anchor.
//
// Workspace ownership is resolved via the parent autopilot row.
func (h *Handler) GetAutopilotRunGCCheck(w http.ResponseWriter, r *http.Request) {
runID := chi.URLParam(r, "runId")
runUUID, ok := parseUUIDOrBadRequest(w, runID, "run_id")
if !ok {
return
}
run, err := h.Queries.GetAutopilotRun(r.Context(), runUUID)
if err != nil {
writeError(w, http.StatusNotFound, "autopilot run not found")
return
}
autopilot, err := h.Queries.GetAutopilot(r.Context(), run.AutopilotID)
if err != nil {
// Parent autopilot is gone — treat as not found rather than 500
// so the daemon can fall through to its orphan-by-mtime path.
writeError(w, http.StatusNotFound, "autopilot run not found")
return
}
if !h.requireDaemonWorkspaceAccess(w, r, uuidToString(autopilot.WorkspaceID)) {
return
}
writeJSON(w, http.StatusOK, map[string]any{
"status": run.Status,
"completed_at": run.CompletedAt.Time,
})
}
// GetTaskGCCheck returns the agent_task_queue status for quick-create cleanup.
// Quick-create tasks have no parent record (no issue_id at WriteGCMeta time,
// no chat session, no autopilot run) so the daemon keys GC directly on the
// task row itself.
func (h *Handler) GetTaskGCCheck(w http.ResponseWriter, r *http.Request) {
taskID := chi.URLParam(r, "taskId")
task, ok := h.requireDaemonTaskAccess(w, r, taskID)
if !ok {
return
}
writeJSON(w, http.StatusOK, map[string]any{
"status": task.Status,
"completed_at": task.CompletedAt.Time,
})
}