multica/server/pkg/db/generated/task_usage.sql.go
Bohan Jiang 96695a79c5 feat(dashboard): workspace/project token + run-time dashboard MUL-1882 (#2462)
* feat(dashboard): workspace/project token + run-time dashboard

Add a `/{slug}/dashboard` page showing per-agent token spend and execution
time across the whole workspace, with an optional project filter.

Backend:
  - Three new sqlc queries against task_usage + agent_task_queue: daily
    usage, per-agent usage, per-agent total run-time. All optionally
    scoped to a project via sqlc.narg('project_id'), reaching project
    through the issue join.
  - Handlers under /api/dashboard return the same wire shape the runtime
    page already consumes (model preserved for client-side cost math).

Frontend:
  - Shared DashboardPage in packages/views/dashboard reusing KpiCard,
    DailyCostChart, ActorAvatar, and estimateCost from the runtime page
    so the visual style and pricing math stay in lock-step.
  - Period selector (7/30/90d), project dropdown, four KPI tiles
    (cost, tokens, run time, tasks), daily cost chart, and a combined
    "cost + run time by agent" list.
  - Routed in both web (app/[slug]/(dashboard)/dashboard) and desktop
    (memory router); sidebar nav entry added under Workspace group.

Co-authored-by: multica-agent <github@multica.ai>

* fix(dashboard): drop stale project filter and stop double-counting tasks

Two issues caught in PR #2462 review:

1. Project filter held the previous selection's UUID across workspace
   switches and project deletions: the dropdown gracefully showed
   "All projects" (because the title lookup missed) while the three
   dashboard queries kept forwarding the dead UUID, leaving the UI
   looking like a full-workspace view but populated with empty
   project-scoped data. Validate the picked UUID against the current
   projects list before passing it to the queries.

2. The "by agent" table read its task count from the token rollup,
   which is grouped per (agent, model). A single task that spans two
   models lands twice and the agent's row reads e.g. "2 tasks" when
   the real count is 1. Prefer `ListDashboardAgentRunTime`'s per-agent
   distinct count when available; fall back to the token aggregate
   only for agents with no terminal run yet (in-flight tasks).
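
The validation in item 1 can be sketched as below. The actual fix lives in the frontend; this Go version is only an illustration, and `Project`, `effectiveProjectID`, and the convention that an empty string means "all projects" are assumptions made for the sketch, not names from the PR.

```go
package main

import "fmt"

// Project is a hypothetical stand-in for a project record in the current
// workspace's project list.
type Project struct {
	ID    string
	Title string
}

// effectiveProjectID returns selected only if it still names a project in
// the current list. A stale ID (deleted project, or a selection carried over
// from another workspace) collapses to "", meaning "all projects".
func effectiveProjectID(selected string, projects []Project) string {
	if selected == "" {
		return ""
	}
	for _, p := range projects {
		if p.ID == selected {
			return selected
		}
	}
	return ""
}

func main() {
	projects := []Project{{ID: "p1", Title: "Alpha"}}
	fmt.Printf("%q\n", effectiveProjectID("p1", projects))
	fmt.Printf("%q\n", effectiveProjectID("deleted-project", projects))
}
```

With this shape the dropdown label and the query parameter are derived from the same validated value, so they cannot disagree.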

Extract the merge into `mergeAgentDashboardRows` so the precedence
rules are unit-tested directly.
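
A minimal sketch of the precedence rule, written in Go for illustration. The real `mergeAgentDashboardRows` is not shown in this file, so the types and the function name `mergeAgentRows` below are hypothetical; only the rule itself (run-time count wins, token aggregate is the fallback) comes from the description above.

```go
package main

import "fmt"

// UsageRow mirrors one (agent, model) row of the token aggregate.
type UsageRow struct {
	AgentID   string
	Model     string
	Tokens    int64
	TaskCount int
}

// RunTimeRow mirrors one per-agent row of the run-time query, which only
// covers agents with at least one terminal run.
type RunTimeRow struct {
	AgentID      string
	TotalSeconds int64
	TaskCount    int
}

// AgentRow is the merged per-agent row shown in the "by agent" list.
type AgentRow struct {
	Tokens       int64
	TotalSeconds int64
	TaskCount    int
}

// mergeAgentRows folds token rows by agent, then lets the run-time row
// override the task count wherever one exists.
func mergeAgentRows(usage []UsageRow, runTime []RunTimeRow) map[string]AgentRow {
	out := make(map[string]AgentRow)
	for _, u := range usage {
		r := out[u.AgentID]
		r.Tokens += u.Tokens
		r.TaskCount += u.TaskCount // provisional: may double-count across models
		out[u.AgentID] = r
	}
	for _, rt := range runTime {
		r := out[rt.AgentID]
		r.TotalSeconds = rt.TotalSeconds
		r.TaskCount = rt.TaskCount // per-agent distinct count takes precedence
		out[rt.AgentID] = r
	}
	return out
}

func main() {
	usage := []UsageRow{
		{AgentID: "a1", Model: "model-x", Tokens: 100, TaskCount: 1},
		{AgentID: "a1", Model: "model-y", Tokens: 50, TaskCount: 1}, // same task, second model
		{AgentID: "a2", Model: "model-x", Tokens: 10, TaskCount: 1}, // in-flight, no terminal run
	}
	runTime := []RunTimeRow{{AgentID: "a1", TotalSeconds: 120, TaskCount: 1}}
	merged := mergeAgentRows(usage, runTime)
	fmt.Printf("%+v\n", merged["a1"]) // {Tokens:150 TotalSeconds:120 TaskCount:1}
	fmt.Printf("%+v\n", merged["a2"]) // {Tokens:10 TotalSeconds:0 TaskCount:1}
}
```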

Co-authored-by: multica-agent <github@multica.ai>

* test(dashboard): allocate per-workspace issue.number explicitly

TestDashboardEndpoints creates two issues in the shared fixture
workspace. issue.number defaults to 0 (migration 020), and the table
carries UNIQUE (workspace_id, number), so the second insert collided
with the first on the same default and failed in CI.

Allocate MAX(number) + 1 per insert so each row gets a fresh number
without stepping on rows other tests left behind in the same workspace.

Co-authored-by: multica-agent <github@multica.ai>

* feat(dashboard): rollup table + cron-driven aggregation for dashboard

Mirror the per-runtime rollup in `task_usage_daily` (migrations 073/077/082)
to remove the per-request raw aggregation the dashboard was doing.

Migration 084 adds:
  - `task_usage_dashboard_daily` keyed on
    (bucket_date, workspace_id, agent_id, project_id, model) — the
    dimensions the dashboard actually queries, with project_id nullable
    via UNIQUE NULLS NOT DISTINCT (PG15+) so "no-project" buckets
    upsert cleanly.
  - `task_usage_dashboard_rollup_state` watermark table.
  - `task_usage_dashboard_dirty` invalidation queue.
  - Triggers on agent_task_queue DELETE, task_usage DELETE, and
    issue.project_id UPDATE — the cases the updated_at watermark can't
    see. The project_id trigger re-attributes existing rollup rows when
    a user moves an issue across projects.
  - `rollup_task_usage_dashboard_daily_window(from, to)` —
    idempotent recompute primitive (same shape as 077).
  - `rollup_task_usage_dashboard_daily()` cron entry — own advisory
    lock (4244) so it serialises independently of the runtime rollup.
  - `task_usage_dashboard_rollup_lag_seconds()` health helper.

Sqlc queries `ListDashboardUsageDailyRollup` /
`ListDashboardUsageByAgentRollup` read from the new table; the handler
dispatches between rollup and raw on a separate
`UseDailyRollupForDashboard` config flag
(`USAGE_DASHBOARD_ROLLUP_ENABLED` env). Same fail-safe default (false →
raw) so operators can roll out independently of the per-runtime flag.
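
The fail-safe dispatch can be sketched as follows. How the server actually parses the variable is not shown here, so the use of `strconv.ParseBool` and the helper names `rollupEnabled` and `usageSource` are assumptions; only the environment variable name and the "false means raw" default come from the text above.

```go
package main

import (
	"fmt"
	"os"
	"strconv"
)

// rollupEnabled reads USAGE_DASHBOARD_ROLLUP_ENABLED through getenv.
// An unset or unparsable value yields false, so the raw path stays the
// default unless an operator opts in explicitly.
func rollupEnabled(getenv func(string) string) bool {
	v, err := strconv.ParseBool(getenv("USAGE_DASHBOARD_ROLLUP_ENABLED"))
	return err == nil && v
}

// usageSource names the read path a handler would take for the flag value.
func usageSource(useRollup bool) string {
	if useRollup {
		return "rollup"
	}
	return "raw"
}

func main() {
	fmt.Println(usageSource(rollupEnabled(os.Getenv)))
}
```

Passing `getenv` as a parameter keeps the parsing testable without touching the process environment.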

Bucket date is UTC (the dashboard aggregates across runtimes that may
sit in different tzs; there's no single correct local boundary).

Adds `cmd/backfill_task_usage_dashboard_daily` mirroring the existing
per-runtime backfill — operator runs it once before flipping the flag.

Tests:
  - TestDashboardEndpoints now also exercises the rollup read path
    (raw vs. rollup, same project-scoped totals).
  - TestDashboardRollupReattributesOnProjectChange verifies the
    issue.project_id trigger enqueues both old + new buckets and the
    next rollup tick zeroes the old project + populates the new one.

Co-authored-by: multica-agent <github@multica.ai>

* fix(dashboard-rollup): close two invalidation gaps

Two leak paths missed by migration 084 review:

1. Issue cascade DELETE: the atq BEFORE DELETE trigger runs AFTER the
   issue row is gone, so `LEFT JOIN issue` returns NULL project_id and
   the original-project bucket never gets cleared (migration 077 calls
   this out for the runtime rollup but didn't need to act on it). Adds an
   `issue BEFORE DELETE` trigger that enqueues using OLD.project_id
   while the issue row is still readable.

2. `LinkTaskToIssue` (quick-create task attaching to a real issue post-
   completion) UPDATEs `agent_task_queue.issue_id` from NULL to a real
   id. Migration 084 only watched DELETE on atq, so usage already
   rolled up under the no-project bucket stayed attributed to NULL
   forever. Extends the atq trigger to fire on UPDATE OF issue_id too,
   enqueueing both OLD (NULL project) and NEW (linked issue's project).
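
The "enqueue both OLD and NEW" idea can be sketched in Go as below. The real logic is a database trigger writing to `task_usage_dashboard_dirty`, whose columns are not shown here, so `bucketKey`, its fields, and `dirtyOnReattribution` are hypothetical, and the empty string stands in for the NULL (no-project) bucket.

```go
package main

import "fmt"

// bucketKey is a hypothetical identifier for one rollup bucket to recompute.
type bucketKey struct {
	Date      string
	AgentID   string
	ProjectID string // "" stands for the NULL (no-project) bucket
}

// dirtyOnReattribution returns the buckets that must be recomputed when
// usage moves from oldProject to newProject. Both sides are listed: the old
// bucket has to shrink and the new bucket has to grow.
func dirtyOnReattribution(date, agentID, oldProject, newProject string) []bucketKey {
	if oldProject == newProject {
		return nil // nothing moved, nothing to invalidate
	}
	return []bucketKey{
		{Date: date, AgentID: agentID, ProjectID: oldProject},
		{Date: date, AgentID: agentID, ProjectID: newProject},
	}
}

func main() {
	// A quick-create task (no project) gets linked to an issue in project p1.
	for _, k := range dirtyOnReattribution("2026-05-13", "a1", "", "p1") {
		fmt.Printf("%+v\n", k)
	}
}
```

Enqueueing only the new bucket would leave the old one holding tokens that no longer belong to it, which is the leak described in item 2.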

Tests: - TestDashboardRollupClearsOnIssueDelete asserts rollup row drops to
    zero after issue delete + rollup tick.
  - TestDashboardRollupReattributesOnLinkTaskToIssue verifies tokens
    move from the NULL bucket to the project bucket after the UPDATE.
Co-authored-by: multica-agent <github@multica.ai>

---------

Co-authored-by: multica-agent <github@multica.ai>
2026-05-13 12:51:16 +08:00


// Code generated by sqlc. DO NOT EDIT.
// versions:
//   sqlc v1.30.0
// source: task_usage.sql

package db

import (
	"context"

	"github.com/jackc/pgx/v5/pgtype"
)

const getIssueUsageSummary = `-- name: GetIssueUsageSummary :one
SELECT
    COALESCE(SUM(tu.input_tokens), 0)::bigint AS total_input_tokens,
    COALESCE(SUM(tu.output_tokens), 0)::bigint AS total_output_tokens,
    COALESCE(SUM(tu.cache_read_tokens), 0)::bigint AS total_cache_read_tokens,
    COALESCE(SUM(tu.cache_write_tokens), 0)::bigint AS total_cache_write_tokens,
    COUNT(DISTINCT tu.task_id)::int AS task_count
FROM task_usage tu
JOIN agent_task_queue atq ON atq.id = tu.task_id
WHERE atq.issue_id = $1
`

type GetIssueUsageSummaryRow struct {
	TotalInputTokens      int64 `json:"total_input_tokens"`
	TotalOutputTokens     int64 `json:"total_output_tokens"`
	TotalCacheReadTokens  int64 `json:"total_cache_read_tokens"`
	TotalCacheWriteTokens int64 `json:"total_cache_write_tokens"`
	TaskCount             int32 `json:"task_count"`
}

func (q *Queries) GetIssueUsageSummary(ctx context.Context, issueID pgtype.UUID) (GetIssueUsageSummaryRow, error) {
	row := q.db.QueryRow(ctx, getIssueUsageSummary, issueID)
	var i GetIssueUsageSummaryRow
	err := row.Scan(
		&i.TotalInputTokens,
		&i.TotalOutputTokens,
		&i.TotalCacheReadTokens,
		&i.TotalCacheWriteTokens,
		&i.TaskCount,
	)
	return i, err
}

const getTaskUsage = `-- name: GetTaskUsage :many
SELECT id, task_id, provider, model, input_tokens, output_tokens, cache_read_tokens, cache_write_tokens, created_at, updated_at FROM task_usage
WHERE task_id = $1
ORDER BY model
`

func (q *Queries) GetTaskUsage(ctx context.Context, taskID pgtype.UUID) ([]TaskUsage, error) {
	rows, err := q.db.Query(ctx, getTaskUsage, taskID)
	if err != nil {
		return nil, err
	}
	defer rows.Close()
	items := []TaskUsage{}
	for rows.Next() {
		var i TaskUsage
		if err := rows.Scan(
			&i.ID,
			&i.TaskID,
			&i.Provider,
			&i.Model,
			&i.InputTokens,
			&i.OutputTokens,
			&i.CacheReadTokens,
			&i.CacheWriteTokens,
			&i.CreatedAt,
			&i.UpdatedAt,
		); err != nil {
			return nil, err
		}
		items = append(items, i)
	}
	if err := rows.Err(); err != nil {
		return nil, err
	}
	return items, nil
}

const getWorkspaceUsageByDay = `-- name: GetWorkspaceUsageByDay :many
SELECT
    DATE(tu.created_at) AS date,
    tu.model,
    SUM(tu.input_tokens)::bigint AS total_input_tokens,
    SUM(tu.output_tokens)::bigint AS total_output_tokens,
    SUM(tu.cache_read_tokens)::bigint AS total_cache_read_tokens,
    SUM(tu.cache_write_tokens)::bigint AS total_cache_write_tokens,
    COUNT(DISTINCT tu.task_id)::int AS task_count
FROM task_usage tu
JOIN agent_task_queue atq ON atq.id = tu.task_id
JOIN agent a ON a.id = atq.agent_id
WHERE a.workspace_id = $1
  AND tu.created_at >= DATE_TRUNC('day', $2::timestamptz)
GROUP BY DATE(tu.created_at), tu.model
ORDER BY DATE(tu.created_at) DESC, tu.model
`

type GetWorkspaceUsageByDayParams struct {
	WorkspaceID pgtype.UUID        `json:"workspace_id"`
	Since       pgtype.Timestamptz `json:"since"`
}

type GetWorkspaceUsageByDayRow struct {
	Date                  pgtype.Date `json:"date"`
	Model                 string      `json:"model"`
	TotalInputTokens      int64       `json:"total_input_tokens"`
	TotalOutputTokens     int64       `json:"total_output_tokens"`
	TotalCacheReadTokens  int64       `json:"total_cache_read_tokens"`
	TotalCacheWriteTokens int64       `json:"total_cache_write_tokens"`
	TaskCount             int32       `json:"task_count"`
}

// Bucket by tu.created_at (usage report time, ~= task completion time), not
// atq.created_at (task enqueue time), so tasks that queue one day and execute
// the next are attributed to the day tokens were actually produced. The since
// cutoff is truncated to start-of-day so `days=N` yields full calendar days.
func (q *Queries) GetWorkspaceUsageByDay(ctx context.Context, arg GetWorkspaceUsageByDayParams) ([]GetWorkspaceUsageByDayRow, error) {
	rows, err := q.db.Query(ctx, getWorkspaceUsageByDay, arg.WorkspaceID, arg.Since)
	if err != nil {
		return nil, err
	}
	defer rows.Close()
	items := []GetWorkspaceUsageByDayRow{}
	for rows.Next() {
		var i GetWorkspaceUsageByDayRow
		if err := rows.Scan(
			&i.Date,
			&i.Model,
			&i.TotalInputTokens,
			&i.TotalOutputTokens,
			&i.TotalCacheReadTokens,
			&i.TotalCacheWriteTokens,
			&i.TaskCount,
		); err != nil {
			return nil, err
		}
		items = append(items, i)
	}
	if err := rows.Err(); err != nil {
		return nil, err
	}
	return items, nil
}

const getWorkspaceUsageSummary = `-- name: GetWorkspaceUsageSummary :many
SELECT
    tu.model,
    SUM(tu.input_tokens)::bigint AS total_input_tokens,
    SUM(tu.output_tokens)::bigint AS total_output_tokens,
    SUM(tu.cache_read_tokens)::bigint AS total_cache_read_tokens,
    SUM(tu.cache_write_tokens)::bigint AS total_cache_write_tokens,
    COUNT(DISTINCT tu.task_id)::int AS task_count
FROM task_usage tu
JOIN agent_task_queue atq ON atq.id = tu.task_id
JOIN agent a ON a.id = atq.agent_id
WHERE a.workspace_id = $1
  AND tu.created_at >= DATE_TRUNC('day', $2::timestamptz)
GROUP BY tu.model
ORDER BY (SUM(tu.input_tokens) + SUM(tu.output_tokens)) DESC
`

type GetWorkspaceUsageSummaryParams struct {
	WorkspaceID pgtype.UUID        `json:"workspace_id"`
	Since       pgtype.Timestamptz `json:"since"`
}

type GetWorkspaceUsageSummaryRow struct {
	Model                 string `json:"model"`
	TotalInputTokens      int64  `json:"total_input_tokens"`
	TotalOutputTokens     int64  `json:"total_output_tokens"`
	TotalCacheReadTokens  int64  `json:"total_cache_read_tokens"`
	TotalCacheWriteTokens int64  `json:"total_cache_write_tokens"`
	TaskCount             int32  `json:"task_count"`
}

// Filter by tu.created_at (usage report time), aligned to start-of-day, so
// `days=N` is interpreted as N full calendar days like the other usage queries.
func (q *Queries) GetWorkspaceUsageSummary(ctx context.Context, arg GetWorkspaceUsageSummaryParams) ([]GetWorkspaceUsageSummaryRow, error) {
	rows, err := q.db.Query(ctx, getWorkspaceUsageSummary, arg.WorkspaceID, arg.Since)
	if err != nil {
		return nil, err
	}
	defer rows.Close()
	items := []GetWorkspaceUsageSummaryRow{}
	for rows.Next() {
		var i GetWorkspaceUsageSummaryRow
		if err := rows.Scan(
			&i.Model,
			&i.TotalInputTokens,
			&i.TotalOutputTokens,
			&i.TotalCacheReadTokens,
			&i.TotalCacheWriteTokens,
			&i.TaskCount,
		); err != nil {
			return nil, err
		}
		items = append(items, i)
	}
	if err := rows.Err(); err != nil {
		return nil, err
	}
	return items, nil
}

const listDashboardAgentRunTime = `-- name: ListDashboardAgentRunTime :many
SELECT
    atq.agent_id,
    COALESCE(
        SUM(EXTRACT(EPOCH FROM (atq.completed_at - atq.started_at)))::bigint,
        0
    )::bigint AS total_seconds,
    COUNT(*)::int AS task_count,
    COUNT(*) FILTER (WHERE atq.status = 'failed')::int AS failed_count
FROM agent_task_queue atq
JOIN agent a ON a.id = atq.agent_id
LEFT JOIN issue i ON i.id = atq.issue_id
WHERE a.workspace_id = $1
  AND atq.status IN ('completed', 'failed')
  AND atq.started_at IS NOT NULL
  AND atq.completed_at IS NOT NULL
  AND atq.completed_at >= DATE_TRUNC('day', $2::timestamptz)
  AND ($3::uuid IS NULL OR i.project_id = $3)
GROUP BY atq.agent_id
ORDER BY total_seconds DESC
`

type ListDashboardAgentRunTimeParams struct {
	WorkspaceID pgtype.UUID        `json:"workspace_id"`
	Since       pgtype.Timestamptz `json:"since"`
	ProjectID   pgtype.UUID        `json:"project_id"`
}

type ListDashboardAgentRunTimeRow struct {
	AgentID      pgtype.UUID `json:"agent_id"`
	TotalSeconds int64       `json:"total_seconds"`
	TaskCount    int32       `json:"task_count"`
	FailedCount  int32       `json:"failed_count"`
}

// Per-agent total task run time and task count for the workspace, optionally
// scoped to a single project. Counts only terminal runs (completed or failed)
// with both started_at and completed_at populated; queued/running tasks have
// no finite duration. Anchored on completed_at so the window matches the
// token cost window (which is anchored on tu.created_at, ~= completion time).
func (q *Queries) ListDashboardAgentRunTime(ctx context.Context, arg ListDashboardAgentRunTimeParams) ([]ListDashboardAgentRunTimeRow, error) {
	rows, err := q.db.Query(ctx, listDashboardAgentRunTime, arg.WorkspaceID, arg.Since, arg.ProjectID)
	if err != nil {
		return nil, err
	}
	defer rows.Close()
	items := []ListDashboardAgentRunTimeRow{}
	for rows.Next() {
		var i ListDashboardAgentRunTimeRow
		if err := rows.Scan(
			&i.AgentID,
			&i.TotalSeconds,
			&i.TaskCount,
			&i.FailedCount,
		); err != nil {
			return nil, err
		}
		items = append(items, i)
	}
	if err := rows.Err(); err != nil {
		return nil, err
	}
	return items, nil
}

const listDashboardUsageByAgent = `-- name: ListDashboardUsageByAgent :many
SELECT
    atq.agent_id,
    tu.model,
    SUM(tu.input_tokens)::bigint AS input_tokens,
    SUM(tu.output_tokens)::bigint AS output_tokens,
    SUM(tu.cache_read_tokens)::bigint AS cache_read_tokens,
    SUM(tu.cache_write_tokens)::bigint AS cache_write_tokens,
    COUNT(DISTINCT tu.task_id)::int AS task_count
FROM task_usage tu
JOIN agent_task_queue atq ON atq.id = tu.task_id
JOIN agent a ON a.id = atq.agent_id
LEFT JOIN issue i ON i.id = atq.issue_id
WHERE a.workspace_id = $1
  AND tu.created_at >= DATE_TRUNC('day', $2::timestamptz)
  AND ($3::uuid IS NULL OR i.project_id = $3)
GROUP BY atq.agent_id, tu.model
ORDER BY atq.agent_id, tu.model
`

type ListDashboardUsageByAgentParams struct {
	WorkspaceID pgtype.UUID        `json:"workspace_id"`
	Since       pgtype.Timestamptz `json:"since"`
	ProjectID   pgtype.UUID        `json:"project_id"`
}

type ListDashboardUsageByAgentRow struct {
	AgentID          pgtype.UUID `json:"agent_id"`
	Model            string      `json:"model"`
	InputTokens      int64       `json:"input_tokens"`
	OutputTokens     int64       `json:"output_tokens"`
	CacheReadTokens  int64       `json:"cache_read_tokens"`
	CacheWriteTokens int64       `json:"cache_write_tokens"`
	TaskCount        int32       `json:"task_count"`
}

// Per-(agent, model) token aggregates for the workspace, optionally scoped
// to a single project. Model dimension is preserved so the client can
// compute cost from its per-model pricing table; the client folds rows by
// agent for the "by agent" list on the dashboard.
func (q *Queries) ListDashboardUsageByAgent(ctx context.Context, arg ListDashboardUsageByAgentParams) ([]ListDashboardUsageByAgentRow, error) {
	rows, err := q.db.Query(ctx, listDashboardUsageByAgent, arg.WorkspaceID, arg.Since, arg.ProjectID)
	if err != nil {
		return nil, err
	}
	defer rows.Close()
	items := []ListDashboardUsageByAgentRow{}
	for rows.Next() {
		var i ListDashboardUsageByAgentRow
		if err := rows.Scan(
			&i.AgentID,
			&i.Model,
			&i.InputTokens,
			&i.OutputTokens,
			&i.CacheReadTokens,
			&i.CacheWriteTokens,
			&i.TaskCount,
		); err != nil {
			return nil, err
		}
		items = append(items, i)
	}
	if err := rows.Err(); err != nil {
		return nil, err
	}
	return items, nil
}
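
The optional project filter `($3::uuid IS NULL OR i.project_id = $3)` has one subtlety worth spelling out: once a project is supplied, rows whose issue has no project (or tasks with no issue at all) drop out, because comparing NULL to a value is not true in SQL. A small Go model of that predicate, using the hypothetical helper `matchesProject` and nil pointers to stand for NULL:

```go
package main

import "fmt"

// matchesProject models `filter IS NULL OR rowProject = filter`.
// A nil filter means "all projects"; a nil rowProject means the row has
// no project, which can only match when no filter is set.
func matchesProject(filter, rowProject *string) bool {
	if filter == nil {
		return true
	}
	return rowProject != nil && *rowProject == *filter
}

func main() {
	p1, p2 := "p1", "p2"
	fmt.Println(matchesProject(nil, nil))  // true: unfiltered view keeps no-project rows
	fmt.Println(matchesProject(&p1, &p1))  // true
	fmt.Println(matchesProject(&p1, &p2))  // false
	fmt.Println(matchesProject(&p1, nil))  // false: no-project rows drop out when filtered
}
```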

const listDashboardUsageByAgentRollup = `-- name: ListDashboardUsageByAgentRollup :many
SELECT
    agent_id,
    model,
    SUM(input_tokens)::bigint AS input_tokens,
    SUM(output_tokens)::bigint AS output_tokens,
    SUM(cache_read_tokens)::bigint AS cache_read_tokens,
    SUM(cache_write_tokens)::bigint AS cache_write_tokens,
    SUM(task_count)::int AS task_count
FROM task_usage_dashboard_daily
WHERE workspace_id = $1
  AND bucket_date >= DATE_TRUNC('day', $2::timestamptz)::date
  AND ($3::uuid IS NULL OR project_id = $3)
GROUP BY agent_id, model
ORDER BY agent_id, model
`

type ListDashboardUsageByAgentRollupParams struct {
	WorkspaceID pgtype.UUID        `json:"workspace_id"`
	Since       pgtype.Timestamptz `json:"since"`
	ProjectID   pgtype.UUID        `json:"project_id"`
}

type ListDashboardUsageByAgentRollupRow struct {
	AgentID          pgtype.UUID `json:"agent_id"`
	Model            string      `json:"model"`
	InputTokens      int64       `json:"input_tokens"`
	OutputTokens     int64       `json:"output_tokens"`
	CacheReadTokens  int64       `json:"cache_read_tokens"`
	CacheWriteTokens int64       `json:"cache_write_tokens"`
	TaskCount        int32       `json:"task_count"`
}

// Per-(agent, model) token rollup from `task_usage_dashboard_daily`.
// task_count here is the SUM of per-bucket distinct counts; one task that
// spans multiple days lands in multiple buckets, so this can over-count
// by date. The frontend prefers `ListDashboardAgentRunTime`'s per-agent
// distinct figure for the user-facing "tasks" column, so this value is
// informational only.
func (q *Queries) ListDashboardUsageByAgentRollup(ctx context.Context, arg ListDashboardUsageByAgentRollupParams) ([]ListDashboardUsageByAgentRollupRow, error) {
	rows, err := q.db.Query(ctx, listDashboardUsageByAgentRollup, arg.WorkspaceID, arg.Since, arg.ProjectID)
	if err != nil {
		return nil, err
	}
	defer rows.Close()
	items := []ListDashboardUsageByAgentRollupRow{}
	for rows.Next() {
		var i ListDashboardUsageByAgentRollupRow
		if err := rows.Scan(
			&i.AgentID,
			&i.Model,
			&i.InputTokens,
			&i.OutputTokens,
			&i.CacheReadTokens,
			&i.CacheWriteTokens,
			&i.TaskCount,
		); err != nil {
			return nil, err
		}
		items = append(items, i)
	}
	if err := rows.Err(); err != nil {
		return nil, err
	}
	return items, nil
}
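
The over-count described in the comment on ListDashboardUsageByAgentRollup is easy to reproduce in a few lines. The sketch below uses hypothetical names (`usageEvent`, `sumOfDailyDistinct`, `distinctTasks`) and ignores the agent and model dimensions; it only shows why summing per-day distinct counts is an upper bound on the true distinct count.

```go
package main

import "fmt"

// usageEvent records that a task reported usage on a given day.
type usageEvent struct {
	Day    string
	TaskID string
}

// sumOfDailyDistinct counts distinct tasks within each day, then sums the
// per-day counts, which is what SUM(task_count) over daily buckets yields.
func sumOfDailyDistinct(events []usageEvent) int {
	perDay := make(map[string]map[string]struct{})
	for _, e := range events {
		if perDay[e.Day] == nil {
			perDay[e.Day] = make(map[string]struct{})
		}
		perDay[e.Day][e.TaskID] = struct{}{}
	}
	total := 0
	for _, tasks := range perDay {
		total += len(tasks)
	}
	return total
}

// distinctTasks counts distinct tasks across the whole window.
func distinctTasks(events []usageEvent) int {
	seen := make(map[string]struct{})
	for _, e := range events {
		seen[e.TaskID] = struct{}{}
	}
	return len(seen)
}

func main() {
	events := []usageEvent{
		{Day: "2026-05-12", TaskID: "t1"},
		{Day: "2026-05-13", TaskID: "t1"}, // same task, next day
		{Day: "2026-05-13", TaskID: "t2"},
	}
	fmt.Println(sumOfDailyDistinct(events)) // 3
	fmt.Println(distinctTasks(events))      // 2
}
```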

const listDashboardUsageDaily = `-- name: ListDashboardUsageDaily :many
SELECT
    DATE(tu.created_at) AS date,
    tu.model,
    SUM(tu.input_tokens)::bigint AS input_tokens,
    SUM(tu.output_tokens)::bigint AS output_tokens,
    SUM(tu.cache_read_tokens)::bigint AS cache_read_tokens,
    SUM(tu.cache_write_tokens)::bigint AS cache_write_tokens,
    COUNT(DISTINCT tu.task_id)::int AS task_count
FROM task_usage tu
JOIN agent_task_queue atq ON atq.id = tu.task_id
JOIN agent a ON a.id = atq.agent_id
LEFT JOIN issue i ON i.id = atq.issue_id
WHERE a.workspace_id = $1
  AND tu.created_at >= DATE_TRUNC('day', $2::timestamptz)
  AND ($3::uuid IS NULL OR i.project_id = $3)
GROUP BY DATE(tu.created_at), tu.model
ORDER BY DATE(tu.created_at) DESC, tu.model
`

type ListDashboardUsageDailyParams struct {
	WorkspaceID pgtype.UUID        `json:"workspace_id"`
	Since       pgtype.Timestamptz `json:"since"`
	ProjectID   pgtype.UUID        `json:"project_id"`
}

type ListDashboardUsageDailyRow struct {
	Date             pgtype.Date `json:"date"`
	Model            string      `json:"model"`
	InputTokens      int64       `json:"input_tokens"`
	OutputTokens     int64       `json:"output_tokens"`
	CacheReadTokens  int64       `json:"cache_read_tokens"`
	CacheWriteTokens int64       `json:"cache_write_tokens"`
	TaskCount        int32       `json:"task_count"`
}

// Daily per-(date, model) token aggregates for the workspace, optionally
// scoped to a single project via sqlc.narg('project_id'). Bucketed by
// tu.created_at (token-production time) to match GetWorkspaceUsageByDay,
// so a task that queues one day and finishes the next is attributed to
// the day the tokens actually landed. Powers the workspace dashboard's
// daily cost chart.
func (q *Queries) ListDashboardUsageDaily(ctx context.Context, arg ListDashboardUsageDailyParams) ([]ListDashboardUsageDailyRow, error) {
	rows, err := q.db.Query(ctx, listDashboardUsageDaily, arg.WorkspaceID, arg.Since, arg.ProjectID)
	if err != nil {
		return nil, err
	}
	defer rows.Close()
	items := []ListDashboardUsageDailyRow{}
	for rows.Next() {
		var i ListDashboardUsageDailyRow
		if err := rows.Scan(
			&i.Date,
			&i.Model,
			&i.InputTokens,
			&i.OutputTokens,
			&i.CacheReadTokens,
			&i.CacheWriteTokens,
			&i.TaskCount,
		); err != nil {
			return nil, err
		}
		items = append(items, i)
	}
	if err := rows.Err(); err != nil {
		return nil, err
	}
	return items, nil
}

const listDashboardUsageDailyRollup = `-- name: ListDashboardUsageDailyRollup :many
SELECT
    bucket_date AS date,
    model,
    SUM(input_tokens)::bigint AS input_tokens,
    SUM(output_tokens)::bigint AS output_tokens,
    SUM(cache_read_tokens)::bigint AS cache_read_tokens,
    SUM(cache_write_tokens)::bigint AS cache_write_tokens,
    SUM(task_count)::int AS task_count
FROM task_usage_dashboard_daily
WHERE workspace_id = $1
  AND bucket_date >= DATE_TRUNC('day', $2::timestamptz)::date
  AND ($3::uuid IS NULL OR project_id = $3)
GROUP BY bucket_date, model
ORDER BY bucket_date DESC, model
`

type ListDashboardUsageDailyRollupParams struct {
	WorkspaceID pgtype.UUID        `json:"workspace_id"`
	Since       pgtype.Timestamptz `json:"since"`
	ProjectID   pgtype.UUID        `json:"project_id"`
}

type ListDashboardUsageDailyRollupRow struct {
	Date             pgtype.Date `json:"date"`
	Model            string      `json:"model"`
	InputTokens      int64       `json:"input_tokens"`
	OutputTokens     int64       `json:"output_tokens"`
	CacheReadTokens  int64       `json:"cache_read_tokens"`
	CacheWriteTokens int64       `json:"cache_write_tokens"`
	TaskCount        int32       `json:"task_count"`
}

// Daily token rollup, served from `task_usage_dashboard_daily` (migration
// 084). Same wire shape as ListDashboardUsageDaily so the handler can
// swap them on the `UseDailyRollupForDashboard` flag with no other
// changes. The rollup is up to ~10 min stale (5 min cron + 5 min lag),
// which is fine for a dashboard read path.
func (q *Queries) ListDashboardUsageDailyRollup(ctx context.Context, arg ListDashboardUsageDailyRollupParams) ([]ListDashboardUsageDailyRollupRow, error) {
	rows, err := q.db.Query(ctx, listDashboardUsageDailyRollup, arg.WorkspaceID, arg.Since, arg.ProjectID)
	if err != nil {
		return nil, err
	}
	defer rows.Close()
	items := []ListDashboardUsageDailyRollupRow{}
	for rows.Next() {
		var i ListDashboardUsageDailyRollupRow
		if err := rows.Scan(
			&i.Date,
			&i.Model,
			&i.InputTokens,
			&i.OutputTokens,
			&i.CacheReadTokens,
			&i.CacheWriteTokens,
			&i.TaskCount,
		); err != nil {
			return nil, err
		}
		items = append(items, i)
	}
	if err := rows.Err(); err != nil {
		return nil, err
	}
	return items, nil
}

const upsertTaskUsage = `-- name: UpsertTaskUsage :exec
INSERT INTO task_usage (task_id, provider, model, input_tokens, output_tokens, cache_read_tokens, cache_write_tokens, updated_at)
VALUES ($1, $2, $3, $4, $5, $6, $7, now())
ON CONFLICT (task_id, provider, model)
DO UPDATE SET
    input_tokens = EXCLUDED.input_tokens,
    output_tokens = EXCLUDED.output_tokens,
    cache_read_tokens = EXCLUDED.cache_read_tokens,
    cache_write_tokens = EXCLUDED.cache_write_tokens,
    updated_at = now()
`

type UpsertTaskUsageParams struct {
	TaskID           pgtype.UUID `json:"task_id"`
	Provider         string      `json:"provider"`
	Model            string      `json:"model"`
	InputTokens      int64       `json:"input_tokens"`
	OutputTokens     int64       `json:"output_tokens"`
	CacheReadTokens  int64       `json:"cache_read_tokens"`
	CacheWriteTokens int64       `json:"cache_write_tokens"`
}

// Bumps `updated_at` on INSERT and on conflict so the daily-rollup worker
// (migration 073) detects the row as dirty and re-aggregates its bucket.
// Without the conflict-side bump, a correction to historical token counts
// would never propagate to the rollup.
func (q *Queries) UpsertTaskUsage(ctx context.Context, arg UpsertTaskUsageParams) error {
	_, err := q.db.Exec(ctx, upsertTaskUsage,
		arg.TaskID,
		arg.Provider,
		arg.Model,
		arg.InputTokens,
		arg.OutputTokens,
		arg.CacheReadTokens,
		arg.CacheWriteTokens,
	)
	return err
}