Compare commits

..

1 Commits

Author SHA1 Message Date
Jiayuan
dd4897f954 fix(dashboard): reconcile deleted-agent spend in usage leaderboard (MUL-3776)
PR #4637 (MUL-3771) dropped hard-deleted agents from the per-agent
leaderboard so they'd stop rendering as a bare UUID, but the top-line
Cost/Tokens KPIs still count their spend (those totals aggregate
task_usage_hourly without joining `agent`). The breakdown therefore no
longer reconciled with the totals (#4640).

Instead of dropping unknown-agent rows, fold them into a single
aggregated "Deleted agents" row: sum(visible rows) == KPI total again,
with no UUID exposed. Archived agents still appear as themselves (the
agent list is fetched with include_archived). The bucket carries
tokens + cost only; Time/Tasks render as "—" since the run-time rollups
inner-join `agent` and never attribute time to deleted agents.

- bucketUnknownAgentRows replaces filterKnownAgentRows in dashboard/utils
- Leaderboard renders the sentinel bucket row with a neutral placeholder
  and a "{{count}} agents · {{deleted}} deleted" caption
- i18n: deleted_agents + caption_with_deleted (en/zh-Hans/ja/ko)
- tests cover bucket reconciliation, archived-stays, null-loading passthrough

Co-authored-by: multica-agent <github@multica.ai>
2026-06-28 12:47:54 +08:00
50 changed files with 218 additions and 3927 deletions

View File

@@ -159,14 +159,14 @@ Agentic coding CLI using the ACP protocol over stdio (shares the transport with
### Antigravity (Google)
Google's Antigravity CLI (`agy`). Pairs with Google's Antigravity service and runs Gemini-backed models. Multica launches it with `agy -p`, the daemon-compatible non-interactive mode; current Antigravity CLI releases can execute tools from that mode, while `agy -i` requires an attached TTY. Session resumption works through `--conversation <id>`, captured by the daemon from the CLI log file. Model selection is managed inside the Antigravity CLI itself — Multica disables the per-agent model picker for this provider. Skills are written to `.agents/skills/` (the CLI inherits Gemini CLI's workspace skill layout — see [Antigravity docs](https://antigravity.google/docs/gcli-migration)).
Google's Antigravity CLI (`agy`). Pairs with Google's Antigravity service and runs Gemini-backed models. Session resumption works through `--conversation <id>`, captured by the daemon from the CLI log file. Model selection is managed inside the Antigravity CLI itself — Multica disables the per-agent model picker for this provider. Skills are written to `.agents/skills/` (the CLI inherits Gemini CLI's workspace skill layout — see [Antigravity docs](https://antigravity.google/docs/gcli-migration)).
| | |
|---|---|
| Daemon looks for | `agy` |
| Install | Follow the official guide at [antigravity.google/docs/cli-overview](https://antigravity.google/docs/cli-overview). The CLI ships pre-built — run `agy install` once to wire up PATH and shell aliases. |
| Authentication | Run `agy` once interactively and complete the Google account login, or sign in via the Antigravity desktop app — the CLI reuses the keyring entry the GUI writes. |
| Notes | The CLI emits plain assistant text on stdout, not a structured event stream; intermediate "I will run X" lines and the final reply are both relayed to Multica as text, and per-tool telemetry is not available today. |
| Notes | The CLI emits plain assistant text on stdout, not a structured event stream; intermediate "I will run X" lines and the final reply are both relayed to Multica as text. |
## After installing

View File

@@ -159,14 +159,14 @@ ACP 协议 agent和 Kimi 共享传输层。会话续接可用MCP 配置
### AntigravityGoogle
Google 的 Antigravity CLI`agy`)。搭配 Google Antigravity 服务,默认走 Gemini 系列模型。Multica 使用 `agy -p` 启动它,这是适合 daemon 后台任务的一次性非交互模式;当前 Antigravity CLI 在这个模式下仍可执行工具,而 `agy -i` 需要连接 TTY不适合 daemon 驱动。会话续接通过 `--conversation <id>` 工作——守护进程从 CLI 的日志文件里抓取 conversation UUID。模型选择保存在 Antigravity CLI 自己的设置里——Multica 里这款工具的「模型」选择项被禁用。Skill 文件写入 `.agents/skills/`CLI 沿用 Gemini CLI 的 workspace 布局——见 [Antigravity 文档](https://antigravity.google/docs/gcli-migration))。
Google 的 Antigravity CLI`agy`)。搭配 Google Antigravity 服务,默认走 Gemini 系列模型。会话续接通过 `--conversation <id>` 工作——守护进程从 CLI 的日志文件里抓取 conversation UUID。模型选择保存在 Antigravity CLI 自己的设置里——Multica 里这款工具的「模型」选择项被禁用。Skill 文件写入 `.agents/skills/`CLI 沿用 Gemini CLI 的 workspace 布局——见 [Antigravity 文档](https://antigravity.google/docs/gcli-migration))。
| | |
|---|---|
| 守护进程扫描 | `agy` |
| 安装 | 看官方指引 [antigravity.google/docs/cli-overview](https://antigravity.google/docs/cli-overview)。CLI 是预编译的,跑一次 `agy install` 配好 PATH 和 shell 别名即可。 |
| 认证 | 交互式跑一次 `agy` 走 Google 账号登录流程;或者通过 Antigravity 桌面端登录——CLI 会复用 GUI 写入 keyring 的凭据。 |
| 备注 | CLI 的 stdout 是纯文本,不是结构化事件流;中间的 "I will run X" 过程和最终回复都会作为 text 消息送回 Multica,目前无法展示 Antigravity 的逐工具 telemetry。 |
| 备注 | CLI 的 stdout 是纯文本,不是结构化事件流;中间的 "I will run X" 思考过程和最终回复都会作为 text 消息送回 Multica。 |
## 装完之后

View File

@@ -31,7 +31,7 @@ For guidance on picking a tool when creating an agent, see [Creating and configu
### Antigravity
From Google. CLI binary name is `agy`. Pairs with Google's Antigravity service and ships with a Gemini-backed default model. Multica launches Antigravity with `agy -p` because that is the daemon-compatible non-interactive mode; `agy -i` needs an attached TTY and is not suitable for background task execution. Current Antigravity CLI releases can still execute tools from this mode, but stdout is plain assistant text rather than a structured event stream, so Multica relays the transcript as text and cannot show per-tool telemetry for Antigravity today. **Session resumption works** via `--conversation <id>`; the daemon captures the conversation UUID from the CLI's log file. **Model selection works** via the `--model` flag (added in agy 1.0.6): the daemon enumerates the catalog with `agy models` and ships the chosen value verbatim. Note these are human display strings such as `Claude Opus 4.6 (Thinking)`, not `provider/model` slugs — and agy silently no-ops on a value it doesn't recognise, so prefer picking from the discovered list over typing a custom one. Skills land in `.agents/skills/` (the CLI inherits Gemini CLI's workspace skill layout — see [Antigravity migration docs](https://antigravity.google/docs/gcli-migration)).
From Google. CLI binary name is `agy`. Pairs with Google's Antigravity service and ships with a Gemini-backed default model. **Session resumption works** via `--conversation <id>`; the daemon captures the conversation UUID from the CLI's log file because stdout is plain text rather than a structured event stream. **Model selection works** via the `--model` flag (added in agy 1.0.6): the daemon enumerates the catalog with `agy models` and ships the chosen value verbatim. Note these are human display strings such as `Claude Opus 4.6 (Thinking)`, not `provider/model` slugs — and agy silently no-ops on a value it doesn't recognise, so prefer picking from the discovered list over typing a custom one. Skills land in `.agents/skills/` (the CLI inherits Gemini CLI's workspace skill layout — see [Antigravity migration docs](https://antigravity.google/docs/gcli-migration)).
### Claude Code

View File

@@ -31,7 +31,7 @@ Multica 内置支持 **13 款 AI 编程工具**。它们都实现了同一套接
### Antigravity
Google 出品。CLI 二进制名为 `agy`,搭配 Google Antigravity 服务,默认走 Gemini 系列模型。Multica 使用 `agy -p` 启动 Antigravity因为这是适合 daemon 后台任务的一次性非交互模式;`agy -i` 需要连接 TTY不适合后台执行。当前 Antigravity CLI 在 `agy -p` 下仍可执行工具,但 stdout 是纯文本而非结构化事件流,所以 Multica 会把 transcript 作为 text 转发,暂时无法展示逐工具 telemetry。**会话恢复真用**——通过 `--conversation <id>`,守护进程从 CLI 的日志文件里抓取 conversation UUID。**模型选择真用**——通过 `--model` flagagy 1.0.6 新增):守护进程用 `agy models` 枚举可选项,并把选中的值原样传入。注意这些是 `Claude Opus 4.6 (Thinking)` 这样的人类可读显示名,而非 `provider/model` slug而且 agy 遇到无法识别的值会静默空跑所以优先从发现列表里挑选不要手填。Skill 文件写入 `.agents/skills/`CLI 沿用 Gemini CLI 的 workspace 布局——见 [Antigravity 迁移文档](https://antigravity.google/docs/gcli-migration))。
Google 出品。CLI 二进制名为 `agy`,搭配 Google Antigravity 服务,默认走 Gemini 系列模型。**会话恢复真用**——通过 `--conversation <id>`;因为 stdout 是纯文本而非结构化事件流,守护进程从 CLI 的日志文件里抓取 conversation UUID。**模型选择真用**——通过 `--model` flagagy 1.0.6 新增):守护进程用 `agy models` 枚举可选项,并把选中的值原样传入。注意这些是 `Claude Opus 4.6 (Thinking)` 这样的人类可读显示名,而非 `provider/model` slug而且 agy 遇到无法识别的值会静默空跑所以优先从发现列表里挑选不要手填。Skill 文件写入 `.agents/skills/`CLI 沿用 Gemini CLI 的 workspace 布局——见 [Antigravity 迁移文档](https://antigravity.google/docs/gcli-migration))。
### Claude Code

View File

@@ -37,83 +37,4 @@ test.describe("Settings", () => {
await expect(page.getByText("Workspace settings saved").first()).toBeVisible({ timeout: 5000 });
await expect(page.getByRole("button", { name: new RegExp(originalName) }).first()).toBeVisible();
});
// Composio connect flow, fully mocked at the network boundary so it runs
// without a configured COMPOSIO_API_KEY or a live Composio project. The
// backend redirect is simulated by pointing the init endpoint's redirect_url
// straight back at the settings page with ?connected=<slug> — exercising the
// frontend's callback toast + connections refresh (MUL-3718) end to end.
test("connecting a Composio toolkit shows a toast and refreshes the list", async ({
page,
}) => {
const workspaceSlug = await loginAsDefault(page);
const settingsUrl = `/${workspaceSlug}/settings?tab=integrations`;
// Stateful: connections is empty until the (mocked) connect flow lands.
let connected = false;
await page.route("**/api/integrations/composio/toolkits", (route) =>
route.fulfill({
status: 200,
contentType: "application/json",
body: JSON.stringify([
{ slug: "notion", name: "Notion", connectable: true },
]),
}),
);
await page.route("**/api/integrations/composio/connections", (route) => {
if (route.request().method() !== "GET") return route.fallback();
return route.fulfill({
status: 200,
contentType: "application/json",
body: JSON.stringify(
connected
? [
{
id: "conn-notion-1",
toolkit_slug: "notion",
status: "active",
connected_at: new Date().toISOString(),
last_used_at: null,
},
]
: [],
),
});
});
await page.route("**/api/integrations/composio/connect/init", (route) => {
// Composio would 302 through its hosted consent and back to our callback,
// which emits CallbackRedirect's slug-less shape:
// `/settings?tab=integrations&connected=<slug>`. The web proxy's
// legacy-route redirect then prepends the last workspace slug, landing on
// the real settings route. Mock that exact backend shape (NOT the final
// slugged URL) so the test exercises the same redirect path real users hit.
connected = true;
return route.fulfill({
status: 200,
contentType: "application/json",
body: JSON.stringify({
redirect_url: `/settings?tab=integrations&connected=notion`,
}),
});
});
await page.goto(settingsUrl, { waitUntil: "domcontentloaded" });
await waitForPageText(page, "Composio");
// Notion starts disconnected → click Connect.
await page.getByRole("button", { name: /^Connect$/ }).first().click();
// Success toast from the simulated callback redirect.
await expect(page.getByText("Connected").first()).toBeVisible({ timeout: 10000 });
// List refreshed without a manual reload: the Notion card now offers
// Disconnect, and the one-shot ?connected param has been stripped.
await expect(
page.getByRole("button", { name: /Disconnect/ }).first(),
).toBeVisible({ timeout: 10000 });
await expect(page).not.toHaveURL(/connected=notion/);
});
});

View File

@@ -112,9 +112,6 @@ import type {
BeginLarkInstallResponse,
LarkInstallStatusResponse,
RedeemLarkBindingTokenResponse,
ComposioToolkit,
ComposioConnection,
ComposioConnectInitResponse,
Squad,
SquadMember,
SquadMemberStatusListResponse,
@@ -2273,34 +2270,4 @@ export class ApiClient {
body: JSON.stringify({ token }),
});
}
// Composio integration (MUL-3720). All routes are user-scoped (a connection
// belongs to a user, not a workspace), so none take a workspaceId.
/** Full Composio toolkit catalog, each annotated with `connectable`
* (whether the project has an enabled auth config for it). */
async listComposioToolkits(): Promise<ComposioToolkit[]> {
return this.fetch(`/api/integrations/composio/toolkits`);
}
/** The caller's active Composio connections. */
async listComposioConnections(): Promise<ComposioConnection[]> {
return this.fetch(`/api/integrations/composio/connections`);
}
/** Starts a hosted Composio connect flow for a toolkit and returns the
* redirect URL the browser should be sent to. */
async beginComposioConnect(toolkitSlug: string): Promise<ComposioConnectInitResponse> {
return this.fetch(`/api/integrations/composio/connect/init`, {
method: "POST",
body: JSON.stringify({ toolkit_slug: toolkitSlug }),
});
}
/** Disconnects a Composio connection the caller owns. */
async deleteComposioConnection(connectionId: string): Promise<void> {
await this.fetch(`/api/integrations/composio/connections/${connectionId}`, {
method: "DELETE",
});
}
}

View File

@@ -1 +0,0 @@
export { composioKeys, composioToolkitsOptions, composioConnectionsOptions } from "./queries";

View File

@@ -1,26 +0,0 @@
import { queryOptions } from "@tanstack/react-query";
import { api } from "../api";
/** Query-key namespace for Composio integration data. */
export const composioKeys = {
all: ["composio"] as const,
toolkits: () => [...composioKeys.all, "toolkits"] as const,
connections: () => [...composioKeys.all, "connections"] as const,
};
/** The full Composio toolkit catalog (with per-toolkit `connectable`). The
* catalog changes rarely, so a long staleTime avoids refetching it every time
* the Settings tab mounts. */
export const composioToolkitsOptions = () =>
queryOptions({
queryKey: composioKeys.toolkits(),
queryFn: () => api.listComposioToolkits(),
staleTime: 5 * 60 * 1000,
});
/** The current user's active Composio connections. */
export const composioConnectionsOptions = () =>
queryOptions({
queryKey: composioKeys.connections(),
queryFn: () => api.listComposioConnections(),
});

View File

@@ -85,8 +85,6 @@
"./github/queries": "./github/queries.ts",
"./lark": "./lark/index.ts",
"./lark/queries": "./lark/queries.ts",
"./composio": "./composio/index.ts",
"./composio/queries": "./composio/queries.ts",
"./feedback": "./feedback/index.ts",
"./feedback/mutations": "./feedback/mutations.ts",
"./realtime": "./realtime/index.ts",

View File

@@ -1,36 +0,0 @@
/** A Composio toolkit as surfaced by GET /api/integrations/composio/toolkits.
*
* Wire shape mirrors `ComposioToolkitResponse` in
* `server/internal/handler/integrations_composio.go`. New fields the backend
* adds later MUST stay optional so older desktop builds keep parsing — see
* CLAUDE.md → API Response Compatibility. */
export interface ComposioToolkit {
slug: string;
name: string;
logo?: string;
category?: string;
/** Whether the project has an enabled auth config for this toolkit. When
* false the UI must not offer a working Connect button — BeginConnect would
* 400 with "toolkit not supported". */
connectable: boolean;
}
/** A user's Composio connected account, as returned by
* GET /api/integrations/composio/connections. Mirrors
* `ComposioConnectionResponse` server-side. */
export interface ComposioConnection {
id: string;
toolkit_slug: string;
/** Connection lifecycle state. `expired` surfaces a Reconnect affordance in
* the UI; the backend only starts emitting it once Stage 4 webhook handling
* lands (MUL-3719), but the client renders the branch ahead of that. */
status: "active" | "expired" | "revoked" | string;
connected_at: string;
last_used_at?: string | null;
}
/** Response of POST /api/integrations/composio/connect/init — the hosted
* Composio Connect Link the browser is redirected to. */
export interface ComposioConnectInitResponse {
redirect_url: string;
}

View File

@@ -119,11 +119,6 @@ export type {
LarkInstallStatusResponse,
RedeemLarkBindingTokenResponse,
} from "./lark";
export type {
ComposioToolkit,
ComposioConnection,
ComposioConnectInitResponse,
} from "./composio";
export type {
Autopilot,
AutopilotStatus,

View File

@@ -1,7 +1,7 @@
"use client";
import { useMemo, useState } from "react";
import { BarChart3, FolderKanban } from "lucide-react";
import { BarChart3, FolderKanban, Trash2 } from "lucide-react";
import { useQuery } from "@tanstack/react-query";
import { Skeleton } from "@multica/ui/components/ui/skeleton";
import {
@@ -52,8 +52,9 @@ import {
aggregateDailyTokens,
aggregateWeeklyTasks,
aggregateWeeklyTime,
bucketUnknownAgentRows,
computeDailyTotals,
filterKnownAgentRows,
DELETED_AGENTS_ROW_ID,
formatDuration,
mergeAgentDashboardRows,
type AgentDashboardRow,
@@ -314,17 +315,29 @@ export function DashboardPage() {
[agentTokenRows, runTimeRows],
);
// Hide rollup rows for agents that were hard-deleted from the workspace —
// they'd otherwise show up as a bare UUID on the leaderboard (MUL-3771).
// Archived agents stay (the agent list is fetched with archived included);
// only truly-removed agents drop out. Skip filtering until the agent list
// has loaded so a slow agents fetch doesn't transiently blank the list.
// Fold rollup rows for hard-deleted agents into one aggregated "Deleted
// agents" row instead of showing them as a bare UUID (MUL-3771) or dropping
// them outright — dropping made the per-agent breakdown stop reconciling
// with the top-line Cost/Tokens KPIs, which still count that spend (MUL-3776,
// #4640). Archived agents stay as themselves (the agent list is fetched with
// archived included); only truly-removed agents collapse into the bucket.
// Skip bucketing until the agent list has loaded so a slow agents fetch
// doesn't transiently merge every row.
const knownAgentIds = useMemo(
() => (agentsQuery.isSuccess ? new Set(agents.map((a) => a.id)) : null),
[agentsQuery.isSuccess, agents],
);
const visibleAgentRows = useMemo(
() => filterKnownAgentRows(agentRows, knownAgentIds),
() => bucketUnknownAgentRows(agentRows, knownAgentIds),
[agentRows, knownAgentIds],
);
// Distinct hard-deleted agents folded into the bucket — drives the caption's
// "· N deleted" suffix (the bucket itself is a single row).
const deletedAgentCount = useMemo(
() =>
knownAgentIds
? agentRows.filter((r) => !knownAgentIds.has(r.agentId)).length
: 0,
[agentRows, knownAgentIds],
);
@@ -431,6 +444,7 @@ export function DashboardPage() {
<Leaderboard
rows={visibleAgentRows}
agents={agents}
deletedAgentCount={deletedAgentCount}
lessThanMinuteLabel={t(($) => $.duration.less_than_minute)}
/>
</>
@@ -640,10 +654,12 @@ const SORT_METRIC: Record<LeaderboardSort, (r: AgentDashboardRow) => number> = {
function Leaderboard({
rows,
agents,
deletedAgentCount,
lessThanMinuteLabel,
}: {
rows: AgentDashboardRow[];
agents: { id: string; name: string }[];
deletedAgentCount: number;
lessThanMinuteLabel: string;
}) {
const { t } = useT("usage");
@@ -684,7 +700,12 @@ function Leaderboard({
<div className="flex items-center gap-3">
<Segmented value={sortBy} onChange={setSortBy} options={sortOptions} />
<span className="text-xs text-muted-foreground">
{t(($) => $.leaderboard.caption, { count: rows.length })}
{deletedAgentCount > 0
? t(($) => $.leaderboard.caption_with_deleted, {
count: rows.length - 1,
deleted: deletedAgentCount,
})
: t(($) => $.leaderboard.caption, { count: rows.length })}
</span>
</div>
</div>
@@ -704,6 +725,11 @@ function Leaderboard({
</div>
<div className="divide-y">
{sortedRows.map((row) => {
// The deleted-agents bucket is a synthetic row, not a real agent:
// render a neutral placeholder (no avatar fetch / hover card / UUID)
// and dash out Time/Tasks, which it never carries (see
// bucketUnknownAgentRows).
const isDeletedBucket = row.agentId === DELETED_AGENTS_ROW_ID;
const agent = agents.find((a) => a.id === row.agentId);
const value = SORT_METRIC[sortBy](row);
const pct = maxValue > 0 ? (value / maxValue) * 100 : 0;
@@ -713,15 +739,28 @@ function Leaderboard({
className="grid grid-cols-[minmax(0,1.6fr)_minmax(0,1fr)_5rem_5rem_5rem_4rem] items-center gap-3 px-4 py-2"
>
<div className="flex min-w-0 items-center gap-2">
<ActorAvatar
actorType="agent"
actorId={row.agentId}
size={22}
enableHoverCard
/>
<span className="cursor-pointer truncate text-sm font-medium">
{agent?.name ?? row.agentId}
</span>
{isDeletedBucket ? (
<>
<span className="flex h-[22px] w-[22px] shrink-0 items-center justify-center rounded-full bg-muted text-muted-foreground">
<Trash2 className="h-3 w-3" />
</span>
<span className="truncate text-sm font-medium italic text-muted-foreground">
{t(($) => $.leaderboard.deleted_agents)}
</span>
</>
) : (
<>
<ActorAvatar
actorType="agent"
actorId={row.agentId}
size={22}
enableHoverCard
/>
<span className="cursor-pointer truncate text-sm font-medium">
{agent?.name ?? row.agentId}
</span>
</>
)}
</div>
<div className="relative h-2 overflow-hidden rounded-full bg-muted">
<div
@@ -742,12 +781,14 @@ function Leaderboard({
<div
className={`text-right text-xs tabular-nums ${sortBy === "time" ? "font-medium text-foreground" : "text-muted-foreground"}`}
>
{formatDuration(row.seconds, lessThanMinuteLabel)}
{isDeletedBucket
? "—"
: formatDuration(row.seconds, lessThanMinuteLabel)}
</div>
<div
className={`text-right text-xs tabular-nums ${sortBy === "tasks" ? "font-medium text-foreground" : "text-muted-foreground"}`}
>
{row.taskCount}
{isDeletedBucket ? "—" : row.taskCount}
</div>
</div>
);

View File

@@ -4,8 +4,9 @@ import {
aggregateDailyCost,
aggregateWeeklyTasks,
aggregateWeeklyTime,
bucketUnknownAgentRows,
computeDailyTotals,
filterKnownAgentRows,
DELETED_AGENTS_ROW_ID,
formatDuration,
mergeAgentDashboardRows,
} from "./utils";
@@ -202,26 +203,81 @@ describe("mergeAgentDashboardRows", () => {
});
});
describe("filterKnownAgentRows", () => {
const rows = [
{ agentId: "live", tokens: 100, cost: 1, seconds: 10, taskCount: 1 },
{ agentId: "deleted", tokens: 50, cost: 0.5, seconds: 5, taskCount: 1 },
];
describe("bucketUnknownAgentRows", () => {
const live = { agentId: "live", tokens: 100, cost: 1, seconds: 10, taskCount: 1 };
const archived = {
agentId: "archived",
tokens: 80,
cost: 0.8,
seconds: 8,
taskCount: 2,
};
const deletedA = {
agentId: "deleted-a",
tokens: 50,
cost: 0.5,
seconds: 5,
taskCount: 1,
};
const deletedB = {
agentId: "deleted-b",
tokens: 30,
cost: 0.25,
seconds: 3,
taskCount: 4,
};
it("drops rows whose agent is no longer in the workspace", () => {
// "deleted" is absent from the known set — it's a hard-deleted agent whose
// legacy rollup row would otherwise render as a bare UUID.
const out = filterKnownAgentRows(rows, new Set(["live"]));
expect(out.map((r) => r.agentId)).toEqual(["live"]);
it("folds every hard-deleted agent into one aggregated bucket row", () => {
// "deleted-a" / "deleted-b" are absent from the known set — they'd otherwise
// render as bare UUIDs. They collapse into a single sentinel row.
const out = bucketUnknownAgentRows(
[live, deletedA, deletedB],
new Set(["live"]),
);
expect(out.map((r) => r.agentId)).toEqual(["live", DELETED_AGENTS_ROW_ID]);
const bucket = out.find((r) => r.agentId === DELETED_AGENTS_ROW_ID)!;
expect(bucket.tokens).toBe(80);
expect(bucket.cost).toBeCloseTo(0.75);
// Time/Tasks never attach to the bucket — the run-time rollup inner-joins
// `agent`, so deleted agents contribute nothing to those columns.
expect(bucket.seconds).toBe(0);
expect(bucket.taskCount).toBe(0);
});
it("keeps every row while the agent list is still loading (null set)", () => {
const out = filterKnownAgentRows(rows, null);
expect(out.map((r) => r.agentId)).toEqual(["live", "deleted"]);
it("keeps the bucket total reconciled with the top-line spend", () => {
// The KPI total counts deleted-agent spend; sum(visible rows) must match it
// so the breakdown reconciles (MUL-3776).
const out = bucketUnknownAgentRows(
[live, deletedA, deletedB],
new Set(["live"]),
);
const visibleCost = out.reduce((s, r) => s + r.cost, 0);
const kpiCost = [live, deletedA, deletedB].reduce((s, r) => s + r.cost, 0);
expect(visibleCost).toBeCloseTo(kpiCost);
});
it("drops every row when the known set is empty", () => {
expect(filterKnownAgentRows(rows, new Set())).toEqual([]);
it("keeps archived agents as themselves, never in the bucket", () => {
// The agent list is fetched with archived included, so archived agents are
// in the known set and stay on the board under their own id.
const out = bucketUnknownAgentRows(
[live, archived, deletedA],
new Set(["live", "archived"]),
);
expect(out.map((r) => r.agentId)).toEqual([
"live",
"archived",
DELETED_AGENTS_ROW_ID,
]);
});
it("adds no bucket row when every agent is known", () => {
const out = bucketUnknownAgentRows([live, archived], new Set(["live", "archived"]));
expect(out.map((r) => r.agentId)).toEqual(["live", "archived"]);
});
it("keeps every row untouched while the agent list is still loading (null set)", () => {
const out = bucketUnknownAgentRows([live, deletedA], null);
expect(out.map((r) => r.agentId)).toEqual(["live", "deleted-a"]);
});
});

View File

@@ -227,21 +227,54 @@ export function mergeAgentDashboardRows(
});
}
// Drop usage rows whose agent no longer exists in the workspace. The agent
// list is fetched with `include_archived: true`, so archived agents keep
// their names and stay on the leaderboard; only hard-deleted agents fall out
// of `knownAgentIds`. Those are legacy rollup rows that would otherwise
// render as a bare UUID (MUL-3771).
// Synthetic agentId for the row that aggregates all hard-deleted agents.
// Sentinel (not a real UUID) so the component can detect it and render a
// placeholder instead of looking the id up in the agent list.
export const DELETED_AGENTS_ROW_ID = "__deleted_agents__";
// Fold usage rows whose agent no longer exists in the workspace into a single
// aggregated "Deleted agents" row instead of dropping them. The agent list is
// fetched with `include_archived: true`, so archived agents keep their names
// and stay on the leaderboard as themselves; only hard-deleted agents fall out
// of `knownAgentIds` and collapse into the bucket.
//
// `knownAgentIds` is empty while the agent list is still loading; callers
// MUL-3771 (PR #4637) originally *dropped* these rows so they'd stop rendering
// as a bare UUID — but the top-line Cost/Tokens KPIs still count their spend
// (those totals aggregate `task_usage_hourly` without joining `agent`), so the
// per-agent breakdown no longer reconciled with the totals (MUL-3776, #4640).
// Aggregating instead of dropping keeps `sum(visible rows) == KPI total` while
// still never exposing a UUID. The bucket carries tokens + cost only; seconds
// and taskCount stay 0 because the run-time rollups inner-join `agent`, so
// deleted agents already contribute nothing to the Time/Tasks KPIs — the
// component renders those two columns as "—" for this row.
//
// `knownAgentIds` is `null` while the agent list is still loading; callers
// pass `null` in that case so the rows pass through untouched instead of the
// whole leaderboard blanking on a slow fetch.
export function filterKnownAgentRows(
// whole leaderboard collapsing into one bucket on a slow fetch.
export function bucketUnknownAgentRows(
rows: AgentDashboardRow[],
knownAgentIds: ReadonlySet<string> | null,
): AgentDashboardRow[] {
if (!knownAgentIds) return rows;
return rows.filter((r) => knownAgentIds.has(r.agentId));
const known: AgentDashboardRow[] = [];
const bucket: AgentDashboardRow = {
agentId: DELETED_AGENTS_ROW_ID,
tokens: 0,
cost: 0,
seconds: 0,
taskCount: 0,
};
let hasDeleted = false;
for (const r of rows) {
if (knownAgentIds.has(r.agentId)) {
known.push(r);
continue;
}
hasDeleted = true;
bucket.tokens += r.tokens;
bucket.cost += r.cost;
}
return hasDeleted ? [...known, bucket] : known;
}
// ---------------------------------------------------------------------------

View File

@@ -300,38 +300,6 @@
"install_error_forbidden": "You no longer have permission to install Lark Bots in this workspace. Ask a workspace admin to continue.",
"install_error_generic": "Install failed. Try again."
},
"composio": {
"section_title": "Composio",
"page_description": "Browse the full Composio toolkit catalog and connect the apps your agents can act on. Only toolkits with a configured auth config can be connected right now.",
"not_enabled_title": "Composio integration not enabled",
"not_enabled_description_prefix": "Set",
"not_enabled_description_suffix": "on the server to enable Composio toolkit connections.",
"loading": "Loading toolkits…",
"load_failed": "Failed to load Composio toolkits.",
"empty_title": "No toolkits available",
"empty_description": "Composio returned no toolkits. Check the API key and project configuration.",
"search_placeholder": "Search toolkits…",
"connect": "Connect",
"connecting": "Connecting…",
"connected": "Connected",
"disconnect": "Disconnect",
"disconnecting": "Disconnecting…",
"not_connectable": "Not configured",
"not_connectable_hint": "This toolkit has no auth config in your Composio project yet, so it can't be connected. Add an auth config for it in the Composio dashboard to enable connecting.",
"connect_failed": "Couldn't start the connection. Please try again.",
"disconnect_failed": "Couldn't disconnect. Please try again.",
"toast_disconnected": "Disconnected",
"disconnect_confirm_title": "Disconnect this app?",
"disconnect_confirm_description": "Your connected account will be revoked at Composio and your agents will lose access to this toolkit. You can reconnect later.",
"disconnect_confirm_cancel": "Cancel",
"connections_load_failed": "Couldn't load your existing connections, so connected status may be incomplete.",
"toast_connected": "Connected",
"toast_connect_failed": "Couldn't complete the connection. Please try again.",
"last_used": "Last used {{when}}",
"last_used_never": "Never used",
"expired": "Token expired",
"reconnect": "Reconnect"
},
"repositories": {
"section_title": "Repositories",
"description": "Git repositories associated with this workspace. Agents use these to clone and work on code.",

View File

@@ -41,6 +41,8 @@
"leaderboard": {
"title": "Leaderboard",
"caption": "{{count}} agents",
"caption_with_deleted": "{{count}} agents · {{deleted}} deleted",
"deleted_agents": "Deleted agents",
"header_agent": "Agent",
"header_tokens": "Tokens",
"header_cost": "Cost",

View File

@@ -300,38 +300,6 @@
"install_error_forbidden": "このワークスペースに Lark ボットを設置する権限がなくなりました。ワークスペース管理者にお問い合わせください。",
"install_error_generic": "設置に失敗しました。もう一度お試しください。"
},
"composio": {
"section_title": "Composio",
"page_description": "Browse the full Composio toolkit catalog and connect the apps your agents can act on. Only toolkits with a configured auth config can be connected right now.",
"not_enabled_title": "Composio integration not enabled",
"not_enabled_description_prefix": "Set",
"not_enabled_description_suffix": "on the server to enable Composio toolkit connections.",
"loading": "Loading toolkits…",
"load_failed": "Failed to load Composio toolkits.",
"empty_title": "No toolkits available",
"empty_description": "Composio returned no toolkits. Check the API key and project configuration.",
"search_placeholder": "Search toolkits…",
"connect": "Connect",
"connecting": "Connecting…",
"connected": "Connected",
"disconnect": "Disconnect",
"disconnecting": "Disconnecting…",
"not_connectable": "Not configured",
"not_connectable_hint": "This toolkit has no auth config in your Composio project yet, so it can't be connected. Add an auth config for it in the Composio dashboard to enable connecting.",
"connect_failed": "Couldn't start the connection. Please try again.",
"disconnect_failed": "Couldn't disconnect. Please try again.",
"toast_disconnected": "Disconnected",
"disconnect_confirm_title": "Disconnect this app?",
"disconnect_confirm_description": "Your connected account will be revoked at Composio and your agents will lose access to this toolkit. You can reconnect later.",
"disconnect_confirm_cancel": "Cancel",
"connections_load_failed": "Couldn't load your existing connections, so connected status may be incomplete.",
"toast_connected": "接続しました",
"toast_connect_failed": "接続を完了できませんでした。もう一度お試しください。",
"last_used": "最終使用 {{when}}",
"last_used_never": "未使用",
"expired": "トークンの有効期限切れ",
"reconnect": "再接続"
},
"repositories": {
"section_title": "リポジトリ",
"description": "このワークスペースに関連付けられた Git リポジトリです。エージェントはこれらをクローンしてコードを作業します。",

View File

@@ -41,6 +41,8 @@
"leaderboard": {
"title": "リーダーボード",
"caption": "{{count}} 件のエージェント",
"caption_with_deleted": "{{count}} 件のエージェント · 削除済み {{deleted}} 件",
"deleted_agents": "削除済みエージェント",
"header_agent": "エージェント",
"header_tokens": "トークン",
"header_cost": "コスト",

View File

@@ -376,37 +376,5 @@
"install_error_session_lost": "설치 세션이 만료되었거나 유실되었어요. 다시 스캔해 처음부터 진행하세요.",
"install_error_forbidden": "이 워크스페이스에 Lark 봇을 설치할 권한이 더 이상 없어요. 워크스페이스 관리자에게 문의하세요.",
"install_error_generic": "설치에 실패했어요. 다시 시도하세요."
},
"composio": {
"section_title": "Composio",
"page_description": "Browse the full Composio toolkit catalog and connect the apps your agents can act on. Only toolkits with a configured auth config can be connected right now.",
"not_enabled_title": "Composio integration not enabled",
"not_enabled_description_prefix": "Set",
"not_enabled_description_suffix": "on the server to enable Composio toolkit connections.",
"loading": "Loading toolkits…",
"load_failed": "Failed to load Composio toolkits.",
"empty_title": "No toolkits available",
"empty_description": "Composio returned no toolkits. Check the API key and project configuration.",
"search_placeholder": "Search toolkits…",
"connect": "Connect",
"connecting": "Connecting…",
"connected": "Connected",
"disconnect": "Disconnect",
"disconnecting": "Disconnecting…",
"not_connectable": "Not configured",
"not_connectable_hint": "This toolkit has no auth config in your Composio project yet, so it can't be connected. Add an auth config for it in the Composio dashboard to enable connecting.",
"connect_failed": "Couldn't start the connection. Please try again.",
"disconnect_failed": "Couldn't disconnect. Please try again.",
"toast_disconnected": "Disconnected",
"disconnect_confirm_title": "Disconnect this app?",
"disconnect_confirm_description": "Your connected account will be revoked at Composio and your agents will lose access to this toolkit. You can reconnect later.",
"disconnect_confirm_cancel": "Cancel",
"connections_load_failed": "Couldn't load your existing connections, so connected status may be incomplete.",
"toast_connected": "연결되었습니다",
"toast_connect_failed": "연결을 완료하지 못했습니다. 다시 시도해 주세요.",
"last_used": "마지막 사용 {{when}}",
"last_used_never": "사용한 적 없음",
"expired": "토큰 만료됨",
"reconnect": "다시 연결"
}
}

View File

@@ -41,6 +41,8 @@
"leaderboard": {
"title": "리더보드",
"caption": "에이전트 {{count}}개",
"caption_with_deleted": "에이전트 {{count}}개 · 삭제됨 {{deleted}}개",
"deleted_agents": "삭제된 에이전트",
"header_agent": "에이전트",
"header_tokens": "토큰",
"header_cost": "비용",

View File

@@ -300,38 +300,6 @@
"install_error_forbidden": "你已没有在此工作区安装飞书 Bot 的权限,请联系工作区管理员。",
"install_error_generic": "安装失败,请重试。"
},
"composio": {
"section_title": "Composio",
"page_description": "浏览 Composio 的全部 toolkit连接你的 agent 可以操作的应用。目前只有已配置 auth config 的 toolkit 可以连接。",
"not_enabled_title": "Composio 集成未启用",
"not_enabled_description_prefix": "在服务器上设置",
"not_enabled_description_suffix": "以启用 Composio toolkit 连接。",
"loading": "正在加载 toolkit…",
"load_failed": "加载 Composio toolkit 失败。",
"empty_title": "没有可用的 toolkit",
"empty_description": "Composio 未返回任何 toolkit。请检查 API key 和项目配置。",
"search_placeholder": "搜索 toolkit…",
"connect": "连接",
"connecting": "连接中…",
"connected": "已连接",
"disconnect": "断开",
"disconnecting": "断开中…",
"not_connectable": "未配置",
"not_connectable_hint": "该 toolkit 在你的 Composio 项目中还没有 auth config因此无法连接。请在 Composio 后台为它添加一个 auth config 后再连接。",
"connect_failed": "无法发起连接,请重试。",
"disconnect_failed": "无法断开连接,请重试。",
"toast_disconnected": "已断开",
"disconnect_confirm_title": "断开这个应用?",
"disconnect_confirm_description": "你的已连接账号将在 Composio 侧被撤销,你的 agent 将失去对该 toolkit 的访问权限。你可以稍后重新连接。",
"disconnect_confirm_cancel": "取消",
"connections_load_failed": "无法加载你已有的连接,连接状态可能不完整。",
"toast_connected": "已连接",
"toast_connect_failed": "连接未能完成,请重试。",
"last_used": "最近使用 {{when}}",
"last_used_never": "从未使用",
"expired": "令牌已过期",
"reconnect": "重新连接"
},
"repositories": {
"section_title": "代码仓库",
"description": "与该工作区关联的 Git 仓库。智能体会从这里 clone 代码并完成工作。",

View File

@@ -41,6 +41,8 @@
"leaderboard": {
"title": "排行榜",
"caption": "{{count}} 个智能体",
"caption_with_deleted": "{{count}} 个智能体 · {{deleted}} 个已删除",
"deleted_agents": "已删除的智能体",
"header_agent": "智能体",
"header_tokens": "Token",
"header_cost": "费用",

View File

@@ -1,193 +0,0 @@
import { describe, it, expect, beforeEach, vi } from "vitest";
import { StrictMode } from "react";
import { render, screen, waitFor } from "@testing-library/react";
import { I18nProvider } from "@multica/core/i18n/react";
import enCommon from "../../locales/en/common.json";
import enSettings from "../../locales/en/settings.json";
import type { ComposioConnection, ComposioToolkit } from "@multica/core/types";
// --- Mutable refs the mocked hooks read from, so each test can shape the data
// without re-mocking the modules. ---
const toolkitsRef = vi.hoisted(() => ({
current: { data: [] as ComposioToolkit[], isLoading: false, isError: false },
}));
const connectionsRef = vi.hoisted(() => ({
current: { data: [] as ComposioConnection[], isError: false },
}));
const searchParamsRef = vi.hoisted(() => ({ current: new URLSearchParams("tab=integrations") }));
const mockInvalidate = vi.hoisted(() => vi.fn());
const mockReplace = vi.hoisted(() => vi.fn());
const mockToastSuccess = vi.hoisted(() => vi.fn());
const mockToastError = vi.hoisted(() => vi.fn());
vi.mock("@tanstack/react-query", () => ({
useQuery: (opts: { queryKey: unknown[] }) => {
const key = JSON.stringify(opts.queryKey);
if (key.includes("toolkits")) return toolkitsRef.current;
if (key.includes("connections")) return connectionsRef.current;
return { data: undefined };
},
useQueryClient: () => ({ invalidateQueries: mockInvalidate }),
queryOptions: <T,>(opts: T) => opts,
}));
vi.mock("@multica/core/composio", () => ({
composioKeys: {
all: ["composio"],
toolkits: () => ["composio", "toolkits"],
connections: () => ["composio", "connections"],
},
composioToolkitsOptions: () => ({ queryKey: ["composio", "toolkits"], queryFn: vi.fn() }),
composioConnectionsOptions: () => ({ queryKey: ["composio", "connections"], queryFn: vi.fn() }),
}));
vi.mock("@multica/core/api", () => ({
api: {
beginComposioConnect: vi.fn(),
deleteComposioConnection: vi.fn(),
},
}));
vi.mock("../../navigation", () => ({
useNavigation: () => ({
push: vi.fn(),
replace: mockReplace,
back: vi.fn(),
pathname: "/acme/settings",
searchParams: searchParamsRef.current,
getShareableUrl: (p: string) => `https://app.example${p}`,
}),
}));
vi.mock("sonner", () => ({
toast: { success: mockToastSuccess, error: mockToastError },
}));
import { ComposioTab } from "./composio-tab";
function renderTab() {
return render(
<I18nProvider locale="en" resources={{ en: { common: enCommon, settings: enSettings } }}>
<ComposioTab />
</I18nProvider>,
);
}
// StrictMode reproduces React's dev-mode mount → cleanup → mount double-invoke,
// which is exactly what would double-fire the callback toast without the
// consumed-key ref guard.
function renderTabStrict() {
return render(
<StrictMode>
<I18nProvider locale="en" resources={{ en: { common: enCommon, settings: enSettings } }}>
<ComposioTab />
</I18nProvider>
</StrictMode>,
);
}
const NOTION: ComposioToolkit = {
slug: "notion",
name: "Notion",
connectable: true,
};
beforeEach(() => {
vi.clearAllMocks();
toolkitsRef.current = { data: [NOTION], isLoading: false, isError: false };
connectionsRef.current = { data: [], isError: false };
searchParamsRef.current = new URLSearchParams("tab=integrations");
});
describe("ComposioTab", () => {
it("renders a connected card with a 'never used' placeholder when last_used_at is null", () => {
connectionsRef.current = {
data: [
{
id: "conn-1",
toolkit_slug: "notion",
status: "active",
connected_at: "2026-06-01T00:00:00Z",
last_used_at: null,
},
],
isError: false,
};
renderTab();
expect(screen.getByText(enSettings.composio.connected)).toBeInTheDocument();
expect(screen.getByText(enSettings.composio.last_used_never)).toBeInTheDocument();
});
it("renders a 'Last used' line when last_used_at is present", () => {
connectionsRef.current = {
data: [
{
id: "conn-1",
toolkit_slug: "notion",
status: "active",
connected_at: "2026-06-01T00:00:00Z",
last_used_at: new Date(Date.now() - 2 * 60 * 1000).toISOString(),
},
],
isError: false,
};
renderTab();
// "Last used {{when}}" → relative time formatter yields "2m ago"
expect(screen.getByText(/Last used/)).toBeInTheDocument();
expect(screen.queryByText(enSettings.composio.last_used_never)).not.toBeInTheDocument();
});
it("renders the expired branch with a Reconnect button", () => {
connectionsRef.current = {
data: [
{
id: "conn-1",
toolkit_slug: "notion",
status: "expired",
connected_at: "2026-06-01T00:00:00Z",
last_used_at: null,
},
],
isError: false,
};
renderTab();
expect(screen.getByText(enSettings.composio.expired)).toBeInTheDocument();
expect(
screen.getByRole("button", { name: new RegExp(enSettings.composio.reconnect) }),
).toBeInTheDocument();
// Not treated as connected, so no Connected badge.
expect(screen.queryByText(enSettings.composio.connected)).not.toBeInTheDocument();
});
it("toasts success and clears the ?connected param on a successful callback", async () => {
searchParamsRef.current = new URLSearchParams("tab=integrations&connected=notion");
renderTab();
await waitFor(() => {
expect(mockToastSuccess).toHaveBeenCalledWith(enSettings.composio.toast_connected);
});
expect(mockInvalidate).toHaveBeenCalledWith({ queryKey: ["composio", "connections"] });
// The one-shot param is stripped while ?tab is preserved.
expect(mockReplace).toHaveBeenCalledWith("/acme/settings?tab=integrations");
});
it("toasts error on a failed callback", async () => {
searchParamsRef.current = new URLSearchParams("tab=integrations&error=composio_connect_failed");
renderTab();
await waitFor(() => {
expect(mockToastError).toHaveBeenCalledWith(enSettings.composio.toast_connect_failed);
});
expect(mockReplace).toHaveBeenCalledWith("/acme/settings?tab=integrations");
});
it("fires the success callback exactly once under StrictMode double-invoke", async () => {
searchParamsRef.current = new URLSearchParams("tab=integrations&connected=notion");
renderTabStrict();
await waitFor(() => {
expect(mockToastSuccess).toHaveBeenCalled();
});
// The consumed-key ref must suppress the second (cleanup → re-mount) run.
expect(mockToastSuccess).toHaveBeenCalledTimes(1);
expect(mockInvalidate).toHaveBeenCalledTimes(1);
});
});

View File

@@ -1,378 +0,0 @@
"use client";
import { useEffect, useMemo, useRef, useState } from "react";
import { useQuery, useQueryClient } from "@tanstack/react-query";
import { toast } from "sonner";
import { AlertTriangle, Check, Loader2, Plug, RefreshCw, Trash2 } from "lucide-react";
import { Button } from "@multica/ui/components/ui/button";
import { Card, CardContent } from "@multica/ui/components/ui/card";
import { Input } from "@multica/ui/components/ui/input";
import {
AlertDialog,
AlertDialogAction,
AlertDialogCancel,
AlertDialogContent,
AlertDialogDescription,
AlertDialogFooter,
AlertDialogHeader,
AlertDialogTitle,
} from "@multica/ui/components/ui/alert-dialog";
import { api } from "@multica/core/api";
import {
composioConnectionsOptions,
composioKeys,
composioToolkitsOptions,
} from "@multica/core/composio";
import type { ComposioToolkit } from "@multica/core/types";
import { useT, useTimeAgo } from "../../i18n";
import { useNavigation } from "../../navigation";
// ComposioTab renders the full Composio toolkit catalog and lets the user
// connect / disconnect the apps their agents can act on.
//
// Key UX rule (MUL-3720): listing ≠ connectable. Only toolkits with an enabled
// auth config in the Composio project carry `connectable: true`; the rest get a
// muted "not configured" hint instead of a dead Connect button that would 400.
export function ComposioTab() {
const { t } = useT("settings");
const qc = useQueryClient();
const navigation = useNavigation();
const toolkitsQuery = useQuery(composioToolkitsOptions());
const connectionsQuery = useQuery(composioConnectionsOptions());
const [query, setQuery] = useState("");
const [connectingSlug, setConnectingSlug] = useState<string | null>(null);
const [disconnectTarget, setDisconnectTarget] = useState<{
connectionId: string;
name: string;
} | null>(null);
const [disconnecting, setDisconnecting] = useState(false);
// The hosted Composio consent flow is a full-page redirect that lands back
// on the settings page carrying either `?connected=<slug>` (success) or
// `?error=composio_connect_failed` (any backend-side failure — see
// Service.CallbackRedirect, MUL-3720). Consume it exactly once: fire a toast,
// refresh the connections list so the freshly-linked card flips to Connected
// without a manual reload, then strip the one-shot params via `replace` so a
// browser refresh doesn't re-toast.
const connectedParam = navigation.searchParams.get("connected");
const errorParam = navigation.searchParams.get("error");
// React Strict Mode (dev / Next) double-invokes mount effects as
// mount → cleanup → mount. On the second invoke the `replace` from the first
// hasn't committed yet, so the closure still sees the same params and would
// toast + invalidate twice. Guard with a ref keyed on the callback we already
// consumed; a genuinely new callback (different slug, or the redirect being a
// full page load that resets this ref) still fires.
const consumedCallbackKey = useRef<string | null>(null);
useEffect(() => {
const callbackKey = connectedParam
? `connected:${connectedParam}`
: errorParam === "composio_connect_failed"
? "error:composio_connect_failed"
: null;
if (!callbackKey) return;
if (consumedCallbackKey.current === callbackKey) return;
consumedCallbackKey.current = callbackKey;
if (connectedParam) {
toast.success(t(($) => $.composio.toast_connected));
void qc.invalidateQueries({ queryKey: composioKeys.connections() });
} else {
toast.error(t(($) => $.composio.toast_connect_failed));
}
// Drop only the Composio one-shot params; keep everything else (notably
// ?tab=integrations) so the user stays on this tab.
const params = new URLSearchParams(navigation.searchParams);
params.delete("connected");
params.delete("error");
const qs = params.toString();
navigation.replace(qs ? `${navigation.pathname}?${qs}` : navigation.pathname);
// eslint-disable-next-line react-hooks/exhaustive-deps
}, [connectedParam, errorParam]);
// Map active connections by toolkit slug so each card knows whether it is
// already connected (and which connection id to disconnect).
const connectionBySlug = useMemo(() => {
const m = new Map<string, string>();
for (const c of connectionsQuery.data ?? []) {
if (c.status === "active") m.set(c.toolkit_slug, c.id);
}
return m;
}, [connectionsQuery.data]);
// Toolkits whose latest connection is expired render a Reconnect affordance
// instead of Connected/Connect. Backend only emits `expired` once Stage 4
// (MUL-3719) lands, but the branch is wired up now so it lights up for free.
const expiredBySlug = useMemo(() => {
const m = new Set<string>();
for (const c of connectionsQuery.data ?? []) {
if (c.status === "expired") m.add(c.toolkit_slug);
}
return m;
}, [connectionsQuery.data]);
// Last-used timestamp per active connection, for the "Last used …" line on a
// connected card. Backend leaves this null until tool-call dispatch starts
// stamping it (Stage 3, MUL-3721); the card shows a "never used" placeholder
// until then.
const lastUsedBySlug = useMemo(() => {
const m = new Map<string, string | null>();
for (const c of connectionsQuery.data ?? []) {
if (c.status === "active") m.set(c.toolkit_slug, c.last_used_at ?? null);
}
return m;
}, [connectionsQuery.data]);
const toolkits = useMemo(() => toolkitsQuery.data ?? [], [toolkitsQuery.data]);
const filtered = useMemo(() => {
const q = query.trim().toLowerCase();
if (!q) return toolkits;
return toolkits.filter(
(tk) =>
tk.name.toLowerCase().includes(q) ||
tk.slug.toLowerCase().includes(q) ||
(tk.category ?? "").toLowerCase().includes(q),
);
}, [toolkits, query]);
// 503 handling lives in the parent IntegrationsTab, which hides the whole
// Composio section when COMPOSIO_API_KEY is unset — this component only
// mounts when the integration is configured, so it deals with the loaded /
// error / empty / list states below.
async function handleConnect(tk: ComposioToolkit) {
if (connectingSlug) return;
setConnectingSlug(tk.slug);
try {
const { redirect_url } = await api.beginComposioConnect(tk.slug);
// Hand the browser to Composio's hosted consent flow; it redirects back
// to /api/integrations/composio/callback when done.
window.location.href = redirect_url;
} catch (e) {
toast.error(e instanceof Error ? e.message : t(($) => $.composio.connect_failed));
setConnectingSlug(null);
}
}
async function handleDisconnect() {
if (!disconnectTarget || disconnecting) return;
setDisconnecting(true);
try {
await api.deleteComposioConnection(disconnectTarget.connectionId);
await qc.invalidateQueries({ queryKey: composioKeys.connections() });
toast.success(t(($) => $.composio.toast_disconnected));
setDisconnectTarget(null);
} catch (e) {
toast.error(e instanceof Error ? e.message : t(($) => $.composio.disconnect_failed));
} finally {
setDisconnecting(false);
}
}
return (
<div className="space-y-6">
<section className="space-y-1">
<p className="text-sm text-muted-foreground">{t(($) => $.composio.page_description)}</p>
</section>
{toolkitsQuery.isLoading ? (
<Card>
<CardContent>
<p className="text-sm text-muted-foreground">{t(($) => $.composio.loading)}</p>
</CardContent>
</Card>
) : toolkitsQuery.isError ? (
<Card>
<CardContent>
<p className="text-sm text-destructive">{t(($) => $.composio.load_failed)}</p>
</CardContent>
</Card>
) : toolkits.length === 0 ? (
<Card>
<CardContent className="space-y-2">
<p className="text-sm font-medium">{t(($) => $.composio.empty_title)}</p>
<p className="text-xs text-muted-foreground">{t(($) => $.composio.empty_description)}</p>
</CardContent>
</Card>
) : (
<section className="space-y-3">
<Input
value={query}
onChange={(e) => setQuery(e.target.value)}
placeholder={t(($) => $.composio.search_placeholder)}
className="max-w-xs"
/>
{connectionsQuery.isError && (
// Don't silently treat a failed connections fetch as "nothing
// connected" — that would hide real connections and offer Connect
// on something already linked. Surface it so the user knows the
// connected state may be incomplete; the catalog still renders.
<p className="text-xs text-destructive">
{t(($) => $.composio.connections_load_failed)}
</p>
)}
<div className="grid grid-cols-1 gap-2 sm:grid-cols-2 lg:grid-cols-3">
{filtered.map((tk) => (
<ToolkitCard
key={tk.slug}
toolkit={tk}
connectionId={connectionBySlug.get(tk.slug)}
expired={expiredBySlug.has(tk.slug)}
lastUsedAt={lastUsedBySlug.get(tk.slug) ?? null}
connecting={connectingSlug === tk.slug}
anyConnecting={connectingSlug !== null}
onConnect={() => handleConnect(tk)}
onDisconnect={(connectionId, name) =>
setDisconnectTarget({ connectionId, name })
}
/>
))}
</div>
</section>
)}
<AlertDialog
open={!!disconnectTarget}
onOpenChange={(v) => {
if (!v && !disconnecting) setDisconnectTarget(null);
}}
>
<AlertDialogContent>
<AlertDialogHeader>
<AlertDialogTitle>{t(($) => $.composio.disconnect_confirm_title)}</AlertDialogTitle>
<AlertDialogDescription>
{t(($) => $.composio.disconnect_confirm_description)}
</AlertDialogDescription>
</AlertDialogHeader>
<AlertDialogFooter>
<AlertDialogCancel disabled={disconnecting}>
{t(($) => $.composio.disconnect_confirm_cancel)}
</AlertDialogCancel>
<AlertDialogAction onClick={handleDisconnect} disabled={disconnecting}>
{disconnecting
? t(($) => $.composio.disconnecting)
: t(($) => $.composio.disconnect)}
</AlertDialogAction>
</AlertDialogFooter>
</AlertDialogContent>
</AlertDialog>
</div>
);
}
function ToolkitCard({
toolkit,
connectionId,
expired,
lastUsedAt,
connecting,
anyConnecting,
onConnect,
onDisconnect,
}: {
toolkit: ComposioToolkit;
connectionId?: string;
expired: boolean;
lastUsedAt: string | null;
connecting: boolean;
anyConnecting: boolean;
onConnect: () => void;
onDisconnect: (connectionId: string, name: string) => void;
}) {
const { t } = useT("settings");
const timeAgo = useTimeAgo();
const isConnected = !!connectionId;
return (
<Card>
<CardContent className="flex items-center gap-3 p-3">
<ToolkitLogo toolkit={toolkit} />
<div className="min-w-0 flex-1">
<p className="truncate text-sm font-medium">{toolkit.name || toolkit.slug}</p>
{isConnected ? (
// Last-used line. Backend leaves last_used_at null until Stage 3
// dispatch stamps it, so show a localized "never used" placeholder
// rather than hiding the line entirely.
<p className="truncate text-[10px] text-muted-foreground">
{lastUsedAt
? t(($) => $.composio.last_used, { when: timeAgo(lastUsedAt) })
: t(($) => $.composio.last_used_never)}
</p>
) : toolkit.category ? (
<p className="truncate text-[10px] uppercase tracking-wide text-muted-foreground">
{toolkit.category}
</p>
) : null}
</div>
{isConnected ? (
<div className="flex items-center gap-2">
<span className="inline-flex items-center gap-1 text-xs text-emerald-600">
<Check className="h-3 w-3" />
{t(($) => $.composio.connected)}
</span>
<Button
variant="outline"
size="sm"
onClick={() => onDisconnect(connectionId!, toolkit.name || toolkit.slug)}
aria-label={t(($) => $.composio.disconnect)}
>
<Trash2 className="h-3 w-3" />
</Button>
</div>
) : expired ? (
// Token-expired connection: surface the failure and let the user
// re-run the same connect flow in one click (no disconnect step).
<div className="flex items-center gap-2">
<span className="inline-flex items-center gap-1 text-xs text-amber-600">
<AlertTriangle className="h-3 w-3" />
{t(($) => $.composio.expired)}
</span>
<Button size="sm" variant="outline" onClick={onConnect} disabled={anyConnecting}>
{connecting ? (
<Loader2 className="h-3 w-3 animate-spin" />
) : (
<RefreshCw className="h-3 w-3" />
)}
{connecting ? t(($) => $.composio.connecting) : t(($) => $.composio.reconnect)}
</Button>
</div>
) : toolkit.connectable ? (
<Button size="sm" onClick={onConnect} disabled={anyConnecting}>
{connecting ? (
<Loader2 className="h-3 w-3 animate-spin" />
) : (
<Plug className="h-3 w-3" />
)}
{connecting ? t(($) => $.composio.connecting) : t(($) => $.composio.connect)}
</Button>
) : (
<span
className="shrink-0 rounded bg-muted px-1.5 py-0.5 text-[10px] text-muted-foreground"
title={t(($) => $.composio.not_connectable_hint)}
>
{t(($) => $.composio.not_connectable)}
</span>
)}
</CardContent>
</Card>
);
}
function ToolkitLogo({ toolkit }: { toolkit: ComposioToolkit }) {
const initial = (toolkit.name || toolkit.slug).charAt(0).toUpperCase();
if (toolkit.logo) {
return (
<img
src={toolkit.logo}
alt=""
className="h-8 w-8 shrink-0 rounded bg-muted object-contain"
/>
);
}
return (
<div className="flex h-8 w-8 shrink-0 items-center justify-center rounded bg-muted text-xs font-semibold text-muted-foreground">
{initial}
</div>
);
}

View File

@@ -1,42 +1,22 @@
"use client";
import { useQuery } from "@tanstack/react-query";
import { LarkTab } from "./lark-tab";
import { ComposioTab } from "./composio-tab";
import { ApiError } from "@multica/core/api";
import { composioToolkitsOptions } from "@multica/core/composio";
import { useT } from "../../i18n";
// Integrations is the umbrella tab for third-party platform connections.
// GitHub has its own top-level tab (see github-tab.tsx); everything else
// — Lark, Composio, with Slack/Linear etc. to follow — lives in here under
// its own section heading so additional integrations slot in without changing
// the IA. IntegrationsTab is just the host; each integration owns its own
// description and install flow.
// — currently just Lark, with Slack/Linear etc. to follow — lives in
// here under its own section heading so additional integrations slot in
// without changing the IA. IntegrationsTab is just the host; each
// integration owns its own description and install flow.
export function IntegrationsTab() {
const { t } = useT("settings");
// Composio is hidden entirely until a key is configured server-side. A 503
// from the toolkits endpoint means COMPOSIO_API_KEY is unset; rather than
// render a card that leaks an internal env-var name to every end user, the
// whole section (heading + body) is withheld. Admin-only "set this up"
// guidance is a later, role-gated affordance (MUL-3720 discussion).
const composioToolkits = useQuery(composioToolkitsOptions());
const composioUnconfigured =
composioToolkits.error instanceof ApiError && composioToolkits.error.status === 503;
return (
<div className="space-y-10">
<section className="space-y-4">
<h2 className="text-sm font-semibold">{t(($) => $.lark.section_title)}</h2>
<LarkTab />
</section>
{!composioUnconfigured && (
<section className="space-y-4">
<h2 className="text-sm font-semibold">{t(($) => $.composio.section_title)}</h2>
<ComposioTab />
</section>
)}
</div>
);
}

View File

@@ -1,68 +0,0 @@
package main
import (
"io"
"net/http"
"testing"
)
// TestComposioCallbackIsPublic_NoCookieNot401 locks in the MUL-3843 fix: the
// Composio OAuth callback must live OUTSIDE the Auth middleware group, because
// Composio 302-redirects the user's browser to it and the cookie session is
// frequently absent (expired session, SameSite=Strict / Safari ITP, private
// window, self-hosted callback subdomain). Before the fix the route sat under
// Auth, so a cookie-less browser got a hard 401 and a JSON blob instead of the
// settings redirect — the exact symptom Yushen hit.
//
// With no COMPOSIO_API_KEY configured in the test env, h.Composio == nil, so a
// cookie-less hit on the callback now reaches the handler and returns 503
// ("not configured") rather than being short-circuited to 401 by the Auth
// middleware. The precise non-401 code is incidental; what this test pins is
// that the request is NOT rejected by auth.
func TestComposioCallbackIsPublic_NoCookieNot401(t *testing.T) {
// Deliberately send NO Authorization header / cookie — simulate the
// cookie-stripped browser redirect coming back from Composio.
resp, err := http.Get(testServer.URL + "/api/integrations/composio/callback?state=bogus&status=success&connected_account_id=ca_x")
if err != nil {
t.Fatalf("callback request failed: %v", err)
}
defer resp.Body.Close()
body, _ := io.ReadAll(resp.Body)
if resp.StatusCode == http.StatusUnauthorized {
t.Fatalf("callback returned 401 without a session — it is still behind the Auth group (regression of MUL-3843). body=%s", body)
}
}
// TestComposioNonCallbackEndpointsStayGated is the other half of the invariant:
// moving the callback out of the Auth group must NOT loosen the four
// session-scoped endpoints. A cookie-less request to them must still 401.
func TestComposioNonCallbackEndpointsStayGated(t *testing.T) {
gated := []struct {
method string
path string
}{
{http.MethodPost, "/api/integrations/composio/connect/init"},
{http.MethodGet, "/api/integrations/composio/toolkits"},
{http.MethodGet, "/api/integrations/composio/connections"},
{http.MethodDelete, "/api/integrations/composio/connections/11111111-1111-1111-1111-111111111111"},
}
for _, tc := range gated {
t.Run(tc.method+" "+tc.path, func(t *testing.T) {
req, err := http.NewRequest(tc.method, testServer.URL+tc.path, nil)
if err != nil {
t.Fatalf("build request: %v", err)
}
resp, err := http.DefaultClient.Do(req)
if err != nil {
t.Fatalf("request failed: %v", err)
}
defer resp.Body.Close()
io.Copy(io.Discard, resp.Body)
if resp.StatusCode != http.StatusUnauthorized {
t.Fatalf("expected 401 without a session, got %d — endpoint is no longer auth-gated", resp.StatusCode)
}
})
}
}

View File

@@ -2,7 +2,6 @@ package main
import (
"context"
"crypto/sha256"
"log/slog"
"net/http"
"net/netip"
@@ -26,7 +25,6 @@ import (
"github.com/multica-ai/multica/server/internal/handler"
"github.com/multica-ai/multica/server/internal/integrations/channel"
"github.com/multica-ai/multica/server/internal/integrations/channel/engine"
composiointeg "github.com/multica-ai/multica/server/internal/integrations/composio"
"github.com/multica-ai/multica/server/internal/integrations/lark"
"github.com/multica-ai/multica/server/internal/integrations/slack"
obsmetrics "github.com/multica-ai/multica/server/internal/metrics"
@@ -36,7 +34,6 @@ import (
"github.com/multica-ai/multica/server/internal/storage"
"github.com/multica-ai/multica/server/internal/util"
"github.com/multica-ai/multica/server/internal/util/secretbox"
composiosdk "github.com/multica-ai/multica/server/pkg/composio"
db "github.com/multica-ai/multica/server/pkg/db/generated"
"github.com/multica-ai/multica/server/pkg/featureflag"
)
@@ -438,47 +435,6 @@ func NewRouterWithOptions(pool *pgxpool.Pool, hub *realtime.Hub, bus *events.Bus
slog.Info("slack integration disabled (MULTICA_SLACK_SECRET_KEY not set)")
}
// Composio integration (MUL-3720). Gated by COMPOSIO_API_KEY — the
// project-scoped key the standalone SDK authenticates Composio with (sent
// as x-api-key; the project is resolved from the key, so NO project id is
// configured). When unset the whole block is skipped and the composio HTTP
// handlers return 503; existing deployments are unaffected. An operator opts
// in by setting COMPOSIO_API_KEY plus a callback base
// (COMPOSIO_CALLBACK_BASE_URL, falling back to MULTICA_PUBLIC_URL). The
// toolkit→auth-config mapping is NOT configured here — it is resolved
// dynamically from the project's /auth_configs at request time, so enabling
// a toolkit is a dashboard action, not a redeploy. State signing uses
// COMPOSIO_STATE_SECRET, or a key derived from JWT_SECRET when that is unset.
if composioAPIKey := strings.TrimSpace(os.Getenv("COMPOSIO_API_KEY")); composioAPIKey != "" {
sdkClient, err := composiosdk.NewClient(composiosdk.Options{APIKey: composioAPIKey})
if err != nil {
slog.Error("composio: SDK client init failed; composio integration disabled", "error", err)
} else {
stateSecret := composioStateSecret()
callbackBase := composioCallbackBaseURL(signupConfig.PublicURL)
switch {
case len(stateSecret) == 0:
slog.Error("composio: no state secret (set COMPOSIO_STATE_SECRET or JWT_SECRET); composio integration disabled")
case callbackBase == "":
slog.Error("composio: no callback base url (set COMPOSIO_CALLBACK_BASE_URL or MULTICA_PUBLIC_URL); composio integration disabled")
default:
svc, serr := composiointeg.NewService(sdkClient, queries, composiointeg.Config{
StateSecret: stateSecret,
CallbackBaseURL: callbackBase,
FrontendBaseURL: appURLFromEnv(),
})
if serr != nil {
slog.Error("composio: service init failed; composio integration disabled", "error", serr)
} else {
h.Composio = svc
slog.Info("composio integration enabled")
}
}
}
} else {
slog.Info("composio integration disabled (COMPOSIO_API_KEY not set)")
}
if opts.HeartbeatScheduler != nil {
h.HeartbeatScheduler = opts.HeartbeatScheduler
}
@@ -612,18 +568,6 @@ func NewRouterWithOptions(pool *pgxpool.Pool, hub *realtime.Hub, bus *events.Bus
// HandleCloudBillingStripeWebhook for the rationale).
r.Post("/api/webhooks/stripe", h.HandleCloudBillingStripeWebhook)
// Composio OAuth callback (MUL-3843). NOT under the Auth group on purpose:
// Composio 302-redirects the user's browser here at the end of the OAuth
// flow, and the cookie session is frequently absent (expired session,
// SameSite=Strict / Safari ITP stripping cross-site cookies, private
// windows, self-hosted callbacks on a different subdomain). Identity is NOT
// taken from the session — it comes from the HMAC-signed `state` query
// param, which CompleteCallback verifies (signature, expiry, replay) before
// doing anything. h.Composio == nil still returns 503. Keeping it inside the
// Auth group made a missing cookie a hard 401, breaking the flow for exactly
// the browsers above; the other four composio endpoints stay session-gated.
r.Get("/api/integrations/composio/callback", h.ComposioCallback)
// Daemon API routes (require daemon token or valid user token)
r.Route("/api/daemon", func(r chi.Router) {
r.Use(middleware.DaemonAuth(queries, patCache, daemonTokenCache, cloudPATVerifier))
@@ -781,18 +725,6 @@ func NewRouterWithOptions(pool *pgxpool.Pool, hub *realtime.Hub, bus *events.Bus
// is combined with the logged-in user to create the mapping.
r.Post("/api/lark/binding/redeem", h.RedeemLarkBindingToken)
// Composio integration (MUL-3720). User-scoped (no workspace context):
// a connection belongs to a user. These four require a logged-in
// session; the OAuth callback is the outlier and lives outside the Auth
// group (registered above with the other public OAuth/webhook routes —
// see MUL-3843). All return 503 when COMPOSIO_API_KEY is unset.
r.Route("/api/integrations/composio", func(r chi.Router) {
r.Post("/connect/init", h.ComposioConnectInit)
r.Get("/toolkits", h.ListComposioToolkits)
r.Get("/connections", h.ListComposioConnections)
r.Delete("/connections/{id}", h.DeleteComposioConnection)
})
// User-scoped invitation routes (no workspace context required)
r.Get("/api/invitations", h.ListMyInvitations)
r.Get("/api/invitations/{id}", h.GetMyInvitation)
@@ -1315,31 +1247,3 @@ func cloudRuntimeFleetURLFromEnv() string {
}
return strings.TrimSpace(os.Getenv("MULTICA_FLEET_URL"))
}
// composioStateSecret resolves the HMAC key for the connect-state. Prefers an
// explicit COMPOSIO_STATE_SECRET; otherwise derives a composio-specific key
// from JWT_SECRET via SHA-256 so the two signing domains never share an
// identical key. Returns nil when neither is set (composio stays disabled).
func composioStateSecret() []byte {
if v := strings.TrimSpace(os.Getenv("COMPOSIO_STATE_SECRET")); v != "" {
return []byte(v)
}
if v := strings.TrimSpace(os.Getenv("JWT_SECRET")); v != "" {
sum := sha256.Sum256([]byte("composio-state:" + v))
return sum[:]
}
return nil
}
// composioCallbackBaseURL resolves the public API base used to build the
// Composio callback URL. Prefers COMPOSIO_CALLBACK_BASE_URL, then the
// already-resolved MULTICA_PUBLIC_URL, then the app URL.
func composioCallbackBaseURL(publicURL string) string {
if v := strings.TrimRight(strings.TrimSpace(os.Getenv("COMPOSIO_CALLBACK_BASE_URL")), "/"); v != "" {
return v
}
if publicURL != "" {
return publicURL
}
return appURLFromEnv()
}

View File

@@ -22,7 +22,6 @@ import (
"github.com/multica-ai/multica/server/internal/events"
"github.com/multica-ai/multica/server/internal/featureflagdispatch"
"github.com/multica-ai/multica/server/internal/integrations/channel/engine"
composio "github.com/multica-ai/multica/server/internal/integrations/composio"
"github.com/multica-ai/multica/server/internal/integrations/lark"
obsmetrics "github.com/multica-ai/multica/server/internal/metrics"
"github.com/multica-ai/multica/server/internal/middleware"
@@ -158,10 +157,6 @@ type Handler struct {
// UI consults IsConfigured() to decide whether to surface install
// entry points.
LarkAPIClient lark.APIClient
// Composio integration (MUL-3720). Nil when COMPOSIO_API_KEY is unset;
// the composio HTTP handlers return 503 in that case. Wired in
// cmd/server/router.go after handler.New.
Composio *composio.Service
// ChannelSupervisor owns the per-installation supervisor goroutines
// that hold the §4.4 WS lease and drive each channel.Channel
// (MUL-3620 generalized the Feishu-only Hub into this channel-agnostic

View File

@@ -1,219 +0,0 @@
package handler
import (
"encoding/json"
"errors"
"net/http"
"strings"
"github.com/go-chi/chi/v5"
composio "github.com/multica-ai/multica/server/internal/integrations/composio"
)
// Composio integration handlers (MUL-3720, Stage 2 MVP). A Composio connection
// belongs to a user, not a workspace, so these handlers live outside the
// workspace-membership group. The four management endpoints (connect/init,
// toolkits, connections, delete) are user-scoped (requireUserID) and sit under
// the Auth middleware. ComposioCallback is the exception: it is a public route
// (outside the Auth group, see router.go / MUL-3843) because the browser often
// arrives without a session cookie — its identity comes from the signed state,
// not requireUserID. The whole block returns 503 when h.Composio is nil
// (COMPOSIO_API_KEY unset), matching the Lark/GitHub "integration not
// configured" convention.
// ComposioConnectInitRequest is the POST /connect/init body.
type ComposioConnectInitRequest struct {
ToolkitSlug string `json:"toolkit_slug"`
}
// ComposioConnectInitResponse carries the hosted Composio Connect Link the
// frontend redirects the user to.
type ComposioConnectInitResponse struct {
RedirectURL string `json:"redirect_url"`
}
// ComposioConnectionResponse is the wire shape for one connection row.
type ComposioConnectionResponse struct {
ID string `json:"id"`
ToolkitSlug string `json:"toolkit_slug"`
Status string `json:"status"`
ConnectedAt string `json:"connected_at"`
LastUsedAt *string `json:"last_used_at"`
}
// ComposioToolkitResponse is the wire shape for one toolkit in the catalog.
// connectable is the key UX signal: false means the project has no enabled
// auth config for the toolkit, so the UI must not offer a working Connect
// button (BeginConnect would 400).
type ComposioToolkitResponse struct {
Slug string `json:"slug"`
Name string `json:"name"`
Logo string `json:"logo,omitempty"`
Category string `json:"category,omitempty"`
Connectable bool `json:"connectable"`
}
// ComposioConnectInit (POST /api/integrations/composio/connect/init) starts a
// hosted Composio auth flow for the requested toolkit and returns the redirect
// URL. An unsupported toolkit slug is a 400 (the MVP only wires Notion).
func (h *Handler) ComposioConnectInit(w http.ResponseWriter, r *http.Request) {
if h.Composio == nil {
writeError(w, http.StatusServiceUnavailable, "composio integration not configured")
return
}
userID, ok := requireUserID(w, r)
if !ok {
return
}
userUUID, ok := parseUUIDOrBadRequest(w, userID, "user id")
if !ok {
return
}
var req ComposioConnectInitRequest
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
writeError(w, http.StatusBadRequest, "invalid request body")
return
}
if strings.TrimSpace(req.ToolkitSlug) == "" {
writeError(w, http.StatusBadRequest, "toolkit_slug is required")
return
}
redirectURL, err := h.Composio.BeginConnect(r.Context(), userUUID, req.ToolkitSlug)
if err != nil {
if errors.Is(err, composio.ErrToolkitNotSupported) {
writeError(w, http.StatusBadRequest, "toolkit not supported")
return
}
writeError(w, http.StatusBadGateway, "failed to start composio connect")
return
}
writeJSON(w, http.StatusOK, ComposioConnectInitResponse{RedirectURL: redirectURL})
}
// ComposioCallback (GET /api/integrations/composio/callback) is the browser
// redirect target Composio sends the user back to after the hosted flow. It is
// registered as a PUBLIC route (outside the Auth middleware group — see
// router.go / MUL-3843), because the browser frequently lands here without a
// session cookie (expired session, SameSite/ITP stripping, private window,
// self-hosted callback subdomain). Identity therefore comes solely from the
// HMAC-signed `state` query param, which CompleteCallback verifies before
// doing anything. On success the row is upserted and the browser is redirected
// to the settings page; any failure redirects to the same page with a stable
// error code so the user is never left on a blank API response.
func (h *Handler) ComposioCallback(w http.ResponseWriter, r *http.Request) {
if h.Composio == nil {
writeError(w, http.StatusServiceUnavailable, "composio integration not configured")
return
}
q := r.URL.Query()
state := q.Get("state")
status := q.Get("status")
connectedAccountID := q.Get("connected_account_id")
slug, err := h.Composio.CompleteCallback(r.Context(), state, status, connectedAccountID)
if err != nil {
// Every failure (tampered/expired state, non-success status, write
// error) collapses to the generic failure redirect — we never tell the
// browser which check failed.
http.Redirect(w, r, h.Composio.CallbackRedirect(slug, false), http.StatusFound)
return
}
http.Redirect(w, r, h.Composio.CallbackRedirect(slug, true), http.StatusFound)
}
// ListComposioConnections (GET /api/integrations/composio/connections) returns
// the caller's active connections.
func (h *Handler) ListComposioConnections(w http.ResponseWriter, r *http.Request) {
if h.Composio == nil {
writeError(w, http.StatusServiceUnavailable, "composio integration not configured")
return
}
userID, ok := requireUserID(w, r)
if !ok {
return
}
userUUID, ok := parseUUIDOrBadRequest(w, userID, "user id")
if !ok {
return
}
conns, err := h.Composio.ListConnections(r.Context(), userUUID)
if err != nil {
writeError(w, http.StatusInternalServerError, "failed to list composio connections")
return
}
out := make([]ComposioConnectionResponse, 0, len(conns))
for _, c := range conns {
out = append(out, ComposioConnectionResponse{
ID: c.ID,
ToolkitSlug: c.ToolkitSlug,
Status: c.Status,
ConnectedAt: c.ConnectedAt,
LastUsedAt: c.LastUsedAt,
})
}
writeJSON(w, http.StatusOK, out)
}
// ListComposioToolkits (GET /api/integrations/composio/toolkits) returns the
// full Composio toolkit catalog for the Settings UI to render. Each entry
// carries a `connectable` flag: only toolkits with an enabled auth config in
// the project can actually be connected, so the UI gates its Connect button on
// it. The catalog itself is project-global (not per-user), but the route is
// user-scoped (requireUser) like the rest of the block.
func (h *Handler) ListComposioToolkits(w http.ResponseWriter, r *http.Request) {
if h.Composio == nil {
writeError(w, http.StatusServiceUnavailable, "composio integration not configured")
return
}
if _, ok := requireUserID(w, r); !ok {
return
}
toolkits, err := h.Composio.ListToolkits(r.Context())
if err != nil {
writeError(w, http.StatusBadGateway, "failed to list composio toolkits")
return
}
out := make([]ComposioToolkitResponse, 0, len(toolkits))
for _, tk := range toolkits {
out = append(out, ComposioToolkitResponse{
Slug: tk.Slug,
Name: tk.Name,
Logo: tk.LogoURL,
Category: tk.Category,
Connectable: tk.Connectable,
})
}
writeJSON(w, http.StatusOK, out)
}
// DeleteComposioConnection (DELETE /api/integrations/composio/connections/{id})
// disconnects a connection the caller owns. Idempotent at the service layer;
// a connection that does not belong to the caller is a 404.
func (h *Handler) DeleteComposioConnection(w http.ResponseWriter, r *http.Request) {
if h.Composio == nil {
writeError(w, http.StatusServiceUnavailable, "composio integration not configured")
return
}
userID, ok := requireUserID(w, r)
if !ok {
return
}
userUUID, ok := parseUUIDOrBadRequest(w, userID, "user id")
if !ok {
return
}
connUUID, ok := parseUUIDOrBadRequest(w, chi.URLParam(r, "id"), "connection id")
if !ok {
return
}
if err := h.Composio.Disconnect(r.Context(), userUUID, connUUID); err != nil {
if errors.Is(err, composio.ErrConnectionNotFound) {
writeError(w, http.StatusNotFound, "composio connection not found")
return
}
writeError(w, http.StatusBadGateway, "failed to disconnect composio connection")
return
}
w.WriteHeader(http.StatusNoContent)
}

View File

@@ -1,325 +0,0 @@
package handler
import (
"context"
"encoding/json"
"net/http"
"net/http/httptest"
"strings"
"testing"
"time"
"github.com/go-chi/chi/v5"
"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgtype"
composio "github.com/multica-ai/multica/server/internal/integrations/composio"
"github.com/multica-ai/multica/server/internal/util"
sdk "github.com/multica-ai/multica/server/pkg/composio"
db "github.com/multica-ai/multica/server/pkg/db/generated"
)
const composioTestUserID = "22222222-2222-2222-2222-222222222222"
// --- local fakes (handler package can only see the exported interfaces) ---
type composioFakeSDK struct {
createLinkResp *sdk.CreateLinkResponse
revokeErr error
}
func (f *composioFakeSDK) CreateLink(_ context.Context, _ sdk.CreateLinkRequest) (*sdk.CreateLinkResponse, error) {
if f.createLinkResp != nil {
return f.createLinkResp, nil
}
return &sdk.CreateLinkResponse{RedirectURL: "https://composio.example/redirect"}, nil
}
// ListConnectedAccounts echoes the requested id as an account owned by the
// handler-test user under the notion auth config, so callback ownership
// verification passes on the happy path.
func (f *composioFakeSDK) ListConnectedAccounts(_ context.Context, req sdk.ListConnectedAccountsRequest) (*sdk.ListConnectedAccountsResponse, error) {
id := ""
if len(req.ConnectedAccountIDs) > 0 {
id = req.ConnectedAccountIDs[0]
}
return &sdk.ListConnectedAccountsResponse{Items: []sdk.ConnectedAccount{{
ID: id,
UserID: composioTestUserID,
AuthConfigID: "ac_notion",
}}}, nil
}
func (f *composioFakeSDK) RevokeConnection(_ context.Context, _ string) error { return f.revokeErr }
func (f *composioFakeSDK) DeleteConnectedAccount(_ context.Context, _ string) error { return nil }
// ListAuthConfigs reports a single enabled notion auth config so BeginConnect
// resolves notion → ac_notion and the callback's auth-config check matches.
func (f *composioFakeSDK) ListAuthConfigs(_ context.Context, _ sdk.ListAuthConfigsRequest) (*sdk.ListAuthConfigsResponse, error) {
return &sdk.ListAuthConfigsResponse{Items: []sdk.AuthConfig{{
ID: "ac_notion",
Toolkit: sdk.Toolkit{Slug: "notion"},
Status: "ENABLED",
}}}, nil
}
func (f *composioFakeSDK) ListToolkits(_ context.Context, _ sdk.ListToolkitsRequest) (*sdk.ListToolkitsResponse, error) {
return &sdk.ListToolkitsResponse{Items: []sdk.Toolkit{
{Slug: "notion", Name: "Notion"},
{Slug: "github", Name: "GitHub"},
}}, nil
}
func (f *composioFakeSDK) CreateSession(_ context.Context, _ sdk.CreateSessionRequest) (*sdk.CreateSessionResponse, error) {
return &sdk.CreateSessionResponse{}, nil
}
func (f *composioFakeSDK) MCPAuthHeaders() map[string]string {
return map[string]string{"x-api-key": "k"}
}
type composioFakeStore struct {
rows []db.UserComposioConnection
nextID byte
}
func (s *composioFakeStore) UpsertUserComposioConnection(_ context.Context, arg db.UpsertUserComposioConnectionParams) (db.UserComposioConnection, error) {
for i := range s.rows {
if s.rows[i].UserID.Bytes == arg.UserID.Bytes && s.rows[i].ConnectedAccountID == arg.ConnectedAccountID {
s.rows[i].Status = "active"
return s.rows[i], nil
}
}
s.nextID++
var b [16]byte
b[15] = s.nextID
row := db.UserComposioConnection{
ID: pgtype.UUID{Bytes: b, Valid: true},
UserID: arg.UserID,
ToolkitSlug: arg.ToolkitSlug,
AuthConfigID: arg.AuthConfigID,
ConnectedAccountID: arg.ConnectedAccountID,
ComposioUserID: arg.ComposioUserID,
Status: "active",
ConnectedAt: pgtype.Timestamptz{Time: time.Now(), Valid: true},
}
s.rows = append(s.rows, row)
return row, nil
}
func (s *composioFakeStore) ListActiveUserComposioConnections(_ context.Context, userID pgtype.UUID) ([]db.UserComposioConnection, error) {
out := []db.UserComposioConnection{}
for _, r := range s.rows {
if r.UserID.Bytes == userID.Bytes && r.Status == "active" {
out = append(out, r)
}
}
return out, nil
}
func (s *composioFakeStore) GetUserComposioConnection(_ context.Context, arg db.GetUserComposioConnectionParams) (db.UserComposioConnection, error) {
for _, r := range s.rows {
if r.ID.Bytes == arg.ID.Bytes && r.UserID.Bytes == arg.UserID.Bytes {
return r, nil
}
}
return db.UserComposioConnection{}, pgx.ErrNoRows
}
func (s *composioFakeStore) MarkUserComposioConnectionRevoked(_ context.Context, arg db.MarkUserComposioConnectionRevokedParams) error {
for i := range s.rows {
if s.rows[i].ID.Bytes == arg.ID.Bytes && s.rows[i].UserID.Bytes == arg.UserID.Bytes {
s.rows[i].Status = "revoked"
}
}
return nil
}
func newComposioTestHandler(t *testing.T, sdkFake composio.SDK, store composio.Store) *Handler {
t.Helper()
svc, err := composio.NewService(sdkFake, store, composio.Config{
StateSecret: []byte("handler-test-secret"),
CallbackBaseURL: "https://app.multica.ai",
FrontendBaseURL: "https://app.multica.ai",
})
if err != nil {
t.Fatalf("NewService: %v", err)
}
return &Handler{Composio: svc}
}
func composioReq(method, target, body string) *http.Request {
var r *http.Request
if body != "" {
r = httptest.NewRequest(method, target, strings.NewReader(body))
} else {
r = httptest.NewRequest(method, target, nil)
}
r.Header.Set("X-User-ID", composioTestUserID)
return r
}
// --- tests ---
func TestComposio_ServiceUnavailableWhenNil(t *testing.T) {
h := &Handler{}
for _, hf := range []http.HandlerFunc{
h.ComposioConnectInit, h.ComposioCallback, h.ListComposioConnections, h.DeleteComposioConnection,
} {
w := httptest.NewRecorder()
hf(w, composioReq(http.MethodGet, "/", ""))
if w.Code != http.StatusServiceUnavailable {
t.Errorf("expected 503 when Composio nil, got %d", w.Code)
}
}
}
func TestComposio_ConnectInit(t *testing.T) {
h := newComposioTestHandler(t, &composioFakeSDK{}, &composioFakeStore{})
// success
w := httptest.NewRecorder()
h.ComposioConnectInit(w, composioReq(http.MethodPost, "/", `{"toolkit_slug":"notion"}`))
if w.Code != http.StatusOK {
t.Fatalf("expected 200, got %d (%s)", w.Code, w.Body.String())
}
var resp ComposioConnectInitResponse
if err := json.Unmarshal(w.Body.Bytes(), &resp); err != nil {
t.Fatalf("decode: %v", err)
}
if resp.RedirectURL == "" {
t.Error("expected redirect_url")
}
// unsupported toolkit → 400
w = httptest.NewRecorder()
h.ComposioConnectInit(w, composioReq(http.MethodPost, "/", `{"toolkit_slug":"github"}`))
if w.Code != http.StatusBadRequest {
t.Errorf("unsupported toolkit: expected 400, got %d", w.Code)
}
// missing slug → 400
w = httptest.NewRecorder()
h.ComposioConnectInit(w, composioReq(http.MethodPost, "/", `{}`))
if w.Code != http.StatusBadRequest {
t.Errorf("missing slug: expected 400, got %d", w.Code)
}
}
func TestComposio_ListToolkits(t *testing.T) {
h := newComposioTestHandler(t, &composioFakeSDK{}, &composioFakeStore{})
w := httptest.NewRecorder()
h.ListComposioToolkits(w, composioReq(http.MethodGet, "/toolkits", ""))
if w.Code != http.StatusOK {
t.Fatalf("expected 200, got %d (%s)", w.Code, w.Body.String())
}
var toolkits []ComposioToolkitResponse
if err := json.Unmarshal(w.Body.Bytes(), &toolkits); err != nil {
t.Fatalf("decode: %v", err)
}
if len(toolkits) != 2 {
t.Fatalf("expected 2 toolkits, got %d", len(toolkits))
}
// notion has an enabled auth config in the fake → connectable + sorted first.
if toolkits[0].Slug != "notion" || !toolkits[0].Connectable {
t.Errorf("first toolkit = %+v, want connectable notion", toolkits[0])
}
for _, tk := range toolkits {
if tk.Slug == "github" && tk.Connectable {
t.Error("github has no auth config and must not be connectable")
}
}
}
func TestComposio_CallbackRedirects(t *testing.T) {
store := &composioFakeStore{}
// Mint a valid signed state by driving BeginConnect through a capturing
// SDK, then replay it through the real callback handler.
capturing := &composioCapturingSDK{}
h2 := newComposioTestHandler(t, capturing, store)
bw := httptest.NewRecorder()
h2.ComposioConnectInit(bw, composioReq(http.MethodPost, "/", `{"toolkit_slug":"notion"}`))
state := capturing.stateFromCallback()
if state == "" {
t.Fatal("could not capture signed state")
}
w := httptest.NewRecorder()
h2.ComposioCallback(w, composioReq(http.MethodGet, "/callback?state="+state+"&status=success&connected_account_id=ca_1", ""))
if w.Code != http.StatusFound {
t.Fatalf("expected 302, got %d", w.Code)
}
if loc := w.Header().Get("Location"); !strings.Contains(loc, "connected=notion") {
t.Errorf("success location = %q", loc)
}
// failure path: bad state → error redirect
w = httptest.NewRecorder()
h2.ComposioCallback(w, composioReq(http.MethodGet, "/callback?state=bad&status=success&connected_account_id=ca_1", ""))
if w.Code != http.StatusFound {
t.Fatalf("expected 302 on bad state, got %d", w.Code)
}
if loc := w.Header().Get("Location"); !strings.Contains(loc, "error=composio_connect_failed") {
t.Errorf("failure location = %q", loc)
}
}
func TestComposio_ListAndDelete(t *testing.T) {
store := &composioFakeStore{}
userUUID, _ := util.ParseUUID(composioTestUserID)
row, _ := store.UpsertUserComposioConnection(context.Background(), db.UpsertUserComposioConnectionParams{
UserID: userUUID,
ToolkitSlug: "notion",
AuthConfigID: "ac_notion",
ConnectedAccountID: "ca_list",
ComposioUserID: composioTestUserID,
})
h := newComposioTestHandler(t, &composioFakeSDK{}, store)
// list
w := httptest.NewRecorder()
h.ListComposioConnections(w, composioReq(http.MethodGet, "/connections", ""))
if w.Code != http.StatusOK {
t.Fatalf("list: expected 200, got %d", w.Code)
}
var conns []ComposioConnectionResponse
if err := json.Unmarshal(w.Body.Bytes(), &conns); err != nil {
t.Fatalf("decode: %v", err)
}
if len(conns) != 1 || conns[0].ToolkitSlug != "notion" {
t.Fatalf("conns = %+v", conns)
}
// delete (owner) → 204, routed through chi so {id} resolves
r := chi.NewRouter()
r.Delete("/api/integrations/composio/connections/{id}", h.DeleteComposioConnection)
delReq := composioReq(http.MethodDelete, "/api/integrations/composio/connections/"+util.UUIDToString(row.ID), "")
w = httptest.NewRecorder()
r.ServeHTTP(w, delReq)
if w.Code != http.StatusNoContent {
t.Fatalf("delete: expected 204, got %d (%s)", w.Code, w.Body.String())
}
// delete unknown id → 404
missing := "33333333-3333-3333-3333-333333333333"
w = httptest.NewRecorder()
r.ServeHTTP(w, composioReq(http.MethodDelete, "/api/integrations/composio/connections/"+missing, ""))
if w.Code != http.StatusNotFound {
t.Fatalf("delete missing: expected 404, got %d", w.Code)
}
}
// composioCapturingSDK records the callback URL so a test can replay the signed
// state through the real callback handler.
type composioCapturingSDK struct {
composioFakeSDK
lastCallbackURL string
}
func (f *composioCapturingSDK) CreateLink(_ context.Context, req sdk.CreateLinkRequest) (*sdk.CreateLinkResponse, error) {
f.lastCallbackURL = req.CallbackURL
return &sdk.CreateLinkResponse{RedirectURL: "https://composio.example/redirect"}, nil
}
func (f *composioCapturingSDK) stateFromCallback() string {
idx := strings.Index(f.lastCallbackURL, "state=")
if idx < 0 {
return ""
}
return f.lastCallbackURL[idx+len("state="):]
}

View File

@@ -1,685 +0,0 @@
// Package composio is the Stage 2 business-integration glue between Multica and
// the standalone Composio SDK (server/pkg/composio). It owns Multica semantics:
// the signed-state connect handshake, the local user_composio_connection
// mirror, idempotent disconnect, and the per-user MCP session helper.
//
// It deliberately does NOT wrap the SDK in another HTTP client — it composes
// *sdk.Client directly through the SDK interface so tests can drop in a fake.
//
// MVP scope (MUL-3720): toolkits are discovered dynamically. The
// toolkit→auth-config mapping is resolved at request time from Composio's
// /auth_configs endpoint (cached briefly), so a toolkit becomes connectable the
// moment an auth config is enabled for it in the Composio dashboard — no env
// var and no redeploy. A toolkit with no enabled auth config is rejected.
package composio
import (
"context"
"errors"
"fmt"
"net/url"
"sort"
"strings"
"sync"
"time"
sdk "github.com/multica-ai/multica/server/pkg/composio"
"github.com/jackc/pgx/v5/pgtype"
"github.com/multica-ai/multica/server/internal/util"
db "github.com/multica-ai/multica/server/pkg/db/generated"
)
// Service-level errors surfaced to the handler layer.
var (
// ErrToolkitNotSupported is returned by BeginConnect when the requested
// toolkit has no enabled auth config in the Composio project, so there is
// no auth_config_id to start a connect link with.
ErrToolkitNotSupported = errors.New("composio: toolkit not supported")
// ErrConnectNotSuccessful is returned by CompleteCallback when Composio
// reported a non-success status — no active row is written.
ErrConnectNotSuccessful = errors.New("composio: connection was not successful")
// ErrConnectionNotFound is returned by Disconnect when the connection id
// does not belong to the user (or does not exist).
ErrConnectionNotFound = errors.New("composio: connection not found")
// ErrAccountVerification is returned by CompleteCallback when the
// connected_account_id carried on the callback cannot be confirmed (with
// Composio) to belong to the user/auth-config named in the signed state —
// i.e. a tampered or unknown account id. No local row is written.
ErrAccountVerification = errors.New("composio: connected account verification failed")
)
// defaultStateTTL bounds how long a connect handshake may sit between
// BeginConnect and the Composio callback. Five minutes is generous for a hosted
// OAuth flow while keeping the replay window small.
const defaultStateTTL = 5 * time.Minute
// defaultAuthCacheTTL bounds how long the resolved toolkit→auth-config map is
// cached before a re-fetch from Composio. Short enough that enabling an auth
// config in the dashboard reflects within minutes; long enough that a burst of
// connect/list requests does not hammer /auth_configs.
const defaultAuthCacheTTL = 5 * time.Minute
// maxAuthConfigPages / maxToolkitPages cap the paginated fetch-all loops so a
// pathological or buggy upstream cursor cannot spin forever. At limit=1000 per
// page these cover far more than any real project / catalog.
const (
maxAuthConfigPages = 20
maxToolkitPages = 20
listPageLimit = 1000
)
// SDK is the subset of *sdk.Client the service depends on. Declared as an
// interface so handler/service tests can inject a fake without hitting Composio.
// *sdk.Client satisfies it.
type SDK interface {
CreateLink(ctx context.Context, req sdk.CreateLinkRequest) (*sdk.CreateLinkResponse, error)
ListConnectedAccounts(ctx context.Context, req sdk.ListConnectedAccountsRequest) (*sdk.ListConnectedAccountsResponse, error)
ListAuthConfigs(ctx context.Context, req sdk.ListAuthConfigsRequest) (*sdk.ListAuthConfigsResponse, error)
ListToolkits(ctx context.Context, req sdk.ListToolkitsRequest) (*sdk.ListToolkitsResponse, error)
RevokeConnection(ctx context.Context, connectedAccountID string) error
DeleteConnectedAccount(ctx context.Context, connectedAccountID string) error
CreateSession(ctx context.Context, req sdk.CreateSessionRequest) (*sdk.CreateSessionResponse, error)
MCPAuthHeaders() map[string]string
}
// Store is the persistence seam for the local connection mirror. *db.Queries
// satisfies it; tests use an in-memory fake.
type Store interface {
UpsertUserComposioConnection(ctx context.Context, arg db.UpsertUserComposioConnectionParams) (db.UserComposioConnection, error)
ListActiveUserComposioConnections(ctx context.Context, userID pgtype.UUID) ([]db.UserComposioConnection, error)
GetUserComposioConnection(ctx context.Context, arg db.GetUserComposioConnectionParams) (db.UserComposioConnection, error)
MarkUserComposioConnectionRevoked(ctx context.Context, arg db.MarkUserComposioConnectionRevokedParams) error
}
// Config configures a Service.
type Config struct {
// StateSecret signs the connect-state HMAC. Required (non-empty).
StateSecret []byte
// CallbackBaseURL is the absolute, public base URL of THIS API, with no
// trailing slash (e.g. "https://app.multica.ai"). The Composio callback
// URL is built as CallbackBaseURL + CallbackPath. Required.
CallbackBaseURL string
// FrontendBaseURL is the web app base used to build the post-callback
// browser redirect (e.g. "https://app.multica.ai"). May be empty, in which
// case CallbackRedirect returns a site-relative path.
FrontendBaseURL string
// StateTTL overrides the default connect-state lifetime. Zero uses
// defaultStateTTL.
StateTTL time.Duration
// AuthConfigTTL overrides how long the resolved toolkit→auth-config map is
// cached. Zero uses defaultAuthCacheTTL.
AuthConfigTTL time.Duration
// Now is overridable for deterministic tests. Nil uses time.Now.
Now func() time.Time
}
// callbackPath is the API path Composio redirects the browser back to. It is a
// constant (not configurable) so the SDK callback URL and the router route
// cannot drift apart.
const callbackPath = "/api/integrations/composio/callback"
// Service is the Composio business-integration service.
type Service struct {
sdk SDK
store Store
secret []byte
callbackURL string
frontendURL string
stateTTL time.Duration
now func() time.Time
// authCache holds the resolved toolkit_slug → auth_config_id map for the
// project. It is rebuilt from Composio's /auth_configs endpoint on first
// use and whenever authCacheExp has passed; authCacheMu guards both fields.
authCacheMu sync.Mutex
authCache map[string]string
authCacheExp time.Time
authCacheTTL time.Duration
}
// NewService validates its inputs and returns a ready Service. It errors when a
// required dependency is missing so a misconfigured boot fails loudly instead
// of returning 500s at request time.
func NewService(client SDK, store Store, cfg Config) (*Service, error) {
if client == nil {
return nil, errors.New("composio: SDK client is required")
}
if store == nil {
return nil, errors.New("composio: store is required")
}
if len(cfg.StateSecret) == 0 {
return nil, errors.New("composio: StateSecret is required")
}
base := strings.TrimRight(strings.TrimSpace(cfg.CallbackBaseURL), "/")
if base == "" {
return nil, errors.New("composio: CallbackBaseURL is required")
}
ttl := cfg.StateTTL
if ttl <= 0 {
ttl = defaultStateTTL
}
authTTL := cfg.AuthConfigTTL
if authTTL <= 0 {
authTTL = defaultAuthCacheTTL
}
now := cfg.Now
if now == nil {
now = time.Now
}
return &Service{
sdk: client,
store: store,
secret: cfg.StateSecret,
callbackURL: base + callbackPath,
frontendURL: strings.TrimRight(strings.TrimSpace(cfg.FrontendBaseURL), "/"),
stateTTL: ttl,
now: now,
authCacheTTL: authTTL,
}, nil
}
// Connection is the API-facing view of a local connection row. The Composio
// connected_account_id and auth_config_id are intentionally omitted — they are
// server-internal handles, not API surface.
type Connection struct {
ID string `json:"id"`
ToolkitSlug string `json:"toolkit_slug"`
Status string `json:"status"`
ConnectedAt string `json:"connected_at"`
LastUsedAt *string `json:"last_used_at"`
}
// MCPSession is the result of CreateMCPSession: the streamable MCP URL plus the
// headers an MCP client must attach. Headers carry the Composio x-api-key, so
// callers must route them through the redact pipeline before logging.
type MCPSession struct {
URL string
Headers map[string]string
}
// ToolkitView is the API-facing descriptor for one Composio toolkit, carrying
// exactly the fields the Settings UI renders plus a Connectable flag.
//
// Connectable means the project has an enabled auth config for the toolkit, so
// BeginConnect would succeed. When false the UI must not offer a working
// "Connect" affordance — clicking it would 400 with ErrToolkitNotSupported.
type ToolkitView struct {
Slug string `json:"slug"`
Name string `json:"name"`
LogoURL string `json:"logo,omitempty"`
Category string `json:"category,omitempty"`
Connectable bool `json:"connectable"`
}
// BeginConnect validates the toolkit, mints a signed state, and asks Composio
// for a hosted Connect Link. The returned redirect URL is where the caller
// sends the user's browser.
//
// The auth_config_id is resolved dynamically from the project's enabled auth
// configs (cached), so a toolkit is connectable iff the dashboard has an auth
// config for it — no static env map. A toolkit with none yields
// ErrToolkitNotSupported.
//
// The composio_user_id sent to Composio is the Multica user id verbatim — the
// invariant the rest of the integration relies on.
func (s *Service) BeginConnect(ctx context.Context, userID pgtype.UUID, toolkitSlug string) (string, error) {
slug := strings.ToLower(strings.TrimSpace(toolkitSlug))
authConfigID, err := s.authConfigForToolkit(ctx, slug)
if err != nil {
return "", err
}
if authConfigID == "" {
return "", ErrToolkitNotSupported
}
if !userID.Valid {
return "", errors.New("composio: invalid user id")
}
composioUserID := util.UUIDToString(userID)
state, err := signState(s.secret, stateClaims{
UserID: composioUserID,
ToolkitSlug: slug,
AuthConfigID: authConfigID,
Exp: s.now().Add(s.stateTTL).Unix(),
})
if err != nil {
return "", fmt.Errorf("composio: sign state: %w", err)
}
// Composio appends its own status / connected_account_id query params to
// the callback URL and preserves ours, so the signed state rides back to us
// on the redirect.
callbackURL := s.callbackURL + "?state=" + url.QueryEscape(state)
resp, err := s.sdk.CreateLink(ctx, sdk.CreateLinkRequest{
AuthConfigID: authConfigID,
UserID: composioUserID,
CallbackURL: callbackURL,
})
if err != nil {
return "", fmt.Errorf("composio: create link: %w", err)
}
return resp.RedirectURL, nil
}
// CompleteCallback verifies the signed state and, on a successful Composio
// status, upserts the local connection row. It returns the toolkit slug from
// the state so the handler can build the right redirect even on the
// not-successful path.
//
// Idempotency: the upsert is keyed on (user_id, connected_account_id), so a
// duplicate callback re-activates the same row instead of creating a second.
func (s *Service) CompleteCallback(ctx context.Context, state, status, connectedAccountID string) (string, error) {
claims, err := verifyState(s.secret, state, s.now())
if err != nil {
return "", err
}
if !strings.EqualFold(strings.TrimSpace(status), "success") {
// Honor the state for the redirect slug, but do not write an active row.
return claims.ToolkitSlug, ErrConnectNotSuccessful
}
if strings.TrimSpace(connectedAccountID) == "" {
return claims.ToolkitSlug, errors.New("composio: callback missing connected_account_id")
}
userID, err := util.ParseUUID(claims.UserID)
if err != nil {
return claims.ToolkitSlug, fmt.Errorf("composio: state has invalid user id: %w", err)
}
// The auth_config_id was resolved at BeginConnect and signed into the state,
// so we compare against THAT exact value rather than re-resolving here (a
// re-resolve that failed or drifted would otherwise fail-open: a missing
// expected auth config used to skip the check, letting another toolkit's
// account id be bound under this toolkit's slug). An empty value fails
// closed in verifyAccountOwnership.
authConfigID := claims.AuthConfigID
// Defense-in-depth (PR 4608 review): the signed state proves *who* started
// the handshake and *which* toolkit, but connected_account_id rides back as
// a plain query param Composio appends to our callback URL. A crafted
// redirect could pair a valid, un-expired state with someone else's account
// id and we would mirror it verbatim. Before writing, confirm with Composio
// that this account actually belongs to the state's user (the
// composio_user_id == multica user id invariant) and was created under the
// toolkit's auth config. Any mismatch fails closed with ErrAccountVerification.
if err := s.verifyAccountOwnership(ctx, connectedAccountID, claims.UserID, authConfigID); err != nil {
return claims.ToolkitSlug, err
}
if _, err := s.store.UpsertUserComposioConnection(ctx, db.UpsertUserComposioConnectionParams{
UserID: userID,
ToolkitSlug: claims.ToolkitSlug,
AuthConfigID: authConfigID,
ConnectedAccountID: connectedAccountID,
// Invariant: composio_user_id == Multica user id.
ComposioUserID: claims.UserID,
}); err != nil {
return claims.ToolkitSlug, fmt.Errorf("composio: upsert connection: %w", err)
}
return claims.ToolkitSlug, nil
}
// ListConnections returns the user's active connections.
func (s *Service) ListConnections(ctx context.Context, userID pgtype.UUID) ([]Connection, error) {
rows, err := s.store.ListActiveUserComposioConnections(ctx, userID)
if err != nil {
return nil, err
}
out := make([]Connection, 0, len(rows))
for _, row := range rows {
out = append(out, rowToConnection(row))
}
return out, nil
}
// Disconnect revokes and deletes the connection at Composio, then marks the
// local row revoked. It is idempotent: a Composio 404 (already gone) is treated
// as success, and re-revoking an already-revoked local row is a no-op.
//
// A connection id that does not belong to the user (or does not exist at all)
// returns ErrConnectionNotFound so the handler can 404 without leaking
// existence across users.
func (s *Service) Disconnect(ctx context.Context, userID, connectionID pgtype.UUID) error {
row, err := s.store.GetUserComposioConnection(ctx, db.GetUserComposioConnectionParams{
ID: connectionID,
UserID: userID,
})
if err != nil {
// pgx.ErrNoRows or fake "not found" — treat as not found for the owner.
return ErrConnectionNotFound
}
// Already disconnected locally: a repeat DELETE is a pure no-op. Short-
// circuiting here keeps disconnect idempotent even when the upstream now
// answers revoke/delete with a NON-404 error (PR 4608 review): the account
// is already gone, so re-hitting Composio could turn a second DELETE into a
// 502 and break the 204-idempotent contract. The first disconnect already
// revoked upstream and marked the row.
if !strings.EqualFold(row.Status, "active") {
return nil
}
// Revoke the upstream grant first, then delete the Composio record. Both are
// made idempotent against a 404 so a repeated disconnect (or a connection
// already removed at Composio) still resolves the local row.
if err := s.sdk.RevokeConnection(ctx, row.ConnectedAccountID); err != nil && !isNotFound(err) {
return fmt.Errorf("composio: revoke connection: %w", err)
}
// DeleteConnectedAccount already swallows 404 in the SDK, but guard anyway.
if err := s.sdk.DeleteConnectedAccount(ctx, row.ConnectedAccountID); err != nil && !isNotFound(err) {
return fmt.Errorf("composio: delete connected account: %w", err)
}
if err := s.store.MarkUserComposioConnectionRevoked(ctx, db.MarkUserComposioConnectionRevokedParams{
ID: connectionID,
UserID: userID,
}); err != nil {
return fmt.Errorf("composio: mark revoked: %w", err)
}
return nil
}
// CreateMCPSession opens a Composio tool-router (MCP) session scoped to the
// user's active connections. It returns (nil, nil) when the user has no active
// connections — callers treat that as "no MCP overlay for this user".
//
// connected_accounts is pinned per toolkit to the user's own connected account
// id so the session cannot surface accounts the user did not connect. This
// helper is NOT yet wired into task dispatch (Stage 3); it exists so that wiring
// is a pure consumer of an already-tested seam.
//
// Single-account constraint (v1, PR 4608 review follow-up): the MVP connect
// flow assumes AT MOST ONE active connection per (user, toolkit) — there is no
// UI or API to hold several, and connected_accounts is keyed by toolkit slug so
// it physically cannot carry two accounts for the same toolkit. Should
// duplicates ever exist, we must choose deterministically: rows arrive
// newest-first (ListActive orders by connected_at DESC), so we keep the FIRST
// occurrence per toolkit (the most recently connected account) instead of
// letting a later map write silently select an older one.
//
// Stage 3 owns the real decision before this is wired into dispatch: either
// enforce the single-active constraint at connect time (revoke the previous
// active row for the same toolkit on a new connect) or extend CreateSession to
// a multi-account request shape. Until then, newest-wins keeps behavior
// deterministic rather than order-dependent.
func (s *Service) CreateMCPSession(ctx context.Context, userID pgtype.UUID) (*MCPSession, error) {
rows, err := s.store.ListActiveUserComposioConnections(ctx, userID)
if err != nil {
return nil, err
}
if len(rows) == 0 {
return nil, nil
}
connectedAccounts := make(map[string]any, len(rows))
for _, row := range rows {
// Keep the first (newest) account per toolkit; ignore older duplicates.
if _, exists := connectedAccounts[row.ToolkitSlug]; exists {
continue
}
connectedAccounts[row.ToolkitSlug] = row.ConnectedAccountID
}
resp, err := s.sdk.CreateSession(ctx, sdk.CreateSessionRequest{
UserID: util.UUIDToString(userID),
ConnectedAccounts: connectedAccounts,
})
if err != nil {
return nil, fmt.Errorf("composio: create session: %w", err)
}
return &MCPSession{
URL: resp.MCP.URL,
Headers: s.sdk.MCPAuthHeaders(),
}, nil
}
// CallbackRedirect builds the browser redirect target for the callback handler.
// On success it points at the settings page (Integrations tab) with the
// connected toolkit slug; on failure it carries a stable error code. The path
// is the slug-less `/settings?tab=integrations&...` form on purpose: the web
// proxy's legacy-route redirect prepends the user's last workspace slug, so it
// resolves to the real `/{slug}/settings?tab=integrations` route that mounts
// the Composio tab. The older `/settings/integrations` path was NOT a real
// route and 404'd after the legacy rewrite. When FrontendBaseURL is unset it
// returns a site-relative path.
func (s *Service) CallbackRedirect(slug string, success bool) string {
var path string
if success {
path = "/settings?tab=integrations&connected=" + url.QueryEscape(slug)
} else {
path = "/settings?tab=integrations&error=composio_connect_failed"
}
return s.frontendURL + path
}
// rowToConnection maps a DB row to the API-facing Connection view.
func rowToConnection(row db.UserComposioConnection) Connection {
c := Connection{
ID: util.UUIDToString(row.ID),
ToolkitSlug: row.ToolkitSlug,
Status: row.Status,
}
if row.ConnectedAt.Valid {
c.ConnectedAt = row.ConnectedAt.Time.UTC().Format(time.RFC3339)
}
c.LastUsedAt = util.TimestampToPtr(row.LastUsedAt)
return c
}
// ListToolkits returns the full Composio toolkit catalog annotated with a
// Connectable flag (whether the project has an enabled auth config for each).
// It fetches all pages (capped by maxToolkitPages) so the UI gets the complete
// list in one call; the catalog is a few hundred entries, well within a single
// JSON response. Connectable toolkits are surfaced first so the UI can lead
// with what actually works.
func (s *Service) ListToolkits(ctx context.Context) ([]ToolkitView, error) {
// connectable is the project's enabled toolkit_slug → auth_config_id map.
// On a transient resolver error we still render the catalog, just with
// everything marked not-connectable, rather than failing the whole list.
connectable, err := s.authConfigMap(ctx)
if err != nil {
connectable = map[string]string{}
}
out := []ToolkitView{}
seen := make(map[string]struct{})
cursor := ""
for page := 0; page < maxToolkitPages; page++ {
resp, err := s.sdk.ListToolkits(ctx, sdk.ListToolkitsRequest{
Limit: listPageLimit,
Cursor: cursor,
SortBy: "usage",
})
if err != nil {
return nil, fmt.Errorf("composio: list toolkits: %w", err)
}
for _, tk := range resp.Items {
slug := strings.ToLower(strings.TrimSpace(tk.Slug))
if slug == "" {
continue
}
if _, dup := seen[slug]; dup {
continue
}
seen[slug] = struct{}{}
category := ""
if len(tk.Categories) > 0 {
category = tk.Categories[0]
}
_, canConnect := connectable[slug]
out = append(out, ToolkitView{
Slug: tk.Slug,
Name: tk.Name,
LogoURL: tk.LogoURL,
Category: category,
Connectable: canConnect,
})
}
if resp.NextCursor == "" {
break
}
cursor = resp.NextCursor
}
// Stable sort: connectable toolkits first, preserving Composio's usage
// order within each group.
sort.SliceStable(out, func(i, j int) bool {
if out[i].Connectable != out[j].Connectable {
return out[i].Connectable
}
return false
})
return out, nil
}
// authConfigForToolkit returns the chosen auth_config_id for a toolkit slug, or
// "" when the project has no enabled auth config for it. It reads the cached
// project-wide map (refreshed on TTL).
func (s *Service) authConfigForToolkit(ctx context.Context, slug string) (string, error) {
slug = strings.ToLower(strings.TrimSpace(slug))
if slug == "" {
return "", nil
}
m, err := s.authConfigMap(ctx)
if err != nil {
return "", err
}
return m[slug], nil
}
// authConfigMap returns the toolkit_slug → auth_config_id map for the project,
// rebuilding it from Composio when the cache is empty or expired. Concurrent
// callers serialize on authCacheMu; the fetch runs under the lock, which is
// acceptable for a short-TTL map that is cheap to build and read by a
// low-traffic settings surface. A new map is assigned on refresh (never mutated
// in place), so a reference handed to a caller stays safe to read.
func (s *Service) authConfigMap(ctx context.Context) (map[string]string, error) {
s.authCacheMu.Lock()
defer s.authCacheMu.Unlock()
if s.authCache != nil && s.now().Before(s.authCacheExp) {
return s.authCache, nil
}
m, err := s.fetchAuthConfigMap(ctx)
if err != nil {
// Serve a stale snapshot if we have one — a transient /auth_configs
// blip should not make every toolkit suddenly un-connectable.
if s.authCache != nil {
return s.authCache, nil
}
return nil, err
}
s.authCache = m
s.authCacheExp = s.now().Add(s.authCacheTTL)
return m, nil
}
// authCandidate is one project auth config in contention to represent a toolkit
// during resolution.
type authCandidate struct {
id string
managed bool
updated string
}
// fetchAuthConfigMap pages through the project's ENABLED auth configs and
// reduces them to one chosen auth_config_id per toolkit slug. When a toolkit
// has several (e.g. a Composio-managed one plus a custom white-label one),
// betterAuthConfig picks the winner.
func (s *Service) fetchAuthConfigMap(ctx context.Context) (map[string]string, error) {
best := make(map[string]authCandidate)
cursor := ""
for page := 0; page < maxAuthConfigPages; page++ {
resp, err := s.sdk.ListAuthConfigs(ctx, sdk.ListAuthConfigsRequest{
ShowDisabled: false,
Limit: listPageLimit,
Cursor: cursor,
})
if err != nil {
return nil, fmt.Errorf("composio: list auth configs: %w", err)
}
for _, ac := range resp.Items {
if ac.ID == "" || strings.EqualFold(ac.Status, "DISABLED") {
continue
}
slug := strings.ToLower(strings.TrimSpace(ac.Toolkit.Slug))
if slug == "" {
continue
}
cand := authCandidate{id: ac.ID, managed: ac.IsComposioManaged, updated: ac.LastUpdatedAt}
if cur, ok := best[slug]; !ok || betterAuthConfig(cand, cur) {
best[slug] = cand
}
}
if resp.NextCursor == "" {
break
}
cursor = resp.NextCursor
}
out := make(map[string]string, len(best))
for slug, c := range best {
out[slug] = c.id
}
return out, nil
}
// betterAuthConfig reports whether candidate a should win over the currently
// selected b for the same toolkit. A custom (bring-your-own OAuth) config beats
// a Composio-managed one — it is the white-label path the product wants — and
// among configs of the same kind the most recently updated wins.
func betterAuthConfig(a, b authCandidate) bool {
if a.managed != b.managed {
return !a.managed
}
return a.updated > b.updated
}
// verifyAccountOwnership confirms with Composio that connectedAccountID really
// belongs to expectedUserID and was created under expectedAuthConfigID, so a
// tampered or cross-toolkit connected_account_id on the callback cannot smuggle
// another account into the local mirror. It fails closed: an upstream error, an
// unknown account, an owner mismatch, an EMPTY expected auth config, or an
// auth-config mismatch all return ErrAccountVerification. Requiring a non-empty,
// exactly-matching auth config is what closes the cross-toolkit binding gap —
// the expected value is the auth_config_id signed into the state at
// BeginConnect, which is toolkit-specific.
func (s *Service) verifyAccountOwnership(ctx context.Context, connectedAccountID, expectedUserID, expectedAuthConfigID string) error {
resp, err := s.sdk.ListConnectedAccounts(ctx, sdk.ListConnectedAccountsRequest{
ConnectedAccountIDs: []string{connectedAccountID},
})
if err != nil {
return fmt.Errorf("composio: verify connected account: %w", err)
}
var acct *sdk.ConnectedAccount
for i := range resp.Items {
if resp.Items[i].ID == connectedAccountID {
acct = &resp.Items[i]
break
}
}
if acct == nil {
return ErrAccountVerification
}
if acct.UserID != expectedUserID {
return ErrAccountVerification
}
// Fail closed: the account MUST have been created under the exact auth
// config the connect link used. An empty expected value (state missing it,
// or a resolver gap) is rejected rather than skipped — skipping is the
// fail-open hole that let a cross-toolkit account id be bound here.
if expectedAuthConfigID == "" || acct.AuthConfigID != expectedAuthConfigID {
return ErrAccountVerification
}
return nil
}
// isNotFound reports whether err is a Composio 404 APIError, used to make
// revoke/delete idempotent.
func isNotFound(err error) bool {
var apiErr *sdk.APIError
return errors.As(err, &apiErr) && apiErr.IsNotFound()
}

View File

@@ -1,694 +0,0 @@
package composio
import (
"context"
"errors"
"net/http"
"net/url"
"strings"
"testing"
"time"
"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgtype"
"github.com/multica-ai/multica/server/internal/util"
sdk "github.com/multica-ai/multica/server/pkg/composio"
db "github.com/multica-ai/multica/server/pkg/db/generated"
)
// ---- fakes ---------------------------------------------------------------
type fakeSDK struct {
createLinkResp *sdk.CreateLinkResponse
createLinkErr error
lastCreateLink sdk.CreateLinkRequest
revoked []string
revokeErr error
deleted []string
deleteErr error
createSessResp *sdk.CreateSessionResponse
createSessErr error
lastSessReq sdk.CreateSessionRequest
createSessCalls int
// account-ownership verification (CompleteCallback). By default
// ListConnectedAccounts echoes the requested id with acctUserID /
// acctAuthConfigID so success-path tests can opt in to a matching account;
// acctMissing returns no items, listAccountsErr forces a transport error.
acctUserID string
acctAuthConfigID string
acctMissing bool
listAccountsErr error
lastListAccounts sdk.ListConnectedAccountsRequest
// auth-config resolution (BeginConnect / ListToolkits connectable flag).
// authConfigs nil => a default single notion→ac_notion ENABLED config so
// existing connect tests keep resolving; set explicitly to override.
authConfigs []sdk.AuthConfig
authConfigsSet bool
listAuthErr error
// toolkit catalog (ListToolkits).
toolkits []sdk.Toolkit
listToolkitsErr error
}
func (f *fakeSDK) CreateLink(_ context.Context, req sdk.CreateLinkRequest) (*sdk.CreateLinkResponse, error) {
f.lastCreateLink = req
if f.createLinkErr != nil {
return nil, f.createLinkErr
}
if f.createLinkResp != nil {
return f.createLinkResp, nil
}
return &sdk.CreateLinkResponse{RedirectURL: "https://composio.example/redirect", ConnectedAccountID: "ca_pending"}, nil
}
func (f *fakeSDK) ListConnectedAccounts(_ context.Context, req sdk.ListConnectedAccountsRequest) (*sdk.ListConnectedAccountsResponse, error) {
f.lastListAccounts = req
if f.listAccountsErr != nil {
return nil, f.listAccountsErr
}
if f.acctMissing {
return &sdk.ListConnectedAccountsResponse{}, nil
}
id := ""
if len(req.ConnectedAccountIDs) > 0 {
id = req.ConnectedAccountIDs[0]
}
return &sdk.ListConnectedAccountsResponse{Items: []sdk.ConnectedAccount{{
ID: id,
UserID: f.acctUserID,
AuthConfigID: f.acctAuthConfigID,
}}}, nil
}
func (f *fakeSDK) ListAuthConfigs(_ context.Context, _ sdk.ListAuthConfigsRequest) (*sdk.ListAuthConfigsResponse, error) {
if f.listAuthErr != nil {
return nil, f.listAuthErr
}
items := f.authConfigs
if !f.authConfigsSet && items == nil {
items = []sdk.AuthConfig{{
ID: "ac_notion",
Toolkit: sdk.Toolkit{Slug: "notion"},
Status: "ENABLED",
IsComposioManaged: true,
}}
}
return &sdk.ListAuthConfigsResponse{Items: items}, nil
}
func (f *fakeSDK) ListToolkits(_ context.Context, _ sdk.ListToolkitsRequest) (*sdk.ListToolkitsResponse, error) {
if f.listToolkitsErr != nil {
return nil, f.listToolkitsErr
}
return &sdk.ListToolkitsResponse{Items: f.toolkits}, nil
}
func (f *fakeSDK) RevokeConnection(_ context.Context, id string) error {
f.revoked = append(f.revoked, id)
return f.revokeErr
}
func (f *fakeSDK) DeleteConnectedAccount(_ context.Context, id string) error {
f.deleted = append(f.deleted, id)
return f.deleteErr
}
func (f *fakeSDK) CreateSession(_ context.Context, req sdk.CreateSessionRequest) (*sdk.CreateSessionResponse, error) {
f.createSessCalls++
f.lastSessReq = req
if f.createSessErr != nil {
return nil, f.createSessErr
}
if f.createSessResp != nil {
return f.createSessResp, nil
}
return &sdk.CreateSessionResponse{MCP: sdk.MCPDescriptor{URL: "https://mcp.example/session"}}, nil
}
func (f *fakeSDK) MCPAuthHeaders() map[string]string {
return map[string]string{"x-api-key": "secret"}
}
// fakeStore is an in-memory implementation of Store with the same
// (user_id, connected_account_id) uniqueness as the real table.
type fakeStore struct {
rows []db.UserComposioConnection
nextID byte
}
func newFakeStore() *fakeStore { return &fakeStore{nextID: 1} }
func (s *fakeStore) UpsertUserComposioConnection(_ context.Context, arg db.UpsertUserComposioConnectionParams) (db.UserComposioConnection, error) {
for i := range s.rows {
if uuidEqual(s.rows[i].UserID, arg.UserID) && s.rows[i].ConnectedAccountID == arg.ConnectedAccountID {
s.rows[i].ToolkitSlug = arg.ToolkitSlug
s.rows[i].AuthConfigID = arg.AuthConfigID
s.rows[i].ComposioUserID = arg.ComposioUserID
s.rows[i].Status = "active"
s.rows[i].UpdatedAt = pgtype.Timestamptz{Time: time.Now(), Valid: true}
return s.rows[i], nil
}
}
row := db.UserComposioConnection{
ID: mintUUID(s.nextID),
UserID: arg.UserID,
ToolkitSlug: arg.ToolkitSlug,
AuthConfigID: arg.AuthConfigID,
ConnectedAccountID: arg.ConnectedAccountID,
ComposioUserID: arg.ComposioUserID,
Status: "active",
ConnectedAt: pgtype.Timestamptz{Time: time.Now(), Valid: true},
}
s.nextID++
s.rows = append(s.rows, row)
return row, nil
}
func (s *fakeStore) ListActiveUserComposioConnections(_ context.Context, userID pgtype.UUID) ([]db.UserComposioConnection, error) {
out := []db.UserComposioConnection{}
for _, r := range s.rows {
if uuidEqual(r.UserID, userID) && r.Status == "active" {
out = append(out, r)
}
}
return out, nil
}
func (s *fakeStore) GetUserComposioConnection(_ context.Context, arg db.GetUserComposioConnectionParams) (db.UserComposioConnection, error) {
for _, r := range s.rows {
if uuidEqual(r.ID, arg.ID) && uuidEqual(r.UserID, arg.UserID) {
return r, nil
}
}
return db.UserComposioConnection{}, pgx.ErrNoRows
}
func (s *fakeStore) MarkUserComposioConnectionRevoked(_ context.Context, arg db.MarkUserComposioConnectionRevokedParams) error {
for i := range s.rows {
if uuidEqual(s.rows[i].ID, arg.ID) && uuidEqual(s.rows[i].UserID, arg.UserID) {
s.rows[i].Status = "revoked"
}
}
return nil
}
func uuidEqual(a, b pgtype.UUID) bool { return a.Valid && b.Valid && a.Bytes == b.Bytes }
func mintUUID(n byte) pgtype.UUID {
var b [16]byte
b[15] = n
return pgtype.UUID{Bytes: b, Valid: true}
}
func newTestService(t *testing.T, client SDK, store Store) *Service {
t.Helper()
svc, err := NewService(client, store, Config{
StateSecret: testSecret,
CallbackBaseURL: "https://app.multica.ai",
FrontendBaseURL: "https://app.multica.ai",
Now: func() time.Time { return time.Unix(1_700_000_000, 0) },
})
if err != nil {
t.Fatalf("NewService: %v", err)
}
return svc
}
// ---- tests ---------------------------------------------------------------
func TestNewService_Validation(t *testing.T) {
t.Parallel()
if _, err := NewService(nil, newFakeStore(), Config{StateSecret: testSecret, CallbackBaseURL: "x"}); err == nil {
t.Error("expected error for nil client")
}
if _, err := NewService(&fakeSDK{}, nil, Config{StateSecret: testSecret, CallbackBaseURL: "x"}); err == nil {
t.Error("expected error for nil store")
}
if _, err := NewService(&fakeSDK{}, newFakeStore(), Config{CallbackBaseURL: "x"}); err == nil {
t.Error("expected error for empty secret")
}
if _, err := NewService(&fakeSDK{}, newFakeStore(), Config{StateSecret: testSecret}); err == nil {
t.Error("expected error for empty callback base")
}
}
func TestBeginConnect_MappingAndState(t *testing.T) {
t.Parallel()
sdkFake := &fakeSDK{}
svc := newTestService(t, sdkFake, newFakeStore())
userID := mintUUID(7)
redirect, err := svc.BeginConnect(context.Background(), userID, "Notion")
if err != nil {
t.Fatalf("BeginConnect: %v", err)
}
if redirect != "https://composio.example/redirect" {
t.Errorf("redirect = %q", redirect)
}
// toolkit → auth_config mapping
if sdkFake.lastCreateLink.AuthConfigID != "ac_notion" {
t.Errorf("auth config = %q", sdkFake.lastCreateLink.AuthConfigID)
}
// composio_user_id == multica user id
if sdkFake.lastCreateLink.UserID != util.UUIDToString(userID) {
t.Errorf("composio user id = %q, want %q", sdkFake.lastCreateLink.UserID, util.UUIDToString(userID))
}
// callback URL carries the signed state and points at our callback path
cb := sdkFake.lastCreateLink.CallbackURL
if !strings.HasPrefix(cb, "https://app.multica.ai"+callbackPath+"?state=") {
t.Fatalf("callback url = %q", cb)
}
u, _ := url.Parse(cb)
state := u.Query().Get("state")
claims, err := verifyState(testSecret, state, time.Unix(1_700_000_000, 0))
if err != nil {
t.Fatalf("state did not verify: %v", err)
}
if claims.ToolkitSlug != "notion" || claims.UserID != util.UUIDToString(userID) {
t.Errorf("claims = %+v", claims)
}
// The resolved auth_config_id is signed into the state so the callback can
// verify the returned account against it exactly (no fail-open re-resolve).
if claims.AuthConfigID != "ac_notion" {
t.Errorf("state auth config = %q, want ac_notion", claims.AuthConfigID)
}
}
func TestBeginConnect_UnsupportedToolkit(t *testing.T) {
t.Parallel()
svc := newTestService(t, &fakeSDK{}, newFakeStore())
if _, err := svc.BeginConnect(context.Background(), mintUUID(1), "github"); !errors.Is(err, ErrToolkitNotSupported) {
t.Fatalf("expected ErrToolkitNotSupported, got %v", err)
}
}
// TestBeginConnect_UnsupportedWhenNoAuthConfig: with the project reporting no
// enabled auth configs at all, even notion is not connectable.
func TestBeginConnect_UnsupportedWhenNoAuthConfig(t *testing.T) {
t.Parallel()
sdkFake := &fakeSDK{authConfigsSet: true, authConfigs: []sdk.AuthConfig{}}
svc := newTestService(t, sdkFake, newFakeStore())
if _, err := svc.BeginConnect(context.Background(), mintUUID(1), "notion"); !errors.Is(err, ErrToolkitNotSupported) {
t.Fatalf("expected ErrToolkitNotSupported with no auth configs, got %v", err)
}
}
// TestBeginConnect_PrefersCustomAuthConfig: when a toolkit has both a
// Composio-managed and a custom (white-label) auth config, the custom one wins.
func TestBeginConnect_PrefersCustomAuthConfig(t *testing.T) {
t.Parallel()
sdkFake := &fakeSDK{authConfigsSet: true, authConfigs: []sdk.AuthConfig{
{ID: "ac_managed", Toolkit: sdk.Toolkit{Slug: "notion"}, Status: "ENABLED", IsComposioManaged: true},
{ID: "ac_custom", Toolkit: sdk.Toolkit{Slug: "notion"}, Status: "ENABLED", IsComposioManaged: false},
}}
svc := newTestService(t, sdkFake, newFakeStore())
if _, err := svc.BeginConnect(context.Background(), mintUUID(1), "notion"); err != nil {
t.Fatalf("BeginConnect: %v", err)
}
if sdkFake.lastCreateLink.AuthConfigID != "ac_custom" {
t.Errorf("auth config = %q, want ac_custom (custom preferred over managed)", sdkFake.lastCreateLink.AuthConfigID)
}
}
// TestListToolkits_ConnectableFlagAndOrder: every toolkit is listed, but only
// those with an enabled auth config are connectable, and connectable ones sort
// first.
func TestListToolkits_ConnectableFlagAndOrder(t *testing.T) {
t.Parallel()
sdkFake := &fakeSDK{
authConfigsSet: true,
authConfigs: []sdk.AuthConfig{
{ID: "ac_notion", Toolkit: sdk.Toolkit{Slug: "notion"}, Status: "ENABLED"},
},
toolkits: []sdk.Toolkit{
{Slug: "github", Name: "GitHub", LogoURL: "https://logo/gh", Categories: []string{"dev"}},
{Slug: "notion", Name: "Notion", LogoURL: "https://logo/notion", Categories: []string{"productivity"}},
{Slug: "slack", Name: "Slack"},
},
}
svc := newTestService(t, sdkFake, newFakeStore())
tks, err := svc.ListToolkits(context.Background())
if err != nil {
t.Fatalf("ListToolkits: %v", err)
}
if len(tks) != 3 {
t.Fatalf("expected 3 toolkits, got %d", len(tks))
}
// Connectable (notion) sorts first.
if tks[0].Slug != "notion" || !tks[0].Connectable {
t.Errorf("first toolkit = %+v, want connectable notion", tks[0])
}
if tks[0].Name != "Notion" || tks[0].LogoURL != "https://logo/notion" || tks[0].Category != "productivity" {
t.Errorf("notion fields not mapped: %+v", tks[0])
}
for _, tk := range tks[1:] {
if tk.Connectable {
t.Errorf("toolkit %q should not be connectable", tk.Slug)
}
}
}
// TestListToolkits_PaginatesAndResolverErrorIsSoft: a paginated catalog is
// fully drained, and an /auth_configs failure degrades to "nothing
// connectable" instead of failing the whole list.
func TestListToolkits_ResolverErrorMarksNoneConnectable(t *testing.T) {
t.Parallel()
sdkFake := &fakeSDK{
listAuthErr: errors.New("upstream blip"),
toolkits: []sdk.Toolkit{{Slug: "notion", Name: "Notion"}},
}
svc := newTestService(t, sdkFake, newFakeStore())
tks, err := svc.ListToolkits(context.Background())
if err != nil {
t.Fatalf("ListToolkits should not fail on auth-config error, got %v", err)
}
if len(tks) != 1 || tks[0].Connectable {
t.Fatalf("expected 1 non-connectable toolkit, got %+v", tks)
}
}
func TestCompleteCallback_SuccessAndIdempotent(t *testing.T) {
t.Parallel()
store := newFakeStore()
userID := mintUUID(3)
// The account Composio reports for ca_123 belongs to this user under the
// notion auth config, so ownership verification passes.
sdkFake := &fakeSDK{acctUserID: util.UUIDToString(userID), acctAuthConfigID: "ac_notion"}
svc := newTestService(t, sdkFake, store)
state, _ := signState(testSecret, stateClaims{
UserID: util.UUIDToString(userID),
ToolkitSlug: "notion",
AuthConfigID: "ac_notion",
Exp: time.Unix(1_700_000_000, 0).Add(time.Minute).Unix(),
})
slug, err := svc.CompleteCallback(context.Background(), state, "success", "ca_123")
if err != nil {
t.Fatalf("CompleteCallback: %v", err)
}
if slug != "notion" {
t.Errorf("slug = %q", slug)
}
// Duplicate callback (same connected account) must not create a 2nd row.
if _, err := svc.CompleteCallback(context.Background(), state, "success", "ca_123"); err != nil {
t.Fatalf("second CompleteCallback: %v", err)
}
if len(store.rows) != 1 {
t.Fatalf("expected 1 row after duplicate callback, got %d", len(store.rows))
}
row := store.rows[0]
if row.ComposioUserID != util.UUIDToString(userID) {
t.Errorf("composio_user_id invariant broken: %q", row.ComposioUserID)
}
if row.AuthConfigID != "ac_notion" || row.ToolkitSlug != "notion" || row.Status != "active" {
t.Errorf("row = %+v", row)
}
}
func TestCompleteCallback_NonSuccessNoRow(t *testing.T) {
t.Parallel()
store := newFakeStore()
svc := newTestService(t, &fakeSDK{}, store)
state, _ := signState(testSecret, stateClaims{
UserID: util.UUIDToString(mintUUID(4)),
ToolkitSlug: "notion",
Exp: time.Unix(1_700_000_000, 0).Add(time.Minute).Unix(),
})
slug, err := svc.CompleteCallback(context.Background(), state, "failed", "ca_x")
if !errors.Is(err, ErrConnectNotSuccessful) {
t.Fatalf("expected ErrConnectNotSuccessful, got %v", err)
}
if slug != "notion" {
t.Errorf("slug = %q (should still be returned for redirect)", slug)
}
if len(store.rows) != 0 {
t.Fatalf("expected no row written on non-success, got %d", len(store.rows))
}
}
func TestCompleteCallback_BadState(t *testing.T) {
t.Parallel()
svc := newTestService(t, &fakeSDK{}, newFakeStore())
if _, err := svc.CompleteCallback(context.Background(), "garbage", "success", "ca_1"); err == nil {
t.Fatal("expected error for malformed state")
}
}
// TestCompleteCallback_TamperedAccountRejected covers the PR 4608 blocker:
// a valid, un-expired state paired with a connected_account_id that Composio
// reports as belonging to a DIFFERENT user must be rejected, and no row written.
func TestCompleteCallback_TamperedAccountRejected(t *testing.T) {
t.Parallel()
store := newFakeStore()
userID := mintUUID(20)
// Composio says ca_evil belongs to someone else, not our state's user.
sdkFake := &fakeSDK{acctUserID: util.UUIDToString(mintUUID(99)), acctAuthConfigID: "ac_notion"}
svc := newTestService(t, sdkFake, store)
state, _ := signState(testSecret, stateClaims{
UserID: util.UUIDToString(userID),
ToolkitSlug: "notion",
AuthConfigID: "ac_notion",
Exp: time.Unix(1_700_000_000, 0).Add(time.Minute).Unix(),
})
if _, err := svc.CompleteCallback(context.Background(), state, "success", "ca_evil"); !errors.Is(err, ErrAccountVerification) {
t.Fatalf("expected ErrAccountVerification for foreign account, got %v", err)
}
if len(store.rows) != 0 {
t.Fatalf("no row should be written when ownership fails, got %d", len(store.rows))
}
}
// TestCompleteCallback_WrongAuthConfigRejected is the cross-toolkit proof: the
// account belongs to the right user but was created under a DIFFERENT toolkit's
// auth config (e.g. the user pasting their slack account id into a notion
// callback). The state-signed auth_config_id must not match, so it is rejected.
func TestCompleteCallback_WrongAuthConfigRejected(t *testing.T) {
t.Parallel()
store := newFakeStore()
userID := mintUUID(21)
// Account is owned by the user but lives under ac_other (another toolkit).
sdkFake := &fakeSDK{acctUserID: util.UUIDToString(userID), acctAuthConfigID: "ac_other"}
svc := newTestService(t, sdkFake, store)
state, _ := signState(testSecret, stateClaims{
UserID: util.UUIDToString(userID),
ToolkitSlug: "notion",
AuthConfigID: "ac_notion",
Exp: time.Unix(1_700_000_000, 0).Add(time.Minute).Unix(),
})
if _, err := svc.CompleteCallback(context.Background(), state, "success", "ca_x"); !errors.Is(err, ErrAccountVerification) {
t.Fatalf("expected ErrAccountVerification for wrong auth config, got %v", err)
}
if len(store.rows) != 0 {
t.Fatalf("no row should be written, got %d", len(store.rows))
}
}
// TestCompleteCallback_MissingAuthConfigFailsClosed is the regression for the
// re-review blocker: a state with no signed auth_config_id (the old fail-open
// path) plus an account owned by the user must STILL be rejected — the empty
// expected auth config now fails closed instead of skipping the check.
func TestCompleteCallback_MissingAuthConfigFailsClosed(t *testing.T) {
t.Parallel()
store := newFakeStore()
userID := mintUUID(25)
// Account genuinely belongs to the user — only the missing auth-config
// binding should trip the rejection.
sdkFake := &fakeSDK{acctUserID: util.UUIDToString(userID), acctAuthConfigID: "ac_notion"}
svc := newTestService(t, sdkFake, store)
state, _ := signState(testSecret, stateClaims{
UserID: util.UUIDToString(userID),
ToolkitSlug: "notion",
// AuthConfigID deliberately omitted (empty) — must fail closed.
Exp: time.Unix(1_700_000_000, 0).Add(time.Minute).Unix(),
})
if _, err := svc.CompleteCallback(context.Background(), state, "success", "ca_owned"); !errors.Is(err, ErrAccountVerification) {
t.Fatalf("expected ErrAccountVerification when state carries no auth config, got %v", err)
}
if len(store.rows) != 0 {
t.Fatalf("no row should be written, got %d", len(store.rows))
}
}
// TestCompleteCallback_UnknownAccountRejected ensures an account id Composio
// does not know about fails closed rather than being mirrored verbatim.
func TestCompleteCallback_UnknownAccountRejected(t *testing.T) {
t.Parallel()
store := newFakeStore()
userID := mintUUID(22)
sdkFake := &fakeSDK{acctMissing: true}
svc := newTestService(t, sdkFake, store)
state, _ := signState(testSecret, stateClaims{
UserID: util.UUIDToString(userID),
ToolkitSlug: "notion",
Exp: time.Unix(1_700_000_000, 0).Add(time.Minute).Unix(),
})
if _, err := svc.CompleteCallback(context.Background(), state, "success", "ca_ghost"); !errors.Is(err, ErrAccountVerification) {
t.Fatalf("expected ErrAccountVerification for unknown account, got %v", err)
}
if len(store.rows) != 0 {
t.Fatalf("no row should be written, got %d", len(store.rows))
}
}
func TestListConnections(t *testing.T) {
t.Parallel()
store := newFakeStore()
svc := newTestService(t, &fakeSDK{}, store)
userID := mintUUID(5)
seedActive(store, userID, "notion", "ca_a")
conns, err := svc.ListConnections(context.Background(), userID)
if err != nil {
t.Fatalf("ListConnections: %v", err)
}
if len(conns) != 1 || conns[0].ToolkitSlug != "notion" || conns[0].Status != "active" {
t.Fatalf("conns = %+v", conns)
}
}
func TestDisconnect_OwnerRevokeIdempotentAndFilter(t *testing.T) {
t.Parallel()
store := newFakeStore()
sdkFake := &fakeSDK{}
svc := newTestService(t, sdkFake, store)
userID := mintUUID(6)
row := seedActive(store, userID, "notion", "ca_z")
if err := svc.Disconnect(context.Background(), userID, row.ID); err != nil {
t.Fatalf("Disconnect: %v", err)
}
if len(sdkFake.revoked) != 1 || sdkFake.revoked[0] != "ca_z" {
t.Errorf("revoked = %v", sdkFake.revoked)
}
// Local row should now be filtered out of the active list.
conns, _ := svc.ListConnections(context.Background(), userID)
if len(conns) != 0 {
t.Errorf("expected 0 active after disconnect, got %d", len(conns))
}
// Second disconnect is idempotent (row still owned, marks revoked again).
if err := svc.Disconnect(context.Background(), userID, row.ID); err != nil {
t.Fatalf("idempotent Disconnect: %v", err)
}
}
// TestDisconnect_RevokedRowNoOp covers the PR 4608 blocker: once a row is
// locally revoked, a second DELETE must be a pure no-op and must NOT call
// upstream again — otherwise a non-404 upstream error on the repeat would be
// surfaced as a 502 and break idempotency.
func TestDisconnect_RevokedRowNoOp(t *testing.T) {
t.Parallel()
store := newFakeStore()
sdkFake := &fakeSDK{}
svc := newTestService(t, sdkFake, store)
userID := mintUUID(30)
row := seedActive(store, userID, "notion", "ca_noop")
// First disconnect revokes upstream and marks the row revoked.
if err := svc.Disconnect(context.Background(), userID, row.ID); err != nil {
t.Fatalf("first Disconnect: %v", err)
}
if len(sdkFake.revoked) != 1 {
t.Fatalf("expected 1 upstream revoke, got %d", len(sdkFake.revoked))
}
// Now make the upstream fail with a NON-404 error. A correct no-op must not
// touch upstream, so this error must never surface.
sdkFake.revokeErr = &sdk.APIError{HTTPStatus: http.StatusInternalServerError}
sdkFake.deleteErr = &sdk.APIError{HTTPStatus: http.StatusInternalServerError}
if err := svc.Disconnect(context.Background(), userID, row.ID); err != nil {
t.Fatalf("second Disconnect on already-revoked row should be a no-op, got %v", err)
}
if len(sdkFake.revoked) != 1 {
t.Errorf("second disconnect must not call upstream revoke again, revoked=%v", sdkFake.revoked)
}
}
func TestDisconnect_UpstreamNotFoundIsIdempotent(t *testing.T) {
t.Parallel()
store := newFakeStore()
sdkFake := &fakeSDK{revokeErr: &sdk.APIError{HTTPStatus: http.StatusNotFound}}
svc := newTestService(t, sdkFake, store)
userID := mintUUID(8)
row := seedActive(store, userID, "notion", "ca_404")
if err := svc.Disconnect(context.Background(), userID, row.ID); err != nil {
t.Fatalf("Disconnect should treat upstream 404 as success, got %v", err)
}
}
func TestDisconnect_NotOwner(t *testing.T) {
t.Parallel()
store := newFakeStore()
svc := newTestService(t, &fakeSDK{}, store)
owner := mintUUID(9)
row := seedActive(store, owner, "notion", "ca_o")
attacker := mintUUID(10)
if err := svc.Disconnect(context.Background(), attacker, row.ID); !errors.Is(err, ErrConnectionNotFound) {
t.Fatalf("expected ErrConnectionNotFound for non-owner, got %v", err)
}
}
func TestCreateMCPSession_NoOpWhenEmpty(t *testing.T) {
t.Parallel()
sdkFake := &fakeSDK{}
svc := newTestService(t, sdkFake, newFakeStore())
sess, err := svc.CreateMCPSession(context.Background(), mintUUID(11))
if err != nil {
t.Fatalf("CreateMCPSession: %v", err)
}
if sess != nil {
t.Fatalf("expected nil session when no connections, got %+v", sess)
}
if sdkFake.createSessCalls != 0 {
t.Errorf("CreateSession should not be called when there are no connections")
}
}
func TestCreateMCPSession_PinsConnectedAccounts(t *testing.T) {
t.Parallel()
store := newFakeStore()
sdkFake := &fakeSDK{}
svc := newTestService(t, sdkFake, store)
userID := mintUUID(12)
seedActive(store, userID, "notion", "ca_pin")
sess, err := svc.CreateMCPSession(context.Background(), userID)
if err != nil {
t.Fatalf("CreateMCPSession: %v", err)
}
if sess == nil || sess.URL != "https://mcp.example/session" {
t.Fatalf("session = %+v", sess)
}
if sess.Headers["x-api-key"] != "secret" {
t.Errorf("headers = %+v", sess.Headers)
}
if sdkFake.lastSessReq.UserID != util.UUIDToString(userID) {
t.Errorf("session user id = %q", sdkFake.lastSessReq.UserID)
}
if got := sdkFake.lastSessReq.ConnectedAccounts["notion"]; got != "ca_pin" {
t.Errorf("connected_accounts pin = %v, want ca_pin", got)
}
}
func TestCallbackRedirect(t *testing.T) {
t.Parallel()
svc := newTestService(t, &fakeSDK{}, newFakeStore())
if got := svc.CallbackRedirect("notion", true); got != "https://app.multica.ai/settings?tab=integrations&connected=notion" {
t.Errorf("success redirect = %q", got)
}
if got := svc.CallbackRedirect("notion", false); got != "https://app.multica.ai/settings?tab=integrations&error=composio_connect_failed" {
t.Errorf("failure redirect = %q", got)
}
}
// seedActive inserts an active connection through the store and returns the row.
func seedActive(store *fakeStore, userID pgtype.UUID, slug, caID string) db.UserComposioConnection {
row, _ := store.UpsertUserComposioConnection(context.Background(), db.UpsertUserComposioConnectionParams{
UserID: userID,
ToolkitSlug: slug,
AuthConfigID: "ac_notion",
ConnectedAccountID: caID,
ComposioUserID: util.UUIDToString(userID),
})
return row
}

View File

@@ -1,92 +0,0 @@
package composio
import (
"crypto/hmac"
"crypto/sha256"
"encoding/base64"
"encoding/json"
"errors"
"strings"
"time"
)
// Signed-state errors. The handler maps all of them to a generic
// "connect failed" redirect so a tampered/expired state never leaks which
// check failed.
var (
// ErrStateMalformed is returned when the state token is not the expected
// "<payload>.<sig>" base64url shape.
ErrStateMalformed = errors.New("composio: state malformed")
// ErrStateSignature is returned when the HMAC signature does not match —
// the state was tampered with or signed by a different secret.
ErrStateSignature = errors.New("composio: state signature mismatch")
// ErrStateExpired is returned when the state's exp claim is in the past.
ErrStateExpired = errors.New("composio: state expired")
)
// stateClaims is the payload embedded in the signed connect-state. It carries
// exactly what CompleteCallback needs to attribute the callback to a user and
// toolkit without a server-side session table — the signature is what makes it
// trustworthy, the short exp is what bounds replay.
//
// Field names are single letters to keep the encoded token compact; they are
// an internal wire format, never exposed to clients.
type stateClaims struct {
UserID string `json:"u"`
ToolkitSlug string `json:"t"`
// AuthConfigID is the exact Composio auth_config_id resolved at BeginConnect
// and used to create the connect link. Signing it into the state lets
// CompleteCallback verify the returned account was created under THIS
// toolkit's auth config without re-resolving (which could fail-open). It is
// an opaque config handle (ac_…), not a credential.
AuthConfigID string `json:"a"`
Exp int64 `json:"e"`
}
// signState produces a URL-safe "<payload>.<sig>" token. payload is the
// base64url-encoded JSON claims; sig is the base64url-encoded HMAC-SHA256 of
// the payload under the service secret. We sign the encoded payload (not the
// raw struct) so verification re-derives the exact bytes that were signed.
func signState(secret []byte, claims stateClaims) (string, error) {
raw, err := json.Marshal(claims)
if err != nil {
return "", err
}
payload := base64.RawURLEncoding.EncodeToString(raw)
sig := signPayload(secret, payload)
return payload + "." + sig, nil
}
// verifyState validates the signature and expiry of a token produced by
// signState and returns the embedded claims. Signature is checked with a
// constant-time compare before the payload is trusted; expiry is checked
// against now.
func verifyState(secret []byte, token string, now time.Time) (stateClaims, error) {
payload, sig, found := strings.Cut(token, ".")
if !found || payload == "" || sig == "" {
return stateClaims{}, ErrStateMalformed
}
expected := signPayload(secret, payload)
if !hmac.Equal([]byte(sig), []byte(expected)) {
return stateClaims{}, ErrStateSignature
}
raw, err := base64.RawURLEncoding.DecodeString(payload)
if err != nil {
return stateClaims{}, ErrStateMalformed
}
var claims stateClaims
if err := json.Unmarshal(raw, &claims); err != nil {
return stateClaims{}, ErrStateMalformed
}
if now.Unix() > claims.Exp {
return stateClaims{}, ErrStateExpired
}
return claims, nil
}
// signPayload returns the base64url HMAC-SHA256 of payload under secret.
func signPayload(secret []byte, payload string) string {
mac := hmac.New(sha256.New, secret)
mac.Write([]byte(payload))
return base64.RawURLEncoding.EncodeToString(mac.Sum(nil))
}

View File

@@ -1,97 +0,0 @@
package composio
import (
"errors"
"strings"
"testing"
"time"
)
var testSecret = []byte("test-state-secret-0123456789")
func TestSignVerifyState_RoundTrip(t *testing.T) {
t.Parallel()
now := time.Unix(1_700_000_000, 0)
tok, err := signState(testSecret, stateClaims{
UserID: "11111111-1111-1111-1111-111111111111",
ToolkitSlug: "notion",
AuthConfigID: "ac_notion",
Exp: now.Add(5 * time.Minute).Unix(),
})
if err != nil {
t.Fatalf("signState: %v", err)
}
got, err := verifyState(testSecret, tok, now)
if err != nil {
t.Fatalf("verifyState: %v", err)
}
if got.UserID != "11111111-1111-1111-1111-111111111111" {
t.Errorf("user id = %q", got.UserID)
}
if got.ToolkitSlug != "notion" {
t.Errorf("toolkit slug = %q", got.ToolkitSlug)
}
if got.AuthConfigID != "ac_notion" {
t.Errorf("auth config id = %q", got.AuthConfigID)
}
}
func TestVerifyState_Expired(t *testing.T) {
t.Parallel()
now := time.Unix(1_700_000_000, 0)
tok, err := signState(testSecret, stateClaims{
UserID: "u",
ToolkitSlug: "notion",
Exp: now.Add(-time.Second).Unix(),
})
if err != nil {
t.Fatalf("signState: %v", err)
}
if _, err := verifyState(testSecret, tok, now); !errors.Is(err, ErrStateExpired) {
t.Fatalf("expected ErrStateExpired, got %v", err)
}
}
func TestVerifyState_Tampered(t *testing.T) {
t.Parallel()
now := time.Unix(1_700_000_000, 0)
tok, err := signState(testSecret, stateClaims{UserID: "u", ToolkitSlug: "notion", Exp: now.Add(time.Minute).Unix()})
if err != nil {
t.Fatalf("signState: %v", err)
}
// Flip a byte in the payload segment.
payload, sig, _ := strings.Cut(tok, ".")
tampered := payload[:len(payload)-1] + flipLastChar(payload) + "." + sig
if _, err := verifyState(testSecret, tampered, now); !errors.Is(err, ErrStateSignature) && !errors.Is(err, ErrStateMalformed) {
t.Fatalf("expected signature/malformed error, got %v", err)
}
}
func TestVerifyState_WrongSecret(t *testing.T) {
t.Parallel()
now := time.Unix(1_700_000_000, 0)
tok, _ := signState(testSecret, stateClaims{UserID: "u", ToolkitSlug: "notion", Exp: now.Add(time.Minute).Unix()})
if _, err := verifyState([]byte("a-different-secret"), tok, now); !errors.Is(err, ErrStateSignature) {
t.Fatalf("expected ErrStateSignature, got %v", err)
}
}
func TestVerifyState_Malformed(t *testing.T) {
t.Parallel()
now := time.Unix(1_700_000_000, 0)
for _, tok := range []string{"", "nodot", ".", "a.", ".b"} {
if _, err := verifyState(testSecret, tok, now); !errors.Is(err, ErrStateMalformed) {
t.Errorf("token %q: expected ErrStateMalformed, got %v", tok, err)
}
}
}
// flipLastChar returns a single replacement char different from the payload's
// last character so the tampered payload is guaranteed to differ.
func flipLastChar(payload string) string {
last := payload[len(payload)-1]
if last == 'A' {
return "B"
}
return "A"
}

View File

@@ -1,43 +1,20 @@
package middleware
import (
"net/http"
"strings"
)
import "net/http"
const cspBaseHeader = "default-src 'self'; " +
const cspHeader = "default-src 'self'; " +
"script-src 'self'; " +
"style-src 'self' 'unsafe-inline'; " +
"img-src 'self' https: data:; " +
"connect-src 'self' wss:; "
const cspHeader = cspBaseHeader +
"connect-src 'self' wss:; " +
"frame-ancestors 'none'; " +
"object-src 'none'; " +
"base-uri 'self'; " +
"form-action 'self'"
const attachmentPreviewCSPHeader = cspBaseHeader +
"frame-ancestors 'self'; " +
"object-src 'none'; " +
"base-uri 'self'; " +
"form-action 'self'"
func ContentSecurityPolicy(next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Security-Policy", contentSecurityPolicyForRequest(r))
w.Header().Set("Content-Security-Policy", cspHeader)
next.ServeHTTP(w, r)
})
}
func contentSecurityPolicyForRequest(r *http.Request) string {
if isAttachmentPreviewDocumentPath(r.URL.Path) {
return attachmentPreviewCSPHeader
}
return cspHeader
}
func isAttachmentPreviewDocumentPath(path string) bool {
return strings.HasPrefix(path, "/api/attachments/") &&
(strings.HasSuffix(path, "/download") || strings.HasSuffix(path, "/content"))
}

View File

@@ -17,48 +17,16 @@ func TestContentSecurityPolicy(t *testing.T) {
handler.ServeHTTP(rec, req)
csp := rec.Header().Get("Content-Security-Policy")
assertCSPDirectives(t, csp, []string{
if csp == "" {
t.Fatal("Content-Security-Policy header is missing")
}
required := []string{
"script-src 'self'",
"object-src 'none'",
"frame-ancestors 'none'",
"base-uri 'self'",
"form-action 'self'",
})
}
func TestContentSecurityPolicyAllowsSameOriginAttachmentPreviews(t *testing.T) {
handler := ContentSecurityPolicy(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
}))
for _, path := range []string{
"/api/attachments/019f0dae-0315-79b7-b653-f55d6af90403/download",
"/api/attachments/019f0dae-0315-79b7-b653-f55d6af90403/content",
} {
t.Run(path, func(t *testing.T) {
req := httptest.NewRequest(http.MethodGet, path, nil)
rec := httptest.NewRecorder()
handler.ServeHTTP(rec, req)
csp := rec.Header().Get("Content-Security-Policy")
assertCSPDirectives(t, csp, []string{
"script-src 'self'",
"object-src 'none'",
"frame-ancestors 'self'",
"base-uri 'self'",
"form-action 'self'",
})
if strings.Contains(csp, "frame-ancestors 'none'") {
t.Fatalf("attachment preview CSP must not block same-origin iframe embedding; got: %s", csp)
}
})
}
}
func assertCSPDirectives(t *testing.T, csp string, required []string) {
t.Helper()
if csp == "" {
t.Fatal("Content-Security-Policy header is missing")
}
for _, directive := range required {
if !strings.Contains(csp, directive) {

View File

@@ -1833,23 +1833,15 @@ func (s *TaskService) HandleFailedTasks(ctx context.Context, tasks []db.AgentTas
"error", checkErr,
)
} else if !hasActive {
updatedIssue, updateErr := s.Queries.UpdateIssueStatus(ctx, db.UpdateIssueStatusParams{
if _, updateErr := s.Queries.UpdateIssueStatus(ctx, db.UpdateIssueStatusParams{
ID: t.IssueID,
Status: "todo",
WorkspaceID: issue.WorkspaceID,
})
if updateErr != nil {
}); updateErr != nil {
slog.Warn("handle failed tasks: reset stuck issue failed",
"issue_id", issueKey,
"error", updateErr,
)
} else {
// This direct reset bypasses the HTTP UpdateIssue
// handler that normally emits issue:updated, so emit
// it here too. Without it the board / status-filter
// caches keep showing the issue as in_progress until
// the next write touches it (#4648 / MUL-3782).
s.broadcastIssueUpdated(updatedIssue, issue.Status)
}
}
}
@@ -2269,32 +2261,14 @@ func (s *TaskService) broadcastChatDone(ctx context.Context, task db.AgentTaskQu
})
}
// broadcastIssueUpdated publishes the issue:updated event the frontend's
// realtime reconcile (onIssueUpdated) relies on to move an issue between status
// columns / status filters and reconcile their bucket counts. prevStatus is the
// issue's status before the write so the client can gate that reconcile on
// status_changed.
//
// The `issue` payload is a map (issueToMap), which the workspace WS fanout
// (listeners.go SubscribeAll) marshals and broadcasts as-is — that is what
// drives the UI reconcile. Note this does NOT cover the full HTTP UpdateIssue
// side effects: the activity-log and inbox listeners type-assert `issue` to a
// handler.IssueResponse and skip a map, so a background status reset does not
// emit status-change activity / notifications. That is intentional for the
// realtime-staleness fix (#4648 / MUL-3782); folding those side effects in
// would mean unifying the payload type and is left as a follow-up.
func (s *TaskService) broadcastIssueUpdated(issue db.Issue, prevStatus string) {
func (s *TaskService) broadcastIssueUpdated(issue db.Issue) {
prefix := s.getIssuePrefix(issue.WorkspaceID)
s.Bus.Publish(events.Event{
Type: protocol.EventIssueUpdated,
WorkspaceID: util.UUIDToString(issue.WorkspaceID),
ActorType: "system",
ActorID: "",
Payload: map[string]any{
"issue": issueToMap(issue, prefix),
"status_changed": prevStatus != issue.Status,
"prev_status": prevStatus,
},
Payload: map[string]any{"issue": issueToMap(issue, prefix)},
})
}

View File

@@ -1,119 +0,0 @@
package service
import (
"context"
"testing"
"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgconn"
"github.com/multica-ai/multica/server/internal/events"
"github.com/multica-ai/multica/server/internal/util"
db "github.com/multica-ai/multica/server/pkg/db/generated"
"github.com/multica-ai/multica/server/pkg/protocol"
)
// noRowsDBTX makes every read return pgx.ErrNoRows so getIssuePrefix's
// GetWorkspace lookup falls back to an empty prefix without needing a DB. The
// helper under test still publishes regardless of the prefix result.
type noRowsDBTX struct{}
func (noRowsDBTX) Exec(context.Context, string, ...any) (pgconn.CommandTag, error) {
return pgconn.NewCommandTag(""), nil
}
func (noRowsDBTX) Query(context.Context, string, ...any) (pgx.Rows, error) {
return nil, pgx.ErrNoRows
}
func (noRowsDBTX) QueryRow(context.Context, string, ...any) pgx.Row { return noRow{} }
type noRow struct{}
func (noRow) Scan(...any) error { return pgx.ErrNoRows }
// TestBroadcastIssueUpdated_EmitsStatusChange pins the realtime contract behind
// #4648 / MUL-3782: when a background path resets an issue's status (e.g. the
// failed-task handler flipping a stuck in_progress issue back to todo), it must
// publish issue:updated with status_changed=true and the new status so the
// frontend's onIssueUpdated reconcile moves the card between status columns /
// filters instead of leaving it stale until the next unrelated write.
func TestBroadcastIssueUpdated_EmitsStatusChange(t *testing.T) {
bus := events.New()
var got []events.Event
bus.SubscribeAll(func(e events.Event) { got = append(got, e) })
svc := &TaskService{
Queries: db.New(noRowsDBTX{}),
Bus: bus,
}
issue := db.Issue{
ID: testUUID(1),
WorkspaceID: testUUID(2),
Number: 7,
Status: "todo",
}
svc.broadcastIssueUpdated(issue, "in_progress")
if len(got) != 1 {
t.Fatalf("expected exactly 1 published event, got %d", len(got))
}
e := got[0]
if e.Type != protocol.EventIssueUpdated {
t.Fatalf("expected event type %q, got %q", protocol.EventIssueUpdated, e.Type)
}
if e.WorkspaceID != util.UUIDToString(issue.WorkspaceID) {
t.Fatalf("workspace mismatch: got %q want %q", e.WorkspaceID, util.UUIDToString(issue.WorkspaceID))
}
payload, ok := e.Payload.(map[string]any)
if !ok {
t.Fatalf("payload is not map[string]any: %T", e.Payload)
}
if payload["status_changed"] != true {
t.Errorf("expected status_changed=true, got %v", payload["status_changed"])
}
if payload["prev_status"] != "in_progress" {
t.Errorf("expected prev_status=in_progress, got %v", payload["prev_status"])
}
issueMap, ok := payload["issue"].(map[string]any)
if !ok {
t.Fatalf("issue payload is not map[string]any: %T", payload["issue"])
}
if issueMap["status"] != "todo" {
t.Errorf("expected issue.status=todo, got %v", issueMap["status"])
}
if issueMap["id"] != util.UUIDToString(issue.ID) {
t.Errorf("issue.id mismatch: got %v want %q", issueMap["id"], util.UUIDToString(issue.ID))
}
}
// TestBroadcastIssueUpdated_NoStatusChange guards the gate: a same-status
// broadcast reports status_changed=false so the client skips the status-bucket
// reconcile for non-status field updates.
func TestBroadcastIssueUpdated_NoStatusChange(t *testing.T) {
bus := events.New()
var got []events.Event
bus.SubscribeAll(func(e events.Event) { got = append(got, e) })
svc := &TaskService{
Queries: db.New(noRowsDBTX{}),
Bus: bus,
}
issue := db.Issue{
ID: testUUID(1),
WorkspaceID: testUUID(2),
Status: "todo",
}
svc.broadcastIssueUpdated(issue, "todo")
if len(got) != 1 {
t.Fatalf("expected exactly 1 published event, got %d", len(got))
}
payload, ok := got[0].Payload.(map[string]any)
if !ok {
t.Fatalf("payload is not map[string]any: %T", got[0].Payload)
}
if payload["status_changed"] != false {
t.Errorf("expected status_changed=false, got %v", payload["status_changed"])
}
}

View File

@@ -1 +0,0 @@
DROP TABLE IF EXISTS user_composio_connection;

View File

@@ -1,38 +0,0 @@
-- Composio integration (Stage 2 MVP): one row per user-connected Composio
-- account. The row is the local mirror of a Composio "connected account" so the
-- product can list / disconnect connections and build per-user MCP sessions
-- without round-tripping Composio on every read.
--
-- No foreign keys / cascades by design: Multica enforces cross-table
-- relationships at the application layer (see migration 118 dropping the
-- agent_task_queue.initiator_user_id FK). user_id is a "user".id but is left
-- unconstrained here so a user delete does not require a migration-ordered
-- cascade across integration tables.
--
-- composio_user_id always equals the Multica user_id.String() — the
-- application keeps that mapping as an invariant so a Composio session can be
-- created from the Multica user id alone. It is stored explicitly so a future
-- change to the mapping does not silently break already-connected accounts.
CREATE TABLE user_composio_connection (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
user_id UUID NOT NULL,
toolkit_slug TEXT NOT NULL,
auth_config_id TEXT NOT NULL,
connected_account_id TEXT NOT NULL,
composio_user_id TEXT NOT NULL,
status TEXT NOT NULL DEFAULT 'active',
connected_at TIMESTAMPTZ NOT NULL DEFAULT now(),
last_used_at TIMESTAMPTZ,
created_at TIMESTAMPTZ NOT NULL DEFAULT now(),
updated_at TIMESTAMPTZ NOT NULL DEFAULT now(),
UNIQUE (user_id, connected_account_id)
);
-- The hot read path is "active connections for this user" (list endpoint and
-- MCP session builder both filter on user_id + status).
CREATE INDEX user_composio_connection_user_status_idx
ON user_composio_connection(user_id, status);
-- Webhook / callback paths look a row up by its Composio connected_account_id.
CREATE INDEX user_composio_connection_account_idx
ON user_composio_connection(connected_account_id);

View File

@@ -217,7 +217,7 @@ func DetectVersion(ctx context.Context, executablePath string) (string, error) {
// environment variables are deliberately omitted so the string is a hint
// about *what* users are extending, not a dump of the full command line.
var launchHeaders = map[string]string{
"antigravity": "agy -p (non-interactive)",
"antigravity": "agy -p (print mode)",
"claude": "claude (stream-json)",
"codebuddy": "codebuddy (stream-json)",
"codex": "codex app-server",

View File

@@ -2,12 +2,6 @@ package agent
import (
"context"
"fmt"
"os"
"path/filepath"
"runtime"
"strconv"
"strings"
"testing"
"time"
)
@@ -103,69 +97,6 @@ func TestDetectVersionFailsForMissingBinary(t *testing.T) {
}
}
// TestDetectVersionTimesOutOnHang guards MUL-3812: a CLI whose `--version`
// never returns (e.g. a brew-installed claude wedged by a bun regression) must
// not stall version detection forever. The daemon detects every runtime's
// version sequentially inside its blocking preflight, so an unbounded probe
// would leave the daemon stuck "starting" and *every* runtime on the host
// disconnected. detectCLIVersion must bound the probe and return an error so
// the registration loop isolates the broken runtime and the rest still
// register. The script also leaves an orphaned child holding the stdout pipe
// open after the parent is killed, exercising the cmd.WaitDelay path.
func TestDetectVersionTimesOutOnHang(t *testing.T) {
if runtime.GOOS == "windows" {
t.Skip("relies on a /bin/sh hang script")
}
dir := t.TempDir()
script := filepath.Join(dir, "hang.sh")
pidFile := filepath.Join(dir, "child.pid")
// The CLI hangs forever (`wait`) and backgrounds a child that inherits and
// holds our stdout pipe open even after the parent is killed on timeout —
// the exact case cmd.WaitDelay must cover. The child records its PID so we
// can reap it in Cleanup instead of leaking a 60s `sleep` into CI.
body := fmt.Sprintf("#!/bin/sh\nsleep 60 &\necho $! > %q\nwait\n", pidFile)
if err := os.WriteFile(script, []byte(body), 0o755); err != nil {
t.Fatalf("write hang script: %v", err)
}
t.Cleanup(func() {
data, err := os.ReadFile(pidFile)
if err != nil {
return // child never recorded its PID; nothing to reap
}
pid, err := strconv.Atoi(strings.TrimSpace(string(data)))
if err != nil {
return
}
if proc, err := os.FindProcess(pid); err == nil {
_ = proc.Kill()
}
})
orig := detectVersionTimeout
detectVersionTimeout = 200 * time.Millisecond
t.Cleanup(func() { detectVersionTimeout = orig })
done := make(chan error, 1)
start := time.Now()
go func() {
_, err := DetectVersion(context.Background(), script)
done <- err
}()
select {
case err := <-done:
if err == nil {
t.Fatal("expected an error from a hanging --version probe, got nil")
}
if elapsed := time.Since(start); elapsed > 5*time.Second {
t.Fatalf("detection took %v; expected it to be bounded by the timeout", elapsed)
}
case <-time.After(10 * time.Second):
t.Fatal("DetectVersion did not return: version probe is unbounded (regression of MUL-3812)")
}
}
func TestLaunchHeaderCoversAllSupportedBackends(t *testing.T) {
t.Parallel()
@@ -184,18 +115,6 @@ func TestLaunchHeaderCoversAllSupportedBackends(t *testing.T) {
}
}
func TestLaunchHeaderAntigravityAvoidsTextOnlyPrintModeLabel(t *testing.T) {
t.Parallel()
header := LaunchHeader("antigravity")
if header != "agy -p (non-interactive)" {
t.Fatalf("unexpected Antigravity launch header: %q", header)
}
if strings.Contains(header, "print mode") {
t.Fatalf("Antigravity launch header must not imply a text-only mode: %q", header)
}
}
func TestLaunchHeaderReturnsEmptyForUnknownType(t *testing.T) {
t.Parallel()
if header := LaunchHeader("made-up-agent"); header != "" {

View File

@@ -14,14 +14,12 @@ import (
)
// antigravityBackend implements Backend by spawning Google's Antigravity CLI
// with a one-shot prompt (`agy -p <prompt>`). Despite the upstream flag name,
// current agy print mode is still capable of running Antigravity tools; it is
// the daemon-compatible mode because `agy -i` requires an attached TTY. Unlike
// Claude / Codex / Cursor / Gemini, the Antigravity CLI does not expose a
// structured event stream — stdout is plain assistant text (intermediate "I
// will run X" lines and the final reply, all interleaved). The backend
// therefore streams stdout line-by-line as `MessageText` events and accumulates
// the same text as the final `Result.Output`.
// (`agy -p <prompt>`) in non-interactive print mode. Unlike Claude / Codex /
// Cursor / Gemini, the Antigravity CLI does not expose a structured event
// stream — stdout is plain assistant text (intermediate "I will run X" lines
// and the final reply, all interleaved). The backend therefore streams stdout
// line-by-line as `MessageText` events and accumulates the same text as the
// final `Result.Output`.
//
// Session resumption uses `--conversation <id>`. The conversation id is not
// emitted on stdout; we capture it by routing `--log-file` to a temp file and
@@ -156,7 +154,7 @@ func (b *antigravityBackend) Execute(ctx context.Context, prompt string, opts Ex
// success the user can't distinguish from a finished task (MUL-3570).
finalStatus = "timeout"
finalError = fmt.Sprintf(
"agy --print-timeout elapsed after %s waiting for the agent response; a long-running command likely outlived the print timeout",
"agy print mode timed out after %s waiting for the agent response; a long-running command likely outlived --print-timeout",
antigravityPrintTimeout(timeout),
)
} else if providerErr := antigravityProviderError(logPath); finalStatus == "completed" && providerErr != "" {
@@ -272,7 +270,7 @@ var antigravityBlockedArgs = map[string]blockedArgMode{
"-p": blockedWithValue,
"--print": blockedWithValue,
"--prompt": blockedWithValue,
"-i": blockedStandalone, // interactive mode requires a TTY and cannot run under the daemon
"-i": blockedStandalone, // interactive mode would block the daemon
"--prompt-interactive": blockedStandalone,
"-c": blockedStandalone, // resume via --conversation, not --continue
"--continue": blockedStandalone,
@@ -283,8 +281,7 @@ var antigravityBlockedArgs = map[string]blockedArgMode{
"--log-file": blockedWithValue, // daemon needs it for session capture
}
// buildAntigravityArgs assembles the argv for a daemon-compatible one-shot agy
// invocation.
// buildAntigravityArgs assembles the argv for a one-shot agy invocation.
//
// agy -p <prompt> --dangerously-skip-permissions [--model <display name>]
// --print-timeout <duration> --log-file <tmp>

View File

@@ -219,8 +219,6 @@ func TestBuildAntigravityArgsFiltersBlockedCustomArgs(t *testing.T) {
// resume-aware operation.
CustomArgs: []string{
"-p", "hijacked-prompt",
"-i",
"--prompt-interactive",
"--continue",
"-c",
"--conversation", "bad-id",
@@ -249,9 +247,6 @@ func TestBuildAntigravityArgsFiltersBlockedCustomArgs(t *testing.T) {
if strings.Contains(joined, "hijacked-prompt") {
t.Errorf("custom -p value leaked through filter: %v", args)
}
if strings.Contains(joined, "-i") || strings.Contains(joined, "--prompt-interactive") {
t.Errorf("interactive-mode flags leaked through filter: %v", args)
}
if strings.Contains(joined, "bad-id") {
t.Errorf("custom --conversation value leaked through filter: %v", args)
}
@@ -394,8 +389,8 @@ func TestAntigravityBackendPrintTimeoutSurfacesAsTimeout(t *testing.T) {
if result.Status != "timeout" {
t.Fatalf("expected status=timeout, got %q (error=%q)", result.Status, result.Error)
}
if !strings.Contains(result.Error, "agy --print-timeout elapsed") {
t.Errorf("expected error to explain the agy print timeout, got %q", result.Error)
if !strings.Contains(result.Error, "print mode timed out") {
t.Errorf("expected error to explain the print-mode timeout, got %q", result.Error)
}
// Narration streamed before the cut-off must still reach the result so
// the user sees how far the turn got.

View File

@@ -804,30 +804,9 @@ func writeMcpConfigToTemp(raw json.RawMessage) (string, error) {
return f.Name(), nil
}
// detectVersionTimeout bounds a single `<cli> --version` probe. Version
// detection runs inside the daemon's blocking preflight (registerRuntimesForWorkspace),
// so a CLI that never returns from `--version` — e.g. a brew-installed claude
// wedged by a bun regression (MUL-3812) — would otherwise stall the whole
// registration loop, the daemon would never flip /health from "starting" to
// "running", and *every* runtime on the host would appear disconnected. A real
// `--version` returns well under this bound even on a cold cache or with
// Windows AV scanning; the timeout exists only to fail a wedged probe fast and
// in isolation so the remaining runtimes still register. A var (not const) so
// tests can shrink it without waiting out the real bound.
var detectVersionTimeout = 10 * time.Second
func detectCLIVersion(ctx context.Context, execPath string) (string, error) {
ctx, cancel := context.WithTimeout(ctx, detectVersionTimeout)
defer cancel()
cmd := exec.CommandContext(ctx, execPath, "--version")
hideAgentWindow(cmd)
// exec.CommandContext only kills the direct child on timeout. A broken CLI
// (node/bun shim) can leave grandchildren that inherited and still hold our
// stdout pipe open, and cmd.Output() blocks in Wait() until that pipe
// closes — defeating the timeout above. WaitDelay forces the pipes shut and
// reaps shortly after the context fires so this call always returns.
cmd.WaitDelay = 2 * time.Second
data, err := cmd.Output()
if err != nil {
return "", fmt.Errorf("detect version for %s: %w", execPath, err)

View File

@@ -1,92 +0,0 @@
package composio
import (
"context"
"net/http"
"net/url"
"strconv"
"strings"
)
// AuthConfig mirrors a subset of a Composio auth config — the project-level
// record that defines HOW users authenticate with a toolkit (the OAuth client,
// API-key scheme, etc.). The connect-link flow needs its opaque `id` (ac_…);
// the other fields drive selection when a toolkit has more than one.
//
// Spec: https://docs.composio.dev/reference/v3/api-reference/auth-configs/getAuthConfigs
type AuthConfig struct {
ID string `json:"id"`
Name string `json:"name,omitempty"`
// Toolkit carries at least the slug (and a logo) the config belongs to.
Toolkit Toolkit `json:"toolkit"`
AuthScheme string `json:"auth_scheme,omitempty"`
// IsComposioManaged is true for Composio's managed OAuth app and false for a
// custom (bring-your-own client_id/secret) config — the white-label case.
IsComposioManaged bool `json:"is_composio_managed"`
// Status is "ENABLED" or "DISABLED". The list endpoint hides disabled
// configs by default (show_disabled=false).
Status string `json:"status,omitempty"`
CreatedAt string `json:"created_at,omitempty"`
LastUpdatedAt string `json:"last_updated_at,omitempty"`
}
// ListAuthConfigsRequest collects the optional filters of GET /auth_configs.
// Zero values are omitted from the query string.
type ListAuthConfigsRequest struct {
// ToolkitSlugs filters to specific toolkits; sent as a single
// comma-separated `toolkit_slug` query param per the v3 spec.
ToolkitSlugs []string
// IsComposioManaged, when non-nil, filters by managed vs custom configs.
IsComposioManaged *bool
// ShowDisabled includes disabled configs (default false = enabled only).
ShowDisabled bool
// Search matches auth configs by name or id.
Search string
// Limit is the page size (max 1000 upstream). 0 = upstream default.
Limit int
// Cursor pages through results.
Cursor string
}
// ListAuthConfigsResponse is the typed paginated response.
type ListAuthConfigsResponse struct {
Items []AuthConfig `json:"items"`
NextCursor string `json:"next_cursor,omitempty"`
TotalItems int `json:"total_items,omitempty"`
}
// ListAuthConfigs returns the auth configs registered in the project, with
// optional filters. The project is resolved from the x-api-key (a project API
// key authenticates to exactly one project), so no project id is passed.
func (c *Client) ListAuthConfigs(ctx context.Context, req ListAuthConfigsRequest) (*ListAuthConfigsResponse, error) {
q := url.Values{}
if len(req.ToolkitSlugs) > 0 {
q.Set("toolkit_slug", strings.Join(req.ToolkitSlugs, ","))
}
if req.IsComposioManaged != nil {
q.Set("is_composio_managed", strconv.FormatBool(*req.IsComposioManaged))
}
if req.ShowDisabled {
q.Set("show_disabled", "true")
}
if req.Search != "" {
q.Set("search", req.Search)
}
if req.Limit > 0 {
q.Set("limit", strconv.Itoa(req.Limit))
}
if req.Cursor != "" {
q.Set("cursor", req.Cursor)
}
path := "/auth_configs"
if encoded := q.Encode(); encoded != "" {
path += "?" + encoded
}
var out ListAuthConfigsResponse
if err := c.do(c.newRequest(ctx), http.MethodGet, path, &out); err != nil {
return nil, err
}
return &out, nil
}

View File

@@ -1,155 +0,0 @@
// Code generated by sqlc. DO NOT EDIT.
// versions:
// sqlc v1.31.1
// source: composio.sql
package db
import (
"context"
"github.com/jackc/pgx/v5/pgtype"
)
const getUserComposioConnection = `-- name: GetUserComposioConnection :one
SELECT id, user_id, toolkit_slug, auth_config_id, connected_account_id, composio_user_id, status, connected_at, last_used_at, created_at, updated_at FROM user_composio_connection
WHERE id = $1 AND user_id = $2
`
type GetUserComposioConnectionParams struct {
ID pgtype.UUID `json:"id"`
UserID pgtype.UUID `json:"user_id"`
}
// Owner-scoped lookup: a connection can only be read by the user who owns it,
// so one user cannot disconnect another's account by guessing the UUID.
func (q *Queries) GetUserComposioConnection(ctx context.Context, arg GetUserComposioConnectionParams) (UserComposioConnection, error) {
row := q.db.QueryRow(ctx, getUserComposioConnection, arg.ID, arg.UserID)
var i UserComposioConnection
err := row.Scan(
&i.ID,
&i.UserID,
&i.ToolkitSlug,
&i.AuthConfigID,
&i.ConnectedAccountID,
&i.ComposioUserID,
&i.Status,
&i.ConnectedAt,
&i.LastUsedAt,
&i.CreatedAt,
&i.UpdatedAt,
)
return i, err
}
const listActiveUserComposioConnections = `-- name: ListActiveUserComposioConnections :many
SELECT id, user_id, toolkit_slug, auth_config_id, connected_account_id, composio_user_id, status, connected_at, last_used_at, created_at, updated_at FROM user_composio_connection
WHERE user_id = $1 AND status = 'active'
ORDER BY connected_at DESC
`
func (q *Queries) ListActiveUserComposioConnections(ctx context.Context, userID pgtype.UUID) ([]UserComposioConnection, error) {
rows, err := q.db.Query(ctx, listActiveUserComposioConnections, userID)
if err != nil {
return nil, err
}
defer rows.Close()
items := []UserComposioConnection{}
for rows.Next() {
var i UserComposioConnection
if err := rows.Scan(
&i.ID,
&i.UserID,
&i.ToolkitSlug,
&i.AuthConfigID,
&i.ConnectedAccountID,
&i.ComposioUserID,
&i.Status,
&i.ConnectedAt,
&i.LastUsedAt,
&i.CreatedAt,
&i.UpdatedAt,
); err != nil {
return nil, err
}
items = append(items, i)
}
if err := rows.Err(); err != nil {
return nil, err
}
return items, nil
}
const markUserComposioConnectionRevoked = `-- name: MarkUserComposioConnectionRevoked :exec
UPDATE user_composio_connection
SET status = 'revoked', updated_at = now()
WHERE id = $1 AND user_id = $2
`
type MarkUserComposioConnectionRevokedParams struct {
ID pgtype.UUID `json:"id"`
UserID pgtype.UUID `json:"user_id"`
}
// Idempotent: re-running on an already-revoked row is a no-op write. Scoped to
// the owner for defense-in-depth.
func (q *Queries) MarkUserComposioConnectionRevoked(ctx context.Context, arg MarkUserComposioConnectionRevokedParams) error {
_, err := q.db.Exec(ctx, markUserComposioConnectionRevoked, arg.ID, arg.UserID)
return err
}
const upsertUserComposioConnection = `-- name: UpsertUserComposioConnection :one
INSERT INTO user_composio_connection (
user_id, toolkit_slug, auth_config_id, connected_account_id, composio_user_id, status
) VALUES (
$1, $2, $3, $4, $5, 'active'
)
ON CONFLICT (user_id, connected_account_id) DO UPDATE SET
toolkit_slug = EXCLUDED.toolkit_slug,
auth_config_id = EXCLUDED.auth_config_id,
composio_user_id = EXCLUDED.composio_user_id,
status = 'active',
updated_at = now()
RETURNING id, user_id, toolkit_slug, auth_config_id, connected_account_id, composio_user_id, status, connected_at, last_used_at, created_at, updated_at
`
type UpsertUserComposioConnectionParams struct {
UserID pgtype.UUID `json:"user_id"`
ToolkitSlug string `json:"toolkit_slug"`
AuthConfigID string `json:"auth_config_id"`
ConnectedAccountID string `json:"connected_account_id"`
ComposioUserID string `json:"composio_user_id"`
}
// =====================
// User Composio Connection
// =====================
// Idempotent on (user_id, connected_account_id): a duplicate callback for the
// same connected account re-activates the row instead of inserting a second
// one. connected_at is preserved on conflict (first-connect time); updated_at
// moves so the reactivation is observable.
func (q *Queries) UpsertUserComposioConnection(ctx context.Context, arg UpsertUserComposioConnectionParams) (UserComposioConnection, error) {
row := q.db.QueryRow(ctx, upsertUserComposioConnection,
arg.UserID,
arg.ToolkitSlug,
arg.AuthConfigID,
arg.ConnectedAccountID,
arg.ComposioUserID,
)
var i UserComposioConnection
err := row.Scan(
&i.ID,
&i.UserID,
&i.ToolkitSlug,
&i.AuthConfigID,
&i.ConnectedAccountID,
&i.ComposioUserID,
&i.Status,
&i.ConnectedAt,
&i.LastUsedAt,
&i.CreatedAt,
&i.UpdatedAt,
)
return i, err
}

View File

@@ -848,20 +848,6 @@ type User struct {
Timezone pgtype.Text `json:"timezone"`
}
type UserComposioConnection struct {
ID pgtype.UUID `json:"id"`
UserID pgtype.UUID `json:"user_id"`
ToolkitSlug string `json:"toolkit_slug"`
AuthConfigID string `json:"auth_config_id"`
ConnectedAccountID string `json:"connected_account_id"`
ComposioUserID string `json:"composio_user_id"`
Status string `json:"status"`
ConnectedAt pgtype.Timestamptz `json:"connected_at"`
LastUsedAt pgtype.Timestamptz `json:"last_used_at"`
CreatedAt pgtype.Timestamptz `json:"created_at"`
UpdatedAt pgtype.Timestamptz `json:"updated_at"`
}
type VerificationCode struct {
ID pgtype.UUID `json:"id"`
Email string `json:"email"`

View File

@@ -1,39 +0,0 @@
-- =====================
-- User Composio Connection
-- =====================
-- name: UpsertUserComposioConnection :one
-- Idempotent on (user_id, connected_account_id): a duplicate callback for the
-- same connected account re-activates the row instead of inserting a second
-- one. connected_at is preserved on conflict (first-connect time); updated_at
-- moves so the reactivation is observable.
INSERT INTO user_composio_connection (
user_id, toolkit_slug, auth_config_id, connected_account_id, composio_user_id, status
) VALUES (
$1, $2, $3, $4, $5, 'active'
)
ON CONFLICT (user_id, connected_account_id) DO UPDATE SET
toolkit_slug = EXCLUDED.toolkit_slug,
auth_config_id = EXCLUDED.auth_config_id,
composio_user_id = EXCLUDED.composio_user_id,
status = 'active',
updated_at = now()
RETURNING *;
-- name: ListActiveUserComposioConnections :many
SELECT * FROM user_composio_connection
WHERE user_id = $1 AND status = 'active'
ORDER BY connected_at DESC;
-- name: GetUserComposioConnection :one
-- Owner-scoped lookup: a connection can only be read by the user who owns it,
-- so one user cannot disconnect another's account by guessing the UUID.
SELECT * FROM user_composio_connection
WHERE id = $1 AND user_id = $2;
-- name: MarkUserComposioConnectionRevoked :exec
-- Idempotent: re-running on an already-revoked row is a no-op write. Scoped to
-- the owner for defense-in-depth.
UPDATE user_composio_connection
SET status = 'revoked', updated_at = now()
WHERE id = $1 AND user_id = $2;

View File

@@ -74,22 +74,6 @@ func TestRedactBearerToken(t *testing.T) {
}
}
// TestRedactBearerMCPToken is a regression guard for the Composio MCP session
// headers (MUL-3720): the SDK attaches the project key as `Bearer mcp_...` on
// some MCP transports, so the generic Bearer pattern must mask it before it can
// reach a log line or WS broadcast.
func TestRedactBearerMCPToken(t *testing.T) {
t.Parallel()
input := "connecting with Authorization: Bearer mcp_AbCdEf0123456789-_token"
got := Text(input)
if strings.Contains(got, "mcp_AbCdEf0123456789") {
t.Fatalf("Bearer mcp_ token not redacted: %s", got)
}
if !strings.Contains(got, "Bearer [REDACTED]") {
t.Fatalf("expected Bearer [REDACTED] placeholder, got: %s", got)
}
}
func TestRedactGenericCredentials(t *testing.T) {
t.Parallel()
cases := []struct {