mirror of
https://github.com/multica-ai/multica.git
synced 2026-06-17 03:38:32 +02:00
* feat(dashboard): workspace/project token + run-time dashboard
Add a `/{slug}/dashboard` page showing per-agent token spend and execution
time across the whole workspace, with an optional project filter.
Backend:
- Three new sqlc queries against task_usage + agent_task_queue: daily
usage, per-agent usage, per-agent total run-time. All optionally
scoped to a project via sqlc.narg('project_id'), reaching project
through the issue join.
- Handlers under /api/dashboard return the same wire shape the runtime
page already consumes (model preserved for client-side cost math).
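The optional project scoping described above can be sketched in plain PostgreSQL. This is a minimal illustration, not the repository's actual queries: the `demo_*` tables and the `usage_total` statement are hypothetical stand-ins, and the `$1` parameter plays the role of `sqlc.narg('project_id')`, where NULL means "no filter".

```sql
-- Hypothetical, simplified stand-ins for the real tables.
CREATE TEMP TABLE demo_filter_issue (id int PRIMARY KEY, project_id uuid);
CREATE TEMP TABLE demo_filter_usage (issue_id int, input_tokens bigint);

INSERT INTO demo_filter_issue VALUES
    (1, '11111111-1111-1111-1111-111111111111'),
    (2, NULL);
INSERT INTO demo_filter_usage VALUES (1, 100), (2, 40);

-- A NULL argument disables the project predicate; a non-NULL one scopes
-- the aggregate to that project, reached through the issue join.
PREPARE usage_total(uuid) AS
    SELECT COALESCE(SUM(u.input_tokens), 0) AS input_tokens
    FROM demo_filter_usage u
    LEFT JOIN demo_filter_issue i ON i.id = u.issue_id
    WHERE ($1::uuid IS NULL OR i.project_id = $1);

EXECUTE usage_total(NULL);                                    -- 140
EXECUTE usage_total('11111111-1111-1111-1111-111111111111');  -- 100
```

One query serves both the workspace-wide and the project-scoped view, which is presumably why all three dashboard queries share the same nullable argument.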
Frontend:
- Shared DashboardPage in packages/views/dashboard reusing KpiCard,
DailyCostChart, ActorAvatar, and estimateCost from the runtime page
so the visual style and pricing math stay in lock-step.
- Period selector (7/30/90d), project dropdown, four KPI tiles
(cost, tokens, run time, tasks), daily cost chart, and a combined
"cost + run time by agent" list.
- Routed in both web (app/[slug]/(dashboard)/dashboard) and desktop
(memory router); sidebar nav entry added under Workspace group.
Co-authored-by: multica-agent <github@multica.ai>
* fix(dashboard): drop stale project filter and stop double-counting tasks
Two issues caught in PR #2462 review:
1. Project filter held the previous selection's UUID across workspace
switches and project deletions: the dropdown gracefully showed
"All projects" (because the title lookup missed) while the three
dashboard queries kept forwarding the dead UUID, leaving the UI
looking like a full-workspace view but populated with empty
project-scoped data. Validate the picked UUID against the current
projects list before passing it to the queries.
2. The "by agent" table read its task count from the token rollup,
which is grouped per (agent, model). A single task that spans two
models lands twice and the agent's row reads e.g. "2 tasks" when
the real count is 1. Prefer `ListDashboardAgentRunTime`'s per-agent
distinct count when available; fall back to the token aggregate
only for agents with no terminal run yet (in-flight tasks).
Extract the merge into `mergeAgentDashboardRows` so the precedence
rules are unit-tested directly.
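The double count is easy to reproduce in isolation. The sketch below uses a hypothetical `demo_task_usage` table with made-up values; it only illustrates why a count taken from a per-(agent, model) rollup overstates tasks, and is not the project's query.

```sql
-- One task (id 1) that used two models.
CREATE TEMP TABLE demo_task_usage (
    agent        text,
    task_id      int,
    model        text,
    input_tokens bigint
);
INSERT INTO demo_task_usage VALUES
    ('agent-a', 1, 'model-x', 100),
    ('agent-a', 1, 'model-y', 50);

-- Token rollup grouped per (agent, model): task 1 shows up in both rows,
-- so summing "tasks" per agent on the client yields 2.
SELECT agent, model, COUNT(DISTINCT task_id) AS tasks
FROM demo_task_usage
GROUP BY agent, model;

-- Per-agent distinct count: the real number of tasks.
SELECT agent, COUNT(DISTINCT task_id) AS tasks
FROM demo_task_usage
GROUP BY agent;  -- agent-a | 1
```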
Co-authored-by: multica-agent <github@multica.ai>
* test(dashboard): allocate per-workspace issue.number explicitly
TestDashboardEndpoints creates two issues in the shared fixture
workspace. issue.number defaults to 0 (migration 020), and the table
carries UNIQUE (workspace_id, number), so the second insert collided with
the first on the same default and failed in CI.
Allocate MAX(number) + 1 per insert so each row gets a fresh number
without stepping on rows other tests left behind in the same workspace.
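A minimal sketch of the allocation pattern, assuming a hypothetical `demo_issue_numbers` table that keeps only the columns relevant here (the real `issue` table has more). It is adequate for a sequential test fixture; two concurrent transactions could still compute the same MAX.

```sql
CREATE TEMP TABLE demo_issue_numbers (
    workspace_id int  NOT NULL,
    number       int  NOT NULL DEFAULT 0,
    title        text NOT NULL,
    UNIQUE (workspace_id, number)
);

-- A row another test left behind, sitting on the default number 0.
INSERT INTO demo_issue_numbers (workspace_id, title)
VALUES (1, 'left behind');

-- Each insert takes the next free number in its workspace.
INSERT INTO demo_issue_numbers (workspace_id, number, title)
SELECT 1, COALESCE(MAX(number), 0) + 1, 'first'
FROM demo_issue_numbers
WHERE workspace_id = 1;

INSERT INTO demo_issue_numbers (workspace_id, number, title)
SELECT 1, COALESCE(MAX(number), 0) + 1, 'second'
FROM demo_issue_numbers
WHERE workspace_id = 1;

SELECT number, title FROM demo_issue_numbers ORDER BY number;
-- 0 | left behind
-- 1 | first
-- 2 | second
```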
Co-authored-by: multica-agent <github@multica.ai>
* feat(dashboard): rollup table + cron-driven aggregation for dashboard
Mirror the per-runtime rollup in `task_usage_daily` (migrations 073/077/082)
to remove the per-request raw aggregation the dashboard was doing.
Migration 084 adds:
- `task_usage_dashboard_daily` keyed on
(bucket_date, workspace_id, agent_id, project_id, model) — the
dimensions the dashboard actually queries, with project_id nullable
via UNIQUE NULLS NOT DISTINCT (PG15+) so "no-project" buckets
upsert cleanly.
- `task_usage_dashboard_rollup_state` watermark table.
- `task_usage_dashboard_dirty` invalidation queue.
- Triggers on agent_task_queue DELETE, task_usage DELETE, and
issue.project_id UPDATE — the cases the updated_at watermark can't
see. The project_id trigger re-attributes existing rollup rows when
a user moves an issue across projects.
- `rollup_task_usage_dashboard_daily_window(from, to)` —
idempotent recompute primitive (same shape as 077).
- `rollup_task_usage_dashboard_daily()` cron entry — own advisory
lock (4244) so it serialises independently of the runtime rollup.
- `task_usage_dashboard_rollup_lag_seconds()` health helper.
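To show why `UNIQUE NULLS NOT DISTINCT` matters for the "no-project" bucket, here is a small sketch on a hypothetical `demo_daily` table (a cut-down key, not the migration's schema). It requires PostgreSQL 15 or later.

```sql
CREATE TEMP TABLE demo_daily (
    bucket_date  date   NOT NULL,
    project_id   uuid,
    input_tokens bigint NOT NULL DEFAULT 0,
    CONSTRAINT uq_demo_daily
        UNIQUE NULLS NOT DISTINCT (bucket_date, project_id)
);

-- First write of the no-project bucket.
INSERT INTO demo_daily VALUES ('2026-01-01', NULL, 10)
ON CONFLICT ON CONSTRAINT uq_demo_daily DO UPDATE
    SET input_tokens = EXCLUDED.input_tokens;

-- Second write hits the same key because NULLs compare as equal,
-- so it updates the existing row instead of inserting a duplicate.
INSERT INTO demo_daily VALUES ('2026-01-01', NULL, 25)
ON CONFLICT ON CONSTRAINT uq_demo_daily DO UPDATE
    SET input_tokens = EXCLUDED.input_tokens;

SELECT COUNT(*) AS row_count, MAX(input_tokens) AS input_tokens
FROM demo_daily;  -- 1 | 25
```

With a plain `UNIQUE` constraint the second insert would not conflict, and the table would accumulate one NULL-project row per rollup run.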
Sqlc queries `ListDashboardUsageDailyRollup` /
`ListDashboardUsageByAgentRollup` read from the new table; the handler
dispatches between rollup and raw on a separate
`UseDailyRollupForDashboard` config flag
(`USAGE_DASHBOARD_ROLLUP_ENABLED` env). Same fail-safe default (false →
raw) so operators can roll out independently of the per-runtime flag.
Bucket date is UTC (the dashboard aggregates across runtimes that may
sit in different tzs; there's no single correct local boundary).
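As a side note on what a UTC bucket boundary means in practice: casting a `timestamptz` to a date follows the session's `TimeZone` setting. The sketch below pins UTC explicitly to show the difference. It is an illustration only; whether the rollup's sessions already run in UTC is an assumption about deployment that this message does not state.

```sql
-- Simulate a session that is not in UTC.
SET TIME ZONE 'Asia/Tokyo';

SELECT
    -- Follows the session time zone: 20:00 UTC is already the next day in Tokyo.
    DATE(TIMESTAMPTZ '2026-01-01 20:00:00+00')                      AS session_date,  -- 2026-01-02
    -- Pinned to UTC regardless of the session setting.
    (TIMESTAMPTZ '2026-01-01 20:00:00+00' AT TIME ZONE 'UTC')::date AS utc_date;      -- 2026-01-01

RESET TIME ZONE;
```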
Adds `cmd/backfill_task_usage_dashboard_daily` mirroring the existing
per-runtime backfill — operator runs it once before flipping the flag.
Tests:
- TestDashboardEndpoints now also exercises the rollup read path
(raw vs. rollup, same project-scoped totals).
- TestDashboardRollupReattributesOnProjectChange verifies the
issue.project_id trigger enqueues both old + new buckets and the
next rollup tick zeroes the old project + populates the new one.
Co-authored-by: multica-agent <github@multica.ai>
* fix(dashboard-rollup): close two invalidation gaps
Two leak paths missed in the migration 084 review:
1. Issue cascade DELETE: the atq BEFORE DELETE trigger runs AFTER the
issue row is gone, so `LEFT JOIN issue` returns NULL project_id and
the original-project bucket never gets cleared (migration 077 calls this
out for the runtime rollup but didn't need to act on it). Adds an
`issue BEFORE DELETE` trigger that enqueues using OLD.project_id
while the issue row is still readable.
2. `LinkTaskToIssue` (quick-create task attaching to a real issue post-
completion) UPDATEs `agent_task_queue.issue_id` from NULL to a real
id. Migration 084 only watched DELETE on atq, so usage already
rolled up under the no-project bucket stayed attributed to NULL
forever. Extends the atq trigger to fire on UPDATE OF issue_id too,
enqueueing both OLD (NULL project) and NEW (linked issue's project).
Tests:
- TestDashboardRollupClearsOnIssueDelete asserts rollup row drops to
zero after issue delete + rollup tick.
- TestDashboardRollupReattributesOnLinkTaskToIssue verifies tokens
move from the NULL bucket to the project bucket after the UPDATE.
Co-authored-by: multica-agent <github@multica.ai>
---------
Co-authored-by: multica-agent <github@multica.ai>
520 lines
21 KiB
PL/PgSQL
-- Daily rollup table for the workspace `/{slug}/dashboard` page. Mirrors
-- the per-runtime rollup in migration 073 (`task_usage_daily`) but indexes
-- on a different set of dimensions:
--
--   (bucket_date, workspace_id, agent_id, project_id, model)
--
-- vs. the existing 073:
--
--   (bucket_date, workspace_id, runtime_id, provider, model)
--
-- The dashboard queries don't filter by `runtime_id` and the per-runtime
-- table doesn't carry `agent_id` / `project_id`, so we materialise a
-- dedicated rollup instead of extending the existing one and ballooning
-- its cardinality (`runtime × agent × project × model` per day vs.
-- `runtime × model`). The cron entry point + dirty-queue invalidation
-- pattern is otherwise identical.
--
-- `project_id` is the project at rollup time, snapshotted via
-- `agent_task_queue.issue_id → issue.project_id`. Re-attribution after a
-- user reassigns an issue's project lands via the `issue` trigger below;
-- historical buckets that aren't touched again stay attributed where
-- they were when the rollup ran. (Operator follow-up: re-run the backfill
-- command on the affected window if a bulk project move needs to
-- propagate to old data.)
--
-- `project_id` is nullable: issues without a project still produce
-- usage rows. We use `UNIQUE NULLS NOT DISTINCT` (PG 15+) so NULLs
-- compare as equal in the unique key, which lets
-- `INSERT ... ON CONFLICT` upsert "no-project" buckets the same way it
-- handles a specific project.
--
-- Bucket date is computed in **UTC**. The per-runtime rollup uses the
-- runtime's tz (migration 082) because each row has a single runtime;
-- this rollup aggregates *across* runtimes that may sit in different
-- tzs, and there is no single correct local boundary. The dashboard
-- frontend just renders the raw ISO date.
CREATE TABLE task_usage_dashboard_daily (
    bucket_date        DATE        NOT NULL,
    workspace_id       UUID        NOT NULL,
    agent_id           UUID        NOT NULL,
    project_id         UUID,
    model              TEXT        NOT NULL,
    input_tokens       BIGINT      NOT NULL DEFAULT 0,
    output_tokens      BIGINT      NOT NULL DEFAULT 0,
    cache_read_tokens  BIGINT      NOT NULL DEFAULT 0,
    cache_write_tokens BIGINT      NOT NULL DEFAULT 0,
    task_count         BIGINT      NOT NULL DEFAULT 0,
    event_count        BIGINT      NOT NULL DEFAULT 0,
    updated_at         TIMESTAMPTZ NOT NULL DEFAULT now(),
    CONSTRAINT uq_task_usage_dashboard_daily_key
        UNIQUE NULLS NOT DISTINCT
        (bucket_date, workspace_id, agent_id, project_id, model)
);

-- Workspace-wide reads (no project filter) hit this index.
CREATE INDEX idx_task_usage_dashboard_daily_workspace_date
    ON task_usage_dashboard_daily (workspace_id, bucket_date DESC);

-- Per-project reads. Composite index led by (workspace_id, project_id), so
-- queries filtered to one project seek straight to that project's rows;
-- unfiltered ("all") reads use the workspace/date index above.
CREATE INDEX idx_task_usage_dashboard_daily_project_date
    ON task_usage_dashboard_daily (workspace_id, project_id, bucket_date DESC);

-- Per-agent reads (the "by agent" panel).
CREATE INDEX idx_task_usage_dashboard_daily_agent_date
    ON task_usage_dashboard_daily (workspace_id, agent_id, bucket_date DESC);

CREATE TABLE task_usage_dashboard_rollup_state (
    id                   SMALLINT    PRIMARY KEY DEFAULT 1 CHECK (id = 1),
    watermark_at         TIMESTAMPTZ NOT NULL DEFAULT '1970-01-01 00:00:00+00',
    last_run_started_at  TIMESTAMPTZ,
    last_run_finished_at TIMESTAMPTZ,
    last_run_rows        BIGINT      NOT NULL DEFAULT 0,
    last_error           TEXT
);
INSERT INTO task_usage_dashboard_rollup_state (id) VALUES (1) ON CONFLICT DO NOTHING;

-- Dirty queue for invalidations the `updated_at` watermark can't see:
--   * DELETE on `task_usage` (no row left for the watermark).
--   * DELETE cascade through `agent_task_queue` (task_usage rows gone).
--   * UPDATE of `issue.project_id`, which moves the bucket to a new key.
--
-- The bucket key matches the rollup table so the queue can be UNIONed into
-- `dirty_keys` in the window function with no translation. project_id is
-- nullable here for the same reason as in the rollup table.
CREATE TABLE task_usage_dashboard_dirty (
    bucket_date  DATE        NOT NULL,
    workspace_id UUID        NOT NULL,
    agent_id     UUID        NOT NULL,
    project_id   UUID,
    model        TEXT        NOT NULL,
    enqueued_at  TIMESTAMPTZ NOT NULL DEFAULT now(),
    CONSTRAINT uq_task_usage_dashboard_dirty_key
        UNIQUE NULLS NOT DISTINCT
        (bucket_date, workspace_id, agent_id, project_id, model)
);

CREATE INDEX idx_task_usage_dashboard_dirty_enqueued_at
    ON task_usage_dashboard_dirty (enqueued_at);

-- Trigger 1: agent_task_queue BEFORE UPDATE OF issue_id OR DELETE.
--
-- Two cases:
--
--   * UPDATE OF issue_id: currently only `LinkTaskToIssue` (quick-create
--     tasks attaching to the issue the agent just produced) writes here,
--     moving the task from `issue_id IS NULL` to a real issue. If usage
--     already rolled up under the no-project bucket, we have to enqueue
--     both OLD (NULL project) AND NEW (the new issue's project) so the
--     next tick re-attributes the tokens.
--
--   * DELETE: direct atq deletions land here with the issue row still
--     alive, so `LEFT JOIN issue` resolves the project correctly.
--     Cascade DELETE driven from `DELETE FROM issue` is handled by
--     `enqueue_task_usage_dashboard_dirty_for_issue_delete` below (which
--     fires *before* the cascade, while `issue.project_id` is still
--     readable); this trigger may also fire during that cascade, but by
--     then `LEFT JOIN issue` finds no row, so project_id resolves to NULL.
--     workspace_id comes from the JOIN on `agent` rather than from the
--     issue, which keeps the enqueue safe, and the resulting NULL-project
--     key no-ops on recompute (deleted_empty path).
CREATE OR REPLACE FUNCTION enqueue_task_usage_dashboard_dirty_for_atq()
RETURNS TRIGGER
LANGUAGE plpgsql
AS $$
BEGIN
    IF TG_OP = 'UPDATE' THEN
        IF OLD.issue_id IS DISTINCT FROM NEW.issue_id THEN
            -- OLD side: project_id of OLD.issue_id (NULL when OLD.issue_id IS NULL,
            -- e.g. the quick-create starting state).
            INSERT INTO task_usage_dashboard_dirty (
                bucket_date, workspace_id, agent_id, project_id, model
            )
            SELECT DISTINCT
                DATE(tu.created_at),
                a.workspace_id,
                OLD.agent_id,
                i_old.project_id,
                tu.model
            FROM task_usage tu
            JOIN agent a ON a.id = OLD.agent_id
            LEFT JOIN issue i_old ON i_old.id = OLD.issue_id
            WHERE tu.task_id = OLD.id
            ON CONFLICT ON CONSTRAINT uq_task_usage_dashboard_dirty_key DO UPDATE
                SET enqueued_at = GREATEST(task_usage_dashboard_dirty.enqueued_at, EXCLUDED.enqueued_at);

            -- NEW side: project_id of NEW.issue_id.
            INSERT INTO task_usage_dashboard_dirty (
                bucket_date, workspace_id, agent_id, project_id, model
            )
            SELECT DISTINCT
                DATE(tu.created_at),
                a.workspace_id,
                NEW.agent_id,
                i_new.project_id,
                tu.model
            FROM task_usage tu
            JOIN agent a ON a.id = NEW.agent_id
            LEFT JOIN issue i_new ON i_new.id = NEW.issue_id
            WHERE tu.task_id = NEW.id
            ON CONFLICT ON CONSTRAINT uq_task_usage_dashboard_dirty_key DO UPDATE
                SET enqueued_at = GREATEST(task_usage_dashboard_dirty.enqueued_at, EXCLUDED.enqueued_at);
        END IF;
        RETURN NEW;
    ELSIF TG_OP = 'DELETE' THEN
        INSERT INTO task_usage_dashboard_dirty (
            bucket_date, workspace_id, agent_id, project_id, model
        )
        SELECT DISTINCT
            DATE(tu.created_at),
            a.workspace_id,
            OLD.agent_id,
            i.project_id,
            tu.model
        FROM task_usage tu
        JOIN agent a ON a.id = OLD.agent_id
        LEFT JOIN issue i ON i.id = OLD.issue_id
        WHERE tu.task_id = OLD.id
        ON CONFLICT ON CONSTRAINT uq_task_usage_dashboard_dirty_key DO UPDATE
            SET enqueued_at = GREATEST(task_usage_dashboard_dirty.enqueued_at, EXCLUDED.enqueued_at);
        RETURN OLD;
    END IF;
    RETURN NULL;
END;
$$;

CREATE TRIGGER trg_atq_dirty_dashboard
    BEFORE UPDATE OF issue_id OR DELETE ON agent_task_queue
    FOR EACH ROW EXECUTE FUNCTION enqueue_task_usage_dashboard_dirty_for_atq();

-- Trigger 1b: issue BEFORE DELETE.
--
-- `DELETE FROM issue` cascades to `agent_task_queue` and onward to
-- `task_usage`. By the time the atq BEFORE DELETE trigger runs, the
-- issue row is gone and `LEFT JOIN issue` returns NULL for project_id,
-- so the atq trigger would enqueue a NULL-project key; the rollup row
-- under the original project would never get cleared and would keep
-- billing the workspace for tokens that no longer have a source.
--
-- This trigger fires BEFORE the cascade, while `OLD.project_id` is still
-- readable, and enqueues one dirty row per (date, agent, model) the
-- issue's tasks contributed to. The next rollup tick recomputes the
-- bucket, finds no source rows under the original project, and drops it
-- (deleted_empty path).
CREATE OR REPLACE FUNCTION enqueue_task_usage_dashboard_dirty_for_issue_delete()
RETURNS TRIGGER
LANGUAGE plpgsql
AS $$
BEGIN
    INSERT INTO task_usage_dashboard_dirty (
        bucket_date, workspace_id, agent_id, project_id, model
    )
    SELECT DISTINCT
        DATE(tu.created_at),
        OLD.workspace_id,
        atq.agent_id,
        OLD.project_id,
        tu.model
    FROM agent_task_queue atq
    JOIN task_usage tu ON tu.task_id = atq.id
    WHERE atq.issue_id = OLD.id
    ON CONFLICT ON CONSTRAINT uq_task_usage_dashboard_dirty_key DO UPDATE
        SET enqueued_at = GREATEST(task_usage_dashboard_dirty.enqueued_at, EXCLUDED.enqueued_at);
    RETURN OLD;
END;
$$;

CREATE TRIGGER trg_issue_delete_dirty_dashboard
    BEFORE DELETE ON issue
    FOR EACH ROW EXECUTE FUNCTION enqueue_task_usage_dashboard_dirty_for_issue_delete();

-- Trigger 2: task_usage BEFORE DELETE.
-- Rare in practice (no direct DELETE call sites today) but ensures the
-- rollup converges if one is added. UPDATE / INSERT are caught by the
-- updated_at watermark in the window function.
CREATE OR REPLACE FUNCTION enqueue_task_usage_dashboard_dirty_for_tu()
RETURNS TRIGGER
LANGUAGE plpgsql
AS $$
BEGIN
    INSERT INTO task_usage_dashboard_dirty (
        bucket_date, workspace_id, agent_id, project_id, model
    )
    SELECT
        DATE(OLD.created_at),
        a.workspace_id,
        atq.agent_id,
        i.project_id,
        OLD.model
    FROM agent_task_queue atq
    JOIN agent a ON a.id = atq.agent_id
    LEFT JOIN issue i ON i.id = atq.issue_id
    WHERE atq.id = OLD.task_id
    ON CONFLICT ON CONSTRAINT uq_task_usage_dashboard_dirty_key DO UPDATE
        SET enqueued_at = GREATEST(task_usage_dashboard_dirty.enqueued_at, EXCLUDED.enqueued_at);
    RETURN OLD;
END;
$$;

CREATE TRIGGER trg_tu_dirty_dashboard
    BEFORE DELETE ON task_usage
    FOR EACH ROW EXECUTE FUNCTION enqueue_task_usage_dashboard_dirty_for_tu();

-- Trigger 3: issue BEFORE UPDATE OF project_id.
-- Re-attribute every (date, agent, model) bucket touched by tasks under
-- this issue: enqueue OLD project_id (so it stops claiming the tokens)
-- AND NEW project_id (so it picks them up). Both go through the same
-- dirty queue; the window function recomputes from the live join in the
-- `recomputed` CTE, which by then sees NEW.project_id, so the OLD bucket
-- either drops to 0 (deleted_empty path) or shrinks.
CREATE OR REPLACE FUNCTION enqueue_task_usage_dashboard_dirty_for_issue_project()
RETURNS TRIGGER
LANGUAGE plpgsql
AS $$
BEGIN
    IF OLD.project_id IS DISTINCT FROM NEW.project_id THEN
        -- OLD project buckets.
        INSERT INTO task_usage_dashboard_dirty (
            bucket_date, workspace_id, agent_id, project_id, model
        )
        SELECT DISTINCT
            DATE(tu.created_at),
            NEW.workspace_id,
            atq.agent_id,
            OLD.project_id,
            tu.model
        FROM agent_task_queue atq
        JOIN task_usage tu ON tu.task_id = atq.id
        WHERE atq.issue_id = NEW.id
        ON CONFLICT ON CONSTRAINT uq_task_usage_dashboard_dirty_key DO UPDATE
            SET enqueued_at = GREATEST(task_usage_dashboard_dirty.enqueued_at, EXCLUDED.enqueued_at);

        -- NEW project buckets.
        INSERT INTO task_usage_dashboard_dirty (
            bucket_date, workspace_id, agent_id, project_id, model
        )
        SELECT DISTINCT
            DATE(tu.created_at),
            NEW.workspace_id,
            atq.agent_id,
            NEW.project_id,
            tu.model
        FROM agent_task_queue atq
        JOIN task_usage tu ON tu.task_id = atq.id
        WHERE atq.issue_id = NEW.id
        ON CONFLICT ON CONSTRAINT uq_task_usage_dashboard_dirty_key DO UPDATE
            SET enqueued_at = GREATEST(task_usage_dashboard_dirty.enqueued_at, EXCLUDED.enqueued_at);
    END IF;
    RETURN NEW;
END;
$$;

CREATE TRIGGER trg_issue_project_dirty_dashboard
    BEFORE UPDATE OF project_id ON issue
    FOR EACH ROW EXECUTE FUNCTION enqueue_task_usage_dashboard_dirty_for_issue_project();

-- Window function. Same shape as 077's rollup_task_usage_daily_window:
--   1. Discover dirty keys from updated_at watermark + the explicit queue.
--   2. Recompute each from raw via the agent_task_queue + issue join.
--   3. Upsert present buckets, delete buckets that recomputed to nothing.
--   4. Drain the queue rows whose enqueued_at < p_to.
--
-- IDEMPOTENCY: re-running the same window yields the same final state,
-- because each touched key is rebuilt from raw, not deltaed.
CREATE OR REPLACE FUNCTION rollup_task_usage_dashboard_daily_window(
    p_from TIMESTAMPTZ,
    p_to   TIMESTAMPTZ
)
RETURNS BIGINT
LANGUAGE plpgsql
AS $$
DECLARE
    v_rows BIGINT;
BEGIN
    IF p_from >= p_to THEN
        RETURN 0;
    END IF;

    WITH
    dirty_from_updates AS (
        SELECT DISTINCT
            DATE(tu.created_at) AS bucket_date,
            a.workspace_id      AS workspace_id,
            atq.agent_id        AS agent_id,
            i.project_id        AS project_id,
            tu.model            AS model
        FROM task_usage tu
        JOIN agent_task_queue atq ON atq.id = tu.task_id
        JOIN agent a ON a.id = atq.agent_id
        LEFT JOIN issue i ON i.id = atq.issue_id
        WHERE (
            (tu.updated_at >= p_from AND tu.updated_at < p_to)
            -- Legacy `updated_at IS NULL` rows: 077 handles this via a
            -- partial index. We rely on the same `idx_task_usage_created_at_legacy`
            -- (migration 078) for this branch.
            OR (tu.updated_at IS NULL
                AND tu.created_at >= p_from
                AND tu.created_at < p_to)
        )
    ),
    dirty_from_queue AS (
        SELECT bucket_date, workspace_id, agent_id, project_id, model
        FROM task_usage_dashboard_dirty
        WHERE enqueued_at < p_to
    ),
    dirty_keys AS (
        SELECT * FROM dirty_from_updates
        UNION
        SELECT * FROM dirty_from_queue
    ),
    recomputed AS (
        SELECT
            dk.bucket_date,
            dk.workspace_id,
            dk.agent_id,
            dk.project_id,
            dk.model,
            SUM(tu.input_tokens)::bigint       AS input_tokens,
            SUM(tu.output_tokens)::bigint      AS output_tokens,
            SUM(tu.cache_read_tokens)::bigint  AS cache_read_tokens,
            SUM(tu.cache_write_tokens)::bigint AS cache_write_tokens,
            COUNT(DISTINCT tu.task_id)::bigint AS task_count,
            COUNT(*)::bigint                   AS event_count
        FROM dirty_keys dk
        JOIN agent_task_queue atq ON atq.agent_id = dk.agent_id
        JOIN agent a ON a.id = atq.agent_id
                    AND a.workspace_id = dk.workspace_id
        LEFT JOIN issue i ON i.id = atq.issue_id
        JOIN task_usage tu ON tu.task_id = atq.id
                          AND tu.model = dk.model
                          AND DATE(tu.created_at) = dk.bucket_date
        WHERE (i.project_id IS NOT DISTINCT FROM dk.project_id)
        GROUP BY 1, 2, 3, 4, 5
    ),
    upserted AS (
        INSERT INTO task_usage_dashboard_daily AS d (
            bucket_date, workspace_id, agent_id, project_id, model,
            input_tokens, output_tokens, cache_read_tokens, cache_write_tokens,
            task_count, event_count
        )
        SELECT
            bucket_date, workspace_id, agent_id, project_id, model,
            input_tokens, output_tokens, cache_read_tokens, cache_write_tokens,
            task_count, event_count
        FROM recomputed
        ON CONFLICT ON CONSTRAINT uq_task_usage_dashboard_daily_key DO UPDATE
            SET input_tokens       = EXCLUDED.input_tokens,
                output_tokens      = EXCLUDED.output_tokens,
                cache_read_tokens  = EXCLUDED.cache_read_tokens,
                cache_write_tokens = EXCLUDED.cache_write_tokens,
                task_count         = EXCLUDED.task_count,
                event_count        = EXCLUDED.event_count,
                updated_at         = now()
        RETURNING 1
    ),
    deleted_empty AS (
        DELETE FROM task_usage_dashboard_daily d
        USING dirty_keys dk
        WHERE d.bucket_date = dk.bucket_date
          AND d.workspace_id = dk.workspace_id
          AND d.agent_id = dk.agent_id
          AND d.project_id IS NOT DISTINCT FROM dk.project_id
          AND d.model = dk.model
          AND NOT EXISTS (
              SELECT 1 FROM recomputed r
              WHERE r.bucket_date = dk.bucket_date
                AND r.workspace_id = dk.workspace_id
                AND r.agent_id = dk.agent_id
                AND r.project_id IS NOT DISTINCT FROM dk.project_id
                AND r.model = dk.model
          )
        RETURNING 1
    )
    SELECT (SELECT COUNT(*) FROM upserted) + (SELECT COUNT(*) FROM deleted_empty)
    INTO v_rows;

    DELETE FROM task_usage_dashboard_dirty WHERE enqueued_at < p_to;

    RETURN v_rows;
END;
$$;

-- Cron entry. Mirrors `rollup_task_usage_daily` (migration 073): same
-- advisory-lock + watermark + error-recovery shape. Uses lock id 4244
-- so it serialises independently of the per-runtime rollup (4242).
CREATE OR REPLACE FUNCTION rollup_task_usage_dashboard_daily()
RETURNS BIGINT
LANGUAGE plpgsql
AS $$
DECLARE
    v_lock_ok BOOLEAN;
    v_from    TIMESTAMPTZ;
    v_to      TIMESTAMPTZ;
    v_rows    BIGINT := 0;
BEGIN
    SELECT pg_try_advisory_lock(4244) INTO v_lock_ok;
    IF NOT v_lock_ok THEN
        RETURN 0;
    END IF;

    BEGIN
        UPDATE task_usage_dashboard_rollup_state
           SET last_run_started_at = now(),
               last_error = NULL
         WHERE id = 1
        RETURNING watermark_at INTO v_from;

        v_to := now() - INTERVAL '5 minutes';

        IF v_from < v_to THEN
            v_rows := rollup_task_usage_dashboard_daily_window(v_from, v_to);

            UPDATE task_usage_dashboard_rollup_state
               SET watermark_at = v_to,
                   last_run_finished_at = now(),
                   last_run_rows = v_rows
             WHERE id = 1;
        ELSE
            UPDATE task_usage_dashboard_rollup_state
               SET last_run_finished_at = now(),
                   last_run_rows = 0
             WHERE id = 1;
        END IF;

        PERFORM pg_advisory_unlock(4244);
        RETURN v_rows;
    EXCEPTION WHEN OTHERS THEN
        UPDATE task_usage_dashboard_rollup_state
           SET last_error = SQLERRM,
               last_run_finished_at = now()
         WHERE id = 1;
        PERFORM pg_advisory_unlock(4244);
        RAISE;
    END;
END;
$$;

-- Health-check helper mirroring task_usage_rollup_lag_seconds() (076).
-- Alert via monitoring on NULL > 15 min after deploy, or value > 900s.
CREATE OR REPLACE FUNCTION task_usage_dashboard_rollup_lag_seconds()
RETURNS DOUBLE PRECISION
LANGUAGE sql
STABLE
AS $$
    SELECT EXTRACT(EPOCH FROM (now() - last_run_finished_at))
    FROM task_usage_dashboard_rollup_state
    WHERE id = 1;
$$;

-- NOTE: cron job is NOT scheduled by this migration, same convention as
-- 076 for the per-runtime rollup. Operator playbook:
--   1) Apply migrations through 084.
--   2) Run `go run ./cmd/backfill_task_usage_dashboard_daily`.
--   3) Set USAGE_DASHBOARD_ROLLUP_ENABLED=true on the API and roll out.
--   4) As superuser:
--        SELECT cron.schedule(
--          'rollup_task_usage_dashboard_daily',
--          '*/5 * * * *',
--          $$SELECT rollup_task_usage_dashboard_daily()$$
--        );