Compare commits

...

5 Commits

Author SHA1 Message Date
Devv
d76798d581 fix(server): unify workspace_id source via agent in rollup window function
Round-4 review feedback (J) on PR #2256:

M1 (must-fix): The dirty queue triggers resolved workspace_id via
`agent.workspace_id`, but the window function's `dirty_from_updates`
discovery and `recomputed` recompute join used `issue.workspace_id`.
There is no schema-level FK guaranteeing
`agent.workspace_id == issue.workspace_id`. Any divergence (future
cross-workspace task scenarios, data repairs, migration bugs) would
cause:

  - dirty queue rows with workspace_id from agent
  - recompute join filtering by workspace_id from issue
  - 0 matches in recompute → bucket erroneously hits the
    deleted_empty branch and the daily row is silently dropped
  - dirty_from_updates path attributing usage to the wrong workspace

Replaced both CTEs to JOIN agent (not issue) so trigger / discovery /
recompute share one workspace_id source. Comment in 077 explains the
constraint.

N1: Refreshed two stale references in
cmd/backfill_task_usage_daily/main.go (header now says "072..078";
stampWatermark warning now mentions migration 073, where the rollup
state table is actually introduced).

Test: New TestRollupTaskUsageDaily_WorkspaceMismatch constructs an
atq with agent.workspace_id != issue.workspace_id, asserts the bucket
lands under agent's workspace (not issue's), and re-asserts after a
runtime reassign in the foreign workspace. Acts as a canary if the
schema invariant changes.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
Co-authored-by: multica-agent <github@multica.ai>
2026-05-08 15:15:39 +08:00
Devv
929f25207c fix(server): close dirty-queue race + move legacy partial index to its own concurrent migration
Round-3 review feedback on PR #2256:

1. Blocker: dirty-queue invalidations could be silently lost under
   concurrency. ON CONFLICT DO NOTHING let a late trigger see the row
   already enqueued, no-op, and then the rollup drain (WHERE
   enqueued_at < p_to) would delete the original row — losing the
   late invalidation. Switched all three trigger enqueue paths to
   ON CONFLICT DO UPDATE SET enqueued_at = GREATEST(existing,
   EXCLUDED.enqueued_at), so any invalidation arriving during a
   rollup tick keeps enqueued_at > p_to (p_to = now() - 5min) and
   survives the post-tick drain.

2. High: idx_task_usage_created_at_legacy (partial index on hot
   task_usage table) was being created in the regular 077 migration
   without CONCURRENTLY. Moved to new migration 078 with
   CREATE INDEX CONCURRENTLY, matching the pattern of 074/075.
   077's down migration leaves the index alone (it is owned by 078).

3. Minor: gofmt -w on runtime_rollup_test.go and
   backfill_task_usage_daily/main.go (tabs were lost in the original
   heredoc append). PR description rewritten to describe the current
   recompute/replace + dirty queue + feature flag design and the
   072..078 migration ordering.

Tests still green: TestRollupTaskUsageDaily_* (including both new
invalidation regressions), TestGetRuntimeUsage_*, TestWorkspaceUsage_*.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
Co-authored-by: multica-agent <github@multica.ai>
2026-05-08 15:02:41 +08:00
Devv
45dd094d61 fix(server): trigger-driven invalidation + online-safe migration for task_usage_daily
Round-2 review feedback on PR #2256:

1. Add explicit dirty-bucket queue (task_usage_daily_dirty) populated by
   triggers on agent_task_queue (UPDATE OF runtime_id, DELETE) and
   task_usage (DELETE). The rollup window function drains both this queue
   and the updated_at-based discovery, so runtime reassignment and
   issue-cascade deletes no longer leave the rollup divergent from the
   raw query.

   Triggers join via agent (not issue) to look up workspace_id, because
   when the cascade comes from issue, the issue row is already gone by
   the time atq's BEFORE DELETE fires; agent stays alive.

2. Make migration 072 online-safe: only ADD COLUMN updated_at TIMESTAMPTZ
   (nullable, no default → metadata-only ALTER, no row rewrite) and a
   separate ALTER for SET DEFAULT now() (also metadata-only). No bulk
   UPDATE on the hot task_usage table. The rollup window function's
   dirty_keys CTE handles legacy NULL rows via an OR branch, supported
   by partial index idx_task_usage_created_at_legacy.

3. Refresh stale documentation in cmd/backfill_task_usage_daily/main.go
   header to describe the current recompute/replace semantics, idempotent
   re-runnability, and the actual migration numbering (072..077).

Tests:
- TestRollupTaskUsageDaily_InvalidationOnReassign: verifies usage moves
  between runtime buckets after ReassignTasksToRuntime-style update.
- TestRollupTaskUsageDaily_InvalidationOnIssueDelete: verifies daily
  bucket is cleared after issue delete cascades through atq → task_usage.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
Co-authored-by: multica-agent <github@multica.ai>
2026-05-08 14:55:57 +08:00
Devv
5b82370061 fix(server): make task_usage_daily rollup safe to overlap, replay, and correct
Addresses 4 review blockers on the original PR:

1. Cron/backfill double-count race: the rollup function is now idempotent.
   Window calls find DIRTY KEYS via task_usage.updated_at, then RECOMPUTE
   each bucket from ground truth and REPLACE the daily row (no more
   additive ON CONFLICT). Cron and backfill can now overlap safely.

2. Silent pg_cron absence: the read path is gated behind a new
   USAGE_DAILY_ROLLUP_ENABLED feature flag (default off). The raw
   task_usage scan is preserved as the fallback. Operators flip the
   flag per-environment after backfill + cron are confirmed healthy
   (task_usage_rollup_lag_seconds() helper added for monitoring).

3. UpsertTaskUsage corrections invisible to rollup: added
   task_usage.updated_at column (default now(), backfilled from
   created_at), and bumped it on conflict. Corrections now mark the
   bucket dirty and the next window call recomputes it correctly.

4. CREATE INDEX blocking writes on hot table: split into separate
   single-statement migrations using CREATE INDEX CONCURRENTLY
   (074, 075), matching the 035/067 pattern.

Also: cron.schedule() removed from migrations entirely. Migration 076
only enables the extension (gracefully on unsupported envs); the actual
schedule is a documented operator runbook step that runs AFTER backfill.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
Co-authored-by: multica-agent <github@multica.ai>
2026-05-08 14:39:27 +08:00
Eve
1e0195d270 fix(server): aggregate task_usage into daily rollup table to cut DB load
ListRuntimeUsage previously did a SUM(...) GROUP BY DATE(created_at), provider,
model over the raw task_usage stream once per runtime row on the runtimes
list and once per detail page load, scaling O(events) per call. This is the
hot read path responsible for sustained load on Postgres.

Switch the read path to a materialized daily rollup table maintained by a
pg_cron job:

- 072_task_usage_daily_rollup: schema for task_usage_daily +
  task_usage_rollup_state, plus rollup_task_usage_daily_window(p_from, p_to)
  (window primitive used by both cron and offline backfill, idempotent via
  ON CONFLICT DO UPDATE adding deltas) and rollup_task_usage_daily() (cron
  entry point — pg_try_advisory_lock(4242) for serialization, watermark
  advancement, 5-minute safety lag for late-visible inserts). Also adds
  idx_task_usage_created_at to help the two lazy endpoints
  (ListRuntimeUsageByAgent / GetRuntimeUsageByHour) that still hit the
  raw table.

- 073_task_usage_daily_pgcron: CREATE EXTENSION IF NOT EXISTS pg_cron in a
  DO/EXCEPTION block (mirrors the migration 032 pg_bigm pattern so envs
  without shared_preload_libraries=pg_cron skip gracefully) and schedules
  rollup_task_usage_daily() every 5 minutes when the extension is present.

- queries/runtime_usage.sql ListRuntimeUsage rewritten to read from
  task_usage_daily; sqlc regenerated. Other usage queries unchanged.

- cmd/backfill_task_usage_daily: one-shot Go command that walks
  task_usage in monthly slices through rollup_task_usage_daily_window,
  then stamps the watermark to now()-5m so the cron resumes cleanly.
  Run once after migrations have applied, before relying on the rollup.

- runtime_test.go: TestGetRuntimeUsage_BucketsByUsageTime now invokes
  rollup_task_usage_daily_window after fixture inserts so the handler
  sees the rolled-up rows. Synthetic daily rows cleaned up after each
  test.

- runtime_rollup_test.go: new tests covering aggregation correctness,
  idempotency contract of ON CONFLICT DO UPDATE, and the watermark
  advancing exactly to now()-5m via the cron entry point.

Deployment order: apply migrations → run backfill_task_usage_daily once
→ pg_cron picks up subsequent windows automatically. Today bucket may be
up to ~10 minutes stale (5 min cron + 5 min lag) by design.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
Co-authored-by: multica-agent <github@multica.ai>
2026-05-08 14:18:53 +08:00
25 changed files with 1524 additions and 25 deletions

View File

