Files

372 lines
13 KiB
Go

// Package taskusagebackfill seeds task_usage_hourly from historical
// task_usage rows.
//
// It is used by two callers:
//
// - server/cmd/backfill_task_usage_hourly: explicit operator command,
// used by the SELF-HOST UPGRADE ORDER documented in that file.
// - server/cmd/migrate: invoked as a hook BEFORE migration 103 runs,
// so that operators upgrading directly from v0.3.4 (or any version
// prior to the hourly pipeline) do not trip migration 103's
// fail-closed watermark guard while the server is still down. The
// hook can run a full idempotent backfill in the same `migrate up`
// invocation and then continue applying 103/104.
//
// The implementation uses the same SQL window primitive
// (`rollup_task_usage_hourly_window`) that the rollup worker uses, so
// re-running is safe — partial progress is recovered on the next call.
//
// All callers MUST hold advisory lock 4246 (AdvisoryLockKey) for the
// duration of the backfill walk. That lock is what makes this safe to
// run alongside the SQL `rollup_task_usage_hourly()` cron entry, the
// in-process scheduler, and any other concurrent backfill: the first
// caller to take the lock proceeds, and the others cannot start their
// walk until it is released (Hook waits for it with the blocking
// pg_advisory_lock).
package taskusagebackfill
import (
"context"
"errors"
"fmt"
"log/slog"
"time"
"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgtype"
"github.com/jackc/pgx/v5/pgxpool"
)
const (
	// AdvisoryLockKey is the PostgreSQL advisory-lock id that serialises
	// every path writing task_usage_hourly's rollup:
	//
	//   - rollup_task_usage_hourly() in migration 102.
	//   - cmd/backfill_task_usage_hourly.
	//   - cmd/migrate's pre-103 hook (MUL-2957).
	//   - the in-process scheduler's rollup_task_usage_hourly handler, as
	//     defense in depth: the scheduler's lease already prevents
	//     double-runs across instances, and the lock additionally keeps it
	//     from racing legacy pg_cron / manual entrypoints.
	//
	// Every prior version of the rollup pipeline used this same id, so a
	// mixed-version cluster (rolling deploy) cannot double-write.
	AdvisoryLockKey int64 = 4246

	// MaxLagThreshold is the same interval as v_lag in migration 103's
	// fail-closed guard. The pre-103 hook backfills only when the
	// watermark trails the newest task_usage event by more than this.
	// Below it the migration would pass regardless, so the scan is
	// skipped.
	MaxLagThreshold = time.Hour
)
// The watermark upper bound of `now() - 5 minutes` is encoded directly
// in the SQL UPDATE in stampWatermark / stampWatermarkOnConn so the
// math runs in the same session that does the write — no app-side
// time.Now() participates. (MUL-2957 review: blocker #3.)

