Compare commits

...

4 Commits

Author SHA1 Message Date
yushen
bf2dd15f37 fix(composio): correct stale callback routing comments (MUL-3843)
The package header and ComposioCallback doc comments still described the
callback as sitting under the Auth middleware group. After the route was
moved out (this PR), update both to state it is a public route whose identity
comes from the signed state — addressing review nit from 张大彪.

Refs MUL-3843.

Co-authored-by: multica-agent <github@multica.ai>
2026-06-29 17:40:27 +08:00
yushen
9ddd8d1b9b fix(composio): move OAuth callback out of the Auth group (MUL-3843)
Composio 302-redirects the browser to /api/integrations/composio/callback
at the end of the OAuth flow, but PR #4608 mounted it inside the cookie-auth
middleware group. When the session cookie is absent (expired session,
SameSite=Strict / Safari ITP, private window, self-hosted callback subdomain)
the Auth middleware returned a hard 401 and a JSON blob instead of the
settings redirect, breaking the flow.

Identity never came from the cookie anyway: it is carried by the HMAC-signed
state param that CompleteCallback verifies (signature, expiry, replay) and
cross-checked by verifyAccountOwnership; h.Composio == nil still 503s. So the
callback is registered alongside the other public OAuth/webhook routes; the
other four composio endpoints stay session-gated.

Refs MUL-3843, MUL-3715.

Co-authored-by: multica-agent <github@multica.ai>
2026-06-29 17:32:04 +08:00
LinYushen
51ae12604c feat(composio): Stage 2 frontend polish — callback toast, last_used & expired UI, e2e (MUL-3718) (#4688)
* feat(composio): callback toast + refresh, last_used & expired UI, e2e (MUL-3718)

Co-authored-by: multica-agent <github@multica.ai>

* fix(composio): real callback redirect route + StrictMode-safe toast dedup (MUL-3718 review)

Co-authored-by: multica-agent <github@multica.ai>

---------

Co-authored-by: multica-agent <github@multica.ai>
2026-06-29 14:11:42 +08:00
LinYushen
b933d9fd41 feat(composio): server-side connect flow + connections REST (Notion MVP) (MUL-3720) (#4608)
* feat(composio): server-side connect flow + connections REST (Notion MVP) (MUL-3720)

Compose the merged server/pkg/composio SDK into a user-facing connection
manager: signed-state connect handshake, local user_composio_connection
mirror, idempotent disconnect, and a per-user MCP session helper (not yet
wired into task dispatch).

- migration 127_user_composio_connection (no FK/cascade, per DB rules)
- sqlc queries: upsert (idempotent on user_id+connected_account_id), list
  active, owner-scoped get, mark revoked
- internal/integrations/composio: signed HMAC-SHA256 state, BeginConnect,
  CompleteCallback (idempotent upsert), ListConnections, Disconnect
  (upstream 404 = idempotent success), CreateMCPSession (no-op when empty,
  pins connected_accounts per toolkit), CallbackRedirect
- REST handlers under /api/integrations/composio (user-scoped, 503 when
  COMPOSIO_API_KEY unset): connect/init, callback (302), connections list,
  delete
- router wiring gated by COMPOSIO_API_KEY; COMPOSIO_AUTH_CONFIGS_JSON maps
  toolkit->auth_config (MVP: notion); state secret from COMPOSIO_STATE_SECRET
  or derived from JWT_SECRET; callback base from COMPOSIO_CALLBACK_BASE_URL
  or MULTICA_PUBLIC_URL
- tests: state (expire/tamper/wrong-secret), service (mapping, callback
  idempotency, non-success, disconnect owner/404 idempotency, MCP pin),
  handlers (httptest), redact regression for Bearer mcp_ tokens

MVP scope: Notion only; no task-dispatch overlay, sharing, or webhook
event handling (later stages).

Co-authored-by: multica-agent <github@multica.ai>

* fix(composio): bind callback account to user + idempotent revoked disconnect (MUL-3720)

Address PR 4608 review (CHANGES_REQUESTED):

- callback: verify connected_account_id with Composio before mirroring it.
  The signed state only proved user/toolkit/exp, so a valid state paired with
  a tampered connected_account_id would be written verbatim. CompleteCallback
  now calls ListConnectedAccounts and fails closed (ErrAccountVerification)
  unless the account belongs to the state's user (composio_user_id == multica
  user id) and was created under the toolkit's auth config. No row is written
  on mismatch / unknown account / upstream error.

- disconnect: short-circuit to a no-op when the local row is already revoked,
  before touching upstream. Previously a second DELETE re-hit Composio and a
  non-404 upstream error surfaced as a 502, breaking the 204-idempotent
  contract.

- CreateMCPSession: document the v1 single-active-connection-per-(user,toolkit)
  constraint and make duplicate selection deterministic (newest-wins, rows are
  connected_at DESC) instead of order-dependent map overwrite. Stage 3 owns the
  real single-account-enforcement vs multi-account-shape decision.

Tests: tampered/wrong-auth-config/unknown-account callback rejection, revoked-row
disconnect no-op (asserts upstream not re-hit). composio pkg 85% coverage; all
green.

Co-authored-by: multica-agent <github@multica.ai>

* feat(composio): list all toolkits + dynamic auth-config resolution (MUL-3720)

Yushen's follow-up to the Notion MVP: surface the full Composio toolkit
catalog, render it in Settings, and drop the static env mapping in favor of
dynamic auth-config discovery.

Config correctness (per Composio docs):
- Remove COMPOSIO_AUTH_CONFIGS_JSON entirely. The toolkit→auth_config mapping
  is now resolved at request time from the project's /auth_configs (cached,
  5-min TTL), so enabling a toolkit is a dashboard action, not a redeploy.
- Do NOT add COMPOSIO_PROJECT_ID. The project API key (x-api-key) authenticates
  to exactly one project; the project is resolved from the key. Only org-level
  endpoints use x-org-api-key, which this integration never calls.

Backend:
- SDK: server/pkg/composio/auth_configs.go — ListAuthConfigs (toolkit_slug,
  is_composio_managed, show_disabled, limit, cursor).
- service: dynamic resolver (authConfigMap cache; betterAuthConfig prefers a
  custom/white-label config over Composio-managed, newest wins); BeginConnect
  and CompleteCallback resolve via it; ListToolkits fetches the full catalog
  (paginated, capped) annotated with connectable = has an enabled auth config,
  connectable-first ordering.
- handler + route: GET /api/integrations/composio/toolkits (user-scoped, 503
  when COMPOSIO_API_KEY unset) returning slug/name/logo/category/connectable.

Frontend:
- core: ComposioToolkit/ComposioConnection types, api client methods, and
  composio query options (@multica/core/composio).
- views: Settings → Integrations now has a Composio section rendering every
  toolkit as a card with search. Connect is gated on `connectable`;
  non-connectable toolkits show a muted "not configured" hint instead of a
  dead button. Connected toolkits show a badge + Disconnect (with confirm).
- i18n: composio block added to en/zh-Hans/ja/ko settings.

Tests: SDK + service (dynamic resolution, custom-over-managed preference,
connectable flag, resolver-error soft-degrade) and handler toolkits endpoint;
composio pkg 85.7% coverage. go build/vet/gofmt clean; core+views typecheck,
core+views lint, and core tests (691) all green.

Co-authored-by: multica-agent <github@multica.ai>

* fix(composio): close cross-toolkit callback fail-open by signing auth_config_id into state (MUL-3720)

Re-review blocker: CompleteCallback resolved the toolkit's auth config at
callback time and ignored a resolve error/empty result, while
verifyAccountOwnership skipped the auth-config comparison when the expected
value was empty. A user could then pass another toolkit's connected_account_id
into this toolkit's callback — the owner check passed and it was written under
the wrong toolkit_slug/account binding.

Fix: the auth_config_id is already resolved in BeginConnect (before the state
is signed), so sign it into the state and compare it exactly at callback. No
re-resolve, no fail-open. verifyAccountOwnership now fails closed when the
expected auth config is empty (rejects instead of skipping) and requires an
exact match — closing the cross-toolkit binding gap.

Tests: state round-trips auth_config_id; BeginConnect signs it; callback
rejects wrong/cross-toolkit auth config and an empty (no-mapping) auth config
fails closed. composio pkg 85.2% coverage, all green.

Frontend (non-blocking): the Composio settings tab now surfaces an error when
the connections query fails instead of silently rendering everything as
unconnected.

Co-authored-by: multica-agent <github@multica.ai>

* fix(composio): hide Settings section entirely when integration unconfigured (MUL-3720)

Decision (option 2, hide-then-merge): don't show a card that leaks the internal
COMPOSIO_API_KEY env-var name to every end user. IntegrationsTab now gates the
whole Composio section (heading + body) on the toolkits query — a 503 means the
key is unset, so the section is withheld instead of rendering the not-configured
card. Admin-only setup guidance is a later, role-gated affordance.

Removed the notConfigured card (and now-unused ApiError import) from
ComposioTab; it only mounts when configured. views typecheck + lint clean.

Co-authored-by: multica-agent <github@multica.ai>

---------

Co-authored-by: multica-agent <github@multica.ai>
2026-06-29 12:50:11 +08:00
30 changed files with 3541 additions and 4 deletions

View File

@@ -37,4 +37,83 @@ test.describe("Settings", () => {
await expect(page.getByText("Workspace settings saved").first()).toBeVisible({ timeout: 5000 });
await expect(page.getByRole("button", { name: new RegExp(originalName) }).first()).toBeVisible();
});
// Composio connect flow, fully mocked at the network boundary so it runs
// without a configured COMPOSIO_API_KEY or a live Composio project. The
// backend redirect is simulated by pointing the init endpoint's redirect_url
// straight back at the settings page with ?connected=<slug> — exercising the
// frontend's callback toast + connections refresh (MUL-3718) end to end.
test("connecting a Composio toolkit shows a toast and refreshes the list", async ({
page,
}) => {
const workspaceSlug = await loginAsDefault(page);
const settingsUrl = `/${workspaceSlug}/settings?tab=integrations`;
// Stateful: connections is empty until the (mocked) connect flow lands.
let connected = false;
await page.route("**/api/integrations/composio/toolkits", (route) =>
route.fulfill({
status: 200,
contentType: "application/json",
body: JSON.stringify([
{ slug: "notion", name: "Notion", connectable: true },
]),
}),
);
await page.route("**/api/integrations/composio/connections", (route) => {
if (route.request().method() !== "GET") return route.fallback();
return route.fulfill({
status: 200,
contentType: "application/json",
body: JSON.stringify(
connected
? [
{
id: "conn-notion-1",
toolkit_slug: "notion",
status: "active",
connected_at: new Date().toISOString(),
last_used_at: null,
},
]
: [],
),
});
});
await page.route("**/api/integrations/composio/connect/init", (route) => {
// Composio would 302 through its hosted consent and back to our callback,
// which emits CallbackRedirect's slug-less shape:
// `/settings?tab=integrations&connected=<slug>`. The web proxy's
// legacy-route redirect then prepends the last workspace slug, landing on
// the real settings route. Mock that exact backend shape (NOT the final
// slugged URL) so the test exercises the same redirect path real users hit.
connected = true;
return route.fulfill({
status: 200,
contentType: "application/json",
body: JSON.stringify({
redirect_url: `/settings?tab=integrations&connected=notion`,
}),
});
});
await page.goto(settingsUrl, { waitUntil: "domcontentloaded" });
await waitForPageText(page, "Composio");
// Notion starts disconnected → click Connect.
await page.getByRole("button", { name: /^Connect$/ }).first().click();
// Success toast from the simulated callback redirect.
await expect(page.getByText("Connected").first()).toBeVisible({ timeout: 10000 });
// List refreshed without a manual reload: the Notion card now offers
// Disconnect, and the one-shot ?connected param has been stripped.
await expect(
page.getByRole("button", { name: /Disconnect/ }).first(),
).toBeVisible({ timeout: 10000 });
await expect(page).not.toHaveURL(/connected=notion/);
});
});

View File

@@ -112,6 +112,9 @@ import type {
BeginLarkInstallResponse,
LarkInstallStatusResponse,
RedeemLarkBindingTokenResponse,
ComposioToolkit,
ComposioConnection,
ComposioConnectInitResponse,
Squad,
SquadMember,
SquadMemberStatusListResponse,
@@ -2270,4 +2273,34 @@ export class ApiClient {
body: JSON.stringify({ token }),
});
}
// Composio integration (MUL-3720). All routes are user-scoped (a connection
// belongs to a user, not a workspace), so none take a workspaceId.
/** Full Composio toolkit catalog, each annotated with `connectable`
* (whether the project has an enabled auth config for it). */
async listComposioToolkits(): Promise<ComposioToolkit[]> {
return this.fetch(`/api/integrations/composio/toolkits`);
}
/** The caller's active Composio connections. */
async listComposioConnections(): Promise<ComposioConnection[]> {
return this.fetch(`/api/integrations/composio/connections`);
}
/** Starts a hosted Composio connect flow for a toolkit and returns the
* redirect URL the browser should be sent to. */
async beginComposioConnect(toolkitSlug: string): Promise<ComposioConnectInitResponse> {
return this.fetch(`/api/integrations/composio/connect/init`, {
method: "POST",
body: JSON.stringify({ toolkit_slug: toolkitSlug }),
});
}
/** Disconnects a Composio connection the caller owns. */
async deleteComposioConnection(connectionId: string): Promise<void> {
await this.fetch(`/api/integrations/composio/connections/${connectionId}`, {
method: "DELETE",
});
}
}

View File

@@ -0,0 +1 @@
export { composioKeys, composioToolkitsOptions, composioConnectionsOptions } from "./queries";

View File

@@ -0,0 +1,26 @@
import { queryOptions } from "@tanstack/react-query";
import { api } from "../api";
/** Query-key namespace for Composio integration data. */
export const composioKeys = {
all: ["composio"] as const,
toolkits: () => [...composioKeys.all, "toolkits"] as const,
connections: () => [...composioKeys.all, "connections"] as const,
};
/** The full Composio toolkit catalog (with per-toolkit `connectable`). The
* catalog changes rarely, so a long staleTime avoids refetching it every time
* the Settings tab mounts. */
export const composioToolkitsOptions = () =>
queryOptions({
queryKey: composioKeys.toolkits(),
queryFn: () => api.listComposioToolkits(),
staleTime: 5 * 60 * 1000,
});
/** The current user's active Composio connections. */
export const composioConnectionsOptions = () =>
queryOptions({
queryKey: composioKeys.connections(),
queryFn: () => api.listComposioConnections(),
});

View File

@@ -85,6 +85,8 @@
"./github/queries": "./github/queries.ts",
"./lark": "./lark/index.ts",
"./lark/queries": "./lark/queries.ts",
"./composio": "./composio/index.ts",
"./composio/queries": "./composio/queries.ts",
"./feedback": "./feedback/index.ts",
"./feedback/mutations": "./feedback/mutations.ts",
"./realtime": "./realtime/index.ts",

View File

@@ -0,0 +1,36 @@
/** A Composio toolkit as surfaced by GET /api/integrations/composio/toolkits.
*
* Wire shape mirrors `ComposioToolkitResponse` in
* `server/internal/handler/integrations_composio.go`. New fields the backend
* adds later MUST stay optional so older desktop builds keep parsing — see
* CLAUDE.md → API Response Compatibility. */
export interface ComposioToolkit {
slug: string;
name: string;
logo?: string;
category?: string;
/** Whether the project has an enabled auth config for this toolkit. When
* false the UI must not offer a working Connect button — BeginConnect would
* 400 with "toolkit not supported". */
connectable: boolean;
}
/** A user's Composio connected account, as returned by
* GET /api/integrations/composio/connections. Mirrors
* `ComposioConnectionResponse` server-side. */
export interface ComposioConnection {
id: string;
toolkit_slug: string;
/** Connection lifecycle state. `expired` surfaces a Reconnect affordance in
* the UI; the backend only starts emitting it once Stage 4 webhook handling
* lands (MUL-3719), but the client renders the branch ahead of that. */
status: "active" | "expired" | "revoked" | string;
connected_at: string;
last_used_at?: string | null;
}
/** Response of POST /api/integrations/composio/connect/init — the hosted
* Composio Connect Link the browser is redirected to. */
export interface ComposioConnectInitResponse {
redirect_url: string;
}

View File