@@ -0,0 +1,143 @@
// Backfill_task_usage_daily seeds the `task_usage_daily` rollup table
// from the historical contents of `task_usage`. Intended to be run once
// after migrations 072..078 are applied, before flipping the
// USAGE_DAILY_ROLLUP_ENABLED feature flag and before scheduling the
// pg_cron job. The cron schedule is intentionally NOT created by a
// migration (see 076 header) — operators run this backfill, then
// schedule cron, then enable the read-path flag.
//
// Strategy:
// 1. Walk the time range covered by task_usage in monthly slices.
// 2. For each slice, call rollup_task_usage_daily_window(p_from, p_to)
// — the same primitive the cron path uses, so semantics are
// guaranteed identical.
// 3. After all slices succeed, advance task_usage_rollup_state.watermark_at
// to (now() - 5 minutes) so the cron tick that follows doesn't
// reprocess the same window we just rolled up.
//
// Re-running IS safe. The window function (introduced in 073, refined in
// 077) recomputes each dirty bucket from raw and REPLACES the daily row,
// so rerunning a slice produces the same result. Use this property to
// recover from partial backfill failures without TRUNCATEing
// task_usage_daily first.
package main
import (
"context"
"flag"
"fmt"
"log/slog"
"os"
"time"
"github.com/jackc/pgx/v5/pgtype"
"github.com/jackc/pgx/v5/pgxpool"
"github.com/multica-ai/multica/server/internal/logger"
)
func main() {
logger.Init()
var (
dryRun = flag.Bool("dry-run", false, "log slices that would be processed without touching task_usage_daily")
monthsBack = flag.Int("months-back", 0, "limit backfill to the last N months (0 = all available history)")
)
flag.Parse()
dbURL := os.Getenv("DATABASE_URL")
if dbURL == "" {
dbURL = "postgres://multica:multica@localhost:5432/multica?sslmode=disable"
}
ctx := context.Background()
pool, err := pgxpool.New(ctx, dbURL)
if err != nil {
slog.Error("unable to connect to database", "error", err)
os.Exit(1)
}
defer pool.Close()
if err := pool.Ping(ctx); err != nil {
slog.Error("unable to ping database", "error", err)
os.Exit(1)
}
// Discover the time range we need to cover. If task_usage is empty
// we just stamp the watermark to now and exit.
var minTS, maxTS pgtype.Timestamptz
if err := pool.QueryRow(ctx, `SELECT MIN(created_at), MAX(created_at) FROM task_usage`).Scan(&minTS, &maxTS); err != nil {
slog.Error("scan task_usage time range", "error", err)
os.Exit(1)
}
if !minTS.Valid {
slog.Info("task_usage is empty; nothing to backfill")
stampWatermark(ctx, pool)
return
}
from := monthFloor(minTS.Time.UTC())
end := monthFloor(maxTS.Time.UTC()).AddDate(0, 1, 0)
if *monthsBack > 0 {
cutoff := monthFloor(time.Now().UTC()).AddDate(0, -(*monthsBack), 0)
if cutoff.After(from) {
from = cutoff
}
}
slog.Info("backfill range", "from", from.Format(time.RFC3339), "to", end.Format(time.RFC3339), "dry_run", *dryRun)
cursor := from
var totalRows int64
for cursor.Before(end) {
next := cursor.AddDate(0, 1, 0)
if *dryRun {
slog.Info("would roll up slice", "from", cursor.Format(time.RFC3339), "to", next.Format(time.RFC3339))
cursor = next
continue
}
var rows int64
err := pool.QueryRow(
ctx,
`SELECT rollup_task_usage_daily_window($1::timestamptz, $2::timestamptz)`,
cursor, next,
).Scan(&rows)
if err != nil {
slog.Error("rollup slice failed", "from", cursor.Format(time.RFC3339), "to", next.Format(time.RFC3339), "error", err)
os.Exit(1)
}
totalRows += rows
slog.Info("rolled up slice", "from", cursor.Format(time.RFC3339), "to", next.Format(time.RFC3339), "rows_touched", rows)
cursor = next
}
if !*dryRun {
stampWatermark(ctx, pool)
}
slog.Info("backfill complete", "total_rows_touched", totalRows)
}
// stampWatermark moves the rollup state's watermark to (now() - 5 min)
// so the next cron tick picks up only events newer than the backfill's
// upper bound. Mirrors the same lag the cron uses to avoid racing
// not-yet-visible inserts.
func stampWatermark(ctx context.Context, pool *pgxpool.Pool) {
tag, err := pool.Exec(ctx, `
UPDATE task_usage_rollup_state
SET watermark_at = now() - INTERVAL '5 minutes'
WHERE id = 1
`)
if err != nil {
slog.Error("stamp watermark failed", "error", err)
os.Exit(1)
}
if tag.RowsAffected() == 0 {
slog.Warn("no rollup state row to stamp; was migration 073 applied?")
return
}
fmt.Println("watermark stamped to now() - 5 minutes")
}
func monthFloor(t time.Time) time.Time {
return time.Date(t.Year(), t.Month(), 1, 0, 0, 0, 0, time.UTC)
}

View File

