mirror of
https://github.com/multica-ai/multica.git
synced 2026-06-18 12:18:45 +02:00
Compare commits
3 Commits
agent/lamb
...
refactor/u
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
6f20d05bb7 | ||
|
|
ff27142b69 | ||
|
|
96695a79c5 |
@@ -14,6 +14,7 @@ import { AgentDetailPage } from "./pages/agent-detail-page";
|
||||
import { RuntimeDetailPage } from "./pages/runtime-detail-page";
|
||||
import { IssuesPage } from "@multica/views/issues/components";
|
||||
import { ProjectsPage } from "@multica/views/projects/components";
|
||||
import { DashboardPage } from "@multica/views/dashboard";
|
||||
import { AutopilotsPage } from "@multica/views/autopilots/components";
|
||||
import { MyIssuesPage } from "@multica/views/my-issues";
|
||||
import { SkillsPage } from "@multica/views/skills";
|
||||
@@ -146,6 +147,11 @@ export const appRoutes: RouteObject[] = [
|
||||
handle: { title: "Agent" },
|
||||
},
|
||||
{ path: "inbox", element: <InboxPage />, handle: { title: "Inbox" } },
|
||||
{
|
||||
path: "usage",
|
||||
element: <DashboardPage />,
|
||||
handle: { title: "Usage" },
|
||||
},
|
||||
{
|
||||
path: "settings",
|
||||
element: (
|
||||
|
||||
1
apps/web/app/[workspaceSlug]/(dashboard)/usage/page.tsx
Normal file
1
apps/web/app/[workspaceSlug]/(dashboard)/usage/page.tsx
Normal file
@@ -0,0 +1 @@
|
||||
export { DashboardPage as default } from "@multica/views/dashboard";
|
||||
@@ -38,6 +38,9 @@ import type {
|
||||
RuntimeHourlyActivity,
|
||||
RuntimeUsageByAgent,
|
||||
RuntimeUsageByHour,
|
||||
DashboardUsageDaily,
|
||||
DashboardUsageByAgent,
|
||||
DashboardAgentRunTime,
|
||||
RuntimeUpdate,
|
||||
RuntimeModelListRequest,
|
||||
RuntimeLocalSkillListRequest,
|
||||
@@ -94,6 +97,9 @@ import {
|
||||
AttachmentResponseSchema,
|
||||
ChildIssuesResponseSchema,
|
||||
CommentsListSchema,
|
||||
DashboardAgentRunTimeListSchema,
|
||||
DashboardUsageByAgentListSchema,
|
||||
DashboardUsageDailyListSchema,
|
||||
EMPTY_ATTACHMENT,
|
||||
EMPTY_LIST_ISSUES_RESPONSE,
|
||||
EMPTY_TIMELINE_ENTRIES,
|
||||
@@ -700,6 +706,58 @@ export class ApiClient {
|
||||
return this.fetch(`/api/runtimes/${runtimeId}/usage/by-hour?${search}`);
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Workspace dashboard — three independent rollups for `/{slug}/dashboard`.
|
||||
// Each accepts an optional `project_id` to narrow the scope to one project.
|
||||
// Cost is computed client-side from the model pricing table (same contract
|
||||
// as the per-runtime endpoints above).
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
async getDashboardUsageDaily(
|
||||
params: { days?: number; project_id?: string | null },
|
||||
): Promise<DashboardUsageDaily[]> {
|
||||
const search = new URLSearchParams();
|
||||
if (params.days) search.set("days", String(params.days));
|
||||
if (params.project_id) search.set("project_id", params.project_id);
|
||||
const raw = await this.fetch<unknown>(`/api/dashboard/usage/daily?${search}`);
|
||||
return parseWithFallback<DashboardUsageDaily[]>(
|
||||
raw,
|
||||
DashboardUsageDailyListSchema,
|
||||
[],
|
||||
{ endpoint: "GET /api/dashboard/usage/daily" },
|
||||
);
|
||||
}
|
||||
|
||||
async getDashboardUsageByAgent(
|
||||
params: { days?: number; project_id?: string | null },
|
||||
): Promise<DashboardUsageByAgent[]> {
|
||||
const search = new URLSearchParams();
|
||||
if (params.days) search.set("days", String(params.days));
|
||||
if (params.project_id) search.set("project_id", params.project_id);
|
||||
const raw = await this.fetch<unknown>(`/api/dashboard/usage/by-agent?${search}`);
|
||||
return parseWithFallback<DashboardUsageByAgent[]>(
|
||||
raw,
|
||||
DashboardUsageByAgentListSchema,
|
||||
[],
|
||||
{ endpoint: "GET /api/dashboard/usage/by-agent" },
|
||||
);
|
||||
}
|
||||
|
||||
async getDashboardAgentRunTime(
|
||||
params: { days?: number; project_id?: string | null },
|
||||
): Promise<DashboardAgentRunTime[]> {
|
||||
const search = new URLSearchParams();
|
||||
if (params.days) search.set("days", String(params.days));
|
||||
if (params.project_id) search.set("project_id", params.project_id);
|
||||
const raw = await this.fetch<unknown>(`/api/dashboard/agent-runtime?${search}`);
|
||||
return parseWithFallback<DashboardAgentRunTime[]>(
|
||||
raw,
|
||||
DashboardAgentRunTimeListSchema,
|
||||
[],
|
||||
{ endpoint: "GET /api/dashboard/agent-runtime" },
|
||||
);
|
||||
}
|
||||
|
||||
async initiateUpdate(
|
||||
runtimeId: string,
|
||||
targetVersion: string,
|
||||
|
||||
@@ -169,3 +169,46 @@ export const SubscribersListSchema = z.array(SubscriberSchema);
|
||||
export const ChildIssuesResponseSchema = z.object({
|
||||
issues: z.array(IssueSchema).default([]),
|
||||
}).loose();
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Workspace dashboard schemas
|
||||
//
|
||||
// The dashboard hits three independent rollup endpoints. Each returns a flat
|
||||
// array, and every field is consumed by chart / KPI math — a missing number
|
||||
// silently degrades to NaN downstream, so we coerce missing numbers to 0.
|
||||
// String fields stay lenient (no enum narrowing) to survive future model /
|
||||
// agent ID drift.
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
const DashboardUsageDailySchema = z.object({
|
||||
date: z.string(),
|
||||
model: z.string(),
|
||||
input_tokens: z.number().default(0),
|
||||
output_tokens: z.number().default(0),
|
||||
cache_read_tokens: z.number().default(0),
|
||||
cache_write_tokens: z.number().default(0),
|
||||
task_count: z.number().default(0),
|
||||
}).loose();
|
||||
|
||||
export const DashboardUsageDailyListSchema = z.array(DashboardUsageDailySchema);
|
||||
|
||||
const DashboardUsageByAgentSchema = z.object({
|
||||
agent_id: z.string(),
|
||||
model: z.string(),
|
||||
input_tokens: z.number().default(0),
|
||||
output_tokens: z.number().default(0),
|
||||
cache_read_tokens: z.number().default(0),
|
||||
cache_write_tokens: z.number().default(0),
|
||||
task_count: z.number().default(0),
|
||||
}).loose();
|
||||
|
||||
export const DashboardUsageByAgentListSchema = z.array(DashboardUsageByAgentSchema);
|
||||
|
||||
const DashboardAgentRunTimeSchema = z.object({
|
||||
agent_id: z.string(),
|
||||
total_seconds: z.number().default(0),
|
||||
task_count: z.number().default(0),
|
||||
failed_count: z.number().default(0),
|
||||
}).loose();
|
||||
|
||||
export const DashboardAgentRunTimeListSchema = z.array(DashboardAgentRunTimeSchema);
|
||||
|
||||
1
packages/core/dashboard/index.ts
Normal file
1
packages/core/dashboard/index.ts
Normal file
@@ -0,0 +1 @@
|
||||
export * from "./queries";
|
||||
72
packages/core/dashboard/queries.ts
Normal file
72
packages/core/dashboard/queries.ts
Normal file
@@ -0,0 +1,72 @@
|
||||
import { queryOptions } from "@tanstack/react-query";
|
||||
import { api } from "../api";
|
||||
|
||||
// Workspace dashboard query options. All three endpoints share the same
|
||||
// (wsId, days, projectId) key shape so workspace switching, time-range
|
||||
// changes, and the project filter each invalidate the cache cleanly.
|
||||
//
|
||||
// The cache key includes `wsId` explicitly: TanStack Query already isolates
|
||||
// per workspace via the key, but threading wsId into the queryFn lets
|
||||
// callers fail fast (return [] on empty wsId) instead of issuing a request
|
||||
// the server would reject.
|
||||
//
|
||||
// `projectId` is normalised to `null` (not undefined / "all") so the
|
||||
// queryKey shape is stable across renders even when the dropdown sits on
|
||||
// "all projects".
|
||||
|
||||
export const dashboardKeys = {
|
||||
all: (wsId: string) => ["dashboard", wsId] as const,
|
||||
daily: (wsId: string, days: number, projectId: string | null) =>
|
||||
[...dashboardKeys.all(wsId), "daily", days, projectId] as const,
|
||||
byAgent: (wsId: string, days: number, projectId: string | null) =>
|
||||
[...dashboardKeys.all(wsId), "by-agent", days, projectId] as const,
|
||||
agentRuntime: (wsId: string, days: number, projectId: string | null) =>
|
||||
[...dashboardKeys.all(wsId), "agent-runtime", days, projectId] as const,
|
||||
};
|
||||
|
||||
// 60s staleTime matches the per-runtime usage queries — the data is rollup-
|
||||
// driven on the server (5-min rollup cadence) and the dashboard isn't a
|
||||
// real-time view, so background refetches every minute are plenty.
|
||||
const STALE_TIME = 60 * 1000;
|
||||
|
||||
export function dashboardUsageDailyOptions(
|
||||
wsId: string,
|
||||
days: number,
|
||||
projectId: string | null,
|
||||
) {
|
||||
return queryOptions({
|
||||
queryKey: dashboardKeys.daily(wsId, days, projectId),
|
||||
queryFn: () =>
|
||||
api.getDashboardUsageDaily({ days, project_id: projectId ?? undefined }),
|
||||
enabled: !!wsId,
|
||||
staleTime: STALE_TIME,
|
||||
});
|
||||
}
|
||||
|
||||
export function dashboardUsageByAgentOptions(
|
||||
wsId: string,
|
||||
days: number,
|
||||
projectId: string | null,
|
||||
) {
|
||||
return queryOptions({
|
||||
queryKey: dashboardKeys.byAgent(wsId, days, projectId),
|
||||
queryFn: () =>
|
||||
api.getDashboardUsageByAgent({ days, project_id: projectId ?? undefined }),
|
||||
enabled: !!wsId,
|
||||
staleTime: STALE_TIME,
|
||||
});
|
||||
}
|
||||
|
||||
export function dashboardAgentRunTimeOptions(
|
||||
wsId: string,
|
||||
days: number,
|
||||
projectId: string | null,
|
||||
) {
|
||||
return queryOptions({
|
||||
queryKey: dashboardKeys.agentRuntime(wsId, days, projectId),
|
||||
queryFn: () =>
|
||||
api.getDashboardAgentRunTime({ days, project_id: projectId ?? undefined }),
|
||||
enabled: !!wsId,
|
||||
staleTime: STALE_TIME,
|
||||
});
|
||||
}
|
||||
@@ -47,6 +47,8 @@
|
||||
"./runtimes/mutations": "./runtimes/mutations.ts",
|
||||
"./runtimes/hooks": "./runtimes/hooks.ts",
|
||||
"./runtimes/custom-pricing-store": "./runtimes/custom-pricing-store.ts",
|
||||
"./dashboard": "./dashboard/index.ts",
|
||||
"./dashboard/queries": "./dashboard/queries.ts",
|
||||
"./agents": "./agents/index.ts",
|
||||
"./agents/queries": "./agents/queries.ts",
|
||||
"./agents/derive-presence": "./agents/derive-presence.ts",
|
||||
|
||||
@@ -17,6 +17,7 @@ describe("paths.workspace() shape", () => {
|
||||
expect(new Set(parameterlessRoutes)).toEqual(
|
||||
new Set([
|
||||
"root",
|
||||
"usage",
|
||||
"issues",
|
||||
"projects",
|
||||
"autopilots",
|
||||
@@ -35,6 +36,7 @@ describe("paths.workspace() shape", () => {
|
||||
// Check that none of the parameterless paths embed a leaked literal
|
||||
// and that their second URL segment matches the method name's kebab-case.
|
||||
const expectedSegments: Array<[string, string]> = [
|
||||
["usage", "usage"],
|
||||
["issues", "issues"],
|
||||
["projects", "projects"],
|
||||
["autopilots", "autopilots"],
|
||||
|
||||
@@ -4,7 +4,8 @@ import { paths, isGlobalPath } from "./paths";
|
||||
describe("paths.workspace(slug)", () => {
|
||||
const ws = paths.workspace("acme");
|
||||
|
||||
it("builds dashboard paths with slug prefix", () => {
|
||||
it("builds workspace paths with slug prefix", () => {
|
||||
expect(ws.usage()).toBe("/acme/usage");
|
||||
expect(ws.issues()).toBe("/acme/issues");
|
||||
expect(ws.issueDetail("abc-123")).toBe("/acme/issues/abc-123");
|
||||
expect(ws.projects()).toBe("/acme/projects");
|
||||
|
||||
@@ -18,6 +18,7 @@ function workspaceScoped(slug: string) {
|
||||
const ws = `/${encode(slug)}`;
|
||||
return {
|
||||
root: () => `${ws}/issues`,
|
||||
usage: () => `${ws}/usage`,
|
||||
issues: () => `${ws}/issues`,
|
||||
issueDetail: (id: string) => `${ws}/issues/${encode(id)}`,
|
||||
projects: () => `${ws}/projects`,
|
||||
|
||||
@@ -70,7 +70,7 @@ export const RESERVED_SLUGS: ReadonlySet<string> = new Set([
|
||||
"search",
|
||||
"members",
|
||||
|
||||
// Dashboard / workspace route segments
|
||||
// Workspace route segments
|
||||
// Reserving each segment name prevents `/{slug}/{view}` from being visually
|
||||
// ambiguous (e.g. a workspace named `issues` would make `/issues/abc` mean two
|
||||
// things). `workspaces` covers the global `/workspaces/new` workspace-creation
|
||||
@@ -81,6 +81,7 @@ export const RESERVED_SLUGS: ReadonlySet<string> = new Set([
|
||||
"agents",
|
||||
"inbox",
|
||||
"my-issues",
|
||||
"usage",
|
||||
"runtimes",
|
||||
"skills",
|
||||
"settings",
|
||||
|
||||
@@ -289,6 +289,44 @@ export interface RuntimeUsageByHour {
|
||||
task_count: number;
|
||||
}
|
||||
|
||||
// One (date, model) bucket of token usage for the workspace dashboard.
|
||||
// Same shape as RuntimeUsage but workspace-scoped (no runtime_id, no
|
||||
// provider field on the wire) and optionally narrowed to a single project
|
||||
// on the server side. Cost stays client-side via the model pricing table.
|
||||
export interface DashboardUsageDaily {
|
||||
date: string;
|
||||
model: string;
|
||||
input_tokens: number;
|
||||
output_tokens: number;
|
||||
cache_read_tokens: number;
|
||||
cache_write_tokens: number;
|
||||
task_count: number;
|
||||
}
|
||||
|
||||
// Per-(agent, model) token totals for the workspace dashboard. Identical
|
||||
// wire shape to RuntimeUsageByAgent — the client folds by agent_id and
|
||||
// sums cost.
|
||||
export interface DashboardUsageByAgent {
|
||||
agent_id: string;
|
||||
model: string;
|
||||
input_tokens: number;
|
||||
output_tokens: number;
|
||||
cache_read_tokens: number;
|
||||
cache_write_tokens: number;
|
||||
task_count: number;
|
||||
}
|
||||
|
||||
// Per-agent total terminal-task run-time + counts. Powers the workspace
|
||||
// dashboard's "time by agent" list. failed_count is a subset of
|
||||
// task_count (failed tasks still contribute to total_seconds because
|
||||
// they consumed runtime to fail).
|
||||
export interface DashboardAgentRunTime {
|
||||
agent_id: string;
|
||||
total_seconds: number;
|
||||
task_count: number;
|
||||
failed_count: number;
|
||||
}
|
||||
|
||||
export type RuntimeUpdateStatus =
|
||||
| "pending"
|
||||
| "running"
|
||||
|
||||
@@ -23,6 +23,9 @@ export type {
|
||||
RuntimeHourlyActivity,
|
||||
RuntimeUsageByAgent,
|
||||
RuntimeUsageByHour,
|
||||
DashboardUsageDaily,
|
||||
DashboardUsageByAgent,
|
||||
DashboardAgentRunTime,
|
||||
RuntimeUpdate,
|
||||
RuntimeUpdateStatus,
|
||||
RuntimeModel,
|
||||
|
||||
503
packages/views/dashboard/components/dashboard-page.tsx
Normal file
503
packages/views/dashboard/components/dashboard-page.tsx
Normal file
@@ -0,0 +1,503 @@
|
||||
"use client";
|
||||
|
||||
import { useMemo, useState } from "react";
|
||||
import { BarChart3, FolderKanban } from "lucide-react";
|
||||
import { useQuery } from "@tanstack/react-query";
|
||||
import { Skeleton } from "@multica/ui/components/ui/skeleton";
|
||||
import {
|
||||
Select,
|
||||
SelectContent,
|
||||
SelectItem,
|
||||
SelectTrigger,
|
||||
SelectValue,
|
||||
} from "@multica/ui/components/ui/select";
|
||||
import { useWorkspaceId } from "@multica/core/hooks";
|
||||
import { agentListOptions } from "@multica/core/workspace/queries";
|
||||
import { projectListOptions } from "@multica/core/projects/queries";
|
||||
import {
|
||||
dashboardUsageDailyOptions,
|
||||
dashboardUsageByAgentOptions,
|
||||
dashboardAgentRunTimeOptions,
|
||||
} from "@multica/core/dashboard";
|
||||
import { useCustomPricingStore } from "@multica/core/runtimes/custom-pricing-store";
|
||||
import { PageHeader } from "../../layout/page-header";
|
||||
import { KpiCard } from "../../runtimes/components/shared";
|
||||
import { DailyCostChart } from "../../runtimes/components/charts";
|
||||
import { ProjectIcon } from "../../projects/components/project-icon";
|
||||
import { ActorAvatar } from "../../common/actor-avatar";
|
||||
import { formatTokens } from "../../runtimes/utils";
|
||||
import { useT } from "../../i18n";
|
||||
import {
|
||||
aggregateAgentTokens,
|
||||
aggregateDailyCost,
|
||||
computeDailyTotals,
|
||||
formatDuration,
|
||||
mergeAgentDashboardRows,
|
||||
type AgentDashboardRow,
|
||||
} from "../utils";
|
||||
|
||||
// One-place source of truth for the period selector. Matches the runtime
|
||||
// detail page so users see the same three options across the dashboards.
|
||||
const TIME_RANGES = [
|
||||
{ label: "7d", days: 7 },
|
||||
{ label: "30d", days: 30 },
|
||||
{ label: "90d", days: 90 },
|
||||
] as const;
|
||||
type TimeRange = (typeof TIME_RANGES)[number]["days"];
|
||||
|
||||
// Sentinel for "no project filter" — kept distinct from the empty string
|
||||
// so it survives a refactor that ever lets a project be slug-keyed.
|
||||
const ALL_PROJECTS = "__all__";
|
||||
|
||||
// Stable references — `data ?? []` would create a new empty array on
|
||||
// every render while the query is loading, which breaks useMemo's
|
||||
// reference-equality dep check and trips the exhaustive-deps lint rule.
|
||||
const EMPTY_DAILY: import("@multica/core/types").DashboardUsageDaily[] = [];
|
||||
const EMPTY_BY_AGENT: import("@multica/core/types").DashboardUsageByAgent[] = [];
|
||||
const EMPTY_RUNTIME: import("@multica/core/types").DashboardAgentRunTime[] = [];
|
||||
|
||||
function fmtMoney(n: number): string {
|
||||
if (n >= 100) return `$${n.toFixed(0)}`;
|
||||
return `$${n.toFixed(2)}`;
|
||||
}
|
||||
|
||||
// Local segmented control — same visual language the runtime usage section
|
||||
// uses for its period / tab toggles. shadcn's Tabs is wired for full tab
|
||||
// pages with ARIA semantics the compact toolbar pill doesn't need.
|
||||
function Segmented<T extends string | number>({
|
||||
value,
|
||||
onChange,
|
||||
options,
|
||||
}: {
|
||||
value: T;
|
||||
onChange: (v: T) => void;
|
||||
options: readonly { label: string; value: T }[];
|
||||
}) {
|
||||
return (
|
||||
<div className="inline-flex items-center gap-0.5 rounded-md bg-muted p-0.5">
|
||||
{options.map((o) => (
|
||||
<button
|
||||
key={String(o.value)}
|
||||
type="button"
|
||||
onClick={() => onChange(o.value)}
|
||||
className={`rounded-sm px-2.5 py-1 text-xs font-medium transition-colors ${
|
||||
o.value === value
|
||||
? "bg-background text-foreground shadow-sm"
|
||||
: "text-muted-foreground hover:text-foreground"
|
||||
}`}
|
||||
>
|
||||
{o.label}
|
||||
</button>
|
||||
))}
|
||||
</div>
|
||||
);
|
||||
}
|
||||
|
||||
/**
|
||||
* Workspace + project token / run-time dashboard.
|
||||
*
|
||||
* Lives at `/{slug}/dashboard`. Three independent rollups (daily cost,
|
||||
* per-agent tokens, per-agent run-time) feed four KPI tiles, a daily cost
|
||||
* chart, and a combined "by agent" list. A project dropdown narrows every
|
||||
* query to one project; the period selector applies to all three.
|
||||
*
|
||||
* Cost math runs client-side via the runtimes utils — keeps the dashboard
|
||||
* and the runtime page using one pricing table.
|
||||
*/
|
||||
export function DashboardPage() {
|
||||
const { t } = useT("usage");
|
||||
const wsId = useWorkspaceId();
|
||||
const [days, setDays] = useState<TimeRange>(30);
|
||||
const [projectValue, setProjectValue] = useState<string>(ALL_PROJECTS);
|
||||
|
||||
// The user can save model prices from the runtimes page; re-render when
|
||||
// they do so the dashboard reflects the new rates.
|
||||
useCustomPricingStore((s) => s.pricings);
|
||||
|
||||
const { data: projects = [] } = useQuery(projectListOptions(wsId));
|
||||
const { data: agents = [] } = useQuery(agentListOptions(wsId));
|
||||
|
||||
// Validate the picked project against the current workspace's list. A
|
||||
// stale UUID — left over from a project that's been deleted, or from the
|
||||
// previous workspace after a switch — would silently filter all three
|
||||
// queries to empty rows while the dropdown still reads "All projects".
|
||||
// Derive the effective filter so the API call matches the user-visible
|
||||
// selection.
|
||||
const projectId = useMemo(() => {
|
||||
if (projectValue === ALL_PROJECTS) return null;
|
||||
return projects.some((p) => p.id === projectValue) ? projectValue : null;
|
||||
}, [projectValue, projects]);
|
||||
|
||||
const dailyQuery = useQuery(dashboardUsageDailyOptions(wsId, days, projectId));
|
||||
const byAgentQuery = useQuery(dashboardUsageByAgentOptions(wsId, days, projectId));
|
||||
const runTimeQuery = useQuery(dashboardAgentRunTimeOptions(wsId, days, projectId));
|
||||
|
||||
const dailyUsage = dailyQuery.data ?? EMPTY_DAILY;
|
||||
const byAgentUsage = byAgentQuery.data ?? EMPTY_BY_AGENT;
|
||||
const runTimeRows = runTimeQuery.data ?? EMPTY_RUNTIME;
|
||||
|
||||
const isLoading =
|
||||
dailyQuery.isLoading || byAgentQuery.isLoading || runTimeQuery.isLoading;
|
||||
|
||||
// Three independent rollups, but the empty-state is one decision — only
|
||||
// show "no data yet" when ALL three came back empty so a project with
|
||||
// tokens but no runs doesn't look broken.
|
||||
const hasNoData =
|
||||
!isLoading &&
|
||||
dailyUsage.length === 0 &&
|
||||
byAgentUsage.length === 0 &&
|
||||
runTimeRows.length === 0;
|
||||
|
||||
// Cost / token math — re-derived when usage, days, or pricings change.
|
||||
const totals = useMemo(() => computeDailyTotals(dailyUsage), [dailyUsage]);
|
||||
const dailyCost = useMemo(() => aggregateDailyCost(dailyUsage), [dailyUsage]);
|
||||
const agentTokenRows = useMemo(
|
||||
() => aggregateAgentTokens(byAgentUsage),
|
||||
[byAgentUsage],
|
||||
);
|
||||
|
||||
// Run-time totals — taskCount + failedCount summed for the KPI row.
|
||||
const runTimeTotals = useMemo(() => {
|
||||
let totalSeconds = 0;
|
||||
let taskCount = 0;
|
||||
let failedCount = 0;
|
||||
for (const r of runTimeRows) {
|
||||
totalSeconds += r.total_seconds;
|
||||
taskCount += r.task_count;
|
||||
failedCount += r.failed_count;
|
||||
}
|
||||
return { totalSeconds, taskCount, failedCount };
|
||||
}, [runTimeRows]);
|
||||
|
||||
const agentRows = useMemo(
|
||||
() => mergeAgentDashboardRows(agentTokenRows, runTimeRows),
|
||||
[agentTokenRows, runTimeRows],
|
||||
);
|
||||
|
||||
return (
|
||||
<div className="flex h-full flex-col">
|
||||
<PageHeader className="justify-between px-5">
|
||||
<div className="flex items-center gap-2">
|
||||
<BarChart3 className="h-4 w-4 text-muted-foreground" />
|
||||
<h1 className="text-sm font-medium">{t(($) => $.title)}</h1>
|
||||
</div>
|
||||
<div className="flex items-center gap-2">
|
||||
<ProjectFilter
|
||||
projects={projects}
|
||||
value={projectValue}
|
||||
onChange={setProjectValue}
|
||||
/>
|
||||
<Segmented
|
||||
value={days}
|
||||
onChange={setDays}
|
||||
options={TIME_RANGES.map((r) => ({ label: r.label, value: r.days }))}
|
||||
/>
|
||||
</div>
|
||||
</PageHeader>
|
||||
|
||||
<div className="flex-1 overflow-y-auto">
|
||||
<div className="mx-auto max-w-6xl space-y-5 p-6">
|
||||
<p className="text-xs text-muted-foreground">{t(($) => $.subtitle)}</p>
|
||||
|
||||
{isLoading ? (
|
||||
<DashboardSkeleton />
|
||||
) : hasNoData ? (
|
||||
<DashboardEmpty />
|
||||
) : (
|
||||
<>
|
||||
{/* KPI row — same 3-divide-x card grid the runtime usage
|
||||
section uses, expanded to four tiles. */}
|
||||
<div className="grid grid-cols-1 divide-y rounded-lg border bg-card sm:grid-cols-2 sm:divide-x sm:divide-y-0 lg:grid-cols-4">
|
||||
<KpiCard
|
||||
label={t(($) => $.kpi.cost_label, { days })}
|
||||
value={fmtMoney(totals.cost)}
|
||||
/>
|
||||
<KpiCard
|
||||
label={t(($) => $.kpi.tokens_label, { days })}
|
||||
value={formatTokens(
|
||||
totals.input + totals.output + totals.cacheRead + totals.cacheWrite,
|
||||
)}
|
||||
hint={t(($) => $.kpi.tokens_hint, {
|
||||
input: formatTokens(totals.input),
|
||||
output: formatTokens(totals.output),
|
||||
})}
|
||||
/>
|
||||
<KpiCard
|
||||
label={t(($) => $.kpi.run_time_label, { days })}
|
||||
value={formatDuration(
|
||||
runTimeTotals.totalSeconds,
|
||||
t(($) => $.duration.less_than_minute),
|
||||
)}
|
||||
hint={t(($) => $.kpi.run_time_hint, {
|
||||
tasks: runTimeTotals.taskCount,
|
||||
})}
|
||||
/>
|
||||
<KpiCard
|
||||
label={t(($) => $.kpi.tasks_label, { days })}
|
||||
value={String(runTimeTotals.taskCount)}
|
||||
hint={t(($) => $.kpi.tasks_hint, {
|
||||
failed: runTimeTotals.failedCount,
|
||||
})}
|
||||
accent={runTimeTotals.failedCount > 0 ? "default" : "default"}
|
||||
/>
|
||||
</div>
|
||||
|
||||
{/* Daily cost chart — reuses the runtime DailyCostChart. */}
|
||||
<DailyCostBlock dailyCost={dailyCost} />
|
||||
|
||||
{/* Per-agent leaderboard — user picks the ranking metric;
|
||||
the progress bar and column emphasis follow the metric. */}
|
||||
<Leaderboard
|
||||
rows={agentRows}
|
||||
agents={agents}
|
||||
lessThanMinuteLabel={t(($) => $.duration.less_than_minute)}
|
||||
/>
|
||||
</>
|
||||
)}
|
||||
</div>
|
||||
</div>
|
||||
</div>
|
||||
);
|
||||
}
|
||||
|
||||
function ProjectFilter({
|
||||
projects,
|
||||
value,
|
||||
onChange,
|
||||
}: {
|
||||
projects: { id: string; title: string; icon: string | null }[];
|
||||
value: string;
|
||||
onChange: (v: string) => void;
|
||||
}) {
|
||||
const { t } = useT("usage");
|
||||
const allLabel = t(($) => $.filter.all_projects);
|
||||
const selected = projects.find((p) => p.id === value);
|
||||
const selectedTitle =
|
||||
value === ALL_PROJECTS ? allLabel : selected?.title ?? allLabel;
|
||||
|
||||
return (
|
||||
<Select
|
||||
value={value}
|
||||
onValueChange={(v) => onChange(v ?? ALL_PROJECTS)}
|
||||
>
|
||||
<SelectTrigger size="sm" className="min-w-[180px]">
|
||||
<SelectValue>
|
||||
{() => (
|
||||
<>
|
||||
{selected ? (
|
||||
<ProjectIcon project={selected} size="sm" />
|
||||
) : (
|
||||
<FolderKanban className="h-3.5 w-3.5 shrink-0 text-muted-foreground" />
|
||||
)}
|
||||
<span className="truncate">{selectedTitle}</span>
|
||||
</>
|
||||
)}
|
||||
</SelectValue>
|
||||
</SelectTrigger>
|
||||
{/* alignItemWithTrigger=false: the default aligns the *selected* item
|
||||
to the trigger, which pushes "All projects" above the trigger and
|
||||
clips it off-screen when the usage header sits at the top of the
|
||||
viewport. Anchor the dropdown to the bottom of the trigger so
|
||||
every entry stays reachable.
|
||||
max-h-72: cap the dropdown so a long project list scrolls instead
|
||||
of stretching to the bottom of the window. */}
|
||||
<SelectContent align="start" alignItemWithTrigger={false} className="max-h-72">
|
||||
<SelectItem value={ALL_PROJECTS}>
|
||||
<FolderKanban className="h-3.5 w-3.5 shrink-0 text-muted-foreground" />
|
||||
<span className="truncate">{allLabel}</span>
|
||||
</SelectItem>
|
||||
{projects.map((p) => (
|
||||
<SelectItem key={p.id} value={p.id}>
|
||||
<ProjectIcon project={p} size="sm" />
|
||||
<span className="truncate">{p.title}</span>
|
||||
</SelectItem>
|
||||
))}
|
||||
</SelectContent>
|
||||
</Select>
|
||||
);
|
||||
}
|
||||
|
||||
function DailyCostBlock({
|
||||
dailyCost,
|
||||
}: {
|
||||
dailyCost: ReturnType<typeof aggregateDailyCost>;
|
||||
}) {
|
||||
const { t } = useT("usage");
|
||||
const total = dailyCost.reduce((sum, d) => sum + d.total, 0);
|
||||
return (
|
||||
<div className="rounded-lg border bg-card p-4">
|
||||
<div className="mb-3 flex items-center justify-between">
|
||||
<h4 className="text-sm font-semibold">{t(($) => $.daily.title)}</h4>
|
||||
</div>
|
||||
<div className="min-h-[240px]">
|
||||
{total === 0 ? (
|
||||
<div className="flex aspect-[3/1] flex-col items-center justify-center gap-2 rounded-md border border-dashed bg-muted/20 p-6 text-center">
|
||||
<BarChart3 className="h-5 w-5 text-muted-foreground/50" />
|
||||
<p className="text-xs text-muted-foreground">
|
||||
{t(($) => $.daily.no_data)}
|
||||
</p>
|
||||
</div>
|
||||
) : (
|
||||
<DailyCostChart data={dailyCost} />
|
||||
)}
|
||||
</div>
|
||||
</div>
|
||||
);
|
||||
}
|
||||
|
||||
// Which metric ranks the leaderboard. Drives row order, progress bar
|
||||
// width, and which column header is emphasised — keeping the three in
|
||||
// lockstep so the user always sees what the ranking actually measures.
|
||||
type LeaderboardSort = "tokens" | "cost" | "time" | "tasks";
|
||||
|
||||
const SORT_METRIC: Record<LeaderboardSort, (r: AgentDashboardRow) => number> = {
|
||||
tokens: (r) => r.tokens,
|
||||
cost: (r) => r.cost,
|
||||
time: (r) => r.seconds,
|
||||
tasks: (r) => r.taskCount,
|
||||
};
|
||||
|
||||
function Leaderboard({
|
||||
rows,
|
||||
agents,
|
||||
lessThanMinuteLabel,
|
||||
}: {
|
||||
rows: AgentDashboardRow[];
|
||||
agents: { id: string; name: string }[];
|
||||
lessThanMinuteLabel: string;
|
||||
}) {
|
||||
const { t } = useT("usage");
|
||||
const [sortBy, setSortBy] = useState<LeaderboardSort>("tokens");
|
||||
|
||||
const sortOptions = useMemo(
|
||||
() => [
|
||||
{ value: "tokens" as const, label: t(($) => $.leaderboard.header_tokens) },
|
||||
{ value: "cost" as const, label: t(($) => $.leaderboard.header_cost) },
|
||||
{ value: "time" as const, label: t(($) => $.leaderboard.header_time) },
|
||||
{ value: "tasks" as const, label: t(($) => $.leaderboard.header_tasks) },
|
||||
],
|
||||
[t],
|
||||
);
|
||||
|
||||
// Re-rank when the metric changes; keep the merged input untouched so
|
||||
// upstream `mergeAgentDashboardRows`'s tiebreaker (run time desc) still
|
||||
// applies inside an equal-bucket.
|
||||
const sortedRows = useMemo(() => {
|
||||
const metric = SORT_METRIC[sortBy];
|
||||
return [...rows].sort((a, b) => metric(b) - metric(a));
|
||||
}, [rows, sortBy]);
|
||||
|
||||
const maxValue = useMemo(() => {
|
||||
const metric = SORT_METRIC[sortBy];
|
||||
return sortedRows.reduce((m, r) => Math.max(m, metric(r)), 0);
|
||||
}, [sortedRows, sortBy]);
|
||||
|
||||
// Active column gets foreground text; others stay muted. Helps the user
|
||||
// see "this is what the bar is measuring" at a glance.
|
||||
const colClass = (key: LeaderboardSort) =>
|
||||
`text-right ${sortBy === key ? "text-foreground" : "text-muted-foreground"}`;
|
||||
|
||||
return (
|
||||
<div className="rounded-lg border bg-card">
|
||||
<div className="flex flex-wrap items-center justify-between gap-3 border-b px-4 pt-4 pb-3">
|
||||
<h4 className="text-sm font-semibold">{t(($) => $.leaderboard.title)}</h4>
|
||||
<div className="flex items-center gap-3">
|
||||
<Segmented value={sortBy} onChange={setSortBy} options={sortOptions} />
|
||||
<span className="text-xs text-muted-foreground">
|
||||
{t(($) => $.leaderboard.caption, { count: rows.length })}
|
||||
</span>
|
||||
</div>
|
||||
</div>
|
||||
{sortedRows.length === 0 ? (
|
||||
<p className="px-4 py-8 text-center text-xs text-muted-foreground">
|
||||
{t(($) => $.leaderboard.no_data)}
|
||||
</p>
|
||||
) : (
|
||||
<>
|
||||
<div className="grid grid-cols-[minmax(0,1.6fr)_minmax(0,1fr)_5rem_5rem_5rem_4rem] items-center gap-3 border-b px-4 py-2 text-xs font-medium text-muted-foreground">
|
||||
<span>{t(($) => $.leaderboard.header_agent)}</span>
|
||||
<span />
|
||||
<span className={colClass("tokens")}>{t(($) => $.leaderboard.header_tokens)}</span>
|
||||
<span className={colClass("cost")}>{t(($) => $.leaderboard.header_cost)}</span>
|
||||
<span className={colClass("time")}>{t(($) => $.leaderboard.header_time)}</span>
|
||||
<span className={colClass("tasks")}>{t(($) => $.leaderboard.header_tasks)}</span>
|
||||
</div>
|
||||
<div className="divide-y">
|
||||
{sortedRows.map((row) => {
|
||||
const agent = agents.find((a) => a.id === row.agentId);
|
||||
const value = SORT_METRIC[sortBy](row);
|
||||
const pct = maxValue > 0 ? (value / maxValue) * 100 : 0;
|
||||
return (
|
||||
<div
|
||||
key={row.agentId}
|
||||
className="grid grid-cols-[minmax(0,1.6fr)_minmax(0,1fr)_5rem_5rem_5rem_4rem] items-center gap-3 px-4 py-2"
|
||||
>
|
||||
<div className="flex min-w-0 items-center gap-2">
|
||||
<ActorAvatar
|
||||
actorType="agent"
|
||||
actorId={row.agentId}
|
||||
size={22}
|
||||
enableHoverCard
|
||||
/>
|
||||
<span className="cursor-pointer truncate text-sm font-medium">
|
||||
{agent?.name ?? row.agentId}
|
||||
</span>
|
||||
</div>
|
||||
<div className="relative h-2 overflow-hidden rounded-full bg-muted">
|
||||
<div
|
||||
className="h-full rounded-full bg-chart-1 transition-[width] duration-300 ease-out"
|
||||
style={{ width: `${pct}%` }}
|
||||
/>
|
||||
</div>
|
||||
<div
|
||||
className={`text-right text-xs tabular-nums ${sortBy === "tokens" ? "font-medium text-foreground" : "text-muted-foreground"}`}
|
||||
>
|
||||
{formatTokens(row.tokens)}
|
||||
</div>
|
||||
<div
|
||||
className={`text-right tabular-nums ${sortBy === "cost" ? "text-sm font-medium" : "text-xs text-muted-foreground"}`}
|
||||
>
|
||||
${row.cost.toFixed(2)}
|
||||
</div>
|
||||
<div
|
||||
className={`text-right text-xs tabular-nums ${sortBy === "time" ? "font-medium text-foreground" : "text-muted-foreground"}`}
|
||||
>
|
||||
{formatDuration(row.seconds, lessThanMinuteLabel)}
|
||||
</div>
|
||||
<div
|
||||
className={`text-right text-xs tabular-nums ${sortBy === "tasks" ? "font-medium text-foreground" : "text-muted-foreground"}`}
|
||||
>
|
||||
{row.taskCount}
|
||||
</div>
|
||||
</div>
|
||||
);
|
||||
})}
|
||||
</div>
|
||||
</>
|
||||
)}
|
||||
</div>
|
||||
);
|
||||
}
|
||||
|
||||
function DashboardSkeleton() {
|
||||
return (
|
||||
<div className="space-y-5">
|
||||
<Skeleton className="h-28 rounded-lg" />
|
||||
<Skeleton className="h-56 rounded-lg" />
|
||||
<Skeleton className="h-48 rounded-lg" />
|
||||
</div>
|
||||
);
|
||||
}
|
||||
|
||||
function DashboardEmpty() {
|
||||
const { t } = useT("usage");
|
||||
return (
|
||||
<div className="flex flex-col items-center rounded-lg border border-dashed py-12 text-center">
|
||||
<BarChart3 className="h-6 w-6 text-muted-foreground/40" />
|
||||
<p className="mt-3 text-sm font-medium">{t(($) => $.empty.title)}</p>
|
||||
<p className="mt-1 max-w-md text-xs text-muted-foreground">
|
||||
{t(($) => $.empty.body)}
|
||||
</p>
|
||||
</div>
|
||||
);
|
||||
}
|
||||
1
packages/views/dashboard/index.ts
Normal file
1
packages/views/dashboard/index.ts
Normal file
@@ -0,0 +1 @@
|
||||
export { DashboardPage } from "./components/dashboard-page";
|
||||
213
packages/views/dashboard/utils.test.ts
Normal file
213
packages/views/dashboard/utils.test.ts
Normal file
@@ -0,0 +1,213 @@
|
||||
import { describe, expect, it } from "vitest";
|
||||
import {
|
||||
aggregateAgentTokens,
|
||||
aggregateDailyCost,
|
||||
computeDailyTotals,
|
||||
formatDuration,
|
||||
mergeAgentDashboardRows,
|
||||
} from "./utils";
|
||||
|
||||
describe("aggregateDailyCost", () => {
|
||||
it("collapses multiple rows per day into one stack and sorts by date asc", () => {
|
||||
const result = aggregateDailyCost([
|
||||
{
|
||||
date: "2026-05-10",
|
||||
model: "claude-sonnet-4-6",
|
||||
input_tokens: 1_000_000,
|
||||
output_tokens: 500_000,
|
||||
cache_read_tokens: 0,
|
||||
cache_write_tokens: 0,
|
||||
task_count: 3,
|
||||
},
|
||||
{
|
||||
date: "2026-05-09",
|
||||
model: "claude-sonnet-4-6",
|
||||
input_tokens: 1_000_000,
|
||||
output_tokens: 0,
|
||||
cache_read_tokens: 0,
|
||||
cache_write_tokens: 0,
|
||||
task_count: 1,
|
||||
},
|
||||
]);
|
||||
|
||||
// Sort: oldest day first.
|
||||
expect(result.map((r) => r.date)).toEqual(["2026-05-09", "2026-05-10"]);
|
||||
// claude-sonnet-4-6: input $3/M, output $15/M.
|
||||
// 2026-05-09 → 1M input × $3 = $3 input, $0 output, $0 cache.
|
||||
expect(result[0]).toMatchObject({ input: 3, output: 0, cacheWrite: 0, total: 3 });
|
||||
// 2026-05-10 → $3 input + (0.5M × $15) = $7.5 output. Total $10.5.
|
||||
expect(result[1]).toMatchObject({ input: 3, output: 7.5, cacheWrite: 0, total: 10.5 });
|
||||
});
|
||||
|
||||
it("treats unmapped models as zero-cost", () => {
|
||||
const result = aggregateDailyCost([
|
||||
{
|
||||
date: "2026-05-10",
|
||||
model: "made-up-model",
|
||||
input_tokens: 999_999_999,
|
||||
output_tokens: 0,
|
||||
cache_read_tokens: 0,
|
||||
cache_write_tokens: 0,
|
||||
task_count: 0,
|
||||
},
|
||||
]);
|
||||
expect(result[0]?.total).toBe(0);
|
||||
});
|
||||
});
|
||||
|
||||
describe("aggregateAgentTokens", () => {
|
||||
it("folds per-(agent, model) rows into per-agent totals and sorts by cost desc", () => {
|
||||
const rows = aggregateAgentTokens([
|
||||
{
|
||||
agent_id: "small-spender",
|
||||
model: "claude-sonnet-4-6",
|
||||
input_tokens: 100_000,
|
||||
output_tokens: 0,
|
||||
cache_read_tokens: 0,
|
||||
cache_write_tokens: 0,
|
||||
task_count: 1,
|
||||
},
|
||||
{
|
||||
agent_id: "big-spender",
|
||||
model: "claude-sonnet-4-6",
|
||||
input_tokens: 5_000_000,
|
||||
output_tokens: 0,
|
||||
cache_read_tokens: 0,
|
||||
cache_write_tokens: 0,
|
||||
task_count: 3,
|
||||
},
|
||||
{
|
||||
agent_id: "big-spender",
|
||||
model: "claude-haiku-4-5",
|
||||
input_tokens: 1_000_000,
|
||||
output_tokens: 0,
|
||||
cache_read_tokens: 0,
|
||||
cache_write_tokens: 0,
|
||||
task_count: 2,
|
||||
},
|
||||
]);
|
||||
|
||||
expect(rows.map((r) => r.agentId)).toEqual(["big-spender", "small-spender"]);
|
||||
expect(rows[0]?.taskCount).toBe(5);
|
||||
// big-spender across two models — verify cost > small-spender's.
|
||||
expect(rows[0]!.cost).toBeGreaterThan(rows[1]!.cost);
|
||||
});
|
||||
});
|
||||
|
||||
describe("computeDailyTotals", () => {
|
||||
it("sums tokens across rows and adds estimated cost", () => {
|
||||
const totals = computeDailyTotals([
|
||||
{
|
||||
date: "2026-05-10",
|
||||
model: "claude-sonnet-4-6",
|
||||
input_tokens: 1_000_000,
|
||||
output_tokens: 0,
|
||||
cache_read_tokens: 0,
|
||||
cache_write_tokens: 0,
|
||||
task_count: 2,
|
||||
},
|
||||
{
|
||||
date: "2026-05-09",
|
||||
model: "claude-sonnet-4-6",
|
||||
input_tokens: 2_000_000,
|
||||
output_tokens: 0,
|
||||
cache_read_tokens: 0,
|
||||
cache_write_tokens: 0,
|
||||
task_count: 3,
|
||||
},
|
||||
]);
|
||||
expect(totals.input).toBe(3_000_000);
|
||||
expect(totals.cost).toBe(9); // 3M × $3/M
|
||||
expect(totals.taskCount).toBe(5);
|
||||
});
|
||||
});
|
||||
|
||||
describe("mergeAgentDashboardRows", () => {
|
||||
it("uses run-time rollup's per-agent task count, not the token sum", () => {
|
||||
// Token rollup returns two (agent, model) rows for the same task
|
||||
// (the agent ran one task that touched two models). The token-side
|
||||
// aggregator sums per-row task_count and lands at 2; the run-time
|
||||
// rollup correctly reports the underlying distinct count of 1.
|
||||
const tokenRows = [
|
||||
{
|
||||
agentId: "agent-a",
|
||||
tokens: 3_000_000,
|
||||
cost: 12,
|
||||
taskCount: 2, // overcounted because (model-1: 1) + (model-2: 1)
|
||||
},
|
||||
];
|
||||
const runTimeRows = [
|
||||
{
|
||||
agent_id: "agent-a",
|
||||
total_seconds: 600,
|
||||
task_count: 1, // truth: one task touched both models
|
||||
failed_count: 0,
|
||||
},
|
||||
];
|
||||
const merged = mergeAgentDashboardRows(tokenRows, runTimeRows);
|
||||
expect(merged).toHaveLength(1);
|
||||
expect(merged[0]!.taskCount).toBe(1);
|
||||
expect(merged[0]!.seconds).toBe(600);
|
||||
});
|
||||
|
||||
it("falls back to token count when no run-time row exists (in-flight task)", () => {
|
||||
// Tokens reported mid-run; task hasn't terminated yet so the run-time
|
||||
// rollup is silent on this agent. Keep the token-side estimate
|
||||
// instead of dropping the agent from the table entirely.
|
||||
const merged = mergeAgentDashboardRows(
|
||||
[{ agentId: "agent-b", tokens: 100, cost: 0.5, taskCount: 1 }],
|
||||
[],
|
||||
);
|
||||
expect(merged[0]!.taskCount).toBe(1);
|
||||
expect(merged[0]!.seconds).toBe(0);
|
||||
});
|
||||
|
||||
it("includes agents that have run-time but no tokens", () => {
|
||||
// Task errored before reporting any usage — run-time row exists but
|
||||
// there's no corresponding token row. Agent must still appear on the
|
||||
// list with zeroed-out token columns.
|
||||
const merged = mergeAgentDashboardRows(
|
||||
[],
|
||||
[{ agent_id: "agent-c", total_seconds: 30, task_count: 1, failed_count: 1 }],
|
||||
);
|
||||
expect(merged).toHaveLength(1);
|
||||
expect(merged[0]!.tokens).toBe(0);
|
||||
expect(merged[0]!.cost).toBe(0);
|
||||
expect(merged[0]!.taskCount).toBe(1);
|
||||
});
|
||||
|
||||
it("sorts by cost desc with run-time as a tiebreaker", () => {
|
||||
const merged = mergeAgentDashboardRows(
|
||||
[
|
||||
{ agentId: "low", tokens: 100, cost: 1, taskCount: 1 },
|
||||
{ agentId: "high", tokens: 100, cost: 9, taskCount: 1 },
|
||||
{ agentId: "zero-cost-long", tokens: 0, cost: 0, taskCount: 0 },
|
||||
],
|
||||
[
|
||||
{ agent_id: "zero-cost-long", total_seconds: 1000, task_count: 5, failed_count: 0 },
|
||||
],
|
||||
);
|
||||
expect(merged.map((r) => r.agentId)).toEqual(["high", "low", "zero-cost-long"]);
|
||||
});
|
||||
});
|
||||
|
||||
describe("formatDuration", () => {
|
||||
it("formats seconds-only durations", () => {
|
||||
expect(formatDuration(45, "<1m")).toBe("45s");
|
||||
});
|
||||
it("formats minutes and seconds when under one hour", () => {
|
||||
expect(formatDuration(150, "<1m")).toBe("2m 30s");
|
||||
expect(formatDuration(60, "<1m")).toBe("1m");
|
||||
});
|
||||
it("formats hours and minutes when under one day", () => {
|
||||
expect(formatDuration(3 * 3600 + 17 * 60, "<1m")).toBe("3h 17m");
|
||||
expect(formatDuration(3600, "<1m")).toBe("1h");
|
||||
});
|
||||
it("formats days and hours when more than 24 hours", () => {
|
||||
expect(formatDuration(2 * 86400 + 5 * 3600, "<1m")).toBe("2d 5h");
|
||||
});
|
||||
it("falls back to the supplied label for sub-second durations", () => {
|
||||
expect(formatDuration(0, "<1m")).toBe("<1m");
|
||||
expect(formatDuration(0.4, "<1m")).toBe("<1m");
|
||||
});
|
||||
});
|
||||
202
packages/views/dashboard/utils.ts
Normal file
202
packages/views/dashboard/utils.ts
Normal file
@@ -0,0 +1,202 @@
|
||||
import type {
|
||||
DashboardUsageDaily,
|
||||
DashboardUsageByAgent,
|
||||
DashboardAgentRunTime,
|
||||
} from "@multica/core/types";
|
||||
import { estimateCost, estimateCostBreakdown } from "../runtimes/utils";
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Dashboard data aggregations
|
||||
//
|
||||
// The workspace dashboard returns the same per-(date, model) and
|
||||
// per-(agent, model) shapes the runtime page does, so cost math reuses
|
||||
// `estimateCost` / `estimateCostBreakdown` from the runtimes utils. What
|
||||
// the runtimes view does with `aggregateByDate` (works on RuntimeUsage,
|
||||
// which carries a `provider` field) we replicate here with a tighter
|
||||
// type — fewer optional fields, less conditional logic on the consumer
|
||||
// side.
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
export interface DailyCostStack {
|
||||
date: string;
|
||||
label: string;
|
||||
input: number;
|
||||
output: number;
|
||||
cacheWrite: number;
|
||||
total: number;
|
||||
}
|
||||
|
||||
function formatDateLabel(d: string): string {
|
||||
// Anchor to local midnight so the formatted label matches the bucket the
|
||||
// server picked (which is already in workspace time). Pasting the raw
|
||||
// date as the body of `new Date()` would interpret it as UTC and shift
|
||||
// by the user's offset.
|
||||
const date = new Date(d + "T00:00:00");
|
||||
return `${date.getMonth() + 1}/${date.getDate()}`;
|
||||
}
|
||||
|
||||
// Per-(date, model) rows → 1 row per date with cost broken into the three
|
||||
// segments the stacked bar chart consumes. Stable sort by date asc so the
|
||||
// chart x-axis is left-to-right oldest-to-newest.
|
||||
export function aggregateDailyCost(usage: DashboardUsageDaily[]): DailyCostStack[] {
|
||||
const map = new Map<string, { input: number; output: number; cacheWrite: number }>();
|
||||
for (const u of usage) {
|
||||
const b = estimateCostBreakdown(u);
|
||||
const entry = map.get(u.date) ?? { input: 0, output: 0, cacheWrite: 0 };
|
||||
entry.input += b.input;
|
||||
entry.output += b.output;
|
||||
entry.cacheWrite += b.cacheWrite;
|
||||
map.set(u.date, entry);
|
||||
}
|
||||
const round = (n: number) => Math.round(n * 100) / 100;
|
||||
return [...map.entries()]
|
||||
.sort(([a], [b]) => a.localeCompare(b))
|
||||
.map(([date, s]) => {
|
||||
const input = round(s.input);
|
||||
const output = round(s.output);
|
||||
const cacheWrite = round(s.cacheWrite);
|
||||
return {
|
||||
date,
|
||||
label: formatDateLabel(date),
|
||||
input,
|
||||
output,
|
||||
cacheWrite,
|
||||
total: round(input + output + cacheWrite),
|
||||
};
|
||||
});
|
||||
}
|
||||
|
||||
export interface DashboardTokenTotals {
|
||||
input: number;
|
||||
output: number;
|
||||
cacheRead: number;
|
||||
cacheWrite: number;
|
||||
cost: number;
|
||||
taskCount: number;
|
||||
}
|
||||
|
||||
// Whole-window totals for the KPI tiles. taskCount sums DISTINCT task counts
|
||||
// per row — these are already collapsed server-side per (date, model), so
|
||||
// the value can over-count if the same task has tokens in two days; that's
|
||||
// acceptable for a KPI ("rough volume") and the per-agent run-time card
|
||||
// gives the precise figure.
|
||||
export function computeDailyTotals(usage: DashboardUsageDaily[]): DashboardTokenTotals {
|
||||
return usage.reduce<DashboardTokenTotals>(
|
||||
(acc, u) => ({
|
||||
input: acc.input + u.input_tokens,
|
||||
output: acc.output + u.output_tokens,
|
||||
cacheRead: acc.cacheRead + u.cache_read_tokens,
|
||||
cacheWrite: acc.cacheWrite + u.cache_write_tokens,
|
||||
cost: acc.cost + estimateCost(u),
|
||||
taskCount: acc.taskCount + u.task_count,
|
||||
}),
|
||||
{ input: 0, output: 0, cacheRead: 0, cacheWrite: 0, cost: 0, taskCount: 0 },
|
||||
);
|
||||
}
|
||||
|
||||
export interface AgentCostRow {
|
||||
agentId: string;
|
||||
tokens: number;
|
||||
cost: number;
|
||||
taskCount: number;
|
||||
}
|
||||
|
||||
// Fold per-(agent, model) rows into one row per agent. Cost is the sum
|
||||
// across this agent's models, which is the figure the user cares about.
|
||||
// Sort by cost desc so the heaviest spender lands first.
|
||||
export function aggregateAgentTokens(rows: DashboardUsageByAgent[]): AgentCostRow[] {
|
||||
const map = new Map<string, AgentCostRow>();
|
||||
for (const r of rows) {
|
||||
const entry = map.get(r.agent_id) ?? {
|
||||
agentId: r.agent_id,
|
||||
tokens: 0,
|
||||
cost: 0,
|
||||
taskCount: 0,
|
||||
};
|
||||
entry.tokens +=
|
||||
r.input_tokens + r.output_tokens + r.cache_read_tokens + r.cache_write_tokens;
|
||||
entry.cost += estimateCost(r);
|
||||
entry.taskCount += r.task_count;
|
||||
map.set(r.agent_id, entry);
|
||||
}
|
||||
return [...map.values()].sort((a, b) => b.cost - a.cost);
|
||||
}
|
||||
|
||||
export interface AgentDashboardRow {
|
||||
agentId: string;
|
||||
tokens: number;
|
||||
cost: number;
|
||||
seconds: number;
|
||||
taskCount: number;
|
||||
}
|
||||
|
||||
// Merge per-agent token totals with per-agent run-time totals into one
|
||||
// row per agent.
|
||||
//
|
||||
// taskCount comes from `runTimeRows` when available — that rollup is a
|
||||
// true per-agent distinct count (`COUNT(*)` on (agent, terminal-task) in
|
||||
// SQL). The token rollup's per-(agent, model) counts double-count a task
|
||||
// when it spans multiple models, so we only fall back to it for agents
|
||||
// with no terminal run yet (in-flight tasks reported tokens but haven't
|
||||
// completed). Sorted by cost desc, then run time desc.
|
||||
export function mergeAgentDashboardRows(
|
||||
tokenRows: AgentCostRow[],
|
||||
runTimeRows: DashboardAgentRunTime[],
|
||||
): AgentDashboardRow[] {
|
||||
const runTimeByAgent = new Map(
|
||||
runTimeRows.map((r) => [r.agent_id, r] as const),
|
||||
);
|
||||
const merged = new Map<string, AgentDashboardRow>();
|
||||
for (const r of tokenRows) {
|
||||
const rt = runTimeByAgent.get(r.agentId);
|
||||
merged.set(r.agentId, {
|
||||
agentId: r.agentId,
|
||||
tokens: r.tokens,
|
||||
cost: r.cost,
|
||||
seconds: rt?.total_seconds ?? 0,
|
||||
taskCount: rt ? rt.task_count : r.taskCount,
|
||||
});
|
||||
}
|
||||
// Agents with run-time rows but zero tokens still belong on the list
|
||||
// (a task that errored before producing usage). Their token columns
|
||||
// stay at 0.
|
||||
for (const r of runTimeRows) {
|
||||
if (merged.has(r.agent_id)) continue;
|
||||
merged.set(r.agent_id, {
|
||||
agentId: r.agent_id,
|
||||
tokens: 0,
|
||||
cost: 0,
|
||||
seconds: r.total_seconds,
|
||||
taskCount: r.task_count,
|
||||
});
|
||||
}
|
||||
return [...merged.values()].sort((a, b) => {
|
||||
if (b.cost !== a.cost) return b.cost - a.cost;
|
||||
return b.seconds - a.seconds;
|
||||
});
|
||||
}
|
||||
|
||||
// Compact human duration: "1h 23m" / "12m 30s" / "45s" / "<1m". Used for
|
||||
// the dashboard run-time KPI and the per-agent run-time column. Keeps two
|
||||
// segments max — three segments adds visual noise without precision the
|
||||
// dashboard actually needs.
|
||||
export function formatDuration(seconds: number, lessThanMinuteLabel: string): string {
|
||||
if (seconds < 0 || !Number.isFinite(seconds)) return lessThanMinuteLabel;
|
||||
if (seconds < 60) {
|
||||
if (seconds < 1) return lessThanMinuteLabel;
|
||||
return `${Math.round(seconds)}s`;
|
||||
}
|
||||
const totalMinutes = Math.floor(seconds / 60);
|
||||
const hours = Math.floor(totalMinutes / 60);
|
||||
const mins = totalMinutes % 60;
|
||||
if (hours === 0) {
|
||||
const secs = Math.floor(seconds) % 60;
|
||||
return secs > 0 ? `${mins}m ${secs}s` : `${mins}m`;
|
||||
}
|
||||
if (hours >= 24) {
|
||||
const days = Math.floor(hours / 24);
|
||||
const h = hours % 24;
|
||||
return h > 0 ? `${days}d ${h}h` : `${days}d`;
|
||||
}
|
||||
return mins > 0 ? `${hours}h ${mins}m` : `${hours}h`;
|
||||
}
|
||||
@@ -19,6 +19,7 @@ import { isGlobalPath } from "@multica/core/paths";
|
||||
* as intentional. Only "/issues/..." style paths get auto-prefixed.
|
||||
*/
|
||||
const WORKSPACE_ROUTE_SEGMENTS = new Set([
|
||||
"usage",
|
||||
"issues",
|
||||
"projects",
|
||||
"autopilots",
|
||||
|
||||
@@ -20,6 +20,7 @@ import type chat from "../locales/en/chat.json";
|
||||
import type modals from "../locales/en/modals.json";
|
||||
import type runtimes from "../locales/en/runtimes.json";
|
||||
import type layout from "../locales/en/layout.json";
|
||||
import type usage from "../locales/en/usage.json";
|
||||
|
||||
// Module augmentation enables i18next v26 selector API across the monorepo:
|
||||
// `t($ => $.signin.title)` resolves to the value in en/auth.json.
|
||||
@@ -54,6 +55,7 @@ declare module "i18next" {
|
||||
modals: typeof modals;
|
||||
runtimes: typeof runtimes;
|
||||
layout: typeof layout;
|
||||
usage: typeof usage;
|
||||
};
|
||||
enableSelector: true;
|
||||
}
|
||||
|
||||
@@ -90,6 +90,7 @@ vi.mock("@multica/core/paths", () => ({
|
||||
projects: () => "/acme/projects",
|
||||
autopilots: () => "/acme/autopilots",
|
||||
agents: () => "/acme/agents",
|
||||
usage: () => "/acme/usage",
|
||||
runtimes: () => "/acme/runtimes",
|
||||
skills: () => "/acme/skills",
|
||||
settings: () => "/acme/settings",
|
||||
|
||||
@@ -29,6 +29,7 @@ import {
|
||||
SquarePen,
|
||||
CircleUser,
|
||||
FolderKanban,
|
||||
BarChart3,
|
||||
X,
|
||||
Zap,
|
||||
} from "lucide-react";
|
||||
@@ -106,6 +107,7 @@ type NavKey =
|
||||
| "projects"
|
||||
| "autopilots"
|
||||
| "agents"
|
||||
| "usage"
|
||||
| "runtimes"
|
||||
| "skills"
|
||||
| "settings";
|
||||
@@ -118,6 +120,7 @@ type NavLabelKey =
|
||||
| "projects"
|
||||
| "autopilots"
|
||||
| "agents"
|
||||
| "usage"
|
||||
| "runtimes"
|
||||
| "skills"
|
||||
| "settings";
|
||||
@@ -132,6 +135,7 @@ const workspaceNav: { key: NavKey; labelKey: NavLabelKey; icon: typeof Inbox }[]
|
||||
{ key: "projects", labelKey: "projects", icon: FolderKanban },
|
||||
{ key: "autopilots", labelKey: "autopilots", icon: Zap },
|
||||
{ key: "agents", labelKey: "agents", icon: Bot },
|
||||
{ key: "usage", labelKey: "usage", icon: BarChart3 },
|
||||
];
|
||||
|
||||
const configureNav: { key: NavKey; labelKey: NavLabelKey; icon: typeof Inbox }[] = [
|
||||
|
||||
@@ -6,6 +6,7 @@
|
||||
"projects": "Projects",
|
||||
"autopilots": "Autopilot",
|
||||
"agents": "Agents",
|
||||
"usage": "Usage",
|
||||
"runtimes": "Runtimes",
|
||||
"skills": "Skills",
|
||||
"settings": "Settings"
|
||||
|
||||
39
packages/views/locales/en/usage.json
Normal file
39
packages/views/locales/en/usage.json
Normal file
@@ -0,0 +1,39 @@
|
||||
{
|
||||
"title": "Usage",
|
||||
"subtitle": "Token spend and agent activity across this workspace.",
|
||||
"filter": {
|
||||
"project_label": "Project",
|
||||
"all_projects": "All projects",
|
||||
"period_label": "Period"
|
||||
},
|
||||
"kpi": {
|
||||
"cost_label": "Cost · {{days}}D",
|
||||
"tokens_label": "Tokens · {{days}}D",
|
||||
"tokens_hint": "Input {{input}} · Output {{output}}",
|
||||
"run_time_label": "Run time · {{days}}D",
|
||||
"run_time_hint": "Across {{tasks}} tasks",
|
||||
"tasks_label": "Tasks · {{days}}D",
|
||||
"tasks_hint": "{{failed}} failed"
|
||||
},
|
||||
"daily": {
|
||||
"title": "Daily cost",
|
||||
"no_data": "No usage in this window."
|
||||
},
|
||||
"leaderboard": {
|
||||
"title": "Leaderboard",
|
||||
"caption": "{{count}} agents",
|
||||
"header_agent": "Agent",
|
||||
"header_tokens": "Tokens",
|
||||
"header_cost": "Cost",
|
||||
"header_time": "Time",
|
||||
"header_tasks": "Tasks",
|
||||
"no_data": "No agent activity in this window."
|
||||
},
|
||||
"empty": {
|
||||
"title": "No usage yet",
|
||||
"body": "Once agents start running tasks here, their token spend and run time will appear in this view."
|
||||
},
|
||||
"duration": {
|
||||
"less_than_minute": "<1m"
|
||||
}
|
||||
}
|
||||
@@ -20,6 +20,7 @@ import enChat from "./en/chat.json";
|
||||
import enModals from "./en/modals.json";
|
||||
import enRuntimes from "./en/runtimes.json";
|
||||
import enLayout from "./en/layout.json";
|
||||
import enUsage from "./en/usage.json";
|
||||
import zhHansCommon from "./zh-Hans/common.json";
|
||||
import zhHansAuth from "./zh-Hans/auth.json";
|
||||
import zhHansSettings from "./zh-Hans/settings.json";
|
||||
@@ -41,6 +42,7 @@ import zhHansChat from "./zh-Hans/chat.json";
|
||||
import zhHansModals from "./zh-Hans/modals.json";
|
||||
import zhHansRuntimes from "./zh-Hans/runtimes.json";
|
||||
import zhHansLayout from "./zh-Hans/layout.json";
|
||||
import zhHansUsage from "./zh-Hans/usage.json";
|
||||
|
||||
// Single source of truth for the resource bundle. Both apps (web layout +
|
||||
// desktop App.tsx) import from here so adding a locale or namespace happens
|
||||
@@ -68,6 +70,7 @@ export const RESOURCES: Record<SupportedLocale, LocaleResources> = {
|
||||
modals: enModals,
|
||||
runtimes: enRuntimes,
|
||||
layout: enLayout,
|
||||
usage: enUsage,
|
||||
},
|
||||
"zh-Hans": {
|
||||
common: zhHansCommon,
|
||||
@@ -91,5 +94,6 @@ export const RESOURCES: Record<SupportedLocale, LocaleResources> = {
|
||||
modals: zhHansModals,
|
||||
runtimes: zhHansRuntimes,
|
||||
layout: zhHansLayout,
|
||||
usage: zhHansUsage,
|
||||
},
|
||||
};
|
||||
|
||||
@@ -6,6 +6,7 @@
|
||||
"projects": "项目",
|
||||
"autopilots": "自动化",
|
||||
"agents": "智能体",
|
||||
"usage": "用量",
|
||||
"runtimes": "运行时",
|
||||
"skills": "Skill",
|
||||
"settings": "设置"
|
||||
|
||||
39
packages/views/locales/zh-Hans/usage.json
Normal file
39
packages/views/locales/zh-Hans/usage.json
Normal file
@@ -0,0 +1,39 @@
|
||||
{
|
||||
"title": "用量",
|
||||
"subtitle": "查看当前 Workspace 的 token 消耗和智能体运行情况。",
|
||||
"filter": {
|
||||
"project_label": "项目",
|
||||
"all_projects": "全部项目",
|
||||
"period_label": "时间范围"
|
||||
},
|
||||
"kpi": {
|
||||
"cost_label": "费用 · {{days}}天",
|
||||
"tokens_label": "Token · {{days}}天",
|
||||
"tokens_hint": "输入 {{input}} · 输出 {{output}}",
|
||||
"run_time_label": "运行时长 · {{days}}天",
|
||||
"run_time_hint": "共 {{tasks}} 个任务",
|
||||
"tasks_label": "任务数 · {{days}}天",
|
||||
"tasks_hint": "失败 {{failed}} 个"
|
||||
},
|
||||
"daily": {
|
||||
"title": "每日费用",
|
||||
"no_data": "所选时间范围内暂无消耗。"
|
||||
},
|
||||
"leaderboard": {
|
||||
"title": "排行榜",
|
||||
"caption": "{{count}} 个智能体",
|
||||
"header_agent": "智能体",
|
||||
"header_tokens": "Token",
|
||||
"header_cost": "费用",
|
||||
"header_time": "运行时长",
|
||||
"header_tasks": "任务数",
|
||||
"no_data": "所选时间范围内暂无智能体活动。"
|
||||
},
|
||||
"empty": {
|
||||
"title": "暂无消耗数据",
|
||||
"body": "当智能体在这里开始执行任务后,它们的 token 消耗和运行时长将出现在此处。"
|
||||
},
|
||||
"duration": {
|
||||
"less_than_minute": "<1分钟"
|
||||
}
|
||||
}
|
||||
@@ -27,6 +27,7 @@
|
||||
"./agents": "./agents/index.ts",
|
||||
"./inbox": "./inbox/index.ts",
|
||||
"./runtimes": "./runtimes/index.ts",
|
||||
"./dashboard": "./dashboard/index.ts",
|
||||
"./workspace/workspace-avatar": "./workspace/workspace-avatar.tsx",
|
||||
"./workspace/create-workspace-form": "./workspace/create-workspace-form.tsx",
|
||||
"./workspace/no-access-page": "./workspace/no-access-page.tsx",
|
||||
|
||||
131
server/cmd/backfill_task_usage_dashboard_daily/main.go
Normal file
131
server/cmd/backfill_task_usage_dashboard_daily/main.go
Normal file
@@ -0,0 +1,131 @@
|
||||
// Backfill_task_usage_dashboard_daily seeds the dashboard rollup table
|
||||
// (`task_usage_dashboard_daily`, migration 084) from historical
|
||||
// `task_usage` rows. Run once after migration 084 ships, BEFORE enabling
|
||||
// USAGE_DASHBOARD_ROLLUP_ENABLED and scheduling the pg_cron job.
|
||||
//
|
||||
// Mirrors the per-runtime backfill in `cmd/backfill_task_usage_daily`:
|
||||
// walk task_usage's time range in monthly slices, call the same idempotent
|
||||
// window primitive the cron path uses, then stamp the rollup-state
|
||||
// watermark so the cron tick doesn't reprocess history.
|
||||
//
|
||||
// Re-running is safe — the window function recomputes each dirty bucket
|
||||
// from raw and REPLACES the daily row, so a partially completed backfill
|
||||
// can be re-run without TRUNCATEing.
|
||||
package main
|
||||
|
||||
import (
|
||||
"context"
|
||||
"flag"
|
||||
"fmt"
|
||||
"log/slog"
|
||||
"os"
|
||||
"time"
|
||||
|
||||
"github.com/jackc/pgx/v5/pgtype"
|
||||
"github.com/jackc/pgx/v5/pgxpool"
|
||||
"github.com/multica-ai/multica/server/internal/logger"
|
||||
)
|
||||
|
||||
func main() {
|
||||
logger.Init()
|
||||
|
||||
var (
|
||||
dryRun = flag.Bool("dry-run", false, "log slices that would be processed without touching task_usage_dashboard_daily")
|
||||
monthsBack = flag.Int("months-back", 0, "limit backfill to the last N months (0 = all available history)")
|
||||
)
|
||||
flag.Parse()
|
||||
|
||||
dbURL := os.Getenv("DATABASE_URL")
|
||||
if dbURL == "" {
|
||||
dbURL = "postgres://multica:multica@localhost:5432/multica?sslmode=disable"
|
||||
}
|
||||
|
||||
ctx := context.Background()
|
||||
pool, err := pgxpool.New(ctx, dbURL)
|
||||
if err != nil {
|
||||
slog.Error("unable to connect to database", "error", err)
|
||||
os.Exit(1)
|
||||
}
|
||||
defer pool.Close()
|
||||
|
||||
if err := pool.Ping(ctx); err != nil {
|
||||
slog.Error("unable to ping database", "error", err)
|
||||
os.Exit(1)
|
||||
}
|
||||
|
||||
var minTS, maxTS pgtype.Timestamptz
|
||||
if err := pool.QueryRow(ctx, `SELECT MIN(created_at), MAX(created_at) FROM task_usage`).Scan(&minTS, &maxTS); err != nil {
|
||||
slog.Error("scan task_usage time range", "error", err)
|
||||
os.Exit(1)
|
||||
}
|
||||
if !minTS.Valid {
|
||||
slog.Info("task_usage is empty; nothing to backfill")
|
||||
stampWatermark(ctx, pool)
|
||||
return
|
||||
}
|
||||
|
||||
from := monthFloor(minTS.Time.UTC())
|
||||
end := monthFloor(maxTS.Time.UTC()).AddDate(0, 1, 0)
|
||||
|
||||
if *monthsBack > 0 {
|
||||
cutoff := monthFloor(time.Now().UTC()).AddDate(0, -(*monthsBack), 0)
|
||||
if cutoff.After(from) {
|
||||
from = cutoff
|
||||
}
|
||||
}
|
||||
|
||||
slog.Info("backfill range", "from", from.Format(time.RFC3339), "to", end.Format(time.RFC3339), "dry_run", *dryRun)
|
||||
|
||||
cursor := from
|
||||
var totalRows int64
|
||||
for cursor.Before(end) {
|
||||
next := cursor.AddDate(0, 1, 0)
|
||||
if *dryRun {
|
||||
slog.Info("would roll up slice", "from", cursor.Format(time.RFC3339), "to", next.Format(time.RFC3339))
|
||||
cursor = next
|
||||
continue
|
||||
}
|
||||
var rows int64
|
||||
err := pool.QueryRow(
|
||||
ctx,
|
||||
`SELECT rollup_task_usage_dashboard_daily_window($1::timestamptz, $2::timestamptz)`,
|
||||
cursor, next,
|
||||
).Scan(&rows)
|
||||
if err != nil {
|
||||
slog.Error("rollup slice failed", "from", cursor.Format(time.RFC3339), "to", next.Format(time.RFC3339), "error", err)
|
||||
os.Exit(1)
|
||||
}
|
||||
totalRows += rows
|
||||
slog.Info("rolled up slice", "from", cursor.Format(time.RFC3339), "to", next.Format(time.RFC3339), "rows_touched", rows)
|
||||
cursor = next
|
||||
}
|
||||
|
||||
if !*dryRun {
|
||||
stampWatermark(ctx, pool)
|
||||
}
|
||||
slog.Info("backfill complete", "total_rows_touched", totalRows)
|
||||
}
|
||||
|
||||
// stampWatermark moves the rollup state's watermark to (now() - 5 min) so
|
||||
// the cron tick that follows picks up only events newer than the backfill's
|
||||
// upper bound. Mirrors the per-runtime backfill's stampWatermark.
|
||||
func stampWatermark(ctx context.Context, pool *pgxpool.Pool) {
|
||||
tag, err := pool.Exec(ctx, `
|
||||
UPDATE task_usage_dashboard_rollup_state
|
||||
SET watermark_at = now() - INTERVAL '5 minutes'
|
||||
WHERE id = 1
|
||||
`)
|
||||
if err != nil {
|
||||
slog.Error("stamp watermark failed", "error", err)
|
||||
os.Exit(1)
|
||||
}
|
||||
if tag.RowsAffected() == 0 {
|
||||
slog.Warn("no rollup state row to stamp; was migration 084 applied?")
|
||||
return
|
||||
}
|
||||
fmt.Println("watermark stamped to now() - 5 minutes")
|
||||
}
|
||||
|
||||
func monthFloor(t time.Time) time.Time {
|
||||
return time.Date(t.Year(), t.Month(), 1, 0, 0, 0, 0, time.UTC)
|
||||
}
|
||||
@@ -106,6 +106,7 @@ func NewRouterWithOptions(pool *pgxpool.Pool, hub *realtime.Hub, bus *events.Bus
|
||||
AllowedEmails: splitAndTrim(os.Getenv("ALLOWED_EMAILS")),
|
||||
AllowedEmailDomains: splitAndTrim(os.Getenv("ALLOWED_EMAIL_DOMAINS")),
|
||||
UseDailyRollupForRuntimeUsage: os.Getenv("USAGE_DAILY_ROLLUP_ENABLED") == "true",
|
||||
UseDailyRollupForDashboard: os.Getenv("USAGE_DASHBOARD_ROLLUP_ENABLED") == "true",
|
||||
}
|
||||
h := handler.New(queries, pool, hub, bus, emailSvc, store, cfSigner, analyticsClient, signupConfig, daemonHub)
|
||||
if opts.DaemonWakeup != nil {
|
||||
@@ -468,6 +469,15 @@ func NewRouterWithOptions(pool *pgxpool.Pool, hub *realtime.Hub, bus *events.Bus
|
||||
r.Get("/summary", h.GetWorkspaceUsageSummary)
|
||||
})
|
||||
|
||||
// Dashboard — workspace-wide token + run-time rollups for the
|
||||
// "/{slug}/dashboard" page. Optional ?project_id filter scopes
|
||||
// the rollup to a single project.
|
||||
r.Route("/api/dashboard", func(r chi.Router) {
|
||||
r.Get("/usage/daily", h.GetDashboardUsageDaily)
|
||||
r.Get("/usage/by-agent", h.GetDashboardUsageByAgent)
|
||||
r.Get("/agent-runtime", h.GetDashboardAgentRunTime)
|
||||
})
|
||||
|
||||
// Runtimes
|
||||
r.Route("/api/runtimes", func(r chi.Router) {
|
||||
r.Get("/", h.ListAgentRuntimes)
|
||||
|
||||
@@ -2229,12 +2229,14 @@ func (d *Daemon) runTask(ctx context.Context, task Task, provider string, slot i
|
||||
switch result.Status {
|
||||
case "completed":
|
||||
if result.Output == "" {
|
||||
// Even an empty-output completion may have established a real
|
||||
// session — surface it through the blocked path so the next chat
|
||||
// turn can still resume from where this one left off.
|
||||
// The agent completed successfully but produced no text output.
|
||||
// This is valid — the agent may have done all its work via tool
|
||||
// calls (e.g. posting comments via CLI, pushing code). Treat as
|
||||
// a normal completion so the task is not incorrectly marked as
|
||||
// blocked.
|
||||
return TaskResult{
|
||||
Status: "blocked",
|
||||
Comment: fmt.Sprintf("%s returned empty output", provider),
|
||||
Status: "completed",
|
||||
Comment: "",
|
||||
SessionID: result.SessionID,
|
||||
WorkDir: env.WorkDir,
|
||||
EnvRoot: env.RootDir,
|
||||
|
||||
274
server/internal/handler/dashboard.go
Normal file
274
server/internal/handler/dashboard.go
Normal file
@@ -0,0 +1,274 @@
|
||||
package handler
|
||||
|
||||
import (
|
||||
"context"
|
||||
"net/http"
|
||||
|
||||
"github.com/jackc/pgx/v5/pgtype"
|
||||
"github.com/multica-ai/multica/server/internal/util"
|
||||
db "github.com/multica-ai/multica/server/pkg/db/generated"
|
||||
)
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Workspace / Project dashboard
|
||||
//
|
||||
// Three read endpoints power the workspace dashboard:
|
||||
//
|
||||
// GET /api/dashboard/usage/daily per-(date, model) token rows
|
||||
// GET /api/dashboard/usage/by-agent per-(agent, model) token rows
|
||||
// GET /api/dashboard/agent-runtime per-agent run-time + task counts
|
||||
//
|
||||
// All three accept ?days=N (defaults to 30, capped at 365) and an optional
|
||||
// ?project_id=<uuid> to scope the rollup to a single project. With no
|
||||
// project_id the data spans the whole workspace.
|
||||
//
|
||||
// Cost is computed client-side from a per-model pricing table — the model
|
||||
// dimension is intentionally preserved on the wire (same convention as the
|
||||
// per-runtime usage endpoints).
|
||||
//
|
||||
// Access control: workspace membership only — we don't filter by per-agent
|
||||
// visibility on the dashboard because token spend / run time are workspace-
|
||||
// level operational metrics. Agent-detail pages still gate on per-agent
|
||||
// access (see GetWorkspaceAgentRunCounts).
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
// parseProjectIDParam reads ?project_id=<uuid> off the URL. Returns a
|
||||
// pgtype.UUID with Valid=false when the param is absent so sqlc's nullable
|
||||
// argument resolves to SQL NULL and the WHERE clause degrades to "no
|
||||
// project filter". On a malformed UUID it writes a 400 and returns
|
||||
// ok=false; callers must return immediately.
|
||||
func parseProjectIDParam(w http.ResponseWriter, r *http.Request) (pgtype.UUID, bool) {
|
||||
raw := r.URL.Query().Get("project_id")
|
||||
if raw == "" {
|
||||
return pgtype.UUID{}, true
|
||||
}
|
||||
u, err := util.ParseUUID(raw)
|
||||
if err != nil {
|
||||
writeError(w, http.StatusBadRequest, "invalid project_id")
|
||||
return pgtype.UUID{}, false
|
||||
}
|
||||
return u, true
|
||||
}
|
||||
|
||||
// DashboardUsageDailyResponse is one (date, model) bucket. Cost-side math
|
||||
// happens on the client from a per-model pricing table; model stays on the
|
||||
// wire for that reason.
|
||||
type DashboardUsageDailyResponse struct {
|
||||
Date string `json:"date"`
|
||||
Model string `json:"model"`
|
||||
InputTokens int64 `json:"input_tokens"`
|
||||
OutputTokens int64 `json:"output_tokens"`
|
||||
CacheReadTokens int64 `json:"cache_read_tokens"`
|
||||
CacheWriteTokens int64 `json:"cache_write_tokens"`
|
||||
TaskCount int32 `json:"task_count"`
|
||||
}
|
||||
|
||||
// GetDashboardUsageDaily returns per-(date, model) token rows for the
|
||||
// workspace, optionally scoped to a project. When the dashboard rollup
|
||||
// is enabled (USAGE_DASHBOARD_ROLLUP_ENABLED=true) reads come from
|
||||
// `task_usage_dashboard_daily` (migration 084); otherwise from the raw
|
||||
// task_usage stream.
|
||||
func (h *Handler) GetDashboardUsageDaily(w http.ResponseWriter, r *http.Request) {
|
||||
workspaceID := h.resolveWorkspaceID(r)
|
||||
if _, ok := h.workspaceMember(w, r, workspaceID); !ok {
|
||||
return
|
||||
}
|
||||
projectID, ok := parseProjectIDParam(w, r)
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
since := parseSinceParam(r, 30)
|
||||
|
||||
resp, err := h.listDashboardUsageDaily(r.Context(), parseUUID(workspaceID), since, projectID)
|
||||
if err != nil {
|
||||
writeError(w, http.StatusInternalServerError, "failed to list usage")
|
||||
return
|
||||
}
|
||||
writeJSON(w, http.StatusOK, resp)
|
||||
}
|
||||
|
||||
func (h *Handler) listDashboardUsageDaily(
|
||||
ctx context.Context,
|
||||
workspaceID pgtype.UUID,
|
||||
since pgtype.Timestamptz,
|
||||
projectID pgtype.UUID,
|
||||
) ([]DashboardUsageDailyResponse, error) {
|
||||
if h.cfg.UseDailyRollupForDashboard {
|
||||
rows, err := h.Queries.ListDashboardUsageDailyRollup(ctx, db.ListDashboardUsageDailyRollupParams{
|
||||
WorkspaceID: workspaceID,
|
||||
Since: since,
|
||||
ProjectID: projectID,
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
resp := make([]DashboardUsageDailyResponse, len(rows))
|
||||
for i, row := range rows {
|
||||
resp[i] = DashboardUsageDailyResponse{
|
||||
Date: row.Date.Time.Format("2006-01-02"),
|
||||
Model: row.Model,
|
||||
InputTokens: row.InputTokens,
|
||||
OutputTokens: row.OutputTokens,
|
||||
CacheReadTokens: row.CacheReadTokens,
|
||||
CacheWriteTokens: row.CacheWriteTokens,
|
||||
TaskCount: row.TaskCount,
|
||||
}
|
||||
}
|
||||
return resp, nil
|
||||
}
|
||||
rows, err := h.Queries.ListDashboardUsageDaily(ctx, db.ListDashboardUsageDailyParams{
|
||||
WorkspaceID: workspaceID,
|
||||
Since: since,
|
||||
ProjectID: projectID,
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
resp := make([]DashboardUsageDailyResponse, len(rows))
|
||||
for i, row := range rows {
|
||||
resp[i] = DashboardUsageDailyResponse{
|
||||
Date: row.Date.Time.Format("2006-01-02"),
|
||||
Model: row.Model,
|
||||
InputTokens: row.InputTokens,
|
||||
OutputTokens: row.OutputTokens,
|
||||
CacheReadTokens: row.CacheReadTokens,
|
||||
CacheWriteTokens: row.CacheWriteTokens,
|
||||
TaskCount: row.TaskCount,
|
||||
}
|
||||
}
|
||||
return resp, nil
|
||||
}
|
||||
|
||||
// DashboardUsageByAgentResponse is one (agent, model) row.
|
||||
type DashboardUsageByAgentResponse struct {
|
||||
AgentID string `json:"agent_id"`
|
||||
Model string `json:"model"`
|
||||
InputTokens int64 `json:"input_tokens"`
|
||||
OutputTokens int64 `json:"output_tokens"`
|
||||
CacheReadTokens int64 `json:"cache_read_tokens"`
|
||||
CacheWriteTokens int64 `json:"cache_write_tokens"`
|
||||
TaskCount int32 `json:"task_count"`
|
||||
}
|
||||
|
||||
// GetDashboardUsageByAgent returns per-(agent, model) token aggregates for
|
||||
// the workspace, optionally scoped to a project. Switches to the rollup
|
||||
// table when UseDailyRollupForDashboard is on (same gating as the daily
|
||||
// endpoint above).
|
||||
func (h *Handler) GetDashboardUsageByAgent(w http.ResponseWriter, r *http.Request) {
|
||||
workspaceID := h.resolveWorkspaceID(r)
|
||||
if _, ok := h.workspaceMember(w, r, workspaceID); !ok {
|
||||
return
|
||||
}
|
||||
projectID, ok := parseProjectIDParam(w, r)
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
since := parseSinceParam(r, 30)
|
||||
|
||||
resp, err := h.listDashboardUsageByAgent(r.Context(), parseUUID(workspaceID), since, projectID)
|
||||
if err != nil {
|
||||
writeError(w, http.StatusInternalServerError, "failed to list usage by agent")
|
||||
return
|
||||
}
|
||||
writeJSON(w, http.StatusOK, resp)
|
||||
}
|
||||
|
||||
func (h *Handler) listDashboardUsageByAgent(
|
||||
ctx context.Context,
|
||||
workspaceID pgtype.UUID,
|
||||
since pgtype.Timestamptz,
|
||||
projectID pgtype.UUID,
|
||||
) ([]DashboardUsageByAgentResponse, error) {
|
||||
if h.cfg.UseDailyRollupForDashboard {
|
||||
rows, err := h.Queries.ListDashboardUsageByAgentRollup(ctx, db.ListDashboardUsageByAgentRollupParams{
|
||||
WorkspaceID: workspaceID,
|
||||
Since: since,
|
||||
ProjectID: projectID,
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
resp := make([]DashboardUsageByAgentResponse, len(rows))
|
||||
for i, row := range rows {
|
||||
resp[i] = DashboardUsageByAgentResponse{
|
||||
AgentID: uuidToString(row.AgentID),
|
||||
Model: row.Model,
|
||||
InputTokens: row.InputTokens,
|
||||
OutputTokens: row.OutputTokens,
|
||||
CacheReadTokens: row.CacheReadTokens,
|
||||
CacheWriteTokens: row.CacheWriteTokens,
|
||||
TaskCount: row.TaskCount,
|
||||
}
|
||||
}
|
||||
return resp, nil
|
||||
}
|
||||
rows, err := h.Queries.ListDashboardUsageByAgent(ctx, db.ListDashboardUsageByAgentParams{
|
||||
WorkspaceID: workspaceID,
|
||||
Since: since,
|
||||
ProjectID: projectID,
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
resp := make([]DashboardUsageByAgentResponse, len(rows))
|
||||
for i, row := range rows {
|
||||
resp[i] = DashboardUsageByAgentResponse{
|
||||
AgentID: uuidToString(row.AgentID),
|
||||
Model: row.Model,
|
||||
InputTokens: row.InputTokens,
|
||||
OutputTokens: row.OutputTokens,
|
||||
CacheReadTokens: row.CacheReadTokens,
|
||||
CacheWriteTokens: row.CacheWriteTokens,
|
||||
TaskCount: row.TaskCount,
|
||||
}
|
||||
}
|
||||
return resp, nil
|
||||
}
|
||||
|
||||
// DashboardAgentRunTimeResponse is one agent's total terminal-task run time
|
||||
// over the window. Includes failed tasks so the dashboard can surface how
|
||||
// much execution time was spent on runs that didn't succeed.
|
||||
type DashboardAgentRunTimeResponse struct {
|
||||
AgentID string `json:"agent_id"`
|
||||
TotalSeconds int64 `json:"total_seconds"`
|
||||
TaskCount int32 `json:"task_count"`
|
||||
FailedCount int32 `json:"failed_count"`
|
||||
}
|
||||
|
||||
// GetDashboardAgentRunTime returns per-agent total task run time (seconds)
|
||||
// and task counts for the workspace, optionally scoped to a project. Only
|
||||
// terminal tasks (completed or failed) with both started_at and
|
||||
// completed_at populated contribute, since queued/running tasks have no
|
||||
// finite duration.
|
||||
func (h *Handler) GetDashboardAgentRunTime(w http.ResponseWriter, r *http.Request) {
|
||||
workspaceID := h.resolveWorkspaceID(r)
|
||||
if _, ok := h.workspaceMember(w, r, workspaceID); !ok {
|
||||
return
|
||||
}
|
||||
projectID, ok := parseProjectIDParam(w, r)
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
since := parseSinceParam(r, 30)
|
||||
|
||||
rows, err := h.Queries.ListDashboardAgentRunTime(r.Context(), db.ListDashboardAgentRunTimeParams{
|
||||
WorkspaceID: parseUUID(workspaceID),
|
||||
Since: since,
|
||||
ProjectID: projectID,
|
||||
})
|
||||
if err != nil {
|
||||
writeError(w, http.StatusInternalServerError, "failed to list agent runtime")
|
||||
return
|
||||
}
|
||||
|
||||
resp := make([]DashboardAgentRunTimeResponse, len(rows))
|
||||
for i, row := range rows {
|
||||
resp[i] = DashboardAgentRunTimeResponse{
|
||||
AgentID: uuidToString(row.AgentID),
|
||||
TotalSeconds: row.TotalSeconds,
|
||||
TaskCount: row.TaskCount,
|
||||
FailedCount: row.FailedCount,
|
||||
}
|
||||
}
|
||||
writeJSON(w, http.StatusOK, resp)
|
||||
}
|
||||
590
server/internal/handler/dashboard_test.go
Normal file
590
server/internal/handler/dashboard_test.go
Normal file
@@ -0,0 +1,590 @@
|
||||
package handler
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"net/http"
|
||||
"net/http/httptest"
|
||||
"testing"
|
||||
"time"
|
||||
)
|
||||
|
||||
// TestDashboardEndpoints covers the workspace-dashboard rollups:
|
||||
// - daily token usage with and without project filter
|
||||
// - per-agent token usage with and without project filter
|
||||
// - per-agent run time
|
||||
//
|
||||
// Asserts that (1) tasks belonging to a project show up under the workspace
|
||||
// view, (2) the project filter excludes tasks tied to issues without a
|
||||
// matching project_id, and (3) run-time aggregation accumulates the
|
||||
// completed_at − started_at delta correctly.
|
||||
func TestDashboardEndpoints(t *testing.T) {
|
||||
if testHandler == nil {
|
||||
t.Skip("database not available")
|
||||
}
|
||||
ctx := context.Background()
|
||||
|
||||
var runtimeID, agentID string
|
||||
if err := testPool.QueryRow(ctx, `
|
||||
SELECT id FROM agent_runtime WHERE workspace_id = $1 LIMIT 1
|
||||
`, testWorkspaceID).Scan(&runtimeID); err != nil {
|
||||
t.Fatalf("fetch runtime: %v", err)
|
||||
}
|
||||
if err := testPool.QueryRow(ctx, `
|
||||
SELECT id FROM agent WHERE workspace_id = $1 LIMIT 1
|
||||
`, testWorkspaceID).Scan(&agentID); err != nil {
|
||||
t.Fatalf("fetch agent: %v", err)
|
||||
}
|
||||
|
||||
// Two issues: one bound to a project, one not.
|
||||
var projectID string
|
||||
if err := testPool.QueryRow(ctx, `
|
||||
INSERT INTO project (workspace_id, title)
|
||||
VALUES ($1, 'dashboard test project')
|
||||
RETURNING id
|
||||
`, testWorkspaceID).Scan(&projectID); err != nil {
|
||||
t.Fatalf("create project: %v", err)
|
||||
}
|
||||
t.Cleanup(func() { testPool.Exec(ctx, `DELETE FROM project WHERE id = $1`, projectID) })
|
||||
|
||||
// issue.number is `UNIQUE (workspace_id, number)` (migration 020) and
|
||||
// defaults to 0. Two inserts into the same workspace would collide on the
|
||||
// default; allocate `MAX(number) + 1` per row to stay sequential and
|
||||
// avoid stepping on rows other tests have left behind in the shared
|
||||
// fixture workspace.
|
||||
mkIssue := func(withProject bool) string {
|
||||
var id string
|
||||
var pid any
|
||||
if withProject {
|
||||
pid = projectID
|
||||
}
|
||||
if err := testPool.QueryRow(ctx, `
|
||||
INSERT INTO issue (workspace_id, title, creator_id, creator_type, project_id, number)
|
||||
VALUES (
|
||||
$1, 'dashboard test', $2, 'member', $3,
|
||||
(SELECT COALESCE(MAX(number), 0) + 1 FROM issue WHERE workspace_id = $1)
|
||||
)
|
||||
RETURNING id
|
||||
`, testWorkspaceID, testUserID, pid).Scan(&id); err != nil {
|
||||
t.Fatalf("insert issue: %v", err)
|
||||
}
|
||||
t.Cleanup(func() { testPool.Exec(ctx, `DELETE FROM issue WHERE id = $1`, id) })
|
||||
return id
|
||||
}
|
||||
projectIssueID := mkIssue(true)
|
||||
otherIssueID := mkIssue(false)
|
||||
|
||||
now := time.Now().UTC()
|
||||
started := now.Add(-30 * time.Minute)
|
||||
completed := started.Add(10 * time.Minute) // 600s run
|
||||
|
||||
mkTaskWithUsage := func(issueID string, status string, tokens int64) {
|
||||
var taskID string
|
||||
if err := testPool.QueryRow(ctx, `
|
||||
INSERT INTO agent_task_queue (agent_id, issue_id, runtime_id, status, started_at, completed_at, created_at)
|
||||
VALUES ($1, $2, $3, $4, $5, $6, now())
|
||||
RETURNING id
|
||||
`, agentID, issueID, runtimeID, status, started, completed).Scan(&taskID); err != nil {
|
||||
t.Fatalf("insert task: %v", err)
|
||||
}
|
||||
if _, err := testPool.Exec(ctx, `
|
||||
INSERT INTO task_usage (task_id, provider, model, input_tokens, output_tokens, created_at)
|
||||
VALUES ($1, 'claude', 'claude-3-5-sonnet', $2, 0, now())
|
||||
`, taskID, tokens); err != nil {
|
||||
t.Fatalf("insert task_usage: %v", err)
|
||||
}
|
||||
t.Cleanup(func() { testPool.Exec(ctx, `DELETE FROM agent_task_queue WHERE id = $1`, taskID) })
|
||||
}
|
||||
|
||||
mkTaskWithUsage(projectIssueID, "completed", 1000)
|
||||
mkTaskWithUsage(otherIssueID, "completed", 500)
|
||||
|
||||
type dailyRow struct {
|
||||
Date string `json:"date"`
|
||||
Model string `json:"model"`
|
||||
InputTokens int64 `json:"input_tokens"`
|
||||
}
|
||||
type byAgentRow struct {
|
||||
AgentID string `json:"agent_id"`
|
||||
Model string `json:"model"`
|
||||
InputTokens int64 `json:"input_tokens"`
|
||||
}
|
||||
type runtimeRow struct {
|
||||
AgentID string `json:"agent_id"`
|
||||
TotalSeconds int64 `json:"total_seconds"`
|
||||
TaskCount int32 `json:"task_count"`
|
||||
}
|
||||
|
||||
// daily — workspace-wide
|
||||
{
|
||||
w := httptest.NewRecorder()
|
||||
testHandler.GetDashboardUsageDaily(w, newRequest("GET", "/api/dashboard/usage/daily?days=1", nil))
|
||||
if w.Code != http.StatusOK {
|
||||
t.Fatalf("daily ws: expected 200, got %d: %s", w.Code, w.Body.String())
|
||||
}
|
||||
var rows []dailyRow
|
||||
_ = json.NewDecoder(w.Body).Decode(&rows)
|
||||
var total int64
|
||||
for _, r := range rows {
|
||||
if r.Model == "claude-3-5-sonnet" {
|
||||
total += r.InputTokens
|
||||
}
|
||||
}
|
||||
if total < 1500 {
|
||||
t.Errorf("daily ws: expected >=1500 tokens (1000+500), got %d", total)
|
||||
}
|
||||
}
|
||||
|
||||
// daily — project-scoped
|
||||
{
|
||||
w := httptest.NewRecorder()
|
||||
testHandler.GetDashboardUsageDaily(w, newRequest("GET", "/api/dashboard/usage/daily?days=1&project_id="+projectID, nil))
|
||||
if w.Code != http.StatusOK {
|
||||
t.Fatalf("daily project: expected 200, got %d: %s", w.Code, w.Body.String())
|
||||
}
|
||||
var rows []dailyRow
|
||||
_ = json.NewDecoder(w.Body).Decode(&rows)
|
||||
var total int64
|
||||
for _, r := range rows {
|
||||
if r.Model == "claude-3-5-sonnet" {
|
||||
total += r.InputTokens
|
||||
}
|
||||
}
|
||||
// Project filter must exclude the 500-token "other" issue. Token total
|
||||
// for this project must be >= 1000 (our task) and < 1500 (would only
|
||||
// reach 1500 if filter leaked).
|
||||
if total < 1000 {
|
||||
t.Errorf("daily project: expected >=1000 tokens, got %d", total)
|
||||
}
|
||||
if total >= 1500 {
|
||||
t.Errorf("daily project: filter leaked — expected <1500 tokens, got %d", total)
|
||||
}
|
||||
}
|
||||
|
||||
// by-agent — project-scoped
|
||||
{
|
||||
w := httptest.NewRecorder()
|
||||
testHandler.GetDashboardUsageByAgent(w, newRequest("GET", "/api/dashboard/usage/by-agent?days=1&project_id="+projectID, nil))
|
||||
if w.Code != http.StatusOK {
|
||||
t.Fatalf("by-agent project: expected 200, got %d: %s", w.Code, w.Body.String())
|
||||
}
|
||||
var rows []byAgentRow
|
||||
_ = json.NewDecoder(w.Body).Decode(&rows)
|
||||
found := false
|
||||
for _, r := range rows {
|
||||
if r.AgentID == agentID && r.InputTokens >= 1000 {
|
||||
found = true
|
||||
}
|
||||
}
|
||||
if !found {
|
||||
t.Errorf("by-agent project: expected agent %s with >=1000 tokens; got %v", agentID, rows)
|
||||
}
|
||||
}
|
||||
|
||||
// agent-runtime — project-scoped
|
||||
{
|
||||
w := httptest.NewRecorder()
|
||||
testHandler.GetDashboardAgentRunTime(w, newRequest("GET", "/api/dashboard/agent-runtime?days=1&project_id="+projectID, nil))
|
||||
if w.Code != http.StatusOK {
|
||||
t.Fatalf("agent-runtime: expected 200, got %d: %s", w.Code, w.Body.String())
|
||||
}
|
||||
var rows []runtimeRow
|
||||
_ = json.NewDecoder(w.Body).Decode(&rows)
|
||||
var seconds int64
|
||||
var tasks int32
|
||||
for _, r := range rows {
|
||||
if r.AgentID == agentID {
|
||||
seconds += r.TotalSeconds
|
||||
tasks += r.TaskCount
|
||||
}
|
||||
}
|
||||
if tasks < 1 {
|
||||
t.Errorf("agent-runtime: expected >=1 task for agent, got %d", tasks)
|
||||
}
|
||||
if seconds < 600 {
|
||||
t.Errorf("agent-runtime: expected >=600s (one 10-minute run), got %d", seconds)
|
||||
}
|
||||
}
|
||||
|
||||
// agent-runtime — invalid project_id rejected
|
||||
{
|
||||
w := httptest.NewRecorder()
|
||||
testHandler.GetDashboardAgentRunTime(w, newRequest("GET", "/api/dashboard/agent-runtime?project_id=not-a-uuid", nil))
|
||||
if w.Code != http.StatusBadRequest {
|
||||
t.Errorf("agent-runtime: expected 400 for invalid uuid, got %d", w.Code)
|
||||
}
|
||||
}
|
||||
|
||||
// Rollup path — run the dashboard window function, flip the feature
|
||||
// flag, and verify daily + by-agent reads come back with the same
|
||||
// project-filtered totals. The raw path above already passed, so this
|
||||
// validates that the rollup table mirrors the raw aggregation
|
||||
// (modulo project_id snapshot semantics, which match here since
|
||||
// nothing has changed since the rows were created).
|
||||
{
|
||||
// rollup the full window in one shot; same idempotent primitive
|
||||
// the cron path uses.
|
||||
if _, err := testPool.Exec(ctx, `
|
||||
SELECT rollup_task_usage_dashboard_daily_window('1970-01-01'::timestamptz, now() + interval '1 hour')
|
||||
`); err != nil {
|
||||
t.Fatalf("rollup window: %v", err)
|
||||
}
|
||||
origRollup := testHandler.cfg.UseDailyRollupForDashboard
|
||||
testHandler.cfg.UseDailyRollupForDashboard = true
|
||||
t.Cleanup(func() { testHandler.cfg.UseDailyRollupForDashboard = origRollup })
|
||||
|
||||
// daily — project-scoped through rollup
|
||||
w := httptest.NewRecorder()
|
||||
testHandler.GetDashboardUsageDaily(w, newRequest("GET", "/api/dashboard/usage/daily?days=1&project_id="+projectID, nil))
|
||||
if w.Code != http.StatusOK {
|
||||
t.Fatalf("rollup daily: expected 200, got %d: %s", w.Code, w.Body.String())
|
||||
}
|
||||
var dRows []dailyRow
|
||||
_ = json.NewDecoder(w.Body).Decode(&dRows)
|
||||
var dTotal int64
|
||||
for _, r := range dRows {
|
||||
if r.Model == "claude-3-5-sonnet" {
|
||||
dTotal += r.InputTokens
|
||||
}
|
||||
}
|
||||
if dTotal < 1000 {
|
||||
t.Errorf("rollup daily project: expected >=1000 tokens, got %d", dTotal)
|
||||
}
|
||||
if dTotal >= 1500 {
|
||||
t.Errorf("rollup daily project: filter leaked — expected <1500, got %d", dTotal)
|
||||
}
|
||||
|
||||
// by-agent — workspace-wide through rollup
|
||||
w = httptest.NewRecorder()
|
||||
testHandler.GetDashboardUsageByAgent(w, newRequest("GET", "/api/dashboard/usage/by-agent?days=1", nil))
|
||||
if w.Code != http.StatusOK {
|
||||
t.Fatalf("rollup by-agent: expected 200, got %d: %s", w.Code, w.Body.String())
|
||||
}
|
||||
var aRows []byAgentRow
|
||||
_ = json.NewDecoder(w.Body).Decode(&aRows)
|
||||
var aTotal int64
|
||||
for _, r := range aRows {
|
||||
if r.AgentID == agentID && r.Model == "claude-3-5-sonnet" {
|
||||
aTotal += r.InputTokens
|
||||
}
|
||||
}
|
||||
if aTotal < 1500 {
|
||||
t.Errorf("rollup by-agent: expected >=1500 tokens across workspace, got %d", aTotal)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// TestDashboardRollupReattributesOnProjectChange verifies the trigger that
|
||||
// fires on `UPDATE issue SET project_id` enqueues both old + new project
|
||||
// buckets so the next rollup tick re-attributes the affected tokens.
|
||||
// Uses the rollup window function directly to drain the dirty queue,
|
||||
// then asserts the rollup table reflects the new project_id.
|
||||
func TestDashboardRollupReattributesOnProjectChange(t *testing.T) {
|
||||
if testHandler == nil {
|
||||
t.Skip("database not available")
|
||||
}
|
||||
ctx := context.Background()
|
||||
|
||||
var runtimeID, agentID string
|
||||
if err := testPool.QueryRow(ctx, `SELECT id FROM agent_runtime WHERE workspace_id = $1 LIMIT 1`, testWorkspaceID).Scan(&runtimeID); err != nil {
|
||||
t.Fatalf("fetch runtime: %v", err)
|
||||
}
|
||||
if err := testPool.QueryRow(ctx, `SELECT id FROM agent WHERE workspace_id = $1 LIMIT 1`, testWorkspaceID).Scan(&agentID); err != nil {
|
||||
t.Fatalf("fetch agent: %v", err)
|
||||
}
|
||||
|
||||
mkProject := func(name string) string {
|
||||
var id string
|
||||
if err := testPool.QueryRow(ctx, `
|
||||
INSERT INTO project (workspace_id, title) VALUES ($1, $2) RETURNING id
|
||||
`, testWorkspaceID, name).Scan(&id); err != nil {
|
||||
t.Fatalf("create project: %v", err)
|
||||
}
|
||||
t.Cleanup(func() { testPool.Exec(ctx, `DELETE FROM project WHERE id = $1`, id) })
|
||||
return id
|
||||
}
|
||||
projectA := mkProject("dashboard reattr A")
|
||||
projectB := mkProject("dashboard reattr B")
|
||||
|
||||
var issueID string
|
||||
if err := testPool.QueryRow(ctx, `
|
||||
INSERT INTO issue (workspace_id, title, creator_id, creator_type, project_id, number)
|
||||
VALUES ($1, 'reattr issue', $2, 'member', $3,
|
||||
(SELECT COALESCE(MAX(number), 0) + 1 FROM issue WHERE workspace_id = $1))
|
||||
RETURNING id
|
||||
`, testWorkspaceID, testUserID, projectA).Scan(&issueID); err != nil {
|
||||
t.Fatalf("create issue: %v", err)
|
||||
}
|
||||
t.Cleanup(func() { testPool.Exec(ctx, `DELETE FROM issue WHERE id = $1`, issueID) })
|
||||
|
||||
var taskID string
|
||||
if err := testPool.QueryRow(ctx, `
|
||||
INSERT INTO agent_task_queue (agent_id, issue_id, runtime_id, status, created_at)
|
||||
VALUES ($1, $2, $3, 'completed', now()) RETURNING id
|
||||
`, agentID, issueID, runtimeID).Scan(&taskID); err != nil {
|
||||
t.Fatalf("insert task: %v", err)
|
||||
}
|
||||
t.Cleanup(func() { testPool.Exec(ctx, `DELETE FROM agent_task_queue WHERE id = $1`, taskID) })
|
||||
|
||||
if _, err := testPool.Exec(ctx, `
|
||||
INSERT INTO task_usage (task_id, provider, model, input_tokens, output_tokens, created_at)
|
||||
VALUES ($1, 'claude', 'claude-3-5-sonnet', 7777, 0, now())
|
||||
`, taskID); err != nil {
|
||||
t.Fatalf("insert task_usage: %v", err)
|
||||
}
|
||||
|
||||
// First rollup pass: tokens attributed to project A.
|
||||
if _, err := testPool.Exec(ctx, `
|
||||
SELECT rollup_task_usage_dashboard_daily_window('1970-01-01'::timestamptz, now() + interval '1 hour')
|
||||
`); err != nil {
|
||||
t.Fatalf("rollup A: %v", err)
|
||||
}
|
||||
var aTokens int64
|
||||
if err := testPool.QueryRow(ctx, `
|
||||
SELECT COALESCE(SUM(input_tokens), 0) FROM task_usage_dashboard_daily
|
||||
WHERE workspace_id = $1 AND project_id = $2 AND agent_id = $3
|
||||
`, testWorkspaceID, projectA, agentID).Scan(&aTokens); err != nil {
|
||||
t.Fatalf("read A rollup: %v", err)
|
||||
}
|
||||
if aTokens < 7777 {
|
||||
t.Fatalf("project A: expected >=7777 tokens after first rollup, got %d", aTokens)
|
||||
}
|
||||
|
||||
// Move the issue to project B. Trigger enqueues both A and B buckets.
|
||||
if _, err := testPool.Exec(ctx, `UPDATE issue SET project_id = $1 WHERE id = $2`, projectB, issueID); err != nil {
|
||||
t.Fatalf("reassign project: %v", err)
|
||||
}
|
||||
// Second rollup pass: A bucket drops to zero (deleted_empty), B
|
||||
// bucket gets the tokens.
|
||||
if _, err := testPool.Exec(ctx, `
|
||||
SELECT rollup_task_usage_dashboard_daily_window('1970-01-01'::timestamptz, now() + interval '1 hour')
|
||||
`); err != nil {
|
||||
t.Fatalf("rollup B: %v", err)
|
||||
}
|
||||
|
||||
var bTokens, aTokensAfter int64
|
||||
if err := testPool.QueryRow(ctx, `
|
||||
SELECT COALESCE(SUM(input_tokens), 0) FROM task_usage_dashboard_daily
|
||||
WHERE workspace_id = $1 AND project_id = $2 AND agent_id = $3
|
||||
`, testWorkspaceID, projectB, agentID).Scan(&bTokens); err != nil {
|
||||
t.Fatalf("read B rollup: %v", err)
|
||||
}
|
||||
if err := testPool.QueryRow(ctx, `
|
||||
SELECT COALESCE(SUM(input_tokens), 0) FROM task_usage_dashboard_daily
|
||||
WHERE workspace_id = $1 AND project_id = $2 AND agent_id = $3
|
||||
`, testWorkspaceID, projectA, agentID).Scan(&aTokensAfter); err != nil {
|
||||
t.Fatalf("read A rollup after move: %v", err)
|
||||
}
|
||||
if bTokens < 7777 {
|
||||
t.Errorf("project B: expected >=7777 tokens after reassign + rollup, got %d", bTokens)
|
||||
}
|
||||
if aTokensAfter != 0 {
|
||||
t.Errorf("project A: expected 0 tokens after reassign + rollup, got %d", aTokensAfter)
|
||||
}
|
||||
}
|
||||
|
||||
// TestDashboardRollupClearsOnIssueDelete verifies that deleting an issue
|
||||
// (which cascades to its tasks and task_usage rows) also clears the
|
||||
// dashboard rollup row attributed to that issue's project. The
|
||||
// `issue BEFORE DELETE` trigger has to fire ahead of the cascade so the
|
||||
// dirty queue captures the original project_id while the issue row is
|
||||
// still readable.
|
||||
func TestDashboardRollupClearsOnIssueDelete(t *testing.T) {
|
||||
if testHandler == nil {
|
||||
t.Skip("database not available")
|
||||
}
|
||||
ctx := context.Background()
|
||||
|
||||
var runtimeID, agentID string
|
||||
if err := testPool.QueryRow(ctx, `SELECT id FROM agent_runtime WHERE workspace_id = $1 LIMIT 1`, testWorkspaceID).Scan(&runtimeID); err != nil {
|
||||
t.Fatalf("fetch runtime: %v", err)
|
||||
}
|
||||
if err := testPool.QueryRow(ctx, `SELECT id FROM agent WHERE workspace_id = $1 LIMIT 1`, testWorkspaceID).Scan(&agentID); err != nil {
|
||||
t.Fatalf("fetch agent: %v", err)
|
||||
}
|
||||
|
||||
var projectID string
|
||||
if err := testPool.QueryRow(ctx, `
|
||||
INSERT INTO project (workspace_id, title) VALUES ($1, 'dashboard cascade test') RETURNING id
|
||||
`, testWorkspaceID).Scan(&projectID); err != nil {
|
||||
t.Fatalf("create project: %v", err)
|
||||
}
|
||||
t.Cleanup(func() { testPool.Exec(ctx, `DELETE FROM project WHERE id = $1`, projectID) })
|
||||
|
||||
var issueID string
|
||||
if err := testPool.QueryRow(ctx, `
|
||||
INSERT INTO issue (workspace_id, title, creator_id, creator_type, project_id, number)
|
||||
VALUES ($1, 'cascade issue', $2, 'member', $3,
|
||||
(SELECT COALESCE(MAX(number), 0) + 1 FROM issue WHERE workspace_id = $1))
|
||||
RETURNING id
|
||||
`, testWorkspaceID, testUserID, projectID).Scan(&issueID); err != nil {
|
||||
t.Fatalf("create issue: %v", err)
|
||||
}
|
||||
// No t.Cleanup deleting the issue — that's what the test exercises.
|
||||
|
||||
var taskID string
|
||||
if err := testPool.QueryRow(ctx, `
|
||||
INSERT INTO agent_task_queue (agent_id, issue_id, runtime_id, status, created_at)
|
||||
VALUES ($1, $2, $3, 'completed', now()) RETURNING id
|
||||
`, agentID, issueID, runtimeID).Scan(&taskID); err != nil {
|
||||
t.Fatalf("insert task: %v", err)
|
||||
}
|
||||
// Don't bother cleaning up taskID either; cascade will take it.
|
||||
|
||||
if _, err := testPool.Exec(ctx, `
|
||||
INSERT INTO task_usage (task_id, provider, model, input_tokens, output_tokens, created_at)
|
||||
VALUES ($1, 'claude', 'claude-3-5-sonnet', 4242, 0, now())
|
||||
`, taskID); err != nil {
|
||||
t.Fatalf("insert task_usage: %v", err)
|
||||
}
|
||||
|
||||
// First rollup: project bucket exists with 4242 tokens.
|
||||
if _, err := testPool.Exec(ctx, `
|
||||
SELECT rollup_task_usage_dashboard_daily_window('1970-01-01'::timestamptz, now() + interval '1 hour')
|
||||
`); err != nil {
|
||||
t.Fatalf("rollup before delete: %v", err)
|
||||
}
|
||||
var before int64
|
||||
if err := testPool.QueryRow(ctx, `
|
||||
SELECT COALESCE(SUM(input_tokens), 0) FROM task_usage_dashboard_daily
|
||||
WHERE workspace_id = $1 AND project_id = $2
|
||||
`, testWorkspaceID, projectID).Scan(&before); err != nil {
|
||||
t.Fatalf("read before: %v", err)
|
||||
}
|
||||
if before < 4242 {
|
||||
t.Fatalf("project bucket: expected >=4242 tokens before delete, got %d", before)
|
||||
}
|
||||
|
||||
// Delete the issue. Cascade removes atq + task_usage. The issue
|
||||
// BEFORE DELETE trigger should have enqueued the project bucket
|
||||
// before the cascade started.
|
||||
if _, err := testPool.Exec(ctx, `DELETE FROM issue WHERE id = $1`, issueID); err != nil {
|
||||
t.Fatalf("delete issue: %v", err)
|
||||
}
|
||||
|
||||
if _, err := testPool.Exec(ctx, `
|
||||
SELECT rollup_task_usage_dashboard_daily_window('1970-01-01'::timestamptz, now() + interval '1 hour')
|
||||
`); err != nil {
|
||||
t.Fatalf("rollup after delete: %v", err)
|
||||
}
|
||||
var after int64
|
||||
if err := testPool.QueryRow(ctx, `
|
||||
SELECT COALESCE(SUM(input_tokens), 0) FROM task_usage_dashboard_daily
|
||||
WHERE workspace_id = $1 AND project_id = $2
|
||||
`, testWorkspaceID, projectID).Scan(&after); err != nil {
|
||||
t.Fatalf("read after: %v", err)
|
||||
}
|
||||
if after != 0 {
|
||||
t.Errorf("project bucket: expected 0 tokens after issue delete, got %d", after)
|
||||
}
|
||||
}
|
||||
|
||||
// TestDashboardRollupReattributesOnLinkTaskToIssue verifies that
|
||||
// `LinkTaskToIssue` (which UPDATEs `agent_task_queue.issue_id` from NULL
|
||||
// to a real issue id) re-attributes existing rollup rows from the
|
||||
// no-project bucket to the linked issue's project bucket. Mirrors the
|
||||
// quick-create flow in `service.task.LinkTaskToIssue`.
|
||||
func TestDashboardRollupReattributesOnLinkTaskToIssue(t *testing.T) {
|
||||
if testHandler == nil {
|
||||
t.Skip("database not available")
|
||||
}
|
||||
ctx := context.Background()
|
||||
|
||||
var runtimeID, agentID string
|
||||
if err := testPool.QueryRow(ctx, `SELECT id FROM agent_runtime WHERE workspace_id = $1 LIMIT 1`, testWorkspaceID).Scan(&runtimeID); err != nil {
|
||||
t.Fatalf("fetch runtime: %v", err)
|
||||
}
|
||||
if err := testPool.QueryRow(ctx, `SELECT id FROM agent WHERE workspace_id = $1 LIMIT 1`, testWorkspaceID).Scan(&agentID); err != nil {
|
||||
t.Fatalf("fetch agent: %v", err)
|
||||
}
|
||||
|
||||
// Quick-create task: issue_id is NULL at creation time.
|
||||
var taskID string
|
||||
if err := testPool.QueryRow(ctx, `
|
||||
INSERT INTO agent_task_queue (agent_id, issue_id, runtime_id, status, context, created_at)
|
||||
VALUES ($1, NULL, $2, 'completed', '{}'::jsonb, now()) RETURNING id
|
||||
`, agentID, runtimeID).Scan(&taskID); err != nil {
|
||||
t.Fatalf("insert quick-create task: %v", err)
|
||||
}
|
||||
t.Cleanup(func() { testPool.Exec(ctx, `DELETE FROM agent_task_queue WHERE id = $1`, taskID) })
|
||||
|
||||
if _, err := testPool.Exec(ctx, `
|
||||
INSERT INTO task_usage (task_id, provider, model, input_tokens, output_tokens, created_at)
|
||||
VALUES ($1, 'claude', 'claude-3-5-sonnet', 1234, 0, now())
|
||||
`, taskID); err != nil {
|
||||
t.Fatalf("insert task_usage: %v", err)
|
||||
}
|
||||
|
||||
// First rollup: tokens attributed to the no-project bucket (NULL).
|
||||
if _, err := testPool.Exec(ctx, `
|
||||
SELECT rollup_task_usage_dashboard_daily_window('1970-01-01'::timestamptz, now() + interval '1 hour')
|
||||
`); err != nil {
|
||||
t.Fatalf("rollup pre-link: %v", err)
|
||||
}
|
||||
var nullBefore int64
|
||||
if err := testPool.QueryRow(ctx, `
|
||||
SELECT COALESCE(SUM(input_tokens), 0) FROM task_usage_dashboard_daily
|
||||
WHERE workspace_id = $1 AND project_id IS NULL AND agent_id = $2
|
||||
`, testWorkspaceID, agentID).Scan(&nullBefore); err != nil {
|
||||
t.Fatalf("read NULL bucket pre-link: %v", err)
|
||||
}
|
||||
if nullBefore < 1234 {
|
||||
t.Fatalf("NULL bucket: expected >=1234 tokens pre-link, got %d", nullBefore)
|
||||
}
|
||||
|
||||
// Create a project + issue, then run the same UPDATE LinkTaskToIssue
|
||||
// uses. The atq trigger should enqueue OLD (NULL project) AND NEW
|
||||
// (the project's id) so the next rollup tick zeroes the NULL bucket
|
||||
// and populates the project bucket.
|
||||
var projectID string
|
||||
if err := testPool.QueryRow(ctx, `
|
||||
INSERT INTO project (workspace_id, title) VALUES ($1, 'dashboard link test') RETURNING id
|
||||
`, testWorkspaceID).Scan(&projectID); err != nil {
|
||||
t.Fatalf("create project: %v", err)
|
||||
}
|
||||
t.Cleanup(func() { testPool.Exec(ctx, `DELETE FROM project WHERE id = $1`, projectID) })
|
||||
|
||||
var issueID string
|
||||
if err := testPool.QueryRow(ctx, `
|
||||
INSERT INTO issue (workspace_id, title, creator_id, creator_type, project_id, number)
|
||||
VALUES ($1, 'link test issue', $2, 'member', $3,
|
||||
(SELECT COALESCE(MAX(number), 0) + 1 FROM issue WHERE workspace_id = $1))
|
||||
RETURNING id
|
||||
`, testWorkspaceID, testUserID, projectID).Scan(&issueID); err != nil {
|
||||
t.Fatalf("create issue: %v", err)
|
||||
}
|
||||
t.Cleanup(func() { testPool.Exec(ctx, `DELETE FROM issue WHERE id = $1`, issueID) })
|
||||
|
||||
// Mirror LinkTaskToIssue's UPDATE shape.
|
||||
if _, err := testPool.Exec(ctx, `
|
||||
UPDATE agent_task_queue SET issue_id = $1 WHERE id = $2 AND issue_id IS NULL
|
||||
`, issueID, taskID); err != nil {
|
||||
t.Fatalf("link task to issue: %v", err)
|
||||
}
|
||||
|
||||
if _, err := testPool.Exec(ctx, `
|
||||
SELECT rollup_task_usage_dashboard_daily_window('1970-01-01'::timestamptz, now() + interval '1 hour')
|
||||
`); err != nil {
|
||||
t.Fatalf("rollup post-link: %v", err)
|
||||
}
|
||||
|
||||
var projectAfter, nullAfter int64
|
||||
if err := testPool.QueryRow(ctx, `
|
||||
SELECT COALESCE(SUM(input_tokens), 0) FROM task_usage_dashboard_daily
|
||||
WHERE workspace_id = $1 AND project_id = $2 AND agent_id = $3
|
||||
`, testWorkspaceID, projectID, agentID).Scan(&projectAfter); err != nil {
|
||||
t.Fatalf("read project bucket post-link: %v", err)
|
||||
}
|
||||
if err := testPool.QueryRow(ctx, `
|
||||
SELECT COALESCE(SUM(input_tokens), 0) FROM task_usage_dashboard_daily
|
||||
WHERE workspace_id = $1 AND project_id IS NULL AND agent_id = $2
|
||||
`, testWorkspaceID, agentID).Scan(&nullAfter); err != nil {
|
||||
t.Fatalf("read NULL bucket post-link: %v", err)
|
||||
}
|
||||
if projectAfter < 1234 {
|
||||
t.Errorf("project bucket: expected >=1234 tokens after link, got %d", projectAfter)
|
||||
}
|
||||
if nullAfter != 0 {
|
||||
t.Errorf("NULL bucket: expected 0 tokens after link, got %d", nullAfter)
|
||||
}
|
||||
}
|
||||
@@ -57,6 +57,17 @@ type Config struct {
|
||||
// 2) backfill_task_usage_daily ran successfully,
|
||||
// 3) cron job scheduled and task_usage_rollup_lag_seconds() < 900.
|
||||
UseDailyRollupForRuntimeUsage bool
|
||||
// UseDailyRollupForDashboard routes the workspace `/dashboard` page's
|
||||
// token-aggregation reads to `task_usage_dashboard_daily` (migration
|
||||
// 084). Mirrors UseDailyRollupForRuntimeUsage above with the same
|
||||
// fail-safe default (false → raw scan). Operators flip per
|
||||
// environment AFTER:
|
||||
// 1) migration 084 applied,
|
||||
// 2) `backfill_task_usage_dashboard_daily` succeeded and stamped
|
||||
// the dashboard rollup watermark,
|
||||
// 3) cron job scheduled (`rollup_task_usage_dashboard_daily`) and
|
||||
// `task_usage_dashboard_rollup_lag_seconds()` < 900.
|
||||
UseDailyRollupForDashboard bool
|
||||
}
|
||||
|
||||
type Handler struct {
|
||||
|
||||
@@ -63,7 +63,7 @@
|
||||
]
|
||||
},
|
||||
{
|
||||
"label": "Dashboard / workspace route segments",
|
||||
"label": "Workspace route segments",
|
||||
"description": "Reserving each segment name prevents `/{slug}/{view}` from being visually ambiguous (e.g. a workspace named `issues` would make `/issues/abc` mean two things). `workspaces` covers the global `/workspaces/new` workspace-creation page; `teams` is reserved for future team management.",
|
||||
"slugs": [
|
||||
"issues",
|
||||
@@ -72,6 +72,7 @@
|
||||
"agents",
|
||||
"inbox",
|
||||
"my-issues",
|
||||
"usage",
|
||||
"runtimes",
|
||||
"skills",
|
||||
"settings",
|
||||
|
||||
25
server/migrations/084_task_usage_dashboard_rollup.down.sql
Normal file
25
server/migrations/084_task_usage_dashboard_rollup.down.sql
Normal file
@@ -0,0 +1,25 @@
|
||||
DROP FUNCTION IF EXISTS task_usage_dashboard_rollup_lag_seconds();
|
||||
DROP FUNCTION IF EXISTS rollup_task_usage_dashboard_daily();
|
||||
DROP FUNCTION IF EXISTS rollup_task_usage_dashboard_daily_window(TIMESTAMPTZ, TIMESTAMPTZ);
|
||||
|
||||
DROP TRIGGER IF EXISTS trg_issue_project_dirty_dashboard ON issue;
|
||||
DROP FUNCTION IF EXISTS enqueue_task_usage_dashboard_dirty_for_issue_project();
|
||||
|
||||
DROP TRIGGER IF EXISTS trg_tu_dirty_dashboard ON task_usage;
|
||||
DROP FUNCTION IF EXISTS enqueue_task_usage_dashboard_dirty_for_tu();
|
||||
|
||||
DROP TRIGGER IF EXISTS trg_issue_delete_dirty_dashboard ON issue;
|
||||
DROP FUNCTION IF EXISTS enqueue_task_usage_dashboard_dirty_for_issue_delete();
|
||||
|
||||
DROP TRIGGER IF EXISTS trg_atq_dirty_dashboard ON agent_task_queue;
|
||||
DROP FUNCTION IF EXISTS enqueue_task_usage_dashboard_dirty_for_atq();
|
||||
|
||||
DROP INDEX IF EXISTS idx_task_usage_dashboard_dirty_enqueued_at;
|
||||
DROP TABLE IF EXISTS task_usage_dashboard_dirty;
|
||||
|
||||
DROP TABLE IF EXISTS task_usage_dashboard_rollup_state;
|
||||
|
||||
DROP INDEX IF EXISTS idx_task_usage_dashboard_daily_agent_date;
|
||||
DROP INDEX IF EXISTS idx_task_usage_dashboard_daily_project_date;
|
||||
DROP INDEX IF EXISTS idx_task_usage_dashboard_daily_workspace_date;
|
||||
DROP TABLE IF EXISTS task_usage_dashboard_daily;
|
||||
519
server/migrations/084_task_usage_dashboard_rollup.up.sql
Normal file
519
server/migrations/084_task_usage_dashboard_rollup.up.sql
Normal file
@@ -0,0 +1,519 @@
|
||||
-- Daily rollup table for the workspace `/{slug}/dashboard` page. Mirrors
|
||||
-- the per-runtime rollup in migration 073 (`task_usage_daily`) but indexes
|
||||
-- on a different set of dimensions:
|
||||
--
|
||||
-- (bucket_date, workspace_id, agent_id, project_id, model)
|
||||
--
|
||||
-- vs. the existing 073:
|
||||
--
|
||||
-- (bucket_date, workspace_id, runtime_id, provider, model)
|
||||
--
|
||||
-- The dashboard queries don't filter by `runtime_id` and the per-runtime
|
||||
-- table doesn't carry `agent_id` / `project_id`, so we materialise a
|
||||
-- dedicated rollup instead of extending the existing one and balloonng
|
||||
-- its cardinality (`runtime × agent × project × model` per day vs.
|
||||
-- `runtime × model`). The cron entry point + dirty-queue invalidation
|
||||
-- pattern is otherwise identical.
|
||||
--
|
||||
-- `project_id` is the project at rollup time, snapshotted via
|
||||
-- `agent_task_queue.issue_id → issue.project_id`. Re-attribution after a
|
||||
-- user reassigns an issue's project lands via the `issue` trigger below;
|
||||
-- historical buckets that aren't touched again stay attributed where
|
||||
-- they were when the rollup ran. (Operator follow-up: re-run the backfill
|
||||
-- command on the affected window if a bulk project move needs to
|
||||
-- propagate to old data.)
|
||||
--
|
||||
-- `project_id` is nullable — issues without a project still produce
|
||||
-- usage rows. We use `UNIQUE NULLS NOT DISTINCT` (PG 15+) so NULL is
|
||||
-- treated as a single distinct value in the unique key, which lets
|
||||
-- `INSERT ... ON CONFLICT` upsert "no-project" buckets the same way it
|
||||
-- handles a specific project.
|
||||
--
|
||||
-- Bucket date is computed in **UTC**. The per-runtime rollup uses the
|
||||
-- runtime's tz (migration 082) because each row has a single runtime;
|
||||
-- this rollup aggregates *across* runtimes that may sit in different
|
||||
-- tzs, and there is no single correct local boundary. The dashboard
|
||||
-- frontend just renders the raw ISO date.
|
||||
CREATE TABLE task_usage_dashboard_daily (
|
||||
bucket_date DATE NOT NULL,
|
||||
workspace_id UUID NOT NULL,
|
||||
agent_id UUID NOT NULL,
|
||||
project_id UUID,
|
||||
model TEXT NOT NULL,
|
||||
input_tokens BIGINT NOT NULL DEFAULT 0,
|
||||
output_tokens BIGINT NOT NULL DEFAULT 0,
|
||||
cache_read_tokens BIGINT NOT NULL DEFAULT 0,
|
||||
cache_write_tokens BIGINT NOT NULL DEFAULT 0,
|
||||
task_count BIGINT NOT NULL DEFAULT 0,
|
||||
event_count BIGINT NOT NULL DEFAULT 0,
|
||||
updated_at TIMESTAMPTZ NOT NULL DEFAULT now(),
|
||||
CONSTRAINT uq_task_usage_dashboard_daily_key
|
||||
UNIQUE NULLS NOT DISTINCT
|
||||
(bucket_date, workspace_id, agent_id, project_id, model)
|
||||
);
|
||||
|
||||
-- Workspace-wide reads (no project filter) hit this index.
|
||||
CREATE INDEX idx_task_usage_dashboard_daily_workspace_date
|
||||
ON task_usage_dashboard_daily (workspace_id, bucket_date DESC);
|
||||
|
||||
-- Per-project reads. Partial index because most queries either filter to
|
||||
-- one project or "all" — both extremes benefit from this layout.
|
||||
CREATE INDEX idx_task_usage_dashboard_daily_project_date
|
||||
ON task_usage_dashboard_daily (workspace_id, project_id, bucket_date DESC);
|
||||
|
||||
-- Per-agent reads (the "by agent" panel).
|
||||
CREATE INDEX idx_task_usage_dashboard_daily_agent_date
|
||||
ON task_usage_dashboard_daily (workspace_id, agent_id, bucket_date DESC);
|
||||
|
||||
CREATE TABLE task_usage_dashboard_rollup_state (
|
||||
id SMALLINT PRIMARY KEY DEFAULT 1 CHECK (id = 1),
|
||||
watermark_at TIMESTAMPTZ NOT NULL DEFAULT '1970-01-01 00:00:00+00',
|
||||
last_run_started_at TIMESTAMPTZ,
|
||||
last_run_finished_at TIMESTAMPTZ,
|
||||
last_run_rows BIGINT NOT NULL DEFAULT 0,
|
||||
last_error TEXT
|
||||
);
|
||||
INSERT INTO task_usage_dashboard_rollup_state (id) VALUES (1) ON CONFLICT DO NOTHING;
|
||||
|
||||
-- Dirty queue for invalidations the `updated_at` watermark can't see:
|
||||
-- * DELETE on `task_usage` (no row left for the watermark).
|
||||
-- * DELETE cascade through `agent_task_queue` (task_usage rows gone).
|
||||
-- * UPDATE of `issue.project_id` — moves the bucket to a new key.
|
||||
--
|
||||
-- bucket key matches the rollup table so the queue can be UNIONed into
|
||||
-- `dirty_keys` in the window function with no translation. project_id is
|
||||
-- nullable here for the same reason as in the rollup table.
|
||||
CREATE TABLE task_usage_dashboard_dirty (
|
||||
bucket_date DATE NOT NULL,
|
||||
workspace_id UUID NOT NULL,
|
||||
agent_id UUID NOT NULL,
|
||||
project_id UUID,
|
||||
model TEXT NOT NULL,
|
||||
enqueued_at TIMESTAMPTZ NOT NULL DEFAULT now(),
|
||||
CONSTRAINT uq_task_usage_dashboard_dirty_key
|
||||
UNIQUE NULLS NOT DISTINCT
|
||||
(bucket_date, workspace_id, agent_id, project_id, model)
|
||||
);
|
||||
|
||||
CREATE INDEX idx_task_usage_dashboard_dirty_enqueued_at
|
||||
ON task_usage_dashboard_dirty (enqueued_at);
|
||||
|
||||
-- Trigger 1: agent_task_queue BEFORE UPDATE OF issue_id OR DELETE.
|
||||
--
|
||||
-- Two cases:
|
||||
--
|
||||
-- * UPDATE OF issue_id — currently only `LinkTaskToIssue` (quick-create
|
||||
-- tasks attaching to the issue the agent just produced) writes here,
|
||||
-- moving the task from `issue_id IS NULL` to a real issue. If usage
|
||||
-- already rolled up under the no-project bucket, we have to enqueue
|
||||
-- both OLD (NULL project) AND NEW (the new issue's project) so the
|
||||
-- next tick re-attributes the tokens.
|
||||
--
|
||||
-- * DELETE — direct atq deletions land here with the issue row still
|
||||
-- alive, so `LEFT JOIN issue` resolves the project correctly.
|
||||
-- Cascade DELETE driven from `DELETE FROM issue` is handled by
|
||||
-- `enqueue_task_usage_dashboard_dirty_for_issue_delete` below (which
|
||||
-- fires *before* the cascade, while `issue.project_id` is still
|
||||
-- readable); this trigger may also fire during that cascade, but the
|
||||
-- join returns no row → workspace_id is missing, so the JOIN on
|
||||
-- `agent` keeps the enqueue safe and the resulting NULL-project key
|
||||
-- no-ops on recompute (deleted_empty path).
|
||||
CREATE OR REPLACE FUNCTION enqueue_task_usage_dashboard_dirty_for_atq()
|
||||
RETURNS TRIGGER
|
||||
LANGUAGE plpgsql
|
||||
AS $$
|
||||
BEGIN
|
||||
IF TG_OP = 'UPDATE' THEN
|
||||
IF OLD.issue_id IS DISTINCT FROM NEW.issue_id THEN
|
||||
-- OLD side: project_id of OLD.issue_id (NULL when OLD.issue_id IS NULL,
|
||||
-- e.g. the quick-create starting state).
|
||||
INSERT INTO task_usage_dashboard_dirty (
|
||||
bucket_date, workspace_id, agent_id, project_id, model
|
||||
)
|
||||
SELECT DISTINCT
|
||||
DATE(tu.created_at),
|
||||
a.workspace_id,
|
||||
OLD.agent_id,
|
||||
i_old.project_id,
|
||||
tu.model
|
||||
FROM task_usage tu
|
||||
JOIN agent a ON a.id = OLD.agent_id
|
||||
LEFT JOIN issue i_old ON i_old.id = OLD.issue_id
|
||||
WHERE tu.task_id = OLD.id
|
||||
ON CONFLICT ON CONSTRAINT uq_task_usage_dashboard_dirty_key DO UPDATE
|
||||
SET enqueued_at = GREATEST(task_usage_dashboard_dirty.enqueued_at, EXCLUDED.enqueued_at);
|
||||
|
||||
-- NEW side: project_id of NEW.issue_id.
|
||||
INSERT INTO task_usage_dashboard_dirty (
|
||||
bucket_date, workspace_id, agent_id, project_id, model
|
||||
)
|
||||
SELECT DISTINCT
|
||||
DATE(tu.created_at),
|
||||
a.workspace_id,
|
||||
NEW.agent_id,
|
||||
i_new.project_id,
|
||||
tu.model
|
||||
FROM task_usage tu
|
||||
JOIN agent a ON a.id = NEW.agent_id
|
||||
LEFT JOIN issue i_new ON i_new.id = NEW.issue_id
|
||||
WHERE tu.task_id = NEW.id
|
||||
ON CONFLICT ON CONSTRAINT uq_task_usage_dashboard_dirty_key DO UPDATE
|
||||
SET enqueued_at = GREATEST(task_usage_dashboard_dirty.enqueued_at, EXCLUDED.enqueued_at);
|
||||
END IF;
|
||||
RETURN NEW;
|
||||
ELSIF TG_OP = 'DELETE' THEN
|
||||
INSERT INTO task_usage_dashboard_dirty (
|
||||
bucket_date, workspace_id, agent_id, project_id, model
|
||||
)
|
||||
SELECT DISTINCT
|
||||
DATE(tu.created_at),
|
||||
a.workspace_id,
|
||||
OLD.agent_id,
|
||||
i.project_id,
|
||||
tu.model
|
||||
FROM task_usage tu
|
||||
JOIN agent a ON a.id = OLD.agent_id
|
||||
LEFT JOIN issue i ON i.id = OLD.issue_id
|
||||
WHERE tu.task_id = OLD.id
|
||||
ON CONFLICT ON CONSTRAINT uq_task_usage_dashboard_dirty_key DO UPDATE
|
||||
SET enqueued_at = GREATEST(task_usage_dashboard_dirty.enqueued_at, EXCLUDED.enqueued_at);
|
||||
RETURN OLD;
|
||||
END IF;
|
||||
RETURN NULL;
|
||||
END;
|
||||
$$;
|
||||
|
||||
CREATE TRIGGER trg_atq_dirty_dashboard
|
||||
BEFORE UPDATE OF issue_id OR DELETE ON agent_task_queue
|
||||
FOR EACH ROW EXECUTE FUNCTION enqueue_task_usage_dashboard_dirty_for_atq();
|
||||
|
||||
-- Trigger 1b: issue BEFORE DELETE.
|
||||
--
|
||||
-- `DELETE FROM issue` cascades to `agent_task_queue` and onward to
|
||||
-- `task_usage`. By the time the atq BEFORE DELETE trigger runs, the
|
||||
-- issue row is gone and `LEFT JOIN issue` returns NULL for project_id,
|
||||
-- so the atq trigger would enqueue a NULL-project key — the rollup row
|
||||
-- under the original project would never get cleared and would keep
|
||||
-- billing the workspace for tokens that no longer have a source.
|
||||
--
|
||||
-- This trigger fires BEFORE the cascade, while `OLD.project_id` is still
|
||||
-- readable, and enqueues one dirty row per (date, agent, model) the
|
||||
-- issue's tasks contributed to. The next rollup tick recomputes the
|
||||
-- bucket, finds no source rows under the original project, and drops it
|
||||
-- (deleted_empty path).
|
||||
CREATE OR REPLACE FUNCTION enqueue_task_usage_dashboard_dirty_for_issue_delete()
|
||||
RETURNS TRIGGER
|
||||
LANGUAGE plpgsql
|
||||
AS $$
|
||||
BEGIN
|
||||
INSERT INTO task_usage_dashboard_dirty (
|
||||
bucket_date, workspace_id, agent_id, project_id, model
|
||||
)
|
||||
SELECT DISTINCT
|
||||
DATE(tu.created_at),
|
||||
OLD.workspace_id,
|
||||
atq.agent_id,
|
||||
OLD.project_id,
|
||||
tu.model
|
||||
FROM agent_task_queue atq
|
||||
JOIN task_usage tu ON tu.task_id = atq.id
|
||||
WHERE atq.issue_id = OLD.id
|
||||
ON CONFLICT ON CONSTRAINT uq_task_usage_dashboard_dirty_key DO UPDATE
|
||||
SET enqueued_at = GREATEST(task_usage_dashboard_dirty.enqueued_at, EXCLUDED.enqueued_at);
|
||||
RETURN OLD;
|
||||
END;
|
||||
$$;
|
||||
|
||||
CREATE TRIGGER trg_issue_delete_dirty_dashboard
|
||||
BEFORE DELETE ON issue
|
||||
FOR EACH ROW EXECUTE FUNCTION enqueue_task_usage_dashboard_dirty_for_issue_delete();
|
||||
|
||||
-- Trigger 2: task_usage BEFORE DELETE.
|
||||
-- Rare in practice (no direct DELETE call sites today) but ensures the
|
||||
-- rollup converges if one is added. UPDATE / INSERT are caught by the
|
||||
-- updated_at watermark in the window function.
|
||||
CREATE OR REPLACE FUNCTION enqueue_task_usage_dashboard_dirty_for_tu()
|
||||
RETURNS TRIGGER
|
||||
LANGUAGE plpgsql
|
||||
AS $$
|
||||
BEGIN
|
||||
INSERT INTO task_usage_dashboard_dirty (
|
||||
bucket_date, workspace_id, agent_id, project_id, model
|
||||
)
|
||||
SELECT
|
||||
DATE(OLD.created_at),
|
||||
a.workspace_id,
|
||||
atq.agent_id,
|
||||
i.project_id,
|
||||
OLD.model
|
||||
FROM agent_task_queue atq
|
||||
JOIN agent a ON a.id = atq.agent_id
|
||||
LEFT JOIN issue i ON i.id = atq.issue_id
|
||||
WHERE atq.id = OLD.task_id
|
||||
ON CONFLICT ON CONSTRAINT uq_task_usage_dashboard_dirty_key DO UPDATE
|
||||
SET enqueued_at = GREATEST(task_usage_dashboard_dirty.enqueued_at, EXCLUDED.enqueued_at);
|
||||
RETURN OLD;
|
||||
END;
|
||||
$$;
|
||||
|
||||
CREATE TRIGGER trg_tu_dirty_dashboard
|
||||
BEFORE DELETE ON task_usage
|
||||
FOR EACH ROW EXECUTE FUNCTION enqueue_task_usage_dashboard_dirty_for_tu();
|
||||
|
||||
-- Trigger 3: issue BEFORE UPDATE OF project_id.
|
||||
-- Re-attribute every (date, agent, model) bucket touched by tasks under
|
||||
-- this issue: enqueue OLD project_id (so it stops claiming the tokens)
|
||||
-- AND NEW project_id (so it picks them up). Both go through the same
|
||||
-- dirty queue; the window function recomputes from the live join in
|
||||
-- recomputed CTE, which now sees NEW.project_id, so the OLD bucket
|
||||
-- either drops to 0 (deleted_empty path) or shrinks.
|
||||
CREATE OR REPLACE FUNCTION enqueue_task_usage_dashboard_dirty_for_issue_project()
|
||||
RETURNS TRIGGER
|
||||
LANGUAGE plpgsql
|
||||
AS $$
|
||||
BEGIN
|
||||
IF OLD.project_id IS DISTINCT FROM NEW.project_id THEN
|
||||
-- OLD project buckets.
|
||||
INSERT INTO task_usage_dashboard_dirty (
|
||||
bucket_date, workspace_id, agent_id, project_id, model
|
||||
)
|
||||
SELECT DISTINCT
|
||||
DATE(tu.created_at),
|
||||
NEW.workspace_id,
|
||||
atq.agent_id,
|
||||
OLD.project_id,
|
||||
tu.model
|
||||
FROM agent_task_queue atq
|
||||
JOIN task_usage tu ON tu.task_id = atq.id
|
||||
WHERE atq.issue_id = NEW.id
|
||||
ON CONFLICT ON CONSTRAINT uq_task_usage_dashboard_dirty_key DO UPDATE
|
||||
SET enqueued_at = GREATEST(task_usage_dashboard_dirty.enqueued_at, EXCLUDED.enqueued_at);
|
||||
|
||||
-- NEW project buckets.
|
||||
INSERT INTO task_usage_dashboard_dirty (
|
||||
bucket_date, workspace_id, agent_id, project_id, model
|
||||
)
|
||||
SELECT DISTINCT
|
||||
DATE(tu.created_at),
|
||||
NEW.workspace_id,
|
||||
atq.agent_id,
|
||||
NEW.project_id,
|
||||
tu.model
|
||||
FROM agent_task_queue atq
|
||||
JOIN task_usage tu ON tu.task_id = atq.id
|
||||
WHERE atq.issue_id = NEW.id
|
||||
ON CONFLICT ON CONSTRAINT uq_task_usage_dashboard_dirty_key DO UPDATE
|
||||
SET enqueued_at = GREATEST(task_usage_dashboard_dirty.enqueued_at, EXCLUDED.enqueued_at);
|
||||
END IF;
|
||||
RETURN NEW;
|
||||
END;
|
||||
$$;
|
||||
|
||||
CREATE TRIGGER trg_issue_project_dirty_dashboard
|
||||
BEFORE UPDATE OF project_id ON issue
|
||||
FOR EACH ROW EXECUTE FUNCTION enqueue_task_usage_dashboard_dirty_for_issue_project();
|
||||
|
||||
-- Window function. Same shape as 077's rollup_task_usage_daily_window:
|
||||
-- 1. Discover dirty keys from updated_at watermark + the explicit queue.
|
||||
-- 2. Recompute each from raw via the agent_task_queue + issue join.
|
||||
-- 3. Upsert present buckets, delete buckets that recomputed to nothing.
|
||||
-- 4. Drain the queue rows whose enqueued_at < p_to.
|
||||
--
|
||||
-- IDEMPOTENCY: re-running the same window yields the same final state,
|
||||
-- because each touched key is rebuilt from raw, not deltaed.
|
||||
CREATE OR REPLACE FUNCTION rollup_task_usage_dashboard_daily_window(
|
||||
p_from TIMESTAMPTZ,
|
||||
p_to TIMESTAMPTZ
|
||||
)
|
||||
RETURNS BIGINT
|
||||
LANGUAGE plpgsql
|
||||
AS $$
|
||||
DECLARE
|
||||
v_rows BIGINT;
|
||||
BEGIN
|
||||
IF p_from >= p_to THEN
|
||||
RETURN 0;
|
||||
END IF;
|
||||
|
||||
WITH
|
||||
dirty_from_updates AS (
|
||||
SELECT DISTINCT
|
||||
DATE(tu.created_at) AS bucket_date,
|
||||
a.workspace_id AS workspace_id,
|
||||
atq.agent_id AS agent_id,
|
||||
i.project_id AS project_id,
|
||||
tu.model AS model
|
||||
FROM task_usage tu
|
||||
JOIN agent_task_queue atq ON atq.id = tu.task_id
|
||||
JOIN agent a ON a.id = atq.agent_id
|
||||
LEFT JOIN issue i ON i.id = atq.issue_id
|
||||
WHERE (
|
||||
(tu.updated_at >= p_from AND tu.updated_at < p_to)
|
||||
-- Legacy `updated_at IS NULL` rows: 077 handles this via a
|
||||
-- partial index. We rely on the same `idx_task_usage_created_at_legacy`
|
||||
-- (migration 078) for this branch.
|
||||
OR (tu.updated_at IS NULL
|
||||
AND tu.created_at >= p_from
|
||||
AND tu.created_at < p_to)
|
||||
)
|
||||
),
|
||||
dirty_from_queue AS (
|
||||
SELECT bucket_date, workspace_id, agent_id, project_id, model
|
||||
FROM task_usage_dashboard_dirty
|
||||
WHERE enqueued_at < p_to
|
||||
),
|
||||
dirty_keys AS (
|
||||
SELECT * FROM dirty_from_updates
|
||||
UNION
|
||||
SELECT * FROM dirty_from_queue
|
||||
),
|
||||
recomputed AS (
|
||||
SELECT
|
||||
dk.bucket_date,
|
||||
dk.workspace_id,
|
||||
dk.agent_id,
|
||||
dk.project_id,
|
||||
dk.model,
|
||||
SUM(tu.input_tokens)::bigint AS input_tokens,
|
||||
SUM(tu.output_tokens)::bigint AS output_tokens,
|
||||
SUM(tu.cache_read_tokens)::bigint AS cache_read_tokens,
|
||||
SUM(tu.cache_write_tokens)::bigint AS cache_write_tokens,
|
||||
COUNT(DISTINCT tu.task_id)::bigint AS task_count,
|
||||
COUNT(*)::bigint AS event_count
|
||||
FROM dirty_keys dk
|
||||
JOIN agent_task_queue atq ON atq.agent_id = dk.agent_id
|
||||
JOIN agent a ON a.id = atq.agent_id
|
||||
AND a.workspace_id = dk.workspace_id
|
||||
LEFT JOIN issue i ON i.id = atq.issue_id
|
||||
JOIN task_usage tu ON tu.task_id = atq.id
|
||||
AND tu.model = dk.model
|
||||
AND DATE(tu.created_at) = dk.bucket_date
|
||||
WHERE (i.project_id IS NOT DISTINCT FROM dk.project_id)
|
||||
GROUP BY 1, 2, 3, 4, 5
|
||||
),
|
||||
upserted AS (
|
||||
INSERT INTO task_usage_dashboard_daily AS d (
|
||||
bucket_date, workspace_id, agent_id, project_id, model,
|
||||
input_tokens, output_tokens, cache_read_tokens, cache_write_tokens,
|
||||
task_count, event_count
|
||||
)
|
||||
SELECT
|
||||
bucket_date, workspace_id, agent_id, project_id, model,
|
||||
input_tokens, output_tokens, cache_read_tokens, cache_write_tokens,
|
||||
task_count, event_count
|
||||
FROM recomputed
|
||||
ON CONFLICT ON CONSTRAINT uq_task_usage_dashboard_daily_key DO UPDATE
|
||||
SET input_tokens = EXCLUDED.input_tokens,
|
||||
output_tokens = EXCLUDED.output_tokens,
|
||||
cache_read_tokens = EXCLUDED.cache_read_tokens,
|
||||
cache_write_tokens = EXCLUDED.cache_write_tokens,
|
||||
task_count = EXCLUDED.task_count,
|
||||
event_count = EXCLUDED.event_count,
|
||||
updated_at = now()
|
||||
RETURNING 1
|
||||
),
|
||||
deleted_empty AS (
|
||||
DELETE FROM task_usage_dashboard_daily d
|
||||
USING dirty_keys dk
|
||||
WHERE d.bucket_date = dk.bucket_date
|
||||
AND d.workspace_id = dk.workspace_id
|
||||
AND d.agent_id = dk.agent_id
|
||||
AND d.project_id IS NOT DISTINCT FROM dk.project_id
|
||||
AND d.model = dk.model
|
||||
AND NOT EXISTS (
|
||||
SELECT 1 FROM recomputed r
|
||||
WHERE r.bucket_date = dk.bucket_date
|
||||
AND r.workspace_id = dk.workspace_id
|
||||
AND r.agent_id = dk.agent_id
|
||||
AND r.project_id IS NOT DISTINCT FROM dk.project_id
|
||||
AND r.model = dk.model
|
||||
)
|
||||
RETURNING 1
|
||||
)
|
||||
SELECT (SELECT COUNT(*) FROM upserted) + (SELECT COUNT(*) FROM deleted_empty)
|
||||
INTO v_rows;
|
||||
|
||||
DELETE FROM task_usage_dashboard_dirty WHERE enqueued_at < p_to;
|
||||
|
||||
RETURN v_rows;
|
||||
END;
|
||||
$$;
|
||||
|
||||
-- Cron entry. Mirrors `rollup_task_usage_daily` (migration 073) — same
|
||||
-- advisory-lock + watermark + error-recovery shape. Uses lock id 4244
|
||||
-- so it serialises independently of the per-runtime rollup (4242).
|
||||
CREATE OR REPLACE FUNCTION rollup_task_usage_dashboard_daily()
|
||||
RETURNS BIGINT
|
||||
LANGUAGE plpgsql
|
||||
AS $$
|
||||
DECLARE
|
||||
v_lock_ok BOOLEAN;
|
||||
v_from TIMESTAMPTZ;
|
||||
v_to TIMESTAMPTZ;
|
||||
v_rows BIGINT := 0;
|
||||
BEGIN
|
||||
SELECT pg_try_advisory_lock(4244) INTO v_lock_ok;
|
||||
IF NOT v_lock_ok THEN
|
||||
RETURN 0;
|
||||
END IF;
|
||||
|
||||
BEGIN
|
||||
UPDATE task_usage_dashboard_rollup_state
|
||||
SET last_run_started_at = now(),
|
||||
last_error = NULL
|
||||
WHERE id = 1
|
||||
RETURNING watermark_at INTO v_from;
|
||||
|
||||
v_to := now() - INTERVAL '5 minutes';
|
||||
|
||||
IF v_from < v_to THEN
|
||||
v_rows := rollup_task_usage_dashboard_daily_window(v_from, v_to);
|
||||
|
||||
UPDATE task_usage_dashboard_rollup_state
|
||||
SET watermark_at = v_to,
|
||||
last_run_finished_at = now(),
|
||||
last_run_rows = v_rows
|
||||
WHERE id = 1;
|
||||
ELSE
|
||||
UPDATE task_usage_dashboard_rollup_state
|
||||
SET last_run_finished_at = now(),
|
||||
last_run_rows = 0
|
||||
WHERE id = 1;
|
||||
END IF;
|
||||
|
||||
PERFORM pg_advisory_unlock(4244);
|
||||
RETURN v_rows;
|
||||
EXCEPTION WHEN OTHERS THEN
|
||||
UPDATE task_usage_dashboard_rollup_state
|
||||
SET last_error = SQLERRM,
|
||||
last_run_finished_at = now()
|
||||
WHERE id = 1;
|
||||
PERFORM pg_advisory_unlock(4244);
|
||||
RAISE;
|
||||
END;
|
||||
END;
|
||||
$$;
|
||||
|
||||
-- Health-check helper mirroring task_usage_rollup_lag_seconds() (076).
|
||||
-- Alert via monitoring on NULL > 15 min after deploy, or value > 900s.
|
||||
CREATE OR REPLACE FUNCTION task_usage_dashboard_rollup_lag_seconds()
|
||||
RETURNS DOUBLE PRECISION
|
||||
LANGUAGE sql
|
||||
STABLE
|
||||
AS $$
|
||||
SELECT EXTRACT(EPOCH FROM (now() - last_run_finished_at))
|
||||
FROM task_usage_dashboard_rollup_state
|
||||
WHERE id = 1;
|
||||
$$;
|
||||
|
||||
-- NOTE: cron job is NOT scheduled by this migration — same convention as
|
||||
-- 076 for the per-runtime rollup. Operator playbook:
|
||||
-- 1) Apply migrations through 084.
|
||||
-- 2) Run `go run ./cmd/backfill_task_usage_dashboard_daily`.
|
||||
-- 3) Set USAGE_DASHBOARD_ROLLUP_ENABLED=true on the API and roll out.
|
||||
-- 4) As superuser:
|
||||
-- SELECT cron.schedule(
|
||||
-- 'rollup_task_usage_dashboard_daily',
|
||||
-- '*/5 * * * *',
|
||||
-- $$SELECT rollup_task_usage_dashboard_daily()$$
|
||||
-- );
|
||||
@@ -496,6 +496,39 @@ type TaskUsageDailyDirty struct {
|
||||
EnqueuedAt pgtype.Timestamptz `json:"enqueued_at"`
|
||||
}
|
||||
|
||||
type TaskUsageDashboardDaily struct {
|
||||
BucketDate pgtype.Date `json:"bucket_date"`
|
||||
WorkspaceID pgtype.UUID `json:"workspace_id"`
|
||||
AgentID pgtype.UUID `json:"agent_id"`
|
||||
ProjectID pgtype.UUID `json:"project_id"`
|
||||
Model string `json:"model"`
|
||||
InputTokens int64 `json:"input_tokens"`
|
||||
OutputTokens int64 `json:"output_tokens"`
|
||||
CacheReadTokens int64 `json:"cache_read_tokens"`
|
||||
CacheWriteTokens int64 `json:"cache_write_tokens"`
|
||||
TaskCount int64 `json:"task_count"`
|
||||
EventCount int64 `json:"event_count"`
|
||||
UpdatedAt pgtype.Timestamptz `json:"updated_at"`
|
||||
}
|
||||
|
||||
type TaskUsageDashboardDirty struct {
|
||||
BucketDate pgtype.Date `json:"bucket_date"`
|
||||
WorkspaceID pgtype.UUID `json:"workspace_id"`
|
||||
AgentID pgtype.UUID `json:"agent_id"`
|
||||
ProjectID pgtype.UUID `json:"project_id"`
|
||||
Model string `json:"model"`
|
||||
EnqueuedAt pgtype.Timestamptz `json:"enqueued_at"`
|
||||
}
|
||||
|
||||
type TaskUsageDashboardRollupState struct {
|
||||
ID int16 `json:"id"`
|
||||
WatermarkAt pgtype.Timestamptz `json:"watermark_at"`
|
||||
LastRunStartedAt pgtype.Timestamptz `json:"last_run_started_at"`
|
||||
LastRunFinishedAt pgtype.Timestamptz `json:"last_run_finished_at"`
|
||||
LastRunRows int64 `json:"last_run_rows"`
|
||||
LastError pgtype.Text `json:"last_error"`
|
||||
}
|
||||
|
||||
type TaskUsageRollupState struct {
|
||||
ID int16 `json:"id"`
|
||||
WatermarkAt pgtype.Timestamptz `json:"watermark_at"`
|
||||
|
||||
@@ -206,6 +206,342 @@ func (q *Queries) GetWorkspaceUsageSummary(ctx context.Context, arg GetWorkspace
|
||||
return items, nil
|
||||
}
|
||||
|
||||
const listDashboardAgentRunTime = `-- name: ListDashboardAgentRunTime :many
|
||||
SELECT
|
||||
atq.agent_id,
|
||||
COALESCE(
|
||||
SUM(EXTRACT(EPOCH FROM (atq.completed_at - atq.started_at)))::bigint,
|
||||
0
|
||||
)::bigint AS total_seconds,
|
||||
COUNT(*)::int AS task_count,
|
||||
COUNT(*) FILTER (WHERE atq.status = 'failed')::int AS failed_count
|
||||
FROM agent_task_queue atq
|
||||
JOIN agent a ON a.id = atq.agent_id
|
||||
LEFT JOIN issue i ON i.id = atq.issue_id
|
||||
WHERE a.workspace_id = $1
|
||||
AND atq.status IN ('completed', 'failed')
|
||||
AND atq.started_at IS NOT NULL
|
||||
AND atq.completed_at IS NOT NULL
|
||||
AND atq.completed_at >= DATE_TRUNC('day', $2::timestamptz)
|
||||
AND ($3::uuid IS NULL OR i.project_id = $3)
|
||||
GROUP BY atq.agent_id
|
||||
ORDER BY total_seconds DESC
|
||||
`
|
||||
|
||||
type ListDashboardAgentRunTimeParams struct {
|
||||
WorkspaceID pgtype.UUID `json:"workspace_id"`
|
||||
Since pgtype.Timestamptz `json:"since"`
|
||||
ProjectID pgtype.UUID `json:"project_id"`
|
||||
}
|
||||
|
||||
type ListDashboardAgentRunTimeRow struct {
|
||||
AgentID pgtype.UUID `json:"agent_id"`
|
||||
TotalSeconds int64 `json:"total_seconds"`
|
||||
TaskCount int32 `json:"task_count"`
|
||||
FailedCount int32 `json:"failed_count"`
|
||||
}
|
||||
|
||||
// Per-agent total task run time and task count for the workspace, optionally
|
||||
// scoped to a single project. Counts only terminal runs (completed or failed)
|
||||
// with both started_at and completed_at populated — queued/running tasks have
|
||||
// no finite duration. Anchored on completed_at so the window matches the
|
||||
// token cost window (which is anchored on tu.created_at, ~= completion time).
|
||||
func (q *Queries) ListDashboardAgentRunTime(ctx context.Context, arg ListDashboardAgentRunTimeParams) ([]ListDashboardAgentRunTimeRow, error) {
|
||||
rows, err := q.db.Query(ctx, listDashboardAgentRunTime, arg.WorkspaceID, arg.Since, arg.ProjectID)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer rows.Close()
|
||||
items := []ListDashboardAgentRunTimeRow{}
|
||||
for rows.Next() {
|
||||
var i ListDashboardAgentRunTimeRow
|
||||
if err := rows.Scan(
|
||||
&i.AgentID,
|
||||
&i.TotalSeconds,
|
||||
&i.TaskCount,
|
||||
&i.FailedCount,
|
||||
); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
items = append(items, i)
|
||||
}
|
||||
if err := rows.Err(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return items, nil
|
||||
}
|
||||
|
||||
const listDashboardUsageByAgent = `-- name: ListDashboardUsageByAgent :many
|
||||
SELECT
|
||||
atq.agent_id,
|
||||
tu.model,
|
||||
SUM(tu.input_tokens)::bigint AS input_tokens,
|
||||
SUM(tu.output_tokens)::bigint AS output_tokens,
|
||||
SUM(tu.cache_read_tokens)::bigint AS cache_read_tokens,
|
||||
SUM(tu.cache_write_tokens)::bigint AS cache_write_tokens,
|
||||
COUNT(DISTINCT tu.task_id)::int AS task_count
|
||||
FROM task_usage tu
|
||||
JOIN agent_task_queue atq ON atq.id = tu.task_id
|
||||
JOIN agent a ON a.id = atq.agent_id
|
||||
LEFT JOIN issue i ON i.id = atq.issue_id
|
||||
WHERE a.workspace_id = $1
|
||||
AND tu.created_at >= DATE_TRUNC('day', $2::timestamptz)
|
||||
AND ($3::uuid IS NULL OR i.project_id = $3)
|
||||
GROUP BY atq.agent_id, tu.model
|
||||
ORDER BY atq.agent_id, tu.model
|
||||
`
|
||||
|
||||
type ListDashboardUsageByAgentParams struct {
|
||||
WorkspaceID pgtype.UUID `json:"workspace_id"`
|
||||
Since pgtype.Timestamptz `json:"since"`
|
||||
ProjectID pgtype.UUID `json:"project_id"`
|
||||
}
|
||||
|
||||
type ListDashboardUsageByAgentRow struct {
|
||||
AgentID pgtype.UUID `json:"agent_id"`
|
||||
Model string `json:"model"`
|
||||
InputTokens int64 `json:"input_tokens"`
|
||||
OutputTokens int64 `json:"output_tokens"`
|
||||
CacheReadTokens int64 `json:"cache_read_tokens"`
|
||||
CacheWriteTokens int64 `json:"cache_write_tokens"`
|
||||
TaskCount int32 `json:"task_count"`
|
||||
}
|
||||
|
||||
// Per-(agent, model) token aggregates for the workspace, optionally scoped
|
||||
// to a single project. Model dimension is preserved so the client can
|
||||
// compute cost from its per-model pricing table; the client folds rows by
|
||||
// agent for the "by agent" list on the dashboard.
|
||||
func (q *Queries) ListDashboardUsageByAgent(ctx context.Context, arg ListDashboardUsageByAgentParams) ([]ListDashboardUsageByAgentRow, error) {
|
||||
rows, err := q.db.Query(ctx, listDashboardUsageByAgent, arg.WorkspaceID, arg.Since, arg.ProjectID)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer rows.Close()
|
||||
items := []ListDashboardUsageByAgentRow{}
|
||||
for rows.Next() {
|
||||
var i ListDashboardUsageByAgentRow
|
||||
if err := rows.Scan(
|
||||
&i.AgentID,
|
||||
&i.Model,
|
||||
&i.InputTokens,
|
||||
&i.OutputTokens,
|
||||
&i.CacheReadTokens,
|
||||
&i.CacheWriteTokens,
|
||||
&i.TaskCount,
|
||||
); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
items = append(items, i)
|
||||
}
|
||||
if err := rows.Err(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return items, nil
|
||||
}
|
||||
|
||||
const listDashboardUsageByAgentRollup = `-- name: ListDashboardUsageByAgentRollup :many
|
||||
SELECT
|
||||
agent_id,
|
||||
model,
|
||||
SUM(input_tokens)::bigint AS input_tokens,
|
||||
SUM(output_tokens)::bigint AS output_tokens,
|
||||
SUM(cache_read_tokens)::bigint AS cache_read_tokens,
|
||||
SUM(cache_write_tokens)::bigint AS cache_write_tokens,
|
||||
SUM(task_count)::int AS task_count
|
||||
FROM task_usage_dashboard_daily
|
||||
WHERE workspace_id = $1
|
||||
AND bucket_date >= DATE_TRUNC('day', $2::timestamptz)::date
|
||||
AND ($3::uuid IS NULL OR project_id = $3)
|
||||
GROUP BY agent_id, model
|
||||
ORDER BY agent_id, model
|
||||
`
|
||||
|
||||
type ListDashboardUsageByAgentRollupParams struct {
|
||||
WorkspaceID pgtype.UUID `json:"workspace_id"`
|
||||
Since pgtype.Timestamptz `json:"since"`
|
||||
ProjectID pgtype.UUID `json:"project_id"`
|
||||
}
|
||||
|
||||
type ListDashboardUsageByAgentRollupRow struct {
|
||||
AgentID pgtype.UUID `json:"agent_id"`
|
||||
Model string `json:"model"`
|
||||
InputTokens int64 `json:"input_tokens"`
|
||||
OutputTokens int64 `json:"output_tokens"`
|
||||
CacheReadTokens int64 `json:"cache_read_tokens"`
|
||||
CacheWriteTokens int64 `json:"cache_write_tokens"`
|
||||
TaskCount int32 `json:"task_count"`
|
||||
}
|
||||
|
||||
// Per-(agent, model) token rollup from `task_usage_dashboard_daily`.
|
||||
// task_count here is the SUM of per-bucket distinct counts; one task that
|
||||
// spans multiple days lands in multiple buckets, so this can over-count
|
||||
// by date. The frontend prefers `ListDashboardAgentRunTime`'s per-agent
|
||||
// distinct figure for the user-facing "tasks" column, so this value is
|
||||
// informational only.
|
||||
func (q *Queries) ListDashboardUsageByAgentRollup(ctx context.Context, arg ListDashboardUsageByAgentRollupParams) ([]ListDashboardUsageByAgentRollupRow, error) {
|
||||
rows, err := q.db.Query(ctx, listDashboardUsageByAgentRollup, arg.WorkspaceID, arg.Since, arg.ProjectID)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer rows.Close()
|
||||
items := []ListDashboardUsageByAgentRollupRow{}
|
||||
for rows.Next() {
|
||||
var i ListDashboardUsageByAgentRollupRow
|
||||
if err := rows.Scan(
|
||||
&i.AgentID,
|
||||
&i.Model,
|
||||
&i.InputTokens,
|
||||
&i.OutputTokens,
|
||||
&i.CacheReadTokens,
|
||||
&i.CacheWriteTokens,
|
||||
&i.TaskCount,
|
||||
); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
items = append(items, i)
|
||||
}
|
||||
if err := rows.Err(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return items, nil
|
||||
}
|
||||
|
||||
const listDashboardUsageDaily = `-- name: ListDashboardUsageDaily :many
|
||||
SELECT
|
||||
DATE(tu.created_at) AS date,
|
||||
tu.model,
|
||||
SUM(tu.input_tokens)::bigint AS input_tokens,
|
||||
SUM(tu.output_tokens)::bigint AS output_tokens,
|
||||
SUM(tu.cache_read_tokens)::bigint AS cache_read_tokens,
|
||||
SUM(tu.cache_write_tokens)::bigint AS cache_write_tokens,
|
||||
COUNT(DISTINCT tu.task_id)::int AS task_count
|
||||
FROM task_usage tu
|
||||
JOIN agent_task_queue atq ON atq.id = tu.task_id
|
||||
JOIN agent a ON a.id = atq.agent_id
|
||||
LEFT JOIN issue i ON i.id = atq.issue_id
|
||||
WHERE a.workspace_id = $1
|
||||
AND tu.created_at >= DATE_TRUNC('day', $2::timestamptz)
|
||||
AND ($3::uuid IS NULL OR i.project_id = $3)
|
||||
GROUP BY DATE(tu.created_at), tu.model
|
||||
ORDER BY DATE(tu.created_at) DESC, tu.model
|
||||
`
|
||||
|
||||
type ListDashboardUsageDailyParams struct {
|
||||
WorkspaceID pgtype.UUID `json:"workspace_id"`
|
||||
Since pgtype.Timestamptz `json:"since"`
|
||||
ProjectID pgtype.UUID `json:"project_id"`
|
||||
}
|
||||
|
||||
type ListDashboardUsageDailyRow struct {
|
||||
Date pgtype.Date `json:"date"`
|
||||
Model string `json:"model"`
|
||||
InputTokens int64 `json:"input_tokens"`
|
||||
OutputTokens int64 `json:"output_tokens"`
|
||||
CacheReadTokens int64 `json:"cache_read_tokens"`
|
||||
CacheWriteTokens int64 `json:"cache_write_tokens"`
|
||||
TaskCount int32 `json:"task_count"`
|
||||
}
|
||||
|
||||
// Daily per-(date, model) token aggregates for the workspace, optionally
|
||||
// scoped to a single project via sqlc.narg('project_id'). Bucketed by
|
||||
// tu.created_at (token-production time) to match GetWorkspaceUsageByDay,
|
||||
// so a task that queues one day and finishes the next is attributed to
|
||||
// the day the tokens actually landed. Powers the workspace dashboard's
|
||||
// daily cost chart.
|
||||
func (q *Queries) ListDashboardUsageDaily(ctx context.Context, arg ListDashboardUsageDailyParams) ([]ListDashboardUsageDailyRow, error) {
|
||||
rows, err := q.db.Query(ctx, listDashboardUsageDaily, arg.WorkspaceID, arg.Since, arg.ProjectID)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer rows.Close()
|
||||
items := []ListDashboardUsageDailyRow{}
|
||||
for rows.Next() {
|
||||
var i ListDashboardUsageDailyRow
|
||||
if err := rows.Scan(
|
||||
&i.Date,
|
||||
&i.Model,
|
||||
&i.InputTokens,
|
||||
&i.OutputTokens,
|
||||
&i.CacheReadTokens,
|
||||
&i.CacheWriteTokens,
|
||||
&i.TaskCount,
|
||||
); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
items = append(items, i)
|
||||
}
|
||||
if err := rows.Err(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return items, nil
|
||||
}
|
||||
|
||||
const listDashboardUsageDailyRollup = `-- name: ListDashboardUsageDailyRollup :many
|
||||
SELECT
|
||||
bucket_date AS date,
|
||||
model,
|
||||
SUM(input_tokens)::bigint AS input_tokens,
|
||||
SUM(output_tokens)::bigint AS output_tokens,
|
||||
SUM(cache_read_tokens)::bigint AS cache_read_tokens,
|
||||
SUM(cache_write_tokens)::bigint AS cache_write_tokens,
|
||||
SUM(task_count)::int AS task_count
|
||||
FROM task_usage_dashboard_daily
|
||||
WHERE workspace_id = $1
|
||||
AND bucket_date >= DATE_TRUNC('day', $2::timestamptz)::date
|
||||
AND ($3::uuid IS NULL OR project_id = $3)
|
||||
GROUP BY bucket_date, model
|
||||
ORDER BY bucket_date DESC, model
|
||||
`
|
||||
|
||||
type ListDashboardUsageDailyRollupParams struct {
|
||||
WorkspaceID pgtype.UUID `json:"workspace_id"`
|
||||
Since pgtype.Timestamptz `json:"since"`
|
||||
ProjectID pgtype.UUID `json:"project_id"`
|
||||
}
|
||||
|
||||
type ListDashboardUsageDailyRollupRow struct {
|
||||
Date pgtype.Date `json:"date"`
|
||||
Model string `json:"model"`
|
||||
InputTokens int64 `json:"input_tokens"`
|
||||
OutputTokens int64 `json:"output_tokens"`
|
||||
CacheReadTokens int64 `json:"cache_read_tokens"`
|
||||
CacheWriteTokens int64 `json:"cache_write_tokens"`
|
||||
TaskCount int32 `json:"task_count"`
|
||||
}
|
||||
|
||||
// Daily token rollup, served from `task_usage_dashboard_daily` (migration
|
||||
// 084). Same wire shape as ListDashboardUsageDaily so the handler can
|
||||
// swap them on the `UseDailyRollupForDashboard` flag with no other
|
||||
// changes. The rollup is up to ~10 min stale (5 min cron + 5 min lag),
|
||||
// which is fine for a dashboard read path.
|
||||
func (q *Queries) ListDashboardUsageDailyRollup(ctx context.Context, arg ListDashboardUsageDailyRollupParams) ([]ListDashboardUsageDailyRollupRow, error) {
|
||||
rows, err := q.db.Query(ctx, listDashboardUsageDailyRollup, arg.WorkspaceID, arg.Since, arg.ProjectID)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer rows.Close()
|
||||
items := []ListDashboardUsageDailyRollupRow{}
|
||||
for rows.Next() {
|
||||
var i ListDashboardUsageDailyRollupRow
|
||||
if err := rows.Scan(
|
||||
&i.Date,
|
||||
&i.Model,
|
||||
&i.InputTokens,
|
||||
&i.OutputTokens,
|
||||
&i.CacheReadTokens,
|
||||
&i.CacheWriteTokens,
|
||||
&i.TaskCount,
|
||||
); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
items = append(items, i)
|
||||
}
|
||||
if err := rows.Err(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return items, nil
|
||||
}
|
||||
|
||||
const upsertTaskUsage = `-- name: UpsertTaskUsage :exec
|
||||
INSERT INTO task_usage (task_id, provider, model, input_tokens, output_tokens, cache_read_tokens, cache_write_tokens, updated_at)
|
||||
VALUES ($1, $2, $3, $4, $5, $6, $7, now())
|
||||
|
||||
@@ -67,3 +67,120 @@ SELECT
|
||||
FROM task_usage tu
|
||||
JOIN agent_task_queue atq ON atq.id = tu.task_id
|
||||
WHERE atq.issue_id = $1;
|
||||
|
||||
-- name: ListDashboardUsageDaily :many
|
||||
-- Daily per-(date, model) token aggregates for the workspace, optionally
|
||||
-- scoped to a single project via sqlc.narg('project_id'). Bucketed by
|
||||
-- tu.created_at (token-production time) to match GetWorkspaceUsageByDay,
|
||||
-- so a task that queues one day and finishes the next is attributed to
|
||||
-- the day the tokens actually landed. Powers the workspace dashboard's
|
||||
-- daily cost chart.
|
||||
SELECT
|
||||
DATE(tu.created_at) AS date,
|
||||
tu.model,
|
||||
SUM(tu.input_tokens)::bigint AS input_tokens,
|
||||
SUM(tu.output_tokens)::bigint AS output_tokens,
|
||||
SUM(tu.cache_read_tokens)::bigint AS cache_read_tokens,
|
||||
SUM(tu.cache_write_tokens)::bigint AS cache_write_tokens,
|
||||
COUNT(DISTINCT tu.task_id)::int AS task_count
|
||||
FROM task_usage tu
|
||||
JOIN agent_task_queue atq ON atq.id = tu.task_id
|
||||
JOIN agent a ON a.id = atq.agent_id
|
||||
LEFT JOIN issue i ON i.id = atq.issue_id
|
||||
WHERE a.workspace_id = $1
|
||||
AND tu.created_at >= DATE_TRUNC('day', @since::timestamptz)
|
||||
AND (sqlc.narg('project_id')::uuid IS NULL OR i.project_id = sqlc.narg('project_id'))
|
||||
GROUP BY DATE(tu.created_at), tu.model
|
||||
ORDER BY DATE(tu.created_at) DESC, tu.model;
|
||||
|
||||
-- name: ListDashboardUsageByAgent :many
|
||||
-- Per-(agent, model) token aggregates for the workspace, optionally scoped
|
||||
-- to a single project. Model dimension is preserved so the client can
|
||||
-- compute cost from its per-model pricing table; the client folds rows by
|
||||
-- agent for the "by agent" list on the dashboard.
|
||||
SELECT
|
||||
atq.agent_id,
|
||||
tu.model,
|
||||
SUM(tu.input_tokens)::bigint AS input_tokens,
|
||||
SUM(tu.output_tokens)::bigint AS output_tokens,
|
||||
SUM(tu.cache_read_tokens)::bigint AS cache_read_tokens,
|
||||
SUM(tu.cache_write_tokens)::bigint AS cache_write_tokens,
|
||||
COUNT(DISTINCT tu.task_id)::int AS task_count
|
||||
FROM task_usage tu
|
||||
JOIN agent_task_queue atq ON atq.id = tu.task_id
|
||||
JOIN agent a ON a.id = atq.agent_id
|
||||
LEFT JOIN issue i ON i.id = atq.issue_id
|
||||
WHERE a.workspace_id = $1
|
||||
AND tu.created_at >= DATE_TRUNC('day', @since::timestamptz)
|
||||
AND (sqlc.narg('project_id')::uuid IS NULL OR i.project_id = sqlc.narg('project_id'))
|
||||
GROUP BY atq.agent_id, tu.model
|
||||
ORDER BY atq.agent_id, tu.model;
|
||||
|
||||
-- name: ListDashboardUsageDailyRollup :many
|
||||
-- Daily token rollup, served from `task_usage_dashboard_daily` (migration
|
||||
-- 084). Same wire shape as ListDashboardUsageDaily so the handler can
|
||||
-- swap them on the `UseDailyRollupForDashboard` flag with no other
|
||||
-- changes. The rollup is up to ~10 min stale (5 min cron + 5 min lag),
|
||||
-- which is fine for a dashboard read path.
|
||||
SELECT
|
||||
bucket_date AS date,
|
||||
model,
|
||||
SUM(input_tokens)::bigint AS input_tokens,
|
||||
SUM(output_tokens)::bigint AS output_tokens,
|
||||
SUM(cache_read_tokens)::bigint AS cache_read_tokens,
|
||||
SUM(cache_write_tokens)::bigint AS cache_write_tokens,
|
||||
SUM(task_count)::int AS task_count
|
||||
FROM task_usage_dashboard_daily
|
||||
WHERE workspace_id = $1
|
||||
AND bucket_date >= DATE_TRUNC('day', @since::timestamptz)::date
|
||||
AND (sqlc.narg('project_id')::uuid IS NULL OR project_id = sqlc.narg('project_id'))
|
||||
GROUP BY bucket_date, model
|
||||
ORDER BY bucket_date DESC, model;
|
||||
|
||||
-- name: ListDashboardUsageByAgentRollup :many
|
||||
-- Per-(agent, model) token rollup from `task_usage_dashboard_daily`.
|
||||
-- task_count here is the SUM of per-bucket distinct counts; one task that
|
||||
-- spans multiple days lands in multiple buckets, so this can over-count
|
||||
-- by date. The frontend prefers `ListDashboardAgentRunTime`'s per-agent
|
||||
-- distinct figure for the user-facing "tasks" column, so this value is
|
||||
-- informational only.
|
||||
SELECT
|
||||
agent_id,
|
||||
model,
|
||||
SUM(input_tokens)::bigint AS input_tokens,
|
||||
SUM(output_tokens)::bigint AS output_tokens,
|
||||
SUM(cache_read_tokens)::bigint AS cache_read_tokens,
|
||||
SUM(cache_write_tokens)::bigint AS cache_write_tokens,
|
||||
SUM(task_count)::int AS task_count
|
||||
FROM task_usage_dashboard_daily
|
||||
WHERE workspace_id = $1
|
||||
AND bucket_date >= DATE_TRUNC('day', @since::timestamptz)::date
|
||||
AND (sqlc.narg('project_id')::uuid IS NULL OR project_id = sqlc.narg('project_id'))
|
||||
GROUP BY agent_id, model
|
||||
ORDER BY agent_id, model;
|
||||
|
||||
-- name: ListDashboardAgentRunTime :many
|
||||
-- Per-agent total task run time and task count for the workspace, optionally
|
||||
-- scoped to a single project. Counts only terminal runs (completed or failed)
|
||||
-- with both started_at and completed_at populated — queued/running tasks have
|
||||
-- no finite duration. Anchored on completed_at so the window matches the
|
||||
-- token cost window (which is anchored on tu.created_at, ~= completion time).
|
||||
SELECT
|
||||
atq.agent_id,
|
||||
COALESCE(
|
||||
SUM(EXTRACT(EPOCH FROM (atq.completed_at - atq.started_at)))::bigint,
|
||||
0
|
||||
)::bigint AS total_seconds,
|
||||
COUNT(*)::int AS task_count,
|
||||
COUNT(*) FILTER (WHERE atq.status = 'failed')::int AS failed_count
|
||||
FROM agent_task_queue atq
|
||||
JOIN agent a ON a.id = atq.agent_id
|
||||
LEFT JOIN issue i ON i.id = atq.issue_id
|
||||
WHERE a.workspace_id = $1
|
||||
AND atq.status IN ('completed', 'failed')
|
||||
AND atq.started_at IS NOT NULL
|
||||
AND atq.completed_at IS NOT NULL
|
||||
AND atq.completed_at >= DATE_TRUNC('day', @since::timestamptz)
|
||||
AND (sqlc.narg('project_id')::uuid IS NULL OR i.project_id = sqlc.narg('project_id'))
|
||||
GROUP BY atq.agent_id
|
||||
ORDER BY total_seconds DESC;
|
||||
|
||||
Reference in New Issue
Block a user