@@ -119,6 +119,11 @@ export type {
LarkInstallStatusResponse,
RedeemLarkBindingTokenResponse,
} from "./lark";
export type {
ComposioToolkit,
ComposioConnection,
ComposioConnectInitResponse,
} from "./composio";
export type {
Autopilot,
AutopilotStatus,

View File

@@ -300,6 +300,38 @@
"install_error_forbidden": "You no longer have permission to install Lark Bots in this workspace. Ask a workspace admin to continue.",
"install_error_generic": "Install failed. Try again."
},
"composio": {
"section_title": "Composio",
"page_description": "Browse the full Composio toolkit catalog and connect the apps your agents can act on. Only toolkits with a configured auth config can be connected right now.",
"not_enabled_title": "Composio integration not enabled",
"not_enabled_description_prefix": "Set",
"not_enabled_description_suffix": "on the server to enable Composio toolkit connections.",
"loading": "Loading toolkits…",
"load_failed": "Failed to load Composio toolkits.",
"empty_title": "No toolkits available",
"empty_description": "Composio returned no toolkits. Check the API key and project configuration.",
"search_placeholder": "Search toolkits…",
"connect": "Connect",
"connecting": "Connecting…",
"connected": "Connected",
"disconnect": "Disconnect",
"disconnecting": "Disconnecting…",
"not_connectable": "Not configured",
"not_connectable_hint": "This toolkit has no auth config in your Composio project yet, so it can't be connected. Add an auth config for it in the Composio dashboard to enable connecting.",
"connect_failed": "Couldn't start the connection. Please try again.",
"disconnect_failed": "Couldn't disconnect. Please try again.",
"toast_disconnected": "Disconnected",
"disconnect_confirm_title": "Disconnect this app?",
"disconnect_confirm_description": "Your connected account will be revoked at Composio and your agents will lose access to this toolkit. You can reconnect later.",
"disconnect_confirm_cancel": "Cancel",
"connections_load_failed": "Couldn't load your existing connections, so connected status may be incomplete.",
"toast_connected": "Connected",
"toast_connect_failed": "Couldn't complete the connection. Please try again.",
"last_used": "Last used {{when}}",
"last_used_never": "Never used",
"expired": "Token expired",
"reconnect": "Reconnect"
},
"repositories": {
"section_title": "Repositories",
"description": "Git repositories associated with this workspace. Agents use these to clone and work on code.",

View File

@@ -300,6 +300,38 @@
"install_error_forbidden": "このワークスペースに Lark ボットを設置する権限がなくなりました。ワークスペース管理者にお問い合わせください。",
"install_error_generic": "設置に失敗しました。もう一度お試しください。"
},
"composio": {
"section_title": "Composio",
"page_description": "Browse the full Composio toolkit catalog and connect the apps your agents can act on. Only toolkits with a configured auth config can be connected right now.",
"not_enabled_title": "Composio integration not enabled",
"not_enabled_description_prefix": "Set",
"not_enabled_description_suffix": "on the server to enable Composio toolkit connections.",
"loading": "Loading toolkits…",
"load_failed": "Failed to load Composio toolkits.",
"empty_title": "No toolkits available",
"empty_description": "Composio returned no toolkits. Check the API key and project configuration.",
"search_placeholder": "Search toolkits…",
"connect": "Connect",
"connecting": "Connecting…",
"connected": "Connected",
"disconnect": "Disconnect",
"disconnecting": "Disconnecting…",
"not_connectable": "Not configured",
"not_connectable_hint": "This toolkit has no auth config in your Composio project yet, so it can't be connected. Add an auth config for it in the Composio dashboard to enable connecting.",
"connect_failed": "Couldn't start the connection. Please try again.",
"disconnect_failed": "Couldn't disconnect. Please try again.",
"toast_disconnected": "Disconnected",
"disconnect_confirm_title": "Disconnect this app?",
"disconnect_confirm_description": "Your connected account will be revoked at Composio and your agents will lose access to this toolkit. You can reconnect later.",
"disconnect_confirm_cancel": "Cancel",
"connections_load_failed": "Couldn't load your existing connections, so connected status may be incomplete.",
"toast_connected": "接続しました",
"toast_connect_failed": "接続を完了できませんでした。もう一度お試しください。",
"last_used": "最終使用 {{when}}",
"last_used_never": "未使用",
"expired": "トークンの有効期限切れ",
"reconnect": "再接続"
},
"repositories": {
"section_title": "リポジトリ",
"description": "このワークスペースに関連付けられた Git リポジトリです。エージェントはこれらをクローンしてコードを作業します。",

View File

@@ -376,5 +376,37 @@
"install_error_session_lost": "설치 세션이 만료되었거나 유실되었어요. 다시 스캔해 처음부터 진행하세요.",
"install_error_forbidden": "이 워크스페이스에 Lark 봇을 설치할 권한이 더 이상 없어요. 워크스페이스 관리자에게 문의하세요.",
"install_error_generic": "설치에 실패했어요. 다시 시도하세요."
},
"composio": {
"section_title": "Composio",
"page_description": "Browse the full Composio toolkit catalog and connect the apps your agents can act on. Only toolkits with a configured auth config can be connected right now.",
"not_enabled_title": "Composio integration not enabled",
"not_enabled_description_prefix": "Set",
"not_enabled_description_suffix": "on the server to enable Composio toolkit connections.",
"loading": "Loading toolkits…",
"load_failed": "Failed to load Composio toolkits.",
"empty_title": "No toolkits available",
"empty_description": "Composio returned no toolkits. Check the API key and project configuration.",
"search_placeholder": "Search toolkits…",
"connect": "Connect",
"connecting": "Connecting…",
"connected": "Connected",
"disconnect": "Disconnect",
"disconnecting": "Disconnecting…",
"not_connectable": "Not configured",
"not_connectable_hint": "This toolkit has no auth config in your Composio project yet, so it can't be connected. Add an auth config for it in the Composio dashboard to enable connecting.",
"connect_failed": "Couldn't start the connection. Please try again.",
"disconnect_failed": "Couldn't disconnect. Please try again.",
"toast_disconnected": "Disconnected",
"disconnect_confirm_title": "Disconnect this app?",
"disconnect_confirm_description": "Your connected account will be revoked at Composio and your agents will lose access to this toolkit. You can reconnect later.",
"disconnect_confirm_cancel": "Cancel",
"connections_load_failed": "Couldn't load your existing connections, so connected status may be incomplete.",
"toast_connected": "연결되었습니다",
"toast_connect_failed": "연결을 완료하지 못했습니다. 다시 시도해 주세요.",
"last_used": "마지막 사용 {{when}}",
"last_used_never": "사용한 적 없음",
"expired": "토큰 만료됨",
"reconnect": "다시 연결"
}
}

View File

