mirror of
https://github.com/multica-ai/multica.git
synced 2026-06-17 03:38:32 +02:00
* docs(timezone): add scheduling/viewing timezone architecture RFC * feat(db): replace daily rollups with task_usage_hourly, add user.timezone Migrations 100-104: add "user".timezone (Viewing tz), build the UTC hourly task_usage_hourly rollup with its pipeline, drop the legacy task_usage_daily / task_usage_dashboard_daily pipelines, and drop the agent_runtime.timezone column. Report queries now slice day boundaries at read time by the caller-supplied @tz instead of materialising in a fixed tz. Regenerate sqlc. * feat(server): add task_usage_hourly backfill command Replace the two legacy backfill commands (daily / dashboard_daily) with a single backfill_task_usage_hourly that loads historical task_usage into the new UTC hourly rollup, sliced per workspace. * refactor(server): resolve viewing timezone in report handlers Report handlers resolve the Viewing tz per request (?tz query param, then user.timezone, then UTC) and pass it to the hourly-rollup queries. Drop the UseDailyRollup feature flags and the old raw-scan/daily-rollup dual paths, remove the /api/usage endpoints, and stop the daemon from reporting and the runtime handler from accepting host timezone. * refactor(core): switch report queries to viewing timezone API client and dashboard/runtime queries send ?tz with each report request, the user schema/types carry the new timezone field, and the runtime timezone field/mutation is removed. * feat(views): add viewing timezone preference and UI Add the useViewingTimezone hook and a Timezone setting in Preferences; report charts and the dashboard week boundary follow the viewer tz. Remove the runtime detail timezone editor and its locale strings. * fix(test): update fixtures and stabilize tests for timezone refactor The timezone architecture refactor changed several types without updating dependent test code: - RuntimeDevice no longer has a timezone field — drop it from the create-agent-dialog runtime fixture. - User now requires a timezone field — add it to the apps/web mockUser fixture. - The PreferencesTab timezone tests asserted on the async save handler (PATCH then store update) with a bare expect, racing the mutation's settle callback, and timed out querying the Select's ~600-option IANA list on a loaded CI runner. Wrap the assertions in waitFor and extend the timeout for those three tests. * docs(timezone): document self-host migration order and trigger invariant Add a SELF-HOST UPGRADE ORDER runbook to the backfill command's package comment: applying migrations 100-104 in a single migrate-up drops the legacy daily rollups before the hourly backfill runs, leaving dashboards empty until cron catches up. Add an INVARIANT comment on trg_atq_dirty_hourly noting that agent_id must be added to the trigger's OF list if it ever becomes mutable, otherwise dirty buckets for the old agent_id are silently missed. * style(runtimes): drop trailing blank line in runtime-detail
1398 lines
54 KiB
Go
1398 lines
54 KiB
Go
package handler
|
||
|
||
import (
|
||
"context"
|
||
"encoding/json"
|
||
"net/http"
|
||
"net/http/httptest"
|
||
"testing"
|
||
"time"
|
||
)
|
||
|
||
// TestDashboardEndpoints covers the workspace-dashboard rollups:
|
||
// - daily token usage with and without project filter
|
||
// - per-agent token usage with and without project filter
|
||
// - per-agent run time
|
||
//
|
||
// Asserts that (1) tasks belonging to a project show up under the workspace
|
||
// view, (2) the project filter excludes tasks tied to issues without a
|
||
// matching project_id, and (3) run-time aggregation accumulates the
|
||
// completed_at − started_at delta correctly.
|
||
func TestDashboardEndpoints(t *testing.T) {
|
||
if testHandler == nil {
|
||
t.Skip("database not available")
|
||
}
|
||
ctx := context.Background()
|
||
|
||
var runtimeID, agentID string
|
||
if err := testPool.QueryRow(ctx, `
|
||
SELECT id FROM agent_runtime WHERE workspace_id = $1 LIMIT 1
|
||
`, testWorkspaceID).Scan(&runtimeID); err != nil {
|
||
t.Fatalf("fetch runtime: %v", err)
|
||
}
|
||
if err := testPool.QueryRow(ctx, `
|
||
SELECT id FROM agent WHERE workspace_id = $1 LIMIT 1
|
||
`, testWorkspaceID).Scan(&agentID); err != nil {
|
||
t.Fatalf("fetch agent: %v", err)
|
||
}
|
||
|
||
// Two issues: one bound to a project, one not.
|
||
var projectID string
|
||
if err := testPool.QueryRow(ctx, `
|
||
INSERT INTO project (workspace_id, title)
|
||
VALUES ($1, 'dashboard test project')
|
||
RETURNING id
|
||
`, testWorkspaceID).Scan(&projectID); err != nil {
|
||
t.Fatalf("create project: %v", err)
|
||
}
|
||
t.Cleanup(func() { testPool.Exec(ctx, `DELETE FROM project WHERE id = $1`, projectID) })
|
||
|
||
// issue.number is `UNIQUE (workspace_id, number)` (migration 020) and
|
||
// defaults to 0. Two inserts into the same workspace would collide on the
|
||
// default; allocate `MAX(number) + 1` per row to stay sequential and
|
||
// avoid stepping on rows other tests have left behind in the shared
|
||
// fixture workspace.
|
||
mkIssue := func(withProject bool) string {
|
||
var id string
|
||
var pid any
|
||
if withProject {
|
||
pid = projectID
|
||
}
|
||
if err := testPool.QueryRow(ctx, `
|
||
INSERT INTO issue (workspace_id, title, creator_id, creator_type, project_id, number)
|
||
VALUES (
|
||
$1, 'dashboard test', $2, 'member', $3,
|
||
(SELECT COALESCE(MAX(number), 0) + 1 FROM issue WHERE workspace_id = $1)
|
||
)
|
||
RETURNING id
|
||
`, testWorkspaceID, testUserID, pid).Scan(&id); err != nil {
|
||
t.Fatalf("insert issue: %v", err)
|
||
}
|
||
t.Cleanup(func() { testPool.Exec(ctx, `DELETE FROM issue WHERE id = $1`, id) })
|
||
return id
|
||
}
|
||
projectIssueID := mkIssue(true)
|
||
otherIssueID := mkIssue(false)
|
||
|
||
now := time.Now().UTC()
|
||
started := now.Add(-30 * time.Minute)
|
||
completed := started.Add(10 * time.Minute) // 600s run
|
||
|
||
mkTaskWithUsage := func(issueID string, status string, tokens int64) {
|
||
var taskID string
|
||
if err := testPool.QueryRow(ctx, `
|
||
INSERT INTO agent_task_queue (agent_id, issue_id, runtime_id, status, started_at, completed_at, created_at)
|
||
VALUES ($1, $2, $3, $4, $5, $6, now())
|
||
RETURNING id
|
||
`, agentID, issueID, runtimeID, status, started, completed).Scan(&taskID); err != nil {
|
||
t.Fatalf("insert task: %v", err)
|
||
}
|
||
if _, err := testPool.Exec(ctx, `
|
||
INSERT INTO task_usage (task_id, provider, model, input_tokens, output_tokens, created_at)
|
||
VALUES ($1, 'claude', 'claude-3-5-sonnet', $2, 0, now())
|
||
`, taskID, tokens); err != nil {
|
||
t.Fatalf("insert task_usage: %v", err)
|
||
}
|
||
t.Cleanup(func() { testPool.Exec(ctx, `DELETE FROM agent_task_queue WHERE id = $1`, taskID) })
|
||
}
|
||
|
||
mkTaskWithUsage(projectIssueID, "completed", 1000)
|
||
mkTaskWithUsage(otherIssueID, "completed", 500)
|
||
|
||
// All dashboard endpoints now read from task_usage_hourly (post-RFC
|
||
// Phase 3). Drive the underlying window function directly so the
|
||
// freshly inserted fixture rows are aggregated before assertions —
|
||
// in production the cron tick handles this with a 5-min lag.
|
||
if _, err := testPool.Exec(ctx, `
|
||
SELECT rollup_task_usage_hourly_window('1970-01-01'::timestamptz, now() + interval '1 hour')
|
||
`); err != nil {
|
||
t.Fatalf("rollup window: %v", err)
|
||
}
|
||
|
||
type dailyRow struct {
|
||
Date string `json:"date"`
|
||
Model string `json:"model"`
|
||
InputTokens int64 `json:"input_tokens"`
|
||
}
|
||
type byAgentRow struct {
|
||
AgentID string `json:"agent_id"`
|
||
Model string `json:"model"`
|
||
InputTokens int64 `json:"input_tokens"`
|
||
}
|
||
type runtimeRow struct {
|
||
AgentID string `json:"agent_id"`
|
||
TotalSeconds int64 `json:"total_seconds"`
|
||
TaskCount int32 `json:"task_count"`
|
||
}
|
||
|
||
// daily — workspace-wide
|
||
{
|
||
w := httptest.NewRecorder()
|
||
testHandler.GetDashboardUsageDaily(w, newRequest("GET", "/api/dashboard/usage/daily?days=1", nil))
|
||
if w.Code != http.StatusOK {
|
||
t.Fatalf("daily ws: expected 200, got %d: %s", w.Code, w.Body.String())
|
||
}
|
||
var rows []dailyRow
|
||
_ = json.NewDecoder(w.Body).Decode(&rows)
|
||
var total int64
|
||
for _, r := range rows {
|
||
if r.Model == "claude-3-5-sonnet" {
|
||
total += r.InputTokens
|
||
}
|
||
}
|
||
if total < 1500 {
|
||
t.Errorf("daily ws: expected >=1500 tokens (1000+500), got %d", total)
|
||
}
|
||
}
|
||
|
||
// daily — project-scoped
|
||
{
|
||
w := httptest.NewRecorder()
|
||
testHandler.GetDashboardUsageDaily(w, newRequest("GET", "/api/dashboard/usage/daily?days=1&project_id="+projectID, nil))
|
||
if w.Code != http.StatusOK {
|
||
t.Fatalf("daily project: expected 200, got %d: %s", w.Code, w.Body.String())
|
||
}
|
||
var rows []dailyRow
|
||
_ = json.NewDecoder(w.Body).Decode(&rows)
|
||
var total int64
|
||
for _, r := range rows {
|
||
if r.Model == "claude-3-5-sonnet" {
|
||
total += r.InputTokens
|
||
}
|
||
}
|
||
// Project filter must exclude the 500-token "other" issue. Token total
|
||
// for this project must be >= 1000 (our task) and < 1500 (would only
|
||
// reach 1500 if filter leaked).
|
||
if total < 1000 {
|
||
t.Errorf("daily project: expected >=1000 tokens, got %d", total)
|
||
}
|
||
if total >= 1500 {
|
||
t.Errorf("daily project: filter leaked — expected <1500 tokens, got %d", total)
|
||
}
|
||
}
|
||
|
||
// by-agent — project-scoped
|
||
{
|
||
w := httptest.NewRecorder()
|
||
testHandler.GetDashboardUsageByAgent(w, newRequest("GET", "/api/dashboard/usage/by-agent?days=1&project_id="+projectID, nil))
|
||
if w.Code != http.StatusOK {
|
||
t.Fatalf("by-agent project: expected 200, got %d: %s", w.Code, w.Body.String())
|
||
}
|
||
var rows []byAgentRow
|
||
_ = json.NewDecoder(w.Body).Decode(&rows)
|
||
found := false
|
||
for _, r := range rows {
|
||
if r.AgentID == agentID && r.InputTokens >= 1000 {
|
||
found = true
|
||
}
|
||
}
|
||
if !found {
|
||
t.Errorf("by-agent project: expected agent %s with >=1000 tokens; got %v", agentID, rows)
|
||
}
|
||
}
|
||
|
||
// agent-runtime — project-scoped
|
||
{
|
||
w := httptest.NewRecorder()
|
||
testHandler.GetDashboardAgentRunTime(w, newRequest("GET", "/api/dashboard/agent-runtime?days=1&project_id="+projectID, nil))
|
||
if w.Code != http.StatusOK {
|
||
t.Fatalf("agent-runtime: expected 200, got %d: %s", w.Code, w.Body.String())
|
||
}
|
||
var rows []runtimeRow
|
||
_ = json.NewDecoder(w.Body).Decode(&rows)
|
||
var seconds int64
|
||
var tasks int32
|
||
for _, r := range rows {
|
||
if r.AgentID == agentID {
|
||
seconds += r.TotalSeconds
|
||
tasks += r.TaskCount
|
||
}
|
||
}
|
||
if tasks < 1 {
|
||
t.Errorf("agent-runtime: expected >=1 task for agent, got %d", tasks)
|
||
}
|
||
if seconds < 600 {
|
||
t.Errorf("agent-runtime: expected >=600s (one 10-minute run), got %d", seconds)
|
||
}
|
||
}
|
||
|
||
// agent-runtime — invalid project_id rejected
|
||
{
|
||
w := httptest.NewRecorder()
|
||
testHandler.GetDashboardAgentRunTime(w, newRequest("GET", "/api/dashboard/agent-runtime?project_id=not-a-uuid", nil))
|
||
if w.Code != http.StatusBadRequest {
|
||
t.Errorf("agent-runtime: expected 400 for invalid uuid, got %d", w.Code)
|
||
}
|
||
}
|
||
|
||
// Workspace-wide by-agent through the same rollup, just to confirm
|
||
// the no-project-filter shape matches up.
|
||
{
|
||
w := httptest.NewRecorder()
|
||
testHandler.GetDashboardUsageByAgent(w, newRequest("GET", "/api/dashboard/usage/by-agent?days=1", nil))
|
||
if w.Code != http.StatusOK {
|
||
t.Fatalf("by-agent ws: expected 200, got %d: %s", w.Code, w.Body.String())
|
||
}
|
||
var aRows []byAgentRow
|
||
_ = json.NewDecoder(w.Body).Decode(&aRows)
|
||
var aTotal int64
|
||
for _, r := range aRows {
|
||
if r.AgentID == agentID && r.Model == "claude-3-5-sonnet" {
|
||
aTotal += r.InputTokens
|
||
}
|
||
}
|
||
if aTotal < 1500 {
|
||
t.Errorf("by-agent ws: expected >=1500 tokens across workspace, got %d", aTotal)
|
||
}
|
||
}
|
||
}
|
||
|
||
// TestDashboardUsageDailyBucketsByViewerTimezone proves the `?tz=` query
|
||
// param drives the calendar-day boundary: the same UTC instant lands under
|
||
// a different `date` for a UTC viewer vs an America/Los_Angeles viewer.
|
||
// This is the core promise of the timezone-architecture RFC.
|
||
func TestDashboardUsageDailyBucketsByViewerTimezone(t *testing.T) {
|
||
if testHandler == nil {
|
||
t.Skip("database not available")
|
||
}
|
||
ctx := context.Background()
|
||
|
||
var runtimeID, agentID string
|
||
if err := testPool.QueryRow(ctx, `SELECT id FROM agent_runtime WHERE workspace_id = $1 LIMIT 1`, testWorkspaceID).Scan(&runtimeID); err != nil {
|
||
t.Fatalf("fetch runtime: %v", err)
|
||
}
|
||
if err := testPool.QueryRow(ctx, `SELECT id FROM agent WHERE workspace_id = $1 LIMIT 1`, testWorkspaceID).Scan(&agentID); err != nil {
|
||
t.Fatalf("fetch agent: %v", err)
|
||
}
|
||
|
||
t.Cleanup(func() {
|
||
testPool.Exec(ctx, `DELETE FROM task_usage_hourly WHERE runtime_id = $1 AND provider = 'tz-bucket-test'`, runtimeID)
|
||
})
|
||
// One bucket at 04:00 UTC two days ago. 04:00 UTC is still the
|
||
// previous evening in America/Los_Angeles (UTC-7/-8), so the UTC
|
||
// viewer and the LA viewer must see this row under different dates.
|
||
var bucketHour time.Time
|
||
if err := testPool.QueryRow(ctx, `
|
||
INSERT INTO task_usage_hourly (
|
||
bucket_hour, workspace_id, runtime_id, agent_id, project_id,
|
||
provider, model,
|
||
input_tokens, output_tokens, cache_read_tokens, cache_write_tokens, event_count
|
||
)
|
||
VALUES (
|
||
((CURRENT_DATE - 2)::timestamp + interval '4 hours') AT TIME ZONE 'UTC',
|
||
$1, $2, $3, NULL, 'tz-bucket-test', 'tz-bucket-model',
|
||
999, 0, 0, 0, 1
|
||
)
|
||
ON CONFLICT ON CONSTRAINT uq_task_usage_hourly_key DO UPDATE
|
||
SET input_tokens = EXCLUDED.input_tokens
|
||
RETURNING bucket_hour
|
||
`, testWorkspaceID, runtimeID, agentID).Scan(&bucketHour); err != nil {
|
||
t.Fatalf("seed hourly row: %v", err)
|
||
}
|
||
|
||
utcDate := bucketHour.UTC().Format("2006-01-02")
|
||
laLoc, err := time.LoadLocation("America/Los_Angeles")
|
||
if err != nil {
|
||
t.Fatalf("load LA location: %v", err)
|
||
}
|
||
laDate := bucketHour.In(laLoc).Format("2006-01-02")
|
||
if utcDate == laDate {
|
||
t.Fatalf("test setup: UTC and LA dates must differ, both %s", utcDate)
|
||
}
|
||
|
||
readDate := func(tz string) string {
|
||
w := httptest.NewRecorder()
|
||
testHandler.GetDashboardUsageDaily(w, newRequest("GET", "/api/dashboard/usage/daily?days=10&tz="+tz, nil))
|
||
if w.Code != http.StatusOK {
|
||
t.Fatalf("tz=%s: expected 200, got %d: %s", tz, w.Code, w.Body.String())
|
||
}
|
||
var rows []struct {
|
||
Date string `json:"date"`
|
||
Model string `json:"model"`
|
||
}
|
||
_ = json.NewDecoder(w.Body).Decode(&rows)
|
||
for _, r := range rows {
|
||
if r.Model == "tz-bucket-model" {
|
||
return r.Date
|
||
}
|
||
}
|
||
t.Fatalf("tz=%s: tz-bucket-model row not found in %v", tz, rows)
|
||
return ""
|
||
}
|
||
|
||
if got := readDate("UTC"); got != utcDate {
|
||
t.Errorf("UTC viewer: expected date %s, got %s", utcDate, got)
|
||
}
|
||
if got := readDate("America/Los_Angeles"); got != laDate {
|
||
t.Errorf("LA viewer: expected date %s, got %s", laDate, got)
|
||
}
|
||
}
|
||
|
||
// TestDashboardRunTimeDailyBucketsByViewerTimezone proves the `?tz=` query
|
||
// param drives the calendar-day boundary of the Time / Tasks dashboard tab:
|
||
// GetDashboardRunTimeDaily applies `@tz` to `completed_at AT TIME ZONE @tz`
|
||
// on agent_task_queue. A task completed at 04:00 UTC is still the previous
|
||
// evening in America/Los_Angeles (UTC-7/-8), so the LA viewer must see the
|
||
// row under the prior calendar date relative to a UTC viewer.
|
||
func TestDashboardRunTimeDailyBucketsByViewerTimezone(t *testing.T) {
|
||
if testHandler == nil {
|
||
t.Skip("database not available")
|
||
}
|
||
ctx := context.Background()
|
||
|
||
var runtimeID, agentID string
|
||
if err := testPool.QueryRow(ctx, `SELECT id FROM agent_runtime WHERE workspace_id = $1 LIMIT 1`, testWorkspaceID).Scan(&runtimeID); err != nil {
|
||
t.Fatalf("fetch runtime: %v", err)
|
||
}
|
||
if err := testPool.QueryRow(ctx, `SELECT id FROM agent WHERE workspace_id = $1 LIMIT 1`, testWorkspaceID).Scan(&agentID); err != nil {
|
||
t.Fatalf("fetch agent: %v", err)
|
||
}
|
||
|
||
// Issue tagged so we can clean up just this test's rows.
|
||
var issueID string
|
||
if err := testPool.QueryRow(ctx, `
|
||
INSERT INTO issue (workspace_id, title, creator_id, creator_type, number)
|
||
VALUES ($1, 'runtime-daily tz test', $2, 'member',
|
||
(SELECT COALESCE(MAX(number), 0) + 1 FROM issue WHERE workspace_id = $1))
|
||
RETURNING id
|
||
`, testWorkspaceID, testUserID).Scan(&issueID); err != nil {
|
||
t.Fatalf("create issue: %v", err)
|
||
}
|
||
t.Cleanup(func() { testPool.Exec(ctx, `DELETE FROM issue WHERE id = $1`, issueID) })
|
||
|
||
// completed_at at 04:00 UTC two days ago — still the prior evening in LA.
|
||
// started_at 10 minutes earlier so the run has a non-zero duration.
|
||
var completedAt time.Time
|
||
var taskID string
|
||
if err := testPool.QueryRow(ctx, `
|
||
INSERT INTO agent_task_queue (agent_id, issue_id, runtime_id, status, started_at, completed_at, created_at)
|
||
VALUES (
|
||
$1, $2, $3, 'completed',
|
||
((CURRENT_DATE - 2)::timestamp + interval '3 hours 50 minutes') AT TIME ZONE 'UTC',
|
||
((CURRENT_DATE - 2)::timestamp + interval '4 hours') AT TIME ZONE 'UTC',
|
||
now()
|
||
)
|
||
RETURNING id, completed_at
|
||
`, agentID, issueID, runtimeID).Scan(&taskID, &completedAt); err != nil {
|
||
t.Fatalf("insert completed task: %v", err)
|
||
}
|
||
t.Cleanup(func() { testPool.Exec(ctx, `DELETE FROM agent_task_queue WHERE id = $1`, taskID) })
|
||
|
||
utcDate := completedAt.UTC().Format("2006-01-02")
|
||
laLoc, err := time.LoadLocation("America/Los_Angeles")
|
||
if err != nil {
|
||
t.Fatalf("load LA location: %v", err)
|
||
}
|
||
laDate := completedAt.In(laLoc).Format("2006-01-02")
|
||
if utcDate == laDate {
|
||
t.Fatalf("test setup: UTC and LA dates must differ, both %s", utcDate)
|
||
}
|
||
|
||
readRow := func(tz string) (string, int64, int32) {
|
||
w := httptest.NewRecorder()
|
||
testHandler.GetDashboardRunTimeDaily(w, newRequest("GET", "/api/dashboard/runtime/daily?days=10&tz="+tz, nil))
|
||
if w.Code != http.StatusOK {
|
||
t.Fatalf("tz=%s: expected 200, got %d: %s", tz, w.Code, w.Body.String())
|
||
}
|
||
var rows []struct {
|
||
Date string `json:"date"`
|
||
TotalSeconds int64 `json:"total_seconds"`
|
||
TaskCount int32 `json:"task_count"`
|
||
}
|
||
_ = json.NewDecoder(w.Body).Decode(&rows)
|
||
want := utcDate
|
||
if tz == "America/Los_Angeles" {
|
||
want = laDate
|
||
}
|
||
for _, r := range rows {
|
||
if r.Date == want {
|
||
return r.Date, r.TotalSeconds, r.TaskCount
|
||
}
|
||
}
|
||
t.Fatalf("tz=%s: no row on expected date %s in %v", tz, want, rows)
|
||
return "", 0, 0
|
||
}
|
||
|
||
if date, secs, count := readRow("UTC"); date != utcDate || count < 1 || secs < 600 {
|
||
t.Errorf("UTC viewer: got date=%s seconds=%d count=%d, want date=%s seconds>=600 count>=1",
|
||
date, secs, count, utcDate)
|
||
}
|
||
if date, secs, count := readRow("America/Los_Angeles"); date != laDate || count < 1 || secs < 600 {
|
||
t.Errorf("LA viewer: got date=%s seconds=%d count=%d, want date=%s seconds>=600 count>=1",
|
||
date, secs, count, laDate)
|
||
}
|
||
}
|
||
|
||
// TestRollupTaskUsageHourlyIdempotentAndWatermark covers two pipeline
|
||
// invariants the deleted runtime_rollup_test.go used to guard for the
|
||
// legacy daily rollup: (1) re-running the window function over the same
|
||
// range produces identical totals, and (2) the cron entry point advances
|
||
// the rollup-state watermark and clears last_error.
|
||
func TestRollupTaskUsageHourlyIdempotentAndWatermark(t *testing.T) {
|
||
if testHandler == nil {
|
||
t.Skip("database not available")
|
||
}
|
||
ctx := context.Background()
|
||
|
||
var runtimeID, agentID string
|
||
if err := testPool.QueryRow(ctx, `SELECT id FROM agent_runtime WHERE workspace_id = $1 LIMIT 1`, testWorkspaceID).Scan(&runtimeID); err != nil {
|
||
t.Fatalf("fetch runtime: %v", err)
|
||
}
|
||
if err := testPool.QueryRow(ctx, `SELECT id FROM agent WHERE workspace_id = $1 LIMIT 1`, testWorkspaceID).Scan(&agentID); err != nil {
|
||
t.Fatalf("fetch agent: %v", err)
|
||
}
|
||
|
||
var issueID, taskID string
|
||
if err := testPool.QueryRow(ctx, `
|
||
INSERT INTO issue (workspace_id, title, creator_id, creator_type, number)
|
||
VALUES ($1, 'rollup idempotency', $2, 'member',
|
||
(SELECT COALESCE(MAX(number), 0) + 1 FROM issue WHERE workspace_id = $1))
|
||
RETURNING id
|
||
`, testWorkspaceID, testUserID).Scan(&issueID); err != nil {
|
||
t.Fatalf("create issue: %v", err)
|
||
}
|
||
t.Cleanup(func() { testPool.Exec(ctx, `DELETE FROM issue WHERE id = $1`, issueID) })
|
||
|
||
if err := testPool.QueryRow(ctx, `
|
||
INSERT INTO agent_task_queue (agent_id, issue_id, runtime_id, status, created_at)
|
||
VALUES ($1, $2, $3, 'completed', now() - interval '20 minutes') RETURNING id
|
||
`, agentID, issueID, runtimeID).Scan(&taskID); err != nil {
|
||
t.Fatalf("insert task: %v", err)
|
||
}
|
||
t.Cleanup(func() { testPool.Exec(ctx, `DELETE FROM agent_task_queue WHERE id = $1`, taskID) })
|
||
|
||
if _, err := testPool.Exec(ctx, `
|
||
INSERT INTO task_usage (task_id, provider, model, input_tokens, output_tokens, created_at)
|
||
VALUES ($1, 'claude', 'rollup-idem-model', 3333, 0, now() - interval '20 minutes')
|
||
`, taskID); err != nil {
|
||
t.Fatalf("insert task_usage: %v", err)
|
||
}
|
||
|
||
readTotal := func() int64 {
|
||
var total int64
|
||
if err := testPool.QueryRow(ctx, `
|
||
SELECT COALESCE(SUM(input_tokens), 0) FROM task_usage_hourly
|
||
WHERE runtime_id = $1 AND model = 'rollup-idem-model'
|
||
`, runtimeID).Scan(&total); err != nil {
|
||
t.Fatalf("read total: %v", err)
|
||
}
|
||
return total
|
||
}
|
||
|
||
// Idempotency: two passes over the same range must not double-count.
|
||
for i := 0; i < 2; i++ {
|
||
if _, err := testPool.Exec(ctx, `
|
||
SELECT rollup_task_usage_hourly_window('1970-01-01'::timestamptz, now() + interval '1 hour')
|
||
`); err != nil {
|
||
t.Fatalf("rollup pass %d: %v", i, err)
|
||
}
|
||
}
|
||
if got := readTotal(); got != 3333 {
|
||
t.Errorf("idempotency: expected exactly 3333 tokens after two passes, got %d", got)
|
||
}
|
||
|
||
// Watermark advance: park the watermark an hour back, run the cron
|
||
// entry, confirm it moved forward to ~now()-5min with no error.
|
||
if _, err := testPool.Exec(ctx, `
|
||
UPDATE task_usage_hourly_rollup_state
|
||
SET watermark_at = now() - interval '1 hour', last_error = 'stale'
|
||
WHERE id = 1
|
||
`); err != nil {
|
||
t.Fatalf("park watermark: %v", err)
|
||
}
|
||
if _, err := testPool.Exec(ctx, `SELECT rollup_task_usage_hourly()`); err != nil {
|
||
t.Fatalf("rollup_task_usage_hourly: %v", err)
|
||
}
|
||
var watermarkAge time.Duration
|
||
var lastError *string
|
||
var ageSeconds float64
|
||
if err := testPool.QueryRow(ctx, `
|
||
SELECT EXTRACT(EPOCH FROM (now() - watermark_at)), last_error
|
||
FROM task_usage_hourly_rollup_state WHERE id = 1
|
||
`).Scan(&ageSeconds, &lastError); err != nil {
|
||
t.Fatalf("read rollup state: %v", err)
|
||
}
|
||
watermarkAge = time.Duration(ageSeconds) * time.Second
|
||
// Watermark should sit at now()-5min (the cron upper bound), well
|
||
// short of the 1-hour-back value we parked it at.
|
||
if watermarkAge > 10*time.Minute {
|
||
t.Errorf("watermark did not advance: still %s behind now()", watermarkAge)
|
||
}
|
||
if lastError != nil {
|
||
t.Errorf("expected last_error cleared, got %q", *lastError)
|
||
}
|
||
}
|
||
|
||
// TestRollupTaskUsageHourlyReassignBetweenRuntimes ports the invalidation
|
||
// coverage the deleted runtime_rollup_test.go held for the legacy daily
|
||
// rollup. Reassigning a task between runtimes (the runtime-merge path) must
|
||
// move its usage: the `trg_atq_dirty_hourly` trigger enqueues both the old
|
||
// and new runtime buckets, and the next window run drains the queue,
|
||
// empties the old bucket, and fills the new one. Without this the rollup
|
||
// keeps attributing usage to the merged-away runtime.
|
||
func TestRollupTaskUsageHourlyReassignBetweenRuntimes(t *testing.T) {
|
||
if testHandler == nil {
|
||
t.Skip("database not available")
|
||
}
|
||
ctx := context.Background()
|
||
|
||
oldRuntimeID := handlerTestRuntimeID(t)
|
||
var newRuntimeID string
|
||
if err := testPool.QueryRow(ctx, `
|
||
INSERT INTO agent_runtime (
|
||
workspace_id, daemon_id, name, runtime_mode, provider, status, device_info, metadata, last_seen_at
|
||
)
|
||
VALUES ($1, NULL, 'reassign-target-hourly', 'cloud', 'reassign-target-hourly', 'online', '{}'::jsonb, '{}'::jsonb, now())
|
||
RETURNING id
|
||
`, testWorkspaceID).Scan(&newRuntimeID); err != nil {
|
||
t.Fatalf("create dest runtime: %v", err)
|
||
}
|
||
t.Cleanup(func() { testPool.Exec(ctx, `DELETE FROM agent_runtime WHERE id = $1`, newRuntimeID) })
|
||
|
||
var agentID string
|
||
if err := testPool.QueryRow(ctx, `SELECT id FROM agent WHERE workspace_id = $1 LIMIT 1`, testWorkspaceID).Scan(&agentID); err != nil {
|
||
t.Fatalf("fetch agent: %v", err)
|
||
}
|
||
var issueID string
|
||
if err := testPool.QueryRow(ctx, `
|
||
INSERT INTO issue (workspace_id, title, creator_id, creator_type, number)
|
||
VALUES ($1, 'reassign hourly test', $2, 'member',
|
||
(SELECT COALESCE(MAX(number), 0) + 1 FROM issue WHERE workspace_id = $1))
|
||
RETURNING id
|
||
`, testWorkspaceID, testUserID).Scan(&issueID); err != nil {
|
||
t.Fatalf("create issue: %v", err)
|
||
}
|
||
t.Cleanup(func() { testPool.Exec(ctx, `DELETE FROM issue WHERE id = $1`, issueID) })
|
||
|
||
usageAt := time.Date(2021, 3, 14, 1, 0, 0, 0, time.UTC)
|
||
var taskID string
|
||
if err := testPool.QueryRow(ctx, `
|
||
INSERT INTO agent_task_queue (agent_id, issue_id, runtime_id, status, created_at)
|
||
VALUES ($1, $2, $3, 'completed', $4) RETURNING id
|
||
`, agentID, issueID, oldRuntimeID, usageAt).Scan(&taskID); err != nil {
|
||
t.Fatalf("insert task: %v", err)
|
||
}
|
||
t.Cleanup(func() { testPool.Exec(ctx, `DELETE FROM agent_task_queue WHERE id = $1`, taskID) })
|
||
if _, err := testPool.Exec(ctx, `
|
||
INSERT INTO task_usage (task_id, provider, model, input_tokens, output_tokens, created_at, updated_at)
|
||
VALUES ($1, 'claude', 'm-reassign-hourly', 700, 70, $2, $2)
|
||
`, taskID, usageAt); err != nil {
|
||
t.Fatalf("insert task_usage: %v", err)
|
||
}
|
||
t.Cleanup(func() {
|
||
testPool.Exec(ctx, `DELETE FROM task_usage_hourly WHERE model = 'm-reassign-hourly'`)
|
||
testPool.Exec(ctx, `DELETE FROM task_usage_hourly_dirty WHERE model = 'm-reassign-hourly'`)
|
||
})
|
||
|
||
runWindow := func(label string) {
|
||
if _, err := testPool.Exec(ctx, `
|
||
SELECT rollup_task_usage_hourly_window('-infinity'::timestamptz, 'infinity'::timestamptz)
|
||
`); err != nil {
|
||
t.Fatalf("%s: %v", label, err)
|
||
}
|
||
}
|
||
runtimeTotal := func(rt string) int64 {
|
||
var total int64
|
||
testPool.QueryRow(ctx, `
|
||
SELECT COALESCE(SUM(input_tokens), 0) FROM task_usage_hourly
|
||
WHERE runtime_id = $1 AND model = 'm-reassign-hourly'
|
||
`, rt).Scan(&total)
|
||
return total
|
||
}
|
||
|
||
runWindow("initial rollup")
|
||
if old, new := runtimeTotal(oldRuntimeID), runtimeTotal(newRuntimeID); old != 700 || new != 0 {
|
||
t.Fatalf("initial: expected old=700 new=0, got old=%d new=%d", old, new)
|
||
}
|
||
|
||
// Reassignment fires trg_atq_dirty_hourly, which enqueues the OLD and
|
||
// NEW runtime buckets (same bucket_hour, two runtime_ids).
|
||
if _, err := testPool.Exec(ctx, `UPDATE agent_task_queue SET runtime_id = $1 WHERE id = $2`, newRuntimeID, taskID); err != nil {
|
||
t.Fatalf("reassign task: %v", err)
|
||
}
|
||
var dirtyCount int
|
||
testPool.QueryRow(ctx, `SELECT COUNT(*) FROM task_usage_hourly_dirty WHERE model = 'm-reassign-hourly'`).Scan(&dirtyCount)
|
||
if dirtyCount != 2 {
|
||
t.Fatalf("expected 2 dirty entries (old+new runtime), got %d", dirtyCount)
|
||
}
|
||
|
||
runWindow("rollup after reassign")
|
||
if old, new := runtimeTotal(oldRuntimeID), runtimeTotal(newRuntimeID); old != 0 || new != 700 {
|
||
t.Fatalf("after reassign: expected old=0 new=700, got old=%d new=%d", old, new)
|
||
}
|
||
// The window function must drain every queue row whose enqueued_at
|
||
// predates p_to — a regression on that DELETE pins recomputes forever.
|
||
testPool.QueryRow(ctx, `SELECT COUNT(*) FROM task_usage_hourly_dirty WHERE model = 'm-reassign-hourly'`).Scan(&dirtyCount)
|
||
if dirtyCount != 0 {
|
||
t.Errorf("expected dirty queue drained, got %d entries", dirtyCount)
|
||
}
|
||
}
|
||
|
||
// TestRollupTaskUsageHourlyWorkspaceMismatch constructs an atq row whose
|
||
// agent.workspace_id differs from issue.workspace_id and verifies the
|
||
// hourly rollup resolves workspace_id consistently from `agent` across the
|
||
// trigger, dirty_from_updates, and the recompute join. If any path leaked
|
||
// back to issue.workspace_id the dirty key would miss the recompute join
|
||
// and the bucket would be dropped or mis-attributed across tenants. The
|
||
// schema does not enforce the two workspace_ids match, so this canary
|
||
// keeps the alignment honest.
|
||
func TestRollupTaskUsageHourlyWorkspaceMismatch(t *testing.T) {
|
||
if testHandler == nil {
|
||
t.Skip("database not available")
|
||
}
|
||
ctx := context.Background()
|
||
|
||
var foreignWorkspaceID string
|
||
if err := testPool.QueryRow(ctx, `
|
||
INSERT INTO workspace (name, slug)
|
||
VALUES ('ws-mismatch-hourly', 'ws-mismatch-hourly-' || gen_random_uuid()::text)
|
||
RETURNING id
|
||
`).Scan(&foreignWorkspaceID); err != nil {
|
||
t.Fatalf("create foreign workspace: %v", err)
|
||
}
|
||
t.Cleanup(func() { testPool.Exec(ctx, `DELETE FROM workspace WHERE id = $1`, foreignWorkspaceID) })
|
||
|
||
var foreignRuntimeID string
|
||
if err := testPool.QueryRow(ctx, `
|
||
INSERT INTO agent_runtime (
|
||
workspace_id, daemon_id, name, runtime_mode, provider, status, device_info, metadata, last_seen_at
|
||
)
|
||
VALUES ($1, NULL, 'mismatch-rt-hourly', 'cloud', 'mismatch-rt-hourly', 'online', '{}'::jsonb, '{}'::jsonb, now())
|
||
RETURNING id
|
||
`, foreignWorkspaceID).Scan(&foreignRuntimeID); err != nil {
|
||
t.Fatalf("create foreign runtime: %v", err)
|
||
}
|
||
var foreignAgentID string
|
||
if err := testPool.QueryRow(ctx, `
|
||
INSERT INTO agent (
|
||
workspace_id, name, description, runtime_mode, runtime_config,
|
||
runtime_id, visibility, max_concurrent_tasks, owner_id,
|
||
instructions, custom_env, custom_args, mcp_config
|
||
)
|
||
VALUES ($1, 'mismatch-agent-hourly', '', 'cloud', '{}'::jsonb, $2, 'private', 1, $3, '', '{}'::jsonb, '[]'::jsonb, '[]'::jsonb)
|
||
RETURNING id
|
||
`, foreignWorkspaceID, foreignRuntimeID, testUserID).Scan(&foreignAgentID); err != nil {
|
||
t.Fatalf("create foreign agent: %v", err)
|
||
}
|
||
|
||
// Issue lives in the primary test workspace; the agent lives in the
|
||
// foreign one — so agent.workspace_id != issue.workspace_id.
|
||
var issueID string
|
||
if err := testPool.QueryRow(ctx, `
|
||
INSERT INTO issue (workspace_id, title, creator_id, creator_type, number)
|
||
VALUES ($1, 'mismatch hourly test', $2, 'member',
|
||
(SELECT COALESCE(MAX(number), 0) + 1 FROM issue WHERE workspace_id = $1))
|
||
RETURNING id
|
||
`, testWorkspaceID, testUserID).Scan(&issueID); err != nil {
|
||
t.Fatalf("create issue: %v", err)
|
||
}
|
||
t.Cleanup(func() { testPool.Exec(ctx, `DELETE FROM issue WHERE id = $1`, issueID) })
|
||
|
||
usageAt := time.Date(2021, 9, 9, 1, 0, 0, 0, time.UTC)
|
||
var taskID string
|
||
if err := testPool.QueryRow(ctx, `
|
||
INSERT INTO agent_task_queue (agent_id, issue_id, runtime_id, status, created_at)
|
||
VALUES ($1, $2, $3, 'completed', $4) RETURNING id
|
||
`, foreignAgentID, issueID, foreignRuntimeID, usageAt).Scan(&taskID); err != nil {
|
||
t.Fatalf("insert atq: %v", err)
|
||
}
|
||
if _, err := testPool.Exec(ctx, `
|
||
INSERT INTO task_usage (task_id, provider, model, input_tokens, output_tokens, created_at, updated_at)
|
||
VALUES ($1, 'claude', 'm-mismatch-hourly', 333, 33, $2, $2)
|
||
`, taskID, usageAt); err != nil {
|
||
t.Fatalf("insert task_usage: %v", err)
|
||
}
|
||
t.Cleanup(func() {
|
||
testPool.Exec(ctx, `DELETE FROM task_usage_hourly WHERE model = 'm-mismatch-hourly'`)
|
||
testPool.Exec(ctx, `DELETE FROM task_usage_hourly_dirty WHERE model = 'm-mismatch-hourly'`)
|
||
})
|
||
|
||
if _, err := testPool.Exec(ctx, `
|
||
SELECT rollup_task_usage_hourly_window('-infinity'::timestamptz, 'infinity'::timestamptz)
|
||
`); err != nil {
|
||
t.Fatalf("rollup: %v", err)
|
||
}
|
||
|
||
wsTotal := func(ws string) int64 {
|
||
var total int64
|
||
testPool.QueryRow(ctx, `
|
||
SELECT COALESCE(SUM(input_tokens), 0) FROM task_usage_hourly
|
||
WHERE workspace_id = $1 AND model = 'm-mismatch-hourly'
|
||
`, ws).Scan(&total)
|
||
return total
|
||
}
|
||
if got := wsTotal(foreignWorkspaceID); got != 333 {
|
||
t.Fatalf("expected foreign workspace bucket = 333 (resolved from agent), got %d", got)
|
||
}
|
||
if got := wsTotal(testWorkspaceID); got != 0 {
|
||
t.Errorf("expected primary workspace bucket = 0 (issue.workspace_id must not leak), got %d", got)
|
||
}
|
||
}
|
||
|
||
// TestDashboardRollupReattributesOnProjectChange verifies the trigger that
|
||
// fires on `UPDATE issue SET project_id` enqueues both old + new project
|
||
// buckets so the next rollup tick re-attributes the affected tokens.
|
||
// Uses the rollup window function directly to drain the dirty queue,
|
||
// then asserts the rollup table reflects the new project_id.
|
||
func TestDashboardRollupReattributesOnProjectChange(t *testing.T) {
|
||
if testHandler == nil {
|
||
t.Skip("database not available")
|
||
}
|
||
ctx := context.Background()
|
||
|
||
var runtimeID, agentID string
|
||
if err := testPool.QueryRow(ctx, `SELECT id FROM agent_runtime WHERE workspace_id = $1 LIMIT 1`, testWorkspaceID).Scan(&runtimeID); err != nil {
|
||
t.Fatalf("fetch runtime: %v", err)
|
||
}
|
||
if err := testPool.QueryRow(ctx, `SELECT id FROM agent WHERE workspace_id = $1 LIMIT 1`, testWorkspaceID).Scan(&agentID); err != nil {
|
||
t.Fatalf("fetch agent: %v", err)
|
||
}
|
||
|
||
mkProject := func(name string) string {
|
||
var id string
|
||
if err := testPool.QueryRow(ctx, `
|
||
INSERT INTO project (workspace_id, title) VALUES ($1, $2) RETURNING id
|
||
`, testWorkspaceID, name).Scan(&id); err != nil {
|
||
t.Fatalf("create project: %v", err)
|
||
}
|
||
t.Cleanup(func() { testPool.Exec(ctx, `DELETE FROM project WHERE id = $1`, id) })
|
||
return id
|
||
}
|
||
projectA := mkProject("dashboard reattr A")
|
||
projectB := mkProject("dashboard reattr B")
|
||
|
||
var issueID string
|
||
if err := testPool.QueryRow(ctx, `
|
||
INSERT INTO issue (workspace_id, title, creator_id, creator_type, project_id, number)
|
||
VALUES ($1, 'reattr issue', $2, 'member', $3,
|
||
(SELECT COALESCE(MAX(number), 0) + 1 FROM issue WHERE workspace_id = $1))
|
||
RETURNING id
|
||
`, testWorkspaceID, testUserID, projectA).Scan(&issueID); err != nil {
|
||
t.Fatalf("create issue: %v", err)
|
||
}
|
||
t.Cleanup(func() { testPool.Exec(ctx, `DELETE FROM issue WHERE id = $1`, issueID) })
|
||
|
||
var taskID string
|
||
if err := testPool.QueryRow(ctx, `
|
||
INSERT INTO agent_task_queue (agent_id, issue_id, runtime_id, status, created_at)
|
||
VALUES ($1, $2, $3, 'completed', now()) RETURNING id
|
||
`, agentID, issueID, runtimeID).Scan(&taskID); err != nil {
|
||
t.Fatalf("insert task: %v", err)
|
||
}
|
||
t.Cleanup(func() { testPool.Exec(ctx, `DELETE FROM agent_task_queue WHERE id = $1`, taskID) })
|
||
|
||
if _, err := testPool.Exec(ctx, `
|
||
INSERT INTO task_usage (task_id, provider, model, input_tokens, output_tokens, created_at)
|
||
VALUES ($1, 'claude', 'claude-3-5-sonnet', 7777, 0, now())
|
||
`, taskID); err != nil {
|
||
t.Fatalf("insert task_usage: %v", err)
|
||
}
|
||
|
||
// First rollup pass: tokens attributed to project A.
|
||
if _, err := testPool.Exec(ctx, `
|
||
SELECT rollup_task_usage_hourly_window('1970-01-01'::timestamptz, now() + interval '1 hour')
|
||
`); err != nil {
|
||
t.Fatalf("rollup A: %v", err)
|
||
}
|
||
var aTokens int64
|
||
if err := testPool.QueryRow(ctx, `
|
||
SELECT COALESCE(SUM(input_tokens), 0) FROM task_usage_hourly
|
||
WHERE workspace_id = $1 AND project_id = $2 AND agent_id = $3
|
||
`, testWorkspaceID, projectA, agentID).Scan(&aTokens); err != nil {
|
||
t.Fatalf("read A rollup: %v", err)
|
||
}
|
||
if aTokens < 7777 {
|
||
t.Fatalf("project A: expected >=7777 tokens after first rollup, got %d", aTokens)
|
||
}
|
||
|
||
// Move the issue to project B. Trigger enqueues both A and B buckets.
|
||
if _, err := testPool.Exec(ctx, `UPDATE issue SET project_id = $1 WHERE id = $2`, projectB, issueID); err != nil {
|
||
t.Fatalf("reassign project: %v", err)
|
||
}
|
||
// Second rollup pass: A bucket drops to zero (deleted_empty), B
|
||
// bucket gets the tokens.
|
||
if _, err := testPool.Exec(ctx, `
|
||
SELECT rollup_task_usage_hourly_window('1970-01-01'::timestamptz, now() + interval '1 hour')
|
||
`); err != nil {
|
||
t.Fatalf("rollup B: %v", err)
|
||
}
|
||
|
||
var bTokens, aTokensAfter int64
|
||
if err := testPool.QueryRow(ctx, `
|
||
SELECT COALESCE(SUM(input_tokens), 0) FROM task_usage_hourly
|
||
WHERE workspace_id = $1 AND project_id = $2 AND agent_id = $3
|
||
`, testWorkspaceID, projectB, agentID).Scan(&bTokens); err != nil {
|
||
t.Fatalf("read B rollup: %v", err)
|
||
}
|
||
if err := testPool.QueryRow(ctx, `
|
||
SELECT COALESCE(SUM(input_tokens), 0) FROM task_usage_hourly
|
||
WHERE workspace_id = $1 AND project_id = $2 AND agent_id = $3
|
||
`, testWorkspaceID, projectA, agentID).Scan(&aTokensAfter); err != nil {
|
||
t.Fatalf("read A rollup after move: %v", err)
|
||
}
|
||
if bTokens < 7777 {
|
||
t.Errorf("project B: expected >=7777 tokens after reassign + rollup, got %d", bTokens)
|
||
}
|
||
if aTokensAfter != 0 {
|
||
t.Errorf("project A: expected 0 tokens after reassign + rollup, got %d", aTokensAfter)
|
||
}
|
||
}
|
||
|
||
// TestDashboardRollupClearsOnIssueDelete verifies that deleting an issue
|
||
// (which cascades to its tasks and task_usage rows) also clears the
|
||
// dashboard rollup row attributed to that issue's project. The
|
||
// `issue BEFORE DELETE` trigger has to fire ahead of the cascade so the
|
||
// dirty queue captures the original project_id while the issue row is
|
||
// still readable.
|
||
func TestDashboardRollupClearsOnIssueDelete(t *testing.T) {
|
||
if testHandler == nil {
|
||
t.Skip("database not available")
|
||
}
|
||
ctx := context.Background()
|
||
|
||
var runtimeID, agentID string
|
||
if err := testPool.QueryRow(ctx, `SELECT id FROM agent_runtime WHERE workspace_id = $1 LIMIT 1`, testWorkspaceID).Scan(&runtimeID); err != nil {
|
||
t.Fatalf("fetch runtime: %v", err)
|
||
}
|
||
if err := testPool.QueryRow(ctx, `SELECT id FROM agent WHERE workspace_id = $1 LIMIT 1`, testWorkspaceID).Scan(&agentID); err != nil {
|
||
t.Fatalf("fetch agent: %v", err)
|
||
}
|
||
|
||
var projectID string
|
||
if err := testPool.QueryRow(ctx, `
|
||
INSERT INTO project (workspace_id, title) VALUES ($1, 'dashboard cascade test') RETURNING id
|
||
`, testWorkspaceID).Scan(&projectID); err != nil {
|
||
t.Fatalf("create project: %v", err)
|
||
}
|
||
t.Cleanup(func() { testPool.Exec(ctx, `DELETE FROM project WHERE id = $1`, projectID) })
|
||
|
||
var issueID string
|
||
if err := testPool.QueryRow(ctx, `
|
||
INSERT INTO issue (workspace_id, title, creator_id, creator_type, project_id, number)
|
||
VALUES ($1, 'cascade issue', $2, 'member', $3,
|
||
(SELECT COALESCE(MAX(number), 0) + 1 FROM issue WHERE workspace_id = $1))
|
||
RETURNING id
|
||
`, testWorkspaceID, testUserID, projectID).Scan(&issueID); err != nil {
|
||
t.Fatalf("create issue: %v", err)
|
||
}
|
||
// No t.Cleanup deleting the issue — that's what the test exercises.
|
||
|
||
var taskID string
|
||
if err := testPool.QueryRow(ctx, `
|
||
INSERT INTO agent_task_queue (agent_id, issue_id, runtime_id, status, created_at)
|
||
VALUES ($1, $2, $3, 'completed', now()) RETURNING id
|
||
`, agentID, issueID, runtimeID).Scan(&taskID); err != nil {
|
||
t.Fatalf("insert task: %v", err)
|
||
}
|
||
// Don't bother cleaning up taskID either; cascade will take it.
|
||
|
||
if _, err := testPool.Exec(ctx, `
|
||
INSERT INTO task_usage (task_id, provider, model, input_tokens, output_tokens, created_at)
|
||
VALUES ($1, 'claude', 'claude-3-5-sonnet', 4242, 0, now())
|
||
`, taskID); err != nil {
|
||
t.Fatalf("insert task_usage: %v", err)
|
||
}
|
||
|
||
// First rollup: project bucket exists with 4242 tokens.
|
||
if _, err := testPool.Exec(ctx, `
|
||
SELECT rollup_task_usage_hourly_window('1970-01-01'::timestamptz, now() + interval '1 hour')
|
||
`); err != nil {
|
||
t.Fatalf("rollup before delete: %v", err)
|
||
}
|
||
var before int64
|
||
if err := testPool.QueryRow(ctx, `
|
||
SELECT COALESCE(SUM(input_tokens), 0) FROM task_usage_hourly
|
||
WHERE workspace_id = $1 AND project_id = $2
|
||
`, testWorkspaceID, projectID).Scan(&before); err != nil {
|
||
t.Fatalf("read before: %v", err)
|
||
}
|
||
if before < 4242 {
|
||
t.Fatalf("project bucket: expected >=4242 tokens before delete, got %d", before)
|
||
}
|
||
|
||
// Delete the issue. Cascade removes atq + task_usage. The issue
|
||
// BEFORE DELETE trigger should have enqueued the project bucket
|
||
// before the cascade started.
|
||
if _, err := testPool.Exec(ctx, `DELETE FROM issue WHERE id = $1`, issueID); err != nil {
|
||
t.Fatalf("delete issue: %v", err)
|
||
}
|
||
|
||
if _, err := testPool.Exec(ctx, `
|
||
SELECT rollup_task_usage_hourly_window('1970-01-01'::timestamptz, now() + interval '1 hour')
|
||
`); err != nil {
|
||
t.Fatalf("rollup after delete: %v", err)
|
||
}
|
||
var after int64
|
||
if err := testPool.QueryRow(ctx, `
|
||
SELECT COALESCE(SUM(input_tokens), 0) FROM task_usage_hourly
|
||
WHERE workspace_id = $1 AND project_id = $2
|
||
`, testWorkspaceID, projectID).Scan(&after); err != nil {
|
||
t.Fatalf("read after: %v", err)
|
||
}
|
||
if after != 0 {
|
||
t.Errorf("project bucket: expected 0 tokens after issue delete, got %d", after)
|
||
}
|
||
}
|
||
|
||
// TestDashboardRollupReattributesOnLinkTaskToIssue verifies that
|
||
// `LinkTaskToIssue` (which UPDATEs `agent_task_queue.issue_id` from NULL
|
||
// to a real issue id) re-attributes existing rollup rows from the
|
||
// no-project bucket to the linked issue's project bucket. Mirrors the
|
||
// quick-create flow in `service.task.LinkTaskToIssue`.
|
||
func TestDashboardRollupReattributesOnLinkTaskToIssue(t *testing.T) {
|
||
if testHandler == nil {
|
||
t.Skip("database not available")
|
||
}
|
||
ctx := context.Background()
|
||
|
||
var runtimeID, agentID string
|
||
if err := testPool.QueryRow(ctx, `SELECT id FROM agent_runtime WHERE workspace_id = $1 LIMIT 1`, testWorkspaceID).Scan(&runtimeID); err != nil {
|
||
t.Fatalf("fetch runtime: %v", err)
|
||
}
|
||
if err := testPool.QueryRow(ctx, `SELECT id FROM agent WHERE workspace_id = $1 LIMIT 1`, testWorkspaceID).Scan(&agentID); err != nil {
|
||
t.Fatalf("fetch agent: %v", err)
|
||
}
|
||
|
||
// Quick-create task: issue_id is NULL at creation time.
|
||
var taskID string
|
||
if err := testPool.QueryRow(ctx, `
|
||
INSERT INTO agent_task_queue (agent_id, issue_id, runtime_id, status, context, created_at)
|
||
VALUES ($1, NULL, $2, 'completed', '{}'::jsonb, now()) RETURNING id
|
||
`, agentID, runtimeID).Scan(&taskID); err != nil {
|
||
t.Fatalf("insert quick-create task: %v", err)
|
||
}
|
||
t.Cleanup(func() { testPool.Exec(ctx, `DELETE FROM agent_task_queue WHERE id = $1`, taskID) })
|
||
|
||
if _, err := testPool.Exec(ctx, `
|
||
INSERT INTO task_usage (task_id, provider, model, input_tokens, output_tokens, created_at)
|
||
VALUES ($1, 'claude', 'claude-3-5-sonnet', 1234, 0, now())
|
||
`, taskID); err != nil {
|
||
t.Fatalf("insert task_usage: %v", err)
|
||
}
|
||
|
||
// First rollup: tokens attributed to the no-project bucket (NULL).
|
||
if _, err := testPool.Exec(ctx, `
|
||
SELECT rollup_task_usage_hourly_window('1970-01-01'::timestamptz, now() + interval '1 hour')
|
||
`); err != nil {
|
||
t.Fatalf("rollup pre-link: %v", err)
|
||
}
|
||
var nullBefore int64
|
||
if err := testPool.QueryRow(ctx, `
|
||
SELECT COALESCE(SUM(input_tokens), 0) FROM task_usage_hourly
|
||
WHERE workspace_id = $1 AND project_id IS NULL AND agent_id = $2
|
||
`, testWorkspaceID, agentID).Scan(&nullBefore); err != nil {
|
||
t.Fatalf("read NULL bucket pre-link: %v", err)
|
||
}
|
||
if nullBefore < 1234 {
|
||
t.Fatalf("NULL bucket: expected >=1234 tokens pre-link, got %d", nullBefore)
|
||
}
|
||
|
||
// Create a project + issue, then run the same UPDATE LinkTaskToIssue
|
||
// uses. The atq trigger should enqueue OLD (NULL project) AND NEW
|
||
// (the project's id) so the next rollup tick zeroes the NULL bucket
|
||
// and populates the project bucket.
|
||
var projectID string
|
||
if err := testPool.QueryRow(ctx, `
|
||
INSERT INTO project (workspace_id, title) VALUES ($1, 'dashboard link test') RETURNING id
|
||
`, testWorkspaceID).Scan(&projectID); err != nil {
|
||
t.Fatalf("create project: %v", err)
|
||
}
|
||
t.Cleanup(func() { testPool.Exec(ctx, `DELETE FROM project WHERE id = $1`, projectID) })
|
||
|
||
var issueID string
|
||
if err := testPool.QueryRow(ctx, `
|
||
INSERT INTO issue (workspace_id, title, creator_id, creator_type, project_id, number)
|
||
VALUES ($1, 'link test issue', $2, 'member', $3,
|
||
(SELECT COALESCE(MAX(number), 0) + 1 FROM issue WHERE workspace_id = $1))
|
||
RETURNING id
|
||
`, testWorkspaceID, testUserID, projectID).Scan(&issueID); err != nil {
|
||
t.Fatalf("create issue: %v", err)
|
||
}
|
||
t.Cleanup(func() { testPool.Exec(ctx, `DELETE FROM issue WHERE id = $1`, issueID) })
|
||
|
||
// Mirror LinkTaskToIssue's UPDATE shape.
|
||
if _, err := testPool.Exec(ctx, `
|
||
UPDATE agent_task_queue SET issue_id = $1 WHERE id = $2 AND issue_id IS NULL
|
||
`, issueID, taskID); err != nil {
|
||
t.Fatalf("link task to issue: %v", err)
|
||
}
|
||
|
||
if _, err := testPool.Exec(ctx, `
|
||
SELECT rollup_task_usage_hourly_window('1970-01-01'::timestamptz, now() + interval '1 hour')
|
||
`); err != nil {
|
||
t.Fatalf("rollup post-link: %v", err)
|
||
}
|
||
|
||
var projectAfter, nullAfter int64
|
||
if err := testPool.QueryRow(ctx, `
|
||
SELECT COALESCE(SUM(input_tokens), 0) FROM task_usage_hourly
|
||
WHERE workspace_id = $1 AND project_id = $2 AND agent_id = $3
|
||
`, testWorkspaceID, projectID, agentID).Scan(&projectAfter); err != nil {
|
||
t.Fatalf("read project bucket post-link: %v", err)
|
||
}
|
||
if err := testPool.QueryRow(ctx, `
|
||
SELECT COALESCE(SUM(input_tokens), 0) FROM task_usage_hourly
|
||
WHERE workspace_id = $1 AND project_id IS NULL AND agent_id = $2
|
||
`, testWorkspaceID, agentID).Scan(&nullAfter); err != nil {
|
||
t.Fatalf("read NULL bucket post-link: %v", err)
|
||
}
|
||
if projectAfter < 1234 {
|
||
t.Errorf("project bucket: expected >=1234 tokens after link, got %d", projectAfter)
|
||
}
|
||
if nullAfter != 0 {
|
||
t.Errorf("NULL bucket: expected 0 tokens after link, got %d", nullAfter)
|
||
}
|
||
}
|
||
|
||
// TestPruneTaskUsageHourlyDirty covers the dirty-queue TTL. Both the RFC
|
||
// (§7.1) and the rollup-pipeline migration call this THE most-easily-missed correctness
|
||
// requirement of the hourly pipeline: without the prune, a row that escapes
|
||
// the per-tick drain (crash mid-tick, worker paused during an incident)
|
||
// pins its bucket's recompute forever and the queue grows unbounded.
|
||
func TestPruneTaskUsageHourlyDirty(t *testing.T) {
|
||
if testHandler == nil {
|
||
t.Skip("database not available")
|
||
}
|
||
ctx := context.Background()
|
||
|
||
// task_usage_hourly_dirty carries no FKs (it is a queue), so synthetic
|
||
// UUIDs are fine. `provider` tags the rows for isolated cleanup.
|
||
const tag = "ttl-prune-test"
|
||
t.Cleanup(func() {
|
||
testPool.Exec(ctx, `DELETE FROM task_usage_hourly_dirty WHERE provider = $1`, tag)
|
||
})
|
||
seed := func(model, age string) {
|
||
if _, err := testPool.Exec(ctx, `
|
||
INSERT INTO task_usage_hourly_dirty (
|
||
bucket_hour, workspace_id, runtime_id, agent_id, project_id,
|
||
provider, model, enqueued_at
|
||
)
|
||
VALUES (
|
||
date_trunc('hour', now()), gen_random_uuid(), gen_random_uuid(),
|
||
gen_random_uuid(), NULL, $1, $2, now() - $3::interval
|
||
)
|
||
`, tag, model, age); err != nil {
|
||
t.Fatalf("seed dirty row %s: %v", model, err)
|
||
}
|
||
}
|
||
countModel := func(model string) int {
|
||
var n int
|
||
testPool.QueryRow(ctx,
|
||
`SELECT COUNT(*) FROM task_usage_hourly_dirty WHERE provider = $1 AND model = $2`,
|
||
tag, model,
|
||
).Scan(&n)
|
||
return n
|
||
}
|
||
|
||
seed("stale-row", "8 days")
|
||
seed("fresh-row", "1 day")
|
||
|
||
// Default 7-day retention: the 8-day row goes, the 1-day row stays.
|
||
var pruned int64
|
||
if err := testPool.QueryRow(ctx, `SELECT prune_task_usage_hourly_dirty()`).Scan(&pruned); err != nil {
|
||
t.Fatalf("prune (default retention): %v", err)
|
||
}
|
||
if pruned < 1 {
|
||
t.Errorf("expected prune to report at least the one stale row deleted, got %d", pruned)
|
||
}
|
||
if got := countModel("stale-row"); got != 0 {
|
||
t.Errorf("default prune: expected stale row deleted, still %d present", got)
|
||
}
|
||
if got := countModel("fresh-row"); got != 1 {
|
||
t.Errorf("default prune: expected fresh row kept, got %d", got)
|
||
}
|
||
|
||
// An explicit retention shorter than the surviving row's age drops it.
|
||
if _, err := testPool.Exec(ctx, `SELECT prune_task_usage_hourly_dirty(interval '12 hours')`); err != nil {
|
||
t.Fatalf("prune (12h retention): %v", err)
|
||
}
|
||
if got := countModel("fresh-row"); got != 0 {
|
||
t.Errorf("12h-retention prune: expected fresh row deleted, still %d present", got)
|
||
}
|
||
|
||
// The cron entry folds the prune in so operators do
|
||
// not need a second scheduled job. A single tick must drop a stale row.
|
||
seed("cron-fold-row", "9 days")
|
||
if _, err := testPool.Exec(ctx, `SELECT rollup_task_usage_hourly()`); err != nil {
|
||
t.Fatalf("rollup_task_usage_hourly: %v", err)
|
||
}
|
||
if got := countModel("cron-fold-row"); got != 0 {
|
||
t.Errorf("cron entry did not fold in the prune: stale row still present (%d)", got)
|
||
}
|
||
}
|
||
|
||
// TestRollupTaskUsageHourlyCapsWindowAtOneDay covers the catch-up cap
|
||
// in rollup_task_usage_hourly(): when the watermark has
|
||
// fallen far behind (worker paused for an incident or a migration freeze),
|
||
// a single tick must advance it by at most one day, so a multi-week backlog
|
||
// drains in bounded steps instead of one giant statement holding advisory
|
||
// lock 4246. The existing watermark test only parks the watermark one hour
|
||
// back, so the cap itself is never exercised there.
|
||
func TestRollupTaskUsageHourlyCapsWindowAtOneDay(t *testing.T) {
|
||
if testHandler == nil {
|
||
t.Skip("database not available")
|
||
}
|
||
ctx := context.Background()
|
||
|
||
// Other tests drive rollup_task_usage_hourly_window directly and never
|
||
// read the watermark; the idempotency test parks it itself. Restore to
|
||
// now() so nothing downstream observes a stale value.
|
||
t.Cleanup(func() {
|
||
testPool.Exec(ctx,
|
||
`UPDATE task_usage_hourly_rollup_state SET watermark_at = now(), last_error = NULL WHERE id = 1`)
|
||
})
|
||
|
||
park := func(behind string) {
|
||
if _, err := testPool.Exec(ctx, `
|
||
UPDATE task_usage_hourly_rollup_state
|
||
SET watermark_at = now() - $1::interval, last_error = NULL
|
||
WHERE id = 1
|
||
`, behind); err != nil {
|
||
t.Fatalf("park watermark: %v", err)
|
||
}
|
||
}
|
||
ageDays := func() float64 {
|
||
var sec float64
|
||
if err := testPool.QueryRow(ctx, `
|
||
SELECT EXTRACT(EPOCH FROM (now() - watermark_at))
|
||
FROM task_usage_hourly_rollup_state WHERE id = 1
|
||
`).Scan(&sec); err != nil {
|
||
t.Fatalf("read watermark: %v", err)
|
||
}
|
||
return sec / 86400
|
||
}
|
||
tick := func(label string) {
|
||
if _, err := testPool.Exec(ctx, `SELECT rollup_task_usage_hourly()`); err != nil {
|
||
t.Fatalf("%s: %v", label, err)
|
||
}
|
||
}
|
||
|
||
// Park 3 days back. One tick advances by exactly one day (v_from + 1d,
|
||
// well short of now()-5min), leaving the watermark ~2 days behind.
|
||
park("3 days")
|
||
tick("tick 1")
|
||
if age := ageDays(); age < 1.9 || age > 2.1 {
|
||
t.Fatalf("after one tick: expected watermark ~2 days behind, got %.3f days", age)
|
||
}
|
||
|
||
// A second tick advances another bounded day → ~1 day behind.
|
||
tick("tick 2")
|
||
if age := ageDays(); age < 0.9 || age > 1.1 {
|
||
t.Fatalf("after two ticks: expected watermark ~1 day behind, got %.3f days", age)
|
||
}
|
||
|
||
// Once within a day of now, the tick snaps the watermark to now()-5min
|
||
// (LEAST picks the now bound) rather than taking a further fixed day.
|
||
tick("tick 3")
|
||
if age := ageDays(); age > 0.02 {
|
||
t.Fatalf("after catch-up: expected watermark within minutes of now, got %.3f days", age)
|
||
}
|
||
}
|
||
|
||
// TestDashboardUsageDailyCrossMidnightFullPipeline runs the WHOLE timezone
|
||
// pipeline end to end: insert a raw `task_usage` row near UTC midnight →
|
||
// run `rollup_task_usage_hourly_window` to bucket it → call
|
||
// GetDashboardUsageDaily with a non-UTC viewer tz. It asserts the tokens
|
||
// land on the viewer's correct calendar day and NOT on the UTC day.
|
||
//
|
||
// This is the #2822 bug class the RFC exists to prevent. The existing
|
||
// TestDashboardUsageDailyBucketsByViewerTimezone seeds a pre-built
|
||
// task_usage_hourly row and only exercises the SQL read path; here the
|
||
// row travels from raw task_usage through the rollup, so a regression in
|
||
// task_usage_hour_bucket or the recompute join is also caught.
|
||
func TestDashboardUsageDailyCrossMidnightFullPipeline(t *testing.T) {
|
||
if testHandler == nil {
|
||
t.Skip("database not available")
|
||
}
|
||
ctx := context.Background()
|
||
|
||
var runtimeID, agentID string
|
||
if err := testPool.QueryRow(ctx, `SELECT id FROM agent_runtime WHERE workspace_id = $1 LIMIT 1`, testWorkspaceID).Scan(&runtimeID); err != nil {
|
||
t.Fatalf("fetch runtime: %v", err)
|
||
}
|
||
if err := testPool.QueryRow(ctx, `SELECT id FROM agent WHERE workspace_id = $1 LIMIT 1`, testWorkspaceID).Scan(&agentID); err != nil {
|
||
t.Fatalf("fetch agent: %v", err)
|
||
}
|
||
|
||
var issueID string
|
||
if err := testPool.QueryRow(ctx, `
|
||
INSERT INTO issue (workspace_id, title, creator_id, creator_type, number)
|
||
VALUES ($1, 'cross-midnight pipeline test', $2, 'member',
|
||
(SELECT COALESCE(MAX(number), 0) + 1 FROM issue WHERE workspace_id = $1))
|
||
RETURNING id
|
||
`, testWorkspaceID, testUserID).Scan(&issueID); err != nil {
|
||
t.Fatalf("create issue: %v", err)
|
||
}
|
||
t.Cleanup(func() { testPool.Exec(ctx, `DELETE FROM issue WHERE id = $1`, issueID) })
|
||
|
||
var taskID string
|
||
if err := testPool.QueryRow(ctx, `
|
||
INSERT INTO agent_task_queue (agent_id, issue_id, runtime_id, status, created_at)
|
||
VALUES ($1, $2, $3, 'completed', now()) RETURNING id
|
||
`, agentID, issueID, runtimeID).Scan(&taskID); err != nil {
|
||
t.Fatalf("insert task: %v", err)
|
||
}
|
||
t.Cleanup(func() { testPool.Exec(ctx, `DELETE FROM agent_task_queue WHERE id = $1`, taskID) })
|
||
|
||
// Raw task_usage at 00:30 UTC two days ago — genuinely near UTC
|
||
// midnight. 00:30 UTC is still the PRIOR evening (~16:30/17:30) in
|
||
// America/Los_Angeles (UTC-7/-8), so the UTC viewer and the LA viewer
|
||
// must see this row under different calendar days. Using CURRENT_DATE
|
||
// keeps the row inside the days=10 window without a fixed-date drift.
|
||
var usageAt time.Time
|
||
if err := testPool.QueryRow(ctx, `
|
||
INSERT INTO task_usage (task_id, provider, model, input_tokens, output_tokens, created_at)
|
||
VALUES (
|
||
$1, 'claude', 'cross-midnight-model', 8888, 0,
|
||
((CURRENT_DATE - 2)::timestamp + interval '30 minutes') AT TIME ZONE 'UTC'
|
||
)
|
||
RETURNING created_at
|
||
`, taskID).Scan(&usageAt); err != nil {
|
||
t.Fatalf("insert task_usage: %v", err)
|
||
}
|
||
t.Cleanup(func() {
|
||
testPool.Exec(ctx, `DELETE FROM task_usage_hourly WHERE model = 'cross-midnight-model'`)
|
||
})
|
||
|
||
// Run the rollup so the raw row is aggregated into task_usage_hourly.
|
||
if _, err := testPool.Exec(ctx, `
|
||
SELECT rollup_task_usage_hourly_window('1970-01-01'::timestamptz, now() + interval '1 hour')
|
||
`); err != nil {
|
||
t.Fatalf("rollup window: %v", err)
|
||
}
|
||
|
||
utcDate := usageAt.UTC().Format("2006-01-02")
|
||
laLoc, err := time.LoadLocation("America/Los_Angeles")
|
||
if err != nil {
|
||
t.Fatalf("load LA location: %v", err)
|
||
}
|
||
laDate := usageAt.In(laLoc).Format("2006-01-02")
|
||
if utcDate == laDate {
|
||
t.Fatalf("test setup: UTC and LA dates must differ, both %s", utcDate)
|
||
}
|
||
|
||
readDate := func(tz string) string {
|
||
w := httptest.NewRecorder()
|
||
testHandler.GetDashboardUsageDaily(w, newRequest("GET", "/api/dashboard/usage/daily?days=10&tz="+tz, nil))
|
||
if w.Code != http.StatusOK {
|
||
t.Fatalf("tz=%s: expected 200, got %d: %s", tz, w.Code, w.Body.String())
|
||
}
|
||
var rows []struct {
|
||
Date string `json:"date"`
|
||
Model string `json:"model"`
|
||
InputTokens int64 `json:"input_tokens"`
|
||
}
|
||
_ = json.NewDecoder(w.Body).Decode(&rows)
|
||
for _, r := range rows {
|
||
if r.Model == "cross-midnight-model" {
|
||
if r.InputTokens != 8888 {
|
||
t.Errorf("tz=%s: expected 8888 tokens, got %d", tz, r.InputTokens)
|
||
}
|
||
return r.Date
|
||
}
|
||
}
|
||
t.Fatalf("tz=%s: cross-midnight-model row not found in %v", tz, rows)
|
||
return ""
|
||
}
|
||
|
||
if got := readDate("UTC"); got != utcDate {
|
||
t.Errorf("UTC viewer: expected date %s, got %s", utcDate, got)
|
||
}
|
||
if got := readDate("America/Los_Angeles"); got != laDate {
|
||
t.Errorf("LA viewer: expected date %s, got %s; row must NOT land on the UTC day %s",
|
||
laDate, got, utcDate)
|
||
}
|
||
}
|
||
|
||
// TestRollupTaskUsageHourlyConvergesOnTaskUsageDelete covers the
|
||
// `trg_tu_dirty_hourly` trigger — a BEFORE DELETE trigger on task_usage.
|
||
// Migration 102 notes it has no production callers today and exists purely
|
||
// as defensive convergence guard, so a single minimal test is enough:
|
||
// seed a task_usage row, roll it up, DELETE the task_usage row directly,
|
||
// roll up again, and assert the hourly bucket is recomputed down to zero.
|
||
// Without the trigger the deleted row's bucket would never be re-enqueued.
|
||
func TestRollupTaskUsageHourlyConvergesOnTaskUsageDelete(t *testing.T) {
|
||
if testHandler == nil {
|
||
t.Skip("database not available")
|
||
}
|
||
ctx := context.Background()
|
||
|
||
var runtimeID, agentID string
|
||
if err := testPool.QueryRow(ctx, `SELECT id FROM agent_runtime WHERE workspace_id = $1 LIMIT 1`, testWorkspaceID).Scan(&runtimeID); err != nil {
|
||
t.Fatalf("fetch runtime: %v", err)
|
||
}
|
||
if err := testPool.QueryRow(ctx, `SELECT id FROM agent WHERE workspace_id = $1 LIMIT 1`, testWorkspaceID).Scan(&agentID); err != nil {
|
||
t.Fatalf("fetch agent: %v", err)
|
||
}
|
||
|
||
var issueID string
|
||
if err := testPool.QueryRow(ctx, `
|
||
INSERT INTO issue (workspace_id, title, creator_id, creator_type, number)
|
||
VALUES ($1, 'tu-delete trigger test', $2, 'member',
|
||
(SELECT COALESCE(MAX(number), 0) + 1 FROM issue WHERE workspace_id = $1))
|
||
RETURNING id
|
||
`, testWorkspaceID, testUserID).Scan(&issueID); err != nil {
|
||
t.Fatalf("create issue: %v", err)
|
||
}
|
||
t.Cleanup(func() { testPool.Exec(ctx, `DELETE FROM issue WHERE id = $1`, issueID) })
|
||
|
||
var taskID string
|
||
if err := testPool.QueryRow(ctx, `
|
||
INSERT INTO agent_task_queue (agent_id, issue_id, runtime_id, status, created_at)
|
||
VALUES ($1, $2, $3, 'completed', now() - interval '30 minutes') RETURNING id
|
||
`, agentID, issueID, runtimeID).Scan(&taskID); err != nil {
|
||
t.Fatalf("insert task: %v", err)
|
||
}
|
||
t.Cleanup(func() { testPool.Exec(ctx, `DELETE FROM agent_task_queue WHERE id = $1`, taskID) })
|
||
|
||
var usageID string
|
||
if err := testPool.QueryRow(ctx, `
|
||
INSERT INTO task_usage (task_id, provider, model, input_tokens, output_tokens, created_at)
|
||
VALUES ($1, 'claude', 'tu-delete-model', 5050, 0, now() - interval '30 minutes')
|
||
RETURNING id
|
||
`, taskID).Scan(&usageID); err != nil {
|
||
t.Fatalf("insert task_usage: %v", err)
|
||
}
|
||
t.Cleanup(func() {
|
||
testPool.Exec(ctx, `DELETE FROM task_usage_hourly WHERE model = 'tu-delete-model'`)
|
||
testPool.Exec(ctx, `DELETE FROM task_usage_hourly_dirty WHERE model = 'tu-delete-model'`)
|
||
})
|
||
|
||
bucketTotal := func() int64 {
|
||
var total int64
|
||
testPool.QueryRow(ctx, `
|
||
SELECT COALESCE(SUM(input_tokens), 0) FROM task_usage_hourly
|
||
WHERE runtime_id = $1 AND model = 'tu-delete-model'
|
||
`, runtimeID).Scan(&total)
|
||
return total
|
||
}
|
||
runWindow := func(label string) {
|
||
if _, err := testPool.Exec(ctx, `
|
||
SELECT rollup_task_usage_hourly_window('1970-01-01'::timestamptz, now() + interval '1 hour')
|
||
`); err != nil {
|
||
t.Fatalf("%s: %v", label, err)
|
||
}
|
||
}
|
||
|
||
runWindow("initial rollup")
|
||
if got := bucketTotal(); got != 5050 {
|
||
t.Fatalf("initial: expected bucket = 5050, got %d", got)
|
||
}
|
||
|
||
// Delete the task_usage row directly — fires trg_tu_dirty_hourly,
|
||
// which enqueues the bucket on task_usage_hourly_dirty.
|
||
if _, err := testPool.Exec(ctx, `DELETE FROM task_usage WHERE id = $1`, usageID); err != nil {
|
||
t.Fatalf("delete task_usage: %v", err)
|
||
}
|
||
var dirtyCount int
|
||
testPool.QueryRow(ctx, `SELECT COUNT(*) FROM task_usage_hourly_dirty WHERE model = 'tu-delete-model'`).Scan(&dirtyCount)
|
||
if dirtyCount != 1 {
|
||
t.Fatalf("expected 1 dirty entry from task_usage DELETE trigger, got %d", dirtyCount)
|
||
}
|
||
|
||
runWindow("rollup after delete")
|
||
if got := bucketTotal(); got != 0 {
|
||
t.Errorf("after delete: expected bucket recomputed to 0, got %d", got)
|
||
}
|
||
}
|