multica/server/migrations/084_task_usage_dashboard_rollup.up.sql
Bohan Jiang 96695a79c5 feat(dashboard): workspace/project token + run-time dashboard MUL-1882 (#2462)
* feat(dashboard): workspace/project token + run-time dashboard

Add a `/{slug}/dashboard` page showing per-agent token spend and execution
time across the whole workspace, with an optional project filter.

Backend:
  - Three new sqlc queries against task_usage + agent_task_queue: daily
    usage, per-agent usage, per-agent total run-time. All optionally
    scoped to a project via sqlc.narg('project_id'), reaching project
    through the issue join.
  - Handlers under /api/dashboard return the same wire shape the runtime
    page already consumes (model preserved for client-side cost math).

Frontend:
  - Shared DashboardPage in packages/views/dashboard reusing KpiCard,
    DailyCostChart, ActorAvatar, and estimateCost from the runtime page
    so the visual style and pricing math stay in lock-step.
  - Period selector (7/30/90d), project dropdown, four KPI tiles
    (cost, tokens, run time, tasks), daily cost chart, and a combined
    "cost + run time by agent" list.
  - Routed in both web (app/[slug]/(dashboard)/dashboard) and desktop
    (memory router); sidebar nav entry added under Workspace group.

Co-authored-by: multica-agent <github@multica.ai>

* fix(dashboard): drop stale project filter and stop double-counting tasks

Two issues caught in PR #2462 review:

1. Project filter held the previous selection's UUID across workspace
   switches and project deletions: the dropdown gracefully showed
   "All projects" (because the title lookup missed) while the three
   dashboard queries kept forwarding the dead UUID, leaving the UI
   looking like a full-workspace view but populated with empty
   project-scoped data. Validate the picked UUID against the current
   projects list before passing it to the queries.

2. The "by agent" table read its task count from the token rollup,
   which is grouped per (agent, model). A single task that spans two
   models lands twice and the agent's row reads e.g. "2 tasks" when
   the real count is 1. Prefer `ListDashboardAgentRunTime`'s per-agent
   distinct count when available; fall back to the token aggregate
   only for agents with no terminal run yet (in-flight tasks).

Extract the merge into `mergeAgentDashboardRows` so the precedence
rules are unit-tested directly.

Co-authored-by: multica-agent <github@multica.ai>

* test(dashboard): allocate per-workspace issue.number explicitly

TestDashboardEndpoints creates two issues in the shared fixture
workspace. issue.number defaults to 0 (migration 020), and the table
carries UNIQUE (workspace_id, number), so the second insert raced the
first on the same default and failed in CI.

Allocate MAX(number) + 1 per insert so each row gets a fresh number
without stepping on rows other tests left behind in the same workspace.
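
A minimal sketch of that allocation, for illustration only: `title` stands
in for whatever other columns the fixture sets, `$1`/`$2` are bind
parameters, and the pattern is safe only for sequential inserts such as a
single test.

    INSERT INTO issue (workspace_id, number, title)
    SELECT $1::uuid, COALESCE(MAX(number), 0) + 1, $2::text
      FROM issue
     WHERE workspace_id = $1::uuid;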

Co-authored-by: multica-agent <github@multica.ai>

* feat(dashboard): rollup table + cron-driven aggregation for dashboard

Mirror the per-runtime rollup in `task_usage_daily` (migrations 073/077/082)
to remove the per-request raw aggregation the dashboard was doing.

Migration 084 adds:
  - `task_usage_dashboard_daily` keyed on
    (bucket_date, workspace_id, agent_id, project_id, model) — the
    dimensions the dashboard actually queries, with project_id nullable
    via UNIQUE NULLS NOT DISTINCT (PG15+) so "no-project" buckets
    upsert cleanly.
  - `task_usage_dashboard_rollup_state` watermark table.
  - `task_usage_dashboard_dirty` invalidation queue.
  - Triggers on agent_task_queue DELETE, task_usage DELETE, and
    issue.project_id UPDATE — the cases the updated_at watermark can't
    see. The project_id trigger re-attributes existing rollup rows when
    a user moves an issue across projects.
  - `rollup_task_usage_dashboard_daily_window(from, to)` —
    idempotent recompute primitive (same shape as 077).
  - `rollup_task_usage_dashboard_daily()` cron entry — own advisory
    lock (4244) so it serialises independently of the runtime rollup.
  - `task_usage_dashboard_rollup_lag_seconds()` health helper.

Sqlc queries `ListDashboardUsageDailyRollup` /
`ListDashboardUsageByAgentRollup` read from the new table; the handler
dispatches between rollup and raw on a separate
`UseDailyRollupForDashboard` config flag
(`USAGE_DASHBOARD_ROLLUP_ENABLED` env). Same fail-safe default (false →
raw) so operators can roll out independently of the per-runtime flag.

Bucket date is UTC (the dashboard aggregates across runtimes that may
sit in different tzs; there's no single correct local boundary).

Adds `cmd/backfill_task_usage_dashboard_daily` mirroring the existing
per-runtime backfill — operator runs it once before flipping the flag.

Tests:
  - TestDashboardEndpoints now also exercises the rollup read path
    (raw vs. rollup, same project-scoped totals).
  - TestDashboardRollupReattributesOnProjectChange verifies the
    issue.project_id trigger enqueues both old + new buckets and the
    next rollup tick zeroes the old project + populates the new one.

Co-authored-by: multica-agent <github@multica.ai>

* fix(dashboard-rollup): close two invalidation gaps

Two leak paths missed by migration 084 review:

1. Issue cascade DELETE — the atq BEFORE DELETE trigger runs AFTER the
   issue row is gone, so `LEFT JOIN issue` returns NULL project_id and
   the original-project bucket never gets cleared (migration 077 calls this
   out for the runtime rollup but didn't need to act on it). Adds an
   `issue BEFORE DELETE` trigger that enqueues using OLD.project_id
   while the issue row is still readable.

2. `LinkTaskToIssue` (quick-create task attaching to a real issue post-
   completion) UPDATEs `agent_task_queue.issue_id` from NULL to a real
   id. Migration 084 only watched DELETE on atq, so usage already
   rolled up under the no-project bucket stayed attributed to NULL
   forever. Extends the atq trigger to fire on UPDATE OF issue_id too,
   enqueueing both OLD (NULL project) and NEW (linked issue's project).

Tests:
  - TestDashboardRollupClearsOnIssueDelete asserts rollup row drops to
    zero after issue delete + rollup tick.
  - TestDashboardRollupReattributesOnLinkTaskToIssue verifies tokens
    move from the NULL bucket to the project bucket after the UPDATE.

Co-authored-by: multica-agent <github@multica.ai>

---------

Co-authored-by: multica-agent <github@multica.ai>
2026-05-13 12:51:16 +08:00

-- Daily rollup table for the workspace `/{slug}/dashboard` page. Mirrors
-- the per-runtime rollup in migration 073 (`task_usage_daily`) but indexes
-- on a different set of dimensions:
--
--     (bucket_date, workspace_id, agent_id, project_id, model)
--
-- vs. the existing 073:
--
--     (bucket_date, workspace_id, runtime_id, provider, model)
--
-- The dashboard queries don't filter by `runtime_id` and the per-runtime
-- table doesn't carry `agent_id` / `project_id`, so we materialise a
-- dedicated rollup instead of extending the existing one and ballooning
-- its cardinality (`runtime × agent × project × model` per day vs.
-- `runtime × model`). The cron entry point + dirty-queue invalidation
-- pattern is otherwise identical.
--
-- `project_id` is the project at rollup time, snapshotted via
-- `agent_task_queue.issue_id → issue.project_id`. Re-attribution after a
-- user reassigns an issue's project lands via the `issue` trigger below;
-- historical buckets that aren't touched again stay attributed where
-- they were when the rollup ran. (Operator follow-up: re-run the backfill
-- command on the affected window if a bulk project move needs to
-- propagate to old data.)
--
-- `project_id` is nullable — issues without a project still produce
-- usage rows. We use `UNIQUE NULLS NOT DISTINCT` (PG 15+) so NULL is
-- treated as a single distinct value in the unique key, which lets
-- `INSERT ... ON CONFLICT` upsert "no-project" buckets the same way it
-- handles a specific project.
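--
-- Illustrative sketch of that upsert behaviour (comment only, never run by
-- this migration; the `demo` table and `uq_demo` constraint are
-- hypothetical). On PG 15+ the second insert conflicts with the first
-- instead of adding a second NULL-keyed row:
--
--     CREATE TEMP TABLE demo (
--         k UUID,
--         n BIGINT NOT NULL,
--         CONSTRAINT uq_demo UNIQUE NULLS NOT DISTINCT (k)
--     );
--     INSERT INTO demo (k, n) VALUES (NULL, 1);
--     INSERT INTO demo (k, n) VALUES (NULL, 2)
--     ON CONFLICT ON CONSTRAINT uq_demo DO UPDATE SET n = EXCLUDED.n;
--     SELECT COUNT(*), MAX(n) FROM demo;  -- one row, n = 2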
--
-- Bucket date is computed in **UTC**. The per-runtime rollup uses the
-- runtime's tz (migration 082) because each row has a single runtime;
-- this rollup aggregates *across* runtimes that may sit in different
-- tzs, and there is no single correct local boundary. The dashboard
-- frontend just renders the raw ISO date.
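--
-- Caveat, stated as an assumption rather than a verified property of this
-- deployment: if `task_usage.created_at` is TIMESTAMPTZ, `DATE(created_at)`
-- buckets by the session's `TimeZone` setting, so the UTC boundary holds
-- only when the rollup session runs with TimeZone = 'UTC'. A
-- session-independent form, shown as a comment-only sketch:
--
--     SELECT (TIMESTAMPTZ '2026-05-13 23:30:00-05' AT TIME ZONE 'UTC')::date;
--     -- 2026-05-14 (23:30 at UTC-5 is 04:30 UTC the next day)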
CREATE TABLE task_usage_dashboard_daily (
    bucket_date        DATE        NOT NULL,
    workspace_id       UUID        NOT NULL,
    agent_id           UUID        NOT NULL,
    project_id         UUID,
    model              TEXT        NOT NULL,
    input_tokens       BIGINT      NOT NULL DEFAULT 0,
    output_tokens      BIGINT      NOT NULL DEFAULT 0,
    cache_read_tokens  BIGINT      NOT NULL DEFAULT 0,
    cache_write_tokens BIGINT      NOT NULL DEFAULT 0,
    task_count         BIGINT      NOT NULL DEFAULT 0,
    event_count        BIGINT      NOT NULL DEFAULT 0,
    updated_at         TIMESTAMPTZ NOT NULL DEFAULT now(),
    CONSTRAINT uq_task_usage_dashboard_daily_key
        UNIQUE NULLS NOT DISTINCT
        (bucket_date, workspace_id, agent_id, project_id, model)
);
-- Workspace-wide reads (no project filter) hit this index.
CREATE INDEX idx_task_usage_dashboard_daily_workspace_date
    ON task_usage_dashboard_daily (workspace_id, bucket_date DESC);
-- Per-project reads. This is a plain composite index, not a partial one:
-- project-filtered queries seek on (workspace_id, project_id) and read
-- bucket_date newest-first, while unfiltered queries use the
-- workspace/date index above.
CREATE INDEX idx_task_usage_dashboard_daily_project_date
    ON task_usage_dashboard_daily (workspace_id, project_id, bucket_date DESC);
-- Per-agent reads (the "by agent" panel).
CREATE INDEX idx_task_usage_dashboard_daily_agent_date
    ON task_usage_dashboard_daily (workspace_id, agent_id, bucket_date DESC);
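
-- Illustrative read shape these indexes are meant to serve (comment only).
-- This is a sketch, not the actual sqlc query; $1 = workspace id, $2 =
-- first bucket date, $3 = optional project id (NULL means "all projects").
-- `task_count` is left out on purpose: it is stored per (agent, model)
-- bucket, so summing it across models can count one task more than once.
--
--     SELECT bucket_date,
--            model,
--            SUM(input_tokens)       AS input_tokens,
--            SUM(output_tokens)      AS output_tokens,
--            SUM(cache_read_tokens)  AS cache_read_tokens,
--            SUM(cache_write_tokens) AS cache_write_tokens
--       FROM task_usage_dashboard_daily
--      WHERE workspace_id = $1
--        AND bucket_date >= $2
--        AND ($3::uuid IS NULL OR project_id = $3)
--      GROUP BY bucket_date, model
--      ORDER BY bucket_date DESC;
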
CREATE TABLE task_usage_dashboard_rollup_state (
    id                   SMALLINT    PRIMARY KEY DEFAULT 1 CHECK (id = 1),
    watermark_at         TIMESTAMPTZ NOT NULL DEFAULT '1970-01-01 00:00:00+00',
    last_run_started_at  TIMESTAMPTZ,
    last_run_finished_at TIMESTAMPTZ,
    last_run_rows        BIGINT      NOT NULL DEFAULT 0,
    last_error           TEXT
);
INSERT INTO task_usage_dashboard_rollup_state (id) VALUES (1) ON CONFLICT DO NOTHING;
-- Dirty queue for invalidations the `updated_at` watermark can't see:
--   * DELETE on `task_usage` (no row left for the watermark).
--   * DELETE cascade through `agent_task_queue` (task_usage rows gone).
--   * DELETE on `issue` (enqueued before the cascade, while
--     `issue.project_id` is still readable).
--   * UPDATE of `agent_task_queue.issue_id` (moves the task's usage from
--     the no-project key to the linked issue's project).
--   * UPDATE of `issue.project_id` (moves the bucket to a new key).
--
-- The bucket key matches the rollup table so the queue can be UNIONed into
-- `dirty_keys` in the window function with no translation. project_id is
-- nullable here for the same reason as in the rollup table.
CREATE TABLE task_usage_dashboard_dirty (
    bucket_date  DATE        NOT NULL,
    workspace_id UUID        NOT NULL,
    agent_id     UUID        NOT NULL,
    project_id   UUID,
    model        TEXT        NOT NULL,
    enqueued_at  TIMESTAMPTZ NOT NULL DEFAULT now(),
    CONSTRAINT uq_task_usage_dashboard_dirty_key
        UNIQUE NULLS NOT DISTINCT
        (bucket_date, workspace_id, agent_id, project_id, model)
);
CREATE INDEX idx_task_usage_dashboard_dirty_enqueued_at
    ON task_usage_dashboard_dirty (enqueued_at);
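
-- Illustrative backlog check (comment only; one possible operator query,
-- not part of the migration). A steadily growing `pending_keys` or an old
-- `oldest_enqueued_at` would suggest the rollup tick is not draining the
-- queue:
--
--     SELECT COUNT(*)         AS pending_keys,
--            MIN(enqueued_at) AS oldest_enqueued_at
--       FROM task_usage_dashboard_dirty;
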
-- Trigger 1: agent_task_queue BEFORE UPDATE OF issue_id OR DELETE.
--
-- Two cases:
--
-- * UPDATE OF issue_id: currently only `LinkTaskToIssue` (quick-create
--   tasks attaching to the issue the agent just produced) writes here,
--   moving the task from `issue_id IS NULL` to a real issue. If usage
--   already rolled up under the no-project bucket, we have to enqueue
--   both OLD (NULL project) AND NEW (the new issue's project) so the
--   next tick re-attributes the tokens.
--
-- * DELETE: direct atq deletions land here with the issue row still
--   alive, so `LEFT JOIN issue` resolves the project correctly.
--   Cascade DELETE driven from `DELETE FROM issue` is handled by
--   `enqueue_task_usage_dashboard_dirty_for_issue_delete` below (which
--   fires *before* the cascade, while `issue.project_id` is still
--   readable). This trigger may also fire during that cascade; by then
--   the issue row is gone, so `LEFT JOIN issue` yields a NULL
--   project_id. `workspace_id` comes from the JOIN on `agent`, not from
--   `issue`, so the enqueue still succeeds, and the resulting
--   NULL-project key is harmless on recompute (it is rebuilt from
--   whatever raw rows remain, or dropped via the deleted_empty path).
CREATE OR REPLACE FUNCTION enqueue_task_usage_dashboard_dirty_for_atq()
RETURNS TRIGGER
LANGUAGE plpgsql
AS $$
BEGIN
    IF TG_OP = 'UPDATE' THEN
        IF OLD.issue_id IS DISTINCT FROM NEW.issue_id THEN
            -- OLD side: project_id of OLD.issue_id (NULL when OLD.issue_id IS NULL,
            -- e.g. the quick-create starting state).
            INSERT INTO task_usage_dashboard_dirty (
                bucket_date, workspace_id, agent_id, project_id, model
            )
            SELECT DISTINCT
                DATE(tu.created_at),
                a.workspace_id,
                OLD.agent_id,
                i_old.project_id,
                tu.model
            FROM task_usage tu
            JOIN agent a ON a.id = OLD.agent_id
            LEFT JOIN issue i_old ON i_old.id = OLD.issue_id
            WHERE tu.task_id = OLD.id
            ON CONFLICT ON CONSTRAINT uq_task_usage_dashboard_dirty_key DO UPDATE
                SET enqueued_at = GREATEST(task_usage_dashboard_dirty.enqueued_at, EXCLUDED.enqueued_at);
            -- NEW side: project_id of NEW.issue_id.
            INSERT INTO task_usage_dashboard_dirty (
                bucket_date, workspace_id, agent_id, project_id, model
            )
            SELECT DISTINCT
                DATE(tu.created_at),
                a.workspace_id,
                NEW.agent_id,
                i_new.project_id,
                tu.model
            FROM task_usage tu
            JOIN agent a ON a.id = NEW.agent_id
            LEFT JOIN issue i_new ON i_new.id = NEW.issue_id
            WHERE tu.task_id = NEW.id
            ON CONFLICT ON CONSTRAINT uq_task_usage_dashboard_dirty_key DO UPDATE
                SET enqueued_at = GREATEST(task_usage_dashboard_dirty.enqueued_at, EXCLUDED.enqueued_at);
        END IF;
        RETURN NEW;
    ELSIF TG_OP = 'DELETE' THEN
        INSERT INTO task_usage_dashboard_dirty (
            bucket_date, workspace_id, agent_id, project_id, model
        )
        SELECT DISTINCT
            DATE(tu.created_at),
            a.workspace_id,
            OLD.agent_id,
            i.project_id,
            tu.model
        FROM task_usage tu
        JOIN agent a ON a.id = OLD.agent_id
        LEFT JOIN issue i ON i.id = OLD.issue_id
        WHERE tu.task_id = OLD.id
        ON CONFLICT ON CONSTRAINT uq_task_usage_dashboard_dirty_key DO UPDATE
            SET enqueued_at = GREATEST(task_usage_dashboard_dirty.enqueued_at, EXCLUDED.enqueued_at);
        RETURN OLD;
    END IF;
    RETURN NULL;
END;
$$;
CREATE TRIGGER trg_atq_dirty_dashboard
    BEFORE UPDATE OF issue_id OR DELETE ON agent_task_queue
    FOR EACH ROW EXECUTE FUNCTION enqueue_task_usage_dashboard_dirty_for_atq();
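
-- Illustrative check of the UPDATE OF issue_id path (comment only, meant
-- for a scratch database; the UUID placeholders are hypothetical). For a
-- task that already has usage rows, the queue should show, per
-- (bucket_date, model), one key with a NULL project_id and one with the
-- linked issue's project_id. If the issue has no project, both keys are
-- identical and collapse into one through the ON CONFLICT clause.
--
--     BEGIN;
--     UPDATE agent_task_queue
--        SET issue_id = '<issue-uuid>'
--      WHERE id = '<task-uuid>'
--        AND issue_id IS NULL;
--     SELECT bucket_date, agent_id, project_id, model
--       FROM task_usage_dashboard_dirty
--      WHERE agent_id = (SELECT agent_id
--                          FROM agent_task_queue
--                         WHERE id = '<task-uuid>');
--     ROLLBACK;
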
-- Trigger 1b: issue BEFORE DELETE.
--
-- `DELETE FROM issue` cascades to `agent_task_queue` and onward to
-- `task_usage`. By the time the atq BEFORE DELETE trigger runs, the
-- issue row is gone and `LEFT JOIN issue` returns NULL for project_id,
-- so the atq trigger would enqueue a NULL-project key — the rollup row
-- under the original project would never get cleared and would keep
-- billing the workspace for tokens that no longer have a source.
--
-- This trigger fires BEFORE the cascade, while `OLD.project_id` is still
-- readable, and enqueues one dirty row per (date, agent, model) the
-- issue's tasks contributed to. The next rollup tick recomputes the
-- bucket, finds no source rows under the original project, and drops it
-- (deleted_empty path).
CREATE OR REPLACE FUNCTION enqueue_task_usage_dashboard_dirty_for_issue_delete()
RETURNS TRIGGER
LANGUAGE plpgsql
AS $$
BEGIN
    INSERT INTO task_usage_dashboard_dirty (
        bucket_date, workspace_id, agent_id, project_id, model
    )
    SELECT DISTINCT
        DATE(tu.created_at),
        OLD.workspace_id,
        atq.agent_id,
        OLD.project_id,
        tu.model
    FROM agent_task_queue atq
    JOIN task_usage tu ON tu.task_id = atq.id
    WHERE atq.issue_id = OLD.id
    ON CONFLICT ON CONSTRAINT uq_task_usage_dashboard_dirty_key DO UPDATE
        SET enqueued_at = GREATEST(task_usage_dashboard_dirty.enqueued_at, EXCLUDED.enqueued_at);
    RETURN OLD;
END;
$$;
CREATE TRIGGER trg_issue_delete_dirty_dashboard
    BEFORE DELETE ON issue
    FOR EACH ROW EXECUTE FUNCTION enqueue_task_usage_dashboard_dirty_for_issue_delete();
-- Trigger 2: task_usage BEFORE DELETE.
-- Rare in practice (no direct DELETE call sites today) but ensures the
-- rollup converges if one is added. UPDATE / INSERT are caught by the
-- updated_at watermark in the window function.
CREATE OR REPLACE FUNCTION enqueue_task_usage_dashboard_dirty_for_tu()
RETURNS TRIGGER
LANGUAGE plpgsql
AS $$
BEGIN
    INSERT INTO task_usage_dashboard_dirty (
        bucket_date, workspace_id, agent_id, project_id, model
    )
    SELECT
        DATE(OLD.created_at),
        a.workspace_id,
        atq.agent_id,
        i.project_id,
        OLD.model
    FROM agent_task_queue atq
    JOIN agent a ON a.id = atq.agent_id
    LEFT JOIN issue i ON i.id = atq.issue_id
    WHERE atq.id = OLD.task_id
    ON CONFLICT ON CONSTRAINT uq_task_usage_dashboard_dirty_key DO UPDATE
        SET enqueued_at = GREATEST(task_usage_dashboard_dirty.enqueued_at, EXCLUDED.enqueued_at);
    RETURN OLD;
END;
$$;
CREATE TRIGGER trg_tu_dirty_dashboard
    BEFORE DELETE ON task_usage
    FOR EACH ROW EXECUTE FUNCTION enqueue_task_usage_dashboard_dirty_for_tu();
-- Trigger 3: issue BEFORE UPDATE OF project_id.
-- Re-attribute every (date, agent, model) bucket touched by tasks under
-- this issue: enqueue OLD project_id (so it stops claiming the tokens)
-- AND NEW project_id (so it picks them up). Both go through the same
-- dirty queue; the window function recomputes from the live join in
-- the `recomputed` CTE, which by then sees NEW.project_id, so the OLD
-- bucket either shrinks or is removed (deleted_empty path).
CREATE OR REPLACE FUNCTION enqueue_task_usage_dashboard_dirty_for_issue_project()
RETURNS TRIGGER
LANGUAGE plpgsql
AS $$
BEGIN
    IF OLD.project_id IS DISTINCT FROM NEW.project_id THEN
        -- OLD project buckets.
        INSERT INTO task_usage_dashboard_dirty (
            bucket_date, workspace_id, agent_id, project_id, model
        )
        SELECT DISTINCT
            DATE(tu.created_at),
            NEW.workspace_id,
            atq.agent_id,
            OLD.project_id,
            tu.model
        FROM agent_task_queue atq
        JOIN task_usage tu ON tu.task_id = atq.id
        WHERE atq.issue_id = NEW.id
        ON CONFLICT ON CONSTRAINT uq_task_usage_dashboard_dirty_key DO UPDATE
            SET enqueued_at = GREATEST(task_usage_dashboard_dirty.enqueued_at, EXCLUDED.enqueued_at);
        -- NEW project buckets.
        INSERT INTO task_usage_dashboard_dirty (
            bucket_date, workspace_id, agent_id, project_id, model
        )
        SELECT DISTINCT
            DATE(tu.created_at),
            NEW.workspace_id,
            atq.agent_id,
            NEW.project_id,
            tu.model
        FROM agent_task_queue atq
        JOIN task_usage tu ON tu.task_id = atq.id
        WHERE atq.issue_id = NEW.id
        ON CONFLICT ON CONSTRAINT uq_task_usage_dashboard_dirty_key DO UPDATE
            SET enqueued_at = GREATEST(task_usage_dashboard_dirty.enqueued_at, EXCLUDED.enqueued_at);
    END IF;
    RETURN NEW;
END;
$$;
CREATE TRIGGER trg_issue_project_dirty_dashboard
    BEFORE UPDATE OF project_id ON issue
    FOR EACH ROW EXECUTE FUNCTION enqueue_task_usage_dashboard_dirty_for_issue_project();
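
-- Illustrative end-to-end check of the re-attribution path (comment only,
-- meant for a scratch database; the UUID placeholders are hypothetical and
-- the 1-day lower bound is arbitrary). `clock_timestamp()` is used as the
-- upper bound because the window function reads the queue with
-- `enqueued_at < p_to`, and rows enqueued in the same transaction carry
-- that transaction's `now()`. After the recompute, the issue's usage
-- should appear under the new project, and the old project's row should
-- have shrunk or disappeared.
--
--     UPDATE issue
--        SET project_id = '<new-project-uuid>'
--      WHERE id = '<issue-uuid>';
--     SELECT rollup_task_usage_dashboard_daily_window(
--                now() - INTERVAL '1 day', clock_timestamp());
--     SELECT bucket_date, project_id, model, input_tokens, output_tokens
--       FROM task_usage_dashboard_daily
--      WHERE workspace_id = '<workspace-uuid>'
--      ORDER BY bucket_date, project_id;
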
-- Window function. Same shape as 077's rollup_task_usage_daily_window:
-- 1. Discover dirty keys from updated_at watermark + the explicit queue.
-- 2. Recompute each from raw via the agent_task_queue + issue join.
-- 3. Upsert present buckets, delete buckets that recomputed to nothing.
-- 4. Drain the queue rows whose enqueued_at < p_to.
--
-- IDEMPOTENCY: re-running the same window yields the same final state,
-- because each touched key is rebuilt from raw, not deltaed.
CREATE OR REPLACE FUNCTION rollup_task_usage_dashboard_daily_window(
    p_from TIMESTAMPTZ,
    p_to   TIMESTAMPTZ
)
RETURNS BIGINT
LANGUAGE plpgsql
AS $$
DECLARE
    v_rows BIGINT;
BEGIN
    IF p_from >= p_to THEN
        RETURN 0;
    END IF;
    WITH
    dirty_from_updates AS (
        SELECT DISTINCT
            DATE(tu.created_at) AS bucket_date,
            a.workspace_id      AS workspace_id,
            atq.agent_id        AS agent_id,
            i.project_id        AS project_id,
            tu.model            AS model
        FROM task_usage tu
        JOIN agent_task_queue atq ON atq.id = tu.task_id
        JOIN agent a ON a.id = atq.agent_id
        LEFT JOIN issue i ON i.id = atq.issue_id
        WHERE (
            (tu.updated_at >= p_from AND tu.updated_at < p_to)
            -- Legacy `updated_at IS NULL` rows: 077 handles this via a
            -- partial index. We rely on the same `idx_task_usage_created_at_legacy`
            -- (migration 078) for this branch.
            OR (tu.updated_at IS NULL
                AND tu.created_at >= p_from
                AND tu.created_at < p_to)
        )
    ),
    dirty_from_queue AS (
        SELECT bucket_date, workspace_id, agent_id, project_id, model
        FROM task_usage_dashboard_dirty
        WHERE enqueued_at < p_to
    ),
    dirty_keys AS (
        SELECT * FROM dirty_from_updates
        UNION
        SELECT * FROM dirty_from_queue
    ),
    recomputed AS (
        SELECT
            dk.bucket_date,
            dk.workspace_id,
            dk.agent_id,
            dk.project_id,
            dk.model,
            SUM(tu.input_tokens)::bigint       AS input_tokens,
            SUM(tu.output_tokens)::bigint      AS output_tokens,
            SUM(tu.cache_read_tokens)::bigint  AS cache_read_tokens,
            SUM(tu.cache_write_tokens)::bigint AS cache_write_tokens,
            COUNT(DISTINCT tu.task_id)::bigint AS task_count,
            COUNT(*)::bigint                   AS event_count
        FROM dirty_keys dk
        JOIN agent_task_queue atq ON atq.agent_id = dk.agent_id
        JOIN agent a ON a.id = atq.agent_id
            AND a.workspace_id = dk.workspace_id
        LEFT JOIN issue i ON i.id = atq.issue_id
        JOIN task_usage tu ON tu.task_id = atq.id
            AND tu.model = dk.model
            AND DATE(tu.created_at) = dk.bucket_date
        WHERE (i.project_id IS NOT DISTINCT FROM dk.project_id)
        GROUP BY 1, 2, 3, 4, 5
    ),
    upserted AS (
        INSERT INTO task_usage_dashboard_daily AS d (
            bucket_date, workspace_id, agent_id, project_id, model,
            input_tokens, output_tokens, cache_read_tokens, cache_write_tokens,
            task_count, event_count
        )
        SELECT
            bucket_date, workspace_id, agent_id, project_id, model,
            input_tokens, output_tokens, cache_read_tokens, cache_write_tokens,
            task_count, event_count
        FROM recomputed
        ON CONFLICT ON CONSTRAINT uq_task_usage_dashboard_daily_key DO UPDATE
            SET input_tokens       = EXCLUDED.input_tokens,
                output_tokens      = EXCLUDED.output_tokens,
                cache_read_tokens  = EXCLUDED.cache_read_tokens,
                cache_write_tokens = EXCLUDED.cache_write_tokens,
                task_count         = EXCLUDED.task_count,
                event_count        = EXCLUDED.event_count,
                updated_at         = now()
        RETURNING 1
    ),
    deleted_empty AS (
        DELETE FROM task_usage_dashboard_daily d
        USING dirty_keys dk
        WHERE d.bucket_date = dk.bucket_date
          AND d.workspace_id = dk.workspace_id
          AND d.agent_id = dk.agent_id
          AND d.project_id IS NOT DISTINCT FROM dk.project_id
          AND d.model = dk.model
          AND NOT EXISTS (
              SELECT 1 FROM recomputed r
              WHERE r.bucket_date = dk.bucket_date
                AND r.workspace_id = dk.workspace_id
                AND r.agent_id = dk.agent_id
                AND r.project_id IS NOT DISTINCT FROM dk.project_id
                AND r.model = dk.model
          )
        RETURNING 1
    )
    SELECT (SELECT COUNT(*) FROM upserted) + (SELECT COUNT(*) FROM deleted_empty)
    INTO v_rows;
    DELETE FROM task_usage_dashboard_dirty WHERE enqueued_at < p_to;
    RETURN v_rows;
END;
$$;
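
-- Illustrative idempotency check (comment only; the window bounds are
-- arbitrary example values). Run the window, record the checksum, run the
-- same window again, and compare. The function's return value counts
-- upserted plus deleted rows, so it need not be 0 on the second run;
-- compare table contents instead (`updated_at` is refreshed by design).
--
--     SELECT rollup_task_usage_dashboard_daily_window(
--                TIMESTAMPTZ '2026-05-01 00:00:00+00',
--                TIMESTAMPTZ '2026-05-08 00:00:00+00');
--     SELECT COUNT(*)           AS bucket_rows,
--            SUM(input_tokens)  AS input_tokens,
--            SUM(output_tokens) AS output_tokens,
--            SUM(task_count)    AS task_count
--       FROM task_usage_dashboard_daily;
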
-- Cron entry. Mirrors `rollup_task_usage_daily` (migration 073) — same
-- advisory-lock + watermark + error-recovery shape. Uses lock id 4244
-- so it serialises independently of the per-runtime rollup (4242).
CREATE OR REPLACE FUNCTION rollup_task_usage_dashboard_daily()
RETURNS BIGINT
LANGUAGE plpgsql
AS $$
DECLARE
    v_lock_ok BOOLEAN;
    v_from    TIMESTAMPTZ;
    v_to      TIMESTAMPTZ;
    v_rows    BIGINT := 0;
BEGIN
    SELECT pg_try_advisory_lock(4244) INTO v_lock_ok;
    IF NOT v_lock_ok THEN
        RETURN 0;
    END IF;
    BEGIN
        UPDATE task_usage_dashboard_rollup_state
           SET last_run_started_at = now(),
               last_error = NULL
         WHERE id = 1
        RETURNING watermark_at INTO v_from;
        v_to := now() - INTERVAL '5 minutes';
        IF v_from < v_to THEN
            v_rows := rollup_task_usage_dashboard_daily_window(v_from, v_to);
            UPDATE task_usage_dashboard_rollup_state
               SET watermark_at = v_to,
                   last_run_finished_at = now(),
                   last_run_rows = v_rows
             WHERE id = 1;
        ELSE
            UPDATE task_usage_dashboard_rollup_state
               SET last_run_finished_at = now(),
                   last_run_rows = 0
             WHERE id = 1;
        END IF;
        PERFORM pg_advisory_unlock(4244);
        RETURN v_rows;
    EXCEPTION WHEN OTHERS THEN
        UPDATE task_usage_dashboard_rollup_state
           SET last_error = SQLERRM,
               last_run_finished_at = now()
         WHERE id = 1;
        PERFORM pg_advisory_unlock(4244);
        RAISE;
    END;
END;
$$;
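
-- Illustrative inspection queries for a tick that appears stuck (comment
-- only; one possible operator workflow, not part of the migration). The
-- first reads the single state row; the second lists sessions currently
-- holding or waiting on advisory lock 4244. A single-bigint advisory key
-- shows up in pg_locks with its high half in `classid`, its low half in
-- `objid`, and `objsubid = 1`.
--
--     SELECT watermark_at, last_run_started_at, last_run_finished_at,
--            last_run_rows, last_error
--       FROM task_usage_dashboard_rollup_state
--      WHERE id = 1;
--     SELECT pid, granted
--       FROM pg_locks
--      WHERE locktype = 'advisory'
--        AND classid = 0
--        AND objid = 4244
--        AND objsubid = 1;
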
-- Health-check helper mirroring task_usage_rollup_lag_seconds() (076).
-- Returns seconds since the last finished run, or NULL if no run has
-- finished yet. Alert via monitoring when the value is still NULL more
-- than 15 min after deploy, or when it exceeds 900s.
CREATE OR REPLACE FUNCTION task_usage_dashboard_rollup_lag_seconds()
RETURNS DOUBLE PRECISION
LANGUAGE sql
STABLE
AS $$
    SELECT EXTRACT(EPOCH FROM (now() - last_run_finished_at))
    FROM task_usage_dashboard_rollup_state
    WHERE id = 1;
$$;
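
-- Illustrative alert query built on the helper (comment only; the
-- `needs_attention` column name is hypothetical). It simplifies the rule
-- above by flagging NULL immediately, so a monitor using it would need its
-- own 15-minute grace period after deploy:
--
--     SELECT lag_seconds,
--            (lag_seconds IS NULL OR lag_seconds > 900) AS needs_attention
--       FROM task_usage_dashboard_rollup_lag_seconds() AS lag_seconds;
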
-- NOTE: cron job is NOT scheduled by this migration (same convention as
-- 076 for the per-runtime rollup). Operator playbook:
--   1) Apply migrations through 084.
--   2) Run `go run ./cmd/backfill_task_usage_dashboard_daily`.
--   3) Set USAGE_DASHBOARD_ROLLUP_ENABLED=true on the API and roll out.
--   4) As superuser:
--        SELECT cron.schedule(
--          'rollup_task_usage_dashboard_daily',
--          '*/5 * * * *',
--          $$SELECT rollup_task_usage_dashboard_daily()$$
--        );
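--
-- Illustrative follow-up, assuming a pg_cron version that supports named
-- jobs (comment only): confirm the job was registered, and unschedule it
-- by name if the rollout has to be reverted.
--
--     SELECT jobid, schedule, command, active
--       FROM cron.job
--      WHERE jobname = 'rollup_task_usage_dashboard_daily';
--     SELECT cron.unschedule('rollup_task_usage_dashboard_daily');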