mirror of
https://github.com/multica-ai/multica.git
synced 2026-06-30 02:51:19 +02:00
Compare commits
1 Commits
feature/co
...
feature/co
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
21cd617d16 |
@@ -1,68 +0,0 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"io"
|
||||
"net/http"
|
||||
"testing"
|
||||
)
|
||||
|
||||
// TestComposioCallbackIsPublic_NoCookieNot401 locks in the MUL-3843 fix: the
|
||||
// Composio OAuth callback must live OUTSIDE the Auth middleware group, because
|
||||
// Composio 302-redirects the user's browser to it and the cookie session is
|
||||
// frequently absent (expired session, SameSite=Strict / Safari ITP, private
|
||||
// window, self-hosted callback subdomain). Before the fix the route sat under
|
||||
// Auth, so a cookie-less browser got a hard 401 and a JSON blob instead of the
|
||||
// settings redirect — the exact symptom Yushen hit.
|
||||
//
|
||||
// With no COMPOSIO_API_KEY configured in the test env, h.Composio == nil, so a
|
||||
// cookie-less hit on the callback now reaches the handler and returns 503
|
||||
// ("not configured") rather than being short-circuited to 401 by the Auth
|
||||
// middleware. The precise non-401 code is incidental; what this test pins is
|
||||
// that the request is NOT rejected by auth.
|
||||
func TestComposioCallbackIsPublic_NoCookieNot401(t *testing.T) {
|
||||
// Deliberately send NO Authorization header / cookie — simulate the
|
||||
// cookie-stripped browser redirect coming back from Composio.
|
||||
resp, err := http.Get(testServer.URL + "/api/integrations/composio/callback?state=bogus&status=success&connected_account_id=ca_x")
|
||||
if err != nil {
|
||||
t.Fatalf("callback request failed: %v", err)
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
body, _ := io.ReadAll(resp.Body)
|
||||
|
||||
if resp.StatusCode == http.StatusUnauthorized {
|
||||
t.Fatalf("callback returned 401 without a session — it is still behind the Auth group (regression of MUL-3843). body=%s", body)
|
||||
}
|
||||
}
|
||||
|
||||
// TestComposioNonCallbackEndpointsStayGated is the other half of the invariant:
|
||||
// moving the callback out of the Auth group must NOT loosen the four
|
||||
// session-scoped endpoints. A cookie-less request to them must still 401.
|
||||
func TestComposioNonCallbackEndpointsStayGated(t *testing.T) {
|
||||
gated := []struct {
|
||||
method string
|
||||
path string
|
||||
}{
|
||||
{http.MethodPost, "/api/integrations/composio/connect/init"},
|
||||
{http.MethodGet, "/api/integrations/composio/toolkits"},
|
||||
{http.MethodGet, "/api/integrations/composio/connections"},
|
||||
{http.MethodDelete, "/api/integrations/composio/connections/11111111-1111-1111-1111-111111111111"},
|
||||
}
|
||||
for _, tc := range gated {
|
||||
t.Run(tc.method+" "+tc.path, func(t *testing.T) {
|
||||
req, err := http.NewRequest(tc.method, testServer.URL+tc.path, nil)
|
||||
if err != nil {
|
||||
t.Fatalf("build request: %v", err)
|
||||
}
|
||||
resp, err := http.DefaultClient.Do(req)
|
||||
if err != nil {
|
||||
t.Fatalf("request failed: %v", err)
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
io.Copy(io.Discard, resp.Body)
|
||||
|
||||
if resp.StatusCode != http.StatusUnauthorized {
|
||||
t.Fatalf("expected 401 without a session, got %d — endpoint is no longer auth-gated", resp.StatusCode)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
@@ -471,6 +471,17 @@ func NewRouterWithOptions(pool *pgxpool.Pool, hub *realtime.Hub, bus *events.Bus
|
||||
slog.Error("composio: service init failed; composio integration disabled", "error", serr)
|
||||
} else {
|
||||
h.Composio = svc
|
||||
// Stage 3 (MUL-3721) hook: feed the per-task MCP
|
||||
// overlay builder into TaskService so every Enqueue*
|
||||
// path attaches the initiator user's Composio session
|
||||
// URL to the task row before the daemon claims it.
|
||||
// taskSvc already exists by this point — it was
|
||||
// constructed inside NewHandler — and exposes its
|
||||
// Composio field for exactly this kind of late wiring,
|
||||
// so no Handler-level mutation is needed.
|
||||
if h.TaskService != nil {
|
||||
h.TaskService.Composio = svc
|
||||
}
|
||||
slog.Info("composio integration enabled")
|
||||
}
|
||||
}
|
||||
@@ -612,18 +623,6 @@ func NewRouterWithOptions(pool *pgxpool.Pool, hub *realtime.Hub, bus *events.Bus
|
||||
// HandleCloudBillingStripeWebhook for the rationale).
|
||||
r.Post("/api/webhooks/stripe", h.HandleCloudBillingStripeWebhook)
|
||||
|
||||
// Composio OAuth callback (MUL-3843). NOT under the Auth group on purpose:
|
||||
// Composio 302-redirects the user's browser here at the end of the OAuth
|
||||
// flow, and the cookie session is frequently absent (expired session,
|
||||
// SameSite=Strict / Safari ITP stripping cross-site cookies, private
|
||||
// windows, self-hosted callbacks on a different subdomain). Identity is NOT
|
||||
// taken from the session — it comes from the HMAC-signed `state` query
|
||||
// param, which CompleteCallback verifies (signature, expiry, replay) before
|
||||
// doing anything. h.Composio == nil still returns 503. Keeping it inside the
|
||||
// Auth group made a missing cookie a hard 401, breaking the flow for exactly
|
||||
// the browsers above; the other four composio endpoints stay session-gated.
|
||||
r.Get("/api/integrations/composio/callback", h.ComposioCallback)
|
||||
|
||||
// Daemon API routes (require daemon token or valid user token)
|
||||
r.Route("/api/daemon", func(r chi.Router) {
|
||||
r.Use(middleware.DaemonAuth(queries, patCache, daemonTokenCache, cloudPATVerifier))
|
||||
@@ -782,12 +781,12 @@ func NewRouterWithOptions(pool *pgxpool.Pool, hub *realtime.Hub, bus *events.Bus
|
||||
r.Post("/api/lark/binding/redeem", h.RedeemLarkBindingToken)
|
||||
|
||||
// Composio integration (MUL-3720). User-scoped (no workspace context):
|
||||
// a connection belongs to a user. These four require a logged-in
|
||||
// session; the OAuth callback is the outlier and lives outside the Auth
|
||||
// group (registered above with the other public OAuth/webhook routes —
|
||||
// see MUL-3843). All return 503 when COMPOSIO_API_KEY is unset.
|
||||
// a connection belongs to a user. The callback is a GET so it works as
|
||||
// a top-level browser redirect under cookie auth (no CSRF gate on GET).
|
||||
// All four return 503 when COMPOSIO_API_KEY is unset.
|
||||
r.Route("/api/integrations/composio", func(r chi.Router) {
|
||||
r.Post("/connect/init", h.ComposioConnectInit)
|
||||
r.Get("/callback", h.ComposioCallback)
|
||||
r.Get("/toolkits", h.ListComposioToolkits)
|
||||
r.Get("/connections", h.ListComposioConnections)
|
||||
r.Delete("/connections/{id}", h.DeleteComposioConnection)
|
||||
|
||||
@@ -1304,6 +1304,19 @@ func (h *Handler) ClaimTaskByRuntime(w http.ResponseWriter, r *http.Request) {
|
||||
if agent.McpConfig != nil {
|
||||
mcpConfig = json.RawMessage(agent.McpConfig)
|
||||
}
|
||||
// Layer the per-task overlay (set at enqueue from the initiator
|
||||
// user's active integrations — currently Composio) on top of the
|
||||
// agent's saved mcp_config. Overlay wins on server-name collisions
|
||||
// because it carries the live user-scoped session URL. Errors are
|
||||
// logged but never fail the claim: a broken overlay must not prevent
|
||||
// the agent from running with its base config.
|
||||
if len(task.RuntimeMcpOverlay) > 0 {
|
||||
if merged, err := mergeMCPOverlay(mcpConfig, json.RawMessage(task.RuntimeMcpOverlay)); err != nil {
|
||||
slog.Warn("daemon claim: merge runtime_mcp_overlay failed; falling back to agent mcp_config", "task_id", uuidToString(task.ID), "error", err)
|
||||
} else {
|
||||
mcpConfig = merged
|
||||
}
|
||||
}
|
||||
// runtime_config is stored as JSONB and may legitimately be the
|
||||
// empty object `{}` for agents that haven't opted into any
|
||||
// provider-specific tuning. Forward only non-empty payloads so the
|
||||
|
||||
@@ -10,16 +10,11 @@ import (
|
||||
composio "github.com/multica-ai/multica/server/internal/integrations/composio"
|
||||
)
|
||||
|
||||
// Composio integration handlers (MUL-3720, Stage 2 MVP). A Composio connection
|
||||
// belongs to a user, not a workspace, so these handlers live outside the
|
||||
// workspace-membership group. The four management endpoints (connect/init,
|
||||
// toolkits, connections, delete) are user-scoped (requireUserID) and sit under
|
||||
// the Auth middleware. ComposioCallback is the exception: it is a public route
|
||||
// (outside the Auth group, see router.go / MUL-3843) because the browser often
|
||||
// arrives without a session cookie — its identity comes from the signed state,
|
||||
// not requireUserID. The whole block returns 503 when h.Composio is nil
|
||||
// (COMPOSIO_API_KEY unset), matching the Lark/GitHub "integration not
|
||||
// configured" convention.
|
||||
// Composio integration handlers (MUL-3720, Stage 2 MVP). All routes are
|
||||
// user-scoped (requireUserID) and live outside the workspace-membership group —
|
||||
// a Composio connection belongs to a user, not a workspace. The whole block
|
||||
// returns 503 when h.Composio is nil (COMPOSIO_API_KEY unset), matching the
|
||||
// Lark/GitHub "integration not configured" convention.
|
||||
|
||||
// ComposioConnectInitRequest is the POST /connect/init body.
|
||||
type ComposioConnectInitRequest struct {
|
||||
@@ -92,15 +87,13 @@ func (h *Handler) ComposioConnectInit(w http.ResponseWriter, r *http.Request) {
|
||||
}
|
||||
|
||||
// ComposioCallback (GET /api/integrations/composio/callback) is the browser
|
||||
// redirect target Composio sends the user back to after the hosted flow. It is
|
||||
// registered as a PUBLIC route (outside the Auth middleware group — see
|
||||
// router.go / MUL-3843), because the browser frequently lands here without a
|
||||
// session cookie (expired session, SameSite/ITP stripping, private window,
|
||||
// self-hosted callback subdomain). Identity therefore comes solely from the
|
||||
// HMAC-signed `state` query param, which CompleteCallback verifies before
|
||||
// doing anything. On success the row is upserted and the browser is redirected
|
||||
// to the settings page; any failure redirects to the same page with a stable
|
||||
// error code so the user is never left on a blank API response.
|
||||
// redirect target Composio sends the user back to after the hosted flow. The
|
||||
// signed `state` query param is the source of truth for the user identity, so
|
||||
// the request is attributed from it (the route still sits under Auth so the
|
||||
// browser session is present, but identity comes from the state). On success
|
||||
// the row is upserted and the browser is redirected to the settings page; any
|
||||
// failure redirects to the same page with a stable error code so the user is
|
||||
// never left on a blank API response.
|
||||
func (h *Handler) ComposioCallback(w http.ResponseWriter, r *http.Request) {
|
||||
if h.Composio == nil {
|
||||
writeError(w, http.StatusServiceUnavailable, "composio integration not configured")
|
||||
|
||||
160
server/internal/handler/mcp_overlay.go
Normal file
160
server/internal/handler/mcp_overlay.go
Normal file
@@ -0,0 +1,160 @@
|
||||
package handler
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
)
|
||||
|
||||
// mergeMCPOverlay layers a per-task overlay on top of an agent's saved
|
||||
// mcp_config and returns the merged JSON for the daemon claim wire shape.
|
||||
//
|
||||
// The merge contract (kept deliberately shallow):
|
||||
//
|
||||
// - Both inputs are expected to be the Claude-style
|
||||
// `{"mcpServers": {<name>: <object>}}` shape every supported runtime
|
||||
// consumes via execenv's cursor / openclaw / opencode / codex / hermes
|
||||
// sidecar generators. Anything not under `mcpServers` is preserved from
|
||||
// the AGENT side only — overlays today only carry server entries and
|
||||
// should not silently introduce other top-level keys.
|
||||
//
|
||||
// - Merge is by SERVER NAME (the inner-map key under `mcpServers`).
|
||||
// On a name collision the OVERLAY wins. This is on purpose: the overlay
|
||||
// carries the live, user-scoped session URL (e.g. the user's own
|
||||
// Composio MCP bearer), whereas the agent's saved entry under the same
|
||||
// name — if any — would be a stale or admin-shared placeholder.
|
||||
//
|
||||
// - Either side may be empty / nil / the literal `null`. Empty-everywhere
|
||||
// returns `nil` so the daemon's `hasManagedCursorMcpConfig` short-circuit
|
||||
// keeps treating the task as "no managed MCP at all".
|
||||
//
|
||||
// Failure mode: on malformed input the agent config is returned unchanged
|
||||
// (with the error). Callers must never silently drop the agent's saved
|
||||
// servers because the overlay JSON was bad — that would surprise-disable
|
||||
// existing MCP tools.
|
||||
func mergeMCPOverlay(agentMcpConfig, overlay json.RawMessage) (json.RawMessage, error) {
|
||||
if !hasManagedJSON(overlay) {
|
||||
return passthroughAgentMcpConfig(agentMcpConfig), nil
|
||||
}
|
||||
if !hasManagedJSON(agentMcpConfig) {
|
||||
// Re-marshal the overlay alone so the daemon receives the exact
|
||||
// canonical shape (the input may have been stored with arbitrary
|
||||
// whitespace by Postgres' JSONB representation).
|
||||
var oCfg map[string]json.RawMessage
|
||||
if err := json.Unmarshal(overlay, &oCfg); err != nil {
|
||||
return nil, fmt.Errorf("merge mcp overlay: parse overlay: %w", err)
|
||||
}
|
||||
out, err := json.Marshal(oCfg)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("merge mcp overlay: marshal overlay: %w", err)
|
||||
}
|
||||
return out, nil
|
||||
}
|
||||
|
||||
var aCfg map[string]json.RawMessage
|
||||
if err := json.Unmarshal(agentMcpConfig, &aCfg); err != nil {
|
||||
// Agent config malformed: surface the error but return the original
|
||||
// bytes unchanged so the agent's setup behavior matches the no-overlay
|
||||
// path (which would also have shipped the bad bytes downstream).
|
||||
return passthroughAgentMcpConfig(agentMcpConfig), fmt.Errorf("merge mcp overlay: parse agent mcp_config: %w", err)
|
||||
}
|
||||
var oCfg map[string]json.RawMessage
|
||||
if err := json.Unmarshal(overlay, &oCfg); err != nil {
|
||||
return passthroughAgentMcpConfig(agentMcpConfig), fmt.Errorf("merge mcp overlay: parse overlay: %w", err)
|
||||
}
|
||||
|
||||
// Pull each side's `mcpServers` sub-map, default to empty so a
|
||||
// well-formed top level with no servers is treated like absent.
|
||||
aServers, err := unmarshalServerMap(aCfg["mcpServers"])
|
||||
if err != nil {
|
||||
return passthroughAgentMcpConfig(agentMcpConfig), fmt.Errorf("merge mcp overlay: agent mcpServers: %w", err)
|
||||
}
|
||||
oServers, err := unmarshalServerMap(oCfg["mcpServers"])
|
||||
if err != nil {
|
||||
return passthroughAgentMcpConfig(agentMcpConfig), fmt.Errorf("merge mcp overlay: overlay mcpServers: %w", err)
|
||||
}
|
||||
|
||||
merged := make(map[string]json.RawMessage, len(aServers)+len(oServers))
|
||||
for k, v := range aServers {
|
||||
merged[k] = v
|
||||
}
|
||||
// Overlay wins on collisions.
|
||||
for k, v := range oServers {
|
||||
merged[k] = v
|
||||
}
|
||||
|
||||
// Rebuild: keep any non-mcpServers top-level keys from the agent config,
|
||||
// then write the merged mcpServers map back. This is the only place we
|
||||
// touch top-level keys; everything else is left alone.
|
||||
out := make(map[string]json.RawMessage, len(aCfg)+1)
|
||||
for k, v := range aCfg {
|
||||
if k == "mcpServers" {
|
||||
continue
|
||||
}
|
||||
out[k] = v
|
||||
}
|
||||
if len(merged) > 0 {
|
||||
serversBytes, err := json.Marshal(merged)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("merge mcp overlay: marshal merged servers: %w", err)
|
||||
}
|
||||
out["mcpServers"] = serversBytes
|
||||
}
|
||||
if len(out) == 0 {
|
||||
return nil, nil
|
||||
}
|
||||
final, err := json.Marshal(out)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("merge mcp overlay: marshal merged: %w", err)
|
||||
}
|
||||
return final, nil
|
||||
}
|
||||
|
||||
// hasManagedJSON reports whether a raw JSON column carries an actual managed
|
||||
// payload (non-empty and not the literal `null`). Matches the convention
|
||||
// hasManagedCursorMcpConfig uses on the daemon side so the merge respects
|
||||
// the same "absent" semantics every consumer already agrees on.
|
||||
func hasManagedJSON(raw json.RawMessage) bool {
|
||||
trimmed := bytes.TrimSpace(raw)
|
||||
return len(trimmed) > 0 && !bytes.Equal(trimmed, []byte("null"))
|
||||
}
|
||||
|
||||
// passthroughAgentMcpConfig returns the agent config unchanged, or nil when
|
||||
// it is absent. Used on the early-return paths so a caller assigning the
|
||||
// return value into a wire field gets the same nil/non-nil split as today.
|
||||
func passthroughAgentMcpConfig(agentMcpConfig json.RawMessage) json.RawMessage {
|
||||
if !hasManagedJSON(agentMcpConfig) {
|
||||
return nil
|
||||
}
|
||||
return agentMcpConfig
|
||||
}
|
||||
|
||||
// unmarshalServerMap decodes the `mcpServers` sub-object into a map keyed by
|
||||
// server name. A nil/absent value returns an empty map (not an error) so the
|
||||
// caller can compose without an extra nil-check.
|
||||
func unmarshalServerMap(raw json.RawMessage) (map[string]json.RawMessage, error) {
|
||||
if !hasManagedJSON(raw) {
|
||||
return map[string]json.RawMessage{}, nil
|
||||
}
|
||||
var m map[string]json.RawMessage
|
||||
if err := json.Unmarshal(raw, &m); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if m == nil {
|
||||
return map[string]json.RawMessage{}, nil
|
||||
}
|
||||
// Reject non-object server entries early — every runtime expects the
|
||||
// inner value to be an object and would 500 in the sidecar generator
|
||||
// otherwise. Mirrors parseCursorManagedMcpServers' guard.
|
||||
for name, server := range m {
|
||||
if name == "" {
|
||||
return nil, errors.New("mcp server name must not be empty")
|
||||
}
|
||||
trimmed := bytes.TrimSpace(server)
|
||||
if len(trimmed) == 0 || trimmed[0] != '{' {
|
||||
return nil, fmt.Errorf("mcpServers.%s must be a JSON object", name)
|
||||
}
|
||||
}
|
||||
return m, nil
|
||||
}
|
||||
196
server/internal/handler/mcp_overlay_test.go
Normal file
196
server/internal/handler/mcp_overlay_test.go
Normal file
@@ -0,0 +1,196 @@
|
||||
package handler
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"testing"
|
||||
)
|
||||
|
||||
// TestMergeMCPOverlayAgentNilOverlayNil covers the "no managed MCP anywhere"
|
||||
// branch: both inputs absent must return nil so the daemon's "managed
|
||||
// mcp_config?" short-circuit treats the task as no-config — exactly the
|
||||
// behavior tasks had before Stage 3 introduced the overlay column.
|
||||
func TestMergeMCPOverlayAgentNilOverlayNil(t *testing.T) {
|
||||
cases := []struct {
|
||||
name string
|
||||
agent json.RawMessage
|
||||
overlay json.RawMessage
|
||||
}{
|
||||
{"both_nil", nil, nil},
|
||||
{"agent_null_overlay_nil", json.RawMessage("null"), nil},
|
||||
{"agent_nil_overlay_null", nil, json.RawMessage("null")},
|
||||
{"agent_empty_overlay_empty", json.RawMessage(""), json.RawMessage("")},
|
||||
}
|
||||
for _, tc := range cases {
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
got, err := mergeMCPOverlay(tc.agent, tc.overlay)
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error: %v", err)
|
||||
}
|
||||
if got != nil {
|
||||
t.Errorf("expected nil result, got %s", string(got))
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
// TestMergeMCPOverlayAgentOnly covers the "no Composio for this task" path:
|
||||
// every existing agent.mcp_config must be passed through unchanged so
|
||||
// Stage 3 cannot break MCP setup for tasks where the initiator has no
|
||||
// connections.
|
||||
func TestMergeMCPOverlayAgentOnly(t *testing.T) {
|
||||
agent := json.RawMessage(`{"mcpServers":{"fetch":{"command":"uvx","args":["mcp-server-fetch"]}}}`)
|
||||
|
||||
got, err := mergeMCPOverlay(agent, nil)
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error: %v", err)
|
||||
}
|
||||
// Pass-through preserves the exact bytes (no re-marshal) so the
|
||||
// existing trust-byte-identity tests upstream are unaffected.
|
||||
if string(got) != string(agent) {
|
||||
t.Errorf("expected pass-through, got %s", string(got))
|
||||
}
|
||||
}
|
||||
|
||||
// TestMergeMCPOverlayOverlayOnly covers the "agent has no mcp_config" path:
|
||||
// the overlay is canonicalized through json.Marshal so the daemon receives
|
||||
// a deterministic shape regardless of how Postgres' JSONB serialized the
|
||||
// stored value.
|
||||
func TestMergeMCPOverlayOverlayOnly(t *testing.T) {
|
||||
overlay := json.RawMessage(`{"mcpServers":{"composio":{"type":"http","url":"https://mcp.composio.dev/s/abc","headers":{"Authorization":"Bearer mcp_xyz"}}}}`)
|
||||
|
||||
got, err := mergeMCPOverlay(nil, overlay)
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error: %v", err)
|
||||
}
|
||||
|
||||
var cfg map[string]any
|
||||
if err := json.Unmarshal(got, &cfg); err != nil {
|
||||
t.Fatalf("unmarshal result: %v", err)
|
||||
}
|
||||
servers, ok := cfg["mcpServers"].(map[string]any)
|
||||
if !ok {
|
||||
t.Fatalf("missing mcpServers, got %s", string(got))
|
||||
}
|
||||
if _, ok := servers["composio"]; !ok {
|
||||
t.Errorf("expected composio server, got %s", string(got))
|
||||
}
|
||||
}
|
||||
|
||||
// TestMergeMCPOverlayMergesBothSides — the headline case: agent's saved
|
||||
// servers must survive, and the overlay's composio entry must appear alongside.
|
||||
func TestMergeMCPOverlayMergesBothSides(t *testing.T) {
|
||||
agent := json.RawMessage(`{"mcpServers":{"fetch":{"command":"uvx"},"github":{"command":"npx"}}}`)
|
||||
overlay := json.RawMessage(`{"mcpServers":{"composio":{"type":"http","url":"https://mcp.composio.dev/s/abc"}}}`)
|
||||
|
||||
got, err := mergeMCPOverlay(agent, overlay)
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error: %v", err)
|
||||
}
|
||||
|
||||
var cfg map[string]any
|
||||
if err := json.Unmarshal(got, &cfg); err != nil {
|
||||
t.Fatalf("unmarshal result: %v", err)
|
||||
}
|
||||
servers, ok := cfg["mcpServers"].(map[string]any)
|
||||
if !ok {
|
||||
t.Fatalf("missing mcpServers, got %s", string(got))
|
||||
}
|
||||
for _, want := range []string{"fetch", "github", "composio"} {
|
||||
if _, ok := servers[want]; !ok {
|
||||
t.Errorf("missing server %q in merged result %s", want, string(got))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// TestMergeMCPOverlayCollisionOverlayWins — the contract the comment on
|
||||
// mergeMCPOverlay calls out explicitly. The overlay carries the live, user-
|
||||
// scoped session URL; on a name collision it must win so the daemon's
|
||||
// sidecar generator emits the live URL, not whatever placeholder the agent
|
||||
// had saved.
|
||||
func TestMergeMCPOverlayCollisionOverlayWins(t *testing.T) {
|
||||
agent := json.RawMessage(`{"mcpServers":{"composio":{"type":"http","url":"https://placeholder.example/old"}}}`)
|
||||
overlay := json.RawMessage(`{"mcpServers":{"composio":{"type":"http","url":"https://mcp.composio.dev/s/new"}}}`)
|
||||
|
||||
got, err := mergeMCPOverlay(agent, overlay)
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error: %v", err)
|
||||
}
|
||||
var cfg map[string]map[string]map[string]any
|
||||
if err := json.Unmarshal(got, &cfg); err != nil {
|
||||
t.Fatalf("unmarshal result: %v", err)
|
||||
}
|
||||
gotURL, _ := cfg["mcpServers"]["composio"]["url"].(string)
|
||||
if gotURL != "https://mcp.composio.dev/s/new" {
|
||||
t.Errorf("collision: expected overlay URL to win, got %q", gotURL)
|
||||
}
|
||||
}
|
||||
|
||||
// TestMergeMCPOverlayPreservesAgentTopLevelKeys — anything outside
|
||||
// `mcpServers` on the agent side must be preserved. The overlay only
|
||||
// carries server entries; it must not silently strip non-server top-level
|
||||
// keys the agent admin saved.
|
||||
func TestMergeMCPOverlayPreservesAgentTopLevelKeys(t *testing.T) {
|
||||
agent := json.RawMessage(`{"mcpServers":{"fetch":{"command":"uvx"}},"experimental":{"foo":"bar"}}`)
|
||||
overlay := json.RawMessage(`{"mcpServers":{"composio":{"type":"http","url":"https://mcp.composio.dev/s/abc"}}}`)
|
||||
|
||||
got, err := mergeMCPOverlay(agent, overlay)
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error: %v", err)
|
||||
}
|
||||
var cfg map[string]json.RawMessage
|
||||
if err := json.Unmarshal(got, &cfg); err != nil {
|
||||
t.Fatalf("unmarshal result: %v", err)
|
||||
}
|
||||
if _, ok := cfg["experimental"]; !ok {
|
||||
t.Errorf("expected experimental key preserved, got %s", string(got))
|
||||
}
|
||||
}
|
||||
|
||||
// TestMergeMCPOverlayBadOverlayFallsBackToAgent — a malformed overlay must
|
||||
// not surprise-disable the agent's saved servers. The merge returns the
|
||||
// agent config unchanged plus an error so the caller can log.
|
||||
func TestMergeMCPOverlayBadOverlayFallsBackToAgent(t *testing.T) {
|
||||
agent := json.RawMessage(`{"mcpServers":{"fetch":{"command":"uvx"}}}`)
|
||||
overlay := json.RawMessage(`{ this is not json`)
|
||||
|
||||
got, err := mergeMCPOverlay(agent, overlay)
|
||||
if err == nil {
|
||||
t.Fatalf("expected parse error, got nil")
|
||||
}
|
||||
if string(got) != string(agent) {
|
||||
t.Errorf("expected agent config preserved on overlay parse failure, got %s", string(got))
|
||||
}
|
||||
}
|
||||
|
||||
// TestMergeMCPOverlayBadAgentReturnsBytesAndError — symmetric guard: a
|
||||
// malformed agent.mcp_config must not panic the merger. We return the
|
||||
// original bytes and surface the parse error so the daemon's downstream
|
||||
// sidecar generator can give the existing error path (instead of a
|
||||
// half-merged config).
|
||||
func TestMergeMCPOverlayBadAgentReturnsBytesAndError(t *testing.T) {
|
||||
agent := json.RawMessage(`{ this is not json`)
|
||||
overlay := json.RawMessage(`{"mcpServers":{"composio":{"type":"http","url":"https://mcp.composio.dev/s/abc"}}}`)
|
||||
|
||||
got, err := mergeMCPOverlay(agent, overlay)
|
||||
if err == nil {
|
||||
t.Fatalf("expected parse error, got nil")
|
||||
}
|
||||
if string(got) != string(agent) {
|
||||
t.Errorf("expected agent bytes returned unchanged, got %s", string(got))
|
||||
}
|
||||
}
|
||||
|
||||
// TestMergeMCPOverlayRejectsNonObjectServer pins the type guard: an
|
||||
// mcpServers map whose value is a primitive (or array) is rejected, so a
|
||||
// future bug that wrote `mcpServers: {composio: "https://..."}` (a string
|
||||
// instead of an object) doesn't quietly travel through the merge and blow
|
||||
// up in execenv's sidecar generator. Same behavior as
|
||||
// parseCursorManagedMcpServers.
|
||||
func TestMergeMCPOverlayRejectsNonObjectServer(t *testing.T) {
|
||||
agent := json.RawMessage(`{"mcpServers":{"fetch":{"command":"uvx"}}}`)
|
||||
overlay := json.RawMessage(`{"mcpServers":{"composio":"not-an-object"}}`)
|
||||
|
||||
if _, err := mergeMCPOverlay(agent, overlay); err == nil {
|
||||
t.Fatalf("expected error for non-object server, got nil")
|
||||
}
|
||||
}
|
||||
115
server/internal/integrations/composio/dispatch.go
Normal file
115
server/internal/integrations/composio/dispatch.go
Normal file
@@ -0,0 +1,115 @@
|
||||
package composio
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
|
||||
"github.com/jackc/pgx/v5/pgtype"
|
||||
)
|
||||
|
||||
// mcpOverlayServerName is the deterministic key under `mcpServers` used to
|
||||
// place the Composio session into the merged MCP config. Daemon-side merge
|
||||
// is by server name, so this constant is the integration's namespace: a
|
||||
// future provider adding its own overlay must pick a distinct name (e.g.
|
||||
// "pipedream") to avoid collisions, and an agent's own `mcp_config` entry
|
||||
// named "composio" is overridden by this overlay on purpose — the overlay
|
||||
// carries the live user-scoped session URL, the agent config carries a
|
||||
// generic service-wide entry at most.
|
||||
const mcpOverlayServerName = "composio"
|
||||
|
||||
// composioMCPServer is the wire shape of one MCP server entry in the
|
||||
// Claude-style `{"mcpServers": {...}}` config that every supported runtime
|
||||
// (Cursor, Codex, Claude, OpenCode, OpenClaw, Hermes/Kiro) consumes.
|
||||
//
|
||||
// `type: http` is what marks the entry as a streamable HTTP MCP endpoint —
|
||||
// the form Composio's session helper returns. Headers carry the per-session
|
||||
// bearer token (`Authorization: Bearer mcp_…`). Bearer secret material in
|
||||
// the value, so callers must NEVER log this struct without redacting
|
||||
// Headers; the daemon's redact pipeline already pattern-matches the
|
||||
// `Bearer mcp_…` shape, but the safe rule remains "log the URL only".
|
||||
type composioMCPServer struct {
|
||||
Type string `json:"type"`
|
||||
URL string `json:"url"`
|
||||
Headers map[string]string `json:"headers,omitempty"`
|
||||
}
|
||||
|
||||
// mcpOverlayPayload is the per-task overlay JSON written to
|
||||
// agent_task_queue.runtime_mcp_overlay and read by the daemon claim handler
|
||||
// at task dispatch.
|
||||
//
|
||||
// Shape is deliberately a subset of agent.mcp_config (Claude-style
|
||||
// `mcpServers` map) so the daemon's merge is a flat dictionary union keyed
|
||||
// by server name. Anything more elaborate (capability filtering, env
|
||||
// injection, …) would force every sidecar generator to learn about overlays
|
||||
// individually; keeping the shape identical lets the merge stay pure
|
||||
// substitution.
|
||||
type mcpOverlayPayload struct {
|
||||
MCPServers map[string]composioMCPServer `json:"mcpServers"`
|
||||
}
|
||||
|
||||
// BuildTaskOverlay returns the JSON overlay to write into
|
||||
// agent_task_queue.runtime_mcp_overlay for a task whose initiator is the
|
||||
// given Multica user, or (nil, nil) when the user has no active Composio
|
||||
// connections — in which case no Composio session is created and no token
|
||||
// is provisioned.
|
||||
//
|
||||
// The overlay shape is exactly the one the daemon-side merge expects:
|
||||
//
|
||||
// {"mcpServers": {"composio": {"type": "http", "url": "...", "headers": {...}}}}
|
||||
//
|
||||
// The (nil, nil) early return is load-bearing for cost / privacy reasons:
|
||||
//
|
||||
// - cost: each Composio MCP session is a separate session id at Composio,
|
||||
// so we do not provision one for users who would have nothing to call
|
||||
// anyway. This makes the integration scale with the active-connect
|
||||
// population, not the total task population.
|
||||
//
|
||||
// - privacy: a user without any connection has not consented to any
|
||||
// third-party reach — emitting an overlay would still attach a bearer
|
||||
// scoped to their composio user namespace, which is wasted attack
|
||||
// surface.
|
||||
//
|
||||
// Idempotency: this is called per-enqueue, not per-claim, so a single task
|
||||
// has at most one overlay generated and stored. A task that gets re-enqueued
|
||||
// (only via the retry path, which inserts a fresh row, not the same row)
|
||||
// will compute a new overlay for the new row — the parent row's overlay is
|
||||
// already cleared by the terminal-state trigger by the time retry runs.
|
||||
//
|
||||
// Errors are returned so the caller can decide whether to fail the enqueue
|
||||
// (probably no — best-effort enqueue keeps the agent runnable without
|
||||
// Composio tools) or just log and continue. CreateMCPSession failures are
|
||||
// expected to be transient (Composio API outage / network blip).
|
||||
func (s *Service) BuildTaskOverlay(ctx context.Context, userID pgtype.UUID) (json.RawMessage, error) {
|
||||
session, err := s.CreateMCPSession(ctx, userID)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("composio: build task overlay: %w", err)
|
||||
}
|
||||
if session == nil {
|
||||
// No active connections — see method comment for why this is the
|
||||
// load-bearing zero-cost path.
|
||||
return nil, nil
|
||||
}
|
||||
if session.URL == "" {
|
||||
// Defensive: Composio returned a session row but no URL. Treat as
|
||||
// "no overlay" rather than writing a half-baked entry; the daemon
|
||||
// would otherwise wire up an MCP server with an empty URL which
|
||||
// every runtime fails on noisily.
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
payload := mcpOverlayPayload{
|
||||
MCPServers: map[string]composioMCPServer{
|
||||
mcpOverlayServerName: {
|
||||
Type: "http",
|
||||
URL: session.URL,
|
||||
Headers: session.Headers,
|
||||
},
|
||||
},
|
||||
}
|
||||
raw, err := json.Marshal(payload)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("composio: marshal task overlay: %w", err)
|
||||
}
|
||||
return raw, nil
|
||||
}
|
||||
172
server/internal/integrations/composio/dispatch_test.go
Normal file
172
server/internal/integrations/composio/dispatch_test.go
Normal file
@@ -0,0 +1,172 @@
|
||||
package composio
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"strings"
|
||||
"testing"
|
||||
|
||||
"github.com/jackc/pgx/v5/pgtype"
|
||||
sdk "github.com/multica-ai/multica/server/pkg/composio"
|
||||
db "github.com/multica-ai/multica/server/pkg/db/generated"
|
||||
)
|
||||
|
||||
// seedActiveConnection writes a single active row for the user/toolkit pair
|
||||
// so BuildTaskOverlay's "user has at least one active connection" branch is
|
||||
// reachable in tests without touching the real Composio API or DB.
|
||||
func seedActiveConnection(t *testing.T, store *fakeStore, userID pgtype.UUID, toolkit, connectedAccountID string) {
|
||||
t.Helper()
|
||||
if _, err := store.UpsertUserComposioConnection(context.Background(), db.UpsertUserComposioConnectionParams{
|
||||
UserID: userID,
|
||||
ToolkitSlug: toolkit,
|
||||
AuthConfigID: "ac_test",
|
||||
ConnectedAccountID: connectedAccountID,
|
||||
ComposioUserID: uuidToString(userID),
|
||||
}); err != nil {
|
||||
t.Fatalf("seed: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
func uuidToString(u pgtype.UUID) string {
|
||||
if !u.Valid {
|
||||
return ""
|
||||
}
|
||||
b := u.Bytes
|
||||
const hex = "0123456789abcdef"
|
||||
out := make([]byte, 36)
|
||||
idx := 0
|
||||
for i, x := range b {
|
||||
out[idx] = hex[x>>4]
|
||||
out[idx+1] = hex[x&0xf]
|
||||
idx += 2
|
||||
if i == 3 || i == 5 || i == 7 || i == 9 {
|
||||
out[idx] = '-'
|
||||
idx++
|
||||
}
|
||||
}
|
||||
return string(out)
|
||||
}
|
||||
|
||||
// TestBuildTaskOverlay_NoConnections is the load-bearing zero-cost branch:
|
||||
// a user with no active rows must NOT cause CreateMCPSession to fire, and
|
||||
// must NOT emit any overlay JSON. This is the property that lets Composio
|
||||
// scale with the active-connect population, not the total task population.
|
||||
func TestBuildTaskOverlay_NoConnections(t *testing.T) {
|
||||
t.Parallel()
|
||||
sdkFake := &fakeSDK{}
|
||||
store := newFakeStore()
|
||||
svc := newTestService(t, sdkFake, store)
|
||||
|
||||
user := mintUUID(7)
|
||||
overlay, err := svc.BuildTaskOverlay(context.Background(), user)
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error: %v", err)
|
||||
}
|
||||
if overlay != nil {
|
||||
t.Errorf("expected nil overlay for user with no connections, got %s", string(overlay))
|
||||
}
|
||||
if sdkFake.createSessCalls != 0 {
|
||||
t.Errorf("expected zero CreateSession calls for empty user, got %d", sdkFake.createSessCalls)
|
||||
}
|
||||
}
|
||||
|
||||
// TestBuildTaskOverlay_WithConnections — the happy path. The overlay must
|
||||
// be the exact shape the daemon-side merge expects:
|
||||
//
|
||||
// {"mcpServers": {"composio": {"type": "http", "url": "...", "headers": {...}}}}
|
||||
//
|
||||
// We also assert that CreateSession was sent the Multica user id verbatim
|
||||
// (the composio_user_id == Multica user id invariant the rest of the
|
||||
// integration depends on).
|
||||
func TestBuildTaskOverlay_WithConnections(t *testing.T) {
|
||||
t.Parallel()
|
||||
sdkFake := &fakeSDK{
|
||||
createSessResp: &sdk.CreateSessionResponse{
|
||||
MCP: sdk.MCPDescriptor{URL: "https://mcp.composio.dev/session/abc"},
|
||||
},
|
||||
}
|
||||
store := newFakeStore()
|
||||
svc := newTestService(t, sdkFake, store)
|
||||
user := mintUUID(13)
|
||||
seedActiveConnection(t, store, user, "notion", "ca_user_notion")
|
||||
|
||||
overlay, err := svc.BuildTaskOverlay(context.Background(), user)
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error: %v", err)
|
||||
}
|
||||
if len(overlay) == 0 {
|
||||
t.Fatalf("expected non-empty overlay, got nil")
|
||||
}
|
||||
// MCP session was provisioned with the Multica user id verbatim.
|
||||
if sdkFake.lastSessReq.UserID != uuidToString(user) {
|
||||
t.Errorf("CreateSession user id: got %q, want %q", sdkFake.lastSessReq.UserID, uuidToString(user))
|
||||
}
|
||||
|
||||
var payload mcpOverlayPayload
|
||||
if err := json.Unmarshal(overlay, &payload); err != nil {
|
||||
t.Fatalf("unmarshal overlay: %v", err)
|
||||
}
|
||||
srv, ok := payload.MCPServers[mcpOverlayServerName]
|
||||
if !ok {
|
||||
t.Fatalf("overlay missing %q server, got %s", mcpOverlayServerName, string(overlay))
|
||||
}
|
||||
if srv.Type != "http" {
|
||||
t.Errorf("type: got %q, want \"http\"", srv.Type)
|
||||
}
|
||||
if srv.URL != "https://mcp.composio.dev/session/abc" {
|
||||
t.Errorf("url: got %q", srv.URL)
|
||||
}
|
||||
if srv.Headers["x-api-key"] != "secret" {
|
||||
t.Errorf("headers missing x-api-key: %v", srv.Headers)
|
||||
}
|
||||
}
|
||||
|
||||
// TestBuildTaskOverlay_EmptyURL guards a defensive branch: Composio
|
||||
// returning a 200 with an empty mcp.url must not produce a half-baked
|
||||
// overlay (every runtime sidecar generator would emit a server with an
|
||||
// empty URL, breaking the task).
|
||||
func TestBuildTaskOverlay_EmptyURL(t *testing.T) {
|
||||
t.Parallel()
|
||||
sdkFake := &fakeSDK{
|
||||
createSessResp: &sdk.CreateSessionResponse{
|
||||
MCP: sdk.MCPDescriptor{URL: ""},
|
||||
},
|
||||
}
|
||||
store := newFakeStore()
|
||||
svc := newTestService(t, sdkFake, store)
|
||||
user := mintUUID(14)
|
||||
seedActiveConnection(t, store, user, "github", "ca_user_github")
|
||||
|
||||
overlay, err := svc.BuildTaskOverlay(context.Background(), user)
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error: %v", err)
|
||||
}
|
||||
if overlay != nil {
|
||||
t.Errorf("expected nil overlay when MCP URL is empty, got %s", string(overlay))
|
||||
}
|
||||
}
|
||||
|
||||
// TestBuildTaskOverlay_SDKError — an SDK failure (Composio outage, network
|
||||
// blip, …) must surface as an error so the caller can log it. The caller
|
||||
// (TaskService.applyRuntimeMCPOverlay) is responsible for swallowing the
|
||||
// error and proceeding with no overlay — best-effort enqueue.
|
||||
func TestBuildTaskOverlay_SDKError(t *testing.T) {
|
||||
t.Parallel()
|
||||
sdkFake := &fakeSDK{createSessErr: errors.New("composio: 503 backend")}
|
||||
store := newFakeStore()
|
||||
svc := newTestService(t, sdkFake, store)
|
||||
user := mintUUID(15)
|
||||
seedActiveConnection(t, store, user, "slack", "ca_user_slack")
|
||||
|
||||
overlay, err := svc.BuildTaskOverlay(context.Background(), user)
|
||||
if err == nil {
|
||||
t.Fatalf("expected error from SDK failure, got nil")
|
||||
}
|
||||
if !strings.Contains(err.Error(), "create session") {
|
||||
t.Errorf("error should mention create session, got %v", err)
|
||||
}
|
||||
if overlay != nil {
|
||||
t.Errorf("expected nil overlay on SDK error, got %s", string(overlay))
|
||||
}
|
||||
}
|
||||
@@ -39,12 +39,34 @@ type TaskService struct {
|
||||
// goes through the DB. Wired in router.go from the shared Redis
|
||||
// client.
|
||||
EmptyClaim *EmptyClaimCache
|
||||
// Composio computes the per-task MCP overlay (Stage 3 of the Composio
|
||||
// epic, MUL-3721) — the integration's "current user's connected apps
|
||||
// → MCP session URL" hook called from each Enqueue* path. Optional: a
|
||||
// nil ComposioOverlayBuilder turns the overlay step into a no-op so
|
||||
// every Multica deployment that hasn't enabled Composio behaves
|
||||
// exactly as before. Wired in router.go after composiointeg.NewService
|
||||
// succeeds; the concrete type is *composio.Service.
|
||||
Composio ComposioOverlayBuilder
|
||||
|
||||
analyticsContextMu sync.Mutex
|
||||
analyticsContextCache map[string]analytics.TaskContext
|
||||
analyticsContextOrder []string
|
||||
}
|
||||
|
||||
// ComposioOverlayBuilder is the seam TaskService uses to build the per-task
|
||||
// MCP overlay at enqueue time. Implemented by
|
||||
// internal/integrations/composio.Service.BuildTaskOverlay; tests provide an
|
||||
// inline fake so they don't have to spin a fake Composio SDK.
|
||||
//
|
||||
// Contract: (nil, nil) means "user has no active connections, do not write
|
||||
// any overlay onto the task". Any non-nil JSON is the exact value to store
|
||||
// in agent_task_queue.runtime_mcp_overlay; non-nil error is surfaced to the
|
||||
// caller but treated as best-effort — failed overlay computation must not
|
||||
// fail the enqueue.
|
||||
type ComposioOverlayBuilder interface {
|
||||
BuildTaskOverlay(ctx context.Context, userID pgtype.UUID) (json.RawMessage, error)
|
||||
}
|
||||
|
||||
type TaskWakeupNotifier interface {
|
||||
NotifyTaskAvailable(runtimeID, taskID string)
|
||||
}
|
||||
@@ -144,6 +166,83 @@ func (s *TaskService) captureTaskQueued(ctx context.Context, task db.AgentTaskQu
|
||||
}
|
||||
}
|
||||
|
||||
// applyRuntimeMCPOverlay computes the per-task Composio MCP overlay for the
|
||||
// task's initiator user and writes it to agent_task_queue.runtime_mcp_overlay.
|
||||
// Called by every Enqueue* path AFTER the task row is inserted and BEFORE
|
||||
// the daemon is notified, so by the time the claim handler reads the row
|
||||
// the overlay is in place (or has been deterministically skipped).
|
||||
//
|
||||
// Best-effort: any failure here is logged and swallowed. The agent must
|
||||
// still run with its base agent.mcp_config — losing third-party tools is a
|
||||
// degraded UX, refusing to enqueue is a worse one.
|
||||
//
|
||||
// No-op when:
|
||||
// - s.Composio is nil — Composio integration not configured (the common
|
||||
// case for any deployment that hasn't set COMPOSIO_API_KEY).
|
||||
// - initiatorUserID is not a valid UUID — the task has no attributable
|
||||
// human initiator (autopilot, system-driven, etc.), so there is no
|
||||
// "current user's connected apps" to reflect.
|
||||
// - BuildTaskOverlay returns (nil, nil) — the initiator user has zero
|
||||
// active connections.
|
||||
func (s *TaskService) applyRuntimeMCPOverlay(ctx context.Context, taskID, initiatorUserID pgtype.UUID) {
|
||||
if s == nil || s.Composio == nil {
|
||||
return
|
||||
}
|
||||
if !initiatorUserID.Valid {
|
||||
return
|
||||
}
|
||||
if !taskID.Valid {
|
||||
return
|
||||
}
|
||||
overlay, err := s.Composio.BuildTaskOverlay(ctx, initiatorUserID)
|
||||
if err != nil {
|
||||
slog.Warn("runtime mcp overlay: BuildTaskOverlay failed; task will run without composio overlay",
|
||||
"task_id", util.UUIDToString(taskID),
|
||||
"initiator_user_id", util.UUIDToString(initiatorUserID),
|
||||
"error", err,
|
||||
)
|
||||
return
|
||||
}
|
||||
if len(overlay) == 0 {
|
||||
return
|
||||
}
|
||||
if err := s.Queries.SetAgentTaskRuntimeMCPOverlay(ctx, db.SetAgentTaskRuntimeMCPOverlayParams{
|
||||
ID: taskID,
|
||||
RuntimeMcpOverlay: overlay,
|
||||
}); err != nil {
|
||||
slog.Warn("runtime mcp overlay: SetAgentTaskRuntimeMCPOverlay failed; task will run without composio overlay",
|
||||
"task_id", util.UUIDToString(taskID),
|
||||
"error", err,
|
||||
)
|
||||
return
|
||||
}
|
||||
slog.Debug("runtime mcp overlay: attached composio session",
|
||||
"task_id", util.UUIDToString(taskID),
|
||||
"initiator_user_id", util.UUIDToString(initiatorUserID),
|
||||
)
|
||||
}
|
||||
|
||||
// resolveInitiatorFromTriggerComment returns the comment author's user UUID
|
||||
// when the trigger comment was authored by a member, otherwise an invalid
|
||||
// pgtype.UUID. Used by the issue and mention enqueue paths to feed the
|
||||
// per-task overlay builder. Agent-authored trigger comments — e.g. a squad
|
||||
// leader fan-out — are intentionally NOT treated as initiators here: their
|
||||
// "user-connected apps" view is empty by construction and we want to avoid
|
||||
// spending a Composio session for a guaranteed-empty overlay.
|
||||
func (s *TaskService) resolveInitiatorFromTriggerComment(ctx context.Context, commentID pgtype.UUID) pgtype.UUID {
|
||||
if !commentID.Valid {
|
||||
return pgtype.UUID{}
|
||||
}
|
||||
comment, err := s.Queries.GetComment(ctx, commentID)
|
||||
if err != nil {
|
||||
return pgtype.UUID{}
|
||||
}
|
||||
if comment.AuthorType != "member" {
|
||||
return pgtype.UUID{}
|
||||
}
|
||||
return comment.AuthorID
|
||||
}
|
||||
|
||||
func (s *TaskService) captureTaskDispatched(ctx context.Context, task db.AgentTaskQueue) {
|
||||
if s.Metrics != nil {
|
||||
source, runtimeMode, _ := s.taskMetricsContext(ctx, task)
|
||||
@@ -500,6 +599,7 @@ func (s *TaskService) enqueueIssueTask(ctx context.Context, issue db.Issue, trig
|
||||
// queued broadcast afterwards risks the dispatch event reaching clients
|
||||
// before the queued one (rare but unsafe-by-construction). Publishing
|
||||
// in the desired observe-order makes correctness independent of timing.
|
||||
s.applyRuntimeMCPOverlay(ctx, task.ID, s.resolveInitiatorFromTriggerComment(ctx, triggerCommentID))
|
||||
s.broadcastTaskEvent(ctx, protocol.EventTaskQueued, task)
|
||||
s.NotifyTaskEnqueued(ctx, task)
|
||||
return task, nil
|
||||
@@ -568,6 +668,7 @@ func (s *TaskService) enqueueMentionTask(ctx context.Context, issue db.Issue, ag
|
||||
|
||||
slog.Info("mention task enqueued", "task_id", util.UUIDToString(task.ID), "issue_id", util.UUIDToString(issue.ID), "agent_id", util.UUIDToString(agentID), "is_leader_task", isLeader)
|
||||
// See EnqueueTaskForIssue for ordering rationale.
|
||||
s.applyRuntimeMCPOverlay(ctx, task.ID, s.resolveInitiatorFromTriggerComment(ctx, triggerCommentID))
|
||||
s.broadcastTaskEvent(ctx, protocol.EventTaskQueued, task)
|
||||
s.NotifyTaskEnqueued(ctx, task)
|
||||
return task, nil
|
||||
@@ -692,6 +793,7 @@ func (s *TaskService) EnqueueQuickCreateTask(ctx context.Context, workspaceID, r
|
||||
// cycle. Without this the user perceives "quick create never
|
||||
// triggered" because the modal closes immediately and the task
|
||||
// sits in 'queued' until the next sleepWithContextOrWakeup tick.
|
||||
s.applyRuntimeMCPOverlay(ctx, task.ID, requesterID)
|
||||
s.NotifyTaskEnqueued(ctx, task)
|
||||
return task, nil
|
||||
}
|
||||
@@ -768,6 +870,7 @@ func (s *TaskService) EnqueueChatTask(ctx context.Context, chatSession db.ChatSe
|
||||
|
||||
slog.Info("chat task enqueued", "task_id", util.UUIDToString(task.ID), "chat_session_id", util.UUIDToString(chatSession.ID), "agent_id", util.UUIDToString(chatSession.AgentID))
|
||||
// See EnqueueTaskForIssue for ordering rationale.
|
||||
s.applyRuntimeMCPOverlay(ctx, task.ID, initiatorUserID)
|
||||
s.broadcastTaskEvent(ctx, protocol.EventTaskQueued, task)
|
||||
s.NotifyTaskEnqueued(ctx, task)
|
||||
return task, nil
|
||||
@@ -1649,6 +1752,16 @@ func (s *TaskService) MaybeRetryFailedTask(ctx context.Context, parent db.AgentT
|
||||
// Retry creates a fresh queued row, same status transition (∅ → queued)
|
||||
// as EnqueueTaskFor*. Broadcast queued first, then notify the daemon —
|
||||
// see EnqueueTaskForIssue for ordering rationale.
|
||||
//
|
||||
// The parent task's initiator_user_id is the canonical reference here:
|
||||
// the user behind the original run has not changed and the new row
|
||||
// inherits the same attribution at the DB layer (see
|
||||
// agent.sql / CreateRetryTask). NOTE: CreateRetryTask currently does
|
||||
// NOT copy initiator_user_id forward to the child, so on retry we read
|
||||
// the parent's value directly to keep the overlay attached. If a future
|
||||
// schema change starts copying it, this still works (parent vs child
|
||||
// carry the same value).
|
||||
s.applyRuntimeMCPOverlay(ctx, child.ID, parent.InitiatorUserID)
|
||||
s.broadcastTaskEvent(ctx, protocol.EventTaskQueued, child)
|
||||
s.NotifyTaskEnqueued(ctx, child)
|
||||
return &child, nil
|
||||
|
||||
100
server/internal/service/task_runtime_mcp_overlay_test.go
Normal file
100
server/internal/service/task_runtime_mcp_overlay_test.go
Normal file
@@ -0,0 +1,100 @@
|
||||
package service
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"testing"
|
||||
|
||||
"github.com/jackc/pgx/v5/pgtype"
|
||||
)
|
||||
|
||||
// stubOverlayBuilder records every call so the test can assert which
|
||||
// branches of TaskService.applyRuntimeMCPOverlay reached the builder
|
||||
// vs short-circuited before it.
|
||||
type stubOverlayBuilder struct {
|
||||
calls int
|
||||
lastUser pgtype.UUID
|
||||
resp json.RawMessage
|
||||
err error
|
||||
respIsNil bool
|
||||
}
|
||||
|
||||
func (s *stubOverlayBuilder) BuildTaskOverlay(_ context.Context, userID pgtype.UUID) (json.RawMessage, error) {
|
||||
s.calls++
|
||||
s.lastUser = userID
|
||||
if s.err != nil {
|
||||
return nil, s.err
|
||||
}
|
||||
if s.respIsNil {
|
||||
return nil, nil
|
||||
}
|
||||
return s.resp, nil
|
||||
}
|
||||
|
||||
// TestApplyRuntimeMCPOverlay_NoComposioIsNoOp pins the safety-net that
|
||||
// makes Stage 3 a pure addition for every Multica deployment that hasn't
|
||||
// enabled Composio: when s.Composio is nil, the helper must not panic
|
||||
// (Queries can legitimately be nil in unit-test setup) and must not call
|
||||
// any builder. This is the property that lets every existing enqueue test
|
||||
// keep passing without instantiating a Composio service.
|
||||
func TestApplyRuntimeMCPOverlay_NoComposioIsNoOp(t *testing.T) {
|
||||
t.Parallel()
|
||||
svc := &TaskService{} // Queries nil on purpose
|
||||
taskID := pgtype.UUID{Bytes: [16]byte{0x01}, Valid: true}
|
||||
userID := pgtype.UUID{Bytes: [16]byte{0x02}, Valid: true}
|
||||
|
||||
// Should not panic and should not touch Queries.
|
||||
svc.applyRuntimeMCPOverlay(context.Background(), taskID, userID)
|
||||
}
|
||||
|
||||
// TestApplyRuntimeMCPOverlay_InvalidInitiatorIsNoOp covers the
|
||||
// no-attributable-human-initiator path (autopilot, system-driven). When
|
||||
// the initiator UUID is not valid we must NOT call BuildTaskOverlay — we
|
||||
// must not pay the Composio session cost for a guaranteed-empty overlay.
|
||||
func TestApplyRuntimeMCPOverlay_InvalidInitiatorIsNoOp(t *testing.T) {
|
||||
t.Parallel()
|
||||
builder := &stubOverlayBuilder{}
|
||||
svc := &TaskService{Composio: builder}
|
||||
taskID := pgtype.UUID{Bytes: [16]byte{0x03}, Valid: true}
|
||||
|
||||
svc.applyRuntimeMCPOverlay(context.Background(), taskID, pgtype.UUID{}) // invalid
|
||||
if builder.calls != 0 {
|
||||
t.Errorf("expected 0 builder calls for invalid initiator, got %d", builder.calls)
|
||||
}
|
||||
}
|
||||
|
||||
// TestApplyRuntimeMCPOverlay_NilOverlaySkipsUpdate — BuildTaskOverlay
|
||||
// returning (nil, nil) means the user has no active connections. The
|
||||
// helper must short-circuit before touching Queries so a unit-test setup
|
||||
// with Queries=nil is safe and so we don't issue a pointless UPDATE
|
||||
// against the queue table in production.
|
||||
func TestApplyRuntimeMCPOverlay_NilOverlaySkipsUpdate(t *testing.T) {
|
||||
t.Parallel()
|
||||
builder := &stubOverlayBuilder{respIsNil: true}
|
||||
svc := &TaskService{Composio: builder} // Queries nil on purpose
|
||||
taskID := pgtype.UUID{Bytes: [16]byte{0x04}, Valid: true}
|
||||
userID := pgtype.UUID{Bytes: [16]byte{0x05}, Valid: true}
|
||||
|
||||
svc.applyRuntimeMCPOverlay(context.Background(), taskID, userID)
|
||||
if builder.calls != 1 {
|
||||
t.Errorf("expected exactly 1 builder call, got %d", builder.calls)
|
||||
}
|
||||
}
|
||||
|
||||
// TestApplyRuntimeMCPOverlay_BuilderErrorSwallowed — best-effort enqueue:
|
||||
// a builder error (Composio outage etc.) must be logged + swallowed so
|
||||
// the task still queues. We assert the helper does not panic and does
|
||||
// not attempt the UPDATE, which would crash with a nil Queries.
|
||||
func TestApplyRuntimeMCPOverlay_BuilderErrorSwallowed(t *testing.T) {
|
||||
t.Parallel()
|
||||
builder := &stubOverlayBuilder{err: errors.New("upstream 503")}
|
||||
svc := &TaskService{Composio: builder}
|
||||
taskID := pgtype.UUID{Bytes: [16]byte{0x06}, Valid: true}
|
||||
userID := pgtype.UUID{Bytes: [16]byte{0x07}, Valid: true}
|
||||
|
||||
svc.applyRuntimeMCPOverlay(context.Background(), taskID, userID)
|
||||
if builder.calls != 1 {
|
||||
t.Errorf("expected 1 builder call, got %d", builder.calls)
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,3 @@
|
||||
DROP TRIGGER IF EXISTS trg_clear_runtime_mcp_overlay ON agent_task_queue;
|
||||
DROP FUNCTION IF EXISTS clear_runtime_mcp_overlay_on_terminal_state();
|
||||
ALTER TABLE agent_task_queue DROP COLUMN IF EXISTS runtime_mcp_overlay;
|
||||
@@ -0,0 +1,53 @@
|
||||
-- Per-task MCP overlay computed at task enqueue (dispatch) time, layered on
|
||||
-- top of agent.mcp_config when the daemon prepares the execution environment.
|
||||
--
|
||||
-- Stage 3 of the Composio epic (MUL-3721 / MUL-3715): the Composio integration
|
||||
-- writes a fresh `{"mcpServers": {"composio": {"type": "http", "url": "...",
|
||||
-- "headers": {"Authorization": "Bearer ..."}}}}` here for every task whose
|
||||
-- initiator user has at least one active connection. The daemon claim
|
||||
-- handler merges this on top of agent.mcp_config (overlay wins on name
|
||||
-- collisions because it carries the live, user-scoped MCP session URL).
|
||||
--
|
||||
-- The value is short-lived: it carries a bearer token that becomes useless
|
||||
-- once the task ends, so we wipe it on every terminal state transition via
|
||||
-- the trigger below. Worst case a still-active task row keeps the token for
|
||||
-- the duration of the run, mode-0600 in DB and never logged.
|
||||
ALTER TABLE agent_task_queue
|
||||
ADD COLUMN runtime_mcp_overlay JSONB NULL;
|
||||
|
||||
COMMENT ON COLUMN agent_task_queue.runtime_mcp_overlay IS
|
||||
'Per-task MCP servers computed at dispatch time, merged on top of agent.mcp_config. Currently used by Composio integration to inject the initiator user''s session URL. Cleared after task completes via trg_clear_runtime_mcp_overlay.';
|
||||
|
||||
-- Auto-clear the overlay the moment the task enters any terminal state, so
|
||||
-- the row in the queue never keeps a stale bearer beyond the live run. This
|
||||
-- is a defense-in-depth measure on top of how short the Composio session
|
||||
-- token already is — clearing here means a future audit of the table never
|
||||
-- finds bearers attached to rows that finished hours/days ago.
|
||||
--
|
||||
-- Trigger-based rather than per-query SET clauses because the terminal
|
||||
-- transitions live in ~12 different sqlc queries (CompleteAgentTask,
|
||||
-- FailAgentTask, FailStaleTasks, ExpireStaleQueuedTasks, FailTasksFor
|
||||
-- OfflineRuntimes, RecoverOrphanedTasksForRuntime, plus six Cancel*
|
||||
-- variants); a trigger is a single source of truth that future queries
|
||||
-- can't bypass.
|
||||
CREATE OR REPLACE FUNCTION clear_runtime_mcp_overlay_on_terminal_state()
|
||||
RETURNS TRIGGER AS $$
|
||||
BEGIN
|
||||
-- Only act on actual transitions INTO a terminal state from a non-terminal
|
||||
-- one, and only when there is something to wipe. Re-touching an already-
|
||||
-- terminal row (or a row whose overlay is already NULL) leaves the
|
||||
-- column unchanged and the trigger is a cheap no-op.
|
||||
IF NEW.status IN ('completed', 'failed', 'cancelled')
|
||||
AND OLD.status IS DISTINCT FROM NEW.status
|
||||
AND NEW.runtime_mcp_overlay IS NOT NULL THEN
|
||||
NEW.runtime_mcp_overlay := NULL;
|
||||
END IF;
|
||||
RETURN NEW;
|
||||
END;
|
||||
$$ LANGUAGE plpgsql;
|
||||
|
||||
DROP TRIGGER IF EXISTS trg_clear_runtime_mcp_overlay ON agent_task_queue;
|
||||
CREATE TRIGGER trg_clear_runtime_mcp_overlay
|
||||
BEFORE UPDATE OF status ON agent_task_queue
|
||||
FOR EACH ROW
|
||||
EXECUTE FUNCTION clear_runtime_mcp_overlay_on_terminal_state();
|
||||
@@ -1,6 +1,6 @@
|
||||
// Code generated by sqlc. DO NOT EDIT.
|
||||
// versions:
|
||||
// sqlc v1.31.1
|
||||
// sqlc v1.30.0
|
||||
// source: agent.sql
|
||||
|
||||
package db
|
||||
@@ -178,7 +178,7 @@ const cancelAgentTask = `-- name: CancelAgentTask :one
|
||||
UPDATE agent_task_queue
|
||||
SET status = 'cancelled', completed_at = now(), prepare_lease_expires_at = NULL
|
||||
WHERE id = $1 AND status IN ('queued', 'dispatched', 'running', 'waiting_local_directory')
|
||||
RETURNING id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, trigger_summary, force_fresh_session, is_leader_task, wait_reason, initiator_user_id, handoff_note, prepare_lease_expires_at, squad_id
|
||||
RETURNING id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, trigger_summary, force_fresh_session, is_leader_task, wait_reason, initiator_user_id, handoff_note, prepare_lease_expires_at, squad_id, runtime_mcp_overlay
|
||||
`
|
||||
|
||||
func (q *Queries) CancelAgentTask(ctx context.Context, id pgtype.UUID) (AgentTaskQueue, error) {
|
||||
@@ -215,6 +215,7 @@ func (q *Queries) CancelAgentTask(ctx context.Context, id pgtype.UUID) (AgentTas
|
||||
&i.HandoffNote,
|
||||
&i.PrepareLeaseExpiresAt,
|
||||
&i.SquadID,
|
||||
&i.RuntimeMcpOverlay,
|
||||
)
|
||||
return i, err
|
||||
}
|
||||
@@ -223,7 +224,7 @@ const cancelAgentTasksByAgent = `-- name: CancelAgentTasksByAgent :many
|
||||
UPDATE agent_task_queue
|
||||
SET status = 'cancelled', completed_at = now(), prepare_lease_expires_at = NULL
|
||||
WHERE agent_id = $1 AND status IN ('queued', 'dispatched', 'running', 'waiting_local_directory')
|
||||
RETURNING id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, trigger_summary, force_fresh_session, is_leader_task, wait_reason, initiator_user_id, handoff_note, prepare_lease_expires_at, squad_id
|
||||
RETURNING id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, trigger_summary, force_fresh_session, is_leader_task, wait_reason, initiator_user_id, handoff_note, prepare_lease_expires_at, squad_id, runtime_mcp_overlay
|
||||
`
|
||||
|
||||
// Bulk-cancel every active (queued/dispatched/running) task for an agent.
|
||||
@@ -271,6 +272,7 @@ func (q *Queries) CancelAgentTasksByAgent(ctx context.Context, agentID pgtype.UU
|
||||
&i.HandoffNote,
|
||||
&i.PrepareLeaseExpiresAt,
|
||||
&i.SquadID,
|
||||
&i.RuntimeMcpOverlay,
|
||||
); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -286,7 +288,7 @@ const cancelAgentTasksByChatSession = `-- name: CancelAgentTasksByChatSession :m
|
||||
UPDATE agent_task_queue
|
||||
SET status = 'cancelled', completed_at = now(), prepare_lease_expires_at = NULL
|
||||
WHERE chat_session_id = $1 AND status IN ('queued', 'dispatched', 'running', 'waiting_local_directory')
|
||||
RETURNING id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, trigger_summary, force_fresh_session, is_leader_task, wait_reason, initiator_user_id, handoff_note, prepare_lease_expires_at, squad_id
|
||||
RETURNING id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, trigger_summary, force_fresh_session, is_leader_task, wait_reason, initiator_user_id, handoff_note, prepare_lease_expires_at, squad_id, runtime_mcp_overlay
|
||||
`
|
||||
|
||||
// Cancels active tasks belonging to a chat session. Called from
|
||||
@@ -334,6 +336,7 @@ func (q *Queries) CancelAgentTasksByChatSession(ctx context.Context, chatSession
|
||||
&i.HandoffNote,
|
||||
&i.PrepareLeaseExpiresAt,
|
||||
&i.SquadID,
|
||||
&i.RuntimeMcpOverlay,
|
||||
); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -349,7 +352,7 @@ const cancelAgentTasksByIssue = `-- name: CancelAgentTasksByIssue :many
|
||||
UPDATE agent_task_queue
|
||||
SET status = 'cancelled', completed_at = now(), prepare_lease_expires_at = NULL
|
||||
WHERE issue_id = $1 AND status IN ('queued', 'dispatched', 'running', 'waiting_local_directory')
|
||||
RETURNING id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, trigger_summary, force_fresh_session, is_leader_task, wait_reason, initiator_user_id, handoff_note, prepare_lease_expires_at, squad_id
|
||||
RETURNING id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, trigger_summary, force_fresh_session, is_leader_task, wait_reason, initiator_user_id, handoff_note, prepare_lease_expires_at, squad_id, runtime_mcp_overlay
|
||||
`
|
||||
|
||||
// Cancels every active task on the issue and returns the affected rows so the
|
||||
@@ -397,6 +400,7 @@ func (q *Queries) CancelAgentTasksByIssue(ctx context.Context, issueID pgtype.UU
|
||||
&i.HandoffNote,
|
||||
&i.PrepareLeaseExpiresAt,
|
||||
&i.SquadID,
|
||||
&i.RuntimeMcpOverlay,
|
||||
); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -412,7 +416,7 @@ const cancelAgentTasksByIssueAndAgent = `-- name: CancelAgentTasksByIssueAndAgen
|
||||
UPDATE agent_task_queue
|
||||
SET status = 'cancelled', completed_at = now(), prepare_lease_expires_at = NULL
|
||||
WHERE issue_id = $1 AND agent_id = $2 AND status IN ('queued', 'dispatched', 'running', 'waiting_local_directory')
|
||||
RETURNING id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, trigger_summary, force_fresh_session, is_leader_task, wait_reason, initiator_user_id, handoff_note, prepare_lease_expires_at, squad_id
|
||||
RETURNING id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, trigger_summary, force_fresh_session, is_leader_task, wait_reason, initiator_user_id, handoff_note, prepare_lease_expires_at, squad_id, runtime_mcp_overlay
|
||||
`
|
||||
|
||||
type CancelAgentTasksByIssueAndAgentParams struct {
|
||||
@@ -464,6 +468,7 @@ func (q *Queries) CancelAgentTasksByIssueAndAgent(ctx context.Context, arg Cance
|
||||
&i.HandoffNote,
|
||||
&i.PrepareLeaseExpiresAt,
|
||||
&i.SquadID,
|
||||
&i.RuntimeMcpOverlay,
|
||||
); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -479,7 +484,7 @@ const cancelAgentTasksByTriggerComment = `-- name: CancelAgentTasksByTriggerComm
|
||||
UPDATE agent_task_queue
|
||||
SET status = 'cancelled', completed_at = now(), prepare_lease_expires_at = NULL
|
||||
WHERE trigger_comment_id = $1 AND status IN ('queued', 'dispatched', 'running', 'waiting_local_directory')
|
||||
RETURNING id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, trigger_summary, force_fresh_session, is_leader_task, wait_reason, initiator_user_id, handoff_note, prepare_lease_expires_at, squad_id
|
||||
RETURNING id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, trigger_summary, force_fresh_session, is_leader_task, wait_reason, initiator_user_id, handoff_note, prepare_lease_expires_at, squad_id, runtime_mcp_overlay
|
||||
`
|
||||
|
||||
// Cancels active tasks whose trigger is the given comment. Called when a
|
||||
@@ -527,6 +532,7 @@ func (q *Queries) CancelAgentTasksByTriggerComment(ctx context.Context, triggerC
|
||||
&i.HandoffNote,
|
||||
&i.PrepareLeaseExpiresAt,
|
||||
&i.SquadID,
|
||||
&i.RuntimeMcpOverlay,
|
||||
); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -567,7 +573,7 @@ WHERE id = (
|
||||
LIMIT 1
|
||||
FOR UPDATE SKIP LOCKED
|
||||
)
|
||||
RETURNING id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, trigger_summary, force_fresh_session, is_leader_task, wait_reason, initiator_user_id, handoff_note, prepare_lease_expires_at, squad_id
|
||||
RETURNING id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, trigger_summary, force_fresh_session, is_leader_task, wait_reason, initiator_user_id, handoff_note, prepare_lease_expires_at, squad_id, runtime_mcp_overlay
|
||||
`
|
||||
|
||||
type ClaimAgentTaskParams struct {
|
||||
@@ -618,6 +624,7 @@ func (q *Queries) ClaimAgentTask(ctx context.Context, arg ClaimAgentTaskParams)
|
||||
&i.HandoffNote,
|
||||
&i.PrepareLeaseExpiresAt,
|
||||
&i.SquadID,
|
||||
&i.RuntimeMcpOverlay,
|
||||
)
|
||||
return i, err
|
||||
}
|
||||
@@ -701,7 +708,7 @@ const completeAgentTask = `-- name: CompleteAgentTask :one
|
||||
UPDATE agent_task_queue
|
||||
SET status = 'completed', completed_at = now(), result = $2, session_id = $3, work_dir = $4, prepare_lease_expires_at = NULL
|
||||
WHERE id = $1 AND status = 'running'
|
||||
RETURNING id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, trigger_summary, force_fresh_session, is_leader_task, wait_reason, initiator_user_id, handoff_note, prepare_lease_expires_at, squad_id
|
||||
RETURNING id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, trigger_summary, force_fresh_session, is_leader_task, wait_reason, initiator_user_id, handoff_note, prepare_lease_expires_at, squad_id, runtime_mcp_overlay
|
||||
`
|
||||
|
||||
type CompleteAgentTaskParams struct {
|
||||
@@ -750,6 +757,7 @@ func (q *Queries) CompleteAgentTask(ctx context.Context, arg CompleteAgentTaskPa
|
||||
&i.HandoffNote,
|
||||
&i.PrepareLeaseExpiresAt,
|
||||
&i.SquadID,
|
||||
&i.RuntimeMcpOverlay,
|
||||
)
|
||||
return i, err
|
||||
}
|
||||
@@ -855,7 +863,7 @@ VALUES (
|
||||
$9,
|
||||
$10
|
||||
)
|
||||
RETURNING id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, trigger_summary, force_fresh_session, is_leader_task, wait_reason, initiator_user_id, handoff_note, prepare_lease_expires_at, squad_id
|
||||
RETURNING id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, trigger_summary, force_fresh_session, is_leader_task, wait_reason, initiator_user_id, handoff_note, prepare_lease_expires_at, squad_id, runtime_mcp_overlay
|
||||
`
|
||||
|
||||
type CreateAgentTaskParams struct {
|
||||
@@ -916,6 +924,7 @@ func (q *Queries) CreateAgentTask(ctx context.Context, arg CreateAgentTaskParams
|
||||
&i.HandoffNote,
|
||||
&i.PrepareLeaseExpiresAt,
|
||||
&i.SquadID,
|
||||
&i.RuntimeMcpOverlay,
|
||||
)
|
||||
return i, err
|
||||
}
|
||||
@@ -923,7 +932,7 @@ func (q *Queries) CreateAgentTask(ctx context.Context, arg CreateAgentTaskParams
|
||||
const createQuickCreateTask = `-- name: CreateQuickCreateTask :one
|
||||
INSERT INTO agent_task_queue (agent_id, runtime_id, issue_id, status, priority, context)
|
||||
VALUES ($1, $2, NULL, 'queued', $3, $4)
|
||||
RETURNING id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, trigger_summary, force_fresh_session, is_leader_task, wait_reason, initiator_user_id, handoff_note, prepare_lease_expires_at, squad_id
|
||||
RETURNING id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, trigger_summary, force_fresh_session, is_leader_task, wait_reason, initiator_user_id, handoff_note, prepare_lease_expires_at, squad_id, runtime_mcp_overlay
|
||||
`
|
||||
|
||||
type CreateQuickCreateTaskParams struct {
|
||||
@@ -975,6 +984,7 @@ func (q *Queries) CreateQuickCreateTask(ctx context.Context, arg CreateQuickCrea
|
||||
&i.HandoffNote,
|
||||
&i.PrepareLeaseExpiresAt,
|
||||
&i.SquadID,
|
||||
&i.RuntimeMcpOverlay,
|
||||
)
|
||||
return i, err
|
||||
}
|
||||
@@ -998,7 +1008,7 @@ SELECT
|
||||
p.squad_id
|
||||
FROM agent_task_queue p
|
||||
WHERE p.id = $1
|
||||
RETURNING id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, trigger_summary, force_fresh_session, is_leader_task, wait_reason, initiator_user_id, handoff_note, prepare_lease_expires_at, squad_id
|
||||
RETURNING id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, trigger_summary, force_fresh_session, is_leader_task, wait_reason, initiator_user_id, handoff_note, prepare_lease_expires_at, squad_id, runtime_mcp_overlay
|
||||
`
|
||||
|
||||
// Clones a parent task into a fresh queued attempt. Carries forward the
|
||||
@@ -1046,6 +1056,7 @@ func (q *Queries) CreateRetryTask(ctx context.Context, id pgtype.UUID) (AgentTas
|
||||
&i.HandoffNote,
|
||||
&i.PrepareLeaseExpiresAt,
|
||||
&i.SquadID,
|
||||
&i.RuntimeMcpOverlay,
|
||||
)
|
||||
return i, err
|
||||
}
|
||||
@@ -1069,7 +1080,7 @@ FROM victims v
|
||||
WHERE t.id = v.id
|
||||
AND t.status = 'queued'
|
||||
AND t.created_at < now() - make_interval(secs => $1::double precision)
|
||||
RETURNING t.id, t.agent_id, t.issue_id, t.status, t.priority, t.dispatched_at, t.started_at, t.completed_at, t.result, t.error, t.created_at, t.context, t.runtime_id, t.session_id, t.work_dir, t.trigger_comment_id, t.chat_session_id, t.autopilot_run_id, t.attempt, t.max_attempts, t.parent_task_id, t.failure_reason, t.trigger_summary, t.force_fresh_session, t.is_leader_task, t.wait_reason, t.initiator_user_id, t.handoff_note, t.prepare_lease_expires_at, t.squad_id
|
||||
RETURNING t.id, t.agent_id, t.issue_id, t.status, t.priority, t.dispatched_at, t.started_at, t.completed_at, t.result, t.error, t.created_at, t.context, t.runtime_id, t.session_id, t.work_dir, t.trigger_comment_id, t.chat_session_id, t.autopilot_run_id, t.attempt, t.max_attempts, t.parent_task_id, t.failure_reason, t.trigger_summary, t.force_fresh_session, t.is_leader_task, t.wait_reason, t.initiator_user_id, t.handoff_note, t.prepare_lease_expires_at, t.squad_id, t.runtime_mcp_overlay
|
||||
`
|
||||
|
||||
type ExpireStaleQueuedTasksParams struct {
|
||||
@@ -1140,6 +1151,7 @@ func (q *Queries) ExpireStaleQueuedTasks(ctx context.Context, arg ExpireStaleQue
|
||||
&i.HandoffNote,
|
||||
&i.PrepareLeaseExpiresAt,
|
||||
&i.SquadID,
|
||||
&i.RuntimeMcpOverlay,
|
||||
); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -1158,7 +1170,7 @@ WHERE id = $1
|
||||
AND runtime_id = $2
|
||||
AND status IN ('dispatched', 'waiting_local_directory')
|
||||
AND started_at IS NULL
|
||||
RETURNING id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, trigger_summary, force_fresh_session, is_leader_task, wait_reason, initiator_user_id, handoff_note, prepare_lease_expires_at, squad_id
|
||||
RETURNING id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, trigger_summary, force_fresh_session, is_leader_task, wait_reason, initiator_user_id, handoff_note, prepare_lease_expires_at, squad_id, runtime_mcp_overlay
|
||||
`
|
||||
|
||||
type ExtendAgentTaskPrepareLeaseParams struct {
|
||||
@@ -1205,6 +1217,7 @@ func (q *Queries) ExtendAgentTaskPrepareLease(ctx context.Context, arg ExtendAge
|
||||
&i.HandoffNote,
|
||||
&i.PrepareLeaseExpiresAt,
|
||||
&i.SquadID,
|
||||
&i.RuntimeMcpOverlay,
|
||||
)
|
||||
return i, err
|
||||
}
|
||||
@@ -1219,7 +1232,7 @@ SET status = 'failed',
|
||||
work_dir = COALESCE($5, work_dir),
|
||||
prepare_lease_expires_at = NULL
|
||||
WHERE id = $1 AND status IN ('dispatched', 'running', 'waiting_local_directory')
|
||||
RETURNING id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, trigger_summary, force_fresh_session, is_leader_task, wait_reason, initiator_user_id, handoff_note, prepare_lease_expires_at, squad_id
|
||||
RETURNING id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, trigger_summary, force_fresh_session, is_leader_task, wait_reason, initiator_user_id, handoff_note, prepare_lease_expires_at, squad_id, runtime_mcp_overlay
|
||||
`
|
||||
|
||||
type FailAgentTaskParams struct {
|
||||
@@ -1279,6 +1292,7 @@ func (q *Queries) FailAgentTask(ctx context.Context, arg FailAgentTaskParams) (A
|
||||
&i.HandoffNote,
|
||||
&i.PrepareLeaseExpiresAt,
|
||||
&i.SquadID,
|
||||
&i.RuntimeMcpOverlay,
|
||||
)
|
||||
return i, err
|
||||
}
|
||||
@@ -1294,7 +1308,7 @@ WHERE (
|
||||
AND (prepare_lease_expires_at IS NULL OR prepare_lease_expires_at < now())
|
||||
)
|
||||
OR (status = 'running' AND started_at < now() - make_interval(secs => $2::double precision))
|
||||
RETURNING id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, trigger_summary, force_fresh_session, is_leader_task, wait_reason, initiator_user_id, handoff_note, prepare_lease_expires_at, squad_id
|
||||
RETURNING id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, trigger_summary, force_fresh_session, is_leader_task, wait_reason, initiator_user_id, handoff_note, prepare_lease_expires_at, squad_id, runtime_mcp_overlay
|
||||
`
|
||||
|
||||
type FailStaleTasksParams struct {
|
||||
@@ -1353,6 +1367,7 @@ func (q *Queries) FailStaleTasks(ctx context.Context, arg FailStaleTasksParams)
|
||||
&i.HandoffNote,
|
||||
&i.PrepareLeaseExpiresAt,
|
||||
&i.SquadID,
|
||||
&i.RuntimeMcpOverlay,
|
||||
); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -1476,7 +1491,7 @@ func (q *Queries) GetAgentInWorkspace(ctx context.Context, arg GetAgentInWorkspa
|
||||
}
|
||||
|
||||
const getAgentTask = `-- name: GetAgentTask :one
|
||||
SELECT id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, trigger_summary, force_fresh_session, is_leader_task, wait_reason, initiator_user_id, handoff_note, prepare_lease_expires_at, squad_id FROM agent_task_queue
|
||||
SELECT id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, trigger_summary, force_fresh_session, is_leader_task, wait_reason, initiator_user_id, handoff_note, prepare_lease_expires_at, squad_id, runtime_mcp_overlay FROM agent_task_queue
|
||||
WHERE id = $1
|
||||
`
|
||||
|
||||
@@ -1514,12 +1529,13 @@ func (q *Queries) GetAgentTask(ctx context.Context, id pgtype.UUID) (AgentTaskQu
|
||||
&i.HandoffNote,
|
||||
&i.PrepareLeaseExpiresAt,
|
||||
&i.SquadID,
|
||||
&i.RuntimeMcpOverlay,
|
||||
)
|
||||
return i, err
|
||||
}
|
||||
|
||||
const getAgentTaskInWorkspace = `-- name: GetAgentTaskInWorkspace :one
|
||||
SELECT atq.id, atq.agent_id, atq.issue_id, atq.status, atq.priority, atq.dispatched_at, atq.started_at, atq.completed_at, atq.result, atq.error, atq.created_at, atq.context, atq.runtime_id, atq.session_id, atq.work_dir, atq.trigger_comment_id, atq.chat_session_id, atq.autopilot_run_id, atq.attempt, atq.max_attempts, atq.parent_task_id, atq.failure_reason, atq.trigger_summary, atq.force_fresh_session, atq.is_leader_task, atq.wait_reason, atq.initiator_user_id, atq.handoff_note, atq.prepare_lease_expires_at, atq.squad_id FROM agent_task_queue atq
|
||||
SELECT atq.id, atq.agent_id, atq.issue_id, atq.status, atq.priority, atq.dispatched_at, atq.started_at, atq.completed_at, atq.result, atq.error, atq.created_at, atq.context, atq.runtime_id, atq.session_id, atq.work_dir, atq.trigger_comment_id, atq.chat_session_id, atq.autopilot_run_id, atq.attempt, atq.max_attempts, atq.parent_task_id, atq.failure_reason, atq.trigger_summary, atq.force_fresh_session, atq.is_leader_task, atq.wait_reason, atq.initiator_user_id, atq.handoff_note, atq.prepare_lease_expires_at, atq.squad_id, atq.runtime_mcp_overlay FROM agent_task_queue atq
|
||||
JOIN agent a ON a.id = atq.agent_id
|
||||
WHERE atq.id = $1 AND a.workspace_id = $2
|
||||
`
|
||||
@@ -1570,6 +1586,7 @@ func (q *Queries) GetAgentTaskInWorkspace(ctx context.Context, arg GetAgentTaskI
|
||||
&i.HandoffNote,
|
||||
&i.PrepareLeaseExpiresAt,
|
||||
&i.SquadID,
|
||||
&i.RuntimeMcpOverlay,
|
||||
)
|
||||
return i, err
|
||||
}
|
||||
@@ -1999,7 +2016,7 @@ func (q *Queries) ListActiveAgentsByRuntimeForUpdate(ctx context.Context, runtim
|
||||
}
|
||||
|
||||
const listActiveTasksByIssue = `-- name: ListActiveTasksByIssue :many
|
||||
SELECT id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, trigger_summary, force_fresh_session, is_leader_task, wait_reason, initiator_user_id, handoff_note, prepare_lease_expires_at, squad_id FROM agent_task_queue
|
||||
SELECT id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, trigger_summary, force_fresh_session, is_leader_task, wait_reason, initiator_user_id, handoff_note, prepare_lease_expires_at, squad_id, runtime_mcp_overlay FROM agent_task_queue
|
||||
WHERE issue_id = $1 AND status IN ('queued', 'dispatched', 'running', 'waiting_local_directory')
|
||||
ORDER BY created_at DESC
|
||||
`
|
||||
@@ -2049,6 +2066,7 @@ func (q *Queries) ListActiveTasksByIssue(ctx context.Context, issueID pgtype.UUI
|
||||
&i.HandoffNote,
|
||||
&i.PrepareLeaseExpiresAt,
|
||||
&i.SquadID,
|
||||
&i.RuntimeMcpOverlay,
|
||||
); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -2061,7 +2079,7 @@ func (q *Queries) ListActiveTasksByIssue(ctx context.Context, issueID pgtype.UUI
|
||||
}
|
||||
|
||||
const listAgentTasks = `-- name: ListAgentTasks :many
|
||||
SELECT id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, trigger_summary, force_fresh_session, is_leader_task, wait_reason, initiator_user_id, handoff_note, prepare_lease_expires_at, squad_id FROM agent_task_queue
|
||||
SELECT id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, trigger_summary, force_fresh_session, is_leader_task, wait_reason, initiator_user_id, handoff_note, prepare_lease_expires_at, squad_id, runtime_mcp_overlay FROM agent_task_queue
|
||||
WHERE agent_id = $1
|
||||
ORDER BY created_at DESC
|
||||
`
|
||||
@@ -2106,6 +2124,7 @@ func (q *Queries) ListAgentTasks(ctx context.Context, agentID pgtype.UUID) ([]Ag
|
||||
&i.HandoffNote,
|
||||
&i.PrepareLeaseExpiresAt,
|
||||
&i.SquadID,
|
||||
&i.RuntimeMcpOverlay,
|
||||
); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -2216,7 +2235,7 @@ func (q *Queries) ListAllAgents(ctx context.Context, workspaceID pgtype.UUID) ([
|
||||
}
|
||||
|
||||
const listPendingTasksByRuntime = `-- name: ListPendingTasksByRuntime :many
|
||||
SELECT id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, trigger_summary, force_fresh_session, is_leader_task, wait_reason, initiator_user_id, handoff_note, prepare_lease_expires_at, squad_id FROM agent_task_queue
|
||||
SELECT id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, trigger_summary, force_fresh_session, is_leader_task, wait_reason, initiator_user_id, handoff_note, prepare_lease_expires_at, squad_id, runtime_mcp_overlay FROM agent_task_queue
|
||||
WHERE runtime_id = $1 AND status IN ('queued', 'dispatched')
|
||||
ORDER BY priority DESC, created_at ASC
|
||||
`
|
||||
@@ -2261,6 +2280,7 @@ func (q *Queries) ListPendingTasksByRuntime(ctx context.Context, runtimeID pgtyp
|
||||
&i.HandoffNote,
|
||||
&i.PrepareLeaseExpiresAt,
|
||||
&i.SquadID,
|
||||
&i.RuntimeMcpOverlay,
|
||||
); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -2273,7 +2293,7 @@ func (q *Queries) ListPendingTasksByRuntime(ctx context.Context, runtimeID pgtyp
|
||||
}
|
||||
|
||||
const listQueuedClaimCandidatesByRuntime = `-- name: ListQueuedClaimCandidatesByRuntime :many
|
||||
SELECT id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, trigger_summary, force_fresh_session, is_leader_task, wait_reason, initiator_user_id, handoff_note, prepare_lease_expires_at, squad_id FROM agent_task_queue
|
||||
SELECT id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, trigger_summary, force_fresh_session, is_leader_task, wait_reason, initiator_user_id, handoff_note, prepare_lease_expires_at, squad_id, runtime_mcp_overlay FROM agent_task_queue
|
||||
WHERE runtime_id = $1 AND status = 'queued'
|
||||
ORDER BY priority DESC, created_at ASC
|
||||
`
|
||||
@@ -2326,6 +2346,7 @@ func (q *Queries) ListQueuedClaimCandidatesByRuntime(ctx context.Context, runtim
|
||||
&i.HandoffNote,
|
||||
&i.PrepareLeaseExpiresAt,
|
||||
&i.SquadID,
|
||||
&i.RuntimeMcpOverlay,
|
||||
); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -2338,7 +2359,7 @@ func (q *Queries) ListQueuedClaimCandidatesByRuntime(ctx context.Context, runtim
|
||||
}
|
||||
|
||||
const listTasksByIssue = `-- name: ListTasksByIssue :many
|
||||
SELECT id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, trigger_summary, force_fresh_session, is_leader_task, wait_reason, initiator_user_id, handoff_note, prepare_lease_expires_at, squad_id FROM agent_task_queue
|
||||
SELECT id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, trigger_summary, force_fresh_session, is_leader_task, wait_reason, initiator_user_id, handoff_note, prepare_lease_expires_at, squad_id, runtime_mcp_overlay FROM agent_task_queue
|
||||
WHERE issue_id = $1
|
||||
ORDER BY created_at DESC
|
||||
`
|
||||
@@ -2383,6 +2404,7 @@ func (q *Queries) ListTasksByIssue(ctx context.Context, issueID pgtype.UUID) ([]
|
||||
&i.HandoffNote,
|
||||
&i.PrepareLeaseExpiresAt,
|
||||
&i.SquadID,
|
||||
&i.RuntimeMcpOverlay,
|
||||
); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -2395,15 +2417,15 @@ func (q *Queries) ListTasksByIssue(ctx context.Context, issueID pgtype.UUID) ([]
|
||||
}
|
||||
|
||||
const listWorkspaceAgentTaskSnapshot = `-- name: ListWorkspaceAgentTaskSnapshot :many
|
||||
SELECT atq.id, atq.agent_id, atq.issue_id, atq.status, atq.priority, atq.dispatched_at, atq.started_at, atq.completed_at, atq.result, atq.error, atq.created_at, atq.context, atq.runtime_id, atq.session_id, atq.work_dir, atq.trigger_comment_id, atq.chat_session_id, atq.autopilot_run_id, atq.attempt, atq.max_attempts, atq.parent_task_id, atq.failure_reason, atq.trigger_summary, atq.force_fresh_session, atq.is_leader_task, atq.wait_reason, atq.initiator_user_id, atq.handoff_note, atq.prepare_lease_expires_at, atq.squad_id FROM agent_task_queue atq
|
||||
SELECT atq.id, atq.agent_id, atq.issue_id, atq.status, atq.priority, atq.dispatched_at, atq.started_at, atq.completed_at, atq.result, atq.error, atq.created_at, atq.context, atq.runtime_id, atq.session_id, atq.work_dir, atq.trigger_comment_id, atq.chat_session_id, atq.autopilot_run_id, atq.attempt, atq.max_attempts, atq.parent_task_id, atq.failure_reason, atq.trigger_summary, atq.force_fresh_session, atq.is_leader_task, atq.wait_reason, atq.initiator_user_id, atq.handoff_note, atq.prepare_lease_expires_at, atq.squad_id, atq.runtime_mcp_overlay FROM agent_task_queue atq
|
||||
JOIN agent a ON a.id = atq.agent_id
|
||||
WHERE a.workspace_id = $1
|
||||
AND atq.status IN ('queued', 'dispatched', 'running', 'waiting_local_directory')
|
||||
|
||||
UNION ALL
|
||||
|
||||
SELECT t.id, t.agent_id, t.issue_id, t.status, t.priority, t.dispatched_at, t.started_at, t.completed_at, t.result, t.error, t.created_at, t.context, t.runtime_id, t.session_id, t.work_dir, t.trigger_comment_id, t.chat_session_id, t.autopilot_run_id, t.attempt, t.max_attempts, t.parent_task_id, t.failure_reason, t.trigger_summary, t.force_fresh_session, t.is_leader_task, t.wait_reason, t.initiator_user_id, t.handoff_note, t.prepare_lease_expires_at, t.squad_id FROM (
|
||||
SELECT DISTINCT ON (atq.agent_id) atq.id, atq.agent_id, atq.issue_id, atq.status, atq.priority, atq.dispatched_at, atq.started_at, atq.completed_at, atq.result, atq.error, atq.created_at, atq.context, atq.runtime_id, atq.session_id, atq.work_dir, atq.trigger_comment_id, atq.chat_session_id, atq.autopilot_run_id, atq.attempt, atq.max_attempts, atq.parent_task_id, atq.failure_reason, atq.trigger_summary, atq.force_fresh_session, atq.is_leader_task, atq.wait_reason, atq.initiator_user_id, atq.handoff_note, atq.prepare_lease_expires_at, atq.squad_id
|
||||
SELECT t.id, t.agent_id, t.issue_id, t.status, t.priority, t.dispatched_at, t.started_at, t.completed_at, t.result, t.error, t.created_at, t.context, t.runtime_id, t.session_id, t.work_dir, t.trigger_comment_id, t.chat_session_id, t.autopilot_run_id, t.attempt, t.max_attempts, t.parent_task_id, t.failure_reason, t.trigger_summary, t.force_fresh_session, t.is_leader_task, t.wait_reason, t.initiator_user_id, t.handoff_note, t.prepare_lease_expires_at, t.squad_id, t.runtime_mcp_overlay FROM (
|
||||
SELECT DISTINCT ON (atq.agent_id) atq.id, atq.agent_id, atq.issue_id, atq.status, atq.priority, atq.dispatched_at, atq.started_at, atq.completed_at, atq.result, atq.error, atq.created_at, atq.context, atq.runtime_id, atq.session_id, atq.work_dir, atq.trigger_comment_id, atq.chat_session_id, atq.autopilot_run_id, atq.attempt, atq.max_attempts, atq.parent_task_id, atq.failure_reason, atq.trigger_summary, atq.force_fresh_session, atq.is_leader_task, atq.wait_reason, atq.initiator_user_id, atq.handoff_note, atq.prepare_lease_expires_at, atq.squad_id, atq.runtime_mcp_overlay
|
||||
FROM agent_task_queue atq
|
||||
JOIN agent a ON a.id = atq.agent_id
|
||||
WHERE a.workspace_id = $1
|
||||
@@ -2470,6 +2492,7 @@ func (q *Queries) ListWorkspaceAgentTaskSnapshot(ctx context.Context, workspaceI
|
||||
&i.HandoffNote,
|
||||
&i.PrepareLeaseExpiresAt,
|
||||
&i.SquadID,
|
||||
&i.RuntimeMcpOverlay,
|
||||
); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -2487,7 +2510,7 @@ SET status = 'waiting_local_directory',
|
||||
wait_reason = $2,
|
||||
prepare_lease_expires_at = now() + make_interval(secs => $3::double precision)
|
||||
WHERE id = $1 AND status = 'dispatched'
|
||||
RETURNING id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, trigger_summary, force_fresh_session, is_leader_task, wait_reason, initiator_user_id, handoff_note, prepare_lease_expires_at, squad_id
|
||||
RETURNING id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, trigger_summary, force_fresh_session, is_leader_task, wait_reason, initiator_user_id, handoff_note, prepare_lease_expires_at, squad_id, runtime_mcp_overlay
|
||||
`
|
||||
|
||||
type MarkAgentTaskWaitingLocalDirectoryParams struct {
|
||||
@@ -2539,6 +2562,7 @@ func (q *Queries) MarkAgentTaskWaitingLocalDirectory(ctx context.Context, arg Ma
|
||||
&i.HandoffNote,
|
||||
&i.PrepareLeaseExpiresAt,
|
||||
&i.SquadID,
|
||||
&i.RuntimeMcpOverlay,
|
||||
)
|
||||
return i, err
|
||||
}
|
||||
@@ -2558,7 +2582,7 @@ WHERE id = (
|
||||
LIMIT 1
|
||||
FOR UPDATE SKIP LOCKED
|
||||
)
|
||||
RETURNING id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, trigger_summary, force_fresh_session, is_leader_task, wait_reason, initiator_user_id, handoff_note, prepare_lease_expires_at, squad_id
|
||||
RETURNING id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, trigger_summary, force_fresh_session, is_leader_task, wait_reason, initiator_user_id, handoff_note, prepare_lease_expires_at, squad_id, runtime_mcp_overlay
|
||||
`
|
||||
|
||||
type ReclaimStaleDispatchedTaskForRuntimeParams struct {
|
||||
@@ -2606,6 +2630,7 @@ func (q *Queries) ReclaimStaleDispatchedTaskForRuntime(ctx context.Context, arg
|
||||
&i.HandoffNote,
|
||||
&i.PrepareLeaseExpiresAt,
|
||||
&i.SquadID,
|
||||
&i.RuntimeMcpOverlay,
|
||||
)
|
||||
return i, err
|
||||
}
|
||||
@@ -2619,7 +2644,7 @@ SET status = 'failed',
|
||||
wait_reason = NULL,
|
||||
prepare_lease_expires_at = NULL
|
||||
WHERE runtime_id = $1 AND status IN ('dispatched', 'running', 'waiting_local_directory')
|
||||
RETURNING id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, trigger_summary, force_fresh_session, is_leader_task, wait_reason, initiator_user_id, handoff_note, prepare_lease_expires_at, squad_id
|
||||
RETURNING id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, trigger_summary, force_fresh_session, is_leader_task, wait_reason, initiator_user_id, handoff_note, prepare_lease_expires_at, squad_id, runtime_mcp_overlay
|
||||
`
|
||||
|
||||
// Called by the daemon at startup. Atomically fails any dispatched/running/
|
||||
@@ -2668,6 +2693,7 @@ func (q *Queries) RecoverOrphanedTasksForRuntime(ctx context.Context, runtimeID
|
||||
&i.HandoffNote,
|
||||
&i.PrepareLeaseExpiresAt,
|
||||
&i.SquadID,
|
||||
&i.RuntimeMcpOverlay,
|
||||
); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -2756,6 +2782,29 @@ func (q *Queries) RestoreAgent(ctx context.Context, id pgtype.UUID) (Agent, erro
|
||||
return i, err
|
||||
}
|
||||
|
||||
const setAgentTaskRuntimeMCPOverlay = `-- name: SetAgentTaskRuntimeMCPOverlay :exec
|
||||
UPDATE agent_task_queue
|
||||
SET runtime_mcp_overlay = $2
|
||||
WHERE id = $1 AND status = 'queued'
|
||||
`
|
||||
|
||||
type SetAgentTaskRuntimeMCPOverlayParams struct {
|
||||
ID pgtype.UUID `json:"id"`
|
||||
RuntimeMcpOverlay []byte `json:"runtime_mcp_overlay"`
|
||||
}
|
||||
|
||||
// Attaches the per-task MCP overlay (computed at dispatch time from the
|
||||
// initiator user's active integrations — currently Composio) to a task
|
||||
// still in 'queued'. The status guard means a task that already started
|
||||
// (or was cancelled / failed between Create and this update) is never
|
||||
// rewritten, so the daemon never reads a half-applied overlay. The trigger
|
||||
// trg_clear_runtime_mcp_overlay automatically wipes the column whenever a
|
||||
// task enters a terminal state, so callers do not need a paired clear.
|
||||
func (q *Queries) SetAgentTaskRuntimeMCPOverlay(ctx context.Context, arg SetAgentTaskRuntimeMCPOverlayParams) error {
|
||||
_, err := q.db.Exec(ctx, setAgentTaskRuntimeMCPOverlay, arg.ID, arg.RuntimeMcpOverlay)
|
||||
return err
|
||||
}
|
||||
|
||||
const startAgentTask = `-- name: StartAgentTask :one
|
||||
UPDATE agent_task_queue
|
||||
SET status = 'running',
|
||||
@@ -2763,7 +2812,7 @@ SET status = 'running',
|
||||
wait_reason = NULL,
|
||||
prepare_lease_expires_at = NULL
|
||||
WHERE id = $1 AND status IN ('dispatched', 'waiting_local_directory')
|
||||
RETURNING id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, trigger_summary, force_fresh_session, is_leader_task, wait_reason, initiator_user_id, handoff_note, prepare_lease_expires_at, squad_id
|
||||
RETURNING id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, trigger_summary, force_fresh_session, is_leader_task, wait_reason, initiator_user_id, handoff_note, prepare_lease_expires_at, squad_id, runtime_mcp_overlay
|
||||
`
|
||||
|
||||
// Transitions a task to running. Accepts either 'dispatched' (the normal
|
||||
@@ -2806,6 +2855,7 @@ func (q *Queries) StartAgentTask(ctx context.Context, id pgtype.UUID) (AgentTask
|
||||
&i.HandoffNote,
|
||||
&i.PrepareLeaseExpiresAt,
|
||||
&i.SquadID,
|
||||
&i.RuntimeMcpOverlay,
|
||||
)
|
||||
return i, err
|
||||
}
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
// Code generated by sqlc. DO NOT EDIT.
|
||||
// versions:
|
||||
// sqlc v1.31.1
|
||||
// sqlc v1.30.0
|
||||
|
||||
package db
|
||||
|
||||
@@ -102,6 +102,8 @@ type AgentTaskQueue struct {
|
||||
HandoffNote pgtype.Text `json:"handoff_note"`
|
||||
PrepareLeaseExpiresAt pgtype.Timestamptz `json:"prepare_lease_expires_at"`
|
||||
SquadID pgtype.UUID `json:"squad_id"`
|
||||
// Per-task MCP servers computed at dispatch time, merged on top of agent.mcp_config. Currently used by Composio integration to inject the initiator user's session URL. Cleared after task completes via trg_clear_runtime_mcp_overlay.
|
||||
RuntimeMcpOverlay []byte `json:"runtime_mcp_overlay"`
|
||||
}
|
||||
|
||||
type Attachment struct {
|
||||
|
||||
@@ -167,6 +167,18 @@ UPDATE agent_task_queue
|
||||
SET issue_id = $2
|
||||
WHERE id = $1 AND issue_id IS NULL;
|
||||
|
||||
-- name: SetAgentTaskRuntimeMCPOverlay :exec
|
||||
-- Attaches the per-task MCP overlay (computed at dispatch time from the
|
||||
-- initiator user's active integrations — currently Composio) to a task
|
||||
-- still in 'queued'. The status guard means a task that already started
|
||||
-- (or was cancelled / failed between Create and this update) is never
|
||||
-- rewritten, so the daemon never reads a half-applied overlay. The trigger
|
||||
-- trg_clear_runtime_mcp_overlay automatically wipes the column whenever a
|
||||
-- task enters a terminal state, so callers do not need a paired clear.
|
||||
UPDATE agent_task_queue
|
||||
SET runtime_mcp_overlay = $2
|
||||
WHERE id = $1 AND status = 'queued';
|
||||
|
||||
-- name: CreateRetryTask :one
|
||||
-- Clones a parent task into a fresh queued attempt. Carries forward the
|
||||
-- agent's resume context (session_id/work_dir) so the child can continue
|
||||
|
||||
Reference in New Issue
Block a user