Compare commits

..

1 Commits

Author SHA1 Message Date
Eve
21cd617d16 feat(composio): inject MCP overlay into agent runtime at task dispatch (MUL-3721)
Stage 3 of the Composio epic. Wires the per-user Composio MCP session into
every agent task so the agent process sees the initiator's connected tools
without any prompt-time plumbing.

Server side
  - Migration 128 adds agent_task_queue.runtime_mcp_overlay JSONB plus a
    BEFORE-UPDATE trigger that wipes the column on any transition into a
    terminal status (completed / failed / cancelled). A trigger is the single
    source of truth — future queries that flip status cannot bypass it.
  - composio.Service.BuildTaskOverlay(userID) reuses CreateMCPSession and
    emits the Claude-style { mcpServers: { composio: { type: http, url,
    headers } } } shape the daemon's existing sidecar generators consume.
    Returns (nil, nil) on zero active connections so we never burn a
    Composio session for a user with nothing to call.
  - TaskService grows a Composio ComposioOverlayBuilder seam, wired in
    router.go after composiointeg.NewService succeeds. Five enqueue paths
    (issue / mention / quick-create / chat / auto-retry) attach the overlay
    after CreateAgentTask returns and before the daemon is notified — so
    every claim reads a settled row, with no second daemon hop. Best-effort:
    a builder failure logs and proceeds with no overlay.
  - resolveInitiatorFromTriggerComment derives the initiator user from the
    trigger comment when it was authored by a member. Agent-authored
    triggers are not treated as initiators (their connected-apps view is
    empty by construction).

Daemon side
  - handler/daemon.go claim path merges task.runtime_mcp_overlay onto
    agent.mcp_config via mergeMCPOverlay before populating
    TaskAgentData.McpConfig. Overlay wins on server-name collisions
    because it carries the live user-scoped session URL. Errors fall back
    to the agent config unchanged — a bad overlay must not surprise-disable
    saved MCP tools. The existing execenv sidecar generators (cursor /
    codex / openclaw / opencode / hermes-kiro) need no changes: they keep
    consuming the merged result through TaskAgentData.McpConfig.

Tests
  - 9 merge cases (mcp_overlay_test): both-nil short-circuit, agent-only
    pass-through, overlay-only canonicalization, two-side merge, name
    collision (overlay wins), top-level key preservation, malformed agent
    fallback, malformed overlay fallback, non-object server rejection.
  - 4 dispatch cases (composio): zero-connections returns nil without
    CreateSession, happy-path emits the right shape with the right user
    id, empty-URL defensive branch, SDK error surfacing.
  - 4 TaskService helper cases: nil Composio is a no-op (Queries-safe),
    invalid initiator does not call the builder, nil overlay skips the
    UPDATE, builder error swallowed without panic.
  - Migration 128 verified to roll up + down + up cleanly against the test
    database.

Out of scope (deferred): assignment-triggered enqueue paths with no
trigger comment get no overlay attached today (no initiator UUID flows
through enqueueIssueTask in that case). Retry paths recompute the overlay
fresh from the parent's initiator_user_id instead of inheriting the bearer
from the parent row, so a stale token can never resurface on a retry.

Co-authored-by: multica-agent <github@multica.ai>
2026-06-29 17:12:18 +08:00
15 changed files with 1047 additions and 134 deletions

View File

@@ -1,68 +0,0 @@
package main
import (
"io"
"net/http"
"testing"
)
// TestComposioCallbackIsPublic_NoCookieNot401 locks in the MUL-3843 fix: the
// Composio OAuth callback must live OUTSIDE the Auth middleware group, because
// Composio 302-redirects the user's browser to it and the cookie session is
// frequently absent (expired session, SameSite=Strict / Safari ITP, private
// window, self-hosted callback subdomain). Before the fix the route sat under
// Auth, so a cookie-less browser got a hard 401 and a JSON blob instead of the
// settings redirect — the exact symptom Yushen hit.
//
// With no COMPOSIO_API_KEY configured in the test env, h.Composio == nil, so a
// cookie-less hit on the callback now reaches the handler and returns 503
// ("not configured") rather than being short-circuited to 401 by the Auth
// middleware. The precise non-401 code is incidental; what this test pins is
// that the request is NOT rejected by auth.
func TestComposioCallbackIsPublic_NoCookieNot401(t *testing.T) {
// Deliberately send NO Authorization header / cookie — simulate the
// cookie-stripped browser redirect coming back from Composio.
resp, err := http.Get(testServer.URL + "/api/integrations/composio/callback?state=bogus&status=success&connected_account_id=ca_x")
if err != nil {
t.Fatalf("callback request failed: %v", err)
}
defer resp.Body.Close()
body, _ := io.ReadAll(resp.Body)
if resp.StatusCode == http.StatusUnauthorized {
t.Fatalf("callback returned 401 without a session — it is still behind the Auth group (regression of MUL-3843). body=%s", body)
}
}
// TestComposioNonCallbackEndpointsStayGated is the other half of the invariant:
// moving the callback out of the Auth group must NOT loosen the four
// session-scoped endpoints. A cookie-less request to them must still 401.
func TestComposioNonCallbackEndpointsStayGated(t *testing.T) {
gated := []struct {
method string
path string
}{
{http.MethodPost, "/api/integrations/composio/connect/init"},
{http.MethodGet, "/api/integrations/composio/toolkits"},
{http.MethodGet, "/api/integrations/composio/connections"},
{http.MethodDelete, "/api/integrations/composio/connections/11111111-1111-1111-1111-111111111111"},
}
for _, tc := range gated {
t.Run(tc.method+" "+tc.path, func(t *testing.T) {
req, err := http.NewRequest(tc.method, testServer.URL+tc.path, nil)
if err != nil {
t.Fatalf("build request: %v", err)
}
resp, err := http.DefaultClient.Do(req)
if err != nil {
t.Fatalf("request failed: %v", err)
}
defer resp.Body.Close()
io.Copy(io.Discard, resp.Body)
if resp.StatusCode != http.StatusUnauthorized {
t.Fatalf("expected 401 without a session, got %d — endpoint is no longer auth-gated", resp.StatusCode)
}
})
}
}

View File

