mirror of
https://github.com/multica-ai/multica.git
synced 2026-06-17 03:38:32 +02:00
372 lines
13 KiB
Go
372 lines
13 KiB
Go
// Package taskusagebackfill seeds task_usage_hourly from historical
|
|
// task_usage rows.
|
|
//
|
|
// It is invoked by two callers:
|
|
//
|
|
// - server/cmd/backfill_task_usage_hourly: explicit operator command,
|
|
// used by the SELF-HOST UPGRADE ORDER documented in that file.
|
|
// - server/cmd/migrate: invoked as a hook BEFORE migration 103 runs,
|
|
// so that operators upgrading directly from v0.3.4 (or any version
|
|
// prior to the hourly pipeline) do not trip migration 103's
|
|
// fail-closed watermark guard while the server is still down. The
|
|
// hook can run a full idempotent backfill in the same `migrate up`
|
|
// invocation and then continue applying 103/104.
|
|
//
|
|
// The implementation uses the same SQL window primitive
|
|
// (`rollup_task_usage_hourly_window`) that the rollup worker uses, so
|
|
// re-running is safe — partial progress is recovered on the next call.
|
|
//
|
|
// All callers MUST hold advisory lock 4246 (AdvisoryLockKey) for the
|
|
// duration of the backfill walk. That lock is what makes this safe to
|
|
// run alongside the SQL `rollup_task_usage_hourly()` cron entry, the
|
|
// in-process scheduler, and any other concurrent backfill — winners
|
|
// take the lock, losers no-op until it is released.
|
|
package taskusagebackfill
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"fmt"
|
|
"log/slog"
|
|
"time"
|
|
|
|
"github.com/jackc/pgx/v5"
|
|
"github.com/jackc/pgx/v5/pgtype"
|
|
"github.com/jackc/pgx/v5/pgxpool"
|
|
)
|
|
|
|
// AdvisoryLockKey is the PostgreSQL advisory-lock id shared by every path
// that drives task_usage_hourly's rollup writes:
//
//   - rollup_task_usage_hourly() in migration 102.
//   - cmd/backfill_task_usage_hourly.
//   - cmd/migrate's pre-103 hook (MUL-2957).
//   - the in-process scheduler's rollup_task_usage_hourly handler
//     (defense in depth: the scheduler's lease already prevents
//     double-runs across instances; the advisory lock additionally
//     keeps legacy pg_cron / manual entrypoints from racing it).
//
// Every prior version of the rollup pipeline used this same id, so a
// mixed-version cluster (rolling deploy) cannot double-write.
const AdvisoryLockKey = int64(4246)
|
|
|
|
// MaxLagThreshold mirrors the v_lag interval inside migration 103's
// fail-closed guard. The pre-103 hook only runs a backfill when the
// watermark trails the latest task_usage event by more than this
// duration; at or below it the migration would have passed anyway, so
// the scan is skipped.
const MaxLagThreshold time.Duration = time.Hour
|
|
|
|
// The watermark upper bound of `now() - 5 minutes` is encoded directly
|
|
// in the SQL UPDATE in stampWatermark / stampWatermarkOnConn so the
|
|
// math runs in the same session that does the write — no app-side
|
|
// time.Now() participates. (MUL-2957 review: blocker #3.)
|
|
|
|
// Result summarises one backfill run. It is exported so that callers
// (the migrate command and tests) can log it or assert on it.
type Result struct {
	// Skipped holds a short reason code when the run performed no slice
	// work. It is empty when at least one slice was walked.
	Skipped string

	// SlicesProcessed is the number of monthly slices rolled up.
	SlicesProcessed int

	// RowsTouched accumulates the counts returned by
	// rollup_task_usage_hourly_window across all slices.
	RowsTouched int64

	// From and To bracket the UTC range covered by the walk.
	From, To time.Time

	// WatermarkStamped is true once the watermark UPDATE has been issued.
	WatermarkStamped bool
}
|
|
|
|
// HookOptions tunes Hook. The zero value is the production
// configuration; tests override individual fields as needed.
type HookOptions struct {
	// Logger is the destination for slog records describing the
	// backfill walk. When nil, slog.Default() is used.
	Logger *slog.Logger

	// LagThreshold replaces MaxLagThreshold. Hook falls back to
	// MaxLagThreshold when this is zero (or negative). Tests pass a
	// small value to force a backfill on a deterministic fixture.
	LagThreshold time.Duration

	// SleepBetweenSlices is a pause inserted between monthly slices to
	// throttle the walk on a busy DB; zero disables it. It mirrors the
	// operator-facing flag on the standalone backfill command.
	SleepBetweenSlices time.Duration
}
|
|
|
|
// Hook is the migration-time entrypoint. It checks whether the
|
|
// task_usage_hourly_rollup_state.watermark_at trails the latest
|
|
// task_usage event by more than the lag threshold and, if so, runs an
|
|
// idempotent monthly-slice backfill of task_usage_hourly. After the
|
|
// walk completes (or if no backfill was needed) the watermark is
|
|
// stamped to `now() - 5 minutes`, mirroring the standalone backfill
|
|
// command's stampWatermark step.
|
|
//
|
|
// The hook does NOT fail when:
|
|
//
|
|
// - task_usage is empty (a fresh database has no history to
|
|
// backfill — the watermark is stamped so migration 103's guard
|
|
// accepts the empty state).
|
|
//
|
|
// - the rollup state tables are missing (the hook ran before
|
|
// migration 101 in some unusual ordering — treated as nothing to
|
|
// do, the migration loop will install them next).
|
|
//
|
|
// It DOES return an error when the rollup walk itself fails: that case
|
|
// is identical to the standalone backfill failing, and the migration
|
|
// run must abort so the operator can investigate before migration 103
|
|
// drops the legacy daily rollups.
|
|
//
|
|
// The hook acquires advisory lock 4246 on its own session connection
|
|
// so it does not collide with the migrate loop's session-level
|
|
// migrationAdvisoryLockKey on a different conn.
|
|
func Hook(ctx context.Context, pool *pgxpool.Pool, opts HookOptions) (Result, error) {
|
|
log := opts.Logger
|
|
if log == nil {
|
|
log = slog.Default()
|
|
}
|
|
threshold := opts.LagThreshold
|
|
if threshold <= 0 {
|
|
threshold = MaxLagThreshold
|
|
}
|
|
|
|
// Step 1: cheap precondition check — if the rollup state tables
|
|
// have not been created yet, this hook simply has nothing to do
|
|
// (migrations 101/102 install them, and the migration loop will
|
|
// reach 103 only after those have already applied).
|
|
stateExists, err := rollupStateExists(ctx, pool)
|
|
if err != nil {
|
|
return Result{}, fmt.Errorf("check rollup state existence: %w", err)
|
|
}
|
|
if !stateExists {
|
|
log.Info("task_usage hourly rollup hook: rollup state tables not present, skipping",
|
|
"reason", "migrations 101/102 not yet applied")
|
|
return Result{Skipped: "rollup_state_missing"}, nil
|
|
}
|
|
|
|
// Step 2: read task_usage range and current watermark on the pool.
|
|
// These do not need to be locked — they are only used to decide
|
|
// whether the lock-protected walk should run at all, and the walk
|
|
// itself is idempotent if the watermark advances under us.
|
|
usageRange, err := loadUsageRange(ctx, pool)
|
|
if err != nil {
|
|
return Result{}, fmt.Errorf("load task_usage range: %w", err)
|
|
}
|
|
watermark, err := loadWatermark(ctx, pool)
|
|
if err != nil {
|
|
return Result{}, fmt.Errorf("load rollup watermark: %w", err)
|
|
}
|
|
|
|
if !usageRange.HasRows {
|
|
// Empty database. Stamp the watermark so migration 103's
|
|
// guard accepts the no-history path on a fresh upgrade and
|
|
// the rollup worker starts forward from the stamp. The DB's
|
|
// own clock is used so a clock-skewed app process cannot
|
|
// stamp the watermark into the DB's future.
|
|
if err := stampWatermark(ctx, pool); err != nil {
|
|
return Result{}, err
|
|
}
|
|
log.Info("task_usage hourly rollup hook: task_usage empty, watermark stamped from db now()")
|
|
return Result{Skipped: "task_usage_empty", WatermarkStamped: true}, nil
|
|
}
|
|
|
|
if !watermark.Valid {
|
|
// Defensive — schema guarantees the row exists with a default
|
|
// of 1970-01-01, so an invalid value here means somebody has
|
|
// fiddled with the row directly.
|
|
return Result{}, errors.New("task_usage_hourly_rollup_state row is missing or watermark is NULL; manual intervention required before migration 103")
|
|
}
|
|
|
|
// Lag is computed against the DB's max_event (already DB-time);
|
|
// comparing to the DB-time watermark avoids any app/DB skew.
|
|
maxEvent := usageRange.MaxEvent
|
|
lag := maxEvent.Sub(watermark.Time)
|
|
if lag <= threshold {
|
|
log.Info("task_usage hourly rollup hook: watermark already current, skipping backfill",
|
|
"watermark_at", watermark.Time.UTC().Format(time.RFC3339),
|
|
"max_event", maxEvent.UTC().Format(time.RFC3339),
|
|
"lag", lag.String(),
|
|
"threshold", threshold.String())
|
|
// Re-stamp from DB now() to bring the value flush with the
|
|
// cron upper bound; the lag-based guard in migration 103 will
|
|
// pass either way, but stamping keeps the post-hook state
|
|
// consistent with the standalone backfill command.
|
|
if err := stampWatermark(ctx, pool); err != nil {
|
|
return Result{}, err
|
|
}
|
|
return Result{Skipped: "watermark_within_threshold", WatermarkStamped: true}, nil
|
|
}
|
|
|
|
log.Info("task_usage hourly rollup hook: backfilling under advisory lock",
|
|
"watermark_at", watermark.Time.UTC().Format(time.RFC3339),
|
|
"max_event", maxEvent.UTC().Format(time.RFC3339),
|
|
"lag", lag.String(),
|
|
"threshold", threshold.String())
|
|
|
|
// Step 3: serialise against the SQL cron entry / standalone backfill
|
|
// / scheduler handler via advisory lock 4246. We use a dedicated
|
|
// session-pinned conn because pg advisory locks are per-session.
|
|
lockConn, err := pool.Acquire(ctx)
|
|
if err != nil {
|
|
return Result{}, fmt.Errorf("acquire advisory-lock connection: %w", err)
|
|
}
|
|
defer lockConn.Release()
|
|
|
|
if _, err := lockConn.Exec(ctx, `SELECT pg_advisory_lock($1)`, AdvisoryLockKey); err != nil {
|
|
return Result{}, fmt.Errorf("acquire advisory lock %d: %w", AdvisoryLockKey, err)
|
|
}
|
|
// Use a fresh context for the unlock so a cancelled ctx does not
|
|
// skip the release. Releasing the connection afterwards would end
|
|
// the session anyway, but an explicit unlock frees it immediately.
|
|
defer func() {
|
|
_, _ = lockConn.Exec(context.Background(), `SELECT pg_advisory_unlock($1)`, AdvisoryLockKey)
|
|
}()
|
|
|
|
from := monthFloor(usageRange.MinEvent)
|
|
end := monthFloor(maxEvent).AddDate(0, 1, 0)
|
|
res := Result{From: from, To: end}
|
|
|
|
cursor := from
|
|
for cursor.Before(end) {
|
|
select {
|
|
case <-ctx.Done():
|
|
return res, ctx.Err()
|
|
default:
|
|
}
|
|
|
|
next := cursor.AddDate(0, 1, 0)
|
|
var rows int64
|
|
err := lockConn.QueryRow(
|
|
ctx,
|
|
`SELECT rollup_task_usage_hourly_window($1::timestamptz, $2::timestamptz)`,
|
|
cursor, next,
|
|
).Scan(&rows)
|
|
if err != nil {
|
|
return res, fmt.Errorf("rollup slice %s..%s: %w",
|
|
cursor.Format(time.RFC3339), next.Format(time.RFC3339), err)
|
|
}
|
|
res.SlicesProcessed++
|
|
res.RowsTouched += rows
|
|
log.Info("task_usage hourly rollup hook: slice complete",
|
|
"from", cursor.Format(time.RFC3339),
|
|
"to", next.Format(time.RFC3339),
|
|
"rows_touched", rows)
|
|
cursor = next
|
|
if opts.SleepBetweenSlices > 0 && cursor.Before(end) {
|
|
select {
|
|
case <-time.After(opts.SleepBetweenSlices):
|
|
case <-ctx.Done():
|
|
return res, ctx.Err()
|
|
}
|
|
}
|
|
}
|
|
|
|
if err := stampWatermarkOnConn(ctx, lockConn.Conn()); err != nil {
|
|
return res, err
|
|
}
|
|
res.WatermarkStamped = true
|
|
|
|
log.Info("task_usage hourly rollup hook: complete",
|
|
"slices", res.SlicesProcessed,
|
|
"total_rows_touched", res.RowsTouched,
|
|
"watermark_source", "db_now")
|
|
return res, nil
|
|
}
|
|
|
|
// usageRange is the event-time span of the task_usage table as reported
// by loadUsageRange.
type usageRange struct {
	// HasRows is false when task_usage yielded no usable min/max pair;
	// MinEvent and MaxEvent are then left at their zero values.
	HasRows bool

	// MinEvent is MIN(created_at) and MaxEvent is
	// MAX(COALESCE(updated_at, created_at)), both converted to UTC.
	MinEvent, MaxEvent time.Time
}
|
|
|
|
func loadUsageRange(ctx context.Context, pool *pgxpool.Pool) (usageRange, error) {
|
|
var minTS, maxTS pgtype.Timestamptz
|
|
// COALESCE(updated_at, created_at) tracks the same expression
|
|
// migration 103's guard uses, so the lag comparison stays
|
|
// consistent with the value the guard will check next.
|
|
err := pool.QueryRow(ctx, `
|
|
SELECT MIN(created_at), MAX(COALESCE(updated_at, created_at))
|
|
FROM task_usage
|
|
`).Scan(&minTS, &maxTS)
|
|
if err != nil {
|
|
return usageRange{}, err
|
|
}
|
|
if !minTS.Valid || !maxTS.Valid {
|
|
return usageRange{HasRows: false}, nil
|
|
}
|
|
return usageRange{
|
|
HasRows: true,
|
|
MinEvent: minTS.Time.UTC(),
|
|
MaxEvent: maxTS.Time.UTC(),
|
|
}, nil
|
|
}
|
|
|
|
func loadWatermark(ctx context.Context, pool *pgxpool.Pool) (pgtype.Timestamptz, error) {
|
|
var watermark pgtype.Timestamptz
|
|
err := pool.QueryRow(ctx, `
|
|
SELECT watermark_at FROM task_usage_hourly_rollup_state WHERE id = 1
|
|
`).Scan(&watermark)
|
|
if err != nil {
|
|
if errors.Is(err, pgx.ErrNoRows) {
|
|
return pgtype.Timestamptz{}, nil
|
|
}
|
|
return pgtype.Timestamptz{}, err
|
|
}
|
|
return watermark, nil
|
|
}
|
|
|
|
func rollupStateExists(ctx context.Context, pool *pgxpool.Pool) (bool, error) {
|
|
var exists bool
|
|
err := pool.QueryRow(ctx, `
|
|
SELECT EXISTS (
|
|
SELECT 1 FROM information_schema.tables
|
|
WHERE table_schema = 'public'
|
|
AND table_name = 'task_usage_hourly_rollup_state'
|
|
)
|
|
`).Scan(&exists)
|
|
return exists, err
|
|
}
|
|
|
|
// stampWatermark moves the hourly rollup state watermark to
|
|
// `now() - 5 min` using PostgreSQL's clock, NOT the app process clock.
|
|
// This matches the cron entry's upper bound and — critically —
|
|
// guarantees the watermark cannot be stamped into the DB's future
|
|
// because of container clock drift. (MUL-2957 review: see张大彪's
|
|
// blocker #3.)
|
|
func stampWatermark(ctx context.Context, pool *pgxpool.Pool) error {
|
|
_, err := pool.Exec(ctx, `
|
|
UPDATE task_usage_hourly_rollup_state
|
|
SET watermark_at = now() - INTERVAL '5 minutes'
|
|
WHERE id = 1
|
|
`)
|
|
if err != nil {
|
|
return fmt.Errorf("stamp watermark: %w", err)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func stampWatermarkOnConn(ctx context.Context, conn *pgx.Conn) error {
|
|
_, err := conn.Exec(ctx, `
|
|
UPDATE task_usage_hourly_rollup_state
|
|
SET watermark_at = now() - INTERVAL '5 minutes'
|
|
WHERE id = 1
|
|
`)
|
|
if err != nil {
|
|
return fmt.Errorf("stamp watermark: %w", err)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// monthFloor returns midnight UTC on the first day of the month that
// contains t. The month is determined after converting t to UTC, so an
// instant near a month boundary in another zone is floored according to
// its UTC calendar date.
func monthFloor(t time.Time) time.Time {
	year, month, _ := t.UTC().Date()
	return time.Date(year, month, 1, 0, 0, 0, 0, time.UTC)
}
|