Files
multica/server/cmd/backfill_task_usage_hourly/main.go
YYClaw 614dfae884 MUL-2488 feat(timezone): Scheduling / Viewing two-layer timezone architecture (#2968)
* docs(timezone): add scheduling/viewing timezone architecture RFC

* feat(db): replace daily rollups with task_usage_hourly, add user.timezone

Migrations 100-104: add "user".timezone (Viewing tz), build the UTC
hourly task_usage_hourly rollup with its pipeline, drop the legacy
task_usage_daily / task_usage_dashboard_daily pipelines, and drop the
agent_runtime.timezone column. Report queries now slice day boundaries
at read time by the caller-supplied @tz instead of materialising in a
fixed tz. Regenerate sqlc.

* feat(server): add task_usage_hourly backfill command

Replace the two legacy backfill commands (daily / dashboard_daily) with
a single backfill_task_usage_hourly that loads historical task_usage
into the new UTC hourly rollup, sliced per workspace.

* refactor(server): resolve viewing timezone in report handlers

Report handlers resolve the Viewing tz per request (?tz query param,
then user.timezone, then UTC) and pass it to the hourly-rollup queries.
Drop the UseDailyRollup feature flags and the old raw-scan/daily-rollup
dual paths, remove the /api/usage endpoints, and stop the daemon from
reporting and the runtime handler from accepting host timezone.

* refactor(core): switch report queries to viewing timezone

API client and dashboard/runtime queries send ?tz with each report
request, the user schema/types carry the new timezone field, and the
runtime timezone field/mutation is removed.

* feat(views): add viewing timezone preference and UI

Add the useViewingTimezone hook and a Timezone setting in Preferences;
report charts and the dashboard week boundary follow the viewer tz.
Remove the runtime detail timezone editor and its locale strings.

* fix(test): update fixtures and stabilize tests for timezone refactor

The timezone architecture refactor changed several types without
updating dependent test code:

- RuntimeDevice no longer has a timezone field — drop it from the
  create-agent-dialog runtime fixture.
- User now requires a timezone field — add it to the apps/web mockUser
  fixture.
- The PreferencesTab timezone tests asserted on the async save handler
  (PATCH then store update) with a bare expect, racing the mutation's
  settle callback, and timed out querying the Select's ~600-option IANA
  list on a loaded CI runner. Wrap the assertions in waitFor and extend
  the timeout for those three tests.

* docs(timezone): document self-host migration order and trigger invariant

Add a SELF-HOST UPGRADE ORDER runbook to the backfill command's package
comment: applying migrations 100-104 in a single migrate-up drops the
legacy daily rollups before the hourly backfill runs, leaving dashboards
empty until cron catches up.

Add an INVARIANT comment on trg_atq_dirty_hourly noting that agent_id
must be added to the trigger's OF list if it ever becomes mutable,
otherwise dirty buckets for the old agent_id are silently missed.

* style(runtimes): drop trailing blank line in runtime-detail
2026-05-21 15:33:47 +08:00

220 lines
8.5 KiB
Go
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
// Backfill_task_usage_hourly seeds the unified hourly rollup table
// (`task_usage_hourly`) from historical `task_usage`
// rows. Run once after the hourly-pipeline migrations ship, BEFORE registering
// the pg_cron job for rollup_task_usage_hourly().
//
// SELF-HOST UPGRADE ORDER — migrations 100104 are one group, but they
// must NOT be applied in a single `make migrate-up`:
//
// 1. Apply 101+102 (creates task_usage_hourly + installs the triggers).
// 2. Run THIS backfill to seed historical buckets.
// 3. Apply 103+104 (drops the legacy daily rollups + runtime.timezone)
// and register the pg_cron job.
//
// If you run `migrate-up` straight through to 103/104 before this
// backfill, the legacy daily pipelines are gone while task_usage_hourly
// only holds buckets the triggers wrote since 102 — dashboards will show
// empty history until backfill + cron catch up (tens to hundreds of
// ticks on a DB with years of data, given the per-tick 1-day cap).
//
// Mirrors backfill_task_usage_dashboard_daily: walk task_usage's time
// range in monthly slices and call the same idempotent window
// primitive the cron path uses. Then stamp the rollup-state watermark
// so the first cron tick after backfill does not reprocess history.
//
// Re-running is safe — rollup_task_usage_hourly_window is idempotent
// (recomputes each dirty key from raw and REPLACES the bucket), so a
// partially completed backfill can be resumed without TRUNCATEing
// task_usage_hourly first.
//
// Read pressure: each slice scans task_usage / agent_task_queue / agent
// / issue. On a database with years of history that is sustained heavy
// load. Use --sleep-between-slices to throttle on a busy production DB,
// and coordinate a maintenance window with the DB team before a
// full-history run (see docs/timezone-architecture-rfc.md §7.1).
//
// Operator note: this command does NOT call prune_task_usage_hourly_dirty.
// The dirty queue starts empty during backfill (triggers fire only on
// future writes), so there is nothing to prune until the rollup worker
// has been running for a while.
package main
import (
"context"
"flag"
"fmt"
"log/slog"
"os"
"os/signal"
"syscall"
"time"
"github.com/jackc/pgx/v5/pgtype"
"github.com/jackc/pgx/v5/pgxpool"
"github.com/multica-ai/multica/server/internal/logger"
)
func main() {
logger.Init()
if err := run(); err != nil {
slog.Error("backfill failed", "error", err)
os.Exit(1)
}
}
func run() error {
var (
dryRun = flag.Bool("dry-run", false, "log slices that would be processed without touching task_usage_hourly")
monthsBack = flag.Int("months-back", 0, "limit backfill to the last N months (0 = all available history)")
forcePartial = flag.Bool("force-partial", false, "acknowledge that --months-back permanently abandons buckets older than the cutoff (the watermark still advances past them)")
sleep = flag.Duration("sleep-between-slices", 0, "pause this long between monthly slices to throttle source-table read pressure on a busy DB (e.g. 2s)")
)
flag.Parse()
dbURL := os.Getenv("DATABASE_URL")
if dbURL == "" {
dbURL = "postgres://multica:multica@localhost:5432/multica?sslmode=disable"
}
// SIGINT/SIGTERM cancels ctx so an in-flight slice stops cleanly —
// each slice runs in its own transaction (the window function), so
// Postgres rolls back the interrupted one and the idempotent design
// lets a later run resume from where this one stopped.
ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM)
defer stop()
pool, err := pgxpool.New(ctx, dbURL)
if err != nil {
return fmt.Errorf("connect to database: %w", err)
}
defer pool.Close()
if err := pool.Ping(ctx); err != nil {
return fmt.Errorf("ping database: %w", err)
}
// Serialise against the cron rollup and any other backfill run via
// advisory lock 4246 — the same id the cron entry checks with
// pg_try_advisory_lock. While this backfill holds it, the cron tick
// no-ops instead of racing on task_usage_hourly row locks; a second
// concurrent backfill blocks here until this one finishes. The lock
// is held on a dedicated session connection for the whole run.
lockConn, err := pool.Acquire(ctx)
if err != nil {
return fmt.Errorf("acquire advisory-lock connection: %w", err)
}
defer lockConn.Release()
if _, err := lockConn.Exec(ctx, `SELECT pg_advisory_lock(4246)`); err != nil {
return fmt.Errorf("acquire advisory lock 4246: %w", err)
}
defer func() {
// Unlock on a fresh context so a cancelled ctx (SIGINT) does not
// skip the release. Releasing the connection afterwards would end
// the session anyway, but an explicit unlock frees it immediately.
_, _ = lockConn.Exec(context.Background(), `SELECT pg_advisory_unlock(4246)`)
}()
var minTS, maxTS pgtype.Timestamptz
if err := pool.QueryRow(ctx, `SELECT MIN(created_at), MAX(created_at) FROM task_usage`).Scan(&minTS, &maxTS); err != nil {
return fmt.Errorf("scan task_usage time range: %w", err)
}
if !minTS.Valid {
slog.Info("task_usage is empty; nothing to backfill")
if *dryRun {
return nil
}
return stampWatermark(ctx, pool)
}
from := monthFloor(minTS.Time.UTC())
end := monthFloor(maxTS.Time.UTC()).AddDate(0, 1, 0)
if *monthsBack > 0 {
cutoff := monthFloor(time.Now().UTC()).AddDate(0, -(*monthsBack), 0)
// A partial backfill still stamps the watermark at now()-5min, so
// buckets older than the cutoff are abandoned permanently: the cron
// worker will never look back that far. That data loss must be an
// explicit operator decision — require --force-partial to proceed.
if cutoff.After(from) {
if !*forcePartial {
return fmt.Errorf("--months-back=%d would skip buckets before %s (oldest available %s) and the watermark would still advance past them; re-run with --force-partial to accept this, or omit --months-back for a full backfill",
*monthsBack, cutoff.Format(time.RFC3339), minTS.Time.UTC().Format(time.RFC3339))
}
from = cutoff
slog.Warn("partial backfill: --months-back limits coverage; older buckets will be left empty and the watermark will still advance past them",
"months_back", *monthsBack, "effective_from", from.Format(time.RFC3339),
"oldest_available", minTS.Time.UTC().Format(time.RFC3339))
}
}
slog.Info("backfill range", "from", from.Format(time.RFC3339), "to", end.Format(time.RFC3339), "dry_run", *dryRun, "sleep_between_slices", sleep.String())
cursor := from
var totalRows int64
for cursor.Before(end) {
next := cursor.AddDate(0, 1, 0)
if *dryRun {
slog.Info("would roll up slice", "from", cursor.Format(time.RFC3339), "to", next.Format(time.RFC3339))
cursor = next
continue
}
var rows int64
err := pool.QueryRow(
ctx,
`SELECT rollup_task_usage_hourly_window($1::timestamptz, $2::timestamptz)`,
cursor, next,
).Scan(&rows)
if err != nil {
return fmt.Errorf("rollup slice %s..%s: %w", cursor.Format(time.RFC3339), next.Format(time.RFC3339), err)
}
totalRows += rows
slog.Info("rolled up slice", "from", cursor.Format(time.RFC3339), "to", next.Format(time.RFC3339), "rows_touched", rows)
cursor = next
if *sleep > 0 && cursor.Before(end) {
select {
case <-time.After(*sleep):
case <-ctx.Done():
return ctx.Err()
}
}
}
if *dryRun {
slog.Info("dry-run complete; watermark left untouched")
return nil
}
// Stamp on a fresh context so a SIGINT arriving after the slices
// complete cannot skip the watermark UPDATE — losing it would force the
// next run to restart from epoch. Mirrors the deferred advisory-unlock.
if err := stampWatermark(context.Background(), pool); err != nil {
return err
}
slog.Info("backfill complete", "total_rows_touched", totalRows)
return nil
}
// stampWatermark moves the hourly rollup state's watermark to
// `now() - 5 min`, mirroring the cron entry's upper bound. The next
// scheduled tick therefore picks up only events newer than the
// backfill horizon and does not redo work the backfill already did.
func stampWatermark(ctx context.Context, pool *pgxpool.Pool) error {
tag, err := pool.Exec(ctx, `
UPDATE task_usage_hourly_rollup_state
SET watermark_at = now() - INTERVAL '5 minutes'
WHERE id = 1
`)
if err != nil {
return fmt.Errorf("stamp watermark: %w", err)
}
if tag.RowsAffected() == 0 {
slog.Warn("no rollup state row to stamp; was the task_usage_hourly schema migration applied?")
return nil
}
fmt.Println("watermark stamped to now() - 5 minutes")
return nil
}
func monthFloor(t time.Time) time.Time {
return time.Date(t.Year(), t.Month(), 1, 0, 0, 0, 0, time.UTC)
}