@@ -102,9 +102,10 @@ func NewRouterWithOptions(pool *pgxpool.Pool, hub *realtime.Hub, bus *events.Bus
cfSigner := auth.NewCloudFrontSignerFromEnv()
signupConfig := handler.Config{
AllowSignup: os.Getenv("ALLOW_SIGNUP") != "false",
AllowedEmails: splitAndTrim(os.Getenv("ALLOWED_EMAILS")),
AllowedEmailDomains: splitAndTrim(os.Getenv("ALLOWED_EMAIL_DOMAINS")),
AllowSignup: os.Getenv("ALLOW_SIGNUP") != "false",
AllowedEmails: splitAndTrim(os.Getenv("ALLOWED_EMAILS")),
AllowedEmailDomains: splitAndTrim(os.Getenv("ALLOWED_EMAIL_DOMAINS")),
UseDailyRollupForRuntimeUsage: os.Getenv("USAGE_DAILY_ROLLUP_ENABLED") == "true",
}
h := handler.New(queries, pool, hub, bus, emailSvc, store, cfSigner, analyticsClient, signupConfig, daemonHub)
if opts.DaemonWakeup != nil {

View File

@@ -47,6 +47,16 @@ type Config struct {
AllowSignup bool
AllowedEmails []string
AllowedEmailDomains []string
// UseDailyRollupForRuntimeUsage routes ListRuntimeUsage to the
// task_usage_daily rollup table when true. Default false: the read
// path stays on the raw task_usage stream so rollup-related issues
// (pg_cron not running, backfill not yet performed, watermark stuck)
// can never make the dashboard return empty/stale data. Operators
// flip this on per environment AFTER:
// 1) migrations 072..076 applied,
// 2) backfill_task_usage_daily ran successfully,
// 3) cron job scheduled and task_usage_rollup_lag_seconds() < 900.
UseDailyRollupForRuntimeUsage bool
}
type Handler struct {

View File

@@ -1,6 +1,7 @@
package handler
import (
"context"
"encoding/json"
"log/slog"
"net/http"
@@ -96,17 +97,53 @@ func (h *Handler) GetRuntimeUsage(w http.ResponseWriter, r *http.Request) {
since := parseSinceParam(r, 90)
rows, err := h.Queries.ListRuntimeUsage(r.Context(), db.ListRuntimeUsageParams{
RuntimeID: rt.ID,
Since: since,
})
resp, err := h.listRuntimeUsage(r.Context(), rt.ID, since)
if err != nil {
writeError(w, http.StatusInternalServerError, "failed to list usage")
return
}
writeJSON(w, http.StatusOK, resp)
}
// listRuntimeUsage dispatches between the raw task_usage scan and the
// task_usage_daily rollup based on the UseDailyRollupForRuntimeUsage
// feature flag. Both code paths return rows in the same shape, so the
// handler doesn't care which one ran.
func (h *Handler) listRuntimeUsage(ctx context.Context, runtimeID pgtype.UUID, since pgtype.Timestamptz) ([]RuntimeUsageResponse, error) {
resolvedRuntimeID := uuidToString(runtimeID)
if h.cfg.UseDailyRollupForRuntimeUsage {
rows, err := h.Queries.ListRuntimeUsageDaily(ctx, db.ListRuntimeUsageDailyParams{
RuntimeID: runtimeID,
Since: since,
})
if err != nil {
return nil, err
}
resp := make([]RuntimeUsageResponse, len(rows))
for i, row := range rows {
resp[i] = RuntimeUsageResponse{
RuntimeID: resolvedRuntimeID,
Date: row.Date.Time.Format("2006-01-02"),
Provider: row.Provider,
Model: row.Model,
InputTokens: row.InputTokens,
OutputTokens: row.OutputTokens,
CacheReadTokens: row.CacheReadTokens,
CacheWriteTokens: row.CacheWriteTokens,
}
}
return resp, nil
}
rows, err := h.Queries.ListRuntimeUsage(ctx, db.ListRuntimeUsageParams{
RuntimeID: runtimeID,
Since: since,
})
if err != nil {
return nil, err
}
resp := make([]RuntimeUsageResponse, len(rows))
resolvedRuntimeID := uuidToString(rt.ID)
for i, row := range rows {
resp[i] = RuntimeUsageResponse{
RuntimeID: resolvedRuntimeID,
@@ -119,8 +156,7 @@ func (h *Handler) GetRuntimeUsage(w http.ResponseWriter, r *http.Request) {
CacheWriteTokens: row.CacheWriteTokens,
}
}
writeJSON(w, http.StatusOK, resp)
return resp, nil
}
// GetRuntimeTaskActivity returns hourly task activity distribution for a runtime.

View File

@@ -0,0 +1,527 @@
package handler
import (
"context"
"testing"
"time"
)
// TestRollupTaskUsageDaily_AggregatesAndIsIdempotent exercises the
// rollup_task_usage_daily_window() SQL function directly. This is the
// shared aggregation primitive used by both the cron-driven watermark
// loop and the offline backfill command, so its correctness underpins
// the entire ListRuntimeUsage read path. Two properties matter:
//
// 1. It correctly groups raw `task_usage` rows by (date, runtime,
// workspace, provider, model) and sums the four token columns.
// 2. Re-aggregating an already-rolled-up window is *idempotent*: the
// function recomputes each dirty bucket from ground truth and
// REPLACES the daily row, so overlap with backfill / replay is
// safe and corrections via UpsertTaskUsage propagate cleanly.
func TestRollupTaskUsageDaily_AggregatesAndIsIdempotent(t *testing.T) {
if testHandler == nil {
t.Skip("database not available")
}
ctx := context.Background()
runtimeID := handlerTestRuntimeID(t)
var agentID string
if err := testPool.QueryRow(ctx, `
SELECT id FROM agent WHERE workspace_id = $1 LIMIT 1
`, testWorkspaceID).Scan(&agentID); err != nil {
t.Fatalf("fetch agent: %v", err)
}
var issueID string
if err := testPool.QueryRow(ctx, `
INSERT INTO issue (workspace_id, title, creator_id, creator_type)
VALUES ($1, 'rollup test', $2, 'member')
RETURNING id
`, testWorkspaceID, testUserID).Scan(&issueID); err != nil {
t.Fatalf("create issue: %v", err)
}
t.Cleanup(func() {
testPool.Exec(ctx, `DELETE FROM issue WHERE id = $1`, issueID)
})
// Pin the test to a fixed historical day so we don't collide with
// concurrent rollups of "today" running against the same fixture
// runtime. 2020-06-15 is far outside any backfill window the rest
// of the suite touches.
day := time.Date(2020, 6, 15, 0, 0, 0, 0, time.UTC)
// Two rows on the same (date, provider, model) — must collapse to
// a single output row whose totals sum the inputs.
insertUsage := func(usageAt time.Time, model string, in, out int64) {
var taskID string
if err := testPool.QueryRow(ctx, `
INSERT INTO agent_task_queue (agent_id, issue_id, runtime_id, status, created_at)
VALUES ($1, $2, $3, 'completed', $4)
RETURNING id
`, agentID, issueID, runtimeID, usageAt).Scan(&taskID); err != nil {
t.Fatalf("insert task: %v", err)
}
if _, err := testPool.Exec(ctx, `
INSERT INTO task_usage (task_id, provider, model, input_tokens, output_tokens, created_at, updated_at)
VALUES ($1, 'claude', $2, $3, $4, $5, $5)
`, taskID, model, in, out, usageAt); err != nil {
t.Fatalf("insert task_usage: %v", err)
}
t.Cleanup(func() {
testPool.Exec(ctx, `DELETE FROM agent_task_queue WHERE id = $1`, taskID)
})
}
insertUsage(day.Add(1*time.Hour), "claude-3-5-sonnet", 100, 10)
insertUsage(day.Add(2*time.Hour), "claude-3-5-sonnet", 200, 20)
// A second model on the same day must produce a *separate* output
// row (different group key).
insertUsage(day.Add(3*time.Hour), "claude-3-5-haiku", 50, 5)
t.Cleanup(func() {
testPool.Exec(ctx, `DELETE FROM task_usage_daily WHERE runtime_id = $1 AND bucket_date = $2::date`, runtimeID, day)
})
// --- 1) Initial aggregation produces the expected totals.
if _, err := testPool.Exec(ctx, `
SELECT rollup_task_usage_daily_window($1::timestamptz, $2::timestamptz)
`, day, day.Add(24*time.Hour)); err != nil {
t.Fatalf("rollup_task_usage_daily_window: %v", err)
}
type row struct {
Model string
InputTokens int64
Output int64
EventCount int64
}
read := func() map[string]row {
rs, err := testPool.Query(ctx, `
SELECT model, input_tokens, output_tokens, event_count
FROM task_usage_daily
WHERE runtime_id = $1 AND bucket_date = $2::date
`, runtimeID, day)
if err != nil {
t.Fatalf("read task_usage_daily: %v", err)
}
defer rs.Close()
out := map[string]row{}
for rs.Next() {
var r row
if err := rs.Scan(&r.Model, &r.InputTokens, &r.Output, &r.EventCount); err != nil {
t.Fatalf("scan: %v", err)
}
out[r.Model] = r
}
return out
}
got := read()
if len(got) != 2 {
t.Fatalf("expected 2 rows (one per model), got %d: %+v", len(got), got)
}
if got["claude-3-5-sonnet"].InputTokens != 300 || got["claude-3-5-sonnet"].Output != 30 || got["claude-3-5-sonnet"].EventCount != 2 {
t.Errorf("sonnet bucket wrong: %+v", got["claude-3-5-sonnet"])
}
if got["claude-3-5-haiku"].InputTokens != 50 || got["claude-3-5-haiku"].Output != 5 || got["claude-3-5-haiku"].EventCount != 1 {
t.Errorf("haiku bucket wrong: %+v", got["claude-3-5-haiku"])
}
// --- 2) Re-aggregating the same window is idempotent.
// The new function recomputes each dirty bucket from ground truth and
// REPLACES the daily row, so callers can safely overlap windows
// (cron + backfill, replay, manual ops). Verifying it explicitly so
// the property doesn't silently regress.
if _, err := testPool.Exec(ctx, `
SELECT rollup_task_usage_daily_window($1::timestamptz, $2::timestamptz)
`, day, day.Add(24*time.Hour)); err != nil {
t.Fatalf("rollup_task_usage_daily_window (second call): %v", err)
}
got = read()
if got["claude-3-5-sonnet"].InputTokens != 300 || got["claude-3-5-sonnet"].EventCount != 2 {
t.Errorf("after second call, sonnet should be unchanged (idempotent), got: %+v", got["claude-3-5-sonnet"])
}
if got["claude-3-5-haiku"].InputTokens != 50 || got["claude-3-5-haiku"].EventCount != 1 {
t.Errorf("after second call, haiku should be unchanged (idempotent), got: %+v", got["claude-3-5-haiku"])
}
// --- 3) Correction propagates: bumping a row's updated_at into a
// new window must cause the bucket to be recomputed from ground
// truth (covers the UpsertTaskUsage correction path that the old
// additive design dropped silently).
correctionMark := time.Date(2024, 1, 1, 0, 0, 0, 0, time.UTC)
if _, err := testPool.Exec(ctx, `
UPDATE task_usage SET input_tokens = 1000, updated_at = $1
WHERE task_id IN (
SELECT id FROM agent_task_queue WHERE runtime_id = $2 AND created_at::date = $3::date
)
AND model = 'claude-3-5-sonnet'
AND input_tokens = 100
`, correctionMark, runtimeID, day); err != nil {
t.Fatalf("simulate correction: %v", err)
}
if _, err := testPool.Exec(ctx, `
SELECT rollup_task_usage_daily_window($1::timestamptz, $2::timestamptz)
`, correctionMark.Add(-time.Minute), correctionMark.Add(time.Minute)); err != nil {
t.Fatalf("rollup correction window: %v", err)
}
got = read()
// New sonnet total: 1000 + 200 = 1200, still 2 events.
if got["claude-3-5-sonnet"].InputTokens != 1200 || got["claude-3-5-sonnet"].EventCount != 2 {
t.Errorf("after correction, sonnet should reflect new total 1200, got: %+v", got["claude-3-5-sonnet"])
}
}
// TestRollupTaskUsageDaily_WatermarkAdvances verifies the cron entry
// point: rollup_task_usage_daily() consults task_usage_rollup_state to
// decide its window, performs the upsert, and bumps the watermark.
// We seed the watermark to a known value, force time to pass via a
// fixture, and assert the watermark moves forward by exactly the
// elapsed-window minus the 5 minute safety lag built into the function.
func TestRollupTaskUsageDaily_WatermarkAdvances(t *testing.T) {
if testHandler == nil {
t.Skip("database not available")
}
ctx := context.Background()
// Seed the watermark to "long ago" so the next call has a non-empty
// window. Use a test-scoped low value so we don't clobber any other
// test's state — the singleton row gets restored at the end.
var prevWatermark time.Time
if err := testPool.QueryRow(ctx, `SELECT watermark_at FROM task_usage_rollup_state WHERE id = 1`).Scan(&prevWatermark); err != nil {
t.Fatalf("read prev watermark: %v", err)
}
t.Cleanup(func() {
testPool.Exec(ctx, `UPDATE task_usage_rollup_state SET watermark_at = $1 WHERE id = 1`, prevWatermark)
})
if _, err := testPool.Exec(ctx, `
UPDATE task_usage_rollup_state
SET watermark_at = '2020-01-01 00:00:00+00', last_error = NULL
WHERE id = 1
`); err != nil {
t.Fatalf("seed watermark: %v", err)
}
if _, err := testPool.Exec(ctx, `SELECT rollup_task_usage_daily()`); err != nil {
t.Fatalf("rollup_task_usage_daily: %v", err)
}
var newWatermark time.Time
var lastError *string
if err := testPool.QueryRow(ctx, `SELECT watermark_at, last_error FROM task_usage_rollup_state WHERE id = 1`).Scan(&newWatermark, &lastError); err != nil {
t.Fatalf("read new watermark: %v", err)
}
if lastError != nil {
t.Fatalf("rollup recorded error: %s", *lastError)
}
// New watermark must be near now() - 5 min. Allow a wide window
// (±2 min) so this isn't flaky on slow CI.
expected := time.Now().UTC().Add(-5 * time.Minute)
delta := newWatermark.Sub(expected)
if delta < -2*time.Minute || delta > 2*time.Minute {
t.Errorf("watermark %s not within 2min of expected %s (delta %s)", newWatermark, expected, delta)
}
}
// TestRollupTaskUsageDaily_InvalidationOnReassign verifies that the
// trigger-driven dirty-bucket queue handles task reassignment between
// runtimes (the ReassignTasksToRuntime path used during runtime merge).
// Without invalidation the rollup would keep attributing usage to the
// old runtime; the raw fallback would not — so the two read paths would
// silently disagree.
func TestRollupTaskUsageDaily_InvalidationOnReassign(t *testing.T) {
if testHandler == nil {
t.Skip("database not available")
}
ctx := context.Background()
oldRuntimeID := handlerTestRuntimeID(t)
// Spin up a second runtime to receive the reassigned task.
var newRuntimeID string
if err := testPool.QueryRow(ctx, `
INSERT INTO agent_runtime (
workspace_id, daemon_id, name, runtime_mode, provider, status, device_info, metadata, last_seen_at
)
VALUES ($1, NULL, 'reassign-target', 'cloud', 'reassign-target', 'online', '{}'::jsonb, '{}'::jsonb, now())
RETURNING id
`, testWorkspaceID).Scan(&newRuntimeID); err != nil {
t.Fatalf("create dest runtime: %v", err)
}
t.Cleanup(func() {
testPool.Exec(ctx, `DELETE FROM agent_runtime WHERE id = $1`, newRuntimeID)
})
var agentID string
if err := testPool.QueryRow(ctx, `
SELECT id FROM agent WHERE workspace_id = $1 LIMIT 1
`, testWorkspaceID).Scan(&agentID); err != nil {
t.Fatalf("fetch agent: %v", err)
}
var issueID string
if err := testPool.QueryRow(ctx, `
INSERT INTO issue (workspace_id, title, creator_id, creator_type)
VALUES ($1, 'reassign test', $2, 'member')
RETURNING id
`, testWorkspaceID, testUserID).Scan(&issueID); err != nil {
t.Fatalf("create issue: %v", err)
}
t.Cleanup(func() {
testPool.Exec(ctx, `DELETE FROM issue WHERE id = $1`, issueID)
})
day := time.Date(2021, 3, 14, 0, 0, 0, 0, time.UTC)
var taskID string
if err := testPool.QueryRow(ctx, `
INSERT INTO agent_task_queue (agent_id, issue_id, runtime_id, status, created_at)
VALUES ($1, $2, $3, 'completed', $4)
RETURNING id
`, agentID, issueID, oldRuntimeID, day.Add(time.Hour)).Scan(&taskID); err != nil {
t.Fatalf("insert task: %v", err)
}
t.Cleanup(func() {
testPool.Exec(ctx, `DELETE FROM agent_task_queue WHERE id = $1`, taskID)
})
if _, err := testPool.Exec(ctx, `
INSERT INTO task_usage (task_id, provider, model, input_tokens, output_tokens, created_at, updated_at)
VALUES ($1, 'claude', 'm-reassign', 700, 70, $2, $2)
`, taskID, day.Add(time.Hour)); err != nil {
t.Fatalf("insert task_usage: %v", err)
}
t.Cleanup(func() {
testPool.Exec(ctx, `DELETE FROM task_usage_daily WHERE bucket_date = $1::date AND model = 'm-reassign'`, day)
testPool.Exec(ctx, `DELETE FROM task_usage_daily_dirty WHERE bucket_date = $1::date AND model = 'm-reassign'`, day)
})
// Initial roll-up: usage should attach to OLD runtime.
if _, err := testPool.Exec(ctx, `SELECT rollup_task_usage_daily_window('-infinity'::timestamptz, 'infinity'::timestamptz)`); err != nil {
t.Fatalf("initial rollup: %v", err)
}
var oldTokens, newTokens int64
testPool.QueryRow(ctx, `SELECT COALESCE(SUM(input_tokens),0) FROM task_usage_daily WHERE runtime_id = $1 AND bucket_date = $2::date AND model = 'm-reassign'`, oldRuntimeID, day).Scan(&oldTokens)
testPool.QueryRow(ctx, `SELECT COALESCE(SUM(input_tokens),0) FROM task_usage_daily WHERE runtime_id = $1 AND bucket_date = $2::date AND model = 'm-reassign'`, newRuntimeID, day).Scan(&newTokens)
if oldTokens != 700 || newTokens != 0 {
t.Fatalf("initial: expected old=700 new=0, got old=%d new=%d", oldTokens, newTokens)
}
// Trigger should enqueue both old + new buckets.
if _, err := testPool.Exec(ctx, `UPDATE agent_task_queue SET runtime_id = $1 WHERE id = $2`, newRuntimeID, taskID); err != nil {
t.Fatalf("reassign task: %v", err)
}
var dirtyCount int
testPool.QueryRow(ctx, `SELECT COUNT(*) FROM task_usage_daily_dirty WHERE bucket_date = $1::date AND model = 'm-reassign'`, day).Scan(&dirtyCount)
if dirtyCount != 2 {
t.Fatalf("expected 2 dirty entries (old+new runtime), got %d", dirtyCount)
}
// Re-run rollup. Old bucket should be deleted (no source rows left),
// new bucket should receive the moved usage.
if _, err := testPool.Exec(ctx, `SELECT rollup_task_usage_daily_window('-infinity'::timestamptz, 'infinity'::timestamptz)`); err != nil {
t.Fatalf("rollup after reassign: %v", err)
}
testPool.QueryRow(ctx, `SELECT COALESCE(SUM(input_tokens),0) FROM task_usage_daily WHERE runtime_id = $1 AND bucket_date = $2::date AND model = 'm-reassign'`, oldRuntimeID, day).Scan(&oldTokens)
testPool.QueryRow(ctx, `SELECT COALESCE(SUM(input_tokens),0) FROM task_usage_daily WHERE runtime_id = $1 AND bucket_date = $2::date AND model = 'm-reassign'`, newRuntimeID, day).Scan(&newTokens)
if oldTokens != 0 || newTokens != 700 {
t.Fatalf("after reassign: expected old=0 new=700, got old=%d new=%d", oldTokens, newTokens)
}
// Dirty queue should be drained.
testPool.QueryRow(ctx, `SELECT COUNT(*) FROM task_usage_daily_dirty WHERE bucket_date = $1::date AND model = 'm-reassign'`, day).Scan(&dirtyCount)
if dirtyCount != 0 {
t.Errorf("expected dirty queue drained, got %d entries", dirtyCount)
}
}
// TestRollupTaskUsageDaily_InvalidationOnIssueDelete verifies that
// cascade delete (issue → agent_task_queue → task_usage) clears the
// matching daily rows via the trigger-driven dirty queue.
func TestRollupTaskUsageDaily_InvalidationOnIssueDelete(t *testing.T) {
if testHandler == nil {
t.Skip("database not available")
}
ctx := context.Background()
runtimeID := handlerTestRuntimeID(t)
var agentID string
if err := testPool.QueryRow(ctx, `SELECT id FROM agent WHERE workspace_id = $1 LIMIT 1`, testWorkspaceID).Scan(&agentID); err != nil {
t.Fatalf("fetch agent: %v", err)
}
var issueID string
if err := testPool.QueryRow(ctx, `
INSERT INTO issue (workspace_id, title, creator_id, creator_type)
VALUES ($1, 'delete test', $2, 'member') RETURNING id
`, testWorkspaceID, testUserID).Scan(&issueID); err != nil {
t.Fatalf("create issue: %v", err)
}
day := time.Date(2021, 7, 4, 0, 0, 0, 0, time.UTC)
var taskID string
if err := testPool.QueryRow(ctx, `
INSERT INTO agent_task_queue (agent_id, issue_id, runtime_id, status, created_at)
VALUES ($1, $2, $3, 'completed', $4) RETURNING id
`, agentID, issueID, runtimeID, day.Add(time.Hour)).Scan(&taskID); err != nil {
t.Fatalf("insert task: %v", err)
}
if _, err := testPool.Exec(ctx, `
INSERT INTO task_usage (task_id, provider, model, input_tokens, output_tokens, created_at, updated_at)
VALUES ($1, 'claude', 'm-delete', 500, 50, $2, $2)
`, taskID, day.Add(time.Hour)); err != nil {
t.Fatalf("insert task_usage: %v", err)
}
t.Cleanup(func() {
testPool.Exec(ctx, `DELETE FROM task_usage_daily WHERE bucket_date = $1::date AND model = 'm-delete'`, day)
testPool.Exec(ctx, `DELETE FROM task_usage_daily_dirty WHERE bucket_date = $1::date AND model = 'm-delete'`, day)
})
if _, err := testPool.Exec(ctx, `SELECT rollup_task_usage_daily_window('-infinity'::timestamptz, 'infinity'::timestamptz)`); err != nil {
t.Fatalf("initial rollup: %v", err)
}
var tokens int64
testPool.QueryRow(ctx, `SELECT COALESCE(SUM(input_tokens),0) FROM task_usage_daily WHERE runtime_id = $1 AND bucket_date = $2::date AND model = 'm-delete'`, runtimeID, day).Scan(&tokens)
if tokens != 500 {
t.Fatalf("initial: expected 500, got %d", tokens)
}
// Cascade delete via issue. Trigger fires on agent_task_queue BEFORE
// DELETE — that's when the task_usage children + issue parent are
// still readable inside the same statement.
if _, err := testPool.Exec(ctx, `DELETE FROM issue WHERE id = $1`, issueID); err != nil {
t.Fatalf("delete issue: %v", err)
}
var dirtyCount int
testPool.QueryRow(ctx, `SELECT COUNT(*) FROM task_usage_daily_dirty WHERE bucket_date = $1::date AND model = 'm-delete'`, day).Scan(&dirtyCount)
if dirtyCount == 0 {
t.Fatalf("expected dirty entry after cascade delete, got 0")
}
// Re-run rollup: bucket should be deleted because no source rows exist.
if _, err := testPool.Exec(ctx, `SELECT rollup_task_usage_daily_window('-infinity'::timestamptz, 'infinity'::timestamptz)`); err != nil {
t.Fatalf("rollup after delete: %v", err)
}
testPool.QueryRow(ctx, `SELECT COALESCE(SUM(input_tokens),0) FROM task_usage_daily WHERE runtime_id = $1 AND bucket_date = $2::date AND model = 'm-delete'`, runtimeID, day).Scan(&tokens)
if tokens != 0 {
t.Errorf("after issue delete: expected 0 (bucket cleared), got %d", tokens)
}
}
// TestRollupTaskUsageDaily_WorkspaceMismatch constructs an atq row whose
// agent.workspace_id != issue.workspace_id and verifies that the rollup
// resolves workspace_id consistently from `agent` across triggers,
// dirty_from_updates, and recompute. If any of those paths leaked back
// to the issue.workspace_id the dirty queue would be misaligned with
// the recompute join and the bucket would either be silently dropped
// (recompute returns 0 rows → deleted_empty branch fires) or attributed
// to the wrong workspace.
//
// The schema does not enforce agent.workspace_id == issue.workspace_id,
// so this canary keeps the alignment honest as the schema evolves.
func TestRollupTaskUsageDaily_WorkspaceMismatch(t *testing.T) {
if testHandler == nil {
t.Skip("database not available")
}
ctx := context.Background()
// Create a foreign workspace + a runtime + an agent there.
var foreignWorkspaceID string
if err := testPool.QueryRow(ctx, `
INSERT INTO workspace (name, slug) VALUES ('ws-mismatch', 'ws-mismatch-' || gen_random_uuid()::text) RETURNING id
`).Scan(&foreignWorkspaceID); err != nil {
t.Fatalf("create foreign workspace: %v", err)
}
t.Cleanup(func() {
testPool.Exec(ctx, `DELETE FROM workspace WHERE id = $1`, foreignWorkspaceID)
})
var foreignRuntimeID string
if err := testPool.QueryRow(ctx, `
INSERT INTO agent_runtime (
workspace_id, daemon_id, name, runtime_mode, provider, status, device_info, metadata, last_seen_at
)
VALUES ($1, NULL, 'mismatch-rt', 'cloud', 'mismatch-rt', 'online', '{}'::jsonb, '{}'::jsonb, now())
RETURNING id
`, foreignWorkspaceID).Scan(&foreignRuntimeID); err != nil {
t.Fatalf("create foreign runtime: %v", err)
}
var foreignAgentID string
if err := testPool.QueryRow(ctx, `
INSERT INTO agent (
workspace_id, name, description, runtime_mode, runtime_config,
runtime_id, visibility, max_concurrent_tasks, owner_id,
instructions, custom_env, custom_args, mcp_config
)
VALUES ($1, 'mismatch-agent', '', 'cloud', '{}'::jsonb, $2, 'private', 1, $3, '', '{}'::jsonb, '[]'::jsonb, '[]'::jsonb)
RETURNING id
`, foreignWorkspaceID, foreignRuntimeID, testUserID).Scan(&foreignAgentID); err != nil {
t.Fatalf("create foreign agent: %v", err)
}
// Issue lives in the *primary* test workspace, agent in foreign one.
var issueID string
if err := testPool.QueryRow(ctx, `
INSERT INTO issue (workspace_id, title, creator_id, creator_type)
VALUES ($1, 'mismatch test', $2, 'member') RETURNING id
`, testWorkspaceID, testUserID).Scan(&issueID); err != nil {
t.Fatalf("create issue: %v", err)
}
t.Cleanup(func() {
testPool.Exec(ctx, `DELETE FROM issue WHERE id = $1`, issueID)
})
day := time.Date(2021, 9, 9, 0, 0, 0, 0, time.UTC)
var taskID string
if err := testPool.QueryRow(ctx, `
INSERT INTO agent_task_queue (agent_id, issue_id, runtime_id, status, created_at)
VALUES ($1, $2, $3, 'completed', $4) RETURNING id
`, foreignAgentID, issueID, foreignRuntimeID, day.Add(time.Hour)).Scan(&taskID); err != nil {
t.Fatalf("insert atq: %v", err)
}
if _, err := testPool.Exec(ctx, `
INSERT INTO task_usage (task_id, provider, model, input_tokens, output_tokens, created_at, updated_at)
VALUES ($1, 'claude', 'm-mismatch', 333, 33, $2, $2)
`, taskID, day.Add(time.Hour)); err != nil {
t.Fatalf("insert task_usage: %v", err)
}
t.Cleanup(func() {
testPool.Exec(ctx, `DELETE FROM task_usage_daily WHERE bucket_date = $1::date AND model = 'm-mismatch'`, day)
testPool.Exec(ctx, `DELETE FROM task_usage_daily_dirty WHERE bucket_date = $1::date AND model = 'm-mismatch'`, day)
})
// Rollup. The bucket must be attributed to FOREIGN workspace
// (agent.workspace_id), not the primary one (issue.workspace_id).
if _, err := testPool.Exec(ctx, `SELECT rollup_task_usage_daily_window('-infinity'::timestamptz, 'infinity'::timestamptz)`); err != nil {
t.Fatalf("rollup: %v", err)
}
var foreignTokens, primaryTokens int64
testPool.QueryRow(ctx, `SELECT COALESCE(SUM(input_tokens),0) FROM task_usage_daily WHERE workspace_id = $1 AND bucket_date = $2::date AND model = 'm-mismatch'`, foreignWorkspaceID, day).Scan(&foreignTokens)
testPool.QueryRow(ctx, `SELECT COALESCE(SUM(input_tokens),0) FROM task_usage_daily WHERE workspace_id = $1 AND bucket_date = $2::date AND model = 'm-mismatch'`, testWorkspaceID, day).Scan(&primaryTokens)
if foreignTokens != 333 {
t.Fatalf("expected foreign workspace bucket = 333, got %d", foreignTokens)
}
if primaryTokens != 0 {
t.Errorf("expected primary workspace bucket = 0, got %d", primaryTokens)
}
// Now reassign atq.runtime_id within the foreign workspace and
// verify the trigger / recompute pair still agree on workspace_id.
var foreignRuntime2ID string
if err := testPool.QueryRow(ctx, `
INSERT INTO agent_runtime (
workspace_id, daemon_id, name, runtime_mode, provider, status, device_info, metadata, last_seen_at
)
VALUES ($1, NULL, 'mismatch-rt2', 'cloud', 'mismatch-rt2', 'online', '{}'::jsonb, '{}'::jsonb, now())
RETURNING id
`, foreignWorkspaceID).Scan(&foreignRuntime2ID); err != nil {
t.Fatalf("create foreign runtime 2: %v", err)
}
if _, err := testPool.Exec(ctx, `UPDATE agent_task_queue SET runtime_id = $1 WHERE id = $2`, foreignRuntime2ID, taskID); err != nil {
t.Fatalf("reassign: %v", err)
}
if _, err := testPool.Exec(ctx, `SELECT rollup_task_usage_daily_window('-infinity'::timestamptz, 'infinity'::timestamptz)`); err != nil {
t.Fatalf("rollup after reassign: %v", err)
}
var oldRTTokens, newRTTokens int64
testPool.QueryRow(ctx, `SELECT COALESCE(SUM(input_tokens),0) FROM task_usage_daily WHERE runtime_id = $1 AND bucket_date = $2::date AND model = 'm-mismatch'`, foreignRuntimeID, day).Scan(&oldRTTokens)
testPool.QueryRow(ctx, `SELECT COALESCE(SUM(input_tokens),0) FROM task_usage_daily WHERE runtime_id = $1 AND bucket_date = $2::date AND model = 'm-mismatch'`, foreignRuntime2ID, day).Scan(&newRTTokens)
if oldRTTokens != 0 || newRTTokens != 333 {
t.Fatalf("after reassign in mismatched ws: expected old=0 new=333, got old=%d new=%d", oldRTTokens, newRTTokens)
}
}

View File

@@ -139,6 +139,27 @@ func TestGetRuntimeUsage_BucketsByUsageTime(t *testing.T) {
insertTaskWithUsage(yesterdayLate, todayEarly, 1000) // cross-midnight
insertTaskWithUsage(yesterdayMorning, yesterdayMorning, 2000) // full-day yesterday
// ListRuntimeUsage now reads from the `task_usage_daily` rollup
// table maintained by the cron-driven rollup_task_usage_daily()
// function. In production the watermarked wrapper waits a 5 min
// safety lag before consuming rows; here we drive the underlying
// window function directly with a wide-open range so the freshly
// inserted fixture rows are guaranteed to be aggregated before the
// handler is called. Each test invocation gets its own isolated
// daily buckets keyed by (date, runtime, provider, model), so
// re-running the test is idempotent (the upsert just rewrites the
// same totals).
if _, err := testPool.Exec(ctx, `
SELECT rollup_task_usage_daily_window('-infinity'::timestamptz, 'infinity'::timestamptz)
`); err != nil {
t.Fatalf("rollup_task_usage_daily_window: %v", err)
}
t.Cleanup(func() {
testPool.Exec(ctx, `
DELETE FROM task_usage_daily WHERE runtime_id = $1 AND bucket_date IN ($2::date, $3::date)
`, runtimeID, today, today.Add(-24*time.Hour))
})
// Call the handler with ?days=1 at whatever "now" is. That should include
// both today and yesterday in full.
w := httptest.NewRecorder()

View File

@@ -0,0 +1 @@
ALTER TABLE task_usage DROP COLUMN IF EXISTS updated_at;

View File

@@ -0,0 +1,23 @@
-- Add `updated_at` to task_usage so the daily-rollup worker (added in 073)
-- can detect rows that were corrected by `UpsertTaskUsage` after their
-- original creation. The existing UPSERT path overwrites token counts on
-- conflict but leaves created_at unchanged, so a watermark on created_at
-- alone would silently miss those corrections.
--
-- Schema-only, online-safe migration. The column is nullable with no
-- backfill UPDATE so this is metadata-only on a hot, high-write table —
-- no full-table rewrite, no row-lock storm, no WAL spike. Old rows stay
-- NULL; the rollup function (073) handles them via
-- `COALESCE(updated_at, created_at)` and an OR branch in the window
-- filter so legacy rows are still discoverable by backfill.
--
-- DEFAULT now() is set after the column exists so new INSERTs (and
-- UpsertTaskUsage on conflict, which sets the value explicitly) always
-- get a timestamp. Setting the default on an existing column does NOT
-- touch existing rows; only new rows get the default. This keeps the
-- migration cheap on ~hundreds of millions of `task_usage` rows.
ALTER TABLE task_usage
ADD COLUMN IF NOT EXISTS updated_at TIMESTAMPTZ;
ALTER TABLE task_usage
ALTER COLUMN updated_at SET DEFAULT now();

View File

@@ -0,0 +1,6 @@
DROP FUNCTION IF EXISTS rollup_task_usage_daily();
DROP FUNCTION IF EXISTS rollup_task_usage_daily_window(TIMESTAMPTZ, TIMESTAMPTZ);
DROP TABLE IF EXISTS task_usage_rollup_state;
DROP INDEX IF EXISTS idx_task_usage_daily_workspace_date;
DROP INDEX IF EXISTS idx_task_usage_daily_runtime_date;
DROP TABLE IF EXISTS task_usage_daily;

View File

@@ -0,0 +1,227 @@
-- Daily rollup table for `task_usage`. Background: the dashboard query
-- ListRuntimeUsage runs `SUM() GROUP BY DATE(created_at), provider, model`
-- against the raw event stream and is called once per runtime row on the
-- runtimes list (plus once per detail page load), so it dominates DB load
-- as event volume grows. We materialise the day-bucketed aggregate here
-- so reads scan O(days × providers × models) rows instead of O(events).
--
-- All query dimensions are denormalised into the table so reads never
-- need to join `agent_task_queue`. The PK doubles as the upsert key for
-- the rollup worker.
CREATE TABLE task_usage_daily (
bucket_date DATE NOT NULL,
workspace_id UUID NOT NULL,
runtime_id UUID NOT NULL,
provider TEXT NOT NULL,
model TEXT NOT NULL,
input_tokens BIGINT NOT NULL DEFAULT 0,
output_tokens BIGINT NOT NULL DEFAULT 0,
cache_read_tokens BIGINT NOT NULL DEFAULT 0,
cache_write_tokens BIGINT NOT NULL DEFAULT 0,
event_count BIGINT NOT NULL DEFAULT 0,
updated_at TIMESTAMPTZ NOT NULL DEFAULT now(),
PRIMARY KEY (bucket_date, workspace_id, runtime_id, provider, model)
);
-- Primary read path: runtime detail page + runtimes-list cost cell, both
-- filter by runtime_id and order by date DESC. bucket_date DESC in the
-- index lets the query avoid an extra sort.
CREATE INDEX idx_task_usage_daily_runtime_date
ON task_usage_daily (runtime_id, bucket_date DESC);
-- Workspace-wide aggregations hit this index instead of fanning out per
-- runtime.
CREATE INDEX idx_task_usage_daily_workspace_date
ON task_usage_daily (workspace_id, bucket_date DESC);
-- Single-row state table tracking how far the rollup worker has consumed.
CREATE TABLE task_usage_rollup_state (
id SMALLINT PRIMARY KEY DEFAULT 1 CHECK (id = 1),
watermark_at TIMESTAMPTZ NOT NULL DEFAULT '1970-01-01 00:00:00+00',
last_run_started_at TIMESTAMPTZ,
last_run_finished_at TIMESTAMPTZ,
last_run_rows BIGINT NOT NULL DEFAULT 0,
last_error TEXT
);
INSERT INTO task_usage_rollup_state (id) VALUES (1) ON CONFLICT DO NOTHING;
-- Window-based aggregation primitive. Used by both the cron-driven
-- watermark advancer and the offline backfill command, so they stay
-- byte-identical in their semantics. Returns the number of output rows
-- touched.
--
-- IDEMPOTENCY CONTRACT (this is the important bit):
-- For every (bucket_date, workspace_id, runtime_id, provider, model)
-- key that has at least one task_usage row whose `updated_at` falls in
-- [p_from, p_to), this function REPLACES the corresponding daily row
-- with the SUM of *all* task_usage rows for that key (regardless of
-- their updated_at). It does NOT add a delta.
--
-- Consequences:
-- * Replaying the same window is safe — the row is rebuilt from raw
-- each time, so the result converges.
-- * Two callers (cron + backfill) processing overlapping windows is
-- safe — both write the same value.
-- * `UpsertTaskUsage` corrections that overwrite token counts are
-- captured: the corrected row's updated_at gets bumped, the next
-- window picks up its bucket key, and the bucket is recomputed
-- from current truth.
--
-- Cost: the recompute reads ALL task_usage rows for each dirty bucket,
-- not just the windowed slice. In steady state only "today" buckets are
-- dirty (a handful of keys per active runtime), so this stays cheap.
-- During backfill the entire history's bucket keys become dirty once;
-- the backfill walks history in monthly slices to bound the working
-- set per call.
CREATE OR REPLACE FUNCTION rollup_task_usage_daily_window(
p_from TIMESTAMPTZ,
p_to TIMESTAMPTZ
)
RETURNS BIGINT
LANGUAGE plpgsql
AS $$
DECLARE
v_rows BIGINT;
BEGIN
IF p_from >= p_to THEN
RETURN 0;
END IF;
WITH dirty_keys AS (
SELECT DISTINCT
DATE(tu.created_at) AS bucket_date,
i.workspace_id AS workspace_id,
atq.runtime_id AS runtime_id,
tu.provider AS provider,
tu.model AS model
FROM task_usage tu
JOIN agent_task_queue atq ON atq.id = tu.task_id
JOIN issue i ON i.id = atq.issue_id
WHERE atq.runtime_id IS NOT NULL
AND (
-- Steady state: rows updated within the watermark window.
-- Hits idx_task_usage_updated_at directly.
(tu.updated_at >= p_from AND tu.updated_at < p_to)
-- Legacy rows from before migration 072 (updated_at IS NULL)
-- — discoverable via created_at + the partial index added
-- in 077. Steady-state windows after backfill never include
-- historical dates, so this branch is a no-op once the
-- backfill has swept history.
OR (tu.updated_at IS NULL
AND tu.created_at >= p_from
AND tu.created_at < p_to)
)
),
recomputed AS (
SELECT
dk.bucket_date,
dk.workspace_id,
dk.runtime_id,
dk.provider,
dk.model,
SUM(tu.input_tokens)::bigint AS input_tokens,
SUM(tu.output_tokens)::bigint AS output_tokens,
SUM(tu.cache_read_tokens)::bigint AS cache_read_tokens,
SUM(tu.cache_write_tokens)::bigint AS cache_write_tokens,
COUNT(*)::bigint AS event_count
FROM dirty_keys dk
JOIN agent_task_queue atq ON atq.runtime_id = dk.runtime_id
JOIN issue i ON i.id = atq.issue_id
AND i.workspace_id = dk.workspace_id
JOIN task_usage tu ON tu.task_id = atq.id
AND tu.provider = dk.provider
AND tu.model = dk.model
AND DATE(tu.created_at) = dk.bucket_date
GROUP BY 1, 2, 3, 4, 5
)
INSERT INTO task_usage_daily AS d (
bucket_date, workspace_id, runtime_id, provider, model,
input_tokens, output_tokens, cache_read_tokens, cache_write_tokens,
event_count
)
SELECT
bucket_date, workspace_id, runtime_id, provider, model,
input_tokens, output_tokens, cache_read_tokens, cache_write_tokens,
event_count
FROM recomputed
ON CONFLICT (bucket_date, workspace_id, runtime_id, provider, model) DO UPDATE
SET input_tokens = EXCLUDED.input_tokens,
output_tokens = EXCLUDED.output_tokens,
cache_read_tokens = EXCLUDED.cache_read_tokens,
cache_write_tokens = EXCLUDED.cache_write_tokens,
event_count = EXCLUDED.event_count,
updated_at = now();
GET DIAGNOSTICS v_rows = ROW_COUNT;
RETURN v_rows;
END;
$$;
-- Cron entry point. Advances the watermark by one window each call.
--
-- Invariants:
-- * `pg_try_advisory_lock(4242)` serialises overlapping ticks.
-- * The window upper bound is `now() - 5 minutes`. The lag exists
-- because `task_usage` rows are written from a separate transaction;
-- a row with updated_at = T can become visible to this snapshot at
-- some t > T. 5 minutes is a generous bound on that visibility delay
-- and keeps the dashboard "today" bucket at most ~10 min stale
-- (5 min lag + 5 min cron period).
-- * On error we record `last_error` and re-raise; the watermark is NOT
-- advanced because the UPDATE that advances it only runs after the
-- upsert succeeds.
-- * SAFE TO RUN CONCURRENTLY WITH BACKFILL: the window primitive is
-- idempotent (see contract above), so even if cron fires while the
-- offline backfill is also walking history, the worst case is some
-- bucket gets written twice with the same value.
CREATE OR REPLACE FUNCTION rollup_task_usage_daily()
RETURNS BIGINT
LANGUAGE plpgsql
AS $$
DECLARE
v_lock_ok BOOLEAN;
v_from TIMESTAMPTZ;
v_to TIMESTAMPTZ;
v_rows BIGINT := 0;
BEGIN
SELECT pg_try_advisory_lock(4242) INTO v_lock_ok;
IF NOT v_lock_ok THEN
RETURN 0;
END IF;
BEGIN
UPDATE task_usage_rollup_state
SET last_run_started_at = now(),
last_error = NULL
WHERE id = 1
RETURNING watermark_at INTO v_from;
v_to := now() - INTERVAL '5 minutes';
IF v_from < v_to THEN
v_rows := rollup_task_usage_daily_window(v_from, v_to);
UPDATE task_usage_rollup_state
SET watermark_at = v_to,
last_run_finished_at = now(),
last_run_rows = v_rows
WHERE id = 1;
ELSE
UPDATE task_usage_rollup_state
SET last_run_finished_at = now(),
last_run_rows = 0
WHERE id = 1;
END IF;
PERFORM pg_advisory_unlock(4242);
RETURN v_rows;
EXCEPTION WHEN OTHERS THEN
UPDATE task_usage_rollup_state
SET last_error = SQLERRM,
last_run_finished_at = now()
WHERE id = 1;
PERFORM pg_advisory_unlock(4242);
RAISE;
END;
END;
$$;

View File

@@ -0,0 +1 @@
DROP INDEX IF EXISTS idx_task_usage_updated_at;

View File

@@ -0,0 +1,5 @@
-- Drives the rollup worker's "what changed since last tick" scan in
-- 073's window function. CONCURRENTLY avoids blocking writes during
-- build (matches the pattern used in 035/067 for live indexes).
CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_task_usage_updated_at
ON task_usage (updated_at);

View File

@@ -0,0 +1 @@
DROP INDEX IF EXISTS idx_task_usage_created_at;

View File

@@ -0,0 +1,5 @@
-- Helps the two lazy endpoints (ListRuntimeUsageByAgent / GetRuntimeUsageByHour)
-- that still scan the raw `task_usage` table by created_at. CONCURRENTLY
-- avoids blocking writes during build.
CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_task_usage_created_at
ON task_usage (created_at);

View File

@@ -0,0 +1,10 @@
DROP FUNCTION IF EXISTS task_usage_rollup_lag_seconds();
DO $$
BEGIN
IF EXISTS (SELECT 1 FROM pg_extension WHERE extname = 'pg_cron') THEN
PERFORM cron.unschedule('rollup_task_usage_daily')
FROM cron.job WHERE jobname = 'rollup_task_usage_daily';
END IF;
END
$$;

View File

@@ -0,0 +1,46 @@
-- Enable pg_cron extension if available, but DO NOT schedule the rollup
-- job here. Scheduling must happen *after* a successful backfill run, so
-- the cron tick doesn't race the backfill (both write the same daily
-- buckets — the rollup function in 073 is now idempotent, so collisions
-- produce correct values, but we still avoid overlap as a defense in
-- depth + to keep load low during backfill).
--
-- Operator playbook (in deployment runbook):
-- 1) Apply migrations 072..075 (this file is 076).
-- 2) Run `go run ./cmd/backfill_task_usage_daily` — succeeds and
-- stamps the rollup-state watermark.
-- 3) Set USAGE_DAILY_ROLLUP_ENABLED=true on the API and roll out.
-- 4) As superuser:
-- SELECT cron.schedule(
-- 'rollup_task_usage_daily',
-- '*/5 * * * *',
-- $$SELECT rollup_task_usage_daily()$$
-- );
-- 5) As superuser, also schedule cron-log pruning (see notes below).
--
-- The CREATE EXTENSION is wrapped in DO/EXCEPTION so dev/CI environments
-- without `shared_preload_libraries=pg_cron` skip gracefully and the
-- migration still succeeds (mirrors migration 032 pg_bigm pattern).
DO $$
BEGIN
CREATE EXTENSION IF NOT EXISTS pg_cron;
EXCEPTION
WHEN OTHERS THEN
RAISE NOTICE 'pg_cron extension not available; skipping. Schedule rollup_task_usage_daily() via your platform''s scheduling primitive (Kubernetes CronJob, etc.).';
END
$$;
-- Health check helper. Returns NULL if the rollup has never run, or the
-- number of seconds since the last successful tick. Use this from
-- monitoring / alerts:
-- * Alert if NULL for >15 minutes after deployment (cron not scheduled).
-- * Alert if value > 900 seconds (cron stuck or job failing).
CREATE OR REPLACE FUNCTION task_usage_rollup_lag_seconds()
RETURNS DOUBLE PRECISION
LANGUAGE sql
STABLE
AS $$
SELECT EXTRACT(EPOCH FROM (now() - last_run_finished_at))
FROM task_usage_rollup_state
WHERE id = 1;
$$;