@@ -471,6 +471,17 @@ func NewRouterWithOptions(pool *pgxpool.Pool, hub *realtime.Hub, bus *events.Bus
slog.Error("composio: service init failed; composio integration disabled", "error", serr)
} else {
h.Composio = svc
// Stage 3 (MUL-3721) hook: feed the per-task MCP
// overlay builder into TaskService so every Enqueue*
// path attaches the initiator user's Composio session
// URL to the task row before the daemon claims it.
// taskSvc already exists by this point — it was
// constructed inside NewHandler — and exposes its
// Composio field for exactly this kind of late wiring,
// so no Handler-level mutation is needed.
if h.TaskService != nil {
h.TaskService.Composio = svc
}
slog.Info("composio integration enabled")
}
}
@@ -612,18 +623,6 @@ func NewRouterWithOptions(pool *pgxpool.Pool, hub *realtime.Hub, bus *events.Bus
// HandleCloudBillingStripeWebhook for the rationale).
r.Post("/api/webhooks/stripe", h.HandleCloudBillingStripeWebhook)
// Composio OAuth callback (MUL-3843). NOT under the Auth group on purpose:
// Composio 302-redirects the user's browser here at the end of the OAuth
// flow, and the cookie session is frequently absent (expired session,
// SameSite=Strict / Safari ITP stripping cross-site cookies, private
// windows, self-hosted callbacks on a different subdomain). Identity is NOT
// taken from the session — it comes from the HMAC-signed `state` query
// param, which CompleteCallback verifies (signature, expiry, replay) before
// doing anything. h.Composio == nil still returns 503. Keeping it inside the
// Auth group made a missing cookie a hard 401, breaking the flow for exactly
// the browsers above; the other four composio endpoints stay session-gated.
r.Get("/api/integrations/composio/callback", h.ComposioCallback)
// Daemon API routes (require daemon token or valid user token)
r.Route("/api/daemon", func(r chi.Router) {
r.Use(middleware.DaemonAuth(queries, patCache, daemonTokenCache, cloudPATVerifier))
@@ -782,12 +781,12 @@ func NewRouterWithOptions(pool *pgxpool.Pool, hub *realtime.Hub, bus *events.Bus
r.Post("/api/lark/binding/redeem", h.RedeemLarkBindingToken)
// Composio integration (MUL-3720). User-scoped (no workspace context):
// a connection belongs to a user. These four require a logged-in
// session; the OAuth callback is the outlier and lives outside the Auth
// group (registered above with the other public OAuth/webhook routes —
// see MUL-3843). All return 503 when COMPOSIO_API_KEY is unset.
// a connection belongs to a user. The callback is a GET so it works as
// a top-level browser redirect under cookie auth (no CSRF gate on GET).
// All four return 503 when COMPOSIO_API_KEY is unset.
r.Route("/api/integrations/composio", func(r chi.Router) {
r.Post("/connect/init", h.ComposioConnectInit)
r.Get("/callback", h.ComposioCallback)
r.Get("/toolkits", h.ListComposioToolkits)
r.Get("/connections", h.ListComposioConnections)
r.Delete("/connections/{id}", h.DeleteComposioConnection)

View File

@@ -1304,6 +1304,19 @@ func (h *Handler) ClaimTaskByRuntime(w http.ResponseWriter, r *http.Request) {
if agent.McpConfig != nil {
mcpConfig = json.RawMessage(agent.McpConfig)
}
// Layer the per-task overlay (set at enqueue from the initiator
// user's active integrations — currently Composio) on top of the
// agent's saved mcp_config. Overlay wins on server-name collisions
// because it carries the live user-scoped session URL. Errors are
// logged but never fail the claim: a broken overlay must not prevent
// the agent from running with its base config.
if len(task.RuntimeMcpOverlay) > 0 {
if merged, err := mergeMCPOverlay(mcpConfig, json.RawMessage(task.RuntimeMcpOverlay)); err != nil {
slog.Warn("daemon claim: merge runtime_mcp_overlay failed; falling back to agent mcp_config", "task_id", uuidToString(task.ID), "error", err)
} else {
mcpConfig = merged
}
}
// runtime_config is stored as JSONB and may legitimately be the
// empty object `{}` for agents that haven't opted into any
// provider-specific tuning. Forward only non-empty payloads so the

View File

@@ -10,16 +10,11 @@ import (
composio "github.com/multica-ai/multica/server/internal/integrations/composio"
)
// Composio integration handlers (MUL-3720, Stage 2 MVP). A Composio connection
// belongs to a user, not a workspace, so these handlers live outside the
// workspace-membership group. The four management endpoints (connect/init,
// toolkits, connections, delete) are user-scoped (requireUserID) and sit under
// the Auth middleware. ComposioCallback is the exception: it is a public route
// (outside the Auth group, see router.go / MUL-3843) because the browser often
// arrives without a session cookie — its identity comes from the signed state,
// not requireUserID. The whole block returns 503 when h.Composio is nil
// (COMPOSIO_API_KEY unset), matching the Lark/GitHub "integration not
// configured" convention.
// Composio integration handlers (MUL-3720, Stage 2 MVP). All routes are
// user-scoped (requireUserID) and live outside the workspace-membership group —
// a Composio connection belongs to a user, not a workspace. The whole block
// returns 503 when h.Composio is nil (COMPOSIO_API_KEY unset), matching the
// Lark/GitHub "integration not configured" convention.
// ComposioConnectInitRequest is the POST /connect/init body.
type ComposioConnectInitRequest struct {
@@ -92,15 +87,13 @@ func (h *Handler) ComposioConnectInit(w http.ResponseWriter, r *http.Request) {
}
// ComposioCallback (GET /api/integrations/composio/callback) is the browser
// redirect target Composio sends the user back to after the hosted flow. It is
// registered as a PUBLIC route (outside the Auth middleware group — see
// router.go / MUL-3843), because the browser frequently lands here without a
// session cookie (expired session, SameSite/ITP stripping, private window,
// self-hosted callback subdomain). Identity therefore comes solely from the
// HMAC-signed `state` query param, which CompleteCallback verifies before
// doing anything. On success the row is upserted and the browser is redirected
// to the settings page; any failure redirects to the same page with a stable
// error code so the user is never left on a blank API response.
// redirect target Composio sends the user back to after the hosted flow. The
// signed `state` query param is the source of truth for the user identity, so
// the request is attributed from it (the route still sits under Auth so the
// browser session is present, but identity comes from the state). On success
// the row is upserted and the browser is redirected to the settings page; any
// failure redirects to the same page with a stable error code so the user is
// never left on a blank API response.
func (h *Handler) ComposioCallback(w http.ResponseWriter, r *http.Request) {
if h.Composio == nil {
writeError(w, http.StatusServiceUnavailable, "composio integration not configured")

View File

@@ -0,0 +1,160 @@
package handler
import (
"bytes"
"encoding/json"
"errors"
"fmt"
)
// mergeMCPOverlay layers a per-task overlay on top of an agent's saved
// mcp_config and returns the merged JSON for the daemon claim wire shape.
//
// The merge contract (kept deliberately shallow):
//
// - Both inputs are expected to be the Claude-style
// `{"mcpServers": {<name>: <object>}}` shape every supported runtime
// consumes via execenv's cursor / openclaw / opencode / codex / hermes
// sidecar generators. Anything not under `mcpServers` is preserved from
// the AGENT side only — overlays today only carry server entries and
// should not silently introduce other top-level keys.
//
// - Merge is by SERVER NAME (the inner-map key under `mcpServers`).
// On a name collision the OVERLAY wins. This is on purpose: the overlay
// carries the live, user-scoped session URL (e.g. the user's own
// Composio MCP bearer), whereas the agent's saved entry under the same
// name — if any — would be a stale or admin-shared placeholder.
//
// - Either side may be empty / nil / the literal `null`. Empty-everywhere
// returns `nil` so the daemon's `hasManagedCursorMcpConfig` short-circuit
// keeps treating the task as "no managed MCP at all".
//
// Failure mode: on malformed input the agent config is returned unchanged
// (with the error). Callers must never silently drop the agent's saved
// servers because the overlay JSON was bad — that would surprise-disable
// existing MCP tools.
func mergeMCPOverlay(agentMcpConfig, overlay json.RawMessage) (json.RawMessage, error) {
if !hasManagedJSON(overlay) {
return passthroughAgentMcpConfig(agentMcpConfig), nil
}
if !hasManagedJSON(agentMcpConfig) {
// Re-marshal the overlay alone so the daemon receives the exact
// canonical shape (the input may have been stored with arbitrary
// whitespace by Postgres' JSONB representation).
var oCfg map[string]json.RawMessage
if err := json.Unmarshal(overlay, &oCfg); err != nil {
return nil, fmt.Errorf("merge mcp overlay: parse overlay: %w", err)
}
out, err := json.Marshal(oCfg)
if err != nil {
return nil, fmt.Errorf("merge mcp overlay: marshal overlay: %w", err)
}
return out, nil
}
var aCfg map[string]json.RawMessage
if err := json.Unmarshal(agentMcpConfig, &aCfg); err != nil {
// Agent config malformed: surface the error but return the original
// bytes unchanged so the agent's setup behavior matches the no-overlay
// path (which would also have shipped the bad bytes downstream).
return passthroughAgentMcpConfig(agentMcpConfig), fmt.Errorf("merge mcp overlay: parse agent mcp_config: %w", err)
}
var oCfg map[string]json.RawMessage
if err := json.Unmarshal(overlay, &oCfg); err != nil {
return passthroughAgentMcpConfig(agentMcpConfig), fmt.Errorf("merge mcp overlay: parse overlay: %w", err)
}
// Pull each side's `mcpServers` sub-map, default to empty so a
// well-formed top level with no servers is treated like absent.
aServers, err := unmarshalServerMap(aCfg["mcpServers"])
if err != nil {
return passthroughAgentMcpConfig(agentMcpConfig), fmt.Errorf("merge mcp overlay: agent mcpServers: %w", err)
}
oServers, err := unmarshalServerMap(oCfg["mcpServers"])
if err != nil {
return passthroughAgentMcpConfig(agentMcpConfig), fmt.Errorf("merge mcp overlay: overlay mcpServers: %w", err)
}
merged := make(map[string]json.RawMessage, len(aServers)+len(oServers))
for k, v := range aServers {
merged[k] = v
}
// Overlay wins on collisions.
for k, v := range oServers {
merged[k] = v
}
// Rebuild: keep any non-mcpServers top-level keys from the agent config,
// then write the merged mcpServers map back. This is the only place we
// touch top-level keys; everything else is left alone.
out := make(map[string]json.RawMessage, len(aCfg)+1)
for k, v := range aCfg {
if k == "mcpServers" {
continue
}
out[k] = v
}
if len(merged) > 0 {
serversBytes, err := json.Marshal(merged)
if err != nil {
return nil, fmt.Errorf("merge mcp overlay: marshal merged servers: %w", err)
}
out["mcpServers"] = serversBytes
}
if len(out) == 0 {
return nil, nil
}
final, err := json.Marshal(out)
if err != nil {
return nil, fmt.Errorf("merge mcp overlay: marshal merged: %w", err)
}
return final, nil
}
// hasManagedJSON reports whether a raw JSON column carries an actual managed
// payload (non-empty and not the literal `null`). Matches the convention
// hasManagedCursorMcpConfig uses on the daemon side so the merge respects
// the same "absent" semantics every consumer already agrees on.
func hasManagedJSON(raw json.RawMessage) bool {
trimmed := bytes.TrimSpace(raw)
return len(trimmed) > 0 && !bytes.Equal(trimmed, []byte("null"))
}
// passthroughAgentMcpConfig returns the agent config unchanged, or nil when
// it is absent. Used on the early-return paths so a caller assigning the
// return value into a wire field gets the same nil/non-nil split as today.
func passthroughAgentMcpConfig(agentMcpConfig json.RawMessage) json.RawMessage {
if !hasManagedJSON(agentMcpConfig) {
return nil
}
return agentMcpConfig
}
// unmarshalServerMap decodes the `mcpServers` sub-object into a map keyed by
// server name. A nil/absent value returns an empty map (not an error) so the
// caller can compose without an extra nil-check.
func unmarshalServerMap(raw json.RawMessage) (map[string]json.RawMessage, error) {
if !hasManagedJSON(raw) {
return map[string]json.RawMessage{}, nil
}
var m map[string]json.RawMessage
if err := json.Unmarshal(raw, &m); err != nil {
return nil, err
}
if m == nil {
return map[string]json.RawMessage{}, nil
}
// Reject non-object server entries early — every runtime expects the
// inner value to be an object and would 500 in the sidecar generator
// otherwise. Mirrors parseCursorManagedMcpServers' guard.
for name, server := range m {
if name == "" {
return nil, errors.New("mcp server name must not be empty")
}
trimmed := bytes.TrimSpace(server)
if len(trimmed) == 0 || trimmed[0] != '{' {
return nil, fmt.Errorf("mcpServers.%s must be a JSON object", name)
}
}
return m, nil
}

View File

@@ -0,0 +1,196 @@
package handler
import (
"encoding/json"
"testing"
)
// TestMergeMCPOverlayAgentNilOverlayNil covers the "no managed MCP anywhere"
// branch: both inputs absent must return nil so the daemon's "managed
// mcp_config?" short-circuit treats the task as no-config — exactly the
// behavior tasks had before Stage 3 introduced the overlay column.
func TestMergeMCPOverlayAgentNilOverlayNil(t *testing.T) {
cases := []struct {
name string
agent json.RawMessage
overlay json.RawMessage
}{
{"both_nil", nil, nil},
{"agent_null_overlay_nil", json.RawMessage("null"), nil},
{"agent_nil_overlay_null", nil, json.RawMessage("null")},
{"agent_empty_overlay_empty", json.RawMessage(""), json.RawMessage("")},
}
for _, tc := range cases {
t.Run(tc.name, func(t *testing.T) {
got, err := mergeMCPOverlay(tc.agent, tc.overlay)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
if got != nil {
t.Errorf("expected nil result, got %s", string(got))
}
})
}
}
// TestMergeMCPOverlayAgentOnly covers the "no Composio for this task" path:
// every existing agent.mcp_config must be passed through unchanged so
// Stage 3 cannot break MCP setup for tasks where the initiator has no
// connections.
func TestMergeMCPOverlayAgentOnly(t *testing.T) {
agent := json.RawMessage(`{"mcpServers":{"fetch":{"command":"uvx","args":["mcp-server-fetch"]}}}`)
got, err := mergeMCPOverlay(agent, nil)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
// Pass-through preserves the exact bytes (no re-marshal) so the
// existing trust-byte-identity tests upstream are unaffected.
if string(got) != string(agent) {
t.Errorf("expected pass-through, got %s", string(got))
}
}
// TestMergeMCPOverlayOverlayOnly covers the "agent has no mcp_config" path:
// the overlay is canonicalized through json.Marshal so the daemon receives
// a deterministic shape regardless of how Postgres' JSONB serialized the
// stored value.
func TestMergeMCPOverlayOverlayOnly(t *testing.T) {
overlay := json.RawMessage(`{"mcpServers":{"composio":{"type":"http","url":"https://mcp.composio.dev/s/abc","headers":{"Authorization":"Bearer mcp_xyz"}}}}`)
got, err := mergeMCPOverlay(nil, overlay)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
var cfg map[string]any
if err := json.Unmarshal(got, &cfg); err != nil {
t.Fatalf("unmarshal result: %v", err)
}
servers, ok := cfg["mcpServers"].(map[string]any)
if !ok {
t.Fatalf("missing mcpServers, got %s", string(got))
}
if _, ok := servers["composio"]; !ok {
t.Errorf("expected composio server, got %s", string(got))
}
}
// TestMergeMCPOverlayMergesBothSides — the headline case: agent's saved
// servers must survive, and the overlay's composio entry must appear alongside.
func TestMergeMCPOverlayMergesBothSides(t *testing.T) {
agent := json.RawMessage(`{"mcpServers":{"fetch":{"command":"uvx"},"github":{"command":"npx"}}}`)
overlay := json.RawMessage(`{"mcpServers":{"composio":{"type":"http","url":"https://mcp.composio.dev/s/abc"}}}`)
got, err := mergeMCPOverlay(agent, overlay)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
var cfg map[string]any
if err := json.Unmarshal(got, &cfg); err != nil {
t.Fatalf("unmarshal result: %v", err)
}
servers, ok := cfg["mcpServers"].(map[string]any)
if !ok {
t.Fatalf("missing mcpServers, got %s", string(got))
}
for _, want := range []string{"fetch", "github", "composio"} {
if _, ok := servers[want]; !ok {
t.Errorf("missing server %q in merged result %s", want, string(got))
}
}
}
// TestMergeMCPOverlayCollisionOverlayWins — the contract the comment on
// mergeMCPOverlay calls out explicitly. The overlay carries the live, user-
// scoped session URL; on a name collision it must win so the daemon's
// sidecar generator emits the live URL, not whatever placeholder the agent
// had saved.
func TestMergeMCPOverlayCollisionOverlayWins(t *testing.T) {
agent := json.RawMessage(`{"mcpServers":{"composio":{"type":"http","url":"https://placeholder.example/old"}}}`)
overlay := json.RawMessage(`{"mcpServers":{"composio":{"type":"http","url":"https://mcp.composio.dev/s/new"}}}`)
got, err := mergeMCPOverlay(agent, overlay)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
var cfg map[string]map[string]map[string]any
if err := json.Unmarshal(got, &cfg); err != nil {
t.Fatalf("unmarshal result: %v", err)
}
gotURL, _ := cfg["mcpServers"]["composio"]["url"].(string)
if gotURL != "https://mcp.composio.dev/s/new" {
t.Errorf("collision: expected overlay URL to win, got %q", gotURL)
}
}
// TestMergeMCPOverlayPreservesAgentTopLevelKeys — anything outside
// `mcpServers` on the agent side must be preserved. The overlay only
// carries server entries; it must not silently strip non-server top-level
// keys the agent admin saved.
func TestMergeMCPOverlayPreservesAgentTopLevelKeys(t *testing.T) {
agent := json.RawMessage(`{"mcpServers":{"fetch":{"command":"uvx"}},"experimental":{"foo":"bar"}}`)
overlay := json.RawMessage(`{"mcpServers":{"composio":{"type":"http","url":"https://mcp.composio.dev/s/abc"}}}`)
got, err := mergeMCPOverlay(agent, overlay)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
var cfg map[string]json.RawMessage
if err := json.Unmarshal(got, &cfg); err != nil {
t.Fatalf("unmarshal result: %v", err)
}
if _, ok := cfg["experimental"]; !ok {
t.Errorf("expected experimental key preserved, got %s", string(got))
}
}
// TestMergeMCPOverlayBadOverlayFallsBackToAgent — a malformed overlay must
// not surprise-disable the agent's saved servers. The merge returns the
// agent config unchanged plus an error so the caller can log.
func TestMergeMCPOverlayBadOverlayFallsBackToAgent(t *testing.T) {
agent := json.RawMessage(`{"mcpServers":{"fetch":{"command":"uvx"}}}`)
overlay := json.RawMessage(`{ this is not json`)
got, err := mergeMCPOverlay(agent, overlay)
if err == nil {
t.Fatalf("expected parse error, got nil")
}
if string(got) != string(agent) {
t.Errorf("expected agent config preserved on overlay parse failure, got %s", string(got))
}
}
// TestMergeMCPOverlayBadAgentReturnsBytesAndError — symmetric guard: a
// malformed agent.mcp_config must not panic the merger. We return the
// original bytes and surface the parse error so the daemon's downstream
// sidecar generator can give the existing error path (instead of a
// half-merged config).
func TestMergeMCPOverlayBadAgentReturnsBytesAndError(t *testing.T) {
agent := json.RawMessage(`{ this is not json`)
overlay := json.RawMessage(`{"mcpServers":{"composio":{"type":"http","url":"https://mcp.composio.dev/s/abc"}}}`)
got, err := mergeMCPOverlay(agent, overlay)
if err == nil {
t.Fatalf("expected parse error, got nil")
}
if string(got) != string(agent) {
t.Errorf("expected agent bytes returned unchanged, got %s", string(got))
}
}
// TestMergeMCPOverlayRejectsNonObjectServer pins the type guard: an
// mcpServers map whose value is a primitive (or array) is rejected, so a
// future bug that wrote `mcpServers: {composio: "https://..."}` (a string
// instead of an object) doesn't quietly travel through the merge and blow
// up in execenv's sidecar generator. Same behavior as
// parseCursorManagedMcpServers.
func TestMergeMCPOverlayRejectsNonObjectServer(t *testing.T) {
agent := json.RawMessage(`{"mcpServers":{"fetch":{"command":"uvx"}}}`)
overlay := json.RawMessage(`{"mcpServers":{"composio":"not-an-object"}}`)
if _, err := mergeMCPOverlay(agent, overlay); err == nil {
t.Fatalf("expected error for non-object server, got nil")
}
}

View File

@@ -0,0 +1,115 @@
package composio
import (
"context"
"encoding/json"
"fmt"
"github.com/jackc/pgx/v5/pgtype"
)
// mcpOverlayServerName is the deterministic key under `mcpServers` used to
// place the Composio session into the merged MCP config. Daemon-side merge
// is by server name, so this constant is the integration's namespace: a
// future provider adding its own overlay must pick a distinct name (e.g.
// "pipedream") to avoid collisions, and an agent's own `mcp_config` entry
// named "composio" is overridden by this overlay on purpose — the overlay
// carries the live user-scoped session URL, the agent config carries a
// generic service-wide entry at most.
const mcpOverlayServerName = "composio"
// composioMCPServer is the wire shape of one MCP server entry in the
// Claude-style `{"mcpServers": {...}}` config that every supported runtime
// (Cursor, Codex, Claude, OpenCode, OpenClaw, Hermes/Kiro) consumes.
//
// `type: http` is what marks the entry as a streamable HTTP MCP endpoint —
// the form Composio's session helper returns. Headers carry the per-session
// bearer token (`Authorization: Bearer mcp_…`). Bearer secret material in
// the value, so callers must NEVER log this struct without redacting
// Headers; the daemon's redact pipeline already pattern-matches the
// `Bearer mcp_…` shape, but the safe rule remains "log the URL only".
type composioMCPServer struct {
Type string `json:"type"`
URL string `json:"url"`
Headers map[string]string `json:"headers,omitempty"`
}
// mcpOverlayPayload is the per-task overlay JSON written to
// agent_task_queue.runtime_mcp_overlay and read by the daemon claim handler
// at task dispatch.
//
// Shape is deliberately a subset of agent.mcp_config (Claude-style
// `mcpServers` map) so the daemon's merge is a flat dictionary union keyed
// by server name. Anything more elaborate (capability filtering, env
// injection, …) would force every sidecar generator to learn about overlays
// individually; keeping the shape identical lets the merge stay pure
// substitution.
type mcpOverlayPayload struct {
MCPServers map[string]composioMCPServer `json:"mcpServers"`
}
// BuildTaskOverlay returns the JSON overlay to write into
// agent_task_queue.runtime_mcp_overlay for a task whose initiator is the
// given Multica user, or (nil, nil) when the user has no active Composio
// connections — in which case no Composio session is created and no token
// is provisioned.
//
// The overlay shape is exactly the one the daemon-side merge expects:
//
// {"mcpServers": {"composio": {"type": "http", "url": "...", "headers": {...}}}}
//
// The (nil, nil) early return is load-bearing for cost / privacy reasons:
//
// - cost: each Composio MCP session is a separate session id at Composio,
// so we do not provision one for users who would have nothing to call
// anyway. This makes the integration scale with the active-connect
// population, not the total task population.
//
// - privacy: a user without any connection has not consented to any
// third-party reach — emitting an overlay would still attach a bearer
// scoped to their composio user namespace, which is wasted attack
// surface.
//
// Idempotency: this is called per-enqueue, not per-claim, so a single task
// has at most one overlay generated and stored. A task that gets re-enqueued
// (only via the retry path, which inserts a fresh row, not the same row)
// will compute a new overlay for the new row — the parent row's overlay is
// already cleared by the terminal-state trigger by the time retry runs.
//
// Errors are returned so the caller can decide whether to fail the enqueue
// (probably no — best-effort enqueue keeps the agent runnable without
// Composio tools) or just log and continue. CreateMCPSession failures are
// expected to be transient (Composio API outage / network blip).
func (s *Service) BuildTaskOverlay(ctx context.Context, userID pgtype.UUID) (json.RawMessage, error) {
session, err := s.CreateMCPSession(ctx, userID)
if err != nil {
return nil, fmt.Errorf("composio: build task overlay: %w", err)
}
if session == nil {
// No active connections — see method comment for why this is the
// load-bearing zero-cost path.
return nil, nil
}
if session.URL == "" {
// Defensive: Composio returned a session row but no URL. Treat as
// "no overlay" rather than writing a half-baked entry; the daemon
// would otherwise wire up an MCP server with an empty URL which
// every runtime fails on noisily.
return nil, nil
}
payload := mcpOverlayPayload{
MCPServers: map[string]composioMCPServer{
mcpOverlayServerName: {
Type: "http",
URL: session.URL,
Headers: session.Headers,
},
},
}
raw, err := json.Marshal(payload)
if err != nil {
return nil, fmt.Errorf("composio: marshal task overlay: %w", err)
}
return raw, nil
}

View File

@@ -0,0 +1,172 @@
package composio
import (
"context"
"encoding/json"
"errors"
"strings"
"testing"
"github.com/jackc/pgx/v5/pgtype"
sdk "github.com/multica-ai/multica/server/pkg/composio"
db "github.com/multica-ai/multica/server/pkg/db/generated"
)
// seedActiveConnection writes a single active row for the user/toolkit pair
// so BuildTaskOverlay's "user has at least one active connection" branch is
// reachable in tests without touching the real Composio API or DB.
func seedActiveConnection(t *testing.T, store *fakeStore, userID pgtype.UUID, toolkit, connectedAccountID string) {
t.Helper()
if _, err := store.UpsertUserComposioConnection(context.Background(), db.UpsertUserComposioConnectionParams{
UserID: userID,
ToolkitSlug: toolkit,
AuthConfigID: "ac_test",
ConnectedAccountID: connectedAccountID,
ComposioUserID: uuidToString(userID),
}); err != nil {
t.Fatalf("seed: %v", err)
}
}
func uuidToString(u pgtype.UUID) string {
if !u.Valid {
return ""
}
b := u.Bytes
const hex = "0123456789abcdef"
out := make([]byte, 36)
idx := 0
for i, x := range b {
out[idx] = hex[x>>4]
out[idx+1] = hex[x&0xf]
idx += 2
if i == 3 || i == 5 || i == 7 || i == 9 {
out[idx] = '-'
idx++
}
}
return string(out)
}
// TestBuildTaskOverlay_NoConnections is the load-bearing zero-cost branch:
// a user with no active rows must NOT cause CreateMCPSession to fire, and
// must NOT emit any overlay JSON. This is the property that lets Composio
// scale with the active-connect population, not the total task population.
func TestBuildTaskOverlay_NoConnections(t *testing.T) {
t.Parallel()
sdkFake := &fakeSDK{}
store := newFakeStore()
svc := newTestService(t, sdkFake, store)
user := mintUUID(7)
overlay, err := svc.BuildTaskOverlay(context.Background(), user)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
if overlay != nil {
t.Errorf("expected nil overlay for user with no connections, got %s", string(overlay))
}
if sdkFake.createSessCalls != 0 {
t.Errorf("expected zero CreateSession calls for empty user, got %d", sdkFake.createSessCalls)
}
}
// TestBuildTaskOverlay_WithConnections — the happy path. The overlay must
// be the exact shape the daemon-side merge expects:
//
// {"mcpServers": {"composio": {"type": "http", "url": "...", "headers": {...}}}}
//
// We also assert that CreateSession was sent the Multica user id verbatim
// (the composio_user_id == Multica user id invariant the rest of the
// integration depends on).
func TestBuildTaskOverlay_WithConnections(t *testing.T) {
t.Parallel()
sdkFake := &fakeSDK{
createSessResp: &sdk.CreateSessionResponse{
MCP: sdk.MCPDescriptor{URL: "https://mcp.composio.dev/session/abc"},
},
}
store := newFakeStore()
svc := newTestService(t, sdkFake, store)
user := mintUUID(13)
seedActiveConnection(t, store, user, "notion", "ca_user_notion")
overlay, err := svc.BuildTaskOverlay(context.Background(), user)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
if len(overlay) == 0 {
t.Fatalf("expected non-empty overlay, got nil")
}
// MCP session was provisioned with the Multica user id verbatim.
if sdkFake.lastSessReq.UserID != uuidToString(user) {
t.Errorf("CreateSession user id: got %q, want %q", sdkFake.lastSessReq.UserID, uuidToString(user))
}
var payload mcpOverlayPayload
if err := json.Unmarshal(overlay, &payload); err != nil {
t.Fatalf("unmarshal overlay: %v", err)
}
srv, ok := payload.MCPServers[mcpOverlayServerName]
if !ok {
t.Fatalf("overlay missing %q server, got %s", mcpOverlayServerName, string(overlay))
}
if srv.Type != "http" {
t.Errorf("type: got %q, want \"http\"", srv.Type)
}
if srv.URL != "https://mcp.composio.dev/session/abc" {
t.Errorf("url: got %q", srv.URL)
}
if srv.Headers["x-api-key"] != "secret" {
t.Errorf("headers missing x-api-key: %v", srv.Headers)
}
}
// TestBuildTaskOverlay_EmptyURL guards a defensive branch: Composio
// returning a 200 with an empty mcp.url must not produce a half-baked
// overlay (every runtime sidecar generator would emit a server with an
// empty URL, breaking the task).
func TestBuildTaskOverlay_EmptyURL(t *testing.T) {
t.Parallel()
sdkFake := &fakeSDK{
createSessResp: &sdk.CreateSessionResponse{
MCP: sdk.MCPDescriptor{URL: ""},
},
}
store := newFakeStore()
svc := newTestService(t, sdkFake, store)
user := mintUUID(14)
seedActiveConnection(t, store, user, "github", "ca_user_github")
overlay, err := svc.BuildTaskOverlay(context.Background(), user)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
if overlay != nil {
t.Errorf("expected nil overlay when MCP URL is empty, got %s", string(overlay))
}
}
// TestBuildTaskOverlay_SDKError — an SDK failure (Composio outage, network
// blip, …) must surface as an error so the caller can log it. The caller
// (TaskService.applyRuntimeMCPOverlay) is responsible for swallowing the
// error and proceeding with no overlay — best-effort enqueue.
func TestBuildTaskOverlay_SDKError(t *testing.T) {
t.Parallel()
sdkFake := &fakeSDK{createSessErr: errors.New("composio: 503 backend")}
store := newFakeStore()
svc := newTestService(t, sdkFake, store)
user := mintUUID(15)
seedActiveConnection(t, store, user, "slack", "ca_user_slack")
overlay, err := svc.BuildTaskOverlay(context.Background(), user)
if err == nil {
t.Fatalf("expected error from SDK failure, got nil")
}
if !strings.Contains(err.Error(), "create session") {
t.Errorf("error should mention create session, got %v", err)
}
if overlay != nil {
t.Errorf("expected nil overlay on SDK error, got %s", string(overlay))
}
}

View File

@@ -39,12 +39,34 @@ type TaskService struct {
// goes through the DB. Wired in router.go from the shared Redis
// client.
EmptyClaim *EmptyClaimCache
// Composio computes the per-task MCP overlay (Stage 3 of the Composio
// epic, MUL-3721) — the integration's "current user's connected apps
// → MCP session URL" hook called from each Enqueue* path. Optional: a
// nil ComposioOverlayBuilder turns the overlay step into a no-op so
// every Multica deployment that hasn't enabled Composio behaves
// exactly as before. Wired in router.go after composiointeg.NewService
// succeeds; the concrete type is *composio.Service.
Composio ComposioOverlayBuilder
analyticsContextMu sync.Mutex
analyticsContextCache map[string]analytics.TaskContext
analyticsContextOrder []string
}
// ComposioOverlayBuilder is the seam TaskService uses to build the per-task
// MCP overlay at enqueue time. Implemented by
// internal/integrations/composio.Service.BuildTaskOverlay; tests provide an
// inline fake so they don't have to spin a fake Composio SDK.
//
// Contract: (nil, nil) means "user has no active connections, do not write
// any overlay onto the task". Any non-nil JSON is the exact value to store
// in agent_task_queue.runtime_mcp_overlay; non-nil error is surfaced to the
// caller but treated as best-effort — failed overlay computation must not
// fail the enqueue.
type ComposioOverlayBuilder interface {
BuildTaskOverlay(ctx context.Context, userID pgtype.UUID) (json.RawMessage, error)
}
type TaskWakeupNotifier interface {
NotifyTaskAvailable(runtimeID, taskID string)
}
@@ -144,6 +166,83 @@ func (s *TaskService) captureTaskQueued(ctx context.Context, task db.AgentTaskQu
}
}
// applyRuntimeMCPOverlay computes the per-task Composio MCP overlay for the
// task's initiator user and writes it to agent_task_queue.runtime_mcp_overlay.
// Called by every Enqueue* path AFTER the task row is inserted and BEFORE
// the daemon is notified, so by the time the claim handler reads the row
// the overlay is in place (or has been deterministically skipped).
//
// Best-effort: any failure here is logged and swallowed. The agent must
// still run with its base agent.mcp_config — losing third-party tools is a
// degraded UX, refusing to enqueue is a worse one.
//
// No-op when:
// - s.Composio is nil — Composio integration not configured (the common
// case for any deployment that hasn't set COMPOSIO_API_KEY).
// - initiatorUserID is not a valid UUID — the task has no attributable
// human initiator (autopilot, system-driven, etc.), so there is no
// "current user's connected apps" to reflect.
// - BuildTaskOverlay returns (nil, nil) — the initiator user has zero
// active connections.
func (s *TaskService) applyRuntimeMCPOverlay(ctx context.Context, taskID, initiatorUserID pgtype.UUID) {
if s == nil || s.Composio == nil {
return
}
if !initiatorUserID.Valid {
return
}
if !taskID.Valid {
return
}
overlay, err := s.Composio.BuildTaskOverlay(ctx, initiatorUserID)
if err != nil {
slog.Warn("runtime mcp overlay: BuildTaskOverlay failed; task will run without composio overlay",
"task_id", util.UUIDToString(taskID),
"initiator_user_id", util.UUIDToString(initiatorUserID),
"error", err,
)
return
}
if len(overlay) == 0 {
return
}
if err := s.Queries.SetAgentTaskRuntimeMCPOverlay(ctx, db.SetAgentTaskRuntimeMCPOverlayParams{
ID: taskID,
RuntimeMcpOverlay: overlay,
}); err != nil {
slog.Warn("runtime mcp overlay: SetAgentTaskRuntimeMCPOverlay failed; task will run without composio overlay",
"task_id", util.UUIDToString(taskID),
"error", err,
)
return
}
slog.Debug("runtime mcp overlay: attached composio session",
"task_id", util.UUIDToString(taskID),
"initiator_user_id", util.UUIDToString(initiatorUserID),
)
}
// resolveInitiatorFromTriggerComment returns the comment author's user UUID
// when the trigger comment was authored by a member, otherwise an invalid
// pgtype.UUID. Used by the issue and mention enqueue paths to feed the
// per-task overlay builder. Agent-authored trigger comments — e.g. a squad
// leader fan-out — are intentionally NOT treated as initiators here: their
// "user-connected apps" view is empty by construction and we want to avoid
// spending a Composio session for a guaranteed-empty overlay.
func (s *TaskService) resolveInitiatorFromTriggerComment(ctx context.Context, commentID pgtype.UUID) pgtype.UUID {
if !commentID.Valid {
return pgtype.UUID{}
}
comment, err := s.Queries.GetComment(ctx, commentID)
if err != nil {
return pgtype.UUID{}
}
if comment.AuthorType != "member" {
return pgtype.UUID{}
}
return comment.AuthorID
}
func (s *TaskService) captureTaskDispatched(ctx context.Context, task db.AgentTaskQueue) {
if s.Metrics != nil {
source, runtimeMode, _ := s.taskMetricsContext(ctx, task)
@@ -500,6 +599,7 @@ func (s *TaskService) enqueueIssueTask(ctx context.Context, issue db.Issue, trig
// queued broadcast afterwards risks the dispatch event reaching clients
// before the queued one (rare but unsafe-by-construction). Publishing
// in the desired observe-order makes correctness independent of timing.
s.applyRuntimeMCPOverlay(ctx, task.ID, s.resolveInitiatorFromTriggerComment(ctx, triggerCommentID))
s.broadcastTaskEvent(ctx, protocol.EventTaskQueued, task)
s.NotifyTaskEnqueued(ctx, task)
return task, nil
@@ -568,6 +668,7 @@ func (s *TaskService) enqueueMentionTask(ctx context.Context, issue db.Issue, ag
slog.Info("mention task enqueued", "task_id", util.UUIDToString(task.ID), "issue_id", util.UUIDToString(issue.ID), "agent_id", util.UUIDToString(agentID), "is_leader_task", isLeader)
// See EnqueueTaskForIssue for ordering rationale.
s.applyRuntimeMCPOverlay(ctx, task.ID, s.resolveInitiatorFromTriggerComment(ctx, triggerCommentID))
s.broadcastTaskEvent(ctx, protocol.EventTaskQueued, task)
s.NotifyTaskEnqueued(ctx, task)
return task, nil
@@ -692,6 +793,7 @@ func (s *TaskService) EnqueueQuickCreateTask(ctx context.Context, workspaceID, r
// cycle. Without this the user perceives "quick create never
// triggered" because the modal closes immediately and the task
// sits in 'queued' until the next sleepWithContextOrWakeup tick.
s.applyRuntimeMCPOverlay(ctx, task.ID, requesterID)
s.NotifyTaskEnqueued(ctx, task)
return task, nil
}
@@ -768,6 +870,7 @@ func (s *TaskService) EnqueueChatTask(ctx context.Context, chatSession db.ChatSe
slog.Info("chat task enqueued", "task_id", util.UUIDToString(task.ID), "chat_session_id", util.UUIDToString(chatSession.ID), "agent_id", util.UUIDToString(chatSession.AgentID))
// See EnqueueTaskForIssue for ordering rationale.
s.applyRuntimeMCPOverlay(ctx, task.ID, initiatorUserID)
s.broadcastTaskEvent(ctx, protocol.EventTaskQueued, task)
s.NotifyTaskEnqueued(ctx, task)
return task, nil
@@ -1649,6 +1752,16 @@ func (s *TaskService) MaybeRetryFailedTask(ctx context.Context, parent db.AgentT
// Retry creates a fresh queued row, same status transition (∅ → queued)
// as EnqueueTaskFor*. Broadcast queued first, then notify the daemon —
// see EnqueueTaskForIssue for ordering rationale.
//
// The parent task's initiator_user_id is the canonical reference here:
// the user behind the original run has not changed and the new row
// inherits the same attribution at the DB layer (see
// agent.sql / CreateRetryTask). NOTE: CreateRetryTask currently does
// NOT copy initiator_user_id forward to the child, so on retry we read
// the parent's value directly to keep the overlay attached. If a future
// schema change starts copying it, this still works (parent vs child
// carry the same value).
s.applyRuntimeMCPOverlay(ctx, child.ID, parent.InitiatorUserID)
s.broadcastTaskEvent(ctx, protocol.EventTaskQueued, child)
s.NotifyTaskEnqueued(ctx, child)
return &child, nil

View File

@@ -0,0 +1,100 @@
package service
import (
"context"
"encoding/json"
"errors"
"testing"
"github.com/jackc/pgx/v5/pgtype"
)
// stubOverlayBuilder records every call so the test can assert which
// branches of TaskService.applyRuntimeMCPOverlay reached the builder
// vs short-circuited before it.
type stubOverlayBuilder struct {
calls int
lastUser pgtype.UUID
resp json.RawMessage
err error
respIsNil bool
}
func (s *stubOverlayBuilder) BuildTaskOverlay(_ context.Context, userID pgtype.UUID) (json.RawMessage, error) {
s.calls++
s.lastUser = userID
if s.err != nil {
return nil, s.err
}
if s.respIsNil {
return nil, nil
}
return s.resp, nil
}
// TestApplyRuntimeMCPOverlay_NoComposioIsNoOp pins the safety-net that
// makes Stage 3 a pure addition for every Multica deployment that hasn't
// enabled Composio: when s.Composio is nil, the helper must not panic
// (Queries can legitimately be nil in unit-test setup) and must not call
// any builder. This is the property that lets every existing enqueue test
// keep passing without instantiating a Composio service.
func TestApplyRuntimeMCPOverlay_NoComposioIsNoOp(t *testing.T) {
t.Parallel()
svc := &TaskService{} // Queries nil on purpose
taskID := pgtype.UUID{Bytes: [16]byte{0x01}, Valid: true}
userID := pgtype.UUID{Bytes: [16]byte{0x02}, Valid: true}
// Should not panic and should not touch Queries.
svc.applyRuntimeMCPOverlay(context.Background(), taskID, userID)
}
// TestApplyRuntimeMCPOverlay_InvalidInitiatorIsNoOp covers the
// no-attributable-human-initiator path (autopilot, system-driven). When
// the initiator UUID is not valid we must NOT call BuildTaskOverlay — we
// must not pay the Composio session cost for a guaranteed-empty overlay.
func TestApplyRuntimeMCPOverlay_InvalidInitiatorIsNoOp(t *testing.T) {
t.Parallel()
builder := &stubOverlayBuilder{}
svc := &TaskService{Composio: builder}
taskID := pgtype.UUID{Bytes: [16]byte{0x03}, Valid: true}
svc.applyRuntimeMCPOverlay(context.Background(), taskID, pgtype.UUID{}) // invalid
if builder.calls != 0 {
t.Errorf("expected 0 builder calls for invalid initiator, got %d", builder.calls)
}
}
// TestApplyRuntimeMCPOverlay_NilOverlaySkipsUpdate — BuildTaskOverlay
// returning (nil, nil) means the user has no active connections. The
// helper must short-circuit before touching Queries so a unit-test setup
// with Queries=nil is safe and so we don't issue a pointless UPDATE
// against the queue table in production.
func TestApplyRuntimeMCPOverlay_NilOverlaySkipsUpdate(t *testing.T) {
t.Parallel()
builder := &stubOverlayBuilder{respIsNil: true}
svc := &TaskService{Composio: builder} // Queries nil on purpose
taskID := pgtype.UUID{Bytes: [16]byte{0x04}, Valid: true}
userID := pgtype.UUID{Bytes: [16]byte{0x05}, Valid: true}
svc.applyRuntimeMCPOverlay(context.Background(), taskID, userID)
if builder.calls != 1 {
t.Errorf("expected exactly 1 builder call, got %d", builder.calls)
}
}
// TestApplyRuntimeMCPOverlay_BuilderErrorSwallowed — best-effort enqueue:
// a builder error (Composio outage etc.) must be logged + swallowed so
// the task still queues. We assert the helper does not panic and does
// not attempt the UPDATE, which would crash with a nil Queries.
func TestApplyRuntimeMCPOverlay_BuilderErrorSwallowed(t *testing.T) {
t.Parallel()
builder := &stubOverlayBuilder{err: errors.New("upstream 503")}
svc := &TaskService{Composio: builder}
taskID := pgtype.UUID{Bytes: [16]byte{0x06}, Valid: true}
userID := pgtype.UUID{Bytes: [16]byte{0x07}, Valid: true}
svc.applyRuntimeMCPOverlay(context.Background(), taskID, userID)
if builder.calls != 1 {
t.Errorf("expected 1 builder call, got %d", builder.calls)
}
}

View File

@@ -0,0 +1,3 @@
DROP TRIGGER IF EXISTS trg_clear_runtime_mcp_overlay ON agent_task_queue;
DROP FUNCTION IF EXISTS clear_runtime_mcp_overlay_on_terminal_state();
ALTER TABLE agent_task_queue DROP COLUMN IF EXISTS runtime_mcp_overlay;

View File

@@ -0,0 +1,53 @@
-- Per-task MCP overlay computed at task enqueue (dispatch) time, layered on
-- top of agent.mcp_config when the daemon prepares the execution environment.
--
-- Stage 3 of the Composio epic (MUL-3721 / MUL-3715): the Composio integration
-- writes a fresh `{"mcpServers": {"composio": {"type": "http", "url": "...",
-- "headers": {"Authorization": "Bearer ..."}}}}` here for every task whose
-- initiator user has at least one active connection. The daemon claim
-- handler merges this on top of agent.mcp_config (overlay wins on name
-- collisions because it carries the live, user-scoped MCP session URL).
--
-- The value is short-lived: it carries a bearer token that becomes useless
-- once the task ends, so we wipe it on every terminal state transition via
-- the trigger below. Worst case a still-active task row keeps the token for
-- the duration of the run, mode-0600 in DB and never logged.
ALTER TABLE agent_task_queue
ADD COLUMN runtime_mcp_overlay JSONB NULL;
COMMENT ON COLUMN agent_task_queue.runtime_mcp_overlay IS
'Per-task MCP servers computed at dispatch time, merged on top of agent.mcp_config. Currently used by Composio integration to inject the initiator user''s session URL. Cleared after task completes via trg_clear_runtime_mcp_overlay.';
-- Auto-clear the overlay the moment the task enters any terminal state, so
-- the row in the queue never keeps a stale bearer beyond the live run. This
-- is a defense-in-depth measure on top of how short the Composio session
-- token already is — clearing here means a future audit of the table never
-- finds bearers attached to rows that finished hours/days ago.
--
-- Trigger-based rather than per-query SET clauses because the terminal
-- transitions live in ~12 different sqlc queries (CompleteAgentTask,
-- FailAgentTask, FailStaleTasks, ExpireStaleQueuedTasks, FailTasksFor
-- OfflineRuntimes, RecoverOrphanedTasksForRuntime, plus six Cancel*
-- variants); a trigger is a single source of truth that future queries
-- can't bypass.
CREATE OR REPLACE FUNCTION clear_runtime_mcp_overlay_on_terminal_state()
RETURNS TRIGGER AS $$
BEGIN
-- Only act on actual transitions INTO a terminal state from a non-terminal
-- one, and only when there is something to wipe. Re-touching an already-
-- terminal row (or a row whose overlay is already NULL) leaves the
-- column unchanged and the trigger is a cheap no-op.
IF NEW.status IN ('completed', 'failed', 'cancelled')
AND OLD.status IS DISTINCT FROM NEW.status
AND NEW.runtime_mcp_overlay IS NOT NULL THEN
NEW.runtime_mcp_overlay := NULL;
END IF;
RETURN NEW;
END;
$$ LANGUAGE plpgsql;
DROP TRIGGER IF EXISTS trg_clear_runtime_mcp_overlay ON agent_task_queue;
CREATE TRIGGER trg_clear_runtime_mcp_overlay
BEFORE UPDATE OF status ON agent_task_queue
FOR EACH ROW
EXECUTE FUNCTION clear_runtime_mcp_overlay_on_terminal_state();

View File

@@ -1,6 +1,6 @@
// Code generated by sqlc. DO NOT EDIT.
// versions:
// sqlc v1.31.1
// sqlc v1.30.0
// source: agent.sql
package db
@@ -178,7 +178,7 @@ const cancelAgentTask = `-- name: CancelAgentTask :one
UPDATE agent_task_queue
SET status = 'cancelled', completed_at = now(), prepare_lease_expires_at = NULL
WHERE id = $1 AND status IN ('queued', 'dispatched', 'running', 'waiting_local_directory')
RETURNING id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, trigger_summary, force_fresh_session, is_leader_task, wait_reason, initiator_user_id, handoff_note, prepare_lease_expires_at, squad_id
RETURNING id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, trigger_summary, force_fresh_session, is_leader_task, wait_reason, initiator_user_id, handoff_note, prepare_lease_expires_at, squad_id, runtime_mcp_overlay
`
func (q *Queries) CancelAgentTask(ctx context.Context, id pgtype.UUID) (AgentTaskQueue, error) {
@@ -215,6 +215,7 @@ func (q *Queries) CancelAgentTask(ctx context.Context, id pgtype.UUID) (AgentTas
&i.HandoffNote,
&i.PrepareLeaseExpiresAt,
&i.SquadID,
&i.RuntimeMcpOverlay,
)
return i, err
}
@@ -223,7 +224,7 @@ const cancelAgentTasksByAgent = `-- name: CancelAgentTasksByAgent :many
UPDATE agent_task_queue
SET status = 'cancelled', completed_at = now(), prepare_lease_expires_at = NULL
WHERE agent_id = $1 AND status IN ('queued', 'dispatched', 'running', 'waiting_local_directory')
RETURNING id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, trigger_summary, force_fresh_session, is_leader_task, wait_reason, initiator_user_id, handoff_note, prepare_lease_expires_at, squad_id
RETURNING id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, trigger_summary, force_fresh_session, is_leader_task, wait_reason, initiator_user_id, handoff_note, prepare_lease_expires_at, squad_id, runtime_mcp_overlay
`
// Bulk-cancel every active (queued/dispatched/running) task for an agent.
@@ -271,6 +272,7 @@ func (q *Queries) CancelAgentTasksByAgent(ctx context.Context, agentID pgtype.UU
&i.HandoffNote,
&i.PrepareLeaseExpiresAt,
&i.SquadID,
&i.RuntimeMcpOverlay,
); err != nil {
return nil, err
}
@@ -286,7 +288,7 @@ const cancelAgentTasksByChatSession = `-- name: CancelAgentTasksByChatSession :m
UPDATE agent_task_queue
SET status = 'cancelled', completed_at = now(), prepare_lease_expires_at = NULL
WHERE chat_session_id = $1 AND status IN ('queued', 'dispatched', 'running', 'waiting_local_directory')
RETURNING id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, trigger_summary, force_fresh_session, is_leader_task, wait_reason, initiator_user_id, handoff_note, prepare_lease_expires_at, squad_id
RETURNING id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, trigger_summary, force_fresh_session, is_leader_task, wait_reason, initiator_user_id, handoff_note, prepare_lease_expires_at, squad_id, runtime_mcp_overlay
`
// Cancels active tasks belonging to a chat session. Called from
@@ -334,6 +336,7 @@ func (q *Queries) CancelAgentTasksByChatSession(ctx context.Context, chatSession
&i.HandoffNote,
&i.PrepareLeaseExpiresAt,
&i.SquadID,
&i.RuntimeMcpOverlay,
); err != nil {
return nil, err
}
@@ -349,7 +352,7 @@ const cancelAgentTasksByIssue = `-- name: CancelAgentTasksByIssue :many
UPDATE agent_task_queue
SET status = 'cancelled', completed_at = now(), prepare_lease_expires_at = NULL
WHERE issue_id = $1 AND status IN ('queued', 'dispatched', 'running', 'waiting_local_directory')
RETURNING id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, trigger_summary, force_fresh_session, is_leader_task, wait_reason, initiator_user_id, handoff_note, prepare_lease_expires_at, squad_id
RETURNING id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, trigger_summary, force_fresh_session, is_leader_task, wait_reason, initiator_user_id, handoff_note, prepare_lease_expires_at, squad_id, runtime_mcp_overlay
`
// Cancels every active task on the issue and returns the affected rows so the
@@ -397,6 +400,7 @@ func (q *Queries) CancelAgentTasksByIssue(ctx context.Context, issueID pgtype.UU
&i.HandoffNote,
&i.PrepareLeaseExpiresAt,
&i.SquadID,
&i.RuntimeMcpOverlay,
); err != nil {
return nil, err
}
@@ -412,7 +416,7 @@ const cancelAgentTasksByIssueAndAgent = `-- name: CancelAgentTasksByIssueAndAgen
UPDATE agent_task_queue
SET status = 'cancelled', completed_at = now(), prepare_lease_expires_at = NULL
WHERE issue_id = $1 AND agent_id = $2 AND status IN ('queued', 'dispatched', 'running', 'waiting_local_directory')
RETURNING id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, trigger_summary, force_fresh_session, is_leader_task, wait_reason, initiator_user_id, handoff_note, prepare_lease_expires_at, squad_id
RETURNING id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, trigger_summary, force_fresh_session, is_leader_task, wait_reason, initiator_user_id, handoff_note, prepare_lease_expires_at, squad_id, runtime_mcp_overlay
`
type CancelAgentTasksByIssueAndAgentParams struct {
@@ -464,6 +468,7 @@ func (q *Queries) CancelAgentTasksByIssueAndAgent(ctx context.Context, arg Cance
&i.HandoffNote,
&i.PrepareLeaseExpiresAt,
&i.SquadID,
&i.RuntimeMcpOverlay,
); err != nil {
return nil, err
}
@@ -479,7 +484,7 @@ const cancelAgentTasksByTriggerComment = `-- name: CancelAgentTasksByTriggerComm
UPDATE agent_task_queue
SET status = 'cancelled', completed_at = now(), prepare_lease_expires_at = NULL
WHERE trigger_comment_id = $1 AND status IN ('queued', 'dispatched', 'running', 'waiting_local_directory')
RETURNING id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, trigger_summary, force_fresh_session, is_leader_task, wait_reason, initiator_user_id, handoff_note, prepare_lease_expires_at, squad_id
RETURNING id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, trigger_summary, force_fresh_session, is_leader_task, wait_reason, initiator_user_id, handoff_note, prepare_lease_expires_at, squad_id, runtime_mcp_overlay
`
// Cancels active tasks whose trigger is the given comment. Called when a
@@ -527,6 +532,7 @@ func (q *Queries) CancelAgentTasksByTriggerComment(ctx context.Context, triggerC
&i.HandoffNote,
&i.PrepareLeaseExpiresAt,
&i.SquadID,
&i.RuntimeMcpOverlay,
); err != nil {
return nil, err
}
@@ -567,7 +573,7 @@ WHERE id = (
LIMIT 1
FOR UPDATE SKIP LOCKED
)
RETURNING id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, trigger_summary, force_fresh_session, is_leader_task, wait_reason, initiator_user_id, handoff_note, prepare_lease_expires_at, squad_id
RETURNING id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, trigger_summary, force_fresh_session, is_leader_task, wait_reason, initiator_user_id, handoff_note, prepare_lease_expires_at, squad_id, runtime_mcp_overlay
`
type ClaimAgentTaskParams struct {
@@ -618,6 +624,7 @@ func (q *Queries) ClaimAgentTask(ctx context.Context, arg ClaimAgentTaskParams)
&i.HandoffNote,
&i.PrepareLeaseExpiresAt,
&i.SquadID,
&i.RuntimeMcpOverlay,
)
return i, err
}
@@ -701,7 +708,7 @@ const completeAgentTask = `-- name: CompleteAgentTask :one
UPDATE agent_task_queue
SET status = 'completed', completed_at = now(), result = $2, session_id = $3, work_dir = $4, prepare_lease_expires_at = NULL
WHERE id = $1 AND status = 'running'
RETURNING id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, trigger_summary, force_fresh_session, is_leader_task, wait_reason, initiator_user_id, handoff_note, prepare_lease_expires_at, squad_id
RETURNING id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, trigger_summary, force_fresh_session, is_leader_task, wait_reason, initiator_user_id, handoff_note, prepare_lease_expires_at, squad_id, runtime_mcp_overlay
`
type CompleteAgentTaskParams struct {
@@ -750,6 +757,7 @@ func (q *Queries) CompleteAgentTask(ctx context.Context, arg CompleteAgentTaskPa
&i.HandoffNote,
&i.PrepareLeaseExpiresAt,
&i.SquadID,
&i.RuntimeMcpOverlay,
)
return i, err
}
@@ -855,7 +863,7 @@ VALUES (
$9,
$10
)
RETURNING id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, trigger_summary, force_fresh_session, is_leader_task, wait_reason, initiator_user_id, handoff_note, prepare_lease_expires_at, squad_id
RETURNING id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, trigger_summary, force_fresh_session, is_leader_task, wait_reason, initiator_user_id, handoff_note, prepare_lease_expires_at, squad_id, runtime_mcp_overlay
`
type CreateAgentTaskParams struct {
@@ -916,6 +924,7 @@ func (q *Queries) CreateAgentTask(ctx context.Context, arg CreateAgentTaskParams
&i.HandoffNote,
&i.PrepareLeaseExpiresAt,
&i.SquadID,
&i.RuntimeMcpOverlay,
)
return i, err
}
@@ -923,7 +932,7 @@ func (q *Queries) CreateAgentTask(ctx context.Context, arg CreateAgentTaskParams
const createQuickCreateTask = `-- name: CreateQuickCreateTask :one
INSERT INTO agent_task_queue (agent_id, runtime_id, issue_id, status, priority, context)
VALUES ($1, $2, NULL, 'queued', $3, $4)
RETURNING id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, trigger_summary, force_fresh_session, is_leader_task, wait_reason, initiator_user_id, handoff_note, prepare_lease_expires_at, squad_id
RETURNING id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, trigger_summary, force_fresh_session, is_leader_task, wait_reason, initiator_user_id, handoff_note, prepare_lease_expires_at, squad_id, runtime_mcp_overlay
`
type CreateQuickCreateTaskParams struct {
@@ -975,6 +984,7 @@ func (q *Queries) CreateQuickCreateTask(ctx context.Context, arg CreateQuickCrea
&i.HandoffNote,
&i.PrepareLeaseExpiresAt,
&i.SquadID,
&i.RuntimeMcpOverlay,
)
return i, err
}
@@ -998,7 +1008,7 @@ SELECT
p.squad_id
FROM agent_task_queue p
WHERE p.id = $1
RETURNING id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, trigger_summary, force_fresh_session, is_leader_task, wait_reason, initiator_user_id, handoff_note, prepare_lease_expires_at, squad_id
RETURNING id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, trigger_summary, force_fresh_session, is_leader_task, wait_reason, initiator_user_id, handoff_note, prepare_lease_expires_at, squad_id, runtime_mcp_overlay
`
// Clones a parent task into a fresh queued attempt. Carries forward the
@@ -1046,6 +1056,7 @@ func (q *Queries) CreateRetryTask(ctx context.Context, id pgtype.UUID) (AgentTas
&i.HandoffNote,
&i.PrepareLeaseExpiresAt,
&i.SquadID,
&i.RuntimeMcpOverlay,
)
return i, err
}
@@ -1069,7 +1080,7 @@ FROM victims v
WHERE t.id = v.id
AND t.status = 'queued'
AND t.created_at < now() - make_interval(secs => $1::double precision)
RETURNING t.id, t.agent_id, t.issue_id, t.status, t.priority, t.dispatched_at, t.started_at, t.completed_at, t.result, t.error, t.created_at, t.context, t.runtime_id, t.session_id, t.work_dir, t.trigger_comment_id, t.chat_session_id, t.autopilot_run_id, t.attempt, t.max_attempts, t.parent_task_id, t.failure_reason, t.trigger_summary, t.force_fresh_session, t.is_leader_task, t.wait_reason, t.initiator_user_id, t.handoff_note, t.prepare_lease_expires_at, t.squad_id
RETURNING t.id, t.agent_id, t.issue_id, t.status, t.priority, t.dispatched_at, t.started_at, t.completed_at, t.result, t.error, t.created_at, t.context, t.runtime_id, t.session_id, t.work_dir, t.trigger_comment_id, t.chat_session_id, t.autopilot_run_id, t.attempt, t.max_attempts, t.parent_task_id, t.failure_reason, t.trigger_summary, t.force_fresh_session, t.is_leader_task, t.wait_reason, t.initiator_user_id, t.handoff_note, t.prepare_lease_expires_at, t.squad_id, t.runtime_mcp_overlay
`
type ExpireStaleQueuedTasksParams struct {
@@ -1140,6 +1151,7 @@ func (q *Queries) ExpireStaleQueuedTasks(ctx context.Context, arg ExpireStaleQue
&i.HandoffNote,
&i.PrepareLeaseExpiresAt,
&i.SquadID,
&i.RuntimeMcpOverlay,
); err != nil {
return nil, err
}
@@ -1158,7 +1170,7 @@ WHERE id = $1
AND runtime_id = $2
AND status IN ('dispatched', 'waiting_local_directory')
AND started_at IS NULL
RETURNING id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, trigger_summary, force_fresh_session, is_leader_task, wait_reason, initiator_user_id, handoff_note, prepare_lease_expires_at, squad_id
RETURNING id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, trigger_summary, force_fresh_session, is_leader_task, wait_reason, initiator_user_id, handoff_note, prepare_lease_expires_at, squad_id, runtime_mcp_overlay
`
type ExtendAgentTaskPrepareLeaseParams struct {
@@ -1205,6 +1217,7 @@ func (q *Queries) ExtendAgentTaskPrepareLease(ctx context.Context, arg ExtendAge
&i.HandoffNote,
&i.PrepareLeaseExpiresAt,
&i.SquadID,
&i.RuntimeMcpOverlay,
)
return i, err
}
@@ -1219,7 +1232,7 @@ SET status = 'failed',
work_dir = COALESCE($5, work_dir),
prepare_lease_expires_at = NULL
WHERE id = $1 AND status IN ('dispatched', 'running', 'waiting_local_directory')
RETURNING id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, trigger_summary, force_fresh_session, is_leader_task, wait_reason, initiator_user_id, handoff_note, prepare_lease_expires_at, squad_id
RETURNING id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, trigger_summary, force_fresh_session, is_leader_task, wait_reason, initiator_user_id, handoff_note, prepare_lease_expires_at, squad_id, runtime_mcp_overlay
`
type FailAgentTaskParams struct {
@@ -1279,6 +1292,7 @@ func (q *Queries) FailAgentTask(ctx context.Context, arg FailAgentTaskParams) (A
&i.HandoffNote,
&i.PrepareLeaseExpiresAt,
&i.SquadID,
&i.RuntimeMcpOverlay,
)
return i, err
}
@@ -1294,7 +1308,7 @@ WHERE (
AND (prepare_lease_expires_at IS NULL OR prepare_lease_expires_at < now())
)
OR (status = 'running' AND started_at < now() - make_interval(secs => $2::double precision))
RETURNING id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, trigger_summary, force_fresh_session, is_leader_task, wait_reason, initiator_user_id, handoff_note, prepare_lease_expires_at, squad_id
RETURNING id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, trigger_summary, force_fresh_session, is_leader_task, wait_reason, initiator_user_id, handoff_note, prepare_lease_expires_at, squad_id, runtime_mcp_overlay
`
type FailStaleTasksParams struct {
@@ -1353,6 +1367,7 @@ func (q *Queries) FailStaleTasks(ctx context.Context, arg FailStaleTasksParams)
&i.HandoffNote,
&i.PrepareLeaseExpiresAt,
&i.SquadID,
&i.RuntimeMcpOverlay,
); err != nil {
return nil, err
}
@@ -1476,7 +1491,7 @@ func (q *Queries) GetAgentInWorkspace(ctx context.Context, arg GetAgentInWorkspa
}
const getAgentTask = `-- name: GetAgentTask :one
SELECT id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, trigger_summary, force_fresh_session, is_leader_task, wait_reason, initiator_user_id, handoff_note, prepare_lease_expires_at, squad_id FROM agent_task_queue
SELECT id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, trigger_summary, force_fresh_session, is_leader_task, wait_reason, initiator_user_id, handoff_note, prepare_lease_expires_at, squad_id, runtime_mcp_overlay FROM agent_task_queue
WHERE id = $1
`
@@ -1514,12 +1529,13 @@ func (q *Queries) GetAgentTask(ctx context.Context, id pgtype.UUID) (AgentTaskQu
&i.HandoffNote,
&i.PrepareLeaseExpiresAt,
&i.SquadID,
&i.RuntimeMcpOverlay,
)
return i, err
}
const getAgentTaskInWorkspace = `-- name: GetAgentTaskInWorkspace :one
SELECT atq.id, atq.agent_id, atq.issue_id, atq.status, atq.priority, atq.dispatched_at, atq.started_at, atq.completed_at, atq.result, atq.error, atq.created_at, atq.context, atq.runtime_id, atq.session_id, atq.work_dir, atq.trigger_comment_id, atq.chat_session_id, atq.autopilot_run_id, atq.attempt, atq.max_attempts, atq.parent_task_id, atq.failure_reason, atq.trigger_summary, atq.force_fresh_session, atq.is_leader_task, atq.wait_reason, atq.initiator_user_id, atq.handoff_note, atq.prepare_lease_expires_at, atq.squad_id FROM agent_task_queue atq
SELECT atq.id, atq.agent_id, atq.issue_id, atq.status, atq.priority, atq.dispatched_at, atq.started_at, atq.completed_at, atq.result, atq.error, atq.created_at, atq.context, atq.runtime_id, atq.session_id, atq.work_dir, atq.trigger_comment_id, atq.chat_session_id, atq.autopilot_run_id, atq.attempt, atq.max_attempts, atq.parent_task_id, atq.failure_reason, atq.trigger_summary, atq.force_fresh_session, atq.is_leader_task, atq.wait_reason, atq.initiator_user_id, atq.handoff_note, atq.prepare_lease_expires_at, atq.squad_id, atq.runtime_mcp_overlay FROM agent_task_queue atq
JOIN agent a ON a.id = atq.agent_id
WHERE atq.id = $1 AND a.workspace_id = $2
`
@@ -1570,6 +1586,7 @@ func (q *Queries) GetAgentTaskInWorkspace(ctx context.Context, arg GetAgentTaskI
&i.HandoffNote,
&i.PrepareLeaseExpiresAt,
&i.SquadID,
&i.RuntimeMcpOverlay,
)
return i, err
}
@@ -1999,7 +2016,7 @@ func (q *Queries) ListActiveAgentsByRuntimeForUpdate(ctx context.Context, runtim
}
const listActiveTasksByIssue = `-- name: ListActiveTasksByIssue :many
SELECT id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, trigger_summary, force_fresh_session, is_leader_task, wait_reason, initiator_user_id, handoff_note, prepare_lease_expires_at, squad_id FROM agent_task_queue
SELECT id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, trigger_summary, force_fresh_session, is_leader_task, wait_reason, initiator_user_id, handoff_note, prepare_lease_expires_at, squad_id, runtime_mcp_overlay FROM agent_task_queue
WHERE issue_id = $1 AND status IN ('queued', 'dispatched', 'running', 'waiting_local_directory')
ORDER BY created_at DESC
`
@@ -2049,6 +2066,7 @@ func (q *Queries) ListActiveTasksByIssue(ctx context.Context, issueID pgtype.UUI
&i.HandoffNote,
&i.PrepareLeaseExpiresAt,
&i.SquadID,
&i.RuntimeMcpOverlay,
); err != nil {
return nil, err
}
@@ -2061,7 +2079,7 @@ func (q *Queries) ListActiveTasksByIssue(ctx context.Context, issueID pgtype.UUI
}
const listAgentTasks = `-- name: ListAgentTasks :many
SELECT id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, trigger_summary, force_fresh_session, is_leader_task, wait_reason, initiator_user_id, handoff_note, prepare_lease_expires_at, squad_id FROM agent_task_queue
SELECT id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, trigger_summary, force_fresh_session, is_leader_task, wait_reason, initiator_user_id, handoff_note, prepare_lease_expires_at, squad_id, runtime_mcp_overlay FROM agent_task_queue
WHERE agent_id = $1
ORDER BY created_at DESC
`
@@ -2106,6 +2124,7 @@ func (q *Queries) ListAgentTasks(ctx context.Context, agentID pgtype.UUID) ([]Ag
&i.HandoffNote,
&i.PrepareLeaseExpiresAt,
&i.SquadID,
&i.RuntimeMcpOverlay,
); err != nil {
return nil, err
}
@@ -2216,7 +2235,7 @@ func (q *Queries) ListAllAgents(ctx context.Context, workspaceID pgtype.UUID) ([
}
const listPendingTasksByRuntime = `-- name: ListPendingTasksByRuntime :many
SELECT id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, trigger_summary, force_fresh_session, is_leader_task, wait_reason, initiator_user_id, handoff_note, prepare_lease_expires_at, squad_id FROM agent_task_queue
SELECT id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, trigger_summary, force_fresh_session, is_leader_task, wait_reason, initiator_user_id, handoff_note, prepare_lease_expires_at, squad_id, runtime_mcp_overlay FROM agent_task_queue
WHERE runtime_id = $1 AND status IN ('queued', 'dispatched')
ORDER BY priority DESC, created_at ASC
`
@@ -2261,6 +2280,7 @@ func (q *Queries) ListPendingTasksByRuntime(ctx context.Context, runtimeID pgtyp
&i.HandoffNote,
&i.PrepareLeaseExpiresAt,
&i.SquadID,
&i.RuntimeMcpOverlay,
); err != nil {
return nil, err
}
@@ -2273,7 +2293,7 @@ func (q *Queries) ListPendingTasksByRuntime(ctx context.Context, runtimeID pgtyp
}
const listQueuedClaimCandidatesByRuntime = `-- name: ListQueuedClaimCandidatesByRuntime :many
SELECT id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, trigger_summary, force_fresh_session, is_leader_task, wait_reason, initiator_user_id, handoff_note, prepare_lease_expires_at, squad_id FROM agent_task_queue
SELECT id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, trigger_summary, force_fresh_session, is_leader_task, wait_reason, initiator_user_id, handoff_note, prepare_lease_expires_at, squad_id, runtime_mcp_overlay FROM agent_task_queue
WHERE runtime_id = $1 AND status = 'queued'
ORDER BY priority DESC, created_at ASC
`
@@ -2326,6 +2346,7 @@ func (q *Queries) ListQueuedClaimCandidatesByRuntime(ctx context.Context, runtim
&i.HandoffNote,
&i.PrepareLeaseExpiresAt,
&i.SquadID,
&i.RuntimeMcpOverlay,
); err != nil {
return nil, err
}
@@ -2338,7 +2359,7 @@ func (q *Queries) ListQueuedClaimCandidatesByRuntime(ctx context.Context, runtim
}
const listTasksByIssue = `-- name: ListTasksByIssue :many
SELECT id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, trigger_summary, force_fresh_session, is_leader_task, wait_reason, initiator_user_id, handoff_note, prepare_lease_expires_at, squad_id FROM agent_task_queue
SELECT id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, trigger_summary, force_fresh_session, is_leader_task, wait_reason, initiator_user_id, handoff_note, prepare_lease_expires_at, squad_id, runtime_mcp_overlay FROM agent_task_queue
WHERE issue_id = $1
ORDER BY created_at DESC
`
@@ -2383,6 +2404,7 @@ func (q *Queries) ListTasksByIssue(ctx context.Context, issueID pgtype.UUID) ([]
&i.HandoffNote,
&i.PrepareLeaseExpiresAt,
&i.SquadID,
&i.RuntimeMcpOverlay,
); err != nil {
return nil, err
}
@@ -2395,15 +2417,15 @@ func (q *Queries) ListTasksByIssue(ctx context.Context, issueID pgtype.UUID) ([]
}
const listWorkspaceAgentTaskSnapshot = `-- name: ListWorkspaceAgentTaskSnapshot :many
SELECT atq.id, atq.agent_id, atq.issue_id, atq.status, atq.priority, atq.dispatched_at, atq.started_at, atq.completed_at, atq.result, atq.error, atq.created_at, atq.context, atq.runtime_id, atq.session_id, atq.work_dir, atq.trigger_comment_id, atq.chat_session_id, atq.autopilot_run_id, atq.attempt, atq.max_attempts, atq.parent_task_id, atq.failure_reason, atq.trigger_summary, atq.force_fresh_session, atq.is_leader_task, atq.wait_reason, atq.initiator_user_id, atq.handoff_note, atq.prepare_lease_expires_at, atq.squad_id FROM agent_task_queue atq
SELECT atq.id, atq.agent_id, atq.issue_id, atq.status, atq.priority, atq.dispatched_at, atq.started_at, atq.completed_at, atq.result, atq.error, atq.created_at, atq.context, atq.runtime_id, atq.session_id, atq.work_dir, atq.trigger_comment_id, atq.chat_session_id, atq.autopilot_run_id, atq.attempt, atq.max_attempts, atq.parent_task_id, atq.failure_reason, atq.trigger_summary, atq.force_fresh_session, atq.is_leader_task, atq.wait_reason, atq.initiator_user_id, atq.handoff_note, atq.prepare_lease_expires_at, atq.squad_id, atq.runtime_mcp_overlay FROM agent_task_queue atq
JOIN agent a ON a.id = atq.agent_id
WHERE a.workspace_id = $1
AND atq.status IN ('queued', 'dispatched', 'running', 'waiting_local_directory')
UNION ALL
SELECT t.id, t.agent_id, t.issue_id, t.status, t.priority, t.dispatched_at, t.started_at, t.completed_at, t.result, t.error, t.created_at, t.context, t.runtime_id, t.session_id, t.work_dir, t.trigger_comment_id, t.chat_session_id, t.autopilot_run_id, t.attempt, t.max_attempts, t.parent_task_id, t.failure_reason, t.trigger_summary, t.force_fresh_session, t.is_leader_task, t.wait_reason, t.initiator_user_id, t.handoff_note, t.prepare_lease_expires_at, t.squad_id FROM (
SELECT DISTINCT ON (atq.agent_id) atq.id, atq.agent_id, atq.issue_id, atq.status, atq.priority, atq.dispatched_at, atq.started_at, atq.completed_at, atq.result, atq.error, atq.created_at, atq.context, atq.runtime_id, atq.session_id, atq.work_dir, atq.trigger_comment_id, atq.chat_session_id, atq.autopilot_run_id, atq.attempt, atq.max_attempts, atq.parent_task_id, atq.failure_reason, atq.trigger_summary, atq.force_fresh_session, atq.is_leader_task, atq.wait_reason, atq.initiator_user_id, atq.handoff_note, atq.prepare_lease_expires_at, atq.squad_id
SELECT t.id, t.agent_id, t.issue_id, t.status, t.priority, t.dispatched_at, t.started_at, t.completed_at, t.result, t.error, t.created_at, t.context, t.runtime_id, t.session_id, t.work_dir, t.trigger_comment_id, t.chat_session_id, t.autopilot_run_id, t.attempt, t.max_attempts, t.parent_task_id, t.failure_reason, t.trigger_summary, t.force_fresh_session, t.is_leader_task, t.wait_reason, t.initiator_user_id, t.handoff_note, t.prepare_lease_expires_at, t.squad_id, t.runtime_mcp_overlay FROM (
SELECT DISTINCT ON (atq.agent_id) atq.id, atq.agent_id, atq.issue_id, atq.status, atq.priority, atq.dispatched_at, atq.started_at, atq.completed_at, atq.result, atq.error, atq.created_at, atq.context, atq.runtime_id, atq.session_id, atq.work_dir, atq.trigger_comment_id, atq.chat_session_id, atq.autopilot_run_id, atq.attempt, atq.max_attempts, atq.parent_task_id, atq.failure_reason, atq.trigger_summary, atq.force_fresh_session, atq.is_leader_task, atq.wait_reason, atq.initiator_user_id, atq.handoff_note, atq.prepare_lease_expires_at, atq.squad_id, atq.runtime_mcp_overlay
FROM agent_task_queue atq
JOIN agent a ON a.id = atq.agent_id
WHERE a.workspace_id = $1
@@ -2470,6 +2492,7 @@ func (q *Queries) ListWorkspaceAgentTaskSnapshot(ctx context.Context, workspaceI
&i.HandoffNote,
&i.PrepareLeaseExpiresAt,
&i.SquadID,
&i.RuntimeMcpOverlay,
); err != nil {
return nil, err
}
@@ -2487,7 +2510,7 @@ SET status = 'waiting_local_directory',
wait_reason = $2,
prepare_lease_expires_at = now() + make_interval(secs => $3::double precision)
WHERE id = $1 AND status = 'dispatched'
RETURNING id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, trigger_summary, force_fresh_session, is_leader_task, wait_reason, initiator_user_id, handoff_note, prepare_lease_expires_at, squad_id
RETURNING id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, trigger_summary, force_fresh_session, is_leader_task, wait_reason, initiator_user_id, handoff_note, prepare_lease_expires_at, squad_id, runtime_mcp_overlay
`
type MarkAgentTaskWaitingLocalDirectoryParams struct {
@@ -2539,6 +2562,7 @@ func (q *Queries) MarkAgentTaskWaitingLocalDirectory(ctx context.Context, arg Ma
&i.HandoffNote,
&i.PrepareLeaseExpiresAt,
&i.SquadID,
&i.RuntimeMcpOverlay,
)
return i, err
}
@@ -2558,7 +2582,7 @@ WHERE id = (
LIMIT 1
FOR UPDATE SKIP LOCKED
)
RETURNING id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, trigger_summary, force_fresh_session, is_leader_task, wait_reason, initiator_user_id, handoff_note, prepare_lease_expires_at, squad_id
RETURNING id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, trigger_summary, force_fresh_session, is_leader_task, wait_reason, initiator_user_id, handoff_note, prepare_lease_expires_at, squad_id, runtime_mcp_overlay
`
type ReclaimStaleDispatchedTaskForRuntimeParams struct {
@@ -2606,6 +2630,7 @@ func (q *Queries) ReclaimStaleDispatchedTaskForRuntime(ctx context.Context, arg
&i.HandoffNote,
&i.PrepareLeaseExpiresAt,
&i.SquadID,
&i.RuntimeMcpOverlay,
)
return i, err
}
@@ -2619,7 +2644,7 @@ SET status = 'failed',
wait_reason = NULL,
prepare_lease_expires_at = NULL
WHERE runtime_id = $1 AND status IN ('dispatched', 'running', 'waiting_local_directory')
RETURNING id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, trigger_summary, force_fresh_session, is_leader_task, wait_reason, initiator_user_id, handoff_note, prepare_lease_expires_at, squad_id
RETURNING id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, trigger_summary, force_fresh_session, is_leader_task, wait_reason, initiator_user_id, handoff_note, prepare_lease_expires_at, squad_id, runtime_mcp_overlay
`
// Called by the daemon at startup. Atomically fails any dispatched/running/
@@ -2668,6 +2693,7 @@ func (q *Queries) RecoverOrphanedTasksForRuntime(ctx context.Context, runtimeID
&i.HandoffNote,
&i.PrepareLeaseExpiresAt,
&i.SquadID,
&i.RuntimeMcpOverlay,
); err != nil {
return nil, err
}
@@ -2756,6 +2782,29 @@ func (q *Queries) RestoreAgent(ctx context.Context, id pgtype.UUID) (Agent, erro
return i, err
}
const setAgentTaskRuntimeMCPOverlay = `-- name: SetAgentTaskRuntimeMCPOverlay :exec
UPDATE agent_task_queue
SET runtime_mcp_overlay = $2
WHERE id = $1 AND status = 'queued'
`
type SetAgentTaskRuntimeMCPOverlayParams struct {
ID pgtype.UUID `json:"id"`
RuntimeMcpOverlay []byte `json:"runtime_mcp_overlay"`
}
// Attaches the per-task MCP overlay (computed at dispatch time from the
// initiator user's active integrations — currently Composio) to a task
// still in 'queued'. The status guard means a task that already started
// (or was cancelled / failed between Create and this update) is never
// rewritten, so the daemon never reads a half-applied overlay. The trigger
// trg_clear_runtime_mcp_overlay automatically wipes the column whenever a
// task enters a terminal state, so callers do not need a paired clear.
func (q *Queries) SetAgentTaskRuntimeMCPOverlay(ctx context.Context, arg SetAgentTaskRuntimeMCPOverlayParams) error {
_, err := q.db.Exec(ctx, setAgentTaskRuntimeMCPOverlay, arg.ID, arg.RuntimeMcpOverlay)
return err
}
const startAgentTask = `-- name: StartAgentTask :one
UPDATE agent_task_queue
SET status = 'running',
@@ -2763,7 +2812,7 @@ SET status = 'running',
wait_reason = NULL,
prepare_lease_expires_at = NULL
WHERE id = $1 AND status IN ('dispatched', 'waiting_local_directory')
RETURNING id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, trigger_summary, force_fresh_session, is_leader_task, wait_reason, initiator_user_id, handoff_note, prepare_lease_expires_at, squad_id
RETURNING id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, trigger_summary, force_fresh_session, is_leader_task, wait_reason, initiator_user_id, handoff_note, prepare_lease_expires_at, squad_id, runtime_mcp_overlay
`
// Transitions a task to running. Accepts either 'dispatched' (the normal
@@ -2806,6 +2855,7 @@ func (q *Queries) StartAgentTask(ctx context.Context, id pgtype.UUID) (AgentTask
&i.HandoffNote,
&i.PrepareLeaseExpiresAt,
&i.SquadID,
&i.RuntimeMcpOverlay,
)
return i, err
}

View File

@@ -1,6 +1,6 @@
// Code generated by sqlc. DO NOT EDIT.
// versions:
// sqlc v1.31.1
// sqlc v1.30.0
package db
@@ -102,6 +102,8 @@ type AgentTaskQueue struct {
HandoffNote pgtype.Text `json:"handoff_note"`
PrepareLeaseExpiresAt pgtype.Timestamptz `json:"prepare_lease_expires_at"`
SquadID pgtype.UUID `json:"squad_id"`
// Per-task MCP servers computed at dispatch time, merged on top of agent.mcp_config. Currently used by Composio integration to inject the initiator user's session URL. Cleared after task completes via trg_clear_runtime_mcp_overlay.
RuntimeMcpOverlay []byte `json:"runtime_mcp_overlay"`
}
type Attachment struct {

View File

@@ -167,6 +167,18 @@ UPDATE agent_task_queue
SET issue_id = $2
WHERE id = $1 AND issue_id IS NULL;
-- name: SetAgentTaskRuntimeMCPOverlay :exec
-- Attaches the per-task MCP overlay (computed at dispatch time from the
-- initiator user's active integrations — currently Composio) to a task
-- still in 'queued'. The status guard means a task that already started
-- (or was cancelled / failed between Create and this update) is never
-- rewritten, so the daemon never reads a half-applied overlay. The trigger
-- trg_clear_runtime_mcp_overlay automatically wipes the column whenever a
-- task enters a terminal state, so callers do not need a paired clear.
UPDATE agent_task_queue
SET runtime_mcp_overlay = $2
WHERE id = $1 AND status = 'queued';
-- name: CreateRetryTask :one
-- Clones a parent task into a fresh queued attempt. Carries forward the
-- agent's resume context (session_id/work_dir) so the child can continue