// Result describes what a single backfill run did. It is exposed so
// callers (the migrate command and tests) can log or assert on it.
type Result struct {
	// Skipped is a short reason string when the run decided not to walk
	// any slices. An empty string means the run went on to the slice
	// walk.
	Skipped string

	// SlicesProcessed counts the monthly slices that were rolled up.
	SlicesProcessed int

	// RowsTouched is the sum of the counts returned by
	// rollup_task_usage_hourly_window across all processed slices.
	RowsTouched int64

	// From and To bound the walk's UTC range as the half-open interval
	// [From, To). Both lie on month boundaries.
	From time.Time
	To   time.Time

	// WatermarkStamped reports whether the watermark UPDATE was issued.
	WatermarkStamped bool
}
// HookOptions tunes Hook. Its zero value is the production
// configuration; tests set individual fields as needed.
type HookOptions struct {
	// Logger receives the slog records Hook emits about the backfill
	// walk. When nil, slog.Default() is used.
	Logger *slog.Logger

	// LagThreshold replaces MaxLagThreshold as the watermark lag above
	// which Hook backfills. Zero or negative means MaxLagThreshold.
	// Tests pass a small value to force a backfill on a deterministic
	// fixture.
	LagThreshold time.Duration

	// SleepBetweenSlices pauses the walk between monthly slices to ease
	// load on a busy database. Zero or negative means no pause. It
	// mirrors the operator-facing flag on the standalone backfill
	// command.
	SleepBetweenSlices time.Duration
}
// Hook is the migration-time entrypoint. It checks whether
// task_usage_hourly_rollup_state.watermark_at trails the latest
// task_usage event by more than the lag threshold. If it does, Hook
// runs an idempotent monthly-slice backfill of task_usage_hourly under
// advisory lock AdvisoryLockKey. It then stamps the watermark to
// `now() - 5 minutes` (DB clock) on the lock-holding session, mirroring
// the standalone backfill command's stampWatermark step.
//
// When no backfill is needed, the watermark is moved forward only if
// that cannot hide rows that have not been rolled up:
//
//   - task_usage is empty: nothing can be skipped, so the watermark is
//     stamped and migration 103's guard accepts the empty state.
//   - every task_usage event predates the watermark: likewise stamped.
//   - events at or after the watermark exist but the lag is within the
//     threshold: the watermark is left untouched. The rollup worker
//     continues forward from the watermark, so jumping it to
//     `now() - 5 minutes` would make the worker skip those events for
//     good. The lag is already small enough for migration 103's guard.
//
// The hook does NOT fail when the rollup state table is missing. In that
// case migrations 101/102 have not applied yet, the run is treated as
// nothing to do, and the migration loop installs the table next.
//
// It DOES return an error when the rollup walk itself fails. That case
// is identical to the standalone backfill failing, and the migration
// run must abort so the operator can investigate before migration 103
// drops the legacy daily rollups.
//
// The hook takes advisory lock 4246 on its own pooled connection so it
// does not collide with the migrate loop's session-level
// migrationAdvisoryLockKey on a different conn.
func Hook(ctx context.Context, pool *pgxpool.Pool, opts HookOptions) (Result, error) {
	log := opts.Logger
	if log == nil {
		log = slog.Default()
	}
	threshold := opts.LagThreshold
	if threshold <= 0 {
		threshold = MaxLagThreshold
	}

	// Step 1: cheap precondition check. If the rollup state table has not
	// been created yet, this hook has nothing to do: migrations 101/102
	// install it, and the migration loop reaches 103 only after those have
	// applied.
	stateExists, err := rollupStateExists(ctx, pool)
	if err != nil {
		return Result{}, fmt.Errorf("check rollup state existence: %w", err)
	}
	if !stateExists {
		log.Info("task_usage hourly rollup hook: rollup state tables not present, skipping",
			"reason", "migrations 101/102 not yet applied")
		return Result{Skipped: "rollup_state_missing"}, nil
	}

	// Step 2: read the task_usage range and the current watermark without
	// the lock. They only decide whether the lock-protected walk runs at
	// all, and the walk itself is idempotent if the watermark advances
	// under us.
	usageRange, err := loadUsageRange(ctx, pool)
	if err != nil {
		return Result{}, fmt.Errorf("load task_usage range: %w", err)
	}
	watermark, err := loadWatermark(ctx, pool)
	if err != nil {
		return Result{}, fmt.Errorf("load rollup watermark: %w", err)
	}

	if !usageRange.HasRows {
		// Empty database: the stamp cannot skip any rows. Stamping lets
		// migration 103's guard accept the no-history path on a fresh
		// upgrade, and the rollup worker starts forward from the stamp.
		// The DB's own clock is used so a clock-skewed app process cannot
		// stamp the watermark into the DB's future.
		if err := stampWatermark(ctx, pool); err != nil {
			return Result{}, err
		}
		log.Info("task_usage hourly rollup hook: task_usage empty, watermark stamped from db now()")
		return Result{Skipped: "task_usage_empty", WatermarkStamped: true}, nil
	}
	if !watermark.Valid {
		// Defensive. The schema guarantees the row exists with a default
		// of 1970-01-01, so an invalid value here means somebody has
		// fiddled with the row directly.
		return Result{}, errors.New("task_usage_hourly_rollup_state row is missing or watermark is NULL; manual intervention required before migration 103")
	}

	// Lag is computed against the DB's max_event (already DB time).
	// Comparing it to the DB-time watermark avoids any app/DB skew.
	maxEvent := usageRange.MaxEvent
	lag := maxEvent.Sub(watermark.Time)
	if lag <= threshold {
		log.Info("task_usage hourly rollup hook: watermark already current, skipping backfill",
			"watermark_at", watermark.Time.UTC().Format(time.RFC3339),
			"max_event", maxEvent.UTC().Format(time.RFC3339),
			"lag", lag.String(),
			"threshold", threshold.String())
		if !maxEvent.Before(watermark.Time) {
			// Some events are at or after the watermark and have not been
			// rolled up yet. An event exactly at the watermark counts as
			// pending, in case the window's upper bound is exclusive. Moving
			// the watermark forward now would make the rollup worker skip
			// these events permanently, so leave it for the worker to
			// advance. Migration 103's guard passes because the lag is
			// within the threshold.
			log.Info("task_usage hourly rollup hook: events pending after watermark, leaving it for the rollup worker",
				"watermark_at", watermark.Time.UTC().Format(time.RFC3339))
			return Result{Skipped: "watermark_within_threshold"}, nil
		}
		// Every event predates the watermark, so re-stamping from DB now()
		// cannot skip anything. It brings the value flush with the cron
		// upper bound, consistent with the standalone backfill command.
		if err := stampWatermark(ctx, pool); err != nil {
			return Result{}, err
		}
		return Result{Skipped: "watermark_within_threshold", WatermarkStamped: true}, nil
	}

	log.Info("task_usage hourly rollup hook: backfilling under advisory lock",
		"watermark_at", watermark.Time.UTC().Format(time.RFC3339),
		"max_event", maxEvent.UTC().Format(time.RFC3339),
		"lag", lag.String(),
		"threshold", threshold.String())

	// Step 3: serialise against the SQL cron entry, the standalone backfill
	// and the scheduler handler via advisory lock 4246. A dedicated pinned
	// connection is used because pg advisory locks are per-session.
	lockConn, err := pool.Acquire(ctx)
	if err != nil {
		return Result{}, fmt.Errorf("acquire advisory-lock connection: %w", err)
	}
	defer lockConn.Release()
	if _, err := lockConn.Exec(ctx, `SELECT pg_advisory_lock($1)`, AdvisoryLockKey); err != nil {
		return Result{}, fmt.Errorf("acquire advisory lock %d: %w", AdvisoryLockKey, err)
	}
	// Deferred after Release, so it runs first (LIFO).
	defer releaseAdvisoryLock(lockConn, log)

	from := monthFloor(usageRange.MinEvent)
	end := monthFloor(maxEvent).AddDate(0, 1, 0)
	res := Result{From: from, To: end}

	for cursor := from; cursor.Before(end); {
		if err := ctx.Err(); err != nil {
			return res, err
		}
		next := cursor.AddDate(0, 1, 0)
		var rows int64
		err := lockConn.QueryRow(
			ctx,
			`SELECT rollup_task_usage_hourly_window($1::timestamptz, $2::timestamptz)`,
			cursor, next,
		).Scan(&rows)
		if err != nil {
			return res, fmt.Errorf("rollup slice %s..%s: %w",
				cursor.Format(time.RFC3339), next.Format(time.RFC3339), err)
		}
		res.SlicesProcessed++
		res.RowsTouched += rows
		log.Info("task_usage hourly rollup hook: slice complete",
			"from", cursor.Format(time.RFC3339),
			"to", next.Format(time.RFC3339),
			"rows_touched", rows)
		cursor = next

		if opts.SleepBetweenSlices > 0 && cursor.Before(end) {
			select {
			case <-time.After(opts.SleepBetweenSlices):
			case <-ctx.Done():
				return res, ctx.Err()
			}
		}
	}

	// Stamp on the lock-holding session so no other rollup writer can
	// interleave between the final slice and the watermark move.
	if err := stampWatermarkOnConn(ctx, lockConn.Conn()); err != nil {
		return res, err
	}
	res.WatermarkStamped = true
	log.Info("task_usage hourly rollup hook: complete",
		"slices", res.SlicesProcessed,
		"total_rows_touched", res.RowsTouched,
		"watermark_source", "db_now")
	return res, nil
}