View File

@@ -0,0 +1,7 @@
DROP TRIGGER IF EXISTS trg_tu_dirty_rollup ON task_usage;
DROP TRIGGER IF EXISTS trg_atq_dirty_rollup ON agent_task_queue;
DROP FUNCTION IF EXISTS enqueue_task_usage_daily_dirty_for_tu();
DROP FUNCTION IF EXISTS enqueue_task_usage_daily_dirty_for_atq();
DROP TABLE IF EXISTS task_usage_daily_dirty;
-- idx_task_usage_created_at_legacy is owned by 078; do not drop here.
-- The 073 down-migration recreates the older window function definition.

View File

@@ -0,0 +1,269 @@
-- Catch joined-table changes that the `updated_at` watermark in 073 misses.
--
-- The window function in 073 finds dirty buckets via `task_usage.updated_at`.
-- That covers INSERT and UPDATE on `task_usage`, but NOT:
-- 1) DELETE on `task_usage` itself (no row left to discover).
-- 2) Cascade DELETE through `agent_task_queue` (issue/queue rows go away,
-- taking task_usage with them).
-- 3) UPDATE of `agent_task_queue.runtime_id` — used by the runtime
-- consolidation path (`ReassignTasksToRuntime`) — which moves usage
-- from one runtime's bucket to another without touching task_usage.
--
-- Without invalidation, the rollup table diverges from raw task_usage:
-- deleted issues stay billed forever, reassigned tasks stay attributed to
-- the old runtime. The raw-table fallback path doesn't suffer from this,
-- so the two read paths would silently disagree.
--
-- Solution: an explicit `task_usage_daily_dirty` queue table populated by
-- triggers on the joined tables, drained by the rollup window function.
CREATE TABLE task_usage_daily_dirty (
bucket_date DATE NOT NULL,
workspace_id UUID NOT NULL,
runtime_id UUID NOT NULL,
provider TEXT NOT NULL,
model TEXT NOT NULL,
enqueued_at TIMESTAMPTZ NOT NULL DEFAULT now(),
PRIMARY KEY (bucket_date, workspace_id, runtime_id, provider, model)
);
-- Drained by enqueued_at <= cutoff in the window function. Enqueue on
-- conflict updates enqueued_at to GREATEST(existing, new) so that an
-- invalidation arriving DURING a rollup tick (between the function's
-- snapshot and its drain step) keeps an enqueued_at > p_to and
-- survives the drain. Without that, the late invalidation would be
-- silently dropped.
CREATE INDEX idx_task_usage_daily_dirty_enqueued_at
ON task_usage_daily_dirty (enqueued_at);
-- NOTE: The partial index supporting the legacy `updated_at IS NULL`
-- branch in the rollup window function is created in migration 078 with
-- `CREATE INDEX CONCURRENTLY` to avoid blocking writes on the hot
-- task_usage table. Until 078 has been applied, the OR branch falls
-- back to a sequential scan filtered by `updated_at IS NULL`. That is
-- acceptable because the rollup function is only invoked after this
-- migration AND the backfill have run; in steady state no rows have
-- NULL updated_at.
-- Trigger function for agent_task_queue. Two cases:
-- * UPDATE of runtime_id (old != new): usage moves between runtimes.
-- Enqueue both OLD and NEW runtime buckets so both get recomputed.
-- * DELETE: row + its task_usage children are about to vanish.
-- Enqueue OLD runtime buckets so the daily rows get cleared.
-- We resolve workspace_id via `agent` (NOT via `issue`). When a DELETE
-- cascades from issue → agent_task_queue, the issue row is already gone
-- by the time this BEFORE DELETE trigger fires, so a join on `issue`
-- would return zero rows and the enqueue would silently no-op. `agent`
-- has its own ON DELETE CASCADE to atq but is not in the issue cascade
-- chain, so it's still alive.
CREATE OR REPLACE FUNCTION enqueue_task_usage_daily_dirty_for_atq()
RETURNS TRIGGER
LANGUAGE plpgsql
AS $$
BEGIN
IF TG_OP = 'UPDATE' THEN
IF OLD.runtime_id IS DISTINCT FROM NEW.runtime_id THEN
IF OLD.runtime_id IS NOT NULL THEN
INSERT INTO task_usage_daily_dirty (bucket_date, workspace_id, runtime_id, provider, model)
SELECT DISTINCT DATE(tu.created_at), a.workspace_id, OLD.runtime_id, tu.provider, tu.model
FROM task_usage tu
JOIN agent a ON a.id = OLD.agent_id
WHERE tu.task_id = OLD.id
ON CONFLICT (bucket_date, workspace_id, runtime_id, provider, model) DO UPDATE
SET enqueued_at = GREATEST(task_usage_daily_dirty.enqueued_at, EXCLUDED.enqueued_at);
END IF;
IF NEW.runtime_id IS NOT NULL THEN
INSERT INTO task_usage_daily_dirty (bucket_date, workspace_id, runtime_id, provider, model)
SELECT DISTINCT DATE(tu.created_at), a.workspace_id, NEW.runtime_id, tu.provider, tu.model
FROM task_usage tu
JOIN agent a ON a.id = NEW.agent_id
WHERE tu.task_id = NEW.id
ON CONFLICT (bucket_date, workspace_id, runtime_id, provider, model) DO UPDATE
SET enqueued_at = GREATEST(task_usage_daily_dirty.enqueued_at, EXCLUDED.enqueued_at);
END IF;
END IF;
RETURN NEW;
ELSIF TG_OP = 'DELETE' THEN
IF OLD.runtime_id IS NOT NULL THEN
INSERT INTO task_usage_daily_dirty (bucket_date, workspace_id, runtime_id, provider, model)
SELECT DISTINCT DATE(tu.created_at), a.workspace_id, OLD.runtime_id, tu.provider, tu.model
FROM task_usage tu
JOIN agent a ON a.id = OLD.agent_id
WHERE tu.task_id = OLD.id
ON CONFLICT (bucket_date, workspace_id, runtime_id, provider, model) DO UPDATE
SET enqueued_at = GREATEST(task_usage_daily_dirty.enqueued_at, EXCLUDED.enqueued_at);
END IF;
RETURN OLD;
END IF;
RETURN NULL;
END;
$$;
CREATE TRIGGER trg_atq_dirty_rollup
BEFORE UPDATE OF runtime_id OR DELETE ON agent_task_queue
FOR EACH ROW EXECUTE FUNCTION enqueue_task_usage_daily_dirty_for_atq();
-- Trigger function for direct task_usage DELETE (rare — direct cleanup,
-- not via cascade). UPDATE on task_usage is already covered by the
-- updated_at watermark in the window function.
-- workspace_id resolved via agent (see comment on the atq trigger
-- function for why issue is unsafe in cascade contexts).
CREATE OR REPLACE FUNCTION enqueue_task_usage_daily_dirty_for_tu()
RETURNS TRIGGER
LANGUAGE plpgsql
AS $$
BEGIN
INSERT INTO task_usage_daily_dirty (bucket_date, workspace_id, runtime_id, provider, model)
SELECT DATE(OLD.created_at), a.workspace_id, atq.runtime_id, OLD.provider, OLD.model
FROM agent_task_queue atq
JOIN agent a ON a.id = atq.agent_id
WHERE atq.id = OLD.task_id
AND atq.runtime_id IS NOT NULL
ON CONFLICT (bucket_date, workspace_id, runtime_id, provider, model) DO UPDATE
SET enqueued_at = GREATEST(task_usage_daily_dirty.enqueued_at, EXCLUDED.enqueued_at);
RETURN OLD;
END;
$$;
CREATE TRIGGER trg_tu_dirty_rollup
BEFORE DELETE ON task_usage
FOR EACH ROW EXECUTE FUNCTION enqueue_task_usage_daily_dirty_for_tu();
-- Replace the rollup window function to also drain the dirty queue and
-- DELETE buckets that no longer have any source rows.
--
-- Pure-SQL CTE form so multiple calls in the same transaction (tests,
-- backfill scripts) don't collide on temp-table names.
CREATE OR REPLACE FUNCTION rollup_task_usage_daily_window(
p_from TIMESTAMPTZ,
p_to TIMESTAMPTZ
)
RETURNS BIGINT
LANGUAGE plpgsql
AS $$
DECLARE
v_rows BIGINT;
BEGIN
IF p_from >= p_to THEN
RETURN 0;
END IF;
WITH
-- Source 1: rows with updated_at in this window (steady state) plus
-- the legacy-row OR branch for NULL updated_at (covered by partial
-- index idx_task_usage_created_at_legacy from migration 078).
--
-- workspace_id is resolved via `agent`, NOT `issue`, to match the
-- trigger functions above. There is no schema-level FK guaranteeing
-- agent.workspace_id == issue.workspace_id, so mixing the two
-- sources would let dirty_from_updates / recomputed disagree with
-- dirty_from_queue's view of which workspace a task belongs to.
-- Going through agent everywhere keeps trigger / discovery /
-- recompute aligned without leaning on an unenforced invariant.
dirty_from_updates AS (
SELECT DISTINCT
DATE(tu.created_at) AS bucket_date,
a.workspace_id AS workspace_id,
atq.runtime_id AS runtime_id,
tu.provider AS provider,
tu.model AS model
FROM task_usage tu
JOIN agent_task_queue atq ON atq.id = tu.task_id
JOIN agent a ON a.id = atq.agent_id
WHERE atq.runtime_id IS NOT NULL
AND (
(tu.updated_at >= p_from AND tu.updated_at < p_to)
OR (tu.updated_at IS NULL
AND tu.created_at >= p_from
AND tu.created_at < p_to)
)
),
-- Source 2: explicit invalidation queue (deletes + reassignments).
dirty_from_queue AS (
SELECT bucket_date, workspace_id, runtime_id, provider, model
FROM task_usage_daily_dirty
WHERE enqueued_at < p_to
),
dirty_keys AS (
SELECT * FROM dirty_from_updates
UNION
SELECT * FROM dirty_from_queue
),
-- Recompute each dirty bucket from ground truth. Same agent-based
-- workspace resolution as dirty_from_updates above.
recomputed AS (
SELECT
dk.bucket_date,
dk.workspace_id,
dk.runtime_id,
dk.provider,
dk.model,
SUM(tu.input_tokens)::bigint AS input_tokens,
SUM(tu.output_tokens)::bigint AS output_tokens,
SUM(tu.cache_read_tokens)::bigint AS cache_read_tokens,
SUM(tu.cache_write_tokens)::bigint AS cache_write_tokens,
COUNT(*)::bigint AS event_count
FROM dirty_keys dk
JOIN agent_task_queue atq ON atq.runtime_id = dk.runtime_id
JOIN agent a ON a.id = atq.agent_id
AND a.workspace_id = dk.workspace_id
JOIN task_usage tu ON tu.task_id = atq.id
AND tu.provider = dk.provider
AND tu.model = dk.model
AND DATE(tu.created_at) = dk.bucket_date
GROUP BY 1, 2, 3, 4, 5
),
-- REPLACE present buckets.
upserted AS (
INSERT INTO task_usage_daily AS d (
bucket_date, workspace_id, runtime_id, provider, model,
input_tokens, output_tokens, cache_read_tokens, cache_write_tokens,
event_count
)
SELECT
bucket_date, workspace_id, runtime_id, provider, model,
input_tokens, output_tokens, cache_read_tokens, cache_write_tokens,
event_count
FROM recomputed
ON CONFLICT (bucket_date, workspace_id, runtime_id, provider, model) DO UPDATE
SET input_tokens = EXCLUDED.input_tokens,
output_tokens = EXCLUDED.output_tokens,
cache_read_tokens = EXCLUDED.cache_read_tokens,
cache_write_tokens = EXCLUDED.cache_write_tokens,
event_count = EXCLUDED.event_count,
updated_at = now()
RETURNING 1
),
-- DELETE buckets that are dirty but have no source rows anymore.
-- Important: USING dirty_keys (not recomputed) so we can detect
-- "all source rows gone" — if recomputed has no row for a key, the
-- bucket is empty and should be removed.
deleted_empty AS (
DELETE FROM task_usage_daily d
USING dirty_keys dk
WHERE d.bucket_date = dk.bucket_date
AND d.workspace_id = dk.workspace_id
AND d.runtime_id = dk.runtime_id
AND d.provider = dk.provider
AND d.model = dk.model
AND NOT EXISTS (
SELECT 1 FROM recomputed r
WHERE r.bucket_date = dk.bucket_date
AND r.workspace_id = dk.workspace_id
AND r.runtime_id = dk.runtime_id
AND r.provider = dk.provider
AND r.model = dk.model
)
RETURNING 1
)
SELECT (SELECT COUNT(*) FROM upserted) + (SELECT COUNT(*) FROM deleted_empty)
INTO v_rows;
-- Drain the consumed dirty queue rows. Anything enqueued AFTER p_to
-- stays for the next call — keeps the contract aligned with the
-- watermark.
DELETE FROM task_usage_daily_dirty WHERE enqueued_at < p_to;
RETURN v_rows;
END;
$$;

