mirror of
https://github.com/multica-ai/multica.git
synced 2026-06-26 17:09:14 +02:00
Compare commits
5 Commits
codex/agen
...
fix/task-u
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
d76798d581 | ||
|
|
929f25207c | ||
|
|
45dd094d61 | ||
|
|
5b82370061 | ||
|
|
1e0195d270 |
143
server/cmd/backfill_task_usage_daily/main.go
Normal file
143
server/cmd/backfill_task_usage_daily/main.go
Normal file
@@ -0,0 +1,143 @@
|
||||
// Command backfill_task_usage_daily seeds the `task_usage_daily` rollup table
|
||||
// from the historical contents of `task_usage`. Intended to be run once
|
||||
// after migrations 072..078 are applied, before flipping the
|
||||
// USAGE_DAILY_ROLLUP_ENABLED feature flag and before scheduling the
|
||||
// pg_cron job. The cron schedule is intentionally NOT created by a
|
||||
// migration (see 076 header) — operators run this backfill, then
|
||||
// schedule cron, then enable the read-path flag.
|
||||
//
|
||||
// Strategy:
|
||||
// 1. Walk the time range covered by task_usage in monthly slices.
|
||||
// 2. For each slice, call rollup_task_usage_daily_window(p_from, p_to)
|
||||
// — the same primitive the cron path uses, so semantics are
|
||||
// guaranteed identical.
|
||||
// 3. After all slices succeed, advance task_usage_rollup_state.watermark_at
|
||||
// to (now() - 5 minutes) so the cron tick that follows doesn't
|
||||
// reprocess the same window we just rolled up.
|
||||
//
|
||||
// Re-running IS safe. The window function (introduced in 073, refined in
|
||||
// 077) recomputes each dirty bucket from raw and REPLACES the daily row,
|
||||
// so rerunning a slice produces the same result. Use this property to
|
||||
// recover from partial backfill failures without TRUNCATEing
|
||||
// task_usage_daily first.
|
||||
package main
|
||||
|
||||
import (
|
||||
"context"
|
||||
"flag"
|
||||
"fmt"
|
||||
"log/slog"
|
||||
"os"
|
||||
"time"
|
||||
|
||||
"github.com/jackc/pgx/v5/pgtype"
|
||||
"github.com/jackc/pgx/v5/pgxpool"
|
||||
"github.com/multica-ai/multica/server/internal/logger"
|
||||
)
|
||||
|
||||
func main() {
|
||||
logger.Init()
|
||||
|
||||
var (
|
||||
dryRun = flag.Bool("dry-run", false, "log slices that would be processed without touching task_usage_daily")
|
||||
monthsBack = flag.Int("months-back", 0, "limit backfill to the last N months (0 = all available history)")
|
||||
)
|
||||
flag.Parse()
|
||||
|
||||
dbURL := os.Getenv("DATABASE_URL")
|
||||
if dbURL == "" {
|
||||
dbURL = "postgres://multica:multica@localhost:5432/multica?sslmode=disable"
|
||||
}
|
||||
|
||||
ctx := context.Background()
|
||||
pool, err := pgxpool.New(ctx, dbURL)
|
||||
if err != nil {
|
||||
slog.Error("unable to connect to database", "error", err)
|
||||
os.Exit(1)
|
||||
}
|
||||
defer pool.Close()
|
||||
|
||||
if err := pool.Ping(ctx); err != nil {
|
||||
slog.Error("unable to ping database", "error", err)
|
||||
os.Exit(1)
|
||||
}
|
||||
|
||||
// Discover the time range we need to cover. If task_usage is empty
|
||||
// we just stamp the watermark to now and exit.
|
||||
var minTS, maxTS pgtype.Timestamptz
|
||||
if err := pool.QueryRow(ctx, `SELECT MIN(created_at), MAX(created_at) FROM task_usage`).Scan(&minTS, &maxTS); err != nil {
|
||||
slog.Error("scan task_usage time range", "error", err)
|
||||
os.Exit(1)
|
||||
}
|
||||
if !minTS.Valid {
|
||||
slog.Info("task_usage is empty; nothing to backfill")
|
||||
stampWatermark(ctx, pool)
|
||||
return
|
||||
}
|
||||
|
||||
from := monthFloor(minTS.Time.UTC())
|
||||
end := monthFloor(maxTS.Time.UTC()).AddDate(0, 1, 0)
|
||||
|
||||
if *monthsBack > 0 {
|
||||
cutoff := monthFloor(time.Now().UTC()).AddDate(0, -(*monthsBack), 0)
|
||||
if cutoff.After(from) {
|
||||
from = cutoff
|
||||
}
|
||||
}
|
||||
|
||||
slog.Info("backfill range", "from", from.Format(time.RFC3339), "to", end.Format(time.RFC3339), "dry_run", *dryRun)
|
||||
|
||||
cursor := from
|
||||
var totalRows int64
|
||||
for cursor.Before(end) {
|
||||
next := cursor.AddDate(0, 1, 0)
|
||||
if *dryRun {
|
||||
slog.Info("would roll up slice", "from", cursor.Format(time.RFC3339), "to", next.Format(time.RFC3339))
|
||||
cursor = next
|
||||
continue
|
||||
}
|
||||
var rows int64
|
||||
err := pool.QueryRow(
|
||||
ctx,
|
||||
`SELECT rollup_task_usage_daily_window($1::timestamptz, $2::timestamptz)`,
|
||||
cursor, next,
|
||||
).Scan(&rows)
|
||||
if err != nil {
|
||||
slog.Error("rollup slice failed", "from", cursor.Format(time.RFC3339), "to", next.Format(time.RFC3339), "error", err)
|
||||
os.Exit(1)
|
||||
}
|
||||
totalRows += rows
|
||||
slog.Info("rolled up slice", "from", cursor.Format(time.RFC3339), "to", next.Format(time.RFC3339), "rows_touched", rows)
|
||||
cursor = next
|
||||
}
|
||||
|
||||
if !*dryRun {
|
||||
stampWatermark(ctx, pool)
|
||||
}
|
||||
slog.Info("backfill complete", "total_rows_touched", totalRows)
|
||||
}
|
||||
|
||||
// stampWatermark moves the rollup state's watermark to (now() - 5 min)
|
||||
// so the next cron tick picks up only events newer than the backfill's
|
||||
// upper bound. Mirrors the same lag the cron uses to avoid racing
|
||||
// not-yet-visible inserts.
|
||||
func stampWatermark(ctx context.Context, pool *pgxpool.Pool) {
|
||||
tag, err := pool.Exec(ctx, `
|
||||
UPDATE task_usage_rollup_state
|
||||
SET watermark_at = now() - INTERVAL '5 minutes'
|
||||
WHERE id = 1
|
||||
`)
|
||||
if err != nil {
|
||||
slog.Error("stamp watermark failed", "error", err)
|
||||
os.Exit(1)
|
||||
}
|
||||
if tag.RowsAffected() == 0 {
|
||||
slog.Warn("no rollup state row to stamp; was migration 073 applied?")
|
||||
return
|
||||
}
|
||||
fmt.Println("watermark stamped to now() - 5 minutes")
|
||||
}
|
||||
|
||||
// monthFloor returns midnight UTC on the first day of the UTC calendar
// month that contains t.
//
// t is converted to UTC before its year and month are read, so an instant
// expressed in another zone (for example 2024-01-31 23:00 at UTC-5, which
// is 2024-02-01 04:00 UTC) is floored to the month it falls in on the UTC
// calendar rather than the month shown on its local wall clock.
func monthFloor(t time.Time) time.Time {
	u := t.UTC()
	return time.Date(u.Year(), u.Month(), 1, 0, 0, 0, 0, time.UTC)
}
|
||||
@@ -102,9 +102,10 @@ func NewRouterWithOptions(pool *pgxpool.Pool, hub *realtime.Hub, bus *events.Bus
|
||||
cfSigner := auth.NewCloudFrontSignerFromEnv()
|
||||
|
||||
signupConfig := handler.Config{
|
||||
AllowSignup: os.Getenv("ALLOW_SIGNUP") != "false",
|
||||
AllowedEmails: splitAndTrim(os.Getenv("ALLOWED_EMAILS")),
|
||||
AllowedEmailDomains: splitAndTrim(os.Getenv("ALLOWED_EMAIL_DOMAINS")),
|
||||
AllowSignup: os.Getenv("ALLOW_SIGNUP") != "false",
|
||||
AllowedEmails: splitAndTrim(os.Getenv("ALLOWED_EMAILS")),
|
||||
AllowedEmailDomains: splitAndTrim(os.Getenv("ALLOWED_EMAIL_DOMAINS")),
|
||||
UseDailyRollupForRuntimeUsage: os.Getenv("USAGE_DAILY_ROLLUP_ENABLED") == "true",
|
||||
}
|
||||
h := handler.New(queries, pool, hub, bus, emailSvc, store, cfSigner, analyticsClient, signupConfig, daemonHub)
|
||||
if opts.DaemonWakeup != nil {
|
||||
|
||||
@@ -47,6 +47,16 @@ type Config struct {
|
||||
AllowSignup bool
|
||||
AllowedEmails []string
|
||||
AllowedEmailDomains []string
|
||||
// UseDailyRollupForRuntimeUsage routes ListRuntimeUsage to the
|
||||
// task_usage_daily rollup table when true. Default false: the read
|
||||
// path stays on the raw task_usage stream so rollup-related issues
|
||||
// (pg_cron not running, backfill not yet performed, watermark stuck)
|
||||
// can never make the dashboard return empty/stale data. Operators
|
||||
// flip this on per environment AFTER:
|
||||
// 1) migrations 072..078 applied,
|
||||
// 2) backfill_task_usage_daily ran successfully,
|
||||
// 3) cron job scheduled and task_usage_rollup_lag_seconds() < 900.
|
||||
UseDailyRollupForRuntimeUsage bool
|
||||
}
|
||||
|
||||
type Handler struct {
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
package handler
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"log/slog"
|
||||
"net/http"
|
||||
@@ -96,17 +97,53 @@ func (h *Handler) GetRuntimeUsage(w http.ResponseWriter, r *http.Request) {
|
||||
|
||||
since := parseSinceParam(r, 90)
|
||||
|
||||
rows, err := h.Queries.ListRuntimeUsage(r.Context(), db.ListRuntimeUsageParams{
|
||||
RuntimeID: rt.ID,
|
||||
Since: since,
|
||||
})
|
||||
resp, err := h.listRuntimeUsage(r.Context(), rt.ID, since)
|
||||
if err != nil {
|
||||
writeError(w, http.StatusInternalServerError, "failed to list usage")
|
||||
return
|
||||
}
|
||||
|
||||
writeJSON(w, http.StatusOK, resp)
|
||||
}
|
||||
|
||||
// listRuntimeUsage dispatches between the raw task_usage scan and the
|
||||
// task_usage_daily rollup based on the UseDailyRollupForRuntimeUsage
|
||||
// feature flag. Both code paths return rows in the same shape, so the
|
||||
// handler doesn't care which one ran.
|
||||
func (h *Handler) listRuntimeUsage(ctx context.Context, runtimeID pgtype.UUID, since pgtype.Timestamptz) ([]RuntimeUsageResponse, error) {
|
||||
resolvedRuntimeID := uuidToString(runtimeID)
|
||||
if h.cfg.UseDailyRollupForRuntimeUsage {
|
||||
rows, err := h.Queries.ListRuntimeUsageDaily(ctx, db.ListRuntimeUsageDailyParams{
|
||||
RuntimeID: runtimeID,
|
||||
Since: since,
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
resp := make([]RuntimeUsageResponse, len(rows))
|
||||
for i, row := range rows {
|
||||
resp[i] = RuntimeUsageResponse{
|
||||
RuntimeID: resolvedRuntimeID,
|
||||
Date: row.Date.Time.Format("2006-01-02"),
|
||||
Provider: row.Provider,
|
||||
Model: row.Model,
|
||||
InputTokens: row.InputTokens,
|
||||
OutputTokens: row.OutputTokens,
|
||||
CacheReadTokens: row.CacheReadTokens,
|
||||
CacheWriteTokens: row.CacheWriteTokens,
|
||||
}
|
||||
}
|
||||
return resp, nil
|
||||
}
|
||||
|
||||
rows, err := h.Queries.ListRuntimeUsage(ctx, db.ListRuntimeUsageParams{
|
||||
RuntimeID: runtimeID,
|
||||
Since: since,
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
resp := make([]RuntimeUsageResponse, len(rows))
|
||||
resolvedRuntimeID := uuidToString(rt.ID)
|
||||
for i, row := range rows {
|
||||
resp[i] = RuntimeUsageResponse{
|
||||
RuntimeID: resolvedRuntimeID,
|
||||
@@ -119,8 +156,7 @@ func (h *Handler) GetRuntimeUsage(w http.ResponseWriter, r *http.Request) {
|
||||
CacheWriteTokens: row.CacheWriteTokens,
|
||||
}
|
||||
}
|
||||
|
||||
writeJSON(w, http.StatusOK, resp)
|
||||
return resp, nil
|
||||
}
|
||||
|
||||
// GetRuntimeTaskActivity returns hourly task activity distribution for a runtime.
|
||||
|
||||
527
server/internal/handler/runtime_rollup_test.go
Normal file
527
server/internal/handler/runtime_rollup_test.go
Normal file
@@ -0,0 +1,527 @@
|
||||
package handler
|
||||
|
||||
import (
|
||||
"context"
|
||||
"testing"
|
||||
"time"
|
||||
)
|
||||
|
||||
// TestRollupTaskUsageDaily_AggregatesAndIsIdempotent exercises the
|
||||
// rollup_task_usage_daily_window() SQL function directly. This is the
|
||||
// shared aggregation primitive used by both the cron-driven watermark
|
||||
// loop and the offline backfill command, so its correctness underpins
|
||||
// the entire ListRuntimeUsage read path. Two properties matter:
|
||||
//
|
||||
// 1. It correctly groups raw `task_usage` rows by (date, runtime,
|
||||
// workspace, provider, model) and sums the four token columns.
|
||||
// 2. Re-aggregating an already-rolled-up window is *idempotent*: the
|
||||
// function recomputes each dirty bucket from ground truth and
|
||||
// REPLACES the daily row, so overlap with backfill / replay is
|
||||
// safe and corrections via UpsertTaskUsage propagate cleanly.
|
||||
func TestRollupTaskUsageDaily_AggregatesAndIsIdempotent(t *testing.T) {
|
||||
if testHandler == nil {
|
||||
t.Skip("database not available")
|
||||
}
|
||||
ctx := context.Background()
|
||||
|
||||
runtimeID := handlerTestRuntimeID(t)
|
||||
var agentID string
|
||||
if err := testPool.QueryRow(ctx, `
|
||||
SELECT id FROM agent WHERE workspace_id = $1 LIMIT 1
|
||||
`, testWorkspaceID).Scan(&agentID); err != nil {
|
||||
t.Fatalf("fetch agent: %v", err)
|
||||
}
|
||||
|
||||
var issueID string
|
||||
if err := testPool.QueryRow(ctx, `
|
||||
INSERT INTO issue (workspace_id, title, creator_id, creator_type)
|
||||
VALUES ($1, 'rollup test', $2, 'member')
|
||||
RETURNING id
|
||||
`, testWorkspaceID, testUserID).Scan(&issueID); err != nil {
|
||||
t.Fatalf("create issue: %v", err)
|
||||
}
|
||||
t.Cleanup(func() {
|
||||
testPool.Exec(ctx, `DELETE FROM issue WHERE id = $1`, issueID)
|
||||
})
|
||||
|
||||
// Pin the test to a fixed historical day so we don't collide with
|
||||
// concurrent rollups of "today" running against the same fixture
|
||||
// runtime. 2020-06-15 is far outside any backfill window the rest
|
||||
// of the suite touches.
|
||||
day := time.Date(2020, 6, 15, 0, 0, 0, 0, time.UTC)
|
||||
|
||||
// Two rows on the same (date, provider, model) — must collapse to
|
||||
// a single output row whose totals sum the inputs.
|
||||
insertUsage := func(usageAt time.Time, model string, in, out int64) {
|
||||
var taskID string
|
||||
if err := testPool.QueryRow(ctx, `
|
||||
INSERT INTO agent_task_queue (agent_id, issue_id, runtime_id, status, created_at)
|
||||
VALUES ($1, $2, $3, 'completed', $4)
|
||||
RETURNING id
|
||||
`, agentID, issueID, runtimeID, usageAt).Scan(&taskID); err != nil {
|
||||
t.Fatalf("insert task: %v", err)
|
||||
}
|
||||
if _, err := testPool.Exec(ctx, `
|
||||
INSERT INTO task_usage (task_id, provider, model, input_tokens, output_tokens, created_at, updated_at)
|
||||
VALUES ($1, 'claude', $2, $3, $4, $5, $5)
|
||||
`, taskID, model, in, out, usageAt); err != nil {
|
||||
t.Fatalf("insert task_usage: %v", err)
|
||||
}
|
||||
t.Cleanup(func() {
|
||||
testPool.Exec(ctx, `DELETE FROM agent_task_queue WHERE id = $1`, taskID)
|
||||
})
|
||||
}
|
||||
|
||||
insertUsage(day.Add(1*time.Hour), "claude-3-5-sonnet", 100, 10)
|
||||
insertUsage(day.Add(2*time.Hour), "claude-3-5-sonnet", 200, 20)
|
||||
// A second model on the same day must produce a *separate* output
|
||||
// row (different group key).
|
||||
insertUsage(day.Add(3*time.Hour), "claude-3-5-haiku", 50, 5)
|
||||
|
||||
t.Cleanup(func() {
|
||||
testPool.Exec(ctx, `DELETE FROM task_usage_daily WHERE runtime_id = $1 AND bucket_date = $2::date`, runtimeID, day)
|
||||
})
|
||||
|
||||
// --- 1) Initial aggregation produces the expected totals.
|
||||
if _, err := testPool.Exec(ctx, `
|
||||
SELECT rollup_task_usage_daily_window($1::timestamptz, $2::timestamptz)
|
||||
`, day, day.Add(24*time.Hour)); err != nil {
|
||||
t.Fatalf("rollup_task_usage_daily_window: %v", err)
|
||||
}
|
||||
|
||||
type row struct {
|
||||
Model string
|
||||
InputTokens int64
|
||||
Output int64
|
||||
EventCount int64
|
||||
}
|
||||
read := func() map[string]row {
|
||||
rs, err := testPool.Query(ctx, `
|
||||
SELECT model, input_tokens, output_tokens, event_count
|
||||
FROM task_usage_daily
|
||||
WHERE runtime_id = $1 AND bucket_date = $2::date
|
||||
`, runtimeID, day)
|
||||
if err != nil {
|
||||
t.Fatalf("read task_usage_daily: %v", err)
|
||||
}
|
||||
defer rs.Close()
|
||||
out := map[string]row{}
|
||||
for rs.Next() {
|
||||
var r row
|
||||
if err := rs.Scan(&r.Model, &r.InputTokens, &r.Output, &r.EventCount); err != nil {
|
||||
t.Fatalf("scan: %v", err)
|
||||
}
|
||||
out[r.Model] = r
|
||||
}
|
||||
return out
|
||||
}
|
||||
|
||||
got := read()
|
||||
if len(got) != 2 {
|
||||
t.Fatalf("expected 2 rows (one per model), got %d: %+v", len(got), got)
|
||||
}
|
||||
if got["claude-3-5-sonnet"].InputTokens != 300 || got["claude-3-5-sonnet"].Output != 30 || got["claude-3-5-sonnet"].EventCount != 2 {
|
||||
t.Errorf("sonnet bucket wrong: %+v", got["claude-3-5-sonnet"])
|
||||
}
|
||||
if got["claude-3-5-haiku"].InputTokens != 50 || got["claude-3-5-haiku"].Output != 5 || got["claude-3-5-haiku"].EventCount != 1 {
|
||||
t.Errorf("haiku bucket wrong: %+v", got["claude-3-5-haiku"])
|
||||
}
|
||||
|
||||
// --- 2) Re-aggregating the same window is idempotent.
|
||||
// The new function recomputes each dirty bucket from ground truth and
|
||||
// REPLACES the daily row, so callers can safely overlap windows
|
||||
// (cron + backfill, replay, manual ops). Verifying it explicitly so
|
||||
// the property doesn't silently regress.
|
||||
if _, err := testPool.Exec(ctx, `
|
||||
SELECT rollup_task_usage_daily_window($1::timestamptz, $2::timestamptz)
|
||||
`, day, day.Add(24*time.Hour)); err != nil {
|
||||
t.Fatalf("rollup_task_usage_daily_window (second call): %v", err)
|
||||
}
|
||||
got = read()
|
||||
if got["claude-3-5-sonnet"].InputTokens != 300 || got["claude-3-5-sonnet"].EventCount != 2 {
|
||||
t.Errorf("after second call, sonnet should be unchanged (idempotent), got: %+v", got["claude-3-5-sonnet"])
|
||||
}
|
||||
if got["claude-3-5-haiku"].InputTokens != 50 || got["claude-3-5-haiku"].EventCount != 1 {
|
||||
t.Errorf("after second call, haiku should be unchanged (idempotent), got: %+v", got["claude-3-5-haiku"])
|
||||
}
|
||||
|
||||
// --- 3) Correction propagates: bumping a row's updated_at into a
|
||||
// new window must cause the bucket to be recomputed from ground
|
||||
// truth (covers the UpsertTaskUsage correction path that the old
|
||||
// additive design dropped silently).
|
||||
correctionMark := time.Date(2024, 1, 1, 0, 0, 0, 0, time.UTC)
|
||||
if _, err := testPool.Exec(ctx, `
|
||||
UPDATE task_usage SET input_tokens = 1000, updated_at = $1
|
||||
WHERE task_id IN (
|
||||
SELECT id FROM agent_task_queue WHERE runtime_id = $2 AND created_at::date = $3::date
|
||||
)
|
||||
AND model = 'claude-3-5-sonnet'
|
||||
AND input_tokens = 100
|
||||
`, correctionMark, runtimeID, day); err != nil {
|
||||
t.Fatalf("simulate correction: %v", err)
|
||||
}
|
||||
if _, err := testPool.Exec(ctx, `
|
||||
SELECT rollup_task_usage_daily_window($1::timestamptz, $2::timestamptz)
|
||||
`, correctionMark.Add(-time.Minute), correctionMark.Add(time.Minute)); err != nil {
|
||||
t.Fatalf("rollup correction window: %v", err)
|
||||
}
|
||||
got = read()
|
||||
// New sonnet total: 1000 + 200 = 1200, still 2 events.
|
||||
if got["claude-3-5-sonnet"].InputTokens != 1200 || got["claude-3-5-sonnet"].EventCount != 2 {
|
||||
t.Errorf("after correction, sonnet should reflect new total 1200, got: %+v", got["claude-3-5-sonnet"])
|
||||
}
|
||||
}
|
||||
|
||||
// TestRollupTaskUsageDaily_WatermarkAdvances verifies the cron entry
|
||||
// point: rollup_task_usage_daily() consults task_usage_rollup_state to
|
||||
// decide its window, performs the upsert, and bumps the watermark.
|
||||
// We seed the watermark to a known value, force time to pass via a
|
||||
// fixture, and assert the watermark moves forward by exactly the
|
||||
// elapsed-window minus the 5 minute safety lag built into the function.
|
||||
func TestRollupTaskUsageDaily_WatermarkAdvances(t *testing.T) {
|
||||
if testHandler == nil {
|
||||
t.Skip("database not available")
|
||||
}
|
||||
ctx := context.Background()
|
||||
|
||||
// Seed the watermark to "long ago" so the next call has a non-empty
|
||||
// window. Use a test-scoped low value so we don't clobber any other
|
||||
// test's state — the singleton row gets restored at the end.
|
||||
var prevWatermark time.Time
|
||||
if err := testPool.QueryRow(ctx, `SELECT watermark_at FROM task_usage_rollup_state WHERE id = 1`).Scan(&prevWatermark); err != nil {
|
||||
t.Fatalf("read prev watermark: %v", err)
|
||||
}
|
||||
t.Cleanup(func() {
|
||||
testPool.Exec(ctx, `UPDATE task_usage_rollup_state SET watermark_at = $1 WHERE id = 1`, prevWatermark)
|
||||
})
|
||||
|
||||
if _, err := testPool.Exec(ctx, `
|
||||
UPDATE task_usage_rollup_state
|
||||
SET watermark_at = '2020-01-01 00:00:00+00', last_error = NULL
|
||||
WHERE id = 1
|
||||
`); err != nil {
|
||||
t.Fatalf("seed watermark: %v", err)
|
||||
}
|
||||
|
||||
if _, err := testPool.Exec(ctx, `SELECT rollup_task_usage_daily()`); err != nil {
|
||||
t.Fatalf("rollup_task_usage_daily: %v", err)
|
||||
}
|
||||
|
||||
var newWatermark time.Time
|
||||
var lastError *string
|
||||
if err := testPool.QueryRow(ctx, `SELECT watermark_at, last_error FROM task_usage_rollup_state WHERE id = 1`).Scan(&newWatermark, &lastError); err != nil {
|
||||
t.Fatalf("read new watermark: %v", err)
|
||||
}
|
||||
if lastError != nil {
|
||||
t.Fatalf("rollup recorded error: %s", *lastError)
|
||||
}
|
||||
|
||||
// New watermark must be near now() - 5 min. Allow a wide window
|
||||
// (±2 min) so this isn't flaky on slow CI.
|
||||
expected := time.Now().UTC().Add(-5 * time.Minute)
|
||||
delta := newWatermark.Sub(expected)
|
||||
if delta < -2*time.Minute || delta > 2*time.Minute {
|
||||
t.Errorf("watermark %s not within 2min of expected %s (delta %s)", newWatermark, expected, delta)
|
||||
}
|
||||
}
|
||||
|
||||
// TestRollupTaskUsageDaily_InvalidationOnReassign verifies that the
|
||||
// trigger-driven dirty-bucket queue handles task reassignment between
|
||||
// runtimes (the ReassignTasksToRuntime path used during runtime merge).
|
||||
// Without invalidation the rollup would keep attributing usage to the
|
||||
// old runtime; the raw fallback would not — so the two read paths would
|
||||
// silently disagree.
|
||||
func TestRollupTaskUsageDaily_InvalidationOnReassign(t *testing.T) {
|
||||
if testHandler == nil {
|
||||
t.Skip("database not available")
|
||||
}
|
||||
ctx := context.Background()
|
||||
|
||||
oldRuntimeID := handlerTestRuntimeID(t)
|
||||
// Spin up a second runtime to receive the reassigned task.
|
||||
var newRuntimeID string
|
||||
if err := testPool.QueryRow(ctx, `
|
||||
INSERT INTO agent_runtime (
|
||||
workspace_id, daemon_id, name, runtime_mode, provider, status, device_info, metadata, last_seen_at
|
||||
)
|
||||
VALUES ($1, NULL, 'reassign-target', 'cloud', 'reassign-target', 'online', '{}'::jsonb, '{}'::jsonb, now())
|
||||
RETURNING id
|
||||
`, testWorkspaceID).Scan(&newRuntimeID); err != nil {
|
||||
t.Fatalf("create dest runtime: %v", err)
|
||||
}
|
||||
t.Cleanup(func() {
|
||||
testPool.Exec(ctx, `DELETE FROM agent_runtime WHERE id = $1`, newRuntimeID)
|
||||
})
|
||||
|
||||
var agentID string
|
||||
if err := testPool.QueryRow(ctx, `
|
||||
SELECT id FROM agent WHERE workspace_id = $1 LIMIT 1
|
||||
`, testWorkspaceID).Scan(&agentID); err != nil {
|
||||
t.Fatalf("fetch agent: %v", err)
|
||||
}
|
||||
var issueID string
|
||||
if err := testPool.QueryRow(ctx, `
|
||||
INSERT INTO issue (workspace_id, title, creator_id, creator_type)
|
||||
VALUES ($1, 'reassign test', $2, 'member')
|
||||
RETURNING id
|
||||
`, testWorkspaceID, testUserID).Scan(&issueID); err != nil {
|
||||
t.Fatalf("create issue: %v", err)
|
||||
}
|
||||
t.Cleanup(func() {
|
||||
testPool.Exec(ctx, `DELETE FROM issue WHERE id = $1`, issueID)
|
||||
})
|
||||
|
||||
day := time.Date(2021, 3, 14, 0, 0, 0, 0, time.UTC)
|
||||
var taskID string
|
||||
if err := testPool.QueryRow(ctx, `
|
||||
INSERT INTO agent_task_queue (agent_id, issue_id, runtime_id, status, created_at)
|
||||
VALUES ($1, $2, $3, 'completed', $4)
|
||||
RETURNING id
|
||||
`, agentID, issueID, oldRuntimeID, day.Add(time.Hour)).Scan(&taskID); err != nil {
|
||||
t.Fatalf("insert task: %v", err)
|
||||
}
|
||||
t.Cleanup(func() {
|
||||
testPool.Exec(ctx, `DELETE FROM agent_task_queue WHERE id = $1`, taskID)
|
||||
})
|
||||
if _, err := testPool.Exec(ctx, `
|
||||
INSERT INTO task_usage (task_id, provider, model, input_tokens, output_tokens, created_at, updated_at)
|
||||
VALUES ($1, 'claude', 'm-reassign', 700, 70, $2, $2)
|
||||
`, taskID, day.Add(time.Hour)); err != nil {
|
||||
t.Fatalf("insert task_usage: %v", err)
|
||||
}
|
||||
t.Cleanup(func() {
|
||||
testPool.Exec(ctx, `DELETE FROM task_usage_daily WHERE bucket_date = $1::date AND model = 'm-reassign'`, day)
|
||||
testPool.Exec(ctx, `DELETE FROM task_usage_daily_dirty WHERE bucket_date = $1::date AND model = 'm-reassign'`, day)
|
||||
})
|
||||
|
||||
// Initial roll-up: usage should attach to OLD runtime.
|
||||
if _, err := testPool.Exec(ctx, `SELECT rollup_task_usage_daily_window('-infinity'::timestamptz, 'infinity'::timestamptz)`); err != nil {
|
||||
t.Fatalf("initial rollup: %v", err)
|
||||
}
|
||||
var oldTokens, newTokens int64
|
||||
testPool.QueryRow(ctx, `SELECT COALESCE(SUM(input_tokens),0) FROM task_usage_daily WHERE runtime_id = $1 AND bucket_date = $2::date AND model = 'm-reassign'`, oldRuntimeID, day).Scan(&oldTokens)
|
||||
testPool.QueryRow(ctx, `SELECT COALESCE(SUM(input_tokens),0) FROM task_usage_daily WHERE runtime_id = $1 AND bucket_date = $2::date AND model = 'm-reassign'`, newRuntimeID, day).Scan(&newTokens)
|
||||
if oldTokens != 700 || newTokens != 0 {
|
||||
t.Fatalf("initial: expected old=700 new=0, got old=%d new=%d", oldTokens, newTokens)
|
||||
}
|
||||
|
||||
// Trigger should enqueue both old + new buckets.
|
||||
if _, err := testPool.Exec(ctx, `UPDATE agent_task_queue SET runtime_id = $1 WHERE id = $2`, newRuntimeID, taskID); err != nil {
|
||||
t.Fatalf("reassign task: %v", err)
|
||||
}
|
||||
var dirtyCount int
|
||||
testPool.QueryRow(ctx, `SELECT COUNT(*) FROM task_usage_daily_dirty WHERE bucket_date = $1::date AND model = 'm-reassign'`, day).Scan(&dirtyCount)
|
||||
if dirtyCount != 2 {
|
||||
t.Fatalf("expected 2 dirty entries (old+new runtime), got %d", dirtyCount)
|
||||
}
|
||||
|
||||
// Re-run rollup. Old bucket should be deleted (no source rows left),
|
||||
// new bucket should receive the moved usage.
|
||||
if _, err := testPool.Exec(ctx, `SELECT rollup_task_usage_daily_window('-infinity'::timestamptz, 'infinity'::timestamptz)`); err != nil {
|
||||
t.Fatalf("rollup after reassign: %v", err)
|
||||
}
|
||||
testPool.QueryRow(ctx, `SELECT COALESCE(SUM(input_tokens),0) FROM task_usage_daily WHERE runtime_id = $1 AND bucket_date = $2::date AND model = 'm-reassign'`, oldRuntimeID, day).Scan(&oldTokens)
|
||||
testPool.QueryRow(ctx, `SELECT COALESCE(SUM(input_tokens),0) FROM task_usage_daily WHERE runtime_id = $1 AND bucket_date = $2::date AND model = 'm-reassign'`, newRuntimeID, day).Scan(&newTokens)
|
||||
if oldTokens != 0 || newTokens != 700 {
|
||||
t.Fatalf("after reassign: expected old=0 new=700, got old=%d new=%d", oldTokens, newTokens)
|
||||
}
|
||||
// Dirty queue should be drained.
|
||||
testPool.QueryRow(ctx, `SELECT COUNT(*) FROM task_usage_daily_dirty WHERE bucket_date = $1::date AND model = 'm-reassign'`, day).Scan(&dirtyCount)
|
||||
if dirtyCount != 0 {
|
||||
t.Errorf("expected dirty queue drained, got %d entries", dirtyCount)
|
||||
}
|
||||
}
|
||||
|
||||
// TestRollupTaskUsageDaily_InvalidationOnIssueDelete verifies that
|
||||
// cascade delete (issue → agent_task_queue → task_usage) clears the
|
||||
// matching daily rows via the trigger-driven dirty queue.
|
||||
func TestRollupTaskUsageDaily_InvalidationOnIssueDelete(t *testing.T) {
|
||||
if testHandler == nil {
|
||||
t.Skip("database not available")
|
||||
}
|
||||
ctx := context.Background()
|
||||
|
||||
runtimeID := handlerTestRuntimeID(t)
|
||||
var agentID string
|
||||
if err := testPool.QueryRow(ctx, `SELECT id FROM agent WHERE workspace_id = $1 LIMIT 1`, testWorkspaceID).Scan(&agentID); err != nil {
|
||||
t.Fatalf("fetch agent: %v", err)
|
||||
}
|
||||
var issueID string
|
||||
if err := testPool.QueryRow(ctx, `
|
||||
INSERT INTO issue (workspace_id, title, creator_id, creator_type)
|
||||
VALUES ($1, 'delete test', $2, 'member') RETURNING id
|
||||
`, testWorkspaceID, testUserID).Scan(&issueID); err != nil {
|
||||
t.Fatalf("create issue: %v", err)
|
||||
}
|
||||
|
||||
day := time.Date(2021, 7, 4, 0, 0, 0, 0, time.UTC)
|
||||
var taskID string
|
||||
if err := testPool.QueryRow(ctx, `
|
||||
INSERT INTO agent_task_queue (agent_id, issue_id, runtime_id, status, created_at)
|
||||
VALUES ($1, $2, $3, 'completed', $4) RETURNING id
|
||||
`, agentID, issueID, runtimeID, day.Add(time.Hour)).Scan(&taskID); err != nil {
|
||||
t.Fatalf("insert task: %v", err)
|
||||
}
|
||||
if _, err := testPool.Exec(ctx, `
|
||||
INSERT INTO task_usage (task_id, provider, model, input_tokens, output_tokens, created_at, updated_at)
|
||||
VALUES ($1, 'claude', 'm-delete', 500, 50, $2, $2)
|
||||
`, taskID, day.Add(time.Hour)); err != nil {
|
||||
t.Fatalf("insert task_usage: %v", err)
|
||||
}
|
||||
t.Cleanup(func() {
|
||||
testPool.Exec(ctx, `DELETE FROM task_usage_daily WHERE bucket_date = $1::date AND model = 'm-delete'`, day)
|
||||
testPool.Exec(ctx, `DELETE FROM task_usage_daily_dirty WHERE bucket_date = $1::date AND model = 'm-delete'`, day)
|
||||
})
|
||||
|
||||
if _, err := testPool.Exec(ctx, `SELECT rollup_task_usage_daily_window('-infinity'::timestamptz, 'infinity'::timestamptz)`); err != nil {
|
||||
t.Fatalf("initial rollup: %v", err)
|
||||
}
|
||||
var tokens int64
|
||||
testPool.QueryRow(ctx, `SELECT COALESCE(SUM(input_tokens),0) FROM task_usage_daily WHERE runtime_id = $1 AND bucket_date = $2::date AND model = 'm-delete'`, runtimeID, day).Scan(&tokens)
|
||||
if tokens != 500 {
|
||||
t.Fatalf("initial: expected 500, got %d", tokens)
|
||||
}
|
||||
|
||||
// Cascade delete via issue. Trigger fires on agent_task_queue BEFORE
|
||||
// DELETE — that's when the task_usage children + issue parent are
|
||||
// still readable inside the same statement.
|
||||
if _, err := testPool.Exec(ctx, `DELETE FROM issue WHERE id = $1`, issueID); err != nil {
|
||||
t.Fatalf("delete issue: %v", err)
|
||||
}
|
||||
var dirtyCount int
|
||||
testPool.QueryRow(ctx, `SELECT COUNT(*) FROM task_usage_daily_dirty WHERE bucket_date = $1::date AND model = 'm-delete'`, day).Scan(&dirtyCount)
|
||||
if dirtyCount == 0 {
|
||||
t.Fatalf("expected dirty entry after cascade delete, got 0")
|
||||
}
|
||||
|
||||
// Re-run rollup: bucket should be deleted because no source rows exist.
|
||||
if _, err := testPool.Exec(ctx, `SELECT rollup_task_usage_daily_window('-infinity'::timestamptz, 'infinity'::timestamptz)`); err != nil {
|
||||
t.Fatalf("rollup after delete: %v", err)
|
||||
}
|
||||
testPool.QueryRow(ctx, `SELECT COALESCE(SUM(input_tokens),0) FROM task_usage_daily WHERE runtime_id = $1 AND bucket_date = $2::date AND model = 'm-delete'`, runtimeID, day).Scan(&tokens)
|
||||
if tokens != 0 {
|
||||
t.Errorf("after issue delete: expected 0 (bucket cleared), got %d", tokens)
|
||||
}
|
||||
}
|
||||
|
||||
// TestRollupTaskUsageDaily_WorkspaceMismatch constructs an atq row whose
|
||||
// agent.workspace_id != issue.workspace_id and verifies that the rollup
|
||||
// resolves workspace_id consistently from `agent` across triggers,
|
||||
// dirty_from_updates, and recompute. If any of those paths leaked back
|
||||
// to the issue.workspace_id the dirty queue would be misaligned with
|
||||
// the recompute join and the bucket would either be silently dropped
|
||||
// (recompute returns 0 rows → deleted_empty branch fires) or attributed
|
||||
// to the wrong workspace.
|
||||
//
|
||||
// The schema does not enforce agent.workspace_id == issue.workspace_id,
|
||||
// so this canary keeps the alignment honest as the schema evolves.
|
||||
func TestRollupTaskUsageDaily_WorkspaceMismatch(t *testing.T) {
|
||||
if testHandler == nil {
|
||||
t.Skip("database not available")
|
||||
}
|
||||
ctx := context.Background()
|
||||
|
||||
// Create a foreign workspace + a runtime + an agent there.
|
||||
var foreignWorkspaceID string
|
||||
if err := testPool.QueryRow(ctx, `
|
||||
INSERT INTO workspace (name, slug) VALUES ('ws-mismatch', 'ws-mismatch-' || gen_random_uuid()::text) RETURNING id
|
||||
`).Scan(&foreignWorkspaceID); err != nil {
|
||||
t.Fatalf("create foreign workspace: %v", err)
|
||||
}
|
||||
t.Cleanup(func() {
|
||||
testPool.Exec(ctx, `DELETE FROM workspace WHERE id = $1`, foreignWorkspaceID)
|
||||
})
|
||||
var foreignRuntimeID string
|
||||
if err := testPool.QueryRow(ctx, `
|
||||
INSERT INTO agent_runtime (
|
||||
workspace_id, daemon_id, name, runtime_mode, provider, status, device_info, metadata, last_seen_at
|
||||
)
|
||||
VALUES ($1, NULL, 'mismatch-rt', 'cloud', 'mismatch-rt', 'online', '{}'::jsonb, '{}'::jsonb, now())
|
||||
RETURNING id
|
||||
`, foreignWorkspaceID).Scan(&foreignRuntimeID); err != nil {
|
||||
t.Fatalf("create foreign runtime: %v", err)
|
||||
}
|
||||
var foreignAgentID string
|
||||
if err := testPool.QueryRow(ctx, `
|
||||
INSERT INTO agent (
|
||||
workspace_id, name, description, runtime_mode, runtime_config,
|
||||
runtime_id, visibility, max_concurrent_tasks, owner_id,
|
||||
instructions, custom_env, custom_args, mcp_config
|
||||
)
|
||||
VALUES ($1, 'mismatch-agent', '', 'cloud', '{}'::jsonb, $2, 'private', 1, $3, '', '{}'::jsonb, '[]'::jsonb, '[]'::jsonb)
|
||||
RETURNING id
|
||||
`, foreignWorkspaceID, foreignRuntimeID, testUserID).Scan(&foreignAgentID); err != nil {
|
||||
t.Fatalf("create foreign agent: %v", err)
|
||||
}
|
||||
|
||||
// Issue lives in the *primary* test workspace, agent in foreign one.
|
||||
var issueID string
|
||||
if err := testPool.QueryRow(ctx, `
|
||||
INSERT INTO issue (workspace_id, title, creator_id, creator_type)
|
||||
VALUES ($1, 'mismatch test', $2, 'member') RETURNING id
|
||||
`, testWorkspaceID, testUserID).Scan(&issueID); err != nil {
|
||||
t.Fatalf("create issue: %v", err)
|
||||
}
|
||||
t.Cleanup(func() {
|
||||
testPool.Exec(ctx, `DELETE FROM issue WHERE id = $1`, issueID)
|
||||
})
|
||||
|
||||
day := time.Date(2021, 9, 9, 0, 0, 0, 0, time.UTC)
|
||||
var taskID string
|
||||
if err := testPool.QueryRow(ctx, `
|
||||
INSERT INTO agent_task_queue (agent_id, issue_id, runtime_id, status, created_at)
|
||||
VALUES ($1, $2, $3, 'completed', $4) RETURNING id
|
||||
`, foreignAgentID, issueID, foreignRuntimeID, day.Add(time.Hour)).Scan(&taskID); err != nil {
|
||||
t.Fatalf("insert atq: %v", err)
|
||||
}
|
||||
if _, err := testPool.Exec(ctx, `
|
||||
INSERT INTO task_usage (task_id, provider, model, input_tokens, output_tokens, created_at, updated_at)
|
||||
VALUES ($1, 'claude', 'm-mismatch', 333, 33, $2, $2)
|
||||
`, taskID, day.Add(time.Hour)); err != nil {
|
||||
t.Fatalf("insert task_usage: %v", err)
|
||||
}
|
||||
t.Cleanup(func() {
|
||||
testPool.Exec(ctx, `DELETE FROM task_usage_daily WHERE bucket_date = $1::date AND model = 'm-mismatch'`, day)
|
||||
testPool.Exec(ctx, `DELETE FROM task_usage_daily_dirty WHERE bucket_date = $1::date AND model = 'm-mismatch'`, day)
|
||||
})
|
||||
|
||||
// Rollup. The bucket must be attributed to FOREIGN workspace
|
||||
// (agent.workspace_id), not the primary one (issue.workspace_id).
|
||||
if _, err := testPool.Exec(ctx, `SELECT rollup_task_usage_daily_window('-infinity'::timestamptz, 'infinity'::timestamptz)`); err != nil {
|
||||
t.Fatalf("rollup: %v", err)
|
||||
}
|
||||
var foreignTokens, primaryTokens int64
|
||||
testPool.QueryRow(ctx, `SELECT COALESCE(SUM(input_tokens),0) FROM task_usage_daily WHERE workspace_id = $1 AND bucket_date = $2::date AND model = 'm-mismatch'`, foreignWorkspaceID, day).Scan(&foreignTokens)
|
||||
testPool.QueryRow(ctx, `SELECT COALESCE(SUM(input_tokens),0) FROM task_usage_daily WHERE workspace_id = $1 AND bucket_date = $2::date AND model = 'm-mismatch'`, testWorkspaceID, day).Scan(&primaryTokens)
|
||||
if foreignTokens != 333 {
|
||||
t.Fatalf("expected foreign workspace bucket = 333, got %d", foreignTokens)
|
||||
}
|
||||
if primaryTokens != 0 {
|
||||
t.Errorf("expected primary workspace bucket = 0, got %d", primaryTokens)
|
||||
}
|
||||
|
||||
// Now reassign atq.runtime_id within the foreign workspace and
|
||||
// verify the trigger / recompute pair still agree on workspace_id.
|
||||
var foreignRuntime2ID string
|
||||
if err := testPool.QueryRow(ctx, `
|
||||
INSERT INTO agent_runtime (
|
||||
workspace_id, daemon_id, name, runtime_mode, provider, status, device_info, metadata, last_seen_at
|
||||
)
|
||||
VALUES ($1, NULL, 'mismatch-rt2', 'cloud', 'mismatch-rt2', 'online', '{}'::jsonb, '{}'::jsonb, now())
|
||||
RETURNING id
|
||||
`, foreignWorkspaceID).Scan(&foreignRuntime2ID); err != nil {
|
||||
t.Fatalf("create foreign runtime 2: %v", err)
|
||||
}
|
||||
if _, err := testPool.Exec(ctx, `UPDATE agent_task_queue SET runtime_id = $1 WHERE id = $2`, foreignRuntime2ID, taskID); err != nil {
|
||||
t.Fatalf("reassign: %v", err)
|
||||
}
|
||||
if _, err := testPool.Exec(ctx, `SELECT rollup_task_usage_daily_window('-infinity'::timestamptz, 'infinity'::timestamptz)`); err != nil {
|
||||
t.Fatalf("rollup after reassign: %v", err)
|
||||
}
|
||||
var oldRTTokens, newRTTokens int64
|
||||
testPool.QueryRow(ctx, `SELECT COALESCE(SUM(input_tokens),0) FROM task_usage_daily WHERE runtime_id = $1 AND bucket_date = $2::date AND model = 'm-mismatch'`, foreignRuntimeID, day).Scan(&oldRTTokens)
|
||||
testPool.QueryRow(ctx, `SELECT COALESCE(SUM(input_tokens),0) FROM task_usage_daily WHERE runtime_id = $1 AND bucket_date = $2::date AND model = 'm-mismatch'`, foreignRuntime2ID, day).Scan(&newRTTokens)
|
||||
if oldRTTokens != 0 || newRTTokens != 333 {
|
||||
t.Fatalf("after reassign in mismatched ws: expected old=0 new=333, got old=%d new=%d", oldRTTokens, newRTTokens)
|
||||
}
|
||||
}
|
||||
@@ -139,6 +139,27 @@ func TestGetRuntimeUsage_BucketsByUsageTime(t *testing.T) {
|
||||
insertTaskWithUsage(yesterdayLate, todayEarly, 1000) // cross-midnight
|
||||
insertTaskWithUsage(yesterdayMorning, yesterdayMorning, 2000) // full-day yesterday
|
||||
|
||||
// ListRuntimeUsage now reads from the `task_usage_daily` rollup
|
||||
// table maintained by the cron-driven rollup_task_usage_daily()
|
||||
// function. In production the watermarked wrapper waits a 5 min
|
||||
// safety lag before consuming rows; here we drive the underlying
|
||||
// window function directly with a wide-open range so the freshly
|
||||
// inserted fixture rows are guaranteed to be aggregated before the
|
||||
// handler is called. Each test invocation gets its own isolated
|
||||
// daily buckets keyed by (date, runtime, provider, model), so
|
||||
// re-running the test is idempotent (the upsert just rewrites the
|
||||
// same totals).
|
||||
if _, err := testPool.Exec(ctx, `
|
||||
SELECT rollup_task_usage_daily_window('-infinity'::timestamptz, 'infinity'::timestamptz)
|
||||
`); err != nil {
|
||||
t.Fatalf("rollup_task_usage_daily_window: %v", err)
|
||||
}
|
||||
t.Cleanup(func() {
|
||||
testPool.Exec(ctx, `
|
||||
DELETE FROM task_usage_daily WHERE runtime_id = $1 AND bucket_date IN ($2::date, $3::date)
|
||||
`, runtimeID, today, today.Add(-24*time.Hour))
|
||||
})
|
||||
|
||||
// Call the handler with ?days=1 at whatever "now" is. That should include
|
||||
// both today and yesterday in full.
|
||||
w := httptest.NewRecorder()
|
||||
|
||||
1
server/migrations/072_task_usage_updated_at.down.sql
Normal file
1
server/migrations/072_task_usage_updated_at.down.sql
Normal file
@@ -0,0 +1 @@
|
||||
ALTER TABLE task_usage DROP COLUMN IF EXISTS updated_at;
|
||||
23
server/migrations/072_task_usage_updated_at.up.sql
Normal file
23
server/migrations/072_task_usage_updated_at.up.sql
Normal file
@@ -0,0 +1,23 @@
|
||||
-- Add `updated_at` to task_usage so the daily-rollup worker (added in 073)
|
||||
-- can detect rows that were corrected by `UpsertTaskUsage` after their
|
||||
-- original creation. The existing UPSERT path overwrites token counts on
|
||||
-- conflict but leaves created_at unchanged, so a watermark on created_at
|
||||
-- alone would silently miss those corrections.
|
||||
--
|
||||
-- Schema-only, online-safe migration. The column is nullable with no
|
||||
-- backfill UPDATE so this is metadata-only on a hot, high-write table —
|
||||
-- no full-table rewrite, no row-lock storm, no WAL spike. Old rows stay
|
||||
-- NULL; the rollup function (073) handles them via
|
||||
-- an `updated_at IS NULL` + `created_at`-range OR branch in the window
|
||||
-- filter so legacy rows are still discoverable by backfill.
|
||||
--
|
||||
-- DEFAULT now() is set after the column exists so new INSERTs (and
|
||||
-- UpsertTaskUsage on conflict, which sets the value explicitly) always
|
||||
-- get a timestamp. Setting the default on an existing column does NOT
|
||||
-- touch existing rows; only new rows get the default. This keeps the
|
||||
-- migration cheap on ~hundreds of millions of `task_usage` rows.
|
||||
ALTER TABLE task_usage
|
||||
ADD COLUMN IF NOT EXISTS updated_at TIMESTAMPTZ;
|
||||
|
||||
ALTER TABLE task_usage
|
||||
ALTER COLUMN updated_at SET DEFAULT now();
|
||||
6
server/migrations/073_task_usage_daily_rollup.down.sql
Normal file
6
server/migrations/073_task_usage_daily_rollup.down.sql
Normal file
@@ -0,0 +1,6 @@
|
||||
DROP FUNCTION IF EXISTS rollup_task_usage_daily();
|
||||
DROP FUNCTION IF EXISTS rollup_task_usage_daily_window(TIMESTAMPTZ, TIMESTAMPTZ);
|
||||
DROP TABLE IF EXISTS task_usage_rollup_state;
|
||||
DROP INDEX IF EXISTS idx_task_usage_daily_workspace_date;
|
||||
DROP INDEX IF EXISTS idx_task_usage_daily_runtime_date;
|
||||
DROP TABLE IF EXISTS task_usage_daily;
|
||||
227
server/migrations/073_task_usage_daily_rollup.up.sql
Normal file
227
server/migrations/073_task_usage_daily_rollup.up.sql
Normal file
@@ -0,0 +1,227 @@
|
||||
-- Daily rollup table for `task_usage`. Background: the dashboard query
|
||||
-- ListRuntimeUsage runs `SUM() GROUP BY DATE(created_at), provider, model`
|
||||
-- against the raw event stream and is called once per runtime row on the
|
||||
-- runtimes list (plus once per detail page load), so it dominates DB load
|
||||
-- as event volume grows. We materialise the day-bucketed aggregate here
|
||||
-- so reads scan O(days × providers × models) rows instead of O(events).
|
||||
--
|
||||
-- All query dimensions are denormalised into the table so reads never
|
||||
-- need to join `agent_task_queue`. The PK doubles as the upsert key for
|
||||
-- the rollup worker.
|
||||
CREATE TABLE task_usage_daily (
|
||||
bucket_date DATE NOT NULL,
|
||||
workspace_id UUID NOT NULL,
|
||||
runtime_id UUID NOT NULL,
|
||||
provider TEXT NOT NULL,
|
||||
model TEXT NOT NULL,
|
||||
input_tokens BIGINT NOT NULL DEFAULT 0,
|
||||
output_tokens BIGINT NOT NULL DEFAULT 0,
|
||||
cache_read_tokens BIGINT NOT NULL DEFAULT 0,
|
||||
cache_write_tokens BIGINT NOT NULL DEFAULT 0,
|
||||
event_count BIGINT NOT NULL DEFAULT 0,
|
||||
updated_at TIMESTAMPTZ NOT NULL DEFAULT now(),
|
||||
PRIMARY KEY (bucket_date, workspace_id, runtime_id, provider, model)
|
||||
);
|
||||
|
||||
-- Primary read path: runtime detail page + runtimes-list cost cell, both
|
||||
-- filter by runtime_id and order by date DESC. bucket_date DESC in the
|
||||
-- index lets the query avoid an extra sort.
|
||||
CREATE INDEX idx_task_usage_daily_runtime_date
|
||||
ON task_usage_daily (runtime_id, bucket_date DESC);
|
||||
|
||||
-- Workspace-wide aggregations hit this index instead of fanning out per
|
||||
-- runtime.
|
||||
CREATE INDEX idx_task_usage_daily_workspace_date
|
||||
ON task_usage_daily (workspace_id, bucket_date DESC);
|
||||
|
||||
-- Single-row state table tracking how far the rollup worker has consumed.
|
||||
CREATE TABLE task_usage_rollup_state (
|
||||
id SMALLINT PRIMARY KEY DEFAULT 1 CHECK (id = 1),
|
||||
watermark_at TIMESTAMPTZ NOT NULL DEFAULT '1970-01-01 00:00:00+00',
|
||||
last_run_started_at TIMESTAMPTZ,
|
||||
last_run_finished_at TIMESTAMPTZ,
|
||||
last_run_rows BIGINT NOT NULL DEFAULT 0,
|
||||
last_error TEXT
|
||||
);
|
||||
INSERT INTO task_usage_rollup_state (id) VALUES (1) ON CONFLICT DO NOTHING;
|
||||
|
||||
-- Window-based aggregation primitive. Used by both the cron-driven
|
||||
-- watermark advancer and the offline backfill command, so they stay
|
||||
-- byte-identical in their semantics. Returns the number of output rows
|
||||
-- touched.
|
||||
--
|
||||
-- IDEMPOTENCY CONTRACT (this is the important bit):
|
||||
-- For every (bucket_date, workspace_id, runtime_id, provider, model)
|
||||
-- key that has at least one task_usage row whose `updated_at` falls in
|
||||
-- [p_from, p_to), this function REPLACES the corresponding daily row
|
||||
-- with the SUM of *all* task_usage rows for that key (regardless of
|
||||
-- their updated_at). It does NOT add a delta.
|
||||
--
|
||||
-- Consequences:
|
||||
-- * Replaying the same window is safe — the row is rebuilt from raw
|
||||
-- each time, so the result converges.
|
||||
-- * Two callers (cron + backfill) processing overlapping windows is
|
||||
-- safe — both write the same value.
|
||||
-- * `UpsertTaskUsage` corrections that overwrite token counts are
|
||||
-- captured: the corrected row's updated_at gets bumped, the next
|
||||
-- window picks up its bucket key, and the bucket is recomputed
|
||||
-- from current truth.
|
||||
--
|
||||
-- Cost: the recompute reads ALL task_usage rows for each dirty bucket,
|
||||
-- not just the windowed slice. In steady state only "today" buckets are
|
||||
-- dirty (a handful of keys per active runtime), so this stays cheap.
|
||||
-- During backfill the entire history's bucket keys become dirty once;
|
||||
-- the backfill walks history in monthly slices to bound the working
|
||||
-- set per call.
|
||||
CREATE OR REPLACE FUNCTION rollup_task_usage_daily_window(
|
||||
p_from TIMESTAMPTZ,
|
||||
p_to TIMESTAMPTZ
|
||||
)
|
||||
RETURNS BIGINT
|
||||
LANGUAGE plpgsql
|
||||
AS $$
|
||||
DECLARE
|
||||
v_rows BIGINT;
|
||||
BEGIN
|
||||
IF p_from >= p_to THEN
|
||||
RETURN 0;
|
||||
END IF;
|
||||
|
||||
WITH dirty_keys AS (
|
||||
SELECT DISTINCT
|
||||
DATE(tu.created_at) AS bucket_date,
|
||||
i.workspace_id AS workspace_id,
|
||||
atq.runtime_id AS runtime_id,
|
||||
tu.provider AS provider,
|
||||
tu.model AS model
|
||||
FROM task_usage tu
|
||||
JOIN agent_task_queue atq ON atq.id = tu.task_id
|
||||
JOIN issue i ON i.id = atq.issue_id
|
||||
WHERE atq.runtime_id IS NOT NULL
|
||||
AND (
|
||||
-- Steady state: rows updated within the watermark window.
|
||||
-- Hits idx_task_usage_updated_at directly.
|
||||
(tu.updated_at >= p_from AND tu.updated_at < p_to)
|
||||
-- Legacy rows from before migration 072 (updated_at IS NULL)
|
||||
-- — discoverable via created_at + the partial index added
|
||||
-- in 078. Steady-state windows after backfill never include
|
||||
-- historical dates, so this branch is a no-op once the
|
||||
-- backfill has swept history.
|
||||
OR (tu.updated_at IS NULL
|
||||
AND tu.created_at >= p_from
|
||||
AND tu.created_at < p_to)
|
||||
)
|
||||
),
|
||||
recomputed AS (
|
||||
SELECT
|
||||
dk.bucket_date,
|
||||
dk.workspace_id,
|
||||
dk.runtime_id,
|
||||
dk.provider,
|
||||
dk.model,
|
||||
SUM(tu.input_tokens)::bigint AS input_tokens,
|
||||
SUM(tu.output_tokens)::bigint AS output_tokens,
|
||||
SUM(tu.cache_read_tokens)::bigint AS cache_read_tokens,
|
||||
SUM(tu.cache_write_tokens)::bigint AS cache_write_tokens,
|
||||
COUNT(*)::bigint AS event_count
|
||||
FROM dirty_keys dk
|
||||
JOIN agent_task_queue atq ON atq.runtime_id = dk.runtime_id
|
||||
JOIN issue i ON i.id = atq.issue_id
|
||||
AND i.workspace_id = dk.workspace_id
|
||||
JOIN task_usage tu ON tu.task_id = atq.id
|
||||
AND tu.provider = dk.provider
|
||||
AND tu.model = dk.model
|
||||
AND DATE(tu.created_at) = dk.bucket_date
|
||||
GROUP BY 1, 2, 3, 4, 5
|
||||
)
|
||||
INSERT INTO task_usage_daily AS d (
|
||||
bucket_date, workspace_id, runtime_id, provider, model,
|
||||
input_tokens, output_tokens, cache_read_tokens, cache_write_tokens,
|
||||
event_count
|
||||
)
|
||||
SELECT
|
||||
bucket_date, workspace_id, runtime_id, provider, model,
|
||||
input_tokens, output_tokens, cache_read_tokens, cache_write_tokens,
|
||||
event_count
|
||||
FROM recomputed
|
||||
ON CONFLICT (bucket_date, workspace_id, runtime_id, provider, model) DO UPDATE
|
||||
SET input_tokens = EXCLUDED.input_tokens,
|
||||
output_tokens = EXCLUDED.output_tokens,
|
||||
cache_read_tokens = EXCLUDED.cache_read_tokens,
|
||||
cache_write_tokens = EXCLUDED.cache_write_tokens,
|
||||
event_count = EXCLUDED.event_count,
|
||||
updated_at = now();
|
||||
|
||||
GET DIAGNOSTICS v_rows = ROW_COUNT;
|
||||
RETURN v_rows;
|
||||
END;
|
||||
$$;
|
||||
|
||||
-- Cron entry point. Advances the watermark by one window each call.
|
||||
--
|
||||
-- Invariants:
|
||||
-- * `pg_try_advisory_lock(4242)` serialises overlapping ticks.
|
||||
-- * The window upper bound is `now() - 5 minutes`. The lag exists
|
||||
-- because `task_usage` rows are written from a separate transaction;
|
||||
-- a row with updated_at = T can become visible to this snapshot at
|
||||
-- some t > T. 5 minutes is a generous bound on that visibility delay
|
||||
-- and keeps the dashboard "today" bucket at most ~10 min stale
|
||||
-- (5 min lag + 5 min cron period).
|
||||
-- * On error we record `last_error` and re-raise; the watermark is NOT
|
||||
-- advanced because the UPDATE that advances it only runs after the
|
||||
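-- upsert succeeds.
-- NOTE(review): because the error is re-raised, a top-level caller's
-- transaction is rolled back and the `last_error` UPDATE goes with it;
-- confirm `last_error` is actually observable after a failed tick.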
|
||||
-- * SAFE TO RUN CONCURRENTLY WITH BACKFILL: the window primitive is
|
||||
-- idempotent (see contract above), so even if cron fires while the
|
||||
-- offline backfill is also walking history, the worst case is some
|
||||
-- bucket gets written twice with the same value.
|
||||
CREATE OR REPLACE FUNCTION rollup_task_usage_daily()
|
||||
RETURNS BIGINT
|
||||
LANGUAGE plpgsql
|
||||
AS $$
|
||||
DECLARE
|
||||
v_lock_ok BOOLEAN;
|
||||
v_from TIMESTAMPTZ;
|
||||
v_to TIMESTAMPTZ;
|
||||
v_rows BIGINT := 0;
|
||||
BEGIN
|
||||
SELECT pg_try_advisory_lock(4242) INTO v_lock_ok;
|
||||
IF NOT v_lock_ok THEN
|
||||
RETURN 0;
|
||||
END IF;
|
||||
|
||||
BEGIN
|
||||
UPDATE task_usage_rollup_state
|
||||
SET last_run_started_at = now(),
|
||||
last_error = NULL
|
||||
WHERE id = 1
|
||||
RETURNING watermark_at INTO v_from;
|
||||
|
||||
v_to := now() - INTERVAL '5 minutes';
|
||||
|
||||
IF v_from < v_to THEN
|
||||
v_rows := rollup_task_usage_daily_window(v_from, v_to);
|
||||
|
||||
UPDATE task_usage_rollup_state
|
||||
SET watermark_at = v_to,
|
||||
last_run_finished_at = now(),
|
||||
last_run_rows = v_rows
|
||||
WHERE id = 1;
|
||||
ELSE
|
||||
UPDATE task_usage_rollup_state
|
||||
SET last_run_finished_at = now(),
|
||||
last_run_rows = 0
|
||||
WHERE id = 1;
|
||||
END IF;
|
||||
|
||||
PERFORM pg_advisory_unlock(4242);
|
||||
RETURN v_rows;
|
||||
EXCEPTION WHEN OTHERS THEN
|
||||
UPDATE task_usage_rollup_state
|
||||
SET last_error = SQLERRM,
|
||||
last_run_finished_at = now()
|
||||
WHERE id = 1;
|
||||
PERFORM pg_advisory_unlock(4242);
|
||||
RAISE;
|
||||
END;
|
||||
END;
|
||||
$$;
|
||||
@@ -0,0 +1 @@
|
||||
-- Drops the index on task_usage (updated_at). CONCURRENTLY keeps the drop from
-- taking an ACCESS EXCLUSIVE lock on the hot task_usage table, mirroring the
-- CREATE INDEX CONCURRENTLY in the up migration and the legacy-index down
-- migration. Must run outside a transaction block.
DROP INDEX CONCURRENTLY IF EXISTS idx_task_usage_updated_at;
|
||||
5
server/migrations/074_task_usage_updated_at_index.up.sql
Normal file
5
server/migrations/074_task_usage_updated_at_index.up.sql
Normal file
@@ -0,0 +1,5 @@
|
||||
-- Drives the rollup worker's "what changed since last tick" scan in
|
||||
-- 073's window function. CONCURRENTLY avoids blocking writes during
|
||||
-- build (matches the pattern used in 035/067 for live indexes).
|
||||
CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_task_usage_updated_at
|
||||
ON task_usage (updated_at);
|
||||
@@ -0,0 +1 @@
|
||||
-- Drops the index on task_usage (created_at). CONCURRENTLY keeps the drop from
-- taking an ACCESS EXCLUSIVE lock on the hot task_usage table, mirroring the
-- CREATE INDEX CONCURRENTLY in the up migration and the legacy-index down
-- migration. Must run outside a transaction block.
DROP INDEX CONCURRENTLY IF EXISTS idx_task_usage_created_at;
|
||||
5
server/migrations/075_task_usage_created_at_index.up.sql
Normal file
5
server/migrations/075_task_usage_created_at_index.up.sql
Normal file
@@ -0,0 +1,5 @@
|
||||
-- Helps the two lazy endpoints (ListRuntimeUsageByAgent / GetRuntimeUsageByHour)
|
||||
-- that still scan the raw `task_usage` table by created_at. CONCURRENTLY
|
||||
-- avoids blocking writes during build.
|
||||
CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_task_usage_created_at
|
||||
ON task_usage (created_at);
|
||||
10
server/migrations/076_task_usage_pgcron_extension.down.sql
Normal file
10
server/migrations/076_task_usage_pgcron_extension.down.sql
Normal file
@@ -0,0 +1,10 @@
|
||||
DROP FUNCTION IF EXISTS task_usage_rollup_lag_seconds();
|
||||
|
||||
DO $$
|
||||
BEGIN
|
||||
IF EXISTS (SELECT 1 FROM pg_extension WHERE extname = 'pg_cron') THEN
|
||||
PERFORM cron.unschedule('rollup_task_usage_daily')
|
||||
FROM cron.job WHERE jobname = 'rollup_task_usage_daily';
|
||||
END IF;
|
||||
END
|
||||
$$;
|
||||
46
server/migrations/076_task_usage_pgcron_extension.up.sql
Normal file
46
server/migrations/076_task_usage_pgcron_extension.up.sql
Normal file
@@ -0,0 +1,46 @@
|
||||
-- Enable pg_cron extension if available, but DO NOT schedule the rollup
|
||||
-- job here. Scheduling must happen *after* a successful backfill run, so
|
||||
-- the cron tick doesn't race the backfill (both write the same daily
|
||||
-- buckets — the rollup function in 073 is now idempotent, so collisions
|
||||
-- produce correct values, but we still avoid overlap as a defense in
|
||||
-- depth + to keep load low during backfill).
|
||||
--
|
||||
-- Operator playbook (in deployment runbook):
|
||||
-- 1) Apply migrations 072..078 (this file is 076; 077 and 078 must also be in place before the backfill).
|
||||
-- 2) Run `go run ./cmd/backfill_task_usage_daily` — succeeds and
|
||||
-- stamps the rollup-state watermark.
|
||||
-- 3) Set USAGE_DAILY_ROLLUP_ENABLED=true on the API and roll out.
|
||||
-- 4) As superuser:
|
||||
-- SELECT cron.schedule(
|
||||
-- 'rollup_task_usage_daily',
|
||||
-- '*/5 * * * *',
|
||||
-- $$SELECT rollup_task_usage_daily()$$
|
||||
-- );
|
||||
-- 5) As superuser, also schedule cron-log pruning (see notes below).
|
||||
--
|
||||
-- The CREATE EXTENSION is wrapped in DO/EXCEPTION so dev/CI environments
|
||||
-- without `shared_preload_libraries=pg_cron` skip gracefully and the
|
||||
-- migration still succeeds (mirrors migration 032 pg_bigm pattern).
|
||||
DO $$
|
||||
BEGIN
|
||||
CREATE EXTENSION IF NOT EXISTS pg_cron;
|
||||
EXCEPTION
|
||||
WHEN OTHERS THEN
|
||||
RAISE NOTICE 'pg_cron extension not available; skipping. Schedule rollup_task_usage_daily() via your platform''s scheduling primitive (Kubernetes CronJob, etc.).';
|
||||
END
|
||||
$$;
|
||||
|
||||
-- Health check helper. Returns NULL if the rollup has never run, or the
|
||||
-- number of seconds since the last successful tick. Use this from
|
||||
-- monitoring / alerts:
|
||||
-- * Alert if NULL for >15 minutes after deployment (cron not scheduled).
|
||||
-- * Alert if value > 900 seconds (cron stuck or job failing).
|
||||
CREATE OR REPLACE FUNCTION task_usage_rollup_lag_seconds()
|
||||
RETURNS DOUBLE PRECISION
|
||||
LANGUAGE sql
|
||||
STABLE
|
||||
AS $$
|
||||
SELECT EXTRACT(EPOCH FROM (now() - last_run_finished_at))
|
||||
FROM task_usage_rollup_state
|
||||
WHERE id = 1;
|
||||
$$;
|
||||
@@ -0,0 +1,7 @@
|
||||
DROP TRIGGER IF EXISTS trg_tu_dirty_rollup ON task_usage;
|
||||
DROP TRIGGER IF EXISTS trg_atq_dirty_rollup ON agent_task_queue;
|
||||
DROP FUNCTION IF EXISTS enqueue_task_usage_daily_dirty_for_tu();
|
||||
DROP FUNCTION IF EXISTS enqueue_task_usage_daily_dirty_for_atq();
|
||||
DROP TABLE IF EXISTS task_usage_daily_dirty;
|
||||
-- idx_task_usage_created_at_legacy is owned by 078; do not drop here.
|
||||
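-- NOTE(review): nothing in this file restores the 073 definition of
-- rollup_task_usage_daily_window(); the 077 body, which reads the
-- task_usage_daily_dirty table dropped above, stays installed until the
-- 073 definition is re-applied or 073 itself is rolled back.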
|
||||
269
server/migrations/077_task_usage_daily_invalidation.up.sql
Normal file
269
server/migrations/077_task_usage_daily_invalidation.up.sql
Normal file
@@ -0,0 +1,269 @@
|
||||
-- Catch joined-table changes that the `updated_at` watermark in 073 misses.
|
||||
--
|
||||
-- The window function in 073 finds dirty buckets via `task_usage.updated_at`.
|
||||
-- That covers INSERT and UPDATE on `task_usage`, but NOT:
|
||||
-- 1) DELETE on `task_usage` itself (no row left to discover).
|
||||
-- 2) Cascade DELETE through `agent_task_queue` (issue/queue rows go away,
|
||||
-- taking task_usage with them).
|
||||
-- 3) UPDATE of `agent_task_queue.runtime_id` — used by the runtime
|
||||
-- consolidation path (`ReassignTasksToRuntime`) — which moves usage
|
||||
-- from one runtime's bucket to another without touching task_usage.
|
||||
--
|
||||
-- Without invalidation, the rollup table diverges from raw task_usage:
|
||||
-- deleted issues stay billed forever, reassigned tasks stay attributed to
|
||||
-- the old runtime. The raw-table fallback path doesn't suffer from this,
|
||||
-- so the two read paths would silently disagree.
|
||||
--
|
||||
-- Solution: an explicit `task_usage_daily_dirty` queue table populated by
|
||||
-- triggers on the joined tables, drained by the rollup window function.
|
||||
|
||||
CREATE TABLE task_usage_daily_dirty (
|
||||
bucket_date DATE NOT NULL,
|
||||
workspace_id UUID NOT NULL,
|
||||
runtime_id UUID NOT NULL,
|
||||
provider TEXT NOT NULL,
|
||||
model TEXT NOT NULL,
|
||||
enqueued_at TIMESTAMPTZ NOT NULL DEFAULT now(),
|
||||
PRIMARY KEY (bucket_date, workspace_id, runtime_id, provider, model)
|
||||
);
|
||||
|
||||
-- Drained by `enqueued_at < p_to` in the window function. Enqueue on
|
||||
-- conflict updates enqueued_at to GREATEST(existing, new) so that an
|
||||
-- invalidation arriving DURING a rollup tick (between the function's
|
||||
-- snapshot and its drain step) keeps an enqueued_at > p_to and
|
||||
-- survives the drain. Without that, the late invalidation would be
|
||||
-- silently dropped.
|
||||
CREATE INDEX idx_task_usage_daily_dirty_enqueued_at
|
||||
ON task_usage_daily_dirty (enqueued_at);
|
||||
|
||||
-- NOTE: The partial index supporting the legacy `updated_at IS NULL`
|
||||
-- branch in the rollup window function is created in migration 078 with
|
||||
-- `CREATE INDEX CONCURRENTLY` to avoid blocking writes on the hot
|
||||
-- task_usage table. Until 078 has been applied, the OR branch falls
|
||||
-- back to a sequential scan filtered by `updated_at IS NULL`. That is
|
||||
-- acceptable because the rollup function is only invoked after this
|
||||
-- migration AND the backfill have run; in steady state no rows have
|
||||
-- NULL updated_at.
|
||||
|
||||
-- Trigger function for agent_task_queue. Two cases:
|
||||
-- * UPDATE of runtime_id (old != new): usage moves between runtimes.
|
||||
-- Enqueue both OLD and NEW runtime buckets so both get recomputed.
|
||||
-- * DELETE: row + its task_usage children are about to vanish.
|
||||
-- Enqueue OLD runtime buckets so the daily rows get cleared.
|
||||
-- We resolve workspace_id via `agent` (NOT via `issue`). When a DELETE
|
||||
-- cascades from issue → agent_task_queue, the issue row is already gone
|
||||
-- by the time this BEFORE DELETE trigger fires, so a join on `issue`
|
||||
-- would return zero rows and the enqueue would silently no-op. `agent`
|
||||
-- has its own ON DELETE CASCADE to atq but is not in the issue cascade
|
||||
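-- chain, so it's still alive.
-- NOTE(review): when the cascade starts from `agent` itself (the ON DELETE
-- CASCADE mentioned above), the same ordering argument suggests the agent
-- row is already gone here, so the join below would match nothing and the
-- bucket would not be enqueued — confirm and cover with a test.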
|
||||
CREATE OR REPLACE FUNCTION enqueue_task_usage_daily_dirty_for_atq()
|
||||
RETURNS TRIGGER
|
||||
LANGUAGE plpgsql
|
||||
AS $$
|
||||
BEGIN
|
||||
IF TG_OP = 'UPDATE' THEN
|
||||
IF OLD.runtime_id IS DISTINCT FROM NEW.runtime_id THEN
|
||||
IF OLD.runtime_id IS NOT NULL THEN
|
||||
INSERT INTO task_usage_daily_dirty (bucket_date, workspace_id, runtime_id, provider, model)
|
||||
SELECT DISTINCT DATE(tu.created_at), a.workspace_id, OLD.runtime_id, tu.provider, tu.model
|
||||
FROM task_usage tu
|
||||
JOIN agent a ON a.id = OLD.agent_id
|
||||
WHERE tu.task_id = OLD.id
|
||||
ON CONFLICT (bucket_date, workspace_id, runtime_id, provider, model) DO UPDATE
|
||||
SET enqueued_at = GREATEST(task_usage_daily_dirty.enqueued_at, EXCLUDED.enqueued_at);
|
||||
END IF;
|
||||
IF NEW.runtime_id IS NOT NULL THEN
|
||||
INSERT INTO task_usage_daily_dirty (bucket_date, workspace_id, runtime_id, provider, model)
|
||||
SELECT DISTINCT DATE(tu.created_at), a.workspace_id, NEW.runtime_id, tu.provider, tu.model
|
||||
FROM task_usage tu
|
||||
JOIN agent a ON a.id = NEW.agent_id
|
||||
WHERE tu.task_id = NEW.id
|
||||
ON CONFLICT (bucket_date, workspace_id, runtime_id, provider, model) DO UPDATE
|
||||
SET enqueued_at = GREATEST(task_usage_daily_dirty.enqueued_at, EXCLUDED.enqueued_at);
|
||||
END IF;
|
||||
END IF;
|
||||
RETURN NEW;
|
||||
ELSIF TG_OP = 'DELETE' THEN
|
||||
IF OLD.runtime_id IS NOT NULL THEN
|
||||
INSERT INTO task_usage_daily_dirty (bucket_date, workspace_id, runtime_id, provider, model)
|
||||
SELECT DISTINCT DATE(tu.created_at), a.workspace_id, OLD.runtime_id, tu.provider, tu.model
|
||||
FROM task_usage tu
|
||||
JOIN agent a ON a.id = OLD.agent_id
|
||||
WHERE tu.task_id = OLD.id
|
||||
ON CONFLICT (bucket_date, workspace_id, runtime_id, provider, model) DO UPDATE
|
||||
SET enqueued_at = GREATEST(task_usage_daily_dirty.enqueued_at, EXCLUDED.enqueued_at);
|
||||
END IF;
|
||||
RETURN OLD;
|
||||
END IF;
|
||||
RETURN NULL;
|
||||
END;
|
||||
$$;
|
||||
|
||||
CREATE TRIGGER trg_atq_dirty_rollup
|
||||
BEFORE UPDATE OF runtime_id OR DELETE ON agent_task_queue
|
||||
FOR EACH ROW EXECUTE FUNCTION enqueue_task_usage_daily_dirty_for_atq();
|
||||
|
||||
-- Trigger function for direct task_usage DELETE (rare — direct cleanup,
|
||||
-- not via cascade). UPDATE on task_usage is already covered by the
|
||||
-- updated_at watermark in the window function.
|
||||
-- workspace_id resolved via agent (see comment on the atq trigger
|
||||
-- function for why issue is unsafe in cascade contexts).
|
||||
CREATE OR REPLACE FUNCTION enqueue_task_usage_daily_dirty_for_tu()
|
||||
RETURNS TRIGGER
|
||||
LANGUAGE plpgsql
|
||||
AS $$
|
||||
BEGIN
|
||||
INSERT INTO task_usage_daily_dirty (bucket_date, workspace_id, runtime_id, provider, model)
|
||||
SELECT DATE(OLD.created_at), a.workspace_id, atq.runtime_id, OLD.provider, OLD.model
|
||||
FROM agent_task_queue atq
|
||||
JOIN agent a ON a.id = atq.agent_id
|
||||
WHERE atq.id = OLD.task_id
|
||||
AND atq.runtime_id IS NOT NULL
|
||||
ON CONFLICT (bucket_date, workspace_id, runtime_id, provider, model) DO UPDATE
|
||||
SET enqueued_at = GREATEST(task_usage_daily_dirty.enqueued_at, EXCLUDED.enqueued_at);
|
||||
RETURN OLD;
|
||||
END;
|
||||
$$;
|
||||
|
||||
CREATE TRIGGER trg_tu_dirty_rollup
|
||||
BEFORE DELETE ON task_usage
|
||||
FOR EACH ROW EXECUTE FUNCTION enqueue_task_usage_daily_dirty_for_tu();
|
||||
|
||||
-- Replace the rollup window function to also drain the dirty queue and
-- DELETE buckets that no longer have any source rows.
--
-- Pure-SQL CTE form so multiple calls in the same transaction (tests,
-- backfill scripts) don't collide on temp-table names.
--
-- The dirty queue is consumed with DELETE ... RETURNING inside the same
-- statement that recomputes the buckets. Reading the queue and draining it
-- therefore act on exactly the same set of rows: a separate trailing DELETE
-- would run under a fresh snapshot and could remove an invalidation that was
-- committed after the recompute read the queue (any caller whose p_to lies in
-- the future, e.g. 'infinity', is exposed to that) without ever processing it.
--
-- Parameters:
--   p_from, p_to  half-open window [p_from, p_to) applied to
--                 task_usage.updated_at (or created_at for legacy rows with
--                 NULL updated_at); p_to is also the cutoff for dirty-queue
--                 rows (enqueued_at < p_to).
-- Returns: number of daily rows upserted plus number of daily rows deleted;
--          0 when p_from >= p_to.
CREATE OR REPLACE FUNCTION rollup_task_usage_daily_window(
    p_from TIMESTAMPTZ,
    p_to   TIMESTAMPTZ
)
RETURNS BIGINT
LANGUAGE plpgsql
AS $$
DECLARE
    v_rows BIGINT;
BEGIN
    IF p_from >= p_to THEN
        RETURN 0;
    END IF;

    WITH
    -- Source 1: rows with updated_at in this window (steady state) plus
    -- the legacy-row OR branch for NULL updated_at (covered by partial
    -- index idx_task_usage_created_at_legacy from migration 078).
    --
    -- workspace_id is resolved via `agent`, NOT `issue`, to match the
    -- trigger functions above. There is no schema-level FK guaranteeing
    -- agent.workspace_id == issue.workspace_id, so mixing the two
    -- sources would let dirty_from_updates / recomputed disagree with
    -- dirty_from_queue's view of which workspace a task belongs to.
    -- Going through agent everywhere keeps trigger / discovery /
    -- recompute aligned without leaning on an unenforced invariant.
    dirty_from_updates AS (
        SELECT DISTINCT
            DATE(tu.created_at) AS bucket_date,
            a.workspace_id      AS workspace_id,
            atq.runtime_id      AS runtime_id,
            tu.provider         AS provider,
            tu.model            AS model
        FROM task_usage tu
        JOIN agent_task_queue atq ON atq.id = tu.task_id
        JOIN agent a ON a.id = atq.agent_id
        WHERE atq.runtime_id IS NOT NULL
          AND (
                (tu.updated_at >= p_from AND tu.updated_at < p_to)
             OR (tu.updated_at IS NULL
                 AND tu.created_at >= p_from
                 AND tu.created_at <  p_to)
          )
    ),
    -- Source 2: explicit invalidation queue (deletes + reassignments).
    -- Consumed and drained in one step; anything enqueued at or after
    -- p_to stays for the next call, keeping the contract aligned with
    -- the watermark.
    dirty_from_queue AS (
        DELETE FROM task_usage_daily_dirty
        WHERE enqueued_at < p_to
        RETURNING bucket_date, workspace_id, runtime_id, provider, model
    ),
    dirty_keys AS (
        SELECT * FROM dirty_from_updates
        UNION
        SELECT * FROM dirty_from_queue
    ),
    -- Recompute each dirty bucket from ground truth. Same agent-based
    -- workspace resolution as dirty_from_updates above.
    recomputed AS (
        SELECT
            dk.bucket_date,
            dk.workspace_id,
            dk.runtime_id,
            dk.provider,
            dk.model,
            SUM(tu.input_tokens)::bigint       AS input_tokens,
            SUM(tu.output_tokens)::bigint      AS output_tokens,
            SUM(tu.cache_read_tokens)::bigint  AS cache_read_tokens,
            SUM(tu.cache_write_tokens)::bigint AS cache_write_tokens,
            COUNT(*)::bigint                   AS event_count
        FROM dirty_keys dk
        JOIN agent_task_queue atq ON atq.runtime_id = dk.runtime_id
        JOIN agent a ON a.id = atq.agent_id
                    AND a.workspace_id = dk.workspace_id
        JOIN task_usage tu ON tu.task_id = atq.id
                          AND tu.provider = dk.provider
                          AND tu.model = dk.model
                          AND DATE(tu.created_at) = dk.bucket_date
        GROUP BY 1, 2, 3, 4, 5
    ),
    -- REPLACE present buckets.
    upserted AS (
        INSERT INTO task_usage_daily AS d (
            bucket_date, workspace_id, runtime_id, provider, model,
            input_tokens, output_tokens, cache_read_tokens, cache_write_tokens,
            event_count
        )
        SELECT
            bucket_date, workspace_id, runtime_id, provider, model,
            input_tokens, output_tokens, cache_read_tokens, cache_write_tokens,
            event_count
        FROM recomputed
        ON CONFLICT (bucket_date, workspace_id, runtime_id, provider, model) DO UPDATE
            SET input_tokens       = EXCLUDED.input_tokens,
                output_tokens      = EXCLUDED.output_tokens,
                cache_read_tokens  = EXCLUDED.cache_read_tokens,
                cache_write_tokens = EXCLUDED.cache_write_tokens,
                event_count        = EXCLUDED.event_count,
                updated_at         = now()
        RETURNING 1
    ),
    -- DELETE buckets that are dirty but have no source rows anymore.
    -- Important: USING dirty_keys (not recomputed) so we can detect
    -- "all source rows gone" — if recomputed has no row for a key, the
    -- bucket is empty and should be removed.
    deleted_empty AS (
        DELETE FROM task_usage_daily d
        USING dirty_keys dk
        WHERE d.bucket_date  = dk.bucket_date
          AND d.workspace_id = dk.workspace_id
          AND d.runtime_id   = dk.runtime_id
          AND d.provider     = dk.provider
          AND d.model        = dk.model
          AND NOT EXISTS (
              SELECT 1 FROM recomputed r
              WHERE r.bucket_date  = dk.bucket_date
                AND r.workspace_id = dk.workspace_id
                AND r.runtime_id   = dk.runtime_id
                AND r.provider     = dk.provider
                AND r.model        = dk.model
          )
        RETURNING 1
    )
    SELECT (SELECT COUNT(*) FROM upserted) + (SELECT COUNT(*) FROM deleted_empty)
      INTO v_rows;

    RETURN v_rows;
END;
$$;
|
||||
@@ -0,0 +1 @@
|
||||
-- Drops the partial index that the matching CREATE INDEX migration builds
-- on task_usage (idx_task_usage_created_at_legacy).
-- NOTE(review): PostgreSQL does not allow DROP INDEX CONCURRENTLY inside a
-- transaction block — confirm the migration runner executes this file
-- outside a transaction.
DROP INDEX CONCURRENTLY IF EXISTS idx_task_usage_created_at_legacy;
|
||||
@@ -0,0 +1,11 @@
|
||||
-- Partial index supporting the rollup window function's legacy NULL
|
||||
-- branch (072 added `updated_at` as nullable; rows that existed before
|
||||
-- the column was added stay NULL until either backfill replaces them or
|
||||
-- a subsequent UpsertTaskUsage refreshes them).
|
||||
--
|
||||
-- Built CONCURRENTLY because task_usage is a hot, large table — same
|
||||
-- pattern as 074/075. Run this AFTER 077 is applied and BEFORE turning
|
||||
-- on the read-path feature flag / scheduling pg_cron.
|
||||
--
-- NOTE(review): CREATE INDEX CONCURRENTLY cannot run inside a transaction
-- block, and a failed concurrent build leaves an INVALID index behind. With
-- IF NOT EXISTS a re-run would then skip the rebuild, so after a failure
-- drop idx_task_usage_created_at_legacy before retrying — confirm the
-- migration runner handles both cases.
CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_task_usage_created_at_legacy
|
||||
ON task_usage (created_at)
|
||||
WHERE updated_at IS NULL;
|
||||
@@ -421,6 +421,39 @@ type TaskUsage struct {
|
||||
CacheReadTokens int64 `json:"cache_read_tokens"`
|
||||
CacheWriteTokens int64 `json:"cache_write_tokens"`
|
||||
CreatedAt pgtype.Timestamptz `json:"created_at"`
|
||||
UpdatedAt pgtype.Timestamptz `json:"updated_at"`
|
||||
}
|
||||
|
||||
// TaskUsageDaily is one row of the `task_usage_daily` rollup table: token
// totals and an event count for a single
// (bucket_date, workspace_id, runtime_id, provider, model) bucket, which is
// the conflict key the rollup function upserts on.
// NOTE(review): this looks like generated model code — if so, change the
// schema and regenerate rather than editing by hand.
type TaskUsageDaily struct {
|
||||
BucketDate pgtype.Date `json:"bucket_date"`
|
||||
WorkspaceID pgtype.UUID `json:"workspace_id"`
|
||||
RuntimeID pgtype.UUID `json:"runtime_id"`
|
||||
Provider string `json:"provider"`
|
||||
Model string `json:"model"`
|
||||
InputTokens int64 `json:"input_tokens"`
|
||||
OutputTokens int64 `json:"output_tokens"`
|
||||
CacheReadTokens int64 `json:"cache_read_tokens"`
|
||||
CacheWriteTokens int64 `json:"cache_write_tokens"`
|
||||
// presumably the number of raw task_usage rows folded into this bucket — TODO confirm
EventCount int64 `json:"event_count"`
|
||||
// The rollup upsert sets updated_at = now() whenever it rewrites the bucket.
UpdatedAt pgtype.Timestamptz `json:"updated_at"`
|
||||
}
|
||||
|
||||
// TaskUsageDailyDirty appears to mirror one row of the
// `task_usage_daily_dirty` queue: a bucket key
// (bucket_date, workspace_id, runtime_id, provider, model) plus the time it
// was enqueued. The rollup window function drains rows whose enqueued_at is
// earlier than its p_to argument after recomputing the affected buckets.
type TaskUsageDailyDirty struct {
|
||||
BucketDate pgtype.Date `json:"bucket_date"`
|
||||
WorkspaceID pgtype.UUID `json:"workspace_id"`
|
||||
RuntimeID pgtype.UUID `json:"runtime_id"`
|
||||
Provider string `json:"provider"`
|
||||
Model string `json:"model"`
|
||||
EnqueuedAt pgtype.Timestamptz `json:"enqueued_at"`
|
||||
}
|
||||
|
||||
// TaskUsageRollupState appears to mirror the `task_usage_rollup_state`
// table: the rollup watermark plus bookkeeping about the most recent run.
// NOTE(review): presumably a single-row table keyed by a fixed id — confirm
// against the migration that creates it.
type TaskUsageRollupState struct {
|
||||
ID int16 `json:"id"`
|
||||
// The backfill command advances watermark_at after seeding the rollup so
// the following cron tick does not reprocess the same window.
WatermarkAt pgtype.Timestamptz `json:"watermark_at"`
|
||||
LastRunStartedAt pgtype.Timestamptz `json:"last_run_started_at"`
|
||||
LastRunFinishedAt pgtype.Timestamptz `json:"last_run_finished_at"`
|
||||
LastRunRows int64 `json:"last_run_rows"`
|
||||
// pgtype.Text carries its own validity flag, so a NULL last_error is representable.
LastError pgtype.Text `json:"last_error"`
|
||||
}
|
||||
|
||||
type User struct {
|
||||
|
||||
@@ -141,10 +141,11 @@ type ListRuntimeUsageRow struct {
|
||||
CacheWriteTokens int64 `json:"cache_write_tokens"`
|
||||
}
|
||||
|
||||
// Bucket by tu.created_at (usage report time, ~= task completion time), not
|
||||
// atq.created_at (task enqueue time), so tasks that queue one day and execute
|
||||
// the next are attributed to the day tokens were actually produced. The since
|
||||
// cutoff is truncated to start-of-day so `days=N` yields full calendar days.
|
||||
// Reads from raw `task_usage`, bucketed by DATE(tu.created_at) — usage
|
||||
// report time, ~= task completion time. Since cutoff is truncated to
|
||||
// start-of-day so `days=N` yields full calendar days. This is the
|
||||
// always-correct fallback path; used when USAGE_DAILY_ROLLUP_ENABLED
|
||||
// is false (or the rollup hasn't been deployed yet).
|
||||
func (q *Queries) ListRuntimeUsage(ctx context.Context, arg ListRuntimeUsageParams) ([]ListRuntimeUsageRow, error) {
|
||||
rows, err := q.db.Query(ctx, listRuntimeUsage, arg.RuntimeID, arg.Since)
|
||||
if err != nil {
|
||||
@@ -238,3 +239,76 @@ func (q *Queries) ListRuntimeUsageByAgent(ctx context.Context, arg ListRuntimeUs
|
||||
}
|
||||
return items, nil
|
||||
}
|
||||
|
||||
// listRuntimeUsageDaily is the SQL text executed by ListRuntimeUsageDaily.
// $1 is the runtime ID; $2 is the "since" timestamp, which the query
// truncates to the start of its day before comparing against bucket_date.
// Results are ordered newest bucket first, then by provider and model.
const listRuntimeUsageDaily = `-- name: ListRuntimeUsageDaily :many
|
||||
SELECT
|
||||
bucket_date AS date,
|
||||
provider,
|
||||
model,
|
||||
SUM(input_tokens)::bigint AS input_tokens,
|
||||
SUM(output_tokens)::bigint AS output_tokens,
|
||||
SUM(cache_read_tokens)::bigint AS cache_read_tokens,
|
||||
SUM(cache_write_tokens)::bigint AS cache_write_tokens
|
||||
FROM task_usage_daily
|
||||
WHERE runtime_id = $1
|
||||
AND bucket_date >= DATE(DATE_TRUNC('day', $2::timestamptz))
|
||||
GROUP BY bucket_date, provider, model
|
||||
ORDER BY bucket_date DESC, provider, model
|
||||
`
|
||||
|
||||
// ListRuntimeUsageDailyParams holds the arguments for ListRuntimeUsageDaily.
type ListRuntimeUsageDailyParams struct {
|
||||
// Bound to $1: only rollup rows with this runtime_id are returned.
RuntimeID pgtype.UUID `json:"runtime_id"`
|
||||
// Bound to $2: the query truncates it to start-of-day, so the whole day
// containing Since is included.
Since pgtype.Timestamptz `json:"since"`
|
||||
}
|
||||
|
||||
// ListRuntimeUsageDailyRow is one result row of ListRuntimeUsageDaily: the
// summed token counts for a single (date, provider, model) group.
type ListRuntimeUsageDailyRow struct {
|
||||
// bucket_date from task_usage_daily, aliased to "date" by the query.
Date pgtype.Date `json:"date"`
|
||||
Provider string `json:"provider"`
|
||||
Model string `json:"model"`
|
||||
InputTokens int64 `json:"input_tokens"`
|
||||
OutputTokens int64 `json:"output_tokens"`
|
||||
CacheReadTokens int64 `json:"cache_read_tokens"`
|
||||
CacheWriteTokens int64 `json:"cache_write_tokens"`
|
||||
}
|
||||
|
||||
// ListRuntimeUsageDaily returns summed token counts per (date, provider,
// model) for one runtime, newest date first. It returns a non-nil empty
// slice when no rows match, and a nil slice with the error on any query,
// scan, or iteration failure.
//
// Reads from the `task_usage_daily` rollup table maintained by
|
||||
// rollup_task_usage_daily() (scheduled every 5 min via pg_cron, or any
|
||||
// equivalent external scheduler that calls the function). Same shape as
|
||||
// ListRuntimeUsage above. Today's bucket may lag the raw table by up to
|
||||
// ~10 min (5 min cron period + 5 min rollup safety lag); intentional.
|
||||
//
|
||||
// Only used when USAGE_DAILY_ROLLUP_ENABLED is true AND deploy has
|
||||
// verified that the rollup is fresh (see task_usage_rollup_lag_seconds
|
||||
// helper from migration 076).
|
||||
//
|
||||
// The rollup's upsert conflict key on task_usage_daily is (bucket_date,
|
||||
// workspace_id, runtime_id, provider, model). This query filters by
|
||||
// runtime_id only, so SUM/GROUP BY is kept to collapse any rows that differ
|
||||
// solely by workspace_id, and so promoting extra dimensions into the table
|
||||
// later doesn't silently change query semantics.
|
||||
func (q *Queries) ListRuntimeUsageDaily(ctx context.Context, arg ListRuntimeUsageDailyParams) ([]ListRuntimeUsageDailyRow, error) {
|
||||
rows, err := q.db.Query(ctx, listRuntimeUsageDaily, arg.RuntimeID, arg.Since)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
// Release the result set on every return path, including scan errors.
defer rows.Close()
|
||||
// Empty literal rather than nil so "no rows" is an empty slice, not nil.
items := []ListRuntimeUsageDailyRow{}
|
||||
for rows.Next() {
|
||||
var i ListRuntimeUsageDailyRow
|
||||
// Scan targets follow the SELECT column order of listRuntimeUsageDaily.
if err := rows.Scan(
|
||||
&i.Date,
|
||||
&i.Provider,
|
||||
&i.Model,
|
||||
&i.InputTokens,
|
||||
&i.OutputTokens,
|
||||
&i.CacheReadTokens,
|
||||
&i.CacheWriteTokens,
|
||||
); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
items = append(items, i)
|
||||
}
|
||||
// rows.Next() returning false can also mean failure; surface that error here.
if err := rows.Err(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return items, nil
|
||||
}
|
||||
|
||||
@@ -45,7 +45,7 @@ func (q *Queries) GetIssueUsageSummary(ctx context.Context, issueID pgtype.UUID)
|
||||
}
|
||||
|
||||
const getTaskUsage = `-- name: GetTaskUsage :many
|
||||
SELECT id, task_id, provider, model, input_tokens, output_tokens, cache_read_tokens, cache_write_tokens, created_at FROM task_usage
|
||||
SELECT id, task_id, provider, model, input_tokens, output_tokens, cache_read_tokens, cache_write_tokens, created_at, updated_at FROM task_usage
|
||||
WHERE task_id = $1
|
||||
ORDER BY model
|
||||
`
|
||||
@@ -69,6 +69,7 @@ func (q *Queries) GetTaskUsage(ctx context.Context, taskID pgtype.UUID) ([]TaskU
|
||||
&i.CacheReadTokens,
|
||||
&i.CacheWriteTokens,
|
||||
&i.CreatedAt,
|
||||
&i.UpdatedAt,
|
||||
); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -206,14 +207,15 @@ func (q *Queries) GetWorkspaceUsageSummary(ctx context.Context, arg GetWorkspace
|
||||
}
|
||||
|
||||
const upsertTaskUsage = `-- name: UpsertTaskUsage :exec
|
||||
INSERT INTO task_usage (task_id, provider, model, input_tokens, output_tokens, cache_read_tokens, cache_write_tokens)
|
||||
VALUES ($1, $2, $3, $4, $5, $6, $7)
|
||||
INSERT INTO task_usage (task_id, provider, model, input_tokens, output_tokens, cache_read_tokens, cache_write_tokens, updated_at)
|
||||
VALUES ($1, $2, $3, $4, $5, $6, $7, now())
|
||||
ON CONFLICT (task_id, provider, model)
|
||||
DO UPDATE SET
|
||||
input_tokens = EXCLUDED.input_tokens,
|
||||
output_tokens = EXCLUDED.output_tokens,
|
||||
cache_read_tokens = EXCLUDED.cache_read_tokens,
|
||||
cache_write_tokens = EXCLUDED.cache_write_tokens
|
||||
cache_write_tokens = EXCLUDED.cache_write_tokens,
|
||||
updated_at = now()
|
||||
`
|
||||
|
||||
type UpsertTaskUsageParams struct {
|
||||
@@ -226,6 +228,10 @@ type UpsertTaskUsageParams struct {
|
||||
CacheWriteTokens int64 `json:"cache_write_tokens"`
|
||||
}
|
||||
|
||||
// Bumps `updated_at` on INSERT and on conflict so the daily-rollup worker
|
||||
// (migration 073) detects the row as dirty and re-aggregates its bucket.
|
||||
// Without the conflict-side bump, a correction to historical token counts
|
||||
// would never propagate to the rollup.
|
||||
func (q *Queries) UpsertTaskUsage(ctx context.Context, arg UpsertTaskUsageParams) error {
|
||||
_, err := q.db.Exec(ctx, upsertTaskUsage,
|
||||
arg.TaskID,
|
||||
|
||||
@@ -1,8 +1,9 @@
|
||||
-- name: ListRuntimeUsage :many
|
||||
-- Bucket by tu.created_at (usage report time, ~= task completion time), not
|
||||
-- atq.created_at (task enqueue time), so tasks that queue one day and execute
|
||||
-- the next are attributed to the day tokens were actually produced. The since
|
||||
-- cutoff is truncated to start-of-day so `days=N` yields full calendar days.
|
||||
-- Reads from raw `task_usage`, bucketed by DATE(tu.created_at) — usage
|
||||
-- report time, ~= task completion time. Since cutoff is truncated to
|
||||
-- start-of-day so `days=N` yields full calendar days. This is the
|
||||
-- always-correct fallback path; used when USAGE_DAILY_ROLLUP_ENABLED
|
||||
-- is false (or the rollup hasn't been deployed yet).
|
||||
SELECT
|
||||
DATE(tu.created_at) AS date,
|
||||
tu.provider,
|
||||
@@ -18,6 +19,35 @@ WHERE atq.runtime_id = $1
|
||||
GROUP BY DATE(tu.created_at), tu.provider, tu.model
|
||||
ORDER BY DATE(tu.created_at) DESC, tu.provider, tu.model;
|
||||
|
||||
-- name: ListRuntimeUsageDaily :many
|
||||
-- Reads from the `task_usage_daily` rollup table maintained by
|
||||
-- rollup_task_usage_daily() (scheduled every 5 min via pg_cron, or any
|
||||
-- equivalent external scheduler that calls the function). Same shape as
|
||||
-- ListRuntimeUsage above. Today's bucket may lag the raw table by up to
|
||||
-- ~10 min (5 min cron period + 5 min rollup safety lag); intentional.
|
||||
--
|
||||
-- Only used when USAGE_DAILY_ROLLUP_ENABLED is true AND deploy has
|
||||
-- verified that the rollup is fresh (see task_usage_rollup_lag_seconds
|
||||
-- helper from migration 076).
|
||||
--
|
||||
-- The rollup's upsert conflict key on task_usage_daily is (bucket_date,
|
||||
-- workspace_id, runtime_id, provider, model). This query filters by
|
||||
-- runtime_id only, so SUM/GROUP BY is kept to collapse any rows that differ
|
||||
-- solely by workspace_id, and so promoting extra dimensions into the table
|
||||
-- later doesn't silently change query semantics.
|
||||
SELECT
|
||||
bucket_date AS date,
|
||||
provider,
|
||||
model,
|
||||
SUM(input_tokens)::bigint AS input_tokens,
|
||||
SUM(output_tokens)::bigint AS output_tokens,
|
||||
SUM(cache_read_tokens)::bigint AS cache_read_tokens,
|
||||
SUM(cache_write_tokens)::bigint AS cache_write_tokens
|
||||
FROM task_usage_daily
|
||||
WHERE runtime_id = $1
|
||||
AND bucket_date >= DATE(DATE_TRUNC('day', @since::timestamptz))
|
||||
GROUP BY bucket_date, provider, model
|
||||
ORDER BY bucket_date DESC, provider, model;
|
||||
|
||||
-- name: GetRuntimeTaskHourlyActivity :many
|
||||
SELECT EXTRACT(HOUR FROM started_at)::int AS hour, COUNT(*)::int AS count
|
||||
FROM agent_task_queue
|
||||
|
||||
@@ -1,12 +1,17 @@
|
||||
-- name: UpsertTaskUsage :exec
|
||||
INSERT INTO task_usage (task_id, provider, model, input_tokens, output_tokens, cache_read_tokens, cache_write_tokens)
|
||||
VALUES ($1, $2, $3, $4, $5, $6, $7)
|
||||
-- Bumps `updated_at` on INSERT and on conflict so the daily-rollup worker
|
||||
-- (migration 073) detects the row as dirty and re-aggregates its bucket.
|
||||
-- Without the conflict-side bump, a correction to historical token counts
|
||||
-- would never propagate to the rollup.
|
||||
INSERT INTO task_usage (task_id, provider, model, input_tokens, output_tokens, cache_read_tokens, cache_write_tokens, updated_at)
|
||||
VALUES ($1, $2, $3, $4, $5, $6, $7, now())
|
||||
ON CONFLICT (task_id, provider, model)
|
||||
DO UPDATE SET
|
||||
input_tokens = EXCLUDED.input_tokens,
|
||||
output_tokens = EXCLUDED.output_tokens,
|
||||
cache_read_tokens = EXCLUDED.cache_read_tokens,
|
||||
cache_write_tokens = EXCLUDED.cache_write_tokens;
|
||||
cache_write_tokens = EXCLUDED.cache_write_tokens,
|
||||
updated_at = now();
|
||||
|
||||
-- name: GetTaskUsage :many
|
||||
SELECT * FROM task_usage
|
||||
|
||||
Reference in New Issue
Block a user