// advisoryUnlockTimeout bounds the best-effort advisory unlock so a
// stalled connection cannot hang Hook's return.
const advisoryUnlockTimeout = 10 * time.Second

// releaseAdvisoryLock drops advisory lock AdvisoryLockKey held by conn's
// session. Releasing a pgxpool connection returns the session to the
// pool rather than ending it, so a lock that is not explicitly unlocked
// would stay held on a recycled connection. If the unlock cannot be
// confirmed, the underlying connection is closed instead. PostgreSQL
// releases session-level advisory locks when the session ends, and the
// pool destroys closed connections on Release instead of reusing them.
func releaseAdvisoryLock(conn *pgxpool.Conn, log *slog.Logger) {
	if conn.Conn().IsClosed() {
		// The session is already gone (e.g. the query was cancelled), and
		// its locks went with it.
		return
	}
	// Independent of the caller's ctx so a cancelled walk still unlocks.
	ctx, cancel := context.WithTimeout(context.Background(), advisoryUnlockTimeout)
	defer cancel()
	if _, err := conn.Exec(ctx, `SELECT pg_advisory_unlock($1)`, AdvisoryLockKey); err != nil {
		log.Warn("task_usage hourly rollup hook: advisory unlock failed, closing connection to end its session",
			"error", err)
		_ = conn.Conn().Close(ctx)
	}
}
// usageRange summarises task_usage for Hook. It records whether the
// table has any rows and, if so, the earliest created_at and the latest
// COALESCE(updated_at, created_at). loadUsageRange normalises both
// times to UTC.
type usageRange struct {
	HasRows            bool
	MinEvent, MaxEvent time.Time
}
// loadUsageRange reads the task_usage extent that sizes Hook's walk.
// MIN(created_at) is where the walk starts, and
// MAX(COALESCE(updated_at, created_at)) is the event time compared
// against the watermark. The max expression matches the one migration
// 103's guard uses, so Hook's lag decision agrees with the guard. An
// empty table (both aggregates NULL) yields the zero usageRange, whose
// HasRows is false.
func loadUsageRange(ctx context.Context, pool *pgxpool.Pool) (usageRange, error) {
	const rangeSQL = `
SELECT MIN(created_at), MAX(COALESCE(updated_at, created_at))
FROM task_usage
`
	var earliest, latest pgtype.Timestamptz
	if err := pool.QueryRow(ctx, rangeSQL).Scan(&earliest, &latest); err != nil {
		return usageRange{}, err
	}
	if earliest.Valid && latest.Valid {
		return usageRange{
			HasRows:  true,
			MinEvent: earliest.Time.UTC(),
			MaxEvent: latest.Time.UTC(),
		}, nil
	}
	return usageRange{}, nil
}
// loadWatermark returns watermark_at from the id = 1 row of
// task_usage_hourly_rollup_state. A missing row (pgx.ErrNoRows) is not
// an error. It comes back as an invalid (NULL) Timestamptz, so callers
// can treat "row missing" and "watermark NULL" the same way.
func loadWatermark(ctx context.Context, pool *pgxpool.Pool) (pgtype.Timestamptz, error) {
	var wm pgtype.Timestamptz
	err := pool.QueryRow(ctx, `
SELECT watermark_at FROM task_usage_hourly_rollup_state WHERE id = 1
`).Scan(&wm)
	switch {
	case err == nil:
		return wm, nil
	case errors.Is(err, pgx.ErrNoRows):
		return pgtype.Timestamptz{}, nil
	default:
		return pgtype.Timestamptz{}, err
	}
}
// rollupStateExists reports whether task_usage_hourly_rollup_state is
// visible to this session.
//
// The lookup uses to_regclass, which resolves the unqualified name
// through the session's search_path. The other queries in this package
// resolve the table the same way, because they reference it
// unqualified. A hard-coded table_schema = 'public' filter would report
// the table missing whenever it lives in another schema on search_path,
// making Hook skip the backfill even though the table is there.
// to_regclass returns NULL instead of raising when no such relation
// exists.
func rollupStateExists(ctx context.Context, pool *pgxpool.Pool) (bool, error) {
	var exists bool
	err := pool.QueryRow(ctx,
		`SELECT to_regclass('task_usage_hourly_rollup_state') IS NOT NULL`,
	).Scan(&exists)
	return exists, err
}
// stampWatermarkSQL moves the hourly rollup state watermark to
// `now() - 5 minutes`, using PostgreSQL's clock rather than the app
// process clock. This matches the cron entry's upper bound and
// guarantees that container clock drift cannot stamp the watermark into
// the DB's future. (MUL-2957 review: blocker #3.)
const stampWatermarkSQL = `
UPDATE task_usage_hourly_rollup_state
SET watermark_at = now() - INTERVAL '5 minutes'
WHERE id = 1
`