View File

@@ -0,0 +1 @@
DROP INDEX CONCURRENTLY IF EXISTS idx_task_usage_created_at_legacy;

View File

@@ -0,0 +1,11 @@
-- Partial index supporting the rollup window function's legacy NULL
-- branch (072 added `updated_at` as nullable; rows that existed before
-- the column was added stay NULL until either backfill replaces them or
-- a subsequent UpsertTaskUsage refreshes them).
--
-- Built CONCURRENTLY because task_usage is a hot, large table — same
-- pattern as 074/075. Run this AFTER 077 is applied and BEFORE turning
-- on the read-path feature flag / scheduling pg_cron.
CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_task_usage_created_at_legacy
ON task_usage (created_at)
WHERE updated_at IS NULL;

View File

@@ -421,6 +421,39 @@ type TaskUsage struct {
CacheReadTokens int64 `json:"cache_read_tokens"`
CacheWriteTokens int64 `json:"cache_write_tokens"`
CreatedAt pgtype.Timestamptz `json:"created_at"`
UpdatedAt pgtype.Timestamptz `json:"updated_at"`
}
type TaskUsageDaily struct {
BucketDate pgtype.Date `json:"bucket_date"`
WorkspaceID pgtype.UUID `json:"workspace_id"`
RuntimeID pgtype.UUID `json:"runtime_id"`
Provider string `json:"provider"`
Model string `json:"model"`
InputTokens int64 `json:"input_tokens"`
OutputTokens int64 `json:"output_tokens"`
CacheReadTokens int64 `json:"cache_read_tokens"`
CacheWriteTokens int64 `json:"cache_write_tokens"`
EventCount int64 `json:"event_count"`
UpdatedAt pgtype.Timestamptz `json:"updated_at"`
}
type TaskUsageDailyDirty struct {
BucketDate pgtype.Date `json:"bucket_date"`
WorkspaceID pgtype.UUID `json:"workspace_id"`
RuntimeID pgtype.UUID `json:"runtime_id"`
Provider string `json:"provider"`
Model string `json:"model"`
EnqueuedAt pgtype.Timestamptz `json:"enqueued_at"`
}
type TaskUsageRollupState struct {
ID int16 `json:"id"`
WatermarkAt pgtype.Timestamptz `json:"watermark_at"`
LastRunStartedAt pgtype.Timestamptz `json:"last_run_started_at"`
LastRunFinishedAt pgtype.Timestamptz `json:"last_run_finished_at"`
LastRunRows int64 `json:"last_run_rows"`
LastError pgtype.Text `json:"last_error"`
}
type User struct {

View File

@@ -141,10 +141,11 @@ type ListRuntimeUsageRow struct {
CacheWriteTokens int64 `json:"cache_write_tokens"`
}
// Bucket by tu.created_at (usage report time, ~= task completion time), not
// atq.created_at (task enqueue time), so tasks that queue one day and execute
// the next are attributed to the day tokens were actually produced. The since
// cutoff is truncated to start-of-day so `days=N` yields full calendar days.
// Reads from raw `task_usage`, bucketed by DATE(tu.created_at) — usage
// report time, ~= task completion time. Since cutoff is truncated to
// start-of-day so `days=N` yields full calendar days. This is the
// always-correct fallback path; used when USAGE_DAILY_ROLLUP_ENABLED
// is false (or the rollup hasn't been deployed yet).
func (q *Queries) ListRuntimeUsage(ctx context.Context, arg ListRuntimeUsageParams) ([]ListRuntimeUsageRow, error) {
rows, err := q.db.Query(ctx, listRuntimeUsage, arg.RuntimeID, arg.Since)
if err != nil {
@@ -238,3 +239,76 @@ func (q *Queries) ListRuntimeUsageByAgent(ctx context.Context, arg ListRuntimeUs
}
return items, nil
}
const listRuntimeUsageDaily = `-- name: ListRuntimeUsageDaily :many
SELECT
bucket_date AS date,
provider,
model,
SUM(input_tokens)::bigint AS input_tokens,
SUM(output_tokens)::bigint AS output_tokens,
SUM(cache_read_tokens)::bigint AS cache_read_tokens,
SUM(cache_write_tokens)::bigint AS cache_write_tokens
FROM task_usage_daily
WHERE runtime_id = $1
AND bucket_date >= DATE(DATE_TRUNC('day', $2::timestamptz))
GROUP BY bucket_date, provider, model
ORDER BY bucket_date DESC, provider, model
`
type ListRuntimeUsageDailyParams struct {
RuntimeID pgtype.UUID `json:"runtime_id"`
Since pgtype.Timestamptz `json:"since"`
}
type ListRuntimeUsageDailyRow struct {
Date pgtype.Date `json:"date"`
Provider string `json:"provider"`
Model string `json:"model"`
InputTokens int64 `json:"input_tokens"`
OutputTokens int64 `json:"output_tokens"`
CacheReadTokens int64 `json:"cache_read_tokens"`
CacheWriteTokens int64 `json:"cache_write_tokens"`
}
// Reads from the `task_usage_daily` rollup table maintained by
// rollup_task_usage_daily() (scheduled every 5 min via pg_cron, or any
// equivalent external scheduler that calls the function). Same shape as
// ListRuntimeUsage above. Today's bucket may lag the raw table by up to
// ~10 min (5 min cron period + 5 min rollup safety lag); intentional.
//
// Only used when USAGE_DAILY_ROLLUP_ENABLED is true AND deploy has
// verified that the rollup is fresh (see task_usage_rollup_lag_seconds
// helper from migration 076).
//
// The PK on task_usage_daily already collapses to one row per
// (bucket_date, runtime_id, provider, model), but SUM/GROUP BY is kept
// so future schema changes (extra dimensions promoted into the table)
// don't silently change query semantics.
func (q *Queries) ListRuntimeUsageDaily(ctx context.Context, arg ListRuntimeUsageDailyParams) ([]ListRuntimeUsageDailyRow, error) {
rows, err := q.db.Query(ctx, listRuntimeUsageDaily, arg.RuntimeID, arg.Since)
if err != nil {
return nil, err
}
defer rows.Close()
items := []ListRuntimeUsageDailyRow{}
for rows.Next() {
var i ListRuntimeUsageDailyRow
if err := rows.Scan(
&i.Date,
&i.Provider,
&i.Model,
&i.InputTokens,
&i.OutputTokens,
&i.CacheReadTokens,
&i.CacheWriteTokens,
); err != nil {
return nil, err
}
items = append(items, i)
}
if err := rows.Err(); err != nil {
return nil, err
}
return items, nil
}

View File

@@ -45,7 +45,7 @@ func (q *Queries) GetIssueUsageSummary(ctx context.Context, issueID pgtype.UUID)
}
const getTaskUsage = `-- name: GetTaskUsage :many
SELECT id, task_id, provider, model, input_tokens, output_tokens, cache_read_tokens, cache_write_tokens, created_at FROM task_usage
SELECT id, task_id, provider, model, input_tokens, output_tokens, cache_read_tokens, cache_write_tokens, created_at, updated_at FROM task_usage
WHERE task_id = $1
ORDER BY model
`
@@ -69,6 +69,7 @@ func (q *Queries) GetTaskUsage(ctx context.Context, taskID pgtype.UUID) ([]TaskU
&i.CacheReadTokens,
&i.CacheWriteTokens,
&i.CreatedAt,
&i.UpdatedAt,
); err != nil {
return nil, err
}
@@ -206,14 +207,15 @@ func (q *Queries) GetWorkspaceUsageSummary(ctx context.Context, arg GetWorkspace
}
const upsertTaskUsage = `-- name: UpsertTaskUsage :exec
INSERT INTO task_usage (task_id, provider, model, input_tokens, output_tokens, cache_read_tokens, cache_write_tokens)
VALUES ($1, $2, $3, $4, $5, $6, $7)
INSERT INTO task_usage (task_id, provider, model, input_tokens, output_tokens, cache_read_tokens, cache_write_tokens, updated_at)
VALUES ($1, $2, $3, $4, $5, $6, $7, now())
ON CONFLICT (task_id, provider, model)
DO UPDATE SET
input_tokens = EXCLUDED.input_tokens,
output_tokens = EXCLUDED.output_tokens,
cache_read_tokens = EXCLUDED.cache_read_tokens,
cache_write_tokens = EXCLUDED.cache_write_tokens
cache_write_tokens = EXCLUDED.cache_write_tokens,
updated_at = now()
`
type UpsertTaskUsageParams struct {
@@ -226,6 +228,10 @@ type UpsertTaskUsageParams struct {
CacheWriteTokens int64 `json:"cache_write_tokens"`
}
// Bumps `updated_at` on INSERT and on conflict so the daily-rollup worker
// (migration 073) detects the row as dirty and re-aggregates its bucket.
// Without the conflict-side bump, a correction to historical token counts
// would never propagate to the rollup.
func (q *Queries) UpsertTaskUsage(ctx context.Context, arg UpsertTaskUsageParams) error {
_, err := q.db.Exec(ctx, upsertTaskUsage,
arg.TaskID,

View File

@@ -1,8 +1,9 @@
-- name: ListRuntimeUsage :many
-- Bucket by tu.created_at (usage report time, ~= task completion time), not
-- atq.created_at (task enqueue time), so tasks that queue one day and execute
-- the next are attributed to the day tokens were actually produced. The since
-- cutoff is truncated to start-of-day so `days=N` yields full calendar days.
-- Reads from raw `task_usage`, bucketed by DATE(tu.created_at) — usage
-- report time, ~= task completion time. Since cutoff is truncated to
-- start-of-day so `days=N` yields full calendar days. This is the
-- always-correct fallback path; used when USAGE_DAILY_ROLLUP_ENABLED
-- is false (or the rollup hasn't been deployed yet).
SELECT
DATE(tu.created_at) AS date,
tu.provider,
@@ -18,6 +19,35 @@ WHERE atq.runtime_id = $1
GROUP BY DATE(tu.created_at), tu.provider, tu.model
ORDER BY DATE(tu.created_at) DESC, tu.provider, tu.model;
-- name: ListRuntimeUsageDaily :many
-- Reads from the `task_usage_daily` rollup table maintained by
-- rollup_task_usage_daily() (scheduled every 5 min via pg_cron, or any
-- equivalent external scheduler that calls the function). Same shape as
-- ListRuntimeUsage above. Today's bucket may lag the raw table by up to
-- ~10 min (5 min cron period + 5 min rollup safety lag); intentional.
--
-- Only used when USAGE_DAILY_ROLLUP_ENABLED is true AND deploy has
-- verified that the rollup is fresh (see task_usage_rollup_lag_seconds
-- helper from migration 076).
--
-- The PK on task_usage_daily already collapses to one row per
-- (bucket_date, runtime_id, provider, model), but SUM/GROUP BY is kept
-- so future schema changes (extra dimensions promoted into the table)
-- don't silently change query semantics.
SELECT
bucket_date AS date,
provider,
model,
SUM(input_tokens)::bigint AS input_tokens,
SUM(output_tokens)::bigint AS output_tokens,
SUM(cache_read_tokens)::bigint AS cache_read_tokens,
SUM(cache_write_tokens)::bigint AS cache_write_tokens
FROM task_usage_daily
WHERE runtime_id = $1
AND bucket_date >= DATE(DATE_TRUNC('day', @since::timestamptz))
GROUP BY bucket_date, provider, model
ORDER BY bucket_date DESC, provider, model;
-- name: GetRuntimeTaskHourlyActivity :many
SELECT EXTRACT(HOUR FROM started_at)::int AS hour, COUNT(*)::int AS count
FROM agent_task_queue

View File

@@ -1,12 +1,17 @@
-- name: UpsertTaskUsage :exec
INSERT INTO task_usage (task_id, provider, model, input_tokens, output_tokens, cache_read_tokens, cache_write_tokens)
VALUES ($1, $2, $3, $4, $5, $6, $7)
-- Bumps `updated_at` on INSERT and on conflict so the daily-rollup worker
-- (migration 073) detects the row as dirty and re-aggregates its bucket.
-- Without the conflict-side bump, a correction to historical token counts
-- would never propagate to the rollup.
INSERT INTO task_usage (task_id, provider, model, input_tokens, output_tokens, cache_read_tokens, cache_write_tokens, updated_at)
VALUES ($1, $2, $3, $4, $5, $6, $7, now())
ON CONFLICT (task_id, provider, model)
DO UPDATE SET
input_tokens = EXCLUDED.input_tokens,
output_tokens = EXCLUDED.output_tokens,
cache_read_tokens = EXCLUDED.cache_read_tokens,
cache_write_tokens = EXCLUDED.cache_write_tokens;
cache_write_tokens = EXCLUDED.cache_write_tokens,
updated_at = now();
-- name: GetTaskUsage :many
SELECT * FROM task_usage