@@ -300,6 +300,38 @@
"install_error_forbidden": "你已没有在此工作区安装飞书 Bot 的权限,请联系工作区管理员。",
"install_error_generic": "安装失败,请重试。"
},
"composio": {
"section_title": "Composio",
"page_description": "浏览 Composio 的全部 toolkit连接你的 agent 可以操作的应用。目前只有已配置 auth config 的 toolkit 可以连接。",
"not_enabled_title": "Composio 集成未启用",
"not_enabled_description_prefix": "在服务器上设置",
"not_enabled_description_suffix": "以启用 Composio toolkit 连接。",
"loading": "正在加载 toolkit…",
"load_failed": "加载 Composio toolkit 失败。",
"empty_title": "没有可用的 toolkit",
"empty_description": "Composio 未返回任何 toolkit。请检查 API key 和项目配置。",
"search_placeholder": "搜索 toolkit…",
"connect": "连接",
"connecting": "连接中…",
"connected": "已连接",
"disconnect": "断开",
"disconnecting": "断开中…",
"not_connectable": "未配置",
"not_connectable_hint": "该 toolkit 在你的 Composio 项目中还没有 auth config因此无法连接。请在 Composio 后台为它添加一个 auth config 后再连接。",
"connect_failed": "无法发起连接,请重试。",
"disconnect_failed": "无法断开连接,请重试。",
"toast_disconnected": "已断开",
"disconnect_confirm_title": "断开这个应用?",
"disconnect_confirm_description": "你的已连接账号将在 Composio 侧被撤销,你的 agent 将失去对该 toolkit 的访问权限。你可以稍后重新连接。",
"disconnect_confirm_cancel": "取消",
"connections_load_failed": "无法加载你已有的连接,连接状态可能不完整。",
"toast_connected": "已连接",
"toast_connect_failed": "连接未能完成,请重试。",
"last_used": "最近使用 {{when}}",
"last_used_never": "从未使用",
"expired": "令牌已过期",
"reconnect": "重新连接"
},
"repositories": {
"section_title": "代码仓库",
"description": "与该工作区关联的 Git 仓库。智能体会从这里 clone 代码并完成工作。",

View File

@@ -0,0 +1,193 @@
import { describe, it, expect, beforeEach, vi } from "vitest";
import { StrictMode } from "react";
import { render, screen, waitFor } from "@testing-library/react";
import { I18nProvider } from "@multica/core/i18n/react";
import enCommon from "../../locales/en/common.json";
import enSettings from "../../locales/en/settings.json";
import type { ComposioConnection, ComposioToolkit } from "@multica/core/types";
// --- Mutable refs the mocked hooks read from, so each test can shape the data
// without re-mocking the modules. ---
const toolkitsRef = vi.hoisted(() => ({
current: { data: [] as ComposioToolkit[], isLoading: false, isError: false },
}));
const connectionsRef = vi.hoisted(() => ({
current: { data: [] as ComposioConnection[], isError: false },
}));
const searchParamsRef = vi.hoisted(() => ({ current: new URLSearchParams("tab=integrations") }));
const mockInvalidate = vi.hoisted(() => vi.fn());
const mockReplace = vi.hoisted(() => vi.fn());
const mockToastSuccess = vi.hoisted(() => vi.fn());
const mockToastError = vi.hoisted(() => vi.fn());
vi.mock("@tanstack/react-query", () => ({
useQuery: (opts: { queryKey: unknown[] }) => {
const key = JSON.stringify(opts.queryKey);
if (key.includes("toolkits")) return toolkitsRef.current;
if (key.includes("connections")) return connectionsRef.current;
return { data: undefined };
},
useQueryClient: () => ({ invalidateQueries: mockInvalidate }),
queryOptions: <T,>(opts: T) => opts,
}));
vi.mock("@multica/core/composio", () => ({
composioKeys: {
all: ["composio"],
toolkits: () => ["composio", "toolkits"],
connections: () => ["composio", "connections"],
},
composioToolkitsOptions: () => ({ queryKey: ["composio", "toolkits"], queryFn: vi.fn() }),
composioConnectionsOptions: () => ({ queryKey: ["composio", "connections"], queryFn: vi.fn() }),
}));
vi.mock("@multica/core/api", () => ({
api: {
beginComposioConnect: vi.fn(),
deleteComposioConnection: vi.fn(),
},
}));
vi.mock("../../navigation", () => ({
useNavigation: () => ({
push: vi.fn(),
replace: mockReplace,
back: vi.fn(),
pathname: "/acme/settings",
searchParams: searchParamsRef.current,
getShareableUrl: (p: string) => `https://app.example${p}`,
}),
}));
vi.mock("sonner", () => ({
toast: { success: mockToastSuccess, error: mockToastError },
}));
import { ComposioTab } from "./composio-tab";
function renderTab() {
return render(
<I18nProvider locale="en" resources={{ en: { common: enCommon, settings: enSettings } }}>
<ComposioTab />
</I18nProvider>,
);
}
// StrictMode reproduces React's dev-mode mount → cleanup → mount double-invoke,
// which is exactly what would double-fire the callback toast without the
// consumed-key ref guard.
function renderTabStrict() {
return render(
<StrictMode>
<I18nProvider locale="en" resources={{ en: { common: enCommon, settings: enSettings } }}>
<ComposioTab />
</I18nProvider>
</StrictMode>,
);
}
const NOTION: ComposioToolkit = {
slug: "notion",
name: "Notion",
connectable: true,
};
beforeEach(() => {
vi.clearAllMocks();
toolkitsRef.current = { data: [NOTION], isLoading: false, isError: false };
connectionsRef.current = { data: [], isError: false };
searchParamsRef.current = new URLSearchParams("tab=integrations");
});
describe("ComposioTab", () => {
it("renders a connected card with a 'never used' placeholder when last_used_at is null", () => {
connectionsRef.current = {
data: [
{
id: "conn-1",
toolkit_slug: "notion",
status: "active",
connected_at: "2026-06-01T00:00:00Z",
last_used_at: null,
},
],
isError: false,
};
renderTab();
expect(screen.getByText(enSettings.composio.connected)).toBeInTheDocument();
expect(screen.getByText(enSettings.composio.last_used_never)).toBeInTheDocument();
});
it("renders a 'Last used' line when last_used_at is present", () => {
connectionsRef.current = {
data: [
{
id: "conn-1",
toolkit_slug: "notion",
status: "active",
connected_at: "2026-06-01T00:00:00Z",
last_used_at: new Date(Date.now() - 2 * 60 * 1000).toISOString(),
},
],
isError: false,
};
renderTab();
// "Last used {{when}}" → relative time formatter yields "2m ago"
expect(screen.getByText(/Last used/)).toBeInTheDocument();
expect(screen.queryByText(enSettings.composio.last_used_never)).not.toBeInTheDocument();
});
it("renders the expired branch with a Reconnect button", () => {
connectionsRef.current = {
data: [
{
id: "conn-1",
toolkit_slug: "notion",
status: "expired",
connected_at: "2026-06-01T00:00:00Z",
last_used_at: null,
},
],
isError: false,
};
renderTab();
expect(screen.getByText(enSettings.composio.expired)).toBeInTheDocument();
expect(
screen.getByRole("button", { name: new RegExp(enSettings.composio.reconnect) }),
).toBeInTheDocument();
// Not treated as connected, so no Connected badge.
expect(screen.queryByText(enSettings.composio.connected)).not.toBeInTheDocument();
});
it("toasts success and clears the ?connected param on a successful callback", async () => {
searchParamsRef.current = new URLSearchParams("tab=integrations&connected=notion");
renderTab();
await waitFor(() => {
expect(mockToastSuccess).toHaveBeenCalledWith(enSettings.composio.toast_connected);
});
expect(mockInvalidate).toHaveBeenCalledWith({ queryKey: ["composio", "connections"] });
// The one-shot param is stripped while ?tab is preserved.
expect(mockReplace).toHaveBeenCalledWith("/acme/settings?tab=integrations");
});
it("toasts error on a failed callback", async () => {
searchParamsRef.current = new URLSearchParams("tab=integrations&error=composio_connect_failed");
renderTab();
await waitFor(() => {
expect(mockToastError).toHaveBeenCalledWith(enSettings.composio.toast_connect_failed);
});
expect(mockReplace).toHaveBeenCalledWith("/acme/settings?tab=integrations");
});
it("fires the success callback exactly once under StrictMode double-invoke", async () => {
searchParamsRef.current = new URLSearchParams("tab=integrations&connected=notion");
renderTabStrict();
await waitFor(() => {
expect(mockToastSuccess).toHaveBeenCalled();
});
// The consumed-key ref must suppress the second (cleanup → re-mount) run.
expect(mockToastSuccess).toHaveBeenCalledTimes(1);
expect(mockInvalidate).toHaveBeenCalledTimes(1);
});
});

View File

@@ -0,0 +1,378 @@
"use client";
import { useEffect, useMemo, useRef, useState } from "react";
import { useQuery, useQueryClient } from "@tanstack/react-query";
import { toast } from "sonner";
import { AlertTriangle, Check, Loader2, Plug, RefreshCw, Trash2 } from "lucide-react";
import { Button } from "@multica/ui/components/ui/button";
import { Card, CardContent } from "@multica/ui/components/ui/card";
import { Input } from "@multica/ui/components/ui/input";
import {
AlertDialog,
AlertDialogAction,
AlertDialogCancel,
AlertDialogContent,
AlertDialogDescription,
AlertDialogFooter,
AlertDialogHeader,
AlertDialogTitle,
} from "@multica/ui/components/ui/alert-dialog";
import { api } from "@multica/core/api";
import {
composioConnectionsOptions,
composioKeys,
composioToolkitsOptions,
} from "@multica/core/composio";
import type { ComposioToolkit } from "@multica/core/types";
import { useT, useTimeAgo } from "../../i18n";
import { useNavigation } from "../../navigation";
// ComposioTab renders the full Composio toolkit catalog and lets the user
// connect / disconnect the apps their agents can act on.
//
// Key UX rule (MUL-3720): listing ≠ connectable. Only toolkits with an enabled
// auth config in the Composio project carry `connectable: true`; the rest get a
// muted "not configured" hint instead of a dead Connect button that would 400.
export function ComposioTab() {
const { t } = useT("settings");
const qc = useQueryClient();
const navigation = useNavigation();
const toolkitsQuery = useQuery(composioToolkitsOptions());
const connectionsQuery = useQuery(composioConnectionsOptions());
const [query, setQuery] = useState("");
const [connectingSlug, setConnectingSlug] = useState<string | null>(null);
const [disconnectTarget, setDisconnectTarget] = useState<{
connectionId: string;
name: string;
} | null>(null);
const [disconnecting, setDisconnecting] = useState(false);
// The hosted Composio consent flow is a full-page redirect that lands back
// on the settings page carrying either `?connected=<slug>` (success) or
// `?error=composio_connect_failed` (any backend-side failure — see
// Service.CallbackRedirect, MUL-3720). Consume it exactly once: fire a toast,
// refresh the connections list so the freshly-linked card flips to Connected
// without a manual reload, then strip the one-shot params via `replace` so a
// browser refresh doesn't re-toast.
const connectedParam = navigation.searchParams.get("connected");
const errorParam = navigation.searchParams.get("error");
// React Strict Mode (dev / Next) double-invokes mount effects as
// mount → cleanup → mount. On the second invoke the `replace` from the first
// hasn't committed yet, so the closure still sees the same params and would
// toast + invalidate twice. Guard with a ref keyed on the callback we already
// consumed; a genuinely new callback (different slug, or the redirect being a
// full page load that resets this ref) still fires.
const consumedCallbackKey = useRef<string | null>(null);
useEffect(() => {
const callbackKey = connectedParam
? `connected:${connectedParam}`
: errorParam === "composio_connect_failed"
? "error:composio_connect_failed"
: null;
if (!callbackKey) return;
if (consumedCallbackKey.current === callbackKey) return;
consumedCallbackKey.current = callbackKey;
if (connectedParam) {
toast.success(t(($) => $.composio.toast_connected));
void qc.invalidateQueries({ queryKey: composioKeys.connections() });
} else {
toast.error(t(($) => $.composio.toast_connect_failed));
}
// Drop only the Composio one-shot params; keep everything else (notably
// ?tab=integrations) so the user stays on this tab.
const params = new URLSearchParams(navigation.searchParams);
params.delete("connected");
params.delete("error");
const qs = params.toString();
navigation.replace(qs ? `${navigation.pathname}?${qs}` : navigation.pathname);
// eslint-disable-next-line react-hooks/exhaustive-deps
}, [connectedParam, errorParam]);
// Map active connections by toolkit slug so each card knows whether it is
// already connected (and which connection id to disconnect).
const connectionBySlug = useMemo(() => {
const m = new Map<string, string>();
for (const c of connectionsQuery.data ?? []) {
if (c.status === "active") m.set(c.toolkit_slug, c.id);
}
return m;
}, [connectionsQuery.data]);
// Toolkits whose latest connection is expired render a Reconnect affordance
// instead of Connected/Connect. Backend only emits `expired` once Stage 4
// (MUL-3719) lands, but the branch is wired up now so it lights up for free.
const expiredBySlug = useMemo(() => {
const m = new Set<string>();
for (const c of connectionsQuery.data ?? []) {
if (c.status === "expired") m.add(c.toolkit_slug);
}
return m;
}, [connectionsQuery.data]);
// Last-used timestamp per active connection, for the "Last used …" line on a
// connected card. Backend leaves this null until tool-call dispatch starts
// stamping it (Stage 3, MUL-3721); the card shows a "never used" placeholder
// until then.
const lastUsedBySlug = useMemo(() => {
const m = new Map<string, string | null>();
for (const c of connectionsQuery.data ?? []) {
if (c.status === "active") m.set(c.toolkit_slug, c.last_used_at ?? null);
}
return m;
}, [connectionsQuery.data]);
const toolkits = useMemo(() => toolkitsQuery.data ?? [], [toolkitsQuery.data]);
const filtered = useMemo(() => {
const q = query.trim().toLowerCase();
if (!q) return toolkits;
return toolkits.filter(
(tk) =>
tk.name.toLowerCase().includes(q) ||
tk.slug.toLowerCase().includes(q) ||
(tk.category ?? "").toLowerCase().includes(q),
);
}, [toolkits, query]);
// 503 handling lives in the parent IntegrationsTab, which hides the whole
// Composio section when COMPOSIO_API_KEY is unset — this component only
// mounts when the integration is configured, so it deals with the loaded /
// error / empty / list states below.
async function handleConnect(tk: ComposioToolkit) {
if (connectingSlug) return;
setConnectingSlug(tk.slug);
try {
const { redirect_url } = await api.beginComposioConnect(tk.slug);
// Hand the browser to Composio's hosted consent flow; it redirects back
// to /api/integrations/composio/callback when done.
window.location.href = redirect_url;
} catch (e) {
toast.error(e instanceof Error ? e.message : t(($) => $.composio.connect_failed));
setConnectingSlug(null);
}
}
async function handleDisconnect() {
if (!disconnectTarget || disconnecting) return;
setDisconnecting(true);
try {
await api.deleteComposioConnection(disconnectTarget.connectionId);
await qc.invalidateQueries({ queryKey: composioKeys.connections() });
toast.success(t(($) => $.composio.toast_disconnected));
setDisconnectTarget(null);
} catch (e) {
toast.error(e instanceof Error ? e.message : t(($) => $.composio.disconnect_failed));
} finally {
setDisconnecting(false);
}
}
return (
<div className="space-y-6">
<section className="space-y-1">
<p className="text-sm text-muted-foreground">{t(($) => $.composio.page_description)}</p>
</section>
{toolkitsQuery.isLoading ? (
<Card>
<CardContent>
<p className="text-sm text-muted-foreground">{t(($) => $.composio.loading)}</p>
</CardContent>
</Card>
) : toolkitsQuery.isError ? (
<Card>
<CardContent>
<p className="text-sm text-destructive">{t(($) => $.composio.load_failed)}</p>
</CardContent>
</Card>
) : toolkits.length === 0 ? (
<Card>
<CardContent className="space-y-2">
<p className="text-sm font-medium">{t(($) => $.composio.empty_title)}</p>
<p className="text-xs text-muted-foreground">{t(($) => $.composio.empty_description)}</p>
</CardContent>
</Card>
) : (
<section className="space-y-3">
<Input
value={query}
onChange={(e) => setQuery(e.target.value)}
placeholder={t(($) => $.composio.search_placeholder)}
className="max-w-xs"
/>
{connectionsQuery.isError && (
// Don't silently treat a failed connections fetch as "nothing
// connected" — that would hide real connections and offer Connect
// on something already linked. Surface it so the user knows the
// connected state may be incomplete; the catalog still renders.
<p className="text-xs text-destructive">
{t(($) => $.composio.connections_load_failed)}
</p>
)}
<div className="grid grid-cols-1 gap-2 sm:grid-cols-2 lg:grid-cols-3">
{filtered.map((tk) => (
<ToolkitCard
key={tk.slug}
toolkit={tk}
connectionId={connectionBySlug.get(tk.slug)}
expired={expiredBySlug.has(tk.slug)}
lastUsedAt={lastUsedBySlug.get(tk.slug) ?? null}
connecting={connectingSlug === tk.slug}
anyConnecting={connectingSlug !== null}
onConnect={() => handleConnect(tk)}
onDisconnect={(connectionId, name) =>
setDisconnectTarget({ connectionId, name })
}
/>
))}
</div>
</section>
)}
<AlertDialog
open={!!disconnectTarget}
onOpenChange={(v) => {
if (!v && !disconnecting) setDisconnectTarget(null);
}}
>
<AlertDialogContent>
<AlertDialogHeader>
<AlertDialogTitle>{t(($) => $.composio.disconnect_confirm_title)}</AlertDialogTitle>
<AlertDialogDescription>
{t(($) => $.composio.disconnect_confirm_description)}
</AlertDialogDescription>
</AlertDialogHeader>
<AlertDialogFooter>
<AlertDialogCancel disabled={disconnecting}>
{t(($) => $.composio.disconnect_confirm_cancel)}
</AlertDialogCancel>
<AlertDialogAction onClick={handleDisconnect} disabled={disconnecting}>
{disconnecting
? t(($) => $.composio.disconnecting)
: t(($) => $.composio.disconnect)}
</AlertDialogAction>
</AlertDialogFooter>
</AlertDialogContent>
</AlertDialog>
</div>
);
}
function ToolkitCard({
toolkit,
connectionId,
expired,
lastUsedAt,
connecting,
anyConnecting,
onConnect,
onDisconnect,
}: {
toolkit: ComposioToolkit;
connectionId?: string;
expired: boolean;
lastUsedAt: string | null;
connecting: boolean;
anyConnecting: boolean;
onConnect: () => void;
onDisconnect: (connectionId: string, name: string) => void;
}) {
const { t } = useT("settings");
const timeAgo = useTimeAgo();
const isConnected = !!connectionId;
return (
<Card>
<CardContent className="flex items-center gap-3 p-3">
<ToolkitLogo toolkit={toolkit} />
<div className="min-w-0 flex-1">
<p className="truncate text-sm font-medium">{toolkit.name || toolkit.slug}</p>
{isConnected ? (
// Last-used line. Backend leaves last_used_at null until Stage 3
// dispatch stamps it, so show a localized "never used" placeholder
// rather than hiding the line entirely.
<p className="truncate text-[10px] text-muted-foreground">
{lastUsedAt
? t(($) => $.composio.last_used, { when: timeAgo(lastUsedAt) })
: t(($) => $.composio.last_used_never)}
</p>
) : toolkit.category ? (
<p className="truncate text-[10px] uppercase tracking-wide text-muted-foreground">
{toolkit.category}
</p>
) : null}
</div>
{isConnected ? (
<div className="flex items-center gap-2">
<span className="inline-flex items-center gap-1 text-xs text-emerald-600">
<Check className="h-3 w-3" />
{t(($) => $.composio.connected)}
</span>
<Button
variant="outline"
size="sm"
onClick={() => onDisconnect(connectionId!, toolkit.name || toolkit.slug)}
aria-label={t(($) => $.composio.disconnect)}
>
<Trash2 className="h-3 w-3" />
</Button>
</div>
) : expired ? (
// Token-expired connection: surface the failure and let the user
// re-run the same connect flow in one click (no disconnect step).
<div className="flex items-center gap-2">
<span className="inline-flex items-center gap-1 text-xs text-amber-600">
<AlertTriangle className="h-3 w-3" />
{t(($) => $.composio.expired)}
</span>
<Button size="sm" variant="outline" onClick={onConnect} disabled={anyConnecting}>
{connecting ? (
<Loader2 className="h-3 w-3 animate-spin" />
) : (
<RefreshCw className="h-3 w-3" />
)}
{connecting ? t(($) => $.composio.connecting) : t(($) => $.composio.reconnect)}
</Button>
</div>
) : toolkit.connectable ? (
<Button size="sm" onClick={onConnect} disabled={anyConnecting}>
{connecting ? (
<Loader2 className="h-3 w-3 animate-spin" />
) : (
<Plug className="h-3 w-3" />
)}
{connecting ? t(($) => $.composio.connecting) : t(($) => $.composio.connect)}
</Button>
) : (
<span
className="shrink-0 rounded bg-muted px-1.5 py-0.5 text-[10px] text-muted-foreground"
title={t(($) => $.composio.not_connectable_hint)}
>
{t(($) => $.composio.not_connectable)}
</span>
)}
</CardContent>
</Card>
);
}
function ToolkitLogo({ toolkit }: { toolkit: ComposioToolkit }) {
const initial = (toolkit.name || toolkit.slug).charAt(0).toUpperCase();
if (toolkit.logo) {
return (
<img
src={toolkit.logo}
alt=""
className="h-8 w-8 shrink-0 rounded bg-muted object-contain"
/>
);
}
return (
<div className="flex h-8 w-8 shrink-0 items-center justify-center rounded bg-muted text-xs font-semibold text-muted-foreground">
{initial}
</div>
);
}

View File

@@ -1,22 +1,42 @@
"use client";
import { useQuery } from "@tanstack/react-query";
import { LarkTab } from "./lark-tab";
import { ComposioTab } from "./composio-tab";
import { ApiError } from "@multica/core/api";
import { composioToolkitsOptions } from "@multica/core/composio";
import { useT } from "../../i18n";
// Integrations is the umbrella tab for third-party platform connections.
// GitHub has its own top-level tab (see github-tab.tsx); everything else
// — currently just Lark, with Slack/Linear etc. to follow — lives in
// here under its own section heading so additional integrations slot in
// without changing the IA. IntegrationsTab is just the host; each
// integration owns its own description and install flow.
// — Lark, Composio, with Slack/Linear etc. to follow — lives in here under
// its own section heading so additional integrations slot in without changing
// the IA. IntegrationsTab is just the host; each integration owns its own
// description and install flow.
export function IntegrationsTab() {
const { t } = useT("settings");
// Composio is hidden entirely until a key is configured server-side. A 503
// from the toolkits endpoint means COMPOSIO_API_KEY is unset; rather than
// render a card that leaks an internal env-var name to every end user, the
// whole section (heading + body) is withheld. Admin-only "set this up"
// guidance is a later, role-gated affordance (MUL-3720 discussion).
const composioToolkits = useQuery(composioToolkitsOptions());
const composioUnconfigured =
composioToolkits.error instanceof ApiError && composioToolkits.error.status === 503;
return (
<div className="space-y-10">
<section className="space-y-4">
<h2 className="text-sm font-semibold">{t(($) => $.lark.section_title)}</h2>
<LarkTab />
</section>
{!composioUnconfigured && (
<section className="space-y-4">
<h2 className="text-sm font-semibold">{t(($) => $.composio.section_title)}</h2>
<ComposioTab />
</section>
)}
</div>
);
}

View File

@@ -0,0 +1,68 @@
package main
import (
"io"
"net/http"
"testing"
)
// TestComposioCallbackIsPublic_NoCookieNot401 locks in the MUL-3843 fix: the
// Composio OAuth callback must live OUTSIDE the Auth middleware group, because
// Composio 302-redirects the user's browser to it and the cookie session is
// frequently absent (expired session, SameSite=Strict / Safari ITP, private
// window, self-hosted callback subdomain). Before the fix the route sat under
// Auth, so a cookie-less browser got a hard 401 and a JSON blob instead of the
// settings redirect — the exact symptom Yushen hit.
//
// With no COMPOSIO_API_KEY configured in the test env, h.Composio == nil, so a
// cookie-less hit on the callback now reaches the handler and returns 503
// ("not configured") rather than being short-circuited to 401 by the Auth
// middleware. The precise non-401 code is incidental; what this test pins is
// that the request is NOT rejected by auth.
func TestComposioCallbackIsPublic_NoCookieNot401(t *testing.T) {
// Deliberately send NO Authorization header / cookie — simulate the
// cookie-stripped browser redirect coming back from Composio.
resp, err := http.Get(testServer.URL + "/api/integrations/composio/callback?state=bogus&status=success&connected_account_id=ca_x")
if err != nil {
t.Fatalf("callback request failed: %v", err)
}
defer resp.Body.Close()
body, _ := io.ReadAll(resp.Body)
if resp.StatusCode == http.StatusUnauthorized {
t.Fatalf("callback returned 401 without a session — it is still behind the Auth group (regression of MUL-3843). body=%s", body)
}
}
// TestComposioNonCallbackEndpointsStayGated is the other half of the invariant:
// moving the callback out of the Auth group must NOT loosen the four
// session-scoped endpoints. A cookie-less request to them must still 401.
func TestComposioNonCallbackEndpointsStayGated(t *testing.T) {
gated := []struct {
method string
path string
}{
{http.MethodPost, "/api/integrations/composio/connect/init"},
{http.MethodGet, "/api/integrations/composio/toolkits"},
{http.MethodGet, "/api/integrations/composio/connections"},
{http.MethodDelete, "/api/integrations/composio/connections/11111111-1111-1111-1111-111111111111"},
}
for _, tc := range gated {
t.Run(tc.method+" "+tc.path, func(t *testing.T) {
req, err := http.NewRequest(tc.method, testServer.URL+tc.path, nil)
if err != nil {
t.Fatalf("build request: %v", err)
}
resp, err := http.DefaultClient.Do(req)
if err != nil {
t.Fatalf("request failed: %v", err)
}
defer resp.Body.Close()
io.Copy(io.Discard, resp.Body)
if resp.StatusCode != http.StatusUnauthorized {
t.Fatalf("expected 401 without a session, got %d — endpoint is no longer auth-gated", resp.StatusCode)
}
})
}
}

View File

@@ -2,6 +2,7 @@ package main
import (
"context"
"crypto/sha256"
"log/slog"
"net/http"
"net/netip"
@@ -25,6 +26,7 @@ import (
"github.com/multica-ai/multica/server/internal/handler"
"github.com/multica-ai/multica/server/internal/integrations/channel"
"github.com/multica-ai/multica/server/internal/integrations/channel/engine"
composiointeg "github.com/multica-ai/multica/server/internal/integrations/composio"
"github.com/multica-ai/multica/server/internal/integrations/lark"
"github.com/multica-ai/multica/server/internal/integrations/slack"
obsmetrics "github.com/multica-ai/multica/server/internal/metrics"
@@ -34,6 +36,7 @@ import (
"github.com/multica-ai/multica/server/internal/storage"
"github.com/multica-ai/multica/server/internal/util"
"github.com/multica-ai/multica/server/internal/util/secretbox"
composiosdk "github.com/multica-ai/multica/server/pkg/composio"
db "github.com/multica-ai/multica/server/pkg/db/generated"
"github.com/multica-ai/multica/server/pkg/featureflag"
)
@@ -435,6 +438,47 @@ func NewRouterWithOptions(pool *pgxpool.Pool, hub *realtime.Hub, bus *events.Bus
slog.Info("slack integration disabled (MULTICA_SLACK_SECRET_KEY not set)")
}
// Composio integration (MUL-3720). Gated by COMPOSIO_API_KEY — the
// project-scoped key the standalone SDK authenticates Composio with (sent
// as x-api-key; the project is resolved from the key, so NO project id is
// configured). When unset the whole block is skipped and the composio HTTP
// handlers return 503; existing deployments are unaffected. An operator opts
// in by setting COMPOSIO_API_KEY plus a callback base
// (COMPOSIO_CALLBACK_BASE_URL, falling back to MULTICA_PUBLIC_URL). The
// toolkit→auth-config mapping is NOT configured here — it is resolved
// dynamically from the project's /auth_configs at request time, so enabling
// a toolkit is a dashboard action, not a redeploy. State signing uses
// COMPOSIO_STATE_SECRET, or a key derived from JWT_SECRET when that is unset.
if composioAPIKey := strings.TrimSpace(os.Getenv("COMPOSIO_API_KEY")); composioAPIKey != "" {
sdkClient, err := composiosdk.NewClient(composiosdk.Options{APIKey: composioAPIKey})
if err != nil {
slog.Error("composio: SDK client init failed; composio integration disabled", "error", err)
} else {
stateSecret := composioStateSecret()
callbackBase := composioCallbackBaseURL(signupConfig.PublicURL)
switch {
case len(stateSecret) == 0:
slog.Error("composio: no state secret (set COMPOSIO_STATE_SECRET or JWT_SECRET); composio integration disabled")
case callbackBase == "":
slog.Error("composio: no callback base url (set COMPOSIO_CALLBACK_BASE_URL or MULTICA_PUBLIC_URL); composio integration disabled")
default:
svc, serr := composiointeg.NewService(sdkClient, queries, composiointeg.Config{
StateSecret: stateSecret,
CallbackBaseURL: callbackBase,
FrontendBaseURL: appURLFromEnv(),
})
if serr != nil {
slog.Error("composio: service init failed; composio integration disabled", "error", serr)
} else {
h.Composio = svc
slog.Info("composio integration enabled")
}
}
}
} else {
slog.Info("composio integration disabled (COMPOSIO_API_KEY not set)")
}
if opts.HeartbeatScheduler != nil {
h.HeartbeatScheduler = opts.HeartbeatScheduler
}
@@ -568,6 +612,18 @@ func NewRouterWithOptions(pool *pgxpool.Pool, hub *realtime.Hub, bus *events.Bus
// HandleCloudBillingStripeWebhook for the rationale).
r.Post("/api/webhooks/stripe", h.HandleCloudBillingStripeWebhook)
// Composio OAuth callback (MUL-3843). NOT under the Auth group on purpose:
// Composio 302-redirects the user's browser here at the end of the OAuth
// flow, and the cookie session is frequently absent (expired session,
// SameSite=Strict / Safari ITP stripping cross-site cookies, private
// windows, self-hosted callbacks on a different subdomain). Identity is NOT
// taken from the session — it comes from the HMAC-signed `state` query
// param, which CompleteCallback verifies (signature, expiry, replay) before
// doing anything. h.Composio == nil still returns 503. Keeping it inside the
// Auth group made a missing cookie a hard 401, breaking the flow for exactly
// the browsers above; the other four composio endpoints stay session-gated.
r.Get("/api/integrations/composio/callback", h.ComposioCallback)
// Daemon API routes (require daemon token or valid user token)
r.Route("/api/daemon", func(r chi.Router) {
r.Use(middleware.DaemonAuth(queries, patCache, daemonTokenCache, cloudPATVerifier))
@@ -725,6 +781,18 @@ func NewRouterWithOptions(pool *pgxpool.Pool, hub *realtime.Hub, bus *events.Bus
// is combined with the logged-in user to create the mapping.
r.Post("/api/lark/binding/redeem", h.RedeemLarkBindingToken)
// Composio integration (MUL-3720). User-scoped (no workspace context):
// a connection belongs to a user. These four require a logged-in
// session; the OAuth callback is the outlier and lives outside the Auth
// group (registered above with the other public OAuth/webhook routes —
// see MUL-3843). All return 503 when COMPOSIO_API_KEY is unset.
r.Route("/api/integrations/composio", func(r chi.Router) {
r.Post("/connect/init", h.ComposioConnectInit)
r.Get("/toolkits", h.ListComposioToolkits)
r.Get("/connections", h.ListComposioConnections)
r.Delete("/connections/{id}", h.DeleteComposioConnection)
})
// User-scoped invitation routes (no workspace context required)
r.Get("/api/invitations", h.ListMyInvitations)
r.Get("/api/invitations/{id}", h.GetMyInvitation)
@@ -1247,3 +1315,31 @@ func cloudRuntimeFleetURLFromEnv() string {
}
return strings.TrimSpace(os.Getenv("MULTICA_FLEET_URL"))
}
// composioStateSecret resolves the HMAC key for the connect-state. Prefers an
// explicit COMPOSIO_STATE_SECRET; otherwise derives a composio-specific key
// from JWT_SECRET via SHA-256 so the two signing domains never share an
// identical key. Returns nil when neither is set (composio stays disabled).
func composioStateSecret() []byte {
if v := strings.TrimSpace(os.Getenv("COMPOSIO_STATE_SECRET")); v != "" {
return []byte(v)
}
if v := strings.TrimSpace(os.Getenv("JWT_SECRET")); v != "" {
sum := sha256.Sum256([]byte("composio-state:" + v))
return sum[:]
}
return nil
}
// composioCallbackBaseURL resolves the public API base used to build the
// Composio callback URL. Prefers COMPOSIO_CALLBACK_BASE_URL, then the
// already-resolved MULTICA_PUBLIC_URL, then the app URL.
func composioCallbackBaseURL(publicURL string) string {
if v := strings.TrimRight(strings.TrimSpace(os.Getenv("COMPOSIO_CALLBACK_BASE_URL")), "/"); v != "" {
return v
}
if publicURL != "" {
return publicURL
}
return appURLFromEnv()
}

View File

@@ -22,6 +22,7 @@ import (
"github.com/multica-ai/multica/server/internal/events"
"github.com/multica-ai/multica/server/internal/featureflagdispatch"
"github.com/multica-ai/multica/server/internal/integrations/channel/engine"
composio "github.com/multica-ai/multica/server/internal/integrations/composio"
"github.com/multica-ai/multica/server/internal/integrations/lark"
obsmetrics "github.com/multica-ai/multica/server/internal/metrics"
"github.com/multica-ai/multica/server/internal/middleware"
@@ -157,6 +158,10 @@ type Handler struct {
// UI consults IsConfigured() to decide whether to surface install
// entry points.
LarkAPIClient lark.APIClient
// Composio integration (MUL-3720). Nil when COMPOSIO_API_KEY is unset;
// the composio HTTP handlers return 503 in that case. Wired in
// cmd/server/router.go after handler.New.
Composio *composio.Service
// ChannelSupervisor owns the per-installation supervisor goroutines
// that hold the §4.4 WS lease and drive each channel.Channel
// (MUL-3620 generalized the Feishu-only Hub into this channel-agnostic

View File

@@ -0,0 +1,219 @@
package handler
import (
"encoding/json"
"errors"
"net/http"
"strings"
"github.com/go-chi/chi/v5"
composio "github.com/multica-ai/multica/server/internal/integrations/composio"
)
// Composio integration handlers (MUL-3720, Stage 2 MVP). A Composio connection
// belongs to a user, not a workspace, so these handlers live outside the
// workspace-membership group. The four management endpoints (connect/init,
// toolkits, connections, delete) are user-scoped (requireUserID) and sit under
// the Auth middleware. ComposioCallback is the exception: it is a public route
// (outside the Auth group, see router.go / MUL-3843) because the browser often
// arrives without a session cookie — its identity comes from the signed state,
// not requireUserID. The whole block returns 503 when h.Composio is nil
// (COMPOSIO_API_KEY unset), matching the Lark/GitHub "integration not
// configured" convention.
// ComposioConnectInitRequest is the POST /connect/init body.
type ComposioConnectInitRequest struct {
ToolkitSlug string `json:"toolkit_slug"`
}
// ComposioConnectInitResponse carries the hosted Composio Connect Link the
// frontend redirects the user to.
type ComposioConnectInitResponse struct {
RedirectURL string `json:"redirect_url"`
}
// ComposioConnectionResponse is the wire shape for one connection row.
type ComposioConnectionResponse struct {
ID string `json:"id"`
ToolkitSlug string `json:"toolkit_slug"`
Status string `json:"status"`
ConnectedAt string `json:"connected_at"`
LastUsedAt *string `json:"last_used_at"`
}
// ComposioToolkitResponse is the wire shape for one toolkit in the catalog.
// connectable is the key UX signal: false means the project has no enabled
// auth config for the toolkit, so the UI must not offer a working Connect
// button (BeginConnect would 400).
type ComposioToolkitResponse struct {
Slug string `json:"slug"`
Name string `json:"name"`
Logo string `json:"logo,omitempty"`
Category string `json:"category,omitempty"`
Connectable bool `json:"connectable"`
}
// ComposioConnectInit (POST /api/integrations/composio/connect/init) starts a
// hosted Composio auth flow for the requested toolkit and returns the redirect
// URL. An unsupported toolkit slug is a 400 (the MVP only wires Notion).
func (h *Handler) ComposioConnectInit(w http.ResponseWriter, r *http.Request) {
if h.Composio == nil {
writeError(w, http.StatusServiceUnavailable, "composio integration not configured")
return
}
userID, ok := requireUserID(w, r)
if !ok {
return
}
userUUID, ok := parseUUIDOrBadRequest(w, userID, "user id")
if !ok {
return
}
var req ComposioConnectInitRequest
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
writeError(w, http.StatusBadRequest, "invalid request body")
return
}
if strings.TrimSpace(req.ToolkitSlug) == "" {
writeError(w, http.StatusBadRequest, "toolkit_slug is required")
return
}
redirectURL, err := h.Composio.BeginConnect(r.Context(), userUUID, req.ToolkitSlug)
if err != nil {
if errors.Is(err, composio.ErrToolkitNotSupported) {
writeError(w, http.StatusBadRequest, "toolkit not supported")
return
}
writeError(w, http.StatusBadGateway, "failed to start composio connect")
return
}
writeJSON(w, http.StatusOK, ComposioConnectInitResponse{RedirectURL: redirectURL})
}
// ComposioCallback (GET /api/integrations/composio/callback) is the browser
// redirect target Composio sends the user back to after the hosted flow. It is
// registered as a PUBLIC route (outside the Auth middleware group — see
// router.go / MUL-3843), because the browser frequently lands here without a
// session cookie (expired session, SameSite/ITP stripping, private window,
// self-hosted callback subdomain). Identity therefore comes solely from the
// HMAC-signed `state` query param, which CompleteCallback verifies before
// doing anything. On success the row is upserted and the browser is redirected
// to the settings page; any failure redirects to the same page with a stable
// error code so the user is never left on a blank API response.
func (h *Handler) ComposioCallback(w http.ResponseWriter, r *http.Request) {
if h.Composio == nil {
writeError(w, http.StatusServiceUnavailable, "composio integration not configured")
return
}
q := r.URL.Query()
state := q.Get("state")
status := q.Get("status")
connectedAccountID := q.Get("connected_account_id")
slug, err := h.Composio.CompleteCallback(r.Context(), state, status, connectedAccountID)
if err != nil {
// Every failure (tampered/expired state, non-success status, write
// error) collapses to the generic failure redirect — we never tell the
// browser which check failed.
http.Redirect(w, r, h.Composio.CallbackRedirect(slug, false), http.StatusFound)
return
}
http.Redirect(w, r, h.Composio.CallbackRedirect(slug, true), http.StatusFound)
}
// ListComposioConnections (GET /api/integrations/composio/connections) returns
// the caller's active connections.
func (h *Handler) ListComposioConnections(w http.ResponseWriter, r *http.Request) {
if h.Composio == nil {
writeError(w, http.StatusServiceUnavailable, "composio integration not configured")
return
}
userID, ok := requireUserID(w, r)
if !ok {
return
}
userUUID, ok := parseUUIDOrBadRequest(w, userID, "user id")
if !ok {
return
}
conns, err := h.Composio.ListConnections(r.Context(), userUUID)
if err != nil {
writeError(w, http.StatusInternalServerError, "failed to list composio connections")
return
}
out := make([]ComposioConnectionResponse, 0, len(conns))
for _, c := range conns {
out = append(out, ComposioConnectionResponse{
ID: c.ID,
ToolkitSlug: c.ToolkitSlug,
Status: c.Status,
ConnectedAt: c.ConnectedAt,
LastUsedAt: c.LastUsedAt,
})
}
writeJSON(w, http.StatusOK, out)
}
// ListComposioToolkits (GET /api/integrations/composio/toolkits) returns the
// full Composio toolkit catalog for the Settings UI to render. Each entry
// carries a `connectable` flag: only toolkits with an enabled auth config in
// the project can actually be connected, so the UI gates its Connect button on
// it. The catalog itself is project-global (not per-user), but the route is
// user-scoped (requireUser) like the rest of the block.
func (h *Handler) ListComposioToolkits(w http.ResponseWriter, r *http.Request) {
if h.Composio == nil {
writeError(w, http.StatusServiceUnavailable, "composio integration not configured")
return
}
if _, ok := requireUserID(w, r); !ok {
return
}
toolkits, err := h.Composio.ListToolkits(r.Context())
if err != nil {
writeError(w, http.StatusBadGateway, "failed to list composio toolkits")
return
}
out := make([]ComposioToolkitResponse, 0, len(toolkits))
for _, tk := range toolkits {
out = append(out, ComposioToolkitResponse{
Slug: tk.Slug,
Name: tk.Name,
Logo: tk.LogoURL,
Category: tk.Category,
Connectable: tk.Connectable,
})
}
writeJSON(w, http.StatusOK, out)
}
// DeleteComposioConnection (DELETE /api/integrations/composio/connections/{id})
// disconnects a connection the caller owns. Idempotent at the service layer;
// a connection that does not belong to the caller is a 404.
func (h *Handler) DeleteComposioConnection(w http.ResponseWriter, r *http.Request) {
if h.Composio == nil {
writeError(w, http.StatusServiceUnavailable, "composio integration not configured")
return
}
userID, ok := requireUserID(w, r)
if !ok {
return
}
userUUID, ok := parseUUIDOrBadRequest(w, userID, "user id")
if !ok {
return
}
connUUID, ok := parseUUIDOrBadRequest(w, chi.URLParam(r, "id"), "connection id")
if !ok {
return
}
if err := h.Composio.Disconnect(r.Context(), userUUID, connUUID); err != nil {
if errors.Is(err, composio.ErrConnectionNotFound) {
writeError(w, http.StatusNotFound, "composio connection not found")
return
}
writeError(w, http.StatusBadGateway, "failed to disconnect composio connection")
return
}
w.WriteHeader(http.StatusNoContent)
}

View File

@@ -0,0 +1,325 @@
package handler
import (
"context"
"encoding/json"
"net/http"
"net/http/httptest"
"strings"
"testing"
"time"
"github.com/go-chi/chi/v5"
"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgtype"
composio "github.com/multica-ai/multica/server/internal/integrations/composio"
"github.com/multica-ai/multica/server/internal/util"
sdk "github.com/multica-ai/multica/server/pkg/composio"
db "github.com/multica-ai/multica/server/pkg/db/generated"
)
const composioTestUserID = "22222222-2222-2222-2222-222222222222"
// --- local fakes (handler package can only see the exported interfaces) ---
type composioFakeSDK struct {
createLinkResp *sdk.CreateLinkResponse
revokeErr error
}
func (f *composioFakeSDK) CreateLink(_ context.Context, _ sdk.CreateLinkRequest) (*sdk.CreateLinkResponse, error) {
if f.createLinkResp != nil {
return f.createLinkResp, nil
}
return &sdk.CreateLinkResponse{RedirectURL: "https://composio.example/redirect"}, nil
}
// ListConnectedAccounts echoes the requested id as an account owned by the
// handler-test user under the notion auth config, so callback ownership
// verification passes on the happy path.
func (f *composioFakeSDK) ListConnectedAccounts(_ context.Context, req sdk.ListConnectedAccountsRequest) (*sdk.ListConnectedAccountsResponse, error) {
id := ""
if len(req.ConnectedAccountIDs) > 0 {
id = req.ConnectedAccountIDs[0]
}
return &sdk.ListConnectedAccountsResponse{Items: []sdk.ConnectedAccount{{
ID: id,
UserID: composioTestUserID,
AuthConfigID: "ac_notion",
}}}, nil
}
func (f *composioFakeSDK) RevokeConnection(_ context.Context, _ string) error { return f.revokeErr }
func (f *composioFakeSDK) DeleteConnectedAccount(_ context.Context, _ string) error { return nil }
// ListAuthConfigs reports a single enabled notion auth config so BeginConnect
// resolves notion → ac_notion and the callback's auth-config check matches.
func (f *composioFakeSDK) ListAuthConfigs(_ context.Context, _ sdk.ListAuthConfigsRequest) (*sdk.ListAuthConfigsResponse, error) {
return &sdk.ListAuthConfigsResponse{Items: []sdk.AuthConfig{{
ID: "ac_notion",
Toolkit: sdk.Toolkit{Slug: "notion"},
Status: "ENABLED",
}}}, nil
}
func (f *composioFakeSDK) ListToolkits(_ context.Context, _ sdk.ListToolkitsRequest) (*sdk.ListToolkitsResponse, error) {
return &sdk.ListToolkitsResponse{Items: []sdk.Toolkit{
{Slug: "notion", Name: "Notion"},
{Slug: "github", Name: "GitHub"},
}}, nil
}
func (f *composioFakeSDK) CreateSession(_ context.Context, _ sdk.CreateSessionRequest) (*sdk.CreateSessionResponse, error) {
return &sdk.CreateSessionResponse{}, nil
}
func (f *composioFakeSDK) MCPAuthHeaders() map[string]string {
return map[string]string{"x-api-key": "k"}
}
type composioFakeStore struct {
rows []db.UserComposioConnection
nextID byte
}
func (s *composioFakeStore) UpsertUserComposioConnection(_ context.Context, arg db.UpsertUserComposioConnectionParams) (db.UserComposioConnection, error) {
for i := range s.rows {
if s.rows[i].UserID.Bytes == arg.UserID.Bytes && s.rows[i].ConnectedAccountID == arg.ConnectedAccountID {
s.rows[i].Status = "active"
return s.rows[i], nil
}
}
s.nextID++
var b [16]byte
b[15] = s.nextID
row := db.UserComposioConnection{
ID: pgtype.UUID{Bytes: b, Valid: true},
UserID: arg.UserID,
ToolkitSlug: arg.ToolkitSlug,
AuthConfigID: arg.AuthConfigID,
ConnectedAccountID: arg.ConnectedAccountID,
ComposioUserID: arg.ComposioUserID,
Status: "active",
ConnectedAt: pgtype.Timestamptz{Time: time.Now(), Valid: true},
}
s.rows = append(s.rows, row)
return row, nil
}
func (s *composioFakeStore) ListActiveUserComposioConnections(_ context.Context, userID pgtype.UUID) ([]db.UserComposioConnection, error) {
out := []db.UserComposioConnection{}
for _, r := range s.rows {
if r.UserID.Bytes == userID.Bytes && r.Status == "active" {
out = append(out, r)
}
}
return out, nil
}
func (s *composioFakeStore) GetUserComposioConnection(_ context.Context, arg db.GetUserComposioConnectionParams) (db.UserComposioConnection, error) {
for _, r := range s.rows {
if r.ID.Bytes == arg.ID.Bytes && r.UserID.Bytes == arg.UserID.Bytes {
return r, nil
}
}
return db.UserComposioConnection{}, pgx.ErrNoRows
}
func (s *composioFakeStore) MarkUserComposioConnectionRevoked(_ context.Context, arg db.MarkUserComposioConnectionRevokedParams) error {
for i := range s.rows {
if s.rows[i].ID.Bytes == arg.ID.Bytes && s.rows[i].UserID.Bytes == arg.UserID.Bytes {
s.rows[i].Status = "revoked"
}
}
return nil
}
func newComposioTestHandler(t *testing.T, sdkFake composio.SDK, store composio.Store) *Handler {
t.Helper()
svc, err := composio.NewService(sdkFake, store, composio.Config{
StateSecret: []byte("handler-test-secret"),
CallbackBaseURL: "https://app.multica.ai",
FrontendBaseURL: "https://app.multica.ai",
})
if err != nil {
t.Fatalf("NewService: %v", err)
}
return &Handler{Composio: svc}
}
func composioReq(method, target, body string) *http.Request {
var r *http.Request
if body != "" {
r = httptest.NewRequest(method, target, strings.NewReader(body))
} else {
r = httptest.NewRequest(method, target, nil)
}
r.Header.Set("X-User-ID", composioTestUserID)
return r
}
// --- tests ---
func TestComposio_ServiceUnavailableWhenNil(t *testing.T) {
h := &Handler{}
for _, hf := range []http.HandlerFunc{
h.ComposioConnectInit, h.ComposioCallback, h.ListComposioConnections, h.DeleteComposioConnection,
} {
w := httptest.NewRecorder()
hf(w, composioReq(http.MethodGet, "/", ""))
if w.Code != http.StatusServiceUnavailable {
t.Errorf("expected 503 when Composio nil, got %d", w.Code)
}
}
}
func TestComposio_ConnectInit(t *testing.T) {
h := newComposioTestHandler(t, &composioFakeSDK{}, &composioFakeStore{})
// success
w := httptest.NewRecorder()
h.ComposioConnectInit(w, composioReq(http.MethodPost, "/", `{"toolkit_slug":"notion"}`))
if w.Code != http.StatusOK {
t.Fatalf("expected 200, got %d (%s)", w.Code, w.Body.String())
}
var resp ComposioConnectInitResponse
if err := json.Unmarshal(w.Body.Bytes(), &resp); err != nil {
t.Fatalf("decode: %v", err)
}
if resp.RedirectURL == "" {
t.Error("expected redirect_url")
}
// unsupported toolkit → 400
w = httptest.NewRecorder()
h.ComposioConnectInit(w, composioReq(http.MethodPost, "/", `{"toolkit_slug":"github"}`))
if w.Code != http.StatusBadRequest {
t.Errorf("unsupported toolkit: expected 400, got %d", w.Code)
}
// missing slug → 400
w = httptest.NewRecorder()
h.ComposioConnectInit(w, composioReq(http.MethodPost, "/", `{}`))
if w.Code != http.StatusBadRequest {
t.Errorf("missing slug: expected 400, got %d", w.Code)
}
}
func TestComposio_ListToolkits(t *testing.T) {
h := newComposioTestHandler(t, &composioFakeSDK{}, &composioFakeStore{})
w := httptest.NewRecorder()
h.ListComposioToolkits(w, composioReq(http.MethodGet, "/toolkits", ""))
if w.Code != http.StatusOK {
t.Fatalf("expected 200, got %d (%s)", w.Code, w.Body.String())
}
var toolkits []ComposioToolkitResponse
if err := json.Unmarshal(w.Body.Bytes(), &toolkits); err != nil {
t.Fatalf("decode: %v", err)
}
if len(toolkits) != 2 {
t.Fatalf("expected 2 toolkits, got %d", len(toolkits))
}
// notion has an enabled auth config in the fake → connectable + sorted first.
if toolkits[0].Slug != "notion" || !toolkits[0].Connectable {
t.Errorf("first toolkit = %+v, want connectable notion", toolkits[0])
}
for _, tk := range toolkits {
if tk.Slug == "github" && tk.Connectable {
t.Error("github has no auth config and must not be connectable")
}
}
}
func TestComposio_CallbackRedirects(t *testing.T) {
store := &composioFakeStore{}
// Mint a valid signed state by driving BeginConnect through a capturing
// SDK, then replay it through the real callback handler.
capturing := &composioCapturingSDK{}
h2 := newComposioTestHandler(t, capturing, store)
bw := httptest.NewRecorder()
h2.ComposioConnectInit(bw, composioReq(http.MethodPost, "/", `{"toolkit_slug":"notion"}`))
state := capturing.stateFromCallback()
if state == "" {
t.Fatal("could not capture signed state")
}
w := httptest.NewRecorder()
h2.ComposioCallback(w, composioReq(http.MethodGet, "/callback?state="+state+"&status=success&connected_account_id=ca_1", ""))
if w.Code != http.StatusFound {
t.Fatalf("expected 302, got %d", w.Code)
}
if loc := w.Header().Get("Location"); !strings.Contains(loc, "connected=notion") {
t.Errorf("success location = %q", loc)
}
// failure path: bad state → error redirect
w = httptest.NewRecorder()
h2.ComposioCallback(w, composioReq(http.MethodGet, "/callback?state=bad&status=success&connected_account_id=ca_1", ""))
if w.Code != http.StatusFound {
t.Fatalf("expected 302 on bad state, got %d", w.Code)
}
if loc := w.Header().Get("Location"); !strings.Contains(loc, "error=composio_connect_failed") {
t.Errorf("failure location = %q", loc)
}
}
func TestComposio_ListAndDelete(t *testing.T) {
store := &composioFakeStore{}
userUUID, _ := util.ParseUUID(composioTestUserID)
row, _ := store.UpsertUserComposioConnection(context.Background(), db.UpsertUserComposioConnectionParams{
UserID: userUUID,
ToolkitSlug: "notion",
AuthConfigID: "ac_notion",
ConnectedAccountID: "ca_list",
ComposioUserID: composioTestUserID,
})
h := newComposioTestHandler(t, &composioFakeSDK{}, store)
// list
w := httptest.NewRecorder()
h.ListComposioConnections(w, composioReq(http.MethodGet, "/connections", ""))
if w.Code != http.StatusOK {
t.Fatalf("list: expected 200, got %d", w.Code)
}
var conns []ComposioConnectionResponse
if err := json.Unmarshal(w.Body.Bytes(), &conns); err != nil {
t.Fatalf("decode: %v", err)
}
if len(conns) != 1 || conns[0].ToolkitSlug != "notion" {
t.Fatalf("conns = %+v", conns)
}
// delete (owner) → 204, routed through chi so {id} resolves
r := chi.NewRouter()
r.Delete("/api/integrations/composio/connections/{id}", h.DeleteComposioConnection)
delReq := composioReq(http.MethodDelete, "/api/integrations/composio/connections/"+util.UUIDToString(row.ID), "")
w = httptest.NewRecorder()
r.ServeHTTP(w, delReq)
if w.Code != http.StatusNoContent {
t.Fatalf("delete: expected 204, got %d (%s)", w.Code, w.Body.String())
}
// delete unknown id → 404
missing := "33333333-3333-3333-3333-333333333333"
w = httptest.NewRecorder()
r.ServeHTTP(w, composioReq(http.MethodDelete, "/api/integrations/composio/connections/"+missing, ""))
if w.Code != http.StatusNotFound {
t.Fatalf("delete missing: expected 404, got %d", w.Code)
}
}
// composioCapturingSDK records the callback URL so a test can replay the signed
// state through the real callback handler.
type composioCapturingSDK struct {
composioFakeSDK
lastCallbackURL string
}
func (f *composioCapturingSDK) CreateLink(_ context.Context, req sdk.CreateLinkRequest) (*sdk.CreateLinkResponse, error) {
f.lastCallbackURL = req.CallbackURL
return &sdk.CreateLinkResponse{RedirectURL: "https://composio.example/redirect"}, nil
}
func (f *composioCapturingSDK) stateFromCallback() string {
idx := strings.Index(f.lastCallbackURL, "state=")
if idx < 0 {
return ""
}
return f.lastCallbackURL[idx+len("state="):]
}

View File

@@ -0,0 +1,685 @@
// Package composio is the Stage 2 business-integration glue between Multica and
// the standalone Composio SDK (server/pkg/composio). It owns Multica semantics:
// the signed-state connect handshake, the local user_composio_connection
// mirror, idempotent disconnect, and the per-user MCP session helper.
//
// It deliberately does NOT wrap the SDK in another HTTP client — it composes
// *sdk.Client directly through the SDK interface so tests can drop in a fake.
//
// MVP scope (MUL-3720): toolkits are discovered dynamically. The
// toolkit→auth-config mapping is resolved at request time from Composio's
// /auth_configs endpoint (cached briefly), so a toolkit becomes connectable the
// moment an auth config is enabled for it in the Composio dashboard — no env
// var and no redeploy. A toolkit with no enabled auth config is rejected.
package composio
import (
"context"
"errors"
"fmt"
"net/url"
"sort"
"strings"
"sync"
"time"
sdk "github.com/multica-ai/multica/server/pkg/composio"
"github.com/jackc/pgx/v5/pgtype"
"github.com/multica-ai/multica/server/internal/util"
db "github.com/multica-ai/multica/server/pkg/db/generated"
)
// Service-level errors surfaced to the handler layer.
var (
// ErrToolkitNotSupported is returned by BeginConnect when the requested
// toolkit has no enabled auth config in the Composio project, so there is
// no auth_config_id to start a connect link with.
ErrToolkitNotSupported = errors.New("composio: toolkit not supported")
// ErrConnectNotSuccessful is returned by CompleteCallback when Composio
// reported a non-success status — no active row is written.
ErrConnectNotSuccessful = errors.New("composio: connection was not successful")
// ErrConnectionNotFound is returned by Disconnect when the connection id
// does not belong to the user (or does not exist).
ErrConnectionNotFound = errors.New("composio: connection not found")
// ErrAccountVerification is returned by CompleteCallback when the
// connected_account_id carried on the callback cannot be confirmed (with
// Composio) to belong to the user/auth-config named in the signed state —
// i.e. a tampered or unknown account id. No local row is written.
ErrAccountVerification = errors.New("composio: connected account verification failed")
)
// defaultStateTTL bounds how long a connect handshake may sit between
// BeginConnect and the Composio callback. Five minutes is generous for a hosted
// OAuth flow while keeping the replay window small.
const defaultStateTTL = 5 * time.Minute
// defaultAuthCacheTTL bounds how long the resolved toolkit→auth-config map is
// cached before a re-fetch from Composio. Short enough that enabling an auth
// config in the dashboard reflects within minutes; long enough that a burst of
// connect/list requests does not hammer /auth_configs.
const defaultAuthCacheTTL = 5 * time.Minute
// maxAuthConfigPages / maxToolkitPages cap the paginated fetch-all loops so a
// pathological or buggy upstream cursor cannot spin forever. At limit=1000 per
// page these cover far more than any real project / catalog.
const (
maxAuthConfigPages = 20
maxToolkitPages = 20
listPageLimit = 1000
)
// SDK is the subset of *sdk.Client the service depends on. Declared as an
// interface so handler/service tests can inject a fake without hitting Composio.
// *sdk.Client satisfies it.
type SDK interface {
CreateLink(ctx context.Context, req sdk.CreateLinkRequest) (*sdk.CreateLinkResponse, error)
ListConnectedAccounts(ctx context.Context, req sdk.ListConnectedAccountsRequest) (*sdk.ListConnectedAccountsResponse, error)
ListAuthConfigs(ctx context.Context, req sdk.ListAuthConfigsRequest) (*sdk.ListAuthConfigsResponse, error)
ListToolkits(ctx context.Context, req sdk.ListToolkitsRequest) (*sdk.ListToolkitsResponse, error)
RevokeConnection(ctx context.Context, connectedAccountID string) error
DeleteConnectedAccount(ctx context.Context, connectedAccountID string) error
CreateSession(ctx context.Context, req sdk.CreateSessionRequest) (*sdk.CreateSessionResponse, error)
MCPAuthHeaders() map[string]string
}
// Store is the persistence seam for the local connection mirror. *db.Queries
// satisfies it; tests use an in-memory fake.
type Store interface {
UpsertUserComposioConnection(ctx context.Context, arg db.UpsertUserComposioConnectionParams) (db.UserComposioConnection, error)
ListActiveUserComposioConnections(ctx context.Context, userID pgtype.UUID) ([]db.UserComposioConnection, error)
GetUserComposioConnection(ctx context.Context, arg db.GetUserComposioConnectionParams) (db.UserComposioConnection, error)
MarkUserComposioConnectionRevoked(ctx context.Context, arg db.MarkUserComposioConnectionRevokedParams) error
}
// Config configures a Service.
type Config struct {
// StateSecret signs the connect-state HMAC. Required (non-empty).
StateSecret []byte
// CallbackBaseURL is the absolute, public base URL of THIS API, with no
// trailing slash (e.g. "https://app.multica.ai"). The Composio callback
// URL is built as CallbackBaseURL + CallbackPath. Required.
CallbackBaseURL string
// FrontendBaseURL is the web app base used to build the post-callback
// browser redirect (e.g. "https://app.multica.ai"). May be empty, in which
// case CallbackRedirect returns a site-relative path.
FrontendBaseURL string
// StateTTL overrides the default connect-state lifetime. Zero uses
// defaultStateTTL.
StateTTL time.Duration
// AuthConfigTTL overrides how long the resolved toolkit→auth-config map is
// cached. Zero uses defaultAuthCacheTTL.
AuthConfigTTL time.Duration
// Now is overridable for deterministic tests. Nil uses time.Now.
Now func() time.Time
}
// callbackPath is the API path Composio redirects the browser back to. It is a
// constant (not configurable) so the SDK callback URL and the router route
// cannot drift apart.
const callbackPath = "/api/integrations/composio/callback"
// Service is the Composio business-integration service.
type Service struct {
sdk SDK
store Store
secret []byte
callbackURL string
frontendURL string
stateTTL time.Duration
now func() time.Time
// authCache holds the resolved toolkit_slug → auth_config_id map for the
// project. It is rebuilt from Composio's /auth_configs endpoint on first
// use and whenever authCacheExp has passed; authCacheMu guards both fields.
authCacheMu sync.Mutex
authCache map[string]string
authCacheExp time.Time
authCacheTTL time.Duration
}
// NewService validates its inputs and returns a ready Service. It errors when a
// required dependency is missing so a misconfigured boot fails loudly instead
// of returning 500s at request time.
func NewService(client SDK, store Store, cfg Config) (*Service, error) {
if client == nil {
return nil, errors.New("composio: SDK client is required")
}
if store == nil {
return nil, errors.New("composio: store is required")
}
if len(cfg.StateSecret) == 0 {
return nil, errors.New("composio: StateSecret is required")
}
base := strings.TrimRight(strings.TrimSpace(cfg.CallbackBaseURL), "/")
if base == "" {
return nil, errors.New("composio: CallbackBaseURL is required")
}
ttl := cfg.StateTTL
if ttl <= 0 {
ttl = defaultStateTTL
}
authTTL := cfg.AuthConfigTTL
if authTTL <= 0 {
authTTL = defaultAuthCacheTTL
}
now := cfg.Now
if now == nil {
now = time.Now
}
return &Service{
sdk: client,
store: store,
secret: cfg.StateSecret,
callbackURL: base + callbackPath,
frontendURL: strings.TrimRight(strings.TrimSpace(cfg.FrontendBaseURL), "/"),
stateTTL: ttl,
now: now,
authCacheTTL: authTTL,
}, nil
}
// Connection is the API-facing view of a local connection row. The Composio
// connected_account_id and auth_config_id are intentionally omitted — they are
// server-internal handles, not API surface.
type Connection struct {
ID string `json:"id"`
ToolkitSlug string `json:"toolkit_slug"`
Status string `json:"status"`
ConnectedAt string `json:"connected_at"`
LastUsedAt *string `json:"last_used_at"`
}
// MCPSession is the result of CreateMCPSession: the streamable MCP URL plus the
// headers an MCP client must attach. Headers carry the Composio x-api-key, so
// callers must route them through the redact pipeline before logging.
type MCPSession struct {
URL string
Headers map[string]string
}
// ToolkitView is the API-facing descriptor for one Composio toolkit, carrying
// exactly the fields the Settings UI renders plus a Connectable flag.
//
// Connectable means the project has an enabled auth config for the toolkit, so
// BeginConnect would succeed. When false the UI must not offer a working
// "Connect" affordance — clicking it would 400 with ErrToolkitNotSupported.
type ToolkitView struct {
Slug string `json:"slug"`
Name string `json:"name"`
LogoURL string `json:"logo,omitempty"`
Category string `json:"category,omitempty"`
Connectable bool `json:"connectable"`
}
// BeginConnect validates the toolkit, mints a signed state, and asks Composio
// for a hosted Connect Link. The returned redirect URL is where the caller
// sends the user's browser.
//
// The auth_config_id is resolved dynamically from the project's enabled auth
// configs (cached), so a toolkit is connectable iff the dashboard has an auth
// config for it — no static env map. A toolkit with none yields
// ErrToolkitNotSupported.
//
// The composio_user_id sent to Composio is the Multica user id verbatim — the
// invariant the rest of the integration relies on.
func (s *Service) BeginConnect(ctx context.Context, userID pgtype.UUID, toolkitSlug string) (string, error) {
slug := strings.ToLower(strings.TrimSpace(toolkitSlug))
authConfigID, err := s.authConfigForToolkit(ctx, slug)
if err != nil {
return "", err
}
if authConfigID == "" {
return "", ErrToolkitNotSupported
}
if !userID.Valid {
return "", errors.New("composio: invalid user id")
}
composioUserID := util.UUIDToString(userID)
state, err := signState(s.secret, stateClaims{
UserID: composioUserID,
ToolkitSlug: slug,
AuthConfigID: authConfigID,
Exp: s.now().Add(s.stateTTL).Unix(),
})
if err != nil {
return "", fmt.Errorf("composio: sign state: %w", err)
}
// Composio appends its own status / connected_account_id query params to
// the callback URL and preserves ours, so the signed state rides back to us
// on the redirect.
callbackURL := s.callbackURL + "?state=" + url.QueryEscape(state)
resp, err := s.sdk.CreateLink(ctx, sdk.CreateLinkRequest{
AuthConfigID: authConfigID,
UserID: composioUserID,
CallbackURL: callbackURL,
})
if err != nil {
return "", fmt.Errorf("composio: create link: %w", err)
}
return resp.RedirectURL, nil
}
// CompleteCallback verifies the signed state and, on a successful Composio
// status, upserts the local connection row. It returns the toolkit slug from
// the state so the handler can build the right redirect even on the
// not-successful path.
//
// Idempotency: the upsert is keyed on (user_id, connected_account_id), so a
// duplicate callback re-activates the same row instead of creating a second.
func (s *Service) CompleteCallback(ctx context.Context, state, status, connectedAccountID string) (string, error) {
claims, err := verifyState(s.secret, state, s.now())
if err != nil {
return "", err
}
if !strings.EqualFold(strings.TrimSpace(status), "success") {
// Honor the state for the redirect slug, but do not write an active row.
return claims.ToolkitSlug, ErrConnectNotSuccessful
}
if strings.TrimSpace(connectedAccountID) == "" {
return claims.ToolkitSlug, errors.New("composio: callback missing connected_account_id")
}
userID, err := util.ParseUUID(claims.UserID)
if err != nil {
return claims.ToolkitSlug, fmt.Errorf("composio: state has invalid user id: %w", err)
}
// The auth_config_id was resolved at BeginConnect and signed into the state,
// so we compare against THAT exact value rather than re-resolving here (a
// re-resolve that failed or drifted would otherwise fail-open: a missing
// expected auth config used to skip the check, letting another toolkit's
// account id be bound under this toolkit's slug). An empty value fails
// closed in verifyAccountOwnership.
authConfigID := claims.AuthConfigID
// Defense-in-depth (PR 4608 review): the signed state proves *who* started
// the handshake and *which* toolkit, but connected_account_id rides back as
// a plain query param Composio appends to our callback URL. A crafted
// redirect could pair a valid, un-expired state with someone else's account
// id and we would mirror it verbatim. Before writing, confirm with Composio
// that this account actually belongs to the state's user (the
// composio_user_id == multica user id invariant) and was created under the
// toolkit's auth config. Any mismatch fails closed with ErrAccountVerification.
if err := s.verifyAccountOwnership(ctx, connectedAccountID, claims.UserID, authConfigID); err != nil {
return claims.ToolkitSlug, err
}
if _, err := s.store.UpsertUserComposioConnection(ctx, db.UpsertUserComposioConnectionParams{
UserID: userID,
ToolkitSlug: claims.ToolkitSlug,
AuthConfigID: authConfigID,
ConnectedAccountID: connectedAccountID,
// Invariant: composio_user_id == Multica user id.
ComposioUserID: claims.UserID,
}); err != nil {
return claims.ToolkitSlug, fmt.Errorf("composio: upsert connection: %w", err)
}
return claims.ToolkitSlug, nil
}
// ListConnections returns the user's active connections.
func (s *Service) ListConnections(ctx context.Context, userID pgtype.UUID) ([]Connection, error) {
rows, err := s.store.ListActiveUserComposioConnections(ctx, userID)
if err != nil {
return nil, err
}
out := make([]Connection, 0, len(rows))
for _, row := range rows {
out = append(out, rowToConnection(row))
}
return out, nil
}
// Disconnect revokes and deletes the connection at Composio, then marks the
// local row revoked. It is idempotent: a Composio 404 (already gone) is treated
// as success, and re-revoking an already-revoked local row is a no-op.
//
// A connection id that does not belong to the user (or does not exist at all)
// returns ErrConnectionNotFound so the handler can 404 without leaking
// existence across users.
func (s *Service) Disconnect(ctx context.Context, userID, connectionID pgtype.UUID) error {
row, err := s.store.GetUserComposioConnection(ctx, db.GetUserComposioConnectionParams{
ID: connectionID,
UserID: userID,
})
if err != nil {
// pgx.ErrNoRows or fake "not found" — treat as not found for the owner.
return ErrConnectionNotFound
}
// Already disconnected locally: a repeat DELETE is a pure no-op. Short-
// circuiting here keeps disconnect idempotent even when the upstream now
// answers revoke/delete with a NON-404 error (PR 4608 review): the account
// is already gone, so re-hitting Composio could turn a second DELETE into a
// 502 and break the 204-idempotent contract. The first disconnect already
// revoked upstream and marked the row.
if !strings.EqualFold(row.Status, "active") {
return nil
}
// Revoke the upstream grant first, then delete the Composio record. Both are
// made idempotent against a 404 so a repeated disconnect (or a connection
// already removed at Composio) still resolves the local row.
if err := s.sdk.RevokeConnection(ctx, row.ConnectedAccountID); err != nil && !isNotFound(err) {
return fmt.Errorf("composio: revoke connection: %w", err)
}
// DeleteConnectedAccount already swallows 404 in the SDK, but guard anyway.
if err := s.sdk.DeleteConnectedAccount(ctx, row.ConnectedAccountID); err != nil && !isNotFound(err) {
return fmt.Errorf("composio: delete connected account: %w", err)
}
if err := s.store.MarkUserComposioConnectionRevoked(ctx, db.MarkUserComposioConnectionRevokedParams{
ID: connectionID,
UserID: userID,
}); err != nil {
return fmt.Errorf("composio: mark revoked: %w", err)
}
return nil
}
// CreateMCPSession opens a Composio tool-router (MCP) session scoped to the
// user's active connections. It returns (nil, nil) when the user has no active
// connections — callers treat that as "no MCP overlay for this user".
//
// connected_accounts is pinned per toolkit to the user's own connected account
// id so the session cannot surface accounts the user did not connect. This
// helper is NOT yet wired into task dispatch (Stage 3); it exists so that wiring
// is a pure consumer of an already-tested seam.
//
// Single-account constraint (v1, PR 4608 review follow-up): the MVP connect
// flow assumes AT MOST ONE active connection per (user, toolkit) — there is no
// UI or API to hold several, and connected_accounts is keyed by toolkit slug so
// it physically cannot carry two accounts for the same toolkit. Should
// duplicates ever exist, we must choose deterministically: rows arrive
// newest-first (ListActive orders by connected_at DESC), so we keep the FIRST
// occurrence per toolkit (the most recently connected account) instead of
// letting a later map write silently select an older one.
//
// Stage 3 owns the real decision before this is wired into dispatch: either
// enforce the single-active constraint at connect time (revoke the previous
// active row for the same toolkit on a new connect) or extend CreateSession to
// a multi-account request shape. Until then, newest-wins keeps behavior
// deterministic rather than order-dependent.
func (s *Service) CreateMCPSession(ctx context.Context, userID pgtype.UUID) (*MCPSession, error) {
rows, err := s.store.ListActiveUserComposioConnections(ctx, userID)
if err != nil {
return nil, err
}
if len(rows) == 0 {
return nil, nil
}
connectedAccounts := make(map[string]any, len(rows))
for _, row := range rows {
// Keep the first (newest) account per toolkit; ignore older duplicates.
if _, exists := connectedAccounts[row.ToolkitSlug]; exists {
continue
}
connectedAccounts[row.ToolkitSlug] = row.ConnectedAccountID
}
resp, err := s.sdk.CreateSession(ctx, sdk.CreateSessionRequest{
UserID: util.UUIDToString(userID),
ConnectedAccounts: connectedAccounts,
})
if err != nil {
return nil, fmt.Errorf("composio: create session: %w", err)
}
return &MCPSession{
URL: resp.MCP.URL,
Headers: s.sdk.MCPAuthHeaders(),
}, nil
}
// CallbackRedirect builds the browser redirect target for the callback handler.
// On success it points at the settings page (Integrations tab) with the
// connected toolkit slug; on failure it carries a stable error code. The path
// is the slug-less `/settings?tab=integrations&...` form on purpose: the web
// proxy's legacy-route redirect prepends the user's last workspace slug, so it
// resolves to the real `/{slug}/settings?tab=integrations` route that mounts
// the Composio tab. The older `/settings/integrations` path was NOT a real
// route and 404'd after the legacy rewrite. When FrontendBaseURL is unset it
// returns a site-relative path.
func (s *Service) CallbackRedirect(slug string, success bool) string {
var path string
if success {
path = "/settings?tab=integrations&connected=" + url.QueryEscape(slug)
} else {
path = "/settings?tab=integrations&error=composio_connect_failed"
}
return s.frontendURL + path
}
// rowToConnection maps a DB row to the API-facing Connection view.
func rowToConnection(row db.UserComposioConnection) Connection {
c := Connection{
ID: util.UUIDToString(row.ID),
ToolkitSlug: row.ToolkitSlug,
Status: row.Status,
}
if row.ConnectedAt.Valid {
c.ConnectedAt = row.ConnectedAt.Time.UTC().Format(time.RFC3339)
}
c.LastUsedAt = util.TimestampToPtr(row.LastUsedAt)
return c
}
// ListToolkits returns the full Composio toolkit catalog annotated with a
// Connectable flag (whether the project has an enabled auth config for each).
// It fetches all pages (capped by maxToolkitPages) so the UI gets the complete
// list in one call; the catalog is a few hundred entries, well within a single
// JSON response. Connectable toolkits are surfaced first so the UI can lead
// with what actually works.
func (s *Service) ListToolkits(ctx context.Context) ([]ToolkitView, error) {
// connectable is the project's enabled toolkit_slug → auth_config_id map.
// On a transient resolver error we still render the catalog, just with
// everything marked not-connectable, rather than failing the whole list.
connectable, err := s.authConfigMap(ctx)
if err != nil {
connectable = map[string]string{}
}
out := []ToolkitView{}
seen := make(map[string]struct{})
cursor := ""
for page := 0; page < maxToolkitPages; page++ {
resp, err := s.sdk.ListToolkits(ctx, sdk.ListToolkitsRequest{
Limit: listPageLimit,
Cursor: cursor,
SortBy: "usage",
})
if err != nil {
return nil, fmt.Errorf("composio: list toolkits: %w", err)
}
for _, tk := range resp.Items {
slug := strings.ToLower(strings.TrimSpace(tk.Slug))
if slug == "" {
continue
}
if _, dup := seen[slug]; dup {
continue
}
seen[slug] = struct{}{}
category := ""
if len(tk.Categories) > 0 {
category = tk.Categories[0]
}
_, canConnect := connectable[slug]
out = append(out, ToolkitView{
Slug: tk.Slug,
Name: tk.Name,
LogoURL: tk.LogoURL,
Category: category,
Connectable: canConnect,
})
}
if resp.NextCursor == "" {
break
}
cursor = resp.NextCursor
}
// Stable sort: connectable toolkits first, preserving Composio's usage
// order within each group.
sort.SliceStable(out, func(i, j int) bool {
if out[i].Connectable != out[j].Connectable {
return out[i].Connectable
}
return false
})
return out, nil
}
// authConfigForToolkit returns the chosen auth_config_id for a toolkit slug, or
// "" when the project has no enabled auth config for it. It reads the cached
// project-wide map (refreshed on TTL).
func (s *Service) authConfigForToolkit(ctx context.Context, slug string) (string, error) {
slug = strings.ToLower(strings.TrimSpace(slug))
if slug == "" {
return "", nil
}
m, err := s.authConfigMap(ctx)
if err != nil {
return "", err
}
return m[slug], nil
}
// authConfigMap returns the toolkit_slug → auth_config_id map for the project,
// rebuilding it from Composio when the cache is empty or expired. Concurrent
// callers serialize on authCacheMu; the fetch runs under the lock, which is
// acceptable for a short-TTL map that is cheap to build and read by a
// low-traffic settings surface. A new map is assigned on refresh (never mutated
// in place), so a reference handed to a caller stays safe to read.
func (s *Service) authConfigMap(ctx context.Context) (map[string]string, error) {
s.authCacheMu.Lock()
defer s.authCacheMu.Unlock()
if s.authCache != nil && s.now().Before(s.authCacheExp) {
return s.authCache, nil
}
m, err := s.fetchAuthConfigMap(ctx)
if err != nil {
// Serve a stale snapshot if we have one — a transient /auth_configs
// blip should not make every toolkit suddenly un-connectable.
if s.authCache != nil {
return s.authCache, nil
}
return nil, err
}
s.authCache = m
s.authCacheExp = s.now().Add(s.authCacheTTL)
return m, nil
}
// authCandidate is one project auth config in contention to represent a toolkit
// during resolution.
type authCandidate struct {
id string
managed bool
updated string
}
// fetchAuthConfigMap pages through the project's ENABLED auth configs and
// reduces them to one chosen auth_config_id per toolkit slug. When a toolkit
// has several (e.g. a Composio-managed one plus a custom white-label one),
// betterAuthConfig picks the winner.
func (s *Service) fetchAuthConfigMap(ctx context.Context) (map[string]string, error) {
best := make(map[string]authCandidate)
cursor := ""
for page := 0; page < maxAuthConfigPages; page++ {
resp, err := s.sdk.ListAuthConfigs(ctx, sdk.ListAuthConfigsRequest{
ShowDisabled: false,
Limit: listPageLimit,
Cursor: cursor,
})
if err != nil {
return nil, fmt.Errorf("composio: list auth configs: %w", err)
}
for _, ac := range resp.Items {
if ac.ID == "" || strings.EqualFold(ac.Status, "DISABLED") {
continue
}
slug := strings.ToLower(strings.TrimSpace(ac.Toolkit.Slug))
if slug == "" {
continue
}
cand := authCandidate{id: ac.ID, managed: ac.IsComposioManaged, updated: ac.LastUpdatedAt}
if cur, ok := best[slug]; !ok || betterAuthConfig(cand, cur) {
best[slug] = cand
}
}
if resp.NextCursor == "" {
break
}
cursor = resp.NextCursor
}
out := make(map[string]string, len(best))
for slug, c := range best {
out[slug] = c.id
}
return out, nil
}
// betterAuthConfig reports whether candidate a should win over the currently
// selected b for the same toolkit. A custom (bring-your-own OAuth) config beats
// a Composio-managed one — it is the white-label path the product wants — and
// among configs of the same kind the most recently updated wins.
func betterAuthConfig(a, b authCandidate) bool {
if a.managed != b.managed {
return !a.managed
}
return a.updated > b.updated
}
// verifyAccountOwnership confirms with Composio that connectedAccountID really
// belongs to expectedUserID and was created under expectedAuthConfigID, so a
// tampered or cross-toolkit connected_account_id on the callback cannot smuggle
// another account into the local mirror. It fails closed: an upstream error, an
// unknown account, an owner mismatch, an EMPTY expected auth config, or an
// auth-config mismatch all return ErrAccountVerification. Requiring a non-empty,
// exactly-matching auth config is what closes the cross-toolkit binding gap —
// the expected value is the auth_config_id signed into the state at
// BeginConnect, which is toolkit-specific.
func (s *Service) verifyAccountOwnership(ctx context.Context, connectedAccountID, expectedUserID, expectedAuthConfigID string) error {
resp, err := s.sdk.ListConnectedAccounts(ctx, sdk.ListConnectedAccountsRequest{
ConnectedAccountIDs: []string{connectedAccountID},
})
if err != nil {
return fmt.Errorf("composio: verify connected account: %w", err)
}
var acct *sdk.ConnectedAccount
for i := range resp.Items {
if resp.Items[i].ID == connectedAccountID {
acct = &resp.Items[i]
break
}
}
if acct == nil {
return ErrAccountVerification
}
if acct.UserID != expectedUserID {
return ErrAccountVerification
}
// Fail closed: the account MUST have been created under the exact auth
// config the connect link used. An empty expected value (state missing it,
// or a resolver gap) is rejected rather than skipped — skipping is the
// fail-open hole that let a cross-toolkit account id be bound here.
if expectedAuthConfigID == "" || acct.AuthConfigID != expectedAuthConfigID {
return ErrAccountVerification
}
return nil
}
// isNotFound reports whether err is a Composio 404 APIError, used to make
// revoke/delete idempotent.
func isNotFound(err error) bool {
var apiErr *sdk.APIError
return errors.As(err, &apiErr) && apiErr.IsNotFound()
}

View File

@@ -0,0 +1,694 @@
package composio
import (
"context"
"errors"
"net/http"
"net/url"
"strings"
"testing"
"time"
"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgtype"
"github.com/multica-ai/multica/server/internal/util"
sdk "github.com/multica-ai/multica/server/pkg/composio"
db "github.com/multica-ai/multica/server/pkg/db/generated"
)
// ---- fakes ---------------------------------------------------------------
type fakeSDK struct {
createLinkResp *sdk.CreateLinkResponse
createLinkErr error
lastCreateLink sdk.CreateLinkRequest
revoked []string
revokeErr error
deleted []string
deleteErr error
createSessResp *sdk.CreateSessionResponse
createSessErr error
lastSessReq sdk.CreateSessionRequest
createSessCalls int
// account-ownership verification (CompleteCallback). By default
// ListConnectedAccounts echoes the requested id with acctUserID /
// acctAuthConfigID so success-path tests can opt in to a matching account;
// acctMissing returns no items, listAccountsErr forces a transport error.
acctUserID string
acctAuthConfigID string
acctMissing bool
listAccountsErr error
lastListAccounts sdk.ListConnectedAccountsRequest
// auth-config resolution (BeginConnect / ListToolkits connectable flag).
// authConfigs nil => a default single notion→ac_notion ENABLED config so
// existing connect tests keep resolving; set explicitly to override.
authConfigs []sdk.AuthConfig
authConfigsSet bool
listAuthErr error
// toolkit catalog (ListToolkits).
toolkits []sdk.Toolkit
listToolkitsErr error
}
func (f *fakeSDK) CreateLink(_ context.Context, req sdk.CreateLinkRequest) (*sdk.CreateLinkResponse, error) {
f.lastCreateLink = req
if f.createLinkErr != nil {
return nil, f.createLinkErr
}
if f.createLinkResp != nil {
return f.createLinkResp, nil
}
return &sdk.CreateLinkResponse{RedirectURL: "https://composio.example/redirect", ConnectedAccountID: "ca_pending"}, nil
}
func (f *fakeSDK) ListConnectedAccounts(_ context.Context, req sdk.ListConnectedAccountsRequest) (*sdk.ListConnectedAccountsResponse, error) {
f.lastListAccounts = req
if f.listAccountsErr != nil {
return nil, f.listAccountsErr
}
if f.acctMissing {
return &sdk.ListConnectedAccountsResponse{}, nil
}
id := ""
if len(req.ConnectedAccountIDs) > 0 {
id = req.ConnectedAccountIDs[0]
}
return &sdk.ListConnectedAccountsResponse{Items: []sdk.ConnectedAccount{{
ID: id,
UserID: f.acctUserID,
AuthConfigID: f.acctAuthConfigID,
}}}, nil
}
func (f *fakeSDK) ListAuthConfigs(_ context.Context, _ sdk.ListAuthConfigsRequest) (*sdk.ListAuthConfigsResponse, error) {
if f.listAuthErr != nil {
return nil, f.listAuthErr
}
items := f.authConfigs
if !f.authConfigsSet && items == nil {
items = []sdk.AuthConfig{{
ID: "ac_notion",
Toolkit: sdk.Toolkit{Slug: "notion"},
Status: "ENABLED",
IsComposioManaged: true,
}}
}
return &sdk.ListAuthConfigsResponse{Items: items}, nil
}
func (f *fakeSDK) ListToolkits(_ context.Context, _ sdk.ListToolkitsRequest) (*sdk.ListToolkitsResponse, error) {
if f.listToolkitsErr != nil {
return nil, f.listToolkitsErr
}
return &sdk.ListToolkitsResponse{Items: f.toolkits}, nil
}
func (f *fakeSDK) RevokeConnection(_ context.Context, id string) error {
f.revoked = append(f.revoked, id)
return f.revokeErr
}
func (f *fakeSDK) DeleteConnectedAccount(_ context.Context, id string) error {
f.deleted = append(f.deleted, id)
return f.deleteErr
}
func (f *fakeSDK) CreateSession(_ context.Context, req sdk.CreateSessionRequest) (*sdk.CreateSessionResponse, error) {
f.createSessCalls++
f.lastSessReq = req
if f.createSessErr != nil {
return nil, f.createSessErr
}
if f.createSessResp != nil {
return f.createSessResp, nil
}
return &sdk.CreateSessionResponse{MCP: sdk.MCPDescriptor{URL: "https://mcp.example/session"}}, nil
}
func (f *fakeSDK) MCPAuthHeaders() map[string]string {
return map[string]string{"x-api-key": "secret"}
}
// fakeStore is an in-memory implementation of Store with the same
// (user_id, connected_account_id) uniqueness as the real table.
type fakeStore struct {
rows []db.UserComposioConnection
nextID byte
}
func newFakeStore() *fakeStore { return &fakeStore{nextID: 1} }
func (s *fakeStore) UpsertUserComposioConnection(_ context.Context, arg db.UpsertUserComposioConnectionParams) (db.UserComposioConnection, error) {
for i := range s.rows {
if uuidEqual(s.rows[i].UserID, arg.UserID) && s.rows[i].ConnectedAccountID == arg.ConnectedAccountID {
s.rows[i].ToolkitSlug = arg.ToolkitSlug
s.rows[i].AuthConfigID = arg.AuthConfigID
s.rows[i].ComposioUserID = arg.ComposioUserID
s.rows[i].Status = "active"
s.rows[i].UpdatedAt = pgtype.Timestamptz{Time: time.Now(), Valid: true}
return s.rows[i], nil
}
}
row := db.UserComposioConnection{
ID: mintUUID(s.nextID),
UserID: arg.UserID,
ToolkitSlug: arg.ToolkitSlug,
AuthConfigID: arg.AuthConfigID,
ConnectedAccountID: arg.ConnectedAccountID,
ComposioUserID: arg.ComposioUserID,
Status: "active",
ConnectedAt: pgtype.Timestamptz{Time: time.Now(), Valid: true},
}
s.nextID++
s.rows = append(s.rows, row)
return row, nil
}
func (s *fakeStore) ListActiveUserComposioConnections(_ context.Context, userID pgtype.UUID) ([]db.UserComposioConnection, error) {
out := []db.UserComposioConnection{}
for _, r := range s.rows {
if uuidEqual(r.UserID, userID) && r.Status == "active" {
out = append(out, r)
}
}
return out, nil
}
func (s *fakeStore) GetUserComposioConnection(_ context.Context, arg db.GetUserComposioConnectionParams) (db.UserComposioConnection, error) {
for _, r := range s.rows {
if uuidEqual(r.ID, arg.ID) && uuidEqual(r.UserID, arg.UserID) {
return r, nil
}
}
return db.UserComposioConnection{}, pgx.ErrNoRows
}
func (s *fakeStore) MarkUserComposioConnectionRevoked(_ context.Context, arg db.MarkUserComposioConnectionRevokedParams) error {
for i := range s.rows {
if uuidEqual(s.rows[i].ID, arg.ID) && uuidEqual(s.rows[i].UserID, arg.UserID) {
s.rows[i].Status = "revoked"
}
}
return nil
}
func uuidEqual(a, b pgtype.UUID) bool { return a.Valid && b.Valid && a.Bytes == b.Bytes }
func mintUUID(n byte) pgtype.UUID {
var b [16]byte
b[15] = n
return pgtype.UUID{Bytes: b, Valid: true}
}
func newTestService(t *testing.T, client SDK, store Store) *Service {
t.Helper()
svc, err := NewService(client, store, Config{
StateSecret: testSecret,
CallbackBaseURL: "https://app.multica.ai",
FrontendBaseURL: "https://app.multica.ai",
Now: func() time.Time { return time.Unix(1_700_000_000, 0) },
})
if err != nil {
t.Fatalf("NewService: %v", err)
}
return svc
}
// ---- tests ---------------------------------------------------------------
func TestNewService_Validation(t *testing.T) {
t.Parallel()
if _, err := NewService(nil, newFakeStore(), Config{StateSecret: testSecret, CallbackBaseURL: "x"}); err == nil {
t.Error("expected error for nil client")
}
if _, err := NewService(&fakeSDK{}, nil, Config{StateSecret: testSecret, CallbackBaseURL: "x"}); err == nil {
t.Error("expected error for nil store")
}
if _, err := NewService(&fakeSDK{}, newFakeStore(), Config{CallbackBaseURL: "x"}); err == nil {
t.Error("expected error for empty secret")
}
if _, err := NewService(&fakeSDK{}, newFakeStore(), Config{StateSecret: testSecret}); err == nil {
t.Error("expected error for empty callback base")
}
}
func TestBeginConnect_MappingAndState(t *testing.T) {
t.Parallel()
sdkFake := &fakeSDK{}
svc := newTestService(t, sdkFake, newFakeStore())
userID := mintUUID(7)
redirect, err := svc.BeginConnect(context.Background(), userID, "Notion")
if err != nil {
t.Fatalf("BeginConnect: %v", err)
}
if redirect != "https://composio.example/redirect" {
t.Errorf("redirect = %q", redirect)
}
// toolkit → auth_config mapping
if sdkFake.lastCreateLink.AuthConfigID != "ac_notion" {
t.Errorf("auth config = %q", sdkFake.lastCreateLink.AuthConfigID)
}
// composio_user_id == multica user id
if sdkFake.lastCreateLink.UserID != util.UUIDToString(userID) {
t.Errorf("composio user id = %q, want %q", sdkFake.lastCreateLink.UserID, util.UUIDToString(userID))
}
// callback URL carries the signed state and points at our callback path
cb := sdkFake.lastCreateLink.CallbackURL
if !strings.HasPrefix(cb, "https://app.multica.ai"+callbackPath+"?state=") {
t.Fatalf("callback url = %q", cb)
}
u, _ := url.Parse(cb)
state := u.Query().Get("state")
claims, err := verifyState(testSecret, state, time.Unix(1_700_000_000, 0))
if err != nil {
t.Fatalf("state did not verify: %v", err)
}
if claims.ToolkitSlug != "notion" || claims.UserID != util.UUIDToString(userID) {
t.Errorf("claims = %+v", claims)
}
// The resolved auth_config_id is signed into the state so the callback can
// verify the returned account against it exactly (no fail-open re-resolve).
if claims.AuthConfigID != "ac_notion" {
t.Errorf("state auth config = %q, want ac_notion", claims.AuthConfigID)
}
}
func TestBeginConnect_UnsupportedToolkit(t *testing.T) {
t.Parallel()
svc := newTestService(t, &fakeSDK{}, newFakeStore())
if _, err := svc.BeginConnect(context.Background(), mintUUID(1), "github"); !errors.Is(err, ErrToolkitNotSupported) {
t.Fatalf("expected ErrToolkitNotSupported, got %v", err)
}
}
// TestBeginConnect_UnsupportedWhenNoAuthConfig: with the project reporting no
// enabled auth configs at all, even notion is not connectable.
func TestBeginConnect_UnsupportedWhenNoAuthConfig(t *testing.T) {
t.Parallel()
sdkFake := &fakeSDK{authConfigsSet: true, authConfigs: []sdk.AuthConfig{}}
svc := newTestService(t, sdkFake, newFakeStore())
if _, err := svc.BeginConnect(context.Background(), mintUUID(1), "notion"); !errors.Is(err, ErrToolkitNotSupported) {
t.Fatalf("expected ErrToolkitNotSupported with no auth configs, got %v", err)
}
}
// TestBeginConnect_PrefersCustomAuthConfig: when a toolkit has both a
// Composio-managed and a custom (white-label) auth config, the custom one wins.
func TestBeginConnect_PrefersCustomAuthConfig(t *testing.T) {
t.Parallel()
sdkFake := &fakeSDK{authConfigsSet: true, authConfigs: []sdk.AuthConfig{
{ID: "ac_managed", Toolkit: sdk.Toolkit{Slug: "notion"}, Status: "ENABLED", IsComposioManaged: true},
{ID: "ac_custom", Toolkit: sdk.Toolkit{Slug: "notion"}, Status: "ENABLED", IsComposioManaged: false},
}}
svc := newTestService(t, sdkFake, newFakeStore())
if _, err := svc.BeginConnect(context.Background(), mintUUID(1), "notion"); err != nil {
t.Fatalf("BeginConnect: %v", err)
}
if sdkFake.lastCreateLink.AuthConfigID != "ac_custom" {
t.Errorf("auth config = %q, want ac_custom (custom preferred over managed)", sdkFake.lastCreateLink.AuthConfigID)
}
}
// TestListToolkits_ConnectableFlagAndOrder: every toolkit is listed, but only
// those with an enabled auth config are connectable, and connectable ones sort
// first.
func TestListToolkits_ConnectableFlagAndOrder(t *testing.T) {
t.Parallel()
sdkFake := &fakeSDK{
authConfigsSet: true,
authConfigs: []sdk.AuthConfig{
{ID: "ac_notion", Toolkit: sdk.Toolkit{Slug: "notion"}, Status: "ENABLED"},
},
toolkits: []sdk.Toolkit{
{Slug: "github", Name: "GitHub", LogoURL: "https://logo/gh", Categories: []string{"dev"}},
{Slug: "notion", Name: "Notion", LogoURL: "https://logo/notion", Categories: []string{"productivity"}},
{Slug: "slack", Name: "Slack"},
},
}
svc := newTestService(t, sdkFake, newFakeStore())
tks, err := svc.ListToolkits(context.Background())
if err != nil {
t.Fatalf("ListToolkits: %v", err)
}
if len(tks) != 3 {
t.Fatalf("expected 3 toolkits, got %d", len(tks))
}
// Connectable (notion) sorts first.
if tks[0].Slug != "notion" || !tks[0].Connectable {
t.Errorf("first toolkit = %+v, want connectable notion", tks[0])
}
if tks[0].Name != "Notion" || tks[0].LogoURL != "https://logo/notion" || tks[0].Category != "productivity" {
t.Errorf("notion fields not mapped: %+v", tks[0])
}
for _, tk := range tks[1:] {
if tk.Connectable {
t.Errorf("toolkit %q should not be connectable", tk.Slug)
}
}
}
// TestListToolkits_PaginatesAndResolverErrorIsSoft: a paginated catalog is
// fully drained, and an /auth_configs failure degrades to "nothing
// connectable" instead of failing the whole list.
func TestListToolkits_ResolverErrorMarksNoneConnectable(t *testing.T) {
t.Parallel()
sdkFake := &fakeSDK{
listAuthErr: errors.New("upstream blip"),
toolkits: []sdk.Toolkit{{Slug: "notion", Name: "Notion"}},
}
svc := newTestService(t, sdkFake, newFakeStore())
tks, err := svc.ListToolkits(context.Background())
if err != nil {
t.Fatalf("ListToolkits should not fail on auth-config error, got %v", err)
}
if len(tks) != 1 || tks[0].Connectable {
t.Fatalf("expected 1 non-connectable toolkit, got %+v", tks)
}
}
func TestCompleteCallback_SuccessAndIdempotent(t *testing.T) {
t.Parallel()
store := newFakeStore()
userID := mintUUID(3)
// The account Composio reports for ca_123 belongs to this user under the
// notion auth config, so ownership verification passes.
sdkFake := &fakeSDK{acctUserID: util.UUIDToString(userID), acctAuthConfigID: "ac_notion"}
svc := newTestService(t, sdkFake, store)
state, _ := signState(testSecret, stateClaims{
UserID: util.UUIDToString(userID),
ToolkitSlug: "notion",
AuthConfigID: "ac_notion",
Exp: time.Unix(1_700_000_000, 0).Add(time.Minute).Unix(),
})
slug, err := svc.CompleteCallback(context.Background(), state, "success", "ca_123")
if err != nil {
t.Fatalf("CompleteCallback: %v", err)
}
if slug != "notion" {
t.Errorf("slug = %q", slug)
}
// Duplicate callback (same connected account) must not create a 2nd row.
if _, err := svc.CompleteCallback(context.Background(), state, "success", "ca_123"); err != nil {
t.Fatalf("second CompleteCallback: %v", err)
}
if len(store.rows) != 1 {
t.Fatalf("expected 1 row after duplicate callback, got %d", len(store.rows))
}
row := store.rows[0]
if row.ComposioUserID != util.UUIDToString(userID) {
t.Errorf("composio_user_id invariant broken: %q", row.ComposioUserID)
}
if row.AuthConfigID != "ac_notion" || row.ToolkitSlug != "notion" || row.Status != "active" {
t.Errorf("row = %+v", row)
}
}
func TestCompleteCallback_NonSuccessNoRow(t *testing.T) {
t.Parallel()
store := newFakeStore()
svc := newTestService(t, &fakeSDK{}, store)
state, _ := signState(testSecret, stateClaims{
UserID: util.UUIDToString(mintUUID(4)),
ToolkitSlug: "notion",
Exp: time.Unix(1_700_000_000, 0).Add(time.Minute).Unix(),
})
slug, err := svc.CompleteCallback(context.Background(), state, "failed", "ca_x")
if !errors.Is(err, ErrConnectNotSuccessful) {
t.Fatalf("expected ErrConnectNotSuccessful, got %v", err)
}
if slug != "notion" {
t.Errorf("slug = %q (should still be returned for redirect)", slug)
}
if len(store.rows) != 0 {
t.Fatalf("expected no row written on non-success, got %d", len(store.rows))
}
}
func TestCompleteCallback_BadState(t *testing.T) {
t.Parallel()
svc := newTestService(t, &fakeSDK{}, newFakeStore())
if _, err := svc.CompleteCallback(context.Background(), "garbage", "success", "ca_1"); err == nil {
t.Fatal("expected error for malformed state")
}
}
// TestCompleteCallback_TamperedAccountRejected covers the PR 4608 blocker:
// a valid, un-expired state paired with a connected_account_id that Composio
// reports as belonging to a DIFFERENT user must be rejected, and no row written.
func TestCompleteCallback_TamperedAccountRejected(t *testing.T) {
t.Parallel()
store := newFakeStore()
userID := mintUUID(20)
// Composio says ca_evil belongs to someone else, not our state's user.
sdkFake := &fakeSDK{acctUserID: util.UUIDToString(mintUUID(99)), acctAuthConfigID: "ac_notion"}
svc := newTestService(t, sdkFake, store)
state, _ := signState(testSecret, stateClaims{
UserID: util.UUIDToString(userID),
ToolkitSlug: "notion",
AuthConfigID: "ac_notion",
Exp: time.Unix(1_700_000_000, 0).Add(time.Minute).Unix(),
})
if _, err := svc.CompleteCallback(context.Background(), state, "success", "ca_evil"); !errors.Is(err, ErrAccountVerification) {
t.Fatalf("expected ErrAccountVerification for foreign account, got %v", err)
}
if len(store.rows) != 0 {
t.Fatalf("no row should be written when ownership fails, got %d", len(store.rows))
}
}
// TestCompleteCallback_WrongAuthConfigRejected is the cross-toolkit proof: the
// account belongs to the right user but was created under a DIFFERENT toolkit's
// auth config (e.g. the user pasting their slack account id into a notion
// callback). The state-signed auth_config_id must not match, so it is rejected.
func TestCompleteCallback_WrongAuthConfigRejected(t *testing.T) {
t.Parallel()
store := newFakeStore()
userID := mintUUID(21)
// Account is owned by the user but lives under ac_other (another toolkit).
sdkFake := &fakeSDK{acctUserID: util.UUIDToString(userID), acctAuthConfigID: "ac_other"}
svc := newTestService(t, sdkFake, store)
state, _ := signState(testSecret, stateClaims{
UserID: util.UUIDToString(userID),
ToolkitSlug: "notion",
AuthConfigID: "ac_notion",
Exp: time.Unix(1_700_000_000, 0).Add(time.Minute).Unix(),
})
if _, err := svc.CompleteCallback(context.Background(), state, "success", "ca_x"); !errors.Is(err, ErrAccountVerification) {
t.Fatalf("expected ErrAccountVerification for wrong auth config, got %v", err)
}
if len(store.rows) != 0 {
t.Fatalf("no row should be written, got %d", len(store.rows))
}
}
// TestCompleteCallback_MissingAuthConfigFailsClosed is the regression for the
// re-review blocker: a state with no signed auth_config_id (the old fail-open
// path) plus an account owned by the user must STILL be rejected — the empty
// expected auth config now fails closed instead of skipping the check.
func TestCompleteCallback_MissingAuthConfigFailsClosed(t *testing.T) {
t.Parallel()
store := newFakeStore()
userID := mintUUID(25)
// Account genuinely belongs to the user — only the missing auth-config
// binding should trip the rejection.
sdkFake := &fakeSDK{acctUserID: util.UUIDToString(userID), acctAuthConfigID: "ac_notion"}
svc := newTestService(t, sdkFake, store)
state, _ := signState(testSecret, stateClaims{
UserID: util.UUIDToString(userID),
ToolkitSlug: "notion",
// AuthConfigID deliberately omitted (empty) — must fail closed.
Exp: time.Unix(1_700_000_000, 0).Add(time.Minute).Unix(),
})
if _, err := svc.CompleteCallback(context.Background(), state, "success", "ca_owned"); !errors.Is(err, ErrAccountVerification) {
t.Fatalf("expected ErrAccountVerification when state carries no auth config, got %v", err)
}
if len(store.rows) != 0 {
t.Fatalf("no row should be written, got %d", len(store.rows))
}
}
// TestCompleteCallback_UnknownAccountRejected ensures an account id Composio
// does not know about fails closed rather than being mirrored verbatim.
func TestCompleteCallback_UnknownAccountRejected(t *testing.T) {
t.Parallel()
store := newFakeStore()
userID := mintUUID(22)
sdkFake := &fakeSDK{acctMissing: true}
svc := newTestService(t, sdkFake, store)
state, _ := signState(testSecret, stateClaims{
UserID: util.UUIDToString(userID),
ToolkitSlug: "notion",
Exp: time.Unix(1_700_000_000, 0).Add(time.Minute).Unix(),
})
if _, err := svc.CompleteCallback(context.Background(), state, "success", "ca_ghost"); !errors.Is(err, ErrAccountVerification) {
t.Fatalf("expected ErrAccountVerification for unknown account, got %v", err)
}
if len(store.rows) != 0 {
t.Fatalf("no row should be written, got %d", len(store.rows))
}
}
func TestListConnections(t *testing.T) {
t.Parallel()
store := newFakeStore()
svc := newTestService(t, &fakeSDK{}, store)
userID := mintUUID(5)
seedActive(store, userID, "notion", "ca_a")
conns, err := svc.ListConnections(context.Background(), userID)
if err != nil {
t.Fatalf("ListConnections: %v", err)
}
if len(conns) != 1 || conns[0].ToolkitSlug != "notion" || conns[0].Status != "active" {
t.Fatalf("conns = %+v", conns)
}
}
func TestDisconnect_OwnerRevokeIdempotentAndFilter(t *testing.T) {
t.Parallel()
store := newFakeStore()
sdkFake := &fakeSDK{}
svc := newTestService(t, sdkFake, store)
userID := mintUUID(6)
row := seedActive(store, userID, "notion", "ca_z")
if err := svc.Disconnect(context.Background(), userID, row.ID); err != nil {
t.Fatalf("Disconnect: %v", err)
}
if len(sdkFake.revoked) != 1 || sdkFake.revoked[0] != "ca_z" {
t.Errorf("revoked = %v", sdkFake.revoked)
}
// Local row should now be filtered out of the active list.
conns, _ := svc.ListConnections(context.Background(), userID)
if len(conns) != 0 {
t.Errorf("expected 0 active after disconnect, got %d", len(conns))
}
// Second disconnect is idempotent (row still owned, marks revoked again).
if err := svc.Disconnect(context.Background(), userID, row.ID); err != nil {
t.Fatalf("idempotent Disconnect: %v", err)
}
}
// TestDisconnect_RevokedRowNoOp covers the PR 4608 blocker: once a row is
// locally revoked, a second DELETE must be a pure no-op and must NOT call
// upstream again — otherwise a non-404 upstream error on the repeat would be
// surfaced as a 502 and break idempotency.
func TestDisconnect_RevokedRowNoOp(t *testing.T) {
t.Parallel()
store := newFakeStore()
sdkFake := &fakeSDK{}
svc := newTestService(t, sdkFake, store)
userID := mintUUID(30)
row := seedActive(store, userID, "notion", "ca_noop")
// First disconnect revokes upstream and marks the row revoked.
if err := svc.Disconnect(context.Background(), userID, row.ID); err != nil {
t.Fatalf("first Disconnect: %v", err)
}
if len(sdkFake.revoked) != 1 {
t.Fatalf("expected 1 upstream revoke, got %d", len(sdkFake.revoked))
}
// Now make the upstream fail with a NON-404 error. A correct no-op must not
// touch upstream, so this error must never surface.
sdkFake.revokeErr = &sdk.APIError{HTTPStatus: http.StatusInternalServerError}
sdkFake.deleteErr = &sdk.APIError{HTTPStatus: http.StatusInternalServerError}
if err := svc.Disconnect(context.Background(), userID, row.ID); err != nil {
t.Fatalf("second Disconnect on already-revoked row should be a no-op, got %v", err)
}
if len(sdkFake.revoked) != 1 {
t.Errorf("second disconnect must not call upstream revoke again, revoked=%v", sdkFake.revoked)
}
}
func TestDisconnect_UpstreamNotFoundIsIdempotent(t *testing.T) {
t.Parallel()
store := newFakeStore()
sdkFake := &fakeSDK{revokeErr: &sdk.APIError{HTTPStatus: http.StatusNotFound}}
svc := newTestService(t, sdkFake, store)
userID := mintUUID(8)
row := seedActive(store, userID, "notion", "ca_404")
if err := svc.Disconnect(context.Background(), userID, row.ID); err != nil {
t.Fatalf("Disconnect should treat upstream 404 as success, got %v", err)
}
}
func TestDisconnect_NotOwner(t *testing.T) {
t.Parallel()
store := newFakeStore()
svc := newTestService(t, &fakeSDK{}, store)
owner := mintUUID(9)
row := seedActive(store, owner, "notion", "ca_o")
attacker := mintUUID(10)
if err := svc.Disconnect(context.Background(), attacker, row.ID); !errors.Is(err, ErrConnectionNotFound) {
t.Fatalf("expected ErrConnectionNotFound for non-owner, got %v", err)
}
}
func TestCreateMCPSession_NoOpWhenEmpty(t *testing.T) {
t.Parallel()
sdkFake := &fakeSDK{}
svc := newTestService(t, sdkFake, newFakeStore())
sess, err := svc.CreateMCPSession(context.Background(), mintUUID(11))
if err != nil {
t.Fatalf("CreateMCPSession: %v", err)
}
if sess != nil {
t.Fatalf("expected nil session when no connections, got %+v", sess)
}
if sdkFake.createSessCalls != 0 {
t.Errorf("CreateSession should not be called when there are no connections")
}
}
func TestCreateMCPSession_PinsConnectedAccounts(t *testing.T) {
t.Parallel()
store := newFakeStore()
sdkFake := &fakeSDK{}
svc := newTestService(t, sdkFake, store)
userID := mintUUID(12)
seedActive(store, userID, "notion", "ca_pin")
sess, err := svc.CreateMCPSession(context.Background(), userID)
if err != nil {
t.Fatalf("CreateMCPSession: %v", err)
}
if sess == nil || sess.URL != "https://mcp.example/session" {
t.Fatalf("session = %+v", sess)
}
if sess.Headers["x-api-key"] != "secret" {
t.Errorf("headers = %+v", sess.Headers)
}
if sdkFake.lastSessReq.UserID != util.UUIDToString(userID) {
t.Errorf("session user id = %q", sdkFake.lastSessReq.UserID)
}
if got := sdkFake.lastSessReq.ConnectedAccounts["notion"]; got != "ca_pin" {
t.Errorf("connected_accounts pin = %v, want ca_pin", got)
}
}
func TestCallbackRedirect(t *testing.T) {
t.Parallel()
svc := newTestService(t, &fakeSDK{}, newFakeStore())
if got := svc.CallbackRedirect("notion", true); got != "https://app.multica.ai/settings?tab=integrations&connected=notion" {
t.Errorf("success redirect = %q", got)
}
if got := svc.CallbackRedirect("notion", false); got != "https://app.multica.ai/settings?tab=integrations&error=composio_connect_failed" {
t.Errorf("failure redirect = %q", got)
}
}
// seedActive inserts an active connection through the store and returns the row.
func seedActive(store *fakeStore, userID pgtype.UUID, slug, caID string) db.UserComposioConnection {
row, _ := store.UpsertUserComposioConnection(context.Background(), db.UpsertUserComposioConnectionParams{
UserID: userID,
ToolkitSlug: slug,
AuthConfigID: "ac_notion",
ConnectedAccountID: caID,
ComposioUserID: util.UUIDToString(userID),
})
return row
}

View File

@@ -0,0 +1,92 @@
package composio
import (
"crypto/hmac"
"crypto/sha256"
"encoding/base64"
"encoding/json"
"errors"
"strings"
"time"
)
// Signed-state errors. The handler maps all of them to a generic
// "connect failed" redirect so a tampered/expired state never leaks which
// check failed.
var (
// ErrStateMalformed is returned when the state token is not the expected
// "<payload>.<sig>" base64url shape.
ErrStateMalformed = errors.New("composio: state malformed")
// ErrStateSignature is returned when the HMAC signature does not match —
// the state was tampered with or signed by a different secret.
ErrStateSignature = errors.New("composio: state signature mismatch")
// ErrStateExpired is returned when the state's exp claim is in the past.
ErrStateExpired = errors.New("composio: state expired")
)
// stateClaims is the payload embedded in the signed connect-state. It carries
// exactly what CompleteCallback needs to attribute the callback to a user and
// toolkit without a server-side session table — the signature is what makes it
// trustworthy, the short exp is what bounds replay.
//
// Field names are single letters to keep the encoded token compact; they are
// an internal wire format, never exposed to clients.
type stateClaims struct {
UserID string `json:"u"`
ToolkitSlug string `json:"t"`
// AuthConfigID is the exact Composio auth_config_id resolved at BeginConnect
// and used to create the connect link. Signing it into the state lets
// CompleteCallback verify the returned account was created under THIS
// toolkit's auth config without re-resolving (which could fail-open). It is
// an opaque config handle (ac_…), not a credential.
AuthConfigID string `json:"a"`
Exp int64 `json:"e"`
}
// signState produces a URL-safe "<payload>.<sig>" token. payload is the
// base64url-encoded JSON claims; sig is the base64url-encoded HMAC-SHA256 of
// the payload under the service secret. We sign the encoded payload (not the
// raw struct) so verification re-derives the exact bytes that were signed.
func signState(secret []byte, claims stateClaims) (string, error) {
raw, err := json.Marshal(claims)
if err != nil {
return "", err
}
payload := base64.RawURLEncoding.EncodeToString(raw)
sig := signPayload(secret, payload)
return payload + "." + sig, nil
}
// verifyState validates the signature and expiry of a token produced by
// signState and returns the embedded claims. Signature is checked with a
// constant-time compare before the payload is trusted; expiry is checked
// against now.
func verifyState(secret []byte, token string, now time.Time) (stateClaims, error) {
payload, sig, found := strings.Cut(token, ".")
if !found || payload == "" || sig == "" {
return stateClaims{}, ErrStateMalformed
}
expected := signPayload(secret, payload)
if !hmac.Equal([]byte(sig), []byte(expected)) {
return stateClaims{}, ErrStateSignature
}
raw, err := base64.RawURLEncoding.DecodeString(payload)
if err != nil {
return stateClaims{}, ErrStateMalformed
}
var claims stateClaims
if err := json.Unmarshal(raw, &claims); err != nil {
return stateClaims{}, ErrStateMalformed
}
if now.Unix() > claims.Exp {
return stateClaims{}, ErrStateExpired
}
return claims, nil
}
// signPayload returns the base64url HMAC-SHA256 of payload under secret.
func signPayload(secret []byte, payload string) string {
mac := hmac.New(sha256.New, secret)
mac.Write([]byte(payload))
return base64.RawURLEncoding.EncodeToString(mac.Sum(nil))
}

View File

@@ -0,0 +1,97 @@
package composio
import (
"errors"
"strings"
"testing"
"time"
)
var testSecret = []byte("test-state-secret-0123456789")
func TestSignVerifyState_RoundTrip(t *testing.T) {
t.Parallel()
now := time.Unix(1_700_000_000, 0)
tok, err := signState(testSecret, stateClaims{
UserID: "11111111-1111-1111-1111-111111111111",
ToolkitSlug: "notion",
AuthConfigID: "ac_notion",
Exp: now.Add(5 * time.Minute).Unix(),
})
if err != nil {
t.Fatalf("signState: %v", err)
}
got, err := verifyState(testSecret, tok, now)
if err != nil {
t.Fatalf("verifyState: %v", err)
}
if got.UserID != "11111111-1111-1111-1111-111111111111" {
t.Errorf("user id = %q", got.UserID)
}
if got.ToolkitSlug != "notion" {
t.Errorf("toolkit slug = %q", got.ToolkitSlug)
}
if got.AuthConfigID != "ac_notion" {
t.Errorf("auth config id = %q", got.AuthConfigID)
}
}
func TestVerifyState_Expired(t *testing.T) {
t.Parallel()
now := time.Unix(1_700_000_000, 0)
tok, err := signState(testSecret, stateClaims{
UserID: "u",
ToolkitSlug: "notion",
Exp: now.Add(-time.Second).Unix(),
})
if err != nil {
t.Fatalf("signState: %v", err)
}
if _, err := verifyState(testSecret, tok, now); !errors.Is(err, ErrStateExpired) {
t.Fatalf("expected ErrStateExpired, got %v", err)
}
}
func TestVerifyState_Tampered(t *testing.T) {
t.Parallel()
now := time.Unix(1_700_000_000, 0)
tok, err := signState(testSecret, stateClaims{UserID: "u", ToolkitSlug: "notion", Exp: now.Add(time.Minute).Unix()})
if err != nil {
t.Fatalf("signState: %v", err)
}
// Flip a byte in the payload segment.
payload, sig, _ := strings.Cut(tok, ".")
tampered := payload[:len(payload)-1] + flipLastChar(payload) + "." + sig
if _, err := verifyState(testSecret, tampered, now); !errors.Is(err, ErrStateSignature) && !errors.Is(err, ErrStateMalformed) {
t.Fatalf("expected signature/malformed error, got %v", err)
}
}
func TestVerifyState_WrongSecret(t *testing.T) {
t.Parallel()
now := time.Unix(1_700_000_000, 0)
tok, _ := signState(testSecret, stateClaims{UserID: "u", ToolkitSlug: "notion", Exp: now.Add(time.Minute).Unix()})
if _, err := verifyState([]byte("a-different-secret"), tok, now); !errors.Is(err, ErrStateSignature) {
t.Fatalf("expected ErrStateSignature, got %v", err)
}
}
func TestVerifyState_Malformed(t *testing.T) {
t.Parallel()
now := time.Unix(1_700_000_000, 0)
for _, tok := range []string{"", "nodot", ".", "a.", ".b"} {
if _, err := verifyState(testSecret, tok, now); !errors.Is(err, ErrStateMalformed) {
t.Errorf("token %q: expected ErrStateMalformed, got %v", tok, err)
}
}
}
// flipLastChar returns a single replacement char different from the payload's
// last character so the tampered payload is guaranteed to differ.
func flipLastChar(payload string) string {
last := payload[len(payload)-1]
if last == 'A' {
return "B"
}
return "A"
}

View File

@@ -0,0 +1 @@
DROP TABLE IF EXISTS user_composio_connection;

View File

@@ -0,0 +1,38 @@
-- Composio integration (Stage 2 MVP): one row per user-connected Composio
-- account. The row is the local mirror of a Composio "connected account" so the
-- product can list / disconnect connections and build per-user MCP sessions
-- without round-tripping Composio on every read.
--
-- No foreign keys / cascades by design: Multica enforces cross-table
-- relationships at the application layer (see migration 118 dropping the
-- agent_task_queue.initiator_user_id FK). user_id is a "user".id but is left
-- unconstrained here so a user delete does not require a migration-ordered
-- cascade across integration tables.
--
-- composio_user_id always equals the Multica user_id.String() — the
-- application keeps that mapping as an invariant so a Composio session can be
-- created from the Multica user id alone. It is stored explicitly so a future
-- change to the mapping does not silently break already-connected accounts.
CREATE TABLE user_composio_connection (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
user_id UUID NOT NULL,
toolkit_slug TEXT NOT NULL,
auth_config_id TEXT NOT NULL,
connected_account_id TEXT NOT NULL,
composio_user_id TEXT NOT NULL,
status TEXT NOT NULL DEFAULT 'active',
connected_at TIMESTAMPTZ NOT NULL DEFAULT now(),
last_used_at TIMESTAMPTZ,
created_at TIMESTAMPTZ NOT NULL DEFAULT now(),
updated_at TIMESTAMPTZ NOT NULL DEFAULT now(),
UNIQUE (user_id, connected_account_id)
);
-- The hot read path is "active connections for this user" (list endpoint and
-- MCP session builder both filter on user_id + status).
CREATE INDEX user_composio_connection_user_status_idx
ON user_composio_connection(user_id, status);
-- Webhook / callback paths look a row up by its Composio connected_account_id.
CREATE INDEX user_composio_connection_account_idx
ON user_composio_connection(connected_account_id);

View File

@@ -0,0 +1,92 @@
package composio
import (
"context"
"net/http"
"net/url"
"strconv"
"strings"
)
// AuthConfig mirrors a subset of a Composio auth config — the project-level
// record that defines HOW users authenticate with a toolkit (the OAuth client,
// API-key scheme, etc.). The connect-link flow needs its opaque `id` (ac_…);
// the other fields drive selection when a toolkit has more than one.
//
// Spec: https://docs.composio.dev/reference/v3/api-reference/auth-configs/getAuthConfigs
type AuthConfig struct {
ID string `json:"id"`
Name string `json:"name,omitempty"`
// Toolkit carries at least the slug (and a logo) the config belongs to.
Toolkit Toolkit `json:"toolkit"`
AuthScheme string `json:"auth_scheme,omitempty"`
// IsComposioManaged is true for Composio's managed OAuth app and false for a
// custom (bring-your-own client_id/secret) config — the white-label case.
IsComposioManaged bool `json:"is_composio_managed"`
// Status is "ENABLED" or "DISABLED". The list endpoint hides disabled
// configs by default (show_disabled=false).
Status string `json:"status,omitempty"`
CreatedAt string `json:"created_at,omitempty"`
LastUpdatedAt string `json:"last_updated_at,omitempty"`
}
// ListAuthConfigsRequest collects the optional filters of GET /auth_configs.
// Zero values are omitted from the query string.
type ListAuthConfigsRequest struct {
// ToolkitSlugs filters to specific toolkits; sent as a single
// comma-separated `toolkit_slug` query param per the v3 spec.
ToolkitSlugs []string
// IsComposioManaged, when non-nil, filters by managed vs custom configs.
IsComposioManaged *bool
// ShowDisabled includes disabled configs (default false = enabled only).
ShowDisabled bool
// Search matches auth configs by name or id.
Search string
// Limit is the page size (max 1000 upstream). 0 = upstream default.
Limit int
// Cursor pages through results.
Cursor string
}
// ListAuthConfigsResponse is the typed paginated response.
type ListAuthConfigsResponse struct {
Items []AuthConfig `json:"items"`
NextCursor string `json:"next_cursor,omitempty"`
TotalItems int `json:"total_items,omitempty"`
}
// ListAuthConfigs returns the auth configs registered in the project, with
// optional filters. The project is resolved from the x-api-key (a project API
// key authenticates to exactly one project), so no project id is passed.
func (c *Client) ListAuthConfigs(ctx context.Context, req ListAuthConfigsRequest) (*ListAuthConfigsResponse, error) {
q := url.Values{}
if len(req.ToolkitSlugs) > 0 {
q.Set("toolkit_slug", strings.Join(req.ToolkitSlugs, ","))
}
if req.IsComposioManaged != nil {
q.Set("is_composio_managed", strconv.FormatBool(*req.IsComposioManaged))
}
if req.ShowDisabled {
q.Set("show_disabled", "true")
}
if req.Search != "" {
q.Set("search", req.Search)
}
if req.Limit > 0 {
q.Set("limit", strconv.Itoa(req.Limit))
}
if req.Cursor != "" {
q.Set("cursor", req.Cursor)
}
path := "/auth_configs"
if encoded := q.Encode(); encoded != "" {
path += "?" + encoded
}
var out ListAuthConfigsResponse
if err := c.do(c.newRequest(ctx), http.MethodGet, path, &out); err != nil {
return nil, err
}
return &out, nil
}

View File

@@ -0,0 +1,155 @@
// Code generated by sqlc. DO NOT EDIT.
// versions:
// sqlc v1.31.1
// source: composio.sql
package db
import (
"context"
"github.com/jackc/pgx/v5/pgtype"
)
const getUserComposioConnection = `-- name: GetUserComposioConnection :one
SELECT id, user_id, toolkit_slug, auth_config_id, connected_account_id, composio_user_id, status, connected_at, last_used_at, created_at, updated_at FROM user_composio_connection
WHERE id = $1 AND user_id = $2
`
type GetUserComposioConnectionParams struct {
ID pgtype.UUID `json:"id"`
UserID pgtype.UUID `json:"user_id"`
}
// Owner-scoped lookup: a connection can only be read by the user who owns it,
// so one user cannot disconnect another's account by guessing the UUID.
func (q *Queries) GetUserComposioConnection(ctx context.Context, arg GetUserComposioConnectionParams) (UserComposioConnection, error) {
row := q.db.QueryRow(ctx, getUserComposioConnection, arg.ID, arg.UserID)
var i UserComposioConnection
err := row.Scan(
&i.ID,
&i.UserID,
&i.ToolkitSlug,
&i.AuthConfigID,
&i.ConnectedAccountID,
&i.ComposioUserID,
&i.Status,
&i.ConnectedAt,
&i.LastUsedAt,
&i.CreatedAt,
&i.UpdatedAt,
)
return i, err
}
const listActiveUserComposioConnections = `-- name: ListActiveUserComposioConnections :many
SELECT id, user_id, toolkit_slug, auth_config_id, connected_account_id, composio_user_id, status, connected_at, last_used_at, created_at, updated_at FROM user_composio_connection
WHERE user_id = $1 AND status = 'active'
ORDER BY connected_at DESC
`
func (q *Queries) ListActiveUserComposioConnections(ctx context.Context, userID pgtype.UUID) ([]UserComposioConnection, error) {
rows, err := q.db.Query(ctx, listActiveUserComposioConnections, userID)
if err != nil {
return nil, err
}
defer rows.Close()
items := []UserComposioConnection{}
for rows.Next() {
var i UserComposioConnection
if err := rows.Scan(
&i.ID,
&i.UserID,
&i.ToolkitSlug,
&i.AuthConfigID,
&i.ConnectedAccountID,
&i.ComposioUserID,
&i.Status,
&i.ConnectedAt,
&i.LastUsedAt,
&i.CreatedAt,
&i.UpdatedAt,
); err != nil {
return nil, err
}
items = append(items, i)
}
if err := rows.Err(); err != nil {
return nil, err
}
return items, nil
}
const markUserComposioConnectionRevoked = `-- name: MarkUserComposioConnectionRevoked :exec
UPDATE user_composio_connection
SET status = 'revoked', updated_at = now()
WHERE id = $1 AND user_id = $2
`
type MarkUserComposioConnectionRevokedParams struct {
ID pgtype.UUID `json:"id"`
UserID pgtype.UUID `json:"user_id"`
}
// Idempotent: re-running on an already-revoked row is a no-op write. Scoped to
// the owner for defense-in-depth.
func (q *Queries) MarkUserComposioConnectionRevoked(ctx context.Context, arg MarkUserComposioConnectionRevokedParams) error {
_, err := q.db.Exec(ctx, markUserComposioConnectionRevoked, arg.ID, arg.UserID)
return err
}
const upsertUserComposioConnection = `-- name: UpsertUserComposioConnection :one
INSERT INTO user_composio_connection (
user_id, toolkit_slug, auth_config_id, connected_account_id, composio_user_id, status
) VALUES (
$1, $2, $3, $4, $5, 'active'
)
ON CONFLICT (user_id, connected_account_id) DO UPDATE SET
toolkit_slug = EXCLUDED.toolkit_slug,
auth_config_id = EXCLUDED.auth_config_id,
composio_user_id = EXCLUDED.composio_user_id,
status = 'active',
updated_at = now()
RETURNING id, user_id, toolkit_slug, auth_config_id, connected_account_id, composio_user_id, status, connected_at, last_used_at, created_at, updated_at
`
type UpsertUserComposioConnectionParams struct {
UserID pgtype.UUID `json:"user_id"`
ToolkitSlug string `json:"toolkit_slug"`
AuthConfigID string `json:"auth_config_id"`
ConnectedAccountID string `json:"connected_account_id"`
ComposioUserID string `json:"composio_user_id"`
}
// =====================
// User Composio Connection
// =====================
// Idempotent on (user_id, connected_account_id): a duplicate callback for the
// same connected account re-activates the row instead of inserting a second
// one. connected_at is preserved on conflict (first-connect time); updated_at
// moves so the reactivation is observable.
func (q *Queries) UpsertUserComposioConnection(ctx context.Context, arg UpsertUserComposioConnectionParams) (UserComposioConnection, error) {
row := q.db.QueryRow(ctx, upsertUserComposioConnection,
arg.UserID,
arg.ToolkitSlug,
arg.AuthConfigID,
arg.ConnectedAccountID,
arg.ComposioUserID,
)
var i UserComposioConnection
err := row.Scan(
&i.ID,
&i.UserID,
&i.ToolkitSlug,
&i.AuthConfigID,
&i.ConnectedAccountID,
&i.ComposioUserID,
&i.Status,
&i.ConnectedAt,
&i.LastUsedAt,
&i.CreatedAt,
&i.UpdatedAt,
)
return i, err
}

View File

@@ -848,6 +848,20 @@ type User struct {
Timezone pgtype.Text `json:"timezone"`
}
type UserComposioConnection struct {
ID pgtype.UUID `json:"id"`
UserID pgtype.UUID `json:"user_id"`
ToolkitSlug string `json:"toolkit_slug"`
AuthConfigID string `json:"auth_config_id"`
ConnectedAccountID string `json:"connected_account_id"`
ComposioUserID string `json:"composio_user_id"`
Status string `json:"status"`
ConnectedAt pgtype.Timestamptz `json:"connected_at"`
LastUsedAt pgtype.Timestamptz `json:"last_used_at"`
CreatedAt pgtype.Timestamptz `json:"created_at"`
UpdatedAt pgtype.Timestamptz `json:"updated_at"`
}
type VerificationCode struct {
ID pgtype.UUID `json:"id"`
Email string `json:"email"`

View File

@@ -0,0 +1,39 @@
-- =====================
-- User Composio Connection
-- =====================
-- name: UpsertUserComposioConnection :one
-- Idempotent on (user_id, connected_account_id): a duplicate callback for the
-- same connected account re-activates the row instead of inserting a second
-- one. connected_at is preserved on conflict (first-connect time); updated_at
-- moves so the reactivation is observable.
INSERT INTO user_composio_connection (
user_id, toolkit_slug, auth_config_id, connected_account_id, composio_user_id, status
) VALUES (
$1, $2, $3, $4, $5, 'active'
)
ON CONFLICT (user_id, connected_account_id) DO UPDATE SET
toolkit_slug = EXCLUDED.toolkit_slug,
auth_config_id = EXCLUDED.auth_config_id,
composio_user_id = EXCLUDED.composio_user_id,
status = 'active',
updated_at = now()
RETURNING *;
-- name: ListActiveUserComposioConnections :many
SELECT * FROM user_composio_connection
WHERE user_id = $1 AND status = 'active'
ORDER BY connected_at DESC;
-- name: GetUserComposioConnection :one
-- Owner-scoped lookup: a connection can only be read by the user who owns it,
-- so one user cannot disconnect another's account by guessing the UUID.
SELECT * FROM user_composio_connection
WHERE id = $1 AND user_id = $2;
-- name: MarkUserComposioConnectionRevoked :exec
-- Idempotent: re-running on an already-revoked row is a no-op write. Scoped to
-- the owner for defense-in-depth.
UPDATE user_composio_connection
SET status = 'revoked', updated_at = now()
WHERE id = $1 AND user_id = $2;

View File

@@ -74,6 +74,22 @@ func TestRedactBearerToken(t *testing.T) {
}
}
// TestRedactBearerMCPToken is a regression guard for the Composio MCP session
// headers (MUL-3720): the SDK attaches the project key as `Bearer mcp_...` on
// some MCP transports, so the generic Bearer pattern must mask it before it can
// reach a log line or WS broadcast.
func TestRedactBearerMCPToken(t *testing.T) {
t.Parallel()
input := "connecting with Authorization: Bearer mcp_AbCdEf0123456789-_token"
got := Text(input)
if strings.Contains(got, "mcp_AbCdEf0123456789") {
t.Fatalf("Bearer mcp_ token not redacted: %s", got)
}
if !strings.Contains(got, "Bearer [REDACTED]") {
t.Fatalf("expected Bearer [REDACTED] placeholder, got: %s", got)
}
}
func TestRedactGenericCredentials(t *testing.T) {
t.Parallel()
cases := []struct {