mirror of
https://github.com/multica-ai/multica.git
synced 2026-07-05 21:39:54 +02:00
* feat(dashboard): workspace/project token + run-time dashboard
Add a `/{slug}/dashboard` page showing per-agent token spend and execution
time across the whole workspace, with an optional project filter.
Backend:
- Three new sqlc queries against task_usage + agent_task_queue: daily
usage, per-agent usage, per-agent total run-time. All optionally
scoped to a project via sqlc.narg('project_id'), reaching project
through the issue join.
- Handlers under /api/dashboard return the same wire shape the runtime
page already consumes (model preserved for client-side cost math).
Frontend: - Shared DashboardPage in packages/views/dashboard reusing KpiCard,
DailyCostChart, ActorAvatar, and estimateCost from the runtime page
so the visual style and pricing math stay in lock-step.
- Period selector (7/30/90d), project dropdown, four KPI tiles
(cost, tokens, run time, tasks), daily cost chart, and a combined
"cost + run time by agent" list.
- Routed in both web (app/[slug]/(dashboard)/dashboard) and desktop
(memory router); sidebar nav entry added under Workspace group.
Co-authored-by: multica-agent <github@multica.ai>
* fix(dashboard): drop stale project filter and stop double-counting tasks
Two issues caught in PR #2462 review:
1. Project filter held the previous selection's UUID across workspace
switches and project deletions: the dropdown gracefully showed
"All projects" (because the title lookup missed) while the three
dashboard queries kept forwarding the dead UUID, leaving the UI
looking like a full-workspace view but populated with empty
project-scoped data. Validate the picked UUID against the current
projects list before passing it to the queries.
2. The "by agent" table read its task count from the token rollup,
which is grouped per (agent, model). A single task that spans two
models lands twice and the agent's row reads e.g. "2 tasks" when
the real count is 1. Prefer `ListDashboardAgentRunTime`'s per-agent
distinct count when available; fall back to the token aggregate
only for agents with no terminal run yet (in-flight tasks).
Extract the merge into `mergeAgentDashboardRows` so the precedence
rules are unit-tested directly.
Co-authored-by: multica-agent <github@multica.ai>
* test(dashboard): allocate per-workspace issue.number explicitly
TestDashboardEndpoints creates two issues in the shared fixture
workspace. issue.number defaults to 0 (migration 020), and the table
carries UNIQUE (workspace_id, number), so the second insert raced the
first on the same default and failed in CI.
Allocate MAX(number) + 1 per insert so each row gets a fresh number
without stepping on rows other tests left behind in the same workspace.
Co-authored-by: multica-agent <github@multica.ai>
* feat(dashboard): rollup table + cron-driven aggregation for dashboard
Mirror the per-runtime rollup in `task_usage_daily` (migrations 073/077/082)
to remove the per-request raw aggregation the dashboard was doing.
Migration 084 adds:
- `task_usage_dashboard_daily` keyed on
(bucket_date, workspace_id, agent_id, project_id, model) — the
dimensions the dashboard actually queries, with project_id nullable
via UNIQUE NULLS NOT DISTINCT (PG15+) so "no-project" buckets
upsert cleanly.
- `task_usage_dashboard_rollup_state` watermark table.
- `task_usage_dashboard_dirty` invalidation queue.
- Triggers on agent_task_queue DELETE, task_usage DELETE, and
issue.project_id UPDATE — the cases the updated_at watermark can't
see. The project_id trigger re-attributes existing rollup rows when
a user moves an issue across projects.
- `rollup_task_usage_dashboard_daily_window(from, to)` —
idempotent recompute primitive (same shape as 077).
- `rollup_task_usage_dashboard_daily()` cron entry — own advisory
lock (4244) so it serialises independently of the runtime rollup.
- `task_usage_dashboard_rollup_lag_seconds()` health helper.
Sqlc queries `ListDashboardUsageDailyRollup` /
`ListDashboardUsageByAgentRollup` read from the new table; the handler
dispatches between rollup and raw on a separate
`UseDailyRollupForDashboard` config flag
(`USAGE_DASHBOARD_ROLLUP_ENABLED` env). Same fail-safe default (false →
raw) so operators can roll out independently of the per-runtime flag.
Bucket date is UTC (the dashboard aggregates across runtimes that may
sit in different tzs; there's no single correct local boundary).
Adds `cmd/backfill_task_usage_dashboard_daily` mirroring the existing
per-runtime backfill — operator runs it once before flipping the flag.
Tests: - TestDashboardEndpoints now also exercises the rollup read path
(raw vs. rollup, same project-scoped totals).
- TestDashboardRollupReattributesOnProjectChange verifies the
issue.project_id trigger enqueues both old + new buckets and the
next rollup tick zeroes the old project + populates the new one.
Co-authored-by: multica-agent <github@multica.ai>
* fix(dashboard-rollup): close two invalidation gaps
Two leak paths missed by migration 084 review:
1. Issue cascade DELETE — the atq BEFORE DELETE trigger runs AFTER the
issue row is gone, so `LEFT JOIN issue` returns NULL project_id and
the original-project bucket never gets cleared (issue 077 calls this
out for the runtime rollup but didn't need to act on it). Adds an
`issue BEFORE DELETE` trigger that enqueues using OLD.project_id
while the issue row is still readable.
2. `LinkTaskToIssue` (quick-create task attaching to a real issue post-
completion) UPDATEs `agent_task_queue.issue_id` from NULL to a real
id. Migration 084 only watched DELETE on atq, so usage already
rolled up under the no-project bucket stayed attributed to NULL
forever. Extends the atq trigger to fire on UPDATE OF issue_id too,
enqueueing both OLD (NULL project) and NEW (linked issue's project).
Tests: - TestDashboardRollupClearsOnIssueDelete asserts rollup row drops to
zero after issue delete + rollup tick.
- TestDashboardRollupReattributesOnLinkTaskToIssue verifies tokens
move from the NULL bucket to the project bucket after the UPDATE.
Co-authored-by: multica-agent <github@multica.ai>
---------
Co-authored-by: multica-agent <github@multica.ai>
583 lines
19 KiB
Go
583 lines
19 KiB
Go
// Code generated by sqlc. DO NOT EDIT.
|
|
// versions:
|
|
// sqlc v1.30.0
|
|
// source: task_usage.sql
|
|
|
|
package db
|
|
|
|
import (
|
|
"context"
|
|
|
|
"github.com/jackc/pgx/v5/pgtype"
|
|
)
|
|
|
|
const getIssueUsageSummary = `-- name: GetIssueUsageSummary :one
|
|
SELECT
|
|
COALESCE(SUM(tu.input_tokens), 0)::bigint AS total_input_tokens,
|
|
COALESCE(SUM(tu.output_tokens), 0)::bigint AS total_output_tokens,
|
|
COALESCE(SUM(tu.cache_read_tokens), 0)::bigint AS total_cache_read_tokens,
|
|
COALESCE(SUM(tu.cache_write_tokens), 0)::bigint AS total_cache_write_tokens,
|
|
COUNT(DISTINCT tu.task_id)::int AS task_count
|
|
FROM task_usage tu
|
|
JOIN agent_task_queue atq ON atq.id = tu.task_id
|
|
WHERE atq.issue_id = $1
|
|
`
|
|
|
|
type GetIssueUsageSummaryRow struct {
|
|
TotalInputTokens int64 `json:"total_input_tokens"`
|
|
TotalOutputTokens int64 `json:"total_output_tokens"`
|
|
TotalCacheReadTokens int64 `json:"total_cache_read_tokens"`
|
|
TotalCacheWriteTokens int64 `json:"total_cache_write_tokens"`
|
|
TaskCount int32 `json:"task_count"`
|
|
}
|
|
|
|
func (q *Queries) GetIssueUsageSummary(ctx context.Context, issueID pgtype.UUID) (GetIssueUsageSummaryRow, error) {
|
|
row := q.db.QueryRow(ctx, getIssueUsageSummary, issueID)
|
|
var i GetIssueUsageSummaryRow
|
|
err := row.Scan(
|
|
&i.TotalInputTokens,
|
|
&i.TotalOutputTokens,
|
|
&i.TotalCacheReadTokens,
|
|
&i.TotalCacheWriteTokens,
|
|
&i.TaskCount,
|
|
)
|
|
return i, err
|
|
}
|
|
|
|
const getTaskUsage = `-- name: GetTaskUsage :many
|
|
SELECT id, task_id, provider, model, input_tokens, output_tokens, cache_read_tokens, cache_write_tokens, created_at, updated_at FROM task_usage
|
|
WHERE task_id = $1
|
|
ORDER BY model
|
|
`
|
|
|
|
func (q *Queries) GetTaskUsage(ctx context.Context, taskID pgtype.UUID) ([]TaskUsage, error) {
|
|
rows, err := q.db.Query(ctx, getTaskUsage, taskID)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer rows.Close()
|
|
items := []TaskUsage{}
|
|
for rows.Next() {
|
|
var i TaskUsage
|
|
if err := rows.Scan(
|
|
&i.ID,
|
|
&i.TaskID,
|
|
&i.Provider,
|
|
&i.Model,
|
|
&i.InputTokens,
|
|
&i.OutputTokens,
|
|
&i.CacheReadTokens,
|
|
&i.CacheWriteTokens,
|
|
&i.CreatedAt,
|
|
&i.UpdatedAt,
|
|
); err != nil {
|
|
return nil, err
|
|
}
|
|
items = append(items, i)
|
|
}
|
|
if err := rows.Err(); err != nil {
|
|
return nil, err
|
|
}
|
|
return items, nil
|
|
}
|
|
|
|
const getWorkspaceUsageByDay = `-- name: GetWorkspaceUsageByDay :many
|
|
SELECT
|
|
DATE(tu.created_at) AS date,
|
|
tu.model,
|
|
SUM(tu.input_tokens)::bigint AS total_input_tokens,
|
|
SUM(tu.output_tokens)::bigint AS total_output_tokens,
|
|
SUM(tu.cache_read_tokens)::bigint AS total_cache_read_tokens,
|
|
SUM(tu.cache_write_tokens)::bigint AS total_cache_write_tokens,
|
|
COUNT(DISTINCT tu.task_id)::int AS task_count
|
|
FROM task_usage tu
|
|
JOIN agent_task_queue atq ON atq.id = tu.task_id
|
|
JOIN agent a ON a.id = atq.agent_id
|
|
WHERE a.workspace_id = $1
|
|
AND tu.created_at >= DATE_TRUNC('day', $2::timestamptz)
|
|
GROUP BY DATE(tu.created_at), tu.model
|
|
ORDER BY DATE(tu.created_at) DESC, tu.model
|
|
`
|
|
|
|
type GetWorkspaceUsageByDayParams struct {
|
|
WorkspaceID pgtype.UUID `json:"workspace_id"`
|
|
Since pgtype.Timestamptz `json:"since"`
|
|
}
|
|
|
|
type GetWorkspaceUsageByDayRow struct {
|
|
Date pgtype.Date `json:"date"`
|
|
Model string `json:"model"`
|
|
TotalInputTokens int64 `json:"total_input_tokens"`
|
|
TotalOutputTokens int64 `json:"total_output_tokens"`
|
|
TotalCacheReadTokens int64 `json:"total_cache_read_tokens"`
|
|
TotalCacheWriteTokens int64 `json:"total_cache_write_tokens"`
|
|
TaskCount int32 `json:"task_count"`
|
|
}
|
|
|
|
// Bucket by tu.created_at (usage report time, ~= task completion time), not
|
|
// atq.created_at (task enqueue time), so tasks that queue one day and execute
|
|
// the next are attributed to the day tokens were actually produced. The since
|
|
// cutoff is truncated to start-of-day so `days=N` yields full calendar days.
|
|
func (q *Queries) GetWorkspaceUsageByDay(ctx context.Context, arg GetWorkspaceUsageByDayParams) ([]GetWorkspaceUsageByDayRow, error) {
|
|
rows, err := q.db.Query(ctx, getWorkspaceUsageByDay, arg.WorkspaceID, arg.Since)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer rows.Close()
|
|
items := []GetWorkspaceUsageByDayRow{}
|
|
for rows.Next() {
|
|
var i GetWorkspaceUsageByDayRow
|
|
if err := rows.Scan(
|
|
&i.Date,
|
|
&i.Model,
|
|
&i.TotalInputTokens,
|
|
&i.TotalOutputTokens,
|
|
&i.TotalCacheReadTokens,
|
|
&i.TotalCacheWriteTokens,
|
|
&i.TaskCount,
|
|
); err != nil {
|
|
return nil, err
|
|
}
|
|
items = append(items, i)
|
|
}
|
|
if err := rows.Err(); err != nil {
|
|
return nil, err
|
|
}
|
|
return items, nil
|
|
}
|
|
|
|
const getWorkspaceUsageSummary = `-- name: GetWorkspaceUsageSummary :many
|
|
SELECT
|
|
tu.model,
|
|
SUM(tu.input_tokens)::bigint AS total_input_tokens,
|
|
SUM(tu.output_tokens)::bigint AS total_output_tokens,
|
|
SUM(tu.cache_read_tokens)::bigint AS total_cache_read_tokens,
|
|
SUM(tu.cache_write_tokens)::bigint AS total_cache_write_tokens,
|
|
COUNT(DISTINCT tu.task_id)::int AS task_count
|
|
FROM task_usage tu
|
|
JOIN agent_task_queue atq ON atq.id = tu.task_id
|
|
JOIN agent a ON a.id = atq.agent_id
|
|
WHERE a.workspace_id = $1
|
|
AND tu.created_at >= DATE_TRUNC('day', $2::timestamptz)
|
|
GROUP BY tu.model
|
|
ORDER BY (SUM(tu.input_tokens) + SUM(tu.output_tokens)) DESC
|
|
`
|
|
|
|
type GetWorkspaceUsageSummaryParams struct {
|
|
WorkspaceID pgtype.UUID `json:"workspace_id"`
|
|
Since pgtype.Timestamptz `json:"since"`
|
|
}
|
|
|
|
type GetWorkspaceUsageSummaryRow struct {
|
|
Model string `json:"model"`
|
|
TotalInputTokens int64 `json:"total_input_tokens"`
|
|
TotalOutputTokens int64 `json:"total_output_tokens"`
|
|
TotalCacheReadTokens int64 `json:"total_cache_read_tokens"`
|
|
TotalCacheWriteTokens int64 `json:"total_cache_write_tokens"`
|
|
TaskCount int32 `json:"task_count"`
|
|
}
|
|
|
|
// Filter by tu.created_at (usage report time), aligned to start-of-day, so
|
|
// `days=N` is interpreted as N full calendar days like the other usage queries.
|
|
func (q *Queries) GetWorkspaceUsageSummary(ctx context.Context, arg GetWorkspaceUsageSummaryParams) ([]GetWorkspaceUsageSummaryRow, error) {
|
|
rows, err := q.db.Query(ctx, getWorkspaceUsageSummary, arg.WorkspaceID, arg.Since)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer rows.Close()
|
|
items := []GetWorkspaceUsageSummaryRow{}
|
|
for rows.Next() {
|
|
var i GetWorkspaceUsageSummaryRow
|
|
if err := rows.Scan(
|
|
&i.Model,
|
|
&i.TotalInputTokens,
|
|
&i.TotalOutputTokens,
|
|
&i.TotalCacheReadTokens,
|
|
&i.TotalCacheWriteTokens,
|
|
&i.TaskCount,
|
|
); err != nil {
|
|
return nil, err
|
|
}
|
|
items = append(items, i)
|
|
}
|
|
if err := rows.Err(); err != nil {
|
|
return nil, err
|
|
}
|
|
return items, nil
|
|
}
|
|
|
|
const listDashboardAgentRunTime = `-- name: ListDashboardAgentRunTime :many
|
|
SELECT
|
|
atq.agent_id,
|
|
COALESCE(
|
|
SUM(EXTRACT(EPOCH FROM (atq.completed_at - atq.started_at)))::bigint,
|
|
0
|
|
)::bigint AS total_seconds,
|
|
COUNT(*)::int AS task_count,
|
|
COUNT(*) FILTER (WHERE atq.status = 'failed')::int AS failed_count
|
|
FROM agent_task_queue atq
|
|
JOIN agent a ON a.id = atq.agent_id
|
|
LEFT JOIN issue i ON i.id = atq.issue_id
|
|
WHERE a.workspace_id = $1
|
|
AND atq.status IN ('completed', 'failed')
|
|
AND atq.started_at IS NOT NULL
|
|
AND atq.completed_at IS NOT NULL
|
|
AND atq.completed_at >= DATE_TRUNC('day', $2::timestamptz)
|
|
AND ($3::uuid IS NULL OR i.project_id = $3)
|
|
GROUP BY atq.agent_id
|
|
ORDER BY total_seconds DESC
|
|
`
|
|
|
|
type ListDashboardAgentRunTimeParams struct {
|
|
WorkspaceID pgtype.UUID `json:"workspace_id"`
|
|
Since pgtype.Timestamptz `json:"since"`
|
|
ProjectID pgtype.UUID `json:"project_id"`
|
|
}
|
|
|
|
type ListDashboardAgentRunTimeRow struct {
|
|
AgentID pgtype.UUID `json:"agent_id"`
|
|
TotalSeconds int64 `json:"total_seconds"`
|
|
TaskCount int32 `json:"task_count"`
|
|
FailedCount int32 `json:"failed_count"`
|
|
}
|
|
|
|
// Per-agent total task run time and task count for the workspace, optionally
|
|
// scoped to a single project. Counts only terminal runs (completed or failed)
|
|
// with both started_at and completed_at populated — queued/running tasks have
|
|
// no finite duration. Anchored on completed_at so the window matches the
|
|
// token cost window (which is anchored on tu.created_at, ~= completion time).
|
|
func (q *Queries) ListDashboardAgentRunTime(ctx context.Context, arg ListDashboardAgentRunTimeParams) ([]ListDashboardAgentRunTimeRow, error) {
|
|
rows, err := q.db.Query(ctx, listDashboardAgentRunTime, arg.WorkspaceID, arg.Since, arg.ProjectID)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer rows.Close()
|
|
items := []ListDashboardAgentRunTimeRow{}
|
|
for rows.Next() {
|
|
var i ListDashboardAgentRunTimeRow
|
|
if err := rows.Scan(
|
|
&i.AgentID,
|
|
&i.TotalSeconds,
|
|
&i.TaskCount,
|
|
&i.FailedCount,
|
|
); err != nil {
|
|
return nil, err
|
|
}
|
|
items = append(items, i)
|
|
}
|
|
if err := rows.Err(); err != nil {
|
|
return nil, err
|
|
}
|
|
return items, nil
|
|
}
|
|
|
|
const listDashboardUsageByAgent = `-- name: ListDashboardUsageByAgent :many
|
|
SELECT
|
|
atq.agent_id,
|
|
tu.model,
|
|
SUM(tu.input_tokens)::bigint AS input_tokens,
|
|
SUM(tu.output_tokens)::bigint AS output_tokens,
|
|
SUM(tu.cache_read_tokens)::bigint AS cache_read_tokens,
|
|
SUM(tu.cache_write_tokens)::bigint AS cache_write_tokens,
|
|
COUNT(DISTINCT tu.task_id)::int AS task_count
|
|
FROM task_usage tu
|
|
JOIN agent_task_queue atq ON atq.id = tu.task_id
|
|
JOIN agent a ON a.id = atq.agent_id
|
|
LEFT JOIN issue i ON i.id = atq.issue_id
|
|
WHERE a.workspace_id = $1
|
|
AND tu.created_at >= DATE_TRUNC('day', $2::timestamptz)
|
|
AND ($3::uuid IS NULL OR i.project_id = $3)
|
|
GROUP BY atq.agent_id, tu.model
|
|
ORDER BY atq.agent_id, tu.model
|
|
`
|
|
|
|
type ListDashboardUsageByAgentParams struct {
|
|
WorkspaceID pgtype.UUID `json:"workspace_id"`
|
|
Since pgtype.Timestamptz `json:"since"`
|
|
ProjectID pgtype.UUID `json:"project_id"`
|
|
}
|
|
|
|
type ListDashboardUsageByAgentRow struct {
|
|
AgentID pgtype.UUID `json:"agent_id"`
|
|
Model string `json:"model"`
|
|
InputTokens int64 `json:"input_tokens"`
|
|
OutputTokens int64 `json:"output_tokens"`
|
|
CacheReadTokens int64 `json:"cache_read_tokens"`
|
|
CacheWriteTokens int64 `json:"cache_write_tokens"`
|
|
TaskCount int32 `json:"task_count"`
|
|
}
|
|
|
|
// Per-(agent, model) token aggregates for the workspace, optionally scoped
|
|
// to a single project. Model dimension is preserved so the client can
|
|
// compute cost from its per-model pricing table; the client folds rows by
|
|
// agent for the "by agent" list on the dashboard.
|
|
func (q *Queries) ListDashboardUsageByAgent(ctx context.Context, arg ListDashboardUsageByAgentParams) ([]ListDashboardUsageByAgentRow, error) {
|
|
rows, err := q.db.Query(ctx, listDashboardUsageByAgent, arg.WorkspaceID, arg.Since, arg.ProjectID)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer rows.Close()
|
|
items := []ListDashboardUsageByAgentRow{}
|
|
for rows.Next() {
|
|
var i ListDashboardUsageByAgentRow
|
|
if err := rows.Scan(
|
|
&i.AgentID,
|
|
&i.Model,
|
|
&i.InputTokens,
|
|
&i.OutputTokens,
|
|
&i.CacheReadTokens,
|
|
&i.CacheWriteTokens,
|
|
&i.TaskCount,
|
|
); err != nil {
|
|
return nil, err
|
|
}
|
|
items = append(items, i)
|
|
}
|
|
if err := rows.Err(); err != nil {
|
|
return nil, err
|
|
}
|
|
return items, nil
|
|
}
|
|
|
|
const listDashboardUsageByAgentRollup = `-- name: ListDashboardUsageByAgentRollup :many
|
|
SELECT
|
|
agent_id,
|
|
model,
|
|
SUM(input_tokens)::bigint AS input_tokens,
|
|
SUM(output_tokens)::bigint AS output_tokens,
|
|
SUM(cache_read_tokens)::bigint AS cache_read_tokens,
|
|
SUM(cache_write_tokens)::bigint AS cache_write_tokens,
|
|
SUM(task_count)::int AS task_count
|
|
FROM task_usage_dashboard_daily
|
|
WHERE workspace_id = $1
|
|
AND bucket_date >= DATE_TRUNC('day', $2::timestamptz)::date
|
|
AND ($3::uuid IS NULL OR project_id = $3)
|
|
GROUP BY agent_id, model
|
|
ORDER BY agent_id, model
|
|
`
|
|
|
|
type ListDashboardUsageByAgentRollupParams struct {
|
|
WorkspaceID pgtype.UUID `json:"workspace_id"`
|
|
Since pgtype.Timestamptz `json:"since"`
|
|
ProjectID pgtype.UUID `json:"project_id"`
|
|
}
|
|
|
|
type ListDashboardUsageByAgentRollupRow struct {
|
|
AgentID pgtype.UUID `json:"agent_id"`
|
|
Model string `json:"model"`
|
|
InputTokens int64 `json:"input_tokens"`
|
|
OutputTokens int64 `json:"output_tokens"`
|
|
CacheReadTokens int64 `json:"cache_read_tokens"`
|
|
CacheWriteTokens int64 `json:"cache_write_tokens"`
|
|
TaskCount int32 `json:"task_count"`
|
|
}
|
|
|
|
// Per-(agent, model) token rollup from `task_usage_dashboard_daily`.
|
|
// task_count here is the SUM of per-bucket distinct counts; one task that
|
|
// spans multiple days lands in multiple buckets, so this can over-count
|
|
// by date. The frontend prefers `ListDashboardAgentRunTime`'s per-agent
|
|
// distinct figure for the user-facing "tasks" column, so this value is
|
|
// informational only.
|
|
func (q *Queries) ListDashboardUsageByAgentRollup(ctx context.Context, arg ListDashboardUsageByAgentRollupParams) ([]ListDashboardUsageByAgentRollupRow, error) {
|
|
rows, err := q.db.Query(ctx, listDashboardUsageByAgentRollup, arg.WorkspaceID, arg.Since, arg.ProjectID)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer rows.Close()
|
|
items := []ListDashboardUsageByAgentRollupRow{}
|
|
for rows.Next() {
|
|
var i ListDashboardUsageByAgentRollupRow
|
|
if err := rows.Scan(
|
|
&i.AgentID,
|
|
&i.Model,
|
|
&i.InputTokens,
|
|
&i.OutputTokens,
|
|
&i.CacheReadTokens,
|
|
&i.CacheWriteTokens,
|
|
&i.TaskCount,
|
|
); err != nil {
|
|
return nil, err
|
|
}
|
|
items = append(items, i)
|
|
}
|
|
if err := rows.Err(); err != nil {
|
|
return nil, err
|
|
}
|
|
return items, nil
|
|
}
|
|
|
|
const listDashboardUsageDaily = `-- name: ListDashboardUsageDaily :many
|
|
SELECT
|
|
DATE(tu.created_at) AS date,
|
|
tu.model,
|
|
SUM(tu.input_tokens)::bigint AS input_tokens,
|
|
SUM(tu.output_tokens)::bigint AS output_tokens,
|
|
SUM(tu.cache_read_tokens)::bigint AS cache_read_tokens,
|
|
SUM(tu.cache_write_tokens)::bigint AS cache_write_tokens,
|
|
COUNT(DISTINCT tu.task_id)::int AS task_count
|
|
FROM task_usage tu
|
|
JOIN agent_task_queue atq ON atq.id = tu.task_id
|
|
JOIN agent a ON a.id = atq.agent_id
|
|
LEFT JOIN issue i ON i.id = atq.issue_id
|
|
WHERE a.workspace_id = $1
|
|
AND tu.created_at >= DATE_TRUNC('day', $2::timestamptz)
|
|
AND ($3::uuid IS NULL OR i.project_id = $3)
|
|
GROUP BY DATE(tu.created_at), tu.model
|
|
ORDER BY DATE(tu.created_at) DESC, tu.model
|
|
`
|
|
|
|
type ListDashboardUsageDailyParams struct {
|
|
WorkspaceID pgtype.UUID `json:"workspace_id"`
|
|
Since pgtype.Timestamptz `json:"since"`
|
|
ProjectID pgtype.UUID `json:"project_id"`
|
|
}
|
|
|
|
type ListDashboardUsageDailyRow struct {
|
|
Date pgtype.Date `json:"date"`
|
|
Model string `json:"model"`
|
|
InputTokens int64 `json:"input_tokens"`
|
|
OutputTokens int64 `json:"output_tokens"`
|
|
CacheReadTokens int64 `json:"cache_read_tokens"`
|
|
CacheWriteTokens int64 `json:"cache_write_tokens"`
|
|
TaskCount int32 `json:"task_count"`
|
|
}
|
|
|
|
// Daily per-(date, model) token aggregates for the workspace, optionally
|
|
// scoped to a single project via sqlc.narg('project_id'). Bucketed by
|
|
// tu.created_at (token-production time) to match GetWorkspaceUsageByDay,
|
|
// so a task that queues one day and finishes the next is attributed to
|
|
// the day the tokens actually landed. Powers the workspace dashboard's
|
|
// daily cost chart.
|
|
func (q *Queries) ListDashboardUsageDaily(ctx context.Context, arg ListDashboardUsageDailyParams) ([]ListDashboardUsageDailyRow, error) {
|
|
rows, err := q.db.Query(ctx, listDashboardUsageDaily, arg.WorkspaceID, arg.Since, arg.ProjectID)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer rows.Close()
|
|
items := []ListDashboardUsageDailyRow{}
|
|
for rows.Next() {
|
|
var i ListDashboardUsageDailyRow
|
|
if err := rows.Scan(
|
|
&i.Date,
|
|
&i.Model,
|
|
&i.InputTokens,
|
|
&i.OutputTokens,
|
|
&i.CacheReadTokens,
|
|
&i.CacheWriteTokens,
|
|
&i.TaskCount,
|
|
); err != nil {
|
|
return nil, err
|
|
}
|
|
items = append(items, i)
|
|
}
|
|
if err := rows.Err(); err != nil {
|
|
return nil, err
|
|
}
|
|
return items, nil
|
|
}
|
|
|
|
const listDashboardUsageDailyRollup = `-- name: ListDashboardUsageDailyRollup :many
|
|
SELECT
|
|
bucket_date AS date,
|
|
model,
|
|
SUM(input_tokens)::bigint AS input_tokens,
|
|
SUM(output_tokens)::bigint AS output_tokens,
|
|
SUM(cache_read_tokens)::bigint AS cache_read_tokens,
|
|
SUM(cache_write_tokens)::bigint AS cache_write_tokens,
|
|
SUM(task_count)::int AS task_count
|
|
FROM task_usage_dashboard_daily
|
|
WHERE workspace_id = $1
|
|
AND bucket_date >= DATE_TRUNC('day', $2::timestamptz)::date
|
|
AND ($3::uuid IS NULL OR project_id = $3)
|
|
GROUP BY bucket_date, model
|
|
ORDER BY bucket_date DESC, model
|
|
`
|
|
|
|
type ListDashboardUsageDailyRollupParams struct {
|
|
WorkspaceID pgtype.UUID `json:"workspace_id"`
|
|
Since pgtype.Timestamptz `json:"since"`
|
|
ProjectID pgtype.UUID `json:"project_id"`
|
|
}
|
|
|
|
type ListDashboardUsageDailyRollupRow struct {
|
|
Date pgtype.Date `json:"date"`
|
|
Model string `json:"model"`
|
|
InputTokens int64 `json:"input_tokens"`
|
|
OutputTokens int64 `json:"output_tokens"`
|
|
CacheReadTokens int64 `json:"cache_read_tokens"`
|
|
CacheWriteTokens int64 `json:"cache_write_tokens"`
|
|
TaskCount int32 `json:"task_count"`
|
|
}
|
|
|
|
// Daily token rollup, served from `task_usage_dashboard_daily` (migration
|
|
// 084). Same wire shape as ListDashboardUsageDaily so the handler can
|
|
// swap them on the `UseDailyRollupForDashboard` flag with no other
|
|
// changes. The rollup is up to ~10 min stale (5 min cron + 5 min lag),
|
|
// which is fine for a dashboard read path.
|
|
func (q *Queries) ListDashboardUsageDailyRollup(ctx context.Context, arg ListDashboardUsageDailyRollupParams) ([]ListDashboardUsageDailyRollupRow, error) {
|
|
rows, err := q.db.Query(ctx, listDashboardUsageDailyRollup, arg.WorkspaceID, arg.Since, arg.ProjectID)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer rows.Close()
|
|
items := []ListDashboardUsageDailyRollupRow{}
|
|
for rows.Next() {
|
|
var i ListDashboardUsageDailyRollupRow
|
|
if err := rows.Scan(
|
|
&i.Date,
|
|
&i.Model,
|
|
&i.InputTokens,
|
|
&i.OutputTokens,
|
|
&i.CacheReadTokens,
|
|
&i.CacheWriteTokens,
|
|
&i.TaskCount,
|
|
); err != nil {
|
|
return nil, err
|
|
}
|
|
items = append(items, i)
|
|
}
|
|
if err := rows.Err(); err != nil {
|
|
return nil, err
|
|
}
|
|
return items, nil
|
|
}
|
|
|
|
const upsertTaskUsage = `-- name: UpsertTaskUsage :exec
|
|
INSERT INTO task_usage (task_id, provider, model, input_tokens, output_tokens, cache_read_tokens, cache_write_tokens, updated_at)
|
|
VALUES ($1, $2, $3, $4, $5, $6, $7, now())
|
|
ON CONFLICT (task_id, provider, model)
|
|
DO UPDATE SET
|
|
input_tokens = EXCLUDED.input_tokens,
|
|
output_tokens = EXCLUDED.output_tokens,
|
|
cache_read_tokens = EXCLUDED.cache_read_tokens,
|
|
cache_write_tokens = EXCLUDED.cache_write_tokens,
|
|
updated_at = now()
|
|
`
|
|
|
|
type UpsertTaskUsageParams struct {
|
|
TaskID pgtype.UUID `json:"task_id"`
|
|
Provider string `json:"provider"`
|
|
Model string `json:"model"`
|
|
InputTokens int64 `json:"input_tokens"`
|
|
OutputTokens int64 `json:"output_tokens"`
|
|
CacheReadTokens int64 `json:"cache_read_tokens"`
|
|
CacheWriteTokens int64 `json:"cache_write_tokens"`
|
|
}
|
|
|
|
// Bumps `updated_at` on INSERT and on conflict so the daily-rollup worker
|
|
// (migration 073) detects the row as dirty and re-aggregates its bucket.
|
|
// Without the conflict-side bump, a correction to historical token counts
|
|
// would never propagate to the rollup.
|
|
func (q *Queries) UpsertTaskUsage(ctx context.Context, arg UpsertTaskUsageParams) error {
|
|
_, err := q.db.Exec(ctx, upsertTaskUsage,
|
|
arg.TaskID,
|
|
arg.Provider,
|
|
arg.Model,
|
|
arg.InputTokens,
|
|
arg.OutputTokens,
|
|
arg.CacheReadTokens,
|
|
arg.CacheWriteTokens,
|
|
)
|
|
return err
|
|
}
|