// stampWatermark runs stampWatermarkSQL on any pooled connection.
func stampWatermark(ctx context.Context, pool *pgxpool.Pool) error {
	tag, err := pool.Exec(ctx, stampWatermarkSQL)
	return checkWatermarkStamp(tag.RowsAffected(), err)
}

// stampWatermarkOnConn runs stampWatermarkSQL on a specific connection,
// so the write happens in the same session the caller is using (Hook
// passes its advisory-lock connection).
func stampWatermarkOnConn(ctx context.Context, conn *pgx.Conn) error {
	tag, err := conn.Exec(ctx, stampWatermarkSQL)
	return checkWatermarkStamp(tag.RowsAffected(), err)
}

// checkWatermarkStamp turns the outcome of the stamp UPDATE into an
// error. Zero affected rows means the id = 1 state row does not exist.
// The UPDATE then changed nothing, and reporting success would let
// callers claim the watermark was stamped when it was not.
func checkWatermarkStamp(rowsAffected int64, err error) error {
	if err != nil {
		return fmt.Errorf("stamp watermark: %w", err)
	}
	if rowsAffected == 0 {
		return errors.New("stamp watermark: task_usage_hourly_rollup_state row id = 1 not found; manual intervention required before migration 103")
	}
	return nil
}
// monthFloor returns midnight UTC on the first day of t's month. t is
// converted to UTC first, so the month is always the UTC month.
func monthFloor(t time.Time) time.Time {
	year, month, _ := t.UTC().Date()
	return time.Date(year, month, 1, 0, 0, 0, 0, time.UTC)
}