Files
multica/server/internal/handler/dashboard_test.go
YYClaw 614dfae884 MUL-2488 feat(timezone): Scheduling / Viewing two-layer timezone architecture (#2968)
* docs(timezone): add scheduling/viewing timezone architecture RFC

* feat(db): replace daily rollups with task_usage_hourly, add user.timezone

Migrations 100-104: add "user".timezone (Viewing tz), build the UTC
hourly task_usage_hourly rollup with its pipeline, drop the legacy
task_usage_daily / task_usage_dashboard_daily pipelines, and drop the
agent_runtime.timezone column. Report queries now slice day boundaries
at read time by the caller-supplied @tz instead of materialising in a
fixed tz. Regenerate sqlc.

* feat(server): add task_usage_hourly backfill command

Replace the two legacy backfill commands (daily / dashboard_daily) with
a single backfill_task_usage_hourly that loads historical task_usage
into the new UTC hourly rollup, sliced per workspace.

* refactor(server): resolve viewing timezone in report handlers

Report handlers resolve the Viewing tz per request (?tz query param,
then user.timezone, then UTC) and pass it to the hourly-rollup queries.
Drop the UseDailyRollup feature flags and the old raw-scan/daily-rollup
dual paths, remove the /api/usage endpoints, and stop the daemon from
reporting and the runtime handler from accepting host timezone.

* refactor(core): switch report queries to viewing timezone

API client and dashboard/runtime queries send ?tz with each report
request, the user schema/types carry the new timezone field, and the
runtime timezone field/mutation is removed.

* feat(views): add viewing timezone preference and UI

Add the useViewingTimezone hook and a Timezone setting in Preferences;
report charts and the dashboard week boundary follow the viewer tz.
Remove the runtime detail timezone editor and its locale strings.

* fix(test): update fixtures and stabilize tests for timezone refactor

The timezone architecture refactor changed several types without
updating dependent test code:

- RuntimeDevice no longer has a timezone field — drop it from the
  create-agent-dialog runtime fixture.
- User now requires a timezone field — add it to the apps/web mockUser
  fixture.
- The PreferencesTab timezone tests asserted on the async save handler
  (PATCH then store update) with a bare expect, racing the mutation's
  settle callback, and timed out querying the Select's ~600-option IANA
  list on a loaded CI runner. Wrap the assertions in waitFor and extend
  the timeout for those three tests.

* docs(timezone): document self-host migration order and trigger invariant

Add a SELF-HOST UPGRADE ORDER runbook to the backfill command's package
comment: applying migrations 100-104 in a single migrate-up drops the
legacy daily rollups before the hourly backfill runs, leaving dashboards
empty until cron catches up.

Add an INVARIANT comment on trg_atq_dirty_hourly noting that agent_id
must be added to the trigger's OF list if it ever becomes mutable,
otherwise dirty buckets for the old agent_id are silently missed.

* style(runtimes): drop trailing blank line in runtime-detail
2026-05-21 15:33:47 +08:00

1398 lines
54 KiB
Go
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
package handler
import (
"context"
"encoding/json"
"net/http"
"net/http/httptest"
"testing"
"time"
)
// TestDashboardEndpoints covers the workspace-dashboard rollups:
// - daily token usage with and without project filter
// - per-agent token usage with and without project filter
// - per-agent run time
//
// Asserts that (1) tasks belonging to a project show up under the workspace
// view, (2) the project filter excludes tasks tied to issues without a
// matching project_id, and (3) run-time aggregation accumulates the
// (completed_at - started_at) delta correctly.
func TestDashboardEndpoints(t *testing.T) {
	if testHandler == nil {
		t.Skip("database not available")
	}
	ctx := context.Background()

	// Borrow one runtime and one agent from the shared fixture workspace.
	var runtimeID, agentID string
	if err := testPool.QueryRow(ctx, `
SELECT id FROM agent_runtime WHERE workspace_id = $1 LIMIT 1
`, testWorkspaceID).Scan(&runtimeID); err != nil {
		t.Fatalf("fetch runtime: %v", err)
	}
	if err := testPool.QueryRow(ctx, `
SELECT id FROM agent WHERE workspace_id = $1 LIMIT 1
`, testWorkspaceID).Scan(&agentID); err != nil {
		t.Fatalf("fetch agent: %v", err)
	}

	// Two issues: one bound to a project, one not.
	var projectID string
	if err := testPool.QueryRow(ctx, `
INSERT INTO project (workspace_id, title)
VALUES ($1, 'dashboard test project')
RETURNING id
`, testWorkspaceID).Scan(&projectID); err != nil {
		t.Fatalf("create project: %v", err)
	}
	t.Cleanup(func() { testPool.Exec(ctx, `DELETE FROM project WHERE id = $1`, projectID) })

	// issue.number is `UNIQUE (workspace_id, number)` (migration 020) and
	// defaults to 0. Two inserts into the same workspace would collide on the
	// default; allocate `MAX(number) + 1` per row to stay sequential and
	// avoid stepping on rows other tests have left behind in the shared
	// fixture workspace.
	mkIssue := func(withProject bool) string {
		var id string
		// pid stays a nil interface when no project is wanted, so the
		// driver sends NULL for project_id.
		var pid any
		if withProject {
			pid = projectID
		}
		if err := testPool.QueryRow(ctx, `
INSERT INTO issue (workspace_id, title, creator_id, creator_type, project_id, number)
VALUES (
$1, 'dashboard test', $2, 'member', $3,
(SELECT COALESCE(MAX(number), 0) + 1 FROM issue WHERE workspace_id = $1)
)
RETURNING id
`, testWorkspaceID, testUserID, pid).Scan(&id); err != nil {
			t.Fatalf("insert issue: %v", err)
		}
		t.Cleanup(func() { testPool.Exec(ctx, `DELETE FROM issue WHERE id = $1`, id) })
		return id
	}
	projectIssueID := mkIssue(true)
	otherIssueID := mkIssue(false)

	now := time.Now().UTC()
	started := now.Add(-30 * time.Minute)
	completed := started.Add(10 * time.Minute) // 600s run

	// mkTaskWithUsage inserts one task for issueID plus a single task_usage
	// row carrying `tokens` input tokens, and registers the task for cleanup.
	mkTaskWithUsage := func(issueID string, status string, tokens int64) {
		var taskID string
		if err := testPool.QueryRow(ctx, `
INSERT INTO agent_task_queue (agent_id, issue_id, runtime_id, status, started_at, completed_at, created_at)
VALUES ($1, $2, $3, $4, $5, $6, now())
RETURNING id
`, agentID, issueID, runtimeID, status, started, completed).Scan(&taskID); err != nil {
			t.Fatalf("insert task: %v", err)
		}
		if _, err := testPool.Exec(ctx, `
INSERT INTO task_usage (task_id, provider, model, input_tokens, output_tokens, created_at)
VALUES ($1, 'claude', 'claude-3-5-sonnet', $2, 0, now())
`, taskID, tokens); err != nil {
			t.Fatalf("insert task_usage: %v", err)
		}
		t.Cleanup(func() { testPool.Exec(ctx, `DELETE FROM agent_task_queue WHERE id = $1`, taskID) })
	}
	mkTaskWithUsage(projectIssueID, "completed", 1000)
	mkTaskWithUsage(otherIssueID, "completed", 500)

	// All dashboard endpoints now read from task_usage_hourly (post-RFC
	// Phase 3). Drive the underlying window function directly so the
	// freshly inserted fixture rows are aggregated before assertions —
	// in production the cron tick handles this with a 5-min lag.
	if _, err := testPool.Exec(ctx, `
SELECT rollup_task_usage_hourly_window('1970-01-01'::timestamptz, now() + interval '1 hour')
`); err != nil {
		t.Fatalf("rollup window: %v", err)
	}

	// Minimal projections of the response rows; only asserted fields are
	// declared.
	type dailyRow struct {
		Date        string `json:"date"`
		Model       string `json:"model"`
		InputTokens int64  `json:"input_tokens"`
	}
	type byAgentRow struct {
		AgentID     string `json:"agent_id"`
		Model       string `json:"model"`
		InputTokens int64  `json:"input_tokens"`
	}
	type runtimeRow struct {
		AgentID      string `json:"agent_id"`
		TotalSeconds int64  `json:"total_seconds"`
		TaskCount    int32  `json:"task_count"`
	}

	// decodeRows unmarshals a recorded response body into dst and fails the
	// test on malformed JSON, so a broken payload is reported as a decode
	// error rather than as a misleading "got 0 tokens" assertion.
	decodeRows := func(label string, w *httptest.ResponseRecorder, dst any) {
		t.Helper()
		if err := json.NewDecoder(w.Body).Decode(dst); err != nil {
			t.Fatalf("%s: decode response: %v", label, err)
		}
	}

	// daily — workspace-wide
	{
		w := httptest.NewRecorder()
		testHandler.GetDashboardUsageDaily(w, newRequest("GET", "/api/dashboard/usage/daily?days=1", nil))
		if w.Code != http.StatusOK {
			t.Fatalf("daily ws: expected 200, got %d: %s", w.Code, w.Body.String())
		}
		var rows []dailyRow
		decodeRows("daily ws", w, &rows)
		var total int64
		for _, r := range rows {
			if r.Model == "claude-3-5-sonnet" {
				total += r.InputTokens
			}
		}
		// Lower bound only: the shared workspace may hold other usage rows.
		if total < 1500 {
			t.Errorf("daily ws: expected >=1500 tokens (1000+500), got %d", total)
		}
	}

	// daily — project-scoped
	{
		w := httptest.NewRecorder()
		testHandler.GetDashboardUsageDaily(w, newRequest("GET", "/api/dashboard/usage/daily?days=1&project_id="+projectID, nil))
		if w.Code != http.StatusOK {
			t.Fatalf("daily project: expected 200, got %d: %s", w.Code, w.Body.String())
		}
		var rows []dailyRow
		decodeRows("daily project", w, &rows)
		var total int64
		for _, r := range rows {
			if r.Model == "claude-3-5-sonnet" {
				total += r.InputTokens
			}
		}
		// Project filter must exclude the 500-token "other" issue. Token total
		// for this project must be >= 1000 (our task) and < 1500 (would only
		// reach 1500 if filter leaked).
		if total < 1000 {
			t.Errorf("daily project: expected >=1000 tokens, got %d", total)
		}
		if total >= 1500 {
			t.Errorf("daily project: filter leaked — expected <1500 tokens, got %d", total)
		}
	}

	// by-agent — project-scoped
	{
		w := httptest.NewRecorder()
		testHandler.GetDashboardUsageByAgent(w, newRequest("GET", "/api/dashboard/usage/by-agent?days=1&project_id="+projectID, nil))
		if w.Code != http.StatusOK {
			t.Fatalf("by-agent project: expected 200, got %d: %s", w.Code, w.Body.String())
		}
		var rows []byAgentRow
		decodeRows("by-agent project", w, &rows)
		found := false
		for _, r := range rows {
			if r.AgentID == agentID && r.InputTokens >= 1000 {
				found = true
			}
		}
		if !found {
			t.Errorf("by-agent project: expected agent %s with >=1000 tokens; got %v", agentID, rows)
		}
	}

	// agent-runtime — project-scoped
	{
		w := httptest.NewRecorder()
		testHandler.GetDashboardAgentRunTime(w, newRequest("GET", "/api/dashboard/agent-runtime?days=1&project_id="+projectID, nil))
		if w.Code != http.StatusOK {
			t.Fatalf("agent-runtime: expected 200, got %d: %s", w.Code, w.Body.String())
		}
		var rows []runtimeRow
		decodeRows("agent-runtime", w, &rows)
		var seconds int64
		var tasks int32
		for _, r := range rows {
			if r.AgentID == agentID {
				seconds += r.TotalSeconds
				tasks += r.TaskCount
			}
		}
		if tasks < 1 {
			t.Errorf("agent-runtime: expected >=1 task for agent, got %d", tasks)
		}
		if seconds < 600 {
			t.Errorf("agent-runtime: expected >=600s (one 10-minute run), got %d", seconds)
		}
	}

	// agent-runtime — invalid project_id rejected
	{
		w := httptest.NewRecorder()
		testHandler.GetDashboardAgentRunTime(w, newRequest("GET", "/api/dashboard/agent-runtime?project_id=not-a-uuid", nil))
		if w.Code != http.StatusBadRequest {
			t.Errorf("agent-runtime: expected 400 for invalid uuid, got %d", w.Code)
		}
	}

	// Workspace-wide by-agent through the same rollup, just to confirm
	// the no-project-filter shape matches up.
	{
		w := httptest.NewRecorder()
		testHandler.GetDashboardUsageByAgent(w, newRequest("GET", "/api/dashboard/usage/by-agent?days=1", nil))
		if w.Code != http.StatusOK {
			t.Fatalf("by-agent ws: expected 200, got %d: %s", w.Code, w.Body.String())
		}
		var aRows []byAgentRow
		decodeRows("by-agent ws", w, &aRows)
		var aTotal int64
		for _, r := range aRows {
			if r.AgentID == agentID && r.Model == "claude-3-5-sonnet" {
				aTotal += r.InputTokens
			}
		}
		if aTotal < 1500 {
			t.Errorf("by-agent ws: expected >=1500 tokens across workspace, got %d", aTotal)
		}
	}
}
// TestDashboardUsageDailyBucketsByViewerTimezone proves the `?tz=` query
// param drives the calendar-day boundary: the same UTC instant lands under
// a different `date` for a UTC viewer vs an America/Los_Angeles viewer.
// This is the core promise of the timezone-architecture RFC.
func TestDashboardUsageDailyBucketsByViewerTimezone(t *testing.T) {
	if testHandler == nil {
		t.Skip("database not available")
	}
	ctx := context.Background()

	var runtimeID, agentID string
	if err := testPool.QueryRow(ctx, `SELECT id FROM agent_runtime WHERE workspace_id = $1 LIMIT 1`, testWorkspaceID).Scan(&runtimeID); err != nil {
		t.Fatalf("fetch runtime: %v", err)
	}
	if err := testPool.QueryRow(ctx, `SELECT id FROM agent WHERE workspace_id = $1 LIMIT 1`, testWorkspaceID).Scan(&agentID); err != nil {
		t.Fatalf("fetch agent: %v", err)
	}
	// The seeded row is tagged with a test-only provider so cleanup can
	// remove exactly this test's data.
	t.Cleanup(func() {
		testPool.Exec(ctx, `DELETE FROM task_usage_hourly WHERE runtime_id = $1 AND provider = 'tz-bucket-test'`, runtimeID)
	})

	// One bucket at 04:00 UTC two days ago. 04:00 UTC is still the
	// previous evening in America/Los_Angeles (UTC-7/-8), so the UTC
	// viewer and the LA viewer must see this row under different dates.
	var bucketHour time.Time
	if err := testPool.QueryRow(ctx, `
INSERT INTO task_usage_hourly (
bucket_hour, workspace_id, runtime_id, agent_id, project_id,
provider, model,
input_tokens, output_tokens, cache_read_tokens, cache_write_tokens, event_count
)
VALUES (
((CURRENT_DATE - 2)::timestamp + interval '4 hours') AT TIME ZONE 'UTC',
$1, $2, $3, NULL, 'tz-bucket-test', 'tz-bucket-model',
999, 0, 0, 0, 1
)
ON CONFLICT ON CONSTRAINT uq_task_usage_hourly_key DO UPDATE
SET input_tokens = EXCLUDED.input_tokens
RETURNING bucket_hour
`, testWorkspaceID, runtimeID, agentID).Scan(&bucketHour); err != nil {
		t.Fatalf("seed hourly row: %v", err)
	}

	// Derive both expected dates from the instant the database actually
	// stored, then sanity-check that they really straddle a day boundary.
	utcDate := bucketHour.UTC().Format("2006-01-02")
	laLoc, err := time.LoadLocation("America/Los_Angeles")
	if err != nil {
		t.Fatalf("load LA location: %v", err)
	}
	laDate := bucketHour.In(laLoc).Format("2006-01-02")
	if utcDate == laDate {
		t.Fatalf("test setup: UTC and LA dates must differ, both %s", utcDate)
	}

	// readDate calls the daily-usage endpoint with ?tz=<tz> and returns the
	// `date` of the first row belonging to the seeded model. It fails the
	// test if the request fails, the body is not valid JSON, or the row is
	// absent.
	readDate := func(tz string) string {
		t.Helper()
		w := httptest.NewRecorder()
		testHandler.GetDashboardUsageDaily(w, newRequest("GET", "/api/dashboard/usage/daily?days=10&tz="+tz, nil))
		if w.Code != http.StatusOK {
			t.Fatalf("tz=%s: expected 200, got %d: %s", tz, w.Code, w.Body.String())
		}
		var rows []struct {
			Date  string `json:"date"`
			Model string `json:"model"`
		}
		// A decode failure must surface as such rather than as a
		// misleading "row not found" below.
		if err := json.NewDecoder(w.Body).Decode(&rows); err != nil {
			t.Fatalf("tz=%s: decode response: %v", tz, err)
		}
		for _, r := range rows {
			if r.Model == "tz-bucket-model" {
				return r.Date
			}
		}
		t.Fatalf("tz=%s: tz-bucket-model row not found in %v", tz, rows)
		return ""
	}

	if got := readDate("UTC"); got != utcDate {
		t.Errorf("UTC viewer: expected date %s, got %s", utcDate, got)
	}
	if got := readDate("America/Los_Angeles"); got != laDate {
		t.Errorf("LA viewer: expected date %s, got %s", laDate, got)
	}
}
// TestDashboardRunTimeDailyBucketsByViewerTimezone proves the `?tz=` query
// param drives the calendar-day boundary of the Time / Tasks dashboard tab:
// GetDashboardRunTimeDaily applies `@tz` to `completed_at AT TIME ZONE @tz`
// on agent_task_queue. A task completed at 04:00 UTC is still the previous
// evening in America/Los_Angeles (UTC-7/-8), so the LA viewer must see the
// row under the prior calendar date relative to a UTC viewer.
func TestDashboardRunTimeDailyBucketsByViewerTimezone(t *testing.T) {
	if testHandler == nil {
		t.Skip("database not available")
	}
	ctx := context.Background()

	var runtimeID, agentID string
	if err := testPool.QueryRow(ctx, `SELECT id FROM agent_runtime WHERE workspace_id = $1 LIMIT 1`, testWorkspaceID).Scan(&runtimeID); err != nil {
		t.Fatalf("fetch runtime: %v", err)
	}
	if err := testPool.QueryRow(ctx, `SELECT id FROM agent WHERE workspace_id = $1 LIMIT 1`, testWorkspaceID).Scan(&agentID); err != nil {
		t.Fatalf("fetch agent: %v", err)
	}

	// Issue tagged so we can clean up just this test's rows.
	var issueID string
	if err := testPool.QueryRow(ctx, `
INSERT INTO issue (workspace_id, title, creator_id, creator_type, number)
VALUES ($1, 'runtime-daily tz test', $2, 'member',
(SELECT COALESCE(MAX(number), 0) + 1 FROM issue WHERE workspace_id = $1))
RETURNING id
`, testWorkspaceID, testUserID).Scan(&issueID); err != nil {
		t.Fatalf("create issue: %v", err)
	}
	t.Cleanup(func() { testPool.Exec(ctx, `DELETE FROM issue WHERE id = $1`, issueID) })

	// completed_at at 04:00 UTC two days ago — still the prior evening in LA.
	// started_at 10 minutes earlier so the run has a non-zero duration.
	var completedAt time.Time
	var taskID string
	if err := testPool.QueryRow(ctx, `
INSERT INTO agent_task_queue (agent_id, issue_id, runtime_id, status, started_at, completed_at, created_at)
VALUES (
$1, $2, $3, 'completed',
((CURRENT_DATE - 2)::timestamp + interval '3 hours 50 minutes') AT TIME ZONE 'UTC',
((CURRENT_DATE - 2)::timestamp + interval '4 hours') AT TIME ZONE 'UTC',
now()
)
RETURNING id, completed_at
`, agentID, issueID, runtimeID).Scan(&taskID, &completedAt); err != nil {
		t.Fatalf("insert completed task: %v", err)
	}
	t.Cleanup(func() { testPool.Exec(ctx, `DELETE FROM agent_task_queue WHERE id = $1`, taskID) })

	// Expected calendar dates for each viewer, derived from the stored
	// completion instant.
	utcDate := completedAt.UTC().Format("2006-01-02")
	laLoc, err := time.LoadLocation("America/Los_Angeles")
	if err != nil {
		t.Fatalf("load LA location: %v", err)
	}
	laDate := completedAt.In(laLoc).Format("2006-01-02")
	if utcDate == laDate {
		t.Fatalf("test setup: UTC and LA dates must differ, both %s", utcDate)
	}

	// readRow calls the run-time daily endpoint with ?tz=<tz> and returns
	// the first row whose date equals want. The expected date is passed in
	// explicitly so the closure does not need to special-case zone names.
	// It fails the test if the request fails, the body is not valid JSON,
	// or no row exists for that date.
	readRow := func(tz, want string) (string, int64, int32) {
		t.Helper()
		w := httptest.NewRecorder()
		testHandler.GetDashboardRunTimeDaily(w, newRequest("GET", "/api/dashboard/runtime/daily?days=10&tz="+tz, nil))
		if w.Code != http.StatusOK {
			t.Fatalf("tz=%s: expected 200, got %d: %s", tz, w.Code, w.Body.String())
		}
		var rows []struct {
			Date         string `json:"date"`
			TotalSeconds int64  `json:"total_seconds"`
			TaskCount    int32  `json:"task_count"`
		}
		// Report malformed payloads directly instead of letting them show
		// up as a confusing "no row on expected date".
		if err := json.NewDecoder(w.Body).Decode(&rows); err != nil {
			t.Fatalf("tz=%s: decode response: %v", tz, err)
		}
		for _, r := range rows {
			if r.Date == want {
				return r.Date, r.TotalSeconds, r.TaskCount
			}
		}
		t.Fatalf("tz=%s: no row on expected date %s in %v", tz, want, rows)
		return "", 0, 0
	}

	// Lower bounds only: other completed tasks in the shared workspace may
	// land on the same calendar date.
	if date, secs, count := readRow("UTC", utcDate); date != utcDate || count < 1 || secs < 600 {
		t.Errorf("UTC viewer: got date=%s seconds=%d count=%d, want date=%s seconds>=600 count>=1",
			date, secs, count, utcDate)
	}
	if date, secs, count := readRow("America/Los_Angeles", laDate); date != laDate || count < 1 || secs < 600 {
		t.Errorf("LA viewer: got date=%s seconds=%d count=%d, want date=%s seconds>=600 count>=1",
			date, secs, count, laDate)
	}
}
// TestRollupTaskUsageHourlyIdempotentAndWatermark covers two pipeline
// invariants the deleted runtime_rollup_test.go used to guard for the
// legacy daily rollup: (1) re-running the window function over the same
// range produces identical totals, and (2) the cron entry point advances
// the rollup-state watermark and clears last_error.
func TestRollupTaskUsageHourlyIdempotentAndWatermark(t *testing.T) {
	if testHandler == nil {
		t.Skip("database not available")
	}
	ctx := context.Background()

	var runtimeID, agentID string
	if err := testPool.QueryRow(ctx, `SELECT id FROM agent_runtime WHERE workspace_id = $1 LIMIT 1`, testWorkspaceID).Scan(&runtimeID); err != nil {
		t.Fatalf("fetch runtime: %v", err)
	}
	if err := testPool.QueryRow(ctx, `SELECT id FROM agent WHERE workspace_id = $1 LIMIT 1`, testWorkspaceID).Scan(&agentID); err != nil {
		t.Fatalf("fetch agent: %v", err)
	}

	var issueID, taskID string
	if err := testPool.QueryRow(ctx, `
INSERT INTO issue (workspace_id, title, creator_id, creator_type, number)
VALUES ($1, 'rollup idempotency', $2, 'member',
(SELECT COALESCE(MAX(number), 0) + 1 FROM issue WHERE workspace_id = $1))
RETURNING id
`, testWorkspaceID, testUserID).Scan(&issueID); err != nil {
		t.Fatalf("create issue: %v", err)
	}
	t.Cleanup(func() { testPool.Exec(ctx, `DELETE FROM issue WHERE id = $1`, issueID) })

	if err := testPool.QueryRow(ctx, `
INSERT INTO agent_task_queue (agent_id, issue_id, runtime_id, status, created_at)
VALUES ($1, $2, $3, 'completed', now() - interval '20 minutes') RETURNING id
`, agentID, issueID, runtimeID).Scan(&taskID); err != nil {
		t.Fatalf("insert task: %v", err)
	}
	t.Cleanup(func() { testPool.Exec(ctx, `DELETE FROM agent_task_queue WHERE id = $1`, taskID) })

	if _, err := testPool.Exec(ctx, `
INSERT INTO task_usage (task_id, provider, model, input_tokens, output_tokens, created_at)
VALUES ($1, 'claude', 'rollup-idem-model', 3333, 0, now() - interval '20 minutes')
`, taskID); err != nil {
		t.Fatalf("insert task_usage: %v", err)
	}
	// Best-effort removal of the rollup rows this test produces. The
	// idempotency assertion below demands an exact total, so rows left in
	// task_usage_hourly by an earlier run against the same database could
	// otherwise inflate it.
	t.Cleanup(func() {
		testPool.Exec(ctx, `DELETE FROM task_usage_hourly WHERE model = 'rollup-idem-model'`)
		testPool.Exec(ctx, `DELETE FROM task_usage_hourly_dirty WHERE model = 'rollup-idem-model'`)
	})

	// readTotal sums the rolled-up input tokens for this test's model on
	// the fixture runtime.
	readTotal := func() int64 {
		t.Helper()
		var total int64
		if err := testPool.QueryRow(ctx, `
SELECT COALESCE(SUM(input_tokens), 0) FROM task_usage_hourly
WHERE runtime_id = $1 AND model = 'rollup-idem-model'
`, runtimeID).Scan(&total); err != nil {
			t.Fatalf("read total: %v", err)
		}
		return total
	}

	// Idempotency: two passes over the same range must not double-count.
	for i := 0; i < 2; i++ {
		if _, err := testPool.Exec(ctx, `
SELECT rollup_task_usage_hourly_window('1970-01-01'::timestamptz, now() + interval '1 hour')
`); err != nil {
			t.Fatalf("rollup pass %d: %v", i, err)
		}
	}
	if got := readTotal(); got != 3333 {
		t.Errorf("idempotency: expected exactly 3333 tokens after two passes, got %d", got)
	}

	// Watermark advance: park the watermark an hour back, run the cron
	// entry, confirm it moved forward to ~now()-5min with no error.
	if _, err := testPool.Exec(ctx, `
UPDATE task_usage_hourly_rollup_state
SET watermark_at = now() - interval '1 hour', last_error = 'stale'
WHERE id = 1
`); err != nil {
		t.Fatalf("park watermark: %v", err)
	}
	if _, err := testPool.Exec(ctx, `SELECT rollup_task_usage_hourly()`); err != nil {
		t.Fatalf("rollup_task_usage_hourly: %v", err)
	}

	// The age is computed in SQL so it is measured against the database
	// clock rather than the test host's clock.
	var ageSeconds float64
	var lastError *string // nil when the column is NULL
	if err := testPool.QueryRow(ctx, `
SELECT EXTRACT(EPOCH FROM (now() - watermark_at)), last_error
FROM task_usage_hourly_rollup_state WHERE id = 1
`).Scan(&ageSeconds, &lastError); err != nil {
		t.Fatalf("read rollup state: %v", err)
	}
	// Truncates to whole seconds, which is ample for a minutes-scale bound.
	watermarkAge := time.Duration(ageSeconds) * time.Second

	// Watermark should sit at now()-5min (the cron upper bound), well
	// short of the 1-hour-back value we parked it at.
	if watermarkAge > 10*time.Minute {
		t.Errorf("watermark did not advance: still %s behind now()", watermarkAge)
	}
	if lastError != nil {
		t.Errorf("expected last_error cleared, got %q", *lastError)
	}
}
// TestRollupTaskUsageHourlyReassignBetweenRuntimes ports the invalidation
// coverage the deleted runtime_rollup_test.go held for the legacy daily
// rollup. Reassigning a task between runtimes (the runtime-merge path) must
// move its usage: the `trg_atq_dirty_hourly` trigger enqueues both the old
// and new runtime buckets, and the next window run drains the queue,
// empties the old bucket, and fills the new one. Without this the rollup
// keeps attributing usage to the merged-away runtime.
func TestRollupTaskUsageHourlyReassignBetweenRuntimes(t *testing.T) {
	if testHandler == nil {
		t.Skip("database not available")
	}
	ctx := context.Background()

	// The task starts on the shared fixture runtime and is later moved to a
	// freshly created destination runtime.
	oldRuntimeID := handlerTestRuntimeID(t)
	var newRuntimeID string
	if err := testPool.QueryRow(ctx, `
INSERT INTO agent_runtime (
workspace_id, daemon_id, name, runtime_mode, provider, status, device_info, metadata, last_seen_at
)
VALUES ($1, NULL, 'reassign-target-hourly', 'cloud', 'reassign-target-hourly', 'online', '{}'::jsonb, '{}'::jsonb, now())
RETURNING id
`, testWorkspaceID).Scan(&newRuntimeID); err != nil {
		t.Fatalf("create dest runtime: %v", err)
	}
	t.Cleanup(func() { testPool.Exec(ctx, `DELETE FROM agent_runtime WHERE id = $1`, newRuntimeID) })

	var agentID string
	if err := testPool.QueryRow(ctx, `SELECT id FROM agent WHERE workspace_id = $1 LIMIT 1`, testWorkspaceID).Scan(&agentID); err != nil {
		t.Fatalf("fetch agent: %v", err)
	}

	var issueID string
	if err := testPool.QueryRow(ctx, `
INSERT INTO issue (workspace_id, title, creator_id, creator_type, number)
VALUES ($1, 'reassign hourly test', $2, 'member',
(SELECT COALESCE(MAX(number), 0) + 1 FROM issue WHERE workspace_id = $1))
RETURNING id
`, testWorkspaceID, testUserID).Scan(&issueID); err != nil {
		t.Fatalf("create issue: %v", err)
	}
	t.Cleanup(func() { testPool.Exec(ctx, `DELETE FROM issue WHERE id = $1`, issueID) })

	// A fixed historical instant keeps the bucket hour deterministic.
	usageAt := time.Date(2021, 3, 14, 1, 0, 0, 0, time.UTC)
	var taskID string
	if err := testPool.QueryRow(ctx, `
INSERT INTO agent_task_queue (agent_id, issue_id, runtime_id, status, created_at)
VALUES ($1, $2, $3, 'completed', $4) RETURNING id
`, agentID, issueID, oldRuntimeID, usageAt).Scan(&taskID); err != nil {
		t.Fatalf("insert task: %v", err)
	}
	t.Cleanup(func() { testPool.Exec(ctx, `DELETE FROM agent_task_queue WHERE id = $1`, taskID) })

	if _, err := testPool.Exec(ctx, `
INSERT INTO task_usage (task_id, provider, model, input_tokens, output_tokens, created_at, updated_at)
VALUES ($1, 'claude', 'm-reassign-hourly', 700, 70, $2, $2)
`, taskID, usageAt); err != nil {
		t.Fatalf("insert task_usage: %v", err)
	}
	// Best-effort removal of rollup and dirty-queue rows tagged with this
	// test's model.
	t.Cleanup(func() {
		testPool.Exec(ctx, `DELETE FROM task_usage_hourly WHERE model = 'm-reassign-hourly'`)
		testPool.Exec(ctx, `DELETE FROM task_usage_hourly_dirty WHERE model = 'm-reassign-hourly'`)
	})

	// runWindow runs the rollup window function over an unbounded range.
	runWindow := func(label string) {
		t.Helper()
		if _, err := testPool.Exec(ctx, `
SELECT rollup_task_usage_hourly_window('-infinity'::timestamptz, 'infinity'::timestamptz)
`); err != nil {
			t.Fatalf("%s: %v", label, err)
		}
	}

	// runtimeTotal returns the rolled-up input tokens for this test's model
	// on runtime rt. The query error is checked because a swallowed failure
	// would leave the total at 0, which is exactly the value the
	// post-reassign assertion expects for the old runtime.
	runtimeTotal := func(rt string) int64 {
		t.Helper()
		var total int64
		if err := testPool.QueryRow(ctx, `
SELECT COALESCE(SUM(input_tokens), 0) FROM task_usage_hourly
WHERE runtime_id = $1 AND model = 'm-reassign-hourly'
`, rt).Scan(&total); err != nil {
			t.Fatalf("read runtime total: %v", err)
		}
		return total
	}

	// dirtyEntries counts dirty-queue rows tagged with this test's model.
	dirtyEntries := func() int {
		t.Helper()
		var n int
		if err := testPool.QueryRow(ctx, `SELECT COUNT(*) FROM task_usage_hourly_dirty WHERE model = 'm-reassign-hourly'`).Scan(&n); err != nil {
			t.Fatalf("count dirty entries: %v", err)
		}
		return n
	}

	runWindow("initial rollup")
	if oldTotal, newTotal := runtimeTotal(oldRuntimeID), runtimeTotal(newRuntimeID); oldTotal != 700 || newTotal != 0 {
		t.Fatalf("initial: expected old=700 new=0, got old=%d new=%d", oldTotal, newTotal)
	}

	// Reassignment fires trg_atq_dirty_hourly, which enqueues the OLD and
	// NEW runtime buckets (same bucket_hour, two runtime_ids).
	if _, err := testPool.Exec(ctx, `UPDATE agent_task_queue SET runtime_id = $1 WHERE id = $2`, newRuntimeID, taskID); err != nil {
		t.Fatalf("reassign task: %v", err)
	}
	if dirtyCount := dirtyEntries(); dirtyCount != 2 {
		t.Fatalf("expected 2 dirty entries (old+new runtime), got %d", dirtyCount)
	}

	runWindow("rollup after reassign")
	if oldTotal, newTotal := runtimeTotal(oldRuntimeID), runtimeTotal(newRuntimeID); oldTotal != 0 || newTotal != 700 {
		t.Fatalf("after reassign: expected old=0 new=700, got old=%d new=%d", oldTotal, newTotal)
	}

	// The window function must drain every queue row whose enqueued_at
	// predates p_to — a regression on that DELETE pins recomputes forever.
	if dirtyCount := dirtyEntries(); dirtyCount != 0 {
		t.Errorf("expected dirty queue drained, got %d entries", dirtyCount)
	}
}
// TestRollupTaskUsageHourlyWorkspaceMismatch constructs an atq row whose
// agent.workspace_id differs from issue.workspace_id and verifies the
// hourly rollup resolves workspace_id consistently from `agent` across the
// trigger, dirty_from_updates, and the recompute join. If any path leaked
// back to issue.workspace_id the dirty key would miss the recompute join
// and the bucket would be dropped or mis-attributed across tenants. The
// schema does not enforce the two workspace_ids match, so this canary
// keeps the alignment honest.
func TestRollupTaskUsageHourlyWorkspaceMismatch(t *testing.T) {
	if testHandler == nil {
		t.Skip("database not available")
	}
	ctx := context.Background()

	// A second workspace with its own runtime and agent. The slug gets a
	// random suffix so repeated runs do not reuse the same slug.
	var foreignWorkspaceID string
	if err := testPool.QueryRow(ctx, `
INSERT INTO workspace (name, slug)
VALUES ('ws-mismatch-hourly', 'ws-mismatch-hourly-' || gen_random_uuid()::text)
RETURNING id
`).Scan(&foreignWorkspaceID); err != nil {
		t.Fatalf("create foreign workspace: %v", err)
	}
	t.Cleanup(func() { testPool.Exec(ctx, `DELETE FROM workspace WHERE id = $1`, foreignWorkspaceID) })

	var foreignRuntimeID string
	if err := testPool.QueryRow(ctx, `
INSERT INTO agent_runtime (
workspace_id, daemon_id, name, runtime_mode, provider, status, device_info, metadata, last_seen_at
)
VALUES ($1, NULL, 'mismatch-rt-hourly', 'cloud', 'mismatch-rt-hourly', 'online', '{}'::jsonb, '{}'::jsonb, now())
RETURNING id
`, foreignWorkspaceID).Scan(&foreignRuntimeID); err != nil {
		t.Fatalf("create foreign runtime: %v", err)
	}

	var foreignAgentID string
	if err := testPool.QueryRow(ctx, `
INSERT INTO agent (
workspace_id, name, description, runtime_mode, runtime_config,
runtime_id, visibility, max_concurrent_tasks, owner_id,
instructions, custom_env, custom_args, mcp_config
)
VALUES ($1, 'mismatch-agent-hourly', '', 'cloud', '{}'::jsonb, $2, 'private', 1, $3, '', '{}'::jsonb, '[]'::jsonb, '[]'::jsonb)
RETURNING id
`, foreignWorkspaceID, foreignRuntimeID, testUserID).Scan(&foreignAgentID); err != nil {
		t.Fatalf("create foreign agent: %v", err)
	}

	// Issue lives in the primary test workspace; the agent lives in the
	// foreign one — so agent.workspace_id != issue.workspace_id.
	var issueID string
	if err := testPool.QueryRow(ctx, `
INSERT INTO issue (workspace_id, title, creator_id, creator_type, number)
VALUES ($1, 'mismatch hourly test', $2, 'member',
(SELECT COALESCE(MAX(number), 0) + 1 FROM issue WHERE workspace_id = $1))
RETURNING id
`, testWorkspaceID, testUserID).Scan(&issueID); err != nil {
		t.Fatalf("create issue: %v", err)
	}
	t.Cleanup(func() { testPool.Exec(ctx, `DELETE FROM issue WHERE id = $1`, issueID) })

	// A fixed historical instant keeps the bucket hour deterministic.
	usageAt := time.Date(2021, 9, 9, 1, 0, 0, 0, time.UTC)
	var taskID string
	if err := testPool.QueryRow(ctx, `
INSERT INTO agent_task_queue (agent_id, issue_id, runtime_id, status, created_at)
VALUES ($1, $2, $3, 'completed', $4) RETURNING id
`, foreignAgentID, issueID, foreignRuntimeID, usageAt).Scan(&taskID); err != nil {
		t.Fatalf("insert atq: %v", err)
	}
	if _, err := testPool.Exec(ctx, `
INSERT INTO task_usage (task_id, provider, model, input_tokens, output_tokens, created_at, updated_at)
VALUES ($1, 'claude', 'm-mismatch-hourly', 333, 33, $2, $2)
`, taskID, usageAt); err != nil {
		t.Fatalf("insert task_usage: %v", err)
	}
	// Best-effort removal of rollup and dirty-queue rows tagged with this
	// test's model.
	t.Cleanup(func() {
		testPool.Exec(ctx, `DELETE FROM task_usage_hourly WHERE model = 'm-mismatch-hourly'`)
		testPool.Exec(ctx, `DELETE FROM task_usage_hourly_dirty WHERE model = 'm-mismatch-hourly'`)
	})

	if _, err := testPool.Exec(ctx, `
SELECT rollup_task_usage_hourly_window('-infinity'::timestamptz, 'infinity'::timestamptz)
`); err != nil {
		t.Fatalf("rollup: %v", err)
	}

	// wsTotal returns the rolled-up input tokens for this test's model in
	// workspace ws. The query error is checked because a swallowed failure
	// would leave the total at 0 and let the "primary workspace must be 0"
	// isolation assertion pass without ever reading the table.
	wsTotal := func(ws string) int64 {
		t.Helper()
		var total int64
		if err := testPool.QueryRow(ctx, `
SELECT COALESCE(SUM(input_tokens), 0) FROM task_usage_hourly
WHERE workspace_id = $1 AND model = 'm-mismatch-hourly'
`, ws).Scan(&total); err != nil {
			t.Fatalf("read workspace total: %v", err)
		}
		return total
	}

	if got := wsTotal(foreignWorkspaceID); got != 333 {
		t.Fatalf("expected foreign workspace bucket = 333 (resolved from agent), got %d", got)
	}
	if got := wsTotal(testWorkspaceID); got != 0 {
		t.Errorf("expected primary workspace bucket = 0 (issue.workspace_id must not leak), got %d", got)
	}
}
// TestDashboardRollupReattributesOnProjectChange covers the trigger on
// `UPDATE issue SET project_id`: it is expected to enqueue both the old
// and the new project bucket so the following rollup pass moves the
// affected tokens. The test drains the dirty queue by calling the rollup
// window function directly, then checks task_usage_hourly for both
// projects.
func TestDashboardRollupReattributesOnProjectChange(t *testing.T) {
	if testHandler == nil {
		t.Skip("database not available")
	}
	ctx := context.Background()

	// Borrow whichever runtime and agent already exist in the test workspace.
	var runtimeID, agentID string
	runtimeRow := testPool.QueryRow(ctx, `SELECT id FROM agent_runtime WHERE workspace_id = $1 LIMIT 1`, testWorkspaceID)
	if err := runtimeRow.Scan(&runtimeID); err != nil {
		t.Fatalf("fetch runtime: %v", err)
	}
	agentRow := testPool.QueryRow(ctx, `SELECT id FROM agent WHERE workspace_id = $1 LIMIT 1`, testWorkspaceID)
	if err := agentRow.Scan(&agentID); err != nil {
		t.Fatalf("fetch agent: %v", err)
	}

	// rollup recomputes the hourly table over a window that starts at the
	// epoch and ends one hour from now; label prefixes the failure message.
	rollup := func(label string) {
		t.Helper()
		if _, err := testPool.Exec(ctx, `
SELECT rollup_task_usage_hourly_window('1970-01-01'::timestamptz, now() + interval '1 hour')
`); err != nil {
			t.Fatalf("%s: %v", label, err)
		}
	}

	// inputTokens sums the rolled-up input tokens of one project bucket for
	// the borrowed agent.
	inputTokens := func(label, projectID string) int64 {
		t.Helper()
		var sum int64
		if err := testPool.QueryRow(ctx, `
SELECT COALESCE(SUM(input_tokens), 0) FROM task_usage_hourly
WHERE workspace_id = $1 AND project_id = $2 AND agent_id = $3
`, testWorkspaceID, projectID, agentID).Scan(&sum); err != nil {
			t.Fatalf("%s: %v", label, err)
		}
		return sum
	}

	// newProject inserts a project and schedules its removal.
	newProject := func(title string) string {
		var projectID string
		if err := testPool.QueryRow(ctx, `
INSERT INTO project (workspace_id, title) VALUES ($1, $2) RETURNING id
`, testWorkspaceID, title).Scan(&projectID); err != nil {
			t.Fatalf("create project: %v", err)
		}
		t.Cleanup(func() { testPool.Exec(ctx, `DELETE FROM project WHERE id = $1`, projectID) })
		return projectID
	}
	projectA := newProject("dashboard reattr A")
	projectB := newProject("dashboard reattr B")

	// Fixture chain: issue (in project A) -> completed task -> usage row.
	var issueID string
	if err := testPool.QueryRow(ctx, `
INSERT INTO issue (workspace_id, title, creator_id, creator_type, project_id, number)
VALUES ($1, 'reattr issue', $2, 'member', $3,
(SELECT COALESCE(MAX(number), 0) + 1 FROM issue WHERE workspace_id = $1))
RETURNING id
`, testWorkspaceID, testUserID, projectA).Scan(&issueID); err != nil {
		t.Fatalf("create issue: %v", err)
	}
	t.Cleanup(func() { testPool.Exec(ctx, `DELETE FROM issue WHERE id = $1`, issueID) })

	var taskID string
	if err := testPool.QueryRow(ctx, `
INSERT INTO agent_task_queue (agent_id, issue_id, runtime_id, status, created_at)
VALUES ($1, $2, $3, 'completed', now()) RETURNING id
`, agentID, issueID, runtimeID).Scan(&taskID); err != nil {
		t.Fatalf("insert task: %v", err)
	}
	t.Cleanup(func() { testPool.Exec(ctx, `DELETE FROM agent_task_queue WHERE id = $1`, taskID) })

	if _, err := testPool.Exec(ctx, `
INSERT INTO task_usage (task_id, provider, model, input_tokens, output_tokens, created_at)
VALUES ($1, 'claude', 'claude-3-5-sonnet', 7777, 0, now())
`, taskID); err != nil {
		t.Fatalf("insert task_usage: %v", err)
	}

	// Pass one: the tokens must show up under project A.
	rollup("rollup A")
	aTokens := inputTokens("read A rollup", projectA)
	if aTokens < 7777 {
		t.Fatalf("project A: expected >=7777 tokens after first rollup, got %d", aTokens)
	}

	// Re-home the issue under project B; the trigger is expected to mark
	// both the A and the B bucket dirty.
	if _, err := testPool.Exec(ctx, `UPDATE issue SET project_id = $1 WHERE id = $2`, projectB, issueID); err != nil {
		t.Fatalf("reassign project: %v", err)
	}

	// Pass two: A should drain to zero (deleted_empty) while B picks the
	// tokens up.
	rollup("rollup B")
	bTokens := inputTokens("read B rollup", projectB)
	aTokensAfter := inputTokens("read A rollup after move", projectA)
	if bTokens < 7777 {
		t.Errorf("project B: expected >=7777 tokens after reassign + rollup, got %d", bTokens)
	}
	if aTokensAfter != 0 {
		t.Errorf("project A: expected 0 tokens after reassign + rollup, got %d", aTokensAfter)
	}
}
// TestDashboardRollupClearsOnIssueDelete verifies that deleting an issue
// (which cascades to its tasks and task_usage rows) also clears the
// dashboard rollup row attributed to that issue's project. The
// `issue BEFORE DELETE` trigger has to fire ahead of the cascade so the
// dirty queue captures the original project_id while the issue row is
// still readable.
func TestDashboardRollupClearsOnIssueDelete(t *testing.T) {
	if testHandler == nil {
		t.Skip("database not available")
	}
	ctx := context.Background()

	var runtimeID, agentID string
	if err := testPool.QueryRow(ctx, `SELECT id FROM agent_runtime WHERE workspace_id = $1 LIMIT 1`, testWorkspaceID).Scan(&runtimeID); err != nil {
		t.Fatalf("fetch runtime: %v", err)
	}
	if err := testPool.QueryRow(ctx, `SELECT id FROM agent WHERE workspace_id = $1 LIMIT 1`, testWorkspaceID).Scan(&agentID); err != nil {
		t.Fatalf("fetch agent: %v", err)
	}

	// rollup recomputes the hourly table over a window that starts at the
	// epoch and ends one hour from now; label prefixes the failure message.
	rollup := func(label string) {
		t.Helper()
		if _, err := testPool.Exec(ctx, `
SELECT rollup_task_usage_hourly_window('1970-01-01'::timestamptz, now() + interval '1 hour')
`); err != nil {
			t.Fatalf("%s: %v", label, err)
		}
	}

	var projectID string
	if err := testPool.QueryRow(ctx, `
INSERT INTO project (workspace_id, title) VALUES ($1, 'dashboard cascade test') RETURNING id
`, testWorkspaceID).Scan(&projectID); err != nil {
		t.Fatalf("create project: %v", err)
	}
	t.Cleanup(func() { testPool.Exec(ctx, `DELETE FROM project WHERE id = $1`, projectID) })

	// projectTokens sums the rolled-up input tokens of the project bucket.
	projectTokens := func(label string) int64 {
		t.Helper()
		var sum int64
		if err := testPool.QueryRow(ctx, `
SELECT COALESCE(SUM(input_tokens), 0) FROM task_usage_hourly
WHERE workspace_id = $1 AND project_id = $2
`, testWorkspaceID, projectID).Scan(&sum); err != nil {
			t.Fatalf("%s: %v", label, err)
		}
		return sum
	}

	var issueID string
	if err := testPool.QueryRow(ctx, `
INSERT INTO issue (workspace_id, title, creator_id, creator_type, project_id, number)
VALUES ($1, 'cascade issue', $2, 'member', $3,
(SELECT COALESCE(MAX(number), 0) + 1 FROM issue WHERE workspace_id = $1))
RETURNING id
`, testWorkspaceID, testUserID, projectID).Scan(&issueID); err != nil {
		t.Fatalf("create issue: %v", err)
	}
	// Safety net only: the DELETE further down is what the test exercises,
	// but any t.Fatalf before it would otherwise strand the issue (and the
	// task / usage rows hanging off it) in the shared test workspace.
	// Deleting an already-deleted row matches nothing, so this is a no-op on
	// the success path.
	t.Cleanup(func() { testPool.Exec(ctx, `DELETE FROM issue WHERE id = $1`, issueID) })

	var taskID string
	if err := testPool.QueryRow(ctx, `
INSERT INTO agent_task_queue (agent_id, issue_id, runtime_id, status, created_at)
VALUES ($1, $2, $3, 'completed', now()) RETURNING id
`, agentID, issueID, runtimeID).Scan(&taskID); err != nil {
		t.Fatalf("insert task: %v", err)
	}
	// No dedicated cleanup for taskID; the issue delete cascade takes it.
	if _, err := testPool.Exec(ctx, `
INSERT INTO task_usage (task_id, provider, model, input_tokens, output_tokens, created_at)
VALUES ($1, 'claude', 'claude-3-5-sonnet', 4242, 0, now())
`, taskID); err != nil {
		t.Fatalf("insert task_usage: %v", err)
	}

	// First rollup: project bucket exists with 4242 tokens.
	rollup("rollup before delete")
	if before := projectTokens("read before"); before < 4242 {
		t.Fatalf("project bucket: expected >=4242 tokens before delete, got %d", before)
	}

	// Delete the issue. Cascade removes atq + task_usage. The issue
	// BEFORE DELETE trigger should have enqueued the project bucket
	// before the cascade started.
	if _, err := testPool.Exec(ctx, `DELETE FROM issue WHERE id = $1`, issueID); err != nil {
		t.Fatalf("delete issue: %v", err)
	}
	rollup("rollup after delete")
	if after := projectTokens("read after"); after != 0 {
		t.Errorf("project bucket: expected 0 tokens after issue delete, got %d", after)
	}
}
// TestDashboardRollupReattributesOnLinkTaskToIssue checks the quick-create
// path: a task starts with a NULL issue_id, its usage is rolled up into the
// no-project bucket, and then the task is linked to an issue with the same
// UPDATE shape `service.task.LinkTaskToIssue` uses. After the next rollup
// the tokens must sit in the linked issue's project bucket and the
// no-project bucket must be empty.
func TestDashboardRollupReattributesOnLinkTaskToIssue(t *testing.T) {
	if testHandler == nil {
		t.Skip("database not available")
	}
	ctx := context.Background()

	// Borrow whichever runtime and agent already exist in the test workspace.
	var runtimeID, agentID string
	runtimeRow := testPool.QueryRow(ctx, `SELECT id FROM agent_runtime WHERE workspace_id = $1 LIMIT 1`, testWorkspaceID)
	if err := runtimeRow.Scan(&runtimeID); err != nil {
		t.Fatalf("fetch runtime: %v", err)
	}
	agentRow := testPool.QueryRow(ctx, `SELECT id FROM agent WHERE workspace_id = $1 LIMIT 1`, testWorkspaceID)
	if err := agentRow.Scan(&agentID); err != nil {
		t.Fatalf("fetch agent: %v", err)
	}

	// rollup recomputes the hourly table over a window that starts at the
	// epoch and ends one hour from now; label prefixes the failure message.
	rollup := func(label string) {
		t.Helper()
		if _, err := testPool.Exec(ctx, `
SELECT rollup_task_usage_hourly_window('1970-01-01'::timestamptz, now() + interval '1 hour')
`); err != nil {
			t.Fatalf("%s: %v", label, err)
		}
	}

	// noProjectTokens sums the agent's rolled-up input tokens that carry no
	// project_id.
	noProjectTokens := func(label string) int64 {
		t.Helper()
		var sum int64
		if err := testPool.QueryRow(ctx, `
SELECT COALESCE(SUM(input_tokens), 0) FROM task_usage_hourly
WHERE workspace_id = $1 AND project_id IS NULL AND agent_id = $2
`, testWorkspaceID, agentID).Scan(&sum); err != nil {
			t.Fatalf("%s: %v", label, err)
		}
		return sum
	}

	// A quick-create task has no issue yet when it is inserted.
	var taskID string
	if err := testPool.QueryRow(ctx, `
INSERT INTO agent_task_queue (agent_id, issue_id, runtime_id, status, context, created_at)
VALUES ($1, NULL, $2, 'completed', '{}'::jsonb, now()) RETURNING id
`, agentID, runtimeID).Scan(&taskID); err != nil {
		t.Fatalf("insert quick-create task: %v", err)
	}
	t.Cleanup(func() { testPool.Exec(ctx, `DELETE FROM agent_task_queue WHERE id = $1`, taskID) })

	if _, err := testPool.Exec(ctx, `
INSERT INTO task_usage (task_id, provider, model, input_tokens, output_tokens, created_at)
VALUES ($1, 'claude', 'claude-3-5-sonnet', 1234, 0, now())
`, taskID); err != nil {
		t.Fatalf("insert task_usage: %v", err)
	}

	// Before linking, the usage belongs to the no-project (NULL) bucket.
	rollup("rollup pre-link")
	nullBefore := noProjectTokens("read NULL bucket pre-link")
	if nullBefore < 1234 {
		t.Fatalf("NULL bucket: expected >=1234 tokens pre-link, got %d", nullBefore)
	}

	// Build the link target: a project and an issue inside it. The atq
	// trigger is then expected to enqueue OLD (NULL project) and NEW (this
	// project's id), letting the next rollup zero the NULL bucket and fill
	// the project bucket.
	var projectID string
	if err := testPool.QueryRow(ctx, `
INSERT INTO project (workspace_id, title) VALUES ($1, 'dashboard link test') RETURNING id
`, testWorkspaceID).Scan(&projectID); err != nil {
		t.Fatalf("create project: %v", err)
	}
	t.Cleanup(func() { testPool.Exec(ctx, `DELETE FROM project WHERE id = $1`, projectID) })

	var issueID string
	if err := testPool.QueryRow(ctx, `
INSERT INTO issue (workspace_id, title, creator_id, creator_type, project_id, number)
VALUES ($1, 'link test issue', $2, 'member', $3,
(SELECT COALESCE(MAX(number), 0) + 1 FROM issue WHERE workspace_id = $1))
RETURNING id
`, testWorkspaceID, testUserID, projectID).Scan(&issueID); err != nil {
		t.Fatalf("create issue: %v", err)
	}
	t.Cleanup(func() { testPool.Exec(ctx, `DELETE FROM issue WHERE id = $1`, issueID) })

	// Same statement shape as LinkTaskToIssue.
	if _, err := testPool.Exec(ctx, `
UPDATE agent_task_queue SET issue_id = $1 WHERE id = $2 AND issue_id IS NULL
`, issueID, taskID); err != nil {
		t.Fatalf("link task to issue: %v", err)
	}
	rollup("rollup post-link")

	var projectAfter int64
	if err := testPool.QueryRow(ctx, `
SELECT COALESCE(SUM(input_tokens), 0) FROM task_usage_hourly
WHERE workspace_id = $1 AND project_id = $2 AND agent_id = $3
`, testWorkspaceID, projectID, agentID).Scan(&projectAfter); err != nil {
		t.Fatalf("read project bucket post-link: %v", err)
	}
	nullAfter := noProjectTokens("read NULL bucket post-link")

	if projectAfter < 1234 {
		t.Errorf("project bucket: expected >=1234 tokens after link, got %d", projectAfter)
	}
	if nullAfter != 0 {
		t.Errorf("NULL bucket: expected 0 tokens after link, got %d", nullAfter)
	}
}
// TestPruneTaskUsageHourlyDirty covers the dirty-queue TTL. Both the RFC
// (§7.1) and the rollup-pipeline migration call this THE most-easily-missed
// correctness requirement of the hourly pipeline: without the prune, a row
// that escapes the per-tick drain (crash mid-tick, worker paused during an
// incident) pins its bucket's recompute forever and the queue grows
// unbounded.
//
// Three behaviours are exercised: the default retention, an explicit
// shorter retention, and the prune being folded into
// rollup_task_usage_hourly().
func TestPruneTaskUsageHourlyDirty(t *testing.T) {
	if testHandler == nil {
		t.Skip("database not available")
	}
	ctx := context.Background()

	// task_usage_hourly_dirty carries no FKs (it is a queue), so synthetic
	// UUIDs are fine. `provider` tags the rows for isolated cleanup.
	const tag = "ttl-prune-test"
	t.Cleanup(func() {
		testPool.Exec(ctx, `DELETE FROM task_usage_hourly_dirty WHERE provider = $1`, tag)
	})

	// seed enqueues one tagged dirty row whose enqueued_at lies `age` in the
	// past; `model` doubles as the row's name for later lookups.
	seed := func(model, age string) {
		t.Helper()
		if _, err := testPool.Exec(ctx, `
INSERT INTO task_usage_hourly_dirty (
bucket_hour, workspace_id, runtime_id, agent_id, project_id,
provider, model, enqueued_at
)
VALUES (
date_trunc('hour', now()), gen_random_uuid(), gen_random_uuid(),
gen_random_uuid(), NULL, $1, $2, now() - $3::interval
)
`, tag, model, age); err != nil {
			t.Fatalf("seed dirty row %s: %v", model, err)
		}
	}

	// countModel returns how many tagged dirty rows carry the given model.
	// A failed query aborts the test: returning the zero value instead would
	// let every "row was deleted" assertion below pass without having looked
	// at the table.
	countModel := func(model string) int {
		t.Helper()
		var n int
		if err := testPool.QueryRow(ctx,
			`SELECT COUNT(*) FROM task_usage_hourly_dirty WHERE provider = $1 AND model = $2`,
			tag, model,
		).Scan(&n); err != nil {
			t.Fatalf("count dirty rows for %s: %v", model, err)
		}
		return n
	}

	seed("stale-row", "8 days")
	seed("fresh-row", "1 day")

	// Default 7-day retention: the 8-day row goes, the 1-day row stays.
	var pruned int64
	if err := testPool.QueryRow(ctx, `SELECT prune_task_usage_hourly_dirty()`).Scan(&pruned); err != nil {
		t.Fatalf("prune (default retention): %v", err)
	}
	if pruned < 1 {
		t.Errorf("expected prune to report at least the one stale row deleted, got %d", pruned)
	}
	if got := countModel("stale-row"); got != 0 {
		t.Errorf("default prune: expected stale row deleted, still %d present", got)
	}
	if got := countModel("fresh-row"); got != 1 {
		t.Errorf("default prune: expected fresh row kept, got %d", got)
	}

	// An explicit retention shorter than the surviving row's age drops it.
	if _, err := testPool.Exec(ctx, `SELECT prune_task_usage_hourly_dirty(interval '12 hours')`); err != nil {
		t.Fatalf("prune (12h retention): %v", err)
	}
	if got := countModel("fresh-row"); got != 0 {
		t.Errorf("12h-retention prune: expected fresh row deleted, still %d present", got)
	}

	// The cron entry folds the prune in so operators do
	// not need a second scheduled job. A single tick must drop a stale row.
	seed("cron-fold-row", "9 days")
	if _, err := testPool.Exec(ctx, `SELECT rollup_task_usage_hourly()`); err != nil {
		t.Fatalf("rollup_task_usage_hourly: %v", err)
	}
	if got := countModel("cron-fold-row"); got != 0 {
		t.Errorf("cron entry did not fold in the prune: stale row still present (%d)", got)
	}
}
// TestRollupTaskUsageHourlyCapsWindowAtOneDay exercises the catch-up cap of
// rollup_task_usage_hourly(). When the watermark is far behind (worker
// paused for an incident or a migration freeze), one tick may move it
// forward by at most a day, so a multi-week backlog drains in bounded steps
// rather than in a single huge statement holding advisory lock 4246. The
// existing watermark test parks the watermark only one hour back and so
// never reaches the cap.
func TestRollupTaskUsageHourlyCapsWindowAtOneDay(t *testing.T) {
	if testHandler == nil {
		t.Skip("database not available")
	}
	ctx := context.Background()

	// Put the watermark back at now() afterwards. Other tests call
	// rollup_task_usage_hourly_window directly and never read the watermark,
	// and the idempotency test parks it itself, but nothing downstream
	// should see a stale value.
	t.Cleanup(func() {
		testPool.Exec(ctx,
			`UPDATE task_usage_hourly_rollup_state SET watermark_at = now(), last_error = NULL WHERE id = 1`)
	})

	// setWatermarkBehind moves the watermark `lag` into the past and clears
	// last_error.
	setWatermarkBehind := func(lag string) {
		_, err := testPool.Exec(ctx, `
UPDATE task_usage_hourly_rollup_state
SET watermark_at = now() - $1::interval, last_error = NULL
WHERE id = 1
`, lag)
		if err != nil {
			t.Fatalf("park watermark: %v", err)
		}
	}

	// watermarkLagDays reports how far the watermark trails now(), in days.
	const secondsPerDay = 86400
	watermarkLagDays := func() float64 {
		var seconds float64
		row := testPool.QueryRow(ctx, `
SELECT EXTRACT(EPOCH FROM (now() - watermark_at))
FROM task_usage_hourly_rollup_state WHERE id = 1
`)
		if err := row.Scan(&seconds); err != nil {
			t.Fatalf("read watermark: %v", err)
		}
		return seconds / secondsPerDay
	}

	// runTick performs one scheduled rollup pass.
	runTick := func(label string) {
		_, err := testPool.Exec(ctx, `SELECT rollup_task_usage_hourly()`)
		if err != nil {
			t.Fatalf("%s: %v", label, err)
		}
	}

	// Start 3 days behind. The first tick moves forward by exactly one day
	// (v_from + 1d, well short of now()-5min), so roughly 2 days remain.
	setWatermarkBehind("3 days")
	runTick("tick 1")
	lag := watermarkLagDays()
	if lag < 1.9 || lag > 2.1 {
		t.Fatalf("after one tick: expected watermark ~2 days behind, got %.3f days", lag)
	}

	// The second tick takes another capped day, leaving roughly 1 day.
	runTick("tick 2")
	lag = watermarkLagDays()
	if lag < 0.9 || lag > 1.1 {
		t.Fatalf("after two ticks: expected watermark ~1 day behind, got %.3f days", lag)
	}

	// With less than a day left, the tick lands on now()-5min (LEAST picks
	// the now bound) instead of stepping a further fixed day.
	runTick("tick 3")
	lag = watermarkLagDays()
	if lag > 0.02 {
		t.Fatalf("after catch-up: expected watermark within minutes of now, got %.3f days", lag)
	}
}
// TestDashboardUsageDailyCrossMidnightFullPipeline runs the WHOLE timezone
// pipeline end to end: insert a raw `task_usage` row near UTC midnight →
// run `rollup_task_usage_hourly_window` to bucket it → call
// GetDashboardUsageDaily with a non-UTC viewer tz. It asserts the tokens
// land on the viewer's correct calendar day and NOT on the UTC day.
//
// This is the #2822 bug class the RFC exists to prevent. The existing
// TestDashboardUsageDailyBucketsByViewerTimezone seeds a pre-built
// task_usage_hourly row and only exercises the SQL read path; here the
// row travels from raw task_usage through the rollup, so a regression in
// task_usage_hour_bucket or the recompute join is also caught.
func TestDashboardUsageDailyCrossMidnightFullPipeline(t *testing.T) {
	if testHandler == nil {
		t.Skip("database not available")
	}
	ctx := context.Background()

	var runtimeID, agentID string
	if err := testPool.QueryRow(ctx, `SELECT id FROM agent_runtime WHERE workspace_id = $1 LIMIT 1`, testWorkspaceID).Scan(&runtimeID); err != nil {
		t.Fatalf("fetch runtime: %v", err)
	}
	if err := testPool.QueryRow(ctx, `SELECT id FROM agent WHERE workspace_id = $1 LIMIT 1`, testWorkspaceID).Scan(&agentID); err != nil {
		t.Fatalf("fetch agent: %v", err)
	}

	var issueID string
	if err := testPool.QueryRow(ctx, `
INSERT INTO issue (workspace_id, title, creator_id, creator_type, number)
VALUES ($1, 'cross-midnight pipeline test', $2, 'member',
(SELECT COALESCE(MAX(number), 0) + 1 FROM issue WHERE workspace_id = $1))
RETURNING id
`, testWorkspaceID, testUserID).Scan(&issueID); err != nil {
		t.Fatalf("create issue: %v", err)
	}
	t.Cleanup(func() { testPool.Exec(ctx, `DELETE FROM issue WHERE id = $1`, issueID) })

	var taskID string
	if err := testPool.QueryRow(ctx, `
INSERT INTO agent_task_queue (agent_id, issue_id, runtime_id, status, created_at)
VALUES ($1, $2, $3, 'completed', now()) RETURNING id
`, agentID, issueID, runtimeID).Scan(&taskID); err != nil {
		t.Fatalf("insert task: %v", err)
	}
	t.Cleanup(func() { testPool.Exec(ctx, `DELETE FROM agent_task_queue WHERE id = $1`, taskID) })

	// Raw task_usage at 00:30 UTC two days ago — genuinely near UTC
	// midnight. 00:30 UTC is still the PRIOR evening (~16:30/17:30) in
	// America/Los_Angeles (UTC-7/-8), so the UTC viewer and the LA viewer
	// must see this row under different calendar days. Using CURRENT_DATE
	// keeps the row inside the days=10 window without a fixed-date drift.
	var usageAt time.Time
	if err := testPool.QueryRow(ctx, `
INSERT INTO task_usage (task_id, provider, model, input_tokens, output_tokens, created_at)
VALUES (
$1, 'claude', 'cross-midnight-model', 8888, 0,
((CURRENT_DATE - 2)::timestamp + interval '30 minutes') AT TIME ZONE 'UTC'
)
RETURNING created_at
`, taskID).Scan(&usageAt); err != nil {
		t.Fatalf("insert task_usage: %v", err)
	}
	t.Cleanup(func() {
		testPool.Exec(ctx, `DELETE FROM task_usage_hourly WHERE model = 'cross-midnight-model'`)
	})

	// Run the rollup so the raw row is aggregated into task_usage_hourly.
	if _, err := testPool.Exec(ctx, `
SELECT rollup_task_usage_hourly_window('1970-01-01'::timestamptz, now() + interval '1 hour')
`); err != nil {
		t.Fatalf("rollup window: %v", err)
	}

	// Derive the expected calendar day for each viewer from the timestamp
	// the database actually stored.
	utcDate := usageAt.UTC().Format("2006-01-02")
	laLoc, err := time.LoadLocation("America/Los_Angeles")
	if err != nil {
		t.Fatalf("load LA location: %v", err)
	}
	laDate := usageAt.In(laLoc).Format("2006-01-02")
	if utcDate == laDate {
		t.Fatalf("test setup: UTC and LA dates must differ, both %s", utcDate)
	}

	// readDate calls the daily-usage handler with the given viewer tz and
	// returns the date of the first row for the test's model, checking its
	// token count on the way. A missing row, a non-200 status or an
	// undecodable body aborts the test.
	readDate := func(tz string) string {
		t.Helper()
		w := httptest.NewRecorder()
		testHandler.GetDashboardUsageDaily(w, newRequest("GET", "/api/dashboard/usage/daily?days=10&tz="+tz, nil))
		if w.Code != http.StatusOK {
			t.Fatalf("tz=%s: expected 200, got %d: %s", tz, w.Code, w.Body.String())
		}
		var rows []struct {
			Date        string `json:"date"`
			Model       string `json:"model"`
			InputTokens int64  `json:"input_tokens"`
		}
		// Report a malformed body as such; dropping the error would show up
		// below as a misleading "row not found".
		if err := json.NewDecoder(w.Body).Decode(&rows); err != nil {
			t.Fatalf("tz=%s: decode response: %v", tz, err)
		}
		for _, r := range rows {
			if r.Model == "cross-midnight-model" {
				if r.InputTokens != 8888 {
					t.Errorf("tz=%s: expected 8888 tokens, got %d", tz, r.InputTokens)
				}
				return r.Date
			}
		}
		t.Fatalf("tz=%s: cross-midnight-model row not found in %v", tz, rows)
		return ""
	}

	if got := readDate("UTC"); got != utcDate {
		t.Errorf("UTC viewer: expected date %s, got %s", utcDate, got)
	}
	if got := readDate("America/Los_Angeles"); got != laDate {
		t.Errorf("LA viewer: expected date %s, got %s; row must NOT land on the UTC day %s",
			laDate, got, utcDate)
	}
}
// TestRollupTaskUsageHourlyConvergesOnTaskUsageDelete covers the
// `trg_tu_dirty_hourly` trigger — a BEFORE DELETE trigger on task_usage.
// Migration 102 notes it has no production callers today and exists purely
// as a defensive convergence guard, so a single minimal test is enough:
// seed a task_usage row, roll it up, DELETE the task_usage row directly,
// roll up again, and assert the hourly bucket is recomputed down to zero.
// Without the trigger the deleted row's bucket would never be re-enqueued.
func TestRollupTaskUsageHourlyConvergesOnTaskUsageDelete(t *testing.T) {
	if testHandler == nil {
		t.Skip("database not available")
	}
	ctx := context.Background()

	var runtimeID, agentID string
	if err := testPool.QueryRow(ctx, `SELECT id FROM agent_runtime WHERE workspace_id = $1 LIMIT 1`, testWorkspaceID).Scan(&runtimeID); err != nil {
		t.Fatalf("fetch runtime: %v", err)
	}
	if err := testPool.QueryRow(ctx, `SELECT id FROM agent WHERE workspace_id = $1 LIMIT 1`, testWorkspaceID).Scan(&agentID); err != nil {
		t.Fatalf("fetch agent: %v", err)
	}

	var issueID string
	if err := testPool.QueryRow(ctx, `
INSERT INTO issue (workspace_id, title, creator_id, creator_type, number)
VALUES ($1, 'tu-delete trigger test', $2, 'member',
(SELECT COALESCE(MAX(number), 0) + 1 FROM issue WHERE workspace_id = $1))
RETURNING id
`, testWorkspaceID, testUserID).Scan(&issueID); err != nil {
		t.Fatalf("create issue: %v", err)
	}
	t.Cleanup(func() { testPool.Exec(ctx, `DELETE FROM issue WHERE id = $1`, issueID) })

	var taskID string
	if err := testPool.QueryRow(ctx, `
INSERT INTO agent_task_queue (agent_id, issue_id, runtime_id, status, created_at)
VALUES ($1, $2, $3, 'completed', now() - interval '30 minutes') RETURNING id
`, agentID, issueID, runtimeID).Scan(&taskID); err != nil {
		t.Fatalf("insert task: %v", err)
	}
	t.Cleanup(func() { testPool.Exec(ctx, `DELETE FROM agent_task_queue WHERE id = $1`, taskID) })

	var usageID string
	if err := testPool.QueryRow(ctx, `
INSERT INTO task_usage (task_id, provider, model, input_tokens, output_tokens, created_at)
VALUES ($1, 'claude', 'tu-delete-model', 5050, 0, now() - interval '30 minutes')
RETURNING id
`, taskID).Scan(&usageID); err != nil {
		t.Fatalf("insert task_usage: %v", err)
	}
	// The unique model name tags this test's rollup and dirty rows.
	t.Cleanup(func() {
		testPool.Exec(ctx, `DELETE FROM task_usage_hourly WHERE model = 'tu-delete-model'`)
		testPool.Exec(ctx, `DELETE FROM task_usage_hourly_dirty WHERE model = 'tu-delete-model'`)
	})

	// bucketTotal sums the rolled-up input tokens for this test's model on
	// the borrowed runtime. A failed query aborts the test: returning the
	// zero value instead would satisfy the final "recomputed to 0" assertion
	// without having read the table.
	bucketTotal := func() int64 {
		t.Helper()
		var total int64
		if err := testPool.QueryRow(ctx, `
SELECT COALESCE(SUM(input_tokens), 0) FROM task_usage_hourly
WHERE runtime_id = $1 AND model = 'tu-delete-model'
`, runtimeID).Scan(&total); err != nil {
			t.Fatalf("read bucket total: %v", err)
		}
		return total
	}

	// runWindow recomputes the hourly table over a window that starts at the
	// epoch and ends one hour from now; label prefixes the failure message.
	runWindow := func(label string) {
		t.Helper()
		if _, err := testPool.Exec(ctx, `
SELECT rollup_task_usage_hourly_window('1970-01-01'::timestamptz, now() + interval '1 hour')
`); err != nil {
			t.Fatalf("%s: %v", label, err)
		}
	}

	runWindow("initial rollup")
	if got := bucketTotal(); got != 5050 {
		t.Fatalf("initial: expected bucket = 5050, got %d", got)
	}

	// Delete the task_usage row directly — fires trg_tu_dirty_hourly,
	// which enqueues the bucket on task_usage_hourly_dirty.
	if _, err := testPool.Exec(ctx, `DELETE FROM task_usage WHERE id = $1`, usageID); err != nil {
		t.Fatalf("delete task_usage: %v", err)
	}
	var dirtyCount int
	if err := testPool.QueryRow(ctx, `SELECT COUNT(*) FROM task_usage_hourly_dirty WHERE model = 'tu-delete-model'`).Scan(&dirtyCount); err != nil {
		t.Fatalf("count dirty entries: %v", err)
	}
	if dirtyCount != 1 {
		t.Fatalf("expected 1 dirty entry from task_usage DELETE trigger, got %d", dirtyCount)
	}

	runWindow("rollup after delete")
	if got := bucketTotal(); got != 0 {
		t.Errorf("after delete: expected bucket recomputed to 0, got %d", got)
	}
}